1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
|
"""Strategy Engine Service entry point."""
import asyncio
from datetime import datetime
from pathlib import Path
import zoneinfo
from shared.alpaca import AlpacaClient
from shared.broker import RedisBroker
from shared.db import Database
from shared.healthcheck import HealthCheckServer
from shared.logging import setup_logging
from shared.metrics import ServiceMetrics
from shared.notifier import TelegramNotifier
from shared.sentiment_models import MarketSentiment
from shared.shutdown import GracefulShutdown
from strategy_engine.config import StrategyConfig
from strategy_engine.engine import StrategyEngine
from strategy_engine.plugin_loader import load_strategies
from strategy_engine.stock_selector import StockSelector
# Directory scanned for strategy plugins (passed to load_strategies() in run()).
# It is the "strategies" folder found four `.parent` steps up from this file's
# path, i.e. three directories above the one that contains this module.
STRATEGIES_DIR = Path(__file__).parent.parent.parent.parent / "strategies"
# Health check port: base (HEALTH_PORT, default 8080) + offset
# data-collector: +0 (8080), strategy-engine: +1 (8081)
# order-executor: +2 (8082), portfolio-manager: +3 (8083)
# run() adds this offset to config.health_port when starting the health server.
HEALTH_PORT_OFFSET = 1
async def process_symbol(engine: StrategyEngine, stream: str, log) -> None:
    """Drive the engine over one symbol's candle stream, forever.

    Each call to ``engine.process_once`` receives the cursor returned by the
    previous call, so the stream position is carried from one iteration to the
    next. The loop has no exit condition: the coroutine only ends when it is
    cancelled or when ``process_once`` raises.

    Args:
        engine: Strategy engine whose ``process_once(stream, last_id)`` is awaited.
        stream: Name of the stream to consume.
        log: Structured logger (keyword-argument style ``info``).
    """
    log.info("engine_loop_start", stream=stream)

    # Initial cursor. NOTE(review): "$" is presumably the Redis Streams
    # "only entries newer than now" ID -- confirm against engine.process_once.
    cursor = "$"
    while True:
        cursor = await engine.process_once(stream, cursor)
async def run_stock_selector(
    selector: StockSelector,
    notifier: TelegramNotifier,
    db: Database,
    config: StrategyConfig,
    log,
) -> None:
    """Trigger the stock selector when the New York clock hits the configured minute.

    The loop polls the current ``America/New_York`` time. When its hour and
    minute equal ``config.selector_final_time`` (a ``"HH:MM"`` string), the
    selector is run; non-empty results are sent through the notifier together
    with the latest market sentiment from the database (or ``None`` when the
    database returns nothing).

    Any ``Exception`` raised during selection or notification is logged as
    ``stock_selector_error`` and does not stop the loop. The coroutine never
    returns on its own.
    """
    new_york = zoneinfo.ZoneInfo("America/New_York")

    while True:
        now_et = datetime.now(new_york)
        # Re-read the configured time on every pass; the lazy generator keeps
        # the same two-value unpacking semantics as a map() over the parts.
        target_hour, target_min = (
            int(part) for part in config.selector_final_time.split(":")
        )

        if (now_et.hour, now_et.minute) != (target_hour, target_min):
            # Not the target minute yet: poll again shortly.
            await asyncio.sleep(30)
            continue

        log.info("stock_selector_running")
        try:
            selections = await selector.select()
            if not selections:
                log.info("stock_selector_no_picks")
            else:
                ms_data = await db.get_latest_market_sentiment()
                ms = MarketSentiment(**ms_data) if ms_data else None
                await notifier.send_stock_selection(selections, ms)
                log.info("stock_selector_complete", picks=[s.symbol for s in selections])
        except Exception as exc:
            log.error("stock_selector_error", error=str(exc))

        # Sleep well past the matched minute so it cannot trigger twice.
        await asyncio.sleep(120)
async def run() -> None:
    """Wire up the strategy-engine service and run it until shutdown.

    Builds the service clients from ``StrategyConfig``, loads the strategy
    plugins found in ``STRATEGIES_DIR`` and configures each one with its entry
    in ``config.strategy_params``, starts the health-check server on
    ``config.health_port + HEALTH_PORT_OFFSET``, then spawns one
    ``process_symbol`` task per entry in ``config.symbols`` (plus the
    stock-selector task when an Anthropic API key is configured) and blocks on
    ``GracefulShutdown.wait()``.

    On the way out every background task is cancelled *and awaited* before the
    notifier, broker, Alpaca client and database are closed, so no task is
    still using a client while that client is being torn down.

    Raises:
        Exception: any error raised while spawning tasks or waiting for
            shutdown is logged as ``fatal_error``, reported through the
            notifier, and re-raised.
    """
    config = StrategyConfig()
    log = setup_logging("strategy-engine", config.log_level, config.log_format)
    metrics = ServiceMetrics("strategy_engine")

    notifier = TelegramNotifier(
        bot_token=config.telegram_bot_token.get_secret_value(),
        chat_id=config.telegram_chat_id,
    )
    broker = RedisBroker(config.redis_url.get_secret_value())
    db = Database(config.database_url.get_secret_value())
    await db.connect()
    alpaca = AlpacaClient(
        api_key=config.alpaca_api_key.get_secret_value(),
        api_secret=config.alpaca_api_secret.get_secret_value(),
        paper=config.alpaca_paper,
    )

    strategies = load_strategies(STRATEGIES_DIR)
    for strategy in strategies:
        # Strategies without an entry in strategy_params get an empty dict.
        params = config.strategy_params.get(strategy.name, {})
        strategy.configure(params)

    shutdown = GracefulShutdown()
    shutdown.install_handlers()

    log.info("loaded_strategies", count=len(strategies), names=[s.name for s in strategies])

    engine = StrategyEngine(broker=broker, strategies=strategies)

    # NOTE(review): the health server is started here but never explicitly
    # stopped -- confirm whether HealthCheckServer needs a shutdown call.
    health = HealthCheckServer(
        "strategy-engine",
        port=config.health_port + HEALTH_PORT_OFFSET,
        auth_token=config.metrics_auth_token,
    )
    health.register_check("redis", broker.ping)
    await health.start()
    metrics.service_up.labels(service="strategy-engine").set(1)

    tasks: list[asyncio.Task] = []

    def _report_task_exit(task: asyncio.Task) -> None:
        # Surface a background task that died on its own. Without this the
        # exception stays unobserved inside the Task object while the service
        # keeps running and looking healthy.
        if task.cancelled():
            return
        exc = task.exception()
        if exc is not None:
            log.error("background_task_failed", task=task.get_name(), error=str(exc))

    def _spawn(coro, name: str) -> None:
        # Start a named background task and track it for shutdown.
        task = asyncio.create_task(coro, name=name)
        task.add_done_callback(_report_task_exit)
        tasks.append(task)

    try:
        for symbol in config.symbols:
            # Stream names use "_" where the symbol has "/" (e.g. pairs).
            stream = f"candles.{symbol.replace('/', '_')}"
            _spawn(process_symbol(engine, stream, log), f"process_symbol:{stream}")

        # The stock selector is optional: it only runs when a key is set.
        anthropic_api_key = config.anthropic_api_key.get_secret_value()
        if anthropic_api_key:
            selector = StockSelector(
                db=db,
                broker=broker,
                alpaca=alpaca,
                anthropic_api_key=anthropic_api_key,
                anthropic_model=config.anthropic_model,
                max_picks=config.selector_max_picks,
            )
            _spawn(
                run_stock_selector(selector, notifier, db, config, log),
                "stock_selector",
            )
            log.info("stock_selector_enabled", time=config.selector_final_time)

        await shutdown.wait()
    except Exception as exc:
        log.error("fatal_error", error=str(exc))
        await notifier.send_error(str(exc), "strategy-engine")
        raise
    finally:
        for task in tasks:
            task.cancel()
        if tasks:
            # cancel() only *requests* cancellation; wait for the tasks to
            # actually finish before closing the clients they use.
            # return_exceptions=True keeps a failed/cancelled task from
            # aborting the rest of the cleanup.
            await asyncio.gather(*tasks, return_exceptions=True)
        metrics.service_up.labels(service="strategy-engine").set(0)
        await notifier.close()
        await broker.close()
        await alpaca.close()
        await db.close()
def main() -> None:
    """Synchronous entry point: execute the async ``run()`` coroutine to completion."""
    service = run()
    asyncio.run(service)


# Start the service when this module is executed as a script.
if __name__ == "__main__":
    main()
|