"""Strategy Engine Service entry point.""" import asyncio from pathlib import Path from shared.broker import RedisBroker from shared.healthcheck import HealthCheckServer from shared.logging import setup_logging from shared.metrics import ServiceMetrics from shared.notifier import TelegramNotifier from shared.sentiment import SentimentProvider, SentimentData from strategy_engine.config import StrategyConfig from strategy_engine.engine import StrategyEngine from strategy_engine.plugin_loader import load_strategies # The strategies directory lives alongside the installed package STRATEGIES_DIR = Path(__file__).parent.parent.parent.parent / "strategies" # Health check port: base (HEALTH_PORT, default 8080) + offset # data-collector: +0 (8080), strategy-engine: +1 (8081) # order-executor: +2 (8082), portfolio-manager: +3 (8083) HEALTH_PORT_OFFSET = 1 SENTIMENT_REFRESH_INTERVAL = 300 # 5 minutes async def sentiment_loop(provider: SentimentProvider, strategies: list, log) -> None: """Periodically fetch sentiment and update strategies that support it.""" while True: try: sentiment = await provider.get_sentiment("SOL") log.info( "sentiment_updated", fear_greed=sentiment.fear_greed_value, news=sentiment.news_sentiment, netflow=sentiment.exchange_netflow, should_block=sentiment.should_block, ) for strategy in strategies: if hasattr(strategy, "update_sentiment"): strategy.update_sentiment(sentiment) except Exception as exc: log.warning("sentiment_fetch_failed", error=str(exc)) await asyncio.sleep(SENTIMENT_REFRESH_INTERVAL) async def process_symbol(engine: StrategyEngine, stream: str, log) -> None: """Process candles for a single symbol stream.""" last_id = "$" log.info("engine_loop_start", stream=stream) while True: last_id = await engine.process_once(stream, last_id) async def run() -> None: config = StrategyConfig() log = setup_logging("strategy-engine", config.log_level, config.log_format) metrics = ServiceMetrics("strategy_engine") notifier = TelegramNotifier( bot_token=config.telegram_bot_token, chat_id=config.telegram_chat_id, ) broker = RedisBroker(config.redis_url) strategies = load_strategies(STRATEGIES_DIR) for strategy in strategies: params = config.strategy_params.get(strategy.name, {}) strategy.configure(params) log.info("loaded_strategies", count=len(strategies), names=[s.name for s in strategies]) engine = StrategyEngine(broker=broker, strategies=strategies) provider = SentimentProvider() health = HealthCheckServer( "strategy-engine", port=config.health_port + HEALTH_PORT_OFFSET, auth_token=config.metrics_auth_token, ) health.register_check("redis", broker.ping) await health.start() metrics.service_up.labels(service="strategy-engine").set(1) tasks = [] try: # Sentiment updater tasks.append(asyncio.create_task( sentiment_loop(provider, strategies, log) )) # Symbol processors for symbol in config.symbols: stream = f"candles.{symbol.replace('/', '_')}" task = asyncio.create_task(process_symbol(engine, stream, log)) tasks.append(task) # Wait for all tasks (they run forever until cancelled) await asyncio.gather(*tasks) except Exception as exc: log.error("fatal_error", error=str(exc)) await notifier.send_error(str(exc), "strategy-engine") raise finally: for task in tasks: task.cancel() metrics.service_up.labels(service="strategy-engine").set(0) await notifier.close() await provider.close() await broker.close() def main() -> None: asyncio.run(run()) if __name__ == "__main__": main()