summaryrefslogtreecommitdiff
path: root/services/strategy-engine/src/strategy_engine/engine.py
blob: 4b2c468443d25a6d7710ad742df7411d1c5b0884 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
"""Strategy Engine: consumes candle events and publishes signals."""

import logging

from strategies.base import BaseStrategy

from shared.broker import RedisBroker
from shared.events import CandleEvent, Event, SignalEvent

# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)


class StrategyEngine:
    """Run strategies against candle events read from a broker and publish signals.

    The engine holds a broker (used both to read incoming messages and to
    publish outgoing signals) and a list of strategies that are each offered
    every candle.
    """

    def __init__(self, broker: RedisBroker, strategies: list[BaseStrategy]) -> None:
        """Store the broker and the strategies to evaluate on every candle."""
        self._broker = broker
        self._strategies = strategies

    async def process_once(self, stream: str, last_id: str) -> str:
        """Read one batch of messages from the stream, process candles, publish signals.

        One batch is requested from the broker with ``count=10`` and
        ``block=100``. Messages that cannot be parsed into an event are logged
        at WARNING level and skipped; events that are not ``CandleEvent`` are
        ignored. Every candle is handed to each configured strategy.

        Args:
            stream: Name of the stream to read from.
            last_id: Read position forwarded to the broker.

        Returns:
            ``last_id`` exactly as it was passed in.

        NOTE(review): the cursor is never advanced in this method, so a caller
        that feeds the return value into the next call re-sends the same
        ``last_id``. Whether that re-reads the same batch depends on how the
        broker's ``read`` tracks position -- confirm against the broker and,
        if needed, have ``read`` expose message ids so the cursor can move.
        """
        messages = await self._broker.read(stream, last_id=last_id, count=10, block=100)

        for raw in messages:
            try:
                event = Event.from_dict(raw)
            except Exception as exc:
                # Best effort: one malformed message must not stall the batch.
                logger.warning("Failed to parse event: %s - %s", raw, exc)
                continue

            if isinstance(event, CandleEvent):
                await self._dispatch_candle(event.data)

        return last_id

    async def _dispatch_candle(self, candle) -> None:
        """Offer ``candle`` to every strategy and publish each signal produced."""
        for strategy in self._strategies:
            try:
                signal = strategy.on_candle(candle)
            except Exception as exc:
                # logger.exception keeps the traceback, so a failing strategy
                # can be diagnosed; the remaining strategies still run.
                logger.exception("Strategy %s raised on candle: %s", strategy.name, exc)
                continue

            if signal is None:
                # The strategy produced nothing for this candle.
                continue

            await self._broker.publish("signals", SignalEvent(data=signal).to_dict())
            logger.info(
                "Signal published: strategy=%s symbol=%s side=%s",
                signal.strategy,
                signal.symbol,
                signal.side,
            )