summaryrefslogtreecommitdiff
path: root/services/strategy-engine/src/strategy_engine
diff options
context:
space:
mode:
Diffstat (limited to 'services/strategy-engine/src/strategy_engine')
-rw-r--r--services/strategy-engine/src/strategy_engine/main.py39
1 files changed, 21 insertions, 18 deletions
diff --git a/services/strategy-engine/src/strategy_engine/main.py b/services/strategy-engine/src/strategy_engine/main.py
index 9f3b3c9..efd79e9 100644
--- a/services/strategy-engine/src/strategy_engine/main.py
+++ b/services/strategy-engine/src/strategy_engine/main.py
@@ -17,29 +17,32 @@ from strategy_engine.plugin_loader import load_strategies
STRATEGIES_DIR = Path(__file__).parent.parent.parent.parent / "strategies"
+async def process_symbol(engine: StrategyEngine, stream: str, log) -> None:
+ """Process candles for a single symbol stream."""
+ last_id = "$"
+ log.info("engine_loop_start", stream=stream)
+ while True:
+ last_id = await engine.process_once(stream, last_id)
+
+
async def run() -> None:
config = StrategyConfig()
log = setup_logging("strategy-engine", config.log_level, config.log_format)
metrics = ServiceMetrics("strategy_engine")
+
notifier = TelegramNotifier(
- bot_token=config.telegram_bot_token, chat_id=config.telegram_chat_id
+ bot_token=config.telegram_bot_token,
+ chat_id=config.telegram_chat_id,
)
broker = RedisBroker(config.redis_url)
+ strategies = load_strategies(STRATEGIES_DIR)
- strategies_dir = STRATEGIES_DIR
- strategies = load_strategies(strategies_dir)
-
- # Configure each strategy with params from config
for strategy in strategies:
params = config.strategy_params.get(strategy.name, {})
strategy.configure(params)
- log.info(
- "strategies_loaded",
- count=len(strategies),
- names=[s.name for s in strategies],
- )
+ log.info("loaded_strategies", count=len(strategies), names=[s.name for s in strategies])
engine = StrategyEngine(broker=broker, strategies=strategies)
@@ -50,22 +53,22 @@ async def run() -> None:
await health.start()
metrics.service_up.labels(service="strategy-engine").set(1)
+ tasks = []
try:
for symbol in config.symbols:
stream = f"candles.{symbol.replace('/', '_')}"
- last_id = "$"
- log.info("engine_loop_started", stream=stream)
-
- while True:
- last_id = await engine.process_once(stream, last_id)
- metrics.events_processed.labels(
- service="strategy-engine", event_type="candle"
- ).inc()
+ task = asyncio.create_task(process_symbol(engine, stream, log))
+ tasks.append(task)
+
+ # Wait for all symbol processors (they run forever until cancelled)
+ await asyncio.gather(*tasks)
except Exception as exc:
log.error("fatal_error", error=str(exc))
await notifier.send_error(str(exc), "strategy-engine")
raise
finally:
+ for task in tasks:
+ task.cancel()
metrics.service_up.labels(service="strategy-engine").set(0)
await notifier.close()
await broker.close()