 .env.example                                           |   4 --
 CLAUDE.md                                              | 106 ++++++
 services/strategy-engine/src/strategy_engine/config.py |   2 --
 shared/src/shared/config.py                            |   4 --
 shared/src/shared/resilience.py                        | 106 +------
 shared/src/shared/sa_models.py                         |  13 --
 shared/src/shared/sentiment.py                         |  36 +--
 shared/tests/test_resilience.py                        | 139 -------
 shared/tests/test_sa_models.py                         |  47 ---
 shared/tests/test_sentiment.py                         |  44 ---
 10 files changed, 109 insertions(+), 392 deletions(-)
diff --git a/.env.example b/.env.example
index dcaf9a8..bdc6a67 100644
--- a/.env.example
+++ b/.env.example
@@ -19,8 +19,6 @@ TELEGRAM_CHAT_ID=
TELEGRAM_ENABLED=false
LOG_FORMAT=json
HEALTH_PORT=8080
-CIRCUIT_BREAKER_THRESHOLD=5
-CIRCUIT_BREAKER_TIMEOUT=60
METRICS_AUTH_TOKEN=
# News Collector
@@ -29,8 +27,6 @@ NEWS_POLL_INTERVAL=300
SENTIMENT_AGGREGATE_INTERVAL=900
# Stock Selector
-SELECTOR_CANDIDATES_TIME=15:00
-SELECTOR_FILTER_TIME=15:15
SELECTOR_FINAL_TIME=15:30
SELECTOR_MAX_PICKS=3
diff --git a/CLAUDE.md b/CLAUDE.md
new file mode 100644
index 0000000..6e33f57
--- /dev/null
+++ b/CLAUDE.md
@@ -0,0 +1,106 @@
+# CLAUDE.md
+
+This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.
+
+## Project Overview
+
+US stock trading platform built as a Python microservices architecture. Uses Alpaca Markets API for market data and order execution. Services communicate via Redis Streams and persist to PostgreSQL.
+
+## Common Commands
+
+```bash
+make infra # Start Redis + Postgres (required before running services/tests)
+make up # Start all services via Docker Compose
+make down # Stop all services
+make test # Run all tests (pytest -v)
+make lint # Lint check (ruff check + format check)
+make format # Auto-fix lint + format
+make migrate # Run DB migrations (alembic upgrade head, from shared/)
+make migrate-new msg="description" # Create new migration
+make ci # Full CI: install deps, lint, test, Docker build
+make e2e # End-to-end tests
+```
+
+Run a single test file: `pytest services/strategy-engine/tests/test_rsi_strategy.py -v`
+
+## Architecture
+
+### Services (each in `services/<name>/`, each has its own Dockerfile)
+
+- **data-collector** (port 8080): Fetches stock bars from Alpaca, publishes `CandleEvent` to Redis stream `candles`
+- **news-collector** (port 8084): Continuously collects news from 7 sources (Finnhub, RSS, SEC EDGAR, Truth Social, Reddit, Fear & Greed, Fed), runs sentiment aggregation every 15 min
+- **strategy-engine** (port 8081): Consumes candle events, runs strategies, publishes `SignalEvent` to stream `signals`. Also runs the stock selector at 15:30 ET daily
+- **order-executor** (port 8082): Consumes signals, runs risk checks, places orders via Alpaca, publishes `OrderEvent` to stream `orders`
+- **portfolio-manager** (port 8083): Tracks positions, PnL, portfolio snapshots
+- **api** (port 8000): FastAPI REST endpoint layer
+- **backtester**: Offline backtesting engine with walk-forward analysis
+
+### Event Flow
+
+```
+Alpaca → data-collector → [candles stream] → strategy-engine → [signals stream] → order-executor → [orders stream] → portfolio-manager
+
+News sources → news-collector → [news stream] → sentiment aggregator → symbol_scores DB
+ ↓
+ stock selector (15:30 ET) → [selected_stocks stream] → MOC strategy → signals
+```
+
+All inter-service events use `shared/src/shared/events.py` (CandleEvent, SignalEvent, OrderEvent, NewsEvent) serialized as JSON over Redis Streams via `shared/src/shared/broker.py` (RedisBroker).
+
+### Shared Library (`shared/`)
+
+Installed as editable package (`pip install -e shared/`). Contains:
+- `models.py` — Pydantic domain models: Candle, Signal, Order, Position, NewsItem, NewsCategory
+- `sentiment_models.py` — SymbolScore, MarketSentiment, SelectedStock, Candidate
+- `sa_models.py` — SQLAlchemy ORM models (CandleRow, SignalRow, OrderRow, PortfolioSnapshotRow, NewsItemRow, SymbolScoreRow, MarketSentimentRow, StockSelectionRow)
+- `broker.py` — RedisBroker (async Redis Streams pub/sub with consumer groups)
+- `db.py` — Database class (async SQLAlchemy 2.0), includes news/sentiment/selection CRUD methods
+- `alpaca.py` — AlpacaClient (async aiohttp client for Alpaca Trading + Market Data APIs)
+- `events.py` — Event types and serialization (CandleEvent, SignalEvent, OrderEvent, NewsEvent)
+- `sentiment.py` — SentimentData (legacy gating) + SentimentAggregator (freshness-weighted composite scoring)
+- `config.py`, `logging.py`, `metrics.py`, `notifier.py` (Telegram), `resilience.py`, `healthcheck.py`
+
+DB migrations live in `shared/alembic/`.
+
+### Strategy System (`services/strategy-engine/strategies/`)
+
+Strategies extend `BaseStrategy` (in `strategies/base.py`) and implement `on_candle()`, `configure()`, `warmup_period`. The plugin loader (`strategy_engine/plugin_loader.py`) auto-discovers `*.py` files in the strategies directory and loads YAML config from `strategies/config/<strategy_name>.yaml`.
+
+BaseStrategy provides optional filters (ADX regime, volume, ATR-based stops) via `_init_filters()` and `_apply_filters()`.
+
+### News-Driven Stock Selector (`services/strategy-engine/src/strategy_engine/stock_selector.py`)
+
+Dynamic stock selection for MOC (Market on Close) trading. Runs daily at 15:30 ET via `strategy-engine`:
+
+1. **Candidate Pool**: Top 20 by sentiment score + LLM-recommended stocks from today's news
+2. **Technical Filter**: RSI between 30 and 70, price above the 20-period EMA, volume above 50% of its average
+3. **LLM Final Selection**: Claude picks 2-3 stocks with rationale
+
+Market gating: blocks all trades when Fear & Greed ≤ 20 or VIX > 30 (`risk_off` regime).
+
+### News Collector (`services/news-collector/`)
+
+7 collectors extending `BaseCollector` in `collectors/`:
+- `finnhub.py` (5min), `rss.py` (10min), `reddit.py` (15min), `truth_social.py` (15min), `sec_edgar.py` (30min), `fear_greed.py` (1hr), `fed.py` (1hr)
+- All use VADER (nltk) for sentiment scoring
+- Provider abstraction via `BaseCollector` for future paid API swap (config change only)
+
+Sentiment aggregation (every 15min) computes per-symbol composite scores with freshness decay and category weights (policy 0.3, news 0.3, social 0.2, filing 0.2).
+
+### CLI (`cli/`)
+
+Click-based CLI installed as `trading` command. Depends on the shared library.
+
+## Tech Stack
+
+- Python 3.12+, async throughout (asyncio, aiohttp)
+- Pydantic for models, SQLAlchemy 2.0 async ORM, Alembic for migrations
+- Redis Streams for inter-service messaging
+- PostgreSQL 16 for persistence
+- Ruff for linting/formatting (line-length=100)
+- pytest + pytest-asyncio (asyncio_mode="auto")
+- Docker Compose for deployment; monitoring stack (Grafana/Prometheus/Loki) available via `--profile monitoring`
+
+## Environment
+
+Copy `.env.example` to `.env`. Key vars: `ALPACA_API_KEY`, `ALPACA_API_SECRET`, `ALPACA_PAPER=true`, `DRY_RUN=true`, `DATABASE_URL`, `REDIS_URL`, `FINNHUB_API_KEY`, `ANTHROPIC_API_KEY`. DRY_RUN=true simulates order fills without hitting Alpaca. Stock selector requires `ANTHROPIC_API_KEY` to be set.
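The freshness-weighted composite scoring that the CLAUDE.md addition describes could look roughly like the sketch below. Only the category weights (policy 0.3, news 0.3, social 0.2, filing 0.2) come from the text; the exponential decay curve, the 6-hour half-life, and the function name are assumptions, not the repository's actual implementation.

```python
import math
from datetime import datetime, timedelta, timezone

# Category weights as documented in CLAUDE.md.
CATEGORY_WEIGHTS = {"policy": 0.3, "news": 0.3, "social": 0.2, "filing": 0.2}

# The half-life is an assumption; the repo may use a different decay curve.
HALF_LIFE = timedelta(hours=6)


def composite_score(items: list[tuple[str, float, datetime]], now: datetime) -> float:
    """Weighted average of per-item sentiment, decayed by age, scaled by category.

    items: (category, sentiment in [-1, 1], published_at) tuples for one symbol.
    """
    weighted_sum = 0.0
    weight_total = 0.0
    for category, sentiment, published_at in items:
        age = (now - published_at).total_seconds()
        # Exponential freshness decay: weight halves every HALF_LIFE.
        freshness = math.exp(-math.log(2) * age / HALF_LIFE.total_seconds())
        w = CATEGORY_WEIGHTS.get(category, 0.1) * freshness
        weighted_sum += w * sentiment
        weight_total += w
    return weighted_sum / weight_total if weight_total else 0.0


now = datetime.now(timezone.utc)
score = composite_score(
    [("news", 0.8, now - timedelta(hours=1)), ("social", -0.2, now - timedelta(hours=12))],
    now,
)
```

With these inputs the fresh news item dominates the stale social item, so the composite lands well above zero.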
diff --git a/services/strategy-engine/src/strategy_engine/config.py b/services/strategy-engine/src/strategy_engine/config.py
index 2a9cb43..15f8588 100644
--- a/services/strategy-engine/src/strategy_engine/config.py
+++ b/services/strategy-engine/src/strategy_engine/config.py
@@ -7,8 +7,6 @@ class StrategyConfig(Settings):
symbols: list[str] = ["AAPL", "MSFT", "GOOGL", "AMZN", "TSLA"]
timeframes: list[str] = ["1m"]
strategy_params: dict = {}
- selector_candidates_time: str = "15:00"
- selector_filter_time: str = "15:15"
selector_final_time: str = "15:30"
selector_max_picks: int = 3
anthropic_api_key: str = ""
diff --git a/shared/src/shared/config.py b/shared/src/shared/config.py
index 7a947b3..b6ccebd 100644
--- a/shared/src/shared/config.py
+++ b/shared/src/shared/config.py
@@ -32,16 +32,12 @@ class Settings(BaseSettings):
telegram_enabled: bool = False
log_format: str = "json"
health_port: int = 8080
- circuit_breaker_threshold: int = 5
- circuit_breaker_timeout: int = 60
metrics_auth_token: str = "" # If set, /health and /metrics require Bearer token
# News collector
finnhub_api_key: str = ""
news_poll_interval: int = 300
sentiment_aggregate_interval: int = 900
# Stock selector
- selector_candidates_time: str = "15:00"
- selector_filter_time: str = "15:15"
selector_final_time: str = "15:30"
selector_max_picks: int = 3
# LLM
diff --git a/shared/src/shared/resilience.py b/shared/src/shared/resilience.py
index e43fd21..8d8678a 100644
--- a/shared/src/shared/resilience.py
+++ b/shared/src/shared/resilience.py
@@ -1,105 +1 @@
-"""Retry with exponential backoff and circuit breaker utilities."""
-
-from __future__ import annotations
-
-import asyncio
-import enum
-import functools
-import logging
-import random
-import time
-from typing import Any, Callable
-
-logger = logging.getLogger(__name__)
-
-
-# ---------------------------------------------------------------------------
-# retry_with_backoff
-# ---------------------------------------------------------------------------
-
-
-def retry_with_backoff(
- max_retries: int = 3,
- base_delay: float = 1.0,
- max_delay: float = 60.0,
-) -> Callable:
- """Decorator that retries an async function with exponential backoff + jitter."""
-
- def decorator(func: Callable) -> Callable:
- @functools.wraps(func)
- async def wrapper(*args: Any, **kwargs: Any) -> Any:
- last_exc: BaseException | None = None
- for attempt in range(max_retries + 1):
- try:
- return await func(*args, **kwargs)
- except Exception as exc:
- last_exc = exc
- if attempt < max_retries:
- delay = min(base_delay * (2**attempt), max_delay)
- jitter = delay * random.uniform(0, 0.5)
- total_delay = delay + jitter
- logger.warning(
- "Retry %d/%d for %s after error: %s (delay=%.3fs)",
- attempt + 1,
- max_retries,
- func.__name__,
- exc,
- total_delay,
- )
- await asyncio.sleep(total_delay)
- raise last_exc # type: ignore[misc]
-
- return wrapper
-
- return decorator
-
-
-# ---------------------------------------------------------------------------
-# CircuitBreaker
-# ---------------------------------------------------------------------------
-
-
-class CircuitState(enum.Enum):
- CLOSED = "closed"
- OPEN = "open"
- HALF_OPEN = "half_open"
-
-
-class CircuitBreaker:
- """Simple circuit breaker implementation."""
-
- def __init__(
- self,
- failure_threshold: int = 5,
- recovery_timeout: float = 60.0,
- ) -> None:
- self._failure_threshold = failure_threshold
- self._recovery_timeout = recovery_timeout
- self._failure_count: int = 0
- self._state = CircuitState.CLOSED
- self._opened_at: float = 0.0
-
- @property
- def state(self) -> CircuitState:
- return self._state
-
- def allow_request(self) -> bool:
- if self._state == CircuitState.CLOSED:
- return True
- if self._state == CircuitState.OPEN:
- if time.monotonic() - self._opened_at >= self._recovery_timeout:
- self._state = CircuitState.HALF_OPEN
- return True
- return False
- # HALF_OPEN
- return True
-
- def record_success(self) -> None:
- self._failure_count = 0
- self._state = CircuitState.CLOSED
-
- def record_failure(self) -> None:
- self._failure_count += 1
- if self._failure_count >= self._failure_threshold:
- self._state = CircuitState.OPEN
- self._opened_at = time.monotonic()
+"""Resilience utilities for the trading platform."""
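For reference, the state machine of the `CircuitBreaker` deleted above (CLOSED → OPEN after `failure_threshold` failures, OPEN → HALF_OPEN after `recovery_timeout`, HALF_OPEN → CLOSED on success) can be exercised with a minimal stand-in like this. This mirrors the removed implementation for illustration only; the shipped module now exports nothing.

```python
import time


class CircuitBreaker:
    """Minimal stand-in mirroring the deleted shared.resilience.CircuitBreaker."""

    def __init__(self, failure_threshold: int = 2, recovery_timeout: float = 0.05):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.failures = 0
        self.state = "closed"
        self.opened_at = 0.0

    def allow_request(self) -> bool:
        if self.state == "open" and time.monotonic() - self.opened_at >= self.recovery_timeout:
            self.state = "half_open"  # let one probe request through
        return self.state != "open"

    def record_failure(self) -> None:
        self.failures += 1
        if self.failures >= self.failure_threshold:
            self.state = "open"
            self.opened_at = time.monotonic()

    def record_success(self) -> None:
        self.failures = 0
        self.state = "closed"


cb = CircuitBreaker()
cb.record_failure()
cb.record_failure()            # trips the breaker
blocked = cb.allow_request()   # rejected while OPEN
time.sleep(0.06)
probe = cb.allow_request()     # recovery timeout elapsed: HALF_OPEN probe allowed
cb.record_success()            # probe succeeded: back to CLOSED
```

The deleted tests (also removed in this commit) asserted exactly these transitions.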
diff --git a/shared/src/shared/sa_models.py b/shared/src/shared/sa_models.py
index 1bd92c2..dc87ef5 100644
--- a/shared/src/shared/sa_models.py
+++ b/shared/src/shared/sa_models.py
@@ -53,19 +53,6 @@ class OrderRow(Base):
filled_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
-class TradeRow(Base):
- __tablename__ = "trades"
-
- id: Mapped[str] = mapped_column(Text, primary_key=True)
- order_id: Mapped[str | None] = mapped_column(Text, ForeignKey("orders.id"))
- symbol: Mapped[str] = mapped_column(Text, nullable=False)
- side: Mapped[str] = mapped_column(Text, nullable=False)
- price: Mapped[Decimal] = mapped_column(Numeric, nullable=False)
- quantity: Mapped[Decimal] = mapped_column(Numeric, nullable=False)
- fee: Mapped[Decimal] = mapped_column(Numeric, nullable=False, server_default="0")
- traded_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False)
-
-
class PositionRow(Base):
__tablename__ = "positions"
diff --git a/shared/src/shared/sentiment.py b/shared/src/shared/sentiment.py
index 449eb76..5b4b0da 100644
--- a/shared/src/shared/sentiment.py
+++ b/shared/src/shared/sentiment.py
@@ -1,41 +1,9 @@
-"""Market sentiment data."""
+"""Market sentiment aggregation."""
-import logging
-from dataclasses import dataclass, field
-from datetime import datetime, timezone
+from datetime import datetime
from shared.sentiment_models import SymbolScore
-logger = logging.getLogger(__name__)
-
-
-@dataclass
-class SentimentData:
- """Aggregated sentiment snapshot."""
-
- fear_greed_value: int | None = None
- fear_greed_label: str | None = None
- news_sentiment: float | None = None
- news_count: int = 0
- exchange_netflow: float | None = None
- timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
-
- @property
- def should_buy(self) -> bool:
- if self.fear_greed_value is not None and self.fear_greed_value > 70:
- return False
- if self.news_sentiment is not None and self.news_sentiment < -0.3:
- return False
- return True
-
- @property
- def should_block(self) -> bool:
- if self.fear_greed_value is not None and self.fear_greed_value > 80:
- return True
- if self.news_sentiment is not None and self.news_sentiment < -0.5:
- return True
- return False
-
def _safe_avg(values: list[float]) -> float:
if not values:
diff --git a/shared/tests/test_resilience.py b/shared/tests/test_resilience.py
deleted file mode 100644
index e287777..0000000
--- a/shared/tests/test_resilience.py
+++ /dev/null
@@ -1,139 +0,0 @@
-"""Tests for retry with backoff and circuit breaker."""
-
-import time
-
-import pytest
-
-from shared.resilience import CircuitBreaker, CircuitState, retry_with_backoff
-
-
-# ---------------------------------------------------------------------------
-# retry_with_backoff tests
-# ---------------------------------------------------------------------------
-
-
-@pytest.mark.asyncio
-async def test_retry_succeeds_first_try():
- call_count = 0
-
- @retry_with_backoff(max_retries=3, base_delay=0.01)
- async def succeed():
- nonlocal call_count
- call_count += 1
- return "ok"
-
- result = await succeed()
- assert result == "ok"
- assert call_count == 1
-
-
-@pytest.mark.asyncio
-async def test_retry_succeeds_after_failures():
- call_count = 0
-
- @retry_with_backoff(max_retries=3, base_delay=0.01)
- async def flaky():
- nonlocal call_count
- call_count += 1
- if call_count < 3:
- raise ValueError("not yet")
- return "recovered"
-
- result = await flaky()
- assert result == "recovered"
- assert call_count == 3
-
-
-@pytest.mark.asyncio
-async def test_retry_raises_after_max_retries():
- call_count = 0
-
- @retry_with_backoff(max_retries=3, base_delay=0.01)
- async def always_fail():
- nonlocal call_count
- call_count += 1
- raise RuntimeError("permanent")
-
- with pytest.raises(RuntimeError, match="permanent"):
- await always_fail()
- # 1 initial + 3 retries = 4 calls
- assert call_count == 4
-
-
-@pytest.mark.asyncio
-async def test_retry_respects_max_delay():
- """Backoff should be capped at max_delay."""
-
- @retry_with_backoff(max_retries=2, base_delay=0.01, max_delay=0.02)
- async def always_fail():
- raise RuntimeError("fail")
-
- start = time.monotonic()
- with pytest.raises(RuntimeError):
- await always_fail()
- elapsed = time.monotonic() - start
- # With max_delay=0.02 and 2 retries, total delay should be small
- assert elapsed < 0.5
-
-
-# ---------------------------------------------------------------------------
-# CircuitBreaker tests
-# ---------------------------------------------------------------------------
-
-
-def test_circuit_starts_closed():
- cb = CircuitBreaker(failure_threshold=3, recovery_timeout=0.05)
- assert cb.state == CircuitState.CLOSED
- assert cb.allow_request() is True
-
-
-def test_circuit_opens_after_threshold():
- cb = CircuitBreaker(failure_threshold=3, recovery_timeout=60.0)
- for _ in range(3):
- cb.record_failure()
- assert cb.state == CircuitState.OPEN
- assert cb.allow_request() is False
-
-
-def test_circuit_rejects_when_open():
- cb = CircuitBreaker(failure_threshold=2, recovery_timeout=60.0)
- cb.record_failure()
- cb.record_failure()
- assert cb.state == CircuitState.OPEN
- assert cb.allow_request() is False
-
-
-def test_circuit_half_open_after_timeout():
- cb = CircuitBreaker(failure_threshold=2, recovery_timeout=0.05)
- cb.record_failure()
- cb.record_failure()
- assert cb.state == CircuitState.OPEN
-
- time.sleep(0.06)
- assert cb.allow_request() is True
- assert cb.state == CircuitState.HALF_OPEN
-
-
-def test_circuit_closes_on_success():
- cb = CircuitBreaker(failure_threshold=2, recovery_timeout=0.05)
- cb.record_failure()
- cb.record_failure()
- assert cb.state == CircuitState.OPEN
-
- time.sleep(0.06)
- cb.allow_request() # triggers HALF_OPEN
- assert cb.state == CircuitState.HALF_OPEN
-
- cb.record_success()
- assert cb.state == CircuitState.CLOSED
- assert cb.allow_request() is True
-
-
-def test_circuit_reopens_on_failure_in_half_open():
- cb = CircuitBreaker(failure_threshold=2, recovery_timeout=0.05)
- cb.record_failure()
- cb.record_failure()
- time.sleep(0.06)
- cb.allow_request() # HALF_OPEN
- cb.record_failure()
- assert cb.state == CircuitState.OPEN
diff --git a/shared/tests/test_sa_models.py b/shared/tests/test_sa_models.py
index dc6355e..ae73833 100644
--- a/shared/tests/test_sa_models.py
+++ b/shared/tests/test_sa_models.py
@@ -11,7 +11,6 @@ def test_base_metadata_has_all_tables():
"candles",
"signals",
"orders",
- "trades",
"positions",
"portfolio_snapshots",
"news_items",
@@ -124,44 +123,6 @@ class TestOrderRow:
assert fk_cols == {"signal_id": "signals.id"}
-class TestTradeRow:
- def test_table_name(self):
- from shared.sa_models import TradeRow
-
- assert TradeRow.__tablename__ == "trades"
-
- def test_columns(self):
- from shared.sa_models import TradeRow
-
- mapper = inspect(TradeRow)
- cols = {c.key for c in mapper.column_attrs}
- expected = {
- "id",
- "order_id",
- "symbol",
- "side",
- "price",
- "quantity",
- "fee",
- "traded_at",
- }
- assert expected == cols
-
- def test_primary_key(self):
- from shared.sa_models import TradeRow
-
- mapper = inspect(TradeRow)
- pk_cols = [c.name for c in mapper.mapper.primary_key]
- assert pk_cols == ["id"]
-
- def test_order_id_foreign_key(self):
- from shared.sa_models import TradeRow
-
- table = TradeRow.__table__
- fk_cols = {fk.parent.name: fk.target_fullname for fk in table.foreign_keys}
- assert fk_cols == {"order_id": "orders.id"}
-
-
class TestPositionRow:
def test_table_name(self):
from shared.sa_models import PositionRow
@@ -233,11 +194,3 @@ class TestStatusDefault:
status_col = table.c.status
assert status_col.server_default is not None
assert status_col.server_default.arg == "PENDING"
-
- def test_trade_fee_server_default(self):
- from shared.sa_models import TradeRow
-
- table = TradeRow.__table__
- fee_col = table.c.fee
- assert fee_col.server_default is not None
- assert fee_col.server_default.arg == "0"
diff --git a/shared/tests/test_sentiment.py b/shared/tests/test_sentiment.py
deleted file mode 100644
index 9bd8ea3..0000000
--- a/shared/tests/test_sentiment.py
+++ /dev/null
@@ -1,44 +0,0 @@
-"""Tests for market sentiment module."""
-
-from shared.sentiment import SentimentData
-
-
-def test_sentiment_should_buy_default_no_data():
- s = SentimentData()
- assert s.should_buy is True
- assert s.should_block is False
-
-
-def test_sentiment_should_buy_low_fear_greed():
- s = SentimentData(fear_greed_value=15)
- assert s.should_buy is True
-
-
-def test_sentiment_should_not_buy_on_greed():
- s = SentimentData(fear_greed_value=75)
- assert s.should_buy is False
-
-
-def test_sentiment_should_not_buy_negative_news():
- s = SentimentData(news_sentiment=-0.4)
- assert s.should_buy is False
-
-
-def test_sentiment_should_buy_positive_news():
- s = SentimentData(fear_greed_value=50, news_sentiment=0.3)
- assert s.should_buy is True
-
-
-def test_sentiment_should_block_extreme_greed():
- s = SentimentData(fear_greed_value=85)
- assert s.should_block is True
-
-
-def test_sentiment_should_block_very_negative_news():
- s = SentimentData(news_sentiment=-0.6)
- assert s.should_block is True
-
-
-def test_sentiment_no_block_on_neutral():
- s = SentimentData(fear_greed_value=50, news_sentiment=0.0)
- assert s.should_block is False