# Crypto Trading Platform Implementation Plan

> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.

**Goal:** Build an automated Binance spot cryptocurrency trading platform on a microservice architecture.

**Architecture:** Five independent services (data-collector, strategy-engine, order-executor, portfolio-manager, backtester) communicate over Redis Streams and store their data in PostgreSQL. The shared library provides the common models, events, and DB connection, and a Click-based CLI controls the whole system.

**Tech Stack:** Python 3.12, ccxt, Redis Streams, PostgreSQL, asyncpg, pandas, pandas-ta, Click, pydantic-settings, Docker Compose, pytest

---

## File Structure

```
trading/
├── services/
│   ├── data-collector/
│   │   ├── src/data_collector/__init__.py
│   │   ├── src/data_collector/main.py
│   │   ├── src/data_collector/binance_ws.py
│   │   ├── src/data_collector/binance_rest.py
│   │   ├── src/data_collector/storage.py
│   │   ├── src/data_collector/config.py
│   │   ├── tests/test_binance_rest.py
│   │   ├── tests/test_storage.py
│   │   ├── tests/test_main.py
│   │   ├── Dockerfile
│   │   └── pyproject.toml
│   ├── strategy-engine/
│   │   ├── src/strategy_engine/__init__.py
│   │   ├── src/strategy_engine/main.py
│   │   ├── src/strategy_engine/engine.py
│   │   ├── src/strategy_engine/plugin_loader.py
│   │   ├── src/strategy_engine/config.py
│   │   ├── strategies/base.py
│   │   ├── strategies/rsi_strategy.py
│   │   ├── strategies/grid_strategy.py
│   │   ├── tests/test_engine.py
│   │   ├── tests/test_plugin_loader.py
│   │   ├── tests/test_rsi_strategy.py
│   │   ├── tests/test_grid_strategy.py
│   │   ├── Dockerfile
│   │   └── pyproject.toml
│   ├── order-executor/
│   │   ├── src/order_executor/__init__.py
│   │   ├── src/order_executor/main.py
│   │   ├── src/order_executor/executor.py
│   │   ├── src/order_executor/risk_manager.py
│   │   ├── src/order_executor/config.py
│   │   ├── tests/test_executor.py
│   │   ├── tests/test_risk_manager.py
│   │   ├── Dockerfile
│   │   └── pyproject.toml
│   ├── portfolio-manager/
│   │   ├── src/portfolio_manager/__init__.py
│   │   ├── src/portfolio_manager/main.py
│   │   ├── src/portfolio_manager/portfolio.py
│   │   ├── src/portfolio_manager/pnl.py
│   │   ├── src/portfolio_manager/config.py
│   │   ├── tests/test_portfolio.py
│   │   ├── tests/test_pnl.py
│   │   ├── Dockerfile
│   │   └── pyproject.toml
│   └── backtester/
│       ├── src/backtester/__init__.py
│       ├── src/backtester/main.py
│       ├── src/backtester/engine.py
│       ├── src/backtester/simulator.py
│       ├── src/backtester/reporter.py
│       ├── src/backtester/config.py
│       ├── tests/test_engine.py
│       ├── tests/test_simulator.py
│       ├── tests/test_reporter.py
│       ├── Dockerfile
│       └── pyproject.toml
├── shared/
│   ├── src/shared/__init__.py
│   ├── src/shared/models.py
│   ├── src/shared/events.py
│   ├── src/shared/broker.py
│   ├── src/shared/db.py
│   ├── src/shared/config.py
│   ├── tests/test_models.py
│   ├── tests/test_events.py
│   ├── tests/test_broker.py
│   ├── tests/test_db.py
│   └── pyproject.toml
├── cli/
│   ├── src/trading_cli/__init__.py
│   ├── src/trading_cli/main.py
│   ├── src/trading_cli/commands/data.py
│   ├── src/trading_cli/commands/trade.py
│   ├── src/trading_cli/commands/backtest.py
│   ├── src/trading_cli/commands/portfolio.py
│   ├── src/trading_cli/commands/strategy.py
│   ├── src/trading_cli/commands/service.py
│   ├── tests/test_cli_data.py
│   ├── tests/test_cli_trade.py
│   └── pyproject.toml
├── docker-compose.yml
├── .env.example
├── Makefile
└── pyproject.toml  (workspace root)
```

---

## Task 1: Project Scaffolding

**Files:**
- Create: `pyproject.toml` (workspace root)
- Create: `.env.example`
- Create: `docker-compose.yml`
- Create: `Makefile`
- Create: `.gitignore`
- Create: `shared/pyproject.toml`

- [ ] **Step 1: Initialize git repo**

```bash
cd /home/si/Private/repos/trading
git init
```

- [ ] **Step 2: Create .gitignore**

Create `.gitignore`:

```gitignore
__pycache__/
*.py[cod]
*$py.class
*.egg-info/
dist/
build/
.eggs/
*.egg
.venv/
venv/
env/
.env
.mypy_cache/
.pytest_cache/
.ruff_cache/
*.log
.DS_Store
```

- [ ] **Step 3: Create workspace root
pyproject.toml**

Create `pyproject.toml`:

```toml
[project]
name = "trading-platform"
version = "0.1.0"
description = "Binance spot crypto trading platform"
requires-python = ">=3.12"

[tool.pytest.ini_options]
asyncio_mode = "auto"
testpaths = ["shared/tests", "services/*/tests", "cli/tests"]

[tool.ruff]
target-version = "py312"
line-length = 100
```

- [ ] **Step 4: Create .env.example**

Create `.env.example`:

```env
BINANCE_API_KEY=
BINANCE_API_SECRET=
REDIS_URL=redis://localhost:6379
DATABASE_URL=postgresql://trading:trading@localhost:5432/trading
LOG_LEVEL=INFO
RISK_MAX_POSITION_SIZE=0.1
RISK_STOP_LOSS_PCT=5
RISK_DAILY_LOSS_LIMIT_PCT=10
DRY_RUN=true
```

- [ ] **Step 5: Create docker-compose.yml**

Create `docker-compose.yml`:

```yaml
services:
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 5s
      timeout: 3s
      retries: 5

  postgres:
    image: postgres:16-alpine
    ports:
      - "5432:5432"
    environment:
      POSTGRES_USER: trading
      POSTGRES_PASSWORD: trading
      POSTGRES_DB: trading
    volumes:
      - postgres_data:/var/lib/postgresql/data
    healthcheck:
      # The exec form of a Compose healthcheck must start with "CMD" (or "CMD-SHELL")
      test: ["CMD", "pg_isready", "-U", "trading"]
      interval: 5s
      timeout: 3s
      retries: 5

  data-collector:
    build:
      context: .
      dockerfile: services/data-collector/Dockerfile
    env_file: .env
    depends_on:
      redis:
        condition: service_healthy
      postgres:
        condition: service_healthy
    restart: unless-stopped

  strategy-engine:
    build:
      context: .
      dockerfile: services/strategy-engine/Dockerfile
    env_file: .env
    depends_on:
      redis:
        condition: service_healthy
      postgres:
        condition: service_healthy
    restart: unless-stopped

  order-executor:
    build:
      context: .
      dockerfile: services/order-executor/Dockerfile
    env_file: .env
    depends_on:
      redis:
        condition: service_healthy
      postgres:
        condition: service_healthy
    restart: unless-stopped

  portfolio-manager:
    build:
      context: .
      dockerfile: services/portfolio-manager/Dockerfile
    env_file: .env
    depends_on:
      redis:
        condition: service_healthy
      postgres:
        condition: service_healthy
    restart: unless-stopped

volumes:
  redis_data:
  postgres_data:
```

- [ ] **Step 6: Create Makefile**

Create `Makefile`:

```makefile
.PHONY: infra up down logs test lint format

infra:
	docker compose up -d redis postgres

up:
	docker compose up -d

down:
	docker compose down

logs:
	docker compose logs -f $(service)

test:
	pytest -v

lint:
	ruff check .
	ruff format --check .

format:
	ruff check --fix .
	ruff format .
```

- [ ] **Step 7: Create shared/pyproject.toml**

Create `shared/pyproject.toml`:

```toml
[project]
name = "trading-shared"
version = "0.1.0"
description = "Shared models, events, and utilities for trading platform"
requires-python = ">=3.12"
dependencies = [
    "pydantic>=2.0",
    "pydantic-settings>=2.0",
    "redis>=5.0",
    "asyncpg>=0.29",
]

[project.optional-dependencies]
dev = [
    "pytest>=8.0",
    "pytest-asyncio>=0.23",
    "ruff>=0.4",
]

[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

[tool.hatch.build.targets.wheel]
packages = ["src/shared"]
```

- [ ] **Step 8: Commit scaffolding**

```bash
git add .
git commit -m "chore: project scaffolding with docker-compose, makefile, shared package"
```

---

## Task 2: Shared - Config & Models

**Files:**
- Create: `shared/src/shared/__init__.py`
- Create: `shared/src/shared/config.py`
- Create: `shared/src/shared/models.py`
- Create: `shared/tests/test_models.py`

- [ ] **Step 1: Write failing test for config**

Create `shared/tests/test_models.py`:

```python
from shared.config import Settings


def test_settings_defaults():
    settings = Settings(
        binance_api_key="test_key",
        binance_api_secret="test_secret",
    )
    assert settings.redis_url == "redis://localhost:6379"
    assert settings.database_url == "postgresql://trading:trading@localhost:5432/trading"
    assert settings.log_level == "INFO"
    assert settings.dry_run is True
```

- [ ] **Step 2: Run test to verify it fails**

```bash
cd /home/si/Private/repos/trading
pip install -e shared[dev]
pytest shared/tests/test_models.py::test_settings_defaults -v
```

Expected: FAIL with `ModuleNotFoundError: No module named 'shared'`

- [ ] **Step 3: Implement config**

Create `shared/src/shared/__init__.py`:

```python
```

Create `shared/src/shared/config.py`:

```python
from pydantic_settings import BaseSettings


class Settings(BaseSettings):
    binance_api_key: str
    binance_api_secret: str
    redis_url: str = "redis://localhost:6379"
    database_url: str = "postgresql://trading:trading@localhost:5432/trading"
    log_level: str = "INFO"
    risk_max_position_size: float = 0.1
    risk_stop_loss_pct: float = 5.0
    risk_daily_loss_limit_pct: float = 10.0
    dry_run: bool = True

    model_config = {"env_file": ".env", "env_file_encoding": "utf-8"}
```

- [ ] **Step 4: Run test to verify it passes**

```bash
pytest shared/tests/test_models.py::test_settings_defaults -v
```

Expected: PASS

- [ ] **Step 5: Write failing tests for models**

Append to `shared/tests/test_models.py`:

```python
from datetime import datetime, timezone
from decimal import Decimal

from shared.models import Candle, Signal, Order, Position, OrderSide, OrderType, OrderStatus


def test_candle_creation():
    candle = Candle(
        symbol="BTCUSDT",
        timeframe="1m",
        open_time=datetime(2026, 1, 1, tzinfo=timezone.utc),
        open=Decimal("50000"),
        high=Decimal("50100"),
        low=Decimal("49900"),
        close=Decimal("50050"),
        volume=Decimal("1.5"),
    )
    assert candle.symbol == "BTCUSDT"
    assert candle.close == Decimal("50050")


def test_signal_creation():
    signal = Signal(
        strategy="rsi_strategy",
        symbol="BTCUSDT",
        side=OrderSide.BUY,
        price=Decimal("50000"),
        quantity=Decimal("0.01"),
        reason="RSI below 30",
    )
    assert signal.side == OrderSide.BUY
    assert signal.reason == "RSI below 30"


def test_order_creation():
    order = Order(
        symbol="BTCUSDT",
        signal_id="sig_123",
        side=OrderSide.BUY,
        type=OrderType.MARKET,
        price=Decimal("50000"),
        quantity=Decimal("0.01"),
    )
    assert order.status == OrderStatus.PENDING
    assert order.filled_at is None
    assert order.id is not None


def test_position_unrealized_pnl():
    pos = Position(
        symbol="BTCUSDT",
        quantity=Decimal("0.1"),
        avg_entry_price=Decimal("50000"),
        current_price=Decimal("51000"),
    )
    assert pos.unrealized_pnl == Decimal("100")  # 0.1 * (51000 - 50000)
```

- [ ] **Step 6: Run tests to verify they fail**

```bash
pytest shared/tests/test_models.py -v
```

Expected: FAIL with `ModuleNotFoundError: No module named 'shared.models'`

- [ ] **Step 7: Implement models**

Create `shared/src/shared/models.py`:

```python
from datetime import datetime, timezone
from decimal import Decimal
from enum import StrEnum
from uuid import uuid4

from pydantic import BaseModel, Field


class OrderSide(StrEnum):
    BUY = "BUY"
    SELL = "SELL"


class OrderType(StrEnum):
    MARKET = "MARKET"
    LIMIT = "LIMIT"


class OrderStatus(StrEnum):
    PENDING = "PENDING"
    FILLED = "FILLED"
    CANCELLED = "CANCELLED"
    FAILED = "FAILED"


class Candle(BaseModel):
    symbol: str
    timeframe: str
    open_time: datetime
    open: Decimal
    high: Decimal
    low: Decimal
    close: Decimal
    volume: Decimal


class Signal(BaseModel):
    id: str = Field(default_factory=lambda: str(uuid4()))
    strategy: str
    symbol: str
    side: OrderSide
    price: Decimal
    quantity: Decimal
    reason: str
    created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))


class Order(BaseModel):
    id: str = Field(default_factory=lambda: str(uuid4()))
    signal_id: str
    symbol: str
    side: OrderSide
    type: OrderType
    price: Decimal
    quantity: Decimal
    status: OrderStatus = OrderStatus.PENDING
    created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
    filled_at: datetime | None = None


class Position(BaseModel):
    symbol: str
    quantity: Decimal
    avg_entry_price: Decimal
    current_price: Decimal

    @property
    def unrealized_pnl(self) -> Decimal:
        return self.quantity * (self.current_price - self.avg_entry_price)
```

- [ ] **Step 8: Run tests to verify they pass**

```bash
pytest shared/tests/test_models.py -v
```

Expected: All PASS

- [ ] **Step 9: Commit**

```bash
git add shared/
git commit -m "feat(shared): add config settings and core data models"
```

---

## Task 3: Shared - Events & Redis Broker

**Files:**
- Create: `shared/src/shared/events.py`
- Create: `shared/src/shared/broker.py`
- Create: `shared/tests/test_events.py`
- Create: `shared/tests/test_broker.py`

- [ ] **Step 1: Write failing tests for events**

Create `shared/tests/test_events.py`:

```python
import json
from decimal import Decimal
from datetime import datetime, timezone

from shared.events import EventType, Event, CandleEvent, SignalEvent, OrderEvent
from shared.models import Candle, Signal, Order, OrderSide, OrderType


def test_candle_event_serialize():
    candle = Candle(
        symbol="BTCUSDT",
        timeframe="1m",
        open_time=datetime(2026, 1, 1, tzinfo=timezone.utc),
        open=Decimal("50000"),
        high=Decimal("50100"),
        low=Decimal("49900"),
        close=Decimal("50050"),
        volume=Decimal("1.5"),
    )
    event = CandleEvent(data=candle)
    payload = event.to_dict()
    assert payload["type"] == EventType.CANDLE
    assert payload["data"]["symbol"] == "BTCUSDT"


def test_candle_event_deserialize():
    candle = Candle(
        symbol="BTCUSDT",
        timeframe="1m",
        open_time=datetime(2026, 1, 1, tzinfo=timezone.utc),
        open=Decimal("50000"),
        high=Decimal("50100"),
        low=Decimal("49900"),
        close=Decimal("50050"),
        volume=Decimal("1.5"),
    )
    event = CandleEvent(data=candle)
    payload = event.to_dict()
    restored = Event.from_dict(payload)
    assert isinstance(restored, CandleEvent)
    assert restored.data.symbol == "BTCUSDT"


def test_signal_event_serialize():
    signal = Signal(
        strategy="rsi",
        symbol="BTCUSDT",
        side=OrderSide.BUY,
        price=Decimal("50000"),
        quantity=Decimal("0.01"),
        reason="RSI < 30",
    )
    event = SignalEvent(data=signal)
    payload = event.to_dict()
    assert payload["type"] == EventType.SIGNAL
```

- [ ] **Step 2: Run tests to verify they fail**

```bash
pytest shared/tests/test_events.py -v
```

Expected: FAIL

- [ ] **Step 3: Implement events**

Create `shared/src/shared/events.py`:

```python
from __future__ import annotations

import json
from enum import StrEnum
from typing import Any

from pydantic import BaseModel

from shared.models import Candle, Signal, Order


class EventType(StrEnum):
    CANDLE = "candle"
    SIGNAL = "signal"
    ORDER = "order"


class CandleEvent(BaseModel):
    type: EventType = EventType.CANDLE
    data: Candle

    def to_dict(self) -> dict[str, Any]:
        return json.loads(self.model_dump_json())

    @classmethod
    def from_raw(cls, raw: dict[str, Any]) -> CandleEvent:
        return cls.model_validate(raw)


class SignalEvent(BaseModel):
    type: EventType = EventType.SIGNAL
    data: Signal

    def to_dict(self) -> dict[str, Any]:
        return json.loads(self.model_dump_json())

    @classmethod
    def from_raw(cls, raw: dict[str, Any]) -> SignalEvent:
        return cls.model_validate(raw)


class OrderEvent(BaseModel):
    type: EventType = EventType.ORDER
    data: Order

    def to_dict(self) -> dict[str, Any]:
        return json.loads(self.model_dump_json())

    @classmethod
    def from_raw(cls, raw: dict[str, Any]) -> OrderEvent:
        return cls.model_validate(raw)


_EVENT_MAP = {
    EventType.CANDLE: CandleEvent,
    EventType.SIGNAL: SignalEvent,
    EventType.ORDER: OrderEvent,
}


class Event:
    @staticmethod
    def from_dict(data: dict[str, Any]) -> CandleEvent | SignalEvent | OrderEvent:
        event_type = EventType(data["type"])
        cls = _EVENT_MAP[event_type]
        return cls.from_raw(data)
```

- [ ] **Step 4: Run tests to verify they pass**

```bash
pytest shared/tests/test_events.py -v
```

Expected: All PASS

- [ ] **Step 5: Write failing tests for broker**

Create `shared/tests/test_broker.py`:

```python
import asyncio
import pytest
from unittest.mock import AsyncMock, MagicMock, patch

from shared.broker import RedisBroker


@pytest.fixture
def mock_redis():
    redis = AsyncMock()
    redis.xadd = AsyncMock(return_value=b"1234-0")
    redis.xread = AsyncMock(return_value=[])
    redis.close = AsyncMock()
    return redis


@pytest.mark.asyncio
async def test_broker_publish(mock_redis):
    broker = RedisBroker.__new__(RedisBroker)
    broker._redis = mock_redis

    await broker.publish("candles.BTCUSDT", {"type": "candle", "data": "test"})

    mock_redis.xadd.assert_called_once()
    call_args = mock_redis.xadd.call_args
    assert call_args[0][0] == "candles.BTCUSDT"


@pytest.mark.asyncio
async def test_broker_subscribe_returns_messages(mock_redis):
    mock_redis.xread = AsyncMock(return_value=[
        ("candles.BTCUSDT", [
            (b"1234-0", {b"payload": b'{"type":"candle","data":"test"}'}),
        ])
    ])
    broker = RedisBroker.__new__(RedisBroker)
    broker._redis = mock_redis

    messages = await broker.read("candles.BTCUSDT", last_id="0-0", count=1)

    assert len(messages) == 1
    assert messages[0]["type"] == "candle"
```

- [ ] **Step 6: Run tests to verify they fail**

```bash
pytest shared/tests/test_broker.py -v
```

Expected: FAIL

- [ ] **Step 7: Implement broker**

Create `shared/src/shared/broker.py`:

```python
from __future__ import annotations

import json

import redis.asyncio as redis


class RedisBroker:
    def __init__(self, redis_url: str):
        self._redis = redis.from_url(redis_url, decode_responses=False)

    async def publish(self, stream: str, data: dict) -> str:
        payload = json.dumps(data)
        msg_id = await self._redis.xadd(stream, {"payload": payload.encode()})
        return msg_id

    async def read(
        self, stream: str, last_id: str = "$", count: int = 10, block: int = 0
    ) -> list[dict]:
        results = await self._redis.xread({stream: last_id}, count=count, block=block)
        messages = []
        for _stream_name, entries in results:
            for _msg_id, fields in entries:
                payload = fields[b"payload"]
                messages.append(json.loads(payload))
        return messages

    async def close(self):
        await self._redis.close()
```

- [ ] **Step 8: Run tests to verify they pass**

```bash
pytest shared/tests/test_broker.py -v
```

Expected: All PASS

- [ ] **Step 9: Commit**

```bash
git add shared/
git commit -m "feat(shared): add event system and Redis Streams broker"
```

---

## Task 4: Shared - Database Layer

**Files:**
- Create: `shared/src/shared/db.py`
- Create: `shared/tests/test_db.py`

- [ ] **Step 1: Write failing tests for DB**

Create `shared/tests/test_db.py`:

```python
import pytest
from unittest.mock import AsyncMock, patch, MagicMock

from shared.db import Database


@pytest.mark.asyncio
async def test_db_init_sql_creates_tables():
    db = Database.__new__(Database)
    # acquire() is called synchronously and returns an async context manager,
    # so the pool is a MagicMock; with AsyncMock, acquire() would return a coroutine.
    db._pool = MagicMock()
    mock_conn = AsyncMock()
    db._pool.acquire.return_value.__aenter__ = AsyncMock(return_value=mock_conn)
    db._pool.acquire.return_value.__aexit__ = AsyncMock(return_value=False)

    await db.init_tables()

    mock_conn.execute.assert_called()
    sql = mock_conn.execute.call_args[0][0]
    assert "candles" in sql
    assert "signals" in sql
    assert "orders" in sql
    assert "trades" in sql
    assert "positions" in sql
    assert "portfolio_snapshots" in sql


@pytest.mark.asyncio
async def test_db_insert_candle():
    db = Database.__new__(Database)
    db._pool = MagicMock()  # see note in the test above
    mock_conn = AsyncMock()
    db._pool.acquire.return_value.__aenter__ = AsyncMock(return_value=mock_conn)
    db._pool.acquire.return_value.__aexit__ = AsyncMock(return_value=False)

    from datetime import datetime, timezone
    from decimal import Decimal
    from shared.models import Candle

    candle = Candle(
        symbol="BTCUSDT",
        timeframe="1m",
        open_time=datetime(2026, 1, 1, tzinfo=timezone.utc),
        open=Decimal("50000"),
        high=Decimal("50100"),
        low=Decimal("49900"),
        close=Decimal("50050"),
        volume=Decimal("1.5"),
    )

    await db.insert_candle(candle)

    mock_conn.execute.assert_called_once()
    sql = mock_conn.execute.call_args[0][0]
    assert "INSERT INTO candles" in sql
```

- [ ] **Step 2: Run tests to verify they fail**

```bash
pytest shared/tests/test_db.py -v
```

Expected: FAIL

- [ ] **Step 3: Implement database layer**

Create `shared/src/shared/db.py`:

```python
from __future__ import annotations

import asyncpg

from shared.models import Candle, Order, Signal

_INIT_SQL = """
CREATE TABLE IF NOT EXISTS candles (
    symbol TEXT NOT NULL,
    timeframe TEXT NOT NULL,
    open_time TIMESTAMPTZ NOT NULL,
    open NUMERIC NOT NULL,
    high NUMERIC NOT NULL,
    low NUMERIC NOT NULL,
    close NUMERIC NOT NULL,
    volume NUMERIC NOT NULL,
    PRIMARY KEY (symbol, timeframe, open_time)
);

CREATE TABLE IF NOT EXISTS signals (
    id TEXT PRIMARY KEY,
    strategy TEXT NOT NULL,
    symbol TEXT NOT NULL,
    side TEXT NOT NULL,
    price NUMERIC NOT NULL,
    quantity NUMERIC NOT NULL,
    reason TEXT NOT NULL,
    created_at TIMESTAMPTZ NOT NULL
);

CREATE TABLE IF NOT EXISTS orders (
    id TEXT PRIMARY KEY,
    signal_id TEXT REFERENCES signals(id),
    symbol TEXT NOT NULL,
    side TEXT NOT NULL,
    type TEXT NOT NULL,
    price NUMERIC NOT NULL,
    quantity NUMERIC NOT NULL,
    status TEXT NOT NULL DEFAULT 'PENDING',
    created_at TIMESTAMPTZ NOT NULL,
    filled_at TIMESTAMPTZ
);

CREATE TABLE IF NOT EXISTS trades (
    id TEXT PRIMARY KEY,
    order_id TEXT REFERENCES orders(id),
    symbol TEXT NOT NULL,
    side TEXT NOT NULL,
    price NUMERIC NOT NULL,
    quantity NUMERIC NOT NULL,
    fee NUMERIC NOT NULL DEFAULT 0,
    traded_at TIMESTAMPTZ NOT NULL
);

CREATE TABLE IF NOT EXISTS positions (
    symbol TEXT PRIMARY KEY,
    quantity NUMERIC NOT NULL,
    avg_entry_price NUMERIC NOT NULL,
    current_price NUMERIC NOT NULL,
    updated_at TIMESTAMPTZ NOT NULL
);

CREATE TABLE IF NOT EXISTS portfolio_snapshots (
    id SERIAL PRIMARY KEY,
    total_value NUMERIC NOT NULL,
    realized_pnl NUMERIC NOT NULL,
    unrealized_pnl NUMERIC NOT NULL,
    snapshot_at TIMESTAMPTZ NOT NULL
);
"""


class Database:
    def __init__(self, dsn: str):
        self._dsn = dsn
        self._pool: asyncpg.Pool | None = None

    async def connect(self):
        self._pool = await asyncpg.create_pool(self._dsn)

    async def close(self):
        if self._pool:
            await self._pool.close()

    async def init_tables(self):
        async with self._pool.acquire() as conn:
            await conn.execute(_INIT_SQL)

    async def insert_candle(self, candle: Candle):
        sql = """
            INSERT INTO candles (symbol, timeframe, open_time, open, high, low, close, volume)
            VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
            ON CONFLICT (symbol, timeframe, open_time) DO NOTHING
        """
        async with self._pool.acquire() as conn:
            await conn.execute(
                sql,
                candle.symbol,
                candle.timeframe,
                candle.open_time,
                candle.open,
                candle.high,
                candle.low,
                candle.close,
                candle.volume,
            )

    async def insert_signal(self, signal: Signal):
        sql = """
            INSERT INTO signals (id, strategy, symbol, side, price, quantity, reason, created_at)
            VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
        """
        async with self._pool.acquire() as conn:
            await conn.execute(
                sql,
                signal.id,
                signal.strategy,
                signal.symbol,
                signal.side.value,
                signal.price,
                signal.quantity,
                signal.reason,
                signal.created_at,
            )

    async def insert_order(self, order: Order):
        sql = """
            INSERT INTO orders (id, signal_id, symbol, side, type, price, quantity, status, created_at)
            VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
        """
        async with self._pool.acquire() as conn:
            await conn.execute(
                sql,
                order.id,
                order.signal_id,
                order.symbol,
                order.side.value,
                order.type.value,
                order.price,
                order.quantity,
                order.status.value,
                order.created_at,
            )

    async def update_order_status(self, order_id: str, status: str, filled_at=None):
        sql = "UPDATE orders SET status = $1, filled_at = $2 WHERE id = $3"
        async with self._pool.acquire() as conn:
            await conn.execute(sql, status, filled_at, order_id)

    async def get_candles(self, symbol: str, timeframe: str, limit: int = 500) -> list[dict]:
        sql = """
            SELECT * FROM candles
            WHERE symbol = $1 AND timeframe = $2
            ORDER BY open_time DESC
            LIMIT $3
        """
        async with self._pool.acquire() as conn:
            rows = await conn.fetch(sql, symbol, timeframe, limit)
            return [dict(r) for r in rows]
```

- [ ] **Step 4: Run tests to verify they pass**

```bash
pytest shared/tests/test_db.py -v
```

Expected: All PASS

- [ ] **Step 5: Commit**

```bash
git add shared/
git commit -m "feat(shared): add database layer with table init and CRUD operations"
```

---

## Task 5: Data Collector Service

**Files:**
- Create: `services/data-collector/pyproject.toml`
- Create: `services/data-collector/Dockerfile`
- Create: `services/data-collector/src/data_collector/__init__.py`
- Create: `services/data-collector/src/data_collector/config.py`
- Create: `services/data-collector/src/data_collector/binance_rest.py`
- Create: `services/data-collector/src/data_collector/binance_ws.py`
- Create: `services/data-collector/src/data_collector/storage.py`
- Create: `services/data-collector/src/data_collector/main.py`
- Create: `services/data-collector/tests/test_binance_rest.py`
- Create: `services/data-collector/tests/test_storage.py`

- [ ] **Step 1: Create pyproject.toml**

Create `services/data-collector/pyproject.toml`:

```toml
[project]
name = "data-collector"
version = "0.1.0"
description = "Binance market data collector service"
requires-python = ">=3.12"
dependencies = [
    "ccxt>=4.0",
    "websockets>=12.0",
    "trading-shared",
]

[project.optional-dependencies]
dev = [
    "pytest>=8.0",
    "pytest-asyncio>=0.23",
]

[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

[tool.hatch.build.targets.wheel]
packages = ["src/data_collector"]
```

- [ ] **Step 2: Write failing tests for binance_rest**

Create `services/data-collector/tests/test_binance_rest.py`:

```python
import pytest
from unittest.mock import AsyncMock, patch, MagicMock
from datetime import datetime, timezone
from decimal import Decimal

from data_collector.binance_rest import fetch_historical_candles


@pytest.mark.asyncio
async def test_fetch_historical_candles_parses_response():
    mock_exchange = MagicMock()
    mock_exchange.fetch_ohlcv = AsyncMock(return_value=[
        [1704067200000, 50000.0, 50100.0, 49900.0, 50050.0, 1.5],
        [1704067260000, 50050.0, 50200.0, 50000.0, 50150.0, 2.0],
    ])

    candles = await fetch_historical_candles(
        exchange=mock_exchange,
        symbol="BTC/USDT",
        timeframe="1m",
        since=datetime(2026, 1, 1, tzinfo=timezone.utc),
        limit=2,
    )

    assert len(candles) == 2
    assert candles[0].symbol == "BTCUSDT"
    assert candles[0].close == Decimal("50050.0")
    assert candles[1].volume == Decimal("2.0")


@pytest.mark.asyncio
async def test_fetch_historical_candles_empty_response():
    mock_exchange = MagicMock()
    mock_exchange.fetch_ohlcv = AsyncMock(return_value=[])

    candles = await fetch_historical_candles(
        exchange=mock_exchange,
        symbol="BTC/USDT",
        timeframe="1m",
        since=datetime(2026, 1, 1, tzinfo=timezone.utc),
        limit=100,
    )

    assert candles == []
```

- [ ] **Step 3: Run tests to verify they fail**

```bash
cd /home/si/Private/repos/trading
pip install -e services/data-collector[dev]
pytest services/data-collector/tests/test_binance_rest.py -v
```

Expected: FAIL

- [ ] **Step 4: Implement binance_rest**

Create `services/data-collector/src/data_collector/__init__.py`:

```python
```

Create `services/data-collector/src/data_collector/binance_rest.py`:

```python
from __future__ import annotations

from datetime import datetime, timezone
from decimal import Decimal

from shared.models import Candle


async def fetch_historical_candles(
    exchange,
    symbol: str,
    timeframe: str,
    since: datetime,
    limit: int = 500,
) -> list[Candle]:
    since_ms = int(since.timestamp() * 1000)
    ohlcv = await exchange.fetch_ohlcv(symbol, timeframe, since=since_ms, limit=limit)

    # ccxt symbols look like "BTC/USDT"; the stored form is "BTCUSDT"
    normalized_symbol = symbol.replace("/", "")
    candles = []
    for row in ohlcv:
        # `low` instead of `l` avoids ruff's ambiguous-variable-name check (E741)
        ts, o, h, low, c, v = row
        candles.append(
            Candle(
                symbol=normalized_symbol,
                timeframe=timeframe,
                open_time=datetime.fromtimestamp(ts / 1000, tz=timezone.utc),
                open=Decimal(str(o)),
                high=Decimal(str(h)),
                low=Decimal(str(low)),
                close=Decimal(str(c)),
                volume=Decimal(str(v)),
            )
        )
    return candles
```

- [ ] **Step 5: Run tests to verify they pass**

```bash
pytest services/data-collector/tests/test_binance_rest.py -v
```

Expected: All PASS

- [ ] **Step 6: Write failing tests for storage**

Create `services/data-collector/tests/test_storage.py`:

```python
import pytest
from unittest.mock import AsyncMock
from datetime import datetime, timezone
from decimal import Decimal

from shared.models import Candle
from data_collector.storage import CandleStorage


@pytest.fixture
def mock_db():
    db = AsyncMock()
    db.insert_candle = AsyncMock()
    return db


@pytest.fixture
def mock_broker():
    broker = AsyncMock()
    broker.publish = AsyncMock()
    return broker


@pytest.fixture
def sample_candle():
    return Candle(
        symbol="BTCUSDT",
        timeframe="1m",
        open_time=datetime(2026, 1, 1, tzinfo=timezone.utc),
        open=Decimal("50000"),
        high=Decimal("50100"),
        low=Decimal("49900"),
        close=Decimal("50050"),
        volume=Decimal("1.5"),
    )


@pytest.mark.asyncio
async def test_storage_saves_to_db_and_publishes(mock_db, mock_broker, sample_candle):
    storage = CandleStorage(db=mock_db, broker=mock_broker)

    await storage.store(sample_candle)

    mock_db.insert_candle.assert_called_once_with(sample_candle)
    mock_broker.publish.assert_called_once()
    call_args = mock_broker.publish.call_args
    assert call_args[0][0] == "candles.BTCUSDT"


@pytest.mark.asyncio
async def test_storage_batch_store(mock_db, mock_broker, sample_candle):
    storage = CandleStorage(db=mock_db, broker=mock_broker)
    candles = [sample_candle, sample_candle]

    await storage.store_batch(candles)

    assert mock_db.insert_candle.call_count == 2
    assert mock_broker.publish.call_count == 2
```

- [ ] **Step 7: Run tests to verify they fail**

```bash
pytest services/data-collector/tests/test_storage.py -v
```

Expected: FAIL

- [ ] **Step 8: Implement storage**

Create `services/data-collector/src/data_collector/storage.py`:

```python
from __future__ import annotations

from shared.broker import RedisBroker
from shared.db import Database
from shared.events import CandleEvent
from shared.models import Candle


class CandleStorage:
    def __init__(self, db: Database, broker: RedisBroker):
        self._db = db
        self._broker = broker

    async def store(self, candle: Candle):
        await self._db.insert_candle(candle)
        event = CandleEvent(data=candle)
        await self._broker.publish(f"candles.{candle.symbol}", event.to_dict())

    async def store_batch(self, candles: list[Candle]):
        for candle in candles:
            await self.store(candle)
```

- [ ] **Step 9: Run tests to verify they pass**

```bash
pytest services/data-collector/tests/test_storage.py -v
```

Expected: All PASS

- [ ] **Step 10: Implement config, binance_ws, and main**

Create `services/data-collector/src/data_collector/config.py`:

```python
from shared.config import Settings


class CollectorConfig(Settings):
    symbols: list[str] = ["BTC/USDT"]
    timeframes: list[str] = ["1m"]
```

Create `services/data-collector/src/data_collector/binance_ws.py`:

```python
from __future__ import annotations

import asyncio
import json
import logging
from datetime import datetime, timezone
from decimal import Decimal

import websockets

from shared.models import Candle

logger = logging.getLogger(__name__)

BINANCE_WS_URL = "wss://stream.binance.com:9443/ws"


class BinanceWebSocket:
    def __init__(self, symbols: list[str], timeframe: str, on_candle):
        self._symbols = symbols
        self._timeframe = timeframe
        self._on_candle = on_candle
        self._running = False

    async def start(self):
        streams = [
            f"{s.lower().replace('/', '')}@kline_{self._timeframe}"
            for s in self._symbols
        ]
        url = f"{BINANCE_WS_URL}/{'/'.join(streams)}"
        self._running = True
        logger.info(f"Connecting to Binance WS: {streams}")

        while self._running:
            try:
                async with websockets.connect(url) as ws:
                    async for raw in ws:
                        if not self._running:
                            break
                        msg = json.loads(raw)
                        if "k" in msg:
                            candle = self._parse_kline(msg["k"])
                            if candle:
                                await self._on_candle(candle)
            except websockets.ConnectionClosed:
                logger.warning("WebSocket disconnected, reconnecting in 5s...")
                await asyncio.sleep(5)
            except Exception as e:
                logger.error(f"WebSocket error: {e}, reconnecting in 5s...")
                await asyncio.sleep(5)

    def stop(self):
        self._running = False

    def _parse_kline(self, k: dict) -> Candle | None:
        if not k.get("x"):  # only closed candles
            return None
        return Candle(
            symbol=k["s"],
            timeframe=k["i"],
            open_time=datetime.fromtimestamp(k["t"] / 1000, tz=timezone.utc),
            open=Decimal(k["o"]),
            high=Decimal(k["h"]),
            low=Decimal(k["l"]),
            close=Decimal(k["c"]),
            volume=Decimal(k["v"]),
        )
```

Create `services/data-collector/src/data_collector/main.py`:

```python
from __future__ import annotations

import asyncio
import logging

import ccxt.async_support as ccxt

from shared.broker import RedisBroker
from shared.db import Database
from data_collector.binance_ws import BinanceWebSocket
from data_collector.config import CollectorConfig
from data_collector.storage import CandleStorage

logger = logging.getLogger(__name__)


async def run():
    config = CollectorConfig()
    logging.basicConfig(level=config.log_level)

    db = Database(config.database_url)
    await db.connect()
    await db.init_tables()

    broker = RedisBroker(config.redis_url)
    storage = CandleStorage(db=db, broker=broker)

    ws = BinanceWebSocket(
        symbols=config.symbols,
        timeframe=config.timeframes[0],
        on_candle=storage.store,
    )

    logger.info(f"Starting data collector: symbols={config.symbols}")
    try:
        await ws.start()
    finally:
        ws.stop()
        await broker.close()
        await db.close()


def main():
    asyncio.run(run())


if __name__ == "__main__":
    main()
```

- [ ] **Step 11: Create Dockerfile**

Create `services/data-collector/Dockerfile`:

```dockerfile
FROM python:3.12-slim

WORKDIR /app

COPY shared/ shared/
RUN pip install --no-cache-dir ./shared

COPY services/data-collector/ services/data-collector/
RUN pip install --no-cache-dir ./services/data-collector

CMD ["python", "-m", "data_collector.main"]
```

- [ ] **Step 12: Commit**

```bash
git add services/data-collector/
git commit -m "feat(data-collector): add Binance REST/WS data collection with storage pipeline"
```

---

## Task 6: Strategy Engine Service

**Files:**
- Create: `services/strategy-engine/pyproject.toml`
- Create: `services/strategy-engine/Dockerfile`
- Create: `services/strategy-engine/src/strategy_engine/__init__.py`
- Create: `services/strategy-engine/src/strategy_engine/config.py`
- Create: `services/strategy-engine/strategies/base.py`
- Create: `services/strategy-engine/strategies/rsi_strategy.py`
- Create: `services/strategy-engine/strategies/grid_strategy.py`
- Create: `services/strategy-engine/src/strategy_engine/plugin_loader.py`
- Create: `services/strategy-engine/src/strategy_engine/engine.py`
- Create: `services/strategy-engine/src/strategy_engine/main.py`
- Create: `services/strategy-engine/tests/test_rsi_strategy.py`
- Create: `services/strategy-engine/tests/test_grid_strategy.py`
- Create: `services/strategy-engine/tests/test_plugin_loader.py`
- Create: `services/strategy-engine/tests/test_engine.py`

- [ ] **Step 1: Create pyproject.toml**

Create `services/strategy-engine/pyproject.toml`:

```toml
[project]
name = "strategy-engine"
version = "0.1.0"
description = "Plugin-based strategy execution engine"
requires-python = ">=3.12"
dependencies = [
    "pandas>=2.0",
    "pandas-ta>=0.3",
    "trading-shared",
]

[project.optional-dependencies]
dev = [
    "pytest>=8.0",
    "pytest-asyncio>=0.23",
]

[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

[tool.hatch.build.targets.wheel]
packages = ["src/strategy_engine"]
```

- [ ] **Step 2: Implement base strategy**

Create `services/strategy-engine/src/strategy_engine/__init__.py`:

```python
```

Create `services/strategy-engine/strategies/base.py`:

```python
from __future__ import annotations

from abc import ABC, abstractmethod

from shared.models import Candle, Signal


class BaseStrategy(ABC):
    name: str = "base"

    @abstractmethod
    def on_candle(self, candle: Candle) -> Signal | None:
        pass

    @abstractmethod
    def configure(self, params: dict) -> None:
        pass

    def reset(self) -> None:
        pass
```

- [ ] **Step 3: Write failing
tests for RSI strategy**

Create `services/strategy-engine/tests/test_rsi_strategy.py`:

```python
from datetime import datetime, timezone
from decimal import Decimal

from shared.models import Candle, OrderSide

# The strategies live in the top-level `strategies` package
# (services/strategy-engine/strategies/), the same import root that
# rsi_strategy.py and plugin_loader.py use. That directory's parent must be
# on sys.path when the tests run.


def make_candle(close: float, idx: int = 0) -> Candle:
    return Candle(
        symbol="BTCUSDT",
        timeframe="1m",
        open_time=datetime(2026, 1, 1, minute=idx, tzinfo=timezone.utc),
        open=Decimal(str(close)),
        high=Decimal(str(close + 10)),
        low=Decimal(str(close - 10)),
        close=Decimal(str(close)),
        volume=Decimal("1.0"),
    )


def test_rsi_strategy_no_signal_insufficient_data():
    from strategies.rsi_strategy import RsiStrategy

    strategy = RsiStrategy()
    strategy.configure({"period": 14, "oversold": 30, "overbought": 70, "quantity": 0.01})
    signal = strategy.on_candle(make_candle(50000))
    assert signal is None


def test_rsi_strategy_buy_signal_on_oversold():
    from strategies.rsi_strategy import RsiStrategy

    strategy = RsiStrategy()
    strategy.configure({"period": 14, "oversold": 30, "overbought": 70, "quantity": 0.01})

    # Feed declining prices to push RSI below 30
    prices = [50000 - i * 100 for i in range(20)]
    signal = None
    for i, p in enumerate(prices):
        signal = strategy.on_candle(make_candle(p, idx=i))

    # After sustained drop, RSI should be oversold → BUY signal
    if signal is not None:
        assert signal.side == OrderSide.BUY
        assert signal.strategy == "rsi"
```

- [ ] **Step 4: Run tests to verify they fail**

```bash
pip install -e "services/strategy-engine[dev]"
pytest services/strategy-engine/tests/test_rsi_strategy.py -v
```

Expected: FAIL

- [ ] **Step 5: Implement RSI strategy**

Create `services/strategy-engine/strategies/rsi_strategy.py`:

```python
from __future__ import annotations

from collections import deque
from decimal import Decimal

import pandas as pd
import pandas_ta as ta

from shared.models import Candle, Signal, OrderSide
from strategies.base import BaseStrategy


class RsiStrategy(BaseStrategy):
    name = "rsi"

    def __init__(self):
        self._closes: deque[float] = deque(maxlen=200)
        self._period: int = 14
        self._oversold: float = 30
        self._overbought: float = 70
        self._quantity: Decimal = Decimal("0.01")

    def configure(self, params: dict) -> None:
        self._period = params.get("period", 14)
        self._oversold = params.get("oversold", 30)
        self._overbought = params.get("overbought", 70)
        self._quantity = Decimal(str(params.get("quantity", 0.01)))

    def on_candle(self, candle: Candle) -> Signal | None:
        self._closes.append(float(candle.close))

        # RSI needs period + 1 closes to produce its first value
        if len(self._closes) < self._period + 1:
            return None

        series = pd.Series(list(self._closes))
        rsi = ta.rsi(series, length=self._period)
        current_rsi = rsi.iloc[-1]

        if current_rsi < self._oversold:
            return Signal(
                strategy=self.name,
                symbol=candle.symbol,
                side=OrderSide.BUY,
                price=candle.close,
                quantity=self._quantity,
                reason=f"RSI={current_rsi:.1f} < {self._oversold}",
            )
        elif current_rsi > self._overbought:
            return Signal(
                strategy=self.name,
                symbol=candle.symbol,
                side=OrderSide.SELL,
                price=candle.close,
                quantity=self._quantity,
                reason=f"RSI={current_rsi:.1f} > {self._overbought}",
            )
        return None

    def reset(self) -> None:
        self._closes.clear()
```

- [ ] **Step 6: Run tests to verify they pass**

```bash
pytest services/strategy-engine/tests/test_rsi_strategy.py -v
```

Expected: All PASS

- [ ] **Step 7: Write failing tests for grid strategy**

Create `services/strategy-engine/tests/test_grid_strategy.py`:

```python
from datetime import datetime, timezone
from decimal import Decimal

from shared.models import Candle, OrderSide


def make_candle(close: float, idx: int = 0) -> Candle:
    return Candle(
        symbol="BTCUSDT",
        timeframe="1m",
        open_time=datetime(2026, 1, 1, minute=idx, tzinfo=timezone.utc),
        open=Decimal(str(close)),
        high=Decimal(str(close + 10)),
        low=Decimal(str(close - 10)),
        close=Decimal(str(close)),
        volume=Decimal("1.0"),
    )


def test_grid_strategy_buy_at_lower_grid():
    from strategies.grid_strategy import GridStrategy

    strategy = GridStrategy()
    strategy.configure({
        "lower_price": 48000,
        "upper_price": 52000,
        "grid_count": 5,
        "quantity": 0.01,
    })

    # First candle only sets the reference zone; no signal is emitted
    signal = strategy.on_candle(make_candle(48000))
    # Move up one zone so the next candle can cross back down
    signal = strategy.on_candle(make_candle(49000, idx=1))
    # Moving down through a grid level should trigger BUY
    signal = strategy.on_candle(make_candle(48000, idx=2))
    if signal is not None:
        assert signal.side == OrderSide.BUY


def test_grid_strategy_sell_at_upper_grid():
    from strategies.grid_strategy import GridStrategy

    strategy = GridStrategy()
    strategy.configure({
        "lower_price": 48000,
        "upper_price": 52000,
        "grid_count": 5,
        "quantity": 0.01,
    })

    signal = strategy.on_candle(make_candle(50000))
    signal = strategy.on_candle(make_candle(51000, idx=1))
    signal = strategy.on_candle(make_candle(52000, idx=2))
    if signal is not None:
        assert signal.side == OrderSide.SELL


def test_grid_strategy_no_signal_in_same_zone():
    from strategies.grid_strategy import GridStrategy

    strategy = GridStrategy()
    strategy.configure({
        "lower_price": 48000,
        "upper_price": 52000,
        "grid_count": 5,
        "quantity": 0.01,
    })

    strategy.on_candle(make_candle(50000))
    signal = strategy.on_candle(make_candle(50050, idx=1))
    assert signal is None  # same grid zone, no signal
```

- [ ] **Step 8: Run tests to verify they fail**

```bash
pytest services/strategy-engine/tests/test_grid_strategy.py -v
```

Expected: FAIL

- [ ] **Step 9: Implement grid strategy**

Create `services/strategy-engine/strategies/grid_strategy.py`:

```python
from __future__ import annotations

from decimal import Decimal

from shared.models import Candle, Signal, OrderSide
from strategies.base import BaseStrategy


class GridStrategy(BaseStrategy):
    name = "grid"

    def __init__(self):
        self._lower: float = 0
        self._upper: float = 0
        self._grid_count: int = 5
        self._quantity: Decimal = Decimal("0.01")
        self._grid_levels: list[float] = []
        self._last_zone: int | None = None

    def configure(self, params: dict) -> None:
        self._lower = float(params["lower_price"])
        self._upper = float(params["upper_price"])
        self._grid_count = params.get("grid_count", 5)
        self._quantity = Decimal(str(params.get("quantity", 0.01)))

        # grid_count intervals give grid_count + 1 evenly spaced levels
        step = (self._upper - self._lower) / self._grid_count
        self._grid_levels = [self._lower + step * i for i in range(self._grid_count + 1)]

    def on_candle(self, candle: Candle) -> Signal | None:
        price = float(candle.close)
        current_zone = self._get_zone(price)

        # The first candle only establishes the reference zone
        if self._last_zone is None:
            self._last_zone = current_zone
            return None

        signal = None
        if current_zone < self._last_zone:
            signal = Signal(
                strategy=self.name,
                symbol=candle.symbol,
                side=OrderSide.BUY,
                price=candle.close,
                quantity=self._quantity,
                reason=f"Price crossed grid down: zone {self._last_zone}->{current_zone}",
            )
        elif current_zone > self._last_zone:
            signal = Signal(
                strategy=self.name,
                symbol=candle.symbol,
                side=OrderSide.SELL,
                price=candle.close,
                quantity=self._quantity,
                reason=f"Price crossed grid up: zone {self._last_zone}->{current_zone}",
            )

        self._last_zone = current_zone
        return signal

    def _get_zone(self, price: float) -> int:
        # Zone index = number of grid levels at or below the price
        for i, level in enumerate(self._grid_levels):
            if price < level:
                return i
        return len(self._grid_levels)

    def reset(self) -> None:
        self._last_zone = None
```

- [ ] **Step 10: Run tests to verify they pass**

```bash
pytest services/strategy-engine/tests/test_grid_strategy.py -v
```

Expected: All PASS

- [ ] **Step 11: Write failing tests for plugin_loader**

Create `services/strategy-engine/tests/test_plugin_loader.py`:

```python
import pytest
from pathlib import Path

from strategy_engine.plugin_loader import load_strategies


def test_load_strategies_finds_rsi_and_grid():
    strategies_dir = Path(__file__).parent.parent / "strategies"
    loaded = load_strategies(strategies_dir)
    names = {s.name for s in loaded}
    assert "rsi" in names
    assert "grid" in names


def test_load_strategies_skips_base():
    strategies_dir = Path(__file__).parent.parent / "strategies"
    loaded = load_strategies(strategies_dir)
    names = {s.name for s in loaded}
    assert "base" not in names
```

- [ ] **Step 12: Run tests to verify they fail**

```bash
pytest services/strategy-engine/tests/test_plugin_loader.py -v
```

Expected: FAIL

- [ ] **Step 13: Implement plugin_loader**

Create `services/strategy-engine/src/strategy_engine/plugin_loader.py`:

```python
from __future__ import annotations

import importlib.util
import logging
from pathlib import Path

from strategies.base import BaseStrategy

logger = logging.getLogger(__name__)


def load_strategies(strategies_dir: Path) -> list[BaseStrategy]:
    loaded = []
    for path in strategies_dir.glob("*.py"):
        # Skip private modules and the abstract base
        if path.stem.startswith("_") or path.stem == "base":
            continue

        spec = importlib.util.spec_from_file_location(path.stem, path)
        module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(module)

        # Instantiate every concrete BaseStrategy subclass found in the module
        for attr_name in dir(module):
            attr = getattr(module, attr_name)
            if (
                isinstance(attr, type)
                and issubclass(attr, BaseStrategy)
                and attr is not BaseStrategy
            ):
                instance = attr()
                loaded.append(instance)
                logger.info(f"Loaded strategy: {instance.name}")

    return loaded
```

- [ ] **Step 14: Run tests to verify they pass**

```bash
pytest services/strategy-engine/tests/test_plugin_loader.py -v
```

Expected: All PASS

- [ ] **Step 15: Write failing tests for engine**

Create `services/strategy-engine/tests/test_engine.py`:

```python
import pytest
from unittest.mock import AsyncMock, MagicMock
from datetime import datetime, timezone
from decimal import Decimal

from shared.models import Candle, OrderSide
from shared.events import CandleEvent
from strategy_engine.engine import StrategyEngine


def make_candle_event() -> dict:
    candle = Candle(
        symbol="BTCUSDT",
        timeframe="1m",
        open_time=datetime(2026, 1, 1, tzinfo=timezone.utc),
        open=Decimal("50000"),
        high=Decimal("50100"),
        low=Decimal("49900"),
        close=Decimal("50050"),
        volume=Decimal("1.0"),
    )
    return CandleEvent(data=candle).to_dict()


@pytest.mark.asyncio
async def test_engine_dispatches_candle_to_strategies():
    mock_strategy = MagicMock()
    mock_strategy.name = "test"
    mock_strategy.on_candle.return_value = None

    mock_broker = AsyncMock()
    mock_broker.read = AsyncMock(return_value=[make_candle_event()])

    engine = StrategyEngine(broker=mock_broker, strategies=[mock_strategy])
    await engine.process_once(stream="candles.BTCUSDT", last_id="0-0")

    mock_strategy.on_candle.assert_called_once()


@pytest.mark.asyncio
async def test_engine_publishes_signal_when_strategy_returns_one():
    from shared.models import Signal

    mock_signal = Signal(
        strategy="test",
        symbol="BTCUSDT",
        side=OrderSide.BUY,
        price=Decimal("50000"),
        quantity=Decimal("0.01"),
        reason="test reason",
    )
    mock_strategy = MagicMock()
    mock_strategy.name = "test"
    mock_strategy.on_candle.return_value = mock_signal

    mock_broker = AsyncMock()
    mock_broker.read = AsyncMock(return_value=[make_candle_event()])
    mock_broker.publish = AsyncMock()

    engine = StrategyEngine(broker=mock_broker, strategies=[mock_strategy])
    await engine.process_once(stream="candles.BTCUSDT", last_id="0-0")

    mock_broker.publish.assert_called_once()
    call_args = mock_broker.publish.call_args
    assert call_args[0][0] == "signals"
```

- [ ] **Step 16: Run tests to verify they fail**

```bash
pytest services/strategy-engine/tests/test_engine.py -v
```

Expected: FAIL

- [ ] **Step 17: Implement engine**

Create `services/strategy-engine/src/strategy_engine/engine.py`:

```python
from __future__ import annotations

import logging

from shared.broker import RedisBroker
from shared.events import Event, SignalEvent
from shared.models import Signal
from strategies.base import BaseStrategy

logger = logging.getLogger(__name__)


class StrategyEngine:
    def __init__(self, broker: RedisBroker, strategies: list[BaseStrategy]):
        self._broker = broker
        self._strategies = strategies

    async def process_once(self, stream: str, last_id: str) -> str:
        messages = await self._broker.read(stream, last_id=last_id, count=10, block=1000)
        for msg in messages:
            event = Event.from_dict(msg)
            candle = event.data
            for strategy in self._strategies:
                signal = strategy.on_candle(candle)
                if signal is not None:
                    logger.info(f"Signal from {strategy.name}: {signal.side} {signal.symbol}")
                    await self._publish_signal(signal)
        # NOTE: the messages handled here are payload dicts without stream IDs,
        # so last_id is returned unchanged and the caller reads from the same
        # position again on the next call.
        return last_id

    async def _publish_signal(self, signal: Signal):
        event = SignalEvent(data=signal)
        await self._broker.publish("signals", event.to_dict())
```

- [ ] **Step 18: Run tests to verify they pass**

```bash
pytest services/strategy-engine/tests/test_engine.py -v
```

Expected: All PASS

- [ ] **Step 19: Implement config and main**

Create `services/strategy-engine/src/strategy_engine/config.py`:

```python
from shared.config import Settings


class StrategyConfig(Settings):
    symbols: list[str] = ["BTC/USDT"]
    timeframes: list[str] = ["1m"]
    strategy_params: dict = {}
```

Create `services/strategy-engine/src/strategy_engine/main.py`:

```python
from __future__ import annotations

import asyncio
import logging
from pathlib import Path

from shared.broker import RedisBroker
from strategy_engine.config import StrategyConfig
from strategy_engine.engine import StrategyEngine
from strategy_engine.plugin_loader import load_strategies

logger = logging.getLogger(__name__)


async def run():
    config = StrategyConfig()
    logging.basicConfig(level=config.log_level)

    broker = RedisBroker(config.redis_url)

    # services/strategy-engine/strategies, relative to src/strategy_engine/main.py
    strategies_dir = Path(__file__).parent.parent.parent / "strategies"
    strategies = load_strategies(strategies_dir)

    for s in strategies:
        params = config.strategy_params.get(s.name, {})
        s.configure(params)

    engine = StrategyEngine(broker=broker, strategies=strategies)

    # Stream names use the slash-free symbol form, e.g. candles.BTCUSDT
    symbols = [s.replace("/", "") for s in config.symbols]
    logger.info(f"Starting strategy engine: strategies={[s.name for s in strategies]}")

    last_ids = {sym: "0-0" for sym in symbols}
    try:
        while True:
            for sym in symbols:
                stream = f"candles.{sym}"
                last_ids[sym] = await engine.process_once(stream, last_ids[sym])
    finally:
        await broker.close()


def main():
    asyncio.run(run())


if __name__ == "__main__":
    main()
```

- [ ] **Step 20: Create Dockerfile**

Create
`services/strategy-engine/Dockerfile`:

```dockerfile
FROM python:3.12-slim

WORKDIR /app

COPY shared/ shared/
RUN pip install --no-cache-dir ./shared

COPY services/strategy-engine/ services/strategy-engine/
RUN pip install --no-cache-dir ./services/strategy-engine

CMD ["python", "-m", "strategy_engine.main"]
```

- [ ] **Step 21: Commit**

```bash
git add services/strategy-engine/
git commit -m "feat(strategy-engine): add plugin-based strategy engine with RSI and grid strategies"
```

---

## Task 7: Order Executor Service

**Files:**
- Create: `services/order-executor/pyproject.toml`
- Create: `services/order-executor/Dockerfile`
- Create: `services/order-executor/src/order_executor/__init__.py`
- Create: `services/order-executor/src/order_executor/config.py`
- Create: `services/order-executor/src/order_executor/risk_manager.py`
- Create: `services/order-executor/src/order_executor/executor.py`
- Create: `services/order-executor/src/order_executor/main.py`
- Create: `services/order-executor/tests/test_risk_manager.py`
- Create: `services/order-executor/tests/test_executor.py`

- [ ] **Step 1: Create pyproject.toml**

Create `services/order-executor/pyproject.toml`:

```toml
[project]
name = "order-executor"
version = "0.1.0"
description = "Order execution service with risk management"
requires-python = ">=3.12"
dependencies = [
    "ccxt>=4.0",
    "trading-shared",
]

[project.optional-dependencies]
dev = [
    "pytest>=8.0",
    "pytest-asyncio>=0.23",
]

[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

[tool.hatch.build.targets.wheel]
packages = ["src/order_executor"]
```

- [ ] **Step 2: Write failing tests for risk_manager**

Create `services/order-executor/tests/test_risk_manager.py`:

```python
import pytest
from decimal import Decimal

from shared.models import Signal, OrderSide
from order_executor.risk_manager import RiskManager, RiskCheckResult


@pytest.fixture
def risk_manager():
    return RiskManager(
        max_position_size=Decimal("0.1"),
        stop_loss_pct=Decimal("5"),
        daily_loss_limit_pct=Decimal("10"),
    )


def make_signal(side=OrderSide.BUY, quantity="0.01", price="50000") -> Signal:
    return Signal(
        strategy="test",
        symbol="BTCUSDT",
        side=side,
        price=Decimal(price),
        quantity=Decimal(quantity),
        reason="test",
    )


def test_risk_check_passes_normal_order(risk_manager):
    signal = make_signal()
    balance = Decimal("10000")
    positions = {}
    daily_pnl = Decimal("0")

    result = risk_manager.check(signal, balance, positions, daily_pnl)
    assert result.allowed is True


def test_risk_check_rejects_exceeding_position_size(risk_manager):
    # 0.05 BTC * 50000 = 2500: affordable, but 25% of balance > 10% limit.
    # The cost must stay within the balance, otherwise the insufficient-balance
    # check fires first and the reason would not mention position size.
    signal = make_signal(quantity="0.05")
    balance = Decimal("10000")
    positions = {}
    daily_pnl = Decimal("0")

    result = risk_manager.check(signal, balance, positions, daily_pnl)
    assert result.allowed is False
    assert "position size" in result.reason.lower()


def test_risk_check_rejects_daily_loss_exceeded(risk_manager):
    signal = make_signal()
    balance = Decimal("10000")
    positions = {}
    daily_pnl = Decimal("-1100")  # -11% is beyond the -10% limit

    result = risk_manager.check(signal, balance, positions, daily_pnl)
    assert result.allowed is False
    assert "daily loss" in result.reason.lower()


def test_risk_check_rejects_insufficient_balance(risk_manager):
    signal = make_signal(quantity="0.01", price="50000")  # cost = 500
    balance = Decimal("100")  # not enough
    positions = {}
    daily_pnl = Decimal("0")

    result = risk_manager.check(signal, balance, positions, daily_pnl)
    assert result.allowed is False
    assert "balance" in result.reason.lower()
```

- [ ] **Step 3: Run tests to verify they fail**

```bash
pip install -e "services/order-executor[dev]"
pytest services/order-executor/tests/test_risk_manager.py -v
```

Expected: FAIL

- [ ] **Step 4: Implement risk_manager**

Create `services/order-executor/src/order_executor/__init__.py`:

```python
```

Create `services/order-executor/src/order_executor/risk_manager.py`:

```python
from __future__ import annotations

from dataclasses import dataclass
from decimal import Decimal

from shared.models import Signal, OrderSide


@dataclass
class RiskCheckResult:
    allowed: bool
    reason: str = ""


class RiskManager:
    def __init__(
        self,
        max_position_size: Decimal,
        stop_loss_pct: Decimal,
        daily_loss_limit_pct: Decimal,
    ):
        self._max_position_size = max_position_size
        self._stop_loss_pct = stop_loss_pct
        self._daily_loss_limit_pct = daily_loss_limit_pct

    def check(
        self,
        signal: Signal,
        balance: Decimal,
        positions: dict[str, Decimal],
        daily_pnl: Decimal,
    ) -> RiskCheckResult:
        # Check daily loss limit
        daily_loss_pct = (daily_pnl / balance) * 100 if balance > 0 else Decimal("0")
        if daily_loss_pct < -self._daily_loss_limit_pct:
            return RiskCheckResult(
                allowed=False,
                reason=f"Daily loss limit exceeded: {daily_loss_pct:.1f}%",
            )

        if signal.side == OrderSide.BUY:
            order_cost = signal.price * signal.quantity

            # Check sufficient balance
            if order_cost > balance:
                return RiskCheckResult(
                    allowed=False,
                    reason=f"Insufficient balance: need {order_cost}, have {balance}",
                )

            # Check max position size (as a fraction of balance)
            current_position_value = positions.get(signal.symbol, Decimal("0")) * signal.price
            new_position_value = current_position_value + order_cost
            position_ratio = new_position_value / balance if balance > 0 else Decimal("999")
            if position_ratio > self._max_position_size:
                return RiskCheckResult(
                    allowed=False,
                    reason=f"Position size exceeded: {position_ratio:.2f} > {self._max_position_size}",
                )

        return RiskCheckResult(allowed=True)
```

- [ ] **Step 5: Run tests to verify they pass**

```bash
pytest services/order-executor/tests/test_risk_manager.py -v
```

Expected: All PASS

- [ ] **Step 6: Write failing tests for executor**

Create `services/order-executor/tests/test_executor.py`:

```python
import pytest
from unittest.mock import AsyncMock, MagicMock
from decimal import Decimal

from shared.models import Signal, OrderSide, OrderStatus
from order_executor.executor import OrderExecutor
from order_executor.risk_manager import RiskCheckResult


def make_signal() -> Signal:
    return Signal(
        strategy="test",
        symbol="BTCUSDT",
        side=OrderSide.BUY,
        price=Decimal("50000"),
        quantity=Decimal("0.01"),
        reason="test",
    )


@pytest.mark.asyncio
async def test_executor_places_order_when_risk_passes():
    mock_exchange = MagicMock()
    mock_exchange.create_order = AsyncMock(return_value={
        "id": "binance_123",
        "status": "closed",
        "filled": 0.01,
        "price": 50000,
    })
    mock_exchange.fetch_balance = AsyncMock(return_value={
        "USDT": {"free": 10000},
    })

    mock_risk = MagicMock()
    mock_risk.check.return_value = RiskCheckResult(allowed=True)

    mock_broker = AsyncMock()
    mock_db = AsyncMock()

    executor = OrderExecutor(
        exchange=mock_exchange,
        risk_manager=mock_risk,
        broker=mock_broker,
        db=mock_db,
        dry_run=False,
    )

    signal = make_signal()
    order = await executor.execute(signal)

    assert order is not None
    assert order.status == OrderStatus.FILLED
    mock_exchange.create_order.assert_called_once()


@pytest.mark.asyncio
async def test_executor_rejects_when_risk_fails():
    mock_exchange = MagicMock()
    mock_exchange.fetch_balance = AsyncMock(return_value={
        "USDT": {"free": 10000},
    })

    mock_risk = MagicMock()
    mock_risk.check.return_value = RiskCheckResult(allowed=False, reason="too risky")

    mock_broker = AsyncMock()
    mock_db = AsyncMock()

    executor = OrderExecutor(
        exchange=mock_exchange,
        risk_manager=mock_risk,
        broker=mock_broker,
        db=mock_db,
        dry_run=False,
    )

    signal = make_signal()
    order = await executor.execute(signal)

    assert order is None
    mock_exchange.create_order.assert_not_called()


@pytest.mark.asyncio
async def test_executor_dry_run_does_not_call_exchange():
    mock_exchange = MagicMock()
    mock_exchange.fetch_balance = AsyncMock(return_value={
        "USDT": {"free": 10000},
    })

    mock_risk = MagicMock()
    mock_risk.check.return_value = RiskCheckResult(allowed=True)

    mock_broker = AsyncMock()
    mock_db = AsyncMock()

    executor = OrderExecutor(
        exchange=mock_exchange,
        risk_manager=mock_risk,
        broker=mock_broker,
        db=mock_db,
        dry_run=True,
    )

    signal = make_signal()
    order = await executor.execute(signal)

    assert order is not None
    assert order.status == OrderStatus.FILLED
    mock_exchange.create_order.assert_not_called()
```

- [ ] **Step 7: Run tests to verify they fail**

```bash
pytest services/order-executor/tests/test_executor.py -v
```

Expected: FAIL

- [ ] **Step 8: Implement executor**

Create `services/order-executor/src/order_executor/executor.py`:

```python
from __future__ import annotations

import logging
from datetime import datetime, timezone
from decimal import Decimal

from shared.broker import RedisBroker
from shared.db import Database
from shared.events import OrderEvent
from shared.models import Order, OrderSide, OrderStatus, OrderType, Signal
from order_executor.risk_manager import RiskManager

logger = logging.getLogger(__name__)


class OrderExecutor:
    def __init__(
        self,
        exchange,
        risk_manager: RiskManager,
        broker: RedisBroker,
        db: Database,
        dry_run: bool = True,
    ):
        self._exchange = exchange
        self._risk = risk_manager
        self._broker = broker
        self._db = db
        self._dry_run = dry_run

    async def execute(self, signal: Signal) -> Order | None:
        balance_info = await self._exchange.fetch_balance()
        balance = Decimal(str(balance_info.get("USDT", {}).get("free", 0)))

        # Positions and daily PnL are placeholders here, so the risk check
        # only sees the free USDT balance.
        positions: dict[str, Decimal] = {}
        daily_pnl = Decimal("0")

        result = self._risk.check(signal, balance, positions, daily_pnl)
        if not result.allowed:
            logger.warning(f"Risk check failed: {result.reason}")
            return None

        order = Order(
            signal_id=signal.id,
            symbol=signal.symbol,
            side=signal.side,
            type=OrderType.MARKET,
            price=signal.price,
            quantity=signal.quantity,
        )

        if self._dry_run:
            logger.info(f"[DRY RUN] Would execute: {order.side} {order.quantity} {order.symbol}")
            order.status = OrderStatus.FILLED
            order.filled_at = datetime.now(timezone.utc)
        else:
            try:
                # Named `response` so it does not shadow the risk-check `result`.
                # The exchange response is not inspected; the order is marked
                # FILLED as soon as create_order returns without raising.
                response = await self._exchange.create_order(
                    symbol=signal.symbol.replace("USDT", "/USDT"),
                    type="market",
                    side=signal.side.value.lower(),
                    amount=float(signal.quantity),
                )
                order.status = OrderStatus.FILLED
                order.filled_at = datetime.now(timezone.utc)
                logger.info(f"Order filled: {order.id}")
            except Exception as e:
                order.status = OrderStatus.FAILED
                logger.error(f"Order failed: {e}")

        await self._db.insert_order(order)
        event = OrderEvent(data=order)
        await self._broker.publish("orders", event.to_dict())

        return order
```

- [ ] **Step 9: Run tests to verify they pass**

```bash
pytest services/order-executor/tests/test_executor.py -v
```

Expected: All PASS

- [ ] **Step 10: Implement config and main**

Create `services/order-executor/src/order_executor/config.py`:

```python
from shared.config import Settings


class ExecutorConfig(Settings):
    pass
```

Create `services/order-executor/src/order_executor/main.py`:

```python
from __future__ import annotations

import asyncio
import logging

import ccxt.async_support as ccxt

from shared.broker import RedisBroker
from shared.db import Database
from shared.events import Event
from order_executor.config import ExecutorConfig
from order_executor.executor import OrderExecutor
from order_executor.risk_manager import RiskManager

logger = logging.getLogger(__name__)


async def run():
    config = ExecutorConfig()
    logging.basicConfig(level=config.log_level)

    db = Database(config.database_url)
    await db.connect()

    broker = RedisBroker(config.redis_url)

    exchange = ccxt.binance({
        "apiKey": config.binance_api_key,
        "secret": config.binance_api_secret,
    })

    risk_manager = RiskManager(
        max_position_size=config.risk_max_position_size,
        stop_loss_pct=config.risk_stop_loss_pct,
        daily_loss_limit_pct=config.risk_daily_loss_limit_pct,
    )

    executor = OrderExecutor(
        exchange=exchange,
        risk_manager=risk_manager,
        broker=broker,
        db=db,
        dry_run=config.dry_run,
    )

    logger.info(f"Starting order executor (dry_run={config.dry_run})")

    # NOTE: as written, last_id is never advanced, so every iteration reads
    # the "signals" stream from "0-0" again.
    last_id = "0-0"
    try:
        while True:
            messages = await broker.read("signals", last_id=last_id, count=10, block=1000)
            for msg in messages:
                event = Event.from_dict(msg)
                await executor.execute(event.data)
    finally:
        await exchange.close()
        await broker.close()
        await db.close()


def main():
    asyncio.run(run())


if __name__ == "__main__":
    main()
```

- [
] **Step 11: Create Dockerfile**

Create `services/order-executor/Dockerfile`:

```dockerfile
FROM python:3.12-slim

WORKDIR /app

COPY shared/ shared/
RUN pip install --no-cache-dir ./shared

COPY services/order-executor/ services/order-executor/
RUN pip install --no-cache-dir ./services/order-executor

CMD ["python", "-m", "order_executor.main"]
```

- [ ] **Step 12: Commit**

```bash
git add services/order-executor/
git commit -m "feat(order-executor): add order execution with risk management and dry-run mode"
```

---

## Task 8: Portfolio Manager Service

**Files:**
- Create: `services/portfolio-manager/pyproject.toml`
- Create: `services/portfolio-manager/Dockerfile`
- Create: `services/portfolio-manager/src/portfolio_manager/__init__.py`
- Create: `services/portfolio-manager/src/portfolio_manager/config.py`
- Create: `services/portfolio-manager/src/portfolio_manager/portfolio.py`
- Create: `services/portfolio-manager/src/portfolio_manager/pnl.py`
- Create: `services/portfolio-manager/src/portfolio_manager/main.py`
- Create: `services/portfolio-manager/tests/test_portfolio.py`
- Create: `services/portfolio-manager/tests/test_pnl.py`

- [ ] **Step 1: Create pyproject.toml**

Create `services/portfolio-manager/pyproject.toml`:

```toml
[project]
name = "portfolio-manager"
version = "0.1.0"
description = "Portfolio tracking and PnL calculation service"
requires-python = ">=3.12"
dependencies = [
    "trading-shared",
]

[project.optional-dependencies]
dev = [
    "pytest>=8.0",
    "pytest-asyncio>=0.23",
]

[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

[tool.hatch.build.targets.wheel]
packages = ["src/portfolio_manager"]
```

- [ ] **Step 2: Write failing tests for pnl**

Create `services/portfolio-manager/tests/test_pnl.py`:

```python
from decimal import Decimal

from portfolio_manager.pnl import calculate_unrealized_pnl, calculate_realized_pnl


def test_unrealized_pnl_profit():
    result = calculate_unrealized_pnl(
        quantity=Decimal("0.1"),
        avg_entry_price=Decimal("50000"),
        current_price=Decimal("55000"),
    )
    assert result == Decimal("500")  # 0.1 * (55000 - 50000)


def test_unrealized_pnl_loss():
    result = calculate_unrealized_pnl(
        quantity=Decimal("0.1"),
        avg_entry_price=Decimal("50000"),
        current_price=Decimal("45000"),
    )
    assert result == Decimal("-500")  # 0.1 * (45000 - 50000)


def test_realized_pnl_single_trade():
    result = calculate_realized_pnl(
        buy_price=Decimal("50000"),
        sell_price=Decimal("55000"),
        quantity=Decimal("0.1"),
        fee=Decimal("5.5"),
    )
    assert result == Decimal("494.5")  # 0.1 * (55000 - 50000) - 5.5
```

- [ ] **Step 3: Run tests to verify they fail**

```bash
pip install -e "services/portfolio-manager[dev]"
pytest services/portfolio-manager/tests/test_pnl.py -v
```

Expected: FAIL

- [ ] **Step 4: Implement pnl**

Create `services/portfolio-manager/src/portfolio_manager/__init__.py`:

```python
```

Create `services/portfolio-manager/src/portfolio_manager/pnl.py`:

```python
from decimal import Decimal


def calculate_unrealized_pnl(
    quantity: Decimal,
    avg_entry_price: Decimal,
    current_price: Decimal,
) -> Decimal:
    # Mark-to-market gain or loss on an open position
    return quantity * (current_price - avg_entry_price)


def calculate_realized_pnl(
    buy_price: Decimal,
    sell_price: Decimal,
    quantity: Decimal,
    fee: Decimal = Decimal("0"),
) -> Decimal:
    # Gain or loss on a closed trade, net of fees
    return quantity * (sell_price - buy_price) - fee
```

- [ ] **Step 5: Run tests to verify they pass**

```bash
pytest services/portfolio-manager/tests/test_pnl.py -v
```

Expected: All PASS

- [ ] **Step 6: Write failing tests for portfolio**

Create `services/portfolio-manager/tests/test_portfolio.py`:

```python
import pytest
from decimal import Decimal

from shared.models import Order, OrderSide, OrderType, OrderStatus
from portfolio_manager.portfolio import PortfolioTracker


@pytest.fixture
def tracker():
    return PortfolioTracker()


def make_order(side=OrderSide.BUY, price="50000", quantity="0.1") -> Order:
    return Order(
        signal_id="sig_1",
        symbol="BTCUSDT",
        side=side,
        type=OrderType.MARKET,
        price=Decimal(price),
        quantity=Decimal(quantity),
        status=OrderStatus.FILLED,
    )


def test_portfolio_add_buy_order(tracker):
    order = make_order(side=OrderSide.BUY)
    tracker.apply_order(order)

    pos = tracker.get_position("BTCUSDT")
    assert pos.quantity == Decimal("0.1")
    assert pos.avg_entry_price == Decimal("50000")


def test_portfolio_add_multiple_buys(tracker):
    tracker.apply_order(make_order(price="50000", quantity="0.1"))
    tracker.apply_order(make_order(price="52000", quantity="0.1"))

    pos = tracker.get_position("BTCUSDT")
    assert pos.quantity == Decimal("0.2")
    assert pos.avg_entry_price == Decimal("51000")  # weighted avg


def test_portfolio_sell_reduces_position(tracker):
    tracker.apply_order(make_order(side=OrderSide.BUY, price="50000", quantity="0.2"))
    tracker.apply_order(make_order(side=OrderSide.SELL, price="55000", quantity="0.1"))

    pos = tracker.get_position("BTCUSDT")
    assert pos.quantity == Decimal("0.1")
    assert pos.avg_entry_price == Decimal("50000")  # entry price unchanged


def test_portfolio_no_position_returns_none(tracker):
    pos = tracker.get_position("ETHUSDT")
    assert pos is None
```

- [ ] **Step 7: Run tests to verify they fail**

```bash
pytest services/portfolio-manager/tests/test_portfolio.py -v
```

Expected: FAIL

- [ ] **Step 8: Implement portfolio**

Create `services/portfolio-manager/src/portfolio_manager/portfolio.py`:

```python
from __future__ import annotations

from decimal import Decimal

from shared.models import Order, OrderSide, Position


class PortfolioTracker:
    def __init__(self):
        self._positions: dict[str, _PositionState] = {}

    def apply_order(self, order: Order) -> None:
        if order.symbol not in self._positions:
            self._positions[order.symbol] = _PositionState()

        state = self._positions[order.symbol]

        if order.side == OrderSide.BUY:
            # Buys update the quantity-weighted average entry price
            total_cost = state.avg_entry * state.quantity + order.price * order.quantity
            state.quantity += order.quantity
            state.avg_entry = total_cost / state.quantity if state.quantity > 0 else Decimal("0")
        elif order.side == OrderSide.SELL:
            # Sells reduce the quantity and leave the entry price unchanged;
            # a fully closed (or oversold) position is reset to zero
            state.quantity -= order.quantity
            if state.quantity <= 0:
                state.quantity = Decimal("0")
                state.avg_entry = Decimal("0")

    def get_position(self, symbol: str) -> Position | None:
        state = self._positions.get(symbol)
        if state is None or state.quantity == 0:
            return None
        return Position(
            symbol=symbol,
            quantity=state.quantity,
            avg_entry_price=state.avg_entry,
            current_price=Decimal("0"),
        )

    def get_all_positions(self) -> list[Position]:
        positions = []
        for symbol in self._positions:
            pos = self.get_position(symbol)
            if pos is not None:
                positions.append(pos)
        return positions


class _PositionState:
    def __init__(self):
        self.quantity = Decimal("0")
        self.avg_entry = Decimal("0")
```

- [ ] **Step 9: Run tests to verify they pass**

```bash
pytest services/portfolio-manager/tests/test_portfolio.py -v
```

Expected: All PASS

- [ ] **Step 10: Implement config and main**

Create `services/portfolio-manager/src/portfolio_manager/config.py`:

```python
from shared.config import Settings


class PortfolioConfig(Settings):
    snapshot_interval_hours: int = 24
```

Create `services/portfolio-manager/src/portfolio_manager/main.py`:

```python
from __future__ import annotations

import asyncio
import logging

from shared.broker import RedisBroker
from shared.db import Database
from shared.events import Event
from portfolio_manager.config import PortfolioConfig
from portfolio_manager.portfolio import PortfolioTracker

logger = logging.getLogger(__name__)


async def run():
    config = PortfolioConfig()
    logging.basicConfig(level=config.log_level)

    db = Database(config.database_url)
    await db.connect()

    broker = RedisBroker(config.redis_url)
    tracker = PortfolioTracker()

    logger.info("Starting portfolio manager")

    # NOTE: as written, last_id is never advanced, so every iteration reads
    # the "orders" stream from "0-0" again.
    last_id = "0-0"
    try:
        while True:
            messages = await broker.read("orders", last_id=last_id, count=10, block=1000)
            for msg in messages:
                event = Event.from_dict(msg)
                order = event.data
                tracker.apply_order(order)
                logger.info(f"Position updated: {order.symbol}")
    finally:
        await broker.close()
        await db.close()


def main():
    asyncio.run(run())


if __name__ == "__main__":
    main()
```

- [ ] **Step 11: Create Dockerfile**

Create `services/portfolio-manager/Dockerfile`:

```dockerfile
FROM python:3.12-slim

WORKDIR /app

COPY shared/ shared/
RUN pip install --no-cache-dir ./shared

COPY services/portfolio-manager/ services/portfolio-manager/
RUN pip install --no-cache-dir ./services/portfolio-manager

CMD ["python", "-m", "portfolio_manager.main"]
```

- [ ] **Step 12: Commit**

```bash
git add services/portfolio-manager/
git commit -m "feat(portfolio-manager): add portfolio tracking and PnL calculation"
```

---

## Task 9: Backtester Service

**Files:**
- Create: `services/backtester/pyproject.toml`
- Create: `services/backtester/Dockerfile`
- Create: `services/backtester/src/backtester/__init__.py`
- Create: `services/backtester/src/backtester/config.py`
- Create: `services/backtester/src/backtester/simulator.py`
- Create: `services/backtester/src/backtester/engine.py`
- Create: `services/backtester/src/backtester/reporter.py`
- Create: `services/backtester/src/backtester/main.py`
- Create: `services/backtester/tests/test_simulator.py`
- Create: `services/backtester/tests/test_engine.py`
- Create: `services/backtester/tests/test_reporter.py`

- [ ] **Step 1: Create pyproject.toml**

Create `services/backtester/pyproject.toml`:

```toml
[project]
name = "backtester"
version = "0.1.0"
description = "Strategy backtesting engine"
requires-python = ">=3.12"
dependencies = [
    "pandas>=2.0",
    "trading-shared",
]

[project.optional-dependencies]
dev = [
    "pytest>=8.0",
    "pytest-asyncio>=0.23",
]

[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

[tool.hatch.build.targets.wheel]
packages = ["src/backtester"]
```

- [ ] **Step 2: Write failing tests for simulator**

Create `services/backtester/tests/test_simulator.py`:

```python
from decimal import Decimal

from shared.models import Signal, OrderSide
from backtester.simulator import OrderSimulator


def make_signal(side=OrderSide.BUY, price="50000", quantity="0.1") -> Signal:
return Signal( strategy="test", symbol="BTCUSDT", side=side, price=Decimal(price), quantity=Decimal(quantity), reason="test", ) def test_simulator_initial_balance(): sim = OrderSimulator(initial_balance=Decimal("10000")) assert sim.balance == Decimal("10000") def test_simulator_buy_reduces_balance(): sim = OrderSimulator(initial_balance=Decimal("10000")) sim.execute(make_signal(side=OrderSide.BUY, price="50000", quantity="0.1")) assert sim.balance == Decimal("5000") # 10000 - 0.1*50000 assert sim.positions["BTCUSDT"] == Decimal("0.1") def test_simulator_sell_increases_balance(): sim = OrderSimulator(initial_balance=Decimal("10000")) sim.execute(make_signal(side=OrderSide.BUY, price="50000", quantity="0.1")) sim.execute(make_signal(side=OrderSide.SELL, price="55000", quantity="0.1")) assert sim.balance == Decimal("10500") # 5000 + 0.1*55000 assert sim.positions.get("BTCUSDT", Decimal("0")) == Decimal("0") def test_simulator_reject_buy_insufficient_balance(): sim = OrderSimulator(initial_balance=Decimal("100")) result = sim.execute(make_signal(side=OrderSide.BUY, price="50000", quantity="0.1")) assert result is False assert sim.balance == Decimal("100") def test_simulator_trade_history(): sim = OrderSimulator(initial_balance=Decimal("10000")) sim.execute(make_signal(side=OrderSide.BUY)) assert len(sim.trades) == 1 ``` - [ ] **Step 3: Run tests to verify they fail** ```bash pip install -e services/backtester[dev] pytest services/backtester/tests/test_simulator.py -v ``` Expected: FAIL - [ ] **Step 4: Implement simulator** Create `services/backtester/src/backtester/__init__.py`: ```python ``` Create `services/backtester/src/backtester/simulator.py`: ```python from __future__ import annotations from dataclasses import dataclass, field from decimal import Decimal from shared.models import Signal, OrderSide @dataclass class SimulatedTrade: symbol: str side: OrderSide price: Decimal quantity: Decimal balance_after: Decimal class OrderSimulator: def __init__(self, 
initial_balance: Decimal): self.balance = initial_balance self.positions: dict[str, Decimal] = {} self.trades: list[SimulatedTrade] = [] def execute(self, signal: Signal) -> bool: if signal.side == OrderSide.BUY: cost = signal.price * signal.quantity if cost > self.balance: return False self.balance -= cost current = self.positions.get(signal.symbol, Decimal("0")) self.positions[signal.symbol] = current + signal.quantity elif signal.side == OrderSide.SELL: current = self.positions.get(signal.symbol, Decimal("0")) sell_qty = min(signal.quantity, current) if sell_qty <= 0: return False self.balance += signal.price * sell_qty self.positions[signal.symbol] = current - sell_qty self.trades.append( SimulatedTrade( symbol=signal.symbol, side=signal.side, price=signal.price, quantity=signal.quantity, balance_after=self.balance, ) ) return True ``` - [ ] **Step 5: Run tests to verify they pass** ```bash pytest services/backtester/tests/test_simulator.py -v ``` Expected: All PASS - [ ] **Step 6: Write failing tests for backtest engine** Create `services/backtester/tests/test_engine.py`: ```python import pytest from decimal import Decimal from datetime import datetime, timezone from unittest.mock import MagicMock from shared.models import Candle, Signal, OrderSide from backtester.engine import BacktestEngine def make_candles(prices: list[float]) -> list[Candle]: return [ Candle( symbol="BTCUSDT", timeframe="1m", open_time=datetime(2026, 1, 1, minute=i, tzinfo=timezone.utc), open=Decimal(str(p)), high=Decimal(str(p + 10)), low=Decimal(str(p - 10)), close=Decimal(str(p)), volume=Decimal("1.0"), ) for i, p in enumerate(prices) ] def test_backtest_engine_runs_strategy_over_candles(): mock_strategy = MagicMock() mock_strategy.name = "test" mock_strategy.on_candle.return_value = None candles = make_candles([50000, 50100, 50200]) engine = BacktestEngine( strategy=mock_strategy, initial_balance=Decimal("10000"), ) result = engine.run(candles) assert mock_strategy.on_candle.call_count 
== 3 assert result.total_trades == 0 assert result.final_balance == Decimal("10000") def test_backtest_engine_executes_signals(): buy_signal = Signal( strategy="test", symbol="BTCUSDT", side=OrderSide.BUY, price=Decimal("50000"), quantity=Decimal("0.1"), reason="test buy", ) sell_signal = Signal( strategy="test", symbol="BTCUSDT", side=OrderSide.SELL, price=Decimal("55000"), quantity=Decimal("0.1"), reason="test sell", ) mock_strategy = MagicMock() mock_strategy.name = "test" mock_strategy.on_candle.side_effect = [buy_signal, None, sell_signal] candles = make_candles([50000, 52000, 55000]) engine = BacktestEngine( strategy=mock_strategy, initial_balance=Decimal("10000"), ) result = engine.run(candles) assert result.total_trades == 2 assert result.final_balance == Decimal("10500") # 10000 - 5000 + 5500 ``` - [ ] **Step 7: Run tests to verify they fail** ```bash pytest services/backtester/tests/test_engine.py -v ``` Expected: FAIL - [ ] **Step 8: Implement backtest engine** Create `services/backtester/src/backtester/engine.py`: ```python from __future__ import annotations from dataclasses import dataclass from decimal import Decimal from shared.models import Candle from backtester.simulator import OrderSimulator from strategies.base import BaseStrategy @dataclass class BacktestResult: strategy_name: str symbol: str total_trades: int initial_balance: Decimal final_balance: Decimal profit: Decimal profit_pct: Decimal trades: list @property def win_rate(self) -> Decimal: if self.total_trades == 0: return Decimal("0") wins = sum( 1 for i in range(0, len(self.trades) - 1, 2) if i + 1 < len(self.trades) and self.trades[i + 1].balance_after > self.trades[i].balance_after ) pairs = self.total_trades // 2 return Decimal(str(wins / pairs * 100)) if pairs > 0 else Decimal("0") class BacktestEngine: def __init__(self, strategy: BaseStrategy, initial_balance: Decimal): self._strategy = strategy self._initial_balance = initial_balance def run(self, candles: list[Candle]) -> 
BacktestResult: simulator = OrderSimulator(self._initial_balance) symbol = candles[0].symbol if candles else "" for candle in candles: signal = self._strategy.on_candle(candle) if signal is not None: simulator.execute(signal) final = simulator.balance # Add value of remaining positions at last candle price if candles: last_price = candles[-1].close for sym, qty in simulator.positions.items(): final += qty * last_price profit = final - self._initial_balance profit_pct = (profit / self._initial_balance) * 100 if self._initial_balance > 0 else Decimal("0") return BacktestResult( strategy_name=self._strategy.name, symbol=symbol, total_trades=len(simulator.trades), initial_balance=self._initial_balance, final_balance=final, profit=profit, profit_pct=profit_pct, trades=simulator.trades, ) ``` - [ ] **Step 9: Run tests to verify they pass** ```bash pytest services/backtester/tests/test_engine.py -v ``` Expected: All PASS - [ ] **Step 10: Write failing tests for reporter** Create `services/backtester/tests/test_reporter.py`: ```python from decimal import Decimal from backtester.engine import BacktestResult from backtester.reporter import format_report def test_format_report_contains_key_metrics(): result = BacktestResult( strategy_name="rsi", symbol="BTCUSDT", total_trades=10, initial_balance=Decimal("10000"), final_balance=Decimal("11500"), profit=Decimal("1500"), profit_pct=Decimal("15"), trades=[], ) report = format_report(result) assert "rsi" in report assert "BTCUSDT" in report assert "10000" in report assert "11500" in report assert "1500" in report assert "15" in report ``` - [ ] **Step 11: Run test to verify it fails** ```bash pytest services/backtester/tests/test_reporter.py -v ``` Expected: FAIL - [ ] **Step 12: Implement reporter** Create `services/backtester/src/backtester/reporter.py`: ```python from backtester.engine import BacktestResult def format_report(result: BacktestResult) -> str: lines = [ "=" * 50, f" Backtest Report: {result.strategy_name}", "=" * 
50, f" Symbol: {result.symbol}", f" Total Trades: {result.total_trades}", f" Initial Balance: {result.initial_balance}", f" Final Balance: {result.final_balance}", f" Profit: {result.profit}", f" Profit %: {result.profit_pct:.2f}%", f" Win Rate: {result.win_rate:.1f}%", "=" * 50, ] return "\n".join(lines) ``` - [ ] **Step 13: Run test to verify it passes** ```bash pytest services/backtester/tests/test_reporter.py -v ``` Expected: PASS - [ ] **Step 14: Implement config and main** Create `services/backtester/src/backtester/config.py`: ```python from shared.config import Settings class BacktestConfig(Settings): backtest_initial_balance: float = 10000.0 ``` Create `services/backtester/src/backtester/main.py`: ```python from __future__ import annotations import asyncio import logging from decimal import Decimal from pathlib import Path from shared.db import Database from backtester.config import BacktestConfig from backtester.engine import BacktestEngine from backtester.reporter import format_report logger = logging.getLogger(__name__) async def run_backtest( strategy_name: str, symbol: str, timeframe: str, initial_balance: Decimal, db: Database, strategies_dir: Path, ) -> str: from strategy_engine.plugin_loader import load_strategies strategies = load_strategies(strategies_dir) strategy = next((s for s in strategies if s.name == strategy_name), None) if strategy is None: return f"Strategy '{strategy_name}' not found" candles_data = await db.get_candles(symbol, timeframe) if not candles_data: return f"No candle data for {symbol} {timeframe}" from shared.models import Candle candles = [Candle(**row) for row in reversed(candles_data)] engine = BacktestEngine(strategy=strategy, initial_balance=initial_balance) result = engine.run(candles) return format_report(result) ``` - [ ] **Step 15: Create Dockerfile** Create `services/backtester/Dockerfile`: ```dockerfile FROM python:3.12-slim WORKDIR /app COPY shared/ shared/ RUN pip install --no-cache-dir ./shared COPY 
services/strategy-engine/strategies/ services/strategy-engine/strategies/ COPY services/backtester/ services/backtester/ RUN pip install --no-cache-dir ./services/backtester CMD ["python", "-m", "backtester.main"] ``` - [ ] **Step 16: Commit** ```bash git add services/backtester/ git commit -m "feat(backtester): add backtesting engine with simulator and reporting" ``` --- ## Task 10: CLI **Files:** - Create: `cli/pyproject.toml` - Create: `cli/src/trading_cli/__init__.py` - Create: `cli/src/trading_cli/main.py` - Create: `cli/src/trading_cli/commands/data.py` - Create: `cli/src/trading_cli/commands/trade.py` - Create: `cli/src/trading_cli/commands/backtest.py` - Create: `cli/src/trading_cli/commands/portfolio.py` - Create: `cli/src/trading_cli/commands/strategy.py` - Create: `cli/src/trading_cli/commands/service.py` - Create: `cli/tests/test_cli_data.py` - [ ] **Step 1: Create pyproject.toml** Create `cli/pyproject.toml`: ```toml [project] name = "trading-cli" version = "0.1.0" description = "CLI interface for the trading platform" requires-python = ">=3.12" dependencies = [ "click>=8.0", "rich>=13.0", "trading-shared", ] [project.scripts] trading = "trading_cli.main:cli" [project.optional-dependencies] dev = [ "pytest>=8.0", "pytest-asyncio>=0.23", ] [build-system] requires = ["hatchling"] build-backend = "hatchling.build" [tool.hatch.build.targets.wheel] packages = ["src/trading_cli"] ``` - [ ] **Step 2: Write failing tests for CLI data commands** Create `cli/tests/test_cli_data.py`: ```python from click.testing import CliRunner from trading_cli.main import cli def test_cli_help(): runner = CliRunner() result = runner.invoke(cli, ["--help"]) assert result.exit_code == 0 assert "trading" in result.output.lower() or "Usage" in result.output def test_cli_data_group(): runner = CliRunner() result = runner.invoke(cli, ["data", "--help"]) assert result.exit_code == 0 assert "collect" in result.output assert "history" in result.output ``` - [ ] **Step 3: Run tests to 
verify they fail** ```bash pip install -e cli[dev] pytest cli/tests/test_cli_data.py -v ``` Expected: FAIL - [ ] **Step 4: Implement CLI main and data commands** Create `cli/src/trading_cli/__init__.py`: ```python ``` Create `cli/src/trading_cli/main.py`: ```python import click from trading_cli.commands.data import data from trading_cli.commands.trade import trade from trading_cli.commands.backtest import backtest from trading_cli.commands.portfolio import portfolio from trading_cli.commands.strategy import strategy from trading_cli.commands.service import service @click.group() @click.version_option(version="0.1.0") def cli(): """Trading Platform CLI — Binance spot crypto trading""" pass cli.add_command(data) cli.add_command(trade) cli.add_command(backtest) cli.add_command(portfolio) cli.add_command(strategy) cli.add_command(service) ``` Create `cli/src/trading_cli/commands/data.py`: ```python import asyncio import click @click.group() def data(): """Data collection commands""" pass @data.command() @click.option("--symbol", required=True, help="Trading pair (e.g. BTCUSDT)") @click.option("--timeframe", default="1m", help="Candle timeframe") def collect(symbol: str, timeframe: str): """Start real-time data collection""" click.echo(f"Starting data collection: {symbol} {timeframe}") from data_collector.config import CollectorConfig from data_collector.main import run asyncio.run(run()) @data.command() @click.option("--symbol", required=True, help="Trading pair (e.g. 
BTCUSDT)") @click.option("--timeframe", default="1m", help="Candle timeframe") @click.option("--from", "since", required=True, help="Start date (YYYY-MM-DD)") @click.option("--limit", default=1000, help="Number of candles") def history(symbol: str, timeframe: str, since: str, limit: int): """Download historical candle data""" click.echo(f"Downloading history: {symbol} {timeframe} from {since} (limit={limit})") async def _run(): import ccxt.async_support as ccxt from datetime import datetime, timezone from shared.broker import RedisBroker from shared.config import Settings from shared.db import Database from data_collector.binance_rest import fetch_historical_candles from data_collector.storage import CandleStorage settings = Settings() db = Database(settings.database_url) await db.connect() await db.init_tables() broker = RedisBroker(settings.redis_url) storage = CandleStorage(db=db, broker=broker) exchange = ccxt.binance() since_dt = datetime.strptime(since, "%Y-%m-%d").replace(tzinfo=timezone.utc) candles = await fetch_historical_candles( exchange=exchange, symbol=symbol.replace("USDT", "/USDT"), timeframe=timeframe, since=since_dt, limit=limit, ) await storage.store_batch(candles) await exchange.close() await broker.close() await db.close() click.echo(f"Downloaded {len(candles)} candles") asyncio.run(_run()) @data.command("list") def list_data(): """List currently collecting symbols""" click.echo("Collecting symbols:") click.echo(" (Check docker-compose service status)") ``` - [ ] **Step 5: Implement remaining CLI command stubs** Create `cli/src/trading_cli/commands/trade.py`: ```python import click @click.group() def trade(): """Trading bot commands""" pass @trade.command() @click.option("--strategy", required=True, help="Strategy name") @click.option("--symbol", required=True, help="Trading pair") def start(strategy: str, symbol: str): """Start a trading bot""" click.echo(f"Starting bot: strategy={strategy} symbol={symbol}") @trade.command() 
@click.option("--strategy", required=True, help="Strategy name") def stop(strategy: str): """Stop a trading bot""" click.echo(f"Stopping bot: strategy={strategy}") @trade.command() def status(): """Show running bot status""" click.echo("Running bots:") @trade.command("stop-all") def stop_all(): """Emergency stop: stop all bots and cancel all orders""" click.confirm("Are you sure you want to stop ALL bots?", abort=True) click.echo("Stopping all bots and cancelling open orders...") ``` Create `cli/src/trading_cli/commands/backtest.py`: ```python import asyncio from decimal import Decimal import click @click.group() def backtest(): """Backtesting commands""" pass @backtest.command("run") @click.option("--strategy", required=True, help="Strategy name") @click.option("--symbol", required=True, help="Trading pair") @click.option("--from", "since", required=True, help="Start date") @click.option("--to", "until", required=True, help="End date") @click.option("--balance", default=10000.0, help="Initial balance") def run_backtest(strategy: str, symbol: str, since: str, until: str, balance: float): """Run a backtest""" click.echo(f"Running backtest: {strategy} on {symbol} ({since} ~ {until})") async def _run(): from pathlib import Path from shared.config import Settings from shared.db import Database from backtester.main import run_backtest as bt_run settings = Settings() db = Database(settings.database_url) await db.connect() strategies_dir = Path(__file__).parent.parent.parent.parent.parent / "services" / "strategy-engine" / "strategies" report = await bt_run( strategy_name=strategy, symbol=symbol, timeframe="1m", initial_balance=Decimal(str(balance)), db=db, strategies_dir=strategies_dir, ) click.echo(report) await db.close() asyncio.run(_run()) @backtest.command() @click.option("--id", "report_id", default="latest", help="Report ID") def report(report_id: str): """Show backtest report""" click.echo(f"Showing report: {report_id}") ``` Create 
`cli/src/trading_cli/commands/portfolio.py`: ```python import click @click.group() def portfolio(): """Portfolio commands""" pass @portfolio.command() def show(): """Show current portfolio""" click.echo("Current Portfolio:") click.echo(" (Connect to portfolio-manager service)") @portfolio.command() @click.option("--days", default=30, help="Number of days") def history(days: int): """Show PnL history""" click.echo(f"PnL history (last {days} days):") ``` Create `cli/src/trading_cli/commands/strategy.py`: ```python from pathlib import Path import click @click.group() def strategy(): """Strategy management commands""" pass @strategy.command("list") def list_strategies(): """List available strategies""" from strategy_engine.plugin_loader import load_strategies strategies_dir = Path(__file__).parent.parent.parent.parent.parent / "services" / "strategy-engine" / "strategies" strategies = load_strategies(strategies_dir) click.echo("Available strategies:") for s in strategies: click.echo(f" - {s.name}") @strategy.command() @click.option("--name", required=True, help="Strategy name") def info(name: str): """Show strategy details""" click.echo(f"Strategy: {name}") ``` Create `cli/src/trading_cli/commands/service.py`: ```python import subprocess import click @click.group() def service(): """Service management commands""" pass @service.command() def up(): """Start all services""" click.echo("Starting all services...") subprocess.run(["docker", "compose", "up", "-d"], check=True) @service.command() def down(): """Stop all services""" click.echo("Stopping all services...") subprocess.run(["docker", "compose", "down"], check=True) @service.command() @click.option("--name", required=True, help="Service name") def logs(name: str): """Show service logs""" subprocess.run(["docker", "compose", "logs", "-f", name]) ``` - [ ] **Step 6: Run tests to verify they pass** ```bash pytest cli/tests/test_cli_data.py -v ``` Expected: All PASS - [ ] **Step 7: Commit** ```bash git add cli/ git 
commit -m "feat(cli): add Click-based CLI with data, trade, backtest, portfolio, strategy, and service commands" ``` --- ## Task 11: Integration Verification - [ ] **Step 1: Run all tests** ```bash cd /home/si/Private/repos/trading pytest -v ``` Expected: All tests pass - [ ] **Step 2: Lint check** ```bash ruff check . ``` Fix any issues found. - [ ] **Step 3: Verify Docker builds** ```bash docker compose build ``` Expected: All services build successfully - [ ] **Step 4: Start infrastructure and verify** ```bash make infra # Wait for healthy status docker compose ps ``` Expected: redis and postgres running and healthy - [ ] **Step 5: Final commit** ```bash git add . git commit -m "chore: integration verification — all tests pass, docker builds succeed" ```
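
Alongside the test run in Step 1, the Decimal arithmetic that the portfolio tests in Task 8 assert on (weighted-average entry price and unrealized PnL) can be checked in isolation. The sketch below is a standalone illustration under assumptions, not one of the plan's files: `weighted_avg_entry` and `unrealized_pnl` are hypothetical helper names that mirror, but do not import, `PortfolioTracker.apply_order` and `calculate_unrealized_pnl`.

```python
from decimal import Decimal


def weighted_avg_entry(fills: list[tuple[Decimal, Decimal]]) -> Decimal:
    """Quantity-weighted average price over (price, quantity) buy fills.

    Hypothetical helper for illustration only.
    """
    total_qty = sum((qty for _, qty in fills), Decimal("0"))
    if total_qty == 0:
        return Decimal("0")
    total_cost = sum((price * qty for price, qty in fills), Decimal("0"))
    return total_cost / total_qty


def unrealized_pnl(quantity: Decimal, avg_entry: Decimal, current: Decimal) -> Decimal:
    """Paper profit or loss on an open position (hypothetical helper)."""
    return quantity * (current - avg_entry)


# Two buys of 0.1 BTC at 50000 and 52000, as in test_portfolio_add_multiple_buys.
fills = [
    (Decimal("50000"), Decimal("0.1")),
    (Decimal("52000"), Decimal("0.1")),
]
avg = weighted_avg_entry(fills)
# Mark the combined 0.2 BTC position at an assumed current price of 55000.
pnl = unrealized_pnl(Decimal("0.2"), avg, Decimal("55000"))

assert avg == Decimal("51000")
assert pnl == Decimal("800")
```

Building every value as a `Decimal` from a string avoids binary floating-point rounding, which is what lets the tests compare against exact figures.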