summaryrefslogtreecommitdiff
path: root/services/strategy-engine/src
diff options
context:
space:
mode:
Diffstat (limited to 'services/strategy-engine/src')
-rw-r--r--services/strategy-engine/src/strategy_engine/main.py36
1 files changed, 35 insertions, 1 deletions
diff --git a/services/strategy-engine/src/strategy_engine/main.py b/services/strategy-engine/src/strategy_engine/main.py
index 4549f70..ac28d6b 100644
--- a/services/strategy-engine/src/strategy_engine/main.py
+++ b/services/strategy-engine/src/strategy_engine/main.py
@@ -8,6 +8,7 @@ from shared.healthcheck import HealthCheckServer
from shared.logging import setup_logging
from shared.metrics import ServiceMetrics
from shared.notifier import TelegramNotifier
+from shared.sentiment import SentimentProvider, SentimentData
from strategy_engine.config import StrategyConfig
from strategy_engine.engine import StrategyEngine
@@ -21,6 +22,28 @@ STRATEGIES_DIR = Path(__file__).parent.parent.parent.parent / "strategies"
# order-executor: +2 (8082), portfolio-manager: +3 (8083)
HEALTH_PORT_OFFSET = 1
+SENTIMENT_REFRESH_INTERVAL = 300 # 5 minutes
+
+
+async def sentiment_loop(provider: SentimentProvider, strategies: list, log) -> None:
+ """Periodically fetch sentiment and update strategies that support it."""
+ while True:
+ try:
+ sentiment = await provider.get_sentiment("SOL")
+ log.info(
+ "sentiment_updated",
+ fear_greed=sentiment.fear_greed_value,
+ news=sentiment.news_sentiment,
+ netflow=sentiment.exchange_netflow,
+ should_block=sentiment.should_block,
+ )
+ for strategy in strategies:
+ if hasattr(strategy, "update_sentiment"):
+ strategy.update_sentiment(sentiment)
+ except Exception as exc:
+ log.warning("sentiment_fetch_failed", error=str(exc))
+ await asyncio.sleep(SENTIMENT_REFRESH_INTERVAL)
+
async def process_symbol(engine: StrategyEngine, stream: str, log) -> None:
"""Process candles for a single symbol stream."""
@@ -51,6 +74,11 @@ async def run() -> None:
engine = StrategyEngine(broker=broker, strategies=strategies)
+ provider = SentimentProvider(
+ cryptopanic_api_key=config.cryptopanic_api_key,
+ cryptoquant_api_key=config.cryptoquant_api_key,
+ )
+
health = HealthCheckServer(
"strategy-engine",
port=config.health_port + HEALTH_PORT_OFFSET,
@@ -62,12 +90,17 @@ async def run() -> None:
tasks = []
try:
+ # Sentiment updater
+ tasks.append(asyncio.create_task(
+ sentiment_loop(provider, strategies, log)
+ ))
+ # Symbol processors
for symbol in config.symbols:
stream = f"candles.{symbol.replace('/', '_')}"
task = asyncio.create_task(process_symbol(engine, stream, log))
tasks.append(task)
- # Wait for all symbol processors (they run forever until cancelled)
+ # Wait for all tasks (they run forever until cancelled)
await asyncio.gather(*tasks)
except Exception as exc:
log.error("fatal_error", error=str(exc))
@@ -78,6 +111,7 @@ async def run() -> None:
task.cancel()
metrics.service_up.labels(service="strategy-engine").set(0)
await notifier.close()
+ await provider.close()
await broker.close()