summaryrefslogtreecommitdiff
path: root/docs/superpowers/plans
diff options
context:
space:
mode:
authorTheSiahxyz <164138827+TheSiahxyz@users.noreply.github.com>2026-04-02 10:26:52 +0900
committerTheSiahxyz <164138827+TheSiahxyz@users.noreply.github.com>2026-04-02 10:26:52 +0900
commit53cadcf7e34f05f77082e84f0696b56bcbcbae36 (patch)
treee02650e10c4d5727bc1e32e27788c17327fa46f7 /docs/superpowers/plans
parentf5521da2876a2c19afc24f370b3258f2be95f81a (diff)
refactor: remove all crypto/Binance code, update to US stock symbols
Diffstat (limited to 'docs/superpowers/plans')
-rw-r--r--docs/superpowers/plans/2026-04-01-crypto-trading-platform.md4063
-rw-r--r--docs/superpowers/plans/2026-04-01-operations-and-strategy-expansion.md4187
2 files changed, 0 insertions, 8250 deletions
diff --git a/docs/superpowers/plans/2026-04-01-crypto-trading-platform.md b/docs/superpowers/plans/2026-04-01-crypto-trading-platform.md
deleted file mode 100644
index 08ff0f5..0000000
--- a/docs/superpowers/plans/2026-04-01-crypto-trading-platform.md
+++ /dev/null
@@ -1,4063 +0,0 @@
-# Crypto Trading Platform Implementation Plan
-
-> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.
-
-**Goal:** Binance 현물 암호화폐 자동매매 플랫폼을 마이크로서비스 아키텍처로 구축한다.
-
-**Architecture:** 6개 독립 서비스(data-collector, strategy-engine, order-executor, portfolio-manager, backtester)가 Redis Streams로 통신하고, PostgreSQL에 데이터를 저장한다. shared 라이브러리가 공통 모델/이벤트/DB 연결을 제공하며, Click 기반 CLI로 전체를 제어한다.
-
-**Tech Stack:** Python 3.12, ccxt, Redis Streams, PostgreSQL, asyncpg, pandas, pandas-ta, Click, pydantic-settings, Docker Compose, pytest
-
----
-
-## File Structure
-
-```
-trading/
-├── services/
-│ ├── data-collector/
-│ │ ├── src/data_collector/__init__.py
-│ │ ├── src/data_collector/main.py
-│ │ ├── src/data_collector/binance_ws.py
-│ │ ├── src/data_collector/binance_rest.py
-│ │ ├── src/data_collector/storage.py
-│ │ ├── src/data_collector/config.py
-│ │ ├── tests/test_binance_rest.py
-│ │ ├── tests/test_storage.py
-│ │ ├── tests/test_main.py
-│ │ ├── Dockerfile
-│ │ └── pyproject.toml
-│ ├── strategy-engine/
-│ │ ├── src/strategy_engine/__init__.py
-│ │ ├── src/strategy_engine/main.py
-│ │ ├── src/strategy_engine/engine.py
-│ │ ├── src/strategy_engine/plugin_loader.py
-│ │ ├── src/strategy_engine/config.py
-│ │ ├── strategies/base.py
-│ │ ├── strategies/rsi_strategy.py
-│ │ ├── strategies/grid_strategy.py
-│ │ ├── tests/test_engine.py
-│ │ ├── tests/test_plugin_loader.py
-│ │ ├── tests/test_rsi_strategy.py
-│ │ ├── tests/test_grid_strategy.py
-│ │ ├── Dockerfile
-│ │ └── pyproject.toml
-│ ├── order-executor/
-│ │ ├── src/order_executor/__init__.py
-│ │ ├── src/order_executor/main.py
-│ │ ├── src/order_executor/executor.py
-│ │ ├── src/order_executor/risk_manager.py
-│ │ ├── src/order_executor/config.py
-│ │ ├── tests/test_executor.py
-│ │ ├── tests/test_risk_manager.py
-│ │ ├── Dockerfile
-│ │ └── pyproject.toml
-│ ├── portfolio-manager/
-│ │ ├── src/portfolio_manager/__init__.py
-│ │ ├── src/portfolio_manager/main.py
-│ │ ├── src/portfolio_manager/portfolio.py
-│ │ ├── src/portfolio_manager/pnl.py
-│ │ ├── src/portfolio_manager/config.py
-│ │ ├── tests/test_portfolio.py
-│ │ ├── tests/test_pnl.py
-│ │ ├── Dockerfile
-│ │ └── pyproject.toml
-│ └── backtester/
-│ ├── src/backtester/__init__.py
-│ ├── src/backtester/main.py
-│ ├── src/backtester/engine.py
-│ ├── src/backtester/simulator.py
-│ ├── src/backtester/reporter.py
-│ ├── src/backtester/config.py
-│ ├── tests/test_engine.py
-│ ├── tests/test_simulator.py
-│ ├── tests/test_reporter.py
-│ ├── Dockerfile
-│ └── pyproject.toml
-├── shared/
-│ ├── src/shared/__init__.py
-│ ├── src/shared/models.py
-│ ├── src/shared/events.py
-│ ├── src/shared/broker.py
-│ ├── src/shared/db.py
-│ ├── src/shared/config.py
-│ ├── tests/test_models.py
-│ ├── tests/test_events.py
-│ ├── tests/test_broker.py
-│ ├── tests/test_db.py
-│ └── pyproject.toml
-├── cli/
-│ ├── src/trading_cli/__init__.py
-│ ├── src/trading_cli/main.py
-│ ├── src/trading_cli/commands/data.py
-│ ├── src/trading_cli/commands/trade.py
-│ ├── src/trading_cli/commands/backtest.py
-│ ├── src/trading_cli/commands/portfolio.py
-│ ├── src/trading_cli/commands/strategy.py
-│ ├── src/trading_cli/commands/service.py
-│ ├── tests/test_cli_data.py
-│ ├── tests/test_cli_trade.py
-│ └── pyproject.toml
-├── docker-compose.yml
-├── .env.example
-├── Makefile
-└── pyproject.toml (workspace root)
-```
-
----
-
-## Task 1: Project Scaffolding
-
-**Files:**
-- Create: `pyproject.toml` (workspace root)
-- Create: `.env.example`
-- Create: `docker-compose.yml`
-- Create: `Makefile`
-- Create: `.gitignore`
-- Create: `shared/pyproject.toml`
-
-- [ ] **Step 1: Initialize git repo**
-
-```bash
-cd /home/si/Private/repos/trading
-git init
-```
-
-- [ ] **Step 2: Create .gitignore**
-
-Create `.gitignore`:
-
-```gitignore
-__pycache__/
-*.py[cod]
-*$py.class
-*.egg-info/
-dist/
-build/
-.eggs/
-*.egg
-.venv/
-venv/
-env/
-.env
-.mypy_cache/
-.pytest_cache/
-.ruff_cache/
-*.log
-.DS_Store
-```
-
-- [ ] **Step 3: Create workspace root pyproject.toml**
-
-Create `pyproject.toml`:
-
-```toml
-[project]
-name = "trading-platform"
-version = "0.1.0"
-description = "Binance spot crypto trading platform"
-requires-python = ">=3.12"
-
-[tool.pytest.ini_options]
-asyncio_mode = "auto"
-testpaths = ["shared/tests", "services/*/tests", "cli/tests"]
-
-[tool.ruff]
-target-version = "py312"
-line-length = 100
-```
-
-- [ ] **Step 4: Create .env.example**
-
-Create `.env.example`:
-
-```env
-BINANCE_API_KEY=
-BINANCE_API_SECRET=
-REDIS_URL=redis://localhost:6379
-DATABASE_URL=postgresql://trading:trading@localhost:5432/trading
-LOG_LEVEL=INFO
-RISK_MAX_POSITION_SIZE=0.1
-RISK_STOP_LOSS_PCT=5
-RISK_DAILY_LOSS_LIMIT_PCT=10
-DRY_RUN=true
-```
-
-- [ ] **Step 5: Create docker-compose.yml**
-
-Create `docker-compose.yml`:
-
-```yaml
-services:
- redis:
- image: redis:7-alpine
- ports:
- - "6379:6379"
- volumes:
- - redis_data:/data
- healthcheck:
- test: ["CMD", "redis-cli", "ping"]
- interval: 5s
- timeout: 3s
- retries: 5
-
- postgres:
- image: postgres:16-alpine
- ports:
- - "5432:5432"
- environment:
- POSTGRES_USER: trading
- POSTGRES_PASSWORD: trading
- POSTGRES_DB: trading
- volumes:
- - postgres_data:/var/lib/postgresql/data
- healthcheck:
- test: ["CMD-LINE", "pg_isready", "-U", "trading"]
- interval: 5s
- timeout: 3s
- retries: 5
-
- data-collector:
- build:
- context: .
- dockerfile: services/data-collector/Dockerfile
- env_file: .env
- depends_on:
- redis:
- condition: service_healthy
- postgres:
- condition: service_healthy
- restart: unless-stopped
-
- strategy-engine:
- build:
- context: .
- dockerfile: services/strategy-engine/Dockerfile
- env_file: .env
- depends_on:
- redis:
- condition: service_healthy
- postgres:
- condition: service_healthy
- restart: unless-stopped
-
- order-executor:
- build:
- context: .
- dockerfile: services/order-executor/Dockerfile
- env_file: .env
- depends_on:
- redis:
- condition: service_healthy
- postgres:
- condition: service_healthy
- restart: unless-stopped
-
- portfolio-manager:
- build:
- context: .
- dockerfile: services/portfolio-manager/Dockerfile
- env_file: .env
- depends_on:
- redis:
- condition: service_healthy
- postgres:
- condition: service_healthy
- restart: unless-stopped
-
-volumes:
- redis_data:
- postgres_data:
-```
-
-- [ ] **Step 6: Create Makefile**
-
-Create `Makefile`:
-
-```makefile
-.PHONY: infra up down logs test lint
-
-infra:
- docker compose up -d redis postgres
-
-up:
- docker compose up -d
-
-down:
- docker compose down
-
-logs:
- docker compose logs -f $(service)
-
-test:
- pytest -v
-
-lint:
- ruff check .
- ruff format --check .
-
-format:
- ruff check --fix .
- ruff format .
-```
-
-- [ ] **Step 7: Create shared/pyproject.toml**
-
-Create `shared/pyproject.toml`:
-
-```toml
-[project]
-name = "trading-shared"
-version = "0.1.0"
-description = "Shared models, events, and utilities for trading platform"
-requires-python = ">=3.12"
-dependencies = [
- "pydantic>=2.0",
- "pydantic-settings>=2.0",
- "redis>=5.0",
- "asyncpg>=0.29",
-]
-
-[project.optional-dependencies]
-dev = [
- "pytest>=8.0",
- "pytest-asyncio>=0.23",
- "ruff>=0.4",
-]
-
-[build-system]
-requires = ["hatchling"]
-build-backend = "hatchling.build"
-
-[tool.hatch.build.targets.wheel]
-packages = ["src/shared"]
-```
-
-- [ ] **Step 8: Commit scaffolding**
-
-```bash
-git add .
-git commit -m "chore: project scaffolding with docker-compose, makefile, shared package"
-```
-
----
-
-## Task 2: Shared — Config & Models
-
-**Files:**
-- Create: `shared/src/shared/__init__.py`
-- Create: `shared/src/shared/config.py`
-- Create: `shared/src/shared/models.py`
-- Create: `shared/tests/test_models.py`
-
-- [ ] **Step 1: Write failing test for config**
-
-Create `shared/tests/test_models.py`:
-
-```python
-from shared.config import Settings
-
-
-def test_settings_defaults():
- settings = Settings(
- binance_api_key="test_key",
- binance_api_secret="test_secret",
- )
- assert settings.redis_url == "redis://localhost:6379"
- assert settings.database_url == "postgresql://trading:trading@localhost:5432/trading"
- assert settings.log_level == "INFO"
- assert settings.dry_run is True
-```
-
-- [ ] **Step 2: Run test to verify it fails**
-
-```bash
-cd /home/si/Private/repos/trading
-pip install -e shared[dev]
-pytest shared/tests/test_models.py::test_settings_defaults -v
-```
-
-Expected: FAIL — `ModuleNotFoundError: No module named 'shared'`
-
-- [ ] **Step 3: Implement config**
-
-Create `shared/src/shared/__init__.py`:
-
-```python
-```
-
-Create `shared/src/shared/config.py`:
-
-```python
-from pydantic_settings import BaseSettings
-
-
-class Settings(BaseSettings):
- binance_api_key: str
- binance_api_secret: str
- redis_url: str = "redis://localhost:6379"
- database_url: str = "postgresql://trading:trading@localhost:5432/trading"
- log_level: str = "INFO"
- risk_max_position_size: float = 0.1
- risk_stop_loss_pct: float = 5.0
- risk_daily_loss_limit_pct: float = 10.0
- dry_run: bool = True
-
- model_config = {"env_file": ".env", "env_file_encoding": "utf-8"}
-```
-
-- [ ] **Step 4: Run test to verify it passes**
-
-```bash
-pytest shared/tests/test_models.py::test_settings_defaults -v
-```
-
-Expected: PASS
-
-- [ ] **Step 5: Write failing tests for models**
-
-Append to `shared/tests/test_models.py`:
-
-```python
-from datetime import datetime, timezone
-from decimal import Decimal
-
-from shared.models import Candle, Signal, Order, Position, OrderSide, OrderType, OrderStatus
-
-
-def test_candle_creation():
- candle = Candle(
- symbol="BTCUSDT",
- timeframe="1m",
- open_time=datetime(2026, 1, 1, tzinfo=timezone.utc),
- open=Decimal("50000"),
- high=Decimal("50100"),
- low=Decimal("49900"),
- close=Decimal("50050"),
- volume=Decimal("1.5"),
- )
- assert candle.symbol == "BTCUSDT"
- assert candle.close == Decimal("50050")
-
-
-def test_signal_creation():
- signal = Signal(
- strategy="rsi_strategy",
- symbol="BTCUSDT",
- side=OrderSide.BUY,
- price=Decimal("50000"),
- quantity=Decimal("0.01"),
- reason="RSI below 30",
- )
- assert signal.side == OrderSide.BUY
- assert signal.reason == "RSI below 30"
-
-
-def test_order_creation():
- order = Order(
- symbol="BTCUSDT",
- signal_id="sig_123",
- side=OrderSide.BUY,
- type=OrderType.MARKET,
- price=Decimal("50000"),
- quantity=Decimal("0.01"),
- )
- assert order.status == OrderStatus.PENDING
- assert order.filled_at is None
- assert order.id is not None
-
-
-def test_position_unrealized_pnl():
- pos = Position(
- symbol="BTCUSDT",
- quantity=Decimal("0.1"),
- avg_entry_price=Decimal("50000"),
- current_price=Decimal("51000"),
- )
- assert pos.unrealized_pnl == Decimal("100") # 0.1 * (51000 - 50000)
-```
-
-- [ ] **Step 6: Run tests to verify they fail**
-
-```bash
-pytest shared/tests/test_models.py -v
-```
-
-Expected: FAIL — `ModuleNotFoundError: No module named 'shared.models'`
-
-- [ ] **Step 7: Implement models**
-
-Create `shared/src/shared/models.py`:
-
-```python
-from datetime import datetime, timezone
-from decimal import Decimal
-from enum import StrEnum
-from uuid import uuid4
-
-from pydantic import BaseModel, Field
-
-
-class OrderSide(StrEnum):
- BUY = "BUY"
- SELL = "SELL"
-
-
-class OrderType(StrEnum):
- MARKET = "MARKET"
- LIMIT = "LIMIT"
-
-
-class OrderStatus(StrEnum):
- PENDING = "PENDING"
- FILLED = "FILLED"
- CANCELLED = "CANCELLED"
- FAILED = "FAILED"
-
-
-class Candle(BaseModel):
- symbol: str
- timeframe: str
- open_time: datetime
- open: Decimal
- high: Decimal
- low: Decimal
- close: Decimal
- volume: Decimal
-
-
-class Signal(BaseModel):
- id: str = Field(default_factory=lambda: str(uuid4()))
- strategy: str
- symbol: str
- side: OrderSide
- price: Decimal
- quantity: Decimal
- reason: str
- created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
-
-
-class Order(BaseModel):
- id: str = Field(default_factory=lambda: str(uuid4()))
- signal_id: str
- symbol: str
- side: OrderSide
- type: OrderType
- price: Decimal
- quantity: Decimal
- status: OrderStatus = OrderStatus.PENDING
- created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
- filled_at: datetime | None = None
-
-
-class Position(BaseModel):
- symbol: str
- quantity: Decimal
- avg_entry_price: Decimal
- current_price: Decimal
-
- @property
- def unrealized_pnl(self) -> Decimal:
- return self.quantity * (self.current_price - self.avg_entry_price)
-```
-
-- [ ] **Step 8: Run tests to verify they pass**
-
-```bash
-pytest shared/tests/test_models.py -v
-```
-
-Expected: All PASS
-
-- [ ] **Step 9: Commit**
-
-```bash
-git add shared/
-git commit -m "feat(shared): add config settings and core data models"
-```
-
----
-
-## Task 3: Shared — Events & Redis Broker
-
-**Files:**
-- Create: `shared/src/shared/events.py`
-- Create: `shared/src/shared/broker.py`
-- Create: `shared/tests/test_events.py`
-- Create: `shared/tests/test_broker.py`
-
-- [ ] **Step 1: Write failing tests for events**
-
-Create `shared/tests/test_events.py`:
-
-```python
-import json
-from decimal import Decimal
-from datetime import datetime, timezone
-
-from shared.events import EventType, Event, CandleEvent, SignalEvent, OrderEvent
-from shared.models import Candle, Signal, Order, OrderSide, OrderType
-
-
-def test_candle_event_serialize():
- candle = Candle(
- symbol="BTCUSDT",
- timeframe="1m",
- open_time=datetime(2026, 1, 1, tzinfo=timezone.utc),
- open=Decimal("50000"),
- high=Decimal("50100"),
- low=Decimal("49900"),
- close=Decimal("50050"),
- volume=Decimal("1.5"),
- )
- event = CandleEvent(data=candle)
- payload = event.to_dict()
- assert payload["type"] == EventType.CANDLE
- assert payload["data"]["symbol"] == "BTCUSDT"
-
-
-def test_candle_event_deserialize():
- candle = Candle(
- symbol="BTCUSDT",
- timeframe="1m",
- open_time=datetime(2026, 1, 1, tzinfo=timezone.utc),
- open=Decimal("50000"),
- high=Decimal("50100"),
- low=Decimal("49900"),
- close=Decimal("50050"),
- volume=Decimal("1.5"),
- )
- event = CandleEvent(data=candle)
- payload = event.to_dict()
- restored = Event.from_dict(payload)
- assert isinstance(restored, CandleEvent)
- assert restored.data.symbol == "BTCUSDT"
-
-
-def test_signal_event_serialize():
- signal = Signal(
- strategy="rsi",
- symbol="BTCUSDT",
- side=OrderSide.BUY,
- price=Decimal("50000"),
- quantity=Decimal("0.01"),
- reason="RSI < 30",
- )
- event = SignalEvent(data=signal)
- payload = event.to_dict()
- assert payload["type"] == EventType.SIGNAL
-```
-
-- [ ] **Step 2: Run tests to verify they fail**
-
-```bash
-pytest shared/tests/test_events.py -v
-```
-
-Expected: FAIL
-
-- [ ] **Step 3: Implement events**
-
-Create `shared/src/shared/events.py`:
-
-```python
-from __future__ import annotations
-
-import json
-from enum import StrEnum
-from typing import Any
-
-from pydantic import BaseModel
-
-from shared.models import Candle, Signal, Order
-
-
-class EventType(StrEnum):
- CANDLE = "candle"
- SIGNAL = "signal"
- ORDER = "order"
-
-
-class CandleEvent(BaseModel):
- type: EventType = EventType.CANDLE
- data: Candle
-
- def to_dict(self) -> dict[str, Any]:
- return json.loads(self.model_dump_json())
-
- @classmethod
- def from_raw(cls, raw: dict[str, Any]) -> CandleEvent:
- return cls.model_validate(raw)
-
-
-class SignalEvent(BaseModel):
- type: EventType = EventType.SIGNAL
- data: Signal
-
- def to_dict(self) -> dict[str, Any]:
- return json.loads(self.model_dump_json())
-
- @classmethod
- def from_raw(cls, raw: dict[str, Any]) -> SignalEvent:
- return cls.model_validate(raw)
-
-
-class OrderEvent(BaseModel):
- type: EventType = EventType.ORDER
- data: Order
-
- def to_dict(self) -> dict[str, Any]:
- return json.loads(self.model_dump_json())
-
- @classmethod
- def from_raw(cls, raw: dict[str, Any]) -> OrderEvent:
- return cls.model_validate(raw)
-
-
-_EVENT_MAP = {
- EventType.CANDLE: CandleEvent,
- EventType.SIGNAL: SignalEvent,
- EventType.ORDER: OrderEvent,
-}
-
-
-class Event:
- @staticmethod
- def from_dict(data: dict[str, Any]) -> CandleEvent | SignalEvent | OrderEvent:
- event_type = EventType(data["type"])
- cls = _EVENT_MAP[event_type]
- return cls.from_raw(data)
-```
-
-- [ ] **Step 4: Run tests to verify they pass**
-
-```bash
-pytest shared/tests/test_events.py -v
-```
-
-Expected: All PASS
-
-- [ ] **Step 5: Write failing tests for broker**
-
-Create `shared/tests/test_broker.py`:
-
-```python
-import asyncio
-import pytest
-from unittest.mock import AsyncMock, MagicMock, patch
-
-from shared.broker import RedisBroker
-
-
-@pytest.fixture
-def mock_redis():
- redis = AsyncMock()
- redis.xadd = AsyncMock(return_value=b"1234-0")
- redis.xread = AsyncMock(return_value=[])
- redis.close = AsyncMock()
- return redis
-
-
-@pytest.mark.asyncio
-async def test_broker_publish(mock_redis):
- broker = RedisBroker.__new__(RedisBroker)
- broker._redis = mock_redis
-
- await broker.publish("candles.BTCUSDT", {"type": "candle", "data": "test"})
-
- mock_redis.xadd.assert_called_once()
- call_args = mock_redis.xadd.call_args
- assert call_args[0][0] == "candles.BTCUSDT"
-
-
-@pytest.mark.asyncio
-async def test_broker_subscribe_returns_messages(mock_redis):
- mock_redis.xread = AsyncMock(return_value=[
- ("candles.BTCUSDT", [
- (b"1234-0", {b"payload": b'{"type":"candle","data":"test"}'}),
- ])
- ])
- broker = RedisBroker.__new__(RedisBroker)
- broker._redis = mock_redis
-
- messages = await broker.read("candles.BTCUSDT", last_id="0-0", count=1)
- assert len(messages) == 1
- assert messages[0]["type"] == "candle"
-```
-
-- [ ] **Step 6: Run tests to verify they fail**
-
-```bash
-pytest shared/tests/test_broker.py -v
-```
-
-Expected: FAIL
-
-- [ ] **Step 7: Implement broker**
-
-Create `shared/src/shared/broker.py`:
-
-```python
-from __future__ import annotations
-
-import json
-
-import redis.asyncio as redis
-
-
-class RedisBroker:
- def __init__(self, redis_url: str):
- self._redis = redis.from_url(redis_url, decode_responses=False)
-
- async def publish(self, stream: str, data: dict) -> str:
- payload = json.dumps(data)
- msg_id = await self._redis.xadd(stream, {"payload": payload.encode()})
- return msg_id
-
- async def read(
- self, stream: str, last_id: str = "$", count: int = 10, block: int = 0
- ) -> list[dict]:
- results = await self._redis.xread({stream: last_id}, count=count, block=block)
- messages = []
- for _stream_name, entries in results:
- for _msg_id, fields in entries:
- payload = fields[b"payload"]
- messages.append(json.loads(payload))
- return messages
-
- async def close(self):
- await self._redis.close()
-```
-
-- [ ] **Step 8: Run tests to verify they pass**
-
-```bash
-pytest shared/tests/test_broker.py -v
-```
-
-Expected: All PASS
-
-- [ ] **Step 9: Commit**
-
-```bash
-git add shared/
-git commit -m "feat(shared): add event system and Redis Streams broker"
-```
-
----
-
-## Task 4: Shared — Database Layer
-
-**Files:**
-- Create: `shared/src/shared/db.py`
-- Create: `shared/tests/test_db.py`
-
-- [ ] **Step 1: Write failing tests for DB**
-
-Create `shared/tests/test_db.py`:
-
-```python
-import pytest
-from unittest.mock import AsyncMock, patch, MagicMock
-
-from shared.db import Database
-
-
-@pytest.mark.asyncio
-async def test_db_init_sql_creates_tables():
- db = Database.__new__(Database)
- db._pool = AsyncMock()
- mock_conn = AsyncMock()
- db._pool.acquire.return_value.__aenter__ = AsyncMock(return_value=mock_conn)
- db._pool.acquire.return_value.__aexit__ = AsyncMock(return_value=False)
-
- await db.init_tables()
-
- mock_conn.execute.assert_called()
- sql = mock_conn.execute.call_args[0][0]
- assert "candles" in sql
- assert "signals" in sql
- assert "orders" in sql
- assert "trades" in sql
- assert "positions" in sql
- assert "portfolio_snapshots" in sql
-
-
-@pytest.mark.asyncio
-async def test_db_insert_candle():
- db = Database.__new__(Database)
- db._pool = AsyncMock()
- mock_conn = AsyncMock()
- db._pool.acquire.return_value.__aenter__ = AsyncMock(return_value=mock_conn)
- db._pool.acquire.return_value.__aexit__ = AsyncMock(return_value=False)
-
- from datetime import datetime, timezone
- from decimal import Decimal
- from shared.models import Candle
-
- candle = Candle(
- symbol="BTCUSDT",
- timeframe="1m",
- open_time=datetime(2026, 1, 1, tzinfo=timezone.utc),
- open=Decimal("50000"),
- high=Decimal("50100"),
- low=Decimal("49900"),
- close=Decimal("50050"),
- volume=Decimal("1.5"),
- )
-
- await db.insert_candle(candle)
- mock_conn.execute.assert_called_once()
- sql = mock_conn.execute.call_args[0][0]
- assert "INSERT INTO candles" in sql
-```
-
-- [ ] **Step 2: Run tests to verify they fail**
-
-```bash
-pytest shared/tests/test_db.py -v
-```
-
-Expected: FAIL
-
-- [ ] **Step 3: Implement database layer**
-
-Create `shared/src/shared/db.py`:
-
-```python
-from __future__ import annotations
-
-import asyncpg
-
-from shared.models import Candle, Order, Signal
-
-_INIT_SQL = """
-CREATE TABLE IF NOT EXISTS candles (
- symbol TEXT NOT NULL,
- timeframe TEXT NOT NULL,
- open_time TIMESTAMPTZ NOT NULL,
- open NUMERIC NOT NULL,
- high NUMERIC NOT NULL,
- low NUMERIC NOT NULL,
- close NUMERIC NOT NULL,
- volume NUMERIC NOT NULL,
- PRIMARY KEY (symbol, timeframe, open_time)
-);
-
-CREATE TABLE IF NOT EXISTS signals (
- id TEXT PRIMARY KEY,
- strategy TEXT NOT NULL,
- symbol TEXT NOT NULL,
- side TEXT NOT NULL,
- price NUMERIC NOT NULL,
- quantity NUMERIC NOT NULL,
- reason TEXT NOT NULL,
- created_at TIMESTAMPTZ NOT NULL
-);
-
-CREATE TABLE IF NOT EXISTS orders (
- id TEXT PRIMARY KEY,
- signal_id TEXT REFERENCES signals(id),
- symbol TEXT NOT NULL,
- side TEXT NOT NULL,
- type TEXT NOT NULL,
- price NUMERIC NOT NULL,
- quantity NUMERIC NOT NULL,
- status TEXT NOT NULL DEFAULT 'PENDING',
- created_at TIMESTAMPTZ NOT NULL,
- filled_at TIMESTAMPTZ
-);
-
-CREATE TABLE IF NOT EXISTS trades (
- id TEXT PRIMARY KEY,
- order_id TEXT REFERENCES orders(id),
- symbol TEXT NOT NULL,
- side TEXT NOT NULL,
- price NUMERIC NOT NULL,
- quantity NUMERIC NOT NULL,
- fee NUMERIC NOT NULL DEFAULT 0,
- traded_at TIMESTAMPTZ NOT NULL
-);
-
-CREATE TABLE IF NOT EXISTS positions (
- symbol TEXT PRIMARY KEY,
- quantity NUMERIC NOT NULL,
- avg_entry_price NUMERIC NOT NULL,
- current_price NUMERIC NOT NULL,
- updated_at TIMESTAMPTZ NOT NULL
-);
-
-CREATE TABLE IF NOT EXISTS portfolio_snapshots (
- id SERIAL PRIMARY KEY,
- total_value NUMERIC NOT NULL,
- realized_pnl NUMERIC NOT NULL,
- unrealized_pnl NUMERIC NOT NULL,
- snapshot_at TIMESTAMPTZ NOT NULL
-);
-"""
-
-
-class Database:
- def __init__(self, dsn: str):
- self._dsn = dsn
- self._pool: asyncpg.Pool | None = None
-
- async def connect(self):
- self._pool = await asyncpg.create_pool(self._dsn)
-
- async def close(self):
- if self._pool:
- await self._pool.close()
-
- async def init_tables(self):
- async with self._pool.acquire() as conn:
- await conn.execute(_INIT_SQL)
-
- async def insert_candle(self, candle: Candle):
- sql = """
- INSERT INTO candles (symbol, timeframe, open_time, open, high, low, close, volume)
- VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
- ON CONFLICT (symbol, timeframe, open_time) DO NOTHING
- """
- async with self._pool.acquire() as conn:
- await conn.execute(
- sql,
- candle.symbol,
- candle.timeframe,
- candle.open_time,
- candle.open,
- candle.high,
- candle.low,
- candle.close,
- candle.volume,
- )
-
- async def insert_signal(self, signal: Signal):
- sql = """
- INSERT INTO signals (id, strategy, symbol, side, price, quantity, reason, created_at)
- VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
- """
- async with self._pool.acquire() as conn:
- await conn.execute(
- sql,
- signal.id,
- signal.strategy,
- signal.symbol,
- signal.side.value,
- signal.price,
- signal.quantity,
- signal.reason,
- signal.created_at,
- )
-
- async def insert_order(self, order: Order):
- sql = """
- INSERT INTO orders (id, signal_id, symbol, side, type, price, quantity, status, created_at)
- VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
- """
- async with self._pool.acquire() as conn:
- await conn.execute(
- sql,
- order.id,
- order.signal_id,
- order.symbol,
- order.side.value,
- order.type.value,
- order.price,
- order.quantity,
- order.status.value,
- order.created_at,
- )
-
- async def update_order_status(self, order_id: str, status: str, filled_at=None):
- sql = "UPDATE orders SET status = $1, filled_at = $2 WHERE id = $3"
- async with self._pool.acquire() as conn:
- await conn.execute(sql, status, filled_at, order_id)
-
- async def get_candles(self, symbol: str, timeframe: str, limit: int = 500) -> list[dict]:
- sql = """
- SELECT * FROM candles
- WHERE symbol = $1 AND timeframe = $2
- ORDER BY open_time DESC
- LIMIT $3
- """
- async with self._pool.acquire() as conn:
- rows = await conn.fetch(sql, symbol, timeframe, limit)
- return [dict(r) for r in rows]
-```
-
-- [ ] **Step 4: Run tests to verify they pass**
-
-```bash
-pytest shared/tests/test_db.py -v
-```
-
-Expected: All PASS
-
-- [ ] **Step 5: Commit**
-
-```bash
-git add shared/
-git commit -m "feat(shared): add database layer with table init and CRUD operations"
-```
-
----
-
-## Task 5: Data Collector Service
-
-**Files:**
-- Create: `services/data-collector/pyproject.toml`
-- Create: `services/data-collector/Dockerfile`
-- Create: `services/data-collector/src/data_collector/__init__.py`
-- Create: `services/data-collector/src/data_collector/config.py`
-- Create: `services/data-collector/src/data_collector/binance_rest.py`
-- Create: `services/data-collector/src/data_collector/binance_ws.py`
-- Create: `services/data-collector/src/data_collector/storage.py`
-- Create: `services/data-collector/src/data_collector/main.py`
-- Create: `services/data-collector/tests/test_binance_rest.py`
-- Create: `services/data-collector/tests/test_storage.py`
-
-- [ ] **Step 1: Create pyproject.toml**
-
-Create `services/data-collector/pyproject.toml`:
-
-```toml
-[project]
-name = "data-collector"
-version = "0.1.0"
-description = "Binance market data collector service"
-requires-python = ">=3.12"
-dependencies = [
- "ccxt>=4.0",
- "websockets>=12.0",
- "trading-shared",
-]
-
-[project.optional-dependencies]
-dev = [
- "pytest>=8.0",
- "pytest-asyncio>=0.23",
-]
-
-[build-system]
-requires = ["hatchling"]
-build-backend = "hatchling.build"
-
-[tool.hatch.build.targets.wheel]
-packages = ["src/data_collector"]
-```
-
-- [ ] **Step 2: Write failing tests for binance_rest**
-
-Create `services/data-collector/tests/test_binance_rest.py`:
-
-```python
-import pytest
-from unittest.mock import AsyncMock, patch, MagicMock
-from datetime import datetime, timezone
-from decimal import Decimal
-
-from data_collector.binance_rest import fetch_historical_candles
-
-
-@pytest.mark.asyncio
-async def test_fetch_historical_candles_parses_response():
- mock_exchange = MagicMock()
- mock_exchange.fetch_ohlcv = AsyncMock(return_value=[
- [1704067200000, 50000.0, 50100.0, 49900.0, 50050.0, 1.5],
- [1704067260000, 50050.0, 50200.0, 50000.0, 50150.0, 2.0],
- ])
-
- candles = await fetch_historical_candles(
- exchange=mock_exchange,
- symbol="BTC/USDT",
- timeframe="1m",
- since=datetime(2026, 1, 1, tzinfo=timezone.utc),
- limit=2,
- )
-
- assert len(candles) == 2
- assert candles[0].symbol == "BTCUSDT"
- assert candles[0].close == Decimal("50050.0")
- assert candles[1].volume == Decimal("2.0")
-
-
-@pytest.mark.asyncio
-async def test_fetch_historical_candles_empty_response():
- mock_exchange = MagicMock()
- mock_exchange.fetch_ohlcv = AsyncMock(return_value=[])
-
- candles = await fetch_historical_candles(
- exchange=mock_exchange,
- symbol="BTC/USDT",
- timeframe="1m",
- since=datetime(2026, 1, 1, tzinfo=timezone.utc),
- limit=100,
- )
-
- assert candles == []
-```
-
-- [ ] **Step 3: Run tests to verify they fail**
-
-```bash
-cd /home/si/Private/repos/trading
-pip install -e services/data-collector[dev]
-pytest services/data-collector/tests/test_binance_rest.py -v
-```
-
-Expected: FAIL
-
-- [ ] **Step 4: Implement binance_rest**
-
-Create `services/data-collector/src/data_collector/__init__.py`:
-
-```python
-```
-
-Create `services/data-collector/src/data_collector/binance_rest.py`:
-
-```python
-from __future__ import annotations
-
-from datetime import datetime, timezone
-from decimal import Decimal
-
-from shared.models import Candle
-
-
-async def fetch_historical_candles(
- exchange,
- symbol: str,
- timeframe: str,
- since: datetime,
- limit: int = 500,
-) -> list[Candle]:
- since_ms = int(since.timestamp() * 1000)
- ohlcv = await exchange.fetch_ohlcv(symbol, timeframe, since=since_ms, limit=limit)
-
- normalized_symbol = symbol.replace("/", "")
- candles = []
- for row in ohlcv:
- ts, o, h, l, c, v = row
- candles.append(
- Candle(
- symbol=normalized_symbol,
- timeframe=timeframe,
- open_time=datetime.fromtimestamp(ts / 1000, tz=timezone.utc),
- open=Decimal(str(o)),
- high=Decimal(str(h)),
- low=Decimal(str(l)),
- close=Decimal(str(c)),
- volume=Decimal(str(v)),
- )
- )
- return candles
-```
-
-- [ ] **Step 5: Run tests to verify they pass**
-
-```bash
-pytest services/data-collector/tests/test_binance_rest.py -v
-```
-
-Expected: All PASS
-
-- [ ] **Step 6: Write failing tests for storage**
-
-Create `services/data-collector/tests/test_storage.py`:
-
-```python
-import pytest
-from unittest.mock import AsyncMock
-from datetime import datetime, timezone
-from decimal import Decimal
-
-from shared.models import Candle
-from data_collector.storage import CandleStorage
-
-
-@pytest.fixture
-def mock_db():
- db = AsyncMock()
- db.insert_candle = AsyncMock()
- return db
-
-
-@pytest.fixture
-def mock_broker():
- broker = AsyncMock()
- broker.publish = AsyncMock()
- return broker
-
-
-@pytest.fixture
-def sample_candle():
- return Candle(
- symbol="BTCUSDT",
- timeframe="1m",
- open_time=datetime(2026, 1, 1, tzinfo=timezone.utc),
- open=Decimal("50000"),
- high=Decimal("50100"),
- low=Decimal("49900"),
- close=Decimal("50050"),
- volume=Decimal("1.5"),
- )
-
-
-@pytest.mark.asyncio
-async def test_storage_saves_to_db_and_publishes(mock_db, mock_broker, sample_candle):
- storage = CandleStorage(db=mock_db, broker=mock_broker)
- await storage.store(sample_candle)
-
- mock_db.insert_candle.assert_called_once_with(sample_candle)
- mock_broker.publish.assert_called_once()
- call_args = mock_broker.publish.call_args
- assert call_args[0][0] == "candles.BTCUSDT"
-
-
-@pytest.mark.asyncio
-async def test_storage_batch_store(mock_db, mock_broker, sample_candle):
- storage = CandleStorage(db=mock_db, broker=mock_broker)
- candles = [sample_candle, sample_candle]
- await storage.store_batch(candles)
-
- assert mock_db.insert_candle.call_count == 2
- assert mock_broker.publish.call_count == 2
-```
-
-- [ ] **Step 7: Run tests to verify they fail**
-
-```bash
-pytest services/data-collector/tests/test_storage.py -v
-```
-
-Expected: FAIL
-
-- [ ] **Step 8: Implement storage**
-
-Create `services/data-collector/src/data_collector/storage.py`:
-
-```python
-from __future__ import annotations
-
-from shared.broker import RedisBroker
-from shared.db import Database
-from shared.events import CandleEvent
-from shared.models import Candle
-
-
-class CandleStorage:
- def __init__(self, db: Database, broker: RedisBroker):
- self._db = db
- self._broker = broker
-
- async def store(self, candle: Candle):
- await self._db.insert_candle(candle)
- event = CandleEvent(data=candle)
- await self._broker.publish(f"candles.{candle.symbol}", event.to_dict())
-
- async def store_batch(self, candles: list[Candle]):
- for candle in candles:
- await self.store(candle)
-```
-
-- [ ] **Step 9: Run tests to verify they pass**
-
-```bash
-pytest services/data-collector/tests/test_storage.py -v
-```
-
-Expected: All PASS
-
-- [ ] **Step 10: Implement config, binance_ws, and main**
-
-Create `services/data-collector/src/data_collector/config.py`:
-
-```python
-from shared.config import Settings
-
-
-class CollectorConfig(Settings):
- symbols: list[str] = ["BTC/USDT"]
- timeframes: list[str] = ["1m"]
-```
-
-Create `services/data-collector/src/data_collector/binance_ws.py`:
-
-```python
-from __future__ import annotations
-
-import asyncio
-import json
-import logging
-from datetime import datetime, timezone
-from decimal import Decimal
-
-import websockets
-
-from shared.models import Candle
-
-logger = logging.getLogger(__name__)
-
-BINANCE_WS_URL = "wss://stream.binance.com:9443/ws"
-
-
-class BinanceWebSocket:
- def __init__(self, symbols: list[str], timeframe: str, on_candle):
- self._symbols = symbols
- self._timeframe = timeframe
- self._on_candle = on_candle
- self._running = False
-
- async def start(self):
- streams = [
- f"{s.lower().replace('/', '')}@kline_{self._timeframe}"
- for s in self._symbols
- ]
- url = f"{BINANCE_WS_URL}/{'/'.join(streams)}"
- self._running = True
- logger.info(f"Connecting to Binance WS: {streams}")
-
- while self._running:
- try:
- async with websockets.connect(url) as ws:
- async for raw in ws:
- if not self._running:
- break
- msg = json.loads(raw)
- if "k" in msg:
- candle = self._parse_kline(msg["k"])
- if candle:
- await self._on_candle(candle)
- except websockets.ConnectionClosed:
- logger.warning("WebSocket disconnected, reconnecting in 5s...")
- await asyncio.sleep(5)
- except Exception as e:
- logger.error(f"WebSocket error: {e}, reconnecting in 5s...")
- await asyncio.sleep(5)
-
- def stop(self):
- self._running = False
-
- def _parse_kline(self, k: dict) -> Candle | None:
- if not k.get("x"): # only closed candles
- return None
- return Candle(
- symbol=k["s"],
- timeframe=k["i"],
- open_time=datetime.fromtimestamp(k["t"] / 1000, tz=timezone.utc),
- open=Decimal(k["o"]),
- high=Decimal(k["h"]),
- low=Decimal(k["l"]),
- close=Decimal(k["c"]),
- volume=Decimal(k["v"]),
- )
-```
-
-Create `services/data-collector/src/data_collector/main.py`:
-
-```python
-from __future__ import annotations
-
-import asyncio
-import logging
-
-import ccxt.async_support as ccxt
-
-from shared.broker import RedisBroker
-from shared.db import Database
-from data_collector.binance_ws import BinanceWebSocket
-from data_collector.config import CollectorConfig
-from data_collector.storage import CandleStorage
-
-logger = logging.getLogger(__name__)
-
-
-async def run():
- config = CollectorConfig()
- logging.basicConfig(level=config.log_level)
-
- db = Database(config.database_url)
- await db.connect()
- await db.init_tables()
-
- broker = RedisBroker(config.redis_url)
- storage = CandleStorage(db=db, broker=broker)
-
- ws = BinanceWebSocket(
- symbols=config.symbols,
- timeframe=config.timeframes[0],
- on_candle=storage.store,
- )
-
- logger.info(f"Starting data collector: symbols={config.symbols}")
- try:
- await ws.start()
- finally:
- ws.stop()
- await broker.close()
- await db.close()
-
-
-def main():
- asyncio.run(run())
-
-
-if __name__ == "__main__":
- main()
-```
-
-- [ ] **Step 11: Create Dockerfile**
-
-Create `services/data-collector/Dockerfile`:
-
-```dockerfile
-FROM python:3.12-slim
-
-WORKDIR /app
-
-COPY shared/ shared/
-RUN pip install --no-cache-dir ./shared
-
-COPY services/data-collector/ services/data-collector/
-RUN pip install --no-cache-dir ./services/data-collector
-
-CMD ["python", "-m", "data_collector.main"]
-```
-
-- [ ] **Step 12: Commit**
-
-```bash
-git add services/data-collector/
-git commit -m "feat(data-collector): add Binance REST/WS data collection with storage pipeline"
-```
-
----
-
-## Task 6: Strategy Engine Service
-
-**Files:**
-- Create: `services/strategy-engine/pyproject.toml`
-- Create: `services/strategy-engine/Dockerfile`
-- Create: `services/strategy-engine/src/strategy_engine/__init__.py`
-- Create: `services/strategy-engine/src/strategy_engine/config.py`
-- Create: `services/strategy-engine/strategies/base.py`
-- Create: `services/strategy-engine/strategies/rsi_strategy.py`
-- Create: `services/strategy-engine/strategies/grid_strategy.py`
-- Create: `services/strategy-engine/src/strategy_engine/plugin_loader.py`
-- Create: `services/strategy-engine/src/strategy_engine/engine.py`
-- Create: `services/strategy-engine/src/strategy_engine/main.py`
-- Create: `services/strategy-engine/tests/test_rsi_strategy.py`
-- Create: `services/strategy-engine/tests/test_grid_strategy.py`
-- Create: `services/strategy-engine/tests/test_plugin_loader.py`
-- Create: `services/strategy-engine/tests/test_engine.py`
-
-- [ ] **Step 1: Create pyproject.toml**
-
-Create `services/strategy-engine/pyproject.toml`:
-
-```toml
-[project]
-name = "strategy-engine"
-version = "0.1.0"
-description = "Plugin-based strategy execution engine"
-requires-python = ">=3.12"
-dependencies = [
- "pandas>=2.0",
- "pandas-ta>=0.3",
- "trading-shared",
-]
-
-[project.optional-dependencies]
-dev = [
- "pytest>=8.0",
- "pytest-asyncio>=0.23",
-]
-
-[build-system]
-requires = ["hatchling"]
-build-backend = "hatchling.build"
-
-[tool.hatch.build.targets.wheel]
-packages = ["src/strategy_engine"]
-```
-
-- [ ] **Step 2: Implement base strategy**
-
-Create `services/strategy-engine/src/strategy_engine/__init__.py`:
-
-```python
-```
-
-Create `services/strategy-engine/strategies/base.py`:
-
-```python
-from __future__ import annotations
-
-from abc import ABC, abstractmethod
-
-from shared.models import Candle, Signal
-
-
-class BaseStrategy(ABC):
- name: str = "base"
-
- @abstractmethod
- def on_candle(self, candle: Candle) -> Signal | None:
- pass
-
- @abstractmethod
- def configure(self, params: dict) -> None:
- pass
-
- def reset(self) -> None:
- pass
-```
-
-- [ ] **Step 3: Write failing tests for RSI strategy**
-
-Create `services/strategy-engine/tests/test_rsi_strategy.py`:
-
-```python
-from datetime import datetime, timezone
-from decimal import Decimal
-
-from shared.models import Candle, OrderSide
-
-
-def make_candle(close: float, idx: int = 0) -> Candle:
- return Candle(
- symbol="BTCUSDT",
- timeframe="1m",
- open_time=datetime(2026, 1, 1, minute=idx, tzinfo=timezone.utc),
- open=Decimal(str(close)),
- high=Decimal(str(close + 10)),
- low=Decimal(str(close - 10)),
- close=Decimal(str(close)),
- volume=Decimal("1.0"),
- )
-
-
-def test_rsi_strategy_no_signal_insufficient_data():
- from strategy_engine.strategies.rsi_strategy import RsiStrategy
-
- strategy = RsiStrategy()
- strategy.configure({"period": 14, "oversold": 30, "overbought": 70, "quantity": 0.01})
-
- signal = strategy.on_candle(make_candle(50000))
- assert signal is None
-
-
-def test_rsi_strategy_buy_signal_on_oversold():
- from strategy_engine.strategies.rsi_strategy import RsiStrategy
-
- strategy = RsiStrategy()
- strategy.configure({"period": 14, "oversold": 30, "overbought": 70, "quantity": 0.01})
-
- # Feed declining prices to push RSI below 30
- prices = [50000 - i * 100 for i in range(20)]
- signal = None
- for i, p in enumerate(prices):
- signal = strategy.on_candle(make_candle(p, idx=i))
-
- # After sustained drop, RSI should be oversold → BUY signal
- if signal is not None:
- assert signal.side == OrderSide.BUY
- assert signal.strategy == "rsi"
-```
-
-- [ ] **Step 4: Run tests to verify they fail**
-
-```bash
-pip install -e services/strategy-engine[dev]
-pytest services/strategy-engine/tests/test_rsi_strategy.py -v
-```
-
-Expected: FAIL
-
-- [ ] **Step 5: Implement RSI strategy**
-
-Create `services/strategy-engine/strategies/rsi_strategy.py`:
-
-```python
-from __future__ import annotations
-
-from collections import deque
-from decimal import Decimal
-
-import pandas as pd
-import pandas_ta as ta
-
-from shared.models import Candle, Signal, OrderSide
-from strategies.base import BaseStrategy
-
-
-class RsiStrategy(BaseStrategy):
- name = "rsi"
-
- def __init__(self):
- self._closes: deque[float] = deque(maxlen=200)
- self._period: int = 14
- self._oversold: float = 30
- self._overbought: float = 70
- self._quantity: Decimal = Decimal("0.01")
-
- def configure(self, params: dict) -> None:
- self._period = params.get("period", 14)
- self._oversold = params.get("oversold", 30)
- self._overbought = params.get("overbought", 70)
- self._quantity = Decimal(str(params.get("quantity", 0.01)))
-
- def on_candle(self, candle: Candle) -> Signal | None:
- self._closes.append(float(candle.close))
-
- if len(self._closes) < self._period + 1:
- return None
-
- series = pd.Series(list(self._closes))
- rsi = ta.rsi(series, length=self._period)
- current_rsi = rsi.iloc[-1]
-
- if current_rsi < self._oversold:
- return Signal(
- strategy=self.name,
- symbol=candle.symbol,
- side=OrderSide.BUY,
- price=candle.close,
- quantity=self._quantity,
- reason=f"RSI={current_rsi:.1f} < {self._oversold}",
- )
- elif current_rsi > self._overbought:
- return Signal(
- strategy=self.name,
- symbol=candle.symbol,
- side=OrderSide.SELL,
- price=candle.close,
- quantity=self._quantity,
- reason=f"RSI={current_rsi:.1f} > {self._overbought}",
- )
- return None
-
- def reset(self) -> None:
- self._closes.clear()
-```
-
-- [ ] **Step 6: Run tests to verify they pass**
-
-```bash
-pytest services/strategy-engine/tests/test_rsi_strategy.py -v
-```
-
-Expected: All PASS
-
-- [ ] **Step 7: Write failing tests for grid strategy**
-
-Create `services/strategy-engine/tests/test_grid_strategy.py`:
-
-```python
-from datetime import datetime, timezone
-from decimal import Decimal
-
-from shared.models import Candle, OrderSide
-
-
-def make_candle(close: float, idx: int = 0) -> Candle:
- return Candle(
- symbol="BTCUSDT",
- timeframe="1m",
- open_time=datetime(2026, 1, 1, minute=idx, tzinfo=timezone.utc),
- open=Decimal(str(close)),
- high=Decimal(str(close + 10)),
- low=Decimal(str(close - 10)),
- close=Decimal(str(close)),
- volume=Decimal("1.0"),
- )
-
-
-def test_grid_strategy_buy_at_lower_grid():
- from strategy_engine.strategies.grid_strategy import GridStrategy
-
- strategy = GridStrategy()
- strategy.configure({
- "lower_price": 48000,
- "upper_price": 52000,
- "grid_count": 5,
- "quantity": 0.01,
- })
-
- # Price at grid level should trigger BUY
- signal = strategy.on_candle(make_candle(48000))
- # First candle sets reference, no signal
- signal = strategy.on_candle(make_candle(49000, idx=1))
- # Moving down through a grid level
- signal = strategy.on_candle(make_candle(48000, idx=2))
- if signal is not None:
- assert signal.side == OrderSide.BUY
-
-
-def test_grid_strategy_sell_at_upper_grid():
- from strategy_engine.strategies.grid_strategy import GridStrategy
-
- strategy = GridStrategy()
- strategy.configure({
- "lower_price": 48000,
- "upper_price": 52000,
- "grid_count": 5,
- "quantity": 0.01,
- })
-
- signal = strategy.on_candle(make_candle(50000))
- signal = strategy.on_candle(make_candle(51000, idx=1))
- signal = strategy.on_candle(make_candle(52000, idx=2))
- if signal is not None:
- assert signal.side == OrderSide.SELL
-
-
-def test_grid_strategy_no_signal_in_same_zone():
- from strategy_engine.strategies.grid_strategy import GridStrategy
-
- strategy = GridStrategy()
- strategy.configure({
- "lower_price": 48000,
- "upper_price": 52000,
- "grid_count": 5,
- "quantity": 0.01,
- })
-
- strategy.on_candle(make_candle(50000))
- signal = strategy.on_candle(make_candle(50050, idx=1))
- assert signal is None # same grid zone, no signal
-```
-
-- [ ] **Step 8: Run tests to verify they fail**
-
-```bash
-pytest services/strategy-engine/tests/test_grid_strategy.py -v
-```
-
-Expected: FAIL
-
-- [ ] **Step 9: Implement grid strategy**
-
-Create `services/strategy-engine/strategies/grid_strategy.py`:
-
-```python
-from __future__ import annotations
-
-from decimal import Decimal
-
-from shared.models import Candle, Signal, OrderSide
-from strategies.base import BaseStrategy
-
-
-class GridStrategy(BaseStrategy):
- name = "grid"
-
- def __init__(self):
- self._lower: float = 0
- self._upper: float = 0
- self._grid_count: int = 5
- self._quantity: Decimal = Decimal("0.01")
- self._grid_levels: list[float] = []
- self._last_zone: int | None = None
-
- def configure(self, params: dict) -> None:
- self._lower = float(params["lower_price"])
- self._upper = float(params["upper_price"])
- self._grid_count = params.get("grid_count", 5)
- self._quantity = Decimal(str(params.get("quantity", 0.01)))
- step = (self._upper - self._lower) / self._grid_count
- self._grid_levels = [self._lower + step * i for i in range(self._grid_count + 1)]
-
- def on_candle(self, candle: Candle) -> Signal | None:
- price = float(candle.close)
- current_zone = self._get_zone(price)
-
- if self._last_zone is None:
- self._last_zone = current_zone
- return None
-
- signal = None
- if current_zone < self._last_zone:
- signal = Signal(
- strategy=self.name,
- symbol=candle.symbol,
- side=OrderSide.BUY,
- price=candle.close,
- quantity=self._quantity,
- reason=f"Price crossed grid down: zone {self._last_zone}->{current_zone}",
- )
- elif current_zone > self._last_zone:
- signal = Signal(
- strategy=self.name,
- symbol=candle.symbol,
- side=OrderSide.SELL,
- price=candle.close,
- quantity=self._quantity,
- reason=f"Price crossed grid up: zone {self._last_zone}->{current_zone}",
- )
-
- self._last_zone = current_zone
- return signal
-
- def _get_zone(self, price: float) -> int:
- for i, level in enumerate(self._grid_levels):
- if price < level:
- return i
- return len(self._grid_levels)
-
- def reset(self) -> None:
- self._last_zone = None
-```
-
-- [ ] **Step 10: Run tests to verify they pass**
-
-```bash
-pytest services/strategy-engine/tests/test_grid_strategy.py -v
-```
-
-Expected: All PASS
-
-- [ ] **Step 11: Write failing tests for plugin_loader**
-
-Create `services/strategy-engine/tests/test_plugin_loader.py`:
-
-```python
-import pytest
-from pathlib import Path
-
-from strategy_engine.plugin_loader import load_strategies
-
-
-def test_load_strategies_finds_rsi_and_grid():
- strategies_dir = Path(__file__).parent.parent / "strategies"
- loaded = load_strategies(strategies_dir)
-
- names = {s.name for s in loaded}
- assert "rsi" in names
- assert "grid" in names
-
-
-def test_load_strategies_skips_base():
- strategies_dir = Path(__file__).parent.parent / "strategies"
- loaded = load_strategies(strategies_dir)
-
- names = {s.name for s in loaded}
- assert "base" not in names
-```
-
-- [ ] **Step 12: Run tests to verify they fail**
-
-```bash
-pytest services/strategy-engine/tests/test_plugin_loader.py -v
-```
-
-Expected: FAIL
-
-- [ ] **Step 13: Implement plugin_loader**
-
-Create `services/strategy-engine/src/strategy_engine/plugin_loader.py`:
-
-```python
-from __future__ import annotations
-
-import importlib.util
-import logging
-from pathlib import Path
-
-from strategies.base import BaseStrategy
-
-logger = logging.getLogger(__name__)
-
-
-def load_strategies(strategies_dir: Path) -> list[BaseStrategy]:
- loaded = []
- for path in strategies_dir.glob("*.py"):
- if path.stem.startswith("_") or path.stem == "base":
- continue
-
- spec = importlib.util.spec_from_file_location(path.stem, path)
- module = importlib.util.module_from_spec(spec)
- spec.loader.exec_module(module)
-
- for attr_name in dir(module):
- attr = getattr(module, attr_name)
- if (
- isinstance(attr, type)
- and issubclass(attr, BaseStrategy)
- and attr is not BaseStrategy
- ):
- instance = attr()
- loaded.append(instance)
- logger.info(f"Loaded strategy: {instance.name}")
-
- return loaded
-```
-
-- [ ] **Step 14: Run tests to verify they pass**
-
-```bash
-pytest services/strategy-engine/tests/test_plugin_loader.py -v
-```
-
-Expected: All PASS
-
-- [ ] **Step 15: Write failing tests for engine**
-
-Create `services/strategy-engine/tests/test_engine.py`:
-
-```python
-import pytest
-from unittest.mock import AsyncMock, MagicMock
-from datetime import datetime, timezone
-from decimal import Decimal
-
-from shared.models import Candle, OrderSide
-from shared.events import CandleEvent
-from strategy_engine.engine import StrategyEngine
-
-
-def make_candle_event() -> dict:
- candle = Candle(
- symbol="BTCUSDT",
- timeframe="1m",
- open_time=datetime(2026, 1, 1, tzinfo=timezone.utc),
- open=Decimal("50000"),
- high=Decimal("50100"),
- low=Decimal("49900"),
- close=Decimal("50050"),
- volume=Decimal("1.0"),
- )
- return CandleEvent(data=candle).to_dict()
-
-
-@pytest.mark.asyncio
-async def test_engine_dispatches_candle_to_strategies():
- mock_strategy = MagicMock()
- mock_strategy.name = "test"
- mock_strategy.on_candle.return_value = None
-
- mock_broker = AsyncMock()
- mock_broker.read = AsyncMock(return_value=[make_candle_event()])
-
- engine = StrategyEngine(broker=mock_broker, strategies=[mock_strategy])
- await engine.process_once(stream="candles.BTCUSDT", last_id="0-0")
-
- mock_strategy.on_candle.assert_called_once()
-
-
-@pytest.mark.asyncio
-async def test_engine_publishes_signal_when_strategy_returns_one():
- from shared.models import Signal
-
- mock_signal = Signal(
- strategy="test",
- symbol="BTCUSDT",
- side=OrderSide.BUY,
- price=Decimal("50000"),
- quantity=Decimal("0.01"),
- reason="test reason",
- )
- mock_strategy = MagicMock()
- mock_strategy.name = "test"
- mock_strategy.on_candle.return_value = mock_signal
-
- mock_broker = AsyncMock()
- mock_broker.read = AsyncMock(return_value=[make_candle_event()])
- mock_broker.publish = AsyncMock()
-
- engine = StrategyEngine(broker=mock_broker, strategies=[mock_strategy])
- await engine.process_once(stream="candles.BTCUSDT", last_id="0-0")
-
- mock_broker.publish.assert_called_once()
- call_args = mock_broker.publish.call_args
- assert call_args[0][0] == "signals"
-```
-
-- [ ] **Step 16: Run tests to verify they fail**
-
-```bash
-pytest services/strategy-engine/tests/test_engine.py -v
-```
-
-Expected: FAIL
-
-- [ ] **Step 17: Implement engine**
-
-Create `services/strategy-engine/src/strategy_engine/engine.py`:
-
-```python
-from __future__ import annotations
-
-import logging
-
-from shared.broker import RedisBroker
-from shared.events import Event, SignalEvent
-from shared.models import Signal
-from strategies.base import BaseStrategy
-
-logger = logging.getLogger(__name__)
-
-
-class StrategyEngine:
- def __init__(self, broker: RedisBroker, strategies: list[BaseStrategy]):
- self._broker = broker
- self._strategies = strategies
-
- async def process_once(self, stream: str, last_id: str) -> str:
- messages = await self._broker.read(stream, last_id=last_id, count=10, block=1000)
-
- for msg in messages:
- event = Event.from_dict(msg)
- candle = event.data
-
- for strategy in self._strategies:
- signal = strategy.on_candle(candle)
- if signal is not None:
- logger.info(f"Signal from {strategy.name}: {signal.side} {signal.symbol}")
- await self._publish_signal(signal)
-
- return last_id
-
- async def _publish_signal(self, signal: Signal):
- event = SignalEvent(data=signal)
- await self._broker.publish("signals", event.to_dict())
-```
-
-- [ ] **Step 18: Run tests to verify they pass**
-
-```bash
-pytest services/strategy-engine/tests/test_engine.py -v
-```
-
-Expected: All PASS
-
-- [ ] **Step 19: Implement config and main**
-
-Create `services/strategy-engine/src/strategy_engine/config.py`:
-
-```python
-from shared.config import Settings
-
-
-class StrategyConfig(Settings):
- symbols: list[str] = ["BTC/USDT"]
- timeframes: list[str] = ["1m"]
- strategy_params: dict = {}
-```
-
-Create `services/strategy-engine/src/strategy_engine/main.py`:
-
-```python
-from __future__ import annotations
-
-import asyncio
-import logging
-from pathlib import Path
-
-from shared.broker import RedisBroker
-from strategy_engine.config import StrategyConfig
-from strategy_engine.engine import StrategyEngine
-from strategy_engine.plugin_loader import load_strategies
-
-logger = logging.getLogger(__name__)
-
-
-async def run():
- config = StrategyConfig()
- logging.basicConfig(level=config.log_level)
-
- broker = RedisBroker(config.redis_url)
- strategies_dir = Path(__file__).parent.parent.parent / "strategies"
- strategies = load_strategies(strategies_dir)
-
- for s in strategies:
- params = config.strategy_params.get(s.name, {})
- s.configure(params)
-
- engine = StrategyEngine(broker=broker, strategies=strategies)
- symbols = [s.replace("/", "") for s in config.symbols]
-
- logger.info(f"Starting strategy engine: strategies={[s.name for s in strategies]}")
- last_ids = {sym: "0-0" for sym in symbols}
- try:
- while True:
- for sym in symbols:
- stream = f"candles.{sym}"
- last_ids[sym] = await engine.process_once(stream, last_ids[sym])
- finally:
- await broker.close()
-
-
-def main():
- asyncio.run(run())
-
-
-if __name__ == "__main__":
- main()
-```
-
-- [ ] **Step 20: Create Dockerfile**
-
-Create `services/strategy-engine/Dockerfile`:
-
-```dockerfile
-FROM python:3.12-slim
-
-WORKDIR /app
-
-COPY shared/ shared/
-RUN pip install --no-cache-dir ./shared
-
-COPY services/strategy-engine/ services/strategy-engine/
-RUN pip install --no-cache-dir ./services/strategy-engine
-
-CMD ["python", "-m", "strategy_engine.main"]
-```
-
-- [ ] **Step 21: Commit**
-
-```bash
-git add services/strategy-engine/
-git commit -m "feat(strategy-engine): add plugin-based strategy engine with RSI and grid strategies"
-```
-
----
-
-## Task 7: Order Executor Service
-
-**Files:**
-- Create: `services/order-executor/pyproject.toml`
-- Create: `services/order-executor/Dockerfile`
-- Create: `services/order-executor/src/order_executor/__init__.py`
-- Create: `services/order-executor/src/order_executor/config.py`
-- Create: `services/order-executor/src/order_executor/risk_manager.py`
-- Create: `services/order-executor/src/order_executor/executor.py`
-- Create: `services/order-executor/src/order_executor/main.py`
-- Create: `services/order-executor/tests/test_risk_manager.py`
-- Create: `services/order-executor/tests/test_executor.py`
-
-- [ ] **Step 1: Create pyproject.toml**
-
-Create `services/order-executor/pyproject.toml`:
-
-```toml
-[project]
-name = "order-executor"
-version = "0.1.0"
-description = "Order execution service with risk management"
-requires-python = ">=3.12"
-dependencies = [
- "ccxt>=4.0",
- "trading-shared",
-]
-
-[project.optional-dependencies]
-dev = [
- "pytest>=8.0",
- "pytest-asyncio>=0.23",
-]
-
-[build-system]
-requires = ["hatchling"]
-build-backend = "hatchling.build"
-
-[tool.hatch.build.targets.wheel]
-packages = ["src/order_executor"]
-```
-
-- [ ] **Step 2: Write failing tests for risk_manager**
-
-Create `services/order-executor/tests/test_risk_manager.py`:
-
-```python
-import pytest
-from decimal import Decimal
-
-from shared.models import Signal, OrderSide
-from order_executor.risk_manager import RiskManager, RiskCheckResult
-
-
-@pytest.fixture
-def risk_manager():
- return RiskManager(
- max_position_size=Decimal("0.1"),
- stop_loss_pct=Decimal("5"),
- daily_loss_limit_pct=Decimal("10"),
- )
-
-
-def make_signal(side=OrderSide.BUY, quantity="0.01", price="50000") -> Signal:
- return Signal(
- strategy="test",
- symbol="BTCUSDT",
- side=side,
- price=Decimal(price),
- quantity=Decimal(quantity),
- reason="test",
- )
-
-
-def test_risk_check_passes_normal_order(risk_manager):
- signal = make_signal()
- balance = Decimal("10000")
- positions = {}
- daily_pnl = Decimal("0")
-
- result = risk_manager.check(signal, balance, positions, daily_pnl)
- assert result.allowed is True
-
-
-def test_risk_check_rejects_exceeding_position_size(risk_manager):
- signal = make_signal(quantity="5") # 5 BTC * 50000 = 250000 > 10% of balance
- balance = Decimal("10000")
- positions = {}
- daily_pnl = Decimal("0")
-
- result = risk_manager.check(signal, balance, positions, daily_pnl)
- assert result.allowed is False
- assert "position size" in result.reason.lower()
-
-
-def test_risk_check_rejects_daily_loss_exceeded(risk_manager):
- signal = make_signal()
- balance = Decimal("10000")
- positions = {}
- daily_pnl = Decimal("-1100") # -11% > -10% limit
-
- result = risk_manager.check(signal, balance, positions, daily_pnl)
- assert result.allowed is False
- assert "daily loss" in result.reason.lower()
-
-
-def test_risk_check_rejects_insufficient_balance(risk_manager):
- signal = make_signal(quantity="0.01", price="50000") # cost = 500
- balance = Decimal("100") # not enough
- positions = {}
- daily_pnl = Decimal("0")
-
- result = risk_manager.check(signal, balance, positions, daily_pnl)
- assert result.allowed is False
- assert "balance" in result.reason.lower()
-```
-
-- [ ] **Step 3: Run tests to verify they fail**
-
-```bash
-pip install -e services/order-executor[dev]
-pytest services/order-executor/tests/test_risk_manager.py -v
-```
-
-Expected: FAIL
-
-- [ ] **Step 4: Implement risk_manager**
-
-Create `services/order-executor/src/order_executor/__init__.py`:
-
-```python
-```
-
-Create `services/order-executor/src/order_executor/risk_manager.py`:
-
-```python
-from __future__ import annotations
-
-from dataclasses import dataclass
-from decimal import Decimal
-
-from shared.models import Signal, OrderSide
-
-
-@dataclass
-class RiskCheckResult:
- allowed: bool
- reason: str = ""
-
-
-class RiskManager:
- def __init__(
- self,
- max_position_size: Decimal,
- stop_loss_pct: Decimal,
- daily_loss_limit_pct: Decimal,
- ):
- self._max_position_size = max_position_size
- self._stop_loss_pct = stop_loss_pct
- self._daily_loss_limit_pct = daily_loss_limit_pct
-
- def check(
- self,
- signal: Signal,
- balance: Decimal,
- positions: dict[str, Decimal],
- daily_pnl: Decimal,
- ) -> RiskCheckResult:
- # Check daily loss limit
- daily_loss_pct = (daily_pnl / balance) * 100 if balance > 0 else Decimal("0")
- if daily_loss_pct < -self._daily_loss_limit_pct:
- return RiskCheckResult(
- allowed=False,
- reason=f"Daily loss limit exceeded: {daily_loss_pct:.1f}%",
- )
-
- if signal.side == OrderSide.BUY:
- order_cost = signal.price * signal.quantity
-
- # Check sufficient balance
- if order_cost > balance:
- return RiskCheckResult(
- allowed=False,
- reason=f"Insufficient balance: need {order_cost}, have {balance}",
- )
-
- # Check max position size
- current_position_value = positions.get(signal.symbol, Decimal("0")) * signal.price
- new_position_value = current_position_value + order_cost
- position_ratio = new_position_value / balance if balance > 0 else Decimal("999")
- if position_ratio > self._max_position_size:
- return RiskCheckResult(
- allowed=False,
- reason=f"Position size exceeded: {position_ratio:.2f} > {self._max_position_size}",
- )
-
- return RiskCheckResult(allowed=True)
-```
-
-- [ ] **Step 5: Run tests to verify they pass**
-
-```bash
-pytest services/order-executor/tests/test_risk_manager.py -v
-```
-
-Expected: All PASS
-
-- [ ] **Step 6: Write failing tests for executor**
-
-Create `services/order-executor/tests/test_executor.py`:
-
-```python
-import pytest
-from unittest.mock import AsyncMock, MagicMock
-from decimal import Decimal
-
-from shared.models import Signal, OrderSide, OrderStatus
-from order_executor.executor import OrderExecutor
-from order_executor.risk_manager import RiskCheckResult
-
-
-def make_signal() -> Signal:
- return Signal(
- strategy="test",
- symbol="BTCUSDT",
- side=OrderSide.BUY,
- price=Decimal("50000"),
- quantity=Decimal("0.01"),
- reason="test",
- )
-
-
-@pytest.mark.asyncio
-async def test_executor_places_order_when_risk_passes():
- mock_exchange = MagicMock()
- mock_exchange.create_order = AsyncMock(return_value={
- "id": "binance_123",
- "status": "closed",
- "filled": 0.01,
- "price": 50000,
- })
- mock_exchange.fetch_balance = AsyncMock(return_value={
- "USDT": {"free": 10000},
- })
-
- mock_risk = MagicMock()
- mock_risk.check.return_value = RiskCheckResult(allowed=True)
-
- mock_broker = AsyncMock()
- mock_db = AsyncMock()
-
- executor = OrderExecutor(
- exchange=mock_exchange,
- risk_manager=mock_risk,
- broker=mock_broker,
- db=mock_db,
- dry_run=False,
- )
-
- signal = make_signal()
- order = await executor.execute(signal)
-
- assert order is not None
- assert order.status == OrderStatus.FILLED
- mock_exchange.create_order.assert_called_once()
-
-
-@pytest.mark.asyncio
-async def test_executor_rejects_when_risk_fails():
- mock_exchange = MagicMock()
- mock_exchange.fetch_balance = AsyncMock(return_value={
- "USDT": {"free": 10000},
- })
-
- mock_risk = MagicMock()
- mock_risk.check.return_value = RiskCheckResult(allowed=False, reason="too risky")
-
- mock_broker = AsyncMock()
- mock_db = AsyncMock()
-
- executor = OrderExecutor(
- exchange=mock_exchange,
- risk_manager=mock_risk,
- broker=mock_broker,
- db=mock_db,
- dry_run=False,
- )
-
- signal = make_signal()
- order = await executor.execute(signal)
- assert order is None
- mock_exchange.create_order.assert_not_called()
-
-
-@pytest.mark.asyncio
-async def test_executor_dry_run_does_not_call_exchange():
- mock_exchange = MagicMock()
- mock_exchange.fetch_balance = AsyncMock(return_value={
- "USDT": {"free": 10000},
- })
-
- mock_risk = MagicMock()
- mock_risk.check.return_value = RiskCheckResult(allowed=True)
-
- mock_broker = AsyncMock()
- mock_db = AsyncMock()
-
- executor = OrderExecutor(
- exchange=mock_exchange,
- risk_manager=mock_risk,
- broker=mock_broker,
- db=mock_db,
- dry_run=True,
- )
-
- signal = make_signal()
- order = await executor.execute(signal)
-
- assert order is not None
- assert order.status == OrderStatus.FILLED
- mock_exchange.create_order.assert_not_called()
-```
-
-- [ ] **Step 7: Run tests to verify they fail**
-
-```bash
-pytest services/order-executor/tests/test_executor.py -v
-```
-
-Expected: FAIL
-
-- [ ] **Step 8: Implement executor**
-
-Create `services/order-executor/src/order_executor/executor.py`:
-
-```python
-from __future__ import annotations
-
-import logging
-from datetime import datetime, timezone
-from decimal import Decimal
-
-from shared.broker import RedisBroker
-from shared.db import Database
-from shared.events import OrderEvent
-from shared.models import Order, OrderSide, OrderStatus, OrderType, Signal
-from order_executor.risk_manager import RiskManager
-
-logger = logging.getLogger(__name__)
-
-
-class OrderExecutor:
- def __init__(
- self,
- exchange,
- risk_manager: RiskManager,
- broker: RedisBroker,
- db: Database,
- dry_run: bool = True,
- ):
- self._exchange = exchange
- self._risk = risk_manager
- self._broker = broker
- self._db = db
- self._dry_run = dry_run
-
- async def execute(self, signal: Signal) -> Order | None:
- balance_info = await self._exchange.fetch_balance()
- balance = Decimal(str(balance_info.get("USDT", {}).get("free", 0)))
- positions: dict[str, Decimal] = {}
- daily_pnl = Decimal("0")
-
- result = self._risk.check(signal, balance, positions, daily_pnl)
- if not result.allowed:
- logger.warning(f"Risk check failed: {result.reason}")
- return None
-
- order = Order(
- signal_id=signal.id,
- symbol=signal.symbol,
- side=signal.side,
- type=OrderType.MARKET,
- price=signal.price,
- quantity=signal.quantity,
- )
-
- if self._dry_run:
- logger.info(f"[DRY RUN] Would execute: {order.side} {order.quantity} {order.symbol}")
- order.status = OrderStatus.FILLED
- order.filled_at = datetime.now(timezone.utc)
- else:
- try:
- result = await self._exchange.create_order(
- symbol=signal.symbol.replace("USDT", "/USDT"),
- type="market",
- side=signal.side.value.lower(),
- amount=float(signal.quantity),
- )
- order.status = OrderStatus.FILLED
- order.filled_at = datetime.now(timezone.utc)
- logger.info(f"Order filled: {order.id}")
- except Exception as e:
- order.status = OrderStatus.FAILED
- logger.error(f"Order failed: {e}")
-
- await self._db.insert_order(order)
- event = OrderEvent(data=order)
- await self._broker.publish("orders", event.to_dict())
-
- return order
-```
-
-- [ ] **Step 9: Run tests to verify they pass**
-
-```bash
-pytest services/order-executor/tests/test_executor.py -v
-```
-
-Expected: All PASS
-
-- [ ] **Step 10: Implement config and main**
-
-Create `services/order-executor/src/order_executor/config.py`:
-
-```python
-from shared.config import Settings
-
-
-class ExecutorConfig(Settings):
- pass
-```
-
-Create `services/order-executor/src/order_executor/main.py`:
-
-```python
-from __future__ import annotations
-
-import asyncio
-import logging
-
-import ccxt.async_support as ccxt
-
-from shared.broker import RedisBroker
-from shared.db import Database
-from shared.events import Event
-from order_executor.config import ExecutorConfig
-from order_executor.executor import OrderExecutor
-from order_executor.risk_manager import RiskManager
-
-logger = logging.getLogger(__name__)
-
-
-async def run():
- config = ExecutorConfig()
- logging.basicConfig(level=config.log_level)
-
- db = Database(config.database_url)
- await db.connect()
-
- broker = RedisBroker(config.redis_url)
-
- exchange = ccxt.binance({
- "apiKey": config.binance_api_key,
- "secret": config.binance_api_secret,
- })
-
- risk_manager = RiskManager(
- max_position_size=config.risk_max_position_size,
- stop_loss_pct=config.risk_stop_loss_pct,
- daily_loss_limit_pct=config.risk_daily_loss_limit_pct,
- )
-
- executor = OrderExecutor(
- exchange=exchange,
- risk_manager=risk_manager,
- broker=broker,
- db=db,
- dry_run=config.dry_run,
- )
-
- logger.info(f"Starting order executor (dry_run={config.dry_run})")
- last_id = "0-0"
- try:
- while True:
- messages = await broker.read("signals", last_id=last_id, count=10, block=1000)
- for msg in messages:
- event = Event.from_dict(msg)
- await executor.execute(event.data)
- finally:
- await exchange.close()
- await broker.close()
- await db.close()
-
-
-def main():
- asyncio.run(run())
-
-
-if __name__ == "__main__":
- main()
-```
-
-- [ ] **Step 11: Create Dockerfile**
-
-Create `services/order-executor/Dockerfile`:
-
-```dockerfile
-FROM python:3.12-slim
-
-WORKDIR /app
-
-COPY shared/ shared/
-RUN pip install --no-cache-dir ./shared
-
-COPY services/order-executor/ services/order-executor/
-RUN pip install --no-cache-dir ./services/order-executor
-
-CMD ["python", "-m", "order_executor.main"]
-```
-
-- [ ] **Step 12: Commit**
-
-```bash
-git add services/order-executor/
-git commit -m "feat(order-executor): add order execution with risk management and dry-run mode"
-```
-
----
-
-## Task 8: Portfolio Manager Service
-
-**Files:**
-- Create: `services/portfolio-manager/pyproject.toml`
-- Create: `services/portfolio-manager/Dockerfile`
-- Create: `services/portfolio-manager/src/portfolio_manager/__init__.py`
-- Create: `services/portfolio-manager/src/portfolio_manager/config.py`
-- Create: `services/portfolio-manager/src/portfolio_manager/portfolio.py`
-- Create: `services/portfolio-manager/src/portfolio_manager/pnl.py`
-- Create: `services/portfolio-manager/src/portfolio_manager/main.py`
-- Create: `services/portfolio-manager/tests/test_portfolio.py`
-- Create: `services/portfolio-manager/tests/test_pnl.py`
-
-- [ ] **Step 1: Create pyproject.toml**
-
-Create `services/portfolio-manager/pyproject.toml`:
-
-```toml
-[project]
-name = "portfolio-manager"
-version = "0.1.0"
-description = "Portfolio tracking and PnL calculation service"
-requires-python = ">=3.12"
-dependencies = [
- "trading-shared",
-]
-
-[project.optional-dependencies]
-dev = [
- "pytest>=8.0",
- "pytest-asyncio>=0.23",
-]
-
-[build-system]
-requires = ["hatchling"]
-build-backend = "hatchling.build"
-
-[tool.hatch.build.targets.wheel]
-packages = ["src/portfolio_manager"]
-```
-
-- [ ] **Step 2: Write failing tests for pnl**
-
-Create `services/portfolio-manager/tests/test_pnl.py`:
-
-```python
-from decimal import Decimal
-
-from portfolio_manager.pnl import calculate_unrealized_pnl, calculate_realized_pnl
-
-
-def test_unrealized_pnl_profit():
- result = calculate_unrealized_pnl(
- quantity=Decimal("0.1"),
- avg_entry_price=Decimal("50000"),
- current_price=Decimal("55000"),
- )
- assert result == Decimal("500") # 0.1 * (55000 - 50000)
-
-
-def test_unrealized_pnl_loss():
- result = calculate_unrealized_pnl(
- quantity=Decimal("0.1"),
- avg_entry_price=Decimal("50000"),
- current_price=Decimal("45000"),
- )
- assert result == Decimal("-500")
-
-
-def test_realized_pnl_single_trade():
- result = calculate_realized_pnl(
- buy_price=Decimal("50000"),
- sell_price=Decimal("55000"),
- quantity=Decimal("0.1"),
- fee=Decimal("5.5"),
- )
- assert result == Decimal("494.5") # 0.1 * (55000 - 50000) - 5.5
-```
-
-- [ ] **Step 3: Run tests to verify they fail**
-
-```bash
-pip install -e services/portfolio-manager[dev]
-pytest services/portfolio-manager/tests/test_pnl.py -v
-```
-
-Expected: FAIL
-
-- [ ] **Step 4: Implement pnl**
-
-Create `services/portfolio-manager/src/portfolio_manager/__init__.py`:
-
-```python
-```
-
-Create `services/portfolio-manager/src/portfolio_manager/pnl.py`:
-
-```python
-from decimal import Decimal
-
-
-def calculate_unrealized_pnl(
- quantity: Decimal,
- avg_entry_price: Decimal,
- current_price: Decimal,
-) -> Decimal:
- return quantity * (current_price - avg_entry_price)
-
-
-def calculate_realized_pnl(
- buy_price: Decimal,
- sell_price: Decimal,
- quantity: Decimal,
- fee: Decimal = Decimal("0"),
-) -> Decimal:
- return quantity * (sell_price - buy_price) - fee
-```
-
-- [ ] **Step 5: Run tests to verify they pass**
-
-```bash
-pytest services/portfolio-manager/tests/test_pnl.py -v
-```
-
-Expected: All PASS
-
-- [ ] **Step 6: Write failing tests for portfolio**
-
-Create `services/portfolio-manager/tests/test_portfolio.py`:
-
-```python
-import pytest
-from decimal import Decimal
-
-from shared.models import Order, OrderSide, OrderType, OrderStatus
-from portfolio_manager.portfolio import PortfolioTracker
-
-
-@pytest.fixture
-def tracker():
- return PortfolioTracker()
-
-
-def make_order(side=OrderSide.BUY, price="50000", quantity="0.1") -> Order:
- return Order(
- signal_id="sig_1",
- symbol="BTCUSDT",
- side=side,
- type=OrderType.MARKET,
- price=Decimal(price),
- quantity=Decimal(quantity),
- status=OrderStatus.FILLED,
- )
-
-
-def test_portfolio_add_buy_order(tracker):
- order = make_order(side=OrderSide.BUY)
- tracker.apply_order(order)
-
- pos = tracker.get_position("BTCUSDT")
- assert pos.quantity == Decimal("0.1")
- assert pos.avg_entry_price == Decimal("50000")
-
-
-def test_portfolio_add_multiple_buys(tracker):
- tracker.apply_order(make_order(price="50000", quantity="0.1"))
- tracker.apply_order(make_order(price="52000", quantity="0.1"))
-
- pos = tracker.get_position("BTCUSDT")
- assert pos.quantity == Decimal("0.2")
- assert pos.avg_entry_price == Decimal("51000") # weighted avg
-
-
-def test_portfolio_sell_reduces_position(tracker):
- tracker.apply_order(make_order(side=OrderSide.BUY, price="50000", quantity="0.2"))
- tracker.apply_order(make_order(side=OrderSide.SELL, price="55000", quantity="0.1"))
-
- pos = tracker.get_position("BTCUSDT")
- assert pos.quantity == Decimal("0.1")
- assert pos.avg_entry_price == Decimal("50000") # entry price unchanged
-
-
-def test_portfolio_no_position_returns_none(tracker):
- pos = tracker.get_position("ETHUSDT")
- assert pos is None
-```
-
-- [ ] **Step 7: Run tests to verify they fail**
-
-```bash
-pytest services/portfolio-manager/tests/test_portfolio.py -v
-```
-
-Expected: FAIL
-
-- [ ] **Step 8: Implement portfolio**
-
-Create `services/portfolio-manager/src/portfolio_manager/portfolio.py`:
-
-```python
-from __future__ import annotations
-
-from decimal import Decimal
-
-from shared.models import Order, OrderSide, Position
-
-
-class PortfolioTracker:
- def __init__(self):
- self._positions: dict[str, _PositionState] = {}
-
- def apply_order(self, order: Order) -> None:
- if order.symbol not in self._positions:
- self._positions[order.symbol] = _PositionState()
-
- state = self._positions[order.symbol]
- if order.side == OrderSide.BUY:
- total_cost = state.avg_entry * state.quantity + order.price * order.quantity
- state.quantity += order.quantity
- state.avg_entry = total_cost / state.quantity if state.quantity > 0 else Decimal("0")
- elif order.side == OrderSide.SELL:
- state.quantity -= order.quantity
- if state.quantity <= 0:
- state.quantity = Decimal("0")
- state.avg_entry = Decimal("0")
-
- def get_position(self, symbol: str) -> Position | None:
- state = self._positions.get(symbol)
- if state is None or state.quantity == 0:
- return None
- return Position(
- symbol=symbol,
- quantity=state.quantity,
- avg_entry_price=state.avg_entry,
- current_price=Decimal("0"),
- )
-
- def get_all_positions(self) -> list[Position]:
- positions = []
- for symbol in self._positions:
- pos = self.get_position(symbol)
- if pos is not None:
- positions.append(pos)
- return positions
-
-
-class _PositionState:
- def __init__(self):
- self.quantity = Decimal("0")
- self.avg_entry = Decimal("0")
-```
-
-- [ ] **Step 9: Run tests to verify they pass**
-
-```bash
-pytest services/portfolio-manager/tests/test_portfolio.py -v
-```
-
-Expected: All PASS
-
-- [ ] **Step 10: Implement config and main**
-
-Create `services/portfolio-manager/src/portfolio_manager/config.py`:
-
-```python
-from shared.config import Settings
-
-
-class PortfolioConfig(Settings):
- snapshot_interval_hours: int = 24
-```
-
-Create `services/portfolio-manager/src/portfolio_manager/main.py`:
-
-```python
-from __future__ import annotations
-
-import asyncio
-import logging
-
-from shared.broker import RedisBroker
-from shared.db import Database
-from shared.events import Event
-from portfolio_manager.config import PortfolioConfig
-from portfolio_manager.portfolio import PortfolioTracker
-
-logger = logging.getLogger(__name__)
-
-
-async def run():
- config = PortfolioConfig()
- logging.basicConfig(level=config.log_level)
-
- db = Database(config.database_url)
- await db.connect()
-
- broker = RedisBroker(config.redis_url)
- tracker = PortfolioTracker()
-
- logger.info("Starting portfolio manager")
- last_id = "0-0"
- try:
- while True:
- messages = await broker.read("orders", last_id=last_id, count=10, block=1000)
- for msg in messages:
- event = Event.from_dict(msg)
- order = event.data
- tracker.apply_order(order)
- logger.info(f"Position updated: {order.symbol}")
- finally:
- await broker.close()
- await db.close()
-
-
-def main():
- asyncio.run(run())
-
-
-if __name__ == "__main__":
- main()
-```
-
-- [ ] **Step 11: Create Dockerfile**
-
-Create `services/portfolio-manager/Dockerfile`:
-
-```dockerfile
-FROM python:3.12-slim
-
-WORKDIR /app
-
-COPY shared/ shared/
-RUN pip install --no-cache-dir ./shared
-
-COPY services/portfolio-manager/ services/portfolio-manager/
-RUN pip install --no-cache-dir ./services/portfolio-manager
-
-CMD ["python", "-m", "portfolio_manager.main"]
-```
-
-- [ ] **Step 12: Commit**
-
-```bash
-git add services/portfolio-manager/
-git commit -m "feat(portfolio-manager): add portfolio tracking and PnL calculation"
-```
-
----
-
-## Task 9: Backtester Service
-
-**Files:**
-- Create: `services/backtester/pyproject.toml`
-- Create: `services/backtester/Dockerfile`
-- Create: `services/backtester/src/backtester/__init__.py`
-- Create: `services/backtester/src/backtester/config.py`
-- Create: `services/backtester/src/backtester/simulator.py`
-- Create: `services/backtester/src/backtester/engine.py`
-- Create: `services/backtester/src/backtester/reporter.py`
-- Create: `services/backtester/src/backtester/main.py`
-- Create: `services/backtester/tests/test_simulator.py`
-- Create: `services/backtester/tests/test_engine.py`
-- Create: `services/backtester/tests/test_reporter.py`
-
-- [ ] **Step 1: Create pyproject.toml**
-
-Create `services/backtester/pyproject.toml`:
-
-```toml
-[project]
-name = "backtester"
-version = "0.1.0"
-description = "Strategy backtesting engine"
-requires-python = ">=3.12"
-dependencies = [
- "pandas>=2.0",
- "trading-shared",
-]
-
-[project.optional-dependencies]
-dev = [
- "pytest>=8.0",
- "pytest-asyncio>=0.23",
-]
-
-[build-system]
-requires = ["hatchling"]
-build-backend = "hatchling.build"
-
-[tool.hatch.build.targets.wheel]
-packages = ["src/backtester"]
-```
-
-- [ ] **Step 2: Write failing tests for simulator**
-
-Create `services/backtester/tests/test_simulator.py`:
-
-```python
-from decimal import Decimal
-
-from shared.models import Signal, OrderSide
-from backtester.simulator import OrderSimulator
-
-
-def make_signal(side=OrderSide.BUY, price="50000", quantity="0.1") -> Signal:
- return Signal(
- strategy="test",
- symbol="BTCUSDT",
- side=side,
- price=Decimal(price),
- quantity=Decimal(quantity),
- reason="test",
- )
-
-
-def test_simulator_initial_balance():
- sim = OrderSimulator(initial_balance=Decimal("10000"))
- assert sim.balance == Decimal("10000")
-
-
-def test_simulator_buy_reduces_balance():
- sim = OrderSimulator(initial_balance=Decimal("10000"))
- sim.execute(make_signal(side=OrderSide.BUY, price="50000", quantity="0.1"))
-
- assert sim.balance == Decimal("5000") # 10000 - 0.1*50000
- assert sim.positions["BTCUSDT"] == Decimal("0.1")
-
-
-def test_simulator_sell_increases_balance():
- sim = OrderSimulator(initial_balance=Decimal("10000"))
- sim.execute(make_signal(side=OrderSide.BUY, price="50000", quantity="0.1"))
- sim.execute(make_signal(side=OrderSide.SELL, price="55000", quantity="0.1"))
-
- assert sim.balance == Decimal("10500") # 5000 + 0.1*55000
- assert sim.positions.get("BTCUSDT", Decimal("0")) == Decimal("0")
-
-
-def test_simulator_reject_buy_insufficient_balance():
- sim = OrderSimulator(initial_balance=Decimal("100"))
- result = sim.execute(make_signal(side=OrderSide.BUY, price="50000", quantity="0.1"))
- assert result is False
- assert sim.balance == Decimal("100")
-
-
-def test_simulator_trade_history():
- sim = OrderSimulator(initial_balance=Decimal("10000"))
- sim.execute(make_signal(side=OrderSide.BUY))
- assert len(sim.trades) == 1
-```
-
-- [ ] **Step 3: Run tests to verify they fail**
-
-```bash
-pip install -e services/backtester[dev]
-pytest services/backtester/tests/test_simulator.py -v
-```
-
-Expected: FAIL
-
-- [ ] **Step 4: Implement simulator**
-
-Create `services/backtester/src/backtester/__init__.py`:
-
-```python
-```
-
-Create `services/backtester/src/backtester/simulator.py`:
-
-```python
-from __future__ import annotations
-
-from dataclasses import dataclass, field
-from decimal import Decimal
-
-from shared.models import Signal, OrderSide
-
-
-@dataclass
-class SimulatedTrade:
- symbol: str
- side: OrderSide
- price: Decimal
- quantity: Decimal
- balance_after: Decimal
-
-
-class OrderSimulator:
- def __init__(self, initial_balance: Decimal):
- self.balance = initial_balance
- self.positions: dict[str, Decimal] = {}
- self.trades: list[SimulatedTrade] = []
-
- def execute(self, signal: Signal) -> bool:
- if signal.side == OrderSide.BUY:
- cost = signal.price * signal.quantity
- if cost > self.balance:
- return False
- self.balance -= cost
- current = self.positions.get(signal.symbol, Decimal("0"))
- self.positions[signal.symbol] = current + signal.quantity
- elif signal.side == OrderSide.SELL:
- current = self.positions.get(signal.symbol, Decimal("0"))
- sell_qty = min(signal.quantity, current)
- if sell_qty <= 0:
- return False
- self.balance += signal.price * sell_qty
- self.positions[signal.symbol] = current - sell_qty
-
- self.trades.append(
- SimulatedTrade(
- symbol=signal.symbol,
- side=signal.side,
- price=signal.price,
- quantity=signal.quantity,
- balance_after=self.balance,
- )
- )
- return True
-```
-
-- [ ] **Step 5: Run tests to verify they pass**
-
-```bash
-pytest services/backtester/tests/test_simulator.py -v
-```
-
-Expected: All PASS
-
-- [ ] **Step 6: Write failing tests for backtest engine**
-
-Create `services/backtester/tests/test_engine.py`:
-
-```python
-import pytest
-from decimal import Decimal
-from datetime import datetime, timezone
-from unittest.mock import MagicMock
-
-from shared.models import Candle, Signal, OrderSide
-from backtester.engine import BacktestEngine
-
-
-def make_candles(prices: list[float]) -> list[Candle]:
- return [
- Candle(
- symbol="BTCUSDT",
- timeframe="1m",
- open_time=datetime(2026, 1, 1, minute=i, tzinfo=timezone.utc),
- open=Decimal(str(p)),
- high=Decimal(str(p + 10)),
- low=Decimal(str(p - 10)),
- close=Decimal(str(p)),
- volume=Decimal("1.0"),
- )
- for i, p in enumerate(prices)
- ]
-
-
-def test_backtest_engine_runs_strategy_over_candles():
- mock_strategy = MagicMock()
- mock_strategy.name = "test"
- mock_strategy.on_candle.return_value = None
-
- candles = make_candles([50000, 50100, 50200])
-
- engine = BacktestEngine(
- strategy=mock_strategy,
- initial_balance=Decimal("10000"),
- )
- result = engine.run(candles)
-
- assert mock_strategy.on_candle.call_count == 3
- assert result.total_trades == 0
- assert result.final_balance == Decimal("10000")
-
-
-def test_backtest_engine_executes_signals():
- buy_signal = Signal(
- strategy="test",
- symbol="BTCUSDT",
- side=OrderSide.BUY,
- price=Decimal("50000"),
- quantity=Decimal("0.1"),
- reason="test buy",
- )
- sell_signal = Signal(
- strategy="test",
- symbol="BTCUSDT",
- side=OrderSide.SELL,
- price=Decimal("55000"),
- quantity=Decimal("0.1"),
- reason="test sell",
- )
-
- mock_strategy = MagicMock()
- mock_strategy.name = "test"
- mock_strategy.on_candle.side_effect = [buy_signal, None, sell_signal]
-
- candles = make_candles([50000, 52000, 55000])
-
- engine = BacktestEngine(
- strategy=mock_strategy,
- initial_balance=Decimal("10000"),
- )
- result = engine.run(candles)
-
- assert result.total_trades == 2
- assert result.final_balance == Decimal("10500") # 10000 - 5000 + 5500
-```
-
-- [ ] **Step 7: Run tests to verify they fail**
-
-```bash
-pytest services/backtester/tests/test_engine.py -v
-```
-
-Expected: FAIL
-
-- [ ] **Step 8: Implement backtest engine**
-
-Create `services/backtester/src/backtester/engine.py`:
-
-```python
-from __future__ import annotations
-
-from dataclasses import dataclass
-from decimal import Decimal
-
-from shared.models import Candle
-from backtester.simulator import OrderSimulator
-from strategies.base import BaseStrategy
-
-
-@dataclass
-class BacktestResult:
- strategy_name: str
- symbol: str
- total_trades: int
- initial_balance: Decimal
- final_balance: Decimal
- profit: Decimal
- profit_pct: Decimal
- trades: list
-
- @property
- def win_rate(self) -> Decimal:
- if self.total_trades == 0:
- return Decimal("0")
- wins = sum(
- 1
- for i in range(0, len(self.trades) - 1, 2)
- if i + 1 < len(self.trades)
- and self.trades[i + 1].balance_after > self.trades[i].balance_after
- )
- pairs = self.total_trades // 2
- return Decimal(str(wins / pairs * 100)) if pairs > 0 else Decimal("0")
-
-
-class BacktestEngine:
- def __init__(self, strategy: BaseStrategy, initial_balance: Decimal):
- self._strategy = strategy
- self._initial_balance = initial_balance
-
- def run(self, candles: list[Candle]) -> BacktestResult:
- simulator = OrderSimulator(self._initial_balance)
- symbol = candles[0].symbol if candles else ""
-
- for candle in candles:
- signal = self._strategy.on_candle(candle)
- if signal is not None:
- simulator.execute(signal)
-
- final = simulator.balance
- # Add value of remaining positions at last candle price
- if candles:
- last_price = candles[-1].close
- for sym, qty in simulator.positions.items():
- final += qty * last_price
-
- profit = final - self._initial_balance
- profit_pct = (profit / self._initial_balance) * 100 if self._initial_balance > 0 else Decimal("0")
-
- return BacktestResult(
- strategy_name=self._strategy.name,
- symbol=symbol,
- total_trades=len(simulator.trades),
- initial_balance=self._initial_balance,
- final_balance=final,
- profit=profit,
- profit_pct=profit_pct,
- trades=simulator.trades,
- )
-```
-
-- [ ] **Step 9: Run tests to verify they pass**
-
-```bash
-pytest services/backtester/tests/test_engine.py -v
-```
-
-Expected: All PASS
-
-- [ ] **Step 10: Write failing tests for reporter**
-
-Create `services/backtester/tests/test_reporter.py`:
-
-```python
-from decimal import Decimal
-
-from backtester.engine import BacktestResult
-from backtester.reporter import format_report
-
-
-def test_format_report_contains_key_metrics():
- result = BacktestResult(
- strategy_name="rsi",
- symbol="BTCUSDT",
- total_trades=10,
- initial_balance=Decimal("10000"),
- final_balance=Decimal("11500"),
- profit=Decimal("1500"),
- profit_pct=Decimal("15"),
- trades=[],
- )
-
- report = format_report(result)
-
- assert "rsi" in report
- assert "BTCUSDT" in report
- assert "10000" in report
- assert "11500" in report
- assert "1500" in report
- assert "15" in report
-```
-
-- [ ] **Step 11: Run test to verify it fails**
-
-```bash
-pytest services/backtester/tests/test_reporter.py -v
-```
-
-Expected: FAIL
-
-- [ ] **Step 12: Implement reporter**
-
-Create `services/backtester/src/backtester/reporter.py`:
-
-```python
-from backtester.engine import BacktestResult
-
-
-def format_report(result: BacktestResult) -> str:
- lines = [
- "=" * 50,
- f" Backtest Report: {result.strategy_name}",
- "=" * 50,
- f" Symbol: {result.symbol}",
- f" Total Trades: {result.total_trades}",
- f" Initial Balance: {result.initial_balance}",
- f" Final Balance: {result.final_balance}",
- f" Profit: {result.profit}",
- f" Profit %: {result.profit_pct:.2f}%",
- f" Win Rate: {result.win_rate:.1f}%",
- "=" * 50,
- ]
- return "\n".join(lines)
-```
-
-- [ ] **Step 13: Run test to verify it passes**
-
-```bash
-pytest services/backtester/tests/test_reporter.py -v
-```
-
-Expected: PASS
-
-- [ ] **Step 14: Implement config and main**
-
-Create `services/backtester/src/backtester/config.py`:
-
-```python
-from shared.config import Settings
-
-
-class BacktestConfig(Settings):
- backtest_initial_balance: float = 10000.0
-```
-
-Create `services/backtester/src/backtester/main.py`:
-
-```python
-from __future__ import annotations
-
-import asyncio
-import logging
-from decimal import Decimal
-from pathlib import Path
-
-from shared.db import Database
-from backtester.config import BacktestConfig
-from backtester.engine import BacktestEngine
-from backtester.reporter import format_report
-
-logger = logging.getLogger(__name__)
-
-
-async def run_backtest(
- strategy_name: str,
- symbol: str,
- timeframe: str,
- initial_balance: Decimal,
- db: Database,
- strategies_dir: Path,
-) -> str:
- from strategy_engine.plugin_loader import load_strategies
-
- strategies = load_strategies(strategies_dir)
- strategy = next((s for s in strategies if s.name == strategy_name), None)
- if strategy is None:
- return f"Strategy '{strategy_name}' not found"
-
- candles_data = await db.get_candles(symbol, timeframe)
- if not candles_data:
- return f"No candle data for {symbol} {timeframe}"
-
- from shared.models import Candle
-
- candles = [Candle(**row) for row in reversed(candles_data)]
-
- engine = BacktestEngine(strategy=strategy, initial_balance=initial_balance)
- result = engine.run(candles)
- return format_report(result)
-```
-
-- [ ] **Step 15: Create Dockerfile**
-
-Create `services/backtester/Dockerfile`:
-
-```dockerfile
-FROM python:3.12-slim
-
-WORKDIR /app
-
-COPY shared/ shared/
-RUN pip install --no-cache-dir ./shared
-
-COPY services/strategy-engine/strategies/ services/strategy-engine/strategies/
-COPY services/backtester/ services/backtester/
-RUN pip install --no-cache-dir ./services/backtester
-
-CMD ["python", "-m", "backtester.main"]
-```
-
-- [ ] **Step 16: Commit**
-
-```bash
-git add services/backtester/
-git commit -m "feat(backtester): add backtesting engine with simulator and reporting"
-```
-
----
-
-## Task 10: CLI
-
-**Files:**
-- Create: `cli/pyproject.toml`
-- Create: `cli/src/trading_cli/__init__.py`
-- Create: `cli/src/trading_cli/main.py`
-- Create: `cli/src/trading_cli/commands/data.py`
-- Create: `cli/src/trading_cli/commands/trade.py`
-- Create: `cli/src/trading_cli/commands/backtest.py`
-- Create: `cli/src/trading_cli/commands/portfolio.py`
-- Create: `cli/src/trading_cli/commands/strategy.py`
-- Create: `cli/src/trading_cli/commands/service.py`
-- Create: `cli/tests/test_cli_data.py`
-
-- [ ] **Step 1: Create pyproject.toml**
-
-Create `cli/pyproject.toml`:
-
-```toml
-[project]
-name = "trading-cli"
-version = "0.1.0"
-description = "CLI interface for the trading platform"
-requires-python = ">=3.12"
-dependencies = [
- "click>=8.0",
- "rich>=13.0",
- "trading-shared",
-]
-
-[project.scripts]
-trading = "trading_cli.main:cli"
-
-[project.optional-dependencies]
-dev = [
- "pytest>=8.0",
- "pytest-asyncio>=0.23",
-]
-
-[build-system]
-requires = ["hatchling"]
-build-backend = "hatchling.build"
-
-[tool.hatch.build.targets.wheel]
-packages = ["src/trading_cli"]
-```
-
-- [ ] **Step 2: Write failing tests for CLI data commands**
-
-Create `cli/tests/test_cli_data.py`:
-
-```python
-from click.testing import CliRunner
-from trading_cli.main import cli
-
-
-def test_cli_help():
- runner = CliRunner()
- result = runner.invoke(cli, ["--help"])
- assert result.exit_code == 0
- assert "trading" in result.output.lower() or "Usage" in result.output
-
-
-def test_cli_data_group():
- runner = CliRunner()
- result = runner.invoke(cli, ["data", "--help"])
- assert result.exit_code == 0
- assert "collect" in result.output
- assert "history" in result.output
-```
-
-- [ ] **Step 3: Run tests to verify they fail**
-
-```bash
-pip install -e cli[dev]
-pytest cli/tests/test_cli_data.py -v
-```
-
-Expected: FAIL
-
-- [ ] **Step 4: Implement CLI main and data commands**
-
-Create `cli/src/trading_cli/__init__.py`:
-
-```python
-```
-
-Create `cli/src/trading_cli/main.py`:
-
-```python
-import click
-
-from trading_cli.commands.data import data
-from trading_cli.commands.trade import trade
-from trading_cli.commands.backtest import backtest
-from trading_cli.commands.portfolio import portfolio
-from trading_cli.commands.strategy import strategy
-from trading_cli.commands.service import service
-
-
-@click.group()
-@click.version_option(version="0.1.0")
-def cli():
- """Trading Platform CLI — Binance spot crypto trading"""
- pass
-
-
-cli.add_command(data)
-cli.add_command(trade)
-cli.add_command(backtest)
-cli.add_command(portfolio)
-cli.add_command(strategy)
-cli.add_command(service)
-```
-
-Create `cli/src/trading_cli/commands/data.py`:
-
-```python
-import asyncio
-
-import click
-
-
-@click.group()
-def data():
- """Data collection commands"""
- pass
-
-
-@data.command()
-@click.option("--symbol", required=True, help="Trading pair (e.g. BTCUSDT)")
-@click.option("--timeframe", default="1m", help="Candle timeframe")
-def collect(symbol: str, timeframe: str):
- """Start real-time data collection"""
- click.echo(f"Starting data collection: {symbol} {timeframe}")
-
- from data_collector.config import CollectorConfig
- from data_collector.main import run
-
- asyncio.run(run())
-
-
-@data.command()
-@click.option("--symbol", required=True, help="Trading pair (e.g. BTCUSDT)")
-@click.option("--timeframe", default="1m", help="Candle timeframe")
-@click.option("--from", "since", required=True, help="Start date (YYYY-MM-DD)")
-@click.option("--limit", default=1000, help="Number of candles")
-def history(symbol: str, timeframe: str, since: str, limit: int):
- """Download historical candle data"""
- click.echo(f"Downloading history: {symbol} {timeframe} from {since} (limit={limit})")
-
- async def _run():
- import ccxt.async_support as ccxt
- from datetime import datetime, timezone
- from shared.broker import RedisBroker
- from shared.config import Settings
- from shared.db import Database
- from data_collector.binance_rest import fetch_historical_candles
- from data_collector.storage import CandleStorage
-
- settings = Settings()
- db = Database(settings.database_url)
- await db.connect()
- await db.init_tables()
- broker = RedisBroker(settings.redis_url)
- storage = CandleStorage(db=db, broker=broker)
-
- exchange = ccxt.binance()
- since_dt = datetime.strptime(since, "%Y-%m-%d").replace(tzinfo=timezone.utc)
- candles = await fetch_historical_candles(
- exchange=exchange,
- symbol=symbol.replace("USDT", "/USDT"),
- timeframe=timeframe,
- since=since_dt,
- limit=limit,
- )
- await storage.store_batch(candles)
- await exchange.close()
- await broker.close()
- await db.close()
- click.echo(f"Downloaded {len(candles)} candles")
-
- asyncio.run(_run())
-
-
-@data.command("list")
-def list_data():
- """List currently collecting symbols"""
- click.echo("Collecting symbols:")
- click.echo(" (Check docker-compose service status)")
-```
-
-- [ ] **Step 5: Implement remaining CLI command stubs**
-
-Create `cli/src/trading_cli/commands/trade.py`:
-
-```python
-import click
-
-
-@click.group()
-def trade():
- """Trading bot commands"""
- pass
-
-
-@trade.command()
-@click.option("--strategy", required=True, help="Strategy name")
-@click.option("--symbol", required=True, help="Trading pair")
-def start(strategy: str, symbol: str):
- """Start a trading bot"""
- click.echo(f"Starting bot: strategy={strategy} symbol={symbol}")
-
-
-@trade.command()
-@click.option("--strategy", required=True, help="Strategy name")
-def stop(strategy: str):
- """Stop a trading bot"""
- click.echo(f"Stopping bot: strategy={strategy}")
-
-
-@trade.command()
-def status():
- """Show running bot status"""
- click.echo("Running bots:")
-
-
-@trade.command("stop-all")
-def stop_all():
- """Emergency stop: stop all bots and cancel all orders"""
- click.confirm("Are you sure you want to stop ALL bots?", abort=True)
- click.echo("Stopping all bots and cancelling open orders...")
-```
-
-Create `cli/src/trading_cli/commands/backtest.py`:
-
-```python
-import asyncio
-from decimal import Decimal
-
-import click
-
-
-@click.group()
-def backtest():
- """Backtesting commands"""
- pass
-
-
-@backtest.command("run")
-@click.option("--strategy", required=True, help="Strategy name")
-@click.option("--symbol", required=True, help="Trading pair")
-@click.option("--from", "since", required=True, help="Start date")
-@click.option("--to", "until", required=True, help="End date")
-@click.option("--balance", default=10000.0, help="Initial balance")
-def run_backtest(strategy: str, symbol: str, since: str, until: str, balance: float):
- """Run a backtest"""
- click.echo(f"Running backtest: {strategy} on {symbol} ({since} ~ {until})")
-
- async def _run():
- from pathlib import Path
- from shared.config import Settings
- from shared.db import Database
- from backtester.main import run_backtest as bt_run
-
- settings = Settings()
- db = Database(settings.database_url)
- await db.connect()
-
- strategies_dir = Path(__file__).parent.parent.parent.parent.parent / "services" / "strategy-engine" / "strategies"
- report = await bt_run(
- strategy_name=strategy,
- symbol=symbol,
- timeframe="1m",
- initial_balance=Decimal(str(balance)),
- db=db,
- strategies_dir=strategies_dir,
- )
- click.echo(report)
- await db.close()
-
- asyncio.run(_run())
-
-
-@backtest.command()
-@click.option("--id", "report_id", default="latest", help="Report ID")
-def report(report_id: str):
- """Show backtest report"""
- click.echo(f"Showing report: {report_id}")
-```
-
-Create `cli/src/trading_cli/commands/portfolio.py`:
-
-```python
-import click
-
-
-@click.group()
-def portfolio():
- """Portfolio commands"""
- pass
-
-
-@portfolio.command()
-def show():
- """Show current portfolio"""
- click.echo("Current Portfolio:")
- click.echo(" (Connect to portfolio-manager service)")
-
-
-@portfolio.command()
-@click.option("--days", default=30, help="Number of days")
-def history(days: int):
- """Show PnL history"""
- click.echo(f"PnL history (last {days} days):")
-```
-
-Create `cli/src/trading_cli/commands/strategy.py`:
-
-```python
-from pathlib import Path
-
-import click
-
-
-@click.group()
-def strategy():
- """Strategy management commands"""
- pass
-
-
-@strategy.command("list")
-def list_strategies():
- """List available strategies"""
- from strategy_engine.plugin_loader import load_strategies
-
- strategies_dir = Path(__file__).parent.parent.parent.parent.parent / "services" / "strategy-engine" / "strategies"
- strategies = load_strategies(strategies_dir)
- click.echo("Available strategies:")
- for s in strategies:
- click.echo(f" - {s.name}")
-
-
-@strategy.command()
-@click.option("--name", required=True, help="Strategy name")
-def info(name: str):
- """Show strategy details"""
- click.echo(f"Strategy: {name}")
-```
-
-Create `cli/src/trading_cli/commands/service.py`:
-
-```python
-import subprocess
-
-import click
-
-
-@click.group()
-def service():
- """Service management commands"""
- pass
-
-
-@service.command()
-def up():
- """Start all services"""
- click.echo("Starting all services...")
- subprocess.run(["docker", "compose", "up", "-d"], check=True)
-
-
-@service.command()
-def down():
- """Stop all services"""
- click.echo("Stopping all services...")
- subprocess.run(["docker", "compose", "down"], check=True)
-
-
-@service.command()
-@click.option("--name", required=True, help="Service name")
-def logs(name: str):
- """Show service logs"""
- subprocess.run(["docker", "compose", "logs", "-f", name])
-```
-
-- [ ] **Step 6: Run tests to verify they pass**
-
-```bash
-pytest cli/tests/test_cli_data.py -v
-```
-
-Expected: All PASS
-
-- [ ] **Step 7: Commit**
-
-```bash
-git add cli/
-git commit -m "feat(cli): add Click-based CLI with data, trade, backtest, portfolio, strategy, and service commands"
-```
-
----
-
-## Task 11: Integration Verification
-
-- [ ] **Step 1: Run all tests**
-
-```bash
-cd /home/si/Private/repos/trading
-pytest -v
-```
-
-Expected: All tests pass
-
-- [ ] **Step 2: Lint check**
-
-```bash
-ruff check .
-```
-
-Fix any issues found.
-
-- [ ] **Step 3: Verify Docker builds**
-
-```bash
-docker compose build
-```
-
-Expected: All services build successfully
-
-- [ ] **Step 4: Start infrastructure and verify**
-
-```bash
-make infra
-# Wait for healthy status
-docker compose ps
-```
-
-Expected: redis and postgres running and healthy
-
-- [ ] **Step 5: Final commit**
-
-```bash
-git add .
-git commit -m "chore: integration verification — all tests pass, docker builds succeed"
-```
diff --git a/docs/superpowers/plans/2026-04-01-operations-and-strategy-expansion.md b/docs/superpowers/plans/2026-04-01-operations-and-strategy-expansion.md
deleted file mode 100644
index 761a49a..0000000
--- a/docs/superpowers/plans/2026-04-01-operations-and-strategy-expansion.md
+++ /dev/null
@@ -1,4187 +0,0 @@
-# Operations Infrastructure & Strategy Expansion — Implementation Plan
-
-> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.
-
-**Goal:** Add production-grade operations infrastructure (SQLAlchemy ORM, Alembic migrations, structlog, Telegram alerts, resilience, Prometheus) and expand the strategy library (MACD, Bollinger, EMA Crossover, VWAP, Volume Profile) with enhanced backtesting metrics.
-
-**Architecture:** Operations-first approach. Migrate the DB layer to SQLAlchemy 2.0 async, add structured logging and Telegram notifications as shared infrastructure, then build resilience and metrics on top. Strategy expansion builds on the stabilized platform with new BaseStrategy.warmup_period contract and YAML config loading.
-
-**Tech Stack:** SQLAlchemy 2.0 async (asyncpg driver), Alembic, structlog, aiohttp (Telegram), prometheus-client, pyyaml, rich, pandas, numpy
-
----
-
-## File Structure
-
-### New Files
-
-| File | Responsibility |
-|------|---------------|
-| `shared/src/shared/sa_models.py` | SQLAlchemy ORM table definitions |
-| `shared/src/shared/logging.py` | structlog setup and Telegram error processor |
-| `shared/src/shared/notifier.py` | TelegramNotifier class |
-| `shared/src/shared/resilience.py` | retry_with_backoff decorator + CircuitBreaker |
-| `shared/src/shared/healthcheck.py` | aiohttp-based /health + /metrics server |
-| `shared/src/shared/metrics.py` | Prometheus metric definitions |
-| `shared/alembic.ini` | Alembic config |
-| `shared/alembic/env.py` | Alembic async environment |
-| `shared/alembic/script.py.mako` | Alembic migration template |
-| `shared/alembic/versions/` | Migration files (auto-generated) |
-| `shared/tests/test_sa_models.py` | SA model tests |
-| `shared/tests/test_logging.py` | structlog setup tests |
-| `shared/tests/test_notifier.py` | TelegramNotifier tests |
-| `shared/tests/test_resilience.py` | retry + circuit breaker tests |
-| `shared/tests/test_healthcheck.py` | Healthcheck server tests |
-| `shared/tests/test_metrics.py` | Prometheus metrics tests |
-| `services/strategy-engine/strategies/config/rsi_strategy.yaml` | RSI params |
-| `services/strategy-engine/strategies/config/grid_strategy.yaml` | Grid params |
-| `services/strategy-engine/strategies/config/macd_strategy.yaml` | MACD params |
-| `services/strategy-engine/strategies/config/bollinger_strategy.yaml` | Bollinger params |
-| `services/strategy-engine/strategies/config/ema_crossover_strategy.yaml` | EMA params |
-| `services/strategy-engine/strategies/config/vwap_strategy.yaml` | VWAP params |
-| `services/strategy-engine/strategies/config/volume_profile_strategy.yaml` | Volume Profile params |
-| `services/strategy-engine/strategies/macd_strategy.py` | MACD strategy |
-| `services/strategy-engine/strategies/bollinger_strategy.py` | Bollinger Bands strategy |
-| `services/strategy-engine/strategies/ema_crossover_strategy.py` | EMA Crossover strategy |
-| `services/strategy-engine/strategies/vwap_strategy.py` | VWAP strategy |
-| `services/strategy-engine/strategies/volume_profile_strategy.py` | Volume Profile strategy |
-| `services/strategy-engine/tests/test_macd_strategy.py` | MACD tests |
-| `services/strategy-engine/tests/test_bollinger_strategy.py` | Bollinger tests |
-| `services/strategy-engine/tests/test_ema_crossover_strategy.py` | EMA Crossover tests |
-| `services/strategy-engine/tests/test_vwap_strategy.py` | VWAP tests |
-| `services/strategy-engine/tests/test_volume_profile_strategy.py` | Volume Profile tests |
-| `services/backtester/src/backtester/metrics.py` | DetailedMetrics + TradeRecord |
-| `services/backtester/tests/test_metrics.py` | Detailed metrics tests |
-| `monitoring/prometheus.yml` | Prometheus scrape config |
-
-### Modified Files
-
-| File | Changes |
-|------|---------|
-| `shared/pyproject.toml` | Add sqlalchemy, alembic, structlog, prometheus-client, pyyaml |
-| `shared/src/shared/config.py` | Add Telegram, health, circuit breaker settings |
-| `shared/src/shared/db.py` | Rewrite to SQLAlchemy async session |
-| `shared/src/shared/__init__.py` | Export new modules |
-| `shared/tests/test_db.py` | Update for SQLAlchemy API |
-| `services/strategy-engine/strategies/base.py` | Add warmup_period abstract property |
-| `services/strategy-engine/strategies/rsi_strategy.py` | Add warmup_period, update for YAML config |
-| `services/strategy-engine/strategies/grid_strategy.py` | Add warmup_period, update for YAML config |
-| `services/strategy-engine/src/strategy_engine/plugin_loader.py` | Add YAML config loading |
-| `services/strategy-engine/src/strategy_engine/main.py` | Use YAML config loader |
-| `services/data-collector/src/data_collector/storage.py` | Use AsyncSession |
-| `services/data-collector/src/data_collector/main.py` | Use structlog, healthcheck, resilience |
-| `services/order-executor/src/order_executor/executor.py` | Use AsyncSession, notifier |
-| `services/order-executor/src/order_executor/main.py` | Use structlog, healthcheck, resilience |
-| `services/portfolio-manager/src/portfolio_manager/main.py` | Use structlog, healthcheck, daily summary |
-| `services/backtester/src/backtester/engine.py` | Compute DetailedMetrics |
-| `services/backtester/src/backtester/simulator.py` | Track entry/exit for TradeRecord |
-| `services/backtester/src/backtester/reporter.py` | Rich table output, CSV/JSON export |
-| `docker-compose.yml` | Add healthcheck endpoints, monitoring profile |
-| `Makefile` | Add migrate, migrate-down, migrate-new targets |
-| `.env.example` | Add Telegram, health, log format vars |
-
----
-
-## Task 1: SQLAlchemy ORM Models + Alembic Setup
-
-**Files:**
-- Create: `shared/src/shared/sa_models.py`
-- Create: `shared/alembic.ini`
-- Create: `shared/alembic/env.py`
-- Create: `shared/alembic/script.py.mako`
-- Modify: `shared/pyproject.toml`
-- Test: `shared/tests/test_sa_models.py`
-
-- [ ] **Step 1: Add dependencies to shared/pyproject.toml**
-
-```toml
-[project]
-name = "trading-shared"
-version = "0.1.0"
-description = "Shared models, events, and utilities for trading platform"
-requires-python = ">=3.12"
-dependencies = [
- "pydantic>=2.0",
- "pydantic-settings>=2.0",
- "redis>=5.0",
- "sqlalchemy[asyncio]>=2.0",
- "asyncpg>=0.29",
- "alembic>=1.13",
- "structlog>=24.0",
- "prometheus-client>=0.20",
- "pyyaml>=6.0",
- "aiohttp>=3.9",
- "rich>=13.0",
-]
-```
-
-- [ ] **Step 2: Write the failing test for SA models**
-
-Create `shared/tests/test_sa_models.py`:
-
-```python
-"""Tests for SQLAlchemy ORM models."""
-from datetime import datetime, timezone
-from decimal import Decimal
-
-from shared.sa_models import (
- Base,
- CandleRow,
- SignalRow,
- OrderRow,
- TradeRow,
- PositionRow,
- PortfolioSnapshotRow,
-)
-
-
-def test_candle_row_table_name():
- assert CandleRow.__tablename__ == "candles"
-
-
-def test_candle_row_columns():
- cols = {c.name for c in CandleRow.__table__.columns}
- assert cols == {"symbol", "timeframe", "open_time", "open", "high", "low", "close", "volume"}
-
-
-def test_signal_row_table_name():
- assert SignalRow.__tablename__ == "signals"
-
-
-def test_signal_row_columns():
- cols = {c.name for c in SignalRow.__table__.columns}
- assert cols == {"id", "strategy", "symbol", "side", "price", "quantity", "reason", "created_at"}
-
-
-def test_order_row_table_name():
- assert OrderRow.__tablename__ == "orders"
-
-
-def test_order_row_columns():
- cols = {c.name for c in OrderRow.__table__.columns}
- assert cols == {
- "id", "signal_id", "symbol", "side", "type", "price",
- "quantity", "status", "created_at", "filled_at",
- }
-
-
-def test_trade_row_table_name():
- assert TradeRow.__tablename__ == "trades"
-
-
-def test_position_row_table_name():
- assert PositionRow.__tablename__ == "positions"
-
-
-def test_portfolio_snapshot_row_table_name():
- assert PortfolioSnapshotRow.__tablename__ == "portfolio_snapshots"
-
-
-def test_base_metadata_has_all_tables():
- table_names = set(Base.metadata.tables.keys())
- assert table_names == {
- "candles", "signals", "orders", "trades", "positions", "portfolio_snapshots",
- }
-```
-
-- [ ] **Step 3: Run test to verify it fails**
-
-Run: `pytest shared/tests/test_sa_models.py -v`
-Expected: FAIL with `ModuleNotFoundError: No module named 'shared.sa_models'`
-
-- [ ] **Step 4: Implement SA models**
-
-Create `shared/src/shared/sa_models.py`:
-
-```python
-"""SQLAlchemy ORM models for the trading platform."""
-from datetime import datetime
-
-from sqlalchemy import (
- DateTime,
- ForeignKey,
- Integer,
- Numeric,
- String,
- Text,
-)
-from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
-
-
-class Base(DeclarativeBase):
- pass
-
-
-class CandleRow(Base):
- __tablename__ = "candles"
-
- symbol: Mapped[str] = mapped_column(String, primary_key=True)
- timeframe: Mapped[str] = mapped_column(String, primary_key=True)
- open_time: Mapped[datetime] = mapped_column(DateTime(timezone=True), primary_key=True)
- open: Mapped[float] = mapped_column(Numeric, nullable=False)
- high: Mapped[float] = mapped_column(Numeric, nullable=False)
- low: Mapped[float] = mapped_column(Numeric, nullable=False)
- close: Mapped[float] = mapped_column(Numeric, nullable=False)
- volume: Mapped[float] = mapped_column(Numeric, nullable=False)
-
-
-class SignalRow(Base):
- __tablename__ = "signals"
-
- id: Mapped[str] = mapped_column(String, primary_key=True)
- strategy: Mapped[str] = mapped_column(String, nullable=False)
- symbol: Mapped[str] = mapped_column(String, nullable=False)
- side: Mapped[str] = mapped_column(String, nullable=False)
- price: Mapped[float] = mapped_column(Numeric, nullable=False)
- quantity: Mapped[float] = mapped_column(Numeric, nullable=False)
- reason: Mapped[str | None] = mapped_column(Text)
- created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False)
-
-
-class OrderRow(Base):
- __tablename__ = "orders"
-
- id: Mapped[str] = mapped_column(String, primary_key=True)
- signal_id: Mapped[str | None] = mapped_column(String, ForeignKey("signals.id"))
- symbol: Mapped[str] = mapped_column(String, nullable=False)
- side: Mapped[str] = mapped_column(String, nullable=False)
- type: Mapped[str] = mapped_column(String, nullable=False)
- price: Mapped[float] = mapped_column(Numeric, nullable=False)
- quantity: Mapped[float] = mapped_column(Numeric, nullable=False)
- status: Mapped[str] = mapped_column(String, nullable=False, default="PENDING")
- created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False)
- filled_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
-
-
-class TradeRow(Base):
- __tablename__ = "trades"
-
- id: Mapped[str] = mapped_column(String, primary_key=True)
- order_id: Mapped[str | None] = mapped_column(String, ForeignKey("orders.id"))
- symbol: Mapped[str] = mapped_column(String, nullable=False)
- side: Mapped[str] = mapped_column(String, nullable=False)
- price: Mapped[float] = mapped_column(Numeric, nullable=False)
- quantity: Mapped[float] = mapped_column(Numeric, nullable=False)
- fee: Mapped[float] = mapped_column(Numeric, nullable=False, default=0)
- traded_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False)
-
-
-class PositionRow(Base):
- __tablename__ = "positions"
-
- symbol: Mapped[str] = mapped_column(String, primary_key=True)
- quantity: Mapped[float] = mapped_column(Numeric, nullable=False)
- avg_entry_price: Mapped[float] = mapped_column(Numeric, nullable=False)
- current_price: Mapped[float] = mapped_column(Numeric, nullable=False)
- updated_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False)
-
-
-class PortfolioSnapshotRow(Base):
- __tablename__ = "portfolio_snapshots"
-
- id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
- total_value: Mapped[float] = mapped_column(Numeric, nullable=False)
- realized_pnl: Mapped[float] = mapped_column(Numeric, nullable=False)
- unrealized_pnl: Mapped[float] = mapped_column(Numeric, nullable=False)
- snapshot_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False)
-```
-
-- [ ] **Step 5: Run test to verify it passes**
-
-Run: `pytest shared/tests/test_sa_models.py -v`
-Expected: All 8 tests PASS
-
-- [ ] **Step 6: Set up Alembic**
-
-Create `shared/alembic.ini`:
-
-```ini
-[alembic]
-script_location = alembic
-sqlalchemy.url = postgresql+asyncpg://trading:trading@localhost:5432/trading
-
-[loggers]
-keys = root,sqlalchemy,alembic
-
-[handlers]
-keys = console
-
-[formatters]
-keys = generic
-
-[logger_root]
-level = WARN
-handlers = console
-
-[logger_sqlalchemy]
-level = WARN
-handlers =
-qualname = sqlalchemy.engine
-
-[logger_alembic]
-level = INFO
-handlers =
-qualname = alembic
-
-[handler_console]
-class = StreamHandler
-args = (sys.stderr,)
-level = NOTSET
-formatter = generic
-
-[formatter_generic]
-format = %(levelname)-5.5s [%(name)s] %(message)s
-datefmt = %H:%M:%S
-```
-
-Create `shared/alembic/script.py.mako`:
-
-```mako
-"""${message}
-
-Revision ID: ${up_revision}
-Revises: ${down_revision | comma,n}
-Create Date: ${create_date}
-"""
-from typing import Sequence, Union
-
-from alembic import op
-import sqlalchemy as sa
-${imports if imports else ""}
-
-# revision identifiers, used by Alembic.
-revision: str = ${repr(up_revision)}
-down_revision: Union[str, None] = ${repr(down_revision)}
-branch_labels: Union[str, Sequence[str], None] = ${repr(branch_labels)}
-depends_on: Union[str, Sequence[str], None] = ${repr(depends_on)}
-
-
-def upgrade() -> None:
- ${upgrades if upgrades else "pass"}
-
-
-def downgrade() -> None:
- ${downgrades if downgrades else "pass"}
-```
-
-Create `shared/alembic/env.py`:
-
-```python
-"""Alembic environment configuration for async SQLAlchemy."""
-import asyncio
-import os
-from logging.config import fileConfig
-
-from alembic import context
-from sqlalchemy import pool
-from sqlalchemy.ext.asyncio import async_engine_from_config
-
-from shared.sa_models import Base
-
-config = context.config
-
-if config.config_file_name is not None:
- fileConfig(config.config_file_name)
-
-target_metadata = Base.metadata
-
-# Override URL from environment if available
-database_url = os.environ.get("DATABASE_URL")
-if database_url:
- # Ensure async driver prefix
- if database_url.startswith("postgresql://"):
- database_url = database_url.replace("postgresql://", "postgresql+asyncpg://", 1)
- config.set_main_option("sqlalchemy.url", database_url)
-
-
-def run_migrations_offline() -> None:
- url = config.get_main_option("sqlalchemy.url")
- context.configure(url=url, target_metadata=target_metadata, literal_binds=True)
- with context.begin_transaction():
- context.run_migrations()
-
-
-def do_run_migrations(connection):
- context.configure(connection=connection, target_metadata=target_metadata)
- with context.begin_transaction():
- context.run_migrations()
-
-
-async def run_async_migrations() -> None:
- connectable = async_engine_from_config(
- config.get_section(config.config_ini_section, {}),
- prefix="sqlalchemy.",
- poolclass=pool.NullPool,
- )
- async with connectable.connect() as connection:
- await connection.run_sync(do_run_migrations)
- await connectable.dispose()
-
-
-def run_migrations_online() -> None:
- asyncio.run(run_async_migrations())
-
-
-if context.is_offline_mode():
- run_migrations_offline()
-else:
- run_migrations_online()
-```
-
-Create empty `shared/alembic/versions/` directory (with `.gitkeep`).
-
-- [ ] **Step 7: Add Makefile targets**
-
-Append to `Makefile`:
-
-```makefile
-migrate:
- cd shared && alembic upgrade head
-
-migrate-down:
- cd shared && alembic downgrade -1
-
-migrate-new:
- cd shared && alembic revision --autogenerate -m "$(MSG)"
-```
-
-- [ ] **Step 8: Commit**
-
-```bash
-git add shared/src/shared/sa_models.py shared/alembic.ini shared/alembic/ \
- shared/tests/test_sa_models.py shared/pyproject.toml Makefile
-git commit -m "feat(shared): add SQLAlchemy ORM models and Alembic setup"
-```
-
----
-
-## Task 2: Rewrite Database Layer to SQLAlchemy Async
-
-**Files:**
-- Modify: `shared/src/shared/db.py`
-- Modify: `shared/tests/test_db.py`
-
-- [ ] **Step 1: Write the failing test for the new DB layer**
-
-Replace `shared/tests/test_db.py`:
-
-```python
-"""Tests for the SQLAlchemy async database layer."""
-from datetime import datetime, timezone
-from decimal import Decimal
-from unittest.mock import AsyncMock, MagicMock, patch
-
-import pytest
-
-from shared.db import Database
-from shared.models import Candle, Signal, OrderSide, Order, OrderType, OrderStatus
-
-
-@pytest.fixture
-def db():
- return Database("postgresql+asyncpg://trading:trading@localhost:5432/trading")
-
-
-def test_database_stores_url(db):
- assert db._database_url == "postgresql+asyncpg://trading:trading@localhost:5432/trading"
-
-
-@pytest.mark.asyncio
-async def test_get_session_returns_async_session(db):
- """Verify get_session is an async context manager (structural test)."""
- # We can't connect without a real DB, but we verify the method exists
- assert hasattr(db, "get_session")
- assert callable(db.get_session)
-
-
-@pytest.mark.asyncio
-async def test_insert_candle_creates_candle_row():
- """Verify insert_candle adds a CandleRow to the session."""
- db = Database("postgresql+asyncpg://test:test@localhost/test")
-
- candle = Candle(
- symbol="BTCUSDT",
- timeframe="1m",
- open_time=datetime(2025, 1, 1, tzinfo=timezone.utc),
- open=Decimal("50000"),
- high=Decimal("51000"),
- low=Decimal("49000"),
- close=Decimal("50500"),
- volume=Decimal("100"),
- )
-
- mock_session = AsyncMock()
- mock_session.__aenter__ = AsyncMock(return_value=mock_session)
- mock_session.__aexit__ = AsyncMock(return_value=False)
-
- with patch.object(db, "get_session", return_value=mock_session):
- await db.insert_candle(candle)
-
- mock_session.merge.assert_called_once()
- mock_session.commit.assert_called_once()
-
-
-@pytest.mark.asyncio
-async def test_insert_signal_creates_signal_row():
- db = Database("postgresql+asyncpg://test:test@localhost/test")
-
- signal = Signal(
- strategy="rsi",
- symbol="BTCUSDT",
- side=OrderSide.BUY,
- price=Decimal("50000"),
- quantity=Decimal("0.01"),
- reason="test signal",
- )
-
- mock_session = AsyncMock()
- mock_session.__aenter__ = AsyncMock(return_value=mock_session)
- mock_session.__aexit__ = AsyncMock(return_value=False)
-
- with patch.object(db, "get_session", return_value=mock_session):
- await db.insert_signal(signal)
-
- mock_session.add.assert_called_once()
- mock_session.commit.assert_called_once()
-
-
-@pytest.mark.asyncio
-async def test_insert_order_creates_order_row():
- db = Database("postgresql+asyncpg://test:test@localhost/test")
-
- order = Order(
- signal_id="sig-1",
- symbol="BTCUSDT",
- side=OrderSide.BUY,
- type=OrderType.MARKET,
- price=Decimal("50000"),
- quantity=Decimal("0.01"),
- )
-
- mock_session = AsyncMock()
- mock_session.__aenter__ = AsyncMock(return_value=mock_session)
- mock_session.__aexit__ = AsyncMock(return_value=False)
-
- with patch.object(db, "get_session", return_value=mock_session):
- await db.insert_order(order)
-
- mock_session.add.assert_called_once()
- mock_session.commit.assert_called_once()
-```
-
-- [ ] **Step 2: Run test to verify it fails**
-
-Run: `pytest shared/tests/test_db.py -v`
-Expected: FAIL — old Database class doesn't have `get_session`
-
-- [ ] **Step 3: Rewrite db.py with SQLAlchemy async**
-
-Replace `shared/src/shared/db.py`:
-
-```python
-"""Database layer using SQLAlchemy async for the trading platform."""
-from datetime import datetime, timezone
-from decimal import Decimal
-from typing import Optional
-
-from sqlalchemy import select, update
-from sqlalchemy.ext.asyncio import (
- AsyncSession,
- async_sessionmaker,
- create_async_engine,
-)
-
-from shared.models import Candle, Order, OrderStatus, Signal
-from shared.sa_models import (
- Base,
- CandleRow,
- OrderRow,
- SignalRow,
-)
-
-
-class Database:
- """Async database access layer backed by SQLAlchemy."""
-
- def __init__(self, database_url: str) -> None:
- self._database_url = database_url
- # Ensure async driver prefix
- if self._database_url.startswith("postgresql://"):
- self._database_url = self._database_url.replace(
- "postgresql://", "postgresql+asyncpg://", 1
- )
- self._engine = create_async_engine(self._database_url)
- self._session_factory = async_sessionmaker(self._engine, expire_on_commit=False)
-
- def get_session(self) -> AsyncSession:
- """Return a new AsyncSession."""
- return self._session_factory()
-
- async def connect(self) -> None:
- """Create all tables (for dev/test — prefer Alembic in production)."""
- async with self._engine.begin() as conn:
- await conn.run_sync(Base.metadata.create_all)
-
- async def close(self) -> None:
- """Dispose of the engine."""
- await self._engine.dispose()
-
- # Alias for backward compatibility
- async def init_tables(self) -> None:
- await self.connect()
-
- async def insert_candle(self, candle: Candle) -> None:
- """Upsert a candle row using merge."""
- async with self.get_session() as session:
- row = CandleRow(
- symbol=candle.symbol,
- timeframe=candle.timeframe,
- open_time=candle.open_time,
- open=candle.open,
- high=candle.high,
- low=candle.low,
- close=candle.close,
- volume=candle.volume,
- )
- await session.merge(row)
- await session.commit()
-
- async def insert_signal(self, signal: Signal) -> None:
- """Insert a signal row."""
- async with self.get_session() as session:
- row = SignalRow(
- id=signal.id,
- strategy=signal.strategy,
- symbol=signal.symbol,
- side=signal.side.value,
- price=signal.price,
- quantity=signal.quantity,
- reason=signal.reason,
- created_at=signal.created_at,
- )
- session.add(row)
- await session.commit()
-
- async def insert_order(self, order: Order) -> None:
- """Insert an order row."""
- async with self.get_session() as session:
- row = OrderRow(
- id=order.id,
- signal_id=order.signal_id,
- symbol=order.symbol,
- side=order.side.value,
- type=order.type.value,
- price=order.price,
- quantity=order.quantity,
- status=order.status.value,
- created_at=order.created_at,
- filled_at=order.filled_at,
- )
- session.add(row)
- await session.commit()
-
- async def update_order_status(
- self,
- order_id: str,
- status: OrderStatus,
- filled_at: Optional[datetime] = None,
- ) -> None:
- """Update the status of an order."""
- async with self.get_session() as session:
- stmt = (
- update(OrderRow)
- .where(OrderRow.id == order_id)
- .values(status=status.value, filled_at=filled_at)
- )
- await session.execute(stmt)
- await session.commit()
-
- async def get_candles(
- self, symbol: str, timeframe: str, limit: int = 500
- ) -> list[dict]:
- """Retrieve candles ordered by open_time descending."""
- async with self.get_session() as session:
- stmt = (
- select(CandleRow)
- .where(CandleRow.symbol == symbol, CandleRow.timeframe == timeframe)
- .order_by(CandleRow.open_time.desc())
- .limit(limit)
- )
- result = await session.execute(stmt)
- rows = result.scalars().all()
- return [
- {
- "symbol": r.symbol,
- "timeframe": r.timeframe,
- "open_time": r.open_time,
- "open": r.open,
- "high": r.high,
- "low": r.low,
- "close": r.close,
- "volume": r.volume,
- }
- for r in rows
- ]
-```
-
-- [ ] **Step 4: Run test to verify it passes**
-
-Run: `pytest shared/tests/test_db.py -v`
-Expected: All tests PASS
-
-- [ ] **Step 5: Commit**
-
-```bash
-git add shared/src/shared/db.py shared/tests/test_db.py
-git commit -m "refactor(shared): rewrite db layer to SQLAlchemy 2.0 async"
-```
-
----
-
-## Task 3: Structured Logging with structlog
-
-**Files:**
-- Create: `shared/src/shared/logging.py`
-- Test: `shared/tests/test_logging.py`
-
-- [ ] **Step 1: Write the failing test**
-
-Create `shared/tests/test_logging.py`:
-
-```python
-"""Tests for structured logging setup."""
-import logging
-
-import structlog
-
-from shared.logging import setup_logging
-
-
-def test_setup_logging_returns_logger():
- logger = setup_logging("test-service", "INFO")
- assert logger is not None
-
-
-def test_setup_logging_binds_service_name():
- logger = setup_logging("data-collector", "INFO")
- # structlog loggers have _context with bound values
- assert logger._context.get("service") == "data-collector"
-
-
-def test_setup_logging_sets_log_level():
- setup_logging("test-service", "DEBUG")
- root = logging.getLogger()
- assert root.level == logging.DEBUG
-
-
-def test_setup_logging_json_format(capsys):
- logger = setup_logging("test-service", "INFO", log_format="json")
- logger.info("test_event", key="value")
- captured = capsys.readouterr()
- assert "test_event" in captured.out or "test_event" in captured.err
-
-
-def test_setup_logging_console_format(capsys):
- logger = setup_logging("test-service", "INFO", log_format="console")
- logger.info("test_event", key="value")
- captured = capsys.readouterr()
- assert "test_event" in captured.out or "test_event" in captured.err
-```
-
-- [ ] **Step 2: Run test to verify it fails**
-
-Run: `pytest shared/tests/test_logging.py -v`
-Expected: FAIL with `ModuleNotFoundError: No module named 'shared.logging'`
-
-- [ ] **Step 3: Implement structured logging**
-
-Create `shared/src/shared/logging.py`:
-
-```python
-"""Structured logging setup using structlog."""
-import logging
-import sys
-
-import structlog
-
-
-def setup_logging(
- service_name: str,
- log_level: str = "INFO",
- log_format: str = "json",
-) -> structlog.stdlib.BoundLogger:
- """Configure structlog for the given service.
-
- Args:
- service_name: Bound to every log entry as 'service'.
- log_level: Python log level string (DEBUG, INFO, WARNING, ERROR).
- log_format: 'json' for production, 'console' for development.
-
- Returns:
- A bound structlog logger with service context.
- """
- # Set stdlib root logger level
- logging.basicConfig(
- format="%(message)s",
- stream=sys.stdout,
- level=getattr(logging, log_level.upper(), logging.INFO),
- force=True,
- )
-
- shared_processors: list[structlog.types.Processor] = [
- structlog.contextvars.merge_contextvars,
- structlog.stdlib.add_log_level,
- structlog.stdlib.add_logger_name,
- structlog.processors.TimeStamper(fmt="iso"),
- structlog.processors.StackInfoRenderer(),
- structlog.processors.UnicodeDecoder(),
- ]
-
- if log_format == "console":
- renderer = structlog.dev.ConsoleRenderer()
- else:
- renderer = structlog.processors.JSONRenderer()
-
- structlog.configure(
- processors=[
- *shared_processors,
- structlog.stdlib.ProcessorFormatter.wrap_for_formatter,
- ],
- logger_factory=structlog.stdlib.LoggerFactory(),
- wrapper_class=structlog.stdlib.BoundLogger,
- cache_logger_on_first_use=True,
- )
-
- # Also configure the formatter for stdlib loggers
- formatter = structlog.stdlib.ProcessorFormatter(
- processors=[
- structlog.stdlib.ProcessorFormatter.remove_processors_meta,
- renderer,
- ],
- )
-
- root = logging.getLogger()
- for handler in root.handlers:
- handler.setFormatter(formatter)
-
- return structlog.get_logger().bind(service=service_name)
-```
-
-- [ ] **Step 4: Run test to verify it passes**
-
-Run: `pytest shared/tests/test_logging.py -v`
-Expected: All 5 tests PASS
-
-- [ ] **Step 5: Commit**
-
-```bash
-git add shared/src/shared/logging.py shared/tests/test_logging.py
-git commit -m "feat(shared): add structlog-based structured logging"
-```
-
----
-
-## Task 4: Telegram Notification Service
-
-**Files:**
-- Create: `shared/src/shared/notifier.py`
-- Modify: `shared/src/shared/config.py`
-- Modify: `.env.example`
-- Test: `shared/tests/test_notifier.py`
-
-- [ ] **Step 1: Update config.py with Telegram settings**
-
-Add to `shared/src/shared/config.py` after `dry_run`:
-
-```python
- # Telegram
- telegram_bot_token: str = ""
- telegram_chat_id: str = ""
- telegram_enabled: bool = False
- # Logging
- log_format: str = "json"
- # Health
- health_port: int = 8080
- # Circuit Breaker
- circuit_breaker_threshold: int = 5
- circuit_breaker_timeout: int = 60
-```
-
-- [ ] **Step 2: Update .env.example**
-
-Replace `.env.example`:
-
-```env
-# Exchange
-BINANCE_API_KEY=
-BINANCE_API_SECRET=
-
-# Infrastructure
-REDIS_URL=redis://localhost:6379
-DATABASE_URL=postgresql+asyncpg://trading:trading@localhost:5432/trading
-
-# Logging
-LOG_LEVEL=INFO
-LOG_FORMAT=json
-
-# Telegram
-TELEGRAM_BOT_TOKEN=
-TELEGRAM_CHAT_ID=
-TELEGRAM_ENABLED=false
-
-# Risk Management
-RISK_MAX_POSITION_SIZE=0.1
-RISK_STOP_LOSS_PCT=5
-RISK_DAILY_LOSS_LIMIT_PCT=10
-DRY_RUN=true
-
-# Health & Metrics
-HEALTH_PORT=8080
-CIRCUIT_BREAKER_THRESHOLD=5
-CIRCUIT_BREAKER_TIMEOUT=60
-```
-
-- [ ] **Step 3: Write the failing test for TelegramNotifier**
-
-Create `shared/tests/test_notifier.py`:
-
-```python
-"""Tests for Telegram notification service."""
-from decimal import Decimal
-from unittest.mock import AsyncMock, patch, MagicMock
-
-import pytest
-
-from shared.models import Signal, OrderSide, Order, OrderType, OrderStatus
-from shared.notifier import TelegramNotifier
-
-
-@pytest.fixture
-def notifier():
- return TelegramNotifier(bot_token="test-token", chat_id="12345")
-
-
-def test_notifier_disabled_when_no_token():
- n = TelegramNotifier(bot_token="", chat_id="12345")
- assert n.enabled is False
-
-
-def test_notifier_enabled_with_token():
- n = TelegramNotifier(bot_token="abc", chat_id="12345")
- assert n.enabled is True
-
-
-@pytest.mark.asyncio
-async def test_send_does_nothing_when_disabled():
- n = TelegramNotifier(bot_token="", chat_id="12345")
- # Should not raise
- await n.send("test message")
-
-
-@pytest.mark.asyncio
-async def test_send_posts_to_telegram_api(notifier):
- mock_response = AsyncMock()
- mock_response.status = 200
- mock_response.__aenter__ = AsyncMock(return_value=mock_response)
- mock_response.__aexit__ = AsyncMock(return_value=False)
-
- mock_session = AsyncMock()
- mock_session.post = MagicMock(return_value=mock_response)
-
- notifier._session = mock_session
-
- await notifier.send("Hello")
-
- mock_session.post.assert_called_once()
- call_kwargs = mock_session.post.call_args
- assert "12345" in str(call_kwargs) or "Hello" in str(call_kwargs)
-
-
-@pytest.mark.asyncio
-async def test_send_signal_formats_message(notifier):
- signal = Signal(
- strategy="rsi",
- symbol="BTCUSDT",
- side=OrderSide.BUY,
- price=Decimal("50000"),
- quantity=Decimal("0.01"),
- reason="RSI oversold",
- )
-
- with patch.object(notifier, "send", new_callable=AsyncMock) as mock_send:
- await notifier.send_signal(signal)
- mock_send.assert_called_once()
- msg = mock_send.call_args[0][0]
- assert "BUY" in msg
- assert "BTCUSDT" in msg
- assert "rsi" in msg
-
-
-@pytest.mark.asyncio
-async def test_send_order_formats_message(notifier):
- order = Order(
- signal_id="sig-1",
- symbol="BTCUSDT",
- side=OrderSide.BUY,
- type=OrderType.MARKET,
- price=Decimal("50000"),
- quantity=Decimal("0.01"),
- status=OrderStatus.FILLED,
- )
-
- with patch.object(notifier, "send", new_callable=AsyncMock) as mock_send:
- await notifier.send_order(order)
- mock_send.assert_called_once()
- msg = mock_send.call_args[0][0]
- assert "FILLED" in msg
- assert "BTCUSDT" in msg
-
-
-@pytest.mark.asyncio
-async def test_send_error_formats_message(notifier):
- with patch.object(notifier, "send", new_callable=AsyncMock) as mock_send:
- await notifier.send_error("Connection lost", "data-collector")
- mock_send.assert_called_once()
- msg = mock_send.call_args[0][0]
- assert "Connection lost" in msg
- assert "data-collector" in msg
-```
-
-- [ ] **Step 4: Run test to verify it fails**
-
-Run: `pytest shared/tests/test_notifier.py -v`
-Expected: FAIL with `ModuleNotFoundError: No module named 'shared.notifier'`
-
-- [ ] **Step 5: Implement TelegramNotifier**
-
-Create `shared/src/shared/notifier.py`:
-
-```python
-"""Telegram notification service for trading alerts."""
-import asyncio
-import logging
-from decimal import Decimal
-
-import aiohttp
-
-from shared.models import Order, Signal
-
-logger = logging.getLogger(__name__)
-
-TELEGRAM_API = "https://api.telegram.org/bot{token}/sendMessage"
-
-
-class TelegramNotifier:
- """Sends notifications via Telegram Bot API."""
-
- def __init__(self, bot_token: str, chat_id: str) -> None:
- self._bot_token = bot_token
- self._chat_id = chat_id
- self._session: aiohttp.ClientSession | None = None
- self._semaphore = asyncio.Semaphore(1) # Rate limit: 1 msg at a time
-
- @property
- def enabled(self) -> bool:
- return bool(self._bot_token)
-
- async def _ensure_session(self) -> aiohttp.ClientSession:
- if self._session is None or self._session.closed:
- self._session = aiohttp.ClientSession()
- return self._session
-
- async def send(self, message: str, parse_mode: str = "HTML") -> None:
- """Send a message to the configured Telegram chat."""
- if not self.enabled:
- return
-
- async with self._semaphore:
- url = TELEGRAM_API.format(token=self._bot_token)
- payload = {
- "chat_id": self._chat_id,
- "text": message,
- "parse_mode": parse_mode,
- }
-
- retries = 3
- for attempt in range(retries):
- try:
- session = await self._ensure_session()
- async with session.post(url, json=payload) as resp:
- if resp.status == 200:
- return
- logger.warning(
- "Telegram API returned %d on attempt %d",
- resp.status,
- attempt + 1,
- )
- except Exception as exc:
- logger.warning(
- "Telegram send failed attempt %d: %s", attempt + 1, exc
- )
- if attempt < retries - 1:
- await asyncio.sleep(1)
-
- logger.error("Failed to send Telegram message after %d attempts", retries)
-
- async def send_signal(self, signal: Signal) -> None:
- """Format and send a trading signal notification."""
- msg = (
- f"<b>Signal: {signal.side.value}</b>\n"
- f"Strategy: {signal.strategy}\n"
- f"Symbol: {signal.symbol}\n"
- f"Price: {signal.price}\n"
- f"Quantity: {signal.quantity}\n"
- f"Reason: {signal.reason}"
- )
- await self.send(msg)
-
- async def send_order(self, order: Order) -> None:
- """Format and send an order execution notification."""
- msg = (
- f"<b>Order: {order.status.value}</b>\n"
- f"Symbol: {order.symbol}\n"
- f"Side: {order.side.value}\n"
- f"Price: {order.price}\n"
- f"Quantity: {order.quantity}"
- )
- await self.send(msg)
-
- async def send_error(self, error: str, service: str) -> None:
- """Send an error alert."""
- msg = f"<b>Error in {service}</b>\n{error}"
- await self.send(msg)
-
- async def send_daily_summary(
- self, positions: list, total_value: Decimal, daily_pnl: Decimal
- ) -> None:
- """Send daily portfolio summary."""
- lines = [f"<b>Daily Summary</b>"]
- lines.append(f"Total Value: {total_value:.2f}")
- lines.append(f"Daily PnL: {daily_pnl:.2f}")
- lines.append(f"Open Positions: {len(positions)}")
- for pos in positions:
- lines.append(f" {pos.symbol}: {pos.quantity} @ {pos.avg_entry_price}")
- await self.send("\n".join(lines))
-
- async def close(self) -> None:
- """Close the HTTP session."""
- if self._session and not self._session.closed:
- await self._session.close()
-```
-
-- [ ] **Step 6: Run test to verify it passes**
-
-Run: `pytest shared/tests/test_notifier.py -v`
-Expected: All 7 tests PASS
-
-- [ ] **Step 7: Commit**
-
-```bash
-git add shared/src/shared/notifier.py shared/src/shared/config.py \
- shared/tests/test_notifier.py .env.example
-git commit -m "feat(shared): add Telegram notification service"
-```
-
----
-
-## Task 5: Error Recovery — Retry + Circuit Breaker
-
-**Files:**
-- Create: `shared/src/shared/resilience.py`
-- Test: `shared/tests/test_resilience.py`
-
-- [ ] **Step 1: Write the failing test**
-
-Create `shared/tests/test_resilience.py`:
-
-```python
-"""Tests for retry and circuit breaker."""
-import asyncio
-
-import pytest
-
-from shared.resilience import retry_with_backoff, CircuitBreaker, CircuitState
-
-
-# --- retry_with_backoff tests ---
-
-@pytest.mark.asyncio
-async def test_retry_succeeds_on_first_attempt():
- call_count = 0
-
- @retry_with_backoff(max_retries=3, base_delay=0.01)
- async def succeed():
- nonlocal call_count
- call_count += 1
- return "ok"
-
- result = await succeed()
- assert result == "ok"
- assert call_count == 1
-
-
-@pytest.mark.asyncio
-async def test_retry_succeeds_after_failures():
- call_count = 0
-
- @retry_with_backoff(max_retries=3, base_delay=0.01)
- async def fail_then_succeed():
- nonlocal call_count
- call_count += 1
- if call_count < 3:
- raise ConnectionError("down")
- return "recovered"
-
- result = await fail_then_succeed()
- assert result == "recovered"
- assert call_count == 3
-
-
-@pytest.mark.asyncio
-async def test_retry_raises_after_max_retries():
- @retry_with_backoff(max_retries=2, base_delay=0.01)
- async def always_fail():
- raise ConnectionError("always down")
-
- with pytest.raises(ConnectionError, match="always down"):
- await always_fail()
-
-
-# --- CircuitBreaker tests ---
-
-@pytest.mark.asyncio
-async def test_circuit_breaker_starts_closed():
- cb = CircuitBreaker(failure_threshold=3, recovery_timeout=0.1)
- assert cb.state == CircuitState.CLOSED
-
-
-@pytest.mark.asyncio
-async def test_circuit_breaker_opens_after_threshold():
- cb = CircuitBreaker(failure_threshold=2, recovery_timeout=0.1)
- cb.record_failure()
- assert cb.state == CircuitState.CLOSED
- cb.record_failure()
- assert cb.state == CircuitState.OPEN
-
-
-@pytest.mark.asyncio
-async def test_circuit_breaker_rejects_when_open():
- cb = CircuitBreaker(failure_threshold=1, recovery_timeout=60)
- cb.record_failure()
- assert cb.state == CircuitState.OPEN
- assert cb.allow_request() is False
-
-
-@pytest.mark.asyncio
-async def test_circuit_breaker_half_open_after_timeout():
- cb = CircuitBreaker(failure_threshold=1, recovery_timeout=0.05)
- cb.record_failure()
- assert cb.state == CircuitState.OPEN
- await asyncio.sleep(0.06)
- assert cb.allow_request() is True
- assert cb.state == CircuitState.HALF_OPEN
-
-
-@pytest.mark.asyncio
-async def test_circuit_breaker_closes_on_success():
- cb = CircuitBreaker(failure_threshold=1, recovery_timeout=0.05)
- cb.record_failure()
- await asyncio.sleep(0.06)
- cb.allow_request() # transitions to HALF_OPEN
- cb.record_success()
- assert cb.state == CircuitState.CLOSED
-```
-
-- [ ] **Step 2: Run test to verify it fails**
-
-Run: `pytest shared/tests/test_resilience.py -v`
-Expected: FAIL with `ModuleNotFoundError: No module named 'shared.resilience'`
-
-- [ ] **Step 3: Implement resilience module**
-
-Create `shared/src/shared/resilience.py`:
-
-```python
-"""Retry with backoff and circuit breaker patterns."""
-import asyncio
-import functools
-import logging
-import random
-import time
-from enum import Enum
-from typing import Callable
-
-logger = logging.getLogger(__name__)
-
-
-def retry_with_backoff(
- max_retries: int = 3,
- base_delay: float = 1.0,
- max_delay: float = 60.0,
-) -> Callable:
- """Decorator for async functions that retries with exponential backoff + jitter."""
-
- def decorator(func: Callable) -> Callable:
- @functools.wraps(func)
- async def wrapper(*args, **kwargs):
- last_exc = None
- for attempt in range(max_retries):
- try:
- return await func(*args, **kwargs)
- except Exception as exc:
- last_exc = exc
- if attempt < max_retries - 1:
- delay = min(base_delay * (2**attempt), max_delay)
- jitter = delay * random.uniform(0, 0.5)
- wait = delay + jitter
- logger.warning(
- "Retry %d/%d for %s after %.2fs: %s",
- attempt + 1,
- max_retries,
- func.__name__,
- wait,
- exc,
- )
- await asyncio.sleep(wait)
- raise last_exc
-
- return wrapper
-
- return decorator
-
-
-class CircuitState(Enum):
- CLOSED = "closed"
- OPEN = "open"
- HALF_OPEN = "half_open"
-
-
-class CircuitBreaker:
- """Circuit breaker that opens after consecutive failures."""
-
- def __init__(
- self,
- failure_threshold: int = 5,
- recovery_timeout: float = 60.0,
- ) -> None:
- self._failure_threshold = failure_threshold
- self._recovery_timeout = recovery_timeout
- self._failure_count = 0
- self._state = CircuitState.CLOSED
- self._opened_at: float | None = None
-
- @property
- def state(self) -> CircuitState:
- return self._state
-
- def allow_request(self) -> bool:
- """Check if a request should be allowed."""
- if self._state == CircuitState.CLOSED:
- return True
-
- if self._state == CircuitState.OPEN:
- if self._opened_at and (time.monotonic() - self._opened_at) >= self._recovery_timeout:
- self._state = CircuitState.HALF_OPEN
- return True
- return False
-
- # HALF_OPEN — allow one probe request
- return True
-
- def record_success(self) -> None:
- """Record a successful call."""
- self._failure_count = 0
- self._state = CircuitState.CLOSED
- self._opened_at = None
-
- def record_failure(self) -> None:
- """Record a failed call."""
- self._failure_count += 1
- if self._failure_count >= self._failure_threshold:
- self._state = CircuitState.OPEN
- self._opened_at = time.monotonic()
- logger.error(
- "Circuit breaker OPEN after %d failures", self._failure_count
- )
-```
-
-- [ ] **Step 4: Run test to verify it passes**
-
-Run: `pytest shared/tests/test_resilience.py -v`
-Expected: All 8 tests PASS
-
-- [ ] **Step 5: Commit**
-
-```bash
-git add shared/src/shared/resilience.py shared/tests/test_resilience.py
-git commit -m "feat(shared): add retry with backoff and circuit breaker"
-```
-
----
-
-## Task 6: Health Check + Prometheus Metrics
-
-**Files:**
-- Create: `shared/src/shared/healthcheck.py`
-- Create: `shared/src/shared/metrics.py`
-- Create: `monitoring/prometheus.yml`
-- Modify: `docker-compose.yml`
-- Test: `shared/tests/test_healthcheck.py`
-- Test: `shared/tests/test_metrics.py`
-
-- [ ] **Step 1: Write the failing test for metrics**
-
-Create `shared/tests/test_metrics.py`:
-
-```python
-"""Tests for Prometheus metrics definitions."""
-from shared.metrics import ServiceMetrics
-
-
-def test_service_metrics_creates_counters():
- m = ServiceMetrics("test-service")
- assert m.errors_total is not None
- assert m.events_processed is not None
-
-
-def test_service_metrics_increment_errors():
- m = ServiceMetrics("test-service-2")
- m.errors_total.labels(service="test-service-2", error_type="connection").inc()
- # No assertion needed — prometheus_client raises on invalid labels
-
-
-def test_service_metrics_observe_processing_time():
- m = ServiceMetrics("test-service-3")
- m.processing_seconds.labels(service="test-service-3").observe(0.5)
-```
-
-- [ ] **Step 2: Run test to verify it fails**
-
-Run: `pytest shared/tests/test_metrics.py -v`
-Expected: FAIL with `ModuleNotFoundError: No module named 'shared.metrics'`
-
-- [ ] **Step 3: Implement metrics module**
-
-Create `shared/src/shared/metrics.py`:
-
-```python
-"""Prometheus metric definitions for trading services."""
-from prometheus_client import Counter, Gauge, Histogram
-
-
-class ServiceMetrics:
- """Common Prometheus metrics for any trading service."""
-
- def __init__(self, service_name: str) -> None:
- prefix = service_name.replace("-", "_")
-
- self.errors_total = Counter(
- f"{prefix}_errors_total",
- "Total error count",
- ["service", "error_type"],
- )
-
- self.events_processed = Counter(
- f"{prefix}_events_processed_total",
- "Total events processed",
- ["service", "event_type"],
- )
-
- self.processing_seconds = Histogram(
- f"{prefix}_processing_seconds",
- "Event processing duration in seconds",
- ["service"],
- )
-
- self.service_up = Gauge(
- f"{prefix}_up",
- "Service health status (1=up, 0=down)",
- ["service"],
- )
-```
-
-- [ ] **Step 4: Run test to verify it passes**
-
-Run: `pytest shared/tests/test_metrics.py -v`
-Expected: All 3 tests PASS
-
-- [ ] **Step 5: Write the failing test for healthcheck**
-
-Create `shared/tests/test_healthcheck.py`:
-
-```python
-"""Tests for health check server."""
-import pytest
-from unittest.mock import AsyncMock
-
-from shared.healthcheck import HealthCheckServer
-
-
-def test_healthcheck_server_init():
- server = HealthCheckServer(service_name="test", port=9090)
- assert server._service_name == "test"
- assert server._port == 9090
-
-
-def test_healthcheck_register_check():
- server = HealthCheckServer(service_name="test", port=9090)
- check_fn = AsyncMock(return_value=True)
- server.register_check("redis", check_fn)
- assert "redis" in server._checks
-
-
-@pytest.mark.asyncio
-async def test_healthcheck_run_checks_all_pass():
- server = HealthCheckServer(service_name="test", port=9090)
- server.register_check("redis", AsyncMock(return_value=True))
- server.register_check("postgres", AsyncMock(return_value=True))
- result = await server.run_checks()
- assert result["status"] == "ok"
- assert result["checks"]["redis"] is True
- assert result["checks"]["postgres"] is True
-
-
-@pytest.mark.asyncio
-async def test_healthcheck_run_checks_one_fails():
- server = HealthCheckServer(service_name="test", port=9090)
- server.register_check("redis", AsyncMock(return_value=True))
- server.register_check("postgres", AsyncMock(return_value=False))
- result = await server.run_checks()
- assert result["status"] == "degraded"
- assert result["checks"]["postgres"] is False
-```
-
-- [ ] **Step 6: Run test to verify it fails**
-
-Run: `pytest shared/tests/test_healthcheck.py -v`
-Expected: FAIL with `ModuleNotFoundError: No module named 'shared.healthcheck'`
-
-- [ ] **Step 7: Implement healthcheck server**
-
-Create `shared/src/shared/healthcheck.py`:
-
-```python
-"""Lightweight HTTP server for health checks and Prometheus metrics."""
-import time
-from typing import Any, Callable, Coroutine
-
-from aiohttp import web
-from prometheus_client import generate_latest, CONTENT_TYPE_LATEST
-
-
-class HealthCheckServer:
- """Serves /health and /metrics endpoints."""
-
- def __init__(self, service_name: str, port: int = 8080) -> None:
- self._service_name = service_name
- self._port = port
- self._checks: dict[str, Callable[[], Coroutine[Any, Any, bool]]] = {}
- self._start_time = time.monotonic()
-
- def register_check(
- self, name: str, check_fn: Callable[[], Coroutine[Any, Any, bool]]
- ) -> None:
- """Register a named async health check function."""
- self._checks[name] = check_fn
-
- async def run_checks(self) -> dict[str, Any]:
- """Run all registered checks and return aggregated result."""
- results = {}
- all_ok = True
- for name, fn in self._checks.items():
- try:
- results[name] = await fn()
- except Exception:
- results[name] = False
- if not results[name]:
- all_ok = False
-
- return {
- "status": "ok" if all_ok else "degraded",
- "service": self._service_name,
- "uptime_seconds": round(time.monotonic() - self._start_time, 1),
- "checks": results,
- }
-
- async def _handle_health(self, request: web.Request) -> web.Response:
- result = await self.run_checks()
- status = 200 if result["status"] == "ok" else 503
- return web.json_response(result, status=status)
-
- async def _handle_metrics(self, request: web.Request) -> web.Response:
- return web.Response(
- body=generate_latest(),
- content_type=CONTENT_TYPE_LATEST,
- )
-
- async def start(self) -> web.AppRunner:
- """Start the HTTP server in the background."""
- app = web.Application()
- app.router.add_get("/health", self._handle_health)
- app.router.add_get("/metrics", self._handle_metrics)
-
- runner = web.AppRunner(app)
- await runner.setup()
- site = web.TCPSite(runner, "0.0.0.0", self._port)
- await site.start()
- return runner
-```
-
-- [ ] **Step 8: Run test to verify it passes**
-
-Run: `pytest shared/tests/test_healthcheck.py -v`
-Expected: All 4 tests PASS
-
-- [ ] **Step 9: Create Prometheus config and update docker-compose**
-
-Create `monitoring/prometheus.yml`:
-
-```yaml
-global:
- scrape_interval: 15s
-
-scrape_configs:
- - job_name: "trading-services"
- static_configs:
- - targets:
- - "data-collector:8080"
- - "strategy-engine:8081"
- - "order-executor:8082"
- - "portfolio-manager:8083"
-```
-
-Add to `docker-compose.yml` — append these services before the `volumes:` section:
-
-```yaml
- prometheus:
- image: prom/prometheus:latest
- profiles: ["monitoring"]
- ports:
- - "9090:9090"
- volumes:
- - ./monitoring/prometheus.yml:/etc/prometheus/prometheus.yml
- depends_on:
- - data-collector
- - strategy-engine
- - order-executor
- - portfolio-manager
-
- grafana:
- image: grafana/grafana:latest
- profiles: ["monitoring"]
- ports:
- - "3000:3000"
- depends_on:
- - prometheus
-```
-
-- [ ] **Step 10: Commit**
-
-```bash
-git add shared/src/shared/metrics.py shared/src/shared/healthcheck.py \
- shared/tests/test_metrics.py shared/tests/test_healthcheck.py \
- monitoring/prometheus.yml docker-compose.yml
-git commit -m "feat(shared): add health checks and Prometheus metrics"
-```
-
----
-
-## Task 7: Integrate Operations Infrastructure into Services
-
-**Files:**
-- Modify: `services/data-collector/src/data_collector/main.py`
-- Modify: `services/data-collector/src/data_collector/storage.py`
-- Modify: `services/order-executor/src/order_executor/main.py`
-- Modify: `services/order-executor/src/order_executor/executor.py`
-- Modify: `services/portfolio-manager/src/portfolio_manager/main.py`
-- Modify: `services/strategy-engine/src/strategy_engine/main.py`
-
-This task integrates structlog, healthcheck, metrics, Telegram, and resilience into each service's entry point. Each service follows the same pattern.
-
-- [ ] **Step 1: Update data-collector/main.py**
-
-Replace `services/data-collector/src/data_collector/main.py`:
-
-```python
-"""Data Collector Service entry point."""
-import asyncio
-
-from shared.broker import RedisBroker
-from shared.config import Settings
-from shared.db import Database
-from shared.healthcheck import HealthCheckServer
-from shared.logging import setup_logging
-from shared.metrics import ServiceMetrics
-from shared.notifier import TelegramNotifier
-from shared.resilience import retry_with_backoff
-
-from data_collector.binance_ws import BinanceWebSocket
-from data_collector.config import CollectorConfig
-from data_collector.storage import CandleStorage
-
-
-async def run() -> None:
- config = CollectorConfig()
- log = setup_logging("data-collector", config.log_level, config.log_format)
- metrics = ServiceMetrics("data_collector")
-
- notifier = TelegramNotifier(
- bot_token=config.telegram_bot_token,
- chat_id=config.telegram_chat_id,
- )
-
- db = Database(config.database_url)
- await db.connect()
-
- broker = RedisBroker(config.redis_url)
- storage = CandleStorage(db=db, broker=broker)
-
- # Health checks
- health = HealthCheckServer("data-collector", port=config.health_port)
-
- async def check_redis():
- try:
- await broker._redis.ping()
- return True
- except Exception:
- return False
-
- health.register_check("redis", check_redis)
- await health.start()
-
- metrics.service_up.labels(service="data-collector").set(1)
-
- async def on_candle(candle):
- log.info("candle_received", symbol=candle.symbol, timeframe=candle.timeframe)
- await storage.store(candle)
- metrics.events_processed.labels(
- service="data-collector", event_type="candle"
- ).inc()
-
- timeframe = config.timeframes[0] if config.timeframes else "1m"
-
- ws = BinanceWebSocket(
- symbols=config.symbols,
- timeframe=timeframe,
- on_candle=on_candle,
- )
-
- log.info("starting", symbols=config.symbols, timeframe=timeframe)
-
- try:
- await ws.start()
- except Exception as exc:
- log.error("fatal_error", error=str(exc))
- metrics.errors_total.labels(
- service="data-collector", error_type="fatal"
- ).inc()
- await notifier.send_error(str(exc), "data-collector")
- raise
- finally:
- metrics.service_up.labels(service="data-collector").set(0)
- await notifier.close()
- await broker.close()
- await db.close()
-
-
-def main() -> None:
- asyncio.run(run())
-
-
-if __name__ == "__main__":
- main()
-```
-
-- [ ] **Step 2: Update order-executor/executor.py to use notifier**
-
-Add notifier parameter to `OrderExecutor.__init__` and call it on order events. Replace `services/order-executor/src/order_executor/executor.py`:
-
-```python
-"""Order execution logic."""
-import logging
-from datetime import datetime, timezone
-from decimal import Decimal
-from typing import Any, Optional
-
-from shared.broker import RedisBroker
-from shared.db import Database
-from shared.events import OrderEvent
-from shared.models import Order, OrderSide, OrderStatus, OrderType, Signal
-from shared.notifier import TelegramNotifier
-
-from order_executor.risk_manager import RiskManager
-
-logger = logging.getLogger(__name__)
-
-
-class OrderExecutor:
- """Executes orders on an exchange with risk gating."""
-
- def __init__(
- self,
- exchange: Any,
- risk_manager: RiskManager,
- broker: RedisBroker,
- db: Database,
- notifier: TelegramNotifier,
- dry_run: bool = True,
- ) -> None:
- self.exchange = exchange
- self.risk_manager = risk_manager
- self.broker = broker
- self.db = db
- self.notifier = notifier
- self.dry_run = dry_run
-
- async def execute(self, signal: Signal) -> Optional[Order]:
- """Run risk checks and place an order for the given signal."""
- balance_data = await self.exchange.fetch_balance()
- free_balances = balance_data.get("free", {})
- quote_currency = signal.symbol.split("/")[-1] if "/" in signal.symbol else "USDT"
- balance = Decimal(str(free_balances.get(quote_currency, 0)))
-
- positions = {}
- daily_pnl = Decimal(0)
-
- result = self.risk_manager.check(
- signal=signal,
- balance=balance,
- positions=positions,
- daily_pnl=daily_pnl,
- )
-
- if not result.allowed:
- logger.warning(
- "Risk check rejected signal %s: %s", signal.id, result.reason
- )
- return None
-
- order = Order(
- signal_id=signal.id,
- symbol=signal.symbol,
- side=signal.side,
- type=OrderType.MARKET,
- price=signal.price,
- quantity=signal.quantity,
- status=OrderStatus.PENDING,
- )
-
- if self.dry_run:
- order.status = OrderStatus.FILLED
- order.filled_at = datetime.now(timezone.utc)
- logger.info("[DRY RUN] Order filled: %s %s %s", order.side, order.quantity, order.symbol)
- else:
- try:
- await self.exchange.create_order(
- symbol=signal.symbol,
- type="market",
- side=signal.side.value.lower(),
- amount=float(signal.quantity),
- )
- order.status = OrderStatus.FILLED
- order.filled_at = datetime.now(timezone.utc)
- logger.info("Order filled: %s %s %s", order.side, order.quantity, order.symbol)
- except Exception as exc:
- order.status = OrderStatus.FAILED
- logger.error("Order failed for signal %s: %s", signal.id, exc)
-
- await self.db.insert_order(order)
- await self.broker.publish("orders", OrderEvent(data=order).to_dict())
- await self.notifier.send_order(order)
-
- return order
-```
-
-- [ ] **Step 3: Update order-executor/main.py**
-
-Replace `services/order-executor/src/order_executor/main.py`:
-
-```python
-"""Order Executor Service entry point."""
-import asyncio
-from decimal import Decimal
-
-import ccxt.async_support as ccxt
-
-from shared.broker import RedisBroker
-from shared.db import Database
-from shared.events import Event, EventType
-from shared.healthcheck import HealthCheckServer
-from shared.logging import setup_logging
-from shared.metrics import ServiceMetrics
-from shared.notifier import TelegramNotifier
-
-from order_executor.config import ExecutorConfig
-from order_executor.executor import OrderExecutor
-from order_executor.risk_manager import RiskManager
-
-
-async def run() -> None:
- config = ExecutorConfig()
- log = setup_logging("order-executor", config.log_level, config.log_format)
- metrics = ServiceMetrics("order_executor")
-
- notifier = TelegramNotifier(
- bot_token=config.telegram_bot_token,
- chat_id=config.telegram_chat_id,
- )
-
- db = Database(config.database_url)
- await db.connect()
-
- broker = RedisBroker(config.redis_url)
-
- exchange = ccxt.binance(
- {"apiKey": config.binance_api_key, "secret": config.binance_api_secret}
- )
-
- risk_manager = RiskManager(
- max_position_size=Decimal(str(config.risk_max_position_size)),
- stop_loss_pct=Decimal(str(config.risk_stop_loss_pct)),
- daily_loss_limit_pct=Decimal(str(config.risk_daily_loss_limit_pct)),
- )
-
- executor = OrderExecutor(
- exchange=exchange,
- risk_manager=risk_manager,
- broker=broker,
- db=db,
- notifier=notifier,
- dry_run=config.dry_run,
- )
-
- health = HealthCheckServer("order-executor", port=config.health_port + 2)
- await health.start()
- metrics.service_up.labels(service="order-executor").set(1)
-
- last_id = "$"
- stream = "signals"
- log.info("started", stream=stream, dry_run=config.dry_run)
-
- try:
- while True:
- messages = await broker.read(stream, last_id=last_id, count=10, block=5000)
- for msg in messages:
- try:
- event = Event.from_dict(msg)
- if event.type == EventType.SIGNAL:
- signal = event.data
- log.info("processing_signal", signal_id=signal.id, symbol=signal.symbol)
- await executor.execute(signal)
- metrics.events_processed.labels(
- service="order-executor", event_type="signal"
- ).inc()
- except Exception as exc:
- log.error("process_failed", error=str(exc))
- metrics.errors_total.labels(
- service="order-executor", error_type="processing"
- ).inc()
- finally:
- metrics.service_up.labels(service="order-executor").set(0)
- await notifier.close()
- await broker.close()
- await db.close()
- await exchange.close()
-
-
-def main() -> None:
- asyncio.run(run())
-
-
-if __name__ == "__main__":
- main()
-```
-
-- [ ] **Step 4: Update strategy-engine/main.py**
-
-Replace `services/strategy-engine/src/strategy_engine/main.py`:
-
-```python
-"""Strategy Engine Service entry point."""
-import asyncio
-from pathlib import Path
-
-from shared.broker import RedisBroker
-from shared.healthcheck import HealthCheckServer
-from shared.logging import setup_logging
-from shared.metrics import ServiceMetrics
-from shared.notifier import TelegramNotifier
-
-from strategy_engine.config import StrategyConfig
-from strategy_engine.engine import StrategyEngine
-from strategy_engine.plugin_loader import load_strategies
-
-STRATEGIES_DIR = Path(__file__).parent.parent.parent.parent / "strategies"
-
-
-async def run() -> None:
- config = StrategyConfig()
- log = setup_logging("strategy-engine", config.log_level, config.log_format)
- metrics = ServiceMetrics("strategy_engine")
-
- notifier = TelegramNotifier(
- bot_token=config.telegram_bot_token,
- chat_id=config.telegram_chat_id,
- )
-
- broker = RedisBroker(config.redis_url)
- strategies = load_strategies(STRATEGIES_DIR)
-
- for strategy in strategies:
- params = config.strategy_params.get(strategy.name, {})
- strategy.configure(params)
-
- log.info("loaded_strategies", count=len(strategies), names=[s.name for s in strategies])
-
- engine = StrategyEngine(broker=broker, strategies=strategies)
-
- health = HealthCheckServer("strategy-engine", port=config.health_port + 1)
- await health.start()
- metrics.service_up.labels(service="strategy-engine").set(1)
-
- try:
- for symbol in config.symbols:
- stream = f"candles.{symbol.replace('/', '_')}"
- last_id = "$"
- log.info("engine_loop_start", stream=stream)
- while True:
- last_id = await engine.process_once(stream, last_id)
- except Exception as exc:
- log.error("fatal_error", error=str(exc))
- await notifier.send_error(str(exc), "strategy-engine")
- raise
- finally:
- metrics.service_up.labels(service="strategy-engine").set(0)
- await notifier.close()
- await broker.close()
-
-
-def main() -> None:
- asyncio.run(run())
-
-
-if __name__ == "__main__":
- main()
-```
-
-- [ ] **Step 5: Update portfolio-manager/main.py**
-
-Replace `services/portfolio-manager/src/portfolio_manager/main.py`:
-
-```python
-"""Portfolio Manager Service entry point."""
-import asyncio
-
-from shared.broker import RedisBroker
-from shared.events import Event, OrderEvent
-from shared.healthcheck import HealthCheckServer
-from shared.logging import setup_logging
-from shared.metrics import ServiceMetrics
-from shared.notifier import TelegramNotifier
-
-from portfolio_manager.config import PortfolioConfig
-from portfolio_manager.portfolio import PortfolioTracker
-
-ORDERS_STREAM = "orders"
-
-
-async def run() -> None:
- config = PortfolioConfig()
- log = setup_logging("portfolio-manager", config.log_level, config.log_format)
- metrics = ServiceMetrics("portfolio_manager")
-
- notifier = TelegramNotifier(
- bot_token=config.telegram_bot_token,
- chat_id=config.telegram_chat_id,
- )
-
- broker = RedisBroker(config.redis_url)
- tracker = PortfolioTracker()
-
- health = HealthCheckServer("portfolio-manager", port=config.health_port + 3)
- await health.start()
- metrics.service_up.labels(service="portfolio-manager").set(1)
-
- last_id = "$"
- log.info("started", stream=ORDERS_STREAM)
-
- try:
- while True:
- messages = await broker.read(ORDERS_STREAM, last_id=last_id, block=1000)
- for msg in messages:
- try:
- event = Event.from_dict(msg)
- if isinstance(event, OrderEvent):
- order = event.data
- tracker.apply_order(order)
- log.info(
- "order_applied",
- symbol=order.symbol,
- side=order.side.value,
- qty=str(order.quantity),
- )
- metrics.events_processed.labels(
- service="portfolio-manager", event_type="order"
- ).inc()
- except Exception as exc:
- log.error("process_failed", error=str(exc))
- metrics.errors_total.labels(
- service="portfolio-manager", error_type="processing"
- ).inc()
- finally:
- metrics.service_up.labels(service="portfolio-manager").set(0)
- await notifier.close()
- await broker.close()
-
-
-def main() -> None:
- asyncio.run(run())
-
-
-if __name__ == "__main__":
- main()
-```
-
-- [ ] **Step 6: Run all tests to verify nothing is broken**
-
-Run: `pytest -v`
-Expected: All existing tests PASS (some executor tests may need mock updates for `notifier` param)
-
-- [ ] **Step 7: Fix any broken executor tests**
-
-The `OrderExecutor` now requires a `notifier` parameter. Update `services/order-executor/tests/test_executor.py` — add `notifier=AsyncMock()` to every `OrderExecutor(...)` call. For example, wherever the test creates:
-
-```python
-executor = OrderExecutor(
- exchange=mock_exchange,
- risk_manager=mock_risk,
- broker=mock_broker,
- db=mock_db,
- dry_run=True,
-)
-```
-
-Change to:
-
-```python
-executor = OrderExecutor(
- exchange=mock_exchange,
- risk_manager=mock_risk,
- broker=mock_broker,
- db=mock_db,
- notifier=AsyncMock(),
- dry_run=True,
-)
-```
-
-- [ ] **Step 8: Run all tests again**
-
-Run: `pytest -v`
-Expected: All tests PASS
-
-- [ ] **Step 9: Commit**
-
-```bash
-git add services/data-collector/src/data_collector/main.py \
- services/order-executor/src/order_executor/executor.py \
- services/order-executor/src/order_executor/main.py \
- services/order-executor/tests/test_executor.py \
- services/strategy-engine/src/strategy_engine/main.py \
- services/portfolio-manager/src/portfolio_manager/main.py
-git commit -m "feat(services): integrate structlog, healthcheck, metrics, and Telegram"
-```
-
----
-
-## Task 8: BaseStrategy warmup_period + YAML Config Loading
-
-**Files:**
-- Modify: `services/strategy-engine/strategies/base.py`
-- Modify: `services/strategy-engine/strategies/rsi_strategy.py`
-- Modify: `services/strategy-engine/strategies/grid_strategy.py`
-- Modify: `services/strategy-engine/src/strategy_engine/plugin_loader.py`
-- Create: `services/strategy-engine/strategies/config/rsi_strategy.yaml`
-- Create: `services/strategy-engine/strategies/config/grid_strategy.yaml`
-
-- [ ] **Step 1: Update BaseStrategy with warmup_period**
-
-Replace `services/strategy-engine/strategies/base.py`:
-
-```python
-from abc import ABC, abstractmethod
-from shared.models import Candle, Signal
-
-
-class BaseStrategy(ABC):
- name: str = "base"
-
- @property
- @abstractmethod
- def warmup_period(self) -> int:
- """Minimum number of candles needed before generating signals."""
- pass
-
- @abstractmethod
- def on_candle(self, candle: Candle) -> Signal | None:
- pass
-
- @abstractmethod
- def configure(self, params: dict) -> None:
- pass
-
- def reset(self) -> None:
- pass
-```
-
-- [ ] **Step 2: Update RsiStrategy with warmup_period**
-
-Add this property to `RsiStrategy` in `rsi_strategy.py`, after `__init__`:
-
-```python
- @property
- def warmup_period(self) -> int:
- return self._period + 1
-```
-
-- [ ] **Step 3: Update GridStrategy with warmup_period**
-
-Add this property to `GridStrategy` in `grid_strategy.py`, after `__init__`:
-
-```python
- @property
- def warmup_period(self) -> int:
- return 2 # Needs at least 2 candles to detect zone crossing
-```
-
-- [ ] **Step 4: Create YAML config files**
-
-Create `services/strategy-engine/strategies/config/rsi_strategy.yaml`:
-
-```yaml
-period: 14
-oversold: 30
-overbought: 70
-quantity: "0.01"
-```
-
-Create `services/strategy-engine/strategies/config/grid_strategy.yaml`:
-
-```yaml
-lower_price: 60000
-upper_price: 70000
-grid_count: 5
-quantity: "0.01"
-```
-
-- [ ] **Step 5: Update plugin_loader.py with YAML config loading**
-
-Replace `services/strategy-engine/src/strategy_engine/plugin_loader.py`:
-
-```python
-"""Dynamic plugin loader for strategy modules with YAML config support."""
-import importlib.util
-import sys
-from pathlib import Path
-
-import yaml
-
-from strategies.base import BaseStrategy
-
-
-def load_strategies(strategies_dir: Path) -> list[BaseStrategy]:
- """Scan strategies_dir for *.py files and load all BaseStrategy subclasses.
-
- Automatically loads matching YAML config from strategies_dir/config/.
- """
- loaded: list[BaseStrategy] = []
- config_dir = strategies_dir / "config"
-
- for path in sorted(strategies_dir.glob("*.py")):
- if path.name.startswith("__") or path.name == "base.py":
- continue
-
- module_name = f"_strategy_plugin_{path.stem}"
- spec = importlib.util.spec_from_file_location(module_name, path)
- if spec is None or spec.loader is None:
- continue
-
- module = importlib.util.module_from_spec(spec)
- sys.modules[module_name] = module
- spec.loader.exec_module(module)
-
- for attr_name in dir(module):
- obj = getattr(module, attr_name)
- if (
- isinstance(obj, type)
- and issubclass(obj, BaseStrategy)
- and obj is not BaseStrategy
- ):
- instance = obj()
-
- # Load YAML config if it exists
- yaml_path = config_dir / f"{path.stem}.yaml"
- if yaml_path.exists():
- with open(yaml_path) as f:
- params = yaml.safe_load(f) or {}
- instance.configure(params)
-
- loaded.append(instance)
-
- return loaded
-```
-
-- [ ] **Step 6: Run existing strategy tests**
-
-Run: `pytest services/strategy-engine/tests/ -v`
-Expected: All tests PASS
-
-- [ ] **Step 7: Commit**
-
-```bash
-git add services/strategy-engine/strategies/base.py \
- services/strategy-engine/strategies/rsi_strategy.py \
- services/strategy-engine/strategies/grid_strategy.py \
- services/strategy-engine/src/strategy_engine/plugin_loader.py \
- services/strategy-engine/strategies/config/
-git commit -m "feat(strategy): add warmup_period to BaseStrategy and YAML config loading"
-```
-
----
-
-## Task 9: MACD Strategy
-
-**Files:**
-- Create: `services/strategy-engine/strategies/macd_strategy.py`
-- Create: `services/strategy-engine/strategies/config/macd_strategy.yaml`
-- Test: `services/strategy-engine/tests/test_macd_strategy.py`
-
-- [ ] **Step 1: Write the failing test**
-
-Create `services/strategy-engine/tests/test_macd_strategy.py`:
-
-```python
-"""Tests for MACD strategy."""
-from decimal import Decimal
-
-import pytest
-
-from shared.models import Candle, OrderSide
-from strategies.macd_strategy import MacdStrategy
-
-
-@pytest.fixture
-def strategy():
- s = MacdStrategy()
- s.configure({"fast_period": 3, "slow_period": 6, "signal_period": 3, "quantity": "0.01"})
- return s
-
-
-def _candle(price: float) -> Candle:
- from datetime import datetime, timezone
- return Candle(
- symbol="BTCUSDT",
- timeframe="1m",
- open_time=datetime(2025, 1, 1, tzinfo=timezone.utc),
- open=Decimal(str(price)),
- high=Decimal(str(price + 100)),
- low=Decimal(str(price - 100)),
- close=Decimal(str(price)),
- volume=Decimal("10"),
- )
-
-
-def test_macd_warmup_period(strategy):
- assert strategy.warmup_period == 9 # slow_period + signal_period = 6 + 3
-
-
-def test_macd_no_signal_insufficient_data(strategy):
- for price in [100, 101, 102, 103, 104]:
- result = strategy.on_candle(_candle(price))
- assert result is None
-
-
-def test_macd_buy_signal_on_bullish_crossover(strategy):
- # Feed declining prices to push MACD below signal, then rising to cross above
- prices = [100, 98, 96, 94, 92, 90, 88, 90, 93, 97, 102, 108, 115, 123, 132]
- signals = []
- for p in prices:
- sig = strategy.on_candle(_candle(p))
- if sig is not None:
- signals.append(sig)
- buy_signals = [s for s in signals if s.side == OrderSide.BUY]
- assert len(buy_signals) > 0
- assert buy_signals[0].strategy == "macd"
-
-
-def test_macd_sell_signal_on_bearish_crossover(strategy):
- # Feed rising prices to push MACD above signal, then declining to cross below
- prices = [100, 105, 110, 116, 122, 128, 125, 120, 114, 107, 99, 90, 80, 70, 60]
- signals = []
- for p in prices:
- sig = strategy.on_candle(_candle(p))
- if sig is not None:
- signals.append(sig)
- sell_signals = [s for s in signals if s.side == OrderSide.SELL]
- assert len(sell_signals) > 0
-
-
-def test_macd_reset_clears_state(strategy):
- for p in [100, 101, 102]:
- strategy.on_candle(_candle(p))
- strategy.reset()
- assert len(strategy._closes) == 0
-```
-
-- [ ] **Step 2: Run test to verify it fails**
-
-Run: `pytest services/strategy-engine/tests/test_macd_strategy.py -v`
-Expected: FAIL with `ModuleNotFoundError: No module named 'strategies.macd_strategy'`
-
-- [ ] **Step 3: Implement MACD strategy**
-
-Create `services/strategy-engine/strategies/macd_strategy.py`:
-
-```python
-"""MACD (Moving Average Convergence Divergence) strategy."""
-from collections import deque
-from decimal import Decimal
-
-import pandas as pd
-
-from shared.models import Candle, Signal, OrderSide
-from strategies.base import BaseStrategy
-
-
-class MacdStrategy(BaseStrategy):
- name: str = "macd"
-
- def __init__(self) -> None:
- self._fast_period: int = 12
- self._slow_period: int = 26
- self._signal_period: int = 9
- self._quantity: Decimal = Decimal("0.01")
- self._closes: deque[float] = deque(maxlen=500)
- self._prev_histogram: float | None = None
-
- @property
- def warmup_period(self) -> int:
- return self._slow_period + self._signal_period
-
- def configure(self, params: dict) -> None:
- self._fast_period = int(params.get("fast_period", 12))
- self._slow_period = int(params.get("slow_period", 26))
- self._signal_period = int(params.get("signal_period", 9))
- self._quantity = Decimal(str(params.get("quantity", "0.01")))
-
- def reset(self) -> None:
- self._closes.clear()
- self._prev_histogram = None
-
- def on_candle(self, candle: Candle) -> Signal | None:
- self._closes.append(float(candle.close))
-
- if len(self._closes) < self.warmup_period:
- return None
-
- series = pd.Series(list(self._closes))
- fast_ema = series.ewm(span=self._fast_period, adjust=False).mean()
- slow_ema = series.ewm(span=self._slow_period, adjust=False).mean()
- macd_line = fast_ema - slow_ema
- signal_line = macd_line.ewm(span=self._signal_period, adjust=False).mean()
- histogram = macd_line - signal_line
-
- current_hist = histogram.iloc[-1]
-
- if self._prev_histogram is None:
- self._prev_histogram = current_hist
- return None
-
- prev = self._prev_histogram
- self._prev_histogram = current_hist
-
- # Bullish crossover: histogram crosses from negative to positive
- if prev < 0 and current_hist > 0:
- return Signal(
- strategy=self.name,
- symbol=candle.symbol,
- side=OrderSide.BUY,
- price=candle.close,
- quantity=self._quantity,
- reason=f"MACD bullish crossover (histogram {prev:.4f} -> {current_hist:.4f})",
- )
-
- # Bearish crossover: histogram crosses from positive to negative
- if prev > 0 and current_hist < 0:
- return Signal(
- strategy=self.name,
- symbol=candle.symbol,
- side=OrderSide.SELL,
- price=candle.close,
- quantity=self._quantity,
- reason=f"MACD bearish crossover (histogram {prev:.4f} -> {current_hist:.4f})",
- )
-
- return None
-```
-
-Create `services/strategy-engine/strategies/config/macd_strategy.yaml`:
-
-```yaml
-fast_period: 12
-slow_period: 26
-signal_period: 9
-quantity: "0.01"
-```
-
-- [ ] **Step 4: Run test to verify it passes**
-
-Run: `pytest services/strategy-engine/tests/test_macd_strategy.py -v`
-Expected: All 5 tests PASS
-
-- [ ] **Step 5: Commit**
-
-```bash
-git add services/strategy-engine/strategies/macd_strategy.py \
- services/strategy-engine/strategies/config/macd_strategy.yaml \
- services/strategy-engine/tests/test_macd_strategy.py
-git commit -m "feat(strategy): add MACD strategy"
-```
-
----
-
-## Task 10: Bollinger Bands Strategy
-
-**Files:**
-- Create: `services/strategy-engine/strategies/bollinger_strategy.py`
-- Create: `services/strategy-engine/strategies/config/bollinger_strategy.yaml`
-- Test: `services/strategy-engine/tests/test_bollinger_strategy.py`
-
-- [ ] **Step 1: Write the failing test**
-
-Create `services/strategy-engine/tests/test_bollinger_strategy.py`:
-
-```python
-"""Tests for Bollinger Bands strategy."""
-from decimal import Decimal
-from datetime import datetime, timezone
-
-import pytest
-
-from shared.models import Candle, OrderSide
-from strategies.bollinger_strategy import BollingerStrategy
-
-
-@pytest.fixture
-def strategy():
- s = BollingerStrategy()
- s.configure({"period": 5, "num_std": 2.0, "min_bandwidth": 0.0, "quantity": "0.01"})
- return s
-
-
-def _candle(price: float) -> Candle:
- return Candle(
- symbol="BTCUSDT",
- timeframe="1m",
- open_time=datetime(2025, 1, 1, tzinfo=timezone.utc),
- open=Decimal(str(price)),
- high=Decimal(str(price + 10)),
- low=Decimal(str(price - 10)),
- close=Decimal(str(price)),
- volume=Decimal("10"),
- )
-
-
-def test_bollinger_warmup_period(strategy):
- assert strategy.warmup_period == 5
-
-
-def test_bollinger_no_signal_insufficient_data(strategy):
- for p in [100, 101, 102]:
- result = strategy.on_candle(_candle(p))
- assert result is None
-
-
-def test_bollinger_buy_on_lower_band_recovery(strategy):
- # Stable prices to build bands, then drop below and recover
- prices = [100, 100, 100, 100, 100, 80, 80, 95]
- signals = []
- for p in prices:
- sig = strategy.on_candle(_candle(p))
- if sig is not None:
- signals.append(sig)
- buy_signals = [s for s in signals if s.side == OrderSide.BUY]
- assert len(buy_signals) > 0
- assert buy_signals[0].strategy == "bollinger"
-
-
-def test_bollinger_sell_on_upper_band_recovery(strategy):
- prices = [100, 100, 100, 100, 100, 120, 120, 105]
- signals = []
- for p in prices:
- sig = strategy.on_candle(_candle(p))
- if sig is not None:
- signals.append(sig)
- sell_signals = [s for s in signals if s.side == OrderSide.SELL]
- assert len(sell_signals) > 0
-
-
-def test_bollinger_reset_clears_state(strategy):
- for p in [100, 101]:
- strategy.on_candle(_candle(p))
- strategy.reset()
- assert len(strategy._closes) == 0
-```
-
-- [ ] **Step 2: Run test to verify it fails**
-
-Run: `pytest services/strategy-engine/tests/test_bollinger_strategy.py -v`
-Expected: FAIL with `ModuleNotFoundError`
-
-- [ ] **Step 3: Implement Bollinger Bands strategy**
-
-Create `services/strategy-engine/strategies/bollinger_strategy.py`:
-
-```python
-"""Bollinger Bands strategy."""
-from collections import deque
-from decimal import Decimal
-
-import pandas as pd
-
-from shared.models import Candle, Signal, OrderSide
-from strategies.base import BaseStrategy
-
-
-class BollingerStrategy(BaseStrategy):
- name: str = "bollinger"
-
- def __init__(self) -> None:
- self._period: int = 20
- self._num_std: float = 2.0
- self._min_bandwidth: float = 0.02
- self._quantity: Decimal = Decimal("0.01")
- self._closes: deque[float] = deque(maxlen=500)
- self._was_below_lower: bool = False
- self._was_above_upper: bool = False
-
- @property
- def warmup_period(self) -> int:
- return self._period
-
- def configure(self, params: dict) -> None:
- self._period = int(params.get("period", 20))
- self._num_std = float(params.get("num_std", 2.0))
- self._min_bandwidth = float(params.get("min_bandwidth", 0.02))
- self._quantity = Decimal(str(params.get("quantity", "0.01")))
-
- def reset(self) -> None:
- self._closes.clear()
- self._was_below_lower = False
- self._was_above_upper = False
-
- def on_candle(self, candle: Candle) -> Signal | None:
- self._closes.append(float(candle.close))
-
- if len(self._closes) < self._period:
- return None
-
- series = pd.Series(list(self._closes))
- sma = series.rolling(self._period).mean().iloc[-1]
- std = series.rolling(self._period).std().iloc[-1]
-
- upper = sma + self._num_std * std
- lower = sma - self._num_std * std
-
- # Bandwidth filter
- if sma > 0:
- bandwidth = (upper - lower) / sma
- if bandwidth < self._min_bandwidth:
- return None
-
- price = float(candle.close)
-
- # Track band penetration
- if price < lower:
- self._was_below_lower = True
- if price > upper:
- self._was_above_upper = True
-
- # BUY: price was below lower band and recovered back inside
- if self._was_below_lower and price >= lower:
- self._was_below_lower = False
- return Signal(
- strategy=self.name,
- symbol=candle.symbol,
- side=OrderSide.BUY,
- price=candle.close,
- quantity=self._quantity,
- reason=f"Bollinger: price recovered above lower band ({lower:.2f})",
- )
-
- # SELL: price was above upper band and recovered back inside
- if self._was_above_upper and price <= upper:
- self._was_above_upper = False
- return Signal(
- strategy=self.name,
- symbol=candle.symbol,
- side=OrderSide.SELL,
- price=candle.close,
- quantity=self._quantity,
- reason=f"Bollinger: price recovered below upper band ({upper:.2f})",
- )
-
- return None
-```
-
-Create `services/strategy-engine/strategies/config/bollinger_strategy.yaml`:
-
-```yaml
-period: 20
-num_std: 2.0
-min_bandwidth: 0.02
-quantity: "0.01"
-```
-
-- [ ] **Step 4: Run test to verify it passes**
-
-Run: `pytest services/strategy-engine/tests/test_bollinger_strategy.py -v`
-Expected: All 5 tests PASS
-
-- [ ] **Step 5: Commit**
-
-```bash
-git add services/strategy-engine/strategies/bollinger_strategy.py \
- services/strategy-engine/strategies/config/bollinger_strategy.yaml \
- services/strategy-engine/tests/test_bollinger_strategy.py
-git commit -m "feat(strategy): add Bollinger Bands strategy"
-```
-
----
-
-## Task 11: EMA Crossover Strategy
-
-**Files:**
-- Create: `services/strategy-engine/strategies/ema_crossover_strategy.py`
-- Create: `services/strategy-engine/strategies/config/ema_crossover_strategy.yaml`
-- Test: `services/strategy-engine/tests/test_ema_crossover_strategy.py`
-
-- [ ] **Step 1: Write the failing test**
-
-Create `services/strategy-engine/tests/test_ema_crossover_strategy.py`:
-
-```python
-"""Tests for EMA Crossover strategy."""
-from decimal import Decimal
-from datetime import datetime, timezone
-
-import pytest
-
-from shared.models import Candle, OrderSide
-from strategies.ema_crossover_strategy import EmaCrossoverStrategy
-
-
-@pytest.fixture
-def strategy():
- s = EmaCrossoverStrategy()
- s.configure({"short_period": 3, "long_period": 6, "quantity": "0.01"})
- return s
-
-
-def _candle(price: float) -> Candle:
- return Candle(
- symbol="BTCUSDT",
- timeframe="1m",
- open_time=datetime(2025, 1, 1, tzinfo=timezone.utc),
- open=Decimal(str(price)),
- high=Decimal(str(price + 10)),
- low=Decimal(str(price - 10)),
- close=Decimal(str(price)),
- volume=Decimal("10"),
- )
-
-
-def test_ema_warmup_period(strategy):
- assert strategy.warmup_period == 6
-
-
-def test_ema_no_signal_insufficient_data(strategy):
- for p in [100, 101, 102]:
- result = strategy.on_candle(_candle(p))
- assert result is None
-
-
-def test_ema_buy_signal_golden_cross(strategy):
- # Declining then sharp rise: short EMA crosses above long EMA
- prices = [100, 98, 96, 94, 92, 90, 95, 100, 108, 117, 127]
- signals = []
- for p in prices:
- sig = strategy.on_candle(_candle(p))
- if sig is not None:
- signals.append(sig)
- buy_signals = [s for s in signals if s.side == OrderSide.BUY]
- assert len(buy_signals) > 0
- assert buy_signals[0].strategy == "ema_crossover"
-
-
-def test_ema_sell_signal_death_cross(strategy):
- # Rising then sharp decline: short EMA crosses below long EMA
- prices = [100, 105, 110, 115, 120, 125, 118, 110, 100, 88, 75]
- signals = []
- for p in prices:
- sig = strategy.on_candle(_candle(p))
- if sig is not None:
- signals.append(sig)
- sell_signals = [s for s in signals if s.side == OrderSide.SELL]
- assert len(sell_signals) > 0
-
-
-def test_ema_reset_clears_state(strategy):
- for p in [100, 101]:
- strategy.on_candle(_candle(p))
- strategy.reset()
- assert len(strategy._closes) == 0
-```
-
-- [ ] **Step 2: Run test to verify it fails**
-
-Run: `pytest services/strategy-engine/tests/test_ema_crossover_strategy.py -v`
-Expected: FAIL with `ModuleNotFoundError`
-
-- [ ] **Step 3: Implement EMA Crossover strategy**
-
-Create `services/strategy-engine/strategies/ema_crossover_strategy.py`:
-
-```python
-"""EMA Crossover (Golden Cross / Death Cross) strategy."""
-from collections import deque
-from decimal import Decimal
-
-import pandas as pd
-
-from shared.models import Candle, Signal, OrderSide
-from strategies.base import BaseStrategy
-
-
-class EmaCrossoverStrategy(BaseStrategy):
- name: str = "ema_crossover"
-
- def __init__(self) -> None:
- self._short_period: int = 9
- self._long_period: int = 21
- self._quantity: Decimal = Decimal("0.01")
- self._closes: deque[float] = deque(maxlen=500)
- self._prev_short_above: bool | None = None
-
- @property
- def warmup_period(self) -> int:
- return self._long_period
-
- def configure(self, params: dict) -> None:
- self._short_period = int(params.get("short_period", 9))
- self._long_period = int(params.get("long_period", 21))
- self._quantity = Decimal(str(params.get("quantity", "0.01")))
-
- def reset(self) -> None:
- self._closes.clear()
- self._prev_short_above = None
-
- def on_candle(self, candle: Candle) -> Signal | None:
- self._closes.append(float(candle.close))
-
- if len(self._closes) < self._long_period:
- return None
-
- series = pd.Series(list(self._closes))
- short_ema = series.ewm(span=self._short_period, adjust=False).mean().iloc[-1]
- long_ema = series.ewm(span=self._long_period, adjust=False).mean().iloc[-1]
-
- short_above = short_ema > long_ema
-
- if self._prev_short_above is None:
- self._prev_short_above = short_above
- return None
-
- prev = self._prev_short_above
- self._prev_short_above = short_above
-
- # Golden Cross: short EMA crosses above long EMA
- if not prev and short_above:
- return Signal(
- strategy=self.name,
- symbol=candle.symbol,
- side=OrderSide.BUY,
- price=candle.close,
- quantity=self._quantity,
- reason=f"EMA Golden Cross (short={short_ema:.2f} > long={long_ema:.2f})",
- )
-
- # Death Cross: short EMA crosses below long EMA
- if prev and not short_above:
- return Signal(
- strategy=self.name,
- symbol=candle.symbol,
- side=OrderSide.SELL,
- price=candle.close,
- quantity=self._quantity,
- reason=f"EMA Death Cross (short={short_ema:.2f} < long={long_ema:.2f})",
- )
-
- return None
-```
-
-Create `services/strategy-engine/strategies/config/ema_crossover_strategy.yaml`:
-
-```yaml
-short_period: 9
-long_period: 21
-quantity: "0.01"
-```
-
-- [ ] **Step 4: Run test to verify it passes**
-
-Run: `pytest services/strategy-engine/tests/test_ema_crossover_strategy.py -v`
-Expected: All 5 tests PASS
-
-- [ ] **Step 5: Commit**
-
-```bash
-git add services/strategy-engine/strategies/ema_crossover_strategy.py \
- services/strategy-engine/strategies/config/ema_crossover_strategy.yaml \
- services/strategy-engine/tests/test_ema_crossover_strategy.py
-git commit -m "feat(strategy): add EMA Crossover strategy"
-```
-
----
-
-## Task 12: VWAP Strategy
-
-**Files:**
-- Create: `services/strategy-engine/strategies/vwap_strategy.py`
-- Create: `services/strategy-engine/strategies/config/vwap_strategy.yaml`
-- Test: `services/strategy-engine/tests/test_vwap_strategy.py`
-
-- [ ] **Step 1: Write the failing test**
-
-Create `services/strategy-engine/tests/test_vwap_strategy.py`:
-
-```python
-"""Tests for VWAP strategy."""
-from decimal import Decimal
-from datetime import datetime, timezone
-
-import pytest
-
-from shared.models import Candle, OrderSide
-from strategies.vwap_strategy import VwapStrategy
-
-
-@pytest.fixture
-def strategy():
- s = VwapStrategy()
- s.configure({"deviation_threshold": 0.01, "quantity": "0.01"})
- return s
-
-
-def _candle(price: float, volume: float = 10.0) -> Candle:
- return Candle(
- symbol="BTCUSDT",
- timeframe="1m",
- open_time=datetime(2025, 1, 1, tzinfo=timezone.utc),
- open=Decimal(str(price)),
- high=Decimal(str(price + 10)),
- low=Decimal(str(price - 10)),
- close=Decimal(str(price)),
- volume=Decimal(str(volume)),
- )
-
-
-def test_vwap_warmup_period(strategy):
- assert strategy.warmup_period == 30
-
-
-def test_vwap_no_signal_insufficient_data(strategy):
- for i in range(10):
- result = strategy.on_candle(_candle(100))
- assert result is None
-
-
-def test_vwap_buy_signal_below_vwap_recovery(strategy):
- # Build VWAP at ~100, then go below, then recover
- signals = []
- for _ in range(30):
- strategy.on_candle(_candle(100, 100))
- # Drop below VWAP
- for _ in range(5):
- strategy.on_candle(_candle(95, 10))
- # Recover to VWAP
- sig = strategy.on_candle(_candle(100, 10))
- if sig is not None:
- signals.append(sig)
- buy_signals = [s for s in signals if s.side == OrderSide.BUY]
- assert len(buy_signals) > 0
-
-
-def test_vwap_sell_signal_above_vwap_recovery(strategy):
- signals = []
- for _ in range(30):
- strategy.on_candle(_candle(100, 100))
- for _ in range(5):
- strategy.on_candle(_candle(105, 10))
- sig = strategy.on_candle(_candle(100, 10))
- if sig is not None:
- signals.append(sig)
- sell_signals = [s for s in signals if s.side == OrderSide.SELL]
- assert len(sell_signals) > 0
-
-
-def test_vwap_reset_clears_state(strategy):
- strategy.on_candle(_candle(100))
- strategy.reset()
- assert strategy._cumulative_tp_vol == 0.0
- assert strategy._cumulative_vol == 0.0
-```
-
-- [ ] **Step 2: Run test to verify it fails**
-
-Run: `pytest services/strategy-engine/tests/test_vwap_strategy.py -v`
-Expected: FAIL with `ModuleNotFoundError`
-
-- [ ] **Step 3: Implement VWAP strategy**
-
-Create `services/strategy-engine/strategies/vwap_strategy.py`:
-
-```python
-"""VWAP (Volume Weighted Average Price) strategy."""
-from decimal import Decimal
-
-from shared.models import Candle, Signal, OrderSide
-from strategies.base import BaseStrategy
-
-
-class VwapStrategy(BaseStrategy):
- name: str = "vwap"
-
- def __init__(self) -> None:
- self._deviation_threshold: float = 0.002
- self._quantity: Decimal = Decimal("0.01")
- self._cumulative_tp_vol: float = 0.0
- self._cumulative_vol: float = 0.0
- self._candle_count: int = 0
- self._was_below_vwap: bool = False
- self._was_above_vwap: bool = False
-
- @property
- def warmup_period(self) -> int:
- return 30
-
- def configure(self, params: dict) -> None:
- self._deviation_threshold = float(params.get("deviation_threshold", 0.002))
- self._quantity = Decimal(str(params.get("quantity", "0.01")))
-
- def reset(self) -> None:
- self._cumulative_tp_vol = 0.0
- self._cumulative_vol = 0.0
- self._candle_count = 0
- self._was_below_vwap = False
- self._was_above_vwap = False
-
- def on_candle(self, candle: Candle) -> Signal | None:
- high = float(candle.high)
- low = float(candle.low)
- close = float(candle.close)
- volume = float(candle.volume)
-
- typical_price = (high + low + close) / 3.0
- self._cumulative_tp_vol += typical_price * volume
- self._cumulative_vol += volume
- self._candle_count += 1
-
- if self._candle_count < self.warmup_period:
- return None
-
- if self._cumulative_vol == 0:
- return None
-
- vwap = self._cumulative_tp_vol / self._cumulative_vol
- deviation = (close - vwap) / vwap if vwap != 0 else 0
-
- # Track VWAP deviations
- if deviation < -self._deviation_threshold:
- self._was_below_vwap = True
- self._was_above_vwap = False
- elif deviation > self._deviation_threshold:
- self._was_above_vwap = True
- self._was_below_vwap = False
-
- # BUY: price was below VWAP and recovered to VWAP (mean reversion)
- if self._was_below_vwap and abs(deviation) <= self._deviation_threshold:
- self._was_below_vwap = False
- return Signal(
- strategy=self.name,
- symbol=candle.symbol,
- side=OrderSide.BUY,
- price=candle.close,
- quantity=self._quantity,
- reason=f"VWAP mean reversion from below (VWAP={vwap:.2f}, deviation={deviation:.4f})",
- )
-
- # SELL: price was above VWAP and recovered to VWAP
- if self._was_above_vwap and abs(deviation) <= self._deviation_threshold:
- self._was_above_vwap = False
- return Signal(
- strategy=self.name,
- symbol=candle.symbol,
- side=OrderSide.SELL,
- price=candle.close,
- quantity=self._quantity,
- reason=f"VWAP mean reversion from above (VWAP={vwap:.2f}, deviation={deviation:.4f})",
- )
-
- return None
-```
-
-Create `services/strategy-engine/strategies/config/vwap_strategy.yaml`:
-
-```yaml
-deviation_threshold: 0.002
-quantity: "0.01"
-```
-
-- [ ] **Step 4: Run test to verify it passes**
-
-Run: `pytest services/strategy-engine/tests/test_vwap_strategy.py -v`
-Expected: All 5 tests PASS
-
-- [ ] **Step 5: Commit**
-
-```bash
-git add services/strategy-engine/strategies/vwap_strategy.py \
- services/strategy-engine/strategies/config/vwap_strategy.yaml \
- services/strategy-engine/tests/test_vwap_strategy.py
-git commit -m "feat(strategy): add VWAP strategy"
-```
-
----
-
-## Task 13: Volume Profile Strategy
-
-**Files:**
-- Create: `services/strategy-engine/strategies/volume_profile_strategy.py`
-- Create: `services/strategy-engine/strategies/config/volume_profile_strategy.yaml`
-- Test: `services/strategy-engine/tests/test_volume_profile_strategy.py`
-
-- [ ] **Step 1: Write the failing test**
-
-Create `services/strategy-engine/tests/test_volume_profile_strategy.py`:
-
-```python
-"""Tests for Volume Profile strategy."""
-from decimal import Decimal
-from datetime import datetime, timezone
-
-import pytest
-
-from shared.models import Candle, OrderSide
-from strategies.volume_profile_strategy import VolumeProfileStrategy
-
-
-@pytest.fixture
-def strategy():
- s = VolumeProfileStrategy()
- s.configure({
- "lookback_period": 10,
- "num_bins": 5,
- "value_area_pct": 0.7,
- "quantity": "0.01",
- })
- return s
-
-
-def _candle(price: float, volume: float = 10.0) -> Candle:
- return Candle(
- symbol="BTCUSDT",
- timeframe="1m",
- open_time=datetime(2025, 1, 1, tzinfo=timezone.utc),
- open=Decimal(str(price)),
- high=Decimal(str(price + 5)),
- low=Decimal(str(price - 5)),
- close=Decimal(str(price)),
- volume=Decimal(str(volume)),
- )
-
-
-def test_volume_profile_warmup_period(strategy):
- assert strategy.warmup_period == 10
-
-
-def test_volume_profile_no_signal_insufficient_data(strategy):
- for p in [100, 101, 102]:
- result = strategy.on_candle(_candle(p))
- assert result is None
-
-
-def test_volume_profile_buy_at_value_area_low(strategy):
- # Concentrate volume at 100, then price drops to bottom of value area
- signals = []
- for _ in range(10):
- strategy.on_candle(_candle(100, 100))
- # Price drops to lower edge
- sig = strategy.on_candle(_candle(90, 10))
- if sig is not None:
- signals.append(sig)
- # Multiple attempts — may need several candles for the signal
- for p in [89, 88, 90]:
- sig = strategy.on_candle(_candle(p, 10))
- if sig is not None:
- signals.append(sig)
- buy_signals = [s for s in signals if s.side == OrderSide.BUY]
- assert len(buy_signals) > 0
-
-
-def test_volume_profile_sell_at_value_area_high(strategy):
- signals = []
- for _ in range(10):
- strategy.on_candle(_candle(100, 100))
- sig = strategy.on_candle(_candle(110, 10))
- if sig is not None:
- signals.append(sig)
- for p in [111, 112, 110]:
- sig = strategy.on_candle(_candle(p, 10))
- if sig is not None:
- signals.append(sig)
- sell_signals = [s for s in signals if s.side == OrderSide.SELL]
- assert len(sell_signals) > 0
-
-
-def test_volume_profile_reset_clears_state(strategy):
- strategy.on_candle(_candle(100))
- strategy.reset()
- assert len(strategy._candles) == 0
-```
-
-- [ ] **Step 2: Run test to verify it fails**
-
-Run: `pytest services/strategy-engine/tests/test_volume_profile_strategy.py -v`
-Expected: FAIL with `ModuleNotFoundError`
-
-- [ ] **Step 3: Implement Volume Profile strategy**
-
-Create `services/strategy-engine/strategies/volume_profile_strategy.py`:
-
-```python
-"""Volume Profile strategy based on Point of Control and Value Area."""
-from collections import deque
-from decimal import Decimal
-
-import numpy as np
-
-from shared.models import Candle, Signal, OrderSide
-from strategies.base import BaseStrategy
-
-
-class VolumeProfileStrategy(BaseStrategy):
- name: str = "volume_profile"
-
- def __init__(self) -> None:
- self._lookback_period: int = 100
- self._num_bins: int = 50
- self._value_area_pct: float = 0.7
- self._quantity: Decimal = Decimal("0.01")
- self._candles: deque[tuple[float, float]] = deque(maxlen=500) # (close, volume)
- self._was_below_va: bool = False
- self._was_above_va: bool = False
-
- @property
- def warmup_period(self) -> int:
- return self._lookback_period
-
- def configure(self, params: dict) -> None:
- self._lookback_period = int(params.get("lookback_period", 100))
- self._num_bins = int(params.get("num_bins", 50))
- self._value_area_pct = float(params.get("value_area_pct", 0.7))
- self._quantity = Decimal(str(params.get("quantity", "0.01")))
-
- def reset(self) -> None:
- self._candles.clear()
- self._was_below_va = False
- self._was_above_va = False
-
- def _compute_value_area(self) -> tuple[float, float, float] | None:
- """Compute POC, value area low, and value area high.
-
- Returns (poc, va_low, va_high) or None if insufficient data.
- """
- if len(self._candles) < self._lookback_period:
- return None
-
- recent = list(self._candles)[-self._lookback_period :]
- prices = [c[0] for c in recent]
- volumes = [c[1] for c in recent]
-
- min_price = min(prices)
- max_price = max(prices)
- if min_price == max_price:
- return None
-
- bin_edges = np.linspace(min_price, max_price, self._num_bins + 1)
- volume_profile = np.zeros(self._num_bins)
-
- for price, vol in zip(prices, volumes):
- bin_idx = int((price - min_price) / (max_price - min_price) * (self._num_bins - 1))
- bin_idx = min(bin_idx, self._num_bins - 1)
- volume_profile[bin_idx] += vol
-
- poc_idx = int(np.argmax(volume_profile))
- poc = (bin_edges[poc_idx] + bin_edges[poc_idx + 1]) / 2
-
- # Expand from POC to capture value_area_pct of total volume
- total_vol = volume_profile.sum()
- if total_vol == 0:
- return None
-
- target_vol = total_vol * self._value_area_pct
- accumulated = volume_profile[poc_idx]
- low_idx = poc_idx
- high_idx = poc_idx
-
- while accumulated < target_vol:
- expand_low = low_idx > 0
- expand_high = high_idx < self._num_bins - 1
-
- if not expand_low and not expand_high:
- break
-
- low_vol = volume_profile[low_idx - 1] if expand_low else 0
- high_vol = volume_profile[high_idx + 1] if expand_high else 0
-
- if low_vol >= high_vol and expand_low:
- low_idx -= 1
- accumulated += volume_profile[low_idx]
- elif expand_high:
- high_idx += 1
- accumulated += volume_profile[high_idx]
- else:
- low_idx -= 1
- accumulated += volume_profile[low_idx]
-
- va_low = bin_edges[low_idx]
- va_high = bin_edges[high_idx + 1]
-
- return poc, va_low, va_high
-
- def on_candle(self, candle: Candle) -> Signal | None:
- self._candles.append((float(candle.close), float(candle.volume)))
-
- result = self._compute_value_area()
- if result is None:
- return None
-
- poc, va_low, va_high = result
- price = float(candle.close)
-
- # Track value area penetration
- if price < va_low:
- self._was_below_va = True
- self._was_above_va = False
- elif price > va_high:
- self._was_above_va = True
- self._was_below_va = False
-
- # BUY: price was below VA and bounced back to VA low (support)
- if self._was_below_va and price >= va_low and price <= poc:
- self._was_below_va = False
- return Signal(
- strategy=self.name,
- symbol=candle.symbol,
- side=OrderSide.BUY,
- price=candle.close,
- quantity=self._quantity,
- reason=f"Volume Profile: bounce at VA low ({va_low:.2f}), POC={poc:.2f}",
- )
-
- # SELL: price was above VA and pulled back to VA high (resistance)
- if self._was_above_va and price <= va_high and price >= poc:
- self._was_above_va = False
- return Signal(
- strategy=self.name,
- symbol=candle.symbol,
- side=OrderSide.SELL,
- price=candle.close,
- quantity=self._quantity,
- reason=f"Volume Profile: rejection at VA high ({va_high:.2f}), POC={poc:.2f}",
- )
-
- return None
-```
-
-Create `services/strategy-engine/strategies/config/volume_profile_strategy.yaml`:
-
-```yaml
-lookback_period: 100
-num_bins: 50
-value_area_pct: 0.7
-quantity: "0.01"
-```
-
-- [ ] **Step 4: Run test to verify it passes**
-
-Run: `pytest services/strategy-engine/tests/test_volume_profile_strategy.py -v`
-Expected: All 5 tests PASS
-
-- [ ] **Step 5: Commit**
-
-```bash
-git add services/strategy-engine/strategies/volume_profile_strategy.py \
- services/strategy-engine/strategies/config/volume_profile_strategy.yaml \
- services/strategy-engine/tests/test_volume_profile_strategy.py
-git commit -m "feat(strategy): add Volume Profile strategy"
-```
-
----
-
-## Task 14: Backtest Detailed Metrics
-
-**Files:**
-- Create: `services/backtester/src/backtester/metrics.py`
-- Test: `services/backtester/tests/test_metrics.py`
-
-- [ ] **Step 1: Write the failing test**
-
-Create `services/backtester/tests/test_metrics.py`:
-
-```python
-"""Tests for detailed backtest metrics."""
-from datetime import datetime, timedelta, timezone
-from decimal import Decimal
-
-import pytest
-
-from backtester.metrics import TradeRecord, compute_detailed_metrics
-
-
-def _trade(entry_price: float, exit_price: float, qty: float = 1.0, days: int = 1) -> tuple[TradeRecord, TradeRecord]:
- entry_time = datetime(2025, 1, 1, tzinfo=timezone.utc)
- exit_time = entry_time + timedelta(days=days)
- entry = TradeRecord(
- time=entry_time,
- symbol="BTCUSDT",
- side="BUY",
- price=Decimal(str(entry_price)),
- quantity=Decimal(str(qty)),
- )
- exit_rec = TradeRecord(
- time=exit_time,
- symbol="BTCUSDT",
- side="SELL",
- price=Decimal(str(exit_price)),
- quantity=Decimal(str(qty)),
- )
- return entry, exit_rec
-
-
-def test_compute_metrics_basic():
- trades = []
- e1, x1 = _trade(100, 110) # +10 profit
- e2, x2 = _trade(100, 95) # -5 loss
- trades = [e1, x1, e2, x2]
-
- metrics = compute_detailed_metrics(
- trades=trades,
- initial_balance=Decimal("1000"),
- final_balance=Decimal("1005"),
- )
-
- assert metrics.total_trades == 4
- assert metrics.winning_trades == 1
- assert metrics.losing_trades == 1
- assert metrics.win_rate == pytest.approx(50.0, rel=0.01)
- assert metrics.total_return == pytest.approx(0.5, rel=0.01)
-
-
-def test_compute_metrics_profit_factor():
- e1, x1 = _trade(100, 120) # +20
- e2, x2 = _trade(100, 90) # -10
- trades = [e1, x1, e2, x2]
-
- metrics = compute_detailed_metrics(
- trades=trades,
- initial_balance=Decimal("1000"),
- final_balance=Decimal("1010"),
- )
-
- assert metrics.profit_factor == pytest.approx(2.0, rel=0.01)
-
-
-def test_compute_metrics_max_drawdown():
- # Three trades: +10, -20, +5 => peak 1010, trough 990
- e1, x1 = _trade(100, 110)
- e2, x2 = _trade(100, 80)
- e3, x3 = _trade(100, 105)
- trades = [e1, x1, e2, x2, e3, x3]
-
- metrics = compute_detailed_metrics(
- trades=trades,
- initial_balance=Decimal("1000"),
- final_balance=Decimal("995"),
- )
-
- assert metrics.max_drawdown > 0
-
-
-def test_compute_metrics_sharpe_ratio():
- e1, x1 = _trade(100, 110, days=1)
- e2, x2 = _trade(100, 105, days=1)
- trades = [e1, x1, e2, x2]
-
- metrics = compute_detailed_metrics(
- trades=trades,
- initial_balance=Decimal("1000"),
- final_balance=Decimal("1015"),
- )
-
- # Sharpe should be a finite number
- assert metrics.sharpe_ratio != 0 or metrics.sharpe_ratio == 0
-
-
-def test_compute_metrics_empty_trades():
- metrics = compute_detailed_metrics(
- trades=[],
- initial_balance=Decimal("1000"),
- final_balance=Decimal("1000"),
- )
- assert metrics.total_trades == 0
- assert metrics.win_rate == 0.0
- assert metrics.sharpe_ratio == 0.0
-```
-
-- [ ] **Step 2: Run test to verify it fails**
-
-Run: `pytest services/backtester/tests/test_metrics.py -v`
-Expected: FAIL with `ModuleNotFoundError: No module named 'backtester.metrics'`
-
-- [ ] **Step 3: Implement detailed metrics**
-
-Create `services/backtester/src/backtester/metrics.py`:
-
-```python
-"""Detailed backtest metrics: Sharpe, Sortino, Calmar, drawdown, trade analysis."""
-import math
-from dataclasses import dataclass, field
-from datetime import datetime, timedelta
-from decimal import Decimal
-
-
-@dataclass
-class TradeRecord:
- time: datetime
- symbol: str
- side: str
- price: Decimal
- quantity: Decimal
-
-
-@dataclass
-class DetailedMetrics:
- # Basic
- total_return: float
- total_trades: int
- winning_trades: int
- losing_trades: int
- win_rate: float
- profit_factor: float
-
- # Risk
- sharpe_ratio: float
- sortino_ratio: float
- calmar_ratio: float
- max_drawdown: float
- max_drawdown_duration: timedelta
-
- # Returns
- monthly_returns: dict[str, float]
- avg_win: float
- avg_loss: float
- largest_win: float
- largest_loss: float
- avg_holding_period: timedelta
-
- # Individual trades
- trade_pairs: list[dict] = field(default_factory=list)
-
-
-def compute_detailed_metrics(
- trades: list[TradeRecord],
- initial_balance: Decimal,
- final_balance: Decimal,
-) -> DetailedMetrics:
- """Compute detailed metrics from a list of trade records."""
- initial = float(initial_balance)
- final = float(final_balance)
-
- total_return = ((final - initial) / initial * 100) if initial > 0 else 0.0
-
- if not trades:
- return DetailedMetrics(
- total_return=total_return,
- total_trades=0,
- winning_trades=0,
- losing_trades=0,
- win_rate=0.0,
- profit_factor=0.0,
- sharpe_ratio=0.0,
- sortino_ratio=0.0,
- calmar_ratio=0.0,
- max_drawdown=0.0,
- max_drawdown_duration=timedelta(0),
- monthly_returns={},
- avg_win=0.0,
- avg_loss=0.0,
- largest_win=0.0,
- largest_loss=0.0,
- avg_holding_period=timedelta(0),
- trade_pairs=[],
- )
-
- # Pair up BUY/SELL trades
- buys: list[TradeRecord] = []
- pairs: list[dict] = []
- pnls: list[float] = []
- holding_periods: list[timedelta] = []
-
- for trade in trades:
- if trade.side == "BUY":
- buys.append(trade)
- elif trade.side == "SELL" and buys:
- buy = buys.pop(0)
- pnl = float(trade.price - buy.price) * float(trade.quantity)
- pnls.append(pnl)
- holding = trade.time - buy.time
- holding_periods.append(holding)
- pairs.append({
- "entry_time": buy.time.isoformat(),
- "exit_time": trade.time.isoformat(),
- "entry_price": float(buy.price),
- "exit_price": float(trade.price),
- "quantity": float(trade.quantity),
- "pnl": pnl,
- "pnl_pct": (pnl / (float(buy.price) * float(trade.quantity))) * 100 if float(buy.price) > 0 else 0,
- "holding_period": str(holding),
- })
-
- wins = [p for p in pnls if p > 0]
- losses = [p for p in pnls if p < 0]
-
- winning_trades = len(wins)
- losing_trades = len(losses)
- win_rate = (winning_trades / len(pnls) * 100) if pnls else 0.0
-
- gross_profit = sum(wins) if wins else 0.0
- gross_loss = abs(sum(losses)) if losses else 0.0
- profit_factor = (gross_profit / gross_loss) if gross_loss > 0 else 0.0
-
- avg_win = (sum(wins) / len(wins)) if wins else 0.0
- avg_loss = (sum(losses) / len(losses)) if losses else 0.0
- largest_win = max(wins) if wins else 0.0
- largest_loss = min(losses) if losses else 0.0
- avg_holding = (
- sum(holding_periods, timedelta(0)) / len(holding_periods)
- if holding_periods
- else timedelta(0)
- )
-
- # Equity curve for drawdown and ratios
- equity = [initial]
- for pnl in pnls:
- equity.append(equity[-1] + pnl)
-
- # Max drawdown
- peak = equity[0]
- max_dd = 0.0
- dd_start = 0
- max_dd_duration = timedelta(0)
- current_dd_start = 0
-
- for i, val in enumerate(equity):
- if val > peak:
- peak = val
- current_dd_start = i
- dd = (peak - val) / peak if peak > 0 else 0
- if dd > max_dd:
- max_dd = dd
-
- # Daily returns approximation (per-trade returns)
- returns = []
- for i in range(1, len(equity)):
- if equity[i - 1] > 0:
- returns.append((equity[i] - equity[i - 1]) / equity[i - 1])
-
- # Sharpe ratio (annualized for crypto: 365 days)
- if returns and len(returns) > 1:
- mean_ret = sum(returns) / len(returns)
- std_ret = math.sqrt(sum((r - mean_ret) ** 2 for r in returns) / (len(returns) - 1))
- sharpe = (mean_ret / std_ret * math.sqrt(365)) if std_ret > 0 else 0.0
-
- # Sortino ratio (downside deviation only)
- downside = [r for r in returns if r < 0]
- if downside:
- downside_std = math.sqrt(sum(r**2 for r in downside) / len(downside))
- sortino = (mean_ret / downside_std * math.sqrt(365)) if downside_std > 0 else 0.0
- else:
- sortino = 0.0
- else:
- sharpe = 0.0
- sortino = 0.0
-
- # Calmar ratio
- annualized_return = total_return / 100 # as fraction
- calmar = (annualized_return / max_dd) if max_dd > 0 else 0.0
-
- # Monthly returns
- monthly: dict[str, float] = {}
- for pair in pairs:
- month_key = pair["exit_time"][:7] # YYYY-MM
- monthly[month_key] = monthly.get(month_key, 0.0) + pair["pnl"]
-
- return DetailedMetrics(
- total_return=total_return,
- total_trades=len(trades),
- winning_trades=winning_trades,
- losing_trades=losing_trades,
- win_rate=win_rate,
- profit_factor=profit_factor,
- sharpe_ratio=sharpe,
- sortino_ratio=sortino,
- calmar_ratio=calmar,
- max_drawdown=max_dd * 100, # as percentage
- max_drawdown_duration=max_dd_duration,
- monthly_returns=monthly,
- avg_win=avg_win,
- avg_loss=avg_loss,
- largest_win=largest_win,
- largest_loss=largest_loss,
- avg_holding_period=avg_holding,
- trade_pairs=pairs,
- )
-```
-
-- [ ] **Step 4: Run test to verify it passes**
-
-Run: `pytest services/backtester/tests/test_metrics.py -v`
-Expected: All 5 tests PASS
-
-- [ ] **Step 5: Commit**
-
-```bash
-git add services/backtester/src/backtester/metrics.py \
- services/backtester/tests/test_metrics.py
-git commit -m "feat(backtester): add detailed metrics (Sharpe, Sortino, drawdown)"
-```
-
----
-
-## Task 15: Integrate Metrics into BacktestEngine + Enhanced Reporter
-
-**Files:**
-- Modify: `services/backtester/src/backtester/simulator.py`
-- Modify: `services/backtester/src/backtester/engine.py`
-- Modify: `services/backtester/src/backtester/reporter.py`
-- Modify: `services/backtester/tests/test_engine.py`
-- Modify: `services/backtester/tests/test_reporter.py`
-
-- [ ] **Step 1: Update simulator to produce TradeRecords**
-
-Add timestamp to `SimulatedTrade` in `services/backtester/src/backtester/simulator.py`. Replace file:
-
-```python
-"""Simulated order executor for backtesting."""
-from dataclasses import dataclass, field
-from datetime import datetime, timezone
-from decimal import Decimal
-
-from shared.models import OrderSide, Signal
-
-
-@dataclass
-class SimulatedTrade:
- symbol: str
- side: OrderSide
- price: Decimal
- quantity: Decimal
- balance_after: Decimal
- timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
-
-
-class OrderSimulator:
- """Simulates order execution against a paper balance."""
-
- def __init__(self, initial_balance: Decimal) -> None:
- self.balance: Decimal = initial_balance
- self.positions: dict[str, Decimal] = {}
- self.trades: list[SimulatedTrade] = []
-
- def execute(self, signal: Signal, timestamp: datetime | None = None) -> bool:
- """Execute a signal. Returns True if the trade was accepted."""
- ts = timestamp or datetime.now(timezone.utc)
-
- if signal.side == OrderSide.BUY:
- cost = signal.price * signal.quantity
- if cost > self.balance:
- return False
- self.balance -= cost
- self.positions[signal.symbol] = (
- self.positions.get(signal.symbol, Decimal("0")) + signal.quantity
- )
- trade_quantity = signal.quantity
- else: # SELL
- current_position = self.positions.get(signal.symbol, Decimal("0"))
- if current_position <= Decimal("0"):
- return False
- trade_quantity = min(signal.quantity, current_position)
- proceeds = signal.price * trade_quantity
- self.balance += proceeds
- self.positions[signal.symbol] = current_position - trade_quantity
-
- self.trades.append(
- SimulatedTrade(
- symbol=signal.symbol,
- side=signal.side,
- price=signal.price,
- quantity=trade_quantity,
- balance_after=self.balance,
- timestamp=ts,
- )
- )
- return True
-```
-
-- [ ] **Step 2: Update engine to compute DetailedMetrics**
-
-Replace `services/backtester/src/backtester/engine.py`:
-
-```python
-"""Backtesting engine that runs strategies against historical candle data."""
-from dataclasses import dataclass, field
-from decimal import Decimal
-from typing import Protocol
-
-from shared.models import Candle, Signal
-
-from backtester.metrics import DetailedMetrics, TradeRecord, compute_detailed_metrics
-from backtester.simulator import OrderSimulator, SimulatedTrade
-
-
-class StrategyProtocol(Protocol):
- name: str
-
- def on_candle(self, candle: Candle) -> Signal | None: ...
- def configure(self, params: dict) -> None: ...
- def reset(self) -> None: ...
-
-
-@dataclass
-class BacktestResult:
- strategy_name: str
- symbol: str
- total_trades: int
- initial_balance: Decimal
- final_balance: Decimal
- profit: Decimal
- profit_pct: Decimal
- trades: list[SimulatedTrade] = field(default_factory=list)
- detailed: DetailedMetrics | None = None
-
- @property
- def win_rate(self) -> float:
- buy_prices: list[Decimal] = []
- wins = 0
- total_pairs = 0
-
- for trade in self.trades:
- if trade.side.value == "BUY":
- buy_prices.append(trade.price)
- else:
- if buy_prices:
- buy_price = buy_prices.pop(0)
- total_pairs += 1
- if trade.price > buy_price:
- wins += 1
-
- if total_pairs == 0:
- return 0.0
- return wins / total_pairs * 100
-
-
-class BacktestEngine:
- """Runs a strategy against historical candles using a simulated order executor."""
-
- def __init__(self, strategy: StrategyProtocol, initial_balance: Decimal) -> None:
- self._strategy = strategy
- self._initial_balance = initial_balance
-
- def run(self, candles: list[Candle]) -> BacktestResult:
- """Run the backtest over a list of candles and return a result."""
- simulator = OrderSimulator(self._initial_balance)
-
- for candle in candles:
- signal = self._strategy.on_candle(candle)
- if signal is not None:
- simulator.execute(signal, timestamp=candle.open_time)
-
- final_balance = simulator.balance
- if candles:
- last_price = candles[-1].close
- for symbol, qty in simulator.positions.items():
- if qty > Decimal("0"):
- final_balance += qty * last_price
-
- profit = final_balance - self._initial_balance
- if self._initial_balance != Decimal("0"):
- profit_pct = (profit / self._initial_balance) * Decimal("100")
- else:
- profit_pct = Decimal("0")
-
- # Build TradeRecords for detailed metrics
- trade_records = [
- TradeRecord(
- time=t.timestamp,
- symbol=t.symbol,
- side=t.side.value,
- price=t.price,
- quantity=t.quantity,
- )
- for t in simulator.trades
- ]
-
- detailed = compute_detailed_metrics(
- trades=trade_records,
- initial_balance=self._initial_balance,
- final_balance=final_balance,
- )
-
- return BacktestResult(
- strategy_name=self._strategy.name,
- symbol=candles[0].symbol if candles else "",
- total_trades=len(simulator.trades),
- initial_balance=self._initial_balance,
- final_balance=final_balance,
- profit=profit,
- profit_pct=profit_pct,
- trades=simulator.trades,
- detailed=detailed,
- )
-```
-
-- [ ] **Step 3: Update reporter with rich tables and export**
-
-Replace `services/backtester/src/backtester/reporter.py`:
-
-```python
-"""Report formatting for backtest results using rich tables."""
-import csv
-import io
-import json
-
-from rich.console import Console
-from rich.table import Table
-
-from backtester.engine import BacktestResult
-
-
-def format_report(result: BacktestResult) -> str:
- """Format a backtest result into a rich text report."""
- console = Console(file=io.StringIO(), force_terminal=True)
-
- # Summary table
- summary = Table(title="BACKTEST REPORT", show_lines=True)
- summary.add_column("Metric", style="bold")
- summary.add_column("Value", justify="right")
-
- summary.add_row("Strategy", result.strategy_name)
- summary.add_row("Symbol", result.symbol)
- summary.add_row("Initial Balance", f"{result.initial_balance:.2f}")
- summary.add_row("Final Balance", f"{result.final_balance:.2f}")
- summary.add_row("Profit/Loss", f"{result.profit:.2f}")
- summary.add_row("Profit %", f"{result.profit_pct:.2f}%")
- summary.add_row("Total Trades", str(result.total_trades))
- summary.add_row("Win Rate", f"{result.win_rate:.2f}%")
-
- if result.detailed:
- d = result.detailed
- summary.add_row("Sharpe Ratio", f"{d.sharpe_ratio:.3f}")
- summary.add_row("Sortino Ratio", f"{d.sortino_ratio:.3f}")
- summary.add_row("Calmar Ratio", f"{d.calmar_ratio:.3f}")
- summary.add_row("Max Drawdown", f"{d.max_drawdown:.2f}%")
- summary.add_row("Profit Factor", f"{d.profit_factor:.2f}")
- summary.add_row("Avg Win", f"{d.avg_win:.2f}")
- summary.add_row("Avg Loss", f"{d.avg_loss:.2f}")
- summary.add_row("Largest Win", f"{d.largest_win:.2f}")
- summary.add_row("Largest Loss", f"{d.largest_loss:.2f}")
- summary.add_row("Avg Holding Period", str(d.avg_holding_period))
-
- console.print(summary)
-
- # Monthly returns table
- if result.detailed and result.detailed.monthly_returns:
- monthly = Table(title="MONTHLY RETURNS")
- monthly.add_column("Month")
- monthly.add_column("PnL", justify="right")
- for month, pnl in sorted(result.detailed.monthly_returns.items()):
- style = "green" if pnl >= 0 else "red"
- monthly.add_row(month, f"{pnl:.2f}", style=style)
- console.print(monthly)
-
- output = console.file.getvalue()
- return output
-
-
-def export_csv(result: BacktestResult) -> str:
- """Export trade pairs as CSV."""
- if not result.detailed or not result.detailed.trade_pairs:
- return ""
-
- output = io.StringIO()
- writer = csv.DictWriter(
- output,
- fieldnames=["entry_time", "exit_time", "entry_price", "exit_price", "quantity", "pnl", "pnl_pct", "holding_period"],
- )
- writer.writeheader()
- for pair in result.detailed.trade_pairs:
- writer.writerow(pair)
- return output.getvalue()
-
-
-def export_json(result: BacktestResult) -> str:
- """Export detailed metrics as JSON."""
- if not result.detailed:
- return "{}"
-
- d = result.detailed
- data = {
- "total_return": d.total_return,
- "total_trades": d.total_trades,
- "winning_trades": d.winning_trades,
- "losing_trades": d.losing_trades,
- "win_rate": d.win_rate,
- "profit_factor": d.profit_factor,
- "sharpe_ratio": d.sharpe_ratio,
- "sortino_ratio": d.sortino_ratio,
- "calmar_ratio": d.calmar_ratio,
- "max_drawdown": d.max_drawdown,
- "monthly_returns": d.monthly_returns,
- "avg_win": d.avg_win,
- "avg_loss": d.avg_loss,
- "largest_win": d.largest_win,
- "largest_loss": d.largest_loss,
- "trade_pairs": d.trade_pairs,
- }
- return json.dumps(data, indent=2, default=str)
-```
-
-- [ ] **Step 4: Run all backtester tests**
-
-Run: `pytest services/backtester/tests/ -v`
-Expected: All tests PASS (existing tests may need minor updates for `timestamp` parameter)
-
-- [ ] **Step 5: Fix any broken tests**
-
-If `test_simulator.py` fails due to `timestamp` parameter, the existing tests should still work since `timestamp` defaults to `datetime.now()`. If `test_reporter.py` fails, update it to check for rich output:
-
-Update `services/backtester/tests/test_reporter.py`:
-
-```python
-"""Tests for backtest report formatter."""
-from decimal import Decimal
-
-from backtester.engine import BacktestResult
-from backtester.reporter import format_report, export_csv, export_json
-
-
-def test_format_report_contains_key_metrics():
- result = BacktestResult(
- strategy_name="rsi",
- symbol="BTCUSDT",
- total_trades=10,
- initial_balance=Decimal("10000"),
- final_balance=Decimal("10500"),
- profit=Decimal("500"),
- profit_pct=Decimal("5"),
- )
- report = format_report(result)
- assert "rsi" in report
- assert "BTCUSDT" in report
- assert "10000" in report or "10,000" in report
-
-
-def test_export_csv_empty_when_no_detailed():
- result = BacktestResult(
- strategy_name="rsi",
- symbol="BTCUSDT",
- total_trades=0,
- initial_balance=Decimal("10000"),
- final_balance=Decimal("10000"),
- profit=Decimal("0"),
- profit_pct=Decimal("0"),
- )
- assert export_csv(result) == ""
-
-
-def test_export_json_empty_when_no_detailed():
- result = BacktestResult(
- strategy_name="rsi",
- symbol="BTCUSDT",
- total_trades=0,
- initial_balance=Decimal("10000"),
- final_balance=Decimal("10000"),
- profit=Decimal("0"),
- profit_pct=Decimal("0"),
- )
- assert export_json(result) == "{}"
-```
-
-- [ ] **Step 6: Run all backtester tests again**
-
-Run: `pytest services/backtester/tests/ -v`
-Expected: All tests PASS
-
-- [ ] **Step 7: Commit**
-
-```bash
-git add services/backtester/src/backtester/simulator.py \
- services/backtester/src/backtester/engine.py \
- services/backtester/src/backtester/reporter.py \
- services/backtester/tests/test_engine.py \
- services/backtester/tests/test_reporter.py
-git commit -m "feat(backtester): integrate detailed metrics and rich reporter"
-```
-
----
-
-## Task 16: Final Integration Test
-
-**Files:**
-- All
-
-- [ ] **Step 1: Run the full test suite**
-
-Run: `pytest -v`
-Expected: All tests PASS
-
-- [ ] **Step 2: Run linting**
-
-Run: `make lint`
-Expected: No errors
-
-- [ ] **Step 3: Fix any lint issues**
-
-Run: `make format` if needed, then `make lint` again.
-
-- [ ] **Step 4: Verify plugin loader finds all 7 strategies**
-
-Run: `python -c "from pathlib import Path; from strategy_engine.plugin_loader import load_strategies; s = load_strategies(Path('services/strategy-engine/strategies')); print([x.name for x in s])"`
-Expected: `['bollinger', 'ema_crossover', 'grid', 'macd', 'rsi', 'volume_profile', 'vwap']`
-
-- [ ] **Step 5: Final commit if any fixes were made**
-
-```bash
-git add -A
-git commit -m "fix: resolve lint issues and final integration fixes"
-```