diff options
| author | TheSiahxyz <164138827+TheSiahxyz@users.noreply.github.com> | 2026-04-01 17:46:09 +0900 |
|---|---|---|
| committer | TheSiahxyz <164138827+TheSiahxyz@users.noreply.github.com> | 2026-04-01 17:46:09 +0900 |
| commit | 678005dc51892c4c1f4cea2730bbf0ec4ebc312d (patch) | |
| tree | ac6477f99cfe771810b406ef692e9d1887cf035d /services/strategy-engine | |
| parent | 51fd92c5ebb7adddcfee0586b70a9894d1ac0a06 (diff) | |
fix(strategy-engine): process multiple symbols concurrently with asyncio.gather
Diffstat (limited to 'services/strategy-engine')
| -rw-r--r-- | services/strategy-engine/src/strategy_engine/main.py | 39 | ||||
| -rw-r--r-- | services/strategy-engine/tests/test_multi_symbol.py | 66 |
2 files changed, 87 insertions, 18 deletions
diff --git a/services/strategy-engine/src/strategy_engine/main.py b/services/strategy-engine/src/strategy_engine/main.py index 9f3b3c9..efd79e9 100644 --- a/services/strategy-engine/src/strategy_engine/main.py +++ b/services/strategy-engine/src/strategy_engine/main.py @@ -17,29 +17,32 @@ from strategy_engine.plugin_loader import load_strategies STRATEGIES_DIR = Path(__file__).parent.parent.parent.parent / "strategies" +async def process_symbol(engine: StrategyEngine, stream: str, log) -> None: + """Process candles for a single symbol stream.""" + last_id = "$" + log.info("engine_loop_start", stream=stream) + while True: + last_id = await engine.process_once(stream, last_id) + + async def run() -> None: config = StrategyConfig() log = setup_logging("strategy-engine", config.log_level, config.log_format) metrics = ServiceMetrics("strategy_engine") + notifier = TelegramNotifier( - bot_token=config.telegram_bot_token, chat_id=config.telegram_chat_id + bot_token=config.telegram_bot_token, + chat_id=config.telegram_chat_id, ) broker = RedisBroker(config.redis_url) + strategies = load_strategies(STRATEGIES_DIR) - strategies_dir = STRATEGIES_DIR - strategies = load_strategies(strategies_dir) - - # Configure each strategy with params from config for strategy in strategies: params = config.strategy_params.get(strategy.name, {}) strategy.configure(params) - log.info( - "strategies_loaded", - count=len(strategies), - names=[s.name for s in strategies], - ) + log.info("loaded_strategies", count=len(strategies), names=[s.name for s in strategies]) engine = StrategyEngine(broker=broker, strategies=strategies) @@ -50,22 +53,22 @@ async def run() -> None: await health.start() metrics.service_up.labels(service="strategy-engine").set(1) + tasks = [] try: for symbol in config.symbols: stream = f"candles.{symbol.replace('/', '_')}" - last_id = "$" - log.info("engine_loop_started", stream=stream) - - while True: - last_id = await engine.process_once(stream, last_id) - metrics.events_processed.labels( - service="strategy-engine", event_type="candle" - ).inc() + task = asyncio.create_task(process_symbol(engine, stream, log)) + tasks.append(task) + + # Wait for all symbol processors (they run forever until cancelled) + await asyncio.gather(*tasks) except Exception as exc: log.error("fatal_error", error=str(exc)) await notifier.send_error(str(exc), "strategy-engine") raise finally: + for task in tasks: + task.cancel() metrics.service_up.labels(service="strategy-engine").set(0) await notifier.close() await broker.close() diff --git a/services/strategy-engine/tests/test_multi_symbol.py b/services/strategy-engine/tests/test_multi_symbol.py new file mode 100644 index 0000000..2008c15 --- /dev/null +++ b/services/strategy-engine/tests/test_multi_symbol.py @@ -0,0 +1,66 @@ +"""Test that strategy engine processes multiple symbols concurrently.""" +import asyncio +import sys +from pathlib import Path +from unittest.mock import AsyncMock, MagicMock + +import pytest + +sys.path.insert(0, str(Path(__file__).resolve().parents[1] / "src")) +sys.path.insert(0, str(Path(__file__).resolve().parents[1])) + +from strategy_engine.engine import StrategyEngine +from shared.events import CandleEvent +from shared.models import Candle, OrderSide +from decimal import Decimal +from datetime import datetime, timezone + + +@pytest.mark.asyncio +async def test_engine_processes_multiple_streams(): + """Verify engine can process candles from different streams.""" + broker = AsyncMock() + + candle_btc = Candle( + symbol="BTCUSDT", timeframe="1m", + open_time=datetime(2025, 1, 1, tzinfo=timezone.utc), + open=Decimal("50000"), high=Decimal("51000"), + low=Decimal("49000"), close=Decimal("50000"), + volume=Decimal("10"), + ) + candle_eth = Candle( + symbol="ETHUSDT", timeframe="1m", + open_time=datetime(2025, 1, 1, tzinfo=timezone.utc), + open=Decimal("3000"), high=Decimal("3100"), + low=Decimal("2900"), close=Decimal("3000"), + volume=Decimal("10"), + ) + + btc_events = [CandleEvent(data=candle_btc).to_dict()] + eth_events = [CandleEvent(data=candle_eth).to_dict()] + + # First call returns BTC event, second ETH, then empty + call_count = {"btc": 0, "eth": 0} + + async def mock_read(stream, **kwargs): + if "BTC" in stream: + call_count["btc"] += 1 + return btc_events if call_count["btc"] == 1 else [] + elif "ETH" in stream: + call_count["eth"] += 1 + return eth_events if call_count["eth"] == 1 else [] + return [] + + broker.read = AsyncMock(side_effect=mock_read) + + strategy = MagicMock() + strategy.on_candle = MagicMock(return_value=None) + + engine = StrategyEngine(broker=broker, strategies=[strategy]) + + # Process both streams + await engine.process_once("candles.BTCUSDT", "$") + await engine.process_once("candles.ETHUSDT", "$") + + # Strategy should have been called with both candles + assert strategy.on_candle.call_count == 2 |
