"""Strategy Engine Service entry point.""" import asyncio from pathlib import Path from shared.broker import RedisBroker from shared.healthcheck import HealthCheckServer from shared.logging import setup_logging from shared.metrics import ServiceMetrics from shared.notifier import TelegramNotifier from strategy_engine.config import StrategyConfig from strategy_engine.engine import StrategyEngine from strategy_engine.plugin_loader import load_strategies # The strategies directory lives alongside the installed package STRATEGIES_DIR = Path(__file__).parent.parent.parent.parent / "strategies" # Health check port: base (HEALTH_PORT, default 8080) + offset # data-collector: +0 (8080), strategy-engine: +1 (8081) # order-executor: +2 (8082), portfolio-manager: +3 (8083) HEALTH_PORT_OFFSET = 1 async def process_symbol(engine: StrategyEngine, stream: str, log) -> None: """Process candles for a single symbol stream.""" last_id = "$" log.info("engine_loop_start", stream=stream) while True: last_id = await engine.process_once(stream, last_id) async def run() -> None: config = StrategyConfig() log = setup_logging("strategy-engine", config.log_level, config.log_format) metrics = ServiceMetrics("strategy_engine") notifier = TelegramNotifier( bot_token=config.telegram_bot_token, chat_id=config.telegram_chat_id, ) broker = RedisBroker(config.redis_url) strategies = load_strategies(STRATEGIES_DIR) for strategy in strategies: params = config.strategy_params.get(strategy.name, {}) strategy.configure(params) log.info("loaded_strategies", count=len(strategies), names=[s.name for s in strategies]) engine = StrategyEngine(broker=broker, strategies=strategies) health = HealthCheckServer( "strategy-engine", port=config.health_port + HEALTH_PORT_OFFSET, auth_token=config.metrics_auth_token ) health.register_check("redis", broker.ping) await health.start() metrics.service_up.labels(service="strategy-engine").set(1) tasks = [] try: for symbol in config.symbols: stream = f"candles.{symbol.replace('/', '_')}" task = asyncio.create_task(process_symbol(engine, stream, log)) tasks.append(task) # Wait for all symbol processors (they run forever until cancelled) await asyncio.gather(*tasks) except Exception as exc: log.error("fatal_error", error=str(exc)) await notifier.send_error(str(exc), "strategy-engine") raise finally: for task in tasks: task.cancel() metrics.service_up.labels(service="strategy-engine").set(0) await notifier.close() await broker.close() def main() -> None: asyncio.run(run()) if __name__ == "__main__": main()