Diffstat (limited to 'docs/superpowers')
docs/superpowers/plans/2026-04-02-news-driven-stock-selector.md        | 3689
docs/superpowers/plans/2026-04-02-platform-upgrade.md                  | 1991
docs/superpowers/specs/2026-04-02-news-driven-stock-selector-design.md |  418
docs/superpowers/specs/2026-04-02-platform-upgrade-design.md           |  257
4 files changed, 6355 insertions, 0 deletions
diff --git a/docs/superpowers/plans/2026-04-02-news-driven-stock-selector.md b/docs/superpowers/plans/2026-04-02-news-driven-stock-selector.md
new file mode 100644
index 0000000..0964f21
--- /dev/null
+++ b/docs/superpowers/plans/2026-04-02-news-driven-stock-selector.md
@@ -0,0 +1,3689 @@
+# News-Driven Stock Selector Implementation Plan
+
+> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.
+
+**Goal:** Replace the MOC strategy's fixed symbol list with a dynamic, news-driven stock selection system that continuously collects news/sentiment data and selects 2-3 optimal stocks daily before market close.
+
+**Architecture:** A new `news-collector` service runs 7 data source collectors on individual poll intervals, storing `NewsItem` records in PostgreSQL and publishing to Redis. A sentiment aggregator computes per-symbol composite scores every 15 minutes. Before market close, a 3-stage stock selector (sentiment candidates → technical filter → LLM final pick) chooses 2-3 stocks and feeds them to the existing MOC strategy.
+
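The three selection stages can be sketched with stub data. All function names, weights, and the liquidity check below are illustrative stand-ins, not interfaces defined by this plan:

```python
from dataclasses import dataclass


@dataclass
class Pick:
    symbol: str
    score: float


def sentiment_candidates(scores: dict[str, float], n: int = 10) -> list[Pick]:
    # Stage 1: take the top-N symbols by composite sentiment score.
    ranked = sorted(scores.items(), key=lambda kv: kv[1], reverse=True)
    return [Pick(sym, s) for sym, s in ranked[:n]]


def technical_filter(picks: list[Pick], liquid: set[str]) -> list[Pick]:
    # Stage 2: drop candidates failing technical checks (stubbed here
    # as membership in a "liquid symbols" set).
    return [p for p in picks if p.symbol in liquid]


def final_selection(picks: list[Pick], max_picks: int = 3) -> list[Pick]:
    # Stage 3: in the plan an LLM makes the final call; this stub just
    # keeps the top-scoring survivors.
    return picks[:max_picks]


scores = {"AAPL": 0.62, "NVDA": 0.81, "TSLA": 0.45, "XOM": 0.20}
picks = final_selection(
    technical_filter(sentiment_candidates(scores), {"AAPL", "NVDA", "XOM"})
)
print([p.symbol for p in picks])  # ['NVDA', 'AAPL', 'XOM']
```

The real stages are built out in Phases 2-3; this only illustrates the candidates → filter → pick funnel shape.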
+**Tech Stack:** Python 3.12+, asyncio, aiohttp, Pydantic, SQLAlchemy 2.0 async, Redis Streams, VADER (nltk), feedparser (RSS), Anthropic SDK (Claude API), Alembic
+
+---
+
+## Phase 1: Shared Foundation (Models, DB, Events)
+
+### Task 1: Add NewsItem and sentiment models to shared
+
+**Files:**
+- Modify: `shared/src/shared/models.py`
+- Create: `shared/src/shared/sentiment_models.py`
+- Create: `shared/tests/test_sentiment_models.py`
+
+- [ ] **Step 1: Write tests for new models**
+
+Create `shared/tests/test_sentiment_models.py`:
+
+```python
+"""Tests for news and sentiment models."""
+
+from datetime import datetime, timezone
+
+from shared.models import NewsCategory, NewsItem, OrderSide
+from shared.sentiment_models import SymbolScore, MarketSentiment, SelectedStock, Candidate
+
+
+def test_news_item_defaults():
+ item = NewsItem(
+ source="finnhub",
+ headline="Test headline",
+ published_at=datetime(2026, 4, 2, tzinfo=timezone.utc),
+ sentiment=0.5,
+ category=NewsCategory.MACRO,
+ )
+ assert item.id # UUID generated
+ assert item.symbols == []
+ assert item.summary is None
+ assert item.raw_data == {}
+ assert item.created_at is not None
+
+
+def test_news_item_with_symbols():
+ item = NewsItem(
+ source="rss",
+ headline="AAPL earnings beat",
+ published_at=datetime(2026, 4, 2, tzinfo=timezone.utc),
+ sentiment=0.8,
+ category=NewsCategory.EARNINGS,
+ symbols=["AAPL"],
+ )
+ assert item.symbols == ["AAPL"]
+ assert item.category == NewsCategory.EARNINGS
+
+
+def test_news_category_values():
+ assert NewsCategory.POLICY == "policy"
+ assert NewsCategory.EARNINGS == "earnings"
+ assert NewsCategory.MACRO == "macro"
+ assert NewsCategory.SOCIAL == "social"
+ assert NewsCategory.FILING == "filing"
+ assert NewsCategory.FED == "fed"
+
+
+def test_symbol_score():
+ score = SymbolScore(
+ symbol="AAPL",
+ news_score=0.5,
+ news_count=10,
+ social_score=0.3,
+ policy_score=0.0,
+ filing_score=0.2,
+ composite=0.3,
+ updated_at=datetime(2026, 4, 2, tzinfo=timezone.utc),
+ )
+ assert score.symbol == "AAPL"
+ assert score.composite == 0.3
+
+
+def test_market_sentiment():
+ ms = MarketSentiment(
+ fear_greed=25,
+ fear_greed_label="Extreme Fear",
+ vix=32.5,
+ fed_stance="hawkish",
+ market_regime="risk_off",
+ updated_at=datetime(2026, 4, 2, tzinfo=timezone.utc),
+ )
+ assert ms.market_regime == "risk_off"
+ assert ms.vix == 32.5
+
+
+def test_market_sentiment_no_vix():
+ ms = MarketSentiment(
+ fear_greed=50,
+ fear_greed_label="Neutral",
+ fed_stance="neutral",
+ market_regime="neutral",
+ updated_at=datetime(2026, 4, 2, tzinfo=timezone.utc),
+ )
+ assert ms.vix is None
+
+
+def test_selected_stock():
+ ss = SelectedStock(
+ symbol="NVDA",
+ side=OrderSide.BUY,
+ conviction=0.85,
+ reason="CHIPS Act expansion",
+ key_news=["Trump signs CHIPS Act expansion"],
+ )
+ assert ss.conviction == 0.85
+ assert len(ss.key_news) == 1
+
+
+def test_candidate():
+ c = Candidate(
+ symbol="TSLA",
+ source="sentiment",
+ direction=OrderSide.BUY,
+ score=0.75,
+ reason="High social buzz",
+ )
+ assert c.direction == OrderSide.BUY
+
+ c2 = Candidate(
+ symbol="XOM",
+ source="llm",
+ score=0.6,
+ reason="Oil price surge",
+ )
+ assert c2.direction is None
+```
+
+- [ ] **Step 2: Run tests to verify they fail**
+
+Run: `pytest shared/tests/test_sentiment_models.py -v`
+Expected: FAIL — `NewsCategory`, `NewsItem` not found in `shared.models`, `shared.sentiment_models` does not exist
+
+- [ ] **Step 3: Add NewsCategory and NewsItem to shared/models.py**
+
+Add to the end of `shared/src/shared/models.py`:
+
+```python
+class NewsCategory(str, Enum):
+ POLICY = "policy"
+ EARNINGS = "earnings"
+ MACRO = "macro"
+ SOCIAL = "social"
+ FILING = "filing"
+ FED = "fed"
+
+
+class NewsItem(BaseModel):
+ id: str = Field(default_factory=lambda: str(uuid.uuid4()))
+ source: str
+ headline: str
+ summary: Optional[str] = None
+ url: Optional[str] = None
+ published_at: datetime
+ symbols: list[str] = []
+ sentiment: float
+ category: NewsCategory
+ raw_data: dict = {}
+ created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
+```
+
+- [ ] **Step 4: Create shared/src/shared/sentiment_models.py**
+
+```python
+"""Sentiment scoring and stock selection models."""
+
+from datetime import datetime
+from typing import Optional
+
+from pydantic import BaseModel
+
+from shared.models import OrderSide
+
+
+class SymbolScore(BaseModel):
+ symbol: str
+ news_score: float
+ news_count: int
+ social_score: float
+ policy_score: float
+ filing_score: float
+ composite: float
+ updated_at: datetime
+
+
+class MarketSentiment(BaseModel):
+ fear_greed: int
+ fear_greed_label: str
+ vix: Optional[float] = None
+ fed_stance: str
+ market_regime: str
+ updated_at: datetime
+
+
+class SelectedStock(BaseModel):
+ symbol: str
+ side: OrderSide
+ conviction: float
+ reason: str
+ key_news: list[str]
+
+
+class Candidate(BaseModel):
+ symbol: str
+ source: str
+ direction: Optional[OrderSide] = None
+ score: float
+ reason: str
+```
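How the aggregator combines the component fields into `composite` is decided in a later task; purely as an illustration (these weights are invented, not taken from the plan), a linear blend would look like:

```python
def composite_score(
    news: float,
    social: float,
    policy: float,
    filing: float,
    w_news: float = 0.4,     # hypothetical weights; the real
    w_social: float = 0.3,   # weighting is defined by the
    w_policy: float = 0.2,   # sentiment aggregator task
    w_filing: float = 0.1,
) -> float:
    # Weighted blend of component scores, each assumed to lie in [-1, 1].
    return w_news * news + w_social * social + w_policy * policy + w_filing * filing


print(round(composite_score(0.5, 0.3, 0.0, 0.2), 3))  # 0.31
```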
+
+- [ ] **Step 5: Run tests to verify they pass**
+
+Run: `pytest shared/tests/test_sentiment_models.py -v`
+Expected: All 8 tests PASS
+
+- [ ] **Step 6: Commit**
+
+```bash
+git add shared/src/shared/models.py shared/src/shared/sentiment_models.py shared/tests/test_sentiment_models.py
+git commit -m "feat: add NewsItem, sentiment scoring, and stock selection models"
+```
+
+---
+
+### Task 2: Add SQLAlchemy ORM models for news tables
+
+**Files:**
+- Modify: `shared/src/shared/sa_models.py`
+- Create: `shared/tests/test_sa_news_models.py`
+
+- [ ] **Step 1: Write tests for new SA models**
+
+Create `shared/tests/test_sa_news_models.py`:
+
+```python
+"""Tests for news-related SQLAlchemy models."""
+
+from shared.sa_models import NewsItemRow, SymbolScoreRow, MarketSentimentRow, StockSelectionRow
+
+
+def test_news_item_row_tablename():
+ assert NewsItemRow.__tablename__ == "news_items"
+
+
+def test_symbol_score_row_tablename():
+ assert SymbolScoreRow.__tablename__ == "symbol_scores"
+
+
+def test_market_sentiment_row_tablename():
+ assert MarketSentimentRow.__tablename__ == "market_sentiment"
+
+
+def test_stock_selection_row_tablename():
+ assert StockSelectionRow.__tablename__ == "stock_selections"
+
+
+def test_news_item_row_columns():
+ cols = {c.name for c in NewsItemRow.__table__.columns}
+ assert cols >= {"id", "source", "headline", "published_at", "sentiment", "category"}
+
+
+def test_symbol_score_row_columns():
+ cols = {c.name for c in SymbolScoreRow.__table__.columns}
+ assert cols >= {"id", "symbol", "news_score", "composite", "updated_at"}
+```
+
+- [ ] **Step 2: Run tests to verify they fail**
+
+Run: `pytest shared/tests/test_sa_news_models.py -v`
+Expected: FAIL — import errors
+
+- [ ] **Step 3: Add ORM models to sa_models.py**
+
+Add to the end of `shared/src/shared/sa_models.py`:
+
+```python
+class NewsItemRow(Base):
+ __tablename__ = "news_items"
+
+ id: Mapped[str] = mapped_column(Text, primary_key=True)
+ source: Mapped[str] = mapped_column(Text, nullable=False)
+ headline: Mapped[str] = mapped_column(Text, nullable=False)
+ summary: Mapped[str | None] = mapped_column(Text)
+ url: Mapped[str | None] = mapped_column(Text)
+ published_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False)
+ symbols: Mapped[str | None] = mapped_column(Text) # JSON-encoded list
+ sentiment: Mapped[float] = mapped_column(sa.Float, nullable=False)
+ category: Mapped[str] = mapped_column(Text, nullable=False)
+ raw_data: Mapped[str | None] = mapped_column(Text) # JSON string
+ created_at: Mapped[datetime] = mapped_column(
+ DateTime(timezone=True), nullable=False, server_default=sa.func.now()
+ )
+
+
+class SymbolScoreRow(Base):
+ __tablename__ = "symbol_scores"
+
+ id: Mapped[str] = mapped_column(Text, primary_key=True)
+ symbol: Mapped[str] = mapped_column(Text, nullable=False, unique=True)
+ news_score: Mapped[float] = mapped_column(sa.Float, nullable=False, server_default="0")
+ news_count: Mapped[int] = mapped_column(sa.Integer, nullable=False, server_default="0")
+ social_score: Mapped[float] = mapped_column(sa.Float, nullable=False, server_default="0")
+ policy_score: Mapped[float] = mapped_column(sa.Float, nullable=False, server_default="0")
+ filing_score: Mapped[float] = mapped_column(sa.Float, nullable=False, server_default="0")
+ composite: Mapped[float] = mapped_column(sa.Float, nullable=False, server_default="0")
+ updated_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False)
+
+
+class MarketSentimentRow(Base):
+ __tablename__ = "market_sentiment"
+
+ id: Mapped[str] = mapped_column(Text, primary_key=True)
+ fear_greed: Mapped[int] = mapped_column(sa.Integer, nullable=False)
+ fear_greed_label: Mapped[str] = mapped_column(Text, nullable=False)
+ vix: Mapped[float | None] = mapped_column(sa.Float)
+ fed_stance: Mapped[str] = mapped_column(Text, nullable=False, server_default="neutral")
+ market_regime: Mapped[str] = mapped_column(Text, nullable=False, server_default="neutral")
+ updated_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False)
+
+
+class StockSelectionRow(Base):
+ __tablename__ = "stock_selections"
+
+ id: Mapped[str] = mapped_column(Text, primary_key=True)
+ trade_date: Mapped[datetime] = mapped_column(sa.Date, nullable=False)
+ symbol: Mapped[str] = mapped_column(Text, nullable=False)
+ side: Mapped[str] = mapped_column(Text, nullable=False)
+ conviction: Mapped[float] = mapped_column(sa.Float, nullable=False)
+ reason: Mapped[str] = mapped_column(Text, nullable=False)
+ key_news: Mapped[str | None] = mapped_column(Text) # JSON string
+ sentiment_snapshot: Mapped[str | None] = mapped_column(Text) # JSON string
+ created_at: Mapped[datetime] = mapped_column(
+ DateTime(timezone=True), nullable=False, server_default=sa.func.now()
+ )
+```
+
+Also add `import sqlalchemy as sa` to the imports at the top of `sa_models.py`.
+
+- [ ] **Step 4: Run tests to verify they pass**
+
+Run: `pytest shared/tests/test_sa_news_models.py -v`
+Expected: All 6 tests PASS
+
+- [ ] **Step 5: Commit**
+
+```bash
+git add shared/src/shared/sa_models.py shared/tests/test_sa_news_models.py
+git commit -m "feat: add SQLAlchemy ORM models for news, scores, selections"
+```
+
+---
+
+### Task 3: Create Alembic migration for news tables
+
+**Files:**
+- Create: `shared/alembic/versions/002_news_sentiment_tables.py`
+
+- [ ] **Step 1: Create migration file**
+
+Create `shared/alembic/versions/002_news_sentiment_tables.py`:
+
+```python
+"""Add news, sentiment, and stock selection tables
+
+Revision ID: 002
+Revises: 001
+Create Date: 2026-04-02
+"""
+
+from typing import Sequence, Union
+
+from alembic import op
+import sqlalchemy as sa
+
+revision: str = "002"
+down_revision: Union[str, None] = "001"
+branch_labels: Union[str, Sequence[str], None] = None
+depends_on: Union[str, Sequence[str], None] = None
+
+
+def upgrade() -> None:
+ op.create_table(
+ "news_items",
+ sa.Column("id", sa.Text, primary_key=True),
+ sa.Column("source", sa.Text, nullable=False),
+ sa.Column("headline", sa.Text, nullable=False),
+ sa.Column("summary", sa.Text),
+ sa.Column("url", sa.Text),
+ sa.Column("published_at", sa.DateTime(timezone=True), nullable=False),
+ sa.Column("symbols", sa.Text),
+ sa.Column("sentiment", sa.Float, nullable=False),
+ sa.Column("category", sa.Text, nullable=False),
+ sa.Column("raw_data", sa.Text),
+ sa.Column("created_at", sa.DateTime(timezone=True), nullable=False, server_default=sa.func.now()),
+ )
+ op.create_index("idx_news_items_published", "news_items", ["published_at"])
+ op.create_index("idx_news_items_source", "news_items", ["source"])
+
+ op.create_table(
+ "symbol_scores",
+ sa.Column("id", sa.Text, primary_key=True),
+ sa.Column("symbol", sa.Text, nullable=False, unique=True),
+ sa.Column("news_score", sa.Float, nullable=False, server_default="0"),
+ sa.Column("news_count", sa.Integer, nullable=False, server_default="0"),
+ sa.Column("social_score", sa.Float, nullable=False, server_default="0"),
+ sa.Column("policy_score", sa.Float, nullable=False, server_default="0"),
+ sa.Column("filing_score", sa.Float, nullable=False, server_default="0"),
+ sa.Column("composite", sa.Float, nullable=False, server_default="0"),
+ sa.Column("updated_at", sa.DateTime(timezone=True), nullable=False),
+ )
+
+ op.create_table(
+ "market_sentiment",
+ sa.Column("id", sa.Text, primary_key=True),
+ sa.Column("fear_greed", sa.Integer, nullable=False),
+ sa.Column("fear_greed_label", sa.Text, nullable=False),
+ sa.Column("vix", sa.Float),
+ sa.Column("fed_stance", sa.Text, nullable=False, server_default="neutral"),
+ sa.Column("market_regime", sa.Text, nullable=False, server_default="neutral"),
+ sa.Column("updated_at", sa.DateTime(timezone=True), nullable=False),
+ )
+
+ op.create_table(
+ "stock_selections",
+ sa.Column("id", sa.Text, primary_key=True),
+ sa.Column("trade_date", sa.Date, nullable=False),
+ sa.Column("symbol", sa.Text, nullable=False),
+ sa.Column("side", sa.Text, nullable=False),
+ sa.Column("conviction", sa.Float, nullable=False),
+ sa.Column("reason", sa.Text, nullable=False),
+ sa.Column("key_news", sa.Text),
+ sa.Column("sentiment_snapshot", sa.Text),
+ sa.Column("created_at", sa.DateTime(timezone=True), nullable=False, server_default=sa.func.now()),
+ )
+ op.create_index("idx_stock_selections_date", "stock_selections", ["trade_date"])
+
+
+def downgrade() -> None:
+ op.drop_table("stock_selections")
+ op.drop_table("market_sentiment")
+ op.drop_table("symbol_scores")
+ op.drop_table("news_items")
+```
+
+- [ ] **Step 2: Verify migration imports correctly**
+
+Run: `python -c "import importlib.util; s=importlib.util.spec_from_file_location('m','shared/alembic/versions/002_news_sentiment_tables.py'); m=importlib.util.module_from_spec(s); s.loader.exec_module(m); print('OK')"`
+Expected: OK (no import errors)
+
+- [ ] **Step 3: Commit**
+
+```bash
+git add shared/alembic/versions/002_news_sentiment_tables.py
+git commit -m "feat: add Alembic migration for news and sentiment tables"
+```
+
+---
+
+### Task 4: Add NewsEvent to shared events and DB methods for news
+
+**Files:**
+- Modify: `shared/src/shared/events.py`
+- Modify: `shared/src/shared/db.py`
+- Create: `shared/tests/test_news_events.py`
+- Create: `shared/tests/test_db_news.py`
+
+- [ ] **Step 1: Write tests for NewsEvent**
+
+Create `shared/tests/test_news_events.py`:
+
+```python
+"""Tests for NewsEvent."""
+
+from datetime import datetime, timezone
+
+from shared.models import NewsCategory, NewsItem
+from shared.events import NewsEvent, EventType, Event
+
+
+def test_news_event_to_dict():
+ item = NewsItem(
+ source="finnhub",
+ headline="Test",
+ published_at=datetime(2026, 4, 2, tzinfo=timezone.utc),
+ sentiment=0.5,
+ category=NewsCategory.MACRO,
+ )
+ event = NewsEvent(data=item)
+ d = event.to_dict()
+ assert d["type"] == EventType.NEWS
+ assert d["data"]["source"] == "finnhub"
+
+
+def test_news_event_from_raw():
+ raw = {
+ "type": "NEWS",
+ "data": {
+ "id": "abc",
+ "source": "rss",
+ "headline": "Test headline",
+ "published_at": "2026-04-02T00:00:00+00:00",
+ "sentiment": 0.3,
+ "category": "earnings",
+ "symbols": ["AAPL"],
+ "raw_data": {},
+ },
+ }
+ event = NewsEvent.from_raw(raw)
+ assert event.data.source == "rss"
+ assert event.data.symbols == ["AAPL"]
+
+
+def test_event_dispatcher_news():
+ raw = {
+ "type": "NEWS",
+ "data": {
+ "id": "abc",
+ "source": "finnhub",
+ "headline": "Test",
+ "published_at": "2026-04-02T00:00:00+00:00",
+ "sentiment": 0.0,
+ "category": "macro",
+ "raw_data": {},
+ },
+ }
+ event = Event.from_dict(raw)
+ assert isinstance(event, NewsEvent)
+```
+
+- [ ] **Step 2: Run tests to verify they fail**
+
+Run: `pytest shared/tests/test_news_events.py -v`
+Expected: FAIL — `NewsEvent` not in events.py, `EventType.NEWS` missing
+
+- [ ] **Step 3: Add NewsEvent to events.py**
+
+Add `NEWS = "NEWS"` to `EventType` enum.
+
+Add `NewsEvent` class and register it in `_EVENT_TYPE_MAP`:
+
+```python
+from shared.models import Candle, Signal, Order, NewsItem
+
+class NewsEvent(BaseModel):
+ type: EventType = EventType.NEWS
+ data: NewsItem
+
+ def to_dict(self) -> dict:
+ return {
+ "type": self.type,
+ "data": self.data.model_dump(mode="json"),
+ }
+
+ @classmethod
+ def from_raw(cls, raw: dict) -> "NewsEvent":
+ return cls(type=raw["type"], data=NewsItem(**raw["data"]))
+```
+
+Add to `_EVENT_TYPE_MAP`:
+```python
+EventType.NEWS: NewsEvent,
+```
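For orientation, the registry dispatch that `Event.from_dict` performs can be sketched in isolation (a minimal stand-in, not the actual `shared/events.py`):

```python
from enum import Enum


class EventType(str, Enum):
    CANDLE = "CANDLE"
    NEWS = "NEWS"


class NewsEvent:
    # Simplified stand-in; the real class is a Pydantic model.
    def __init__(self, data: dict):
        self.type = EventType.NEWS
        self.data = data

    @classmethod
    def from_raw(cls, raw: dict) -> "NewsEvent":
        return cls(raw["data"])


_EVENT_TYPE_MAP = {EventType.NEWS: NewsEvent}


def event_from_dict(raw: dict):
    # Look up the concrete event class by the "type" field, then delegate
    # parsing to that class — the pattern the new NEWS entry hooks into.
    return _EVENT_TYPE_MAP[EventType(raw["type"])].from_raw(raw)


evt = event_from_dict({"type": "NEWS", "data": {"headline": "Test"}})
print(type(evt).__name__)  # NewsEvent
```

Registering `EventType.NEWS: NewsEvent` is the only change the dispatcher needs; consumers keep calling `Event.from_dict` unchanged.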
+
+- [ ] **Step 4: Run event tests to verify they pass**
+
+Run: `pytest shared/tests/test_news_events.py -v`
+Expected: All 3 tests PASS
+
+- [ ] **Step 5: Run all existing event tests to check no regressions**
+
+Run: `pytest shared/tests/test_events.py -v`
+Expected: All existing tests PASS
+
+- [ ] **Step 6: Write tests for DB news methods**
+
+Create `shared/tests/test_db_news.py`:
+
+```python
+"""Tests for database news/sentiment methods.
+
+These tests use an in-memory SQLite database.
+"""
+
+import pytest
+from datetime import datetime, date, timezone
+
+from shared.db import Database
+from shared.models import NewsItem, NewsCategory
+from shared.sentiment_models import SymbolScore, MarketSentiment
+
+
+@pytest.fixture
+async def db():
+ database = Database("sqlite+aiosqlite://")
+ await database.connect()
+ yield database
+ await database.close()
+
+
+async def test_insert_and_get_news_items(db):
+ item = NewsItem(
+ source="finnhub",
+ headline="AAPL earnings beat",
+ published_at=datetime(2026, 4, 2, 12, 0, tzinfo=timezone.utc),
+ sentiment=0.8,
+ category=NewsCategory.EARNINGS,
+ symbols=["AAPL"],
+ )
+ await db.insert_news_item(item)
+ items = await db.get_recent_news(hours=24)
+ assert len(items) == 1
+ assert items[0]["headline"] == "AAPL earnings beat"
+
+
+async def test_upsert_symbol_score(db):
+ score = SymbolScore(
+ symbol="AAPL",
+ news_score=0.5,
+ news_count=10,
+ social_score=0.3,
+ policy_score=0.0,
+ filing_score=0.2,
+ composite=0.3,
+ updated_at=datetime(2026, 4, 2, tzinfo=timezone.utc),
+ )
+ await db.upsert_symbol_score(score)
+ scores = await db.get_top_symbol_scores(limit=5)
+ assert len(scores) == 1
+ assert scores[0]["symbol"] == "AAPL"
+
+
+async def test_upsert_market_sentiment(db):
+ ms = MarketSentiment(
+ fear_greed=55,
+ fear_greed_label="Neutral",
+ vix=18.2,
+ fed_stance="neutral",
+ market_regime="neutral",
+ updated_at=datetime(2026, 4, 2, tzinfo=timezone.utc),
+ )
+ await db.upsert_market_sentiment(ms)
+ result = await db.get_latest_market_sentiment()
+ assert result is not None
+ assert result["fear_greed"] == 55
+
+
+async def test_insert_stock_selection(db):
+ await db.insert_stock_selection(
+ trade_date=date(2026, 4, 2),
+ symbol="NVDA",
+ side="BUY",
+ conviction=0.85,
+ reason="CHIPS Act",
+ key_news=["Trump signs CHIPS expansion"],
+ sentiment_snapshot={"composite": 0.8},
+ )
+ selections = await db.get_stock_selections(date(2026, 4, 2))
+ assert len(selections) == 1
+ assert selections[0]["symbol"] == "NVDA"
+```
+
+- [ ] **Step 7: Run DB news tests to verify they fail**
+
+Run: `pytest shared/tests/test_db_news.py -v`
+Expected: FAIL — methods not yet on Database class
+
+- [ ] **Step 8: Add news/sentiment DB methods to db.py**
+
+Add to `shared/src/shared/db.py` — new imports at top (merge with the module's existing `datetime` and `select` imports; the methods below also use `datetime`, `timedelta`, and `timezone`):
+
+```python
+import json
+import uuid
+from datetime import date, timedelta, timezone
+
+from shared.models import NewsItem
+from shared.sentiment_models import SymbolScore, MarketSentiment
+from shared.sa_models import NewsItemRow, SymbolScoreRow, MarketSentimentRow, StockSelectionRow
+```
+
+Add these methods to the `Database` class:
+
+```python
+ async def insert_news_item(self, item: NewsItem) -> None:
+ """Insert a news item."""
+ row = NewsItemRow(
+ id=item.id,
+ source=item.source,
+ headline=item.headline,
+ summary=item.summary,
+ url=item.url,
+ published_at=item.published_at,
+ symbols=json.dumps(item.symbols),
+ sentiment=item.sentiment,
+ category=item.category.value,
+ raw_data=json.dumps(item.raw_data),
+ created_at=item.created_at,
+ )
+ async with self._session_factory() as session:
+ try:
+ session.add(row)
+ await session.commit()
+ except Exception:
+ await session.rollback()
+ raise
+
+ async def get_recent_news(self, hours: int = 24) -> list[dict]:
+ """Retrieve news items from the last N hours."""
+ since = datetime.now(timezone.utc) - timedelta(hours=hours)
+ stmt = (
+ select(NewsItemRow)
+ .where(NewsItemRow.published_at >= since)
+ .order_by(NewsItemRow.published_at.desc())
+ )
+ async with self._session_factory() as session:
+ result = await session.execute(stmt)
+ rows = result.scalars().all()
+ return [
+ {
+ "id": r.id,
+ "source": r.source,
+ "headline": r.headline,
+ "summary": r.summary,
+ "url": r.url,
+ "published_at": r.published_at,
+ "symbols": json.loads(r.symbols) if r.symbols else [],
+ "sentiment": r.sentiment,
+ "category": r.category,
+ "created_at": r.created_at,
+ }
+ for r in rows
+ ]
+
+ async def upsert_symbol_score(self, score: SymbolScore) -> None:
+ """Insert or update a symbol score."""
+ async with self._session_factory() as session:
+ try:
+ existing = await session.execute(
+ select(SymbolScoreRow).where(SymbolScoreRow.symbol == score.symbol)
+ )
+ row = existing.scalar_one_or_none()
+ if row:
+ row.news_score = score.news_score
+ row.news_count = score.news_count
+ row.social_score = score.social_score
+ row.policy_score = score.policy_score
+ row.filing_score = score.filing_score
+ row.composite = score.composite
+ row.updated_at = score.updated_at
+ else:
+ row = SymbolScoreRow(
+ id=str(uuid.uuid4()),
+ symbol=score.symbol,
+ news_score=score.news_score,
+ news_count=score.news_count,
+ social_score=score.social_score,
+ policy_score=score.policy_score,
+ filing_score=score.filing_score,
+ composite=score.composite,
+ updated_at=score.updated_at,
+ )
+ session.add(row)
+ await session.commit()
+ except Exception:
+ await session.rollback()
+ raise
+
+ async def get_top_symbol_scores(self, limit: int = 20) -> list[dict]:
+ """Get top symbol scores ordered by composite descending."""
+ stmt = (
+ select(SymbolScoreRow)
+ .order_by(SymbolScoreRow.composite.desc())
+ .limit(limit)
+ )
+ async with self._session_factory() as session:
+ result = await session.execute(stmt)
+ rows = result.scalars().all()
+ return [
+ {
+ "symbol": r.symbol,
+ "news_score": r.news_score,
+ "news_count": r.news_count,
+ "social_score": r.social_score,
+ "policy_score": r.policy_score,
+ "filing_score": r.filing_score,
+ "composite": r.composite,
+ "updated_at": r.updated_at,
+ }
+ for r in rows
+ ]
+
+ async def upsert_market_sentiment(self, ms: MarketSentiment) -> None:
+ """Insert or update the latest market sentiment (single row, id='latest')."""
+ async with self._session_factory() as session:
+ try:
+ existing = await session.execute(
+ select(MarketSentimentRow).where(MarketSentimentRow.id == "latest")
+ )
+ row = existing.scalar_one_or_none()
+ if row:
+ row.fear_greed = ms.fear_greed
+ row.fear_greed_label = ms.fear_greed_label
+ row.vix = ms.vix
+ row.fed_stance = ms.fed_stance
+ row.market_regime = ms.market_regime
+ row.updated_at = ms.updated_at
+ else:
+ row = MarketSentimentRow(
+ id="latest",
+ fear_greed=ms.fear_greed,
+ fear_greed_label=ms.fear_greed_label,
+ vix=ms.vix,
+ fed_stance=ms.fed_stance,
+ market_regime=ms.market_regime,
+ updated_at=ms.updated_at,
+ )
+ session.add(row)
+ await session.commit()
+ except Exception:
+ await session.rollback()
+ raise
+
+ async def get_latest_market_sentiment(self) -> dict | None:
+ """Get the latest market sentiment."""
+ stmt = select(MarketSentimentRow).where(MarketSentimentRow.id == "latest")
+ async with self._session_factory() as session:
+ result = await session.execute(stmt)
+ r = result.scalar_one_or_none()
+ if r is None:
+ return None
+ return {
+ "fear_greed": r.fear_greed,
+ "fear_greed_label": r.fear_greed_label,
+ "vix": r.vix,
+ "fed_stance": r.fed_stance,
+ "market_regime": r.market_regime,
+ "updated_at": r.updated_at,
+ }
+
+ async def insert_stock_selection(
+ self,
+ trade_date: date,
+ symbol: str,
+ side: str,
+ conviction: float,
+ reason: str,
+ key_news: list[str],
+ sentiment_snapshot: dict,
+ ) -> None:
+ """Insert a stock selection record."""
+ row = StockSelectionRow(
+ id=str(uuid.uuid4()),
+ trade_date=trade_date,
+ symbol=symbol,
+ side=side,
+ conviction=conviction,
+ reason=reason,
+ key_news=json.dumps(key_news),
+ sentiment_snapshot=json.dumps(sentiment_snapshot),
+ )
+ async with self._session_factory() as session:
+ try:
+ session.add(row)
+ await session.commit()
+ except Exception:
+ await session.rollback()
+ raise
+
+ async def get_stock_selections(self, trade_date: date) -> list[dict]:
+ """Get stock selections for a specific date."""
+ stmt = (
+ select(StockSelectionRow)
+ .where(StockSelectionRow.trade_date == trade_date)
+ .order_by(StockSelectionRow.conviction.desc())
+ )
+ async with self._session_factory() as session:
+ result = await session.execute(stmt)
+ rows = result.scalars().all()
+ return [
+ {
+ "symbol": r.symbol,
+ "side": r.side,
+ "conviction": r.conviction,
+ "reason": r.reason,
+ "key_news": json.loads(r.key_news) if r.key_news else [],
+ "sentiment_snapshot": json.loads(r.sentiment_snapshot) if r.sentiment_snapshot else {},
+ }
+ for r in rows
+ ]
+```
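The select-then-write upsert above keeps the tests portable to in-memory SQLite. If production only ever runs PostgreSQL, an atomic `INSERT ... ON CONFLICT` is a possible alternative — a sketch only, with the table shape abbreviated:

```python
from sqlalchemy import Column, Float, MetaData, Table, Text
from sqlalchemy.dialects import postgresql

# Abbreviated stand-in for the full symbol_scores table.
metadata = MetaData()
symbol_scores = Table(
    "symbol_scores",
    metadata,
    Column("symbol", Text, primary_key=True),
    Column("composite", Float),
)

stmt = postgresql.insert(symbol_scores).values(symbol="AAPL", composite=0.3)
stmt = stmt.on_conflict_do_update(
    index_elements=["symbol"],
    set_={"composite": stmt.excluded.composite},
)
sql = str(stmt.compile(dialect=postgresql.dialect()))
print("ON CONFLICT" in sql)  # True
```

The trade-off: the dialect-specific statement is a single round trip and race-free, but it will not run against the SQLite test fixture, which is presumably why the plan uses the portable form.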
+
+- [ ] **Step 9: Run DB news tests to verify they pass**
+
+Run: `pytest shared/tests/test_db_news.py -v`
+Expected: All 4 tests PASS
+
+Note: These tests require the `aiosqlite` package (`pip install aiosqlite`) and `pytest-asyncio` configured with `asyncio_mode = auto` (otherwise mark the async tests and fixture explicitly).
+
+- [ ] **Step 10: Run all shared tests to check no regressions**
+
+Run: `pytest shared/tests/ -v`
+Expected: All tests PASS
+
+- [ ] **Step 11: Commit**
+
+```bash
+git add shared/src/shared/events.py shared/src/shared/db.py shared/tests/test_news_events.py shared/tests/test_db_news.py
+git commit -m "feat: add NewsEvent, DB methods for news/sentiment/selections"
+```
+
+---
+
+### Task 5: Update Settings with new env vars
+
+**Files:**
+- Modify: `shared/src/shared/config.py`
+- Modify: `.env.example`
+
+- [ ] **Step 1: Add new settings to config.py**
+
+Add these fields to the `Settings` class in `shared/src/shared/config.py`:
+
+```python
+ # News collector
+ finnhub_api_key: str = ""
+ news_poll_interval: int = 300
+ sentiment_aggregate_interval: int = 900
+ # Stock selector
+ selector_candidates_time: str = "15:00"
+ selector_filter_time: str = "15:15"
+ selector_final_time: str = "15:30"
+ selector_max_picks: int = 3
+ # LLM
+ anthropic_api_key: str = ""
+ anthropic_model: str = "claude-sonnet-4-20250514"
+```
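The `selector_*_time` values are plain `HH:MM` strings. A small helper like this (hypothetical, not one of the plan's tasks) shows how a scheduler could turn such a setting into a comparable time-of-day:

```python
from datetime import time


def parse_hhmm(value: str) -> time:
    # Parse an "HH:MM" setting such as SELECTOR_FINAL_TIME=15:30
    # into a datetime.time for schedule comparisons.
    hour, minute = value.split(":")
    return time(int(hour), int(minute))


print(parse_hhmm("15:30"))  # 15:30:00
```

Note the times are implicitly in market (US/Eastern) hours; whatever scheduler consumes them needs to apply that timezone explicitly.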
+
+- [ ] **Step 2: Add to .env.example**
+
+Append to `.env.example`:
+
+```bash
+
+# News Collector
+FINNHUB_API_KEY=
+NEWS_POLL_INTERVAL=300
+SENTIMENT_AGGREGATE_INTERVAL=900
+
+# Stock Selector
+SELECTOR_CANDIDATES_TIME=15:00
+SELECTOR_FILTER_TIME=15:15
+SELECTOR_FINAL_TIME=15:30
+SELECTOR_MAX_PICKS=3
+
+# LLM (for stock selector)
+ANTHROPIC_API_KEY=
+ANTHROPIC_MODEL=claude-sonnet-4-20250514
+```
+
+- [ ] **Step 3: Commit**
+
+```bash
+git add shared/src/shared/config.py .env.example
+git commit -m "feat: add news collector and stock selector config settings"
+```
+
+---
+
+## Phase 2: News Collector Service
+
+### Task 6: Scaffold news-collector service
+
+**Files:**
+- Create: `services/news-collector/pyproject.toml`
+- Create: `services/news-collector/Dockerfile`
+- Create: `services/news-collector/src/news_collector/__init__.py`
+- Create: `services/news-collector/src/news_collector/config.py`
+- Create: `services/news-collector/src/news_collector/collectors/__init__.py`
+- Create: `services/news-collector/src/news_collector/collectors/base.py`
+- Create: `services/news-collector/tests/__init__.py`
+
+- [ ] **Step 1: Create pyproject.toml**
+
+Create `services/news-collector/pyproject.toml`:
+
+```toml
+[project]
+name = "news-collector"
+version = "0.1.0"
+description = "News and sentiment data collector service"
+requires-python = ">=3.12"
+dependencies = [
+ "trading-shared",
+ "feedparser>=6.0",
+ "nltk>=3.8",
+ "aiohttp>=3.9",
+]
+
+[project.optional-dependencies]
+dev = [
+ "pytest>=8.0",
+ "pytest-asyncio>=0.23",
+ "aioresponses>=0.7",
+]
+
+[build-system]
+requires = ["hatchling"]
+build-backend = "hatchling.build"
+
+[tool.hatch.build.targets.wheel]
+packages = ["src/news_collector"]
+```
+
+- [ ] **Step 2: Create Dockerfile**
+
+Create `services/news-collector/Dockerfile`:
+
+```dockerfile
+FROM python:3.12-slim
+WORKDIR /app
+COPY shared/ shared/
+RUN pip install --no-cache-dir ./shared
+COPY services/news-collector/ services/news-collector/
+RUN pip install --no-cache-dir ./services/news-collector
+RUN python -c "import nltk; nltk.download('vader_lexicon', quiet=True)"
+ENV PYTHONPATH=/app
+CMD ["python", "-m", "news_collector.main"]
+```
+
+- [ ] **Step 3: Create config.py**
+
+Create `services/news-collector/src/news_collector/config.py`:
+
+```python
+"""News Collector configuration."""
+
+from shared.config import Settings
+
+
+class NewsCollectorConfig(Settings):
+ health_port: int = 8084
+ finnhub_api_key: str = ""
+ news_poll_interval: int = 300
+ sentiment_aggregate_interval: int = 900
+```
+
+- [ ] **Step 4: Create BaseCollector**
+
+Create `services/news-collector/src/news_collector/collectors/base.py`:
+
+```python
+"""Base class for all news collectors."""
+
+from abc import ABC, abstractmethod
+
+from shared.models import NewsItem
+
+
+class BaseCollector(ABC):
+ name: str = "base"
+ poll_interval: int = 300 # seconds
+
+ @abstractmethod
+ async def collect(self) -> list[NewsItem]:
+ """Collect news items from the source."""
+
+ @abstractmethod
+ async def is_available(self) -> bool:
+ """Check if this data source is accessible."""
+```
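The collector runner arrives in a later task; as a sketch of the intended pattern — each collector polled on its own `poll_interval` — using invented names and a toy interval:

```python
import asyncio


class TickerCollector:
    # Stand-in for a BaseCollector subclass; just counts its own polls.
    name = "ticker"
    poll_interval = 0.01  # seconds; real collectors default to 300

    def __init__(self):
        self.polls = 0

    async def collect(self):
        self.polls += 1
        return []


async def run_collector(collector, stop: asyncio.Event):
    # Poll one collector on its own interval until asked to stop;
    # the real main() would spawn one such task per collector.
    while not stop.is_set():
        await collector.collect()
        await asyncio.sleep(collector.poll_interval)


async def main():
    c = TickerCollector()
    stop = asyncio.Event()
    task = asyncio.create_task(run_collector(c, stop))
    await asyncio.sleep(0.05)
    stop.set()
    await task
    return c.polls


polls = asyncio.run(main())
print(polls >= 2)  # the collector was polled repeatedly
```

A production runner would also wrap `collect()` in try/except so one failing source cannot kill the other collectors' loops.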
+
+- [ ] **Step 5: Create __init__.py files**
+
+Create `services/news-collector/src/news_collector/__init__.py`:
+```python
+"""News collector service."""
+```
+
+Create `services/news-collector/src/news_collector/collectors/__init__.py`:
+```python
+"""News collectors."""
+```
+
+Create `services/news-collector/tests/__init__.py`:
+```python
+```
+
+- [ ] **Step 6: Commit**
+
+```bash
+git add services/news-collector/
+git commit -m "feat: scaffold news-collector service with BaseCollector"
+```
+
+---
+
+### Task 7: Implement Finnhub news collector
+
+**Files:**
+- Create: `services/news-collector/src/news_collector/collectors/finnhub.py`
+- Create: `services/news-collector/tests/test_finnhub.py`
+
+- [ ] **Step 1: Write tests**
+
+Create `services/news-collector/tests/test_finnhub.py`:
+
+```python
+"""Tests for Finnhub news collector."""
+
+import pytest
+from unittest.mock import AsyncMock, patch
+
+from news_collector.collectors.finnhub import FinnhubCollector
+
+
+@pytest.fixture
+def collector():
+ return FinnhubCollector(api_key="test_key")
+
+
+def test_collector_name(collector):
+ assert collector.name == "finnhub"
+ assert collector.poll_interval == 300
+
+
+async def test_is_available_with_key(collector):
+ assert await collector.is_available() is True
+
+
+async def test_is_available_without_key():
+ c = FinnhubCollector(api_key="")
+ assert await c.is_available() is False
+
+
+async def test_collect_parses_response(collector):
+ mock_response = [
+ {
+ "category": "top news",
+ "datetime": 1711929600,
+ "headline": "AAPL beats earnings",
+ "id": 12345,
+ "related": "AAPL",
+ "source": "MarketWatch",
+ "summary": "Apple reported better than expected...",
+ "url": "https://example.com/article",
+ },
+ {
+ "category": "top news",
+ "datetime": 1711929000,
+ "headline": "Fed holds rates steady",
+ "id": 12346,
+ "related": "",
+ "source": "Reuters",
+ "summary": "The Federal Reserve...",
+ "url": "https://example.com/fed",
+ },
+ ]
+
+ with patch.object(collector, "_fetch_news", new_callable=AsyncMock, return_value=mock_response):
+ items = await collector.collect()
+
+ assert len(items) == 2
+ assert items[0].source == "finnhub"
+ assert items[0].headline == "AAPL beats earnings"
+ assert items[0].symbols == ["AAPL"]
+ assert items[0].url == "https://example.com/article"
+ assert isinstance(items[0].sentiment, float)
+ # Second item has no related ticker
+ assert items[1].symbols == []
+
+
+async def test_collect_handles_empty_response(collector):
+ with patch.object(collector, "_fetch_news", new_callable=AsyncMock, return_value=[]):
+ items = await collector.collect()
+ assert items == []
+```
+
+- [ ] **Step 2: Run tests to verify they fail**
+
+Run: `pytest services/news-collector/tests/test_finnhub.py -v`
+Expected: FAIL — module not found
+
+- [ ] **Step 3: Implement FinnhubCollector**
+
+Create `services/news-collector/src/news_collector/collectors/finnhub.py`:
+
+```python
+"""Finnhub market news collector (free tier: 60 req/min)."""
+
+import logging
+from datetime import datetime, timezone
+
+import aiohttp
+from nltk.sentiment.vader import SentimentIntensityAnalyzer
+
+from shared.models import NewsCategory, NewsItem
+from news_collector.collectors.base import BaseCollector
+
+logger = logging.getLogger(__name__)
+
+FINNHUB_NEWS_URL = "https://finnhub.io/api/v1/news"
+
+
+class FinnhubCollector(BaseCollector):
+ name = "finnhub"
+ poll_interval = 300 # 5 minutes
+
+ def __init__(self, api_key: str) -> None:
+ self._api_key = api_key
+ self._vader = SentimentIntensityAnalyzer()
+
+ async def is_available(self) -> bool:
+ return bool(self._api_key)
+
+ async def _fetch_news(self) -> list[dict]:
+ """Fetch general news from Finnhub API."""
+ params = {"category": "general", "token": self._api_key}
+ async with aiohttp.ClientSession() as session:
+ async with session.get(FINNHUB_NEWS_URL, params=params) as resp:
+ if resp.status != 200:
+                    logger.warning("finnhub fetch failed: status=%s", resp.status)
+ return []
+ return await resp.json()
+
+ def _analyze_sentiment(self, text: str) -> float:
+ """Return VADER compound score (-1.0 to 1.0)."""
+ scores = self._vader.polarity_scores(text)
+ return scores["compound"]
+
+ def _extract_symbols(self, related: str) -> list[str]:
+ """Parse Finnhub 'related' field into symbol list."""
+ if not related or not related.strip():
+ return []
+ return [s.strip() for s in related.split(",") if s.strip()]
+
+ def _categorize(self, article: dict) -> NewsCategory:
+ """Determine category from article content."""
+ headline = article.get("headline", "").lower()
+ if any(w in headline for w in ["fed", "fomc", "rate", "inflation"]):
+ return NewsCategory.FED
+ if any(w in headline for w in ["tariff", "sanction", "regulation", "trump", "biden", "congress"]):
+ return NewsCategory.POLICY
+ if any(w in headline for w in ["earnings", "revenue", "profit", "eps"]):
+ return NewsCategory.EARNINGS
+ return NewsCategory.MACRO
+
+ async def collect(self) -> list[NewsItem]:
+ raw = await self._fetch_news()
+ items = []
+ for article in raw:
+ headline = article.get("headline", "")
+ summary = article.get("summary", "")
+ sentiment_text = f"{headline}. {summary}" if summary else headline
+
+ items.append(
+ NewsItem(
+ source=self.name,
+ headline=headline,
+ summary=summary or None,
+ url=article.get("url"),
+ published_at=datetime.fromtimestamp(
+ article.get("datetime", 0), tz=timezone.utc
+ ),
+ symbols=self._extract_symbols(article.get("related", "")),
+ sentiment=self._analyze_sentiment(sentiment_text),
+ category=self._categorize(article),
+ raw_data=article,
+ )
+ )
+ return items
+```
+
+- [ ] **Step 4: Run tests to verify they pass**
+
+Run: `pytest services/news-collector/tests/test_finnhub.py -v`
+Expected: All 5 tests PASS
+
+Note: Requires `nltk` and VADER lexicon. If not downloaded: `python -c "import nltk; nltk.download('vader_lexicon')"`
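+
+The keyword heuristic in `_categorize` can be spot-checked without VADER or any network access. A simplified standalone copy, with plain strings standing in for the `NewsCategory` enum from `shared.models`:
+
+```python
+FED_WORDS = ["fed", "fomc", "rate", "inflation"]
+POLICY_WORDS = ["tariff", "sanction", "regulation", "trump", "biden", "congress"]
+EARNINGS_WORDS = ["earnings", "revenue", "profit", "eps"]
+
+
+def categorize(headline: str) -> str:
+    """Mirror of FinnhubCollector._categorize, first match wins."""
+    h = headline.lower()
+    if any(w in h for w in FED_WORDS):
+        return "fed"
+    if any(w in h for w in POLICY_WORDS):
+        return "policy"
+    if any(w in h for w in EARNINGS_WORDS):
+        return "earnings"
+    return "macro"
+```
+
+Because these are substring checks, words that merely contain a keyword also match — "corporate" contains "rate", so a "Corporate update" headline lands in the Fed bucket. Tightening the checks to word-boundary regexes is a possible refinement.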
+
+- [ ] **Step 5: Commit**
+
+```bash
+git add services/news-collector/src/news_collector/collectors/finnhub.py services/news-collector/tests/test_finnhub.py
+git commit -m "feat: implement Finnhub news collector with VADER sentiment"
+```
+
+---
+
+### Task 8: Implement RSS news collector
+
+**Files:**
+- Create: `services/news-collector/src/news_collector/collectors/rss.py`
+- Create: `services/news-collector/tests/test_rss.py`
+
+- [ ] **Step 1: Write tests**
+
+Create `services/news-collector/tests/test_rss.py`:
+
+```python
+"""Tests for RSS news collector."""
+
+import pytest
+from unittest.mock import AsyncMock, patch
+
+from news_collector.collectors.rss import RSSCollector
+
+
+@pytest.fixture
+def collector():
+ return RSSCollector()
+
+
+def test_collector_name(collector):
+ assert collector.name == "rss"
+ assert collector.poll_interval == 600
+
+
+async def test_is_available(collector):
+ assert await collector.is_available() is True
+
+
+async def test_collect_parses_feed(collector):
+ mock_feed = {
+ "entries": [
+ {
+ "title": "NVDA surges on AI demand",
+ "link": "https://example.com/nvda",
+ "published_parsed": (2026, 4, 2, 12, 0, 0, 0, 0, 0),
+ "summary": "Nvidia stock jumped 5%...",
+ },
+ {
+ "title": "Markets rally on jobs data",
+ "link": "https://example.com/market",
+ "published_parsed": (2026, 4, 2, 11, 0, 0, 0, 0, 0),
+ "summary": "The S&P 500 rose...",
+ },
+ ],
+ }
+
+ with patch.object(collector, "_fetch_feeds", new_callable=AsyncMock, return_value=[mock_feed]):
+ items = await collector.collect()
+
+ assert len(items) == 2
+ assert items[0].source == "rss"
+ assert items[0].headline == "NVDA surges on AI demand"
+ assert isinstance(items[0].sentiment, float)
+```
+
+- [ ] **Step 2: Run tests to verify they fail**
+
+Run: `pytest services/news-collector/tests/test_rss.py -v`
+Expected: FAIL
+
+- [ ] **Step 3: Implement RSSCollector**
+
+Create `services/news-collector/src/news_collector/collectors/rss.py`:
+
+```python
+"""RSS feed collector for Yahoo Finance, Google News, MarketWatch."""
+
+import logging
+import re
+from calendar import timegm
+from datetime import datetime, timezone
+
+import aiohttp
+import feedparser
+from nltk.sentiment.vader import SentimentIntensityAnalyzer
+
+from shared.models import NewsCategory, NewsItem
+from news_collector.collectors.base import BaseCollector
+
+logger = logging.getLogger(__name__)
+
+DEFAULT_FEEDS = [
+ "https://feeds.finance.yahoo.com/rss/2.0/headline?s=^GSPC&region=US&lang=en-US",
+ "https://news.google.com/rss/topics/CAAqJggKIiBDQkFTRWdvSUwyMHZNRGx6TVdZU0FtVnVHZ0pWVXlnQVAB?hl=en-US&gl=US&ceid=US:en",
+ "https://www.marketwatch.com/rss/topstories",
+]
+
+# Common US stock tickers to detect in headlines
+TICKER_PATTERN = re.compile(
+ r"\b(AAPL|MSFT|GOOGL|GOOG|AMZN|TSLA|NVDA|META|JPM|V|JNJ|WMT|PG|UNH|HD|"
+ r"MA|DIS|BAC|XOM|PFE|KO|PEP|CSCO|INTC|VZ|NFLX|ADBE|CRM|AMD|QCOM|"
+ r"GS|BA|CAT|MMM|IBM|GE|F|GM|NKE|MCD|SBUX|SPY|QQQ|IWM)\b"
+)
+
+
+class RSSCollector(BaseCollector):
+ name = "rss"
+ poll_interval = 600 # 10 minutes
+
+ def __init__(self, feeds: list[str] | None = None) -> None:
+ self._feeds = feeds or DEFAULT_FEEDS
+ self._vader = SentimentIntensityAnalyzer()
+
+ async def is_available(self) -> bool:
+ return True
+
+ async def _fetch_feeds(self) -> list[dict]:
+ """Fetch and parse all RSS feeds."""
+ results = []
+ async with aiohttp.ClientSession() as session:
+ for url in self._feeds:
+ try:
+ async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as resp:
+ if resp.status == 200:
+ text = await resp.text()
+ feed = feedparser.parse(text)
+ results.append(feed)
+ except Exception as exc:
+                    logger.warning("rss fetch failed for %s: %s", url, exc)
+ return results
+
+ def _extract_symbols(self, text: str) -> list[str]:
+ """Extract stock tickers from text."""
+ return list(set(TICKER_PATTERN.findall(text)))
+
+ def _parse_time(self, entry: dict) -> datetime:
+ """Parse published time from feed entry."""
+ parsed = entry.get("published_parsed")
+ if parsed:
+ return datetime.fromtimestamp(timegm(parsed), tz=timezone.utc)
+ return datetime.now(timezone.utc)
+
+ async def collect(self) -> list[NewsItem]:
+ feeds = await self._fetch_feeds()
+ items = []
+ seen_titles = set()
+
+ for feed in feeds:
+ for entry in feed.get("entries", []):
+ title = entry.get("title", "").strip()
+ if not title or title in seen_titles:
+ continue
+ seen_titles.add(title)
+
+ summary = entry.get("summary", "")
+ sentiment_text = f"{title}. {summary}" if summary else title
+
+ items.append(
+ NewsItem(
+ source=self.name,
+ headline=title,
+ summary=summary or None,
+ url=entry.get("link"),
+ published_at=self._parse_time(entry),
+ symbols=self._extract_symbols(f"{title} {summary}"),
+ sentiment=self._vader.polarity_scores(sentiment_text)["compound"],
+ category=NewsCategory.MACRO,
+ raw_data={"feed_title": title},
+ )
+ )
+
+ return items
+```
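+
+The whitelist-regex approach behind `TICKER_PATTERN` is easy to sanity-check in isolation; this sketch uses a small subset of the real symbol list:
+
+```python
+import re
+
+# Subset of the TICKER_PATTERN whitelist; \b guards against partial matches.
+TICKERS = re.compile(r"\b(AAPL|MSFT|NVDA|TSLA|SPY)\b")
+
+
+def extract_symbols(text: str) -> list[str]:
+    """Return the unique whitelisted tickers found in text, sorted."""
+    return sorted(set(TICKERS.findall(text)))
+```
+
+The word boundaries mean a string like "NVDAX" does not match `NVDA`, and the `set` deduplicates repeat mentions — the same behaviour `RSSCollector._extract_symbols` relies on.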
+
+- [ ] **Step 4: Run tests to verify they pass**
+
+Run: `pytest services/news-collector/tests/test_rss.py -v`
+Expected: All 3 tests PASS
+
+- [ ] **Step 5: Commit**
+
+```bash
+git add services/news-collector/src/news_collector/collectors/rss.py services/news-collector/tests/test_rss.py
+git commit -m "feat: implement RSS news collector (Yahoo, Google News, MarketWatch)"
+```
+
+---
+
+### Task 9: Implement Fear & Greed Index collector
+
+**Files:**
+- Create: `services/news-collector/src/news_collector/collectors/fear_greed.py`
+- Create: `services/news-collector/tests/test_fear_greed.py`
+
+- [ ] **Step 1: Write tests**
+
+Create `services/news-collector/tests/test_fear_greed.py`:
+
+```python
+"""Tests for CNN Fear & Greed Index collector."""
+
+import pytest
+from unittest.mock import AsyncMock, patch
+
+from news_collector.collectors.fear_greed import FearGreedCollector
+
+
+@pytest.fixture
+def collector():
+ return FearGreedCollector()
+
+
+def test_collector_name(collector):
+ assert collector.name == "fear_greed"
+ assert collector.poll_interval == 3600
+
+
+async def test_is_available(collector):
+ assert await collector.is_available() is True
+
+
+async def test_collect_parses_api_response(collector):
+ mock_data = {
+ "fear_and_greed": {
+ "score": 45.0,
+ "rating": "Fear",
+ "timestamp": "2026-04-02T12:00:00+00:00",
+ }
+ }
+ with patch.object(collector, "_fetch_index", new_callable=AsyncMock, return_value=mock_data):
+ result = await collector.collect()
+
+ assert result.fear_greed == 45
+ assert result.fear_greed_label == "Fear"
+
+
+async def test_collect_returns_none_on_failure(collector):
+ with patch.object(collector, "_fetch_index", new_callable=AsyncMock, return_value=None):
+ result = await collector.collect()
+ assert result is None
+
+
+def test_classify_label():
+ c = FearGreedCollector()
+ assert c._classify(10) == "Extreme Fear"
+ assert c._classify(30) == "Fear"
+ assert c._classify(50) == "Neutral"
+ assert c._classify(70) == "Greed"
+ assert c._classify(85) == "Extreme Greed"
+```
+
+- [ ] **Step 2: Run tests to verify they fail**
+
+Run: `pytest services/news-collector/tests/test_fear_greed.py -v`
+Expected: FAIL
+
+- [ ] **Step 3: Implement FearGreedCollector**
+
+Create `services/news-collector/src/news_collector/collectors/fear_greed.py`:
+
+```python
+"""CNN Fear & Greed Index collector."""
+
+import logging
+from dataclasses import dataclass
+from typing import Optional
+
+import aiohttp
+
+from news_collector.collectors.base import BaseCollector
+
+logger = logging.getLogger(__name__)
+
+FEAR_GREED_URL = "https://production.dataviz.cnn.io/index/fearandgreed/graphdata"
+
+
+@dataclass
+class FearGreedResult:
+ fear_greed: int
+ fear_greed_label: str
+
+
+class FearGreedCollector(BaseCollector):
+ """Fetches CNN Fear & Greed Index.
+
+ Note: This collector does NOT return NewsItem — it returns FearGreedResult
+ which feeds directly into MarketSentiment. The main.py scheduler handles
+ this differently from news collectors.
+ """
+
+ name = "fear_greed"
+ poll_interval = 3600 # 1 hour
+
+ async def is_available(self) -> bool:
+ return True
+
+ async def _fetch_index(self) -> Optional[dict]:
+ """Fetch Fear & Greed data from CNN API."""
+ headers = {"User-Agent": "Mozilla/5.0"}
+ try:
+ async with aiohttp.ClientSession() as session:
+ async with session.get(
+ FEAR_GREED_URL, headers=headers, timeout=aiohttp.ClientTimeout(total=10)
+ ) as resp:
+ if resp.status != 200:
+                        logger.warning("fear & greed fetch failed: status=%s", resp.status)
+ return None
+ return await resp.json()
+ except Exception as exc:
+            logger.warning("fear & greed fetch error: %s", exc)
+ return None
+
+ def _classify(self, score: int) -> str:
+ """Classify numeric score into label."""
+ if score <= 20:
+ return "Extreme Fear"
+ if score <= 40:
+ return "Fear"
+ if score <= 60:
+ return "Neutral"
+ if score <= 80:
+ return "Greed"
+ return "Extreme Greed"
+
+ async def collect(self) -> Optional[FearGreedResult]:
+ """Collect Fear & Greed Index. Returns FearGreedResult or None."""
+ data = await self._fetch_index()
+ if data is None:
+ return None
+
+ try:
+ fg = data["fear_and_greed"]
+ score = int(fg["score"])
+ label = fg.get("rating", self._classify(score))
+ return FearGreedResult(fear_greed=score, fear_greed_label=label)
+ except (KeyError, ValueError, TypeError) as exc:
+            logger.warning("fear & greed parse failed: %s", exc)
+ return None
+```
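+
+The band edges in `_classify` are inclusive at the top of each range (0-20, 21-40, 41-60, 61-80, 81-100). A standalone copy makes the boundary behaviour explicit:
+
+```python
+def classify(score: int) -> str:
+    """Map a 0-100 Fear & Greed score to its label (top edge inclusive)."""
+    if score <= 20:
+        return "Extreme Fear"
+    if score <= 40:
+        return "Fear"
+    if score <= 60:
+        return "Neutral"
+    if score <= 80:
+        return "Greed"
+    return "Extreme Greed"
+```
+
+So 20 is still "Extreme Fear" while 21 is "Fear" — worth keeping in mind if the thresholds are ever tuned.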
+
+- [ ] **Step 4: Run tests to verify they pass**
+
+Run: `pytest services/news-collector/tests/test_fear_greed.py -v`
+Expected: All 5 tests PASS
+
+- [ ] **Step 5: Commit**
+
+```bash
+git add services/news-collector/src/news_collector/collectors/fear_greed.py services/news-collector/tests/test_fear_greed.py
+git commit -m "feat: implement CNN Fear & Greed Index collector"
+```
+
+---
+
+### Task 10: Implement SEC EDGAR collector
+
+**Files:**
+- Create: `services/news-collector/src/news_collector/collectors/sec_edgar.py`
+- Create: `services/news-collector/tests/test_sec_edgar.py`
+
+- [ ] **Step 1: Write tests**
+
+Create `services/news-collector/tests/test_sec_edgar.py`:
+
+```python
+"""Tests for SEC EDGAR filing collector."""
+
+import pytest
+from unittest.mock import AsyncMock, patch
+from datetime import datetime, timezone
+
+from news_collector.collectors.sec_edgar import SecEdgarCollector
+
+# collect() only keeps filings dated today (UTC), so the mock must use a
+# dynamic date — a hardcoded one would make this test fail on any other day.
+TODAY = datetime.now(timezone.utc).strftime("%Y-%m-%d")
+
+
+@pytest.fixture
+def collector():
+    return SecEdgarCollector()
+
+
+def test_collector_name(collector):
+    assert collector.name == "sec_edgar"
+    assert collector.poll_interval == 1800
+
+
+async def test_is_available(collector):
+    assert await collector.is_available() is True
+
+
+async def test_collect_parses_filings(collector):
+    mock_response = {
+        "filings": {
+            "recent": {
+                "accessionNumber": ["0001234-26-000001"],
+                "filingDate": [TODAY],
+ "primaryDocument": ["filing.htm"],
+ "form": ["8-K"],
+ "primaryDocDescription": ["Current Report"],
+ }
+ },
+ "tickers": [{"ticker": "AAPL"}],
+ "name": "Apple Inc",
+ }
+ with patch.object(collector, "_fetch_recent_filings", new_callable=AsyncMock, return_value=[mock_response]):
+ items = await collector.collect()
+
+ assert len(items) == 1
+ assert items[0].source == "sec_edgar"
+ assert items[0].category.value == "filing"
+ assert "AAPL" in items[0].symbols
+
+
+async def test_collect_handles_empty(collector):
+ with patch.object(collector, "_fetch_recent_filings", new_callable=AsyncMock, return_value=[]):
+ items = await collector.collect()
+ assert items == []
+```
+
+- [ ] **Step 2: Run tests to verify they fail**
+
+Run: `pytest services/news-collector/tests/test_sec_edgar.py -v`
+Expected: FAIL
+
+- [ ] **Step 3: Implement SecEdgarCollector**
+
+Create `services/news-collector/src/news_collector/collectors/sec_edgar.py`:
+
+```python
+"""SEC EDGAR filing collector (free, no API key required)."""
+
+import logging
+from datetime import datetime, timezone
+
+import aiohttp
+from nltk.sentiment.vader import SentimentIntensityAnalyzer
+
+from shared.models import NewsCategory, NewsItem
+from news_collector.collectors.base import BaseCollector
+
+logger = logging.getLogger(__name__)
+
+EDGAR_COMPANY_FILINGS = "https://data.sec.gov/submissions/CIK{cik}.json"
+
+# CIK numbers for major companies (subset — extend as needed)
+TRACKED_CIKS = {
+ "0000320193": "AAPL",
+ "0000789019": "MSFT",
+ "0001652044": "GOOGL",
+ "0001018724": "AMZN",
+ "0001318605": "TSLA",
+ "0001045810": "NVDA",
+ "0001326801": "META",
+ "0000019617": "JPM",
+ "0000078003": "PFE",
+ "0000021344": "KO",
+}
+
+SEC_USER_AGENT = "TradingPlatform research@example.com"
+
+
+class SecEdgarCollector(BaseCollector):
+ name = "sec_edgar"
+ poll_interval = 1800 # 30 minutes
+
+ def __init__(self) -> None:
+ self._vader = SentimentIntensityAnalyzer()
+
+ async def is_available(self) -> bool:
+ return True
+
+ async def _fetch_recent_filings(self) -> list[dict]:
+ """Fetch recent 8-K filings for tracked companies."""
+ results = []
+ headers = {"User-Agent": SEC_USER_AGENT}
+ async with aiohttp.ClientSession() as session:
+ for cik, ticker in TRACKED_CIKS.items():
+ try:
+                    url = EDGAR_COMPANY_FILINGS.format(cik=cik)
+ async with session.get(
+ url, headers=headers, timeout=aiohttp.ClientTimeout(total=10)
+ ) as resp:
+ if resp.status == 200:
+ data = await resp.json()
+ data["tickers"] = [{"ticker": ticker}]
+ results.append(data)
+ except Exception as exc:
+                    logger.warning("sec fetch failed for CIK %s: %s", cik, exc)
+ return results
+
+ async def collect(self) -> list[NewsItem]:
+ filings_data = await self._fetch_recent_filings()
+ items = []
+ today = datetime.now(timezone.utc).strftime("%Y-%m-%d")
+
+ for company_data in filings_data:
+ tickers = [t["ticker"] for t in company_data.get("tickers", [])]
+ company_name = company_data.get("name", "Unknown")
+ recent = company_data.get("filings", {}).get("recent", {})
+
+ forms = recent.get("form", [])
+ dates = recent.get("filingDate", [])
+ descriptions = recent.get("primaryDocDescription", [])
+ accessions = recent.get("accessionNumber", [])
+
+ for i, form in enumerate(forms):
+ if form != "8-K":
+ continue
+ filing_date = dates[i] if i < len(dates) else ""
+ if filing_date != today:
+ continue
+
+ desc = descriptions[i] if i < len(descriptions) else "8-K Filing"
+ accession = accessions[i] if i < len(accessions) else ""
+ headline = f"{company_name} ({', '.join(tickers)}): {form} - {desc}"
+
+ items.append(
+ NewsItem(
+ source=self.name,
+ headline=headline,
+ summary=desc,
+ url=f"https://www.sec.gov/cgi-bin/browse-edgar?action=getcompany&accession={accession}",
+ published_at=datetime.strptime(filing_date, "%Y-%m-%d").replace(tzinfo=timezone.utc),
+ symbols=tickers,
+ sentiment=self._vader.polarity_scores(headline)["compound"],
+ category=NewsCategory.FILING,
+ raw_data={"form": form, "accession": accession},
+ )
+ )
+
+ return items
+```
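+
+EDGAR's submissions endpoint returns `filings.recent` as parallel column lists — the i-th element of each list describes the i-th filing — which is why `collect()` indexes every field by position. A defensive `zip` makes the row reconstruction explicit (field names follow the EDGAR response handled above):
+
+```python
+def rows_from_recent(recent: dict) -> list[dict]:
+    """Rebuild per-filing rows from EDGAR's column-oriented lists."""
+    # zip() stops at the shortest column, tolerating ragged data.
+    return [
+        {"form": form, "date": date, "description": desc}
+        for form, date, desc in zip(
+            recent.get("form", []),
+            recent.get("filingDate", []),
+            recent.get("primaryDocDescription", []),
+        )
+    ]
+```
+
+Filtering then becomes a plain comprehension over rows instead of index bookkeeping — an alternative shape for the loop in `collect()`.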
+
+- [ ] **Step 4: Run tests to verify they pass**
+
+Run: `pytest services/news-collector/tests/test_sec_edgar.py -v`
+Expected: All 4 tests PASS
+
+- [ ] **Step 5: Commit**
+
+```bash
+git add services/news-collector/src/news_collector/collectors/sec_edgar.py services/news-collector/tests/test_sec_edgar.py
+git commit -m "feat: implement SEC EDGAR 8-K filing collector"
+```
+
+---
+
+### Task 11: Implement Reddit collector
+
+**Files:**
+- Create: `services/news-collector/src/news_collector/collectors/reddit.py`
+- Create: `services/news-collector/tests/test_reddit.py`
+
+- [ ] **Step 1: Write tests**
+
+Create `services/news-collector/tests/test_reddit.py`:
+
+```python
+"""Tests for Reddit collector."""
+
+import pytest
+from unittest.mock import AsyncMock, patch
+
+from news_collector.collectors.reddit import RedditCollector
+
+
+@pytest.fixture
+def collector():
+ return RedditCollector()
+
+
+def test_collector_name(collector):
+ assert collector.name == "reddit"
+ assert collector.poll_interval == 900
+
+
+async def test_is_available(collector):
+ assert await collector.is_available() is True
+
+
+async def test_collect_parses_posts(collector):
+ mock_posts = [
+ {
+ "data": {
+ "title": "NVDA to the moon! 🚀 AI demand is insane",
+ "selftext": "Just loaded up on NVDA calls",
+ "url": "https://reddit.com/r/wallstreetbets/123",
+ "created_utc": 1711929600,
+ "score": 500,
+ "num_comments": 200,
+ "subreddit": "wallstreetbets",
+ }
+ },
+ ]
+ with patch.object(collector, "_fetch_subreddit", new_callable=AsyncMock, return_value=mock_posts):
+ items = await collector.collect()
+
+ assert len(items) >= 1
+ assert items[0].source == "reddit"
+ assert items[0].category.value == "social"
+ assert isinstance(items[0].sentiment, float)
+
+
+async def test_collect_filters_low_score(collector):
+ mock_posts = [
+ {
+ "data": {
+ "title": "Random question about stocks",
+ "selftext": "",
+ "url": "https://reddit.com/r/stocks/456",
+ "created_utc": 1711929600,
+ "score": 3,
+ "num_comments": 1,
+ "subreddit": "stocks",
+ }
+ },
+ ]
+ with patch.object(collector, "_fetch_subreddit", new_callable=AsyncMock, return_value=mock_posts):
+ items = await collector.collect()
+ assert items == []
+```
+
+- [ ] **Step 2: Run tests to verify they fail**
+
+Run: `pytest services/news-collector/tests/test_reddit.py -v`
+Expected: FAIL
+
+- [ ] **Step 3: Implement RedditCollector**
+
+Create `services/news-collector/src/news_collector/collectors/reddit.py`:
+
+```python
+"""Reddit collector for r/wallstreetbets, r/stocks, r/investing."""
+
+import logging
+import re
+from datetime import datetime, timezone
+
+import aiohttp
+from nltk.sentiment.vader import SentimentIntensityAnalyzer
+
+from shared.models import NewsCategory, NewsItem
+from news_collector.collectors.base import BaseCollector
+
+logger = logging.getLogger(__name__)
+
+SUBREDDITS = ["wallstreetbets", "stocks", "investing"]
+MIN_SCORE = 50 # Minimum upvotes to consider
+
+TICKER_PATTERN = re.compile(
+ r"\b(AAPL|MSFT|GOOGL|GOOG|AMZN|TSLA|NVDA|META|JPM|V|JNJ|WMT|PG|UNH|HD|"
+ r"MA|DIS|BAC|XOM|PFE|KO|PEP|CSCO|INTC|VZ|NFLX|ADBE|CRM|AMD|QCOM|"
+ r"GS|BA|CAT|MMM|IBM|GE|F|GM|NKE|MCD|SBUX|SPY|QQQ|IWM)\b"
+)
+
+
+class RedditCollector(BaseCollector):
+ name = "reddit"
+ poll_interval = 900 # 15 minutes
+
+ def __init__(self) -> None:
+ self._vader = SentimentIntensityAnalyzer()
+
+ async def is_available(self) -> bool:
+ return True
+
+ async def _fetch_subreddit(self, subreddit: str = "wallstreetbets") -> list[dict]:
+ """Fetch hot posts from a subreddit via JSON API."""
+ url = f"https://www.reddit.com/r/{subreddit}/hot.json?limit=25"
+ headers = {"User-Agent": "TradingPlatform/1.0"}
+ try:
+ async with aiohttp.ClientSession() as session:
+ async with session.get(
+ url, headers=headers, timeout=aiohttp.ClientTimeout(total=10)
+ ) as resp:
+ if resp.status != 200:
+                        logger.warning("reddit fetch failed for r/%s: status=%s", subreddit, resp.status)
+ return []
+ data = await resp.json()
+ return data.get("data", {}).get("children", [])
+ except Exception as exc:
+            logger.warning("reddit error for r/%s: %s", subreddit, exc)
+ return []
+
+ async def collect(self) -> list[NewsItem]:
+ items = []
+ seen_titles = set()
+
+ for subreddit in SUBREDDITS:
+ posts = await self._fetch_subreddit(subreddit)
+ for post in posts:
+ data = post.get("data", {})
+ title = data.get("title", "").strip()
+ score = data.get("score", 0)
+
+ if not title or title in seen_titles or score < MIN_SCORE:
+ continue
+ seen_titles.add(title)
+
+ selftext = data.get("selftext", "")
+ text = f"{title}. {selftext}" if selftext else title
+ symbols = list(set(TICKER_PATTERN.findall(text)))
+
+ items.append(
+ NewsItem(
+ source=self.name,
+ headline=title,
+ summary=selftext[:500] if selftext else None,
+ url=data.get("url"),
+ published_at=datetime.fromtimestamp(
+ data.get("created_utc", 0), tz=timezone.utc
+ ),
+ symbols=symbols,
+ sentiment=self._vader.polarity_scores(text)["compound"],
+ category=NewsCategory.SOCIAL,
+ raw_data={
+ "subreddit": data.get("subreddit", subreddit),
+ "score": score,
+ "num_comments": data.get("num_comments", 0),
+ },
+ )
+ )
+
+ return items
+```
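+
+The `score` and `num_comments` preserved in `raw_data` give the downstream sentiment aggregator an engagement signal. One possible log-scaled weighting — purely illustrative; the aggregator's actual formula is specified in its own task:
+
+```python
+import math
+
+
+def engagement_weight(score: int, num_comments: int) -> float:
+    """Log-scale engagement so one viral post cannot drown out the rest."""
+    return math.log1p(max(score, 0)) + 0.5 * math.log1p(max(num_comments, 0))
+
+
+def weighted_sentiment(posts: list[tuple[float, int, int]]) -> float:
+    """Engagement-weighted mean of (sentiment, score, num_comments) triples."""
+    total = sum(engagement_weight(s, c) for _, s, c in posts)
+    if total == 0:
+        return 0.0
+    return sum(sent * engagement_weight(s, c) for sent, s, c in posts) / total
+```
+
+Log scaling keeps a 5,000-upvote meme from dominating the per-symbol composite while still counting for more than a 60-upvote post.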
+
+- [ ] **Step 4: Run tests to verify they pass**
+
+Run: `pytest services/news-collector/tests/test_reddit.py -v`
+Expected: All 4 tests PASS
+
+- [ ] **Step 5: Commit**
+
+```bash
+git add services/news-collector/src/news_collector/collectors/reddit.py services/news-collector/tests/test_reddit.py
+git commit -m "feat: implement Reddit social sentiment collector"
+```
+
+---
+
+### Task 12: Implement Truth Social and Fed collectors
+
+**Files:**
+- Create: `services/news-collector/src/news_collector/collectors/truth_social.py`
+- Create: `services/news-collector/src/news_collector/collectors/fed.py`
+- Create: `services/news-collector/tests/test_truth_social.py`
+- Create: `services/news-collector/tests/test_fed.py`
+
+- [ ] **Step 1: Write tests for Truth Social**
+
+Create `services/news-collector/tests/test_truth_social.py`:
+
+```python
+"""Tests for Truth Social collector."""
+
+import pytest
+from unittest.mock import AsyncMock, patch
+
+from news_collector.collectors.truth_social import TruthSocialCollector
+
+
+@pytest.fixture
+def collector():
+ return TruthSocialCollector()
+
+
+def test_collector_name(collector):
+ assert collector.name == "truth_social"
+ assert collector.poll_interval == 900
+
+
+async def test_is_available(collector):
+ assert await collector.is_available() is True
+
+
+async def test_collect_parses_posts(collector):
+ mock_posts = [
+ {
+ "content": "We are imposing 25% tariffs on all steel imports!",
+ "created_at": "2026-04-02T12:00:00.000Z",
+ "url": "https://truthsocial.com/@realDonaldTrump/12345",
+ },
+ ]
+ with patch.object(collector, "_fetch_posts", new_callable=AsyncMock, return_value=mock_posts):
+ items = await collector.collect()
+
+ assert len(items) == 1
+ assert items[0].source == "truth_social"
+ assert items[0].category.value == "policy"
+ assert "tariff" in items[0].headline.lower() or "tariff" in items[0].raw_data.get("content", "").lower()
+
+
+async def test_collect_handles_empty(collector):
+ with patch.object(collector, "_fetch_posts", new_callable=AsyncMock, return_value=[]):
+ items = await collector.collect()
+ assert items == []
+```
+
+- [ ] **Step 2: Write tests for Fed collector**
+
+Create `services/news-collector/tests/test_fed.py`:
+
+```python
+"""Tests for Federal Reserve collector."""
+
+import pytest
+from unittest.mock import AsyncMock, patch
+
+from news_collector.collectors.fed import FedCollector
+
+
+@pytest.fixture
+def collector():
+ return FedCollector()
+
+
+def test_collector_name(collector):
+ assert collector.name == "fed"
+ assert collector.poll_interval == 3600
+
+
+async def test_is_available(collector):
+ assert await collector.is_available() is True
+
+
+async def test_collect_parses_rss(collector):
+ mock_entries = [
+ {
+ "title": "Federal Reserve issues FOMC statement",
+ "link": "https://www.federalreserve.gov/newsevents/pressreleases/monetary20260402a.htm",
+ "published_parsed": (2026, 4, 2, 14, 0, 0, 0, 0, 0),
+ "summary": "The Federal Open Market Committee decided to maintain the target range...",
+ },
+ ]
+ with patch.object(collector, "_fetch_fed_rss", new_callable=AsyncMock, return_value=mock_entries):
+ items = await collector.collect()
+
+ assert len(items) == 1
+ assert items[0].source == "fed"
+ assert items[0].category.value == "fed"
+```
+
+- [ ] **Step 3: Run tests to verify they fail**
+
+Run: `pytest services/news-collector/tests/test_truth_social.py services/news-collector/tests/test_fed.py -v`
+Expected: FAIL
+
+- [ ] **Step 4: Implement TruthSocialCollector**
+
+Create `services/news-collector/src/news_collector/collectors/truth_social.py`:
+
+```python
+"""Truth Social collector for Trump posts (policy-relevant)."""
+
+import logging
+from datetime import datetime, timezone
+
+import aiohttp
+from nltk.sentiment.vader import SentimentIntensityAnalyzer
+
+from shared.models import NewsCategory, NewsItem
+from news_collector.collectors.base import BaseCollector
+
+logger = logging.getLogger(__name__)
+
+# Truth Social uses a Mastodon-compatible API
+TRUTH_SOCIAL_API = "https://truthsocial.com/api/v1/accounts/107780257626128497/statuses"
+
+
+class TruthSocialCollector(BaseCollector):
+ name = "truth_social"
+ poll_interval = 900 # 15 minutes
+
+ def __init__(self) -> None:
+ self._vader = SentimentIntensityAnalyzer()
+
+ async def is_available(self) -> bool:
+ return True
+
+ async def _fetch_posts(self) -> list[dict]:
+ """Fetch recent posts from Truth Social."""
+ headers = {"User-Agent": "Mozilla/5.0"}
+ try:
+ async with aiohttp.ClientSession() as session:
+ async with session.get(
+ TRUTH_SOCIAL_API,
+ headers=headers,
+ params={"limit": 10},
+ timeout=aiohttp.ClientTimeout(total=15),
+ ) as resp:
+ if resp.status != 200:
+                        logger.warning("truth social fetch failed: status=%s", resp.status)
+ return []
+ return await resp.json()
+ except Exception as exc:
+            logger.warning("truth social fetch error: %s", exc)
+ return []
+
+ def _strip_html(self, text: str) -> str:
+ """Remove HTML tags from content."""
+ import re
+ return re.sub(r"<[^>]+>", "", text).strip()
+
+ async def collect(self) -> list[NewsItem]:
+ posts = await self._fetch_posts()
+ items = []
+
+ for post in posts:
+ content = self._strip_html(post.get("content", ""))
+ if not content:
+ continue
+
+ created_at_str = post.get("created_at", "")
+ try:
+ published = datetime.fromisoformat(created_at_str.replace("Z", "+00:00"))
+ except (ValueError, AttributeError):
+ published = datetime.now(timezone.utc)
+
+ items.append(
+ NewsItem(
+ source=self.name,
+ headline=content[:200],
+ summary=content if len(content) > 200 else None,
+ url=post.get("url"),
+ published_at=published,
+ symbols=[], # Symbols extracted at aggregation stage via LLM
+ sentiment=self._vader.polarity_scores(content)["compound"],
+ category=NewsCategory.POLICY,
+ raw_data={"content": content, "id": post.get("id")},
+ )
+ )
+
+ return items
+```
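+
+Two small pieces of `collect()` above are worth checking in isolation: the regex-based HTML stripping and the `Z`-suffix timestamp handling (`datetime.fromisoformat` only accepts a literal `Z` from Python 3.11 onward, hence the `replace`). Standalone copies of both:
+
+```python
+import re
+from datetime import datetime
+
+
+def strip_html(text: str) -> str:
+    """Drop HTML tags, keeping only the text content."""
+    return re.sub(r"<[^>]+>", "", text).strip()
+
+
+def parse_mastodon_timestamp(raw: str) -> datetime:
+    """Parse Mastodon-style ISO timestamps like 2026-04-02T12:00:00.000Z."""
+    return datetime.fromisoformat(raw.replace("Z", "+00:00"))
+```
+
+The naive tag-stripping regex is adequate for Mastodon's simple `<p>`/`<br>` markup; a full HTML parser would be overkill here.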
+
+- [ ] **Step 5: Implement FedCollector**
+
+Create `services/news-collector/src/news_collector/collectors/fed.py`:
+
+```python
+"""Federal Reserve press release and FOMC statement collector."""
+
+import logging
+from calendar import timegm
+from datetime import datetime, timezone
+
+import aiohttp
+import feedparser
+from nltk.sentiment.vader import SentimentIntensityAnalyzer
+
+from shared.models import NewsCategory, NewsItem
+from news_collector.collectors.base import BaseCollector
+
+logger = logging.getLogger(__name__)
+
+FED_RSS_URL = "https://www.federalreserve.gov/feeds/press_all.xml"
+
+
+class FedCollector(BaseCollector):
+ name = "fed"
+ poll_interval = 3600 # 1 hour
+
+ def __init__(self) -> None:
+ self._vader = SentimentIntensityAnalyzer()
+
+ async def is_available(self) -> bool:
+ return True
+
+ async def _fetch_fed_rss(self) -> list[dict]:
+ """Fetch Federal Reserve RSS feed entries."""
+ try:
+ async with aiohttp.ClientSession() as session:
+ async with session.get(
+ FED_RSS_URL, timeout=aiohttp.ClientTimeout(total=10)
+ ) as resp:
+ if resp.status != 200:
+ logger.warning("fed_rss_failed", status=resp.status)
+ return []
+ text = await resp.text()
+ feed = feedparser.parse(text)
+ return feed.get("entries", [])
+ except Exception as exc:
+ logger.warning("fed_rss_error", error=str(exc))
+ return []
+
+ def _detect_stance(self, text: str) -> str:
+ """Detect hawkish/dovish/neutral stance from text."""
+ text_lower = text.lower()
+ hawkish_words = ["tighten", "raise", "inflation concern", "restrictive", "higher rates"]
+ dovish_words = ["accommodate", "cut", "easing", "lower rates", "support growth"]
+
+ hawk_count = sum(1 for w in hawkish_words if w in text_lower)
+ dove_count = sum(1 for w in dovish_words if w in text_lower)
+
+ if hawk_count > dove_count:
+ return "hawkish"
+ if dove_count > hawk_count:
+ return "dovish"
+ return "neutral"
+
+ async def collect(self) -> list[NewsItem]:
+ entries = await self._fetch_fed_rss()
+ items = []
+
+ for entry in entries:
+ title = entry.get("title", "").strip()
+ if not title:
+ continue
+
+ summary = entry.get("summary", "")
+ parsed_time = entry.get("published_parsed")
+ if parsed_time:
+ published = datetime.fromtimestamp(timegm(parsed_time), tz=timezone.utc)
+ else:
+ published = datetime.now(timezone.utc)
+
+ text = f"{title}. {summary}" if summary else title
+
+ items.append(
+ NewsItem(
+ source=self.name,
+ headline=title,
+ summary=summary or None,
+ url=entry.get("link"),
+ published_at=published,
+ symbols=[],
+ sentiment=self._vader.polarity_scores(text)["compound"],
+ category=NewsCategory.FED,
+ raw_data={"stance": self._detect_stance(text)},
+ )
+ )
+
+ return items
+```
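+
+The `_detect_stance` heuristic is a plain keyword count, so it can be sanity-checked in isolation. A minimal standalone sketch (word lists copied from the collector; ties resolve to neutral):
+
+```python
+HAWKISH = ["tighten", "raise", "inflation concern", "restrictive", "higher rates"]
+DOVISH = ["accommodate", "cut", "easing", "lower rates", "support growth"]
+
+
+def detect_stance(text: str) -> str:
+    """Classify text by counting hawkish vs. dovish keyword hits."""
+    text_lower = text.lower()
+    hawks = sum(1 for w in HAWKISH if w in text_lower)
+    doves = sum(1 for w in DOVISH if w in text_lower)
+    if hawks > doves:
+        return "hawkish"
+    if doves > hawks:
+        return "dovish"
+    return "neutral"
+
+
+print(detect_stance("The Committee decided to raise the target range to a restrictive level"))  # hawkish
+print(detect_stance("Statement on bank supervision"))  # neutral
+```
+
+Because matching is substring-based, unrelated words can trigger hits (e.g. "cut" inside "executive"); that is acceptable for a coarse signal stored in `raw_data`.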
+
+- [ ] **Step 6: Run tests to verify they pass**
+
+Run: `pytest services/news-collector/tests/test_truth_social.py services/news-collector/tests/test_fed.py -v`
+Expected: All 7 tests PASS
+
+- [ ] **Step 7: Commit**
+
+```bash
+git add services/news-collector/src/news_collector/collectors/truth_social.py services/news-collector/src/news_collector/collectors/fed.py services/news-collector/tests/test_truth_social.py services/news-collector/tests/test_fed.py
+git commit -m "feat: implement Truth Social and Federal Reserve collectors"
+```
+
+---
+
+### Task 13: Implement news-collector main.py (scheduler)
+
+**Files:**
+- Create: `services/news-collector/src/news_collector/main.py`
+- Create: `services/news-collector/tests/test_main.py`
+
+- [ ] **Step 1: Write tests**
+
+Create `services/news-collector/tests/test_main.py`:
+
+```python
+"""Tests for news collector scheduler."""
+
+import pytest
+from unittest.mock import AsyncMock, patch, MagicMock
+from datetime import datetime, timezone
+
+from shared.models import NewsCategory, NewsItem
+
+from news_collector.main import run_collector_once
+
+
+async def test_run_collector_once_stores_and_publishes():
+ mock_item = NewsItem(
+ source="test",
+ headline="Test news",
+ published_at=datetime(2026, 4, 2, tzinfo=timezone.utc),
+ sentiment=0.5,
+ category=NewsCategory.MACRO,
+ )
+
+ mock_collector = MagicMock()
+ mock_collector.name = "test"
+ mock_collector.collect = AsyncMock(return_value=[mock_item])
+
+ mock_db = MagicMock()
+ mock_db.insert_news_item = AsyncMock()
+
+ mock_broker = MagicMock()
+ mock_broker.publish = AsyncMock()
+
+ count = await run_collector_once(mock_collector, mock_db, mock_broker)
+
+ assert count == 1
+ mock_db.insert_news_item.assert_called_once_with(mock_item)
+ mock_broker.publish.assert_called_once()
+
+
+async def test_run_collector_once_handles_empty():
+ mock_collector = MagicMock()
+ mock_collector.name = "test"
+ mock_collector.collect = AsyncMock(return_value=[])
+
+ mock_db = MagicMock()
+ mock_broker = MagicMock()
+
+ count = await run_collector_once(mock_collector, mock_db, mock_broker)
+ assert count == 0
+```
+
+- [ ] **Step 2: Run tests to verify they fail**
+
+Run: `pytest services/news-collector/tests/test_main.py -v`
+Expected: FAIL
+
+- [ ] **Step 3: Implement main.py**
+
+Create `services/news-collector/src/news_collector/main.py`:
+
+```python
+"""News Collector Service — schedules and runs all news collectors."""
+
+import asyncio
+import logging
+
+from shared.broker import RedisBroker
+from shared.db import Database
+from shared.events import NewsEvent
+from shared.healthcheck import HealthCheckServer
+from shared.logging import setup_logging
+from shared.metrics import ServiceMetrics
+from shared.notifier import TelegramNotifier
+from shared.sentiment_models import MarketSentiment
+
+from news_collector.config import NewsCollectorConfig
+from news_collector.collectors.base import BaseCollector
+from news_collector.collectors.finnhub import FinnhubCollector
+from news_collector.collectors.rss import RSSCollector
+from news_collector.collectors.sec_edgar import SecEdgarCollector
+from news_collector.collectors.truth_social import TruthSocialCollector
+from news_collector.collectors.reddit import RedditCollector
+from news_collector.collectors.fear_greed import FearGreedCollector
+from news_collector.collectors.fed import FedCollector
+
+logger = logging.getLogger(__name__)
+
+
+async def run_collector_once(
+ collector: BaseCollector,
+ db: Database,
+ broker: RedisBroker,
+) -> int:
+ """Run a single collector, store results, publish to Redis.
+ Returns number of items collected."""
+ items = await collector.collect()
+    if not isinstance(items, list):
+        # Defensive guard: only list-returning news collectors are scheduled
+        # through this path (FearGreedCollector runs in its own dedicated loop).
+        return 0
+
+ for item in items:
+ await db.insert_news_item(item)
+ event = NewsEvent(data=item)
+ await broker.publish("news", event.to_dict())
+
+ return len(items)
+
+
+async def run_collector_loop(
+ collector: BaseCollector,
+ db: Database,
+ broker: RedisBroker,
+ log,
+) -> None:
+ """Run a collector on its poll interval forever."""
+ while True:
+ try:
+ if await collector.is_available():
+ count = await run_collector_once(collector, db, broker)
+ log.info("collector_run", collector=collector.name, items=count)
+ else:
+ log.debug("collector_unavailable", collector=collector.name)
+ except Exception as exc:
+ log.error("collector_error", collector=collector.name, error=str(exc))
+ await asyncio.sleep(collector.poll_interval)
+
+
+async def run_fear_greed_loop(
+ collector: FearGreedCollector,
+ db: Database,
+ log,
+) -> None:
+ """Run the Fear & Greed collector and update market sentiment."""
+ from datetime import datetime, timezone
+
+ while True:
+ try:
+ result = await collector.collect()
+ if result is not None:
+ ms = MarketSentiment(
+ fear_greed=result.fear_greed,
+ fear_greed_label=result.fear_greed_label,
+ fed_stance="neutral", # Updated by Fed collector analysis
+ market_regime=_determine_regime(result.fear_greed, None),
+ updated_at=datetime.now(timezone.utc),
+ )
+ await db.upsert_market_sentiment(ms)
+ log.info("fear_greed_updated", score=result.fear_greed, label=result.fear_greed_label)
+ except Exception as exc:
+ log.error("fear_greed_error", error=str(exc))
+ await asyncio.sleep(collector.poll_interval)
+
+
+async def run_aggregator_loop(
+ db: Database,
+ interval: int,
+ log,
+) -> None:
+ """Run sentiment aggregation every `interval` seconds.
+ Reads recent news from DB, computes per-symbol scores, upserts into symbol_scores table."""
+ from datetime import datetime, timezone
+ from shared.sentiment import SentimentAggregator
+
+ aggregator = SentimentAggregator()
+
+ while True:
+ try:
+ now = datetime.now(timezone.utc)
+ news_items = await db.get_recent_news(hours=24)
+ if news_items:
+ scores = aggregator.aggregate(news_items, now)
+ for symbol_score in scores.values():
+ await db.upsert_symbol_score(symbol_score)
+ log.info("aggregation_complete", symbols=len(scores))
+ except Exception as exc:
+ log.error("aggregation_error", error=str(exc))
+ await asyncio.sleep(interval)
+
+
+def _determine_regime(fear_greed: int, vix: float | None) -> str:
+ """Determine market regime from Fear & Greed and VIX."""
+ if fear_greed <= 20:
+ return "risk_off"
+ if vix is not None and vix > 30:
+ return "risk_off"
+ if fear_greed >= 60 and (vix is None or vix < 20):
+ return "risk_on"
+ return "neutral"
+
+
+async def run() -> None:
+ config = NewsCollectorConfig()
+ log = setup_logging("news-collector", config.log_level, config.log_format)
+ metrics = ServiceMetrics("news_collector")
+
+ notifier = TelegramNotifier(
+ bot_token=config.telegram_bot_token,
+ chat_id=config.telegram_chat_id,
+ )
+
+ db = Database(config.database_url)
+ await db.connect()
+
+ broker = RedisBroker(config.redis_url)
+
+ health = HealthCheckServer(
+ "news-collector",
+ port=config.health_port,
+ auth_token=config.metrics_auth_token,
+ )
+ health.register_check("redis", broker.ping)
+ await health.start()
+ metrics.service_up.labels(service="news-collector").set(1)
+
+ # Initialize collectors
+ news_collectors: list[BaseCollector] = [
+ RSSCollector(),
+ SecEdgarCollector(),
+ TruthSocialCollector(),
+ RedditCollector(),
+ FedCollector(),
+ ]
+
+ # Finnhub requires API key
+ if config.finnhub_api_key:
+ news_collectors.append(FinnhubCollector(api_key=config.finnhub_api_key))
+
+ fear_greed = FearGreedCollector()
+
+ log.info(
+ "starting",
+ collectors=[c.name for c in news_collectors],
+ fear_greed=True,
+ )
+
+ tasks = []
+ try:
+ for collector in news_collectors:
+ task = asyncio.create_task(run_collector_loop(collector, db, broker, log))
+ tasks.append(task)
+
+ tasks.append(asyncio.create_task(run_fear_greed_loop(fear_greed, db, log)))
+ tasks.append(asyncio.create_task(
+ run_aggregator_loop(db, config.sentiment_aggregate_interval, log)
+ ))
+
+ await asyncio.gather(*tasks)
+ except Exception as exc:
+ log.error("fatal_error", error=str(exc))
+ await notifier.send_error(str(exc), "news-collector")
+ raise
+ finally:
+ for task in tasks:
+ task.cancel()
+ metrics.service_up.labels(service="news-collector").set(0)
+ await notifier.close()
+ await broker.close()
+ await db.close()
+
+
+def main() -> None:
+ asyncio.run(run())
+
+
+if __name__ == "__main__":
+ main()
+```
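+
+One detail in the `finally` block above: tasks are cancelled but never awaited, so cleanup inside the collector loops may still be running when the broker and DB connections close. A common pattern is to gather the cancelled tasks before tearing down connections — a sketch under that assumption, not a required change to the plan:
+
+```python
+import asyncio
+
+
+async def shutdown(tasks: list[asyncio.Task]) -> None:
+    """Cancel tasks, then wait for each to acknowledge the cancellation."""
+    for task in tasks:
+        task.cancel()
+    # return_exceptions=True absorbs the CancelledError each task raises
+    await asyncio.gather(*tasks, return_exceptions=True)
+
+
+async def demo() -> bool:
+    async def loop() -> None:
+        while True:
+            await asyncio.sleep(3600)
+
+    tasks = [asyncio.create_task(loop()) for _ in range(3)]
+    await asyncio.sleep(0)  # let the loops start before cancelling
+    await shutdown(tasks)
+    return all(t.cancelled() for t in tasks)
+
+
+print(asyncio.run(demo()))  # True
+```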
+
+- [ ] **Step 4: Run tests to verify they pass**
+
+Run: `pytest services/news-collector/tests/test_main.py -v`
+Expected: All 2 tests PASS
+
+- [ ] **Step 5: Commit**
+
+```bash
+git add services/news-collector/src/news_collector/main.py services/news-collector/tests/test_main.py
+git commit -m "feat: implement news-collector main scheduler with all collectors"
+```
+
+---
+
+## Phase 3: Sentiment Analysis Pipeline
+
+### Task 14: Implement sentiment aggregator
+
+**Files:**
+- Modify: `shared/src/shared/sentiment.py`
+- Create: `shared/tests/test_sentiment_aggregator.py`
+
+- [ ] **Step 1: Write tests**
+
+Create `shared/tests/test_sentiment_aggregator.py`:
+
+```python
+"""Tests for sentiment aggregator."""
+
+import pytest
+from datetime import datetime, timezone, timedelta
+
+from shared.sentiment import SentimentAggregator
+
+
+@pytest.fixture
+def aggregator():
+ return SentimentAggregator()
+
+
+def test_freshness_decay_recent():
+ a = SentimentAggregator()
+ now = datetime.now(timezone.utc)
+ assert a._freshness_decay(now, now) == 1.0
+
+
+def test_freshness_decay_3_hours():
+ a = SentimentAggregator()
+ now = datetime.now(timezone.utc)
+ three_hours_ago = now - timedelta(hours=3)
+ assert a._freshness_decay(three_hours_ago, now) == 0.7
+
+
+def test_freshness_decay_12_hours():
+ a = SentimentAggregator()
+ now = datetime.now(timezone.utc)
+ twelve_hours_ago = now - timedelta(hours=12)
+ assert a._freshness_decay(twelve_hours_ago, now) == 0.3
+
+
+def test_freshness_decay_old():
+ a = SentimentAggregator()
+ now = datetime.now(timezone.utc)
+ two_days_ago = now - timedelta(days=2)
+ assert a._freshness_decay(two_days_ago, now) == 0.0
+
+
+def test_compute_composite():
+ a = SentimentAggregator()
+ composite = a._compute_composite(
+ news_score=0.5,
+ social_score=0.3,
+ policy_score=0.8,
+ filing_score=0.2,
+ )
+ expected = 0.5 * 0.3 + 0.3 * 0.2 + 0.8 * 0.3 + 0.2 * 0.2
+ assert abs(composite - expected) < 0.001
+
+
+def test_aggregate_news_by_symbol(aggregator):
+ now = datetime.now(timezone.utc)
+ news_items = [
+ {
+ "symbols": ["AAPL"],
+ "sentiment": 0.8,
+ "category": "earnings",
+ "published_at": now,
+ },
+ {
+ "symbols": ["AAPL"],
+ "sentiment": 0.3,
+ "category": "macro",
+ "published_at": now - timedelta(hours=2),
+ },
+ {
+ "symbols": ["MSFT"],
+ "sentiment": -0.5,
+ "category": "policy",
+ "published_at": now,
+ },
+ ]
+ scores = aggregator.aggregate(news_items, now)
+
+ assert "AAPL" in scores
+ assert "MSFT" in scores
+ assert scores["AAPL"].news_count == 2
+ assert scores["AAPL"].news_score > 0 # Positive overall
+ assert scores["MSFT"].policy_score < 0 # Negative policy
+
+
+def test_aggregate_empty(aggregator):
+ now = datetime.now(timezone.utc)
+ scores = aggregator.aggregate([], now)
+ assert scores == {}
+
+
+def test_determine_regime():
+ a = SentimentAggregator()
+ assert a.determine_regime(15, None) == "risk_off"
+ assert a.determine_regime(15, 35.0) == "risk_off"
+ assert a.determine_regime(50, 35.0) == "risk_off"
+ assert a.determine_regime(70, 15.0) == "risk_on"
+ assert a.determine_regime(50, 20.0) == "neutral"
+```
+
+- [ ] **Step 2: Run tests to verify they fail**
+
+Run: `pytest shared/tests/test_sentiment_aggregator.py -v`
+Expected: FAIL — `SentimentAggregator` not in sentiment.py
+
+- [ ] **Step 3: Add SentimentAggregator to sentiment.py**
+
+Keep the existing `SentimentData` class (for backward compatibility with existing tests). Add the `SentimentAggregator` class at the end of `shared/src/shared/sentiment.py`:
+
+```python
+from datetime import datetime
+from shared.sentiment_models import SymbolScore
+
+
+class SentimentAggregator:
+ """Aggregates per-news sentiment into per-symbol scores."""
+
+ # Weights: policy events are most impactful for US stocks
+ WEIGHTS = {
+ "news": 0.3,
+ "social": 0.2,
+ "policy": 0.3,
+ "filing": 0.2,
+ }
+
+ # Category → score field mapping
+ CATEGORY_MAP = {
+ "earnings": "news",
+ "macro": "news",
+ "social": "social",
+ "policy": "policy",
+ "filing": "filing",
+ "fed": "policy",
+ }
+
+ def _freshness_decay(self, published_at: datetime, now: datetime) -> float:
+ """Compute freshness decay factor."""
+ age = now - published_at
+ hours = age.total_seconds() / 3600
+ if hours < 1:
+ return 1.0
+ if hours < 6:
+ return 0.7
+ if hours < 24:
+ return 0.3
+ return 0.0
+
+ def _compute_composite(
+ self,
+ news_score: float,
+ social_score: float,
+ policy_score: float,
+ filing_score: float,
+ ) -> float:
+ return (
+ news_score * self.WEIGHTS["news"]
+ + social_score * self.WEIGHTS["social"]
+ + policy_score * self.WEIGHTS["policy"]
+ + filing_score * self.WEIGHTS["filing"]
+ )
+
+ def aggregate(
+ self, news_items: list[dict], now: datetime
+ ) -> dict[str, SymbolScore]:
+ """Aggregate news items into per-symbol scores.
+
+ Each news_items dict must have: symbols, sentiment, category, published_at.
+ Returns dict mapping symbol → SymbolScore.
+ """
+ # Accumulate per-symbol, per-category
+ symbol_data: dict[str, dict] = {}
+
+ for item in news_items:
+ decay = self._freshness_decay(item["published_at"], now)
+ if decay == 0.0:
+ continue
+
+ category = item.get("category", "macro")
+ score_field = self.CATEGORY_MAP.get(category, "news")
+ weighted_sentiment = item["sentiment"] * decay
+
+ for symbol in item.get("symbols", []):
+ if symbol not in symbol_data:
+ symbol_data[symbol] = {
+ "news_scores": [],
+ "social_scores": [],
+ "policy_scores": [],
+ "filing_scores": [],
+ "count": 0,
+ }
+
+ symbol_data[symbol][f"{score_field}_scores"].append(weighted_sentiment)
+ symbol_data[symbol]["count"] += 1
+
+ # Compute averages and composites
+ result = {}
+ for symbol, data in symbol_data.items():
+ news_score = _safe_avg(data["news_scores"])
+ social_score = _safe_avg(data["social_scores"])
+ policy_score = _safe_avg(data["policy_scores"])
+ filing_score = _safe_avg(data["filing_scores"])
+
+ result[symbol] = SymbolScore(
+ symbol=symbol,
+ news_score=news_score,
+ news_count=data["count"],
+ social_score=social_score,
+ policy_score=policy_score,
+ filing_score=filing_score,
+ composite=self._compute_composite(
+ news_score, social_score, policy_score, filing_score
+ ),
+ updated_at=now,
+ )
+
+ return result
+
+ def determine_regime(self, fear_greed: int, vix: float | None) -> str:
+ """Determine market regime."""
+ if fear_greed <= 20:
+ return "risk_off"
+ if vix is not None and vix > 30:
+ return "risk_off"
+ if fear_greed >= 60 and (vix is None or vix < 20):
+ return "risk_on"
+ return "neutral"
+
+
+def _safe_avg(values: list[float]) -> float:
+ """Return average of values, or 0.0 if empty."""
+ if not values:
+ return 0.0
+ return sum(values) / len(values)
+```
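+
+As a worked check of the decay-and-weight arithmetic above: a single 3-hour-old earnings item with raw sentiment 0.8 lands in the `news` bucket, decays to 0.8 × 0.7 = 0.56, and contributes 0.56 × 0.3 = 0.168 to the composite (the other three buckets average to 0.0). The same arithmetic as a standalone snippet (constants copied from the class):
+
+```python
+WEIGHTS = {"news": 0.3, "social": 0.2, "policy": 0.3, "filing": 0.2}
+
+
+def freshness_decay(hours: float) -> float:
+    """Step-function decay mirroring SentimentAggregator._freshness_decay."""
+    if hours < 1:
+        return 1.0
+    if hours < 6:
+        return 0.7
+    if hours < 24:
+        return 0.3
+    return 0.0
+
+
+# One earnings item (news bucket), 3 hours old, raw sentiment 0.8
+news_score = 0.8 * freshness_decay(3)
+composite = news_score * WEIGHTS["news"]  # other buckets contribute 0.0
+print(round(composite, 3))  # 0.168
+```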
+
+- [ ] **Step 4: Run new tests to verify they pass**
+
+Run: `pytest shared/tests/test_sentiment_aggregator.py -v`
+Expected: All 9 tests PASS
+
+- [ ] **Step 5: Run existing sentiment tests for regressions**
+
+Run: `pytest shared/tests/test_sentiment.py -v`
+Expected: All existing tests PASS (SentimentData unchanged)
+
+- [ ] **Step 6: Commit**
+
+```bash
+git add shared/src/shared/sentiment.py shared/tests/test_sentiment_aggregator.py
+git commit -m "feat: implement SentimentAggregator with freshness decay and composite scoring"
+```
+
+---
+
+## Phase 4: Stock Selector Engine
+
+### Task 15: Implement stock selector
+
+**Files:**
+- Create: `services/strategy-engine/src/strategy_engine/stock_selector.py`
+- Create: `services/strategy-engine/tests/test_stock_selector.py`
+
+- [ ] **Step 1: Write tests**
+
+Create `services/strategy-engine/tests/test_stock_selector.py`:
+
+```python
+"""Tests for stock selector engine."""
+
+import pytest
+from unittest.mock import AsyncMock, MagicMock, patch
+from datetime import datetime, timezone
+from decimal import Decimal
+
+from shared.models import OrderSide
+from shared.sentiment_models import SymbolScore, MarketSentiment, SelectedStock, Candidate
+
+from strategy_engine.stock_selector import (
+ SentimentCandidateSource,
+ StockSelector,
+ _parse_llm_selections,
+)
+
+
+async def test_sentiment_candidate_source():
+ mock_db = MagicMock()
+ mock_db.get_top_symbol_scores = AsyncMock(return_value=[
+ {"symbol": "AAPL", "composite": 0.8, "news_count": 5},
+ {"symbol": "NVDA", "composite": 0.6, "news_count": 3},
+ ])
+
+ source = SentimentCandidateSource(mock_db)
+ candidates = await source.get_candidates()
+
+ assert len(candidates) == 2
+ assert candidates[0].symbol == "AAPL"
+ assert candidates[0].source == "sentiment"
+
+
+def test_parse_llm_selections_valid():
+ llm_response = """
+ [
+ {"symbol": "NVDA", "side": "BUY", "conviction": 0.85, "reason": "AI demand", "key_news": ["NVDA beats earnings"]},
+ {"symbol": "XOM", "side": "BUY", "conviction": 0.72, "reason": "Oil surge", "key_news": ["Oil prices up"]}
+ ]
+ """
+ selections = _parse_llm_selections(llm_response)
+ assert len(selections) == 2
+ assert selections[0].symbol == "NVDA"
+ assert selections[0].conviction == 0.85
+
+
+def test_parse_llm_selections_invalid():
+ selections = _parse_llm_selections("not json")
+ assert selections == []
+
+
+def test_parse_llm_selections_with_markdown():
+ llm_response = """
+ Here are my picks:
+ ```json
+ [
+ {"symbol": "TSLA", "side": "BUY", "conviction": 0.7, "reason": "Momentum", "key_news": ["Tesla rally"]}
+ ]
+ ```
+ """
+ selections = _parse_llm_selections(llm_response)
+ assert len(selections) == 1
+ assert selections[0].symbol == "TSLA"
+
+
+async def test_selector_blocks_on_risk_off():
+ mock_db = MagicMock()
+ mock_db.get_latest_market_sentiment = AsyncMock(return_value={
+ "fear_greed": 15,
+ "fear_greed_label": "Extreme Fear",
+ "vix": 35.0,
+ "fed_stance": "neutral",
+ "market_regime": "risk_off",
+ "updated_at": datetime.now(timezone.utc),
+ })
+
+ selector = StockSelector(db=mock_db, broker=MagicMock(), alpaca=MagicMock(), anthropic_api_key="test")
+ result = await selector.select()
+ assert result == []
+```
+
+- [ ] **Step 2: Run tests to verify they fail**
+
+Run: `pytest services/strategy-engine/tests/test_stock_selector.py -v`
+Expected: FAIL — module not found
+
+- [ ] **Step 3: Implement StockSelector**
+
+Create `services/strategy-engine/src/strategy_engine/stock_selector.py`:
+
+```python
+"""Stock Selector Engine — 3-stage dynamic stock selection for MOC trading."""
+
+import json
+import logging
+import re
+from datetime import datetime, timezone
+from decimal import Decimal
+from typing import Optional
+
+import aiohttp
+
+from shared.alpaca import AlpacaClient
+from shared.broker import RedisBroker
+from shared.db import Database
+from shared.models import OrderSide
+from shared.sentiment_models import Candidate, MarketSentiment, SelectedStock, SymbolScore
+
+logger = logging.getLogger(__name__)
+
+
+class SentimentCandidateSource:
+ """Get candidate stocks from sentiment scores in DB."""
+
+ def __init__(self, db: Database, limit: int = 20) -> None:
+ self._db = db
+ self._limit = limit
+
+ async def get_candidates(self) -> list[Candidate]:
+ scores = await self._db.get_top_symbol_scores(limit=self._limit)
+ return [
+ Candidate(
+ symbol=s["symbol"],
+ source="sentiment",
+ score=s["composite"],
+ reason=f"Sentiment composite={s['composite']:.2f}, news_count={s['news_count']}",
+ )
+ for s in scores
+ if s["composite"] != 0
+ ]
+
+
+class LLMCandidateSource:
+ """Get candidate stocks by asking Claude to analyze today's top news."""
+
+ def __init__(self, db: Database, api_key: str, model: str) -> None:
+ self._db = db
+ self._api_key = api_key
+ self._model = model
+
+ async def get_candidates(self) -> list[Candidate]:
+ news = await self._db.get_recent_news(hours=24)
+ if not news:
+ return []
+
+ headlines = [f"- [{n['source']}] {n['headline']} (sentiment: {n['sentiment']:.2f})" for n in news[:50]]
+ prompt = (
+ "You are a stock market analyst. Based on today's news headlines below, "
+ "identify US stocks that are most likely to be affected (positively or negatively). "
+ "Return a JSON array of objects with: symbol, direction (BUY or SELL), score (0-1), reason.\n\n"
+ "Headlines:\n" + "\n".join(headlines) + "\n\n"
+ "Return ONLY the JSON array, no other text."
+ )
+
+ try:
+ async with aiohttp.ClientSession() as session:
+ async with session.post(
+ "https://api.anthropic.com/v1/messages",
+ headers={
+ "x-api-key": self._api_key,
+ "anthropic-version": "2023-06-01",
+ "content-type": "application/json",
+ },
+ json={
+ "model": self._model,
+ "max_tokens": 1024,
+ "messages": [{"role": "user", "content": prompt}],
+ },
+ timeout=aiohttp.ClientTimeout(total=30),
+ ) as resp:
+ if resp.status != 200:
+ logger.warning("llm_candidate_failed", status=resp.status)
+ return []
+ data = await resp.json()
+ text = data["content"][0]["text"]
+ except Exception as exc:
+ logger.error("llm_candidate_error", error=str(exc))
+ return []
+
+ return self._parse_response(text)
+
+ def _parse_response(self, text: str) -> list[Candidate]:
+ try:
+ # Extract JSON from possible markdown code blocks
+ json_match = re.search(r"```(?:json)?\s*(\[.*?\])\s*```", text, re.DOTALL)
+ if json_match:
+ text = json_match.group(1)
+ items = json.loads(text)
+ except (json.JSONDecodeError, TypeError):
+ return []
+
+ candidates = []
+ for item in items:
+ try:
+ direction = OrderSide(item.get("direction", "BUY"))
+ candidates.append(
+ Candidate(
+ symbol=item["symbol"],
+ source="llm",
+ direction=direction,
+ score=float(item.get("score", 0.5)),
+ reason=item.get("reason", "LLM recommendation"),
+ )
+ )
+ except (KeyError, ValueError):
+ continue
+
+ return candidates
+
+
+class StockSelector:
+ """3-stage stock selector: candidates → technical filter → LLM final pick."""
+
+ def __init__(
+ self,
+ db: Database,
+ broker: RedisBroker,
+ alpaca: AlpacaClient,
+ anthropic_api_key: str,
+ anthropic_model: str = "claude-sonnet-4-20250514",
+ max_picks: int = 3,
+ ) -> None:
+ self._db = db
+ self._broker = broker
+ self._alpaca = alpaca
+ self._api_key = anthropic_api_key
+ self._model = anthropic_model
+ self._max_picks = max_picks
+ self._sentiment_source = SentimentCandidateSource(db)
+ self._llm_source = LLMCandidateSource(db, anthropic_api_key, anthropic_model)
+
+ async def select(self) -> list[SelectedStock]:
+ """Run full 3-stage selection. Returns list of SelectedStock."""
+ # Check market sentiment gate
+ ms = await self._db.get_latest_market_sentiment()
+ if ms and ms.get("market_regime") == "risk_off":
+ logger.info("selection_blocked_risk_off")
+ return []
+
+ # Stage 1: Candidate pool
+ sentiment_candidates = await self._sentiment_source.get_candidates()
+ llm_candidates = await self._llm_source.get_candidates()
+ candidates = self._merge_candidates(sentiment_candidates, llm_candidates)
+
+ if not candidates:
+ logger.info("no_candidates_found")
+ return []
+
+ logger.info("candidates_found", count=len(candidates))
+
+ # Stage 2: Technical filter
+ filtered = await self._technical_filter(candidates)
+ if not filtered:
+ logger.info("all_candidates_filtered_out")
+ return []
+
+ logger.info("technical_filter_passed", count=len(filtered))
+
+ # Stage 3: LLM final selection
+ selections = await self._llm_final_select(filtered, ms)
+
+ # Publish to Redis
+ for selection in selections:
+ await self._broker.publish(
+ "selected_stocks",
+ selection.model_dump(mode="json"),
+ )
+
+        # Persist audit trail (fetch the score table once, not per selection)
+        from datetime import date as date_type
+
+        score_data = await self._db.get_top_symbol_scores(limit=100)
+        for selection in selections:
+            snapshot = next(
+                (s for s in score_data if s["symbol"] == selection.symbol),
+                {},
+            )
+ await self._db.insert_stock_selection(
+ trade_date=date_type.today(),
+ symbol=selection.symbol,
+ side=selection.side.value,
+ conviction=selection.conviction,
+ reason=selection.reason,
+ key_news=selection.key_news,
+ sentiment_snapshot=snapshot,
+ )
+
+ return selections
+
+ def _merge_candidates(
+ self,
+ sentiment: list[Candidate],
+ llm: list[Candidate],
+ ) -> list[Candidate]:
+ """Merge and deduplicate candidates, preferring higher scores."""
+ by_symbol: dict[str, Candidate] = {}
+ for c in sentiment + llm:
+ if c.symbol not in by_symbol or c.score > by_symbol[c.symbol].score:
+ by_symbol[c.symbol] = c
+ return sorted(by_symbol.values(), key=lambda c: c.score, reverse=True)
+
+ async def _technical_filter(self, candidates: list[Candidate]) -> list[Candidate]:
+ """Apply MOC-style technical screening to candidates."""
+ import pandas as pd
+
+ passed = []
+ for candidate in candidates:
+ try:
+ bars = await self._alpaca.get_bars(
+ candidate.symbol, timeframe="1Day", limit=30
+ )
+ if not bars or len(bars) < 21:
+ continue
+
+ closes = pd.Series([float(b["c"]) for b in bars])
+ volumes = pd.Series([float(b["v"]) for b in bars])
+
+ # RSI
+ delta = closes.diff()
+ gain = delta.clip(lower=0)
+ loss = -delta.clip(upper=0)
+ avg_gain = gain.ewm(com=13, min_periods=14).mean()
+ avg_loss = loss.ewm(com=13, min_periods=14).mean()
+ rs = avg_gain / avg_loss.replace(0, float("nan"))
+ rsi = 100 - (100 / (1 + rs))
+ current_rsi = rsi.iloc[-1]
+
+ if pd.isna(current_rsi) or not (30 <= current_rsi <= 70):
+ continue
+
+ # EMA
+ ema20 = closes.ewm(span=20, adjust=False).mean().iloc[-1]
+ if closes.iloc[-1] < ema20:
+ continue
+
+ # Volume above average
+ vol_avg = volumes.iloc[-20:].mean()
+ if vol_avg > 0 and volumes.iloc[-1] < vol_avg * 0.5:
+ continue
+
+ passed.append(candidate)
+
+ except Exception as exc:
+ logger.warning("technical_filter_error", symbol=candidate.symbol, error=str(exc))
+ continue
+
+ return passed
+
+ async def _llm_final_select(
+ self,
+ candidates: list[Candidate],
+ market_sentiment: Optional[dict],
+ ) -> list[SelectedStock]:
+ """Ask Claude to make final 2-3 picks from filtered candidates."""
+ # Build context
+ candidate_info = []
+ for c in candidates[:15]:
+ candidate_info.append(f"- {c.symbol}: score={c.score:.2f}, source={c.source}, reason={c.reason}")
+
+ news = await self._db.get_recent_news(hours=12)
+ top_news = [f"- [{n['source']}] {n['headline']}" for n in news[:20]]
+
+ ms_info = "No market sentiment data available."
+ if market_sentiment:
+ ms_info = (
+ f"Fear & Greed: {market_sentiment.get('fear_greed', 'N/A')} "
+ f"({market_sentiment.get('fear_greed_label', 'N/A')}), "
+ f"VIX: {market_sentiment.get('vix', 'N/A')}, "
+ f"Fed Stance: {market_sentiment.get('fed_stance', 'N/A')}"
+ )
+
+ prompt = (
+ f"You are a professional stock trader selecting {self._max_picks} stocks for "
+ f"Market-on-Close (MOC) overnight trading. You buy at market close and sell at "
+ f"next day's open.\n\n"
+ f"## Market Conditions\n{ms_info}\n\n"
+ f"## Candidate Stocks (pre-screened technically)\n"
+ + "\n".join(candidate_info) + "\n\n"
+ f"## Today's Key News\n"
+ + "\n".join(top_news) + "\n\n"
+ f"Select the best {self._max_picks} stocks. For each, provide:\n"
+ f"- symbol: ticker\n"
+ f"- side: BUY or SELL\n"
+ f"- conviction: 0.0-1.0\n"
+ f"- reason: one sentence\n"
+ f"- key_news: list of relevant headlines\n\n"
+ f"Return ONLY a JSON array. No other text."
+ )
+
+ try:
+ async with aiohttp.ClientSession() as session:
+ async with session.post(
+ "https://api.anthropic.com/v1/messages",
+ headers={
+ "x-api-key": self._api_key,
+ "anthropic-version": "2023-06-01",
+ "content-type": "application/json",
+ },
+ json={
+ "model": self._model,
+ "max_tokens": 1024,
+ "messages": [{"role": "user", "content": prompt}],
+ },
+ timeout=aiohttp.ClientTimeout(total=30),
+ ) as resp:
+ if resp.status != 200:
+ logger.error("llm_final_select_failed", status=resp.status)
+ return []
+ data = await resp.json()
+ text = data["content"][0]["text"]
+ except Exception as exc:
+ logger.error("llm_final_select_error", error=str(exc))
+ return []
+
+ return _parse_llm_selections(text)
+
+
+def _parse_llm_selections(text: str) -> list[SelectedStock]:
+ """Parse LLM response into SelectedStock list."""
+ try:
+ json_match = re.search(r"```(?:json)?\s*(\[.*?\])\s*```", text, re.DOTALL)
+ if json_match:
+ text = json_match.group(1)
+ # Also try to find a bare JSON array
+ array_match = re.search(r"\[.*\]", text, re.DOTALL)
+ if array_match:
+ text = array_match.group(0)
+ items = json.loads(text)
+ except (json.JSONDecodeError, TypeError):
+ return []
+
+ selections = []
+ for item in items:
+ try:
+ selections.append(
+ SelectedStock(
+ symbol=item["symbol"],
+ side=OrderSide(item.get("side", "BUY")),
+ conviction=float(item.get("conviction", 0.5)),
+ reason=item.get("reason", ""),
+ key_news=item.get("key_news", []),
+ )
+ )
+ except (KeyError, ValueError):
+ continue
+
+ return selections
+```
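+
+The two-step extraction in `_parse_llm_selections` — fenced-block match first, then a greedy bare-array search — can be exercised on its own. A minimal sketch of just that stage (same regexes as above):
+
+```python
+import json
+import re
+
+
+def extract_json_array(text: str) -> list:
+    """Mirror the extraction order used by _parse_llm_selections."""
+    fenced = re.search(r"```(?:json)?\s*(\[.*?\])\s*```", text, re.DOTALL)
+    if fenced:
+        text = fenced.group(1)
+    bare = re.search(r"\[.*\]", text, re.DOTALL)  # greedy, so nested arrays stay intact
+    if bare:
+        text = bare.group(0)
+    try:
+        return json.loads(text)
+    except (json.JSONDecodeError, TypeError):
+        return []
+
+
+print(extract_json_array('Picks:\n```json\n[{"symbol": "TSLA"}]\n```'))  # [{'symbol': 'TSLA'}]
+print(extract_json_array("no array here"))  # []
+```
+
+The greedy `\[.*\]` matters: a lazy match would stop at the first `]`, truncating any selection whose `key_news` field is itself a list.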
+
+- [ ] **Step 4: Run tests to verify they pass**
+
+Run: `pytest services/strategy-engine/tests/test_stock_selector.py -v`
+Expected: All 5 tests PASS
+
+- [ ] **Step 5: Run all strategy engine tests for regressions**
+
+Run: `pytest services/strategy-engine/tests/ -v`
+Expected: All tests PASS
+
+- [ ] **Step 6: Commit**
+
+```bash
+git add services/strategy-engine/src/strategy_engine/stock_selector.py services/strategy-engine/tests/test_stock_selector.py
+git commit -m "feat: implement 3-stage stock selector (sentiment → technical → LLM)"
+```
+
+---
+
+## Phase 5: Integration (MOC + Notifications + Docker)
+
+### Task 16: Add Telegram notification for stock selections
+
+**Files:**
+- Modify: `shared/src/shared/notifier.py`
+- Modify: `shared/tests/test_notifier.py`
+
+- [ ] **Step 1: Add send_stock_selection method to notifier.py**
+
+Add this method and import to `shared/src/shared/notifier.py`:
+
+Add to imports:
+```python
+from shared.sentiment_models import SelectedStock, MarketSentiment
+```
+
+Add method to `TelegramNotifier` class:
+
+```python
+ async def send_stock_selection(
+ self,
+ selections: list[SelectedStock],
+ market: MarketSentiment | None = None,
+ ) -> None:
+ """Format and send stock selection notification."""
+ lines = [f"<b>📊 Stock Selection ({len(selections)} picks)</b>", ""]
+
+ side_emoji = {"BUY": "🟢", "SELL": "🔴"}
+
+ for i, s in enumerate(selections, 1):
+ emoji = side_emoji.get(s.side.value, "⚪")
+ lines.append(
+ f"{i}. <b>{s.symbol}</b> {emoji} {s.side.value} "
+ f"(conviction: {s.conviction:.0%})"
+ )
+ lines.append(f" {s.reason}")
+ if s.key_news:
+ lines.append(f" News: {s.key_news[0]}")
+ lines.append("")
+
+ if market:
+ lines.append(
+ f"Market: F&amp;G {market.fear_greed} ({market.fear_greed_label})"
+ + (f" | VIX {market.vix:.1f}" if market.vix else "")
+ )
+
+ await self.send("\n".join(lines))
+```
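The per-pick line formatting (emoji lookup plus `:.0%` conviction) can be checked in isolation; `format_pick` below is a hypothetical helper mirroring the loop body, not part of the notifier:

```python
side_emoji = {"BUY": "🟢", "SELL": "🔴"}


def format_pick(rank: int, symbol: str, side: str, conviction: float) -> str:
    """One selection line in the notifier's Telegram-HTML format."""
    emoji = side_emoji.get(side, "⚪")
    return f"{rank}. <b>{symbol}</b> {emoji} {side} (conviction: {conviction:.0%})"


line = format_pick(1, "NVDA", "BUY", 0.85)
```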
+
+- [ ] **Step 2: Add test for the new method**
+
+Add to `shared/tests/test_notifier.py`:
+
+```python
+from shared.models import OrderSide
+from shared.sentiment_models import SelectedStock, MarketSentiment
+from datetime import datetime, timezone
+
+
+async def test_send_stock_selection(notifier, mock_session):
+ """Test stock selection notification formatting."""
+ selections = [
+ SelectedStock(
+ symbol="NVDA",
+ side=OrderSide.BUY,
+ conviction=0.85,
+ reason="CHIPS Act expansion",
+ key_news=["Trump signs CHIPS Act"],
+ ),
+ ]
+ market = MarketSentiment(
+ fear_greed=55,
+ fear_greed_label="Neutral",
+ vix=18.2,
+ fed_stance="neutral",
+ market_regime="neutral",
+ updated_at=datetime.now(timezone.utc),
+ )
+ await notifier.send_stock_selection(selections, market)
+ mock_session.post.assert_called_once()
+```
+
+Note: Check `shared/tests/test_notifier.py` for existing fixture names (`notifier`, `mock_session`) and adapt accordingly.
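If those fixtures don't exist yet, a minimal sketch of `mock_session` might look like the following. This assumes the notifier posts via an aiohttp-style session whose `post(...)` is used as an async context manager — adapt to however the real notifier actually sends:

```python
import asyncio
from unittest.mock import AsyncMock, MagicMock


def make_mock_session() -> MagicMock:
    """aiohttp-style session: session.post(...) yields a response via `async with`."""
    session = MagicMock()
    response = AsyncMock()
    response.status = 200
    session.post.return_value.__aenter__ = AsyncMock(return_value=response)
    session.post.return_value.__aexit__ = AsyncMock(return_value=False)
    return session


async def demo() -> int:
    session = make_mock_session()
    # Hypothetical URL shape; the real notifier builds it from the bot token
    async with session.post("https://api.telegram.org/botTOKEN/sendMessage") as resp:
        return resp.status


status = asyncio.run(demo())
```

After the call, `session.post.assert_called_once()` works exactly as in the test above.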
+
+- [ ] **Step 3: Run notifier tests**
+
+Run: `pytest shared/tests/test_notifier.py -v`
+Expected: All tests PASS
+
+- [ ] **Step 4: Commit**
+
+```bash
+git add shared/src/shared/notifier.py shared/tests/test_notifier.py
+git commit -m "feat: add Telegram notification for stock selections"
+```
+
+---
+
+### Task 17: Integrate stock selector with MOC strategy
+
+**Files:**
+- Modify: `services/strategy-engine/src/strategy_engine/main.py`
+- Modify: `services/strategy-engine/src/strategy_engine/config.py`
+
+- [ ] **Step 1: Update strategy engine config**
+
+Add to `StrategyConfig` in `services/strategy-engine/src/strategy_engine/config.py`:
+
+```python
+ selector_candidates_time: str = "15:00"
+ selector_filter_time: str = "15:15"
+ selector_final_time: str = "15:30"
+ selector_max_picks: int = 3
+ anthropic_api_key: str = ""
+ anthropic_model: str = "claude-sonnet-4-20250514"
+```
+
+- [ ] **Step 2: Add stock selector scheduling to main.py**
+
+Add a new coroutine to `services/strategy-engine/src/strategy_engine/main.py` that runs the stock selector at the configured times. Add imports:
+
+```python
+from shared.alpaca import AlpacaClient
+from shared.db import Database
+from shared.notifier import TelegramNotifier
+from shared.sentiment_models import MarketSentiment
+from strategy_engine.stock_selector import StockSelector
+```
+
+Add the selector loop function:
+
+```python
+async def run_stock_selector(
+ selector: StockSelector,
+ notifier: TelegramNotifier,
+ db: Database,
+ config: StrategyConfig,
+ log,
+) -> None:
+ """Run the stock selector once per day at the configured time."""
+ import zoneinfo
+
+ et = zoneinfo.ZoneInfo("America/New_York")
+
+    target_hour, target_min = map(int, config.selector_final_time.split(":"))
+
+    while True:
+        now_et = datetime.now(et)
+
+ # Check if it's time to run (within 1-minute window)
+ if now_et.hour == target_hour and now_et.minute == target_min:
+ log.info("stock_selector_running")
+ try:
+ selections = await selector.select()
+ if selections:
+ ms_data = await db.get_latest_market_sentiment()
+ ms = None
+ if ms_data:
+ ms = MarketSentiment(**ms_data)
+ await notifier.send_stock_selection(selections, ms)
+ log.info(
+ "stock_selector_complete",
+ picks=[s.symbol for s in selections],
+ )
+ else:
+ log.info("stock_selector_no_picks")
+ except Exception as exc:
+ log.error("stock_selector_error", error=str(exc))
+ # Sleep past this minute to avoid re-triggering
+ await asyncio.sleep(120)
+ else:
+ await asyncio.sleep(30)
+```
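The minute-window check above can be factored into a pure function for unit testing; `is_trigger_minute` is a hypothetical helper illustrating the same logic, not part of the plan's code:

```python
from datetime import datetime


def is_trigger_minute(now: datetime, hhmm: str) -> bool:
    """True exactly during the configured HH:MM minute, as the selector loop checks."""
    target_hour, target_min = map(int, hhmm.split(":"))
    return now.hour == target_hour and now.minute == target_min


fires = is_trigger_minute(datetime(2026, 4, 2, 15, 30, 12), "15:30")
```

The 120-second sleep after a trigger ensures the loop cannot fire twice inside the same minute.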
+
+In the `run()` function, add after creating the broker:
+
+```python
+ db = Database(config.database_url)
+ await db.connect()
+
+ alpaca = AlpacaClient(
+ api_key=config.alpaca_api_key,
+ api_secret=config.alpaca_api_secret,
+ paper=config.alpaca_paper,
+ )
+```
+
+And add the selector if anthropic key is configured:
+
+```python
+ if config.anthropic_api_key:
+ selector = StockSelector(
+ db=db,
+ broker=broker,
+ alpaca=alpaca,
+ anthropic_api_key=config.anthropic_api_key,
+ anthropic_model=config.anthropic_model,
+ max_picks=config.selector_max_picks,
+ )
+ tasks.append(asyncio.create_task(
+ run_stock_selector(selector, notifier, db, config, log)
+ ))
+ log.info("stock_selector_enabled", time=config.selector_final_time)
+```
+
+Add to the `finally` block:
+
+```python
+ await alpaca.close()
+ await db.close()
+```
+
+- [ ] **Step 3: Run strategy engine tests for regressions**
+
+Run: `pytest services/strategy-engine/tests/ -v`
+Expected: All tests PASS
+
+- [ ] **Step 4: Commit**
+
+```bash
+git add services/strategy-engine/src/strategy_engine/main.py services/strategy-engine/src/strategy_engine/config.py
+git commit -m "feat: integrate stock selector into strategy engine scheduler"
+```
+
+---
+
+### Task 18: Update Docker Compose and .env
+
+**Files:**
+- Modify: `docker-compose.yml`
+- Modify: `.env.example` (already done in Task 5, just verify)
+
+- [ ] **Step 1: Add news-collector service to docker-compose.yml**
+
+Add before the `loki:` service block in `docker-compose.yml`:
+
+```yaml
+ news-collector:
+ build:
+ context: .
+ dockerfile: services/news-collector/Dockerfile
+ env_file: .env
+ ports:
+ - "8084:8084"
+ depends_on:
+ redis:
+ condition: service_healthy
+ postgres:
+ condition: service_healthy
+ healthcheck:
+ test: ["CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:8084/health')"]
+ interval: 10s
+ timeout: 5s
+ retries: 3
+ restart: unless-stopped
+```
+
+- [ ] **Step 2: Verify compose file is valid**
+
+Run: `docker compose config --quiet 2>&1 || echo "INVALID"`
+Expected: no output (the config is valid); `INVALID` is echoed only if validation fails
+
+- [ ] **Step 3: Commit**
+
+```bash
+git add docker-compose.yml
+git commit -m "feat: add news-collector service to Docker Compose"
+```
+
+---
+
+### Task 19: Run full test suite and lint
+
+- [ ] **Step 1: Install test dependencies**
+
+Run: `pip install -e shared/ && pip install aiosqlite feedparser nltk aioresponses`
+
+- [ ] **Step 2: Download VADER lexicon**
+
+Run: `python -c "import nltk; nltk.download('vader_lexicon', quiet=True)"`
+
+- [ ] **Step 3: Run lint**
+
+Run: `make lint`
+Expected: No lint errors. If there are errors, fix them.
+
+- [ ] **Step 4: Run full test suite**
+
+Run: `make test`
+Expected: All tests PASS
+
+- [ ] **Step 5: Fix any issues found in steps 3-4**
+
+If lint or tests fail, fix the issues and re-run.
+
+- [ ] **Step 6: Final commit if any fixes were needed**
+
+```bash
+git add -A
+git commit -m "fix: resolve lint and test issues from news selector integration"
+```
diff --git a/docs/superpowers/plans/2026-04-02-platform-upgrade.md b/docs/superpowers/plans/2026-04-02-platform-upgrade.md
new file mode 100644
index 0000000..c28d287
--- /dev/null
+++ b/docs/superpowers/plans/2026-04-02-platform-upgrade.md
@@ -0,0 +1,1991 @@
+# Platform Upgrade Implementation Plan
+
+> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.
+
+**Goal:** Upgrade the trading platform across 5 phases: shared library hardening, infrastructure improvements, service-level fixes, API security, and operational maturity.
+
+**Architecture:** Bottom-up approach — harden the shared library first (resilience, DB pooling, Redis resilience, config validation), then improve infrastructure (Docker, DB indexes), then fix all services (graceful shutdown, exception handling), then add API security (auth, CORS, rate limiting), and finally improve operations (CI/CD, linting, alerting).
+
+**Tech Stack:** Python 3.12, asyncio, tenacity, SQLAlchemy 2.0 async, Redis Streams, FastAPI, slowapi, Ruff, GitHub Actions, Prometheus Alertmanager
+
+---
+
+## File Structure
+
+### New Files
+- `shared/src/shared/resilience.py` — Retry decorator, circuit breaker, timeout wrapper
+- `shared/tests/test_resilience.py` — Tests for resilience module
+- `shared/alembic/versions/003_add_missing_indexes.py` — DB index migration
+- `.dockerignore` — Docker build exclusions
+- `services/api/src/trading_api/dependencies/auth.py` — Bearer token auth dependency
+- `.github/workflows/ci.yml` — GitHub Actions CI pipeline
+- `monitoring/prometheus/alert_rules.yml` — Prometheus alerting rules
+
+### Modified Files
+- `shared/src/shared/db.py` — Add connection pool config
+- `shared/src/shared/broker.py` — Add Redis resilience
+- `shared/src/shared/config.py` — Add validators, SecretStr, new fields
+- `shared/pyproject.toml` — Pin deps, add tenacity
+- `pyproject.toml` — Enhanced ruff rules, pytest-cov
+- `services/strategy-engine/src/strategy_engine/stock_selector.py` — Fix bug, deduplicate, session reuse
+- `services/*/src/*/main.py` — Signal handlers, exception specialization (all 6 services)
+- `services/*/Dockerfile` — Multi-stage builds, non-root user (all 7 Dockerfiles)
+- `services/api/pyproject.toml` — Add slowapi
+- `services/api/src/trading_api/main.py` — CORS, auth, rate limiting
+- `services/api/src/trading_api/routers/*.py` — Input validation, response models
+- `docker-compose.yml` — Remove hardcoded creds, add resource limits, networks
+- `.env.example` — Add new fields, mark secrets
+- `monitoring/prometheus.yml` — Reference alert rules
+
+---
+
+## Phase 1: Shared Library Hardening
+
+### Task 1: Implement Resilience Module
+
+**Files:**
+- Create: `shared/src/shared/resilience.py`
+- Create: `shared/tests/test_resilience.py`
+- Modify: `shared/pyproject.toml:6-18`
+
+- [ ] **Step 1: Add tenacity dependency to shared/pyproject.toml**
+
+In `shared/pyproject.toml`, add `tenacity` to the dependencies list:
+
+```python
+dependencies = [
+ "pydantic>=2.8,<3",
+ "pydantic-settings>=2.0,<3",
+ "redis>=5.0,<6",
+ "asyncpg>=0.29,<1",
+ "sqlalchemy[asyncio]>=2.0,<3",
+ "alembic>=1.13,<2",
+ "structlog>=24.0,<25",
+ "prometheus-client>=0.20,<1",
+ "pyyaml>=6.0,<7",
+ "aiohttp>=3.9,<4",
+ "rich>=13.0,<14",
+ "tenacity>=8.2,<10",
+]
+```
+
+Note: This also pins all existing dependencies with upper bounds.
+
+- [ ] **Step 2: Write failing tests for retry_async**
+
+Create `shared/tests/test_resilience.py`:
+
+```python
+"""Tests for the resilience module."""
+
+import asyncio
+
+import pytest
+
+from shared.resilience import retry_async, CircuitBreaker, async_timeout
+
+
+class TestRetryAsync:
+ async def test_succeeds_without_retry(self):
+ call_count = 0
+
+ @retry_async(max_retries=3)
+ async def succeed():
+ nonlocal call_count
+ call_count += 1
+ return "ok"
+
+ result = await succeed()
+ assert result == "ok"
+ assert call_count == 1
+
+ async def test_retries_on_failure_then_succeeds(self):
+ call_count = 0
+
+ @retry_async(max_retries=3, base_delay=0.01)
+ async def fail_twice():
+ nonlocal call_count
+ call_count += 1
+ if call_count < 3:
+ raise ConnectionError("fail")
+ return "ok"
+
+ result = await fail_twice()
+ assert result == "ok"
+ assert call_count == 3
+
+ async def test_raises_after_max_retries(self):
+ @retry_async(max_retries=2, base_delay=0.01)
+ async def always_fail():
+ raise ConnectionError("fail")
+
+ with pytest.raises(ConnectionError):
+ await always_fail()
+
+ async def test_no_retry_on_excluded_exception(self):
+ call_count = 0
+
+ @retry_async(max_retries=3, base_delay=0.01, exclude=(ValueError,))
+ async def raise_value_error():
+ nonlocal call_count
+ call_count += 1
+ raise ValueError("bad input")
+
+ with pytest.raises(ValueError):
+ await raise_value_error()
+ assert call_count == 1
+
+
+class TestCircuitBreaker:
+ async def test_closed_allows_calls(self):
+ cb = CircuitBreaker(failure_threshold=3, cooldown=0.1)
+
+ async def succeed():
+ return "ok"
+
+ result = await cb.call(succeed)
+ assert result == "ok"
+
+ async def test_opens_after_threshold(self):
+ cb = CircuitBreaker(failure_threshold=2, cooldown=60)
+
+ async def fail():
+ raise ConnectionError("fail")
+
+ for _ in range(2):
+ with pytest.raises(ConnectionError):
+ await cb.call(fail)
+
+ with pytest.raises(RuntimeError, match="Circuit breaker is open"):
+ await cb.call(fail)
+
+ async def test_half_open_after_cooldown(self):
+ cb = CircuitBreaker(failure_threshold=2, cooldown=0.05)
+
+ call_count = 0
+
+ async def fail_then_succeed():
+ nonlocal call_count
+ call_count += 1
+ if call_count <= 2:
+ raise ConnectionError("fail")
+ return "recovered"
+
+ # Trip the breaker
+ for _ in range(2):
+ with pytest.raises(ConnectionError):
+ await cb.call(fail_then_succeed)
+
+ # Wait for cooldown
+ await asyncio.sleep(0.1)
+
+ # Should allow one call (half-open)
+ result = await cb.call(fail_then_succeed)
+ assert result == "recovered"
+
+
+class TestAsyncTimeout:
+ async def test_completes_within_timeout(self):
+ async with async_timeout(1.0):
+ await asyncio.sleep(0.01)
+
+ async def test_raises_on_timeout(self):
+ with pytest.raises(asyncio.TimeoutError):
+ async with async_timeout(0.01):
+ await asyncio.sleep(1.0)
+```
+
+- [ ] **Step 3: Run tests to verify they fail**
+
+Run: `pytest shared/tests/test_resilience.py -v`
+Expected: FAIL with `ModuleNotFoundError: No module named 'shared.resilience'` (the module doesn't exist yet)
+
+- [ ] **Step 4: Implement resilience module**
+
+Write `shared/src/shared/resilience.py`:
+
+```python
+"""Resilience utilities: retry, circuit breaker, timeout."""
+
+import asyncio
+import functools
+import logging
+import time
+from contextlib import asynccontextmanager
+
+logger = logging.getLogger(__name__)
+
+
+def retry_async(
+ max_retries: int = 3,
+ base_delay: float = 1.0,
+ max_delay: float = 30.0,
+ exclude: tuple[type[Exception], ...] = (),
+):
+ """Decorator for async functions with exponential backoff + jitter.
+
+ Args:
+ max_retries: Maximum number of retry attempts.
+ base_delay: Initial delay in seconds between retries.
+ max_delay: Maximum delay cap in seconds.
+ exclude: Exception types that should NOT be retried.
+ """
+
+ def decorator(func):
+ @functools.wraps(func)
+ async def wrapper(*args, **kwargs):
+ last_exc = None
+ for attempt in range(max_retries + 1):
+ try:
+ return await func(*args, **kwargs)
+ except exclude:
+ raise
+ except Exception as exc:
+ last_exc = exc
+ if attempt == max_retries:
+ raise
+ delay = min(base_delay * (2**attempt), max_delay)
+ # Add jitter: 50-100% of delay
+ import random
+
+ delay = delay * (0.5 + random.random() * 0.5)
+ logger.warning(
+ "retry attempt=%d/%d delay=%.2fs error=%s func=%s",
+ attempt + 1,
+ max_retries,
+ delay,
+ str(exc),
+ func.__name__,
+ )
+ await asyncio.sleep(delay)
+ raise last_exc # Should not reach here, but just in case
+
+ return wrapper
+
+ return decorator
+
+
+class CircuitBreaker:
+ """Circuit breaker: opens after consecutive failures, auto-recovers after cooldown."""
+
+ def __init__(self, failure_threshold: int = 5, cooldown: float = 60.0) -> None:
+ self._failure_threshold = failure_threshold
+ self._cooldown = cooldown
+ self._failure_count = 0
+ self._last_failure_time: float = 0
+ self._state = "closed" # closed, open, half_open
+
+ async def call(self, func, *args, **kwargs):
+ if self._state == "open":
+ if time.monotonic() - self._last_failure_time >= self._cooldown:
+ self._state = "half_open"
+ else:
+ raise RuntimeError("Circuit breaker is open")
+
+ try:
+ result = await func(*args, **kwargs)
+ self._failure_count = 0
+ self._state = "closed"
+ return result
+ except Exception:
+ self._failure_count += 1
+ self._last_failure_time = time.monotonic()
+ if self._failure_count >= self._failure_threshold:
+ self._state = "open"
+ logger.error(
+ "circuit_breaker_opened failures=%d cooldown=%.0fs",
+ self._failure_count,
+ self._cooldown,
+ )
+ raise
+
+
+@asynccontextmanager
+async def async_timeout(seconds: float):
+ """Async context manager that raises TimeoutError after given seconds."""
+ try:
+ async with asyncio.timeout(seconds):
+ yield
+ except TimeoutError:
+ raise asyncio.TimeoutError(f"Operation timed out after {seconds}s")
+```
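The delay schedule (capped exponential backoff with 50-100% jitter) can be sanity-checked in isolation. `backoff_delays` is an illustrative helper reproducing the arithmetic above, not part of the module:

```python
import random


def backoff_delays(
    max_retries: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 30.0,
    seed: int = 0,
) -> list[float]:
    """Delays retry_async would sleep: min(base * 2**attempt, cap), scaled by 50-100% jitter."""
    rng = random.Random(seed)
    delays = []
    for attempt in range(max_retries):
        delay = min(base_delay * (2**attempt), max_delay)
        delays.append(delay * (0.5 + rng.random() * 0.5))
    return delays


delays = backoff_delays()
```

With the defaults, each attempt's delay lands in [0.5, 1.0], [1.0, 2.0], [2.0, 4.0] seconds respectively; the jitter prevents retry storms when many consumers fail at once.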
+
+- [ ] **Step 5: Run tests to verify they pass**
+
+Run: `pytest shared/tests/test_resilience.py -v`
+Expected: All 9 tests PASS
+
+- [ ] **Step 6: Commit**
+
+```bash
+git add shared/src/shared/resilience.py shared/tests/test_resilience.py shared/pyproject.toml
+git commit -m "feat: implement resilience module with retry, circuit breaker, timeout"
+```
+
+---
+
+### Task 2: Add DB Connection Pooling
+
+**Files:**
+- Modify: `shared/src/shared/db.py:39-44`
+- Modify: `shared/src/shared/config.py:10-11`
+- Modify: `shared/tests/test_db.py` (add pool config test)
+
+- [ ] **Step 1: Write failing test for pool config**
+
+Add to `shared/tests/test_db.py`:
+
+```python
+async def test_connect_configures_pool():
+    """Engine should be created with pool configuration."""
+    db = Database("sqlite+aiosqlite:///:memory:")
+    await db.connect()
+    # aiosqlite uses a StaticPool, so here we only verify that connect() succeeds
+    assert db._engine is not None
+    await db.close()
+```
+
+- [ ] **Step 2: Add pool settings to config.py**
+
+In `shared/src/shared/config.py`, add after line 11 (`database_url`):
+
+```python
+ db_pool_size: int = 20
+ db_max_overflow: int = 10
+ db_pool_recycle: int = 3600
+```
+
+- [ ] **Step 3: Update Database.connect() with pool parameters**
+
+In `shared/src/shared/db.py`, replace the `connect` method so that SQLite (used in tests) keeps the plain engine while PostgreSQL gets explicit pool settings:
+
+```python
+ async def connect(
+ self,
+ pool_size: int = 20,
+ max_overflow: int = 10,
+ pool_recycle: int = 3600,
+ ) -> None:
+ """Create the async engine, session factory, and all tables."""
+ if self._database_url.startswith("sqlite"):
+ self._engine = create_async_engine(self._database_url)
+ else:
+ self._engine = create_async_engine(
+ self._database_url,
+ pool_pre_ping=True,
+ pool_size=pool_size,
+ max_overflow=max_overflow,
+ pool_recycle=pool_recycle,
+ )
+ self._session_factory = async_sessionmaker(self._engine, expire_on_commit=False)
+ async with self._engine.begin() as conn:
+ await conn.run_sync(Base.metadata.create_all)
+```
+
+- [ ] **Step 4: Run tests**
+
+Run: `pytest shared/tests/test_db.py -v`
+Expected: PASS
+
+- [ ] **Step 5: Commit**
+
+```bash
+git add shared/src/shared/db.py shared/src/shared/config.py shared/tests/test_db.py
+git commit -m "feat: add DB connection pooling with configurable pool_size, overflow, recycle"
+```
+
+---
+
+### Task 3: Add Redis Resilience
+
+**Files:**
+- Modify: `shared/src/shared/broker.py:1-13,15-18,102-104`
+- Create: `shared/tests/test_broker_resilience.py`
+
+- [ ] **Step 1: Write failing tests for Redis resilience**
+
+Create `shared/tests/test_broker_resilience.py`:
+
+```python
+"""Tests for Redis broker resilience features."""
+
+from unittest.mock import AsyncMock, patch
+
+import pytest
+
+from shared.broker import RedisBroker
+
+
+class TestBrokerResilience:
+ async def test_publish_retries_on_connection_error(self):
+ broker = RedisBroker.__new__(RedisBroker)
+ mock_redis = AsyncMock()
+ call_count = 0
+
+ async def xadd_failing(*args, **kwargs):
+ nonlocal call_count
+ call_count += 1
+ if call_count < 3:
+ raise ConnectionError("Redis connection lost")
+ return "msg-id"
+
+ mock_redis.xadd = xadd_failing
+ broker._redis = mock_redis
+
+ await broker.publish("test-stream", {"key": "value"})
+ assert call_count == 3
+
+ async def test_ping_retries_on_timeout(self):
+ broker = RedisBroker.__new__(RedisBroker)
+ mock_redis = AsyncMock()
+ call_count = 0
+
+ async def ping_failing():
+ nonlocal call_count
+ call_count += 1
+ if call_count < 2:
+ raise TimeoutError("timeout")
+ return True
+
+ mock_redis.ping = ping_failing
+ broker._redis = mock_redis
+
+ result = await broker.ping()
+ assert result is True
+ assert call_count == 2
+```
+
+- [ ] **Step 2: Run tests to verify they fail**
+
+Run: `pytest shared/tests/test_broker_resilience.py -v`
+Expected: FAIL (publish doesn't retry)
+
+- [ ] **Step 3: Add resilience to broker.py**
+
+Replace `shared/src/shared/broker.py`:
+
+```python
+"""Redis Streams broker for the trading platform."""
+
+import json
+import logging
+from typing import Any
+
+import redis.asyncio
+
+from shared.resilience import retry_async
+
+logger = logging.getLogger(__name__)
+
+
+class RedisBroker:
+ """Async Redis Streams broker for publishing and reading events."""
+
+ def __init__(self, redis_url: str) -> None:
+ self._redis = redis.asyncio.from_url(
+ redis_url,
+ socket_keepalive=True,
+ health_check_interval=30,
+ retry_on_timeout=True,
+ )
+
+    @retry_async(max_retries=3, base_delay=0.5, exclude=(TypeError, ValueError))
+ async def publish(self, stream: str, data: dict[str, Any]) -> None:
+ """Publish a message to a Redis stream."""
+ payload = json.dumps(data)
+ await self._redis.xadd(stream, {"payload": payload})
+
+ async def ensure_group(self, stream: str, group: str) -> None:
+ """Create a consumer group if it doesn't exist."""
+ try:
+ await self._redis.xgroup_create(stream, group, id="0", mkstream=True)
+ except redis.ResponseError as e:
+ if "BUSYGROUP" not in str(e):
+ raise
+
+ @retry_async(max_retries=3, base_delay=0.5, exclude=(ValueError,))
+ async def read_group(
+ self,
+ stream: str,
+ group: str,
+ consumer: str,
+ count: int = 10,
+ block: int = 0,
+ ) -> list[tuple[str, dict[str, Any]]]:
+ """Read messages from a consumer group. Returns list of (message_id, data)."""
+ results = await self._redis.xreadgroup(
+ group, consumer, {stream: ">"}, count=count, block=block
+ )
+ messages = []
+ if results:
+ for _stream, entries in results:
+ for msg_id, fields in entries:
+ payload = fields.get(b"payload") or fields.get("payload")
+ if payload:
+ if isinstance(payload, bytes):
+ payload = payload.decode()
+ if isinstance(msg_id, bytes):
+ msg_id = msg_id.decode()
+ messages.append((msg_id, json.loads(payload)))
+ return messages
+
+ async def ack(self, stream: str, group: str, *msg_ids: str) -> None:
+ """Acknowledge messages in a consumer group."""
+ if msg_ids:
+ await self._redis.xack(stream, group, *msg_ids)
+
+ async def read_pending(
+ self,
+ stream: str,
+ group: str,
+ consumer: str,
+ count: int = 10,
+ ) -> list[tuple[str, dict[str, Any]]]:
+ """Read pending (unacknowledged) messages for this consumer."""
+ results = await self._redis.xreadgroup(group, consumer, {stream: "0"}, count=count)
+ messages = []
+ if results:
+ for _stream, entries in results:
+ for msg_id, fields in entries:
+ if not fields:
+ continue
+ payload = fields.get(b"payload") or fields.get("payload")
+ if payload:
+ if isinstance(payload, bytes):
+ payload = payload.decode()
+ if isinstance(msg_id, bytes):
+ msg_id = msg_id.decode()
+ messages.append((msg_id, json.loads(payload)))
+ return messages
+
+ async def read(
+ self,
+ stream: str,
+ last_id: str = "$",
+ count: int = 10,
+ block: int = 0,
+ ) -> list[dict[str, Any]]:
+ """Read messages (original method, kept for backward compatibility)."""
+ results = await self._redis.xread({stream: last_id}, count=count, block=block)
+ messages = []
+ if results:
+ for _stream, entries in results:
+ for _msg_id, fields in entries:
+ payload = fields.get(b"payload") or fields.get("payload")
+ if payload:
+ if isinstance(payload, bytes):
+ payload = payload.decode()
+ messages.append(json.loads(payload))
+ return messages
+
+ @retry_async(max_retries=2, base_delay=0.5)
+ async def ping(self) -> bool:
+ """Ping the Redis server; return True if reachable."""
+ return await self._redis.ping()
+
+ async def close(self) -> None:
+ """Close the Redis connection."""
+ await self._redis.aclose()
+```
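The bytes/str normalization in `read_group` can be exercised without a Redis server. This sketch reproduces just the decode step (the helper name `decode_entries` is illustrative):

```python
import json


def decode_entries(entries):
    """Normalize raw stream entries as read_group does: bytes → str, JSON payload → dict."""
    messages = []
    for msg_id, fields in entries:
        # redis-py may return bytes keys/values depending on decode_responses
        payload = fields.get(b"payload") or fields.get("payload")
        if payload:
            if isinstance(payload, bytes):
                payload = payload.decode()
            if isinstance(msg_id, bytes):
                msg_id = msg_id.decode()
            messages.append((msg_id, json.loads(payload)))
    return messages


raw = [(b"1700000000-0", {b"payload": b'{"symbol": "NVDA"}'})]
messages = decode_entries(raw)
```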
+
+- [ ] **Step 4: Run tests**
+
+Run: `pytest shared/tests/test_broker_resilience.py -v`
+Expected: PASS
+
+Run: `pytest shared/tests/test_broker.py -v`
+Expected: PASS (existing tests still work)
+
+- [ ] **Step 5: Commit**
+
+```bash
+git add shared/src/shared/broker.py shared/tests/test_broker_resilience.py
+git commit -m "feat: add retry and resilience to Redis broker with keepalive"
+```
+
+---
+
+### Task 4: Config Validation & SecretStr
+
+**Files:**
+- Modify: `shared/src/shared/config.py`
+- Create: `shared/tests/test_config_validation.py`
+
+- [ ] **Step 1: Write failing tests for config validation**
+
+Create `shared/tests/test_config_validation.py`:
+
+```python
+"""Tests for config validation."""
+
+import pytest
+from pydantic import ValidationError
+
+from shared.config import Settings
+
+
+class TestConfigValidation:
+ def test_valid_defaults(self):
+ settings = Settings()
+ assert settings.risk_max_position_size == 0.1
+
+ def test_invalid_position_size(self):
+ with pytest.raises(ValidationError, match="risk_max_position_size"):
+ Settings(risk_max_position_size=-0.1)
+
+ def test_invalid_health_port(self):
+ with pytest.raises(ValidationError, match="health_port"):
+ Settings(health_port=80)
+
+ def test_invalid_log_level(self):
+ with pytest.raises(ValidationError, match="log_level"):
+ Settings(log_level="INVALID")
+
+ def test_secret_fields_masked(self):
+ settings = Settings(alpaca_api_key="my-secret-key")
+ assert "my-secret-key" not in repr(settings)
+ assert settings.alpaca_api_key.get_secret_value() == "my-secret-key"
+```
+
+- [ ] **Step 2: Run tests to verify they fail**
+
+Run: `pytest shared/tests/test_config_validation.py -v`
+Expected: FAIL
+
+- [ ] **Step 3: Update config.py with validators and SecretStr**
+
+Replace `shared/src/shared/config.py`:
+
+```python
+"""Shared configuration settings for the trading platform."""
+
+from pydantic import SecretStr, field_validator
+from pydantic_settings import BaseSettings
+
+
+class Settings(BaseSettings):
+ # Alpaca
+ alpaca_api_key: SecretStr = SecretStr("")
+ alpaca_api_secret: SecretStr = SecretStr("")
+ alpaca_paper: bool = True
+ # Infrastructure
+ redis_url: SecretStr = SecretStr("redis://localhost:6379")
+    database_url: SecretStr = SecretStr("postgresql+asyncpg://trading:trading@localhost:5432/trading")
+ # DB pool
+ db_pool_size: int = 20
+ db_max_overflow: int = 10
+ db_pool_recycle: int = 3600
+ # Logging
+ log_level: str = "INFO"
+ log_format: str = "json"
+ # Health
+ health_port: int = 8080
+ metrics_auth_token: str = ""
+ # Risk
+ risk_max_position_size: float = 0.1
+ risk_stop_loss_pct: float = 5.0
+ risk_daily_loss_limit_pct: float = 10.0
+ risk_trailing_stop_pct: float = 0.0
+ risk_max_open_positions: int = 10
+ risk_volatility_lookback: int = 20
+ risk_volatility_scale: bool = False
+ risk_max_portfolio_exposure: float = 0.8
+ risk_max_correlated_exposure: float = 0.5
+ risk_correlation_threshold: float = 0.7
+ risk_var_confidence: float = 0.95
+ risk_var_limit_pct: float = 5.0
+ risk_drawdown_reduction_threshold: float = 0.1
+ risk_drawdown_halt_threshold: float = 0.2
+ risk_max_consecutive_losses: int = 5
+ risk_loss_pause_minutes: int = 60
+ dry_run: bool = True
+ # Telegram
+ telegram_bot_token: SecretStr = SecretStr("")
+ telegram_chat_id: str = ""
+ telegram_enabled: bool = False
+ # News
+ finnhub_api_key: SecretStr = SecretStr("")
+ news_poll_interval: int = 300
+ sentiment_aggregate_interval: int = 900
+ # Stock selector
+ selector_final_time: str = "15:30"
+ selector_max_picks: int = 3
+ # LLM
+ anthropic_api_key: SecretStr = SecretStr("")
+ anthropic_model: str = "claude-sonnet-4-20250514"
+ # API security
+ api_auth_token: SecretStr = SecretStr("")
+ cors_origins: str = "http://localhost:3000"
+
+ model_config = {"env_file": ".env", "env_file_encoding": "utf-8", "extra": "ignore"}
+
+ @field_validator("risk_max_position_size")
+ @classmethod
+ def validate_position_size(cls, v: float) -> float:
+ if v <= 0 or v > 1:
+ raise ValueError("risk_max_position_size must be between 0 and 1 (exclusive)")
+ return v
+
+ @field_validator("health_port")
+ @classmethod
+ def validate_health_port(cls, v: int) -> int:
+ if v < 1024 or v > 65535:
+ raise ValueError("health_port must be between 1024 and 65535")
+ return v
+
+ @field_validator("log_level")
+ @classmethod
+ def validate_log_level(cls, v: str) -> str:
+ valid = {"DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"}
+ if v.upper() not in valid:
+ raise ValueError(f"log_level must be one of {valid}")
+ return v.upper()
+```
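The SecretStr contract the tests rely on — masked `repr`, explicit `.get_secret_value()` — can be illustrated with a minimal stand-in (stdlib sketch only; the real class comes from pydantic):

```python
class Secret:
    """Minimal stand-in for pydantic's SecretStr: repr hides the wrapped value."""

    def __init__(self, value: str) -> None:
        self._value = value

    def get_secret_value(self) -> str:
        # The only sanctioned way to read the raw secret
        return self._value

    def __repr__(self) -> str:
        return "Secret('**********')"


key = Secret("my-secret-key")
```

This is why logging `repr(settings)` stays safe after the migration, while every real consumer must opt in via `.get_secret_value()`.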
+
+- [ ] **Step 4: Update all consumers to use .get_secret_value()**
+
+Every place that reads `settings.alpaca_api_key` etc. must now call `.get_secret_value()`. Key files to update:
+
+**`shared/src/shared/alpaca.py`** — where AlpacaClient is instantiated (in each service main.py), change:
+```python
+# Before:
+alpaca = AlpacaClient(cfg.alpaca_api_key, cfg.alpaca_api_secret, paper=cfg.alpaca_paper)
+# After:
+alpaca = AlpacaClient(
+ cfg.alpaca_api_key.get_secret_value(),
+ cfg.alpaca_api_secret.get_secret_value(),
+ paper=cfg.alpaca_paper,
+)
+```
+
+**Each service main.py** — where `Database(cfg.database_url)` and `RedisBroker(cfg.redis_url)` are called:
+```python
+# Before:
+db = Database(cfg.database_url)
+broker = RedisBroker(cfg.redis_url)
+# After:
+db = Database(cfg.database_url.get_secret_value())
+broker = RedisBroker(cfg.redis_url.get_secret_value())
+```
+
+**`shared/src/shared/notifier.py`** — where telegram_bot_token is used:
+```python
+# Replace every read of telegram_bot_token with telegram_bot_token.get_secret_value()
+```
+
+**`services/strategy-engine/src/strategy_engine/main.py`** — where anthropic_api_key is passed:
+```python
+# Before:
+anthropic_api_key=cfg.anthropic_api_key,
+# After:
+anthropic_api_key=cfg.anthropic_api_key.get_secret_value(),
+```
+
+**`services/news-collector/src/news_collector/main.py`** — where finnhub_api_key is used:
+```python
+# Before:
+cfg.finnhub_api_key
+# After:
+cfg.finnhub_api_key.get_secret_value()
+```
+
+- [ ] **Step 5: Run all tests**
+
+Run: `pytest shared/tests/test_config_validation.py -v`
+Expected: PASS
+
+Run: `pytest -v`
+Expected: All tests PASS (no regressions from SecretStr changes)
+
+- [ ] **Step 6: Commit**
+
+```bash
+git add shared/src/shared/config.py shared/tests/test_config_validation.py
+git add services/*/src/*/main.py shared/src/shared/notifier.py
+git commit -m "feat: add config validation, SecretStr for secrets, API security fields"
+```
+
+---
+
+### Task 5: Pin All Dependencies
+
+**Files:**
+- Modify: `shared/pyproject.toml` (already done in Task 1)
+- Modify: `services/strategy-engine/pyproject.toml`
+- Modify: `services/backtester/pyproject.toml`
+- Modify: `services/api/pyproject.toml`
+- Modify: `services/news-collector/pyproject.toml`
+- Modify: `services/data-collector/pyproject.toml`
+- Modify: `services/order-executor/pyproject.toml`
+- Modify: `services/portfolio-manager/pyproject.toml`
+
+- [ ] **Step 1: Pin service dependencies**
+
+`services/strategy-engine/pyproject.toml`:
+```toml
+dependencies = [
+ "pandas>=2.1,<3",
+ "numpy>=1.26,<3",
+ "trading-shared",
+]
+```
+
+`services/backtester/pyproject.toml`:
+```toml
+dependencies = ["pandas>=2.1,<3", "numpy>=1.26,<3", "rich>=13.0,<14", "trading-shared"]
+```
+
+`services/api/pyproject.toml`:
+```toml
+dependencies = [
+ "fastapi>=0.110,<1",
+ "uvicorn>=0.27,<1",
+ "slowapi>=0.1.9,<1",
+ "trading-shared",
+]
+```
+
+`services/news-collector/pyproject.toml`:
+```toml
+dependencies = [
+ "trading-shared",
+ "feedparser>=6.0,<7",
+ "nltk>=3.8,<4",
+ "aiohttp>=3.9,<4",
+]
+```
+
+`shared/pyproject.toml` optional deps:
+```toml
+[project.optional-dependencies]
+dev = [
+ "pytest>=8.0,<9",
+ "pytest-asyncio>=0.23,<1",
+ "ruff>=0.4,<1",
+]
+claude = [
+ "anthropic>=0.40,<1",
+]
+```
+
+- [ ] **Step 2: Verify installation works**
+
+Run: `pip install -e shared/ && pip install -e services/strategy-engine/ && pip install -e services/api/`
+Expected: No errors
+
+- [ ] **Step 3: Commit**
+
+```bash
+git add shared/pyproject.toml services/*/pyproject.toml
+git commit -m "chore: pin all dependencies with upper bounds"
+```
+
+---
+
+## Phase 2: Infrastructure Hardening
+
+### Task 6: Docker Secrets & Environment Cleanup
+
+**Files:**
+- Modify: `docker-compose.yml:17-21`
+- Modify: `.env.example`
+
+- [ ] **Step 1: Replace hardcoded Postgres credentials in docker-compose.yml**
+
+In `docker-compose.yml`, replace the postgres service environment:
+
+```yaml
+ postgres:
+ image: postgres:16-alpine
+ environment:
+ POSTGRES_USER: ${POSTGRES_USER:-trading}
+ POSTGRES_PASSWORD: ${POSTGRES_PASSWORD:-trading}
+ POSTGRES_DB: ${POSTGRES_DB:-trading}
+```
+
+- [ ] **Step 2: Update .env.example with secret annotations**
+
+Add to `.env.example`:
+
+```bash
+# === SECRETS (keep secure, do not commit .env) ===
+ALPACA_API_KEY=
+ALPACA_API_SECRET=
+DATABASE_URL=postgresql+asyncpg://trading:trading@localhost:5432/trading
+REDIS_URL=redis://localhost:6379
+TELEGRAM_BOT_TOKEN=
+FINNHUB_API_KEY=
+ANTHROPIC_API_KEY=
+API_AUTH_TOKEN=
+POSTGRES_USER=trading
+POSTGRES_PASSWORD=trading
+POSTGRES_DB=trading
+
+# === CONFIGURATION ===
+ALPACA_PAPER=true
+DRY_RUN=true
+LOG_LEVEL=INFO
+LOG_FORMAT=json
+HEALTH_PORT=8080
+# ... (keep existing config vars)
+
+# === API SECURITY ===
+CORS_ORIGINS=http://localhost:3000
+```
+
+- [ ] **Step 3: Commit**
+
+```bash
+git add docker-compose.yml .env.example
+git commit -m "fix: move hardcoded postgres credentials to .env, annotate secrets"
+```
+
+---
+
+### Task 7: Dockerfile Optimization
+
+**Files:**
+- Create: `.dockerignore`
+- Modify: All 7 Dockerfiles in `services/*/Dockerfile`
+
+- [ ] **Step 1: Create .dockerignore**
+
+Create `.dockerignore` at project root:
+
+```
+__pycache__
+*.pyc
+*.pyo
+.git
+.github
+.venv
+.env
+.env.*
+!.env.example
+tests/
+docs/
+*.md
+.ruff_cache
+.pytest_cache
+.mypy_cache
+monitoring/
+scripts/
+cli/
+```
+
+- [ ] **Step 2: Update data-collector Dockerfile**
+
+Replace `services/data-collector/Dockerfile`:
+
+```dockerfile
+FROM python:3.12-slim AS builder
+WORKDIR /app
+COPY shared/ shared/
+RUN pip install --no-cache-dir ./shared
+COPY services/data-collector/ services/data-collector/
+RUN pip install --no-cache-dir ./services/data-collector
+
+FROM python:3.12-slim
+RUN useradd -r -s /bin/false appuser
+WORKDIR /app
+COPY --from=builder /usr/local/lib/python3.12/site-packages /usr/local/lib/python3.12/site-packages
+COPY --from=builder /usr/local/bin /usr/local/bin
+ENV PYTHONPATH=/app
+USER appuser
+CMD ["python", "-m", "data_collector.main"]
+```
+
+- [ ] **Step 3: Update all other Dockerfiles with same pattern**
+
+Apply the same multi-stage + non-root pattern to:
+- `services/strategy-engine/Dockerfile` (also copies strategies/)
+- `services/order-executor/Dockerfile`
+- `services/portfolio-manager/Dockerfile`
+- `services/api/Dockerfile` (also copies strategies/, uses uvicorn CMD)
+- `services/news-collector/Dockerfile` (also runs nltk download)
+- `services/backtester/Dockerfile` (also copies strategies/)
+
+For **strategy-engine** Dockerfile:
+```dockerfile
+FROM python:3.12-slim AS builder
+WORKDIR /app
+COPY shared/ shared/
+RUN pip install --no-cache-dir ./shared
+COPY services/strategy-engine/ services/strategy-engine/
+RUN pip install --no-cache-dir ./services/strategy-engine
+
+FROM python:3.12-slim
+RUN useradd -r -s /bin/false appuser
+WORKDIR /app
+COPY --from=builder /usr/local/lib/python3.12/site-packages /usr/local/lib/python3.12/site-packages
+COPY --from=builder /usr/local/bin /usr/local/bin
+COPY services/strategy-engine/strategies/ /app/strategies/
+ENV PYTHONPATH=/app
+USER appuser
+CMD ["python", "-m", "strategy_engine.main"]
+```
+
+For **news-collector** Dockerfile:
+```dockerfile
+FROM python:3.12-slim AS builder
+WORKDIR /app
+COPY shared/ shared/
+RUN pip install --no-cache-dir ./shared
+COPY services/news-collector/ services/news-collector/
+RUN pip install --no-cache-dir ./services/news-collector
+RUN python -c "import nltk; nltk.download('vader_lexicon', download_dir='/usr/local/nltk_data')"
+
+FROM python:3.12-slim
+RUN useradd -r -s /bin/false appuser
+WORKDIR /app
+COPY --from=builder /usr/local/lib/python3.12/site-packages /usr/local/lib/python3.12/site-packages
+COPY --from=builder /usr/local/bin /usr/local/bin
+COPY --from=builder /usr/local/nltk_data /usr/local/nltk_data
+ENV PYTHONPATH=/app
+USER appuser
+CMD ["python", "-m", "news_collector.main"]
+```
+
+For **api** Dockerfile:
+```dockerfile
+FROM python:3.12-slim AS builder
+WORKDIR /app
+COPY shared/ shared/
+RUN pip install --no-cache-dir ./shared
+COPY services/api/ services/api/
+RUN pip install --no-cache-dir ./services/api
+COPY services/strategy-engine/ services/strategy-engine/
+RUN pip install --no-cache-dir ./services/strategy-engine
+
+FROM python:3.12-slim
+RUN useradd -r -s /bin/false appuser
+WORKDIR /app
+COPY --from=builder /usr/local/lib/python3.12/site-packages /usr/local/lib/python3.12/site-packages
+COPY --from=builder /usr/local/bin /usr/local/bin
+COPY services/strategy-engine/strategies/ /app/strategies/
+ENV PYTHONPATH=/app STRATEGIES_DIR=/app/strategies
+USER appuser
+CMD ["uvicorn", "trading_api.main:app", "--host", "0.0.0.0", "--port", "8000", "--timeout-graceful-shutdown", "30"]
+```
+
+For **order-executor**, **portfolio-manager**, **backtester** — same pattern as data-collector, adjusting the service name and CMD.
+
+- [ ] **Step 4: Verify Docker build works**
+
+Run: `docker compose build --quiet`
+Expected: All images build successfully
+
+- [ ] **Step 5: Commit**
+
+```bash
+git add .dockerignore services/*/Dockerfile
+git commit -m "feat: optimize Dockerfiles with multi-stage builds, non-root user, .dockerignore"
+```
+
+---
+
+### Task 8: Database Index Migration
+
+**Files:**
+- Create: `shared/alembic/versions/003_add_missing_indexes.py`
+
+- [ ] **Step 1: Create migration file**
+
+Create `shared/alembic/versions/003_add_missing_indexes.py`:
+
+```python
+"""Add missing indexes for common query patterns.
+
+Revision ID: 003
+Revises: 002
+"""
+
+from alembic import op
+
+revision = "003"
+down_revision = "002"
+
+
+def upgrade():
+ op.create_index("idx_signals_symbol_created", "signals", ["symbol", "created_at"])
+ op.create_index("idx_orders_symbol_status_created", "orders", ["symbol", "status", "created_at"])
+ op.create_index("idx_trades_order_id", "trades", ["order_id"])
+ op.create_index("idx_trades_symbol_traded", "trades", ["symbol", "traded_at"])
+ op.create_index("idx_portfolio_snapshots_at", "portfolio_snapshots", ["snapshot_at"])
+ op.create_index("idx_symbol_scores_symbol", "symbol_scores", ["symbol"], unique=True)
+
+
+def downgrade():
+ op.drop_index("idx_symbol_scores_symbol", table_name="symbol_scores")
+ op.drop_index("idx_portfolio_snapshots_at", table_name="portfolio_snapshots")
+ op.drop_index("idx_trades_symbol_traded", table_name="trades")
+ op.drop_index("idx_trades_order_id", table_name="trades")
+ op.drop_index("idx_orders_symbol_status_created", table_name="orders")
+ op.drop_index("idx_signals_symbol_created", table_name="signals")
+```
+
+- [ ] **Step 2: Verify migration runs (requires infra)**
+
+Run: `make infra && cd shared && alembic upgrade head`
+Expected: Migration 003 applied successfully
+
+- [ ] **Step 3: Commit**
+
+```bash
+git add shared/alembic/versions/003_add_missing_indexes.py
+git commit -m "feat: add missing DB indexes for signals, orders, trades, snapshots"
+```
+
+---
+
+### Task 9: Docker Compose Resource Limits & Networks
+
+**Files:**
+- Modify: `docker-compose.yml`
+
+- [ ] **Step 1: Add networks and resource limits**
+
+Add to `docker-compose.yml` at bottom:
+
+```yaml
+networks:
+ internal:
+ driver: bridge
+ monitoring:
+ driver: bridge
+```
+
+Add `networks: [internal]` to all application services (redis, postgres, data-collector, strategy-engine, order-executor, portfolio-manager, api, news-collector).
+
+Add `networks: [internal, monitoring]` to prometheus, grafana. Add `networks: [monitoring]` to loki, promtail.
+
+Add to each application service:
+
+```yaml
+ deploy:
+ resources:
+ limits:
+ memory: 512M
+ cpus: '1.0'
+```
+
+For strategy-engine and backtester, use `memory: 1G` instead.
+
+- [ ] **Step 2: Verify compose config is valid**
+
+Run: `docker compose config --quiet`
+Expected: No errors
+
+- [ ] **Step 3: Commit**
+
+```bash
+git add docker-compose.yml
+git commit -m "feat: add resource limits and network isolation to docker-compose"
+```
+
+---
+
+## Phase 3: Service-Level Improvements
+
+### Task 10: Graceful Shutdown for All Services
+
+**Files:**
+- Modify: `services/data-collector/src/data_collector/main.py`
+- Modify: `services/strategy-engine/src/strategy_engine/main.py`
+- Modify: `services/order-executor/src/order_executor/main.py`
+- Modify: `services/portfolio-manager/src/portfolio_manager/main.py`
+- Modify: `services/news-collector/src/news_collector/main.py`
+- Modify: `services/api/src/trading_api/main.py`
+
+- [ ] **Step 1: Create a shared shutdown helper**
+
+Add to `shared/src/shared/shutdown.py`:
+
+```python
+"""Graceful shutdown utilities for services."""
+
+import asyncio
+import logging
+import signal
+
+logger = logging.getLogger(__name__)
+
+
+class GracefulShutdown:
+ """Manages graceful shutdown via SIGTERM/SIGINT signals."""
+
+ def __init__(self) -> None:
+ self._event = asyncio.Event()
+
+ @property
+ def is_shutting_down(self) -> bool:
+ return self._event.is_set()
+
+ async def wait(self) -> None:
+ await self._event.wait()
+
+ def trigger(self) -> None:
+ logger.info("shutdown_signal_received")
+ self._event.set()
+
+ def install_handlers(self) -> None:
+ loop = asyncio.get_running_loop()
+ for sig in (signal.SIGTERM, signal.SIGINT):
+ loop.add_signal_handler(sig, self.trigger)
+```
+
+- [ ] **Step 2: Add shutdown to data-collector main loop**
+
+In `services/data-collector/src/data_collector/main.py`, add at the start of `run()`:
+
+```python
+from shared.shutdown import GracefulShutdown
+
+shutdown = GracefulShutdown()
+shutdown.install_handlers()
+```
+
+Replace the main `while True` loop condition with `while not shutdown.is_shutting_down`.
+
+- [ ] **Step 3: Apply same pattern to all other services**
+
+For each service's `main.py`, add `GracefulShutdown` import, install handlers at start of `run()`, and replace infinite loops with `while not shutdown.is_shutting_down`.
+
+For strategy-engine: also cancel tasks on shutdown.
+For portfolio-manager: also cancel snapshot_loop task.
+For news-collector: also cancel all collector loop tasks.
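+
+This wiring can be sketched end-to-end (the helper is inlined so the sketch runs standalone; real services import it from `shared.shutdown`, and `snapshot_loop` here stands in for whatever background task the service owns):
+
+```python
+"""Sketch: a service run() loop with graceful shutdown and task cancellation."""
+import asyncio
+import signal
+
+
+class GracefulShutdown:
+    # Inlined copy of shared/src/shared/shutdown.py for a standalone sketch.
+    def __init__(self) -> None:
+        self._event = asyncio.Event()
+
+    @property
+    def is_shutting_down(self) -> bool:
+        return self._event.is_set()
+
+    def trigger(self) -> None:
+        self._event.set()
+
+    def install_handlers(self) -> None:
+        loop = asyncio.get_running_loop()
+        for sig in (signal.SIGTERM, signal.SIGINT):
+            loop.add_signal_handler(sig, self.trigger)
+
+
+async def run() -> list[str]:
+    shutdown = GracefulShutdown()
+    shutdown.install_handlers()
+
+    async def snapshot_loop() -> None:  # stand-in for a service background task
+        while not shutdown.is_shutting_down:
+            await asyncio.sleep(0.01)
+
+    task = asyncio.create_task(snapshot_loop())
+    events: list[str] = []
+    while not shutdown.is_shutting_down:
+        events.append("tick")
+        if len(events) >= 3:  # simulate SIGTERM arriving after three iterations
+            shutdown.trigger()
+        await asyncio.sleep(0)
+
+    # On shutdown: cancel background tasks and await them before exiting.
+    task.cancel()
+    try:
+        await task
+    except asyncio.CancelledError:
+        pass
+    events.append("stopped")
+    return events
+
+
+print(asyncio.run(run()))
+```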
+
+- [ ] **Step 4: Run tests**
+
+Run: `pytest -v`
+Expected: All tests PASS
+
+- [ ] **Step 5: Commit**
+
+```bash
+git add shared/src/shared/shutdown.py services/*/src/*/main.py
+git commit -m "feat: add graceful shutdown with SIGTERM/SIGINT handlers to all services"
+```
+
+---
+
+### Task 11: Exception Handling Specialization
+
+**Files:**
+- Modify: All service `main.py` files
+- Modify: `shared/src/shared/db.py`
+
+- [ ] **Step 1: Specialize exceptions in data-collector/main.py**
+
+Replace broad `except Exception` blocks. For example, in the fetch loop:
+
+```python
+# Before:
+except Exception as exc:
+ log.warning("fetch_bar_failed", symbol=symbol, error=str(exc))
+
+# After:
+except (ConnectionError, TimeoutError, aiohttp.ClientError) as exc:
+ log.warning("fetch_bar_network_error", symbol=symbol, error=str(exc))
+except (ValueError, KeyError) as exc:
+ log.warning("fetch_bar_parse_error", symbol=symbol, error=str(exc))
+except Exception as exc:
+ log.error("fetch_bar_unexpected", symbol=symbol, error=str(exc), exc_info=True)
+```
+
+- [ ] **Step 2: Specialize exceptions in strategy-engine, order-executor, portfolio-manager, news-collector**
+
+Apply the same pattern: network errors → warning + retry, parse errors → warning + skip, unexpected → error + exc_info.
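+
+The triage rule can be captured as a small classifier (illustrative only — the real handlers keep the log calls inline rather than returning labels):
+
+```python
+"""Map exception types to the retry / skip / escalate policy described above."""
+
+
+def classify(exc: Exception) -> str:
+    if isinstance(exc, (ConnectionError, TimeoutError)):
+        return "retry"      # transient network failure: log.warning, then retry
+    if isinstance(exc, (ValueError, KeyError)):
+        return "skip"       # malformed payload: log.warning, skip this item
+    return "escalate"       # unexpected: log.error with exc_info=True
+
+
+print([classify(e) for e in (TimeoutError(), KeyError("price"), RuntimeError())])
+```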
+
+- [ ] **Step 3: Specialize exceptions in db.py**
+
+In `shared/src/shared/db.py`, the transaction pattern can distinguish:
+
+```python
+# Requires `import asyncpg` and `import sqlalchemy.exc` at module top.
+except (asyncpg.PostgresError, sqlalchemy.exc.OperationalError) as exc:
+ await session.rollback()
+ logger.error("db_operation_error", error=str(exc))
+ raise
+except Exception:
+ await session.rollback()
+ raise
+```
+
+- [ ] **Step 4: Run tests**
+
+Run: `pytest -v`
+Expected: All tests PASS
+
+- [ ] **Step 5: Commit**
+
+```bash
+git add services/*/src/*/main.py shared/src/shared/db.py
+git commit -m "refactor: specialize exception handling across all services"
+```
+
+---
+
+### Task 12: Fix Stock Selector (Bug Fix + Dedup + Session Reuse)
+
+**Files:**
+- Modify: `services/strategy-engine/src/strategy_engine/stock_selector.py`
+- Modify: `services/strategy-engine/tests/test_stock_selector.py` (if exists, otherwise create)
+
+- [ ] **Step 1: Fix the critical bug on line 217**
+
+In `stock_selector.py` line 217, replace:
+```python
+self._session = anthropic_model
+```
+with:
+```python
+self._model = anthropic_model
+```
+
+- [ ] **Step 2: Extract common JSON parsing function**
+
+Replace the duplicate parsing logic. Add at module level (replacing `_parse_llm_selections`):
+
+```python
+def _extract_json_array(text: str) -> list[dict] | None:
+ """Extract a JSON array from text that may contain markdown code blocks."""
+ code_block = re.search(r"```(?:json)?\s*(\[.*?\])\s*```", text, re.DOTALL)
+ if code_block:
+ raw = code_block.group(1)
+ else:
+ array_match = re.search(r"\[.*\]", text, re.DOTALL)
+ if array_match:
+ raw = array_match.group(0)
+ else:
+ raw = text.strip()
+
+ try:
+ data = json.loads(raw)
+ if isinstance(data, list):
+ return [item for item in data if isinstance(item, dict)]
+ return None
+ except (json.JSONDecodeError, TypeError):
+ return None
+
+
+def _parse_llm_selections(text: str) -> list[SelectedStock]:
+ """Parse LLM response into SelectedStock list."""
+ items = _extract_json_array(text)
+ if items is None:
+ return []
+ selections = []
+ for item in items:
+ try:
+ selections.append(
+ SelectedStock(
+ symbol=item["symbol"],
+ side=OrderSide(item["side"]),
+ conviction=float(item["conviction"]),
+ reason=item.get("reason", ""),
+ key_news=item.get("key_news", []),
+ )
+ )
+ except (KeyError, ValueError) as e:
+ logger.warning("Skipping invalid selection item: %s", e)
+ return selections
+```
+
+Update `LLMCandidateSource._parse_candidates()` to use `_extract_json_array`:
+
+```python
+    def _parse_candidates(self, text: str) -> list[Candidate]:
+        items = _extract_json_array(text)
+        if items is None:
+            return []
+        candidates = []
+        for item in items:
+            if "symbol" not in item:
+                logger.warning("Skipping candidate without symbol: %s", item)
+                continue
+            try:
+                direction = OrderSide(item.get("direction", "BUY"))
+            except ValueError:
+                direction = None
+            candidates.append(
+                Candidate(
+                    symbol=item["symbol"],
+                    source="llm",
+                    direction=direction,
+                    score=float(item.get("score", 0.5)),
+                    reason=item.get("reason", ""),
+                )
+            )
+        return candidates
+```
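+
+The fence-then-bracket fallback order can be exercised standalone (the function body is repeated in condensed form so the demo runs on its own):
+
+```python
+"""Demo of the three extraction paths: fenced block, bare array, no JSON."""
+import json
+import re
+
+
+def extract_json_array(text: str) -> list[dict] | None:
+    code_block = re.search(r"```(?:json)?\s*(\[.*?\])\s*```", text, re.DOTALL)
+    if code_block:
+        raw = code_block.group(1)
+    else:
+        array_match = re.search(r"\[.*\]", text, re.DOTALL)
+        raw = array_match.group(0) if array_match else text.strip()
+    try:
+        data = json.loads(raw)
+    except (json.JSONDecodeError, TypeError):
+        return None
+    return [i for i in data if isinstance(i, dict)] if isinstance(data, list) else None
+
+
+fenced = 'Here you go:\n```json\n[{"symbol": "AAPL"}]\n```'
+bare = 'Picks: [{"symbol": "MSFT"}] as requested'
+print(extract_json_array(fenced), extract_json_array(bare), extract_json_array("n/a"))
+```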
+
+- [ ] **Step 3: Add session reuse to StockSelector**
+
+Add `_http_session` to `StockSelector.__init__()`:
+
+```python
+self._http_session: aiohttp.ClientSession | None = None
+```
+
+Add helper method:
+
+```python
+async def _ensure_session(self) -> aiohttp.ClientSession:
+ if self._http_session is None or self._http_session.closed:
+ self._http_session = aiohttp.ClientSession()
+ return self._http_session
+
+async def close(self) -> None:
+ if self._http_session and not self._http_session.closed:
+ await self._http_session.close()
+```
+
+Replace `async with aiohttp.ClientSession() as session:` in both `LLMCandidateSource.get_candidates()` and `StockSelector._llm_final_select()` with session reuse. For `LLMCandidateSource`, accept an optional session parameter. For `StockSelector._llm_final_select()`, use `self._ensure_session()`.
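+
+The resulting lifecycle can be sketched with a stand-in session class (no aiohttp dependency so it runs anywhere; in the real selector the type is `aiohttp.ClientSession`, and `FakeSession`/`demo` are illustrative names):
+
+```python
+"""Lazy create-once, reuse, recreate-after-close session management."""
+import asyncio
+
+
+class FakeSession:
+    # Minimal stand-in mirroring aiohttp.ClientSession's closed/close() API.
+    def __init__(self) -> None:
+        self.closed = False
+
+    async def close(self) -> None:
+        self.closed = True
+
+
+class Selector:
+    def __init__(self) -> None:
+        self._http_session: FakeSession | None = None
+
+    async def _ensure_session(self) -> FakeSession:
+        # Create on first use; recreate only if the old session was closed.
+        if self._http_session is None or self._http_session.closed:
+            self._http_session = FakeSession()
+        return self._http_session
+
+    async def close(self) -> None:
+        if self._http_session and not self._http_session.closed:
+            await self._http_session.close()
+
+
+async def demo() -> bool:
+    selector = Selector()
+    first = await selector._ensure_session()
+    second = await selector._ensure_session()   # reused, not recreated
+    await selector.close()
+    third = await selector._ensure_session()    # closed, so a fresh session
+    return first is second and third is not first
+
+
+print(asyncio.run(demo()))
+```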
+
+- [ ] **Step 4: Run tests**
+
+Run: `pytest services/strategy-engine/tests/ -v`
+Expected: All tests PASS
+
+- [ ] **Step 5: Commit**
+
+```bash
+git add services/strategy-engine/src/strategy_engine/stock_selector.py
+git commit -m "fix: fix model attr bug, deduplicate LLM parsing, reuse aiohttp sessions"
+```
+
+---
+
+## Phase 4: API Security
+
+### Task 13: Bearer Token Authentication
+
+**Files:**
+- Create: `services/api/src/trading_api/dependencies/__init__.py`
+- Create: `services/api/src/trading_api/dependencies/auth.py`
+- Modify: `services/api/src/trading_api/main.py`
+
+- [ ] **Step 1: Create auth dependency**
+
+Create `services/api/src/trading_api/dependencies/__init__.py` (empty file).
+
+Create `services/api/src/trading_api/dependencies/auth.py`:
+
+```python
+"""Bearer token authentication dependency."""
+
+import logging
+
+from fastapi import Depends, HTTPException, status
+from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
+
+from shared.config import Settings
+
+logger = logging.getLogger(__name__)
+
+_security = HTTPBearer(auto_error=False)
+_settings = Settings()
+
+
+async def verify_token(
+ credentials: HTTPAuthorizationCredentials | None = Depends(_security),
+) -> None:
+ """Verify Bearer token. Skip auth if API_AUTH_TOKEN is not configured."""
+ token = _settings.api_auth_token.get_secret_value()
+ if not token:
+ return # Auth disabled in dev mode
+
+ if credentials is None or credentials.credentials != token:
+ raise HTTPException(
+ status_code=status.HTTP_401_UNAUTHORIZED,
+ detail="Invalid or missing authentication token",
+ headers={"WWW-Authenticate": "Bearer"},
+ )
+```
+
+- [ ] **Step 2: Apply auth to all API routes**
+
+In `services/api/src/trading_api/main.py`, add auth dependency to routers:
+
+```python
+from trading_api.dependencies.auth import verify_token
+from fastapi import Depends
+
+app.include_router(portfolio_router, prefix="/api/v1/portfolio", dependencies=[Depends(verify_token)])
+app.include_router(orders_router, prefix="/api/v1/orders", dependencies=[Depends(verify_token)])
+app.include_router(strategies_router, prefix="/api/v1/strategies", dependencies=[Depends(verify_token)])
+```
+
+Log a warning on startup if token is empty:
+
+```python
+@asynccontextmanager
+async def lifespan(app: FastAPI):
+ cfg = Settings()
+ if not cfg.api_auth_token.get_secret_value():
+ logger.warning("API_AUTH_TOKEN not set; API authentication is disabled")
+ # ... rest of lifespan
+```
+
+- [ ] **Step 3: Write tests for auth**
+
+Add to `services/api/tests/test_auth.py`:
+
+```python
+"""Tests for API authentication."""
+
+from unittest.mock import patch
+
+
+from trading_api.main import app
+
+
+class TestAuth:
+ @patch("trading_api.dependencies.auth._settings")
+ async def test_rejects_missing_token_when_configured(self, mock_settings):
+ from pydantic import SecretStr
+ mock_settings.api_auth_token = SecretStr("test-token")
+ async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as ac:
+ resp = await ac.get("/api/v1/portfolio/positions")
+ assert resp.status_code == 401
+
+ @patch("trading_api.dependencies.auth._settings")
+ async def test_accepts_valid_token(self, mock_settings):
+ from pydantic import SecretStr
+ mock_settings.api_auth_token = SecretStr("test-token")
+ async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as ac:
+ resp = await ac.get(
+ "/api/v1/portfolio/positions",
+ headers={"Authorization": "Bearer test-token"},
+ )
+ # May fail with 500 if DB not available, but should NOT be 401
+ assert resp.status_code != 401
+```
+
+- [ ] **Step 4: Run tests**
+
+Run: `pytest services/api/tests/test_auth.py -v`
+Expected: PASS
+
+- [ ] **Step 5: Commit**
+
+```bash
+git add services/api/src/trading_api/dependencies/ services/api/src/trading_api/main.py services/api/tests/test_auth.py
+git commit -m "feat: add Bearer token authentication to API endpoints"
+```
+
+---
+
+### Task 14: CORS & Rate Limiting
+
+**Files:**
+- Modify: `services/api/src/trading_api/main.py`
+- Modify: `services/api/pyproject.toml`
+
+- [ ] **Step 1: Add slowapi dependency**
+
+Already done in Task 5 (`services/api/pyproject.toml` has `slowapi>=0.1.9,<1`).
+
+- [ ] **Step 2: Add CORS and rate limiting to main.py**
+
+In `services/api/src/trading_api/main.py`:
+
+```python
+from fastapi.middleware.cors import CORSMiddleware
+from slowapi import Limiter, _rate_limit_exceeded_handler
+from slowapi.util import get_remote_address
+from slowapi.errors import RateLimitExceeded
+
+from shared.config import Settings
+
+cfg = Settings()
+
+limiter = Limiter(key_func=get_remote_address)
+app = FastAPI(title="Trading Platform API")
+app.state.limiter = limiter
+app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
+
+app.add_middleware(
+ CORSMiddleware,
+ allow_origins=cfg.cors_origins.split(","),
+ allow_methods=["GET", "POST"],
+ allow_headers=["Authorization", "Content-Type"],
+)
+```
+
+- [ ] **Step 3: Add rate limits to order endpoints**
+
+In `services/api/src/trading_api/routers/orders.py`:
+
+```python
+from fastapi import Request
+from slowapi import Limiter
+from slowapi.util import get_remote_address
+
+# Use one Limiter per process: ideally define it in a shared module and import
+# the same instance here and in main.py (where it is attached to app.state.limiter).
+limiter = Limiter(key_func=get_remote_address)
+
+@router.get("/")
+@limiter.limit("60/minute")
+async def get_orders(request: Request, limit: int = 50):
+ ...
+```
+
+- [ ] **Step 4: Run tests**
+
+Run: `pytest services/api/tests/ -v`
+Expected: PASS
+
+- [ ] **Step 5: Commit**
+
+```bash
+git add services/api/src/trading_api/main.py services/api/src/trading_api/routers/
+git commit -m "feat: add CORS middleware and rate limiting to API"
+```
+
+---
+
+### Task 15: API Input Validation & Response Models
+
+**Files:**
+- Modify: `services/api/src/trading_api/routers/portfolio.py`
+- Modify: `services/api/src/trading_api/routers/orders.py`
+
+- [ ] **Step 1: Add Query validation to portfolio.py**
+
+```python
+from fastapi import Query
+
+@router.get("/snapshots")
+async def get_snapshots(request: Request, days: int = Query(30, ge=1, le=365)):
+ ...
+```
+
+- [ ] **Step 2: Add Query validation to orders.py**
+
+```python
+from fastapi import Query
+
+@router.get("/")
+async def get_orders(request: Request, limit: int = Query(50, ge=1, le=1000)):
+ ...
+
+@router.get("/signals")
+async def get_signals(request: Request, limit: int = Query(50, ge=1, le=1000)):
+ ...
+```
+
+- [ ] **Step 3: Run tests**
+
+Run: `pytest services/api/tests/ -v`
+Expected: PASS
+
+- [ ] **Step 4: Commit**
+
+```bash
+git add services/api/src/trading_api/routers/
+git commit -m "feat: add input validation with Query bounds to API endpoints"
+```
+
+---
+
+## Phase 5: Operational Maturity
+
+### Task 16: Enhanced Ruff Configuration
+
+**Files:**
+- Modify: `pyproject.toml:12-14`
+
+- [ ] **Step 1: Update ruff config in pyproject.toml**
+
+Replace the ruff section in root `pyproject.toml`:
+
+```toml
+[tool.ruff]
+target-version = "py312"
+line-length = 100
+
+[tool.ruff.lint]
+select = ["E", "W", "F", "I", "B", "UP", "ASYNC", "PERF", "C4", "RUF"]
+ignore = ["E501"]
+
+[tool.ruff.lint.per-file-ignores]
+"tests/*" = ["F841"]
+"*/tests/*" = ["F841"]
+
+[tool.ruff.lint.isort]
+known-first-party = ["shared"]
+```
+
+- [ ] **Step 2: Auto-fix existing violations**
+
+Run: `ruff check --fix . && ruff format .`
+Expected: Fixes applied
+
+- [ ] **Step 3: Verify no remaining errors**
+
+Run: `ruff check . && ruff format --check .`
+Expected: No errors
+
+- [ ] **Step 4: Run tests to verify no regressions**
+
+Run: `pytest -v`
+Expected: All tests PASS
+
+- [ ] **Step 5: Commit**
+
+```bash
+git add pyproject.toml
+git commit -m "chore: enhance ruff lint rules with ASYNC, bugbear, isort, pyupgrade"
+```
+
+Then commit auto-fixes separately:
+
+```bash
+git add -A
+git commit -m "style: auto-fix lint violations from enhanced ruff rules"
+```
+
+---
+
+### Task 17: GitHub Actions CI Pipeline
+
+**Files:**
+- Create: `.github/workflows/ci.yml`
+
+- [ ] **Step 1: Create CI workflow**
+
+Create `.github/workflows/ci.yml`:
+
+```yaml
+name: CI
+
+on:
+ push:
+ branches: [master]
+ pull_request:
+ branches: [master]
+
+jobs:
+ lint:
+ runs-on: ubuntu-latest
+ steps:
+ - uses: actions/checkout@v4
+ - uses: actions/setup-python@v5
+ with:
+ python-version: "3.12"
+ - run: pip install ruff
+ - run: ruff check .
+ - run: ruff format --check .
+
+ test:
+ runs-on: ubuntu-latest
+ services:
+ redis:
+ image: redis:7-alpine
+        ports: ["6379:6379"]
+ options: >-
+ --health-cmd "redis-cli ping"
+ --health-interval 5s
+ --health-timeout 3s
+ --health-retries 5
+ postgres:
+ image: postgres:16-alpine
+ env:
+ POSTGRES_USER: trading
+ POSTGRES_PASSWORD: trading
+ POSTGRES_DB: trading
+        ports: ["5432:5432"]
+ options: >-
+ --health-cmd pg_isready
+ --health-interval 5s
+ --health-timeout 3s
+ --health-retries 5
+ steps:
+ - uses: actions/checkout@v4
+ - uses: actions/setup-python@v5
+ with:
+ python-version: "3.12"
+ - run: |
+ pip install -e shared/[dev]
+ pip install -e services/strategy-engine/[dev]
+ pip install -e services/data-collector/[dev]
+ pip install -e services/order-executor/[dev]
+ pip install -e services/portfolio-manager/[dev]
+ pip install -e services/news-collector/[dev]
+ pip install -e services/api/[dev]
+ pip install -e services/backtester/[dev]
+ pip install pytest-cov
+ - run: pytest -v --cov=shared/src --cov=services --cov-report=xml --cov-report=term-missing
+ env:
+ DATABASE_URL: postgresql+asyncpg://trading:trading@localhost:5432/trading
+ REDIS_URL: redis://localhost:6379
+ - uses: actions/upload-artifact@v4
+ with:
+ name: coverage-report
+ path: coverage.xml
+
+ docker:
+ runs-on: ubuntu-latest
+ needs: [lint, test]
+ if: github.ref == 'refs/heads/master'
+ steps:
+ - uses: actions/checkout@v4
+ - run: docker compose build --quiet
+```
+
+- [ ] **Step 2: Commit**
+
+```bash
+mkdir -p .github/workflows
+git add .github/workflows/ci.yml
+git commit -m "feat: add GitHub Actions CI pipeline with lint, test, docker build"
+```
+
+---
+
+### Task 18: Prometheus Alerting Rules
+
+**Files:**
+- Create: `monitoring/prometheus/alert_rules.yml`
+- Modify: `monitoring/prometheus.yml`
+
+- [ ] **Step 1: Create alert rules**
+
+Create `monitoring/prometheus/alert_rules.yml`:
+
+```yaml
+groups:
+ - name: trading-platform
+ rules:
+ - alert: ServiceDown
+ expr: up == 0
+ for: 1m
+ labels:
+ severity: critical
+ annotations:
+ summary: "Service {{ $labels.job }} is down"
+ description: "{{ $labels.instance }} has been unreachable for 1 minute."
+
+ - alert: HighErrorRate
+ expr: rate(errors_total[5m]) > 10
+ for: 2m
+ labels:
+ severity: warning
+ annotations:
+ summary: "High error rate on {{ $labels.job }}"
+ description: "Error rate is {{ $value }} errors/sec over 5 minutes."
+
+ - alert: HighProcessingLatency
+ expr: histogram_quantile(0.95, rate(processing_seconds_bucket[5m])) > 5
+ for: 5m
+ labels:
+ severity: warning
+ annotations:
+ summary: "High p95 latency on {{ $labels.job }}"
+ description: "95th percentile processing time is {{ $value }}s."
+```
+
+- [ ] **Step 2: Reference alert rules in prometheus.yml**
+
+In `monitoring/prometheus.yml`, add after `global:`:
+
+```yaml
+rule_files:
+ - "/etc/prometheus/alert_rules.yml"
+```
+
+Update `docker-compose.yml` prometheus service to mount the file:
+
+```yaml
+ prometheus:
+ volumes:
+ - ./monitoring/prometheus.yml:/etc/prometheus/prometheus.yml
+ - ./monitoring/prometheus/alert_rules.yml:/etc/prometheus/alert_rules.yml
+```
+
+- [ ] **Step 3: Commit**
+
+```bash
+git add monitoring/prometheus/alert_rules.yml monitoring/prometheus.yml docker-compose.yml
+git commit -m "feat: add Prometheus alerting rules for service health, errors, latency"
+```
+
+---
+
+### Task 19: Code Coverage Configuration
+
+**Files:**
+- Modify: `pyproject.toml`
+
+- [ ] **Step 1: Add pytest-cov config**
+
+Add to `pyproject.toml`:
+
+```toml
+[tool.coverage.run]
+branch = true
+source = ["shared/src", "services"]
+omit = ["*/tests/*", "*/alembic/*"]
+
+[tool.coverage.report]
+fail_under = 60
+show_missing = true
+exclude_lines = [
+ "pragma: no cover",
+ "if __name__",
+ "if TYPE_CHECKING",
+]
+```
+
+Update pytest addopts:
+```toml
+[tool.pytest.ini_options]
+asyncio_mode = "auto"
+testpaths = ["shared/tests", "services", "cli/tests", "tests"]
+addopts = "--import-mode=importlib"
+```
+
+Note: `--cov` flags are passed explicitly in CI, not in addopts (to avoid slowing local dev).
+
+- [ ] **Step 2: Verify coverage works**
+
+Run: `pip install pytest-cov && pytest --cov=shared/src --cov-report=term-missing`
+Expected: Coverage report printed, no errors
+
+- [ ] **Step 3: Commit**
+
+```bash
+git add pyproject.toml
+git commit -m "chore: add pytest-cov configuration with 60% minimum coverage threshold"
+```
+
+---
+
+## Summary
+
+| Phase | Tasks | Estimated Commits |
+|-------|-------|-------------------|
+| 1: Shared Library | Tasks 1-5 | 5 commits |
+| 2: Infrastructure | Tasks 6-9 | 4 commits |
+| 3: Service Fixes | Tasks 10-12 | 3 commits |
+| 4: API Security | Tasks 13-15 | 3 commits |
+| 5: Operations | Tasks 16-19 | 5 commits |
+| **Total** | **19 tasks** | **~20 commits** |
diff --git a/docs/superpowers/specs/2026-04-02-news-driven-stock-selector-design.md b/docs/superpowers/specs/2026-04-02-news-driven-stock-selector-design.md
new file mode 100644
index 0000000..d439154
--- /dev/null
+++ b/docs/superpowers/specs/2026-04-02-news-driven-stock-selector-design.md
@@ -0,0 +1,418 @@
+# News-Driven Stock Selector Design
+
+**Date:** 2026-04-02
+**Goal:** Upgrade the MOC (Market on Close) strategy from fixed symbol lists to dynamic, news-driven stock selection. The system collects news/sentiment data continuously, then selects 2-3 optimal stocks daily before market close.
+
+---
+
+## Architecture Overview
+
+```
+[Continuous Collection] [Pre-Close Decision]
+Finnhub News ─┐
+RSS Feeds ─┤
+SEC EDGAR ─┤
+Truth Social ─┼→ DB (news_items) → Sentiment Aggregator → symbol_scores
+Reddit ─┤ + Redis "news" (every 15 min) market_sentiment
+Fear & Greed ─┤
+FOMC/Fed ─┘
+
+ 15:00 ET ─→ Candidate Pool (sentiment top + LLM picks)
+ 15:15 ET ─→ Technical Filter (RSI, EMA, volume)
+ 15:30 ET ─→ LLM Final Selection (2-3 stocks) → Telegram
+ 15:50 ET ─→ MOC Buy Execution
+ 09:35 ET ─→ Next-day Sell (existing MOC logic)
+```
+
+## 1. News Collector Service
+
+New service: `services/news-collector/`
+
+### Structure
+
+```
+services/news-collector/
+├── Dockerfile
+├── pyproject.toml
+├── src/news_collector/
+│ ├── __init__.py
+│ ├── main.py # Scheduler: runs each collector on its interval
+│ ├── config.py
+│ └── collectors/
+│ ├── __init__.py
+│ ├── base.py # BaseCollector ABC
+│ ├── finnhub.py # Finnhub market news (free, 60 req/min)
+│ ├── rss.py # Yahoo Finance, Google News, MarketWatch RSS
+│ ├── sec_edgar.py # SEC EDGAR 8-K/10-Q filings
+│ ├── truth_social.py # Truth Social scraping (Trump posts)
+│ ├── reddit.py # Reddit (r/wallstreetbets, r/stocks)
+│ ├── fear_greed.py # CNN Fear & Greed Index scraping
+│ └── fed.py # FOMC statements, Fed announcements
+└── tests/
+```
+
+### BaseCollector Interface
+
+```python
+from abc import ABC, abstractmethod
+
+from shared.models import NewsItem
+
+
+class BaseCollector(ABC):
+ name: str
+ poll_interval: int # seconds
+
+ @abstractmethod
+ async def collect(self) -> list[NewsItem]:
+ """Collect and return list of NewsItem."""
+
+ @abstractmethod
+ async def is_available(self) -> bool:
+ """Check if this source is accessible (API key present, endpoint reachable)."""
+```
+
+### Poll Intervals
+
+| Collector | Interval | Notes |
+|-----------|----------|-------|
+| Finnhub | 5 min | Free tier: 60 calls/min |
+| RSS (Yahoo/Google/MarketWatch) | 10 min | Headlines only |
+| SEC EDGAR | 30 min | Focus on 8-K filings |
+| Truth Social | 15 min | Scraping |
+| Reddit | 15 min | Hot posts from relevant subs |
+| Fear & Greed | 1 hour | Updates once daily but check periodically |
+| FOMC/Fed | 1 hour | Infrequent events |
+
+### Provider Abstraction (for paid upgrade path)
+
+```yaml
+# config.yaml
+collectors:
+  news:
+    provider: "finnhub"        # swap to "benzinga" for paid
+    api_key: ${FINNHUB_API_KEY}
+  social:
+    provider: "reddit"         # swap to "stocktwits_pro" etc.
+  policy:
+    provider: "truth_social"   # swap to "twitter_api" etc.
+```
+
+```python
+# Factory
+COLLECTOR_REGISTRY = {
+    "finnhub": FinnhubCollector,
+    "rss": RSSCollector,
+    "benzinga": BenzingaCollector,  # added later
+}
+```
+
+## 2. Shared Models (additions to shared/)
+
+### NewsItem (shared/models.py)
+
+```python
+import uuid
+from datetime import datetime, timezone
+from enum import Enum
+
+from pydantic import BaseModel, Field
+
+
+class NewsCategory(str, Enum):
+    POLICY = "policy"
+    EARNINGS = "earnings"
+    MACRO = "macro"
+    SOCIAL = "social"
+    FILING = "filing"
+    FED = "fed"
+
+
+class NewsItem(BaseModel):
+    id: str = Field(default_factory=lambda: str(uuid.uuid4()))
+    source: str                # "finnhub", "rss", "sec_edgar", etc.
+    headline: str
+    summary: str | None = None
+    url: str | None = None
+    published_at: datetime
+    symbols: list[str] = []    # Related tickers (if identifiable)
+    sentiment: float           # -1.0 to 1.0 (first-pass analysis at collection)
+    category: NewsCategory
+    raw_data: dict = {}
+    created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
+```
+
+### SymbolScore (shared/sentiment_models.py — new file)
+
+```python
+from datetime import datetime
+
+from pydantic import BaseModel
+
+from shared.models import OrderSide
+
+
+class SymbolScore(BaseModel):
+    symbol: str
+    news_score: float    # -1.0 to 1.0, weighted avg of news sentiment
+    news_count: int      # Number of news items in last 24h
+    social_score: float  # Reddit/social sentiment
+    policy_score: float  # Policy-related impact
+    filing_score: float  # SEC filing impact
+    composite: float     # Weighted final score
+    updated_at: datetime
+
+
+class MarketSentiment(BaseModel):
+    fear_greed: int        # 0-100
+    fear_greed_label: str  # "Extreme Fear", "Fear", "Neutral", "Greed", "Extreme Greed"
+    vix: float | None = None
+    fed_stance: str        # "hawkish", "neutral", "dovish"
+    market_regime: str     # "risk_on", "neutral", "risk_off"
+    updated_at: datetime
+
+
+class SelectedStock(BaseModel):
+    symbol: str
+    side: OrderSide      # BUY or SELL
+    conviction: float    # 0.0 to 1.0
+    reason: str          # Selection rationale
+    key_news: list[str]  # Key news headlines
+
+
+class Candidate(BaseModel):
+    symbol: str
+    source: str                         # "sentiment" or "llm"
+    direction: OrderSide | None = None  # Suggested direction (if known)
+    score: float                        # Relevance/priority score
+    reason: str                         # Why this candidate was selected
+```
+
+## 3. Sentiment Analysis Pipeline
+
+### Location
+
+Refactor existing `shared/src/shared/sentiment.py`.
+
+### Two-Stage Analysis
+
+**Stage 1: Per-news sentiment (at collection time)**
+- VADER (nltk.sentiment, free) for English headlines
+- Keyword rule engine for domain-specific terms (e.g., "tariff" → negative for importers, positive for domestic producers)
+- Score stored in `NewsItem.sentiment`
+
+**Stage 2: Per-symbol aggregation (every 15 minutes)**
+
+```
+composite = (
+    news_score   * 0.3 +
+    social_score * 0.2 +
+    policy_score * 0.3 +
+    filing_score * 0.2
+) * freshness_decay
+```
+
+Freshness decay:
+- < 1 hour: 1.0
+- 1-6 hours: 0.7
+- 6-24 hours: 0.3
+- > 24 hours: excluded
+
+The policy score is weighted heavily because the US stock market is strongly influenced by policy events (tariffs, regulation, subsidies).
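The decay tiers and weighted blend above can be sketched as follows; the weight keys and function names are illustrative, not the aggregator's actual API:

```python
from datetime import datetime, timedelta, timezone

# Weights from the aggregation formula above (sum to 1.0).
WEIGHTS = {"news": 0.3, "social": 0.2, "policy": 0.3, "filing": 0.2}


def freshness_decay(published_at: datetime, now: datetime) -> float:
    """Map news age to the decay tiers above; items older than 24h are excluded (0.0)."""
    age = now - published_at
    if age < timedelta(hours=1):
        return 1.0
    if age < timedelta(hours=6):
        return 0.7
    if age < timedelta(hours=24):
        return 0.3
    return 0.0


def composite_score(scores: dict[str, float], published_at: datetime, now: datetime) -> float:
    """Weighted blend of component scores, scaled by freshness."""
    weighted = sum(scores[k] * w for k, w in WEIGHTS.items())
    return weighted * freshness_decay(published_at, now)
```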
+
+### Market-Level Gating
+
+`MarketSentiment.market_regime` determination:
+- `risk_off`: Fear & Greed < 20 OR VIX > 30 → **block all trades**
+- `risk_on`: Fear & Greed > 60 AND VIX < 20
+- `neutral`: everything else
+
+This extends the existing `sentiment.py` `should_block()` logic.
+
+## 4. Stock Selector Engine
+
+### Location
+
+`services/strategy-engine/src/strategy_engine/stock_selector.py`
+
+### Three-Stage Selection Process
+
+**Stage 1: Candidate Pool (15:00 ET)**
+
+Two candidate sources, results merged (deduplicated):
+
+```python
+from abc import ABC, abstractmethod
+
+
+class CandidateSource(ABC):
+    @abstractmethod
+    async def get_candidates(self) -> list[Candidate]:
+        """Return prioritized candidates for today's session."""
+
+
+class SentimentCandidateSource(CandidateSource):
+    """Top N symbols by composite SymbolScore from DB."""
+
+
+class LLMCandidateSource(CandidateSource):
+    """Send today's top news summary to Claude, get related symbols + direction."""
+```
+
+- SentimentCandidateSource: top 20 by composite score
+- LLMCandidateSource: Claude analyzes today's major news and recommends affected symbols
+- Merged pool: typically 20-30 candidates
+
+**Stage 2: Technical Filter (15:15 ET)**
+
+Apply existing MOC screening criteria to candidates:
+- Fetch recent price data from Alpaca for all candidates
+- RSI 30-60
+- Price > 20-period EMA
+- Volume > average
+- Bullish candle pattern
+- Result: typically 5-10 survivors
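The criteria above can be sketched as a predicate over precomputed indicators; the `Snapshot` container and its field names are hypothetical stand-ins for whatever indicator data the strategy engine already computes:

```python
from dataclasses import dataclass


@dataclass
class Snapshot:
    """Illustrative per-symbol indicator snapshot (not the actual engine API)."""
    symbol: str
    rsi: float
    price: float
    ema20: float
    volume: float
    avg_volume: float
    bullish_candle: bool


def passes_technical_filter(s: Snapshot) -> bool:
    """Stage 2 criteria: RSI 30-60, price above EMA20, above-average volume, bullish candle."""
    return (
        30 <= s.rsi <= 60
        and s.price > s.ema20
        and s.volume > s.avg_volume
        and s.bullish_candle
    )
```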
+
+**Stage 3: LLM Final Selection (15:30 ET)**
+
+Send to Claude:
+- Filtered candidate list with technical indicators
+- Per-symbol sentiment scores and top news headlines
+- Market sentiment (Fear & Greed, VIX, Fed stance)
+- Prompt: "Select 2-3 stocks for MOC trading with rationale"
+
+Response parsed into `list[SelectedStock]`.
+
+### Integration with MOC Strategy
+
+Current: MOC strategy receives candles for fixed symbols and decides internally.
+
+New flow:
+1. `StockSelector` publishes `SelectedStock` list to Redis stream `selected_stocks` at 15:30 ET
+2. MOC strategy reads `selected_stocks` to get today's targets
+3. MOC still applies its own technical checks at 15:50-16:00 as a safety net
+4. If a selected stock fails the final technical check, it's skipped (no forced trades)
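Since Redis stream entries are flat string maps, the `selected_stocks` payload would likely be JSON-encoded before publishing. A sketch (the field names and version tag are assumptions; the actual publish would go through the shared broker helper):

```python
import json


def build_selection_payload(selections: list[dict]) -> dict[str, str]:
    """Serialize SelectedStock dicts into a flat string map for XADD."""
    return {
        "schema": "selected_stocks.v1",  # illustrative version tag
        "payload": json.dumps(selections),
    }
```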
+
+## 5. Database Schema
+
+Four new tables via Alembic migration:
+
+```sql
+CREATE TABLE news_items (
+    id UUID PRIMARY KEY,
+    source VARCHAR(50) NOT NULL,
+    headline TEXT NOT NULL,
+    summary TEXT,
+    url TEXT,
+    published_at TIMESTAMPTZ NOT NULL,
+    symbols TEXT[],
+    sentiment FLOAT NOT NULL,
+    category VARCHAR(50) NOT NULL,
+    raw_data JSONB DEFAULT '{}',
+    created_at TIMESTAMPTZ DEFAULT NOW()
+);
+CREATE INDEX idx_news_items_published ON news_items(published_at);
+CREATE INDEX idx_news_items_symbols ON news_items USING GIN(symbols);
+
+CREATE TABLE symbol_scores (
+    id UUID PRIMARY KEY,
+    symbol VARCHAR(10) NOT NULL,
+    news_score FLOAT NOT NULL DEFAULT 0,
+    news_count INT NOT NULL DEFAULT 0,
+    social_score FLOAT NOT NULL DEFAULT 0,
+    policy_score FLOAT NOT NULL DEFAULT 0,
+    filing_score FLOAT NOT NULL DEFAULT 0,
+    composite FLOAT NOT NULL DEFAULT 0,
+    updated_at TIMESTAMPTZ NOT NULL
+);
+CREATE UNIQUE INDEX idx_symbol_scores_symbol ON symbol_scores(symbol);
+
+CREATE TABLE market_sentiment (
+    id UUID PRIMARY KEY,
+    fear_greed INT NOT NULL,
+    fear_greed_label VARCHAR(30) NOT NULL,
+    vix FLOAT,
+    fed_stance VARCHAR(20) NOT NULL DEFAULT 'neutral',
+    market_regime VARCHAR(20) NOT NULL DEFAULT 'neutral',
+    updated_at TIMESTAMPTZ NOT NULL
+);
+
+CREATE TABLE stock_selections (
+    id UUID PRIMARY KEY,
+    trade_date DATE NOT NULL,
+    symbol VARCHAR(10) NOT NULL,
+    side VARCHAR(4) NOT NULL,
+    conviction FLOAT NOT NULL,
+    reason TEXT NOT NULL,
+    key_news JSONB DEFAULT '[]',
+    sentiment_snapshot JSONB DEFAULT '{}',
+    created_at TIMESTAMPTZ DEFAULT NOW()
+);
+CREATE INDEX idx_stock_selections_date ON stock_selections(trade_date);
+```
+
+`stock_selections` stores an audit trail: why each stock was selected, enabling post-hoc analysis of selection quality.
+
+## 6. Redis Streams
+
+| Stream | Producer | Consumer | Payload |
+|--------|----------|----------|---------|
+| `news` | news-collector | strategy-engine (sentiment aggregator) | NewsItem |
+| `selected_stocks` | stock-selector | MOC strategy | SelectedStock |
+
+Existing streams (`candles`, `signals`, `orders`) unchanged.
+
+## 7. Docker Compose Addition
+
+```yaml
+news-collector:
+  build:
+    context: .
+    dockerfile: services/news-collector/Dockerfile
+  env_file: .env
+  ports:
+    - "8084:8084"
+  depends_on:
+    redis: { condition: service_healthy }
+    postgres: { condition: service_healthy }
+  healthcheck:
+    test: ["CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:8084/health')"]
+    interval: 10s
+    timeout: 5s
+    retries: 3
+  restart: unless-stopped
+```
+
+## 8. Environment Variables
+
+```bash
+# News Collector
+FINNHUB_API_KEY= # Free key from finnhub.io
+NEWS_POLL_INTERVAL=300 # Default 5 min (overrides per-collector defaults)
+SENTIMENT_AGGREGATE_INTERVAL=900 # 15 min
+
+# Stock Selector
+SELECTOR_CANDIDATES_TIME=15:00 # ET, candidate pool generation
+SELECTOR_FILTER_TIME=15:15 # ET, technical filter
+SELECTOR_FINAL_TIME=15:30 # ET, LLM final pick
+SELECTOR_MAX_PICKS=3
+
+# LLM (for stock selector + screener)
+ANTHROPIC_API_KEY=
+ANTHROPIC_MODEL=claude-sonnet-4-20250514
+```
+
+## 9. Telegram Notifications
+
+Extend existing `shared/notifier.py` with:
+
+```python
+async def send_stock_selection(self, selections: list[SelectedStock], market: MarketSentiment):
+    """
+    📊 Today's Stock Selection (2/3)
+
+    1. NVDA 🟢 BUY (conviction: 0.85)
+       Rationale: Trump announces expanded semiconductor subsidies, RSI 42
+       Key news: "Trump signs CHIPS Act expansion..."
+
+    2. XOM 🟢 BUY (conviction: 0.72)
+       Rationale: Oil price rally + earnings surprise, volume spike
+
+    Market sentiment: Fear & Greed 55 (Neutral) | VIX 18.2
+    """
+```
+
+## 10. Testing Strategy
+
+**Unit tests:**
+- Each collector: mock HTTP responses → verify NewsItem parsing
+- Sentiment analysis: verify VADER + keyword scoring
+- Aggregator: mock news data → verify SymbolScore calculation and freshness decay
+- Stock selector: mock scores → verify candidate/filter/selection pipeline
+- LLM calls: mock Claude response → verify SelectedStock parsing
+
+**Integration tests:**
+- Full pipeline: news collection → DB → aggregation → selection
+- Market gating: verify `risk_off` blocks all trades
+- MOC integration: verify selected stocks flow to MOC strategy
+
+**Post-hoc analysis (future):**
+- Use `stock_selections` audit trail to measure selection accuracy
+- Historical news data replay for backtesting requires paid data (deferred)
+
+## 11. Out of Scope (Future)
+
+- Paid API integration (designed for, not implemented)
+- Historical news backtesting
+- WebSocket real-time news streaming
+- Multi-language sentiment analysis
+- Options/derivatives signals
diff --git a/docs/superpowers/specs/2026-04-02-platform-upgrade-design.md b/docs/superpowers/specs/2026-04-02-platform-upgrade-design.md
new file mode 100644
index 0000000..9c84e10
--- /dev/null
+++ b/docs/superpowers/specs/2026-04-02-platform-upgrade-design.md
@@ -0,0 +1,257 @@
+# Platform Upgrade Design Spec
+
+**Date**: 2026-04-02
+**Approach**: Bottom-Up (shared library → infra → services → API security → operations)
+
+---
+
+## Phase 1: Shared Library Hardening
+
+### 1-1. Resilience Module (`shared/src/shared/resilience.py`)
+Currently empty. Implement:
+- **`retry_async()`** — tenacity-based exponential backoff + jitter decorator. Configurable max retries (default 3), base delay (1s), max delay (30s).
+- **`CircuitBreaker`** — Tracks consecutive failures. Opens after N failures (default 5), stays open for configurable cooldown (default 60s), transitions to half-open to test recovery.
+- **`timeout()`** — asyncio-based timeout wrapper. Raises `TimeoutError` after configurable duration.
+- All decorators composable: `@retry_async()` can be stacked on `@circuit_breaker()` over the same `async def call_api()`.
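A dependency-free sketch of the retry semantics (the plan uses tenacity; this only illustrates the exponential backoff + jitter behavior and the transient exception classes it would retry):

```python
import asyncio
import random
from functools import wraps


def retry_async(max_retries: int = 3, base_delay: float = 1.0, max_delay: float = 30.0):
    """Retry transient failures with exponential backoff + jitter."""
    def decorator(fn):
        @wraps(fn)
        async def wrapper(*args, **kwargs):
            for attempt in range(max_retries + 1):
                try:
                    return await fn(*args, **kwargs)
                except (ConnectionError, TimeoutError):
                    if attempt == max_retries:
                        raise
                    delay = min(base_delay * 2**attempt, max_delay)
                    await asyncio.sleep(delay * random.uniform(0.5, 1.0))  # jitter
        return wrapper
    return decorator
```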
+
+### 1-2. DB Connection Pooling (`shared/src/shared/db.py`)
+Add to `create_async_engine()`:
+- `pool_size=20` (configurable via `DB_POOL_SIZE`)
+- `max_overflow=10` (configurable via `DB_MAX_OVERFLOW`)
+- `pool_pre_ping=True` (verify connections before use)
+- `pool_recycle=3600` (recycle stale connections)
+Add corresponding fields to `Settings`.
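A sketch of how the pooling settings might be assembled before being passed to SQLAlchemy's `create_async_engine()` (env-var names from above; the helper name is illustrative):

```python
import os


def engine_pool_kwargs() -> dict:
    """Pooling kwargs for create_async_engine(), with the defaults listed above."""
    return {
        "pool_size": int(os.getenv("DB_POOL_SIZE", "20")),
        "max_overflow": int(os.getenv("DB_MAX_OVERFLOW", "10")),
        "pool_pre_ping": True,   # verify connections before use
        "pool_recycle": 3600,    # recycle stale connections (seconds)
    }
```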
+
+### 1-3. Redis Resilience (`shared/src/shared/broker.py`)
+- Add to `redis.asyncio.from_url()`: `socket_keepalive=True`, `health_check_interval=30`, `retry_on_timeout=True`
+- Wrap `publish()`, `read_group()`, `ensure_group()` with `@retry_async()` from resilience module
+- Add `reconnect()` method for connection loss recovery
+
+### 1-4. Config Validation (`shared/src/shared/config.py`)
+- Add `field_validator` for business logic: `risk_max_position_size > 0`, `health_port` in 1024-65535, `log_level` in valid set
+- Change secret fields to `SecretStr`: `alpaca_api_key`, `alpaca_api_secret`, `database_url`, `redis_url`, `telegram_bot_token`, `anthropic_api_key`, `finnhub_api_key`
+- Update all consumers to call `.get_secret_value()` where needed
+
+### 1-5. Dependency Pinning
+All `pyproject.toml` files: add upper bounds.
+Examples:
+- `pydantic>=2.8,<3`
+- `redis>=5.0,<6`
+- `sqlalchemy[asyncio]>=2.0,<3`
+- `numpy>=1.26,<3`
+- `pandas>=2.1,<3`
+- `anthropic>=0.40,<1`
+Run `uv lock` to generate lock file.
+
+---
+
+## Phase 2: Infrastructure Hardening
+
+### 2-1. Docker Secrets & Environment
+- Remove hardcoded `POSTGRES_USER: trading` / `POSTGRES_PASSWORD: trading` from `docker-compose.yml`
+- Reference via `${POSTGRES_USER}` / `${POSTGRES_PASSWORD}` from `.env`
+- Add comments in `.env.example` marking secret vs config variables
+
+### 2-2. Dockerfile Optimization (all 7 services)
+Pattern for each Dockerfile:
+```dockerfile
+# Stage 1: builder
+FROM python:3.12-slim AS builder
+WORKDIR /app
+COPY shared/pyproject.toml shared/setup.cfg shared/
+COPY shared/src/ shared/src/
+RUN pip install --no-cache-dir ./shared
+COPY services/<name>/pyproject.toml services/<name>/
+COPY services/<name>/src/ services/<name>/src/
+RUN pip install --no-cache-dir ./services/<name>
+
+# Stage 2: runtime
+FROM python:3.12-slim
+RUN useradd -r -s /bin/false appuser
+WORKDIR /app
+COPY --from=builder /usr/local/lib/python3.12/site-packages /usr/local/lib/python3.12/site-packages
+COPY --from=builder /usr/local/bin /usr/local/bin
+USER appuser
+CMD ["python", "-m", "<module>.main"]
+```
+
+Create root `.dockerignore`:
+```
+__pycache__
+*.pyc
+.git
+.venv
+.env
+tests/
+docs/
+*.md
+.ruff_cache
+```
+
+### 2-3. Database Index Migration (`003_add_missing_indexes.py`)
+New Alembic migration adding:
+- `idx_signals_symbol_created` on `signals(symbol, created_at)`
+- `idx_orders_symbol_status_created` on `orders(symbol, status, created_at)`
+- `idx_trades_order_id` on `trades(order_id)`
+- `idx_trades_symbol_traded` on `trades(symbol, traded_at)`
+- `idx_portfolio_snapshots_at` on `portfolio_snapshots(snapshot_at)`
+- `idx_symbol_scores_symbol` unique on `symbol_scores(symbol)`
+
+### 2-4. Docker Compose Resource Limits
+Add to each service:
+```yaml
+deploy:
+  resources:
+    limits:
+      memory: 512M
+      cpus: '1.0'
+```
+Strategy-engine and backtester get `memory: 1G` (pandas/numpy usage).
+
+Add explicit networks:
+```yaml
+networks:
+  internal:
+    driver: bridge
+  monitoring:
+    driver: bridge
+```
+
+---
+
+## Phase 3: Service-Level Improvements
+
+### 3-1. Graceful Shutdown (all services)
+Add to each service's `main()`:
+```python
+# Inside the service's async main(); assumes `asyncio` and `signal` are imported.
+shutdown_event = asyncio.Event()
+
+def _signal_handler():
+    log.info("shutdown_signal_received")
+    shutdown_event.set()
+
+loop = asyncio.get_running_loop()
+loop.add_signal_handler(signal.SIGTERM, _signal_handler)
+loop.add_signal_handler(signal.SIGINT, _signal_handler)
+```
+Main loops check `shutdown_event.is_set()` to exit gracefully.
+API service: add `--timeout-graceful-shutdown 30` to uvicorn CMD.
+
+### 3-2. Exception Specialization (all services)
+Replace broad `except Exception` with layered handling:
+- `ConnectionError`, `TimeoutError` → retry via resilience module
+- `ValueError`, `KeyError` → log warning, skip item, continue
+- `Exception` → top-level only, `exc_info=True` for full traceback + Telegram alert
+
+Target: reduce 63 broad catches to ~10 top-level safety nets.
+
+### 3-3. LLM Parsing Deduplication (`stock_selector.py`)
+Extract `_extract_json_from_text(text: str) -> list | dict | None`:
+- Tries ```` ```json ``` ```` code block extraction
+- Falls back to `re.search(r"\[.*\]", text, re.DOTALL)`
+- Falls back to raw `json.loads(text.strip())`
+Replace 3 duplicate parsing blocks with single call.
+
+### 3-4. aiohttp Session Reuse (`stock_selector.py`)
+- Add `_session: aiohttp.ClientSession | None = None` to `StockSelector`
+- Lazy-init in `_ensure_session()`, close in `close()`
+- Replace all `async with aiohttp.ClientSession()` with `self._session`
+
+---
+
+## Phase 4: API Security
+
+### 4-1. Bearer Token Authentication
+- Add `api_auth_token: SecretStr = ""` to `Settings`
+- Create `dependencies/auth.py` with `verify_token()` dependency
+- Apply to all `/api/v1/*` routes via router-level `dependencies=[Depends(verify_token)]`
+- If token is empty string → skip auth (dev mode), log warning on startup
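The token check at the core of `verify_token()` might look like this; the FastAPI wiring (`Depends`, `HTTPBearer`, 401 response) is omitted and the helper name is illustrative:

```python
import hmac


def token_is_valid(presented: str, configured: str) -> bool:
    """Constant-time token comparison; empty configured token means dev mode (auth off)."""
    if not configured:
        return True
    return hmac.compare_digest(presented.encode(), configured.encode())
```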
+
+### 4-2. CORS Configuration
+```python
+from fastapi.middleware.cors import CORSMiddleware
+
+app.add_middleware(
+    CORSMiddleware,
+    allow_origins=settings.cors_origins.split(","),  # default: "http://localhost:3000"
+    allow_methods=["GET", "POST"],
+    allow_headers=["Authorization", "Content-Type"],
+)
+```
+Add `cors_origins: str = "http://localhost:3000"` to Settings.
+
+### 4-3. Rate Limiting
+- Add `slowapi` dependency
+- Global default: 60 req/min per IP
+- Order-related endpoints: 10 req/min per IP
+- Return `429 Too Many Requests` with `Retry-After` header
+
+### 4-4. Input Validation
+- All `limit` params: `Query(default=50, ge=1, le=1000)`
+- All `days` params: `Query(default=30, ge=1, le=365)`
+- Add Pydantic `response_model` to all endpoints (enables auto OpenAPI docs)
+- Add `symbol` param validation: uppercase, 1-5 chars, alphanumeric
+
+---
+
+## Phase 5: Operational Maturity
+
+### 5-1. GitHub Actions CI/CD
+File: `.github/workflows/ci.yml`
+
+**PR trigger** (`pull_request`):
+1. Install deps (`uv sync`)
+2. Ruff lint + format check
+3. pytest with coverage (`--cov --cov-report=xml`)
+4. Upload coverage to PR comment
+
+**Main push** (`push: branches: [master]`):
+1. Same lint + test
+2. `docker compose build`
+3. (Future: push to registry)
+
+### 5-2. Ruff Rules Enhancement
+```toml
+[tool.ruff.lint]
+select = ["E", "W", "F", "I", "B", "UP", "ASYNC", "PERF", "C4", "RUF"]
+ignore = ["E501"]
+
+[tool.ruff.lint.per-file-ignores]
+"tests/*" = ["F841"]
+
+[tool.ruff.lint.isort]
+known-first-party = ["shared"]
+```
+Run `ruff check --fix .` and `ruff format .` to fix existing violations; commit those fixes separately.
+
+### 5-3. Prometheus Alerting
+File: `monitoring/prometheus/alert_rules.yml`
+Rules:
+- `ServiceDown`: `service_up == 0` for 1 min → critical
+- `HighErrorRate`: `rate(errors_total[5m]) > 10` → warning
+- `HighLatency`: `histogram_quantile(0.95, processing_seconds) > 5` → warning
+
+Add Alertmanager config with Telegram webhook (reuse existing bot token).
+Reference alert rules in `monitoring/prometheus.yml`.
+
+### 5-4. Code Coverage
+Add to root `pyproject.toml`:
+```toml
+[tool.pytest.ini_options]
+addopts = "--cov=shared/src --cov=services --cov-report=term-missing"
+
+[tool.coverage.run]
+branch = true
+omit = ["tests/*", "*/alembic/*"]
+
+[tool.coverage.report]
+fail_under = 70
+```
+Add `pytest-cov` to dev dependencies.
+
+---
+
+## Out of Scope
+- Kubernetes/Helm charts (premature — Docker Compose sufficient for current scale)
+- External secrets manager (Vault, AWS SM — overkill for single-machine deployment)
+- OpenTelemetry distributed tracing (add when debugging cross-service issues)
+- API versioning beyond `/api/v1/` prefix
+- Data retention/partitioning (address when data volume becomes an issue)