summaryrefslogtreecommitdiff
path: root/services/strategy-engine/src
diff options
context:
space:
mode:
Diffstat (limited to 'services/strategy-engine/src')
-rw-r--r--services/strategy-engine/src/strategy_engine/config.py2
-rw-r--r--services/strategy-engine/src/strategy_engine/engine.py8
-rw-r--r--services/strategy-engine/src/strategy_engine/main.py86
-rw-r--r--services/strategy-engine/src/strategy_engine/plugin_loader.py1
-rw-r--r--services/strategy-engine/src/strategy_engine/stock_selector.py418
5 files changed, 503 insertions, 12 deletions
diff --git a/services/strategy-engine/src/strategy_engine/config.py b/services/strategy-engine/src/strategy_engine/config.py
index e3a49c2..9fd9c49 100644
--- a/services/strategy-engine/src/strategy_engine/config.py
+++ b/services/strategy-engine/src/strategy_engine/config.py
@@ -4,6 +4,6 @@ from shared.config import Settings
class StrategyConfig(Settings):
- symbols: list[str] = ["BTC/USDT"]
+ symbols: list[str] = ["AAPL", "MSFT", "GOOGL", "AMZN", "TSLA"]
timeframes: list[str] = ["1m"]
strategy_params: dict = {}
diff --git a/services/strategy-engine/src/strategy_engine/engine.py b/services/strategy-engine/src/strategy_engine/engine.py
index d401aee..4b2c468 100644
--- a/services/strategy-engine/src/strategy_engine/engine.py
+++ b/services/strategy-engine/src/strategy_engine/engine.py
@@ -2,11 +2,11 @@
import logging
-from shared.broker import RedisBroker
-from shared.events import CandleEvent, SignalEvent, Event
-
from strategies.base import BaseStrategy
+from shared.broker import RedisBroker
+from shared.events import CandleEvent, Event, SignalEvent
+
logger = logging.getLogger(__name__)
@@ -26,7 +26,7 @@ class StrategyEngine:
try:
event = Event.from_dict(raw)
except Exception as exc:
- logger.warning("Failed to parse event: %s – %s", raw, exc)
+ logger.warning("Failed to parse event: %s - %s", raw, exc)
continue
if not isinstance(event, CandleEvent):
diff --git a/services/strategy-engine/src/strategy_engine/main.py b/services/strategy-engine/src/strategy_engine/main.py
index 4549f70..3d73058 100644
--- a/services/strategy-engine/src/strategy_engine/main.py
+++ b/services/strategy-engine/src/strategy_engine/main.py
@@ -1,17 +1,25 @@
"""Strategy Engine Service entry point."""
import asyncio
+import zoneinfo
+from datetime import datetime
from pathlib import Path
+import aiohttp
+
+from shared.alpaca import AlpacaClient
from shared.broker import RedisBroker
+from shared.db import Database
from shared.healthcheck import HealthCheckServer
from shared.logging import setup_logging
from shared.metrics import ServiceMetrics
from shared.notifier import TelegramNotifier
-
+from shared.sentiment_models import MarketSentiment
+from shared.shutdown import GracefulShutdown
from strategy_engine.config import StrategyConfig
from strategy_engine.engine import StrategyEngine
from strategy_engine.plugin_loader import load_strategies
+from strategy_engine.stock_selector import StockSelector
# The strategies directory lives alongside the installed package
STRATEGIES_DIR = Path(__file__).parent.parent.parent.parent / "strategies"
@@ -30,23 +38,74 @@ async def process_symbol(engine: StrategyEngine, stream: str, log) -> None:
last_id = await engine.process_once(stream, last_id)
+async def run_stock_selector(
+ selector: StockSelector,
+ notifier: TelegramNotifier,
+ db: Database,
+ config: StrategyConfig,
+ log,
+) -> None:
+ """Run the stock selector once per day at the configured time."""
+ et = zoneinfo.ZoneInfo("America/New_York")
+
+ while True:
+ now_et = datetime.now(et)
+ target_hour, target_min = map(int, config.selector_final_time.split(":"))
+
+ if now_et.hour == target_hour and now_et.minute == target_min:
+ log.info("stock_selector_running")
+ try:
+ selections = await selector.select()
+ if selections:
+ ms_data = await db.get_latest_market_sentiment()
+ ms = None
+ if ms_data:
+ ms = MarketSentiment(**ms_data)
+ await notifier.send_stock_selection(selections, ms)
+ log.info("stock_selector_complete", picks=[s.symbol for s in selections])
+ else:
+ log.info("stock_selector_no_picks")
+ except (aiohttp.ClientError, ConnectionError, TimeoutError) as exc:
+ log.warning("stock_selector_network_error", error=str(exc))
+ except (ValueError, KeyError, TypeError) as exc:
+ log.warning("stock_selector_data_error", error=str(exc))
+ except Exception as exc:
+ log.error("stock_selector_error", error=str(exc), exc_info=True)
+ await asyncio.sleep(120) # Sleep past this minute
+ else:
+ await asyncio.sleep(30)
+
+
async def run() -> None:
config = StrategyConfig()
log = setup_logging("strategy-engine", config.log_level, config.log_format)
metrics = ServiceMetrics("strategy_engine")
notifier = TelegramNotifier(
- bot_token=config.telegram_bot_token,
+ bot_token=config.telegram_bot_token.get_secret_value(),
chat_id=config.telegram_chat_id,
)
- broker = RedisBroker(config.redis_url)
+ broker = RedisBroker(config.redis_url.get_secret_value())
+
+ db = Database(config.database_url.get_secret_value())
+ await db.connect()
+
+ alpaca = AlpacaClient(
+ api_key=config.alpaca_api_key.get_secret_value(),
+ api_secret=config.alpaca_api_secret.get_secret_value(),
+ paper=config.alpaca_paper,
+ )
+
strategies = load_strategies(STRATEGIES_DIR)
for strategy in strategies:
params = config.strategy_params.get(strategy.name, {})
strategy.configure(params)
+ shutdown = GracefulShutdown()
+ shutdown.install_handlers()
+
log.info("loaded_strategies", count=len(strategies), names=[s.name for s in strategies])
engine = StrategyEngine(broker=broker, strategies=strategies)
@@ -67,10 +126,23 @@ async def run() -> None:
task = asyncio.create_task(process_symbol(engine, stream, log))
tasks.append(task)
- # Wait for all symbol processors (they run forever until cancelled)
- await asyncio.gather(*tasks)
+ if config.anthropic_api_key.get_secret_value():
+ selector = StockSelector(
+ db=db,
+ broker=broker,
+ alpaca=alpaca,
+ anthropic_api_key=config.anthropic_api_key.get_secret_value(),
+ anthropic_model=config.anthropic_model,
+ max_picks=config.selector_max_picks,
+ )
+ tasks.append(
+ asyncio.create_task(run_stock_selector(selector, notifier, db, config, log))
+ )
+ log.info("stock_selector_enabled", time=config.selector_final_time)
+
+ await shutdown.wait()
except Exception as exc:
- log.error("fatal_error", error=str(exc))
+ log.error("fatal_error", error=str(exc), exc_info=True)
await notifier.send_error(str(exc), "strategy-engine")
raise
finally:
@@ -79,6 +151,8 @@ async def run() -> None:
metrics.service_up.labels(service="strategy-engine").set(0)
await notifier.close()
await broker.close()
+ await alpaca.close()
+ await db.close()
def main() -> None:
diff --git a/services/strategy-engine/src/strategy_engine/plugin_loader.py b/services/strategy-engine/src/strategy_engine/plugin_loader.py
index 62e4160..57680db 100644
--- a/services/strategy-engine/src/strategy_engine/plugin_loader.py
+++ b/services/strategy-engine/src/strategy_engine/plugin_loader.py
@@ -5,7 +5,6 @@ import sys
from pathlib import Path
import yaml
-
from strategies.base import BaseStrategy
diff --git a/services/strategy-engine/src/strategy_engine/stock_selector.py b/services/strategy-engine/src/strategy_engine/stock_selector.py
new file mode 100644
index 0000000..8657b93
--- /dev/null
+++ b/services/strategy-engine/src/strategy_engine/stock_selector.py
@@ -0,0 +1,418 @@
+"""3-stage stock selector engine: sentiment → technical → LLM."""
+
+import asyncio
+import json
+import logging
+import re
+from datetime import UTC, datetime
+
+import aiohttp
+
+from shared.alpaca import AlpacaClient
+from shared.broker import RedisBroker
+from shared.db import Database
+from shared.models import OrderSide
+from shared.sentiment_models import Candidate, MarketSentiment, SelectedStock
+
+logger = logging.getLogger(__name__)
+
+ANTHROPIC_API_URL = "https://api.anthropic.com/v1/messages"
+
+
+def _extract_json_array(text: str) -> list[dict] | None:
+ """Extract a JSON array from text that may contain markdown code blocks."""
+ code_block = re.search(r"```(?:json)?\s*(\[.*?\])\s*```", text, re.DOTALL)
+ if code_block:
+ raw = code_block.group(1)
+ else:
+ array_match = re.search(r"\[.*\]", text, re.DOTALL)
+ if array_match:
+ raw = array_match.group(0)
+ else:
+ raw = text.strip()
+
+ try:
+ data = json.loads(raw)
+ if isinstance(data, list):
+ return [item for item in data if isinstance(item, dict)]
+ return None
+ except (json.JSONDecodeError, TypeError):
+ return None
+
+
+def _parse_llm_selections(text: str) -> list[SelectedStock]:
+ """Parse LLM response into SelectedStock list.
+
+ Handles both bare JSON arrays and markdown code blocks.
+ Returns empty list on any parse error.
+ """
+ items = _extract_json_array(text)
+ if items is None:
+ return []
+
+ selections = []
+ for item in items:
+ try:
+ selection = SelectedStock(
+ symbol=item["symbol"],
+ side=OrderSide(item["side"]),
+ conviction=float(item["conviction"]),
+ reason=item.get("reason", ""),
+ key_news=item.get("key_news", []),
+ )
+ selections.append(selection)
+ except (KeyError, ValueError) as e:
+ logger.warning("Skipping invalid selection item: %s", e)
+ return selections
+
+
+class SentimentCandidateSource:
+ """Generates candidates from DB sentiment scores."""
+
+ def __init__(self, db: Database) -> None:
+ self._db = db
+
+ async def get_candidates(self) -> list[Candidate]:
+ rows = await self._db.get_top_symbol_scores(limit=20)
+ candidates = []
+ for row in rows:
+ composite = float(row.get("composite", 0))
+ if composite == 0:
+ continue
+ candidates.append(
+ Candidate(
+ symbol=row["symbol"],
+ source="sentiment",
+ score=composite,
+ reason=f"composite={composite:.2f}, news_count={row.get('news_count', 0)}",
+ )
+ )
+ return candidates
+
+
+class LLMCandidateSource:
+ """Generates candidates by asking Claude to analyze recent news."""
+
+ def __init__(self, db: Database, api_key: str, model: str) -> None:
+ self._db = db
+ self._api_key = api_key
+ self._model = model
+
+ async def get_candidates(self, session: aiohttp.ClientSession | None = None) -> list[Candidate]:
+ news_items = await self._db.get_recent_news(hours=24)
+ if not news_items:
+ return []
+
+ headlines = []
+ for item in news_items[:50]: # cap at 50 to stay within context
+ symbols = item.get("symbols", [])
+ sym_str = ", ".join(symbols) if symbols else "N/A"
+ headlines.append(f"[{sym_str}] {item['headline']}")
+
+ prompt = (
+ "You are a stock analyst. Given recent news headlines, identify the 5-10 most "
+ "actionable US stock tickers. Return ONLY a JSON array with objects having: "
+ "symbol (ticker), direction ('BUY' or 'SELL'), score (0-1), reason (brief).\n\n"
+ "Headlines:\n" + "\n".join(headlines)
+ )
+
+ own_session = session is None
+ if own_session:
+ session = aiohttp.ClientSession()
+
+ try:
+ async with session.post(
+ ANTHROPIC_API_URL,
+ headers={
+ "x-api-key": self._api_key,
+ "anthropic-version": "2023-06-01",
+ "content-type": "application/json",
+ },
+ json={
+ "model": self._model,
+ "max_tokens": 1024,
+ "messages": [{"role": "user", "content": prompt}],
+ },
+ ) as resp:
+ if resp.status != 200:
+ body = await resp.text()
+ logger.error("LLM candidate source error %d: %s", resp.status, body)
+ return []
+ data = await resp.json()
+
+ content = data.get("content", [])
+ text = ""
+ for block in content:
+ if isinstance(block, dict) and block.get("type") == "text":
+ text += block.get("text", "")
+
+ return self._parse_candidates(text)
+ except Exception as e:
+ logger.error("LLMCandidateSource error: %s", e)
+ return []
+ finally:
+ if own_session:
+ await session.close()
+
+ def _parse_candidates(self, text: str) -> list[Candidate]:
+ items = _extract_json_array(text)
+ if items is None:
+ return []
+
+ candidates = []
+ for item in items:
+ try:
+ direction_str = item.get("direction", "BUY")
+ direction = OrderSide(direction_str)
+ except ValueError:
+ direction = None
+ candidates.append(
+ Candidate(
+ symbol=item["symbol"],
+ source="llm",
+ direction=direction,
+ score=float(item.get("score", 0.5)),
+ reason=item.get("reason", ""),
+ )
+ )
+ return candidates
+
+
+def _compute_rsi(closes: list[float], period: int = 14) -> float:
+ """Compute RSI for the last data point."""
+ if len(closes) < period + 1:
+ return 50.0 # neutral if insufficient data
+
+ deltas = [closes[i] - closes[i - 1] for i in range(1, len(closes))]
+ gains = [d if d > 0 else 0.0 for d in deltas]
+ losses = [-d if d < 0 else 0.0 for d in deltas]
+
+ avg_gain = sum(gains[:period]) / period
+ avg_loss = sum(losses[:period]) / period
+
+ for i in range(period, len(deltas)):
+ avg_gain = (avg_gain * (period - 1) + gains[i]) / period
+ avg_loss = (avg_loss * (period - 1) + losses[i]) / period
+
+ if avg_loss == 0:
+ return 100.0
+ rs = avg_gain / avg_loss
+ return 100.0 - (100.0 / (1.0 + rs))
+
+
+class StockSelector:
+ """Orchestrates the 3-stage stock selection pipeline."""
+
+ def __init__(
+ self,
+ db: Database,
+ broker: RedisBroker,
+ alpaca: AlpacaClient,
+ anthropic_api_key: str,
+ anthropic_model: str = "claude-sonnet-4-20250514",
+ max_picks: int = 3,
+ ) -> None:
+ self._db = db
+ self._broker = broker
+ self._alpaca = alpaca
+ self._api_key = anthropic_api_key
+ self._model = anthropic_model
+ self._max_picks = max_picks
+ self._http_session: aiohttp.ClientSession | None = None
+ self._session_lock = asyncio.Lock()
+
+ async def _ensure_session(self) -> aiohttp.ClientSession:
+ async with self._session_lock:
+ if self._http_session is None or self._http_session.closed:
+ self._http_session = aiohttp.ClientSession()
+ return self._http_session
+
+ async def close(self) -> None:
+ if self._http_session and not self._http_session.closed:
+ await self._http_session.close()
+
+ async def select(self) -> list[SelectedStock]:
+ """Run the full 3-stage pipeline and return selected stocks."""
+ # Market gate: check sentiment
+ sentiment_data = await self._db.get_latest_market_sentiment()
+ if sentiment_data is None:
+ logger.warning("No market sentiment data; skipping selection")
+ return []
+
+ market_sentiment = MarketSentiment(**sentiment_data)
+ if market_sentiment.market_regime == "risk_off":
+ logger.info("Market is risk_off; skipping stock selection")
+ return []
+
+ # Stage 1: gather candidates from both sources
+ sentiment_source = SentimentCandidateSource(self._db)
+ llm_source = LLMCandidateSource(self._db, self._api_key, self._model)
+
+ session = await self._ensure_session()
+ sentiment_candidates = await sentiment_source.get_candidates()
+ llm_candidates = await llm_source.get_candidates(session=session)
+
+ candidates = self._merge_candidates(sentiment_candidates, llm_candidates)
+ if not candidates:
+ logger.info("No candidates found")
+ return []
+
+ # Stage 2: technical filter
+ filtered = await self._technical_filter(candidates)
+ if not filtered:
+ logger.info("All candidates filtered out by technical criteria")
+ return []
+
+ # Stage 3: LLM final selection
+ selections = await self._llm_final_select(filtered, market_sentiment)
+
+ # Persist and publish
+ today = datetime.now(UTC).date()
+ sentiment_snapshot = {
+ "fear_greed": market_sentiment.fear_greed,
+ "market_regime": market_sentiment.market_regime,
+ "vix": market_sentiment.vix,
+ }
+ for stock in selections:
+ try:
+ await self._db.insert_stock_selection(
+ trade_date=today,
+ symbol=stock.symbol,
+ side=stock.side.value,
+ conviction=stock.conviction,
+ reason=stock.reason,
+ key_news=stock.key_news,
+ sentiment_snapshot=sentiment_snapshot,
+ )
+ except Exception as e:
+ logger.error("Failed to persist selection for %s: %s", stock.symbol, e)
+
+ try:
+ await self._broker.publish(
+ "selected_stocks",
+ {
+ "symbol": stock.symbol,
+ "side": stock.side.value,
+ "conviction": stock.conviction,
+ "reason": stock.reason,
+ "key_news": stock.key_news,
+ "trade_date": str(today),
+ },
+ )
+ except Exception as e:
+ logger.error("Failed to publish selection for %s: %s", stock.symbol, e)
+
+ return selections
+
+ def _merge_candidates(
+ self, sentiment: list[Candidate], llm: list[Candidate]
+ ) -> list[Candidate]:
+ """Deduplicate candidates by symbol, keeping the higher score."""
+ by_symbol: dict[str, Candidate] = {}
+ for c in sentiment + llm:
+ existing = by_symbol.get(c.symbol)
+ if existing is None or c.score > existing.score:
+ by_symbol[c.symbol] = c
+ return sorted(by_symbol.values(), key=lambda c: c.score, reverse=True)
+
+ async def _technical_filter(self, candidates: list[Candidate]) -> list[Candidate]:
+ """Filter candidates using RSI, EMA20, and volume criteria."""
+ passed = []
+ for candidate in candidates:
+ try:
+ bars = await self._alpaca.get_bars(candidate.symbol, timeframe="1Day", limit=60)
+ if len(bars) < 21:
+ logger.debug("Insufficient bars for %s", candidate.symbol)
+ continue
+
+ closes = [float(b["c"]) for b in bars]
+ volumes = [float(b["v"]) for b in bars]
+
+ rsi = _compute_rsi(closes)
+ if not (30 <= rsi <= 70):
+ logger.debug("%s RSI=%.1f outside 30-70", candidate.symbol, rsi)
+ continue
+
+ ema20 = sum(closes[-20:]) / 20 # simple approximation
+ current_price = closes[-1]
+ if current_price <= ema20:
+ logger.debug(
+ "%s price %.2f <= EMA20 %.2f", candidate.symbol, current_price, ema20
+ )
+ continue
+
+ avg_volume = sum(volumes[:-1]) / max(len(volumes) - 1, 1)
+ current_volume = volumes[-1]
+ if current_volume <= 0.5 * avg_volume:
+ logger.debug(
+ "%s volume %.0f <= 50%% avg %.0f",
+ candidate.symbol,
+ current_volume,
+ avg_volume,
+ )
+ continue
+
+ passed.append(candidate)
+ except Exception as e:
+ logger.warning("Technical filter error for %s: %s", candidate.symbol, e)
+
+ return passed
+
+ async def _llm_final_select(
+ self, candidates: list[Candidate], market_sentiment: MarketSentiment
+ ) -> list[SelectedStock]:
+ """Ask Claude to pick 2-3 stocks with rationale."""
+ candidate_lines = [
+ f"- {c.symbol} (source={c.source}, score={c.score:.2f}, reason={c.reason})"
+ for c in candidates
+ ]
+ market_context = (
+ f"Fear/Greed: {market_sentiment.fear_greed} ({market_sentiment.fear_greed_label}), "
+ f"VIX: {market_sentiment.vix}, "
+ f"Fed stance: {market_sentiment.fed_stance}, "
+ f"Regime: {market_sentiment.market_regime}"
+ )
+
+ prompt = (
+ f"You are a portfolio manager. Select 2-3 stocks for today's session.\n\n"
+ f"Market context: {market_context}\n\n"
+ f"Candidates (already passed technical filters):\n"
+ + "\n".join(candidate_lines)
+ + "\n\n"
+ "Return ONLY a JSON array with objects having:\n"
+ " symbol, side ('BUY' or 'SELL'), conviction (0-1), reason (1-2 sentences), "
+ "key_news (list of 1-3 relevant headlines or facts)\n"
+ f"Select at most {self._max_picks} stocks."
+ )
+
+ try:
+ session = await self._ensure_session()
+ async with session.post(
+ ANTHROPIC_API_URL,
+ headers={
+ "x-api-key": self._api_key,
+ "anthropic-version": "2023-06-01",
+ "content-type": "application/json",
+ },
+ json={
+ "model": self._model,
+ "max_tokens": 1024,
+ "messages": [{"role": "user", "content": prompt}],
+ },
+ ) as resp:
+ if resp.status != 200:
+ body = await resp.text()
+ logger.error("LLM final select error %d: %s", resp.status, body)
+ return []
+ data = await resp.json()
+
+ content = data.get("content", [])
+ text = ""
+ for block in content:
+ if isinstance(block, dict) and block.get("type") == "text":
+ text += block.get("text", "")
+
+ return _parse_llm_selections(text)[: self._max_picks]
+ except Exception as e:
+ logger.error("LLM final select error: %s", e)
+ return []