summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--.env.example15
-rw-r--r--CLAUDE.md106
-rw-r--r--docker-compose.yml21
-rw-r--r--docs/TODO.md296
-rw-r--r--docs/superpowers/plans/2026-04-02-news-driven-stock-selector.md3689
-rw-r--r--docs/superpowers/specs/2026-04-02-news-driven-stock-selector-design.md418
-rw-r--r--services/news-collector/Dockerfile9
-rw-r--r--services/news-collector/pyproject.toml25
-rw-r--r--services/news-collector/src/news_collector/__init__.py1
-rw-r--r--services/news-collector/src/news_collector/collectors/__init__.py1
-rw-r--r--services/news-collector/src/news_collector/collectors/base.py18
-rw-r--r--services/news-collector/src/news_collector/collectors/fear_greed.py63
-rw-r--r--services/news-collector/src/news_collector/collectors/fed.py119
-rw-r--r--services/news-collector/src/news_collector/collectors/finnhub.py88
-rw-r--r--services/news-collector/src/news_collector/collectors/reddit.py97
-rw-r--r--services/news-collector/src/news_collector/collectors/rss.py105
-rw-r--r--services/news-collector/src/news_collector/collectors/sec_edgar.py100
-rw-r--r--services/news-collector/src/news_collector/collectors/truth_social.py86
-rw-r--r--services/news-collector/src/news_collector/config.py10
-rw-r--r--services/news-collector/src/news_collector/main.py193
-rw-r--r--services/news-collector/tests/__init__.py0
-rw-r--r--services/news-collector/tests/test_fear_greed.py49
-rw-r--r--services/news-collector/tests/test_fed.py37
-rw-r--r--services/news-collector/tests/test_finnhub.py67
-rw-r--r--services/news-collector/tests/test_main.py39
-rw-r--r--services/news-collector/tests/test_reddit.py63
-rw-r--r--services/news-collector/tests/test_rss.py47
-rw-r--r--services/news-collector/tests/test_sec_edgar.py58
-rw-r--r--services/news-collector/tests/test_truth_social.py41
-rw-r--r--services/strategy-engine/src/strategy_engine/config.py4
-rw-r--r--services/strategy-engine/src/strategy_engine/main.py66
-rw-r--r--services/strategy-engine/src/strategy_engine/stock_selector.py404
-rw-r--r--services/strategy-engine/tests/conftest.py5
-rw-r--r--services/strategy-engine/tests/test_stock_selector.py80
-rw-r--r--shared/alembic/versions/002_news_sentiment_tables.py84
-rw-r--r--shared/src/shared/config.py12
-rw-r--r--shared/src/shared/db.py245
-rw-r--r--shared/src/shared/events.py19
-rw-r--r--shared/src/shared/models.py23
-rw-r--r--shared/src/shared/notifier.py29
-rw-r--r--shared/src/shared/resilience.py106
-rw-r--r--shared/src/shared/sa_models.py74
-rw-r--r--shared/src/shared/sentiment.py140
-rw-r--r--shared/src/shared/sentiment_models.py44
-rw-r--r--shared/tests/test_db_news.py78
-rw-r--r--shared/tests/test_news_events.py56
-rw-r--r--shared/tests/test_resilience.py139
-rw-r--r--shared/tests/test_sa_models.py51
-rw-r--r--shared/tests/test_sa_news_models.py29
-rw-r--r--shared/tests/test_sentiment.py44
-rw-r--r--shared/tests/test_sentiment_aggregator.py77
-rw-r--r--shared/tests/test_sentiment_models.py113
52 files changed, 7216 insertions, 567 deletions
diff --git a/.env.example b/.env.example
index 7a2751f..bdc6a67 100644
--- a/.env.example
+++ b/.env.example
@@ -19,6 +19,17 @@ TELEGRAM_CHAT_ID=
TELEGRAM_ENABLED=false
LOG_FORMAT=json
HEALTH_PORT=8080
-CIRCUIT_BREAKER_THRESHOLD=5
-CIRCUIT_BREAKER_TIMEOUT=60
METRICS_AUTH_TOKEN=
+
+# News Collector
+FINNHUB_API_KEY=
+NEWS_POLL_INTERVAL=300
+SENTIMENT_AGGREGATE_INTERVAL=900
+
+# Stock Selector
+SELECTOR_FINAL_TIME=15:30
+SELECTOR_MAX_PICKS=3
+
+# LLM (for stock selector)
+ANTHROPIC_API_KEY=
+ANTHROPIC_MODEL=claude-sonnet-4-20250514
diff --git a/CLAUDE.md b/CLAUDE.md
new file mode 100644
index 0000000..6e33f57
--- /dev/null
+++ b/CLAUDE.md
@@ -0,0 +1,106 @@
+# CLAUDE.md
+
+This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.
+
+## Project Overview
+
+US stock trading platform built as a Python microservices architecture. Uses Alpaca Markets API for market data and order execution. Services communicate via Redis Streams and persist to PostgreSQL.
+
+## Common Commands
+
+```bash
+make infra # Start Redis + Postgres (required before running services/tests)
+make up # Start all services via Docker Compose
+make down # Stop all services
+make test # Run all tests (pytest -v)
+make lint # Lint check (ruff check + format check)
+make format # Auto-fix lint + format
+make migrate # Run DB migrations (alembic upgrade head, from shared/)
+make migrate-new msg="description" # Create new migration
+make ci # Full CI: install deps, lint, test, Docker build
+make e2e # End-to-end tests
+```
+
+Run a single test file: `pytest services/strategy-engine/tests/test_rsi_strategy.py -v`
+
+## Architecture
+
+### Services (each in `services/<name>/`, each has its own Dockerfile)
+
+- **data-collector** (port 8080): Fetches stock bars from Alpaca, publishes `CandleEvent` to Redis stream `candles`
+- **news-collector** (port 8084): Continuously collects news from 7 sources (Finnhub, RSS, SEC EDGAR, Truth Social, Reddit, Fear & Greed, Fed), runs sentiment aggregation every 15 min
+- **strategy-engine** (port 8081): Consumes candle events, runs strategies, publishes `SignalEvent` to stream `signals`. Also runs the stock selector at 15:30 ET daily
+- **order-executor** (port 8082): Consumes signals, runs risk checks, places orders via Alpaca, publishes `OrderEvent` to stream `orders`
+- **portfolio-manager** (port 8083): Tracks positions, PnL, portfolio snapshots
+- **api** (port 8000): FastAPI REST endpoint layer
+- **backtester**: Offline backtesting engine with walk-forward analysis
+
+### Event Flow
+
+```
+Alpaca → data-collector → [candles stream] → strategy-engine → [signals stream] → order-executor → [orders stream] → portfolio-manager
+
+News sources → news-collector → [news stream] → sentiment aggregator → symbol_scores DB
+ ↓
+ stock selector (15:30 ET) → [selected_stocks stream] → MOC strategy → signals
+```
+
+All inter-service events use `shared/src/shared/events.py` (CandleEvent, SignalEvent, OrderEvent, NewsEvent) serialized as JSON over Redis Streams via `shared/src/shared/broker.py` (RedisBroker).
+
+### Shared Library (`shared/`)
+
+Installed as editable package (`pip install -e shared/`). Contains:
+- `models.py` — Pydantic domain models: Candle, Signal, Order, Position, NewsItem, NewsCategory
+- `sentiment_models.py` — SymbolScore, MarketSentiment, SelectedStock, Candidate
+- `sa_models.py` — SQLAlchemy ORM models (CandleRow, SignalRow, OrderRow, PortfolioSnapshotRow, NewsItemRow, SymbolScoreRow, MarketSentimentRow, StockSelectionRow)
+- `broker.py` — RedisBroker (async Redis Streams pub/sub with consumer groups)
+- `db.py` — Database class (async SQLAlchemy 2.0), includes news/sentiment/selection CRUD methods
+- `alpaca.py` — AlpacaClient (async aiohttp client for Alpaca Trading + Market Data APIs)
+- `events.py` — Event types and serialization (CandleEvent, SignalEvent, OrderEvent, NewsEvent)
+- `sentiment.py` — SentimentData (legacy gating) + SentimentAggregator (freshness-weighted composite scoring)
+- `config.py`, `logging.py`, `metrics.py`, `notifier.py` (Telegram), `resilience.py`, `healthcheck.py`
+
+DB migrations live in `shared/alembic/`.
+
+### Strategy System (`services/strategy-engine/strategies/`)
+
+Strategies extend `BaseStrategy` (in `strategies/base.py`) and implement `on_candle()`, `configure()`, `warmup_period`. The plugin loader (`strategy_engine/plugin_loader.py`) auto-discovers `*.py` files in the strategies directory and loads YAML config from `strategies/config/<strategy_name>.yaml`.
+
+BaseStrategy provides optional filters (ADX regime, volume, ATR-based stops) via `_init_filters()` and `_apply_filters()`.
+
+### News-Driven Stock Selector (`services/strategy-engine/src/strategy_engine/stock_selector.py`)
+
+Dynamic stock selection for MOC (Market on Close) trading. Runs daily at 15:30 ET via `strategy-engine`:
+
+1. **Candidate Pool**: Top 20 by sentiment score + LLM-recommended stocks from today's news
+2. **Technical Filter**: RSI 30-70, price > 20 EMA, volume > 50% average
+3. **LLM Final Selection**: Claude picks 2-3 stocks with rationale
+
+Market gating: blocks all trades when Fear & Greed ≤ 20 or VIX > 30 (`risk_off` regime).
+
+### News Collector (`services/news-collector/`)
+
+7 collectors extending `BaseCollector` in `collectors/`:
+- `finnhub.py` (5min), `rss.py` (10min), `reddit.py` (15min), `truth_social.py` (15min), `sec_edgar.py` (30min), `fear_greed.py` (1hr), `fed.py` (1hr)
+- All use VADER (nltk) for sentiment scoring
+- Provider abstraction via `BaseCollector` for future paid API swap (config change only)
+
+Sentiment aggregation (every 15min) computes per-symbol composite scores with freshness decay and category weights (policy 0.3, news 0.3, social 0.2, filing 0.2).
+
+### CLI (`cli/`)
+
+Click-based CLI installed as `trading` command. Depends on the shared library.
+
+## Tech Stack
+
+- Python 3.12+, async throughout (asyncio, aiohttp)
+- Pydantic for models, SQLAlchemy 2.0 async ORM, Alembic for migrations
+- Redis Streams for inter-service messaging
+- PostgreSQL 16 for persistence
+- Ruff for linting/formatting (line-length=100)
+- pytest + pytest-asyncio (asyncio_mode="auto")
+- Docker Compose for deployment; monitoring stack (Grafana/Prometheus/Loki) available via `--profile monitoring`
+
+## Environment
+
+Copy `.env.example` to `.env`. Key vars: `ALPACA_API_KEY`, `ALPACA_API_SECRET`, `ALPACA_PAPER=true`, `DRY_RUN=true`, `DATABASE_URL`, `REDIS_URL`, `FINNHUB_API_KEY`, `ANTHROPIC_API_KEY`. DRY_RUN=true simulates order fills without hitting Alpaca. Stock selector requires `ANTHROPIC_API_KEY` to be set.
diff --git a/docker-compose.yml b/docker-compose.yml
index e981f74..63630ff 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -22,7 +22,7 @@ services:
volumes:
- postgres_data:/var/lib/postgresql/data
healthcheck:
- test: ["CMD-LINE", "pg_isready", "-U", "trading"]
+ test: ["CMD", "pg_isready", "-U", "trading"]
interval: 5s
timeout: 3s
retries: 5
@@ -122,6 +122,25 @@ services:
retries: 3
restart: unless-stopped
+ news-collector:
+ build:
+ context: .
+ dockerfile: services/news-collector/Dockerfile
+ env_file: .env
+ ports:
+ - "8084:8084"
+ depends_on:
+ redis:
+ condition: service_healthy
+ postgres:
+ condition: service_healthy
+ healthcheck:
+ test: ["CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:8084/health')"]
+ interval: 10s
+ timeout: 5s
+ retries: 3
+ restart: unless-stopped
+
loki:
image: grafana/loki:latest
profiles: ["monitoring"]
diff --git a/docs/TODO.md b/docs/TODO.md
index d737b0f..fd4f7eb 100644
--- a/docs/TODO.md
+++ b/docs/TODO.md
@@ -1,201 +1,147 @@
-# Trading Platform — TODO
+# US Stock Trading Platform — TODO
-> Last updated: 2026-04-01
+> Last updated: 2026-04-02
## Current State
-- **298 tests**, lint clean, production-ready 인프라
-- 8 strategies, 6 services, full monitoring/CI/CD stack
-- **트레이딩 전략 업그레이드 필요** — 아래 상세
+- **375 tests**, lint clean
+- **US 주식 전용** (Alpaca API, 수수료 0%)
+- 6 microservices + CLI + shared library
+- MOC (Market on Close) 전략 + 기술적 전략 7개
+- Prometheus/Grafana/Loki 모니터링, CI/CD, Telegram 알림
+- Claude 기반 종목 스크리너
---
-## Trading Strategy Upgrade Plan
-
-### Phase 1: 백테스터 현실화 (최우선)
-
-현재 백테스트 결과는 슬리피지/수수료 미포함으로 **실제 수익과 큰 차이**가 있음. 이것부터 수정해야 전략 개선의 효과를 정확히 측정 가능.
-
-#### 1-1. 슬리피지 + 수수료 모델링
-- **파일:** `backtester/simulator.py`
-- **현재:** 시그널 가격 그대로 체결, 수수료 0
-- **수정:** 매수 시 `price * (1 + slippage_pct)`, 매도 시 `price * (1 - slippage_pct)`
-- 수수료: `cost = price * quantity * fee_pct` (maker: 0.05%, taker: 0.1%)
-- 슬리피지를 주문 크기에 비례하게 (대형 주문 → 더 큰 슬리피지)
-- **설정:** `BacktestConfig`에 `slippage_pct`, `maker_fee_pct`, `taker_fee_pct` 추가
-
-#### 1-2. 손절/익절 자동 실행
-- **파일:** `backtester/simulator.py`
-- **현재:** 시그널로만 매매, 스탑 없음
-- **수정:** 각 포지션에 stop_loss, take_profit 가격 추적
-- 매 캔들마다 `high >= take_profit` 또는 `low <= stop_loss` 체크 → 자동 청산
-- `engine.py`에서 캔들 처리 시 시뮬레이터에 현재 가격 전달
-
-#### 1-3. 공매도 지원
-- **파일:** `backtester/simulator.py`
-- **현재:** 매도는 보유 수량 내에서만 가능
-- **수정:** `allow_short: bool` 설정, 공매도 시 음수 포지션 허용
-- 공매도 수수료 (borrow fee) 추가
-
-#### 1-4. Walk-Forward Analysis
-- **파일:** `backtester/engine.py` (신규 클래스)
-- **현재:** 전체 데이터로 백테스트 → 과적합 위험
-- **수정:** `WalkForwardEngine` 클래스
- - 데이터를 N개 구간으로 분할
- - 각 구간: in-sample (파라미터 최적화) → out-of-sample (검증)
- - 최종 결과는 out-of-sample 구간만 합산
-- 파라미터 최적화: grid search 또는 random search
-
-#### 1-5. 메트릭 정확도 개선
-- **파일:** `backtester/metrics.py`
-- Sharpe/Sortino를 per-trade가 아닌 **일별 수익률 기반**으로 계산
-- Risk-free rate 설정 추가 (기본 5%)
-- Recovery Factor (총수익 / 최대 drawdown) 추가
-- 최대 연속 손실 횟수 추가
-- 인트라 트레이드 drawdown (진입 후 최저점) 계산
+## Architecture
----
+```
+Alpaca API → data-collector (REST polling)
+ → Redis Streams → strategy-engine (MOC + 기술전략)
+ → signals → order-executor (Alpaca 주문)
+ → orders → portfolio-manager (포지션 추적)
-### Phase 2: 전략 공통 인프라
-
-모든 전략에 적용할 공통 기능. 개별 전략 개선 전에 인프라를 먼저 구축.
-
-#### 2-1. ATR 기반 동적 손절/익절
-- **파일:** `strategies/base.py` + 각 전략
-- **현재:** 어떤 전략도 손절/익절을 설정하지 않음
-- **수정:** `BaseStrategy`에 `calculate_stop_loss(candle, atr)` 메서드 추가
-- ATR (Average True Range) 유틸리티 함수 (`shared/` 또는 `strategies/indicators/`)
-- 손절: entry - ATR * multiplier, 익절: entry + ATR * reward_ratio
-- Signal에 `stop_loss`, `take_profit` 필드 추가 (`shared/models.py`)
-
-#### 2-2. 추세/횡보 레짐 필터 (ADX)
-- **파일:** `strategies/indicators/adx.py` (신규)
-- ADX (Average Directional Index) 계산 유틸리티
-- ADX > 25 = 추세장, ADX < 20 = 횡보장
-- 각 전략이 레짐에 따라 동작 변경:
- - 추세 추종 전략 (MACD, EMA): ADX < 20이면 시그널 무시
- - 평균 회귀 전략 (RSI, Bollinger, Grid): ADX > 30이면 시그널 무시
-
-#### 2-3. 볼륨 확인 필터
-- **파일:** 각 전략
-- **현재:** 모든 전략이 볼륨을 무시 (RSI, MACD, EMA 등)
-- **수정:** 시그널 발생 시 해당 캔들의 볼륨이 최근 N개 평균 대비 일정 비율 이상인지 확인
-- 볼륨 < 평균의 50%면 시그널 무시 (유동성 부족)
-- 볼륨 > 평균의 200%면 시그널 가중치 증가
-
-#### 2-4. 시그널 강도 (Conviction Score)
-- **파일:** `shared/models.py` Signal + 각 전략
-- **현재:** 시그널은 BUY/SELL/quantity만 있음
-- **수정:** Signal에 `conviction: float` (0.0~1.0) 필드 추가
- - RSI 5 → conviction 0.9, RSI 28 → conviction 0.3
- - MACD 히스토그램이 0에서 먼 크로스 → 높은 conviction
-- Combined 전략에서 conviction 기반 가중치 사용
-- RiskManager에서 conviction 기반 포지션 사이징
-
-#### 2-5. 지표 라이브러리
-- **디렉토리:** `strategies/indicators/` (신규)
-- 재사용 가능한 기술 지표 함수:
- - `atr(highs, lows, closes, period)` — Average True Range
- - `adx(highs, lows, closes, period)` — Average Directional Index
- - `ema(series, period)` — Exponential Moving Average
- - `sma(series, period)` — Simple Moving Average
- - `rsi(closes, period)` — RSI
- - `bollinger_bands(closes, period, num_std)` — Bollinger
- - `macd(closes, fast, slow, signal)` — MACD
- - `volume_sma(volumes, period)` — Volume SMA
-- 각 전략에서 직접 계산하지 않고 공통 라이브러리 사용
-- 테스트: 각 지표에 대한 unit test
+Claude API → stock_screener.py (종목 분석/추천)
+FastAPI → REST API (/api/v1/portfolio, orders, strategies)
+```
----
+## 핵심 매매 전략: MOC (Market on Close)
-### Phase 3: 개별 전략 고도화
-
-Phase 1-2 완료 후 각 전략을 전문가 수준으로 업그레이드.
-
-#### 3-1. RSI 전략 개선
-- [ ] RSI 다이버전스 감지 (가격 신고가 + RSI 하락 = 약세 다이버전스)
-- [ ] ADX 레짐 필터 적용 (추세장에서는 RSI 매수 신호 무시)
-- [ ] RSI 강도별 conviction score (RSI 5 vs RSI 28)
-- [ ] ATR 기반 손절/익절
-- [ ] 볼륨 확인 필터
-
-#### 3-2. MACD 전략 개선
-- [ ] 히스토그램 크로스오버 + MACD 제로라인 크로스오버 구분
-- [ ] MACD 다이버전스 감지
-- [ ] ADX 추세 확인 (ADX < 20이면 시그널 무시)
-- [ ] 제로라인으로부터 거리 기반 시그널 강도
-- [ ] ATR 기반 손절
-
-#### 3-3. Grid 전략 개선
-- [ ] ADX 기반 레짐 필터 (추세장 진입 차단)
-- [ ] 동적 그리드 재설정 (실현 변동성 기반 범위 조정)
-- [ ] 그리드 외 이탈 시 전 포지션 청산 + 알림
-- [ ] 볼륨 프로파일 기반 비균등 그리드 간격
-
-#### 3-4. Bollinger Bands 전략 개선
-- [ ] 스퀴즈 감지 (밴드 압축 → 브레이크아웃 대비)
-- [ ] %B 지표 활용 (밴드 내 위치 0~1)
-- [ ] RSI 확인 (하단 밴드 터치 + RSI < 30 = 강한 매수)
-- [ ] 볼륨 스파이크 확인
-
-#### 3-5. EMA Crossover 전략 개선
-- [ ] ADX > 25 필터 (강한 추세만 진입)
-- [ ] 풀백 진입 (크로스 후 단기 EMA로 되돌림 시 진입)
-- [ ] 50 SMA 위/아래 필터 (장기 추세 방향 확인)
-- [ ] 볼륨 확인
-
-#### 3-6. VWAP 전략 개선
-- [ ] 일중 리셋 (매일 00:00 UTC에 VWAP 재계산)
-- [ ] VWAP 표준편차 밴드 추가 (1σ, 2σ)
-- [ ] ATR 기반 deviation threshold (고정값 대신 변동성 적응형)
-- [ ] 세션 필터 (저유동성 시간대 진입 차단)
-
-#### 3-7. Volume Profile 전략 개선
-- [ ] HVN/LVN (고/저볼륨 노드) 식별
-- [ ] 세션 기반 프로파일 리셋
-- [ ] POC를 동적 지지/저항선으로 활용
-- [ ] 볼륨 델타 (매수량 - 매도량) 추적
-
-#### 3-8. Combined 전략 개선
-- [ ] Sub-strategy conviction score 반영
-- [ ] Sub-strategy 간 상관관계 행렬 계산 → 중복 시그널 감쇄
-- [ ] 적응형 가중치 (최근 win rate 기반 동적 가중치 조정)
-- [ ] 포트폴리오 집중도 제한
+```
+[매일 ET 15:50] Claude 종목 분석 → 매수 종목 선정
+[ET 15:50~16:00] 장 마감 직전 매수 (MOC 주문)
+[다음날 ET 9:35~10:00] 시가 매도
+
+조건: 양봉 + 볼륨 > 평균 + RSI 30~60 + EMA 위 + 모멘텀 양호
+손절: -2%, 포지션당 자본금 20%, 최대 5종목
+```
---
-### Phase 4: 리스크 관리 고도화
+## Remaining Work
+
+### 즉시 해야 할 것
-#### 4-1. 포트폴리오 레벨 리스크
-- [ ] 전체 노출도 제한 (총 포지션 가치 / 잔고 비율)
-- [ ] 포지션 간 상관관계 계산 → 실효 리스크 산출
-- [ ] VaR (Value at Risk) 계산 — 95% 신뢰 구간
+#### 1. MOC 백테스트 스크립트
+- [ ] `scripts/backtest_moc.py` — 합성 데이터로 MOC 전략 파라미터 최적화
+- [ ] 5개 주식 (AAPL, MSFT, TSLA, NVDA, AMZN) 대상 90일 백테스트
+- [ ] RSI 범위, 손절률, EMA period 그리드 서치
+- [ ] Makefile target: `make backtest-moc`
-#### 4-2. 동적 포지션 축소
-- [ ] Drawdown이 일정 수준 넘으면 포지션 크기 자동 축소
-- [ ] 연속 손실 N회 시 거래 일시 중단 → Telegram 알림
-- [ ] 시간대별 리스크 조정 (주말, 공휴일 축소)
+#### 2. 실제 데이터 백테스트
+- [ ] Alpaca API로 과거 데이터 다운로드 → DB 저장
+- [ ] 실제 데이터 기반 MOC 전략 검증
+- [ ] Walk-forward analysis로 과적합 확인
-#### 4-3. 시나리오 분석
-- [ ] 과거 극단 이벤트 (FTX 사태, Luna 등)에 대한 포트폴리오 영향 시뮬레이션
-- [ ] 유동성 리스크 체크 (주문 크기 vs 호가창 깊이)
+#### 3. Paper Trading 배포
+- [ ] `.env` 설정 (ALPACA_PAPER=true)
+- [ ] `make up` → 전 서비스 실행
+- [ ] 2-4주 모의 매매 → 실제 성과 확인
+- [ ] Telegram 알림으로 매일 결과 수신
---
-## Priority & Effort
+### 개선 사항
-| Phase | 내용 | 예상 작업량 | 영향 |
-|-------|------|------------|------|
-| **Phase 1** | 백테스터 현실화 | 1-2일 | **최대** — 이것 없이는 전략 평가 불가 |
-| **Phase 2** | 전략 공통 인프라 | 1-2일 | **높음** — 모든 전략의 기반 |
-| **Phase 3** | 개별 전략 고도화 | 3-5일 | 중간 — 수익률 직접 개선 |
-| **Phase 4** | 리스크 관리 고도화 | 2-3일 | 높음 — 손실 방지 |
+#### 4. Claude 스크리너 고도화
+- [ ] SEC 공시 분석 (10-K, 10-Q, 8-K)
+- [ ] 실적 서프라이즈 감지 (EPS beat/miss)
+- [ ] 섹터 로테이션 분석
+- [ ] 뉴스 감성 분석 (Yahoo Finance, MarketWatch)
-**권장 순서: Phase 1 → Phase 2 → Phase 4 → Phase 3**
-(전략 개선보다 리스크 관리가 더 중요 — 돈을 벌기 전에 잃지 않는 게 먼저)
+#### 5. 주문 유형 확장
+- [ ] Limit order 지원
+- [ ] 프리마켓/애프터마켓 주문
+- [ ] 분할 매수/매도
+
+#### 6. 추가 전략
+- [ ] ORB (Opening Range Breakout) — 장 시작 30분 전략
+- [ ] Gap & Go — 갭 상승 종목 추격 전략
+- [ ] Earnings Play — 실적 발표 전후 전략
+
+#### 7. 리스크 관리 개선
+- [ ] 섹터 집중도 제한 (같은 섹터 3개 이상 금지)
+- [ ] 실적 발표일 매매 회피
+- [ ] 시장 전체 하락 시 매매 중단 (SPY RSI 기반)
---
-## Previously Completed (Infrastructure)
+## Quick Start
+
+```bash
+# 1. 환경 설정
+cp .env.example .env
+# ALPACA_API_KEY, ALPACA_API_SECRET 입력
+# ANTHROPIC_API_KEY 입력 (Claude 스크리너용)
+
+# 2. 의존성 설치
+pip install -e shared/
+
+# 3. 인프라 실행
+make infra # Redis + PostgreSQL
+make migrate # DB 마이그레이션
+
+# 4. 테스트
+make test # 375 tests
+
+# 5. 종목 스크리닝
+make screen # Claude가 33개 종목 분석 → Top 5 추천
+
+# 6. 서비스 실행
+make up # 전 서비스 시작 (paper trading)
+
+# 7. 모니터링
+docker compose --profile monitoring up -d
+# Grafana: http://localhost:3000
+# API: http://localhost:8000/api/v1/strategies
+
+# 8. CLI
+trading strategy list
+trading backtest run --strategy moc --symbol AAPL --timeframe 1Day
+trading portfolio show
+```
+
+---
-모든 인프라 항목 완료 (27개): SQLAlchemy, Alembic, structlog, Telegram, Prometheus/Grafana/Loki, CI/CD, FastAPI, multi-exchange, Redis consumer groups, realized PnL, bearer auth, 298 tests.
+## Completed (Infrastructure)
+
+- [x] Alpaca API 클라이언트 (paper + live)
+- [x] MOC 전략 (종가 매수 / 시가 매도)
+- [x] Claude 종목 스크리너 (33개 유니버스)
+- [x] Data collector (Alpaca REST polling)
+- [x] Order executor (Alpaca submit_order)
+- [x] SQLAlchemy ORM + Alembic 마이그레이션
+- [x] structlog 구조화 로깅
+- [x] Telegram 알림
+- [x] Retry + Circuit Breaker
+- [x] Prometheus + Grafana + Loki
+- [x] Redis consumer groups
+- [x] Portfolio snapshots + realized PnL
+- [x] Bearer token auth
+- [x] CI/CD (Gitea Actions)
+- [x] E2E test script
+- [x] FastAPI REST API
+- [x] 백테스터 (슬리피지, 수수료, SL/TP, 공매도, walk-forward)
+- [x] 기술 지표 라이브러리 (ATR, ADX, RSI, MACD, Bollinger, Stochastic, OBV)
+- [x] 포트폴리오 VaR, 상관관계, drawdown 기반 리스크
+- [x] 375 tests, lint clean
diff --git a/docs/superpowers/plans/2026-04-02-news-driven-stock-selector.md b/docs/superpowers/plans/2026-04-02-news-driven-stock-selector.md
new file mode 100644
index 0000000..0964f21
--- /dev/null
+++ b/docs/superpowers/plans/2026-04-02-news-driven-stock-selector.md
@@ -0,0 +1,3689 @@
+# News-Driven Stock Selector Implementation Plan
+
+> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.
+
+**Goal:** Replace the MOC strategy's fixed symbol list with a dynamic, news-driven stock selection system that continuously collects news/sentiment data and selects 2-3 optimal stocks daily before market close.
+
+**Architecture:** A new `news-collector` service runs 7 data source collectors on individual poll intervals, storing `NewsItem` records in PostgreSQL and publishing to Redis. A sentiment aggregator computes per-symbol composite scores every 15 minutes. Before market close, a 3-stage stock selector (sentiment candidates → technical filter → LLM final pick) chooses 2-3 stocks and feeds them to the existing MOC strategy.
+
+**Tech Stack:** Python 3.12+, asyncio, aiohttp, Pydantic, SQLAlchemy 2.0 async, Redis Streams, VADER (nltk), feedparser (RSS), Anthropic SDK (Claude API), Alembic
+
+---
+
+## Phase 1: Shared Foundation (Models, DB, Events)
+
+### Task 1: Add NewsItem and sentiment models to shared
+
+**Files:**
+- Modify: `shared/src/shared/models.py`
+- Create: `shared/src/shared/sentiment_models.py`
+- Create: `shared/tests/test_sentiment_models.py`
+
+- [ ] **Step 1: Write tests for new models**
+
+Create `shared/tests/test_sentiment_models.py`:
+
+```python
+"""Tests for news and sentiment models."""
+
+import pytest
+from datetime import datetime, timezone
+
+from shared.models import NewsCategory, NewsItem, OrderSide
+from shared.sentiment_models import SymbolScore, MarketSentiment, SelectedStock, Candidate
+
+
+def test_news_item_defaults():
+ item = NewsItem(
+ source="finnhub",
+ headline="Test headline",
+ published_at=datetime(2026, 4, 2, tzinfo=timezone.utc),
+ sentiment=0.5,
+ category=NewsCategory.MACRO,
+ )
+ assert item.id # UUID generated
+ assert item.symbols == []
+ assert item.summary is None
+ assert item.raw_data == {}
+ assert item.created_at is not None
+
+
+def test_news_item_with_symbols():
+ item = NewsItem(
+ source="rss",
+ headline="AAPL earnings beat",
+ published_at=datetime(2026, 4, 2, tzinfo=timezone.utc),
+ sentiment=0.8,
+ category=NewsCategory.EARNINGS,
+ symbols=["AAPL"],
+ )
+ assert item.symbols == ["AAPL"]
+ assert item.category == NewsCategory.EARNINGS
+
+
+def test_news_category_values():
+ assert NewsCategory.POLICY == "policy"
+ assert NewsCategory.EARNINGS == "earnings"
+ assert NewsCategory.MACRO == "macro"
+ assert NewsCategory.SOCIAL == "social"
+ assert NewsCategory.FILING == "filing"
+ assert NewsCategory.FED == "fed"
+
+
+def test_symbol_score():
+ score = SymbolScore(
+ symbol="AAPL",
+ news_score=0.5,
+ news_count=10,
+ social_score=0.3,
+ policy_score=0.0,
+ filing_score=0.2,
+ composite=0.3,
+ updated_at=datetime(2026, 4, 2, tzinfo=timezone.utc),
+ )
+ assert score.symbol == "AAPL"
+ assert score.composite == 0.3
+
+
+def test_market_sentiment():
+ ms = MarketSentiment(
+ fear_greed=25,
+ fear_greed_label="Extreme Fear",
+ vix=32.5,
+ fed_stance="hawkish",
+ market_regime="risk_off",
+ updated_at=datetime(2026, 4, 2, tzinfo=timezone.utc),
+ )
+ assert ms.market_regime == "risk_off"
+ assert ms.vix == 32.5
+
+
+def test_market_sentiment_no_vix():
+ ms = MarketSentiment(
+ fear_greed=50,
+ fear_greed_label="Neutral",
+ fed_stance="neutral",
+ market_regime="neutral",
+ updated_at=datetime(2026, 4, 2, tzinfo=timezone.utc),
+ )
+ assert ms.vix is None
+
+
+def test_selected_stock():
+ ss = SelectedStock(
+ symbol="NVDA",
+ side=OrderSide.BUY,
+ conviction=0.85,
+ reason="CHIPS Act expansion",
+ key_news=["Trump signs CHIPS Act expansion"],
+ )
+ assert ss.conviction == 0.85
+ assert len(ss.key_news) == 1
+
+
+def test_candidate():
+ c = Candidate(
+ symbol="TSLA",
+ source="sentiment",
+ direction=OrderSide.BUY,
+ score=0.75,
+ reason="High social buzz",
+ )
+ assert c.direction == OrderSide.BUY
+
+ c2 = Candidate(
+ symbol="XOM",
+ source="llm",
+ score=0.6,
+ reason="Oil price surge",
+ )
+ assert c2.direction is None
+```
+
+- [ ] **Step 2: Run tests to verify they fail**
+
+Run: `pytest shared/tests/test_sentiment_models.py -v`
+Expected: FAIL — `NewsCategory`, `NewsItem` not found in `shared.models`, `shared.sentiment_models` does not exist
+
+- [ ] **Step 3: Add NewsCategory and NewsItem to shared/models.py**
+
+Add to the end of `shared/src/shared/models.py` (ensure `uuid`, `Enum`, `Optional`, `timezone`, and `Field` are already imported at the top of the module):
+
+```python
+class NewsCategory(str, Enum):
+ POLICY = "policy"
+ EARNINGS = "earnings"
+ MACRO = "macro"
+ SOCIAL = "social"
+ FILING = "filing"
+ FED = "fed"
+
+
+class NewsItem(BaseModel):
+ id: str = Field(default_factory=lambda: str(uuid.uuid4()))
+ source: str
+ headline: str
+ summary: Optional[str] = None
+ url: Optional[str] = None
+ published_at: datetime
+ symbols: list[str] = []
+ sentiment: float
+ category: NewsCategory
+ raw_data: dict = {}
+ created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
+```
+
+- [ ] **Step 4: Create shared/src/shared/sentiment_models.py**
+
+```python
+"""Sentiment scoring and stock selection models."""
+
+from datetime import datetime
+from typing import Optional
+
+from pydantic import BaseModel
+
+from shared.models import OrderSide
+
+
+class SymbolScore(BaseModel):
+ symbol: str
+ news_score: float
+ news_count: int
+ social_score: float
+ policy_score: float
+ filing_score: float
+ composite: float
+ updated_at: datetime
+
+
+class MarketSentiment(BaseModel):
+ fear_greed: int
+ fear_greed_label: str
+ vix: Optional[float] = None
+ fed_stance: str
+ market_regime: str
+ updated_at: datetime
+
+
+class SelectedStock(BaseModel):
+ symbol: str
+ side: OrderSide
+ conviction: float
+ reason: str
+ key_news: list[str]
+
+
+class Candidate(BaseModel):
+ symbol: str
+ source: str
+ direction: Optional[OrderSide] = None
+ score: float
+ reason: str
+```
+
+- [ ] **Step 5: Run tests to verify they pass**
+
+Run: `pytest shared/tests/test_sentiment_models.py -v`
+Expected: All 8 tests PASS
+
+- [ ] **Step 6: Commit**
+
+```bash
+git add shared/src/shared/models.py shared/src/shared/sentiment_models.py shared/tests/test_sentiment_models.py
+git commit -m "feat: add NewsItem, sentiment scoring, and stock selection models"
+```
+
+---
+
+### Task 2: Add SQLAlchemy ORM models for news tables
+
+**Files:**
+- Modify: `shared/src/shared/sa_models.py`
+- Create: `shared/tests/test_sa_news_models.py`
+
+- [ ] **Step 1: Write tests for new SA models**
+
+Create `shared/tests/test_sa_news_models.py`:
+
+```python
+"""Tests for news-related SQLAlchemy models."""
+
+from shared.sa_models import NewsItemRow, SymbolScoreRow, MarketSentimentRow, StockSelectionRow
+
+
+def test_news_item_row_tablename():
+ assert NewsItemRow.__tablename__ == "news_items"
+
+
+def test_symbol_score_row_tablename():
+ assert SymbolScoreRow.__tablename__ == "symbol_scores"
+
+
+def test_market_sentiment_row_tablename():
+ assert MarketSentimentRow.__tablename__ == "market_sentiment"
+
+
+def test_stock_selection_row_tablename():
+ assert StockSelectionRow.__tablename__ == "stock_selections"
+
+
+def test_news_item_row_columns():
+ cols = {c.name for c in NewsItemRow.__table__.columns}
+ assert cols >= {"id", "source", "headline", "published_at", "sentiment", "category"}
+
+
+def test_symbol_score_row_columns():
+ cols = {c.name for c in SymbolScoreRow.__table__.columns}
+ assert cols >= {"id", "symbol", "news_score", "composite", "updated_at"}
+```
+
+- [ ] **Step 2: Run tests to verify they fail**
+
+Run: `pytest shared/tests/test_sa_news_models.py -v`
+Expected: FAIL — import errors
+
+- [ ] **Step 3: Add ORM models to sa_models.py**
+
+Add to the end of `shared/src/shared/sa_models.py`:
+
+```python
+class NewsItemRow(Base):
+ __tablename__ = "news_items"
+
+ id: Mapped[str] = mapped_column(Text, primary_key=True)
+ source: Mapped[str] = mapped_column(Text, nullable=False)
+ headline: Mapped[str] = mapped_column(Text, nullable=False)
+ summary: Mapped[str | None] = mapped_column(Text)
+ url: Mapped[str | None] = mapped_column(Text)
+ published_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False)
+ symbols: Mapped[str | None] = mapped_column(Text) # JSON-encoded list
+ sentiment: Mapped[float] = mapped_column(sa.Float, nullable=False)
+ category: Mapped[str] = mapped_column(Text, nullable=False)
+ raw_data: Mapped[str | None] = mapped_column(Text) # JSON string
+ created_at: Mapped[datetime] = mapped_column(
+ DateTime(timezone=True), nullable=False, server_default=sa.func.now()
+ )
+
+
+class SymbolScoreRow(Base):
+ __tablename__ = "symbol_scores"
+
+ id: Mapped[str] = mapped_column(Text, primary_key=True)
+ symbol: Mapped[str] = mapped_column(Text, nullable=False, unique=True)
+ news_score: Mapped[float] = mapped_column(sa.Float, nullable=False, server_default="0")
+ news_count: Mapped[int] = mapped_column(sa.Integer, nullable=False, server_default="0")
+ social_score: Mapped[float] = mapped_column(sa.Float, nullable=False, server_default="0")
+ policy_score: Mapped[float] = mapped_column(sa.Float, nullable=False, server_default="0")
+ filing_score: Mapped[float] = mapped_column(sa.Float, nullable=False, server_default="0")
+ composite: Mapped[float] = mapped_column(sa.Float, nullable=False, server_default="0")
+ updated_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False)
+
+
+class MarketSentimentRow(Base):
+ __tablename__ = "market_sentiment"
+
+ id: Mapped[str] = mapped_column(Text, primary_key=True)
+ fear_greed: Mapped[int] = mapped_column(sa.Integer, nullable=False)
+ fear_greed_label: Mapped[str] = mapped_column(Text, nullable=False)
+ vix: Mapped[float | None] = mapped_column(sa.Float)
+ fed_stance: Mapped[str] = mapped_column(Text, nullable=False, server_default="neutral")
+ market_regime: Mapped[str] = mapped_column(Text, nullable=False, server_default="neutral")
+ updated_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False)
+
+
+class StockSelectionRow(Base):
+ __tablename__ = "stock_selections"
+
+ id: Mapped[str] = mapped_column(Text, primary_key=True)
+ trade_date: Mapped[datetime] = mapped_column(sa.Date, nullable=False)
+ symbol: Mapped[str] = mapped_column(Text, nullable=False)
+ side: Mapped[str] = mapped_column(Text, nullable=False)
+ conviction: Mapped[float] = mapped_column(sa.Float, nullable=False)
+ reason: Mapped[str] = mapped_column(Text, nullable=False)
+ key_news: Mapped[str | None] = mapped_column(Text) # JSON string
+ sentiment_snapshot: Mapped[str | None] = mapped_column(Text) # JSON string
+ created_at: Mapped[datetime] = mapped_column(
+ DateTime(timezone=True), nullable=False, server_default=sa.func.now()
+ )
+```
+
+Also add `import sqlalchemy as sa` to the imports at the top of `sa_models.py`.
+
+- [ ] **Step 4: Run tests to verify they pass**
+
+Run: `pytest shared/tests/test_sa_news_models.py -v`
+Expected: All 6 tests PASS
+
+- [ ] **Step 5: Commit**
+
+```bash
+git add shared/src/shared/sa_models.py shared/tests/test_sa_news_models.py
+git commit -m "feat: add SQLAlchemy ORM models for news, scores, selections"
+```
+
+---
+
+### Task 3: Create Alembic migration for news tables
+
+**Files:**
+- Create: `shared/alembic/versions/002_news_sentiment_tables.py`
+
+- [ ] **Step 1: Create migration file**
+
+Create `shared/alembic/versions/002_news_sentiment_tables.py`:
+
+```python
+"""Add news, sentiment, and stock selection tables
+
+Revision ID: 002
+Revises: 001
+Create Date: 2026-04-02
+"""
+
+from typing import Sequence, Union
+
+from alembic import op
+import sqlalchemy as sa
+
+revision: str = "002"
+down_revision: Union[str, None] = "001"
+branch_labels: Union[str, Sequence[str], None] = None
+depends_on: Union[str, Sequence[str], None] = None
+
+
+def upgrade() -> None:
+ op.create_table(
+ "news_items",
+ sa.Column("id", sa.Text, primary_key=True),
+ sa.Column("source", sa.Text, nullable=False),
+ sa.Column("headline", sa.Text, nullable=False),
+ sa.Column("summary", sa.Text),
+ sa.Column("url", sa.Text),
+ sa.Column("published_at", sa.DateTime(timezone=True), nullable=False),
+ sa.Column("symbols", sa.Text),
+ sa.Column("sentiment", sa.Float, nullable=False),
+ sa.Column("category", sa.Text, nullable=False),
+ sa.Column("raw_data", sa.Text),
+ sa.Column("created_at", sa.DateTime(timezone=True), nullable=False, server_default=sa.func.now()),
+ )
+ op.create_index("idx_news_items_published", "news_items", ["published_at"])
+ op.create_index("idx_news_items_source", "news_items", ["source"])
+
+ op.create_table(
+ "symbol_scores",
+ sa.Column("id", sa.Text, primary_key=True),
+ sa.Column("symbol", sa.Text, nullable=False, unique=True),
+ sa.Column("news_score", sa.Float, nullable=False, server_default="0"),
+ sa.Column("news_count", sa.Integer, nullable=False, server_default="0"),
+ sa.Column("social_score", sa.Float, nullable=False, server_default="0"),
+ sa.Column("policy_score", sa.Float, nullable=False, server_default="0"),
+ sa.Column("filing_score", sa.Float, nullable=False, server_default="0"),
+ sa.Column("composite", sa.Float, nullable=False, server_default="0"),
+ sa.Column("updated_at", sa.DateTime(timezone=True), nullable=False),
+ )
+
+ op.create_table(
+ "market_sentiment",
+ sa.Column("id", sa.Text, primary_key=True),
+ sa.Column("fear_greed", sa.Integer, nullable=False),
+ sa.Column("fear_greed_label", sa.Text, nullable=False),
+ sa.Column("vix", sa.Float),
+ sa.Column("fed_stance", sa.Text, nullable=False, server_default="neutral"),
+ sa.Column("market_regime", sa.Text, nullable=False, server_default="neutral"),
+ sa.Column("updated_at", sa.DateTime(timezone=True), nullable=False),
+ )
+
+ op.create_table(
+ "stock_selections",
+ sa.Column("id", sa.Text, primary_key=True),
+ sa.Column("trade_date", sa.Date, nullable=False),
+ sa.Column("symbol", sa.Text, nullable=False),
+ sa.Column("side", sa.Text, nullable=False),
+ sa.Column("conviction", sa.Float, nullable=False),
+ sa.Column("reason", sa.Text, nullable=False),
+ sa.Column("key_news", sa.Text),
+ sa.Column("sentiment_snapshot", sa.Text),
+ sa.Column("created_at", sa.DateTime(timezone=True), nullable=False, server_default=sa.func.now()),
+ )
+ op.create_index("idx_stock_selections_date", "stock_selections", ["trade_date"])
+
+
+def downgrade() -> None:
+ op.drop_table("stock_selections")
+ op.drop_table("market_sentiment")
+ op.drop_table("symbol_scores")
+ op.drop_table("news_items")
+```
+
+- [ ] **Step 2: Verify migration imports correctly**
+
+Run: `python -c "import importlib.util; s=importlib.util.spec_from_file_location('m','shared/alembic/versions/002_news_sentiment_tables.py'); m=importlib.util.module_from_spec(s); s.loader.exec_module(m); print('OK')"`
+(Note: `shared/alembic/versions` is not an importable package, so a plain `from alembic.versions import *` check would fail even for a valid migration file — always use the `importlib` form above.)
+Expected: OK (no import errors)
+
+- [ ] **Step 3: Commit**
+
+```bash
+git add shared/alembic/versions/002_news_sentiment_tables.py
+git commit -m "feat: add Alembic migration for news and sentiment tables"
+```
+
+---
+
+### Task 4: Add NewsEvent to shared events and DB methods for news
+
+**Files:**
+- Modify: `shared/src/shared/events.py`
+- Modify: `shared/src/shared/db.py`
+- Create: `shared/tests/test_news_events.py`
+- Create: `shared/tests/test_db_news.py`
+
+- [ ] **Step 1: Write tests for NewsEvent**
+
+Create `shared/tests/test_news_events.py`:
+
+```python
+"""Tests for NewsEvent."""
+
+from datetime import datetime, timezone
+
+from shared.models import NewsCategory, NewsItem
+from shared.events import NewsEvent, EventType, Event
+
+
+def test_news_event_to_dict():
+ item = NewsItem(
+ source="finnhub",
+ headline="Test",
+ published_at=datetime(2026, 4, 2, tzinfo=timezone.utc),
+ sentiment=0.5,
+ category=NewsCategory.MACRO,
+ )
+ event = NewsEvent(data=item)
+ d = event.to_dict()
+ assert d["type"] == EventType.NEWS
+ assert d["data"]["source"] == "finnhub"
+
+
+def test_news_event_from_raw():
+ raw = {
+ "type": "NEWS",
+ "data": {
+ "id": "abc",
+ "source": "rss",
+ "headline": "Test headline",
+ "published_at": "2026-04-02T00:00:00+00:00",
+ "sentiment": 0.3,
+ "category": "earnings",
+ "symbols": ["AAPL"],
+ "raw_data": {},
+ },
+ }
+ event = NewsEvent.from_raw(raw)
+ assert event.data.source == "rss"
+ assert event.data.symbols == ["AAPL"]
+
+
+def test_event_dispatcher_news():
+ raw = {
+ "type": "NEWS",
+ "data": {
+ "id": "abc",
+ "source": "finnhub",
+ "headline": "Test",
+ "published_at": "2026-04-02T00:00:00+00:00",
+ "sentiment": 0.0,
+ "category": "macro",
+ "raw_data": {},
+ },
+ }
+ event = Event.from_dict(raw)
+ assert isinstance(event, NewsEvent)
+```
+
+- [ ] **Step 2: Run tests to verify they fail**
+
+Run: `pytest shared/tests/test_news_events.py -v`
+Expected: FAIL — `NewsEvent` not in events.py, `EventType.NEWS` missing
+
+- [ ] **Step 3: Add NewsEvent to events.py**
+
+Add `NEWS = "NEWS"` to `EventType` enum.
+
+Add `NewsEvent` class and register it in `_EVENT_TYPE_MAP`:
+
+```python
+from shared.models import Candle, Signal, Order, NewsItem
+
+class NewsEvent(BaseModel):
+ type: EventType = EventType.NEWS
+ data: NewsItem
+
+ def to_dict(self) -> dict:
+ return {
+ "type": self.type,
+ "data": self.data.model_dump(mode="json"),
+ }
+
+ @classmethod
+ def from_raw(cls, raw: dict) -> "NewsEvent":
+ return cls(type=raw["type"], data=NewsItem(**raw["data"]))
+```
+
+Add to `_EVENT_TYPE_MAP`:
+```python
+EventType.NEWS: NewsEvent,
+```
+
+- [ ] **Step 4: Run event tests to verify they pass**
+
+Run: `pytest shared/tests/test_news_events.py -v`
+Expected: All 3 tests PASS
+
+- [ ] **Step 5: Run all existing event tests to check no regressions**
+
+Run: `pytest shared/tests/test_events.py -v`
+Expected: All existing tests PASS
+
+- [ ] **Step 6: Write tests for DB news methods**
+
+Create `shared/tests/test_db_news.py`:
+
+```python
+"""Tests for database news/sentiment methods.
+
+These tests use an in-memory SQLite database.
+"""
+
+import json
+import uuid
+import pytest
+from datetime import datetime, date, timezone
+
+from shared.db import Database
+from shared.models import NewsItem, NewsCategory
+from shared.sentiment_models import SymbolScore, MarketSentiment
+
+
+@pytest.fixture
+async def db():
+ database = Database("sqlite+aiosqlite://")
+ await database.connect()
+ yield database
+ await database.close()
+
+
+async def test_insert_and_get_news_items(db):
+ item = NewsItem(
+ source="finnhub",
+ headline="AAPL earnings beat",
+ published_at=datetime(2026, 4, 2, 12, 0, tzinfo=timezone.utc),
+ sentiment=0.8,
+ category=NewsCategory.EARNINGS,
+ symbols=["AAPL"],
+ )
+ await db.insert_news_item(item)
+ items = await db.get_recent_news(hours=24)
+ assert len(items) == 1
+ assert items[0]["headline"] == "AAPL earnings beat"
+
+
+async def test_upsert_symbol_score(db):
+ score = SymbolScore(
+ symbol="AAPL",
+ news_score=0.5,
+ news_count=10,
+ social_score=0.3,
+ policy_score=0.0,
+ filing_score=0.2,
+ composite=0.3,
+ updated_at=datetime(2026, 4, 2, tzinfo=timezone.utc),
+ )
+ await db.upsert_symbol_score(score)
+ scores = await db.get_top_symbol_scores(limit=5)
+ assert len(scores) == 1
+ assert scores[0]["symbol"] == "AAPL"
+
+
+async def test_upsert_market_sentiment(db):
+ ms = MarketSentiment(
+ fear_greed=55,
+ fear_greed_label="Neutral",
+ vix=18.2,
+ fed_stance="neutral",
+ market_regime="neutral",
+ updated_at=datetime(2026, 4, 2, tzinfo=timezone.utc),
+ )
+ await db.upsert_market_sentiment(ms)
+ result = await db.get_latest_market_sentiment()
+ assert result is not None
+ assert result["fear_greed"] == 55
+
+
+async def test_insert_stock_selection(db):
+ await db.insert_stock_selection(
+ trade_date=date(2026, 4, 2),
+ symbol="NVDA",
+ side="BUY",
+ conviction=0.85,
+ reason="CHIPS Act",
+ key_news=["Trump signs CHIPS expansion"],
+ sentiment_snapshot={"composite": 0.8},
+ )
+ selections = await db.get_stock_selections(date(2026, 4, 2))
+ assert len(selections) == 1
+ assert selections[0]["symbol"] == "NVDA"
+```
+
+- [ ] **Step 7: Run DB news tests to verify they fail**
+
+Run: `pytest shared/tests/test_db_news.py -v`
+Expected: FAIL — methods not yet on Database class
+
+- [ ] **Step 8: Add news/sentiment DB methods to db.py**
+
+Add to `shared/src/shared/db.py` — new import at top:
+
+```python
+import json
+import uuid
+from datetime import date, datetime, timedelta, timezone
+from shared.models import NewsItem
+from shared.sentiment_models import SymbolScore, MarketSentiment
+from shared.sa_models import NewsItemRow, SymbolScoreRow, MarketSentimentRow, StockSelectionRow
+```
+
+Add these methods to the `Database` class:
+
+```python
+ async def insert_news_item(self, item: NewsItem) -> None:
+ """Insert a news item."""
+ row = NewsItemRow(
+ id=item.id,
+ source=item.source,
+ headline=item.headline,
+ summary=item.summary,
+ url=item.url,
+ published_at=item.published_at,
+ symbols=json.dumps(item.symbols),
+ sentiment=item.sentiment,
+ category=item.category.value,
+ raw_data=json.dumps(item.raw_data),
+ created_at=item.created_at,
+ )
+ async with self._session_factory() as session:
+ try:
+ session.add(row)
+ await session.commit()
+ except Exception:
+ await session.rollback()
+ raise
+
+ async def get_recent_news(self, hours: int = 24) -> list[dict]:
+ """Retrieve news items from the last N hours."""
+ since = datetime.now(timezone.utc) - timedelta(hours=hours)
+ stmt = (
+ select(NewsItemRow)
+ .where(NewsItemRow.published_at >= since)
+ .order_by(NewsItemRow.published_at.desc())
+ )
+ async with self._session_factory() as session:
+ result = await session.execute(stmt)
+ rows = result.scalars().all()
+ return [
+ {
+ "id": r.id,
+ "source": r.source,
+ "headline": r.headline,
+ "summary": r.summary,
+ "url": r.url,
+ "published_at": r.published_at,
+ "symbols": json.loads(r.symbols) if r.symbols else [],
+ "sentiment": r.sentiment,
+ "category": r.category,
+ "created_at": r.created_at,
+ }
+ for r in rows
+ ]
+
+ async def upsert_symbol_score(self, score: SymbolScore) -> None:
+ """Insert or update a symbol score."""
+ async with self._session_factory() as session:
+ try:
+ existing = await session.execute(
+ select(SymbolScoreRow).where(SymbolScoreRow.symbol == score.symbol)
+ )
+ row = existing.scalar_one_or_none()
+ if row:
+ row.news_score = score.news_score
+ row.news_count = score.news_count
+ row.social_score = score.social_score
+ row.policy_score = score.policy_score
+ row.filing_score = score.filing_score
+ row.composite = score.composite
+ row.updated_at = score.updated_at
+ else:
+ row = SymbolScoreRow(
+ id=str(uuid.uuid4()),
+ symbol=score.symbol,
+ news_score=score.news_score,
+ news_count=score.news_count,
+ social_score=score.social_score,
+ policy_score=score.policy_score,
+ filing_score=score.filing_score,
+ composite=score.composite,
+ updated_at=score.updated_at,
+ )
+ session.add(row)
+ await session.commit()
+ except Exception:
+ await session.rollback()
+ raise
+
+ async def get_top_symbol_scores(self, limit: int = 20) -> list[dict]:
+ """Get top symbol scores ordered by composite descending."""
+ stmt = (
+ select(SymbolScoreRow)
+ .order_by(SymbolScoreRow.composite.desc())
+ .limit(limit)
+ )
+ async with self._session_factory() as session:
+ result = await session.execute(stmt)
+ rows = result.scalars().all()
+ return [
+ {
+ "symbol": r.symbol,
+ "news_score": r.news_score,
+ "news_count": r.news_count,
+ "social_score": r.social_score,
+ "policy_score": r.policy_score,
+ "filing_score": r.filing_score,
+ "composite": r.composite,
+ "updated_at": r.updated_at,
+ }
+ for r in rows
+ ]
+
+ async def upsert_market_sentiment(self, ms: MarketSentiment) -> None:
+ """Insert or update the latest market sentiment (single row, id='latest')."""
+ async with self._session_factory() as session:
+ try:
+ existing = await session.execute(
+ select(MarketSentimentRow).where(MarketSentimentRow.id == "latest")
+ )
+ row = existing.scalar_one_or_none()
+ if row:
+ row.fear_greed = ms.fear_greed
+ row.fear_greed_label = ms.fear_greed_label
+ row.vix = ms.vix
+ row.fed_stance = ms.fed_stance
+ row.market_regime = ms.market_regime
+ row.updated_at = ms.updated_at
+ else:
+ row = MarketSentimentRow(
+ id="latest",
+ fear_greed=ms.fear_greed,
+ fear_greed_label=ms.fear_greed_label,
+ vix=ms.vix,
+ fed_stance=ms.fed_stance,
+ market_regime=ms.market_regime,
+ updated_at=ms.updated_at,
+ )
+ session.add(row)
+ await session.commit()
+ except Exception:
+ await session.rollback()
+ raise
+
+ async def get_latest_market_sentiment(self) -> dict | None:
+ """Get the latest market sentiment."""
+ stmt = select(MarketSentimentRow).where(MarketSentimentRow.id == "latest")
+ async with self._session_factory() as session:
+ result = await session.execute(stmt)
+ r = result.scalar_one_or_none()
+ if r is None:
+ return None
+ return {
+ "fear_greed": r.fear_greed,
+ "fear_greed_label": r.fear_greed_label,
+ "vix": r.vix,
+ "fed_stance": r.fed_stance,
+ "market_regime": r.market_regime,
+ "updated_at": r.updated_at,
+ }
+
+ async def insert_stock_selection(
+ self,
+ trade_date: date,
+ symbol: str,
+ side: str,
+ conviction: float,
+ reason: str,
+ key_news: list[str],
+ sentiment_snapshot: dict,
+ ) -> None:
+ """Insert a stock selection record."""
+ row = StockSelectionRow(
+ id=str(uuid.uuid4()),
+ trade_date=trade_date,
+ symbol=symbol,
+ side=side,
+ conviction=conviction,
+ reason=reason,
+ key_news=json.dumps(key_news),
+ sentiment_snapshot=json.dumps(sentiment_snapshot),
+ )
+ async with self._session_factory() as session:
+ try:
+ session.add(row)
+ await session.commit()
+ except Exception:
+ await session.rollback()
+ raise
+
+ async def get_stock_selections(self, trade_date: date) -> list[dict]:
+ """Get stock selections for a specific date."""
+ stmt = (
+ select(StockSelectionRow)
+ .where(StockSelectionRow.trade_date == trade_date)
+ .order_by(StockSelectionRow.conviction.desc())
+ )
+ async with self._session_factory() as session:
+ result = await session.execute(stmt)
+ rows = result.scalars().all()
+ return [
+ {
+ "symbol": r.symbol,
+ "side": r.side,
+ "conviction": r.conviction,
+ "reason": r.reason,
+ "key_news": json.loads(r.key_news) if r.key_news else [],
+ "sentiment_snapshot": json.loads(r.sentiment_snapshot) if r.sentiment_snapshot else {},
+ }
+ for r in rows
+ ]
+```
+
+- [ ] **Step 9: Run DB news tests to verify they pass**
+
+Run: `pytest shared/tests/test_db_news.py -v`
+Expected: All 4 tests PASS
+
+Note: These tests require the `aiosqlite` package. If not installed: `pip install aiosqlite`. Also note that with the in-memory URL `sqlite+aiosqlite://` every new pooled connection opens its own empty database — `Database` must run DDL and queries over a single shared connection (e.g. SQLAlchemy's `StaticPool`) for these tests to see the tables it created; verify `Database.connect()` does this.
+
+- [ ] **Step 10: Run all shared tests to check no regressions**
+
+Run: `pytest shared/tests/ -v`
+Expected: All tests PASS
+
+- [ ] **Step 11: Commit**
+
+```bash
+git add shared/src/shared/events.py shared/src/shared/db.py shared/tests/test_news_events.py shared/tests/test_db_news.py
+git commit -m "feat: add NewsEvent, DB methods for news/sentiment/selections"
+```
+
+---
+
+### Task 5: Update Settings with new env vars
+
+**Files:**
+- Modify: `shared/src/shared/config.py`
+- Modify: `.env.example`
+
+- [ ] **Step 1: Add new settings to config.py**
+
+Add these fields to the `Settings` class in `shared/src/shared/config.py`:
+
+```python
+ # News collector
+ finnhub_api_key: str = ""
+ news_poll_interval: int = 300
+ sentiment_aggregate_interval: int = 900
+ # Stock selector
+ selector_candidates_time: str = "15:00"
+ selector_filter_time: str = "15:15"
+ selector_final_time: str = "15:30"
+ selector_max_picks: int = 3
+ # LLM
+ anthropic_api_key: str = ""
+ anthropic_model: str = "claude-sonnet-4-20250514"
+```
+
+- [ ] **Step 2: Add to .env.example**
+
+Append to `.env.example`:
+
+```bash
+
+# News Collector
+FINNHUB_API_KEY=
+NEWS_POLL_INTERVAL=300
+SENTIMENT_AGGREGATE_INTERVAL=900
+
+# Stock Selector
+SELECTOR_CANDIDATES_TIME=15:00
+SELECTOR_FILTER_TIME=15:15
+SELECTOR_FINAL_TIME=15:30
+SELECTOR_MAX_PICKS=3
+
+# LLM (for stock selector)
+ANTHROPIC_API_KEY=
+ANTHROPIC_MODEL=claude-sonnet-4-20250514
+```
+
+- [ ] **Step 3: Commit**
+
+```bash
+git add shared/src/shared/config.py .env.example
+git commit -m "feat: add news collector and stock selector config settings"
+```
+
+---
+
+## Phase 2: News Collector Service
+
+### Task 6: Scaffold news-collector service
+
+**Files:**
+- Create: `services/news-collector/pyproject.toml`
+- Create: `services/news-collector/Dockerfile`
+- Create: `services/news-collector/src/news_collector/__init__.py`
+- Create: `services/news-collector/src/news_collector/config.py`
+- Create: `services/news-collector/src/news_collector/collectors/__init__.py`
+- Create: `services/news-collector/src/news_collector/collectors/base.py`
+- Create: `services/news-collector/tests/__init__.py`
+
+- [ ] **Step 1: Create pyproject.toml**
+
+Create `services/news-collector/pyproject.toml`:
+
+```toml
+[project]
+name = "news-collector"
+version = "0.1.0"
+description = "News and sentiment data collector service"
+requires-python = ">=3.12"
+dependencies = [
+ "trading-shared",
+ "feedparser>=6.0",
+ "nltk>=3.8",
+ "aiohttp>=3.9",
+]
+
+[project.optional-dependencies]
+dev = [
+ "pytest>=8.0",
+ "pytest-asyncio>=0.23",
+ "aioresponses>=0.7",
+]
+
+[build-system]
+requires = ["hatchling"]
+build-backend = "hatchling.build"
+
+[tool.hatch.build.targets.wheel]
+packages = ["src/news_collector"]
+```
+
+- [ ] **Step 2: Create Dockerfile**
+
+Create `services/news-collector/Dockerfile`:
+
+```dockerfile
+FROM python:3.12-slim
+WORKDIR /app
+COPY shared/ shared/
+RUN pip install --no-cache-dir ./shared
+COPY services/news-collector/ services/news-collector/
+RUN pip install --no-cache-dir ./services/news-collector
+RUN python -c "import nltk; nltk.download('vader_lexicon', quiet=True)"
+ENV PYTHONPATH=/app
+CMD ["python", "-m", "news_collector.main"]
+```
+
+- [ ] **Step 3: Create config.py**
+
+Create `services/news-collector/src/news_collector/config.py`:
+
+```python
+"""News Collector configuration."""
+
+from shared.config import Settings
+
+
+class NewsCollectorConfig(Settings):
+ health_port: int = 8084
+ finnhub_api_key: str = ""
+ news_poll_interval: int = 300
+ sentiment_aggregate_interval: int = 900
+```
+
+- [ ] **Step 4: Create BaseCollector**
+
+Create `services/news-collector/src/news_collector/collectors/base.py`:
+
+```python
+"""Base class for all news collectors."""
+
+from abc import ABC, abstractmethod
+
+from shared.models import NewsItem
+
+
+class BaseCollector(ABC):
+ name: str = "base"
+ poll_interval: int = 300 # seconds
+
+ @abstractmethod
+ async def collect(self) -> list[NewsItem]:
+ """Collect news items from the source."""
+
+ @abstractmethod
+ async def is_available(self) -> bool:
+ """Check if this data source is accessible."""
+```
+
+- [ ] **Step 5: Create __init__.py files**
+
+Create `services/news-collector/src/news_collector/__init__.py`:
+```python
+"""News collector service."""
+```
+
+Create `services/news-collector/src/news_collector/collectors/__init__.py`:
+```python
+"""News collectors."""
+```
+
+Create `services/news-collector/tests/__init__.py`:
+```python
+```
+
+- [ ] **Step 6: Commit**
+
+```bash
+git add services/news-collector/
+git commit -m "feat: scaffold news-collector service with BaseCollector"
+```
+
+---
+
+### Task 7: Implement Finnhub news collector
+
+**Files:**
+- Create: `services/news-collector/src/news_collector/collectors/finnhub.py`
+- Create: `services/news-collector/tests/test_finnhub.py`
+
+- [ ] **Step 1: Write tests**
+
+Create `services/news-collector/tests/test_finnhub.py`:
+
+```python
+"""Tests for Finnhub news collector."""
+
+import pytest
+from unittest.mock import AsyncMock, patch
+from datetime import datetime, timezone
+
+from news_collector.collectors.finnhub import FinnhubCollector
+
+
+@pytest.fixture
+def collector():
+ return FinnhubCollector(api_key="test_key")
+
+
+def test_collector_name(collector):
+ assert collector.name == "finnhub"
+ assert collector.poll_interval == 300
+
+
+async def test_is_available_with_key(collector):
+ assert await collector.is_available() is True
+
+
+async def test_is_available_without_key():
+ c = FinnhubCollector(api_key="")
+ assert await c.is_available() is False
+
+
+async def test_collect_parses_response(collector):
+ mock_response = [
+ {
+ "category": "top news",
+ "datetime": 1711929600,
+ "headline": "AAPL beats earnings",
+ "id": 12345,
+ "related": "AAPL",
+ "source": "MarketWatch",
+ "summary": "Apple reported better than expected...",
+ "url": "https://example.com/article",
+ },
+ {
+ "category": "top news",
+ "datetime": 1711929000,
+ "headline": "Fed holds rates steady",
+ "id": 12346,
+ "related": "",
+ "source": "Reuters",
+ "summary": "The Federal Reserve...",
+ "url": "https://example.com/fed",
+ },
+ ]
+
+ with patch.object(collector, "_fetch_news", new_callable=AsyncMock, return_value=mock_response):
+ items = await collector.collect()
+
+ assert len(items) == 2
+ assert items[0].source == "finnhub"
+ assert items[0].headline == "AAPL beats earnings"
+ assert items[0].symbols == ["AAPL"]
+ assert items[0].url == "https://example.com/article"
+ assert isinstance(items[0].sentiment, float)
+ # Second item has no related ticker
+ assert items[1].symbols == []
+
+
+async def test_collect_handles_empty_response(collector):
+ with patch.object(collector, "_fetch_news", new_callable=AsyncMock, return_value=[]):
+ items = await collector.collect()
+ assert items == []
+```
+
+- [ ] **Step 2: Run tests to verify they fail**
+
+Run: `pytest services/news-collector/tests/test_finnhub.py -v`
+Expected: FAIL — module not found
+
+- [ ] **Step 3: Implement FinnhubCollector**
+
+Create `services/news-collector/src/news_collector/collectors/finnhub.py`:
+
+```python
+"""Finnhub market news collector (free tier: 60 req/min)."""
+
+import logging
+from datetime import datetime, timezone
+
+import aiohttp
+from nltk.sentiment.vader import SentimentIntensityAnalyzer
+
+from shared.models import NewsCategory, NewsItem
+from news_collector.collectors.base import BaseCollector
+
+logger = logging.getLogger(__name__)
+
+FINNHUB_NEWS_URL = "https://finnhub.io/api/v1/news"
+
+
+class FinnhubCollector(BaseCollector):
+ name = "finnhub"
+ poll_interval = 300 # 5 minutes
+
+ def __init__(self, api_key: str) -> None:
+ self._api_key = api_key
+ self._vader = SentimentIntensityAnalyzer()
+
+ async def is_available(self) -> bool:
+ return bool(self._api_key)
+
+ async def _fetch_news(self) -> list[dict]:
+ """Fetch general news from Finnhub API."""
+ params = {"category": "general", "token": self._api_key}
+        timeout = aiohttp.ClientTimeout(total=10)
+        async with aiohttp.ClientSession() as session:
+            async with session.get(FINNHUB_NEWS_URL, params=params, timeout=timeout) as resp:
+                if resp.status != 200:
+                    # stdlib logging does not accept structlog-style keyword
+                    # fields; use lazy %-style arguments instead.
+                    logger.warning("finnhub fetch failed: status=%s", resp.status)
+                    return []
+                return await resp.json()
+
+ def _analyze_sentiment(self, text: str) -> float:
+ """Return VADER compound score (-1.0 to 1.0)."""
+ scores = self._vader.polarity_scores(text)
+ return scores["compound"]
+
+ def _extract_symbols(self, related: str) -> list[str]:
+ """Parse Finnhub 'related' field into symbol list."""
+ if not related or not related.strip():
+ return []
+ return [s.strip() for s in related.split(",") if s.strip()]
+
+ def _categorize(self, article: dict) -> NewsCategory:
+ """Determine category from article content."""
+ headline = article.get("headline", "").lower()
+ if any(w in headline for w in ["fed", "fomc", "rate", "inflation"]):
+ return NewsCategory.FED
+ if any(w in headline for w in ["tariff", "sanction", "regulation", "trump", "biden", "congress"]):
+ return NewsCategory.POLICY
+ if any(w in headline for w in ["earnings", "revenue", "profit", "eps"]):
+ return NewsCategory.EARNINGS
+ return NewsCategory.MACRO
+
+ async def collect(self) -> list[NewsItem]:
+ raw = await self._fetch_news()
+ items = []
+ for article in raw:
+ headline = article.get("headline", "")
+ summary = article.get("summary", "")
+ sentiment_text = f"{headline}. {summary}" if summary else headline
+
+ items.append(
+ NewsItem(
+ source=self.name,
+ headline=headline,
+ summary=summary or None,
+ url=article.get("url"),
+ published_at=datetime.fromtimestamp(
+ article.get("datetime", 0), tz=timezone.utc
+ ),
+ symbols=self._extract_symbols(article.get("related", "")),
+ sentiment=self._analyze_sentiment(sentiment_text),
+ category=self._categorize(article),
+ raw_data=article,
+ )
+ )
+ return items
+```
+
+- [ ] **Step 4: Run tests to verify they pass**
+
+Run: `pytest services/news-collector/tests/test_finnhub.py -v`
+Expected: All 5 tests PASS
+
+Note: Requires `nltk` and VADER lexicon. If not downloaded: `python -c "import nltk; nltk.download('vader_lexicon')"`
+
+- [ ] **Step 5: Commit**
+
+```bash
+git add services/news-collector/src/news_collector/collectors/finnhub.py services/news-collector/tests/test_finnhub.py
+git commit -m "feat: implement Finnhub news collector with VADER sentiment"
+```
+
+---
+
+### Task 8: Implement RSS news collector
+
+**Files:**
+- Create: `services/news-collector/src/news_collector/collectors/rss.py`
+- Create: `services/news-collector/tests/test_rss.py`
+
+- [ ] **Step 1: Write tests**
+
+Create `services/news-collector/tests/test_rss.py`:
+
+```python
+"""Tests for RSS news collector."""
+
+import pytest
+from unittest.mock import AsyncMock, patch
+from datetime import datetime, timezone
+
+from news_collector.collectors.rss import RSSCollector
+
+
+@pytest.fixture
+def collector():
+ return RSSCollector()
+
+
+def test_collector_name(collector):
+ assert collector.name == "rss"
+ assert collector.poll_interval == 600
+
+
+async def test_is_available(collector):
+ assert await collector.is_available() is True
+
+
+async def test_collect_parses_feed(collector):
+ mock_feed = {
+ "entries": [
+ {
+ "title": "NVDA surges on AI demand",
+ "link": "https://example.com/nvda",
+ "published_parsed": (2026, 4, 2, 12, 0, 0, 0, 0, 0),
+ "summary": "Nvidia stock jumped 5%...",
+ },
+ {
+ "title": "Markets rally on jobs data",
+ "link": "https://example.com/market",
+ "published_parsed": (2026, 4, 2, 11, 0, 0, 0, 0, 0),
+ "summary": "The S&P 500 rose...",
+ },
+ ],
+ }
+
+ with patch.object(collector, "_fetch_feeds", new_callable=AsyncMock, return_value=[mock_feed]):
+ items = await collector.collect()
+
+ assert len(items) == 2
+ assert items[0].source == "rss"
+ assert items[0].headline == "NVDA surges on AI demand"
+ assert isinstance(items[0].sentiment, float)
+```
+
+- [ ] **Step 2: Run tests to verify they fail**
+
+Run: `pytest services/news-collector/tests/test_rss.py -v`
+Expected: FAIL
+
+- [ ] **Step 3: Implement RSSCollector**
+
+Create `services/news-collector/src/news_collector/collectors/rss.py`:
+
+```python
+"""RSS feed collector for Yahoo Finance, Google News, MarketWatch."""
+
+import asyncio
+import logging
+import re
+from calendar import timegm
+from datetime import datetime, timezone
+
+import aiohttp
+import feedparser
+from nltk.sentiment.vader import SentimentIntensityAnalyzer
+
+from shared.models import NewsCategory, NewsItem
+from news_collector.collectors.base import BaseCollector
+
+logger = logging.getLogger(__name__)
+
+DEFAULT_FEEDS = [
+ "https://feeds.finance.yahoo.com/rss/2.0/headline?s=^GSPC&region=US&lang=en-US",
+ "https://news.google.com/rss/topics/CAAqJggKIiBDQkFTRWdvSUwyMHZNRGx6TVdZU0FtVnVHZ0pWVXlnQVAB?hl=en-US&gl=US&ceid=US:en",
+ "https://www.marketwatch.com/rss/topstories",
+]
+
+# Common US stock tickers to detect in headlines
+TICKER_PATTERN = re.compile(
+ r"\b(AAPL|MSFT|GOOGL|GOOG|AMZN|TSLA|NVDA|META|JPM|V|JNJ|WMT|PG|UNH|HD|"
+ r"MA|DIS|BAC|XOM|PFE|KO|PEP|CSCO|INTC|VZ|NFLX|ADBE|CRM|AMD|QCOM|"
+ r"GS|BA|CAT|MMM|IBM|GE|F|GM|NKE|MCD|SBUX|SPY|QQQ|IWM)\b"
+)
+
+
+class RSSCollector(BaseCollector):
+ name = "rss"
+ poll_interval = 600 # 10 minutes
+
+ def __init__(self, feeds: list[str] | None = None) -> None:
+ self._feeds = feeds or DEFAULT_FEEDS
+ self._vader = SentimentIntensityAnalyzer()
+
+ async def is_available(self) -> bool:
+ return True
+
+ async def _fetch_feeds(self) -> list[dict]:
+ """Fetch and parse all RSS feeds."""
+ results = []
+ async with aiohttp.ClientSession() as session:
+ for url in self._feeds:
+ try:
+ async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as resp:
+ if resp.status == 200:
+ text = await resp.text()
+ feed = feedparser.parse(text)
+ results.append(feed)
+ except Exception as exc:
+                logger.warning("rss fetch failed: url=%s error=%s", url, exc)
+ return results
+
+ def _extract_symbols(self, text: str) -> list[str]:
+ """Extract stock tickers from text."""
+ return list(set(TICKER_PATTERN.findall(text)))
+
+ def _parse_time(self, entry: dict) -> datetime:
+ """Parse published time from feed entry."""
+ parsed = entry.get("published_parsed")
+ if parsed:
+ return datetime.fromtimestamp(timegm(parsed), tz=timezone.utc)
+ return datetime.now(timezone.utc)
+
+ async def collect(self) -> list[NewsItem]:
+ feeds = await self._fetch_feeds()
+ items = []
+ seen_titles = set()
+
+ for feed in feeds:
+ for entry in feed.get("entries", []):
+ title = entry.get("title", "").strip()
+ if not title or title in seen_titles:
+ continue
+ seen_titles.add(title)
+
+ summary = entry.get("summary", "")
+ sentiment_text = f"{title}. {summary}" if summary else title
+
+ items.append(
+ NewsItem(
+ source=self.name,
+ headline=title,
+ summary=summary or None,
+ url=entry.get("link"),
+ published_at=self._parse_time(entry),
+ symbols=self._extract_symbols(f"{title} {summary}"),
+ sentiment=self._vader.polarity_scores(sentiment_text)["compound"],
+ category=NewsCategory.MACRO,
+ raw_data={"feed_title": title},
+ )
+ )
+
+ return items
+```
+
+- [ ] **Step 4: Run tests to verify they pass**
+
+Run: `pytest services/news-collector/tests/test_rss.py -v`
+Expected: All 3 tests PASS
+
+- [ ] **Step 5: Commit**
+
+```bash
+git add services/news-collector/src/news_collector/collectors/rss.py services/news-collector/tests/test_rss.py
+git commit -m "feat: implement RSS news collector (Yahoo, Google News, MarketWatch)"
+```
+
+---
+
+### Task 9: Implement Fear & Greed Index collector
+
+**Files:**
+- Create: `services/news-collector/src/news_collector/collectors/fear_greed.py`
+- Create: `services/news-collector/tests/test_fear_greed.py`
+
+- [ ] **Step 1: Write tests**
+
+Create `services/news-collector/tests/test_fear_greed.py`:
+
+```python
+"""Tests for CNN Fear & Greed Index collector."""
+
+import pytest
+from unittest.mock import AsyncMock, patch
+
+from news_collector.collectors.fear_greed import FearGreedCollector
+
+
+@pytest.fixture
+def collector():
+ return FearGreedCollector()
+
+
+def test_collector_name(collector):
+ assert collector.name == "fear_greed"
+ assert collector.poll_interval == 3600
+
+
+async def test_is_available(collector):
+ assert await collector.is_available() is True
+
+
+async def test_collect_parses_api_response(collector):
+ mock_data = {
+ "fear_and_greed": {
+ "score": 45.0,
+ "rating": "Fear",
+ "timestamp": "2026-04-02T12:00:00+00:00",
+ }
+ }
+ with patch.object(collector, "_fetch_index", new_callable=AsyncMock, return_value=mock_data):
+ result = await collector.collect()
+
+ assert result.fear_greed == 45
+ assert result.fear_greed_label == "Fear"
+
+
+async def test_collect_returns_none_on_failure(collector):
+ with patch.object(collector, "_fetch_index", new_callable=AsyncMock, return_value=None):
+ result = await collector.collect()
+ assert result is None
+
+
+def test_classify_label():
+ c = FearGreedCollector()
+ assert c._classify(10) == "Extreme Fear"
+ assert c._classify(30) == "Fear"
+ assert c._classify(50) == "Neutral"
+ assert c._classify(70) == "Greed"
+ assert c._classify(85) == "Extreme Greed"
+```
+
+- [ ] **Step 2: Run tests to verify they fail**
+
+Run: `pytest services/news-collector/tests/test_fear_greed.py -v`
+Expected: FAIL
+
+- [ ] **Step 3: Implement FearGreedCollector**
+
+Create `services/news-collector/src/news_collector/collectors/fear_greed.py`:
+
+```python
+"""CNN Fear & Greed Index collector."""
+
+import logging
+from dataclasses import dataclass
+from typing import Optional
+
+import aiohttp
+
+from news_collector.collectors.base import BaseCollector
+from shared.models import NewsItem
+
+logger = logging.getLogger(__name__)
+
+FEAR_GREED_URL = "https://production.dataviz.cnn.io/index/fearandgreed/graphdata"
+
+
+@dataclass
+class FearGreedResult:
+ fear_greed: int
+ fear_greed_label: str
+
+
+class FearGreedCollector(BaseCollector):
+ """Fetches CNN Fear & Greed Index.
+
+ Note: This collector does NOT return NewsItem — it returns FearGreedResult
+ which feeds directly into MarketSentiment. The main.py scheduler handles
+ this differently from news collectors.
+ """
+
+ name = "fear_greed"
+ poll_interval = 3600 # 1 hour
+
+ async def is_available(self) -> bool:
+ return True
+
+ async def _fetch_index(self) -> Optional[dict]:
+ """Fetch Fear & Greed data from CNN API."""
+ headers = {"User-Agent": "Mozilla/5.0"}
+ try:
+ async with aiohttp.ClientSession() as session:
+ async with session.get(
+ FEAR_GREED_URL, headers=headers, timeout=aiohttp.ClientTimeout(total=10)
+ ) as resp:
+ if resp.status != 200:
+ logger.warning("fear_greed_fetch_failed", status=resp.status)
+ return None
+ return await resp.json()
+ except Exception as exc:
+ logger.warning("fear_greed_error", error=str(exc))
+ return None
+
+ def _classify(self, score: int) -> str:
+ """Classify numeric score into label."""
+ if score <= 20:
+ return "Extreme Fear"
+ if score <= 40:
+ return "Fear"
+ if score <= 60:
+ return "Neutral"
+ if score <= 80:
+ return "Greed"
+ return "Extreme Greed"
+
+ async def collect(self) -> Optional[FearGreedResult]:
+ """Collect Fear & Greed Index. Returns FearGreedResult or None."""
+ data = await self._fetch_index()
+ if data is None:
+ return None
+
+ try:
+ fg = data["fear_and_greed"]
+ score = int(fg["score"])
+ label = fg.get("rating", self._classify(score))
+ return FearGreedResult(fear_greed=score, fear_greed_label=label)
+ except (KeyError, ValueError, TypeError) as exc:
+ logger.warning("fear_greed_parse_failed", error=str(exc))
+ return None
+```
+
+- [ ] **Step 4: Run tests to verify they pass**
+
+Run: `pytest services/news-collector/tests/test_fear_greed.py -v`
+Expected: All 5 tests PASS
+
+- [ ] **Step 5: Commit**
+
+```bash
+git add services/news-collector/src/news_collector/collectors/fear_greed.py services/news-collector/tests/test_fear_greed.py
+git commit -m "feat: implement CNN Fear & Greed Index collector"
+```
+
+---
+
+### Task 10: Implement SEC EDGAR collector
+
+**Files:**
+- Create: `services/news-collector/src/news_collector/collectors/sec_edgar.py`
+- Create: `services/news-collector/tests/test_sec_edgar.py`
+
+- [ ] **Step 1: Write tests**
+
+Create `services/news-collector/tests/test_sec_edgar.py`:
+
+```python
+"""Tests for SEC EDGAR filing collector."""
+
+import pytest
+from unittest.mock import AsyncMock, patch
+
+from news_collector.collectors.sec_edgar import SecEdgarCollector
+
+
+@pytest.fixture
+def collector():
+ return SecEdgarCollector()
+
+
+def test_collector_name(collector):
+ assert collector.name == "sec_edgar"
+ assert collector.poll_interval == 1800
+
+
+async def test_is_available(collector):
+ assert await collector.is_available() is True
+
+
+async def test_collect_parses_filings(collector):
+ mock_response = {
+ "filings": {
+ "recent": {
+ "accessionNumber": ["0001234-26-000001"],
+ "filingDate": ["2026-04-02"],
+ "primaryDocument": ["filing.htm"],
+ "form": ["8-K"],
+ "primaryDocDescription": ["Current Report"],
+ }
+ },
+ "tickers": [{"ticker": "AAPL"}],
+ "name": "Apple Inc",
+ }
+ with patch.object(collector, "_fetch_recent_filings", new_callable=AsyncMock, return_value=[mock_response]):
+ items = await collector.collect()
+
+ assert len(items) == 1
+ assert items[0].source == "sec_edgar"
+ assert items[0].category.value == "filing"
+ assert "AAPL" in items[0].symbols
+
+
+async def test_collect_handles_empty(collector):
+ with patch.object(collector, "_fetch_recent_filings", new_callable=AsyncMock, return_value=[]):
+ items = await collector.collect()
+ assert items == []
+```
+
+- [ ] **Step 2: Run tests to verify they fail**
+
+Run: `pytest services/news-collector/tests/test_sec_edgar.py -v`
+Expected: FAIL
+
+- [ ] **Step 3: Implement SecEdgarCollector**
+
+Create `services/news-collector/src/news_collector/collectors/sec_edgar.py`:
+
+```python
+"""SEC EDGAR filing collector (free, no API key required)."""
+
+import logging
+from datetime import datetime, timezone
+
+import aiohttp
+from nltk.sentiment.vader import SentimentIntensityAnalyzer
+
+from shared.models import NewsCategory, NewsItem
+from news_collector.collectors.base import BaseCollector
+
+logger = logging.getLogger(__name__)
+
+EDGAR_FULL_TEXT_SEARCH = "https://efts.sec.gov/LATEST/search-index"
+EDGAR_RECENT_FILINGS = "https://efts.sec.gov/LATEST/search-index?q=%228-K%22&dateRange=custom&startdt={date}&enddt={date}&forms=8-K"
+EDGAR_COMPANY_FILINGS = "https://data.sec.gov/submissions/CIK{cik}.json"
+
+# CIK numbers for major companies (subset — extend as needed)
+TRACKED_CIKS = {
+ "0000320193": "AAPL",
+ "0000789019": "MSFT",
+ "0001652044": "GOOGL",
+ "0001018724": "AMZN",
+ "0001318605": "TSLA",
+ "0001045810": "NVDA",
+ "0001326801": "META",
+ "0000019617": "JPM",
+ "0000078003": "PFE",
+ "0000021344": "KO",
+}
+
+SEC_USER_AGENT = "TradingPlatform research@example.com"
+
+
+class SecEdgarCollector(BaseCollector):
+ name = "sec_edgar"
+ poll_interval = 1800 # 30 minutes
+
+ def __init__(self) -> None:
+ self._vader = SentimentIntensityAnalyzer()
+
+ async def is_available(self) -> bool:
+ return True
+
+ async def _fetch_recent_filings(self) -> list[dict]:
+ """Fetch recent 8-K filings for tracked companies."""
+ results = []
+ headers = {"User-Agent": SEC_USER_AGENT}
+ async with aiohttp.ClientSession() as session:
+ for cik, ticker in TRACKED_CIKS.items():
+ try:
+ url = f"https://data.sec.gov/submissions/CIK{cik}.json"
+ async with session.get(
+ url, headers=headers, timeout=aiohttp.ClientTimeout(total=10)
+ ) as resp:
+ if resp.status == 200:
+ data = await resp.json()
+ data["tickers"] = [{"ticker": ticker}]
+ results.append(data)
+ except Exception as exc:
+ logger.warning("sec_fetch_failed", cik=cik, error=str(exc))
+ return results
+
+ async def collect(self) -> list[NewsItem]:
+ filings_data = await self._fetch_recent_filings()
+ items = []
+ today = datetime.now(timezone.utc).strftime("%Y-%m-%d")
+
+ for company_data in filings_data:
+ tickers = [t["ticker"] for t in company_data.get("tickers", [])]
+ company_name = company_data.get("name", "Unknown")
+ recent = company_data.get("filings", {}).get("recent", {})
+
+ forms = recent.get("form", [])
+ dates = recent.get("filingDate", [])
+ descriptions = recent.get("primaryDocDescription", [])
+ accessions = recent.get("accessionNumber", [])
+
+ for i, form in enumerate(forms):
+ if form != "8-K":
+ continue
+ filing_date = dates[i] if i < len(dates) else ""
+ if filing_date != today:
+ continue
+
+ desc = descriptions[i] if i < len(descriptions) else "8-K Filing"
+ accession = accessions[i] if i < len(accessions) else ""
+ headline = f"{company_name} ({', '.join(tickers)}): {form} - {desc}"
+
+ items.append(
+ NewsItem(
+ source=self.name,
+ headline=headline,
+ summary=desc,
+ url=f"https://www.sec.gov/cgi-bin/browse-edgar?action=getcompany&accession={accession}",
+ published_at=datetime.strptime(filing_date, "%Y-%m-%d").replace(tzinfo=timezone.utc),
+ symbols=tickers,
+ sentiment=self._vader.polarity_scores(headline)["compound"],
+ category=NewsCategory.FILING,
+ raw_data={"form": form, "accession": accession},
+ )
+ )
+
+ return items
+```
+
+- [ ] **Step 4: Run tests to verify they pass**
+
+Run: `pytest services/news-collector/tests/test_sec_edgar.py -v`
+Expected: All 4 tests PASS
+
+- [ ] **Step 5: Commit**
+
+```bash
+git add services/news-collector/src/news_collector/collectors/sec_edgar.py services/news-collector/tests/test_sec_edgar.py
+git commit -m "feat: implement SEC EDGAR 8-K filing collector"
+```
+
+---
+
+### Task 11: Implement Reddit collector
+
+**Files:**
+- Create: `services/news-collector/src/news_collector/collectors/reddit.py`
+- Create: `services/news-collector/tests/test_reddit.py`
+
+- [ ] **Step 1: Write tests**
+
+Create `services/news-collector/tests/test_reddit.py`:
+
+```python
+"""Tests for Reddit collector."""
+
+import pytest
+from unittest.mock import AsyncMock, patch
+
+from news_collector.collectors.reddit import RedditCollector
+
+
+@pytest.fixture
+def collector():
+ return RedditCollector()
+
+
+def test_collector_name(collector):
+ assert collector.name == "reddit"
+ assert collector.poll_interval == 900
+
+
+async def test_is_available(collector):
+ assert await collector.is_available() is True
+
+
+async def test_collect_parses_posts(collector):
+ mock_posts = [
+ {
+ "data": {
+ "title": "NVDA to the moon! 🚀 AI demand is insane",
+ "selftext": "Just loaded up on NVDA calls",
+ "url": "https://reddit.com/r/wallstreetbets/123",
+ "created_utc": 1711929600,
+ "score": 500,
+ "num_comments": 200,
+ "subreddit": "wallstreetbets",
+ }
+ },
+ ]
+ with patch.object(collector, "_fetch_subreddit", new_callable=AsyncMock, return_value=mock_posts):
+ items = await collector.collect()
+
+ assert len(items) >= 1
+ assert items[0].source == "reddit"
+ assert items[0].category.value == "social"
+ assert isinstance(items[0].sentiment, float)
+
+
+async def test_collect_filters_low_score(collector):
+ mock_posts = [
+ {
+ "data": {
+ "title": "Random question about stocks",
+ "selftext": "",
+ "url": "https://reddit.com/r/stocks/456",
+ "created_utc": 1711929600,
+ "score": 3,
+ "num_comments": 1,
+ "subreddit": "stocks",
+ }
+ },
+ ]
+ with patch.object(collector, "_fetch_subreddit", new_callable=AsyncMock, return_value=mock_posts):
+ items = await collector.collect()
+ assert items == []
+```
+
+- [ ] **Step 2: Run tests to verify they fail**
+
+Run: `pytest services/news-collector/tests/test_reddit.py -v`
+Expected: FAIL
+
+- [ ] **Step 3: Implement RedditCollector**
+
+Create `services/news-collector/src/news_collector/collectors/reddit.py`:
+
+```python
+"""Reddit collector for r/wallstreetbets, r/stocks, r/investing."""
+
+import logging
+import re
+from datetime import datetime, timezone
+
+import aiohttp
+from nltk.sentiment.vader import SentimentIntensityAnalyzer
+
+from shared.models import NewsCategory, NewsItem
+from news_collector.collectors.base import BaseCollector
+
+logger = logging.getLogger(__name__)
+
+SUBREDDITS = ["wallstreetbets", "stocks", "investing"]
+MIN_SCORE = 50 # Minimum upvotes to consider
+
+TICKER_PATTERN = re.compile(
+ r"\b(AAPL|MSFT|GOOGL|GOOG|AMZN|TSLA|NVDA|META|JPM|V|JNJ|WMT|PG|UNH|HD|"
+ r"MA|DIS|BAC|XOM|PFE|KO|PEP|CSCO|INTC|VZ|NFLX|ADBE|CRM|AMD|QCOM|"
+ r"GS|BA|CAT|MMM|IBM|GE|F|GM|NKE|MCD|SBUX|SPY|QQQ|IWM)\b"
+)
+
+
+class RedditCollector(BaseCollector):
+ name = "reddit"
+ poll_interval = 900 # 15 minutes
+
+ def __init__(self) -> None:
+ self._vader = SentimentIntensityAnalyzer()
+
+ async def is_available(self) -> bool:
+ return True
+
+ async def _fetch_subreddit(self, subreddit: str = "wallstreetbets") -> list[dict]:
+ """Fetch hot posts from a subreddit via JSON API."""
+ url = f"https://www.reddit.com/r/{subreddit}/hot.json?limit=25"
+ headers = {"User-Agent": "TradingPlatform/1.0"}
+ try:
+ async with aiohttp.ClientSession() as session:
+ async with session.get(
+ url, headers=headers, timeout=aiohttp.ClientTimeout(total=10)
+ ) as resp:
+ if resp.status != 200:
+ logger.warning("reddit_fetch_failed", subreddit=subreddit, status=resp.status)
+ return []
+ data = await resp.json()
+ return data.get("data", {}).get("children", [])
+ except Exception as exc:
+ logger.warning("reddit_error", subreddit=subreddit, error=str(exc))
+ return []
+
+ async def collect(self) -> list[NewsItem]:
+ items = []
+ seen_titles = set()
+
+ for subreddit in SUBREDDITS:
+ posts = await self._fetch_subreddit(subreddit)
+ for post in posts:
+ data = post.get("data", {})
+ title = data.get("title", "").strip()
+ score = data.get("score", 0)
+
+ if not title or title in seen_titles or score < MIN_SCORE:
+ continue
+ seen_titles.add(title)
+
+ selftext = data.get("selftext", "")
+ text = f"{title}. {selftext}" if selftext else title
+ symbols = list(set(TICKER_PATTERN.findall(text)))
+
+ items.append(
+ NewsItem(
+ source=self.name,
+ headline=title,
+ summary=selftext[:500] if selftext else None,
+ url=data.get("url"),
+ published_at=datetime.fromtimestamp(
+ data.get("created_utc", 0), tz=timezone.utc
+ ),
+ symbols=symbols,
+ sentiment=self._vader.polarity_scores(text)["compound"],
+ category=NewsCategory.SOCIAL,
+ raw_data={
+ "subreddit": data.get("subreddit", subreddit),
+ "score": score,
+ "num_comments": data.get("num_comments", 0),
+ },
+ )
+ )
+
+ return items
+```
+
+- [ ] **Step 4: Run tests to verify they pass**
+
+Run: `pytest services/news-collector/tests/test_reddit.py -v`
+Expected: All 4 tests PASS
+
+- [ ] **Step 5: Commit**
+
+```bash
+git add services/news-collector/src/news_collector/collectors/reddit.py services/news-collector/tests/test_reddit.py
+git commit -m "feat: implement Reddit social sentiment collector"
+```
+
+---
+
+### Task 12: Implement Truth Social and Fed collectors
+
+**Files:**
+- Create: `services/news-collector/src/news_collector/collectors/truth_social.py`
+- Create: `services/news-collector/src/news_collector/collectors/fed.py`
+- Create: `services/news-collector/tests/test_truth_social.py`
+- Create: `services/news-collector/tests/test_fed.py`
+
+- [ ] **Step 1: Write tests for Truth Social**
+
+Create `services/news-collector/tests/test_truth_social.py`:
+
+```python
+"""Tests for Truth Social collector."""
+
+import pytest
+from unittest.mock import AsyncMock, patch
+
+from news_collector.collectors.truth_social import TruthSocialCollector
+
+
+@pytest.fixture
+def collector():
+ return TruthSocialCollector()
+
+
+def test_collector_name(collector):
+ assert collector.name == "truth_social"
+ assert collector.poll_interval == 900
+
+
+async def test_is_available(collector):
+ assert await collector.is_available() is True
+
+
+async def test_collect_parses_posts(collector):
+ mock_posts = [
+ {
+ "content": "We are imposing 25% tariffs on all steel imports!",
+ "created_at": "2026-04-02T12:00:00.000Z",
+ "url": "https://truthsocial.com/@realDonaldTrump/12345",
+ },
+ ]
+ with patch.object(collector, "_fetch_posts", new_callable=AsyncMock, return_value=mock_posts):
+ items = await collector.collect()
+
+ assert len(items) == 1
+ assert items[0].source == "truth_social"
+ assert items[0].category.value == "policy"
+ assert "tariff" in items[0].headline.lower() or "tariff" in items[0].raw_data.get("content", "").lower()
+
+
+async def test_collect_handles_empty(collector):
+ with patch.object(collector, "_fetch_posts", new_callable=AsyncMock, return_value=[]):
+ items = await collector.collect()
+ assert items == []
+```
+
+- [ ] **Step 2: Write tests for Fed collector**
+
+Create `services/news-collector/tests/test_fed.py`:
+
+```python
+"""Tests for Federal Reserve collector."""
+
+import pytest
+from unittest.mock import AsyncMock, patch
+
+from news_collector.collectors.fed import FedCollector
+
+
+@pytest.fixture
+def collector():
+ return FedCollector()
+
+
+def test_collector_name(collector):
+ assert collector.name == "fed"
+ assert collector.poll_interval == 3600
+
+
+async def test_is_available(collector):
+ assert await collector.is_available() is True
+
+
+async def test_collect_parses_rss(collector):
+ mock_entries = [
+ {
+ "title": "Federal Reserve issues FOMC statement",
+ "link": "https://www.federalreserve.gov/newsevents/pressreleases/monetary20260402a.htm",
+ "published_parsed": (2026, 4, 2, 14, 0, 0, 0, 0, 0),
+ "summary": "The Federal Open Market Committee decided to maintain the target range...",
+ },
+ ]
+ with patch.object(collector, "_fetch_fed_rss", new_callable=AsyncMock, return_value=mock_entries):
+ items = await collector.collect()
+
+ assert len(items) == 1
+ assert items[0].source == "fed"
+ assert items[0].category.value == "fed"
+```
+
+- [ ] **Step 3: Run tests to verify they fail**
+
+Run: `pytest services/news-collector/tests/test_truth_social.py services/news-collector/tests/test_fed.py -v`
+Expected: FAIL
+
+- [ ] **Step 4: Implement TruthSocialCollector**
+
+Create `services/news-collector/src/news_collector/collectors/truth_social.py`:
+
+```python
+"""Truth Social collector for Trump posts (policy-relevant)."""
+
+import logging
+from datetime import datetime, timezone
+
+import aiohttp
+from nltk.sentiment.vader import SentimentIntensityAnalyzer
+
+from shared.models import NewsCategory, NewsItem
+from news_collector.collectors.base import BaseCollector
+
+logger = logging.getLogger(__name__)
+
+# Truth Social uses a Mastodon-compatible API
+TRUTH_SOCIAL_API = "https://truthsocial.com/api/v1/accounts/107780257626128497/statuses"
+
+
+class TruthSocialCollector(BaseCollector):
+ name = "truth_social"
+ poll_interval = 900 # 15 minutes
+
+ def __init__(self) -> None:
+ self._vader = SentimentIntensityAnalyzer()
+
+ async def is_available(self) -> bool:
+ return True
+
+ async def _fetch_posts(self) -> list[dict]:
+ """Fetch recent posts from Truth Social."""
+ headers = {"User-Agent": "Mozilla/5.0"}
+ try:
+ async with aiohttp.ClientSession() as session:
+ async with session.get(
+ TRUTH_SOCIAL_API,
+ headers=headers,
+ params={"limit": 10},
+ timeout=aiohttp.ClientTimeout(total=15),
+ ) as resp:
+ if resp.status != 200:
+ logger.warning("truth_social_fetch_failed", status=resp.status)
+ return []
+ return await resp.json()
+ except Exception as exc:
+ logger.warning("truth_social_error", error=str(exc))
+ return []
+
+ def _strip_html(self, text: str) -> str:
+ """Remove HTML tags from content."""
+ import re
+ return re.sub(r"<[^>]+>", "", text).strip()
+
+ async def collect(self) -> list[NewsItem]:
+ posts = await self._fetch_posts()
+ items = []
+
+ for post in posts:
+ content = self._strip_html(post.get("content", ""))
+ if not content:
+ continue
+
+ created_at_str = post.get("created_at", "")
+ try:
+ published = datetime.fromisoformat(created_at_str.replace("Z", "+00:00"))
+ except (ValueError, AttributeError):
+ published = datetime.now(timezone.utc)
+
+ items.append(
+ NewsItem(
+ source=self.name,
+ headline=content[:200],
+ summary=content if len(content) > 200 else None,
+ url=post.get("url"),
+ published_at=published,
+ symbols=[], # Symbols extracted at aggregation stage via LLM
+ sentiment=self._vader.polarity_scores(content)["compound"],
+ category=NewsCategory.POLICY,
+ raw_data={"content": content, "id": post.get("id")},
+ )
+ )
+
+ return items
+```
+
+- [ ] **Step 5: Implement FedCollector**
+
+Create `services/news-collector/src/news_collector/collectors/fed.py`:
+
+```python
+"""Federal Reserve press release and FOMC statement collector."""
+
+import logging
+from calendar import timegm
+from datetime import datetime, timezone
+
+import aiohttp
+import feedparser
+from nltk.sentiment.vader import SentimentIntensityAnalyzer
+
+from shared.models import NewsCategory, NewsItem
+from news_collector.collectors.base import BaseCollector
+
+logger = logging.getLogger(__name__)
+
+FED_RSS_URL = "https://www.federalreserve.gov/feeds/press_all.xml"
+
+
+class FedCollector(BaseCollector):
+ name = "fed"
+ poll_interval = 3600 # 1 hour
+
+ def __init__(self) -> None:
+ self._vader = SentimentIntensityAnalyzer()
+
+ async def is_available(self) -> bool:
+ return True
+
+ async def _fetch_fed_rss(self) -> list[dict]:
+ """Fetch Federal Reserve RSS feed entries."""
+ try:
+ async with aiohttp.ClientSession() as session:
+ async with session.get(
+ FED_RSS_URL, timeout=aiohttp.ClientTimeout(total=10)
+ ) as resp:
+ if resp.status != 200:
+ logger.warning("fed_rss_failed", status=resp.status)
+ return []
+ text = await resp.text()
+ feed = feedparser.parse(text)
+ return feed.get("entries", [])
+ except Exception as exc:
+ logger.warning("fed_rss_error", error=str(exc))
+ return []
+
+ def _detect_stance(self, text: str) -> str:
+ """Detect hawkish/dovish/neutral stance from text."""
+ text_lower = text.lower()
+ hawkish_words = ["tighten", "raise", "inflation concern", "restrictive", "higher rates"]
+ dovish_words = ["accommodate", "cut", "easing", "lower rates", "support growth"]
+
+ hawk_count = sum(1 for w in hawkish_words if w in text_lower)
+ dove_count = sum(1 for w in dovish_words if w in text_lower)
+
+ if hawk_count > dove_count:
+ return "hawkish"
+ if dove_count > hawk_count:
+ return "dovish"
+ return "neutral"
+
+ async def collect(self) -> list[NewsItem]:
+ entries = await self._fetch_fed_rss()
+ items = []
+
+ for entry in entries:
+ title = entry.get("title", "").strip()
+ if not title:
+ continue
+
+ summary = entry.get("summary", "")
+ parsed_time = entry.get("published_parsed")
+ if parsed_time:
+ published = datetime.fromtimestamp(timegm(parsed_time), tz=timezone.utc)
+ else:
+ published = datetime.now(timezone.utc)
+
+ text = f"{title}. {summary}" if summary else title
+
+ items.append(
+ NewsItem(
+ source=self.name,
+ headline=title,
+ summary=summary or None,
+ url=entry.get("link"),
+ published_at=published,
+ symbols=[],
+ sentiment=self._vader.polarity_scores(text)["compound"],
+ category=NewsCategory.FED,
+ raw_data={"stance": self._detect_stance(text)},
+ )
+ )
+
+ return items
+```
+
+- [ ] **Step 6: Run tests to verify they pass**
+
+Run: `pytest services/news-collector/tests/test_truth_social.py services/news-collector/tests/test_fed.py -v`
+Expected: All 7 tests PASS
+
+- [ ] **Step 7: Commit**
+
+```bash
+git add services/news-collector/src/news_collector/collectors/truth_social.py services/news-collector/src/news_collector/collectors/fed.py services/news-collector/tests/test_truth_social.py services/news-collector/tests/test_fed.py
+git commit -m "feat: implement Truth Social and Federal Reserve collectors"
+```
+
+---
+
+### Task 13: Implement news-collector main.py (scheduler)
+
+**Files:**
+- Create: `services/news-collector/src/news_collector/main.py`
+- Create: `services/news-collector/tests/test_main.py`
+
+- [ ] **Step 1: Write tests**
+
+Create `services/news-collector/tests/test_main.py`:
+
+```python
+"""Tests for news collector scheduler."""
+
+import pytest
+from unittest.mock import AsyncMock, patch, MagicMock
+from datetime import datetime, timezone
+
+from shared.models import NewsCategory, NewsItem
+
+from news_collector.main import run_collector_once
+
+
+async def test_run_collector_once_stores_and_publishes():
+ mock_item = NewsItem(
+ source="test",
+ headline="Test news",
+ published_at=datetime(2026, 4, 2, tzinfo=timezone.utc),
+ sentiment=0.5,
+ category=NewsCategory.MACRO,
+ )
+
+ mock_collector = MagicMock()
+ mock_collector.name = "test"
+ mock_collector.collect = AsyncMock(return_value=[mock_item])
+
+ mock_db = MagicMock()
+ mock_db.insert_news_item = AsyncMock()
+
+ mock_broker = MagicMock()
+ mock_broker.publish = AsyncMock()
+
+ count = await run_collector_once(mock_collector, mock_db, mock_broker)
+
+ assert count == 1
+ mock_db.insert_news_item.assert_called_once_with(mock_item)
+ mock_broker.publish.assert_called_once()
+
+
+async def test_run_collector_once_handles_empty():
+ mock_collector = MagicMock()
+ mock_collector.name = "test"
+ mock_collector.collect = AsyncMock(return_value=[])
+
+ mock_db = MagicMock()
+ mock_broker = MagicMock()
+
+ count = await run_collector_once(mock_collector, mock_db, mock_broker)
+ assert count == 0
+```
+
+- [ ] **Step 2: Run tests to verify they fail**
+
+Run: `pytest services/news-collector/tests/test_main.py -v`
+Expected: FAIL
+
+- [ ] **Step 3: Implement main.py**
+
+Create `services/news-collector/src/news_collector/main.py`:
+
+```python
+"""News Collector Service — schedules and runs all news collectors."""
+
+import asyncio
+import logging
+
+from shared.broker import RedisBroker
+from shared.db import Database
+from shared.events import NewsEvent
+from shared.healthcheck import HealthCheckServer
+from shared.logging import setup_logging
+from shared.metrics import ServiceMetrics
+from shared.models import NewsItem
+from shared.notifier import TelegramNotifier
+from shared.sentiment_models import MarketSentiment
+
+from news_collector.config import NewsCollectorConfig
+from news_collector.collectors.base import BaseCollector
+from news_collector.collectors.finnhub import FinnhubCollector
+from news_collector.collectors.rss import RSSCollector
+from news_collector.collectors.sec_edgar import SecEdgarCollector
+from news_collector.collectors.truth_social import TruthSocialCollector
+from news_collector.collectors.reddit import RedditCollector
+from news_collector.collectors.fear_greed import FearGreedCollector
+from news_collector.collectors.fed import FedCollector
+
+logger = logging.getLogger(__name__)
+
+
+async def run_collector_once(
+ collector: BaseCollector,
+ db: Database,
+ broker: RedisBroker,
+) -> int:
+ """Run a single collector, store results, publish to Redis.
+ Returns number of items collected."""
+ items = await collector.collect()
+ if not isinstance(items, list):
+ # FearGreedCollector returns a FearGreedResult, not a list
+ return 0
+
+ for item in items:
+ await db.insert_news_item(item)
+ event = NewsEvent(data=item)
+ await broker.publish("news", event.to_dict())
+
+ return len(items)
+
+
+async def run_collector_loop(
+ collector: BaseCollector,
+ db: Database,
+ broker: RedisBroker,
+ log,
+) -> None:
+ """Run a collector on its poll interval forever."""
+ while True:
+ try:
+ if await collector.is_available():
+ count = await run_collector_once(collector, db, broker)
+ log.info("collector_run", collector=collector.name, items=count)
+ else:
+ log.debug("collector_unavailable", collector=collector.name)
+ except Exception as exc:
+ log.error("collector_error", collector=collector.name, error=str(exc))
+ await asyncio.sleep(collector.poll_interval)
+
+
+async def run_fear_greed_loop(
+ collector: FearGreedCollector,
+ db: Database,
+ log,
+) -> None:
+ """Run the Fear & Greed collector and update market sentiment."""
+ from datetime import datetime, timezone
+
+ while True:
+ try:
+ result = await collector.collect()
+ if result is not None:
+ ms = MarketSentiment(
+ fear_greed=result.fear_greed,
+ fear_greed_label=result.fear_greed_label,
+ fed_stance="neutral", # Updated by Fed collector analysis
+ market_regime=_determine_regime(result.fear_greed, None),
+ updated_at=datetime.now(timezone.utc),
+ )
+ await db.upsert_market_sentiment(ms)
+ log.info("fear_greed_updated", score=result.fear_greed, label=result.fear_greed_label)
+ except Exception as exc:
+ log.error("fear_greed_error", error=str(exc))
+ await asyncio.sleep(collector.poll_interval)
+
+
+async def run_aggregator_loop(
+ db: Database,
+ interval: int,
+ log,
+) -> None:
+ """Run sentiment aggregation every `interval` seconds.
+ Reads recent news from DB, computes per-symbol scores, upserts into symbol_scores table."""
+ from datetime import datetime, timezone
+ from shared.sentiment import SentimentAggregator
+
+ aggregator = SentimentAggregator()
+
+ while True:
+ try:
+ now = datetime.now(timezone.utc)
+ news_items = await db.get_recent_news(hours=24)
+ if news_items:
+ scores = aggregator.aggregate(news_items, now)
+ for symbol_score in scores.values():
+ await db.upsert_symbol_score(symbol_score)
+ log.info("aggregation_complete", symbols=len(scores))
+ except Exception as exc:
+ log.error("aggregation_error", error=str(exc))
+ await asyncio.sleep(interval)
+
+
+def _determine_regime(fear_greed: int, vix: float | None) -> str:
+ """Determine market regime from Fear & Greed and VIX."""
+ if fear_greed <= 20:
+ return "risk_off"
+ if vix is not None and vix > 30:
+ return "risk_off"
+ if fear_greed >= 60 and (vix is None or vix < 20):
+ return "risk_on"
+ return "neutral"
+
+
+async def run() -> None:
+ config = NewsCollectorConfig()
+ log = setup_logging("news-collector", config.log_level, config.log_format)
+ metrics = ServiceMetrics("news_collector")
+
+ notifier = TelegramNotifier(
+ bot_token=config.telegram_bot_token,
+ chat_id=config.telegram_chat_id,
+ )
+
+ db = Database(config.database_url)
+ await db.connect()
+
+ broker = RedisBroker(config.redis_url)
+
+ health = HealthCheckServer(
+ "news-collector",
+ port=config.health_port,
+ auth_token=config.metrics_auth_token,
+ )
+ health.register_check("redis", broker.ping)
+ await health.start()
+ metrics.service_up.labels(service="news-collector").set(1)
+
+ # Initialize collectors
+ news_collectors: list[BaseCollector] = [
+ RSSCollector(),
+ SecEdgarCollector(),
+ TruthSocialCollector(),
+ RedditCollector(),
+ FedCollector(),
+ ]
+
+ # Finnhub requires API key
+ if config.finnhub_api_key:
+ news_collectors.append(FinnhubCollector(api_key=config.finnhub_api_key))
+
+ fear_greed = FearGreedCollector()
+
+ log.info(
+ "starting",
+ collectors=[c.name for c in news_collectors],
+ fear_greed=True,
+ )
+
+ tasks = []
+ try:
+ for collector in news_collectors:
+ task = asyncio.create_task(run_collector_loop(collector, db, broker, log))
+ tasks.append(task)
+
+ tasks.append(asyncio.create_task(run_fear_greed_loop(fear_greed, db, log)))
+ tasks.append(asyncio.create_task(
+ run_aggregator_loop(db, config.sentiment_aggregate_interval, log)
+ ))
+
+ await asyncio.gather(*tasks)
+ except Exception as exc:
+ log.error("fatal_error", error=str(exc))
+ await notifier.send_error(str(exc), "news-collector")
+ raise
+ finally:
+ for task in tasks:
+ task.cancel()
+ metrics.service_up.labels(service="news-collector").set(0)
+ await notifier.close()
+ await broker.close()
+ await db.close()
+
+
+def main() -> None:
+ asyncio.run(run())
+
+
+if __name__ == "__main__":
+ main()
+```
+
+- [ ] **Step 4: Run tests to verify they pass**
+
+Run: `pytest services/news-collector/tests/test_main.py -v`
+Expected: All 2 tests PASS
+
+- [ ] **Step 5: Commit**
+
+```bash
+git add services/news-collector/src/news_collector/main.py services/news-collector/tests/test_main.py
+git commit -m "feat: implement news-collector main scheduler with all collectors"
+```
+
+---
+
+## Phase 3: Sentiment Analysis Pipeline
+
+### Task 14: Implement sentiment aggregator
+
+**Files:**
+- Modify: `shared/src/shared/sentiment.py`
+- Create: `shared/tests/test_sentiment_aggregator.py`
+
+- [ ] **Step 1: Write tests**
+
+Create `shared/tests/test_sentiment_aggregator.py`:
+
+```python
+"""Tests for sentiment aggregator."""
+
+import pytest
+from datetime import datetime, timezone, timedelta
+
+from shared.sentiment import SentimentAggregator
+
+
+@pytest.fixture
+def aggregator():
+ return SentimentAggregator()
+
+
+def test_freshness_decay_recent():
+ a = SentimentAggregator()
+ now = datetime.now(timezone.utc)
+ assert a._freshness_decay(now, now) == 1.0
+
+
+def test_freshness_decay_3_hours():
+ a = SentimentAggregator()
+ now = datetime.now(timezone.utc)
+ three_hours_ago = now - timedelta(hours=3)
+ assert a._freshness_decay(three_hours_ago, now) == 0.7
+
+
+def test_freshness_decay_12_hours():
+ a = SentimentAggregator()
+ now = datetime.now(timezone.utc)
+ twelve_hours_ago = now - timedelta(hours=12)
+ assert a._freshness_decay(twelve_hours_ago, now) == 0.3
+
+
+def test_freshness_decay_old():
+ a = SentimentAggregator()
+ now = datetime.now(timezone.utc)
+ two_days_ago = now - timedelta(days=2)
+ assert a._freshness_decay(two_days_ago, now) == 0.0
+
+
+def test_compute_composite():
+ a = SentimentAggregator()
+ composite = a._compute_composite(
+ news_score=0.5,
+ social_score=0.3,
+ policy_score=0.8,
+ filing_score=0.2,
+ )
+ expected = 0.5 * 0.3 + 0.3 * 0.2 + 0.8 * 0.3 + 0.2 * 0.2
+ assert abs(composite - expected) < 0.001
+
+
+def test_aggregate_news_by_symbol(aggregator):
+ now = datetime.now(timezone.utc)
+ news_items = [
+ {
+ "symbols": ["AAPL"],
+ "sentiment": 0.8,
+ "category": "earnings",
+ "published_at": now,
+ },
+ {
+ "symbols": ["AAPL"],
+ "sentiment": 0.3,
+ "category": "macro",
+ "published_at": now - timedelta(hours=2),
+ },
+ {
+ "symbols": ["MSFT"],
+ "sentiment": -0.5,
+ "category": "policy",
+ "published_at": now,
+ },
+ ]
+ scores = aggregator.aggregate(news_items, now)
+
+ assert "AAPL" in scores
+ assert "MSFT" in scores
+ assert scores["AAPL"].news_count == 2
+ assert scores["AAPL"].news_score > 0 # Positive overall
+ assert scores["MSFT"].policy_score < 0 # Negative policy
+
+
+def test_aggregate_empty(aggregator):
+ now = datetime.now(timezone.utc)
+ scores = aggregator.aggregate([], now)
+ assert scores == {}
+
+
+def test_determine_regime():
+ a = SentimentAggregator()
+ assert a.determine_regime(15, None) == "risk_off"
+ assert a.determine_regime(15, 35.0) == "risk_off"
+ assert a.determine_regime(50, 35.0) == "risk_off"
+ assert a.determine_regime(70, 15.0) == "risk_on"
+ assert a.determine_regime(50, 20.0) == "neutral"
+```
+
+- [ ] **Step 2: Run tests to verify they fail**
+
+Run: `pytest shared/tests/test_sentiment_aggregator.py -v`
+Expected: FAIL — `SentimentAggregator` not in sentiment.py
+
+- [ ] **Step 3: Add SentimentAggregator to sentiment.py**
+
+Keep the existing `SentimentData` class (for backward compat with existing tests). Add `SentimentAggregator` class at the end of `shared/src/shared/sentiment.py`:
+
+```python
+from datetime import timedelta
+from shared.sentiment_models import SymbolScore
+
+
+class SentimentAggregator:
+ """Aggregates per-news sentiment into per-symbol scores."""
+
+ # Weights: policy events are most impactful for US stocks
+ WEIGHTS = {
+ "news": 0.3,
+ "social": 0.2,
+ "policy": 0.3,
+ "filing": 0.2,
+ }
+
+ # Category → score field mapping
+ CATEGORY_MAP = {
+ "earnings": "news",
+ "macro": "news",
+ "social": "social",
+ "policy": "policy",
+ "filing": "filing",
+ "fed": "policy",
+ }
+
+ def _freshness_decay(self, published_at: datetime, now: datetime) -> float:
+ """Compute freshness decay factor."""
+ age = now - published_at
+ hours = age.total_seconds() / 3600
+ if hours < 1:
+ return 1.0
+ if hours < 6:
+ return 0.7
+ if hours < 24:
+ return 0.3
+ return 0.0
+
+ def _compute_composite(
+ self,
+ news_score: float,
+ social_score: float,
+ policy_score: float,
+ filing_score: float,
+ ) -> float:
+ return (
+ news_score * self.WEIGHTS["news"]
+ + social_score * self.WEIGHTS["social"]
+ + policy_score * self.WEIGHTS["policy"]
+ + filing_score * self.WEIGHTS["filing"]
+ )
+
+ def aggregate(
+ self, news_items: list[dict], now: datetime
+ ) -> dict[str, SymbolScore]:
+ """Aggregate news items into per-symbol scores.
+
+    Each dict in news_items must have: symbols, sentiment, category, published_at.
+ Returns dict mapping symbol → SymbolScore.
+ """
+ # Accumulate per-symbol, per-category
+ symbol_data: dict[str, dict] = {}
+
+ for item in news_items:
+ decay = self._freshness_decay(item["published_at"], now)
+ if decay == 0.0:
+ continue
+
+ category = item.get("category", "macro")
+ score_field = self.CATEGORY_MAP.get(category, "news")
+ weighted_sentiment = item["sentiment"] * decay
+
+ for symbol in item.get("symbols", []):
+ if symbol not in symbol_data:
+ symbol_data[symbol] = {
+ "news_scores": [],
+ "social_scores": [],
+ "policy_scores": [],
+ "filing_scores": [],
+ "count": 0,
+ }
+
+ symbol_data[symbol][f"{score_field}_scores"].append(weighted_sentiment)
+ symbol_data[symbol]["count"] += 1
+
+ # Compute averages and composites
+ result = {}
+ for symbol, data in symbol_data.items():
+ news_score = _safe_avg(data["news_scores"])
+ social_score = _safe_avg(data["social_scores"])
+ policy_score = _safe_avg(data["policy_scores"])
+ filing_score = _safe_avg(data["filing_scores"])
+
+ result[symbol] = SymbolScore(
+ symbol=symbol,
+ news_score=news_score,
+ news_count=data["count"],
+ social_score=social_score,
+ policy_score=policy_score,
+ filing_score=filing_score,
+ composite=self._compute_composite(
+ news_score, social_score, policy_score, filing_score
+ ),
+ updated_at=now,
+ )
+
+ return result
+
+ def determine_regime(self, fear_greed: int, vix: float | None) -> str:
+ """Determine market regime."""
+ if fear_greed <= 20:
+ return "risk_off"
+ if vix is not None and vix > 30:
+ return "risk_off"
+ if fear_greed >= 60 and (vix is None or vix < 20):
+ return "risk_on"
+ return "neutral"
+
+
+def _safe_avg(values: list[float]) -> float:
+ """Return average of values, or 0.0 if empty."""
+ if not values:
+ return 0.0
+ return sum(values) / len(values)
+```
+
+- [ ] **Step 4: Run new tests to verify they pass**
+
+Run: `pytest shared/tests/test_sentiment_aggregator.py -v`
+Expected: All 8 tests PASS
+
+- [ ] **Step 5: Run existing sentiment tests for regressions**
+
+Run: `pytest shared/tests/test_sentiment.py -v`
+Expected: All existing tests PASS (SentimentData unchanged)
+
+- [ ] **Step 6: Commit**
+
+```bash
+git add shared/src/shared/sentiment.py shared/tests/test_sentiment_aggregator.py
+git commit -m "feat: implement SentimentAggregator with freshness decay and composite scoring"
+```
+
+---
+
+## Phase 4: Stock Selector Engine
+
+### Task 15: Implement stock selector
+
+**Files:**
+- Create: `services/strategy-engine/src/strategy_engine/stock_selector.py`
+- Create: `services/strategy-engine/tests/test_stock_selector.py`
+
+- [ ] **Step 1: Write tests**
+
+Create `services/strategy-engine/tests/test_stock_selector.py`:
+
+```python
+"""Tests for stock selector engine."""
+
+import pytest
+from unittest.mock import AsyncMock, MagicMock
+from datetime import datetime, timezone
+
+from strategy_engine.stock_selector import (
+ SentimentCandidateSource,
+ StockSelector,
+ _parse_llm_selections,
+)
+
+
+async def test_sentiment_candidate_source():
+ mock_db = MagicMock()
+ mock_db.get_top_symbol_scores = AsyncMock(return_value=[
+ {"symbol": "AAPL", "composite": 0.8, "news_count": 5},
+ {"symbol": "NVDA", "composite": 0.6, "news_count": 3},
+ ])
+
+ source = SentimentCandidateSource(mock_db)
+ candidates = await source.get_candidates()
+
+ assert len(candidates) == 2
+ assert candidates[0].symbol == "AAPL"
+ assert candidates[0].source == "sentiment"
+
+
+def test_parse_llm_selections_valid():
+ llm_response = """
+ [
+ {"symbol": "NVDA", "side": "BUY", "conviction": 0.85, "reason": "AI demand", "key_news": ["NVDA beats earnings"]},
+ {"symbol": "XOM", "side": "BUY", "conviction": 0.72, "reason": "Oil surge", "key_news": ["Oil prices up"]}
+ ]
+ """
+ selections = _parse_llm_selections(llm_response)
+ assert len(selections) == 2
+ assert selections[0].symbol == "NVDA"
+ assert selections[0].conviction == 0.85
+
+
+def test_parse_llm_selections_invalid():
+ selections = _parse_llm_selections("not json")
+ assert selections == []
+
+
+def test_parse_llm_selections_with_markdown():
+ llm_response = """
+ Here are my picks:
+ ```json
+ [
+ {"symbol": "TSLA", "side": "BUY", "conviction": 0.7, "reason": "Momentum", "key_news": ["Tesla rally"]}
+ ]
+ ```
+ """
+ selections = _parse_llm_selections(llm_response)
+ assert len(selections) == 1
+ assert selections[0].symbol == "TSLA"
+
+
+async def test_selector_blocks_on_risk_off():
+ mock_db = MagicMock()
+ mock_db.get_latest_market_sentiment = AsyncMock(return_value={
+ "fear_greed": 15,
+ "fear_greed_label": "Extreme Fear",
+ "vix": 35.0,
+ "fed_stance": "neutral",
+ "market_regime": "risk_off",
+ "updated_at": datetime.now(timezone.utc),
+ })
+
+ selector = StockSelector(db=mock_db, broker=MagicMock(), alpaca=MagicMock(), anthropic_api_key="test")
+ result = await selector.select()
+ assert result == []
+```
+
+- [ ] **Step 2: Run tests to verify they fail**
+
+Run: `pytest services/strategy-engine/tests/test_stock_selector.py -v`
+Expected: FAIL — module not found
+
+- [ ] **Step 3: Implement StockSelector**
+
+Create `services/strategy-engine/src/strategy_engine/stock_selector.py`:
+
+```python
+"""Stock Selector Engine — 3-stage dynamic stock selection for MOC trading."""
+
+import json
+import re
+from typing import Optional
+
+import aiohttp
+import structlog
+
+from shared.alpaca import AlpacaClient
+from shared.broker import RedisBroker
+from shared.db import Database
+from shared.models import OrderSide
+from shared.sentiment_models import Candidate, SelectedStock
+
+# structlog supports the keyword-argument event logging used throughout this
+# module (e.g. logger.warning("llm_candidate_failed", status=...)); a stdlib
+# logging.Logger would raise TypeError on those calls.
+logger = structlog.get_logger(__name__)
+
+
+class SentimentCandidateSource:
+ """Get candidate stocks from sentiment scores in DB."""
+
+ def __init__(self, db: Database, limit: int = 20) -> None:
+ self._db = db
+ self._limit = limit
+
+ async def get_candidates(self) -> list[Candidate]:
+ scores = await self._db.get_top_symbol_scores(limit=self._limit)
+ return [
+ Candidate(
+ symbol=s["symbol"],
+ source="sentiment",
+ score=s["composite"],
+ reason=f"Sentiment composite={s['composite']:.2f}, news_count={s['news_count']}",
+ )
+ for s in scores
+ if s["composite"] != 0
+ ]
+
+
+class LLMCandidateSource:
+ """Get candidate stocks by asking Claude to analyze today's top news."""
+
+ def __init__(self, db: Database, api_key: str, model: str) -> None:
+ self._db = db
+ self._api_key = api_key
+ self._model = model
+
+ async def get_candidates(self) -> list[Candidate]:
+ news = await self._db.get_recent_news(hours=24)
+ if not news:
+ return []
+
+ headlines = [f"- [{n['source']}] {n['headline']} (sentiment: {n['sentiment']:.2f})" for n in news[:50]]
+ prompt = (
+ "You are a stock market analyst. Based on today's news headlines below, "
+ "identify US stocks that are most likely to be affected (positively or negatively). "
+ "Return a JSON array of objects with: symbol, direction (BUY or SELL), score (0-1), reason.\n\n"
+ "Headlines:\n" + "\n".join(headlines) + "\n\n"
+ "Return ONLY the JSON array, no other text."
+ )
+
+ try:
+ async with aiohttp.ClientSession() as session:
+ async with session.post(
+ "https://api.anthropic.com/v1/messages",
+ headers={
+ "x-api-key": self._api_key,
+ "anthropic-version": "2023-06-01",
+ "content-type": "application/json",
+ },
+ json={
+ "model": self._model,
+ "max_tokens": 1024,
+ "messages": [{"role": "user", "content": prompt}],
+ },
+ timeout=aiohttp.ClientTimeout(total=30),
+ ) as resp:
+ if resp.status != 200:
+ logger.warning("llm_candidate_failed", status=resp.status)
+ return []
+ data = await resp.json()
+ text = data["content"][0]["text"]
+ except Exception as exc:
+ logger.error("llm_candidate_error", error=str(exc))
+ return []
+
+ return self._parse_response(text)
+
+    def _parse_response(self, text: str) -> list[Candidate]:
+        try:
+            # Extract JSON from possible markdown code blocks
+            json_match = re.search(r"```(?:json)?\s*(\[.*?\])\s*```", text, re.DOTALL)
+            if json_match:
+                text = json_match.group(1)
+            else:
+                # Fall back to a bare JSON array embedded in surrounding text
+                array_match = re.search(r"\[.*\]", text, re.DOTALL)
+                if array_match:
+                    text = array_match.group(0)
+            items = json.loads(text)
+        except (json.JSONDecodeError, TypeError):
+            return []
+
+ candidates = []
+ for item in items:
+ try:
+ direction = OrderSide(item.get("direction", "BUY"))
+ candidates.append(
+ Candidate(
+ symbol=item["symbol"],
+ source="llm",
+ direction=direction,
+ score=float(item.get("score", 0.5)),
+ reason=item.get("reason", "LLM recommendation"),
+ )
+ )
+ except (KeyError, ValueError):
+ continue
+
+ return candidates
+
+
+class StockSelector:
+ """3-stage stock selector: candidates → technical filter → LLM final pick."""
+
+ def __init__(
+ self,
+ db: Database,
+ broker: RedisBroker,
+ alpaca: AlpacaClient,
+ anthropic_api_key: str,
+ anthropic_model: str = "claude-sonnet-4-20250514",
+ max_picks: int = 3,
+ ) -> None:
+ self._db = db
+ self._broker = broker
+ self._alpaca = alpaca
+ self._api_key = anthropic_api_key
+ self._model = anthropic_model
+ self._max_picks = max_picks
+ self._sentiment_source = SentimentCandidateSource(db)
+ self._llm_source = LLMCandidateSource(db, anthropic_api_key, anthropic_model)
+
+ async def select(self) -> list[SelectedStock]:
+ """Run full 3-stage selection. Returns list of SelectedStock."""
+ # Check market sentiment gate
+ ms = await self._db.get_latest_market_sentiment()
+ if ms and ms.get("market_regime") == "risk_off":
+ logger.info("selection_blocked_risk_off")
+ return []
+
+ # Stage 1: Candidate pool
+ sentiment_candidates = await self._sentiment_source.get_candidates()
+ llm_candidates = await self._llm_source.get_candidates()
+ candidates = self._merge_candidates(sentiment_candidates, llm_candidates)
+
+ if not candidates:
+ logger.info("no_candidates_found")
+ return []
+
+ logger.info("candidates_found", count=len(candidates))
+
+ # Stage 2: Technical filter
+ filtered = await self._technical_filter(candidates)
+ if not filtered:
+ logger.info("all_candidates_filtered_out")
+ return []
+
+ logger.info("technical_filter_passed", count=len(filtered))
+
+ # Stage 3: LLM final selection
+ selections = await self._llm_final_select(filtered, ms)
+
+ # Publish to Redis
+ for selection in selections:
+ await self._broker.publish(
+ "selected_stocks",
+ selection.model_dump(mode="json"),
+ )
+
+        # Persist audit trail
+        from datetime import date as date_type
+
+        # Fetch the score snapshot once; it does not change between selections.
+        score_data = await self._db.get_top_symbol_scores(limit=100)
+        for selection in selections:
+            snapshot = next(
+                (s for s in score_data if s["symbol"] == selection.symbol),
+                {},
+            )
+            await self._db.insert_stock_selection(
+                trade_date=date_type.today(),
+                symbol=selection.symbol,
+                side=selection.side.value,
+                conviction=selection.conviction,
+                reason=selection.reason,
+                key_news=selection.key_news,
+                sentiment_snapshot=snapshot,
+            )
+
+ return selections
+
+ def _merge_candidates(
+ self,
+ sentiment: list[Candidate],
+ llm: list[Candidate],
+ ) -> list[Candidate]:
+ """Merge and deduplicate candidates, preferring higher scores."""
+ by_symbol: dict[str, Candidate] = {}
+ for c in sentiment + llm:
+ if c.symbol not in by_symbol or c.score > by_symbol[c.symbol].score:
+ by_symbol[c.symbol] = c
+ return sorted(by_symbol.values(), key=lambda c: c.score, reverse=True)
+
+ async def _technical_filter(self, candidates: list[Candidate]) -> list[Candidate]:
+ """Apply MOC-style technical screening to candidates."""
+ import pandas as pd
+
+ passed = []
+ for candidate in candidates:
+ try:
+ bars = await self._alpaca.get_bars(
+ candidate.symbol, timeframe="1Day", limit=30
+ )
+ if not bars or len(bars) < 21:
+ continue
+
+ closes = pd.Series([float(b["c"]) for b in bars])
+ volumes = pd.Series([float(b["v"]) for b in bars])
+
+ # RSI
+ delta = closes.diff()
+ gain = delta.clip(lower=0)
+ loss = -delta.clip(upper=0)
+ avg_gain = gain.ewm(com=13, min_periods=14).mean()
+ avg_loss = loss.ewm(com=13, min_periods=14).mean()
+ rs = avg_gain / avg_loss.replace(0, float("nan"))
+ rsi = 100 - (100 / (1 + rs))
+ current_rsi = rsi.iloc[-1]
+
+ if pd.isna(current_rsi) or not (30 <= current_rsi <= 70):
+ continue
+
+ # EMA
+ ema20 = closes.ewm(span=20, adjust=False).mean().iloc[-1]
+ if closes.iloc[-1] < ema20:
+ continue
+
+ # Volume above average
+ vol_avg = volumes.iloc[-20:].mean()
+ if vol_avg > 0 and volumes.iloc[-1] < vol_avg * 0.5:
+ continue
+
+ passed.append(candidate)
+
+ except Exception as exc:
+ logger.warning("technical_filter_error", symbol=candidate.symbol, error=str(exc))
+ continue
+
+ return passed
+
+ async def _llm_final_select(
+ self,
+ candidates: list[Candidate],
+ market_sentiment: Optional[dict],
+ ) -> list[SelectedStock]:
+ """Ask Claude to make final 2-3 picks from filtered candidates."""
+ # Build context
+ candidate_info = []
+ for c in candidates[:15]:
+ candidate_info.append(f"- {c.symbol}: score={c.score:.2f}, source={c.source}, reason={c.reason}")
+
+ news = await self._db.get_recent_news(hours=12)
+ top_news = [f"- [{n['source']}] {n['headline']}" for n in news[:20]]
+
+ ms_info = "No market sentiment data available."
+ if market_sentiment:
+ ms_info = (
+ f"Fear & Greed: {market_sentiment.get('fear_greed', 'N/A')} "
+ f"({market_sentiment.get('fear_greed_label', 'N/A')}), "
+ f"VIX: {market_sentiment.get('vix', 'N/A')}, "
+ f"Fed Stance: {market_sentiment.get('fed_stance', 'N/A')}"
+ )
+
+ prompt = (
+ f"You are a professional stock trader selecting {self._max_picks} stocks for "
+ f"Market-on-Close (MOC) overnight trading. You buy at market close and sell at "
+ f"next day's open.\n\n"
+ f"## Market Conditions\n{ms_info}\n\n"
+ f"## Candidate Stocks (pre-screened technically)\n"
+ + "\n".join(candidate_info) + "\n\n"
+ f"## Today's Key News\n"
+ + "\n".join(top_news) + "\n\n"
+ f"Select the best {self._max_picks} stocks. For each, provide:\n"
+ f"- symbol: ticker\n"
+ f"- side: BUY or SELL\n"
+ f"- conviction: 0.0-1.0\n"
+ f"- reason: one sentence\n"
+ f"- key_news: list of relevant headlines\n\n"
+ f"Return ONLY a JSON array. No other text."
+ )
+
+ try:
+ async with aiohttp.ClientSession() as session:
+ async with session.post(
+ "https://api.anthropic.com/v1/messages",
+ headers={
+ "x-api-key": self._api_key,
+ "anthropic-version": "2023-06-01",
+ "content-type": "application/json",
+ },
+ json={
+ "model": self._model,
+ "max_tokens": 1024,
+ "messages": [{"role": "user", "content": prompt}],
+ },
+ timeout=aiohttp.ClientTimeout(total=30),
+ ) as resp:
+ if resp.status != 200:
+ logger.error("llm_final_select_failed", status=resp.status)
+ return []
+ data = await resp.json()
+ text = data["content"][0]["text"]
+ except Exception as exc:
+ logger.error("llm_final_select_error", error=str(exc))
+ return []
+
+ return _parse_llm_selections(text)
+
+
+def _parse_llm_selections(text: str) -> list[SelectedStock]:
+ """Parse LLM response into SelectedStock list."""
+ try:
+ json_match = re.search(r"```(?:json)?\s*(\[.*?\])\s*```", text, re.DOTALL)
+ if json_match:
+ text = json_match.group(1)
+ # Also try to find a bare JSON array
+ array_match = re.search(r"\[.*\]", text, re.DOTALL)
+ if array_match:
+ text = array_match.group(0)
+ items = json.loads(text)
+ except (json.JSONDecodeError, TypeError):
+ return []
+
+ selections = []
+ for item in items:
+ try:
+ selections.append(
+ SelectedStock(
+ symbol=item["symbol"],
+ side=OrderSide(item.get("side", "BUY")),
+ conviction=float(item.get("conviction", 0.5)),
+ reason=item.get("reason", ""),
+ key_news=item.get("key_news", []),
+ )
+ )
+ except (KeyError, ValueError):
+ continue
+
+ return selections
+```
+
+- [ ] **Step 4: Run tests to verify they pass**
+
+Run: `pytest services/strategy-engine/tests/test_stock_selector.py -v`
+Expected: All 5 tests PASS
+
+- [ ] **Step 5: Run all strategy engine tests for regressions**
+
+Run: `pytest services/strategy-engine/tests/ -v`
+Expected: All tests PASS
+
+- [ ] **Step 6: Commit**
+
+```bash
+git add services/strategy-engine/src/strategy_engine/stock_selector.py services/strategy-engine/tests/test_stock_selector.py
+git commit -m "feat: implement 3-stage stock selector (sentiment → technical → LLM)"
+```
+
+---
+
+## Phase 5: Integration (MOC + Notifications + Docker)
+
+### Task 16: Add Telegram notification for stock selections
+
+**Files:**
+- Modify: `shared/src/shared/notifier.py`
+- Modify: `shared/tests/test_notifier.py`
+
+- [ ] **Step 1: Add send_stock_selection method to notifier.py**
+
+Add this method and import to `shared/src/shared/notifier.py`:
+
+Add to imports:
+```python
+from shared.sentiment_models import SelectedStock, MarketSentiment
+```
+
+Add method to `TelegramNotifier` class:
+
+```python
+ async def send_stock_selection(
+ self,
+ selections: list[SelectedStock],
+ market: MarketSentiment | None = None,
+ ) -> None:
+ """Format and send stock selection notification."""
+ lines = [f"<b>📊 Stock Selection ({len(selections)} picks)</b>", ""]
+
+ side_emoji = {"BUY": "🟢", "SELL": "🔴"}
+
+ for i, s in enumerate(selections, 1):
+ emoji = side_emoji.get(s.side.value, "⚪")
+ lines.append(
+ f"{i}. <b>{s.symbol}</b> {emoji} {s.side.value} "
+ f"(conviction: {s.conviction:.0%})"
+ )
+ lines.append(f" {s.reason}")
+ if s.key_news:
+ lines.append(f" News: {s.key_news[0]}")
+ lines.append("")
+
+ if market:
+ lines.append(
+ f"Market: F&amp;G {market.fear_greed} ({market.fear_greed_label})"
+ + (f" | VIX {market.vix:.1f}" if market.vix else "")
+ )
+
+ await self.send("\n".join(lines))
+```
+
+- [ ] **Step 2: Add test for the new method**
+
+Add to `shared/tests/test_notifier.py`:
+
+```python
+from shared.models import OrderSide
+from shared.sentiment_models import SelectedStock, MarketSentiment
+from datetime import datetime, timezone
+
+
+async def test_send_stock_selection(notifier, mock_session):
+ """Test stock selection notification formatting."""
+ selections = [
+ SelectedStock(
+ symbol="NVDA",
+ side=OrderSide.BUY,
+ conviction=0.85,
+ reason="CHIPS Act expansion",
+ key_news=["Trump signs CHIPS Act"],
+ ),
+ ]
+ market = MarketSentiment(
+ fear_greed=55,
+ fear_greed_label="Neutral",
+ vix=18.2,
+ fed_stance="neutral",
+ market_regime="neutral",
+ updated_at=datetime.now(timezone.utc),
+ )
+ await notifier.send_stock_selection(selections, market)
+ mock_session.post.assert_called_once()
+```
+
+Note: Check `shared/tests/test_notifier.py` for existing fixture names (`notifier`, `mock_session`) and adapt accordingly.
+
+- [ ] **Step 3: Run notifier tests**
+
+Run: `pytest shared/tests/test_notifier.py -v`
+Expected: All tests PASS
+
+- [ ] **Step 4: Commit**
+
+```bash
+git add shared/src/shared/notifier.py shared/tests/test_notifier.py
+git commit -m "feat: add Telegram notification for stock selections"
+```
+
+---
+
+### Task 17: Integrate stock selector with MOC strategy
+
+**Files:**
+- Modify: `services/strategy-engine/src/strategy_engine/main.py`
+- Modify: `services/strategy-engine/src/strategy_engine/config.py`
+
+- [ ] **Step 1: Update strategy engine config**
+
+Add to `StrategyConfig` in `services/strategy-engine/src/strategy_engine/config.py`:
+
+```python
+ selector_candidates_time: str = "15:00"
+ selector_filter_time: str = "15:15"
+ selector_final_time: str = "15:30"
+ selector_max_picks: int = 3
+ anthropic_api_key: str = ""
+ anthropic_model: str = "claude-sonnet-4-20250514"
+```
+
+- [ ] **Step 2: Add stock selector scheduling to main.py**
+
+Add a new coroutine to `services/strategy-engine/src/strategy_engine/main.py` that runs the stock selector at the configured times. Add imports:
+
+```python
+from shared.alpaca import AlpacaClient
+from shared.db import Database
+from shared.notifier import TelegramNotifier
+from shared.sentiment_models import MarketSentiment
+from strategy_engine.stock_selector import StockSelector
+```
+
+Add the selector loop function:
+
+```python
+async def run_stock_selector(
+ selector: StockSelector,
+ notifier: TelegramNotifier,
+ db: Database,
+ config: StrategyConfig,
+ log,
+) -> None:
+ """Run the stock selector once per day at the configured time."""
+ import zoneinfo
+
+ et = zoneinfo.ZoneInfo("America/New_York")
+
+ while True:
+ now_et = datetime.now(et)
+ target_hour, target_min = map(int, config.selector_final_time.split(":"))
+
+ # Check if it's time to run (within 1-minute window)
+ if now_et.hour == target_hour and now_et.minute == target_min:
+ log.info("stock_selector_running")
+ try:
+ selections = await selector.select()
+ if selections:
+ ms_data = await db.get_latest_market_sentiment()
+ ms = None
+ if ms_data:
+ ms = MarketSentiment(**ms_data)
+ await notifier.send_stock_selection(selections, ms)
+ log.info(
+ "stock_selector_complete",
+ picks=[s.symbol for s in selections],
+ )
+ else:
+ log.info("stock_selector_no_picks")
+ except Exception as exc:
+ log.error("stock_selector_error", error=str(exc))
+ # Sleep past this minute to avoid re-triggering
+ await asyncio.sleep(120)
+ else:
+ await asyncio.sleep(30)
+```
+
+In the `run()` function, add after creating the broker:
+
+```python
+ db = Database(config.database_url)
+ await db.connect()
+
+ alpaca = AlpacaClient(
+ api_key=config.alpaca_api_key,
+ api_secret=config.alpaca_api_secret,
+ paper=config.alpaca_paper,
+ )
+```
+
+And add the selector if anthropic key is configured:
+
+```python
+ if config.anthropic_api_key:
+ selector = StockSelector(
+ db=db,
+ broker=broker,
+ alpaca=alpaca,
+ anthropic_api_key=config.anthropic_api_key,
+ anthropic_model=config.anthropic_model,
+ max_picks=config.selector_max_picks,
+ )
+ tasks.append(asyncio.create_task(
+ run_stock_selector(selector, notifier, db, config, log)
+ ))
+ log.info("stock_selector_enabled", time=config.selector_final_time)
+```
+
+Add to the `finally` block:
+
+```python
+ await alpaca.close()
+ await db.close()
+```
+
+- [ ] **Step 3: Run strategy engine tests for regressions**
+
+Run: `pytest services/strategy-engine/tests/ -v`
+Expected: All tests PASS
+
+- [ ] **Step 4: Commit**
+
+```bash
+git add services/strategy-engine/src/strategy_engine/main.py services/strategy-engine/src/strategy_engine/config.py
+git commit -m "feat: integrate stock selector into strategy engine scheduler"
+```
+
+---
+
+### Task 18: Update Docker Compose and .env
+
+**Files:**
+- Modify: `docker-compose.yml`
+- Modify: `.env.example` (already done in Task 5, just verify)
+
+- [ ] **Step 1: Add news-collector service to docker-compose.yml**
+
+Add before the `loki:` service block in `docker-compose.yml`:
+
+```yaml
+ news-collector:
+ build:
+ context: .
+ dockerfile: services/news-collector/Dockerfile
+ env_file: .env
+ ports:
+ - "8084:8084"
+ depends_on:
+ redis:
+ condition: service_healthy
+ postgres:
+ condition: service_healthy
+ healthcheck:
+ test: ["CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:8084/health')"]
+ interval: 10s
+ timeout: 5s
+ retries: 3
+ restart: unless-stopped
+```
+
+- [ ] **Step 2: Verify compose file is valid**
+
+Run: `docker compose config --quiet 2>&1 || echo "INVALID"`
+Expected: No output (valid) or compose config displayed without errors
+
+- [ ] **Step 3: Commit**
+
+```bash
+git add docker-compose.yml
+git commit -m "feat: add news-collector service to Docker Compose"
+```
+
+---
+
+### Task 19: Run full test suite and lint
+
+- [ ] **Step 1: Install test dependencies**
+
+Run: `pip install -e shared/ && pip install aiosqlite feedparser nltk aioresponses`
+
+- [ ] **Step 2: Download VADER lexicon**
+
+Run: `python -c "import nltk; nltk.download('vader_lexicon', quiet=True)"`
+
+- [ ] **Step 3: Run lint**
+
+Run: `make lint`
+Expected: No lint errors. If there are errors, fix them.
+
+- [ ] **Step 4: Run full test suite**
+
+Run: `make test`
+Expected: All tests PASS
+
+- [ ] **Step 5: Fix any issues found in steps 3-4**
+
+If lint or tests fail, fix the issues and re-run.
+
+- [ ] **Step 6: Final commit if any fixes were needed**
+
+```bash
+git add -A
+git commit -m "fix: resolve lint and test issues from news selector integration"
+```
diff --git a/docs/superpowers/specs/2026-04-02-news-driven-stock-selector-design.md b/docs/superpowers/specs/2026-04-02-news-driven-stock-selector-design.md
new file mode 100644
index 0000000..d439154
--- /dev/null
+++ b/docs/superpowers/specs/2026-04-02-news-driven-stock-selector-design.md
@@ -0,0 +1,418 @@
+# News-Driven Stock Selector Design
+
+**Date:** 2026-04-02
+**Goal:** Upgrade the MOC (Market on Close) strategy from fixed symbol lists to dynamic, news-driven stock selection. The system collects news/sentiment data continuously, then selects 2-3 optimal stocks daily before market close.
+
+---
+
+## Architecture Overview
+
+```
+[Continuous Collection] [Pre-Close Decision]
+Finnhub News ─┐
+RSS Feeds ─┤
+SEC EDGAR ─┤
+Truth Social ─┼→ DB (news_items) → Sentiment Aggregator → symbol_scores
+Reddit ─┤ + Redis "news" (every 15 min) market_sentiment
+Fear & Greed ─┤
+FOMC/Fed ─┘
+
+ 15:00 ET ─→ Candidate Pool (sentiment top + LLM picks)
+ 15:15 ET ─→ Technical Filter (RSI, EMA, volume)
+ 15:30 ET ─→ LLM Final Selection (2-3 stocks) → Telegram
+ 15:50 ET ─→ MOC Buy Execution
+ 09:35 ET ─→ Next-day Sell (existing MOC logic)
+```
+
+## 1. News Collector Service
+
+New service: `services/news-collector/`
+
+### Structure
+
+```
+services/news-collector/
+├── Dockerfile
+├── pyproject.toml
+├── src/news_collector/
+│ ├── __init__.py
+│ ├── main.py # Scheduler: runs each collector on its interval
+│ ├── config.py
+│ └── collectors/
+│ ├── __init__.py
+│ ├── base.py # BaseCollector ABC
+│ ├── finnhub.py # Finnhub market news (free, 60 req/min)
+│ ├── rss.py # Yahoo Finance, Google News, MarketWatch RSS
+│ ├── sec_edgar.py # SEC EDGAR 8-K/10-Q filings
+│ ├── truth_social.py # Truth Social scraping (Trump posts)
+│ ├── reddit.py # Reddit (r/wallstreetbets, r/stocks)
+│ ├── fear_greed.py # CNN Fear & Greed Index scraping
+│ └── fed.py # FOMC statements, Fed announcements
+└── tests/
+```
+
+### BaseCollector Interface
+
+```python
+class BaseCollector(ABC):
+ name: str
+ poll_interval: int # seconds
+
+ @abstractmethod
+ async def collect(self) -> list[NewsItem]:
+ """Collect and return list of NewsItem."""
+
+ @abstractmethod
+ async def is_available(self) -> bool:
+ """Check if this source is accessible (API key present, endpoint reachable)."""
+```
+
+### Poll Intervals
+
+| Collector | Interval | Notes |
+|-----------|----------|-------|
+| Finnhub | 5 min | Free tier: 60 calls/min |
+| RSS (Yahoo/Google/MarketWatch) | 10 min | Headlines only |
+| SEC EDGAR | 30 min | Focus on 8-K filings |
+| Truth Social | 15 min | Scraping |
+| Reddit | 15 min | Hot posts from relevant subs |
+| Fear & Greed | 1 hour | Updates once daily but check periodically |
+| FOMC/Fed | 1 hour | Infrequent events |
+
+### Provider Abstraction (for paid upgrade path)
+
+```python
+# config.yaml
+collectors:
+ news:
+ provider: "finnhub" # swap to "benzinga" for paid
+ api_key: ${FINNHUB_API_KEY}
+ social:
+ provider: "reddit" # swap to "stocktwits_pro" etc.
+ policy:
+ provider: "truth_social" # swap to "twitter_api" etc.
+
+# Factory
+COLLECTOR_REGISTRY = {
+ "finnhub": FinnhubCollector,
+ "rss": RSSCollector,
+ "benzinga": BenzingaCollector, # added later
+}
+```
+
+## 2. Shared Models (additions to shared/)
+
+### NewsItem (shared/models.py)
+
+```python
+class NewsCategory(str, Enum):
+ POLICY = "policy"
+ EARNINGS = "earnings"
+ MACRO = "macro"
+ SOCIAL = "social"
+ FILING = "filing"
+ FED = "fed"
+
+class NewsItem(BaseModel):
+ id: str = Field(default_factory=lambda: str(uuid.uuid4()))
+ source: str # "finnhub", "rss", "sec_edgar", etc.
+ headline: str
+ summary: str | None = None
+ url: str | None = None
+ published_at: datetime
+ symbols: list[str] = [] # Related tickers (if identifiable)
+ sentiment: float # -1.0 to 1.0 (first-pass analysis at collection)
+ category: NewsCategory
+ raw_data: dict = {}
+ created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
+```
+
+### SymbolScore (shared/sentiment_models.py — new file)
+
+```python
+class SymbolScore(BaseModel):
+ symbol: str
+ news_score: float # -1.0 to 1.0, weighted avg of news sentiment
+ news_count: int # Number of news items in last 24h
+ social_score: float # Reddit/social sentiment
+ policy_score: float # Policy-related impact
+ filing_score: float # SEC filing impact
+ composite: float # Weighted final score
+ updated_at: datetime
+
+class MarketSentiment(BaseModel):
+ fear_greed: int # 0-100
+ fear_greed_label: str # "Extreme Fear", "Fear", "Neutral", "Greed", "Extreme Greed"
+ vix: float | None = None
+ fed_stance: str # "hawkish", "neutral", "dovish"
+ market_regime: str # "risk_on", "neutral", "risk_off"
+ updated_at: datetime
+
+class SelectedStock(BaseModel):
+ symbol: str
+ side: OrderSide # BUY or SELL
+ conviction: float # 0.0 to 1.0
+ reason: str # Selection rationale
+ key_news: list[str] # Key news headlines
+
+class Candidate(BaseModel):
+ symbol: str
+ source: str # "sentiment" or "llm"
+ direction: OrderSide | None = None # Suggested direction (if known)
+ score: float # Relevance/priority score
+ reason: str # Why this candidate was selected
+```
+
+## 3. Sentiment Analysis Pipeline
+
+### Location
+
+Refactor existing `shared/src/shared/sentiment.py`.
+
+### Two-Stage Analysis
+
+**Stage 1: Per-news sentiment (at collection time)**
+- VADER (nltk.sentiment, free) for English headlines
+- Keyword rule engine for domain-specific terms (e.g., "tariff" → negative for importers, positive for domestic producers)
+- Score stored in `NewsItem.sentiment`
+
+**Stage 2: Per-symbol aggregation (every 15 minutes)**
+
+```
+composite = (
+ news_score * 0.3 +
+ social_score * 0.2 +
+ policy_score * 0.3 +
+ filing_score * 0.2
+) * freshness_decay
+```
+
+Freshness decay:
+- < 1 hour: 1.0
+- 1-6 hours: 0.7
+- 6-24 hours: 0.3
+- > 24 hours: excluded
+
+Policy score weighted high because US stock market is heavily influenced by policy events (tariffs, regulation, subsidies).
+
+### Market-Level Gating
+
+`MarketSentiment.market_regime` determination:
+- `risk_off`: Fear & Greed < 20 OR VIX > 30 → **block all trades**
+- `risk_on`: Fear & Greed > 60 AND VIX < 20
+- `neutral`: everything else
+
+This extends the existing `sentiment.py` `should_block()` logic.
+
+## 4. Stock Selector Engine
+
+### Location
+
+`services/strategy-engine/src/strategy_engine/stock_selector.py`
+
+### Three-Stage Selection Process
+
+**Stage 1: Candidate Pool (15:00 ET)**
+
+Two candidate sources, results merged (deduplicated):
+
+```python
+class CandidateSource(ABC):
+    @abstractmethod
+    async def get_candidates(self) -> list[Candidate]: ...
+
+class SentimentCandidateSource(CandidateSource):
+ """Top N symbols by composite SymbolScore from DB."""
+
+class LLMCandidateSource(CandidateSource):
+ """Send today's top news summary to Claude, get related symbols + direction."""
+```
+
+- SentimentCandidateSource: top 20 by composite score
+- LLMCandidateSource: Claude analyzes today's major news and recommends affected symbols
+- Merged pool: typically 20-30 candidates
+
+**Stage 2: Technical Filter (15:15 ET)**
+
+Apply existing MOC screening criteria to candidates:
+- Fetch recent price data from Alpaca for all candidates
+- RSI 30-60
+- Price > 20-period EMA
+- Volume > average
+- Bullish candle pattern
+- Result: typically 5-10 survivors
+
+**Stage 3: LLM Final Selection (15:30 ET)**
+
+Send to Claude:
+- Filtered candidate list with technical indicators
+- Per-symbol sentiment scores and top news headlines
+- Market sentiment (Fear & Greed, VIX, Fed stance)
+- Prompt: "Select 2-3 stocks for MOC trading with rationale"
+
+Response parsed into `list[SelectedStock]`.
+
+### Integration with MOC Strategy
+
+Current: MOC strategy receives candles for fixed symbols and decides internally.
+
+New flow:
+1. `StockSelector` publishes `SelectedStock` list to Redis stream `selected_stocks` at 15:30 ET
+2. MOC strategy reads `selected_stocks` to get today's targets
+3. MOC still applies its own technical checks at 15:50-16:00 as a safety net
+4. If a selected stock fails the final technical check, it's skipped (no forced trades)
+
+## 5. Database Schema
+
+Four new tables via Alembic migration:
+
+```sql
+CREATE TABLE news_items (
+ id UUID PRIMARY KEY,
+ source VARCHAR(50) NOT NULL,
+ headline TEXT NOT NULL,
+ summary TEXT,
+ url TEXT,
+ published_at TIMESTAMPTZ NOT NULL,
+ symbols TEXT[],
+ sentiment FLOAT NOT NULL,
+ category VARCHAR(50) NOT NULL,
+ raw_data JSONB DEFAULT '{}',
+ created_at TIMESTAMPTZ DEFAULT NOW()
+);
+CREATE INDEX idx_news_items_published ON news_items(published_at);
+CREATE INDEX idx_news_items_symbols ON news_items USING GIN(symbols);
+
+CREATE TABLE symbol_scores (
+ id UUID PRIMARY KEY,
+ symbol VARCHAR(10) NOT NULL,
+ news_score FLOAT NOT NULL DEFAULT 0,
+ news_count INT NOT NULL DEFAULT 0,
+ social_score FLOAT NOT NULL DEFAULT 0,
+ policy_score FLOAT NOT NULL DEFAULT 0,
+ filing_score FLOAT NOT NULL DEFAULT 0,
+ composite FLOAT NOT NULL DEFAULT 0,
+ updated_at TIMESTAMPTZ NOT NULL
+);
+CREATE UNIQUE INDEX idx_symbol_scores_symbol ON symbol_scores(symbol);
+
+CREATE TABLE market_sentiment (
+ id UUID PRIMARY KEY,
+ fear_greed INT NOT NULL,
+ fear_greed_label VARCHAR(30) NOT NULL,
+ vix FLOAT,
+ fed_stance VARCHAR(20) NOT NULL DEFAULT 'neutral',
+ market_regime VARCHAR(20) NOT NULL DEFAULT 'neutral',
+ updated_at TIMESTAMPTZ NOT NULL
+);
+
+CREATE TABLE stock_selections (
+ id UUID PRIMARY KEY,
+ trade_date DATE NOT NULL,
+ symbol VARCHAR(10) NOT NULL,
+ side VARCHAR(4) NOT NULL,
+ conviction FLOAT NOT NULL,
+ reason TEXT NOT NULL,
+ key_news JSONB DEFAULT '[]',
+ sentiment_snapshot JSONB DEFAULT '{}',
+ created_at TIMESTAMPTZ DEFAULT NOW()
+);
+CREATE INDEX idx_stock_selections_date ON stock_selections(trade_date);
+```
+
+`stock_selections` stores an audit trail: why each stock was selected, enabling post-hoc analysis of selection quality.
+
+## 6. Redis Streams
+
+| Stream | Producer | Consumer | Payload |
+|--------|----------|----------|---------|
+| `news` | news-collector | strategy-engine (sentiment aggregator) | NewsItem |
+| `selected_stocks` | stock-selector | MOC strategy | SelectedStock |
+
+Existing streams (`candles`, `signals`, `orders`) unchanged.
+
+## 7. Docker Compose Addition
+
+```yaml
+news-collector:
+ build:
+ context: .
+ dockerfile: services/news-collector/Dockerfile
+ env_file: .env
+ ports:
+ - "8084:8084"
+ depends_on:
+ redis: { condition: service_healthy }
+ postgres: { condition: service_healthy }
+ healthcheck:
+ test: ["CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:8084/health')"]
+ interval: 10s
+ timeout: 5s
+ retries: 3
+ restart: unless-stopped
+```
+
+## 8. Environment Variables
+
+```bash
+# News Collector
+FINNHUB_API_KEY= # Free key from finnhub.io
+NEWS_POLL_INTERVAL=300 # Default 5 min (overrides per-collector defaults)
+SENTIMENT_AGGREGATE_INTERVAL=900 # 15 min
+
+# Stock Selector
+SELECTOR_CANDIDATES_TIME=15:00 # ET, candidate pool generation
+SELECTOR_FILTER_TIME=15:15 # ET, technical filter
+SELECTOR_FINAL_TIME=15:30 # ET, LLM final pick
+SELECTOR_MAX_PICKS=3
+
+# LLM (for stock selector + screener)
+ANTHROPIC_API_KEY=
+ANTHROPIC_MODEL=claude-sonnet-4-20250514
+```
+
+## 9. Telegram Notifications
+
+Extend existing `shared/notifier.py` with:
+
+```python
+async def send_stock_selection(self, selections: list[SelectedStock], market: MarketSentiment):
+ """
+ 📊 오늘의 종목 선정 (2/3)
+
+ 1. NVDA 🟢 BUY (확신도: 0.85)
+ 근거: 트럼프 반도체 보조금 확대 발표, RSI 42
+ 핵심뉴스: "Trump signs CHIPS Act expansion..."
+
+ 2. XOM 🟢 BUY (확신도: 0.72)
+ 근거: 유가 상승 + 실적 서프라이즈, 볼륨 급증
+
+ 시장심리: Fear & Greed 55 (Neutral) | VIX 18.2
+ """
+```
+
+## 10. Testing Strategy
+
+**Unit tests:**
+- Each collector: mock HTTP responses → verify NewsItem parsing
+- Sentiment analysis: verify VADER + keyword scoring
+- Aggregator: mock news data → verify SymbolScore calculation and freshness decay
+- Stock selector: mock scores → verify candidate/filter/selection pipeline
+- LLM calls: mock Claude response → verify SelectedStock parsing
+
+**Integration tests:**
+- Full pipeline: news collection → DB → aggregation → selection
+- Market gating: verify `risk_off` blocks all trades
+- MOC integration: verify selected stocks flow to MOC strategy
+
+**Post-hoc analysis (future):**
+- Use `stock_selections` audit trail to measure selection accuracy
+- Historical news data replay for backtesting requires paid data (deferred)
+
+## 11. Out of Scope (Future)
+
+- Paid API integration (designed for, not implemented)
+- Historical news backtesting
+- WebSocket real-time news streaming
+- Multi-language sentiment analysis
+- Options/derivatives signals
diff --git a/services/news-collector/Dockerfile b/services/news-collector/Dockerfile
new file mode 100644
index 0000000..a8e5902
--- /dev/null
+++ b/services/news-collector/Dockerfile
@@ -0,0 +1,9 @@
FROM python:3.12-slim
WORKDIR /app
# Install the shared library first so its layer is cached independently of
# service-code changes.
COPY shared/ shared/
RUN pip install --no-cache-dir ./shared
COPY services/news-collector/ services/news-collector/
RUN pip install --no-cache-dir ./services/news-collector
# Pre-download the VADER lexicon at build time so sentiment analysis works
# without network access when the container starts.
RUN python -c "import nltk; nltk.download('vader_lexicon', quiet=True)"
ENV PYTHONPATH=/app
CMD ["python", "-m", "news_collector.main"]
diff --git a/services/news-collector/pyproject.toml b/services/news-collector/pyproject.toml
new file mode 100644
index 0000000..14c856a
--- /dev/null
+++ b/services/news-collector/pyproject.toml
@@ -0,0 +1,25 @@
+[project]
+name = "news-collector"
+version = "0.1.0"
+description = "News and sentiment data collector service"
+requires-python = ">=3.12"
+dependencies = [
+ "trading-shared",
+ "feedparser>=6.0",
+ "nltk>=3.8",
+ "aiohttp>=3.9",
+]
+
+[project.optional-dependencies]
+dev = [
+ "pytest>=8.0",
+ "pytest-asyncio>=0.23",
+ "aioresponses>=0.7",
+]
+
+[build-system]
+requires = ["hatchling"]
+build-backend = "hatchling.build"
+
+[tool.hatch.build.targets.wheel]
+packages = ["src/news_collector"]
diff --git a/services/news-collector/src/news_collector/__init__.py b/services/news-collector/src/news_collector/__init__.py
new file mode 100644
index 0000000..5547af2
--- /dev/null
+++ b/services/news-collector/src/news_collector/__init__.py
@@ -0,0 +1 @@
+"""News collector service."""
diff --git a/services/news-collector/src/news_collector/collectors/__init__.py b/services/news-collector/src/news_collector/collectors/__init__.py
new file mode 100644
index 0000000..5ef36a7
--- /dev/null
+++ b/services/news-collector/src/news_collector/collectors/__init__.py
@@ -0,0 +1 @@
+"""News collectors."""
diff --git a/services/news-collector/src/news_collector/collectors/base.py b/services/news-collector/src/news_collector/collectors/base.py
new file mode 100644
index 0000000..bb43fd6
--- /dev/null
+++ b/services/news-collector/src/news_collector/collectors/base.py
@@ -0,0 +1,18 @@
+"""Base class for all news collectors."""
+
+from abc import ABC, abstractmethod
+
+from shared.models import NewsItem
+
+
class BaseCollector(ABC):
    """Abstract interface implemented by every news/sentiment collector.

    Subclasses override ``name`` and ``poll_interval`` and implement the two
    async hooks; the scheduler in ``main.py`` is expected to call
    ``collect()`` every ``poll_interval`` seconds when ``is_available()``
    reports true.
    """

    # Source identifier recorded on each produced item (e.g. "finnhub", "rss").
    name: str = "base"
    poll_interval: int = 300  # seconds between collect() invocations

    @abstractmethod
    async def collect(self) -> list[NewsItem]:
        """Collect news items from the source."""

    @abstractmethod
    async def is_available(self) -> bool:
        """Check if this data source is accessible."""
diff --git a/services/news-collector/src/news_collector/collectors/fear_greed.py b/services/news-collector/src/news_collector/collectors/fear_greed.py
new file mode 100644
index 0000000..f79f716
--- /dev/null
+++ b/services/news-collector/src/news_collector/collectors/fear_greed.py
@@ -0,0 +1,63 @@
+"""CNN Fear & Greed Index collector."""
+
+import logging
+from dataclasses import dataclass
+from typing import Optional
+
+import aiohttp
+
+from news_collector.collectors.base import BaseCollector
+
+logger = logging.getLogger(__name__)
+
# CNN's public Fear & Greed graph-data endpoint (plain JSON, no API key).
FEAR_GREED_URL = "https://production.dataviz.cnn.io/index/fearandgreed/graphdata"


@dataclass
class FearGreedResult:
    """Parsed snapshot of the CNN Fear & Greed index."""

    fear_greed: int  # index value, 0 (extreme fear) .. 100 (extreme greed)
    fear_greed_label: str  # e.g. "Extreme Fear", "Neutral", "Greed"
+
+
class FearGreedCollector(BaseCollector):
    """Collects the CNN Fear & Greed index.

    NOTE(review): unlike the other collectors, ``collect()`` returns a single
    ``FearGreedResult`` (or ``None``) rather than ``list[NewsItem]``, so the
    scheduler must special-case this source — confirm against ``main.py``.
    """

    name = "fear_greed"
    poll_interval = 3600  # 1 hour; the index updates ~daily but is cheap to re-check

    async def is_available(self) -> bool:
        """Public endpoint; no API key required."""
        return True

    async def _fetch_index(self) -> Optional[dict]:
        """Fetch the raw graph-data JSON, or None on any network/HTTP failure.

        Failures are logged (previously they were swallowed silently even
        though a module logger exists) but never raised: this is a
        best-effort auxiliary signal.
        """
        headers = {"User-Agent": "Mozilla/5.0"}
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(
                    FEAR_GREED_URL, headers=headers, timeout=aiohttp.ClientTimeout(total=10)
                ) as resp:
                    if resp.status != 200:
                        logger.warning("Fear & Greed fetch returned HTTP %s", resp.status)
                        return None
                    return await resp.json()
        except Exception as exc:
            logger.warning("Fear & Greed fetch failed: %s", exc)
            return None

    def _classify(self, score: int) -> str:
        """Map a 0-100 score onto CNN's five labels (fallback when the API
        response carries no "rating" field)."""
        if score <= 20:
            return "Extreme Fear"
        if score <= 40:
            return "Fear"
        if score <= 60:
            return "Neutral"
        if score <= 80:
            return "Greed"
        return "Extreme Greed"

    async def collect(self) -> Optional[FearGreedResult]:
        """Return the current index snapshot, or None if unavailable/malformed."""
        data = await self._fetch_index()
        if data is None:
            return None
        try:
            fg = data["fear_and_greed"]
            score = int(fg["score"])
            # NOTE(review): the API's "rating" is lowercase (e.g. "extreme greed")
            # while _classify returns title case — confirm consumers normalize.
            label = fg.get("rating", self._classify(score))
            return FearGreedResult(fear_greed=score, fear_greed_label=label)
        except (KeyError, ValueError, TypeError) as exc:
            logger.warning("Unexpected Fear & Greed payload: %s", exc)
            return None
diff --git a/services/news-collector/src/news_collector/collectors/fed.py b/services/news-collector/src/news_collector/collectors/fed.py
new file mode 100644
index 0000000..fce4842
--- /dev/null
+++ b/services/news-collector/src/news_collector/collectors/fed.py
@@ -0,0 +1,119 @@
+"""Federal Reserve RSS collector with hawkish/dovish/neutral stance detection."""
+
import asyncio
import logging
import re
from calendar import timegm
from datetime import datetime, timezone

import feedparser
from nltk.sentiment.vader import SentimentIntensityAnalyzer

from shared.models import NewsCategory, NewsItem

from .base import BaseCollector
+
+logger = logging.getLogger(__name__)
+
+_FED_RSS_URL = "https://www.federalreserve.gov/feeds/press_all.xml"
+
+_HAWKISH_KEYWORDS = [
+ "rate hike",
+ "interest rate increase",
+ "tighten",
+ "tightening",
+ "inflation",
+ "hawkish",
+ "restrictive",
+ "raise rates",
+ "hike rates",
+]
+_DOVISH_KEYWORDS = [
+ "rate cut",
+ "interest rate decrease",
+ "easing",
+ "ease",
+ "stimulus",
+ "dovish",
+ "accommodative",
+ "lower rates",
+ "cut rates",
+ "quantitative easing",
+]
+
+
+def _detect_stance(text: str) -> str:
+ lower = text.lower()
+ hawkish_hits = sum(1 for kw in _HAWKISH_KEYWORDS if kw in lower)
+ dovish_hits = sum(1 for kw in _DOVISH_KEYWORDS if kw in lower)
+ if hawkish_hits > dovish_hits:
+ return "hawkish"
+ if dovish_hits > hawkish_hits:
+ return "dovish"
+ return "neutral"
+
+
class FedCollector(BaseCollector):
    """Collects Federal Reserve press releases from the Fed's RSS feed.

    Each entry is scored with VADER sentiment and tagged with a
    hawkish/dovish/neutral stance, stored in ``raw_data["stance"]``.
    """

    name: str = "fed"
    poll_interval: int = 3600  # Fed announcements are infrequent; hourly is plenty

    def __init__(self) -> None:
        self._vader = SentimentIntensityAnalyzer()

    async def is_available(self) -> bool:
        """Public RSS feed; no credentials required."""
        return True

    async def _fetch_fed_rss(self) -> list[dict]:
        """Parse the feed in a worker thread (feedparser does blocking I/O).

        Degrades to an empty list on any failure.
        """
        # get_event_loop() is deprecated inside coroutines; use the running loop.
        loop = asyncio.get_running_loop()
        try:
            parsed = await loop.run_in_executor(None, feedparser.parse, _FED_RSS_URL)
            return parsed.get("entries", [])
        except Exception as exc:
            logger.error("Fed RSS fetch failed: %s", exc)
            return []

    def _parse_published(self, entry: dict) -> datetime:
        """Convert feedparser's UTC ``published_parsed`` struct_time to an
        aware datetime, falling back to "now" when absent or unparseable."""
        published_parsed = entry.get("published_parsed")
        if published_parsed:
            try:
                # published_parsed is UTC, so calendar.timegm (not time.mktime)
                # is the correct inverse.
                ts = timegm(published_parsed)
                return datetime.fromtimestamp(ts, tz=timezone.utc)
            except Exception:
                pass
        return datetime.now(timezone.utc)

    async def collect(self) -> list[NewsItem]:
        """Return one NewsItem per feed entry, with sentiment and stance."""
        # _fetch_fed_rss already catches its own errors and returns [].
        entries = await self._fetch_fed_rss()

        items: list[NewsItem] = []
        for entry in entries:
            title = entry.get("title", "").strip()
            if not title:
                continue  # skip malformed entries

            summary = entry.get("summary", "") or ""
            combined = f"{title} {summary}"

            sentiment = self._vader.polarity_scores(combined)["compound"]
            stance = _detect_stance(combined)
            published_at = self._parse_published(entry)

            items.append(
                NewsItem(
                    source=self.name,
                    headline=title,
                    summary=summary or None,
                    url=entry.get("link") or None,
                    published_at=published_at,
                    symbols=[],  # Fed news is market-wide, not symbol-specific
                    sentiment=sentiment,
                    category=NewsCategory.FED,
                    raw_data={"stance": stance, **dict(entry)},
                )
            )

        return items
diff --git a/services/news-collector/src/news_collector/collectors/finnhub.py b/services/news-collector/src/news_collector/collectors/finnhub.py
new file mode 100644
index 0000000..13e3602
--- /dev/null
+++ b/services/news-collector/src/news_collector/collectors/finnhub.py
@@ -0,0 +1,88 @@
+"""Finnhub news collector with VADER sentiment analysis."""
+
import logging
import re
from datetime import datetime, timezone

import aiohttp
from nltk.sentiment.vader import SentimentIntensityAnalyzer

from shared.models import NewsCategory, NewsItem

from .base import BaseCollector
+
+logger = logging.getLogger(__name__)
+
_CATEGORY_KEYWORDS: dict[NewsCategory, list[str]] = {
    NewsCategory.FED: ["fed", "fomc", "rate", "federal reserve"],
    NewsCategory.POLICY: ["tariff", "trump", "regulation", "policy", "trade war"],
    NewsCategory.EARNINGS: ["earnings", "revenue", "profit", "eps", "guidance", "quarter"],
}

# Whole-word patterns per category. Bare substring checks misfired constantly:
# "rate" matched "corporate"/"operate", "eps" matched "steps", "fed" matched
# "fedex" — mis-bucketing unrelated headlines as FED/EARNINGS.
_CATEGORY_PATTERNS: dict[NewsCategory, list[re.Pattern[str]]] = {
    category: [re.compile(r"\b" + re.escape(kw) + r"\b") for kw in keywords]
    for category, keywords in _CATEGORY_KEYWORDS.items()
}


def _categorize(text: str) -> NewsCategory:
    """Return the first category whose keywords appear as whole words in *text*.

    Dict insertion order defines priority (FED > POLICY > EARNINGS);
    anything unmatched defaults to MACRO.
    """
    lower = text.lower()
    for category, patterns in _CATEGORY_PATTERNS.items():
        if any(pattern.search(lower) for pattern in patterns):
            return category
    return NewsCategory.MACRO
+
+
class FinnhubCollector(BaseCollector):
    """Collects general market news from Finnhub's free news endpoint."""

    name: str = "finnhub"
    poll_interval: int = 300  # free tier allows 60 calls/min; 5-minute polling is conservative

    _BASE_URL = "https://finnhub.io/api/v1/news"

    def __init__(self, api_key: str) -> None:
        self._api_key = api_key
        self._vader = SentimentIntensityAnalyzer()

    async def is_available(self) -> bool:
        """Usable only when an API key is configured."""
        return bool(self._api_key)

    async def _fetch_news(self) -> list[dict]:
        """Fetch the raw article list; HTTP errors propagate to collect()."""
        url = f"{self._BASE_URL}?category=general&token={self._api_key}"
        async with aiohttp.ClientSession() as session:
            # Timeout added for consistency with the other collectors; without
            # it a hung request would stall the whole polling loop.
            async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as resp:
                resp.raise_for_status()
                return await resp.json()

    async def collect(self) -> list[NewsItem]:
        """Return NewsItems for all fetched articles, or [] on fetch failure."""
        try:
            raw_items = await self._fetch_news()
        except Exception as exc:
            logger.error("Finnhub fetch failed: %s", exc)
            return []

        items: list[NewsItem] = []
        for article in raw_items:
            headline = article.get("headline", "")
            if not headline:
                continue  # consistency with other collectors: skip empty headlines

            summary = article.get("summary", "")
            combined = f"{headline} {summary}"

            sentiment = self._vader.polarity_scores(combined)["compound"]

            # Finnhub timestamps are unix epoch seconds; 0 (missing) maps to 1970.
            # NOTE(review): consider skipping articles without a timestamp.
            ts = article.get("datetime", 0)
            published_at = datetime.fromtimestamp(ts, tz=timezone.utc)

            # "related" is a comma-separated ticker string, e.g. "AAPL,MSFT".
            related = article.get("related", "")
            symbols = [t.strip() for t in related.split(",") if t.strip()] if related else []

            items.append(
                NewsItem(
                    source=self.name,
                    headline=headline,
                    summary=summary or None,
                    url=article.get("url") or None,
                    published_at=published_at,
                    symbols=symbols,
                    sentiment=sentiment,
                    category=_categorize(combined),
                    raw_data=article,
                )
            )

        return items
diff --git a/services/news-collector/src/news_collector/collectors/reddit.py b/services/news-collector/src/news_collector/collectors/reddit.py
new file mode 100644
index 0000000..226a2f9
--- /dev/null
+++ b/services/news-collector/src/news_collector/collectors/reddit.py
@@ -0,0 +1,97 @@
+"""Reddit social sentiment collector using JSON API with VADER sentiment analysis."""
+
+import logging
+import re
+from datetime import datetime, timezone
+
+import aiohttp
+from nltk.sentiment.vader import SentimentIntensityAnalyzer
+
+from shared.models import NewsCategory, NewsItem
+
+from .base import BaseCollector
+
+logger = logging.getLogger(__name__)
+
# Subreddits polled for retail-investor chatter.
_SUBREDDITS = ["wallstreetbets", "stocks", "investing"]
# Minimum post score (upvotes) before a post is ingested; filters out
# low-engagement noise.
_MIN_SCORE = 50

# Whole-word matcher for a fixed large-cap ticker universe.
# NOTE(review): very short symbols (F, V, MS, GM, PM, BA, DE, GE, KO, HD, MA)
# collide with ordinary uppercase words/abbreviations and may yield false
# positives in social text — confirm precision is acceptable.
_TICKER_PATTERN = re.compile(
    r"\b(AAPL|MSFT|GOOGL|GOOG|AMZN|TSLA|NVDA|META|BRK\.?[AB]|JPM|V|UNH|XOM|"
    r"JNJ|WMT|MA|PG|HD|CVX|MRK|LLY|ABBV|PFE|BAC|KO|AVGO|COST|MCD|TMO|"
    r"CSCO|ACN|ABT|DHR|TXN|NEE|NFLX|PM|UPS|RTX|HON|QCOM|AMGN|LOW|IBM|"
    r"INTC|AMD|PYPL|GS|MS|BLK|SPGI|CAT|DE|GE|MMM|BA|F|GM|DIS|CMCSA)\b"
)
+
+
class RedditCollector(BaseCollector):
    """Collects hot posts from finance subreddits and scores them with VADER."""

    name: str = "reddit"
    poll_interval: int = 900

    def __init__(self) -> None:
        self._vader = SentimentIntensityAnalyzer()

    async def is_available(self) -> bool:
        """Reddit's public JSON endpoints need no credentials."""
        return True

    async def _fetch_subreddit(self, subreddit: str) -> list[dict]:
        """Fetch up to 25 hot posts for one subreddit; [] on any failure."""
        url = f"https://www.reddit.com/r/{subreddit}/hot.json?limit=25"
        headers = {"User-Agent": "TradingPlatform/1.0 (research@example.com)"}
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(
                    url, headers=headers, timeout=aiohttp.ClientTimeout(total=10)
                ) as resp:
                    if resp.status == 200:
                        payload = await resp.json()
                        return payload.get("data", {}).get("children", [])
        except Exception as exc:
            logger.error("Reddit fetch failed for r/%s: %s", subreddit, exc)
        return []

    async def collect(self) -> list[NewsItem]:
        """Return score-filtered, title-deduplicated posts across all subreddits."""
        titles_seen: set[str] = set()
        collected: list[NewsItem] = []

        for sub in _SUBREDDITS:
            try:
                children = await self._fetch_subreddit(sub)
            except Exception as exc:
                logger.error("Reddit collector error for r/%s: %s", sub, exc)
                continue

            for child in children:
                info = child.get("data", {})
                heading = info.get("title", "").strip()

                # Drop empty, low-engagement, or already-seen titles.
                if not heading or info.get("score", 0) < _MIN_SCORE:
                    continue
                if heading in titles_seen:
                    continue
                titles_seen.add(heading)

                body_text = info.get("selftext", "") or ""
                full_text = f"{heading} {body_text}"

                compound = self._vader.polarity_scores(full_text)["compound"]
                tickers = list(dict.fromkeys(_TICKER_PATTERN.findall(full_text)))
                when = datetime.fromtimestamp(info.get("created_utc", 0), tz=timezone.utc)

                collected.append(
                    NewsItem(
                        source=self.name,
                        headline=heading,
                        summary=body_text or None,
                        url=info.get("url") or None,
                        published_at=when,
                        symbols=tickers,
                        sentiment=compound,
                        category=NewsCategory.SOCIAL,
                        raw_data=info,
                    )
                )

        return collected
diff --git a/services/news-collector/src/news_collector/collectors/rss.py b/services/news-collector/src/news_collector/collectors/rss.py
new file mode 100644
index 0000000..ddf8503
--- /dev/null
+++ b/services/news-collector/src/news_collector/collectors/rss.py
@@ -0,0 +1,105 @@
+"""RSS news collector using feedparser with VADER sentiment analysis."""
+
import asyncio
import logging
import re
from calendar import timegm
from datetime import datetime, timezone
from time import mktime

import feedparser
from nltk.sentiment.vader import SentimentIntensityAnalyzer

from shared.models import NewsCategory, NewsItem

from .base import BaseCollector
+
+logger = logging.getLogger(__name__)
+
# Default public feeds: Yahoo Finance, Google News (stock-market query),
# and MarketWatch top stories.
_DEFAULT_FEEDS = [
    "https://finance.yahoo.com/news/rssindex",
    "https://news.google.com/rss/search?q=stock+market+finance&hl=en-US&gl=US&ceid=US:en",
    "https://feeds.marketwatch.com/marketwatch/topstories/",
]

# Whole-word matcher for a fixed large-cap ticker universe (same list as the
# Reddit collector).
# NOTE(review): short symbols (F, V, MS, GM, PM, BA, DE, GE) can collide with
# ordinary uppercase words/abbreviations — confirm precision is acceptable.
_TICKER_PATTERN = re.compile(
    r"\b(AAPL|MSFT|GOOGL|GOOG|AMZN|TSLA|NVDA|META|BRK\.?[AB]|JPM|V|UNH|XOM|"
    r"JNJ|WMT|MA|PG|HD|CVX|MRK|LLY|ABBV|PFE|BAC|KO|AVGO|COST|MCD|TMO|"
    r"CSCO|ACN|ABT|DHR|TXN|NEE|NFLX|PM|UPS|RTX|HON|QCOM|AMGN|LOW|IBM|"
    r"INTC|AMD|PYPL|GS|MS|BLK|SPGI|CAT|DE|GE|MMM|BA|F|GM|DIS|CMCSA)\b"
)
+
+
class RSSCollector(BaseCollector):
    """Collects headlines from financial RSS feeds and scores them with VADER."""

    name: str = "rss"
    poll_interval: int = 600

    def __init__(self, feeds: list[str] | None = None) -> None:
        """*feeds* overrides the default feed list (useful for tests)."""
        self._feeds = feeds if feeds is not None else _DEFAULT_FEEDS
        self._vader = SentimentIntensityAnalyzer()

    async def is_available(self) -> bool:
        """Public feeds; no credentials required."""
        return True

    async def _fetch_feeds(self) -> list[dict]:
        """Parse all configured feeds in worker threads (feedparser blocks)."""
        # get_event_loop() is deprecated inside coroutines; use the running loop.
        loop = asyncio.get_running_loop()
        results = []
        for url in self._feeds:
            try:
                parsed = await loop.run_in_executor(None, feedparser.parse, url)
                results.append(parsed)
            except Exception as exc:
                logger.error("RSS fetch failed for %s: %s", url, exc)
        return results

    def _parse_published(self, entry: dict) -> datetime:
        """Convert feedparser's ``published_parsed`` to an aware datetime.

        Falls back to "now" when the entry has no parseable timestamp.
        """
        parsed_time = entry.get("published_parsed")
        if parsed_time:
            try:
                # BUG FIX: published_parsed is a UTC struct_time, so
                # calendar.timegm is the correct inverse. time.mktime
                # interpreted it as *local* time, skewing every timestamp by
                # the host's UTC offset (fed.py already uses timegm).
                ts = timegm(parsed_time)
                return datetime.fromtimestamp(ts, tz=timezone.utc)
            except Exception:
                pass
        return datetime.now(timezone.utc)

    async def collect(self) -> list[NewsItem]:
        """Return deduplicated NewsItems across all configured feeds."""
        try:
            feeds = await self._fetch_feeds()
        except Exception as exc:
            logger.error("RSS collector error: %s", exc)
            return []

        seen_titles: set[str] = set()
        items: list[NewsItem] = []

        for feed in feeds:
            for entry in feed.get("entries", []):
                title = entry.get("title", "").strip()
                if not title or title in seen_titles:
                    continue
                seen_titles.add(title)

                summary = entry.get("summary", "") or ""
                combined = f"{title} {summary}"

                sentiment_scores = self._vader.polarity_scores(combined)
                sentiment = sentiment_scores["compound"]

                # De-duplicated, order-preserving ticker extraction.
                symbols = list(dict.fromkeys(_TICKER_PATTERN.findall(combined)))

                published_at = self._parse_published(entry)

                items.append(
                    NewsItem(
                        source=self.name,
                        headline=title,
                        summary=summary or None,
                        url=entry.get("link") or None,
                        published_at=published_at,
                        symbols=symbols,
                        sentiment=sentiment,
                        category=NewsCategory.MACRO,
                        raw_data=dict(entry),
                    )
                )

        return items
diff --git a/services/news-collector/src/news_collector/collectors/sec_edgar.py b/services/news-collector/src/news_collector/collectors/sec_edgar.py
new file mode 100644
index 0000000..ca1d070
--- /dev/null
+++ b/services/news-collector/src/news_collector/collectors/sec_edgar.py
@@ -0,0 +1,100 @@
+"""SEC EDGAR filing collector (free, no API key required)."""
+
+import logging
+from datetime import datetime, timezone
+
+import aiohttp
+from nltk.sentiment.vader import SentimentIntensityAnalyzer
+
+from shared.models import NewsCategory, NewsItem
+from news_collector.collectors.base import BaseCollector
+
+logger = logging.getLogger(__name__)
+
# CIK -> ticker map for the large caps we track; SEC submissions endpoints are
# keyed by 10-digit zero-padded CIK numbers.
TRACKED_CIKS = {
    "0000320193": "AAPL",
    "0000789019": "MSFT",
    "0001652044": "GOOGL",
    "0001018724": "AMZN",
    "0001318605": "TSLA",
    "0001045810": "NVDA",
    "0001326801": "META",
    "0000019617": "JPM",
    "0000078003": "PFE",
    "0000021344": "KO",
}

# SEC fair-access policy requires a descriptive User-Agent with contact info.
SEC_USER_AGENT = "TradingPlatform research@example.com"
+
+
class SecEdgarCollector(BaseCollector):
    """Collects same-day 8-K filings for a fixed set of large-cap companies."""

    name = "sec_edgar"
    poll_interval = 1800  # 30 minutes

    def __init__(self) -> None:
        self._vader = SentimentIntensityAnalyzer()

    async def is_available(self) -> bool:
        """SEC EDGAR is public; no API key required."""
        return True

    async def _fetch_recent_filings(self) -> list[dict]:
        """Fetch the submissions JSON for every tracked CIK, skipping failures."""
        results = []
        headers = {"User-Agent": SEC_USER_AGENT}
        async with aiohttp.ClientSession() as session:
            for cik, ticker in TRACKED_CIKS.items():
                try:
                    url = f"https://data.sec.gov/submissions/CIK{cik}.json"
                    async with session.get(
                        url, headers=headers, timeout=aiohttp.ClientTimeout(total=10)
                    ) as resp:
                        if resp.status == 200:
                            data = await resp.json()
                            # Normalize into the shape collect() expects.
                            data["tickers"] = [{"ticker": ticker}]
                            results.append(data)
                except Exception as exc:
                    # BUG FIX: stdlib logging does not accept structlog-style
                    # keyword fields; logger.warning("...", cik=cik, error=...)
                    # raised TypeError inside this handler and aborted the
                    # whole fetch loop.
                    logger.warning("SEC fetch failed for CIK %s: %s", cik, exc)
        return results

    async def collect(self) -> list[NewsItem]:
        """Return NewsItems for today's 8-K filings across tracked companies."""
        filings_data = await self._fetch_recent_filings()
        items = []
        # filingDate values are plain YYYY-MM-DD strings; compare against UTC date.
        today = datetime.now(timezone.utc).strftime("%Y-%m-%d")

        for company_data in filings_data:
            tickers = [t["ticker"] for t in company_data.get("tickers", [])]
            company_name = company_data.get("name", "Unknown")
            recent = company_data.get("filings", {}).get("recent", {})

            # The submissions format stores filings as parallel arrays sharing an index.
            forms = recent.get("form", [])
            dates = recent.get("filingDate", [])
            descriptions = recent.get("primaryDocDescription", [])
            accessions = recent.get("accessionNumber", [])

            for i, form in enumerate(forms):
                if form != "8-K":
                    continue  # only material-event filings are of interest
                filing_date = dates[i] if i < len(dates) else ""
                if filing_date != today:
                    continue

                desc = descriptions[i] if i < len(descriptions) else "8-K Filing"
                accession = accessions[i] if i < len(accessions) else ""
                headline = f"{company_name} ({', '.join(tickers)}): {form} - {desc}"

                items.append(
                    NewsItem(
                        source=self.name,
                        headline=headline,
                        summary=desc,
                        # NOTE(review): this browse-edgar query does not look like a
                        # valid filing deep-link; verify against EDGAR's Archives
                        # URL format before relying on it in notifications.
                        url=f"https://www.sec.gov/cgi-bin/browse-edgar?action=getcompany&accession={accession}",
                        published_at=datetime.strptime(filing_date, "%Y-%m-%d").replace(
                            tzinfo=timezone.utc
                        ),
                        symbols=tickers,
                        sentiment=self._vader.polarity_scores(headline)["compound"],
                        category=NewsCategory.FILING,
                        raw_data={"form": form, "accession": accession},
                    )
                )

        return items
diff --git a/services/news-collector/src/news_collector/collectors/truth_social.py b/services/news-collector/src/news_collector/collectors/truth_social.py
new file mode 100644
index 0000000..33ebc86
--- /dev/null
+++ b/services/news-collector/src/news_collector/collectors/truth_social.py
@@ -0,0 +1,86 @@
+"""Truth Social collector using Mastodon-compatible API with VADER sentiment analysis."""
+
+import logging
+import re
+from datetime import datetime, timezone
+
+import aiohttp
+from nltk.sentiment.vader import SentimentIntensityAnalyzer
+
+from shared.models import NewsCategory, NewsItem
+
+from .base import BaseCollector
+
logger = logging.getLogger(__name__)

# Numeric account ID for the tracked Truth Social account.
# NOTE(review): hard-coded; presumably stable, but verify if posts stop arriving.
_TRUMP_ACCOUNT_ID = "107780257626128497"
# Mastodon-compatible statuses endpoint for that account.
_API_URL = f"https://truthsocial.com/api/v1/accounts/{_TRUMP_ACCOUNT_ID}/statuses"
+
+_HTML_TAG_PATTERN = re.compile(r"<[^>]+>")
+
+
+def _strip_html(text: str) -> str:
+ return _HTML_TAG_PATTERN.sub("", text).strip()
+
+
class TruthSocialCollector(BaseCollector):
    """Collects posts from the tracked Truth Social account and scores them with VADER."""

    name: str = "truth_social"
    poll_interval: int = 900

    def __init__(self) -> None:
        self._vader = SentimentIntensityAnalyzer()

    async def is_available(self) -> bool:
        # No credentials needed: the endpoint is public.
        return True

    async def _fetch_posts(self) -> list[dict]:
        """Return the raw status list from the API, or [] on any failure."""
        headers = {"User-Agent": "TradingPlatform/1.0 (research@example.com)"}
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(
                    _API_URL, headers=headers, timeout=aiohttp.ClientTimeout(total=10)
                ) as resp:
                    if resp.status == 200:
                        return await resp.json()
        except Exception as exc:
            logger.error("Truth Social fetch failed: %s", exc)
        return []

    async def collect(self) -> list[NewsItem]:
        """Fetch recent posts and map each non-empty one to a NewsItem."""
        try:
            posts = await self._fetch_posts()
        except Exception as exc:
            logger.error("Truth Social collector error: %s", exc)
            return []

        collected: list[NewsItem] = []
        for post in posts:
            text = _strip_html(post.get("content", "") or "")
            if not text:
                continue
            collected.append(self._to_news_item(post, text))
        return collected

    def _to_news_item(self, post: dict, text: str) -> NewsItem:
        """Build one NewsItem from a raw status dict and its cleaned text."""
        sentiment = self._vader.polarity_scores(text)["compound"]

        created_at_str = post.get("created_at", "")
        try:
            published_at = datetime.fromisoformat(created_at_str.replace("Z", "+00:00"))
        except Exception:
            # Unparseable timestamp: fall back to "now" rather than dropping the post.
            published_at = datetime.now(timezone.utc)

        return NewsItem(
            source=self.name,
            headline=text[:200],
            summary=text if len(text) > 200 else None,
            url=post.get("url") or None,
            published_at=published_at,
            symbols=[],
            sentiment=sentiment,
            category=NewsCategory.POLICY,
            raw_data=post,
        )
diff --git a/services/news-collector/src/news_collector/config.py b/services/news-collector/src/news_collector/config.py
new file mode 100644
index 0000000..70d98f1
--- /dev/null
+++ b/services/news-collector/src/news_collector/config.py
@@ -0,0 +1,10 @@
+"""News Collector configuration."""
+
+from shared.config import Settings
+
+
class NewsCollectorConfig(Settings):
    """Configuration for the news-collector service (fields loaded via shared Settings)."""

    # Port for this service's health-check / metrics HTTP server.
    health_port: int = 8084
    # Finnhub API key; empty by default (collector availability depends on it).
    finnhub_api_key: str = ""
    # Default news polling cadence, in seconds.
    news_poll_interval: int = 300
    # How often the sentiment aggregator runs, in seconds.
    sentiment_aggregate_interval: int = 900
diff --git a/services/news-collector/src/news_collector/main.py b/services/news-collector/src/news_collector/main.py
new file mode 100644
index 0000000..3493f7c
--- /dev/null
+++ b/services/news-collector/src/news_collector/main.py
@@ -0,0 +1,193 @@
+"""News Collector Service — fetches news from multiple sources and aggregates sentiment."""
+
+import asyncio
+from datetime import datetime, timezone
+
+from shared.broker import RedisBroker
+from shared.db import Database
+from shared.events import NewsEvent
+from shared.healthcheck import HealthCheckServer
+from shared.logging import setup_logging
+from shared.metrics import ServiceMetrics
+from shared.models import NewsItem
+from shared.notifier import TelegramNotifier
+from shared.sentiment_models import MarketSentiment
+from shared.sentiment import SentimentAggregator
+
+from news_collector.config import NewsCollectorConfig
+from news_collector.collectors.finnhub import FinnhubCollector
+from news_collector.collectors.rss import RSSCollector
+from news_collector.collectors.sec_edgar import SecEdgarCollector
+from news_collector.collectors.truth_social import TruthSocialCollector
+from news_collector.collectors.reddit import RedditCollector
+from news_collector.collectors.fear_greed import FearGreedCollector
+from news_collector.collectors.fed import FedCollector
+
# Health check port: base + 4
# NOTE(review): appears unused — run() passes config.health_port (8084) directly
# to HealthCheckServer; confirm and consider removing this constant.
HEALTH_PORT_OFFSET = 4
+
+
async def run_collector_once(collector, db: Database, broker: RedisBroker) -> int:
    """Run one collection pass: persist each item and publish it to its stream.

    Returns the number of items collected.
    """
    items: list[NewsItem] = await collector.collect()
    for item in items:
        await db.insert_news_item(item)
        # Route each item to a per-category Redis stream, e.g. "news.macro".
        await broker.publish(f"news.{item.category.value}", NewsEvent(data=item).to_dict())
    return len(items)
+
+
async def run_collector_loop(collector, db: Database, broker: RedisBroker, log) -> None:
    """Poll one collector forever, sleeping its poll_interval between passes."""
    while True:
        try:
            collected = await run_collector_once(collector, db, broker)
            log.info("collector_ran", collector=collector.name, count=collected)
        except Exception as exc:
            # A failing pass must not kill the loop; log and retry next tick.
            log.warning("collector_error", collector=collector.name, error=str(exc))
        await asyncio.sleep(collector.poll_interval)
+
+
async def run_fear_greed_loop(collector: FearGreedCollector, db: Database, log) -> None:
    """Fetch Fear & Greed index on its interval and update MarketSentiment in DB."""
    while True:
        try:
            result = await collector.collect()
            if result is not None:
                # Build a fresh snapshot. VIX is not collected here, and the Fed
                # stance is hard-coded "neutral" — NOTE(review): this overwrites
                # any previously stored vix/fed_stance each pass; confirm intended.
                ms = MarketSentiment(
                    fear_greed=result.fear_greed,
                    fear_greed_label=result.fear_greed_label,
                    vix=None,
                    fed_stance="neutral",
                    market_regime=_determine_regime(result.fear_greed, None),
                    updated_at=datetime.now(timezone.utc),
                )
                await db.upsert_market_sentiment(ms)
                log.info(
                    "fear_greed_updated",
                    value=result.fear_greed,
                    label=result.fear_greed_label,
                )
        except Exception as exc:
            # Transient failures are logged; the loop retries on the next tick.
            log.warning("fear_greed_error", error=str(exc))
        await asyncio.sleep(collector.poll_interval)
+
+
async def run_aggregator_loop(db: Database, interval: int, log) -> None:
    """Aggregate per-symbol sentiment every `interval` seconds and persist scores.

    Sleeps before the first run, so aggregation starts one full interval after
    startup — collectors get a chance to populate the news table first.
    """
    aggregator = SentimentAggregator()
    while True:
        await asyncio.sleep(interval)
        try:
            now = datetime.now(timezone.utc)
            recent_news = await db.get_recent_news(hours=24)
            symbol_scores = aggregator.aggregate(recent_news, now)
            for per_symbol in symbol_scores.values():
                await db.upsert_symbol_score(per_symbol)
            log.info("aggregation_complete", symbols=len(symbol_scores))
        except Exception as exc:
            log.warning("aggregator_error", error=str(exc))
+
+
def _determine_regime(fear_greed: int, vix: float | None) -> str:
    """Classify the market regime from the fear/greed index and optional VIX."""
    # Delegates to the shared aggregator so the classification logic lives in one place.
    return SentimentAggregator().determine_regime(fear_greed, vix)
+
+
async def run() -> None:
    """Service entry point: wire up infrastructure and run all collector loops.

    Spawns one task per news collector, plus the fear/greed loop and the
    sentiment aggregator, and blocks on them until a fatal error occurs.
    """
    config = NewsCollectorConfig()
    log = setup_logging("news-collector", config.log_level, config.log_format)
    metrics = ServiceMetrics("news_collector")

    notifier = TelegramNotifier(
        bot_token=config.telegram_bot_token,
        chat_id=config.telegram_chat_id,
    )

    db = Database(config.database_url)
    await db.connect()

    broker = RedisBroker(config.redis_url)

    health = HealthCheckServer(
        "news-collector",
        port=config.health_port,
        auth_token=config.metrics_auth_token,
    )
    await health.start()
    metrics.service_up.labels(service="news-collector").set(1)

    # Build collectors. fear_greed is deliberately NOT in news_collectors:
    # it feeds MarketSentiment through its own loop, not the news streams.
    finnhub = FinnhubCollector(api_key=config.finnhub_api_key)
    rss = RSSCollector()
    sec = SecEdgarCollector()
    truth = TruthSocialCollector()
    reddit = RedditCollector()
    fear_greed = FearGreedCollector()
    fed = FedCollector()

    news_collectors = [finnhub, rss, sec, truth, reddit, fed]

    log.info(
        "starting",
        collectors=[c.name for c in news_collectors],
        poll_interval=config.news_poll_interval,
        aggregate_interval=config.sentiment_aggregate_interval,
    )

    # Bound before the try so the finally block is safe even if task creation fails.
    tasks: list[asyncio.Task] = []
    try:
        for collector in news_collectors:
            tasks.append(
                asyncio.create_task(
                    run_collector_loop(collector, db, broker, log),
                    name=f"collector-{collector.name}",
                )
            )
        tasks.append(
            asyncio.create_task(
                run_fear_greed_loop(fear_greed, db, log),
                name="fear-greed-loop",
            )
        )
        tasks.append(
            asyncio.create_task(
                run_aggregator_loop(db, config.sentiment_aggregate_interval, log),
                name="aggregator-loop",
            )
        )
        await asyncio.gather(*tasks)
    except Exception as exc:
        log.error("fatal_error", error=str(exc))
        await notifier.send_error(str(exc), "news-collector")
        raise
    finally:
        metrics.service_up.labels(service="news-collector").set(0)
        for task in tasks:
            task.cancel()
        # Wait for cancellations to actually finish so no task outlives the
        # connections closed below; return_exceptions swallows CancelledError.
        await asyncio.gather(*tasks, return_exceptions=True)
        await notifier.close()
        await broker.close()
        await db.close()
+
+
def main() -> None:
    """Console-script entry point: run the service's async main loop."""
    asyncio.run(run())


if __name__ == "__main__":
    main()
diff --git a/services/news-collector/tests/__init__.py b/services/news-collector/tests/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/services/news-collector/tests/__init__.py
diff --git a/services/news-collector/tests/test_fear_greed.py b/services/news-collector/tests/test_fear_greed.py
new file mode 100644
index 0000000..d483aa6
--- /dev/null
+++ b/services/news-collector/tests/test_fear_greed.py
@@ -0,0 +1,49 @@
+"""Tests for CNN Fear & Greed Index collector."""
+
+import pytest
+from unittest.mock import AsyncMock, patch
+
+from news_collector.collectors.fear_greed import FearGreedCollector
+
+
@pytest.fixture
def collector():
    """Fresh collector per test."""
    return FearGreedCollector()


def test_collector_name(collector):
    # Name and poll cadence are part of the collector's public contract.
    assert collector.name == "fear_greed"
    assert collector.poll_interval == 3600


# NOTE(review): async tests carry no @pytest.mark.asyncio — this assumes
# asyncio auto mode is enabled in the project's pytest config; confirm.
async def test_is_available(collector):
    assert await collector.is_available() is True


async def test_collect_parses_api_response(collector):
    # Payload shaped like CNN's fear_and_greed JSON; score is truncated to int.
    mock_data = {
        "fear_and_greed": {
            "score": 45.0,
            "rating": "Fear",
            "timestamp": "2026-04-02T12:00:00+00:00",
        }
    }
    with patch.object(collector, "_fetch_index", new_callable=AsyncMock, return_value=mock_data):
        result = await collector.collect()
        assert result.fear_greed == 45
        assert result.fear_greed_label == "Fear"


async def test_collect_returns_none_on_failure(collector):
    # A failed fetch must yield None, not raise.
    with patch.object(collector, "_fetch_index", new_callable=AsyncMock, return_value=None):
        result = await collector.collect()
        assert result is None


def test_classify_label():
    # Label boundaries across the 0-100 index.
    c = FearGreedCollector()
    assert c._classify(10) == "Extreme Fear"
    assert c._classify(30) == "Fear"
    assert c._classify(50) == "Neutral"
    assert c._classify(70) == "Greed"
    assert c._classify(85) == "Extreme Greed"
diff --git a/services/news-collector/tests/test_fed.py b/services/news-collector/tests/test_fed.py
new file mode 100644
index 0000000..d1a736b
--- /dev/null
+++ b/services/news-collector/tests/test_fed.py
@@ -0,0 +1,37 @@
+"""Tests for Federal Reserve collector."""
+
+import pytest
+from unittest.mock import AsyncMock, patch
+from news_collector.collectors.fed import FedCollector
+
+
@pytest.fixture
def collector():
    """Fresh collector per test."""
    return FedCollector()


def test_collector_name(collector):
    assert collector.name == "fed"
    assert collector.poll_interval == 3600


async def test_is_available(collector):
    assert await collector.is_available() is True


async def test_collect_parses_rss(collector):
    # Entry shaped like feedparser output; published_parsed is a struct_time-style tuple.
    mock_entries = [
        {
            "title": "Federal Reserve issues FOMC statement",
            "link": "https://www.federalreserve.gov/newsevents/pressreleases/monetary20260402a.htm",
            "published_parsed": (2026, 4, 2, 14, 0, 0, 0, 0, 0),
            "summary": "The Federal Open Market Committee decided to maintain the target range...",
        },
    ]
    with patch.object(
        collector, "_fetch_fed_rss", new_callable=AsyncMock, return_value=mock_entries
    ):
        items = await collector.collect()
        assert len(items) == 1
        assert items[0].source == "fed"
        assert items[0].category.value == "fed"
diff --git a/services/news-collector/tests/test_finnhub.py b/services/news-collector/tests/test_finnhub.py
new file mode 100644
index 0000000..a4cf169
--- /dev/null
+++ b/services/news-collector/tests/test_finnhub.py
@@ -0,0 +1,67 @@
+"""Tests for Finnhub news collector."""
+
+import pytest
+from unittest.mock import AsyncMock, patch
+
+from news_collector.collectors.finnhub import FinnhubCollector
+
+
@pytest.fixture
def collector():
    """Collector with a dummy API key so it reports available."""
    return FinnhubCollector(api_key="test_key")


def test_collector_name(collector):
    assert collector.name == "finnhub"
    assert collector.poll_interval == 300


async def test_is_available_with_key(collector):
    assert await collector.is_available() is True


async def test_is_available_without_key():
    # An empty API key marks the collector unavailable.
    c = FinnhubCollector(api_key="")
    assert await c.is_available() is False


async def test_collect_parses_response(collector):
    # Two articles: one tied to a symbol via "related", one without.
    mock_response = [
        {
            "category": "top news",
            "datetime": 1711929600,
            "headline": "AAPL beats earnings",
            "id": 12345,
            "related": "AAPL",
            "source": "MarketWatch",
            "summary": "Apple reported better than expected...",
            "url": "https://example.com/article",
        },
        {
            "category": "top news",
            "datetime": 1711929000,
            "headline": "Fed holds rates steady",
            "id": 12346,
            "related": "",
            "source": "Reuters",
            "summary": "The Federal Reserve...",
            "url": "https://example.com/fed",
        },
    ]

    with patch.object(collector, "_fetch_news", new_callable=AsyncMock, return_value=mock_response):
        items = await collector.collect()

    assert len(items) == 2
    assert items[0].source == "finnhub"
    assert items[0].headline == "AAPL beats earnings"
    assert items[0].symbols == ["AAPL"]
    assert items[0].url == "https://example.com/article"
    # Sentiment is a VADER compound score, always a float.
    assert isinstance(items[0].sentiment, float)
    # Empty "related" field maps to no symbols.
    assert items[1].symbols == []


async def test_collect_handles_empty_response(collector):
    with patch.object(collector, "_fetch_news", new_callable=AsyncMock, return_value=[]):
        items = await collector.collect()
        assert items == []
diff --git a/services/news-collector/tests/test_main.py b/services/news-collector/tests/test_main.py
new file mode 100644
index 0000000..66190dc
--- /dev/null
+++ b/services/news-collector/tests/test_main.py
@@ -0,0 +1,39 @@
+"""Tests for news collector scheduler."""
+
+from unittest.mock import AsyncMock, MagicMock
+from datetime import datetime, timezone
+from shared.models import NewsCategory, NewsItem
+from news_collector.main import run_collector_once
+
+
async def test_run_collector_once_stores_and_publishes():
    """A collected item must be inserted into the DB and published exactly once."""
    mock_item = NewsItem(
        source="test",
        headline="Test news",
        published_at=datetime(2026, 4, 2, tzinfo=timezone.utc),
        sentiment=0.5,
        category=NewsCategory.MACRO,
    )
    mock_collector = MagicMock()
    mock_collector.name = "test"
    mock_collector.collect = AsyncMock(return_value=[mock_item])
    mock_db = MagicMock()
    mock_db.insert_news_item = AsyncMock()
    mock_broker = MagicMock()
    mock_broker.publish = AsyncMock()

    count = await run_collector_once(mock_collector, mock_db, mock_broker)
    assert count == 1
    mock_db.insert_news_item.assert_called_once_with(mock_item)
    mock_broker.publish.assert_called_once()


async def test_run_collector_once_handles_empty():
    """No items: nothing stored, nothing published, count is zero."""
    mock_collector = MagicMock()
    mock_collector.name = "test"
    mock_collector.collect = AsyncMock(return_value=[])
    mock_db = MagicMock()
    mock_broker = MagicMock()

    count = await run_collector_once(mock_collector, mock_db, mock_broker)
    assert count == 0
diff --git a/services/news-collector/tests/test_reddit.py b/services/news-collector/tests/test_reddit.py
new file mode 100644
index 0000000..440b173
--- /dev/null
+++ b/services/news-collector/tests/test_reddit.py
@@ -0,0 +1,63 @@
+"""Tests for Reddit collector."""
+
+import pytest
+from unittest.mock import AsyncMock, patch
+from news_collector.collectors.reddit import RedditCollector
+
+
@pytest.fixture
def collector():
    """Fresh collector per test."""
    return RedditCollector()


def test_collector_name(collector):
    assert collector.name == "reddit"
    assert collector.poll_interval == 900


async def test_is_available(collector):
    assert await collector.is_available() is True


async def test_collect_parses_posts(collector):
    # Listing-shaped payload: each post wrapped under a "data" key, as Reddit's API returns.
    mock_posts = [
        {
            "data": {
                "title": "NVDA to the moon! AI demand is insane",
                "selftext": "Just loaded up on NVDA calls",
                "url": "https://reddit.com/r/wallstreetbets/123",
                "created_utc": 1711929600,
                "score": 500,
                "num_comments": 200,
                "subreddit": "wallstreetbets",
            }
        },
    ]
    with patch.object(
        collector, "_fetch_subreddit", new_callable=AsyncMock, return_value=mock_posts
    ):
        items = await collector.collect()
        assert len(items) >= 1
        assert items[0].source == "reddit"
        assert items[0].category.value == "social"


async def test_collect_filters_low_score(collector):
    # A post with score 3 is expected to be dropped — the score threshold
    # lives inside RedditCollector.
    mock_posts = [
        {
            "data": {
                "title": "Random question",
                "selftext": "",
                "url": "https://reddit.com/456",
                "created_utc": 1711929600,
                "score": 3,
                "num_comments": 1,
                "subreddit": "stocks",
            }
        },
    ]
    with patch.object(
        collector, "_fetch_subreddit", new_callable=AsyncMock, return_value=mock_posts
    ):
        items = await collector.collect()
        assert items == []
diff --git a/services/news-collector/tests/test_rss.py b/services/news-collector/tests/test_rss.py
new file mode 100644
index 0000000..e03250a
--- /dev/null
+++ b/services/news-collector/tests/test_rss.py
@@ -0,0 +1,47 @@
+"""Tests for RSS news collector."""
+
+import pytest
+from unittest.mock import AsyncMock, patch
+
+from news_collector.collectors.rss import RSSCollector
+
+
@pytest.fixture
def collector():
    """Fresh collector per test."""
    return RSSCollector()


def test_collector_name(collector):
    assert collector.name == "rss"
    assert collector.poll_interval == 600


async def test_is_available(collector):
    assert await collector.is_available() is True


async def test_collect_parses_feed(collector):
    # Feed dict shaped like feedparser output (entries with struct_time tuples).
    mock_feed = {
        "entries": [
            {
                "title": "NVDA surges on AI demand",
                "link": "https://example.com/nvda",
                "published_parsed": (2026, 4, 2, 12, 0, 0, 0, 0, 0),
                "summary": "Nvidia stock jumped 5%...",
            },
            {
                "title": "Markets rally on jobs data",
                "link": "https://example.com/market",
                "published_parsed": (2026, 4, 2, 11, 0, 0, 0, 0, 0),
                "summary": "The S&P 500 rose...",
            },
        ],
    }

    with patch.object(collector, "_fetch_feeds", new_callable=AsyncMock, return_value=[mock_feed]):
        items = await collector.collect()

    assert len(items) == 2
    assert items[0].source == "rss"
    assert items[0].headline == "NVDA surges on AI demand"
    assert isinstance(items[0].sentiment, float)
diff --git a/services/news-collector/tests/test_sec_edgar.py b/services/news-collector/tests/test_sec_edgar.py
new file mode 100644
index 0000000..5d4f69f
--- /dev/null
+++ b/services/news-collector/tests/test_sec_edgar.py
@@ -0,0 +1,58 @@
+"""Tests for SEC EDGAR filing collector."""
+
+import pytest
+from datetime import datetime, timezone
+from unittest.mock import AsyncMock, patch, MagicMock
+
+from news_collector.collectors.sec_edgar import SecEdgarCollector
+
+
@pytest.fixture
def collector():
    """Fresh collector per test."""
    return SecEdgarCollector()


def test_collector_name(collector):
    assert collector.name == "sec_edgar"
    assert collector.poll_interval == 1800


async def test_is_available(collector):
    assert await collector.is_available() is True


async def test_collect_parses_filings(collector):
    # Mirrors SEC "submissions" JSON: parallel arrays under filings.recent
    # plus company-level tickers/name.
    mock_response = {
        "filings": {
            "recent": {
                "accessionNumber": ["0001234-26-000001"],
                "filingDate": ["2026-04-02"],
                "primaryDocument": ["filing.htm"],
                "form": ["8-K"],
                "primaryDocDescription": ["Current Report"],
            }
        },
        "tickers": [{"ticker": "AAPL"}],
        "name": "Apple Inc",
    }

    # collect() only keeps filings dated "today", so pin the module's datetime
    # to the mocked filing date while keeping strptime (used for parsing) real.
    mock_datetime = MagicMock(spec=datetime)
    mock_datetime.now.return_value = datetime(2026, 4, 2, tzinfo=timezone.utc)
    mock_datetime.strptime = datetime.strptime

    with patch.object(
        collector, "_fetch_recent_filings", new_callable=AsyncMock, return_value=[mock_response]
    ):
        with patch("news_collector.collectors.sec_edgar.datetime", mock_datetime):
            items = await collector.collect()

    assert len(items) == 1
    assert items[0].source == "sec_edgar"
    assert items[0].category.value == "filing"
    assert "AAPL" in items[0].symbols


async def test_collect_handles_empty(collector):
    with patch.object(collector, "_fetch_recent_filings", new_callable=AsyncMock, return_value=[]):
        items = await collector.collect()
        assert items == []
diff --git a/services/news-collector/tests/test_truth_social.py b/services/news-collector/tests/test_truth_social.py
new file mode 100644
index 0000000..91ddb9d
--- /dev/null
+++ b/services/news-collector/tests/test_truth_social.py
@@ -0,0 +1,41 @@
+"""Tests for Truth Social collector."""
+
+import pytest
+from unittest.mock import AsyncMock, patch
+from news_collector.collectors.truth_social import TruthSocialCollector
+
+
@pytest.fixture
def collector():
    """Fresh collector per test."""
    return TruthSocialCollector()


def test_collector_name(collector):
    assert collector.name == "truth_social"
    assert collector.poll_interval == 900


async def test_is_available(collector):
    assert await collector.is_available() is True


async def test_collect_parses_posts(collector):
    # Mastodon-style status: HTML content plus an ISO-8601 "Z" timestamp.
    mock_posts = [
        {
            "content": "<p>We are imposing 25% tariffs on all steel imports!</p>",
            "created_at": "2026-04-02T12:00:00.000Z",
            "url": "https://truthsocial.com/@realDonaldTrump/12345",
            "id": "12345",
        },
    ]
    with patch.object(collector, "_fetch_posts", new_callable=AsyncMock, return_value=mock_posts):
        items = await collector.collect()
        assert len(items) == 1
        assert items[0].source == "truth_social"
        assert items[0].category.value == "policy"


async def test_collect_handles_empty(collector):
    with patch.object(collector, "_fetch_posts", new_callable=AsyncMock, return_value=[]):
        items = await collector.collect()
        assert items == []
diff --git a/services/strategy-engine/src/strategy_engine/config.py b/services/strategy-engine/src/strategy_engine/config.py
index 9fd9c49..15f8588 100644
--- a/services/strategy-engine/src/strategy_engine/config.py
+++ b/services/strategy-engine/src/strategy_engine/config.py
@@ -7,3 +7,7 @@ class StrategyConfig(Settings):
    # Symbols the engine subscribes to by default.
    symbols: list[str] = ["AAPL", "MSFT", "GOOGL", "AMZN", "TSLA"]
    # Bar timeframes consumed from the market-data streams.
    timeframes: list[str] = ["1m"]
    # Per-strategy parameter overrides.
    # NOTE(review): mutable class-level default — safe if Settings is pydantic
    # (defaults are copied per instance); confirm.
    strategy_params: dict = {}
    # Local Eastern time ("HH:MM") at which the daily stock selector fires.
    selector_final_time: str = "15:30"
    # Maximum number of stocks the selector may pick per day.
    selector_max_picks: int = 3
    # Anthropic credentials; the selector is only started when the key is non-empty.
    anthropic_api_key: str = ""
    anthropic_model: str = "claude-sonnet-4-20250514"
diff --git a/services/strategy-engine/src/strategy_engine/main.py b/services/strategy-engine/src/strategy_engine/main.py
index 30de528..5a30766 100644
--- a/services/strategy-engine/src/strategy_engine/main.py
+++ b/services/strategy-engine/src/strategy_engine/main.py
@@ -1,17 +1,23 @@
"""Strategy Engine Service entry point."""
import asyncio
+from datetime import datetime
from pathlib import Path
+import zoneinfo
+from shared.alpaca import AlpacaClient
from shared.broker import RedisBroker
+from shared.db import Database
from shared.healthcheck import HealthCheckServer
from shared.logging import setup_logging
from shared.metrics import ServiceMetrics
from shared.notifier import TelegramNotifier
+from shared.sentiment_models import MarketSentiment
from strategy_engine.config import StrategyConfig
from strategy_engine.engine import StrategyEngine
from strategy_engine.plugin_loader import load_strategies
+from strategy_engine.stock_selector import StockSelector
# The strategies directory lives alongside the installed package
STRATEGIES_DIR = Path(__file__).parent.parent.parent.parent / "strategies"
@@ -30,6 +36,40 @@ async def process_symbol(engine: StrategyEngine, stream: str, log) -> None:
last_id = await engine.process_once(stream, last_id)
async def run_stock_selector(
    selector: StockSelector,
    notifier: TelegramNotifier,
    db: Database,
    config: StrategyConfig,
    log,
) -> None:
    """Run the stock selector once per day at the configured Eastern time.

    Polls the wall clock every 30 s; when the target minute arrives it runs
    the pipeline, sends the notification, then sleeps past the minute so it
    cannot fire twice in the same day.
    """
    et = zoneinfo.ZoneInfo("America/New_York")
    # The target time never changes at runtime: parse it once instead of on
    # every 30-second tick (also fails fast on a malformed setting).
    target_hour, target_min = map(int, config.selector_final_time.split(":"))

    while True:
        now_et = datetime.now(et)
        if now_et.hour == target_hour and now_et.minute == target_min:
            log.info("stock_selector_running")
            try:
                selections = await selector.select()
                if selections:
                    ms_data = await db.get_latest_market_sentiment()
                    ms = MarketSentiment(**ms_data) if ms_data else None
                    await notifier.send_stock_selection(selections, ms)
                    log.info("stock_selector_complete", picks=[s.symbol for s in selections])
                else:
                    log.info("stock_selector_no_picks")
            except Exception as exc:
                log.error("stock_selector_error", error=str(exc))
            await asyncio.sleep(120)  # Sleep past this minute
        else:
            await asyncio.sleep(30)
+
+
async def run() -> None:
config = StrategyConfig()
log = setup_logging("strategy-engine", config.log_level, config.log_format)
@@ -41,6 +81,16 @@ async def run() -> None:
)
broker = RedisBroker(config.redis_url)
+
+ db = Database(config.database_url)
+ await db.connect()
+
+ alpaca = AlpacaClient(
+ api_key=config.alpaca_api_key,
+ api_secret=config.alpaca_api_secret,
+ paper=config.alpaca_paper,
+ )
+
strategies = load_strategies(STRATEGIES_DIR)
for strategy in strategies:
@@ -67,6 +117,20 @@ async def run() -> None:
task = asyncio.create_task(process_symbol(engine, stream, log))
tasks.append(task)
+ if config.anthropic_api_key:
+ selector = StockSelector(
+ db=db,
+ broker=broker,
+ alpaca=alpaca,
+ anthropic_api_key=config.anthropic_api_key,
+ anthropic_model=config.anthropic_model,
+ max_picks=config.selector_max_picks,
+ )
+ tasks.append(
+ asyncio.create_task(run_stock_selector(selector, notifier, db, config, log))
+ )
+ log.info("stock_selector_enabled", time=config.selector_final_time)
+
await asyncio.gather(*tasks)
except Exception as exc:
log.error("fatal_error", error=str(exc))
@@ -78,6 +142,8 @@ async def run() -> None:
metrics.service_up.labels(service="strategy-engine").set(0)
await notifier.close()
await broker.close()
+ await alpaca.close()
+ await db.close()
def main() -> None:
diff --git a/services/strategy-engine/src/strategy_engine/stock_selector.py b/services/strategy-engine/src/strategy_engine/stock_selector.py
new file mode 100644
index 0000000..268d557
--- /dev/null
+++ b/services/strategy-engine/src/strategy_engine/stock_selector.py
@@ -0,0 +1,404 @@
+"""3-stage stock selector engine: sentiment → technical → LLM."""
+
+import json
+import logging
+import re
+from datetime import datetime, timezone
+
+import aiohttp
+
+from shared.alpaca import AlpacaClient
+from shared.broker import RedisBroker
+from shared.db import Database
+from shared.models import OrderSide
+from shared.sentiment_models import Candidate, MarketSentiment, SelectedStock
+
logger = logging.getLogger(__name__)

# Anthropic Messages API endpoint used by the LLM candidate and selection stages.
ANTHROPIC_API_URL = "https://api.anthropic.com/v1/messages"
+
+
def _parse_llm_selections(text: str) -> list[SelectedStock]:
    """Parse LLM response into SelectedStock list.

    Handles both bare JSON arrays and markdown code blocks. Malformed
    individual items are skipped; an unparseable payload returns [].
    """
    # Prefer a fenced ```json block; fall back to the first bare [...] span.
    code_block = re.search(r"```(?:json)?\s*(\[.*?\])\s*```", text, re.DOTALL)
    if code_block:
        raw = code_block.group(1)
    else:
        array_match = re.search(r"\[.*\]", text, re.DOTALL)
        if array_match:
            raw = array_match.group(0)
        else:
            raw = text.strip()

    try:
        data = json.loads(raw)
        if not isinstance(data, list):
            return []
        selections = []
        for item in data:
            if not isinstance(item, dict):
                continue
            try:
                selection = SelectedStock(
                    symbol=item["symbol"],
                    side=OrderSide(item["side"]),
                    conviction=float(item["conviction"]),
                    reason=item.get("reason", ""),
                    key_news=item.get("key_news", []),
                )
                selections.append(selection)
            except (KeyError, ValueError, TypeError) as e:
                # TypeError covers e.g. float(None) or a non-string side;
                # previously it escaped this handler and aborted the whole parse.
                logger.warning("Skipping invalid selection item: %s", e)
        return selections
    except (json.JSONDecodeError, TypeError):
        return []
+
+
class SentimentCandidateSource:
    """Builds candidates from the strongest DB sentiment scores."""

    def __init__(self, db: Database) -> None:
        self._db = db

    async def get_candidates(self) -> list[Candidate]:
        """Return one Candidate per top-scored symbol, dropping zero composites."""
        top_rows = await self._db.get_top_symbol_scores(limit=20)
        out: list[Candidate] = []
        for entry in top_rows:
            composite = float(entry.get("composite", 0))
            if not composite:
                # A zero composite carries no signal either way.
                continue
            out.append(
                Candidate(
                    symbol=entry["symbol"],
                    source="sentiment",
                    score=composite,
                    reason=f"composite={composite:.2f}, news_count={entry.get('news_count', 0)}",
                )
            )
        return out
+
+
class LLMCandidateSource:
    """Generates candidates by asking Claude to analyze recent news."""

    def __init__(self, db: Database, api_key: str, model: str) -> None:
        self._db = db
        self._api_key = api_key
        self._model = model

    async def get_candidates(self) -> list[Candidate]:
        """Ask the LLM for actionable tickers based on the last 24h of headlines.

        Returns [] when there is no news, on HTTP/network errors, or when the
        response cannot be parsed.
        """
        news_items = await self._db.get_recent_news(hours=24)
        if not news_items:
            return []

        headlines = []
        for item in news_items[:50]:  # cap at 50 to stay within context
            symbols = item.get("symbols", [])
            sym_str = ", ".join(symbols) if symbols else "N/A"
            headlines.append(f"[{sym_str}] {item['headline']}")

        prompt = (
            "You are a stock analyst. Given recent news headlines, identify the 5-10 most "
            "actionable US stock tickers. Return ONLY a JSON array with objects having: "
            "symbol (ticker), direction ('BUY' or 'SELL'), score (0-1), reason (brief).\n\n"
            "Headlines:\n" + "\n".join(headlines)
        )

        try:
            async with aiohttp.ClientSession() as session:
                async with session.post(
                    ANTHROPIC_API_URL,
                    headers={
                        "x-api-key": self._api_key,
                        "anthropic-version": "2023-06-01",
                        "content-type": "application/json",
                    },
                    json={
                        "model": self._model,
                        "max_tokens": 1024,
                        "messages": [{"role": "user", "content": prompt}],
                    },
                ) as resp:
                    if resp.status != 200:
                        body = await resp.text()
                        logger.error("LLM candidate source error %d: %s", resp.status, body)
                        return []
                    data = await resp.json()

            # Concatenate every text block of the Messages API response.
            content = data.get("content", [])
            text = ""
            for block in content:
                if isinstance(block, dict) and block.get("type") == "text":
                    text += block.get("text", "")

            return self._parse_candidates(text)
        except Exception as e:
            logger.error("LLMCandidateSource error: %s", e)
            return []

    def _parse_candidates(self, text: str) -> list[Candidate]:
        """Parse the LLM reply (bare array or fenced block) into Candidates.

        Malformed items are skipped individually instead of discarding the
        whole batch.
        """
        code_block = re.search(r"```(?:json)?\s*(\[.*?\])\s*```", text, re.DOTALL)
        if code_block:
            raw = code_block.group(1)
        else:
            array_match = re.search(r"\[.*\]", text, re.DOTALL)
            raw = array_match.group(0) if array_match else text.strip()

        try:
            items = json.loads(raw)
        except (json.JSONDecodeError, TypeError):
            return []
        if not isinstance(items, list):
            return []

        candidates = []
        for item in items:
            if not isinstance(item, dict):
                continue
            try:
                direction_str = item.get("direction", "BUY")
                try:
                    direction = OrderSide(direction_str)
                except ValueError:
                    # NOTE(review): the prompt requests 'BUY'/'SELL'; if OrderSide
                    # uses different values this branch always fires — confirm.
                    direction = None
                candidates.append(
                    Candidate(
                        symbol=item["symbol"],
                        source="llm",
                        direction=direction,
                        score=float(item.get("score", 0.5)),
                        reason=item.get("reason", ""),
                    )
                )
            except (KeyError, ValueError, TypeError) as e:
                # Previously a single bad item (missing "symbol", non-numeric
                # score) was caught at the outer level and dropped ALL candidates.
                logger.warning("Skipping invalid candidate item: %s", e)
        return candidates
+
+
+def _compute_rsi(closes: list[float], period: int = 14) -> float:
+ """Compute RSI for the last data point."""
+ if len(closes) < period + 1:
+ return 50.0 # neutral if insufficient data
+
+ deltas = [closes[i] - closes[i - 1] for i in range(1, len(closes))]
+ gains = [d if d > 0 else 0.0 for d in deltas]
+ losses = [-d if d < 0 else 0.0 for d in deltas]
+
+ avg_gain = sum(gains[:period]) / period
+ avg_loss = sum(losses[:period]) / period
+
+ for i in range(period, len(deltas)):
+ avg_gain = (avg_gain * (period - 1) + gains[i]) / period
+ avg_loss = (avg_loss * (period - 1) + losses[i]) / period
+
+ if avg_loss == 0:
+ return 100.0
+ rs = avg_gain / avg_loss
+ return 100.0 - (100.0 / (1.0 + rs))
+
+
+class StockSelector:
+ """Orchestrates the 3-stage stock selection pipeline."""
+
    def __init__(
        self,
        db: Database,
        broker: RedisBroker,
        alpaca: AlpacaClient,
        anthropic_api_key: str,
        anthropic_model: str = "claude-sonnet-4-20250514",
        max_picks: int = 3,
    ) -> None:
        """Wire up the selector's dependencies.

        Args:
            db: Database handle for sentiment scores, news, and persisting picks.
            broker: Redis broker used to publish selected stocks.
            alpaca: Alpaca client, stored for use by later pipeline stages.
            anthropic_api_key: API key for the Claude stages of the pipeline.
            anthropic_model: Model name sent to the Anthropic API.
            max_picks: Upper bound on daily selections.
        """
        self._db = db
        self._broker = broker
        self._alpaca = alpaca
        self._api_key = anthropic_api_key
        self._model = anthropic_model
        self._max_picks = max_picks
+
+ async def select(self) -> list[SelectedStock]:
+ """Run the full 3-stage pipeline and return selected stocks."""
+ # Market gate: check sentiment
+ sentiment_data = await self._db.get_latest_market_sentiment()
+ if sentiment_data is None:
+ logger.warning("No market sentiment data; skipping selection")
+ return []
+
+ market_sentiment = MarketSentiment(**sentiment_data)
+ if market_sentiment.market_regime == "risk_off":
+ logger.info("Market is risk_off; skipping stock selection")
+ return []
+
+ # Stage 1: gather candidates from both sources
+ sentiment_source = SentimentCandidateSource(self._db)
+ llm_source = LLMCandidateSource(self._db, self._api_key, self._model)
+
+ sentiment_candidates = await sentiment_source.get_candidates()
+ llm_candidates = await llm_source.get_candidates()
+
+ candidates = self._merge_candidates(sentiment_candidates, llm_candidates)
+ if not candidates:
+ logger.info("No candidates found")
+ return []
+
+ # Stage 2: technical filter
+ filtered = await self._technical_filter(candidates)
+ if not filtered:
+ logger.info("All candidates filtered out by technical criteria")
+ return []
+
+ # Stage 3: LLM final selection
+ selections = await self._llm_final_select(filtered, market_sentiment)
+
+ # Persist and publish
+ today = datetime.now(timezone.utc).date()
+ sentiment_snapshot = {
+ "fear_greed": market_sentiment.fear_greed,
+ "market_regime": market_sentiment.market_regime,
+ "vix": market_sentiment.vix,
+ }
+ for stock in selections:
+ try:
+ await self._db.insert_stock_selection(
+ trade_date=today,
+ symbol=stock.symbol,
+ side=stock.side.value,
+ conviction=stock.conviction,
+ reason=stock.reason,
+ key_news=stock.key_news,
+ sentiment_snapshot=sentiment_snapshot,
+ )
+ except Exception as e:
+ logger.error("Failed to persist selection for %s: %s", stock.symbol, e)
+
+ try:
+ await self._broker.publish(
+ "selected_stocks",
+ {
+ "symbol": stock.symbol,
+ "side": stock.side.value,
+ "conviction": stock.conviction,
+ "reason": stock.reason,
+ "key_news": stock.key_news,
+ "trade_date": str(today),
+ },
+ )
+ except Exception as e:
+ logger.error("Failed to publish selection for %s: %s", stock.symbol, e)
+
+ return selections
+
+ def _merge_candidates(
+ self, sentiment: list[Candidate], llm: list[Candidate]
+ ) -> list[Candidate]:
+ """Deduplicate candidates by symbol, keeping the higher score."""
+ by_symbol: dict[str, Candidate] = {}
+ for c in sentiment + llm:
+ existing = by_symbol.get(c.symbol)
+ if existing is None or c.score > existing.score:
+ by_symbol[c.symbol] = c
+ return sorted(by_symbol.values(), key=lambda c: c.score, reverse=True)
+
+ async def _technical_filter(self, candidates: list[Candidate]) -> list[Candidate]:
+ """Filter candidates using RSI, EMA20, and volume criteria."""
+ passed = []
+ for candidate in candidates:
+ try:
+ bars = await self._alpaca.get_bars(candidate.symbol, timeframe="1Day", limit=60)
+ if len(bars) < 21:
+ logger.debug("Insufficient bars for %s", candidate.symbol)
+ continue
+
+ closes = [float(b["c"]) for b in bars]
+ volumes = [float(b["v"]) for b in bars]
+
+ rsi = _compute_rsi(closes)
+ if not (30 <= rsi <= 70):
+ logger.debug("%s RSI=%.1f outside 30-70", candidate.symbol, rsi)
+ continue
+
+ ema20 = sum(closes[-20:]) / 20 # simple approximation
+ current_price = closes[-1]
+ if current_price <= ema20:
+ logger.debug(
+ "%s price %.2f <= EMA20 %.2f", candidate.symbol, current_price, ema20
+ )
+ continue
+
+ avg_volume = sum(volumes[:-1]) / max(len(volumes) - 1, 1)
+ current_volume = volumes[-1]
+ if current_volume <= 0.5 * avg_volume:
+ logger.debug(
+ "%s volume %.0f <= 50%% avg %.0f",
+ candidate.symbol,
+ current_volume,
+ avg_volume,
+ )
+ continue
+
+ passed.append(candidate)
+ except Exception as e:
+ logger.warning("Technical filter error for %s: %s", candidate.symbol, e)
+
+ return passed
+
+ async def _llm_final_select(
+ self, candidates: list[Candidate], market_sentiment: MarketSentiment
+ ) -> list[SelectedStock]:
+ """Ask Claude to pick 2-3 stocks with rationale."""
+ candidate_lines = [
+ f"- {c.symbol} (source={c.source}, score={c.score:.2f}, reason={c.reason})"
+ for c in candidates
+ ]
+ market_context = (
+ f"Fear/Greed: {market_sentiment.fear_greed} ({market_sentiment.fear_greed_label}), "
+ f"VIX: {market_sentiment.vix}, "
+ f"Fed stance: {market_sentiment.fed_stance}, "
+ f"Regime: {market_sentiment.market_regime}"
+ )
+
+ prompt = (
+ f"You are a portfolio manager. Select 2-3 stocks for today's session.\n\n"
+ f"Market context: {market_context}\n\n"
+ f"Candidates (already passed technical filters):\n"
+ + "\n".join(candidate_lines)
+ + "\n\n"
+ "Return ONLY a JSON array with objects having:\n"
+ " symbol, side ('BUY' or 'SELL'), conviction (0-1), reason (1-2 sentences), "
+ "key_news (list of 1-3 relevant headlines or facts)\n"
+ f"Select at most {self._max_picks} stocks."
+ )
+
+ try:
+ async with aiohttp.ClientSession() as session:
+ async with session.post(
+ ANTHROPIC_API_URL,
+ headers={
+ "x-api-key": self._api_key,
+ "anthropic-version": "2023-06-01",
+ "content-type": "application/json",
+ },
+ json={
+ "model": self._model,
+ "max_tokens": 1024,
+ "messages": [{"role": "user", "content": prompt}],
+ },
+ ) as resp:
+ if resp.status != 200:
+ body = await resp.text()
+ logger.error("LLM final select error %d: %s", resp.status, body)
+ return []
+ data = await resp.json()
+
+ content = data.get("content", [])
+ text = ""
+ for block in content:
+ if isinstance(block, dict) and block.get("type") == "text":
+ text += block.get("text", "")
+
+ return _parse_llm_selections(text)[: self._max_picks]
+ except Exception as e:
+ logger.error("LLM final select error: %s", e)
+ return []
diff --git a/services/strategy-engine/tests/conftest.py b/services/strategy-engine/tests/conftest.py
index eb31b23..2b909ef 100644
--- a/services/strategy-engine/tests/conftest.py
+++ b/services/strategy-engine/tests/conftest.py
@@ -7,3 +7,8 @@ from pathlib import Path
STRATEGIES_DIR = Path(__file__).parent.parent / "strategies"
if str(STRATEGIES_DIR) not in sys.path:
sys.path.insert(0, str(STRATEGIES_DIR.parent))
+
# Ensure the worktree's strategy_engine src is preferred over any installed version.
# Inserted at position 0 so it wins over site-packages during import resolution.
WORKTREE_SRC = Path(__file__).parent.parent / "src"
if str(WORKTREE_SRC) not in sys.path:
    sys.path.insert(0, str(WORKTREE_SRC))
diff --git a/services/strategy-engine/tests/test_stock_selector.py b/services/strategy-engine/tests/test_stock_selector.py
new file mode 100644
index 0000000..ff9d09c
--- /dev/null
+++ b/services/strategy-engine/tests/test_stock_selector.py
@@ -0,0 +1,80 @@
+"""Tests for stock selector engine."""
+
+from unittest.mock import AsyncMock, MagicMock
+from datetime import datetime, timezone
+
+
+from strategy_engine.stock_selector import (
+ SentimentCandidateSource,
+ StockSelector,
+ _parse_llm_selections,
+)
+
+
async def test_sentiment_candidate_source():
    """Sentiment-derived candidates mirror the DB's top composite scores."""
    # NOTE(review): bare `async def` test — assumes pytest-asyncio/anyio
    # auto mode is enabled in project config; confirm, or the test may be
    # skipped/collected as a warning.
    mock_db = MagicMock()
    mock_db.get_top_symbol_scores = AsyncMock(
        return_value=[
            {"symbol": "AAPL", "composite": 0.8, "news_count": 5},
            {"symbol": "NVDA", "composite": 0.6, "news_count": 3},
        ]
    )

    source = SentimentCandidateSource(mock_db)
    candidates = await source.get_candidates()

    assert len(candidates) == 2
    assert candidates[0].symbol == "AAPL"
    assert candidates[0].source == "sentiment"
+
+
def test_parse_llm_selections_valid():
    """A plain JSON array of picks parses into ordered selection objects."""
    llm_response = """
    [
        {"symbol": "NVDA", "side": "BUY", "conviction": 0.85, "reason": "AI demand", "key_news": ["NVDA beats earnings"]},
        {"symbol": "XOM", "side": "BUY", "conviction": 0.72, "reason": "Oil surge", "key_news": ["Oil prices up"]}
    ]
    """
    parsed = _parse_llm_selections(llm_response)
    assert len(parsed) == 2
    first = parsed[0]
    assert first.symbol == "NVDA"
    assert first.conviction == 0.85
+
+
def test_parse_llm_selections_invalid():
    """Non-JSON input yields an empty list instead of raising."""
    assert _parse_llm_selections("not json") == []
+
+
def test_parse_llm_selections_with_markdown():
    """Picks wrapped in a ```json fenced block are still extracted."""
    llm_response = """
    Here are my picks:
    ```json
    [
        {"symbol": "TSLA", "side": "BUY", "conviction": 0.7, "reason": "Momentum", "key_news": ["Tesla rally"]}
    ]
    ```
    """
    parsed = _parse_llm_selections(llm_response)
    assert [s.symbol for s in parsed] == ["TSLA"]
+
+
async def test_selector_blocks_on_risk_off():
    """A risk_off market regime short-circuits selection to an empty list."""
    # NOTE(review): bare `async def` test — assumes pytest-asyncio/anyio
    # auto mode is enabled in project config; confirm plugin setup.
    mock_db = MagicMock()
    mock_db.get_latest_market_sentiment = AsyncMock(
        return_value={
            "fear_greed": 15,
            "fear_greed_label": "Extreme Fear",
            "vix": 35.0,
            "fed_stance": "neutral",
            "market_regime": "risk_off",
            "updated_at": datetime.now(timezone.utc),
        }
    )

    # broker/alpaca are never touched on the risk_off path, so plain
    # MagicMocks are sufficient here.
    selector = StockSelector(
        db=mock_db, broker=MagicMock(), alpaca=MagicMock(), anthropic_api_key="test"
    )
    result = await selector.select()
    assert result == []
diff --git a/shared/alembic/versions/002_news_sentiment_tables.py b/shared/alembic/versions/002_news_sentiment_tables.py
new file mode 100644
index 0000000..402ff87
--- /dev/null
+++ b/shared/alembic/versions/002_news_sentiment_tables.py
@@ -0,0 +1,84 @@
+"""Add news, sentiment, and stock selection tables
+
+Revision ID: 002
+Revises: 001
+Create Date: 2026-04-02
+"""
+
+from typing import Sequence, Union
+
+from alembic import op
+import sqlalchemy as sa
+
+revision: str = "002"
+down_revision: Union[str, None] = "001"
+branch_labels: Union[str, Sequence[str], None] = None
+depends_on: Union[str, Sequence[str], None] = None
+
+
def upgrade() -> None:
    """Create the news_items, symbol_scores, market_sentiment and
    stock_selections tables plus their lookup indexes.

    List/dict payloads (symbols, raw_data, key_news, sentiment_snapshot)
    are stored as JSON-encoded Text; the application layer handles the
    encoding/decoding.
    """
    op.create_table(
        "news_items",
        sa.Column("id", sa.Text, primary_key=True),
        sa.Column("source", sa.Text, nullable=False),
        sa.Column("headline", sa.Text, nullable=False),
        sa.Column("summary", sa.Text),
        sa.Column("url", sa.Text),
        sa.Column("published_at", sa.DateTime(timezone=True), nullable=False),
        sa.Column("symbols", sa.Text),  # JSON-encoded list of tickers
        sa.Column("sentiment", sa.Float, nullable=False),
        sa.Column("category", sa.Text, nullable=False),
        sa.Column("raw_data", sa.Text),  # JSON-encoded original payload
        sa.Column(
            "created_at", sa.DateTime(timezone=True), nullable=False, server_default=sa.func.now()
        ),
    )
    op.create_index("idx_news_items_published", "news_items", ["published_at"])
    op.create_index("idx_news_items_source", "news_items", ["source"])

    op.create_table(
        "symbol_scores",
        sa.Column("id", sa.Text, primary_key=True),
        # One row per symbol; the application upserts on this unique key.
        sa.Column("symbol", sa.Text, nullable=False, unique=True),
        sa.Column("news_score", sa.Float, nullable=False, server_default="0"),
        sa.Column("news_count", sa.Integer, nullable=False, server_default="0"),
        sa.Column("social_score", sa.Float, nullable=False, server_default="0"),
        sa.Column("policy_score", sa.Float, nullable=False, server_default="0"),
        sa.Column("filing_score", sa.Float, nullable=False, server_default="0"),
        sa.Column("composite", sa.Float, nullable=False, server_default="0"),
        sa.Column("updated_at", sa.DateTime(timezone=True), nullable=False),
    )

    op.create_table(
        "market_sentiment",
        # The application keeps a single row with id="latest".
        sa.Column("id", sa.Text, primary_key=True),
        sa.Column("fear_greed", sa.Integer, nullable=False),
        sa.Column("fear_greed_label", sa.Text, nullable=False),
        sa.Column("vix", sa.Float),
        sa.Column("fed_stance", sa.Text, nullable=False, server_default="neutral"),
        sa.Column("market_regime", sa.Text, nullable=False, server_default="neutral"),
        sa.Column("updated_at", sa.DateTime(timezone=True), nullable=False),
    )

    op.create_table(
        "stock_selections",
        sa.Column("id", sa.Text, primary_key=True),
        sa.Column("trade_date", sa.Date, nullable=False),
        sa.Column("symbol", sa.Text, nullable=False),
        sa.Column("side", sa.Text, nullable=False),
        sa.Column("conviction", sa.Float, nullable=False),
        sa.Column("reason", sa.Text, nullable=False),
        sa.Column("key_news", sa.Text),  # JSON-encoded list of headlines
        sa.Column("sentiment_snapshot", sa.Text),  # JSON-encoded dict
        sa.Column(
            "created_at", sa.DateTime(timezone=True), nullable=False, server_default=sa.func.now()
        ),
    )
    op.create_index("idx_stock_selections_date", "stock_selections", ["trade_date"])
+
+
def downgrade() -> None:
    """Drop the news/sentiment tables in reverse creation order."""
    for table_name in ("stock_selections", "market_sentiment", "symbol_scores", "news_items"):
        op.drop_table(table_name)
diff --git a/shared/src/shared/config.py b/shared/src/shared/config.py
index 4e8e7f1..b6ccebd 100644
--- a/shared/src/shared/config.py
+++ b/shared/src/shared/config.py
@@ -32,7 +32,15 @@ class Settings(BaseSettings):
telegram_enabled: bool = False
log_format: str = "json"
health_port: int = 8080
- circuit_breaker_threshold: int = 5
- circuit_breaker_timeout: int = 60
metrics_auth_token: str = "" # If set, /health and /metrics require Bearer token
+ # News collector
+ finnhub_api_key: str = ""
+ news_poll_interval: int = 300
+ sentiment_aggregate_interval: int = 900
+ # Stock selector
+ selector_final_time: str = "15:30"
+ selector_max_picks: int = 3
+ # LLM
+ anthropic_api_key: str = ""
+ anthropic_model: str = "claude-sonnet-4-20250514"
model_config = {"env_file": ".env", "env_file_encoding": "utf-8", "extra": "ignore"}
diff --git a/shared/src/shared/db.py b/shared/src/shared/db.py
index 901e293..9cc8686 100644
--- a/shared/src/shared/db.py
+++ b/shared/src/shared/db.py
@@ -1,15 +1,28 @@
"""Database layer using SQLAlchemy 2.0 async ORM for the trading platform."""
+import json
+import uuid
from contextlib import asynccontextmanager
-from datetime import datetime, timedelta, timezone
+from datetime import datetime, date, timedelta, timezone
from decimal import Decimal
from typing import Optional
from sqlalchemy import select, update
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine, async_sessionmaker
-from shared.models import Candle, Signal, Order, OrderStatus
-from shared.sa_models import Base, CandleRow, SignalRow, OrderRow, PortfolioSnapshotRow
+from shared.models import Candle, Signal, Order, OrderStatus, NewsItem
+from shared.sentiment_models import SymbolScore, MarketSentiment
+from shared.sa_models import (
+ Base,
+ CandleRow,
+ SignalRow,
+ OrderRow,
+ PortfolioSnapshotRow,
+ NewsItemRow,
+ SymbolScoreRow,
+ MarketSentimentRow,
+ StockSelectionRow,
+)
class Database:
@@ -195,3 +208,229 @@ class Database:
}
for r in rows
]
+
    async def insert_news_item(self, item: NewsItem) -> None:
        """Insert a NewsItem row, JSON-encoding symbols and raw_data.

        The symbols list and raw_data dict are serialized to JSON strings
        because the underlying columns are plain Text.

        NOTE(review): re-inserting an item with an existing id will raise an
        integrity error from the driver — confirm callers deduplicate
        upstream or catch this.
        """
        row = NewsItemRow(
            id=item.id,
            source=item.source,
            headline=item.headline,
            summary=item.summary,
            url=item.url,
            published_at=item.published_at,
            symbols=json.dumps(item.symbols),
            sentiment=item.sentiment,
            category=item.category.value,
            raw_data=json.dumps(item.raw_data),
            created_at=item.created_at,
        )
        async with self._session_factory() as session:
            try:
                session.add(row)
                await session.commit()
            except Exception:
                # Roll back so the session is left clean, then surface the error.
                await session.rollback()
                raise
+
+ async def get_recent_news(self, hours: int = 24) -> list[dict]:
+ """Retrieve news items published in the last N hours."""
+ since = datetime.now(timezone.utc) - timedelta(hours=hours)
+ stmt = (
+ select(NewsItemRow)
+ .where(NewsItemRow.published_at >= since)
+ .order_by(NewsItemRow.published_at.desc())
+ )
+ async with self._session_factory() as session:
+ try:
+ result = await session.execute(stmt)
+ rows = result.scalars().all()
+ except Exception:
+ await session.rollback()
+ raise
+ return [
+ {
+ "id": r.id,
+ "source": r.source,
+ "headline": r.headline,
+ "summary": r.summary,
+ "url": r.url,
+ "published_at": r.published_at,
+ "symbols": json.loads(r.symbols) if r.symbols else [],
+ "sentiment": r.sentiment,
+ "category": r.category,
+ "raw_data": json.loads(r.raw_data) if r.raw_data else {},
+ "created_at": r.created_at,
+ }
+ for r in rows
+ ]
+
    async def upsert_symbol_score(self, score: SymbolScore) -> None:
        """Insert or update a SymbolScore row, keyed by symbol.

        Performs a select-then-write in one session: update in place when a
        row for the symbol exists, otherwise insert a fresh row with a new
        uuid primary key.

        NOTE(review): the read-then-write pattern can race under concurrent
        writers for the same symbol (unique constraint violation on insert)
        — confirm a single writer, or consider a DB-level upsert.
        """
        async with self._session_factory() as session:
            try:
                stmt = select(SymbolScoreRow).where(SymbolScoreRow.symbol == score.symbol)
                result = await session.execute(stmt)
                existing = result.scalar_one_or_none()
                if existing is not None:
                    existing.news_score = score.news_score
                    existing.news_count = score.news_count
                    existing.social_score = score.social_score
                    existing.policy_score = score.policy_score
                    existing.filing_score = score.filing_score
                    existing.composite = score.composite
                    existing.updated_at = score.updated_at
                else:
                    row = SymbolScoreRow(
                        id=str(uuid.uuid4()),
                        symbol=score.symbol,
                        news_score=score.news_score,
                        news_count=score.news_count,
                        social_score=score.social_score,
                        policy_score=score.policy_score,
                        filing_score=score.filing_score,
                        composite=score.composite,
                        updated_at=score.updated_at,
                    )
                    session.add(row)
                await session.commit()
            except Exception:
                await session.rollback()
                raise
+
+ async def get_top_symbol_scores(self, limit: int = 20) -> list[dict]:
+ """Retrieve top symbol scores ordered by composite descending."""
+ stmt = select(SymbolScoreRow).order_by(SymbolScoreRow.composite.desc()).limit(limit)
+ async with self._session_factory() as session:
+ try:
+ result = await session.execute(stmt)
+ rows = result.scalars().all()
+ except Exception:
+ await session.rollback()
+ raise
+ return [
+ {
+ "id": r.id,
+ "symbol": r.symbol,
+ "news_score": r.news_score,
+ "news_count": r.news_count,
+ "social_score": r.social_score,
+ "policy_score": r.policy_score,
+ "filing_score": r.filing_score,
+ "composite": r.composite,
+ "updated_at": r.updated_at,
+ }
+ for r in rows
+ ]
+
    async def upsert_market_sentiment(self, ms: MarketSentiment) -> None:
        """Insert or update the single 'latest' market sentiment row.

        The table is used as a one-row cache keyed by the fixed id
        "latest"; every call either updates that row in place or creates
        it on first use.
        """
        async with self._session_factory() as session:
            try:
                stmt = select(MarketSentimentRow).where(MarketSentimentRow.id == "latest")
                result = await session.execute(stmt)
                existing = result.scalar_one_or_none()
                if existing is not None:
                    existing.fear_greed = ms.fear_greed
                    existing.fear_greed_label = ms.fear_greed_label
                    existing.vix = ms.vix
                    existing.fed_stance = ms.fed_stance
                    existing.market_regime = ms.market_regime
                    existing.updated_at = ms.updated_at
                else:
                    row = MarketSentimentRow(
                        id="latest",
                        fear_greed=ms.fear_greed,
                        fear_greed_label=ms.fear_greed_label,
                        vix=ms.vix,
                        fed_stance=ms.fed_stance,
                        market_regime=ms.market_regime,
                        updated_at=ms.updated_at,
                    )
                    session.add(row)
                await session.commit()
            except Exception:
                await session.rollback()
                raise
+
+ async def get_latest_market_sentiment(self) -> Optional[dict]:
+ """Retrieve the 'latest' market sentiment row, or None if not found."""
+ stmt = select(MarketSentimentRow).where(MarketSentimentRow.id == "latest")
+ async with self._session_factory() as session:
+ try:
+ result = await session.execute(stmt)
+ row = result.scalar_one_or_none()
+ except Exception:
+ await session.rollback()
+ raise
+ if row is None:
+ return None
+ return {
+ "id": row.id,
+ "fear_greed": row.fear_greed,
+ "fear_greed_label": row.fear_greed_label,
+ "vix": row.vix,
+ "fed_stance": row.fed_stance,
+ "market_regime": row.market_regime,
+ "updated_at": row.updated_at,
+ }
+
    async def insert_stock_selection(
        self,
        trade_date: date,
        symbol: str,
        side: str,
        conviction: float,
        reason: str,
        key_news: list,
        sentiment_snapshot: dict,
    ) -> None:
        """Insert a stock selection row with JSON-encoded lists/dicts.

        key_news and sentiment_snapshot are serialized to JSON strings
        because the columns are plain Text; a fresh uuid becomes the
        primary key and created_at is stamped with the current UTC time.
        """
        row = StockSelectionRow(
            id=str(uuid.uuid4()),
            trade_date=trade_date,
            symbol=symbol,
            side=side,
            conviction=conviction,
            reason=reason,
            key_news=json.dumps(key_news),
            sentiment_snapshot=json.dumps(sentiment_snapshot),
            created_at=datetime.now(timezone.utc),
        )
        async with self._session_factory() as session:
            try:
                session.add(row)
                await session.commit()
            except Exception:
                # Leave the session clean before propagating the failure.
                await session.rollback()
                raise
+
+ async def get_stock_selections(self, trade_date: date) -> list[dict]:
+ """Retrieve stock selections for a given trade date."""
+ stmt = (
+ select(StockSelectionRow)
+ .where(StockSelectionRow.trade_date == trade_date)
+ .order_by(StockSelectionRow.conviction.desc())
+ )
+ async with self._session_factory() as session:
+ try:
+ result = await session.execute(stmt)
+ rows = result.scalars().all()
+ except Exception:
+ await session.rollback()
+ raise
+ return [
+ {
+ "id": r.id,
+ "trade_date": r.trade_date,
+ "symbol": r.symbol,
+ "side": r.side,
+ "conviction": r.conviction,
+ "reason": r.reason,
+ "key_news": json.loads(r.key_news) if r.key_news else [],
+ "sentiment_snapshot": json.loads(r.sentiment_snapshot)
+ if r.sentiment_snapshot
+ else {},
+ "created_at": r.created_at,
+ }
+ for r in rows
+ ]
diff --git a/shared/src/shared/events.py b/shared/src/shared/events.py
index 72f8865..63f93a2 100644
--- a/shared/src/shared/events.py
+++ b/shared/src/shared/events.py
@@ -5,13 +5,14 @@ from typing import Any
from pydantic import BaseModel
-from shared.models import Candle, Signal, Order
+from shared.models import Candle, Signal, Order, NewsItem
class EventType(str, Enum):
CANDLE = "CANDLE"
SIGNAL = "SIGNAL"
ORDER = "ORDER"
+ NEWS = "NEWS"
class CandleEvent(BaseModel):
@@ -59,10 +60,26 @@ class OrderEvent(BaseModel):
return cls(type=raw["type"], data=Order(**raw["data"]))
class NewsEvent(BaseModel):
    """Envelope carrying a NewsItem over the event bus."""

    type: EventType = EventType.NEWS
    data: NewsItem

    def to_dict(self) -> dict:
        """Serialize for wire transport; mode="json" renders datetimes/enums
        as JSON-safe values."""
        return {
            "type": self.type,
            "data": self.data.model_dump(mode="json"),
        }

    @classmethod
    def from_raw(cls, raw: dict) -> "NewsEvent":
        """Rebuild the event from a decoded wire dict (inverse of to_dict)."""
        return cls(type=raw["type"], data=NewsItem(**raw["data"]))
+
+
_EVENT_TYPE_MAP = {
EventType.CANDLE: CandleEvent,
EventType.SIGNAL: SignalEvent,
EventType.ORDER: OrderEvent,
+ EventType.NEWS: NewsEvent,
}
diff --git a/shared/src/shared/models.py b/shared/src/shared/models.py
index 70820b5..a436c03 100644
--- a/shared/src/shared/models.py
+++ b/shared/src/shared/models.py
@@ -74,3 +74,26 @@ class Position(BaseModel):
@property
def unrealized_pnl(self) -> Decimal:
return self.quantity * (self.current_price - self.avg_entry_price)
+
+
class NewsCategory(str, Enum):
    """Coarse classification for collected news/social/filing items."""

    POLICY = "policy"
    EARNINGS = "earnings"
    MACRO = "macro"
    SOCIAL = "social"
    FILING = "filing"
    FED = "fed"
+
+
class NewsItem(BaseModel):
    """A single collected news/social/filing item with a sentiment score."""

    id: str = Field(default_factory=lambda: str(uuid.uuid4()))
    source: str  # presumably the collector id (e.g. "finnhub") — confirm
    summary: Optional[str] = None
    url: Optional[str] = None
    published_at: datetime
    symbols: list[str] = []  # pydantic copies this default per instance
    sentiment: float
    category: NewsCategory
    raw_data: dict = {}  # original provider payload; pydantic copies default
    created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
diff --git a/shared/src/shared/notifier.py b/shared/src/shared/notifier.py
index f03919c..3d7b6cf 100644
--- a/shared/src/shared/notifier.py
+++ b/shared/src/shared/notifier.py
@@ -8,6 +8,7 @@ from typing import Optional, Sequence
import aiohttp
from shared.models import Signal, Order, Position
+from shared.sentiment_models import SelectedStock, MarketSentiment
logger = logging.getLogger(__name__)
@@ -123,6 +124,34 @@ class TelegramNotifier:
lines.append(" No open positions")
await self.send("\n".join(lines))
    async def send_stock_selection(
        self,
        selections: list[SelectedStock],
        market: MarketSentiment | None = None,
    ) -> None:
        """Format and send stock selection notification.

        Builds an HTML-formatted message (one numbered entry per pick,
        optional market-context footer) and delivers it via self.send().

        NOTE(review): s.reason and key_news are interpolated into HTML
        parse-mode text without escaping — confirm upstream content cannot
        contain '<'/'&', or escape it before sending.
        """
        lines = [f"<b>📊 Stock Selection ({len(selections)} picks)</b>", ""]

        side_emoji = {"BUY": "🟢", "SELL": "🔴"}

        for i, s in enumerate(selections, 1):
            # Unknown sides fall back to a neutral marker.
            emoji = side_emoji.get(s.side.value, "⚪")
            lines.append(
                f"{i}. <b>{s.symbol}</b> {emoji} {s.side.value} (conviction: {s.conviction:.0%})"
            )
            lines.append(f"   {s.reason}")
            if s.key_news:
                lines.append(f"   News: {s.key_news[0]}")
            lines.append("")

        if market:
            # VIX suffix is omitted when vix is None (or 0.0, which is falsy).
            lines.append(
                f"Market: F&amp;G {market.fear_greed} ({market.fear_greed_label})"
                + (f" | VIX {market.vix:.1f}" if market.vix else "")
            )

        await self.send("\n".join(lines))
+
async def close(self) -> None:
"""Close the underlying aiohttp session."""
if self._session is not None:
diff --git a/shared/src/shared/resilience.py b/shared/src/shared/resilience.py
index e43fd21..8d8678a 100644
--- a/shared/src/shared/resilience.py
+++ b/shared/src/shared/resilience.py
@@ -1,105 +1 @@
-"""Retry with exponential backoff and circuit breaker utilities."""
-
-from __future__ import annotations
-
-import asyncio
-import enum
-import functools
-import logging
-import random
-import time
-from typing import Any, Callable
-
-logger = logging.getLogger(__name__)
-
-
-# ---------------------------------------------------------------------------
-# retry_with_backoff
-# ---------------------------------------------------------------------------
-
-
-def retry_with_backoff(
- max_retries: int = 3,
- base_delay: float = 1.0,
- max_delay: float = 60.0,
-) -> Callable:
- """Decorator that retries an async function with exponential backoff + jitter."""
-
- def decorator(func: Callable) -> Callable:
- @functools.wraps(func)
- async def wrapper(*args: Any, **kwargs: Any) -> Any:
- last_exc: BaseException | None = None
- for attempt in range(max_retries + 1):
- try:
- return await func(*args, **kwargs)
- except Exception as exc:
- last_exc = exc
- if attempt < max_retries:
- delay = min(base_delay * (2**attempt), max_delay)
- jitter = delay * random.uniform(0, 0.5)
- total_delay = delay + jitter
- logger.warning(
- "Retry %d/%d for %s after error: %s (delay=%.3fs)",
- attempt + 1,
- max_retries,
- func.__name__,
- exc,
- total_delay,
- )
- await asyncio.sleep(total_delay)
- raise last_exc # type: ignore[misc]
-
- return wrapper
-
- return decorator
-
-
-# ---------------------------------------------------------------------------
-# CircuitBreaker
-# ---------------------------------------------------------------------------
-
-
-class CircuitState(enum.Enum):
- CLOSED = "closed"
- OPEN = "open"
- HALF_OPEN = "half_open"
-
-
-class CircuitBreaker:
- """Simple circuit breaker implementation."""
-
- def __init__(
- self,
- failure_threshold: int = 5,
- recovery_timeout: float = 60.0,
- ) -> None:
- self._failure_threshold = failure_threshold
- self._recovery_timeout = recovery_timeout
- self._failure_count: int = 0
- self._state = CircuitState.CLOSED
- self._opened_at: float = 0.0
-
- @property
- def state(self) -> CircuitState:
- return self._state
-
- def allow_request(self) -> bool:
- if self._state == CircuitState.CLOSED:
- return True
- if self._state == CircuitState.OPEN:
- if time.monotonic() - self._opened_at >= self._recovery_timeout:
- self._state = CircuitState.HALF_OPEN
- return True
- return False
- # HALF_OPEN
- return True
-
- def record_success(self) -> None:
- self._failure_count = 0
- self._state = CircuitState.CLOSED
-
- def record_failure(self) -> None:
- self._failure_count += 1
- if self._failure_count >= self._failure_threshold:
- self._state = CircuitState.OPEN
- self._opened_at = time.monotonic()
+"""Resilience utilities for the trading platform."""
diff --git a/shared/src/shared/sa_models.py b/shared/src/shared/sa_models.py
index 8386ba8..dc87ef5 100644
--- a/shared/src/shared/sa_models.py
+++ b/shared/src/shared/sa_models.py
@@ -3,6 +3,7 @@
from datetime import date, datetime
from decimal import Decimal

import sqlalchemy as sa
from sqlalchemy import DateTime, ForeignKey, Numeric, Text
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
@@ -52,19 +53,6 @@ class OrderRow(Base):
filled_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
-class TradeRow(Base):
- __tablename__ = "trades"
-
- id: Mapped[str] = mapped_column(Text, primary_key=True)
- order_id: Mapped[str | None] = mapped_column(Text, ForeignKey("orders.id"))
- symbol: Mapped[str] = mapped_column(Text, nullable=False)
- side: Mapped[str] = mapped_column(Text, nullable=False)
- price: Mapped[Decimal] = mapped_column(Numeric, nullable=False)
- quantity: Mapped[Decimal] = mapped_column(Numeric, nullable=False)
- fee: Mapped[Decimal] = mapped_column(Numeric, nullable=False, server_default="0")
- traded_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False)
-
-
class PositionRow(Base):
__tablename__ = "positions"
@@ -83,3 +71,63 @@ class PortfolioSnapshotRow(Base):
realized_pnl: Mapped[Decimal] = mapped_column(Numeric, nullable=False)
unrealized_pnl: Mapped[Decimal] = mapped_column(Numeric, nullable=False)
snapshot_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False)
+
+
class NewsItemRow(Base):
    """ORM row for the news_items table (one collected news item)."""

    __tablename__ = "news_items"

    id: Mapped[str] = mapped_column(Text, primary_key=True)
    source: Mapped[str] = mapped_column(Text, nullable=False)
    headline: Mapped[str] = mapped_column(Text, nullable=False)
    summary: Mapped[str | None] = mapped_column(Text)
    url: Mapped[str | None] = mapped_column(Text)
    published_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False)
    symbols: Mapped[str | None] = mapped_column(Text)  # JSON-encoded list
    sentiment: Mapped[float] = mapped_column(sa.Float, nullable=False)
    category: Mapped[str] = mapped_column(Text, nullable=False)  # NewsCategory value
    raw_data: Mapped[str | None] = mapped_column(Text)  # JSON string
    created_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True), nullable=False, server_default=sa.func.now()
    )
+
+
class SymbolScoreRow(Base):
    """ORM row for symbol_scores: aggregated sentiment per symbol (one row
    per symbol, enforced by the unique constraint)."""

    __tablename__ = "symbol_scores"

    id: Mapped[str] = mapped_column(Text, primary_key=True)
    symbol: Mapped[str] = mapped_column(Text, nullable=False, unique=True)
    news_score: Mapped[float] = mapped_column(sa.Float, nullable=False, server_default="0")
    news_count: Mapped[int] = mapped_column(sa.Integer, nullable=False, server_default="0")
    social_score: Mapped[float] = mapped_column(sa.Float, nullable=False, server_default="0")
    policy_score: Mapped[float] = mapped_column(sa.Float, nullable=False, server_default="0")
    filing_score: Mapped[float] = mapped_column(sa.Float, nullable=False, server_default="0")
    composite: Mapped[float] = mapped_column(sa.Float, nullable=False, server_default="0")
    updated_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False)
+
+
class MarketSentimentRow(Base):
    """ORM row for market_sentiment; the app keeps a single row (id="latest")."""

    __tablename__ = "market_sentiment"

    id: Mapped[str] = mapped_column(Text, primary_key=True)
    fear_greed: Mapped[int] = mapped_column(sa.Integer, nullable=False)
    fear_greed_label: Mapped[str] = mapped_column(Text, nullable=False)
    vix: Mapped[float | None] = mapped_column(sa.Float)
    fed_stance: Mapped[str] = mapped_column(Text, nullable=False, server_default="neutral")
    market_regime: Mapped[str] = mapped_column(Text, nullable=False, server_default="neutral")
    updated_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False)
+
+
class StockSelectionRow(Base):
    """ORM row for stock_selections: one LLM pick for a trading day."""

    __tablename__ = "stock_selections"

    id: Mapped[str] = mapped_column(Text, primary_key=True)
    # sa.Date columns materialize as datetime.date, so the annotation is
    # `date` (was incorrectly `datetime`, which mismatched the column type).
    trade_date: Mapped[date] = mapped_column(sa.Date, nullable=False)
    symbol: Mapped[str] = mapped_column(Text, nullable=False)
    side: Mapped[str] = mapped_column(Text, nullable=False)  # "BUY" / "SELL"
    conviction: Mapped[float] = mapped_column(sa.Float, nullable=False)
    reason: Mapped[str] = mapped_column(Text, nullable=False)
    key_news: Mapped[str | None] = mapped_column(Text)  # JSON string
    sentiment_snapshot: Mapped[str | None] = mapped_column(Text)  # JSON string
    created_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True), nullable=False, server_default=sa.func.now()
    )
diff --git a/shared/src/shared/sentiment.py b/shared/src/shared/sentiment.py
index 8213b47..5b4b0da 100644
--- a/shared/src/shared/sentiment.py
+++ b/shared/src/shared/sentiment.py
@@ -1,35 +1,105 @@
-"""Market sentiment data."""
-
-import logging
-from dataclasses import dataclass, field
-from datetime import datetime, timezone
-
-logger = logging.getLogger(__name__)
-
-
-@dataclass
-class SentimentData:
- """Aggregated sentiment snapshot."""
-
- fear_greed_value: int | None = None
- fear_greed_label: str | None = None
- news_sentiment: float | None = None
- news_count: int = 0
- exchange_netflow: float | None = None
- timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
-
- @property
- def should_buy(self) -> bool:
- if self.fear_greed_value is not None and self.fear_greed_value > 70:
- return False
- if self.news_sentiment is not None and self.news_sentiment < -0.3:
- return False
- return True
-
- @property
- def should_block(self) -> bool:
- if self.fear_greed_value is not None and self.fear_greed_value > 80:
- return True
- if self.news_sentiment is not None and self.news_sentiment < -0.5:
- return True
- return False
+"""Market sentiment aggregation."""
+
+from datetime import datetime
+
+from shared.sentiment_models import SymbolScore
+
+
+def _safe_avg(values: list[float]) -> float:
+ if not values:
+ return 0.0
+ return sum(values) / len(values)
+
+
+class SentimentAggregator:
+ """Aggregates per-news sentiment into per-symbol scores."""
+
+ WEIGHTS = {"news": 0.3, "social": 0.2, "policy": 0.3, "filing": 0.2}
+
+ CATEGORY_MAP = {
+ "earnings": "news",
+ "macro": "news",
+ "social": "social",
+ "policy": "policy",
+ "filing": "filing",
+ "fed": "policy",
+ }
+
+ def _freshness_decay(self, published_at: datetime, now: datetime) -> float:
+ age = now - published_at
+ hours = age.total_seconds() / 3600
+ if hours < 1:
+ return 1.0
+ if hours < 6:
+ return 0.7
+ if hours < 24:
+ return 0.3
+ return 0.0
+
+ def _compute_composite(
+ self,
+ news_score: float,
+ social_score: float,
+ policy_score: float,
+ filing_score: float,
+ ) -> float:
+ return (
+ news_score * self.WEIGHTS["news"]
+ + social_score * self.WEIGHTS["social"]
+ + policy_score * self.WEIGHTS["policy"]
+ + filing_score * self.WEIGHTS["filing"]
+ )
+
+ def aggregate(self, news_items: list[dict], now: datetime) -> dict[str, SymbolScore]:
+ """Aggregate news items into per-symbol scores.
+
+ Each dict needs: symbols, sentiment, category, published_at.
+ """
+ symbol_data: dict[str, dict] = {}
+
+ for item in news_items:
+ decay = self._freshness_decay(item["published_at"], now)
+ if decay == 0.0:
+ continue
+ category = item.get("category", "macro")
+ score_field = self.CATEGORY_MAP.get(category, "news")
+ weighted_sentiment = item["sentiment"] * decay
+
+ for symbol in item.get("symbols", []):
+ if symbol not in symbol_data:
+ symbol_data[symbol] = {
+ "news_scores": [],
+ "social_scores": [],
+ "policy_scores": [],
+ "filing_scores": [],
+ "count": 0,
+ }
+ symbol_data[symbol][f"{score_field}_scores"].append(weighted_sentiment)
+ symbol_data[symbol]["count"] += 1
+
+ result = {}
+ for symbol, data in symbol_data.items():
+ ns = _safe_avg(data["news_scores"])
+ ss = _safe_avg(data["social_scores"])
+ ps = _safe_avg(data["policy_scores"])
+ fs = _safe_avg(data["filing_scores"])
+ result[symbol] = SymbolScore(
+ symbol=symbol,
+ news_score=ns,
+ news_count=data["count"],
+ social_score=ss,
+ policy_score=ps,
+ filing_score=fs,
+ composite=self._compute_composite(ns, ss, ps, fs),
+ updated_at=now,
+ )
+ return result
+
+ def determine_regime(self, fear_greed: int, vix: float | None) -> str:
+ if fear_greed <= 20:
+ return "risk_off"
+ if vix is not None and vix > 30:
+ return "risk_off"
+ if fear_greed >= 60 and (vix is None or vix < 20):
+ return "risk_on"
+ return "neutral"
diff --git a/shared/src/shared/sentiment_models.py b/shared/src/shared/sentiment_models.py
new file mode 100644
index 0000000..a009601
--- /dev/null
+++ b/shared/src/shared/sentiment_models.py
@@ -0,0 +1,44 @@
+"""Sentiment scoring and stock selection models."""
+
+from datetime import datetime
+from typing import Optional
+
+from pydantic import BaseModel
+
+from shared.models import OrderSide
+
+
+class SymbolScore(BaseModel):
+ symbol: str
+ news_score: float
+ news_count: int
+ social_score: float
+ policy_score: float
+ filing_score: float
+ composite: float
+ updated_at: datetime
+
+
+class MarketSentiment(BaseModel):
+ fear_greed: int
+ fear_greed_label: str
+ vix: Optional[float] = None
+ fed_stance: str
+ market_regime: str
+ updated_at: datetime
+
+
+class SelectedStock(BaseModel):
+ symbol: str
+ side: OrderSide
+ conviction: float
+ reason: str
+ key_news: list[str]
+
+
+class Candidate(BaseModel):
+ symbol: str
+ source: str
+ direction: Optional[OrderSide] = None
+ score: float
+ reason: str
diff --git a/shared/tests/test_db_news.py b/shared/tests/test_db_news.py
new file mode 100644
index 0000000..a2c9140
--- /dev/null
+++ b/shared/tests/test_db_news.py
@@ -0,0 +1,78 @@
+"""Tests for database news/sentiment methods. Uses in-memory SQLite."""
+
+import pytest
+from datetime import datetime, date, timezone
+
+from shared.db import Database
+from shared.models import NewsItem, NewsCategory
+from shared.sentiment_models import SymbolScore, MarketSentiment
+
+
+@pytest.fixture
+async def db():
+ database = Database("sqlite+aiosqlite://")
+ await database.connect()
+ yield database
+ await database.close()
+
+
+async def test_insert_and_get_news_items(db):
+ item = NewsItem(
+ source="finnhub",
+ headline="AAPL earnings beat",
+ published_at=datetime(2026, 4, 2, 12, 0, tzinfo=timezone.utc),
+ sentiment=0.8,
+ category=NewsCategory.EARNINGS,
+ symbols=["AAPL"],
+ )
+ await db.insert_news_item(item)
+ items = await db.get_recent_news(hours=24)
+ assert len(items) == 1
+ assert items[0]["headline"] == "AAPL earnings beat"
+
+
+async def test_upsert_symbol_score(db):
+ score = SymbolScore(
+ symbol="AAPL",
+ news_score=0.5,
+ news_count=10,
+ social_score=0.3,
+ policy_score=0.0,
+ filing_score=0.2,
+ composite=0.3,
+ updated_at=datetime(2026, 4, 2, tzinfo=timezone.utc),
+ )
+ await db.upsert_symbol_score(score)
+ scores = await db.get_top_symbol_scores(limit=5)
+ assert len(scores) == 1
+ assert scores[0]["symbol"] == "AAPL"
+
+
+async def test_upsert_market_sentiment(db):
+ ms = MarketSentiment(
+ fear_greed=55,
+ fear_greed_label="Neutral",
+ vix=18.2,
+ fed_stance="neutral",
+ market_regime="neutral",
+ updated_at=datetime(2026, 4, 2, tzinfo=timezone.utc),
+ )
+ await db.upsert_market_sentiment(ms)
+ result = await db.get_latest_market_sentiment()
+ assert result is not None
+ assert result["fear_greed"] == 55
+
+
+async def test_insert_stock_selection(db):
+ await db.insert_stock_selection(
+ trade_date=date(2026, 4, 2),
+ symbol="NVDA",
+ side="BUY",
+ conviction=0.85,
+ reason="CHIPS Act",
+ key_news=["Trump signs CHIPS expansion"],
+ sentiment_snapshot={"composite": 0.8},
+ )
+ selections = await db.get_stock_selections(date(2026, 4, 2))
+ assert len(selections) == 1
+ assert selections[0]["symbol"] == "NVDA"
diff --git a/shared/tests/test_news_events.py b/shared/tests/test_news_events.py
new file mode 100644
index 0000000..384796a
--- /dev/null
+++ b/shared/tests/test_news_events.py
@@ -0,0 +1,56 @@
+"""Tests for NewsEvent."""
+
+from datetime import datetime, timezone
+
+from shared.models import NewsCategory, NewsItem
+from shared.events import NewsEvent, EventType, Event
+
+
+def test_news_event_to_dict():
+ item = NewsItem(
+ source="finnhub",
+ headline="Test",
+ published_at=datetime(2026, 4, 2, tzinfo=timezone.utc),
+ sentiment=0.5,
+ category=NewsCategory.MACRO,
+ )
+ event = NewsEvent(data=item)
+ d = event.to_dict()
+ assert d["type"] == EventType.NEWS
+ assert d["data"]["source"] == "finnhub"
+
+
+def test_news_event_from_raw():
+ raw = {
+ "type": "NEWS",
+ "data": {
+ "id": "abc",
+ "source": "rss",
+ "headline": "Test headline",
+ "published_at": "2026-04-02T00:00:00+00:00",
+ "sentiment": 0.3,
+ "category": "earnings",
+ "symbols": ["AAPL"],
+ "raw_data": {},
+ },
+ }
+ event = NewsEvent.from_raw(raw)
+ assert event.data.source == "rss"
+ assert event.data.symbols == ["AAPL"]
+
+
+def test_event_dispatcher_news():
+ raw = {
+ "type": "NEWS",
+ "data": {
+ "id": "abc",
+ "source": "finnhub",
+ "headline": "Test",
+ "published_at": "2026-04-02T00:00:00+00:00",
+ "sentiment": 0.0,
+ "category": "macro",
+ "raw_data": {},
+ },
+ }
+ event = Event.from_dict(raw)
+ assert isinstance(event, NewsEvent)
diff --git a/shared/tests/test_resilience.py b/shared/tests/test_resilience.py
deleted file mode 100644
index e287777..0000000
--- a/shared/tests/test_resilience.py
+++ /dev/null
@@ -1,139 +0,0 @@
-"""Tests for retry with backoff and circuit breaker."""
-
-import time
-
-import pytest
-
-from shared.resilience import CircuitBreaker, CircuitState, retry_with_backoff
-
-
-# ---------------------------------------------------------------------------
-# retry_with_backoff tests
-# ---------------------------------------------------------------------------
-
-
-@pytest.mark.asyncio
-async def test_retry_succeeds_first_try():
- call_count = 0
-
- @retry_with_backoff(max_retries=3, base_delay=0.01)
- async def succeed():
- nonlocal call_count
- call_count += 1
- return "ok"
-
- result = await succeed()
- assert result == "ok"
- assert call_count == 1
-
-
-@pytest.mark.asyncio
-async def test_retry_succeeds_after_failures():
- call_count = 0
-
- @retry_with_backoff(max_retries=3, base_delay=0.01)
- async def flaky():
- nonlocal call_count
- call_count += 1
- if call_count < 3:
- raise ValueError("not yet")
- return "recovered"
-
- result = await flaky()
- assert result == "recovered"
- assert call_count == 3
-
-
-@pytest.mark.asyncio
-async def test_retry_raises_after_max_retries():
- call_count = 0
-
- @retry_with_backoff(max_retries=3, base_delay=0.01)
- async def always_fail():
- nonlocal call_count
- call_count += 1
- raise RuntimeError("permanent")
-
- with pytest.raises(RuntimeError, match="permanent"):
- await always_fail()
- # 1 initial + 3 retries = 4 calls
- assert call_count == 4
-
-
-@pytest.mark.asyncio
-async def test_retry_respects_max_delay():
- """Backoff should be capped at max_delay."""
-
- @retry_with_backoff(max_retries=2, base_delay=0.01, max_delay=0.02)
- async def always_fail():
- raise RuntimeError("fail")
-
- start = time.monotonic()
- with pytest.raises(RuntimeError):
- await always_fail()
- elapsed = time.monotonic() - start
- # With max_delay=0.02 and 2 retries, total delay should be small
- assert elapsed < 0.5
-
-
-# ---------------------------------------------------------------------------
-# CircuitBreaker tests
-# ---------------------------------------------------------------------------
-
-
-def test_circuit_starts_closed():
- cb = CircuitBreaker(failure_threshold=3, recovery_timeout=0.05)
- assert cb.state == CircuitState.CLOSED
- assert cb.allow_request() is True
-
-
-def test_circuit_opens_after_threshold():
- cb = CircuitBreaker(failure_threshold=3, recovery_timeout=60.0)
- for _ in range(3):
- cb.record_failure()
- assert cb.state == CircuitState.OPEN
- assert cb.allow_request() is False
-
-
-def test_circuit_rejects_when_open():
- cb = CircuitBreaker(failure_threshold=2, recovery_timeout=60.0)
- cb.record_failure()
- cb.record_failure()
- assert cb.state == CircuitState.OPEN
- assert cb.allow_request() is False
-
-
-def test_circuit_half_open_after_timeout():
- cb = CircuitBreaker(failure_threshold=2, recovery_timeout=0.05)
- cb.record_failure()
- cb.record_failure()
- assert cb.state == CircuitState.OPEN
-
- time.sleep(0.06)
- assert cb.allow_request() is True
- assert cb.state == CircuitState.HALF_OPEN
-
-
-def test_circuit_closes_on_success():
- cb = CircuitBreaker(failure_threshold=2, recovery_timeout=0.05)
- cb.record_failure()
- cb.record_failure()
- assert cb.state == CircuitState.OPEN
-
- time.sleep(0.06)
- cb.allow_request() # triggers HALF_OPEN
- assert cb.state == CircuitState.HALF_OPEN
-
- cb.record_success()
- assert cb.state == CircuitState.CLOSED
- assert cb.allow_request() is True
-
-
-def test_circuit_reopens_on_failure_in_half_open():
- cb = CircuitBreaker(failure_threshold=2, recovery_timeout=0.05)
- cb.record_failure()
- cb.record_failure()
- time.sleep(0.06)
- cb.allow_request() # HALF_OPEN
- cb.record_failure()
- assert cb.state == CircuitState.OPEN
diff --git a/shared/tests/test_sa_models.py b/shared/tests/test_sa_models.py
index 67c3c82..ae73833 100644
--- a/shared/tests/test_sa_models.py
+++ b/shared/tests/test_sa_models.py
@@ -11,9 +11,12 @@ def test_base_metadata_has_all_tables():
"candles",
"signals",
"orders",
- "trades",
"positions",
"portfolio_snapshots",
+ "news_items",
+ "symbol_scores",
+ "market_sentiment",
+ "stock_selections",
}
assert expected == table_names
@@ -120,44 +123,6 @@ class TestOrderRow:
assert fk_cols == {"signal_id": "signals.id"}
-class TestTradeRow:
- def test_table_name(self):
- from shared.sa_models import TradeRow
-
- assert TradeRow.__tablename__ == "trades"
-
- def test_columns(self):
- from shared.sa_models import TradeRow
-
- mapper = inspect(TradeRow)
- cols = {c.key for c in mapper.column_attrs}
- expected = {
- "id",
- "order_id",
- "symbol",
- "side",
- "price",
- "quantity",
- "fee",
- "traded_at",
- }
- assert expected == cols
-
- def test_primary_key(self):
- from shared.sa_models import TradeRow
-
- mapper = inspect(TradeRow)
- pk_cols = [c.name for c in mapper.mapper.primary_key]
- assert pk_cols == ["id"]
-
- def test_order_id_foreign_key(self):
- from shared.sa_models import TradeRow
-
- table = TradeRow.__table__
- fk_cols = {fk.parent.name: fk.target_fullname for fk in table.foreign_keys}
- assert fk_cols == {"order_id": "orders.id"}
-
-
class TestPositionRow:
def test_table_name(self):
from shared.sa_models import PositionRow
@@ -229,11 +194,3 @@ class TestStatusDefault:
status_col = table.c.status
assert status_col.server_default is not None
assert status_col.server_default.arg == "PENDING"
-
- def test_trade_fee_server_default(self):
- from shared.sa_models import TradeRow
-
- table = TradeRow.__table__
- fee_col = table.c.fee
- assert fee_col.server_default is not None
- assert fee_col.server_default.arg == "0"
diff --git a/shared/tests/test_sa_news_models.py b/shared/tests/test_sa_news_models.py
new file mode 100644
index 0000000..91e6d4a
--- /dev/null
+++ b/shared/tests/test_sa_news_models.py
@@ -0,0 +1,29 @@
+"""Tests for news-related SQLAlchemy models."""
+
+from shared.sa_models import NewsItemRow, SymbolScoreRow, MarketSentimentRow, StockSelectionRow
+
+
+def test_news_item_row_tablename():
+ assert NewsItemRow.__tablename__ == "news_items"
+
+
+def test_symbol_score_row_tablename():
+ assert SymbolScoreRow.__tablename__ == "symbol_scores"
+
+
+def test_market_sentiment_row_tablename():
+ assert MarketSentimentRow.__tablename__ == "market_sentiment"
+
+
+def test_stock_selection_row_tablename():
+ assert StockSelectionRow.__tablename__ == "stock_selections"
+
+
+def test_news_item_row_columns():
+ cols = {c.name for c in NewsItemRow.__table__.columns}
+ assert cols >= {"id", "source", "headline", "published_at", "sentiment", "category"}
+
+
+def test_symbol_score_row_columns():
+ cols = {c.name for c in SymbolScoreRow.__table__.columns}
+ assert cols >= {"id", "symbol", "news_score", "composite", "updated_at"}
diff --git a/shared/tests/test_sentiment.py b/shared/tests/test_sentiment.py
deleted file mode 100644
index 9bd8ea3..0000000
--- a/shared/tests/test_sentiment.py
+++ /dev/null
@@ -1,44 +0,0 @@
-"""Tests for market sentiment module."""
-
-from shared.sentiment import SentimentData
-
-
-def test_sentiment_should_buy_default_no_data():
- s = SentimentData()
- assert s.should_buy is True
- assert s.should_block is False
-
-
-def test_sentiment_should_buy_low_fear_greed():
- s = SentimentData(fear_greed_value=15)
- assert s.should_buy is True
-
-
-def test_sentiment_should_not_buy_on_greed():
- s = SentimentData(fear_greed_value=75)
- assert s.should_buy is False
-
-
-def test_sentiment_should_not_buy_negative_news():
- s = SentimentData(news_sentiment=-0.4)
- assert s.should_buy is False
-
-
-def test_sentiment_should_buy_positive_news():
- s = SentimentData(fear_greed_value=50, news_sentiment=0.3)
- assert s.should_buy is True
-
-
-def test_sentiment_should_block_extreme_greed():
- s = SentimentData(fear_greed_value=85)
- assert s.should_block is True
-
-
-def test_sentiment_should_block_very_negative_news():
- s = SentimentData(news_sentiment=-0.6)
- assert s.should_block is True
-
-
-def test_sentiment_no_block_on_neutral():
- s = SentimentData(fear_greed_value=50, news_sentiment=0.0)
- assert s.should_block is False
diff --git a/shared/tests/test_sentiment_aggregator.py b/shared/tests/test_sentiment_aggregator.py
new file mode 100644
index 0000000..a99c711
--- /dev/null
+++ b/shared/tests/test_sentiment_aggregator.py
@@ -0,0 +1,77 @@
+"""Tests for sentiment aggregator."""
+
+import pytest
+from datetime import datetime, timezone, timedelta
+from shared.sentiment import SentimentAggregator
+
+
+@pytest.fixture
+def aggregator():
+ return SentimentAggregator()
+
+
+def test_freshness_decay_recent():
+ a = SentimentAggregator()
+ now = datetime.now(timezone.utc)
+ assert a._freshness_decay(now, now) == 1.0
+
+
+def test_freshness_decay_3_hours():
+ a = SentimentAggregator()
+ now = datetime.now(timezone.utc)
+ assert a._freshness_decay(now - timedelta(hours=3), now) == 0.7
+
+
+def test_freshness_decay_12_hours():
+ a = SentimentAggregator()
+ now = datetime.now(timezone.utc)
+ assert a._freshness_decay(now - timedelta(hours=12), now) == 0.3
+
+
+def test_freshness_decay_old():
+ a = SentimentAggregator()
+ now = datetime.now(timezone.utc)
+ assert a._freshness_decay(now - timedelta(days=2), now) == 0.0
+
+
+def test_compute_composite():
+ a = SentimentAggregator()
+ composite = a._compute_composite(
+ news_score=0.5, social_score=0.3, policy_score=0.8, filing_score=0.2
+ )
+ expected = 0.5 * 0.3 + 0.3 * 0.2 + 0.8 * 0.3 + 0.2 * 0.2
+ assert abs(composite - expected) < 0.001
+
+
+def test_aggregate_news_by_symbol(aggregator):
+ now = datetime.now(timezone.utc)
+ news_items = [
+ {"symbols": ["AAPL"], "sentiment": 0.8, "category": "earnings", "published_at": now},
+ {
+ "symbols": ["AAPL"],
+ "sentiment": 0.3,
+ "category": "macro",
+ "published_at": now - timedelta(hours=2),
+ },
+ {"symbols": ["MSFT"], "sentiment": -0.5, "category": "policy", "published_at": now},
+ ]
+ scores = aggregator.aggregate(news_items, now)
+ assert "AAPL" in scores
+ assert "MSFT" in scores
+ assert scores["AAPL"].news_count == 2
+ assert scores["AAPL"].news_score > 0
+ assert scores["MSFT"].policy_score < 0
+
+
+def test_aggregate_empty(aggregator):
+ now = datetime.now(timezone.utc)
+ assert aggregator.aggregate([], now) == {}
+
+
+def test_determine_regime():
+ a = SentimentAggregator()
+ assert a.determine_regime(15, None) == "risk_off"
+ assert a.determine_regime(15, 35.0) == "risk_off"
+ assert a.determine_regime(50, 35.0) == "risk_off"
+ assert a.determine_regime(70, 15.0) == "risk_on"
+ assert a.determine_regime(50, 20.0) == "neutral"
diff --git a/shared/tests/test_sentiment_models.py b/shared/tests/test_sentiment_models.py
new file mode 100644
index 0000000..25fc371
--- /dev/null
+++ b/shared/tests/test_sentiment_models.py
@@ -0,0 +1,113 @@
+"""Tests for news and sentiment models."""
+
+from datetime import datetime, timezone
+
+from shared.models import NewsCategory, NewsItem, OrderSide
+from shared.sentiment_models import SymbolScore, MarketSentiment, SelectedStock, Candidate
+
+
+def test_news_item_defaults():
+ item = NewsItem(
+ source="finnhub",
+ headline="Test headline",
+ published_at=datetime(2026, 4, 2, tzinfo=timezone.utc),
+ sentiment=0.5,
+ category=NewsCategory.MACRO,
+ )
+ assert item.id
+ assert item.symbols == []
+ assert item.summary is None
+ assert item.raw_data == {}
+ assert item.created_at is not None
+
+
+def test_news_item_with_symbols():
+ item = NewsItem(
+ source="rss",
+ headline="AAPL earnings beat",
+ published_at=datetime(2026, 4, 2, tzinfo=timezone.utc),
+ sentiment=0.8,
+ category=NewsCategory.EARNINGS,
+ symbols=["AAPL"],
+ )
+ assert item.symbols == ["AAPL"]
+ assert item.category == NewsCategory.EARNINGS
+
+
+def test_news_category_values():
+ assert NewsCategory.POLICY == "policy"
+ assert NewsCategory.EARNINGS == "earnings"
+ assert NewsCategory.MACRO == "macro"
+ assert NewsCategory.SOCIAL == "social"
+ assert NewsCategory.FILING == "filing"
+ assert NewsCategory.FED == "fed"
+
+
+def test_symbol_score():
+ score = SymbolScore(
+ symbol="AAPL",
+ news_score=0.5,
+ news_count=10,
+ social_score=0.3,
+ policy_score=0.0,
+ filing_score=0.2,
+ composite=0.3,
+ updated_at=datetime(2026, 4, 2, tzinfo=timezone.utc),
+ )
+ assert score.symbol == "AAPL"
+ assert score.composite == 0.3
+
+
+def test_market_sentiment():
+ ms = MarketSentiment(
+ fear_greed=25,
+ fear_greed_label="Extreme Fear",
+ vix=32.5,
+ fed_stance="hawkish",
+ market_regime="risk_off",
+ updated_at=datetime(2026, 4, 2, tzinfo=timezone.utc),
+ )
+ assert ms.market_regime == "risk_off"
+ assert ms.vix == 32.5
+
+
+def test_market_sentiment_no_vix():
+ ms = MarketSentiment(
+ fear_greed=50,
+ fear_greed_label="Neutral",
+ fed_stance="neutral",
+ market_regime="neutral",
+ updated_at=datetime(2026, 4, 2, tzinfo=timezone.utc),
+ )
+ assert ms.vix is None
+
+
+def test_selected_stock():
+ ss = SelectedStock(
+ symbol="NVDA",
+ side=OrderSide.BUY,
+ conviction=0.85,
+ reason="CHIPS Act expansion",
+ key_news=["Trump signs CHIPS Act expansion"],
+ )
+ assert ss.conviction == 0.85
+ assert len(ss.key_news) == 1
+
+
+def test_candidate():
+ c = Candidate(
+ symbol="TSLA",
+ source="sentiment",
+ direction=OrderSide.BUY,
+ score=0.75,
+ reason="High social buzz",
+ )
+ assert c.direction == OrderSide.BUY
+
+ c2 = Candidate(
+ symbol="XOM",
+ source="llm",
+ score=0.6,
+ reason="Oil price surge",
+ )
+ assert c2.direction is None