Diffstat (limited to 'docs')
-rw-r--r--  docs/TODO.md                                                            296
-rw-r--r--  docs/superpowers/plans/2026-04-02-news-driven-stock-selector.md        3689
-rw-r--r--  docs/superpowers/specs/2026-04-02-news-driven-stock-selector-design.md  418
3 files changed, 4228 insertions, 175 deletions
diff --git a/docs/TODO.md b/docs/TODO.md
index d737b0f..fd4f7eb 100644
--- a/docs/TODO.md
+++ b/docs/TODO.md
@@ -1,201 +1,147 @@
-# Trading Platform — TODO
+# US Stock Trading Platform — TODO
-> Last updated: 2026-04-01
+> Last updated: 2026-04-02
## Current State
-- **298 tests**, lint clean, production-ready infrastructure
-- 8 strategies, 6 services, full monitoring/CI/CD stack
-- **Trading strategy upgrade needed** (details below)
+- **375 tests**, lint clean
+- **US equities only** (Alpaca API, 0% commissions)
+- 6 microservices + CLI + shared library
+- MOC (Market on Close) strategy + 7 technical strategies
+- Prometheus/Grafana/Loki monitoring, CI/CD, Telegram alerts
+- Claude-based stock screener
---
-## Trading Strategy Upgrade Plan
-
-### Phase 1: Make the Backtester Realistic (Top Priority)
-
-Current backtest results exclude slippage and fees, so they **differ significantly from real returns**. Fix this first so the effect of strategy improvements can be measured accurately.
-
-#### 1-1. Slippage + Fee Modeling
-- **File:** `backtester/simulator.py`
-- **Current:** Fills at the signal price, zero fees
-- **Change:** Fill buys at `price * (1 + slippage_pct)`, sells at `price * (1 - slippage_pct)`
-- Fees: `cost = price * quantity * fee_pct` (maker: 0.05%, taker: 0.1%)
-- Scale slippage with order size (larger orders → larger slippage)
-- **Config:** Add `slippage_pct`, `maker_fee_pct`, `taker_fee_pct` to `BacktestConfig`
-
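The fill-price and fee rules in 1-1 can be sketched as standalone helpers (names and signatures are illustrative assumptions, not the actual `simulator.py` API):

```python
def fill_price(price: float, side: str, slippage_pct: float) -> float:
    """Adverse slippage: buys fill above the signal price, sells below it."""
    if side == "BUY":
        return price * (1 + slippage_pct)
    return price * (1 - slippage_pct)


def trade_fee(price: float, quantity: float, fee_pct: float) -> float:
    """Commission: cost = price * quantity * fee_pct (e.g. 0.001 for a 0.1% taker fee)."""
    return price * quantity * fee_pct
```

With 0.1% slippage, a buy signaled at $100 fills around $100.10 and a sell around $99.90; the taker fee on 2 shares at $100 is about $0.20.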
-#### 1-2. Automatic Stop-Loss / Take-Profit Execution
-- **File:** `backtester/simulator.py`
-- **Current:** Trades only on signals, no stops
-- **Change:** Track stop_loss and take_profit prices per position
-- On every candle, check `high >= take_profit` or `low <= stop_loss` → liquidate automatically
-- Pass the current price to the simulator when `engine.py` processes each candle
-
-#### 1-3. Short Selling Support
-- **File:** `backtester/simulator.py`
-- **Current:** Sells are limited to the held quantity
-- **Change:** Add an `allow_short: bool` setting; allow negative positions when shorting
-- Add a short-selling borrow fee
-
-#### 1-4. Walk-Forward Analysis
-- **File:** `backtester/engine.py` (new class)
-- **Current:** Backtests on the full dataset → overfitting risk
-- **Change:** Add a `WalkForwardEngine` class
-  - Split the data into N windows
-  - Per window: in-sample (parameter optimization) → out-of-sample (validation)
-  - Aggregate only the out-of-sample windows into the final result
-- Parameter optimization: grid search or random search
-
-#### 1-5. Metric Accuracy Improvements
-- **File:** `backtester/metrics.py`
-- Compute Sharpe/Sortino from **daily returns**, not per-trade results
-- Add a risk-free rate setting (default 5%)
-- Add Recovery Factor (total return / max drawdown)
-- Add max consecutive-loss count
-- Compute intra-trade drawdown (lowest point after entry)
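The daily-return Sharpe described in 1-5 can be sketched as follows (a minimal illustration; the function name and the 252-trading-day annualization are assumptions):

```python
import math

TRADING_DAYS = 252  # assumption: US equity trading days per year


def sharpe_from_daily_returns(daily_returns: list[float],
                              risk_free_annual: float = 0.05) -> float:
    """Annualized Sharpe computed from daily returns, not per-trade results."""
    rf_daily = risk_free_annual / TRADING_DAYS
    excess = [r - rf_daily for r in daily_returns]
    mean = sum(excess) / len(excess)
    var = sum((x - mean) ** 2 for x in excess) / (len(excess) - 1)
    std = math.sqrt(var)
    if std == 0.0:
        return 0.0  # flat return series: Sharpe undefined, report 0
    return mean / std * math.sqrt(TRADING_DAYS)
```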
+## Architecture
----
+```
+Alpaca API → data-collector (REST polling)
+ → Redis Streams → strategy-engine (MOC + technical strategies)
+ → signals → order-executor (Alpaca orders)
+ → orders → portfolio-manager (position tracking)
-### Phase 2: Shared Strategy Infrastructure
-
-Common features to apply across all strategies. Build this infrastructure before improving individual strategies.
-
-#### 2-1. ATR-Based Dynamic Stop-Loss / Take-Profit
-- **File:** `strategies/base.py` + each strategy
-- **Current:** No strategy sets a stop-loss or take-profit
-- **Change:** Add a `calculate_stop_loss(candle, atr)` method to `BaseStrategy`
-- ATR (Average True Range) utility function (in `shared/` or `strategies/indicators/`)
-- Stop-loss: entry - ATR * multiplier; take-profit: entry + ATR * reward_ratio
-- Add `stop_loss`, `take_profit` fields to Signal (`shared/models.py`)
-
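The stop formulas in 2-1 amount to a one-liner; a sketch for the long side (hypothetical helper, not the proposed `BaseStrategy` method itself):

```python
def atr_stops(entry: float, atr: float,
              multiplier: float = 2.0, reward_ratio: float = 3.0) -> tuple[float, float]:
    """Long-side stops: stop-loss at entry - ATR * multiplier,
    take-profit at entry + ATR * reward_ratio (default ratios are assumptions)."""
    return entry - atr * multiplier, entry + atr * reward_ratio
```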
-#### 2-2. Trend/Range Regime Filter (ADX)
-- **File:** `strategies/indicators/adx.py` (new)
-- ADX (Average Directional Index) calculation utility
-- ADX > 25 = trending market, ADX < 20 = ranging market
-- Each strategy changes behavior based on the regime:
-  - Trend-following strategies (MACD, EMA): ignore signals when ADX < 20
-  - Mean-reversion strategies (RSI, Bollinger, Grid): ignore signals when ADX > 30
-
-#### 2-3. Volume Confirmation Filter
-- **File:** each strategy
-- **Current:** Every strategy ignores volume (RSI, MACD, EMA, etc.)
-- **Change:** When a signal fires, check that the candle's volume is at least a given ratio of the recent N-candle average
-- If volume < 50% of the average, ignore the signal (insufficient liquidity)
-- If volume > 200% of the average, increase the signal's weight
-
-#### 2-4. Signal Strength (Conviction Score)
-- **File:** `shared/models.py` Signal + each strategy
-- **Current:** Signals carry only BUY/SELL and quantity
-- **Change:** Add a `conviction: float` (0.0~1.0) field to Signal
-  - RSI 5 → conviction 0.9, RSI 28 → conviction 0.3
-  - MACD histogram crossing far from zero → higher conviction
-- Use conviction-based weighting in the Combined strategy
-- Conviction-based position sizing in RiskManager
-
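One way to derive such a conviction score for RSI buys, tuned to reproduce the two example points above (RSI 5 → 0.9, RSI 28 → 0.3); the linear ramp and its coefficients are illustrative assumptions:

```python
def rsi_buy_conviction(rsi: float, oversold: float = 30.0) -> float:
    """Deeper oversold readings produce stronger conviction in [0.0, 1.0]."""
    if rsi >= oversold:
        return 0.0  # no buy conviction above the oversold threshold
    depth = (oversold - rsi) / oversold       # 0.0 at threshold, 1.0 at RSI 0
    return round(min(1.0, 0.25 + 0.78 * depth), 2)
```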
-#### 2-5. Indicator Library
-- **Directory:** `strategies/indicators/` (new)
-- Reusable technical indicator functions:
-  - `atr(highs, lows, closes, period)` — Average True Range
-  - `adx(highs, lows, closes, period)` — Average Directional Index
-  - `ema(series, period)` — Exponential Moving Average
-  - `sma(series, period)` — Simple Moving Average
-  - `rsi(closes, period)` — RSI
-  - `bollinger_bands(closes, period, num_std)` — Bollinger
-  - `macd(closes, fast, slow, signal)` — MACD
-  - `volume_sma(volumes, period)` — Volume SMA
-- Strategies use this shared library instead of computing indicators inline
-- Tests: a unit test for each indicator
+Claude API → stock_screener.py (stock analysis/recommendations)
+FastAPI → REST API (/api/v1/portfolio, orders, strategies)
+```
----
+## Core Trading Strategy: MOC (Market on Close)
-### Phase 3: Individual Strategy Enhancements
-
-Upgrade each strategy to a professional level after Phases 1-2 are complete.
-
-#### 3-1. RSI Strategy Improvements
-- [ ] RSI divergence detection (price makes a new high while RSI falls = bearish divergence)
-- [ ] Apply the ADX regime filter (ignore RSI buy signals in trending markets)
-- [ ] Conviction score by RSI depth (RSI 5 vs RSI 28)
-- [ ] ATR-based stop-loss/take-profit
-- [ ] Volume confirmation filter
-
-#### 3-2. MACD Strategy Improvements
-- [ ] Distinguish histogram crossovers from MACD zero-line crossovers
-- [ ] MACD divergence detection
-- [ ] ADX trend confirmation (ignore signals when ADX < 20)
-- [ ] Signal strength based on distance from the zero line
-- [ ] ATR-based stop-loss
-
-#### 3-3. Grid Strategy Improvements
-- [ ] ADX-based regime filter (block entries in trending markets)
-- [ ] Dynamic grid resets (adjust the range from realized volatility)
-- [ ] Liquidate all positions + alert when price leaves the grid
-- [ ] Non-uniform grid spacing based on the volume profile
-
-#### 3-4. Bollinger Bands Strategy Improvements
-- [ ] Squeeze detection (band compression → prepare for breakout)
-- [ ] Use the %B indicator (position within the bands, 0~1)
-- [ ] RSI confirmation (lower-band touch + RSI < 30 = strong buy)
-- [ ] Volume spike confirmation
-
-#### 3-5. EMA Crossover Strategy Improvements
-- [ ] ADX > 25 filter (enter only in strong trends)
-- [ ] Pullback entries (enter on the retrace to the short EMA after a cross)
-- [ ] Above/below 50 SMA filter (confirm long-term trend direction)
-- [ ] Volume confirmation
-
-#### 3-6. VWAP Strategy Improvements
-- [ ] Intraday reset (recompute VWAP daily at 00:00 UTC)
-- [ ] Add VWAP standard-deviation bands (1σ, 2σ)
-- [ ] ATR-based deviation threshold (volatility-adaptive instead of fixed)
-- [ ] Session filter (block entries during low-liquidity hours)
-
-#### 3-7. Volume Profile Strategy Improvements
-- [ ] Identify HVN/LVN (high/low volume nodes)
-- [ ] Session-based profile resets
-- [ ] Use the POC as dynamic support/resistance
-- [ ] Track volume delta (buy volume - sell volume)
-
-#### 3-8. Combined Strategy Improvements
-- [ ] Incorporate sub-strategy conviction scores
-- [ ] Compute a correlation matrix across sub-strategies → dampen duplicate signals
-- [ ] Adaptive weights (dynamic adjustment based on recent win rate)
-- [ ] Portfolio concentration limits
+```
+[Daily, 15:50 ET] Claude stock analysis → select buy candidates
+[15:50~16:00 ET] Buy just before the close (MOC orders)
+[Next day, 9:35~10:00 ET] Sell at the open
+
+Conditions: bullish candle + volume > average + RSI 30~60 + above EMA + healthy momentum
+Stop-loss: -2%; 20% of capital per position; max 5 symbols
+```
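The entry conditions above can be sketched as a single boolean filter (a minimal illustration; the function name and arguments are assumptions, and the momentum check is omitted):

```python
def moc_entry_ok(open_: float, close: float, volume: float, avg_volume: float,
                 rsi: float, ema: float) -> bool:
    """MOC entry filter: bullish candle, above-average volume,
    RSI between 30 and 60, and price above its EMA."""
    return (
        close > open_               # bullish candle
        and volume > avg_volume     # volume above its recent average
        and 30.0 <= rsi <= 60.0     # neither oversold nor overbought
        and close > ema             # trading above the trend EMA
    )
```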
---
-### Phase 4: Advanced Risk Management
+## Remaining Work
+
+### Immediate Tasks
-#### 4-1. Portfolio-Level Risk
-- [ ] Limit total exposure (total position value / balance ratio)
-- [ ] Compute correlation between positions → effective risk
-- [ ] VaR (Value at Risk) calculation (95% confidence interval)
+#### 1. MOC Backtest Script
+- [ ] `scripts/backtest_moc.py`: optimize MOC strategy parameters on synthetic data
+- [ ] 90-day backtest over 5 stocks (AAPL, MSFT, TSLA, NVDA, AMZN)
+- [ ] Grid search over RSI range, stop-loss percentage, and EMA period
+- [ ] Makefile target: `make backtest-moc`
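The grid search over RSI range, stop-loss, and EMA period can be sketched as an exhaustive sweep (`run_backtest` is a hypothetical stand-in for the real runner, assumed to return a single performance number):

```python
import itertools


def grid_search(run_backtest, rsi_lows, stop_pcts, ema_periods):
    """Try every parameter combination and keep the best-scoring one."""
    best_params, best_score = None, float("-inf")
    for rsi_low, stop_pct, ema_period in itertools.product(rsi_lows, stop_pcts, ema_periods):
        score = run_backtest(rsi_low=rsi_low, stop_pct=stop_pct, ema_period=ema_period)
        if score > best_score:
            best_params, best_score = (rsi_low, stop_pct, ema_period), score
    return best_params, best_score
```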
-#### 4-2. Dynamic Position Reduction
-- [ ] Automatically shrink position sizes once drawdown exceeds a threshold
-- [ ] Pause trading after N consecutive losses → Telegram alert
-- [ ] Time-based risk adjustment (reduce over weekends and holidays)
+#### 2. Real-Data Backtest
+- [ ] Download historical data via the Alpaca API → store in the DB
+- [ ] Validate the MOC strategy on real data
+- [ ] Check for overfitting with walk-forward analysis
-#### 4-3. Scenario Analysis
-- [ ] Simulate portfolio impact of past extreme events (FTX collapse, Luna, etc.)
-- [ ] Liquidity risk check (order size vs. order-book depth)
+#### 3. Paper Trading Deployment
+- [ ] Configure `.env` (ALPACA_PAPER=true)
+- [ ] `make up` → run all services
+- [ ] 2-4 weeks of paper trading → verify real performance
+- [ ] Receive daily results via Telegram alerts
---
-## Priority & Effort
+### Improvements
-| Phase | Scope | Estimated Effort | Impact |
-|-------|-------|------------------|--------|
-| **Phase 1** | Backtester realism | 1-2 days | **Highest**: strategies cannot be evaluated without it |
-| **Phase 2** | Shared strategy infrastructure | 1-2 days | **High**: foundation for every strategy |
-| **Phase 3** | Individual strategy enhancements | 3-5 days | Medium: direct return improvement |
-| **Phase 4** | Advanced risk management | 2-3 days | High: loss prevention |
+#### 4. Claude Screener Enhancements
+- [ ] SEC filing analysis (10-K, 10-Q, 8-K)
+- [ ] Earnings surprise detection (EPS beat/miss)
+- [ ] Sector rotation analysis
+- [ ] News sentiment analysis (Yahoo Finance, MarketWatch)
-**Recommended order: Phase 1 → Phase 2 → Phase 4 → Phase 3**
-(Risk management matters more than strategy improvement: not losing money comes before making it.)
+#### 5. Expanded Order Types
+- [ ] Limit order support
+- [ ] Pre-market/after-hours orders
+- [ ] Scaled (split) buys and sells
+
+#### 6. Additional Strategies
+- [ ] ORB (Opening Range Breakout): first 30 minutes of the session
+- [ ] Gap & Go: chase gap-up stocks
+- [ ] Earnings Play: trade around earnings announcements
+
+#### 7. Risk Management Improvements
+- [ ] Sector concentration limit (no 3 or more positions in the same sector)
+- [ ] Avoid trading on earnings announcement days
+- [ ] Halt trading during broad market declines (based on SPY RSI)
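The two pre-trade checks above can be sketched together (the SPY RSI threshold of 30 and the per-sector cap of 2 are assumed values for illustration):

```python
from collections import Counter


def allow_new_position(open_sectors: list[str], new_sector: str, spy_rsi: float,
                       max_per_sector: int = 2, spy_rsi_halt: float = 30.0) -> bool:
    """Halt all new entries when SPY's RSI signals a broad market decline,
    and block a third position in any one sector."""
    if spy_rsi < spy_rsi_halt:       # market-wide sell-off: stop trading
        return False
    held = Counter(open_sectors)[new_sector]
    return held < max_per_sector     # enforce "no 3+ in the same sector"
```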
---
-## Previously Completed (Infrastructure)
+## Quick Start
+
+```bash
+# 1. Environment setup
+cp .env.example .env
+# Fill in ALPACA_API_KEY, ALPACA_API_SECRET
+# Fill in ANTHROPIC_API_KEY (for the Claude screener)
+
+# 2. Install dependencies
+pip install -e shared/
+
+# 3. Start infrastructure
+make infra # Redis + PostgreSQL
+make migrate # DB migrations
+
+# 4. Tests
+make test # 375 tests
+
+# 5. Stock screening
+make screen # Claude analyzes 33 symbols → recommends the Top 5
+
+# 6. Run services
+make up # start all services (paper trading)
+
+# 7. Monitoring
+docker compose --profile monitoring up -d
+# Grafana: http://localhost:3000
+# API: http://localhost:8000/api/v1/strategies
+
+# 8. CLI
+trading strategy list
+trading backtest run --strategy moc --symbol AAPL --timeframe 1Day
+trading portfolio show
+```
+
+---
-All infrastructure items complete (27): SQLAlchemy, Alembic, structlog, Telegram, Prometheus/Grafana/Loki, CI/CD, FastAPI, multi-exchange, Redis consumer groups, realized PnL, bearer auth, 298 tests.
+## Completed (Infrastructure)
+
+- [x] Alpaca API client (paper + live)
+- [x] MOC strategy (buy at the close / sell at the open)
+- [x] Claude stock screener (33-symbol universe)
+- [x] Data collector (Alpaca REST polling)
+- [x] Order executor (Alpaca submit_order)
+- [x] SQLAlchemy ORM + Alembic migrations
+- [x] structlog structured logging
+- [x] Telegram alerts
+- [x] Retry + Circuit Breaker
+- [x] Prometheus + Grafana + Loki
+- [x] Redis consumer groups
+- [x] Portfolio snapshots + realized PnL
+- [x] Bearer token auth
+- [x] CI/CD (Gitea Actions)
+- [x] E2E test script
+- [x] FastAPI REST API
+- [x] Backtester (slippage, fees, SL/TP, short selling, walk-forward)
+- [x] Technical indicator library (ATR, ADX, RSI, MACD, Bollinger, Stochastic, OBV)
+- [x] Portfolio VaR, correlation, drawdown-based risk
+- [x] 375 tests, lint clean
diff --git a/docs/superpowers/plans/2026-04-02-news-driven-stock-selector.md b/docs/superpowers/plans/2026-04-02-news-driven-stock-selector.md
new file mode 100644
index 0000000..0964f21
--- /dev/null
+++ b/docs/superpowers/plans/2026-04-02-news-driven-stock-selector.md
@@ -0,0 +1,3689 @@
+# News-Driven Stock Selector Implementation Plan
+
+> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.
+
+**Goal:** Replace the MOC strategy's fixed symbol list with a dynamic, news-driven stock selection system that continuously collects news/sentiment data and selects 2-3 optimal stocks daily before market close.
+
+**Architecture:** A new `news-collector` service runs 7 data source collectors on individual poll intervals, storing `NewsItem` records in PostgreSQL and publishing to Redis. A sentiment aggregator computes per-symbol composite scores every 15 minutes. Before market close, a 3-stage stock selector (sentiment candidates → technical filter → LLM final pick) chooses 2-3 stocks and feeds them to the existing MOC strategy.
+
+**Tech Stack:** Python 3.12+, asyncio, aiohttp, Pydantic, SQLAlchemy 2.0 async, Redis Streams, VADER (nltk), feedparser (RSS), Anthropic SDK (Claude API), Alembic
+
+---
+
+## Phase 1: Shared Foundation (Models, DB, Events)
+
+### Task 1: Add NewsItem and sentiment models to shared
+
+**Files:**
+- Modify: `shared/src/shared/models.py`
+- Create: `shared/src/shared/sentiment_models.py`
+- Create: `shared/tests/test_sentiment_models.py`
+
+- [ ] **Step 1: Write tests for new models**
+
+Create `shared/tests/test_sentiment_models.py`:
+
+```python
+"""Tests for news and sentiment models."""
+
+import pytest
+from datetime import datetime, timezone
+
+from shared.models import NewsCategory, NewsItem, OrderSide
+from shared.sentiment_models import SymbolScore, MarketSentiment, SelectedStock, Candidate
+
+
+def test_news_item_defaults():
+ item = NewsItem(
+ source="finnhub",
+ headline="Test headline",
+ published_at=datetime(2026, 4, 2, tzinfo=timezone.utc),
+ sentiment=0.5,
+ category=NewsCategory.MACRO,
+ )
+ assert item.id # UUID generated
+ assert item.symbols == []
+ assert item.summary is None
+ assert item.raw_data == {}
+ assert item.created_at is not None
+
+
+def test_news_item_with_symbols():
+ item = NewsItem(
+ source="rss",
+ headline="AAPL earnings beat",
+ published_at=datetime(2026, 4, 2, tzinfo=timezone.utc),
+ sentiment=0.8,
+ category=NewsCategory.EARNINGS,
+ symbols=["AAPL"],
+ )
+ assert item.symbols == ["AAPL"]
+ assert item.category == NewsCategory.EARNINGS
+
+
+def test_news_category_values():
+ assert NewsCategory.POLICY == "policy"
+ assert NewsCategory.EARNINGS == "earnings"
+ assert NewsCategory.MACRO == "macro"
+ assert NewsCategory.SOCIAL == "social"
+ assert NewsCategory.FILING == "filing"
+ assert NewsCategory.FED == "fed"
+
+
+def test_symbol_score():
+ score = SymbolScore(
+ symbol="AAPL",
+ news_score=0.5,
+ news_count=10,
+ social_score=0.3,
+ policy_score=0.0,
+ filing_score=0.2,
+ composite=0.3,
+ updated_at=datetime(2026, 4, 2, tzinfo=timezone.utc),
+ )
+ assert score.symbol == "AAPL"
+ assert score.composite == 0.3
+
+
+def test_market_sentiment():
+ ms = MarketSentiment(
+ fear_greed=25,
+ fear_greed_label="Extreme Fear",
+ vix=32.5,
+ fed_stance="hawkish",
+ market_regime="risk_off",
+ updated_at=datetime(2026, 4, 2, tzinfo=timezone.utc),
+ )
+ assert ms.market_regime == "risk_off"
+ assert ms.vix == 32.5
+
+
+def test_market_sentiment_no_vix():
+ ms = MarketSentiment(
+ fear_greed=50,
+ fear_greed_label="Neutral",
+ fed_stance="neutral",
+ market_regime="neutral",
+ updated_at=datetime(2026, 4, 2, tzinfo=timezone.utc),
+ )
+ assert ms.vix is None
+
+
+def test_selected_stock():
+ ss = SelectedStock(
+ symbol="NVDA",
+ side=OrderSide.BUY,
+ conviction=0.85,
+ reason="CHIPS Act expansion",
+ key_news=["Trump signs CHIPS Act expansion"],
+ )
+ assert ss.conviction == 0.85
+ assert len(ss.key_news) == 1
+
+
+def test_candidate():
+ c = Candidate(
+ symbol="TSLA",
+ source="sentiment",
+ direction=OrderSide.BUY,
+ score=0.75,
+ reason="High social buzz",
+ )
+ assert c.direction == OrderSide.BUY
+
+ c2 = Candidate(
+ symbol="XOM",
+ source="llm",
+ score=0.6,
+ reason="Oil price surge",
+ )
+ assert c2.direction is None
+```
+
+- [ ] **Step 2: Run tests to verify they fail**
+
+Run: `pytest shared/tests/test_sentiment_models.py -v`
+Expected: FAIL — `NewsCategory`, `NewsItem` not found in `shared.models`, `shared.sentiment_models` does not exist
+
+- [ ] **Step 3: Add NewsCategory and NewsItem to shared/models.py**
+
+Add to the end of `shared/src/shared/models.py`:
+
+```python
+class NewsCategory(str, Enum):
+ POLICY = "policy"
+ EARNINGS = "earnings"
+ MACRO = "macro"
+ SOCIAL = "social"
+ FILING = "filing"
+ FED = "fed"
+
+
+class NewsItem(BaseModel):
+ id: str = Field(default_factory=lambda: str(uuid.uuid4()))
+ source: str
+ headline: str
+ summary: Optional[str] = None
+ url: Optional[str] = None
+ published_at: datetime
+ symbols: list[str] = []
+ sentiment: float
+ category: NewsCategory
+ raw_data: dict = {}
+ created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
+```
+
+- [ ] **Step 4: Create shared/src/shared/sentiment_models.py**
+
+```python
+"""Sentiment scoring and stock selection models."""
+
+from datetime import datetime
+from typing import Optional
+
+from pydantic import BaseModel
+
+from shared.models import OrderSide
+
+
+class SymbolScore(BaseModel):
+ symbol: str
+ news_score: float
+ news_count: int
+ social_score: float
+ policy_score: float
+ filing_score: float
+ composite: float
+ updated_at: datetime
+
+
+class MarketSentiment(BaseModel):
+ fear_greed: int
+ fear_greed_label: str
+ vix: Optional[float] = None
+ fed_stance: str
+ market_regime: str
+ updated_at: datetime
+
+
+class SelectedStock(BaseModel):
+ symbol: str
+ side: OrderSide
+ conviction: float
+ reason: str
+ key_news: list[str]
+
+
+class Candidate(BaseModel):
+ symbol: str
+ source: str
+ direction: Optional[OrderSide] = None
+ score: float
+ reason: str
+```
+
+- [ ] **Step 5: Run tests to verify they pass**
+
+Run: `pytest shared/tests/test_sentiment_models.py -v`
+Expected: All 9 tests PASS
+
+- [ ] **Step 6: Commit**
+
+```bash
+git add shared/src/shared/models.py shared/src/shared/sentiment_models.py shared/tests/test_sentiment_models.py
+git commit -m "feat: add NewsItem, sentiment scoring, and stock selection models"
+```
+
+---
+
+### Task 2: Add SQLAlchemy ORM models for news tables
+
+**Files:**
+- Modify: `shared/src/shared/sa_models.py`
+- Create: `shared/tests/test_sa_news_models.py`
+
+- [ ] **Step 1: Write tests for new SA models**
+
+Create `shared/tests/test_sa_news_models.py`:
+
+```python
+"""Tests for news-related SQLAlchemy models."""
+
+from shared.sa_models import NewsItemRow, SymbolScoreRow, MarketSentimentRow, StockSelectionRow
+
+
+def test_news_item_row_tablename():
+ assert NewsItemRow.__tablename__ == "news_items"
+
+
+def test_symbol_score_row_tablename():
+ assert SymbolScoreRow.__tablename__ == "symbol_scores"
+
+
+def test_market_sentiment_row_tablename():
+ assert MarketSentimentRow.__tablename__ == "market_sentiment"
+
+
+def test_stock_selection_row_tablename():
+ assert StockSelectionRow.__tablename__ == "stock_selections"
+
+
+def test_news_item_row_columns():
+ cols = {c.name for c in NewsItemRow.__table__.columns}
+ assert cols >= {"id", "source", "headline", "published_at", "sentiment", "category"}
+
+
+def test_symbol_score_row_columns():
+ cols = {c.name for c in SymbolScoreRow.__table__.columns}
+ assert cols >= {"id", "symbol", "news_score", "composite", "updated_at"}
+```
+
+- [ ] **Step 2: Run tests to verify they fail**
+
+Run: `pytest shared/tests/test_sa_news_models.py -v`
+Expected: FAIL — import errors
+
+- [ ] **Step 3: Add ORM models to sa_models.py**
+
+Add to the end of `shared/src/shared/sa_models.py`:
+
+```python
+class NewsItemRow(Base):
+ __tablename__ = "news_items"
+
+ id: Mapped[str] = mapped_column(Text, primary_key=True)
+ source: Mapped[str] = mapped_column(Text, nullable=False)
+ headline: Mapped[str] = mapped_column(Text, nullable=False)
+ summary: Mapped[str | None] = mapped_column(Text)
+ url: Mapped[str | None] = mapped_column(Text)
+ published_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False)
+ symbols: Mapped[str | None] = mapped_column(Text) # JSON-encoded list
+ sentiment: Mapped[float] = mapped_column(sa.Float, nullable=False)
+ category: Mapped[str] = mapped_column(Text, nullable=False)
+ raw_data: Mapped[str | None] = mapped_column(Text) # JSON string
+ created_at: Mapped[datetime] = mapped_column(
+ DateTime(timezone=True), nullable=False, server_default=sa.func.now()
+ )
+
+
+class SymbolScoreRow(Base):
+ __tablename__ = "symbol_scores"
+
+ id: Mapped[str] = mapped_column(Text, primary_key=True)
+ symbol: Mapped[str] = mapped_column(Text, nullable=False, unique=True)
+ news_score: Mapped[float] = mapped_column(sa.Float, nullable=False, server_default="0")
+ news_count: Mapped[int] = mapped_column(sa.Integer, nullable=False, server_default="0")
+ social_score: Mapped[float] = mapped_column(sa.Float, nullable=False, server_default="0")
+ policy_score: Mapped[float] = mapped_column(sa.Float, nullable=False, server_default="0")
+ filing_score: Mapped[float] = mapped_column(sa.Float, nullable=False, server_default="0")
+ composite: Mapped[float] = mapped_column(sa.Float, nullable=False, server_default="0")
+ updated_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False)
+
+
+class MarketSentimentRow(Base):
+ __tablename__ = "market_sentiment"
+
+ id: Mapped[str] = mapped_column(Text, primary_key=True)
+ fear_greed: Mapped[int] = mapped_column(sa.Integer, nullable=False)
+ fear_greed_label: Mapped[str] = mapped_column(Text, nullable=False)
+ vix: Mapped[float | None] = mapped_column(sa.Float)
+ fed_stance: Mapped[str] = mapped_column(Text, nullable=False, server_default="neutral")
+ market_regime: Mapped[str] = mapped_column(Text, nullable=False, server_default="neutral")
+ updated_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False)
+
+
+class StockSelectionRow(Base):
+ __tablename__ = "stock_selections"
+
+ id: Mapped[str] = mapped_column(Text, primary_key=True)
+ trade_date: Mapped[datetime] = mapped_column(sa.Date, nullable=False)
+ symbol: Mapped[str] = mapped_column(Text, nullable=False)
+ side: Mapped[str] = mapped_column(Text, nullable=False)
+ conviction: Mapped[float] = mapped_column(sa.Float, nullable=False)
+ reason: Mapped[str] = mapped_column(Text, nullable=False)
+ key_news: Mapped[str | None] = mapped_column(Text) # JSON string
+ sentiment_snapshot: Mapped[str | None] = mapped_column(Text) # JSON string
+ created_at: Mapped[datetime] = mapped_column(
+ DateTime(timezone=True), nullable=False, server_default=sa.func.now()
+ )
+```
+
+Also add `import sqlalchemy as sa` to the imports at the top of `sa_models.py`.
+
+- [ ] **Step 4: Run tests to verify they pass**
+
+Run: `pytest shared/tests/test_sa_news_models.py -v`
+Expected: All 6 tests PASS
+
+- [ ] **Step 5: Commit**
+
+```bash
+git add shared/src/shared/sa_models.py shared/tests/test_sa_news_models.py
+git commit -m "feat: add SQLAlchemy ORM models for news, scores, selections"
+```
+
+---
+
+### Task 3: Create Alembic migration for news tables
+
+**Files:**
+- Create: `shared/alembic/versions/002_news_sentiment_tables.py`
+
+- [ ] **Step 1: Create migration file**
+
+Create `shared/alembic/versions/002_news_sentiment_tables.py`:
+
+```python
+"""Add news, sentiment, and stock selection tables
+
+Revision ID: 002
+Revises: 001
+Create Date: 2026-04-02
+"""
+
+from typing import Sequence, Union
+
+from alembic import op
+import sqlalchemy as sa
+
+revision: str = "002"
+down_revision: Union[str, None] = "001"
+branch_labels: Union[str, Sequence[str], None] = None
+depends_on: Union[str, Sequence[str], None] = None
+
+
+def upgrade() -> None:
+ op.create_table(
+ "news_items",
+ sa.Column("id", sa.Text, primary_key=True),
+ sa.Column("source", sa.Text, nullable=False),
+ sa.Column("headline", sa.Text, nullable=False),
+ sa.Column("summary", sa.Text),
+ sa.Column("url", sa.Text),
+ sa.Column("published_at", sa.DateTime(timezone=True), nullable=False),
+ sa.Column("symbols", sa.Text),
+ sa.Column("sentiment", sa.Float, nullable=False),
+ sa.Column("category", sa.Text, nullable=False),
+ sa.Column("raw_data", sa.Text),
+ sa.Column("created_at", sa.DateTime(timezone=True), nullable=False, server_default=sa.func.now()),
+ )
+ op.create_index("idx_news_items_published", "news_items", ["published_at"])
+ op.create_index("idx_news_items_source", "news_items", ["source"])
+
+ op.create_table(
+ "symbol_scores",
+ sa.Column("id", sa.Text, primary_key=True),
+ sa.Column("symbol", sa.Text, nullable=False, unique=True),
+ sa.Column("news_score", sa.Float, nullable=False, server_default="0"),
+ sa.Column("news_count", sa.Integer, nullable=False, server_default="0"),
+ sa.Column("social_score", sa.Float, nullable=False, server_default="0"),
+ sa.Column("policy_score", sa.Float, nullable=False, server_default="0"),
+ sa.Column("filing_score", sa.Float, nullable=False, server_default="0"),
+ sa.Column("composite", sa.Float, nullable=False, server_default="0"),
+ sa.Column("updated_at", sa.DateTime(timezone=True), nullable=False),
+ )
+
+ op.create_table(
+ "market_sentiment",
+ sa.Column("id", sa.Text, primary_key=True),
+ sa.Column("fear_greed", sa.Integer, nullable=False),
+ sa.Column("fear_greed_label", sa.Text, nullable=False),
+ sa.Column("vix", sa.Float),
+ sa.Column("fed_stance", sa.Text, nullable=False, server_default="neutral"),
+ sa.Column("market_regime", sa.Text, nullable=False, server_default="neutral"),
+ sa.Column("updated_at", sa.DateTime(timezone=True), nullable=False),
+ )
+
+ op.create_table(
+ "stock_selections",
+ sa.Column("id", sa.Text, primary_key=True),
+ sa.Column("trade_date", sa.Date, nullable=False),
+ sa.Column("symbol", sa.Text, nullable=False),
+ sa.Column("side", sa.Text, nullable=False),
+ sa.Column("conviction", sa.Float, nullable=False),
+ sa.Column("reason", sa.Text, nullable=False),
+ sa.Column("key_news", sa.Text),
+ sa.Column("sentiment_snapshot", sa.Text),
+ sa.Column("created_at", sa.DateTime(timezone=True), nullable=False, server_default=sa.func.now()),
+ )
+ op.create_index("idx_stock_selections_date", "stock_selections", ["trade_date"])
+
+
+def downgrade() -> None:
+ op.drop_table("stock_selections")
+ op.drop_table("market_sentiment")
+ op.drop_table("symbol_scores")
+ op.drop_table("news_items")
+```
+
+- [ ] **Step 2: Verify migration imports correctly**
+
+Run: `python -c "import importlib.util; s=importlib.util.spec_from_file_location('m','shared/alembic/versions/002_news_sentiment_tables.py'); m=importlib.util.module_from_spec(s); s.loader.exec_module(m); print('OK')"`
+(Alembic version files are not an importable package, so `from alembic.versions import *` would fail; load the file directly instead.)
+Expected: OK (no import errors)
+
+- [ ] **Step 3: Commit**
+
+```bash
+git add shared/alembic/versions/002_news_sentiment_tables.py
+git commit -m "feat: add Alembic migration for news and sentiment tables"
+```
+
+---
+
+### Task 4: Add NewsEvent to shared events and DB methods for news
+
+**Files:**
+- Modify: `shared/src/shared/events.py`
+- Modify: `shared/src/shared/db.py`
+- Create: `shared/tests/test_news_events.py`
+- Create: `shared/tests/test_db_news.py`
+
+- [ ] **Step 1: Write tests for NewsEvent**
+
+Create `shared/tests/test_news_events.py`:
+
+```python
+"""Tests for NewsEvent."""
+
+from datetime import datetime, timezone
+
+from shared.models import NewsCategory, NewsItem
+from shared.events import NewsEvent, EventType, Event
+
+
+def test_news_event_to_dict():
+ item = NewsItem(
+ source="finnhub",
+ headline="Test",
+ published_at=datetime(2026, 4, 2, tzinfo=timezone.utc),
+ sentiment=0.5,
+ category=NewsCategory.MACRO,
+ )
+ event = NewsEvent(data=item)
+ d = event.to_dict()
+ assert d["type"] == EventType.NEWS
+ assert d["data"]["source"] == "finnhub"
+
+
+def test_news_event_from_raw():
+ raw = {
+ "type": "NEWS",
+ "data": {
+ "id": "abc",
+ "source": "rss",
+ "headline": "Test headline",
+ "published_at": "2026-04-02T00:00:00+00:00",
+ "sentiment": 0.3,
+ "category": "earnings",
+ "symbols": ["AAPL"],
+ "raw_data": {},
+ },
+ }
+ event = NewsEvent.from_raw(raw)
+ assert event.data.source == "rss"
+ assert event.data.symbols == ["AAPL"]
+
+
+def test_event_dispatcher_news():
+ raw = {
+ "type": "NEWS",
+ "data": {
+ "id": "abc",
+ "source": "finnhub",
+ "headline": "Test",
+ "published_at": "2026-04-02T00:00:00+00:00",
+ "sentiment": 0.0,
+ "category": "macro",
+ "raw_data": {},
+ },
+ }
+ event = Event.from_dict(raw)
+ assert isinstance(event, NewsEvent)
+```
+
+- [ ] **Step 2: Run tests to verify they fail**
+
+Run: `pytest shared/tests/test_news_events.py -v`
+Expected: FAIL — `NewsEvent` not in events.py, `EventType.NEWS` missing
+
+- [ ] **Step 3: Add NewsEvent to events.py**
+
+Add `NEWS = "NEWS"` to `EventType` enum.
+
+Add `NewsEvent` class and register it in `_EVENT_TYPE_MAP`:
+
+```python
+from shared.models import Candle, Signal, Order, NewsItem
+
+class NewsEvent(BaseModel):
+ type: EventType = EventType.NEWS
+ data: NewsItem
+
+ def to_dict(self) -> dict:
+ return {
+ "type": self.type,
+ "data": self.data.model_dump(mode="json"),
+ }
+
+ @classmethod
+ def from_raw(cls, raw: dict) -> "NewsEvent":
+ return cls(type=raw["type"], data=NewsItem(**raw["data"]))
+```
+
+Add to `_EVENT_TYPE_MAP`:
+```python
+EventType.NEWS: NewsEvent,
+```
+
+- [ ] **Step 4: Run event tests to verify they pass**
+
+Run: `pytest shared/tests/test_news_events.py -v`
+Expected: All 3 tests PASS
+
+- [ ] **Step 5: Run all existing event tests to check no regressions**
+
+Run: `pytest shared/tests/test_events.py -v`
+Expected: All existing tests PASS
+
+- [ ] **Step 6: Write tests for DB news methods**
+
+Create `shared/tests/test_db_news.py`:
+
+```python
+"""Tests for database news/sentiment methods.
+
+These tests use an in-memory SQLite database.
+"""
+
+import json
+import uuid
+import pytest
+from datetime import datetime, date, timezone
+
+from shared.db import Database
+from shared.models import NewsItem, NewsCategory
+from shared.sentiment_models import SymbolScore, MarketSentiment
+
+
+@pytest.fixture
+async def db():
+ database = Database("sqlite+aiosqlite://")
+ await database.connect()
+ yield database
+ await database.close()
+
+
+async def test_insert_and_get_news_items(db):
+ item = NewsItem(
+ source="finnhub",
+ headline="AAPL earnings beat",
+ published_at=datetime(2026, 4, 2, 12, 0, tzinfo=timezone.utc),
+ sentiment=0.8,
+ category=NewsCategory.EARNINGS,
+ symbols=["AAPL"],
+ )
+ await db.insert_news_item(item)
+ items = await db.get_recent_news(hours=24)
+ assert len(items) == 1
+ assert items[0]["headline"] == "AAPL earnings beat"
+
+
+async def test_upsert_symbol_score(db):
+ score = SymbolScore(
+ symbol="AAPL",
+ news_score=0.5,
+ news_count=10,
+ social_score=0.3,
+ policy_score=0.0,
+ filing_score=0.2,
+ composite=0.3,
+ updated_at=datetime(2026, 4, 2, tzinfo=timezone.utc),
+ )
+ await db.upsert_symbol_score(score)
+ scores = await db.get_top_symbol_scores(limit=5)
+ assert len(scores) == 1
+ assert scores[0]["symbol"] == "AAPL"
+
+
+async def test_upsert_market_sentiment(db):
+ ms = MarketSentiment(
+ fear_greed=55,
+ fear_greed_label="Neutral",
+ vix=18.2,
+ fed_stance="neutral",
+ market_regime="neutral",
+ updated_at=datetime(2026, 4, 2, tzinfo=timezone.utc),
+ )
+ await db.upsert_market_sentiment(ms)
+ result = await db.get_latest_market_sentiment()
+ assert result is not None
+ assert result["fear_greed"] == 55
+
+
+async def test_insert_stock_selection(db):
+ await db.insert_stock_selection(
+ trade_date=date(2026, 4, 2),
+ symbol="NVDA",
+ side="BUY",
+ conviction=0.85,
+ reason="CHIPS Act",
+ key_news=["Trump signs CHIPS expansion"],
+ sentiment_snapshot={"composite": 0.8},
+ )
+ selections = await db.get_stock_selections(date(2026, 4, 2))
+ assert len(selections) == 1
+ assert selections[0]["symbol"] == "NVDA"
+```
+
+- [ ] **Step 7: Run DB news tests to verify they fail**
+
+Run: `pytest shared/tests/test_db_news.py -v`
+Expected: FAIL — methods not yet on Database class
+
+- [ ] **Step 8: Add news/sentiment DB methods to db.py**
+
+Add to `shared/src/shared/db.py` — new import at top:
+
+```python
+import json
+import uuid
+from datetime import date
+from shared.models import NewsItem
+from shared.sentiment_models import SymbolScore, MarketSentiment
+from shared.sa_models import NewsItemRow, SymbolScoreRow, MarketSentimentRow, StockSelectionRow
+```
+
+Add these methods to the `Database` class:
+
+```python
+ async def insert_news_item(self, item: NewsItem) -> None:
+ """Insert a news item."""
+ row = NewsItemRow(
+ id=item.id,
+ source=item.source,
+ headline=item.headline,
+ summary=item.summary,
+ url=item.url,
+ published_at=item.published_at,
+ symbols=json.dumps(item.symbols),
+ sentiment=item.sentiment,
+ category=item.category.value,
+ raw_data=json.dumps(item.raw_data),
+ created_at=item.created_at,
+ )
+ async with self._session_factory() as session:
+ try:
+ session.add(row)
+ await session.commit()
+ except Exception:
+ await session.rollback()
+ raise
+
+ async def get_recent_news(self, hours: int = 24) -> list[dict]:
+ """Retrieve news items from the last N hours."""
+ since = datetime.now(timezone.utc) - timedelta(hours=hours)
+ stmt = (
+ select(NewsItemRow)
+ .where(NewsItemRow.published_at >= since)
+ .order_by(NewsItemRow.published_at.desc())
+ )
+ async with self._session_factory() as session:
+ result = await session.execute(stmt)
+ rows = result.scalars().all()
+ return [
+ {
+ "id": r.id,
+ "source": r.source,
+ "headline": r.headline,
+ "summary": r.summary,
+ "url": r.url,
+ "published_at": r.published_at,
+ "symbols": json.loads(r.symbols) if r.symbols else [],
+ "sentiment": r.sentiment,
+ "category": r.category,
+ "created_at": r.created_at,
+ }
+ for r in rows
+ ]
+
+ async def upsert_symbol_score(self, score: SymbolScore) -> None:
+ """Insert or update a symbol score."""
+ async with self._session_factory() as session:
+ try:
+ existing = await session.execute(
+ select(SymbolScoreRow).where(SymbolScoreRow.symbol == score.symbol)
+ )
+ row = existing.scalar_one_or_none()
+ if row:
+ row.news_score = score.news_score
+ row.news_count = score.news_count
+ row.social_score = score.social_score
+ row.policy_score = score.policy_score
+ row.filing_score = score.filing_score
+ row.composite = score.composite
+ row.updated_at = score.updated_at
+ else:
+ row = SymbolScoreRow(
+ id=str(uuid.uuid4()),
+ symbol=score.symbol,
+ news_score=score.news_score,
+ news_count=score.news_count,
+ social_score=score.social_score,
+ policy_score=score.policy_score,
+ filing_score=score.filing_score,
+ composite=score.composite,
+ updated_at=score.updated_at,
+ )
+ session.add(row)
+ await session.commit()
+ except Exception:
+ await session.rollback()
+ raise
+
+ async def get_top_symbol_scores(self, limit: int = 20) -> list[dict]:
+ """Get top symbol scores ordered by composite descending."""
+ stmt = (
+ select(SymbolScoreRow)
+ .order_by(SymbolScoreRow.composite.desc())
+ .limit(limit)
+ )
+ async with self._session_factory() as session:
+ result = await session.execute(stmt)
+ rows = result.scalars().all()
+ return [
+ {
+ "symbol": r.symbol,
+ "news_score": r.news_score,
+ "news_count": r.news_count,
+ "social_score": r.social_score,
+ "policy_score": r.policy_score,
+ "filing_score": r.filing_score,
+ "composite": r.composite,
+ "updated_at": r.updated_at,
+ }
+ for r in rows
+ ]
+
+ async def upsert_market_sentiment(self, ms: MarketSentiment) -> None:
+ """Insert or update the latest market sentiment (single row, id='latest')."""
+ async with self._session_factory() as session:
+ try:
+ existing = await session.execute(
+ select(MarketSentimentRow).where(MarketSentimentRow.id == "latest")
+ )
+ row = existing.scalar_one_or_none()
+ if row:
+ row.fear_greed = ms.fear_greed
+ row.fear_greed_label = ms.fear_greed_label
+ row.vix = ms.vix
+ row.fed_stance = ms.fed_stance
+ row.market_regime = ms.market_regime
+ row.updated_at = ms.updated_at
+ else:
+ row = MarketSentimentRow(
+ id="latest",
+ fear_greed=ms.fear_greed,
+ fear_greed_label=ms.fear_greed_label,
+ vix=ms.vix,
+ fed_stance=ms.fed_stance,
+ market_regime=ms.market_regime,
+ updated_at=ms.updated_at,
+ )
+ session.add(row)
+ await session.commit()
+ except Exception:
+ await session.rollback()
+ raise
+
+ async def get_latest_market_sentiment(self) -> dict | None:
+ """Get the latest market sentiment."""
+ stmt = select(MarketSentimentRow).where(MarketSentimentRow.id == "latest")
+ async with self._session_factory() as session:
+ result = await session.execute(stmt)
+ r = result.scalar_one_or_none()
+ if r is None:
+ return None
+ return {
+ "fear_greed": r.fear_greed,
+ "fear_greed_label": r.fear_greed_label,
+ "vix": r.vix,
+ "fed_stance": r.fed_stance,
+ "market_regime": r.market_regime,
+ "updated_at": r.updated_at,
+ }
+
+ async def insert_stock_selection(
+ self,
+ trade_date: date,
+ symbol: str,
+ side: str,
+ conviction: float,
+ reason: str,
+ key_news: list[str],
+ sentiment_snapshot: dict,
+ ) -> None:
+ """Insert a stock selection record."""
+ row = StockSelectionRow(
+ id=str(uuid.uuid4()),
+ trade_date=trade_date,
+ symbol=symbol,
+ side=side,
+ conviction=conviction,
+ reason=reason,
+ key_news=json.dumps(key_news),
+ sentiment_snapshot=json.dumps(sentiment_snapshot),
+ )
+ async with self._session_factory() as session:
+ try:
+ session.add(row)
+ await session.commit()
+ except Exception:
+ await session.rollback()
+ raise
+
+ async def get_stock_selections(self, trade_date: date) -> list[dict]:
+ """Get stock selections for a specific date."""
+ stmt = (
+ select(StockSelectionRow)
+ .where(StockSelectionRow.trade_date == trade_date)
+ .order_by(StockSelectionRow.conviction.desc())
+ )
+ async with self._session_factory() as session:
+ result = await session.execute(stmt)
+ rows = result.scalars().all()
+ return [
+ {
+ "symbol": r.symbol,
+ "side": r.side,
+ "conviction": r.conviction,
+ "reason": r.reason,
+ "key_news": json.loads(r.key_news) if r.key_news else [],
+ "sentiment_snapshot": json.loads(r.sentiment_snapshot) if r.sentiment_snapshot else {},
+ }
+ for r in rows
+ ]
+```
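+The select-then-update upserts above are fine with a single writer, but they are not atomic: two concurrent writers can both miss the existing row and insert twice. If that ever matters, SQLite (3.24+) and Postgres support a native `ON CONFLICT` upsert. A stdlib `sqlite3` sketch with a hypothetical two-column table (not the real schema):
+
+```python
+import sqlite3
+
+conn = sqlite3.connect(":memory:")
+conn.execute("CREATE TABLE symbol_scores (symbol TEXT PRIMARY KEY, composite REAL)")
+
+def upsert_score(symbol: str, composite: float) -> None:
+    # one atomic statement instead of select-then-update-or-insert
+    conn.execute(
+        "INSERT INTO symbol_scores (symbol, composite) VALUES (?, ?) "
+        "ON CONFLICT(symbol) DO UPDATE SET composite = excluded.composite",
+        (symbol, composite),
+    )
+
+upsert_score("AAPL", 0.4)
+upsert_score("AAPL", 0.9)  # second call updates in place, no duplicate row
+row = conn.execute("SELECT composite FROM symbol_scores WHERE symbol = 'AAPL'").fetchone()
+print(row[0])  # 0.9
+```
+
+SQLAlchemy exposes the same clause via the dialect-specific `insert(...).on_conflict_do_update(...)` construct, should the `Database` methods ever need hardening.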
+
+- [ ] **Step 9: Run DB news tests to verify they pass**
+
+Run: `pytest shared/tests/test_db_news.py -v`
+Expected: All 4 tests PASS
+
+Note: These tests require the `aiosqlite` package. If it is not installed: `pip install aiosqlite`
+
+- [ ] **Step 10: Run all shared tests to check no regressions**
+
+Run: `pytest shared/tests/ -v`
+Expected: All tests PASS
+
+- [ ] **Step 11: Commit**
+
+```bash
+git add shared/src/shared/events.py shared/src/shared/db.py shared/tests/test_news_events.py shared/tests/test_db_news.py
+git commit -m "feat: add NewsEvent, DB methods for news/sentiment/selections"
+```
+
+---
+
+### Task 5: Update Settings with new env vars
+
+**Files:**
+- Modify: `shared/src/shared/config.py`
+- Modify: `.env.example`
+
+- [ ] **Step 1: Add new settings to config.py**
+
+Add these fields to the `Settings` class in `shared/src/shared/config.py`:
+
+```python
+ # News collector
+ finnhub_api_key: str = ""
+ news_poll_interval: int = 300
+ sentiment_aggregate_interval: int = 900
+ # Stock selector
+ selector_candidates_time: str = "15:00"
+ selector_filter_time: str = "15:15"
+ selector_final_time: str = "15:30"
+ selector_max_picks: int = 3
+ # LLM
+ anthropic_api_key: str = ""
+ anthropic_model: str = "claude-sonnet-4-20250514"
+```
+
+- [ ] **Step 2: Add to .env.example**
+
+Append to `.env.example`:
+
+```bash
+
+# News Collector
+FINNHUB_API_KEY=
+NEWS_POLL_INTERVAL=300
+SENTIMENT_AGGREGATE_INTERVAL=900
+
+# Stock Selector
+SELECTOR_CANDIDATES_TIME=15:00
+SELECTOR_FILTER_TIME=15:15
+SELECTOR_FINAL_TIME=15:30
+SELECTOR_MAX_PICKS=3
+
+# LLM (for stock selector)
+ANTHROPIC_API_KEY=
+ANTHROPIC_MODEL=claude-sonnet-4-20250514
+```
+
+- [ ] **Step 3: Commit**
+
+```bash
+git add shared/src/shared/config.py .env.example
+git commit -m "feat: add news collector and stock selector config settings"
+```
+
+---
+
+## Phase 2: News Collector Service
+
+### Task 6: Scaffold news-collector service
+
+**Files:**
+- Create: `services/news-collector/pyproject.toml`
+- Create: `services/news-collector/Dockerfile`
+- Create: `services/news-collector/src/news_collector/__init__.py`
+- Create: `services/news-collector/src/news_collector/config.py`
+- Create: `services/news-collector/src/news_collector/collectors/__init__.py`
+- Create: `services/news-collector/src/news_collector/collectors/base.py`
+- Create: `services/news-collector/tests/__init__.py`
+
+- [ ] **Step 1: Create pyproject.toml**
+
+Create `services/news-collector/pyproject.toml`:
+
+```toml
+[project]
+name = "news-collector"
+version = "0.1.0"
+description = "News and sentiment data collector service"
+requires-python = ">=3.12"
+dependencies = [
+ "trading-shared",
+ "feedparser>=6.0",
+ "nltk>=3.8",
+ "aiohttp>=3.9",
+]
+
+[project.optional-dependencies]
+dev = [
+ "pytest>=8.0",
+ "pytest-asyncio>=0.23",
+ "aioresponses>=0.7",
+]
+
+[build-system]
+requires = ["hatchling"]
+build-backend = "hatchling.build"
+
+[tool.hatch.build.targets.wheel]
+packages = ["src/news_collector"]
+```
+
+- [ ] **Step 2: Create Dockerfile**
+
+Create `services/news-collector/Dockerfile`:
+
+```dockerfile
+FROM python:3.12-slim
+WORKDIR /app
+COPY shared/ shared/
+RUN pip install --no-cache-dir ./shared
+COPY services/news-collector/ services/news-collector/
+RUN pip install --no-cache-dir ./services/news-collector
+RUN python -c "import nltk; nltk.download('vader_lexicon', quiet=True)"
+ENV PYTHONPATH=/app
+CMD ["python", "-m", "news_collector.main"]
+```
+
+- [ ] **Step 3: Create config.py**
+
+Create `services/news-collector/src/news_collector/config.py`:
+
+```python
+"""News Collector configuration."""
+
+from shared.config import Settings
+
+
+class NewsCollectorConfig(Settings):
+ health_port: int = 8084
+ finnhub_api_key: str = ""
+ news_poll_interval: int = 300
+ sentiment_aggregate_interval: int = 900
+```
+
+- [ ] **Step 4: Create BaseCollector**
+
+Create `services/news-collector/src/news_collector/collectors/base.py`:
+
+```python
+"""Base class for all news collectors."""
+
+from abc import ABC, abstractmethod
+
+from shared.models import NewsItem
+
+
+class BaseCollector(ABC):
+ name: str = "base"
+ poll_interval: int = 300 # seconds
+
+ @abstractmethod
+ async def collect(self) -> list[NewsItem]:
+ """Collect news items from the source."""
+
+ @abstractmethod
+ async def is_available(self) -> bool:
+ """Check if this data source is accessible."""
+```
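+The contract is deliberately small: the scheduler only ever calls `is_available()` and `collect()`. A toy subclass showing the shape (with a stand-in dataclass in place of `shared.models.NewsItem`, purely for illustration):
+
+```python
+import asyncio
+from abc import ABC, abstractmethod
+from dataclasses import dataclass
+
+@dataclass
+class NewsItem:  # stand-in for shared.models.NewsItem
+    source: str
+    headline: str
+
+class BaseCollector(ABC):
+    name: str = "base"
+    poll_interval: int = 300  # seconds
+
+    @abstractmethod
+    async def collect(self) -> list[NewsItem]: ...
+
+    @abstractmethod
+    async def is_available(self) -> bool: ...
+
+class StaticCollector(BaseCollector):
+    # trivial concrete collector: implements exactly the two abstract methods
+    name = "static"
+
+    async def is_available(self) -> bool:
+        return True
+
+    async def collect(self) -> list[NewsItem]:
+        return [NewsItem(source=self.name, headline="hello")]
+
+items = asyncio.run(StaticCollector().collect())
+print(items[0].source, items[0].headline)  # static hello
+```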
+
+- [ ] **Step 5: Create __init__.py files**
+
+Create `services/news-collector/src/news_collector/__init__.py`:
+```python
+"""News collector service."""
+```
+
+Create `services/news-collector/src/news_collector/collectors/__init__.py`:
+```python
+"""News collectors."""
+```
+
+Create `services/news-collector/tests/__init__.py`:
+```python
+```
+
+- [ ] **Step 6: Commit**
+
+```bash
+git add services/news-collector/
+git commit -m "feat: scaffold news-collector service with BaseCollector"
+```
+
+---
+
+### Task 7: Implement Finnhub news collector
+
+**Files:**
+- Create: `services/news-collector/src/news_collector/collectors/finnhub.py`
+- Create: `services/news-collector/tests/test_finnhub.py`
+
+- [ ] **Step 1: Write tests**
+
+Create `services/news-collector/tests/test_finnhub.py`:
+
+```python
+"""Tests for Finnhub news collector."""
+
+import pytest
+from unittest.mock import AsyncMock, patch
+
+from news_collector.collectors.finnhub import FinnhubCollector
+
+
+@pytest.fixture
+def collector():
+ return FinnhubCollector(api_key="test_key")
+
+
+def test_collector_name(collector):
+ assert collector.name == "finnhub"
+ assert collector.poll_interval == 300
+
+
+async def test_is_available_with_key(collector):
+ assert await collector.is_available() is True
+
+
+async def test_is_available_without_key():
+ c = FinnhubCollector(api_key="")
+ assert await c.is_available() is False
+
+
+async def test_collect_parses_response(collector):
+ mock_response = [
+ {
+ "category": "top news",
+ "datetime": 1711929600,
+ "headline": "AAPL beats earnings",
+ "id": 12345,
+ "related": "AAPL",
+ "source": "MarketWatch",
+ "summary": "Apple reported better than expected...",
+ "url": "https://example.com/article",
+ },
+ {
+ "category": "top news",
+ "datetime": 1711929000,
+ "headline": "Fed holds rates steady",
+ "id": 12346,
+ "related": "",
+ "source": "Reuters",
+ "summary": "The Federal Reserve...",
+ "url": "https://example.com/fed",
+ },
+ ]
+
+ with patch.object(collector, "_fetch_news", new_callable=AsyncMock, return_value=mock_response):
+ items = await collector.collect()
+
+ assert len(items) == 2
+ assert items[0].source == "finnhub"
+ assert items[0].headline == "AAPL beats earnings"
+ assert items[0].symbols == ["AAPL"]
+ assert items[0].url == "https://example.com/article"
+ assert isinstance(items[0].sentiment, float)
+ # Second item has no related ticker
+ assert items[1].symbols == []
+
+
+async def test_collect_handles_empty_response(collector):
+ with patch.object(collector, "_fetch_news", new_callable=AsyncMock, return_value=[]):
+ items = await collector.collect()
+ assert items == []
+```
+
+- [ ] **Step 2: Run tests to verify they fail**
+
+Run: `pytest services/news-collector/tests/test_finnhub.py -v`
+Expected: FAIL — module not found
+
+- [ ] **Step 3: Implement FinnhubCollector**
+
+Create `services/news-collector/src/news_collector/collectors/finnhub.py`:
+
+```python
+"""Finnhub market news collector (free tier: 60 req/min)."""
+
+import logging
+from datetime import datetime, timezone
+
+import aiohttp
+from nltk.sentiment.vader import SentimentIntensityAnalyzer
+
+from shared.models import NewsCategory, NewsItem
+from news_collector.collectors.base import BaseCollector
+
+logger = logging.getLogger(__name__)
+
+FINNHUB_NEWS_URL = "https://finnhub.io/api/v1/news"
+
+
+class FinnhubCollector(BaseCollector):
+ name = "finnhub"
+ poll_interval = 300 # 5 minutes
+
+ def __init__(self, api_key: str) -> None:
+ self._api_key = api_key
+ self._vader = SentimentIntensityAnalyzer()
+
+ async def is_available(self) -> bool:
+ return bool(self._api_key)
+
+ async def _fetch_news(self) -> list[dict]:
+ """Fetch general news from Finnhub API."""
+ params = {"category": "general", "token": self._api_key}
+ async with aiohttp.ClientSession() as session:
+ async with session.get(FINNHUB_NEWS_URL, params=params) as resp:
+ if resp.status != 200:
+                    logger.warning("finnhub_fetch_failed status=%s", resp.status)
+ return []
+ return await resp.json()
+
+ def _analyze_sentiment(self, text: str) -> float:
+ """Return VADER compound score (-1.0 to 1.0)."""
+ scores = self._vader.polarity_scores(text)
+ return scores["compound"]
+
+ def _extract_symbols(self, related: str) -> list[str]:
+ """Parse Finnhub 'related' field into symbol list."""
+ if not related or not related.strip():
+ return []
+ return [s.strip() for s in related.split(",") if s.strip()]
+
+ def _categorize(self, article: dict) -> NewsCategory:
+ """Determine category from article content."""
+ headline = article.get("headline", "").lower()
+ if any(w in headline for w in ["fed", "fomc", "rate", "inflation"]):
+ return NewsCategory.FED
+ if any(w in headline for w in ["tariff", "sanction", "regulation", "trump", "biden", "congress"]):
+ return NewsCategory.POLICY
+ if any(w in headline for w in ["earnings", "revenue", "profit", "eps"]):
+ return NewsCategory.EARNINGS
+ return NewsCategory.MACRO
+
+ async def collect(self) -> list[NewsItem]:
+ raw = await self._fetch_news()
+ items = []
+ for article in raw:
+ headline = article.get("headline", "")
+ summary = article.get("summary", "")
+ sentiment_text = f"{headline}. {summary}" if summary else headline
+
+ items.append(
+ NewsItem(
+ source=self.name,
+ headline=headline,
+ summary=summary or None,
+ url=article.get("url"),
+ published_at=datetime.fromtimestamp(
+ article.get("datetime", 0), tz=timezone.utc
+ ),
+ symbols=self._extract_symbols(article.get("related", "")),
+ sentiment=self._analyze_sentiment(sentiment_text),
+ category=self._categorize(article),
+ raw_data=article,
+ )
+ )
+ return items
+```
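+Note that `_categorize` is first-match-wins (FED before POLICY before EARNINGS) and matches bare substrings, so unrelated words can trigger a bucket — "corporate" contains "rate". The same routing restated standalone, with plain strings standing in for the `NewsCategory` enum:
+
+```python
+FED = ("fed", "fomc", "rate", "inflation")
+POLICY = ("tariff", "sanction", "regulation", "trump", "biden", "congress")
+EARNINGS = ("earnings", "revenue", "profit", "eps")
+
+def categorize(headline: str) -> str:
+    # first-match-wins, substring matching — same precedence as _categorize
+    h = headline.lower()
+    if any(w in h for w in FED):
+        return "fed"
+    if any(w in h for w in POLICY):
+        return "policy"
+    if any(w in h for w in EARNINGS):
+        return "earnings"
+    return "macro"
+
+print(categorize("Fed holds rates steady"))  # fed
+print(categorize("AAPL beats earnings"))     # earnings
+print(categorize("Corporate tax shake-up"))  # fed — "corporate" contains "rate"
+```
+
+Word-boundary regexes (as the ticker pattern in Task 8 already uses) would avoid the substring false positives if they become a problem.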
+
+- [ ] **Step 4: Run tests to verify they pass**
+
+Run: `pytest services/news-collector/tests/test_finnhub.py -v`
+Expected: All 5 tests PASS
+
+Note: Requires `nltk` and the VADER lexicon. If the lexicon is missing: `python -c "import nltk; nltk.download('vader_lexicon')"`
+
+- [ ] **Step 5: Commit**
+
+```bash
+git add services/news-collector/src/news_collector/collectors/finnhub.py services/news-collector/tests/test_finnhub.py
+git commit -m "feat: implement Finnhub news collector with VADER sentiment"
+```
+
+---
+
+### Task 8: Implement RSS news collector
+
+**Files:**
+- Create: `services/news-collector/src/news_collector/collectors/rss.py`
+- Create: `services/news-collector/tests/test_rss.py`
+
+- [ ] **Step 1: Write tests**
+
+Create `services/news-collector/tests/test_rss.py`:
+
+```python
+"""Tests for RSS news collector."""
+
+import pytest
+from unittest.mock import AsyncMock, patch
+
+from news_collector.collectors.rss import RSSCollector
+
+
+@pytest.fixture
+def collector():
+ return RSSCollector()
+
+
+def test_collector_name(collector):
+ assert collector.name == "rss"
+ assert collector.poll_interval == 600
+
+
+async def test_is_available(collector):
+ assert await collector.is_available() is True
+
+
+async def test_collect_parses_feed(collector):
+ mock_feed = {
+ "entries": [
+ {
+ "title": "NVDA surges on AI demand",
+ "link": "https://example.com/nvda",
+ "published_parsed": (2026, 4, 2, 12, 0, 0, 0, 0, 0),
+ "summary": "Nvidia stock jumped 5%...",
+ },
+ {
+ "title": "Markets rally on jobs data",
+ "link": "https://example.com/market",
+ "published_parsed": (2026, 4, 2, 11, 0, 0, 0, 0, 0),
+ "summary": "The S&P 500 rose...",
+ },
+ ],
+ }
+
+ with patch.object(collector, "_fetch_feeds", new_callable=AsyncMock, return_value=[mock_feed]):
+ items = await collector.collect()
+
+ assert len(items) == 2
+ assert items[0].source == "rss"
+ assert items[0].headline == "NVDA surges on AI demand"
+ assert isinstance(items[0].sentiment, float)
+```
+
+- [ ] **Step 2: Run tests to verify they fail**
+
+Run: `pytest services/news-collector/tests/test_rss.py -v`
+Expected: FAIL
+
+- [ ] **Step 3: Implement RSSCollector**
+
+Create `services/news-collector/src/news_collector/collectors/rss.py`:
+
+```python
+"""RSS feed collector for Yahoo Finance, Google News, MarketWatch."""
+
+import logging
+import re
+from calendar import timegm
+from datetime import datetime, timezone
+
+import aiohttp
+import feedparser
+from nltk.sentiment.vader import SentimentIntensityAnalyzer
+
+from shared.models import NewsCategory, NewsItem
+from news_collector.collectors.base import BaseCollector
+
+logger = logging.getLogger(__name__)
+
+DEFAULT_FEEDS = [
+ "https://feeds.finance.yahoo.com/rss/2.0/headline?s=^GSPC&region=US&lang=en-US",
+ "https://news.google.com/rss/topics/CAAqJggKIiBDQkFTRWdvSUwyMHZNRGx6TVdZU0FtVnVHZ0pWVXlnQVAB?hl=en-US&gl=US&ceid=US:en",
+ "https://www.marketwatch.com/rss/topstories",
+]
+
+# Common US stock tickers to detect in headlines
+TICKER_PATTERN = re.compile(
+ r"\b(AAPL|MSFT|GOOGL|GOOG|AMZN|TSLA|NVDA|META|JPM|V|JNJ|WMT|PG|UNH|HD|"
+ r"MA|DIS|BAC|XOM|PFE|KO|PEP|CSCO|INTC|VZ|NFLX|ADBE|CRM|AMD|QCOM|"
+ r"GS|BA|CAT|MMM|IBM|GE|F|GM|NKE|MCD|SBUX|SPY|QQQ|IWM)\b"
+)
+
+
+class RSSCollector(BaseCollector):
+ name = "rss"
+ poll_interval = 600 # 10 minutes
+
+ def __init__(self, feeds: list[str] | None = None) -> None:
+ self._feeds = feeds or DEFAULT_FEEDS
+ self._vader = SentimentIntensityAnalyzer()
+
+ async def is_available(self) -> bool:
+ return True
+
+ async def _fetch_feeds(self) -> list[dict]:
+ """Fetch and parse all RSS feeds."""
+ results = []
+ async with aiohttp.ClientSession() as session:
+ for url in self._feeds:
+ try:
+ async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as resp:
+ if resp.status == 200:
+ text = await resp.text()
+ feed = feedparser.parse(text)
+ results.append(feed)
+ except Exception as exc:
+                    logger.warning("rss_fetch_failed url=%s error=%s", url, exc)
+ return results
+
+ def _extract_symbols(self, text: str) -> list[str]:
+ """Extract stock tickers from text."""
+ return list(set(TICKER_PATTERN.findall(text)))
+
+ def _parse_time(self, entry: dict) -> datetime:
+ """Parse published time from feed entry."""
+ parsed = entry.get("published_parsed")
+ if parsed:
+ return datetime.fromtimestamp(timegm(parsed), tz=timezone.utc)
+ return datetime.now(timezone.utc)
+
+ async def collect(self) -> list[NewsItem]:
+ feeds = await self._fetch_feeds()
+ items = []
+ seen_titles = set()
+
+ for feed in feeds:
+ for entry in feed.get("entries", []):
+ title = entry.get("title", "").strip()
+ if not title or title in seen_titles:
+ continue
+ seen_titles.add(title)
+
+ summary = entry.get("summary", "")
+ sentiment_text = f"{title}. {summary}" if summary else title
+
+ items.append(
+ NewsItem(
+ source=self.name,
+ headline=title,
+ summary=summary or None,
+ url=entry.get("link"),
+ published_at=self._parse_time(entry),
+ symbols=self._extract_symbols(f"{title} {summary}"),
+ sentiment=self._vader.polarity_scores(sentiment_text)["compound"],
+ category=NewsCategory.MACRO,
+ raw_data={"feed_title": title},
+ )
+ )
+
+ return items
+```
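+One caveat in `_extract_symbols`: `list(set(...))` returns tickers in arbitrary order. If a stable order ever matters (display, dedup keys), `dict.fromkeys` de-duplicates while preserving first appearance. A standalone sketch with a trimmed pattern (subset of the real symbol list):
+
+```python
+import re
+
+# trimmed version of TICKER_PATTERN — same word-boundary idea, fewer symbols
+TICKER_PATTERN = re.compile(r"\b(AAPL|MSFT|NVDA|TSLA|SPY)\b")
+
+def extract_symbols(text: str) -> list[str]:
+    # dict.fromkeys: de-duplicate while keeping first-seen order,
+    # unlike list(set(...)) which yields arbitrary order
+    return list(dict.fromkeys(TICKER_PATTERN.findall(text)))
+
+print(extract_symbols("NVDA surges; NVDA and TSLA lead while SPY is flat"))
+# ['NVDA', 'TSLA', 'SPY']
+```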
+
+- [ ] **Step 4: Run tests to verify they pass**
+
+Run: `pytest services/news-collector/tests/test_rss.py -v`
+Expected: All 3 tests PASS
+
+- [ ] **Step 5: Commit**
+
+```bash
+git add services/news-collector/src/news_collector/collectors/rss.py services/news-collector/tests/test_rss.py
+git commit -m "feat: implement RSS news collector (Yahoo, Google News, MarketWatch)"
+```
+
+---
+
+### Task 9: Implement Fear & Greed Index collector
+
+**Files:**
+- Create: `services/news-collector/src/news_collector/collectors/fear_greed.py`
+- Create: `services/news-collector/tests/test_fear_greed.py`
+
+- [ ] **Step 1: Write tests**
+
+Create `services/news-collector/tests/test_fear_greed.py`:
+
+```python
+"""Tests for CNN Fear & Greed Index collector."""
+
+import pytest
+from unittest.mock import AsyncMock, patch
+
+from news_collector.collectors.fear_greed import FearGreedCollector
+
+
+@pytest.fixture
+def collector():
+ return FearGreedCollector()
+
+
+def test_collector_name(collector):
+ assert collector.name == "fear_greed"
+ assert collector.poll_interval == 3600
+
+
+async def test_is_available(collector):
+ assert await collector.is_available() is True
+
+
+async def test_collect_parses_api_response(collector):
+ mock_data = {
+ "fear_and_greed": {
+ "score": 45.0,
+ "rating": "Fear",
+ "timestamp": "2026-04-02T12:00:00+00:00",
+ }
+ }
+ with patch.object(collector, "_fetch_index", new_callable=AsyncMock, return_value=mock_data):
+ result = await collector.collect()
+
+ assert result.fear_greed == 45
+ assert result.fear_greed_label == "Fear"
+
+
+async def test_collect_returns_none_on_failure(collector):
+ with patch.object(collector, "_fetch_index", new_callable=AsyncMock, return_value=None):
+ result = await collector.collect()
+ assert result is None
+
+
+def test_classify_label():
+ c = FearGreedCollector()
+ assert c._classify(10) == "Extreme Fear"
+ assert c._classify(30) == "Fear"
+ assert c._classify(50) == "Neutral"
+ assert c._classify(70) == "Greed"
+ assert c._classify(85) == "Extreme Greed"
+```
+
+- [ ] **Step 2: Run tests to verify they fail**
+
+Run: `pytest services/news-collector/tests/test_fear_greed.py -v`
+Expected: FAIL
+
+- [ ] **Step 3: Implement FearGreedCollector**
+
+Create `services/news-collector/src/news_collector/collectors/fear_greed.py`:
+
+```python
+"""CNN Fear & Greed Index collector."""
+
+import logging
+from dataclasses import dataclass
+from typing import Optional
+
+import aiohttp
+
+from news_collector.collectors.base import BaseCollector
+
+logger = logging.getLogger(__name__)
+
+FEAR_GREED_URL = "https://production.dataviz.cnn.io/index/fearandgreed/graphdata"
+
+
+@dataclass
+class FearGreedResult:
+ fear_greed: int
+ fear_greed_label: str
+
+
+class FearGreedCollector(BaseCollector):
+ """Fetches CNN Fear & Greed Index.
+
+ Note: This collector does NOT return NewsItem — it returns FearGreedResult
+ which feeds directly into MarketSentiment. The main.py scheduler handles
+ this differently from news collectors.
+ """
+
+ name = "fear_greed"
+ poll_interval = 3600 # 1 hour
+
+ async def is_available(self) -> bool:
+ return True
+
+ async def _fetch_index(self) -> Optional[dict]:
+ """Fetch Fear & Greed data from CNN API."""
+ headers = {"User-Agent": "Mozilla/5.0"}
+ try:
+ async with aiohttp.ClientSession() as session:
+ async with session.get(
+ FEAR_GREED_URL, headers=headers, timeout=aiohttp.ClientTimeout(total=10)
+ ) as resp:
+ if resp.status != 200:
+                        logger.warning("fear_greed_fetch_failed status=%s", resp.status)
+ return None
+ return await resp.json()
+ except Exception as exc:
+            logger.warning("fear_greed_error error=%s", exc)
+ return None
+
+ def _classify(self, score: int) -> str:
+ """Classify numeric score into label."""
+ if score <= 20:
+ return "Extreme Fear"
+ if score <= 40:
+ return "Fear"
+ if score <= 60:
+ return "Neutral"
+ if score <= 80:
+ return "Greed"
+ return "Extreme Greed"
+
+ async def collect(self) -> Optional[FearGreedResult]:
+ """Collect Fear & Greed Index. Returns FearGreedResult or None."""
+ data = await self._fetch_index()
+ if data is None:
+ return None
+
+ try:
+ fg = data["fear_and_greed"]
+ score = int(fg["score"])
+ label = fg.get("rating", self._classify(score))
+ return FearGreedResult(fear_greed=score, fear_greed_label=label)
+ except (KeyError, ValueError, TypeError) as exc:
+            logger.warning("fear_greed_parse_failed error=%s", exc)
+ return None
+```
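+The bucket edges in `_classify` are inclusive on the upper bound: 20 is still "Extreme Fear", 21 is "Fear", and so on. Restating the same thresholds standalone to make the boundaries explicit:
+
+```python
+def classify(score: int) -> str:
+    # same thresholds as FearGreedCollector._classify; upper edge inclusive
+    if score <= 20:
+        return "Extreme Fear"
+    if score <= 40:
+        return "Fear"
+    if score <= 60:
+        return "Neutral"
+    if score <= 80:
+        return "Greed"
+    return "Extreme Greed"
+
+print([classify(s) for s in (20, 21, 60, 61, 80, 81)])
+# ['Extreme Fear', 'Fear', 'Neutral', 'Greed', 'Greed', 'Extreme Greed']
+```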
+
+- [ ] **Step 4: Run tests to verify they pass**
+
+Run: `pytest services/news-collector/tests/test_fear_greed.py -v`
+Expected: All 5 tests PASS
+
+- [ ] **Step 5: Commit**
+
+```bash
+git add services/news-collector/src/news_collector/collectors/fear_greed.py services/news-collector/tests/test_fear_greed.py
+git commit -m "feat: implement CNN Fear & Greed Index collector"
+```
+
+---
+
+### Task 10: Implement SEC EDGAR collector
+
+**Files:**
+- Create: `services/news-collector/src/news_collector/collectors/sec_edgar.py`
+- Create: `services/news-collector/tests/test_sec_edgar.py`
+
+- [ ] **Step 1: Write tests**
+
+Create `services/news-collector/tests/test_sec_edgar.py`:
+
+```python
+"""Tests for SEC EDGAR filing collector."""
+
+import pytest
+from datetime import datetime, timezone
+from unittest.mock import AsyncMock, patch
+
+from news_collector.collectors.sec_edgar import SecEdgarCollector
+
+
+@pytest.fixture
+def collector():
+    return SecEdgarCollector()
+
+
+def test_collector_name(collector):
+    assert collector.name == "sec_edgar"
+    assert collector.poll_interval == 1800
+
+
+async def test_is_available(collector):
+    assert await collector.is_available() is True
+
+
+async def test_collect_parses_filings(collector):
+    # collect() keeps only filings dated today (UTC), so the mock date must
+    # be dynamic — a hardcoded date would make this test fail on any other day
+    today = datetime.now(timezone.utc).strftime("%Y-%m-%d")
+    mock_response = {
+        "filings": {
+            "recent": {
+                "accessionNumber": ["0001234-26-000001"],
+                "filingDate": [today],
+                "primaryDocument": ["filing.htm"],
+                "form": ["8-K"],
+                "primaryDocDescription": ["Current Report"],
+            }
+        },
+        "tickers": [{"ticker": "AAPL"}],
+        "name": "Apple Inc",
+    }
+ with patch.object(collector, "_fetch_recent_filings", new_callable=AsyncMock, return_value=[mock_response]):
+ items = await collector.collect()
+
+ assert len(items) == 1
+ assert items[0].source == "sec_edgar"
+ assert items[0].category.value == "filing"
+ assert "AAPL" in items[0].symbols
+
+
+async def test_collect_handles_empty(collector):
+ with patch.object(collector, "_fetch_recent_filings", new_callable=AsyncMock, return_value=[]):
+ items = await collector.collect()
+ assert items == []
+```
+
+- [ ] **Step 2: Run tests to verify they fail**
+
+Run: `pytest services/news-collector/tests/test_sec_edgar.py -v`
+Expected: FAIL
+
+- [ ] **Step 3: Implement SecEdgarCollector**
+
+Create `services/news-collector/src/news_collector/collectors/sec_edgar.py`:
+
+```python
+"""SEC EDGAR filing collector (free, no API key required)."""
+
+import logging
+from datetime import datetime, timezone
+
+import aiohttp
+from nltk.sentiment.vader import SentimentIntensityAnalyzer
+
+from shared.models import NewsCategory, NewsItem
+from news_collector.collectors.base import BaseCollector
+
+logger = logging.getLogger(__name__)
+
+EDGAR_COMPANY_FILINGS = "https://data.sec.gov/submissions/CIK{cik}.json"
+
+# CIK numbers for major companies (subset — extend as needed)
+TRACKED_CIKS = {
+ "0000320193": "AAPL",
+ "0000789019": "MSFT",
+ "0001652044": "GOOGL",
+ "0001018724": "AMZN",
+ "0001318605": "TSLA",
+ "0001045810": "NVDA",
+ "0001326801": "META",
+ "0000019617": "JPM",
+ "0000078003": "PFE",
+ "0000021344": "KO",
+}
+
+SEC_USER_AGENT = "TradingPlatform research@example.com"
+
+
+class SecEdgarCollector(BaseCollector):
+ name = "sec_edgar"
+ poll_interval = 1800 # 30 minutes
+
+ def __init__(self) -> None:
+ self._vader = SentimentIntensityAnalyzer()
+
+ async def is_available(self) -> bool:
+ return True
+
+ async def _fetch_recent_filings(self) -> list[dict]:
+ """Fetch recent 8-K filings for tracked companies."""
+ results = []
+ headers = {"User-Agent": SEC_USER_AGENT}
+ async with aiohttp.ClientSession() as session:
+ for cik, ticker in TRACKED_CIKS.items():
+ try:
+                    url = EDGAR_COMPANY_FILINGS.format(cik=cik)
+ async with session.get(
+ url, headers=headers, timeout=aiohttp.ClientTimeout(total=10)
+ ) as resp:
+ if resp.status == 200:
+ data = await resp.json()
+ data["tickers"] = [{"ticker": ticker}]
+ results.append(data)
+ except Exception as exc:
+                    logger.warning("sec_fetch_failed cik=%s error=%s", cik, exc)
+ return results
+
+ async def collect(self) -> list[NewsItem]:
+ filings_data = await self._fetch_recent_filings()
+ items = []
+ today = datetime.now(timezone.utc).strftime("%Y-%m-%d")
+
+ for company_data in filings_data:
+ tickers = [t["ticker"] for t in company_data.get("tickers", [])]
+ company_name = company_data.get("name", "Unknown")
+ recent = company_data.get("filings", {}).get("recent", {})
+
+ forms = recent.get("form", [])
+ dates = recent.get("filingDate", [])
+ descriptions = recent.get("primaryDocDescription", [])
+ accessions = recent.get("accessionNumber", [])
+
+ for i, form in enumerate(forms):
+ if form != "8-K":
+ continue
+ filing_date = dates[i] if i < len(dates) else ""
+ if filing_date != today:
+ continue
+
+ desc = descriptions[i] if i < len(descriptions) else "8-K Filing"
+ accession = accessions[i] if i < len(accessions) else ""
+ headline = f"{company_name} ({', '.join(tickers)}): {form} - {desc}"
+
+ items.append(
+ NewsItem(
+ source=self.name,
+ headline=headline,
+ summary=desc,
+ url=f"https://www.sec.gov/cgi-bin/browse-edgar?action=getcompany&accession={accession}",
+ published_at=datetime.strptime(filing_date, "%Y-%m-%d").replace(tzinfo=timezone.utc),
+ symbols=tickers,
+ sentiment=self._vader.polarity_scores(headline)["compound"],
+ category=NewsCategory.FILING,
+ raw_data={"form": form, "accession": accession},
+ )
+ )
+
+ return items
+```
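+EDGAR's `filings.recent` payload is column-oriented: each field is a parallel list, indexed together. The index-and-bounds-check loop above can also be written with `zip`, which truncates to the shortest column and removes the manual `i < len(...)` guards. A standalone sketch on a fabricated payload:
+
+```python
+recent = {  # fabricated data shaped like EDGAR's filings.recent columns
+    "form": ["8-K", "10-Q", "8-K"],
+    "filingDate": ["2026-04-02", "2026-03-30", "2026-04-01"],
+    "primaryDocDescription": ["Current Report", "Quarterly Report", "Current Report"],
+}
+
+# zip walks the parallel columns together; rows missing a column simply drop off
+eight_ks = [
+    (filing_date, desc)
+    for form, filing_date, desc in zip(
+        recent["form"], recent["filingDate"], recent["primaryDocDescription"]
+    )
+    if form == "8-K"
+]
+print(eight_ks)
+# [('2026-04-02', 'Current Report'), ('2026-04-01', 'Current Report')]
+```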
+
+- [ ] **Step 4: Run tests to verify they pass**
+
+Run: `pytest services/news-collector/tests/test_sec_edgar.py -v`
+Expected: All 4 tests PASS
+
+- [ ] **Step 5: Commit**
+
+```bash
+git add services/news-collector/src/news_collector/collectors/sec_edgar.py services/news-collector/tests/test_sec_edgar.py
+git commit -m "feat: implement SEC EDGAR 8-K filing collector"
+```
+
+---
+
+### Task 11: Implement Reddit collector
+
+**Files:**
+- Create: `services/news-collector/src/news_collector/collectors/reddit.py`
+- Create: `services/news-collector/tests/test_reddit.py`
+
+- [ ] **Step 1: Write tests**
+
+Create `services/news-collector/tests/test_reddit.py`:
+
+```python
+"""Tests for Reddit collector."""
+
+import pytest
+from unittest.mock import AsyncMock, patch
+
+from news_collector.collectors.reddit import RedditCollector
+
+
+@pytest.fixture
+def collector():
+ return RedditCollector()
+
+
+def test_collector_name(collector):
+ assert collector.name == "reddit"
+ assert collector.poll_interval == 900
+
+
+async def test_is_available(collector):
+ assert await collector.is_available() is True
+
+
+async def test_collect_parses_posts(collector):
+ mock_posts = [
+ {
+ "data": {
+ "title": "NVDA to the moon! 🚀 AI demand is insane",
+ "selftext": "Just loaded up on NVDA calls",
+ "url": "https://reddit.com/r/wallstreetbets/123",
+ "created_utc": 1711929600,
+ "score": 500,
+ "num_comments": 200,
+ "subreddit": "wallstreetbets",
+ }
+ },
+ ]
+ with patch.object(collector, "_fetch_subreddit", new_callable=AsyncMock, return_value=mock_posts):
+ items = await collector.collect()
+
+ assert len(items) >= 1
+ assert items[0].source == "reddit"
+ assert items[0].category.value == "social"
+ assert isinstance(items[0].sentiment, float)
+
+
+async def test_collect_filters_low_score(collector):
+ mock_posts = [
+ {
+ "data": {
+ "title": "Random question about stocks",
+ "selftext": "",
+ "url": "https://reddit.com/r/stocks/456",
+ "created_utc": 1711929600,
+ "score": 3,
+ "num_comments": 1,
+ "subreddit": "stocks",
+ }
+ },
+ ]
+ with patch.object(collector, "_fetch_subreddit", new_callable=AsyncMock, return_value=mock_posts):
+ items = await collector.collect()
+ assert items == []
+```
+
+- [ ] **Step 2: Run tests to verify they fail**
+
+Run: `pytest services/news-collector/tests/test_reddit.py -v`
+Expected: FAIL
+
+- [ ] **Step 3: Implement RedditCollector**
+
+Create `services/news-collector/src/news_collector/collectors/reddit.py`:
+
+```python
+"""Reddit collector for r/wallstreetbets, r/stocks, r/investing."""
+
+import logging
+import re
+from datetime import datetime, timezone
+
+import aiohttp
+from nltk.sentiment.vader import SentimentIntensityAnalyzer
+
+from shared.models import NewsCategory, NewsItem
+from news_collector.collectors.base import BaseCollector
+
+logger = logging.getLogger(__name__)
+
+SUBREDDITS = ["wallstreetbets", "stocks", "investing"]
+MIN_SCORE = 50 # Minimum upvotes to consider
+
+TICKER_PATTERN = re.compile(
+ r"\b(AAPL|MSFT|GOOGL|GOOG|AMZN|TSLA|NVDA|META|JPM|V|JNJ|WMT|PG|UNH|HD|"
+ r"MA|DIS|BAC|XOM|PFE|KO|PEP|CSCO|INTC|VZ|NFLX|ADBE|CRM|AMD|QCOM|"
+ r"GS|BA|CAT|MMM|IBM|GE|F|GM|NKE|MCD|SBUX|SPY|QQQ|IWM)\b"
+)
+
+
+class RedditCollector(BaseCollector):
+ name = "reddit"
+ poll_interval = 900 # 15 minutes
+
+ def __init__(self) -> None:
+ self._vader = SentimentIntensityAnalyzer()
+
+ async def is_available(self) -> bool:
+ return True
+
+ async def _fetch_subreddit(self, subreddit: str = "wallstreetbets") -> list[dict]:
+ """Fetch hot posts from a subreddit via JSON API."""
+ url = f"https://www.reddit.com/r/{subreddit}/hot.json?limit=25"
+ headers = {"User-Agent": "TradingPlatform/1.0"}
+ try:
+ async with aiohttp.ClientSession() as session:
+ async with session.get(
+ url, headers=headers, timeout=aiohttp.ClientTimeout(total=10)
+ ) as resp:
+ if resp.status != 200:
+                        logger.warning("reddit_fetch_failed: subreddit=%s status=%s", subreddit, resp.status)
+ return []
+ data = await resp.json()
+ return data.get("data", {}).get("children", [])
+ except Exception as exc:
+            logger.warning("reddit_error: subreddit=%s error=%s", subreddit, exc)
+ return []
+
+ async def collect(self) -> list[NewsItem]:
+ items = []
+ seen_titles = set()
+
+ for subreddit in SUBREDDITS:
+ posts = await self._fetch_subreddit(subreddit)
+ for post in posts:
+ data = post.get("data", {})
+ title = data.get("title", "").strip()
+ score = data.get("score", 0)
+
+ if not title or title in seen_titles or score < MIN_SCORE:
+ continue
+ seen_titles.add(title)
+
+ selftext = data.get("selftext", "")
+ text = f"{title}. {selftext}" if selftext else title
+ symbols = list(set(TICKER_PATTERN.findall(text)))
+
+ items.append(
+ NewsItem(
+ source=self.name,
+ headline=title,
+ summary=selftext[:500] if selftext else None,
+ url=data.get("url"),
+ published_at=datetime.fromtimestamp(
+ data.get("created_utc", 0), tz=timezone.utc
+ ),
+ symbols=symbols,
+ sentiment=self._vader.polarity_scores(text)["compound"],
+ category=NewsCategory.SOCIAL,
+ raw_data={
+ "subreddit": data.get("subreddit", subreddit),
+ "score": score,
+ "num_comments": data.get("num_comments", 0),
+ },
+ )
+ )
+
+ return items
+```
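+
+The whitelist regex and upvote filter above can be sanity-checked in isolation. A minimal standalone sketch (abbreviated ticker list for illustration, not importing the service code):
+
+```python
+import re
+
+# Abbreviated version of the collector's whitelist pattern, for illustration only
+TICKER_PATTERN = re.compile(r"\b(AAPL|NVDA|AMD|TSLA|SPY)\b")
+MIN_SCORE = 50
+
+posts = [
+    {"title": "NVDA to the moon! AI demand is insane", "score": 500},
+    {"title": "Random question about stocks", "score": 3},  # below MIN_SCORE, dropped
+]
+
+# Same shape as the collector: drop low-score posts, then extract unique tickers
+kept = [p for p in posts if p["score"] >= MIN_SCORE]
+symbols = sorted(set(TICKER_PATTERN.findall(kept[0]["title"])))
+print(symbols)  # → ['NVDA']
+```
+
+A whitelist avoids false positives from all-caps words, at the cost of missing tickers outside the list.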
+
+- [ ] **Step 4: Run tests to verify they pass**
+
+Run: `pytest services/news-collector/tests/test_reddit.py -v`
+Expected: All 4 tests PASS
+
+- [ ] **Step 5: Commit**
+
+```bash
+git add services/news-collector/src/news_collector/collectors/reddit.py services/news-collector/tests/test_reddit.py
+git commit -m "feat: implement Reddit social sentiment collector"
+```
+
+---
+
+### Task 12: Implement Truth Social and Fed collectors
+
+**Files:**
+- Create: `services/news-collector/src/news_collector/collectors/truth_social.py`
+- Create: `services/news-collector/src/news_collector/collectors/fed.py`
+- Create: `services/news-collector/tests/test_truth_social.py`
+- Create: `services/news-collector/tests/test_fed.py`
+
+- [ ] **Step 1: Write tests for Truth Social**
+
+Create `services/news-collector/tests/test_truth_social.py`:
+
+```python
+"""Tests for Truth Social collector."""
+
+import pytest
+from unittest.mock import AsyncMock, patch
+
+from news_collector.collectors.truth_social import TruthSocialCollector
+
+
+@pytest.fixture
+def collector():
+ return TruthSocialCollector()
+
+
+def test_collector_name(collector):
+ assert collector.name == "truth_social"
+ assert collector.poll_interval == 900
+
+
+async def test_is_available(collector):
+ assert await collector.is_available() is True
+
+
+async def test_collect_parses_posts(collector):
+ mock_posts = [
+ {
+ "content": "We are imposing 25% tariffs on all steel imports!",
+ "created_at": "2026-04-02T12:00:00.000Z",
+ "url": "https://truthsocial.com/@realDonaldTrump/12345",
+ },
+ ]
+ with patch.object(collector, "_fetch_posts", new_callable=AsyncMock, return_value=mock_posts):
+ items = await collector.collect()
+
+ assert len(items) == 1
+ assert items[0].source == "truth_social"
+ assert items[0].category.value == "policy"
+ assert "tariff" in items[0].headline.lower() or "tariff" in items[0].raw_data.get("content", "").lower()
+
+
+async def test_collect_handles_empty(collector):
+ with patch.object(collector, "_fetch_posts", new_callable=AsyncMock, return_value=[]):
+ items = await collector.collect()
+ assert items == []
+```
+
+- [ ] **Step 2: Write tests for Fed collector**
+
+Create `services/news-collector/tests/test_fed.py`:
+
+```python
+"""Tests for Federal Reserve collector."""
+
+import pytest
+from unittest.mock import AsyncMock, patch
+
+from news_collector.collectors.fed import FedCollector
+
+
+@pytest.fixture
+def collector():
+ return FedCollector()
+
+
+def test_collector_name(collector):
+ assert collector.name == "fed"
+ assert collector.poll_interval == 3600
+
+
+async def test_is_available(collector):
+ assert await collector.is_available() is True
+
+
+async def test_collect_parses_rss(collector):
+ mock_entries = [
+ {
+ "title": "Federal Reserve issues FOMC statement",
+ "link": "https://www.federalreserve.gov/newsevents/pressreleases/monetary20260402a.htm",
+ "published_parsed": (2026, 4, 2, 14, 0, 0, 0, 0, 0),
+ "summary": "The Federal Open Market Committee decided to maintain the target range...",
+ },
+ ]
+ with patch.object(collector, "_fetch_fed_rss", new_callable=AsyncMock, return_value=mock_entries):
+ items = await collector.collect()
+
+ assert len(items) == 1
+ assert items[0].source == "fed"
+ assert items[0].category.value == "fed"
+```
+
+- [ ] **Step 3: Run tests to verify they fail**
+
+Run: `pytest services/news-collector/tests/test_truth_social.py services/news-collector/tests/test_fed.py -v`
+Expected: FAIL
+
+- [ ] **Step 4: Implement TruthSocialCollector**
+
+Create `services/news-collector/src/news_collector/collectors/truth_social.py`:
+
+```python
+"""Truth Social collector for Trump posts (policy-relevant)."""
+
+import logging
+from datetime import datetime, timezone
+
+import aiohttp
+from nltk.sentiment.vader import SentimentIntensityAnalyzer
+
+from shared.models import NewsCategory, NewsItem
+from news_collector.collectors.base import BaseCollector
+
+logger = logging.getLogger(__name__)
+
+# Truth Social uses a Mastodon-compatible API
+TRUTH_SOCIAL_API = "https://truthsocial.com/api/v1/accounts/107780257626128497/statuses"
+
+
+class TruthSocialCollector(BaseCollector):
+ name = "truth_social"
+ poll_interval = 900 # 15 minutes
+
+ def __init__(self) -> None:
+ self._vader = SentimentIntensityAnalyzer()
+
+ async def is_available(self) -> bool:
+ return True
+
+ async def _fetch_posts(self) -> list[dict]:
+ """Fetch recent posts from Truth Social."""
+ headers = {"User-Agent": "Mozilla/5.0"}
+ try:
+ async with aiohttp.ClientSession() as session:
+ async with session.get(
+ TRUTH_SOCIAL_API,
+ headers=headers,
+ params={"limit": 10},
+ timeout=aiohttp.ClientTimeout(total=15),
+ ) as resp:
+ if resp.status != 200:
+                        logger.warning("truth_social_fetch_failed: status=%s", resp.status)
+ return []
+ return await resp.json()
+ except Exception as exc:
+            logger.warning("truth_social_error: %s", exc)
+ return []
+
+ def _strip_html(self, text: str) -> str:
+ """Remove HTML tags from content."""
+ import re
+ return re.sub(r"<[^>]+>", "", text).strip()
+
+ async def collect(self) -> list[NewsItem]:
+ posts = await self._fetch_posts()
+ items = []
+
+ for post in posts:
+ content = self._strip_html(post.get("content", ""))
+ if not content:
+ continue
+
+ created_at_str = post.get("created_at", "")
+ try:
+ published = datetime.fromisoformat(created_at_str.replace("Z", "+00:00"))
+ except (ValueError, AttributeError):
+ published = datetime.now(timezone.utc)
+
+ items.append(
+ NewsItem(
+ source=self.name,
+ headline=content[:200],
+ summary=content if len(content) > 200 else None,
+ url=post.get("url"),
+ published_at=published,
+ symbols=[], # Symbols extracted at aggregation stage via LLM
+ sentiment=self._vader.polarity_scores(content)["compound"],
+ category=NewsCategory.POLICY,
+ raw_data={"content": content, "id": post.get("id")},
+ )
+ )
+
+ return items
+```
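+
+The tag stripping in `_strip_html` is a blunt regex, not a full HTML parser: anything between `<` and `>` is dropped. A standalone sketch of the same approach:
+
+```python
+import re
+
+def strip_html(text: str) -> str:
+    """Drop anything that looks like a tag; adequate for Mastodon-style <p>/<a> markup."""
+    return re.sub(r"<[^>]+>", "", text).strip()
+
+# Truth Social (Mastodon-compatible API) wraps status content in HTML
+raw = '<p>We are imposing 25% tariffs on <a href="#">all steel imports</a>!</p>'
+print(strip_html(raw))  # → We are imposing 25% tariffs on all steel imports!
+```
+
+HTML entities such as `&amp;` pass through unchanged; that is acceptable for headline extraction.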
+
+- [ ] **Step 5: Implement FedCollector**
+
+Create `services/news-collector/src/news_collector/collectors/fed.py`:
+
+```python
+"""Federal Reserve press release and FOMC statement collector."""
+
+import logging
+from calendar import timegm
+from datetime import datetime, timezone
+
+import aiohttp
+import feedparser
+from nltk.sentiment.vader import SentimentIntensityAnalyzer
+
+from shared.models import NewsCategory, NewsItem
+from news_collector.collectors.base import BaseCollector
+
+logger = logging.getLogger(__name__)
+
+FED_RSS_URL = "https://www.federalreserve.gov/feeds/press_all.xml"
+
+
+class FedCollector(BaseCollector):
+ name = "fed"
+ poll_interval = 3600 # 1 hour
+
+ def __init__(self) -> None:
+ self._vader = SentimentIntensityAnalyzer()
+
+ async def is_available(self) -> bool:
+ return True
+
+ async def _fetch_fed_rss(self) -> list[dict]:
+ """Fetch Federal Reserve RSS feed entries."""
+ try:
+ async with aiohttp.ClientSession() as session:
+ async with session.get(
+ FED_RSS_URL, timeout=aiohttp.ClientTimeout(total=10)
+ ) as resp:
+ if resp.status != 200:
+                        logger.warning("fed_rss_failed: status=%s", resp.status)
+ return []
+ text = await resp.text()
+ feed = feedparser.parse(text)
+ return feed.get("entries", [])
+ except Exception as exc:
+            logger.warning("fed_rss_error: %s", exc)
+ return []
+
+ def _detect_stance(self, text: str) -> str:
+ """Detect hawkish/dovish/neutral stance from text."""
+ text_lower = text.lower()
+ hawkish_words = ["tighten", "raise", "inflation concern", "restrictive", "higher rates"]
+ dovish_words = ["accommodate", "cut", "easing", "lower rates", "support growth"]
+
+ hawk_count = sum(1 for w in hawkish_words if w in text_lower)
+ dove_count = sum(1 for w in dovish_words if w in text_lower)
+
+ if hawk_count > dove_count:
+ return "hawkish"
+ if dove_count > hawk_count:
+ return "dovish"
+ return "neutral"
+
+ async def collect(self) -> list[NewsItem]:
+ entries = await self._fetch_fed_rss()
+ items = []
+
+ for entry in entries:
+ title = entry.get("title", "").strip()
+ if not title:
+ continue
+
+ summary = entry.get("summary", "")
+ parsed_time = entry.get("published_parsed")
+ if parsed_time:
+ published = datetime.fromtimestamp(timegm(parsed_time), tz=timezone.utc)
+ else:
+ published = datetime.now(timezone.utc)
+
+ text = f"{title}. {summary}" if summary else title
+
+ items.append(
+ NewsItem(
+ source=self.name,
+ headline=title,
+ summary=summary or None,
+ url=entry.get("link"),
+ published_at=published,
+ symbols=[],
+ sentiment=self._vader.polarity_scores(text)["compound"],
+ category=NewsCategory.FED,
+ raw_data={"stance": self._detect_stance(text)},
+ )
+ )
+
+ return items
+```
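+
+The stance detection is a simple substring-count heuristic. A standalone copy of the same logic, with spot checks:
+
+```python
+def detect_stance(text: str) -> str:
+    """Keyword-count heuristic mirroring FedCollector._detect_stance."""
+    text_lower = text.lower()
+    hawkish = ["tighten", "raise", "inflation concern", "restrictive", "higher rates"]
+    dovish = ["accommodate", "cut", "easing", "lower rates", "support growth"]
+    # Note: plain substring matching, so e.g. "raise" also matches "praised"
+    hawk = sum(1 for w in hawkish if w in text_lower)
+    dove = sum(1 for w in dovish if w in text_lower)
+    if hawk > dove:
+        return "hawkish"
+    if dove > hawk:
+        return "dovish"
+    return "neutral"
+
+print(detect_stance("The Committee decided to raise the target range to a more restrictive level"))  # hawkish
+print(detect_stance("The Committee will cut rates to support growth"))  # dovish
+print(detect_stance("The Committee met today"))  # neutral
+```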
+
+- [ ] **Step 6: Run tests to verify they pass**
+
+Run: `pytest services/news-collector/tests/test_truth_social.py services/news-collector/tests/test_fed.py -v`
+Expected: All 7 tests PASS
+
+- [ ] **Step 7: Commit**
+
+```bash
+git add services/news-collector/src/news_collector/collectors/truth_social.py services/news-collector/src/news_collector/collectors/fed.py services/news-collector/tests/test_truth_social.py services/news-collector/tests/test_fed.py
+git commit -m "feat: implement Truth Social and Federal Reserve collectors"
+```
+
+---
+
+### Task 13: Implement news-collector main.py (scheduler)
+
+**Files:**
+- Create: `services/news-collector/src/news_collector/main.py`
+- Create: `services/news-collector/tests/test_main.py`
+
+- [ ] **Step 1: Write tests**
+
+Create `services/news-collector/tests/test_main.py`:
+
+```python
+"""Tests for news collector scheduler."""
+
+import pytest
+from unittest.mock import AsyncMock, MagicMock
+from datetime import datetime, timezone
+
+from shared.models import NewsCategory, NewsItem
+
+from news_collector.main import run_collector_once
+
+
+async def test_run_collector_once_stores_and_publishes():
+ mock_item = NewsItem(
+ source="test",
+ headline="Test news",
+ published_at=datetime(2026, 4, 2, tzinfo=timezone.utc),
+ sentiment=0.5,
+ category=NewsCategory.MACRO,
+ )
+
+ mock_collector = MagicMock()
+ mock_collector.name = "test"
+ mock_collector.collect = AsyncMock(return_value=[mock_item])
+
+ mock_db = MagicMock()
+ mock_db.insert_news_item = AsyncMock()
+
+ mock_broker = MagicMock()
+ mock_broker.publish = AsyncMock()
+
+ count = await run_collector_once(mock_collector, mock_db, mock_broker)
+
+ assert count == 1
+ mock_db.insert_news_item.assert_called_once_with(mock_item)
+ mock_broker.publish.assert_called_once()
+
+
+async def test_run_collector_once_handles_empty():
+ mock_collector = MagicMock()
+ mock_collector.name = "test"
+ mock_collector.collect = AsyncMock(return_value=[])
+
+ mock_db = MagicMock()
+ mock_broker = MagicMock()
+
+ count = await run_collector_once(mock_collector, mock_db, mock_broker)
+ assert count == 0
+```
+
+- [ ] **Step 2: Run tests to verify they fail**
+
+Run: `pytest services/news-collector/tests/test_main.py -v`
+Expected: FAIL
+
+- [ ] **Step 3: Implement main.py**
+
+Create `services/news-collector/src/news_collector/main.py`:
+
+```python
+"""News Collector Service — schedules and runs all news collectors."""
+
+import asyncio
+import logging
+
+from shared.broker import RedisBroker
+from shared.db import Database
+from shared.events import NewsEvent
+from shared.healthcheck import HealthCheckServer
+from shared.logging import setup_logging
+from shared.metrics import ServiceMetrics
+from shared.notifier import TelegramNotifier
+from shared.sentiment_models import MarketSentiment
+
+from news_collector.config import NewsCollectorConfig
+from news_collector.collectors.base import BaseCollector
+from news_collector.collectors.finnhub import FinnhubCollector
+from news_collector.collectors.rss import RSSCollector
+from news_collector.collectors.sec_edgar import SecEdgarCollector
+from news_collector.collectors.truth_social import TruthSocialCollector
+from news_collector.collectors.reddit import RedditCollector
+from news_collector.collectors.fear_greed import FearGreedCollector
+from news_collector.collectors.fed import FedCollector
+
+logger = logging.getLogger(__name__)
+
+
+async def run_collector_once(
+ collector: BaseCollector,
+ db: Database,
+ broker: RedisBroker,
+) -> int:
+    """Run a single collector, store results, and publish to Redis.
+
+    Returns the number of items collected.
+    """
+ items = await collector.collect()
+ if not isinstance(items, list):
+ # FearGreedCollector returns a FearGreedResult, not a list
+ return 0
+
+ for item in items:
+ await db.insert_news_item(item)
+ event = NewsEvent(data=item)
+ await broker.publish("news", event.to_dict())
+
+ return len(items)
+
+
+async def run_collector_loop(
+ collector: BaseCollector,
+ db: Database,
+ broker: RedisBroker,
+ log,
+) -> None:
+ """Run a collector on its poll interval forever."""
+ while True:
+ try:
+ if await collector.is_available():
+ count = await run_collector_once(collector, db, broker)
+ log.info("collector_run", collector=collector.name, items=count)
+ else:
+ log.debug("collector_unavailable", collector=collector.name)
+ except Exception as exc:
+ log.error("collector_error", collector=collector.name, error=str(exc))
+ await asyncio.sleep(collector.poll_interval)
+
+
+async def run_fear_greed_loop(
+ collector: FearGreedCollector,
+ db: Database,
+ log,
+) -> None:
+ """Run the Fear & Greed collector and update market sentiment."""
+ from datetime import datetime, timezone
+
+ while True:
+ try:
+ result = await collector.collect()
+ if result is not None:
+ ms = MarketSentiment(
+ fear_greed=result.fear_greed,
+ fear_greed_label=result.fear_greed_label,
+ fed_stance="neutral", # Updated by Fed collector analysis
+ market_regime=_determine_regime(result.fear_greed, None),
+ updated_at=datetime.now(timezone.utc),
+ )
+ await db.upsert_market_sentiment(ms)
+ log.info("fear_greed_updated", score=result.fear_greed, label=result.fear_greed_label)
+ except Exception as exc:
+ log.error("fear_greed_error", error=str(exc))
+ await asyncio.sleep(collector.poll_interval)
+
+
+async def run_aggregator_loop(
+ db: Database,
+ interval: int,
+ log,
+) -> None:
+    """Run sentiment aggregation every `interval` seconds.
+
+    Reads recent news from the DB, computes per-symbol scores, and upserts
+    them into the symbol_scores table.
+    """
+ from datetime import datetime, timezone
+ from shared.sentiment import SentimentAggregator
+
+ aggregator = SentimentAggregator()
+
+ while True:
+ try:
+ now = datetime.now(timezone.utc)
+ news_items = await db.get_recent_news(hours=24)
+ if news_items:
+ scores = aggregator.aggregate(news_items, now)
+ for symbol_score in scores.values():
+ await db.upsert_symbol_score(symbol_score)
+ log.info("aggregation_complete", symbols=len(scores))
+ except Exception as exc:
+ log.error("aggregation_error", error=str(exc))
+ await asyncio.sleep(interval)
+
+
+def _determine_regime(fear_greed: int, vix: float | None) -> str:
+ """Determine market regime from Fear & Greed and VIX."""
+ if fear_greed <= 20:
+ return "risk_off"
+ if vix is not None and vix > 30:
+ return "risk_off"
+ if fear_greed >= 60 and (vix is None or vix < 20):
+ return "risk_on"
+ return "neutral"
+
+
+async def run() -> None:
+ config = NewsCollectorConfig()
+ log = setup_logging("news-collector", config.log_level, config.log_format)
+ metrics = ServiceMetrics("news_collector")
+
+ notifier = TelegramNotifier(
+ bot_token=config.telegram_bot_token,
+ chat_id=config.telegram_chat_id,
+ )
+
+ db = Database(config.database_url)
+ await db.connect()
+
+ broker = RedisBroker(config.redis_url)
+
+ health = HealthCheckServer(
+ "news-collector",
+ port=config.health_port,
+ auth_token=config.metrics_auth_token,
+ )
+ health.register_check("redis", broker.ping)
+ await health.start()
+ metrics.service_up.labels(service="news-collector").set(1)
+
+ # Initialize collectors
+ news_collectors: list[BaseCollector] = [
+ RSSCollector(),
+ SecEdgarCollector(),
+ TruthSocialCollector(),
+ RedditCollector(),
+ FedCollector(),
+ ]
+
+ # Finnhub requires API key
+ if config.finnhub_api_key:
+ news_collectors.append(FinnhubCollector(api_key=config.finnhub_api_key))
+
+ fear_greed = FearGreedCollector()
+
+ log.info(
+ "starting",
+ collectors=[c.name for c in news_collectors],
+ fear_greed=True,
+ )
+
+ tasks = []
+ try:
+ for collector in news_collectors:
+ task = asyncio.create_task(run_collector_loop(collector, db, broker, log))
+ tasks.append(task)
+
+ tasks.append(asyncio.create_task(run_fear_greed_loop(fear_greed, db, log)))
+ tasks.append(asyncio.create_task(
+ run_aggregator_loop(db, config.sentiment_aggregate_interval, log)
+ ))
+
+ await asyncio.gather(*tasks)
+ except Exception as exc:
+ log.error("fatal_error", error=str(exc))
+ await notifier.send_error(str(exc), "news-collector")
+ raise
+ finally:
+ for task in tasks:
+ task.cancel()
+ metrics.service_up.labels(service="news-collector").set(0)
+ await notifier.close()
+ await broker.close()
+ await db.close()
+
+
+def main() -> None:
+ asyncio.run(run())
+
+
+if __name__ == "__main__":
+ main()
+```
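+
+The regime gate feeding `MarketSentiment` can be exercised standalone (a copy of `_determine_regime` from the code above, with a few spot checks):
+
+```python
+def determine_regime(fear_greed: int, vix: float | None) -> str:
+    # Extreme fear or elevated VIX forces risk_off; greed plus calm VIX is risk_on.
+    if fear_greed <= 20:
+        return "risk_off"
+    if vix is not None and vix > 30:
+        return "risk_off"
+    if fear_greed >= 60 and (vix is None or vix < 20):
+        return "risk_on"
+    return "neutral"
+
+print([determine_regime(15, None), determine_regime(70, 15.0), determine_regime(50, 25.0)])
+# → ['risk_off', 'risk_on', 'neutral']
+```
+
+Note `vix=None` (as passed by `run_fear_greed_loop`) never blocks on the VIX branch, so the gate degrades gracefully when VIX data is missing.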
+
+- [ ] **Step 4: Run tests to verify they pass**
+
+Run: `pytest services/news-collector/tests/test_main.py -v`
+Expected: All 2 tests PASS
+
+- [ ] **Step 5: Commit**
+
+```bash
+git add services/news-collector/src/news_collector/main.py services/news-collector/tests/test_main.py
+git commit -m "feat: implement news-collector main scheduler with all collectors"
+```
+
+---
+
+## Phase 3: Sentiment Analysis Pipeline
+
+### Task 14: Implement sentiment aggregator
+
+**Files:**
+- Modify: `shared/src/shared/sentiment.py`
+- Create: `shared/tests/test_sentiment_aggregator.py`
+
+- [ ] **Step 1: Write tests**
+
+Create `shared/tests/test_sentiment_aggregator.py`:
+
+```python
+"""Tests for sentiment aggregator."""
+
+import pytest
+from datetime import datetime, timezone, timedelta
+
+from shared.sentiment import SentimentAggregator
+
+
+@pytest.fixture
+def aggregator():
+ return SentimentAggregator()
+
+
+def test_freshness_decay_recent():
+ a = SentimentAggregator()
+ now = datetime.now(timezone.utc)
+ assert a._freshness_decay(now, now) == 1.0
+
+
+def test_freshness_decay_3_hours():
+ a = SentimentAggregator()
+ now = datetime.now(timezone.utc)
+ three_hours_ago = now - timedelta(hours=3)
+ assert a._freshness_decay(three_hours_ago, now) == 0.7
+
+
+def test_freshness_decay_12_hours():
+ a = SentimentAggregator()
+ now = datetime.now(timezone.utc)
+ twelve_hours_ago = now - timedelta(hours=12)
+ assert a._freshness_decay(twelve_hours_ago, now) == 0.3
+
+
+def test_freshness_decay_old():
+ a = SentimentAggregator()
+ now = datetime.now(timezone.utc)
+ two_days_ago = now - timedelta(days=2)
+ assert a._freshness_decay(two_days_ago, now) == 0.0
+
+
+def test_compute_composite():
+ a = SentimentAggregator()
+ composite = a._compute_composite(
+ news_score=0.5,
+ social_score=0.3,
+ policy_score=0.8,
+ filing_score=0.2,
+ )
+ expected = 0.5 * 0.3 + 0.3 * 0.2 + 0.8 * 0.3 + 0.2 * 0.2
+ assert abs(composite - expected) < 0.001
+
+
+def test_aggregate_news_by_symbol(aggregator):
+ now = datetime.now(timezone.utc)
+ news_items = [
+ {
+ "symbols": ["AAPL"],
+ "sentiment": 0.8,
+ "category": "earnings",
+ "published_at": now,
+ },
+ {
+ "symbols": ["AAPL"],
+ "sentiment": 0.3,
+ "category": "macro",
+ "published_at": now - timedelta(hours=2),
+ },
+ {
+ "symbols": ["MSFT"],
+ "sentiment": -0.5,
+ "category": "policy",
+ "published_at": now,
+ },
+ ]
+ scores = aggregator.aggregate(news_items, now)
+
+ assert "AAPL" in scores
+ assert "MSFT" in scores
+ assert scores["AAPL"].news_count == 2
+ assert scores["AAPL"].news_score > 0 # Positive overall
+ assert scores["MSFT"].policy_score < 0 # Negative policy
+
+
+def test_aggregate_empty(aggregator):
+ now = datetime.now(timezone.utc)
+ scores = aggregator.aggregate([], now)
+ assert scores == {}
+
+
+def test_determine_regime():
+ a = SentimentAggregator()
+ assert a.determine_regime(15, None) == "risk_off"
+ assert a.determine_regime(15, 35.0) == "risk_off"
+ assert a.determine_regime(50, 35.0) == "risk_off"
+ assert a.determine_regime(70, 15.0) == "risk_on"
+ assert a.determine_regime(50, 20.0) == "neutral"
+```
+
+- [ ] **Step 2: Run tests to verify they fail**
+
+Run: `pytest shared/tests/test_sentiment_aggregator.py -v`
+Expected: FAIL — `SentimentAggregator` not in sentiment.py
+
+- [ ] **Step 3: Add SentimentAggregator to sentiment.py**
+
+Keep the existing `SentimentData` class for backward compatibility with existing tests. Add the `SentimentAggregator` class at the end of `shared/src/shared/sentiment.py`:
+
+```python
+from datetime import datetime
+from shared.sentiment_models import SymbolScore
+
+
+class SentimentAggregator:
+ """Aggregates per-news sentiment into per-symbol scores."""
+
+    # Weights: headline news and policy carry the most weight for US stocks
+ WEIGHTS = {
+ "news": 0.3,
+ "social": 0.2,
+ "policy": 0.3,
+ "filing": 0.2,
+ }
+
+ # Category → score field mapping
+ CATEGORY_MAP = {
+ "earnings": "news",
+ "macro": "news",
+ "social": "social",
+ "policy": "policy",
+ "filing": "filing",
+ "fed": "policy",
+ }
+
+ def _freshness_decay(self, published_at: datetime, now: datetime) -> float:
+ """Compute freshness decay factor."""
+ age = now - published_at
+ hours = age.total_seconds() / 3600
+ if hours < 1:
+ return 1.0
+ if hours < 6:
+ return 0.7
+ if hours < 24:
+ return 0.3
+ return 0.0
+
+ def _compute_composite(
+ self,
+ news_score: float,
+ social_score: float,
+ policy_score: float,
+ filing_score: float,
+ ) -> float:
+ return (
+ news_score * self.WEIGHTS["news"]
+ + social_score * self.WEIGHTS["social"]
+ + policy_score * self.WEIGHTS["policy"]
+ + filing_score * self.WEIGHTS["filing"]
+ )
+
+ def aggregate(
+ self, news_items: list[dict], now: datetime
+ ) -> dict[str, SymbolScore]:
+ """Aggregate news items into per-symbol scores.
+
+        Each dict in news_items must have: symbols, sentiment, category, published_at.
+ Returns dict mapping symbol → SymbolScore.
+ """
+ # Accumulate per-symbol, per-category
+ symbol_data: dict[str, dict] = {}
+
+ for item in news_items:
+ decay = self._freshness_decay(item["published_at"], now)
+ if decay == 0.0:
+ continue
+
+ category = item.get("category", "macro")
+ score_field = self.CATEGORY_MAP.get(category, "news")
+ weighted_sentiment = item["sentiment"] * decay
+
+ for symbol in item.get("symbols", []):
+ if symbol not in symbol_data:
+ symbol_data[symbol] = {
+ "news_scores": [],
+ "social_scores": [],
+ "policy_scores": [],
+ "filing_scores": [],
+ "count": 0,
+ }
+
+ symbol_data[symbol][f"{score_field}_scores"].append(weighted_sentiment)
+ symbol_data[symbol]["count"] += 1
+
+ # Compute averages and composites
+ result = {}
+ for symbol, data in symbol_data.items():
+ news_score = _safe_avg(data["news_scores"])
+ social_score = _safe_avg(data["social_scores"])
+ policy_score = _safe_avg(data["policy_scores"])
+ filing_score = _safe_avg(data["filing_scores"])
+
+ result[symbol] = SymbolScore(
+ symbol=symbol,
+ news_score=news_score,
+ news_count=data["count"],
+ social_score=social_score,
+ policy_score=policy_score,
+ filing_score=filing_score,
+ composite=self._compute_composite(
+ news_score, social_score, policy_score, filing_score
+ ),
+ updated_at=now,
+ )
+
+ return result
+
+ def determine_regime(self, fear_greed: int, vix: float | None) -> str:
+ """Determine market regime."""
+ if fear_greed <= 20:
+ return "risk_off"
+ if vix is not None and vix > 30:
+ return "risk_off"
+ if fear_greed >= 60 and (vix is None or vix < 20):
+ return "risk_on"
+ return "neutral"
+
+
+def _safe_avg(values: list[float]) -> float:
+ """Return average of values, or 0.0 if empty."""
+ if not values:
+ return 0.0
+ return sum(values) / len(values)
+```
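+
+Worked by hand, the decay and weighting combine like this (standalone sketch duplicating the thresholds and `WEIGHTS` above):
+
+```python
+from datetime import datetime, timedelta, timezone
+
+WEIGHTS = {"news": 0.3, "social": 0.2, "policy": 0.3, "filing": 0.2}
+
+def freshness_decay(published_at: datetime, now: datetime) -> float:
+    hours = (now - published_at).total_seconds() / 3600
+    if hours < 1:
+        return 1.0
+    if hours < 6:
+        return 0.7
+    if hours < 24:
+        return 0.3
+    return 0.0
+
+now = datetime.now(timezone.utc)
+# One fresh headline (sentiment 0.8) and one 12-hour-old headline (0.4), both "news"
+decayed = [0.8 * freshness_decay(now, now),
+           0.4 * freshness_decay(now - timedelta(hours=12), now)]
+news_score = sum(decayed) / len(decayed)   # (0.8 + 0.12) / 2 = 0.46
+composite = news_score * WEIGHTS["news"]   # other buckets empty → 0.46 * 0.3 = 0.138
+```
+
+So a stale item still counts toward `news_count` weighting but contributes only 30% of its raw sentiment, and a symbol with only news coverage is capped at 0.3 of the composite range.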
+
+- [ ] **Step 4: Run new tests to verify they pass**
+
+Run: `pytest shared/tests/test_sentiment_aggregator.py -v`
+Expected: All 8 tests PASS
+
+- [ ] **Step 5: Run existing sentiment tests for regressions**
+
+Run: `pytest shared/tests/test_sentiment.py -v`
+Expected: All existing tests PASS (SentimentData unchanged)
+
+- [ ] **Step 6: Commit**
+
+```bash
+git add shared/src/shared/sentiment.py shared/tests/test_sentiment_aggregator.py
+git commit -m "feat: implement SentimentAggregator with freshness decay and composite scoring"
+```
+
+---
+
+## Phase 4: Stock Selector Engine
+
+### Task 15: Implement stock selector
+
+**Files:**
+- Create: `services/strategy-engine/src/strategy_engine/stock_selector.py`
+- Create: `services/strategy-engine/tests/test_stock_selector.py`
+
+- [ ] **Step 1: Write tests**
+
+Create `services/strategy-engine/tests/test_stock_selector.py`:
+
+```python
+"""Tests for stock selector engine."""
+
+import pytest
+from unittest.mock import AsyncMock, MagicMock
+from datetime import datetime, timezone
+
+from strategy_engine.stock_selector import (
+ SentimentCandidateSource,
+ StockSelector,
+ _parse_llm_selections,
+)
+
+
+async def test_sentiment_candidate_source():
+ mock_db = MagicMock()
+ mock_db.get_top_symbol_scores = AsyncMock(return_value=[
+ {"symbol": "AAPL", "composite": 0.8, "news_count": 5},
+ {"symbol": "NVDA", "composite": 0.6, "news_count": 3},
+ ])
+
+ source = SentimentCandidateSource(mock_db)
+ candidates = await source.get_candidates()
+
+ assert len(candidates) == 2
+ assert candidates[0].symbol == "AAPL"
+ assert candidates[0].source == "sentiment"
+
+
+def test_parse_llm_selections_valid():
+ llm_response = """
+ [
+ {"symbol": "NVDA", "side": "BUY", "conviction": 0.85, "reason": "AI demand", "key_news": ["NVDA beats earnings"]},
+ {"symbol": "XOM", "side": "BUY", "conviction": 0.72, "reason": "Oil surge", "key_news": ["Oil prices up"]}
+ ]
+ """
+ selections = _parse_llm_selections(llm_response)
+ assert len(selections) == 2
+ assert selections[0].symbol == "NVDA"
+ assert selections[0].conviction == 0.85
+
+
+def test_parse_llm_selections_invalid():
+ selections = _parse_llm_selections("not json")
+ assert selections == []
+
+
+def test_parse_llm_selections_with_markdown():
+ llm_response = """
+ Here are my picks:
+ ```json
+ [
+ {"symbol": "TSLA", "side": "BUY", "conviction": 0.7, "reason": "Momentum", "key_news": ["Tesla rally"]}
+ ]
+ ```
+ """
+ selections = _parse_llm_selections(llm_response)
+ assert len(selections) == 1
+ assert selections[0].symbol == "TSLA"
+
+
+async def test_selector_blocks_on_risk_off():
+ mock_db = MagicMock()
+ mock_db.get_latest_market_sentiment = AsyncMock(return_value={
+ "fear_greed": 15,
+ "fear_greed_label": "Extreme Fear",
+ "vix": 35.0,
+ "fed_stance": "neutral",
+ "market_regime": "risk_off",
+ "updated_at": datetime.now(timezone.utc),
+ })
+
+ selector = StockSelector(db=mock_db, broker=MagicMock(), alpaca=MagicMock(), anthropic_api_key="test")
+ result = await selector.select()
+ assert result == []
+```
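+
+The markdown-fence handling exercised by `test_parse_llm_selections_with_markdown` can be sketched standalone. The helper name here is hypothetical; it uses the same fence-extraction regex the selector's parser relies on:
+
+```python
+import json
+import re
+
+def extract_json_array(text: str) -> list:
+    """Pull a JSON array out of an optional ```json fenced block, then parse it."""
+    match = re.search(r"```(?:json)?\s*(\[.*?\])\s*```", text, re.DOTALL)
+    if match:
+        text = match.group(1)
+    try:
+        return json.loads(text)
+    except (json.JSONDecodeError, TypeError):
+        return []
+
+reply = 'Here are my picks:\n```json\n[{"symbol": "TSLA", "conviction": 0.7}]\n```'
+print(extract_json_array(reply))       # → [{'symbol': 'TSLA', 'conviction': 0.7}]
+print(extract_json_array("not json"))  # → []
+```
+
+Falling back to parsing the raw text when no fence is found covers models that obey the "ONLY the JSON array" instruction.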
+
+- [ ] **Step 2: Run tests to verify they fail**
+
+Run: `pytest services/strategy-engine/tests/test_stock_selector.py -v`
+Expected: FAIL — module not found
+
+- [ ] **Step 3: Implement StockSelector**
+
+Create `services/strategy-engine/src/strategy_engine/stock_selector.py`:
+
+```python
+"""Stock Selector Engine — 3-stage dynamic stock selection for MOC trading."""
+
+import json
+import logging
+import re
+from datetime import datetime, timezone
+from decimal import Decimal
+from typing import Optional
+
+import aiohttp
+
+from shared.alpaca import AlpacaClient
+from shared.broker import RedisBroker
+from shared.db import Database
+from shared.models import OrderSide
+from shared.sentiment_models import Candidate, MarketSentiment, SelectedStock, SymbolScore
+
+logger = logging.getLogger(__name__)
+
+
+class SentimentCandidateSource:
+ """Get candidate stocks from sentiment scores in DB."""
+
+ def __init__(self, db: Database, limit: int = 20) -> None:
+ self._db = db
+ self._limit = limit
+
+ async def get_candidates(self) -> list[Candidate]:
+ scores = await self._db.get_top_symbol_scores(limit=self._limit)
+ return [
+ Candidate(
+ symbol=s["symbol"],
+ source="sentiment",
+ score=s["composite"],
+ reason=f"Sentiment composite={s['composite']:.2f}, news_count={s['news_count']}",
+ )
+ for s in scores
+ if s["composite"] != 0
+ ]
+
+
+class LLMCandidateSource:
+ """Get candidate stocks by asking Claude to analyze today's top news."""
+
+ def __init__(self, db: Database, api_key: str, model: str) -> None:
+ self._db = db
+ self._api_key = api_key
+ self._model = model
+
+ async def get_candidates(self) -> list[Candidate]:
+ news = await self._db.get_recent_news(hours=24)
+ if not news:
+ return []
+
+ headlines = [f"- [{n['source']}] {n['headline']} (sentiment: {n['sentiment']:.2f})" for n in news[:50]]
+ prompt = (
+ "You are a stock market analyst. Based on today's news headlines below, "
+ "identify US stocks that are most likely to be affected (positively or negatively). "
+ "Return a JSON array of objects with: symbol, direction (BUY or SELL), score (0-1), reason.\n\n"
+ "Headlines:\n" + "\n".join(headlines) + "\n\n"
+ "Return ONLY the JSON array, no other text."
+ )
+
+ try:
+ async with aiohttp.ClientSession() as session:
+ async with session.post(
+ "https://api.anthropic.com/v1/messages",
+ headers={
+ "x-api-key": self._api_key,
+ "anthropic-version": "2023-06-01",
+ "content-type": "application/json",
+ },
+ json={
+ "model": self._model,
+ "max_tokens": 1024,
+ "messages": [{"role": "user", "content": prompt}],
+ },
+ timeout=aiohttp.ClientTimeout(total=30),
+ ) as resp:
+ if resp.status != 200:
+ logger.warning("llm_candidate_failed", status=resp.status)
+ return []
+ data = await resp.json()
+ text = data["content"][0]["text"]
+ except Exception as exc:
+ logger.error("llm_candidate_error", error=str(exc))
+ return []
+
+ return self._parse_response(text)
+
+    def _parse_response(self, text: str) -> list[Candidate]:
+        try:
+            # Extract JSON from possible markdown code blocks
+            json_match = re.search(r"```(?:json)?\s*(\[.*?\])\s*```", text, re.DOTALL)
+            if json_match:
+                text = json_match.group(1)
+            else:
+                # Fall back to the first bare JSON array in the response
+                array_match = re.search(r"\[.*\]", text, re.DOTALL)
+                if array_match:
+                    text = array_match.group(0)
+            items = json.loads(text)
+        except (json.JSONDecodeError, TypeError):
+            return []
+
+ candidates = []
+ for item in items:
+ try:
+ direction = OrderSide(item.get("direction", "BUY"))
+ candidates.append(
+ Candidate(
+ symbol=item["symbol"],
+ source="llm",
+ direction=direction,
+ score=float(item.get("score", 0.5)),
+ reason=item.get("reason", "LLM recommendation"),
+ )
+ )
+ except (KeyError, ValueError):
+ continue
+
+ return candidates
+
+
+class StockSelector:
+ """3-stage stock selector: candidates → technical filter → LLM final pick."""
+
+ def __init__(
+ self,
+ db: Database,
+ broker: RedisBroker,
+ alpaca: AlpacaClient,
+ anthropic_api_key: str,
+ anthropic_model: str = "claude-sonnet-4-20250514",
+ max_picks: int = 3,
+ ) -> None:
+ self._db = db
+ self._broker = broker
+ self._alpaca = alpaca
+ self._api_key = anthropic_api_key
+ self._model = anthropic_model
+ self._max_picks = max_picks
+ self._sentiment_source = SentimentCandidateSource(db)
+ self._llm_source = LLMCandidateSource(db, anthropic_api_key, anthropic_model)
+
+ async def select(self) -> list[SelectedStock]:
+ """Run full 3-stage selection. Returns list of SelectedStock."""
+ # Check market sentiment gate
+ ms = await self._db.get_latest_market_sentiment()
+ if ms and ms.get("market_regime") == "risk_off":
+ logger.info("selection_blocked_risk_off")
+ return []
+
+ # Stage 1: Candidate pool
+ sentiment_candidates = await self._sentiment_source.get_candidates()
+ llm_candidates = await self._llm_source.get_candidates()
+ candidates = self._merge_candidates(sentiment_candidates, llm_candidates)
+
+ if not candidates:
+ logger.info("no_candidates_found")
+ return []
+
+ logger.info("candidates_found", count=len(candidates))
+
+ # Stage 2: Technical filter
+ filtered = await self._technical_filter(candidates)
+ if not filtered:
+ logger.info("all_candidates_filtered_out")
+ return []
+
+ logger.info("technical_filter_passed", count=len(filtered))
+
+ # Stage 3: LLM final selection
+ selections = await self._llm_final_select(filtered, ms)
+
+ # Publish to Redis
+ for selection in selections:
+ await self._broker.publish(
+ "selected_stocks",
+ selection.model_dump(mode="json"),
+ )
+
+        # Persist audit trail (fetch the score snapshot once, not per pick)
+        from datetime import date as date_type
+
+        score_data = await self._db.get_top_symbol_scores(limit=100)
+        for selection in selections:
+            snapshot = next(
+                (s for s in score_data if s["symbol"] == selection.symbol),
+                {},
+            )
+            await self._db.insert_stock_selection(
+                trade_date=date_type.today(),
+                symbol=selection.symbol,
+                side=selection.side.value,
+                conviction=selection.conviction,
+                reason=selection.reason,
+                key_news=selection.key_news,
+                sentiment_snapshot=snapshot,
+            )
+
+ return selections
+
+ def _merge_candidates(
+ self,
+ sentiment: list[Candidate],
+ llm: list[Candidate],
+ ) -> list[Candidate]:
+ """Merge and deduplicate candidates, preferring higher scores."""
+ by_symbol: dict[str, Candidate] = {}
+ for c in sentiment + llm:
+ if c.symbol not in by_symbol or c.score > by_symbol[c.symbol].score:
+ by_symbol[c.symbol] = c
+ return sorted(by_symbol.values(), key=lambda c: c.score, reverse=True)
+
+ async def _technical_filter(self, candidates: list[Candidate]) -> list[Candidate]:
+ """Apply MOC-style technical screening to candidates."""
+ import pandas as pd
+
+ passed = []
+ for candidate in candidates:
+ try:
+ bars = await self._alpaca.get_bars(
+ candidate.symbol, timeframe="1Day", limit=30
+ )
+ if not bars or len(bars) < 21:
+ continue
+
+ closes = pd.Series([float(b["c"]) for b in bars])
+ volumes = pd.Series([float(b["v"]) for b in bars])
+
+ # RSI
+ delta = closes.diff()
+ gain = delta.clip(lower=0)
+ loss = -delta.clip(upper=0)
+ avg_gain = gain.ewm(com=13, min_periods=14).mean()
+ avg_loss = loss.ewm(com=13, min_periods=14).mean()
+ rs = avg_gain / avg_loss.replace(0, float("nan"))
+ rsi = 100 - (100 / (1 + rs))
+ current_rsi = rsi.iloc[-1]
+
+ if pd.isna(current_rsi) or not (30 <= current_rsi <= 70):
+ continue
+
+ # EMA
+ ema20 = closes.ewm(span=20, adjust=False).mean().iloc[-1]
+ if closes.iloc[-1] < ema20:
+ continue
+
+ # Volume above average
+ vol_avg = volumes.iloc[-20:].mean()
+ if vol_avg > 0 and volumes.iloc[-1] < vol_avg * 0.5:
+ continue
+
+ passed.append(candidate)
+
+ except Exception as exc:
+ logger.warning("technical_filter_error", symbol=candidate.symbol, error=str(exc))
+ continue
+
+ return passed
+
+ async def _llm_final_select(
+ self,
+ candidates: list[Candidate],
+ market_sentiment: Optional[dict],
+ ) -> list[SelectedStock]:
+ """Ask Claude to make final 2-3 picks from filtered candidates."""
+ # Build context
+ candidate_info = []
+ for c in candidates[:15]:
+ candidate_info.append(f"- {c.symbol}: score={c.score:.2f}, source={c.source}, reason={c.reason}")
+
+ news = await self._db.get_recent_news(hours=12)
+ top_news = [f"- [{n['source']}] {n['headline']}" for n in news[:20]]
+
+ ms_info = "No market sentiment data available."
+ if market_sentiment:
+ ms_info = (
+ f"Fear & Greed: {market_sentiment.get('fear_greed', 'N/A')} "
+ f"({market_sentiment.get('fear_greed_label', 'N/A')}), "
+ f"VIX: {market_sentiment.get('vix', 'N/A')}, "
+ f"Fed Stance: {market_sentiment.get('fed_stance', 'N/A')}"
+ )
+
+ prompt = (
+ f"You are a professional stock trader selecting {self._max_picks} stocks for "
+ f"Market-on-Close (MOC) overnight trading. You buy at market close and sell at "
+ f"next day's open.\n\n"
+ f"## Market Conditions\n{ms_info}\n\n"
+ f"## Candidate Stocks (pre-screened technically)\n"
+ + "\n".join(candidate_info) + "\n\n"
+ f"## Today's Key News\n"
+ + "\n".join(top_news) + "\n\n"
+ f"Select the best {self._max_picks} stocks. For each, provide:\n"
+ f"- symbol: ticker\n"
+ f"- side: BUY or SELL\n"
+ f"- conviction: 0.0-1.0\n"
+ f"- reason: one sentence\n"
+ f"- key_news: list of relevant headlines\n\n"
+ f"Return ONLY a JSON array. No other text."
+ )
+
+ try:
+ async with aiohttp.ClientSession() as session:
+ async with session.post(
+ "https://api.anthropic.com/v1/messages",
+ headers={
+ "x-api-key": self._api_key,
+ "anthropic-version": "2023-06-01",
+ "content-type": "application/json",
+ },
+ json={
+ "model": self._model,
+ "max_tokens": 1024,
+ "messages": [{"role": "user", "content": prompt}],
+ },
+ timeout=aiohttp.ClientTimeout(total=30),
+ ) as resp:
+ if resp.status != 200:
+ logger.error("llm_final_select_failed", status=resp.status)
+ return []
+ data = await resp.json()
+ text = data["content"][0]["text"]
+ except Exception as exc:
+ logger.error("llm_final_select_error", error=str(exc))
+ return []
+
+ return _parse_llm_selections(text)
+
+
+def _parse_llm_selections(text: str) -> list[SelectedStock]:
+ """Parse LLM response into SelectedStock list."""
+ try:
+ json_match = re.search(r"```(?:json)?\s*(\[.*?\])\s*```", text, re.DOTALL)
+ if json_match:
+ text = json_match.group(1)
+ # Also try to find a bare JSON array
+ array_match = re.search(r"\[.*\]", text, re.DOTALL)
+ if array_match:
+ text = array_match.group(0)
+ items = json.loads(text)
+ except (json.JSONDecodeError, TypeError):
+ return []
+
+ selections = []
+ for item in items:
+ try:
+ selections.append(
+ SelectedStock(
+ symbol=item["symbol"],
+ side=OrderSide(item.get("side", "BUY")),
+ conviction=float(item.get("conviction", 0.5)),
+ reason=item.get("reason", ""),
+ key_news=item.get("key_news", []),
+ )
+ )
+ except (KeyError, ValueError):
+ continue
+
+ return selections
+```
+
+- [ ] **Step 4: Run tests to verify they pass**
+
+Run: `pytest services/strategy-engine/tests/test_stock_selector.py -v`
+Expected: All 5 tests PASS
+
+- [ ] **Step 5: Run all strategy engine tests for regressions**
+
+Run: `pytest services/strategy-engine/tests/ -v`
+Expected: All tests PASS
+
+- [ ] **Step 6: Commit**
+
+```bash
+git add services/strategy-engine/src/strategy_engine/stock_selector.py services/strategy-engine/tests/test_stock_selector.py
+git commit -m "feat: implement 3-stage stock selector (sentiment → technical → LLM)"
+```
+
+---
+
+## Phase 5: Integration (MOC + Notifications + Docker)
+
+### Task 16: Add Telegram notification for stock selections
+
+**Files:**
+- Modify: `shared/src/shared/notifier.py`
+- Modify: `shared/tests/test_notifier.py`
+
+- [ ] **Step 1: Add send_stock_selection method to notifier.py**
+
+Add this method and import to `shared/src/shared/notifier.py`:
+
+Add to imports:
+```python
+from shared.sentiment_models import SelectedStock, MarketSentiment
+```
+
+Add method to `TelegramNotifier` class:
+
+```python
+ async def send_stock_selection(
+ self,
+ selections: list[SelectedStock],
+ market: MarketSentiment | None = None,
+ ) -> None:
+ """Format and send stock selection notification."""
+ lines = [f"<b>📊 Stock Selection ({len(selections)} picks)</b>", ""]
+
+ side_emoji = {"BUY": "🟢", "SELL": "🔴"}
+
+ for i, s in enumerate(selections, 1):
+ emoji = side_emoji.get(s.side.value, "⚪")
+ lines.append(
+ f"{i}. <b>{s.symbol}</b> {emoji} {s.side.value} "
+ f"(conviction: {s.conviction:.0%})"
+ )
+ lines.append(f" {s.reason}")
+ if s.key_news:
+ lines.append(f" News: {s.key_news[0]}")
+ lines.append("")
+
+ if market:
+ lines.append(
+ f"Market: F&amp;G {market.fear_greed} ({market.fear_greed_label})"
+ + (f" | VIX {market.vix:.1f}" if market.vix else "")
+ )
+
+ await self.send("\n".join(lines))
+```
+
+- [ ] **Step 2: Add test for the new method**
+
+Add to `shared/tests/test_notifier.py`:
+
+```python
+from shared.models import OrderSide
+from shared.sentiment_models import SelectedStock, MarketSentiment
+from datetime import datetime, timezone
+
+
+async def test_send_stock_selection(notifier, mock_session):
+ """Test stock selection notification formatting."""
+ selections = [
+ SelectedStock(
+ symbol="NVDA",
+ side=OrderSide.BUY,
+ conviction=0.85,
+ reason="CHIPS Act expansion",
+ key_news=["Trump signs CHIPS Act"],
+ ),
+ ]
+ market = MarketSentiment(
+ fear_greed=55,
+ fear_greed_label="Neutral",
+ vix=18.2,
+ fed_stance="neutral",
+ market_regime="neutral",
+ updated_at=datetime.now(timezone.utc),
+ )
+ await notifier.send_stock_selection(selections, market)
+ mock_session.post.assert_called_once()
+```
+
+Note: Check `shared/tests/test_notifier.py` for existing fixture names (`notifier`, `mock_session`) and adapt accordingly.
+
+- [ ] **Step 3: Run notifier tests**
+
+Run: `pytest shared/tests/test_notifier.py -v`
+Expected: All tests PASS
+
+- [ ] **Step 4: Commit**
+
+```bash
+git add shared/src/shared/notifier.py shared/tests/test_notifier.py
+git commit -m "feat: add Telegram notification for stock selections"
+```
+
+---
+
+### Task 17: Integrate stock selector with MOC strategy
+
+**Files:**
+- Modify: `services/strategy-engine/src/strategy_engine/main.py`
+- Modify: `services/strategy-engine/src/strategy_engine/config.py`
+
+- [ ] **Step 1: Update strategy engine config**
+
+Add to `StrategyConfig` in `services/strategy-engine/src/strategy_engine/config.py`:
+
+```python
+ selector_candidates_time: str = "15:00"
+ selector_filter_time: str = "15:15"
+ selector_final_time: str = "15:30"
+ selector_max_picks: int = 3
+ anthropic_api_key: str = ""
+ anthropic_model: str = "claude-sonnet-4-20250514"
+```
+
+- [ ] **Step 2: Add stock selector scheduling to main.py**
+
+Add a new coroutine to `services/strategy-engine/src/strategy_engine/main.py` that runs the stock selector at the configured times. Add imports:
+
+```python
+from shared.alpaca import AlpacaClient
+from shared.db import Database
+from shared.notifier import TelegramNotifier
+from shared.sentiment_models import MarketSentiment
+from strategy_engine.stock_selector import StockSelector
+```
+
+Add the selector loop function:
+
+```python
+async def run_stock_selector(
+ selector: StockSelector,
+ notifier: TelegramNotifier,
+ db: Database,
+ config: StrategyConfig,
+ log,
+) -> None:
+ """Run the stock selector once per day at the configured time."""
+ import zoneinfo
+
+ et = zoneinfo.ZoneInfo("America/New_York")
+
+ while True:
+ now_et = datetime.now(et)
+ target_hour, target_min = map(int, config.selector_final_time.split(":"))
+
+ # Check if it's time to run (within 1-minute window)
+ if now_et.hour == target_hour and now_et.minute == target_min:
+ log.info("stock_selector_running")
+ try:
+ selections = await selector.select()
+ if selections:
+ ms_data = await db.get_latest_market_sentiment()
+ ms = None
+ if ms_data:
+ ms = MarketSentiment(**ms_data)
+ await notifier.send_stock_selection(selections, ms)
+ log.info(
+ "stock_selector_complete",
+ picks=[s.symbol for s in selections],
+ )
+ else:
+ log.info("stock_selector_no_picks")
+ except Exception as exc:
+ log.error("stock_selector_error", error=str(exc))
+ # Sleep past this minute to avoid re-triggering
+ await asyncio.sleep(120)
+ else:
+ await asyncio.sleep(30)
+```
+
+In the `run()` function, add after creating the broker:
+
+```python
+ db = Database(config.database_url)
+ await db.connect()
+
+ alpaca = AlpacaClient(
+ api_key=config.alpaca_api_key,
+ api_secret=config.alpaca_api_secret,
+ paper=config.alpaca_paper,
+ )
+```
+
+And add the selector if anthropic key is configured:
+
+```python
+ if config.anthropic_api_key:
+ selector = StockSelector(
+ db=db,
+ broker=broker,
+ alpaca=alpaca,
+ anthropic_api_key=config.anthropic_api_key,
+ anthropic_model=config.anthropic_model,
+ max_picks=config.selector_max_picks,
+ )
+ tasks.append(asyncio.create_task(
+ run_stock_selector(selector, notifier, db, config, log)
+ ))
+ log.info("stock_selector_enabled", time=config.selector_final_time)
+```
+
+Add to the `finally` block:
+
+```python
+ await alpaca.close()
+ await db.close()
+```
+
+- [ ] **Step 3: Run strategy engine tests for regressions**
+
+Run: `pytest services/strategy-engine/tests/ -v`
+Expected: All tests PASS
+
+- [ ] **Step 4: Commit**
+
+```bash
+git add services/strategy-engine/src/strategy_engine/main.py services/strategy-engine/src/strategy_engine/config.py
+git commit -m "feat: integrate stock selector into strategy engine scheduler"
+```
+
+---
+
+### Task 18: Update Docker Compose and .env
+
+**Files:**
+- Modify: `docker-compose.yml`
+- Modify: `.env.example` (already done in Task 5, just verify)
+
+- [ ] **Step 1: Add news-collector service to docker-compose.yml**
+
+Add before the `loki:` service block in `docker-compose.yml`:
+
+```yaml
+ news-collector:
+ build:
+ context: .
+ dockerfile: services/news-collector/Dockerfile
+ env_file: .env
+ ports:
+ - "8084:8084"
+ depends_on:
+ redis:
+ condition: service_healthy
+ postgres:
+ condition: service_healthy
+ healthcheck:
+ test: ["CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:8084/health')"]
+ interval: 10s
+ timeout: 5s
+ retries: 3
+ restart: unless-stopped
+```
+
+- [ ] **Step 2: Verify compose file is valid**
+
+Run: `docker compose config --quiet 2>&1 || echo "INVALID"`
+Expected: No output (valid) or compose config displayed without errors
+
+- [ ] **Step 3: Commit**
+
+```bash
+git add docker-compose.yml
+git commit -m "feat: add news-collector service to Docker Compose"
+```
+
+---
+
+### Task 19: Run full test suite and lint
+
+- [ ] **Step 1: Install test dependencies**
+
+Run: `pip install -e shared/ && pip install aiosqlite feedparser nltk aioresponses`
+
+- [ ] **Step 2: Download VADER lexicon**
+
+Run: `python -c "import nltk; nltk.download('vader_lexicon', quiet=True)"`
+
+- [ ] **Step 3: Run lint**
+
+Run: `make lint`
+Expected: No lint errors. If there are errors, fix them.
+
+- [ ] **Step 4: Run full test suite**
+
+Run: `make test`
+Expected: All tests PASS
+
+- [ ] **Step 5: Fix any issues found in steps 3-4**
+
+If lint or tests fail, fix the issues and re-run.
+
+- [ ] **Step 6: Final commit if any fixes were needed**
+
+```bash
+git add -A
+git commit -m "fix: resolve lint and test issues from news selector integration"
+```
diff --git a/docs/superpowers/specs/2026-04-02-news-driven-stock-selector-design.md b/docs/superpowers/specs/2026-04-02-news-driven-stock-selector-design.md
new file mode 100644
index 0000000..d439154
--- /dev/null
+++ b/docs/superpowers/specs/2026-04-02-news-driven-stock-selector-design.md
@@ -0,0 +1,418 @@
+# News-Driven Stock Selector Design
+
+**Date:** 2026-04-02
+**Goal:** Upgrade the MOC (Market on Close) strategy from fixed symbol lists to dynamic, news-driven stock selection. The system collects news/sentiment data continuously, then selects 2-3 optimal stocks daily before market close.
+
+---
+
+## Architecture Overview
+
+```
+[Continuous Collection] [Pre-Close Decision]
+Finnhub News ─┐
+RSS Feeds ─┤
+SEC EDGAR ─┤
+Truth Social ─┼→ DB (news_items) → Sentiment Aggregator → symbol_scores
+Reddit ─┤ + Redis "news" (every 15 min) market_sentiment
+Fear & Greed ─┤
+FOMC/Fed ─┘
+
+ 15:00 ET ─→ Candidate Pool (sentiment top + LLM picks)
+ 15:15 ET ─→ Technical Filter (RSI, EMA, volume)
+ 15:30 ET ─→ LLM Final Selection (2-3 stocks) → Telegram
+ 15:50 ET ─→ MOC Buy Execution
+ 09:35 ET ─→ Next-day Sell (existing MOC logic)
+```
+
+## 1. News Collector Service
+
+New service: `services/news-collector/`
+
+### Structure
+
+```
+services/news-collector/
+├── Dockerfile
+├── pyproject.toml
+├── src/news_collector/
+│ ├── __init__.py
+│ ├── main.py # Scheduler: runs each collector on its interval
+│ ├── config.py
+│ └── collectors/
+│ ├── __init__.py
+│ ├── base.py # BaseCollector ABC
+│ ├── finnhub.py # Finnhub market news (free, 60 req/min)
+│ ├── rss.py # Yahoo Finance, Google News, MarketWatch RSS
+│ ├── sec_edgar.py # SEC EDGAR 8-K/10-Q filings
+│ ├── truth_social.py # Truth Social scraping (Trump posts)
+│ ├── reddit.py # Reddit (r/wallstreetbets, r/stocks)
+│ ├── fear_greed.py # CNN Fear & Greed Index scraping
+│ └── fed.py # FOMC statements, Fed announcements
+└── tests/
+```
+
+### BaseCollector Interface
+
+```python
+class BaseCollector(ABC):
+ name: str
+ poll_interval: int # seconds
+
+ @abstractmethod
+ async def collect(self) -> list[NewsItem]:
+ """Collect and return list of NewsItem."""
+
+ @abstractmethod
+ async def is_available(self) -> bool:
+ """Check if this source is accessible (API key present, endpoint reachable)."""
+```
+
+### Poll Intervals
+
+| Collector | Interval | Notes |
+|-----------|----------|-------|
+| Finnhub | 5 min | Free tier: 60 calls/min |
+| RSS (Yahoo/Google/MarketWatch) | 10 min | Headlines only |
+| SEC EDGAR | 30 min | Focus on 8-K filings |
+| Truth Social | 15 min | Scraping |
+| Reddit | 15 min | Hot posts from relevant subs |
+| Fear & Greed | 1 hour | Updates once daily but check periodically |
+| FOMC/Fed | 1 hour | Infrequent events |
+
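The scheduler in `main.py` can fan out one asyncio task per collector, each polling on its own interval. A minimal sketch under assumed names (`Collector` and `run_collector` are illustrative stand-ins for `BaseCollector` and the real loop; intervals are shortened for the demo):

```python
import asyncio


class Collector:
    """Stand-in for BaseCollector: just a name, an interval, and a run counter."""

    def __init__(self, name: str, poll_interval: float) -> None:
        self.name = name
        self.poll_interval = poll_interval
        self.runs = 0

    async def collect(self) -> list[str]:
        self.runs += 1
        return [f"{self.name}-item-{self.runs}"]


async def run_collector(collector: Collector, stop: asyncio.Event) -> None:
    """Poll one collector on its own interval until the stop event is set."""
    while not stop.is_set():
        await collector.collect()
        # Real service: insert items into news_items, publish to the Redis "news" stream
        try:
            await asyncio.wait_for(stop.wait(), timeout=collector.poll_interval)
        except asyncio.TimeoutError:
            pass  # interval elapsed, poll again


collectors = [Collector("finnhub", 0.01), Collector("rss", 0.02)]


async def main() -> None:
    stop = asyncio.Event()
    tasks = [asyncio.create_task(run_collector(c, stop)) for c in collectors]
    await asyncio.sleep(0.06)  # let the collectors poll a few times
    stop.set()
    await asyncio.gather(*tasks)


asyncio.run(main())
```

Waiting on the stop event with a timeout (instead of a plain `sleep`) lets the service shut down promptly mid-interval.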
+### Provider Abstraction (for paid upgrade path)
+
+```python
+# config.yaml
+collectors:
+ news:
+ provider: "finnhub" # swap to "benzinga" for paid
+ api_key: ${FINNHUB_API_KEY}
+ social:
+ provider: "reddit" # swap to "stocktwits_pro" etc.
+ policy:
+ provider: "truth_social" # swap to "twitter_api" etc.
+
+# Factory
+COLLECTOR_REGISTRY = {
+ "finnhub": FinnhubCollector,
+ "rss": RSSCollector,
+ "benzinga": BenzingaCollector, # added later
+}
+```
+
+## 2. Shared Models (additions to shared/)
+
+### NewsItem (shared/models.py)
+
+```python
+class NewsCategory(str, Enum):
+ POLICY = "policy"
+ EARNINGS = "earnings"
+ MACRO = "macro"
+ SOCIAL = "social"
+ FILING = "filing"
+ FED = "fed"
+
+class NewsItem(BaseModel):
+ id: str = Field(default_factory=lambda: str(uuid.uuid4()))
+ source: str # "finnhub", "rss", "sec_edgar", etc.
+ headline: str
+ summary: str | None = None
+ url: str | None = None
+ published_at: datetime
+ symbols: list[str] = [] # Related tickers (if identifiable)
+ sentiment: float # -1.0 to 1.0 (first-pass analysis at collection)
+ category: NewsCategory
+ raw_data: dict = {}
+ created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
+```
+
+### SymbolScore (shared/sentiment_models.py — new file)
+
+```python
+class SymbolScore(BaseModel):
+ symbol: str
+ news_score: float # -1.0 to 1.0, weighted avg of news sentiment
+ news_count: int # Number of news items in last 24h
+ social_score: float # Reddit/social sentiment
+ policy_score: float # Policy-related impact
+ filing_score: float # SEC filing impact
+ composite: float # Weighted final score
+ updated_at: datetime
+
+class MarketSentiment(BaseModel):
+ fear_greed: int # 0-100
+ fear_greed_label: str # "Extreme Fear", "Fear", "Neutral", "Greed", "Extreme Greed"
+ vix: float | None = None
+ fed_stance: str # "hawkish", "neutral", "dovish"
+ market_regime: str # "risk_on", "neutral", "risk_off"
+ updated_at: datetime
+
+class SelectedStock(BaseModel):
+ symbol: str
+ side: OrderSide # BUY or SELL
+ conviction: float # 0.0 to 1.0
+ reason: str # Selection rationale
+ key_news: list[str] # Key news headlines
+
+class Candidate(BaseModel):
+ symbol: str
+ source: str # "sentiment" or "llm"
+ direction: OrderSide | None = None # Suggested direction (if known)
+ score: float # Relevance/priority score
+ reason: str # Why this candidate was selected
+```
+
+## 3. Sentiment Analysis Pipeline
+
+### Location
+
+Refactor existing `shared/src/shared/sentiment.py`.
+
+### Two-Stage Analysis
+
+**Stage 1: Per-news sentiment (at collection time)**
+- VADER (nltk.sentiment, free) for English headlines
+- Keyword rule engine for domain-specific terms (e.g., "tariff" → negative for importers, positive for domestic producers)
+- Score stored in `NewsItem.sentiment`
+
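Stage 1 can be sketched as a VADER compound score adjusted by a domain keyword table. The keyword weights below are illustrative, and the VADER score is passed in as a plain float so the sketch carries no nltk dependency:

```python
# Illustrative domain rules: term -> sentiment adjustment in [-1, 1]
KEYWORD_RULES = {
    "tariff": -0.3,
    "subsidy": 0.3,
    "bankruptcy": -0.6,
    "beats estimates": 0.5,
}


def score_headline(headline: str, vader_score: float) -> float:
    """Combine a base VADER compound score with keyword adjustments, clamped to [-1, 1]."""
    adjustment = sum(
        weight
        for term, weight in KEYWORD_RULES.items()
        if term in headline.lower()
    )
    return max(-1.0, min(1.0, vader_score + adjustment))
```

In the real pipeline the resulting value would be stored in `NewsItem.sentiment` at collection time.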
+**Stage 2: Per-symbol aggregation (every 15 minutes)**
+
+```
+composite = (
+ news_score * 0.3 +
+ social_score * 0.2 +
+ policy_score * 0.3 +
+ filing_score * 0.2
+) * freshness_decay
+```
+
+Freshness decay:
+- < 1 hour: 1.0
+- 1-6 hours: 0.7
+- 6-24 hours: 0.3
+- > 24 hours: excluded
+
+Policy score weighted high because US stock market is heavily influenced by policy events (tariffs, regulation, subsidies).
+
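The aggregation above can be sketched directly, with the weights from the formula and the decay buckets from the table (`composite_score` and `freshness_decay` are illustrative names):

```python
from datetime import timedelta

WEIGHTS = {"news": 0.3, "social": 0.2, "policy": 0.3, "filing": 0.2}


def freshness_decay(age: timedelta) -> float:
    """Decay factor from news age; items older than 24h are excluded (factor 0.0)."""
    hours = age.total_seconds() / 3600
    if hours < 1:
        return 1.0
    if hours < 6:
        return 0.7
    if hours < 24:
        return 0.3
    return 0.0


def composite_score(scores: dict[str, float], newest_age: timedelta) -> float:
    """Weighted sum of component scores, scaled by the freshness of the newest item."""
    weighted = sum(WEIGHTS[k] * scores.get(k, 0.0) for k in WEIGHTS)
    return weighted * freshness_decay(newest_age)
```
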
+### Market-Level Gating
+
+`MarketSentiment.market_regime` determination:
+- `risk_off`: Fear & Greed < 20 OR VIX > 30 → **block all trades**
+- `risk_on`: Fear & Greed > 60 AND VIX < 20
+- `neutral`: everything else
+
+This extends the existing `sentiment.py` `should_block()` logic.
+
+## 4. Stock Selector Engine
+
+### Location
+
+`services/strategy-engine/src/strategy_engine/stock_selector.py`
+
+### Three-Stage Selection Process
+
+**Stage 1: Candidate Pool (15:00 ET)**
+
+Two candidate sources, results merged (deduplicated):
+
+```python
+class CandidateSource(ABC):
+ @abstractmethod
+    async def get_candidates(self) -> list[Candidate]: ...
+
+class SentimentCandidateSource(CandidateSource):
+ """Top N symbols by composite SymbolScore from DB."""
+
+class LLMCandidateSource(CandidateSource):
+ """Send today's top news summary to Claude, get related symbols + direction."""
+```
+
+- SentimentCandidateSource: top 20 by composite score
+- LLMCandidateSource: Claude analyzes today's major news and recommends affected symbols
+- Merged pool: typically 20-30 candidates
+
+**Stage 2: Technical Filter (15:15 ET)**
+
+Apply existing MOC screening criteria to candidates:
+- Fetch recent price data from Alpaca for all candidates
+- RSI 30-70
+- Price > 20-period EMA
+- Volume > average
+- Bullish candle pattern
+- Result: typically 5-10 survivors
+
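The RSI gate can also be computed without pandas using Wilder's smoothing; this standalone helper (`wilder_rsi` is an illustrative name) mirrors the check the filter performs:

```python
def wilder_rsi(closes: list[float], period: int = 14) -> float:
    """RSI of the last close using Wilder's smoothed gain/loss averages."""
    if len(closes) < period + 1:
        raise ValueError("need at least period + 1 closes")
    gains, losses = [], []
    for prev, curr in zip(closes, closes[1:]):
        change = curr - prev
        gains.append(max(change, 0.0))
        losses.append(max(-change, 0.0))
    # Seed with simple averages, then apply Wilder smoothing over the rest
    avg_gain = sum(gains[:period]) / period
    avg_loss = sum(losses[:period]) / period
    for up, down in zip(gains[period:], losses[period:]):
        avg_gain = (avg_gain * (period - 1) + up) / period
        avg_loss = (avg_loss * (period - 1) + down) / period
    if avg_loss == 0:
        return 100.0  # no losses in the window: maximally overbought
    rs = avg_gain / avg_loss
    return 100.0 - 100.0 / (1.0 + rs)
```
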
+**Stage 3: LLM Final Selection (15:30 ET)**
+
+Send to Claude:
+- Filtered candidate list with technical indicators
+- Per-symbol sentiment scores and top news headlines
+- Market sentiment (Fear & Greed, VIX, Fed stance)
+- Prompt: "Select 2-3 stocks for MOC trading with rationale"
+
+Response parsed into `list[SelectedStock]`.
+
+### Integration with MOC Strategy
+
+Current: MOC strategy receives candles for fixed symbols and decides internally.
+
+New flow:
+1. `StockSelector` publishes `SelectedStock` list to Redis stream `selected_stocks` at 15:30 ET
+2. MOC strategy reads `selected_stocks` to get today's targets
+3. MOC still applies its own technical checks at 15:50-16:00 as a safety net
+4. If a selected stock fails the final technical check, it's skipped (no forced trades)
+
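On the consumer side, the MOC strategy only needs to decode stream entries into picks. A sketch, assuming `RedisBroker.publish` stores the model JSON under a single `data` field (the real field layout is not shown here):

```python
import json


def parse_selected_stocks(entries: list[tuple[str, dict[str, str]]]) -> list[dict]:
    """Decode (entry_id, fields) pairs from the selected_stocks stream into pick dicts.

    Assumes a hypothetical single "data" field holding the SelectedStock JSON.
    """
    picks = []
    for _entry_id, fields in entries:
        try:
            picks.append(json.loads(fields["data"]))
        except (KeyError, json.JSONDecodeError):
            continue  # skip malformed entries instead of failing the MOC strategy
    return picks
```

Skipping malformed entries keeps the safety-net behavior described above: a bad selection never forces a trade.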
+## 5. Database Schema
+
+Four new tables via Alembic migration:
+
+```sql
+CREATE TABLE news_items (
+ id UUID PRIMARY KEY,
+ source VARCHAR(50) NOT NULL,
+ headline TEXT NOT NULL,
+ summary TEXT,
+ url TEXT,
+ published_at TIMESTAMPTZ NOT NULL,
+ symbols TEXT[],
+ sentiment FLOAT NOT NULL,
+ category VARCHAR(50) NOT NULL,
+ raw_data JSONB DEFAULT '{}',
+ created_at TIMESTAMPTZ DEFAULT NOW()
+);
+CREATE INDEX idx_news_items_published ON news_items(published_at);
+CREATE INDEX idx_news_items_symbols ON news_items USING GIN(symbols);
+
+CREATE TABLE symbol_scores (
+ id UUID PRIMARY KEY,
+ symbol VARCHAR(10) NOT NULL,
+ news_score FLOAT NOT NULL DEFAULT 0,
+ news_count INT NOT NULL DEFAULT 0,
+ social_score FLOAT NOT NULL DEFAULT 0,
+ policy_score FLOAT NOT NULL DEFAULT 0,
+ filing_score FLOAT NOT NULL DEFAULT 0,
+ composite FLOAT NOT NULL DEFAULT 0,
+ updated_at TIMESTAMPTZ NOT NULL
+);
+CREATE UNIQUE INDEX idx_symbol_scores_symbol ON symbol_scores(symbol);
+
+CREATE TABLE market_sentiment (
+ id UUID PRIMARY KEY,
+ fear_greed INT NOT NULL,
+ fear_greed_label VARCHAR(30) NOT NULL,
+ vix FLOAT,
+ fed_stance VARCHAR(20) NOT NULL DEFAULT 'neutral',
+ market_regime VARCHAR(20) NOT NULL DEFAULT 'neutral',
+ updated_at TIMESTAMPTZ NOT NULL
+);
+
+CREATE TABLE stock_selections (
+ id UUID PRIMARY KEY,
+ trade_date DATE NOT NULL,
+ symbol VARCHAR(10) NOT NULL,
+ side VARCHAR(4) NOT NULL,
+ conviction FLOAT NOT NULL,
+ reason TEXT NOT NULL,
+ key_news JSONB DEFAULT '[]',
+ sentiment_snapshot JSONB DEFAULT '{}',
+ created_at TIMESTAMPTZ DEFAULT NOW()
+);
+CREATE INDEX idx_stock_selections_date ON stock_selections(trade_date);
+```
+
+`stock_selections` stores an audit trail: why each stock was selected, enabling post-hoc analysis of selection quality.
+
+## 6. Redis Streams
+
+| Stream | Producer | Consumer | Payload |
+|--------|----------|----------|---------|
+| `news` | news-collector | strategy-engine (sentiment aggregator) | NewsItem |
+| `selected_stocks` | stock-selector | MOC strategy | SelectedStock |
+
+Existing streams (`candles`, `signals`, `orders`) unchanged.
+
+## 7. Docker Compose Addition
+
+```yaml
+news-collector:
+ build:
+ context: .
+ dockerfile: services/news-collector/Dockerfile
+ env_file: .env
+ ports:
+ - "8084:8084"
+ depends_on:
+ redis: { condition: service_healthy }
+ postgres: { condition: service_healthy }
+ healthcheck:
+ test: ["CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:8084/health')"]
+ interval: 10s
+ timeout: 5s
+ retries: 3
+ restart: unless-stopped
+```
+
+## 8. Environment Variables
+
+```bash
+# News Collector
+FINNHUB_API_KEY= # Free key from finnhub.io
+NEWS_POLL_INTERVAL=300 # Default 5 min (overrides per-collector defaults)
+SENTIMENT_AGGREGATE_INTERVAL=900 # 15 min
+
+# Stock Selector
+SELECTOR_CANDIDATES_TIME=15:00 # ET, candidate pool generation
+SELECTOR_FILTER_TIME=15:15 # ET, technical filter
+SELECTOR_FINAL_TIME=15:30 # ET, LLM final pick
+SELECTOR_MAX_PICKS=3
+
+# LLM (for stock selector + screener)
+ANTHROPIC_API_KEY=
+ANTHROPIC_MODEL=claude-sonnet-4-20250514
+```
+
+## 9. Telegram Notifications
+
+Extend existing `shared/notifier.py` with:
+
+```python
+async def send_stock_selection(self, selections: list[SelectedStock], market: MarketSentiment):
+    """
+    📊 Today's Stock Selection (2/3 picks)
+
+    1. NVDA 🟢 BUY (conviction: 0.85)
+       Rationale: Trump announces expanded semiconductor subsidies, RSI 42
+       Key news: "Trump signs CHIPS Act expansion..."
+
+    2. XOM 🟢 BUY (conviction: 0.72)
+       Rationale: Oil price rally + earnings surprise, volume spike
+
+    Market sentiment: Fear & Greed 55 (Neutral) | VIX 18.2
+    """
+
+## 10. Testing Strategy
+
+**Unit tests:**
+- Each collector: mock HTTP responses → verify NewsItem parsing
+- Sentiment analysis: verify VADER + keyword scoring
+- Aggregator: mock news data → verify SymbolScore calculation and freshness decay
+- Stock selector: mock scores → verify candidate/filter/selection pipeline
+- LLM calls: mock Claude response → verify SelectedStock parsing
+
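The LLM-parsing unit tests largely reduce to exercising the fence-stripping logic against typical Claude replies. A self-contained sketch of that check (mirroring `_parse_llm_selections`, simplified to plain dicts; the fence marker is built programmatically so this document stays renderable):

```python
import json
import re

FENCE = chr(96) * 3  # three backticks


def extract_json_array(text: str) -> list:
    """Strip an optional fenced json block, else fall back to the first bare array."""
    fenced = re.search(FENCE + r"(?:json)?\s*(\[.*?\])\s*" + FENCE, text, re.DOTALL)
    if fenced:
        text = fenced.group(1)
    else:
        bare = re.search(r"\[.*\]", text, re.DOTALL)
        if bare:
            text = bare.group(0)
    try:
        return json.loads(text)
    except (json.JSONDecodeError, TypeError):
        return []


# Typical reply shapes the parser must handle
fenced_reply = "Here are the picks:\n" + FENCE + "json\n[{\"symbol\": \"NVDA\", \"side\": \"BUY\"}]\n" + FENCE
bare_reply = 'Picks: [{"symbol": "XOM", "side": "BUY"}]'
```
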
+**Integration tests:**
+- Full pipeline: news collection → DB → aggregation → selection
+- Market gating: verify `risk_off` blocks all trades
+- MOC integration: verify selected stocks flow to MOC strategy
+
+**Post-hoc analysis (future):**
+- Use `stock_selections` audit trail to measure selection accuracy
+- Historical news data replay for backtesting requires paid data (deferred)
+
+## 11. Out of Scope (Future)
+
+- Paid API integration (designed for, not implemented)
+- Historical news backtesting
+- WebSocket real-time news streaming
+- Multi-language sentiment analysis
+- Options/derivatives signals