"""Strategy Engine: consumes candle events and publishes signals.""" import logging from shared.broker import RedisBroker from shared.events import CandleEvent, SignalEvent, Event from strategies.base import BaseStrategy logger = logging.getLogger(__name__) class StrategyEngine: def __init__(self, broker: RedisBroker, strategies: list[BaseStrategy]) -> None: self._broker = broker self._strategies = strategies async def process_once(self, stream: str, last_id: str) -> str: """Read one batch of messages from the stream, process candles, publish signals. Returns the updated last_id for the next call. """ messages = await self._broker.read(stream, last_id=last_id, count=10, block=100) for raw in messages: try: event = Event.from_dict(raw) except Exception as exc: logger.warning("Failed to parse event: %s – %s", raw, exc) continue if not isinstance(event, CandleEvent): continue candle = event.data for strategy in self._strategies: try: signal = strategy.on_candle(candle) except Exception as exc: logger.error( "Strategy %s raised on candle: %s", strategy.name, exc ) continue if signal is not None: signal_event = SignalEvent(data=signal) await self._broker.publish("signals", signal_event.to_dict()) logger.info( "Signal published: strategy=%s symbol=%s side=%s", signal.strategy, signal.symbol, signal.side, ) return last_id