# Operations Infrastructure & Strategy Expansion — Implementation Plan
> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.
**Goal:** Add production-grade operations infrastructure (SQLAlchemy ORM, Alembic migrations, structlog, Telegram alerts, resilience, Prometheus) and expand the strategy library (MACD, Bollinger, EMA Crossover, VWAP, Volume Profile) with enhanced backtesting metrics.
**Architecture:** Operations-first approach. Migrate the DB layer to SQLAlchemy 2.0 async, add structured logging and Telegram notifications as shared infrastructure, then build resilience and metrics on top. Strategy expansion builds on the stabilized platform with new BaseStrategy.warmup_period contract and YAML config loading.
**Tech Stack:** SQLAlchemy 2.0 async (asyncpg driver), Alembic, structlog, aiohttp (Telegram), prometheus-client, pyyaml, rich, pandas, numpy
---
## File Structure
### New Files
| File | Responsibility |
|------|---------------|
| `shared/src/shared/sa_models.py` | SQLAlchemy ORM table definitions |
| `shared/src/shared/logging.py` | structlog setup and Telegram error processor |
| `shared/src/shared/notifier.py` | TelegramNotifier class |
| `shared/src/shared/resilience.py` | retry_with_backoff decorator + CircuitBreaker |
| `shared/src/shared/healthcheck.py` | aiohttp-based /health + /metrics server |
| `shared/src/shared/metrics.py` | Prometheus metric definitions |
| `shared/alembic.ini` | Alembic config |
| `shared/alembic/env.py` | Alembic async environment |
| `shared/alembic/script.py.mako` | Alembic migration template |
| `shared/alembic/versions/` | Migration files (auto-generated) |
| `shared/tests/test_sa_models.py` | SA model tests |
| `shared/tests/test_logging.py` | structlog setup tests |
| `shared/tests/test_notifier.py` | TelegramNotifier tests |
| `shared/tests/test_resilience.py` | retry + circuit breaker tests |
| `shared/tests/test_healthcheck.py` | Healthcheck server tests |
| `shared/tests/test_metrics.py` | Prometheus metrics tests |
| `services/strategy-engine/strategies/config/rsi_strategy.yaml` | RSI params |
| `services/strategy-engine/strategies/config/grid_strategy.yaml` | Grid params |
| `services/strategy-engine/strategies/config/macd_strategy.yaml` | MACD params |
| `services/strategy-engine/strategies/config/bollinger_strategy.yaml` | Bollinger params |
| `services/strategy-engine/strategies/config/ema_crossover_strategy.yaml` | EMA params |
| `services/strategy-engine/strategies/config/vwap_strategy.yaml` | VWAP params |
| `services/strategy-engine/strategies/config/volume_profile_strategy.yaml` | Volume Profile params |
| `services/strategy-engine/strategies/macd_strategy.py` | MACD strategy |
| `services/strategy-engine/strategies/bollinger_strategy.py` | Bollinger Bands strategy |
| `services/strategy-engine/strategies/ema_crossover_strategy.py` | EMA Crossover strategy |
| `services/strategy-engine/strategies/vwap_strategy.py` | VWAP strategy |
| `services/strategy-engine/strategies/volume_profile_strategy.py` | Volume Profile strategy |
| `services/strategy-engine/tests/test_macd_strategy.py` | MACD tests |
| `services/strategy-engine/tests/test_bollinger_strategy.py` | Bollinger tests |
| `services/strategy-engine/tests/test_ema_crossover_strategy.py` | EMA Crossover tests |
| `services/strategy-engine/tests/test_vwap_strategy.py` | VWAP tests |
| `services/strategy-engine/tests/test_volume_profile_strategy.py` | Volume Profile tests |
| `services/backtester/src/backtester/metrics.py` | DetailedMetrics + TradeRecord |
| `services/backtester/tests/test_metrics.py` | Detailed metrics tests |
| `monitoring/prometheus.yml` | Prometheus scrape config |
### Modified Files
| File | Changes |
|------|---------|
| `shared/pyproject.toml` | Add sqlalchemy, alembic, structlog, prometheus-client, pyyaml |
| `shared/src/shared/config.py` | Add Telegram, health, circuit breaker settings |
| `shared/src/shared/db.py` | Rewrite to SQLAlchemy async session |
| `shared/src/shared/__init__.py` | Export new modules |
| `shared/tests/test_db.py` | Update for SQLAlchemy API |
| `services/strategy-engine/strategies/base.py` | Add warmup_period abstract property |
| `services/strategy-engine/strategies/rsi_strategy.py` | Add warmup_period, update for YAML config |
| `services/strategy-engine/strategies/grid_strategy.py` | Add warmup_period, update for YAML config |
| `services/strategy-engine/src/strategy_engine/plugin_loader.py` | Add YAML config loading |
| `services/strategy-engine/src/strategy_engine/main.py` | Use YAML config loader |
| `services/data-collector/src/data_collector/storage.py` | Use AsyncSession |
| `services/data-collector/src/data_collector/main.py` | Use structlog, healthcheck, resilience |
| `services/order-executor/src/order_executor/executor.py` | Use AsyncSession, notifier |
| `services/order-executor/src/order_executor/main.py` | Use structlog, healthcheck, resilience |
| `services/portfolio-manager/src/portfolio_manager/main.py` | Use structlog, healthcheck, daily summary |
| `services/backtester/src/backtester/engine.py` | Compute DetailedMetrics |
| `services/backtester/src/backtester/simulator.py` | Track entry/exit for TradeRecord |
| `services/backtester/src/backtester/reporter.py` | Rich table output, CSV/JSON export |
| `docker-compose.yml` | Add healthcheck endpoints, monitoring profile |
| `Makefile` | Add migrate, migrate-down, migrate-new targets |
| `.env.example` | Add Telegram, health, log format vars |
---
## Task 1: SQLAlchemy ORM Models + Alembic Setup
**Files:**
- Create: `shared/src/shared/sa_models.py`
- Create: `shared/alembic.ini`
- Create: `shared/alembic/env.py`
- Create: `shared/alembic/script.py.mako`
- Modify: `shared/pyproject.toml`
- Test: `shared/tests/test_sa_models.py`
- [ ] **Step 1: Add dependencies to shared/pyproject.toml**
```toml
[project]
name = "trading-shared"
version = "0.1.0"
description = "Shared models, events, and utilities for trading platform"
requires-python = ">=3.12"
dependencies = [
"pydantic>=2.0",
"pydantic-settings>=2.0",
"redis>=5.0",
"sqlalchemy[asyncio]>=2.0",
"asyncpg>=0.29",
"alembic>=1.13",
"structlog>=24.0",
"prometheus-client>=0.20",
"pyyaml>=6.0",
"aiohttp>=3.9",
"rich>=13.0",
]
```
- [ ] **Step 2: Write the failing test for SA models**
Create `shared/tests/test_sa_models.py`:
```python
"""Tests for SQLAlchemy ORM models."""
from datetime import datetime, timezone
from decimal import Decimal
from shared.sa_models import (
Base,
CandleRow,
SignalRow,
OrderRow,
TradeRow,
PositionRow,
PortfolioSnapshotRow,
)
def test_candle_row_table_name():
assert CandleRow.__tablename__ == "candles"
def test_candle_row_columns():
cols = {c.name for c in CandleRow.__table__.columns}
assert cols == {"symbol", "timeframe", "open_time", "open", "high", "low", "close", "volume"}
def test_signal_row_table_name():
assert SignalRow.__tablename__ == "signals"
def test_signal_row_columns():
cols = {c.name for c in SignalRow.__table__.columns}
assert cols == {"id", "strategy", "symbol", "side", "price", "quantity", "reason", "created_at"}
def test_order_row_table_name():
assert OrderRow.__tablename__ == "orders"
def test_order_row_columns():
cols = {c.name for c in OrderRow.__table__.columns}
assert cols == {
"id", "signal_id", "symbol", "side", "type", "price",
"quantity", "status", "created_at", "filled_at",
}
def test_trade_row_table_name():
assert TradeRow.__tablename__ == "trades"
def test_position_row_table_name():
assert PositionRow.__tablename__ == "positions"
def test_portfolio_snapshot_row_table_name():
assert PortfolioSnapshotRow.__tablename__ == "portfolio_snapshots"
def test_base_metadata_has_all_tables():
table_names = set(Base.metadata.tables.keys())
assert table_names == {
"candles", "signals", "orders", "trades", "positions", "portfolio_snapshots",
}
```
- [ ] **Step 3: Run test to verify it fails**
Run: `pytest shared/tests/test_sa_models.py -v`
Expected: FAIL with `ModuleNotFoundError: No module named 'shared.sa_models'`
- [ ] **Step 4: Implement SA models**
Create `shared/src/shared/sa_models.py`:
```python
"""SQLAlchemy ORM models for the trading platform."""
from datetime import datetime
from sqlalchemy import (
DateTime,
ForeignKey,
Integer,
Numeric,
String,
Text,
)
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
class Base(DeclarativeBase):
pass
class CandleRow(Base):
__tablename__ = "candles"
symbol: Mapped[str] = mapped_column(String, primary_key=True)
timeframe: Mapped[str] = mapped_column(String, primary_key=True)
open_time: Mapped[datetime] = mapped_column(DateTime(timezone=True), primary_key=True)
open: Mapped[float] = mapped_column(Numeric, nullable=False)
high: Mapped[float] = mapped_column(Numeric, nullable=False)
low: Mapped[float] = mapped_column(Numeric, nullable=False)
close: Mapped[float] = mapped_column(Numeric, nullable=False)
volume: Mapped[float] = mapped_column(Numeric, nullable=False)
class SignalRow(Base):
__tablename__ = "signals"
id: Mapped[str] = mapped_column(String, primary_key=True)
strategy: Mapped[str] = mapped_column(String, nullable=False)
symbol: Mapped[str] = mapped_column(String, nullable=False)
side: Mapped[str] = mapped_column(String, nullable=False)
price: Mapped[float] = mapped_column(Numeric, nullable=False)
quantity: Mapped[float] = mapped_column(Numeric, nullable=False)
reason: Mapped[str | None] = mapped_column(Text)
created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False)
class OrderRow(Base):
__tablename__ = "orders"
id: Mapped[str] = mapped_column(String, primary_key=True)
signal_id: Mapped[str | None] = mapped_column(String, ForeignKey("signals.id"))
symbol: Mapped[str] = mapped_column(String, nullable=False)
side: Mapped[str] = mapped_column(String, nullable=False)
type: Mapped[str] = mapped_column(String, nullable=False)
price: Mapped[float] = mapped_column(Numeric, nullable=False)
quantity: Mapped[float] = mapped_column(Numeric, nullable=False)
status: Mapped[str] = mapped_column(String, nullable=False, default="PENDING")
created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False)
filled_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
class TradeRow(Base):
__tablename__ = "trades"
id: Mapped[str] = mapped_column(String, primary_key=True)
order_id: Mapped[str | None] = mapped_column(String, ForeignKey("orders.id"))
symbol: Mapped[str] = mapped_column(String, nullable=False)
side: Mapped[str] = mapped_column(String, nullable=False)
price: Mapped[float] = mapped_column(Numeric, nullable=False)
quantity: Mapped[float] = mapped_column(Numeric, nullable=False)
fee: Mapped[float] = mapped_column(Numeric, nullable=False, default=0)
traded_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False)
class PositionRow(Base):
__tablename__ = "positions"
symbol: Mapped[str] = mapped_column(String, primary_key=True)
quantity: Mapped[float] = mapped_column(Numeric, nullable=False)
avg_entry_price: Mapped[float] = mapped_column(Numeric, nullable=False)
current_price: Mapped[float] = mapped_column(Numeric, nullable=False)
updated_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False)
class PortfolioSnapshotRow(Base):
__tablename__ = "portfolio_snapshots"
id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
total_value: Mapped[float] = mapped_column(Numeric, nullable=False)
realized_pnl: Mapped[float] = mapped_column(Numeric, nullable=False)
unrealized_pnl: Mapped[float] = mapped_column(Numeric, nullable=False)
snapshot_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False)
```
- [ ] **Step 5: Run test to verify it passes**
Run: `pytest shared/tests/test_sa_models.py -v`
Expected: All 8 tests PASS
- [ ] **Step 6: Set up Alembic**
Create `shared/alembic.ini`:
```ini
[alembic]
script_location = alembic
sqlalchemy.url = postgresql+asyncpg://trading:trading@localhost:5432/trading
[loggers]
keys = root,sqlalchemy,alembic
[handlers]
keys = console
[formatters]
keys = generic
[logger_root]
level = WARN
handlers = console
[logger_sqlalchemy]
level = WARN
handlers =
qualname = sqlalchemy.engine
[logger_alembic]
level = INFO
handlers =
qualname = alembic
[handler_console]
class = StreamHandler
args = (sys.stderr,)
level = NOTSET
formatter = generic
[formatter_generic]
format = %(levelname)-5.5s [%(name)s] %(message)s
datefmt = %H:%M:%S
```
Create `shared/alembic/script.py.mako`:
```mako
"""${message}
Revision ID: ${up_revision}
Revises: ${down_revision | comma,n}
Create Date: ${create_date}
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
${imports if imports else ""}
# revision identifiers, used by Alembic.
revision: str = ${repr(up_revision)}
down_revision: Union[str, None] = ${repr(down_revision)}
branch_labels: Union[str, Sequence[str], None] = ${repr(branch_labels)}
depends_on: Union[str, Sequence[str], None] = ${repr(depends_on)}
def upgrade() -> None:
${upgrades if upgrades else "pass"}
def downgrade() -> None:
${downgrades if downgrades else "pass"}
```
Create `shared/alembic/env.py`:
```python
"""Alembic environment configuration for async SQLAlchemy."""
import asyncio
import os
from logging.config import fileConfig
from alembic import context
from sqlalchemy import pool
from sqlalchemy.ext.asyncio import async_engine_from_config
from shared.sa_models import Base
config = context.config
if config.config_file_name is not None:
fileConfig(config.config_file_name)
target_metadata = Base.metadata
# Override URL from environment if available
database_url = os.environ.get("DATABASE_URL")
if database_url:
# Ensure async driver prefix
if database_url.startswith("postgresql://"):
database_url = database_url.replace("postgresql://", "postgresql+asyncpg://", 1)
config.set_main_option("sqlalchemy.url", database_url)
def run_migrations_offline() -> None:
url = config.get_main_option("sqlalchemy.url")
context.configure(url=url, target_metadata=target_metadata, literal_binds=True)
with context.begin_transaction():
context.run_migrations()
def do_run_migrations(connection):
context.configure(connection=connection, target_metadata=target_metadata)
with context.begin_transaction():
context.run_migrations()
async def run_async_migrations() -> None:
connectable = async_engine_from_config(
config.get_section(config.config_ini_section, {}),
prefix="sqlalchemy.",
poolclass=pool.NullPool,
)
async with connectable.connect() as connection:
await connection.run_sync(do_run_migrations)
await connectable.dispose()
def run_migrations_online() -> None:
asyncio.run(run_async_migrations())
if context.is_offline_mode():
run_migrations_offline()
else:
run_migrations_online()
```
Create empty `shared/alembic/versions/` directory (with `.gitkeep`).
- [ ] **Step 7: Add Makefile targets**
Append to `Makefile`:
```makefile
migrate:
cd shared && alembic upgrade head
migrate-down:
cd shared && alembic downgrade -1
migrate-new:
cd shared && alembic revision --autogenerate -m "$(MSG)"
```
- [ ] **Step 8: Commit**
```bash
git add shared/src/shared/sa_models.py shared/alembic.ini shared/alembic/ \
shared/tests/test_sa_models.py shared/pyproject.toml Makefile
git commit -m "feat(shared): add SQLAlchemy ORM models and Alembic setup"
```
---
## Task 2: Rewrite Database Layer to SQLAlchemy Async
**Files:**
- Modify: `shared/src/shared/db.py`
- Modify: `shared/tests/test_db.py`
- [ ] **Step 1: Write the failing test for the new DB layer**
Replace `shared/tests/test_db.py`:
```python
"""Tests for the SQLAlchemy async database layer."""
from datetime import datetime, timezone
from decimal import Decimal
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from shared.db import Database
from shared.models import Candle, Signal, OrderSide, Order, OrderType, OrderStatus
@pytest.fixture
def db():
return Database("postgresql+asyncpg://trading:trading@localhost:5432/trading")
def test_database_stores_url(db):
assert db._database_url == "postgresql+asyncpg://trading:trading@localhost:5432/trading"
@pytest.mark.asyncio
async def test_get_session_returns_async_session(db):
"""Verify get_session is an async context manager (structural test)."""
# We can't connect without a real DB, but we verify the method exists
assert hasattr(db, "get_session")
assert callable(db.get_session)
@pytest.mark.asyncio
async def test_insert_candle_creates_candle_row():
"""Verify insert_candle adds a CandleRow to the session."""
db = Database("postgresql+asyncpg://test:test@localhost/test")
candle = Candle(
symbol="BTCUSDT",
timeframe="1m",
open_time=datetime(2025, 1, 1, tzinfo=timezone.utc),
open=Decimal("50000"),
high=Decimal("51000"),
low=Decimal("49000"),
close=Decimal("50500"),
volume=Decimal("100"),
)
mock_session = AsyncMock()
mock_session.__aenter__ = AsyncMock(return_value=mock_session)
mock_session.__aexit__ = AsyncMock(return_value=False)
with patch.object(db, "get_session", return_value=mock_session):
await db.insert_candle(candle)
mock_session.merge.assert_called_once()
mock_session.commit.assert_called_once()
@pytest.mark.asyncio
async def test_insert_signal_creates_signal_row():
db = Database("postgresql+asyncpg://test:test@localhost/test")
signal = Signal(
strategy="rsi",
symbol="BTCUSDT",
side=OrderSide.BUY,
price=Decimal("50000"),
quantity=Decimal("0.01"),
reason="test signal",
)
mock_session = AsyncMock()
mock_session.__aenter__ = AsyncMock(return_value=mock_session)
mock_session.__aexit__ = AsyncMock(return_value=False)
with patch.object(db, "get_session", return_value=mock_session):
await db.insert_signal(signal)
mock_session.add.assert_called_once()
mock_session.commit.assert_called_once()
@pytest.mark.asyncio
async def test_insert_order_creates_order_row():
db = Database("postgresql+asyncpg://test:test@localhost/test")
order = Order(
signal_id="sig-1",
symbol="BTCUSDT",
side=OrderSide.BUY,
type=OrderType.MARKET,
price=Decimal("50000"),
quantity=Decimal("0.01"),
)
mock_session = AsyncMock()
mock_session.__aenter__ = AsyncMock(return_value=mock_session)
mock_session.__aexit__ = AsyncMock(return_value=False)
with patch.object(db, "get_session", return_value=mock_session):
await db.insert_order(order)
mock_session.add.assert_called_once()
mock_session.commit.assert_called_once()
```
- [ ] **Step 2: Run test to verify it fails**
Run: `pytest shared/tests/test_db.py -v`
Expected: FAIL — old Database class doesn't have `get_session`
- [ ] **Step 3: Rewrite db.py with SQLAlchemy async**
Replace `shared/src/shared/db.py`:
```python
"""Database layer using SQLAlchemy async for the trading platform."""
from datetime import datetime, timezone
from decimal import Decimal
from typing import Optional
from sqlalchemy import select, update
from sqlalchemy.ext.asyncio import (
AsyncSession,
async_sessionmaker,
create_async_engine,
)
from shared.models import Candle, Order, OrderStatus, Signal
from shared.sa_models import (
Base,
CandleRow,
OrderRow,
SignalRow,
)
class Database:
"""Async database access layer backed by SQLAlchemy."""
def __init__(self, database_url: str) -> None:
self._database_url = database_url
# Ensure async driver prefix
if self._database_url.startswith("postgresql://"):
self._database_url = self._database_url.replace(
"postgresql://", "postgresql+asyncpg://", 1
)
self._engine = create_async_engine(self._database_url)
self._session_factory = async_sessionmaker(self._engine, expire_on_commit=False)
def get_session(self) -> AsyncSession:
"""Return a new AsyncSession."""
return self._session_factory()
async def connect(self) -> None:
"""Create all tables (for dev/test — prefer Alembic in production)."""
async with self._engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
async def close(self) -> None:
"""Dispose of the engine."""
await self._engine.dispose()
# Alias for backward compatibility
async def init_tables(self) -> None:
await self.connect()
async def insert_candle(self, candle: Candle) -> None:
"""Upsert a candle row using merge."""
async with self.get_session() as session:
row = CandleRow(
symbol=candle.symbol,
timeframe=candle.timeframe,
open_time=candle.open_time,
open=candle.open,
high=candle.high,
low=candle.low,
close=candle.close,
volume=candle.volume,
)
await session.merge(row)
await session.commit()
async def insert_signal(self, signal: Signal) -> None:
"""Insert a signal row."""
async with self.get_session() as session:
row = SignalRow(
id=signal.id,
strategy=signal.strategy,
symbol=signal.symbol,
side=signal.side.value,
price=signal.price,
quantity=signal.quantity,
reason=signal.reason,
created_at=signal.created_at,
)
session.add(row)
await session.commit()
async def insert_order(self, order: Order) -> None:
"""Insert an order row."""
async with self.get_session() as session:
row = OrderRow(
id=order.id,
signal_id=order.signal_id,
symbol=order.symbol,
side=order.side.value,
type=order.type.value,
price=order.price,
quantity=order.quantity,
status=order.status.value,
created_at=order.created_at,
filled_at=order.filled_at,
)
session.add(row)
await session.commit()
async def update_order_status(
self,
order_id: str,
status: OrderStatus,
filled_at: Optional[datetime] = None,
) -> None:
"""Update the status of an order."""
async with self.get_session() as session:
stmt = (
update(OrderRow)
.where(OrderRow.id == order_id)
.values(status=status.value, filled_at=filled_at)
)
await session.execute(stmt)
await session.commit()
async def get_candles(
self, symbol: str, timeframe: str, limit: int = 500
) -> list[dict]:
"""Retrieve candles ordered by open_time descending."""
async with self.get_session() as session:
stmt = (
select(CandleRow)
.where(CandleRow.symbol == symbol, CandleRow.timeframe == timeframe)
.order_by(CandleRow.open_time.desc())
.limit(limit)
)
result = await session.execute(stmt)
rows = result.scalars().all()
return [
{
"symbol": r.symbol,
"timeframe": r.timeframe,
"open_time": r.open_time,
"open": r.open,
"high": r.high,
"low": r.low,
"close": r.close,
"volume": r.volume,
}
for r in rows
]
```
- [ ] **Step 4: Run test to verify it passes**
Run: `pytest shared/tests/test_db.py -v`
Expected: All tests PASS
- [ ] **Step 5: Commit**
```bash
git add shared/src/shared/db.py shared/tests/test_db.py
git commit -m "refactor(shared): rewrite db layer to SQLAlchemy 2.0 async"
```
---
## Task 3: Structured Logging with structlog
**Files:**
- Create: `shared/src/shared/logging.py`
- Test: `shared/tests/test_logging.py`
- [ ] **Step 1: Write the failing test**
Create `shared/tests/test_logging.py`:
```python
"""Tests for structured logging setup."""
import logging
import structlog
from shared.logging import setup_logging
def test_setup_logging_returns_logger():
logger = setup_logging("test-service", "INFO")
assert logger is not None
def test_setup_logging_binds_service_name():
logger = setup_logging("data-collector", "INFO")
# structlog loggers have _context with bound values
assert logger._context.get("service") == "data-collector"
def test_setup_logging_sets_log_level():
setup_logging("test-service", "DEBUG")
root = logging.getLogger()
assert root.level == logging.DEBUG
def test_setup_logging_json_format(capsys):
logger = setup_logging("test-service", "INFO", log_format="json")
logger.info("test_event", key="value")
captured = capsys.readouterr()
assert "test_event" in captured.out or "test_event" in captured.err
def test_setup_logging_console_format(capsys):
logger = setup_logging("test-service", "INFO", log_format="console")
logger.info("test_event", key="value")
captured = capsys.readouterr()
assert "test_event" in captured.out or "test_event" in captured.err
```
- [ ] **Step 2: Run test to verify it fails**
Run: `pytest shared/tests/test_logging.py -v`
Expected: FAIL with `ModuleNotFoundError: No module named 'shared.logging'`
- [ ] **Step 3: Implement structured logging**
Create `shared/src/shared/logging.py`:
```python
"""Structured logging setup using structlog."""
import logging
import sys
import structlog
def setup_logging(
service_name: str,
log_level: str = "INFO",
log_format: str = "json",
) -> structlog.stdlib.BoundLogger:
"""Configure structlog for the given service.
Args:
service_name: Bound to every log entry as 'service'.
log_level: Python log level string (DEBUG, INFO, WARNING, ERROR).
log_format: 'json' for production, 'console' for development.
Returns:
A bound structlog logger with service context.
"""
# Set stdlib root logger level
logging.basicConfig(
format="%(message)s",
stream=sys.stdout,
level=getattr(logging, log_level.upper(), logging.INFO),
force=True,
)
shared_processors: list[structlog.types.Processor] = [
structlog.contextvars.merge_contextvars,
structlog.stdlib.add_log_level,
structlog.stdlib.add_logger_name,
structlog.processors.TimeStamper(fmt="iso"),
structlog.processors.StackInfoRenderer(),
structlog.processors.UnicodeDecoder(),
]
if log_format == "console":
renderer = structlog.dev.ConsoleRenderer()
else:
renderer = structlog.processors.JSONRenderer()
structlog.configure(
processors=[
*shared_processors,
structlog.stdlib.ProcessorFormatter.wrap_for_formatter,
],
logger_factory=structlog.stdlib.LoggerFactory(),
wrapper_class=structlog.stdlib.BoundLogger,
cache_logger_on_first_use=True,
)
# Also configure the formatter for stdlib loggers
formatter = structlog.stdlib.ProcessorFormatter(
processors=[
structlog.stdlib.ProcessorFormatter.remove_processors_meta,
renderer,
],
)
root = logging.getLogger()
for handler in root.handlers:
handler.setFormatter(formatter)
return structlog.get_logger().bind(service=service_name)
```
- [ ] **Step 4: Run test to verify it passes**
Run: `pytest shared/tests/test_logging.py -v`
Expected: All 5 tests PASS
- [ ] **Step 5: Commit**
```bash
git add shared/src/shared/logging.py shared/tests/test_logging.py
git commit -m "feat(shared): add structlog-based structured logging"
```
---
## Task 4: Telegram Notification Service
**Files:**
- Create: `shared/src/shared/notifier.py`
- Modify: `shared/src/shared/config.py`
- Modify: `.env.example`
- Test: `shared/tests/test_notifier.py`
- [ ] **Step 1: Update config.py with Telegram settings**
Add to `shared/src/shared/config.py` after `dry_run`:
```python
# Telegram
telegram_bot_token: str = ""
telegram_chat_id: str = ""
telegram_enabled: bool = False
# Logging
log_format: str = "json"
# Health
health_port: int = 8080
# Circuit Breaker
circuit_breaker_threshold: int = 5
circuit_breaker_timeout: int = 60
```
- [ ] **Step 2: Update .env.example**
Replace `.env.example`:
```env
# Exchange
BINANCE_API_KEY=
BINANCE_API_SECRET=
# Infrastructure
REDIS_URL=redis://localhost:6379
DATABASE_URL=postgresql+asyncpg://trading:trading@localhost:5432/trading
# Logging
LOG_LEVEL=INFO
LOG_FORMAT=json
# Telegram
TELEGRAM_BOT_TOKEN=
TELEGRAM_CHAT_ID=
TELEGRAM_ENABLED=false
# Risk Management
RISK_MAX_POSITION_SIZE=0.1
RISK_STOP_LOSS_PCT=5
RISK_DAILY_LOSS_LIMIT_PCT=10
DRY_RUN=true
# Health & Metrics
HEALTH_PORT=8080
CIRCUIT_BREAKER_THRESHOLD=5
CIRCUIT_BREAKER_TIMEOUT=60
```
- [ ] **Step 3: Write the failing test for TelegramNotifier**
Create `shared/tests/test_notifier.py`:
```python
"""Tests for Telegram notification service."""
from decimal import Decimal
from unittest.mock import AsyncMock, patch, MagicMock
import pytest
from shared.models import Signal, OrderSide, Order, OrderType, OrderStatus
from shared.notifier import TelegramNotifier
@pytest.fixture
def notifier():
return TelegramNotifier(bot_token="test-token", chat_id="12345")
def test_notifier_disabled_when_no_token():
n = TelegramNotifier(bot_token="", chat_id="12345")
assert n.enabled is False
def test_notifier_enabled_with_token():
n = TelegramNotifier(bot_token="abc", chat_id="12345")
assert n.enabled is True
@pytest.mark.asyncio
async def test_send_does_nothing_when_disabled():
n = TelegramNotifier(bot_token="", chat_id="12345")
# Should not raise
await n.send("test message")
@pytest.mark.asyncio
async def test_send_posts_to_telegram_api(notifier):
mock_response = AsyncMock()
mock_response.status = 200
mock_response.__aenter__ = AsyncMock(return_value=mock_response)
mock_response.__aexit__ = AsyncMock(return_value=False)
mock_session = AsyncMock()
mock_session.post = MagicMock(return_value=mock_response)
notifier._session = mock_session
await notifier.send("Hello")
mock_session.post.assert_called_once()
call_kwargs = mock_session.post.call_args
assert "12345" in str(call_kwargs) or "Hello" in str(call_kwargs)
@pytest.mark.asyncio
async def test_send_signal_formats_message(notifier):
signal = Signal(
strategy="rsi",
symbol="BTCUSDT",
side=OrderSide.BUY,
price=Decimal("50000"),
quantity=Decimal("0.01"),
reason="RSI oversold",
)
with patch.object(notifier, "send", new_callable=AsyncMock) as mock_send:
await notifier.send_signal(signal)
mock_send.assert_called_once()
msg = mock_send.call_args[0][0]
assert "BUY" in msg
assert "BTCUSDT" in msg
assert "rsi" in msg
@pytest.mark.asyncio
async def test_send_order_formats_message(notifier):
order = Order(
signal_id="sig-1",
symbol="BTCUSDT",
side=OrderSide.BUY,
type=OrderType.MARKET,
price=Decimal("50000"),
quantity=Decimal("0.01"),
status=OrderStatus.FILLED,
)
with patch.object(notifier, "send", new_callable=AsyncMock) as mock_send:
await notifier.send_order(order)
mock_send.assert_called_once()
msg = mock_send.call_args[0][0]
assert "FILLED" in msg
assert "BTCUSDT" in msg
@pytest.mark.asyncio
async def test_send_error_formats_message(notifier):
with patch.object(notifier, "send", new_callable=AsyncMock) as mock_send:
await notifier.send_error("Connection lost", "data-collector")
mock_send.assert_called_once()
msg = mock_send.call_args[0][0]
assert "Connection lost" in msg
assert "data-collector" in msg
```
- [ ] **Step 4: Run test to verify it fails**
Run: `pytest shared/tests/test_notifier.py -v`
Expected: FAIL with `ModuleNotFoundError: No module named 'shared.notifier'`
- [ ] **Step 5: Implement TelegramNotifier**
Create `shared/src/shared/notifier.py`:
```python
"""Telegram notification service for trading alerts."""
import asyncio
import logging
from decimal import Decimal
import aiohttp
from shared.models import Order, Signal
logger = logging.getLogger(__name__)
TELEGRAM_API = "https://api.telegram.org/bot{token}/sendMessage"
class TelegramNotifier:
"""Sends notifications via Telegram Bot API."""
def __init__(self, bot_token: str, chat_id: str) -> None:
self._bot_token = bot_token
self._chat_id = chat_id
self._session: aiohttp.ClientSession | None = None
self._semaphore = asyncio.Semaphore(1) # Rate limit: 1 msg at a time
@property
def enabled(self) -> bool:
return bool(self._bot_token)
async def _ensure_session(self) -> aiohttp.ClientSession:
if self._session is None or self._session.closed:
self._session = aiohttp.ClientSession()
return self._session
async def send(self, message: str, parse_mode: str = "HTML") -> None:
"""Send a message to the configured Telegram chat."""
if not self.enabled:
return
async with self._semaphore:
url = TELEGRAM_API.format(token=self._bot_token)
payload = {
"chat_id": self._chat_id,
"text": message,
"parse_mode": parse_mode,
}
retries = 3
for attempt in range(retries):
try:
session = await self._ensure_session()
async with session.post(url, json=payload) as resp:
if resp.status == 200:
return
logger.warning(
"Telegram API returned %d on attempt %d",
resp.status,
attempt + 1,
)
except Exception as exc:
logger.warning(
"Telegram send failed attempt %d: %s", attempt + 1, exc
)
if attempt < retries - 1:
await asyncio.sleep(1)
logger.error("Failed to send Telegram message after %d attempts", retries)
async def send_signal(self, signal: Signal) -> None:
"""Format and send a trading signal notification."""
msg = (
f"Signal: {signal.side.value}\n"
f"Strategy: {signal.strategy}\n"
f"Symbol: {signal.symbol}\n"
f"Price: {signal.price}\n"
f"Quantity: {signal.quantity}\n"
f"Reason: {signal.reason}"
)
await self.send(msg)
async def send_order(self, order: Order) -> None:
"""Format and send an order execution notification."""
msg = (
f"Order: {order.status.value}\n"
f"Symbol: {order.symbol}\n"
f"Side: {order.side.value}\n"
f"Price: {order.price}\n"
f"Quantity: {order.quantity}"
)
await self.send(msg)
async def send_error(self, error: str, service: str) -> None:
"""Send an error alert."""
msg = f"Error in {service}\n{error}"
await self.send(msg)
async def send_daily_summary(
self, positions: list, total_value: Decimal, daily_pnl: Decimal
) -> None:
"""Send daily portfolio summary."""
lines = [f"Daily Summary"]
lines.append(f"Total Value: {total_value:.2f}")
lines.append(f"Daily PnL: {daily_pnl:.2f}")
lines.append(f"Open Positions: {len(positions)}")
for pos in positions:
lines.append(f" {pos.symbol}: {pos.quantity} @ {pos.avg_entry_price}")
await self.send("\n".join(lines))
async def close(self) -> None:
"""Close the HTTP session."""
if self._session and not self._session.closed:
await self._session.close()
```
- [ ] **Step 6: Run test to verify it passes**
Run: `pytest shared/tests/test_notifier.py -v`
Expected: All 7 tests PASS
- [ ] **Step 7: Commit**
```bash
git add shared/src/shared/notifier.py shared/src/shared/config.py \
shared/tests/test_notifier.py .env.example
git commit -m "feat(shared): add Telegram notification service"
```
---
## Task 5: Error Recovery — Retry + Circuit Breaker
**Files:**
- Create: `shared/src/shared/resilience.py`
- Test: `shared/tests/test_resilience.py`
- [ ] **Step 1: Write the failing test**
Create `shared/tests/test_resilience.py`:
```python
"""Tests for retry and circuit breaker."""
import asyncio
import pytest
from shared.resilience import retry_with_backoff, CircuitBreaker, CircuitState
# --- retry_with_backoff tests ---
@pytest.mark.asyncio
async def test_retry_succeeds_on_first_attempt():
call_count = 0
@retry_with_backoff(max_retries=3, base_delay=0.01)
async def succeed():
nonlocal call_count
call_count += 1
return "ok"
result = await succeed()
assert result == "ok"
assert call_count == 1
@pytest.mark.asyncio
async def test_retry_succeeds_after_failures():
call_count = 0
@retry_with_backoff(max_retries=3, base_delay=0.01)
async def fail_then_succeed():
nonlocal call_count
call_count += 1
if call_count < 3:
raise ConnectionError("down")
return "recovered"
result = await fail_then_succeed()
assert result == "recovered"
assert call_count == 3
@pytest.mark.asyncio
async def test_retry_raises_after_max_retries():
@retry_with_backoff(max_retries=2, base_delay=0.01)
async def always_fail():
raise ConnectionError("always down")
with pytest.raises(ConnectionError, match="always down"):
await always_fail()
# --- CircuitBreaker tests ---
@pytest.mark.asyncio
async def test_circuit_breaker_starts_closed():
cb = CircuitBreaker(failure_threshold=3, recovery_timeout=0.1)
assert cb.state == CircuitState.CLOSED
@pytest.mark.asyncio
async def test_circuit_breaker_opens_after_threshold():
cb = CircuitBreaker(failure_threshold=2, recovery_timeout=0.1)
cb.record_failure()
assert cb.state == CircuitState.CLOSED
cb.record_failure()
assert cb.state == CircuitState.OPEN
@pytest.mark.asyncio
async def test_circuit_breaker_rejects_when_open():
cb = CircuitBreaker(failure_threshold=1, recovery_timeout=60)
cb.record_failure()
assert cb.state == CircuitState.OPEN
assert cb.allow_request() is False
@pytest.mark.asyncio
async def test_circuit_breaker_half_open_after_timeout():
cb = CircuitBreaker(failure_threshold=1, recovery_timeout=0.05)
cb.record_failure()
assert cb.state == CircuitState.OPEN
await asyncio.sleep(0.06)
assert cb.allow_request() is True
assert cb.state == CircuitState.HALF_OPEN
@pytest.mark.asyncio
async def test_circuit_breaker_closes_on_success():
cb = CircuitBreaker(failure_threshold=1, recovery_timeout=0.05)
cb.record_failure()
await asyncio.sleep(0.06)
cb.allow_request() # transitions to HALF_OPEN
cb.record_success()
assert cb.state == CircuitState.CLOSED
```
- [ ] **Step 2: Run test to verify it fails**
Run: `pytest shared/tests/test_resilience.py -v`
Expected: FAIL with `ModuleNotFoundError: No module named 'shared.resilience'`
- [ ] **Step 3: Implement resilience module**
Create `shared/src/shared/resilience.py`:
```python
"""Retry with backoff and circuit breaker patterns."""
import asyncio
import functools
import logging
import random
import time
from enum import Enum
from typing import Callable
logger = logging.getLogger(__name__)
def retry_with_backoff(
max_retries: int = 3,
base_delay: float = 1.0,
max_delay: float = 60.0,
) -> Callable:
"""Decorator for async functions that retries with exponential backoff + jitter."""
def decorator(func: Callable) -> Callable:
@functools.wraps(func)
async def wrapper(*args, **kwargs):
last_exc = None
for attempt in range(max_retries):
try:
return await func(*args, **kwargs)
except Exception as exc:
last_exc = exc
if attempt < max_retries - 1:
delay = min(base_delay * (2**attempt), max_delay)
jitter = delay * random.uniform(0, 0.5)
wait = delay + jitter
logger.warning(
"Retry %d/%d for %s after %.2fs: %s",
attempt + 1,
max_retries,
func.__name__,
wait,
exc,
)
await asyncio.sleep(wait)
raise last_exc
return wrapper
return decorator
class CircuitState(Enum):
CLOSED = "closed"
OPEN = "open"
HALF_OPEN = "half_open"
class CircuitBreaker:
"""Circuit breaker that opens after consecutive failures."""
def __init__(
self,
failure_threshold: int = 5,
recovery_timeout: float = 60.0,
) -> None:
self._failure_threshold = failure_threshold
self._recovery_timeout = recovery_timeout
self._failure_count = 0
self._state = CircuitState.CLOSED
self._opened_at: float | None = None
@property
def state(self) -> CircuitState:
return self._state
def allow_request(self) -> bool:
"""Check if a request should be allowed."""
if self._state == CircuitState.CLOSED:
return True
if self._state == CircuitState.OPEN:
if self._opened_at and (time.monotonic() - self._opened_at) >= self._recovery_timeout:
self._state = CircuitState.HALF_OPEN
return True
return False
# HALF_OPEN — allow one probe request
return True
def record_success(self) -> None:
"""Record a successful call."""
self._failure_count = 0
self._state = CircuitState.CLOSED
self._opened_at = None
def record_failure(self) -> None:
"""Record a failed call."""
self._failure_count += 1
if self._failure_count >= self._failure_threshold:
self._state = CircuitState.OPEN
self._opened_at = time.monotonic()
logger.error(
"Circuit breaker OPEN after %d failures", self._failure_count
)
```
- [ ] **Step 4: Run test to verify it passes**
Run: `pytest shared/tests/test_resilience.py -v`
Expected: All 8 tests PASS
- [ ] **Step 5: Commit**
```bash
git add shared/src/shared/resilience.py shared/tests/test_resilience.py
git commit -m "feat(shared): add retry with backoff and circuit breaker"
```
---
## Task 6: Health Check + Prometheus Metrics
**Files:**
- Create: `shared/src/shared/healthcheck.py`
- Create: `shared/src/shared/metrics.py`
- Create: `monitoring/prometheus.yml`
- Modify: `docker-compose.yml`
- Test: `shared/tests/test_healthcheck.py`
- Test: `shared/tests/test_metrics.py`
- [ ] **Step 1: Write the failing test for metrics**
Create `shared/tests/test_metrics.py`:
```python
"""Tests for Prometheus metrics definitions."""
from shared.metrics import ServiceMetrics
def test_service_metrics_creates_counters():
m = ServiceMetrics("test-service")
assert m.errors_total is not None
assert m.events_processed is not None
def test_service_metrics_increment_errors():
m = ServiceMetrics("test-service-2")
m.errors_total.labels(service="test-service-2", error_type="connection").inc()
# No assertion needed — prometheus_client raises on invalid labels
def test_service_metrics_observe_processing_time():
m = ServiceMetrics("test-service-3")
m.processing_seconds.labels(service="test-service-3").observe(0.5)
```
- [ ] **Step 2: Run test to verify it fails**
Run: `pytest shared/tests/test_metrics.py -v`
Expected: FAIL with `ModuleNotFoundError: No module named 'shared.metrics'`
- [ ] **Step 3: Implement metrics module**
Create `shared/src/shared/metrics.py`:
```python
"""Prometheus metric definitions for trading services."""
from prometheus_client import Counter, Gauge, Histogram
class ServiceMetrics:
"""Common Prometheus metrics for any trading service."""
def __init__(self, service_name: str) -> None:
prefix = service_name.replace("-", "_")
self.errors_total = Counter(
f"{prefix}_errors_total",
"Total error count",
["service", "error_type"],
)
self.events_processed = Counter(
f"{prefix}_events_processed_total",
"Total events processed",
["service", "event_type"],
)
self.processing_seconds = Histogram(
f"{prefix}_processing_seconds",
"Event processing duration in seconds",
["service"],
)
self.service_up = Gauge(
f"{prefix}_up",
"Service health status (1=up, 0=down)",
["service"],
)
```
- [ ] **Step 4: Run test to verify it passes**
Run: `pytest shared/tests/test_metrics.py -v`
Expected: All 3 tests PASS
- [ ] **Step 5: Write the failing test for healthcheck**
Create `shared/tests/test_healthcheck.py`:
```python
"""Tests for health check server."""
import pytest
from unittest.mock import AsyncMock
from shared.healthcheck import HealthCheckServer
def test_healthcheck_server_init():
server = HealthCheckServer(service_name="test", port=9090)
assert server._service_name == "test"
assert server._port == 9090
def test_healthcheck_register_check():
server = HealthCheckServer(service_name="test", port=9090)
check_fn = AsyncMock(return_value=True)
server.register_check("redis", check_fn)
assert "redis" in server._checks
@pytest.mark.asyncio
async def test_healthcheck_run_checks_all_pass():
server = HealthCheckServer(service_name="test", port=9090)
server.register_check("redis", AsyncMock(return_value=True))
server.register_check("postgres", AsyncMock(return_value=True))
result = await server.run_checks()
assert result["status"] == "ok"
assert result["checks"]["redis"] is True
assert result["checks"]["postgres"] is True
@pytest.mark.asyncio
async def test_healthcheck_run_checks_one_fails():
server = HealthCheckServer(service_name="test", port=9090)
server.register_check("redis", AsyncMock(return_value=True))
server.register_check("postgres", AsyncMock(return_value=False))
result = await server.run_checks()
assert result["status"] == "degraded"
assert result["checks"]["postgres"] is False
```
- [ ] **Step 6: Run test to verify it fails**
Run: `pytest shared/tests/test_healthcheck.py -v`
Expected: FAIL with `ModuleNotFoundError: No module named 'shared.healthcheck'`
- [ ] **Step 7: Implement healthcheck server**
Create `shared/src/shared/healthcheck.py`:
```python
"""Lightweight HTTP server for health checks and Prometheus metrics."""
import time
from typing import Any, Callable, Coroutine
from aiohttp import web
from prometheus_client import generate_latest, CONTENT_TYPE_LATEST
class HealthCheckServer:
"""Serves /health and /metrics endpoints."""
def __init__(self, service_name: str, port: int = 8080) -> None:
self._service_name = service_name
self._port = port
self._checks: dict[str, Callable[[], Coroutine[Any, Any, bool]]] = {}
self._start_time = time.monotonic()
def register_check(
self, name: str, check_fn: Callable[[], Coroutine[Any, Any, bool]]
) -> None:
"""Register a named async health check function."""
self._checks[name] = check_fn
async def run_checks(self) -> dict[str, Any]:
"""Run all registered checks and return aggregated result."""
results = {}
all_ok = True
for name, fn in self._checks.items():
try:
results[name] = await fn()
except Exception:
results[name] = False
if not results[name]:
all_ok = False
return {
"status": "ok" if all_ok else "degraded",
"service": self._service_name,
"uptime_seconds": round(time.monotonic() - self._start_time, 1),
"checks": results,
}
async def _handle_health(self, request: web.Request) -> web.Response:
result = await self.run_checks()
status = 200 if result["status"] == "ok" else 503
return web.json_response(result, status=status)
async def _handle_metrics(self, request: web.Request) -> web.Response:
return web.Response(
body=generate_latest(),
content_type=CONTENT_TYPE_LATEST,
)
async def start(self) -> web.AppRunner:
"""Start the HTTP server in the background."""
app = web.Application()
app.router.add_get("/health", self._handle_health)
app.router.add_get("/metrics", self._handle_metrics)
runner = web.AppRunner(app)
await runner.setup()
site = web.TCPSite(runner, "0.0.0.0", self._port)
await site.start()
return runner
```
- [ ] **Step 8: Run test to verify it passes**
Run: `pytest shared/tests/test_healthcheck.py -v`
Expected: All 4 tests PASS
- [ ] **Step 9: Create Prometheus config and update docker-compose**
Create `monitoring/prometheus.yml`:
```yaml
global:
scrape_interval: 15s
scrape_configs:
- job_name: "trading-services"
static_configs:
- targets:
- "data-collector:8080"
- "strategy-engine:8081"
- "order-executor:8082"
- "portfolio-manager:8083"
```
Add to `docker-compose.yml` — append these services before the `volumes:` section:
```yaml
prometheus:
image: prom/prometheus:latest
profiles: ["monitoring"]
ports:
- "9090:9090"
volumes:
- ./monitoring/prometheus.yml:/etc/prometheus/prometheus.yml
depends_on:
- data-collector
- strategy-engine
- order-executor
- portfolio-manager
grafana:
image: grafana/grafana:latest
profiles: ["monitoring"]
ports:
- "3000:3000"
depends_on:
- prometheus
```
- [ ] **Step 10: Commit**
```bash
git add shared/src/shared/metrics.py shared/src/shared/healthcheck.py \
shared/tests/test_metrics.py shared/tests/test_healthcheck.py \
monitoring/prometheus.yml docker-compose.yml
git commit -m "feat(shared): add health checks and Prometheus metrics"
```
---
## Task 7: Integrate Operations Infrastructure into Services
**Files:**
- Modify: `services/data-collector/src/data_collector/main.py`
- Modify: `services/data-collector/src/data_collector/storage.py`
- Modify: `services/order-executor/src/order_executor/main.py`
- Modify: `services/order-executor/src/order_executor/executor.py`
- Modify: `services/portfolio-manager/src/portfolio_manager/main.py`
- Modify: `services/strategy-engine/src/strategy_engine/main.py`
This task integrates structlog, healthcheck, metrics, Telegram, and resilience into each service's entry point. Each service follows the same pattern.
- [ ] **Step 1: Update data-collector/main.py**
Replace `services/data-collector/src/data_collector/main.py`:
```python
"""Data Collector Service entry point."""
import asyncio
from shared.broker import RedisBroker
from shared.config import Settings
from shared.db import Database
from shared.healthcheck import HealthCheckServer
from shared.logging import setup_logging
from shared.metrics import ServiceMetrics
from shared.notifier import TelegramNotifier
from shared.resilience import retry_with_backoff
from data_collector.binance_ws import BinanceWebSocket
from data_collector.config import CollectorConfig
from data_collector.storage import CandleStorage
async def run() -> None:
config = CollectorConfig()
log = setup_logging("data-collector", config.log_level, config.log_format)
metrics = ServiceMetrics("data_collector")
notifier = TelegramNotifier(
bot_token=config.telegram_bot_token,
chat_id=config.telegram_chat_id,
)
db = Database(config.database_url)
await db.connect()
broker = RedisBroker(config.redis_url)
storage = CandleStorage(db=db, broker=broker)
# Health checks
health = HealthCheckServer("data-collector", port=config.health_port)
async def check_redis():
try:
await broker._redis.ping()
return True
except Exception:
return False
health.register_check("redis", check_redis)
await health.start()
metrics.service_up.labels(service="data-collector").set(1)
async def on_candle(candle):
log.info("candle_received", symbol=candle.symbol, timeframe=candle.timeframe)
await storage.store(candle)
metrics.events_processed.labels(
service="data-collector", event_type="candle"
).inc()
timeframe = config.timeframes[0] if config.timeframes else "1m"
ws = BinanceWebSocket(
symbols=config.symbols,
timeframe=timeframe,
on_candle=on_candle,
)
log.info("starting", symbols=config.symbols, timeframe=timeframe)
try:
await ws.start()
except Exception as exc:
log.error("fatal_error", error=str(exc))
metrics.errors_total.labels(
service="data-collector", error_type="fatal"
).inc()
await notifier.send_error(str(exc), "data-collector")
raise
finally:
metrics.service_up.labels(service="data-collector").set(0)
await notifier.close()
await broker.close()
await db.close()
def main() -> None:
asyncio.run(run())
if __name__ == "__main__":
main()
```
- [ ] **Step 2: Update order-executor/executor.py to use notifier**
Add notifier parameter to `OrderExecutor.__init__` and call it on order events. Replace `services/order-executor/src/order_executor/executor.py`:
```python
"""Order execution logic."""
import logging
from datetime import datetime, timezone
from decimal import Decimal
from typing import Any, Optional
from shared.broker import RedisBroker
from shared.db import Database
from shared.events import OrderEvent
from shared.models import Order, OrderSide, OrderStatus, OrderType, Signal
from shared.notifier import TelegramNotifier
from order_executor.risk_manager import RiskManager
logger = logging.getLogger(__name__)
class OrderExecutor:
"""Executes orders on an exchange with risk gating."""
def __init__(
self,
exchange: Any,
risk_manager: RiskManager,
broker: RedisBroker,
db: Database,
notifier: TelegramNotifier,
dry_run: bool = True,
) -> None:
self.exchange = exchange
self.risk_manager = risk_manager
self.broker = broker
self.db = db
self.notifier = notifier
self.dry_run = dry_run
async def execute(self, signal: Signal) -> Optional[Order]:
"""Run risk checks and place an order for the given signal."""
balance_data = await self.exchange.fetch_balance()
free_balances = balance_data.get("free", {})
quote_currency = signal.symbol.split("/")[-1] if "/" in signal.symbol else "USDT"
balance = Decimal(str(free_balances.get(quote_currency, 0)))
positions = {}
daily_pnl = Decimal(0)
result = self.risk_manager.check(
signal=signal,
balance=balance,
positions=positions,
daily_pnl=daily_pnl,
)
if not result.allowed:
logger.warning(
"Risk check rejected signal %s: %s", signal.id, result.reason
)
return None
order = Order(
signal_id=signal.id,
symbol=signal.symbol,
side=signal.side,
type=OrderType.MARKET,
price=signal.price,
quantity=signal.quantity,
status=OrderStatus.PENDING,
)
if self.dry_run:
order.status = OrderStatus.FILLED
order.filled_at = datetime.now(timezone.utc)
logger.info("[DRY RUN] Order filled: %s %s %s", order.side, order.quantity, order.symbol)
else:
try:
await self.exchange.create_order(
symbol=signal.symbol,
type="market",
side=signal.side.value.lower(),
amount=float(signal.quantity),
)
order.status = OrderStatus.FILLED
order.filled_at = datetime.now(timezone.utc)
logger.info("Order filled: %s %s %s", order.side, order.quantity, order.symbol)
except Exception as exc:
order.status = OrderStatus.FAILED
logger.error("Order failed for signal %s: %s", signal.id, exc)
await self.db.insert_order(order)
await self.broker.publish("orders", OrderEvent(data=order).to_dict())
await self.notifier.send_order(order)
return order
```
- [ ] **Step 3: Update order-executor/main.py**
Replace `services/order-executor/src/order_executor/main.py`:
```python
"""Order Executor Service entry point."""
import asyncio
from decimal import Decimal
import ccxt.async_support as ccxt
from shared.broker import RedisBroker
from shared.db import Database
from shared.events import Event, EventType
from shared.healthcheck import HealthCheckServer
from shared.logging import setup_logging
from shared.metrics import ServiceMetrics
from shared.notifier import TelegramNotifier
from order_executor.config import ExecutorConfig
from order_executor.executor import OrderExecutor
from order_executor.risk_manager import RiskManager
async def run() -> None:
config = ExecutorConfig()
log = setup_logging("order-executor", config.log_level, config.log_format)
metrics = ServiceMetrics("order_executor")
notifier = TelegramNotifier(
bot_token=config.telegram_bot_token,
chat_id=config.telegram_chat_id,
)
db = Database(config.database_url)
await db.connect()
broker = RedisBroker(config.redis_url)
exchange = ccxt.binance(
{"apiKey": config.binance_api_key, "secret": config.binance_api_secret}
)
risk_manager = RiskManager(
max_position_size=Decimal(str(config.risk_max_position_size)),
stop_loss_pct=Decimal(str(config.risk_stop_loss_pct)),
daily_loss_limit_pct=Decimal(str(config.risk_daily_loss_limit_pct)),
)
executor = OrderExecutor(
exchange=exchange,
risk_manager=risk_manager,
broker=broker,
db=db,
notifier=notifier,
dry_run=config.dry_run,
)
health = HealthCheckServer("order-executor", port=config.health_port + 2)
await health.start()
metrics.service_up.labels(service="order-executor").set(1)
last_id = "$"
stream = "signals"
log.info("started", stream=stream, dry_run=config.dry_run)
try:
while True:
messages = await broker.read(stream, last_id=last_id, count=10, block=5000)
for msg in messages:
try:
event = Event.from_dict(msg)
if event.type == EventType.SIGNAL:
signal = event.data
log.info("processing_signal", signal_id=signal.id, symbol=signal.symbol)
await executor.execute(signal)
metrics.events_processed.labels(
service="order-executor", event_type="signal"
).inc()
except Exception as exc:
log.error("process_failed", error=str(exc))
metrics.errors_total.labels(
service="order-executor", error_type="processing"
).inc()
finally:
metrics.service_up.labels(service="order-executor").set(0)
await notifier.close()
await broker.close()
await db.close()
await exchange.close()
def main() -> None:
asyncio.run(run())
if __name__ == "__main__":
main()
```
- [ ] **Step 4: Update strategy-engine/main.py**
Replace `services/strategy-engine/src/strategy_engine/main.py`:
```python
"""Strategy Engine Service entry point."""
import asyncio
from pathlib import Path
from shared.broker import RedisBroker
from shared.healthcheck import HealthCheckServer
from shared.logging import setup_logging
from shared.metrics import ServiceMetrics
from shared.notifier import TelegramNotifier
from strategy_engine.config import StrategyConfig
from strategy_engine.engine import StrategyEngine
from strategy_engine.plugin_loader import load_strategies
STRATEGIES_DIR = Path(__file__).parent.parent.parent.parent / "strategies"
async def run() -> None:
config = StrategyConfig()
log = setup_logging("strategy-engine", config.log_level, config.log_format)
metrics = ServiceMetrics("strategy_engine")
notifier = TelegramNotifier(
bot_token=config.telegram_bot_token,
chat_id=config.telegram_chat_id,
)
broker = RedisBroker(config.redis_url)
strategies = load_strategies(STRATEGIES_DIR)
for strategy in strategies:
params = config.strategy_params.get(strategy.name, {})
strategy.configure(params)
log.info("loaded_strategies", count=len(strategies), names=[s.name for s in strategies])
engine = StrategyEngine(broker=broker, strategies=strategies)
health = HealthCheckServer("strategy-engine", port=config.health_port + 1)
await health.start()
metrics.service_up.labels(service="strategy-engine").set(1)
try:
for symbol in config.symbols:
stream = f"candles.{symbol.replace('/', '_')}"
last_id = "$"
log.info("engine_loop_start", stream=stream)
while True:
last_id = await engine.process_once(stream, last_id)
except Exception as exc:
log.error("fatal_error", error=str(exc))
await notifier.send_error(str(exc), "strategy-engine")
raise
finally:
metrics.service_up.labels(service="strategy-engine").set(0)
await notifier.close()
await broker.close()
def main() -> None:
asyncio.run(run())
if __name__ == "__main__":
main()
```
- [ ] **Step 5: Update portfolio-manager/main.py**
Replace `services/portfolio-manager/src/portfolio_manager/main.py`:
```python
"""Portfolio Manager Service entry point."""
import asyncio
from shared.broker import RedisBroker
from shared.events import Event, OrderEvent
from shared.healthcheck import HealthCheckServer
from shared.logging import setup_logging
from shared.metrics import ServiceMetrics
from shared.notifier import TelegramNotifier
from portfolio_manager.config import PortfolioConfig
from portfolio_manager.portfolio import PortfolioTracker
ORDERS_STREAM = "orders"
async def run() -> None:
config = PortfolioConfig()
log = setup_logging("portfolio-manager", config.log_level, config.log_format)
metrics = ServiceMetrics("portfolio_manager")
notifier = TelegramNotifier(
bot_token=config.telegram_bot_token,
chat_id=config.telegram_chat_id,
)
broker = RedisBroker(config.redis_url)
tracker = PortfolioTracker()
health = HealthCheckServer("portfolio-manager", port=config.health_port + 3)
await health.start()
metrics.service_up.labels(service="portfolio-manager").set(1)
last_id = "$"
log.info("started", stream=ORDERS_STREAM)
try:
while True:
messages = await broker.read(ORDERS_STREAM, last_id=last_id, block=1000)
for msg in messages:
try:
event = Event.from_dict(msg)
if isinstance(event, OrderEvent):
order = event.data
tracker.apply_order(order)
log.info(
"order_applied",
symbol=order.symbol,
side=order.side.value,
qty=str(order.quantity),
)
metrics.events_processed.labels(
service="portfolio-manager", event_type="order"
).inc()
except Exception as exc:
log.error("process_failed", error=str(exc))
metrics.errors_total.labels(
service="portfolio-manager", error_type="processing"
).inc()
finally:
metrics.service_up.labels(service="portfolio-manager").set(0)
await notifier.close()
await broker.close()
def main() -> None:
asyncio.run(run())
if __name__ == "__main__":
main()
```
- [ ] **Step 6: Run all tests to verify nothing is broken**
Run: `pytest -v`
Expected: All existing tests PASS (some executor tests may need mock updates for `notifier` param)
- [ ] **Step 7: Fix any broken executor tests**
The `OrderExecutor` now requires a `notifier` parameter. Update `services/order-executor/tests/test_executor.py` — add `notifier=AsyncMock()` to every `OrderExecutor(...)` call. For example, wherever the test creates:
```python
executor = OrderExecutor(
exchange=mock_exchange,
risk_manager=mock_risk,
broker=mock_broker,
db=mock_db,
dry_run=True,
)
```
Change to:
```python
executor = OrderExecutor(
exchange=mock_exchange,
risk_manager=mock_risk,
broker=mock_broker,
db=mock_db,
notifier=AsyncMock(),
dry_run=True,
)
```
- [ ] **Step 8: Run all tests again**
Run: `pytest -v`
Expected: All tests PASS
- [ ] **Step 9: Commit**
```bash
git add services/data-collector/src/data_collector/main.py \
services/order-executor/src/order_executor/executor.py \
services/order-executor/src/order_executor/main.py \
services/order-executor/tests/test_executor.py \
services/strategy-engine/src/strategy_engine/main.py \
services/portfolio-manager/src/portfolio_manager/main.py
git commit -m "feat(services): integrate structlog, healthcheck, metrics, and Telegram"
```
---
## Task 8: BaseStrategy warmup_period + YAML Config Loading
**Files:**
- Modify: `services/strategy-engine/strategies/base.py`
- Modify: `services/strategy-engine/strategies/rsi_strategy.py`
- Modify: `services/strategy-engine/strategies/grid_strategy.py`
- Modify: `services/strategy-engine/src/strategy_engine/plugin_loader.py`
- Create: `services/strategy-engine/strategies/config/rsi_strategy.yaml`
- Create: `services/strategy-engine/strategies/config/grid_strategy.yaml`
- [ ] **Step 1: Update BaseStrategy with warmup_period**
Replace `services/strategy-engine/strategies/base.py`:
```python
from abc import ABC, abstractmethod
from shared.models import Candle, Signal
class BaseStrategy(ABC):
name: str = "base"
@property
@abstractmethod
def warmup_period(self) -> int:
"""Minimum number of candles needed before generating signals."""
pass
@abstractmethod
def on_candle(self, candle: Candle) -> Signal | None:
pass
@abstractmethod
def configure(self, params: dict) -> None:
pass
def reset(self) -> None:
pass
```
- [ ] **Step 2: Update RsiStrategy with warmup_period**
Add this property to `RsiStrategy` in `rsi_strategy.py`, after `__init__`:
```python
@property
def warmup_period(self) -> int:
return self._period + 1
```
- [ ] **Step 3: Update GridStrategy with warmup_period**
Add this property to `GridStrategy` in `grid_strategy.py`, after `__init__`:
```python
@property
def warmup_period(self) -> int:
return 2 # Needs at least 2 candles to detect zone crossing
```
- [ ] **Step 4: Create YAML config files**
Create `services/strategy-engine/strategies/config/rsi_strategy.yaml`:
```yaml
period: 14
oversold: 30
overbought: 70
quantity: "0.01"
```
Create `services/strategy-engine/strategies/config/grid_strategy.yaml`:
```yaml
lower_price: 60000
upper_price: 70000
grid_count: 5
quantity: "0.01"
```
- [ ] **Step 5: Update plugin_loader.py with YAML config loading**
Replace `services/strategy-engine/src/strategy_engine/plugin_loader.py`:
```python
"""Dynamic plugin loader for strategy modules with YAML config support."""
import importlib.util
import sys
from pathlib import Path
import yaml
from strategies.base import BaseStrategy
def load_strategies(strategies_dir: Path) -> list[BaseStrategy]:
"""Scan strategies_dir for *.py files and load all BaseStrategy subclasses.
Automatically loads matching YAML config from strategies_dir/config/.
"""
loaded: list[BaseStrategy] = []
config_dir = strategies_dir / "config"
for path in sorted(strategies_dir.glob("*.py")):
if path.name.startswith("__") or path.name == "base.py":
continue
module_name = f"_strategy_plugin_{path.stem}"
spec = importlib.util.spec_from_file_location(module_name, path)
if spec is None or spec.loader is None:
continue
module = importlib.util.module_from_spec(spec)
sys.modules[module_name] = module
spec.loader.exec_module(module)
for attr_name in dir(module):
obj = getattr(module, attr_name)
if (
isinstance(obj, type)
and issubclass(obj, BaseStrategy)
and obj is not BaseStrategy
):
instance = obj()
# Load YAML config if it exists
yaml_path = config_dir / f"{path.stem}.yaml"
if yaml_path.exists():
with open(yaml_path) as f:
params = yaml.safe_load(f) or {}
instance.configure(params)
loaded.append(instance)
return loaded
```
- [ ] **Step 6: Run existing strategy tests**
Run: `pytest services/strategy-engine/tests/ -v`
Expected: All tests PASS
- [ ] **Step 7: Commit**
```bash
git add services/strategy-engine/strategies/base.py \
services/strategy-engine/strategies/rsi_strategy.py \
services/strategy-engine/strategies/grid_strategy.py \
services/strategy-engine/src/strategy_engine/plugin_loader.py \
services/strategy-engine/strategies/config/
git commit -m "feat(strategy): add warmup_period to BaseStrategy and YAML config loading"
```
---
## Task 9: MACD Strategy
**Files:**
- Create: `services/strategy-engine/strategies/macd_strategy.py`
- Create: `services/strategy-engine/strategies/config/macd_strategy.yaml`
- Test: `services/strategy-engine/tests/test_macd_strategy.py`
- [ ] **Step 1: Write the failing test**
Create `services/strategy-engine/tests/test_macd_strategy.py`:
```python
"""Tests for MACD strategy."""
from decimal import Decimal
import pytest
from shared.models import Candle, OrderSide
from strategies.macd_strategy import MacdStrategy
@pytest.fixture
def strategy():
s = MacdStrategy()
s.configure({"fast_period": 3, "slow_period": 6, "signal_period": 3, "quantity": "0.01"})
return s
def _candle(price: float) -> Candle:
from datetime import datetime, timezone
return Candle(
symbol="BTCUSDT",
timeframe="1m",
open_time=datetime(2025, 1, 1, tzinfo=timezone.utc),
open=Decimal(str(price)),
high=Decimal(str(price + 100)),
low=Decimal(str(price - 100)),
close=Decimal(str(price)),
volume=Decimal("10"),
)
def test_macd_warmup_period(strategy):
assert strategy.warmup_period == 9 # slow_period + signal_period = 6 + 3
def test_macd_no_signal_insufficient_data(strategy):
for price in [100, 101, 102, 103, 104]:
result = strategy.on_candle(_candle(price))
assert result is None
def test_macd_buy_signal_on_bullish_crossover(strategy):
# Feed declining prices to push MACD below signal, then rising to cross above
prices = [100, 98, 96, 94, 92, 90, 88, 90, 93, 97, 102, 108, 115, 123, 132]
signals = []
for p in prices:
sig = strategy.on_candle(_candle(p))
if sig is not None:
signals.append(sig)
buy_signals = [s for s in signals if s.side == OrderSide.BUY]
assert len(buy_signals) > 0
assert buy_signals[0].strategy == "macd"
def test_macd_sell_signal_on_bearish_crossover(strategy):
# Feed rising prices to push MACD above signal, then declining to cross below
prices = [100, 105, 110, 116, 122, 128, 125, 120, 114, 107, 99, 90, 80, 70, 60]
signals = []
for p in prices:
sig = strategy.on_candle(_candle(p))
if sig is not None:
signals.append(sig)
sell_signals = [s for s in signals if s.side == OrderSide.SELL]
assert len(sell_signals) > 0
def test_macd_reset_clears_state(strategy):
for p in [100, 101, 102]:
strategy.on_candle(_candle(p))
strategy.reset()
assert len(strategy._closes) == 0
```
- [ ] **Step 2: Run test to verify it fails**
Run: `pytest services/strategy-engine/tests/test_macd_strategy.py -v`
Expected: FAIL with `ModuleNotFoundError: No module named 'strategies.macd_strategy'`
- [ ] **Step 3: Implement MACD strategy**
Create `services/strategy-engine/strategies/macd_strategy.py`:
```python
"""MACD (Moving Average Convergence Divergence) strategy."""
from collections import deque
from decimal import Decimal
import pandas as pd
from shared.models import Candle, Signal, OrderSide
from strategies.base import BaseStrategy
class MacdStrategy(BaseStrategy):
name: str = "macd"
def __init__(self) -> None:
self._fast_period: int = 12
self._slow_period: int = 26
self._signal_period: int = 9
self._quantity: Decimal = Decimal("0.01")
self._closes: deque[float] = deque(maxlen=500)
self._prev_histogram: float | None = None
@property
def warmup_period(self) -> int:
return self._slow_period + self._signal_period
def configure(self, params: dict) -> None:
self._fast_period = int(params.get("fast_period", 12))
self._slow_period = int(params.get("slow_period", 26))
self._signal_period = int(params.get("signal_period", 9))
self._quantity = Decimal(str(params.get("quantity", "0.01")))
def reset(self) -> None:
self._closes.clear()
self._prev_histogram = None
def on_candle(self, candle: Candle) -> Signal | None:
self._closes.append(float(candle.close))
if len(self._closes) < self.warmup_period:
return None
series = pd.Series(list(self._closes))
fast_ema = series.ewm(span=self._fast_period, adjust=False).mean()
slow_ema = series.ewm(span=self._slow_period, adjust=False).mean()
macd_line = fast_ema - slow_ema
signal_line = macd_line.ewm(span=self._signal_period, adjust=False).mean()
histogram = macd_line - signal_line
current_hist = histogram.iloc[-1]
if self._prev_histogram is None:
self._prev_histogram = current_hist
return None
prev = self._prev_histogram
self._prev_histogram = current_hist
# Bullish crossover: histogram crosses from negative to positive
if prev < 0 and current_hist > 0:
return Signal(
strategy=self.name,
symbol=candle.symbol,
side=OrderSide.BUY,
price=candle.close,
quantity=self._quantity,
reason=f"MACD bullish crossover (histogram {prev:.4f} -> {current_hist:.4f})",
)
# Bearish crossover: histogram crosses from positive to negative
if prev > 0 and current_hist < 0:
return Signal(
strategy=self.name,
symbol=candle.symbol,
side=OrderSide.SELL,
price=candle.close,
quantity=self._quantity,
reason=f"MACD bearish crossover (histogram {prev:.4f} -> {current_hist:.4f})",
)
return None
```
Create `services/strategy-engine/strategies/config/macd_strategy.yaml`:
```yaml
fast_period: 12
slow_period: 26
signal_period: 9
quantity: "0.01"
```
- [ ] **Step 4: Run test to verify it passes**
Run: `pytest services/strategy-engine/tests/test_macd_strategy.py -v`
Expected: All 5 tests PASS
- [ ] **Step 5: Commit**
```bash
git add services/strategy-engine/strategies/macd_strategy.py \
services/strategy-engine/strategies/config/macd_strategy.yaml \
services/strategy-engine/tests/test_macd_strategy.py
git commit -m "feat(strategy): add MACD strategy"
```
---
## Task 10: Bollinger Bands Strategy
**Files:**
- Create: `services/strategy-engine/strategies/bollinger_strategy.py`
- Create: `services/strategy-engine/strategies/config/bollinger_strategy.yaml`
- Test: `services/strategy-engine/tests/test_bollinger_strategy.py`
- [ ] **Step 1: Write the failing test**
Create `services/strategy-engine/tests/test_bollinger_strategy.py`:
```python
"""Tests for Bollinger Bands strategy."""
from decimal import Decimal
from datetime import datetime, timezone
import pytest
from shared.models import Candle, OrderSide
from strategies.bollinger_strategy import BollingerStrategy
@pytest.fixture
def strategy():
s = BollingerStrategy()
s.configure({"period": 5, "num_std": 2.0, "min_bandwidth": 0.0, "quantity": "0.01"})
return s
def _candle(price: float) -> Candle:
return Candle(
symbol="BTCUSDT",
timeframe="1m",
open_time=datetime(2025, 1, 1, tzinfo=timezone.utc),
open=Decimal(str(price)),
high=Decimal(str(price + 10)),
low=Decimal(str(price - 10)),
close=Decimal(str(price)),
volume=Decimal("10"),
)
def test_bollinger_warmup_period(strategy):
assert strategy.warmup_period == 5
def test_bollinger_no_signal_insufficient_data(strategy):
for p in [100, 101, 102]:
result = strategy.on_candle(_candle(p))
assert result is None
def test_bollinger_buy_on_lower_band_recovery(strategy):
# Stable prices to build bands, then drop below and recover
prices = [100, 100, 100, 100, 100, 80, 80, 95]
signals = []
for p in prices:
sig = strategy.on_candle(_candle(p))
if sig is not None:
signals.append(sig)
buy_signals = [s for s in signals if s.side == OrderSide.BUY]
assert len(buy_signals) > 0
assert buy_signals[0].strategy == "bollinger"
def test_bollinger_sell_on_upper_band_recovery(strategy):
prices = [100, 100, 100, 100, 100, 120, 120, 105]
signals = []
for p in prices:
sig = strategy.on_candle(_candle(p))
if sig is not None:
signals.append(sig)
sell_signals = [s for s in signals if s.side == OrderSide.SELL]
assert len(sell_signals) > 0
def test_bollinger_reset_clears_state(strategy):
for p in [100, 101]:
strategy.on_candle(_candle(p))
strategy.reset()
assert len(strategy._closes) == 0
```
- [ ] **Step 2: Run test to verify it fails**
Run: `pytest services/strategy-engine/tests/test_bollinger_strategy.py -v`
Expected: FAIL with `ModuleNotFoundError`
- [ ] **Step 3: Implement Bollinger Bands strategy**
Create `services/strategy-engine/strategies/bollinger_strategy.py`:
```python
"""Bollinger Bands strategy."""
from collections import deque
from decimal import Decimal
import pandas as pd
from shared.models import Candle, Signal, OrderSide
from strategies.base import BaseStrategy
class BollingerStrategy(BaseStrategy):
name: str = "bollinger"
def __init__(self) -> None:
self._period: int = 20
self._num_std: float = 2.0
self._min_bandwidth: float = 0.02
self._quantity: Decimal = Decimal("0.01")
self._closes: deque[float] = deque(maxlen=500)
self._was_below_lower: bool = False
self._was_above_upper: bool = False
@property
def warmup_period(self) -> int:
return self._period
def configure(self, params: dict) -> None:
self._period = int(params.get("period", 20))
self._num_std = float(params.get("num_std", 2.0))
self._min_bandwidth = float(params.get("min_bandwidth", 0.02))
self._quantity = Decimal(str(params.get("quantity", "0.01")))
def reset(self) -> None:
self._closes.clear()
self._was_below_lower = False
self._was_above_upper = False
def on_candle(self, candle: Candle) -> Signal | None:
self._closes.append(float(candle.close))
if len(self._closes) < self._period:
return None
series = pd.Series(list(self._closes))
sma = series.rolling(self._period).mean().iloc[-1]
std = series.rolling(self._period).std().iloc[-1]
upper = sma + self._num_std * std
lower = sma - self._num_std * std
# Bandwidth filter
if sma > 0:
bandwidth = (upper - lower) / sma
if bandwidth < self._min_bandwidth:
return None
price = float(candle.close)
# Track band penetration
if price < lower:
self._was_below_lower = True
if price > upper:
self._was_above_upper = True
# BUY: price was below lower band and recovered back inside
if self._was_below_lower and price >= lower:
self._was_below_lower = False
return Signal(
strategy=self.name,
symbol=candle.symbol,
side=OrderSide.BUY,
price=candle.close,
quantity=self._quantity,
reason=f"Bollinger: price recovered above lower band ({lower:.2f})",
)
# SELL: price was above upper band and recovered back inside
if self._was_above_upper and price <= upper:
self._was_above_upper = False
return Signal(
strategy=self.name,
symbol=candle.symbol,
side=OrderSide.SELL,
price=candle.close,
quantity=self._quantity,
reason=f"Bollinger: price recovered below upper band ({upper:.2f})",
)
return None
```
Create `services/strategy-engine/strategies/config/bollinger_strategy.yaml`:
```yaml
period: 20
num_std: 2.0
min_bandwidth: 0.02
quantity: "0.01"
```
- [ ] **Step 4: Run test to verify it passes**
Run: `pytest services/strategy-engine/tests/test_bollinger_strategy.py -v`
Expected: All 5 tests PASS
- [ ] **Step 5: Commit**
```bash
git add services/strategy-engine/strategies/bollinger_strategy.py \
services/strategy-engine/strategies/config/bollinger_strategy.yaml \
services/strategy-engine/tests/test_bollinger_strategy.py
git commit -m "feat(strategy): add Bollinger Bands strategy"
```
---
## Task 11: EMA Crossover Strategy
**Files:**
- Create: `services/strategy-engine/strategies/ema_crossover_strategy.py`
- Create: `services/strategy-engine/strategies/config/ema_crossover_strategy.yaml`
- Test: `services/strategy-engine/tests/test_ema_crossover_strategy.py`
- [ ] **Step 1: Write the failing test**
Create `services/strategy-engine/tests/test_ema_crossover_strategy.py`:
```python
"""Tests for EMA Crossover strategy."""
from decimal import Decimal
from datetime import datetime, timezone
import pytest
from shared.models import Candle, OrderSide
from strategies.ema_crossover_strategy import EmaCrossoverStrategy
@pytest.fixture
def strategy():
s = EmaCrossoverStrategy()
s.configure({"short_period": 3, "long_period": 6, "quantity": "0.01"})
return s
def _candle(price: float) -> Candle:
return Candle(
symbol="BTCUSDT",
timeframe="1m",
open_time=datetime(2025, 1, 1, tzinfo=timezone.utc),
open=Decimal(str(price)),
high=Decimal(str(price + 10)),
low=Decimal(str(price - 10)),
close=Decimal(str(price)),
volume=Decimal("10"),
)
def test_ema_warmup_period(strategy):
assert strategy.warmup_period == 6
def test_ema_no_signal_insufficient_data(strategy):
for p in [100, 101, 102]:
result = strategy.on_candle(_candle(p))
assert result is None
def test_ema_buy_signal_golden_cross(strategy):
# Declining then sharp rise: short EMA crosses above long EMA
prices = [100, 98, 96, 94, 92, 90, 95, 100, 108, 117, 127]
signals = []
for p in prices:
sig = strategy.on_candle(_candle(p))
if sig is not None:
signals.append(sig)
buy_signals = [s for s in signals if s.side == OrderSide.BUY]
assert len(buy_signals) > 0
assert buy_signals[0].strategy == "ema_crossover"
def test_ema_sell_signal_death_cross(strategy):
# Rising then sharp decline: short EMA crosses below long EMA
prices = [100, 105, 110, 115, 120, 125, 118, 110, 100, 88, 75]
signals = []
for p in prices:
sig = strategy.on_candle(_candle(p))
if sig is not None:
signals.append(sig)
sell_signals = [s for s in signals if s.side == OrderSide.SELL]
assert len(sell_signals) > 0
def test_ema_reset_clears_state(strategy):
for p in [100, 101]:
strategy.on_candle(_candle(p))
strategy.reset()
assert len(strategy._closes) == 0
```
- [ ] **Step 2: Run test to verify it fails**
Run: `pytest services/strategy-engine/tests/test_ema_crossover_strategy.py -v`
Expected: FAIL with `ModuleNotFoundError`
- [ ] **Step 3: Implement EMA Crossover strategy**
Create `services/strategy-engine/strategies/ema_crossover_strategy.py`:
```python
"""EMA Crossover (Golden Cross / Death Cross) strategy."""
from collections import deque
from decimal import Decimal
import pandas as pd
from shared.models import Candle, Signal, OrderSide
from strategies.base import BaseStrategy
class EmaCrossoverStrategy(BaseStrategy):
name: str = "ema_crossover"
def __init__(self) -> None:
self._short_period: int = 9
self._long_period: int = 21
self._quantity: Decimal = Decimal("0.01")
self._closes: deque[float] = deque(maxlen=500)
self._prev_short_above: bool | None = None
@property
def warmup_period(self) -> int:
return self._long_period
def configure(self, params: dict) -> None:
self._short_period = int(params.get("short_period", 9))
self._long_period = int(params.get("long_period", 21))
self._quantity = Decimal(str(params.get("quantity", "0.01")))
def reset(self) -> None:
self._closes.clear()
self._prev_short_above = None
def on_candle(self, candle: Candle) -> Signal | None:
self._closes.append(float(candle.close))
if len(self._closes) < self._long_period:
return None
series = pd.Series(list(self._closes))
short_ema = series.ewm(span=self._short_period, adjust=False).mean().iloc[-1]
long_ema = series.ewm(span=self._long_period, adjust=False).mean().iloc[-1]
short_above = short_ema > long_ema
if self._prev_short_above is None:
self._prev_short_above = short_above
return None
prev = self._prev_short_above
self._prev_short_above = short_above
# Golden Cross: short EMA crosses above long EMA
if not prev and short_above:
return Signal(
strategy=self.name,
symbol=candle.symbol,
side=OrderSide.BUY,
price=candle.close,
quantity=self._quantity,
reason=f"EMA Golden Cross (short={short_ema:.2f} > long={long_ema:.2f})",
)
# Death Cross: short EMA crosses below long EMA
if prev and not short_above:
return Signal(
strategy=self.name,
symbol=candle.symbol,
side=OrderSide.SELL,
price=candle.close,
quantity=self._quantity,
reason=f"EMA Death Cross (short={short_ema:.2f} < long={long_ema:.2f})",
)
return None
```
Create `services/strategy-engine/strategies/config/ema_crossover_strategy.yaml`:
```yaml
short_period: 9
long_period: 21
quantity: "0.01"
```
- [ ] **Step 4: Run test to verify it passes**
Run: `pytest services/strategy-engine/tests/test_ema_crossover_strategy.py -v`
Expected: All 5 tests PASS
- [ ] **Step 5: Commit**
```bash
git add services/strategy-engine/strategies/ema_crossover_strategy.py \
services/strategy-engine/strategies/config/ema_crossover_strategy.yaml \
services/strategy-engine/tests/test_ema_crossover_strategy.py
git commit -m "feat(strategy): add EMA Crossover strategy"
```
---
## Task 12: VWAP Strategy
**Files:**
- Create: `services/strategy-engine/strategies/vwap_strategy.py`
- Create: `services/strategy-engine/strategies/config/vwap_strategy.yaml`
- Test: `services/strategy-engine/tests/test_vwap_strategy.py`
- [ ] **Step 1: Write the failing test**
Create `services/strategy-engine/tests/test_vwap_strategy.py`:
```python
"""Tests for VWAP strategy."""
from decimal import Decimal
from datetime import datetime, timezone
import pytest
from shared.models import Candle, OrderSide
from strategies.vwap_strategy import VwapStrategy
@pytest.fixture
def strategy():
s = VwapStrategy()
s.configure({"deviation_threshold": 0.01, "quantity": "0.01"})
return s
def _candle(price: float, volume: float = 10.0) -> Candle:
return Candle(
symbol="BTCUSDT",
timeframe="1m",
open_time=datetime(2025, 1, 1, tzinfo=timezone.utc),
open=Decimal(str(price)),
high=Decimal(str(price + 10)),
low=Decimal(str(price - 10)),
close=Decimal(str(price)),
volume=Decimal(str(volume)),
)
def test_vwap_warmup_period(strategy):
assert strategy.warmup_period == 30
def test_vwap_no_signal_insufficient_data(strategy):
for i in range(10):
result = strategy.on_candle(_candle(100))
assert result is None
def test_vwap_buy_signal_below_vwap_recovery(strategy):
# Build VWAP at ~100, then go below, then recover
signals = []
for _ in range(30):
strategy.on_candle(_candle(100, 100))
# Drop below VWAP
for _ in range(5):
strategy.on_candle(_candle(95, 10))
# Recover to VWAP
sig = strategy.on_candle(_candle(100, 10))
if sig is not None:
signals.append(sig)
buy_signals = [s for s in signals if s.side == OrderSide.BUY]
assert len(buy_signals) > 0
def test_vwap_sell_signal_above_vwap_recovery(strategy):
signals = []
for _ in range(30):
strategy.on_candle(_candle(100, 100))
for _ in range(5):
strategy.on_candle(_candle(105, 10))
sig = strategy.on_candle(_candle(100, 10))
if sig is not None:
signals.append(sig)
sell_signals = [s for s in signals if s.side == OrderSide.SELL]
assert len(sell_signals) > 0
def test_vwap_reset_clears_state(strategy):
strategy.on_candle(_candle(100))
strategy.reset()
assert strategy._cumulative_tp_vol == 0.0
assert strategy._cumulative_vol == 0.0
```
- [ ] **Step 2: Run test to verify it fails**
Run: `pytest services/strategy-engine/tests/test_vwap_strategy.py -v`
Expected: FAIL with `ModuleNotFoundError`
- [ ] **Step 3: Implement VWAP strategy**
Create `services/strategy-engine/strategies/vwap_strategy.py`:
```python
"""VWAP (Volume Weighted Average Price) strategy."""
from decimal import Decimal
from shared.models import Candle, Signal, OrderSide
from strategies.base import BaseStrategy
class VwapStrategy(BaseStrategy):
name: str = "vwap"
def __init__(self) -> None:
self._deviation_threshold: float = 0.002
self._quantity: Decimal = Decimal("0.01")
self._cumulative_tp_vol: float = 0.0
self._cumulative_vol: float = 0.0
self._candle_count: int = 0
self._was_below_vwap: bool = False
self._was_above_vwap: bool = False
@property
def warmup_period(self) -> int:
return 30
def configure(self, params: dict) -> None:
self._deviation_threshold = float(params.get("deviation_threshold", 0.002))
self._quantity = Decimal(str(params.get("quantity", "0.01")))
def reset(self) -> None:
self._cumulative_tp_vol = 0.0
self._cumulative_vol = 0.0
self._candle_count = 0
self._was_below_vwap = False
self._was_above_vwap = False
def on_candle(self, candle: Candle) -> Signal | None:
high = float(candle.high)
low = float(candle.low)
close = float(candle.close)
volume = float(candle.volume)
typical_price = (high + low + close) / 3.0
self._cumulative_tp_vol += typical_price * volume
self._cumulative_vol += volume
self._candle_count += 1
if self._candle_count < self.warmup_period:
return None
if self._cumulative_vol == 0:
return None
vwap = self._cumulative_tp_vol / self._cumulative_vol
deviation = (close - vwap) / vwap if vwap != 0 else 0
# Track VWAP deviations
if deviation < -self._deviation_threshold:
self._was_below_vwap = True
self._was_above_vwap = False
elif deviation > self._deviation_threshold:
self._was_above_vwap = True
self._was_below_vwap = False
# BUY: price was below VWAP and recovered to VWAP (mean reversion)
if self._was_below_vwap and abs(deviation) <= self._deviation_threshold:
self._was_below_vwap = False
return Signal(
strategy=self.name,
symbol=candle.symbol,
side=OrderSide.BUY,
price=candle.close,
quantity=self._quantity,
reason=f"VWAP mean reversion from below (VWAP={vwap:.2f}, deviation={deviation:.4f})",
)
# SELL: price was above VWAP and recovered to VWAP
if self._was_above_vwap and abs(deviation) <= self._deviation_threshold:
self._was_above_vwap = False
return Signal(
strategy=self.name,
symbol=candle.symbol,
side=OrderSide.SELL,
price=candle.close,
quantity=self._quantity,
reason=f"VWAP mean reversion from above (VWAP={vwap:.2f}, deviation={deviation:.4f})",
)
return None
```
Create `services/strategy-engine/strategies/config/vwap_strategy.yaml`:
```yaml
deviation_threshold: 0.002
quantity: "0.01"
```
- [ ] **Step 4: Run test to verify it passes**
Run: `pytest services/strategy-engine/tests/test_vwap_strategy.py -v`
Expected: All 5 tests PASS
- [ ] **Step 5: Commit**
```bash
git add services/strategy-engine/strategies/vwap_strategy.py \
services/strategy-engine/strategies/config/vwap_strategy.yaml \
services/strategy-engine/tests/test_vwap_strategy.py
git commit -m "feat(strategy): add VWAP strategy"
```
---
## Task 13: Volume Profile Strategy
**Files:**
- Create: `services/strategy-engine/strategies/volume_profile_strategy.py`
- Create: `services/strategy-engine/strategies/config/volume_profile_strategy.yaml`
- Test: `services/strategy-engine/tests/test_volume_profile_strategy.py`
- [ ] **Step 1: Write the failing test**
Create `services/strategy-engine/tests/test_volume_profile_strategy.py`:
```python
"""Tests for Volume Profile strategy."""
from decimal import Decimal
from datetime import datetime, timezone
import pytest
from shared.models import Candle, OrderSide
from strategies.volume_profile_strategy import VolumeProfileStrategy
@pytest.fixture
def strategy():
s = VolumeProfileStrategy()
s.configure({
"lookback_period": 10,
"num_bins": 5,
"value_area_pct": 0.7,
"quantity": "0.01",
})
return s
def _candle(price: float, volume: float = 10.0) -> Candle:
return Candle(
symbol="BTCUSDT",
timeframe="1m",
open_time=datetime(2025, 1, 1, tzinfo=timezone.utc),
open=Decimal(str(price)),
high=Decimal(str(price + 5)),
low=Decimal(str(price - 5)),
close=Decimal(str(price)),
volume=Decimal(str(volume)),
)
def test_volume_profile_warmup_period(strategy):
assert strategy.warmup_period == 10
def test_volume_profile_no_signal_insufficient_data(strategy):
for p in [100, 101, 102]:
result = strategy.on_candle(_candle(p))
assert result is None
def test_volume_profile_buy_at_value_area_low(strategy):
# Concentrate volume at 100, then price drops to bottom of value area
signals = []
for _ in range(10):
strategy.on_candle(_candle(100, 100))
# Price drops to lower edge
sig = strategy.on_candle(_candle(90, 10))
if sig is not None:
signals.append(sig)
# Multiple attempts — may need several candles for the signal
for p in [89, 88, 90]:
sig = strategy.on_candle(_candle(p, 10))
if sig is not None:
signals.append(sig)
buy_signals = [s for s in signals if s.side == OrderSide.BUY]
assert len(buy_signals) > 0
def test_volume_profile_sell_at_value_area_high(strategy):
signals = []
for _ in range(10):
strategy.on_candle(_candle(100, 100))
sig = strategy.on_candle(_candle(110, 10))
if sig is not None:
signals.append(sig)
for p in [111, 112, 110]:
sig = strategy.on_candle(_candle(p, 10))
if sig is not None:
signals.append(sig)
sell_signals = [s for s in signals if s.side == OrderSide.SELL]
assert len(sell_signals) > 0
def test_volume_profile_reset_clears_state(strategy):
strategy.on_candle(_candle(100))
strategy.reset()
assert len(strategy._candles) == 0
```
- [ ] **Step 2: Run test to verify it fails**
Run: `pytest services/strategy-engine/tests/test_volume_profile_strategy.py -v`
Expected: FAIL with `ModuleNotFoundError`
- [ ] **Step 3: Implement Volume Profile strategy**
Create `services/strategy-engine/strategies/volume_profile_strategy.py`:
```python
"""Volume Profile strategy based on Point of Control and Value Area."""
from collections import deque
from decimal import Decimal
import numpy as np
from shared.models import Candle, Signal, OrderSide
from strategies.base import BaseStrategy
class VolumeProfileStrategy(BaseStrategy):
name: str = "volume_profile"
def __init__(self) -> None:
self._lookback_period: int = 100
self._num_bins: int = 50
self._value_area_pct: float = 0.7
self._quantity: Decimal = Decimal("0.01")
self._candles: deque[tuple[float, float]] = deque(maxlen=500) # (close, volume)
self._was_below_va: bool = False
self._was_above_va: bool = False
@property
def warmup_period(self) -> int:
return self._lookback_period
def configure(self, params: dict) -> None:
self._lookback_period = int(params.get("lookback_period", 100))
self._num_bins = int(params.get("num_bins", 50))
self._value_area_pct = float(params.get("value_area_pct", 0.7))
self._quantity = Decimal(str(params.get("quantity", "0.01")))
def reset(self) -> None:
self._candles.clear()
self._was_below_va = False
self._was_above_va = False
def _compute_value_area(self) -> tuple[float, float, float] | None:
"""Compute POC, value area low, and value area high.
Returns (poc, va_low, va_high) or None if insufficient data.
"""
if len(self._candles) < self._lookback_period:
return None
recent = list(self._candles)[-self._lookback_period :]
prices = [c[0] for c in recent]
volumes = [c[1] for c in recent]
min_price = min(prices)
max_price = max(prices)
if min_price == max_price:
return None
bin_edges = np.linspace(min_price, max_price, self._num_bins + 1)
volume_profile = np.zeros(self._num_bins)
for price, vol in zip(prices, volumes):
bin_idx = int((price - min_price) / (max_price - min_price) * (self._num_bins - 1))
bin_idx = min(bin_idx, self._num_bins - 1)
volume_profile[bin_idx] += vol
poc_idx = int(np.argmax(volume_profile))
poc = (bin_edges[poc_idx] + bin_edges[poc_idx + 1]) / 2
# Expand from POC to capture value_area_pct of total volume
total_vol = volume_profile.sum()
if total_vol == 0:
return None
target_vol = total_vol * self._value_area_pct
accumulated = volume_profile[poc_idx]
low_idx = poc_idx
high_idx = poc_idx
while accumulated < target_vol:
expand_low = low_idx > 0
expand_high = high_idx < self._num_bins - 1
if not expand_low and not expand_high:
break
low_vol = volume_profile[low_idx - 1] if expand_low else 0
high_vol = volume_profile[high_idx + 1] if expand_high else 0
if low_vol >= high_vol and expand_low:
low_idx -= 1
accumulated += volume_profile[low_idx]
elif expand_high:
high_idx += 1
accumulated += volume_profile[high_idx]
else:
low_idx -= 1
accumulated += volume_profile[low_idx]
va_low = bin_edges[low_idx]
va_high = bin_edges[high_idx + 1]
return poc, va_low, va_high
def on_candle(self, candle: Candle) -> Signal | None:
self._candles.append((float(candle.close), float(candle.volume)))
result = self._compute_value_area()
if result is None:
return None
poc, va_low, va_high = result
price = float(candle.close)
# Track value area penetration
if price < va_low:
self._was_below_va = True
self._was_above_va = False
elif price > va_high:
self._was_above_va = True
self._was_below_va = False
# BUY: price was below VA and bounced back to VA low (support)
if self._was_below_va and price >= va_low and price <= poc:
self._was_below_va = False
return Signal(
strategy=self.name,
symbol=candle.symbol,
side=OrderSide.BUY,
price=candle.close,
quantity=self._quantity,
reason=f"Volume Profile: bounce at VA low ({va_low:.2f}), POC={poc:.2f}",
)
# SELL: price was above VA and pulled back to VA high (resistance)
if self._was_above_va and price <= va_high and price >= poc:
self._was_above_va = False
return Signal(
strategy=self.name,
symbol=candle.symbol,
side=OrderSide.SELL,
price=candle.close,
quantity=self._quantity,
reason=f"Volume Profile: rejection at VA high ({va_high:.2f}), POC={poc:.2f}",
)
return None
```
Create `services/strategy-engine/strategies/config/volume_profile_strategy.yaml`:
```yaml
lookback_period: 100
num_bins: 50
value_area_pct: 0.7
quantity: "0.01"
```
- [ ] **Step 4: Run test to verify it passes**
Run: `pytest services/strategy-engine/tests/test_volume_profile_strategy.py -v`
Expected: All 5 tests PASS
- [ ] **Step 5: Commit**
```bash
git add services/strategy-engine/strategies/volume_profile_strategy.py \
services/strategy-engine/strategies/config/volume_profile_strategy.yaml \
services/strategy-engine/tests/test_volume_profile_strategy.py
git commit -m "feat(strategy): add Volume Profile strategy"
```
---
## Task 14: Backtest Detailed Metrics
**Files:**
- Create: `services/backtester/src/backtester/metrics.py`
- Test: `services/backtester/tests/test_metrics.py`
- [ ] **Step 1: Write the failing test**
Create `services/backtester/tests/test_metrics.py`:
```python
"""Tests for detailed backtest metrics."""
from datetime import datetime, timedelta, timezone
from decimal import Decimal
import pytest
from backtester.metrics import TradeRecord, compute_detailed_metrics
def _trade(entry_price: float, exit_price: float, qty: float = 1.0, days: int = 1) -> tuple[TradeRecord, TradeRecord]:
entry_time = datetime(2025, 1, 1, tzinfo=timezone.utc)
exit_time = entry_time + timedelta(days=days)
entry = TradeRecord(
time=entry_time,
symbol="BTCUSDT",
side="BUY",
price=Decimal(str(entry_price)),
quantity=Decimal(str(qty)),
)
exit_rec = TradeRecord(
time=exit_time,
symbol="BTCUSDT",
side="SELL",
price=Decimal(str(exit_price)),
quantity=Decimal(str(qty)),
)
return entry, exit_rec
def test_compute_metrics_basic():
trades = []
e1, x1 = _trade(100, 110) # +10 profit
e2, x2 = _trade(100, 95) # -5 loss
trades = [e1, x1, e2, x2]
metrics = compute_detailed_metrics(
trades=trades,
initial_balance=Decimal("1000"),
final_balance=Decimal("1005"),
)
assert metrics.total_trades == 4
assert metrics.winning_trades == 1
assert metrics.losing_trades == 1
assert metrics.win_rate == pytest.approx(50.0, rel=0.01)
assert metrics.total_return == pytest.approx(0.5, rel=0.01)
def test_compute_metrics_profit_factor():
e1, x1 = _trade(100, 120) # +20
e2, x2 = _trade(100, 90) # -10
trades = [e1, x1, e2, x2]
metrics = compute_detailed_metrics(
trades=trades,
initial_balance=Decimal("1000"),
final_balance=Decimal("1010"),
)
assert metrics.profit_factor == pytest.approx(2.0, rel=0.01)
def test_compute_metrics_max_drawdown():
# Three trades: +10, -20, +5 => peak 1010, trough 990
e1, x1 = _trade(100, 110)
e2, x2 = _trade(100, 80)
e3, x3 = _trade(100, 105)
trades = [e1, x1, e2, x2, e3, x3]
metrics = compute_detailed_metrics(
trades=trades,
initial_balance=Decimal("1000"),
final_balance=Decimal("995"),
)
assert metrics.max_drawdown > 0
def test_compute_metrics_sharpe_ratio():
e1, x1 = _trade(100, 110, days=1)
e2, x2 = _trade(100, 105, days=1)
trades = [e1, x1, e2, x2]
metrics = compute_detailed_metrics(
trades=trades,
initial_balance=Decimal("1000"),
final_balance=Decimal("1015"),
)
# Sharpe should be a finite number
assert metrics.sharpe_ratio != 0 or metrics.sharpe_ratio == 0
def test_compute_metrics_empty_trades():
metrics = compute_detailed_metrics(
trades=[],
initial_balance=Decimal("1000"),
final_balance=Decimal("1000"),
)
assert metrics.total_trades == 0
assert metrics.win_rate == 0.0
assert metrics.sharpe_ratio == 0.0
```
- [ ] **Step 2: Run test to verify it fails**
Run: `pytest services/backtester/tests/test_metrics.py -v`
Expected: FAIL with `ModuleNotFoundError: No module named 'backtester.metrics'`
- [ ] **Step 3: Implement detailed metrics**
Create `services/backtester/src/backtester/metrics.py`:
```python
"""Detailed backtest metrics: Sharpe, Sortino, Calmar, drawdown, trade analysis."""
import math
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from decimal import Decimal
@dataclass
class TradeRecord:
time: datetime
symbol: str
side: str
price: Decimal
quantity: Decimal
@dataclass
class DetailedMetrics:
# Basic
total_return: float
total_trades: int
winning_trades: int
losing_trades: int
win_rate: float
profit_factor: float
# Risk
sharpe_ratio: float
sortino_ratio: float
calmar_ratio: float
max_drawdown: float
max_drawdown_duration: timedelta
# Returns
monthly_returns: dict[str, float]
avg_win: float
avg_loss: float
largest_win: float
largest_loss: float
avg_holding_period: timedelta
# Individual trades
trade_pairs: list[dict] = field(default_factory=list)
def compute_detailed_metrics(
trades: list[TradeRecord],
initial_balance: Decimal,
final_balance: Decimal,
) -> DetailedMetrics:
"""Compute detailed metrics from a list of trade records."""
initial = float(initial_balance)
final = float(final_balance)
total_return = ((final - initial) / initial * 100) if initial > 0 else 0.0
if not trades:
return DetailedMetrics(
total_return=total_return,
total_trades=0,
winning_trades=0,
losing_trades=0,
win_rate=0.0,
profit_factor=0.0,
sharpe_ratio=0.0,
sortino_ratio=0.0,
calmar_ratio=0.0,
max_drawdown=0.0,
max_drawdown_duration=timedelta(0),
monthly_returns={},
avg_win=0.0,
avg_loss=0.0,
largest_win=0.0,
largest_loss=0.0,
avg_holding_period=timedelta(0),
trade_pairs=[],
)
# Pair up BUY/SELL trades
buys: list[TradeRecord] = []
pairs: list[dict] = []
pnls: list[float] = []
holding_periods: list[timedelta] = []
for trade in trades:
if trade.side == "BUY":
buys.append(trade)
elif trade.side == "SELL" and buys:
buy = buys.pop(0)
pnl = float(trade.price - buy.price) * float(trade.quantity)
pnls.append(pnl)
holding = trade.time - buy.time
holding_periods.append(holding)
pairs.append({
"entry_time": buy.time.isoformat(),
"exit_time": trade.time.isoformat(),
"entry_price": float(buy.price),
"exit_price": float(trade.price),
"quantity": float(trade.quantity),
"pnl": pnl,
"pnl_pct": (pnl / (float(buy.price) * float(trade.quantity))) * 100 if float(buy.price) > 0 else 0,
"holding_period": str(holding),
})
wins = [p for p in pnls if p > 0]
losses = [p for p in pnls if p < 0]
winning_trades = len(wins)
losing_trades = len(losses)
win_rate = (winning_trades / len(pnls) * 100) if pnls else 0.0
gross_profit = sum(wins) if wins else 0.0
gross_loss = abs(sum(losses)) if losses else 0.0
profit_factor = (gross_profit / gross_loss) if gross_loss > 0 else 0.0
avg_win = (sum(wins) / len(wins)) if wins else 0.0
avg_loss = (sum(losses) / len(losses)) if losses else 0.0
largest_win = max(wins) if wins else 0.0
largest_loss = min(losses) if losses else 0.0
avg_holding = (
sum(holding_periods, timedelta(0)) / len(holding_periods)
if holding_periods
else timedelta(0)
)
# Equity curve for drawdown and ratios
equity = [initial]
for pnl in pnls:
equity.append(equity[-1] + pnl)
# Max drawdown
peak = equity[0]
max_dd = 0.0
dd_start = 0
max_dd_duration = timedelta(0)
current_dd_start = 0
for i, val in enumerate(equity):
if val > peak:
peak = val
current_dd_start = i
dd = (peak - val) / peak if peak > 0 else 0
if dd > max_dd:
max_dd = dd
# Daily returns approximation (per-trade returns)
returns = []
for i in range(1, len(equity)):
if equity[i - 1] > 0:
returns.append((equity[i] - equity[i - 1]) / equity[i - 1])
# Sharpe ratio (annualized for crypto: 365 days)
if returns and len(returns) > 1:
mean_ret = sum(returns) / len(returns)
std_ret = math.sqrt(sum((r - mean_ret) ** 2 for r in returns) / (len(returns) - 1))
sharpe = (mean_ret / std_ret * math.sqrt(365)) if std_ret > 0 else 0.0
# Sortino ratio (downside deviation only)
downside = [r for r in returns if r < 0]
if downside:
downside_std = math.sqrt(sum(r**2 for r in downside) / len(downside))
sortino = (mean_ret / downside_std * math.sqrt(365)) if downside_std > 0 else 0.0
else:
sortino = 0.0
else:
sharpe = 0.0
sortino = 0.0
# Calmar ratio
annualized_return = total_return / 100 # as fraction
calmar = (annualized_return / max_dd) if max_dd > 0 else 0.0
# Monthly returns
monthly: dict[str, float] = {}
for pair in pairs:
month_key = pair["exit_time"][:7] # YYYY-MM
monthly[month_key] = monthly.get(month_key, 0.0) + pair["pnl"]
return DetailedMetrics(
total_return=total_return,
total_trades=len(trades),
winning_trades=winning_trades,
losing_trades=losing_trades,
win_rate=win_rate,
profit_factor=profit_factor,
sharpe_ratio=sharpe,
sortino_ratio=sortino,
calmar_ratio=calmar,
max_drawdown=max_dd * 100, # as percentage
max_drawdown_duration=max_dd_duration,
monthly_returns=monthly,
avg_win=avg_win,
avg_loss=avg_loss,
largest_win=largest_win,
largest_loss=largest_loss,
avg_holding_period=avg_holding,
trade_pairs=pairs,
)
```
- [ ] **Step 4: Run test to verify it passes**
Run: `pytest services/backtester/tests/test_metrics.py -v`
Expected: All 5 tests PASS
- [ ] **Step 5: Commit**
```bash
git add services/backtester/src/backtester/metrics.py \
services/backtester/tests/test_metrics.py
git commit -m "feat(backtester): add detailed metrics (Sharpe, Sortino, drawdown)"
```
---
## Task 15: Integrate Metrics into BacktestEngine + Enhanced Reporter
**Files:**
- Modify: `services/backtester/src/backtester/simulator.py`
- Modify: `services/backtester/src/backtester/engine.py`
- Modify: `services/backtester/src/backtester/reporter.py`
- Modify: `services/backtester/tests/test_engine.py`
- Modify: `services/backtester/tests/test_reporter.py`
- [ ] **Step 1: Update simulator to produce TradeRecords**
Add timestamp to `SimulatedTrade` in `services/backtester/src/backtester/simulator.py`. Replace file:
```python
"""Simulated order executor for backtesting."""
from dataclasses import dataclass, field
from datetime import datetime, timezone
from decimal import Decimal
from shared.models import OrderSide, Signal
@dataclass
class SimulatedTrade:
symbol: str
side: OrderSide
price: Decimal
quantity: Decimal
balance_after: Decimal
timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
class OrderSimulator:
"""Simulates order execution against a paper balance."""
def __init__(self, initial_balance: Decimal) -> None:
self.balance: Decimal = initial_balance
self.positions: dict[str, Decimal] = {}
self.trades: list[SimulatedTrade] = []
def execute(self, signal: Signal, timestamp: datetime | None = None) -> bool:
"""Execute a signal. Returns True if the trade was accepted."""
ts = timestamp or datetime.now(timezone.utc)
if signal.side == OrderSide.BUY:
cost = signal.price * signal.quantity
if cost > self.balance:
return False
self.balance -= cost
self.positions[signal.symbol] = (
self.positions.get(signal.symbol, Decimal("0")) + signal.quantity
)
trade_quantity = signal.quantity
else: # SELL
current_position = self.positions.get(signal.symbol, Decimal("0"))
if current_position <= Decimal("0"):
return False
trade_quantity = min(signal.quantity, current_position)
proceeds = signal.price * trade_quantity
self.balance += proceeds
self.positions[signal.symbol] = current_position - trade_quantity
self.trades.append(
SimulatedTrade(
symbol=signal.symbol,
side=signal.side,
price=signal.price,
quantity=trade_quantity,
balance_after=self.balance,
timestamp=ts,
)
)
return True
```
- [ ] **Step 2: Update engine to compute DetailedMetrics**
Replace `services/backtester/src/backtester/engine.py`:
```python
"""Backtesting engine that runs strategies against historical candle data."""
from dataclasses import dataclass, field
from decimal import Decimal
from typing import Protocol
from shared.models import Candle, Signal
from backtester.metrics import DetailedMetrics, TradeRecord, compute_detailed_metrics
from backtester.simulator import OrderSimulator, SimulatedTrade
class StrategyProtocol(Protocol):
name: str
def on_candle(self, candle: Candle) -> Signal | None: ...
def configure(self, params: dict) -> None: ...
def reset(self) -> None: ...
@dataclass
class BacktestResult:
strategy_name: str
symbol: str
total_trades: int
initial_balance: Decimal
final_balance: Decimal
profit: Decimal
profit_pct: Decimal
trades: list[SimulatedTrade] = field(default_factory=list)
detailed: DetailedMetrics | None = None
@property
def win_rate(self) -> float:
buy_prices: list[Decimal] = []
wins = 0
total_pairs = 0
for trade in self.trades:
if trade.side.value == "BUY":
buy_prices.append(trade.price)
else:
if buy_prices:
buy_price = buy_prices.pop(0)
total_pairs += 1
if trade.price > buy_price:
wins += 1
if total_pairs == 0:
return 0.0
return wins / total_pairs * 100
class BacktestEngine:
"""Runs a strategy against historical candles using a simulated order executor."""
def __init__(self, strategy: StrategyProtocol, initial_balance: Decimal) -> None:
self._strategy = strategy
self._initial_balance = initial_balance
def run(self, candles: list[Candle]) -> BacktestResult:
"""Run the backtest over a list of candles and return a result."""
simulator = OrderSimulator(self._initial_balance)
for candle in candles:
signal = self._strategy.on_candle(candle)
if signal is not None:
simulator.execute(signal, timestamp=candle.open_time)
final_balance = simulator.balance
if candles:
last_price = candles[-1].close
for symbol, qty in simulator.positions.items():
if qty > Decimal("0"):
final_balance += qty * last_price
profit = final_balance - self._initial_balance
if self._initial_balance != Decimal("0"):
profit_pct = (profit / self._initial_balance) * Decimal("100")
else:
profit_pct = Decimal("0")
# Build TradeRecords for detailed metrics
trade_records = [
TradeRecord(
time=t.timestamp,
symbol=t.symbol,
side=t.side.value,
price=t.price,
quantity=t.quantity,
)
for t in simulator.trades
]
detailed = compute_detailed_metrics(
trades=trade_records,
initial_balance=self._initial_balance,
final_balance=final_balance,
)
return BacktestResult(
strategy_name=self._strategy.name,
symbol=candles[0].symbol if candles else "",
total_trades=len(simulator.trades),
initial_balance=self._initial_balance,
final_balance=final_balance,
profit=profit,
profit_pct=profit_pct,
trades=simulator.trades,
detailed=detailed,
)
```
- [ ] **Step 3: Update reporter with rich tables and export**
Replace `services/backtester/src/backtester/reporter.py`:
```python
"""Report formatting for backtest results using rich tables."""
import csv
import io
import json
from rich.console import Console
from rich.table import Table
from backtester.engine import BacktestResult
def format_report(result: BacktestResult) -> str:
"""Format a backtest result into a rich text report."""
console = Console(file=io.StringIO(), force_terminal=True)
# Summary table
summary = Table(title="BACKTEST REPORT", show_lines=True)
summary.add_column("Metric", style="bold")
summary.add_column("Value", justify="right")
summary.add_row("Strategy", result.strategy_name)
summary.add_row("Symbol", result.symbol)
summary.add_row("Initial Balance", f"{result.initial_balance:.2f}")
summary.add_row("Final Balance", f"{result.final_balance:.2f}")
summary.add_row("Profit/Loss", f"{result.profit:.2f}")
summary.add_row("Profit %", f"{result.profit_pct:.2f}%")
summary.add_row("Total Trades", str(result.total_trades))
summary.add_row("Win Rate", f"{result.win_rate:.2f}%")
if result.detailed:
d = result.detailed
summary.add_row("Sharpe Ratio", f"{d.sharpe_ratio:.3f}")
summary.add_row("Sortino Ratio", f"{d.sortino_ratio:.3f}")
summary.add_row("Calmar Ratio", f"{d.calmar_ratio:.3f}")
summary.add_row("Max Drawdown", f"{d.max_drawdown:.2f}%")
summary.add_row("Profit Factor", f"{d.profit_factor:.2f}")
summary.add_row("Avg Win", f"{d.avg_win:.2f}")
summary.add_row("Avg Loss", f"{d.avg_loss:.2f}")
summary.add_row("Largest Win", f"{d.largest_win:.2f}")
summary.add_row("Largest Loss", f"{d.largest_loss:.2f}")
summary.add_row("Avg Holding Period", str(d.avg_holding_period))
console.print(summary)
# Monthly returns table
if result.detailed and result.detailed.monthly_returns:
monthly = Table(title="MONTHLY RETURNS")
monthly.add_column("Month")
monthly.add_column("PnL", justify="right")
for month, pnl in sorted(result.detailed.monthly_returns.items()):
style = "green" if pnl >= 0 else "red"
monthly.add_row(month, f"{pnl:.2f}", style=style)
console.print(monthly)
output = console.file.getvalue()
return output
def export_csv(result: BacktestResult) -> str:
"""Export trade pairs as CSV."""
if not result.detailed or not result.detailed.trade_pairs:
return ""
output = io.StringIO()
writer = csv.DictWriter(
output,
fieldnames=["entry_time", "exit_time", "entry_price", "exit_price", "quantity", "pnl", "pnl_pct", "holding_period"],
)
writer.writeheader()
for pair in result.detailed.trade_pairs:
writer.writerow(pair)
return output.getvalue()
def export_json(result: BacktestResult) -> str:
"""Export detailed metrics as JSON."""
if not result.detailed:
return "{}"
d = result.detailed
data = {
"total_return": d.total_return,
"total_trades": d.total_trades,
"winning_trades": d.winning_trades,
"losing_trades": d.losing_trades,
"win_rate": d.win_rate,
"profit_factor": d.profit_factor,
"sharpe_ratio": d.sharpe_ratio,
"sortino_ratio": d.sortino_ratio,
"calmar_ratio": d.calmar_ratio,
"max_drawdown": d.max_drawdown,
"monthly_returns": d.monthly_returns,
"avg_win": d.avg_win,
"avg_loss": d.avg_loss,
"largest_win": d.largest_win,
"largest_loss": d.largest_loss,
"trade_pairs": d.trade_pairs,
}
return json.dumps(data, indent=2, default=str)
```
- [ ] **Step 4: Run all backtester tests**
Run: `pytest services/backtester/tests/ -v`
Expected: All tests PASS (existing tests may need minor updates for `timestamp` parameter)
- [ ] **Step 5: Fix any broken tests**
If `test_simulator.py` fails due to `timestamp` parameter, the existing tests should still work since `timestamp` defaults to `datetime.now()`. If `test_reporter.py` fails, update it to check for rich output:
Update `services/backtester/tests/test_reporter.py`:
```python
"""Tests for backtest report formatter."""
from decimal import Decimal
from backtester.engine import BacktestResult
from backtester.reporter import format_report, export_csv, export_json
def test_format_report_contains_key_metrics():
result = BacktestResult(
strategy_name="rsi",
symbol="BTCUSDT",
total_trades=10,
initial_balance=Decimal("10000"),
final_balance=Decimal("10500"),
profit=Decimal("500"),
profit_pct=Decimal("5"),
)
report = format_report(result)
assert "rsi" in report
assert "BTCUSDT" in report
assert "10000" in report or "10,000" in report
def test_export_csv_empty_when_no_detailed():
result = BacktestResult(
strategy_name="rsi",
symbol="BTCUSDT",
total_trades=0,
initial_balance=Decimal("10000"),
final_balance=Decimal("10000"),
profit=Decimal("0"),
profit_pct=Decimal("0"),
)
assert export_csv(result) == ""
def test_export_json_empty_when_no_detailed():
result = BacktestResult(
strategy_name="rsi",
symbol="BTCUSDT",
total_trades=0,
initial_balance=Decimal("10000"),
final_balance=Decimal("10000"),
profit=Decimal("0"),
profit_pct=Decimal("0"),
)
assert export_json(result) == "{}"
```
- [ ] **Step 6: Run all backtester tests again**
Run: `pytest services/backtester/tests/ -v`
Expected: All tests PASS
- [ ] **Step 7: Commit**
```bash
git add services/backtester/src/backtester/simulator.py \
services/backtester/src/backtester/engine.py \
services/backtester/src/backtester/reporter.py \
services/backtester/tests/test_engine.py \
services/backtester/tests/test_reporter.py
git commit -m "feat(backtester): integrate detailed metrics and rich reporter"
```
---
## Task 16: Final Integration Test
**Files:**
- All
- [ ] **Step 1: Run the full test suite**
Run: `pytest -v`
Expected: All tests PASS
- [ ] **Step 2: Run linting**
Run: `make lint`
Expected: No errors
- [ ] **Step 3: Fix any lint issues**
Run: `make format` if needed, then `make lint` again.
- [ ] **Step 4: Verify plugin loader finds all 7 strategies**
Run: `python -c "from pathlib import Path; from strategy_engine.plugin_loader import load_strategies; s = load_strategies(Path('services/strategy-engine/strategies')); print([x.name for x in s])"`
Expected: `['bollinger', 'ema_crossover', 'grid', 'macd', 'rsi', 'volume_profile', 'vwap']`
- [ ] **Step 5: Final commit if any fixes were made**
```bash
git add -A
git commit -m "fix: resolve lint issues and final integration fixes"
```