summaryrefslogtreecommitdiff
path: root/services/strategy-engine
diff options
context:
space:
mode:
authorTheSiahxyz <164138827+TheSiahxyz@users.noreply.github.com>2026-04-01 17:46:09 +0900
committerTheSiahxyz <164138827+TheSiahxyz@users.noreply.github.com>2026-04-01 17:46:09 +0900
commit678005dc51892c4c1f4cea2730bbf0ec4ebc312d (patch)
treeac6477f99cfe771810b406ef692e9d1887cf035d /services/strategy-engine
parent51fd92c5ebb7adddcfee0586b70a9894d1ac0a06 (diff)
fix(strategy-engine): process multiple symbols concurrently with asyncio.gather
Diffstat (limited to 'services/strategy-engine')
-rw-r--r--services/strategy-engine/src/strategy_engine/main.py39
-rw-r--r--services/strategy-engine/tests/test_multi_symbol.py66
2 files changed, 87 insertions, 18 deletions
diff --git a/services/strategy-engine/src/strategy_engine/main.py b/services/strategy-engine/src/strategy_engine/main.py
index 9f3b3c9..efd79e9 100644
--- a/services/strategy-engine/src/strategy_engine/main.py
+++ b/services/strategy-engine/src/strategy_engine/main.py
@@ -17,29 +17,32 @@ from strategy_engine.plugin_loader import load_strategies
STRATEGIES_DIR = Path(__file__).parent.parent.parent.parent / "strategies"
+async def process_symbol(engine: StrategyEngine, stream: str, log) -> None:
+ """Process candles for a single symbol stream."""
+ last_id = "$"
+ log.info("engine_loop_start", stream=stream)
+ while True:
+ last_id = await engine.process_once(stream, last_id)
+
+
async def run() -> None:
config = StrategyConfig()
log = setup_logging("strategy-engine", config.log_level, config.log_format)
metrics = ServiceMetrics("strategy_engine")
+
notifier = TelegramNotifier(
- bot_token=config.telegram_bot_token, chat_id=config.telegram_chat_id
+ bot_token=config.telegram_bot_token,
+ chat_id=config.telegram_chat_id,
)
broker = RedisBroker(config.redis_url)
+ strategies = load_strategies(STRATEGIES_DIR)
- strategies_dir = STRATEGIES_DIR
- strategies = load_strategies(strategies_dir)
-
- # Configure each strategy with params from config
for strategy in strategies:
params = config.strategy_params.get(strategy.name, {})
strategy.configure(params)
- log.info(
- "strategies_loaded",
- count=len(strategies),
- names=[s.name for s in strategies],
- )
+ log.info("loaded_strategies", count=len(strategies), names=[s.name for s in strategies])
engine = StrategyEngine(broker=broker, strategies=strategies)
@@ -50,22 +53,22 @@ async def run() -> None:
await health.start()
metrics.service_up.labels(service="strategy-engine").set(1)
+ tasks = []
try:
for symbol in config.symbols:
stream = f"candles.{symbol.replace('/', '_')}"
- last_id = "$"
- log.info("engine_loop_started", stream=stream)
-
- while True:
- last_id = await engine.process_once(stream, last_id)
- metrics.events_processed.labels(
- service="strategy-engine", event_type="candle"
- ).inc()
+ task = asyncio.create_task(process_symbol(engine, stream, log))
+ tasks.append(task)
+
+ # Wait for all symbol processors (they run forever until cancelled)
+ await asyncio.gather(*tasks)
except Exception as exc:
log.error("fatal_error", error=str(exc))
await notifier.send_error(str(exc), "strategy-engine")
raise
finally:
+ for task in tasks:
+ task.cancel()
metrics.service_up.labels(service="strategy-engine").set(0)
await notifier.close()
await broker.close()
diff --git a/services/strategy-engine/tests/test_multi_symbol.py b/services/strategy-engine/tests/test_multi_symbol.py
new file mode 100644
index 0000000..2008c15
--- /dev/null
+++ b/services/strategy-engine/tests/test_multi_symbol.py
@@ -0,0 +1,66 @@
+"""Test that strategy engine processes multiple symbols concurrently."""
+import asyncio
+import sys
+from pathlib import Path
+from unittest.mock import AsyncMock, MagicMock
+
+import pytest
+
+sys.path.insert(0, str(Path(__file__).resolve().parents[1] / "src"))
+sys.path.insert(0, str(Path(__file__).resolve().parents[1]))
+
+from strategy_engine.engine import StrategyEngine
+from shared.events import CandleEvent
+from shared.models import Candle, OrderSide
+from decimal import Decimal
+from datetime import datetime, timezone
+
+
+@pytest.mark.asyncio
+async def test_engine_processes_multiple_streams():
+ """Verify engine can process candles from different streams."""
+ broker = AsyncMock()
+
+ candle_btc = Candle(
+ symbol="BTCUSDT", timeframe="1m",
+ open_time=datetime(2025, 1, 1, tzinfo=timezone.utc),
+ open=Decimal("50000"), high=Decimal("51000"),
+ low=Decimal("49000"), close=Decimal("50000"),
+ volume=Decimal("10"),
+ )
+ candle_eth = Candle(
+ symbol="ETHUSDT", timeframe="1m",
+ open_time=datetime(2025, 1, 1, tzinfo=timezone.utc),
+ open=Decimal("3000"), high=Decimal("3100"),
+ low=Decimal("2900"), close=Decimal("3000"),
+ volume=Decimal("10"),
+ )
+
+ btc_events = [CandleEvent(data=candle_btc).to_dict()]
+ eth_events = [CandleEvent(data=candle_eth).to_dict()]
+
+ # First call returns BTC event, second ETH, then empty
+ call_count = {"btc": 0, "eth": 0}
+
+ async def mock_read(stream, **kwargs):
+ if "BTC" in stream:
+ call_count["btc"] += 1
+ return btc_events if call_count["btc"] == 1 else []
+ elif "ETH" in stream:
+ call_count["eth"] += 1
+ return eth_events if call_count["eth"] == 1 else []
+ return []
+
+ broker.read = AsyncMock(side_effect=mock_read)
+
+ strategy = MagicMock()
+ strategy.on_candle = MagicMock(return_value=None)
+
+ engine = StrategyEngine(broker=broker, strategies=[strategy])
+
+ # Process both streams
+ await engine.process_once("candles.BTCUSDT", "$")
+ await engine.process_once("candles.ETHUSDT", "$")
+
+ # Strategy should have been called with both candles
+ assert strategy.on_candle.call_count == 2