From b53867cb98691ef68d8e2c702e5bcd6c5f737744 Mon Sep 17 00:00:00 2001 From: TheSiahxyz <164138827+TheSiahxyz@users.noreply.github.com> Date: Thu, 2 Apr 2026 13:44:48 +0900 Subject: docs: add news-driven stock selector implementation plan 19 tasks across 5 phases: shared models, news collector service (7 collectors), sentiment aggregation pipeline, stock selector engine, and integration. --- .../plans/2026-04-02-news-driven-stock-selector.md | 3689 ++++++++++++++++++++ 1 file changed, 3689 insertions(+) create mode 100644 docs/superpowers/plans/2026-04-02-news-driven-stock-selector.md diff --git a/docs/superpowers/plans/2026-04-02-news-driven-stock-selector.md b/docs/superpowers/plans/2026-04-02-news-driven-stock-selector.md new file mode 100644 index 0000000..0964f21 --- /dev/null +++ b/docs/superpowers/plans/2026-04-02-news-driven-stock-selector.md @@ -0,0 +1,3689 @@ +# News-Driven Stock Selector Implementation Plan + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** Replace the MOC strategy's fixed symbol list with a dynamic, news-driven stock selection system that continuously collects news/sentiment data and selects 2-3 optimal stocks daily before market close. + +**Architecture:** A new `news-collector` service runs 7 data source collectors on individual poll intervals, storing `NewsItem` records in PostgreSQL and publishing to Redis. A sentiment aggregator computes per-symbol composite scores every 15 minutes. Before market close, a 3-stage stock selector (sentiment candidates → technical filter → LLM final pick) chooses 2-3 stocks and feeds them to the existing MOC strategy. 
+ +**Tech Stack:** Python 3.12+, asyncio, aiohttp, Pydantic, SQLAlchemy 2.0 async, Redis Streams, VADER (nltk), feedparser (RSS), Anthropic SDK (Claude API), Alembic + +--- + +## Phase 1: Shared Foundation (Models, DB, Events) + +### Task 1: Add NewsItem and sentiment models to shared + +**Files:** +- Modify: `shared/src/shared/models.py` +- Create: `shared/src/shared/sentiment_models.py` +- Create: `shared/tests/test_sentiment_models.py` + +- [ ] **Step 1: Write tests for new models** + +Create `shared/tests/test_sentiment_models.py`: + +```python +"""Tests for news and sentiment models.""" + +import pytest +from datetime import datetime, timezone + +from shared.models import NewsCategory, NewsItem, OrderSide +from shared.sentiment_models import SymbolScore, MarketSentiment, SelectedStock, Candidate + + +def test_news_item_defaults(): + item = NewsItem( + source="finnhub", + headline="Test headline", + published_at=datetime(2026, 4, 2, tzinfo=timezone.utc), + sentiment=0.5, + category=NewsCategory.MACRO, + ) + assert item.id # UUID generated + assert item.symbols == [] + assert item.summary is None + assert item.raw_data == {} + assert item.created_at is not None + + +def test_news_item_with_symbols(): + item = NewsItem( + source="rss", + headline="AAPL earnings beat", + published_at=datetime(2026, 4, 2, tzinfo=timezone.utc), + sentiment=0.8, + category=NewsCategory.EARNINGS, + symbols=["AAPL"], + ) + assert item.symbols == ["AAPL"] + assert item.category == NewsCategory.EARNINGS + + +def test_news_category_values(): + assert NewsCategory.POLICY == "policy" + assert NewsCategory.EARNINGS == "earnings" + assert NewsCategory.MACRO == "macro" + assert NewsCategory.SOCIAL == "social" + assert NewsCategory.FILING == "filing" + assert NewsCategory.FED == "fed" + + +def test_symbol_score(): + score = SymbolScore( + symbol="AAPL", + news_score=0.5, + news_count=10, + social_score=0.3, + policy_score=0.0, + filing_score=0.2, + composite=0.3, + updated_at=datetime(2026, 4, 
2, tzinfo=timezone.utc), + ) + assert score.symbol == "AAPL" + assert score.composite == 0.3 + + +def test_market_sentiment(): + ms = MarketSentiment( + fear_greed=25, + fear_greed_label="Extreme Fear", + vix=32.5, + fed_stance="hawkish", + market_regime="risk_off", + updated_at=datetime(2026, 4, 2, tzinfo=timezone.utc), + ) + assert ms.market_regime == "risk_off" + assert ms.vix == 32.5 + + +def test_market_sentiment_no_vix(): + ms = MarketSentiment( + fear_greed=50, + fear_greed_label="Neutral", + fed_stance="neutral", + market_regime="neutral", + updated_at=datetime(2026, 4, 2, tzinfo=timezone.utc), + ) + assert ms.vix is None + + +def test_selected_stock(): + ss = SelectedStock( + symbol="NVDA", + side=OrderSide.BUY, + conviction=0.85, + reason="CHIPS Act expansion", + key_news=["Trump signs CHIPS Act expansion"], + ) + assert ss.conviction == 0.85 + assert len(ss.key_news) == 1 + + +def test_candidate(): + c = Candidate( + symbol="TSLA", + source="sentiment", + direction=OrderSide.BUY, + score=0.75, + reason="High social buzz", + ) + assert c.direction == OrderSide.BUY + + c2 = Candidate( + symbol="XOM", + source="llm", + score=0.6, + reason="Oil price surge", + ) + assert c2.direction is None +``` + +- [ ] **Step 2: Run tests to verify they fail** + +Run: `pytest shared/tests/test_sentiment_models.py -v` +Expected: FAIL — `NewsCategory`, `NewsItem` not found in `shared.models`, `shared.sentiment_models` does not exist + +- [ ] **Step 3: Add NewsCategory and NewsItem to shared/models.py** + +Add to the end of `shared/src/shared/models.py`: + +```python +class NewsCategory(str, Enum): + POLICY = "policy" + EARNINGS = "earnings" + MACRO = "macro" + SOCIAL = "social" + FILING = "filing" + FED = "fed" + + +class NewsItem(BaseModel): + id: str = Field(default_factory=lambda: str(uuid.uuid4())) + source: str + headline: str + summary: Optional[str] = None + url: Optional[str] = None + published_at: datetime + symbols: list[str] = [] + sentiment: float + category: 
NewsCategory + raw_data: dict = {} + created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) +``` + +- [ ] **Step 4: Create shared/src/shared/sentiment_models.py** + +```python +"""Sentiment scoring and stock selection models.""" + +from datetime import datetime +from typing import Optional + +from pydantic import BaseModel + +from shared.models import OrderSide + + +class SymbolScore(BaseModel): + symbol: str + news_score: float + news_count: int + social_score: float + policy_score: float + filing_score: float + composite: float + updated_at: datetime + + +class MarketSentiment(BaseModel): + fear_greed: int + fear_greed_label: str + vix: Optional[float] = None + fed_stance: str + market_regime: str + updated_at: datetime + + +class SelectedStock(BaseModel): + symbol: str + side: OrderSide + conviction: float + reason: str + key_news: list[str] + + +class Candidate(BaseModel): + symbol: str + source: str + direction: Optional[OrderSide] = None + score: float + reason: str +``` + +- [ ] **Step 5: Run tests to verify they pass** + +Run: `pytest shared/tests/test_sentiment_models.py -v` +Expected: All 8 tests PASS + +- [ ] **Step 6: Commit** + +```bash +git add shared/src/shared/models.py shared/src/shared/sentiment_models.py shared/tests/test_sentiment_models.py +git commit -m "feat: add NewsItem, sentiment scoring, and stock selection models" +``` + +--- + +### Task 2: Add SQLAlchemy ORM models for news tables + +**Files:** +- Modify: `shared/src/shared/sa_models.py` +- Create: `shared/tests/test_sa_news_models.py` + +- [ ] **Step 1: Write tests for new SA models** + +Create `shared/tests/test_sa_news_models.py`: + +```python +"""Tests for news-related SQLAlchemy models.""" + +from shared.sa_models import NewsItemRow, SymbolScoreRow, MarketSentimentRow, StockSelectionRow + + +def test_news_item_row_tablename(): + assert NewsItemRow.__tablename__ == "news_items" + + +def test_symbol_score_row_tablename(): + assert 
SymbolScoreRow.__tablename__ == "symbol_scores" + + +def test_market_sentiment_row_tablename(): + assert MarketSentimentRow.__tablename__ == "market_sentiment" + + +def test_stock_selection_row_tablename(): + assert StockSelectionRow.__tablename__ == "stock_selections" + + +def test_news_item_row_columns(): + cols = {c.name for c in NewsItemRow.__table__.columns} + assert cols >= {"id", "source", "headline", "published_at", "sentiment", "category"} + + +def test_symbol_score_row_columns(): + cols = {c.name for c in SymbolScoreRow.__table__.columns} + assert cols >= {"id", "symbol", "news_score", "composite", "updated_at"} +``` + +- [ ] **Step 2: Run tests to verify they fail** + +Run: `pytest shared/tests/test_sa_news_models.py -v` +Expected: FAIL — import errors + +- [ ] **Step 3: Add ORM models to sa_models.py** + +Add to the end of `shared/src/shared/sa_models.py`: + +```python +class NewsItemRow(Base): + __tablename__ = "news_items" + + id: Mapped[str] = mapped_column(Text, primary_key=True) + source: Mapped[str] = mapped_column(Text, nullable=False) + headline: Mapped[str] = mapped_column(Text, nullable=False) + summary: Mapped[str | None] = mapped_column(Text) + url: Mapped[str | None] = mapped_column(Text) + published_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False) + symbols: Mapped[str | None] = mapped_column(Text) # JSON-encoded list + sentiment: Mapped[float] = mapped_column(sa.Float, nullable=False) + category: Mapped[str] = mapped_column(Text, nullable=False) + raw_data: Mapped[str | None] = mapped_column(Text) # JSON string + created_at: Mapped[datetime] = mapped_column( + DateTime(timezone=True), nullable=False, server_default=sa.func.now() + ) + + +class SymbolScoreRow(Base): + __tablename__ = "symbol_scores" + + id: Mapped[str] = mapped_column(Text, primary_key=True) + symbol: Mapped[str] = mapped_column(Text, nullable=False, unique=True) + news_score: Mapped[float] = mapped_column(sa.Float, nullable=False, 
server_default="0") + news_count: Mapped[int] = mapped_column(sa.Integer, nullable=False, server_default="0") + social_score: Mapped[float] = mapped_column(sa.Float, nullable=False, server_default="0") + policy_score: Mapped[float] = mapped_column(sa.Float, nullable=False, server_default="0") + filing_score: Mapped[float] = mapped_column(sa.Float, nullable=False, server_default="0") + composite: Mapped[float] = mapped_column(sa.Float, nullable=False, server_default="0") + updated_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False) + + +class MarketSentimentRow(Base): + __tablename__ = "market_sentiment" + + id: Mapped[str] = mapped_column(Text, primary_key=True) + fear_greed: Mapped[int] = mapped_column(sa.Integer, nullable=False) + fear_greed_label: Mapped[str] = mapped_column(Text, nullable=False) + vix: Mapped[float | None] = mapped_column(sa.Float) + fed_stance: Mapped[str] = mapped_column(Text, nullable=False, server_default="neutral") + market_regime: Mapped[str] = mapped_column(Text, nullable=False, server_default="neutral") + updated_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False) + + +class StockSelectionRow(Base): + __tablename__ = "stock_selections" + + id: Mapped[str] = mapped_column(Text, primary_key=True) + trade_date: Mapped[datetime] = mapped_column(sa.Date, nullable=False) + symbol: Mapped[str] = mapped_column(Text, nullable=False) + side: Mapped[str] = mapped_column(Text, nullable=False) + conviction: Mapped[float] = mapped_column(sa.Float, nullable=False) + reason: Mapped[str] = mapped_column(Text, nullable=False) + key_news: Mapped[str | None] = mapped_column(Text) # JSON string + sentiment_snapshot: Mapped[str | None] = mapped_column(Text) # JSON string + created_at: Mapped[datetime] = mapped_column( + DateTime(timezone=True), nullable=False, server_default=sa.func.now() + ) +``` + +Also add `import sqlalchemy as sa` to the imports at the top of `sa_models.py`. 
+ +- [ ] **Step 4: Run tests to verify they pass** + +Run: `pytest shared/tests/test_sa_news_models.py -v` +Expected: All 6 tests PASS + +- [ ] **Step 5: Commit** + +```bash +git add shared/src/shared/sa_models.py shared/tests/test_sa_news_models.py +git commit -m "feat: add SQLAlchemy ORM models for news, scores, selections" +``` + +--- + +### Task 3: Create Alembic migration for news tables + +**Files:** +- Create: `shared/alembic/versions/002_news_sentiment_tables.py` + +- [ ] **Step 1: Create migration file** + +Create `shared/alembic/versions/002_news_sentiment_tables.py`: + +```python +"""Add news, sentiment, and stock selection tables + +Revision ID: 002 +Revises: 001 +Create Date: 2026-04-02 +""" + +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa + +revision: str = "002" +down_revision: Union[str, None] = "001" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + op.create_table( + "news_items", + sa.Column("id", sa.Text, primary_key=True), + sa.Column("source", sa.Text, nullable=False), + sa.Column("headline", sa.Text, nullable=False), + sa.Column("summary", sa.Text), + sa.Column("url", sa.Text), + sa.Column("published_at", sa.DateTime(timezone=True), nullable=False), + sa.Column("symbols", sa.Text), + sa.Column("sentiment", sa.Float, nullable=False), + sa.Column("category", sa.Text, nullable=False), + sa.Column("raw_data", sa.Text), + sa.Column("created_at", sa.DateTime(timezone=True), nullable=False, server_default=sa.func.now()), + ) + op.create_index("idx_news_items_published", "news_items", ["published_at"]) + op.create_index("idx_news_items_source", "news_items", ["source"]) + + op.create_table( + "symbol_scores", + sa.Column("id", sa.Text, primary_key=True), + sa.Column("symbol", sa.Text, nullable=False, unique=True), + sa.Column("news_score", sa.Float, nullable=False, server_default="0"), + sa.Column("news_count", sa.Integer, 
nullable=False, server_default="0"), + sa.Column("social_score", sa.Float, nullable=False, server_default="0"), + sa.Column("policy_score", sa.Float, nullable=False, server_default="0"), + sa.Column("filing_score", sa.Float, nullable=False, server_default="0"), + sa.Column("composite", sa.Float, nullable=False, server_default="0"), + sa.Column("updated_at", sa.DateTime(timezone=True), nullable=False), + ) + + op.create_table( + "market_sentiment", + sa.Column("id", sa.Text, primary_key=True), + sa.Column("fear_greed", sa.Integer, nullable=False), + sa.Column("fear_greed_label", sa.Text, nullable=False), + sa.Column("vix", sa.Float), + sa.Column("fed_stance", sa.Text, nullable=False, server_default="neutral"), + sa.Column("market_regime", sa.Text, nullable=False, server_default="neutral"), + sa.Column("updated_at", sa.DateTime(timezone=True), nullable=False), + ) + + op.create_table( + "stock_selections", + sa.Column("id", sa.Text, primary_key=True), + sa.Column("trade_date", sa.Date, nullable=False), + sa.Column("symbol", sa.Text, nullable=False), + sa.Column("side", sa.Text, nullable=False), + sa.Column("conviction", sa.Float, nullable=False), + sa.Column("reason", sa.Text, nullable=False), + sa.Column("key_news", sa.Text), + sa.Column("sentiment_snapshot", sa.Text), + sa.Column("created_at", sa.DateTime(timezone=True), nullable=False, server_default=sa.func.now()), + ) + op.create_index("idx_stock_selections_date", "stock_selections", ["trade_date"]) + + +def downgrade() -> None: + op.drop_table("stock_selections") + op.drop_table("market_sentiment") + op.drop_table("symbol_scores") + op.drop_table("news_items") +``` + +- [ ] **Step 2: Verify migration imports correctly** + +Run: `cd shared && python -c "from alembic.versions import *; print('OK')" && cd ..` +Or simply: `python -c "import importlib.util; s=importlib.util.spec_from_file_location('m','shared/alembic/versions/002_news_sentiment_tables.py'); m=importlib.util.module_from_spec(s); 
s.loader.exec_module(m); print('OK')"` +Expected: OK (no import errors) + +- [ ] **Step 3: Commit** + +```bash +git add shared/alembic/versions/002_news_sentiment_tables.py +git commit -m "feat: add Alembic migration for news and sentiment tables" +``` + +--- + +### Task 4: Add NewsEvent to shared events and DB methods for news + +**Files:** +- Modify: `shared/src/shared/events.py` +- Modify: `shared/src/shared/db.py` +- Create: `shared/tests/test_news_events.py` +- Create: `shared/tests/test_db_news.py` + +- [ ] **Step 1: Write tests for NewsEvent** + +Create `shared/tests/test_news_events.py`: + +```python +"""Tests for NewsEvent.""" + +from datetime import datetime, timezone + +from shared.models import NewsCategory, NewsItem +from shared.events import NewsEvent, EventType, Event + + +def test_news_event_to_dict(): + item = NewsItem( + source="finnhub", + headline="Test", + published_at=datetime(2026, 4, 2, tzinfo=timezone.utc), + sentiment=0.5, + category=NewsCategory.MACRO, + ) + event = NewsEvent(data=item) + d = event.to_dict() + assert d["type"] == EventType.NEWS + assert d["data"]["source"] == "finnhub" + + +def test_news_event_from_raw(): + raw = { + "type": "NEWS", + "data": { + "id": "abc", + "source": "rss", + "headline": "Test headline", + "published_at": "2026-04-02T00:00:00+00:00", + "sentiment": 0.3, + "category": "earnings", + "symbols": ["AAPL"], + "raw_data": {}, + }, + } + event = NewsEvent.from_raw(raw) + assert event.data.source == "rss" + assert event.data.symbols == ["AAPL"] + + +def test_event_dispatcher_news(): + raw = { + "type": "NEWS", + "data": { + "id": "abc", + "source": "finnhub", + "headline": "Test", + "published_at": "2026-04-02T00:00:00+00:00", + "sentiment": 0.0, + "category": "macro", + "raw_data": {}, + }, + } + event = Event.from_dict(raw) + assert isinstance(event, NewsEvent) +``` + +- [ ] **Step 2: Run tests to verify they fail** + +Run: `pytest shared/tests/test_news_events.py -v` +Expected: FAIL — `NewsEvent` not in 
events.py, `EventType.NEWS` missing + +- [ ] **Step 3: Add NewsEvent to events.py** + +Add `NEWS = "NEWS"` to `EventType` enum. + +Add `NewsEvent` class and register it in `_EVENT_TYPE_MAP`: + +```python +from shared.models import Candle, Signal, Order, NewsItem + +class NewsEvent(BaseModel): + type: EventType = EventType.NEWS + data: NewsItem + + def to_dict(self) -> dict: + return { + "type": self.type, + "data": self.data.model_dump(mode="json"), + } + + @classmethod + def from_raw(cls, raw: dict) -> "NewsEvent": + return cls(type=raw["type"], data=NewsItem(**raw["data"])) +``` + +Add to `_EVENT_TYPE_MAP`: +```python +EventType.NEWS: NewsEvent, +``` + +- [ ] **Step 4: Run event tests to verify they pass** + +Run: `pytest shared/tests/test_news_events.py -v` +Expected: All 3 tests PASS + +- [ ] **Step 5: Run all existing event tests to check no regressions** + +Run: `pytest shared/tests/test_events.py -v` +Expected: All existing tests PASS + +- [ ] **Step 6: Write tests for DB news methods** + +Create `shared/tests/test_db_news.py`: + +```python +"""Tests for database news/sentiment methods. + +These tests use an in-memory SQLite database. 
+""" + +import json +import uuid +import pytest +from datetime import datetime, date, timezone + +from shared.db import Database +from shared.models import NewsItem, NewsCategory +from shared.sentiment_models import SymbolScore, MarketSentiment + + +@pytest.fixture +async def db(): + database = Database("sqlite+aiosqlite://") + await database.connect() + yield database + await database.close() + + +async def test_insert_and_get_news_items(db): + item = NewsItem( + source="finnhub", + headline="AAPL earnings beat", + published_at=datetime(2026, 4, 2, 12, 0, tzinfo=timezone.utc), + sentiment=0.8, + category=NewsCategory.EARNINGS, + symbols=["AAPL"], + ) + await db.insert_news_item(item) + items = await db.get_recent_news(hours=24) + assert len(items) == 1 + assert items[0]["headline"] == "AAPL earnings beat" + + +async def test_upsert_symbol_score(db): + score = SymbolScore( + symbol="AAPL", + news_score=0.5, + news_count=10, + social_score=0.3, + policy_score=0.0, + filing_score=0.2, + composite=0.3, + updated_at=datetime(2026, 4, 2, tzinfo=timezone.utc), + ) + await db.upsert_symbol_score(score) + scores = await db.get_top_symbol_scores(limit=5) + assert len(scores) == 1 + assert scores[0]["symbol"] == "AAPL" + + +async def test_upsert_market_sentiment(db): + ms = MarketSentiment( + fear_greed=55, + fear_greed_label="Neutral", + vix=18.2, + fed_stance="neutral", + market_regime="neutral", + updated_at=datetime(2026, 4, 2, tzinfo=timezone.utc), + ) + await db.upsert_market_sentiment(ms) + result = await db.get_latest_market_sentiment() + assert result is not None + assert result["fear_greed"] == 55 + + +async def test_insert_stock_selection(db): + await db.insert_stock_selection( + trade_date=date(2026, 4, 2), + symbol="NVDA", + side="BUY", + conviction=0.85, + reason="CHIPS Act", + key_news=["Trump signs CHIPS expansion"], + sentiment_snapshot={"composite": 0.8}, + ) + selections = await db.get_stock_selections(date(2026, 4, 2)) + assert len(selections) == 1 + 
assert selections[0]["symbol"] == "NVDA" +``` + +- [ ] **Step 7: Run DB news tests to verify they fail** + +Run: `pytest shared/tests/test_db_news.py -v` +Expected: FAIL — methods not yet on Database class + +- [ ] **Step 8: Add news/sentiment DB methods to db.py** + +Add to `shared/src/shared/db.py` — new import at top: + +```python +import json +import uuid +from datetime import date +from shared.models import NewsItem +from shared.sentiment_models import SymbolScore, MarketSentiment +from shared.sa_models import NewsItemRow, SymbolScoreRow, MarketSentimentRow, StockSelectionRow +``` + +Add these methods to the `Database` class: + +```python + async def insert_news_item(self, item: NewsItem) -> None: + """Insert a news item.""" + row = NewsItemRow( + id=item.id, + source=item.source, + headline=item.headline, + summary=item.summary, + url=item.url, + published_at=item.published_at, + symbols=json.dumps(item.symbols), + sentiment=item.sentiment, + category=item.category.value, + raw_data=json.dumps(item.raw_data), + created_at=item.created_at, + ) + async with self._session_factory() as session: + try: + session.add(row) + await session.commit() + except Exception: + await session.rollback() + raise + + async def get_recent_news(self, hours: int = 24) -> list[dict]: + """Retrieve news items from the last N hours.""" + since = datetime.now(timezone.utc) - timedelta(hours=hours) + stmt = ( + select(NewsItemRow) + .where(NewsItemRow.published_at >= since) + .order_by(NewsItemRow.published_at.desc()) + ) + async with self._session_factory() as session: + result = await session.execute(stmt) + rows = result.scalars().all() + return [ + { + "id": r.id, + "source": r.source, + "headline": r.headline, + "summary": r.summary, + "url": r.url, + "published_at": r.published_at, + "symbols": json.loads(r.symbols) if r.symbols else [], + "sentiment": r.sentiment, + "category": r.category, + "created_at": r.created_at, + } + for r in rows + ] + + async def 
upsert_symbol_score(self, score: SymbolScore) -> None: + """Insert or update a symbol score.""" + async with self._session_factory() as session: + try: + existing = await session.execute( + select(SymbolScoreRow).where(SymbolScoreRow.symbol == score.symbol) + ) + row = existing.scalar_one_or_none() + if row: + row.news_score = score.news_score + row.news_count = score.news_count + row.social_score = score.social_score + row.policy_score = score.policy_score + row.filing_score = score.filing_score + row.composite = score.composite + row.updated_at = score.updated_at + else: + row = SymbolScoreRow( + id=str(uuid.uuid4()), + symbol=score.symbol, + news_score=score.news_score, + news_count=score.news_count, + social_score=score.social_score, + policy_score=score.policy_score, + filing_score=score.filing_score, + composite=score.composite, + updated_at=score.updated_at, + ) + session.add(row) + await session.commit() + except Exception: + await session.rollback() + raise + + async def get_top_symbol_scores(self, limit: int = 20) -> list[dict]: + """Get top symbol scores ordered by composite descending.""" + stmt = ( + select(SymbolScoreRow) + .order_by(SymbolScoreRow.composite.desc()) + .limit(limit) + ) + async with self._session_factory() as session: + result = await session.execute(stmt) + rows = result.scalars().all() + return [ + { + "symbol": r.symbol, + "news_score": r.news_score, + "news_count": r.news_count, + "social_score": r.social_score, + "policy_score": r.policy_score, + "filing_score": r.filing_score, + "composite": r.composite, + "updated_at": r.updated_at, + } + for r in rows + ] + + async def upsert_market_sentiment(self, ms: MarketSentiment) -> None: + """Insert or update the latest market sentiment (single row, id='latest').""" + async with self._session_factory() as session: + try: + existing = await session.execute( + select(MarketSentimentRow).where(MarketSentimentRow.id == "latest") + ) + row = existing.scalar_one_or_none() + if row: + 
row.fear_greed = ms.fear_greed + row.fear_greed_label = ms.fear_greed_label + row.vix = ms.vix + row.fed_stance = ms.fed_stance + row.market_regime = ms.market_regime + row.updated_at = ms.updated_at + else: + row = MarketSentimentRow( + id="latest", + fear_greed=ms.fear_greed, + fear_greed_label=ms.fear_greed_label, + vix=ms.vix, + fed_stance=ms.fed_stance, + market_regime=ms.market_regime, + updated_at=ms.updated_at, + ) + session.add(row) + await session.commit() + except Exception: + await session.rollback() + raise + + async def get_latest_market_sentiment(self) -> dict | None: + """Get the latest market sentiment.""" + stmt = select(MarketSentimentRow).where(MarketSentimentRow.id == "latest") + async with self._session_factory() as session: + result = await session.execute(stmt) + r = result.scalar_one_or_none() + if r is None: + return None + return { + "fear_greed": r.fear_greed, + "fear_greed_label": r.fear_greed_label, + "vix": r.vix, + "fed_stance": r.fed_stance, + "market_regime": r.market_regime, + "updated_at": r.updated_at, + } + + async def insert_stock_selection( + self, + trade_date: date, + symbol: str, + side: str, + conviction: float, + reason: str, + key_news: list[str], + sentiment_snapshot: dict, + ) -> None: + """Insert a stock selection record.""" + row = StockSelectionRow( + id=str(uuid.uuid4()), + trade_date=trade_date, + symbol=symbol, + side=side, + conviction=conviction, + reason=reason, + key_news=json.dumps(key_news), + sentiment_snapshot=json.dumps(sentiment_snapshot), + ) + async with self._session_factory() as session: + try: + session.add(row) + await session.commit() + except Exception: + await session.rollback() + raise + + async def get_stock_selections(self, trade_date: date) -> list[dict]: + """Get stock selections for a specific date.""" + stmt = ( + select(StockSelectionRow) + .where(StockSelectionRow.trade_date == trade_date) + .order_by(StockSelectionRow.conviction.desc()) + ) + async with self._session_factory() as 
session: + result = await session.execute(stmt) + rows = result.scalars().all() + return [ + { + "symbol": r.symbol, + "side": r.side, + "conviction": r.conviction, + "reason": r.reason, + "key_news": json.loads(r.key_news) if r.key_news else [], + "sentiment_snapshot": json.loads(r.sentiment_snapshot) if r.sentiment_snapshot else {}, + } + for r in rows + ] +``` + +- [ ] **Step 9: Run DB news tests to verify they pass** + +Run: `pytest shared/tests/test_db_news.py -v` +Expected: All 4 tests PASS + +Note: These tests require `aiosqlite` package. If not installed: `pip install aiosqlite` + +- [ ] **Step 10: Run all shared tests to check no regressions** + +Run: `pytest shared/tests/ -v` +Expected: All tests PASS + +- [ ] **Step 11: Commit** + +```bash +git add shared/src/shared/events.py shared/src/shared/db.py shared/tests/test_news_events.py shared/tests/test_db_news.py +git commit -m "feat: add NewsEvent, DB methods for news/sentiment/selections" +``` + +--- + +### Task 5: Update Settings with new env vars + +**Files:** +- Modify: `shared/src/shared/config.py` +- Modify: `.env.example` + +- [ ] **Step 1: Add new settings to config.py** + +Add these fields to the `Settings` class in `shared/src/shared/config.py`: + +```python + # News collector + finnhub_api_key: str = "" + news_poll_interval: int = 300 + sentiment_aggregate_interval: int = 900 + # Stock selector + selector_candidates_time: str = "15:00" + selector_filter_time: str = "15:15" + selector_final_time: str = "15:30" + selector_max_picks: int = 3 + # LLM + anthropic_api_key: str = "" + anthropic_model: str = "claude-sonnet-4-20250514" +``` + +- [ ] **Step 2: Add to .env.example** + +Append to `.env.example`: + +```bash + +# News Collector +FINNHUB_API_KEY= +NEWS_POLL_INTERVAL=300 +SENTIMENT_AGGREGATE_INTERVAL=900 + +# Stock Selector +SELECTOR_CANDIDATES_TIME=15:00 +SELECTOR_FILTER_TIME=15:15 +SELECTOR_FINAL_TIME=15:30 +SELECTOR_MAX_PICKS=3 + +# LLM (for stock selector) +ANTHROPIC_API_KEY= 
+ANTHROPIC_MODEL=claude-sonnet-4-20250514 +``` + +- [ ] **Step 3: Commit** + +```bash +git add shared/src/shared/config.py .env.example +git commit -m "feat: add news collector and stock selector config settings" +``` + +--- + +## Phase 2: News Collector Service + +### Task 6: Scaffold news-collector service + +**Files:** +- Create: `services/news-collector/pyproject.toml` +- Create: `services/news-collector/Dockerfile` +- Create: `services/news-collector/src/news_collector/__init__.py` +- Create: `services/news-collector/src/news_collector/config.py` +- Create: `services/news-collector/src/news_collector/collectors/__init__.py` +- Create: `services/news-collector/src/news_collector/collectors/base.py` +- Create: `services/news-collector/tests/__init__.py` + +- [ ] **Step 1: Create pyproject.toml** + +Create `services/news-collector/pyproject.toml`: + +```toml +[project] +name = "news-collector" +version = "0.1.0" +description = "News and sentiment data collector service" +requires-python = ">=3.12" +dependencies = [ + "trading-shared", + "feedparser>=6.0", + "nltk>=3.8", + "aiohttp>=3.9", +] + +[project.optional-dependencies] +dev = [ + "pytest>=8.0", + "pytest-asyncio>=0.23", + "aioresponses>=0.7", +] + +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[tool.hatch.build.targets.wheel] +packages = ["src/news_collector"] +``` + +- [ ] **Step 2: Create Dockerfile** + +Create `services/news-collector/Dockerfile`: + +```dockerfile +FROM python:3.12-slim +WORKDIR /app +COPY shared/ shared/ +RUN pip install --no-cache-dir ./shared +COPY services/news-collector/ services/news-collector/ +RUN pip install --no-cache-dir ./services/news-collector +RUN python -c "import nltk; nltk.download('vader_lexicon', quiet=True)" +ENV PYTHONPATH=/app +CMD ["python", "-m", "news_collector.main"] +``` + +- [ ] **Step 3: Create config.py** + +Create `services/news-collector/src/news_collector/config.py`: + +```python +"""News Collector configuration.""" + 
+from shared.config import Settings + + +class NewsCollectorConfig(Settings): + health_port: int = 8084 + finnhub_api_key: str = "" + news_poll_interval: int = 300 + sentiment_aggregate_interval: int = 900 +``` + +- [ ] **Step 4: Create BaseCollector** + +Create `services/news-collector/src/news_collector/collectors/base.py`: + +```python +"""Base class for all news collectors.""" + +from abc import ABC, abstractmethod + +from shared.models import NewsItem + + +class BaseCollector(ABC): + name: str = "base" + poll_interval: int = 300 # seconds + + @abstractmethod + async def collect(self) -> list[NewsItem]: + """Collect news items from the source.""" + + @abstractmethod + async def is_available(self) -> bool: + """Check if this data source is accessible.""" +``` + +- [ ] **Step 5: Create __init__.py files** + +Create `services/news-collector/src/news_collector/__init__.py`: +```python +"""News collector service.""" +``` + +Create `services/news-collector/src/news_collector/collectors/__init__.py`: +```python +"""News collectors.""" +``` + +Create `services/news-collector/tests/__init__.py`: +```python +``` + +- [ ] **Step 6: Commit** + +```bash +git add services/news-collector/ +git commit -m "feat: scaffold news-collector service with BaseCollector" +``` + +--- + +### Task 7: Implement Finnhub news collector + +**Files:** +- Create: `services/news-collector/src/news_collector/collectors/finnhub.py` +- Create: `services/news-collector/tests/test_finnhub.py` + +- [ ] **Step 1: Write tests** + +Create `services/news-collector/tests/test_finnhub.py`: + +```python +"""Tests for Finnhub news collector.""" + +import pytest +from unittest.mock import AsyncMock, patch +from datetime import datetime, timezone + +from news_collector.collectors.finnhub import FinnhubCollector + + +@pytest.fixture +def collector(): + return FinnhubCollector(api_key="test_key") + + +def test_collector_name(collector): + assert collector.name == "finnhub" + assert collector.poll_interval == 300 
+ + +async def test_is_available_with_key(collector): + assert await collector.is_available() is True + + +async def test_is_available_without_key(): + c = FinnhubCollector(api_key="") + assert await c.is_available() is False + + +async def test_collect_parses_response(collector): + mock_response = [ + { + "category": "top news", + "datetime": 1711929600, + "headline": "AAPL beats earnings", + "id": 12345, + "related": "AAPL", + "source": "MarketWatch", + "summary": "Apple reported better than expected...", + "url": "https://example.com/article", + }, + { + "category": "top news", + "datetime": 1711929000, + "headline": "Fed holds rates steady", + "id": 12346, + "related": "", + "source": "Reuters", + "summary": "The Federal Reserve...", + "url": "https://example.com/fed", + }, + ] + + with patch.object(collector, "_fetch_news", new_callable=AsyncMock, return_value=mock_response): + items = await collector.collect() + + assert len(items) == 2 + assert items[0].source == "finnhub" + assert items[0].headline == "AAPL beats earnings" + assert items[0].symbols == ["AAPL"] + assert items[0].url == "https://example.com/article" + assert isinstance(items[0].sentiment, float) + # Second item has no related ticker + assert items[1].symbols == [] + + +async def test_collect_handles_empty_response(collector): + with patch.object(collector, "_fetch_news", new_callable=AsyncMock, return_value=[]): + items = await collector.collect() + assert items == [] +``` + +- [ ] **Step 2: Run tests to verify they fail** + +Run: `pytest services/news-collector/tests/test_finnhub.py -v` +Expected: FAIL — module not found + +- [ ] **Step 3: Implement FinnhubCollector** + +Create `services/news-collector/src/news_collector/collectors/finnhub.py`: + +```python +"""Finnhub market news collector (free tier: 60 req/min).""" + +import logging +from datetime import datetime, timezone + +import aiohttp +from nltk.sentiment.vader import SentimentIntensityAnalyzer + +from shared.models import 
NewsCategory, NewsItem +from news_collector.collectors.base import BaseCollector + +logger = logging.getLogger(__name__) + +FINNHUB_NEWS_URL = "https://finnhub.io/api/v1/news" + + +class FinnhubCollector(BaseCollector): + name = "finnhub" + poll_interval = 300 # 5 minutes + + def __init__(self, api_key: str) -> None: + self._api_key = api_key + self._vader = SentimentIntensityAnalyzer() + + async def is_available(self) -> bool: + return bool(self._api_key) + + async def _fetch_news(self) -> list[dict]: + """Fetch general news from Finnhub API.""" + params = {"category": "general", "token": self._api_key} + async with aiohttp.ClientSession() as session: + async with session.get(FINNHUB_NEWS_URL, params=params) as resp: + if resp.status != 200: + logger.warning("finnhub_fetch_failed", status=resp.status) + return [] + return await resp.json() + + def _analyze_sentiment(self, text: str) -> float: + """Return VADER compound score (-1.0 to 1.0).""" + scores = self._vader.polarity_scores(text) + return scores["compound"] + + def _extract_symbols(self, related: str) -> list[str]: + """Parse Finnhub 'related' field into symbol list.""" + if not related or not related.strip(): + return [] + return [s.strip() for s in related.split(",") if s.strip()] + + def _categorize(self, article: dict) -> NewsCategory: + """Determine category from article content.""" + headline = article.get("headline", "").lower() + if any(w in headline for w in ["fed", "fomc", "rate", "inflation"]): + return NewsCategory.FED + if any(w in headline for w in ["tariff", "sanction", "regulation", "trump", "biden", "congress"]): + return NewsCategory.POLICY + if any(w in headline for w in ["earnings", "revenue", "profit", "eps"]): + return NewsCategory.EARNINGS + return NewsCategory.MACRO + + async def collect(self) -> list[NewsItem]: + raw = await self._fetch_news() + items = [] + for article in raw: + headline = article.get("headline", "") + summary = article.get("summary", "") + sentiment_text = 
f"{headline}. {summary}" if summary else headline + + items.append( + NewsItem( + source=self.name, + headline=headline, + summary=summary or None, + url=article.get("url"), + published_at=datetime.fromtimestamp( + article.get("datetime", 0), tz=timezone.utc + ), + symbols=self._extract_symbols(article.get("related", "")), + sentiment=self._analyze_sentiment(sentiment_text), + category=self._categorize(article), + raw_data=article, + ) + ) + return items +``` + +- [ ] **Step 4: Run tests to verify they pass** + +Run: `pytest services/news-collector/tests/test_finnhub.py -v` +Expected: All 5 tests PASS + +Note: Requires `nltk` and VADER lexicon. If not downloaded: `python -c "import nltk; nltk.download('vader_lexicon')"` + +- [ ] **Step 5: Commit** + +```bash +git add services/news-collector/src/news_collector/collectors/finnhub.py services/news-collector/tests/test_finnhub.py +git commit -m "feat: implement Finnhub news collector with VADER sentiment" +``` + +--- + +### Task 8: Implement RSS news collector + +**Files:** +- Create: `services/news-collector/src/news_collector/collectors/rss.py` +- Create: `services/news-collector/tests/test_rss.py` + +- [ ] **Step 1: Write tests** + +Create `services/news-collector/tests/test_rss.py`: + +```python +"""Tests for RSS news collector.""" + +import pytest +from unittest.mock import AsyncMock, patch +from datetime import datetime, timezone + +from news_collector.collectors.rss import RSSCollector + + +@pytest.fixture +def collector(): + return RSSCollector() + + +def test_collector_name(collector): + assert collector.name == "rss" + assert collector.poll_interval == 600 + + +async def test_is_available(collector): + assert await collector.is_available() is True + + +async def test_collect_parses_feed(collector): + mock_feed = { + "entries": [ + { + "title": "NVDA surges on AI demand", + "link": "https://example.com/nvda", + "published_parsed": (2026, 4, 2, 12, 0, 0, 0, 0, 0), + "summary": "Nvidia stock jumped 5%...", + }, 
+            {
+                "title": "Markets rally on jobs data",
+                "link": "https://example.com/market",
+                "published_parsed": (2026, 4, 2, 11, 0, 0, 0, 0, 0),
+                "summary": "The S&P 500 rose...",
+            },
+        ],
+    }
+
+    with patch.object(collector, "_fetch_feeds", new_callable=AsyncMock, return_value=[mock_feed]):
+        items = await collector.collect()
+
+    assert len(items) == 2
+    assert items[0].source == "rss"
+    assert items[0].headline == "NVDA surges on AI demand"
+    assert isinstance(items[0].sentiment, float)
+```
+
+- [ ] **Step 2: Run tests to verify they fail**
+
+Run: `pytest services/news-collector/tests/test_rss.py -v`
+Expected: FAIL
+
+- [ ] **Step 3: Implement RSSCollector**
+
+Create `services/news-collector/src/news_collector/collectors/rss.py`:
+
+```python
+"""RSS feed collector for Yahoo Finance, Google News, MarketWatch."""
+
+import asyncio
+import logging
+import re
+from calendar import timegm
+from datetime import datetime, timezone
+
+import aiohttp
+import feedparser
+from nltk.sentiment.vader import SentimentIntensityAnalyzer
+
+from shared.models import NewsCategory, NewsItem
+from news_collector.collectors.base import BaseCollector
+
+logger = logging.getLogger(__name__)
+
+DEFAULT_FEEDS = [
+    "https://feeds.finance.yahoo.com/rss/2.0/headline?s=^GSPC&region=US&lang=en-US",
+    "https://news.google.com/rss/topics/CAAqJggKIiBDQkFTRWdvSUwyMHZNRGx6TVdZU0FtVnVHZ0pWVXlnQVAB?hl=en-US&gl=US&ceid=US:en",
+    "https://www.marketwatch.com/rss/topstories",
+]
+
+# Common US stock tickers to detect in headlines
+TICKER_PATTERN = re.compile(
+    r"\b(AAPL|MSFT|GOOGL|GOOG|AMZN|TSLA|NVDA|META|JPM|V|JNJ|WMT|PG|UNH|HD|"
+    r"MA|DIS|BAC|XOM|PFE|KO|PEP|CSCO|INTC|VZ|NFLX|ADBE|CRM|AMD|QCOM|"
+    r"GS|BA|CAT|MMM|IBM|GE|F|GM|NKE|MCD|SBUX|SPY|QQQ|IWM)\b"
+)
+
+
+class RSSCollector(BaseCollector):
+    name = "rss"
+    poll_interval = 600  # 10 minutes
+
+    def __init__(self, feeds: list[str] | None = None) -> None:
+        self._feeds = feeds or DEFAULT_FEEDS
+        self._vader = SentimentIntensityAnalyzer()
+
+    async 
def is_available(self) -> bool: + return True + + async def _fetch_feeds(self) -> list[dict]: + """Fetch and parse all RSS feeds.""" + results = [] + async with aiohttp.ClientSession() as session: + for url in self._feeds: + try: + async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as resp: + if resp.status == 200: + text = await resp.text() + feed = feedparser.parse(text) + results.append(feed) + except Exception as exc: + logger.warning("rss_fetch_failed", url=url, error=str(exc)) + return results + + def _extract_symbols(self, text: str) -> list[str]: + """Extract stock tickers from text.""" + return list(set(TICKER_PATTERN.findall(text))) + + def _parse_time(self, entry: dict) -> datetime: + """Parse published time from feed entry.""" + parsed = entry.get("published_parsed") + if parsed: + return datetime.fromtimestamp(timegm(parsed), tz=timezone.utc) + return datetime.now(timezone.utc) + + async def collect(self) -> list[NewsItem]: + feeds = await self._fetch_feeds() + items = [] + seen_titles = set() + + for feed in feeds: + for entry in feed.get("entries", []): + title = entry.get("title", "").strip() + if not title or title in seen_titles: + continue + seen_titles.add(title) + + summary = entry.get("summary", "") + sentiment_text = f"{title}. 
{summary}" if summary else title + + items.append( + NewsItem( + source=self.name, + headline=title, + summary=summary or None, + url=entry.get("link"), + published_at=self._parse_time(entry), + symbols=self._extract_symbols(f"{title} {summary}"), + sentiment=self._vader.polarity_scores(sentiment_text)["compound"], + category=NewsCategory.MACRO, + raw_data={"feed_title": title}, + ) + ) + + return items +``` + +- [ ] **Step 4: Run tests to verify they pass** + +Run: `pytest services/news-collector/tests/test_rss.py -v` +Expected: All 3 tests PASS + +- [ ] **Step 5: Commit** + +```bash +git add services/news-collector/src/news_collector/collectors/rss.py services/news-collector/tests/test_rss.py +git commit -m "feat: implement RSS news collector (Yahoo, Google News, MarketWatch)" +``` + +--- + +### Task 9: Implement Fear & Greed Index collector + +**Files:** +- Create: `services/news-collector/src/news_collector/collectors/fear_greed.py` +- Create: `services/news-collector/tests/test_fear_greed.py` + +- [ ] **Step 1: Write tests** + +Create `services/news-collector/tests/test_fear_greed.py`: + +```python +"""Tests for CNN Fear & Greed Index collector.""" + +import pytest +from unittest.mock import AsyncMock, patch + +from news_collector.collectors.fear_greed import FearGreedCollector + + +@pytest.fixture +def collector(): + return FearGreedCollector() + + +def test_collector_name(collector): + assert collector.name == "fear_greed" + assert collector.poll_interval == 3600 + + +async def test_is_available(collector): + assert await collector.is_available() is True + + +async def test_collect_parses_api_response(collector): + mock_data = { + "fear_and_greed": { + "score": 45.0, + "rating": "Fear", + "timestamp": "2026-04-02T12:00:00+00:00", + } + } + with patch.object(collector, "_fetch_index", new_callable=AsyncMock, return_value=mock_data): + result = await collector.collect() + + assert result.fear_greed == 45 + assert result.fear_greed_label == "Fear" + + +async 
def test_collect_returns_none_on_failure(collector): + with patch.object(collector, "_fetch_index", new_callable=AsyncMock, return_value=None): + result = await collector.collect() + assert result is None + + +def test_classify_label(): + c = FearGreedCollector() + assert c._classify(10) == "Extreme Fear" + assert c._classify(30) == "Fear" + assert c._classify(50) == "Neutral" + assert c._classify(70) == "Greed" + assert c._classify(85) == "Extreme Greed" +``` + +- [ ] **Step 2: Run tests to verify they fail** + +Run: `pytest services/news-collector/tests/test_fear_greed.py -v` +Expected: FAIL + +- [ ] **Step 3: Implement FearGreedCollector** + +Create `services/news-collector/src/news_collector/collectors/fear_greed.py`: + +```python +"""CNN Fear & Greed Index collector.""" + +import logging +from dataclasses import dataclass +from typing import Optional + +import aiohttp + +from news_collector.collectors.base import BaseCollector +from shared.models import NewsItem + +logger = logging.getLogger(__name__) + +FEAR_GREED_URL = "https://production.dataviz.cnn.io/index/fearandgreed/graphdata" + + +@dataclass +class FearGreedResult: + fear_greed: int + fear_greed_label: str + + +class FearGreedCollector(BaseCollector): + """Fetches CNN Fear & Greed Index. + + Note: This collector does NOT return NewsItem — it returns FearGreedResult + which feeds directly into MarketSentiment. The main.py scheduler handles + this differently from news collectors. 
+ """ + + name = "fear_greed" + poll_interval = 3600 # 1 hour + + async def is_available(self) -> bool: + return True + + async def _fetch_index(self) -> Optional[dict]: + """Fetch Fear & Greed data from CNN API.""" + headers = {"User-Agent": "Mozilla/5.0"} + try: + async with aiohttp.ClientSession() as session: + async with session.get( + FEAR_GREED_URL, headers=headers, timeout=aiohttp.ClientTimeout(total=10) + ) as resp: + if resp.status != 200: + logger.warning("fear_greed_fetch_failed", status=resp.status) + return None + return await resp.json() + except Exception as exc: + logger.warning("fear_greed_error", error=str(exc)) + return None + + def _classify(self, score: int) -> str: + """Classify numeric score into label.""" + if score <= 20: + return "Extreme Fear" + if score <= 40: + return "Fear" + if score <= 60: + return "Neutral" + if score <= 80: + return "Greed" + return "Extreme Greed" + + async def collect(self) -> Optional[FearGreedResult]: + """Collect Fear & Greed Index. 
Returns FearGreedResult or None."""
+        data = await self._fetch_index()
+        if data is None:
+            return None
+
+        try:
+            fg = data["fear_and_greed"]
+            score = int(fg["score"])
+            label = fg.get("rating", self._classify(score))
+            return FearGreedResult(fear_greed=score, fear_greed_label=label)
+        except (KeyError, ValueError, TypeError) as exc:
+            logger.warning("fear_greed_parse_failed", error=str(exc))
+            return None
+```
+
+- [ ] **Step 4: Run tests to verify they pass**
+
+Run: `pytest services/news-collector/tests/test_fear_greed.py -v`
+Expected: All 5 tests PASS
+
+- [ ] **Step 5: Commit**
+
+```bash
+git add services/news-collector/src/news_collector/collectors/fear_greed.py services/news-collector/tests/test_fear_greed.py
+git commit -m "feat: implement CNN Fear & Greed Index collector"
+```
+
+---
+
+### Task 10: Implement SEC EDGAR collector
+
+**Files:**
+- Create: `services/news-collector/src/news_collector/collectors/sec_edgar.py`
+- Create: `services/news-collector/tests/test_sec_edgar.py`
+
+- [ ] **Step 1: Write tests**
+
+Create `services/news-collector/tests/test_sec_edgar.py`:
+
+```python
+"""Tests for SEC EDGAR filing collector."""
+
+import pytest
+from unittest.mock import AsyncMock, patch
+from datetime import datetime, timezone
+
+from news_collector.collectors.sec_edgar import SecEdgarCollector
+
+
+@pytest.fixture
+def collector():
+    return SecEdgarCollector()
+
+
+def test_collector_name(collector):
+    assert collector.name == "sec_edgar"
+    assert collector.poll_interval == 1800
+
+
+async def test_is_available(collector):
+    assert await collector.is_available() is True
+
+
+async def test_collect_parses_filings(collector):
+    # Use today's UTC date so the collector's same-day filter matches the mock
+    # on any run date (the implementation compares filingDate to "today").
+    today = datetime.now(timezone.utc).strftime("%Y-%m-%d")
+    mock_response = {
+        "filings": {
+            "recent": {
+                "accessionNumber": ["0001234-26-000001"],
+                "filingDate": [today],
+                "primaryDocument": ["filing.htm"],
+                "form": ["8-K"],
+                "primaryDocDescription": ["Current Report"],
+            }
+        },
+        "tickers": [{"ticker": "AAPL"}],
+        "name": "Apple Inc",
+    }
+    with patch.object(collector, "_fetch_recent_filings", 
new_callable=AsyncMock, return_value=[mock_response]): + items = await collector.collect() + + assert len(items) == 1 + assert items[0].source == "sec_edgar" + assert items[0].category.value == "filing" + assert "AAPL" in items[0].symbols + + +async def test_collect_handles_empty(collector): + with patch.object(collector, "_fetch_recent_filings", new_callable=AsyncMock, return_value=[]): + items = await collector.collect() + assert items == [] +``` + +- [ ] **Step 2: Run tests to verify they fail** + +Run: `pytest services/news-collector/tests/test_sec_edgar.py -v` +Expected: FAIL + +- [ ] **Step 3: Implement SecEdgarCollector** + +Create `services/news-collector/src/news_collector/collectors/sec_edgar.py`: + +```python +"""SEC EDGAR filing collector (free, no API key required).""" + +import logging +from datetime import datetime, timezone + +import aiohttp +from nltk.sentiment.vader import SentimentIntensityAnalyzer + +from shared.models import NewsCategory, NewsItem +from news_collector.collectors.base import BaseCollector + +logger = logging.getLogger(__name__) + +EDGAR_FULL_TEXT_SEARCH = "https://efts.sec.gov/LATEST/search-index" +EDGAR_RECENT_FILINGS = "https://efts.sec.gov/LATEST/search-index?q=%228-K%22&dateRange=custom&startdt={date}&enddt={date}&forms=8-K" +EDGAR_COMPANY_FILINGS = "https://data.sec.gov/submissions/CIK{cik}.json" + +# CIK numbers for major companies (subset — extend as needed) +TRACKED_CIKS = { + "0000320193": "AAPL", + "0000789019": "MSFT", + "0001652044": "GOOGL", + "0001018724": "AMZN", + "0001318605": "TSLA", + "0001045810": "NVDA", + "0001326801": "META", + "0000019617": "JPM", + "0000078003": "PFE", + "0000021344": "KO", +} + +SEC_USER_AGENT = "TradingPlatform research@example.com" + + +class SecEdgarCollector(BaseCollector): + name = "sec_edgar" + poll_interval = 1800 # 30 minutes + + def __init__(self) -> None: + self._vader = SentimentIntensityAnalyzer() + + async def is_available(self) -> bool: + return True + + async def 
_fetch_recent_filings(self) -> list[dict]: + """Fetch recent 8-K filings for tracked companies.""" + results = [] + headers = {"User-Agent": SEC_USER_AGENT} + async with aiohttp.ClientSession() as session: + for cik, ticker in TRACKED_CIKS.items(): + try: + url = f"https://data.sec.gov/submissions/CIK{cik}.json" + async with session.get( + url, headers=headers, timeout=aiohttp.ClientTimeout(total=10) + ) as resp: + if resp.status == 200: + data = await resp.json() + data["tickers"] = [{"ticker": ticker}] + results.append(data) + except Exception as exc: + logger.warning("sec_fetch_failed", cik=cik, error=str(exc)) + return results + + async def collect(self) -> list[NewsItem]: + filings_data = await self._fetch_recent_filings() + items = [] + today = datetime.now(timezone.utc).strftime("%Y-%m-%d") + + for company_data in filings_data: + tickers = [t["ticker"] for t in company_data.get("tickers", [])] + company_name = company_data.get("name", "Unknown") + recent = company_data.get("filings", {}).get("recent", {}) + + forms = recent.get("form", []) + dates = recent.get("filingDate", []) + descriptions = recent.get("primaryDocDescription", []) + accessions = recent.get("accessionNumber", []) + + for i, form in enumerate(forms): + if form != "8-K": + continue + filing_date = dates[i] if i < len(dates) else "" + if filing_date != today: + continue + + desc = descriptions[i] if i < len(descriptions) else "8-K Filing" + accession = accessions[i] if i < len(accessions) else "" + headline = f"{company_name} ({', '.join(tickers)}): {form} - {desc}" + + items.append( + NewsItem( + source=self.name, + headline=headline, + summary=desc, + url=f"https://www.sec.gov/cgi-bin/browse-edgar?action=getcompany&accession={accession}", + published_at=datetime.strptime(filing_date, "%Y-%m-%d").replace(tzinfo=timezone.utc), + symbols=tickers, + sentiment=self._vader.polarity_scores(headline)["compound"], + category=NewsCategory.FILING, + raw_data={"form": form, "accession": accession}, + ) 
+ ) + + return items +``` + +- [ ] **Step 4: Run tests to verify they pass** + +Run: `pytest services/news-collector/tests/test_sec_edgar.py -v` +Expected: All 4 tests PASS + +- [ ] **Step 5: Commit** + +```bash +git add services/news-collector/src/news_collector/collectors/sec_edgar.py services/news-collector/tests/test_sec_edgar.py +git commit -m "feat: implement SEC EDGAR 8-K filing collector" +``` + +--- + +### Task 11: Implement Reddit collector + +**Files:** +- Create: `services/news-collector/src/news_collector/collectors/reddit.py` +- Create: `services/news-collector/tests/test_reddit.py` + +- [ ] **Step 1: Write tests** + +Create `services/news-collector/tests/test_reddit.py`: + +```python +"""Tests for Reddit collector.""" + +import pytest +from unittest.mock import AsyncMock, patch + +from news_collector.collectors.reddit import RedditCollector + + +@pytest.fixture +def collector(): + return RedditCollector() + + +def test_collector_name(collector): + assert collector.name == "reddit" + assert collector.poll_interval == 900 + + +async def test_is_available(collector): + assert await collector.is_available() is True + + +async def test_collect_parses_posts(collector): + mock_posts = [ + { + "data": { + "title": "NVDA to the moon! 
🚀 AI demand is insane", + "selftext": "Just loaded up on NVDA calls", + "url": "https://reddit.com/r/wallstreetbets/123", + "created_utc": 1711929600, + "score": 500, + "num_comments": 200, + "subreddit": "wallstreetbets", + } + }, + ] + with patch.object(collector, "_fetch_subreddit", new_callable=AsyncMock, return_value=mock_posts): + items = await collector.collect() + + assert len(items) >= 1 + assert items[0].source == "reddit" + assert items[0].category.value == "social" + assert isinstance(items[0].sentiment, float) + + +async def test_collect_filters_low_score(collector): + mock_posts = [ + { + "data": { + "title": "Random question about stocks", + "selftext": "", + "url": "https://reddit.com/r/stocks/456", + "created_utc": 1711929600, + "score": 3, + "num_comments": 1, + "subreddit": "stocks", + } + }, + ] + with patch.object(collector, "_fetch_subreddit", new_callable=AsyncMock, return_value=mock_posts): + items = await collector.collect() + assert items == [] +``` + +- [ ] **Step 2: Run tests to verify they fail** + +Run: `pytest services/news-collector/tests/test_reddit.py -v` +Expected: FAIL + +- [ ] **Step 3: Implement RedditCollector** + +Create `services/news-collector/src/news_collector/collectors/reddit.py`: + +```python +"""Reddit collector for r/wallstreetbets, r/stocks, r/investing.""" + +import logging +import re +from datetime import datetime, timezone + +import aiohttp +from nltk.sentiment.vader import SentimentIntensityAnalyzer + +from shared.models import NewsCategory, NewsItem +from news_collector.collectors.base import BaseCollector + +logger = logging.getLogger(__name__) + +SUBREDDITS = ["wallstreetbets", "stocks", "investing"] +MIN_SCORE = 50 # Minimum upvotes to consider + +TICKER_PATTERN = re.compile( + r"\b(AAPL|MSFT|GOOGL|GOOG|AMZN|TSLA|NVDA|META|JPM|V|JNJ|WMT|PG|UNH|HD|" + r"MA|DIS|BAC|XOM|PFE|KO|PEP|CSCO|INTC|VZ|NFLX|ADBE|CRM|AMD|QCOM|" + r"GS|BA|CAT|MMM|IBM|GE|F|GM|NKE|MCD|SBUX|SPY|QQQ|IWM)\b" +) + + +class 
RedditCollector(BaseCollector): + name = "reddit" + poll_interval = 900 # 15 minutes + + def __init__(self) -> None: + self._vader = SentimentIntensityAnalyzer() + + async def is_available(self) -> bool: + return True + + async def _fetch_subreddit(self, subreddit: str = "wallstreetbets") -> list[dict]: + """Fetch hot posts from a subreddit via JSON API.""" + url = f"https://www.reddit.com/r/{subreddit}/hot.json?limit=25" + headers = {"User-Agent": "TradingPlatform/1.0"} + try: + async with aiohttp.ClientSession() as session: + async with session.get( + url, headers=headers, timeout=aiohttp.ClientTimeout(total=10) + ) as resp: + if resp.status != 200: + logger.warning("reddit_fetch_failed", subreddit=subreddit, status=resp.status) + return [] + data = await resp.json() + return data.get("data", {}).get("children", []) + except Exception as exc: + logger.warning("reddit_error", subreddit=subreddit, error=str(exc)) + return [] + + async def collect(self) -> list[NewsItem]: + items = [] + seen_titles = set() + + for subreddit in SUBREDDITS: + posts = await self._fetch_subreddit(subreddit) + for post in posts: + data = post.get("data", {}) + title = data.get("title", "").strip() + score = data.get("score", 0) + + if not title or title in seen_titles or score < MIN_SCORE: + continue + seen_titles.add(title) + + selftext = data.get("selftext", "") + text = f"{title}. 
{selftext}" if selftext else title + symbols = list(set(TICKER_PATTERN.findall(text))) + + items.append( + NewsItem( + source=self.name, + headline=title, + summary=selftext[:500] if selftext else None, + url=data.get("url"), + published_at=datetime.fromtimestamp( + data.get("created_utc", 0), tz=timezone.utc + ), + symbols=symbols, + sentiment=self._vader.polarity_scores(text)["compound"], + category=NewsCategory.SOCIAL, + raw_data={ + "subreddit": data.get("subreddit", subreddit), + "score": score, + "num_comments": data.get("num_comments", 0), + }, + ) + ) + + return items +``` + +- [ ] **Step 4: Run tests to verify they pass** + +Run: `pytest services/news-collector/tests/test_reddit.py -v` +Expected: All 4 tests PASS + +- [ ] **Step 5: Commit** + +```bash +git add services/news-collector/src/news_collector/collectors/reddit.py services/news-collector/tests/test_reddit.py +git commit -m "feat: implement Reddit social sentiment collector" +``` + +--- + +### Task 12: Implement Truth Social and Fed collectors + +**Files:** +- Create: `services/news-collector/src/news_collector/collectors/truth_social.py` +- Create: `services/news-collector/src/news_collector/collectors/fed.py` +- Create: `services/news-collector/tests/test_truth_social.py` +- Create: `services/news-collector/tests/test_fed.py` + +- [ ] **Step 1: Write tests for Truth Social** + +Create `services/news-collector/tests/test_truth_social.py`: + +```python +"""Tests for Truth Social collector.""" + +import pytest +from unittest.mock import AsyncMock, patch + +from news_collector.collectors.truth_social import TruthSocialCollector + + +@pytest.fixture +def collector(): + return TruthSocialCollector() + + +def test_collector_name(collector): + assert collector.name == "truth_social" + assert collector.poll_interval == 900 + + +async def test_is_available(collector): + assert await collector.is_available() is True + + +async def test_collect_parses_posts(collector): + mock_posts = [ + { + "content": "We 
are imposing 25% tariffs on all steel imports!", + "created_at": "2026-04-02T12:00:00.000Z", + "url": "https://truthsocial.com/@realDonaldTrump/12345", + }, + ] + with patch.object(collector, "_fetch_posts", new_callable=AsyncMock, return_value=mock_posts): + items = await collector.collect() + + assert len(items) == 1 + assert items[0].source == "truth_social" + assert items[0].category.value == "policy" + assert "tariff" in items[0].headline.lower() or "tariff" in items[0].raw_data.get("content", "").lower() + + +async def test_collect_handles_empty(collector): + with patch.object(collector, "_fetch_posts", new_callable=AsyncMock, return_value=[]): + items = await collector.collect() + assert items == [] +``` + +- [ ] **Step 2: Write tests for Fed collector** + +Create `services/news-collector/tests/test_fed.py`: + +```python +"""Tests for Federal Reserve collector.""" + +import pytest +from unittest.mock import AsyncMock, patch + +from news_collector.collectors.fed import FedCollector + + +@pytest.fixture +def collector(): + return FedCollector() + + +def test_collector_name(collector): + assert collector.name == "fed" + assert collector.poll_interval == 3600 + + +async def test_is_available(collector): + assert await collector.is_available() is True + + +async def test_collect_parses_rss(collector): + mock_entries = [ + { + "title": "Federal Reserve issues FOMC statement", + "link": "https://www.federalreserve.gov/newsevents/pressreleases/monetary20260402a.htm", + "published_parsed": (2026, 4, 2, 14, 0, 0, 0, 0, 0), + "summary": "The Federal Open Market Committee decided to maintain the target range...", + }, + ] + with patch.object(collector, "_fetch_fed_rss", new_callable=AsyncMock, return_value=mock_entries): + items = await collector.collect() + + assert len(items) == 1 + assert items[0].source == "fed" + assert items[0].category.value == "fed" +``` + +- [ ] **Step 3: Run tests to verify they fail** + +Run: `pytest 
services/news-collector/tests/test_truth_social.py services/news-collector/tests/test_fed.py -v` +Expected: FAIL + +- [ ] **Step 4: Implement TruthSocialCollector** + +Create `services/news-collector/src/news_collector/collectors/truth_social.py`: + +```python +"""Truth Social collector for Trump posts (policy-relevant).""" + +import logging +from datetime import datetime, timezone + +import aiohttp +from nltk.sentiment.vader import SentimentIntensityAnalyzer + +from shared.models import NewsCategory, NewsItem +from news_collector.collectors.base import BaseCollector + +logger = logging.getLogger(__name__) + +# Truth Social uses a Mastodon-compatible API +TRUTH_SOCIAL_API = "https://truthsocial.com/api/v1/accounts/107780257626128497/statuses" + + +class TruthSocialCollector(BaseCollector): + name = "truth_social" + poll_interval = 900 # 15 minutes + + def __init__(self) -> None: + self._vader = SentimentIntensityAnalyzer() + + async def is_available(self) -> bool: + return True + + async def _fetch_posts(self) -> list[dict]: + """Fetch recent posts from Truth Social.""" + headers = {"User-Agent": "Mozilla/5.0"} + try: + async with aiohttp.ClientSession() as session: + async with session.get( + TRUTH_SOCIAL_API, + headers=headers, + params={"limit": 10}, + timeout=aiohttp.ClientTimeout(total=15), + ) as resp: + if resp.status != 200: + logger.warning("truth_social_fetch_failed", status=resp.status) + return [] + return await resp.json() + except Exception as exc: + logger.warning("truth_social_error", error=str(exc)) + return [] + + def _strip_html(self, text: str) -> str: + """Remove HTML tags from content.""" + import re + return re.sub(r"<[^>]+>", "", text).strip() + + async def collect(self) -> list[NewsItem]: + posts = await self._fetch_posts() + items = [] + + for post in posts: + content = self._strip_html(post.get("content", "")) + if not content: + continue + + created_at_str = post.get("created_at", "") + try: + published = 
datetime.fromisoformat(created_at_str.replace("Z", "+00:00")) + except (ValueError, AttributeError): + published = datetime.now(timezone.utc) + + items.append( + NewsItem( + source=self.name, + headline=content[:200], + summary=content if len(content) > 200 else None, + url=post.get("url"), + published_at=published, + symbols=[], # Symbols extracted at aggregation stage via LLM + sentiment=self._vader.polarity_scores(content)["compound"], + category=NewsCategory.POLICY, + raw_data={"content": content, "id": post.get("id")}, + ) + ) + + return items +``` + +- [ ] **Step 5: Implement FedCollector** + +Create `services/news-collector/src/news_collector/collectors/fed.py`: + +```python +"""Federal Reserve press release and FOMC statement collector.""" + +import logging +from calendar import timegm +from datetime import datetime, timezone + +import aiohttp +import feedparser +from nltk.sentiment.vader import SentimentIntensityAnalyzer + +from shared.models import NewsCategory, NewsItem +from news_collector.collectors.base import BaseCollector + +logger = logging.getLogger(__name__) + +FED_RSS_URL = "https://www.federalreserve.gov/feeds/press_all.xml" + + +class FedCollector(BaseCollector): + name = "fed" + poll_interval = 3600 # 1 hour + + def __init__(self) -> None: + self._vader = SentimentIntensityAnalyzer() + + async def is_available(self) -> bool: + return True + + async def _fetch_fed_rss(self) -> list[dict]: + """Fetch Federal Reserve RSS feed entries.""" + try: + async with aiohttp.ClientSession() as session: + async with session.get( + FED_RSS_URL, timeout=aiohttp.ClientTimeout(total=10) + ) as resp: + if resp.status != 200: + logger.warning("fed_rss_failed", status=resp.status) + return [] + text = await resp.text() + feed = feedparser.parse(text) + return feed.get("entries", []) + except Exception as exc: + logger.warning("fed_rss_error", error=str(exc)) + return [] + + def _detect_stance(self, text: str) -> str: + """Detect hawkish/dovish/neutral stance from 
text.""" + text_lower = text.lower() + hawkish_words = ["tighten", "raise", "inflation concern", "restrictive", "higher rates"] + dovish_words = ["accommodate", "cut", "easing", "lower rates", "support growth"] + + hawk_count = sum(1 for w in hawkish_words if w in text_lower) + dove_count = sum(1 for w in dovish_words if w in text_lower) + + if hawk_count > dove_count: + return "hawkish" + if dove_count > hawk_count: + return "dovish" + return "neutral" + + async def collect(self) -> list[NewsItem]: + entries = await self._fetch_fed_rss() + items = [] + + for entry in entries: + title = entry.get("title", "").strip() + if not title: + continue + + summary = entry.get("summary", "") + parsed_time = entry.get("published_parsed") + if parsed_time: + published = datetime.fromtimestamp(timegm(parsed_time), tz=timezone.utc) + else: + published = datetime.now(timezone.utc) + + text = f"{title}. {summary}" if summary else title + + items.append( + NewsItem( + source=self.name, + headline=title, + summary=summary or None, + url=entry.get("link"), + published_at=published, + symbols=[], + sentiment=self._vader.polarity_scores(text)["compound"], + category=NewsCategory.FED, + raw_data={"stance": self._detect_stance(text)}, + ) + ) + + return items +``` + +- [ ] **Step 6: Run tests to verify they pass** + +Run: `pytest services/news-collector/tests/test_truth_social.py services/news-collector/tests/test_fed.py -v` +Expected: All 7 tests PASS + +- [ ] **Step 7: Commit** + +```bash +git add services/news-collector/src/news_collector/collectors/truth_social.py services/news-collector/src/news_collector/collectors/fed.py services/news-collector/tests/test_truth_social.py services/news-collector/tests/test_fed.py +git commit -m "feat: implement Truth Social and Federal Reserve collectors" +``` + +--- + +### Task 13: Implement news-collector main.py (scheduler) + +**Files:** +- Create: `services/news-collector/src/news_collector/main.py` +- Create: 
`services/news-collector/tests/test_main.py` + +- [ ] **Step 1: Write tests** + +Create `services/news-collector/tests/test_main.py`: + +```python +"""Tests for news collector scheduler.""" + +import pytest +from unittest.mock import AsyncMock, patch, MagicMock +from datetime import datetime, timezone + +from shared.models import NewsCategory, NewsItem + +from news_collector.main import run_collector_once + + +async def test_run_collector_once_stores_and_publishes(): + mock_item = NewsItem( + source="test", + headline="Test news", + published_at=datetime(2026, 4, 2, tzinfo=timezone.utc), + sentiment=0.5, + category=NewsCategory.MACRO, + ) + + mock_collector = MagicMock() + mock_collector.name = "test" + mock_collector.collect = AsyncMock(return_value=[mock_item]) + + mock_db = MagicMock() + mock_db.insert_news_item = AsyncMock() + + mock_broker = MagicMock() + mock_broker.publish = AsyncMock() + + count = await run_collector_once(mock_collector, mock_db, mock_broker) + + assert count == 1 + mock_db.insert_news_item.assert_called_once_with(mock_item) + mock_broker.publish.assert_called_once() + + +async def test_run_collector_once_handles_empty(): + mock_collector = MagicMock() + mock_collector.name = "test" + mock_collector.collect = AsyncMock(return_value=[]) + + mock_db = MagicMock() + mock_broker = MagicMock() + + count = await run_collector_once(mock_collector, mock_db, mock_broker) + assert count == 0 +``` + +- [ ] **Step 2: Run tests to verify they fail** + +Run: `pytest services/news-collector/tests/test_main.py -v` +Expected: FAIL + +- [ ] **Step 3: Implement main.py** + +Create `services/news-collector/src/news_collector/main.py`: + +```python +"""News Collector Service — schedules and runs all news collectors.""" + +import asyncio +import logging + +from shared.broker import RedisBroker +from shared.db import Database +from shared.events import NewsEvent +from shared.healthcheck import HealthCheckServer +from shared.logging import setup_logging +from 
shared.metrics import ServiceMetrics +from shared.models import NewsItem +from shared.notifier import TelegramNotifier +from shared.sentiment_models import MarketSentiment + +from news_collector.config import NewsCollectorConfig +from news_collector.collectors.base import BaseCollector +from news_collector.collectors.finnhub import FinnhubCollector +from news_collector.collectors.rss import RSSCollector +from news_collector.collectors.sec_edgar import SecEdgarCollector +from news_collector.collectors.truth_social import TruthSocialCollector +from news_collector.collectors.reddit import RedditCollector +from news_collector.collectors.fear_greed import FearGreedCollector +from news_collector.collectors.fed import FedCollector + +logger = logging.getLogger(__name__) + + +async def run_collector_once( + collector: BaseCollector, + db: Database, + broker: RedisBroker, +) -> int: + """Run a single collector, store results, publish to Redis. + Returns number of items collected.""" + items = await collector.collect() + if not isinstance(items, list): + # FearGreedCollector returns a FearGreedResult, not a list + return 0 + + for item in items: + await db.insert_news_item(item) + event = NewsEvent(data=item) + await broker.publish("news", event.to_dict()) + + return len(items) + + +async def run_collector_loop( + collector: BaseCollector, + db: Database, + broker: RedisBroker, + log, +) -> None: + """Run a collector on its poll interval forever.""" + while True: + try: + if await collector.is_available(): + count = await run_collector_once(collector, db, broker) + log.info("collector_run", collector=collector.name, items=count) + else: + log.debug("collector_unavailable", collector=collector.name) + except Exception as exc: + log.error("collector_error", collector=collector.name, error=str(exc)) + await asyncio.sleep(collector.poll_interval) + + +async def run_fear_greed_loop( + collector: FearGreedCollector, + db: Database, + log, +) -> None: + """Run the Fear & Greed 
collector and update market sentiment.""" + from datetime import datetime, timezone + + while True: + try: + result = await collector.collect() + if result is not None: + ms = MarketSentiment( + fear_greed=result.fear_greed, + fear_greed_label=result.fear_greed_label, + fed_stance="neutral", # Updated by Fed collector analysis + market_regime=_determine_regime(result.fear_greed, None), + updated_at=datetime.now(timezone.utc), + ) + await db.upsert_market_sentiment(ms) + log.info("fear_greed_updated", score=result.fear_greed, label=result.fear_greed_label) + except Exception as exc: + log.error("fear_greed_error", error=str(exc)) + await asyncio.sleep(collector.poll_interval) + + +async def run_aggregator_loop( + db: Database, + interval: int, + log, +) -> None: + """Run sentiment aggregation every `interval` seconds. + Reads recent news from DB, computes per-symbol scores, upserts into symbol_scores table.""" + from datetime import datetime, timezone + from shared.sentiment import SentimentAggregator + + aggregator = SentimentAggregator() + + while True: + try: + now = datetime.now(timezone.utc) + news_items = await db.get_recent_news(hours=24) + if news_items: + scores = aggregator.aggregate(news_items, now) + for symbol_score in scores.values(): + await db.upsert_symbol_score(symbol_score) + log.info("aggregation_complete", symbols=len(scores)) + except Exception as exc: + log.error("aggregation_error", error=str(exc)) + await asyncio.sleep(interval) + + +def _determine_regime(fear_greed: int, vix: float | None) -> str: + """Determine market regime from Fear & Greed and VIX.""" + if fear_greed <= 20: + return "risk_off" + if vix is not None and vix > 30: + return "risk_off" + if fear_greed >= 60 and (vix is None or vix < 20): + return "risk_on" + return "neutral" + + +async def run() -> None: + config = NewsCollectorConfig() + log = setup_logging("news-collector", config.log_level, config.log_format) + metrics = ServiceMetrics("news_collector") + + notifier = 
TelegramNotifier( + bot_token=config.telegram_bot_token, + chat_id=config.telegram_chat_id, + ) + + db = Database(config.database_url) + await db.connect() + + broker = RedisBroker(config.redis_url) + + health = HealthCheckServer( + "news-collector", + port=config.health_port, + auth_token=config.metrics_auth_token, + ) + health.register_check("redis", broker.ping) + await health.start() + metrics.service_up.labels(service="news-collector").set(1) + + # Initialize collectors + news_collectors: list[BaseCollector] = [ + RSSCollector(), + SecEdgarCollector(), + TruthSocialCollector(), + RedditCollector(), + FedCollector(), + ] + + # Finnhub requires API key + if config.finnhub_api_key: + news_collectors.append(FinnhubCollector(api_key=config.finnhub_api_key)) + + fear_greed = FearGreedCollector() + + log.info( + "starting", + collectors=[c.name for c in news_collectors], + fear_greed=True, + ) + + tasks = [] + try: + for collector in news_collectors: + task = asyncio.create_task(run_collector_loop(collector, db, broker, log)) + tasks.append(task) + + tasks.append(asyncio.create_task(run_fear_greed_loop(fear_greed, db, log))) + tasks.append(asyncio.create_task( + run_aggregator_loop(db, config.sentiment_aggregate_interval, log) + )) + + await asyncio.gather(*tasks) + except Exception as exc: + log.error("fatal_error", error=str(exc)) + await notifier.send_error(str(exc), "news-collector") + raise + finally: + for task in tasks: + task.cancel() + metrics.service_up.labels(service="news-collector").set(0) + await notifier.close() + await broker.close() + await db.close() + + +def main() -> None: + asyncio.run(run()) + + +if __name__ == "__main__": + main() +``` + +- [ ] **Step 4: Run tests to verify they pass** + +Run: `pytest services/news-collector/tests/test_main.py -v` +Expected: All 2 tests PASS + +- [ ] **Step 5: Commit** + +```bash +git add services/news-collector/src/news_collector/main.py services/news-collector/tests/test_main.py +git commit -m "feat: 
implement news-collector main scheduler with all collectors" +``` + +--- + +## Phase 3: Sentiment Analysis Pipeline + +### Task 14: Implement sentiment aggregator + +**Files:** +- Modify: `shared/src/shared/sentiment.py` +- Create: `shared/tests/test_sentiment_aggregator.py` + +- [ ] **Step 1: Write tests** + +Create `shared/tests/test_sentiment_aggregator.py`: + +```python +"""Tests for sentiment aggregator.""" + +import pytest +from datetime import datetime, timezone, timedelta + +from shared.sentiment import SentimentAggregator + + +@pytest.fixture +def aggregator(): + return SentimentAggregator() + + +def test_freshness_decay_recent(): + a = SentimentAggregator() + now = datetime.now(timezone.utc) + assert a._freshness_decay(now, now) == 1.0 + + +def test_freshness_decay_3_hours(): + a = SentimentAggregator() + now = datetime.now(timezone.utc) + three_hours_ago = now - timedelta(hours=3) + assert a._freshness_decay(three_hours_ago, now) == 0.7 + + +def test_freshness_decay_12_hours(): + a = SentimentAggregator() + now = datetime.now(timezone.utc) + twelve_hours_ago = now - timedelta(hours=12) + assert a._freshness_decay(twelve_hours_ago, now) == 0.3 + + +def test_freshness_decay_old(): + a = SentimentAggregator() + now = datetime.now(timezone.utc) + two_days_ago = now - timedelta(days=2) + assert a._freshness_decay(two_days_ago, now) == 0.0 + + +def test_compute_composite(): + a = SentimentAggregator() + composite = a._compute_composite( + news_score=0.5, + social_score=0.3, + policy_score=0.8, + filing_score=0.2, + ) + expected = 0.5 * 0.3 + 0.3 * 0.2 + 0.8 * 0.3 + 0.2 * 0.2 + assert abs(composite - expected) < 0.001 + + +def test_aggregate_news_by_symbol(aggregator): + now = datetime.now(timezone.utc) + news_items = [ + { + "symbols": ["AAPL"], + "sentiment": 0.8, + "category": "earnings", + "published_at": now, + }, + { + "symbols": ["AAPL"], + "sentiment": 0.3, + "category": "macro", + "published_at": now - timedelta(hours=2), + }, + { + "symbols": 
["MSFT"], + "sentiment": -0.5, + "category": "policy", + "published_at": now, + }, + ] + scores = aggregator.aggregate(news_items, now) + + assert "AAPL" in scores + assert "MSFT" in scores + assert scores["AAPL"].news_count == 2 + assert scores["AAPL"].news_score > 0 # Positive overall + assert scores["MSFT"].policy_score < 0 # Negative policy + + +def test_aggregate_empty(aggregator): + now = datetime.now(timezone.utc) + scores = aggregator.aggregate([], now) + assert scores == {} + + +def test_determine_regime(): + a = SentimentAggregator() + assert a.determine_regime(15, None) == "risk_off" + assert a.determine_regime(15, 35.0) == "risk_off" + assert a.determine_regime(50, 35.0) == "risk_off" + assert a.determine_regime(70, 15.0) == "risk_on" + assert a.determine_regime(50, 20.0) == "neutral" +``` + +- [ ] **Step 2: Run tests to verify they fail** + +Run: `pytest shared/tests/test_sentiment_aggregator.py -v` +Expected: FAIL — `SentimentAggregator` not in sentiment.py + +- [ ] **Step 3: Add SentimentAggregator to sentiment.py** + +Keep the existing `SentimentData` class (for backward compat with existing tests). 
Add `SentimentAggregator` class at the end of `shared/src/shared/sentiment.py`:
+
+```python
+from datetime import datetime, timedelta
+from shared.sentiment_models import SymbolScore
+
+
+class SentimentAggregator:
+    """Aggregates per-news sentiment into per-symbol scores."""
+
+    # Weights: policy events are most impactful for US stocks
+    WEIGHTS = {
+        "news": 0.3,
+        "social": 0.2,
+        "policy": 0.3,
+        "filing": 0.2,
+    }
+
+    # Category → score field mapping
+    CATEGORY_MAP = {
+        "earnings": "news",
+        "macro": "news",
+        "social": "social",
+        "policy": "policy",
+        "filing": "filing",
+        "fed": "policy",
+    }
+
+    def _freshness_decay(self, published_at: datetime, now: datetime) -> float:
+        """Compute freshness decay factor."""
+        age = now - published_at
+        hours = age.total_seconds() / 3600
+        if hours < 1:
+            return 1.0
+        if hours < 6:
+            return 0.7
+        if hours < 24:
+            return 0.3
+        return 0.0
+
+    def _compute_composite(
+        self,
+        news_score: float,
+        social_score: float,
+        policy_score: float,
+        filing_score: float,
+    ) -> float:
+        return (
+            news_score * self.WEIGHTS["news"]
+            + social_score * self.WEIGHTS["social"]
+            + policy_score * self.WEIGHTS["policy"]
+            + filing_score * self.WEIGHTS["filing"]
+        )
+
+    def aggregate(
+        self, news_items: list[dict], now: datetime
+    ) -> dict[str, SymbolScore]:
+        """Aggregate news items into per-symbol scores.
+
+        Each news_items dict must have: symbols, sentiment, category, published_at.
+        Returns dict mapping symbol → SymbolScore.
+ """ + # Accumulate per-symbol, per-category + symbol_data: dict[str, dict] = {} + + for item in news_items: + decay = self._freshness_decay(item["published_at"], now) + if decay == 0.0: + continue + + category = item.get("category", "macro") + score_field = self.CATEGORY_MAP.get(category, "news") + weighted_sentiment = item["sentiment"] * decay + + for symbol in item.get("symbols", []): + if symbol not in symbol_data: + symbol_data[symbol] = { + "news_scores": [], + "social_scores": [], + "policy_scores": [], + "filing_scores": [], + "count": 0, + } + + symbol_data[symbol][f"{score_field}_scores"].append(weighted_sentiment) + symbol_data[symbol]["count"] += 1 + + # Compute averages and composites + result = {} + for symbol, data in symbol_data.items(): + news_score = _safe_avg(data["news_scores"]) + social_score = _safe_avg(data["social_scores"]) + policy_score = _safe_avg(data["policy_scores"]) + filing_score = _safe_avg(data["filing_scores"]) + + result[symbol] = SymbolScore( + symbol=symbol, + news_score=news_score, + news_count=data["count"], + social_score=social_score, + policy_score=policy_score, + filing_score=filing_score, + composite=self._compute_composite( + news_score, social_score, policy_score, filing_score + ), + updated_at=now, + ) + + return result + + def determine_regime(self, fear_greed: int, vix: float | None) -> str: + """Determine market regime.""" + if fear_greed <= 20: + return "risk_off" + if vix is not None and vix > 30: + return "risk_off" + if fear_greed >= 60 and (vix is None or vix < 20): + return "risk_on" + return "neutral" + + +def _safe_avg(values: list[float]) -> float: + """Return average of values, or 0.0 if empty.""" + if not values: + return 0.0 + return sum(values) / len(values) +``` + +- [ ] **Step 4: Run new tests to verify they pass** + +Run: `pytest shared/tests/test_sentiment_aggregator.py -v` +Expected: All 9 tests PASS + +- [ ] **Step 5: Run existing sentiment tests for regressions** + +Run: `pytest 
shared/tests/test_sentiment.py -v` +Expected: All existing tests PASS (SentimentData unchanged) + +- [ ] **Step 6: Commit** + +```bash +git add shared/src/shared/sentiment.py shared/tests/test_sentiment_aggregator.py +git commit -m "feat: implement SentimentAggregator with freshness decay and composite scoring" +``` + +--- + +## Phase 4: Stock Selector Engine + +### Task 15: Implement stock selector + +**Files:** +- Create: `services/strategy-engine/src/strategy_engine/stock_selector.py` +- Create: `services/strategy-engine/tests/test_stock_selector.py` + +- [ ] **Step 1: Write tests** + +Create `services/strategy-engine/tests/test_stock_selector.py`: + +```python +"""Tests for stock selector engine.""" + +import pytest +from unittest.mock import AsyncMock, MagicMock, patch +from datetime import datetime, timezone +from decimal import Decimal + +from shared.models import OrderSide +from shared.sentiment_models import SymbolScore, MarketSentiment, SelectedStock, Candidate + +from strategy_engine.stock_selector import ( + SentimentCandidateSource, + StockSelector, + _parse_llm_selections, +) + + +async def test_sentiment_candidate_source(): + mock_db = MagicMock() + mock_db.get_top_symbol_scores = AsyncMock(return_value=[ + {"symbol": "AAPL", "composite": 0.8, "news_count": 5}, + {"symbol": "NVDA", "composite": 0.6, "news_count": 3}, + ]) + + source = SentimentCandidateSource(mock_db) + candidates = await source.get_candidates() + + assert len(candidates) == 2 + assert candidates[0].symbol == "AAPL" + assert candidates[0].source == "sentiment" + + +def test_parse_llm_selections_valid(): + llm_response = """ + [ + {"symbol": "NVDA", "side": "BUY", "conviction": 0.85, "reason": "AI demand", "key_news": ["NVDA beats earnings"]}, + {"symbol": "XOM", "side": "BUY", "conviction": 0.72, "reason": "Oil surge", "key_news": ["Oil prices up"]} + ] + """ + selections = _parse_llm_selections(llm_response) + assert len(selections) == 2 + assert selections[0].symbol == "NVDA" + 
assert selections[0].conviction == 0.85 + + +def test_parse_llm_selections_invalid(): + selections = _parse_llm_selections("not json") + assert selections == [] + + +def test_parse_llm_selections_with_markdown(): + llm_response = """ + Here are my picks: + ```json + [ + {"symbol": "TSLA", "side": "BUY", "conviction": 0.7, "reason": "Momentum", "key_news": ["Tesla rally"]} + ] + ``` + """ + selections = _parse_llm_selections(llm_response) + assert len(selections) == 1 + assert selections[0].symbol == "TSLA" + + +async def test_selector_blocks_on_risk_off(): + mock_db = MagicMock() + mock_db.get_latest_market_sentiment = AsyncMock(return_value={ + "fear_greed": 15, + "fear_greed_label": "Extreme Fear", + "vix": 35.0, + "fed_stance": "neutral", + "market_regime": "risk_off", + "updated_at": datetime.now(timezone.utc), + }) + + selector = StockSelector(db=mock_db, broker=MagicMock(), alpaca=MagicMock(), anthropic_api_key="test") + result = await selector.select() + assert result == [] +``` + +- [ ] **Step 2: Run tests to verify they fail** + +Run: `pytest services/strategy-engine/tests/test_stock_selector.py -v` +Expected: FAIL — module not found + +- [ ] **Step 3: Implement StockSelector** + +Create `services/strategy-engine/src/strategy_engine/stock_selector.py`: + +```python +"""Stock Selector Engine — 3-stage dynamic stock selection for MOC trading.""" + +import json +import logging +import re +from datetime import datetime, timezone +from decimal import Decimal +from typing import Optional + +import aiohttp + +from shared.alpaca import AlpacaClient +from shared.broker import RedisBroker +from shared.db import Database +from shared.models import OrderSide +from shared.sentiment_models import Candidate, MarketSentiment, SelectedStock, SymbolScore + +logger = logging.getLogger(__name__) + + +class SentimentCandidateSource: + """Get candidate stocks from sentiment scores in DB.""" + + def __init__(self, db: Database, limit: int = 20) -> None: + self._db = db + 
self._limit = limit + + async def get_candidates(self) -> list[Candidate]: + scores = await self._db.get_top_symbol_scores(limit=self._limit) + return [ + Candidate( + symbol=s["symbol"], + source="sentiment", + score=s["composite"], + reason=f"Sentiment composite={s['composite']:.2f}, news_count={s['news_count']}", + ) + for s in scores + if s["composite"] != 0 + ] + + +class LLMCandidateSource: + """Get candidate stocks by asking Claude to analyze today's top news.""" + + def __init__(self, db: Database, api_key: str, model: str) -> None: + self._db = db + self._api_key = api_key + self._model = model + + async def get_candidates(self) -> list[Candidate]: + news = await self._db.get_recent_news(hours=24) + if not news: + return [] + + headlines = [f"- [{n['source']}] {n['headline']} (sentiment: {n['sentiment']:.2f})" for n in news[:50]] + prompt = ( + "You are a stock market analyst. Based on today's news headlines below, " + "identify US stocks that are most likely to be affected (positively or negatively). " + "Return a JSON array of objects with: symbol, direction (BUY or SELL), score (0-1), reason.\n\n" + "Headlines:\n" + "\n".join(headlines) + "\n\n" + "Return ONLY the JSON array, no other text." 
+ ) + + try: + async with aiohttp.ClientSession() as session: + async with session.post( + "https://api.anthropic.com/v1/messages", + headers={ + "x-api-key": self._api_key, + "anthropic-version": "2023-06-01", + "content-type": "application/json", + }, + json={ + "model": self._model, + "max_tokens": 1024, + "messages": [{"role": "user", "content": prompt}], + }, + timeout=aiohttp.ClientTimeout(total=30), + ) as resp: + if resp.status != 200: + logger.warning("llm_candidate_failed", status=resp.status) + return [] + data = await resp.json() + text = data["content"][0]["text"] + except Exception as exc: + logger.error("llm_candidate_error", error=str(exc)) + return [] + + return self._parse_response(text) + + def _parse_response(self, text: str) -> list[Candidate]: + try: + # Extract JSON from possible markdown code blocks + json_match = re.search(r"```(?:json)?\s*(\[.*?\])\s*```", text, re.DOTALL) + if json_match: + text = json_match.group(1) + items = json.loads(text) + except (json.JSONDecodeError, TypeError): + return [] + + candidates = [] + for item in items: + try: + direction = OrderSide(item.get("direction", "BUY")) + candidates.append( + Candidate( + symbol=item["symbol"], + source="llm", + direction=direction, + score=float(item.get("score", 0.5)), + reason=item.get("reason", "LLM recommendation"), + ) + ) + except (KeyError, ValueError): + continue + + return candidates + + +class StockSelector: + """3-stage stock selector: candidates → technical filter → LLM final pick.""" + + def __init__( + self, + db: Database, + broker: RedisBroker, + alpaca: AlpacaClient, + anthropic_api_key: str, + anthropic_model: str = "claude-sonnet-4-20250514", + max_picks: int = 3, + ) -> None: + self._db = db + self._broker = broker + self._alpaca = alpaca + self._api_key = anthropic_api_key + self._model = anthropic_model + self._max_picks = max_picks + self._sentiment_source = SentimentCandidateSource(db) + self._llm_source = LLMCandidateSource(db, anthropic_api_key, 
anthropic_model) + + async def select(self) -> list[SelectedStock]: + """Run full 3-stage selection. Returns list of SelectedStock.""" + # Check market sentiment gate + ms = await self._db.get_latest_market_sentiment() + if ms and ms.get("market_regime") == "risk_off": + logger.info("selection_blocked_risk_off") + return [] + + # Stage 1: Candidate pool + sentiment_candidates = await self._sentiment_source.get_candidates() + llm_candidates = await self._llm_source.get_candidates() + candidates = self._merge_candidates(sentiment_candidates, llm_candidates) + + if not candidates: + logger.info("no_candidates_found") + return [] + + logger.info("candidates_found", count=len(candidates)) + + # Stage 2: Technical filter + filtered = await self._technical_filter(candidates) + if not filtered: + logger.info("all_candidates_filtered_out") + return [] + + logger.info("technical_filter_passed", count=len(filtered)) + + # Stage 3: LLM final selection + selections = await self._llm_final_select(filtered, ms) + + # Publish to Redis + for selection in selections: + await self._broker.publish( + "selected_stocks", + selection.model_dump(mode="json"), + ) + + # Persist audit trail + from datetime import date as date_type + + for selection in selections: + score_data = await self._db.get_top_symbol_scores(limit=100) + snapshot = next( + (s for s in score_data if s["symbol"] == selection.symbol), + {}, + ) + await self._db.insert_stock_selection( + trade_date=date_type.today(), + symbol=selection.symbol, + side=selection.side.value, + conviction=selection.conviction, + reason=selection.reason, + key_news=selection.key_news, + sentiment_snapshot=snapshot, + ) + + return selections + + def _merge_candidates( + self, + sentiment: list[Candidate], + llm: list[Candidate], + ) -> list[Candidate]: + """Merge and deduplicate candidates, preferring higher scores.""" + by_symbol: dict[str, Candidate] = {} + for c in sentiment + llm: + if c.symbol not in by_symbol or c.score > 
by_symbol[c.symbol].score: + by_symbol[c.symbol] = c + return sorted(by_symbol.values(), key=lambda c: c.score, reverse=True) + + async def _technical_filter(self, candidates: list[Candidate]) -> list[Candidate]: + """Apply MOC-style technical screening to candidates.""" + import pandas as pd + + passed = [] + for candidate in candidates: + try: + bars = await self._alpaca.get_bars( + candidate.symbol, timeframe="1Day", limit=30 + ) + if not bars or len(bars) < 21: + continue + + closes = pd.Series([float(b["c"]) for b in bars]) + volumes = pd.Series([float(b["v"]) for b in bars]) + + # RSI + delta = closes.diff() + gain = delta.clip(lower=0) + loss = -delta.clip(upper=0) + avg_gain = gain.ewm(com=13, min_periods=14).mean() + avg_loss = loss.ewm(com=13, min_periods=14).mean() + rs = avg_gain / avg_loss.replace(0, float("nan")) + rsi = 100 - (100 / (1 + rs)) + current_rsi = rsi.iloc[-1] + + if pd.isna(current_rsi) or not (30 <= current_rsi <= 70): + continue + + # EMA + ema20 = closes.ewm(span=20, adjust=False).mean().iloc[-1] + if closes.iloc[-1] < ema20: + continue + + # Volume above average + vol_avg = volumes.iloc[-20:].mean() + if vol_avg > 0 and volumes.iloc[-1] < vol_avg * 0.5: + continue + + passed.append(candidate) + + except Exception as exc: + logger.warning("technical_filter_error", symbol=candidate.symbol, error=str(exc)) + continue + + return passed + + async def _llm_final_select( + self, + candidates: list[Candidate], + market_sentiment: Optional[dict], + ) -> list[SelectedStock]: + """Ask Claude to make final 2-3 picks from filtered candidates.""" + # Build context + candidate_info = [] + for c in candidates[:15]: + candidate_info.append(f"- {c.symbol}: score={c.score:.2f}, source={c.source}, reason={c.reason}") + + news = await self._db.get_recent_news(hours=12) + top_news = [f"- [{n['source']}] {n['headline']}" for n in news[:20]] + + ms_info = "No market sentiment data available." 
+ if market_sentiment: + ms_info = ( + f"Fear & Greed: {market_sentiment.get('fear_greed', 'N/A')} " + f"({market_sentiment.get('fear_greed_label', 'N/A')}), " + f"VIX: {market_sentiment.get('vix', 'N/A')}, " + f"Fed Stance: {market_sentiment.get('fed_stance', 'N/A')}" + ) + + prompt = ( + f"You are a professional stock trader selecting {self._max_picks} stocks for " + f"Market-on-Close (MOC) overnight trading. You buy at market close and sell at " + f"next day's open.\n\n" + f"## Market Conditions\n{ms_info}\n\n" + f"## Candidate Stocks (pre-screened technically)\n" + + "\n".join(candidate_info) + "\n\n" + f"## Today's Key News\n" + + "\n".join(top_news) + "\n\n" + f"Select the best {self._max_picks} stocks. For each, provide:\n" + f"- symbol: ticker\n" + f"- side: BUY or SELL\n" + f"- conviction: 0.0-1.0\n" + f"- reason: one sentence\n" + f"- key_news: list of relevant headlines\n\n" + f"Return ONLY a JSON array. No other text." + ) + + try: + async with aiohttp.ClientSession() as session: + async with session.post( + "https://api.anthropic.com/v1/messages", + headers={ + "x-api-key": self._api_key, + "anthropic-version": "2023-06-01", + "content-type": "application/json", + }, + json={ + "model": self._model, + "max_tokens": 1024, + "messages": [{"role": "user", "content": prompt}], + }, + timeout=aiohttp.ClientTimeout(total=30), + ) as resp: + if resp.status != 200: + logger.error("llm_final_select_failed", status=resp.status) + return [] + data = await resp.json() + text = data["content"][0]["text"] + except Exception as exc: + logger.error("llm_final_select_error", error=str(exc)) + return [] + + return _parse_llm_selections(text) + + +def _parse_llm_selections(text: str) -> list[SelectedStock]: + """Parse LLM response into SelectedStock list.""" + try: + json_match = re.search(r"```(?:json)?\s*(\[.*?\])\s*```", text, re.DOTALL) + if json_match: + text = json_match.group(1) + # Also try to find a bare JSON array + array_match = re.search(r"\[.*\]", text, 
re.DOTALL) + if array_match: + text = array_match.group(0) + items = json.loads(text) + except (json.JSONDecodeError, TypeError): + return [] + + selections = [] + for item in items: + try: + selections.append( + SelectedStock( + symbol=item["symbol"], + side=OrderSide(item.get("side", "BUY")), + conviction=float(item.get("conviction", 0.5)), + reason=item.get("reason", ""), + key_news=item.get("key_news", []), + ) + ) + except (KeyError, ValueError): + continue + + return selections +``` + +- [ ] **Step 4: Run tests to verify they pass** + +Run: `pytest services/strategy-engine/tests/test_stock_selector.py -v` +Expected: All 5 tests PASS + +- [ ] **Step 5: Run all strategy engine tests for regressions** + +Run: `pytest services/strategy-engine/tests/ -v` +Expected: All tests PASS + +- [ ] **Step 6: Commit** + +```bash +git add services/strategy-engine/src/strategy_engine/stock_selector.py services/strategy-engine/tests/test_stock_selector.py +git commit -m "feat: implement 3-stage stock selector (sentiment → technical → LLM)" +``` + +--- + +## Phase 5: Integration (MOC + Notifications + Docker) + +### Task 16: Add Telegram notification for stock selections + +**Files:** +- Modify: `shared/src/shared/notifier.py` +- Modify: `shared/tests/test_notifier.py` + +- [ ] **Step 1: Add send_stock_selection method to notifier.py** + +Add this method and import to `shared/src/shared/notifier.py`: + +Add to imports: +```python +from shared.sentiment_models import SelectedStock, MarketSentiment +``` + +Add method to `TelegramNotifier` class: + +```python + async def send_stock_selection( + self, + selections: list[SelectedStock], + market: MarketSentiment | None = None, + ) -> None: + """Format and send stock selection notification.""" + lines = [f"📊 Stock Selection ({len(selections)} picks)", ""] + + side_emoji = {"BUY": "🟢", "SELL": "🔴"} + + for i, s in enumerate(selections, 1): + emoji = side_emoji.get(s.side.value, "⚪") + lines.append( + f"{i}. 
{s.symbol} {emoji} {s.side.value} " + f"(conviction: {s.conviction:.0%})" + ) + lines.append(f" {s.reason}") + if s.key_news: + lines.append(f" News: {s.key_news[0]}") + lines.append("") + + if market: + lines.append( + f"Market: F&G {market.fear_greed} ({market.fear_greed_label})" + + (f" | VIX {market.vix:.1f}" if market.vix else "") + ) + + await self.send("\n".join(lines)) +``` + +- [ ] **Step 2: Add test for the new method** + +Add to `shared/tests/test_notifier.py`: + +```python +from shared.models import OrderSide +from shared.sentiment_models import SelectedStock, MarketSentiment +from datetime import datetime, timezone + + +async def test_send_stock_selection(notifier, mock_session): + """Test stock selection notification formatting.""" + selections = [ + SelectedStock( + symbol="NVDA", + side=OrderSide.BUY, + conviction=0.85, + reason="CHIPS Act expansion", + key_news=["Trump signs CHIPS Act"], + ), + ] + market = MarketSentiment( + fear_greed=55, + fear_greed_label="Neutral", + vix=18.2, + fed_stance="neutral", + market_regime="neutral", + updated_at=datetime.now(timezone.utc), + ) + await notifier.send_stock_selection(selections, market) + mock_session.post.assert_called_once() +``` + +Note: Check `shared/tests/test_notifier.py` for existing fixture names (`notifier`, `mock_session`) and adapt accordingly. 
+ +- [ ] **Step 3: Run notifier tests** + +Run: `pytest shared/tests/test_notifier.py -v` +Expected: All tests PASS + +- [ ] **Step 4: Commit** + +```bash +git add shared/src/shared/notifier.py shared/tests/test_notifier.py +git commit -m "feat: add Telegram notification for stock selections" +``` + +--- + +### Task 17: Integrate stock selector with MOC strategy + +**Files:** +- Modify: `services/strategy-engine/src/strategy_engine/main.py` +- Modify: `services/strategy-engine/src/strategy_engine/config.py` + +- [ ] **Step 1: Update strategy engine config** + +Add to `StrategyConfig` in `services/strategy-engine/src/strategy_engine/config.py`: + +```python + selector_candidates_time: str = "15:00" + selector_filter_time: str = "15:15" + selector_final_time: str = "15:30" + selector_max_picks: int = 3 + anthropic_api_key: str = "" + anthropic_model: str = "claude-sonnet-4-20250514" +``` + +- [ ] **Step 2: Add stock selector scheduling to main.py** + +Add a new coroutine to `services/strategy-engine/src/strategy_engine/main.py` that runs the stock selector at the configured times. 
Add imports: + +```python +from shared.alpaca import AlpacaClient +from shared.db import Database +from shared.notifier import TelegramNotifier +from shared.sentiment_models import MarketSentiment +from strategy_engine.stock_selector import StockSelector +``` + +Add the selector loop function: + +```python +async def run_stock_selector( + selector: StockSelector, + notifier: TelegramNotifier, + db: Database, + config: StrategyConfig, + log, +) -> None: + """Run the stock selector once per day at the configured time.""" + import zoneinfo + + et = zoneinfo.ZoneInfo("America/New_York") + + while True: + now_et = datetime.now(et) + target_hour, target_min = map(int, config.selector_final_time.split(":")) + + # Check if it's time to run (within 1-minute window) + if now_et.hour == target_hour and now_et.minute == target_min: + log.info("stock_selector_running") + try: + selections = await selector.select() + if selections: + ms_data = await db.get_latest_market_sentiment() + ms = None + if ms_data: + ms = MarketSentiment(**ms_data) + await notifier.send_stock_selection(selections, ms) + log.info( + "stock_selector_complete", + picks=[s.symbol for s in selections], + ) + else: + log.info("stock_selector_no_picks") + except Exception as exc: + log.error("stock_selector_error", error=str(exc)) + # Sleep past this minute to avoid re-triggering + await asyncio.sleep(120) + else: + await asyncio.sleep(30) +``` + +In the `run()` function, add after creating the broker: + +```python + db = Database(config.database_url) + await db.connect() + + alpaca = AlpacaClient( + api_key=config.alpaca_api_key, + api_secret=config.alpaca_api_secret, + paper=config.alpaca_paper, + ) +``` + +And add the selector if anthropic key is configured: + +```python + if config.anthropic_api_key: + selector = StockSelector( + db=db, + broker=broker, + alpaca=alpaca, + anthropic_api_key=config.anthropic_api_key, + anthropic_model=config.anthropic_model, + max_picks=config.selector_max_picks, + ) + 
tasks.append(asyncio.create_task( + run_stock_selector(selector, notifier, db, config, log) + )) + log.info("stock_selector_enabled", time=config.selector_final_time) +``` + +Add to the `finally` block: + +```python + await alpaca.close() + await db.close() +``` + +- [ ] **Step 3: Run strategy engine tests for regressions** + +Run: `pytest services/strategy-engine/tests/ -v` +Expected: All tests PASS + +- [ ] **Step 4: Commit** + +```bash +git add services/strategy-engine/src/strategy_engine/main.py services/strategy-engine/src/strategy_engine/config.py +git commit -m "feat: integrate stock selector into strategy engine scheduler" +``` + +--- + +### Task 18: Update Docker Compose and .env + +**Files:** +- Modify: `docker-compose.yml` +- Modify: `.env.example` (already done in Task 5, just verify) + +- [ ] **Step 1: Add news-collector service to docker-compose.yml** + +Add before the `loki:` service block in `docker-compose.yml`: + +```yaml + news-collector: + build: + context: . + dockerfile: services/news-collector/Dockerfile + env_file: .env + ports: + - "8084:8084" + depends_on: + redis: + condition: service_healthy + postgres: + condition: service_healthy + healthcheck: + test: ["CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:8084/health')"] + interval: 10s + timeout: 5s + retries: 3 + restart: unless-stopped +``` + +- [ ] **Step 2: Verify compose file is valid** + +Run: `docker compose config --quiet 2>&1 || echo "INVALID"` +Expected: No output (valid) or compose config displayed without errors + +- [ ] **Step 3: Commit** + +```bash +git add docker-compose.yml +git commit -m "feat: add news-collector service to Docker Compose" +``` + +--- + +### Task 19: Run full test suite and lint + +- [ ] **Step 1: Install test dependencies** + +Run: `pip install -e shared/ && pip install aiosqlite feedparser nltk aioresponses` + +- [ ] **Step 2: Download VADER lexicon** + +Run: `python -c "import nltk; nltk.download('vader_lexicon', 
quiet=True)"` + +- [ ] **Step 3: Run lint** + +Run: `make lint` +Expected: No lint errors. If there are errors, fix them. + +- [ ] **Step 4: Run full test suite** + +Run: `make test` +Expected: All tests PASS + +- [ ] **Step 5: Fix any issues found in steps 3-4** + +If lint or tests fail, fix the issues and re-run. + +- [ ] **Step 6: Final commit if any fixes were needed** + +```bash +git add -A +git commit -m "fix: resolve lint and test issues from news selector integration" +``` -- cgit v1.2.3