diff options
52 files changed, 7216 insertions, 567 deletions
diff --git a/.env.example b/.env.example index 7a2751f..bdc6a67 100644 --- a/.env.example +++ b/.env.example @@ -19,6 +19,17 @@ TELEGRAM_CHAT_ID= TELEGRAM_ENABLED=false LOG_FORMAT=json HEALTH_PORT=8080 -CIRCUIT_BREAKER_THRESHOLD=5 -CIRCUIT_BREAKER_TIMEOUT=60 METRICS_AUTH_TOKEN= + +# News Collector +FINNHUB_API_KEY= +NEWS_POLL_INTERVAL=300 +SENTIMENT_AGGREGATE_INTERVAL=900 + +# Stock Selector +SELECTOR_FINAL_TIME=15:30 +SELECTOR_MAX_PICKS=3 + +# LLM (for stock selector) +ANTHROPIC_API_KEY= +ANTHROPIC_MODEL=claude-sonnet-4-20250514 diff --git a/CLAUDE.md b/CLAUDE.md new file mode 100644 index 0000000..6e33f57 --- /dev/null +++ b/CLAUDE.md @@ -0,0 +1,106 @@ +# CLAUDE.md + +This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository. + +## Project Overview + +US stock trading platform built as a Python microservices architecture. Uses Alpaca Markets API for market data and order execution. Services communicate via Redis Streams and persist to PostgreSQL. 
+ +## Common Commands + +```bash +make infra # Start Redis + Postgres (required before running services/tests) +make up # Start all services via Docker Compose +make down # Stop all services +make test # Run all tests (pytest -v) +make lint # Lint check (ruff check + format check) +make format # Auto-fix lint + format +make migrate # Run DB migrations (alembic upgrade head, from shared/) +make migrate-new msg="description" # Create new migration +make ci # Full CI: install deps, lint, test, Docker build +make e2e # End-to-end tests +``` + +Run a single test file: `pytest services/strategy-engine/tests/test_rsi_strategy.py -v` + +## Architecture + +### Services (each in `services/<name>/`, each has its own Dockerfile) + +- **data-collector** (port 8080): Fetches stock bars from Alpaca, publishes `CandleEvent` to Redis stream `candles` +- **news-collector** (port 8084): Continuously collects news from 7 sources (Finnhub, RSS, SEC EDGAR, Truth Social, Reddit, Fear & Greed, Fed), runs sentiment aggregation every 15 min +- **strategy-engine** (port 8081): Consumes candle events, runs strategies, publishes `SignalEvent` to stream `signals`. 
Also runs the stock selector at 15:30 ET daily +- **order-executor** (port 8082): Consumes signals, runs risk checks, places orders via Alpaca, publishes `OrderEvent` to stream `orders` +- **portfolio-manager** (port 8083): Tracks positions, PnL, portfolio snapshots +- **api** (port 8000): FastAPI REST endpoint layer +- **backtester**: Offline backtesting engine with walk-forward analysis + +### Event Flow + +``` +Alpaca → data-collector → [candles stream] → strategy-engine → [signals stream] → order-executor → [orders stream] → portfolio-manager + +News sources → news-collector → [news stream] → sentiment aggregator → symbol_scores DB + ↓ + stock selector (15:30 ET) → [selected_stocks stream] → MOC strategy → signals +``` + +All inter-service events use `shared/src/shared/events.py` (CandleEvent, SignalEvent, OrderEvent, NewsEvent) serialized as JSON over Redis Streams via `shared/src/shared/broker.py` (RedisBroker). + +### Shared Library (`shared/`) + +Installed as editable package (`pip install -e shared/`). 
Contains: +- `models.py` — Pydantic domain models: Candle, Signal, Order, Position, NewsItem, NewsCategory +- `sentiment_models.py` — SymbolScore, MarketSentiment, SelectedStock, Candidate +- `sa_models.py` — SQLAlchemy ORM models (CandleRow, SignalRow, OrderRow, PortfolioSnapshotRow, NewsItemRow, SymbolScoreRow, MarketSentimentRow, StockSelectionRow) +- `broker.py` — RedisBroker (async Redis Streams pub/sub with consumer groups) +- `db.py` — Database class (async SQLAlchemy 2.0), includes news/sentiment/selection CRUD methods +- `alpaca.py` — AlpacaClient (async aiohttp client for Alpaca Trading + Market Data APIs) +- `events.py` — Event types and serialization (CandleEvent, SignalEvent, OrderEvent, NewsEvent) +- `sentiment.py` — SentimentData (legacy gating) + SentimentAggregator (freshness-weighted composite scoring) +- `config.py`, `logging.py`, `metrics.py`, `notifier.py` (Telegram), `resilience.py`, `healthcheck.py` + +DB migrations live in `shared/alembic/`. + +### Strategy System (`services/strategy-engine/strategies/`) + +Strategies extend `BaseStrategy` (in `strategies/base.py`) and implement `on_candle()`, `configure()`, `warmup_period`. The plugin loader (`strategy_engine/plugin_loader.py`) auto-discovers `*.py` files in the strategies directory and loads YAML config from `strategies/config/<strategy_name>.yaml`. + +BaseStrategy provides optional filters (ADX regime, volume, ATR-based stops) via `_init_filters()` and `_apply_filters()`. + +### News-Driven Stock Selector (`services/strategy-engine/src/strategy_engine/stock_selector.py`) + +Dynamic stock selection for MOC (Market on Close) trading. Runs daily at 15:30 ET via `strategy-engine`: + +1. **Candidate Pool**: Top 20 by sentiment score + LLM-recommended stocks from today's news +2. **Technical Filter**: RSI 30-70, price > 20 EMA, volume > 50% average +3. 
**LLM Final Selection**: Claude picks 2-3 stocks with rationale + +Market gating: blocks all trades when Fear & Greed ≤ 20 or VIX > 30 (`risk_off` regime). + +### News Collector (`services/news-collector/`) + +7 collectors extending `BaseCollector` in `collectors/`: +- `finnhub.py` (5min), `rss.py` (10min), `reddit.py` (15min), `truth_social.py` (15min), `sec_edgar.py` (30min), `fear_greed.py` (1hr), `fed.py` (1hr) +- All use VADER (nltk) for sentiment scoring +- Provider abstraction via `BaseCollector` for future paid API swap (config change only) + +Sentiment aggregation (every 15min) computes per-symbol composite scores with freshness decay and category weights (policy 0.3, news 0.3, social 0.2, filing 0.2). + +### CLI (`cli/`) + +Click-based CLI installed as `trading` command. Depends on the shared library. + +## Tech Stack + +- Python 3.12+, async throughout (asyncio, aiohttp) +- Pydantic for models, SQLAlchemy 2.0 async ORM, Alembic for migrations +- Redis Streams for inter-service messaging +- PostgreSQL 16 for persistence +- Ruff for linting/formatting (line-length=100) +- pytest + pytest-asyncio (asyncio_mode="auto") +- Docker Compose for deployment; monitoring stack (Grafana/Prometheus/Loki) available via `--profile monitoring` + +## Environment + +Copy `.env.example` to `.env`. Key vars: `ALPACA_API_KEY`, `ALPACA_API_SECRET`, `ALPACA_PAPER=true`, `DRY_RUN=true`, `DATABASE_URL`, `REDIS_URL`, `FINNHUB_API_KEY`, `ANTHROPIC_API_KEY`. DRY_RUN=true simulates order fills without hitting Alpaca. Stock selector requires `ANTHROPIC_API_KEY` to be set. 
diff --git a/docker-compose.yml b/docker-compose.yml index e981f74..63630ff 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -22,7 +22,7 @@ services: volumes: - postgres_data:/var/lib/postgresql/data healthcheck: - test: ["CMD-LINE", "pg_isready", "-U", "trading"] + test: ["CMD", "pg_isready", "-U", "trading"] interval: 5s timeout: 3s retries: 5 @@ -122,6 +122,25 @@ services: retries: 3 restart: unless-stopped + news-collector: + build: + context: . + dockerfile: services/news-collector/Dockerfile + env_file: .env + ports: + - "8084:8084" + depends_on: + redis: + condition: service_healthy + postgres: + condition: service_healthy + healthcheck: + test: ["CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:8084/health')"] + interval: 10s + timeout: 5s + retries: 3 + restart: unless-stopped + loki: image: grafana/loki:latest profiles: ["monitoring"] diff --git a/docs/TODO.md b/docs/TODO.md index d737b0f..fd4f7eb 100644 --- a/docs/TODO.md +++ b/docs/TODO.md @@ -1,201 +1,147 @@ -# Trading Platform — TODO +# US Stock Trading Platform — TODO -> Last updated: 2026-04-01 +> Last updated: 2026-04-02 ## Current State -- **298 tests**, lint clean, production-ready 인프라 -- 8 strategies, 6 services, full monitoring/CI/CD stack -- **트레이딩 전략 업그레이드 필요** — 아래 상세 +- **375 tests**, lint clean +- **US 주식 전용** (Alpaca API, 수수료 0%) +- 6 microservices + CLI + shared library +- MOC (Market on Close) 전략 + 기술적 전략 7개 +- Prometheus/Grafana/Loki 모니터링, CI/CD, Telegram 알림 +- Claude 기반 종목 스크리너 --- -## Trading Strategy Upgrade Plan - -### Phase 1: 백테스터 현실화 (최우선) - -현재 백테스트 결과는 슬리피지/수수료 미포함으로 **실제 수익과 큰 차이**가 있음. 이것부터 수정해야 전략 개선의 효과를 정확히 측정 가능. - -#### 1-1. 
슬리피지 + 수수료 모델링 -- **파일:** `backtester/simulator.py` -- **현재:** 시그널 가격 그대로 체결, 수수료 0 -- **수정:** 매수 시 `price * (1 + slippage_pct)`, 매도 시 `price * (1 - slippage_pct)` -- 수수료: `cost = price * quantity * fee_pct` (maker: 0.05%, taker: 0.1%) -- 슬리피지를 주문 크기에 비례하게 (대형 주문 → 더 큰 슬리피지) -- **설정:** `BacktestConfig`에 `slippage_pct`, `maker_fee_pct`, `taker_fee_pct` 추가 - -#### 1-2. 손절/익절 자동 실행 -- **파일:** `backtester/simulator.py` -- **현재:** 시그널로만 매매, 스탑 없음 -- **수정:** 각 포지션에 stop_loss, take_profit 가격 추적 -- 매 캔들마다 `high >= take_profit` 또는 `low <= stop_loss` 체크 → 자동 청산 -- `engine.py`에서 캔들 처리 시 시뮬레이터에 현재 가격 전달 - -#### 1-3. 공매도 지원 -- **파일:** `backtester/simulator.py` -- **현재:** 매도는 보유 수량 내에서만 가능 -- **수정:** `allow_short: bool` 설정, 공매도 시 음수 포지션 허용 -- 공매도 수수료 (borrow fee) 추가 - -#### 1-4. Walk-Forward Analysis -- **파일:** `backtester/engine.py` (신규 클래스) -- **현재:** 전체 데이터로 백테스트 → 과적합 위험 -- **수정:** `WalkForwardEngine` 클래스 - - 데이터를 N개 구간으로 분할 - - 각 구간: in-sample (파라미터 최적화) → out-of-sample (검증) - - 최종 결과는 out-of-sample 구간만 합산 -- 파라미터 최적화: grid search 또는 random search - -#### 1-5. 메트릭 정확도 개선 -- **파일:** `backtester/metrics.py` -- Sharpe/Sortino를 per-trade가 아닌 **일별 수익률 기반**으로 계산 -- Risk-free rate 설정 추가 (기본 5%) -- Recovery Factor (총수익 / 최대 drawdown) 추가 -- 최대 연속 손실 횟수 추가 -- 인트라 트레이드 drawdown (진입 후 최저점) 계산 +## Architecture ---- +``` +Alpaca API → data-collector (REST polling) + → Redis Streams → strategy-engine (MOC + 기술전략) + → signals → order-executor (Alpaca 주문) + → orders → portfolio-manager (포지션 추적) -### Phase 2: 전략 공통 인프라 - -모든 전략에 적용할 공통 기능. 개별 전략 개선 전에 인프라를 먼저 구축. - -#### 2-1. ATR 기반 동적 손절/익절 -- **파일:** `strategies/base.py` + 각 전략 -- **현재:** 어떤 전략도 손절/익절을 설정하지 않음 -- **수정:** `BaseStrategy`에 `calculate_stop_loss(candle, atr)` 메서드 추가 -- ATR (Average True Range) 유틸리티 함수 (`shared/` 또는 `strategies/indicators/`) -- 손절: entry - ATR * multiplier, 익절: entry + ATR * reward_ratio -- Signal에 `stop_loss`, `take_profit` 필드 추가 (`shared/models.py`) - -#### 2-2. 
추세/횡보 레짐 필터 (ADX) -- **파일:** `strategies/indicators/adx.py` (신규) -- ADX (Average Directional Index) 계산 유틸리티 -- ADX > 25 = 추세장, ADX < 20 = 횡보장 -- 각 전략이 레짐에 따라 동작 변경: - - 추세 추종 전략 (MACD, EMA): ADX < 20이면 시그널 무시 - - 평균 회귀 전략 (RSI, Bollinger, Grid): ADX > 30이면 시그널 무시 - -#### 2-3. 볼륨 확인 필터 -- **파일:** 각 전략 -- **현재:** 모든 전략이 볼륨을 무시 (RSI, MACD, EMA 등) -- **수정:** 시그널 발생 시 해당 캔들의 볼륨이 최근 N개 평균 대비 일정 비율 이상인지 확인 -- 볼륨 < 평균의 50%면 시그널 무시 (유동성 부족) -- 볼륨 > 평균의 200%면 시그널 가중치 증가 - -#### 2-4. 시그널 강도 (Conviction Score) -- **파일:** `shared/models.py` Signal + 각 전략 -- **현재:** 시그널은 BUY/SELL/quantity만 있음 -- **수정:** Signal에 `conviction: float` (0.0~1.0) 필드 추가 - - RSI 5 → conviction 0.9, RSI 28 → conviction 0.3 - - MACD 히스토그램이 0에서 먼 크로스 → 높은 conviction -- Combined 전략에서 conviction 기반 가중치 사용 -- RiskManager에서 conviction 기반 포지션 사이징 - -#### 2-5. 지표 라이브러리 -- **디렉토리:** `strategies/indicators/` (신규) -- 재사용 가능한 기술 지표 함수: - - `atr(highs, lows, closes, period)` — Average True Range - - `adx(highs, lows, closes, period)` — Average Directional Index - - `ema(series, period)` — Exponential Moving Average - - `sma(series, period)` — Simple Moving Average - - `rsi(closes, period)` — RSI - - `bollinger_bands(closes, period, num_std)` — Bollinger - - `macd(closes, fast, slow, signal)` — MACD - - `volume_sma(volumes, period)` — Volume SMA -- 각 전략에서 직접 계산하지 않고 공통 라이브러리 사용 -- 테스트: 각 지표에 대한 unit test +Claude API → stock_screener.py (종목 분석/추천) +FastAPI → REST API (/api/v1/portfolio, orders, strategies) +``` ---- +## 핵심 매매 전략: MOC (Market on Close) -### Phase 3: 개별 전략 고도화 - -Phase 1-2 완료 후 각 전략을 전문가 수준으로 업그레이드. - -#### 3-1. RSI 전략 개선 -- [ ] RSI 다이버전스 감지 (가격 신고가 + RSI 하락 = 약세 다이버전스) -- [ ] ADX 레짐 필터 적용 (추세장에서는 RSI 매수 신호 무시) -- [ ] RSI 강도별 conviction score (RSI 5 vs RSI 28) -- [ ] ATR 기반 손절/익절 -- [ ] 볼륨 확인 필터 - -#### 3-2. MACD 전략 개선 -- [ ] 히스토그램 크로스오버 + MACD 제로라인 크로스오버 구분 -- [ ] MACD 다이버전스 감지 -- [ ] ADX 추세 확인 (ADX < 20이면 시그널 무시) -- [ ] 제로라인으로부터 거리 기반 시그널 강도 -- [ ] ATR 기반 손절 - -#### 3-3. 
Grid 전략 개선 -- [ ] ADX 기반 레짐 필터 (추세장 진입 차단) -- [ ] 동적 그리드 재설정 (실현 변동성 기반 범위 조정) -- [ ] 그리드 외 이탈 시 전 포지션 청산 + 알림 -- [ ] 볼륨 프로파일 기반 비균등 그리드 간격 - -#### 3-4. Bollinger Bands 전략 개선 -- [ ] 스퀴즈 감지 (밴드 압축 → 브레이크아웃 대비) -- [ ] %B 지표 활용 (밴드 내 위치 0~1) -- [ ] RSI 확인 (하단 밴드 터치 + RSI < 30 = 강한 매수) -- [ ] 볼륨 스파이크 확인 - -#### 3-5. EMA Crossover 전략 개선 -- [ ] ADX > 25 필터 (강한 추세만 진입) -- [ ] 풀백 진입 (크로스 후 단기 EMA로 되돌림 시 진입) -- [ ] 50 SMA 위/아래 필터 (장기 추세 방향 확인) -- [ ] 볼륨 확인 - -#### 3-6. VWAP 전략 개선 -- [ ] 일중 리셋 (매일 00:00 UTC에 VWAP 재계산) -- [ ] VWAP 표준편차 밴드 추가 (1σ, 2σ) -- [ ] ATR 기반 deviation threshold (고정값 대신 변동성 적응형) -- [ ] 세션 필터 (저유동성 시간대 진입 차단) - -#### 3-7. Volume Profile 전략 개선 -- [ ] HVN/LVN (고/저볼륨 노드) 식별 -- [ ] 세션 기반 프로파일 리셋 -- [ ] POC를 동적 지지/저항선으로 활용 -- [ ] 볼륨 델타 (매수량 - 매도량) 추적 - -#### 3-8. Combined 전략 개선 -- [ ] Sub-strategy conviction score 반영 -- [ ] Sub-strategy 간 상관관계 행렬 계산 → 중복 시그널 감쇄 -- [ ] 적응형 가중치 (최근 win rate 기반 동적 가중치 조정) -- [ ] 포트폴리오 집중도 제한 +``` +[매일 ET 15:50] Claude 종목 분석 → 매수 종목 선정 +[ET 15:50~16:00] 장 마감 직전 매수 (MOC 주문) +[다음날 ET 9:35~10:00] 시가 매도 + +조건: 양봉 + 볼륨 > 평균 + RSI 30~60 + EMA 위 + 모멘텀 양호 +손절: -2%, 포지션당 자본금 20%, 최대 5종목 +``` --- -### Phase 4: 리스크 관리 고도화 +## Remaining Work + +### 즉시 해야 할 것 -#### 4-1. 포트폴리오 레벨 리스크 -- [ ] 전체 노출도 제한 (총 포지션 가치 / 잔고 비율) -- [ ] 포지션 간 상관관계 계산 → 실효 리스크 산출 -- [ ] VaR (Value at Risk) 계산 — 95% 신뢰 구간 +#### 1. MOC 백테스트 스크립트 +- [ ] `scripts/backtest_moc.py` — 합성 데이터로 MOC 전략 파라미터 최적화 +- [ ] 5개 주식 (AAPL, MSFT, TSLA, NVDA, AMZN) 대상 90일 백테스트 +- [ ] RSI 범위, 손절률, EMA period 그리드 서치 +- [ ] Makefile target: `make backtest-moc` -#### 4-2. 동적 포지션 축소 -- [ ] Drawdown이 일정 수준 넘으면 포지션 크기 자동 축소 -- [ ] 연속 손실 N회 시 거래 일시 중단 → Telegram 알림 -- [ ] 시간대별 리스크 조정 (주말, 공휴일 축소) +#### 2. 실제 데이터 백테스트 +- [ ] Alpaca API로 과거 데이터 다운로드 → DB 저장 +- [ ] 실제 데이터 기반 MOC 전략 검증 +- [ ] Walk-forward analysis로 과적합 확인 -#### 4-3. 시나리오 분석 -- [ ] 과거 극단 이벤트 (FTX 사태, Luna 등)에 대한 포트폴리오 영향 시뮬레이션 -- [ ] 유동성 리스크 체크 (주문 크기 vs 호가창 깊이) +#### 3. 
Paper Trading 배포 +- [ ] `.env` 설정 (ALPACA_PAPER=true) +- [ ] `make up` → 전 서비스 실행 +- [ ] 2-4주 모의 매매 → 실제 성과 확인 +- [ ] Telegram 알림으로 매일 결과 수신 --- -## Priority & Effort +### 개선 사항 -| Phase | 내용 | 예상 작업량 | 영향 | -|-------|------|------------|------| -| **Phase 1** | 백테스터 현실화 | 1-2일 | **최대** — 이것 없이는 전략 평가 불가 | -| **Phase 2** | 전략 공통 인프라 | 1-2일 | **높음** — 모든 전략의 기반 | -| **Phase 3** | 개별 전략 고도화 | 3-5일 | 중간 — 수익률 직접 개선 | -| **Phase 4** | 리스크 관리 고도화 | 2-3일 | 높음 — 손실 방지 | +#### 4. Claude 스크리너 고도화 +- [ ] SEC 공시 분석 (10-K, 10-Q, 8-K) +- [ ] 실적 서프라이즈 감지 (EPS beat/miss) +- [ ] 섹터 로테이션 분석 +- [ ] 뉴스 감성 분석 (Yahoo Finance, MarketWatch) -**권장 순서: Phase 1 → Phase 2 → Phase 4 → Phase 3** -(전략 개선보다 리스크 관리가 더 중요 — 돈을 벌기 전에 잃지 않는 게 먼저) +#### 5. 주문 유형 확장 +- [ ] Limit order 지원 +- [ ] 프리마켓/애프터마켓 주문 +- [ ] 분할 매수/매도 + +#### 6. 추가 전략 +- [ ] ORB (Opening Range Breakout) — 장 시작 30분 전략 +- [ ] Gap & Go — 갭 상승 종목 추격 전략 +- [ ] Earnings Play — 실적 발표 전후 전략 + +#### 7. 리스크 관리 개선 +- [ ] 섹터 집중도 제한 (같은 섹터 3개 이상 금지) +- [ ] 실적 발표일 매매 회피 +- [ ] 시장 전체 하락 시 매매 중단 (SPY RSI 기반) --- -## Previously Completed (Infrastructure) +## Quick Start + +```bash +# 1. 환경 설정 +cp .env.example .env +# ALPACA_API_KEY, ALPACA_API_SECRET 입력 +# ANTHROPIC_API_KEY 입력 (Claude 스크리너용) + +# 2. 의존성 설치 +pip install -e shared/ + +# 3. 인프라 실행 +make infra # Redis + PostgreSQL +make migrate # DB 마이그레이션 + +# 4. 테스트 +make test # 375 tests + +# 5. 종목 스크리닝 +make screen # Claude가 33개 종목 분석 → Top 5 추천 + +# 6. 서비스 실행 +make up # 전 서비스 시작 (paper trading) + +# 7. 모니터링 +docker compose --profile monitoring up -d +# Grafana: http://localhost:3000 +# API: http://localhost:8000/api/v1/strategies + +# 8. CLI +trading strategy list +trading backtest run --strategy moc --symbol AAPL --timeframe 1Day +trading portfolio show +``` + +--- -모든 인프라 항목 완료 (27개): SQLAlchemy, Alembic, structlog, Telegram, Prometheus/Grafana/Loki, CI/CD, FastAPI, multi-exchange, Redis consumer groups, realized PnL, bearer auth, 298 tests. 
+## Completed (Infrastructure) + +- [x] Alpaca API 클라이언트 (paper + live) +- [x] MOC 전략 (종가 매수 / 시가 매도) +- [x] Claude 종목 스크리너 (33개 유니버스) +- [x] Data collector (Alpaca REST polling) +- [x] Order executor (Alpaca submit_order) +- [x] SQLAlchemy ORM + Alembic 마이그레이션 +- [x] structlog 구조화 로깅 +- [x] Telegram 알림 +- [x] Retry + Circuit Breaker +- [x] Prometheus + Grafana + Loki +- [x] Redis consumer groups +- [x] Portfolio snapshots + realized PnL +- [x] Bearer token auth +- [x] CI/CD (Gitea Actions) +- [x] E2E test script +- [x] FastAPI REST API +- [x] 백테스터 (슬리피지, 수수료, SL/TP, 공매도, walk-forward) +- [x] 기술 지표 라이브러리 (ATR, ADX, RSI, MACD, Bollinger, Stochastic, OBV) +- [x] 포트폴리오 VaR, 상관관계, drawdown 기반 리스크 +- [x] 375 tests, lint clean diff --git a/docs/superpowers/plans/2026-04-02-news-driven-stock-selector.md b/docs/superpowers/plans/2026-04-02-news-driven-stock-selector.md new file mode 100644 index 0000000..0964f21 --- /dev/null +++ b/docs/superpowers/plans/2026-04-02-news-driven-stock-selector.md @@ -0,0 +1,3689 @@ +# News-Driven Stock Selector Implementation Plan + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** Replace the MOC strategy's fixed symbol list with a dynamic, news-driven stock selection system that continuously collects news/sentiment data and selects 2-3 optimal stocks daily before market close. + +**Architecture:** A new `news-collector` service runs 7 data source collectors on individual poll intervals, storing `NewsItem` records in PostgreSQL and publishing to Redis. A sentiment aggregator computes per-symbol composite scores every 15 minutes. Before market close, a 3-stage stock selector (sentiment candidates → technical filter → LLM final pick) chooses 2-3 stocks and feeds them to the existing MOC strategy. 
+ +**Tech Stack:** Python 3.12+, asyncio, aiohttp, Pydantic, SQLAlchemy 2.0 async, Redis Streams, VADER (nltk), feedparser (RSS), Anthropic SDK (Claude API), Alembic + +--- + +## Phase 1: Shared Foundation (Models, DB, Events) + +### Task 1: Add NewsItem and sentiment models to shared + +**Files:** +- Modify: `shared/src/shared/models.py` +- Create: `shared/src/shared/sentiment_models.py` +- Create: `shared/tests/test_sentiment_models.py` + +- [ ] **Step 1: Write tests for new models** + +Create `shared/tests/test_sentiment_models.py`: + +```python +"""Tests for news and sentiment models.""" + +import pytest +from datetime import datetime, timezone + +from shared.models import NewsCategory, NewsItem, OrderSide +from shared.sentiment_models import SymbolScore, MarketSentiment, SelectedStock, Candidate + + +def test_news_item_defaults(): + item = NewsItem( + source="finnhub", + headline="Test headline", + published_at=datetime(2026, 4, 2, tzinfo=timezone.utc), + sentiment=0.5, + category=NewsCategory.MACRO, + ) + assert item.id # UUID generated + assert item.symbols == [] + assert item.summary is None + assert item.raw_data == {} + assert item.created_at is not None + + +def test_news_item_with_symbols(): + item = NewsItem( + source="rss", + headline="AAPL earnings beat", + published_at=datetime(2026, 4, 2, tzinfo=timezone.utc), + sentiment=0.8, + category=NewsCategory.EARNINGS, + symbols=["AAPL"], + ) + assert item.symbols == ["AAPL"] + assert item.category == NewsCategory.EARNINGS + + +def test_news_category_values(): + assert NewsCategory.POLICY == "policy" + assert NewsCategory.EARNINGS == "earnings" + assert NewsCategory.MACRO == "macro" + assert NewsCategory.SOCIAL == "social" + assert NewsCategory.FILING == "filing" + assert NewsCategory.FED == "fed" + + +def test_symbol_score(): + score = SymbolScore( + symbol="AAPL", + news_score=0.5, + news_count=10, + social_score=0.3, + policy_score=0.0, + filing_score=0.2, + composite=0.3, + updated_at=datetime(2026, 4, 
2, tzinfo=timezone.utc), + ) + assert score.symbol == "AAPL" + assert score.composite == 0.3 + + +def test_market_sentiment(): + ms = MarketSentiment( + fear_greed=25, + fear_greed_label="Extreme Fear", + vix=32.5, + fed_stance="hawkish", + market_regime="risk_off", + updated_at=datetime(2026, 4, 2, tzinfo=timezone.utc), + ) + assert ms.market_regime == "risk_off" + assert ms.vix == 32.5 + + +def test_market_sentiment_no_vix(): + ms = MarketSentiment( + fear_greed=50, + fear_greed_label="Neutral", + fed_stance="neutral", + market_regime="neutral", + updated_at=datetime(2026, 4, 2, tzinfo=timezone.utc), + ) + assert ms.vix is None + + +def test_selected_stock(): + ss = SelectedStock( + symbol="NVDA", + side=OrderSide.BUY, + conviction=0.85, + reason="CHIPS Act expansion", + key_news=["Trump signs CHIPS Act expansion"], + ) + assert ss.conviction == 0.85 + assert len(ss.key_news) == 1 + + +def test_candidate(): + c = Candidate( + symbol="TSLA", + source="sentiment", + direction=OrderSide.BUY, + score=0.75, + reason="High social buzz", + ) + assert c.direction == OrderSide.BUY + + c2 = Candidate( + symbol="XOM", + source="llm", + score=0.6, + reason="Oil price surge", + ) + assert c2.direction is None +``` + +- [ ] **Step 2: Run tests to verify they fail** + +Run: `pytest shared/tests/test_sentiment_models.py -v` +Expected: FAIL — `NewsCategory`, `NewsItem` not found in `shared.models`, `shared.sentiment_models` does not exist + +- [ ] **Step 3: Add NewsCategory and NewsItem to shared/models.py** + +Add to the end of `shared/src/shared/models.py`: + +```python +class NewsCategory(str, Enum): + POLICY = "policy" + EARNINGS = "earnings" + MACRO = "macro" + SOCIAL = "social" + FILING = "filing" + FED = "fed" + + +class NewsItem(BaseModel): + id: str = Field(default_factory=lambda: str(uuid.uuid4())) + source: str + headline: str + summary: Optional[str] = None + url: Optional[str] = None + published_at: datetime + symbols: list[str] = [] + sentiment: float + category: 
NewsCategory + raw_data: dict = {} + created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) +``` + +- [ ] **Step 4: Create shared/src/shared/sentiment_models.py** + +```python +"""Sentiment scoring and stock selection models.""" + +from datetime import datetime +from typing import Optional + +from pydantic import BaseModel + +from shared.models import OrderSide + + +class SymbolScore(BaseModel): + symbol: str + news_score: float + news_count: int + social_score: float + policy_score: float + filing_score: float + composite: float + updated_at: datetime + + +class MarketSentiment(BaseModel): + fear_greed: int + fear_greed_label: str + vix: Optional[float] = None + fed_stance: str + market_regime: str + updated_at: datetime + + +class SelectedStock(BaseModel): + symbol: str + side: OrderSide + conviction: float + reason: str + key_news: list[str] + + +class Candidate(BaseModel): + symbol: str + source: str + direction: Optional[OrderSide] = None + score: float + reason: str +``` + +- [ ] **Step 5: Run tests to verify they pass** + +Run: `pytest shared/tests/test_sentiment_models.py -v` +Expected: All 8 tests PASS + +- [ ] **Step 6: Commit** + +```bash +git add shared/src/shared/models.py shared/src/shared/sentiment_models.py shared/tests/test_sentiment_models.py +git commit -m "feat: add NewsItem, sentiment scoring, and stock selection models" +``` + +--- + +### Task 2: Add SQLAlchemy ORM models for news tables + +**Files:** +- Modify: `shared/src/shared/sa_models.py` +- Create: `shared/tests/test_sa_news_models.py` + +- [ ] **Step 1: Write tests for new SA models** + +Create `shared/tests/test_sa_news_models.py`: + +```python +"""Tests for news-related SQLAlchemy models.""" + +from shared.sa_models import NewsItemRow, SymbolScoreRow, MarketSentimentRow, StockSelectionRow + + +def test_news_item_row_tablename(): + assert NewsItemRow.__tablename__ == "news_items" + + +def test_symbol_score_row_tablename(): + assert 
SymbolScoreRow.__tablename__ == "symbol_scores" + + +def test_market_sentiment_row_tablename(): + assert MarketSentimentRow.__tablename__ == "market_sentiment" + + +def test_stock_selection_row_tablename(): + assert StockSelectionRow.__tablename__ == "stock_selections" + + +def test_news_item_row_columns(): + cols = {c.name for c in NewsItemRow.__table__.columns} + assert cols >= {"id", "source", "headline", "published_at", "sentiment", "category"} + + +def test_symbol_score_row_columns(): + cols = {c.name for c in SymbolScoreRow.__table__.columns} + assert cols >= {"id", "symbol", "news_score", "composite", "updated_at"} +``` + +- [ ] **Step 2: Run tests to verify they fail** + +Run: `pytest shared/tests/test_sa_news_models.py -v` +Expected: FAIL — import errors + +- [ ] **Step 3: Add ORM models to sa_models.py** + +Add to the end of `shared/src/shared/sa_models.py`: + +```python +class NewsItemRow(Base): + __tablename__ = "news_items" + + id: Mapped[str] = mapped_column(Text, primary_key=True) + source: Mapped[str] = mapped_column(Text, nullable=False) + headline: Mapped[str] = mapped_column(Text, nullable=False) + summary: Mapped[str | None] = mapped_column(Text) + url: Mapped[str | None] = mapped_column(Text) + published_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False) + symbols: Mapped[str | None] = mapped_column(Text) # JSON-encoded list + sentiment: Mapped[float] = mapped_column(sa.Float, nullable=False) + category: Mapped[str] = mapped_column(Text, nullable=False) + raw_data: Mapped[str | None] = mapped_column(Text) # JSON string + created_at: Mapped[datetime] = mapped_column( + DateTime(timezone=True), nullable=False, server_default=sa.func.now() + ) + + +class SymbolScoreRow(Base): + __tablename__ = "symbol_scores" + + id: Mapped[str] = mapped_column(Text, primary_key=True) + symbol: Mapped[str] = mapped_column(Text, nullable=False, unique=True) + news_score: Mapped[float] = mapped_column(sa.Float, nullable=False, 
server_default="0") + news_count: Mapped[int] = mapped_column(sa.Integer, nullable=False, server_default="0") + social_score: Mapped[float] = mapped_column(sa.Float, nullable=False, server_default="0") + policy_score: Mapped[float] = mapped_column(sa.Float, nullable=False, server_default="0") + filing_score: Mapped[float] = mapped_column(sa.Float, nullable=False, server_default="0") + composite: Mapped[float] = mapped_column(sa.Float, nullable=False, server_default="0") + updated_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False) + + +class MarketSentimentRow(Base): + __tablename__ = "market_sentiment" + + id: Mapped[str] = mapped_column(Text, primary_key=True) + fear_greed: Mapped[int] = mapped_column(sa.Integer, nullable=False) + fear_greed_label: Mapped[str] = mapped_column(Text, nullable=False) + vix: Mapped[float | None] = mapped_column(sa.Float) + fed_stance: Mapped[str] = mapped_column(Text, nullable=False, server_default="neutral") + market_regime: Mapped[str] = mapped_column(Text, nullable=False, server_default="neutral") + updated_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False) + + +class StockSelectionRow(Base): + __tablename__ = "stock_selections" + + id: Mapped[str] = mapped_column(Text, primary_key=True) + trade_date: Mapped[datetime] = mapped_column(sa.Date, nullable=False) + symbol: Mapped[str] = mapped_column(Text, nullable=False) + side: Mapped[str] = mapped_column(Text, nullable=False) + conviction: Mapped[float] = mapped_column(sa.Float, nullable=False) + reason: Mapped[str] = mapped_column(Text, nullable=False) + key_news: Mapped[str | None] = mapped_column(Text) # JSON string + sentiment_snapshot: Mapped[str | None] = mapped_column(Text) # JSON string + created_at: Mapped[datetime] = mapped_column( + DateTime(timezone=True), nullable=False, server_default=sa.func.now() + ) +``` + +Also add `import sqlalchemy as sa` to the imports at the top of `sa_models.py`. 
+ +- [ ] **Step 4: Run tests to verify they pass** + +Run: `pytest shared/tests/test_sa_news_models.py -v` +Expected: All 6 tests PASS + +- [ ] **Step 5: Commit** + +```bash +git add shared/src/shared/sa_models.py shared/tests/test_sa_news_models.py +git commit -m "feat: add SQLAlchemy ORM models for news, scores, selections" +``` + +--- + +### Task 3: Create Alembic migration for news tables + +**Files:** +- Create: `shared/alembic/versions/002_news_sentiment_tables.py` + +- [ ] **Step 1: Create migration file** + +Create `shared/alembic/versions/002_news_sentiment_tables.py`: + +```python +"""Add news, sentiment, and stock selection tables + +Revision ID: 002 +Revises: 001 +Create Date: 2026-04-02 +""" + +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa + +revision: str = "002" +down_revision: Union[str, None] = "001" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + op.create_table( + "news_items", + sa.Column("id", sa.Text, primary_key=True), + sa.Column("source", sa.Text, nullable=False), + sa.Column("headline", sa.Text, nullable=False), + sa.Column("summary", sa.Text), + sa.Column("url", sa.Text), + sa.Column("published_at", sa.DateTime(timezone=True), nullable=False), + sa.Column("symbols", sa.Text), + sa.Column("sentiment", sa.Float, nullable=False), + sa.Column("category", sa.Text, nullable=False), + sa.Column("raw_data", sa.Text), + sa.Column("created_at", sa.DateTime(timezone=True), nullable=False, server_default=sa.func.now()), + ) + op.create_index("idx_news_items_published", "news_items", ["published_at"]) + op.create_index("idx_news_items_source", "news_items", ["source"]) + + op.create_table( + "symbol_scores", + sa.Column("id", sa.Text, primary_key=True), + sa.Column("symbol", sa.Text, nullable=False, unique=True), + sa.Column("news_score", sa.Float, nullable=False, server_default="0"), + sa.Column("news_count", sa.Integer, 
nullable=False, server_default="0"), + sa.Column("social_score", sa.Float, nullable=False, server_default="0"), + sa.Column("policy_score", sa.Float, nullable=False, server_default="0"), + sa.Column("filing_score", sa.Float, nullable=False, server_default="0"), + sa.Column("composite", sa.Float, nullable=False, server_default="0"), + sa.Column("updated_at", sa.DateTime(timezone=True), nullable=False), + ) + + op.create_table( + "market_sentiment", + sa.Column("id", sa.Text, primary_key=True), + sa.Column("fear_greed", sa.Integer, nullable=False), + sa.Column("fear_greed_label", sa.Text, nullable=False), + sa.Column("vix", sa.Float), + sa.Column("fed_stance", sa.Text, nullable=False, server_default="neutral"), + sa.Column("market_regime", sa.Text, nullable=False, server_default="neutral"), + sa.Column("updated_at", sa.DateTime(timezone=True), nullable=False), + ) + + op.create_table( + "stock_selections", + sa.Column("id", sa.Text, primary_key=True), + sa.Column("trade_date", sa.Date, nullable=False), + sa.Column("symbol", sa.Text, nullable=False), + sa.Column("side", sa.Text, nullable=False), + sa.Column("conviction", sa.Float, nullable=False), + sa.Column("reason", sa.Text, nullable=False), + sa.Column("key_news", sa.Text), + sa.Column("sentiment_snapshot", sa.Text), + sa.Column("created_at", sa.DateTime(timezone=True), nullable=False, server_default=sa.func.now()), + ) + op.create_index("idx_stock_selections_date", "stock_selections", ["trade_date"]) + + +def downgrade() -> None: + op.drop_table("stock_selections") + op.drop_table("market_sentiment") + op.drop_table("symbol_scores") + op.drop_table("news_items") +``` + +- [ ] **Step 2: Verify migration imports correctly** + +Run: `cd shared && python -c "from alembic.versions import *; print('OK')" && cd ..` +Or simply: `python -c "import importlib.util; s=importlib.util.spec_from_file_location('m','shared/alembic/versions/002_news_sentiment_tables.py'); m=importlib.util.module_from_spec(s); 
s.loader.exec_module(m); print('OK')"` +Expected: OK (no import errors) + +- [ ] **Step 3: Commit** + +```bash +git add shared/alembic/versions/002_news_sentiment_tables.py +git commit -m "feat: add Alembic migration for news and sentiment tables" +``` + +--- + +### Task 4: Add NewsEvent to shared events and DB methods for news + +**Files:** +- Modify: `shared/src/shared/events.py` +- Modify: `shared/src/shared/db.py` +- Create: `shared/tests/test_news_events.py` +- Create: `shared/tests/test_db_news.py` + +- [ ] **Step 1: Write tests for NewsEvent** + +Create `shared/tests/test_news_events.py`: + +```python +"""Tests for NewsEvent.""" + +from datetime import datetime, timezone + +from shared.models import NewsCategory, NewsItem +from shared.events import NewsEvent, EventType, Event + + +def test_news_event_to_dict(): + item = NewsItem( + source="finnhub", + headline="Test", + published_at=datetime(2026, 4, 2, tzinfo=timezone.utc), + sentiment=0.5, + category=NewsCategory.MACRO, + ) + event = NewsEvent(data=item) + d = event.to_dict() + assert d["type"] == EventType.NEWS + assert d["data"]["source"] == "finnhub" + + +def test_news_event_from_raw(): + raw = { + "type": "NEWS", + "data": { + "id": "abc", + "source": "rss", + "headline": "Test headline", + "published_at": "2026-04-02T00:00:00+00:00", + "sentiment": 0.3, + "category": "earnings", + "symbols": ["AAPL"], + "raw_data": {}, + }, + } + event = NewsEvent.from_raw(raw) + assert event.data.source == "rss" + assert event.data.symbols == ["AAPL"] + + +def test_event_dispatcher_news(): + raw = { + "type": "NEWS", + "data": { + "id": "abc", + "source": "finnhub", + "headline": "Test", + "published_at": "2026-04-02T00:00:00+00:00", + "sentiment": 0.0, + "category": "macro", + "raw_data": {}, + }, + } + event = Event.from_dict(raw) + assert isinstance(event, NewsEvent) +``` + +- [ ] **Step 2: Run tests to verify they fail** + +Run: `pytest shared/tests/test_news_events.py -v` +Expected: FAIL — `NewsEvent` not in 
events.py, `EventType.NEWS` missing + +- [ ] **Step 3: Add NewsEvent to events.py** + +Add `NEWS = "NEWS"` to `EventType` enum. + +Add `NewsEvent` class and register it in `_EVENT_TYPE_MAP`: + +```python +from shared.models import Candle, Signal, Order, NewsItem + +class NewsEvent(BaseModel): + type: EventType = EventType.NEWS + data: NewsItem + + def to_dict(self) -> dict: + return { + "type": self.type, + "data": self.data.model_dump(mode="json"), + } + + @classmethod + def from_raw(cls, raw: dict) -> "NewsEvent": + return cls(type=raw["type"], data=NewsItem(**raw["data"])) +``` + +Add to `_EVENT_TYPE_MAP`: +```python +EventType.NEWS: NewsEvent, +``` + +- [ ] **Step 4: Run event tests to verify they pass** + +Run: `pytest shared/tests/test_news_events.py -v` +Expected: All 3 tests PASS + +- [ ] **Step 5: Run all existing event tests to check no regressions** + +Run: `pytest shared/tests/test_events.py -v` +Expected: All existing tests PASS + +- [ ] **Step 6: Write tests for DB news methods** + +Create `shared/tests/test_db_news.py`: + +```python +"""Tests for database news/sentiment methods. + +These tests use an in-memory SQLite database. 
+""" + +import json +import uuid +import pytest +from datetime import datetime, date, timezone + +from shared.db import Database +from shared.models import NewsItem, NewsCategory +from shared.sentiment_models import SymbolScore, MarketSentiment + + +@pytest.fixture +async def db(): + database = Database("sqlite+aiosqlite://") + await database.connect() + yield database + await database.close() + + +async def test_insert_and_get_news_items(db): + item = NewsItem( + source="finnhub", + headline="AAPL earnings beat", + published_at=datetime(2026, 4, 2, 12, 0, tzinfo=timezone.utc), + sentiment=0.8, + category=NewsCategory.EARNINGS, + symbols=["AAPL"], + ) + await db.insert_news_item(item) + items = await db.get_recent_news(hours=24) + assert len(items) == 1 + assert items[0]["headline"] == "AAPL earnings beat" + + +async def test_upsert_symbol_score(db): + score = SymbolScore( + symbol="AAPL", + news_score=0.5, + news_count=10, + social_score=0.3, + policy_score=0.0, + filing_score=0.2, + composite=0.3, + updated_at=datetime(2026, 4, 2, tzinfo=timezone.utc), + ) + await db.upsert_symbol_score(score) + scores = await db.get_top_symbol_scores(limit=5) + assert len(scores) == 1 + assert scores[0]["symbol"] == "AAPL" + + +async def test_upsert_market_sentiment(db): + ms = MarketSentiment( + fear_greed=55, + fear_greed_label="Neutral", + vix=18.2, + fed_stance="neutral", + market_regime="neutral", + updated_at=datetime(2026, 4, 2, tzinfo=timezone.utc), + ) + await db.upsert_market_sentiment(ms) + result = await db.get_latest_market_sentiment() + assert result is not None + assert result["fear_greed"] == 55 + + +async def test_insert_stock_selection(db): + await db.insert_stock_selection( + trade_date=date(2026, 4, 2), + symbol="NVDA", + side="BUY", + conviction=0.85, + reason="CHIPS Act", + key_news=["Trump signs CHIPS expansion"], + sentiment_snapshot={"composite": 0.8}, + ) + selections = await db.get_stock_selections(date(2026, 4, 2)) + assert len(selections) == 1 + 
assert selections[0]["symbol"] == "NVDA" +``` + +- [ ] **Step 7: Run DB news tests to verify they fail** + +Run: `pytest shared/tests/test_db_news.py -v` +Expected: FAIL — methods not yet on Database class + +- [ ] **Step 8: Add news/sentiment DB methods to db.py** + +Add to `shared/src/shared/db.py` — new import at top: + +```python +import json +import uuid +from datetime import date +from shared.models import NewsItem +from shared.sentiment_models import SymbolScore, MarketSentiment +from shared.sa_models import NewsItemRow, SymbolScoreRow, MarketSentimentRow, StockSelectionRow +``` + +Add these methods to the `Database` class: + +```python + async def insert_news_item(self, item: NewsItem) -> None: + """Insert a news item.""" + row = NewsItemRow( + id=item.id, + source=item.source, + headline=item.headline, + summary=item.summary, + url=item.url, + published_at=item.published_at, + symbols=json.dumps(item.symbols), + sentiment=item.sentiment, + category=item.category.value, + raw_data=json.dumps(item.raw_data), + created_at=item.created_at, + ) + async with self._session_factory() as session: + try: + session.add(row) + await session.commit() + except Exception: + await session.rollback() + raise + + async def get_recent_news(self, hours: int = 24) -> list[dict]: + """Retrieve news items from the last N hours.""" + since = datetime.now(timezone.utc) - timedelta(hours=hours) + stmt = ( + select(NewsItemRow) + .where(NewsItemRow.published_at >= since) + .order_by(NewsItemRow.published_at.desc()) + ) + async with self._session_factory() as session: + result = await session.execute(stmt) + rows = result.scalars().all() + return [ + { + "id": r.id, + "source": r.source, + "headline": r.headline, + "summary": r.summary, + "url": r.url, + "published_at": r.published_at, + "symbols": json.loads(r.symbols) if r.symbols else [], + "sentiment": r.sentiment, + "category": r.category, + "created_at": r.created_at, + } + for r in rows + ] + + async def 
upsert_symbol_score(self, score: SymbolScore) -> None: + """Insert or update a symbol score.""" + async with self._session_factory() as session: + try: + existing = await session.execute( + select(SymbolScoreRow).where(SymbolScoreRow.symbol == score.symbol) + ) + row = existing.scalar_one_or_none() + if row: + row.news_score = score.news_score + row.news_count = score.news_count + row.social_score = score.social_score + row.policy_score = score.policy_score + row.filing_score = score.filing_score + row.composite = score.composite + row.updated_at = score.updated_at + else: + row = SymbolScoreRow( + id=str(uuid.uuid4()), + symbol=score.symbol, + news_score=score.news_score, + news_count=score.news_count, + social_score=score.social_score, + policy_score=score.policy_score, + filing_score=score.filing_score, + composite=score.composite, + updated_at=score.updated_at, + ) + session.add(row) + await session.commit() + except Exception: + await session.rollback() + raise + + async def get_top_symbol_scores(self, limit: int = 20) -> list[dict]: + """Get top symbol scores ordered by composite descending.""" + stmt = ( + select(SymbolScoreRow) + .order_by(SymbolScoreRow.composite.desc()) + .limit(limit) + ) + async with self._session_factory() as session: + result = await session.execute(stmt) + rows = result.scalars().all() + return [ + { + "symbol": r.symbol, + "news_score": r.news_score, + "news_count": r.news_count, + "social_score": r.social_score, + "policy_score": r.policy_score, + "filing_score": r.filing_score, + "composite": r.composite, + "updated_at": r.updated_at, + } + for r in rows + ] + + async def upsert_market_sentiment(self, ms: MarketSentiment) -> None: + """Insert or update the latest market sentiment (single row, id='latest').""" + async with self._session_factory() as session: + try: + existing = await session.execute( + select(MarketSentimentRow).where(MarketSentimentRow.id == "latest") + ) + row = existing.scalar_one_or_none() + if row: + 
row.fear_greed = ms.fear_greed + row.fear_greed_label = ms.fear_greed_label + row.vix = ms.vix + row.fed_stance = ms.fed_stance + row.market_regime = ms.market_regime + row.updated_at = ms.updated_at + else: + row = MarketSentimentRow( + id="latest", + fear_greed=ms.fear_greed, + fear_greed_label=ms.fear_greed_label, + vix=ms.vix, + fed_stance=ms.fed_stance, + market_regime=ms.market_regime, + updated_at=ms.updated_at, + ) + session.add(row) + await session.commit() + except Exception: + await session.rollback() + raise + + async def get_latest_market_sentiment(self) -> dict | None: + """Get the latest market sentiment.""" + stmt = select(MarketSentimentRow).where(MarketSentimentRow.id == "latest") + async with self._session_factory() as session: + result = await session.execute(stmt) + r = result.scalar_one_or_none() + if r is None: + return None + return { + "fear_greed": r.fear_greed, + "fear_greed_label": r.fear_greed_label, + "vix": r.vix, + "fed_stance": r.fed_stance, + "market_regime": r.market_regime, + "updated_at": r.updated_at, + } + + async def insert_stock_selection( + self, + trade_date: date, + symbol: str, + side: str, + conviction: float, + reason: str, + key_news: list[str], + sentiment_snapshot: dict, + ) -> None: + """Insert a stock selection record.""" + row = StockSelectionRow( + id=str(uuid.uuid4()), + trade_date=trade_date, + symbol=symbol, + side=side, + conviction=conviction, + reason=reason, + key_news=json.dumps(key_news), + sentiment_snapshot=json.dumps(sentiment_snapshot), + ) + async with self._session_factory() as session: + try: + session.add(row) + await session.commit() + except Exception: + await session.rollback() + raise + + async def get_stock_selections(self, trade_date: date) -> list[dict]: + """Get stock selections for a specific date.""" + stmt = ( + select(StockSelectionRow) + .where(StockSelectionRow.trade_date == trade_date) + .order_by(StockSelectionRow.conviction.desc()) + ) + async with self._session_factory() as 
session: + result = await session.execute(stmt) + rows = result.scalars().all() + return [ + { + "symbol": r.symbol, + "side": r.side, + "conviction": r.conviction, + "reason": r.reason, + "key_news": json.loads(r.key_news) if r.key_news else [], + "sentiment_snapshot": json.loads(r.sentiment_snapshot) if r.sentiment_snapshot else {}, + } + for r in rows + ] +``` + +- [ ] **Step 9: Run DB news tests to verify they pass** + +Run: `pytest shared/tests/test_db_news.py -v` +Expected: All 4 tests PASS + +Note: These tests require `aiosqlite` package. If not installed: `pip install aiosqlite` + +- [ ] **Step 10: Run all shared tests to check no regressions** + +Run: `pytest shared/tests/ -v` +Expected: All tests PASS + +- [ ] **Step 11: Commit** + +```bash +git add shared/src/shared/events.py shared/src/shared/db.py shared/tests/test_news_events.py shared/tests/test_db_news.py +git commit -m "feat: add NewsEvent, DB methods for news/sentiment/selections" +``` + +--- + +### Task 5: Update Settings with new env vars + +**Files:** +- Modify: `shared/src/shared/config.py` +- Modify: `.env.example` + +- [ ] **Step 1: Add new settings to config.py** + +Add these fields to the `Settings` class in `shared/src/shared/config.py`: + +```python + # News collector + finnhub_api_key: str = "" + news_poll_interval: int = 300 + sentiment_aggregate_interval: int = 900 + # Stock selector + selector_candidates_time: str = "15:00" + selector_filter_time: str = "15:15" + selector_final_time: str = "15:30" + selector_max_picks: int = 3 + # LLM + anthropic_api_key: str = "" + anthropic_model: str = "claude-sonnet-4-20250514" +``` + +- [ ] **Step 2: Add to .env.example** + +Append to `.env.example`: + +```bash + +# News Collector +FINNHUB_API_KEY= +NEWS_POLL_INTERVAL=300 +SENTIMENT_AGGREGATE_INTERVAL=900 + +# Stock Selector +SELECTOR_CANDIDATES_TIME=15:00 +SELECTOR_FILTER_TIME=15:15 +SELECTOR_FINAL_TIME=15:30 +SELECTOR_MAX_PICKS=3 + +# LLM (for stock selector) +ANTHROPIC_API_KEY= 
+ANTHROPIC_MODEL=claude-sonnet-4-20250514 +``` + +- [ ] **Step 3: Commit** + +```bash +git add shared/src/shared/config.py .env.example +git commit -m "feat: add news collector and stock selector config settings" +``` + +--- + +## Phase 2: News Collector Service + +### Task 6: Scaffold news-collector service + +**Files:** +- Create: `services/news-collector/pyproject.toml` +- Create: `services/news-collector/Dockerfile` +- Create: `services/news-collector/src/news_collector/__init__.py` +- Create: `services/news-collector/src/news_collector/config.py` +- Create: `services/news-collector/src/news_collector/collectors/__init__.py` +- Create: `services/news-collector/src/news_collector/collectors/base.py` +- Create: `services/news-collector/tests/__init__.py` + +- [ ] **Step 1: Create pyproject.toml** + +Create `services/news-collector/pyproject.toml`: + +```toml +[project] +name = "news-collector" +version = "0.1.0" +description = "News and sentiment data collector service" +requires-python = ">=3.12" +dependencies = [ + "trading-shared", + "feedparser>=6.0", + "nltk>=3.8", + "aiohttp>=3.9", +] + +[project.optional-dependencies] +dev = [ + "pytest>=8.0", + "pytest-asyncio>=0.23", + "aioresponses>=0.7", +] + +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[tool.hatch.build.targets.wheel] +packages = ["src/news_collector"] +``` + +- [ ] **Step 2: Create Dockerfile** + +Create `services/news-collector/Dockerfile`: + +```dockerfile +FROM python:3.12-slim +WORKDIR /app +COPY shared/ shared/ +RUN pip install --no-cache-dir ./shared +COPY services/news-collector/ services/news-collector/ +RUN pip install --no-cache-dir ./services/news-collector +RUN python -c "import nltk; nltk.download('vader_lexicon', quiet=True)" +ENV PYTHONPATH=/app +CMD ["python", "-m", "news_collector.main"] +``` + +- [ ] **Step 3: Create config.py** + +Create `services/news-collector/src/news_collector/config.py`: + +```python +"""News Collector configuration.""" + 
+from shared.config import Settings + + +class NewsCollectorConfig(Settings): + health_port: int = 8084 + finnhub_api_key: str = "" + news_poll_interval: int = 300 + sentiment_aggregate_interval: int = 900 +``` + +- [ ] **Step 4: Create BaseCollector** + +Create `services/news-collector/src/news_collector/collectors/base.py`: + +```python +"""Base class for all news collectors.""" + +from abc import ABC, abstractmethod + +from shared.models import NewsItem + + +class BaseCollector(ABC): + name: str = "base" + poll_interval: int = 300 # seconds + + @abstractmethod + async def collect(self) -> list[NewsItem]: + """Collect news items from the source.""" + + @abstractmethod + async def is_available(self) -> bool: + """Check if this data source is accessible.""" +``` + +- [ ] **Step 5: Create __init__.py files** + +Create `services/news-collector/src/news_collector/__init__.py`: +```python +"""News collector service.""" +``` + +Create `services/news-collector/src/news_collector/collectors/__init__.py`: +```python +"""News collectors.""" +``` + +Create `services/news-collector/tests/__init__.py`: +```python +``` + +- [ ] **Step 6: Commit** + +```bash +git add services/news-collector/ +git commit -m "feat: scaffold news-collector service with BaseCollector" +``` + +--- + +### Task 7: Implement Finnhub news collector + +**Files:** +- Create: `services/news-collector/src/news_collector/collectors/finnhub.py` +- Create: `services/news-collector/tests/test_finnhub.py` + +- [ ] **Step 1: Write tests** + +Create `services/news-collector/tests/test_finnhub.py`: + +```python +"""Tests for Finnhub news collector.""" + +import pytest +from unittest.mock import AsyncMock, patch +from datetime import datetime, timezone + +from news_collector.collectors.finnhub import FinnhubCollector + + +@pytest.fixture +def collector(): + return FinnhubCollector(api_key="test_key") + + +def test_collector_name(collector): + assert collector.name == "finnhub" + assert collector.poll_interval == 300 
+ + +async def test_is_available_with_key(collector): + assert await collector.is_available() is True + + +async def test_is_available_without_key(): + c = FinnhubCollector(api_key="") + assert await c.is_available() is False + + +async def test_collect_parses_response(collector): + mock_response = [ + { + "category": "top news", + "datetime": 1711929600, + "headline": "AAPL beats earnings", + "id": 12345, + "related": "AAPL", + "source": "MarketWatch", + "summary": "Apple reported better than expected...", + "url": "https://example.com/article", + }, + { + "category": "top news", + "datetime": 1711929000, + "headline": "Fed holds rates steady", + "id": 12346, + "related": "", + "source": "Reuters", + "summary": "The Federal Reserve...", + "url": "https://example.com/fed", + }, + ] + + with patch.object(collector, "_fetch_news", new_callable=AsyncMock, return_value=mock_response): + items = await collector.collect() + + assert len(items) == 2 + assert items[0].source == "finnhub" + assert items[0].headline == "AAPL beats earnings" + assert items[0].symbols == ["AAPL"] + assert items[0].url == "https://example.com/article" + assert isinstance(items[0].sentiment, float) + # Second item has no related ticker + assert items[1].symbols == [] + + +async def test_collect_handles_empty_response(collector): + with patch.object(collector, "_fetch_news", new_callable=AsyncMock, return_value=[]): + items = await collector.collect() + assert items == [] +``` + +- [ ] **Step 2: Run tests to verify they fail** + +Run: `pytest services/news-collector/tests/test_finnhub.py -v` +Expected: FAIL — module not found + +- [ ] **Step 3: Implement FinnhubCollector** + +Create `services/news-collector/src/news_collector/collectors/finnhub.py`: + +```python +"""Finnhub market news collector (free tier: 60 req/min).""" + +import logging +from datetime import datetime, timezone + +import aiohttp +from nltk.sentiment.vader import SentimentIntensityAnalyzer + +from shared.models import 
NewsCategory, NewsItem +from news_collector.collectors.base import BaseCollector + +logger = logging.getLogger(__name__) + +FINNHUB_NEWS_URL = "https://finnhub.io/api/v1/news" + + +class FinnhubCollector(BaseCollector): + name = "finnhub" + poll_interval = 300 # 5 minutes + + def __init__(self, api_key: str) -> None: + self._api_key = api_key + self._vader = SentimentIntensityAnalyzer() + + async def is_available(self) -> bool: + return bool(self._api_key) + + async def _fetch_news(self) -> list[dict]: + """Fetch general news from Finnhub API.""" + params = {"category": "general", "token": self._api_key} + async with aiohttp.ClientSession() as session: + async with session.get(FINNHUB_NEWS_URL, params=params) as resp: + if resp.status != 200: + logger.warning("finnhub_fetch_failed", status=resp.status) + return [] + return await resp.json() + + def _analyze_sentiment(self, text: str) -> float: + """Return VADER compound score (-1.0 to 1.0).""" + scores = self._vader.polarity_scores(text) + return scores["compound"] + + def _extract_symbols(self, related: str) -> list[str]: + """Parse Finnhub 'related' field into symbol list.""" + if not related or not related.strip(): + return [] + return [s.strip() for s in related.split(",") if s.strip()] + + def _categorize(self, article: dict) -> NewsCategory: + """Determine category from article content.""" + headline = article.get("headline", "").lower() + if any(w in headline for w in ["fed", "fomc", "rate", "inflation"]): + return NewsCategory.FED + if any(w in headline for w in ["tariff", "sanction", "regulation", "trump", "biden", "congress"]): + return NewsCategory.POLICY + if any(w in headline for w in ["earnings", "revenue", "profit", "eps"]): + return NewsCategory.EARNINGS + return NewsCategory.MACRO + + async def collect(self) -> list[NewsItem]: + raw = await self._fetch_news() + items = [] + for article in raw: + headline = article.get("headline", "") + summary = article.get("summary", "") + sentiment_text = 
f"{headline}. {summary}" if summary else headline + + items.append( + NewsItem( + source=self.name, + headline=headline, + summary=summary or None, + url=article.get("url"), + published_at=datetime.fromtimestamp( + article.get("datetime", 0), tz=timezone.utc + ), + symbols=self._extract_symbols(article.get("related", "")), + sentiment=self._analyze_sentiment(sentiment_text), + category=self._categorize(article), + raw_data=article, + ) + ) + return items +``` + +- [ ] **Step 4: Run tests to verify they pass** + +Run: `pytest services/news-collector/tests/test_finnhub.py -v` +Expected: All 5 tests PASS + +Note: Requires `nltk` and VADER lexicon. If not downloaded: `python -c "import nltk; nltk.download('vader_lexicon')"` + +- [ ] **Step 5: Commit** + +```bash +git add services/news-collector/src/news_collector/collectors/finnhub.py services/news-collector/tests/test_finnhub.py +git commit -m "feat: implement Finnhub news collector with VADER sentiment" +``` + +--- + +### Task 8: Implement RSS news collector + +**Files:** +- Create: `services/news-collector/src/news_collector/collectors/rss.py` +- Create: `services/news-collector/tests/test_rss.py` + +- [ ] **Step 1: Write tests** + +Create `services/news-collector/tests/test_rss.py`: + +```python +"""Tests for RSS news collector.""" + +import pytest +from unittest.mock import AsyncMock, patch +from datetime import datetime, timezone + +from news_collector.collectors.rss import RSSCollector + + +@pytest.fixture +def collector(): + return RSSCollector() + + +def test_collector_name(collector): + assert collector.name == "rss" + assert collector.poll_interval == 600 + + +async def test_is_available(collector): + assert await collector.is_available() is True + + +async def test_collect_parses_feed(collector): + mock_feed = { + "entries": [ + { + "title": "NVDA surges on AI demand", + "link": "https://example.com/nvda", + "published_parsed": (2026, 4, 2, 12, 0, 0, 0, 0, 0), + "summary": "Nvidia stock jumped 5%...", + }, 
+ { + "title": "Markets rally on jobs data", + "link": "https://example.com/market", + "published_parsed": (2026, 4, 2, 11, 0, 0, 0, 0, 0), + "summary": "The S&P 500 rose...", + }, + ], + } + + with patch.object(collector, "_fetch_feeds", new_callable=AsyncMock, return_value=[mock_feed]): + items = await collector.collect() + + assert len(items) == 2 + assert items[0].source == "rss" + assert items[0].headline == "NVDA surges on AI demand" + assert isinstance(items[0].sentiment, float) +``` + +- [ ] **Step 2: Run tests to verify they fail** + +Run: `pytest services/news-collector/tests/test_rss.py -v` +Expected: FAIL + +- [ ] **Step 3: Implement RSSCollector** + +Create `services/news-collector/src/news_collector/collectors/rss.py`: + +```python +"""RSS feed collector for Yahoo Finance, Google News, MarketWatch.""" + +import asyncio +import logging +import re +from calendar import timegm +from datetime import datetime, timezone + +import aiohttp +import feedparser +from nltk.sentiment.vader import SentimentIntensityAnalyzer + +from shared.models import NewsCategory, NewsItem +from news_collector.collectors.base import BaseCollector + +logger = logging.getLogger(__name__) + +DEFAULT_FEEDS = [ + "https://feeds.finance.yahoo.com/rss/2.0/headline?s=^GSPC®ion=US&lang=en-US", + "https://news.google.com/rss/topics/CAAqJggKIiBDQkFTRWdvSUwyMHZNRGx6TVdZU0FtVnVHZ0pWVXlnQVAB?hl=en-US&gl=US&ceid=US:en", + "https://www.marketwatch.com/rss/topstories", +] + +# Common US stock tickers to detect in headlines +TICKER_PATTERN = re.compile( + r"\b(AAPL|MSFT|GOOGL|GOOG|AMZN|TSLA|NVDA|META|JPM|V|JNJ|WMT|PG|UNH|HD|" + r"MA|DIS|BAC|XOM|PFE|KO|PEP|CSCO|INTC|VZ|NFLX|ADBE|CRM|AMD|QCOM|" + r"GS|BA|CAT|MMM|IBM|GE|F|GM|NKE|MCD|SBUX|SPY|QQQ|IWM)\b" +) + + +class RSSCollector(BaseCollector): + name = "rss" + poll_interval = 600 # 10 minutes + + def __init__(self, feeds: list[str] | None = None) -> None: + self._feeds = feeds or DEFAULT_FEEDS + self._vader = SentimentIntensityAnalyzer() + + async 
def is_available(self) -> bool: + return True + + async def _fetch_feeds(self) -> list[dict]: + """Fetch and parse all RSS feeds.""" + results = [] + async with aiohttp.ClientSession() as session: + for url in self._feeds: + try: + async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as resp: + if resp.status == 200: + text = await resp.text() + feed = feedparser.parse(text) + results.append(feed) + except Exception as exc: + logger.warning("rss_fetch_failed", url=url, error=str(exc)) + return results + + def _extract_symbols(self, text: str) -> list[str]: + """Extract stock tickers from text.""" + return list(set(TICKER_PATTERN.findall(text))) + + def _parse_time(self, entry: dict) -> datetime: + """Parse published time from feed entry.""" + parsed = entry.get("published_parsed") + if parsed: + return datetime.fromtimestamp(timegm(parsed), tz=timezone.utc) + return datetime.now(timezone.utc) + + async def collect(self) -> list[NewsItem]: + feeds = await self._fetch_feeds() + items = [] + seen_titles = set() + + for feed in feeds: + for entry in feed.get("entries", []): + title = entry.get("title", "").strip() + if not title or title in seen_titles: + continue + seen_titles.add(title) + + summary = entry.get("summary", "") + sentiment_text = f"{title}. 
{summary}" if summary else title + + items.append( + NewsItem( + source=self.name, + headline=title, + summary=summary or None, + url=entry.get("link"), + published_at=self._parse_time(entry), + symbols=self._extract_symbols(f"{title} {summary}"), + sentiment=self._vader.polarity_scores(sentiment_text)["compound"], + category=NewsCategory.MACRO, + raw_data={"feed_title": title}, + ) + ) + + return items +``` + +- [ ] **Step 4: Run tests to verify they pass** + +Run: `pytest services/news-collector/tests/test_rss.py -v` +Expected: All 3 tests PASS + +- [ ] **Step 5: Commit** + +```bash +git add services/news-collector/src/news_collector/collectors/rss.py services/news-collector/tests/test_rss.py +git commit -m "feat: implement RSS news collector (Yahoo, Google News, MarketWatch)" +``` + +--- + +### Task 9: Implement Fear & Greed Index collector + +**Files:** +- Create: `services/news-collector/src/news_collector/collectors/fear_greed.py` +- Create: `services/news-collector/tests/test_fear_greed.py` + +- [ ] **Step 1: Write tests** + +Create `services/news-collector/tests/test_fear_greed.py`: + +```python +"""Tests for CNN Fear & Greed Index collector.""" + +import pytest +from unittest.mock import AsyncMock, patch + +from news_collector.collectors.fear_greed import FearGreedCollector + + +@pytest.fixture +def collector(): + return FearGreedCollector() + + +def test_collector_name(collector): + assert collector.name == "fear_greed" + assert collector.poll_interval == 3600 + + +async def test_is_available(collector): + assert await collector.is_available() is True + + +async def test_collect_parses_api_response(collector): + mock_data = { + "fear_and_greed": { + "score": 45.0, + "rating": "Fear", + "timestamp": "2026-04-02T12:00:00+00:00", + } + } + with patch.object(collector, "_fetch_index", new_callable=AsyncMock, return_value=mock_data): + result = await collector.collect() + + assert result.fear_greed == 45 + assert result.fear_greed_label == "Fear" + + +async 
def test_collect_returns_none_on_failure(collector): + with patch.object(collector, "_fetch_index", new_callable=AsyncMock, return_value=None): + result = await collector.collect() + assert result is None + + +def test_classify_label(): + c = FearGreedCollector() + assert c._classify(10) == "Extreme Fear" + assert c._classify(30) == "Fear" + assert c._classify(50) == "Neutral" + assert c._classify(70) == "Greed" + assert c._classify(85) == "Extreme Greed" +``` + +- [ ] **Step 2: Run tests to verify they fail** + +Run: `pytest services/news-collector/tests/test_fear_greed.py -v` +Expected: FAIL + +- [ ] **Step 3: Implement FearGreedCollector** + +Create `services/news-collector/src/news_collector/collectors/fear_greed.py`: + +```python +"""CNN Fear & Greed Index collector.""" + +import logging +from dataclasses import dataclass +from typing import Optional + +import aiohttp + +from news_collector.collectors.base import BaseCollector +from shared.models import NewsItem + +logger = logging.getLogger(__name__) + +FEAR_GREED_URL = "https://production.dataviz.cnn.io/index/fearandgreed/graphdata" + + +@dataclass +class FearGreedResult: + fear_greed: int + fear_greed_label: str + + +class FearGreedCollector(BaseCollector): + """Fetches CNN Fear & Greed Index. + + Note: This collector does NOT return NewsItem — it returns FearGreedResult + which feeds directly into MarketSentiment. The main.py scheduler handles + this differently from news collectors. 
    """

    name = "fear_greed"
    poll_interval = 3600  # 1 hour

    async def is_available(self) -> bool:
        return True

    async def _fetch_index(self) -> Optional[dict]:
        """Fetch Fear & Greed data from CNN API."""
        headers = {"User-Agent": "Mozilla/5.0"}
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(
                    FEAR_GREED_URL, headers=headers, timeout=aiohttp.ClientTimeout(total=10)
                ) as resp:
                    if resp.status != 200:
                        # NOTE: stdlib logging does not accept structlog-style kwargs;
                        # use lazy %-formatting so this path logs instead of raising TypeError.
                        logger.warning("fear_greed_fetch_failed status=%s", resp.status)
                        return None
                    return await resp.json()
        except Exception as exc:
            logger.warning("fear_greed_error error=%s", exc)
            return None

    def _classify(self, score: int) -> str:
        """Classify numeric score into label."""
        if score <= 20:
            return "Extreme Fear"
        if score <= 40:
            return "Fear"
        if score <= 60:
            return "Neutral"
        if score <= 80:
            return "Greed"
        return "Extreme Greed"

    async def collect(self) -> Optional[FearGreedResult]:
        """Collect Fear & Greed Index.
        Returns FearGreedResult or None."""
        data = await self._fetch_index()
        if data is None:
            return None

        try:
            fg = data["fear_and_greed"]
            score = int(fg["score"])
            label = fg.get("rating", self._classify(score))
            return FearGreedResult(fear_greed=score, fear_greed_label=label)
        except (KeyError, ValueError, TypeError) as exc:
            # NOTE: stdlib logging rejects structlog-style kwargs; use %-formatting.
            logger.warning("fear_greed_parse_failed error=%s", str(exc))
            return None
```

- [ ] **Step 4: Run tests to verify they pass**

Run: `pytest services/news-collector/tests/test_fear_greed.py -v`
Expected: All 5 tests PASS

- [ ] **Step 5: Commit**

```bash
git add services/news-collector/src/news_collector/collectors/fear_greed.py services/news-collector/tests/test_fear_greed.py
git commit -m "feat: implement CNN Fear & Greed Index collector"
```

---

### Task 10: Implement SEC EDGAR collector

**Files:**
- Create: `services/news-collector/src/news_collector/collectors/sec_edgar.py`
- Create: `services/news-collector/tests/test_sec_edgar.py`

- [ ] **Step 1: Write tests**

Create `services/news-collector/tests/test_sec_edgar.py`:

```python
"""Tests for SEC EDGAR filing collector."""

from datetime import datetime, timezone

import pytest
from unittest.mock import AsyncMock, patch

from news_collector.collectors.sec_edgar import SecEdgarCollector


@pytest.fixture
def collector():
    return SecEdgarCollector()


def test_collector_name(collector):
    assert collector.name == "sec_edgar"
    assert collector.poll_interval == 1800


async def test_is_available(collector):
    assert await collector.is_available() is True


async def test_collect_parses_filings(collector):
    mock_response = {
        "filings": {
            "recent": {
                "accessionNumber": ["0001234-26-000001"],
                # Use today's UTC date: collect() discards filings not dated today,
                # so a hard-coded date would make this test fail on any other day.
                "filingDate": [datetime.now(timezone.utc).strftime("%Y-%m-%d")],
                "primaryDocument": ["filing.htm"],
                "form": ["8-K"],
                "primaryDocDescription": ["Current Report"],
            }
        },
        "tickers": [{"ticker": "AAPL"}],
        "name": "Apple Inc",
    }
    with patch.object(collector, "_fetch_recent_filings",
new_callable=AsyncMock, return_value=[mock_response]): + items = await collector.collect() + + assert len(items) == 1 + assert items[0].source == "sec_edgar" + assert items[0].category.value == "filing" + assert "AAPL" in items[0].symbols + + +async def test_collect_handles_empty(collector): + with patch.object(collector, "_fetch_recent_filings", new_callable=AsyncMock, return_value=[]): + items = await collector.collect() + assert items == [] +``` + +- [ ] **Step 2: Run tests to verify they fail** + +Run: `pytest services/news-collector/tests/test_sec_edgar.py -v` +Expected: FAIL + +- [ ] **Step 3: Implement SecEdgarCollector** + +Create `services/news-collector/src/news_collector/collectors/sec_edgar.py`: + +```python +"""SEC EDGAR filing collector (free, no API key required).""" + +import logging +from datetime import datetime, timezone + +import aiohttp +from nltk.sentiment.vader import SentimentIntensityAnalyzer + +from shared.models import NewsCategory, NewsItem +from news_collector.collectors.base import BaseCollector + +logger = logging.getLogger(__name__) + +EDGAR_FULL_TEXT_SEARCH = "https://efts.sec.gov/LATEST/search-index" +EDGAR_RECENT_FILINGS = "https://efts.sec.gov/LATEST/search-index?q=%228-K%22&dateRange=custom&startdt={date}&enddt={date}&forms=8-K" +EDGAR_COMPANY_FILINGS = "https://data.sec.gov/submissions/CIK{cik}.json" + +# CIK numbers for major companies (subset — extend as needed) +TRACKED_CIKS = { + "0000320193": "AAPL", + "0000789019": "MSFT", + "0001652044": "GOOGL", + "0001018724": "AMZN", + "0001318605": "TSLA", + "0001045810": "NVDA", + "0001326801": "META", + "0000019617": "JPM", + "0000078003": "PFE", + "0000021344": "KO", +} + +SEC_USER_AGENT = "TradingPlatform research@example.com" + + +class SecEdgarCollector(BaseCollector): + name = "sec_edgar" + poll_interval = 1800 # 30 minutes + + def __init__(self) -> None: + self._vader = SentimentIntensityAnalyzer() + + async def is_available(self) -> bool: + return True + + async def 
_fetch_recent_filings(self) -> list[dict]: + """Fetch recent 8-K filings for tracked companies.""" + results = [] + headers = {"User-Agent": SEC_USER_AGENT} + async with aiohttp.ClientSession() as session: + for cik, ticker in TRACKED_CIKS.items(): + try: + url = f"https://data.sec.gov/submissions/CIK{cik}.json" + async with session.get( + url, headers=headers, timeout=aiohttp.ClientTimeout(total=10) + ) as resp: + if resp.status == 200: + data = await resp.json() + data["tickers"] = [{"ticker": ticker}] + results.append(data) + except Exception as exc: + logger.warning("sec_fetch_failed", cik=cik, error=str(exc)) + return results + + async def collect(self) -> list[NewsItem]: + filings_data = await self._fetch_recent_filings() + items = [] + today = datetime.now(timezone.utc).strftime("%Y-%m-%d") + + for company_data in filings_data: + tickers = [t["ticker"] for t in company_data.get("tickers", [])] + company_name = company_data.get("name", "Unknown") + recent = company_data.get("filings", {}).get("recent", {}) + + forms = recent.get("form", []) + dates = recent.get("filingDate", []) + descriptions = recent.get("primaryDocDescription", []) + accessions = recent.get("accessionNumber", []) + + for i, form in enumerate(forms): + if form != "8-K": + continue + filing_date = dates[i] if i < len(dates) else "" + if filing_date != today: + continue + + desc = descriptions[i] if i < len(descriptions) else "8-K Filing" + accession = accessions[i] if i < len(accessions) else "" + headline = f"{company_name} ({', '.join(tickers)}): {form} - {desc}" + + items.append( + NewsItem( + source=self.name, + headline=headline, + summary=desc, + url=f"https://www.sec.gov/cgi-bin/browse-edgar?action=getcompany&accession={accession}", + published_at=datetime.strptime(filing_date, "%Y-%m-%d").replace(tzinfo=timezone.utc), + symbols=tickers, + sentiment=self._vader.polarity_scores(headline)["compound"], + category=NewsCategory.FILING, + raw_data={"form": form, "accession": accession}, + ) 
+ ) + + return items +``` + +- [ ] **Step 4: Run tests to verify they pass** + +Run: `pytest services/news-collector/tests/test_sec_edgar.py -v` +Expected: All 4 tests PASS + +- [ ] **Step 5: Commit** + +```bash +git add services/news-collector/src/news_collector/collectors/sec_edgar.py services/news-collector/tests/test_sec_edgar.py +git commit -m "feat: implement SEC EDGAR 8-K filing collector" +``` + +--- + +### Task 11: Implement Reddit collector + +**Files:** +- Create: `services/news-collector/src/news_collector/collectors/reddit.py` +- Create: `services/news-collector/tests/test_reddit.py` + +- [ ] **Step 1: Write tests** + +Create `services/news-collector/tests/test_reddit.py`: + +```python +"""Tests for Reddit collector.""" + +import pytest +from unittest.mock import AsyncMock, patch + +from news_collector.collectors.reddit import RedditCollector + + +@pytest.fixture +def collector(): + return RedditCollector() + + +def test_collector_name(collector): + assert collector.name == "reddit" + assert collector.poll_interval == 900 + + +async def test_is_available(collector): + assert await collector.is_available() is True + + +async def test_collect_parses_posts(collector): + mock_posts = [ + { + "data": { + "title": "NVDA to the moon! 
🚀 AI demand is insane", + "selftext": "Just loaded up on NVDA calls", + "url": "https://reddit.com/r/wallstreetbets/123", + "created_utc": 1711929600, + "score": 500, + "num_comments": 200, + "subreddit": "wallstreetbets", + } + }, + ] + with patch.object(collector, "_fetch_subreddit", new_callable=AsyncMock, return_value=mock_posts): + items = await collector.collect() + + assert len(items) >= 1 + assert items[0].source == "reddit" + assert items[0].category.value == "social" + assert isinstance(items[0].sentiment, float) + + +async def test_collect_filters_low_score(collector): + mock_posts = [ + { + "data": { + "title": "Random question about stocks", + "selftext": "", + "url": "https://reddit.com/r/stocks/456", + "created_utc": 1711929600, + "score": 3, + "num_comments": 1, + "subreddit": "stocks", + } + }, + ] + with patch.object(collector, "_fetch_subreddit", new_callable=AsyncMock, return_value=mock_posts): + items = await collector.collect() + assert items == [] +``` + +- [ ] **Step 2: Run tests to verify they fail** + +Run: `pytest services/news-collector/tests/test_reddit.py -v` +Expected: FAIL + +- [ ] **Step 3: Implement RedditCollector** + +Create `services/news-collector/src/news_collector/collectors/reddit.py`: + +```python +"""Reddit collector for r/wallstreetbets, r/stocks, r/investing.""" + +import logging +import re +from datetime import datetime, timezone + +import aiohttp +from nltk.sentiment.vader import SentimentIntensityAnalyzer + +from shared.models import NewsCategory, NewsItem +from news_collector.collectors.base import BaseCollector + +logger = logging.getLogger(__name__) + +SUBREDDITS = ["wallstreetbets", "stocks", "investing"] +MIN_SCORE = 50 # Minimum upvotes to consider + +TICKER_PATTERN = re.compile( + r"\b(AAPL|MSFT|GOOGL|GOOG|AMZN|TSLA|NVDA|META|JPM|V|JNJ|WMT|PG|UNH|HD|" + r"MA|DIS|BAC|XOM|PFE|KO|PEP|CSCO|INTC|VZ|NFLX|ADBE|CRM|AMD|QCOM|" + r"GS|BA|CAT|MMM|IBM|GE|F|GM|NKE|MCD|SBUX|SPY|QQQ|IWM)\b" +) + + +class 
RedditCollector(BaseCollector): + name = "reddit" + poll_interval = 900 # 15 minutes + + def __init__(self) -> None: + self._vader = SentimentIntensityAnalyzer() + + async def is_available(self) -> bool: + return True + + async def _fetch_subreddit(self, subreddit: str = "wallstreetbets") -> list[dict]: + """Fetch hot posts from a subreddit via JSON API.""" + url = f"https://www.reddit.com/r/{subreddit}/hot.json?limit=25" + headers = {"User-Agent": "TradingPlatform/1.0"} + try: + async with aiohttp.ClientSession() as session: + async with session.get( + url, headers=headers, timeout=aiohttp.ClientTimeout(total=10) + ) as resp: + if resp.status != 200: + logger.warning("reddit_fetch_failed", subreddit=subreddit, status=resp.status) + return [] + data = await resp.json() + return data.get("data", {}).get("children", []) + except Exception as exc: + logger.warning("reddit_error", subreddit=subreddit, error=str(exc)) + return [] + + async def collect(self) -> list[NewsItem]: + items = [] + seen_titles = set() + + for subreddit in SUBREDDITS: + posts = await self._fetch_subreddit(subreddit) + for post in posts: + data = post.get("data", {}) + title = data.get("title", "").strip() + score = data.get("score", 0) + + if not title or title in seen_titles or score < MIN_SCORE: + continue + seen_titles.add(title) + + selftext = data.get("selftext", "") + text = f"{title}. 
{selftext}" if selftext else title + symbols = list(set(TICKER_PATTERN.findall(text))) + + items.append( + NewsItem( + source=self.name, + headline=title, + summary=selftext[:500] if selftext else None, + url=data.get("url"), + published_at=datetime.fromtimestamp( + data.get("created_utc", 0), tz=timezone.utc + ), + symbols=symbols, + sentiment=self._vader.polarity_scores(text)["compound"], + category=NewsCategory.SOCIAL, + raw_data={ + "subreddit": data.get("subreddit", subreddit), + "score": score, + "num_comments": data.get("num_comments", 0), + }, + ) + ) + + return items +``` + +- [ ] **Step 4: Run tests to verify they pass** + +Run: `pytest services/news-collector/tests/test_reddit.py -v` +Expected: All 4 tests PASS + +- [ ] **Step 5: Commit** + +```bash +git add services/news-collector/src/news_collector/collectors/reddit.py services/news-collector/tests/test_reddit.py +git commit -m "feat: implement Reddit social sentiment collector" +``` + +--- + +### Task 12: Implement Truth Social and Fed collectors + +**Files:** +- Create: `services/news-collector/src/news_collector/collectors/truth_social.py` +- Create: `services/news-collector/src/news_collector/collectors/fed.py` +- Create: `services/news-collector/tests/test_truth_social.py` +- Create: `services/news-collector/tests/test_fed.py` + +- [ ] **Step 1: Write tests for Truth Social** + +Create `services/news-collector/tests/test_truth_social.py`: + +```python +"""Tests for Truth Social collector.""" + +import pytest +from unittest.mock import AsyncMock, patch + +from news_collector.collectors.truth_social import TruthSocialCollector + + +@pytest.fixture +def collector(): + return TruthSocialCollector() + + +def test_collector_name(collector): + assert collector.name == "truth_social" + assert collector.poll_interval == 900 + + +async def test_is_available(collector): + assert await collector.is_available() is True + + +async def test_collect_parses_posts(collector): + mock_posts = [ + { + "content": "We 
are imposing 25% tariffs on all steel imports!", + "created_at": "2026-04-02T12:00:00.000Z", + "url": "https://truthsocial.com/@realDonaldTrump/12345", + }, + ] + with patch.object(collector, "_fetch_posts", new_callable=AsyncMock, return_value=mock_posts): + items = await collector.collect() + + assert len(items) == 1 + assert items[0].source == "truth_social" + assert items[0].category.value == "policy" + assert "tariff" in items[0].headline.lower() or "tariff" in items[0].raw_data.get("content", "").lower() + + +async def test_collect_handles_empty(collector): + with patch.object(collector, "_fetch_posts", new_callable=AsyncMock, return_value=[]): + items = await collector.collect() + assert items == [] +``` + +- [ ] **Step 2: Write tests for Fed collector** + +Create `services/news-collector/tests/test_fed.py`: + +```python +"""Tests for Federal Reserve collector.""" + +import pytest +from unittest.mock import AsyncMock, patch + +from news_collector.collectors.fed import FedCollector + + +@pytest.fixture +def collector(): + return FedCollector() + + +def test_collector_name(collector): + assert collector.name == "fed" + assert collector.poll_interval == 3600 + + +async def test_is_available(collector): + assert await collector.is_available() is True + + +async def test_collect_parses_rss(collector): + mock_entries = [ + { + "title": "Federal Reserve issues FOMC statement", + "link": "https://www.federalreserve.gov/newsevents/pressreleases/monetary20260402a.htm", + "published_parsed": (2026, 4, 2, 14, 0, 0, 0, 0, 0), + "summary": "The Federal Open Market Committee decided to maintain the target range...", + }, + ] + with patch.object(collector, "_fetch_fed_rss", new_callable=AsyncMock, return_value=mock_entries): + items = await collector.collect() + + assert len(items) == 1 + assert items[0].source == "fed" + assert items[0].category.value == "fed" +``` + +- [ ] **Step 3: Run tests to verify they fail** + +Run: `pytest 
services/news-collector/tests/test_truth_social.py services/news-collector/tests/test_fed.py -v` +Expected: FAIL + +- [ ] **Step 4: Implement TruthSocialCollector** + +Create `services/news-collector/src/news_collector/collectors/truth_social.py`: + +```python +"""Truth Social collector for Trump posts (policy-relevant).""" + +import logging +from datetime import datetime, timezone + +import aiohttp +from nltk.sentiment.vader import SentimentIntensityAnalyzer + +from shared.models import NewsCategory, NewsItem +from news_collector.collectors.base import BaseCollector + +logger = logging.getLogger(__name__) + +# Truth Social uses a Mastodon-compatible API +TRUTH_SOCIAL_API = "https://truthsocial.com/api/v1/accounts/107780257626128497/statuses" + + +class TruthSocialCollector(BaseCollector): + name = "truth_social" + poll_interval = 900 # 15 minutes + + def __init__(self) -> None: + self._vader = SentimentIntensityAnalyzer() + + async def is_available(self) -> bool: + return True + + async def _fetch_posts(self) -> list[dict]: + """Fetch recent posts from Truth Social.""" + headers = {"User-Agent": "Mozilla/5.0"} + try: + async with aiohttp.ClientSession() as session: + async with session.get( + TRUTH_SOCIAL_API, + headers=headers, + params={"limit": 10}, + timeout=aiohttp.ClientTimeout(total=15), + ) as resp: + if resp.status != 200: + logger.warning("truth_social_fetch_failed", status=resp.status) + return [] + return await resp.json() + except Exception as exc: + logger.warning("truth_social_error", error=str(exc)) + return [] + + def _strip_html(self, text: str) -> str: + """Remove HTML tags from content.""" + import re + return re.sub(r"<[^>]+>", "", text).strip() + + async def collect(self) -> list[NewsItem]: + posts = await self._fetch_posts() + items = [] + + for post in posts: + content = self._strip_html(post.get("content", "")) + if not content: + continue + + created_at_str = post.get("created_at", "") + try: + published = 
datetime.fromisoformat(created_at_str.replace("Z", "+00:00")) + except (ValueError, AttributeError): + published = datetime.now(timezone.utc) + + items.append( + NewsItem( + source=self.name, + headline=content[:200], + summary=content if len(content) > 200 else None, + url=post.get("url"), + published_at=published, + symbols=[], # Symbols extracted at aggregation stage via LLM + sentiment=self._vader.polarity_scores(content)["compound"], + category=NewsCategory.POLICY, + raw_data={"content": content, "id": post.get("id")}, + ) + ) + + return items +``` + +- [ ] **Step 5: Implement FedCollector** + +Create `services/news-collector/src/news_collector/collectors/fed.py`: + +```python +"""Federal Reserve press release and FOMC statement collector.""" + +import logging +from calendar import timegm +from datetime import datetime, timezone + +import aiohttp +import feedparser +from nltk.sentiment.vader import SentimentIntensityAnalyzer + +from shared.models import NewsCategory, NewsItem +from news_collector.collectors.base import BaseCollector + +logger = logging.getLogger(__name__) + +FED_RSS_URL = "https://www.federalreserve.gov/feeds/press_all.xml" + + +class FedCollector(BaseCollector): + name = "fed" + poll_interval = 3600 # 1 hour + + def __init__(self) -> None: + self._vader = SentimentIntensityAnalyzer() + + async def is_available(self) -> bool: + return True + + async def _fetch_fed_rss(self) -> list[dict]: + """Fetch Federal Reserve RSS feed entries.""" + try: + async with aiohttp.ClientSession() as session: + async with session.get( + FED_RSS_URL, timeout=aiohttp.ClientTimeout(total=10) + ) as resp: + if resp.status != 200: + logger.warning("fed_rss_failed", status=resp.status) + return [] + text = await resp.text() + feed = feedparser.parse(text) + return feed.get("entries", []) + except Exception as exc: + logger.warning("fed_rss_error", error=str(exc)) + return [] + + def _detect_stance(self, text: str) -> str: + """Detect hawkish/dovish/neutral stance from 
text.""" + text_lower = text.lower() + hawkish_words = ["tighten", "raise", "inflation concern", "restrictive", "higher rates"] + dovish_words = ["accommodate", "cut", "easing", "lower rates", "support growth"] + + hawk_count = sum(1 for w in hawkish_words if w in text_lower) + dove_count = sum(1 for w in dovish_words if w in text_lower) + + if hawk_count > dove_count: + return "hawkish" + if dove_count > hawk_count: + return "dovish" + return "neutral" + + async def collect(self) -> list[NewsItem]: + entries = await self._fetch_fed_rss() + items = [] + + for entry in entries: + title = entry.get("title", "").strip() + if not title: + continue + + summary = entry.get("summary", "") + parsed_time = entry.get("published_parsed") + if parsed_time: + published = datetime.fromtimestamp(timegm(parsed_time), tz=timezone.utc) + else: + published = datetime.now(timezone.utc) + + text = f"{title}. {summary}" if summary else title + + items.append( + NewsItem( + source=self.name, + headline=title, + summary=summary or None, + url=entry.get("link"), + published_at=published, + symbols=[], + sentiment=self._vader.polarity_scores(text)["compound"], + category=NewsCategory.FED, + raw_data={"stance": self._detect_stance(text)}, + ) + ) + + return items +``` + +- [ ] **Step 6: Run tests to verify they pass** + +Run: `pytest services/news-collector/tests/test_truth_social.py services/news-collector/tests/test_fed.py -v` +Expected: All 7 tests PASS + +- [ ] **Step 7: Commit** + +```bash +git add services/news-collector/src/news_collector/collectors/truth_social.py services/news-collector/src/news_collector/collectors/fed.py services/news-collector/tests/test_truth_social.py services/news-collector/tests/test_fed.py +git commit -m "feat: implement Truth Social and Federal Reserve collectors" +``` + +--- + +### Task 13: Implement news-collector main.py (scheduler) + +**Files:** +- Create: `services/news-collector/src/news_collector/main.py` +- Create: 
`services/news-collector/tests/test_main.py` + +- [ ] **Step 1: Write tests** + +Create `services/news-collector/tests/test_main.py`: + +```python +"""Tests for news collector scheduler.""" + +import pytest +from unittest.mock import AsyncMock, patch, MagicMock +from datetime import datetime, timezone + +from shared.models import NewsCategory, NewsItem + +from news_collector.main import run_collector_once + + +async def test_run_collector_once_stores_and_publishes(): + mock_item = NewsItem( + source="test", + headline="Test news", + published_at=datetime(2026, 4, 2, tzinfo=timezone.utc), + sentiment=0.5, + category=NewsCategory.MACRO, + ) + + mock_collector = MagicMock() + mock_collector.name = "test" + mock_collector.collect = AsyncMock(return_value=[mock_item]) + + mock_db = MagicMock() + mock_db.insert_news_item = AsyncMock() + + mock_broker = MagicMock() + mock_broker.publish = AsyncMock() + + count = await run_collector_once(mock_collector, mock_db, mock_broker) + + assert count == 1 + mock_db.insert_news_item.assert_called_once_with(mock_item) + mock_broker.publish.assert_called_once() + + +async def test_run_collector_once_handles_empty(): + mock_collector = MagicMock() + mock_collector.name = "test" + mock_collector.collect = AsyncMock(return_value=[]) + + mock_db = MagicMock() + mock_broker = MagicMock() + + count = await run_collector_once(mock_collector, mock_db, mock_broker) + assert count == 0 +``` + +- [ ] **Step 2: Run tests to verify they fail** + +Run: `pytest services/news-collector/tests/test_main.py -v` +Expected: FAIL + +- [ ] **Step 3: Implement main.py** + +Create `services/news-collector/src/news_collector/main.py`: + +```python +"""News Collector Service — schedules and runs all news collectors.""" + +import asyncio +import logging + +from shared.broker import RedisBroker +from shared.db import Database +from shared.events import NewsEvent +from shared.healthcheck import HealthCheckServer +from shared.logging import setup_logging +from 
shared.metrics import ServiceMetrics +from shared.models import NewsItem +from shared.notifier import TelegramNotifier +from shared.sentiment_models import MarketSentiment + +from news_collector.config import NewsCollectorConfig +from news_collector.collectors.base import BaseCollector +from news_collector.collectors.finnhub import FinnhubCollector +from news_collector.collectors.rss import RSSCollector +from news_collector.collectors.sec_edgar import SecEdgarCollector +from news_collector.collectors.truth_social import TruthSocialCollector +from news_collector.collectors.reddit import RedditCollector +from news_collector.collectors.fear_greed import FearGreedCollector +from news_collector.collectors.fed import FedCollector + +logger = logging.getLogger(__name__) + + +async def run_collector_once( + collector: BaseCollector, + db: Database, + broker: RedisBroker, +) -> int: + """Run a single collector, store results, publish to Redis. + Returns number of items collected.""" + items = await collector.collect() + if not isinstance(items, list): + # FearGreedCollector returns a FearGreedResult, not a list + return 0 + + for item in items: + await db.insert_news_item(item) + event = NewsEvent(data=item) + await broker.publish("news", event.to_dict()) + + return len(items) + + +async def run_collector_loop( + collector: BaseCollector, + db: Database, + broker: RedisBroker, + log, +) -> None: + """Run a collector on its poll interval forever.""" + while True: + try: + if await collector.is_available(): + count = await run_collector_once(collector, db, broker) + log.info("collector_run", collector=collector.name, items=count) + else: + log.debug("collector_unavailable", collector=collector.name) + except Exception as exc: + log.error("collector_error", collector=collector.name, error=str(exc)) + await asyncio.sleep(collector.poll_interval) + + +async def run_fear_greed_loop( + collector: FearGreedCollector, + db: Database, + log, +) -> None: + """Run the Fear & Greed 
collector and update market sentiment.""" + from datetime import datetime, timezone + + while True: + try: + result = await collector.collect() + if result is not None: + ms = MarketSentiment( + fear_greed=result.fear_greed, + fear_greed_label=result.fear_greed_label, + fed_stance="neutral", # Updated by Fed collector analysis + market_regime=_determine_regime(result.fear_greed, None), + updated_at=datetime.now(timezone.utc), + ) + await db.upsert_market_sentiment(ms) + log.info("fear_greed_updated", score=result.fear_greed, label=result.fear_greed_label) + except Exception as exc: + log.error("fear_greed_error", error=str(exc)) + await asyncio.sleep(collector.poll_interval) + + +async def run_aggregator_loop( + db: Database, + interval: int, + log, +) -> None: + """Run sentiment aggregation every `interval` seconds. + Reads recent news from DB, computes per-symbol scores, upserts into symbol_scores table.""" + from datetime import datetime, timezone + from shared.sentiment import SentimentAggregator + + aggregator = SentimentAggregator() + + while True: + try: + now = datetime.now(timezone.utc) + news_items = await db.get_recent_news(hours=24) + if news_items: + scores = aggregator.aggregate(news_items, now) + for symbol_score in scores.values(): + await db.upsert_symbol_score(symbol_score) + log.info("aggregation_complete", symbols=len(scores)) + except Exception as exc: + log.error("aggregation_error", error=str(exc)) + await asyncio.sleep(interval) + + +def _determine_regime(fear_greed: int, vix: float | None) -> str: + """Determine market regime from Fear & Greed and VIX.""" + if fear_greed <= 20: + return "risk_off" + if vix is not None and vix > 30: + return "risk_off" + if fear_greed >= 60 and (vix is None or vix < 20): + return "risk_on" + return "neutral" + + +async def run() -> None: + config = NewsCollectorConfig() + log = setup_logging("news-collector", config.log_level, config.log_format) + metrics = ServiceMetrics("news_collector") + + notifier = 
TelegramNotifier( + bot_token=config.telegram_bot_token, + chat_id=config.telegram_chat_id, + ) + + db = Database(config.database_url) + await db.connect() + + broker = RedisBroker(config.redis_url) + + health = HealthCheckServer( + "news-collector", + port=config.health_port, + auth_token=config.metrics_auth_token, + ) + health.register_check("redis", broker.ping) + await health.start() + metrics.service_up.labels(service="news-collector").set(1) + + # Initialize collectors + news_collectors: list[BaseCollector] = [ + RSSCollector(), + SecEdgarCollector(), + TruthSocialCollector(), + RedditCollector(), + FedCollector(), + ] + + # Finnhub requires API key + if config.finnhub_api_key: + news_collectors.append(FinnhubCollector(api_key=config.finnhub_api_key)) + + fear_greed = FearGreedCollector() + + log.info( + "starting", + collectors=[c.name for c in news_collectors], + fear_greed=True, + ) + + tasks = [] + try: + for collector in news_collectors: + task = asyncio.create_task(run_collector_loop(collector, db, broker, log)) + tasks.append(task) + + tasks.append(asyncio.create_task(run_fear_greed_loop(fear_greed, db, log))) + tasks.append(asyncio.create_task( + run_aggregator_loop(db, config.sentiment_aggregate_interval, log) + )) + + await asyncio.gather(*tasks) + except Exception as exc: + log.error("fatal_error", error=str(exc)) + await notifier.send_error(str(exc), "news-collector") + raise + finally: + for task in tasks: + task.cancel() + metrics.service_up.labels(service="news-collector").set(0) + await notifier.close() + await broker.close() + await db.close() + + +def main() -> None: + asyncio.run(run()) + + +if __name__ == "__main__": + main() +``` + +- [ ] **Step 4: Run tests to verify they pass** + +Run: `pytest services/news-collector/tests/test_main.py -v` +Expected: All 2 tests PASS + +- [ ] **Step 5: Commit** + +```bash +git add services/news-collector/src/news_collector/main.py services/news-collector/tests/test_main.py +git commit -m "feat: 
implement news-collector main scheduler with all collectors" +``` + +--- + +## Phase 3: Sentiment Analysis Pipeline + +### Task 14: Implement sentiment aggregator + +**Files:** +- Modify: `shared/src/shared/sentiment.py` +- Create: `shared/tests/test_sentiment_aggregator.py` + +- [ ] **Step 1: Write tests** + +Create `shared/tests/test_sentiment_aggregator.py`: + +```python +"""Tests for sentiment aggregator.""" + +import pytest +from datetime import datetime, timezone, timedelta + +from shared.sentiment import SentimentAggregator + + +@pytest.fixture +def aggregator(): + return SentimentAggregator() + + +def test_freshness_decay_recent(): + a = SentimentAggregator() + now = datetime.now(timezone.utc) + assert a._freshness_decay(now, now) == 1.0 + + +def test_freshness_decay_3_hours(): + a = SentimentAggregator() + now = datetime.now(timezone.utc) + three_hours_ago = now - timedelta(hours=3) + assert a._freshness_decay(three_hours_ago, now) == 0.7 + + +def test_freshness_decay_12_hours(): + a = SentimentAggregator() + now = datetime.now(timezone.utc) + twelve_hours_ago = now - timedelta(hours=12) + assert a._freshness_decay(twelve_hours_ago, now) == 0.3 + + +def test_freshness_decay_old(): + a = SentimentAggregator() + now = datetime.now(timezone.utc) + two_days_ago = now - timedelta(days=2) + assert a._freshness_decay(two_days_ago, now) == 0.0 + + +def test_compute_composite(): + a = SentimentAggregator() + composite = a._compute_composite( + news_score=0.5, + social_score=0.3, + policy_score=0.8, + filing_score=0.2, + ) + expected = 0.5 * 0.3 + 0.3 * 0.2 + 0.8 * 0.3 + 0.2 * 0.2 + assert abs(composite - expected) < 0.001 + + +def test_aggregate_news_by_symbol(aggregator): + now = datetime.now(timezone.utc) + news_items = [ + { + "symbols": ["AAPL"], + "sentiment": 0.8, + "category": "earnings", + "published_at": now, + }, + { + "symbols": ["AAPL"], + "sentiment": 0.3, + "category": "macro", + "published_at": now - timedelta(hours=2), + }, + { + "symbols": 
["MSFT"], + "sentiment": -0.5, + "category": "policy", + "published_at": now, + }, + ] + scores = aggregator.aggregate(news_items, now) + + assert "AAPL" in scores + assert "MSFT" in scores + assert scores["AAPL"].news_count == 2 + assert scores["AAPL"].news_score > 0 # Positive overall + assert scores["MSFT"].policy_score < 0 # Negative policy + + +def test_aggregate_empty(aggregator): + now = datetime.now(timezone.utc) + scores = aggregator.aggregate([], now) + assert scores == {} + + +def test_determine_regime(): + a = SentimentAggregator() + assert a.determine_regime(15, None) == "risk_off" + assert a.determine_regime(15, 35.0) == "risk_off" + assert a.determine_regime(50, 35.0) == "risk_off" + assert a.determine_regime(70, 15.0) == "risk_on" + assert a.determine_regime(50, 20.0) == "neutral" +``` + +- [ ] **Step 2: Run tests to verify they fail** + +Run: `pytest shared/tests/test_sentiment_aggregator.py -v` +Expected: FAIL — `SentimentAggregator` not in sentiment.py + +- [ ] **Step 3: Add SentimentAggregator to sentiment.py** + +Keep the existing `SentimentData` class (for backward compat with existing tests). 
Add `SentimentAggregator` class at the end of `shared/src/shared/sentiment.py`: + +```python +from datetime import timedelta +from shared.sentiment_models import SymbolScore + + +class SentimentAggregator: + """Aggregates per-news sentiment into per-symbol scores.""" + + # Weights: policy events are most impactful for US stocks + WEIGHTS = { + "news": 0.3, + "social": 0.2, + "policy": 0.3, + "filing": 0.2, + } + + # Category → score field mapping + CATEGORY_MAP = { + "earnings": "news", + "macro": "news", + "social": "social", + "policy": "policy", + "filing": "filing", + "fed": "policy", + } + + def _freshness_decay(self, published_at: datetime, now: datetime) -> float: + """Compute freshness decay factor.""" + age = now - published_at + hours = age.total_seconds() / 3600 + if hours < 1: + return 1.0 + if hours < 6: + return 0.7 + if hours < 24: + return 0.3 + return 0.0 + + def _compute_composite( + self, + news_score: float, + social_score: float, + policy_score: float, + filing_score: float, + ) -> float: + return ( + news_score * self.WEIGHTS["news"] + + social_score * self.WEIGHTS["social"] + + policy_score * self.WEIGHTS["policy"] + + filing_score * self.WEIGHTS["filing"] + ) + + def aggregate( + self, news_items: list[dict], now: datetime + ) -> dict[str, SymbolScore]: + """Aggregate news items into per-symbol scores. + + Each news_items dict must have: symbols, sentiment, category, published_at. + Returns dict mapping symbol → SymbolScore. 
+ """ + # Accumulate per-symbol, per-category + symbol_data: dict[str, dict] = {} + + for item in news_items: + decay = self._freshness_decay(item["published_at"], now) + if decay == 0.0: + continue + + category = item.get("category", "macro") + score_field = self.CATEGORY_MAP.get(category, "news") + weighted_sentiment = item["sentiment"] * decay + + for symbol in item.get("symbols", []): + if symbol not in symbol_data: + symbol_data[symbol] = { + "news_scores": [], + "social_scores": [], + "policy_scores": [], + "filing_scores": [], + "count": 0, + } + + symbol_data[symbol][f"{score_field}_scores"].append(weighted_sentiment) + symbol_data[symbol]["count"] += 1 + + # Compute averages and composites + result = {} + for symbol, data in symbol_data.items(): + news_score = _safe_avg(data["news_scores"]) + social_score = _safe_avg(data["social_scores"]) + policy_score = _safe_avg(data["policy_scores"]) + filing_score = _safe_avg(data["filing_scores"]) + + result[symbol] = SymbolScore( + symbol=symbol, + news_score=news_score, + news_count=data["count"], + social_score=social_score, + policy_score=policy_score, + filing_score=filing_score, + composite=self._compute_composite( + news_score, social_score, policy_score, filing_score + ), + updated_at=now, + ) + + return result + + def determine_regime(self, fear_greed: int, vix: float | None) -> str: + """Determine market regime.""" + if fear_greed <= 20: + return "risk_off" + if vix is not None and vix > 30: + return "risk_off" + if fear_greed >= 60 and (vix is None or vix < 20): + return "risk_on" + return "neutral" + + +def _safe_avg(values: list[float]) -> float: + """Return average of values, or 0.0 if empty.""" + if not values: + return 0.0 + return sum(values) / len(values) +``` + +- [ ] **Step 4: Run new tests to verify they pass** + +Run: `pytest shared/tests/test_sentiment_aggregator.py -v` +Expected: All 9 tests PASS + +- [ ] **Step 5: Run existing sentiment tests for regressions** + +Run: `pytest 
shared/tests/test_sentiment.py -v` +Expected: All existing tests PASS (SentimentData unchanged) + +- [ ] **Step 6: Commit** + +```bash +git add shared/src/shared/sentiment.py shared/tests/test_sentiment_aggregator.py +git commit -m "feat: implement SentimentAggregator with freshness decay and composite scoring" +``` + +--- + +## Phase 4: Stock Selector Engine + +### Task 15: Implement stock selector + +**Files:** +- Create: `services/strategy-engine/src/strategy_engine/stock_selector.py` +- Create: `services/strategy-engine/tests/test_stock_selector.py` + +- [ ] **Step 1: Write tests** + +Create `services/strategy-engine/tests/test_stock_selector.py`: + +```python +"""Tests for stock selector engine.""" + +import pytest +from unittest.mock import AsyncMock, MagicMock, patch +from datetime import datetime, timezone +from decimal import Decimal + +from shared.models import OrderSide +from shared.sentiment_models import SymbolScore, MarketSentiment, SelectedStock, Candidate + +from strategy_engine.stock_selector import ( + SentimentCandidateSource, + StockSelector, + _parse_llm_selections, +) + + +async def test_sentiment_candidate_source(): + mock_db = MagicMock() + mock_db.get_top_symbol_scores = AsyncMock(return_value=[ + {"symbol": "AAPL", "composite": 0.8, "news_count": 5}, + {"symbol": "NVDA", "composite": 0.6, "news_count": 3}, + ]) + + source = SentimentCandidateSource(mock_db) + candidates = await source.get_candidates() + + assert len(candidates) == 2 + assert candidates[0].symbol == "AAPL" + assert candidates[0].source == "sentiment" + + +def test_parse_llm_selections_valid(): + llm_response = """ + [ + {"symbol": "NVDA", "side": "BUY", "conviction": 0.85, "reason": "AI demand", "key_news": ["NVDA beats earnings"]}, + {"symbol": "XOM", "side": "BUY", "conviction": 0.72, "reason": "Oil surge", "key_news": ["Oil prices up"]} + ] + """ + selections = _parse_llm_selections(llm_response) + assert len(selections) == 2 + assert selections[0].symbol == "NVDA" + 
assert selections[0].conviction == 0.85 + + +def test_parse_llm_selections_invalid(): + selections = _parse_llm_selections("not json") + assert selections == [] + + +def test_parse_llm_selections_with_markdown(): + llm_response = """ + Here are my picks: + ```json + [ + {"symbol": "TSLA", "side": "BUY", "conviction": 0.7, "reason": "Momentum", "key_news": ["Tesla rally"]} + ] + ``` + """ + selections = _parse_llm_selections(llm_response) + assert len(selections) == 1 + assert selections[0].symbol == "TSLA" + + +async def test_selector_blocks_on_risk_off(): + mock_db = MagicMock() + mock_db.get_latest_market_sentiment = AsyncMock(return_value={ + "fear_greed": 15, + "fear_greed_label": "Extreme Fear", + "vix": 35.0, + "fed_stance": "neutral", + "market_regime": "risk_off", + "updated_at": datetime.now(timezone.utc), + }) + + selector = StockSelector(db=mock_db, broker=MagicMock(), alpaca=MagicMock(), anthropic_api_key="test") + result = await selector.select() + assert result == [] +``` + +- [ ] **Step 2: Run tests to verify they fail** + +Run: `pytest services/strategy-engine/tests/test_stock_selector.py -v` +Expected: FAIL — module not found + +- [ ] **Step 3: Implement StockSelector** + +Create `services/strategy-engine/src/strategy_engine/stock_selector.py`: + +```python +"""Stock Selector Engine — 3-stage dynamic stock selection for MOC trading.""" + +import json +import logging +import re +from datetime import datetime, timezone +from decimal import Decimal +from typing import Optional + +import aiohttp + +from shared.alpaca import AlpacaClient +from shared.broker import RedisBroker +from shared.db import Database +from shared.models import OrderSide +from shared.sentiment_models import Candidate, MarketSentiment, SelectedStock, SymbolScore + +logger = logging.getLogger(__name__) + + +class SentimentCandidateSource: + """Get candidate stocks from sentiment scores in DB.""" + + def __init__(self, db: Database, limit: int = 20) -> None: + self._db = db + 
self._limit = limit + + async def get_candidates(self) -> list[Candidate]: + scores = await self._db.get_top_symbol_scores(limit=self._limit) + return [ + Candidate( + symbol=s["symbol"], + source="sentiment", + score=s["composite"], + reason=f"Sentiment composite={s['composite']:.2f}, news_count={s['news_count']}", + ) + for s in scores + if s["composite"] != 0 + ] + + +class LLMCandidateSource: + """Get candidate stocks by asking Claude to analyze today's top news.""" + + def __init__(self, db: Database, api_key: str, model: str) -> None: + self._db = db + self._api_key = api_key + self._model = model + + async def get_candidates(self) -> list[Candidate]: + news = await self._db.get_recent_news(hours=24) + if not news: + return [] + + headlines = [f"- [{n['source']}] {n['headline']} (sentiment: {n['sentiment']:.2f})" for n in news[:50]] + prompt = ( + "You are a stock market analyst. Based on today's news headlines below, " + "identify US stocks that are most likely to be affected (positively or negatively). " + "Return a JSON array of objects with: symbol, direction (BUY or SELL), score (0-1), reason.\n\n" + "Headlines:\n" + "\n".join(headlines) + "\n\n" + "Return ONLY the JSON array, no other text." 
+ ) + + try: + async with aiohttp.ClientSession() as session: + async with session.post( + "https://api.anthropic.com/v1/messages", + headers={ + "x-api-key": self._api_key, + "anthropic-version": "2023-06-01", + "content-type": "application/json", + }, + json={ + "model": self._model, + "max_tokens": 1024, + "messages": [{"role": "user", "content": prompt}], + }, + timeout=aiohttp.ClientTimeout(total=30), + ) as resp: + if resp.status != 200: + logger.warning("llm_candidate_failed", status=resp.status) + return [] + data = await resp.json() + text = data["content"][0]["text"] + except Exception as exc: + logger.error("llm_candidate_error", error=str(exc)) + return [] + + return self._parse_response(text) + + def _parse_response(self, text: str) -> list[Candidate]: + try: + # Extract JSON from possible markdown code blocks + json_match = re.search(r"```(?:json)?\s*(\[.*?\])\s*```", text, re.DOTALL) + if json_match: + text = json_match.group(1) + items = json.loads(text) + except (json.JSONDecodeError, TypeError): + return [] + + candidates = [] + for item in items: + try: + direction = OrderSide(item.get("direction", "BUY")) + candidates.append( + Candidate( + symbol=item["symbol"], + source="llm", + direction=direction, + score=float(item.get("score", 0.5)), + reason=item.get("reason", "LLM recommendation"), + ) + ) + except (KeyError, ValueError): + continue + + return candidates + + +class StockSelector: + """3-stage stock selector: candidates → technical filter → LLM final pick.""" + + def __init__( + self, + db: Database, + broker: RedisBroker, + alpaca: AlpacaClient, + anthropic_api_key: str, + anthropic_model: str = "claude-sonnet-4-20250514", + max_picks: int = 3, + ) -> None: + self._db = db + self._broker = broker + self._alpaca = alpaca + self._api_key = anthropic_api_key + self._model = anthropic_model + self._max_picks = max_picks + self._sentiment_source = SentimentCandidateSource(db) + self._llm_source = LLMCandidateSource(db, anthropic_api_key, 
anthropic_model) + + async def select(self) -> list[SelectedStock]: + """Run full 3-stage selection. Returns list of SelectedStock.""" + # Check market sentiment gate + ms = await self._db.get_latest_market_sentiment() + if ms and ms.get("market_regime") == "risk_off": + logger.info("selection_blocked_risk_off") + return [] + + # Stage 1: Candidate pool + sentiment_candidates = await self._sentiment_source.get_candidates() + llm_candidates = await self._llm_source.get_candidates() + candidates = self._merge_candidates(sentiment_candidates, llm_candidates) + + if not candidates: + logger.info("no_candidates_found") + return [] + + logger.info("candidates_found", count=len(candidates)) + + # Stage 2: Technical filter + filtered = await self._technical_filter(candidates) + if not filtered: + logger.info("all_candidates_filtered_out") + return [] + + logger.info("technical_filter_passed", count=len(filtered)) + + # Stage 3: LLM final selection + selections = await self._llm_final_select(filtered, ms) + + # Publish to Redis + for selection in selections: + await self._broker.publish( + "selected_stocks", + selection.model_dump(mode="json"), + ) + + # Persist audit trail + from datetime import date as date_type + + for selection in selections: + score_data = await self._db.get_top_symbol_scores(limit=100) + snapshot = next( + (s for s in score_data if s["symbol"] == selection.symbol), + {}, + ) + await self._db.insert_stock_selection( + trade_date=date_type.today(), + symbol=selection.symbol, + side=selection.side.value, + conviction=selection.conviction, + reason=selection.reason, + key_news=selection.key_news, + sentiment_snapshot=snapshot, + ) + + return selections + + def _merge_candidates( + self, + sentiment: list[Candidate], + llm: list[Candidate], + ) -> list[Candidate]: + """Merge and deduplicate candidates, preferring higher scores.""" + by_symbol: dict[str, Candidate] = {} + for c in sentiment + llm: + if c.symbol not in by_symbol or c.score > 
by_symbol[c.symbol].score: + by_symbol[c.symbol] = c + return sorted(by_symbol.values(), key=lambda c: c.score, reverse=True) + + async def _technical_filter(self, candidates: list[Candidate]) -> list[Candidate]: + """Apply MOC-style technical screening to candidates.""" + import pandas as pd + + passed = [] + for candidate in candidates: + try: + bars = await self._alpaca.get_bars( + candidate.symbol, timeframe="1Day", limit=30 + ) + if not bars or len(bars) < 21: + continue + + closes = pd.Series([float(b["c"]) for b in bars]) + volumes = pd.Series([float(b["v"]) for b in bars]) + + # RSI + delta = closes.diff() + gain = delta.clip(lower=0) + loss = -delta.clip(upper=0) + avg_gain = gain.ewm(com=13, min_periods=14).mean() + avg_loss = loss.ewm(com=13, min_periods=14).mean() + rs = avg_gain / avg_loss.replace(0, float("nan")) + rsi = 100 - (100 / (1 + rs)) + current_rsi = rsi.iloc[-1] + + if pd.isna(current_rsi) or not (30 <= current_rsi <= 70): + continue + + # EMA + ema20 = closes.ewm(span=20, adjust=False).mean().iloc[-1] + if closes.iloc[-1] < ema20: + continue + + # Volume above average + vol_avg = volumes.iloc[-20:].mean() + if vol_avg > 0 and volumes.iloc[-1] < vol_avg * 0.5: + continue + + passed.append(candidate) + + except Exception as exc: + logger.warning("technical_filter_error", symbol=candidate.symbol, error=str(exc)) + continue + + return passed + + async def _llm_final_select( + self, + candidates: list[Candidate], + market_sentiment: Optional[dict], + ) -> list[SelectedStock]: + """Ask Claude to make final 2-3 picks from filtered candidates.""" + # Build context + candidate_info = [] + for c in candidates[:15]: + candidate_info.append(f"- {c.symbol}: score={c.score:.2f}, source={c.source}, reason={c.reason}") + + news = await self._db.get_recent_news(hours=12) + top_news = [f"- [{n['source']}] {n['headline']}" for n in news[:20]] + + ms_info = "No market sentiment data available." 
+ if market_sentiment: + ms_info = ( + f"Fear & Greed: {market_sentiment.get('fear_greed', 'N/A')} " + f"({market_sentiment.get('fear_greed_label', 'N/A')}), " + f"VIX: {market_sentiment.get('vix', 'N/A')}, " + f"Fed Stance: {market_sentiment.get('fed_stance', 'N/A')}" + ) + + prompt = ( + f"You are a professional stock trader selecting {self._max_picks} stocks for " + f"Market-on-Close (MOC) overnight trading. You buy at market close and sell at " + f"next day's open.\n\n" + f"## Market Conditions\n{ms_info}\n\n" + f"## Candidate Stocks (pre-screened technically)\n" + + "\n".join(candidate_info) + "\n\n" + f"## Today's Key News\n" + + "\n".join(top_news) + "\n\n" + f"Select the best {self._max_picks} stocks. For each, provide:\n" + f"- symbol: ticker\n" + f"- side: BUY or SELL\n" + f"- conviction: 0.0-1.0\n" + f"- reason: one sentence\n" + f"- key_news: list of relevant headlines\n\n" + f"Return ONLY a JSON array. No other text." + ) + + try: + async with aiohttp.ClientSession() as session: + async with session.post( + "https://api.anthropic.com/v1/messages", + headers={ + "x-api-key": self._api_key, + "anthropic-version": "2023-06-01", + "content-type": "application/json", + }, + json={ + "model": self._model, + "max_tokens": 1024, + "messages": [{"role": "user", "content": prompt}], + }, + timeout=aiohttp.ClientTimeout(total=30), + ) as resp: + if resp.status != 200: + logger.error("llm_final_select_failed", status=resp.status) + return [] + data = await resp.json() + text = data["content"][0]["text"] + except Exception as exc: + logger.error("llm_final_select_error", error=str(exc)) + return [] + + return _parse_llm_selections(text) + + +def _parse_llm_selections(text: str) -> list[SelectedStock]: + """Parse LLM response into SelectedStock list.""" + try: + json_match = re.search(r"```(?:json)?\s*(\[.*?\])\s*```", text, re.DOTALL) + if json_match: + text = json_match.group(1) + # Also try to find a bare JSON array + array_match = re.search(r"\[.*\]", text, 
re.DOTALL) + if array_match: + text = array_match.group(0) + items = json.loads(text) + except (json.JSONDecodeError, TypeError): + return [] + + selections = [] + for item in items: + try: + selections.append( + SelectedStock( + symbol=item["symbol"], + side=OrderSide(item.get("side", "BUY")), + conviction=float(item.get("conviction", 0.5)), + reason=item.get("reason", ""), + key_news=item.get("key_news", []), + ) + ) + except (KeyError, ValueError): + continue + + return selections +``` + +- [ ] **Step 4: Run tests to verify they pass** + +Run: `pytest services/strategy-engine/tests/test_stock_selector.py -v` +Expected: All 5 tests PASS + +- [ ] **Step 5: Run all strategy engine tests for regressions** + +Run: `pytest services/strategy-engine/tests/ -v` +Expected: All tests PASS + +- [ ] **Step 6: Commit** + +```bash +git add services/strategy-engine/src/strategy_engine/stock_selector.py services/strategy-engine/tests/test_stock_selector.py +git commit -m "feat: implement 3-stage stock selector (sentiment → technical → LLM)" +``` + +--- + +## Phase 5: Integration (MOC + Notifications + Docker) + +### Task 16: Add Telegram notification for stock selections + +**Files:** +- Modify: `shared/src/shared/notifier.py` +- Modify: `shared/tests/test_notifier.py` + +- [ ] **Step 1: Add send_stock_selection method to notifier.py** + +Add this method and import to `shared/src/shared/notifier.py`: + +Add to imports: +```python +from shared.sentiment_models import SelectedStock, MarketSentiment +``` + +Add method to `TelegramNotifier` class: + +```python + async def send_stock_selection( + self, + selections: list[SelectedStock], + market: MarketSentiment | None = None, + ) -> None: + """Format and send stock selection notification.""" + lines = [f"<b>📊 Stock Selection ({len(selections)} picks)</b>", ""] + + side_emoji = {"BUY": "🟢", "SELL": "🔴"} + + for i, s in enumerate(selections, 1): + emoji = side_emoji.get(s.side.value, "⚪") + lines.append( + f"{i}. 
<b>{s.symbol}</b> {emoji} {s.side.value} " + f"(conviction: {s.conviction:.0%})" + ) + lines.append(f" {s.reason}") + if s.key_news: + lines.append(f" News: {s.key_news[0]}") + lines.append("") + + if market: + lines.append( + f"Market: F&G {market.fear_greed} ({market.fear_greed_label})" + + (f" | VIX {market.vix:.1f}" if market.vix else "") + ) + + await self.send("\n".join(lines)) +``` + +- [ ] **Step 2: Add test for the new method** + +Add to `shared/tests/test_notifier.py`: + +```python +from shared.models import OrderSide +from shared.sentiment_models import SelectedStock, MarketSentiment +from datetime import datetime, timezone + + +async def test_send_stock_selection(notifier, mock_session): + """Test stock selection notification formatting.""" + selections = [ + SelectedStock( + symbol="NVDA", + side=OrderSide.BUY, + conviction=0.85, + reason="CHIPS Act expansion", + key_news=["Trump signs CHIPS Act"], + ), + ] + market = MarketSentiment( + fear_greed=55, + fear_greed_label="Neutral", + vix=18.2, + fed_stance="neutral", + market_regime="neutral", + updated_at=datetime.now(timezone.utc), + ) + await notifier.send_stock_selection(selections, market) + mock_session.post.assert_called_once() +``` + +Note: Check `shared/tests/test_notifier.py` for existing fixture names (`notifier`, `mock_session`) and adapt accordingly. 
+ +- [ ] **Step 3: Run notifier tests** + +Run: `pytest shared/tests/test_notifier.py -v` +Expected: All tests PASS + +- [ ] **Step 4: Commit** + +```bash +git add shared/src/shared/notifier.py shared/tests/test_notifier.py +git commit -m "feat: add Telegram notification for stock selections" +``` + +--- + +### Task 17: Integrate stock selector with MOC strategy + +**Files:** +- Modify: `services/strategy-engine/src/strategy_engine/main.py` +- Modify: `services/strategy-engine/src/strategy_engine/config.py` + +- [ ] **Step 1: Update strategy engine config** + +Add to `StrategyConfig` in `services/strategy-engine/src/strategy_engine/config.py`: + +```python + selector_candidates_time: str = "15:00" + selector_filter_time: str = "15:15" + selector_final_time: str = "15:30" + selector_max_picks: int = 3 + anthropic_api_key: str = "" + anthropic_model: str = "claude-sonnet-4-20250514" +``` + +- [ ] **Step 2: Add stock selector scheduling to main.py** + +Add a new coroutine to `services/strategy-engine/src/strategy_engine/main.py` that runs the stock selector at the configured times. 
Add imports: + +```python +from shared.alpaca import AlpacaClient +from shared.db import Database +from shared.notifier import TelegramNotifier +from shared.sentiment_models import MarketSentiment +from strategy_engine.stock_selector import StockSelector +``` + +Add the selector loop function: + +```python +async def run_stock_selector( + selector: StockSelector, + notifier: TelegramNotifier, + db: Database, + config: StrategyConfig, + log, +) -> None: + """Run the stock selector once per day at the configured time.""" + import zoneinfo + + et = zoneinfo.ZoneInfo("America/New_York") + + while True: + now_et = datetime.now(et) + target_hour, target_min = map(int, config.selector_final_time.split(":")) + + # Check if it's time to run (within 1-minute window) + if now_et.hour == target_hour and now_et.minute == target_min: + log.info("stock_selector_running") + try: + selections = await selector.select() + if selections: + ms_data = await db.get_latest_market_sentiment() + ms = None + if ms_data: + ms = MarketSentiment(**ms_data) + await notifier.send_stock_selection(selections, ms) + log.info( + "stock_selector_complete", + picks=[s.symbol for s in selections], + ) + else: + log.info("stock_selector_no_picks") + except Exception as exc: + log.error("stock_selector_error", error=str(exc)) + # Sleep past this minute to avoid re-triggering + await asyncio.sleep(120) + else: + await asyncio.sleep(30) +``` + +In the `run()` function, add after creating the broker: + +```python + db = Database(config.database_url) + await db.connect() + + alpaca = AlpacaClient( + api_key=config.alpaca_api_key, + api_secret=config.alpaca_api_secret, + paper=config.alpaca_paper, + ) +``` + +And add the selector if anthropic key is configured: + +```python + if config.anthropic_api_key: + selector = StockSelector( + db=db, + broker=broker, + alpaca=alpaca, + anthropic_api_key=config.anthropic_api_key, + anthropic_model=config.anthropic_model, + max_picks=config.selector_max_picks, + ) + 
tasks.append(asyncio.create_task( + run_stock_selector(selector, notifier, db, config, log) + )) + log.info("stock_selector_enabled", time=config.selector_final_time) +``` + +Add to the `finally` block: + +```python + await alpaca.close() + await db.close() +``` + +- [ ] **Step 3: Run strategy engine tests for regressions** + +Run: `pytest services/strategy-engine/tests/ -v` +Expected: All tests PASS + +- [ ] **Step 4: Commit** + +```bash +git add services/strategy-engine/src/strategy_engine/main.py services/strategy-engine/src/strategy_engine/config.py +git commit -m "feat: integrate stock selector into strategy engine scheduler" +``` + +--- + +### Task 18: Update Docker Compose and .env + +**Files:** +- Modify: `docker-compose.yml` +- Modify: `.env.example` (already done in Task 5, just verify) + +- [ ] **Step 1: Add news-collector service to docker-compose.yml** + +Add before the `loki:` service block in `docker-compose.yml`: + +```yaml + news-collector: + build: + context: . + dockerfile: services/news-collector/Dockerfile + env_file: .env + ports: + - "8084:8084" + depends_on: + redis: + condition: service_healthy + postgres: + condition: service_healthy + healthcheck: + test: ["CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:8084/health')"] + interval: 10s + timeout: 5s + retries: 3 + restart: unless-stopped +``` + +- [ ] **Step 2: Verify compose file is valid** + +Run: `docker compose config --quiet 2>&1 || echo "INVALID"` +Expected: No output (valid) or compose config displayed without errors + +- [ ] **Step 3: Commit** + +```bash +git add docker-compose.yml +git commit -m "feat: add news-collector service to Docker Compose" +``` + +--- + +### Task 19: Run full test suite and lint + +- [ ] **Step 1: Install test dependencies** + +Run: `pip install -e shared/ && pip install aiosqlite feedparser nltk aioresponses` + +- [ ] **Step 2: Download VADER lexicon** + +Run: `python -c "import nltk; nltk.download('vader_lexicon', 
quiet=True)"` + +- [ ] **Step 3: Run lint** + +Run: `make lint` +Expected: No lint errors. If there are errors, fix them. + +- [ ] **Step 4: Run full test suite** + +Run: `make test` +Expected: All tests PASS + +- [ ] **Step 5: Fix any issues found in steps 3-4** + +If lint or tests fail, fix the issues and re-run. + +- [ ] **Step 6: Final commit if any fixes were needed** + +```bash +git add -A +git commit -m "fix: resolve lint and test issues from news selector integration" +``` diff --git a/docs/superpowers/specs/2026-04-02-news-driven-stock-selector-design.md b/docs/superpowers/specs/2026-04-02-news-driven-stock-selector-design.md new file mode 100644 index 0000000..d439154 --- /dev/null +++ b/docs/superpowers/specs/2026-04-02-news-driven-stock-selector-design.md @@ -0,0 +1,418 @@ +# News-Driven Stock Selector Design + +**Date:** 2026-04-02 +**Goal:** Upgrade the MOC (Market on Close) strategy from fixed symbol lists to dynamic, news-driven stock selection. The system collects news/sentiment data continuously, then selects 2-3 optimal stocks daily before market close. + +--- + +## Architecture Overview + +``` +[Continuous Collection] [Pre-Close Decision] +Finnhub News ─┐ +RSS Feeds ─┤ +SEC EDGAR ─┤ +Truth Social ─┼→ DB (news_items) → Sentiment Aggregator → symbol_scores +Reddit ─┤ + Redis "news" (every 15 min) market_sentiment +Fear & Greed ─┤ +FOMC/Fed ─┘ + + 15:00 ET ─→ Candidate Pool (sentiment top + LLM picks) + 15:15 ET ─→ Technical Filter (RSI, EMA, volume) + 15:30 ET ─→ LLM Final Selection (2-3 stocks) → Telegram + 15:50 ET ─→ MOC Buy Execution + 09:35 ET ─→ Next-day Sell (existing MOC logic) +``` + +## 1. 
News Collector Service + +New service: `services/news-collector/` + +### Structure + +``` +services/news-collector/ +├── Dockerfile +├── pyproject.toml +├── src/news_collector/ +│ ├── __init__.py +│ ├── main.py # Scheduler: runs each collector on its interval +│ ├── config.py +│ └── collectors/ +│ ├── __init__.py +│ ├── base.py # BaseCollector ABC +│ ├── finnhub.py # Finnhub market news (free, 60 req/min) +│ ├── rss.py # Yahoo Finance, Google News, MarketWatch RSS +│ ├── sec_edgar.py # SEC EDGAR 8-K/10-Q filings +│ ├── truth_social.py # Truth Social scraping (Trump posts) +│ ├── reddit.py # Reddit (r/wallstreetbets, r/stocks) +│ ├── fear_greed.py # CNN Fear & Greed Index scraping +│ └── fed.py # FOMC statements, Fed announcements +└── tests/ +``` + +### BaseCollector Interface + +```python +class BaseCollector(ABC): + name: str + poll_interval: int # seconds + + @abstractmethod + async def collect(self) -> list[NewsItem]: + """Collect and return list of NewsItem.""" + + @abstractmethod + async def is_available(self) -> bool: + """Check if this source is accessible (API key present, endpoint reachable).""" +``` + +### Poll Intervals + +| Collector | Interval | Notes | +|-----------|----------|-------| +| Finnhub | 5 min | Free tier: 60 calls/min | +| RSS (Yahoo/Google/MarketWatch) | 10 min | Headlines only | +| SEC EDGAR | 30 min | Focus on 8-K filings | +| Truth Social | 15 min | Scraping | +| Reddit | 15 min | Hot posts from relevant subs | +| Fear & Greed | 1 hour | Updates once daily but check periodically | +| FOMC/Fed | 1 hour | Infrequent events | + +### Provider Abstraction (for paid upgrade path) + +```python +# config.yaml +collectors: + news: + provider: "finnhub" # swap to "benzinga" for paid + api_key: ${FINNHUB_API_KEY} + social: + provider: "reddit" # swap to "stocktwits_pro" etc. + policy: + provider: "truth_social" # swap to "twitter_api" etc. 
+ +# Factory +COLLECTOR_REGISTRY = { + "finnhub": FinnhubCollector, + "rss": RSSCollector, + "benzinga": BenzingaCollector, # added later +} +``` + +## 2. Shared Models (additions to shared/) + +### NewsItem (shared/models.py) + +```python +class NewsCategory(str, Enum): + POLICY = "policy" + EARNINGS = "earnings" + MACRO = "macro" + SOCIAL = "social" + FILING = "filing" + FED = "fed" + +class NewsItem(BaseModel): + id: str = Field(default_factory=lambda: str(uuid.uuid4())) + source: str # "finnhub", "rss", "sec_edgar", etc. + headline: str + summary: str | None = None + url: str | None = None + published_at: datetime + symbols: list[str] = [] # Related tickers (if identifiable) + sentiment: float # -1.0 to 1.0 (first-pass analysis at collection) + category: NewsCategory + raw_data: dict = {} + created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) +``` + +### SymbolScore (shared/sentiment_models.py — new file) + +```python +class SymbolScore(BaseModel): + symbol: str + news_score: float # -1.0 to 1.0, weighted avg of news sentiment + news_count: int # Number of news items in last 24h + social_score: float # Reddit/social sentiment + policy_score: float # Policy-related impact + filing_score: float # SEC filing impact + composite: float # Weighted final score + updated_at: datetime + +class MarketSentiment(BaseModel): + fear_greed: int # 0-100 + fear_greed_label: str # "Extreme Fear", "Fear", "Neutral", "Greed", "Extreme Greed" + vix: float | None = None + fed_stance: str # "hawkish", "neutral", "dovish" + market_regime: str # "risk_on", "neutral", "risk_off" + updated_at: datetime + +class SelectedStock(BaseModel): + symbol: str + side: OrderSide # BUY or SELL + conviction: float # 0.0 to 1.0 + reason: str # Selection rationale + key_news: list[str] # Key news headlines + +class Candidate(BaseModel): + symbol: str + source: str # "sentiment" or "llm" + direction: OrderSide | None = None # Suggested direction (if known) + score: float # 
Relevance/priority score + reason: str # Why this candidate was selected +``` + +## 3. Sentiment Analysis Pipeline + +### Location + +Refactor existing `shared/src/shared/sentiment.py`. + +### Two-Stage Analysis + +**Stage 1: Per-news sentiment (at collection time)** +- VADER (nltk.sentiment, free) for English headlines +- Keyword rule engine for domain-specific terms (e.g., "tariff" → negative for importers, positive for domestic producers) +- Score stored in `NewsItem.sentiment` + +**Stage 2: Per-symbol aggregation (every 15 minutes)** + +``` +composite = ( + news_score * 0.3 + + social_score * 0.2 + + policy_score * 0.3 + + filing_score * 0.2 +) * freshness_decay +``` + +Freshness decay: +- < 1 hour: 1.0 +- 1-6 hours: 0.7 +- 6-24 hours: 0.3 +- > 24 hours: excluded + +Policy score weighted high because US stock market is heavily influenced by policy events (tariffs, regulation, subsidies). + +### Market-Level Gating + +`MarketSentiment.market_regime` determination: +- `risk_off`: Fear & Greed < 20 OR VIX > 30 → **block all trades** +- `risk_on`: Fear & Greed > 60 AND VIX < 20 +- `neutral`: everything else + +This extends the existing `sentiment.py` `should_block()` logic. + +## 4. 
Stock Selector Engine + +### Location + +`services/strategy-engine/src/strategy_engine/stock_selector.py` + +### Three-Stage Selection Process + +**Stage 1: Candidate Pool (15:00 ET)** + +Two candidate sources, results merged (deduplicated): + +```python +class CandidateSource(ABC): + @abstractmethod + async def get_candidates(self) -> list[Candidate] + +class SentimentCandidateSource(CandidateSource): + """Top N symbols by composite SymbolScore from DB.""" + +class LLMCandidateSource(CandidateSource): + """Send today's top news summary to Claude, get related symbols + direction.""" +``` + +- SentimentCandidateSource: top 20 by composite score +- LLMCandidateSource: Claude analyzes today's major news and recommends affected symbols +- Merged pool: typically 20-30 candidates + +**Stage 2: Technical Filter (15:15 ET)** + +Apply existing MOC screening criteria to candidates: +- Fetch recent price data from Alpaca for all candidates +- RSI 30-60 +- Price > 20-period EMA +- Volume > average +- Bullish candle pattern +- Result: typically 5-10 survivors + +**Stage 3: LLM Final Selection (15:30 ET)** + +Send to Claude: +- Filtered candidate list with technical indicators +- Per-symbol sentiment scores and top news headlines +- Market sentiment (Fear & Greed, VIX, Fed stance) +- Prompt: "Select 2-3 stocks for MOC trading with rationale" + +Response parsed into `list[SelectedStock]`. + +### Integration with MOC Strategy + +Current: MOC strategy receives candles for fixed symbols and decides internally. + +New flow: +1. `StockSelector` publishes `SelectedStock` list to Redis stream `selected_stocks` at 15:30 ET +2. MOC strategy reads `selected_stocks` to get today's targets +3. MOC still applies its own technical checks at 15:50-16:00 as a safety net +4. If a selected stock fails the final technical check, it's skipped (no forced trades) + +## 5. 
Database Schema + +Four new tables via Alembic migration: + +```sql +CREATE TABLE news_items ( + id UUID PRIMARY KEY, + source VARCHAR(50) NOT NULL, + headline TEXT NOT NULL, + summary TEXT, + url TEXT, + published_at TIMESTAMPTZ NOT NULL, + symbols TEXT[], + sentiment FLOAT NOT NULL, + category VARCHAR(50) NOT NULL, + raw_data JSONB DEFAULT '{}', + created_at TIMESTAMPTZ DEFAULT NOW() +); +CREATE INDEX idx_news_items_published ON news_items(published_at); +CREATE INDEX idx_news_items_symbols ON news_items USING GIN(symbols); + +CREATE TABLE symbol_scores ( + id UUID PRIMARY KEY, + symbol VARCHAR(10) NOT NULL, + news_score FLOAT NOT NULL DEFAULT 0, + news_count INT NOT NULL DEFAULT 0, + social_score FLOAT NOT NULL DEFAULT 0, + policy_score FLOAT NOT NULL DEFAULT 0, + filing_score FLOAT NOT NULL DEFAULT 0, + composite FLOAT NOT NULL DEFAULT 0, + updated_at TIMESTAMPTZ NOT NULL +); +CREATE UNIQUE INDEX idx_symbol_scores_symbol ON symbol_scores(symbol); + +CREATE TABLE market_sentiment ( + id UUID PRIMARY KEY, + fear_greed INT NOT NULL, + fear_greed_label VARCHAR(30) NOT NULL, + vix FLOAT, + fed_stance VARCHAR(20) NOT NULL DEFAULT 'neutral', + market_regime VARCHAR(20) NOT NULL DEFAULT 'neutral', + updated_at TIMESTAMPTZ NOT NULL +); + +CREATE TABLE stock_selections ( + id UUID PRIMARY KEY, + trade_date DATE NOT NULL, + symbol VARCHAR(10) NOT NULL, + side VARCHAR(4) NOT NULL, + conviction FLOAT NOT NULL, + reason TEXT NOT NULL, + key_news JSONB DEFAULT '[]', + sentiment_snapshot JSONB DEFAULT '{}', + created_at TIMESTAMPTZ DEFAULT NOW() +); +CREATE INDEX idx_stock_selections_date ON stock_selections(trade_date); +``` + +`stock_selections` stores an audit trail: why each stock was selected, enabling post-hoc analysis of selection quality. + +## 6. 
Redis Streams + +| Stream | Producer | Consumer | Payload | +|--------|----------|----------|---------| +| `news` | news-collector | strategy-engine (sentiment aggregator) | NewsItem | +| `selected_stocks` | stock-selector | MOC strategy | SelectedStock | + +Existing streams (`candles`, `signals`, `orders`) unchanged. + +## 7. Docker Compose Addition + +```yaml +news-collector: + build: + context: . + dockerfile: services/news-collector/Dockerfile + env_file: .env + ports: + - "8084:8084" + depends_on: + redis: { condition: service_healthy } + postgres: { condition: service_healthy } + healthcheck: + test: ["CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:8084/health')"] + interval: 10s + timeout: 5s + retries: 3 + restart: unless-stopped +``` + +## 8. Environment Variables + +```bash +# News Collector +FINNHUB_API_KEY= # Free key from finnhub.io +NEWS_POLL_INTERVAL=300 # Default 5 min (overrides per-collector defaults) +SENTIMENT_AGGREGATE_INTERVAL=900 # 15 min + +# Stock Selector +SELECTOR_CANDIDATES_TIME=15:00 # ET, candidate pool generation +SELECTOR_FILTER_TIME=15:15 # ET, technical filter +SELECTOR_FINAL_TIME=15:30 # ET, LLM final pick +SELECTOR_MAX_PICKS=3 + +# LLM (for stock selector + screener) +ANTHROPIC_API_KEY= +ANTHROPIC_MODEL=claude-sonnet-4-20250514 +``` + +## 9. Telegram Notifications + +Extend existing `shared/notifier.py` with: + +```python +async def send_stock_selection(self, selections: list[SelectedStock], market: MarketSentiment): + """ + 📊 오늘의 종목 선정 (2/3) + + 1. NVDA 🟢 BUY (확신도: 0.85) + 근거: 트럼프 반도체 보조금 확대 발표, RSI 42 + 핵심뉴스: "Trump signs CHIPS Act expansion..." + + 2. XOM 🟢 BUY (확신도: 0.72) + 근거: 유가 상승 + 실적 서프라이즈, 볼륨 급증 + + 시장심리: Fear & Greed 55 (Neutral) | VIX 18.2 + """ +``` + +## 10. 
Testing Strategy + +**Unit tests:** +- Each collector: mock HTTP responses → verify NewsItem parsing +- Sentiment analysis: verify VADER + keyword scoring +- Aggregator: mock news data → verify SymbolScore calculation and freshness decay +- Stock selector: mock scores → verify candidate/filter/selection pipeline +- LLM calls: mock Claude response → verify SelectedStock parsing + +**Integration tests:** +- Full pipeline: news collection → DB → aggregation → selection +- Market gating: verify `risk_off` blocks all trades +- MOC integration: verify selected stocks flow to MOC strategy + +**Post-hoc analysis (future):** +- Use `stock_selections` audit trail to measure selection accuracy +- Historical news data replay for backtesting requires paid data (deferred) + +## 11. Out of Scope (Future) + +- Paid API integration (designed for, not implemented) +- Historical news backtesting +- WebSocket real-time news streaming +- Multi-language sentiment analysis +- Options/derivatives signals diff --git a/services/news-collector/Dockerfile b/services/news-collector/Dockerfile new file mode 100644 index 0000000..a8e5902 --- /dev/null +++ b/services/news-collector/Dockerfile @@ -0,0 +1,9 @@ +FROM python:3.12-slim +WORKDIR /app +COPY shared/ shared/ +RUN pip install --no-cache-dir ./shared +COPY services/news-collector/ services/news-collector/ +RUN pip install --no-cache-dir ./services/news-collector +RUN python -c "import nltk; nltk.download('vader_lexicon', quiet=True)" +ENV PYTHONPATH=/app +CMD ["python", "-m", "news_collector.main"] diff --git a/services/news-collector/pyproject.toml b/services/news-collector/pyproject.toml new file mode 100644 index 0000000..14c856a --- /dev/null +++ b/services/news-collector/pyproject.toml @@ -0,0 +1,25 @@ +[project] +name = "news-collector" +version = "0.1.0" +description = "News and sentiment data collector service" +requires-python = ">=3.12" +dependencies = [ + "trading-shared", + "feedparser>=6.0", + "nltk>=3.8", + "aiohttp>=3.9", +] 
+ +[project.optional-dependencies] +dev = [ + "pytest>=8.0", + "pytest-asyncio>=0.23", + "aioresponses>=0.7", +] + +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[tool.hatch.build.targets.wheel] +packages = ["src/news_collector"] diff --git a/services/news-collector/src/news_collector/__init__.py b/services/news-collector/src/news_collector/__init__.py new file mode 100644 index 0000000..5547af2 --- /dev/null +++ b/services/news-collector/src/news_collector/__init__.py @@ -0,0 +1 @@ +"""News collector service.""" diff --git a/services/news-collector/src/news_collector/collectors/__init__.py b/services/news-collector/src/news_collector/collectors/__init__.py new file mode 100644 index 0000000..5ef36a7 --- /dev/null +++ b/services/news-collector/src/news_collector/collectors/__init__.py @@ -0,0 +1 @@ +"""News collectors.""" diff --git a/services/news-collector/src/news_collector/collectors/base.py b/services/news-collector/src/news_collector/collectors/base.py new file mode 100644 index 0000000..bb43fd6 --- /dev/null +++ b/services/news-collector/src/news_collector/collectors/base.py @@ -0,0 +1,18 @@ +"""Base class for all news collectors.""" + +from abc import ABC, abstractmethod + +from shared.models import NewsItem + + +class BaseCollector(ABC): + name: str = "base" + poll_interval: int = 300 # seconds + + @abstractmethod + async def collect(self) -> list[NewsItem]: + """Collect news items from the source.""" + + @abstractmethod + async def is_available(self) -> bool: + """Check if this data source is accessible.""" diff --git a/services/news-collector/src/news_collector/collectors/fear_greed.py b/services/news-collector/src/news_collector/collectors/fear_greed.py new file mode 100644 index 0000000..f79f716 --- /dev/null +++ b/services/news-collector/src/news_collector/collectors/fear_greed.py @@ -0,0 +1,63 @@ +"""CNN Fear & Greed Index collector.""" + +import logging +from dataclasses import dataclass +from typing import Optional 
+ +import aiohttp + +from news_collector.collectors.base import BaseCollector + +logger = logging.getLogger(__name__) + +FEAR_GREED_URL = "https://production.dataviz.cnn.io/index/fearandgreed/graphdata" + + +@dataclass +class FearGreedResult: + fear_greed: int + fear_greed_label: str + + +class FearGreedCollector(BaseCollector): + name = "fear_greed" + poll_interval = 3600 # 1 hour + + async def is_available(self) -> bool: + return True + + async def _fetch_index(self) -> Optional[dict]: + headers = {"User-Agent": "Mozilla/5.0"} + try: + async with aiohttp.ClientSession() as session: + async with session.get( + FEAR_GREED_URL, headers=headers, timeout=aiohttp.ClientTimeout(total=10) + ) as resp: + if resp.status != 200: + return None + return await resp.json() + except Exception: + return None + + def _classify(self, score: int) -> str: + if score <= 20: + return "Extreme Fear" + if score <= 40: + return "Fear" + if score <= 60: + return "Neutral" + if score <= 80: + return "Greed" + return "Extreme Greed" + + async def collect(self) -> Optional[FearGreedResult]: + data = await self._fetch_index() + if data is None: + return None + try: + fg = data["fear_and_greed"] + score = int(fg["score"]) + label = fg.get("rating", self._classify(score)) + return FearGreedResult(fear_greed=score, fear_greed_label=label) + except (KeyError, ValueError, TypeError): + return None diff --git a/services/news-collector/src/news_collector/collectors/fed.py b/services/news-collector/src/news_collector/collectors/fed.py new file mode 100644 index 0000000..fce4842 --- /dev/null +++ b/services/news-collector/src/news_collector/collectors/fed.py @@ -0,0 +1,119 @@ +"""Federal Reserve RSS collector with hawkish/dovish/neutral stance detection.""" + +import asyncio +import logging +from calendar import timegm +from datetime import datetime, timezone + +import feedparser +from nltk.sentiment.vader import SentimentIntensityAnalyzer + +from shared.models import NewsCategory, NewsItem + +from 
.base import BaseCollector + +logger = logging.getLogger(__name__) + +_FED_RSS_URL = "https://www.federalreserve.gov/feeds/press_all.xml" + +_HAWKISH_KEYWORDS = [ + "rate hike", + "interest rate increase", + "tighten", + "tightening", + "inflation", + "hawkish", + "restrictive", + "raise rates", + "hike rates", +] +_DOVISH_KEYWORDS = [ + "rate cut", + "interest rate decrease", + "easing", + "ease", + "stimulus", + "dovish", + "accommodative", + "lower rates", + "cut rates", + "quantitative easing", +] + + +def _detect_stance(text: str) -> str: + lower = text.lower() + hawkish_hits = sum(1 for kw in _HAWKISH_KEYWORDS if kw in lower) + dovish_hits = sum(1 for kw in _DOVISH_KEYWORDS if kw in lower) + if hawkish_hits > dovish_hits: + return "hawkish" + if dovish_hits > hawkish_hits: + return "dovish" + return "neutral" + + +class FedCollector(BaseCollector): + name: str = "fed" + poll_interval: int = 3600 + + def __init__(self) -> None: + self._vader = SentimentIntensityAnalyzer() + + async def is_available(self) -> bool: + return True + + async def _fetch_fed_rss(self) -> list[dict]: + loop = asyncio.get_event_loop() + try: + parsed = await loop.run_in_executor(None, feedparser.parse, _FED_RSS_URL) + return parsed.get("entries", []) + except Exception as exc: + logger.error("Fed RSS fetch failed: %s", exc) + return [] + + def _parse_published(self, entry: dict) -> datetime: + published_parsed = entry.get("published_parsed") + if published_parsed: + try: + ts = timegm(published_parsed) + return datetime.fromtimestamp(ts, tz=timezone.utc) + except Exception: + pass + return datetime.now(timezone.utc) + + async def collect(self) -> list[NewsItem]: + try: + entries = await self._fetch_fed_rss() + except Exception as exc: + logger.error("Fed collector error: %s", exc) + return [] + + items: list[NewsItem] = [] + + for entry in entries: + title = entry.get("title", "").strip() + if not title: + continue + + summary = entry.get("summary", "") or "" + combined = f"{title} 
{summary}" + + sentiment = self._vader.polarity_scores(combined)["compound"] + stance = _detect_stance(combined) + published_at = self._parse_published(entry) + + items.append( + NewsItem( + source=self.name, + headline=title, + summary=summary or None, + url=entry.get("link") or None, + published_at=published_at, + symbols=[], + sentiment=sentiment, + category=NewsCategory.FED, + raw_data={"stance": stance, **dict(entry)}, + ) + ) + + return items diff --git a/services/news-collector/src/news_collector/collectors/finnhub.py b/services/news-collector/src/news_collector/collectors/finnhub.py new file mode 100644 index 0000000..13e3602 --- /dev/null +++ b/services/news-collector/src/news_collector/collectors/finnhub.py @@ -0,0 +1,88 @@ +"""Finnhub news collector with VADER sentiment analysis.""" + +import logging +from datetime import datetime, timezone + +import aiohttp +from nltk.sentiment.vader import SentimentIntensityAnalyzer + +from shared.models import NewsCategory, NewsItem + +from .base import BaseCollector + +logger = logging.getLogger(__name__) + +_CATEGORY_KEYWORDS: dict[NewsCategory, list[str]] = { + NewsCategory.FED: ["fed", "fomc", "rate", "federal reserve"], + NewsCategory.POLICY: ["tariff", "trump", "regulation", "policy", "trade war"], + NewsCategory.EARNINGS: ["earnings", "revenue", "profit", "eps", "guidance", "quarter"], +} + + +def _categorize(text: str) -> NewsCategory: + lower = text.lower() + for category, keywords in _CATEGORY_KEYWORDS.items(): + if any(kw in lower for kw in keywords): + return category + return NewsCategory.MACRO + + +class FinnhubCollector(BaseCollector): + name: str = "finnhub" + poll_interval: int = 300 + + _BASE_URL = "https://finnhub.io/api/v1/news" + + def __init__(self, api_key: str) -> None: + self._api_key = api_key + self._vader = SentimentIntensityAnalyzer() + + async def is_available(self) -> bool: + return bool(self._api_key) + + async def _fetch_news(self) -> list[dict]: + url = 
f"{self._BASE_URL}?category=general&token={self._api_key}" + async with aiohttp.ClientSession() as session: + async with session.get(url) as resp: + resp.raise_for_status() + return await resp.json() + + async def collect(self) -> list[NewsItem]: + try: + raw_items = await self._fetch_news() + except Exception as exc: + logger.error("Finnhub fetch failed: %s", exc) + return [] + + items: list[NewsItem] = [] + for article in raw_items: + headline = article.get("headline", "") + summary = article.get("summary", "") + combined = f"{headline} {summary}" + + sentiment_scores = self._vader.polarity_scores(combined) + sentiment = sentiment_scores["compound"] + + ts = article.get("datetime", 0) + published_at = datetime.fromtimestamp(ts, tz=timezone.utc) + + related = article.get("related", "") + symbols = [t.strip() for t in related.split(",") if t.strip()] if related else [] + + category = _categorize(combined) + + items.append( + NewsItem( + source=self.name, + headline=headline, + summary=summary or None, + url=article.get("url") or None, + published_at=published_at, + symbols=symbols, + sentiment=sentiment, + category=category, + raw_data=article, + ) + ) + + return items diff --git a/services/news-collector/src/news_collector/collectors/reddit.py b/services/news-collector/src/news_collector/collectors/reddit.py new file mode 100644 index 0000000..226a2f9 --- /dev/null +++ b/services/news-collector/src/news_collector/collectors/reddit.py @@ -0,0 +1,97 @@ +"""Reddit social sentiment collector using JSON API with VADER sentiment analysis.""" + +import logging +import re +from datetime import datetime, timezone + +import aiohttp +from nltk.sentiment.vader import SentimentIntensityAnalyzer + +from shared.models import NewsCategory, NewsItem + +from .base import BaseCollector + +logger = logging.getLogger(__name__) + +_SUBREDDITS = ["wallstreetbets", "stocks", "investing"] +_MIN_SCORE = 50 + +_TICKER_PATTERN = re.compile( + 
r"\b(AAPL|MSFT|GOOGL|GOOG|AMZN|TSLA|NVDA|META|BRK\.?[AB]|JPM|V|UNH|XOM|" + r"JNJ|WMT|MA|PG|HD|CVX|MRK|LLY|ABBV|PFE|BAC|KO|AVGO|COST|MCD|TMO|" + r"CSCO|ACN|ABT|DHR|TXN|NEE|NFLX|PM|UPS|RTX|HON|QCOM|AMGN|LOW|IBM|" + r"INTC|AMD|PYPL|GS|MS|BLK|SPGI|CAT|DE|GE|MMM|BA|F|GM|DIS|CMCSA)\b" +) + + +class RedditCollector(BaseCollector): + name: str = "reddit" + poll_interval: int = 900 + + def __init__(self) -> None: + self._vader = SentimentIntensityAnalyzer() + + async def is_available(self) -> bool: + return True + + async def _fetch_subreddit(self, subreddit: str) -> list[dict]: + url = f"https://www.reddit.com/r/{subreddit}/hot.json?limit=25" + headers = {"User-Agent": "TradingPlatform/1.0 (research@example.com)"} + try: + async with aiohttp.ClientSession() as session: + async with session.get( + url, headers=headers, timeout=aiohttp.ClientTimeout(total=10) + ) as resp: + if resp.status == 200: + data = await resp.json() + return data.get("data", {}).get("children", []) + except Exception as exc: + logger.error("Reddit fetch failed for r/%s: %s", subreddit, exc) + return [] + + async def collect(self) -> list[NewsItem]: + seen_titles: set[str] = set() + items: list[NewsItem] = [] + + for subreddit in _SUBREDDITS: + try: + posts = await self._fetch_subreddit(subreddit) + except Exception as exc: + logger.error("Reddit collector error for r/%s: %s", subreddit, exc) + continue + + for post in posts: + post_data = post.get("data", {}) + title = post_data.get("title", "").strip() + score = post_data.get("score", 0) + + if not title or score < _MIN_SCORE: + continue + if title in seen_titles: + continue + seen_titles.add(title) + + selftext = post_data.get("selftext", "") or "" + combined = f"{title} {selftext}" + + sentiment = self._vader.polarity_scores(combined)["compound"] + symbols = list(dict.fromkeys(_TICKER_PATTERN.findall(combined))) + + created_utc = post_data.get("created_utc", 0) + published_at = datetime.fromtimestamp(created_utc, tz=timezone.utc) + + items.append( 
+ NewsItem( + source=self.name, + headline=title, + summary=selftext or None, + url=post_data.get("url") or None, + published_at=published_at, + symbols=symbols, + sentiment=sentiment, + category=NewsCategory.SOCIAL, + raw_data=post_data, + ) + ) + + return items diff --git a/services/news-collector/src/news_collector/collectors/rss.py b/services/news-collector/src/news_collector/collectors/rss.py new file mode 100644 index 0000000..ddf8503 --- /dev/null +++ b/services/news-collector/src/news_collector/collectors/rss.py @@ -0,0 +1,105 @@ +"""RSS news collector using feedparser with VADER sentiment analysis.""" + +import asyncio +import logging +import re +from datetime import datetime, timezone +from time import mktime + +import feedparser +from nltk.sentiment.vader import SentimentIntensityAnalyzer + +from shared.models import NewsCategory, NewsItem + +from .base import BaseCollector + +logger = logging.getLogger(__name__) + +_DEFAULT_FEEDS = [ + "https://finance.yahoo.com/news/rssindex", + "https://news.google.com/rss/search?q=stock+market+finance&hl=en-US&gl=US&ceid=US:en", + "https://feeds.marketwatch.com/marketwatch/topstories/", +] + +_TICKER_PATTERN = re.compile( + r"\b(AAPL|MSFT|GOOGL|GOOG|AMZN|TSLA|NVDA|META|BRK\.?[AB]|JPM|V|UNH|XOM|" + r"JNJ|WMT|MA|PG|HD|CVX|MRK|LLY|ABBV|PFE|BAC|KO|AVGO|COST|MCD|TMO|" + r"CSCO|ACN|ABT|DHR|TXN|NEE|NFLX|PM|UPS|RTX|HON|QCOM|AMGN|LOW|IBM|" + r"INTC|AMD|PYPL|GS|MS|BLK|SPGI|CAT|DE|GE|MMM|BA|F|GM|DIS|CMCSA)\b" +) + + +class RSSCollector(BaseCollector): + name: str = "rss" + poll_interval: int = 600 + + def __init__(self, feeds: list[str] | None = None) -> None: + self._feeds = feeds if feeds is not None else _DEFAULT_FEEDS + self._vader = SentimentIntensityAnalyzer() + + async def is_available(self) -> bool: + return True + + async def _fetch_feeds(self) -> list[dict]: + loop = asyncio.get_event_loop() + results = [] + for url in self._feeds: + try: + parsed = await loop.run_in_executor(None, feedparser.parse, url) + 
results.append(parsed) + except Exception as exc: + logger.error("RSS fetch failed for %s: %s", url, exc) + return results + + def _parse_published(self, entry: dict) -> datetime: + parsed_time = entry.get("published_parsed") + if parsed_time: + try: + ts = mktime(parsed_time) + return datetime.fromtimestamp(ts, tz=timezone.utc) + except Exception: + pass + return datetime.now(timezone.utc) + + async def collect(self) -> list[NewsItem]: + try: + feeds = await self._fetch_feeds() + except Exception as exc: + logger.error("RSS collector error: %s", exc) + return [] + + seen_titles: set[str] = set() + items: list[NewsItem] = [] + + for feed in feeds: + for entry in feed.get("entries", []): + title = entry.get("title", "").strip() + if not title or title in seen_titles: + continue + seen_titles.add(title) + + summary = entry.get("summary", "") or "" + combined = f"{title} {summary}" + + sentiment_scores = self._vader.polarity_scores(combined) + sentiment = sentiment_scores["compound"] + + symbols = list(dict.fromkeys(_TICKER_PATTERN.findall(combined))) + + published_at = self._parse_published(entry) + + items.append( + NewsItem( + source=self.name, + headline=title, + summary=summary or None, + url=entry.get("link") or None, + published_at=published_at, + symbols=symbols, + sentiment=sentiment, + category=NewsCategory.MACRO, + raw_data=dict(entry), + ) + ) + + return items diff --git a/services/news-collector/src/news_collector/collectors/sec_edgar.py b/services/news-collector/src/news_collector/collectors/sec_edgar.py new file mode 100644 index 0000000..ca1d070 --- /dev/null +++ b/services/news-collector/src/news_collector/collectors/sec_edgar.py @@ -0,0 +1,100 @@ +"""SEC EDGAR filing collector (free, no API key required).""" + +import logging +from datetime import datetime, timezone + +import aiohttp +from nltk.sentiment.vader import SentimentIntensityAnalyzer + +from shared.models import NewsCategory, NewsItem +from news_collector.collectors.base import 
BaseCollector + +logger = logging.getLogger(__name__) + +TRACKED_CIKS = { + "0000320193": "AAPL", + "0000789019": "MSFT", + "0001652044": "GOOGL", + "0001018724": "AMZN", + "0001318605": "TSLA", + "0001045810": "NVDA", + "0001326801": "META", + "0000019617": "JPM", + "0000078003": "PFE", + "0000021344": "KO", +} + +SEC_USER_AGENT = "TradingPlatform research@example.com" + + +class SecEdgarCollector(BaseCollector): + name = "sec_edgar" + poll_interval = 1800 # 30 minutes + + def __init__(self) -> None: + self._vader = SentimentIntensityAnalyzer() + + async def is_available(self) -> bool: + return True + + async def _fetch_recent_filings(self) -> list[dict]: + results = [] + headers = {"User-Agent": SEC_USER_AGENT} + async with aiohttp.ClientSession() as session: + for cik, ticker in TRACKED_CIKS.items(): + try: + url = f"https://data.sec.gov/submissions/CIK{cik}.json" + async with session.get( + url, headers=headers, timeout=aiohttp.ClientTimeout(total=10) + ) as resp: + if resp.status == 200: + data = await resp.json() + data["tickers"] = [{"ticker": ticker}] + results.append(data) + except Exception as exc: + logger.warning("sec_fetch_failed", cik=cik, error=str(exc)) + return results + + async def collect(self) -> list[NewsItem]: + filings_data = await self._fetch_recent_filings() + items = [] + today = datetime.now(timezone.utc).strftime("%Y-%m-%d") + + for company_data in filings_data: + tickers = [t["ticker"] for t in company_data.get("tickers", [])] + company_name = company_data.get("name", "Unknown") + recent = company_data.get("filings", {}).get("recent", {}) + + forms = recent.get("form", []) + dates = recent.get("filingDate", []) + descriptions = recent.get("primaryDocDescription", []) + accessions = recent.get("accessionNumber", []) + + for i, form in enumerate(forms): + if form != "8-K": + continue + filing_date = dates[i] if i < len(dates) else "" + if filing_date != today: + continue + + desc = descriptions[i] if i < len(descriptions) else "8-K 
Filing" + accession = accessions[i] if i < len(accessions) else "" + headline = f"{company_name} ({', '.join(tickers)}): {form} - {desc}" + + items.append( + NewsItem( + source=self.name, + headline=headline, + summary=desc, + url=f"https://www.sec.gov/cgi-bin/browse-edgar?action=getcompany&accession={accession}", + published_at=datetime.strptime(filing_date, "%Y-%m-%d").replace( + tzinfo=timezone.utc + ), + symbols=tickers, + sentiment=self._vader.polarity_scores(headline)["compound"], + category=NewsCategory.FILING, + raw_data={"form": form, "accession": accession}, + ) + ) + + return items diff --git a/services/news-collector/src/news_collector/collectors/truth_social.py b/services/news-collector/src/news_collector/collectors/truth_social.py new file mode 100644 index 0000000..33ebc86 --- /dev/null +++ b/services/news-collector/src/news_collector/collectors/truth_social.py @@ -0,0 +1,86 @@ +"""Truth Social collector using Mastodon-compatible API with VADER sentiment analysis.""" + +import logging +import re +from datetime import datetime, timezone + +import aiohttp +from nltk.sentiment.vader import SentimentIntensityAnalyzer + +from shared.models import NewsCategory, NewsItem + +from .base import BaseCollector + +logger = logging.getLogger(__name__) + +_TRUMP_ACCOUNT_ID = "107780257626128497" +_API_URL = f"https://truthsocial.com/api/v1/accounts/{_TRUMP_ACCOUNT_ID}/statuses" + +_HTML_TAG_PATTERN = re.compile(r"<[^>]+>") + + +def _strip_html(text: str) -> str: + return _HTML_TAG_PATTERN.sub("", text).strip() + + +class TruthSocialCollector(BaseCollector): + name: str = "truth_social" + poll_interval: int = 900 + + def __init__(self) -> None: + self._vader = SentimentIntensityAnalyzer() + + async def is_available(self) -> bool: + return True + + async def _fetch_posts(self) -> list[dict]: + headers = {"User-Agent": "TradingPlatform/1.0 (research@example.com)"} + try: + async with aiohttp.ClientSession() as session: + async with session.get( + _API_URL, 
headers=headers, timeout=aiohttp.ClientTimeout(total=10) + ) as resp: + if resp.status == 200: + return await resp.json() + except Exception as exc: + logger.error("Truth Social fetch failed: %s", exc) + return [] + + async def collect(self) -> list[NewsItem]: + try: + posts = await self._fetch_posts() + except Exception as exc: + logger.error("Truth Social collector error: %s", exc) + return [] + + items: list[NewsItem] = [] + + for post in posts: + raw_content = post.get("content", "") or "" + content = _strip_html(raw_content) + if not content: + continue + + sentiment = self._vader.polarity_scores(content)["compound"] + + created_at_str = post.get("created_at", "") + try: + published_at = datetime.fromisoformat(created_at_str.replace("Z", "+00:00")) + except Exception: + published_at = datetime.now(timezone.utc) + + items.append( + NewsItem( + source=self.name, + headline=content[:200], + summary=content if len(content) > 200 else None, + url=post.get("url") or None, + published_at=published_at, + symbols=[], + sentiment=sentiment, + category=NewsCategory.POLICY, + raw_data=post, + ) + ) + + return items diff --git a/services/news-collector/src/news_collector/config.py b/services/news-collector/src/news_collector/config.py new file mode 100644 index 0000000..70d98f1 --- /dev/null +++ b/services/news-collector/src/news_collector/config.py @@ -0,0 +1,10 @@ +"""News Collector configuration.""" + +from shared.config import Settings + + +class NewsCollectorConfig(Settings): + health_port: int = 8084 + finnhub_api_key: str = "" + news_poll_interval: int = 300 + sentiment_aggregate_interval: int = 900 diff --git a/services/news-collector/src/news_collector/main.py b/services/news-collector/src/news_collector/main.py new file mode 100644 index 0000000..3493f7c --- /dev/null +++ b/services/news-collector/src/news_collector/main.py @@ -0,0 +1,193 @@ +"""News Collector Service — fetches news from multiple sources and aggregates sentiment.""" + +import asyncio +from 
"""News Collector service entry point.

Runs every news collector on its own poll interval, persists collected
items to Postgres, republishes them on per-category Redis streams, and
periodically aggregates per-symbol sentiment scores.
"""

import asyncio
from datetime import datetime, timezone

from shared.broker import RedisBroker
from shared.db import Database
from shared.events import NewsEvent
from shared.healthcheck import HealthCheckServer
from shared.logging import setup_logging
from shared.metrics import ServiceMetrics
from shared.models import NewsItem
from shared.notifier import TelegramNotifier
from shared.sentiment_models import MarketSentiment
from shared.sentiment import SentimentAggregator

from news_collector.config import NewsCollectorConfig
from news_collector.collectors.finnhub import FinnhubCollector
from news_collector.collectors.rss import RSSCollector
from news_collector.collectors.sec_edgar import SecEdgarCollector
from news_collector.collectors.truth_social import TruthSocialCollector
from news_collector.collectors.reddit import RedditCollector
from news_collector.collectors.fear_greed import FearGreedCollector
from news_collector.collectors.fed import FedCollector

# Health check port: base + 4
HEALTH_PORT_OFFSET = 4


async def run_collector_once(collector, db: Database, broker: RedisBroker) -> int:
    """Run a single collector, store results in DB, publish to Redis.

    Returns the number of items collected.
    """
    items: list[NewsItem] = await collector.collect()
    count = 0
    for item in items:
        await db.insert_news_item(item)
        event = NewsEvent(data=item)
        # One stream per category, e.g. "news.macro", "news.social".
        stream = f"news.{item.category.value}"
        await broker.publish(stream, event.to_dict())
        count += 1
    return count


async def run_collector_loop(collector, db: Database, broker: RedisBroker, log) -> None:
    """Run a collector repeatedly on its configured poll_interval.

    Collector failures are logged and swallowed so one flaky source never
    kills the loop; the next attempt happens after the usual sleep.
    """
    while True:
        try:
            count = await run_collector_once(collector, db, broker)
            log.info(
                "collector_ran",
                collector=collector.name,
                count=count,
            )
        except Exception as exc:
            log.warning(
                "collector_error",
                collector=collector.name,
                error=str(exc),
            )
        await asyncio.sleep(collector.poll_interval)


async def run_fear_greed_loop(collector: FearGreedCollector, db: Database, log) -> None:
    """Fetch Fear & Greed index on its interval and update MarketSentiment in DB."""
    while True:
        try:
            result = await collector.collect()
            if result is not None:
                ms = MarketSentiment(
                    fear_greed=result.fear_greed,
                    fear_greed_label=result.fear_greed_label,
                    # VIX is not collected by this service; the regime is
                    # derived from fear/greed alone here.
                    vix=None,
                    fed_stance="neutral",
                    market_regime=_determine_regime(result.fear_greed, None),
                    updated_at=datetime.now(timezone.utc),
                )
                await db.upsert_market_sentiment(ms)
                log.info(
                    "fear_greed_updated",
                    value=result.fear_greed,
                    label=result.fear_greed_label,
                )
        except Exception as exc:
            log.warning("fear_greed_error", error=str(exc))
        await asyncio.sleep(collector.poll_interval)


async def run_aggregator_loop(db: Database, interval: int, log) -> None:
    """Run SentimentAggregator every `interval` seconds and persist scores.

    Sleeps first so aggregation only runs after the collectors have had a
    full interval to gather news.
    """
    aggregator = SentimentAggregator()
    while True:
        await asyncio.sleep(interval)
        try:
            now = datetime.now(timezone.utc)
            news_items = await db.get_recent_news(hours=24)
            scores = aggregator.aggregate(news_items, now)
            for score in scores.values():
                await db.upsert_symbol_score(score)
            log.info("aggregation_complete", symbols=len(scores))
        except Exception as exc:
            log.warning("aggregator_error", error=str(exc))


def _determine_regime(fear_greed: int, vix: float | None) -> str:
    """Classify market regime from fear/greed index and optional VIX."""
    aggregator = SentimentAggregator()
    return aggregator.determine_regime(fear_greed, vix)


async def run() -> None:
    """Wire up dependencies, start all loops, and shut down cleanly."""
    config = NewsCollectorConfig()
    log = setup_logging("news-collector", config.log_level, config.log_format)
    metrics = ServiceMetrics("news_collector")

    notifier = TelegramNotifier(
        bot_token=config.telegram_bot_token,
        chat_id=config.telegram_chat_id,
    )

    db = Database(config.database_url)
    await db.connect()

    broker = RedisBroker(config.redis_url)

    health = HealthCheckServer(
        "news-collector",
        port=config.health_port,
        auth_token=config.metrics_auth_token,
    )
    await health.start()
    metrics.service_up.labels(service="news-collector").set(1)

    # Build collectors
    finnhub = FinnhubCollector(api_key=config.finnhub_api_key)
    rss = RSSCollector()
    sec = SecEdgarCollector()
    truth = TruthSocialCollector()
    reddit = RedditCollector()
    fear_greed = FearGreedCollector()
    fed = FedCollector()

    # fear_greed is intentionally excluded: it feeds MarketSentiment via its
    # own loop instead of the generic news pipeline.
    news_collectors = [finnhub, rss, sec, truth, reddit, fed]

    log.info(
        "starting",
        collectors=[c.name for c in news_collectors],
        poll_interval=config.news_poll_interval,
        aggregate_interval=config.sentiment_aggregate_interval,
    )

    # Bound outside the try so the finally block can never see it unbound.
    tasks: list[asyncio.Task] = []
    try:
        for collector in news_collectors:
            tasks.append(
                asyncio.create_task(
                    run_collector_loop(collector, db, broker, log),
                    name=f"collector-{collector.name}",
                )
            )
        tasks.append(
            asyncio.create_task(
                run_fear_greed_loop(fear_greed, db, log),
                name="fear-greed-loop",
            )
        )
        tasks.append(
            asyncio.create_task(
                run_aggregator_loop(db, config.sentiment_aggregate_interval, log),
                name="aggregator-loop",
            )
        )
        await asyncio.gather(*tasks)
    except Exception as exc:
        log.error("fatal_error", error=str(exc))
        await notifier.send_error(str(exc), "news-collector")
        raise
    finally:
        metrics.service_up.labels(service="news-collector").set(0)
        for task in tasks:
            task.cancel()
        # BUG FIX: cancelled tasks must be awaited so their CancelledError is
        # consumed; otherwise asyncio emits "Task was destroyed but it is
        # pending" and the loops never unwind before the connections close.
        await asyncio.gather(*tasks, return_exceptions=True)
        await notifier.close()
        await broker.close()
        await db.close()


def main() -> None:
    """Synchronous entry point for the console script / Docker CMD."""
    asyncio.run(run())


if __name__ == "__main__":
    main()
c._classify(50) == "Neutral" + assert c._classify(70) == "Greed" + assert c._classify(85) == "Extreme Greed" diff --git a/services/news-collector/tests/test_fed.py b/services/news-collector/tests/test_fed.py new file mode 100644 index 0000000..d1a736b --- /dev/null +++ b/services/news-collector/tests/test_fed.py @@ -0,0 +1,37 @@ +"""Tests for Federal Reserve collector.""" + +import pytest +from unittest.mock import AsyncMock, patch +from news_collector.collectors.fed import FedCollector + + +@pytest.fixture +def collector(): + return FedCollector() + + +def test_collector_name(collector): + assert collector.name == "fed" + assert collector.poll_interval == 3600 + + +async def test_is_available(collector): + assert await collector.is_available() is True + + +async def test_collect_parses_rss(collector): + mock_entries = [ + { + "title": "Federal Reserve issues FOMC statement", + "link": "https://www.federalreserve.gov/newsevents/pressreleases/monetary20260402a.htm", + "published_parsed": (2026, 4, 2, 14, 0, 0, 0, 0, 0), + "summary": "The Federal Open Market Committee decided to maintain the target range...", + }, + ] + with patch.object( + collector, "_fetch_fed_rss", new_callable=AsyncMock, return_value=mock_entries + ): + items = await collector.collect() + assert len(items) == 1 + assert items[0].source == "fed" + assert items[0].category.value == "fed" diff --git a/services/news-collector/tests/test_finnhub.py b/services/news-collector/tests/test_finnhub.py new file mode 100644 index 0000000..a4cf169 --- /dev/null +++ b/services/news-collector/tests/test_finnhub.py @@ -0,0 +1,67 @@ +"""Tests for Finnhub news collector.""" + +import pytest +from unittest.mock import AsyncMock, patch + +from news_collector.collectors.finnhub import FinnhubCollector + + +@pytest.fixture +def collector(): + return FinnhubCollector(api_key="test_key") + + +def test_collector_name(collector): + assert collector.name == "finnhub" + assert collector.poll_interval == 300 + + +async def 
test_is_available_with_key(collector): + assert await collector.is_available() is True + + +async def test_is_available_without_key(): + c = FinnhubCollector(api_key="") + assert await c.is_available() is False + + +async def test_collect_parses_response(collector): + mock_response = [ + { + "category": "top news", + "datetime": 1711929600, + "headline": "AAPL beats earnings", + "id": 12345, + "related": "AAPL", + "source": "MarketWatch", + "summary": "Apple reported better than expected...", + "url": "https://example.com/article", + }, + { + "category": "top news", + "datetime": 1711929000, + "headline": "Fed holds rates steady", + "id": 12346, + "related": "", + "source": "Reuters", + "summary": "The Federal Reserve...", + "url": "https://example.com/fed", + }, + ] + + with patch.object(collector, "_fetch_news", new_callable=AsyncMock, return_value=mock_response): + items = await collector.collect() + + assert len(items) == 2 + assert items[0].source == "finnhub" + assert items[0].headline == "AAPL beats earnings" + assert items[0].symbols == ["AAPL"] + assert items[0].url == "https://example.com/article" + assert isinstance(items[0].sentiment, float) + assert items[1].symbols == [] + + +async def test_collect_handles_empty_response(collector): + with patch.object(collector, "_fetch_news", new_callable=AsyncMock, return_value=[]): + items = await collector.collect() + assert items == [] diff --git a/services/news-collector/tests/test_main.py b/services/news-collector/tests/test_main.py new file mode 100644 index 0000000..66190dc --- /dev/null +++ b/services/news-collector/tests/test_main.py @@ -0,0 +1,39 @@ +"""Tests for news collector scheduler.""" + +from unittest.mock import AsyncMock, MagicMock +from datetime import datetime, timezone +from shared.models import NewsCategory, NewsItem +from news_collector.main import run_collector_once + + +async def test_run_collector_once_stores_and_publishes(): + mock_item = NewsItem( + source="test", + headline="Test 
news", + published_at=datetime(2026, 4, 2, tzinfo=timezone.utc), + sentiment=0.5, + category=NewsCategory.MACRO, + ) + mock_collector = MagicMock() + mock_collector.name = "test" + mock_collector.collect = AsyncMock(return_value=[mock_item]) + mock_db = MagicMock() + mock_db.insert_news_item = AsyncMock() + mock_broker = MagicMock() + mock_broker.publish = AsyncMock() + + count = await run_collector_once(mock_collector, mock_db, mock_broker) + assert count == 1 + mock_db.insert_news_item.assert_called_once_with(mock_item) + mock_broker.publish.assert_called_once() + + +async def test_run_collector_once_handles_empty(): + mock_collector = MagicMock() + mock_collector.name = "test" + mock_collector.collect = AsyncMock(return_value=[]) + mock_db = MagicMock() + mock_broker = MagicMock() + + count = await run_collector_once(mock_collector, mock_db, mock_broker) + assert count == 0 diff --git a/services/news-collector/tests/test_reddit.py b/services/news-collector/tests/test_reddit.py new file mode 100644 index 0000000..440b173 --- /dev/null +++ b/services/news-collector/tests/test_reddit.py @@ -0,0 +1,63 @@ +"""Tests for Reddit collector.""" + +import pytest +from unittest.mock import AsyncMock, patch +from news_collector.collectors.reddit import RedditCollector + + +@pytest.fixture +def collector(): + return RedditCollector() + + +def test_collector_name(collector): + assert collector.name == "reddit" + assert collector.poll_interval == 900 + + +async def test_is_available(collector): + assert await collector.is_available() is True + + +async def test_collect_parses_posts(collector): + mock_posts = [ + { + "data": { + "title": "NVDA to the moon! 
AI demand is insane", + "selftext": "Just loaded up on NVDA calls", + "url": "https://reddit.com/r/wallstreetbets/123", + "created_utc": 1711929600, + "score": 500, + "num_comments": 200, + "subreddit": "wallstreetbets", + } + }, + ] + with patch.object( + collector, "_fetch_subreddit", new_callable=AsyncMock, return_value=mock_posts + ): + items = await collector.collect() + assert len(items) >= 1 + assert items[0].source == "reddit" + assert items[0].category.value == "social" + + +async def test_collect_filters_low_score(collector): + mock_posts = [ + { + "data": { + "title": "Random question", + "selftext": "", + "url": "https://reddit.com/456", + "created_utc": 1711929600, + "score": 3, + "num_comments": 1, + "subreddit": "stocks", + } + }, + ] + with patch.object( + collector, "_fetch_subreddit", new_callable=AsyncMock, return_value=mock_posts + ): + items = await collector.collect() + assert items == [] diff --git a/services/news-collector/tests/test_rss.py b/services/news-collector/tests/test_rss.py new file mode 100644 index 0000000..e03250a --- /dev/null +++ b/services/news-collector/tests/test_rss.py @@ -0,0 +1,47 @@ +"""Tests for RSS news collector.""" + +import pytest +from unittest.mock import AsyncMock, patch + +from news_collector.collectors.rss import RSSCollector + + +@pytest.fixture +def collector(): + return RSSCollector() + + +def test_collector_name(collector): + assert collector.name == "rss" + assert collector.poll_interval == 600 + + +async def test_is_available(collector): + assert await collector.is_available() is True + + +async def test_collect_parses_feed(collector): + mock_feed = { + "entries": [ + { + "title": "NVDA surges on AI demand", + "link": "https://example.com/nvda", + "published_parsed": (2026, 4, 2, 12, 0, 0, 0, 0, 0), + "summary": "Nvidia stock jumped 5%...", + }, + { + "title": "Markets rally on jobs data", + "link": "https://example.com/market", + "published_parsed": (2026, 4, 2, 11, 0, 0, 0, 0, 0), + "summary": "The S&P 
500 rose...", + }, + ], + } + + with patch.object(collector, "_fetch_feeds", new_callable=AsyncMock, return_value=[mock_feed]): + items = await collector.collect() + + assert len(items) == 2 + assert items[0].source == "rss" + assert items[0].headline == "NVDA surges on AI demand" + assert isinstance(items[0].sentiment, float) diff --git a/services/news-collector/tests/test_sec_edgar.py b/services/news-collector/tests/test_sec_edgar.py new file mode 100644 index 0000000..5d4f69f --- /dev/null +++ b/services/news-collector/tests/test_sec_edgar.py @@ -0,0 +1,58 @@ +"""Tests for SEC EDGAR filing collector.""" + +import pytest +from datetime import datetime, timezone +from unittest.mock import AsyncMock, patch, MagicMock + +from news_collector.collectors.sec_edgar import SecEdgarCollector + + +@pytest.fixture +def collector(): + return SecEdgarCollector() + + +def test_collector_name(collector): + assert collector.name == "sec_edgar" + assert collector.poll_interval == 1800 + + +async def test_is_available(collector): + assert await collector.is_available() is True + + +async def test_collect_parses_filings(collector): + mock_response = { + "filings": { + "recent": { + "accessionNumber": ["0001234-26-000001"], + "filingDate": ["2026-04-02"], + "primaryDocument": ["filing.htm"], + "form": ["8-K"], + "primaryDocDescription": ["Current Report"], + } + }, + "tickers": [{"ticker": "AAPL"}], + "name": "Apple Inc", + } + + mock_datetime = MagicMock(spec=datetime) + mock_datetime.now.return_value = datetime(2026, 4, 2, tzinfo=timezone.utc) + mock_datetime.strptime = datetime.strptime + + with patch.object( + collector, "_fetch_recent_filings", new_callable=AsyncMock, return_value=[mock_response] + ): + with patch("news_collector.collectors.sec_edgar.datetime", mock_datetime): + items = await collector.collect() + + assert len(items) == 1 + assert items[0].source == "sec_edgar" + assert items[0].category.value == "filing" + assert "AAPL" in items[0].symbols + + +async def 
test_collect_handles_empty(collector): + with patch.object(collector, "_fetch_recent_filings", new_callable=AsyncMock, return_value=[]): + items = await collector.collect() + assert items == [] diff --git a/services/news-collector/tests/test_truth_social.py b/services/news-collector/tests/test_truth_social.py new file mode 100644 index 0000000..91ddb9d --- /dev/null +++ b/services/news-collector/tests/test_truth_social.py @@ -0,0 +1,41 @@ +"""Tests for Truth Social collector.""" + +import pytest +from unittest.mock import AsyncMock, patch +from news_collector.collectors.truth_social import TruthSocialCollector + + +@pytest.fixture +def collector(): + return TruthSocialCollector() + + +def test_collector_name(collector): + assert collector.name == "truth_social" + assert collector.poll_interval == 900 + + +async def test_is_available(collector): + assert await collector.is_available() is True + + +async def test_collect_parses_posts(collector): + mock_posts = [ + { + "content": "<p>We are imposing 25% tariffs on all steel imports!</p>", + "created_at": "2026-04-02T12:00:00.000Z", + "url": "https://truthsocial.com/@realDonaldTrump/12345", + "id": "12345", + }, + ] + with patch.object(collector, "_fetch_posts", new_callable=AsyncMock, return_value=mock_posts): + items = await collector.collect() + assert len(items) == 1 + assert items[0].source == "truth_social" + assert items[0].category.value == "policy" + + +async def test_collect_handles_empty(collector): + with patch.object(collector, "_fetch_posts", new_callable=AsyncMock, return_value=[]): + items = await collector.collect() + assert items == [] diff --git a/services/strategy-engine/src/strategy_engine/config.py b/services/strategy-engine/src/strategy_engine/config.py index 9fd9c49..15f8588 100644 --- a/services/strategy-engine/src/strategy_engine/config.py +++ b/services/strategy-engine/src/strategy_engine/config.py @@ -7,3 +7,7 @@ class StrategyConfig(Settings): symbols: list[str] = ["AAPL", "MSFT", "GOOGL", 
"AMZN", "TSLA"] timeframes: list[str] = ["1m"] strategy_params: dict = {} + selector_final_time: str = "15:30" + selector_max_picks: int = 3 + anthropic_api_key: str = "" + anthropic_model: str = "claude-sonnet-4-20250514" diff --git a/services/strategy-engine/src/strategy_engine/main.py b/services/strategy-engine/src/strategy_engine/main.py index 30de528..5a30766 100644 --- a/services/strategy-engine/src/strategy_engine/main.py +++ b/services/strategy-engine/src/strategy_engine/main.py @@ -1,17 +1,23 @@ """Strategy Engine Service entry point.""" import asyncio +from datetime import datetime from pathlib import Path +import zoneinfo +from shared.alpaca import AlpacaClient from shared.broker import RedisBroker +from shared.db import Database from shared.healthcheck import HealthCheckServer from shared.logging import setup_logging from shared.metrics import ServiceMetrics from shared.notifier import TelegramNotifier +from shared.sentiment_models import MarketSentiment from strategy_engine.config import StrategyConfig from strategy_engine.engine import StrategyEngine from strategy_engine.plugin_loader import load_strategies +from strategy_engine.stock_selector import StockSelector # The strategies directory lives alongside the installed package STRATEGIES_DIR = Path(__file__).parent.parent.parent.parent / "strategies" @@ -30,6 +36,40 @@ async def process_symbol(engine: StrategyEngine, stream: str, log) -> None: last_id = await engine.process_once(stream, last_id) +async def run_stock_selector( + selector: StockSelector, + notifier: TelegramNotifier, + db: Database, + config: StrategyConfig, + log, +) -> None: + """Run the stock selector once per day at the configured time.""" + et = zoneinfo.ZoneInfo("America/New_York") + + while True: + now_et = datetime.now(et) + target_hour, target_min = map(int, config.selector_final_time.split(":")) + + if now_et.hour == target_hour and now_et.minute == target_min: + log.info("stock_selector_running") + try: + selections = await 
selector.select() + if selections: + ms_data = await db.get_latest_market_sentiment() + ms = None + if ms_data: + ms = MarketSentiment(**ms_data) + await notifier.send_stock_selection(selections, ms) + log.info("stock_selector_complete", picks=[s.symbol for s in selections]) + else: + log.info("stock_selector_no_picks") + except Exception as exc: + log.error("stock_selector_error", error=str(exc)) + await asyncio.sleep(120) # Sleep past this minute + else: + await asyncio.sleep(30) + + async def run() -> None: config = StrategyConfig() log = setup_logging("strategy-engine", config.log_level, config.log_format) @@ -41,6 +81,16 @@ async def run() -> None: ) broker = RedisBroker(config.redis_url) + + db = Database(config.database_url) + await db.connect() + + alpaca = AlpacaClient( + api_key=config.alpaca_api_key, + api_secret=config.alpaca_api_secret, + paper=config.alpaca_paper, + ) + strategies = load_strategies(STRATEGIES_DIR) for strategy in strategies: @@ -67,6 +117,20 @@ async def run() -> None: task = asyncio.create_task(process_symbol(engine, stream, log)) tasks.append(task) + if config.anthropic_api_key: + selector = StockSelector( + db=db, + broker=broker, + alpaca=alpaca, + anthropic_api_key=config.anthropic_api_key, + anthropic_model=config.anthropic_model, + max_picks=config.selector_max_picks, + ) + tasks.append( + asyncio.create_task(run_stock_selector(selector, notifier, db, config, log)) + ) + log.info("stock_selector_enabled", time=config.selector_final_time) + await asyncio.gather(*tasks) except Exception as exc: log.error("fatal_error", error=str(exc)) @@ -78,6 +142,8 @@ async def run() -> None: metrics.service_up.labels(service="strategy-engine").set(0) await notifier.close() await broker.close() + await alpaca.close() + await db.close() def main() -> None: diff --git a/services/strategy-engine/src/strategy_engine/stock_selector.py b/services/strategy-engine/src/strategy_engine/stock_selector.py new file mode 100644 index 0000000..268d557 --- 
"""3-stage stock selector engine: sentiment → technical → LLM."""

import json
import logging
import re
from datetime import datetime, timezone

import aiohttp

from shared.alpaca import AlpacaClient
from shared.broker import RedisBroker
from shared.db import Database
from shared.models import OrderSide
from shared.sentiment_models import Candidate, MarketSentiment, SelectedStock

logger = logging.getLogger(__name__)

ANTHROPIC_API_URL = "https://api.anthropic.com/v1/messages"


def _extract_json_array(text: str) -> str:
    """Return the most plausible JSON-array substring of an LLM response.

    Prefers a fenced ```json``` block, then any bare `[...]` span, then the
    stripped text itself (which will fail json.loads if it isn't an array).
    """
    code_block = re.search(r"```(?:json)?\s*(\[.*?\])\s*```", text, re.DOTALL)
    if code_block:
        return code_block.group(1)
    array_match = re.search(r"\[.*\]", text, re.DOTALL)
    if array_match:
        return array_match.group(0)
    return text.strip()


async def _call_anthropic(api_key: str, model: str, prompt: str, context: str) -> str | None:
    """POST a single user message to the Anthropic Messages API.

    Returns the concatenation of all text blocks in the response, or None on
    any HTTP or transport error. Errors are logged with *context* so callers
    can be told apart in the logs. Factored out because the identical request
    plumbing was previously duplicated in two call sites.
    """
    try:
        async with aiohttp.ClientSession() as session:
            async with session.post(
                ANTHROPIC_API_URL,
                headers={
                    "x-api-key": api_key,
                    "anthropic-version": "2023-06-01",
                    "content-type": "application/json",
                },
                json={
                    "model": model,
                    "max_tokens": 1024,
                    "messages": [{"role": "user", "content": prompt}],
                },
            ) as resp:
                if resp.status != 200:
                    body = await resp.text()
                    logger.error("%s error %d: %s", context, resp.status, body)
                    return None
                data = await resp.json()
    except Exception as e:
        logger.error("%s error: %s", context, e)
        return None

    text = ""
    for block in data.get("content", []):
        if isinstance(block, dict) and block.get("type") == "text":
            text += block.get("text", "")
    return text


def _parse_llm_selections(text: str) -> list[SelectedStock]:
    """Parse LLM response into SelectedStock list.

    Handles both bare JSON arrays and markdown code blocks.
    Returns empty list on any parse error; invalid items are skipped
    individually rather than aborting the whole parse.
    """
    raw = _extract_json_array(text)
    try:
        data = json.loads(raw)
    except (json.JSONDecodeError, TypeError):
        return []
    if not isinstance(data, list):
        return []
    selections = []
    for item in data:
        if not isinstance(item, dict):
            continue
        try:
            selection = SelectedStock(
                symbol=item["symbol"],
                side=OrderSide(item["side"]),
                conviction=float(item["conviction"]),
                reason=item.get("reason", ""),
                key_news=item.get("key_news", []),
            )
            selections.append(selection)
        except (KeyError, ValueError) as e:
            logger.warning("Skipping invalid selection item: %s", e)
    return selections


class SentimentCandidateSource:
    """Generates candidates from DB sentiment scores."""

    def __init__(self, db: Database) -> None:
        self._db = db

    async def get_candidates(self) -> list[Candidate]:
        """Return up to 20 non-zero composite-score symbols as candidates."""
        rows = await self._db.get_top_symbol_scores(limit=20)
        candidates = []
        for row in rows:
            composite = float(row.get("composite", 0))
            if composite == 0:
                # A zero composite means no signal either way; skip it.
                continue
            candidates.append(
                Candidate(
                    symbol=row["symbol"],
                    source="sentiment",
                    score=composite,
                    reason=f"composite={composite:.2f}, news_count={row.get('news_count', 0)}",
                )
            )
        return candidates


class LLMCandidateSource:
    """Generates candidates by asking Claude to analyze recent news."""

    def __init__(self, db: Database, api_key: str, model: str) -> None:
        self._db = db
        self._api_key = api_key
        self._model = model

    async def get_candidates(self) -> list[Candidate]:
        """Ask the LLM for actionable tickers based on the last 24h of news.

        Returns [] when there is no news or the API call fails.
        """
        news_items = await self._db.get_recent_news(hours=24)
        if not news_items:
            return []

        headlines = []
        for item in news_items[:50]:  # cap at 50 to stay within context
            symbols = item.get("symbols", [])
            sym_str = ", ".join(symbols) if symbols else "N/A"
            headlines.append(f"[{sym_str}] {item['headline']}")

        prompt = (
            "You are a stock analyst. Given recent news headlines, identify the 5-10 most "
            "actionable US stock tickers. Return ONLY a JSON array with objects having: "
            "symbol (ticker), direction ('BUY' or 'SELL'), score (0-1), reason (brief).\n\n"
            "Headlines:\n" + "\n".join(headlines)
        )

        text = await _call_anthropic(self._api_key, self._model, prompt, "LLM candidate source")
        if text is None:
            return []
        return self._parse_candidates(text)

    def _parse_candidates(self, text: str) -> list[Candidate]:
        """Parse the LLM's JSON array into Candidate objects.

        BUG FIX: previously a single item missing "symbol" raised KeyError,
        which was caught at the outer level and discarded every candidate
        already parsed. Invalid items are now skipped one at a time, matching
        the behavior of _parse_llm_selections.
        """
        raw = _extract_json_array(text)
        try:
            items = json.loads(raw)
        except (json.JSONDecodeError, TypeError):
            return []
        if not isinstance(items, list):
            return []
        candidates = []
        for item in items:
            if not isinstance(item, dict) or "symbol" not in item:
                continue
            try:
                direction = OrderSide(item.get("direction", "BUY"))
            except ValueError:
                # Unknown direction string: keep the candidate, drop direction.
                direction = None
            try:
                score = float(item.get("score", 0.5))
            except (TypeError, ValueError):
                score = 0.5
            candidates.append(
                Candidate(
                    symbol=item["symbol"],
                    source="llm",
                    direction=direction,
                    score=score,
                    reason=item.get("reason", ""),
                )
            )
        return candidates


def _compute_rsi(closes: list[float], period: int = 14) -> float:
    """Compute RSI for the last data point using Wilder's smoothing.

    Returns 50.0 (neutral) when there are fewer than period+1 closes.
    """
    if len(closes) < period + 1:
        return 50.0  # neutral if insufficient data

    deltas = [closes[i] - closes[i - 1] for i in range(1, len(closes))]
    gains = [d if d > 0 else 0.0 for d in deltas]
    losses = [-d if d < 0 else 0.0 for d in deltas]

    # Seed with the simple average of the first `period` moves, then apply
    # Wilder's exponential smoothing for the remainder.
    avg_gain = sum(gains[:period]) / period
    avg_loss = sum(losses[:period]) / period

    for i in range(period, len(deltas)):
        avg_gain = (avg_gain * (period - 1) + gains[i]) / period
        avg_loss = (avg_loss * (period - 1) + losses[i]) / period

    if avg_loss == 0:
        return 100.0
    rs = avg_gain / avg_loss
    return 100.0 - (100.0 / (1.0 + rs))


class StockSelector:
    """Orchestrates the 3-stage stock selection pipeline."""

    def __init__(
        self,
        db: Database,
        broker: RedisBroker,
        alpaca: AlpacaClient,
        anthropic_api_key: str,
        anthropic_model: str = "claude-sonnet-4-20250514",
        max_picks: int = 3,
    ) -> None:
        self._db = db
        self._broker = broker
        self._alpaca = alpaca
        self._api_key = anthropic_api_key
        self._model = anthropic_model
        self._max_picks = max_picks

    async def select(self) -> list[SelectedStock]:
        """Run the full 3-stage pipeline and return selected stocks.

        Stage 0 is a market gate (skip entirely in a risk_off regime), then:
        candidates (sentiment + LLM) → technical filter → LLM final pick.
        Selections are persisted and published as a side effect; persistence
        or publish failures are logged but do not drop the selection.
        """
        # Market gate: check sentiment
        sentiment_data = await self._db.get_latest_market_sentiment()
        if sentiment_data is None:
            logger.warning("No market sentiment data; skipping selection")
            return []

        market_sentiment = MarketSentiment(**sentiment_data)
        if market_sentiment.market_regime == "risk_off":
            logger.info("Market is risk_off; skipping stock selection")
            return []

        # Stage 1: gather candidates from both sources
        sentiment_source = SentimentCandidateSource(self._db)
        llm_source = LLMCandidateSource(self._db, self._api_key, self._model)

        sentiment_candidates = await sentiment_source.get_candidates()
        llm_candidates = await llm_source.get_candidates()

        candidates = self._merge_candidates(sentiment_candidates, llm_candidates)
        if not candidates:
            logger.info("No candidates found")
            return []

        # Stage 2: technical filter
        filtered = await self._technical_filter(candidates)
        if not filtered:
            logger.info("All candidates filtered out by technical criteria")
            return []

        # Stage 3: LLM final selection
        selections = await self._llm_final_select(filtered, market_sentiment)

        # Persist and publish
        today = datetime.now(timezone.utc).date()
        sentiment_snapshot = {
            "fear_greed": market_sentiment.fear_greed,
            "market_regime": market_sentiment.market_regime,
            "vix": market_sentiment.vix,
        }
        for stock in selections:
            try:
                await self._db.insert_stock_selection(
                    trade_date=today,
                    symbol=stock.symbol,
                    side=stock.side.value,
                    conviction=stock.conviction,
                    reason=stock.reason,
                    key_news=stock.key_news,
                    sentiment_snapshot=sentiment_snapshot,
                )
            except Exception as e:
                logger.error("Failed to persist selection for %s: %s", stock.symbol, e)

            try:
                await self._broker.publish(
                    "selected_stocks",
                    {
                        "symbol": stock.symbol,
                        "side": stock.side.value,
                        "conviction": stock.conviction,
                        "reason": stock.reason,
                        "key_news": stock.key_news,
                        "trade_date": str(today),
                    },
                )
            except Exception as e:
                logger.error("Failed to publish selection for %s: %s", stock.symbol, e)

        return selections

    def _merge_candidates(
        self, sentiment: list[Candidate], llm: list[Candidate]
    ) -> list[Candidate]:
        """Deduplicate candidates by symbol, keeping the higher score."""
        by_symbol: dict[str, Candidate] = {}
        for c in sentiment + llm:
            existing = by_symbol.get(c.symbol)
            if existing is None or c.score > existing.score:
                by_symbol[c.symbol] = c
        return sorted(by_symbol.values(), key=lambda c: c.score, reverse=True)

    async def _technical_filter(self, candidates: list[Candidate]) -> list[Candidate]:
        """Filter candidates using RSI, EMA20, and volume criteria.

        Keeps symbols that have >=21 daily bars, RSI in [30, 70], price above
        EMA20 (approximated by an SMA), and current volume above 50% of the
        trailing average. Per-symbol errors are logged and that symbol dropped.
        """
        passed = []
        for candidate in candidates:
            try:
                bars = await self._alpaca.get_bars(candidate.symbol, timeframe="1Day", limit=60)
                if len(bars) < 21:
                    logger.debug("Insufficient bars for %s", candidate.symbol)
                    continue

                closes = [float(b["c"]) for b in bars]
                volumes = [float(b["v"]) for b in bars]

                rsi = _compute_rsi(closes)
                if not (30 <= rsi <= 70):
                    logger.debug("%s RSI=%.1f outside 30-70", candidate.symbol, rsi)
                    continue

                ema20 = sum(closes[-20:]) / 20  # simple approximation
                current_price = closes[-1]
                if current_price <= ema20:
                    logger.debug(
                        "%s price %.2f <= EMA20 %.2f", candidate.symbol, current_price, ema20
                    )
                    continue

                avg_volume = sum(volumes[:-1]) / max(len(volumes) - 1, 1)
                current_volume = volumes[-1]
                if current_volume <= 0.5 * avg_volume:
                    logger.debug(
                        "%s volume %.0f <= 50%% avg %.0f",
                        candidate.symbol,
                        current_volume,
                        avg_volume,
                    )
                    continue

                passed.append(candidate)
            except Exception as e:
                logger.warning("Technical filter error for %s: %s", candidate.symbol, e)

        return passed

    async def _llm_final_select(
        self, candidates: list[Candidate], market_sentiment: MarketSentiment
    ) -> list[SelectedStock]:
        """Ask Claude to pick 2-3 stocks with rationale.

        Returns at most self._max_picks selections; [] on API failure.
        """
        candidate_lines = [
            f"- {c.symbol} (source={c.source}, score={c.score:.2f}, reason={c.reason})"
            for c in candidates
        ]
        market_context = (
            f"Fear/Greed: {market_sentiment.fear_greed} ({market_sentiment.fear_greed_label}), "
            f"VIX: {market_sentiment.vix}, "
            f"Fed stance: {market_sentiment.fed_stance}, "
            f"Regime: {market_sentiment.market_regime}"
        )

        prompt = (
            f"You are a portfolio manager. Select 2-3 stocks for today's session.\n\n"
            f"Market context: {market_context}\n\n"
            f"Candidates (already passed technical filters):\n"
            + "\n".join(candidate_lines)
            + "\n\n"
            "Return ONLY a JSON array with objects having:\n"
            " symbol, side ('BUY' or 'SELL'), conviction (0-1), reason (1-2 sentences), "
            "key_news (list of 1-3 relevant headlines or facts)\n"
            f"Select at most {self._max_picks} stocks."
        )

        text = await _call_anthropic(self._api_key, self._model, prompt, "LLM final select")
        if text is None:
            return []
        return _parse_llm_selections(text)[: self._max_picks]
test_sentiment_candidate_source(): + mock_db = MagicMock() + mock_db.get_top_symbol_scores = AsyncMock( + return_value=[ + {"symbol": "AAPL", "composite": 0.8, "news_count": 5}, + {"symbol": "NVDA", "composite": 0.6, "news_count": 3}, + ] + ) + + source = SentimentCandidateSource(mock_db) + candidates = await source.get_candidates() + + assert len(candidates) == 2 + assert candidates[0].symbol == "AAPL" + assert candidates[0].source == "sentiment" + + +def test_parse_llm_selections_valid(): + llm_response = """ + [ + {"symbol": "NVDA", "side": "BUY", "conviction": 0.85, "reason": "AI demand", "key_news": ["NVDA beats earnings"]}, + {"symbol": "XOM", "side": "BUY", "conviction": 0.72, "reason": "Oil surge", "key_news": ["Oil prices up"]} + ] + """ + selections = _parse_llm_selections(llm_response) + assert len(selections) == 2 + assert selections[0].symbol == "NVDA" + assert selections[0].conviction == 0.85 + + +def test_parse_llm_selections_invalid(): + selections = _parse_llm_selections("not json") + assert selections == [] + + +def test_parse_llm_selections_with_markdown(): + llm_response = """ + Here are my picks: + ```json + [ + {"symbol": "TSLA", "side": "BUY", "conviction": 0.7, "reason": "Momentum", "key_news": ["Tesla rally"]} + ] + ``` + """ + selections = _parse_llm_selections(llm_response) + assert len(selections) == 1 + assert selections[0].symbol == "TSLA" + + +async def test_selector_blocks_on_risk_off(): + mock_db = MagicMock() + mock_db.get_latest_market_sentiment = AsyncMock( + return_value={ + "fear_greed": 15, + "fear_greed_label": "Extreme Fear", + "vix": 35.0, + "fed_stance": "neutral", + "market_regime": "risk_off", + "updated_at": datetime.now(timezone.utc), + } + ) + + selector = StockSelector( + db=mock_db, broker=MagicMock(), alpaca=MagicMock(), anthropic_api_key="test" + ) + result = await selector.select() + assert result == [] diff --git a/shared/alembic/versions/002_news_sentiment_tables.py 
b/shared/alembic/versions/002_news_sentiment_tables.py new file mode 100644 index 0000000..402ff87 --- /dev/null +++ b/shared/alembic/versions/002_news_sentiment_tables.py @@ -0,0 +1,84 @@ +"""Add news, sentiment, and stock selection tables + +Revision ID: 002 +Revises: 001 +Create Date: 2026-04-02 +""" + +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa + +revision: str = "002" +down_revision: Union[str, None] = "001" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + op.create_table( + "news_items", + sa.Column("id", sa.Text, primary_key=True), + sa.Column("source", sa.Text, nullable=False), + sa.Column("headline", sa.Text, nullable=False), + sa.Column("summary", sa.Text), + sa.Column("url", sa.Text), + sa.Column("published_at", sa.DateTime(timezone=True), nullable=False), + sa.Column("symbols", sa.Text), + sa.Column("sentiment", sa.Float, nullable=False), + sa.Column("category", sa.Text, nullable=False), + sa.Column("raw_data", sa.Text), + sa.Column( + "created_at", sa.DateTime(timezone=True), nullable=False, server_default=sa.func.now() + ), + ) + op.create_index("idx_news_items_published", "news_items", ["published_at"]) + op.create_index("idx_news_items_source", "news_items", ["source"]) + + op.create_table( + "symbol_scores", + sa.Column("id", sa.Text, primary_key=True), + sa.Column("symbol", sa.Text, nullable=False, unique=True), + sa.Column("news_score", sa.Float, nullable=False, server_default="0"), + sa.Column("news_count", sa.Integer, nullable=False, server_default="0"), + sa.Column("social_score", sa.Float, nullable=False, server_default="0"), + sa.Column("policy_score", sa.Float, nullable=False, server_default="0"), + sa.Column("filing_score", sa.Float, nullable=False, server_default="0"), + sa.Column("composite", sa.Float, nullable=False, server_default="0"), + sa.Column("updated_at", sa.DateTime(timezone=True), nullable=False), 
+ ) + + op.create_table( + "market_sentiment", + sa.Column("id", sa.Text, primary_key=True), + sa.Column("fear_greed", sa.Integer, nullable=False), + sa.Column("fear_greed_label", sa.Text, nullable=False), + sa.Column("vix", sa.Float), + sa.Column("fed_stance", sa.Text, nullable=False, server_default="neutral"), + sa.Column("market_regime", sa.Text, nullable=False, server_default="neutral"), + sa.Column("updated_at", sa.DateTime(timezone=True), nullable=False), + ) + + op.create_table( + "stock_selections", + sa.Column("id", sa.Text, primary_key=True), + sa.Column("trade_date", sa.Date, nullable=False), + sa.Column("symbol", sa.Text, nullable=False), + sa.Column("side", sa.Text, nullable=False), + sa.Column("conviction", sa.Float, nullable=False), + sa.Column("reason", sa.Text, nullable=False), + sa.Column("key_news", sa.Text), + sa.Column("sentiment_snapshot", sa.Text), + sa.Column( + "created_at", sa.DateTime(timezone=True), nullable=False, server_default=sa.func.now() + ), + ) + op.create_index("idx_stock_selections_date", "stock_selections", ["trade_date"]) + + +def downgrade() -> None: + op.drop_table("stock_selections") + op.drop_table("market_sentiment") + op.drop_table("symbol_scores") + op.drop_table("news_items") diff --git a/shared/src/shared/config.py b/shared/src/shared/config.py index 4e8e7f1..b6ccebd 100644 --- a/shared/src/shared/config.py +++ b/shared/src/shared/config.py @@ -32,7 +32,15 @@ class Settings(BaseSettings): telegram_enabled: bool = False log_format: str = "json" health_port: int = 8080 - circuit_breaker_threshold: int = 5 - circuit_breaker_timeout: int = 60 metrics_auth_token: str = "" # If set, /health and /metrics require Bearer token + # News collector + finnhub_api_key: str = "" + news_poll_interval: int = 300 + sentiment_aggregate_interval: int = 900 + # Stock selector + selector_final_time: str = "15:30" + selector_max_picks: int = 3 + # LLM + anthropic_api_key: str = "" + anthropic_model: str = "claude-sonnet-4-20250514" 
model_config = {"env_file": ".env", "env_file_encoding": "utf-8", "extra": "ignore"} diff --git a/shared/src/shared/db.py b/shared/src/shared/db.py index 901e293..9cc8686 100644 --- a/shared/src/shared/db.py +++ b/shared/src/shared/db.py @@ -1,15 +1,28 @@ """Database layer using SQLAlchemy 2.0 async ORM for the trading platform.""" +import json +import uuid from contextlib import asynccontextmanager -from datetime import datetime, timedelta, timezone +from datetime import datetime, date, timedelta, timezone from decimal import Decimal from typing import Optional from sqlalchemy import select, update from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine, async_sessionmaker -from shared.models import Candle, Signal, Order, OrderStatus -from shared.sa_models import Base, CandleRow, SignalRow, OrderRow, PortfolioSnapshotRow +from shared.models import Candle, Signal, Order, OrderStatus, NewsItem +from shared.sentiment_models import SymbolScore, MarketSentiment +from shared.sa_models import ( + Base, + CandleRow, + SignalRow, + OrderRow, + PortfolioSnapshotRow, + NewsItemRow, + SymbolScoreRow, + MarketSentimentRow, + StockSelectionRow, +) class Database: @@ -195,3 +208,229 @@ class Database: } for r in rows ] + + async def insert_news_item(self, item: NewsItem) -> None: + """Insert a NewsItem row, JSON-encoding symbols and raw_data.""" + row = NewsItemRow( + id=item.id, + source=item.source, + headline=item.headline, + summary=item.summary, + url=item.url, + published_at=item.published_at, + symbols=json.dumps(item.symbols), + sentiment=item.sentiment, + category=item.category.value, + raw_data=json.dumps(item.raw_data), + created_at=item.created_at, + ) + async with self._session_factory() as session: + try: + session.add(row) + await session.commit() + except Exception: + await session.rollback() + raise + + async def get_recent_news(self, hours: int = 24) -> list[dict]: + """Retrieve news items published in the last N hours.""" + since = 
datetime.now(timezone.utc) - timedelta(hours=hours) + stmt = ( + select(NewsItemRow) + .where(NewsItemRow.published_at >= since) + .order_by(NewsItemRow.published_at.desc()) + ) + async with self._session_factory() as session: + try: + result = await session.execute(stmt) + rows = result.scalars().all() + except Exception: + await session.rollback() + raise + return [ + { + "id": r.id, + "source": r.source, + "headline": r.headline, + "summary": r.summary, + "url": r.url, + "published_at": r.published_at, + "symbols": json.loads(r.symbols) if r.symbols else [], + "sentiment": r.sentiment, + "category": r.category, + "raw_data": json.loads(r.raw_data) if r.raw_data else {}, + "created_at": r.created_at, + } + for r in rows + ] + + async def upsert_symbol_score(self, score: SymbolScore) -> None: + """Insert or update a SymbolScore row, keyed by symbol.""" + async with self._session_factory() as session: + try: + stmt = select(SymbolScoreRow).where(SymbolScoreRow.symbol == score.symbol) + result = await session.execute(stmt) + existing = result.scalar_one_or_none() + if existing is not None: + existing.news_score = score.news_score + existing.news_count = score.news_count + existing.social_score = score.social_score + existing.policy_score = score.policy_score + existing.filing_score = score.filing_score + existing.composite = score.composite + existing.updated_at = score.updated_at + else: + row = SymbolScoreRow( + id=str(uuid.uuid4()), + symbol=score.symbol, + news_score=score.news_score, + news_count=score.news_count, + social_score=score.social_score, + policy_score=score.policy_score, + filing_score=score.filing_score, + composite=score.composite, + updated_at=score.updated_at, + ) + session.add(row) + await session.commit() + except Exception: + await session.rollback() + raise + + async def get_top_symbol_scores(self, limit: int = 20) -> list[dict]: + """Retrieve top symbol scores ordered by composite descending.""" + stmt = 
select(SymbolScoreRow).order_by(SymbolScoreRow.composite.desc()).limit(limit) + async with self._session_factory() as session: + try: + result = await session.execute(stmt) + rows = result.scalars().all() + except Exception: + await session.rollback() + raise + return [ + { + "id": r.id, + "symbol": r.symbol, + "news_score": r.news_score, + "news_count": r.news_count, + "social_score": r.social_score, + "policy_score": r.policy_score, + "filing_score": r.filing_score, + "composite": r.composite, + "updated_at": r.updated_at, + } + for r in rows + ] + + async def upsert_market_sentiment(self, ms: MarketSentiment) -> None: + """Insert or update the single 'latest' market sentiment row.""" + async with self._session_factory() as session: + try: + stmt = select(MarketSentimentRow).where(MarketSentimentRow.id == "latest") + result = await session.execute(stmt) + existing = result.scalar_one_or_none() + if existing is not None: + existing.fear_greed = ms.fear_greed + existing.fear_greed_label = ms.fear_greed_label + existing.vix = ms.vix + existing.fed_stance = ms.fed_stance + existing.market_regime = ms.market_regime + existing.updated_at = ms.updated_at + else: + row = MarketSentimentRow( + id="latest", + fear_greed=ms.fear_greed, + fear_greed_label=ms.fear_greed_label, + vix=ms.vix, + fed_stance=ms.fed_stance, + market_regime=ms.market_regime, + updated_at=ms.updated_at, + ) + session.add(row) + await session.commit() + except Exception: + await session.rollback() + raise + + async def get_latest_market_sentiment(self) -> Optional[dict]: + """Retrieve the 'latest' market sentiment row, or None if not found.""" + stmt = select(MarketSentimentRow).where(MarketSentimentRow.id == "latest") + async with self._session_factory() as session: + try: + result = await session.execute(stmt) + row = result.scalar_one_or_none() + except Exception: + await session.rollback() + raise + if row is None: + return None + return { + "id": row.id, + "fear_greed": row.fear_greed, + 
"fear_greed_label": row.fear_greed_label, + "vix": row.vix, + "fed_stance": row.fed_stance, + "market_regime": row.market_regime, + "updated_at": row.updated_at, + } + + async def insert_stock_selection( + self, + trade_date: date, + symbol: str, + side: str, + conviction: float, + reason: str, + key_news: list, + sentiment_snapshot: dict, + ) -> None: + """Insert a stock selection row with JSON-encoded lists/dicts.""" + row = StockSelectionRow( + id=str(uuid.uuid4()), + trade_date=trade_date, + symbol=symbol, + side=side, + conviction=conviction, + reason=reason, + key_news=json.dumps(key_news), + sentiment_snapshot=json.dumps(sentiment_snapshot), + created_at=datetime.now(timezone.utc), + ) + async with self._session_factory() as session: + try: + session.add(row) + await session.commit() + except Exception: + await session.rollback() + raise + + async def get_stock_selections(self, trade_date: date) -> list[dict]: + """Retrieve stock selections for a given trade date.""" + stmt = ( + select(StockSelectionRow) + .where(StockSelectionRow.trade_date == trade_date) + .order_by(StockSelectionRow.conviction.desc()) + ) + async with self._session_factory() as session: + try: + result = await session.execute(stmt) + rows = result.scalars().all() + except Exception: + await session.rollback() + raise + return [ + { + "id": r.id, + "trade_date": r.trade_date, + "symbol": r.symbol, + "side": r.side, + "conviction": r.conviction, + "reason": r.reason, + "key_news": json.loads(r.key_news) if r.key_news else [], + "sentiment_snapshot": json.loads(r.sentiment_snapshot) + if r.sentiment_snapshot + else {}, + "created_at": r.created_at, + } + for r in rows + ] diff --git a/shared/src/shared/events.py b/shared/src/shared/events.py index 72f8865..63f93a2 100644 --- a/shared/src/shared/events.py +++ b/shared/src/shared/events.py @@ -5,13 +5,14 @@ from typing import Any from pydantic import BaseModel -from shared.models import Candle, Signal, Order +from shared.models import Candle, 
Signal, Order, NewsItem class EventType(str, Enum): CANDLE = "CANDLE" SIGNAL = "SIGNAL" ORDER = "ORDER" + NEWS = "NEWS" class CandleEvent(BaseModel): @@ -59,10 +60,26 @@ class OrderEvent(BaseModel): return cls(type=raw["type"], data=Order(**raw["data"])) +class NewsEvent(BaseModel): + type: EventType = EventType.NEWS + data: NewsItem + + def to_dict(self) -> dict: + return { + "type": self.type, + "data": self.data.model_dump(mode="json"), + } + + @classmethod + def from_raw(cls, raw: dict) -> "NewsEvent": + return cls(type=raw["type"], data=NewsItem(**raw["data"])) + + _EVENT_TYPE_MAP = { EventType.CANDLE: CandleEvent, EventType.SIGNAL: SignalEvent, EventType.ORDER: OrderEvent, + EventType.NEWS: NewsEvent, } diff --git a/shared/src/shared/models.py b/shared/src/shared/models.py index 70820b5..a436c03 100644 --- a/shared/src/shared/models.py +++ b/shared/src/shared/models.py @@ -74,3 +74,26 @@ class Position(BaseModel): @property def unrealized_pnl(self) -> Decimal: return self.quantity * (self.current_price - self.avg_entry_price) + + +class NewsCategory(str, Enum): + POLICY = "policy" + EARNINGS = "earnings" + MACRO = "macro" + SOCIAL = "social" + FILING = "filing" + FED = "fed" + + +class NewsItem(BaseModel): + id: str = Field(default_factory=lambda: str(uuid.uuid4())) + source: str + headline: str + summary: Optional[str] = None + url: Optional[str] = None + published_at: datetime + symbols: list[str] = [] + sentiment: float + category: NewsCategory + raw_data: dict = {} + created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) diff --git a/shared/src/shared/notifier.py b/shared/src/shared/notifier.py index f03919c..3d7b6cf 100644 --- a/shared/src/shared/notifier.py +++ b/shared/src/shared/notifier.py @@ -8,6 +8,7 @@ from typing import Optional, Sequence import aiohttp from shared.models import Signal, Order, Position +from shared.sentiment_models import SelectedStock, MarketSentiment logger = logging.getLogger(__name__) @@ -123,6 
+124,34 @@ class TelegramNotifier: lines.append(" No open positions") await self.send("\n".join(lines)) + async def send_stock_selection( + self, + selections: list[SelectedStock], + market: MarketSentiment | None = None, + ) -> None: + """Format and send stock selection notification.""" + lines = [f"<b>📊 Stock Selection ({len(selections)} picks)</b>", ""] + + side_emoji = {"BUY": "🟢", "SELL": "🔴"} + + for i, s in enumerate(selections, 1): + emoji = side_emoji.get(s.side.value, "⚪") + lines.append( + f"{i}. <b>{s.symbol}</b> {emoji} {s.side.value} (conviction: {s.conviction:.0%})" + ) + lines.append(f" {s.reason}") + if s.key_news: + lines.append(f" News: {s.key_news[0]}") + lines.append("") + + if market: + lines.append( + f"Market: F&G {market.fear_greed} ({market.fear_greed_label})" + + (f" | VIX {market.vix:.1f}" if market.vix else "") + ) + + await self.send("\n".join(lines)) + async def close(self) -> None: """Close the underlying aiohttp session.""" if self._session is not None: diff --git a/shared/src/shared/resilience.py b/shared/src/shared/resilience.py index e43fd21..8d8678a 100644 --- a/shared/src/shared/resilience.py +++ b/shared/src/shared/resilience.py @@ -1,105 +1 @@ -"""Retry with exponential backoff and circuit breaker utilities.""" - -from __future__ import annotations - -import asyncio -import enum -import functools -import logging -import random -import time -from typing import Any, Callable - -logger = logging.getLogger(__name__) - - -# --------------------------------------------------------------------------- -# retry_with_backoff -# --------------------------------------------------------------------------- - - -def retry_with_backoff( - max_retries: int = 3, - base_delay: float = 1.0, - max_delay: float = 60.0, -) -> Callable: - """Decorator that retries an async function with exponential backoff + jitter.""" - - def decorator(func: Callable) -> Callable: - @functools.wraps(func) - async def wrapper(*args: Any, **kwargs: Any) -> Any: - 
last_exc: BaseException | None = None - for attempt in range(max_retries + 1): - try: - return await func(*args, **kwargs) - except Exception as exc: - last_exc = exc - if attempt < max_retries: - delay = min(base_delay * (2**attempt), max_delay) - jitter = delay * random.uniform(0, 0.5) - total_delay = delay + jitter - logger.warning( - "Retry %d/%d for %s after error: %s (delay=%.3fs)", - attempt + 1, - max_retries, - func.__name__, - exc, - total_delay, - ) - await asyncio.sleep(total_delay) - raise last_exc # type: ignore[misc] - - return wrapper - - return decorator - - -# --------------------------------------------------------------------------- -# CircuitBreaker -# --------------------------------------------------------------------------- - - -class CircuitState(enum.Enum): - CLOSED = "closed" - OPEN = "open" - HALF_OPEN = "half_open" - - -class CircuitBreaker: - """Simple circuit breaker implementation.""" - - def __init__( - self, - failure_threshold: int = 5, - recovery_timeout: float = 60.0, - ) -> None: - self._failure_threshold = failure_threshold - self._recovery_timeout = recovery_timeout - self._failure_count: int = 0 - self._state = CircuitState.CLOSED - self._opened_at: float = 0.0 - - @property - def state(self) -> CircuitState: - return self._state - - def allow_request(self) -> bool: - if self._state == CircuitState.CLOSED: - return True - if self._state == CircuitState.OPEN: - if time.monotonic() - self._opened_at >= self._recovery_timeout: - self._state = CircuitState.HALF_OPEN - return True - return False - # HALF_OPEN - return True - - def record_success(self) -> None: - self._failure_count = 0 - self._state = CircuitState.CLOSED - - def record_failure(self) -> None: - self._failure_count += 1 - if self._failure_count >= self._failure_threshold: - self._state = CircuitState.OPEN - self._opened_at = time.monotonic() +"""Resilience utilities for the trading platform.""" diff --git a/shared/src/shared/sa_models.py 
b/shared/src/shared/sa_models.py index 8386ba8..dc87ef5 100644 --- a/shared/src/shared/sa_models.py +++ b/shared/src/shared/sa_models.py @@ -3,6 +3,7 @@ from datetime import datetime from decimal import Decimal +import sqlalchemy as sa from sqlalchemy import DateTime, ForeignKey, Numeric, Text from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column @@ -52,19 +53,6 @@ class OrderRow(Base): filled_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True)) -class TradeRow(Base): - __tablename__ = "trades" - - id: Mapped[str] = mapped_column(Text, primary_key=True) - order_id: Mapped[str | None] = mapped_column(Text, ForeignKey("orders.id")) - symbol: Mapped[str] = mapped_column(Text, nullable=False) - side: Mapped[str] = mapped_column(Text, nullable=False) - price: Mapped[Decimal] = mapped_column(Numeric, nullable=False) - quantity: Mapped[Decimal] = mapped_column(Numeric, nullable=False) - fee: Mapped[Decimal] = mapped_column(Numeric, nullable=False, server_default="0") - traded_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False) - - class PositionRow(Base): __tablename__ = "positions" @@ -83,3 +71,63 @@ class PortfolioSnapshotRow(Base): realized_pnl: Mapped[Decimal] = mapped_column(Numeric, nullable=False) unrealized_pnl: Mapped[Decimal] = mapped_column(Numeric, nullable=False) snapshot_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False) + + +class NewsItemRow(Base): + __tablename__ = "news_items" + + id: Mapped[str] = mapped_column(Text, primary_key=True) + source: Mapped[str] = mapped_column(Text, nullable=False) + headline: Mapped[str] = mapped_column(Text, nullable=False) + summary: Mapped[str | None] = mapped_column(Text) + url: Mapped[str | None] = mapped_column(Text) + published_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False) + symbols: Mapped[str | None] = mapped_column(Text) # JSON-encoded list + sentiment: Mapped[float] = mapped_column(sa.Float, 
nullable=False) + category: Mapped[str] = mapped_column(Text, nullable=False) + raw_data: Mapped[str | None] = mapped_column(Text) # JSON string + created_at: Mapped[datetime] = mapped_column( + DateTime(timezone=True), nullable=False, server_default=sa.func.now() + ) + + +class SymbolScoreRow(Base): + __tablename__ = "symbol_scores" + + id: Mapped[str] = mapped_column(Text, primary_key=True) + symbol: Mapped[str] = mapped_column(Text, nullable=False, unique=True) + news_score: Mapped[float] = mapped_column(sa.Float, nullable=False, server_default="0") + news_count: Mapped[int] = mapped_column(sa.Integer, nullable=False, server_default="0") + social_score: Mapped[float] = mapped_column(sa.Float, nullable=False, server_default="0") + policy_score: Mapped[float] = mapped_column(sa.Float, nullable=False, server_default="0") + filing_score: Mapped[float] = mapped_column(sa.Float, nullable=False, server_default="0") + composite: Mapped[float] = mapped_column(sa.Float, nullable=False, server_default="0") + updated_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False) + + +class MarketSentimentRow(Base): + __tablename__ = "market_sentiment" + + id: Mapped[str] = mapped_column(Text, primary_key=True) + fear_greed: Mapped[int] = mapped_column(sa.Integer, nullable=False) + fear_greed_label: Mapped[str] = mapped_column(Text, nullable=False) + vix: Mapped[float | None] = mapped_column(sa.Float) + fed_stance: Mapped[str] = mapped_column(Text, nullable=False, server_default="neutral") + market_regime: Mapped[str] = mapped_column(Text, nullable=False, server_default="neutral") + updated_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False) + + +class StockSelectionRow(Base): + __tablename__ = "stock_selections" + + id: Mapped[str] = mapped_column(Text, primary_key=True) + trade_date: Mapped[datetime] = mapped_column(sa.Date, nullable=False) + symbol: Mapped[str] = mapped_column(Text, nullable=False) + side: Mapped[str] = 
mapped_column(Text, nullable=False) + conviction: Mapped[float] = mapped_column(sa.Float, nullable=False) + reason: Mapped[str] = mapped_column(Text, nullable=False) + key_news: Mapped[str | None] = mapped_column(Text) # JSON string + sentiment_snapshot: Mapped[str | None] = mapped_column(Text) # JSON string + created_at: Mapped[datetime] = mapped_column( + DateTime(timezone=True), nullable=False, server_default=sa.func.now() + ) diff --git a/shared/src/shared/sentiment.py b/shared/src/shared/sentiment.py index 8213b47..5b4b0da 100644 --- a/shared/src/shared/sentiment.py +++ b/shared/src/shared/sentiment.py @@ -1,35 +1,105 @@ -"""Market sentiment data.""" - -import logging -from dataclasses import dataclass, field -from datetime import datetime, timezone - -logger = logging.getLogger(__name__) - - -@dataclass -class SentimentData: - """Aggregated sentiment snapshot.""" - - fear_greed_value: int | None = None - fear_greed_label: str | None = None - news_sentiment: float | None = None - news_count: int = 0 - exchange_netflow: float | None = None - timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc)) - - @property - def should_buy(self) -> bool: - if self.fear_greed_value is not None and self.fear_greed_value > 70: - return False - if self.news_sentiment is not None and self.news_sentiment < -0.3: - return False - return True - - @property - def should_block(self) -> bool: - if self.fear_greed_value is not None and self.fear_greed_value > 80: - return True - if self.news_sentiment is not None and self.news_sentiment < -0.5: - return True - return False +"""Market sentiment aggregation.""" + +from datetime import datetime + +from shared.sentiment_models import SymbolScore + + +def _safe_avg(values: list[float]) -> float: + if not values: + return 0.0 + return sum(values) / len(values) + + +class SentimentAggregator: + """Aggregates per-news sentiment into per-symbol scores.""" + + WEIGHTS = {"news": 0.3, "social": 0.2, "policy": 0.3, 
"filing": 0.2} + + CATEGORY_MAP = { + "earnings": "news", + "macro": "news", + "social": "social", + "policy": "policy", + "filing": "filing", + "fed": "policy", + } + + def _freshness_decay(self, published_at: datetime, now: datetime) -> float: + age = now - published_at + hours = age.total_seconds() / 3600 + if hours < 1: + return 1.0 + if hours < 6: + return 0.7 + if hours < 24: + return 0.3 + return 0.0 + + def _compute_composite( + self, + news_score: float, + social_score: float, + policy_score: float, + filing_score: float, + ) -> float: + return ( + news_score * self.WEIGHTS["news"] + + social_score * self.WEIGHTS["social"] + + policy_score * self.WEIGHTS["policy"] + + filing_score * self.WEIGHTS["filing"] + ) + + def aggregate(self, news_items: list[dict], now: datetime) -> dict[str, SymbolScore]: + """Aggregate news items into per-symbol scores. + + Each dict needs: symbols, sentiment, category, published_at. + """ + symbol_data: dict[str, dict] = {} + + for item in news_items: + decay = self._freshness_decay(item["published_at"], now) + if decay == 0.0: + continue + category = item.get("category", "macro") + score_field = self.CATEGORY_MAP.get(category, "news") + weighted_sentiment = item["sentiment"] * decay + + for symbol in item.get("symbols", []): + if symbol not in symbol_data: + symbol_data[symbol] = { + "news_scores": [], + "social_scores": [], + "policy_scores": [], + "filing_scores": [], + "count": 0, + } + symbol_data[symbol][f"{score_field}_scores"].append(weighted_sentiment) + symbol_data[symbol]["count"] += 1 + + result = {} + for symbol, data in symbol_data.items(): + ns = _safe_avg(data["news_scores"]) + ss = _safe_avg(data["social_scores"]) + ps = _safe_avg(data["policy_scores"]) + fs = _safe_avg(data["filing_scores"]) + result[symbol] = SymbolScore( + symbol=symbol, + news_score=ns, + news_count=data["count"], + social_score=ss, + policy_score=ps, + filing_score=fs, + composite=self._compute_composite(ns, ss, ps, fs), + updated_at=now, + 
) + return result + + def determine_regime(self, fear_greed: int, vix: float | None) -> str: + if fear_greed <= 20: + return "risk_off" + if vix is not None and vix > 30: + return "risk_off" + if fear_greed >= 60 and (vix is None or vix < 20): + return "risk_on" + return "neutral" diff --git a/shared/src/shared/sentiment_models.py b/shared/src/shared/sentiment_models.py new file mode 100644 index 0000000..a009601 --- /dev/null +++ b/shared/src/shared/sentiment_models.py @@ -0,0 +1,44 @@ +"""Sentiment scoring and stock selection models.""" + +from datetime import datetime +from typing import Optional + +from pydantic import BaseModel + +from shared.models import OrderSide + + +class SymbolScore(BaseModel): + symbol: str + news_score: float + news_count: int + social_score: float + policy_score: float + filing_score: float + composite: float + updated_at: datetime + + +class MarketSentiment(BaseModel): + fear_greed: int + fear_greed_label: str + vix: Optional[float] = None + fed_stance: str + market_regime: str + updated_at: datetime + + +class SelectedStock(BaseModel): + symbol: str + side: OrderSide + conviction: float + reason: str + key_news: list[str] + + +class Candidate(BaseModel): + symbol: str + source: str + direction: Optional[OrderSide] = None + score: float + reason: str diff --git a/shared/tests/test_db_news.py b/shared/tests/test_db_news.py new file mode 100644 index 0000000..a2c9140 --- /dev/null +++ b/shared/tests/test_db_news.py @@ -0,0 +1,78 @@ +"""Tests for database news/sentiment methods. 
Uses in-memory SQLite.""" + +import pytest +from datetime import datetime, date, timezone + +from shared.db import Database +from shared.models import NewsItem, NewsCategory +from shared.sentiment_models import SymbolScore, MarketSentiment + + +@pytest.fixture +async def db(): + database = Database("sqlite+aiosqlite://") + await database.connect() + yield database + await database.close() + + +async def test_insert_and_get_news_items(db): + item = NewsItem( + source="finnhub", + headline="AAPL earnings beat", + published_at=datetime(2026, 4, 2, 12, 0, tzinfo=timezone.utc), + sentiment=0.8, + category=NewsCategory.EARNINGS, + symbols=["AAPL"], + ) + await db.insert_news_item(item) + items = await db.get_recent_news(hours=24) + assert len(items) == 1 + assert items[0]["headline"] == "AAPL earnings beat" + + +async def test_upsert_symbol_score(db): + score = SymbolScore( + symbol="AAPL", + news_score=0.5, + news_count=10, + social_score=0.3, + policy_score=0.0, + filing_score=0.2, + composite=0.3, + updated_at=datetime(2026, 4, 2, tzinfo=timezone.utc), + ) + await db.upsert_symbol_score(score) + scores = await db.get_top_symbol_scores(limit=5) + assert len(scores) == 1 + assert scores[0]["symbol"] == "AAPL" + + +async def test_upsert_market_sentiment(db): + ms = MarketSentiment( + fear_greed=55, + fear_greed_label="Neutral", + vix=18.2, + fed_stance="neutral", + market_regime="neutral", + updated_at=datetime(2026, 4, 2, tzinfo=timezone.utc), + ) + await db.upsert_market_sentiment(ms) + result = await db.get_latest_market_sentiment() + assert result is not None + assert result["fear_greed"] == 55 + + +async def test_insert_stock_selection(db): + await db.insert_stock_selection( + trade_date=date(2026, 4, 2), + symbol="NVDA", + side="BUY", + conviction=0.85, + reason="CHIPS Act", + key_news=["Trump signs CHIPS expansion"], + sentiment_snapshot={"composite": 0.8}, + ) + selections = await db.get_stock_selections(date(2026, 4, 2)) + assert len(selections) == 1 + assert 
selections[0]["symbol"] == "NVDA" diff --git a/shared/tests/test_news_events.py b/shared/tests/test_news_events.py new file mode 100644 index 0000000..384796a --- /dev/null +++ b/shared/tests/test_news_events.py @@ -0,0 +1,56 @@ +"""Tests for NewsEvent.""" + +from datetime import datetime, timezone + +from shared.models import NewsCategory, NewsItem +from shared.events import NewsEvent, EventType, Event + + +def test_news_event_to_dict(): + item = NewsItem( + source="finnhub", + headline="Test", + published_at=datetime(2026, 4, 2, tzinfo=timezone.utc), + sentiment=0.5, + category=NewsCategory.MACRO, + ) + event = NewsEvent(data=item) + d = event.to_dict() + assert d["type"] == EventType.NEWS + assert d["data"]["source"] == "finnhub" + + +def test_news_event_from_raw(): + raw = { + "type": "NEWS", + "data": { + "id": "abc", + "source": "rss", + "headline": "Test headline", + "published_at": "2026-04-02T00:00:00+00:00", + "sentiment": 0.3, + "category": "earnings", + "symbols": ["AAPL"], + "raw_data": {}, + }, + } + event = NewsEvent.from_raw(raw) + assert event.data.source == "rss" + assert event.data.symbols == ["AAPL"] + + +def test_event_dispatcher_news(): + raw = { + "type": "NEWS", + "data": { + "id": "abc", + "source": "finnhub", + "headline": "Test", + "published_at": "2026-04-02T00:00:00+00:00", + "sentiment": 0.0, + "category": "macro", + "raw_data": {}, + }, + } + event = Event.from_dict(raw) + assert isinstance(event, NewsEvent) diff --git a/shared/tests/test_resilience.py b/shared/tests/test_resilience.py deleted file mode 100644 index e287777..0000000 --- a/shared/tests/test_resilience.py +++ /dev/null @@ -1,139 +0,0 @@ -"""Tests for retry with backoff and circuit breaker.""" - -import time - -import pytest - -from shared.resilience import CircuitBreaker, CircuitState, retry_with_backoff - - -# --------------------------------------------------------------------------- -# retry_with_backoff tests -# 
--------------------------------------------------------------------------- - - -@pytest.mark.asyncio -async def test_retry_succeeds_first_try(): - call_count = 0 - - @retry_with_backoff(max_retries=3, base_delay=0.01) - async def succeed(): - nonlocal call_count - call_count += 1 - return "ok" - - result = await succeed() - assert result == "ok" - assert call_count == 1 - - -@pytest.mark.asyncio -async def test_retry_succeeds_after_failures(): - call_count = 0 - - @retry_with_backoff(max_retries=3, base_delay=0.01) - async def flaky(): - nonlocal call_count - call_count += 1 - if call_count < 3: - raise ValueError("not yet") - return "recovered" - - result = await flaky() - assert result == "recovered" - assert call_count == 3 - - -@pytest.mark.asyncio -async def test_retry_raises_after_max_retries(): - call_count = 0 - - @retry_with_backoff(max_retries=3, base_delay=0.01) - async def always_fail(): - nonlocal call_count - call_count += 1 - raise RuntimeError("permanent") - - with pytest.raises(RuntimeError, match="permanent"): - await always_fail() - # 1 initial + 3 retries = 4 calls - assert call_count == 4 - - -@pytest.mark.asyncio -async def test_retry_respects_max_delay(): - """Backoff should be capped at max_delay.""" - - @retry_with_backoff(max_retries=2, base_delay=0.01, max_delay=0.02) - async def always_fail(): - raise RuntimeError("fail") - - start = time.monotonic() - with pytest.raises(RuntimeError): - await always_fail() - elapsed = time.monotonic() - start - # With max_delay=0.02 and 2 retries, total delay should be small - assert elapsed < 0.5 - - -# --------------------------------------------------------------------------- -# CircuitBreaker tests -# --------------------------------------------------------------------------- - - -def test_circuit_starts_closed(): - cb = CircuitBreaker(failure_threshold=3, recovery_timeout=0.05) - assert cb.state == CircuitState.CLOSED - assert cb.allow_request() is True - - -def 
test_circuit_opens_after_threshold(): - cb = CircuitBreaker(failure_threshold=3, recovery_timeout=60.0) - for _ in range(3): - cb.record_failure() - assert cb.state == CircuitState.OPEN - assert cb.allow_request() is False - - -def test_circuit_rejects_when_open(): - cb = CircuitBreaker(failure_threshold=2, recovery_timeout=60.0) - cb.record_failure() - cb.record_failure() - assert cb.state == CircuitState.OPEN - assert cb.allow_request() is False - - -def test_circuit_half_open_after_timeout(): - cb = CircuitBreaker(failure_threshold=2, recovery_timeout=0.05) - cb.record_failure() - cb.record_failure() - assert cb.state == CircuitState.OPEN - - time.sleep(0.06) - assert cb.allow_request() is True - assert cb.state == CircuitState.HALF_OPEN - - -def test_circuit_closes_on_success(): - cb = CircuitBreaker(failure_threshold=2, recovery_timeout=0.05) - cb.record_failure() - cb.record_failure() - assert cb.state == CircuitState.OPEN - - time.sleep(0.06) - cb.allow_request() # triggers HALF_OPEN - assert cb.state == CircuitState.HALF_OPEN - - cb.record_success() - assert cb.state == CircuitState.CLOSED - assert cb.allow_request() is True - - -def test_circuit_reopens_on_failure_in_half_open(): - cb = CircuitBreaker(failure_threshold=2, recovery_timeout=0.05) - cb.record_failure() - cb.record_failure() - time.sleep(0.06) - cb.allow_request() # HALF_OPEN - cb.record_failure() - assert cb.state == CircuitState.OPEN diff --git a/shared/tests/test_sa_models.py b/shared/tests/test_sa_models.py index 67c3c82..ae73833 100644 --- a/shared/tests/test_sa_models.py +++ b/shared/tests/test_sa_models.py @@ -11,9 +11,12 @@ def test_base_metadata_has_all_tables(): "candles", "signals", "orders", - "trades", "positions", "portfolio_snapshots", + "news_items", + "symbol_scores", + "market_sentiment", + "stock_selections", } assert expected == table_names @@ -120,44 +123,6 @@ class TestOrderRow: assert fk_cols == {"signal_id": "signals.id"} -class TestTradeRow: - def 
test_table_name(self): - from shared.sa_models import TradeRow - - assert TradeRow.__tablename__ == "trades" - - def test_columns(self): - from shared.sa_models import TradeRow - - mapper = inspect(TradeRow) - cols = {c.key for c in mapper.column_attrs} - expected = { - "id", - "order_id", - "symbol", - "side", - "price", - "quantity", - "fee", - "traded_at", - } - assert expected == cols - - def test_primary_key(self): - from shared.sa_models import TradeRow - - mapper = inspect(TradeRow) - pk_cols = [c.name for c in mapper.mapper.primary_key] - assert pk_cols == ["id"] - - def test_order_id_foreign_key(self): - from shared.sa_models import TradeRow - - table = TradeRow.__table__ - fk_cols = {fk.parent.name: fk.target_fullname for fk in table.foreign_keys} - assert fk_cols == {"order_id": "orders.id"} - - class TestPositionRow: def test_table_name(self): from shared.sa_models import PositionRow @@ -229,11 +194,3 @@ class TestStatusDefault: status_col = table.c.status assert status_col.server_default is not None assert status_col.server_default.arg == "PENDING" - - def test_trade_fee_server_default(self): - from shared.sa_models import TradeRow - - table = TradeRow.__table__ - fee_col = table.c.fee - assert fee_col.server_default is not None - assert fee_col.server_default.arg == "0" diff --git a/shared/tests/test_sa_news_models.py b/shared/tests/test_sa_news_models.py new file mode 100644 index 0000000..91e6d4a --- /dev/null +++ b/shared/tests/test_sa_news_models.py @@ -0,0 +1,29 @@ +"""Tests for news-related SQLAlchemy models.""" + +from shared.sa_models import NewsItemRow, SymbolScoreRow, MarketSentimentRow, StockSelectionRow + + +def test_news_item_row_tablename(): + assert NewsItemRow.__tablename__ == "news_items" + + +def test_symbol_score_row_tablename(): + assert SymbolScoreRow.__tablename__ == "symbol_scores" + + +def test_market_sentiment_row_tablename(): + assert MarketSentimentRow.__tablename__ == "market_sentiment" + + +def 
test_stock_selection_row_tablename(): + assert StockSelectionRow.__tablename__ == "stock_selections" + + +def test_news_item_row_columns(): + cols = {c.name for c in NewsItemRow.__table__.columns} + assert cols >= {"id", "source", "headline", "published_at", "sentiment", "category"} + + +def test_symbol_score_row_columns(): + cols = {c.name for c in SymbolScoreRow.__table__.columns} + assert cols >= {"id", "symbol", "news_score", "composite", "updated_at"} diff --git a/shared/tests/test_sentiment.py b/shared/tests/test_sentiment.py deleted file mode 100644 index 9bd8ea3..0000000 --- a/shared/tests/test_sentiment.py +++ /dev/null @@ -1,44 +0,0 @@ -"""Tests for market sentiment module.""" - -from shared.sentiment import SentimentData - - -def test_sentiment_should_buy_default_no_data(): - s = SentimentData() - assert s.should_buy is True - assert s.should_block is False - - -def test_sentiment_should_buy_low_fear_greed(): - s = SentimentData(fear_greed_value=15) - assert s.should_buy is True - - -def test_sentiment_should_not_buy_on_greed(): - s = SentimentData(fear_greed_value=75) - assert s.should_buy is False - - -def test_sentiment_should_not_buy_negative_news(): - s = SentimentData(news_sentiment=-0.4) - assert s.should_buy is False - - -def test_sentiment_should_buy_positive_news(): - s = SentimentData(fear_greed_value=50, news_sentiment=0.3) - assert s.should_buy is True - - -def test_sentiment_should_block_extreme_greed(): - s = SentimentData(fear_greed_value=85) - assert s.should_block is True - - -def test_sentiment_should_block_very_negative_news(): - s = SentimentData(news_sentiment=-0.6) - assert s.should_block is True - - -def test_sentiment_no_block_on_neutral(): - s = SentimentData(fear_greed_value=50, news_sentiment=0.0) - assert s.should_block is False diff --git a/shared/tests/test_sentiment_aggregator.py b/shared/tests/test_sentiment_aggregator.py new file mode 100644 index 0000000..a99c711 --- /dev/null +++ 
b/shared/tests/test_sentiment_aggregator.py @@ -0,0 +1,77 @@ +"""Tests for sentiment aggregator.""" + +import pytest +from datetime import datetime, timezone, timedelta +from shared.sentiment import SentimentAggregator + + +@pytest.fixture +def aggregator(): + return SentimentAggregator() + + +def test_freshness_decay_recent(): + a = SentimentAggregator() + now = datetime.now(timezone.utc) + assert a._freshness_decay(now, now) == 1.0 + + +def test_freshness_decay_3_hours(): + a = SentimentAggregator() + now = datetime.now(timezone.utc) + assert a._freshness_decay(now - timedelta(hours=3), now) == 0.7 + + +def test_freshness_decay_12_hours(): + a = SentimentAggregator() + now = datetime.now(timezone.utc) + assert a._freshness_decay(now - timedelta(hours=12), now) == 0.3 + + +def test_freshness_decay_old(): + a = SentimentAggregator() + now = datetime.now(timezone.utc) + assert a._freshness_decay(now - timedelta(days=2), now) == 0.0 + + +def test_compute_composite(): + a = SentimentAggregator() + composite = a._compute_composite( + news_score=0.5, social_score=0.3, policy_score=0.8, filing_score=0.2 + ) + expected = 0.5 * 0.3 + 0.3 * 0.2 + 0.8 * 0.3 + 0.2 * 0.2 + assert abs(composite - expected) < 0.001 + + +def test_aggregate_news_by_symbol(aggregator): + now = datetime.now(timezone.utc) + news_items = [ + {"symbols": ["AAPL"], "sentiment": 0.8, "category": "earnings", "published_at": now}, + { + "symbols": ["AAPL"], + "sentiment": 0.3, + "category": "macro", + "published_at": now - timedelta(hours=2), + }, + {"symbols": ["MSFT"], "sentiment": -0.5, "category": "policy", "published_at": now}, + ] + scores = aggregator.aggregate(news_items, now) + assert "AAPL" in scores + assert "MSFT" in scores + assert scores["AAPL"].news_count == 2 + assert scores["AAPL"].news_score > 0 + assert scores["MSFT"].policy_score < 0 + + +def test_aggregate_empty(aggregator): + now = datetime.now(timezone.utc) + assert aggregator.aggregate([], now) == {} + + +def 
test_determine_regime(): + a = SentimentAggregator() + assert a.determine_regime(15, None) == "risk_off" + assert a.determine_regime(15, 35.0) == "risk_off" + assert a.determine_regime(50, 35.0) == "risk_off" + assert a.determine_regime(70, 15.0) == "risk_on" + assert a.determine_regime(50, 20.0) == "neutral" diff --git a/shared/tests/test_sentiment_models.py b/shared/tests/test_sentiment_models.py new file mode 100644 index 0000000..25fc371 --- /dev/null +++ b/shared/tests/test_sentiment_models.py @@ -0,0 +1,113 @@ +"""Tests for news and sentiment models.""" + +from datetime import datetime, timezone + +from shared.models import NewsCategory, NewsItem, OrderSide +from shared.sentiment_models import SymbolScore, MarketSentiment, SelectedStock, Candidate + + +def test_news_item_defaults(): + item = NewsItem( + source="finnhub", + headline="Test headline", + published_at=datetime(2026, 4, 2, tzinfo=timezone.utc), + sentiment=0.5, + category=NewsCategory.MACRO, + ) + assert item.id + assert item.symbols == [] + assert item.summary is None + assert item.raw_data == {} + assert item.created_at is not None + + +def test_news_item_with_symbols(): + item = NewsItem( + source="rss", + headline="AAPL earnings beat", + published_at=datetime(2026, 4, 2, tzinfo=timezone.utc), + sentiment=0.8, + category=NewsCategory.EARNINGS, + symbols=["AAPL"], + ) + assert item.symbols == ["AAPL"] + assert item.category == NewsCategory.EARNINGS + + +def test_news_category_values(): + assert NewsCategory.POLICY == "policy" + assert NewsCategory.EARNINGS == "earnings" + assert NewsCategory.MACRO == "macro" + assert NewsCategory.SOCIAL == "social" + assert NewsCategory.FILING == "filing" + assert NewsCategory.FED == "fed" + + +def test_symbol_score(): + score = SymbolScore( + symbol="AAPL", + news_score=0.5, + news_count=10, + social_score=0.3, + policy_score=0.0, + filing_score=0.2, + composite=0.3, + updated_at=datetime(2026, 4, 2, tzinfo=timezone.utc), + ) + assert score.symbol == "AAPL" + 
assert score.composite == 0.3 + + +def test_market_sentiment(): + ms = MarketSentiment( + fear_greed=25, + fear_greed_label="Extreme Fear", + vix=32.5, + fed_stance="hawkish", + market_regime="risk_off", + updated_at=datetime(2026, 4, 2, tzinfo=timezone.utc), + ) + assert ms.market_regime == "risk_off" + assert ms.vix == 32.5 + + +def test_market_sentiment_no_vix(): + ms = MarketSentiment( + fear_greed=50, + fear_greed_label="Neutral", + fed_stance="neutral", + market_regime="neutral", + updated_at=datetime(2026, 4, 2, tzinfo=timezone.utc), + ) + assert ms.vix is None + + +def test_selected_stock(): + ss = SelectedStock( + symbol="NVDA", + side=OrderSide.BUY, + conviction=0.85, + reason="CHIPS Act expansion", + key_news=["Trump signs CHIPS Act expansion"], + ) + assert ss.conviction == 0.85 + assert len(ss.key_news) == 1 + + +def test_candidate(): + c = Candidate( + symbol="TSLA", + source="sentiment", + direction=OrderSide.BUY, + score=0.75, + reason="High social buzz", + ) + assert c.direction == OrderSide.BUY + + c2 = Candidate( + symbol="XOM", + source="llm", + score=0.6, + reason="Oil price surge", + ) + assert c2.direction is None |
