summaryrefslogtreecommitdiff
path: root/services/strategy-engine/src/strategy_engine/engine.py
diff options
context:
space:
mode:
Diffstat (limited to 'services/strategy-engine/src/strategy_engine/engine.py')
-rw-r--r--services/strategy-engine/src/strategy_engine/engine.py54
1 files changed, 54 insertions, 0 deletions
diff --git a/services/strategy-engine/src/strategy_engine/engine.py b/services/strategy-engine/src/strategy_engine/engine.py
new file mode 100644
index 0000000..09dbf65
--- /dev/null
+++ b/services/strategy-engine/src/strategy_engine/engine.py
@@ -0,0 +1,54 @@
+"""Strategy Engine: consumes candle events and publishes signals."""
+import logging
+
+from shared.broker import RedisBroker
+from shared.events import CandleEvent, SignalEvent, Event
+
+from strategies.base import BaseStrategy
+
+logger = logging.getLogger(__name__)
+
+
+class StrategyEngine:
+ def __init__(self, broker: RedisBroker, strategies: list[BaseStrategy]) -> None:
+ self._broker = broker
+ self._strategies = strategies
+
+ async def process_once(self, stream: str, last_id: str) -> str:
+ """Read one batch of messages from the stream, process candles, publish signals.
+
+ Returns the updated last_id for the next call.
+ """
+ messages = await self._broker.read(stream, last_id=last_id, count=10, block=100)
+
+ for raw in messages:
+ try:
+ event = Event.from_dict(raw)
+ except Exception as exc:
+ logger.warning("Failed to parse event: %s – %s", raw, exc)
+ continue
+
+ if not isinstance(event, CandleEvent):
+ continue
+
+ candle = event.data
+ for strategy in self._strategies:
+ try:
+ signal = strategy.on_candle(candle)
+ except Exception as exc:
+ logger.error(
+ "Strategy %s raised on candle: %s", strategy.name, exc
+ )
+ continue
+
+ if signal is not None:
+ signal_event = SignalEvent(data=signal)
+ await self._broker.publish("signals", signal_event.to_dict())
+ logger.info(
+ "Signal published: strategy=%s symbol=%s side=%s",
+ signal.strategy,
+ signal.symbol,
+ signal.side,
+ )
+
+ return last_id