From b53867cb98691ef68d8e2c702e5bcd6c5f737744 Mon Sep 17 00:00:00 2001
From: TheSiahxyz <164138827+TheSiahxyz@users.noreply.github.com>
Date: Thu, 2 Apr 2026 13:44:48 +0900
Subject: docs: add news-driven stock selector implementation plan
19 tasks across 5 phases: shared models, news collector service (7 collectors),
sentiment aggregation pipeline, stock selector engine, and integration.
---
.../plans/2026-04-02-news-driven-stock-selector.md | 3689 ++++++++++++++++++++
1 file changed, 3689 insertions(+)
create mode 100644 docs/superpowers/plans/2026-04-02-news-driven-stock-selector.md
(limited to 'docs/superpowers')
diff --git a/docs/superpowers/plans/2026-04-02-news-driven-stock-selector.md b/docs/superpowers/plans/2026-04-02-news-driven-stock-selector.md
new file mode 100644
index 0000000..0964f21
--- /dev/null
+++ b/docs/superpowers/plans/2026-04-02-news-driven-stock-selector.md
@@ -0,0 +1,3689 @@
+# News-Driven Stock Selector Implementation Plan
+
+> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.
+
+**Goal:** Replace the MOC strategy's fixed symbol list with a dynamic, news-driven stock selection system that continuously collects news/sentiment data and selects 2-3 optimal stocks daily before market close.
+
+**Architecture:** A new `news-collector` service runs 7 data source collectors on individual poll intervals, storing `NewsItem` records in PostgreSQL and publishing to Redis. A sentiment aggregator computes per-symbol composite scores every 15 minutes. Before market close, a 3-stage stock selector (sentiment candidates → technical filter → LLM final pick) chooses 2-3 stocks and feeds them to the existing MOC strategy.
+
+**Tech Stack:** Python 3.12+, asyncio, aiohttp, Pydantic, SQLAlchemy 2.0 async, Redis Streams, VADER (nltk), feedparser (RSS), Anthropic SDK (Claude API), Alembic
+
+---
+
+## Phase 1: Shared Foundation (Models, DB, Events)
+
+### Task 1: Add NewsItem and sentiment models to shared
+
+**Files:**
+- Modify: `shared/src/shared/models.py`
+- Create: `shared/src/shared/sentiment_models.py`
+- Create: `shared/tests/test_sentiment_models.py`
+
+- [ ] **Step 1: Write tests for new models**
+
+Create `shared/tests/test_sentiment_models.py`:
+
+```python
+"""Tests for news and sentiment models."""
+
+import pytest
+from datetime import datetime, timezone
+
+from shared.models import NewsCategory, NewsItem, OrderSide
+from shared.sentiment_models import SymbolScore, MarketSentiment, SelectedStock, Candidate
+
+
+def test_news_item_defaults():
+ item = NewsItem(
+ source="finnhub",
+ headline="Test headline",
+ published_at=datetime(2026, 4, 2, tzinfo=timezone.utc),
+ sentiment=0.5,
+ category=NewsCategory.MACRO,
+ )
+ assert item.id # UUID generated
+ assert item.symbols == []
+ assert item.summary is None
+ assert item.raw_data == {}
+ assert item.created_at is not None
+
+
+def test_news_item_with_symbols():
+ item = NewsItem(
+ source="rss",
+ headline="AAPL earnings beat",
+ published_at=datetime(2026, 4, 2, tzinfo=timezone.utc),
+ sentiment=0.8,
+ category=NewsCategory.EARNINGS,
+ symbols=["AAPL"],
+ )
+ assert item.symbols == ["AAPL"]
+ assert item.category == NewsCategory.EARNINGS
+
+
+def test_news_category_values():
+ assert NewsCategory.POLICY == "policy"
+ assert NewsCategory.EARNINGS == "earnings"
+ assert NewsCategory.MACRO == "macro"
+ assert NewsCategory.SOCIAL == "social"
+ assert NewsCategory.FILING == "filing"
+ assert NewsCategory.FED == "fed"
+
+
+def test_symbol_score():
+ score = SymbolScore(
+ symbol="AAPL",
+ news_score=0.5,
+ news_count=10,
+ social_score=0.3,
+ policy_score=0.0,
+ filing_score=0.2,
+ composite=0.3,
+ updated_at=datetime(2026, 4, 2, tzinfo=timezone.utc),
+ )
+ assert score.symbol == "AAPL"
+ assert score.composite == 0.3
+
+
+def test_market_sentiment():
+ ms = MarketSentiment(
+ fear_greed=25,
+ fear_greed_label="Extreme Fear",
+ vix=32.5,
+ fed_stance="hawkish",
+ market_regime="risk_off",
+ updated_at=datetime(2026, 4, 2, tzinfo=timezone.utc),
+ )
+ assert ms.market_regime == "risk_off"
+ assert ms.vix == 32.5
+
+
+def test_market_sentiment_no_vix():
+ ms = MarketSentiment(
+ fear_greed=50,
+ fear_greed_label="Neutral",
+ fed_stance="neutral",
+ market_regime="neutral",
+ updated_at=datetime(2026, 4, 2, tzinfo=timezone.utc),
+ )
+ assert ms.vix is None
+
+
+def test_selected_stock():
+ ss = SelectedStock(
+ symbol="NVDA",
+ side=OrderSide.BUY,
+ conviction=0.85,
+ reason="CHIPS Act expansion",
+ key_news=["Trump signs CHIPS Act expansion"],
+ )
+ assert ss.conviction == 0.85
+ assert len(ss.key_news) == 1
+
+
+def test_candidate():
+ c = Candidate(
+ symbol="TSLA",
+ source="sentiment",
+ direction=OrderSide.BUY,
+ score=0.75,
+ reason="High social buzz",
+ )
+ assert c.direction == OrderSide.BUY
+
+ c2 = Candidate(
+ symbol="XOM",
+ source="llm",
+ score=0.6,
+ reason="Oil price surge",
+ )
+ assert c2.direction is None
+```
+
+- [ ] **Step 2: Run tests to verify they fail**
+
+Run: `pytest shared/tests/test_sentiment_models.py -v`
+Expected: FAIL — `NewsCategory`, `NewsItem` not found in `shared.models`, `shared.sentiment_models` does not exist
+
+- [ ] **Step 3: Add NewsCategory and NewsItem to shared/models.py**
+
+Add to the end of `shared/src/shared/models.py`:
+
+```python
+class NewsCategory(str, Enum):
+ POLICY = "policy"
+ EARNINGS = "earnings"
+ MACRO = "macro"
+ SOCIAL = "social"
+ FILING = "filing"
+ FED = "fed"
+
+
+class NewsItem(BaseModel):
+ id: str = Field(default_factory=lambda: str(uuid.uuid4()))
+ source: str
+ headline: str
+ summary: Optional[str] = None
+ url: Optional[str] = None
+ published_at: datetime
+ symbols: list[str] = []
+ sentiment: float
+ category: NewsCategory
+ raw_data: dict = {}
+ created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
+```
+
+- [ ] **Step 4: Create shared/src/shared/sentiment_models.py**
+
+```python
+"""Sentiment scoring and stock selection models."""
+
+from datetime import datetime
+from typing import Optional
+
+from pydantic import BaseModel
+
+from shared.models import OrderSide
+
+
+class SymbolScore(BaseModel):
+ symbol: str
+ news_score: float
+ news_count: int
+ social_score: float
+ policy_score: float
+ filing_score: float
+ composite: float
+ updated_at: datetime
+
+
+class MarketSentiment(BaseModel):
+ fear_greed: int
+ fear_greed_label: str
+ vix: Optional[float] = None
+ fed_stance: str
+ market_regime: str
+ updated_at: datetime
+
+
+class SelectedStock(BaseModel):
+ symbol: str
+ side: OrderSide
+ conviction: float
+ reason: str
+ key_news: list[str]
+
+
+class Candidate(BaseModel):
+ symbol: str
+ source: str
+ direction: Optional[OrderSide] = None
+ score: float
+ reason: str
+```
+
+- [ ] **Step 5: Run tests to verify they pass**
+
+Run: `pytest shared/tests/test_sentiment_models.py -v`
+Expected: All 8 tests PASS
+
+- [ ] **Step 6: Commit**
+
+```bash
+git add shared/src/shared/models.py shared/src/shared/sentiment_models.py shared/tests/test_sentiment_models.py
+git commit -m "feat: add NewsItem, sentiment scoring, and stock selection models"
+```
+
+---
+
+### Task 2: Add SQLAlchemy ORM models for news tables
+
+**Files:**
+- Modify: `shared/src/shared/sa_models.py`
+- Create: `shared/tests/test_sa_news_models.py`
+
+- [ ] **Step 1: Write tests for new SA models**
+
+Create `shared/tests/test_sa_news_models.py`:
+
+```python
+"""Tests for news-related SQLAlchemy models."""
+
+from shared.sa_models import NewsItemRow, SymbolScoreRow, MarketSentimentRow, StockSelectionRow
+
+
+def test_news_item_row_tablename():
+ assert NewsItemRow.__tablename__ == "news_items"
+
+
+def test_symbol_score_row_tablename():
+ assert SymbolScoreRow.__tablename__ == "symbol_scores"
+
+
+def test_market_sentiment_row_tablename():
+ assert MarketSentimentRow.__tablename__ == "market_sentiment"
+
+
+def test_stock_selection_row_tablename():
+ assert StockSelectionRow.__tablename__ == "stock_selections"
+
+
+def test_news_item_row_columns():
+ cols = {c.name for c in NewsItemRow.__table__.columns}
+ assert cols >= {"id", "source", "headline", "published_at", "sentiment", "category"}
+
+
+def test_symbol_score_row_columns():
+ cols = {c.name for c in SymbolScoreRow.__table__.columns}
+ assert cols >= {"id", "symbol", "news_score", "composite", "updated_at"}
+```
+
+- [ ] **Step 2: Run tests to verify they fail**
+
+Run: `pytest shared/tests/test_sa_news_models.py -v`
+Expected: FAIL — import errors
+
+- [ ] **Step 3: Add ORM models to sa_models.py**
+
+Add to the end of `shared/src/shared/sa_models.py`:
+
+```python
+class NewsItemRow(Base):
+ __tablename__ = "news_items"
+
+ id: Mapped[str] = mapped_column(Text, primary_key=True)
+ source: Mapped[str] = mapped_column(Text, nullable=False)
+ headline: Mapped[str] = mapped_column(Text, nullable=False)
+ summary: Mapped[str | None] = mapped_column(Text)
+ url: Mapped[str | None] = mapped_column(Text)
+ published_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False)
+ symbols: Mapped[str | None] = mapped_column(Text) # JSON-encoded list
+ sentiment: Mapped[float] = mapped_column(sa.Float, nullable=False)
+ category: Mapped[str] = mapped_column(Text, nullable=False)
+ raw_data: Mapped[str | None] = mapped_column(Text) # JSON string
+ created_at: Mapped[datetime] = mapped_column(
+ DateTime(timezone=True), nullable=False, server_default=sa.func.now()
+ )
+
+
+class SymbolScoreRow(Base):
+ __tablename__ = "symbol_scores"
+
+ id: Mapped[str] = mapped_column(Text, primary_key=True)
+ symbol: Mapped[str] = mapped_column(Text, nullable=False, unique=True)
+ news_score: Mapped[float] = mapped_column(sa.Float, nullable=False, server_default="0")
+ news_count: Mapped[int] = mapped_column(sa.Integer, nullable=False, server_default="0")
+ social_score: Mapped[float] = mapped_column(sa.Float, nullable=False, server_default="0")
+ policy_score: Mapped[float] = mapped_column(sa.Float, nullable=False, server_default="0")
+ filing_score: Mapped[float] = mapped_column(sa.Float, nullable=False, server_default="0")
+ composite: Mapped[float] = mapped_column(sa.Float, nullable=False, server_default="0")
+ updated_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False)
+
+
+class MarketSentimentRow(Base):
+ __tablename__ = "market_sentiment"
+
+ id: Mapped[str] = mapped_column(Text, primary_key=True)
+ fear_greed: Mapped[int] = mapped_column(sa.Integer, nullable=False)
+ fear_greed_label: Mapped[str] = mapped_column(Text, nullable=False)
+ vix: Mapped[float | None] = mapped_column(sa.Float)
+ fed_stance: Mapped[str] = mapped_column(Text, nullable=False, server_default="neutral")
+ market_regime: Mapped[str] = mapped_column(Text, nullable=False, server_default="neutral")
+ updated_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False)
+
+
+class StockSelectionRow(Base):
+ __tablename__ = "stock_selections"
+
+ id: Mapped[str] = mapped_column(Text, primary_key=True)
+ trade_date: Mapped[datetime] = mapped_column(sa.Date, nullable=False)
+ symbol: Mapped[str] = mapped_column(Text, nullable=False)
+ side: Mapped[str] = mapped_column(Text, nullable=False)
+ conviction: Mapped[float] = mapped_column(sa.Float, nullable=False)
+ reason: Mapped[str] = mapped_column(Text, nullable=False)
+ key_news: Mapped[str | None] = mapped_column(Text) # JSON string
+ sentiment_snapshot: Mapped[str | None] = mapped_column(Text) # JSON string
+ created_at: Mapped[datetime] = mapped_column(
+ DateTime(timezone=True), nullable=False, server_default=sa.func.now()
+ )
+```
+
+Also add `import sqlalchemy as sa` to the imports at the top of `sa_models.py`.
+
+- [ ] **Step 4: Run tests to verify they pass**
+
+Run: `pytest shared/tests/test_sa_news_models.py -v`
+Expected: All 6 tests PASS
+
+- [ ] **Step 5: Commit**
+
+```bash
+git add shared/src/shared/sa_models.py shared/tests/test_sa_news_models.py
+git commit -m "feat: add SQLAlchemy ORM models for news, scores, selections"
+```
+
+---
+
+### Task 3: Create Alembic migration for news tables
+
+**Files:**
+- Create: `shared/alembic/versions/002_news_sentiment_tables.py`
+
+- [ ] **Step 1: Create migration file**
+
+Create `shared/alembic/versions/002_news_sentiment_tables.py`:
+
+```python
+"""Add news, sentiment, and stock selection tables
+
+Revision ID: 002
+Revises: 001
+Create Date: 2026-04-02
+"""
+
+from typing import Sequence, Union
+
+from alembic import op
+import sqlalchemy as sa
+
+revision: str = "002"
+down_revision: Union[str, None] = "001"
+branch_labels: Union[str, Sequence[str], None] = None
+depends_on: Union[str, Sequence[str], None] = None
+
+
+def upgrade() -> None:
+ op.create_table(
+ "news_items",
+ sa.Column("id", sa.Text, primary_key=True),
+ sa.Column("source", sa.Text, nullable=False),
+ sa.Column("headline", sa.Text, nullable=False),
+ sa.Column("summary", sa.Text),
+ sa.Column("url", sa.Text),
+ sa.Column("published_at", sa.DateTime(timezone=True), nullable=False),
+ sa.Column("symbols", sa.Text),
+ sa.Column("sentiment", sa.Float, nullable=False),
+ sa.Column("category", sa.Text, nullable=False),
+ sa.Column("raw_data", sa.Text),
+ sa.Column("created_at", sa.DateTime(timezone=True), nullable=False, server_default=sa.func.now()),
+ )
+ op.create_index("idx_news_items_published", "news_items", ["published_at"])
+ op.create_index("idx_news_items_source", "news_items", ["source"])
+
+ op.create_table(
+ "symbol_scores",
+ sa.Column("id", sa.Text, primary_key=True),
+ sa.Column("symbol", sa.Text, nullable=False, unique=True),
+ sa.Column("news_score", sa.Float, nullable=False, server_default="0"),
+ sa.Column("news_count", sa.Integer, nullable=False, server_default="0"),
+ sa.Column("social_score", sa.Float, nullable=False, server_default="0"),
+ sa.Column("policy_score", sa.Float, nullable=False, server_default="0"),
+ sa.Column("filing_score", sa.Float, nullable=False, server_default="0"),
+ sa.Column("composite", sa.Float, nullable=False, server_default="0"),
+ sa.Column("updated_at", sa.DateTime(timezone=True), nullable=False),
+ )
+
+ op.create_table(
+ "market_sentiment",
+ sa.Column("id", sa.Text, primary_key=True),
+ sa.Column("fear_greed", sa.Integer, nullable=False),
+ sa.Column("fear_greed_label", sa.Text, nullable=False),
+ sa.Column("vix", sa.Float),
+ sa.Column("fed_stance", sa.Text, nullable=False, server_default="neutral"),
+ sa.Column("market_regime", sa.Text, nullable=False, server_default="neutral"),
+ sa.Column("updated_at", sa.DateTime(timezone=True), nullable=False),
+ )
+
+ op.create_table(
+ "stock_selections",
+ sa.Column("id", sa.Text, primary_key=True),
+ sa.Column("trade_date", sa.Date, nullable=False),
+ sa.Column("symbol", sa.Text, nullable=False),
+ sa.Column("side", sa.Text, nullable=False),
+ sa.Column("conviction", sa.Float, nullable=False),
+ sa.Column("reason", sa.Text, nullable=False),
+ sa.Column("key_news", sa.Text),
+ sa.Column("sentiment_snapshot", sa.Text),
+ sa.Column("created_at", sa.DateTime(timezone=True), nullable=False, server_default=sa.func.now()),
+ )
+ op.create_index("idx_stock_selections_date", "stock_selections", ["trade_date"])
+
+
+def downgrade() -> None:
+ op.drop_table("stock_selections")
+ op.drop_table("market_sentiment")
+ op.drop_table("symbol_scores")
+ op.drop_table("news_items")
+```
+
+- [ ] **Step 2: Verify migration imports correctly**
+
+Run: `python -c "import importlib.util; s=importlib.util.spec_from_file_location('m','shared/alembic/versions/002_news_sentiment_tables.py'); m=importlib.util.module_from_spec(s); s.loader.exec_module(m); print('OK')"`
+(Note: do not try `from alembic.versions import *` — the `versions` directory is not an importable package, and a wildcard import would not execute the migration module.)
+Expected: OK (no import errors)
+
+- [ ] **Step 3: Commit**
+
+```bash
+git add shared/alembic/versions/002_news_sentiment_tables.py
+git commit -m "feat: add Alembic migration for news and sentiment tables"
+```
+
+---
+
+### Task 4: Add NewsEvent to shared events and DB methods for news
+
+**Files:**
+- Modify: `shared/src/shared/events.py`
+- Modify: `shared/src/shared/db.py`
+- Create: `shared/tests/test_news_events.py`
+- Create: `shared/tests/test_db_news.py`
+
+- [ ] **Step 1: Write tests for NewsEvent**
+
+Create `shared/tests/test_news_events.py`:
+
+```python
+"""Tests for NewsEvent."""
+
+from datetime import datetime, timezone
+
+from shared.models import NewsCategory, NewsItem
+from shared.events import NewsEvent, EventType, Event
+
+
+def test_news_event_to_dict():
+ item = NewsItem(
+ source="finnhub",
+ headline="Test",
+ published_at=datetime(2026, 4, 2, tzinfo=timezone.utc),
+ sentiment=0.5,
+ category=NewsCategory.MACRO,
+ )
+ event = NewsEvent(data=item)
+ d = event.to_dict()
+ assert d["type"] == EventType.NEWS
+ assert d["data"]["source"] == "finnhub"
+
+
+def test_news_event_from_raw():
+ raw = {
+ "type": "NEWS",
+ "data": {
+ "id": "abc",
+ "source": "rss",
+ "headline": "Test headline",
+ "published_at": "2026-04-02T00:00:00+00:00",
+ "sentiment": 0.3,
+ "category": "earnings",
+ "symbols": ["AAPL"],
+ "raw_data": {},
+ },
+ }
+ event = NewsEvent.from_raw(raw)
+ assert event.data.source == "rss"
+ assert event.data.symbols == ["AAPL"]
+
+
+def test_event_dispatcher_news():
+ raw = {
+ "type": "NEWS",
+ "data": {
+ "id": "abc",
+ "source": "finnhub",
+ "headline": "Test",
+ "published_at": "2026-04-02T00:00:00+00:00",
+ "sentiment": 0.0,
+ "category": "macro",
+ "raw_data": {},
+ },
+ }
+ event = Event.from_dict(raw)
+ assert isinstance(event, NewsEvent)
+```
+
+- [ ] **Step 2: Run tests to verify they fail**
+
+Run: `pytest shared/tests/test_news_events.py -v`
+Expected: FAIL — `NewsEvent` not in events.py, `EventType.NEWS` missing
+
+- [ ] **Step 3: Add NewsEvent to events.py**
+
+Add `NEWS = "NEWS"` to `EventType` enum.
+
+Add `NewsEvent` class and register it in `_EVENT_TYPE_MAP`:
+
+```python
+from shared.models import Candle, Signal, Order, NewsItem
+
+class NewsEvent(BaseModel):
+ type: EventType = EventType.NEWS
+ data: NewsItem
+
+ def to_dict(self) -> dict:
+ return {
+ "type": self.type,
+ "data": self.data.model_dump(mode="json"),
+ }
+
+ @classmethod
+ def from_raw(cls, raw: dict) -> "NewsEvent":
+ return cls(type=raw["type"], data=NewsItem(**raw["data"]))
+```
+
+Add to `_EVENT_TYPE_MAP`:
+```python
+EventType.NEWS: NewsEvent,
+```
+
+- [ ] **Step 4: Run event tests to verify they pass**
+
+Run: `pytest shared/tests/test_news_events.py -v`
+Expected: All 3 tests PASS
+
+- [ ] **Step 5: Run all existing event tests to check no regressions**
+
+Run: `pytest shared/tests/test_events.py -v`
+Expected: All existing tests PASS
+
+- [ ] **Step 6: Write tests for DB news methods**
+
+Create `shared/tests/test_db_news.py`:
+
+```python
+"""Tests for database news/sentiment methods.
+
+These tests use an in-memory SQLite database.
+"""
+
+import json
+import uuid
+import pytest
+from datetime import datetime, date, timezone
+
+from shared.db import Database
+from shared.models import NewsItem, NewsCategory
+from shared.sentiment_models import SymbolScore, MarketSentiment
+
+
+@pytest.fixture
+async def db():
+ database = Database("sqlite+aiosqlite://")
+ await database.connect()
+ yield database
+ await database.close()
+
+
+async def test_insert_and_get_news_items(db):
+ item = NewsItem(
+ source="finnhub",
+ headline="AAPL earnings beat",
+ published_at=datetime(2026, 4, 2, 12, 0, tzinfo=timezone.utc),
+ sentiment=0.8,
+ category=NewsCategory.EARNINGS,
+ symbols=["AAPL"],
+ )
+ await db.insert_news_item(item)
+ items = await db.get_recent_news(hours=24)
+ assert len(items) == 1
+ assert items[0]["headline"] == "AAPL earnings beat"
+
+
+async def test_upsert_symbol_score(db):
+ score = SymbolScore(
+ symbol="AAPL",
+ news_score=0.5,
+ news_count=10,
+ social_score=0.3,
+ policy_score=0.0,
+ filing_score=0.2,
+ composite=0.3,
+ updated_at=datetime(2026, 4, 2, tzinfo=timezone.utc),
+ )
+ await db.upsert_symbol_score(score)
+ scores = await db.get_top_symbol_scores(limit=5)
+ assert len(scores) == 1
+ assert scores[0]["symbol"] == "AAPL"
+
+
+async def test_upsert_market_sentiment(db):
+ ms = MarketSentiment(
+ fear_greed=55,
+ fear_greed_label="Neutral",
+ vix=18.2,
+ fed_stance="neutral",
+ market_regime="neutral",
+ updated_at=datetime(2026, 4, 2, tzinfo=timezone.utc),
+ )
+ await db.upsert_market_sentiment(ms)
+ result = await db.get_latest_market_sentiment()
+ assert result is not None
+ assert result["fear_greed"] == 55
+
+
+async def test_insert_stock_selection(db):
+ await db.insert_stock_selection(
+ trade_date=date(2026, 4, 2),
+ symbol="NVDA",
+ side="BUY",
+ conviction=0.85,
+ reason="CHIPS Act",
+ key_news=["Trump signs CHIPS expansion"],
+ sentiment_snapshot={"composite": 0.8},
+ )
+ selections = await db.get_stock_selections(date(2026, 4, 2))
+ assert len(selections) == 1
+ assert selections[0]["symbol"] == "NVDA"
+```
+
+- [ ] **Step 7: Run DB news tests to verify they fail**
+
+Run: `pytest shared/tests/test_db_news.py -v`
+Expected: FAIL — methods not yet on Database class
+
+- [ ] **Step 8: Add news/sentiment DB methods to db.py**
+
+Add to `shared/src/shared/db.py` — new import at top:
+
+```python
+import json
+import uuid
+from datetime import date
+from shared.models import NewsItem
+from shared.sentiment_models import SymbolScore, MarketSentiment
+from shared.sa_models import NewsItemRow, SymbolScoreRow, MarketSentimentRow, StockSelectionRow
+```
+
+Add these methods to the `Database` class:
+
+```python
+ async def insert_news_item(self, item: NewsItem) -> None:
+ """Insert a news item."""
+ row = NewsItemRow(
+ id=item.id,
+ source=item.source,
+ headline=item.headline,
+ summary=item.summary,
+ url=item.url,
+ published_at=item.published_at,
+ symbols=json.dumps(item.symbols),
+ sentiment=item.sentiment,
+ category=item.category.value,
+ raw_data=json.dumps(item.raw_data),
+ created_at=item.created_at,
+ )
+ async with self._session_factory() as session:
+ try:
+ session.add(row)
+ await session.commit()
+ except Exception:
+ await session.rollback()
+ raise
+
+ async def get_recent_news(self, hours: int = 24) -> list[dict]:
+ """Retrieve news items from the last N hours."""
+ since = datetime.now(timezone.utc) - timedelta(hours=hours)
+ stmt = (
+ select(NewsItemRow)
+ .where(NewsItemRow.published_at >= since)
+ .order_by(NewsItemRow.published_at.desc())
+ )
+ async with self._session_factory() as session:
+ result = await session.execute(stmt)
+ rows = result.scalars().all()
+ return [
+ {
+ "id": r.id,
+ "source": r.source,
+ "headline": r.headline,
+ "summary": r.summary,
+ "url": r.url,
+ "published_at": r.published_at,
+ "symbols": json.loads(r.symbols) if r.symbols else [],
+ "sentiment": r.sentiment,
+ "category": r.category,
+ "created_at": r.created_at,
+ }
+ for r in rows
+ ]
+
+ async def upsert_symbol_score(self, score: SymbolScore) -> None:
+ """Insert or update a symbol score."""
+ async with self._session_factory() as session:
+ try:
+ existing = await session.execute(
+ select(SymbolScoreRow).where(SymbolScoreRow.symbol == score.symbol)
+ )
+ row = existing.scalar_one_or_none()
+ if row:
+ row.news_score = score.news_score
+ row.news_count = score.news_count
+ row.social_score = score.social_score
+ row.policy_score = score.policy_score
+ row.filing_score = score.filing_score
+ row.composite = score.composite
+ row.updated_at = score.updated_at
+ else:
+ row = SymbolScoreRow(
+ id=str(uuid.uuid4()),
+ symbol=score.symbol,
+ news_score=score.news_score,
+ news_count=score.news_count,
+ social_score=score.social_score,
+ policy_score=score.policy_score,
+ filing_score=score.filing_score,
+ composite=score.composite,
+ updated_at=score.updated_at,
+ )
+ session.add(row)
+ await session.commit()
+ except Exception:
+ await session.rollback()
+ raise
+
+ async def get_top_symbol_scores(self, limit: int = 20) -> list[dict]:
+ """Get top symbol scores ordered by composite descending."""
+ stmt = (
+ select(SymbolScoreRow)
+ .order_by(SymbolScoreRow.composite.desc())
+ .limit(limit)
+ )
+ async with self._session_factory() as session:
+ result = await session.execute(stmt)
+ rows = result.scalars().all()
+ return [
+ {
+ "symbol": r.symbol,
+ "news_score": r.news_score,
+ "news_count": r.news_count,
+ "social_score": r.social_score,
+ "policy_score": r.policy_score,
+ "filing_score": r.filing_score,
+ "composite": r.composite,
+ "updated_at": r.updated_at,
+ }
+ for r in rows
+ ]
+
+ async def upsert_market_sentiment(self, ms: MarketSentiment) -> None:
+ """Insert or update the latest market sentiment (single row, id='latest')."""
+ async with self._session_factory() as session:
+ try:
+ existing = await session.execute(
+ select(MarketSentimentRow).where(MarketSentimentRow.id == "latest")
+ )
+ row = existing.scalar_one_or_none()
+ if row:
+ row.fear_greed = ms.fear_greed
+ row.fear_greed_label = ms.fear_greed_label
+ row.vix = ms.vix
+ row.fed_stance = ms.fed_stance
+ row.market_regime = ms.market_regime
+ row.updated_at = ms.updated_at
+ else:
+ row = MarketSentimentRow(
+ id="latest",
+ fear_greed=ms.fear_greed,
+ fear_greed_label=ms.fear_greed_label,
+ vix=ms.vix,
+ fed_stance=ms.fed_stance,
+ market_regime=ms.market_regime,
+ updated_at=ms.updated_at,
+ )
+ session.add(row)
+ await session.commit()
+ except Exception:
+ await session.rollback()
+ raise
+
+ async def get_latest_market_sentiment(self) -> dict | None:
+ """Get the latest market sentiment."""
+ stmt = select(MarketSentimentRow).where(MarketSentimentRow.id == "latest")
+ async with self._session_factory() as session:
+ result = await session.execute(stmt)
+ r = result.scalar_one_or_none()
+ if r is None:
+ return None
+ return {
+ "fear_greed": r.fear_greed,
+ "fear_greed_label": r.fear_greed_label,
+ "vix": r.vix,
+ "fed_stance": r.fed_stance,
+ "market_regime": r.market_regime,
+ "updated_at": r.updated_at,
+ }
+
+ async def insert_stock_selection(
+ self,
+ trade_date: date,
+ symbol: str,
+ side: str,
+ conviction: float,
+ reason: str,
+ key_news: list[str],
+ sentiment_snapshot: dict,
+ ) -> None:
+ """Insert a stock selection record."""
+ row = StockSelectionRow(
+ id=str(uuid.uuid4()),
+ trade_date=trade_date,
+ symbol=symbol,
+ side=side,
+ conviction=conviction,
+ reason=reason,
+ key_news=json.dumps(key_news),
+ sentiment_snapshot=json.dumps(sentiment_snapshot),
+ )
+ async with self._session_factory() as session:
+ try:
+ session.add(row)
+ await session.commit()
+ except Exception:
+ await session.rollback()
+ raise
+
+ async def get_stock_selections(self, trade_date: date) -> list[dict]:
+ """Get stock selections for a specific date."""
+ stmt = (
+ select(StockSelectionRow)
+ .where(StockSelectionRow.trade_date == trade_date)
+ .order_by(StockSelectionRow.conviction.desc())
+ )
+ async with self._session_factory() as session:
+ result = await session.execute(stmt)
+ rows = result.scalars().all()
+ return [
+ {
+ "symbol": r.symbol,
+ "side": r.side,
+ "conviction": r.conviction,
+ "reason": r.reason,
+ "key_news": json.loads(r.key_news) if r.key_news else [],
+ "sentiment_snapshot": json.loads(r.sentiment_snapshot) if r.sentiment_snapshot else {},
+ }
+ for r in rows
+ ]
+```
+
+- [ ] **Step 9: Run DB news tests to verify they pass**
+
+Run: `pytest shared/tests/test_db_news.py -v`
+Expected: All 4 tests PASS
+
+Note: These tests require `aiosqlite` package. If not installed: `pip install aiosqlite`
+
+- [ ] **Step 10: Run all shared tests to check no regressions**
+
+Run: `pytest shared/tests/ -v`
+Expected: All tests PASS
+
+- [ ] **Step 11: Commit**
+
+```bash
+git add shared/src/shared/events.py shared/src/shared/db.py shared/tests/test_news_events.py shared/tests/test_db_news.py
+git commit -m "feat: add NewsEvent, DB methods for news/sentiment/selections"
+```
+
+---
+
+### Task 5: Update Settings with new env vars
+
+**Files:**
+- Modify: `shared/src/shared/config.py`
+- Modify: `.env.example`
+
+- [ ] **Step 1: Add new settings to config.py**
+
+Add these fields to the `Settings` class in `shared/src/shared/config.py`:
+
+```python
+ # News collector
+ finnhub_api_key: str = ""
+ news_poll_interval: int = 300
+ sentiment_aggregate_interval: int = 900
+ # Stock selector
+ selector_candidates_time: str = "15:00"
+ selector_filter_time: str = "15:15"
+ selector_final_time: str = "15:30"
+ selector_max_picks: int = 3
+ # LLM
+ anthropic_api_key: str = ""
+ anthropic_model: str = "claude-sonnet-4-20250514"
+```
+
+- [ ] **Step 2: Add to .env.example**
+
+Append to `.env.example`:
+
+```bash
+
+# News Collector
+FINNHUB_API_KEY=
+NEWS_POLL_INTERVAL=300
+SENTIMENT_AGGREGATE_INTERVAL=900
+
+# Stock Selector
+SELECTOR_CANDIDATES_TIME=15:00
+SELECTOR_FILTER_TIME=15:15
+SELECTOR_FINAL_TIME=15:30
+SELECTOR_MAX_PICKS=3
+
+# LLM (for stock selector)
+ANTHROPIC_API_KEY=
+ANTHROPIC_MODEL=claude-sonnet-4-20250514
+```
+
+- [ ] **Step 3: Commit**
+
+```bash
+git add shared/src/shared/config.py .env.example
+git commit -m "feat: add news collector and stock selector config settings"
+```
+
+---
+
+## Phase 2: News Collector Service
+
+### Task 6: Scaffold news-collector service
+
+**Files:**
+- Create: `services/news-collector/pyproject.toml`
+- Create: `services/news-collector/Dockerfile`
+- Create: `services/news-collector/src/news_collector/__init__.py`
+- Create: `services/news-collector/src/news_collector/config.py`
+- Create: `services/news-collector/src/news_collector/collectors/__init__.py`
+- Create: `services/news-collector/src/news_collector/collectors/base.py`
+- Create: `services/news-collector/tests/__init__.py`
+
+- [ ] **Step 1: Create pyproject.toml**
+
+Create `services/news-collector/pyproject.toml`:
+
+```toml
+[project]
+name = "news-collector"
+version = "0.1.0"
+description = "News and sentiment data collector service"
+requires-python = ">=3.12"
+dependencies = [
+ "trading-shared",
+ "feedparser>=6.0",
+ "nltk>=3.8",
+ "aiohttp>=3.9",
+]
+
+[project.optional-dependencies]
+dev = [
+ "pytest>=8.0",
+ "pytest-asyncio>=0.23",
+ "aioresponses>=0.7",
+]
+
+[build-system]
+requires = ["hatchling"]
+build-backend = "hatchling.build"
+
+[tool.hatch.build.targets.wheel]
+packages = ["src/news_collector"]
+```
+
+- [ ] **Step 2: Create Dockerfile**
+
+Create `services/news-collector/Dockerfile`:
+
+```dockerfile
+FROM python:3.12-slim
+WORKDIR /app
+COPY shared/ shared/
+RUN pip install --no-cache-dir ./shared
+COPY services/news-collector/ services/news-collector/
+RUN pip install --no-cache-dir ./services/news-collector
+RUN python -c "import nltk; nltk.download('vader_lexicon', quiet=True)"
+ENV PYTHONPATH=/app
+CMD ["python", "-m", "news_collector.main"]
+```
+
+- [ ] **Step 3: Create config.py**
+
+Create `services/news-collector/src/news_collector/config.py`:
+
+```python
+"""News Collector configuration."""
+
+from shared.config import Settings
+
+
+class NewsCollectorConfig(Settings):
+ health_port: int = 8084
+ finnhub_api_key: str = ""
+ news_poll_interval: int = 300
+ sentiment_aggregate_interval: int = 900
+```
+
+- [ ] **Step 4: Create BaseCollector**
+
+Create `services/news-collector/src/news_collector/collectors/base.py`:
+
+```python
+"""Base class for all news collectors."""
+
+from abc import ABC, abstractmethod
+
+from shared.models import NewsItem
+
+
+class BaseCollector(ABC):
+ name: str = "base"
+ poll_interval: int = 300 # seconds
+
+ @abstractmethod
+ async def collect(self) -> list[NewsItem]:
+ """Collect news items from the source."""
+
+ @abstractmethod
+ async def is_available(self) -> bool:
+ """Check if this data source is accessible."""
+```
+
+- [ ] **Step 5: Create __init__.py files**
+
+Create `services/news-collector/src/news_collector/__init__.py`:
+```python
+"""News collector service."""
+```
+
+Create `services/news-collector/src/news_collector/collectors/__init__.py`:
+```python
+"""News collectors."""
+```
+
+Create `services/news-collector/tests/__init__.py`:
+```python
+```
+
+- [ ] **Step 6: Commit**
+
+```bash
+git add services/news-collector/
+git commit -m "feat: scaffold news-collector service with BaseCollector"
+```
+
+---
+
+### Task 7: Implement Finnhub news collector
+
+**Files:**
+- Create: `services/news-collector/src/news_collector/collectors/finnhub.py`
+- Create: `services/news-collector/tests/test_finnhub.py`
+
+- [ ] **Step 1: Write tests**
+
+Create `services/news-collector/tests/test_finnhub.py`:
+
+```python
+"""Tests for Finnhub news collector."""
+
+import pytest
+from unittest.mock import AsyncMock, patch
+from datetime import datetime, timezone
+
+from news_collector.collectors.finnhub import FinnhubCollector
+
+
+@pytest.fixture
+def collector():
+ return FinnhubCollector(api_key="test_key")
+
+
+def test_collector_name(collector):
+ assert collector.name == "finnhub"
+ assert collector.poll_interval == 300
+
+
+async def test_is_available_with_key(collector):
+ assert await collector.is_available() is True
+
+
+async def test_is_available_without_key():
+ c = FinnhubCollector(api_key="")
+ assert await c.is_available() is False
+
+
+async def test_collect_parses_response(collector):
+ mock_response = [
+ {
+ "category": "top news",
+ "datetime": 1711929600,
+ "headline": "AAPL beats earnings",
+ "id": 12345,
+ "related": "AAPL",
+ "source": "MarketWatch",
+ "summary": "Apple reported better than expected...",
+ "url": "https://example.com/article",
+ },
+ {
+ "category": "top news",
+ "datetime": 1711929000,
+ "headline": "Fed holds rates steady",
+ "id": 12346,
+ "related": "",
+ "source": "Reuters",
+ "summary": "The Federal Reserve...",
+ "url": "https://example.com/fed",
+ },
+ ]
+
+ with patch.object(collector, "_fetch_news", new_callable=AsyncMock, return_value=mock_response):
+ items = await collector.collect()
+
+ assert len(items) == 2
+ assert items[0].source == "finnhub"
+ assert items[0].headline == "AAPL beats earnings"
+ assert items[0].symbols == ["AAPL"]
+ assert items[0].url == "https://example.com/article"
+ assert isinstance(items[0].sentiment, float)
+ # Second item has no related ticker
+ assert items[1].symbols == []
+
+
+async def test_collect_handles_empty_response(collector):
+ with patch.object(collector, "_fetch_news", new_callable=AsyncMock, return_value=[]):
+ items = await collector.collect()
+ assert items == []
+```
+
+- [ ] **Step 2: Run tests to verify they fail**
+
+Run: `pytest services/news-collector/tests/test_finnhub.py -v`
+Expected: FAIL — module not found
+
+- [ ] **Step 3: Implement FinnhubCollector**
+
+Create `services/news-collector/src/news_collector/collectors/finnhub.py`:
+
+```python
+"""Finnhub market news collector (free tier: 60 req/min)."""
+
+import logging
+from datetime import datetime, timezone
+
+import aiohttp
+from nltk.sentiment.vader import SentimentIntensityAnalyzer
+
+from shared.models import NewsCategory, NewsItem
+from news_collector.collectors.base import BaseCollector
+
+logger = logging.getLogger(__name__)
+
+FINNHUB_NEWS_URL = "https://finnhub.io/api/v1/news"
+
+
+class FinnhubCollector(BaseCollector):
+ name = "finnhub"
+ poll_interval = 300 # 5 minutes
+
+ def __init__(self, api_key: str) -> None:
+ self._api_key = api_key
+ self._vader = SentimentIntensityAnalyzer()
+
+ async def is_available(self) -> bool:
+ return bool(self._api_key)
+
+ async def _fetch_news(self) -> list[dict]:
+ """Fetch general news from Finnhub API."""
+ params = {"category": "general", "token": self._api_key}
+ async with aiohttp.ClientSession() as session:
+ async with session.get(FINNHUB_NEWS_URL, params=params) as resp:
+ if resp.status != 200:
+ logger.warning("finnhub_fetch_failed", status=resp.status)
+ return []
+ return await resp.json()
+
+ def _analyze_sentiment(self, text: str) -> float:
+ """Return VADER compound score (-1.0 to 1.0)."""
+ scores = self._vader.polarity_scores(text)
+ return scores["compound"]
+
+ def _extract_symbols(self, related: str) -> list[str]:
+ """Parse Finnhub 'related' field into symbol list."""
+ if not related or not related.strip():
+ return []
+ return [s.strip() for s in related.split(",") if s.strip()]
+
+ def _categorize(self, article: dict) -> NewsCategory:
+ """Determine category from article content."""
+ headline = article.get("headline", "").lower()
+ if any(w in headline for w in ["fed", "fomc", "rate", "inflation"]):
+ return NewsCategory.FED
+ if any(w in headline for w in ["tariff", "sanction", "regulation", "trump", "biden", "congress"]):
+ return NewsCategory.POLICY
+ if any(w in headline for w in ["earnings", "revenue", "profit", "eps"]):
+ return NewsCategory.EARNINGS
+ return NewsCategory.MACRO
+
+ async def collect(self) -> list[NewsItem]:
+ raw = await self._fetch_news()
+ items = []
+ for article in raw:
+ headline = article.get("headline", "")
+ summary = article.get("summary", "")
+ sentiment_text = f"{headline}. {summary}" if summary else headline
+
+ items.append(
+ NewsItem(
+ source=self.name,
+ headline=headline,
+ summary=summary or None,
+ url=article.get("url"),
+ published_at=datetime.fromtimestamp(
+ article.get("datetime", 0), tz=timezone.utc
+ ),
+ symbols=self._extract_symbols(article.get("related", "")),
+ sentiment=self._analyze_sentiment(sentiment_text),
+ category=self._categorize(article),
+ raw_data=article,
+ )
+ )
+ return items
+```
+
+- [ ] **Step 4: Run tests to verify they pass**
+
+Run: `pytest services/news-collector/tests/test_finnhub.py -v`
+Expected: All 5 tests PASS
+
+Note: Requires `nltk` and VADER lexicon. If not downloaded: `python -c "import nltk; nltk.download('vader_lexicon')"`
+
+- [ ] **Step 5: Commit**
+
+```bash
+git add services/news-collector/src/news_collector/collectors/finnhub.py services/news-collector/tests/test_finnhub.py
+git commit -m "feat: implement Finnhub news collector with VADER sentiment"
+```
+
+---
+
+### Task 8: Implement RSS news collector
+
+**Files:**
+- Create: `services/news-collector/src/news_collector/collectors/rss.py`
+- Create: `services/news-collector/tests/test_rss.py`
+
+- [ ] **Step 1: Write tests**
+
+Create `services/news-collector/tests/test_rss.py`:
+
+```python
+"""Tests for RSS news collector."""
+
+import pytest
+from unittest.mock import AsyncMock, patch
+from datetime import datetime, timezone
+
+from news_collector.collectors.rss import RSSCollector
+
+
+@pytest.fixture
+def collector():
+ return RSSCollector()
+
+
+def test_collector_name(collector):
+ assert collector.name == "rss"
+ assert collector.poll_interval == 600
+
+
+async def test_is_available(collector):
+ assert await collector.is_available() is True
+
+
+async def test_collect_parses_feed(collector):
+ mock_feed = {
+ "entries": [
+ {
+ "title": "NVDA surges on AI demand",
+ "link": "https://example.com/nvda",
+ "published_parsed": (2026, 4, 2, 12, 0, 0, 0, 0, 0),
+ "summary": "Nvidia stock jumped 5%...",
+ },
+ {
+ "title": "Markets rally on jobs data",
+ "link": "https://example.com/market",
+ "published_parsed": (2026, 4, 2, 11, 0, 0, 0, 0, 0),
+ "summary": "The S&P 500 rose...",
+ },
+ ],
+ }
+
+ with patch.object(collector, "_fetch_feeds", new_callable=AsyncMock, return_value=[mock_feed]):
+ items = await collector.collect()
+
+ assert len(items) == 2
+ assert items[0].source == "rss"
+ assert items[0].headline == "NVDA surges on AI demand"
+ assert isinstance(items[0].sentiment, float)
+```
+
+- [ ] **Step 2: Run tests to verify they fail**
+
+Run: `pytest services/news-collector/tests/test_rss.py -v`
+Expected: FAIL
+
+- [ ] **Step 3: Implement RSSCollector**
+
+Create `services/news-collector/src/news_collector/collectors/rss.py`:
+
+```python
+"""RSS feed collector for Yahoo Finance, Google News, MarketWatch."""
+
+import asyncio
+import logging
+import re
+from calendar import timegm
+from datetime import datetime, timezone
+
+import aiohttp
+import feedparser
+from nltk.sentiment.vader import SentimentIntensityAnalyzer
+
+from shared.models import NewsCategory, NewsItem
+from news_collector.collectors.base import BaseCollector
+
+logger = logging.getLogger(__name__)
+
+DEFAULT_FEEDS = [
+ "https://feeds.finance.yahoo.com/rss/2.0/headline?s=^GSPC®ion=US&lang=en-US",
+ "https://news.google.com/rss/topics/CAAqJggKIiBDQkFTRWdvSUwyMHZNRGx6TVdZU0FtVnVHZ0pWVXlnQVAB?hl=en-US&gl=US&ceid=US:en",
+ "https://www.marketwatch.com/rss/topstories",
+]
+
+# Common US stock tickers to detect in headlines
+TICKER_PATTERN = re.compile(
+ r"\b(AAPL|MSFT|GOOGL|GOOG|AMZN|TSLA|NVDA|META|JPM|V|JNJ|WMT|PG|UNH|HD|"
+ r"MA|DIS|BAC|XOM|PFE|KO|PEP|CSCO|INTC|VZ|NFLX|ADBE|CRM|AMD|QCOM|"
+ r"GS|BA|CAT|MMM|IBM|GE|F|GM|NKE|MCD|SBUX|SPY|QQQ|IWM)\b"
+)
+
+
+class RSSCollector(BaseCollector):
+ name = "rss"
+ poll_interval = 600 # 10 minutes
+
+ def __init__(self, feeds: list[str] | None = None) -> None:
+ self._feeds = feeds or DEFAULT_FEEDS
+ self._vader = SentimentIntensityAnalyzer()
+
+ async def is_available(self) -> bool:
+ return True
+
+ async def _fetch_feeds(self) -> list[dict]:
+ """Fetch and parse all RSS feeds."""
+ results = []
+ async with aiohttp.ClientSession() as session:
+ for url in self._feeds:
+ try:
+ async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as resp:
+ if resp.status == 200:
+ text = await resp.text()
+ feed = feedparser.parse(text)
+ results.append(feed)
+ except Exception as exc:
+ logger.warning("rss_fetch_failed", url=url, error=str(exc))
+ return results
+
+ def _extract_symbols(self, text: str) -> list[str]:
+ """Extract stock tickers from text."""
+ return list(set(TICKER_PATTERN.findall(text)))
+
+ def _parse_time(self, entry: dict) -> datetime:
+ """Parse published time from feed entry."""
+ parsed = entry.get("published_parsed")
+ if parsed:
+ return datetime.fromtimestamp(timegm(parsed), tz=timezone.utc)
+ return datetime.now(timezone.utc)
+
+ async def collect(self) -> list[NewsItem]:
+ feeds = await self._fetch_feeds()
+ items = []
+ seen_titles = set()
+
+ for feed in feeds:
+ for entry in feed.get("entries", []):
+ title = entry.get("title", "").strip()
+ if not title or title in seen_titles:
+ continue
+ seen_titles.add(title)
+
+ summary = entry.get("summary", "")
+ sentiment_text = f"{title}. {summary}" if summary else title
+
+ items.append(
+ NewsItem(
+ source=self.name,
+ headline=title,
+ summary=summary or None,
+ url=entry.get("link"),
+ published_at=self._parse_time(entry),
+ symbols=self._extract_symbols(f"{title} {summary}"),
+ sentiment=self._vader.polarity_scores(sentiment_text)["compound"],
+ category=NewsCategory.MACRO,
+ raw_data={"feed_title": title},
+ )
+ )
+
+ return items
+```
+
+- [ ] **Step 4: Run tests to verify they pass**
+
+Run: `pytest services/news-collector/tests/test_rss.py -v`
+Expected: All 3 tests PASS
+
+- [ ] **Step 5: Commit**
+
+```bash
+git add services/news-collector/src/news_collector/collectors/rss.py services/news-collector/tests/test_rss.py
+git commit -m "feat: implement RSS news collector (Yahoo, Google News, MarketWatch)"
+```
+
+---
+
+### Task 9: Implement Fear & Greed Index collector
+
+**Files:**
+- Create: `services/news-collector/src/news_collector/collectors/fear_greed.py`
+- Create: `services/news-collector/tests/test_fear_greed.py`
+
+- [ ] **Step 1: Write tests**
+
+Create `services/news-collector/tests/test_fear_greed.py`:
+
+```python
+"""Tests for CNN Fear & Greed Index collector."""
+
+import pytest
+from unittest.mock import AsyncMock, patch
+
+from news_collector.collectors.fear_greed import FearGreedCollector
+
+
+@pytest.fixture
+def collector():
+ return FearGreedCollector()
+
+
+def test_collector_name(collector):
+ assert collector.name == "fear_greed"
+ assert collector.poll_interval == 3600
+
+
+async def test_is_available(collector):
+ assert await collector.is_available() is True
+
+
+async def test_collect_parses_api_response(collector):
+ mock_data = {
+ "fear_and_greed": {
+ "score": 45.0,
+ "rating": "Fear",
+ "timestamp": "2026-04-02T12:00:00+00:00",
+ }
+ }
+ with patch.object(collector, "_fetch_index", new_callable=AsyncMock, return_value=mock_data):
+ result = await collector.collect()
+
+ assert result.fear_greed == 45
+ assert result.fear_greed_label == "Fear"
+
+
+async def test_collect_returns_none_on_failure(collector):
+ with patch.object(collector, "_fetch_index", new_callable=AsyncMock, return_value=None):
+ result = await collector.collect()
+ assert result is None
+
+
+def test_classify_label():
+ c = FearGreedCollector()
+ assert c._classify(10) == "Extreme Fear"
+ assert c._classify(30) == "Fear"
+ assert c._classify(50) == "Neutral"
+ assert c._classify(70) == "Greed"
+ assert c._classify(85) == "Extreme Greed"
+```
+
+- [ ] **Step 2: Run tests to verify they fail**
+
+Run: `pytest services/news-collector/tests/test_fear_greed.py -v`
+Expected: FAIL
+
+- [ ] **Step 3: Implement FearGreedCollector**
+
+Create `services/news-collector/src/news_collector/collectors/fear_greed.py`:
+
+```python
+"""CNN Fear & Greed Index collector."""
+
+import logging
+from dataclasses import dataclass
+from typing import Optional
+
+import aiohttp
+
+from news_collector.collectors.base import BaseCollector
+from shared.models import NewsItem
+
+logger = logging.getLogger(__name__)
+
+FEAR_GREED_URL = "https://production.dataviz.cnn.io/index/fearandgreed/graphdata"
+
+
+@dataclass
+class FearGreedResult:
+    """Snapshot of the CNN Fear & Greed Index."""
+
+    # Index value 0-100 (lower = more fear, per _classify's bands).
+    fear_greed: int
+    # Band label, e.g. "Fear" or "Extreme Greed".
+    fear_greed_label: str
+
+
+class FearGreedCollector(BaseCollector):
+ """Fetches CNN Fear & Greed Index.
+
+ Note: This collector does NOT return NewsItem — it returns FearGreedResult
+ which feeds directly into MarketSentiment. The main.py scheduler handles
+ this differently from news collectors.
+ """
+
+ name = "fear_greed"
+ poll_interval = 3600 # 1 hour
+
+ async def is_available(self) -> bool:
+ return True
+
+ async def _fetch_index(self) -> Optional[dict]:
+ """Fetch Fear & Greed data from CNN API."""
+ headers = {"User-Agent": "Mozilla/5.0"}
+ try:
+ async with aiohttp.ClientSession() as session:
+ async with session.get(
+ FEAR_GREED_URL, headers=headers, timeout=aiohttp.ClientTimeout(total=10)
+ ) as resp:
+ if resp.status != 200:
+ logger.warning("fear_greed_fetch_failed", status=resp.status)
+ return None
+ return await resp.json()
+ except Exception as exc:
+ logger.warning("fear_greed_error", error=str(exc))
+ return None
+
+ def _classify(self, score: int) -> str:
+ """Classify numeric score into label."""
+ if score <= 20:
+ return "Extreme Fear"
+ if score <= 40:
+ return "Fear"
+ if score <= 60:
+ return "Neutral"
+ if score <= 80:
+ return "Greed"
+ return "Extreme Greed"
+
+ async def collect(self) -> Optional[FearGreedResult]:
+ """Collect Fear & Greed Index. Returns FearGreedResult or None."""
+ data = await self._fetch_index()
+ if data is None:
+ return None
+
+ try:
+ fg = data["fear_and_greed"]
+ score = int(fg["score"])
+ label = fg.get("rating", self._classify(score))
+ return FearGreedResult(fear_greed=score, fear_greed_label=label)
+ except (KeyError, ValueError, TypeError) as exc:
+ logger.warning("fear_greed_parse_failed", error=str(exc))
+ return None
+```
+
+- [ ] **Step 4: Run tests to verify they pass**
+
+Run: `pytest services/news-collector/tests/test_fear_greed.py -v`
+Expected: All 5 tests PASS
+
+- [ ] **Step 5: Commit**
+
+```bash
+git add services/news-collector/src/news_collector/collectors/fear_greed.py services/news-collector/tests/test_fear_greed.py
+git commit -m "feat: implement CNN Fear & Greed Index collector"
+```
+
+---
+
+### Task 10: Implement SEC EDGAR collector
+
+**Files:**
+- Create: `services/news-collector/src/news_collector/collectors/sec_edgar.py`
+- Create: `services/news-collector/tests/test_sec_edgar.py`
+
+- [ ] **Step 1: Write tests**
+
+Create `services/news-collector/tests/test_sec_edgar.py`:
+
+```python
+"""Tests for SEC EDGAR filing collector."""
+
+import pytest
+from unittest.mock import AsyncMock, patch
+
+from news_collector.collectors.sec_edgar import SecEdgarCollector
+
+
+@pytest.fixture
+def collector():
+ return SecEdgarCollector()
+
+
+def test_collector_name(collector):
+ assert collector.name == "sec_edgar"
+ assert collector.poll_interval == 1800
+
+
+async def test_is_available(collector):
+ assert await collector.is_available() is True
+
+
+async def test_collect_parses_filings(collector):
+ mock_response = {
+ "filings": {
+ "recent": {
+ "accessionNumber": ["0001234-26-000001"],
+ "filingDate": ["2026-04-02"],
+ "primaryDocument": ["filing.htm"],
+ "form": ["8-K"],
+ "primaryDocDescription": ["Current Report"],
+ }
+ },
+ "tickers": [{"ticker": "AAPL"}],
+ "name": "Apple Inc",
+ }
+ with patch.object(collector, "_fetch_recent_filings", new_callable=AsyncMock, return_value=[mock_response]):
+ items = await collector.collect()
+
+ assert len(items) == 1
+ assert items[0].source == "sec_edgar"
+ assert items[0].category.value == "filing"
+ assert "AAPL" in items[0].symbols
+
+
+async def test_collect_handles_empty(collector):
+ with patch.object(collector, "_fetch_recent_filings", new_callable=AsyncMock, return_value=[]):
+ items = await collector.collect()
+ assert items == []
+```
+
+- [ ] **Step 2: Run tests to verify they fail**
+
+Run: `pytest services/news-collector/tests/test_sec_edgar.py -v`
+Expected: FAIL
+
+- [ ] **Step 3: Implement SecEdgarCollector**
+
+Create `services/news-collector/src/news_collector/collectors/sec_edgar.py`:
+
+```python
+"""SEC EDGAR filing collector (free, no API key required)."""
+
+import logging
+from datetime import datetime, timezone
+
+import aiohttp
+from nltk.sentiment.vader import SentimentIntensityAnalyzer
+
+from shared.models import NewsCategory, NewsItem
+from news_collector.collectors.base import BaseCollector
+
+logger = logging.getLogger(__name__)
+
+# NOTE(review): EDGAR_FULL_TEXT_SEARCH and EDGAR_RECENT_FILINGS are defined
+# but not referenced in this module — confirm intended use or drop them.
+EDGAR_FULL_TEXT_SEARCH = "https://efts.sec.gov/LATEST/search-index"
+EDGAR_RECENT_FILINGS = "https://efts.sec.gov/LATEST/search-index?q=%228-K%22&dateRange=custom&startdt={date}&enddt={date}&forms=8-K"
+# URL template for the data.sec.gov submissions API, keyed by zero-padded CIK.
+EDGAR_COMPANY_FILINGS = "https://data.sec.gov/submissions/CIK{cik}.json"
+
+# CIK numbers for major companies (subset — extend as needed)
+TRACKED_CIKS = {
+    "0000320193": "AAPL",
+    "0000789019": "MSFT",
+    "0001652044": "GOOGL",
+    "0001018724": "AMZN",
+    "0001318605": "TSLA",
+    "0001045810": "NVDA",
+    "0001326801": "META",
+    "0000019617": "JPM",
+    "0000078003": "PFE",
+    "0000021344": "KO",
+}
+
+# SEC fair-access policy expects a User-Agent identifying the requester with
+# contact info — replace the example address with a real one before deploying.
+SEC_USER_AGENT = "TradingPlatform research@example.com"
+
+
+class SecEdgarCollector(BaseCollector):
+ name = "sec_edgar"
+ poll_interval = 1800 # 30 minutes
+
+ def __init__(self) -> None:
+ self._vader = SentimentIntensityAnalyzer()
+
+ async def is_available(self) -> bool:
+ return True
+
+ async def _fetch_recent_filings(self) -> list[dict]:
+ """Fetch recent 8-K filings for tracked companies."""
+ results = []
+ headers = {"User-Agent": SEC_USER_AGENT}
+ async with aiohttp.ClientSession() as session:
+ for cik, ticker in TRACKED_CIKS.items():
+ try:
+ url = f"https://data.sec.gov/submissions/CIK{cik}.json"
+ async with session.get(
+ url, headers=headers, timeout=aiohttp.ClientTimeout(total=10)
+ ) as resp:
+ if resp.status == 200:
+ data = await resp.json()
+ data["tickers"] = [{"ticker": ticker}]
+ results.append(data)
+ except Exception as exc:
+ logger.warning("sec_fetch_failed", cik=cik, error=str(exc))
+ return results
+
+ async def collect(self) -> list[NewsItem]:
+ filings_data = await self._fetch_recent_filings()
+ items = []
+ today = datetime.now(timezone.utc).strftime("%Y-%m-%d")
+
+ for company_data in filings_data:
+ tickers = [t["ticker"] for t in company_data.get("tickers", [])]
+ company_name = company_data.get("name", "Unknown")
+ recent = company_data.get("filings", {}).get("recent", {})
+
+ forms = recent.get("form", [])
+ dates = recent.get("filingDate", [])
+ descriptions = recent.get("primaryDocDescription", [])
+ accessions = recent.get("accessionNumber", [])
+
+ for i, form in enumerate(forms):
+ if form != "8-K":
+ continue
+ filing_date = dates[i] if i < len(dates) else ""
+ if filing_date != today:
+ continue
+
+ desc = descriptions[i] if i < len(descriptions) else "8-K Filing"
+ accession = accessions[i] if i < len(accessions) else ""
+ headline = f"{company_name} ({', '.join(tickers)}): {form} - {desc}"
+
+ items.append(
+ NewsItem(
+ source=self.name,
+ headline=headline,
+ summary=desc,
+ url=f"https://www.sec.gov/cgi-bin/browse-edgar?action=getcompany&accession={accession}",
+ published_at=datetime.strptime(filing_date, "%Y-%m-%d").replace(tzinfo=timezone.utc),
+ symbols=tickers,
+ sentiment=self._vader.polarity_scores(headline)["compound"],
+ category=NewsCategory.FILING,
+ raw_data={"form": form, "accession": accession},
+ )
+ )
+
+ return items
+```
+
+- [ ] **Step 4: Run tests to verify they pass**
+
+Run: `pytest services/news-collector/tests/test_sec_edgar.py -v`
+Expected: All 4 tests PASS
+
+- [ ] **Step 5: Commit**
+
+```bash
+git add services/news-collector/src/news_collector/collectors/sec_edgar.py services/news-collector/tests/test_sec_edgar.py
+git commit -m "feat: implement SEC EDGAR 8-K filing collector"
+```
+
+---
+
+### Task 11: Implement Reddit collector
+
+**Files:**
+- Create: `services/news-collector/src/news_collector/collectors/reddit.py`
+- Create: `services/news-collector/tests/test_reddit.py`
+
+- [ ] **Step 1: Write tests**
+
+Create `services/news-collector/tests/test_reddit.py`:
+
+```python
+"""Tests for Reddit collector."""
+
+import pytest
+from unittest.mock import AsyncMock, patch
+
+from news_collector.collectors.reddit import RedditCollector
+
+
+@pytest.fixture
+def collector():
+ return RedditCollector()
+
+
+def test_collector_name(collector):
+ assert collector.name == "reddit"
+ assert collector.poll_interval == 900
+
+
+async def test_is_available(collector):
+ assert await collector.is_available() is True
+
+
+async def test_collect_parses_posts(collector):
+ mock_posts = [
+ {
+ "data": {
+ "title": "NVDA to the moon! 🚀 AI demand is insane",
+ "selftext": "Just loaded up on NVDA calls",
+ "url": "https://reddit.com/r/wallstreetbets/123",
+ "created_utc": 1711929600,
+ "score": 500,
+ "num_comments": 200,
+ "subreddit": "wallstreetbets",
+ }
+ },
+ ]
+ with patch.object(collector, "_fetch_subreddit", new_callable=AsyncMock, return_value=mock_posts):
+ items = await collector.collect()
+
+ assert len(items) >= 1
+ assert items[0].source == "reddit"
+ assert items[0].category.value == "social"
+ assert isinstance(items[0].sentiment, float)
+
+
+async def test_collect_filters_low_score(collector):
+ mock_posts = [
+ {
+ "data": {
+ "title": "Random question about stocks",
+ "selftext": "",
+ "url": "https://reddit.com/r/stocks/456",
+ "created_utc": 1711929600,
+ "score": 3,
+ "num_comments": 1,
+ "subreddit": "stocks",
+ }
+ },
+ ]
+ with patch.object(collector, "_fetch_subreddit", new_callable=AsyncMock, return_value=mock_posts):
+ items = await collector.collect()
+ assert items == []
+```
+
+- [ ] **Step 2: Run tests to verify they fail**
+
+Run: `pytest services/news-collector/tests/test_reddit.py -v`
+Expected: FAIL
+
+- [ ] **Step 3: Implement RedditCollector**
+
+Create `services/news-collector/src/news_collector/collectors/reddit.py`:
+
+```python
+"""Reddit collector for r/wallstreetbets, r/stocks, r/investing."""
+
+import logging
+import re
+from datetime import datetime, timezone
+
+import aiohttp
+from nltk.sentiment.vader import SentimentIntensityAnalyzer
+
+from shared.models import NewsCategory, NewsItem
+from news_collector.collectors.base import BaseCollector
+
+logger = logging.getLogger(__name__)
+
+SUBREDDITS = ["wallstreetbets", "stocks", "investing"]
+MIN_SCORE = 50 # Minimum upvotes to consider
+
+TICKER_PATTERN = re.compile(
+ r"\b(AAPL|MSFT|GOOGL|GOOG|AMZN|TSLA|NVDA|META|JPM|V|JNJ|WMT|PG|UNH|HD|"
+ r"MA|DIS|BAC|XOM|PFE|KO|PEP|CSCO|INTC|VZ|NFLX|ADBE|CRM|AMD|QCOM|"
+ r"GS|BA|CAT|MMM|IBM|GE|F|GM|NKE|MCD|SBUX|SPY|QQQ|IWM)\b"
+)
+
+
+class RedditCollector(BaseCollector):
+ name = "reddit"
+ poll_interval = 900 # 15 minutes
+
+ def __init__(self) -> None:
+ self._vader = SentimentIntensityAnalyzer()
+
+ async def is_available(self) -> bool:
+ return True
+
+ async def _fetch_subreddit(self, subreddit: str = "wallstreetbets") -> list[dict]:
+ """Fetch hot posts from a subreddit via JSON API."""
+ url = f"https://www.reddit.com/r/{subreddit}/hot.json?limit=25"
+ headers = {"User-Agent": "TradingPlatform/1.0"}
+ try:
+ async with aiohttp.ClientSession() as session:
+ async with session.get(
+ url, headers=headers, timeout=aiohttp.ClientTimeout(total=10)
+ ) as resp:
+ if resp.status != 200:
+ logger.warning("reddit_fetch_failed", subreddit=subreddit, status=resp.status)
+ return []
+ data = await resp.json()
+ return data.get("data", {}).get("children", [])
+ except Exception as exc:
+ logger.warning("reddit_error", subreddit=subreddit, error=str(exc))
+ return []
+
+ async def collect(self) -> list[NewsItem]:
+ items = []
+ seen_titles = set()
+
+ for subreddit in SUBREDDITS:
+ posts = await self._fetch_subreddit(subreddit)
+ for post in posts:
+ data = post.get("data", {})
+ title = data.get("title", "").strip()
+ score = data.get("score", 0)
+
+ if not title or title in seen_titles or score < MIN_SCORE:
+ continue
+ seen_titles.add(title)
+
+ selftext = data.get("selftext", "")
+ text = f"{title}. {selftext}" if selftext else title
+ symbols = list(set(TICKER_PATTERN.findall(text)))
+
+ items.append(
+ NewsItem(
+ source=self.name,
+ headline=title,
+ summary=selftext[:500] if selftext else None,
+ url=data.get("url"),
+ published_at=datetime.fromtimestamp(
+ data.get("created_utc", 0), tz=timezone.utc
+ ),
+ symbols=symbols,
+ sentiment=self._vader.polarity_scores(text)["compound"],
+ category=NewsCategory.SOCIAL,
+ raw_data={
+ "subreddit": data.get("subreddit", subreddit),
+ "score": score,
+ "num_comments": data.get("num_comments", 0),
+ },
+ )
+ )
+
+ return items
+```
+
+- [ ] **Step 4: Run tests to verify they pass**
+
+Run: `pytest services/news-collector/tests/test_reddit.py -v`
+Expected: All 4 tests PASS
+
+- [ ] **Step 5: Commit**
+
+```bash
+git add services/news-collector/src/news_collector/collectors/reddit.py services/news-collector/tests/test_reddit.py
+git commit -m "feat: implement Reddit social sentiment collector"
+```
+
+---
+
+### Task 12: Implement Truth Social and Fed collectors
+
+**Files:**
+- Create: `services/news-collector/src/news_collector/collectors/truth_social.py`
+- Create: `services/news-collector/src/news_collector/collectors/fed.py`
+- Create: `services/news-collector/tests/test_truth_social.py`
+- Create: `services/news-collector/tests/test_fed.py`
+
+- [ ] **Step 1: Write tests for Truth Social**
+
+Create `services/news-collector/tests/test_truth_social.py`:
+
+```python
+"""Tests for Truth Social collector."""
+
+import pytest
+from unittest.mock import AsyncMock, patch
+
+from news_collector.collectors.truth_social import TruthSocialCollector
+
+
+@pytest.fixture
+def collector():
+ return TruthSocialCollector()
+
+
+def test_collector_name(collector):
+ assert collector.name == "truth_social"
+ assert collector.poll_interval == 900
+
+
+async def test_is_available(collector):
+ assert await collector.is_available() is True
+
+
+async def test_collect_parses_posts(collector):
+ mock_posts = [
+ {
+ "content": "We are imposing 25% tariffs on all steel imports!",
+ "created_at": "2026-04-02T12:00:00.000Z",
+ "url": "https://truthsocial.com/@realDonaldTrump/12345",
+ },
+ ]
+ with patch.object(collector, "_fetch_posts", new_callable=AsyncMock, return_value=mock_posts):
+ items = await collector.collect()
+
+ assert len(items) == 1
+ assert items[0].source == "truth_social"
+ assert items[0].category.value == "policy"
+ assert "tariff" in items[0].headline.lower() or "tariff" in items[0].raw_data.get("content", "").lower()
+
+
+async def test_collect_handles_empty(collector):
+ with patch.object(collector, "_fetch_posts", new_callable=AsyncMock, return_value=[]):
+ items = await collector.collect()
+ assert items == []
+```
+
+- [ ] **Step 2: Write tests for Fed collector**
+
+Create `services/news-collector/tests/test_fed.py`:
+
+```python
+"""Tests for Federal Reserve collector."""
+
+import pytest
+from unittest.mock import AsyncMock, patch
+
+from news_collector.collectors.fed import FedCollector
+
+
+@pytest.fixture
+def collector():
+ return FedCollector()
+
+
+def test_collector_name(collector):
+ assert collector.name == "fed"
+ assert collector.poll_interval == 3600
+
+
+async def test_is_available(collector):
+ assert await collector.is_available() is True
+
+
+async def test_collect_parses_rss(collector):
+ mock_entries = [
+ {
+ "title": "Federal Reserve issues FOMC statement",
+ "link": "https://www.federalreserve.gov/newsevents/pressreleases/monetary20260402a.htm",
+ "published_parsed": (2026, 4, 2, 14, 0, 0, 0, 0, 0),
+ "summary": "The Federal Open Market Committee decided to maintain the target range...",
+ },
+ ]
+ with patch.object(collector, "_fetch_fed_rss", new_callable=AsyncMock, return_value=mock_entries):
+ items = await collector.collect()
+
+ assert len(items) == 1
+ assert items[0].source == "fed"
+ assert items[0].category.value == "fed"
+```
+
+- [ ] **Step 3: Run tests to verify they fail**
+
+Run: `pytest services/news-collector/tests/test_truth_social.py services/news-collector/tests/test_fed.py -v`
+Expected: FAIL
+
+- [ ] **Step 4: Implement TruthSocialCollector**
+
+Create `services/news-collector/src/news_collector/collectors/truth_social.py`:
+
+```python
+"""Truth Social collector for Trump posts (policy-relevant)."""
+
+import logging
+from datetime import datetime, timezone
+
+import aiohttp
+from nltk.sentiment.vader import SentimentIntensityAnalyzer
+
+from shared.models import NewsCategory, NewsItem
+from news_collector.collectors.base import BaseCollector
+
+logger = logging.getLogger(__name__)
+
+# Truth Social uses a Mastodon-compatible API
+TRUTH_SOCIAL_API = "https://truthsocial.com/api/v1/accounts/107780257626128497/statuses"
+
+
+class TruthSocialCollector(BaseCollector):
+ name = "truth_social"
+ poll_interval = 900 # 15 minutes
+
+ def __init__(self) -> None:
+ self._vader = SentimentIntensityAnalyzer()
+
+ async def is_available(self) -> bool:
+ return True
+
+ async def _fetch_posts(self) -> list[dict]:
+ """Fetch recent posts from Truth Social."""
+ headers = {"User-Agent": "Mozilla/5.0"}
+ try:
+ async with aiohttp.ClientSession() as session:
+ async with session.get(
+ TRUTH_SOCIAL_API,
+ headers=headers,
+ params={"limit": 10},
+ timeout=aiohttp.ClientTimeout(total=15),
+ ) as resp:
+ if resp.status != 200:
+ logger.warning("truth_social_fetch_failed", status=resp.status)
+ return []
+ return await resp.json()
+ except Exception as exc:
+ logger.warning("truth_social_error", error=str(exc))
+ return []
+
+ def _strip_html(self, text: str) -> str:
+ """Remove HTML tags from content."""
+ import re
+ return re.sub(r"<[^>]+>", "", text).strip()
+
+ async def collect(self) -> list[NewsItem]:
+ posts = await self._fetch_posts()
+ items = []
+
+ for post in posts:
+ content = self._strip_html(post.get("content", ""))
+ if not content:
+ continue
+
+ created_at_str = post.get("created_at", "")
+ try:
+ published = datetime.fromisoformat(created_at_str.replace("Z", "+00:00"))
+ except (ValueError, AttributeError):
+ published = datetime.now(timezone.utc)
+
+ items.append(
+ NewsItem(
+ source=self.name,
+ headline=content[:200],
+ summary=content if len(content) > 200 else None,
+ url=post.get("url"),
+ published_at=published,
+ symbols=[], # Symbols extracted at aggregation stage via LLM
+ sentiment=self._vader.polarity_scores(content)["compound"],
+ category=NewsCategory.POLICY,
+ raw_data={"content": content, "id": post.get("id")},
+ )
+ )
+
+ return items
+```
+
+- [ ] **Step 5: Implement FedCollector**
+
+Create `services/news-collector/src/news_collector/collectors/fed.py`:
+
+```python
+"""Federal Reserve press release and FOMC statement collector."""
+
+import logging
+from calendar import timegm
+from datetime import datetime, timezone
+
+import aiohttp
+import feedparser
+from nltk.sentiment.vader import SentimentIntensityAnalyzer
+
+from shared.models import NewsCategory, NewsItem
+from news_collector.collectors.base import BaseCollector
+
+logger = logging.getLogger(__name__)
+
+FED_RSS_URL = "https://www.federalreserve.gov/feeds/press_all.xml"
+
+
+class FedCollector(BaseCollector):
+ name = "fed"
+ poll_interval = 3600 # 1 hour
+
+ def __init__(self) -> None:
+ self._vader = SentimentIntensityAnalyzer()
+
+ async def is_available(self) -> bool:
+ return True
+
+ async def _fetch_fed_rss(self) -> list[dict]:
+ """Fetch Federal Reserve RSS feed entries."""
+ try:
+ async with aiohttp.ClientSession() as session:
+ async with session.get(
+ FED_RSS_URL, timeout=aiohttp.ClientTimeout(total=10)
+ ) as resp:
+ if resp.status != 200:
+                        logger.warning("fed_rss_failed status=%s", resp.status)
+ return []
+ text = await resp.text()
+ feed = feedparser.parse(text)
+ return feed.get("entries", [])
+ except Exception as exc:
+            logger.warning("fed_rss_error error=%s", exc)
+ return []
+
+ def _detect_stance(self, text: str) -> str:
+ """Detect hawkish/dovish/neutral stance from text."""
+ text_lower = text.lower()
+ hawkish_words = ["tighten", "raise", "inflation concern", "restrictive", "higher rates"]
+ dovish_words = ["accommodate", "cut", "easing", "lower rates", "support growth"]
+
+ hawk_count = sum(1 for w in hawkish_words if w in text_lower)
+ dove_count = sum(1 for w in dovish_words if w in text_lower)
+
+ if hawk_count > dove_count:
+ return "hawkish"
+ if dove_count > hawk_count:
+ return "dovish"
+ return "neutral"
+
+ async def collect(self) -> list[NewsItem]:
+ entries = await self._fetch_fed_rss()
+ items = []
+
+ for entry in entries:
+ title = entry.get("title", "").strip()
+ if not title:
+ continue
+
+ summary = entry.get("summary", "")
+ parsed_time = entry.get("published_parsed")
+ if parsed_time:
+ published = datetime.fromtimestamp(timegm(parsed_time), tz=timezone.utc)
+ else:
+ published = datetime.now(timezone.utc)
+
+ text = f"{title}. {summary}" if summary else title
+
+ items.append(
+ NewsItem(
+ source=self.name,
+ headline=title,
+ summary=summary or None,
+ url=entry.get("link"),
+ published_at=published,
+ symbols=[],
+ sentiment=self._vader.polarity_scores(text)["compound"],
+ category=NewsCategory.FED,
+ raw_data={"stance": self._detect_stance(text)},
+ )
+ )
+
+ return items
+```
+
+- [ ] **Step 6: Run tests to verify they pass**
+
+Run: `pytest services/news-collector/tests/test_truth_social.py services/news-collector/tests/test_fed.py -v`
+Expected: All 7 tests PASS
+
+- [ ] **Step 7: Commit**
+
+```bash
+git add services/news-collector/src/news_collector/collectors/truth_social.py services/news-collector/src/news_collector/collectors/fed.py services/news-collector/tests/test_truth_social.py services/news-collector/tests/test_fed.py
+git commit -m "feat: implement Truth Social and Federal Reserve collectors"
+```
+
+---
+
+### Task 13: Implement news-collector main.py (scheduler)
+
+**Files:**
+- Create: `services/news-collector/src/news_collector/main.py`
+- Create: `services/news-collector/tests/test_main.py`
+
+- [ ] **Step 1: Write tests**
+
+Create `services/news-collector/tests/test_main.py`:
+
+```python
+"""Tests for news collector scheduler."""
+
+import pytest
+from unittest.mock import AsyncMock, patch, MagicMock
+from datetime import datetime, timezone
+
+from shared.models import NewsCategory, NewsItem
+
+from news_collector.main import run_collector_once
+
+
+async def test_run_collector_once_stores_and_publishes():
+ mock_item = NewsItem(
+ source="test",
+ headline="Test news",
+ published_at=datetime(2026, 4, 2, tzinfo=timezone.utc),
+ sentiment=0.5,
+ category=NewsCategory.MACRO,
+ )
+
+ mock_collector = MagicMock()
+ mock_collector.name = "test"
+ mock_collector.collect = AsyncMock(return_value=[mock_item])
+
+ mock_db = MagicMock()
+ mock_db.insert_news_item = AsyncMock()
+
+ mock_broker = MagicMock()
+ mock_broker.publish = AsyncMock()
+
+ count = await run_collector_once(mock_collector, mock_db, mock_broker)
+
+ assert count == 1
+ mock_db.insert_news_item.assert_called_once_with(mock_item)
+ mock_broker.publish.assert_called_once()
+
+
+async def test_run_collector_once_handles_empty():
+ mock_collector = MagicMock()
+ mock_collector.name = "test"
+ mock_collector.collect = AsyncMock(return_value=[])
+
+ mock_db = MagicMock()
+ mock_broker = MagicMock()
+
+ count = await run_collector_once(mock_collector, mock_db, mock_broker)
+ assert count == 0
+```
+
+- [ ] **Step 2: Run tests to verify they fail**
+
+Run: `pytest services/news-collector/tests/test_main.py -v`
+Expected: FAIL
+
+- [ ] **Step 3: Implement main.py**
+
+Create `services/news-collector/src/news_collector/main.py`:
+
+```python
+"""News Collector Service — schedules and runs all news collectors."""
+
+import asyncio
+import logging
+
+from shared.broker import RedisBroker
+from shared.db import Database
+from shared.events import NewsEvent
+from shared.healthcheck import HealthCheckServer
+from shared.logging import setup_logging
+from shared.metrics import ServiceMetrics
+from shared.models import NewsItem
+from shared.notifier import TelegramNotifier
+from shared.sentiment_models import MarketSentiment
+
+from news_collector.config import NewsCollectorConfig
+from news_collector.collectors.base import BaseCollector
+from news_collector.collectors.finnhub import FinnhubCollector
+from news_collector.collectors.rss import RSSCollector
+from news_collector.collectors.sec_edgar import SecEdgarCollector
+from news_collector.collectors.truth_social import TruthSocialCollector
+from news_collector.collectors.reddit import RedditCollector
+from news_collector.collectors.fear_greed import FearGreedCollector
+from news_collector.collectors.fed import FedCollector
+
+logger = logging.getLogger(__name__)
+
+
+async def run_collector_once(
+ collector: BaseCollector,
+ db: Database,
+ broker: RedisBroker,
+) -> int:
+ """Run a single collector, store results, publish to Redis.
+ Returns number of items collected."""
+ items = await collector.collect()
+ if not isinstance(items, list):
+        # Defensive guard: non-list results (e.g. a FearGreedResult) are not
+        # expected here — FearGreedCollector runs in its own dedicated loop.
+ return 0
+
+ for item in items:
+ await db.insert_news_item(item)
+ event = NewsEvent(data=item)
+ await broker.publish("news", event.to_dict())
+
+ return len(items)
+
+
+async def run_collector_loop(
+ collector: BaseCollector,
+ db: Database,
+ broker: RedisBroker,
+ log,
+) -> None:
+ """Run a collector on its poll interval forever."""
+ while True:
+ try:
+ if await collector.is_available():
+ count = await run_collector_once(collector, db, broker)
+ log.info("collector_run", collector=collector.name, items=count)
+ else:
+ log.debug("collector_unavailable", collector=collector.name)
+ except Exception as exc:
+ log.error("collector_error", collector=collector.name, error=str(exc))
+ await asyncio.sleep(collector.poll_interval)
+
+
+async def run_fear_greed_loop(
+ collector: FearGreedCollector,
+ db: Database,
+ log,
+) -> None:
+ """Run the Fear & Greed collector and update market sentiment."""
+ from datetime import datetime, timezone
+
+ while True:
+ try:
+ result = await collector.collect()
+ if result is not None:
+ ms = MarketSentiment(
+ fear_greed=result.fear_greed,
+ fear_greed_label=result.fear_greed_label,
+ fed_stance="neutral", # Updated by Fed collector analysis
+ market_regime=_determine_regime(result.fear_greed, None),
+ updated_at=datetime.now(timezone.utc),
+ )
+ await db.upsert_market_sentiment(ms)
+ log.info("fear_greed_updated", score=result.fear_greed, label=result.fear_greed_label)
+ except Exception as exc:
+ log.error("fear_greed_error", error=str(exc))
+ await asyncio.sleep(collector.poll_interval)
+
+
+async def run_aggregator_loop(
+ db: Database,
+ interval: int,
+ log,
+) -> None:
+ """Run sentiment aggregation every `interval` seconds.
+ Reads recent news from DB, computes per-symbol scores, upserts into symbol_scores table."""
+ from datetime import datetime, timezone
+ from shared.sentiment import SentimentAggregator
+
+ aggregator = SentimentAggregator()
+
+ while True:
+ try:
+ now = datetime.now(timezone.utc)
+ news_items = await db.get_recent_news(hours=24)
+ if news_items:
+ scores = aggregator.aggregate(news_items, now)
+ for symbol_score in scores.values():
+ await db.upsert_symbol_score(symbol_score)
+ log.info("aggregation_complete", symbols=len(scores))
+ except Exception as exc:
+ log.error("aggregation_error", error=str(exc))
+ await asyncio.sleep(interval)
+
+
+def _determine_regime(fear_greed: int, vix: float | None) -> str:
+    """Determine market regime from Fear & Greed and VIX.
+
+    NOTE: keep in sync with SentimentAggregator.determine_regime (Task 14).
+    """
+ if fear_greed <= 20:
+ return "risk_off"
+ if vix is not None and vix > 30:
+ return "risk_off"
+ if fear_greed >= 60 and (vix is None or vix < 20):
+ return "risk_on"
+ return "neutral"
+
+
+async def run() -> None:
+ config = NewsCollectorConfig()
+ log = setup_logging("news-collector", config.log_level, config.log_format)
+ metrics = ServiceMetrics("news_collector")
+
+ notifier = TelegramNotifier(
+ bot_token=config.telegram_bot_token,
+ chat_id=config.telegram_chat_id,
+ )
+
+ db = Database(config.database_url)
+ await db.connect()
+
+ broker = RedisBroker(config.redis_url)
+
+ health = HealthCheckServer(
+ "news-collector",
+ port=config.health_port,
+ auth_token=config.metrics_auth_token,
+ )
+ health.register_check("redis", broker.ping)
+ await health.start()
+ metrics.service_up.labels(service="news-collector").set(1)
+
+ # Initialize collectors
+ news_collectors: list[BaseCollector] = [
+ RSSCollector(),
+ SecEdgarCollector(),
+ TruthSocialCollector(),
+ RedditCollector(),
+ FedCollector(),
+ ]
+
+ # Finnhub requires API key
+ if config.finnhub_api_key:
+ news_collectors.append(FinnhubCollector(api_key=config.finnhub_api_key))
+
+ fear_greed = FearGreedCollector()
+
+ log.info(
+ "starting",
+ collectors=[c.name for c in news_collectors],
+ fear_greed=True,
+ )
+
+ tasks = []
+ try:
+ for collector in news_collectors:
+ task = asyncio.create_task(run_collector_loop(collector, db, broker, log))
+ tasks.append(task)
+
+ tasks.append(asyncio.create_task(run_fear_greed_loop(fear_greed, db, log)))
+ tasks.append(asyncio.create_task(
+ run_aggregator_loop(db, config.sentiment_aggregate_interval, log)
+ ))
+
+ await asyncio.gather(*tasks)
+ except Exception as exc:
+ log.error("fatal_error", error=str(exc))
+ await notifier.send_error(str(exc), "news-collector")
+ raise
+    finally:
+        for task in tasks:
+            task.cancel()
+        # Await the cancelled tasks so CancelledError is actually processed
+        # before connections are closed below.
+        await asyncio.gather(*tasks, return_exceptions=True)
+ metrics.service_up.labels(service="news-collector").set(0)
+ await notifier.close()
+ await broker.close()
+ await db.close()
+
+
+def main() -> None:
+ asyncio.run(run())
+
+
+if __name__ == "__main__":
+ main()
+```
+
+- [ ] **Step 4: Run tests to verify they pass**
+
+Run: `pytest services/news-collector/tests/test_main.py -v`
+Expected: All 2 tests PASS
+
+- [ ] **Step 5: Commit**
+
+```bash
+git add services/news-collector/src/news_collector/main.py services/news-collector/tests/test_main.py
+git commit -m "feat: implement news-collector main scheduler with all collectors"
+```
+
+---
+
+## Phase 3: Sentiment Analysis Pipeline
+
+### Task 14: Implement sentiment aggregator
+
+**Files:**
+- Modify: `shared/src/shared/sentiment.py`
+- Create: `shared/tests/test_sentiment_aggregator.py`
+
+- [ ] **Step 1: Write tests**
+
+Create `shared/tests/test_sentiment_aggregator.py`:
+
+```python
+"""Tests for sentiment aggregator."""
+
+import pytest
+from datetime import datetime, timezone, timedelta
+
+from shared.sentiment import SentimentAggregator
+
+
+@pytest.fixture
+def aggregator():
+ return SentimentAggregator()
+
+
+def test_freshness_decay_recent():
+ a = SentimentAggregator()
+ now = datetime.now(timezone.utc)
+ assert a._freshness_decay(now, now) == 1.0
+
+
+def test_freshness_decay_3_hours():
+ a = SentimentAggregator()
+ now = datetime.now(timezone.utc)
+ three_hours_ago = now - timedelta(hours=3)
+ assert a._freshness_decay(three_hours_ago, now) == 0.7
+
+
+def test_freshness_decay_12_hours():
+ a = SentimentAggregator()
+ now = datetime.now(timezone.utc)
+ twelve_hours_ago = now - timedelta(hours=12)
+ assert a._freshness_decay(twelve_hours_ago, now) == 0.3
+
+
+def test_freshness_decay_old():
+ a = SentimentAggregator()
+ now = datetime.now(timezone.utc)
+ two_days_ago = now - timedelta(days=2)
+ assert a._freshness_decay(two_days_ago, now) == 0.0
+
+
+def test_compute_composite():
+ a = SentimentAggregator()
+ composite = a._compute_composite(
+ news_score=0.5,
+ social_score=0.3,
+ policy_score=0.8,
+ filing_score=0.2,
+ )
+ expected = 0.5 * 0.3 + 0.3 * 0.2 + 0.8 * 0.3 + 0.2 * 0.2
+ assert abs(composite - expected) < 0.001
+
+
+def test_aggregate_news_by_symbol(aggregator):
+ now = datetime.now(timezone.utc)
+ news_items = [
+ {
+ "symbols": ["AAPL"],
+ "sentiment": 0.8,
+ "category": "earnings",
+ "published_at": now,
+ },
+ {
+ "symbols": ["AAPL"],
+ "sentiment": 0.3,
+ "category": "macro",
+ "published_at": now - timedelta(hours=2),
+ },
+ {
+ "symbols": ["MSFT"],
+ "sentiment": -0.5,
+ "category": "policy",
+ "published_at": now,
+ },
+ ]
+ scores = aggregator.aggregate(news_items, now)
+
+ assert "AAPL" in scores
+ assert "MSFT" in scores
+ assert scores["AAPL"].news_count == 2
+ assert scores["AAPL"].news_score > 0 # Positive overall
+ assert scores["MSFT"].policy_score < 0 # Negative policy
+
+
+def test_aggregate_empty(aggregator):
+ now = datetime.now(timezone.utc)
+ scores = aggregator.aggregate([], now)
+ assert scores == {}
+
+
+def test_determine_regime():
+ a = SentimentAggregator()
+ assert a.determine_regime(15, None) == "risk_off"
+ assert a.determine_regime(15, 35.0) == "risk_off"
+ assert a.determine_regime(50, 35.0) == "risk_off"
+ assert a.determine_regime(70, 15.0) == "risk_on"
+ assert a.determine_regime(50, 20.0) == "neutral"
+```
+
+- [ ] **Step 2: Run tests to verify they fail**
+
+Run: `pytest shared/tests/test_sentiment_aggregator.py -v`
+Expected: FAIL — `SentimentAggregator` not in sentiment.py
+
+- [ ] **Step 3: Add SentimentAggregator to sentiment.py**
+
+Keep the existing `SentimentData` class (for backward compat with existing tests). Add `SentimentAggregator` class at the end of `shared/src/shared/sentiment.py`:
+
+```python
+from datetime import timedelta
+from shared.sentiment_models import SymbolScore
+
+
+class SentimentAggregator:
+ """Aggregates per-news sentiment into per-symbol scores."""
+
+ # Weights: policy events are most impactful for US stocks
+ WEIGHTS = {
+ "news": 0.3,
+ "social": 0.2,
+ "policy": 0.3,
+ "filing": 0.2,
+ }
+
+ # Category → score field mapping
+ CATEGORY_MAP = {
+ "earnings": "news",
+ "macro": "news",
+ "social": "social",
+ "policy": "policy",
+ "filing": "filing",
+ "fed": "policy",
+ }
+
+ def _freshness_decay(self, published_at: datetime, now: datetime) -> float:
+ """Compute freshness decay factor."""
+ age = now - published_at
+ hours = age.total_seconds() / 3600
+ if hours < 1:
+ return 1.0
+ if hours < 6:
+ return 0.7
+ if hours < 24:
+ return 0.3
+ return 0.0
+
+ def _compute_composite(
+ self,
+ news_score: float,
+ social_score: float,
+ policy_score: float,
+ filing_score: float,
+ ) -> float:
+ return (
+ news_score * self.WEIGHTS["news"]
+ + social_score * self.WEIGHTS["social"]
+ + policy_score * self.WEIGHTS["policy"]
+ + filing_score * self.WEIGHTS["filing"]
+ )
+
+ def aggregate(
+ self, news_items: list[dict], now: datetime
+ ) -> dict[str, SymbolScore]:
+ """Aggregate news items into per-symbol scores.
+
+ Each news_items dict must have: symbols, sentiment, category, published_at.
+ Returns dict mapping symbol → SymbolScore.
+ """
+ # Accumulate per-symbol, per-category
+ symbol_data: dict[str, dict] = {}
+
+ for item in news_items:
+ decay = self._freshness_decay(item["published_at"], now)
+ if decay == 0.0:
+ continue
+
+ category = item.get("category", "macro")
+ score_field = self.CATEGORY_MAP.get(category, "news")
+ weighted_sentiment = item["sentiment"] * decay
+
+ for symbol in item.get("symbols", []):
+ if symbol not in symbol_data:
+ symbol_data[symbol] = {
+ "news_scores": [],
+ "social_scores": [],
+ "policy_scores": [],
+ "filing_scores": [],
+ "count": 0,
+ }
+
+ symbol_data[symbol][f"{score_field}_scores"].append(weighted_sentiment)
+ symbol_data[symbol]["count"] += 1
+
+ # Compute averages and composites
+ result = {}
+ for symbol, data in symbol_data.items():
+ news_score = _safe_avg(data["news_scores"])
+ social_score = _safe_avg(data["social_scores"])
+ policy_score = _safe_avg(data["policy_scores"])
+ filing_score = _safe_avg(data["filing_scores"])
+
+ result[symbol] = SymbolScore(
+ symbol=symbol,
+ news_score=news_score,
+                news_count=data["count"],  # total items across all categories, not news-only
+ social_score=social_score,
+ policy_score=policy_score,
+ filing_score=filing_score,
+ composite=self._compute_composite(
+ news_score, social_score, policy_score, filing_score
+ ),
+ updated_at=now,
+ )
+
+ return result
+
+ def determine_regime(self, fear_greed: int, vix: float | None) -> str:
+ """Determine market regime."""
+ if fear_greed <= 20:
+ return "risk_off"
+ if vix is not None and vix > 30:
+ return "risk_off"
+ if fear_greed >= 60 and (vix is None or vix < 20):
+ return "risk_on"
+ return "neutral"
+
+
+def _safe_avg(values: list[float]) -> float:
+ """Return average of values, or 0.0 if empty."""
+ if not values:
+ return 0.0
+ return sum(values) / len(values)
+```
+
+- [ ] **Step 4: Run new tests to verify they pass**
+
+Run: `pytest shared/tests/test_sentiment_aggregator.py -v`
+Expected: All 9 tests PASS
+
+- [ ] **Step 5: Run existing sentiment tests for regressions**
+
+Run: `pytest shared/tests/test_sentiment.py -v`
+Expected: All existing tests PASS (SentimentData unchanged)
+
+- [ ] **Step 6: Commit**
+
+```bash
+git add shared/src/shared/sentiment.py shared/tests/test_sentiment_aggregator.py
+git commit -m "feat: implement SentimentAggregator with freshness decay and composite scoring"
+```
+
+---
+
+## Phase 4: Stock Selector Engine
+
+### Task 15: Implement stock selector
+
+**Files:**
+- Create: `services/strategy-engine/src/strategy_engine/stock_selector.py`
+- Create: `services/strategy-engine/tests/test_stock_selector.py`
+
+- [ ] **Step 1: Write tests**
+
+Create `services/strategy-engine/tests/test_stock_selector.py`:
+
+```python
+"""Tests for stock selector engine."""
+
+import pytest
+from unittest.mock import AsyncMock, MagicMock, patch
+from datetime import datetime, timezone
+from decimal import Decimal
+
+from shared.models import OrderSide
+from shared.sentiment_models import SymbolScore, MarketSentiment, SelectedStock, Candidate
+
+from strategy_engine.stock_selector import (
+ SentimentCandidateSource,
+ StockSelector,
+ _parse_llm_selections,
+)
+
+
+async def test_sentiment_candidate_source():
+ mock_db = MagicMock()
+ mock_db.get_top_symbol_scores = AsyncMock(return_value=[
+ {"symbol": "AAPL", "composite": 0.8, "news_count": 5},
+ {"symbol": "NVDA", "composite": 0.6, "news_count": 3},
+ ])
+
+ source = SentimentCandidateSource(mock_db)
+ candidates = await source.get_candidates()
+
+ assert len(candidates) == 2
+ assert candidates[0].symbol == "AAPL"
+ assert candidates[0].source == "sentiment"
+
+
+def test_parse_llm_selections_valid():
+ llm_response = """
+ [
+ {"symbol": "NVDA", "side": "BUY", "conviction": 0.85, "reason": "AI demand", "key_news": ["NVDA beats earnings"]},
+ {"symbol": "XOM", "side": "BUY", "conviction": 0.72, "reason": "Oil surge", "key_news": ["Oil prices up"]}
+ ]
+ """
+ selections = _parse_llm_selections(llm_response)
+ assert len(selections) == 2
+ assert selections[0].symbol == "NVDA"
+ assert selections[0].conviction == 0.85
+
+
+def test_parse_llm_selections_invalid():
+ selections = _parse_llm_selections("not json")
+ assert selections == []
+
+
+def test_parse_llm_selections_with_markdown():
+ llm_response = """
+ Here are my picks:
+ ```json
+ [
+ {"symbol": "TSLA", "side": "BUY", "conviction": 0.7, "reason": "Momentum", "key_news": ["Tesla rally"]}
+ ]
+ ```
+ """
+ selections = _parse_llm_selections(llm_response)
+ assert len(selections) == 1
+ assert selections[0].symbol == "TSLA"
+
+
+async def test_selector_blocks_on_risk_off():
+ mock_db = MagicMock()
+ mock_db.get_latest_market_sentiment = AsyncMock(return_value={
+ "fear_greed": 15,
+ "fear_greed_label": "Extreme Fear",
+ "vix": 35.0,
+ "fed_stance": "neutral",
+ "market_regime": "risk_off",
+ "updated_at": datetime.now(timezone.utc),
+ })
+
+ selector = StockSelector(db=mock_db, broker=MagicMock(), alpaca=MagicMock(), anthropic_api_key="test")
+ result = await selector.select()
+ assert result == []
+```
+
+- [ ] **Step 2: Run tests to verify they fail**
+
+Run: `pytest services/strategy-engine/tests/test_stock_selector.py -v`
+Expected: FAIL — module not found
+
+- [ ] **Step 3: Implement StockSelector**
+
+Create `services/strategy-engine/src/strategy_engine/stock_selector.py`:
+
+```python
+"""Stock Selector Engine — 3-stage dynamic stock selection for MOC trading."""
+
+import json
+import logging
+import re
+from datetime import datetime, timezone
+from decimal import Decimal
+from typing import Optional
+
+import aiohttp
+
+from shared.alpaca import AlpacaClient
+from shared.broker import RedisBroker
+from shared.db import Database
+from shared.models import OrderSide
+from shared.sentiment_models import Candidate, MarketSentiment, SelectedStock, SymbolScore
+
+logger = logging.getLogger(__name__)
+
+
+class SentimentCandidateSource:
+ """Get candidate stocks from sentiment scores in DB."""
+
+ def __init__(self, db: Database, limit: int = 20) -> None:
+ self._db = db
+ self._limit = limit
+
+ async def get_candidates(self) -> list[Candidate]:
+ scores = await self._db.get_top_symbol_scores(limit=self._limit)
+ return [
+ Candidate(
+ symbol=s["symbol"],
+ source="sentiment",
+ score=s["composite"],
+ reason=f"Sentiment composite={s['composite']:.2f}, news_count={s['news_count']}",
+ )
+ for s in scores
+ if s["composite"] != 0
+ ]
+
+
+class LLMCandidateSource:
+ """Get candidate stocks by asking Claude to analyze today's top news."""
+
+ def __init__(self, db: Database, api_key: str, model: str) -> None:
+ self._db = db
+ self._api_key = api_key
+ self._model = model
+
+ async def get_candidates(self) -> list[Candidate]:
+ news = await self._db.get_recent_news(hours=24)
+ if not news:
+ return []
+
+ headlines = [f"- [{n['source']}] {n['headline']} (sentiment: {n['sentiment']:.2f})" for n in news[:50]]
+ prompt = (
+ "You are a stock market analyst. Based on today's news headlines below, "
+ "identify US stocks that are most likely to be affected (positively or negatively). "
+ "Return a JSON array of objects with: symbol, direction (BUY or SELL), score (0-1), reason.\n\n"
+ "Headlines:\n" + "\n".join(headlines) + "\n\n"
+ "Return ONLY the JSON array, no other text."
+ )
+
+ try:
+ async with aiohttp.ClientSession() as session:
+ async with session.post(
+ "https://api.anthropic.com/v1/messages",
+ headers={
+ "x-api-key": self._api_key,
+ "anthropic-version": "2023-06-01",
+ "content-type": "application/json",
+ },
+ json={
+ "model": self._model,
+ "max_tokens": 1024,
+ "messages": [{"role": "user", "content": prompt}],
+ },
+ timeout=aiohttp.ClientTimeout(total=30),
+ ) as resp:
+ if resp.status != 200:
+                        logger.warning("llm_candidate_failed status=%s", resp.status)
+ return []
+ data = await resp.json()
+ text = data["content"][0]["text"]
+ except Exception as exc:
+            logger.error("llm_candidate_error error=%s", exc)
+ return []
+
+ return self._parse_response(text)
+
+    def _parse_response(self, text: str) -> list[Candidate]:
+        try:
+            # Extract JSON from possible markdown code blocks
+            json_match = re.search(r"```(?:json)?\s*(\[.*?\])\s*```", text, re.DOTALL)
+            if json_match:
+                text = json_match.group(1)
+            # Fall back to the first bare JSON array, keeping parity with
+            # _parse_llm_selections (tolerates prose surrounding the array)
+            array_match = re.search(r"\[.*\]", text, re.DOTALL)
+            if array_match:
+                text = array_match.group(0)
+            items = json.loads(text)
+        except (json.JSONDecodeError, TypeError):
+            return []
+
+ candidates = []
+ for item in items:
+ try:
+ direction = OrderSide(item.get("direction", "BUY"))
+ candidates.append(
+ Candidate(
+ symbol=item["symbol"],
+ source="llm",
+ direction=direction,
+ score=float(item.get("score", 0.5)),
+ reason=item.get("reason", "LLM recommendation"),
+ )
+ )
+ except (KeyError, ValueError):
+ continue
+
+ return candidates
+
+
+class StockSelector:
+ """3-stage stock selector: candidates → technical filter → LLM final pick."""
+
+ def __init__(
+ self,
+ db: Database,
+ broker: RedisBroker,
+ alpaca: AlpacaClient,
+ anthropic_api_key: str,
+ anthropic_model: str = "claude-sonnet-4-20250514",
+ max_picks: int = 3,
+ ) -> None:
+ self._db = db
+ self._broker = broker
+ self._alpaca = alpaca
+ self._api_key = anthropic_api_key
+ self._model = anthropic_model
+ self._max_picks = max_picks
+ self._sentiment_source = SentimentCandidateSource(db)
+ self._llm_source = LLMCandidateSource(db, anthropic_api_key, anthropic_model)
+
+ async def select(self) -> list[SelectedStock]:
+ """Run full 3-stage selection. Returns list of SelectedStock."""
+ # Check market sentiment gate
+ ms = await self._db.get_latest_market_sentiment()
+ if ms and ms.get("market_regime") == "risk_off":
+ logger.info("selection_blocked_risk_off")
+ return []
+
+ # Stage 1: Candidate pool
+ sentiment_candidates = await self._sentiment_source.get_candidates()
+ llm_candidates = await self._llm_source.get_candidates()
+ candidates = self._merge_candidates(sentiment_candidates, llm_candidates)
+
+ if not candidates:
+ logger.info("no_candidates_found")
+ return []
+
+        logger.info("candidates_found count=%d", len(candidates))
+
+ # Stage 2: Technical filter
+ filtered = await self._technical_filter(candidates)
+ if not filtered:
+ logger.info("all_candidates_filtered_out")
+ return []
+
+        logger.info("technical_filter_passed count=%d", len(filtered))
+
+ # Stage 3: LLM final selection
+ selections = await self._llm_final_select(filtered, ms)
+
+ # Publish to Redis
+ for selection in selections:
+ await self._broker.publish(
+ "selected_stocks",
+ selection.model_dump(mode="json"),
+ )
+
+        # Persist audit trail
+        from datetime import date as date_type
+
+        # Fetch scores once and reuse for every selection's snapshot
+        score_data = await self._db.get_top_symbol_scores(limit=100)
+        for selection in selections:
+            snapshot = next(
+                (s for s in score_data if s["symbol"] == selection.symbol),
+                {},
+            )
+ await self._db.insert_stock_selection(
+ trade_date=date_type.today(),
+ symbol=selection.symbol,
+ side=selection.side.value,
+ conviction=selection.conviction,
+ reason=selection.reason,
+ key_news=selection.key_news,
+ sentiment_snapshot=snapshot,
+ )
+
+ return selections
+
+ def _merge_candidates(
+ self,
+ sentiment: list[Candidate],
+ llm: list[Candidate],
+ ) -> list[Candidate]:
+ """Merge and deduplicate candidates, preferring higher scores."""
+ by_symbol: dict[str, Candidate] = {}
+ for c in sentiment + llm:
+ if c.symbol not in by_symbol or c.score > by_symbol[c.symbol].score:
+ by_symbol[c.symbol] = c
+ return sorted(by_symbol.values(), key=lambda c: c.score, reverse=True)
+
+ async def _technical_filter(self, candidates: list[Candidate]) -> list[Candidate]:
+ """Apply MOC-style technical screening to candidates."""
+ import pandas as pd
+
+ passed = []
+ for candidate in candidates:
+ try:
+ bars = await self._alpaca.get_bars(
+ candidate.symbol, timeframe="1Day", limit=30
+ )
+ if not bars or len(bars) < 21:
+ continue
+
+ closes = pd.Series([float(b["c"]) for b in bars])
+ volumes = pd.Series([float(b["v"]) for b in bars])
+
+ # RSI
+ delta = closes.diff()
+ gain = delta.clip(lower=0)
+ loss = -delta.clip(upper=0)
+ avg_gain = gain.ewm(com=13, min_periods=14).mean()
+ avg_loss = loss.ewm(com=13, min_periods=14).mean()
+ rs = avg_gain / avg_loss.replace(0, float("nan"))
+ rsi = 100 - (100 / (1 + rs))
+ current_rsi = rsi.iloc[-1]
+
+ if pd.isna(current_rsi) or not (30 <= current_rsi <= 70):
+ continue
+
+ # EMA
+ ema20 = closes.ewm(span=20, adjust=False).mean().iloc[-1]
+ if closes.iloc[-1] < ema20:
+ continue
+
+ # Volume above average
+ vol_avg = volumes.iloc[-20:].mean()
+ if vol_avg > 0 and volumes.iloc[-1] < vol_avg * 0.5:
+ continue
+
+ passed.append(candidate)
+
+ except Exception as exc:
+                logger.warning("technical_filter_error symbol=%s error=%s", candidate.symbol, exc)
+ continue
+
+ return passed
+
+ async def _llm_final_select(
+ self,
+ candidates: list[Candidate],
+ market_sentiment: Optional[dict],
+ ) -> list[SelectedStock]:
+ """Ask Claude to make final 2-3 picks from filtered candidates."""
+ # Build context
+ candidate_info = []
+ for c in candidates[:15]:
+ candidate_info.append(f"- {c.symbol}: score={c.score:.2f}, source={c.source}, reason={c.reason}")
+
+ news = await self._db.get_recent_news(hours=12)
+ top_news = [f"- [{n['source']}] {n['headline']}" for n in news[:20]]
+
+ ms_info = "No market sentiment data available."
+ if market_sentiment:
+ ms_info = (
+ f"Fear & Greed: {market_sentiment.get('fear_greed', 'N/A')} "
+ f"({market_sentiment.get('fear_greed_label', 'N/A')}), "
+ f"VIX: {market_sentiment.get('vix', 'N/A')}, "
+ f"Fed Stance: {market_sentiment.get('fed_stance', 'N/A')}"
+ )
+
+ prompt = (
+ f"You are a professional stock trader selecting {self._max_picks} stocks for "
+ f"Market-on-Close (MOC) overnight trading. You buy at market close and sell at "
+ f"next day's open.\n\n"
+ f"## Market Conditions\n{ms_info}\n\n"
+ f"## Candidate Stocks (pre-screened technically)\n"
+ + "\n".join(candidate_info) + "\n\n"
+ f"## Today's Key News\n"
+ + "\n".join(top_news) + "\n\n"
+ f"Select the best {self._max_picks} stocks. For each, provide:\n"
+ f"- symbol: ticker\n"
+ f"- side: BUY or SELL\n"
+ f"- conviction: 0.0-1.0\n"
+ f"- reason: one sentence\n"
+ f"- key_news: list of relevant headlines\n\n"
+ f"Return ONLY a JSON array. No other text."
+ )
+
+ try:
+ async with aiohttp.ClientSession() as session:
+ async with session.post(
+ "https://api.anthropic.com/v1/messages",
+ headers={
+ "x-api-key": self._api_key,
+ "anthropic-version": "2023-06-01",
+ "content-type": "application/json",
+ },
+ json={
+ "model": self._model,
+ "max_tokens": 1024,
+ "messages": [{"role": "user", "content": prompt}],
+ },
+ timeout=aiohttp.ClientTimeout(total=30),
+ ) as resp:
+ if resp.status != 200:
+                        logger.error("llm_final_select_failed status=%s", resp.status)
+ return []
+ data = await resp.json()
+ text = data["content"][0]["text"]
+ except Exception as exc:
+            logger.error("llm_final_select_error error=%s", exc)
+ return []
+
+ return _parse_llm_selections(text)
+
+
+def _parse_llm_selections(text: str) -> list[SelectedStock]:
+ """Parse LLM response into SelectedStock list."""
+ try:
+ json_match = re.search(r"```(?:json)?\s*(\[.*?\])\s*```", text, re.DOTALL)
+ if json_match:
+ text = json_match.group(1)
+ # Also try to find a bare JSON array
+ array_match = re.search(r"\[.*\]", text, re.DOTALL)
+ if array_match:
+ text = array_match.group(0)
+ items = json.loads(text)
+ except (json.JSONDecodeError, TypeError):
+ return []
+
+ selections = []
+ for item in items:
+ try:
+ selections.append(
+ SelectedStock(
+ symbol=item["symbol"],
+ side=OrderSide(item.get("side", "BUY")),
+ conviction=float(item.get("conviction", 0.5)),
+ reason=item.get("reason", ""),
+ key_news=item.get("key_news", []),
+ )
+ )
+ except (KeyError, ValueError):
+ continue
+
+ return selections
+```
+
+- [ ] **Step 4: Run tests to verify they pass**
+
+Run: `pytest services/strategy-engine/tests/test_stock_selector.py -v`
+Expected: All 5 tests PASS
+
+- [ ] **Step 5: Run all strategy engine tests for regressions**
+
+Run: `pytest services/strategy-engine/tests/ -v`
+Expected: All tests PASS
+
+- [ ] **Step 6: Commit**
+
+```bash
+git add services/strategy-engine/src/strategy_engine/stock_selector.py services/strategy-engine/tests/test_stock_selector.py
+git commit -m "feat: implement 3-stage stock selector (sentiment → technical → LLM)"
+```
+
+---
+
+## Phase 5: Integration (MOC + Notifications + Docker)
+
+### Task 16: Add Telegram notification for stock selections
+
+**Files:**
+- Modify: `shared/src/shared/notifier.py`
+- Modify: `shared/tests/test_notifier.py`
+
+- [ ] **Step 1: Add send_stock_selection method to notifier.py**
+
+Add this method and import to `shared/src/shared/notifier.py`:
+
+Add to imports:
+```python
+from shared.sentiment_models import SelectedStock, MarketSentiment
+```
+
+Add method to `TelegramNotifier` class:
+
+```python
+ async def send_stock_selection(
+ self,
+ selections: list[SelectedStock],
+ market: MarketSentiment | None = None,
+ ) -> None:
+ """Format and send stock selection notification."""
+ lines = [f"📊 Stock Selection ({len(selections)} picks)", ""]
+
+ side_emoji = {"BUY": "🟢", "SELL": "🔴"}
+
+ for i, s in enumerate(selections, 1):
+ emoji = side_emoji.get(s.side.value, "⚪")
+ lines.append(
+ f"{i}. {s.symbol} {emoji} {s.side.value} "
+ f"(conviction: {s.conviction:.0%})"
+ )
+ lines.append(f" {s.reason}")
+ if s.key_news:
+ lines.append(f" News: {s.key_news[0]}")
+ lines.append("")
+
+ if market:
+ lines.append(
+ f"Market: F&G {market.fear_greed} ({market.fear_greed_label})"
+ + (f" | VIX {market.vix:.1f}" if market.vix else "")
+ )
+
+ await self.send("\n".join(lines))
+```
+
+- [ ] **Step 2: Add test for the new method**
+
+Add to `shared/tests/test_notifier.py`:
+
+```python
+from shared.models import OrderSide
+from shared.sentiment_models import SelectedStock, MarketSentiment
+from datetime import datetime, timezone
+
+
async def test_send_stock_selection(notifier, mock_session):
    """Test stock selection notification formatting."""
    pick = SelectedStock(
        symbol="NVDA",
        side=OrderSide.BUY,
        conviction=0.85,
        reason="CHIPS Act expansion",
        key_news=["Trump signs CHIPS Act"],
    )
    sentiment = MarketSentiment(
        fear_greed=55,
        fear_greed_label="Neutral",
        vix=18.2,
        fed_stance="neutral",
        market_regime="neutral",
        updated_at=datetime.now(timezone.utc),
    )

    await notifier.send_stock_selection([pick], sentiment)

    # The notifier should issue exactly one Telegram POST.
    mock_session.post.assert_called_once()
+
+Note: Check `shared/tests/test_notifier.py` for existing fixture names (`notifier`, `mock_session`) and adapt accordingly.
+
+- [ ] **Step 3: Run notifier tests**
+
+Run: `pytest shared/tests/test_notifier.py -v`
+Expected: All tests PASS
+
+- [ ] **Step 4: Commit**
+
+```bash
+git add shared/src/shared/notifier.py shared/tests/test_notifier.py
+git commit -m "feat: add Telegram notification for stock selections"
+```
+
+---
+
+### Task 17: Integrate stock selector with MOC strategy
+
+**Files:**
+- Modify: `services/strategy-engine/src/strategy_engine/main.py`
+- Modify: `services/strategy-engine/src/strategy_engine/config.py`
+
+- [ ] **Step 1: Update strategy engine config**
+
+Add to `StrategyConfig` in `services/strategy-engine/src/strategy_engine/config.py`:
+
+```python
+ selector_candidates_time: str = "15:00"
+ selector_filter_time: str = "15:15"
+ selector_final_time: str = "15:30"
+ selector_max_picks: int = 3
+ anthropic_api_key: str = ""
+ anthropic_model: str = "claude-sonnet-4-20250514"
+```
+
+- [ ] **Step 2: Add stock selector scheduling to main.py**
+
+Add a new coroutine to `services/strategy-engine/src/strategy_engine/main.py` that runs the stock selector at the configured times. Add imports:
+
+```python
+from shared.alpaca import AlpacaClient
+from shared.db import Database
+from shared.notifier import TelegramNotifier
+from shared.sentiment_models import MarketSentiment
+from strategy_engine.stock_selector import StockSelector
+```
+
+Add the selector loop function:
+
+```python
async def run_stock_selector(
    selector: StockSelector,
    notifier: TelegramNotifier,
    db: Database,
    config: StrategyConfig,
    log,
) -> None:
    """Run the stock selector once per day at the configured time.

    Polls the wall clock in America/New_York every 30s; when the current
    ET time matches ``config.selector_final_time`` (``"HH:MM"``), runs the
    selector, sends the picks (plus the latest market sentiment snapshot,
    if one exists) to Telegram, then sleeps 120s so the same trigger
    minute never fires twice.  Selector errors are logged and swallowed
    so the loop keeps running on subsequent days.
    """
    import zoneinfo

    et = zoneinfo.ZoneInfo("America/New_York")
    # The configured trigger time is loop-invariant — parse it once
    # instead of on every 30-second poll.
    target_hour, target_min = map(int, config.selector_final_time.split(":"))

    while True:
        now_et = datetime.now(et)

        # Check if it's time to run (within 1-minute window)
        if now_et.hour == target_hour and now_et.minute == target_min:
            log.info("stock_selector_running")
            try:
                selections = await selector.select()
                if selections:
                    ms_data = await db.get_latest_market_sentiment()
                    ms = MarketSentiment(**ms_data) if ms_data else None
                    await notifier.send_stock_selection(selections, ms)
                    log.info(
                        "stock_selector_complete",
                        picks=[s.symbol for s in selections],
                    )
                else:
                    log.info("stock_selector_no_picks")
            except Exception as exc:
                log.error("stock_selector_error", error=str(exc))
            # Sleep past this minute to avoid re-triggering
            await asyncio.sleep(120)
        else:
            await asyncio.sleep(30)
+
+In the `run()` function, add after creating the broker:
+
+```python
+ db = Database(config.database_url)
+ await db.connect()
+
+ alpaca = AlpacaClient(
+ api_key=config.alpaca_api_key,
+ api_secret=config.alpaca_api_secret,
+ paper=config.alpaca_paper,
+ )
+```
+
+And add the selector if anthropic key is configured:
+
+```python
+ if config.anthropic_api_key:
+ selector = StockSelector(
+ db=db,
+ broker=broker,
+ alpaca=alpaca,
+ anthropic_api_key=config.anthropic_api_key,
+ anthropic_model=config.anthropic_model,
+ max_picks=config.selector_max_picks,
+ )
+ tasks.append(asyncio.create_task(
+ run_stock_selector(selector, notifier, db, config, log)
+ ))
+ log.info("stock_selector_enabled", time=config.selector_final_time)
+```
+
+Add to the `finally` block:
+
+```python
+ await alpaca.close()
+ await db.close()
+```
+
+- [ ] **Step 3: Run strategy engine tests for regressions**
+
+Run: `pytest services/strategy-engine/tests/ -v`
+Expected: All tests PASS
+
+- [ ] **Step 4: Commit**
+
+```bash
+git add services/strategy-engine/src/strategy_engine/main.py services/strategy-engine/src/strategy_engine/config.py
+git commit -m "feat: integrate stock selector into strategy engine scheduler"
+```
+
+---
+
+### Task 18: Update Docker Compose and .env
+
+**Files:**
+- Modify: `docker-compose.yml`
+- Modify: `.env.example` (already done in Task 5, just verify)
+
+- [ ] **Step 1: Add news-collector service to docker-compose.yml**
+
+Add before the `loki:` service block in `docker-compose.yml`:
+
+```yaml
+ news-collector:
+ build:
+ context: .
+ dockerfile: services/news-collector/Dockerfile
+ env_file: .env
+ ports:
+ - "8084:8084"
+ depends_on:
+ redis:
+ condition: service_healthy
+ postgres:
+ condition: service_healthy
+ healthcheck:
+ test: ["CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:8084/health')"]
+ interval: 10s
+ timeout: 5s
+ retries: 3
+ restart: unless-stopped
+```
+
+- [ ] **Step 2: Verify compose file is valid**
+
+Run: `docker compose config --quiet 2>&1 || echo "INVALID"`
+Expected: no output (the compose file is valid); `INVALID` is printed only if validation fails
+
+- [ ] **Step 3: Commit**
+
+```bash
+git add docker-compose.yml
+git commit -m "feat: add news-collector service to Docker Compose"
+```
+
+---
+
+### Task 19: Run full test suite and lint
+
+- [ ] **Step 1: Install test dependencies**
+
+Run: `pip install -e shared/ && pip install aiosqlite feedparser nltk aioresponses`
+
+- [ ] **Step 2: Download VADER lexicon**
+
+Run: `python -c "import nltk; nltk.download('vader_lexicon', quiet=True)"`
+
+- [ ] **Step 3: Run lint**
+
+Run: `make lint`
+Expected: No lint errors. If there are errors, fix them.
+
+- [ ] **Step 4: Run full test suite**
+
+Run: `make test`
+Expected: All tests PASS
+
+- [ ] **Step 5: Fix any issues found in steps 3-4**
+
+If lint or tests fail, fix the issues and re-run.
+
+- [ ] **Step 6: Final commit if any fixes were needed**
+
+```bash
+git add -A
+git commit -m "fix: resolve lint and test issues from news selector integration"
+```
--
cgit v1.2.3