# News-Driven Stock Selector Design

**Date:** 2026-04-02

**Goal:** Upgrade the MOC (Market on Close) strategy from fixed symbol lists to dynamic, news-driven stock selection. The system collects news/sentiment data continuously, then selects 2-3 optimal stocks daily before market close.

---

## Architecture Overview

```
[Continuous Collection]                     [Pre-Close Decision]

Finnhub News ─┐
RSS Feeds    ─┤
SEC EDGAR    ─┤
Truth Social ─┼→ DB (news_items) → Sentiment Aggregator → symbol_scores
Reddit       ─┤  + Redis "news"    (every 15 min)          market_sentiment
Fear & Greed ─┤
FOMC/Fed     ─┘

15:00 ET ─→ Candidate Pool (sentiment top + LLM picks)
15:15 ET ─→ Technical Filter (RSI, EMA, volume)
15:30 ET ─→ LLM Final Selection (2-3 stocks) → Telegram
15:50 ET ─→ MOC Buy Execution
09:35 ET ─→ Next-day Sell (existing MOC logic)
```

## 1. News Collector Service

New service: `services/news-collector/`

### Structure

```
services/news-collector/
├── Dockerfile
├── pyproject.toml
├── src/news_collector/
│   ├── __init__.py
│   ├── main.py              # Scheduler: runs each collector on its interval
│   ├── config.py
│   └── collectors/
│       ├── __init__.py
│       ├── base.py          # BaseCollector ABC
│       ├── finnhub.py       # Finnhub market news (free, 60 req/min)
│       ├── rss.py           # Yahoo Finance, Google News, MarketWatch RSS
│       ├── sec_edgar.py     # SEC EDGAR 8-K/10-Q filings
│       ├── truth_social.py  # Truth Social scraping (Trump posts)
│       ├── reddit.py        # Reddit (r/wallstreetbets, r/stocks)
│       ├── fear_greed.py    # CNN Fear & Greed Index scraping
│       └── fed.py           # FOMC statements, Fed announcements
└── tests/
```

### BaseCollector Interface

```python
from abc import ABC, abstractmethod

from shared.models import NewsItem  # defined in section 2 (path assumes the shared package layout)


class BaseCollector(ABC):
    name: str
    poll_interval: int  # seconds

    @abstractmethod
    async def collect(self) -> list[NewsItem]:
        """Collect and return a list of NewsItem."""

    @abstractmethod
    async def is_available(self) -> bool:
        """Check if this source is accessible (API key present, endpoint reachable)."""
```

### Poll Intervals

| Collector | Interval | Notes |
|-----------|----------|-------|
| Finnhub | 5 min | Free tier: 60 calls/min |
| RSS (Yahoo/Google/MarketWatch) | 10 min | Headlines only |
| SEC EDGAR | 30 min | Focus on 8-K filings |
| Truth Social | 15 min | Scraping |
| Reddit | 15 min | Hot posts from relevant subs |
| Fear & Greed | 1 hour | Updates once daily, but check periodically |
| FOMC/Fed | 1 hour | Infrequent events |

### Provider Abstraction (for paid upgrade path)

```yaml
# config.yaml
collectors:
  news:
    provider: "finnhub"       # swap to "benzinga" for paid
    api_key: ${FINNHUB_API_KEY}
  social:
    provider: "reddit"        # swap to "stocktwits_pro" etc.
  policy:
    provider: "truth_social"  # swap to "twitter_api" etc.
```

```python
# Factory
COLLECTOR_REGISTRY = {
    "finnhub": FinnhubCollector,
    "rss": RSSCollector,
    "benzinga": BenzingaCollector,  # added later
}
```

## 2. Shared Models (additions to shared/)

### NewsItem (shared/models.py)

```python
import uuid
from datetime import datetime, timezone
from enum import Enum

from pydantic import BaseModel, Field


class NewsCategory(str, Enum):
    POLICY = "policy"
    EARNINGS = "earnings"
    MACRO = "macro"
    SOCIAL = "social"
    FILING = "filing"
    FED = "fed"


class NewsItem(BaseModel):
    id: str = Field(default_factory=lambda: str(uuid.uuid4()))
    source: str  # "finnhub", "rss", "sec_edgar", etc.
    headline: str
    summary: str | None = None
    url: str | None = None
    published_at: datetime
    symbols: list[str] = []  # Related tickers (if identifiable)
    sentiment: float  # -1.0 to 1.0 (first-pass analysis at collection)
    category: NewsCategory
    raw_data: dict = {}
    created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
```

### SymbolScore (shared/sentiment_models.py — new file)

```python
class SymbolScore(BaseModel):
    symbol: str
    news_score: float    # -1.0 to 1.0, weighted avg of news sentiment
    news_count: int      # Number of news items in last 24h
    social_score: float  # Reddit/social sentiment
    policy_score: float  # Policy-related impact
    filing_score: float  # SEC filing impact
    composite: float     # Weighted final score
    updated_at: datetime


class MarketSentiment(BaseModel):
    fear_greed: int        # 0-100
    fear_greed_label: str  # "Extreme Fear", "Fear", "Neutral", "Greed", "Extreme Greed"
    vix: float | None = None
    fed_stance: str        # "hawkish", "neutral", "dovish"
    market_regime: str     # "risk_on", "neutral", "risk_off"
    updated_at: datetime


class SelectedStock(BaseModel):
    symbol: str
    side: OrderSide      # BUY or SELL
    conviction: float    # 0.0 to 1.0
    reason: str          # Selection rationale
    key_news: list[str]  # Key news headlines


class Candidate(BaseModel):
    symbol: str
    source: str                         # "sentiment" or "llm"
    direction: OrderSide | None = None  # Suggested direction (if known)
    score: float                        # Relevance/priority score
    reason: str                         # Why this candidate was selected
```

## 3. Sentiment Analysis Pipeline

### Location

Refactor the existing `shared/src/shared/sentiment.py`.
### Two-Stage Analysis

**Stage 1: Per-news sentiment (at collection time)**

- VADER (`nltk.sentiment`, free) for English headlines
- Keyword rule engine for domain-specific terms (e.g., "tariff" → negative for importers, positive for domestic producers)
- Score stored in `NewsItem.sentiment`

**Stage 2: Per-symbol aggregation (every 15 minutes)**

```
composite = (
    news_score   * 0.3 +
    social_score * 0.2 +
    policy_score * 0.3 +
    filing_score * 0.2
) * freshness_decay
```

Freshness decay:

- < 1 hour: 1.0
- 1-6 hours: 0.7
- 6-24 hours: 0.3
- > 24 hours: excluded

The policy score is weighted heavily because the US stock market is strongly influenced by policy events (tariffs, regulation, subsidies).

### Market-Level Gating

`MarketSentiment.market_regime` determination:

- `risk_off`: Fear & Greed < 20 OR VIX > 30 → **block all trades**
- `risk_on`: Fear & Greed > 60 AND VIX < 20
- `neutral`: everything else

This extends the existing `sentiment.py` `should_block()` logic.

## 4. Stock Selector Engine

### Location

`services/strategy-engine/src/strategy_engine/stock_selector.py`

### Three-Stage Selection Process

**Stage 1: Candidate Pool (15:00 ET)**

Two candidate sources, with results merged and deduplicated:

```python
from abc import ABC, abstractmethod


class CandidateSource(ABC):
    @abstractmethod
    async def get_candidates(self) -> list[Candidate]: ...


class SentimentCandidateSource(CandidateSource):
    """Top N symbols by composite SymbolScore from DB."""


class LLMCandidateSource(CandidateSource):
    """Send today's top news summary to Claude, get related symbols + direction."""
```

- SentimentCandidateSource: top 20 symbols by composite score
- LLMCandidateSource: Claude analyzes today's major news and recommends affected symbols
- Merged pool: typically 20-30 candidates

**Stage 2: Technical Filter (15:15 ET)**

Apply the existing MOC screening criteria to the candidates:

- Fetch recent price data from Alpaca for all candidates
- RSI 30-60
- Price > 20-period EMA
- Volume > average
- Bullish candle pattern
- Result: typically 5-10 survivors

**Stage 3: LLM Final Selection (15:30 ET)**

Send to Claude:

- Filtered candidate list with technical indicators
- Per-symbol sentiment scores and top news headlines
- Market sentiment (Fear & Greed, VIX, Fed stance)
- Prompt: "Select 2-3 stocks for MOC trading with rationale"

The response is parsed into `list[SelectedStock]`.

### Integration with MOC Strategy

Current: the MOC strategy receives candles for fixed symbols and decides internally.

New flow:

1. `StockSelector` publishes the `SelectedStock` list to Redis stream `selected_stocks` at 15:30 ET
2. The MOC strategy reads `selected_stocks` to get today's targets
3. MOC still applies its own technical checks at 15:50-16:00 as a safety net
4. If a selected stock fails the final technical check, it is skipped (no forced trades)

## 5. Database Schema

Four new tables via an Alembic migration:

```sql
CREATE TABLE news_items (
    id UUID PRIMARY KEY,
    source VARCHAR(50) NOT NULL,
    headline TEXT NOT NULL,
    summary TEXT,
    url TEXT,
    published_at TIMESTAMPTZ NOT NULL,
    symbols TEXT[],
    sentiment FLOAT NOT NULL,
    category VARCHAR(50) NOT NULL,
    raw_data JSONB DEFAULT '{}',
    created_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE INDEX idx_news_items_published ON news_items(published_at);
CREATE INDEX idx_news_items_symbols ON news_items USING GIN(symbols);

CREATE TABLE symbol_scores (
    id UUID PRIMARY KEY,
    symbol VARCHAR(10) NOT NULL,
    news_score FLOAT NOT NULL DEFAULT 0,
    news_count INT NOT NULL DEFAULT 0,
    social_score FLOAT NOT NULL DEFAULT 0,
    policy_score FLOAT NOT NULL DEFAULT 0,
    filing_score FLOAT NOT NULL DEFAULT 0,
    composite FLOAT NOT NULL DEFAULT 0,
    updated_at TIMESTAMPTZ NOT NULL
);
CREATE UNIQUE INDEX idx_symbol_scores_symbol ON symbol_scores(symbol);

CREATE TABLE market_sentiment (
    id UUID PRIMARY KEY,
    fear_greed INT NOT NULL,
    fear_greed_label VARCHAR(30) NOT NULL,
    vix FLOAT,
    fed_stance VARCHAR(20) NOT NULL DEFAULT 'neutral',
    market_regime VARCHAR(20) NOT NULL DEFAULT 'neutral',
    updated_at TIMESTAMPTZ NOT NULL
);

CREATE TABLE stock_selections (
    id UUID PRIMARY KEY,
    trade_date
DATE NOT NULL,
    symbol VARCHAR(10) NOT NULL,
    side VARCHAR(4) NOT NULL,
    conviction FLOAT NOT NULL,
    reason TEXT NOT NULL,
    key_news JSONB DEFAULT '[]',
    sentiment_snapshot JSONB DEFAULT '{}',
    created_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE INDEX idx_stock_selections_date ON stock_selections(trade_date);
```

`stock_selections` stores an audit trail: why each stock was selected, enabling post-hoc analysis of selection quality.

## 6. Redis Streams

| Stream | Producer | Consumer | Payload |
|--------|----------|----------|---------|
| `news` | news-collector | strategy-engine (sentiment aggregator) | NewsItem |
| `selected_stocks` | stock-selector | MOC strategy | SelectedStock |

Existing streams (`candles`, `signals`, `orders`) are unchanged.

## 7. Docker Compose Addition

```yaml
news-collector:
  build:
    context: .
    dockerfile: services/news-collector/Dockerfile
  env_file: .env
  ports:
    - "8084:8084"
  depends_on:
    redis: { condition: service_healthy }
    postgres: { condition: service_healthy }
  healthcheck:
    test: ["CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:8084/health')"]
    interval: 10s
    timeout: 5s
    retries: 3
  restart: unless-stopped
```

## 8. Environment Variables

```bash
# News Collector
FINNHUB_API_KEY=                  # Free key from finnhub.io
NEWS_POLL_INTERVAL=300            # Default 5 min (overrides per-collector defaults)
SENTIMENT_AGGREGATE_INTERVAL=900  # 15 min

# Stock Selector
SELECTOR_CANDIDATES_TIME=15:00    # ET, candidate pool generation
SELECTOR_FILTER_TIME=15:15        # ET, technical filter
SELECTOR_FINAL_TIME=15:30         # ET, LLM final pick
SELECTOR_MAX_PICKS=3

# LLM (for stock selector + screener)
ANTHROPIC_API_KEY=
ANTHROPIC_MODEL=claude-sonnet-4-20250514
```

## 9. Telegram Notifications

Extend the existing `shared/notifier.py` with:

```python
async def send_stock_selection(self, selections: list[SelectedStock], market: MarketSentiment):
    """
    📊 Today's Stock Selection (2/3)

    1. NVDA 🟢 BUY (conviction: 0.85)
       Rationale: Trump announces expanded semiconductor subsidies; RSI 42
       Key news: "Trump signs CHIPS Act expansion..."
    2. XOM 🟢 BUY (conviction: 0.72)
       Rationale: rising oil prices + earnings surprise; volume spike

    Market sentiment: Fear & Greed 55 (Neutral) | VIX 18.2
    """
```

## 10. Testing Strategy

**Unit tests:**

- Each collector: mock HTTP responses → verify NewsItem parsing
- Sentiment analysis: verify VADER + keyword scoring
- Aggregator: mock news data → verify SymbolScore calculation and freshness decay
- Stock selector: mock scores → verify candidate/filter/selection pipeline
- LLM calls: mock Claude response → verify SelectedStock parsing

**Integration tests:**

- Full pipeline: news collection → DB → aggregation → selection
- Market gating: verify `risk_off` blocks all trades
- MOC integration: verify selected stocks flow to the MOC strategy

**Post-hoc analysis (future):**

- Use the `stock_selections` audit trail to measure selection accuracy
- Historical news replay for backtesting requires paid data (deferred)

## 11. Out of Scope (Future)

- Paid API integration (designed for, not implemented)
- Historical news backtesting
- WebSocket real-time news streaming
- Multi-language sentiment analysis
- Options/derivatives signals