"""Strategy Engine Service entry point.""" import asyncio from datetime import datetime from pathlib import Path import zoneinfo from shared.alpaca import AlpacaClient from shared.broker import RedisBroker from shared.db import Database from shared.healthcheck import HealthCheckServer from shared.logging import setup_logging from shared.metrics import ServiceMetrics from shared.notifier import TelegramNotifier from shared.sentiment_models import MarketSentiment from strategy_engine.config import StrategyConfig from strategy_engine.engine import StrategyEngine from strategy_engine.plugin_loader import load_strategies from strategy_engine.stock_selector import StockSelector # The strategies directory lives alongside the installed package STRATEGIES_DIR = Path(__file__).parent.parent.parent.parent / "strategies" # Health check port: base (HEALTH_PORT, default 8080) + offset # data-collector: +0 (8080), strategy-engine: +1 (8081) # order-executor: +2 (8082), portfolio-manager: +3 (8083) HEALTH_PORT_OFFSET = 1 async def process_symbol(engine: StrategyEngine, stream: str, log) -> None: """Process candles for a single symbol stream.""" last_id = "$" log.info("engine_loop_start", stream=stream) while True: last_id = await engine.process_once(stream, last_id) async def run_stock_selector( selector: StockSelector, notifier: TelegramNotifier, db: Database, config: StrategyConfig, log, ) -> None: """Run the stock selector once per day at the configured time.""" et = zoneinfo.ZoneInfo("America/New_York") while True: now_et = datetime.now(et) target_hour, target_min = map(int, config.selector_final_time.split(":")) if now_et.hour == target_hour and now_et.minute == target_min: log.info("stock_selector_running") try: selections = await selector.select() if selections: ms_data = await db.get_latest_market_sentiment() ms = None if ms_data: ms = MarketSentiment(**ms_data) await notifier.send_stock_selection(selections, ms) log.info("stock_selector_complete", picks=[s.symbol for s in selections]) else: log.info("stock_selector_no_picks") except Exception as exc: log.error("stock_selector_error", error=str(exc)) await asyncio.sleep(120) # Sleep past this minute else: await asyncio.sleep(30) async def run() -> None: config = StrategyConfig() log = setup_logging("strategy-engine", config.log_level, config.log_format) metrics = ServiceMetrics("strategy_engine") notifier = TelegramNotifier( bot_token=config.telegram_bot_token, chat_id=config.telegram_chat_id, ) broker = RedisBroker(config.redis_url) db = Database(config.database_url) await db.connect() alpaca = AlpacaClient( api_key=config.alpaca_api_key, api_secret=config.alpaca_api_secret, paper=config.alpaca_paper, ) strategies = load_strategies(STRATEGIES_DIR) for strategy in strategies: params = config.strategy_params.get(strategy.name, {}) strategy.configure(params) log.info("loaded_strategies", count=len(strategies), names=[s.name for s in strategies]) engine = StrategyEngine(broker=broker, strategies=strategies) health = HealthCheckServer( "strategy-engine", port=config.health_port + HEALTH_PORT_OFFSET, auth_token=config.metrics_auth_token, ) health.register_check("redis", broker.ping) await health.start() metrics.service_up.labels(service="strategy-engine").set(1) tasks = [] try: for symbol in config.symbols: stream = f"candles.{symbol.replace('/', '_')}" task = asyncio.create_task(process_symbol(engine, stream, log)) tasks.append(task) if config.anthropic_api_key: selector = StockSelector( db=db, broker=broker, alpaca=alpaca, anthropic_api_key=config.anthropic_api_key, anthropic_model=config.anthropic_model, max_picks=config.selector_max_picks, ) tasks.append(asyncio.create_task( run_stock_selector(selector, notifier, db, config, log) )) log.info("stock_selector_enabled", time=config.selector_final_time) await asyncio.gather(*tasks) except Exception as exc: log.error("fatal_error", error=str(exc)) await notifier.send_error(str(exc), "strategy-engine") raise finally: for task in tasks: task.cancel() metrics.service_up.labels(service="strategy-engine").set(0) await notifier.close() await broker.close() await alpaca.close() await db.close() def main() -> None: asyncio.run(run()) if __name__ == "__main__": main()