diff options
| author | TheSiahxyz <164138827+TheSiahxyz@users.noreply.github.com> | 2026-04-02 09:56:42 +0900 |
|---|---|---|
| committer | TheSiahxyz <164138827+TheSiahxyz@users.noreply.github.com> | 2026-04-02 09:56:42 +0900 |
| commit | 87bf67bac771181aeb4f4c5bb11fae8f343c12bb (patch) | |
| tree | bd8feb016f7f1729e7448ff54927fe40f15fe9f3 /services/strategy-engine/src/strategy_engine/main.py | |
| parent | c32ea21d0e29a0894fe94ecc4236145541bce3ab (diff) | |
feat: wire sentiment into engine + add EMA/bullish candle entry filters
Diffstat (limited to 'services/strategy-engine/src/strategy_engine/main.py')
| -rw-r--r-- | services/strategy-engine/src/strategy_engine/main.py | 36 |
1 files changed, 35 insertions, 1 deletions
diff --git a/services/strategy-engine/src/strategy_engine/main.py b/services/strategy-engine/src/strategy_engine/main.py index 4549f70..ac28d6b 100644 --- a/services/strategy-engine/src/strategy_engine/main.py +++ b/services/strategy-engine/src/strategy_engine/main.py @@ -8,6 +8,7 @@ from shared.healthcheck import HealthCheckServer from shared.logging import setup_logging from shared.metrics import ServiceMetrics from shared.notifier import TelegramNotifier +from shared.sentiment import SentimentProvider, SentimentData from strategy_engine.config import StrategyConfig from strategy_engine.engine import StrategyEngine @@ -21,6 +22,28 @@ STRATEGIES_DIR = Path(__file__).parent.parent.parent.parent / "strategies" # order-executor: +2 (8082), portfolio-manager: +3 (8083) HEALTH_PORT_OFFSET = 1 +SENTIMENT_REFRESH_INTERVAL = 300 # 5 minutes + + +async def sentiment_loop(provider: SentimentProvider, strategies: list, log) -> None: + """Periodically fetch sentiment and update strategies that support it.""" + while True: + try: + sentiment = await provider.get_sentiment("SOL") + log.info( + "sentiment_updated", + fear_greed=sentiment.fear_greed_value, + news=sentiment.news_sentiment, + netflow=sentiment.exchange_netflow, + should_block=sentiment.should_block, + ) + for strategy in strategies: + if hasattr(strategy, "update_sentiment"): + strategy.update_sentiment(sentiment) + except Exception as exc: + log.warning("sentiment_fetch_failed", error=str(exc)) + await asyncio.sleep(SENTIMENT_REFRESH_INTERVAL) + async def process_symbol(engine: StrategyEngine, stream: str, log) -> None: """Process candles for a single symbol stream.""" @@ -51,6 +74,11 @@ async def run() -> None: engine = StrategyEngine(broker=broker, strategies=strategies) + provider = SentimentProvider( + cryptopanic_api_key=config.cryptopanic_api_key, + cryptoquant_api_key=config.cryptoquant_api_key, + ) + health = HealthCheckServer( "strategy-engine", port=config.health_port + HEALTH_PORT_OFFSET, @@ -62,12 +90,17 @@ async def run() -> None: tasks = [] try: + # Sentiment updater + tasks.append(asyncio.create_task( + sentiment_loop(provider, strategies, log) + )) + # Symbol processors for symbol in config.symbols: stream = f"candles.{symbol.replace('/', '_')}" task = asyncio.create_task(process_symbol(engine, stream, log)) tasks.append(task) - # Wait for all symbol processors (they run forever until cancelled) + # Wait for all tasks (they run forever until cancelled) await asyncio.gather(*tasks) except Exception as exc: log.error("fatal_error", error=str(exc)) @@ -78,6 +111,7 @@ async def run() -> None: task.cancel() metrics.service_up.labels(service="strategy-engine").set(0) await notifier.close() + await provider.close() await broker.close() |
