# News-Driven Stock Selector Implementation Plan

> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.

**Goal:** Replace the MOC strategy's fixed symbol list with a dynamic, news-driven stock selection system that continuously collects news/sentiment data and selects 2-3 optimal stocks daily before market close.

**Architecture:** A new `news-collector` service runs 7 data source collectors on individual poll intervals, storing `NewsItem` records in PostgreSQL and publishing to Redis. A sentiment aggregator computes per-symbol composite scores every 15 minutes. Before market close, a 3-stage stock selector (sentiment candidates → technical filter → LLM final pick) chooses 2-3 stocks and feeds them to the existing MOC strategy.

**Tech Stack:** Python 3.12+, asyncio, aiohttp, Pydantic, SQLAlchemy 2.0 async, Redis Streams, VADER (nltk), feedparser (RSS), Anthropic SDK (Claude API), Alembic

---

## Phase 1: Shared Foundation (Models, DB, Events)

### Task 1: Add NewsItem and sentiment models to shared

**Files:**

- Modify: `shared/src/shared/models.py`
- Create: `shared/src/shared/sentiment_models.py`
- Create: `shared/tests/test_sentiment_models.py`

- [ ] **Step 1: Write tests for new models**

Create `shared/tests/test_sentiment_models.py`:

```python
"""Tests for news and sentiment models."""
from datetime import datetime, timezone

from shared.models import NewsCategory, NewsItem, OrderSide
from shared.sentiment_models import SymbolScore, MarketSentiment, SelectedStock, Candidate


def test_news_item_defaults():
    item = NewsItem(
        source="finnhub",
        headline="Test headline",
        published_at=datetime(2026, 4, 2, tzinfo=timezone.utc),
        sentiment=0.5,
        category=NewsCategory.MACRO,
    )
    assert item.id  # UUID generated
    assert item.symbols == []
    assert item.summary is None
    assert item.raw_data == {}
    assert item.created_at is not None


def test_news_item_with_symbols():
    item = NewsItem(
        source="rss",
        headline="AAPL earnings beat",
        published_at=datetime(2026, 4, 2, tzinfo=timezone.utc),
        sentiment=0.8,
        category=NewsCategory.EARNINGS,
        symbols=["AAPL"],
    )
    assert item.symbols == ["AAPL"]
    assert item.category == NewsCategory.EARNINGS


def test_news_category_values():
    assert NewsCategory.POLICY == "policy"
    assert NewsCategory.EARNINGS == "earnings"
    assert NewsCategory.MACRO == "macro"
    assert NewsCategory.SOCIAL == "social"
    assert NewsCategory.FILING == "filing"
    assert NewsCategory.FED == "fed"


def test_symbol_score():
    score = SymbolScore(
        symbol="AAPL",
        news_score=0.5,
        news_count=10,
        social_score=0.3,
        policy_score=0.0,
        filing_score=0.2,
        composite=0.3,
        updated_at=datetime(2026, 4, 2, tzinfo=timezone.utc),
    )
    assert score.symbol == "AAPL"
    assert score.composite == 0.3


def test_market_sentiment():
    ms = MarketSentiment(
        fear_greed=25,
        fear_greed_label="Extreme Fear",
        vix=32.5,
        fed_stance="hawkish",
        market_regime="risk_off",
        updated_at=datetime(2026, 4, 2, tzinfo=timezone.utc),
    )
    assert ms.market_regime == "risk_off"
    assert ms.vix == 32.5


def test_market_sentiment_no_vix():
    ms = MarketSentiment(
        fear_greed=50,
        fear_greed_label="Neutral",
        fed_stance="neutral",
        market_regime="neutral",
        updated_at=datetime(2026, 4, 2, tzinfo=timezone.utc),
    )
    assert ms.vix is None


def test_selected_stock():
    ss = SelectedStock(
        symbol="NVDA",
        side=OrderSide.BUY,
        conviction=0.85,
        reason="CHIPS Act expansion",
        key_news=["Trump signs CHIPS Act expansion"],
    )
    assert ss.conviction == 0.85
    assert len(ss.key_news) == 1


def test_candidate():
    c = Candidate(
        symbol="TSLA",
        source="sentiment",
        direction=OrderSide.BUY,
        score=0.75,
        reason="High social buzz",
    )
    assert c.direction == OrderSide.BUY

    c2 = Candidate(
        symbol="XOM",
        source="llm",
        score=0.6,
        reason="Oil price surge",
    )
    assert c2.direction is None
```

- [ ] **Step 2: Run tests to verify they fail**

Run: `pytest shared/tests/test_sentiment_models.py -v`
Expected: FAIL — `NewsCategory`, `NewsItem` not found in `shared.models`; `shared.sentiment_models` does not exist

- [ ] **Step 3: Add NewsCategory and NewsItem to shared/models.py**

Add to the end of `shared/src/shared/models.py` (this reuses the `Enum`, `uuid`, `Optional`, `Field`, and `datetime`/`timezone` imports that file already has; add any that are missing):

```python
class NewsCategory(str, Enum):
    POLICY = "policy"
    EARNINGS = "earnings"
    MACRO = "macro"
    SOCIAL = "social"
    FILING = "filing"
    FED = "fed"


class NewsItem(BaseModel):
    id: str = Field(default_factory=lambda: str(uuid.uuid4()))
    source: str
    headline: str
    summary: Optional[str] = None
    url: Optional[str] = None
    published_at: datetime
    symbols: list[str] = []
    sentiment: float
    category: NewsCategory
    raw_data: dict = {}
    created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
```

- [ ] **Step 4: Create shared/src/shared/sentiment_models.py**

```python
"""Sentiment scoring and stock selection models."""
from datetime import datetime
from typing import Optional

from pydantic import BaseModel

from shared.models import OrderSide


class SymbolScore(BaseModel):
    symbol: str
    news_score: float
    news_count: int
    social_score: float
    policy_score: float
    filing_score: float
    composite: float
    updated_at: datetime


class MarketSentiment(BaseModel):
    fear_greed: int
    fear_greed_label: str
    vix: Optional[float] = None
    fed_stance: str
    market_regime: str
    updated_at: datetime


class SelectedStock(BaseModel):
    symbol: str
    side: OrderSide
    conviction: float
    reason: str
    key_news: list[str]


class Candidate(BaseModel):
    symbol: str
    source: str
    direction: Optional[OrderSide] = None
    score: float
    reason: str
```

- [ ] **Step 5: Run tests to verify they pass**

Run: `pytest shared/tests/test_sentiment_models.py -v`

Expected: All 8 tests PASS

- [ ] **Step 6: Commit**

```bash
git add shared/src/shared/models.py shared/src/shared/sentiment_models.py shared/tests/test_sentiment_models.py
git commit -m "feat: add NewsItem, sentiment scoring, and stock selection models"
```

---

### Task 2: Add SQLAlchemy ORM models for news tables

**Files:**

- Modify: `shared/src/shared/sa_models.py`
- Create: `shared/tests/test_sa_news_models.py`

- [ ] **Step 1: Write tests for new SA models**

Create `shared/tests/test_sa_news_models.py`:

```python
"""Tests for news-related SQLAlchemy models."""
from shared.sa_models import NewsItemRow, SymbolScoreRow, MarketSentimentRow, StockSelectionRow


def test_news_item_row_tablename():
    assert NewsItemRow.__tablename__ == "news_items"


def test_symbol_score_row_tablename():
    assert SymbolScoreRow.__tablename__ == "symbol_scores"


def test_market_sentiment_row_tablename():
    assert MarketSentimentRow.__tablename__ == "market_sentiment"


def test_stock_selection_row_tablename():
    assert StockSelectionRow.__tablename__ == "stock_selections"


def test_news_item_row_columns():
    cols = {c.name for c in NewsItemRow.__table__.columns}
    assert cols >= {"id", "source", "headline", "published_at", "sentiment", "category"}


def test_symbol_score_row_columns():
    cols = {c.name for c in SymbolScoreRow.__table__.columns}
    assert cols >= {"id", "symbol", "news_score", "composite", "updated_at"}
```

- [ ] **Step 2: Run tests to verify they fail**

Run: `pytest shared/tests/test_sa_news_models.py -v`

Expected: FAIL — import errors

- [ ] **Step 3: Add ORM models to sa_models.py**

Add to the end of `shared/src/shared/sa_models.py`:

```python
class NewsItemRow(Base):
    __tablename__ = "news_items"

    id: Mapped[str] = mapped_column(Text, primary_key=True)
    source: Mapped[str] = mapped_column(Text, nullable=False)
    headline: Mapped[str] = mapped_column(Text, nullable=False)
    summary: Mapped[str | None] = mapped_column(Text)
    url: Mapped[str | None] = mapped_column(Text)
    published_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False)
    symbols: Mapped[str | None] = mapped_column(Text)  # JSON-encoded list
    sentiment: Mapped[float] = mapped_column(sa.Float, nullable=False)
    category: Mapped[str] = mapped_column(Text, nullable=False)
    raw_data: Mapped[str | None] = mapped_column(Text)  # JSON string
    created_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True), nullable=False, server_default=sa.func.now()
    )


class SymbolScoreRow(Base):
    __tablename__ = "symbol_scores"

    id: Mapped[str] = mapped_column(Text, primary_key=True)
    symbol: Mapped[str] = mapped_column(Text, nullable=False, unique=True)
    news_score: Mapped[float] = mapped_column(sa.Float, nullable=False, server_default="0")
    news_count: Mapped[int] = mapped_column(sa.Integer, nullable=False, server_default="0")
    social_score: Mapped[float] = mapped_column(sa.Float, nullable=False, server_default="0")
    policy_score: Mapped[float] = mapped_column(sa.Float, nullable=False, server_default="0")
    filing_score: Mapped[float] = mapped_column(sa.Float, nullable=False, server_default="0")
    composite: Mapped[float] = mapped_column(sa.Float, nullable=False, server_default="0")
    updated_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False)


class MarketSentimentRow(Base):
    __tablename__ = "market_sentiment"

    id: Mapped[str] = mapped_column(Text, primary_key=True)
    fear_greed: Mapped[int] = mapped_column(sa.Integer, nullable=False)
    fear_greed_label: Mapped[str] = mapped_column(Text, nullable=False)
    vix: Mapped[float | None] = mapped_column(sa.Float)
    fed_stance: Mapped[str] = mapped_column(Text, nullable=False, server_default="neutral")
    market_regime: Mapped[str] = mapped_column(Text, nullable=False, server_default="neutral")
    updated_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False)


class StockSelectionRow(Base):
    __tablename__ = "stock_selections"

    id: Mapped[str] = mapped_column(Text, primary_key=True)
    trade_date: Mapped[date] = mapped_column(sa.Date, nullable=False)
    symbol: Mapped[str] = mapped_column(Text, nullable=False)
    side: Mapped[str] = mapped_column(Text, nullable=False)
    conviction: Mapped[float] = mapped_column(sa.Float, nullable=False)
    reason: Mapped[str] = mapped_column(Text, nullable=False)
    key_news: Mapped[str | None] = mapped_column(Text)  # JSON string
    sentiment_snapshot: Mapped[str | None] = mapped_column(Text)  # JSON string
    created_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True), nullable=False, server_default=sa.func.now()
    )
```

Also add `import sqlalchemy as sa` (and `from datetime import date` if not already present) to the imports at the top of `sa_models.py`.

- [ ] **Step 4: Run tests to verify they pass**

Run: `pytest shared/tests/test_sa_news_models.py -v`

Expected: All 6 tests PASS

- [ ] **Step 5: Commit**

```bash
git add shared/src/shared/sa_models.py shared/tests/test_sa_news_models.py
git commit -m "feat: add SQLAlchemy ORM models for news, scores, selections"
```

---

### Task 3: Create Alembic migration for news tables

**Files:**

- Create: `shared/alembic/versions/002_news_sentiment_tables.py`

- [ ] **Step 1: Create migration file**

Create `shared/alembic/versions/002_news_sentiment_tables.py`:

```python
"""Add news, sentiment, and stock selection tables

Revision ID: 002
Revises: 001
Create Date: 2026-04-02
"""
from typing import Sequence, Union

from alembic import op
import sqlalchemy as sa

revision: str = "002"
down_revision: Union[str, None] = "001"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
    op.create_table(
        "news_items",
        sa.Column("id", sa.Text, primary_key=True),
        sa.Column("source", sa.Text, nullable=False),
        sa.Column("headline", sa.Text, nullable=False),
        sa.Column("summary", sa.Text),
        sa.Column("url", sa.Text),
        sa.Column("published_at", sa.DateTime(timezone=True), nullable=False),
        sa.Column("symbols", sa.Text),
        sa.Column("sentiment", sa.Float, nullable=False),
        sa.Column("category", sa.Text, nullable=False),
        sa.Column("raw_data", sa.Text),
        sa.Column("created_at", sa.DateTime(timezone=True), nullable=False, server_default=sa.func.now()),
    )
    op.create_index("idx_news_items_published", "news_items", ["published_at"])
    op.create_index("idx_news_items_source", "news_items", ["source"])

    op.create_table(
        "symbol_scores",
        sa.Column("id", sa.Text, primary_key=True),
        sa.Column("symbol", sa.Text, nullable=False, unique=True),
        sa.Column("news_score", sa.Float, nullable=False, server_default="0"),
        sa.Column("news_count", sa.Integer, nullable=False, server_default="0"),
        sa.Column("social_score", sa.Float, nullable=False, server_default="0"),
        sa.Column("policy_score", sa.Float, nullable=False, server_default="0"),
        sa.Column("filing_score", sa.Float, nullable=False, server_default="0"),
        sa.Column("composite", sa.Float, nullable=False, server_default="0"),
        sa.Column("updated_at", sa.DateTime(timezone=True), nullable=False),
    )

    op.create_table(
        "market_sentiment",
        sa.Column("id", sa.Text, primary_key=True),
        sa.Column("fear_greed", sa.Integer, nullable=False),
        sa.Column("fear_greed_label", sa.Text, nullable=False),
        sa.Column("vix", sa.Float),
        sa.Column("fed_stance", sa.Text, nullable=False, server_default="neutral"),
        sa.Column("market_regime", sa.Text, nullable=False, server_default="neutral"),
        sa.Column("updated_at", sa.DateTime(timezone=True), nullable=False),
    )

    op.create_table(
        "stock_selections",
        sa.Column("id", sa.Text, primary_key=True),
        sa.Column("trade_date", sa.Date, nullable=False),
        sa.Column("symbol", sa.Text, nullable=False),
        sa.Column("side", sa.Text, nullable=False),
        sa.Column("conviction", sa.Float, nullable=False),
        sa.Column("reason", sa.Text, nullable=False),
        sa.Column("key_news", sa.Text),
        sa.Column("sentiment_snapshot", sa.Text),
        sa.Column("created_at", sa.DateTime(timezone=True), nullable=False, server_default=sa.func.now()),
    )
    op.create_index("idx_stock_selections_date", "stock_selections", ["trade_date"])


def downgrade() -> None:
    op.drop_table("stock_selections")
    op.drop_table("market_sentiment")
    op.drop_table("symbol_scores")
    op.drop_table("news_items")
```

- [ ] **Step 2: Verify migration imports correctly**

`alembic/versions` is not an importable package, so load the file directly:

```bash
python -c "import importlib.util; s=importlib.util.spec_from_file_location('m','shared/alembic/versions/002_news_sentiment_tables.py'); m=importlib.util.module_from_spec(s); s.loader.exec_module(m); print('OK')"
```

Expected: OK (no import errors)

- [ ] **Step 3: Commit**

```bash
git add shared/alembic/versions/002_news_sentiment_tables.py
git commit -m "feat: add Alembic migration for news and sentiment tables"
```

---

### Task 4: Add NewsEvent to shared events and DB methods for news

**Files:**

- Modify: `shared/src/shared/events.py`
- Modify: `shared/src/shared/db.py`
- Create: `shared/tests/test_news_events.py`
- Create: `shared/tests/test_db_news.py`

- [ ] **Step 1: Write tests for NewsEvent**

Create `shared/tests/test_news_events.py`:

```python
"""Tests for NewsEvent."""
from datetime import datetime, timezone

from shared.models import NewsCategory, NewsItem
from shared.events import NewsEvent, EventType, Event


def test_news_event_to_dict():
    item = NewsItem(
        source="finnhub",
        headline="Test",
        published_at=datetime(2026, 4, 2, tzinfo=timezone.utc),
        sentiment=0.5,
        category=NewsCategory.MACRO,
    )
    event = NewsEvent(data=item)
    d = event.to_dict()
    assert d["type"] == EventType.NEWS
    assert d["data"]["source"] == "finnhub"


def test_news_event_from_raw():
    raw = {
        "type": "NEWS",
        "data": {
            "id": "abc",
            "source": "rss",
            "headline": "Test headline",
            "published_at": "2026-04-02T00:00:00+00:00",
            "sentiment": 0.3,
            "category": "earnings",
            "symbols": ["AAPL"],
            "raw_data": {},
        },
    }
    event = NewsEvent.from_raw(raw)
    assert event.data.source == "rss"
    assert event.data.symbols == ["AAPL"]


def test_event_dispatcher_news():
    raw = {
        "type": "NEWS",
        "data": {
            "id": "abc",
            "source": "finnhub",
            "headline": "Test",
            "published_at": "2026-04-02T00:00:00+00:00",
            "sentiment": 0.0,
            "category": "macro",
            "raw_data": {},
        },
    }
    event = Event.from_dict(raw)
    assert isinstance(event, NewsEvent)
```

- [ ] **Step 2: Run tests to verify they fail**

Run: `pytest shared/tests/test_news_events.py -v`

Expected: FAIL — `NewsEvent` not in events.py, `EventType.NEWS` missing

- [ ] **Step 3: Add NewsEvent to events.py**

Add `NEWS = "NEWS"` to the `EventType` enum. Extend the models import to include `NewsItem`, then add the `NewsEvent` class:

```python
from shared.models import Candle, Signal, Order, NewsItem


class NewsEvent(BaseModel):
    type: EventType = EventType.NEWS
    data: NewsItem

    def to_dict(self) -> dict:
        return {
            "type": self.type,
            "data": self.data.model_dump(mode="json"),
        }

    @classmethod
    def from_raw(cls, raw: dict) -> "NewsEvent":
        return cls(type=raw["type"], data=NewsItem(**raw["data"]))
```

Register it in `_EVENT_TYPE_MAP`:

```python
EventType.NEWS: NewsEvent,
```

- [ ] **Step 4: Run event tests to verify they pass**

Run: `pytest shared/tests/test_news_events.py -v`

Expected: All 3 tests PASS

- [ ] **Step 5: Run all existing event tests to check no regressions**

Run: `pytest shared/tests/test_events.py -v`

Expected: All existing tests PASS

- [ ] **Step 6: Write tests for DB news methods**

Create `shared/tests/test_db_news.py`:

```python
"""Tests for database news/sentiment methods.

These tests use an in-memory SQLite database.
"""
import pytest
from datetime import datetime, date, timezone

from shared.db import Database
from shared.models import NewsItem, NewsCategory
from shared.sentiment_models import SymbolScore, MarketSentiment


@pytest.fixture
async def db():
    database = Database("sqlite+aiosqlite://")
    await database.connect()
    yield database
    await database.close()


async def test_insert_and_get_news_items(db):
    item = NewsItem(
        source="finnhub",
        headline="AAPL earnings beat",
        published_at=datetime(2026, 4, 2, 12, 0, tzinfo=timezone.utc),
        sentiment=0.8,
        category=NewsCategory.EARNINGS,
        symbols=["AAPL"],
    )
    await db.insert_news_item(item)

    items = await db.get_recent_news(hours=24)
    assert len(items) == 1
    assert items[0]["headline"] == "AAPL earnings beat"


async def test_upsert_symbol_score(db):
    score = SymbolScore(
        symbol="AAPL",
        news_score=0.5,
        news_count=10,
        social_score=0.3,
        policy_score=0.0,
        filing_score=0.2,
        composite=0.3,
        updated_at=datetime(2026, 4, 2, tzinfo=timezone.utc),
    )
    await db.upsert_symbol_score(score)

    scores = await db.get_top_symbol_scores(limit=5)
    assert len(scores) == 1
    assert scores[0]["symbol"] == "AAPL"


async def test_upsert_market_sentiment(db):
    ms = MarketSentiment(
        fear_greed=55,
        fear_greed_label="Neutral",
        vix=18.2,
        fed_stance="neutral",
        market_regime="neutral",
        updated_at=datetime(2026, 4, 2, tzinfo=timezone.utc),
    )
    await db.upsert_market_sentiment(ms)

    result = await db.get_latest_market_sentiment()
    assert result is not None
    assert result["fear_greed"] == 55


async def test_insert_stock_selection(db):
    await db.insert_stock_selection(
        trade_date=date(2026, 4, 2),
        symbol="NVDA",
        side="BUY",
        conviction=0.85,
        reason="CHIPS Act",
        key_news=["Trump signs CHIPS expansion"],
        sentiment_snapshot={"composite": 0.8},
    )

    selections = await db.get_stock_selections(date(2026, 4, 2))
    assert len(selections) == 1
    assert selections[0]["symbol"] == "NVDA"
```

- [ ] **Step 7: Run DB news tests to verify they fail**

Run: `pytest shared/tests/test_db_news.py -v`

Expected: FAIL — methods not yet on Database class

- [ ] **Step 8: Add news/sentiment DB methods to db.py**

New imports at the top of `shared/src/shared/db.py` (skip any already present; `datetime`, `timezone`, and `select` are assumed to be imported already):

```python
import json
import uuid
from datetime import date, timedelta

from shared.models import NewsItem
from shared.sentiment_models import SymbolScore, MarketSentiment
from shared.sa_models import NewsItemRow, SymbolScoreRow, MarketSentimentRow, StockSelectionRow
```

Add these methods to the `Database` class:

```python
async def insert_news_item(self, item: NewsItem) -> None:
    """Insert a news item."""
    row = NewsItemRow(
        id=item.id,
        source=item.source,
        headline=item.headline,
        summary=item.summary,
        url=item.url,
        published_at=item.published_at,
        symbols=json.dumps(item.symbols),
        sentiment=item.sentiment,
        category=item.category.value,
        raw_data=json.dumps(item.raw_data),
        created_at=item.created_at,
    )
    async with self._session_factory() as session:
        try:
            session.add(row)
            await session.commit()
        except Exception:
            await session.rollback()
            raise

async def get_recent_news(self, hours: int = 24) -> list[dict]:
    """Retrieve news items from the last N hours."""
    since = datetime.now(timezone.utc) - timedelta(hours=hours)
    stmt = (
        select(NewsItemRow)
        .where(NewsItemRow.published_at >= since)
        .order_by(NewsItemRow.published_at.desc())
    )
    async with self._session_factory() as session:
        result = await session.execute(stmt)
        rows = result.scalars().all()
        return [
            {
                "id": r.id,
                "source": r.source,
                "headline": r.headline,
                "summary": r.summary,
                "url": r.url,
                "published_at": r.published_at,
                "symbols": json.loads(r.symbols) if r.symbols else [],
                "sentiment": r.sentiment,
                "category": r.category,
                "created_at": r.created_at,
            }
            for r in rows
        ]

async def upsert_symbol_score(self, score: SymbolScore) -> None:
    """Insert or update a symbol score."""
    async with self._session_factory() as session:
        try:
            existing = await session.execute(
                select(SymbolScoreRow).where(SymbolScoreRow.symbol == score.symbol)
            )
            row = existing.scalar_one_or_none()
            if row:
                row.news_score = score.news_score
                row.news_count = score.news_count
                row.social_score = score.social_score
                row.policy_score = score.policy_score
                row.filing_score = score.filing_score
                row.composite = score.composite
                row.updated_at = score.updated_at
            else:
                row = SymbolScoreRow(
                    id=str(uuid.uuid4()),
                    symbol=score.symbol,
                    news_score=score.news_score,
                    news_count=score.news_count,
                    social_score=score.social_score,
                    policy_score=score.policy_score,
                    filing_score=score.filing_score,
                    composite=score.composite,
                    updated_at=score.updated_at,
                )
                session.add(row)
            await session.commit()
        except Exception:
            await session.rollback()
            raise

async def get_top_symbol_scores(self, limit: int = 20) -> list[dict]:
    """Get top symbol scores ordered by composite descending."""
    stmt = (
        select(SymbolScoreRow)
        .order_by(SymbolScoreRow.composite.desc())
        .limit(limit)
    )
    async with self._session_factory() as session:
        result = await session.execute(stmt)
        rows = result.scalars().all()
        return [
            {
                "symbol": r.symbol,
                "news_score": r.news_score,
                "news_count": r.news_count,
                "social_score": r.social_score,
                "policy_score": r.policy_score,
                "filing_score": r.filing_score,
                "composite": r.composite,
                "updated_at": r.updated_at,
            }
            for r in rows
        ]

async def upsert_market_sentiment(self, ms: MarketSentiment) -> None:
    """Insert or update the latest market sentiment (single row, id='latest')."""
    async with self._session_factory() as session:
        try:
            existing = await session.execute(
                select(MarketSentimentRow).where(MarketSentimentRow.id == "latest")
            )
            row = existing.scalar_one_or_none()
            if row:
                row.fear_greed = ms.fear_greed
                row.fear_greed_label = ms.fear_greed_label
                row.vix = ms.vix
                row.fed_stance = ms.fed_stance
                row.market_regime = ms.market_regime
                row.updated_at = ms.updated_at
            else:
                row = MarketSentimentRow(
                    id="latest",
                    fear_greed=ms.fear_greed,
                    fear_greed_label=ms.fear_greed_label,
                    vix=ms.vix,
                    fed_stance=ms.fed_stance,
                    market_regime=ms.market_regime,
                    updated_at=ms.updated_at,
                )
                session.add(row)
            await session.commit()
        except Exception:
            await session.rollback()
            raise

async def get_latest_market_sentiment(self) -> dict | None:
    """Get the latest market sentiment."""
    stmt = select(MarketSentimentRow).where(MarketSentimentRow.id == "latest")
    async with self._session_factory() as session:
        result = await session.execute(stmt)
        r = result.scalar_one_or_none()
        if r is None:
            return None
        return {
            "fear_greed": r.fear_greed,
            "fear_greed_label": r.fear_greed_label,
            "vix": r.vix,
            "fed_stance": r.fed_stance,
            "market_regime": r.market_regime,
            "updated_at": r.updated_at,
        }

async def insert_stock_selection(
    self,
    trade_date: date,
    symbol: str,
    side: str,
    conviction: float,
    reason: str,
    key_news: list[str],
    sentiment_snapshot: dict,
) -> None:
    """Insert a stock selection record."""
    row = StockSelectionRow(
        id=str(uuid.uuid4()),
        trade_date=trade_date,
        symbol=symbol,
        side=side,
        conviction=conviction,
        reason=reason,
        key_news=json.dumps(key_news),
        sentiment_snapshot=json.dumps(sentiment_snapshot),
    )
    async with self._session_factory() as session:
        try:
            session.add(row)
            await session.commit()
        except Exception:
            await session.rollback()
            raise

async def get_stock_selections(self, trade_date: date) -> list[dict]:
    """Get stock selections for a specific date."""
    stmt = (
        select(StockSelectionRow)
        .where(StockSelectionRow.trade_date == trade_date)
        .order_by(StockSelectionRow.conviction.desc())
    )
    async with self._session_factory() as session:
        result = await session.execute(stmt)
        rows = result.scalars().all()
        return [
            {
                "symbol": r.symbol,
                "side": r.side,
                "conviction": r.conviction,
                "reason": r.reason,
                "key_news": json.loads(r.key_news) if r.key_news else [],
                "sentiment_snapshot": json.loads(r.sentiment_snapshot) if r.sentiment_snapshot else {},
            }
            for r in rows
        ]
```

- [ ] **Step 9: Run DB news tests to verify they pass**

Run: `pytest shared/tests/test_db_news.py -v`

Expected: All 4 tests PASS

Note: These tests require the `aiosqlite` package, plus `pytest-asyncio` with `asyncio_mode = "auto"` (the fixture and tests are `async def` with no explicit markers).
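One thing this task leaves open is how `SymbolScore.composite` is computed from the component scores. For orientation only, here is a minimal weighted-average sketch; the weights, the news-volume damping, and the `composite_score` name are assumptions, not part of this plan (the real formula belongs to the sentiment aggregator built in a later phase):

```python
import math

# Assumed weights -- illustrative only; the sentiment aggregator defines the real ones.
WEIGHTS = {"news": 0.4, "social": 0.3, "policy": 0.2, "filing": 0.1}


def composite_score(news: float, social: float, policy: float,
                    filing: float, news_count: int) -> float:
    """Weighted average of component scores, damped by news volume.

    Components are assumed to lie in [-1, 1]; the result stays in that range
    because the volume factor is capped at 1.0.
    """
    base = (WEIGHTS["news"] * news + WEIGHTS["social"] * social
            + WEIGHTS["policy"] * policy + WEIGHTS["filing"] * filing)
    # Dampen scores backed by very few articles: 0 articles -> 0.0, 10+ -> 1.0.
    volume = min(1.0, math.log1p(news_count) / math.log1p(10))
    return round(base * volume, 4)
```

Keeping the result bounded matters because the selector later ranks symbols by comparing composites across the whole universe.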
If not installed: `pip install aiosqlite` - [ ] **Step 10: Run all shared tests to check no regressions** Run: `pytest shared/tests/ -v` Expected: All tests PASS - [ ] **Step 11: Commit** ```bash git add shared/src/shared/events.py shared/src/shared/db.py shared/tests/test_news_events.py shared/tests/test_db_news.py git commit -m "feat: add NewsEvent, DB methods for news/sentiment/selections" ``` --- ### Task 5: Update Settings with new env vars **Files:** - Modify: `shared/src/shared/config.py` - Modify: `.env.example` - [ ] **Step 1: Add new settings to config.py** Add these fields to the `Settings` class in `shared/src/shared/config.py`: ```python # News collector finnhub_api_key: str = "" news_poll_interval: int = 300 sentiment_aggregate_interval: int = 900 # Stock selector selector_candidates_time: str = "15:00" selector_filter_time: str = "15:15" selector_final_time: str = "15:30" selector_max_picks: int = 3 # LLM anthropic_api_key: str = "" anthropic_model: str = "claude-sonnet-4-20250514" ``` - [ ] **Step 2: Add to .env.example** Append to `.env.example`: ```bash # News Collector FINNHUB_API_KEY= NEWS_POLL_INTERVAL=300 SENTIMENT_AGGREGATE_INTERVAL=900 # Stock Selector SELECTOR_CANDIDATES_TIME=15:00 SELECTOR_FILTER_TIME=15:15 SELECTOR_FINAL_TIME=15:30 SELECTOR_MAX_PICKS=3 # LLM (for stock selector) ANTHROPIC_API_KEY= ANTHROPIC_MODEL=claude-sonnet-4-20250514 ``` - [ ] **Step 3: Commit** ```bash git add shared/src/shared/config.py .env.example git commit -m "feat: add news collector and stock selector config settings" ``` --- ## Phase 2: News Collector Service ### Task 6: Scaffold news-collector service **Files:** - Create: `services/news-collector/pyproject.toml` - Create: `services/news-collector/Dockerfile` - Create: `services/news-collector/src/news_collector/__init__.py` - Create: `services/news-collector/src/news_collector/config.py` - Create: `services/news-collector/src/news_collector/collectors/__init__.py` - Create: 
`services/news-collector/src/news_collector/collectors/base.py` - Create: `services/news-collector/tests/__init__.py` - [ ] **Step 1: Create pyproject.toml** Create `services/news-collector/pyproject.toml`: ```toml [project] name = "news-collector" version = "0.1.0" description = "News and sentiment data collector service" requires-python = ">=3.12" dependencies = [ "trading-shared", "feedparser>=6.0", "nltk>=3.8", "aiohttp>=3.9", ] [project.optional-dependencies] dev = [ "pytest>=8.0", "pytest-asyncio>=0.23", "aioresponses>=0.7", ] [build-system] requires = ["hatchling"] build-backend = "hatchling.build" [tool.hatch.build.targets.wheel] packages = ["src/news_collector"] ``` - [ ] **Step 2: Create Dockerfile** Create `services/news-collector/Dockerfile`: ```dockerfile FROM python:3.12-slim WORKDIR /app COPY shared/ shared/ RUN pip install --no-cache-dir ./shared COPY services/news-collector/ services/news-collector/ RUN pip install --no-cache-dir ./services/news-collector RUN python -c "import nltk; nltk.download('vader_lexicon', quiet=True)" ENV PYTHONPATH=/app CMD ["python", "-m", "news_collector.main"] ``` - [ ] **Step 3: Create config.py** Create `services/news-collector/src/news_collector/config.py`: ```python """News Collector configuration.""" from shared.config import Settings class NewsCollectorConfig(Settings): health_port: int = 8084 finnhub_api_key: str = "" news_poll_interval: int = 300 sentiment_aggregate_interval: int = 900 ``` - [ ] **Step 4: Create BaseCollector** Create `services/news-collector/src/news_collector/collectors/base.py`: ```python """Base class for all news collectors.""" from abc import ABC, abstractmethod from shared.models import NewsItem class BaseCollector(ABC): name: str = "base" poll_interval: int = 300 # seconds @abstractmethod async def collect(self) -> list[NewsItem]: """Collect news items from the source.""" @abstractmethod async def is_available(self) -> bool: """Check if this data source is accessible.""" ``` - [ ] **Step 
5: Create __init__.py files** Create `services/news-collector/src/news_collector/__init__.py`: ```python """News collector service.""" ``` Create `services/news-collector/src/news_collector/collectors/__init__.py`: ```python """News collectors.""" ``` Create `services/news-collector/tests/__init__.py`: ```python ``` - [ ] **Step 6: Commit** ```bash git add services/news-collector/ git commit -m "feat: scaffold news-collector service with BaseCollector" ``` --- ### Task 7: Implement Finnhub news collector **Files:** - Create: `services/news-collector/src/news_collector/collectors/finnhub.py` - Create: `services/news-collector/tests/test_finnhub.py` - [ ] **Step 1: Write tests** Create `services/news-collector/tests/test_finnhub.py`: ```python """Tests for Finnhub news collector.""" import pytest from unittest.mock import AsyncMock, patch from datetime import datetime, timezone from news_collector.collectors.finnhub import FinnhubCollector @pytest.fixture def collector(): return FinnhubCollector(api_key="test_key") def test_collector_name(collector): assert collector.name == "finnhub" assert collector.poll_interval == 300 async def test_is_available_with_key(collector): assert await collector.is_available() is True async def test_is_available_without_key(): c = FinnhubCollector(api_key="") assert await c.is_available() is False async def test_collect_parses_response(collector): mock_response = [ { "category": "top news", "datetime": 1711929600, "headline": "AAPL beats earnings", "id": 12345, "related": "AAPL", "source": "MarketWatch", "summary": "Apple reported better than expected...", "url": "https://example.com/article", }, { "category": "top news", "datetime": 1711929000, "headline": "Fed holds rates steady", "id": 12346, "related": "", "source": "Reuters", "summary": "The Federal Reserve...", "url": "https://example.com/fed", }, ] with patch.object(collector, "_fetch_news", new_callable=AsyncMock, return_value=mock_response): items = await collector.collect() 
    assert len(items) == 2
    assert items[0].source == "finnhub"
    assert items[0].headline == "AAPL beats earnings"
    assert items[0].symbols == ["AAPL"]
    assert items[0].url == "https://example.com/article"
    assert isinstance(items[0].sentiment, float)
    # Second item has no related ticker
    assert items[1].symbols == []


async def test_collect_handles_empty_response(collector):
    with patch.object(collector, "_fetch_news", new_callable=AsyncMock, return_value=[]):
        items = await collector.collect()
    assert items == []
```

- [ ] **Step 2: Run tests to verify they fail**

Run: `pytest services/news-collector/tests/test_finnhub.py -v`

Expected: FAIL — module not found

- [ ] **Step 3: Implement FinnhubCollector**

Create `services/news-collector/src/news_collector/collectors/finnhub.py`:

```python
"""Finnhub market news collector (free tier: 60 req/min)."""

import logging
from datetime import datetime, timezone

import aiohttp
from nltk.sentiment.vader import SentimentIntensityAnalyzer

from shared.models import NewsCategory, NewsItem

from news_collector.collectors.base import BaseCollector

logger = logging.getLogger(__name__)

FINNHUB_NEWS_URL = "https://finnhub.io/api/v1/news"


class FinnhubCollector(BaseCollector):
    name = "finnhub"
    poll_interval = 300  # 5 minutes

    def __init__(self, api_key: str) -> None:
        self._api_key = api_key
        self._vader = SentimentIntensityAnalyzer()

    async def is_available(self) -> bool:
        return bool(self._api_key)

    async def _fetch_news(self) -> list[dict]:
        """Fetch general news from Finnhub API."""
        params = {"category": "general", "token": self._api_key}
        async with aiohttp.ClientSession() as session:
            async with session.get(FINNHUB_NEWS_URL, params=params) as resp:
                if resp.status != 200:
                    # Stdlib logger: use %-style args, not structlog-style kwargs
                    logger.warning("finnhub_fetch_failed status=%s", resp.status)
                    return []
                return await resp.json()

    def _analyze_sentiment(self, text: str) -> float:
        """Return VADER compound score (-1.0 to 1.0)."""
        scores = self._vader.polarity_scores(text)
        return scores["compound"]

    def _extract_symbols(self, related: str) -> list[str]:
        """Parse Finnhub 'related' field into symbol list."""
        if not related or not related.strip():
            return []
        return [s.strip() for s in related.split(",") if s.strip()]

    def _categorize(self, article: dict) -> NewsCategory:
        """Determine category from article content."""
        headline = article.get("headline", "").lower()
        if any(w in headline for w in ["fed", "fomc", "rate", "inflation"]):
            return NewsCategory.FED
        if any(w in headline for w in ["tariff", "sanction", "regulation", "trump", "biden", "congress"]):
            return NewsCategory.POLICY
        if any(w in headline for w in ["earnings", "revenue", "profit", "eps"]):
            return NewsCategory.EARNINGS
        return NewsCategory.MACRO

    async def collect(self) -> list[NewsItem]:
        raw = await self._fetch_news()
        items = []
        for article in raw:
            headline = article.get("headline", "")
            summary = article.get("summary", "")
            sentiment_text = f"{headline}. {summary}" if summary else headline
            items.append(
                NewsItem(
                    source=self.name,
                    headline=headline,
                    summary=summary or None,
                    url=article.get("url"),
                    published_at=datetime.fromtimestamp(
                        article.get("datetime", 0), tz=timezone.utc
                    ),
                    symbols=self._extract_symbols(article.get("related", "")),
                    sentiment=self._analyze_sentiment(sentiment_text),
                    category=self._categorize(article),
                    raw_data=article,
                )
            )
        return items
```

- [ ] **Step 4: Run tests to verify they pass**

Run: `pytest services/news-collector/tests/test_finnhub.py -v`

Expected: All 5 tests PASS

Note: Requires `nltk` and VADER lexicon.
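The `related`-field parsing and keyword categorization above are easy to sanity-check in isolation. Here is a minimal stdlib-only sketch — hypothetical free functions mirroring `_extract_symbols` and `_categorize`, with plain strings standing in for `NewsCategory` so it runs without the shared package:

```python
# Standalone sketch of the Finnhub helpers above; plain strings replace
# NewsCategory so the snippet has no project dependencies.

def extract_symbols(related: str) -> list[str]:
    """Split Finnhub's comma-separated 'related' field into tickers."""
    if not related or not related.strip():
        return []
    return [s.strip() for s in related.split(",") if s.strip()]


def categorize(headline: str) -> str:
    """First matching keyword group wins; FED outranks POLICY and EARNINGS."""
    h = headline.lower()
    if any(w in h for w in ("fed", "fomc", "rate", "inflation")):
        return "fed"
    if any(w in h for w in ("tariff", "sanction", "regulation")):
        return "policy"
    if any(w in h for w in ("earnings", "revenue", "profit", "eps")):
        return "earnings"
    return "macro"


print(extract_symbols("AAPL, MSFT ,"))    # ['AAPL', 'MSFT']
print(categorize("Fed weighs rate cut"))  # fed
print(categorize("Apple earnings beat"))  # earnings
print(categorize("Oil prices climb"))     # macro
```

One caveat worth noting: the membership test is a substring match, so a headline like "Moderate growth ahead" contains "rate" and routes to the FED category. If that misrouting matters, word-boundary regexes are a stricter alternative.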
If not downloaded: `python -c "import nltk; nltk.download('vader_lexicon')"`

- [ ] **Step 5: Commit**

```bash
git add services/news-collector/src/news_collector/collectors/finnhub.py services/news-collector/tests/test_finnhub.py
git commit -m "feat: implement Finnhub news collector with VADER sentiment"
```

---

### Task 8: Implement RSS news collector

**Files:**

- Create: `services/news-collector/src/news_collector/collectors/rss.py`
- Create: `services/news-collector/tests/test_rss.py`

- [ ] **Step 1: Write tests**

Create `services/news-collector/tests/test_rss.py`:

```python
"""Tests for RSS news collector."""

import pytest
from unittest.mock import AsyncMock, patch
from datetime import datetime, timezone

from news_collector.collectors.rss import RSSCollector


@pytest.fixture
def collector():
    return RSSCollector()


def test_collector_name(collector):
    assert collector.name == "rss"
    assert collector.poll_interval == 600


async def test_is_available(collector):
    assert await collector.is_available() is True


async def test_collect_parses_feed(collector):
    mock_feed = {
        "entries": [
            {
                "title": "NVDA surges on AI demand",
                "link": "https://example.com/nvda",
                "published_parsed": (2026, 4, 2, 12, 0, 0, 0, 0, 0),
                "summary": "Nvidia stock jumped 5%...",
            },
            {
                "title": "Markets rally on jobs data",
                "link": "https://example.com/market",
                "published_parsed": (2026, 4, 2, 11, 0, 0, 0, 0, 0),
                "summary": "The S&P 500 rose...",
            },
        ],
    }
    with patch.object(collector, "_fetch_feeds", new_callable=AsyncMock, return_value=[mock_feed]):
        items = await collector.collect()
    assert len(items) == 2
    assert items[0].source == "rss"
    assert items[0].headline == "NVDA surges on AI demand"
    assert isinstance(items[0].sentiment, float)
```

- [ ] **Step 2: Run tests to verify they fail**

Run: `pytest services/news-collector/tests/test_rss.py -v`

Expected: FAIL

- [ ] **Step 3: Implement RSSCollector**

Create `services/news-collector/src/news_collector/collectors/rss.py`:

```python
"""RSS feed collector for Yahoo Finance, Google News, MarketWatch."""

import logging
import re
from calendar import timegm
from datetime import datetime, timezone

import aiohttp
import feedparser
from nltk.sentiment.vader import SentimentIntensityAnalyzer

from shared.models import NewsCategory, NewsItem

from news_collector.collectors.base import BaseCollector

logger = logging.getLogger(__name__)

DEFAULT_FEEDS = [
    "https://feeds.finance.yahoo.com/rss/2.0/headline?s=^GSPC&region=US&lang=en-US",
    "https://news.google.com/rss/topics/CAAqJggKIiBDQkFTRWdvSUwyMHZNRGx6TVdZU0FtVnVHZ0pWVXlnQVAB?hl=en-US&gl=US&ceid=US:en",
    "https://www.marketwatch.com/rss/topstories",
]

# Common US stock tickers to detect in headlines
TICKER_PATTERN = re.compile(
    r"\b(AAPL|MSFT|GOOGL|GOOG|AMZN|TSLA|NVDA|META|JPM|V|JNJ|WMT|PG|UNH|HD|"
    r"MA|DIS|BAC|XOM|PFE|KO|PEP|CSCO|INTC|VZ|NFLX|ADBE|CRM|AMD|QCOM|"
    r"GS|BA|CAT|MMM|IBM|GE|F|GM|NKE|MCD|SBUX|SPY|QQQ|IWM)\b"
)


class RSSCollector(BaseCollector):
    name = "rss"
    poll_interval = 600  # 10 minutes

    def __init__(self, feeds: list[str] | None = None) -> None:
        self._feeds = feeds or DEFAULT_FEEDS
        self._vader = SentimentIntensityAnalyzer()

    async def is_available(self) -> bool:
        return True

    async def _fetch_feeds(self) -> list[dict]:
        """Fetch and parse all RSS feeds."""
        results = []
        async with aiohttp.ClientSession() as session:
            for url in self._feeds:
                try:
                    async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as resp:
                        if resp.status == 200:
                            text = await resp.text()
                            feed = feedparser.parse(text)
                            results.append(feed)
                except Exception as exc:
                    logger.warning("rss_fetch_failed url=%s error=%s", url, exc)
        return results

    def _extract_symbols(self, text: str) -> list[str]:
        """Extract stock tickers from text."""
        return list(set(TICKER_PATTERN.findall(text)))

    def _parse_time(self, entry: dict) -> datetime:
        """Parse published time from feed entry."""
        parsed = entry.get("published_parsed")
        if parsed:
            return datetime.fromtimestamp(timegm(parsed), tz=timezone.utc)
        return datetime.now(timezone.utc)

    async def collect(self) -> list[NewsItem]:
        feeds = await self._fetch_feeds()
        items = []
        seen_titles = set()
        for feed in feeds:
            for entry in feed.get("entries", []):
                title = entry.get("title", "").strip()
                if not title or title in seen_titles:
                    continue
                seen_titles.add(title)
                summary = entry.get("summary", "")
                sentiment_text = f"{title}. {summary}" if summary else title
                items.append(
                    NewsItem(
                        source=self.name,
                        headline=title,
                        summary=summary or None,
                        url=entry.get("link"),
                        published_at=self._parse_time(entry),
                        symbols=self._extract_symbols(f"{title} {summary}"),
                        sentiment=self._vader.polarity_scores(sentiment_text)["compound"],
                        category=NewsCategory.MACRO,
                        raw_data={"feed_title": title},
                    )
                )
        return items
```

- [ ] **Step 4: Run tests to verify they pass**

Run: `pytest services/news-collector/tests/test_rss.py -v`

Expected: All 3 tests PASS

- [ ] **Step 5: Commit**

```bash
git add services/news-collector/src/news_collector/collectors/rss.py services/news-collector/tests/test_rss.py
git commit -m "feat: implement RSS news collector (Yahoo, Google News, MarketWatch)"
```

---

### Task 9: Implement Fear & Greed Index collector

**Files:**

- Create: `services/news-collector/src/news_collector/collectors/fear_greed.py`
- Create: `services/news-collector/tests/test_fear_greed.py`

- [ ] **Step 1: Write tests**

Create `services/news-collector/tests/test_fear_greed.py`:

```python
"""Tests for CNN Fear & Greed Index collector."""

import pytest
from unittest.mock import AsyncMock, patch

from news_collector.collectors.fear_greed import FearGreedCollector


@pytest.fixture
def collector():
    return FearGreedCollector()


def test_collector_name(collector):
    assert collector.name == "fear_greed"
    assert collector.poll_interval == 3600


async def test_is_available(collector):
    assert await collector.is_available() is True


async def test_collect_parses_api_response(collector):
    mock_data = {
        "fear_and_greed": {
            "score": 45.0,
            "rating": "Fear",
            "timestamp":
"2026-04-02T12:00:00+00:00", } } with patch.object(collector, "_fetch_index", new_callable=AsyncMock, return_value=mock_data): result = await collector.collect() assert result.fear_greed == 45 assert result.fear_greed_label == "Fear" async def test_collect_returns_none_on_failure(collector): with patch.object(collector, "_fetch_index", new_callable=AsyncMock, return_value=None): result = await collector.collect() assert result is None def test_classify_label(): c = FearGreedCollector() assert c._classify(10) == "Extreme Fear" assert c._classify(30) == "Fear" assert c._classify(50) == "Neutral" assert c._classify(70) == "Greed" assert c._classify(85) == "Extreme Greed" ``` - [ ] **Step 2: Run tests to verify they fail** Run: `pytest services/news-collector/tests/test_fear_greed.py -v` Expected: FAIL - [ ] **Step 3: Implement FearGreedCollector** Create `services/news-collector/src/news_collector/collectors/fear_greed.py`: ```python """CNN Fear & Greed Index collector.""" import logging from dataclasses import dataclass from typing import Optional import aiohttp from news_collector.collectors.base import BaseCollector from shared.models import NewsItem logger = logging.getLogger(__name__) FEAR_GREED_URL = "https://production.dataviz.cnn.io/index/fearandgreed/graphdata" @dataclass class FearGreedResult: fear_greed: int fear_greed_label: str class FearGreedCollector(BaseCollector): """Fetches CNN Fear & Greed Index. Note: This collector does NOT return NewsItem — it returns FearGreedResult which feeds directly into MarketSentiment. The main.py scheduler handles this differently from news collectors. 
""" name = "fear_greed" poll_interval = 3600 # 1 hour async def is_available(self) -> bool: return True async def _fetch_index(self) -> Optional[dict]: """Fetch Fear & Greed data from CNN API.""" headers = {"User-Agent": "Mozilla/5.0"} try: async with aiohttp.ClientSession() as session: async with session.get( FEAR_GREED_URL, headers=headers, timeout=aiohttp.ClientTimeout(total=10) ) as resp: if resp.status != 200: logger.warning("fear_greed_fetch_failed", status=resp.status) return None return await resp.json() except Exception as exc: logger.warning("fear_greed_error", error=str(exc)) return None def _classify(self, score: int) -> str: """Classify numeric score into label.""" if score <= 20: return "Extreme Fear" if score <= 40: return "Fear" if score <= 60: return "Neutral" if score <= 80: return "Greed" return "Extreme Greed" async def collect(self) -> Optional[FearGreedResult]: """Collect Fear & Greed Index. Returns FearGreedResult or None.""" data = await self._fetch_index() if data is None: return None try: fg = data["fear_and_greed"] score = int(fg["score"]) label = fg.get("rating", self._classify(score)) return FearGreedResult(fear_greed=score, fear_greed_label=label) except (KeyError, ValueError, TypeError) as exc: logger.warning("fear_greed_parse_failed", error=str(exc)) return None ``` - [ ] **Step 4: Run tests to verify they pass** Run: `pytest services/news-collector/tests/test_fear_greed.py -v` Expected: All 5 tests PASS - [ ] **Step 5: Commit** ```bash git add services/news-collector/src/news_collector/collectors/fear_greed.py services/news-collector/tests/test_fear_greed.py git commit -m "feat: implement CNN Fear & Greed Index collector" ``` --- ### Task 10: Implement SEC EDGAR collector **Files:** - Create: `services/news-collector/src/news_collector/collectors/sec_edgar.py` - Create: `services/news-collector/tests/test_sec_edgar.py` - [ ] **Step 1: Write tests** Create `services/news-collector/tests/test_sec_edgar.py`: ```python """Tests for SEC 
EDGAR filing collector."""

import pytest
from unittest.mock import AsyncMock, patch
from datetime import datetime, timezone

from news_collector.collectors.sec_edgar import SecEdgarCollector

# collect() keeps only filings dated today (UTC), so the mock must use a
# dynamic date; a hard-coded "2026-04-02" would only pass on that day.
TODAY = datetime.now(timezone.utc).strftime("%Y-%m-%d")


@pytest.fixture
def collector():
    return SecEdgarCollector()


def test_collector_name(collector):
    assert collector.name == "sec_edgar"
    assert collector.poll_interval == 1800


async def test_is_available(collector):
    assert await collector.is_available() is True


async def test_collect_parses_filings(collector):
    mock_response = {
        "filings": {
            "recent": {
                "accessionNumber": ["0001234-26-000001"],
                "filingDate": [TODAY],
                "primaryDocument": ["filing.htm"],
                "form": ["8-K"],
                "primaryDocDescription": ["Current Report"],
            }
        },
        "tickers": [{"ticker": "AAPL"}],
        "name": "Apple Inc",
    }
    with patch.object(collector, "_fetch_recent_filings", new_callable=AsyncMock, return_value=[mock_response]):
        items = await collector.collect()
    assert len(items) == 1
    assert items[0].source == "sec_edgar"
    assert items[0].category.value == "filing"
    assert "AAPL" in items[0].symbols


async def test_collect_handles_empty(collector):
    with patch.object(collector, "_fetch_recent_filings", new_callable=AsyncMock, return_value=[]):
        items = await collector.collect()
    assert items == []
```

- [ ] **Step 2: Run tests to verify they fail**

Run: `pytest services/news-collector/tests/test_sec_edgar.py -v`

Expected: FAIL

- [ ] **Step 3: Implement SecEdgarCollector**

Create `services/news-collector/src/news_collector/collectors/sec_edgar.py`:

```python
"""SEC EDGAR filing collector (free, no API key required)."""

import logging
from datetime import datetime, timezone

import aiohttp
from nltk.sentiment.vader import SentimentIntensityAnalyzer

from shared.models import NewsCategory, NewsItem

from news_collector.collectors.base import BaseCollector

logger = logging.getLogger(__name__)

EDGAR_FULL_TEXT_SEARCH = "https://efts.sec.gov/LATEST/search-index"
EDGAR_RECENT_FILINGS = "https://efts.sec.gov/LATEST/search-index?q=%228-K%22&dateRange=custom&startdt={date}&enddt={date}&forms=8-K"
EDGAR_COMPANY_FILINGS = "https://data.sec.gov/submissions/CIK{cik}.json"

# CIK numbers for major companies (subset — extend as needed)
TRACKED_CIKS = {
    "0000320193": "AAPL",
    "0000789019": "MSFT",
    "0001652044": "GOOGL",
    "0001018724": "AMZN",
    "0001318605": "TSLA",
    "0001045810": "NVDA",
    "0001326801": "META",
    "0000019617": "JPM",
    "0000078003": "PFE",
    "0000021344": "KO",
}

SEC_USER_AGENT = "TradingPlatform research@example.com"


class SecEdgarCollector(BaseCollector):
    name = "sec_edgar"
    poll_interval = 1800  # 30 minutes

    def __init__(self) -> None:
        self._vader = SentimentIntensityAnalyzer()

    async def is_available(self) -> bool:
        return True

    async def _fetch_recent_filings(self) -> list[dict]:
        """Fetch recent 8-K filings for tracked companies."""
        results = []
        headers = {"User-Agent": SEC_USER_AGENT}
        async with aiohttp.ClientSession() as session:
            for cik, ticker in TRACKED_CIKS.items():
                try:
                    url = EDGAR_COMPANY_FILINGS.format(cik=cik)
                    async with session.get(
                        url, headers=headers, timeout=aiohttp.ClientTimeout(total=10)
                    ) as resp:
                        if resp.status == 200:
                            data = await resp.json()
                            data["tickers"] = [{"ticker": ticker}]
                            results.append(data)
                except Exception as exc:
                    logger.warning("sec_fetch_failed cik=%s error=%s", cik, exc)
        return results

    async def collect(self) -> list[NewsItem]:
        filings_data = await self._fetch_recent_filings()
        items = []
        today = datetime.now(timezone.utc).strftime("%Y-%m-%d")
        for company_data in filings_data:
            tickers = [t["ticker"] for t in company_data.get("tickers", [])]
            company_name = company_data.get("name", "Unknown")
            recent = company_data.get("filings", {}).get("recent", {})
            forms = recent.get("form", [])
            dates = recent.get("filingDate", [])
            descriptions = recent.get("primaryDocDescription", [])
            accessions = recent.get("accessionNumber", [])
            for i, form in enumerate(forms):
                if form != "8-K":
                    continue
                filing_date = dates[i] if i < len(dates) else ""
                if filing_date != today:
                    continue
                desc = descriptions[i] if i < len(descriptions) else "8-K Filing"
                accession = accessions[i] if i < len(accessions) else ""
                headline = f"{company_name} ({', '.join(tickers)}): {form} - {desc}"
                items.append(
                    NewsItem(
                        source=self.name,
                        headline=headline,
                        summary=desc,
                        url=f"https://www.sec.gov/cgi-bin/browse-edgar?action=getcompany&accession={accession}",
                        published_at=datetime.strptime(filing_date, "%Y-%m-%d").replace(tzinfo=timezone.utc),
                        symbols=tickers,
                        sentiment=self._vader.polarity_scores(headline)["compound"],
                        category=NewsCategory.FILING,
                        raw_data={"form": form, "accession": accession},
                    )
                )
        return items
```

- [ ] **Step 4: Run tests to verify they pass**

Run: `pytest services/news-collector/tests/test_sec_edgar.py -v`

Expected: All 4 tests PASS

- [ ] **Step 5: Commit**

```bash
git add services/news-collector/src/news_collector/collectors/sec_edgar.py services/news-collector/tests/test_sec_edgar.py
git commit -m "feat: implement SEC EDGAR 8-K filing collector"
```

---

### Task 11: Implement Reddit collector

**Files:**

- Create: `services/news-collector/src/news_collector/collectors/reddit.py`
- Create: `services/news-collector/tests/test_reddit.py`

- [ ] **Step 1: Write tests**

Create `services/news-collector/tests/test_reddit.py`:

```python
"""Tests for Reddit collector."""

import pytest
from unittest.mock import AsyncMock, patch

from news_collector.collectors.reddit import RedditCollector


@pytest.fixture
def collector():
    return RedditCollector()


def test_collector_name(collector):
    assert collector.name == "reddit"
    assert collector.poll_interval == 900


async def test_is_available(collector):
    assert await collector.is_available() is True


async def test_collect_parses_posts(collector):
    mock_posts = [
        {
            "data": {
                "title": "NVDA to the moon! 🚀 AI demand is insane",
                "selftext": "Just loaded up on NVDA calls",
                "url": "https://reddit.com/r/wallstreetbets/123",
                "created_utc": 1711929600,
                "score": 500,
                "num_comments": 200,
                "subreddit": "wallstreetbets",
            }
        },
    ]
    with patch.object(collector, "_fetch_subreddit", new_callable=AsyncMock, return_value=mock_posts):
        items = await collector.collect()
    assert len(items) >= 1
    assert items[0].source == "reddit"
    assert items[0].category.value == "social"
    assert isinstance(items[0].sentiment, float)


async def test_collect_filters_low_score(collector):
    mock_posts = [
        {
            "data": {
                "title": "Random question about stocks",
                "selftext": "",
                "url": "https://reddit.com/r/stocks/456",
                "created_utc": 1711929600,
                "score": 3,
                "num_comments": 1,
                "subreddit": "stocks",
            }
        },
    ]
    with patch.object(collector, "_fetch_subreddit", new_callable=AsyncMock, return_value=mock_posts):
        items = await collector.collect()
    assert items == []
```

- [ ] **Step 2: Run tests to verify they fail**

Run: `pytest services/news-collector/tests/test_reddit.py -v`

Expected: FAIL

- [ ] **Step 3: Implement RedditCollector**

Create `services/news-collector/src/news_collector/collectors/reddit.py`:

```python
"""Reddit collector for r/wallstreetbets, r/stocks, r/investing."""

import logging
import re
from datetime import datetime, timezone

import aiohttp
from nltk.sentiment.vader import SentimentIntensityAnalyzer

from shared.models import NewsCategory, NewsItem

from news_collector.collectors.base import BaseCollector

logger = logging.getLogger(__name__)

SUBREDDITS = ["wallstreetbets", "stocks", "investing"]
MIN_SCORE = 50  # Minimum upvotes to consider

TICKER_PATTERN = re.compile(
    r"\b(AAPL|MSFT|GOOGL|GOOG|AMZN|TSLA|NVDA|META|JPM|V|JNJ|WMT|PG|UNH|HD|"
    r"MA|DIS|BAC|XOM|PFE|KO|PEP|CSCO|INTC|VZ|NFLX|ADBE|CRM|AMD|QCOM|"
    r"GS|BA|CAT|MMM|IBM|GE|F|GM|NKE|MCD|SBUX|SPY|QQQ|IWM)\b"
)


class RedditCollector(BaseCollector):
    name = "reddit"
    poll_interval = 900  # 15 minutes

    def __init__(self) -> None:
        self._vader = SentimentIntensityAnalyzer()

    async def is_available(self) -> bool:
        return True

    async def _fetch_subreddit(self, subreddit: str = "wallstreetbets") -> list[dict]:
        """Fetch hot posts from a subreddit via JSON API."""
        url = f"https://www.reddit.com/r/{subreddit}/hot.json?limit=25"
        headers = {"User-Agent": "TradingPlatform/1.0"}
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(
                    url, headers=headers, timeout=aiohttp.ClientTimeout(total=10)
                ) as resp:
                    if resp.status != 200:
                        logger.warning("reddit_fetch_failed subreddit=%s status=%s", subreddit, resp.status)
                        return []
                    data = await resp.json()
                    return data.get("data", {}).get("children", [])
        except Exception as exc:
            logger.warning("reddit_error subreddit=%s error=%s", subreddit, exc)
            return []

    async def collect(self) -> list[NewsItem]:
        items = []
        seen_titles = set()
        for subreddit in SUBREDDITS:
            posts = await self._fetch_subreddit(subreddit)
            for post in posts:
                data = post.get("data", {})
                title = data.get("title", "").strip()
                score = data.get("score", 0)
                if not title or title in seen_titles or score < MIN_SCORE:
                    continue
                seen_titles.add(title)
                selftext = data.get("selftext", "")
                text = f"{title}. {selftext}" if selftext else title
                symbols = list(set(TICKER_PATTERN.findall(text)))
                items.append(
                    NewsItem(
                        source=self.name,
                        headline=title,
                        summary=selftext[:500] if selftext else None,
                        url=data.get("url"),
                        published_at=datetime.fromtimestamp(
                            data.get("created_utc", 0), tz=timezone.utc
                        ),
                        symbols=symbols,
                        sentiment=self._vader.polarity_scores(text)["compound"],
                        category=NewsCategory.SOCIAL,
                        raw_data={
                            "subreddit": data.get("subreddit", subreddit),
                            "score": score,
                            "num_comments": data.get("num_comments", 0),
                        },
                    )
                )
        return items
```

- [ ] **Step 4: Run tests to verify they pass**

Run: `pytest services/news-collector/tests/test_reddit.py -v`

Expected: All 4 tests PASS

- [ ] **Step 5: Commit**

```bash
git add services/news-collector/src/news_collector/collectors/reddit.py services/news-collector/tests/test_reddit.py
git commit -m "feat: implement Reddit social sentiment collector"
```

---

### Task 12: Implement Truth Social and Fed collectors

**Files:**

- Create: `services/news-collector/src/news_collector/collectors/truth_social.py`
- Create: `services/news-collector/src/news_collector/collectors/fed.py`
- Create: `services/news-collector/tests/test_truth_social.py`
- Create: `services/news-collector/tests/test_fed.py`

- [ ] **Step 1: Write tests for Truth Social**

Create `services/news-collector/tests/test_truth_social.py`:

```python
"""Tests for Truth Social collector."""

import pytest
from unittest.mock import AsyncMock, patch

from news_collector.collectors.truth_social import TruthSocialCollector


@pytest.fixture
def collector():
    return TruthSocialCollector()


def test_collector_name(collector):
    assert collector.name == "truth_social"
    assert collector.poll_interval == 900


async def test_is_available(collector):
    assert await collector.is_available() is True


async def test_collect_parses_posts(collector):
    mock_posts = [
        {
            "content": "We are imposing 25% tariffs on all steel imports!",
            "created_at": "2026-04-02T12:00:00.000Z",
            "url":
"https://truthsocial.com/@realDonaldTrump/12345", }, ] with patch.object(collector, "_fetch_posts", new_callable=AsyncMock, return_value=mock_posts): items = await collector.collect() assert len(items) == 1 assert items[0].source == "truth_social" assert items[0].category.value == "policy" assert "tariff" in items[0].headline.lower() or "tariff" in items[0].raw_data.get("content", "").lower() async def test_collect_handles_empty(collector): with patch.object(collector, "_fetch_posts", new_callable=AsyncMock, return_value=[]): items = await collector.collect() assert items == [] ``` - [ ] **Step 2: Write tests for Fed collector** Create `services/news-collector/tests/test_fed.py`: ```python """Tests for Federal Reserve collector.""" import pytest from unittest.mock import AsyncMock, patch from news_collector.collectors.fed import FedCollector @pytest.fixture def collector(): return FedCollector() def test_collector_name(collector): assert collector.name == "fed" assert collector.poll_interval == 3600 async def test_is_available(collector): assert await collector.is_available() is True async def test_collect_parses_rss(collector): mock_entries = [ { "title": "Federal Reserve issues FOMC statement", "link": "https://www.federalreserve.gov/newsevents/pressreleases/monetary20260402a.htm", "published_parsed": (2026, 4, 2, 14, 0, 0, 0, 0, 0), "summary": "The Federal Open Market Committee decided to maintain the target range...", }, ] with patch.object(collector, "_fetch_fed_rss", new_callable=AsyncMock, return_value=mock_entries): items = await collector.collect() assert len(items) == 1 assert items[0].source == "fed" assert items[0].category.value == "fed" ``` - [ ] **Step 3: Run tests to verify they fail** Run: `pytest services/news-collector/tests/test_truth_social.py services/news-collector/tests/test_fed.py -v` Expected: FAIL - [ ] **Step 4: Implement TruthSocialCollector** Create `services/news-collector/src/news_collector/collectors/truth_social.py`: ```python 
"""Truth Social collector for Trump posts (policy-relevant).""" import logging from datetime import datetime, timezone import aiohttp from nltk.sentiment.vader import SentimentIntensityAnalyzer from shared.models import NewsCategory, NewsItem from news_collector.collectors.base import BaseCollector logger = logging.getLogger(__name__) # Truth Social uses a Mastodon-compatible API TRUTH_SOCIAL_API = "https://truthsocial.com/api/v1/accounts/107780257626128497/statuses" class TruthSocialCollector(BaseCollector): name = "truth_social" poll_interval = 900 # 15 minutes def __init__(self) -> None: self._vader = SentimentIntensityAnalyzer() async def is_available(self) -> bool: return True async def _fetch_posts(self) -> list[dict]: """Fetch recent posts from Truth Social.""" headers = {"User-Agent": "Mozilla/5.0"} try: async with aiohttp.ClientSession() as session: async with session.get( TRUTH_SOCIAL_API, headers=headers, params={"limit": 10}, timeout=aiohttp.ClientTimeout(total=15), ) as resp: if resp.status != 200: logger.warning("truth_social_fetch_failed", status=resp.status) return [] return await resp.json() except Exception as exc: logger.warning("truth_social_error", error=str(exc)) return [] def _strip_html(self, text: str) -> str: """Remove HTML tags from content.""" import re return re.sub(r"<[^>]+>", "", text).strip() async def collect(self) -> list[NewsItem]: posts = await self._fetch_posts() items = [] for post in posts: content = self._strip_html(post.get("content", "")) if not content: continue created_at_str = post.get("created_at", "") try: published = datetime.fromisoformat(created_at_str.replace("Z", "+00:00")) except (ValueError, AttributeError): published = datetime.now(timezone.utc) items.append( NewsItem( source=self.name, headline=content[:200], summary=content if len(content) > 200 else None, url=post.get("url"), published_at=published, symbols=[], # Symbols extracted at aggregation stage via LLM 
sentiment=self._vader.polarity_scores(content)["compound"], category=NewsCategory.POLICY, raw_data={"content": content, "id": post.get("id")}, ) ) return items ``` - [ ] **Step 5: Implement FedCollector** Create `services/news-collector/src/news_collector/collectors/fed.py`: ```python """Federal Reserve press release and FOMC statement collector.""" import logging from calendar import timegm from datetime import datetime, timezone import aiohttp import feedparser from nltk.sentiment.vader import SentimentIntensityAnalyzer from shared.models import NewsCategory, NewsItem from news_collector.collectors.base import BaseCollector logger = logging.getLogger(__name__) FED_RSS_URL = "https://www.federalreserve.gov/feeds/press_all.xml" class FedCollector(BaseCollector): name = "fed" poll_interval = 3600 # 1 hour def __init__(self) -> None: self._vader = SentimentIntensityAnalyzer() async def is_available(self) -> bool: return True async def _fetch_fed_rss(self) -> list[dict]: """Fetch Federal Reserve RSS feed entries.""" try: async with aiohttp.ClientSession() as session: async with session.get( FED_RSS_URL, timeout=aiohttp.ClientTimeout(total=10) ) as resp: if resp.status != 200: logger.warning("fed_rss_failed", status=resp.status) return [] text = await resp.text() feed = feedparser.parse(text) return feed.get("entries", []) except Exception as exc: logger.warning("fed_rss_error", error=str(exc)) return [] def _detect_stance(self, text: str) -> str: """Detect hawkish/dovish/neutral stance from text.""" text_lower = text.lower() hawkish_words = ["tighten", "raise", "inflation concern", "restrictive", "higher rates"] dovish_words = ["accommodate", "cut", "easing", "lower rates", "support growth"] hawk_count = sum(1 for w in hawkish_words if w in text_lower) dove_count = sum(1 for w in dovish_words if w in text_lower) if hawk_count > dove_count: return "hawkish" if dove_count > hawk_count: return "dovish" return "neutral" async def collect(self) -> list[NewsItem]: entries = 
await self._fetch_fed_rss() items = [] for entry in entries: title = entry.get("title", "").strip() if not title: continue summary = entry.get("summary", "") parsed_time = entry.get("published_parsed") if parsed_time: published = datetime.fromtimestamp(timegm(parsed_time), tz=timezone.utc) else: published = datetime.now(timezone.utc) text = f"{title}. {summary}" if summary else title items.append( NewsItem( source=self.name, headline=title, summary=summary or None, url=entry.get("link"), published_at=published, symbols=[], sentiment=self._vader.polarity_scores(text)["compound"], category=NewsCategory.FED, raw_data={"stance": self._detect_stance(text)}, ) ) return items ``` - [ ] **Step 6: Run tests to verify they pass** Run: `pytest services/news-collector/tests/test_truth_social.py services/news-collector/tests/test_fed.py -v` Expected: All 7 tests PASS - [ ] **Step 7: Commit** ```bash git add services/news-collector/src/news_collector/collectors/truth_social.py services/news-collector/src/news_collector/collectors/fed.py services/news-collector/tests/test_truth_social.py services/news-collector/tests/test_fed.py git commit -m "feat: implement Truth Social and Federal Reserve collectors" ``` --- ### Task 13: Implement news-collector main.py (scheduler) **Files:** - Create: `services/news-collector/src/news_collector/main.py` - Create: `services/news-collector/tests/test_main.py` - [ ] **Step 1: Write tests** Create `services/news-collector/tests/test_main.py`: ```python """Tests for news collector scheduler.""" import pytest from unittest.mock import AsyncMock, patch, MagicMock from datetime import datetime, timezone from shared.models import NewsCategory, NewsItem from news_collector.main import run_collector_once async def test_run_collector_once_stores_and_publishes(): mock_item = NewsItem( source="test", headline="Test news", published_at=datetime(2026, 4, 2, tzinfo=timezone.utc), sentiment=0.5, category=NewsCategory.MACRO, ) mock_collector = MagicMock() 
    mock_collector.name = "test"
    mock_collector.collect = AsyncMock(return_value=[mock_item])
    mock_db = MagicMock()
    mock_db.insert_news_item = AsyncMock()
    mock_broker = MagicMock()
    mock_broker.publish = AsyncMock()
    count = await run_collector_once(mock_collector, mock_db, mock_broker)
    assert count == 1
    mock_db.insert_news_item.assert_called_once_with(mock_item)
    mock_broker.publish.assert_called_once()


async def test_run_collector_once_handles_empty():
    mock_collector = MagicMock()
    mock_collector.name = "test"
    mock_collector.collect = AsyncMock(return_value=[])
    mock_db = MagicMock()
    mock_broker = MagicMock()
    count = await run_collector_once(mock_collector, mock_db, mock_broker)
    assert count == 0
```

- [ ] **Step 2: Run tests to verify they fail**

Run: `pytest services/news-collector/tests/test_main.py -v`

Expected: FAIL

- [ ] **Step 3: Implement main.py**

Create `services/news-collector/src/news_collector/main.py`:

```python
"""News Collector Service — schedules and runs all news collectors."""

import asyncio
import logging

from shared.broker import RedisBroker
from shared.db import Database
from shared.events import NewsEvent
from shared.healthcheck import HealthCheckServer
from shared.logging import setup_logging
from shared.metrics import ServiceMetrics
from shared.models import NewsItem
from shared.notifier import TelegramNotifier
from shared.sentiment_models import MarketSentiment

from news_collector.config import NewsCollectorConfig
from news_collector.collectors.base import BaseCollector
from news_collector.collectors.finnhub import FinnhubCollector
from news_collector.collectors.rss import RSSCollector
from news_collector.collectors.sec_edgar import SecEdgarCollector
from news_collector.collectors.truth_social import TruthSocialCollector
from news_collector.collectors.reddit import RedditCollector
from news_collector.collectors.fear_greed import FearGreedCollector
from news_collector.collectors.fed import FedCollector

logger = logging.getLogger(__name__)


async def run_collector_once(
    collector: BaseCollector,
    db: Database,
    broker: RedisBroker,
) -> int:
    """Run a single collector, store results, publish to Redis.

    Returns number of items collected.
    """
    items = await collector.collect()
    if not isinstance(items, list):
        # FearGreedCollector returns a FearGreedResult, not a list
        return 0
    for item in items:
        await db.insert_news_item(item)
        event = NewsEvent(data=item)
        await broker.publish("news", event.to_dict())
    return len(items)


async def run_collector_loop(
    collector: BaseCollector,
    db: Database,
    broker: RedisBroker,
    log,
) -> None:
    """Run a collector on its poll interval forever."""
    while True:
        try:
            if await collector.is_available():
                count = await run_collector_once(collector, db, broker)
                log.info("collector_run", collector=collector.name, items=count)
            else:
                log.debug("collector_unavailable", collector=collector.name)
        except Exception as exc:
            log.error("collector_error", collector=collector.name, error=str(exc))
        await asyncio.sleep(collector.poll_interval)


async def run_fear_greed_loop(
    collector: FearGreedCollector,
    db: Database,
    log,
) -> None:
    """Run the Fear & Greed collector and update market sentiment."""
    from datetime import datetime, timezone

    while True:
        try:
            result = await collector.collect()
            if result is not None:
                ms = MarketSentiment(
                    fear_greed=result.fear_greed,
                    fear_greed_label=result.fear_greed_label,
                    fed_stance="neutral",  # Updated by Fed collector analysis
                    market_regime=_determine_regime(result.fear_greed, None),
                    updated_at=datetime.now(timezone.utc),
                )
                await db.upsert_market_sentiment(ms)
                log.info("fear_greed_updated", score=result.fear_greed, label=result.fear_greed_label)
        except Exception as exc:
            log.error("fear_greed_error", error=str(exc))
        await asyncio.sleep(collector.poll_interval)


async def run_aggregator_loop(
    db: Database,
    interval: int,
    log,
) -> None:
    """Run sentiment aggregation every `interval` seconds.

    Reads recent news from DB, computes per-symbol scores, upserts into
    symbol_scores table.
    """
    from datetime import datetime, timezone
    from shared.sentiment import SentimentAggregator

    aggregator = SentimentAggregator()
    while True:
        try:
            now = datetime.now(timezone.utc)
            news_items = await db.get_recent_news(hours=24)
            if news_items:
                scores = aggregator.aggregate(news_items, now)
                for symbol_score in scores.values():
                    await db.upsert_symbol_score(symbol_score)
                log.info("aggregation_complete", symbols=len(scores))
        except Exception as exc:
            log.error("aggregation_error", error=str(exc))
        await asyncio.sleep(interval)


def _determine_regime(fear_greed: int, vix: float | None) -> str:
    """Determine market regime from Fear & Greed and VIX."""
    if fear_greed <= 20:
        return "risk_off"
    if vix is not None and vix > 30:
        return "risk_off"
    if fear_greed >= 60 and (vix is None or vix < 20):
        return "risk_on"
    return "neutral"


async def run() -> None:
    config = NewsCollectorConfig()
    log = setup_logging("news-collector", config.log_level, config.log_format)
    metrics = ServiceMetrics("news_collector")
    notifier = TelegramNotifier(
        bot_token=config.telegram_bot_token,
        chat_id=config.telegram_chat_id,
    )
    db = Database(config.database_url)
    await db.connect()
    broker = RedisBroker(config.redis_url)
    health = HealthCheckServer(
        "news-collector",
        port=config.health_port,
        auth_token=config.metrics_auth_token,
    )
    health.register_check("redis", broker.ping)
    await health.start()
    metrics.service_up.labels(service="news-collector").set(1)

    # Initialize collectors
    news_collectors: list[BaseCollector] = [
        RSSCollector(),
        SecEdgarCollector(),
        TruthSocialCollector(),
        RedditCollector(),
        FedCollector(),
    ]
    # Finnhub requires API key
    if config.finnhub_api_key:
        news_collectors.append(FinnhubCollector(api_key=config.finnhub_api_key))

    fear_greed = FearGreedCollector()

    log.info(
        "starting",
        collectors=[c.name for c in news_collectors],
        fear_greed=True,
    )

    tasks = []
    try:
        for collector in news_collectors:
            task = asyncio.create_task(run_collector_loop(collector, db, broker, log))
            tasks.append(task)
        tasks.append(asyncio.create_task(run_fear_greed_loop(fear_greed, db, log)))
        tasks.append(asyncio.create_task(
            run_aggregator_loop(db, config.sentiment_aggregate_interval, log)
        ))
        await asyncio.gather(*tasks)
    except Exception as exc:
        log.error("fatal_error", error=str(exc))
        await notifier.send_error(str(exc), "news-collector")
        raise
    finally:
        for task in tasks:
            task.cancel()
        metrics.service_up.labels(service="news-collector").set(0)
        await notifier.close()
        await broker.close()
        await db.close()


def main() -> None:
    asyncio.run(run())


if __name__ == "__main__":
    main()
```

- [ ] **Step 4: Run tests to verify they pass**

Run: `pytest services/news-collector/tests/test_main.py -v`

Expected: All 2 tests PASS

- [ ] **Step 5: Commit**

```bash
git add services/news-collector/src/news_collector/main.py services/news-collector/tests/test_main.py
git commit -m "feat: implement news-collector main scheduler with all collectors"
```

---

## Phase 3: Sentiment Analysis Pipeline

### Task 14: Implement sentiment aggregator

**Files:**

- Modify: `shared/src/shared/sentiment.py`
- Create: `shared/tests/test_sentiment_aggregator.py`

- [ ] **Step 1: Write tests**

Create `shared/tests/test_sentiment_aggregator.py`:

```python
"""Tests for sentiment aggregator."""

import pytest
from datetime import datetime, timezone, timedelta

from shared.sentiment import SentimentAggregator


@pytest.fixture
def aggregator():
    return SentimentAggregator()


def test_freshness_decay_recent():
    a = SentimentAggregator()
    now = datetime.now(timezone.utc)
    assert a._freshness_decay(now, now) == 1.0


def test_freshness_decay_3_hours():
    a = SentimentAggregator()
    now = datetime.now(timezone.utc)
    three_hours_ago = now - timedelta(hours=3)
    assert a._freshness_decay(three_hours_ago, now) == 0.7


def test_freshness_decay_12_hours():
    a = SentimentAggregator()
    now = datetime.now(timezone.utc)
    twelve_hours_ago = now - timedelta(hours=12)
    assert
a._freshness_decay(twelve_hours_ago, now) == 0.3 def test_freshness_decay_old(): a = SentimentAggregator() now = datetime.now(timezone.utc) two_days_ago = now - timedelta(days=2) assert a._freshness_decay(two_days_ago, now) == 0.0 def test_compute_composite(): a = SentimentAggregator() composite = a._compute_composite( news_score=0.5, social_score=0.3, policy_score=0.8, filing_score=0.2, ) expected = 0.5 * 0.3 + 0.3 * 0.2 + 0.8 * 0.3 + 0.2 * 0.2 assert abs(composite - expected) < 0.001 def test_aggregate_news_by_symbol(aggregator): now = datetime.now(timezone.utc) news_items = [ { "symbols": ["AAPL"], "sentiment": 0.8, "category": "earnings", "published_at": now, }, { "symbols": ["AAPL"], "sentiment": 0.3, "category": "macro", "published_at": now - timedelta(hours=2), }, { "symbols": ["MSFT"], "sentiment": -0.5, "category": "policy", "published_at": now, }, ] scores = aggregator.aggregate(news_items, now) assert "AAPL" in scores assert "MSFT" in scores assert scores["AAPL"].news_count == 2 assert scores["AAPL"].news_score > 0 # Positive overall assert scores["MSFT"].policy_score < 0 # Negative policy def test_aggregate_empty(aggregator): now = datetime.now(timezone.utc) scores = aggregator.aggregate([], now) assert scores == {} def test_determine_regime(): a = SentimentAggregator() assert a.determine_regime(15, None) == "risk_off" assert a.determine_regime(15, 35.0) == "risk_off" assert a.determine_regime(50, 35.0) == "risk_off" assert a.determine_regime(70, 15.0) == "risk_on" assert a.determine_regime(50, 20.0) == "neutral" ``` - [ ] **Step 2: Run tests to verify they fail** Run: `pytest shared/tests/test_sentiment_aggregator.py -v` Expected: FAIL — `SentimentAggregator` not in sentiment.py - [ ] **Step 3: Add SentimentAggregator to sentiment.py** Keep the existing `SentimentData` class (for backward compat with existing tests). 
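Before implementing, it may help to see the scoring arithmetic concretely. As a sanity check on the values asserted in `test_aggregate_news_by_symbol` above, the AAPL case works out as follows under the decay tiers (1.0 under 1 h, 0.7 under 6 h, 0.3 under 24 h) and the category weights — a worked sketch, not part of the implementation:

```python
# Worked example of the AAPL case in test_aggregate_news_by_symbol:
# an earnings item (sentiment 0.8, fresh -> decay 1.0) and a macro item
# (sentiment 0.3, 2 hours old -> decay 0.7); both categories map to "news".
WEIGHTS = {"news": 0.3, "social": 0.2, "policy": 0.3, "filing": 0.2}

news_score = (0.8 * 1.0 + 0.3 * 0.7) / 2   # average of decayed sentiments
composite = news_score * WEIGHTS["news"]    # social/policy/filing buckets are 0.0

print(round(news_score, 4), round(composite, 4))  # → 0.505 0.1515
```

MSFT's single fresh policy item decays at 1.0, so its `policy_score` is simply −0.5 and its composite −0.5 × 0.3 = −0.15.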
Add `SentimentAggregator` class at the end of `shared/src/shared/sentiment.py`: ```python from datetime import timedelta from shared.sentiment_models import SymbolScore class SentimentAggregator: """Aggregates per-news sentiment into per-symbol scores.""" # Weights: policy events are most impactful for US stocks WEIGHTS = { "news": 0.3, "social": 0.2, "policy": 0.3, "filing": 0.2, } # Category → score field mapping CATEGORY_MAP = { "earnings": "news", "macro": "news", "social": "social", "policy": "policy", "filing": "filing", "fed": "policy", } def _freshness_decay(self, published_at: datetime, now: datetime) -> float: """Compute freshness decay factor.""" age = now - published_at hours = age.total_seconds() / 3600 if hours < 1: return 1.0 if hours < 6: return 0.7 if hours < 24: return 0.3 return 0.0 def _compute_composite( self, news_score: float, social_score: float, policy_score: float, filing_score: float, ) -> float: return ( news_score * self.WEIGHTS["news"] + social_score * self.WEIGHTS["social"] + policy_score * self.WEIGHTS["policy"] + filing_score * self.WEIGHTS["filing"] ) def aggregate( self, news_items: list[dict], now: datetime ) -> dict[str, SymbolScore]: """Aggregate news items into per-symbol scores. Each news_items dict must have: symbols, sentiment, category, published_at. Returns dict mapping symbol → SymbolScore. 
""" # Accumulate per-symbol, per-category symbol_data: dict[str, dict] = {} for item in news_items: decay = self._freshness_decay(item["published_at"], now) if decay == 0.0: continue category = item.get("category", "macro") score_field = self.CATEGORY_MAP.get(category, "news") weighted_sentiment = item["sentiment"] * decay for symbol in item.get("symbols", []): if symbol not in symbol_data: symbol_data[symbol] = { "news_scores": [], "social_scores": [], "policy_scores": [], "filing_scores": [], "count": 0, } symbol_data[symbol][f"{score_field}_scores"].append(weighted_sentiment) symbol_data[symbol]["count"] += 1 # Compute averages and composites result = {} for symbol, data in symbol_data.items(): news_score = _safe_avg(data["news_scores"]) social_score = _safe_avg(data["social_scores"]) policy_score = _safe_avg(data["policy_scores"]) filing_score = _safe_avg(data["filing_scores"]) result[symbol] = SymbolScore( symbol=symbol, news_score=news_score, news_count=data["count"], social_score=social_score, policy_score=policy_score, filing_score=filing_score, composite=self._compute_composite( news_score, social_score, policy_score, filing_score ), updated_at=now, ) return result def determine_regime(self, fear_greed: int, vix: float | None) -> str: """Determine market regime.""" if fear_greed <= 20: return "risk_off" if vix is not None and vix > 30: return "risk_off" if fear_greed >= 60 and (vix is None or vix < 20): return "risk_on" return "neutral" def _safe_avg(values: list[float]) -> float: """Return average of values, or 0.0 if empty.""" if not values: return 0.0 return sum(values) / len(values) ``` - [ ] **Step 4: Run new tests to verify they pass** Run: `pytest shared/tests/test_sentiment_aggregator.py -v` Expected: All 9 tests PASS - [ ] **Step 5: Run existing sentiment tests for regressions** Run: `pytest shared/tests/test_sentiment.py -v` Expected: All existing tests PASS (SentimentData unchanged) - [ ] **Step 6: Commit** ```bash git add 
shared/src/shared/sentiment.py shared/tests/test_sentiment_aggregator.py git commit -m "feat: implement SentimentAggregator with freshness decay and composite scoring" ``` --- ## Phase 4: Stock Selector Engine ### Task 15: Implement stock selector **Files:** - Create: `services/strategy-engine/src/strategy_engine/stock_selector.py` - Create: `services/strategy-engine/tests/test_stock_selector.py` - [ ] **Step 1: Write tests** Create `services/strategy-engine/tests/test_stock_selector.py`: ```python """Tests for stock selector engine.""" import pytest from unittest.mock import AsyncMock, MagicMock, patch from datetime import datetime, timezone from decimal import Decimal from shared.models import OrderSide from shared.sentiment_models import SymbolScore, MarketSentiment, SelectedStock, Candidate from strategy_engine.stock_selector import ( SentimentCandidateSource, StockSelector, _parse_llm_selections, ) async def test_sentiment_candidate_source(): mock_db = MagicMock() mock_db.get_top_symbol_scores = AsyncMock(return_value=[ {"symbol": "AAPL", "composite": 0.8, "news_count": 5}, {"symbol": "NVDA", "composite": 0.6, "news_count": 3}, ]) source = SentimentCandidateSource(mock_db) candidates = await source.get_candidates() assert len(candidates) == 2 assert candidates[0].symbol == "AAPL" assert candidates[0].source == "sentiment" def test_parse_llm_selections_valid(): llm_response = """ [ {"symbol": "NVDA", "side": "BUY", "conviction": 0.85, "reason": "AI demand", "key_news": ["NVDA beats earnings"]}, {"symbol": "XOM", "side": "BUY", "conviction": 0.72, "reason": "Oil surge", "key_news": ["Oil prices up"]} ] """ selections = _parse_llm_selections(llm_response) assert len(selections) == 2 assert selections[0].symbol == "NVDA" assert selections[0].conviction == 0.85 def test_parse_llm_selections_invalid(): selections = _parse_llm_selections("not json") assert selections == [] def test_parse_llm_selections_with_markdown(): llm_response = """ Here are my picks: ```json [ 
{"symbol": "TSLA", "side": "BUY", "conviction": 0.7, "reason": "Momentum", "key_news": ["Tesla rally"]} ] ``` """ selections = _parse_llm_selections(llm_response) assert len(selections) == 1 assert selections[0].symbol == "TSLA" async def test_selector_blocks_on_risk_off(): mock_db = MagicMock() mock_db.get_latest_market_sentiment = AsyncMock(return_value={ "fear_greed": 15, "fear_greed_label": "Extreme Fear", "vix": 35.0, "fed_stance": "neutral", "market_regime": "risk_off", "updated_at": datetime.now(timezone.utc), }) selector = StockSelector(db=mock_db, broker=MagicMock(), alpaca=MagicMock(), anthropic_api_key="test") result = await selector.select() assert result == [] ``` - [ ] **Step 2: Run tests to verify they fail** Run: `pytest services/strategy-engine/tests/test_stock_selector.py -v` Expected: FAIL — module not found - [ ] **Step 3: Implement StockSelector** Create `services/strategy-engine/src/strategy_engine/stock_selector.py`: ```python """Stock Selector Engine — 3-stage dynamic stock selection for MOC trading.""" import json import logging import re from datetime import datetime, timezone from decimal import Decimal from typing import Optional import aiohttp from shared.alpaca import AlpacaClient from shared.broker import RedisBroker from shared.db import Database from shared.models import OrderSide from shared.sentiment_models import Candidate, MarketSentiment, SelectedStock, SymbolScore logger = logging.getLogger(__name__) class SentimentCandidateSource: """Get candidate stocks from sentiment scores in DB.""" def __init__(self, db: Database, limit: int = 20) -> None: self._db = db self._limit = limit async def get_candidates(self) -> list[Candidate]: scores = await self._db.get_top_symbol_scores(limit=self._limit) return [ Candidate( symbol=s["symbol"], source="sentiment", score=s["composite"], reason=f"Sentiment composite={s['composite']:.2f}, news_count={s['news_count']}", ) for s in scores if s["composite"] != 0 ] class LLMCandidateSource: """Get 
candidate stocks by asking Claude to analyze today's top news.""" def __init__(self, db: Database, api_key: str, model: str) -> None: self._db = db self._api_key = api_key self._model = model async def get_candidates(self) -> list[Candidate]: news = await self._db.get_recent_news(hours=24) if not news: return [] headlines = [f"- [{n['source']}] {n['headline']} (sentiment: {n['sentiment']:.2f})" for n in news[:50]] prompt = ( "You are a stock market analyst. Based on today's news headlines below, " "identify US stocks that are most likely to be affected (positively or negatively). " "Return a JSON array of objects with: symbol, direction (BUY or SELL), score (0-1), reason.\n\n" "Headlines:\n" + "\n".join(headlines) + "\n\n" "Return ONLY the JSON array, no other text." ) try: async with aiohttp.ClientSession() as session: async with session.post( "https://api.anthropic.com/v1/messages", headers={ "x-api-key": self._api_key, "anthropic-version": "2023-06-01", "content-type": "application/json", }, json={ "model": self._model, "max_tokens": 1024, "messages": [{"role": "user", "content": prompt}], }, timeout=aiohttp.ClientTimeout(total=30), ) as resp: if resp.status != 200: logger.warning("llm_candidate_failed status=%s", resp.status) return [] data = await resp.json() text = data["content"][0]["text"] except Exception as exc: logger.error("llm_candidate_error: %s", exc) return [] return self._parse_response(text) def _parse_response(self, text: str) -> list[Candidate]: try: # Extract JSON from possible markdown code blocks json_match = re.search(r"```(?:json)?\s*(\[.*?\])\s*```", text, re.DOTALL) if json_match: text = json_match.group(1) items = json.loads(text) except (json.JSONDecodeError, TypeError): return [] candidates = [] for item in items: try: direction = OrderSide(item.get("direction", "BUY")) candidates.append( Candidate( symbol=item["symbol"], source="llm", direction=direction, score=float(item.get("score", 0.5)), reason=item.get("reason", "LLM
recommendation"), ) ) except (KeyError, ValueError): continue return candidates class StockSelector: """3-stage stock selector: candidates → technical filter → LLM final pick.""" def __init__( self, db: Database, broker: RedisBroker, alpaca: AlpacaClient, anthropic_api_key: str, anthropic_model: str = "claude-sonnet-4-20250514", max_picks: int = 3, ) -> None: self._db = db self._broker = broker self._alpaca = alpaca self._api_key = anthropic_api_key self._model = anthropic_model self._max_picks = max_picks self._sentiment_source = SentimentCandidateSource(db) self._llm_source = LLMCandidateSource(db, anthropic_api_key, anthropic_model) async def select(self) -> list[SelectedStock]: """Run full 3-stage selection. Returns list of SelectedStock.""" # Check market sentiment gate ms = await self._db.get_latest_market_sentiment() if ms and ms.get("market_regime") == "risk_off": logger.info("selection_blocked_risk_off") return [] # Stage 1: Candidate pool sentiment_candidates = await self._sentiment_source.get_candidates() llm_candidates = await self._llm_source.get_candidates() candidates = self._merge_candidates(sentiment_candidates, llm_candidates) if not candidates: logger.info("no_candidates_found") return [] logger.info("candidates_found count=%d", len(candidates)) # Stage 2: Technical filter filtered = await self._technical_filter(candidates) if not filtered: logger.info("all_candidates_filtered_out") return [] logger.info("technical_filter_passed count=%d", len(filtered)) # Stage 3: LLM final selection selections = await self._llm_final_select(filtered, ms) # Publish to Redis for selection in selections: await self._broker.publish( "selected_stocks", selection.model_dump(mode="json"), ) # Persist audit trail (fetch scores once, not per selection) from datetime import date as date_type score_data = await self._db.get_top_symbol_scores(limit=100) for selection in selections: snapshot = next( (s for s in score_data if s["symbol"] == selection.symbol), {}, ) await self._db.insert_stock_selection(
trade_date=date_type.today(), symbol=selection.symbol, side=selection.side.value, conviction=selection.conviction, reason=selection.reason, key_news=selection.key_news, sentiment_snapshot=snapshot, ) return selections def _merge_candidates( self, sentiment: list[Candidate], llm: list[Candidate], ) -> list[Candidate]: """Merge and deduplicate candidates, preferring higher scores.""" by_symbol: dict[str, Candidate] = {} for c in sentiment + llm: if c.symbol not in by_symbol or c.score > by_symbol[c.symbol].score: by_symbol[c.symbol] = c return sorted(by_symbol.values(), key=lambda c: c.score, reverse=True) async def _technical_filter(self, candidates: list[Candidate]) -> list[Candidate]: """Apply MOC-style technical screening to candidates.""" import pandas as pd passed = [] for candidate in candidates: try: bars = await self._alpaca.get_bars( candidate.symbol, timeframe="1Day", limit=30 ) if not bars or len(bars) < 21: continue closes = pd.Series([float(b["c"]) for b in bars]) volumes = pd.Series([float(b["v"]) for b in bars]) # RSI delta = closes.diff() gain = delta.clip(lower=0) loss = -delta.clip(upper=0) avg_gain = gain.ewm(com=13, min_periods=14).mean() avg_loss = loss.ewm(com=13, min_periods=14).mean() rs = avg_gain / avg_loss.replace(0, float("nan")) rsi = 100 - (100 / (1 + rs)) current_rsi = rsi.iloc[-1] if pd.isna(current_rsi) or not (30 <= current_rsi <= 70): continue # EMA ema20 = closes.ewm(span=20, adjust=False).mean().iloc[-1] if closes.iloc[-1] < ema20: continue # Volume above average vol_avg = volumes.iloc[-20:].mean() if vol_avg > 0 and volumes.iloc[-1] < vol_avg * 0.5: continue passed.append(candidate) except Exception as exc: logger.warning("technical_filter_error symbol=%s: %s", candidate.symbol, exc) continue return passed async def _llm_final_select( self, candidates: list[Candidate], market_sentiment: Optional[dict], ) -> list[SelectedStock]: """Ask Claude to make final 2-3 picks from filtered candidates.""" # Build context candidate_info
= [] for c in candidates[:15]: candidate_info.append(f"- {c.symbol}: score={c.score:.2f}, source={c.source}, reason={c.reason}") news = await self._db.get_recent_news(hours=12) top_news = [f"- [{n['source']}] {n['headline']}" for n in news[:20]] ms_info = "No market sentiment data available." if market_sentiment: ms_info = ( f"Fear & Greed: {market_sentiment.get('fear_greed', 'N/A')} " f"({market_sentiment.get('fear_greed_label', 'N/A')}), " f"VIX: {market_sentiment.get('vix', 'N/A')}, " f"Fed Stance: {market_sentiment.get('fed_stance', 'N/A')}" ) prompt = ( f"You are a professional stock trader selecting {self._max_picks} stocks for " f"Market-on-Close (MOC) overnight trading. You buy at market close and sell at " f"next day's open.\n\n" f"## Market Conditions\n{ms_info}\n\n" f"## Candidate Stocks (pre-screened technically)\n" + "\n".join(candidate_info) + "\n\n" f"## Today's Key News\n" + "\n".join(top_news) + "\n\n" f"Select the best {self._max_picks} stocks. For each, provide:\n" f"- symbol: ticker\n" f"- side: BUY or SELL\n" f"- conviction: 0.0-1.0\n" f"- reason: one sentence\n" f"- key_news: list of relevant headlines\n\n" f"Return ONLY a JSON array. No other text." 
) try: async with aiohttp.ClientSession() as session: async with session.post( "https://api.anthropic.com/v1/messages", headers={ "x-api-key": self._api_key, "anthropic-version": "2023-06-01", "content-type": "application/json", }, json={ "model": self._model, "max_tokens": 1024, "messages": [{"role": "user", "content": prompt}], }, timeout=aiohttp.ClientTimeout(total=30), ) as resp: if resp.status != 200: logger.error("llm_final_select_failed status=%s", resp.status) return [] data = await resp.json() text = data["content"][0]["text"] except Exception as exc: logger.error("llm_final_select_error: %s", exc) return [] return _parse_llm_selections(text) def _parse_llm_selections(text: str) -> list[SelectedStock]: """Parse LLM response into SelectedStock list.""" try: json_match = re.search(r"```(?:json)?\s*(\[.*?\])\s*```", text, re.DOTALL) if json_match: text = json_match.group(1) else: # Fall back to a bare JSON array array_match = re.search(r"\[.*\]", text, re.DOTALL) if array_match: text = array_match.group(0) items = json.loads(text) except (json.JSONDecodeError, TypeError): return [] selections = [] for item in items: try: selections.append( SelectedStock( symbol=item["symbol"], side=OrderSide(item.get("side", "BUY")), conviction=float(item.get("conviction", 0.5)), reason=item.get("reason", ""), key_news=item.get("key_news", []), ) ) except (KeyError, ValueError): continue return selections ``` - [ ] **Step 4: Run tests to verify they pass** Run: `pytest services/strategy-engine/tests/test_stock_selector.py -v` Expected: All 5 tests PASS - [ ] **Step 5: Run all strategy engine tests for regressions** Run: `pytest services/strategy-engine/tests/ -v` Expected: All tests PASS - [ ] **Step 6: Commit** ```bash git add services/strategy-engine/src/strategy_engine/stock_selector.py services/strategy-engine/tests/test_stock_selector.py git commit -m "feat: implement 3-stage stock selector (sentiment → technical → LLM)" ``` --- ## Phase 5: Integration (MOC +
Notifications + Docker) ### Task 16: Add Telegram notification for stock selections **Files:** - Modify: `shared/src/shared/notifier.py` - Modify: `shared/tests/test_notifier.py` - [ ] **Step 1: Add send_stock_selection method to notifier.py** Add this method and import to `shared/src/shared/notifier.py`: Add to imports: ```python from shared.sentiment_models import SelectedStock, MarketSentiment ``` Add method to `TelegramNotifier` class: ```python async def send_stock_selection( self, selections: list[SelectedStock], market: MarketSentiment | None = None, ) -> None: """Format and send stock selection notification.""" lines = [f"📊 Stock Selection ({len(selections)} picks)", ""] side_emoji = {"BUY": "🟢", "SELL": "🔴"} for i, s in enumerate(selections, 1): emoji = side_emoji.get(s.side.value, "⚪") lines.append( f"{i}. {s.symbol} {emoji} {s.side.value} " f"(conviction: {s.conviction:.0%})" ) lines.append(f" {s.reason}") if s.key_news: lines.append(f" News: {s.key_news[0]}") lines.append("") if market: lines.append( f"Market: F&G {market.fear_greed} ({market.fear_greed_label})" + (f" | VIX {market.vix:.1f}" if market.vix else "") ) await self.send("\n".join(lines)) ``` - [ ] **Step 2: Add test for the new method** Add to `shared/tests/test_notifier.py`: ```python from shared.models import OrderSide from shared.sentiment_models import SelectedStock, MarketSentiment from datetime import datetime, timezone async def test_send_stock_selection(notifier, mock_session): """Test stock selection notification formatting.""" selections = [ SelectedStock( symbol="NVDA", side=OrderSide.BUY, conviction=0.85, reason="CHIPS Act expansion", key_news=["Trump signs CHIPS Act"], ), ] market = MarketSentiment( fear_greed=55, fear_greed_label="Neutral", vix=18.2, fed_stance="neutral", market_regime="neutral", updated_at=datetime.now(timezone.utc), ) await notifier.send_stock_selection(selections, market) mock_session.post.assert_called_once() ``` Note: Check `shared/tests/test_notifier.py` 
for existing fixture names (`notifier`, `mock_session`) and adapt accordingly. - [ ] **Step 3: Run notifier tests** Run: `pytest shared/tests/test_notifier.py -v` Expected: All tests PASS - [ ] **Step 4: Commit** ```bash git add shared/src/shared/notifier.py shared/tests/test_notifier.py git commit -m "feat: add Telegram notification for stock selections" ``` --- ### Task 17: Integrate stock selector with MOC strategy **Files:** - Modify: `services/strategy-engine/src/strategy_engine/main.py` - Modify: `services/strategy-engine/src/strategy_engine/config.py` - [ ] **Step 1: Update strategy engine config** Add to `StrategyConfig` in `services/strategy-engine/src/strategy_engine/config.py`: ```python selector_candidates_time: str = "15:00" selector_filter_time: str = "15:15" selector_final_time: str = "15:30" selector_max_picks: int = 3 anthropic_api_key: str = "" anthropic_model: str = "claude-sonnet-4-20250514" ``` - [ ] **Step 2: Add stock selector scheduling to main.py** Add a new coroutine to `services/strategy-engine/src/strategy_engine/main.py` that runs the full 3-stage selection once per day at `selector_final_time`. (The `selector_candidates_time` and `selector_filter_time` fields from Step 1 are not yet consumed by this loop; all three stages run together at the final time.)
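The scheduling coroutine described above polls the wall clock. As a design alternative, the sleep interval can be computed directly from the target Eastern-time minute; a minimal sketch (the `seconds_until` helper is hypothetical, not part of the plan):

```python
from datetime import datetime, timedelta
from zoneinfo import ZoneInfo

def seconds_until(target: str, tz: str = "America/New_York") -> float:
    """Seconds until the next occurrence of HH:MM in the given time zone."""
    now = datetime.now(ZoneInfo(tz))
    hour, minute = map(int, target.split(":"))
    run_at = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
    if run_at <= now:
        run_at += timedelta(days=1)  # already past today; schedule for tomorrow
    return (run_at - now).total_seconds()
```

A loop could then `await asyncio.sleep(seconds_until(config.selector_final_time))` and run once per wake-up; like the polling version, this does not account for weekends or market holidays.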
Add imports: ```python from shared.alpaca import AlpacaClient from shared.db import Database from shared.notifier import TelegramNotifier from shared.sentiment_models import MarketSentiment from strategy_engine.stock_selector import StockSelector ``` Add the selector loop function: ```python async def run_stock_selector( selector: StockSelector, notifier: TelegramNotifier, db: Database, config: StrategyConfig, log, ) -> None: """Run the stock selector once per day at the configured time.""" import zoneinfo et = zoneinfo.ZoneInfo("America/New_York") while True: now_et = datetime.now(et) target_hour, target_min = map(int, config.selector_final_time.split(":")) # Check if it's time to run (within 1-minute window) if now_et.hour == target_hour and now_et.minute == target_min: log.info("stock_selector_running") try: selections = await selector.select() if selections: ms_data = await db.get_latest_market_sentiment() ms = None if ms_data: ms = MarketSentiment(**ms_data) await notifier.send_stock_selection(selections, ms) log.info( "stock_selector_complete", picks=[s.symbol for s in selections], ) else: log.info("stock_selector_no_picks") except Exception as exc: log.error("stock_selector_error", error=str(exc)) # Sleep past this minute to avoid re-triggering await asyncio.sleep(120) else: await asyncio.sleep(30) ``` In the `run()` function, add after creating the broker: ```python db = Database(config.database_url) await db.connect() alpaca = AlpacaClient( api_key=config.alpaca_api_key, api_secret=config.alpaca_api_secret, paper=config.alpaca_paper, ) ``` And add the selector if anthropic key is configured: ```python if config.anthropic_api_key: selector = StockSelector( db=db, broker=broker, alpaca=alpaca, anthropic_api_key=config.anthropic_api_key, anthropic_model=config.anthropic_model, max_picks=config.selector_max_picks, ) tasks.append(asyncio.create_task( run_stock_selector(selector, notifier, db, config, log) )) log.info("stock_selector_enabled", 
time=config.selector_final_time) ``` Add to the `finally` block: ```python await alpaca.close() await db.close() ``` - [ ] **Step 3: Run strategy engine tests for regressions** Run: `pytest services/strategy-engine/tests/ -v` Expected: All tests PASS - [ ] **Step 4: Commit** ```bash git add services/strategy-engine/src/strategy_engine/main.py services/strategy-engine/src/strategy_engine/config.py git commit -m "feat: integrate stock selector into strategy engine scheduler" ``` --- ### Task 18: Update Docker Compose and .env **Files:** - Modify: `docker-compose.yml` - Modify: `.env.example` (already done in Task 5, just verify) - [ ] **Step 1: Add news-collector service to docker-compose.yml** Add before the `loki:` service block in `docker-compose.yml`: ```yaml news-collector: build: context: . dockerfile: services/news-collector/Dockerfile env_file: .env ports: - "8084:8084" depends_on: redis: condition: service_healthy postgres: condition: service_healthy healthcheck: test: ["CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:8084/health')"] interval: 10s timeout: 5s retries: 3 restart: unless-stopped ``` - [ ] **Step 2: Verify compose file is valid** Run: `docker compose config --quiet 2>&1 || echo "INVALID"` Expected: No output (valid) or compose config displayed without errors - [ ] **Step 3: Commit** ```bash git add docker-compose.yml git commit -m "feat: add news-collector service to Docker Compose" ``` --- ### Task 19: Run full test suite and lint - [ ] **Step 1: Install test dependencies** Run: `pip install -e shared/ && pip install aiosqlite feedparser nltk aioresponses` - [ ] **Step 2: Download VADER lexicon** Run: `python -c "import nltk; nltk.download('vader_lexicon', quiet=True)"` - [ ] **Step 3: Run lint** Run: `make lint` Expected: No lint errors. If there are errors, fix them. 
- [ ] **Step 4: Run full test suite** Run: `make test` Expected: All tests PASS - [ ] **Step 5: Fix any issues found in steps 3-4** If lint or tests fail, fix the issues and re-run. - [ ] **Step 6: Final commit if any fixes were needed** ```bash git add -A git commit -m "fix: resolve lint and test issues from news selector integration" ```
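Finally, a reviewer aid: the markdown-fence extraction that both `_parse_response` and `_parse_llm_selections` in Task 15 rely on can be smoke-tested offline with the stdlib alone. This is a standalone copy of the parsing idea (the `extract_json_array` name is hypothetical), not the module itself:

```python
import json
import re

def extract_json_array(text: str) -> list:
    """Pull a JSON array out of an LLM reply, tolerating markdown code fences."""
    fenced = re.search(r"```(?:json)?\s*(\[.*?\])\s*```", text, re.DOTALL)
    if fenced:
        text = fenced.group(1)
    else:
        bare = re.search(r"\[.*\]", text, re.DOTALL)  # fall back to a bare array
        if bare:
            text = bare.group(0)
    try:
        return json.loads(text)
    except (json.JSONDecodeError, TypeError):
        return []

reply = 'Here are my picks:\n```json\n[{"symbol": "TSLA", "side": "BUY"}]\n```'
print(extract_json_array(reply))       # → [{'symbol': 'TSLA', 'side': 'BUY'}]
print(extract_json_array("not json"))  # → []
```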