From 8c852c5583b4f610844fe6001309b71e1958908e Mon Sep 17 00:00:00 2001 From: TheSiahxyz <164138827+TheSiahxyz@users.noreply.github.com> Date: Thu, 2 Apr 2026 15:01:09 +0900 Subject: docs: add platform upgrade implementation plan 19 tasks across 5 phases: shared library hardening, infrastructure, service fixes, API security, and operational maturity. --- .../plans/2026-04-02-platform-upgrade.md | 1991 ++++++++++++++++++++ 1 file changed, 1991 insertions(+) create mode 100644 docs/superpowers/plans/2026-04-02-platform-upgrade.md (limited to 'docs/superpowers/plans/2026-04-02-platform-upgrade.md') diff --git a/docs/superpowers/plans/2026-04-02-platform-upgrade.md b/docs/superpowers/plans/2026-04-02-platform-upgrade.md new file mode 100644 index 0000000..c28d287 --- /dev/null +++ b/docs/superpowers/plans/2026-04-02-platform-upgrade.md @@ -0,0 +1,1991 @@ +# Platform Upgrade Implementation Plan + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** Upgrade the trading platform across 5 phases: shared library hardening, infrastructure improvements, service-level fixes, API security, and operational maturity. + +**Architecture:** Bottom-up approach — harden the shared library first (resilience, DB pooling, Redis resilience, config validation), then improve infrastructure (Docker, DB indexes), then fix all services (graceful shutdown, exception handling), then add API security (auth, CORS, rate limiting), and finally improve operations (CI/CD, linting, alerting). 
+ +**Tech Stack:** Python 3.12, asyncio, tenacity, SQLAlchemy 2.0 async, Redis Streams, FastAPI, slowapi, Ruff, GitHub Actions, Prometheus Alertmanager + +--- + +## File Structure + +### New Files +- `shared/src/shared/resilience.py` — Retry decorator, circuit breaker, timeout wrapper +- `shared/tests/test_resilience.py` — Tests for resilience module +- `shared/alembic/versions/003_add_missing_indexes.py` — DB index migration +- `.dockerignore` — Docker build exclusions +- `services/api/src/trading_api/dependencies/auth.py` — Bearer token auth dependency +- `.github/workflows/ci.yml` — GitHub Actions CI pipeline +- `monitoring/prometheus/alert_rules.yml` — Prometheus alerting rules + +### Modified Files +- `shared/src/shared/db.py` — Add connection pool config +- `shared/src/shared/broker.py` — Add Redis resilience +- `shared/src/shared/config.py` — Add validators, SecretStr, new fields +- `shared/pyproject.toml` — Pin deps, add tenacity +- `pyproject.toml` — Enhanced ruff rules, pytest-cov +- `services/strategy-engine/src/strategy_engine/stock_selector.py` — Fix bug, deduplicate, session reuse +- `services/*/src/*/main.py` — Signal handlers, exception specialization (all 6 services) +- `services/*/Dockerfile` — Multi-stage builds, non-root user (all 7 Dockerfiles) +- `services/api/pyproject.toml` — Add slowapi +- `services/api/src/trading_api/main.py` — CORS, auth, rate limiting +- `services/api/src/trading_api/routers/*.py` — Input validation, response models +- `docker-compose.yml` — Remove hardcoded creds, add resource limits, networks +- `.env.example` — Add new fields, mark secrets +- `monitoring/prometheus.yml` — Reference alert rules + +--- + +## Phase 1: Shared Library Hardening + +### Task 1: Implement Resilience Module + +**Files:** +- Create: `shared/src/shared/resilience.py` +- Create: `shared/tests/test_resilience.py` +- Modify: `shared/pyproject.toml:6-18` + +- [ ] **Step 1: Add tenacity dependency to shared/pyproject.toml** + +In 
`shared/pyproject.toml`, add `tenacity` to the dependencies list: + +```python +dependencies = [ + "pydantic>=2.8,<3", + "pydantic-settings>=2.0,<3", + "redis>=5.0,<6", + "asyncpg>=0.29,<1", + "sqlalchemy[asyncio]>=2.0,<3", + "alembic>=1.13,<2", + "structlog>=24.0,<25", + "prometheus-client>=0.20,<1", + "pyyaml>=6.0,<7", + "aiohttp>=3.9,<4", + "rich>=13.0,<14", + "tenacity>=8.2,<10", +] +``` + +Note: This also pins all existing dependencies with upper bounds. + +- [ ] **Step 2: Write failing tests for retry_async** + +Create `shared/tests/test_resilience.py`: + +```python +"""Tests for the resilience module.""" + +import asyncio + +import pytest + +from shared.resilience import retry_async, CircuitBreaker, async_timeout + + +class TestRetryAsync: + async def test_succeeds_without_retry(self): + call_count = 0 + + @retry_async(max_retries=3) + async def succeed(): + nonlocal call_count + call_count += 1 + return "ok" + + result = await succeed() + assert result == "ok" + assert call_count == 1 + + async def test_retries_on_failure_then_succeeds(self): + call_count = 0 + + @retry_async(max_retries=3, base_delay=0.01) + async def fail_twice(): + nonlocal call_count + call_count += 1 + if call_count < 3: + raise ConnectionError("fail") + return "ok" + + result = await fail_twice() + assert result == "ok" + assert call_count == 3 + + async def test_raises_after_max_retries(self): + @retry_async(max_retries=2, base_delay=0.01) + async def always_fail(): + raise ConnectionError("fail") + + with pytest.raises(ConnectionError): + await always_fail() + + async def test_no_retry_on_excluded_exception(self): + call_count = 0 + + @retry_async(max_retries=3, base_delay=0.01, exclude=(ValueError,)) + async def raise_value_error(): + nonlocal call_count + call_count += 1 + raise ValueError("bad input") + + with pytest.raises(ValueError): + await raise_value_error() + assert call_count == 1 + + +class TestCircuitBreaker: + async def test_closed_allows_calls(self): + cb = 
CircuitBreaker(failure_threshold=3, cooldown=0.1) + + async def succeed(): + return "ok" + + result = await cb.call(succeed) + assert result == "ok" + + async def test_opens_after_threshold(self): + cb = CircuitBreaker(failure_threshold=2, cooldown=60) + + async def fail(): + raise ConnectionError("fail") + + for _ in range(2): + with pytest.raises(ConnectionError): + await cb.call(fail) + + with pytest.raises(RuntimeError, match="Circuit breaker is open"): + await cb.call(fail) + + async def test_half_open_after_cooldown(self): + cb = CircuitBreaker(failure_threshold=2, cooldown=0.05) + + call_count = 0 + + async def fail_then_succeed(): + nonlocal call_count + call_count += 1 + if call_count <= 2: + raise ConnectionError("fail") + return "recovered" + + # Trip the breaker + for _ in range(2): + with pytest.raises(ConnectionError): + await cb.call(fail_then_succeed) + + # Wait for cooldown + await asyncio.sleep(0.1) + + # Should allow one call (half-open) + result = await cb.call(fail_then_succeed) + assert result == "recovered" + + +class TestAsyncTimeout: + async def test_completes_within_timeout(self): + async with async_timeout(1.0): + await asyncio.sleep(0.01) + + async def test_raises_on_timeout(self): + with pytest.raises(asyncio.TimeoutError): + async with async_timeout(0.01): + await asyncio.sleep(1.0) +``` + +- [ ] **Step 3: Run tests to verify they fail** + +Run: `pytest shared/tests/test_resilience.py -v` +Expected: FAIL with `ImportError: cannot import name 'retry_async' from 'shared.resilience'` + +- [ ] **Step 4: Implement resilience module** + +Write `shared/src/shared/resilience.py`: + +```python +"""Resilience utilities: retry, circuit breaker, timeout.""" + +import asyncio +import functools +import logging +import time +from contextlib import asynccontextmanager + +logger = logging.getLogger(__name__) + + +def retry_async( + max_retries: int = 3, + base_delay: float = 1.0, + max_delay: float = 30.0, + exclude: tuple[type[Exception], ...] 
= (), +): + """Decorator for async functions with exponential backoff + jitter. + + Args: + max_retries: Maximum number of retry attempts. + base_delay: Initial delay in seconds between retries. + max_delay: Maximum delay cap in seconds. + exclude: Exception types that should NOT be retried. + """ + + def decorator(func): + @functools.wraps(func) + async def wrapper(*args, **kwargs): + last_exc = None + for attempt in range(max_retries + 1): + try: + return await func(*args, **kwargs) + except exclude: + raise + except Exception as exc: + last_exc = exc + if attempt == max_retries: + raise + delay = min(base_delay * (2**attempt), max_delay) + # Add jitter: 50-100% of delay + import random + + delay = delay * (0.5 + random.random() * 0.5) + logger.warning( + "retry attempt=%d/%d delay=%.2fs error=%s func=%s", + attempt + 1, + max_retries, + delay, + str(exc), + func.__name__, + ) + await asyncio.sleep(delay) + raise last_exc # Should not reach here, but just in case + + return wrapper + + return decorator + + +class CircuitBreaker: + """Circuit breaker: opens after consecutive failures, auto-recovers after cooldown.""" + + def __init__(self, failure_threshold: int = 5, cooldown: float = 60.0) -> None: + self._failure_threshold = failure_threshold + self._cooldown = cooldown + self._failure_count = 0 + self._last_failure_time: float = 0 + self._state = "closed" # closed, open, half_open + + async def call(self, func, *args, **kwargs): + if self._state == "open": + if time.monotonic() - self._last_failure_time >= self._cooldown: + self._state = "half_open" + else: + raise RuntimeError("Circuit breaker is open") + + try: + result = await func(*args, **kwargs) + self._failure_count = 0 + self._state = "closed" + return result + except Exception: + self._failure_count += 1 + self._last_failure_time = time.monotonic() + if self._failure_count >= self._failure_threshold: + self._state = "open" + logger.error( + "circuit_breaker_opened failures=%d cooldown=%.0fs", + 
self._failure_count, + self._cooldown, + ) + raise + + +@asynccontextmanager +async def async_timeout(seconds: float): + """Async context manager that raises TimeoutError after given seconds.""" + try: + async with asyncio.timeout(seconds): + yield + except TimeoutError: + raise asyncio.TimeoutError(f"Operation timed out after {seconds}s") +``` + +- [ ] **Step 5: Run tests to verify they pass** + +Run: `pytest shared/tests/test_resilience.py -v` +Expected: All 8 tests PASS + +- [ ] **Step 6: Commit** + +```bash +git add shared/src/shared/resilience.py shared/tests/test_resilience.py shared/pyproject.toml +git commit -m "feat: implement resilience module with retry, circuit breaker, timeout" +``` + +--- + +### Task 2: Add DB Connection Pooling + +**Files:** +- Modify: `shared/src/shared/db.py:39-44` +- Modify: `shared/src/shared/config.py:10-11` +- Modify: `shared/tests/test_db.py` (add pool config test) + +- [ ] **Step 1: Write failing test for pool config** + +Add to `shared/tests/test_db.py`: + +```python +async def test_connect_configures_pool(tmp_path): + """Engine should be created with pool configuration.""" + db = Database("sqlite+aiosqlite:///:memory:") + await db.connect() + engine = db._engine + pool = engine.pool + # aiosqlite uses StaticPool so we just verify connect works + assert engine is not None + await db.close() +``` + +- [ ] **Step 2: Add pool settings to config.py** + +In `shared/src/shared/config.py`, add after line 11 (`database_url`): + +```python + db_pool_size: int = 20 + db_max_overflow: int = 10 + db_pool_recycle: int = 3600 +``` + +- [ ] **Step 3: Update Database.connect() with pool parameters** + +In `shared/src/shared/db.py`, replace line 41: + +```python + self._engine = create_async_engine(self._database_url) +``` + +with: + +```python + self._engine = create_async_engine( + self._database_url, + pool_pre_ping=True, + pool_size=pool_size, + max_overflow=max_overflow, + pool_recycle=pool_recycle, + ) +``` + +Update the `connect` 
method signature to accept pool params: + +```python + async def connect( + self, + pool_size: int = 20, + max_overflow: int = 10, + pool_recycle: int = 3600, + ) -> None: + """Create the async engine, session factory, and all tables.""" + if self._database_url.startswith("sqlite"): + self._engine = create_async_engine(self._database_url) + else: + self._engine = create_async_engine( + self._database_url, + pool_pre_ping=True, + pool_size=pool_size, + max_overflow=max_overflow, + pool_recycle=pool_recycle, + ) + self._session_factory = async_sessionmaker(self._engine, expire_on_commit=False) + async with self._engine.begin() as conn: + await conn.run_sync(Base.metadata.create_all) +``` + +- [ ] **Step 4: Run tests** + +Run: `pytest shared/tests/test_db.py -v` +Expected: PASS + +- [ ] **Step 5: Commit** + +```bash +git add shared/src/shared/db.py shared/src/shared/config.py shared/tests/test_db.py +git commit -m "feat: add DB connection pooling with configurable pool_size, overflow, recycle" +``` + +--- + +### Task 3: Add Redis Resilience + +**Files:** +- Modify: `shared/src/shared/broker.py:1-13,15-18,102-104` +- Create: `shared/tests/test_broker_resilience.py` + +- [ ] **Step 1: Write failing tests for Redis resilience** + +Create `shared/tests/test_broker_resilience.py`: + +```python +"""Tests for Redis broker resilience features.""" + +from unittest.mock import AsyncMock, patch + +import pytest + +from shared.broker import RedisBroker + + +class TestBrokerResilience: + async def test_publish_retries_on_connection_error(self): + broker = RedisBroker.__new__(RedisBroker) + mock_redis = AsyncMock() + call_count = 0 + + async def xadd_failing(*args, **kwargs): + nonlocal call_count + call_count += 1 + if call_count < 3: + raise ConnectionError("Redis connection lost") + return "msg-id" + + mock_redis.xadd = xadd_failing + broker._redis = mock_redis + + await broker.publish("test-stream", {"key": "value"}) + assert call_count == 3 + + async def 
test_ping_retries_on_timeout(self): + broker = RedisBroker.__new__(RedisBroker) + mock_redis = AsyncMock() + call_count = 0 + + async def ping_failing(): + nonlocal call_count + call_count += 1 + if call_count < 2: + raise TimeoutError("timeout") + return True + + mock_redis.ping = ping_failing + broker._redis = mock_redis + + result = await broker.ping() + assert result is True + assert call_count == 2 +``` + +- [ ] **Step 2: Run tests to verify they fail** + +Run: `pytest shared/tests/test_broker_resilience.py -v` +Expected: FAIL (publish doesn't retry) + +- [ ] **Step 3: Add resilience to broker.py** + +Replace `shared/src/shared/broker.py`: + +```python +"""Redis Streams broker for the trading platform.""" + +import json +import logging +from typing import Any + +import redis.asyncio + +from shared.resilience import retry_async + +logger = logging.getLogger(__name__) + + +class RedisBroker: + """Async Redis Streams broker for publishing and reading events.""" + + def __init__(self, redis_url: str) -> None: + self._redis = redis.asyncio.from_url( + redis_url, + socket_keepalive=True, + health_check_interval=30, + retry_on_timeout=True, + ) + + @retry_async(max_retries=3, base_delay=0.5, exclude=(ValueError,)) + async def publish(self, stream: str, data: dict[str, Any]) -> None: + """Publish a message to a Redis stream.""" + payload = json.dumps(data) + await self._redis.xadd(stream, {"payload": payload}) + + async def ensure_group(self, stream: str, group: str) -> None: + """Create a consumer group if it doesn't exist.""" + try: + await self._redis.xgroup_create(stream, group, id="0", mkstream=True) + except redis.ResponseError as e: + if "BUSYGROUP" not in str(e): + raise + + @retry_async(max_retries=3, base_delay=0.5, exclude=(ValueError,)) + async def read_group( + self, + stream: str, + group: str, + consumer: str, + count: int = 10, + block: int = 0, + ) -> list[tuple[str, dict[str, Any]]]: + """Read messages from a consumer group. 
Returns list of (message_id, data).""" + results = await self._redis.xreadgroup( + group, consumer, {stream: ">"}, count=count, block=block + ) + messages = [] + if results: + for _stream, entries in results: + for msg_id, fields in entries: + payload = fields.get(b"payload") or fields.get("payload") + if payload: + if isinstance(payload, bytes): + payload = payload.decode() + if isinstance(msg_id, bytes): + msg_id = msg_id.decode() + messages.append((msg_id, json.loads(payload))) + return messages + + async def ack(self, stream: str, group: str, *msg_ids: str) -> None: + """Acknowledge messages in a consumer group.""" + if msg_ids: + await self._redis.xack(stream, group, *msg_ids) + + async def read_pending( + self, + stream: str, + group: str, + consumer: str, + count: int = 10, + ) -> list[tuple[str, dict[str, Any]]]: + """Read pending (unacknowledged) messages for this consumer.""" + results = await self._redis.xreadgroup(group, consumer, {stream: "0"}, count=count) + messages = [] + if results: + for _stream, entries in results: + for msg_id, fields in entries: + if not fields: + continue + payload = fields.get(b"payload") or fields.get("payload") + if payload: + if isinstance(payload, bytes): + payload = payload.decode() + if isinstance(msg_id, bytes): + msg_id = msg_id.decode() + messages.append((msg_id, json.loads(payload))) + return messages + + async def read( + self, + stream: str, + last_id: str = "$", + count: int = 10, + block: int = 0, + ) -> list[dict[str, Any]]: + """Read messages (original method, kept for backward compatibility).""" + results = await self._redis.xread({stream: last_id}, count=count, block=block) + messages = [] + if results: + for _stream, entries in results: + for _msg_id, fields in entries: + payload = fields.get(b"payload") or fields.get("payload") + if payload: + if isinstance(payload, bytes): + payload = payload.decode() + messages.append(json.loads(payload)) + return messages + + @retry_async(max_retries=2, base_delay=0.5) 
+ async def ping(self) -> bool: + """Ping the Redis server; return True if reachable.""" + return await self._redis.ping() + + async def close(self) -> None: + """Close the Redis connection.""" + await self._redis.aclose() +``` + +- [ ] **Step 4: Run tests** + +Run: `pytest shared/tests/test_broker_resilience.py -v` +Expected: PASS + +Run: `pytest shared/tests/test_broker.py -v` +Expected: PASS (existing tests still work) + +- [ ] **Step 5: Commit** + +```bash +git add shared/src/shared/broker.py shared/tests/test_broker_resilience.py +git commit -m "feat: add retry and resilience to Redis broker with keepalive" +``` + +--- + +### Task 4: Config Validation & SecretStr + +**Files:** +- Modify: `shared/src/shared/config.py` +- Create: `shared/tests/test_config_validation.py` + +- [ ] **Step 1: Write failing tests for config validation** + +Create `shared/tests/test_config_validation.py`: + +```python +"""Tests for config validation.""" + +import pytest +from pydantic import ValidationError + +from shared.config import Settings + + +class TestConfigValidation: + def test_valid_defaults(self): + settings = Settings() + assert settings.risk_max_position_size == 0.1 + + def test_invalid_position_size(self): + with pytest.raises(ValidationError, match="risk_max_position_size"): + Settings(risk_max_position_size=-0.1) + + def test_invalid_health_port(self): + with pytest.raises(ValidationError, match="health_port"): + Settings(health_port=80) + + def test_invalid_log_level(self): + with pytest.raises(ValidationError, match="log_level"): + Settings(log_level="INVALID") + + def test_secret_fields_masked(self): + settings = Settings(alpaca_api_key="my-secret-key") + assert "my-secret-key" not in repr(settings) + assert settings.alpaca_api_key.get_secret_value() == "my-secret-key" +``` + +- [ ] **Step 2: Run tests to verify they fail** + +Run: `pytest shared/tests/test_config_validation.py -v` +Expected: FAIL + +- [ ] **Step 3: Update config.py with validators and SecretStr** 
+ +Replace `shared/src/shared/config.py`: + +```python +"""Shared configuration settings for the trading platform.""" + +from pydantic import SecretStr, field_validator +from pydantic_settings import BaseSettings + + +class Settings(BaseSettings): + # Alpaca + alpaca_api_key: SecretStr = SecretStr("") + alpaca_api_secret: SecretStr = SecretStr("") + alpaca_paper: bool = True + # Infrastructure + redis_url: SecretStr = SecretStr("redis://localhost:6379") + database_url: SecretStr = SecretStr("postgresql://trading:trading@localhost:5432/trading") + # DB pool + db_pool_size: int = 20 + db_max_overflow: int = 10 + db_pool_recycle: int = 3600 + # Logging + log_level: str = "INFO" + log_format: str = "json" + # Health + health_port: int = 8080 + metrics_auth_token: str = "" + # Risk + risk_max_position_size: float = 0.1 + risk_stop_loss_pct: float = 5.0 + risk_daily_loss_limit_pct: float = 10.0 + risk_trailing_stop_pct: float = 0.0 + risk_max_open_positions: int = 10 + risk_volatility_lookback: int = 20 + risk_volatility_scale: bool = False + risk_max_portfolio_exposure: float = 0.8 + risk_max_correlated_exposure: float = 0.5 + risk_correlation_threshold: float = 0.7 + risk_var_confidence: float = 0.95 + risk_var_limit_pct: float = 5.0 + risk_drawdown_reduction_threshold: float = 0.1 + risk_drawdown_halt_threshold: float = 0.2 + risk_max_consecutive_losses: int = 5 + risk_loss_pause_minutes: int = 60 + dry_run: bool = True + # Telegram + telegram_bot_token: SecretStr = SecretStr("") + telegram_chat_id: str = "" + telegram_enabled: bool = False + # News + finnhub_api_key: SecretStr = SecretStr("") + news_poll_interval: int = 300 + sentiment_aggregate_interval: int = 900 + # Stock selector + selector_final_time: str = "15:30" + selector_max_picks: int = 3 + # LLM + anthropic_api_key: SecretStr = SecretStr("") + anthropic_model: str = "claude-sonnet-4-20250514" + # API security + api_auth_token: SecretStr = SecretStr("") + cors_origins: str = "http://localhost:3000" + + 
model_config = {"env_file": ".env", "env_file_encoding": "utf-8", "extra": "ignore"} + + @field_validator("risk_max_position_size") + @classmethod + def validate_position_size(cls, v: float) -> float: + if v <= 0 or v > 1: + raise ValueError("risk_max_position_size must be between 0 and 1 (exclusive)") + return v + + @field_validator("health_port") + @classmethod + def validate_health_port(cls, v: int) -> int: + if v < 1024 or v > 65535: + raise ValueError("health_port must be between 1024 and 65535") + return v + + @field_validator("log_level") + @classmethod + def validate_log_level(cls, v: str) -> str: + valid = {"DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"} + if v.upper() not in valid: + raise ValueError(f"log_level must be one of {valid}") + return v.upper() +``` + +- [ ] **Step 4: Update all consumers to use .get_secret_value()** + +Every place that reads `settings.alpaca_api_key` etc. must now call `.get_secret_value()`. Key files to update: + +**`shared/src/shared/alpaca.py`** — where AlpacaClient is instantiated (in each service main.py), change: +```python +# Before: +alpaca = AlpacaClient(cfg.alpaca_api_key, cfg.alpaca_api_secret, paper=cfg.alpaca_paper) +# After: +alpaca = AlpacaClient( + cfg.alpaca_api_key.get_secret_value(), + cfg.alpaca_api_secret.get_secret_value(), + paper=cfg.alpaca_paper, +) +``` + +**Each service main.py** — where `Database(cfg.database_url)` and `RedisBroker(cfg.redis_url)` are called: +```python +# Before: +db = Database(cfg.database_url) +broker = RedisBroker(cfg.redis_url) +# After: +db = Database(cfg.database_url.get_secret_value()) +broker = RedisBroker(cfg.redis_url.get_secret_value()) +``` + +**`shared/src/shared/notifier.py`** — where telegram_bot_token is used: +```python +# Change token access to .get_secret_value() +``` + +**`services/strategy-engine/src/strategy_engine/main.py`** — where anthropic_api_key is passed: +```python +# Before: +anthropic_api_key=cfg.anthropic_api_key, +# After: 
+anthropic_api_key=cfg.anthropic_api_key.get_secret_value(), +``` + +**`services/news-collector/src/news_collector/main.py`** — where finnhub_api_key is used: +```python +# Before: +cfg.finnhub_api_key +# After: +cfg.finnhub_api_key.get_secret_value() +``` + +- [ ] **Step 5: Run all tests** + +Run: `pytest shared/tests/test_config_validation.py -v` +Expected: PASS + +Run: `pytest -v` +Expected: All tests PASS (no regressions from SecretStr changes) + +- [ ] **Step 6: Commit** + +```bash +git add shared/src/shared/config.py shared/tests/test_config_validation.py +git add services/*/src/*/main.py shared/src/shared/notifier.py +git commit -m "feat: add config validation, SecretStr for secrets, API security fields" +``` + +--- + +### Task 5: Pin All Dependencies + +**Files:** +- Modify: `shared/pyproject.toml` (already done in Task 1) +- Modify: `services/strategy-engine/pyproject.toml` +- Modify: `services/backtester/pyproject.toml` +- Modify: `services/api/pyproject.toml` +- Modify: `services/news-collector/pyproject.toml` +- Modify: `services/data-collector/pyproject.toml` +- Modify: `services/order-executor/pyproject.toml` +- Modify: `services/portfolio-manager/pyproject.toml` + +- [ ] **Step 1: Pin service dependencies** + +`services/strategy-engine/pyproject.toml`: +```toml +dependencies = [ + "pandas>=2.1,<3", + "numpy>=1.26,<3", + "trading-shared", +] +``` + +`services/backtester/pyproject.toml`: +```toml +dependencies = ["pandas>=2.1,<3", "numpy>=1.26,<3", "rich>=13.0,<14", "trading-shared"] +``` + +`services/api/pyproject.toml`: +```toml +dependencies = [ + "fastapi>=0.110,<1", + "uvicorn>=0.27,<1", + "slowapi>=0.1.9,<1", + "trading-shared", +] +``` + +`services/news-collector/pyproject.toml`: +```toml +dependencies = [ + "trading-shared", + "feedparser>=6.0,<7", + "nltk>=3.8,<4", + "aiohttp>=3.9,<4", +] +``` + +`shared/pyproject.toml` optional deps: +```toml +[project.optional-dependencies] +dev = [ + "pytest>=8.0,<9", + "pytest-asyncio>=0.23,<1", + 
"ruff>=0.4,<1", +] +claude = [ + "anthropic>=0.40,<1", +] +``` + +- [ ] **Step 2: Verify installation works** + +Run: `pip install -e shared/ && pip install -e services/strategy-engine/ && pip install -e services/api/` +Expected: No errors + +- [ ] **Step 3: Commit** + +```bash +git add shared/pyproject.toml services/*/pyproject.toml +git commit -m "chore: pin all dependencies with upper bounds" +``` + +--- + +## Phase 2: Infrastructure Hardening + +### Task 6: Docker Secrets & Environment Cleanup + +**Files:** +- Modify: `docker-compose.yml:17-21` +- Modify: `.env.example` + +- [ ] **Step 1: Replace hardcoded Postgres credentials in docker-compose.yml** + +In `docker-compose.yml`, replace the postgres service environment: + +```yaml + postgres: + image: postgres:16-alpine + environment: + POSTGRES_USER: ${POSTGRES_USER:-trading} + POSTGRES_PASSWORD: ${POSTGRES_PASSWORD:-trading} + POSTGRES_DB: ${POSTGRES_DB:-trading} +``` + +- [ ] **Step 2: Update .env.example with secret annotations** + +Add to `.env.example`: + +```bash +# === SECRETS (keep secure, do not commit .env) === +ALPACA_API_KEY= +ALPACA_API_SECRET= +DATABASE_URL=postgresql+asyncpg://trading:trading@localhost:5432/trading +REDIS_URL=redis://localhost:6379 +TELEGRAM_BOT_TOKEN= +FINNHUB_API_KEY= +ANTHROPIC_API_KEY= +API_AUTH_TOKEN= +POSTGRES_USER=trading +POSTGRES_PASSWORD=trading +POSTGRES_DB=trading + +# === CONFIGURATION === +ALPACA_PAPER=true +DRY_RUN=true +LOG_LEVEL=INFO +LOG_FORMAT=json +HEALTH_PORT=8080 +# ... 
(keep existing config vars) + +# === API SECURITY === +CORS_ORIGINS=http://localhost:3000 +``` + +- [ ] **Step 3: Commit** + +```bash +git add docker-compose.yml .env.example +git commit -m "fix: move hardcoded postgres credentials to .env, annotate secrets" +``` + +--- + +### Task 7: Dockerfile Optimization + +**Files:** +- Create: `.dockerignore` +- Modify: All 7 Dockerfiles in `services/*/Dockerfile` + +- [ ] **Step 1: Create .dockerignore** + +Create `.dockerignore` at project root: + +``` +__pycache__ +*.pyc +*.pyo +.git +.github +.venv +.env +.env.* +!.env.example +tests/ +docs/ +*.md +.ruff_cache +.pytest_cache +.mypy_cache +monitoring/ +scripts/ +cli/ +``` + +- [ ] **Step 2: Update data-collector Dockerfile** + +Replace `services/data-collector/Dockerfile`: + +```dockerfile +FROM python:3.12-slim AS builder +WORKDIR /app +COPY shared/ shared/ +RUN pip install --no-cache-dir ./shared +COPY services/data-collector/ services/data-collector/ +RUN pip install --no-cache-dir ./services/data-collector + +FROM python:3.12-slim +RUN useradd -r -s /bin/false appuser +WORKDIR /app +COPY --from=builder /usr/local/lib/python3.12/site-packages /usr/local/lib/python3.12/site-packages +COPY --from=builder /usr/local/bin /usr/local/bin +ENV PYTHONPATH=/app +USER appuser +CMD ["python", "-m", "data_collector.main"] +``` + +- [ ] **Step 3: Update all other Dockerfiles with same pattern** + +Apply the same multi-stage + non-root pattern to: +- `services/strategy-engine/Dockerfile` (also copies strategies/) +- `services/order-executor/Dockerfile` +- `services/portfolio-manager/Dockerfile` +- `services/api/Dockerfile` (also copies strategies/, uses uvicorn CMD) +- `services/news-collector/Dockerfile` (also runs nltk download) +- `services/backtester/Dockerfile` (also copies strategies/) + +For **strategy-engine** Dockerfile: +```dockerfile +FROM python:3.12-slim AS builder +WORKDIR /app +COPY shared/ shared/ +RUN pip install --no-cache-dir ./shared +COPY 
services/strategy-engine/ services/strategy-engine/ +RUN pip install --no-cache-dir ./services/strategy-engine + +FROM python:3.12-slim +RUN useradd -r -s /bin/false appuser +WORKDIR /app +COPY --from=builder /usr/local/lib/python3.12/site-packages /usr/local/lib/python3.12/site-packages +COPY --from=builder /usr/local/bin /usr/local/bin +COPY services/strategy-engine/strategies/ /app/strategies/ +ENV PYTHONPATH=/app +USER appuser +CMD ["python", "-m", "strategy_engine.main"] +``` + +For **news-collector** Dockerfile: +```dockerfile +FROM python:3.12-slim AS builder +WORKDIR /app +COPY shared/ shared/ +RUN pip install --no-cache-dir ./shared +COPY services/news-collector/ services/news-collector/ +RUN pip install --no-cache-dir ./services/news-collector +RUN python -c "import nltk; nltk.download('vader_lexicon', download_dir='/usr/local/nltk_data')" + +FROM python:3.12-slim +RUN useradd -r -s /bin/false appuser +WORKDIR /app +COPY --from=builder /usr/local/lib/python3.12/site-packages /usr/local/lib/python3.12/site-packages +COPY --from=builder /usr/local/bin /usr/local/bin +COPY --from=builder /usr/local/nltk_data /usr/local/nltk_data +ENV PYTHONPATH=/app +USER appuser +CMD ["python", "-m", "news_collector.main"] +``` + +For **api** Dockerfile: +```dockerfile +FROM python:3.12-slim AS builder +WORKDIR /app +COPY shared/ shared/ +RUN pip install --no-cache-dir ./shared +COPY services/api/ services/api/ +RUN pip install --no-cache-dir ./services/api +COPY services/strategy-engine/ services/strategy-engine/ +RUN pip install --no-cache-dir ./services/strategy-engine + +FROM python:3.12-slim +RUN useradd -r -s /bin/false appuser +WORKDIR /app +COPY --from=builder /usr/local/lib/python3.12/site-packages /usr/local/lib/python3.12/site-packages +COPY --from=builder /usr/local/bin /usr/local/bin +COPY services/strategy-engine/strategies/ /app/strategies/ +ENV PYTHONPATH=/app STRATEGIES_DIR=/app/strategies +USER appuser +CMD ["uvicorn", "trading_api.main:app", "--host", 
"0.0.0.0", "--port", "8000", "--timeout-graceful-shutdown", "30"] +``` + +For **order-executor**, **portfolio-manager**, **backtester** — same pattern as data-collector, adjusting the service name and CMD. + +- [ ] **Step 4: Verify Docker build works** + +Run: `docker compose build --quiet` +Expected: All images build successfully + +- [ ] **Step 5: Commit** + +```bash +git add .dockerignore services/*/Dockerfile +git commit -m "feat: optimize Dockerfiles with multi-stage builds, non-root user, .dockerignore" +``` + +--- + +### Task 8: Database Index Migration + +**Files:** +- Create: `shared/alembic/versions/003_add_missing_indexes.py` + +- [ ] **Step 1: Create migration file** + +Create `shared/alembic/versions/003_add_missing_indexes.py`: + +```python +"""Add missing indexes for common query patterns. + +Revision ID: 003 +Revises: 002 +""" + +from alembic import op + +revision = "003" +down_revision = "002" + + +def upgrade(): + op.create_index("idx_signals_symbol_created", "signals", ["symbol", "created_at"]) + op.create_index("idx_orders_symbol_status_created", "orders", ["symbol", "status", "created_at"]) + op.create_index("idx_trades_order_id", "trades", ["order_id"]) + op.create_index("idx_trades_symbol_traded", "trades", ["symbol", "traded_at"]) + op.create_index("idx_portfolio_snapshots_at", "portfolio_snapshots", ["snapshot_at"]) + op.create_index("idx_symbol_scores_symbol", "symbol_scores", ["symbol"], unique=True) + + +def downgrade(): + op.drop_index("idx_symbol_scores_symbol", table_name="symbol_scores") + op.drop_index("idx_portfolio_snapshots_at", table_name="portfolio_snapshots") + op.drop_index("idx_trades_symbol_traded", table_name="trades") + op.drop_index("idx_trades_order_id", table_name="trades") + op.drop_index("idx_orders_symbol_status_created", table_name="orders") + op.drop_index("idx_signals_symbol_created", table_name="signals") +``` + +- [ ] **Step 2: Verify migration runs (requires infra)** + +Run: `make infra && cd shared && alembic 
upgrade head`
+Expected: Migration 003 applied successfully
+
+- [ ] **Step 3: Commit**
+
+```bash
+git add shared/alembic/versions/003_add_missing_indexes.py
+git commit -m "feat: add missing DB indexes for signals, orders, trades, snapshots"
+```
+
+---
+
+### Task 9: Docker Compose Resource Limits & Networks
+
+**Files:**
+- Modify: `docker-compose.yml`
+
+- [ ] **Step 1: Add networks and resource limits**
+
+Add to `docker-compose.yml` at bottom:
+
+```yaml
+networks:
+  internal:
+    driver: bridge
+  monitoring:
+    driver: bridge
+```
+
+Add `networks: [internal]` to all application services (redis, postgres, data-collector, strategy-engine, order-executor, portfolio-manager, backtester, api, news-collector).
+
+Add `networks: [internal, monitoring]` to prometheus, grafana. Add `networks: [monitoring]` to loki, promtail.
+
+Add to each application service:
+
+```yaml
+  deploy:
+    resources:
+      limits:
+        memory: 512M
+        cpus: '1.0'
+```
+
+For strategy-engine and backtester, use `memory: 1G` instead.
+
+- [ ] **Step 2: Verify compose config is valid**
+
+Run: `docker compose config --quiet`
+Expected: No errors
+
+- [ ] **Step 3: Commit**
+
+```bash
+git add docker-compose.yml
+git commit -m "feat: add resource limits and network isolation to docker-compose"
+```
+
+---
+
+## Phase 3: Service-Level Improvements
+
+### Task 10: Graceful Shutdown for All Services
+
+**Files:**
+- Modify: `services/data-collector/src/data_collector/main.py`
+- Modify: `services/strategy-engine/src/strategy_engine/main.py`
+- Modify: `services/order-executor/src/order_executor/main.py`
+- Modify: `services/portfolio-manager/src/portfolio_manager/main.py`
+- Modify: `services/news-collector/src/news_collector/main.py`
+- Modify: `services/api/src/trading_api/main.py`
+
+- [ ] **Step 1: Create a shared shutdown helper**
+
+Add to `shared/src/shared/shutdown.py`:
+
+```python
+"""Graceful shutdown utilities for services."""
+
+import asyncio
+import logging
+import signal
+
+logger = 
logging.getLogger(__name__) + + +class GracefulShutdown: + """Manages graceful shutdown via SIGTERM/SIGINT signals.""" + + def __init__(self) -> None: + self._event = asyncio.Event() + + @property + def is_shutting_down(self) -> bool: + return self._event.is_set() + + async def wait(self) -> None: + await self._event.wait() + + def trigger(self) -> None: + logger.info("shutdown_signal_received") + self._event.set() + + def install_handlers(self) -> None: + loop = asyncio.get_running_loop() + for sig in (signal.SIGTERM, signal.SIGINT): + loop.add_signal_handler(sig, self.trigger) +``` + +- [ ] **Step 2: Add shutdown to data-collector main loop** + +In `services/data-collector/src/data_collector/main.py`, add at the start of `run()`: + +```python +from shared.shutdown import GracefulShutdown + +shutdown = GracefulShutdown() +shutdown.install_handlers() +``` + +Replace the main `while True` loop condition with `while not shutdown.is_shutting_down`. + +- [ ] **Step 3: Apply same pattern to all other services** + +For each service's `main.py`, add `GracefulShutdown` import, install handlers at start of `run()`, and replace infinite loops with `while not shutdown.is_shutting_down`. + +For strategy-engine: also cancel tasks on shutdown. +For portfolio-manager: also cancel snapshot_loop task. +For news-collector: also cancel all collector loop tasks. + +- [ ] **Step 4: Run tests** + +Run: `pytest -v` +Expected: All tests PASS + +- [ ] **Step 5: Commit** + +```bash +git add shared/src/shared/shutdown.py services/*/src/*/main.py +git commit -m "feat: add graceful shutdown with SIGTERM/SIGINT handlers to all services" +``` + +--- + +### Task 11: Exception Handling Specialization + +**Files:** +- Modify: All service `main.py` files +- Modify: `shared/src/shared/db.py` + +- [ ] **Step 1: Specialize exceptions in data-collector/main.py** + +Replace broad `except Exception` blocks. 
For example, in the fetch loop: + +```python +# Before: +except Exception as exc: + log.warning("fetch_bar_failed", symbol=symbol, error=str(exc)) + +# After: +except (ConnectionError, TimeoutError, aiohttp.ClientError) as exc: + log.warning("fetch_bar_network_error", symbol=symbol, error=str(exc)) +except (ValueError, KeyError) as exc: + log.warning("fetch_bar_parse_error", symbol=symbol, error=str(exc)) +except Exception as exc: + log.error("fetch_bar_unexpected", symbol=symbol, error=str(exc), exc_info=True) +``` + +- [ ] **Step 2: Specialize exceptions in strategy-engine, order-executor, portfolio-manager, news-collector** + +Apply the same pattern: network errors → warning + retry, parse errors → warning + skip, unexpected → error + exc_info. + +- [ ] **Step 3: Specialize exceptions in db.py** + +In `shared/src/shared/db.py`, the transaction pattern can distinguish: + +```python +except (asyncpg.PostgresError, sqlalchemy.exc.OperationalError) as exc: + await session.rollback() + logger.error("db_operation_error", error=str(exc)) + raise +except Exception: + await session.rollback() + raise +``` + +- [ ] **Step 4: Run tests** + +Run: `pytest -v` +Expected: All tests PASS + +- [ ] **Step 5: Commit** + +```bash +git add services/*/src/*/main.py shared/src/shared/db.py +git commit -m "refactor: specialize exception handling across all services" +``` + +--- + +### Task 12: Fix Stock Selector (Bug Fix + Dedup + Session Reuse) + +**Files:** +- Modify: `services/strategy-engine/src/strategy_engine/stock_selector.py` +- Modify: `services/strategy-engine/tests/test_stock_selector.py` (if exists, otherwise create) + +- [ ] **Step 1: Fix the critical bug on line 217** + +In `stock_selector.py` line 217, replace: +```python +self._session = anthropic_model +``` +with: +```python +self._model = anthropic_model +``` + +- [ ] **Step 2: Extract common JSON parsing function** + +Replace the duplicate parsing logic. 
Add at module level (replacing `_parse_llm_selections`):
+
+```python
+def _extract_json_array(text: str) -> list[dict] | None:
+    """Extract a JSON array from text that may contain markdown code blocks."""
+    code_block = re.search(r"```(?:json)?\s*(\[.*?\])\s*```", text, re.DOTALL)
+    if code_block:
+        raw = code_block.group(1)
+    else:
+        array_match = re.search(r"\[.*\]", text, re.DOTALL)
+        if array_match:
+            raw = array_match.group(0)
+        else:
+            raw = text.strip()
+
+    try:
+        data = json.loads(raw)
+        if isinstance(data, list):
+            return [item for item in data if isinstance(item, dict)]
+        return None
+    except (json.JSONDecodeError, TypeError):
+        return None
+
+
+def _parse_llm_selections(text: str) -> list[SelectedStock]:
+    """Parse LLM response into SelectedStock list."""
+    items = _extract_json_array(text)
+    if items is None:
+        return []
+    selections = []
+    for item in items:
+        try:
+            selections.append(
+                SelectedStock(
+                    symbol=item["symbol"],
+                    side=OrderSide(item["side"]),
+                    conviction=float(item["conviction"]),
+                    reason=item.get("reason", ""),
+                    key_news=item.get("key_news", []),
+                )
+            )
+        except (KeyError, ValueError) as e:
+            logger.warning("Skipping invalid selection item: %s", e)
+    return selections
+```
+
+Update `LLMCandidateSource._parse_candidates()` to use `_extract_json_array`. Malformed items (missing `symbol`, non-numeric `score`) are skipped with a warning, matching the behavior of `_parse_llm_selections`:
+
+```python
+    def _parse_candidates(self, text: str) -> list[Candidate]:
+        items = _extract_json_array(text)
+        if items is None:
+            return []
+        candidates = []
+        for item in items:
+            try:
+                direction = OrderSide(item.get("direction", "BUY"))
+            except ValueError:
+                direction = None
+            try:
+                candidates.append(
+                    Candidate(
+                        symbol=item["symbol"],
+                        source="llm",
+                        direction=direction,
+                        score=float(item.get("score", 0.5)),
+                        reason=item.get("reason", ""),
+                    )
+                )
+            except (KeyError, ValueError, TypeError) as e:
+                logger.warning("Skipping invalid candidate item: %s", e)
+        return candidates
+```
+
+- [ ] **Step 3: Add session reuse to StockSelector**
+
+Add `_http_session` to `StockSelector.__init__()`:
+
+```python
+self._http_session: aiohttp.ClientSession | None = 
None +``` + +Add helper method: + +```python +async def _ensure_session(self) -> aiohttp.ClientSession: + if self._http_session is None or self._http_session.closed: + self._http_session = aiohttp.ClientSession() + return self._http_session + +async def close(self) -> None: + if self._http_session and not self._http_session.closed: + await self._http_session.close() +``` + +Replace `async with aiohttp.ClientSession() as session:` in both `LLMCandidateSource.get_candidates()` and `StockSelector._llm_final_select()` with session reuse. For `LLMCandidateSource`, accept an optional session parameter. For `StockSelector._llm_final_select()`, use `self._ensure_session()`. + +- [ ] **Step 4: Run tests** + +Run: `pytest services/strategy-engine/tests/ -v` +Expected: All tests PASS + +- [ ] **Step 5: Commit** + +```bash +git add services/strategy-engine/src/strategy_engine/stock_selector.py +git commit -m "fix: fix model attr bug, deduplicate LLM parsing, reuse aiohttp sessions" +``` + +--- + +## Phase 4: API Security + +### Task 13: Bearer Token Authentication + +**Files:** +- Create: `services/api/src/trading_api/dependencies/__init__.py` +- Create: `services/api/src/trading_api/dependencies/auth.py` +- Modify: `services/api/src/trading_api/main.py` + +- [ ] **Step 1: Create auth dependency** + +Create `services/api/src/trading_api/dependencies/__init__.py` (empty file). + +Create `services/api/src/trading_api/dependencies/auth.py`: + +```python +"""Bearer token authentication dependency.""" + +import logging + +from fastapi import Depends, HTTPException, status +from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer + +from shared.config import Settings + +logger = logging.getLogger(__name__) + +_security = HTTPBearer(auto_error=False) +_settings = Settings() + + +async def verify_token( + credentials: HTTPAuthorizationCredentials | None = Depends(_security), +) -> None: + """Verify Bearer token. 
Skip auth if API_AUTH_TOKEN is not configured.""" + token = _settings.api_auth_token.get_secret_value() + if not token: + return # Auth disabled in dev mode + + if credentials is None or credentials.credentials != token: + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Invalid or missing authentication token", + headers={"WWW-Authenticate": "Bearer"}, + ) +``` + +- [ ] **Step 2: Apply auth to all API routes** + +In `services/api/src/trading_api/main.py`, add auth dependency to routers: + +```python +from trading_api.dependencies.auth import verify_token +from fastapi import Depends + +app.include_router(portfolio_router, prefix="/api/v1/portfolio", dependencies=[Depends(verify_token)]) +app.include_router(orders_router, prefix="/api/v1/orders", dependencies=[Depends(verify_token)]) +app.include_router(strategies_router, prefix="/api/v1/strategies", dependencies=[Depends(verify_token)]) +``` + +Log a warning on startup if token is empty: + +```python +@asynccontextmanager +async def lifespan(app: FastAPI): + cfg = Settings() + if not cfg.api_auth_token.get_secret_value(): + logger.warning("API_AUTH_TOKEN not set; API authentication is disabled") + # ... 
rest of lifespan +``` + +- [ ] **Step 3: Write tests for auth** + +Add to `services/api/tests/test_auth.py`: + +```python +"""Tests for API authentication.""" + +from unittest.mock import patch + +import pytest +from httpx import ASGITransport, AsyncClient + +from trading_api.main import app + + +class TestAuth: + @patch("trading_api.dependencies.auth._settings") + async def test_rejects_missing_token_when_configured(self, mock_settings): + from pydantic import SecretStr + mock_settings.api_auth_token = SecretStr("test-token") + async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as ac: + resp = await ac.get("/api/v1/portfolio/positions") + assert resp.status_code == 401 + + @patch("trading_api.dependencies.auth._settings") + async def test_accepts_valid_token(self, mock_settings): + from pydantic import SecretStr + mock_settings.api_auth_token = SecretStr("test-token") + async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as ac: + resp = await ac.get( + "/api/v1/portfolio/positions", + headers={"Authorization": "Bearer test-token"}, + ) + # May fail with 500 if DB not available, but should NOT be 401 + assert resp.status_code != 401 +``` + +- [ ] **Step 4: Run tests** + +Run: `pytest services/api/tests/test_auth.py -v` +Expected: PASS + +- [ ] **Step 5: Commit** + +```bash +git add services/api/src/trading_api/dependencies/ services/api/src/trading_api/main.py services/api/tests/test_auth.py +git commit -m "feat: add Bearer token authentication to API endpoints" +``` + +--- + +### Task 14: CORS & Rate Limiting + +**Files:** +- Modify: `services/api/src/trading_api/main.py` +- Modify: `services/api/pyproject.toml` + +- [ ] **Step 1: Add slowapi dependency** + +Already done in Task 5 (`services/api/pyproject.toml` has `slowapi>=0.1.9,<1`). 
+ +- [ ] **Step 2: Add CORS and rate limiting to main.py** + +In `services/api/src/trading_api/main.py`: + +```python +from fastapi.middleware.cors import CORSMiddleware +from slowapi import Limiter, _rate_limit_exceeded_handler +from slowapi.util import get_remote_address +from slowapi.errors import RateLimitExceeded + +from shared.config import Settings + +cfg = Settings() + +limiter = Limiter(key_func=get_remote_address) +app = FastAPI(title="Trading Platform API") +app.state.limiter = limiter +app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler) + +app.add_middleware( + CORSMiddleware, + allow_origins=cfg.cors_origins.split(","), + allow_methods=["GET", "POST"], + allow_headers=["Authorization", "Content-Type"], +) +``` + +- [ ] **Step 3: Add rate limits to order endpoints** + +In `services/api/src/trading_api/routers/orders.py`: + +```python +from slowapi import Limiter +from slowapi.util import get_remote_address + +limiter = Limiter(key_func=get_remote_address) + +@router.get("/") +@limiter.limit("60/minute") +async def get_orders(request: Request, limit: int = 50): + ... +``` + +- [ ] **Step 4: Run tests** + +Run: `pytest services/api/tests/ -v` +Expected: PASS + +- [ ] **Step 5: Commit** + +```bash +git add services/api/src/trading_api/main.py services/api/src/trading_api/routers/ +git commit -m "feat: add CORS middleware and rate limiting to API" +``` + +--- + +### Task 15: API Input Validation & Response Models + +**Files:** +- Modify: `services/api/src/trading_api/routers/portfolio.py` +- Modify: `services/api/src/trading_api/routers/orders.py` + +- [ ] **Step 1: Add Query validation to portfolio.py** + +```python +from fastapi import Query + +@router.get("/snapshots") +async def get_snapshots(request: Request, days: int = Query(30, ge=1, le=365)): + ... 
+``` + +- [ ] **Step 2: Add Query validation to orders.py** + +```python +from fastapi import Query + +@router.get("/") +async def get_orders(request: Request, limit: int = Query(50, ge=1, le=1000)): + ... + +@router.get("/signals") +async def get_signals(request: Request, limit: int = Query(50, ge=1, le=1000)): + ... +``` + +- [ ] **Step 3: Run tests** + +Run: `pytest services/api/tests/ -v` +Expected: PASS + +- [ ] **Step 4: Commit** + +```bash +git add services/api/src/trading_api/routers/ +git commit -m "feat: add input validation with Query bounds to API endpoints" +``` + +--- + +## Phase 5: Operational Maturity + +### Task 16: Enhanced Ruff Configuration + +**Files:** +- Modify: `pyproject.toml:12-14` + +- [ ] **Step 1: Update ruff config in pyproject.toml** + +Replace the ruff section in root `pyproject.toml`: + +```toml +[tool.ruff] +target-version = "py312" +line-length = 100 + +[tool.ruff.lint] +select = ["E", "W", "F", "I", "B", "UP", "ASYNC", "PERF", "C4", "RUF"] +ignore = ["E501"] + +[tool.ruff.lint.per-file-ignores] +"tests/*" = ["F841"] +"*/tests/*" = ["F841"] + +[tool.ruff.lint.isort] +known-first-party = ["shared"] +``` + +- [ ] **Step 2: Auto-fix existing violations** + +Run: `ruff check --fix . && ruff format .` +Expected: Fixes applied + +- [ ] **Step 3: Verify no remaining errors** + +Run: `ruff check . 
&& ruff format --check .` +Expected: No errors + +- [ ] **Step 4: Run tests to verify no regressions** + +Run: `pytest -v` +Expected: All tests PASS + +- [ ] **Step 5: Commit** + +```bash +git add pyproject.toml +git commit -m "chore: enhance ruff lint rules with ASYNC, bugbear, isort, pyupgrade" +``` + +Then commit auto-fixes separately: + +```bash +git add -A +git commit -m "style: auto-fix lint violations from enhanced ruff rules" +``` + +--- + +### Task 17: GitHub Actions CI Pipeline + +**Files:** +- Create: `.github/workflows/ci.yml` + +- [ ] **Step 1: Create CI workflow** + +Create `.github/workflows/ci.yml`: + +```yaml +name: CI + +on: + push: + branches: [master] + pull_request: + branches: [master] + +jobs: + lint: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - uses: actions/setup-python@v5 + with: + python-version: "3.12" + - run: pip install ruff + - run: ruff check . + - run: ruff format --check . + + test: + runs-on: ubuntu-latest + services: + redis: + image: redis:7-alpine + ports: [6379:6379] + options: >- + --health-cmd "redis-cli ping" + --health-interval 5s + --health-timeout 3s + --health-retries 5 + postgres: + image: postgres:16-alpine + env: + POSTGRES_USER: trading + POSTGRES_PASSWORD: trading + POSTGRES_DB: trading + ports: [5432:5432] + options: >- + --health-cmd pg_isready + --health-interval 5s + --health-timeout 3s + --health-retries 5 + steps: + - uses: actions/checkout@v4 + - uses: actions/setup-python@v5 + with: + python-version: "3.12" + - run: | + pip install -e shared/[dev] + pip install -e services/strategy-engine/[dev] + pip install -e services/data-collector/[dev] + pip install -e services/order-executor/[dev] + pip install -e services/portfolio-manager/[dev] + pip install -e services/news-collector/[dev] + pip install -e services/api/[dev] + pip install -e services/backtester/[dev] + pip install pytest-cov + - run: pytest -v --cov=shared/src --cov=services --cov-report=xml --cov-report=term-missing + env: 
+ DATABASE_URL: postgresql+asyncpg://trading:trading@localhost:5432/trading + REDIS_URL: redis://localhost:6379 + - uses: actions/upload-artifact@v4 + with: + name: coverage-report + path: coverage.xml + + docker: + runs-on: ubuntu-latest + needs: [lint, test] + if: github.ref == 'refs/heads/master' + steps: + - uses: actions/checkout@v4 + - run: docker compose build --quiet +``` + +- [ ] **Step 2: Commit** + +```bash +mkdir -p .github/workflows +git add .github/workflows/ci.yml +git commit -m "feat: add GitHub Actions CI pipeline with lint, test, docker build" +``` + +--- + +### Task 18: Prometheus Alerting Rules + +**Files:** +- Create: `monitoring/prometheus/alert_rules.yml` +- Modify: `monitoring/prometheus.yml` + +- [ ] **Step 1: Create alert rules** + +Create `monitoring/prometheus/alert_rules.yml`: + +```yaml +groups: + - name: trading-platform + rules: + - alert: ServiceDown + expr: up == 0 + for: 1m + labels: + severity: critical + annotations: + summary: "Service {{ $labels.job }} is down" + description: "{{ $labels.instance }} has been unreachable for 1 minute." + + - alert: HighErrorRate + expr: rate(errors_total[5m]) > 10 + for: 2m + labels: + severity: warning + annotations: + summary: "High error rate on {{ $labels.job }}" + description: "Error rate is {{ $value }} errors/sec over 5 minutes." + + - alert: HighProcessingLatency + expr: histogram_quantile(0.95, rate(processing_seconds_bucket[5m])) > 5 + for: 5m + labels: + severity: warning + annotations: + summary: "High p95 latency on {{ $labels.job }}" + description: "95th percentile processing time is {{ $value }}s." 
+``` + +- [ ] **Step 2: Reference alert rules in prometheus.yml** + +In `monitoring/prometheus.yml`, add after `global:`: + +```yaml +rule_files: + - "/etc/prometheus/alert_rules.yml" +``` + +Update `docker-compose.yml` prometheus service to mount the file: + +```yaml + prometheus: + volumes: + - ./monitoring/prometheus.yml:/etc/prometheus/prometheus.yml + - ./monitoring/prometheus/alert_rules.yml:/etc/prometheus/alert_rules.yml +``` + +- [ ] **Step 3: Commit** + +```bash +git add monitoring/prometheus/alert_rules.yml monitoring/prometheus.yml docker-compose.yml +git commit -m "feat: add Prometheus alerting rules for service health, errors, latency" +``` + +--- + +### Task 19: Code Coverage Configuration + +**Files:** +- Modify: `pyproject.toml` + +- [ ] **Step 1: Add pytest-cov config** + +Add to `pyproject.toml`: + +```toml +[tool.coverage.run] +branch = true +source = ["shared/src", "services"] +omit = ["*/tests/*", "*/alembic/*"] + +[tool.coverage.report] +fail_under = 60 +show_missing = true +exclude_lines = [ + "pragma: no cover", + "if __name__", + "if TYPE_CHECKING", +] +``` + +Update pytest addopts: +```toml +[tool.pytest.ini_options] +asyncio_mode = "auto" +testpaths = ["shared/tests", "services", "cli/tests", "tests"] +addopts = "--import-mode=importlib" +``` + +Note: `--cov` flags are passed explicitly in CI, not in addopts (to avoid slowing local dev). 
+ +- [ ] **Step 2: Verify coverage works** + +Run: `pip install pytest-cov && pytest --cov=shared/src --cov-report=term-missing` +Expected: Coverage report printed, no errors + +- [ ] **Step 3: Commit** + +```bash +git add pyproject.toml +git commit -m "chore: add pytest-cov configuration with 60% minimum coverage threshold" +``` + +--- + +## Summary + +| Phase | Tasks | Estimated Commits | +|-------|-------|-------------------| +| 1: Shared Library | Tasks 1-5 | 5 commits | +| 2: Infrastructure | Tasks 6-9 | 4 commits | +| 3: Service Fixes | Tasks 10-12 | 3 commits | +| 4: API Security | Tasks 13-15 | 3 commits | +| 5: Operations | Tasks 16-19 | 5 commits | +| **Total** | **19 tasks** | **~20 commits** | -- cgit v1.2.3