diff options
Diffstat (limited to 'services/strategy-engine/src')
4 files changed, 502 insertions, 10 deletions
diff --git a/services/strategy-engine/src/strategy_engine/engine.py b/services/strategy-engine/src/strategy_engine/engine.py index d401aee..4b2c468 100644 --- a/services/strategy-engine/src/strategy_engine/engine.py +++ b/services/strategy-engine/src/strategy_engine/engine.py @@ -2,11 +2,11 @@ import logging -from shared.broker import RedisBroker -from shared.events import CandleEvent, SignalEvent, Event - from strategies.base import BaseStrategy +from shared.broker import RedisBroker +from shared.events import CandleEvent, Event, SignalEvent + logger = logging.getLogger(__name__) @@ -26,7 +26,7 @@ class StrategyEngine: try: event = Event.from_dict(raw) except Exception as exc: - logger.warning("Failed to parse event: %s – %s", raw, exc) + logger.warning("Failed to parse event: %s - %s", raw, exc) continue if not isinstance(event, CandleEvent): diff --git a/services/strategy-engine/src/strategy_engine/main.py b/services/strategy-engine/src/strategy_engine/main.py index 30de528..3d73058 100644 --- a/services/strategy-engine/src/strategy_engine/main.py +++ b/services/strategy-engine/src/strategy_engine/main.py @@ -1,17 +1,25 @@ """Strategy Engine Service entry point.""" import asyncio +import zoneinfo +from datetime import datetime from pathlib import Path +import aiohttp + +from shared.alpaca import AlpacaClient from shared.broker import RedisBroker +from shared.db import Database from shared.healthcheck import HealthCheckServer from shared.logging import setup_logging from shared.metrics import ServiceMetrics from shared.notifier import TelegramNotifier - +from shared.sentiment_models import MarketSentiment +from shared.shutdown import GracefulShutdown from strategy_engine.config import StrategyConfig from strategy_engine.engine import StrategyEngine from strategy_engine.plugin_loader import load_strategies +from strategy_engine.stock_selector import StockSelector # The strategies directory lives alongside the installed package STRATEGIES_DIR = Path(__file__).parent.parent.parent.parent / "strategies" @@ -30,23 +38,74 @@ async def process_symbol(engine: StrategyEngine, stream: str, log) -> None: last_id = await engine.process_once(stream, last_id) +async def run_stock_selector( + selector: StockSelector, + notifier: TelegramNotifier, + db: Database, + config: StrategyConfig, + log, +) -> None: + """Run the stock selector once per day at the configured time.""" + et = zoneinfo.ZoneInfo("America/New_York") + + while True: + now_et = datetime.now(et) + target_hour, target_min = map(int, config.selector_final_time.split(":")) + + if now_et.hour == target_hour and now_et.minute == target_min: + log.info("stock_selector_running") + try: + selections = await selector.select() + if selections: + ms_data = await db.get_latest_market_sentiment() + ms = None + if ms_data: + ms = MarketSentiment(**ms_data) + await notifier.send_stock_selection(selections, ms) + log.info("stock_selector_complete", picks=[s.symbol for s in selections]) + else: + log.info("stock_selector_no_picks") + except (aiohttp.ClientError, ConnectionError, TimeoutError) as exc: + log.warning("stock_selector_network_error", error=str(exc)) + except (ValueError, KeyError, TypeError) as exc: + log.warning("stock_selector_data_error", error=str(exc)) + except Exception as exc: + log.error("stock_selector_error", error=str(exc), exc_info=True) + await asyncio.sleep(120) # Sleep past this minute + else: + await asyncio.sleep(30) + + async def run() -> None: config = StrategyConfig() log = setup_logging("strategy-engine", config.log_level, config.log_format) metrics = ServiceMetrics("strategy_engine") notifier = TelegramNotifier( - bot_token=config.telegram_bot_token, + bot_token=config.telegram_bot_token.get_secret_value(), chat_id=config.telegram_chat_id, ) - broker = RedisBroker(config.redis_url) + broker = RedisBroker(config.redis_url.get_secret_value()) + + db = Database(config.database_url.get_secret_value()) + await db.connect() + + alpaca = AlpacaClient( + api_key=config.alpaca_api_key.get_secret_value(), + api_secret=config.alpaca_api_secret.get_secret_value(), + paper=config.alpaca_paper, + ) + strategies = load_strategies(STRATEGIES_DIR) for strategy in strategies: params = config.strategy_params.get(strategy.name, {}) strategy.configure(params) + shutdown = GracefulShutdown() + shutdown.install_handlers() + log.info("loaded_strategies", count=len(strategies), names=[s.name for s in strategies]) engine = StrategyEngine(broker=broker, strategies=strategies) @@ -67,9 +126,23 @@ async def run() -> None: task = asyncio.create_task(process_symbol(engine, stream, log)) tasks.append(task) - await asyncio.gather(*tasks) + if config.anthropic_api_key.get_secret_value(): + selector = StockSelector( + db=db, + broker=broker, + alpaca=alpaca, + anthropic_api_key=config.anthropic_api_key.get_secret_value(), + anthropic_model=config.anthropic_model, + max_picks=config.selector_max_picks, + ) + tasks.append( + asyncio.create_task(run_stock_selector(selector, notifier, db, config, log)) + ) + log.info("stock_selector_enabled", time=config.selector_final_time) + + await shutdown.wait() except Exception as exc: - log.error("fatal_error", error=str(exc)) + log.error("fatal_error", error=str(exc), exc_info=True) await notifier.send_error(str(exc), "strategy-engine") raise finally: @@ -78,6 +151,8 @@ async def run() -> None: metrics.service_up.labels(service="strategy-engine").set(0) await notifier.close() await broker.close() + await alpaca.close() + await db.close() def main() -> None: diff --git a/services/strategy-engine/src/strategy_engine/plugin_loader.py b/services/strategy-engine/src/strategy_engine/plugin_loader.py index 62e4160..57680db 100644 --- a/services/strategy-engine/src/strategy_engine/plugin_loader.py +++ b/services/strategy-engine/src/strategy_engine/plugin_loader.py @@ -5,7 +5,6 @@ import sys from pathlib import Path import yaml - from strategies.base import BaseStrategy diff --git a/services/strategy-engine/src/strategy_engine/stock_selector.py b/services/strategy-engine/src/strategy_engine/stock_selector.py new file mode 100644 index 0000000..8657b93 --- /dev/null +++ b/services/strategy-engine/src/strategy_engine/stock_selector.py @@ -0,0 +1,418 @@ +"""3-stage stock selector engine: sentiment → technical → LLM.""" + +import asyncio +import json +import logging +import re +from datetime import UTC, datetime + +import aiohttp + +from shared.alpaca import AlpacaClient +from shared.broker import RedisBroker +from shared.db import Database +from shared.models import OrderSide +from shared.sentiment_models import Candidate, MarketSentiment, SelectedStock + +logger = logging.getLogger(__name__) + +ANTHROPIC_API_URL = "https://api.anthropic.com/v1/messages" + + +def _extract_json_array(text: str) -> list[dict] | None: + """Extract a JSON array from text that may contain markdown code blocks.""" + code_block = re.search(r"```(?:json)?\s*(\[.*?\])\s*```", text, re.DOTALL) + if code_block: + raw = code_block.group(1) + else: + array_match = re.search(r"\[.*\]", text, re.DOTALL) + if array_match: + raw = array_match.group(0) + else: + raw = text.strip() + + try: + data = json.loads(raw) + if isinstance(data, list): + return [item for item in data if isinstance(item, dict)] + return None + except (json.JSONDecodeError, TypeError): + return None + + +def _parse_llm_selections(text: str) -> list[SelectedStock]: + """Parse LLM response into SelectedStock list. + + Handles both bare JSON arrays and markdown code blocks. + Returns empty list on any parse error. + """ + items = _extract_json_array(text) + if items is None: + return [] + + selections = [] + for item in items: + try: + selection = SelectedStock( + symbol=item["symbol"], + side=OrderSide(item["side"]), + conviction=float(item["conviction"]), + reason=item.get("reason", ""), + key_news=item.get("key_news", []), + ) + selections.append(selection) + except (KeyError, ValueError) as e: + logger.warning("Skipping invalid selection item: %s", e) + return selections + + +class SentimentCandidateSource: + """Generates candidates from DB sentiment scores.""" + + def __init__(self, db: Database) -> None: + self._db = db + + async def get_candidates(self) -> list[Candidate]: + rows = await self._db.get_top_symbol_scores(limit=20) + candidates = [] + for row in rows: + composite = float(row.get("composite", 0)) + if composite == 0: + continue + candidates.append( + Candidate( + symbol=row["symbol"], + source="sentiment", + score=composite, + reason=f"composite={composite:.2f}, news_count={row.get('news_count', 0)}", + ) + ) + return candidates + + +class LLMCandidateSource: + """Generates candidates by asking Claude to analyze recent news.""" + + def __init__(self, db: Database, api_key: str, model: str) -> None: + self._db = db + self._api_key = api_key + self._model = model + + async def get_candidates(self, session: aiohttp.ClientSession | None = None) -> list[Candidate]: + news_items = await self._db.get_recent_news(hours=24) + if not news_items: + return [] + + headlines = [] + for item in news_items[:50]: # cap at 50 to stay within context + symbols = item.get("symbols", []) + sym_str = ", ".join(symbols) if symbols else "N/A" + headlines.append(f"[{sym_str}] {item['headline']}") + + prompt = ( + "You are a stock analyst. Given recent news headlines, identify the 5-10 most " + "actionable US stock tickers. Return ONLY a JSON array with objects having: " + "symbol (ticker), direction ('BUY' or 'SELL'), score (0-1), reason (brief).\n\n" + "Headlines:\n" + "\n".join(headlines) + ) + + own_session = session is None + if own_session: + session = aiohttp.ClientSession() + + try: + async with session.post( + ANTHROPIC_API_URL, + headers={ + "x-api-key": self._api_key, + "anthropic-version": "2023-06-01", + "content-type": "application/json", + }, + json={ + "model": self._model, + "max_tokens": 1024, + "messages": [{"role": "user", "content": prompt}], + }, + ) as resp: + if resp.status != 200: + body = await resp.text() + logger.error("LLM candidate source error %d: %s", resp.status, body) + return [] + data = await resp.json() + + content = data.get("content", []) + text = "" + for block in content: + if isinstance(block, dict) and block.get("type") == "text": + text += block.get("text", "") + + return self._parse_candidates(text) + except Exception as e: + logger.error("LLMCandidateSource error: %s", e) + return [] + finally: + if own_session: + await session.close() + + def _parse_candidates(self, text: str) -> list[Candidate]: + items = _extract_json_array(text) + if items is None: + return [] + + candidates = [] + for item in items: + try: + direction_str = item.get("direction", "BUY") + direction = OrderSide(direction_str) + except ValueError: + direction = None + candidates.append( + Candidate( + symbol=item["symbol"], + source="llm", + direction=direction, + score=float(item.get("score", 0.5)), + reason=item.get("reason", ""), + ) + ) + return candidates + + +def _compute_rsi(closes: list[float], period: int = 14) -> float: + """Compute RSI for the last data point.""" + if len(closes) < period + 1: + return 50.0 # neutral if insufficient data + + deltas = [closes[i] - closes[i - 1] for i in range(1, len(closes))] + gains = [d if d > 0 else 0.0 for d in deltas] + losses = [-d if d < 0 else 0.0 for d in deltas] + + avg_gain = sum(gains[:period]) / period + avg_loss = sum(losses[:period]) / period + + for i in range(period, len(deltas)): + avg_gain = (avg_gain * (period - 1) + gains[i]) / period + avg_loss = (avg_loss * (period - 1) + losses[i]) / period + + if avg_loss == 0: + return 100.0 + rs = avg_gain / avg_loss + return 100.0 - (100.0 / (1.0 + rs)) + + +class StockSelector: + """Orchestrates the 3-stage stock selection pipeline.""" + + def __init__( + self, + db: Database, + broker: RedisBroker, + alpaca: AlpacaClient, + anthropic_api_key: str, + anthropic_model: str = "claude-sonnet-4-20250514", + max_picks: int = 3, + ) -> None: + self._db = db + self._broker = broker + self._alpaca = alpaca + self._api_key = anthropic_api_key + self._model = anthropic_model + self._max_picks = max_picks + self._http_session: aiohttp.ClientSession | None = None + self._session_lock = asyncio.Lock() + + async def _ensure_session(self) -> aiohttp.ClientSession: + async with self._session_lock: + if self._http_session is None or self._http_session.closed: + self._http_session = aiohttp.ClientSession() + return self._http_session + + async def close(self) -> None: + if self._http_session and not self._http_session.closed: + await self._http_session.close() + + async def select(self) -> list[SelectedStock]: + """Run the full 3-stage pipeline and return selected stocks.""" + # Market gate: check sentiment + sentiment_data = await self._db.get_latest_market_sentiment() + if sentiment_data is None: + logger.warning("No market sentiment data; skipping selection") + return [] + + market_sentiment = MarketSentiment(**sentiment_data) + if market_sentiment.market_regime == "risk_off": + logger.info("Market is risk_off; skipping stock selection") + return [] + + # Stage 1: gather candidates from both sources + sentiment_source = SentimentCandidateSource(self._db) + llm_source = LLMCandidateSource(self._db, self._api_key, self._model) + + session = await self._ensure_session() + sentiment_candidates = await sentiment_source.get_candidates() + llm_candidates = await llm_source.get_candidates(session=session) + + candidates = self._merge_candidates(sentiment_candidates, llm_candidates) + if not candidates: + logger.info("No candidates found") + return [] + + # Stage 2: technical filter + filtered = await self._technical_filter(candidates) + if not filtered: + logger.info("All candidates filtered out by technical criteria") + return [] + + # Stage 3: LLM final selection + selections = await self._llm_final_select(filtered, market_sentiment) + + # Persist and publish + today = datetime.now(UTC).date() + sentiment_snapshot = { + "fear_greed": market_sentiment.fear_greed, + "market_regime": market_sentiment.market_regime, + "vix": market_sentiment.vix, + } + for stock in selections: + try: + await self._db.insert_stock_selection( + trade_date=today, + symbol=stock.symbol, + side=stock.side.value, + conviction=stock.conviction, + reason=stock.reason, + key_news=stock.key_news, + sentiment_snapshot=sentiment_snapshot, + ) + except Exception as e: + logger.error("Failed to persist selection for %s: %s", stock.symbol, e) + + try: + await self._broker.publish( + "selected_stocks", + { + "symbol": stock.symbol, + "side": stock.side.value, + "conviction": stock.conviction, + "reason": stock.reason, + "key_news": stock.key_news, + "trade_date": str(today), + }, + ) + except Exception as e: + logger.error("Failed to publish selection for %s: %s", stock.symbol, e) + + return selections + + def _merge_candidates( + self, sentiment: list[Candidate], llm: list[Candidate] + ) -> list[Candidate]: + """Deduplicate candidates by symbol, keeping the higher score.""" + by_symbol: dict[str, Candidate] = {} + for c in sentiment + llm: + existing = by_symbol.get(c.symbol) + if existing is None or c.score > existing.score: + by_symbol[c.symbol] = c + return sorted(by_symbol.values(), key=lambda c: c.score, reverse=True) + + async def _technical_filter(self, candidates: list[Candidate]) -> list[Candidate]: + """Filter candidates using RSI, EMA20, and volume criteria.""" + passed = [] + for candidate in candidates: + try: + bars = await self._alpaca.get_bars(candidate.symbol, timeframe="1Day", limit=60) + if len(bars) < 21: + logger.debug("Insufficient bars for %s", candidate.symbol) + continue + + closes = [float(b["c"]) for b in bars] + volumes = [float(b["v"]) for b in bars] + + rsi = _compute_rsi(closes) + if not (30 <= rsi <= 70): + logger.debug("%s RSI=%.1f outside 30-70", candidate.symbol, rsi) + continue + + ema20 = sum(closes[-20:]) / 20 # simple approximation + current_price = closes[-1] + if current_price <= ema20: + logger.debug( + "%s price %.2f <= EMA20 %.2f", candidate.symbol, current_price, ema20 + ) + continue + + avg_volume = sum(volumes[:-1]) / max(len(volumes) - 1, 1) + current_volume = volumes[-1] + if current_volume <= 0.5 * avg_volume: + logger.debug( + "%s volume %.0f <= 50%% avg %.0f", + candidate.symbol, + current_volume, + avg_volume, + ) + continue + + passed.append(candidate) + except Exception as e: + logger.warning("Technical filter error for %s: %s", candidate.symbol, e) + + return passed + + async def _llm_final_select( + self, candidates: list[Candidate], market_sentiment: MarketSentiment + ) -> list[SelectedStock]: + """Ask Claude to pick 2-3 stocks with rationale.""" + candidate_lines = [ + f"- {c.symbol} (source={c.source}, score={c.score:.2f}, reason={c.reason})" + for c in candidates + ] + market_context = ( + f"Fear/Greed: {market_sentiment.fear_greed} ({market_sentiment.fear_greed_label}), " + f"VIX: {market_sentiment.vix}, " + f"Fed stance: {market_sentiment.fed_stance}, " + f"Regime: {market_sentiment.market_regime}" + ) + + prompt = ( + f"You are a portfolio manager. Select 2-3 stocks for today's session.\n\n" + f"Market context: {market_context}\n\n" + f"Candidates (already passed technical filters):\n" + + "\n".join(candidate_lines) + + "\n\n" + "Return ONLY a JSON array with objects having:\n" + " symbol, side ('BUY' or 'SELL'), conviction (0-1), reason (1-2 sentences), " + "key_news (list of 1-3 relevant headlines or facts)\n" + f"Select at most {self._max_picks} stocks." + ) + + try: + session = await self._ensure_session() + async with session.post( + ANTHROPIC_API_URL, + headers={ + "x-api-key": self._api_key, + "anthropic-version": "2023-06-01", + "content-type": "application/json", + }, + json={ + "model": self._model, + "max_tokens": 1024, + "messages": [{"role": "user", "content": prompt}], + }, + ) as resp: + if resp.status != 200: + body = await resp.text() + logger.error("LLM final select error %d: %s", resp.status, body) + return [] + data = await resp.json() + + content = data.get("content", []) + text = "" + for block in content: + if isinstance(block, dict) and block.get("type") == "text": + text += block.get("text", "") + + return _parse_llm_selections(text)[: self._max_picks] + except Exception as e: + logger.error("LLM final select error: %s", e) + return [] |
