diff options
| -rw-r--r-- | .env.example | 4 | ||||
| -rw-r--r-- | CLAUDE.md | 106 | ||||
| -rw-r--r-- | services/strategy-engine/src/strategy_engine/config.py | 2 | ||||
| -rw-r--r-- | shared/src/shared/config.py | 4 | ||||
| -rw-r--r-- | shared/src/shared/resilience.py | 106 | ||||
| -rw-r--r-- | shared/src/shared/sa_models.py | 13 | ||||
| -rw-r--r-- | shared/src/shared/sentiment.py | 36 | ||||
| -rw-r--r-- | shared/tests/test_resilience.py | 139 | ||||
| -rw-r--r-- | shared/tests/test_sa_models.py | 47 | ||||
| -rw-r--r-- | shared/tests/test_sentiment.py | 44 |
10 files changed, 109 insertions, 392 deletions
diff --git a/.env.example b/.env.example index dcaf9a8..bdc6a67 100644 --- a/.env.example +++ b/.env.example @@ -19,8 +19,6 @@ TELEGRAM_CHAT_ID= TELEGRAM_ENABLED=false LOG_FORMAT=json HEALTH_PORT=8080 -CIRCUIT_BREAKER_THRESHOLD=5 -CIRCUIT_BREAKER_TIMEOUT=60 METRICS_AUTH_TOKEN= # News Collector @@ -29,8 +27,6 @@ NEWS_POLL_INTERVAL=300 SENTIMENT_AGGREGATE_INTERVAL=900 # Stock Selector -SELECTOR_CANDIDATES_TIME=15:00 -SELECTOR_FILTER_TIME=15:15 SELECTOR_FINAL_TIME=15:30 SELECTOR_MAX_PICKS=3 diff --git a/CLAUDE.md b/CLAUDE.md new file mode 100644 index 0000000..6e33f57 --- /dev/null +++ b/CLAUDE.md @@ -0,0 +1,106 @@ +# CLAUDE.md + +This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository. + +## Project Overview + +US stock trading platform built as a Python microservices architecture. Uses Alpaca Markets API for market data and order execution. Services communicate via Redis Streams and persist to PostgreSQL. + +## Common Commands + +```bash +make infra # Start Redis + Postgres (required before running services/tests) +make up # Start all services via Docker Compose +make down # Stop all services +make test # Run all tests (pytest -v) +make lint # Lint check (ruff check + format check) +make format # Auto-fix lint + format +make migrate # Run DB migrations (alembic upgrade head, from shared/) +make migrate-new msg="description" # Create new migration +make ci # Full CI: install deps, lint, test, Docker build +make e2e # End-to-end tests +``` + +Run a single test file: `pytest services/strategy-engine/tests/test_rsi_strategy.py -v` + +## Architecture + +### Services (each in `services/<name>/`, each has its own Dockerfile) + +- **data-collector** (port 8080): Fetches stock bars from Alpaca, publishes `CandleEvent` to Redis stream `candles` +- **news-collector** (port 8084): Continuously collects news from 7 sources (Finnhub, RSS, SEC EDGAR, Truth Social, Reddit, Fear & Greed, Fed), runs sentiment aggregation every 15 min +- **strategy-engine** (port 8081): Consumes candle events, runs strategies, publishes `SignalEvent` to stream `signals`. Also runs the stock selector at 15:30 ET daily +- **order-executor** (port 8082): Consumes signals, runs risk checks, places orders via Alpaca, publishes `OrderEvent` to stream `orders` +- **portfolio-manager** (port 8083): Tracks positions, PnL, portfolio snapshots +- **api** (port 8000): FastAPI REST endpoint layer +- **backtester**: Offline backtesting engine with walk-forward analysis + +### Event Flow + +``` +Alpaca → data-collector → [candles stream] → strategy-engine → [signals stream] → order-executor → [orders stream] → portfolio-manager + +News sources → news-collector → [news stream] → sentiment aggregator → symbol_scores DB + ↓ + stock selector (15:30 ET) → [selected_stocks stream] → MOC strategy → signals +``` + +All inter-service events use `shared/src/shared/events.py` (CandleEvent, SignalEvent, OrderEvent, NewsEvent) serialized as JSON over Redis Streams via `shared/src/shared/broker.py` (RedisBroker). + +### Shared Library (`shared/`) + +Installed as editable package (`pip install -e shared/`). Contains: +- `models.py` — Pydantic domain models: Candle, Signal, Order, Position, NewsItem, NewsCategory +- `sentiment_models.py` — SymbolScore, MarketSentiment, SelectedStock, Candidate +- `sa_models.py` — SQLAlchemy ORM models (CandleRow, SignalRow, OrderRow, PortfolioSnapshotRow, NewsItemRow, SymbolScoreRow, MarketSentimentRow, StockSelectionRow) +- `broker.py` — RedisBroker (async Redis Streams pub/sub with consumer groups) +- `db.py` — Database class (async SQLAlchemy 2.0), includes news/sentiment/selection CRUD methods +- `alpaca.py` — AlpacaClient (async aiohttp client for Alpaca Trading + Market Data APIs) +- `events.py` — Event types and serialization (CandleEvent, SignalEvent, OrderEvent, NewsEvent) +- `sentiment.py` — SentimentData (legacy gating) + SentimentAggregator (freshness-weighted composite scoring) +- `config.py`, `logging.py`, `metrics.py`, `notifier.py` (Telegram), `resilience.py`, `healthcheck.py` + +DB migrations live in `shared/alembic/`. + +### Strategy System (`services/strategy-engine/strategies/`) + +Strategies extend `BaseStrategy` (in `strategies/base.py`) and implement `on_candle()`, `configure()`, `warmup_period`. The plugin loader (`strategy_engine/plugin_loader.py`) auto-discovers `*.py` files in the strategies directory and loads YAML config from `strategies/config/<strategy_name>.yaml`. + +BaseStrategy provides optional filters (ADX regime, volume, ATR-based stops) via `_init_filters()` and `_apply_filters()`. + +### News-Driven Stock Selector (`services/strategy-engine/src/strategy_engine/stock_selector.py`) + +Dynamic stock selection for MOC (Market on Close) trading. Runs daily at 15:30 ET via `strategy-engine`: + +1. **Candidate Pool**: Top 20 by sentiment score + LLM-recommended stocks from today's news +2. **Technical Filter**: RSI 30-70, price > 20 EMA, volume > 50% average +3. **LLM Final Selection**: Claude picks 2-3 stocks with rationale + +Market gating: blocks all trades when Fear & Greed ≤ 20 or VIX > 30 (`risk_off` regime). + +### News Collector (`services/news-collector/`) + +7 collectors extending `BaseCollector` in `collectors/`: +- `finnhub.py` (5min), `rss.py` (10min), `reddit.py` (15min), `truth_social.py` (15min), `sec_edgar.py` (30min), `fear_greed.py` (1hr), `fed.py` (1hr) +- All use VADER (nltk) for sentiment scoring +- Provider abstraction via `BaseCollector` for future paid API swap (config change only) + +Sentiment aggregation (every 15min) computes per-symbol composite scores with freshness decay and category weights (policy 0.3, news 0.3, social 0.2, filing 0.2). + +### CLI (`cli/`) + +Click-based CLI installed as `trading` command. Depends on the shared library. + +## Tech Stack + +- Python 3.12+, async throughout (asyncio, aiohttp) +- Pydantic for models, SQLAlchemy 2.0 async ORM, Alembic for migrations +- Redis Streams for inter-service messaging +- PostgreSQL 16 for persistence +- Ruff for linting/formatting (line-length=100) +- pytest + pytest-asyncio (asyncio_mode="auto") +- Docker Compose for deployment; monitoring stack (Grafana/Prometheus/Loki) available via `--profile monitoring` + +## Environment + +Copy `.env.example` to `.env`. Key vars: `ALPACA_API_KEY`, `ALPACA_API_SECRET`, `ALPACA_PAPER=true`, `DRY_RUN=true`, `DATABASE_URL`, `REDIS_URL`, `FINNHUB_API_KEY`, `ANTHROPIC_API_KEY`. DRY_RUN=true simulates order fills without hitting Alpaca. Stock selector requires `ANTHROPIC_API_KEY` to be set. diff --git a/services/strategy-engine/src/strategy_engine/config.py b/services/strategy-engine/src/strategy_engine/config.py index 2a9cb43..15f8588 100644 --- a/services/strategy-engine/src/strategy_engine/config.py +++ b/services/strategy-engine/src/strategy_engine/config.py @@ -7,8 +7,6 @@ class StrategyConfig(Settings): symbols: list[str] = ["AAPL", "MSFT", "GOOGL", "AMZN", "TSLA"] timeframes: list[str] = ["1m"] strategy_params: dict = {} - selector_candidates_time: str = "15:00" - selector_filter_time: str = "15:15" selector_final_time: str = "15:30" selector_max_picks: int = 3 anthropic_api_key: str = "" diff --git a/shared/src/shared/config.py b/shared/src/shared/config.py index 7a947b3..b6ccebd 100644 --- a/shared/src/shared/config.py +++ b/shared/src/shared/config.py @@ -32,16 +32,12 @@ class Settings(BaseSettings): telegram_enabled: bool = False log_format: str = "json" health_port: int = 8080 - circuit_breaker_threshold: int = 5 - circuit_breaker_timeout: int = 60 metrics_auth_token: str = "" # If set, /health and /metrics require Bearer token # News collector finnhub_api_key: str = "" news_poll_interval: int = 300 sentiment_aggregate_interval: int = 900 # Stock selector - selector_candidates_time: str = "15:00" - selector_filter_time: str = "15:15" selector_final_time: str = "15:30" selector_max_picks: int = 3 # LLM diff --git a/shared/src/shared/resilience.py b/shared/src/shared/resilience.py index e43fd21..8d8678a 100644 --- a/shared/src/shared/resilience.py +++ b/shared/src/shared/resilience.py @@ -1,105 +1 @@ -"""Retry with exponential backoff and circuit breaker utilities.""" - -from __future__ import annotations - -import asyncio -import enum -import functools -import logging -import random -import time -from typing import Any, Callable - -logger = logging.getLogger(__name__) - - -# --------------------------------------------------------------------------- -# retry_with_backoff -# --------------------------------------------------------------------------- - - -def retry_with_backoff( - max_retries: int = 3, - base_delay: float = 1.0, - max_delay: float = 60.0, -) -> Callable: - """Decorator that retries an async function with exponential backoff + jitter.""" - - def decorator(func: Callable) -> Callable: - @functools.wraps(func) - async def wrapper(*args: Any, **kwargs: Any) -> Any: - last_exc: BaseException | None = None - for attempt in range(max_retries + 1): - try: - return await func(*args, **kwargs) - except Exception as exc: - last_exc = exc - if attempt < max_retries: - delay = min(base_delay * (2**attempt), max_delay) - jitter = delay * random.uniform(0, 0.5) - total_delay = delay + jitter - logger.warning( - "Retry %d/%d for %s after error: %s (delay=%.3fs)", - attempt + 1, - max_retries, - func.__name__, - exc, - total_delay, - ) - await asyncio.sleep(total_delay) - raise last_exc # type: ignore[misc] - - return wrapper - - return decorator - - -# --------------------------------------------------------------------------- -# CircuitBreaker -# --------------------------------------------------------------------------- - - -class CircuitState(enum.Enum): - CLOSED = "closed" - OPEN = "open" - HALF_OPEN = "half_open" - - -class CircuitBreaker: - """Simple circuit breaker implementation.""" - - def __init__( - self, - failure_threshold: int = 5, - recovery_timeout: float = 60.0, - ) -> None: - self._failure_threshold = failure_threshold - self._recovery_timeout = recovery_timeout - self._failure_count: int = 0 - self._state = CircuitState.CLOSED - self._opened_at: float = 0.0 - - @property - def state(self) -> CircuitState: - return self._state - - def allow_request(self) -> bool: - if self._state == CircuitState.CLOSED: - return True - if self._state == CircuitState.OPEN: - if time.monotonic() - self._opened_at >= self._recovery_timeout: - self._state = CircuitState.HALF_OPEN - return True - return False - # HALF_OPEN - return True - - def record_success(self) -> None: - self._failure_count = 0 - self._state = CircuitState.CLOSED - - def record_failure(self) -> None: - self._failure_count += 1 - if self._failure_count >= self._failure_threshold: - self._state = CircuitState.OPEN - self._opened_at = time.monotonic() +"""Resilience utilities for the trading platform.""" diff --git a/shared/src/shared/sa_models.py b/shared/src/shared/sa_models.py index 1bd92c2..dc87ef5 100644 --- a/shared/src/shared/sa_models.py +++ b/shared/src/shared/sa_models.py @@ -53,19 +53,6 @@ class OrderRow(Base): filled_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True)) -class TradeRow(Base): - __tablename__ = "trades" - - id: Mapped[str] = mapped_column(Text, primary_key=True) - order_id: Mapped[str | None] = mapped_column(Text, ForeignKey("orders.id")) - symbol: Mapped[str] = mapped_column(Text, nullable=False) - side: Mapped[str] = mapped_column(Text, nullable=False) - price: Mapped[Decimal] = mapped_column(Numeric, nullable=False) - quantity: Mapped[Decimal] = mapped_column(Numeric, nullable=False) - fee: Mapped[Decimal] = mapped_column(Numeric, nullable=False, server_default="0") - traded_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False) - - class PositionRow(Base): __tablename__ = "positions" diff --git a/shared/src/shared/sentiment.py b/shared/src/shared/sentiment.py index 449eb76..5b4b0da 100644 --- a/shared/src/shared/sentiment.py +++ b/shared/src/shared/sentiment.py @@ -1,41 +1,9 @@ -"""Market sentiment data.""" +"""Market sentiment aggregation.""" -import logging -from dataclasses import dataclass, field -from datetime import datetime, timezone +from datetime import datetime from shared.sentiment_models import SymbolScore -logger = logging.getLogger(__name__) - - -@dataclass -class SentimentData: - """Aggregated sentiment snapshot.""" - - fear_greed_value: int | None = None - fear_greed_label: str | None = None - news_sentiment: float | None = None - news_count: int = 0 - exchange_netflow: float | None = None - timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc)) - - @property - def should_buy(self) -> bool: - if self.fear_greed_value is not None and self.fear_greed_value > 70: - return False - if self.news_sentiment is not None and self.news_sentiment < -0.3: - return False - return True - - @property - def should_block(self) -> bool: - if self.fear_greed_value is not None and self.fear_greed_value > 80: - return True - if self.news_sentiment is not None and self.news_sentiment < -0.5: - return True - return False - def _safe_avg(values: list[float]) -> float: if not values: diff --git a/shared/tests/test_resilience.py b/shared/tests/test_resilience.py deleted file mode 100644 index e287777..0000000 --- a/shared/tests/test_resilience.py +++ /dev/null @@ -1,139 +0,0 @@ -"""Tests for retry with backoff and circuit breaker.""" - -import time - -import pytest - -from shared.resilience import CircuitBreaker, CircuitState, retry_with_backoff - - -# --------------------------------------------------------------------------- -# retry_with_backoff tests -# --------------------------------------------------------------------------- - - -@pytest.mark.asyncio -async def test_retry_succeeds_first_try(): - call_count = 0 - - @retry_with_backoff(max_retries=3, base_delay=0.01) - async def succeed(): - nonlocal call_count - call_count += 1 - return "ok" - - result = await succeed() - assert result == "ok" - assert call_count == 1 - - -@pytest.mark.asyncio -async def test_retry_succeeds_after_failures(): - call_count = 0 - - @retry_with_backoff(max_retries=3, base_delay=0.01) - async def flaky(): - nonlocal call_count - call_count += 1 - if call_count < 3: - raise ValueError("not yet") - return "recovered" - - result = await flaky() - assert result == "recovered" - assert call_count == 3 - - -@pytest.mark.asyncio -async def test_retry_raises_after_max_retries(): - call_count = 0 - - @retry_with_backoff(max_retries=3, base_delay=0.01) - async def always_fail(): - nonlocal call_count - call_count += 1 - raise RuntimeError("permanent") - - with pytest.raises(RuntimeError, match="permanent"): - await always_fail() - # 1 initial + 3 retries = 4 calls - assert call_count == 4 - - -@pytest.mark.asyncio -async def test_retry_respects_max_delay(): - """Backoff should be capped at max_delay.""" - - @retry_with_backoff(max_retries=2, base_delay=0.01, max_delay=0.02) - async def always_fail(): - raise RuntimeError("fail") - - start = time.monotonic() - with pytest.raises(RuntimeError): - await always_fail() - elapsed = time.monotonic() - start - # With max_delay=0.02 and 2 retries, total delay should be small - assert elapsed < 0.5 - - -# --------------------------------------------------------------------------- -# CircuitBreaker tests -# --------------------------------------------------------------------------- - - -def test_circuit_starts_closed(): - cb = CircuitBreaker(failure_threshold=3, recovery_timeout=0.05) - assert cb.state == CircuitState.CLOSED - assert cb.allow_request() is True - - -def test_circuit_opens_after_threshold(): - cb = CircuitBreaker(failure_threshold=3, recovery_timeout=60.0) - for _ in range(3): - cb.record_failure() - assert cb.state == CircuitState.OPEN - assert cb.allow_request() is False - - -def test_circuit_rejects_when_open(): - cb = CircuitBreaker(failure_threshold=2, recovery_timeout=60.0) - cb.record_failure() - cb.record_failure() - assert cb.state == CircuitState.OPEN - assert cb.allow_request() is False - - -def test_circuit_half_open_after_timeout(): - cb = CircuitBreaker(failure_threshold=2, recovery_timeout=0.05) - cb.record_failure() - cb.record_failure() - assert cb.state == CircuitState.OPEN - - time.sleep(0.06) - assert cb.allow_request() is True - assert cb.state == CircuitState.HALF_OPEN - - -def test_circuit_closes_on_success(): - cb = CircuitBreaker(failure_threshold=2, recovery_timeout=0.05) - cb.record_failure() - cb.record_failure() - assert cb.state == CircuitState.OPEN - - time.sleep(0.06) - cb.allow_request() # triggers HALF_OPEN - assert cb.state == CircuitState.HALF_OPEN - - cb.record_success() - assert cb.state == CircuitState.CLOSED - assert cb.allow_request() is True - - -def test_circuit_reopens_on_failure_in_half_open(): - cb = CircuitBreaker(failure_threshold=2, recovery_timeout=0.05) - cb.record_failure() - cb.record_failure() - time.sleep(0.06) - cb.allow_request() # HALF_OPEN - cb.record_failure() - assert cb.state == CircuitState.OPEN diff --git a/shared/tests/test_sa_models.py b/shared/tests/test_sa_models.py index dc6355e..ae73833 100644 --- a/shared/tests/test_sa_models.py +++ b/shared/tests/test_sa_models.py @@ -11,7 +11,6 @@ def test_base_metadata_has_all_tables(): "candles", "signals", "orders", - "trades", "positions", "portfolio_snapshots", "news_items", @@ -124,44 +123,6 @@ class TestOrderRow: assert fk_cols == {"signal_id": "signals.id"} -class TestTradeRow: - def test_table_name(self): - from shared.sa_models import TradeRow - - assert TradeRow.__tablename__ == "trades" - - def test_columns(self): - from shared.sa_models import TradeRow - - mapper = inspect(TradeRow) - cols = {c.key for c in mapper.column_attrs} - expected = { - "id", - "order_id", - "symbol", - "side", - "price", - "quantity", - "fee", - "traded_at", - } - assert expected == cols - - def test_primary_key(self): - from shared.sa_models import TradeRow - - mapper = inspect(TradeRow) - pk_cols = [c.name for c in mapper.mapper.primary_key] - assert pk_cols == ["id"] - - def test_order_id_foreign_key(self): - from shared.sa_models import TradeRow - - table = TradeRow.__table__ - fk_cols = {fk.parent.name: fk.target_fullname for fk in table.foreign_keys} - assert fk_cols == {"order_id": "orders.id"} - - class TestPositionRow: def test_table_name(self): from shared.sa_models import PositionRow @@ -233,11 +194,3 @@ class TestStatusDefault: status_col = table.c.status assert status_col.server_default is not None assert status_col.server_default.arg == "PENDING" - - def test_trade_fee_server_default(self): - from shared.sa_models import TradeRow - - table = TradeRow.__table__ - fee_col = table.c.fee - assert fee_col.server_default is not None - assert fee_col.server_default.arg == "0" diff --git a/shared/tests/test_sentiment.py b/shared/tests/test_sentiment.py deleted file mode 100644 index 9bd8ea3..0000000 --- a/shared/tests/test_sentiment.py +++ /dev/null @@ -1,44 +0,0 @@ -"""Tests for market sentiment module.""" - -from shared.sentiment import SentimentData - - -def test_sentiment_should_buy_default_no_data(): - s = SentimentData() - assert s.should_buy is True - assert s.should_block is False - - -def test_sentiment_should_buy_low_fear_greed(): - s = SentimentData(fear_greed_value=15) - assert s.should_buy is True - - -def test_sentiment_should_not_buy_on_greed(): - s = SentimentData(fear_greed_value=75) - assert s.should_buy is False - - -def test_sentiment_should_not_buy_negative_news(): - s = SentimentData(news_sentiment=-0.4) - assert s.should_buy is False - - -def test_sentiment_should_buy_positive_news(): - s = SentimentData(fear_greed_value=50, news_sentiment=0.3) - assert s.should_buy is True - - -def test_sentiment_should_block_extreme_greed(): - s = SentimentData(fear_greed_value=85) - assert s.should_block is True - - -def test_sentiment_should_block_very_negative_news(): - s = SentimentData(news_sentiment=-0.6) - assert s.should_block is True - - -def test_sentiment_no_block_on_neutral(): - s = SentimentData(fear_greed_value=50, news_sentiment=0.0) - assert s.should_block is False |
