# Platform Upgrade Implementation Plan

> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.

**Goal:** Upgrade the trading platform across 5 phases: shared library hardening, infrastructure improvements, service-level fixes, API security, and operational maturity.

**Architecture:** Bottom-up approach — harden the shared library first (resilience, DB pooling, Redis resilience, config validation), then improve infrastructure (Docker, DB indexes), then fix all services (graceful shutdown, exception handling), then add API security (auth, CORS, rate limiting), and finally improve operations (CI/CD, linting, alerting).

**Tech Stack:** Python 3.12, asyncio, tenacity, SQLAlchemy 2.0 async, Redis Streams, FastAPI, slowapi, Ruff, GitHub Actions, Prometheus Alertmanager

---

## File Structure

### New Files

- `shared/src/shared/resilience.py` — Retry decorator, circuit breaker, timeout wrapper
- `shared/tests/test_resilience.py` — Tests for resilience module
- `shared/alembic/versions/003_add_missing_indexes.py` — DB index migration
- `.dockerignore` — Docker build exclusions
- `services/api/src/trading_api/dependencies/auth.py` — Bearer token auth dependency
- `.github/workflows/ci.yml` — GitHub Actions CI pipeline
- `monitoring/prometheus/alert_rules.yml` — Prometheus alerting rules

### Modified Files

- `shared/src/shared/db.py` — Add connection pool config
- `shared/src/shared/broker.py` — Add Redis resilience
- `shared/src/shared/config.py` — Add validators, SecretStr, new fields
- `shared/pyproject.toml` — Pin deps, add tenacity
- `pyproject.toml` — Enhanced ruff rules, pytest-cov
- `services/strategy-engine/src/strategy_engine/stock_selector.py` — Fix bug, deduplicate, session reuse
- `services/*/src/*/main.py` — Signal handlers, exception specialization (all 6 services)
- `services/*/Dockerfile` — Multi-stage builds, non-root user (all 7 Dockerfiles)
- `services/api/pyproject.toml` — Add slowapi
- `services/api/src/trading_api/main.py` — CORS, auth, rate limiting
- `services/api/src/trading_api/routers/*.py` — Input validation, response models
- `docker-compose.yml` — Remove hardcoded creds, add resource limits, networks
- `.env.example` — Add new fields, mark secrets
- `monitoring/prometheus.yml` — Reference alert rules

---

## Phase 1: Shared Library Hardening

### Task 1: Implement Resilience Module

**Files:**
- Create: `shared/src/shared/resilience.py`
- Create: `shared/tests/test_resilience.py`
- Modify: `shared/pyproject.toml:6-18`

- [ ] **Step 1: Add tenacity dependency to shared/pyproject.toml**

In `shared/pyproject.toml`, add `tenacity` to the dependencies list:

```toml
dependencies = [
    "pydantic>=2.8,<3",
    "pydantic-settings>=2.0,<3",
    "redis>=5.0,<6",
    "asyncpg>=0.29,<1",
    "sqlalchemy[asyncio]>=2.0,<3",
    "alembic>=1.13,<2",
    "structlog>=24.0,<25",
    "prometheus-client>=0.20,<1",
    "pyyaml>=6.0,<7",
    "aiohttp>=3.9,<4",
    "rich>=13.0,<14",
    "tenacity>=8.2,<10",
]
```

Note: This also pins all existing dependencies with upper bounds.
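For intuition before writing the tests: the retry decorator this task builds uses exponential backoff capped at `max_delay`, with 50–100% jitter. A minimal sketch of the delay schedule (the `backoff_delay` helper is illustrative, not part of the module):

```python
import random


def backoff_delay(attempt: int, base_delay: float = 1.0, max_delay: float = 30.0) -> float:
    """Delay before retry number `attempt` (0-based): exponential, capped, then jittered."""
    capped = min(base_delay * (2 ** attempt), max_delay)
    # Keep 50-100% of the capped delay, so retrying clients de-synchronize
    return capped * (0.5 + random.random() * 0.5)
```

With `base_delay=1.0`, attempt 0 sleeps 0.5–1.0 s, attempt 1 sleeps 1–2 s, and attempt 5 hits the cap and lands in the 15–30 s band.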
- [ ] **Step 2: Write failing tests for retry_async**

Create `shared/tests/test_resilience.py`:

```python
"""Tests for the resilience module."""

import asyncio

import pytest

from shared.resilience import CircuitBreaker, async_timeout, retry_async


class TestRetryAsync:
    async def test_succeeds_without_retry(self):
        call_count = 0

        @retry_async(max_retries=3)
        async def succeed():
            nonlocal call_count
            call_count += 1
            return "ok"

        result = await succeed()
        assert result == "ok"
        assert call_count == 1

    async def test_retries_on_failure_then_succeeds(self):
        call_count = 0

        @retry_async(max_retries=3, base_delay=0.01)
        async def fail_twice():
            nonlocal call_count
            call_count += 1
            if call_count < 3:
                raise ConnectionError("fail")
            return "ok"

        result = await fail_twice()
        assert result == "ok"
        assert call_count == 3

    async def test_raises_after_max_retries(self):
        @retry_async(max_retries=2, base_delay=0.01)
        async def always_fail():
            raise ConnectionError("fail")

        with pytest.raises(ConnectionError):
            await always_fail()

    async def test_no_retry_on_excluded_exception(self):
        call_count = 0

        @retry_async(max_retries=3, base_delay=0.01, exclude=(ValueError,))
        async def raise_value_error():
            nonlocal call_count
            call_count += 1
            raise ValueError("bad input")

        with pytest.raises(ValueError):
            await raise_value_error()
        assert call_count == 1


class TestCircuitBreaker:
    async def test_closed_allows_calls(self):
        cb = CircuitBreaker(failure_threshold=3, cooldown=0.1)

        async def succeed():
            return "ok"

        result = await cb.call(succeed)
        assert result == "ok"

    async def test_opens_after_threshold(self):
        cb = CircuitBreaker(failure_threshold=2, cooldown=60)

        async def fail():
            raise ConnectionError("fail")

        for _ in range(2):
            with pytest.raises(ConnectionError):
                await cb.call(fail)

        with pytest.raises(RuntimeError, match="Circuit breaker is open"):
            await cb.call(fail)

    async def test_half_open_after_cooldown(self):
        cb = CircuitBreaker(failure_threshold=2, cooldown=0.05)
        call_count = 0

        async def fail_then_succeed():
            nonlocal call_count
            call_count += 1
            if call_count <= 2:
                raise ConnectionError("fail")
            return "recovered"

        # Trip the breaker
        for _ in range(2):
            with pytest.raises(ConnectionError):
                await cb.call(fail_then_succeed)

        # Wait for cooldown
        await asyncio.sleep(0.1)

        # Should allow one call (half-open)
        result = await cb.call(fail_then_succeed)
        assert result == "recovered"


class TestAsyncTimeout:
    async def test_completes_within_timeout(self):
        async with async_timeout(1.0):
            await asyncio.sleep(0.01)

    async def test_raises_on_timeout(self):
        with pytest.raises(asyncio.TimeoutError):
            async with async_timeout(0.01):
                await asyncio.sleep(1.0)
```

- [ ] **Step 3: Run tests to verify they fail**

Run: `pytest shared/tests/test_resilience.py -v`
Expected: FAIL with `ModuleNotFoundError: No module named 'shared.resilience'` (the module does not exist yet)

- [ ] **Step 4: Implement resilience module**

Write `shared/src/shared/resilience.py`:

```python
"""Resilience utilities: retry, circuit breaker, timeout."""

import asyncio
import functools
import logging
import random
import time
from contextlib import asynccontextmanager

logger = logging.getLogger(__name__)


def retry_async(
    max_retries: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 30.0,
    exclude: tuple[type[Exception], ...] = (),
):
    """Decorator for async functions with exponential backoff + jitter.

    Args:
        max_retries: Maximum number of retry attempts.
        base_delay: Initial delay in seconds between retries.
        max_delay: Maximum delay cap in seconds.
        exclude: Exception types that should NOT be retried.
    """

    def decorator(func):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            last_exc = None
            for attempt in range(max_retries + 1):
                try:
                    return await func(*args, **kwargs)
                except exclude:
                    raise
                except Exception as exc:
                    last_exc = exc
                    if attempt == max_retries:
                        raise
                    delay = min(base_delay * (2**attempt), max_delay)
                    # Add jitter: 50-100% of delay
                    delay = delay * (0.5 + random.random() * 0.5)
                    logger.warning(
                        "retry attempt=%d/%d delay=%.2fs error=%s func=%s",
                        attempt + 1,
                        max_retries,
                        delay,
                        str(exc),
                        func.__name__,
                    )
                    await asyncio.sleep(delay)
            raise last_exc  # Unreachable: the final attempt re-raises above

        return wrapper

    return decorator


class CircuitBreaker:
    """Circuit breaker: opens after consecutive failures, auto-recovers after cooldown."""

    def __init__(self, failure_threshold: int = 5, cooldown: float = 60.0) -> None:
        self._failure_threshold = failure_threshold
        self._cooldown = cooldown
        self._failure_count = 0
        self._last_failure_time: float = 0
        self._state = "closed"  # closed, open, half_open

    async def call(self, func, *args, **kwargs):
        if self._state == "open":
            if time.monotonic() - self._last_failure_time >= self._cooldown:
                self._state = "half_open"
            else:
                raise RuntimeError("Circuit breaker is open")
        try:
            result = await func(*args, **kwargs)
            self._failure_count = 0
            self._state = "closed"
            return result
        except Exception:
            self._failure_count += 1
            self._last_failure_time = time.monotonic()
            if self._failure_count >= self._failure_threshold:
                self._state = "open"
                logger.error(
                    "circuit_breaker_opened failures=%d cooldown=%.0fs",
                    self._failure_count,
                    self._cooldown,
                )
            raise


@asynccontextmanager
async def async_timeout(seconds: float):
    """Async context manager that raises TimeoutError after the given seconds."""
    try:
        async with asyncio.timeout(seconds):
            yield
    except TimeoutError:
        raise asyncio.TimeoutError(f"Operation timed out after {seconds}s")
```

- [ ] **Step 5: Run tests to verify they pass**

Run: `pytest shared/tests/test_resilience.py -v`
Expected:
All 9 tests PASS

- [ ] **Step 6: Commit**

```bash
git add shared/src/shared/resilience.py shared/tests/test_resilience.py shared/pyproject.toml
git commit -m "feat: implement resilience module with retry, circuit breaker, timeout"
```

---

### Task 2: Add DB Connection Pooling

**Files:**
- Modify: `shared/src/shared/db.py:39-44`
- Modify: `shared/src/shared/config.py:10-11`
- Modify: `shared/tests/test_db.py` (add pool config test)

- [ ] **Step 1: Write failing test for pool config**

Add to `shared/tests/test_db.py`:

```python
async def test_connect_configures_pool():
    """Engine should be created with pool configuration."""
    db = Database("sqlite+aiosqlite:///:memory:")
    await db.connect()
    # aiosqlite uses StaticPool, so we just verify connect works
    assert db._engine is not None
    await db.close()
```

- [ ] **Step 2: Add pool settings to config.py**

In `shared/src/shared/config.py`, add after line 11 (`database_url`):

```python
db_pool_size: int = 20
db_max_overflow: int = 10
db_pool_recycle: int = 3600
```

- [ ] **Step 3: Update Database.connect() with pool parameters**

In `shared/src/shared/db.py`, replace line 41:

```python
self._engine = create_async_engine(self._database_url)
```

with a pooled engine, updating the `connect` method signature to accept pool params:

```python
async def connect(
    self,
    pool_size: int = 20,
    max_overflow: int = 10,
    pool_recycle: int = 3600,
) -> None:
    """Create the async engine, session factory, and all tables."""
    if self._database_url.startswith("sqlite"):
        self._engine = create_async_engine(self._database_url)
    else:
        self._engine = create_async_engine(
            self._database_url,
            pool_pre_ping=True,
            pool_size=pool_size,
            max_overflow=max_overflow,
            pool_recycle=pool_recycle,
        )
    self._session_factory = async_sessionmaker(self._engine, expire_on_commit=False)
    async with self._engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)
```

- [ ] **Step 4: Run tests**

Run: `pytest shared/tests/test_db.py -v`
Expected: PASS

- [ ] **Step 5: Commit**

```bash
git add shared/src/shared/db.py shared/src/shared/config.py shared/tests/test_db.py
git commit -m "feat: add DB connection pooling with configurable pool_size, overflow, recycle"
```

---

### Task 3: Add Redis Resilience

**Files:**
- Modify: `shared/src/shared/broker.py:1-13,15-18,102-104`
- Create: `shared/tests/test_broker_resilience.py`

- [ ] **Step 1: Write failing tests for Redis resilience**

Create `shared/tests/test_broker_resilience.py`:

```python
"""Tests for Redis broker resilience features."""

from unittest.mock import AsyncMock

from shared.broker import RedisBroker


class TestBrokerResilience:
    async def test_publish_retries_on_connection_error(self):
        broker = RedisBroker.__new__(RedisBroker)
        mock_redis = AsyncMock()
        call_count = 0

        async def xadd_failing(*args, **kwargs):
            nonlocal call_count
            call_count += 1
            if call_count < 3:
                raise ConnectionError("Redis connection lost")
            return "msg-id"

        mock_redis.xadd = xadd_failing
        broker._redis = mock_redis

        await broker.publish("test-stream", {"key": "value"})
        assert call_count == 3

    async def test_ping_retries_on_timeout(self):
        broker = RedisBroker.__new__(RedisBroker)
        mock_redis = AsyncMock()
        call_count = 0

        async def ping_failing():
            nonlocal call_count
            call_count += 1
            if call_count < 2:
                raise TimeoutError("timeout")
            return True

        mock_redis.ping = ping_failing
        broker._redis = mock_redis

        result = await broker.ping()
        assert result is True
        assert call_count == 2
```

- [ ] **Step 2: Run tests to verify they fail**

Run: `pytest shared/tests/test_broker_resilience.py -v`
Expected: FAIL (publish doesn't retry)

- [ ] **Step 3: Add resilience to broker.py**

Replace `shared/src/shared/broker.py`:

```python
"""Redis Streams broker for the trading platform."""

import json
import logging
from typing import Any

import redis.asyncio

from shared.resilience import retry_async

logger = logging.getLogger(__name__)


class RedisBroker:
    """Async Redis Streams broker for publishing and reading events."""

    def __init__(self, redis_url: str) -> None:
        self._redis = redis.asyncio.from_url(
            redis_url,
            socket_keepalive=True,
            health_check_interval=30,
            retry_on_timeout=True,
        )

    @retry_async(max_retries=3, base_delay=0.5, exclude=(ValueError,))
    async def publish(self, stream: str, data: dict[str, Any]) -> None:
        """Publish a message to a Redis stream."""
        payload = json.dumps(data)
        await self._redis.xadd(stream, {"payload": payload})

    async def ensure_group(self, stream: str, group: str) -> None:
        """Create a consumer group if it doesn't exist."""
        try:
            await self._redis.xgroup_create(stream, group, id="0", mkstream=True)
        except redis.ResponseError as e:
            if "BUSYGROUP" not in str(e):
                raise

    @retry_async(max_retries=3, base_delay=0.5, exclude=(ValueError,))
    async def read_group(
        self,
        stream: str,
        group: str,
        consumer: str,
        count: int = 10,
        block: int = 0,
    ) -> list[tuple[str, dict[str, Any]]]:
        """Read messages from a consumer group.

        Returns list of (message_id, data)."""
        results = await self._redis.xreadgroup(
            group, consumer, {stream: ">"}, count=count, block=block
        )
        messages = []
        if results:
            for _stream, entries in results:
                for msg_id, fields in entries:
                    payload = fields.get(b"payload") or fields.get("payload")
                    if payload:
                        if isinstance(payload, bytes):
                            payload = payload.decode()
                        if isinstance(msg_id, bytes):
                            msg_id = msg_id.decode()
                        messages.append((msg_id, json.loads(payload)))
        return messages

    async def ack(self, stream: str, group: str, *msg_ids: str) -> None:
        """Acknowledge messages in a consumer group."""
        if msg_ids:
            await self._redis.xack(stream, group, *msg_ids)

    async def read_pending(
        self,
        stream: str,
        group: str,
        consumer: str,
        count: int = 10,
    ) -> list[tuple[str, dict[str, Any]]]:
        """Read pending (unacknowledged) messages for this consumer."""
        results = await self._redis.xreadgroup(group, consumer, {stream: "0"}, count=count)
        messages = []
        if results:
            for _stream, entries in results:
                for msg_id, fields in entries:
                    if not fields:
                        continue
                    payload = fields.get(b"payload") or fields.get("payload")
                    if payload:
                        if isinstance(payload, bytes):
                            payload = payload.decode()
                        if isinstance(msg_id, bytes):
                            msg_id = msg_id.decode()
                        messages.append((msg_id, json.loads(payload)))
        return messages

    async def read(
        self,
        stream: str,
        last_id: str = "$",
        count: int = 10,
        block: int = 0,
    ) -> list[dict[str, Any]]:
        """Read messages (original method, kept for backward compatibility)."""
        results = await self._redis.xread({stream: last_id}, count=count, block=block)
        messages = []
        if results:
            for _stream, entries in results:
                for _msg_id, fields in entries:
                    payload = fields.get(b"payload") or fields.get("payload")
                    if payload:
                        if isinstance(payload, bytes):
                            payload = payload.decode()
                        messages.append(json.loads(payload))
        return messages

    @retry_async(max_retries=2, base_delay=0.5)
    async def ping(self) -> bool:
        """Ping the Redis server; return True if reachable."""
        return await self._redis.ping()

    async def close(self) -> None:
        """Close the Redis connection."""
        await self._redis.aclose()
```

- [ ] **Step 4: Run tests**

Run: `pytest shared/tests/test_broker_resilience.py -v`
Expected: PASS

Run: `pytest shared/tests/test_broker.py -v`
Expected: PASS (existing tests still work)

- [ ] **Step 5: Commit**

```bash
git add shared/src/shared/broker.py shared/tests/test_broker_resilience.py
git commit -m "feat: add retry and resilience to Redis broker with keepalive"
```

---

### Task 4: Config Validation & SecretStr

**Files:**
- Modify: `shared/src/shared/config.py`
- Create: `shared/tests/test_config_validation.py`

- [ ] **Step 1: Write failing tests for config validation**

Create `shared/tests/test_config_validation.py`:

```python
"""Tests for config validation."""

import pytest
from pydantic import ValidationError

from shared.config import Settings


class TestConfigValidation:
    def test_valid_defaults(self):
        settings = Settings()
        assert settings.risk_max_position_size == 0.1

    def test_invalid_position_size(self):
        with pytest.raises(ValidationError, match="risk_max_position_size"):
            Settings(risk_max_position_size=-0.1)

    def test_invalid_health_port(self):
        with pytest.raises(ValidationError, match="health_port"):
            Settings(health_port=80)

    def test_invalid_log_level(self):
        with pytest.raises(ValidationError, match="log_level"):
            Settings(log_level="INVALID")

    def test_secret_fields_masked(self):
        settings = Settings(alpaca_api_key="my-secret-key")
        assert "my-secret-key" not in repr(settings)
        assert settings.alpaca_api_key.get_secret_value() == "my-secret-key"
```

- [ ] **Step 2: Run tests to verify they fail**

Run: `pytest shared/tests/test_config_validation.py -v`
Expected: FAIL

- [ ] **Step 3: Update config.py with validators and SecretStr**

Replace `shared/src/shared/config.py`:

```python
"""Shared configuration settings for the trading platform."""

from pydantic import SecretStr, field_validator
from pydantic_settings import BaseSettings


class Settings(BaseSettings):
    # Alpaca
    alpaca_api_key: SecretStr = SecretStr("")
    alpaca_api_secret: SecretStr = SecretStr("")
    alpaca_paper: bool = True

    # Infrastructure
    redis_url: SecretStr = SecretStr("redis://localhost:6379")
    database_url: SecretStr = SecretStr("postgresql://trading:trading@localhost:5432/trading")

    # DB pool
    db_pool_size: int = 20
    db_max_overflow: int = 10
    db_pool_recycle: int = 3600

    # Logging
    log_level: str = "INFO"
    log_format: str = "json"

    # Health
    health_port: int = 8080
    metrics_auth_token: str = ""

    # Risk
    risk_max_position_size: float = 0.1
    risk_stop_loss_pct: float = 5.0
    risk_daily_loss_limit_pct: float = 10.0
    risk_trailing_stop_pct: float = 0.0
    risk_max_open_positions: int = 10
    risk_volatility_lookback: int = 20
    risk_volatility_scale: bool = False
    risk_max_portfolio_exposure: float = 0.8
    risk_max_correlated_exposure: float = 0.5
    risk_correlation_threshold: float = 0.7
    risk_var_confidence: float = 0.95
    risk_var_limit_pct: float = 5.0
    risk_drawdown_reduction_threshold: float = 0.1
    risk_drawdown_halt_threshold: float = 0.2
    risk_max_consecutive_losses: int = 5
    risk_loss_pause_minutes: int = 60
    dry_run: bool = True

    # Telegram
    telegram_bot_token: SecretStr = SecretStr("")
    telegram_chat_id: str = ""
    telegram_enabled: bool = False

    # News
    finnhub_api_key: SecretStr = SecretStr("")
    news_poll_interval: int = 300
    sentiment_aggregate_interval: int = 900

    # Stock selector
    selector_final_time: str = "15:30"
    selector_max_picks: int = 3

    # LLM
    anthropic_api_key: SecretStr = SecretStr("")
    anthropic_model: str = "claude-sonnet-4-20250514"

    # API security
    api_auth_token: SecretStr = SecretStr("")
    cors_origins: str = "http://localhost:3000"

    model_config = {"env_file": ".env", "env_file_encoding": "utf-8", "extra": "ignore"}

    @field_validator("risk_max_position_size")
    @classmethod
    def validate_position_size(cls, v: float) -> float:
        if v <= 0 or v > 1:
            raise ValueError("risk_max_position_size must be in (0, 1]")
        return v

    @field_validator("health_port")
    @classmethod
    def validate_health_port(cls, v: int) -> int:
        if v < 1024 or v > 65535:
            raise ValueError("health_port must be between 1024 and 65535")
        return v

    @field_validator("log_level")
    @classmethod
    def validate_log_level(cls, v: str) -> str:
        valid = {"DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"}
        if v.upper() not in valid:
            raise ValueError(f"log_level must be one of {valid}")
        return v.upper()
```

- [ ] **Step 4: Update all consumers to use .get_secret_value()**

Every place that reads `settings.alpaca_api_key` etc. must now call `.get_secret_value()`. Key files to update:

**Each service main.py** — where `AlpacaClient` is instantiated:

```python
# Before:
alpaca = AlpacaClient(cfg.alpaca_api_key, cfg.alpaca_api_secret, paper=cfg.alpaca_paper)

# After:
alpaca = AlpacaClient(
    cfg.alpaca_api_key.get_secret_value(),
    cfg.alpaca_api_secret.get_secret_value(),
    paper=cfg.alpaca_paper,
)
```

**Each service main.py** — where `Database(cfg.database_url)` and `RedisBroker(cfg.redis_url)` are called:

```python
# Before:
db = Database(cfg.database_url)
broker = RedisBroker(cfg.redis_url)

# After:
db = Database(cfg.database_url.get_secret_value())
broker = RedisBroker(cfg.redis_url.get_secret_value())
```

**`shared/src/shared/notifier.py`** — change the `telegram_bot_token` access to `.get_secret_value()`.

**`services/strategy-engine/src/strategy_engine/main.py`** — where `anthropic_api_key` is passed:

```python
# Before:
anthropic_api_key=cfg.anthropic_api_key,

# After:
anthropic_api_key=cfg.anthropic_api_key.get_secret_value(),
```

**`services/news-collector/src/news_collector/main.py`** — change `cfg.finnhub_api_key` to `cfg.finnhub_api_key.get_secret_value()`.

- [ ] **Step 5: Run all tests**

Run: `pytest shared/tests/test_config_validation.py -v`
Expected: PASS

Run: `pytest -v`
Expected: All tests PASS (no regressions from SecretStr changes)

- [ ] **Step 6: Commit**

```bash
git add shared/src/shared/config.py shared/tests/test_config_validation.py
git add services/*/src/*/main.py shared/src/shared/notifier.py
git commit -m "feat: add config validation, SecretStr for secrets, API security fields"
```

---

### Task 5: Pin All Dependencies

**Files:**
- Modify: `shared/pyproject.toml` (already done in Task 1)
- Modify: `services/strategy-engine/pyproject.toml`
- Modify: `services/backtester/pyproject.toml`
- Modify: `services/api/pyproject.toml`
- Modify: `services/news-collector/pyproject.toml`
- Modify: `services/data-collector/pyproject.toml`
- Modify: `services/order-executor/pyproject.toml`
- Modify: `services/portfolio-manager/pyproject.toml`

- [ ] **Step 1: Pin service dependencies**

`services/strategy-engine/pyproject.toml`:

```toml
dependencies = [
    "pandas>=2.1,<3",
    "numpy>=1.26,<3",
    "trading-shared",
]
```

`services/backtester/pyproject.toml`:

```toml
dependencies = ["pandas>=2.1,<3", "numpy>=1.26,<3", "rich>=13.0,<14", "trading-shared"]
```

`services/api/pyproject.toml`:

```toml
dependencies = [
    "fastapi>=0.110,<1",
    "uvicorn>=0.27,<1",
    "slowapi>=0.1.9,<1",
    "trading-shared",
]
```

`services/news-collector/pyproject.toml`:

```toml
dependencies = [
    "trading-shared",
    "feedparser>=6.0,<7",
    "nltk>=3.8,<4",
    "aiohttp>=3.9,<4",
]
```

`shared/pyproject.toml` optional deps:

```toml
[project.optional-dependencies]
dev = [
    "pytest>=8.0,<9",
    "pytest-asyncio>=0.23,<1",
    "ruff>=0.4,<1",
]
claude = [
    "anthropic>=0.40,<1",
]
```

- [ ] **Step 2: Verify installation works**

Run: `pip install -e shared/ && pip install -e services/strategy-engine/ && pip install -e services/api/`
Expected: No errors

- [ ] **Step 3: Commit**

```bash
git add shared/pyproject.toml services/*/pyproject.toml
git commit -m "chore: pin all dependencies with upper bounds"
```

---

## Phase 2: Infrastructure Hardening

### Task 6: Docker Secrets & Environment Cleanup

**Files:**
- Modify: `docker-compose.yml:17-21`
- Modify: `.env.example`

- [ ] **Step 1: Replace hardcoded
Postgres credentials in docker-compose.yml**

In `docker-compose.yml`, replace the postgres service environment:

```yaml
postgres:
  image: postgres:16-alpine
  environment:
    POSTGRES_USER: ${POSTGRES_USER:-trading}
    POSTGRES_PASSWORD: ${POSTGRES_PASSWORD:-trading}
    POSTGRES_DB: ${POSTGRES_DB:-trading}
```

- [ ] **Step 2: Update .env.example with secret annotations**

Add to `.env.example`:

```bash
# === SECRETS (keep secure, do not commit .env) ===
ALPACA_API_KEY=
ALPACA_API_SECRET=
DATABASE_URL=postgresql+asyncpg://trading:trading@localhost:5432/trading
REDIS_URL=redis://localhost:6379
TELEGRAM_BOT_TOKEN=
FINNHUB_API_KEY=
ANTHROPIC_API_KEY=
API_AUTH_TOKEN=
POSTGRES_USER=trading
POSTGRES_PASSWORD=trading
POSTGRES_DB=trading

# === CONFIGURATION ===
ALPACA_PAPER=true
DRY_RUN=true
LOG_LEVEL=INFO
LOG_FORMAT=json
HEALTH_PORT=8080
# ... (keep existing config vars)

# === API SECURITY ===
CORS_ORIGINS=http://localhost:3000
```

- [ ] **Step 3: Commit**

```bash
git add docker-compose.yml .env.example
git commit -m "fix: move hardcoded postgres credentials to .env, annotate secrets"
```

---

### Task 7: Dockerfile Optimization

**Files:**
- Create: `.dockerignore`
- Modify: All 7 Dockerfiles in `services/*/Dockerfile`

- [ ] **Step 1: Create .dockerignore**

Create `.dockerignore` at project root:

```
__pycache__
*.pyc
*.pyo
.git
.github
.venv
.env
.env.*
!.env.example
tests/
docs/
*.md
.ruff_cache
.pytest_cache
.mypy_cache
monitoring/
scripts/
cli/
```

- [ ] **Step 2: Update data-collector Dockerfile**

Replace `services/data-collector/Dockerfile`:

```dockerfile
FROM python:3.12-slim AS builder
WORKDIR /app
COPY shared/ shared/
RUN pip install --no-cache-dir ./shared
COPY services/data-collector/ services/data-collector/
RUN pip install --no-cache-dir ./services/data-collector

FROM python:3.12-slim
RUN useradd -r -s /bin/false appuser
WORKDIR /app
COPY --from=builder /usr/local/lib/python3.12/site-packages /usr/local/lib/python3.12/site-packages
COPY --from=builder /usr/local/bin /usr/local/bin
ENV PYTHONPATH=/app
USER appuser
CMD ["python", "-m", "data_collector.main"]
```

- [ ] **Step 3: Update all other Dockerfiles with same pattern**

Apply the same multi-stage + non-root pattern to:
- `services/strategy-engine/Dockerfile` (also copies strategies/)
- `services/order-executor/Dockerfile`
- `services/portfolio-manager/Dockerfile`
- `services/api/Dockerfile` (also copies strategies/, uses uvicorn CMD)
- `services/news-collector/Dockerfile` (also runs nltk download)
- `services/backtester/Dockerfile` (also copies strategies/)

For **strategy-engine**:

```dockerfile
FROM python:3.12-slim AS builder
WORKDIR /app
COPY shared/ shared/
RUN pip install --no-cache-dir ./shared
COPY services/strategy-engine/ services/strategy-engine/
RUN pip install --no-cache-dir ./services/strategy-engine

FROM python:3.12-slim
RUN useradd -r -s /bin/false appuser
WORKDIR /app
COPY --from=builder /usr/local/lib/python3.12/site-packages /usr/local/lib/python3.12/site-packages
COPY --from=builder /usr/local/bin /usr/local/bin
COPY services/strategy-engine/strategies/ /app/strategies/
ENV PYTHONPATH=/app
USER appuser
CMD ["python", "-m", "strategy_engine.main"]
```

For **news-collector**:

```dockerfile
FROM python:3.12-slim AS builder
WORKDIR /app
COPY shared/ shared/
RUN pip install --no-cache-dir ./shared
COPY services/news-collector/ services/news-collector/
RUN pip install --no-cache-dir ./services/news-collector
RUN python -c "import nltk; nltk.download('vader_lexicon', download_dir='/usr/local/nltk_data')"

FROM python:3.12-slim
RUN useradd -r -s /bin/false appuser
WORKDIR /app
COPY --from=builder /usr/local/lib/python3.12/site-packages /usr/local/lib/python3.12/site-packages
COPY --from=builder /usr/local/bin /usr/local/bin
COPY --from=builder /usr/local/nltk_data /usr/local/nltk_data
ENV PYTHONPATH=/app
USER appuser
CMD ["python", "-m", "news_collector.main"]
```

For **api**:

```dockerfile
FROM python:3.12-slim AS builder
WORKDIR /app
COPY shared/ shared/
RUN pip install --no-cache-dir ./shared
COPY services/api/ services/api/
RUN pip install --no-cache-dir ./services/api
COPY services/strategy-engine/ services/strategy-engine/
RUN pip install --no-cache-dir ./services/strategy-engine

FROM python:3.12-slim
RUN useradd -r -s /bin/false appuser
WORKDIR /app
COPY --from=builder /usr/local/lib/python3.12/site-packages /usr/local/lib/python3.12/site-packages
COPY --from=builder /usr/local/bin /usr/local/bin
COPY services/strategy-engine/strategies/ /app/strategies/
ENV PYTHONPATH=/app STRATEGIES_DIR=/app/strategies
USER appuser
CMD ["uvicorn", "trading_api.main:app", "--host", "0.0.0.0", "--port", "8000", "--timeout-graceful-shutdown", "30"]
```

For **order-executor**, **portfolio-manager**, and **backtester** — same pattern as data-collector, adjusting the service name and CMD.

- [ ] **Step 4: Verify Docker build works**

Run: `docker compose build --quiet`
Expected: All images build successfully

- [ ] **Step 5: Commit**

```bash
git add .dockerignore services/*/Dockerfile
git commit -m "feat: optimize Dockerfiles with multi-stage builds, non-root user, .dockerignore"
```

---

### Task 8: Database Index Migration

**Files:**
- Create: `shared/alembic/versions/003_add_missing_indexes.py`

- [ ] **Step 1: Create migration file**

Create `shared/alembic/versions/003_add_missing_indexes.py`:

```python
"""Add missing indexes for common query patterns.

Revision ID: 003
Revises: 002
"""

from alembic import op

revision = "003"
down_revision = "002"
branch_labels = None
depends_on = None


def upgrade():
    op.create_index("idx_signals_symbol_created", "signals", ["symbol", "created_at"])
    op.create_index("idx_orders_symbol_status_created", "orders", ["symbol", "status", "created_at"])
    op.create_index("idx_trades_order_id", "trades", ["order_id"])
    op.create_index("idx_trades_symbol_traded", "trades", ["symbol", "traded_at"])
    op.create_index("idx_portfolio_snapshots_at", "portfolio_snapshots", ["snapshot_at"])
    op.create_index("idx_symbol_scores_symbol", "symbol_scores", ["symbol"], unique=True)


def downgrade():
    op.drop_index("idx_symbol_scores_symbol", table_name="symbol_scores")
    op.drop_index("idx_portfolio_snapshots_at", table_name="portfolio_snapshots")
    op.drop_index("idx_trades_symbol_traded", table_name="trades")
    op.drop_index("idx_trades_order_id", table_name="trades")
    op.drop_index("idx_orders_symbol_status_created", table_name="orders")
    op.drop_index("idx_signals_symbol_created", table_name="signals")
```

- [ ] **Step 2: Verify migration runs (requires infra)**

Run: `make infra && cd shared && alembic upgrade head`
Expected: Migration 003 applied successfully

- [ ] **Step 3: Commit**

```bash
git add shared/alembic/versions/003_add_missing_indexes.py
git commit -m "feat: add missing DB indexes for signals, orders, trades, snapshots"
```

---

### Task 9: Docker Compose Resource Limits & Networks

**Files:**
- Modify: `docker-compose.yml`

- [ ] **Step 1: Add networks and resource limits**

Add to `docker-compose.yml` at the bottom:

```yaml
networks:
  internal:
    driver: bridge
  monitoring:
    driver: bridge
```

Add `networks: [internal]` to all application services (redis, postgres, data-collector, strategy-engine, order-executor, portfolio-manager, api, news-collector).
Add `networks: [internal, monitoring]` to prometheus, grafana.
Add `networks: [monitoring]` to loki, promtail.
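For illustration, one service entry after this change might look like the following (a sketch; each service's existing keys are elided):

```yaml
services:
  data-collector:
    # ...existing image/env/depends_on config unchanged...
    networks: [internal]

  prometheus:
    # ...existing config unchanged...
    networks: [internal, monitoring]
```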
Add to each application service:

```yaml
deploy:
  resources:
    limits:
      memory: 512M
      cpus: '1.0'
```

For strategy-engine and backtester, use `memory: 1G` instead.

- [ ] **Step 2: Verify compose config is valid**

Run: `docker compose config --quiet`
Expected: No errors

- [ ] **Step 3: Commit**

```bash
git add docker-compose.yml
git commit -m "feat: add resource limits and network isolation to docker-compose"
```

---

## Phase 3: Service-Level Improvements

### Task 10: Graceful Shutdown for All Services

**Files:**
- Create: `shared/src/shared/shutdown.py`
- Modify: `services/data-collector/src/data_collector/main.py`
- Modify: `services/strategy-engine/src/strategy_engine/main.py`
- Modify: `services/order-executor/src/order_executor/main.py`
- Modify: `services/portfolio-manager/src/portfolio_manager/main.py`
- Modify: `services/news-collector/src/news_collector/main.py`
- Modify: `services/api/src/trading_api/main.py`

- [ ] **Step 1: Create a shared shutdown helper**

Create `shared/src/shared/shutdown.py`:

```python
"""Graceful shutdown utilities for services."""

import asyncio
import logging
import signal

logger = logging.getLogger(__name__)


class GracefulShutdown:
    """Manages graceful shutdown via SIGTERM/SIGINT signals."""

    def __init__(self) -> None:
        self._event = asyncio.Event()

    @property
    def is_shutting_down(self) -> bool:
        return self._event.is_set()

    async def wait(self) -> None:
        await self._event.wait()

    def trigger(self) -> None:
        logger.info("shutdown_signal_received")
        self._event.set()

    def install_handlers(self) -> None:
        loop = asyncio.get_running_loop()
        for sig in (signal.SIGTERM, signal.SIGINT):
            loop.add_signal_handler(sig, self.trigger)
```

- [ ] **Step 2: Add shutdown to data-collector main loop**

In `services/data-collector/src/data_collector/main.py`, add at the start of `run()`:

```python
from shared.shutdown import GracefulShutdown

shutdown = GracefulShutdown()
shutdown.install_handlers()
```

Replace the main `while True` loop condition with `while not shutdown.is_shutting_down`.
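The loop rewrite can be exercised without real signals. A self-contained sketch (`MiniShutdown` is a stripped-down stand-in for the helper above, minus the signal handlers):

```python
import asyncio


class MiniShutdown:
    """Stand-in for GracefulShutdown without OS signal wiring."""

    def __init__(self) -> None:
        self._event = asyncio.Event()

    @property
    def is_shutting_down(self) -> bool:
        return self._event.is_set()

    def trigger(self) -> None:
        self._event.set()


async def worker(shutdown: MiniShutdown, ticks: list[str]) -> None:
    # The rewritten service loop: re-checks the flag on every iteration
    while not shutdown.is_shutting_down:
        ticks.append("tick")
        await asyncio.sleep(0.01)


async def demo() -> list[str]:
    shutdown = MiniShutdown()
    ticks: list[str] = []
    task = asyncio.create_task(worker(shutdown, ticks))
    await asyncio.sleep(0.05)
    shutdown.trigger()
    await task  # the loop exits cleanly at the next flag check
    return ticks
```

Calling `trigger()` (which the real helper does from the signal handler) lets the in-flight iteration finish instead of killing it mid-write.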
- [ ] **Step 3: Apply same pattern to all other services** For each service's `main.py`, add `GracefulShutdown` import, install handlers at start of `run()`, and replace infinite loops with `while not shutdown.is_shutting_down`. For strategy-engine: also cancel tasks on shutdown. For portfolio-manager: also cancel snapshot_loop task. For news-collector: also cancel all collector loop tasks. - [ ] **Step 4: Run tests** Run: `pytest -v` Expected: All tests PASS - [ ] **Step 5: Commit** ```bash git add shared/src/shared/shutdown.py services/*/src/*/main.py git commit -m "feat: add graceful shutdown with SIGTERM/SIGINT handlers to all services" ``` --- ### Task 11: Exception Handling Specialization **Files:** - Modify: All service `main.py` files - Modify: `shared/src/shared/db.py` - [ ] **Step 1: Specialize exceptions in data-collector/main.py** Replace broad `except Exception` blocks. For example, in the fetch loop: ```python # Before: except Exception as exc: log.warning("fetch_bar_failed", symbol=symbol, error=str(exc)) # After: except (ConnectionError, TimeoutError, aiohttp.ClientError) as exc: log.warning("fetch_bar_network_error", symbol=symbol, error=str(exc)) except (ValueError, KeyError) as exc: log.warning("fetch_bar_parse_error", symbol=symbol, error=str(exc)) except Exception as exc: log.error("fetch_bar_unexpected", symbol=symbol, error=str(exc), exc_info=True) ``` - [ ] **Step 2: Specialize exceptions in strategy-engine, order-executor, portfolio-manager, news-collector** Apply the same pattern: network errors → warning + retry, parse errors → warning + skip, unexpected → error + exc_info. 
- [ ] **Step 3: Specialize exceptions in db.py** In `shared/src/shared/db.py`, the transaction pattern can distinguish DB errors from everything else. Note that SQLAlchemy's async sessions wrap driver exceptions (including asyncpg's) in `sqlalchemy.exc.DBAPIError` subclasses, so catch the wrapper rather than `asyncpg.PostgresError` directly: ```python except sqlalchemy.exc.DBAPIError as exc: await session.rollback() logger.error("db_operation_error", error=str(exc)) raise except Exception: await session.rollback() raise ``` - [ ] **Step 4: Run tests** Run: `pytest -v` Expected: All tests PASS - [ ] **Step 5: Commit** ```bash git add services/*/src/*/main.py shared/src/shared/db.py git commit -m "refactor: specialize exception handling across all services" ``` --- ### Task 12: Fix Stock Selector (Bug Fix + Dedup + Session Reuse) **Files:** - Modify: `services/strategy-engine/src/strategy_engine/stock_selector.py` - Modify: `services/strategy-engine/tests/test_stock_selector.py` (if exists, otherwise create) - [ ] **Step 1: Fix the critical bug on line 217** In `stock_selector.py` line 217, replace: ```python self._session = anthropic_model ``` with: ```python self._model = anthropic_model ``` - [ ] **Step 2: Extract common JSON parsing function** Replace the duplicate parsing logic.
Add at module level (replacing `_parse_llm_selections`): ```python def _extract_json_array(text: str) -> list[dict] | None: """Extract a JSON array from text that may contain markdown code blocks.""" code_block = re.search(r"```(?:json)?\s*(\[.*?\])\s*```", text, re.DOTALL) if code_block: raw = code_block.group(1) else: array_match = re.search(r"\[.*\]", text, re.DOTALL) if array_match: raw = array_match.group(0) else: raw = text.strip() try: data = json.loads(raw) if isinstance(data, list): return [item for item in data if isinstance(item, dict)] return None except (json.JSONDecodeError, TypeError): return None def _parse_llm_selections(text: str) -> list[SelectedStock]: """Parse LLM response into SelectedStock list.""" items = _extract_json_array(text) if items is None: return [] selections = [] for item in items: try: selections.append( SelectedStock( symbol=item["symbol"], side=OrderSide(item["side"]), conviction=float(item["conviction"]), reason=item.get("reason", ""), key_news=item.get("key_news", []), ) ) except (KeyError, ValueError) as e: logger.warning("Skipping invalid selection item: %s", e) return selections ``` Update `LLMCandidateSource._parse_candidates()` to use `_extract_json_array`, guarding missing or invalid fields the same way `_parse_llm_selections` does (so a candidate without a `"symbol"` key is skipped instead of raising `KeyError`): ```python def _parse_candidates(self, text: str) -> list[Candidate]: items = _extract_json_array(text) if items is None: return [] candidates = [] for item in items: try: direction = OrderSide(item.get("direction", "BUY")) except ValueError: direction = None try: candidates.append( Candidate( symbol=item["symbol"], source="llm", direction=direction, score=float(item.get("score", 0.5)), reason=item.get("reason", ""), ) ) except (KeyError, ValueError) as e: logger.warning("Skipping invalid candidate item: %s", e) return candidates ``` - [ ] **Step 3: Add session reuse to StockSelector** Add `_http_session` to `StockSelector.__init__()`: ```python self._http_session: aiohttp.ClientSession | None = None ``` Add helper method: ```python async def _ensure_session(self) -> aiohttp.ClientSession: if self._http_session is None or
self._http_session.closed: self._http_session = aiohttp.ClientSession() return self._http_session async def close(self) -> None: if self._http_session and not self._http_session.closed: await self._http_session.close() ``` Replace `async with aiohttp.ClientSession() as session:` in both `LLMCandidateSource.get_candidates()` and `StockSelector._llm_final_select()` with session reuse. For `LLMCandidateSource`, accept an optional session parameter. For `StockSelector._llm_final_select()`, use `self._ensure_session()`. - [ ] **Step 4: Run tests** Run: `pytest services/strategy-engine/tests/ -v` Expected: All tests PASS - [ ] **Step 5: Commit** ```bash git add services/strategy-engine/src/strategy_engine/stock_selector.py git commit -m "fix: fix model attr bug, deduplicate LLM parsing, reuse aiohttp sessions" ``` --- ## Phase 4: API Security ### Task 13: Bearer Token Authentication **Files:** - Create: `services/api/src/trading_api/dependencies/__init__.py` - Create: `services/api/src/trading_api/dependencies/auth.py` - Modify: `services/api/src/trading_api/main.py` - [ ] **Step 1: Create auth dependency** Create `services/api/src/trading_api/dependencies/__init__.py` (empty file). Create `services/api/src/trading_api/dependencies/auth.py`: ```python """Bearer token authentication dependency.""" import logging from fastapi import Depends, HTTPException, status from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer from shared.config import Settings logger = logging.getLogger(__name__) _security = HTTPBearer(auto_error=False) _settings = Settings() async def verify_token( credentials: HTTPAuthorizationCredentials | None = Depends(_security), ) -> None: """Verify Bearer token. 
Skip auth if API_AUTH_TOKEN is not configured.""" token = _settings.api_auth_token.get_secret_value() if not token: return # Auth disabled in dev mode if credentials is None or credentials.credentials != token: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid or missing authentication token", headers={"WWW-Authenticate": "Bearer"}, ) ``` - [ ] **Step 2: Apply auth to all API routes** In `services/api/src/trading_api/main.py`, add auth dependency to routers: ```python from trading_api.dependencies.auth import verify_token from fastapi import Depends app.include_router(portfolio_router, prefix="/api/v1/portfolio", dependencies=[Depends(verify_token)]) app.include_router(orders_router, prefix="/api/v1/orders", dependencies=[Depends(verify_token)]) app.include_router(strategies_router, prefix="/api/v1/strategies", dependencies=[Depends(verify_token)]) ``` Log a warning on startup if token is empty: ```python @asynccontextmanager async def lifespan(app: FastAPI): cfg = Settings() if not cfg.api_auth_token.get_secret_value(): logger.warning("API_AUTH_TOKEN not set; API authentication is disabled") # ... 
rest of lifespan ``` - [ ] **Step 3: Write tests for auth** Add to `services/api/tests/test_auth.py`: ```python """Tests for API authentication.""" from unittest.mock import patch import pytest from httpx import ASGITransport, AsyncClient from trading_api.main import app class TestAuth: @patch("trading_api.dependencies.auth._settings") async def test_rejects_missing_token_when_configured(self, mock_settings): from pydantic import SecretStr mock_settings.api_auth_token = SecretStr("test-token") async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as ac: resp = await ac.get("/api/v1/portfolio/positions") assert resp.status_code == 401 @patch("trading_api.dependencies.auth._settings") async def test_accepts_valid_token(self, mock_settings): from pydantic import SecretStr mock_settings.api_auth_token = SecretStr("test-token") async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as ac: resp = await ac.get( "/api/v1/portfolio/positions", headers={"Authorization": "Bearer test-token"}, ) # May fail with 500 if DB not available, but should NOT be 401 assert resp.status_code != 401 ``` - [ ] **Step 4: Run tests** Run: `pytest services/api/tests/test_auth.py -v` Expected: PASS - [ ] **Step 5: Commit** ```bash git add services/api/src/trading_api/dependencies/ services/api/src/trading_api/main.py services/api/tests/test_auth.py git commit -m "feat: add Bearer token authentication to API endpoints" ``` --- ### Task 14: CORS & Rate Limiting **Files:** - Modify: `services/api/src/trading_api/main.py` - Modify: `services/api/pyproject.toml` - [ ] **Step 1: Add slowapi dependency** Already done in Task 5 (`services/api/pyproject.toml` has `slowapi>=0.1.9,<1`). 
- [ ] **Step 2: Add CORS and rate limiting to main.py** In `services/api/src/trading_api/main.py`: ```python from fastapi.middleware.cors import CORSMiddleware from slowapi import Limiter, _rate_limit_exceeded_handler from slowapi.util import get_remote_address from slowapi.errors import RateLimitExceeded from shared.config import Settings cfg = Settings() limiter = Limiter(key_func=get_remote_address) app = FastAPI(title="Trading Platform API") app.state.limiter = limiter app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler) app.add_middleware( CORSMiddleware, allow_origins=[origin.strip() for origin in cfg.cors_origins.split(",")], allow_methods=["GET", "POST"], allow_headers=["Authorization", "Content-Type"], ) ``` - [ ] **Step 3: Add rate limits to order endpoints** In `services/api/src/trading_api/routers/orders.py`. Note: slowapi requires the `request: Request` parameter in the endpoint signature, and where imports allow, reuse the single `limiter` instance from `main.py` rather than constructing a second one: ```python from fastapi import Request from slowapi import Limiter from slowapi.util import get_remote_address limiter = Limiter(key_func=get_remote_address) @router.get("/") @limiter.limit("60/minute") async def get_orders(request: Request, limit: int = 50): ... ``` - [ ] **Step 4: Run tests** Run: `pytest services/api/tests/ -v` Expected: PASS - [ ] **Step 5: Commit** ```bash git add services/api/src/trading_api/main.py services/api/src/trading_api/routers/ git commit -m "feat: add CORS middleware and rate limiting to API" ``` --- ### Task 15: API Input Validation & Response Models **Files:** - Modify: `services/api/src/trading_api/routers/portfolio.py` - Modify: `services/api/src/trading_api/routers/orders.py` - [ ] **Step 1: Add Query validation to portfolio.py** ```python from fastapi import Query @router.get("/snapshots") async def get_snapshots(request: Request, days: int = Query(30, ge=1, le=365)): ... ``` - [ ] **Step 2: Add Query validation to orders.py** ```python from fastapi import Query @router.get("/") async def get_orders(request: Request, limit: int = Query(50, ge=1, le=1000)): ...
@router.get("/signals") async def get_signals(request: Request, limit: int = Query(50, ge=1, le=1000)): ... ``` - [ ] **Step 3: Run tests** Run: `pytest services/api/tests/ -v` Expected: PASS - [ ] **Step 4: Commit** ```bash git add services/api/src/trading_api/routers/ git commit -m "feat: add input validation with Query bounds to API endpoints" ``` --- ## Phase 5: Operational Maturity ### Task 16: Enhanced Ruff Configuration **Files:** - Modify: `pyproject.toml:12-14` - [ ] **Step 1: Update ruff config in pyproject.toml** Replace the ruff section in root `pyproject.toml`: ```toml [tool.ruff] target-version = "py312" line-length = 100 [tool.ruff.lint] select = ["E", "W", "F", "I", "B", "UP", "ASYNC", "PERF", "C4", "RUF"] ignore = ["E501"] [tool.ruff.lint.per-file-ignores] "tests/*" = ["F841"] "*/tests/*" = ["F841"] [tool.ruff.lint.isort] known-first-party = ["shared"] ``` - [ ] **Step 2: Auto-fix existing violations** Run: `ruff check --fix . && ruff format .` Expected: Fixes applied - [ ] **Step 3: Verify no remaining errors** Run: `ruff check . && ruff format --check .` Expected: No errors - [ ] **Step 4: Run tests to verify no regressions** Run: `pytest -v` Expected: All tests PASS - [ ] **Step 5: Commit** ```bash git add pyproject.toml git commit -m "chore: enhance ruff lint rules with ASYNC, bugbear, isort, pyupgrade" ``` Then commit auto-fixes separately: ```bash git add -A git commit -m "style: auto-fix lint violations from enhanced ruff rules" ``` --- ### Task 17: GitHub Actions CI Pipeline **Files:** - Create: `.github/workflows/ci.yml` - [ ] **Step 1: Create CI workflow** Create `.github/workflows/ci.yml`: ```yaml name: CI on: push: branches: [master] pull_request: branches: [master] jobs: lint: runs-on: ubuntu-latest steps: - uses: actions/checkout@v4 - uses: actions/setup-python@v5 with: python-version: "3.12" - run: pip install ruff - run: ruff check . - run: ruff format --check . 
test: runs-on: ubuntu-latest services: redis: image: redis:7-alpine ports: ["6379:6379"] options: >- --health-cmd "redis-cli ping" --health-interval 5s --health-timeout 3s --health-retries 5 postgres: image: postgres:16-alpine env: POSTGRES_USER: trading POSTGRES_PASSWORD: trading POSTGRES_DB: trading ports: ["5432:5432"] options: >- --health-cmd pg_isready --health-interval 5s --health-timeout 3s --health-retries 5 steps: - uses: actions/checkout@v4 - uses: actions/setup-python@v5 with: python-version: "3.12" - run: | pip install -e "shared/[dev]" pip install -e "services/strategy-engine/[dev]" pip install -e "services/data-collector/[dev]" pip install -e "services/order-executor/[dev]" pip install -e "services/portfolio-manager/[dev]" pip install -e "services/news-collector/[dev]" pip install -e "services/api/[dev]" pip install -e "services/backtester/[dev]" pip install pytest-cov - run: pytest -v --cov=shared/src --cov=services --cov-report=xml --cov-report=term-missing env: DATABASE_URL: postgresql+asyncpg://trading:trading@localhost:5432/trading REDIS_URL: redis://localhost:6379 - uses: actions/upload-artifact@v4 with: name: coverage-report path: coverage.xml docker: runs-on: ubuntu-latest needs: [lint, test] if: github.ref == 'refs/heads/master' steps: - uses: actions/checkout@v4 - run: docker compose build --quiet ``` - [ ] **Step 2: Commit** ```bash mkdir -p .github/workflows git add .github/workflows/ci.yml git commit -m "feat: add GitHub Actions CI pipeline with lint, test, docker build" ``` --- ### Task 18: Prometheus Alerting Rules **Files:** - Create: `monitoring/prometheus/alert_rules.yml` - Modify: `monitoring/prometheus.yml` - [ ] **Step 1: Create alert rules** Create `monitoring/prometheus/alert_rules.yml`: ```yaml groups: - name: trading-platform rules: - alert: ServiceDown expr: up == 0 for: 1m labels: severity: critical annotations: summary: "Service {{ $labels.job }} is down" description: "{{ $labels.instance }} has been unreachable for 1 minute."
- alert: HighErrorRate expr: rate(errors_total[5m]) > 10 for: 2m labels: severity: warning annotations: summary: "High error rate on {{ $labels.job }}" description: "Error rate is {{ $value }} errors/sec over 5 minutes." - alert: HighProcessingLatency expr: histogram_quantile(0.95, rate(processing_seconds_bucket[5m])) > 5 for: 5m labels: severity: warning annotations: summary: "High p95 latency on {{ $labels.job }}" description: "95th percentile processing time is {{ $value }}s." ``` - [ ] **Step 2: Reference alert rules in prometheus.yml** In `monitoring/prometheus.yml`, add after `global:`: ```yaml rule_files: - "/etc/prometheus/alert_rules.yml" ``` Update `docker-compose.yml` prometheus service to mount the file: ```yaml prometheus: volumes: - ./monitoring/prometheus.yml:/etc/prometheus/prometheus.yml - ./monitoring/prometheus/alert_rules.yml:/etc/prometheus/alert_rules.yml ``` - [ ] **Step 3: Commit** ```bash git add monitoring/prometheus/alert_rules.yml monitoring/prometheus.yml docker-compose.yml git commit -m "feat: add Prometheus alerting rules for service health, errors, latency" ``` --- ### Task 19: Code Coverage Configuration **Files:** - Modify: `pyproject.toml` - [ ] **Step 1: Add pytest-cov config** Add to `pyproject.toml`: ```toml [tool.coverage.run] branch = true source = ["shared/src", "services"] omit = ["*/tests/*", "*/alembic/*"] [tool.coverage.report] fail_under = 60 show_missing = true exclude_lines = [ "pragma: no cover", "if __name__", "if TYPE_CHECKING", ] ``` Update pytest addopts: ```toml [tool.pytest.ini_options] asyncio_mode = "auto" testpaths = ["shared/tests", "services", "cli/tests", "tests"] addopts = "--import-mode=importlib" ``` Note: `--cov` flags are passed explicitly in CI, not in addopts (to avoid slowing local dev). 
- [ ] **Step 2: Verify coverage works** Run: `pip install pytest-cov && pytest --cov=shared/src --cov-report=term-missing` Expected: Coverage report printed, no errors - [ ] **Step 3: Commit** ```bash git add pyproject.toml git commit -m "chore: add pytest-cov configuration with 60% minimum coverage threshold" ``` --- ## Summary | Phase | Tasks | Estimated Commits | |-------|-------|-------------------| | 1: Shared Library | Tasks 1-5 | 5 commits | | 2: Infrastructure | Tasks 6-9 | 4 commits | | 3: Service Fixes | Tasks 10-12 | 3 commits | | 4: API Security | Tasks 13-15 | 3 commits | | 5: Operations | Tasks 16-19 | 5 commits | | **Total** | **19 tasks** | **~20 commits** |