diff options
Diffstat (limited to 'services/strategy-engine/src/strategy_engine/engine.py')
| -rw-r--r-- | services/strategy-engine/src/strategy_engine/engine.py | 54 |
1 files changed, 54 insertions, 0 deletions
diff --git a/services/strategy-engine/src/strategy_engine/engine.py b/services/strategy-engine/src/strategy_engine/engine.py new file mode 100644 index 0000000..09dbf65 --- /dev/null +++ b/services/strategy-engine/src/strategy_engine/engine.py @@ -0,0 +1,54 @@ +"""Strategy Engine: consumes candle events and publishes signals.""" +import logging + +from shared.broker import RedisBroker +from shared.events import CandleEvent, SignalEvent, Event + +from strategies.base import BaseStrategy + +logger = logging.getLogger(__name__) + + +class StrategyEngine: + def __init__(self, broker: RedisBroker, strategies: list[BaseStrategy]) -> None: + self._broker = broker + self._strategies = strategies + + async def process_once(self, stream: str, last_id: str) -> str: + """Read one batch of messages from the stream, process candles, publish signals. + + Returns the updated last_id for the next call. + """ + messages = await self._broker.read(stream, last_id=last_id, count=10, block=100) + + for raw in messages: + try: + event = Event.from_dict(raw) + except Exception as exc: + logger.warning("Failed to parse event: %s – %s", raw, exc) + continue + + if not isinstance(event, CandleEvent): + continue + + candle = event.data + for strategy in self._strategies: + try: + signal = strategy.on_candle(candle) + except Exception as exc: + logger.error( + "Strategy %s raised on candle: %s", strategy.name, exc + ) + continue + + if signal is not None: + signal_event = SignalEvent(data=signal) + await self._broker.publish("signals", signal_event.to_dict()) + logger.info( + "Signal published: strategy=%s symbol=%s side=%s", + signal.strategy, + signal.symbol, + signal.side, + ) + + return last_id |
