From 33b14aaa2344b0fd95d1629627c3d135b24ae102 Mon Sep 17 00:00:00 2001 From: TheSiahxyz <164138827+TheSiahxyz@users.noreply.github.com> Date: Wed, 1 Apr 2026 15:56:35 +0900 Subject: feat: initial trading platform implementation Binance spot crypto trading platform with microservices architecture: - shared: Pydantic models, Redis Streams broker, asyncpg DB layer - data-collector: Binance WebSocket/REST market data collection - strategy-engine: Plugin-based strategy execution (RSI, Grid) - order-executor: Order execution with risk management - portfolio-manager: Position tracking and PnL calculation - backtester: Historical strategy testing with simulator - cli: Click-based CLI for all operations - Docker Compose orchestration with Redis and PostgreSQL - 24 test files covering all modules --- .../plans/2026-04-01-crypto-trading-platform.md | 4063 ++++++++++++++++++++ .../2026-04-01-crypto-trading-platform-design.md | 374 ++ 2 files changed, 4437 insertions(+) create mode 100644 docs/superpowers/plans/2026-04-01-crypto-trading-platform.md create mode 100644 docs/superpowers/specs/2026-04-01-crypto-trading-platform-design.md (limited to 'docs/superpowers') diff --git a/docs/superpowers/plans/2026-04-01-crypto-trading-platform.md b/docs/superpowers/plans/2026-04-01-crypto-trading-platform.md new file mode 100644 index 0000000..08ff0f5 --- /dev/null +++ b/docs/superpowers/plans/2026-04-01-crypto-trading-platform.md @@ -0,0 +1,4063 @@ +# Crypto Trading Platform Implementation Plan + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** Binance 현물 암호화폐 자동매매 플랫폼을 마이크로서비스 아키텍처로 구축한다. + +**Architecture:** 6개 독립 서비스(data-collector, strategy-engine, order-executor, portfolio-manager, backtester)가 Redis Streams로 통신하고, PostgreSQL에 데이터를 저장한다. shared 라이브러리가 공통 모델/이벤트/DB 연결을 제공하며, Click 기반 CLI로 전체를 제어한다. + +**Tech Stack:** Python 3.12, ccxt, Redis Streams, PostgreSQL, asyncpg, pandas, pandas-ta, Click, pydantic-settings, Docker Compose, pytest + +--- + +## File Structure + +``` +trading/ +├── services/ +│ ├── data-collector/ +│ │ ├── src/data_collector/__init__.py +│ │ ├── src/data_collector/main.py +│ │ ├── src/data_collector/binance_ws.py +│ │ ├── src/data_collector/binance_rest.py +│ │ ├── src/data_collector/storage.py +│ │ ├── src/data_collector/config.py +│ │ ├── tests/test_binance_rest.py +│ │ ├── tests/test_storage.py +│ │ ├── tests/test_main.py +│ │ ├── Dockerfile +│ │ └── pyproject.toml +│ ├── strategy-engine/ +│ │ ├── src/strategy_engine/__init__.py +│ │ ├── src/strategy_engine/main.py +│ │ ├── src/strategy_engine/engine.py +│ │ ├── src/strategy_engine/plugin_loader.py +│ │ ├── src/strategy_engine/config.py +│ │ ├── strategies/base.py +│ │ ├── strategies/rsi_strategy.py +│ │ ├── strategies/grid_strategy.py +│ │ ├── tests/test_engine.py +│ │ ├── tests/test_plugin_loader.py +│ │ ├── tests/test_rsi_strategy.py +│ │ ├── tests/test_grid_strategy.py +│ │ ├── Dockerfile +│ │ └── pyproject.toml +│ ├── order-executor/ +│ │ ├── src/order_executor/__init__.py +│ │ ├── src/order_executor/main.py +│ │ ├── src/order_executor/executor.py +│ │ ├── src/order_executor/risk_manager.py +│ │ ├── src/order_executor/config.py +│ │ ├── tests/test_executor.py +│ │ ├── tests/test_risk_manager.py +│ │ ├── Dockerfile +│ │ └── pyproject.toml +│ ├── portfolio-manager/ +│ │ ├── src/portfolio_manager/__init__.py +│ │ ├── src/portfolio_manager/main.py +│ │ ├── src/portfolio_manager/portfolio.py +│ │ ├── src/portfolio_manager/pnl.py +│ │ ├── src/portfolio_manager/config.py +│ │ ├── tests/test_portfolio.py +│ │ ├── tests/test_pnl.py +│ │ ├── Dockerfile +│ │ └── pyproject.toml +│ └── backtester/ +│ ├── src/backtester/__init__.py +│ ├── src/backtester/main.py +│ ├── src/backtester/engine.py +│ ├── src/backtester/simulator.py +│ ├── src/backtester/reporter.py +│ ├── src/backtester/config.py +│ ├── tests/test_engine.py +│ ├── tests/test_simulator.py +│ ├── tests/test_reporter.py +│ ├── Dockerfile +│ └── pyproject.toml +├── shared/ +│ ├── src/shared/__init__.py +│ ├── src/shared/models.py +│ ├── src/shared/events.py +│ ├── src/shared/broker.py +│ ├── src/shared/db.py +│ ├── src/shared/config.py +│ ├── tests/test_models.py +│ ├── tests/test_events.py +│ ├── tests/test_broker.py +│ ├── tests/test_db.py +│ └── pyproject.toml +├── cli/ +│ ├── src/trading_cli/__init__.py +│ ├── src/trading_cli/main.py +│ ├── src/trading_cli/commands/data.py +│ ├── src/trading_cli/commands/trade.py +│ ├── src/trading_cli/commands/backtest.py +│ ├── src/trading_cli/commands/portfolio.py +│ ├── src/trading_cli/commands/strategy.py +│ ├── src/trading_cli/commands/service.py +│ ├── tests/test_cli_data.py +│ ├── tests/test_cli_trade.py +│ └── pyproject.toml +├── docker-compose.yml +├── .env.example +├── Makefile +└── pyproject.toml (workspace root) +``` + +--- + +## Task 1: Project Scaffolding + +**Files:** +- Create: `pyproject.toml` (workspace root) +- Create: `.env.example` +- Create: `docker-compose.yml` +- Create: `Makefile` +- Create: `.gitignore` +- Create: `shared/pyproject.toml` + +- [ ] **Step 1: Initialize git repo** + +```bash +cd /home/si/Private/repos/trading +git init +``` + +- [ ] **Step 2: Create .gitignore** + +Create `.gitignore`: + +```gitignore +__pycache__/ +*.py[cod] +*$py.class +*.egg-info/ +dist/ +build/ +.eggs/ +*.egg +.venv/ +venv/ +env/ +.env +.mypy_cache/ +.pytest_cache/ +.ruff_cache/ +*.log +.DS_Store +``` + +- [ ] **Step 3: Create workspace root pyproject.toml** + +Create `pyproject.toml`: + +```toml +[project] +name = "trading-platform" +version = "0.1.0" +description = "Binance spot crypto trading platform" +requires-python = ">=3.12" + +[tool.pytest.ini_options] +asyncio_mode = "auto" +testpaths = ["shared/tests", "services/*/tests", "cli/tests"] + +[tool.ruff] +target-version = "py312" +line-length = 100 +``` + +- [ ] **Step 4: Create .env.example** + +Create `.env.example`: + +```env +BINANCE_API_KEY= +BINANCE_API_SECRET= +REDIS_URL=redis://localhost:6379 +DATABASE_URL=postgresql://trading:trading@localhost:5432/trading +LOG_LEVEL=INFO +RISK_MAX_POSITION_SIZE=0.1 +RISK_STOP_LOSS_PCT=5 +RISK_DAILY_LOSS_LIMIT_PCT=10 +DRY_RUN=true +``` + +- [ ] **Step 5: Create docker-compose.yml** + +Create `docker-compose.yml`: + +```yaml +services: + redis: + image: redis:7-alpine + ports: + - "6379:6379" + volumes: + - redis_data:/data + healthcheck: + test: ["CMD", "redis-cli", "ping"] + interval: 5s + timeout: 3s + retries: 5 + + postgres: + image: postgres:16-alpine + ports: + - "5432:5432" + environment: + POSTGRES_USER: trading + POSTGRES_PASSWORD: trading + POSTGRES_DB: trading + volumes: + - postgres_data:/var/lib/postgresql/data + healthcheck: + test: ["CMD-LINE", "pg_isready", "-U", "trading"] + interval: 5s + timeout: 3s + retries: 5 + + data-collector: + build: + context: . + dockerfile: services/data-collector/Dockerfile + env_file: .env + depends_on: + redis: + condition: service_healthy + postgres: + condition: service_healthy + restart: unless-stopped + + strategy-engine: + build: + context: . + dockerfile: services/strategy-engine/Dockerfile + env_file: .env + depends_on: + redis: + condition: service_healthy + postgres: + condition: service_healthy + restart: unless-stopped + + order-executor: + build: + context: . + dockerfile: services/order-executor/Dockerfile + env_file: .env + depends_on: + redis: + condition: service_healthy + postgres: + condition: service_healthy + restart: unless-stopped + + portfolio-manager: + build: + context: . + dockerfile: services/portfolio-manager/Dockerfile + env_file: .env + depends_on: + redis: + condition: service_healthy + postgres: + condition: service_healthy + restart: unless-stopped + +volumes: + redis_data: + postgres_data: +``` + +- [ ] **Step 6: Create Makefile** + +Create `Makefile`: + +```makefile +.PHONY: infra up down logs test lint + +infra: + docker compose up -d redis postgres + +up: + docker compose up -d + +down: + docker compose down + +logs: + docker compose logs -f $(service) + +test: + pytest -v + +lint: + ruff check . + ruff format --check . + +format: + ruff check --fix . + ruff format . +``` + +- [ ] **Step 7: Create shared/pyproject.toml** + +Create `shared/pyproject.toml`: + +```toml +[project] +name = "trading-shared" +version = "0.1.0" +description = "Shared models, events, and utilities for trading platform" +requires-python = ">=3.12" +dependencies = [ + "pydantic>=2.0", + "pydantic-settings>=2.0", + "redis>=5.0", + "asyncpg>=0.29", +] + +[project.optional-dependencies] +dev = [ + "pytest>=8.0", + "pytest-asyncio>=0.23", + "ruff>=0.4", +] + +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[tool.hatch.build.targets.wheel] +packages = ["src/shared"] +``` + +- [ ] **Step 8: Commit scaffolding** + +```bash +git add . +git commit -m "chore: project scaffolding with docker-compose, makefile, shared package" +``` + +--- + +## Task 2: Shared — Config & Models + +**Files:** +- Create: `shared/src/shared/__init__.py` +- Create: `shared/src/shared/config.py` +- Create: `shared/src/shared/models.py` +- Create: `shared/tests/test_models.py` + +- [ ] **Step 1: Write failing test for config** + +Create `shared/tests/test_models.py`: + +```python +from shared.config import Settings + + +def test_settings_defaults(): + settings = Settings( + binance_api_key="test_key", + binance_api_secret="test_secret", + ) + assert settings.redis_url == "redis://localhost:6379" + assert settings.database_url == "postgresql://trading:trading@localhost:5432/trading" + assert settings.log_level == "INFO" + assert settings.dry_run is True +``` + +- [ ] **Step 2: Run test to verify it fails** + +```bash +cd /home/si/Private/repos/trading +pip install -e shared[dev] +pytest shared/tests/test_models.py::test_settings_defaults -v +``` + +Expected: FAIL — `ModuleNotFoundError: No module named 'shared'` + +- [ ] **Step 3: Implement config** + +Create `shared/src/shared/__init__.py`: + +```python +``` + +Create `shared/src/shared/config.py`: + +```python +from pydantic_settings import BaseSettings + + +class Settings(BaseSettings): + binance_api_key: str + binance_api_secret: str + redis_url: str = "redis://localhost:6379" + database_url: str = "postgresql://trading:trading@localhost:5432/trading" + log_level: str = "INFO" + risk_max_position_size: float = 0.1 + risk_stop_loss_pct: float = 5.0 + risk_daily_loss_limit_pct: float = 10.0 + dry_run: bool = True + + model_config = {"env_file": ".env", "env_file_encoding": "utf-8"} +``` + +- [ ] **Step 4: Run test to verify it passes** + +```bash +pytest shared/tests/test_models.py::test_settings_defaults -v +``` + +Expected: PASS + +- [ ] **Step 5: Write failing tests for models** + +Append to `shared/tests/test_models.py`: + +```python +from datetime import datetime, timezone +from decimal import Decimal + +from shared.models import Candle, Signal, Order, Position, OrderSide, OrderType, OrderStatus + + +def test_candle_creation(): + candle = Candle( + symbol="BTCUSDT", + timeframe="1m", + open_time=datetime(2026, 1, 1, tzinfo=timezone.utc), + open=Decimal("50000"), + high=Decimal("50100"), + low=Decimal("49900"), + close=Decimal("50050"), + volume=Decimal("1.5"), + ) + assert candle.symbol == "BTCUSDT" + assert candle.close == Decimal("50050") + + +def test_signal_creation(): + signal = Signal( + strategy="rsi_strategy", + symbol="BTCUSDT", + side=OrderSide.BUY, + price=Decimal("50000"), + quantity=Decimal("0.01"), + reason="RSI below 30", + ) + assert signal.side == OrderSide.BUY + assert signal.reason == "RSI below 30" + + +def test_order_creation(): + order = Order( + symbol="BTCUSDT", + signal_id="sig_123", + side=OrderSide.BUY, + type=OrderType.MARKET, + price=Decimal("50000"), + quantity=Decimal("0.01"), + ) + assert order.status == OrderStatus.PENDING + assert order.filled_at is None + assert order.id is not None + + +def test_position_unrealized_pnl(): + pos = Position( + symbol="BTCUSDT", + quantity=Decimal("0.1"), + avg_entry_price=Decimal("50000"), + current_price=Decimal("51000"), + ) + assert pos.unrealized_pnl == Decimal("100") # 0.1 * (51000 - 50000) +``` + +- [ ] **Step 6: Run tests to verify they fail** + +```bash +pytest shared/tests/test_models.py -v +``` + +Expected: FAIL — `ModuleNotFoundError: No module named 'shared.models'` + +- [ ] **Step 7: Implement models** + +Create `shared/src/shared/models.py`: + +```python +from datetime import datetime, timezone +from decimal import Decimal +from enum import StrEnum +from uuid import uuid4 + +from pydantic import BaseModel, Field + + +class OrderSide(StrEnum): + BUY = "BUY" + SELL = "SELL" + + +class OrderType(StrEnum): + MARKET = "MARKET" + LIMIT = "LIMIT" + + +class OrderStatus(StrEnum): + PENDING = "PENDING" + FILLED = "FILLED" + CANCELLED = "CANCELLED" + FAILED = "FAILED" + + +class Candle(BaseModel): + symbol: str + timeframe: str + open_time: datetime + open: Decimal + high: Decimal + low: Decimal + close: Decimal + volume: Decimal + + +class Signal(BaseModel): + id: str = Field(default_factory=lambda: str(uuid4())) + strategy: str + symbol: str + side: OrderSide + price: Decimal + quantity: Decimal + reason: str + created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) + + +class Order(BaseModel): + id: str = Field(default_factory=lambda: str(uuid4())) + signal_id: str + symbol: str + side: OrderSide + type: OrderType + price: Decimal + quantity: Decimal + status: OrderStatus = OrderStatus.PENDING + created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) + filled_at: datetime | None = None + + +class Position(BaseModel): + symbol: str + quantity: Decimal + avg_entry_price: Decimal + current_price: Decimal + + @property + def unrealized_pnl(self) -> Decimal: + return self.quantity * (self.current_price - self.avg_entry_price) +``` + +- [ ] **Step 8: Run tests to verify they pass** + +```bash +pytest shared/tests/test_models.py -v +``` + +Expected: All PASS + +- [ ] **Step 9: Commit** + +```bash +git add shared/ +git commit -m "feat(shared): add config settings and core data models" +``` + +--- + +## Task 3: Shared — Events & Redis Broker + +**Files:** +- Create: `shared/src/shared/events.py` +- Create: `shared/src/shared/broker.py` +- Create: `shared/tests/test_events.py` +- Create: `shared/tests/test_broker.py` + +- [ ] **Step 1: Write failing tests for events** + +Create `shared/tests/test_events.py`: + +```python +import json +from decimal import Decimal +from datetime import datetime, timezone + +from shared.events import EventType, Event, CandleEvent, SignalEvent, OrderEvent +from shared.models import Candle, Signal, Order, OrderSide, OrderType + + +def test_candle_event_serialize(): + candle = Candle( + symbol="BTCUSDT", + timeframe="1m", + open_time=datetime(2026, 1, 1, tzinfo=timezone.utc), + open=Decimal("50000"), + high=Decimal("50100"), + low=Decimal("49900"), + close=Decimal("50050"), + volume=Decimal("1.5"), + ) + event = CandleEvent(data=candle) + payload = event.to_dict() + assert payload["type"] == EventType.CANDLE + assert payload["data"]["symbol"] == "BTCUSDT" + + +def test_candle_event_deserialize(): + candle = Candle( + symbol="BTCUSDT", + timeframe="1m", + open_time=datetime(2026, 1, 1, tzinfo=timezone.utc), + open=Decimal("50000"), + high=Decimal("50100"), + low=Decimal("49900"), + close=Decimal("50050"), + volume=Decimal("1.5"), + ) + event = CandleEvent(data=candle) + payload = event.to_dict() + restored = Event.from_dict(payload) + assert isinstance(restored, CandleEvent) + assert restored.data.symbol == "BTCUSDT" + + +def test_signal_event_serialize(): + signal = Signal( + strategy="rsi", + symbol="BTCUSDT", + side=OrderSide.BUY, + price=Decimal("50000"), + quantity=Decimal("0.01"), + reason="RSI < 30", + ) + event = SignalEvent(data=signal) + payload = event.to_dict() + assert payload["type"] == EventType.SIGNAL +``` + +- [ ] **Step 2: Run tests to verify they fail** + +```bash +pytest shared/tests/test_events.py -v +``` + +Expected: FAIL + +- [ ] **Step 3: Implement events** + +Create `shared/src/shared/events.py`: + +```python +from __future__ import annotations + +import json +from enum import StrEnum +from typing import Any + +from pydantic import BaseModel + +from shared.models import Candle, Signal, Order + + +class EventType(StrEnum): + CANDLE = "candle" + SIGNAL = "signal" + ORDER = "order" + + +class CandleEvent(BaseModel): + type: EventType = EventType.CANDLE + data: Candle + + def to_dict(self) -> dict[str, Any]: + return json.loads(self.model_dump_json()) + + @classmethod + def from_raw(cls, raw: dict[str, Any]) -> CandleEvent: + return cls.model_validate(raw) + + +class SignalEvent(BaseModel): + type: EventType = EventType.SIGNAL + data: Signal + + def to_dict(self) -> dict[str, Any]: + return json.loads(self.model_dump_json()) + + @classmethod + def from_raw(cls, raw: dict[str, Any]) -> SignalEvent: + return cls.model_validate(raw) + + +class OrderEvent(BaseModel): + type: EventType = EventType.ORDER + data: Order + + def to_dict(self) -> dict[str, Any]: + return json.loads(self.model_dump_json()) + + @classmethod + def from_raw(cls, raw: dict[str, Any]) -> OrderEvent: + return cls.model_validate(raw) + + +_EVENT_MAP = { + EventType.CANDLE: CandleEvent, + EventType.SIGNAL: SignalEvent, + EventType.ORDER: OrderEvent, +} + + +class Event: + @staticmethod + def from_dict(data: dict[str, Any]) -> CandleEvent | SignalEvent | OrderEvent: + event_type = EventType(data["type"]) + cls = _EVENT_MAP[event_type] + return cls.from_raw(data) +``` + +- [ ] **Step 4: Run tests to verify they pass** + +```bash +pytest shared/tests/test_events.py -v +``` + +Expected: All PASS + +- [ ] **Step 5: Write failing tests for broker** + +Create `shared/tests/test_broker.py`: + +```python +import asyncio +import pytest +from unittest.mock import AsyncMock, MagicMock, patch + +from shared.broker import RedisBroker + + +@pytest.fixture +def mock_redis(): + redis = AsyncMock() + redis.xadd = AsyncMock(return_value=b"1234-0") + redis.xread = AsyncMock(return_value=[]) + redis.close = AsyncMock() + return redis + + +@pytest.mark.asyncio +async def test_broker_publish(mock_redis): + broker = RedisBroker.__new__(RedisBroker) + broker._redis = mock_redis + + await broker.publish("candles.BTCUSDT", {"type": "candle", "data": "test"}) + + mock_redis.xadd.assert_called_once() + call_args = mock_redis.xadd.call_args + assert call_args[0][0] == "candles.BTCUSDT" + + +@pytest.mark.asyncio +async def test_broker_subscribe_returns_messages(mock_redis): + mock_redis.xread = AsyncMock(return_value=[ + ("candles.BTCUSDT", [ + (b"1234-0", {b"payload": b'{"type":"candle","data":"test"}'}), + ]) + ]) + broker = RedisBroker.__new__(RedisBroker) + broker._redis = mock_redis + + messages = await broker.read("candles.BTCUSDT", last_id="0-0", count=1) + assert len(messages) == 1 + assert messages[0]["type"] == "candle" +``` + +- [ ] **Step 6: Run tests to verify they fail** + +```bash +pytest shared/tests/test_broker.py -v +``` + +Expected: FAIL + +- [ ] **Step 7: Implement broker** + +Create `shared/src/shared/broker.py`: + +```python +from __future__ import annotations + +import json + +import redis.asyncio as redis + + +class RedisBroker: + def __init__(self, redis_url: str): + self._redis = redis.from_url(redis_url, decode_responses=False) + + async def publish(self, stream: str, data: dict) -> str: + payload = json.dumps(data) + msg_id = await self._redis.xadd(stream, {"payload": payload.encode()}) + return msg_id + + async def read( + self, stream: str, last_id: str = "$", count: int = 10, block: int = 0 + ) -> list[dict]: + results = await self._redis.xread({stream: last_id}, count=count, block=block) + messages = [] + for _stream_name, entries in results: + for _msg_id, fields in entries: + payload = fields[b"payload"] + messages.append(json.loads(payload)) + return messages + + async def close(self): + await self._redis.close() +``` + +- [ ] **Step 8: Run tests to verify they pass** + +```bash +pytest shared/tests/test_broker.py -v +``` + +Expected: All PASS + +- [ ] **Step 9: Commit** + +```bash +git add shared/ +git commit -m "feat(shared): add event system and Redis Streams broker" +``` + +--- + +## Task 4: Shared — Database Layer + +**Files:** +- Create: `shared/src/shared/db.py` +- Create: `shared/tests/test_db.py` + +- [ ] **Step 1: Write failing tests for DB** + +Create `shared/tests/test_db.py`: + +```python +import pytest +from unittest.mock import AsyncMock, patch, MagicMock + +from shared.db import Database + + +@pytest.mark.asyncio +async def test_db_init_sql_creates_tables(): + db = Database.__new__(Database) + db._pool = AsyncMock() + mock_conn = AsyncMock() + db._pool.acquire.return_value.__aenter__ = AsyncMock(return_value=mock_conn) + db._pool.acquire.return_value.__aexit__ = AsyncMock(return_value=False) + + await db.init_tables() + + mock_conn.execute.assert_called() + sql = mock_conn.execute.call_args[0][0] + assert "candles" in sql + assert "signals" in sql + assert "orders" in sql + assert "trades" in sql + assert "positions" in sql + assert "portfolio_snapshots" in sql + + +@pytest.mark.asyncio +async def test_db_insert_candle(): + db = Database.__new__(Database) + db._pool = AsyncMock() + mock_conn = AsyncMock() + db._pool.acquire.return_value.__aenter__ = AsyncMock(return_value=mock_conn) + db._pool.acquire.return_value.__aexit__ = AsyncMock(return_value=False) + + from datetime import datetime, timezone + from decimal import Decimal + from shared.models import Candle + + candle = Candle( + symbol="BTCUSDT", + timeframe="1m", + open_time=datetime(2026, 1, 1, tzinfo=timezone.utc), + open=Decimal("50000"), + high=Decimal("50100"), + low=Decimal("49900"), + close=Decimal("50050"), + volume=Decimal("1.5"), + ) + + await db.insert_candle(candle) + mock_conn.execute.assert_called_once() + sql = mock_conn.execute.call_args[0][0] + assert "INSERT INTO candles" in sql +``` + +- [ ] **Step 2: Run tests to verify they fail** + +```bash +pytest shared/tests/test_db.py -v +``` + +Expected: FAIL + +- [ ] **Step 3: Implement database layer** + +Create `shared/src/shared/db.py`: + +```python +from __future__ import annotations + +import asyncpg + +from shared.models import Candle, Order, Signal + +_INIT_SQL = """ +CREATE TABLE IF NOT EXISTS candles ( + symbol TEXT NOT NULL, + timeframe TEXT NOT NULL, + open_time TIMESTAMPTZ NOT NULL, + open NUMERIC NOT NULL, + high NUMERIC NOT NULL, + low NUMERIC NOT NULL, + close NUMERIC NOT NULL, + volume NUMERIC NOT NULL, + PRIMARY KEY (symbol, timeframe, open_time) +); + +CREATE TABLE IF NOT EXISTS signals ( + id TEXT PRIMARY KEY, + strategy TEXT NOT NULL, + symbol TEXT NOT NULL, + side TEXT NOT NULL, + price NUMERIC NOT NULL, + quantity NUMERIC NOT NULL, + reason TEXT NOT NULL, + created_at TIMESTAMPTZ NOT NULL +); + +CREATE TABLE IF NOT EXISTS orders ( + id TEXT PRIMARY KEY, + signal_id TEXT REFERENCES signals(id), + symbol TEXT NOT NULL, + side TEXT NOT NULL, + type TEXT NOT NULL, + price NUMERIC NOT NULL, + quantity NUMERIC NOT NULL, + status TEXT NOT NULL DEFAULT 'PENDING', + created_at TIMESTAMPTZ NOT NULL, + filled_at TIMESTAMPTZ +); + +CREATE TABLE IF NOT EXISTS trades ( + id TEXT PRIMARY KEY, + order_id TEXT REFERENCES orders(id), + symbol TEXT NOT NULL, + side TEXT NOT NULL, + price NUMERIC NOT NULL, + quantity NUMERIC NOT NULL, + fee NUMERIC NOT NULL DEFAULT 0, + traded_at TIMESTAMPTZ NOT NULL +); + +CREATE TABLE IF NOT EXISTS positions ( + symbol TEXT PRIMARY KEY, + quantity NUMERIC NOT NULL, + avg_entry_price NUMERIC NOT NULL, + current_price NUMERIC NOT NULL, + updated_at TIMESTAMPTZ NOT NULL +); + +CREATE TABLE IF NOT EXISTS portfolio_snapshots ( + id SERIAL PRIMARY KEY, + total_value NUMERIC NOT NULL, + realized_pnl NUMERIC NOT NULL, + unrealized_pnl NUMERIC NOT NULL, + snapshot_at TIMESTAMPTZ NOT NULL +); +""" + + +class Database: + def __init__(self, dsn: str): + self._dsn = dsn + self._pool: asyncpg.Pool | None = None + + async def connect(self): + self._pool = await asyncpg.create_pool(self._dsn) + + async def close(self): + if self._pool: + await self._pool.close() + + async def init_tables(self): + async with self._pool.acquire() as conn: + await conn.execute(_INIT_SQL) + + async def insert_candle(self, candle: Candle): + sql = """ + INSERT INTO candles (symbol, timeframe, open_time, open, high, low, close, volume) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8) + ON CONFLICT (symbol, timeframe, open_time) DO NOTHING + """ + async with self._pool.acquire() as conn: + await conn.execute( + sql, + candle.symbol, + candle.timeframe, + candle.open_time, + candle.open, + candle.high, + candle.low, + candle.close, + candle.volume, + ) + + async def insert_signal(self, signal: Signal): + sql = """ + INSERT INTO signals (id, strategy, symbol, side, price, quantity, reason, created_at) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8) + """ + async with self._pool.acquire() as conn: + await conn.execute( + sql, + signal.id, + signal.strategy, + signal.symbol, + signal.side.value, + signal.price, + signal.quantity, + signal.reason, + signal.created_at, + ) + + async def insert_order(self, order: Order): + sql = """ + INSERT INTO orders (id, signal_id, symbol, side, type, price, quantity, status, created_at) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) + """ + async with self._pool.acquire() as conn: + await conn.execute( + sql, + order.id, + order.signal_id, + order.symbol, + order.side.value, + order.type.value, + order.price, + order.quantity, + order.status.value, + order.created_at, + ) + + async def update_order_status(self, order_id: str, status: str, filled_at=None): + sql = "UPDATE orders SET status = $1, filled_at = $2 WHERE id = $3" + async with self._pool.acquire() as conn: + await conn.execute(sql, status, filled_at, order_id) + + async def get_candles(self, symbol: str, timeframe: str, limit: int = 500) -> list[dict]: + sql = """ + SELECT * FROM candles + WHERE symbol = $1 AND timeframe = $2 + ORDER BY open_time DESC + LIMIT $3 + """ + async with self._pool.acquire() as conn: + rows = await conn.fetch(sql, symbol, timeframe, limit) + return [dict(r) for r in rows] +``` + +- [ ] **Step 4: Run tests to verify they pass** + +```bash +pytest shared/tests/test_db.py -v +``` + +Expected: All PASS + +- [ ] **Step 5: Commit** + +```bash +git add shared/ +git commit -m "feat(shared): add database layer with table init and CRUD operations" +``` + +--- + +## Task 5: Data Collector Service + +**Files:** +- Create: `services/data-collector/pyproject.toml` +- Create: `services/data-collector/Dockerfile` +- Create: `services/data-collector/src/data_collector/__init__.py` +- Create: `services/data-collector/src/data_collector/config.py` +- Create: `services/data-collector/src/data_collector/binance_rest.py` +- Create: `services/data-collector/src/data_collector/binance_ws.py` +- Create: `services/data-collector/src/data_collector/storage.py` +- Create: `services/data-collector/src/data_collector/main.py` +- Create: `services/data-collector/tests/test_binance_rest.py` +- Create: `services/data-collector/tests/test_storage.py` + +- [ ] **Step 1: Create pyproject.toml** + +Create `services/data-collector/pyproject.toml`: + +```toml +[project] +name = "data-collector" +version = "0.1.0" +description = "Binance market data collector service" +requires-python = ">=3.12" +dependencies = [ + "ccxt>=4.0", + "websockets>=12.0", + "trading-shared", +] + +[project.optional-dependencies] +dev = [ + "pytest>=8.0", + "pytest-asyncio>=0.23", +] + +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[tool.hatch.build.targets.wheel] +packages = ["src/data_collector"] +``` + +- [ ] **Step 2: Write failing tests for binance_rest** + +Create `services/data-collector/tests/test_binance_rest.py`: + +```python +import pytest +from unittest.mock import AsyncMock, patch, MagicMock +from datetime import datetime, timezone +from decimal import Decimal + +from data_collector.binance_rest import fetch_historical_candles + + +@pytest.mark.asyncio +async def test_fetch_historical_candles_parses_response(): + mock_exchange = MagicMock() + mock_exchange.fetch_ohlcv = AsyncMock(return_value=[ + [1704067200000, 50000.0, 50100.0, 49900.0, 50050.0, 1.5], + [1704067260000, 50050.0, 50200.0, 50000.0, 50150.0, 2.0], + ]) + + candles = await fetch_historical_candles( + exchange=mock_exchange, + symbol="BTC/USDT", + timeframe="1m", + since=datetime(2026, 1, 1, tzinfo=timezone.utc), + limit=2, + ) + + assert len(candles) == 2 + assert candles[0].symbol == "BTCUSDT" + assert candles[0].close == Decimal("50050.0") + assert candles[1].volume == Decimal("2.0") + + +@pytest.mark.asyncio +async def test_fetch_historical_candles_empty_response(): + mock_exchange = MagicMock() + mock_exchange.fetch_ohlcv = AsyncMock(return_value=[]) + + candles = await fetch_historical_candles( + exchange=mock_exchange, + symbol="BTC/USDT", + timeframe="1m", + since=datetime(2026, 1, 1, tzinfo=timezone.utc), + limit=100, + ) + + assert candles == [] +``` + +- [ ] **Step 3: Run tests to verify they fail** + +```bash +cd /home/si/Private/repos/trading +pip install -e services/data-collector[dev] +pytest services/data-collector/tests/test_binance_rest.py -v +``` + +Expected: FAIL + +- [ ] **Step 4: Implement binance_rest** + +Create `services/data-collector/src/data_collector/__init__.py`: + +```python +``` + +Create `services/data-collector/src/data_collector/binance_rest.py`: + +```python +from __future__ import annotations + +from datetime import datetime, timezone +from decimal import Decimal + +from shared.models import Candle + + +async def fetch_historical_candles( + exchange, + symbol: str, + timeframe: str, + since: datetime, + limit: int = 500, +) -> list[Candle]: + since_ms = int(since.timestamp() * 1000) + ohlcv = await exchange.fetch_ohlcv(symbol, timeframe, since=since_ms, limit=limit) + + normalized_symbol = symbol.replace("/", "") + candles = [] + for row in ohlcv: + ts, o, h, l, c, v = row + candles.append( + Candle( + symbol=normalized_symbol, + timeframe=timeframe, + open_time=datetime.fromtimestamp(ts / 1000, tz=timezone.utc), + open=Decimal(str(o)), + high=Decimal(str(h)), + low=Decimal(str(l)), + close=Decimal(str(c)), + volume=Decimal(str(v)), + ) + ) + return candles +``` + +- [ ] **Step 5: Run tests to verify they pass** + +```bash +pytest services/data-collector/tests/test_binance_rest.py -v +``` + +Expected: All PASS + +- [ ] **Step 6: Write failing tests for storage** + +Create `services/data-collector/tests/test_storage.py`: + +```python +import pytest +from unittest.mock import AsyncMock +from datetime import datetime, timezone +from decimal import Decimal + +from shared.models import Candle +from data_collector.storage import CandleStorage + + +@pytest.fixture +def mock_db(): + db = AsyncMock() + db.insert_candle = AsyncMock() + return db + + +@pytest.fixture +def mock_broker(): + broker = AsyncMock() + broker.publish = AsyncMock() + return broker + + +@pytest.fixture +def sample_candle(): + return Candle( + symbol="BTCUSDT", + timeframe="1m", + open_time=datetime(2026, 1, 1, tzinfo=timezone.utc), + open=Decimal("50000"), + high=Decimal("50100"), + low=Decimal("49900"), + close=Decimal("50050"), + volume=Decimal("1.5"), + ) + + +@pytest.mark.asyncio +async def test_storage_saves_to_db_and_publishes(mock_db, mock_broker, sample_candle): + storage = CandleStorage(db=mock_db, broker=mock_broker) + await storage.store(sample_candle) + + mock_db.insert_candle.assert_called_once_with(sample_candle) + mock_broker.publish.assert_called_once() + call_args = mock_broker.publish.call_args + assert call_args[0][0] == "candles.BTCUSDT" + + +@pytest.mark.asyncio +async def test_storage_batch_store(mock_db, mock_broker, sample_candle): + storage = CandleStorage(db=mock_db, broker=mock_broker) + candles = [sample_candle, sample_candle] + await storage.store_batch(candles) + + assert mock_db.insert_candle.call_count == 2 + assert mock_broker.publish.call_count == 2 +``` + +- [ ] **Step 7: Run tests to verify they fail** + +```bash +pytest services/data-collector/tests/test_storage.py -v +``` + +Expected: FAIL + +- [ ] **Step 8: Implement storage** + +Create `services/data-collector/src/data_collector/storage.py`: + +```python +from __future__ import annotations + +from shared.broker import RedisBroker +from shared.db import Database +from shared.events import CandleEvent +from shared.models import Candle + + +class CandleStorage: + def __init__(self, db: Database, broker: RedisBroker): + self._db = db + self._broker = broker + + async def store(self, candle: Candle): + await self._db.insert_candle(candle) + event = CandleEvent(data=candle) + await self._broker.publish(f"candles.{candle.symbol}", event.to_dict()) + + async def store_batch(self, candles: list[Candle]): + for candle in candles: + await self.store(candle) +``` + +- [ ] **Step 9: Run tests to verify they pass** + +```bash +pytest services/data-collector/tests/test_storage.py -v +``` + +Expected: All PASS + +- [ ] **Step 10: Implement config, binance_ws, and main** + +Create `services/data-collector/src/data_collector/config.py`: + +```python +from shared.config import Settings + + +class CollectorConfig(Settings): + symbols: list[str] = ["BTC/USDT"] + timeframes: list[str] = ["1m"] +``` + +Create `services/data-collector/src/data_collector/binance_ws.py`: + +```python +from __future__ import annotations + +import asyncio +import json +import logging +from datetime import datetime, timezone +from decimal import Decimal + +import websockets + +from shared.models import Candle + +logger = logging.getLogger(__name__) + +BINANCE_WS_URL = "wss://stream.binance.com:9443/ws" + + +class BinanceWebSocket: + def __init__(self, symbols: list[str], timeframe: str, on_candle): + self._symbols = symbols + self._timeframe = timeframe + self._on_candle = on_candle + self._running = False + + async def start(self): + streams = [ + f"{s.lower().replace('/', '')}@kline_{self._timeframe}" + for s in self._symbols + ] + url = f"{BINANCE_WS_URL}/{'/'.join(streams)}" + self._running = True + logger.info(f"Connecting to Binance WS: {streams}") + + while self._running: + try: + async with websockets.connect(url) as ws: + async for raw in ws: + if not self._running: + break + msg = json.loads(raw) + if "k" in msg: + candle = self._parse_kline(msg["k"]) + if candle: + await self._on_candle(candle) + except websockets.ConnectionClosed: + logger.warning("WebSocket disconnected, reconnecting in 5s...") + await asyncio.sleep(5) + except Exception as e: + logger.error(f"WebSocket error: {e}, reconnecting in 5s...") + await asyncio.sleep(5) + + def stop(self): + self._running = False + + def _parse_kline(self, k: dict) -> Candle | None: + if not k.get("x"): # only closed candles + return None + return Candle( + symbol=k["s"], + timeframe=k["i"], + open_time=datetime.fromtimestamp(k["t"] / 1000, tz=timezone.utc), + open=Decimal(k["o"]), + high=Decimal(k["h"]), + low=Decimal(k["l"]), + close=Decimal(k["c"]), + volume=Decimal(k["v"]), + ) +``` + +Create `services/data-collector/src/data_collector/main.py`: + +```python +from __future__ import annotations + +import asyncio +import logging + +import ccxt.async_support as ccxt + +from shared.broker import RedisBroker +from shared.db import Database +from data_collector.binance_ws import BinanceWebSocket +from data_collector.config import CollectorConfig +from data_collector.storage import CandleStorage + +logger = logging.getLogger(__name__) + + +async def run(): + config = CollectorConfig() + logging.basicConfig(level=config.log_level) + + db = Database(config.database_url) + await db.connect() + await db.init_tables() + + broker = RedisBroker(config.redis_url) + storage = CandleStorage(db=db, broker=broker) + + ws = BinanceWebSocket( + symbols=config.symbols, + timeframe=config.timeframes[0], + on_candle=storage.store, + ) + + logger.info(f"Starting data collector: symbols={config.symbols}") + try: + await ws.start() + finally: + ws.stop() + await broker.close() + await db.close() + + +def main(): + asyncio.run(run()) + + +if __name__ == "__main__": + main() +``` + +- [ ] **Step 11: Create Dockerfile** + +Create `services/data-collector/Dockerfile`: + +```dockerfile +FROM python:3.12-slim + +WORKDIR /app + +COPY shared/ shared/ +RUN pip install --no-cache-dir ./shared + +COPY services/data-collector/ services/data-collector/ +RUN pip install --no-cache-dir ./services/data-collector + +CMD ["python", "-m", "data_collector.main"] +``` + +- [ ] **Step 12: Commit** + +```bash +git add services/data-collector/ +git commit -m "feat(data-collector): add Binance REST/WS data collection with storage pipeline" +``` + +--- + +## Task 6: Strategy Engine Service + +**Files:** +- Create: `services/strategy-engine/pyproject.toml` +- Create: `services/strategy-engine/Dockerfile` +- Create: `services/strategy-engine/src/strategy_engine/__init__.py` +- Create: `services/strategy-engine/src/strategy_engine/config.py` +- Create: `services/strategy-engine/strategies/base.py` +- Create: `services/strategy-engine/strategies/rsi_strategy.py` +- Create: `services/strategy-engine/strategies/grid_strategy.py` +- Create: `services/strategy-engine/src/strategy_engine/plugin_loader.py` +- Create: `services/strategy-engine/src/strategy_engine/engine.py` +- Create: `services/strategy-engine/src/strategy_engine/main.py` +- Create: `services/strategy-engine/tests/test_rsi_strategy.py` +- Create: `services/strategy-engine/tests/test_grid_strategy.py` +- Create: `services/strategy-engine/tests/test_plugin_loader.py` +- Create: `services/strategy-engine/tests/test_engine.py` + +- [ ] **Step 1: Create pyproject.toml** + +Create `services/strategy-engine/pyproject.toml`: + +```toml +[project] +name = "strategy-engine" +version = "0.1.0" +description = "Plugin-based strategy execution engine" +requires-python = ">=3.12" +dependencies = [ + "pandas>=2.0", + "pandas-ta>=0.3", + "trading-shared", +] + +[project.optional-dependencies] +dev = [ + "pytest>=8.0", + "pytest-asyncio>=0.23", +] + +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[tool.hatch.build.targets.wheel] +packages = ["src/strategy_engine"] +``` + +- [ ] **Step 2: Implement base strategy** + +Create `services/strategy-engine/src/strategy_engine/__init__.py`: + +```python +``` + +Create `services/strategy-engine/strategies/base.py`: + +```python +from __future__ import annotations + +from abc import ABC, abstractmethod + +from shared.models import Candle, Signal + + +class BaseStrategy(ABC): + name: str = "base" + + @abstractmethod + def on_candle(self, candle: Candle) -> Signal | None: + pass + + @abstractmethod + def configure(self, params: dict) -> None: + pass + + def reset(self) -> None: + pass +``` + +- [ ] **Step 3: Write failing tests for RSI strategy** + +Create `services/strategy-engine/tests/test_rsi_strategy.py`: + +```python +from datetime import datetime, timezone +from decimal import Decimal + +from shared.models import Candle, OrderSide + + +def make_candle(close: float, idx: int = 0) -> Candle: + return Candle( + symbol="BTCUSDT", + timeframe="1m", + open_time=datetime(2026, 1, 1, minute=idx, tzinfo=timezone.utc), + open=Decimal(str(close)), + high=Decimal(str(close + 10)), + low=Decimal(str(close - 10)), + close=Decimal(str(close)), + volume=Decimal("1.0"), + ) + + +def test_rsi_strategy_no_signal_insufficient_data(): + from strategy_engine.strategies.rsi_strategy import RsiStrategy + + strategy = RsiStrategy() + strategy.configure({"period": 14, "oversold": 30, "overbought": 70, "quantity": 0.01}) + + signal = strategy.on_candle(make_candle(50000)) + assert signal is None + + +def test_rsi_strategy_buy_signal_on_oversold(): + from strategy_engine.strategies.rsi_strategy import RsiStrategy + + strategy = RsiStrategy() + strategy.configure({"period": 14, "oversold": 30, "overbought": 70, "quantity": 0.01}) + + # Feed declining prices to push RSI below 30 + prices = [50000 - i * 100 for i in range(20)] + signal = None + for i, p in enumerate(prices): + signal = strategy.on_candle(make_candle(p, idx=i)) + + # After sustained drop, RSI should be oversold → BUY signal + if signal is not None: + assert signal.side == OrderSide.BUY + assert signal.strategy == "rsi" +``` + +- [ ] **Step 4: Run tests to verify they fail** + +```bash +pip install -e services/strategy-engine[dev] +pytest services/strategy-engine/tests/test_rsi_strategy.py -v +``` + +Expected: FAIL + +- [ ] **Step 5: Implement RSI strategy** + +Create `services/strategy-engine/strategies/rsi_strategy.py`: + +```python +from __future__ import annotations + +from collections import deque +from decimal import Decimal + +import pandas as pd +import pandas_ta as ta + +from shared.models import Candle, Signal, OrderSide +from strategies.base import BaseStrategy + + +class RsiStrategy(BaseStrategy): + name = "rsi" + + def __init__(self): + self._closes: deque[float] = deque(maxlen=200) + self._period: int = 14 + self._oversold: float = 30 + self._overbought: float = 70 + self._quantity: Decimal = Decimal("0.01") + + def configure(self, params: dict) -> None: + self._period = params.get("period", 14) + self._oversold = params.get("oversold", 30) + self._overbought = params.get("overbought", 70) + self._quantity = Decimal(str(params.get("quantity", 0.01))) + + def on_candle(self, candle: Candle) -> Signal | None: + self._closes.append(float(candle.close)) + + if len(self._closes) < self._period + 1: + return None + + series = pd.Series(list(self._closes)) + rsi = ta.rsi(series, length=self._period) + current_rsi = rsi.iloc[-1] + + if current_rsi < self._oversold: + return Signal( + strategy=self.name, + symbol=candle.symbol, + side=OrderSide.BUY, + price=candle.close, + quantity=self._quantity, + reason=f"RSI={current_rsi:.1f} < {self._oversold}", + ) + elif current_rsi > self._overbought: + return Signal( + strategy=self.name, + symbol=candle.symbol, + side=OrderSide.SELL, + price=candle.close, + quantity=self._quantity, + reason=f"RSI={current_rsi:.1f} > {self._overbought}", + ) + return None + + def reset(self) -> None: + self._closes.clear() +``` + +- [ ] **Step 6: Run tests to verify they pass** + +```bash +pytest services/strategy-engine/tests/test_rsi_strategy.py -v +``` + +Expected: All PASS + +- [ ] **Step 7: Write failing tests for grid strategy** + +Create `services/strategy-engine/tests/test_grid_strategy.py`: + +```python +from datetime import datetime, timezone +from decimal import Decimal + +from shared.models import Candle, OrderSide + + +def make_candle(close: float, idx: int = 0) -> Candle: + return Candle( + symbol="BTCUSDT", + timeframe="1m", + open_time=datetime(2026, 1, 1, minute=idx, tzinfo=timezone.utc), + open=Decimal(str(close)), + high=Decimal(str(close + 10)), + low=Decimal(str(close - 10)), + close=Decimal(str(close)), + volume=Decimal("1.0"), + ) + + +def test_grid_strategy_buy_at_lower_grid(): + from strategy_engine.strategies.grid_strategy import GridStrategy + + strategy = GridStrategy() + strategy.configure({ + "lower_price": 48000, + "upper_price": 52000, + "grid_count": 5, + "quantity": 0.01, + }) + + # Price at grid level should trigger BUY + signal = strategy.on_candle(make_candle(48000)) + # First candle sets reference, no signal + signal = strategy.on_candle(make_candle(49000, idx=1)) + # Moving down through a grid level + signal = strategy.on_candle(make_candle(48000, idx=2)) + if signal is not None: + assert signal.side == OrderSide.BUY + + +def test_grid_strategy_sell_at_upper_grid(): + from strategy_engine.strategies.grid_strategy import GridStrategy + + strategy = GridStrategy() + strategy.configure({ + "lower_price": 48000, + "upper_price": 52000, + "grid_count": 5, + "quantity": 0.01, + }) + + signal = strategy.on_candle(make_candle(50000)) + signal = strategy.on_candle(make_candle(51000, idx=1)) + signal = strategy.on_candle(make_candle(52000, idx=2)) + if signal is not None: + assert signal.side == OrderSide.SELL + + +def test_grid_strategy_no_signal_in_same_zone(): + from strategy_engine.strategies.grid_strategy import GridStrategy + + strategy = GridStrategy() + strategy.configure({ + "lower_price": 48000, + "upper_price": 52000, + "grid_count": 5, + "quantity": 0.01, + }) + + strategy.on_candle(make_candle(50000)) + signal = strategy.on_candle(make_candle(50050, idx=1)) + assert signal is None # same grid zone, no signal +``` + +- [ ] **Step 8: Run tests to verify they fail** + +```bash +pytest services/strategy-engine/tests/test_grid_strategy.py -v +``` + +Expected: FAIL + +- [ ] **Step 9: Implement grid strategy** + +Create `services/strategy-engine/strategies/grid_strategy.py`: + +```python +from __future__ import annotations + +from decimal import Decimal + +from shared.models import Candle, Signal, OrderSide +from strategies.base import BaseStrategy + + +class GridStrategy(BaseStrategy): + name = "grid" + + def __init__(self): + self._lower: float = 0 + self._upper: float = 0 + self._grid_count: int = 5 + self._quantity: Decimal = Decimal("0.01") + self._grid_levels: list[float] = [] + self._last_zone: int | None = None + + def configure(self, params: dict) -> None: + self._lower = float(params["lower_price"]) + self._upper = float(params["upper_price"]) + self._grid_count = params.get("grid_count", 5) + self._quantity = Decimal(str(params.get("quantity", 0.01))) + step = (self._upper - self._lower) / self._grid_count + self._grid_levels = [self._lower + step * i for i in range(self._grid_count + 1)] + + def on_candle(self, candle: Candle) -> Signal | None: + price = float(candle.close) + current_zone = self._get_zone(price) + + if self._last_zone is None: + self._last_zone = current_zone + return None + + signal = None + if current_zone < self._last_zone: + signal = Signal( + strategy=self.name, + symbol=candle.symbol, + side=OrderSide.BUY, + price=candle.close, + quantity=self._quantity, + reason=f"Price crossed grid down: zone {self._last_zone}->{current_zone}", + ) + elif current_zone > self._last_zone: + signal = Signal( + strategy=self.name, + symbol=candle.symbol, + side=OrderSide.SELL, + price=candle.close, + quantity=self._quantity, + reason=f"Price crossed grid up: zone {self._last_zone}->{current_zone}", + ) + + self._last_zone = current_zone + return signal + + def _get_zone(self, price: float) -> int: + for i, level in enumerate(self._grid_levels): + if price < level: + return i + return len(self._grid_levels) + + def reset(self) -> None: + self._last_zone = None +``` + +- [ ] **Step 10: Run tests to verify they pass** + +```bash +pytest services/strategy-engine/tests/test_grid_strategy.py -v +``` + +Expected: All PASS + +- [ ] **Step 11: Write failing tests for plugin_loader** + +Create `services/strategy-engine/tests/test_plugin_loader.py`: + +```python +import pytest +from pathlib import Path + +from strategy_engine.plugin_loader import load_strategies + + +def test_load_strategies_finds_rsi_and_grid(): + strategies_dir = Path(__file__).parent.parent / "strategies" + loaded = load_strategies(strategies_dir) + + names = {s.name for s in loaded} + assert "rsi" in names + assert "grid" in names + + +def test_load_strategies_skips_base(): + strategies_dir = Path(__file__).parent.parent / "strategies" + loaded = load_strategies(strategies_dir) + + names = {s.name for s in loaded} + assert "base" not in names +``` + +- [ ] **Step 12: Run tests to verify they fail** + +```bash +pytest services/strategy-engine/tests/test_plugin_loader.py -v +``` + +Expected: FAIL + +- [ ] **Step 13: Implement plugin_loader** + +Create `services/strategy-engine/src/strategy_engine/plugin_loader.py`: + +```python +from __future__ import annotations + +import importlib.util +import logging +from pathlib import Path + +from strategies.base import BaseStrategy + +logger = logging.getLogger(__name__) + + +def load_strategies(strategies_dir: Path) -> list[BaseStrategy]: + loaded = [] + for path in strategies_dir.glob("*.py"): + if path.stem.startswith("_") or path.stem == "base": + continue + + spec = importlib.util.spec_from_file_location(path.stem, path) + module = importlib.util.module_from_spec(spec) + spec.loader.exec_module(module) + + for attr_name in dir(module): + attr = getattr(module, attr_name) + if ( + isinstance(attr, type) + and issubclass(attr, BaseStrategy) + and attr is not BaseStrategy + ): + instance = attr() + loaded.append(instance) + logger.info(f"Loaded strategy: {instance.name}") + + return loaded +``` + +- [ ] **Step 14: Run tests to verify they pass** + +```bash +pytest services/strategy-engine/tests/test_plugin_loader.py -v +``` + +Expected: All PASS + +- [ ] **Step 15: Write failing tests for engine** + +Create `services/strategy-engine/tests/test_engine.py`: + +```python +import pytest +from unittest.mock import AsyncMock, MagicMock +from datetime import datetime, timezone +from decimal import Decimal + +from shared.models import Candle, OrderSide +from shared.events import CandleEvent +from strategy_engine.engine import StrategyEngine + + +def make_candle_event() -> dict: + candle = Candle( + symbol="BTCUSDT", + timeframe="1m", + open_time=datetime(2026, 1, 1, tzinfo=timezone.utc), + open=Decimal("50000"), + high=Decimal("50100"), + low=Decimal("49900"), + close=Decimal("50050"), + volume=Decimal("1.0"), + ) + return CandleEvent(data=candle).to_dict() + + +@pytest.mark.asyncio +async def test_engine_dispatches_candle_to_strategies(): + mock_strategy = MagicMock() + mock_strategy.name = "test" + mock_strategy.on_candle.return_value = None + + mock_broker = AsyncMock() + mock_broker.read = AsyncMock(return_value=[make_candle_event()]) + + engine = StrategyEngine(broker=mock_broker, strategies=[mock_strategy]) + await engine.process_once(stream="candles.BTCUSDT", last_id="0-0") + + mock_strategy.on_candle.assert_called_once() + + +@pytest.mark.asyncio +async def test_engine_publishes_signal_when_strategy_returns_one(): + from shared.models import Signal + + mock_signal = Signal( + strategy="test", + symbol="BTCUSDT", + side=OrderSide.BUY, + price=Decimal("50000"), + quantity=Decimal("0.01"), + reason="test reason", + ) + mock_strategy = MagicMock() + mock_strategy.name = "test" + mock_strategy.on_candle.return_value = mock_signal + + mock_broker = AsyncMock() + mock_broker.read = AsyncMock(return_value=[make_candle_event()]) + mock_broker.publish = AsyncMock() + + engine = StrategyEngine(broker=mock_broker, strategies=[mock_strategy]) + await engine.process_once(stream="candles.BTCUSDT", last_id="0-0") + + mock_broker.publish.assert_called_once() + call_args = mock_broker.publish.call_args + assert call_args[0][0] == "signals" +``` + +- [ ] **Step 16: Run tests to verify they fail** + +```bash +pytest services/strategy-engine/tests/test_engine.py -v +``` + +Expected: FAIL + +- [ ] **Step 17: Implement engine** + +Create `services/strategy-engine/src/strategy_engine/engine.py`: + +```python +from __future__ import annotations + +import logging + +from shared.broker import RedisBroker +from shared.events import Event, SignalEvent +from shared.models import Signal +from strategies.base import BaseStrategy + +logger = logging.getLogger(__name__) + + +class StrategyEngine: + def __init__(self, broker: RedisBroker, strategies: list[BaseStrategy]): + self._broker = broker + self._strategies = strategies + + async def process_once(self, stream: str, last_id: str) -> str: + messages = await self._broker.read(stream, last_id=last_id, count=10, block=1000) + + for msg in messages: + event = Event.from_dict(msg) + candle = event.data + + for strategy in self._strategies: + signal = strategy.on_candle(candle) + if signal is not None: + logger.info(f"Signal from {strategy.name}: {signal.side} {signal.symbol}") + await self._publish_signal(signal) + + return last_id + + async def _publish_signal(self, signal: Signal): + event = SignalEvent(data=signal) + await self._broker.publish("signals", event.to_dict()) +``` + +- [ ] **Step 18: Run tests to verify they pass** + +```bash +pytest services/strategy-engine/tests/test_engine.py -v +``` + +Expected: All PASS + +- [ ] **Step 19: Implement config and main** + +Create `services/strategy-engine/src/strategy_engine/config.py`: + +```python +from shared.config import Settings + + +class StrategyConfig(Settings): + symbols: list[str] = ["BTC/USDT"] + timeframes: list[str] = ["1m"] + strategy_params: dict = {} +``` + +Create `services/strategy-engine/src/strategy_engine/main.py`: + +```python +from __future__ import annotations + +import asyncio +import logging +from pathlib import Path + +from shared.broker import RedisBroker +from strategy_engine.config import StrategyConfig +from strategy_engine.engine import StrategyEngine +from strategy_engine.plugin_loader import load_strategies + +logger = logging.getLogger(__name__) + + +async def run(): + config = StrategyConfig() + logging.basicConfig(level=config.log_level) + + broker = RedisBroker(config.redis_url) + strategies_dir = Path(__file__).parent.parent.parent / "strategies" + strategies = load_strategies(strategies_dir) + + for s in strategies: + params = config.strategy_params.get(s.name, {}) + s.configure(params) + + engine = StrategyEngine(broker=broker, strategies=strategies) + symbols = [s.replace("/", "") for s in config.symbols] + + logger.info(f"Starting strategy engine: strategies={[s.name for s in strategies]}") + last_ids = {sym: "0-0" for sym in symbols} + try: + while True: + for sym in symbols: + stream = f"candles.{sym}" + last_ids[sym] = await engine.process_once(stream, last_ids[sym]) + finally: + await broker.close() + + +def main(): + asyncio.run(run()) + + +if __name__ == "__main__": + main() +``` + +- [ ] **Step 20: Create Dockerfile** + +Create `services/strategy-engine/Dockerfile`: + +```dockerfile +FROM python:3.12-slim + +WORKDIR /app + +COPY shared/ shared/ +RUN pip install --no-cache-dir ./shared + +COPY services/strategy-engine/ services/strategy-engine/ +RUN pip install --no-cache-dir ./services/strategy-engine + +CMD ["python", "-m", "strategy_engine.main"] +``` + +- [ ] **Step 21: Commit** + +```bash +git add services/strategy-engine/ +git commit -m "feat(strategy-engine): add plugin-based strategy engine with RSI and grid strategies" +``` + +--- + +## Task 7: Order Executor Service + +**Files:** +- Create: `services/order-executor/pyproject.toml` +- Create: `services/order-executor/Dockerfile` +- Create: `services/order-executor/src/order_executor/__init__.py` +- Create: `services/order-executor/src/order_executor/config.py` +- Create: `services/order-executor/src/order_executor/risk_manager.py` +- Create: `services/order-executor/src/order_executor/executor.py` +- Create: `services/order-executor/src/order_executor/main.py` +- Create: `services/order-executor/tests/test_risk_manager.py` +- Create: `services/order-executor/tests/test_executor.py` + +- [ ] **Step 1: Create pyproject.toml** + +Create `services/order-executor/pyproject.toml`: + +```toml +[project] +name = "order-executor" +version = "0.1.0" +description = "Order execution service with risk management" +requires-python = ">=3.12" +dependencies = [ + "ccxt>=4.0", + "trading-shared", +] + +[project.optional-dependencies] +dev = [ + "pytest>=8.0", + "pytest-asyncio>=0.23", +] + +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[tool.hatch.build.targets.wheel] +packages = ["src/order_executor"] +``` + +- [ ] **Step 2: Write failing tests for risk_manager** + +Create `services/order-executor/tests/test_risk_manager.py`: + +```python +import pytest +from decimal import Decimal + +from shared.models import Signal, OrderSide +from order_executor.risk_manager import RiskManager, RiskCheckResult + + +@pytest.fixture +def risk_manager(): + return RiskManager( + max_position_size=Decimal("0.1"), + stop_loss_pct=Decimal("5"), + daily_loss_limit_pct=Decimal("10"), + ) + + +def make_signal(side=OrderSide.BUY, quantity="0.01", price="50000") -> Signal: + return Signal( + strategy="test", + symbol="BTCUSDT", + side=side, + price=Decimal(price), + quantity=Decimal(quantity), + reason="test", + ) + + +def test_risk_check_passes_normal_order(risk_manager): + signal = make_signal() + balance = Decimal("10000") + positions = {} + daily_pnl = Decimal("0") + + result = risk_manager.check(signal, balance, positions, daily_pnl) + assert result.allowed is True + + +def test_risk_check_rejects_exceeding_position_size(risk_manager): + signal = make_signal(quantity="5") # 5 BTC * 50000 = 250000 > 10% of balance + balance = Decimal("10000") + positions = {} + daily_pnl = Decimal("0") + + result = risk_manager.check(signal, balance, positions, daily_pnl) + assert result.allowed is False + assert "position size" in result.reason.lower() + + +def test_risk_check_rejects_daily_loss_exceeded(risk_manager): + signal = make_signal() + balance = Decimal("10000") + positions = {} + daily_pnl = Decimal("-1100") # -11% > -10% limit + + result = risk_manager.check(signal, balance, positions, daily_pnl) + assert result.allowed is False + assert "daily loss" in result.reason.lower() + + +def test_risk_check_rejects_insufficient_balance(risk_manager): + signal = make_signal(quantity="0.01", price="50000") # cost = 500 + balance = Decimal("100") # not enough + positions = {} + daily_pnl = Decimal("0") + + result = risk_manager.check(signal, balance, positions, daily_pnl) + assert result.allowed is False + assert "balance" in result.reason.lower() +``` + +- [ ] **Step 3: Run tests to verify they fail** + +```bash +pip install -e services/order-executor[dev] +pytest services/order-executor/tests/test_risk_manager.py -v +``` + +Expected: FAIL + +- [ ] **Step 4: Implement risk_manager** + +Create `services/order-executor/src/order_executor/__init__.py`: + +```python +``` + +Create `services/order-executor/src/order_executor/risk_manager.py`: + +```python +from __future__ import annotations + +from dataclasses import dataclass +from decimal import Decimal + +from shared.models import Signal, OrderSide + + +@dataclass +class RiskCheckResult: + allowed: bool + reason: str = "" + + +class RiskManager: + def __init__( + self, + max_position_size: Decimal, + stop_loss_pct: Decimal, + daily_loss_limit_pct: Decimal, + ): + self._max_position_size = max_position_size + self._stop_loss_pct = stop_loss_pct + self._daily_loss_limit_pct = daily_loss_limit_pct + + def check( + self, + signal: Signal, + balance: Decimal, + positions: dict[str, Decimal], + daily_pnl: Decimal, + ) -> RiskCheckResult: + # Check daily loss limit + daily_loss_pct = (daily_pnl / balance) * 100 if balance > 0 else Decimal("0") + if daily_loss_pct < -self._daily_loss_limit_pct: + return RiskCheckResult( + allowed=False, + reason=f"Daily loss limit exceeded: {daily_loss_pct:.1f}%", + ) + + if signal.side == OrderSide.BUY: + order_cost = signal.price * signal.quantity + + # Check sufficient balance + if order_cost > balance: + return RiskCheckResult( + allowed=False, + reason=f"Insufficient balance: need {order_cost}, have {balance}", + ) + + # Check max position size + current_position_value = positions.get(signal.symbol, Decimal("0")) * signal.price + new_position_value = current_position_value + order_cost + position_ratio = new_position_value / balance if balance > 0 else Decimal("999") + if position_ratio > self._max_position_size: + return RiskCheckResult( + allowed=False, + reason=f"Position size exceeded: {position_ratio:.2f} > {self._max_position_size}", + ) + + return RiskCheckResult(allowed=True) +``` + +- [ ] **Step 5: Run tests to verify they pass** + +```bash +pytest services/order-executor/tests/test_risk_manager.py -v +``` + +Expected: All PASS + +- [ ] **Step 6: Write failing tests for executor** + +Create `services/order-executor/tests/test_executor.py`: + +```python +import pytest +from unittest.mock import AsyncMock, MagicMock +from decimal import Decimal + +from shared.models import Signal, OrderSide, OrderStatus +from order_executor.executor import OrderExecutor +from order_executor.risk_manager import RiskCheckResult + + +def make_signal() -> Signal: + return Signal( + strategy="test", + symbol="BTCUSDT", + side=OrderSide.BUY, + price=Decimal("50000"), + quantity=Decimal("0.01"), + reason="test", + ) + + +@pytest.mark.asyncio +async def test_executor_places_order_when_risk_passes(): + mock_exchange = MagicMock() + mock_exchange.create_order = AsyncMock(return_value={ + "id": "binance_123", + "status": "closed", + "filled": 0.01, + "price": 50000, + }) + mock_exchange.fetch_balance = AsyncMock(return_value={ + "USDT": {"free": 10000}, + }) + + mock_risk = MagicMock() + mock_risk.check.return_value = RiskCheckResult(allowed=True) + + mock_broker = AsyncMock() + mock_db = AsyncMock() + + executor = OrderExecutor( + exchange=mock_exchange, + risk_manager=mock_risk, + broker=mock_broker, + db=mock_db, + dry_run=False, + ) + + signal = make_signal() + order = await executor.execute(signal) + + assert order is not None + assert order.status == OrderStatus.FILLED + mock_exchange.create_order.assert_called_once() + + +@pytest.mark.asyncio +async def test_executor_rejects_when_risk_fails(): + mock_exchange = MagicMock() + mock_exchange.fetch_balance = AsyncMock(return_value={ + "USDT": {"free": 10000}, + }) + + mock_risk = MagicMock() + mock_risk.check.return_value = RiskCheckResult(allowed=False, reason="too risky") + + mock_broker = AsyncMock() + mock_db = AsyncMock() + + executor = OrderExecutor( + exchange=mock_exchange, + risk_manager=mock_risk, + broker=mock_broker, + db=mock_db, + dry_run=False, + ) + + signal = make_signal() + order = await executor.execute(signal) + assert order is None + mock_exchange.create_order.assert_not_called() + + +@pytest.mark.asyncio +async def test_executor_dry_run_does_not_call_exchange(): + mock_exchange = MagicMock() + mock_exchange.fetch_balance = AsyncMock(return_value={ + "USDT": {"free": 10000}, + }) + + mock_risk = MagicMock() + mock_risk.check.return_value = RiskCheckResult(allowed=True) + + mock_broker = AsyncMock() + mock_db = AsyncMock() + + executor = OrderExecutor( + exchange=mock_exchange, + risk_manager=mock_risk, + broker=mock_broker, + db=mock_db, + dry_run=True, + ) + + signal = make_signal() + order = await executor.execute(signal) + + assert order is not None + assert order.status == OrderStatus.FILLED + mock_exchange.create_order.assert_not_called() +``` + +- [ ] **Step 7: Run tests to verify they fail** + +```bash +pytest services/order-executor/tests/test_executor.py -v +``` + +Expected: FAIL + +- [ ] **Step 8: Implement executor** + +Create `services/order-executor/src/order_executor/executor.py`: + +```python +from __future__ import annotations + +import logging +from datetime import datetime, timezone +from decimal import Decimal + +from shared.broker import RedisBroker +from shared.db import Database +from shared.events import OrderEvent +from shared.models import Order, OrderSide, OrderStatus, OrderType, Signal +from order_executor.risk_manager import RiskManager + +logger = logging.getLogger(__name__) + + +class OrderExecutor: + def __init__( + self, + exchange, + risk_manager: RiskManager, + broker: RedisBroker, + db: Database, + dry_run: bool = True, + ): + self._exchange = exchange + self._risk = risk_manager + self._broker = broker + self._db = db + self._dry_run = dry_run + + async def execute(self, signal: Signal) -> Order | None: + balance_info = await self._exchange.fetch_balance() + balance = Decimal(str(balance_info.get("USDT", {}).get("free", 0))) + positions: dict[str, Decimal] = {} + daily_pnl = Decimal("0") + + result = self._risk.check(signal, balance, positions, daily_pnl) + if not result.allowed: + logger.warning(f"Risk check failed: {result.reason}") + return None + + order = Order( + signal_id=signal.id, + symbol=signal.symbol, + side=signal.side, + type=OrderType.MARKET, + price=signal.price, + quantity=signal.quantity, + ) + + if self._dry_run: + logger.info(f"[DRY RUN] Would execute: {order.side} {order.quantity} {order.symbol}") + order.status = OrderStatus.FILLED + order.filled_at = datetime.now(timezone.utc) + else: + try: + result = await self._exchange.create_order( + symbol=signal.symbol.replace("USDT", "/USDT"), + type="market", + side=signal.side.value.lower(), + amount=float(signal.quantity), + ) + order.status = OrderStatus.FILLED + order.filled_at = datetime.now(timezone.utc) + logger.info(f"Order filled: {order.id}") + except Exception as e: + order.status = OrderStatus.FAILED + logger.error(f"Order failed: {e}") + + await self._db.insert_order(order) + event = OrderEvent(data=order) + await self._broker.publish("orders", event.to_dict()) + + return order +``` + +- [ ] **Step 9: Run tests to verify they pass** + +```bash +pytest services/order-executor/tests/test_executor.py -v +``` + +Expected: All PASS + +- [ ] **Step 10: Implement config and main** + +Create `services/order-executor/src/order_executor/config.py`: + +```python +from shared.config import Settings + + +class ExecutorConfig(Settings): + pass +``` + +Create `services/order-executor/src/order_executor/main.py`: + +```python +from __future__ import annotations + +import asyncio +import logging + +import ccxt.async_support as ccxt + +from shared.broker import RedisBroker +from shared.db import Database +from shared.events import Event +from order_executor.config import ExecutorConfig +from order_executor.executor import OrderExecutor +from order_executor.risk_manager import RiskManager + +logger = logging.getLogger(__name__) + + +async def run(): + config = ExecutorConfig() + logging.basicConfig(level=config.log_level) + + db = Database(config.database_url) + await db.connect() + + broker = RedisBroker(config.redis_url) + + exchange = ccxt.binance({ + "apiKey": config.binance_api_key, + "secret": config.binance_api_secret, + }) + + risk_manager = RiskManager( + max_position_size=config.risk_max_position_size, + stop_loss_pct=config.risk_stop_loss_pct, + daily_loss_limit_pct=config.risk_daily_loss_limit_pct, + ) + + executor = OrderExecutor( + exchange=exchange, + risk_manager=risk_manager, + broker=broker, + db=db, + dry_run=config.dry_run, + ) + + logger.info(f"Starting order executor (dry_run={config.dry_run})") + last_id = "0-0" + try: + while True: + messages = await broker.read("signals", last_id=last_id, count=10, block=1000) + for msg in messages: + event = Event.from_dict(msg) + await executor.execute(event.data) + finally: + await exchange.close() + await broker.close() + await db.close() + + +def main(): + asyncio.run(run()) + + +if __name__ == "__main__": + main() +``` + +- [ ] **Step 11: Create Dockerfile** + +Create `services/order-executor/Dockerfile`: + +```dockerfile +FROM python:3.12-slim + +WORKDIR /app + +COPY shared/ shared/ +RUN pip install --no-cache-dir ./shared + +COPY services/order-executor/ services/order-executor/ +RUN pip install --no-cache-dir ./services/order-executor + +CMD ["python", "-m", "order_executor.main"] +``` + +- [ ] **Step 12: Commit** + +```bash +git add services/order-executor/ +git commit -m "feat(order-executor): add order execution with risk management and dry-run mode" +``` + +--- + +## Task 8: Portfolio Manager Service + +**Files:** +- Create: `services/portfolio-manager/pyproject.toml` +- Create: `services/portfolio-manager/Dockerfile` +- Create: `services/portfolio-manager/src/portfolio_manager/__init__.py` +- Create: `services/portfolio-manager/src/portfolio_manager/config.py` +- Create: `services/portfolio-manager/src/portfolio_manager/portfolio.py` +- Create: `services/portfolio-manager/src/portfolio_manager/pnl.py` +- Create: `services/portfolio-manager/src/portfolio_manager/main.py` +- Create: `services/portfolio-manager/tests/test_portfolio.py` +- Create: `services/portfolio-manager/tests/test_pnl.py` + +- [ ] **Step 1: Create pyproject.toml** + +Create `services/portfolio-manager/pyproject.toml`: + +```toml +[project] +name = "portfolio-manager" +version = "0.1.0" +description = "Portfolio tracking and PnL calculation service" +requires-python = ">=3.12" +dependencies = [ + "trading-shared", +] + +[project.optional-dependencies] +dev = [ + "pytest>=8.0", + "pytest-asyncio>=0.23", +] + +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[tool.hatch.build.targets.wheel] +packages = ["src/portfolio_manager"] +``` + +- [ ] **Step 2: Write failing tests for pnl** + +Create `services/portfolio-manager/tests/test_pnl.py`: + +```python +from decimal import Decimal + +from portfolio_manager.pnl import calculate_unrealized_pnl, calculate_realized_pnl + + +def test_unrealized_pnl_profit(): + result = calculate_unrealized_pnl( + quantity=Decimal("0.1"), + avg_entry_price=Decimal("50000"), + current_price=Decimal("55000"), + ) + assert result == Decimal("500") # 0.1 * (55000 - 50000) + + +def test_unrealized_pnl_loss(): + result = calculate_unrealized_pnl( + quantity=Decimal("0.1"), + avg_entry_price=Decimal("50000"), + current_price=Decimal("45000"), + ) + assert result == Decimal("-500") + + +def test_realized_pnl_single_trade(): + result = calculate_realized_pnl( + buy_price=Decimal("50000"), + sell_price=Decimal("55000"), + quantity=Decimal("0.1"), + fee=Decimal("5.5"), + ) + assert result == Decimal("494.5") # 0.1 * (55000 - 50000) - 5.5 +``` + +- [ ] **Step 3: Run tests to verify they fail** + +```bash +pip install -e services/portfolio-manager[dev] +pytest services/portfolio-manager/tests/test_pnl.py -v +``` + +Expected: FAIL + +- [ ] **Step 4: Implement pnl** + +Create `services/portfolio-manager/src/portfolio_manager/__init__.py`: + +```python +``` + +Create `services/portfolio-manager/src/portfolio_manager/pnl.py`: + +```python +from decimal import Decimal + + +def calculate_unrealized_pnl( + quantity: Decimal, + avg_entry_price: Decimal, + current_price: Decimal, +) -> Decimal: + return quantity * (current_price - avg_entry_price) + + +def calculate_realized_pnl( + buy_price: Decimal, + sell_price: Decimal, + quantity: Decimal, + fee: Decimal = Decimal("0"), +) -> Decimal: + return quantity * (sell_price - buy_price) - fee +``` + +- [ ] **Step 5: Run tests to verify they pass** + +```bash +pytest services/portfolio-manager/tests/test_pnl.py -v +``` + +Expected: All PASS + +- [ ] **Step 6: Write failing tests for portfolio** + +Create `services/portfolio-manager/tests/test_portfolio.py`: + +```python +import pytest +from decimal import Decimal + +from shared.models import Order, OrderSide, OrderType, OrderStatus +from portfolio_manager.portfolio import PortfolioTracker + + +@pytest.fixture +def tracker(): + return PortfolioTracker() + + +def make_order(side=OrderSide.BUY, price="50000", quantity="0.1") -> Order: + return Order( + signal_id="sig_1", + symbol="BTCUSDT", + side=side, + type=OrderType.MARKET, + price=Decimal(price), + quantity=Decimal(quantity), + status=OrderStatus.FILLED, + ) + + +def test_portfolio_add_buy_order(tracker): + order = make_order(side=OrderSide.BUY) + tracker.apply_order(order) + + pos = tracker.get_position("BTCUSDT") + assert pos.quantity == Decimal("0.1") + assert pos.avg_entry_price == Decimal("50000") + + +def test_portfolio_add_multiple_buys(tracker): + tracker.apply_order(make_order(price="50000", quantity="0.1")) + tracker.apply_order(make_order(price="52000", quantity="0.1")) + + pos = tracker.get_position("BTCUSDT") + assert pos.quantity == Decimal("0.2") + assert pos.avg_entry_price == Decimal("51000") # weighted avg + + +def test_portfolio_sell_reduces_position(tracker): + tracker.apply_order(make_order(side=OrderSide.BUY, price="50000", quantity="0.2")) + tracker.apply_order(make_order(side=OrderSide.SELL, price="55000", quantity="0.1")) + + pos = tracker.get_position("BTCUSDT") + assert pos.quantity == Decimal("0.1") + assert pos.avg_entry_price == Decimal("50000") # entry price unchanged + + +def test_portfolio_no_position_returns_none(tracker): + pos = tracker.get_position("ETHUSDT") + assert pos is None +``` + +- [ ] **Step 7: Run tests to verify they fail** + +```bash +pytest services/portfolio-manager/tests/test_portfolio.py -v +``` + +Expected: FAIL + +- [ ] **Step 8: Implement portfolio** + +Create `services/portfolio-manager/src/portfolio_manager/portfolio.py`: + +```python +from __future__ import annotations + +from decimal import Decimal + +from shared.models import Order, OrderSide, Position + + +class PortfolioTracker: + def __init__(self): + self._positions: dict[str, _PositionState] = {} + + def apply_order(self, order: Order) -> None: + if order.symbol not in self._positions: + self._positions[order.symbol] = _PositionState() + + state = self._positions[order.symbol] + if order.side == OrderSide.BUY: + total_cost = state.avg_entry * state.quantity + order.price * order.quantity + state.quantity += order.quantity + state.avg_entry = total_cost / state.quantity if state.quantity > 0 else Decimal("0") + elif order.side == OrderSide.SELL: + state.quantity -= order.quantity + if state.quantity <= 0: + state.quantity = Decimal("0") + state.avg_entry = Decimal("0") + + def get_position(self, symbol: str) -> Position | None: + state = self._positions.get(symbol) + if state is None or state.quantity == 0: + return None + return Position( + symbol=symbol, + quantity=state.quantity, + avg_entry_price=state.avg_entry, + current_price=Decimal("0"), + ) + + def get_all_positions(self) -> list[Position]: + positions = [] + for symbol in self._positions: + pos = self.get_position(symbol) + if pos is not None: + positions.append(pos) + return positions + + +class _PositionState: + def __init__(self): + self.quantity = Decimal("0") + self.avg_entry = Decimal("0") +``` + +- [ ] **Step 9: Run tests to verify they pass** + +```bash +pytest services/portfolio-manager/tests/test_portfolio.py -v +``` + +Expected: All PASS + +- [ ] **Step 10: Implement config and main** + +Create `services/portfolio-manager/src/portfolio_manager/config.py`: + +```python +from shared.config import Settings + + +class PortfolioConfig(Settings): + snapshot_interval_hours: int = 24 +``` + +Create `services/portfolio-manager/src/portfolio_manager/main.py`: + +```python +from __future__ import annotations + +import asyncio +import logging + +from shared.broker import RedisBroker +from shared.db import Database +from shared.events import Event +from portfolio_manager.config import PortfolioConfig +from portfolio_manager.portfolio import PortfolioTracker + +logger = logging.getLogger(__name__) + + +async def run(): + config = PortfolioConfig() + logging.basicConfig(level=config.log_level) + + db = Database(config.database_url) + await db.connect() + + broker = RedisBroker(config.redis_url) + tracker = PortfolioTracker() + + logger.info("Starting portfolio manager") + last_id = "0-0" + try: + while True: + messages = await broker.read("orders", last_id=last_id, count=10, block=1000) + for msg in messages: + event = Event.from_dict(msg) + order = event.data + tracker.apply_order(order) + logger.info(f"Position updated: {order.symbol}") + finally: + await broker.close() + await db.close() + + +def main(): + asyncio.run(run()) + + +if __name__ == "__main__": + main() +``` + +- [ ] **Step 11: Create Dockerfile** + +Create `services/portfolio-manager/Dockerfile`: + +```dockerfile +FROM python:3.12-slim + +WORKDIR /app + +COPY shared/ shared/ +RUN pip install --no-cache-dir ./shared + +COPY services/portfolio-manager/ services/portfolio-manager/ +RUN pip install --no-cache-dir ./services/portfolio-manager + +CMD ["python", "-m", "portfolio_manager.main"] +``` + +- [ ] **Step 12: Commit** + +```bash +git add services/portfolio-manager/ +git commit -m "feat(portfolio-manager): add portfolio tracking and PnL calculation" +``` + +--- + +## Task 9: Backtester Service + +**Files:** +- Create: `services/backtester/pyproject.toml` +- Create: `services/backtester/Dockerfile` +- Create: `services/backtester/src/backtester/__init__.py` +- Create: `services/backtester/src/backtester/config.py` +- Create: `services/backtester/src/backtester/simulator.py` +- Create: `services/backtester/src/backtester/engine.py` +- Create: `services/backtester/src/backtester/reporter.py` +- Create: `services/backtester/src/backtester/main.py` +- Create: `services/backtester/tests/test_simulator.py` +- Create: `services/backtester/tests/test_engine.py` +- Create: `services/backtester/tests/test_reporter.py` + +- [ ] **Step 1: Create pyproject.toml** + +Create `services/backtester/pyproject.toml`: + +```toml +[project] +name = "backtester" +version = "0.1.0" +description = "Strategy backtesting engine" +requires-python = ">=3.12" +dependencies = [ + "pandas>=2.0", + "trading-shared", +] + +[project.optional-dependencies] +dev = [ + "pytest>=8.0", + "pytest-asyncio>=0.23", +] + +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[tool.hatch.build.targets.wheel] +packages = ["src/backtester"] +``` + +- [ ] **Step 2: Write failing tests for simulator** + +Create `services/backtester/tests/test_simulator.py`: + +```python +from decimal import Decimal + +from shared.models import Signal, OrderSide +from backtester.simulator import OrderSimulator + + +def make_signal(side=OrderSide.BUY, price="50000", quantity="0.1") -> Signal: + return Signal( + strategy="test", + symbol="BTCUSDT", + side=side, + price=Decimal(price), + quantity=Decimal(quantity), + reason="test", + ) + + +def test_simulator_initial_balance(): + sim = OrderSimulator(initial_balance=Decimal("10000")) + assert sim.balance == Decimal("10000") + + +def test_simulator_buy_reduces_balance(): + sim = OrderSimulator(initial_balance=Decimal("10000")) + sim.execute(make_signal(side=OrderSide.BUY, price="50000", quantity="0.1")) + + assert sim.balance == Decimal("5000") # 10000 - 0.1*50000 + assert sim.positions["BTCUSDT"] == Decimal("0.1") + + +def test_simulator_sell_increases_balance(): + sim = OrderSimulator(initial_balance=Decimal("10000")) + sim.execute(make_signal(side=OrderSide.BUY, price="50000", quantity="0.1")) + sim.execute(make_signal(side=OrderSide.SELL, price="55000", quantity="0.1")) + + assert sim.balance == Decimal("10500") # 5000 + 0.1*55000 + assert sim.positions.get("BTCUSDT", Decimal("0")) == Decimal("0") + + +def test_simulator_reject_buy_insufficient_balance(): + sim = OrderSimulator(initial_balance=Decimal("100")) + result = sim.execute(make_signal(side=OrderSide.BUY, price="50000", quantity="0.1")) + assert result is False + assert sim.balance == Decimal("100") + + +def test_simulator_trade_history(): + sim = OrderSimulator(initial_balance=Decimal("10000")) + sim.execute(make_signal(side=OrderSide.BUY)) + assert len(sim.trades) == 1 +``` + +- [ ] **Step 3: Run tests to verify they fail** + +```bash +pip install -e services/backtester[dev] +pytest services/backtester/tests/test_simulator.py -v +``` + +Expected: FAIL + +- [ ] **Step 4: Implement simulator** + +Create `services/backtester/src/backtester/__init__.py`: + +```python +``` + +Create `services/backtester/src/backtester/simulator.py`: + +```python +from __future__ import annotations + +from dataclasses import dataclass, field +from decimal import Decimal + +from shared.models import Signal, OrderSide + + +@dataclass +class SimulatedTrade: + symbol: str + side: OrderSide + price: Decimal + quantity: Decimal + balance_after: Decimal + + +class OrderSimulator: + def __init__(self, initial_balance: Decimal): + self.balance = initial_balance + self.positions: dict[str, Decimal] = {} + self.trades: list[SimulatedTrade] = [] + + def execute(self, signal: Signal) -> bool: + if signal.side == OrderSide.BUY: + cost = signal.price * signal.quantity + if cost > self.balance: + return False + self.balance -= cost + current = self.positions.get(signal.symbol, Decimal("0")) + self.positions[signal.symbol] = current + signal.quantity + elif signal.side == OrderSide.SELL: + current = self.positions.get(signal.symbol, Decimal("0")) + sell_qty = min(signal.quantity, current) + if sell_qty <= 0: + return False + self.balance += signal.price * sell_qty + self.positions[signal.symbol] = current - sell_qty + + self.trades.append( + SimulatedTrade( + symbol=signal.symbol, + side=signal.side, + price=signal.price, + quantity=signal.quantity, + balance_after=self.balance, + ) + ) + return True +``` + +- [ ] **Step 5: Run tests to verify they pass** + +```bash +pytest services/backtester/tests/test_simulator.py -v +``` + +Expected: All PASS + +- [ ] **Step 6: Write failing tests for backtest engine** + +Create `services/backtester/tests/test_engine.py`: + +```python +import pytest +from decimal import Decimal +from datetime import datetime, timezone +from unittest.mock import MagicMock + +from shared.models import Candle, Signal, OrderSide +from backtester.engine import BacktestEngine + + +def make_candles(prices: list[float]) -> list[Candle]: + return [ + Candle( + symbol="BTCUSDT", + timeframe="1m", + open_time=datetime(2026, 1, 1, minute=i, tzinfo=timezone.utc), + open=Decimal(str(p)), + high=Decimal(str(p + 10)), + low=Decimal(str(p - 10)), + close=Decimal(str(p)), + volume=Decimal("1.0"), + ) + for i, p in enumerate(prices) + ] + + +def test_backtest_engine_runs_strategy_over_candles(): + mock_strategy = MagicMock() + mock_strategy.name = "test" + mock_strategy.on_candle.return_value = None + + candles = make_candles([50000, 50100, 50200]) + + engine = BacktestEngine( + strategy=mock_strategy, + initial_balance=Decimal("10000"), + ) + result = engine.run(candles) + + assert mock_strategy.on_candle.call_count == 3 + assert result.total_trades == 0 + assert result.final_balance == Decimal("10000") + + +def test_backtest_engine_executes_signals(): + buy_signal = Signal( + strategy="test", + symbol="BTCUSDT", + side=OrderSide.BUY, + price=Decimal("50000"), + quantity=Decimal("0.1"), + reason="test buy", + ) + sell_signal = Signal( + strategy="test", + symbol="BTCUSDT", + side=OrderSide.SELL, + price=Decimal("55000"), + quantity=Decimal("0.1"), + reason="test sell", + ) + + mock_strategy = MagicMock() + mock_strategy.name = "test" + mock_strategy.on_candle.side_effect = [buy_signal, None, sell_signal] + + candles = make_candles([50000, 52000, 55000]) + + engine = BacktestEngine( + strategy=mock_strategy, + initial_balance=Decimal("10000"), + ) + result = engine.run(candles) + + assert result.total_trades == 2 + assert result.final_balance == Decimal("10500") # 10000 - 5000 + 5500 +``` + +- [ ] **Step 7: Run tests to verify they fail** + +```bash +pytest services/backtester/tests/test_engine.py -v +``` + +Expected: FAIL + +- [ ] **Step 8: Implement backtest engine** + +Create `services/backtester/src/backtester/engine.py`: + +```python +from __future__ import annotations + +from dataclasses import dataclass +from decimal import Decimal + +from shared.models import Candle +from backtester.simulator import OrderSimulator +from strategies.base import BaseStrategy + + +@dataclass +class BacktestResult: + strategy_name: str + symbol: str + total_trades: int + initial_balance: Decimal + final_balance: Decimal + profit: Decimal + profit_pct: Decimal + trades: list + + @property + def win_rate(self) -> Decimal: + if self.total_trades == 0: + return Decimal("0") + wins = sum( + 1 + for i in range(0, len(self.trades) - 1, 2) + if i + 1 < len(self.trades) + and self.trades[i + 1].balance_after > self.trades[i].balance_after + ) + pairs = self.total_trades // 2 + return Decimal(str(wins / pairs * 100)) if pairs > 0 else Decimal("0") + + +class BacktestEngine: + def __init__(self, strategy: BaseStrategy, initial_balance: Decimal): + self._strategy = strategy + self._initial_balance = initial_balance + + def run(self, candles: list[Candle]) -> BacktestResult: + simulator = OrderSimulator(self._initial_balance) + symbol = candles[0].symbol if candles else "" + + for candle in candles: + signal = self._strategy.on_candle(candle) + if signal is not None: + simulator.execute(signal) + + final = simulator.balance + # Add value of remaining positions at last candle price + if candles: + last_price = candles[-1].close + for sym, qty in simulator.positions.items(): + final += qty * last_price + + profit = final - self._initial_balance + profit_pct = (profit / self._initial_balance) * 100 if self._initial_balance > 0 else Decimal("0") + + return BacktestResult( + strategy_name=self._strategy.name, + symbol=symbol, + total_trades=len(simulator.trades), + initial_balance=self._initial_balance, + final_balance=final, + profit=profit, + profit_pct=profit_pct, + trades=simulator.trades, + ) +``` + +- [ ] **Step 9: Run tests to verify they pass** + +```bash +pytest services/backtester/tests/test_engine.py -v +``` + +Expected: All PASS + +- [ ] **Step 10: Write failing tests for reporter** + +Create `services/backtester/tests/test_reporter.py`: + +```python +from decimal import Decimal + +from backtester.engine import BacktestResult +from backtester.reporter import format_report + + +def test_format_report_contains_key_metrics(): + result = BacktestResult( + strategy_name="rsi", + symbol="BTCUSDT", + total_trades=10, + initial_balance=Decimal("10000"), + final_balance=Decimal("11500"), + profit=Decimal("1500"), + profit_pct=Decimal("15"), + trades=[], + ) + + report = format_report(result) + + assert "rsi" in report + assert "BTCUSDT" in report + assert "10000" in report + assert "11500" in report + assert "1500" in report + assert "15" in report +``` + +- [ ] **Step 11: Run test to verify it fails** + +```bash +pytest services/backtester/tests/test_reporter.py -v +``` + +Expected: FAIL + +- [ ] **Step 12: Implement reporter** + +Create `services/backtester/src/backtester/reporter.py`: + +```python +from backtester.engine import BacktestResult + + +def format_report(result: BacktestResult) -> str: + lines = [ + "=" * 50, + f" Backtest Report: {result.strategy_name}", + "=" * 50, + f" Symbol: {result.symbol}", + f" Total Trades: {result.total_trades}", + f" Initial Balance: {result.initial_balance}", + f" Final Balance: {result.final_balance}", + f" Profit: {result.profit}", + f" Profit %: {result.profit_pct:.2f}%", + f" Win Rate: {result.win_rate:.1f}%", + "=" * 50, + ] + return "\n".join(lines) +``` + +- [ ] **Step 13: Run test to verify it passes** + +```bash +pytest services/backtester/tests/test_reporter.py -v +``` + +Expected: PASS + +- [ ] **Step 14: Implement config and main** + +Create `services/backtester/src/backtester/config.py`: + +```python +from shared.config import Settings + + +class BacktestConfig(Settings): + backtest_initial_balance: float = 10000.0 +``` + +Create `services/backtester/src/backtester/main.py`: + +```python +from __future__ import annotations + +import asyncio +import logging +from decimal import Decimal +from pathlib import Path + +from shared.db import Database +from backtester.config import BacktestConfig +from backtester.engine import BacktestEngine +from backtester.reporter import format_report + +logger = logging.getLogger(__name__) + + +async def run_backtest( + strategy_name: str, + symbol: str, + timeframe: str, + initial_balance: Decimal, + db: Database, + strategies_dir: Path, +) -> str: + from strategy_engine.plugin_loader import load_strategies + + strategies = load_strategies(strategies_dir) + strategy = next((s for s in strategies if s.name == strategy_name), None) + if strategy is None: + return f"Strategy '{strategy_name}' not found" + + candles_data = await db.get_candles(symbol, timeframe) + if not candles_data: + return f"No candle data for {symbol} {timeframe}" + + from shared.models import Candle + + candles = [Candle(**row) for row in reversed(candles_data)] + + engine = BacktestEngine(strategy=strategy, initial_balance=initial_balance) + result = engine.run(candles) + return format_report(result) +``` + +- [ ] **Step 15: Create Dockerfile** + +Create `services/backtester/Dockerfile`: + +```dockerfile +FROM python:3.12-slim + +WORKDIR /app + +COPY shared/ shared/ +RUN pip install --no-cache-dir ./shared + +COPY services/strategy-engine/strategies/ services/strategy-engine/strategies/ +COPY services/backtester/ services/backtester/ +RUN pip install --no-cache-dir ./services/backtester + +CMD ["python", "-m", "backtester.main"] +``` + +- [ ] **Step 16: Commit** + +```bash +git add services/backtester/ +git commit -m "feat(backtester): add backtesting engine with simulator and reporting" +``` + +--- + +## Task 10: CLI + +**Files:** +- Create: `cli/pyproject.toml` +- Create: `cli/src/trading_cli/__init__.py` +- Create: `cli/src/trading_cli/main.py` +- Create: `cli/src/trading_cli/commands/data.py` +- Create: `cli/src/trading_cli/commands/trade.py` +- Create: `cli/src/trading_cli/commands/backtest.py` +- Create: `cli/src/trading_cli/commands/portfolio.py` +- Create: `cli/src/trading_cli/commands/strategy.py` +- Create: `cli/src/trading_cli/commands/service.py` +- Create: `cli/tests/test_cli_data.py` + +- [ ] **Step 1: Create pyproject.toml** + +Create `cli/pyproject.toml`: + +```toml +[project] +name = "trading-cli" +version = "0.1.0" +description = "CLI interface for the trading platform" +requires-python = ">=3.12" +dependencies = [ + "click>=8.0", + "rich>=13.0", + "trading-shared", +] + +[project.scripts] +trading = "trading_cli.main:cli" + +[project.optional-dependencies] +dev = [ + "pytest>=8.0", + "pytest-asyncio>=0.23", +] + +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[tool.hatch.build.targets.wheel] +packages = ["src/trading_cli"] +``` + +- [ ] **Step 2: Write failing tests for CLI data commands** + +Create `cli/tests/test_cli_data.py`: + +```python +from click.testing import CliRunner +from trading_cli.main import cli + + +def test_cli_help(): + runner = CliRunner() + result = runner.invoke(cli, ["--help"]) + assert result.exit_code == 0 + assert "trading" in result.output.lower() or "Usage" in result.output + + +def test_cli_data_group(): + runner = CliRunner() + result = runner.invoke(cli, ["data", "--help"]) + assert result.exit_code == 0 + assert "collect" in result.output + assert "history" in result.output +``` + +- [ ] **Step 3: Run tests to verify they fail** + +```bash +pip install -e cli[dev] +pytest cli/tests/test_cli_data.py -v +``` + +Expected: FAIL + +- [ ] **Step 4: Implement CLI main and data commands** + +Create `cli/src/trading_cli/__init__.py`: + +```python +``` + +Create `cli/src/trading_cli/main.py`: + +```python +import click + +from trading_cli.commands.data import data +from trading_cli.commands.trade import trade +from trading_cli.commands.backtest import backtest +from trading_cli.commands.portfolio import portfolio +from trading_cli.commands.strategy import strategy +from trading_cli.commands.service import service + + +@click.group() +@click.version_option(version="0.1.0") +def cli(): + """Trading Platform CLI — Binance spot crypto trading""" + pass + + +cli.add_command(data) +cli.add_command(trade) +cli.add_command(backtest) +cli.add_command(portfolio) +cli.add_command(strategy) +cli.add_command(service) +``` + +Create `cli/src/trading_cli/commands/data.py`: + +```python +import asyncio + +import click + + +@click.group() +def data(): + """Data collection commands""" + pass + + +@data.command() +@click.option("--symbol", required=True, help="Trading pair (e.g. BTCUSDT)") +@click.option("--timeframe", default="1m", help="Candle timeframe") +def collect(symbol: str, timeframe: str): + """Start real-time data collection""" + click.echo(f"Starting data collection: {symbol} {timeframe}") + + from data_collector.config import CollectorConfig + from data_collector.main import run + + asyncio.run(run()) + + +@data.command() +@click.option("--symbol", required=True, help="Trading pair (e.g. BTCUSDT)") +@click.option("--timeframe", default="1m", help="Candle timeframe") +@click.option("--from", "since", required=True, help="Start date (YYYY-MM-DD)") +@click.option("--limit", default=1000, help="Number of candles") +def history(symbol: str, timeframe: str, since: str, limit: int): + """Download historical candle data""" + click.echo(f"Downloading history: {symbol} {timeframe} from {since} (limit={limit})") + + async def _run(): + import ccxt.async_support as ccxt + from datetime import datetime, timezone + from shared.broker import RedisBroker + from shared.config import Settings + from shared.db import Database + from data_collector.binance_rest import fetch_historical_candles + from data_collector.storage import CandleStorage + + settings = Settings() + db = Database(settings.database_url) + await db.connect() + await db.init_tables() + broker = RedisBroker(settings.redis_url) + storage = CandleStorage(db=db, broker=broker) + + exchange = ccxt.binance() + since_dt = datetime.strptime(since, "%Y-%m-%d").replace(tzinfo=timezone.utc) + candles = await fetch_historical_candles( + exchange=exchange, + symbol=symbol.replace("USDT", "/USDT"), + timeframe=timeframe, + since=since_dt, + limit=limit, + ) + await storage.store_batch(candles) + await exchange.close() + await broker.close() + await db.close() + click.echo(f"Downloaded {len(candles)} candles") + + asyncio.run(_run()) + + +@data.command("list") +def list_data(): + """List currently collecting symbols""" + click.echo("Collecting symbols:") + click.echo(" (Check docker-compose service status)") +``` + +- [ ] **Step 5: Implement remaining CLI command stubs** + +Create `cli/src/trading_cli/commands/trade.py`: + +```python +import click + + +@click.group() +def trade(): + """Trading bot commands""" + pass + + +@trade.command() +@click.option("--strategy", required=True, help="Strategy name") +@click.option("--symbol", required=True, help="Trading pair") +def start(strategy: str, symbol: str): + """Start a trading bot""" + click.echo(f"Starting bot: strategy={strategy} symbol={symbol}") + + +@trade.command() +@click.option("--strategy", required=True, help="Strategy name") +def stop(strategy: str): + """Stop a trading bot""" + click.echo(f"Stopping bot: strategy={strategy}") + + +@trade.command() +def status(): + """Show running bot status""" + click.echo("Running bots:") + + +@trade.command("stop-all") +def stop_all(): + """Emergency stop: stop all bots and cancel all orders""" + click.confirm("Are you sure you want to stop ALL bots?", abort=True) + click.echo("Stopping all bots and cancelling open orders...") +``` + +Create `cli/src/trading_cli/commands/backtest.py`: + +```python +import asyncio +from decimal import Decimal + +import click + + +@click.group() +def backtest(): + """Backtesting commands""" + pass + + +@backtest.command("run") +@click.option("--strategy", required=True, help="Strategy name") +@click.option("--symbol", required=True, help="Trading pair") +@click.option("--from", "since", required=True, help="Start date") +@click.option("--to", "until", required=True, help="End date") +@click.option("--balance", default=10000.0, help="Initial balance") +def run_backtest(strategy: str, symbol: str, since: str, until: str, balance: float): + """Run a backtest""" + click.echo(f"Running backtest: {strategy} on {symbol} ({since} ~ {until})") + + async def _run(): + from pathlib import Path + from shared.config import Settings + from shared.db import Database + from backtester.main import run_backtest as bt_run + + settings = Settings() + db = Database(settings.database_url) + await db.connect() + + strategies_dir = Path(__file__).parent.parent.parent.parent.parent / "services" / "strategy-engine" / "strategies" + report = await bt_run( + strategy_name=strategy, + symbol=symbol, + timeframe="1m", + initial_balance=Decimal(str(balance)), + db=db, + strategies_dir=strategies_dir, + ) + click.echo(report) + await db.close() + + asyncio.run(_run()) + + +@backtest.command() +@click.option("--id", "report_id", default="latest", help="Report ID") +def report(report_id: str): + """Show backtest report""" + click.echo(f"Showing report: {report_id}") +``` + +Create `cli/src/trading_cli/commands/portfolio.py`: + +```python +import click + + +@click.group() +def portfolio(): + """Portfolio commands""" + pass + + +@portfolio.command() +def show(): + """Show current portfolio""" + click.echo("Current Portfolio:") + click.echo(" (Connect to portfolio-manager service)") + + +@portfolio.command() +@click.option("--days", default=30, help="Number of days") +def history(days: int): + """Show PnL history""" + click.echo(f"PnL history (last {days} days):") +``` + +Create `cli/src/trading_cli/commands/strategy.py`: + +```python +from pathlib import Path + +import click + + +@click.group() +def strategy(): + """Strategy management commands""" + pass + + +@strategy.command("list") +def list_strategies(): + """List available strategies""" + from strategy_engine.plugin_loader import load_strategies + + strategies_dir = Path(__file__).parent.parent.parent.parent.parent / "services" / "strategy-engine" / "strategies" + strategies = load_strategies(strategies_dir) + click.echo("Available strategies:") + for s in strategies: + click.echo(f" - {s.name}") + + +@strategy.command() +@click.option("--name", required=True, help="Strategy name") +def info(name: str): + """Show strategy details""" + click.echo(f"Strategy: {name}") +``` + +Create `cli/src/trading_cli/commands/service.py`: + +```python +import subprocess + +import click + + +@click.group() +def service(): + """Service management commands""" + pass + + +@service.command() +def up(): + """Start all services""" + click.echo("Starting all services...") + subprocess.run(["docker", "compose", "up", "-d"], check=True) + + +@service.command() +def down(): + """Stop all services""" + click.echo("Stopping all services...") + subprocess.run(["docker", "compose", "down"], check=True) + + +@service.command() +@click.option("--name", required=True, help="Service name") +def logs(name: str): + """Show service logs""" + subprocess.run(["docker", "compose", "logs", "-f", name]) +``` + +- [ ] **Step 6: Run tests to verify they pass** + +```bash +pytest cli/tests/test_cli_data.py -v +``` + +Expected: All PASS + +- [ ] **Step 7: Commit** + +```bash +git add cli/ +git commit -m "feat(cli): add Click-based CLI with data, trade, backtest, portfolio, strategy, and service commands" +``` + +--- + +## Task 11: Integration Verification + +- [ ] **Step 1: Run all tests** + +```bash +cd /home/si/Private/repos/trading +pytest -v +``` + +Expected: All tests pass + +- [ ] **Step 2: Lint check** + +```bash +ruff check . +``` + +Fix any issues found. + +- [ ] **Step 3: Verify Docker builds** + +```bash +docker compose build +``` + +Expected: All services build successfully + +- [ ] **Step 4: Start infrastructure and verify** + +```bash +make infra +# Wait for healthy status +docker compose ps +``` + +Expected: redis and postgres running and healthy + +- [ ] **Step 5: Final commit** + +```bash +git add . +git commit -m "chore: integration verification — all tests pass, docker builds succeed" +``` diff --git a/docs/superpowers/specs/2026-04-01-crypto-trading-platform-design.md b/docs/superpowers/specs/2026-04-01-crypto-trading-platform-design.md new file mode 100644 index 0000000..aa32eb4 --- /dev/null +++ b/docs/superpowers/specs/2026-04-01-crypto-trading-platform-design.md @@ -0,0 +1,374 @@ +# Crypto Trading Platform — Design Spec + +## Overview + +Binance 현물 암호화폐 자동매매 플랫폼. 마이크로서비스 아키텍처 기반으로 데이터 수집, 전략 실행, 주문 처리, 포트폴리오 관리, 백테스팅을 독립 서비스로 운영한다. CLI로 제어하며, 전략은 플러그인 방식으로 확장 가능하다. + +- **시장:** 암호화폐 (Binance 현물) +- **언어:** Python +- **인터페이스:** CLI (Click) +- **아키텍처:** 마이크로서비스 (Docker Compose) + +--- + +## Architecture + +### 서비스 구성 + +``` +┌─────────────┐ ┌──────────────────┐ ┌─────────────────┐ +│ Data │───▶│ Message Broker │◀──│ Strategy │ +│ Collector │ │ (Redis Streams) │ │ Engine │ +└─────────────┘ └──────────────────┘ └─────────────────┘ + │ ▲ │ + ▼ │ ▼ + ┌──────────────────┐ ┌─────────────────┐ + │ Backtester │ │ Order │ + │ │ │ Executor │ + └──────────────────┘ └─────────────────┘ + │ + ┌────────────────────────┘ + ▼ + ┌──────────────────┐ + │ Portfolio │ + │ Manager │ + └──────────────────┘ + + CLI ──────▶ 각 서비스에 명령 전달 +``` + +| 서비스 | 역할 | 상시 실행 | +|--------|------|-----------| +| **data-collector** | Binance WebSocket/REST로 시세 수집, DB 저장 | Yes | +| **strategy-engine** | 플러그인 전략 로드 및 시그널 생성 | 봇 실행 시 | +| **order-executor** | 시그널 받아 실제 주문 실행 + 리스크 관리 | 봇 실행 시 | +| **portfolio-manager** | 잔고, 손익, 포지션 추적 | Yes | +| **backtester** | 과거 데이터로 전략 검증 | 요청 시 | +| **shared** | 공통 모델, 이벤트 정의, 유틸리티 (라이브러리) | — | +| **cli** | 사용자 인터페이스, 각 서비스 제어 | — | + +### 통신 흐름 + +``` +[Binance WS] + │ + ▼ +data-collector ──publish──▶ Redis Stream: "candles.{symbol}" + │ + ┌───────────────┤ + ▼ ▼ + strategy-engine backtester (과거 데이터는 DB에서) + │ + ▼ + Redis Stream: "signals" + │ + ▼ + order-executor + │ + ┌───────┴───────┐ + ▼ ▼ + [Binance API] Redis Stream: "orders" + │ + ▼ + portfolio-manager +``` + +--- + +## Project Structure + +``` +trading/ +├── services/ +│ ├── data-collector/ +│ │ ├── src/ +│ │ │ ├── __init__.py +│ │ │ ├── main.py # 서비스 진입점 +│ │ │ ├── binance_ws.py # WebSocket 실시간 시세 +│ │ │ ├── binance_rest.py # REST 과거 데이터 수집 +│ │ │ ├── storage.py # DB 저장 로직 +│ │ │ └── config.py +│ │ ├── tests/ +│ │ ├── Dockerfile +│ │ └── pyproject.toml +│ │ +│ ├── strategy-engine/ +│ │ ├── src/ +│ │ │ ├── __init__.py +│ │ │ ├── main.py +│ │ │ ├── engine.py # 전략 로더 + 실행기 +│ │ │ ├── plugin_loader.py # 플러그인 동적 로드 +│ │ │ └── config.py +│ │ ├── strategies/ # 플러그인 전략 디렉토리 +│ │ │ ├── base.py # 전략 추상 클래스 +│ │ │ ├── rsi_strategy.py # 예시: RSI 전략 +│ │ │ └── grid_strategy.py # 예시: 그리드 전략 +│ │ ├── tests/ +│ │ ├── Dockerfile +│ │ └── pyproject.toml +│ │ +│ ├── order-executor/ +│ │ ├── src/ +│ │ │ ├── __init__.py +│ │ │ ├── main.py +│ │ │ ├── executor.py # 주문 실행 로직 +│ │ │ ├── risk_manager.py # 리스크 관리 (손절/익절) +│ │ │ └── config.py +│ │ ├── tests/ +│ │ ├── Dockerfile +│ │ └── pyproject.toml +│ │ +│ ├── portfolio-manager/ +│ │ ├── src/ +│ │ │ ├── __init__.py +│ │ │ ├── main.py +│ │ │ ├── portfolio.py # 잔고/포지션 추적 +│ │ │ ├── pnl.py # 손익 계산 +│ │ │ └── config.py +│ │ ├── tests/ +│ │ ├── Dockerfile +│ │ └── pyproject.toml +│ │ +│ └── backtester/ +│ ├── src/ +│ │ ├── __init__.py +│ │ ├── main.py +│ │ ├── engine.py # 백테스팅 엔진 +│ │ ├── simulator.py # 가상 주문 시뮬레이터 +│ │ ├── reporter.py # 결과 리포트 생성 +│ │ └── config.py +│ ├── tests/ +│ ├── Dockerfile +│ └── pyproject.toml +│ +├── shared/ +│ ├── src/shared/ +│ │ ├── __init__.py +│ │ ├── models.py # 공통 데이터 모델 +│ │ ├── events.py # 이벤트 타입 정의 +│ │ ├── broker.py # Redis Streams 클라이언트 +│ │ ├── db.py # DB 연결 (PostgreSQL) +│ │ └── config.py # 공통 설정 +│ ├── tests/ +│ └── pyproject.toml +│ +├── cli/ +│ ├── src/ +│ │ ├── __init__.py +│ │ ├── main.py # Click 기반 CLI 진입점 +│ │ ├── commands/ +│ │ │ ├── data.py # 데이터 수집 명령 +│ │ │ ├── trade.py # 매매 시작/중지 +│ │ │ ├── backtest.py # 백테스팅 실행 +│ │ │ ├── portfolio.py # 포트폴리오 조회 +│ │ │ └── strategy.py # 전략 관리 +│ │ └── config.py +│ ├── tests/ +│ └── pyproject.toml +│ +├── docker-compose.yml # 전체 서비스 오케스트레이션 +├── .env.example # 환경변수 템플릿 +├── Makefile # 공통 명령어 +└── README.md +``` + +--- + +## Tech Stack + +| 용도 | 라이브러리 | +|------|-----------| +| 거래소 API | **ccxt** | +| 메시지 브로커 | **Redis Streams** | +| DB | **PostgreSQL** + **asyncpg** | +| CLI | **Click** | +| 데이터 분석 | **pandas**, **numpy** | +| 기술 지표 | **pandas-ta** | +| 비동기 처리 | **asyncio** + **aiohttp** | +| 설정 관리 | **pydantic-settings** | +| 컨테이너 | **Docker** + **docker-compose** | +| 테스트 | **pytest** + **pytest-asyncio** | + +--- + +## Data Models + +### Core Models (shared/models.py) + +```python +class Candle: + symbol: str # "BTCUSDT" + timeframe: str # "1m", "5m", "1h" + open_time: datetime + open: Decimal + high: Decimal + low: Decimal + close: Decimal + volume: Decimal + +class Signal: + strategy: str # "rsi_strategy" + symbol: str + side: "BUY" | "SELL" + price: Decimal + quantity: Decimal + reason: str # 시그널 발생 근거 + +class Order: + id: str + signal_id: str # 추적용 + symbol: str + side: "BUY" | "SELL" + type: "MARKET" | "LIMIT" + price: Decimal + quantity: Decimal + status: "PENDING" | "FILLED" | "CANCELLED" | "FAILED" + created_at: datetime + filled_at: datetime | None + +class Position: + symbol: str + quantity: Decimal + avg_entry_price: Decimal + current_price: Decimal + unrealized_pnl: Decimal +``` + +### PostgreSQL Tables + +| 테이블 | 용도 | +|--------|------| +| `candles` | 시세 이력 (파티셔닝: symbol + timeframe) | +| `signals` | 전략 시그널 이력 | +| `orders` | 주문 이력 | +| `trades` | 체결 이력 | +| `positions` | 현재 포지션 | +| `portfolio_snapshots` | 일별 포트폴리오 스냅샷 | + +### Storage Strategy + +- **실시간 시세:** Redis 캐싱 + PostgreSQL 영구 저장 +- **주문/체결:** PostgreSQL 즉시 기록 +- **백테스팅 데이터:** PostgreSQL에서 bulk read (pandas DataFrame) + +--- + +## Strategy Plugin System + +### Base Interface + +```python +from abc import ABC, abstractmethod +from shared.models import Candle, Signal + +class BaseStrategy(ABC): + @abstractmethod + def on_candle(self, candle: Candle) -> Signal | None: + """캔들 데이터 수신 시 시그널 반환""" + pass + + @abstractmethod + def configure(self, params: dict) -> None: + """전략 파라미터 설정""" + pass +``` + +새 전략 추가 = `BaseStrategy` 상속 파일 하나 작성 후 `strategies/` 디렉토리에 배치. + +### 예시 전략 + +- **RSI Strategy:** RSI 과매도 시 매수, 과매수 시 매도 +- **Grid Strategy:** 가격 구간을 나눠 자동 매수/매도 주문 배치 + +--- + +## CLI Interface + +```bash +# 데이터 수집 +trading data collect --symbol BTCUSDT --timeframe 1m +trading data history --symbol BTCUSDT --from 2025-01-01 +trading data list + +# 자동매매 +trading trade start --strategy rsi --symbol BTCUSDT +trading trade stop --strategy rsi +trading trade status + +# 수동매매 +trading order buy --symbol BTCUSDT --quantity 0.01 +trading order sell --symbol BTCUSDT --price 70000 +trading order cancel --id abc123 + +# 백테스팅 +trading backtest run --strategy rsi --symbol BTCUSDT \ + --from 2025-01-01 --to 2025-12-31 +trading backtest report --id latest + +# 포트폴리오 +trading portfolio show +trading portfolio history --days 30 + +# 전략 관리 +trading strategy list +trading strategy info --name rsi + +# 서비스 관리 +trading service up +trading service down +trading service logs --name strategy-engine +``` + +--- + +## Risk Management + +### Risk Check Pipeline (order-executor) + +시그널 수신 시 다음 체크를 순서대로 통과해야 주문 실행: + +1. 최대 포지션 크기 초과 여부 +2. 일일 최대 손실 한도 도달 여부 +3. 동일 심볼 중복 주문 방지 +4. 주문 금액 < 가용 잔고 확인 +5. 가격 급변 감지 (슬리피지 보호) + +### Safety Mechanisms + +| 장치 | 설명 | +|------|------| +| **긴급 정지 (Kill Switch)** | `trading trade stop-all` — 모든 봇 중지, 미체결 주문 전량 취소 | +| **일일 손실 한도** | 설정 비율 초과 시 자동 매매 중단 | +| **최대 포지션 제한** | 총 자산 대비 단일 심볼 비율 제한 | +| **연결 끊김 대응** | Binance 연결 끊기면 신규 주문 중단, 재연결 시도 | +| **드라이런 모드** | 실제 주문 없이 시그널만 생성 — 전략 검증용 | + +--- + +## Configuration (.env) + +``` +BINANCE_API_KEY= +BINANCE_API_SECRET= +REDIS_URL=redis://localhost:6379 +DATABASE_URL=postgresql://user:pass@localhost:5432/trading +LOG_LEVEL=INFO +RISK_MAX_POSITION_SIZE=0.1 +RISK_STOP_LOSS_PCT=5 +RISK_DAILY_LOSS_LIMIT_PCT=10 +DRY_RUN=true +``` + +--- + +## Docker Compose Services + +```yaml +services: + redis: # 메시지 브로커 (항상 실행) + postgres: # 데이터 저장소 (항상 실행) + data-collector: # 시세 수집 (항상 실행) + strategy-engine: # 전략 엔진 (봇 실행 시) + order-executor: # 주문 실행 (봇 실행 시) + portfolio-manager: # 포트폴리오 (항상 실행) +``` -- cgit v1.2.3