# News-Driven Stock Selector Implementation Plan
> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.
**Goal:** Replace the MOC strategy's fixed symbol list with a dynamic, news-driven stock selection system that continuously collects news/sentiment data and selects 2-3 optimal stocks daily before market close.
**Architecture:** A new `news-collector` service runs seven data-source collectors, each on its own poll interval, storing `NewsItem` records in PostgreSQL and publishing them to Redis. A sentiment aggregator computes per-symbol composite scores every 15 minutes. Before market close, a three-stage stock selector (sentiment candidates → technical filter → LLM final pick) chooses 2-3 stocks and feeds them to the existing MOC strategy.
**Tech Stack:** Python 3.12+, asyncio, aiohttp, Pydantic, SQLAlchemy 2.0 async, Redis Streams, VADER (nltk), feedparser (RSS), Anthropic SDK (Claude API), Alembic
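The aggregator's per-symbol composite can be sketched as a weighted blend of the channel scores that `SymbolScore` (Task 1) carries. The weights below are illustrative assumptions, not values specified by this plan:

```python
def composite_score(
    news: float,
    social: float,
    policy: float,
    filing: float,
    weights: tuple[float, float, float, float] = (0.4, 0.3, 0.2, 0.1),  # assumed weights
) -> float:
    """Blend per-channel scores (each in [-1, 1]) into a single composite."""
    w_news, w_social, w_policy, w_filing = weights
    raw = w_news * news + w_social * social + w_policy * policy + w_filing * filing
    # Clamp so the composite stays in the same [-1, 1] range as its inputs.
    return max(-1.0, min(1.0, raw))

print(composite_score(0.8, 0.5, 0.0, 0.2))  # strongly positive news, mild social buzz
```

Whatever weighting the aggregator ends up using, keeping the composite in the same range as the channel scores makes the selector's thresholds easier to reason about.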
---
## Phase 1: Shared Foundation (Models, DB, Events)
### Task 1: Add NewsItem and sentiment models to shared
**Files:**
- Modify: `shared/src/shared/models.py`
- Create: `shared/src/shared/sentiment_models.py`
- Create: `shared/tests/test_sentiment_models.py`
- [ ] **Step 1: Write tests for new models**
Create `shared/tests/test_sentiment_models.py`:
```python
"""Tests for news and sentiment models."""
import pytest
from datetime import datetime, timezone
from shared.models import NewsCategory, NewsItem, OrderSide
from shared.sentiment_models import SymbolScore, MarketSentiment, SelectedStock, Candidate
def test_news_item_defaults():
item = NewsItem(
source="finnhub",
headline="Test headline",
published_at=datetime(2026, 4, 2, tzinfo=timezone.utc),
sentiment=0.5,
category=NewsCategory.MACRO,
)
assert item.id # UUID generated
assert item.symbols == []
assert item.summary is None
assert item.raw_data == {}
assert item.created_at is not None
def test_news_item_with_symbols():
item = NewsItem(
source="rss",
headline="AAPL earnings beat",
published_at=datetime(2026, 4, 2, tzinfo=timezone.utc),
sentiment=0.8,
category=NewsCategory.EARNINGS,
symbols=["AAPL"],
)
assert item.symbols == ["AAPL"]
assert item.category == NewsCategory.EARNINGS
def test_news_category_values():
assert NewsCategory.POLICY == "policy"
assert NewsCategory.EARNINGS == "earnings"
assert NewsCategory.MACRO == "macro"
assert NewsCategory.SOCIAL == "social"
assert NewsCategory.FILING == "filing"
assert NewsCategory.FED == "fed"
def test_symbol_score():
score = SymbolScore(
symbol="AAPL",
news_score=0.5,
news_count=10,
social_score=0.3,
policy_score=0.0,
filing_score=0.2,
composite=0.3,
updated_at=datetime(2026, 4, 2, tzinfo=timezone.utc),
)
assert score.symbol == "AAPL"
assert score.composite == 0.3
def test_market_sentiment():
ms = MarketSentiment(
fear_greed=25,
fear_greed_label="Extreme Fear",
vix=32.5,
fed_stance="hawkish",
market_regime="risk_off",
updated_at=datetime(2026, 4, 2, tzinfo=timezone.utc),
)
assert ms.market_regime == "risk_off"
assert ms.vix == 32.5
def test_market_sentiment_no_vix():
ms = MarketSentiment(
fear_greed=50,
fear_greed_label="Neutral",
fed_stance="neutral",
market_regime="neutral",
updated_at=datetime(2026, 4, 2, tzinfo=timezone.utc),
)
assert ms.vix is None
def test_selected_stock():
ss = SelectedStock(
symbol="NVDA",
side=OrderSide.BUY,
conviction=0.85,
reason="CHIPS Act expansion",
key_news=["Trump signs CHIPS Act expansion"],
)
assert ss.conviction == 0.85
assert len(ss.key_news) == 1
def test_candidate():
c = Candidate(
symbol="TSLA",
source="sentiment",
direction=OrderSide.BUY,
score=0.75,
reason="High social buzz",
)
assert c.direction == OrderSide.BUY
c2 = Candidate(
symbol="XOM",
source="llm",
score=0.6,
reason="Oil price surge",
)
assert c2.direction is None
```
- [ ] **Step 2: Run tests to verify they fail**
Run: `pytest shared/tests/test_sentiment_models.py -v`
Expected: FAIL — `NewsCategory`, `NewsItem` not found in `shared.models`, `shared.sentiment_models` does not exist
- [ ] **Step 3: Add NewsCategory and NewsItem to shared/models.py**
Add to the end of `shared/src/shared/models.py` (this block assumes the module already imports `uuid`, `Enum`, `Optional`, `Field`, `BaseModel`, `datetime`, and `timezone`; add any that are missing):
```python
class NewsCategory(str, Enum):
POLICY = "policy"
EARNINGS = "earnings"
MACRO = "macro"
SOCIAL = "social"
FILING = "filing"
FED = "fed"
class NewsItem(BaseModel):
id: str = Field(default_factory=lambda: str(uuid.uuid4()))
source: str
headline: str
summary: Optional[str] = None
url: Optional[str] = None
published_at: datetime
symbols: list[str] = []
sentiment: float
category: NewsCategory
raw_data: dict = {}
created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
```
- [ ] **Step 4: Create shared/src/shared/sentiment_models.py**
```python
"""Sentiment scoring and stock selection models."""
from datetime import datetime
from typing import Optional
from pydantic import BaseModel
from shared.models import OrderSide
class SymbolScore(BaseModel):
symbol: str
news_score: float
news_count: int
social_score: float
policy_score: float
filing_score: float
composite: float
updated_at: datetime
class MarketSentiment(BaseModel):
fear_greed: int
fear_greed_label: str
vix: Optional[float] = None
fed_stance: str
market_regime: str
updated_at: datetime
class SelectedStock(BaseModel):
symbol: str
side: OrderSide
conviction: float
reason: str
key_news: list[str]
class Candidate(BaseModel):
symbol: str
source: str
direction: Optional[OrderSide] = None
score: float
reason: str
```
- [ ] **Step 5: Run tests to verify they pass**
Run: `pytest shared/tests/test_sentiment_models.py -v`
Expected: All 9 tests PASS
- [ ] **Step 6: Commit**
```bash
git add shared/src/shared/models.py shared/src/shared/sentiment_models.py shared/tests/test_sentiment_models.py
git commit -m "feat: add NewsItem, sentiment scoring, and stock selection models"
```
---
### Task 2: Add SQLAlchemy ORM models for news tables
**Files:**
- Modify: `shared/src/shared/sa_models.py`
- Create: `shared/tests/test_sa_news_models.py`
- [ ] **Step 1: Write tests for new SA models**
Create `shared/tests/test_sa_news_models.py`:
```python
"""Tests for news-related SQLAlchemy models."""
from shared.sa_models import NewsItemRow, SymbolScoreRow, MarketSentimentRow, StockSelectionRow
def test_news_item_row_tablename():
assert NewsItemRow.__tablename__ == "news_items"
def test_symbol_score_row_tablename():
assert SymbolScoreRow.__tablename__ == "symbol_scores"
def test_market_sentiment_row_tablename():
assert MarketSentimentRow.__tablename__ == "market_sentiment"
def test_stock_selection_row_tablename():
assert StockSelectionRow.__tablename__ == "stock_selections"
def test_news_item_row_columns():
cols = {c.name for c in NewsItemRow.__table__.columns}
assert cols >= {"id", "source", "headline", "published_at", "sentiment", "category"}
def test_symbol_score_row_columns():
cols = {c.name for c in SymbolScoreRow.__table__.columns}
assert cols >= {"id", "symbol", "news_score", "composite", "updated_at"}
```
- [ ] **Step 2: Run tests to verify they fail**
Run: `pytest shared/tests/test_sa_news_models.py -v`
Expected: FAIL — import errors
- [ ] **Step 3: Add ORM models to sa_models.py**
Add to the end of `shared/src/shared/sa_models.py`:
```python
class NewsItemRow(Base):
__tablename__ = "news_items"
id: Mapped[str] = mapped_column(Text, primary_key=True)
source: Mapped[str] = mapped_column(Text, nullable=False)
headline: Mapped[str] = mapped_column(Text, nullable=False)
summary: Mapped[str | None] = mapped_column(Text)
url: Mapped[str | None] = mapped_column(Text)
published_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False)
symbols: Mapped[str | None] = mapped_column(Text) # JSON-encoded list
sentiment: Mapped[float] = mapped_column(sa.Float, nullable=False)
category: Mapped[str] = mapped_column(Text, nullable=False)
raw_data: Mapped[str | None] = mapped_column(Text) # JSON string
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), nullable=False, server_default=sa.func.now()
)
class SymbolScoreRow(Base):
__tablename__ = "symbol_scores"
id: Mapped[str] = mapped_column(Text, primary_key=True)
symbol: Mapped[str] = mapped_column(Text, nullable=False, unique=True)
news_score: Mapped[float] = mapped_column(sa.Float, nullable=False, server_default="0")
news_count: Mapped[int] = mapped_column(sa.Integer, nullable=False, server_default="0")
social_score: Mapped[float] = mapped_column(sa.Float, nullable=False, server_default="0")
policy_score: Mapped[float] = mapped_column(sa.Float, nullable=False, server_default="0")
filing_score: Mapped[float] = mapped_column(sa.Float, nullable=False, server_default="0")
composite: Mapped[float] = mapped_column(sa.Float, nullable=False, server_default="0")
updated_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False)
class MarketSentimentRow(Base):
__tablename__ = "market_sentiment"
id: Mapped[str] = mapped_column(Text, primary_key=True)
fear_greed: Mapped[int] = mapped_column(sa.Integer, nullable=False)
fear_greed_label: Mapped[str] = mapped_column(Text, nullable=False)
vix: Mapped[float | None] = mapped_column(sa.Float)
fed_stance: Mapped[str] = mapped_column(Text, nullable=False, server_default="neutral")
market_regime: Mapped[str] = mapped_column(Text, nullable=False, server_default="neutral")
updated_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False)
class StockSelectionRow(Base):
__tablename__ = "stock_selections"
id: Mapped[str] = mapped_column(Text, primary_key=True)
trade_date: Mapped[datetime] = mapped_column(sa.Date, nullable=False)
symbol: Mapped[str] = mapped_column(Text, nullable=False)
side: Mapped[str] = mapped_column(Text, nullable=False)
conviction: Mapped[float] = mapped_column(sa.Float, nullable=False)
reason: Mapped[str] = mapped_column(Text, nullable=False)
key_news: Mapped[str | None] = mapped_column(Text) # JSON string
sentiment_snapshot: Mapped[str | None] = mapped_column(Text) # JSON string
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), nullable=False, server_default=sa.func.now()
)
```
Also add `import sqlalchemy as sa` to the imports at the top of `sa_models.py`.
- [ ] **Step 4: Run tests to verify they pass**
Run: `pytest shared/tests/test_sa_news_models.py -v`
Expected: All 6 tests PASS
- [ ] **Step 5: Commit**
```bash
git add shared/src/shared/sa_models.py shared/tests/test_sa_news_models.py
git commit -m "feat: add SQLAlchemy ORM models for news, scores, selections"
```
---
### Task 3: Create Alembic migration for news tables
**Files:**
- Create: `shared/alembic/versions/002_news_sentiment_tables.py`
- [ ] **Step 1: Create migration file**
Create `shared/alembic/versions/002_news_sentiment_tables.py`:
```python
"""Add news, sentiment, and stock selection tables
Revision ID: 002
Revises: 001
Create Date: 2026-04-02
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
revision: str = "002"
down_revision: Union[str, None] = "001"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
op.create_table(
"news_items",
sa.Column("id", sa.Text, primary_key=True),
sa.Column("source", sa.Text, nullable=False),
sa.Column("headline", sa.Text, nullable=False),
sa.Column("summary", sa.Text),
sa.Column("url", sa.Text),
sa.Column("published_at", sa.DateTime(timezone=True), nullable=False),
sa.Column("symbols", sa.Text),
sa.Column("sentiment", sa.Float, nullable=False),
sa.Column("category", sa.Text, nullable=False),
sa.Column("raw_data", sa.Text),
sa.Column("created_at", sa.DateTime(timezone=True), nullable=False, server_default=sa.func.now()),
)
op.create_index("idx_news_items_published", "news_items", ["published_at"])
op.create_index("idx_news_items_source", "news_items", ["source"])
op.create_table(
"symbol_scores",
sa.Column("id", sa.Text, primary_key=True),
sa.Column("symbol", sa.Text, nullable=False, unique=True),
sa.Column("news_score", sa.Float, nullable=False, server_default="0"),
sa.Column("news_count", sa.Integer, nullable=False, server_default="0"),
sa.Column("social_score", sa.Float, nullable=False, server_default="0"),
sa.Column("policy_score", sa.Float, nullable=False, server_default="0"),
sa.Column("filing_score", sa.Float, nullable=False, server_default="0"),
sa.Column("composite", sa.Float, nullable=False, server_default="0"),
sa.Column("updated_at", sa.DateTime(timezone=True), nullable=False),
)
op.create_table(
"market_sentiment",
sa.Column("id", sa.Text, primary_key=True),
sa.Column("fear_greed", sa.Integer, nullable=False),
sa.Column("fear_greed_label", sa.Text, nullable=False),
sa.Column("vix", sa.Float),
sa.Column("fed_stance", sa.Text, nullable=False, server_default="neutral"),
sa.Column("market_regime", sa.Text, nullable=False, server_default="neutral"),
sa.Column("updated_at", sa.DateTime(timezone=True), nullable=False),
)
op.create_table(
"stock_selections",
sa.Column("id", sa.Text, primary_key=True),
sa.Column("trade_date", sa.Date, nullable=False),
sa.Column("symbol", sa.Text, nullable=False),
sa.Column("side", sa.Text, nullable=False),
sa.Column("conviction", sa.Float, nullable=False),
sa.Column("reason", sa.Text, nullable=False),
sa.Column("key_news", sa.Text),
sa.Column("sentiment_snapshot", sa.Text),
sa.Column("created_at", sa.DateTime(timezone=True), nullable=False, server_default=sa.func.now()),
)
op.create_index("idx_stock_selections_date", "stock_selections", ["trade_date"])
def downgrade() -> None:
op.drop_table("stock_selections")
op.drop_table("market_sentiment")
op.drop_table("symbol_scores")
op.drop_table("news_items")
```
- [ ] **Step 2: Verify migration imports correctly**
Run: `python -c "import importlib.util; s=importlib.util.spec_from_file_location('m','shared/alembic/versions/002_news_sentiment_tables.py'); m=importlib.util.module_from_spec(s); s.loader.exec_module(m); print('OK')"`
(`alembic/versions` is not an importable Python package, so a plain `from alembic.versions import *` would fail even for a correct migration — use the `importlib` one-liner above.)
Expected: OK (no import errors)
- [ ] **Step 3: Commit**
```bash
git add shared/alembic/versions/002_news_sentiment_tables.py
git commit -m "feat: add Alembic migration for news and sentiment tables"
```
---
### Task 4: Add NewsEvent to shared events and DB methods for news
**Files:**
- Modify: `shared/src/shared/events.py`
- Modify: `shared/src/shared/db.py`
- Create: `shared/tests/test_news_events.py`
- Create: `shared/tests/test_db_news.py`
- [ ] **Step 1: Write tests for NewsEvent**
Create `shared/tests/test_news_events.py`:
```python
"""Tests for NewsEvent."""
from datetime import datetime, timezone
from shared.models import NewsCategory, NewsItem
from shared.events import NewsEvent, EventType, Event
def test_news_event_to_dict():
item = NewsItem(
source="finnhub",
headline="Test",
published_at=datetime(2026, 4, 2, tzinfo=timezone.utc),
sentiment=0.5,
category=NewsCategory.MACRO,
)
event = NewsEvent(data=item)
d = event.to_dict()
assert d["type"] == EventType.NEWS
assert d["data"]["source"] == "finnhub"
def test_news_event_from_raw():
raw = {
"type": "NEWS",
"data": {
"id": "abc",
"source": "rss",
"headline": "Test headline",
"published_at": "2026-04-02T00:00:00+00:00",
"sentiment": 0.3,
"category": "earnings",
"symbols": ["AAPL"],
"raw_data": {},
},
}
event = NewsEvent.from_raw(raw)
assert event.data.source == "rss"
assert event.data.symbols == ["AAPL"]
def test_event_dispatcher_news():
raw = {
"type": "NEWS",
"data": {
"id": "abc",
"source": "finnhub",
"headline": "Test",
"published_at": "2026-04-02T00:00:00+00:00",
"sentiment": 0.0,
"category": "macro",
"raw_data": {},
},
}
event = Event.from_dict(raw)
assert isinstance(event, NewsEvent)
```
- [ ] **Step 2: Run tests to verify they fail**
Run: `pytest shared/tests/test_news_events.py -v`
Expected: FAIL — `NewsEvent` not in events.py, `EventType.NEWS` missing
- [ ] **Step 3: Add NewsEvent to events.py**
Add `NEWS = "NEWS"` to `EventType` enum.
Add `NewsEvent` class and register it in `_EVENT_TYPE_MAP`:
```python
from shared.models import Candle, Signal, Order, NewsItem
class NewsEvent(BaseModel):
type: EventType = EventType.NEWS
data: NewsItem
def to_dict(self) -> dict:
return {
"type": self.type,
"data": self.data.model_dump(mode="json"),
}
@classmethod
def from_raw(cls, raw: dict) -> "NewsEvent":
return cls(type=raw["type"], data=NewsItem(**raw["data"]))
```
Add to `_EVENT_TYPE_MAP`:
```python
EventType.NEWS: NewsEvent,
```
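The registry lookup that `Event.from_dict` performs can be illustrated with a self-contained sketch. The names below mirror the real ones in `shared/src/shared/events.py`, but this simplified stand-in wraps a plain dict instead of the `NewsItem` Pydantic model:

```python
from enum import Enum

class EventType(str, Enum):
    NEWS = "NEWS"

class NewsEvent:
    """Simplified stand-in: the real class wraps a NewsItem Pydantic model."""

    def __init__(self, data: dict) -> None:
        self.type = EventType.NEWS
        self.data = data

    @classmethod
    def from_raw(cls, raw: dict) -> "NewsEvent":
        return cls(data=raw["data"])

# from_dict resolves the concrete event class via this registry, which is why
# Step 3 must register NewsEvent under EventType.NEWS.
_EVENT_TYPE_MAP = {EventType.NEWS: NewsEvent}

def from_dict(raw: dict):
    return _EVENT_TYPE_MAP[EventType(raw["type"])].from_raw(raw)

event = from_dict({"type": "NEWS", "data": {"source": "finnhub", "headline": "Test"}})
print(type(event).__name__, event.data["source"])
```

Forgetting the `_EVENT_TYPE_MAP` entry is the failure mode `test_event_dispatcher_news` guards against: `from_dict` would raise a `KeyError` (or fall through to a default) instead of returning a `NewsEvent`.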
- [ ] **Step 4: Run event tests to verify they pass**
Run: `pytest shared/tests/test_news_events.py -v`
Expected: All 3 tests PASS
- [ ] **Step 5: Run all existing event tests to check no regressions**
Run: `pytest shared/tests/test_events.py -v`
Expected: All existing tests PASS
- [ ] **Step 6: Write tests for DB news methods**
Create `shared/tests/test_db_news.py`:
```python
"""Tests for database news/sentiment methods.
These tests use an in-memory SQLite database.
"""
import json
import uuid
import pytest
from datetime import datetime, date, timezone
from shared.db import Database
from shared.models import NewsItem, NewsCategory
from shared.sentiment_models import SymbolScore, MarketSentiment
@pytest.fixture
async def db():
database = Database("sqlite+aiosqlite://")
await database.connect()
yield database
await database.close()
async def test_insert_and_get_news_items(db):
item = NewsItem(
source="finnhub",
headline="AAPL earnings beat",
published_at=datetime(2026, 4, 2, 12, 0, tzinfo=timezone.utc),
sentiment=0.8,
category=NewsCategory.EARNINGS,
symbols=["AAPL"],
)
await db.insert_news_item(item)
items = await db.get_recent_news(hours=24)
assert len(items) == 1
assert items[0]["headline"] == "AAPL earnings beat"
async def test_upsert_symbol_score(db):
score = SymbolScore(
symbol="AAPL",
news_score=0.5,
news_count=10,
social_score=0.3,
policy_score=0.0,
filing_score=0.2,
composite=0.3,
updated_at=datetime(2026, 4, 2, tzinfo=timezone.utc),
)
await db.upsert_symbol_score(score)
scores = await db.get_top_symbol_scores(limit=5)
assert len(scores) == 1
assert scores[0]["symbol"] == "AAPL"
async def test_upsert_market_sentiment(db):
ms = MarketSentiment(
fear_greed=55,
fear_greed_label="Neutral",
vix=18.2,
fed_stance="neutral",
market_regime="neutral",
updated_at=datetime(2026, 4, 2, tzinfo=timezone.utc),
)
await db.upsert_market_sentiment(ms)
result = await db.get_latest_market_sentiment()
assert result is not None
assert result["fear_greed"] == 55
async def test_insert_stock_selection(db):
await db.insert_stock_selection(
trade_date=date(2026, 4, 2),
symbol="NVDA",
side="BUY",
conviction=0.85,
reason="CHIPS Act",
key_news=["Trump signs CHIPS expansion"],
sentiment_snapshot={"composite": 0.8},
)
selections = await db.get_stock_selections(date(2026, 4, 2))
assert len(selections) == 1
assert selections[0]["symbol"] == "NVDA"
```
- [ ] **Step 7: Run DB news tests to verify they fail**
Run: `pytest shared/tests/test_db_news.py -v`
Expected: FAIL — methods not yet on Database class
- [ ] **Step 8: Add news/sentiment DB methods to db.py**
Add to `shared/src/shared/db.py` — new import at top:
```python
import json
import uuid
from datetime import date, datetime, timedelta, timezone  # timedelta/timezone used by get_recent_news
from shared.models import NewsItem
from shared.sentiment_models import SymbolScore, MarketSentiment
from shared.sa_models import NewsItemRow, SymbolScoreRow, MarketSentimentRow, StockSelectionRow
```
Add these methods to the `Database` class:
```python
async def insert_news_item(self, item: NewsItem) -> None:
"""Insert a news item."""
row = NewsItemRow(
id=item.id,
source=item.source,
headline=item.headline,
summary=item.summary,
url=item.url,
published_at=item.published_at,
symbols=json.dumps(item.symbols),
sentiment=item.sentiment,
category=item.category.value,
raw_data=json.dumps(item.raw_data),
created_at=item.created_at,
)
async with self._session_factory() as session:
try:
session.add(row)
await session.commit()
except Exception:
await session.rollback()
raise
async def get_recent_news(self, hours: int = 24) -> list[dict]:
"""Retrieve news items from the last N hours."""
since = datetime.now(timezone.utc) - timedelta(hours=hours)
stmt = (
select(NewsItemRow)
.where(NewsItemRow.published_at >= since)
.order_by(NewsItemRow.published_at.desc())
)
async with self._session_factory() as session:
result = await session.execute(stmt)
rows = result.scalars().all()
return [
{
"id": r.id,
"source": r.source,
"headline": r.headline,
"summary": r.summary,
"url": r.url,
"published_at": r.published_at,
"symbols": json.loads(r.symbols) if r.symbols else [],
"sentiment": r.sentiment,
"category": r.category,
"created_at": r.created_at,
}
for r in rows
]
async def upsert_symbol_score(self, score: SymbolScore) -> None:
"""Insert or update a symbol score."""
async with self._session_factory() as session:
try:
existing = await session.execute(
select(SymbolScoreRow).where(SymbolScoreRow.symbol == score.symbol)
)
row = existing.scalar_one_or_none()
if row:
row.news_score = score.news_score
row.news_count = score.news_count
row.social_score = score.social_score
row.policy_score = score.policy_score
row.filing_score = score.filing_score
row.composite = score.composite
row.updated_at = score.updated_at
else:
row = SymbolScoreRow(
id=str(uuid.uuid4()),
symbol=score.symbol,
news_score=score.news_score,
news_count=score.news_count,
social_score=score.social_score,
policy_score=score.policy_score,
filing_score=score.filing_score,
composite=score.composite,
updated_at=score.updated_at,
)
session.add(row)
await session.commit()
except Exception:
await session.rollback()
raise
async def get_top_symbol_scores(self, limit: int = 20) -> list[dict]:
"""Get top symbol scores ordered by composite descending."""
stmt = (
select(SymbolScoreRow)
.order_by(SymbolScoreRow.composite.desc())
.limit(limit)
)
async with self._session_factory() as session:
result = await session.execute(stmt)
rows = result.scalars().all()
return [
{
"symbol": r.symbol,
"news_score": r.news_score,
"news_count": r.news_count,
"social_score": r.social_score,
"policy_score": r.policy_score,
"filing_score": r.filing_score,
"composite": r.composite,
"updated_at": r.updated_at,
}
for r in rows
]
async def upsert_market_sentiment(self, ms: MarketSentiment) -> None:
"""Insert or update the latest market sentiment (single row, id='latest')."""
async with self._session_factory() as session:
try:
existing = await session.execute(
select(MarketSentimentRow).where(MarketSentimentRow.id == "latest")
)
row = existing.scalar_one_or_none()
if row:
row.fear_greed = ms.fear_greed
row.fear_greed_label = ms.fear_greed_label
row.vix = ms.vix
row.fed_stance = ms.fed_stance
row.market_regime = ms.market_regime
row.updated_at = ms.updated_at
else:
row = MarketSentimentRow(
id="latest",
fear_greed=ms.fear_greed,
fear_greed_label=ms.fear_greed_label,
vix=ms.vix,
fed_stance=ms.fed_stance,
market_regime=ms.market_regime,
updated_at=ms.updated_at,
)
session.add(row)
await session.commit()
except Exception:
await session.rollback()
raise
async def get_latest_market_sentiment(self) -> dict | None:
"""Get the latest market sentiment."""
stmt = select(MarketSentimentRow).where(MarketSentimentRow.id == "latest")
async with self._session_factory() as session:
result = await session.execute(stmt)
r = result.scalar_one_or_none()
if r is None:
return None
return {
"fear_greed": r.fear_greed,
"fear_greed_label": r.fear_greed_label,
"vix": r.vix,
"fed_stance": r.fed_stance,
"market_regime": r.market_regime,
"updated_at": r.updated_at,
}
async def insert_stock_selection(
self,
trade_date: date,
symbol: str,
side: str,
conviction: float,
reason: str,
key_news: list[str],
sentiment_snapshot: dict,
) -> None:
"""Insert a stock selection record."""
row = StockSelectionRow(
id=str(uuid.uuid4()),
trade_date=trade_date,
symbol=symbol,
side=side,
conviction=conviction,
reason=reason,
key_news=json.dumps(key_news),
sentiment_snapshot=json.dumps(sentiment_snapshot),
)
async with self._session_factory() as session:
try:
session.add(row)
await session.commit()
except Exception:
await session.rollback()
raise
async def get_stock_selections(self, trade_date: date) -> list[dict]:
"""Get stock selections for a specific date."""
stmt = (
select(StockSelectionRow)
.where(StockSelectionRow.trade_date == trade_date)
.order_by(StockSelectionRow.conviction.desc())
)
async with self._session_factory() as session:
result = await session.execute(stmt)
rows = result.scalars().all()
return [
{
"symbol": r.symbol,
"side": r.side,
"conviction": r.conviction,
"reason": r.reason,
"key_news": json.loads(r.key_news) if r.key_news else [],
"sentiment_snapshot": json.loads(r.sentiment_snapshot) if r.sentiment_snapshot else {},
}
for r in rows
]
```
- [ ] **Step 9: Run DB news tests to verify they pass**
Run: `pytest shared/tests/test_db_news.py -v`
Expected: All 4 tests PASS
Note: These tests require the `aiosqlite` package (`pip install aiosqlite`) and `pytest-asyncio`; if `asyncio_mode = "auto"` is not set in the pytest config, decorate the async tests and the `db` fixture with the appropriate `pytest-asyncio` markers.
- [ ] **Step 10: Run all shared tests to check no regressions**
Run: `pytest shared/tests/ -v`
Expected: All tests PASS
- [ ] **Step 11: Commit**
```bash
git add shared/src/shared/events.py shared/src/shared/db.py shared/tests/test_news_events.py shared/tests/test_db_news.py
git commit -m "feat: add NewsEvent, DB methods for news/sentiment/selections"
```
---
### Task 5: Update Settings with new env vars
**Files:**
- Modify: `shared/src/shared/config.py`
- Modify: `.env.example`
- [ ] **Step 1: Add new settings to config.py**
Add these fields to the `Settings` class in `shared/src/shared/config.py`:
```python
# News collector
finnhub_api_key: str = ""
news_poll_interval: int = 300
sentiment_aggregate_interval: int = 900
# Stock selector
selector_candidates_time: str = "15:00"
selector_filter_time: str = "15:15"
selector_final_time: str = "15:30"
selector_max_picks: int = 3
# LLM
anthropic_api_key: str = ""
anthropic_model: str = "claude-sonnet-4-20250514"
```
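The three `selector_*_time` values are plain `HH:MM` strings; whatever schedules the selector stages will need to turn them into `datetime.time` objects. A hypothetical helper (`parse_hhmm` is not part of this plan) might look like:

```python
from datetime import time

def parse_hhmm(value: str) -> time:
    """Parse an 'HH:MM' setting such as selector_final_time ('15:30')."""
    hours, minutes = value.split(":")
    return time(hour=int(hours), minute=int(minutes))

print(parse_hhmm("15:30"))
```

Keeping the settings as strings keeps the env-var mapping trivial, while the parse happens once at scheduler start-up.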
- [ ] **Step 2: Add to .env.example**
Append to `.env.example`:
```bash
# News Collector
FINNHUB_API_KEY=
NEWS_POLL_INTERVAL=300
SENTIMENT_AGGREGATE_INTERVAL=900
# Stock Selector
SELECTOR_CANDIDATES_TIME=15:00
SELECTOR_FILTER_TIME=15:15
SELECTOR_FINAL_TIME=15:30
SELECTOR_MAX_PICKS=3
# LLM (for stock selector)
ANTHROPIC_API_KEY=
ANTHROPIC_MODEL=claude-sonnet-4-20250514
```
- [ ] **Step 3: Commit**
```bash
git add shared/src/shared/config.py .env.example
git commit -m "feat: add news collector and stock selector config settings"
```
---
## Phase 2: News Collector Service
### Task 6: Scaffold news-collector service
**Files:**
- Create: `services/news-collector/pyproject.toml`
- Create: `services/news-collector/Dockerfile`
- Create: `services/news-collector/src/news_collector/__init__.py`
- Create: `services/news-collector/src/news_collector/config.py`
- Create: `services/news-collector/src/news_collector/collectors/__init__.py`
- Create: `services/news-collector/src/news_collector/collectors/base.py`
- Create: `services/news-collector/tests/__init__.py`
- [ ] **Step 1: Create pyproject.toml**
Create `services/news-collector/pyproject.toml`:
```toml
[project]
name = "news-collector"
version = "0.1.0"
description = "News and sentiment data collector service"
requires-python = ">=3.12"
dependencies = [
"trading-shared",
"feedparser>=6.0",
"nltk>=3.8",
"aiohttp>=3.9",
]
[project.optional-dependencies]
dev = [
"pytest>=8.0",
"pytest-asyncio>=0.23",
"aioresponses>=0.7",
]
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[tool.hatch.build.targets.wheel]
packages = ["src/news_collector"]
```
- [ ] **Step 2: Create Dockerfile**
Create `services/news-collector/Dockerfile`:
```dockerfile
FROM python:3.12-slim
WORKDIR /app
COPY shared/ shared/
RUN pip install --no-cache-dir ./shared
COPY services/news-collector/ services/news-collector/
RUN pip install --no-cache-dir ./services/news-collector
RUN python -c "import nltk; nltk.download('vader_lexicon', quiet=True)"
ENV PYTHONPATH=/app
CMD ["python", "-m", "news_collector.main"]
```
- [ ] **Step 3: Create config.py**
Create `services/news-collector/src/news_collector/config.py`:
```python
"""News Collector configuration."""
from shared.config import Settings
class NewsCollectorConfig(Settings):
    health_port: int = 8084
    # finnhub_api_key, news_poll_interval, and sentiment_aggregate_interval are
    # inherited from Settings (Task 5); restating them here documents what this
    # service reads without changing behavior.
    finnhub_api_key: str = ""
    news_poll_interval: int = 300
    sentiment_aggregate_interval: int = 900
```
- [ ] **Step 4: Create BaseCollector**
Create `services/news-collector/src/news_collector/collectors/base.py`:
```python
"""Base class for all news collectors."""
from abc import ABC, abstractmethod
from shared.models import NewsItem
class BaseCollector(ABC):
name: str = "base"
poll_interval: int = 300 # seconds
@abstractmethod
async def collect(self) -> list[NewsItem]:
"""Collect news items from the source."""
@abstractmethod
async def is_available(self) -> bool:
"""Check if this data source is accessible."""
```
- [ ] **Step 5: Create __init__.py files**
Create `services/news-collector/src/news_collector/__init__.py`:
```python
"""News collector service."""
```
Create `services/news-collector/src/news_collector/collectors/__init__.py`:
```python
"""News collectors."""
```
Create `services/news-collector/tests/__init__.py`:
```python
```
- [ ] **Step 6: Commit**
```bash
git add services/news-collector/
git commit -m "feat: scaffold news-collector service with BaseCollector"
```
---
### Task 7: Implement Finnhub news collector
**Files:**
- Create: `services/news-collector/src/news_collector/collectors/finnhub.py`
- Create: `services/news-collector/tests/test_finnhub.py`
- [ ] **Step 1: Write tests**
Create `services/news-collector/tests/test_finnhub.py`:
```python
"""Tests for Finnhub news collector."""
import pytest
from unittest.mock import AsyncMock, patch
from datetime import datetime, timezone
from news_collector.collectors.finnhub import FinnhubCollector
@pytest.fixture
def collector():
return FinnhubCollector(api_key="test_key")
def test_collector_name(collector):
assert collector.name == "finnhub"
assert collector.poll_interval == 300
async def test_is_available_with_key(collector):
assert await collector.is_available() is True
async def test_is_available_without_key():
c = FinnhubCollector(api_key="")
assert await c.is_available() is False
async def test_collect_parses_response(collector):
mock_response = [
{
"category": "top news",
"datetime": 1711929600,
"headline": "AAPL beats earnings",
"id": 12345,
"related": "AAPL",
"source": "MarketWatch",
"summary": "Apple reported better than expected...",
"url": "https://example.com/article",
},
{
"category": "top news",
"datetime": 1711929000,
"headline": "Fed holds rates steady",
"id": 12346,
"related": "",
"source": "Reuters",
"summary": "The Federal Reserve...",
"url": "https://example.com/fed",
},
]
with patch.object(collector, "_fetch_news", new_callable=AsyncMock, return_value=mock_response):
items = await collector.collect()
assert len(items) == 2
assert items[0].source == "finnhub"
assert items[0].headline == "AAPL beats earnings"
assert items[0].symbols == ["AAPL"]
assert items[0].url == "https://example.com/article"
assert isinstance(items[0].sentiment, float)
# Second item has no related ticker
assert items[1].symbols == []
async def test_collect_handles_empty_response(collector):
with patch.object(collector, "_fetch_news", new_callable=AsyncMock, return_value=[]):
items = await collector.collect()
assert items == []
```
- [ ] **Step 2: Run tests to verify they fail**
Run: `pytest services/news-collector/tests/test_finnhub.py -v`
Expected: FAIL — module not found
- [ ] **Step 3: Implement FinnhubCollector**
Create `services/news-collector/src/news_collector/collectors/finnhub.py`:
```python
"""Finnhub market news collector (free tier: 60 req/min)."""
import logging
from datetime import datetime, timezone
import aiohttp
from nltk.sentiment.vader import SentimentIntensityAnalyzer
from shared.models import NewsCategory, NewsItem
from news_collector.collectors.base import BaseCollector
logger = logging.getLogger(__name__)
FINNHUB_NEWS_URL = "https://finnhub.io/api/v1/news"
class FinnhubCollector(BaseCollector):
name = "finnhub"
poll_interval = 300 # 5 minutes
def __init__(self, api_key: str) -> None:
self._api_key = api_key
self._vader = SentimentIntensityAnalyzer()
async def is_available(self) -> bool:
return bool(self._api_key)
async def _fetch_news(self) -> list[dict]:
"""Fetch general news from Finnhub API."""
params = {"category": "general", "token": self._api_key}
async with aiohttp.ClientSession() as session:
async with session.get(FINNHUB_NEWS_URL, params=params) as resp:
if resp.status != 200:
logger.warning("finnhub_fetch_failed status=%s", resp.status)
return []
return await resp.json()
def _analyze_sentiment(self, text: str) -> float:
"""Return VADER compound score (-1.0 to 1.0)."""
scores = self._vader.polarity_scores(text)
return scores["compound"]
def _extract_symbols(self, related: str) -> list[str]:
"""Parse Finnhub 'related' field into symbol list."""
if not related or not related.strip():
return []
return [s.strip() for s in related.split(",") if s.strip()]
def _categorize(self, article: dict) -> NewsCategory:
"""Determine category from article content."""
headline = article.get("headline", "").lower()
if any(w in headline for w in ["fed", "fomc", "rate", "inflation"]):
return NewsCategory.FED
if any(w in headline for w in ["tariff", "sanction", "regulation", "trump", "biden", "congress"]):
return NewsCategory.POLICY
if any(w in headline for w in ["earnings", "revenue", "profit", "eps"]):
return NewsCategory.EARNINGS
return NewsCategory.MACRO
async def collect(self) -> list[NewsItem]:
raw = await self._fetch_news()
items = []
for article in raw:
headline = article.get("headline", "")
summary = article.get("summary", "")
sentiment_text = f"{headline}. {summary}" if summary else headline
items.append(
NewsItem(
source=self.name,
headline=headline,
summary=summary or None,
url=article.get("url"),
published_at=datetime.fromtimestamp(
article.get("datetime", 0), tz=timezone.utc
),
symbols=self._extract_symbols(article.get("related", "")),
sentiment=self._analyze_sentiment(sentiment_text),
category=self._categorize(article),
raw_data=article,
)
)
return items
```
- [ ] **Step 4: Run tests to verify they pass**
Run: `pytest services/news-collector/tests/test_finnhub.py -v`
Expected: All 5 tests PASS
Note: Requires `nltk` and VADER lexicon. If not downloaded: `python -c "import nltk; nltk.download('vader_lexicon')"`
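The keyword categorizer above checks its buckets in a fixed priority order, so a headline that mentions both rates and earnings is tagged FED, not EARNINGS. A standalone sketch of that precedence (plain strings stand in for the `NewsCategory` members):

```python
# Priority-ordered keyword buckets mirroring FinnhubCollector._categorize:
# the first bucket that matches wins; "macro" is the fallback.
CATEGORY_KEYWORDS = [
    ("fed", ["fed", "fomc", "rate", "inflation"]),
    ("policy", ["tariff", "sanction", "regulation", "trump", "biden", "congress"]),
    ("earnings", ["earnings", "revenue", "profit", "eps"]),
]

def categorize(headline: str) -> str:
    text = headline.lower()
    for category, keywords in CATEGORY_KEYWORDS:
        if any(w in text for w in keywords):
            return category
    return "macro"

# A headline matching both the fed and earnings buckets lands in "fed"
# because that bucket is checked first.
assert categorize("Rate cut hopes lift tech earnings outlook") == "fed"
assert categorize("Apple earnings beat expectations") == "earnings"
assert categorize("Oil prices steady") == "macro"
```

Note the buckets match substrings ("rate" also hits "corporate"), which is acceptable for a coarse first pass but worth remembering when extending the word lists.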
- [ ] **Step 5: Commit**
```bash
git add services/news-collector/src/news_collector/collectors/finnhub.py services/news-collector/tests/test_finnhub.py
git commit -m "feat: implement Finnhub news collector with VADER sentiment"
```
---
### Task 8: Implement RSS news collector
**Files:**
- Create: `services/news-collector/src/news_collector/collectors/rss.py`
- Create: `services/news-collector/tests/test_rss.py`
- [ ] **Step 1: Write tests**
Create `services/news-collector/tests/test_rss.py`:
```python
"""Tests for RSS news collector."""
import pytest
from unittest.mock import AsyncMock, patch
from datetime import datetime, timezone
from news_collector.collectors.rss import RSSCollector
@pytest.fixture
def collector():
return RSSCollector()
def test_collector_name(collector):
assert collector.name == "rss"
assert collector.poll_interval == 600
async def test_is_available(collector):
assert await collector.is_available() is True
async def test_collect_parses_feed(collector):
mock_feed = {
"entries": [
{
"title": "NVDA surges on AI demand",
"link": "https://example.com/nvda",
"published_parsed": (2026, 4, 2, 12, 0, 0, 0, 0, 0),
"summary": "Nvidia stock jumped 5%...",
},
{
"title": "Markets rally on jobs data",
"link": "https://example.com/market",
"published_parsed": (2026, 4, 2, 11, 0, 0, 0, 0, 0),
"summary": "The S&P 500 rose...",
},
],
}
with patch.object(collector, "_fetch_feeds", new_callable=AsyncMock, return_value=[mock_feed]):
items = await collector.collect()
assert len(items) == 2
assert items[0].source == "rss"
assert items[0].headline == "NVDA surges on AI demand"
assert isinstance(items[0].sentiment, float)
```
- [ ] **Step 2: Run tests to verify they fail**
Run: `pytest services/news-collector/tests/test_rss.py -v`
Expected: FAIL
- [ ] **Step 3: Implement RSSCollector**
Create `services/news-collector/src/news_collector/collectors/rss.py`:
```python
"""RSS feed collector for Yahoo Finance, Google News, MarketWatch."""
import asyncio
import logging
import re
from calendar import timegm
from datetime import datetime, timezone
import aiohttp
import feedparser
from nltk.sentiment.vader import SentimentIntensityAnalyzer
from shared.models import NewsCategory, NewsItem
from news_collector.collectors.base import BaseCollector
logger = logging.getLogger(__name__)
DEFAULT_FEEDS = [
"https://feeds.finance.yahoo.com/rss/2.0/headline?s=^GSPC&region=US&lang=en-US",
"https://news.google.com/rss/topics/CAAqJggKIiBDQkFTRWdvSUwyMHZNRGx6TVdZU0FtVnVHZ0pWVXlnQVAB?hl=en-US&gl=US&ceid=US:en",
"https://www.marketwatch.com/rss/topstories",
]
# Common US stock tickers to detect in headlines
TICKER_PATTERN = re.compile(
r"\b(AAPL|MSFT|GOOGL|GOOG|AMZN|TSLA|NVDA|META|JPM|V|JNJ|WMT|PG|UNH|HD|"
r"MA|DIS|BAC|XOM|PFE|KO|PEP|CSCO|INTC|VZ|NFLX|ADBE|CRM|AMD|QCOM|"
r"GS|BA|CAT|MMM|IBM|GE|F|GM|NKE|MCD|SBUX|SPY|QQQ|IWM)\b"
)
class RSSCollector(BaseCollector):
name = "rss"
poll_interval = 600 # 10 minutes
def __init__(self, feeds: list[str] | None = None) -> None:
self._feeds = feeds or DEFAULT_FEEDS
self._vader = SentimentIntensityAnalyzer()
async def is_available(self) -> bool:
return True
async def _fetch_feeds(self) -> list[dict]:
"""Fetch and parse all RSS feeds."""
results = []
async with aiohttp.ClientSession() as session:
for url in self._feeds:
try:
async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as resp:
if resp.status == 200:
text = await resp.text()
feed = feedparser.parse(text)
results.append(feed)
except Exception as exc:
logger.warning("rss_fetch_failed url=%s error=%s", url, exc)
return results
def _extract_symbols(self, text: str) -> list[str]:
"""Extract stock tickers from text."""
return list(set(TICKER_PATTERN.findall(text)))
def _parse_time(self, entry: dict) -> datetime:
"""Parse published time from feed entry."""
parsed = entry.get("published_parsed")
if parsed:
return datetime.fromtimestamp(timegm(parsed), tz=timezone.utc)
return datetime.now(timezone.utc)
async def collect(self) -> list[NewsItem]:
feeds = await self._fetch_feeds()
items = []
seen_titles = set()
for feed in feeds:
for entry in feed.get("entries", []):
title = entry.get("title", "").strip()
if not title or title in seen_titles:
continue
seen_titles.add(title)
summary = entry.get("summary", "")
sentiment_text = f"{title}. {summary}" if summary else title
items.append(
NewsItem(
source=self.name,
headline=title,
summary=summary or None,
url=entry.get("link"),
published_at=self._parse_time(entry),
symbols=self._extract_symbols(f"{title} {summary}"),
sentiment=self._vader.polarity_scores(sentiment_text)["compound"],
category=NewsCategory.MACRO,
raw_data={"feed_title": title},
)
)
return items
```
- [ ] **Step 4: Run tests to verify they pass**
Run: `pytest services/news-collector/tests/test_rss.py -v`
Expected: All 3 tests PASS
- [ ] **Step 5: Commit**
```bash
git add services/news-collector/src/news_collector/collectors/rss.py services/news-collector/tests/test_rss.py
git commit -m "feat: implement RSS news collector (Yahoo, Google News, MarketWatch)"
```
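A note on `_parse_time`: feedparser's `published_parsed` is a UTC `time.struct_time`, which is why the collector converts it with `calendar.timegm` rather than `time.mktime` (which would interpret the tuple in the server's local timezone). A minimal check of that conversion:

```python
from calendar import timegm
from datetime import datetime, timezone

# A published_parsed-style 9-tuple as feedparser produces it (already UTC).
parsed = (2026, 4, 2, 12, 0, 0, 0, 0, 0)

# timegm treats the tuple as UTC, so the round trip through the epoch is exact.
dt = datetime.fromtimestamp(timegm(parsed), tz=timezone.utc)
assert dt == datetime(2026, 4, 2, 12, 0, tzinfo=timezone.utc)
```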
---
### Task 9: Implement Fear & Greed Index collector
**Files:**
- Create: `services/news-collector/src/news_collector/collectors/fear_greed.py`
- Create: `services/news-collector/tests/test_fear_greed.py`
- [ ] **Step 1: Write tests**
Create `services/news-collector/tests/test_fear_greed.py`:
```python
"""Tests for CNN Fear & Greed Index collector."""
import pytest
from unittest.mock import AsyncMock, patch
from news_collector.collectors.fear_greed import FearGreedCollector
@pytest.fixture
def collector():
return FearGreedCollector()
def test_collector_name(collector):
assert collector.name == "fear_greed"
assert collector.poll_interval == 3600
async def test_is_available(collector):
assert await collector.is_available() is True
async def test_collect_parses_api_response(collector):
mock_data = {
"fear_and_greed": {
"score": 45.0,
"rating": "Fear",
"timestamp": "2026-04-02T12:00:00+00:00",
}
}
with patch.object(collector, "_fetch_index", new_callable=AsyncMock, return_value=mock_data):
result = await collector.collect()
assert result.fear_greed == 45
assert result.fear_greed_label == "Fear"
async def test_collect_returns_none_on_failure(collector):
with patch.object(collector, "_fetch_index", new_callable=AsyncMock, return_value=None):
result = await collector.collect()
assert result is None
def test_classify_label():
c = FearGreedCollector()
assert c._classify(10) == "Extreme Fear"
assert c._classify(30) == "Fear"
assert c._classify(50) == "Neutral"
assert c._classify(70) == "Greed"
assert c._classify(85) == "Extreme Greed"
```
- [ ] **Step 2: Run tests to verify they fail**
Run: `pytest services/news-collector/tests/test_fear_greed.py -v`
Expected: FAIL
- [ ] **Step 3: Implement FearGreedCollector**
Create `services/news-collector/src/news_collector/collectors/fear_greed.py`:
```python
"""CNN Fear & Greed Index collector."""
import logging
from dataclasses import dataclass
from typing import Optional
import aiohttp
from news_collector.collectors.base import BaseCollector
from shared.models import NewsItem
logger = logging.getLogger(__name__)
FEAR_GREED_URL = "https://production.dataviz.cnn.io/index/fearandgreed/graphdata"
@dataclass
class FearGreedResult:
fear_greed: int
fear_greed_label: str
class FearGreedCollector(BaseCollector):
"""Fetches CNN Fear & Greed Index.
Note: This collector does NOT return NewsItem — it returns FearGreedResult
which feeds directly into MarketSentiment. The main.py scheduler handles
this differently from news collectors.
"""
name = "fear_greed"
poll_interval = 3600 # 1 hour
async def is_available(self) -> bool:
return True
async def _fetch_index(self) -> Optional[dict]:
"""Fetch Fear & Greed data from CNN API."""
headers = {"User-Agent": "Mozilla/5.0"}
try:
async with aiohttp.ClientSession() as session:
async with session.get(
FEAR_GREED_URL, headers=headers, timeout=aiohttp.ClientTimeout(total=10)
) as resp:
if resp.status != 200:
logger.warning("fear_greed_fetch_failed status=%s", resp.status)
return None
return await resp.json()
except Exception as exc:
logger.warning("fear_greed_error error=%s", exc)
return None
def _classify(self, score: int) -> str:
"""Classify numeric score into label."""
if score <= 20:
return "Extreme Fear"
if score <= 40:
return "Fear"
if score <= 60:
return "Neutral"
if score <= 80:
return "Greed"
return "Extreme Greed"
async def collect(self) -> Optional[FearGreedResult]:
"""Collect Fear & Greed Index. Returns FearGreedResult or None."""
data = await self._fetch_index()
if data is None:
return None
try:
fg = data["fear_and_greed"]
score = int(fg["score"])
label = fg.get("rating", self._classify(score))
return FearGreedResult(fear_greed=score, fear_greed_label=label)
except (KeyError, ValueError, TypeError) as exc:
logger.warning("fear_greed_parse_failed error=%s", exc)
return None
```
- [ ] **Step 4: Run tests to verify they pass**
Run: `pytest services/news-collector/tests/test_fear_greed.py -v`
Expected: All 5 tests PASS
- [ ] **Step 5: Commit**
```bash
git add services/news-collector/src/news_collector/collectors/fear_greed.py services/news-collector/tests/test_fear_greed.py
git commit -m "feat: implement CNN Fear & Greed Index collector"
```
---
### Task 10: Implement SEC EDGAR collector
**Files:**
- Create: `services/news-collector/src/news_collector/collectors/sec_edgar.py`
- Create: `services/news-collector/tests/test_sec_edgar.py`
- [ ] **Step 1: Write tests**
Create `services/news-collector/tests/test_sec_edgar.py`:
```python
"""Tests for SEC EDGAR filing collector."""
import pytest
from unittest.mock import AsyncMock, patch
from news_collector.collectors.sec_edgar import SecEdgarCollector
@pytest.fixture
def collector():
return SecEdgarCollector()
def test_collector_name(collector):
assert collector.name == "sec_edgar"
assert collector.poll_interval == 1800
async def test_is_available(collector):
assert await collector.is_available() is True
async def test_collect_parses_filings(collector):
mock_response = {
"filings": {
"recent": {
"accessionNumber": ["0001234-26-000001"],
"filingDate": ["2026-04-02"],
"primaryDocument": ["filing.htm"],
"form": ["8-K"],
"primaryDocDescription": ["Current Report"],
}
},
"tickers": [{"ticker": "AAPL"}],
"name": "Apple Inc",
}
with patch.object(collector, "_fetch_recent_filings", new_callable=AsyncMock, return_value=[mock_response]):
items = await collector.collect()
assert len(items) == 1
assert items[0].source == "sec_edgar"
assert items[0].category.value == "filing"
assert "AAPL" in items[0].symbols
async def test_collect_handles_empty(collector):
with patch.object(collector, "_fetch_recent_filings", new_callable=AsyncMock, return_value=[]):
items = await collector.collect()
assert items == []
```
- [ ] **Step 2: Run tests to verify they fail**
Run: `pytest services/news-collector/tests/test_sec_edgar.py -v`
Expected: FAIL
- [ ] **Step 3: Implement SecEdgarCollector**
Create `services/news-collector/src/news_collector/collectors/sec_edgar.py`:
```python
"""SEC EDGAR filing collector (free, no API key required)."""
import logging
from datetime import datetime, timezone
import aiohttp
from nltk.sentiment.vader import SentimentIntensityAnalyzer
from shared.models import NewsCategory, NewsItem
from news_collector.collectors.base import BaseCollector
logger = logging.getLogger(__name__)
EDGAR_COMPANY_FILINGS = "https://data.sec.gov/submissions/CIK{cik}.json"
# CIK numbers for major companies (subset — extend as needed)
TRACKED_CIKS = {
"0000320193": "AAPL",
"0000789019": "MSFT",
"0001652044": "GOOGL",
"0001018724": "AMZN",
"0001318605": "TSLA",
"0001045810": "NVDA",
"0001326801": "META",
"0000019617": "JPM",
"0000078003": "PFE",
"0000021344": "KO",
}
SEC_USER_AGENT = "TradingPlatform research@example.com"
class SecEdgarCollector(BaseCollector):
name = "sec_edgar"
poll_interval = 1800 # 30 minutes
def __init__(self) -> None:
self._vader = SentimentIntensityAnalyzer()
async def is_available(self) -> bool:
return True
async def _fetch_recent_filings(self) -> list[dict]:
"""Fetch recent 8-K filings for tracked companies."""
results = []
headers = {"User-Agent": SEC_USER_AGENT}
async with aiohttp.ClientSession() as session:
for cik, ticker in TRACKED_CIKS.items():
try:
url = EDGAR_COMPANY_FILINGS.format(cik=cik)
async with session.get(
url, headers=headers, timeout=aiohttp.ClientTimeout(total=10)
) as resp:
if resp.status == 200:
data = await resp.json()
data["tickers"] = [{"ticker": ticker}]
results.append(data)
except Exception as exc:
logger.warning("sec_fetch_failed cik=%s error=%s", cik, exc)
return results
async def collect(self) -> list[NewsItem]:
filings_data = await self._fetch_recent_filings()
items = []
today = datetime.now(timezone.utc).strftime("%Y-%m-%d")
for company_data in filings_data:
tickers = [t["ticker"] for t in company_data.get("tickers", [])]
company_name = company_data.get("name", "Unknown")
recent = company_data.get("filings", {}).get("recent", {})
forms = recent.get("form", [])
dates = recent.get("filingDate", [])
descriptions = recent.get("primaryDocDescription", [])
accessions = recent.get("accessionNumber", [])
for i, form in enumerate(forms):
if form != "8-K":
continue
filing_date = dates[i] if i < len(dates) else ""
if filing_date != today:
continue
desc = descriptions[i] if i < len(descriptions) else "8-K Filing"
accession = accessions[i] if i < len(accessions) else ""
headline = f"{company_name} ({', '.join(tickers)}): {form} - {desc}"
items.append(
NewsItem(
source=self.name,
headline=headline,
summary=desc,
url=f"https://www.sec.gov/cgi-bin/browse-edgar?action=getcompany&accession={accession}",
published_at=datetime.strptime(filing_date, "%Y-%m-%d").replace(tzinfo=timezone.utc),
symbols=tickers,
sentiment=self._vader.polarity_scores(headline)["compound"],
category=NewsCategory.FILING,
raw_data={"form": form, "accession": accession},
)
)
return items
```
- [ ] **Step 4: Run tests to verify they pass**
Run: `pytest services/news-collector/tests/test_sec_edgar.py -v`
Expected: All 4 tests PASS
- [ ] **Step 5: Commit**
```bash
git add services/news-collector/src/news_collector/collectors/sec_edgar.py services/news-collector/tests/test_sec_edgar.py
git commit -m "feat: implement SEC EDGAR 8-K filing collector"
```
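EDGAR's `filings.recent` object stores each field as a parallel array indexed by position, which is why `collect()` loops over `forms` with an index and bounds-checks each sibling array. Zipping the arrays with a fill value is one way to make ragged payloads harder to mis-index (a sketch reusing the field names from the test fixture above):

```python
from itertools import zip_longest

# Shape of the EDGAR filings.recent payload: parallel arrays per field.
recent = {
    "form": ["8-K", "10-Q"],
    "filingDate": ["2026-04-02", "2026-04-01"],
    "primaryDocDescription": ["Current Report", "Quarterly Report"],
    "accessionNumber": ["0001234-26-000001", "0001234-26-000002"],
}

# zip_longest pads short arrays with a default instead of raising IndexError.
rows = list(zip_longest(
    recent.get("form", []),
    recent.get("filingDate", []),
    recent.get("primaryDocDescription", []),
    recent.get("accessionNumber", []),
    fillvalue="",
))
eight_ks = [(date, desc, acc) for form, date, desc, acc in rows if form == "8-K"]
assert eight_ks == [("2026-04-02", "Current Report", "0001234-26-000001")]
```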
---
### Task 11: Implement Reddit collector
**Files:**
- Create: `services/news-collector/src/news_collector/collectors/reddit.py`
- Create: `services/news-collector/tests/test_reddit.py`
- [ ] **Step 1: Write tests**
Create `services/news-collector/tests/test_reddit.py`:
```python
"""Tests for Reddit collector."""
import pytest
from unittest.mock import AsyncMock, patch
from news_collector.collectors.reddit import RedditCollector
@pytest.fixture
def collector():
return RedditCollector()
def test_collector_name(collector):
assert collector.name == "reddit"
assert collector.poll_interval == 900
async def test_is_available(collector):
assert await collector.is_available() is True
async def test_collect_parses_posts(collector):
mock_posts = [
{
"data": {
"title": "NVDA to the moon! 🚀 AI demand is insane",
"selftext": "Just loaded up on NVDA calls",
"url": "https://reddit.com/r/wallstreetbets/123",
"created_utc": 1711929600,
"score": 500,
"num_comments": 200,
"subreddit": "wallstreetbets",
}
},
]
with patch.object(collector, "_fetch_subreddit", new_callable=AsyncMock, return_value=mock_posts):
items = await collector.collect()
assert len(items) >= 1
assert items[0].source == "reddit"
assert items[0].category.value == "social"
assert isinstance(items[0].sentiment, float)
async def test_collect_filters_low_score(collector):
mock_posts = [
{
"data": {
"title": "Random question about stocks",
"selftext": "",
"url": "https://reddit.com/r/stocks/456",
"created_utc": 1711929600,
"score": 3,
"num_comments": 1,
"subreddit": "stocks",
}
},
]
with patch.object(collector, "_fetch_subreddit", new_callable=AsyncMock, return_value=mock_posts):
items = await collector.collect()
assert items == []
```
- [ ] **Step 2: Run tests to verify they fail**
Run: `pytest services/news-collector/tests/test_reddit.py -v`
Expected: FAIL
- [ ] **Step 3: Implement RedditCollector**
Create `services/news-collector/src/news_collector/collectors/reddit.py`:
```python
"""Reddit collector for r/wallstreetbets, r/stocks, r/investing."""
import logging
import re
from datetime import datetime, timezone
import aiohttp
from nltk.sentiment.vader import SentimentIntensityAnalyzer
from shared.models import NewsCategory, NewsItem
from news_collector.collectors.base import BaseCollector
logger = logging.getLogger(__name__)
SUBREDDITS = ["wallstreetbets", "stocks", "investing"]
MIN_SCORE = 50 # Minimum upvotes to consider
TICKER_PATTERN = re.compile(
r"\b(AAPL|MSFT|GOOGL|GOOG|AMZN|TSLA|NVDA|META|JPM|V|JNJ|WMT|PG|UNH|HD|"
r"MA|DIS|BAC|XOM|PFE|KO|PEP|CSCO|INTC|VZ|NFLX|ADBE|CRM|AMD|QCOM|"
r"GS|BA|CAT|MMM|IBM|GE|F|GM|NKE|MCD|SBUX|SPY|QQQ|IWM)\b"
)
class RedditCollector(BaseCollector):
name = "reddit"
poll_interval = 900 # 15 minutes
def __init__(self) -> None:
self._vader = SentimentIntensityAnalyzer()
async def is_available(self) -> bool:
return True
async def _fetch_subreddit(self, subreddit: str = "wallstreetbets") -> list[dict]:
"""Fetch hot posts from a subreddit via JSON API."""
url = f"https://www.reddit.com/r/{subreddit}/hot.json?limit=25"
headers = {"User-Agent": "TradingPlatform/1.0"}
try:
async with aiohttp.ClientSession() as session:
async with session.get(
url, headers=headers, timeout=aiohttp.ClientTimeout(total=10)
) as resp:
if resp.status != 200:
logger.warning("reddit_fetch_failed subreddit=%s status=%s", subreddit, resp.status)
return []
data = await resp.json()
return data.get("data", {}).get("children", [])
except Exception as exc:
logger.warning("reddit_error subreddit=%s error=%s", subreddit, exc)
return []
async def collect(self) -> list[NewsItem]:
items = []
seen_titles = set()
for subreddit in SUBREDDITS:
posts = await self._fetch_subreddit(subreddit)
for post in posts:
data = post.get("data", {})
title = data.get("title", "").strip()
score = data.get("score", 0)
if not title or title in seen_titles or score < MIN_SCORE:
continue
seen_titles.add(title)
selftext = data.get("selftext", "")
text = f"{title}. {selftext}" if selftext else title
symbols = list(set(TICKER_PATTERN.findall(text)))
items.append(
NewsItem(
source=self.name,
headline=title,
summary=selftext[:500] if selftext else None,
url=data.get("url"),
published_at=datetime.fromtimestamp(
data.get("created_utc", 0), tz=timezone.utc
),
symbols=symbols,
sentiment=self._vader.polarity_scores(text)["compound"],
category=NewsCategory.SOCIAL,
raw_data={
"subreddit": data.get("subreddit", subreddit),
"score": score,
"num_comments": data.get("num_comments", 0),
},
)
)
return items
```
- [ ] **Step 4: Run tests to verify they pass**
Run: `pytest services/news-collector/tests/test_reddit.py -v`
Expected: All 4 tests PASS
- [ ] **Step 5: Commit**
```bash
git add services/news-collector/src/news_collector/collectors/reddit.py services/news-collector/tests/test_reddit.py
git commit -m "feat: implement Reddit social sentiment collector"
```
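One caveat with `TICKER_PATTERN` on social text: it contains one- and two-letter symbols (V, F, GM, MA), and `\b` word boundaries happily match those inside ordinary prose, so short-ticker false positives are likely on Reddit titles. A trimmed demonstration of the hazard:

```python
import re

# Trimmed version of TICKER_PATTERN for the demo; boundary behavior is the same.
pattern = re.compile(r"\b(AAPL|TSLA|V|F|GM|MA)\b")

# Genuine mentions are caught as intended.
assert sorted(set(pattern.findall("TSLA and GM both rallied today"))) == ["GM", "TSLA"]

# False positive: a bare letter F in prose matches the Ford ticker.
assert sorted(set(pattern.findall("Press F to pay respects"))) == ["F"]
```

The MIN_SCORE upvote filter blunts this somewhat, but single-letter tickers may deserve a stricter rule (e.g. requiring a `$` prefix) if noise shows up in the aggregates.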
---
### Task 12: Implement Truth Social and Fed collectors
**Files:**
- Create: `services/news-collector/src/news_collector/collectors/truth_social.py`
- Create: `services/news-collector/src/news_collector/collectors/fed.py`
- Create: `services/news-collector/tests/test_truth_social.py`
- Create: `services/news-collector/tests/test_fed.py`
- [ ] **Step 1: Write tests for Truth Social**
Create `services/news-collector/tests/test_truth_social.py`:
```python
"""Tests for Truth Social collector."""
import pytest
from unittest.mock import AsyncMock, patch
from news_collector.collectors.truth_social import TruthSocialCollector
@pytest.fixture
def collector():
return TruthSocialCollector()
def test_collector_name(collector):
assert collector.name == "truth_social"
assert collector.poll_interval == 900
async def test_is_available(collector):
assert await collector.is_available() is True
async def test_collect_parses_posts(collector):
mock_posts = [
{
"content": "We are imposing 25% tariffs on all steel imports!",
"created_at": "2026-04-02T12:00:00.000Z",
"url": "https://truthsocial.com/@realDonaldTrump/12345",
},
]
with patch.object(collector, "_fetch_posts", new_callable=AsyncMock, return_value=mock_posts):
items = await collector.collect()
assert len(items) == 1
assert items[0].source == "truth_social"
assert items[0].category.value == "policy"
assert "tariff" in items[0].headline.lower() or "tariff" in items[0].raw_data.get("content", "").lower()
async def test_collect_handles_empty(collector):
with patch.object(collector, "_fetch_posts", new_callable=AsyncMock, return_value=[]):
items = await collector.collect()
assert items == []
```
- [ ] **Step 2: Write tests for Fed collector**
Create `services/news-collector/tests/test_fed.py`:
```python
"""Tests for Federal Reserve collector."""
import pytest
from unittest.mock import AsyncMock, patch
from news_collector.collectors.fed import FedCollector
@pytest.fixture
def collector():
return FedCollector()
def test_collector_name(collector):
assert collector.name == "fed"
assert collector.poll_interval == 3600
async def test_is_available(collector):
assert await collector.is_available() is True
async def test_collect_parses_rss(collector):
mock_entries = [
{
"title": "Federal Reserve issues FOMC statement",
"link": "https://www.federalreserve.gov/newsevents/pressreleases/monetary20260402a.htm",
"published_parsed": (2026, 4, 2, 14, 0, 0, 0, 0, 0),
"summary": "The Federal Open Market Committee decided to maintain the target range...",
},
]
with patch.object(collector, "_fetch_fed_rss", new_callable=AsyncMock, return_value=mock_entries):
items = await collector.collect()
assert len(items) == 1
assert items[0].source == "fed"
assert items[0].category.value == "fed"
```
- [ ] **Step 3: Run tests to verify they fail**
Run: `pytest services/news-collector/tests/test_truth_social.py services/news-collector/tests/test_fed.py -v`
Expected: FAIL
- [ ] **Step 4: Implement TruthSocialCollector**
Create `services/news-collector/src/news_collector/collectors/truth_social.py`:
```python
"""Truth Social collector for Trump posts (policy-relevant)."""
import logging
from datetime import datetime, timezone
import aiohttp
from nltk.sentiment.vader import SentimentIntensityAnalyzer
from shared.models import NewsCategory, NewsItem
from news_collector.collectors.base import BaseCollector
logger = logging.getLogger(__name__)
# Truth Social uses a Mastodon-compatible API
TRUTH_SOCIAL_API = "https://truthsocial.com/api/v1/accounts/107780257626128497/statuses"
class TruthSocialCollector(BaseCollector):
name = "truth_social"
poll_interval = 900 # 15 minutes
def __init__(self) -> None:
self._vader = SentimentIntensityAnalyzer()
async def is_available(self) -> bool:
return True
async def _fetch_posts(self) -> list[dict]:
"""Fetch recent posts from Truth Social."""
headers = {"User-Agent": "Mozilla/5.0"}
try:
async with aiohttp.ClientSession() as session:
async with session.get(
TRUTH_SOCIAL_API,
headers=headers,
params={"limit": 10},
timeout=aiohttp.ClientTimeout(total=15),
) as resp:
if resp.status != 200:
logger.warning("truth_social_fetch_failed status=%s", resp.status)
return []
return await resp.json()
except Exception as exc:
logger.warning("truth_social_error error=%s", exc)
return []
def _strip_html(self, text: str) -> str:
"""Remove HTML tags from content."""
import re
return re.sub(r"<[^>]+>", "", text).strip()
async def collect(self) -> list[NewsItem]:
posts = await self._fetch_posts()
items = []
for post in posts:
content = self._strip_html(post.get("content", ""))
if not content:
continue
created_at_str = post.get("created_at", "")
try:
published = datetime.fromisoformat(created_at_str.replace("Z", "+00:00"))
except (ValueError, AttributeError):
published = datetime.now(timezone.utc)
items.append(
NewsItem(
source=self.name,
headline=content[:200],
summary=content if len(content) > 200 else None,
url=post.get("url"),
published_at=published,
symbols=[], # Symbols extracted at aggregation stage via LLM
sentiment=self._vader.polarity_scores(content)["compound"],
category=NewsCategory.POLICY,
raw_data={"content": content, "id": post.get("id")},
)
)
return items
```
- [ ] **Step 5: Implement FedCollector**
Create `services/news-collector/src/news_collector/collectors/fed.py`:
```python
"""Federal Reserve press release and FOMC statement collector."""
import logging
from calendar import timegm
from datetime import datetime, timezone
import aiohttp
import feedparser
from nltk.sentiment.vader import SentimentIntensityAnalyzer
from shared.models import NewsCategory, NewsItem
from news_collector.collectors.base import BaseCollector
logger = logging.getLogger(__name__)
FED_RSS_URL = "https://www.federalreserve.gov/feeds/press_all.xml"
class FedCollector(BaseCollector):
name = "fed"
poll_interval = 3600 # 1 hour
def __init__(self) -> None:
self._vader = SentimentIntensityAnalyzer()
async def is_available(self) -> bool:
return True
async def _fetch_fed_rss(self) -> list[dict]:
"""Fetch Federal Reserve RSS feed entries."""
try:
async with aiohttp.ClientSession() as session:
async with session.get(
FED_RSS_URL, timeout=aiohttp.ClientTimeout(total=10)
) as resp:
if resp.status != 200:
logger.warning("fed_rss_failed status=%s", resp.status)
return []
text = await resp.text()
feed = feedparser.parse(text)
return feed.get("entries", [])
except Exception as exc:
logger.warning("fed_rss_error error=%s", exc)
return []
def _detect_stance(self, text: str) -> str:
"""Detect hawkish/dovish/neutral stance from text."""
text_lower = text.lower()
hawkish_words = ["tighten", "raise", "inflation concern", "restrictive", "higher rates"]
dovish_words = ["accommodate", "cut", "easing", "lower rates", "support growth"]
hawk_count = sum(1 for w in hawkish_words if w in text_lower)
dove_count = sum(1 for w in dovish_words if w in text_lower)
if hawk_count > dove_count:
return "hawkish"
if dove_count > hawk_count:
return "dovish"
return "neutral"
async def collect(self) -> list[NewsItem]:
entries = await self._fetch_fed_rss()
items = []
for entry in entries:
title = entry.get("title", "").strip()
if not title:
continue
summary = entry.get("summary", "")
parsed_time = entry.get("published_parsed")
if parsed_time:
published = datetime.fromtimestamp(timegm(parsed_time), tz=timezone.utc)
else:
published = datetime.now(timezone.utc)
text = f"{title}. {summary}" if summary else title
items.append(
NewsItem(
source=self.name,
headline=title,
summary=summary or None,
url=entry.get("link"),
published_at=published,
symbols=[],
sentiment=self._vader.polarity_scores(text)["compound"],
category=NewsCategory.FED,
raw_data={"stance": self._detect_stance(text)},
)
)
return items
```
- [ ] **Step 6: Run tests to verify they pass**
Run: `pytest services/news-collector/tests/test_truth_social.py services/news-collector/tests/test_fed.py -v`
Expected: All 7 tests PASS
- [ ] **Step 7: Commit**
```bash
git add services/news-collector/src/news_collector/collectors/truth_social.py services/news-collector/src/news_collector/collectors/fed.py services/news-collector/tests/test_truth_social.py services/news-collector/tests/test_fed.py
git commit -m "feat: implement Truth Social and Federal Reserve collectors"
```
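The hawkish/dovish heuristic in `_detect_stance` is a plain keyword tally with ties resolved to neutral; its behavior on representative FOMC phrasing can be checked in isolation (same word lists as the collector):

```python
# Same word lists as FedCollector._detect_stance.
HAWKISH = ["tighten", "raise", "inflation concern", "restrictive", "higher rates"]
DOVISH = ["accommodate", "cut", "easing", "lower rates", "support growth"]

def detect_stance(text: str) -> str:
    t = text.lower()
    hawk = sum(1 for w in HAWKISH if w in t)
    dove = sum(1 for w in DOVISH if w in t)
    if hawk > dove:
        return "hawkish"
    if dove > hawk:
        return "dovish"
    return "neutral"

assert detect_stance("The Committee judges a restrictive stance is warranted") == "hawkish"
assert detect_stance("The Committee will cut rates to support growth") == "dovish"
assert detect_stance("The Committee decided to maintain the target range") == "neutral"
```

The stance lands in `raw_data`, so downstream consumers can weight FED-category items without re-parsing the release text.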
---
### Task 13: Implement news-collector main.py (scheduler)
**Files:**
- Create: `services/news-collector/src/news_collector/main.py`
- Create: `services/news-collector/tests/test_main.py`
- [ ] **Step 1: Write tests**
Create `services/news-collector/tests/test_main.py`:
```python
"""Tests for news collector scheduler."""
import pytest
from unittest.mock import AsyncMock, patch, MagicMock
from datetime import datetime, timezone
from shared.models import NewsCategory, NewsItem
from news_collector.main import run_collector_once
async def test_run_collector_once_stores_and_publishes():
mock_item = NewsItem(
source="test",
headline="Test news",
published_at=datetime(2026, 4, 2, tzinfo=timezone.utc),
sentiment=0.5,
category=NewsCategory.MACRO,
)
mock_collector = MagicMock()
mock_collector.name = "test"
mock_collector.collect = AsyncMock(return_value=[mock_item])
mock_db = MagicMock()
mock_db.insert_news_item = AsyncMock()
mock_broker = MagicMock()
mock_broker.publish = AsyncMock()
count = await run_collector_once(mock_collector, mock_db, mock_broker)
assert count == 1
mock_db.insert_news_item.assert_called_once_with(mock_item)
mock_broker.publish.assert_called_once()
async def test_run_collector_once_handles_empty():
mock_collector = MagicMock()
mock_collector.name = "test"
mock_collector.collect = AsyncMock(return_value=[])
mock_db = MagicMock()
mock_broker = MagicMock()
count = await run_collector_once(mock_collector, mock_db, mock_broker)
assert count == 0
```
- [ ] **Step 2: Run tests to verify they fail**
Run: `pytest services/news-collector/tests/test_main.py -v`
Expected: FAIL
- [ ] **Step 3: Implement main.py**
Create `services/news-collector/src/news_collector/main.py`:
```python
"""News Collector Service — schedules and runs all news collectors."""
import asyncio
import logging
from shared.broker import RedisBroker
from shared.db import Database
from shared.events import NewsEvent
from shared.healthcheck import HealthCheckServer
from shared.logging import setup_logging
from shared.metrics import ServiceMetrics
from shared.models import NewsItem
from shared.notifier import TelegramNotifier
from shared.sentiment_models import MarketSentiment
from news_collector.config import NewsCollectorConfig
from news_collector.collectors.base import BaseCollector
from news_collector.collectors.finnhub import FinnhubCollector
from news_collector.collectors.rss import RSSCollector
from news_collector.collectors.sec_edgar import SecEdgarCollector
from news_collector.collectors.truth_social import TruthSocialCollector
from news_collector.collectors.reddit import RedditCollector
from news_collector.collectors.fear_greed import FearGreedCollector
from news_collector.collectors.fed import FedCollector
logger = logging.getLogger(__name__)
async def run_collector_once(
collector: BaseCollector,
db: Database,
broker: RedisBroker,
) -> int:
"""Run a single collector, store results, publish to Redis.
Returns number of items collected."""
items = await collector.collect()
if not isinstance(items, list):
        # Defensive guard: non-news collectors (e.g. FearGreedCollector) return
        # a result object, not a list; they run in their own dedicated loops
        # and should never reach this scheduler.
return 0
for item in items:
await db.insert_news_item(item)
event = NewsEvent(data=item)
await broker.publish("news", event.to_dict())
return len(items)
async def run_collector_loop(
collector: BaseCollector,
db: Database,
broker: RedisBroker,
log,
) -> None:
"""Run a collector on its poll interval forever."""
while True:
try:
if await collector.is_available():
count = await run_collector_once(collector, db, broker)
log.info("collector_run", collector=collector.name, items=count)
else:
log.debug("collector_unavailable", collector=collector.name)
except Exception as exc:
log.error("collector_error", collector=collector.name, error=str(exc))
await asyncio.sleep(collector.poll_interval)
async def run_fear_greed_loop(
collector: FearGreedCollector,
db: Database,
log,
) -> None:
"""Run the Fear & Greed collector and update market sentiment."""
from datetime import datetime, timezone
while True:
try:
result = await collector.collect()
if result is not None:
ms = MarketSentiment(
fear_greed=result.fear_greed,
fear_greed_label=result.fear_greed_label,
                    fed_stance="neutral",  # placeholder; note this upsert overwrites any stance a Fed-analysis step previously stored
market_regime=_determine_regime(result.fear_greed, None),
updated_at=datetime.now(timezone.utc),
)
await db.upsert_market_sentiment(ms)
log.info("fear_greed_updated", score=result.fear_greed, label=result.fear_greed_label)
except Exception as exc:
log.error("fear_greed_error", error=str(exc))
await asyncio.sleep(collector.poll_interval)
async def run_aggregator_loop(
db: Database,
interval: int,
log,
) -> None:
"""Run sentiment aggregation every `interval` seconds.
Reads recent news from DB, computes per-symbol scores, upserts into symbol_scores table."""
from datetime import datetime, timezone
from shared.sentiment import SentimentAggregator
aggregator = SentimentAggregator()
while True:
try:
now = datetime.now(timezone.utc)
news_items = await db.get_recent_news(hours=24)
if news_items:
scores = aggregator.aggregate(news_items, now)
for symbol_score in scores.values():
await db.upsert_symbol_score(symbol_score)
log.info("aggregation_complete", symbols=len(scores))
except Exception as exc:
log.error("aggregation_error", error=str(exc))
await asyncio.sleep(interval)
def _determine_regime(fear_greed: int, vix: float | None) -> str:
"""Determine market regime from Fear & Greed and VIX."""
if fear_greed <= 20:
return "risk_off"
if vix is not None and vix > 30:
return "risk_off"
if fear_greed >= 60 and (vix is None or vix < 20):
return "risk_on"
return "neutral"
async def run() -> None:
config = NewsCollectorConfig()
log = setup_logging("news-collector", config.log_level, config.log_format)
metrics = ServiceMetrics("news_collector")
notifier = TelegramNotifier(
bot_token=config.telegram_bot_token,
chat_id=config.telegram_chat_id,
)
db = Database(config.database_url)
await db.connect()
broker = RedisBroker(config.redis_url)
health = HealthCheckServer(
"news-collector",
port=config.health_port,
auth_token=config.metrics_auth_token,
)
health.register_check("redis", broker.ping)
await health.start()
metrics.service_up.labels(service="news-collector").set(1)
# Initialize collectors
news_collectors: list[BaseCollector] = [
RSSCollector(),
SecEdgarCollector(),
TruthSocialCollector(),
RedditCollector(),
FedCollector(),
]
# Finnhub requires API key
if config.finnhub_api_key:
news_collectors.append(FinnhubCollector(api_key=config.finnhub_api_key))
fear_greed = FearGreedCollector()
log.info(
"starting",
collectors=[c.name for c in news_collectors],
fear_greed=True,
)
tasks = []
try:
for collector in news_collectors:
task = asyncio.create_task(run_collector_loop(collector, db, broker, log))
tasks.append(task)
tasks.append(asyncio.create_task(run_fear_greed_loop(fear_greed, db, log)))
tasks.append(asyncio.create_task(
run_aggregator_loop(db, config.sentiment_aggregate_interval, log)
))
await asyncio.gather(*tasks)
except Exception as exc:
log.error("fatal_error", error=str(exc))
await notifier.send_error(str(exc), "news-collector")
raise
finally:
for task in tasks:
task.cancel()
metrics.service_up.labels(service="news-collector").set(0)
await notifier.close()
await broker.close()
await db.close()
def main() -> None:
asyncio.run(run())
if __name__ == "__main__":
main()
```
- [ ] **Step 4: Run tests to verify they pass**
Run: `pytest services/news-collector/tests/test_main.py -v`
Expected: All 2 tests PASS
- [ ] **Step 5: Commit**
```bash
git add services/news-collector/src/news_collector/main.py services/news-collector/tests/test_main.py
git commit -m "feat: implement news-collector main scheduler with all collectors"
```
---
## Phase 3: Sentiment Analysis Pipeline
### Task 14: Implement sentiment aggregator
**Files:**
- Modify: `shared/src/shared/sentiment.py`
- Create: `shared/tests/test_sentiment_aggregator.py`
- [ ] **Step 1: Write tests**
Create `shared/tests/test_sentiment_aggregator.py`:
```python
"""Tests for sentiment aggregator."""
import pytest
from datetime import datetime, timezone, timedelta
from shared.sentiment import SentimentAggregator
@pytest.fixture
def aggregator():
return SentimentAggregator()
def test_freshness_decay_recent():
a = SentimentAggregator()
now = datetime.now(timezone.utc)
assert a._freshness_decay(now, now) == 1.0
def test_freshness_decay_3_hours():
a = SentimentAggregator()
now = datetime.now(timezone.utc)
three_hours_ago = now - timedelta(hours=3)
assert a._freshness_decay(three_hours_ago, now) == 0.7
def test_freshness_decay_12_hours():
a = SentimentAggregator()
now = datetime.now(timezone.utc)
twelve_hours_ago = now - timedelta(hours=12)
assert a._freshness_decay(twelve_hours_ago, now) == 0.3
def test_freshness_decay_old():
a = SentimentAggregator()
now = datetime.now(timezone.utc)
two_days_ago = now - timedelta(days=2)
assert a._freshness_decay(two_days_ago, now) == 0.0
def test_compute_composite():
a = SentimentAggregator()
composite = a._compute_composite(
news_score=0.5,
social_score=0.3,
policy_score=0.8,
filing_score=0.2,
)
expected = 0.5 * 0.3 + 0.3 * 0.2 + 0.8 * 0.3 + 0.2 * 0.2
assert abs(composite - expected) < 0.001
def test_aggregate_news_by_symbol(aggregator):
now = datetime.now(timezone.utc)
news_items = [
{
"symbols": ["AAPL"],
"sentiment": 0.8,
"category": "earnings",
"published_at": now,
},
{
"symbols": ["AAPL"],
"sentiment": 0.3,
"category": "macro",
"published_at": now - timedelta(hours=2),
},
{
"symbols": ["MSFT"],
"sentiment": -0.5,
"category": "policy",
"published_at": now,
},
]
scores = aggregator.aggregate(news_items, now)
assert "AAPL" in scores
assert "MSFT" in scores
assert scores["AAPL"].news_count == 2
assert scores["AAPL"].news_score > 0 # Positive overall
assert scores["MSFT"].policy_score < 0 # Negative policy
def test_aggregate_empty(aggregator):
now = datetime.now(timezone.utc)
scores = aggregator.aggregate([], now)
assert scores == {}
def test_determine_regime():
a = SentimentAggregator()
assert a.determine_regime(15, None) == "risk_off"
assert a.determine_regime(15, 35.0) == "risk_off"
assert a.determine_regime(50, 35.0) == "risk_off"
assert a.determine_regime(70, 15.0) == "risk_on"
assert a.determine_regime(50, 20.0) == "neutral"
```
- [ ] **Step 2: Run tests to verify they fail**
Run: `pytest shared/tests/test_sentiment_aggregator.py -v`
Expected: FAIL — `SentimentAggregator` not in sentiment.py
- [ ] **Step 3: Add SentimentAggregator to sentiment.py**
Keep the existing `SentimentData` class (for backward compat with existing tests). Add `SentimentAggregator` class at the end of `shared/src/shared/sentiment.py`:
```python
from datetime import datetime, timedelta
from shared.sentiment_models import SymbolScore
class SentimentAggregator:
"""Aggregates per-news sentiment into per-symbol scores."""
# Weights: policy events are most impactful for US stocks
WEIGHTS = {
"news": 0.3,
"social": 0.2,
"policy": 0.3,
"filing": 0.2,
}
# Category → score field mapping
CATEGORY_MAP = {
"earnings": "news",
"macro": "news",
"social": "social",
"policy": "policy",
"filing": "filing",
"fed": "policy",
}
def _freshness_decay(self, published_at: datetime, now: datetime) -> float:
"""Compute freshness decay factor."""
age = now - published_at
hours = age.total_seconds() / 3600
if hours < 1:
return 1.0
if hours < 6:
return 0.7
if hours < 24:
return 0.3
return 0.0
def _compute_composite(
self,
news_score: float,
social_score: float,
policy_score: float,
filing_score: float,
) -> float:
return (
news_score * self.WEIGHTS["news"]
+ social_score * self.WEIGHTS["social"]
+ policy_score * self.WEIGHTS["policy"]
+ filing_score * self.WEIGHTS["filing"]
)
def aggregate(
self, news_items: list[dict], now: datetime
) -> dict[str, SymbolScore]:
"""Aggregate news items into per-symbol scores.
Each news_items dict must have: symbols, sentiment, category, published_at.
Returns dict mapping symbol → SymbolScore.
"""
# Accumulate per-symbol, per-category
symbol_data: dict[str, dict] = {}
for item in news_items:
decay = self._freshness_decay(item["published_at"], now)
if decay == 0.0:
continue
category = item.get("category", "macro")
score_field = self.CATEGORY_MAP.get(category, "news")
weighted_sentiment = item["sentiment"] * decay
for symbol in item.get("symbols", []):
if symbol not in symbol_data:
symbol_data[symbol] = {
"news_scores": [],
"social_scores": [],
"policy_scores": [],
"filing_scores": [],
"count": 0,
}
symbol_data[symbol][f"{score_field}_scores"].append(weighted_sentiment)
symbol_data[symbol]["count"] += 1
# Compute averages and composites
result = {}
for symbol, data in symbol_data.items():
news_score = _safe_avg(data["news_scores"])
social_score = _safe_avg(data["social_scores"])
policy_score = _safe_avg(data["policy_scores"])
filing_score = _safe_avg(data["filing_scores"])
result[symbol] = SymbolScore(
symbol=symbol,
news_score=news_score,
news_count=data["count"],
social_score=social_score,
policy_score=policy_score,
filing_score=filing_score,
composite=self._compute_composite(
news_score, social_score, policy_score, filing_score
),
updated_at=now,
)
return result
def determine_regime(self, fear_greed: int, vix: float | None) -> str:
"""Determine market regime."""
if fear_greed <= 20:
return "risk_off"
if vix is not None and vix > 30:
return "risk_off"
if fear_greed >= 60 and (vix is None or vix < 20):
return "risk_on"
return "neutral"
def _safe_avg(values: list[float]) -> float:
"""Return average of values, or 0.0 if empty."""
if not values:
return 0.0
return sum(values) / len(values)
```
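As a quick sanity check of the scoring math, the decay buckets and category weights compose as in this standalone sketch (it mirrors `_freshness_decay` and `WEIGHTS` above, with the same two AAPL items used in `test_aggregate_news_by_symbol`):

```python
# Standalone recomputation of the aggregator's math; this mirrors the
# decay buckets and WEIGHTS defined above, it is not the class itself.
DECAY_BUCKETS = [(1, 1.0), (6, 0.7), (24, 0.3)]  # (age < hours, factor)
WEIGHTS = {"news": 0.3, "social": 0.2, "policy": 0.3, "filing": 0.2}

def decay_for(age_hours: float) -> float:
    for limit, factor in DECAY_BUCKETS:
        if age_hours < limit:
            return factor
    return 0.0  # items older than 24h contribute nothing

# Two AAPL items, both mapped to the "news" bucket: sentiment 0.8
# published just now (decay 1.0), 0.3 published 2 hours ago (decay 0.7).
news_avg = (0.8 * decay_for(0) + 0.3 * decay_for(2)) / 2   # 0.505
composite = news_avg * WEIGHTS["news"]  # empty buckets average to 0.0
```

With only the news bucket populated, the composite is simply the decayed news average times 0.3, which is what `test_aggregate_news_by_symbol` implicitly exercises.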
- [ ] **Step 4: Run new tests to verify they pass**
Run: `pytest shared/tests/test_sentiment_aggregator.py -v`
Expected: All 8 tests PASS
- [ ] **Step 5: Run existing sentiment tests for regressions**
Run: `pytest shared/tests/test_sentiment.py -v`
Expected: All existing tests PASS (SentimentData unchanged)
- [ ] **Step 6: Commit**
```bash
git add shared/src/shared/sentiment.py shared/tests/test_sentiment_aggregator.py
git commit -m "feat: implement SentimentAggregator with freshness decay and composite scoring"
```
---
## Phase 4: Stock Selector Engine
### Task 15: Implement stock selector
**Files:**
- Create: `services/strategy-engine/src/strategy_engine/stock_selector.py`
- Create: `services/strategy-engine/tests/test_stock_selector.py`
- [ ] **Step 1: Write tests**
Create `services/strategy-engine/tests/test_stock_selector.py`:
```python
"""Tests for stock selector engine."""
import pytest
from unittest.mock import AsyncMock, MagicMock, patch
from datetime import datetime, timezone
from decimal import Decimal
from shared.models import OrderSide
from shared.sentiment_models import SymbolScore, MarketSentiment, SelectedStock, Candidate
from strategy_engine.stock_selector import (
SentimentCandidateSource,
StockSelector,
_parse_llm_selections,
)
async def test_sentiment_candidate_source():
mock_db = MagicMock()
mock_db.get_top_symbol_scores = AsyncMock(return_value=[
{"symbol": "AAPL", "composite": 0.8, "news_count": 5},
{"symbol": "NVDA", "composite": 0.6, "news_count": 3},
])
source = SentimentCandidateSource(mock_db)
candidates = await source.get_candidates()
assert len(candidates) == 2
assert candidates[0].symbol == "AAPL"
assert candidates[0].source == "sentiment"
def test_parse_llm_selections_valid():
llm_response = """
[
{"symbol": "NVDA", "side": "BUY", "conviction": 0.85, "reason": "AI demand", "key_news": ["NVDA beats earnings"]},
{"symbol": "XOM", "side": "BUY", "conviction": 0.72, "reason": "Oil surge", "key_news": ["Oil prices up"]}
]
"""
selections = _parse_llm_selections(llm_response)
assert len(selections) == 2
assert selections[0].symbol == "NVDA"
assert selections[0].conviction == 0.85
def test_parse_llm_selections_invalid():
selections = _parse_llm_selections("not json")
assert selections == []
def test_parse_llm_selections_with_markdown():
    # Build the fenced response programmatically so the literal backticks
    # cannot terminate this plan's own code fence.
    fence = "`" * 3
    llm_response = (
        "Here are my picks:\n"
        f"{fence}json\n"
        '[{"symbol": "TSLA", "side": "BUY", "conviction": 0.7, '
        '"reason": "Momentum", "key_news": ["Tesla rally"]}]\n'
        f"{fence}\n"
    )
    selections = _parse_llm_selections(llm_response)
    assert len(selections) == 1
    assert selections[0].symbol == "TSLA"
async def test_selector_blocks_on_risk_off():
mock_db = MagicMock()
mock_db.get_latest_market_sentiment = AsyncMock(return_value={
"fear_greed": 15,
"fear_greed_label": "Extreme Fear",
"vix": 35.0,
"fed_stance": "neutral",
"market_regime": "risk_off",
"updated_at": datetime.now(timezone.utc),
})
selector = StockSelector(db=mock_db, broker=MagicMock(), alpaca=MagicMock(), anthropic_api_key="test")
result = await selector.select()
assert result == []
```
- [ ] **Step 2: Run tests to verify they fail**
Run: `pytest services/strategy-engine/tests/test_stock_selector.py -v`
Expected: FAIL — module not found
- [ ] **Step 3: Implement StockSelector**
Create `services/strategy-engine/src/strategy_engine/stock_selector.py`:
```python
"""Stock Selector Engine — 3-stage dynamic stock selection for MOC trading."""
import json
import re
from typing import Optional

import aiohttp
import structlog

from shared.alpaca import AlpacaClient
from shared.broker import RedisBroker
from shared.db import Database
from shared.models import OrderSide
from shared.sentiment_models import Candidate, SelectedStock

# Use structlog so the keyword-style calls below, e.g.
# logger.warning("llm_candidate_failed", status=...), are valid;
# the stdlib logging module raises TypeError on those keyword arguments.
logger = structlog.get_logger(__name__)
class SentimentCandidateSource:
"""Get candidate stocks from sentiment scores in DB."""
def __init__(self, db: Database, limit: int = 20) -> None:
self._db = db
self._limit = limit
async def get_candidates(self) -> list[Candidate]:
scores = await self._db.get_top_symbol_scores(limit=self._limit)
return [
Candidate(
symbol=s["symbol"],
source="sentiment",
score=s["composite"],
reason=f"Sentiment composite={s['composite']:.2f}, news_count={s['news_count']}",
)
for s in scores
if s["composite"] != 0
]
class LLMCandidateSource:
"""Get candidate stocks by asking Claude to analyze today's top news."""
def __init__(self, db: Database, api_key: str, model: str) -> None:
self._db = db
self._api_key = api_key
self._model = model
async def get_candidates(self) -> list[Candidate]:
news = await self._db.get_recent_news(hours=24)
if not news:
return []
headlines = [f"- [{n['source']}] {n['headline']} (sentiment: {n['sentiment']:.2f})" for n in news[:50]]
prompt = (
"You are a stock market analyst. Based on today's news headlines below, "
"identify US stocks that are most likely to be affected (positively or negatively). "
"Return a JSON array of objects with: symbol, direction (BUY or SELL), score (0-1), reason.\n\n"
"Headlines:\n" + "\n".join(headlines) + "\n\n"
"Return ONLY the JSON array, no other text."
)
try:
async with aiohttp.ClientSession() as session:
async with session.post(
"https://api.anthropic.com/v1/messages",
headers={
"x-api-key": self._api_key,
"anthropic-version": "2023-06-01",
"content-type": "application/json",
},
json={
"model": self._model,
"max_tokens": 1024,
"messages": [{"role": "user", "content": prompt}],
},
timeout=aiohttp.ClientTimeout(total=30),
) as resp:
if resp.status != 200:
logger.warning("llm_candidate_failed", status=resp.status)
return []
data = await resp.json()
text = data["content"][0]["text"]
except Exception as exc:
logger.error("llm_candidate_error", error=str(exc))
return []
return self._parse_response(text)
def _parse_response(self, text: str) -> list[Candidate]:
try:
# Extract JSON from possible markdown code blocks
json_match = re.search(r"```(?:json)?\s*(\[.*?\])\s*```", text, re.DOTALL)
if json_match:
text = json_match.group(1)
items = json.loads(text)
except (json.JSONDecodeError, TypeError):
return []
candidates = []
for item in items:
try:
direction = OrderSide(item.get("direction", "BUY"))
candidates.append(
Candidate(
symbol=item["symbol"],
source="llm",
direction=direction,
score=float(item.get("score", 0.5)),
reason=item.get("reason", "LLM recommendation"),
)
)
except (KeyError, ValueError):
continue
return candidates
class StockSelector:
"""3-stage stock selector: candidates → technical filter → LLM final pick."""
def __init__(
self,
db: Database,
broker: RedisBroker,
alpaca: AlpacaClient,
anthropic_api_key: str,
anthropic_model: str = "claude-sonnet-4-20250514",
max_picks: int = 3,
) -> None:
self._db = db
self._broker = broker
self._alpaca = alpaca
self._api_key = anthropic_api_key
self._model = anthropic_model
self._max_picks = max_picks
self._sentiment_source = SentimentCandidateSource(db)
self._llm_source = LLMCandidateSource(db, anthropic_api_key, anthropic_model)
async def select(self) -> list[SelectedStock]:
"""Run full 3-stage selection. Returns list of SelectedStock."""
# Check market sentiment gate
ms = await self._db.get_latest_market_sentiment()
if ms and ms.get("market_regime") == "risk_off":
logger.info("selection_blocked_risk_off")
return []
# Stage 1: Candidate pool
sentiment_candidates = await self._sentiment_source.get_candidates()
llm_candidates = await self._llm_source.get_candidates()
candidates = self._merge_candidates(sentiment_candidates, llm_candidates)
if not candidates:
logger.info("no_candidates_found")
return []
logger.info("candidates_found", count=len(candidates))
# Stage 2: Technical filter
filtered = await self._technical_filter(candidates)
if not filtered:
logger.info("all_candidates_filtered_out")
return []
logger.info("technical_filter_passed", count=len(filtered))
# Stage 3: LLM final selection
selections = await self._llm_final_select(filtered, ms)
# Publish to Redis
for selection in selections:
await self._broker.publish(
"selected_stocks",
selection.model_dump(mode="json"),
)
        # Persist audit trail (fetch the score table once, not once per pick)
        from datetime import date as date_type
        score_data = await self._db.get_top_symbol_scores(limit=100)
        for selection in selections:
            snapshot = next(
                (s for s in score_data if s["symbol"] == selection.symbol),
                {},
            )
            await self._db.insert_stock_selection(
                trade_date=date_type.today(),
                symbol=selection.symbol,
                side=selection.side.value,
                conviction=selection.conviction,
                reason=selection.reason,
                key_news=selection.key_news,
                sentiment_snapshot=snapshot,
            )
return selections
def _merge_candidates(
self,
sentiment: list[Candidate],
llm: list[Candidate],
) -> list[Candidate]:
"""Merge and deduplicate candidates, preferring higher scores."""
by_symbol: dict[str, Candidate] = {}
for c in sentiment + llm:
if c.symbol not in by_symbol or c.score > by_symbol[c.symbol].score:
by_symbol[c.symbol] = c
return sorted(by_symbol.values(), key=lambda c: c.score, reverse=True)
async def _technical_filter(self, candidates: list[Candidate]) -> list[Candidate]:
"""Apply MOC-style technical screening to candidates."""
import pandas as pd
passed = []
for candidate in candidates:
try:
bars = await self._alpaca.get_bars(
candidate.symbol, timeframe="1Day", limit=30
)
if not bars or len(bars) < 21:
continue
closes = pd.Series([float(b["c"]) for b in bars])
volumes = pd.Series([float(b["v"]) for b in bars])
# RSI
delta = closes.diff()
gain = delta.clip(lower=0)
loss = -delta.clip(upper=0)
avg_gain = gain.ewm(com=13, min_periods=14).mean()
avg_loss = loss.ewm(com=13, min_periods=14).mean()
rs = avg_gain / avg_loss.replace(0, float("nan"))
rsi = 100 - (100 / (1 + rs))
current_rsi = rsi.iloc[-1]
if pd.isna(current_rsi) or not (30 <= current_rsi <= 70):
continue
# EMA
ema20 = closes.ewm(span=20, adjust=False).mean().iloc[-1]
if closes.iloc[-1] < ema20:
continue
# Volume above average
vol_avg = volumes.iloc[-20:].mean()
if vol_avg > 0 and volumes.iloc[-1] < vol_avg * 0.5:
continue
passed.append(candidate)
except Exception as exc:
logger.warning("technical_filter_error", symbol=candidate.symbol, error=str(exc))
continue
return passed
async def _llm_final_select(
self,
candidates: list[Candidate],
market_sentiment: Optional[dict],
) -> list[SelectedStock]:
"""Ask Claude to make final 2-3 picks from filtered candidates."""
# Build context
candidate_info = []
for c in candidates[:15]:
candidate_info.append(f"- {c.symbol}: score={c.score:.2f}, source={c.source}, reason={c.reason}")
news = await self._db.get_recent_news(hours=12)
top_news = [f"- [{n['source']}] {n['headline']}" for n in news[:20]]
ms_info = "No market sentiment data available."
if market_sentiment:
ms_info = (
f"Fear & Greed: {market_sentiment.get('fear_greed', 'N/A')} "
f"({market_sentiment.get('fear_greed_label', 'N/A')}), "
f"VIX: {market_sentiment.get('vix', 'N/A')}, "
f"Fed Stance: {market_sentiment.get('fed_stance', 'N/A')}"
)
prompt = (
f"You are a professional stock trader selecting {self._max_picks} stocks for "
f"Market-on-Close (MOC) overnight trading. You buy at market close and sell at "
f"next day's open.\n\n"
f"## Market Conditions\n{ms_info}\n\n"
f"## Candidate Stocks (pre-screened technically)\n"
+ "\n".join(candidate_info) + "\n\n"
f"## Today's Key News\n"
+ "\n".join(top_news) + "\n\n"
f"Select the best {self._max_picks} stocks. For each, provide:\n"
f"- symbol: ticker\n"
f"- side: BUY or SELL\n"
f"- conviction: 0.0-1.0\n"
f"- reason: one sentence\n"
f"- key_news: list of relevant headlines\n\n"
f"Return ONLY a JSON array. No other text."
)
try:
async with aiohttp.ClientSession() as session:
async with session.post(
"https://api.anthropic.com/v1/messages",
headers={
"x-api-key": self._api_key,
"anthropic-version": "2023-06-01",
"content-type": "application/json",
},
json={
"model": self._model,
"max_tokens": 1024,
"messages": [{"role": "user", "content": prompt}],
},
timeout=aiohttp.ClientTimeout(total=30),
) as resp:
if resp.status != 200:
logger.error("llm_final_select_failed", status=resp.status)
return []
data = await resp.json()
text = data["content"][0]["text"]
except Exception as exc:
logger.error("llm_final_select_error", error=str(exc))
return []
return _parse_llm_selections(text)
def _parse_llm_selections(text: str) -> list[SelectedStock]:
"""Parse LLM response into SelectedStock list."""
try:
json_match = re.search(r"```(?:json)?\s*(\[.*?\])\s*```", text, re.DOTALL)
if json_match:
text = json_match.group(1)
# Also try to find a bare JSON array
array_match = re.search(r"\[.*\]", text, re.DOTALL)
if array_match:
text = array_match.group(0)
items = json.loads(text)
except (json.JSONDecodeError, TypeError):
return []
selections = []
for item in items:
try:
selections.append(
SelectedStock(
symbol=item["symbol"],
side=OrderSide(item.get("side", "BUY")),
conviction=float(item.get("conviction", 0.5)),
reason=item.get("reason", ""),
key_news=item.get("key_news", []),
)
)
except (KeyError, ValueError):
continue
return selections
```
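The fence-stripping in `_parse_llm_selections` can be exercised in isolation; this sketch reproduces the same two-regex extraction outside the module (the helper name `extract_json_array` is illustrative, not part of the plan's code):

```python
import json
import re

def extract_json_array(text: str) -> list:
    """Mirror of the fence-stripping used by _parse_llm_selections."""
    fenced = re.search(r"```(?:json)?\s*(\[.*?\])\s*```", text, re.DOTALL)
    if fenced:
        text = fenced.group(1)
    bare = re.search(r"\[.*\]", text, re.DOTALL)
    if bare:
        text = bare.group(0)
    return json.loads(text)

# Build a fenced reply without putting literal backticks at line start.
fence = "`" * 3
reply = f"Here are my picks:\n{fence}json\n[1, 2, 3]\n{fence}\n"
```

The bare-array fallback means the parser tolerates both fenced and unfenced replies, and surrounding prose, as long as exactly one JSON array is present.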
- [ ] **Step 4: Run tests to verify they pass**
Run: `pytest services/strategy-engine/tests/test_stock_selector.py -v`
Expected: All 5 tests PASS
- [ ] **Step 5: Run all strategy engine tests for regressions**
Run: `pytest services/strategy-engine/tests/ -v`
Expected: All tests PASS
- [ ] **Step 6: Commit**
```bash
git add services/strategy-engine/src/strategy_engine/stock_selector.py services/strategy-engine/tests/test_stock_selector.py
git commit -m "feat: implement 3-stage stock selector (sentiment → technical → LLM)"
```
---
## Phase 5: Integration (MOC + Notifications + Docker)
### Task 16: Add Telegram notification for stock selections
**Files:**
- Modify: `shared/src/shared/notifier.py`
- Modify: `shared/tests/test_notifier.py`
- [ ] **Step 1: Add send_stock_selection method to notifier.py**
Add this method and import to `shared/src/shared/notifier.py`:
Add to imports:
```python
from shared.sentiment_models import SelectedStock, MarketSentiment
```
Add method to `TelegramNotifier` class:
```python
async def send_stock_selection(
self,
selections: list[SelectedStock],
market: MarketSentiment | None = None,
) -> None:
"""Format and send stock selection notification."""
lines = [f"📊 Stock Selection ({len(selections)} picks)", ""]
side_emoji = {"BUY": "🟢", "SELL": "🔴"}
for i, s in enumerate(selections, 1):
emoji = side_emoji.get(s.side.value, "⚪")
lines.append(
f"{i}. {s.symbol} {emoji} {s.side.value} "
f"(conviction: {s.conviction:.0%})"
)
lines.append(f" {s.reason}")
if s.key_news:
lines.append(f" News: {s.key_news[0]}")
lines.append("")
if market:
lines.append(
f"Market: F&G {market.fear_greed} ({market.fear_greed_label})"
+ (f" | VIX {market.vix:.1f}" if market.vix else "")
)
await self.send("\n".join(lines))
```
- [ ] **Step 2: Add test for the new method**
Add to `shared/tests/test_notifier.py`:
```python
from shared.models import OrderSide
from shared.sentiment_models import SelectedStock, MarketSentiment
from datetime import datetime, timezone
async def test_send_stock_selection(notifier, mock_session):
"""Test stock selection notification formatting."""
selections = [
SelectedStock(
symbol="NVDA",
side=OrderSide.BUY,
conviction=0.85,
reason="CHIPS Act expansion",
key_news=["Trump signs CHIPS Act"],
),
]
market = MarketSentiment(
fear_greed=55,
fear_greed_label="Neutral",
vix=18.2,
fed_stance="neutral",
market_regime="neutral",
updated_at=datetime.now(timezone.utc),
)
await notifier.send_stock_selection(selections, market)
mock_session.post.assert_called_once()
```
Note: Check `shared/tests/test_notifier.py` for existing fixture names (`notifier`, `mock_session`) and adapt accordingly.
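If those fixtures do not exist yet, a minimal sketch of the usual shape follows. The helper name `make_mock_session` and the response structure are hypothetical; check the real conftest before relying on them:

```python
# Hypothetical stand-in for a `mock_session` fixture: an aiohttp-style
# session whose post() returns an async context manager.
import asyncio
from unittest.mock import MagicMock

def make_mock_session(status: int = 200) -> MagicMock:
    response = MagicMock()
    response.status = status
    # MagicMock pre-configures __aenter__/__aexit__ as AsyncMocks (3.8+),
    # so `async with session.post(...) as resp` yields `response`.
    response.__aenter__.return_value = response
    response.__aexit__.return_value = False  # do not swallow exceptions
    session = MagicMock()
    session.post.return_value = response
    return session

async def demo() -> int:
    session = make_mock_session()
    async with session.post("https://example.invalid/sendMessage") as resp:
        return resp.status
```

A pytest fixture would then simply `return make_mock_session()` and patch it into the notifier's session attribute.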
- [ ] **Step 3: Run notifier tests**
Run: `pytest shared/tests/test_notifier.py -v`
Expected: All tests PASS
- [ ] **Step 4: Commit**
```bash
git add shared/src/shared/notifier.py shared/tests/test_notifier.py
git commit -m "feat: add Telegram notification for stock selections"
```
---
### Task 17: Integrate stock selector with MOC strategy
**Files:**
- Modify: `services/strategy-engine/src/strategy_engine/main.py`
- Modify: `services/strategy-engine/src/strategy_engine/config.py`
- [ ] **Step 1: Update strategy engine config**
Add to `StrategyConfig` in `services/strategy-engine/src/strategy_engine/config.py`:
```python
selector_candidates_time: str = "15:00"
selector_filter_time: str = "15:15"
selector_final_time: str = "15:30"
selector_max_picks: int = 3
anthropic_api_key: str = ""
anthropic_model: str = "claude-sonnet-4-20250514"
```
- [ ] **Step 2: Add stock selector scheduling to main.py**
Add a new coroutine to `services/strategy-engine/src/strategy_engine/main.py` that runs the stock selector at the configured times. Add imports:
```python
from datetime import datetime  # used by the selector loop; skip if main.py already imports it
from shared.alpaca import AlpacaClient
from shared.db import Database
from shared.notifier import TelegramNotifier
from shared.sentiment_models import MarketSentiment
from strategy_engine.stock_selector import StockSelector
```
Add the selector loop function:
```python
async def run_stock_selector(
selector: StockSelector,
notifier: TelegramNotifier,
db: Database,
config: StrategyConfig,
log,
) -> None:
"""Run the stock selector once per day at the configured time."""
import zoneinfo
et = zoneinfo.ZoneInfo("America/New_York")
while True:
now_et = datetime.now(et)
target_hour, target_min = map(int, config.selector_final_time.split(":"))
# Check if it's time to run (within 1-minute window)
if now_et.hour == target_hour and now_et.minute == target_min:
log.info("stock_selector_running")
try:
selections = await selector.select()
if selections:
ms_data = await db.get_latest_market_sentiment()
ms = None
if ms_data:
ms = MarketSentiment(**ms_data)
await notifier.send_stock_selection(selections, ms)
log.info(
"stock_selector_complete",
picks=[s.symbol for s in selections],
)
else:
log.info("stock_selector_no_picks")
except Exception as exc:
log.error("stock_selector_error", error=str(exc))
# Sleep past this minute to avoid re-triggering
await asyncio.sleep(120)
else:
await asyncio.sleep(30)
```
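The once-per-day trigger inside that loop reduces to a small pure comparison that can be unit-tested in isolation (the helper name `is_trigger_minute` is illustrative, not part of the plan's code):

```python
from datetime import datetime

def is_trigger_minute(now: datetime, hhmm: str) -> bool:
    """True during the one-minute window named by an "HH:MM" string."""
    hour, minute = map(int, hhmm.split(":"))
    return now.hour == hour and now.minute == minute
```

With a 30-second poll the window is checked at least once, and the 120-second sleep after a run guarantees the selector cannot fire twice in the same minute.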
In the `run()` function, add after creating the broker:
```python
db = Database(config.database_url)
await db.connect()
alpaca = AlpacaClient(
api_key=config.alpaca_api_key,
api_secret=config.alpaca_api_secret,
paper=config.alpaca_paper,
)
```
And add the selector if anthropic key is configured:
```python
if config.anthropic_api_key:
selector = StockSelector(
db=db,
broker=broker,
alpaca=alpaca,
anthropic_api_key=config.anthropic_api_key,
anthropic_model=config.anthropic_model,
max_picks=config.selector_max_picks,
)
tasks.append(asyncio.create_task(
run_stock_selector(selector, notifier, db, config, log)
))
log.info("stock_selector_enabled", time=config.selector_final_time)
```
Add to the `finally` block:
```python
await alpaca.close()
await db.close()
```
- [ ] **Step 3: Run strategy engine tests for regressions**
Run: `pytest services/strategy-engine/tests/ -v`
Expected: All tests PASS
- [ ] **Step 4: Commit**
```bash
git add services/strategy-engine/src/strategy_engine/main.py services/strategy-engine/src/strategy_engine/config.py
git commit -m "feat: integrate stock selector into strategy engine scheduler"
```
---
### Task 18: Update Docker Compose and .env
**Files:**
- Modify: `docker-compose.yml`
- Modify: `.env.example` (already done in Task 5, just verify)
- [ ] **Step 1: Add news-collector service to docker-compose.yml**
Add before the `loki:` service block in `docker-compose.yml`:
```yaml
news-collector:
build:
context: .
dockerfile: services/news-collector/Dockerfile
env_file: .env
ports:
- "8084:8084"
depends_on:
redis:
condition: service_healthy
postgres:
condition: service_healthy
healthcheck:
test: ["CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:8084/health')"]
interval: 10s
timeout: 5s
retries: 3
restart: unless-stopped
```
- [ ] **Step 2: Verify compose file is valid**
Run: `docker compose config --quiet 2>&1 || echo "INVALID"`
Expected: No output (valid) or compose config displayed without errors
- [ ] **Step 3: Commit**
```bash
git add docker-compose.yml
git commit -m "feat: add news-collector service to Docker Compose"
```
---
### Task 19: Run full test suite and lint
- [ ] **Step 1: Install test dependencies**
Run: `pip install -e shared/ && pip install aiosqlite feedparser nltk aioresponses`
- [ ] **Step 2: Download VADER lexicon**
Run: `python -c "import nltk; nltk.download('vader_lexicon', quiet=True)"`
- [ ] **Step 3: Run lint**
Run: `make lint`
Expected: No lint errors. If there are errors, fix them.
- [ ] **Step 4: Run full test suite**
Run: `make test`
Expected: All tests PASS
- [ ] **Step 5: Fix any issues found in steps 3-4**
If lint or tests fail, fix the issues and re-run.
- [ ] **Step 6: Final commit if any fixes were needed**
```bash
git add -A
git commit -m "fix: resolve lint and test issues from news selector integration"
```