1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
|
"""Strategy Engine: consumes candle events and publishes signals."""
import logging
from shared.broker import RedisBroker
from shared.events import CandleEvent, SignalEvent, Event
from strategies.base import BaseStrategy
logger = logging.getLogger(__name__)
class StrategyEngine:
def __init__(self, broker: RedisBroker, strategies: list[BaseStrategy]) -> None:
self._broker = broker
self._strategies = strategies
async def process_once(self, stream: str, last_id: str) -> str:
"""Read one batch of messages from the stream, process candles, publish signals.
Returns the updated last_id for the next call.
"""
messages = await self._broker.read(stream, last_id=last_id, count=10, block=100)
for raw in messages:
try:
event = Event.from_dict(raw)
except Exception as exc:
logger.warning("Failed to parse event: %s – %s", raw, exc)
continue
if not isinstance(event, CandleEvent):
continue
candle = event.data
for strategy in self._strategies:
try:
signal = strategy.on_candle(candle)
except Exception as exc:
logger.error("Strategy %s raised on candle: %s", strategy.name, exc)
continue
if signal is not None:
signal_event = SignalEvent(data=signal)
await self._broker.publish("signals", signal_event.to_dict())
logger.info(
"Signal published: strategy=%s symbol=%s side=%s",
signal.strategy,
signal.symbol,
signal.side,
)
return last_id
|