summaryrefslogtreecommitdiff
path: root/services/strategy-engine/src/strategy_engine/main.py
blob: ac28d6b63017e6c6511abb143e8ffd624af9fded (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
"""Strategy Engine Service entry point."""

import asyncio
from pathlib import Path

from shared.broker import RedisBroker
from shared.healthcheck import HealthCheckServer
from shared.logging import setup_logging
from shared.metrics import ServiceMetrics
from shared.notifier import TelegramNotifier
from shared.sentiment import SentimentProvider, SentimentData

from strategy_engine.config import StrategyConfig
from strategy_engine.engine import StrategyEngine
from strategy_engine.plugin_loader import load_strategies

# Strategy plugins are loaded from the "strategies" directory located in the
# fourth parent directory of this file (main.py -> package dir -> three more
# levels up).
# NOTE(review): confirm this depth still matches the deployed/installed layout.
STRATEGIES_DIR = Path(__file__).parent.parent.parent.parent / "strategies"

# Health check port: base (HEALTH_PORT, default 8080) + offset
# data-collector: +0 (8080), strategy-engine: +1 (8081)
# order-executor: +2 (8082), portfolio-manager: +3 (8083)
HEALTH_PORT_OFFSET = 1

SENTIMENT_REFRESH_INTERVAL = 300  # seconds slept between sentiment refreshes (5 minutes)


async def sentiment_loop(provider: SentimentProvider, strategies: list, log) -> None:
    """Refresh sentiment forever and push it to sentiment-aware strategies.

    Every cycle asks ``provider`` for the "SOL" sentiment, logs its headline
    fields, and passes the result to each strategy that exposes an
    ``update_sentiment`` attribute. Any ``Exception`` raised during a cycle
    is logged as a warning; the loop then sleeps for
    ``SENTIMENT_REFRESH_INTERVAL`` seconds and tries again.
    """
    while True:
        try:
            snapshot = await provider.get_sentiment("SOL")
            log_fields = {
                "fear_greed": snapshot.fear_greed_value,
                "news": snapshot.news_sentiment,
                "netflow": snapshot.exchange_netflow,
                "should_block": snapshot.should_block,
            }
            log.info("sentiment_updated", **log_fields)
            # Strategies opt in simply by having an update_sentiment attribute.
            for target in strategies:
                if not hasattr(target, "update_sentiment"):
                    continue
                target.update_sentiment(snapshot)
        except Exception as err:
            # Best effort: a failed refresh must not stop future refreshes.
            log.warning("sentiment_fetch_failed", error=str(err))
        await asyncio.sleep(SENTIMENT_REFRESH_INTERVAL)


async def process_symbol(engine: StrategyEngine, stream: str, log) -> None:
    """Drive the engine over a single symbol's candle stream indefinitely.

    The first ``engine.process_once`` call is given the position ``"$"``;
    every later call is given whatever the previous call returned.
    """
    log.info("engine_loop_start", stream=stream)
    # "$" presumably means "only entries newer than now" for the broker's
    # stream reads — confirm against StrategyEngine.process_once.
    cursor = "$"
    while True:
        cursor = await engine.process_once(stream, cursor)


async def _close_quietly(name: str, close, log) -> None:
    """Await ``close()`` and log (instead of raising) if it fails.

    Used during shutdown so that one resource failing to close neither
    prevents the remaining resources from being closed nor replaces the
    exception that triggered the shutdown.
    """
    try:
        await close()
    except Exception as exc:
        log.warning("shutdown_close_failed", resource=name, error=str(exc))


async def run() -> None:
    """Wire up the strategy engine service and run it until it fails or is cancelled.

    Loads configuration, logging, metrics, the notifier, the Redis broker and
    the strategy plugins, starts the health check server, then runs one
    sentiment-refresh task plus one candle-processing task per configured
    symbol.

    Raises:
        Exception: any error raised by a worker task is logged, reported
            through the notifier (best effort) and re-raised.
    """
    config = StrategyConfig()
    log = setup_logging("strategy-engine", config.log_level, config.log_format)
    metrics = ServiceMetrics("strategy_engine")

    notifier = TelegramNotifier(
        bot_token=config.telegram_bot_token,
        chat_id=config.telegram_chat_id,
    )

    broker = RedisBroker(config.redis_url)
    strategies = load_strategies(STRATEGIES_DIR)

    # Strategies without an entry in strategy_params are configured with {}.
    for strategy in strategies:
        params = config.strategy_params.get(strategy.name, {})
        strategy.configure(params)

    log.info("loaded_strategies", count=len(strategies), names=[s.name for s in strategies])

    engine = StrategyEngine(broker=broker, strategies=strategies)

    provider = SentimentProvider(
        cryptopanic_api_key=config.cryptopanic_api_key,
        cryptoquant_api_key=config.cryptoquant_api_key,
    )

    health = HealthCheckServer(
        "strategy-engine",
        port=config.health_port + HEALTH_PORT_OFFSET,
        auth_token=config.metrics_auth_token,
    )
    health.register_check("redis", broker.ping)
    # NOTE(review): the health server is started here but nothing in this
    # function stops it — confirm whether HealthCheckServer needs an explicit
    # shutdown call.
    await health.start()
    metrics.service_up.labels(service="strategy-engine").set(1)

    tasks = []
    try:
        # Sentiment updater
        tasks.append(asyncio.create_task(
            sentiment_loop(provider, strategies, log)
        ))
        # Symbol processors: one task per symbol, reading "candles.<SYMBOL>"
        # with "/" replaced by "_" in the symbol.
        for symbol in config.symbols:
            stream = f"candles.{symbol.replace('/', '_')}"
            task = asyncio.create_task(process_symbol(engine, stream, log))
            tasks.append(task)

        # Wait for all tasks (they run forever until cancelled)
        await asyncio.gather(*tasks)
    except Exception as exc:
        log.error("fatal_error", error=str(exc))
        # Reporting is best effort: a notifier failure must not mask the
        # original fatal error, which is re-raised below.
        try:
            await notifier.send_error(str(exc), "strategy-engine")
        except Exception as notify_exc:
            log.warning("fatal_error_notify_failed", error=str(notify_exc))
        raise
    finally:
        for task in tasks:
            task.cancel()
        metrics.service_up.labels(service="strategy-engine").set(0)
        # cancel() only requests cancellation. Wait for the tasks to actually
        # finish so they are not still using the broker/provider while those
        # are being closed. return_exceptions=True keeps their
        # CancelledError (or any other error) from propagating from here.
        await asyncio.gather(*tasks, return_exceptions=True)
        # Close each resource independently so one failure cannot skip the rest.
        await _close_quietly("notifier", notifier.close, log)
        await _close_quietly("provider", provider.close, log)
        await _close_quietly("broker", broker.close, log)


def main() -> None:
    """Synchronous entry point: run the async ``run`` coroutine to completion."""
    service = run()
    asyncio.run(service)


# Start the service only when this module is executed as a script,
# not when it is imported.
if __name__ == "__main__":
    main()