summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--.env.example9
-rw-r--r--.mcp.json62
-rw-r--r--Makefile24
-rw-r--r--cli/pyproject.toml19
-rw-r--r--cli/src/trading_cli/__init__.py0
-rw-r--r--cli/src/trading_cli/commands/__init__.py0
-rw-r--r--cli/src/trading_cli/commands/backtest.py26
-rw-r--r--cli/src/trading_cli/commands/data.py31
-rw-r--r--cli/src/trading_cli/commands/portfolio.py20
-rw-r--r--cli/src/trading_cli/commands/service.py29
-rw-r--r--cli/src/trading_cli/commands/strategy.py20
-rw-r--r--cli/src/trading_cli/commands/trade.py35
-rw-r--r--cli/src/trading_cli/main.py22
-rw-r--r--cli/tests/__init__.py0
-rw-r--r--cli/tests/test_cli_data.py17
-rw-r--r--cli/tests/test_cli_trade.py10
-rw-r--r--docker-compose.yml80
-rw-r--r--docs/superpowers/plans/2026-04-01-crypto-trading-platform.md4063
-rw-r--r--docs/superpowers/specs/2026-04-01-crypto-trading-platform-design.md374
-rw-r--r--pyproject.toml14
-rw-r--r--services/backtester/Dockerfile7
-rw-r--r--services/backtester/pyproject.toml16
-rw-r--r--services/backtester/src/backtester/__init__.py0
-rw-r--r--services/backtester/src/backtester/config.py13
-rw-r--r--services/backtester/src/backtester/engine.py95
-rw-r--r--services/backtester/src/backtester/main.py60
-rw-r--r--services/backtester/src/backtester/reporter.py28
-rw-r--r--services/backtester/src/backtester/simulator.py54
-rw-r--r--services/backtester/tests/__init__.py0
-rw-r--r--services/backtester/tests/test_engine.py74
-rw-r--r--services/backtester/tests/test_reporter.py26
-rw-r--r--services/backtester/tests/test_simulator.py73
-rw-r--r--services/data-collector/Dockerfile7
-rw-r--r--services/data-collector/pyproject.toml23
-rw-r--r--services/data-collector/src/data_collector/__init__.py0
-rw-r--r--services/data-collector/src/data_collector/binance_rest.py53
-rw-r--r--services/data-collector/src/data_collector/binance_ws.py106
-rw-r--r--services/data-collector/src/data_collector/config.py6
-rw-r--r--services/data-collector/src/data_collector/main.py58
-rw-r--r--services/data-collector/src/data_collector/storage.py24
-rw-r--r--services/data-collector/tests/__init__.py0
-rw-r--r--services/data-collector/tests/test_binance_rest.py53
-rw-r--r--services/data-collector/tests/test_storage.py62
-rw-r--r--services/order-executor/Dockerfile7
-rw-r--r--services/order-executor/pyproject.toml16
-rw-r--r--services/order-executor/src/order_executor/__init__.py0
-rw-r--r--services/order-executor/src/order_executor/config.py6
-rw-r--r--services/order-executor/src/order_executor/executor.py100
-rw-r--r--services/order-executor/src/order_executor/main.py83
-rw-r--r--services/order-executor/src/order_executor/risk_manager.py55
-rw-r--r--services/order-executor/tests/__init__.py0
-rw-r--r--services/order-executor/tests/test_executor.py122
-rw-r--r--services/order-executor/tests/test_risk_manager.py72
-rw-r--r--services/portfolio-manager/Dockerfile7
-rw-r--r--services/portfolio-manager/pyproject.toml16
-rw-r--r--services/portfolio-manager/src/portfolio_manager/__init__.py0
-rw-r--r--services/portfolio-manager/src/portfolio_manager/config.py6
-rw-r--r--services/portfolio-manager/src/portfolio_manager/main.py56
-rw-r--r--services/portfolio-manager/src/portfolio_manager/pnl.py21
-rw-r--r--services/portfolio-manager/src/portfolio_manager/portfolio.py62
-rw-r--r--services/portfolio-manager/tests/__init__.py0
-rw-r--r--services/portfolio-manager/tests/test_pnl.py32
-rw-r--r--services/portfolio-manager/tests/test_portfolio.py57
-rw-r--r--services/strategy-engine/Dockerfile7
-rw-r--r--services/strategy-engine/pyproject.toml19
-rw-r--r--services/strategy-engine/src/strategy_engine/__init__.py0
-rw-r--r--services/strategy-engine/src/strategy_engine/config.py8
-rw-r--r--services/strategy-engine/src/strategy_engine/engine.py54
-rw-r--r--services/strategy-engine/src/strategy_engine/main.py56
-rw-r--r--services/strategy-engine/src/strategy_engine/plugin_loader.py36
-rw-r--r--services/strategy-engine/strategies/__init__.py0
-rw-r--r--services/strategy-engine/strategies/base.py17
-rw-r--r--services/strategy-engine/strategies/grid_strategy.py77
-rw-r--r--services/strategy-engine/strategies/rsi_strategy.py77
-rw-r--r--services/strategy-engine/tests/__init__.py0
-rw-r--r--services/strategy-engine/tests/conftest.py8
-rw-r--r--services/strategy-engine/tests/test_engine.py72
-rw-r--r--services/strategy-engine/tests/test_grid_strategy.py60
-rw-r--r--services/strategy-engine/tests/test_plugin_loader.py22
-rw-r--r--services/strategy-engine/tests/test_rsi_strategy.py45
-rw-r--r--shared/pyproject.toml25
-rw-r--r--shared/src/shared/__init__.py1
-rw-r--r--shared/src/shared/broker.py43
-rw-r--r--shared/src/shared/config.py16
-rw-r--r--shared/src/shared/db.py184
-rw-r--r--shared/src/shared/events.py75
-rw-r--r--shared/src/shared/models.py72
-rw-r--r--shared/tests/__init__.py0
-rw-r--r--shared/tests/test_broker.py66
-rw-r--r--shared/tests/test_db.py70
-rw-r--r--shared/tests/test_events.py80
-rw-r--r--shared/tests/test_models.py100
92 files changed, 7701 insertions, 0 deletions
diff --git a/.env.example b/.env.example
new file mode 100644
index 0000000..37ddd56
--- /dev/null
+++ b/.env.example
@@ -0,0 +1,9 @@
+BINANCE_API_KEY=
+BINANCE_API_SECRET=
+REDIS_URL=redis://localhost:6379
+DATABASE_URL=postgresql://trading:trading@localhost:5432/trading
+LOG_LEVEL=INFO
+RISK_MAX_POSITION_SIZE=0.1
+RISK_STOP_LOSS_PCT=5
+RISK_DAILY_LOSS_LIMIT_PCT=10
+DRY_RUN=true
diff --git a/.mcp.json b/.mcp.json
new file mode 100644
index 0000000..cd02c2a
--- /dev/null
+++ b/.mcp.json
@@ -0,0 +1,62 @@
+{
+ "mcpServers": {
+ "fetch": {
+ "command": "uvx",
+ "args": ["mcp-server-fetch"]
+ },
+ "chrome-devtools": {
+ "command": "npx",
+ "args": [ "-y", "chrome-devtools-mcp@latest" ]
+ },
+ "filesystem": {
+ "command": "npx",
+ "args": [
+ "-y",
+ "@modelcontextprotocol/server-filesystem",
+ "/home/si/Public/repos/epc-portal"
+ ]
+ },
+ "git": {
+ "command": "uvx",
+ "args": [
+ "mcp-server-git",
+ "--repository",
+ "/home/si/Public/repos/epc-portal"
+ ]
+ },
+ "memory": {
+ "command": "npx",
+ "args": [
+ "-y",
+ "@modelcontextprotocol/server-memory"
+ ]
+ },
+ "postgres": {
+ "command": "npx",
+ "args": [
+ "-y",
+ "@modelcontextprotocol/server-postgres",
+ "postgresql://dts:dujinDTS2@localhost:5433/portal"
+ ]
+ },
+ "sequential-thinking": {
+ "command": "npx",
+ "args": [
+ "-y",
+ "@modelcontextprotocol/server-sequential-thinking"
+ ]
+ },
+ "supabase": {
+ "type": "http",
+ "url": "https://mcp.supabase.com/mcp"
+ },
+ "context7": {
+ "command": "npx",
+ "args": ["-y", "@upstash/context7-mcp", "--api-key", "$(pass show api/context7)"]
+ },
+ "taskmaster-ai": {
+ "command": "npx",
+ "args": ["-y", "task-master-ai@latest"]
+ }
+ }
+}
diff --git a/Makefile b/Makefile
new file mode 100644
index 0000000..e852fac
--- /dev/null
+++ b/Makefile
@@ -0,0 +1,24 @@
+.PHONY: infra up down logs test lint format
+
+infra:
+ docker compose up -d redis postgres
+
+up:
+ docker compose up -d
+
+down:
+ docker compose down
+
+logs:
+ docker compose logs -f $(service)
+
+test:
+ pytest -v
+
+lint:
+ ruff check .
+ ruff format --check .
+
+format:
+ ruff check --fix .
+ ruff format .
diff --git a/cli/pyproject.toml b/cli/pyproject.toml
new file mode 100644
index 0000000..e208021
--- /dev/null
+++ b/cli/pyproject.toml
@@ -0,0 +1,19 @@
+[project]
+name = "trading-cli"
+version = "0.1.0"
+description = "CLI interface for the trading platform"
+requires-python = ">=3.12"
+dependencies = ["click>=8.0", "rich>=13.0", "trading-shared"]
+
+[project.scripts]
+trading = "trading_cli.main:cli"
+
+[project.optional-dependencies]
+dev = ["pytest>=8.0", "pytest-asyncio>=0.23"]
+
+[build-system]
+requires = ["hatchling"]
+build-backend = "hatchling.build"
+
+[tool.hatch.build.targets.wheel]
+packages = ["src/trading_cli"]
diff --git a/cli/src/trading_cli/__init__.py b/cli/src/trading_cli/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/cli/src/trading_cli/__init__.py
diff --git a/cli/src/trading_cli/commands/__init__.py b/cli/src/trading_cli/commands/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/cli/src/trading_cli/commands/__init__.py
diff --git a/cli/src/trading_cli/commands/backtest.py b/cli/src/trading_cli/commands/backtest.py
new file mode 100644
index 0000000..40617b6
--- /dev/null
+++ b/cli/src/trading_cli/commands/backtest.py
@@ -0,0 +1,26 @@
+import click
+
+
+@click.group()
+def backtest():
+ """Backtesting commands."""
+ pass
+
+
+@backtest.command()
+@click.option("--strategy", required=True, help="Strategy name to backtest")
+@click.option("--symbol", required=True, help="Trading symbol (e.g. BTCUSDT)")
+@click.option("--from", "from_date", required=True, help="Start date (ISO format)")
+@click.option("--to", "to_date", default=None, help="End date (ISO format, defaults to now)")
+@click.option("--balance", default=10000.0, show_default=True, help="Initial balance in USDT")
+def run(strategy, symbol, from_date, to_date, balance):
+ """Run a backtest for a strategy."""
+ to_label = to_date or "now"
+ click.echo(f"Running backtest: strategy={strategy}, symbol={symbol}, {from_date} → {to_label}, balance={balance}...")
+
+
+@backtest.command()
+@click.option("--id", "backtest_id", required=True, help="Backtest run ID")
+def report(backtest_id):
+ """Show a backtest report by ID."""
+ click.echo(f"Showing backtest report for ID: {backtest_id}...")
diff --git a/cli/src/trading_cli/commands/data.py b/cli/src/trading_cli/commands/data.py
new file mode 100644
index 0000000..1fa5e30
--- /dev/null
+++ b/cli/src/trading_cli/commands/data.py
@@ -0,0 +1,31 @@
+import click
+
+
+@click.group()
+def data():
+ """Data collection and management commands."""
+ pass
+
+
+@data.command()
+@click.option("--symbol", required=True, help="Trading symbol (e.g. BTCUSDT)")
+@click.option("--timeframe", default="1m", show_default=True, help="Candle timeframe")
+def collect(symbol, timeframe):
+ """Start collecting live market data for a symbol."""
+ click.echo(f"Starting data collection for {symbol} at {timeframe} timeframe...")
+
+
+@data.command()
+@click.option("--symbol", required=True, help="Trading symbol (e.g. BTCUSDT)")
+@click.option("--timeframe", default="1m", show_default=True, help="Candle timeframe")
+@click.option("--from", "since", default=None, help="Start date (ISO format)")
+@click.option("--limit", default=1000, show_default=True, help="Number of candles to fetch")
+def history(symbol, timeframe, since, limit):
+ """Download historical market data for a symbol."""
+ click.echo(f"Downloading {limit} {timeframe} candles for {symbol}" + (f" since {since}" if since else "") + "...")
+
+
+@data.command("list")
+def list_():
+ """List available data streams and symbols."""
+ click.echo("Fetching available data streams and collected symbols...")
diff --git a/cli/src/trading_cli/commands/portfolio.py b/cli/src/trading_cli/commands/portfolio.py
new file mode 100644
index 0000000..9389bac
--- /dev/null
+++ b/cli/src/trading_cli/commands/portfolio.py
@@ -0,0 +1,20 @@
+import click
+
+
+@click.group()
+def portfolio():
+ """Portfolio management commands."""
+ pass
+
+
+@portfolio.command()
+def show():
+ """Show the current portfolio holdings and balances."""
+ click.echo("Fetching current portfolio...")
+
+
+@portfolio.command()
+@click.option("--days", default=30, show_default=True, help="Number of days of history")
+def history(days):
+ """Show PnL history for the portfolio."""
+ click.echo(f"Fetching PnL history for the last {days} days...")
diff --git a/cli/src/trading_cli/commands/service.py b/cli/src/trading_cli/commands/service.py
new file mode 100644
index 0000000..d01eaae
--- /dev/null
+++ b/cli/src/trading_cli/commands/service.py
@@ -0,0 +1,29 @@
+import subprocess
+import click
+
+
+@click.group()
+def service():
+ """Docker service management commands."""
+ pass
+
+
+@service.command()
+def up():
+ """Start all services with docker compose."""
+ click.echo("Starting all services...")
+ subprocess.run(["docker", "compose", "up", "-d"], check=True)
+
+
+@service.command()
+def down():
+ """Stop all services with docker compose."""
+ click.echo("Stopping all services...")
+ subprocess.run(["docker", "compose", "down"], check=True)
+
+
+@service.command()
+@click.option("--name", required=True, help="Service name to follow logs for")
+def logs(name):
+ """Follow logs for a specific service."""
+ subprocess.run(["docker", "compose", "logs", "-f", name], check=True)
diff --git a/cli/src/trading_cli/commands/strategy.py b/cli/src/trading_cli/commands/strategy.py
new file mode 100644
index 0000000..68ffeee
--- /dev/null
+++ b/cli/src/trading_cli/commands/strategy.py
@@ -0,0 +1,20 @@
+import click
+
+
+@click.group()
+def strategy():
+ """Strategy management commands."""
+ pass
+
+
+@strategy.command("list")
+def list_():
+ """List all available trading strategies."""
+ click.echo("Fetching available strategies...")
+
+
+@strategy.command()
+@click.option("--name", required=True, help="Strategy name")
+def info(name):
+ """Show detailed information about a strategy."""
+ click.echo(f"Fetching details for strategy: {name}...")
diff --git a/cli/src/trading_cli/commands/trade.py b/cli/src/trading_cli/commands/trade.py
new file mode 100644
index 0000000..f90e0ed
--- /dev/null
+++ b/cli/src/trading_cli/commands/trade.py
@@ -0,0 +1,35 @@
+import click
+
+
+@click.group()
+def trade():
+ """Trading bot management commands."""
+ pass
+
+
+@trade.command()
+@click.option("--strategy", required=True, help="Strategy name to run")
+@click.option("--symbol", required=True, help="Trading symbol (e.g. BTCUSDT)")
+def start(strategy, symbol):
+ """Start a trading bot for a strategy and symbol."""
+ click.echo(f"Starting trading bot: strategy={strategy}, symbol={symbol}...")
+
+
+@trade.command()
+@click.option("--strategy", required=True, help="Strategy name to stop")
+def stop(strategy):
+ """Stop a running trading bot."""
+ click.echo(f"Stopping trading bot for strategy: {strategy}...")
+
+
+@trade.command()
+def status():
+ """Show status of all running trading bots."""
+ click.echo("Fetching running bots status...")
+
+
+@trade.command("stop-all")
+def stop_all():
+ """Stop all running trading bots."""
+ click.confirm("Are you sure you want to stop all running bots?", abort=True)
+ click.echo("Stopping all running trading bots...")
diff --git a/cli/src/trading_cli/main.py b/cli/src/trading_cli/main.py
new file mode 100644
index 0000000..db3c282
--- /dev/null
+++ b/cli/src/trading_cli/main.py
@@ -0,0 +1,22 @@
+import click
+from trading_cli.commands.data import data
+from trading_cli.commands.trade import trade
+from trading_cli.commands.backtest import backtest
+from trading_cli.commands.portfolio import portfolio
+from trading_cli.commands.strategy import strategy
+from trading_cli.commands.service import service
+
+
+@click.group()
+@click.version_option(version="0.1.0")
+def cli():
+ """Trading Platform CLI — Binance spot crypto trading"""
+ pass
+
+
+cli.add_command(data)
+cli.add_command(trade)
+cli.add_command(backtest)
+cli.add_command(portfolio)
+cli.add_command(strategy)
+cli.add_command(service)
diff --git a/cli/tests/__init__.py b/cli/tests/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/cli/tests/__init__.py
diff --git a/cli/tests/test_cli_data.py b/cli/tests/test_cli_data.py
new file mode 100644
index 0000000..2cc2149
--- /dev/null
+++ b/cli/tests/test_cli_data.py
@@ -0,0 +1,17 @@
+from click.testing import CliRunner
+from trading_cli.main import cli
+
+
+def test_cli_help():
+ runner = CliRunner()
+ result = runner.invoke(cli, ["--help"])
+ assert result.exit_code == 0
+ assert "Usage" in result.output
+
+
+def test_cli_data_group():
+ runner = CliRunner()
+ result = runner.invoke(cli, ["data", "--help"])
+ assert result.exit_code == 0
+ assert "collect" in result.output
+ assert "history" in result.output
diff --git a/cli/tests/test_cli_trade.py b/cli/tests/test_cli_trade.py
new file mode 100644
index 0000000..d3f3079
--- /dev/null
+++ b/cli/tests/test_cli_trade.py
@@ -0,0 +1,10 @@
+from click.testing import CliRunner
+from trading_cli.main import cli
+
+
+def test_cli_trade_group():
+ runner = CliRunner()
+ result = runner.invoke(cli, ["trade", "--help"])
+ assert result.exit_code == 0
+ assert "start" in result.output
+ assert "stop" in result.output
diff --git a/docker-compose.yml b/docker-compose.yml
new file mode 100644
index 0000000..c961354
--- /dev/null
+++ b/docker-compose.yml
@@ -0,0 +1,80 @@
+services:
+ redis:
+ image: redis:7-alpine
+ ports:
+ - "6379:6379"
+ volumes:
+ - redis_data:/data
+ healthcheck:
+ test: ["CMD", "redis-cli", "ping"]
+ interval: 5s
+ timeout: 3s
+ retries: 5
+
+ postgres:
+ image: postgres:16-alpine
+ ports:
+ - "5432:5432"
+ environment:
+ POSTGRES_USER: trading
+ POSTGRES_PASSWORD: trading
+ POSTGRES_DB: trading
+ volumes:
+ - postgres_data:/var/lib/postgresql/data
+ healthcheck:
+ test: ["CMD-LINE", "pg_isready", "-U", "trading"]
+ interval: 5s
+ timeout: 3s
+ retries: 5
+
+ data-collector:
+ build:
+ context: .
+ dockerfile: services/data-collector/Dockerfile
+ env_file: .env
+ depends_on:
+ redis:
+ condition: service_healthy
+ postgres:
+ condition: service_healthy
+ restart: unless-stopped
+
+ strategy-engine:
+ build:
+ context: .
+ dockerfile: services/strategy-engine/Dockerfile
+ env_file: .env
+ depends_on:
+ redis:
+ condition: service_healthy
+ postgres:
+ condition: service_healthy
+ restart: unless-stopped
+
+ order-executor:
+ build:
+ context: .
+ dockerfile: services/order-executor/Dockerfile
+ env_file: .env
+ depends_on:
+ redis:
+ condition: service_healthy
+ postgres:
+ condition: service_healthy
+ restart: unless-stopped
+
+ portfolio-manager:
+ build:
+ context: .
+ dockerfile: services/portfolio-manager/Dockerfile
+ env_file: .env
+ depends_on:
+ redis:
+ condition: service_healthy
+ postgres:
+ condition: service_healthy
+ restart: unless-stopped
+
+volumes:
+ redis_data:
+ postgres_data:
diff --git a/docs/superpowers/plans/2026-04-01-crypto-trading-platform.md b/docs/superpowers/plans/2026-04-01-crypto-trading-platform.md
new file mode 100644
index 0000000..08ff0f5
--- /dev/null
+++ b/docs/superpowers/plans/2026-04-01-crypto-trading-platform.md
@@ -0,0 +1,4063 @@
+# Crypto Trading Platform Implementation Plan
+
+> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.
+
+**Goal:** Binance 현물 암호화폐 자동매매 플랫폼을 마이크로서비스 아키텍처로 구축한다.
+
+**Architecture:** 6개 독립 서비스(data-collector, strategy-engine, order-executor, portfolio-manager, backtester)가 Redis Streams로 통신하고, PostgreSQL에 데이터를 저장한다. shared 라이브러리가 공통 모델/이벤트/DB 연결을 제공하며, Click 기반 CLI로 전체를 제어한다.
+
+**Tech Stack:** Python 3.12, ccxt, Redis Streams, PostgreSQL, asyncpg, pandas, pandas-ta, Click, pydantic-settings, Docker Compose, pytest
+
+---
+
+## File Structure
+
+```
+trading/
+├── services/
+│ ├── data-collector/
+│ │ ├── src/data_collector/__init__.py
+│ │ ├── src/data_collector/main.py
+│ │ ├── src/data_collector/binance_ws.py
+│ │ ├── src/data_collector/binance_rest.py
+│ │ ├── src/data_collector/storage.py
+│ │ ├── src/data_collector/config.py
+│ │ ├── tests/test_binance_rest.py
+│ │ ├── tests/test_storage.py
+│ │ ├── tests/test_main.py
+│ │ ├── Dockerfile
+│ │ └── pyproject.toml
+│ ├── strategy-engine/
+│ │ ├── src/strategy_engine/__init__.py
+│ │ ├── src/strategy_engine/main.py
+│ │ ├── src/strategy_engine/engine.py
+│ │ ├── src/strategy_engine/plugin_loader.py
+│ │ ├── src/strategy_engine/config.py
+│ │ ├── strategies/base.py
+│ │ ├── strategies/rsi_strategy.py
+│ │ ├── strategies/grid_strategy.py
+│ │ ├── tests/test_engine.py
+│ │ ├── tests/test_plugin_loader.py
+│ │ ├── tests/test_rsi_strategy.py
+│ │ ├── tests/test_grid_strategy.py
+│ │ ├── Dockerfile
+│ │ └── pyproject.toml
+│ ├── order-executor/
+│ │ ├── src/order_executor/__init__.py
+│ │ ├── src/order_executor/main.py
+│ │ ├── src/order_executor/executor.py
+│ │ ├── src/order_executor/risk_manager.py
+│ │ ├── src/order_executor/config.py
+│ │ ├── tests/test_executor.py
+│ │ ├── tests/test_risk_manager.py
+│ │ ├── Dockerfile
+│ │ └── pyproject.toml
+│ ├── portfolio-manager/
+│ │ ├── src/portfolio_manager/__init__.py
+│ │ ├── src/portfolio_manager/main.py
+│ │ ├── src/portfolio_manager/portfolio.py
+│ │ ├── src/portfolio_manager/pnl.py
+│ │ ├── src/portfolio_manager/config.py
+│ │ ├── tests/test_portfolio.py
+│ │ ├── tests/test_pnl.py
+│ │ ├── Dockerfile
+│ │ └── pyproject.toml
+│ └── backtester/
+│ ├── src/backtester/__init__.py
+│ ├── src/backtester/main.py
+│ ├── src/backtester/engine.py
+│ ├── src/backtester/simulator.py
+│ ├── src/backtester/reporter.py
+│ ├── src/backtester/config.py
+│ ├── tests/test_engine.py
+│ ├── tests/test_simulator.py
+│ ├── tests/test_reporter.py
+│ ├── Dockerfile
+│ └── pyproject.toml
+├── shared/
+│ ├── src/shared/__init__.py
+│ ├── src/shared/models.py
+│ ├── src/shared/events.py
+│ ├── src/shared/broker.py
+│ ├── src/shared/db.py
+│ ├── src/shared/config.py
+│ ├── tests/test_models.py
+│ ├── tests/test_events.py
+│ ├── tests/test_broker.py
+│ ├── tests/test_db.py
+│ └── pyproject.toml
+├── cli/
+│ ├── src/trading_cli/__init__.py
+│ ├── src/trading_cli/main.py
+│ ├── src/trading_cli/commands/data.py
+│ ├── src/trading_cli/commands/trade.py
+│ ├── src/trading_cli/commands/backtest.py
+│ ├── src/trading_cli/commands/portfolio.py
+│ ├── src/trading_cli/commands/strategy.py
+│ ├── src/trading_cli/commands/service.py
+│ ├── tests/test_cli_data.py
+│ ├── tests/test_cli_trade.py
+│ └── pyproject.toml
+├── docker-compose.yml
+├── .env.example
+├── Makefile
+└── pyproject.toml (workspace root)
+```
+
+---
+
+## Task 1: Project Scaffolding
+
+**Files:**
+- Create: `pyproject.toml` (workspace root)
+- Create: `.env.example`
+- Create: `docker-compose.yml`
+- Create: `Makefile`
+- Create: `.gitignore`
+- Create: `shared/pyproject.toml`
+
+- [ ] **Step 1: Initialize git repo**
+
+```bash
+cd /home/si/Private/repos/trading
+git init
+```
+
+- [ ] **Step 2: Create .gitignore**
+
+Create `.gitignore`:
+
+```gitignore
+__pycache__/
+*.py[cod]
+*$py.class
+*.egg-info/
+dist/
+build/
+.eggs/
+*.egg
+.venv/
+venv/
+env/
+.env
+.mypy_cache/
+.pytest_cache/
+.ruff_cache/
+*.log
+.DS_Store
+```
+
+- [ ] **Step 3: Create workspace root pyproject.toml**
+
+Create `pyproject.toml`:
+
+```toml
+[project]
+name = "trading-platform"
+version = "0.1.0"
+description = "Binance spot crypto trading platform"
+requires-python = ">=3.12"
+
+[tool.pytest.ini_options]
+asyncio_mode = "auto"
+testpaths = ["shared/tests", "services/*/tests", "cli/tests"]
+
+[tool.ruff]
+target-version = "py312"
+line-length = 100
+```
+
+- [ ] **Step 4: Create .env.example**
+
+Create `.env.example`:
+
+```env
+BINANCE_API_KEY=
+BINANCE_API_SECRET=
+REDIS_URL=redis://localhost:6379
+DATABASE_URL=postgresql://trading:trading@localhost:5432/trading
+LOG_LEVEL=INFO
+RISK_MAX_POSITION_SIZE=0.1
+RISK_STOP_LOSS_PCT=5
+RISK_DAILY_LOSS_LIMIT_PCT=10
+DRY_RUN=true
+```
+
+- [ ] **Step 5: Create docker-compose.yml**
+
+Create `docker-compose.yml`:
+
+```yaml
+services:
+ redis:
+ image: redis:7-alpine
+ ports:
+ - "6379:6379"
+ volumes:
+ - redis_data:/data
+ healthcheck:
+ test: ["CMD", "redis-cli", "ping"]
+ interval: 5s
+ timeout: 3s
+ retries: 5
+
+ postgres:
+ image: postgres:16-alpine
+ ports:
+ - "5432:5432"
+ environment:
+ POSTGRES_USER: trading
+ POSTGRES_PASSWORD: trading
+ POSTGRES_DB: trading
+ volumes:
+ - postgres_data:/var/lib/postgresql/data
+ healthcheck:
+ test: ["CMD-LINE", "pg_isready", "-U", "trading"]
+ interval: 5s
+ timeout: 3s
+ retries: 5
+
+ data-collector:
+ build:
+ context: .
+ dockerfile: services/data-collector/Dockerfile
+ env_file: .env
+ depends_on:
+ redis:
+ condition: service_healthy
+ postgres:
+ condition: service_healthy
+ restart: unless-stopped
+
+ strategy-engine:
+ build:
+ context: .
+ dockerfile: services/strategy-engine/Dockerfile
+ env_file: .env
+ depends_on:
+ redis:
+ condition: service_healthy
+ postgres:
+ condition: service_healthy
+ restart: unless-stopped
+
+ order-executor:
+ build:
+ context: .
+ dockerfile: services/order-executor/Dockerfile
+ env_file: .env
+ depends_on:
+ redis:
+ condition: service_healthy
+ postgres:
+ condition: service_healthy
+ restart: unless-stopped
+
+ portfolio-manager:
+ build:
+ context: .
+ dockerfile: services/portfolio-manager/Dockerfile
+ env_file: .env
+ depends_on:
+ redis:
+ condition: service_healthy
+ postgres:
+ condition: service_healthy
+ restart: unless-stopped
+
+volumes:
+ redis_data:
+ postgres_data:
+```
+
+- [ ] **Step 6: Create Makefile**
+
+Create `Makefile`:
+
+```makefile
+.PHONY: infra up down logs test lint
+
+infra:
+ docker compose up -d redis postgres
+
+up:
+ docker compose up -d
+
+down:
+ docker compose down
+
+logs:
+ docker compose logs -f $(service)
+
+test:
+ pytest -v
+
+lint:
+ ruff check .
+ ruff format --check .
+
+format:
+ ruff check --fix .
+ ruff format .
+```
+
+- [ ] **Step 7: Create shared/pyproject.toml**
+
+Create `shared/pyproject.toml`:
+
+```toml
+[project]
+name = "trading-shared"
+version = "0.1.0"
+description = "Shared models, events, and utilities for trading platform"
+requires-python = ">=3.12"
+dependencies = [
+ "pydantic>=2.0",
+ "pydantic-settings>=2.0",
+ "redis>=5.0",
+ "asyncpg>=0.29",
+]
+
+[project.optional-dependencies]
+dev = [
+ "pytest>=8.0",
+ "pytest-asyncio>=0.23",
+ "ruff>=0.4",
+]
+
+[build-system]
+requires = ["hatchling"]
+build-backend = "hatchling.build"
+
+[tool.hatch.build.targets.wheel]
+packages = ["src/shared"]
+```
+
+- [ ] **Step 8: Commit scaffolding**
+
+```bash
+git add .
+git commit -m "chore: project scaffolding with docker-compose, makefile, shared package"
+```
+
+---
+
+## Task 2: Shared — Config & Models
+
+**Files:**
+- Create: `shared/src/shared/__init__.py`
+- Create: `shared/src/shared/config.py`
+- Create: `shared/src/shared/models.py`
+- Create: `shared/tests/test_models.py`
+
+- [ ] **Step 1: Write failing test for config**
+
+Create `shared/tests/test_models.py`:
+
+```python
+from shared.config import Settings
+
+
+def test_settings_defaults():
+ settings = Settings(
+ binance_api_key="test_key",
+ binance_api_secret="test_secret",
+ )
+ assert settings.redis_url == "redis://localhost:6379"
+ assert settings.database_url == "postgresql://trading:trading@localhost:5432/trading"
+ assert settings.log_level == "INFO"
+ assert settings.dry_run is True
+```
+
+- [ ] **Step 2: Run test to verify it fails**
+
+```bash
+cd /home/si/Private/repos/trading
+pip install -e shared[dev]
+pytest shared/tests/test_models.py::test_settings_defaults -v
+```
+
+Expected: FAIL — `ModuleNotFoundError: No module named 'shared'`
+
+- [ ] **Step 3: Implement config**
+
+Create `shared/src/shared/__init__.py`:
+
+```python
+```
+
+Create `shared/src/shared/config.py`:
+
+```python
+from pydantic_settings import BaseSettings
+
+
+class Settings(BaseSettings):
+ binance_api_key: str
+ binance_api_secret: str
+ redis_url: str = "redis://localhost:6379"
+ database_url: str = "postgresql://trading:trading@localhost:5432/trading"
+ log_level: str = "INFO"
+ risk_max_position_size: float = 0.1
+ risk_stop_loss_pct: float = 5.0
+ risk_daily_loss_limit_pct: float = 10.0
+ dry_run: bool = True
+
+ model_config = {"env_file": ".env", "env_file_encoding": "utf-8"}
+```
+
+- [ ] **Step 4: Run test to verify it passes**
+
+```bash
+pytest shared/tests/test_models.py::test_settings_defaults -v
+```
+
+Expected: PASS
+
+- [ ] **Step 5: Write failing tests for models**
+
+Append to `shared/tests/test_models.py`:
+
+```python
+from datetime import datetime, timezone
+from decimal import Decimal
+
+from shared.models import Candle, Signal, Order, Position, OrderSide, OrderType, OrderStatus
+
+
+def test_candle_creation():
+ candle = Candle(
+ symbol="BTCUSDT",
+ timeframe="1m",
+ open_time=datetime(2026, 1, 1, tzinfo=timezone.utc),
+ open=Decimal("50000"),
+ high=Decimal("50100"),
+ low=Decimal("49900"),
+ close=Decimal("50050"),
+ volume=Decimal("1.5"),
+ )
+ assert candle.symbol == "BTCUSDT"
+ assert candle.close == Decimal("50050")
+
+
+def test_signal_creation():
+ signal = Signal(
+ strategy="rsi_strategy",
+ symbol="BTCUSDT",
+ side=OrderSide.BUY,
+ price=Decimal("50000"),
+ quantity=Decimal("0.01"),
+ reason="RSI below 30",
+ )
+ assert signal.side == OrderSide.BUY
+ assert signal.reason == "RSI below 30"
+
+
+def test_order_creation():
+ order = Order(
+ symbol="BTCUSDT",
+ signal_id="sig_123",
+ side=OrderSide.BUY,
+ type=OrderType.MARKET,
+ price=Decimal("50000"),
+ quantity=Decimal("0.01"),
+ )
+ assert order.status == OrderStatus.PENDING
+ assert order.filled_at is None
+ assert order.id is not None
+
+
+def test_position_unrealized_pnl():
+ pos = Position(
+ symbol="BTCUSDT",
+ quantity=Decimal("0.1"),
+ avg_entry_price=Decimal("50000"),
+ current_price=Decimal("51000"),
+ )
+ assert pos.unrealized_pnl == Decimal("100") # 0.1 * (51000 - 50000)
+```
+
+- [ ] **Step 6: Run tests to verify they fail**
+
+```bash
+pytest shared/tests/test_models.py -v
+```
+
+Expected: FAIL — `ModuleNotFoundError: No module named 'shared.models'`
+
+- [ ] **Step 7: Implement models**
+
+Create `shared/src/shared/models.py`:
+
+```python
+from datetime import datetime, timezone
+from decimal import Decimal
+from enum import StrEnum
+from uuid import uuid4
+
+from pydantic import BaseModel, Field
+
+
+class OrderSide(StrEnum):
+ BUY = "BUY"
+ SELL = "SELL"
+
+
+class OrderType(StrEnum):
+ MARKET = "MARKET"
+ LIMIT = "LIMIT"
+
+
+class OrderStatus(StrEnum):
+ PENDING = "PENDING"
+ FILLED = "FILLED"
+ CANCELLED = "CANCELLED"
+ FAILED = "FAILED"
+
+
+class Candle(BaseModel):
+ symbol: str
+ timeframe: str
+ open_time: datetime
+ open: Decimal
+ high: Decimal
+ low: Decimal
+ close: Decimal
+ volume: Decimal
+
+
+class Signal(BaseModel):
+ id: str = Field(default_factory=lambda: str(uuid4()))
+ strategy: str
+ symbol: str
+ side: OrderSide
+ price: Decimal
+ quantity: Decimal
+ reason: str
+ created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
+
+
+class Order(BaseModel):
+ id: str = Field(default_factory=lambda: str(uuid4()))
+ signal_id: str
+ symbol: str
+ side: OrderSide
+ type: OrderType
+ price: Decimal
+ quantity: Decimal
+ status: OrderStatus = OrderStatus.PENDING
+ created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
+ filled_at: datetime | None = None
+
+
+class Position(BaseModel):
+ symbol: str
+ quantity: Decimal
+ avg_entry_price: Decimal
+ current_price: Decimal
+
+ @property
+ def unrealized_pnl(self) -> Decimal:
+ return self.quantity * (self.current_price - self.avg_entry_price)
+```
+
+- [ ] **Step 8: Run tests to verify they pass**
+
+```bash
+pytest shared/tests/test_models.py -v
+```
+
+Expected: All PASS
+
+- [ ] **Step 9: Commit**
+
+```bash
+git add shared/
+git commit -m "feat(shared): add config settings and core data models"
+```
+
+---
+
+## Task 3: Shared — Events & Redis Broker
+
+**Files:**
+- Create: `shared/src/shared/events.py`
+- Create: `shared/src/shared/broker.py`
+- Create: `shared/tests/test_events.py`
+- Create: `shared/tests/test_broker.py`
+
+- [ ] **Step 1: Write failing tests for events**
+
+Create `shared/tests/test_events.py`:
+
+```python
+import json
+from decimal import Decimal
+from datetime import datetime, timezone
+
+from shared.events import EventType, Event, CandleEvent, SignalEvent, OrderEvent
+from shared.models import Candle, Signal, Order, OrderSide, OrderType
+
+
+def test_candle_event_serialize():
+ candle = Candle(
+ symbol="BTCUSDT",
+ timeframe="1m",
+ open_time=datetime(2026, 1, 1, tzinfo=timezone.utc),
+ open=Decimal("50000"),
+ high=Decimal("50100"),
+ low=Decimal("49900"),
+ close=Decimal("50050"),
+ volume=Decimal("1.5"),
+ )
+ event = CandleEvent(data=candle)
+ payload = event.to_dict()
+ assert payload["type"] == EventType.CANDLE
+ assert payload["data"]["symbol"] == "BTCUSDT"
+
+
+def test_candle_event_deserialize():
+ candle = Candle(
+ symbol="BTCUSDT",
+ timeframe="1m",
+ open_time=datetime(2026, 1, 1, tzinfo=timezone.utc),
+ open=Decimal("50000"),
+ high=Decimal("50100"),
+ low=Decimal("49900"),
+ close=Decimal("50050"),
+ volume=Decimal("1.5"),
+ )
+ event = CandleEvent(data=candle)
+ payload = event.to_dict()
+ restored = Event.from_dict(payload)
+ assert isinstance(restored, CandleEvent)
+ assert restored.data.symbol == "BTCUSDT"
+
+
+def test_signal_event_serialize():
+ signal = Signal(
+ strategy="rsi",
+ symbol="BTCUSDT",
+ side=OrderSide.BUY,
+ price=Decimal("50000"),
+ quantity=Decimal("0.01"),
+ reason="RSI < 30",
+ )
+ event = SignalEvent(data=signal)
+ payload = event.to_dict()
+ assert payload["type"] == EventType.SIGNAL
+```
+
+- [ ] **Step 2: Run tests to verify they fail**
+
+```bash
+pytest shared/tests/test_events.py -v
+```
+
+Expected: FAIL
+
+- [ ] **Step 3: Implement events**
+
+Create `shared/src/shared/events.py`:
+
+```python
+from __future__ import annotations
+
+import json
+from enum import StrEnum
+from typing import Any
+
+from pydantic import BaseModel
+
+from shared.models import Candle, Signal, Order
+
+
+class EventType(StrEnum):
+ CANDLE = "candle"
+ SIGNAL = "signal"
+ ORDER = "order"
+
+
+class CandleEvent(BaseModel):
+ type: EventType = EventType.CANDLE
+ data: Candle
+
+ def to_dict(self) -> dict[str, Any]:
+ return json.loads(self.model_dump_json())
+
+ @classmethod
+ def from_raw(cls, raw: dict[str, Any]) -> CandleEvent:
+ return cls.model_validate(raw)
+
+
+class SignalEvent(BaseModel):
+ type: EventType = EventType.SIGNAL
+ data: Signal
+
+ def to_dict(self) -> dict[str, Any]:
+ return json.loads(self.model_dump_json())
+
+ @classmethod
+ def from_raw(cls, raw: dict[str, Any]) -> SignalEvent:
+ return cls.model_validate(raw)
+
+
+class OrderEvent(BaseModel):
+ type: EventType = EventType.ORDER
+ data: Order
+
+ def to_dict(self) -> dict[str, Any]:
+ return json.loads(self.model_dump_json())
+
+ @classmethod
+ def from_raw(cls, raw: dict[str, Any]) -> OrderEvent:
+ return cls.model_validate(raw)
+
+
+_EVENT_MAP = {
+ EventType.CANDLE: CandleEvent,
+ EventType.SIGNAL: SignalEvent,
+ EventType.ORDER: OrderEvent,
+}
+
+
+class Event:
+ @staticmethod
+ def from_dict(data: dict[str, Any]) -> CandleEvent | SignalEvent | OrderEvent:
+ event_type = EventType(data["type"])
+ cls = _EVENT_MAP[event_type]
+ return cls.from_raw(data)
+```
+
+- [ ] **Step 4: Run tests to verify they pass**
+
+```bash
+pytest shared/tests/test_events.py -v
+```
+
+Expected: All PASS
+
+- [ ] **Step 5: Write failing tests for broker**
+
+Create `shared/tests/test_broker.py`:
+
+```python
+import asyncio
+import pytest
+from unittest.mock import AsyncMock, MagicMock, patch
+
+from shared.broker import RedisBroker
+
+
+@pytest.fixture
+def mock_redis():
+ redis = AsyncMock()
+ redis.xadd = AsyncMock(return_value=b"1234-0")
+ redis.xread = AsyncMock(return_value=[])
+ redis.close = AsyncMock()
+ return redis
+
+
+@pytest.mark.asyncio
+async def test_broker_publish(mock_redis):
+ broker = RedisBroker.__new__(RedisBroker)
+ broker._redis = mock_redis
+
+ await broker.publish("candles.BTCUSDT", {"type": "candle", "data": "test"})
+
+ mock_redis.xadd.assert_called_once()
+ call_args = mock_redis.xadd.call_args
+ assert call_args[0][0] == "candles.BTCUSDT"
+
+
+@pytest.mark.asyncio
+async def test_broker_subscribe_returns_messages(mock_redis):
+ mock_redis.xread = AsyncMock(return_value=[
+ ("candles.BTCUSDT", [
+ (b"1234-0", {b"payload": b'{"type":"candle","data":"test"}'}),
+ ])
+ ])
+ broker = RedisBroker.__new__(RedisBroker)
+ broker._redis = mock_redis
+
+ messages = await broker.read("candles.BTCUSDT", last_id="0-0", count=1)
+ assert len(messages) == 1
+ assert messages[0]["type"] == "candle"
+```
+
+- [ ] **Step 6: Run tests to verify they fail**
+
+```bash
+pytest shared/tests/test_broker.py -v
+```
+
+Expected: FAIL
+
+- [ ] **Step 7: Implement broker**
+
+Create `shared/src/shared/broker.py`:
+
+```python
+from __future__ import annotations
+
+import json
+
+import redis.asyncio as redis
+
+
+class RedisBroker:
+ def __init__(self, redis_url: str):
+ self._redis = redis.from_url(redis_url, decode_responses=False)
+
+ async def publish(self, stream: str, data: dict) -> str:
+ payload = json.dumps(data)
+ msg_id = await self._redis.xadd(stream, {"payload": payload.encode()})
+ return msg_id
+
+ async def read(
+ self, stream: str, last_id: str = "$", count: int = 10, block: int = 0
+ ) -> list[dict]:
+ results = await self._redis.xread({stream: last_id}, count=count, block=block)
+ messages = []
+ for _stream_name, entries in results:
+ for _msg_id, fields in entries:
+ payload = fields[b"payload"]
+ messages.append(json.loads(payload))
+ return messages
+
+ async def close(self):
+ await self._redis.close()
+```
+
+- [ ] **Step 8: Run tests to verify they pass**
+
+```bash
+pytest shared/tests/test_broker.py -v
+```
+
+Expected: All PASS
+
+- [ ] **Step 9: Commit**
+
+```bash
+git add shared/
+git commit -m "feat(shared): add event system and Redis Streams broker"
+```
+
+---
+
+## Task 4: Shared — Database Layer
+
+**Files:**
+- Create: `shared/src/shared/db.py`
+- Create: `shared/tests/test_db.py`
+
+- [ ] **Step 1: Write failing tests for DB**
+
+Create `shared/tests/test_db.py`:
+
+```python
+import pytest
+from unittest.mock import AsyncMock, patch, MagicMock
+
+from shared.db import Database
+
+
+@pytest.mark.asyncio
+async def test_db_init_sql_creates_tables():
+ db = Database.__new__(Database)
+ db._pool = AsyncMock()
+ mock_conn = AsyncMock()
+ db._pool.acquire.return_value.__aenter__ = AsyncMock(return_value=mock_conn)
+ db._pool.acquire.return_value.__aexit__ = AsyncMock(return_value=False)
+
+ await db.init_tables()
+
+ mock_conn.execute.assert_called()
+ sql = mock_conn.execute.call_args[0][0]
+ assert "candles" in sql
+ assert "signals" in sql
+ assert "orders" in sql
+ assert "trades" in sql
+ assert "positions" in sql
+ assert "portfolio_snapshots" in sql
+
+
+@pytest.mark.asyncio
+async def test_db_insert_candle():
+ db = Database.__new__(Database)
+ db._pool = AsyncMock()
+ mock_conn = AsyncMock()
+ db._pool.acquire.return_value.__aenter__ = AsyncMock(return_value=mock_conn)
+ db._pool.acquire.return_value.__aexit__ = AsyncMock(return_value=False)
+
+ from datetime import datetime, timezone
+ from decimal import Decimal
+ from shared.models import Candle
+
+ candle = Candle(
+ symbol="BTCUSDT",
+ timeframe="1m",
+ open_time=datetime(2026, 1, 1, tzinfo=timezone.utc),
+ open=Decimal("50000"),
+ high=Decimal("50100"),
+ low=Decimal("49900"),
+ close=Decimal("50050"),
+ volume=Decimal("1.5"),
+ )
+
+ await db.insert_candle(candle)
+ mock_conn.execute.assert_called_once()
+ sql = mock_conn.execute.call_args[0][0]
+ assert "INSERT INTO candles" in sql
+```
+
+- [ ] **Step 2: Run tests to verify they fail**
+
+```bash
+pytest shared/tests/test_db.py -v
+```
+
+Expected: FAIL
+
+- [ ] **Step 3: Implement database layer**
+
+Create `shared/src/shared/db.py`:
+
+```python
+from __future__ import annotations
+
+import asyncpg
+
+from shared.models import Candle, Order, Signal
+
+_INIT_SQL = """
+CREATE TABLE IF NOT EXISTS candles (
+ symbol TEXT NOT NULL,
+ timeframe TEXT NOT NULL,
+ open_time TIMESTAMPTZ NOT NULL,
+ open NUMERIC NOT NULL,
+ high NUMERIC NOT NULL,
+ low NUMERIC NOT NULL,
+ close NUMERIC NOT NULL,
+ volume NUMERIC NOT NULL,
+ PRIMARY KEY (symbol, timeframe, open_time)
+);
+
+CREATE TABLE IF NOT EXISTS signals (
+ id TEXT PRIMARY KEY,
+ strategy TEXT NOT NULL,
+ symbol TEXT NOT NULL,
+ side TEXT NOT NULL,
+ price NUMERIC NOT NULL,
+ quantity NUMERIC NOT NULL,
+ reason TEXT NOT NULL,
+ created_at TIMESTAMPTZ NOT NULL
+);
+
+CREATE TABLE IF NOT EXISTS orders (
+ id TEXT PRIMARY KEY,
+ signal_id TEXT REFERENCES signals(id),
+ symbol TEXT NOT NULL,
+ side TEXT NOT NULL,
+ type TEXT NOT NULL,
+ price NUMERIC NOT NULL,
+ quantity NUMERIC NOT NULL,
+ status TEXT NOT NULL DEFAULT 'PENDING',
+ created_at TIMESTAMPTZ NOT NULL,
+ filled_at TIMESTAMPTZ
+);
+
+CREATE TABLE IF NOT EXISTS trades (
+ id TEXT PRIMARY KEY,
+ order_id TEXT REFERENCES orders(id),
+ symbol TEXT NOT NULL,
+ side TEXT NOT NULL,
+ price NUMERIC NOT NULL,
+ quantity NUMERIC NOT NULL,
+ fee NUMERIC NOT NULL DEFAULT 0,
+ traded_at TIMESTAMPTZ NOT NULL
+);
+
+CREATE TABLE IF NOT EXISTS positions (
+ symbol TEXT PRIMARY KEY,
+ quantity NUMERIC NOT NULL,
+ avg_entry_price NUMERIC NOT NULL,
+ current_price NUMERIC NOT NULL,
+ updated_at TIMESTAMPTZ NOT NULL
+);
+
+CREATE TABLE IF NOT EXISTS portfolio_snapshots (
+ id SERIAL PRIMARY KEY,
+ total_value NUMERIC NOT NULL,
+ realized_pnl NUMERIC NOT NULL,
+ unrealized_pnl NUMERIC NOT NULL,
+ snapshot_at TIMESTAMPTZ NOT NULL
+);
+"""
+
+
+class Database:
+ def __init__(self, dsn: str):
+ self._dsn = dsn
+ self._pool: asyncpg.Pool | None = None
+
+ async def connect(self):
+ self._pool = await asyncpg.create_pool(self._dsn)
+
+ async def close(self):
+ if self._pool:
+ await self._pool.close()
+
+ async def init_tables(self):
+ async with self._pool.acquire() as conn:
+ await conn.execute(_INIT_SQL)
+
+ async def insert_candle(self, candle: Candle):
+ sql = """
+ INSERT INTO candles (symbol, timeframe, open_time, open, high, low, close, volume)
+ VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
+ ON CONFLICT (symbol, timeframe, open_time) DO NOTHING
+ """
+ async with self._pool.acquire() as conn:
+ await conn.execute(
+ sql,
+ candle.symbol,
+ candle.timeframe,
+ candle.open_time,
+ candle.open,
+ candle.high,
+ candle.low,
+ candle.close,
+ candle.volume,
+ )
+
+ async def insert_signal(self, signal: Signal):
+ sql = """
+ INSERT INTO signals (id, strategy, symbol, side, price, quantity, reason, created_at)
+ VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
+ """
+ async with self._pool.acquire() as conn:
+ await conn.execute(
+ sql,
+ signal.id,
+ signal.strategy,
+ signal.symbol,
+ signal.side.value,
+ signal.price,
+ signal.quantity,
+ signal.reason,
+ signal.created_at,
+ )
+
+ async def insert_order(self, order: Order):
+ sql = """
+ INSERT INTO orders (id, signal_id, symbol, side, type, price, quantity, status, created_at)
+ VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
+ """
+ async with self._pool.acquire() as conn:
+ await conn.execute(
+ sql,
+ order.id,
+ order.signal_id,
+ order.symbol,
+ order.side.value,
+ order.type.value,
+ order.price,
+ order.quantity,
+ order.status.value,
+ order.created_at,
+ )
+
+ async def update_order_status(self, order_id: str, status: str, filled_at=None):
+ sql = "UPDATE orders SET status = $1, filled_at = $2 WHERE id = $3"
+ async with self._pool.acquire() as conn:
+ await conn.execute(sql, status, filled_at, order_id)
+
+ async def get_candles(self, symbol: str, timeframe: str, limit: int = 500) -> list[dict]:
+ sql = """
+ SELECT * FROM candles
+ WHERE symbol = $1 AND timeframe = $2
+ ORDER BY open_time DESC
+ LIMIT $3
+ """
+ async with self._pool.acquire() as conn:
+ rows = await conn.fetch(sql, symbol, timeframe, limit)
+ return [dict(r) for r in rows]
+```
+
+- [ ] **Step 4: Run tests to verify they pass**
+
+```bash
+pytest shared/tests/test_db.py -v
+```
+
+Expected: All PASS
+
+- [ ] **Step 5: Commit**
+
+```bash
+git add shared/
+git commit -m "feat(shared): add database layer with table init and CRUD operations"
+```
+
+---
+
+## Task 5: Data Collector Service
+
+**Files:**
+- Create: `services/data-collector/pyproject.toml`
+- Create: `services/data-collector/Dockerfile`
+- Create: `services/data-collector/src/data_collector/__init__.py`
+- Create: `services/data-collector/src/data_collector/config.py`
+- Create: `services/data-collector/src/data_collector/binance_rest.py`
+- Create: `services/data-collector/src/data_collector/binance_ws.py`
+- Create: `services/data-collector/src/data_collector/storage.py`
+- Create: `services/data-collector/src/data_collector/main.py`
+- Create: `services/data-collector/tests/test_binance_rest.py`
+- Create: `services/data-collector/tests/test_storage.py`
+
+- [ ] **Step 1: Create pyproject.toml**
+
+Create `services/data-collector/pyproject.toml`:
+
+```toml
+[project]
+name = "data-collector"
+version = "0.1.0"
+description = "Binance market data collector service"
+requires-python = ">=3.12"
+dependencies = [
+ "ccxt>=4.0",
+ "websockets>=12.0",
+ "trading-shared",
+]
+
+[project.optional-dependencies]
+dev = [
+ "pytest>=8.0",
+ "pytest-asyncio>=0.23",
+]
+
+[build-system]
+requires = ["hatchling"]
+build-backend = "hatchling.build"
+
+[tool.hatch.build.targets.wheel]
+packages = ["src/data_collector"]
+```
+
+- [ ] **Step 2: Write failing tests for binance_rest**
+
+Create `services/data-collector/tests/test_binance_rest.py`:
+
+```python
+import pytest
+from unittest.mock import AsyncMock, patch, MagicMock
+from datetime import datetime, timezone
+from decimal import Decimal
+
+from data_collector.binance_rest import fetch_historical_candles
+
+
+@pytest.mark.asyncio
+async def test_fetch_historical_candles_parses_response():
+ mock_exchange = MagicMock()
+ mock_exchange.fetch_ohlcv = AsyncMock(return_value=[
+ [1704067200000, 50000.0, 50100.0, 49900.0, 50050.0, 1.5],
+ [1704067260000, 50050.0, 50200.0, 50000.0, 50150.0, 2.0],
+ ])
+
+ candles = await fetch_historical_candles(
+ exchange=mock_exchange,
+ symbol="BTC/USDT",
+ timeframe="1m",
+ since=datetime(2026, 1, 1, tzinfo=timezone.utc),
+ limit=2,
+ )
+
+ assert len(candles) == 2
+ assert candles[0].symbol == "BTCUSDT"
+ assert candles[0].close == Decimal("50050.0")
+ assert candles[1].volume == Decimal("2.0")
+
+
+@pytest.mark.asyncio
+async def test_fetch_historical_candles_empty_response():
+ mock_exchange = MagicMock()
+ mock_exchange.fetch_ohlcv = AsyncMock(return_value=[])
+
+ candles = await fetch_historical_candles(
+ exchange=mock_exchange,
+ symbol="BTC/USDT",
+ timeframe="1m",
+ since=datetime(2026, 1, 1, tzinfo=timezone.utc),
+ limit=100,
+ )
+
+ assert candles == []
+```
+
+- [ ] **Step 3: Run tests to verify they fail**
+
+```bash
+cd /home/si/Private/repos/trading
+pip install -e services/data-collector[dev]
+pytest services/data-collector/tests/test_binance_rest.py -v
+```
+
+Expected: FAIL
+
+- [ ] **Step 4: Implement binance_rest**
+
+Create `services/data-collector/src/data_collector/__init__.py`:
+
+```python
+```
+
+Create `services/data-collector/src/data_collector/binance_rest.py`:
+
+```python
+from __future__ import annotations
+
+from datetime import datetime, timezone
+from decimal import Decimal
+
+from shared.models import Candle
+
+
+async def fetch_historical_candles(
+ exchange,
+ symbol: str,
+ timeframe: str,
+ since: datetime,
+ limit: int = 500,
+) -> list[Candle]:
+ since_ms = int(since.timestamp() * 1000)
+ ohlcv = await exchange.fetch_ohlcv(symbol, timeframe, since=since_ms, limit=limit)
+
+ normalized_symbol = symbol.replace("/", "")
+ candles = []
+ for row in ohlcv:
+ ts, o, h, l, c, v = row
+ candles.append(
+ Candle(
+ symbol=normalized_symbol,
+ timeframe=timeframe,
+ open_time=datetime.fromtimestamp(ts / 1000, tz=timezone.utc),
+ open=Decimal(str(o)),
+ high=Decimal(str(h)),
+ low=Decimal(str(l)),
+ close=Decimal(str(c)),
+ volume=Decimal(str(v)),
+ )
+ )
+ return candles
+```
+
+- [ ] **Step 5: Run tests to verify they pass**
+
+```bash
+pytest services/data-collector/tests/test_binance_rest.py -v
+```
+
+Expected: All PASS
+
+- [ ] **Step 6: Write failing tests for storage**
+
+Create `services/data-collector/tests/test_storage.py`:
+
+```python
+import pytest
+from unittest.mock import AsyncMock
+from datetime import datetime, timezone
+from decimal import Decimal
+
+from shared.models import Candle
+from data_collector.storage import CandleStorage
+
+
+@pytest.fixture
+def mock_db():
+ db = AsyncMock()
+ db.insert_candle = AsyncMock()
+ return db
+
+
+@pytest.fixture
+def mock_broker():
+ broker = AsyncMock()
+ broker.publish = AsyncMock()
+ return broker
+
+
+@pytest.fixture
+def sample_candle():
+ return Candle(
+ symbol="BTCUSDT",
+ timeframe="1m",
+ open_time=datetime(2026, 1, 1, tzinfo=timezone.utc),
+ open=Decimal("50000"),
+ high=Decimal("50100"),
+ low=Decimal("49900"),
+ close=Decimal("50050"),
+ volume=Decimal("1.5"),
+ )
+
+
+@pytest.mark.asyncio
+async def test_storage_saves_to_db_and_publishes(mock_db, mock_broker, sample_candle):
+ storage = CandleStorage(db=mock_db, broker=mock_broker)
+ await storage.store(sample_candle)
+
+ mock_db.insert_candle.assert_called_once_with(sample_candle)
+ mock_broker.publish.assert_called_once()
+ call_args = mock_broker.publish.call_args
+ assert call_args[0][0] == "candles.BTCUSDT"
+
+
+@pytest.mark.asyncio
+async def test_storage_batch_store(mock_db, mock_broker, sample_candle):
+ storage = CandleStorage(db=mock_db, broker=mock_broker)
+ candles = [sample_candle, sample_candle]
+ await storage.store_batch(candles)
+
+ assert mock_db.insert_candle.call_count == 2
+ assert mock_broker.publish.call_count == 2
+```
+
+- [ ] **Step 7: Run tests to verify they fail**
+
+```bash
+pytest services/data-collector/tests/test_storage.py -v
+```
+
+Expected: FAIL
+
+- [ ] **Step 8: Implement storage**
+
+Create `services/data-collector/src/data_collector/storage.py`:
+
+```python
+from __future__ import annotations
+
+from shared.broker import RedisBroker
+from shared.db import Database
+from shared.events import CandleEvent
+from shared.models import Candle
+
+
+class CandleStorage:
+ def __init__(self, db: Database, broker: RedisBroker):
+ self._db = db
+ self._broker = broker
+
+ async def store(self, candle: Candle):
+ await self._db.insert_candle(candle)
+ event = CandleEvent(data=candle)
+ await self._broker.publish(f"candles.{candle.symbol}", event.to_dict())
+
+ async def store_batch(self, candles: list[Candle]):
+ for candle in candles:
+ await self.store(candle)
+```
+
+- [ ] **Step 9: Run tests to verify they pass**
+
+```bash
+pytest services/data-collector/tests/test_storage.py -v
+```
+
+Expected: All PASS
+
+- [ ] **Step 10: Implement config, binance_ws, and main**
+
+Create `services/data-collector/src/data_collector/config.py`:
+
+```python
+from shared.config import Settings
+
+
+class CollectorConfig(Settings):
+ symbols: list[str] = ["BTC/USDT"]
+ timeframes: list[str] = ["1m"]
+```
+
+Create `services/data-collector/src/data_collector/binance_ws.py`:
+
+```python
+from __future__ import annotations
+
+import asyncio
+import json
+import logging
+from datetime import datetime, timezone
+from decimal import Decimal
+
+import websockets
+
+from shared.models import Candle
+
+logger = logging.getLogger(__name__)
+
+BINANCE_WS_URL = "wss://stream.binance.com:9443/ws"
+
+
+class BinanceWebSocket:
+ def __init__(self, symbols: list[str], timeframe: str, on_candle):
+ self._symbols = symbols
+ self._timeframe = timeframe
+ self._on_candle = on_candle
+ self._running = False
+
+ async def start(self):
+ streams = [
+ f"{s.lower().replace('/', '')}@kline_{self._timeframe}"
+ for s in self._symbols
+ ]
+ url = f"{BINANCE_WS_URL}/{'/'.join(streams)}"
+ self._running = True
+ logger.info(f"Connecting to Binance WS: {streams}")
+
+ while self._running:
+ try:
+ async with websockets.connect(url) as ws:
+ async for raw in ws:
+ if not self._running:
+ break
+ msg = json.loads(raw)
+ if "k" in msg:
+ candle = self._parse_kline(msg["k"])
+ if candle:
+ await self._on_candle(candle)
+ except websockets.ConnectionClosed:
+ logger.warning("WebSocket disconnected, reconnecting in 5s...")
+ await asyncio.sleep(5)
+ except Exception as e:
+ logger.error(f"WebSocket error: {e}, reconnecting in 5s...")
+ await asyncio.sleep(5)
+
+ def stop(self):
+ self._running = False
+
+ def _parse_kline(self, k: dict) -> Candle | None:
+ if not k.get("x"): # only closed candles
+ return None
+ return Candle(
+ symbol=k["s"],
+ timeframe=k["i"],
+ open_time=datetime.fromtimestamp(k["t"] / 1000, tz=timezone.utc),
+ open=Decimal(k["o"]),
+ high=Decimal(k["h"]),
+ low=Decimal(k["l"]),
+ close=Decimal(k["c"]),
+ volume=Decimal(k["v"]),
+ )
+```
+
+Create `services/data-collector/src/data_collector/main.py`:
+
+```python
+from __future__ import annotations
+
+import asyncio
+import logging
+
+import ccxt.async_support as ccxt
+
+from shared.broker import RedisBroker
+from shared.db import Database
+from data_collector.binance_ws import BinanceWebSocket
+from data_collector.config import CollectorConfig
+from data_collector.storage import CandleStorage
+
+logger = logging.getLogger(__name__)
+
+
+async def run():
+ config = CollectorConfig()
+ logging.basicConfig(level=config.log_level)
+
+ db = Database(config.database_url)
+ await db.connect()
+ await db.init_tables()
+
+ broker = RedisBroker(config.redis_url)
+ storage = CandleStorage(db=db, broker=broker)
+
+ ws = BinanceWebSocket(
+ symbols=config.symbols,
+ timeframe=config.timeframes[0],
+ on_candle=storage.store,
+ )
+
+ logger.info(f"Starting data collector: symbols={config.symbols}")
+ try:
+ await ws.start()
+ finally:
+ ws.stop()
+ await broker.close()
+ await db.close()
+
+
+def main():
+ asyncio.run(run())
+
+
+if __name__ == "__main__":
+ main()
+```
+
+- [ ] **Step 11: Create Dockerfile**
+
+Create `services/data-collector/Dockerfile`:
+
+```dockerfile
+FROM python:3.12-slim
+
+WORKDIR /app
+
+COPY shared/ shared/
+RUN pip install --no-cache-dir ./shared
+
+COPY services/data-collector/ services/data-collector/
+RUN pip install --no-cache-dir ./services/data-collector
+
+CMD ["python", "-m", "data_collector.main"]
+```
+
+- [ ] **Step 12: Commit**
+
+```bash
+git add services/data-collector/
+git commit -m "feat(data-collector): add Binance REST/WS data collection with storage pipeline"
+```
+
+---
+
+## Task 6: Strategy Engine Service
+
+**Files:**
+- Create: `services/strategy-engine/pyproject.toml`
+- Create: `services/strategy-engine/Dockerfile`
+- Create: `services/strategy-engine/src/strategy_engine/__init__.py`
+- Create: `services/strategy-engine/src/strategy_engine/config.py`
+- Create: `services/strategy-engine/strategies/base.py`
+- Create: `services/strategy-engine/strategies/rsi_strategy.py`
+- Create: `services/strategy-engine/strategies/grid_strategy.py`
+- Create: `services/strategy-engine/src/strategy_engine/plugin_loader.py`
+- Create: `services/strategy-engine/src/strategy_engine/engine.py`
+- Create: `services/strategy-engine/src/strategy_engine/main.py`
+- Create: `services/strategy-engine/tests/test_rsi_strategy.py`
+- Create: `services/strategy-engine/tests/test_grid_strategy.py`
+- Create: `services/strategy-engine/tests/test_plugin_loader.py`
+- Create: `services/strategy-engine/tests/test_engine.py`
+
+- [ ] **Step 1: Create pyproject.toml**
+
+Create `services/strategy-engine/pyproject.toml`:
+
+```toml
+[project]
+name = "strategy-engine"
+version = "0.1.0"
+description = "Plugin-based strategy execution engine"
+requires-python = ">=3.12"
+dependencies = [
+ "pandas>=2.0",
+ "pandas-ta>=0.3",
+ "trading-shared",
+]
+
+[project.optional-dependencies]
+dev = [
+ "pytest>=8.0",
+ "pytest-asyncio>=0.23",
+]
+
+[build-system]
+requires = ["hatchling"]
+build-backend = "hatchling.build"
+
+[tool.hatch.build.targets.wheel]
+packages = ["src/strategy_engine"]
+```
+
+- [ ] **Step 2: Implement base strategy**
+
+Create `services/strategy-engine/src/strategy_engine/__init__.py`:
+
+```python
+```
+
+Create `services/strategy-engine/strategies/base.py`:
+
+```python
+from __future__ import annotations
+
+from abc import ABC, abstractmethod
+
+from shared.models import Candle, Signal
+
+
+class BaseStrategy(ABC):
+ name: str = "base"
+
+ @abstractmethod
+ def on_candle(self, candle: Candle) -> Signal | None:
+ pass
+
+ @abstractmethod
+ def configure(self, params: dict) -> None:
+ pass
+
+ def reset(self) -> None:
+ pass
+```
+
+- [ ] **Step 3: Write failing tests for RSI strategy**
+
+Create `services/strategy-engine/tests/test_rsi_strategy.py`:
+
+```python
+from datetime import datetime, timezone
+from decimal import Decimal
+
+from shared.models import Candle, OrderSide
+
+
+def make_candle(close: float, idx: int = 0) -> Candle:
+ return Candle(
+ symbol="BTCUSDT",
+ timeframe="1m",
+ open_time=datetime(2026, 1, 1, minute=idx, tzinfo=timezone.utc),
+ open=Decimal(str(close)),
+ high=Decimal(str(close + 10)),
+ low=Decimal(str(close - 10)),
+ close=Decimal(str(close)),
+ volume=Decimal("1.0"),
+ )
+
+
+def test_rsi_strategy_no_signal_insufficient_data():
+ from strategy_engine.strategies.rsi_strategy import RsiStrategy
+
+ strategy = RsiStrategy()
+ strategy.configure({"period": 14, "oversold": 30, "overbought": 70, "quantity": 0.01})
+
+ signal = strategy.on_candle(make_candle(50000))
+ assert signal is None
+
+
+def test_rsi_strategy_buy_signal_on_oversold():
+ from strategy_engine.strategies.rsi_strategy import RsiStrategy
+
+ strategy = RsiStrategy()
+ strategy.configure({"period": 14, "oversold": 30, "overbought": 70, "quantity": 0.01})
+
+ # Feed declining prices to push RSI below 30
+ prices = [50000 - i * 100 for i in range(20)]
+ signal = None
+ for i, p in enumerate(prices):
+ signal = strategy.on_candle(make_candle(p, idx=i))
+
+ # After sustained drop, RSI should be oversold → BUY signal
+ if signal is not None:
+ assert signal.side == OrderSide.BUY
+ assert signal.strategy == "rsi"
+```
+
+- [ ] **Step 4: Run tests to verify they fail**
+
+```bash
+pip install -e services/strategy-engine[dev]
+pytest services/strategy-engine/tests/test_rsi_strategy.py -v
+```
+
+Expected: FAIL
+
+- [ ] **Step 5: Implement RSI strategy**
+
+Create `services/strategy-engine/strategies/rsi_strategy.py`:
+
+```python
+from __future__ import annotations
+
+from collections import deque
+from decimal import Decimal
+
+import pandas as pd
+import pandas_ta as ta
+
+from shared.models import Candle, Signal, OrderSide
+from strategies.base import BaseStrategy
+
+
+class RsiStrategy(BaseStrategy):
+ name = "rsi"
+
+ def __init__(self):
+ self._closes: deque[float] = deque(maxlen=200)
+ self._period: int = 14
+ self._oversold: float = 30
+ self._overbought: float = 70
+ self._quantity: Decimal = Decimal("0.01")
+
+ def configure(self, params: dict) -> None:
+ self._period = params.get("period", 14)
+ self._oversold = params.get("oversold", 30)
+ self._overbought = params.get("overbought", 70)
+ self._quantity = Decimal(str(params.get("quantity", 0.01)))
+
+ def on_candle(self, candle: Candle) -> Signal | None:
+ self._closes.append(float(candle.close))
+
+ if len(self._closes) < self._period + 1:
+ return None
+
+ series = pd.Series(list(self._closes))
+ rsi = ta.rsi(series, length=self._period)
+ current_rsi = rsi.iloc[-1]
+
+ if current_rsi < self._oversold:
+ return Signal(
+ strategy=self.name,
+ symbol=candle.symbol,
+ side=OrderSide.BUY,
+ price=candle.close,
+ quantity=self._quantity,
+ reason=f"RSI={current_rsi:.1f} < {self._oversold}",
+ )
+ elif current_rsi > self._overbought:
+ return Signal(
+ strategy=self.name,
+ symbol=candle.symbol,
+ side=OrderSide.SELL,
+ price=candle.close,
+ quantity=self._quantity,
+ reason=f"RSI={current_rsi:.1f} > {self._overbought}",
+ )
+ return None
+
+ def reset(self) -> None:
+ self._closes.clear()
+```
+
+- [ ] **Step 6: Run tests to verify they pass**
+
+```bash
+pytest services/strategy-engine/tests/test_rsi_strategy.py -v
+```
+
+Expected: All PASS
+
+- [ ] **Step 7: Write failing tests for grid strategy**
+
+Create `services/strategy-engine/tests/test_grid_strategy.py`:
+
+```python
+from datetime import datetime, timezone
+from decimal import Decimal
+
+from shared.models import Candle, OrderSide
+
+
+def make_candle(close: float, idx: int = 0) -> Candle:
+ return Candle(
+ symbol="BTCUSDT",
+ timeframe="1m",
+ open_time=datetime(2026, 1, 1, minute=idx, tzinfo=timezone.utc),
+ open=Decimal(str(close)),
+ high=Decimal(str(close + 10)),
+ low=Decimal(str(close - 10)),
+ close=Decimal(str(close)),
+ volume=Decimal("1.0"),
+ )
+
+
+def test_grid_strategy_buy_at_lower_grid():
+ from strategy_engine.strategies.grid_strategy import GridStrategy
+
+ strategy = GridStrategy()
+ strategy.configure({
+ "lower_price": 48000,
+ "upper_price": 52000,
+ "grid_count": 5,
+ "quantity": 0.01,
+ })
+
+ # Price at grid level should trigger BUY
+ signal = strategy.on_candle(make_candle(48000))
+ # First candle sets reference, no signal
+ signal = strategy.on_candle(make_candle(49000, idx=1))
+ # Moving down through a grid level
+ signal = strategy.on_candle(make_candle(48000, idx=2))
+ if signal is not None:
+ assert signal.side == OrderSide.BUY
+
+
+def test_grid_strategy_sell_at_upper_grid():
+ from strategy_engine.strategies.grid_strategy import GridStrategy
+
+ strategy = GridStrategy()
+ strategy.configure({
+ "lower_price": 48000,
+ "upper_price": 52000,
+ "grid_count": 5,
+ "quantity": 0.01,
+ })
+
+ signal = strategy.on_candle(make_candle(50000))
+ signal = strategy.on_candle(make_candle(51000, idx=1))
+ signal = strategy.on_candle(make_candle(52000, idx=2))
+ if signal is not None:
+ assert signal.side == OrderSide.SELL
+
+
+def test_grid_strategy_no_signal_in_same_zone():
+ from strategy_engine.strategies.grid_strategy import GridStrategy
+
+ strategy = GridStrategy()
+ strategy.configure({
+ "lower_price": 48000,
+ "upper_price": 52000,
+ "grid_count": 5,
+ "quantity": 0.01,
+ })
+
+ strategy.on_candle(make_candle(50000))
+ signal = strategy.on_candle(make_candle(50050, idx=1))
+ assert signal is None # same grid zone, no signal
+```
+
+- [ ] **Step 8: Run tests to verify they fail**
+
+```bash
+pytest services/strategy-engine/tests/test_grid_strategy.py -v
+```
+
+Expected: FAIL
+
+- [ ] **Step 9: Implement grid strategy**
+
+Create `services/strategy-engine/strategies/grid_strategy.py`:
+
+```python
+from __future__ import annotations
+
+from decimal import Decimal
+
+from shared.models import Candle, Signal, OrderSide
+from strategies.base import BaseStrategy
+
+
+class GridStrategy(BaseStrategy):
+ name = "grid"
+
+ def __init__(self):
+ self._lower: float = 0
+ self._upper: float = 0
+ self._grid_count: int = 5
+ self._quantity: Decimal = Decimal("0.01")
+ self._grid_levels: list[float] = []
+ self._last_zone: int | None = None
+
+ def configure(self, params: dict) -> None:
+ self._lower = float(params["lower_price"])
+ self._upper = float(params["upper_price"])
+ self._grid_count = params.get("grid_count", 5)
+ self._quantity = Decimal(str(params.get("quantity", 0.01)))
+ step = (self._upper - self._lower) / self._grid_count
+ self._grid_levels = [self._lower + step * i for i in range(self._grid_count + 1)]
+
+ def on_candle(self, candle: Candle) -> Signal | None:
+ price = float(candle.close)
+ current_zone = self._get_zone(price)
+
+ if self._last_zone is None:
+ self._last_zone = current_zone
+ return None
+
+ signal = None
+ if current_zone < self._last_zone:
+ signal = Signal(
+ strategy=self.name,
+ symbol=candle.symbol,
+ side=OrderSide.BUY,
+ price=candle.close,
+ quantity=self._quantity,
+ reason=f"Price crossed grid down: zone {self._last_zone}->{current_zone}",
+ )
+ elif current_zone > self._last_zone:
+ signal = Signal(
+ strategy=self.name,
+ symbol=candle.symbol,
+ side=OrderSide.SELL,
+ price=candle.close,
+ quantity=self._quantity,
+ reason=f"Price crossed grid up: zone {self._last_zone}->{current_zone}",
+ )
+
+ self._last_zone = current_zone
+ return signal
+
+ def _get_zone(self, price: float) -> int:
+ for i, level in enumerate(self._grid_levels):
+ if price < level:
+ return i
+ return len(self._grid_levels)
+
+ def reset(self) -> None:
+ self._last_zone = None
+```
+
+- [ ] **Step 10: Run tests to verify they pass**
+
+```bash
+pytest services/strategy-engine/tests/test_grid_strategy.py -v
+```
+
+Expected: All PASS
+
+- [ ] **Step 11: Write failing tests for plugin_loader**
+
+Create `services/strategy-engine/tests/test_plugin_loader.py`:
+
+```python
+import pytest
+from pathlib import Path
+
+from strategy_engine.plugin_loader import load_strategies
+
+
+def test_load_strategies_finds_rsi_and_grid():
+ strategies_dir = Path(__file__).parent.parent / "strategies"
+ loaded = load_strategies(strategies_dir)
+
+ names = {s.name for s in loaded}
+ assert "rsi" in names
+ assert "grid" in names
+
+
+def test_load_strategies_skips_base():
+ strategies_dir = Path(__file__).parent.parent / "strategies"
+ loaded = load_strategies(strategies_dir)
+
+ names = {s.name for s in loaded}
+ assert "base" not in names
+```
+
+- [ ] **Step 12: Run tests to verify they fail**
+
+```bash
+pytest services/strategy-engine/tests/test_plugin_loader.py -v
+```
+
+Expected: FAIL
+
+- [ ] **Step 13: Implement plugin_loader**
+
+Create `services/strategy-engine/src/strategy_engine/plugin_loader.py`:
+
+```python
+from __future__ import annotations
+
+import importlib.util
+import logging
+from pathlib import Path
+
+from strategies.base import BaseStrategy
+
+logger = logging.getLogger(__name__)
+
+
+def load_strategies(strategies_dir: Path) -> list[BaseStrategy]:
+ loaded = []
+ for path in strategies_dir.glob("*.py"):
+ if path.stem.startswith("_") or path.stem == "base":
+ continue
+
+ spec = importlib.util.spec_from_file_location(path.stem, path)
+ module = importlib.util.module_from_spec(spec)
+ spec.loader.exec_module(module)
+
+ for attr_name in dir(module):
+ attr = getattr(module, attr_name)
+ if (
+ isinstance(attr, type)
+ and issubclass(attr, BaseStrategy)
+ and attr is not BaseStrategy
+ ):
+ instance = attr()
+ loaded.append(instance)
+ logger.info(f"Loaded strategy: {instance.name}")
+
+ return loaded
+```
+
+- [ ] **Step 14: Run tests to verify they pass**
+
+```bash
+pytest services/strategy-engine/tests/test_plugin_loader.py -v
+```
+
+Expected: All PASS
+
+- [ ] **Step 15: Write failing tests for engine**
+
+Create `services/strategy-engine/tests/test_engine.py`:
+
+```python
+import pytest
+from unittest.mock import AsyncMock, MagicMock
+from datetime import datetime, timezone
+from decimal import Decimal
+
+from shared.models import Candle, OrderSide
+from shared.events import CandleEvent
+from strategy_engine.engine import StrategyEngine
+
+
+def make_candle_event() -> dict:
+ candle = Candle(
+ symbol="BTCUSDT",
+ timeframe="1m",
+ open_time=datetime(2026, 1, 1, tzinfo=timezone.utc),
+ open=Decimal("50000"),
+ high=Decimal("50100"),
+ low=Decimal("49900"),
+ close=Decimal("50050"),
+ volume=Decimal("1.0"),
+ )
+ return CandleEvent(data=candle).to_dict()
+
+
+@pytest.mark.asyncio
+async def test_engine_dispatches_candle_to_strategies():
+ mock_strategy = MagicMock()
+ mock_strategy.name = "test"
+ mock_strategy.on_candle.return_value = None
+
+ mock_broker = AsyncMock()
+ mock_broker.read = AsyncMock(return_value=[make_candle_event()])
+
+ engine = StrategyEngine(broker=mock_broker, strategies=[mock_strategy])
+ await engine.process_once(stream="candles.BTCUSDT", last_id="0-0")
+
+ mock_strategy.on_candle.assert_called_once()
+
+
+@pytest.mark.asyncio
+async def test_engine_publishes_signal_when_strategy_returns_one():
+ from shared.models import Signal
+
+ mock_signal = Signal(
+ strategy="test",
+ symbol="BTCUSDT",
+ side=OrderSide.BUY,
+ price=Decimal("50000"),
+ quantity=Decimal("0.01"),
+ reason="test reason",
+ )
+ mock_strategy = MagicMock()
+ mock_strategy.name = "test"
+ mock_strategy.on_candle.return_value = mock_signal
+
+ mock_broker = AsyncMock()
+ mock_broker.read = AsyncMock(return_value=[make_candle_event()])
+ mock_broker.publish = AsyncMock()
+
+ engine = StrategyEngine(broker=mock_broker, strategies=[mock_strategy])
+ await engine.process_once(stream="candles.BTCUSDT", last_id="0-0")
+
+ mock_broker.publish.assert_called_once()
+ call_args = mock_broker.publish.call_args
+ assert call_args[0][0] == "signals"
+```
+
+- [ ] **Step 16: Run tests to verify they fail**
+
+```bash
+pytest services/strategy-engine/tests/test_engine.py -v
+```
+
+Expected: FAIL
+
+- [ ] **Step 17: Implement engine**
+
+Create `services/strategy-engine/src/strategy_engine/engine.py`:
+
+```python
+from __future__ import annotations
+
+import logging
+
+from shared.broker import RedisBroker
+from shared.events import Event, SignalEvent
+from shared.models import Signal
+from strategies.base import BaseStrategy
+
+logger = logging.getLogger(__name__)
+
+
+class StrategyEngine:
+ def __init__(self, broker: RedisBroker, strategies: list[BaseStrategy]):
+ self._broker = broker
+ self._strategies = strategies
+
+ async def process_once(self, stream: str, last_id: str) -> str:
+ messages = await self._broker.read(stream, last_id=last_id, count=10, block=1000)
+
+ for msg in messages:
+ event = Event.from_dict(msg)
+ candle = event.data
+
+ for strategy in self._strategies:
+ signal = strategy.on_candle(candle)
+ if signal is not None:
+ logger.info(f"Signal from {strategy.name}: {signal.side} {signal.symbol}")
+ await self._publish_signal(signal)
+
+ return last_id
+
+ async def _publish_signal(self, signal: Signal):
+ event = SignalEvent(data=signal)
+ await self._broker.publish("signals", event.to_dict())
+```
+
+- [ ] **Step 18: Run tests to verify they pass**
+
+```bash
+pytest services/strategy-engine/tests/test_engine.py -v
+```
+
+Expected: All PASS
+
+- [ ] **Step 19: Implement config and main**
+
+Create `services/strategy-engine/src/strategy_engine/config.py`:
+
+```python
+from shared.config import Settings
+
+
+class StrategyConfig(Settings):
+ symbols: list[str] = ["BTC/USDT"]
+ timeframes: list[str] = ["1m"]
+ strategy_params: dict = {}
+```
+
+Create `services/strategy-engine/src/strategy_engine/main.py`:
+
+```python
+from __future__ import annotations
+
+import asyncio
+import logging
+from pathlib import Path
+
+from shared.broker import RedisBroker
+from strategy_engine.config import StrategyConfig
+from strategy_engine.engine import StrategyEngine
+from strategy_engine.plugin_loader import load_strategies
+
+logger = logging.getLogger(__name__)
+
+
+async def run():
+ config = StrategyConfig()
+ logging.basicConfig(level=config.log_level)
+
+ broker = RedisBroker(config.redis_url)
+ strategies_dir = Path(__file__).parent.parent.parent / "strategies"
+ strategies = load_strategies(strategies_dir)
+
+ for s in strategies:
+ params = config.strategy_params.get(s.name, {})
+ s.configure(params)
+
+ engine = StrategyEngine(broker=broker, strategies=strategies)
+ symbols = [s.replace("/", "") for s in config.symbols]
+
+ logger.info(f"Starting strategy engine: strategies={[s.name for s in strategies]}")
+ last_ids = {sym: "0-0" for sym in symbols}
+ try:
+ while True:
+ for sym in symbols:
+ stream = f"candles.{sym}"
+ last_ids[sym] = await engine.process_once(stream, last_ids[sym])
+ finally:
+ await broker.close()
+
+
+def main():
+ asyncio.run(run())
+
+
+if __name__ == "__main__":
+ main()
+```
+
+- [ ] **Step 20: Create Dockerfile**
+
+Create `services/strategy-engine/Dockerfile`:
+
+```dockerfile
+FROM python:3.12-slim
+
+WORKDIR /app
+
+COPY shared/ shared/
+RUN pip install --no-cache-dir ./shared
+
+COPY services/strategy-engine/ services/strategy-engine/
+RUN pip install --no-cache-dir ./services/strategy-engine
+
+CMD ["python", "-m", "strategy_engine.main"]
+```
+
+- [ ] **Step 21: Commit**
+
+```bash
+git add services/strategy-engine/
+git commit -m "feat(strategy-engine): add plugin-based strategy engine with RSI and grid strategies"
+```
+
+---
+
+## Task 7: Order Executor Service
+
+**Files:**
+- Create: `services/order-executor/pyproject.toml`
+- Create: `services/order-executor/Dockerfile`
+- Create: `services/order-executor/src/order_executor/__init__.py`
+- Create: `services/order-executor/src/order_executor/config.py`
+- Create: `services/order-executor/src/order_executor/risk_manager.py`
+- Create: `services/order-executor/src/order_executor/executor.py`
+- Create: `services/order-executor/src/order_executor/main.py`
+- Create: `services/order-executor/tests/test_risk_manager.py`
+- Create: `services/order-executor/tests/test_executor.py`
+
+- [ ] **Step 1: Create pyproject.toml**
+
+Create `services/order-executor/pyproject.toml`:
+
+```toml
+[project]
+name = "order-executor"
+version = "0.1.0"
+description = "Order execution service with risk management"
+requires-python = ">=3.12"
+dependencies = [
+ "ccxt>=4.0",
+ "trading-shared",
+]
+
+[project.optional-dependencies]
+dev = [
+ "pytest>=8.0",
+ "pytest-asyncio>=0.23",
+]
+
+[build-system]
+requires = ["hatchling"]
+build-backend = "hatchling.build"
+
+[tool.hatch.build.targets.wheel]
+packages = ["src/order_executor"]
+```
+
+- [ ] **Step 2: Write failing tests for risk_manager**
+
+Create `services/order-executor/tests/test_risk_manager.py`:
+
+```python
+import pytest
+from decimal import Decimal
+
+from shared.models import Signal, OrderSide
+from order_executor.risk_manager import RiskManager, RiskCheckResult
+
+
+@pytest.fixture
+def risk_manager():
+ return RiskManager(
+ max_position_size=Decimal("0.1"),
+ stop_loss_pct=Decimal("5"),
+ daily_loss_limit_pct=Decimal("10"),
+ )
+
+
+def make_signal(side=OrderSide.BUY, quantity="0.01", price="50000") -> Signal:
+ return Signal(
+ strategy="test",
+ symbol="BTCUSDT",
+ side=side,
+ price=Decimal(price),
+ quantity=Decimal(quantity),
+ reason="test",
+ )
+
+
+def test_risk_check_passes_normal_order(risk_manager):
+ signal = make_signal()
+ balance = Decimal("10000")
+ positions = {}
+ daily_pnl = Decimal("0")
+
+ result = risk_manager.check(signal, balance, positions, daily_pnl)
+ assert result.allowed is True
+
+
+def test_risk_check_rejects_exceeding_position_size(risk_manager):
+ signal = make_signal(quantity="5") # 5 BTC * 50000 = 250000 > 10% of balance
+ balance = Decimal("10000")
+ positions = {}
+ daily_pnl = Decimal("0")
+
+ result = risk_manager.check(signal, balance, positions, daily_pnl)
+ assert result.allowed is False
+ assert "position size" in result.reason.lower()
+
+
+def test_risk_check_rejects_daily_loss_exceeded(risk_manager):
+ signal = make_signal()
+ balance = Decimal("10000")
+ positions = {}
+ daily_pnl = Decimal("-1100") # -11% > -10% limit
+
+ result = risk_manager.check(signal, balance, positions, daily_pnl)
+ assert result.allowed is False
+ assert "daily loss" in result.reason.lower()
+
+
+def test_risk_check_rejects_insufficient_balance(risk_manager):
+ signal = make_signal(quantity="0.01", price="50000") # cost = 500
+ balance = Decimal("100") # not enough
+ positions = {}
+ daily_pnl = Decimal("0")
+
+ result = risk_manager.check(signal, balance, positions, daily_pnl)
+ assert result.allowed is False
+ assert "balance" in result.reason.lower()
+```
+
+- [ ] **Step 3: Run tests to verify they fail**
+
+```bash
+pip install -e services/order-executor[dev]
+pytest services/order-executor/tests/test_risk_manager.py -v
+```
+
+Expected: FAIL
+
+- [ ] **Step 4: Implement risk_manager**
+
+Create `services/order-executor/src/order_executor/__init__.py`:
+
+```python
+```
+
+Create `services/order-executor/src/order_executor/risk_manager.py`:
+
+```python
+from __future__ import annotations
+
+from dataclasses import dataclass
+from decimal import Decimal
+
+from shared.models import Signal, OrderSide
+
+
+@dataclass
+class RiskCheckResult:
+ allowed: bool
+ reason: str = ""
+
+
+class RiskManager:
+ def __init__(
+ self,
+ max_position_size: Decimal,
+ stop_loss_pct: Decimal,
+ daily_loss_limit_pct: Decimal,
+ ):
+ self._max_position_size = max_position_size
+ self._stop_loss_pct = stop_loss_pct
+ self._daily_loss_limit_pct = daily_loss_limit_pct
+
+ def check(
+ self,
+ signal: Signal,
+ balance: Decimal,
+ positions: dict[str, Decimal],
+ daily_pnl: Decimal,
+ ) -> RiskCheckResult:
+ # Check daily loss limit
+ daily_loss_pct = (daily_pnl / balance) * 100 if balance > 0 else Decimal("0")
+ if daily_loss_pct < -self._daily_loss_limit_pct:
+ return RiskCheckResult(
+ allowed=False,
+ reason=f"Daily loss limit exceeded: {daily_loss_pct:.1f}%",
+ )
+
+ if signal.side == OrderSide.BUY:
+ order_cost = signal.price * signal.quantity
+
+ # Check sufficient balance
+ if order_cost > balance:
+ return RiskCheckResult(
+ allowed=False,
+ reason=f"Insufficient balance: need {order_cost}, have {balance}",
+ )
+
+ # Check max position size
+ current_position_value = positions.get(signal.symbol, Decimal("0")) * signal.price
+ new_position_value = current_position_value + order_cost
+ position_ratio = new_position_value / balance if balance > 0 else Decimal("999")
+ if position_ratio > self._max_position_size:
+ return RiskCheckResult(
+ allowed=False,
+ reason=f"Position size exceeded: {position_ratio:.2f} > {self._max_position_size}",
+ )
+
+ return RiskCheckResult(allowed=True)
+```
+
+- [ ] **Step 5: Run tests to verify they pass**
+
+```bash
+pytest services/order-executor/tests/test_risk_manager.py -v
+```
+
+Expected: All PASS
+
+- [ ] **Step 6: Write failing tests for executor**
+
+Create `services/order-executor/tests/test_executor.py`:
+
+```python
+import pytest
+from unittest.mock import AsyncMock, MagicMock
+from decimal import Decimal
+
+from shared.models import Signal, OrderSide, OrderStatus
+from order_executor.executor import OrderExecutor
+from order_executor.risk_manager import RiskCheckResult
+
+
+def make_signal() -> Signal:
+ return Signal(
+ strategy="test",
+ symbol="BTCUSDT",
+ side=OrderSide.BUY,
+ price=Decimal("50000"),
+ quantity=Decimal("0.01"),
+ reason="test",
+ )
+
+
+@pytest.mark.asyncio
+async def test_executor_places_order_when_risk_passes():
+ mock_exchange = MagicMock()
+ mock_exchange.create_order = AsyncMock(return_value={
+ "id": "binance_123",
+ "status": "closed",
+ "filled": 0.01,
+ "price": 50000,
+ })
+ mock_exchange.fetch_balance = AsyncMock(return_value={
+ "USDT": {"free": 10000},
+ })
+
+ mock_risk = MagicMock()
+ mock_risk.check.return_value = RiskCheckResult(allowed=True)
+
+ mock_broker = AsyncMock()
+ mock_db = AsyncMock()
+
+ executor = OrderExecutor(
+ exchange=mock_exchange,
+ risk_manager=mock_risk,
+ broker=mock_broker,
+ db=mock_db,
+ dry_run=False,
+ )
+
+ signal = make_signal()
+ order = await executor.execute(signal)
+
+ assert order is not None
+ assert order.status == OrderStatus.FILLED
+ mock_exchange.create_order.assert_called_once()
+
+
+@pytest.mark.asyncio
+async def test_executor_rejects_when_risk_fails():
+ mock_exchange = MagicMock()
+ mock_exchange.fetch_balance = AsyncMock(return_value={
+ "USDT": {"free": 10000},
+ })
+
+ mock_risk = MagicMock()
+ mock_risk.check.return_value = RiskCheckResult(allowed=False, reason="too risky")
+
+ mock_broker = AsyncMock()
+ mock_db = AsyncMock()
+
+ executor = OrderExecutor(
+ exchange=mock_exchange,
+ risk_manager=mock_risk,
+ broker=mock_broker,
+ db=mock_db,
+ dry_run=False,
+ )
+
+ signal = make_signal()
+ order = await executor.execute(signal)
+ assert order is None
+ mock_exchange.create_order.assert_not_called()
+
+
+@pytest.mark.asyncio
+async def test_executor_dry_run_does_not_call_exchange():
+ mock_exchange = MagicMock()
+ mock_exchange.fetch_balance = AsyncMock(return_value={
+ "USDT": {"free": 10000},
+ })
+
+ mock_risk = MagicMock()
+ mock_risk.check.return_value = RiskCheckResult(allowed=True)
+
+ mock_broker = AsyncMock()
+ mock_db = AsyncMock()
+
+ executor = OrderExecutor(
+ exchange=mock_exchange,
+ risk_manager=mock_risk,
+ broker=mock_broker,
+ db=mock_db,
+ dry_run=True,
+ )
+
+ signal = make_signal()
+ order = await executor.execute(signal)
+
+ assert order is not None
+ assert order.status == OrderStatus.FILLED
+ mock_exchange.create_order.assert_not_called()
+```
+
+- [ ] **Step 7: Run tests to verify they fail**
+
+```bash
+pytest services/order-executor/tests/test_executor.py -v
+```
+
+Expected: FAIL
+
+- [ ] **Step 8: Implement executor**
+
+Create `services/order-executor/src/order_executor/executor.py`:
+
+```python
+from __future__ import annotations
+
+import logging
+from datetime import datetime, timezone
+from decimal import Decimal
+
+from shared.broker import RedisBroker
+from shared.db import Database
+from shared.events import OrderEvent
+from shared.models import Order, OrderSide, OrderStatus, OrderType, Signal
+from order_executor.risk_manager import RiskManager
+
+logger = logging.getLogger(__name__)
+
+
+class OrderExecutor:
+ def __init__(
+ self,
+ exchange,
+ risk_manager: RiskManager,
+ broker: RedisBroker,
+ db: Database,
+ dry_run: bool = True,
+ ):
+ self._exchange = exchange
+ self._risk = risk_manager
+ self._broker = broker
+ self._db = db
+ self._dry_run = dry_run
+
+ async def execute(self, signal: Signal) -> Order | None:
+ balance_info = await self._exchange.fetch_balance()
+ balance = Decimal(str(balance_info.get("USDT", {}).get("free", 0)))
+ positions: dict[str, Decimal] = {}
+ daily_pnl = Decimal("0")
+
+ result = self._risk.check(signal, balance, positions, daily_pnl)
+ if not result.allowed:
+ logger.warning(f"Risk check failed: {result.reason}")
+ return None
+
+ order = Order(
+ signal_id=signal.id,
+ symbol=signal.symbol,
+ side=signal.side,
+ type=OrderType.MARKET,
+ price=signal.price,
+ quantity=signal.quantity,
+ )
+
+ if self._dry_run:
+ logger.info(f"[DRY RUN] Would execute: {order.side} {order.quantity} {order.symbol}")
+ order.status = OrderStatus.FILLED
+ order.filled_at = datetime.now(timezone.utc)
+ else:
+ try:
+ result = await self._exchange.create_order(
+ symbol=signal.symbol.replace("USDT", "/USDT"),
+ type="market",
+ side=signal.side.value.lower(),
+ amount=float(signal.quantity),
+ )
+ order.status = OrderStatus.FILLED
+ order.filled_at = datetime.now(timezone.utc)
+ logger.info(f"Order filled: {order.id}")
+ except Exception as e:
+ order.status = OrderStatus.FAILED
+ logger.error(f"Order failed: {e}")
+
+ await self._db.insert_order(order)
+ event = OrderEvent(data=order)
+ await self._broker.publish("orders", event.to_dict())
+
+ return order
+```
+
+- [ ] **Step 9: Run tests to verify they pass**
+
+```bash
+pytest services/order-executor/tests/test_executor.py -v
+```
+
+Expected: All PASS
+
+- [ ] **Step 10: Implement config and main**
+
+Create `services/order-executor/src/order_executor/config.py`:
+
+```python
+from shared.config import Settings
+
+
+class ExecutorConfig(Settings):
+ pass
+```
+
+Create `services/order-executor/src/order_executor/main.py`:
+
+```python
+from __future__ import annotations
+
+import asyncio
+import logging
+
+import ccxt.async_support as ccxt
+
+from shared.broker import RedisBroker
+from shared.db import Database
+from shared.events import Event
+from order_executor.config import ExecutorConfig
+from order_executor.executor import OrderExecutor
+from order_executor.risk_manager import RiskManager
+
+logger = logging.getLogger(__name__)
+
+
+async def run():
+ config = ExecutorConfig()
+ logging.basicConfig(level=config.log_level)
+
+ db = Database(config.database_url)
+ await db.connect()
+
+ broker = RedisBroker(config.redis_url)
+
+ exchange = ccxt.binance({
+ "apiKey": config.binance_api_key,
+ "secret": config.binance_api_secret,
+ })
+
+ risk_manager = RiskManager(
+ max_position_size=config.risk_max_position_size,
+ stop_loss_pct=config.risk_stop_loss_pct,
+ daily_loss_limit_pct=config.risk_daily_loss_limit_pct,
+ )
+
+ executor = OrderExecutor(
+ exchange=exchange,
+ risk_manager=risk_manager,
+ broker=broker,
+ db=db,
+ dry_run=config.dry_run,
+ )
+
+ logger.info(f"Starting order executor (dry_run={config.dry_run})")
+ last_id = "0-0"
+ try:
+ while True:
+ messages = await broker.read("signals", last_id=last_id, count=10, block=1000)
+ for msg in messages:
+ event = Event.from_dict(msg)
+ await executor.execute(event.data)
+ finally:
+ await exchange.close()
+ await broker.close()
+ await db.close()
+
+
+def main():
+ asyncio.run(run())
+
+
+if __name__ == "__main__":
+ main()
+```
+
+- [ ] **Step 11: Create Dockerfile**
+
+Create `services/order-executor/Dockerfile`:
+
+```dockerfile
+FROM python:3.12-slim
+
+WORKDIR /app
+
+COPY shared/ shared/
+RUN pip install --no-cache-dir ./shared
+
+COPY services/order-executor/ services/order-executor/
+RUN pip install --no-cache-dir ./services/order-executor
+
+CMD ["python", "-m", "order_executor.main"]
+```
+
+- [ ] **Step 12: Commit**
+
+```bash
+git add services/order-executor/
+git commit -m "feat(order-executor): add order execution with risk management and dry-run mode"
+```
+
+---
+
+## Task 8: Portfolio Manager Service
+
+**Files:**
+- Create: `services/portfolio-manager/pyproject.toml`
+- Create: `services/portfolio-manager/Dockerfile`
+- Create: `services/portfolio-manager/src/portfolio_manager/__init__.py`
+- Create: `services/portfolio-manager/src/portfolio_manager/config.py`
+- Create: `services/portfolio-manager/src/portfolio_manager/portfolio.py`
+- Create: `services/portfolio-manager/src/portfolio_manager/pnl.py`
+- Create: `services/portfolio-manager/src/portfolio_manager/main.py`
+- Create: `services/portfolio-manager/tests/test_portfolio.py`
+- Create: `services/portfolio-manager/tests/test_pnl.py`
+
+- [ ] **Step 1: Create pyproject.toml**
+
+Create `services/portfolio-manager/pyproject.toml`:
+
+```toml
+[project]
+name = "portfolio-manager"
+version = "0.1.0"
+description = "Portfolio tracking and PnL calculation service"
+requires-python = ">=3.12"
+dependencies = [
+ "trading-shared",
+]
+
+[project.optional-dependencies]
+dev = [
+ "pytest>=8.0",
+ "pytest-asyncio>=0.23",
+]
+
+[build-system]
+requires = ["hatchling"]
+build-backend = "hatchling.build"
+
+[tool.hatch.build.targets.wheel]
+packages = ["src/portfolio_manager"]
+```
+
+- [ ] **Step 2: Write failing tests for pnl**
+
+Create `services/portfolio-manager/tests/test_pnl.py`:
+
+```python
+from decimal import Decimal
+
+from portfolio_manager.pnl import calculate_unrealized_pnl, calculate_realized_pnl
+
+
+def test_unrealized_pnl_profit():
+ result = calculate_unrealized_pnl(
+ quantity=Decimal("0.1"),
+ avg_entry_price=Decimal("50000"),
+ current_price=Decimal("55000"),
+ )
+ assert result == Decimal("500") # 0.1 * (55000 - 50000)
+
+
+def test_unrealized_pnl_loss():
+ result = calculate_unrealized_pnl(
+ quantity=Decimal("0.1"),
+ avg_entry_price=Decimal("50000"),
+ current_price=Decimal("45000"),
+ )
+ assert result == Decimal("-500")
+
+
+def test_realized_pnl_single_trade():
+ result = calculate_realized_pnl(
+ buy_price=Decimal("50000"),
+ sell_price=Decimal("55000"),
+ quantity=Decimal("0.1"),
+ fee=Decimal("5.5"),
+ )
+ assert result == Decimal("494.5") # 0.1 * (55000 - 50000) - 5.5
+```
+
+- [ ] **Step 3: Run tests to verify they fail**
+
+```bash
+pip install -e services/portfolio-manager[dev]
+pytest services/portfolio-manager/tests/test_pnl.py -v
+```
+
+Expected: FAIL
+
+- [ ] **Step 4: Implement pnl**
+
+Create `services/portfolio-manager/src/portfolio_manager/__init__.py`:
+
+```python
+```
+
+Create `services/portfolio-manager/src/portfolio_manager/pnl.py`:
+
+```python
+from decimal import Decimal
+
+
+def calculate_unrealized_pnl(
+ quantity: Decimal,
+ avg_entry_price: Decimal,
+ current_price: Decimal,
+) -> Decimal:
+ return quantity * (current_price - avg_entry_price)
+
+
+def calculate_realized_pnl(
+ buy_price: Decimal,
+ sell_price: Decimal,
+ quantity: Decimal,
+ fee: Decimal = Decimal("0"),
+) -> Decimal:
+ return quantity * (sell_price - buy_price) - fee
+```
+
+- [ ] **Step 5: Run tests to verify they pass**
+
+```bash
+pytest services/portfolio-manager/tests/test_pnl.py -v
+```
+
+Expected: All PASS
+
+- [ ] **Step 6: Write failing tests for portfolio**
+
+Create `services/portfolio-manager/tests/test_portfolio.py`:
+
+```python
+import pytest
+from decimal import Decimal
+
+from shared.models import Order, OrderSide, OrderType, OrderStatus
+from portfolio_manager.portfolio import PortfolioTracker
+
+
+@pytest.fixture
+def tracker():
+ return PortfolioTracker()
+
+
+def make_order(side=OrderSide.BUY, price="50000", quantity="0.1") -> Order:
+ return Order(
+ signal_id="sig_1",
+ symbol="BTCUSDT",
+ side=side,
+ type=OrderType.MARKET,
+ price=Decimal(price),
+ quantity=Decimal(quantity),
+ status=OrderStatus.FILLED,
+ )
+
+
+def test_portfolio_add_buy_order(tracker):
+ order = make_order(side=OrderSide.BUY)
+ tracker.apply_order(order)
+
+ pos = tracker.get_position("BTCUSDT")
+ assert pos.quantity == Decimal("0.1")
+ assert pos.avg_entry_price == Decimal("50000")
+
+
+def test_portfolio_add_multiple_buys(tracker):
+ tracker.apply_order(make_order(price="50000", quantity="0.1"))
+ tracker.apply_order(make_order(price="52000", quantity="0.1"))
+
+ pos = tracker.get_position("BTCUSDT")
+ assert pos.quantity == Decimal("0.2")
+ assert pos.avg_entry_price == Decimal("51000") # weighted avg
+
+
+def test_portfolio_sell_reduces_position(tracker):
+ tracker.apply_order(make_order(side=OrderSide.BUY, price="50000", quantity="0.2"))
+ tracker.apply_order(make_order(side=OrderSide.SELL, price="55000", quantity="0.1"))
+
+ pos = tracker.get_position("BTCUSDT")
+ assert pos.quantity == Decimal("0.1")
+ assert pos.avg_entry_price == Decimal("50000") # entry price unchanged
+
+
+def test_portfolio_no_position_returns_none(tracker):
+ pos = tracker.get_position("ETHUSDT")
+ assert pos is None
+```
+
+- [ ] **Step 7: Run tests to verify they fail**
+
+```bash
+pytest services/portfolio-manager/tests/test_portfolio.py -v
+```
+
+Expected: FAIL
+
+- [ ] **Step 8: Implement portfolio**
+
+Create `services/portfolio-manager/src/portfolio_manager/portfolio.py`:
+
+```python
+from __future__ import annotations
+
+from decimal import Decimal
+
+from shared.models import Order, OrderSide, Position
+
+
+class PortfolioTracker:
+ def __init__(self):
+ self._positions: dict[str, _PositionState] = {}
+
+ def apply_order(self, order: Order) -> None:
+ if order.symbol not in self._positions:
+ self._positions[order.symbol] = _PositionState()
+
+ state = self._positions[order.symbol]
+ if order.side == OrderSide.BUY:
+ total_cost = state.avg_entry * state.quantity + order.price * order.quantity
+ state.quantity += order.quantity
+ state.avg_entry = total_cost / state.quantity if state.quantity > 0 else Decimal("0")
+ elif order.side == OrderSide.SELL:
+ state.quantity -= order.quantity
+ if state.quantity <= 0:
+ state.quantity = Decimal("0")
+ state.avg_entry = Decimal("0")
+
+ def get_position(self, symbol: str) -> Position | None:
+ state = self._positions.get(symbol)
+ if state is None or state.quantity == 0:
+ return None
+ return Position(
+ symbol=symbol,
+ quantity=state.quantity,
+ avg_entry_price=state.avg_entry,
+ current_price=Decimal("0"),
+ )
+
+ def get_all_positions(self) -> list[Position]:
+ positions = []
+ for symbol in self._positions:
+ pos = self.get_position(symbol)
+ if pos is not None:
+ positions.append(pos)
+ return positions
+
+
+class _PositionState:
+ def __init__(self):
+ self.quantity = Decimal("0")
+ self.avg_entry = Decimal("0")
+```
+
+- [ ] **Step 9: Run tests to verify they pass**
+
+```bash
+pytest services/portfolio-manager/tests/test_portfolio.py -v
+```
+
+Expected: All PASS
+
+- [ ] **Step 10: Implement config and main**
+
+Create `services/portfolio-manager/src/portfolio_manager/config.py`:
+
+```python
+from shared.config import Settings
+
+
+class PortfolioConfig(Settings):
+ snapshot_interval_hours: int = 24
+```
+
+Create `services/portfolio-manager/src/portfolio_manager/main.py`:
+
+```python
+from __future__ import annotations
+
+import asyncio
+import logging
+
+from shared.broker import RedisBroker
+from shared.db import Database
+from shared.events import Event
+from portfolio_manager.config import PortfolioConfig
+from portfolio_manager.portfolio import PortfolioTracker
+
+logger = logging.getLogger(__name__)
+
+
+async def run():
+ config = PortfolioConfig()
+ logging.basicConfig(level=config.log_level)
+
+ db = Database(config.database_url)
+ await db.connect()
+
+ broker = RedisBroker(config.redis_url)
+ tracker = PortfolioTracker()
+
+ logger.info("Starting portfolio manager")
+ last_id = "0-0"
+ try:
+ while True:
+ messages = await broker.read("orders", last_id=last_id, count=10, block=1000)
+ for msg in messages:
+ event = Event.from_dict(msg)
+ order = event.data
+ tracker.apply_order(order)
+ logger.info(f"Position updated: {order.symbol}")
+ finally:
+ await broker.close()
+ await db.close()
+
+
+def main():
+ asyncio.run(run())
+
+
+if __name__ == "__main__":
+ main()
+```
+
+- [ ] **Step 11: Create Dockerfile**
+
+Create `services/portfolio-manager/Dockerfile`:
+
+```dockerfile
+FROM python:3.12-slim
+
+WORKDIR /app
+
+COPY shared/ shared/
+RUN pip install --no-cache-dir ./shared
+
+COPY services/portfolio-manager/ services/portfolio-manager/
+RUN pip install --no-cache-dir ./services/portfolio-manager
+
+CMD ["python", "-m", "portfolio_manager.main"]
+```
+
+- [ ] **Step 12: Commit**
+
+```bash
+git add services/portfolio-manager/
+git commit -m "feat(portfolio-manager): add portfolio tracking and PnL calculation"
+```
+
+---
+
+## Task 9: Backtester Service
+
+**Files:**
+- Create: `services/backtester/pyproject.toml`
+- Create: `services/backtester/Dockerfile`
+- Create: `services/backtester/src/backtester/__init__.py`
+- Create: `services/backtester/src/backtester/config.py`
+- Create: `services/backtester/src/backtester/simulator.py`
+- Create: `services/backtester/src/backtester/engine.py`
+- Create: `services/backtester/src/backtester/reporter.py`
+- Create: `services/backtester/src/backtester/main.py`
+- Create: `services/backtester/tests/test_simulator.py`
+- Create: `services/backtester/tests/test_engine.py`
+- Create: `services/backtester/tests/test_reporter.py`
+
+- [ ] **Step 1: Create pyproject.toml**
+
+Create `services/backtester/pyproject.toml`:
+
+```toml
+[project]
+name = "backtester"
+version = "0.1.0"
+description = "Strategy backtesting engine"
+requires-python = ">=3.12"
+dependencies = [
+ "pandas>=2.0",
+ "trading-shared",
+]
+
+[project.optional-dependencies]
+dev = [
+ "pytest>=8.0",
+ "pytest-asyncio>=0.23",
+]
+
+[build-system]
+requires = ["hatchling"]
+build-backend = "hatchling.build"
+
+[tool.hatch.build.targets.wheel]
+packages = ["src/backtester"]
+```
+
+- [ ] **Step 2: Write failing tests for simulator**
+
+Create `services/backtester/tests/test_simulator.py`:
+
+```python
+from decimal import Decimal
+
+from shared.models import Signal, OrderSide
+from backtester.simulator import OrderSimulator
+
+
+def make_signal(side=OrderSide.BUY, price="50000", quantity="0.1") -> Signal:
+ return Signal(
+ strategy="test",
+ symbol="BTCUSDT",
+ side=side,
+ price=Decimal(price),
+ quantity=Decimal(quantity),
+ reason="test",
+ )
+
+
+def test_simulator_initial_balance():
+ sim = OrderSimulator(initial_balance=Decimal("10000"))
+ assert sim.balance == Decimal("10000")
+
+
+def test_simulator_buy_reduces_balance():
+ sim = OrderSimulator(initial_balance=Decimal("10000"))
+ sim.execute(make_signal(side=OrderSide.BUY, price="50000", quantity="0.1"))
+
+ assert sim.balance == Decimal("5000") # 10000 - 0.1*50000
+ assert sim.positions["BTCUSDT"] == Decimal("0.1")
+
+
+def test_simulator_sell_increases_balance():
+ sim = OrderSimulator(initial_balance=Decimal("10000"))
+ sim.execute(make_signal(side=OrderSide.BUY, price="50000", quantity="0.1"))
+ sim.execute(make_signal(side=OrderSide.SELL, price="55000", quantity="0.1"))
+
+ assert sim.balance == Decimal("10500") # 5000 + 0.1*55000
+ assert sim.positions.get("BTCUSDT", Decimal("0")) == Decimal("0")
+
+
+def test_simulator_reject_buy_insufficient_balance():
+ sim = OrderSimulator(initial_balance=Decimal("100"))
+ result = sim.execute(make_signal(side=OrderSide.BUY, price="50000", quantity="0.1"))
+ assert result is False
+ assert sim.balance == Decimal("100")
+
+
+def test_simulator_trade_history():
+ sim = OrderSimulator(initial_balance=Decimal("10000"))
+ sim.execute(make_signal(side=OrderSide.BUY))
+ assert len(sim.trades) == 1
+```
+
+- [ ] **Step 3: Run tests to verify they fail**
+
+```bash
+pip install -e services/backtester[dev]
+pytest services/backtester/tests/test_simulator.py -v
+```
+
+Expected: FAIL
+
+- [ ] **Step 4: Implement simulator**
+
+Create `services/backtester/src/backtester/__init__.py`:
+
+```python
+```
+
+Create `services/backtester/src/backtester/simulator.py`:
+
+```python
+from __future__ import annotations
+
+from dataclasses import dataclass, field
+from decimal import Decimal
+
+from shared.models import Signal, OrderSide
+
+
+@dataclass
+class SimulatedTrade:
+ symbol: str
+ side: OrderSide
+ price: Decimal
+ quantity: Decimal
+ balance_after: Decimal
+
+
+class OrderSimulator:
+ def __init__(self, initial_balance: Decimal):
+ self.balance = initial_balance
+ self.positions: dict[str, Decimal] = {}
+ self.trades: list[SimulatedTrade] = []
+
+ def execute(self, signal: Signal) -> bool:
+ if signal.side == OrderSide.BUY:
+ cost = signal.price * signal.quantity
+ if cost > self.balance:
+ return False
+ self.balance -= cost
+ current = self.positions.get(signal.symbol, Decimal("0"))
+ self.positions[signal.symbol] = current + signal.quantity
+ elif signal.side == OrderSide.SELL:
+ current = self.positions.get(signal.symbol, Decimal("0"))
+ sell_qty = min(signal.quantity, current)
+ if sell_qty <= 0:
+ return False
+ self.balance += signal.price * sell_qty
+ self.positions[signal.symbol] = current - sell_qty
+
+ self.trades.append(
+ SimulatedTrade(
+ symbol=signal.symbol,
+ side=signal.side,
+ price=signal.price,
+ quantity=signal.quantity,
+ balance_after=self.balance,
+ )
+ )
+ return True
+```
+
+- [ ] **Step 5: Run tests to verify they pass**
+
+```bash
+pytest services/backtester/tests/test_simulator.py -v
+```
+
+Expected: All PASS
+
+- [ ] **Step 6: Write failing tests for backtest engine**
+
+Create `services/backtester/tests/test_engine.py`:
+
+```python
+import pytest
+from decimal import Decimal
+from datetime import datetime, timezone
+from unittest.mock import MagicMock
+
+from shared.models import Candle, Signal, OrderSide
+from backtester.engine import BacktestEngine
+
+
+def make_candles(prices: list[float]) -> list[Candle]:
+ return [
+ Candle(
+ symbol="BTCUSDT",
+ timeframe="1m",
+ open_time=datetime(2026, 1, 1, minute=i, tzinfo=timezone.utc),
+ open=Decimal(str(p)),
+ high=Decimal(str(p + 10)),
+ low=Decimal(str(p - 10)),
+ close=Decimal(str(p)),
+ volume=Decimal("1.0"),
+ )
+ for i, p in enumerate(prices)
+ ]
+
+
+def test_backtest_engine_runs_strategy_over_candles():
+ mock_strategy = MagicMock()
+ mock_strategy.name = "test"
+ mock_strategy.on_candle.return_value = None
+
+ candles = make_candles([50000, 50100, 50200])
+
+ engine = BacktestEngine(
+ strategy=mock_strategy,
+ initial_balance=Decimal("10000"),
+ )
+ result = engine.run(candles)
+
+ assert mock_strategy.on_candle.call_count == 3
+ assert result.total_trades == 0
+ assert result.final_balance == Decimal("10000")
+
+
+def test_backtest_engine_executes_signals():
+ buy_signal = Signal(
+ strategy="test",
+ symbol="BTCUSDT",
+ side=OrderSide.BUY,
+ price=Decimal("50000"),
+ quantity=Decimal("0.1"),
+ reason="test buy",
+ )
+ sell_signal = Signal(
+ strategy="test",
+ symbol="BTCUSDT",
+ side=OrderSide.SELL,
+ price=Decimal("55000"),
+ quantity=Decimal("0.1"),
+ reason="test sell",
+ )
+
+ mock_strategy = MagicMock()
+ mock_strategy.name = "test"
+ mock_strategy.on_candle.side_effect = [buy_signal, None, sell_signal]
+
+ candles = make_candles([50000, 52000, 55000])
+
+ engine = BacktestEngine(
+ strategy=mock_strategy,
+ initial_balance=Decimal("10000"),
+ )
+ result = engine.run(candles)
+
+ assert result.total_trades == 2
+ assert result.final_balance == Decimal("10500") # 10000 - 5000 + 5500
+```
+
+- [ ] **Step 7: Run tests to verify they fail**
+
+```bash
+pytest services/backtester/tests/test_engine.py -v
+```
+
+Expected: FAIL
+
+- [ ] **Step 8: Implement backtest engine**
+
+Create `services/backtester/src/backtester/engine.py`:
+
+```python
+from __future__ import annotations
+
+from dataclasses import dataclass
+from decimal import Decimal
+
+from shared.models import Candle
+from backtester.simulator import OrderSimulator
+from strategies.base import BaseStrategy
+
+
+@dataclass
+class BacktestResult:
+ strategy_name: str
+ symbol: str
+ total_trades: int
+ initial_balance: Decimal
+ final_balance: Decimal
+ profit: Decimal
+ profit_pct: Decimal
+ trades: list
+
+ @property
+ def win_rate(self) -> Decimal:
+ if self.total_trades == 0:
+ return Decimal("0")
+ wins = sum(
+ 1
+ for i in range(0, len(self.trades) - 1, 2)
+ if i + 1 < len(self.trades)
+ and self.trades[i + 1].balance_after > self.trades[i].balance_after
+ )
+ pairs = self.total_trades // 2
+ return Decimal(str(wins / pairs * 100)) if pairs > 0 else Decimal("0")
+
+
+class BacktestEngine:
+ def __init__(self, strategy: BaseStrategy, initial_balance: Decimal):
+ self._strategy = strategy
+ self._initial_balance = initial_balance
+
+ def run(self, candles: list[Candle]) -> BacktestResult:
+ simulator = OrderSimulator(self._initial_balance)
+ symbol = candles[0].symbol if candles else ""
+
+ for candle in candles:
+ signal = self._strategy.on_candle(candle)
+ if signal is not None:
+ simulator.execute(signal)
+
+ final = simulator.balance
+ # Add value of remaining positions at last candle price
+ if candles:
+ last_price = candles[-1].close
+ for sym, qty in simulator.positions.items():
+ final += qty * last_price
+
+ profit = final - self._initial_balance
+ profit_pct = (profit / self._initial_balance) * 100 if self._initial_balance > 0 else Decimal("0")
+
+ return BacktestResult(
+ strategy_name=self._strategy.name,
+ symbol=symbol,
+ total_trades=len(simulator.trades),
+ initial_balance=self._initial_balance,
+ final_balance=final,
+ profit=profit,
+ profit_pct=profit_pct,
+ trades=simulator.trades,
+ )
+```
+
+- [ ] **Step 9: Run tests to verify they pass**
+
+```bash
+pytest services/backtester/tests/test_engine.py -v
+```
+
+Expected: All PASS
+
+- [ ] **Step 10: Write failing tests for reporter**
+
+Create `services/backtester/tests/test_reporter.py`:
+
+```python
+from decimal import Decimal
+
+from backtester.engine import BacktestResult
+from backtester.reporter import format_report
+
+
+def test_format_report_contains_key_metrics():
+ result = BacktestResult(
+ strategy_name="rsi",
+ symbol="BTCUSDT",
+ total_trades=10,
+ initial_balance=Decimal("10000"),
+ final_balance=Decimal("11500"),
+ profit=Decimal("1500"),
+ profit_pct=Decimal("15"),
+ trades=[],
+ )
+
+ report = format_report(result)
+
+ assert "rsi" in report
+ assert "BTCUSDT" in report
+ assert "10000" in report
+ assert "11500" in report
+ assert "1500" in report
+ assert "15" in report
+```
+
+- [ ] **Step 11: Run test to verify it fails**
+
+```bash
+pytest services/backtester/tests/test_reporter.py -v
+```
+
+Expected: FAIL
+
+- [ ] **Step 12: Implement reporter**
+
+Create `services/backtester/src/backtester/reporter.py`:
+
+```python
+from backtester.engine import BacktestResult
+
+
+def format_report(result: BacktestResult) -> str:
+ lines = [
+ "=" * 50,
+ f" Backtest Report: {result.strategy_name}",
+ "=" * 50,
+ f" Symbol: {result.symbol}",
+ f" Total Trades: {result.total_trades}",
+ f" Initial Balance: {result.initial_balance}",
+ f" Final Balance: {result.final_balance}",
+ f" Profit: {result.profit}",
+ f" Profit %: {result.profit_pct:.2f}%",
+ f" Win Rate: {result.win_rate:.1f}%",
+ "=" * 50,
+ ]
+ return "\n".join(lines)
+```
+
+- [ ] **Step 13: Run test to verify it passes**
+
+```bash
+pytest services/backtester/tests/test_reporter.py -v
+```
+
+Expected: PASS
+
+- [ ] **Step 14: Implement config and main**
+
+Create `services/backtester/src/backtester/config.py`:
+
+```python
+from shared.config import Settings
+
+
+class BacktestConfig(Settings):
+ backtest_initial_balance: float = 10000.0
+```
+
+Create `services/backtester/src/backtester/main.py`:
+
+```python
+from __future__ import annotations
+
+import asyncio
+import logging
+from decimal import Decimal
+from pathlib import Path
+
+from shared.db import Database
+from backtester.config import BacktestConfig
+from backtester.engine import BacktestEngine
+from backtester.reporter import format_report
+
+logger = logging.getLogger(__name__)
+
+
+async def run_backtest(
+ strategy_name: str,
+ symbol: str,
+ timeframe: str,
+ initial_balance: Decimal,
+ db: Database,
+ strategies_dir: Path,
+) -> str:
+ from strategy_engine.plugin_loader import load_strategies
+
+ strategies = load_strategies(strategies_dir)
+ strategy = next((s for s in strategies if s.name == strategy_name), None)
+ if strategy is None:
+ return f"Strategy '{strategy_name}' not found"
+
+ candles_data = await db.get_candles(symbol, timeframe)
+ if not candles_data:
+ return f"No candle data for {symbol} {timeframe}"
+
+ from shared.models import Candle
+
+ candles = [Candle(**row) for row in reversed(candles_data)]
+
+ engine = BacktestEngine(strategy=strategy, initial_balance=initial_balance)
+ result = engine.run(candles)
+ return format_report(result)
+```
+
+- [ ] **Step 15: Create Dockerfile**
+
+Create `services/backtester/Dockerfile`:
+
+```dockerfile
+FROM python:3.12-slim
+
+WORKDIR /app
+
+COPY shared/ shared/
+RUN pip install --no-cache-dir ./shared
+
+COPY services/strategy-engine/strategies/ services/strategy-engine/strategies/
+COPY services/backtester/ services/backtester/
+RUN pip install --no-cache-dir ./services/backtester
+
+CMD ["python", "-m", "backtester.main"]
+```
+
+- [ ] **Step 16: Commit**
+
+```bash
+git add services/backtester/
+git commit -m "feat(backtester): add backtesting engine with simulator and reporting"
+```
+
+---
+
+## Task 10: CLI
+
+**Files:**
+- Create: `cli/pyproject.toml`
+- Create: `cli/src/trading_cli/__init__.py`
+- Create: `cli/src/trading_cli/main.py`
+- Create: `cli/src/trading_cli/commands/data.py`
+- Create: `cli/src/trading_cli/commands/trade.py`
+- Create: `cli/src/trading_cli/commands/backtest.py`
+- Create: `cli/src/trading_cli/commands/portfolio.py`
+- Create: `cli/src/trading_cli/commands/strategy.py`
+- Create: `cli/src/trading_cli/commands/service.py`
+- Create: `cli/tests/test_cli_data.py`
+
+- [ ] **Step 1: Create pyproject.toml**
+
+Create `cli/pyproject.toml`:
+
+```toml
+[project]
+name = "trading-cli"
+version = "0.1.0"
+description = "CLI interface for the trading platform"
+requires-python = ">=3.12"
+dependencies = [
+ "click>=8.0",
+ "rich>=13.0",
+ "trading-shared",
+]
+
+[project.scripts]
+trading = "trading_cli.main:cli"
+
+[project.optional-dependencies]
+dev = [
+ "pytest>=8.0",
+ "pytest-asyncio>=0.23",
+]
+
+[build-system]
+requires = ["hatchling"]
+build-backend = "hatchling.build"
+
+[tool.hatch.build.targets.wheel]
+packages = ["src/trading_cli"]
+```
+
+- [ ] **Step 2: Write failing tests for CLI data commands**
+
+Create `cli/tests/test_cli_data.py`:
+
+```python
+from click.testing import CliRunner
+from trading_cli.main import cli
+
+
+def test_cli_help():
+ runner = CliRunner()
+ result = runner.invoke(cli, ["--help"])
+ assert result.exit_code == 0
+ assert "trading" in result.output.lower() or "Usage" in result.output
+
+
+def test_cli_data_group():
+ runner = CliRunner()
+ result = runner.invoke(cli, ["data", "--help"])
+ assert result.exit_code == 0
+ assert "collect" in result.output
+ assert "history" in result.output
+```
+
+- [ ] **Step 3: Run tests to verify they fail**
+
+```bash
+pip install -e cli[dev]
+pytest cli/tests/test_cli_data.py -v
+```
+
+Expected: FAIL
+
+- [ ] **Step 4: Implement CLI main and data commands**
+
+Create `cli/src/trading_cli/__init__.py`:
+
+```python
+```
+
+Create `cli/src/trading_cli/main.py`:
+
+```python
+import click
+
+from trading_cli.commands.data import data
+from trading_cli.commands.trade import trade
+from trading_cli.commands.backtest import backtest
+from trading_cli.commands.portfolio import portfolio
+from trading_cli.commands.strategy import strategy
+from trading_cli.commands.service import service
+
+
+@click.group()
+@click.version_option(version="0.1.0")
+def cli():
+ """Trading Platform CLI — Binance spot crypto trading"""
+ pass
+
+
+cli.add_command(data)
+cli.add_command(trade)
+cli.add_command(backtest)
+cli.add_command(portfolio)
+cli.add_command(strategy)
+cli.add_command(service)
+```
+
+Create `cli/src/trading_cli/commands/data.py`:
+
+```python
+import asyncio
+
+import click
+
+
+@click.group()
+def data():
+ """Data collection commands"""
+ pass
+
+
+@data.command()
+@click.option("--symbol", required=True, help="Trading pair (e.g. BTCUSDT)")
+@click.option("--timeframe", default="1m", help="Candle timeframe")
+def collect(symbol: str, timeframe: str):
+ """Start real-time data collection"""
+ click.echo(f"Starting data collection: {symbol} {timeframe}")
+
+ from data_collector.config import CollectorConfig
+ from data_collector.main import run
+
+ asyncio.run(run())
+
+
+@data.command()
+@click.option("--symbol", required=True, help="Trading pair (e.g. BTCUSDT)")
+@click.option("--timeframe", default="1m", help="Candle timeframe")
+@click.option("--from", "since", required=True, help="Start date (YYYY-MM-DD)")
+@click.option("--limit", default=1000, help="Number of candles")
+def history(symbol: str, timeframe: str, since: str, limit: int):
+ """Download historical candle data"""
+ click.echo(f"Downloading history: {symbol} {timeframe} from {since} (limit={limit})")
+
+ async def _run():
+ import ccxt.async_support as ccxt
+ from datetime import datetime, timezone
+ from shared.broker import RedisBroker
+ from shared.config import Settings
+ from shared.db import Database
+ from data_collector.binance_rest import fetch_historical_candles
+ from data_collector.storage import CandleStorage
+
+ settings = Settings()
+ db = Database(settings.database_url)
+ await db.connect()
+ await db.init_tables()
+ broker = RedisBroker(settings.redis_url)
+ storage = CandleStorage(db=db, broker=broker)
+
+ exchange = ccxt.binance()
+ since_dt = datetime.strptime(since, "%Y-%m-%d").replace(tzinfo=timezone.utc)
+ candles = await fetch_historical_candles(
+ exchange=exchange,
+ symbol=symbol.replace("USDT", "/USDT"),
+ timeframe=timeframe,
+ since=since_dt,
+ limit=limit,
+ )
+ await storage.store_batch(candles)
+ await exchange.close()
+ await broker.close()
+ await db.close()
+ click.echo(f"Downloaded {len(candles)} candles")
+
+ asyncio.run(_run())
+
+
+@data.command("list")
+def list_data():
+ """List currently collecting symbols"""
+ click.echo("Collecting symbols:")
+ click.echo(" (Check docker-compose service status)")
+```
+
+- [ ] **Step 5: Implement remaining CLI command stubs**
+
+Create `cli/src/trading_cli/commands/trade.py`:
+
+```python
+import click
+
+
+@click.group()
+def trade():
+ """Trading bot commands"""
+ pass
+
+
+@trade.command()
+@click.option("--strategy", required=True, help="Strategy name")
+@click.option("--symbol", required=True, help="Trading pair")
+def start(strategy: str, symbol: str):
+ """Start a trading bot"""
+ click.echo(f"Starting bot: strategy={strategy} symbol={symbol}")
+
+
+@trade.command()
+@click.option("--strategy", required=True, help="Strategy name")
+def stop(strategy: str):
+ """Stop a trading bot"""
+ click.echo(f"Stopping bot: strategy={strategy}")
+
+
+@trade.command()
+def status():
+ """Show running bot status"""
+ click.echo("Running bots:")
+
+
+@trade.command("stop-all")
+def stop_all():
+ """Emergency stop: stop all bots and cancel all orders"""
+ click.confirm("Are you sure you want to stop ALL bots?", abort=True)
+ click.echo("Stopping all bots and cancelling open orders...")
+```
+
+Create `cli/src/trading_cli/commands/backtest.py`:
+
+```python
+import asyncio
+from decimal import Decimal
+
+import click
+
+
+@click.group()
+def backtest():
+ """Backtesting commands"""
+ pass
+
+
+@backtest.command("run")
+@click.option("--strategy", required=True, help="Strategy name")
+@click.option("--symbol", required=True, help="Trading pair")
+@click.option("--from", "since", required=True, help="Start date")
+@click.option("--to", "until", required=True, help="End date")
+@click.option("--balance", default=10000.0, help="Initial balance")
+def run_backtest(strategy: str, symbol: str, since: str, until: str, balance: float):
+ """Run a backtest"""
+ click.echo(f"Running backtest: {strategy} on {symbol} ({since} ~ {until})")
+
+ async def _run():
+ from pathlib import Path
+ from shared.config import Settings
+ from shared.db import Database
+ from backtester.main import run_backtest as bt_run
+
+ settings = Settings()
+ db = Database(settings.database_url)
+ await db.connect()
+
+ strategies_dir = Path(__file__).parent.parent.parent.parent.parent / "services" / "strategy-engine" / "strategies"
+ report = await bt_run(
+ strategy_name=strategy,
+ symbol=symbol,
+ timeframe="1m",
+ initial_balance=Decimal(str(balance)),
+ db=db,
+ strategies_dir=strategies_dir,
+ )
+ click.echo(report)
+ await db.close()
+
+ asyncio.run(_run())
+
+
+@backtest.command()
+@click.option("--id", "report_id", default="latest", help="Report ID")
+def report(report_id: str):
+ """Show backtest report"""
+ click.echo(f"Showing report: {report_id}")
+```
+
+Create `cli/src/trading_cli/commands/portfolio.py`:
+
+```python
+import click
+
+
+@click.group()
+def portfolio():
+ """Portfolio commands"""
+ pass
+
+
+@portfolio.command()
+def show():
+ """Show current portfolio"""
+ click.echo("Current Portfolio:")
+ click.echo(" (Connect to portfolio-manager service)")
+
+
+@portfolio.command()
+@click.option("--days", default=30, help="Number of days")
+def history(days: int):
+ """Show PnL history"""
+ click.echo(f"PnL history (last {days} days):")
+```
+
+Create `cli/src/trading_cli/commands/strategy.py`:
+
+```python
+from pathlib import Path
+
+import click
+
+
+@click.group()
+def strategy():
+ """Strategy management commands"""
+ pass
+
+
+@strategy.command("list")
+def list_strategies():
+ """List available strategies"""
+ from strategy_engine.plugin_loader import load_strategies
+
+ strategies_dir = Path(__file__).parent.parent.parent.parent.parent / "services" / "strategy-engine" / "strategies"
+ strategies = load_strategies(strategies_dir)
+ click.echo("Available strategies:")
+ for s in strategies:
+ click.echo(f" - {s.name}")
+
+
+@strategy.command()
+@click.option("--name", required=True, help="Strategy name")
+def info(name: str):
+ """Show strategy details"""
+ click.echo(f"Strategy: {name}")
+```
+
+Create `cli/src/trading_cli/commands/service.py`:
+
+```python
+import subprocess
+
+import click
+
+
+@click.group()
+def service():
+ """Service management commands"""
+ pass
+
+
+@service.command()
+def up():
+ """Start all services"""
+ click.echo("Starting all services...")
+ subprocess.run(["docker", "compose", "up", "-d"], check=True)
+
+
+@service.command()
+def down():
+ """Stop all services"""
+ click.echo("Stopping all services...")
+ subprocess.run(["docker", "compose", "down"], check=True)
+
+
+@service.command()
+@click.option("--name", required=True, help="Service name")
+def logs(name: str):
+ """Show service logs"""
+ subprocess.run(["docker", "compose", "logs", "-f", name])
+```
+
+- [ ] **Step 6: Run tests to verify they pass**
+
+```bash
+pytest cli/tests/test_cli_data.py -v
+```
+
+Expected: All PASS
+
+- [ ] **Step 7: Commit**
+
+```bash
+git add cli/
+git commit -m "feat(cli): add Click-based CLI with data, trade, backtest, portfolio, strategy, and service commands"
+```
+
+---
+
+## Task 11: Integration Verification
+
+- [ ] **Step 1: Run all tests**
+
+```bash
+cd /home/si/Private/repos/trading
+pytest -v
+```
+
+Expected: All tests pass
+
+- [ ] **Step 2: Lint check**
+
+```bash
+ruff check .
+```
+
+Fix any issues found.
+
+- [ ] **Step 3: Verify Docker builds**
+
+```bash
+docker compose build
+```
+
+Expected: All services build successfully
+
+- [ ] **Step 4: Start infrastructure and verify**
+
+```bash
+make infra
+# Wait for healthy status
+docker compose ps
+```
+
+Expected: redis and postgres running and healthy
+
+- [ ] **Step 5: Final commit**
+
+```bash
+git add .
+git commit -m "chore: integration verification — all tests pass, docker builds succeed"
+```
diff --git a/docs/superpowers/specs/2026-04-01-crypto-trading-platform-design.md b/docs/superpowers/specs/2026-04-01-crypto-trading-platform-design.md
new file mode 100644
index 0000000..aa32eb4
--- /dev/null
+++ b/docs/superpowers/specs/2026-04-01-crypto-trading-platform-design.md
@@ -0,0 +1,374 @@
+# Crypto Trading Platform — Design Spec
+
+## Overview
+
+Binance 현물 암호화폐 자동매매 플랫폼. 마이크로서비스 아키텍처 기반으로 데이터 수집, 전략 실행, 주문 처리, 포트폴리오 관리, 백테스팅을 독립 서비스로 운영한다. CLI로 제어하며, 전략은 플러그인 방식으로 확장 가능하다.
+
+- **시장:** 암호화폐 (Binance 현물)
+- **언어:** Python
+- **인터페이스:** CLI (Click)
+- **아키텍처:** 마이크로서비스 (Docker Compose)
+
+---
+
+## Architecture
+
+### 서비스 구성
+
+```
+┌─────────────┐ ┌──────────────────┐ ┌─────────────────┐
+│ Data │───▶│ Message Broker │◀──│ Strategy │
+│ Collector │ │ (Redis Streams) │ │ Engine │
+└─────────────┘ └──────────────────┘ └─────────────────┘
+ │ ▲ │
+ ▼ │ ▼
+ ┌──────────────────┐ ┌─────────────────┐
+ │ Backtester │ │ Order │
+ │ │ │ Executor │
+ └──────────────────┘ └─────────────────┘
+ │
+ ┌────────────────────────┘
+ ▼
+ ┌──────────────────┐
+ │ Portfolio │
+ │ Manager │
+ └──────────────────┘
+
+ CLI ──────▶ 각 서비스에 명령 전달
+```
+
+| 서비스 | 역할 | 상시 실행 |
+|--------|------|-----------|
+| **data-collector** | Binance WebSocket/REST로 시세 수집, DB 저장 | Yes |
+| **strategy-engine** | 플러그인 전략 로드 및 시그널 생성 | 봇 실행 시 |
+| **order-executor** | 시그널 받아 실제 주문 실행 + 리스크 관리 | 봇 실행 시 |
+| **portfolio-manager** | 잔고, 손익, 포지션 추적 | Yes |
+| **backtester** | 과거 데이터로 전략 검증 | 요청 시 |
+| **shared** | 공통 모델, 이벤트 정의, 유틸리티 (라이브러리) | — |
+| **cli** | 사용자 인터페이스, 각 서비스 제어 | — |
+
+### 통신 흐름
+
+```
+[Binance WS]
+ │
+ ▼
+data-collector ──publish──▶ Redis Stream: "candles.{symbol}"
+ │
+ ┌───────────────┤
+ ▼ ▼
+ strategy-engine backtester (과거 데이터는 DB에서)
+ │
+ ▼
+ Redis Stream: "signals"
+ │
+ ▼
+ order-executor
+ │
+ ┌───────┴───────┐
+ ▼ ▼
+ [Binance API] Redis Stream: "orders"
+ │
+ ▼
+ portfolio-manager
+```
+
+---
+
+## Project Structure
+
+```
+trading/
+├── services/
+│ ├── data-collector/
+│ │ ├── src/
+│ │ │ ├── __init__.py
+│ │ │ ├── main.py # 서비스 진입점
+│ │ │ ├── binance_ws.py # WebSocket 실시간 시세
+│ │ │ ├── binance_rest.py # REST 과거 데이터 수집
+│ │ │ ├── storage.py # DB 저장 로직
+│ │ │ └── config.py
+│ │ ├── tests/
+│ │ ├── Dockerfile
+│ │ └── pyproject.toml
+│ │
+│ ├── strategy-engine/
+│ │ ├── src/
+│ │ │ ├── __init__.py
+│ │ │ ├── main.py
+│ │ │ ├── engine.py # 전략 로더 + 실행기
+│ │ │ ├── plugin_loader.py # 플러그인 동적 로드
+│ │ │ └── config.py
+│ │ ├── strategies/ # 플러그인 전략 디렉토리
+│ │ │ ├── base.py # 전략 추상 클래스
+│ │ │ ├── rsi_strategy.py # 예시: RSI 전략
+│ │ │ └── grid_strategy.py # 예시: 그리드 전략
+│ │ ├── tests/
+│ │ ├── Dockerfile
+│ │ └── pyproject.toml
+│ │
+│ ├── order-executor/
+│ │ ├── src/
+│ │ │ ├── __init__.py
+│ │ │ ├── main.py
+│ │ │ ├── executor.py # 주문 실행 로직
+│ │ │ ├── risk_manager.py # 리스크 관리 (손절/익절)
+│ │ │ └── config.py
+│ │ ├── tests/
+│ │ ├── Dockerfile
+│ │ └── pyproject.toml
+│ │
+│ ├── portfolio-manager/
+│ │ ├── src/
+│ │ │ ├── __init__.py
+│ │ │ ├── main.py
+│ │ │ ├── portfolio.py # 잔고/포지션 추적
+│ │ │ ├── pnl.py # 손익 계산
+│ │ │ └── config.py
+│ │ ├── tests/
+│ │ ├── Dockerfile
+│ │ └── pyproject.toml
+│ │
+│ └── backtester/
+│ ├── src/
+│ │ ├── __init__.py
+│ │ ├── main.py
+│ │ ├── engine.py # 백테스팅 엔진
+│ │ ├── simulator.py # 가상 주문 시뮬레이터
+│ │ ├── reporter.py # 결과 리포트 생성
+│ │ └── config.py
+│ ├── tests/
+│ ├── Dockerfile
+│ └── pyproject.toml
+│
+├── shared/
+│ ├── src/shared/
+│ │ ├── __init__.py
+│ │ ├── models.py # 공통 데이터 모델
+│ │ ├── events.py # 이벤트 타입 정의
+│ │ ├── broker.py # Redis Streams 클라이언트
+│ │ ├── db.py # DB 연결 (PostgreSQL)
+│ │ └── config.py # 공통 설정
+│ ├── tests/
+│ └── pyproject.toml
+│
+├── cli/
+│ ├── src/
+│ │ ├── __init__.py
+│ │ ├── main.py # Click 기반 CLI 진입점
+│ │ ├── commands/
+│ │ │ ├── data.py # 데이터 수집 명령
+│ │ │ ├── trade.py # 매매 시작/중지
+│ │ │ ├── backtest.py # 백테스팅 실행
+│ │ │ ├── portfolio.py # 포트폴리오 조회
+│ │ │ └── strategy.py # 전략 관리
+│ │ └── config.py
+│ ├── tests/
+│ └── pyproject.toml
+│
+├── docker-compose.yml # 전체 서비스 오케스트레이션
+├── .env.example # 환경변수 템플릿
+├── Makefile # 공통 명령어
+└── README.md
+```
+
+---
+
+## Tech Stack
+
+| 용도 | 라이브러리 |
+|------|-----------|
+| 거래소 API | **ccxt** |
+| 메시지 브로커 | **Redis Streams** |
+| DB | **PostgreSQL** + **asyncpg** |
+| CLI | **Click** |
+| 데이터 분석 | **pandas**, **numpy** |
+| 기술 지표 | **pandas-ta** |
+| 비동기 처리 | **asyncio** + **aiohttp** |
+| 설정 관리 | **pydantic-settings** |
+| 컨테이너 | **Docker** + **docker-compose** |
+| 테스트 | **pytest** + **pytest-asyncio** |
+
+---
+
+## Data Models
+
+### Core Models (shared/models.py)
+
+```python
+class Candle:
+ symbol: str # "BTCUSDT"
+ timeframe: str # "1m", "5m", "1h"
+ open_time: datetime
+ open: Decimal
+ high: Decimal
+ low: Decimal
+ close: Decimal
+ volume: Decimal
+
+class Signal:
+ strategy: str # "rsi_strategy"
+ symbol: str
+ side: "BUY" | "SELL"
+ price: Decimal
+ quantity: Decimal
+ reason: str # 시그널 발생 근거
+
+class Order:
+ id: str
+ signal_id: str # 추적용
+ symbol: str
+ side: "BUY" | "SELL"
+ type: "MARKET" | "LIMIT"
+ price: Decimal
+ quantity: Decimal
+ status: "PENDING" | "FILLED" | "CANCELLED" | "FAILED"
+ created_at: datetime
+ filled_at: datetime | None
+
+class Position:
+ symbol: str
+ quantity: Decimal
+ avg_entry_price: Decimal
+ current_price: Decimal
+ unrealized_pnl: Decimal
+```
+
+### PostgreSQL Tables
+
+| 테이블 | 용도 |
+|--------|------|
+| `candles` | 시세 이력 (파티셔닝: symbol + timeframe) |
+| `signals` | 전략 시그널 이력 |
+| `orders` | 주문 이력 |
+| `trades` | 체결 이력 |
+| `positions` | 현재 포지션 |
+| `portfolio_snapshots` | 일별 포트폴리오 스냅샷 |
+
+### Storage Strategy
+
+- **실시간 시세:** Redis 캐싱 + PostgreSQL 영구 저장
+- **주문/체결:** PostgreSQL 즉시 기록
+- **백테스팅 데이터:** PostgreSQL에서 bulk read (pandas DataFrame)
+
+---
+
+## Strategy Plugin System
+
+### Base Interface
+
+```python
+from abc import ABC, abstractmethod
+from shared.models import Candle, Signal
+
+class BaseStrategy(ABC):
+ @abstractmethod
+ def on_candle(self, candle: Candle) -> Signal | None:
+ """캔들 데이터 수신 시 시그널 반환"""
+ pass
+
+ @abstractmethod
+ def configure(self, params: dict) -> None:
+ """전략 파라미터 설정"""
+ pass
+```
+
+새 전략 추가 = `BaseStrategy` 상속 파일 하나 작성 후 `strategies/` 디렉토리에 배치.
+
+### 예시 전략
+
+- **RSI Strategy:** RSI 과매도 시 매수, 과매수 시 매도
+- **Grid Strategy:** 가격 구간을 나눠 자동 매수/매도 주문 배치
+
+---
+
+## CLI Interface
+
+```bash
+# 데이터 수집
+trading data collect --symbol BTCUSDT --timeframe 1m
+trading data history --symbol BTCUSDT --from 2025-01-01
+trading data list
+
+# 자동매매
+trading trade start --strategy rsi --symbol BTCUSDT
+trading trade stop --strategy rsi
+trading trade status
+
+# 수동매매
+trading order buy --symbol BTCUSDT --quantity 0.01
+trading order sell --symbol BTCUSDT --price 70000
+trading order cancel --id abc123
+
+# 백테스팅
+trading backtest run --strategy rsi --symbol BTCUSDT \
+ --from 2025-01-01 --to 2025-12-31
+trading backtest report --id latest
+
+# 포트폴리오
+trading portfolio show
+trading portfolio history --days 30
+
+# 전략 관리
+trading strategy list
+trading strategy info --name rsi
+
+# 서비스 관리
+trading service up
+trading service down
+trading service logs --name strategy-engine
+```
+
+---
+
+## Risk Management
+
+### Risk Check Pipeline (order-executor)
+
+시그널 수신 시 다음 체크를 순서대로 통과해야 주문 실행:
+
+1. 최대 포지션 크기 초과 여부
+2. 일일 최대 손실 한도 도달 여부
+3. 동일 심볼 중복 주문 방지
+4. 주문 금액 < 가용 잔고 확인
+5. 가격 급변 감지 (슬리피지 보호)
+
+### Safety Mechanisms
+
+| 장치 | 설명 |
+|------|------|
+| **긴급 정지 (Kill Switch)** | `trading trade stop-all` — 모든 봇 중지, 미체결 주문 전량 취소 |
+| **일일 손실 한도** | 설정 비율 초과 시 자동 매매 중단 |
+| **최대 포지션 제한** | 총 자산 대비 단일 심볼 비율 제한 |
+| **연결 끊김 대응** | Binance 연결 끊기면 신규 주문 중단, 재연결 시도 |
+| **드라이런 모드** | 실제 주문 없이 시그널만 생성 — 전략 검증용 |
+
+---
+
+## Configuration (.env)
+
+```
+BINANCE_API_KEY=
+BINANCE_API_SECRET=
+REDIS_URL=redis://localhost:6379
+DATABASE_URL=postgresql://user:pass@localhost:5432/trading
+LOG_LEVEL=INFO
+RISK_MAX_POSITION_SIZE=0.1
+RISK_STOP_LOSS_PCT=5
+RISK_DAILY_LOSS_LIMIT_PCT=10
+DRY_RUN=true
+```
+
+---
+
+## Docker Compose Services
+
+```yaml
+services:
+ redis: # 메시지 브로커 (항상 실행)
+ postgres: # 데이터 저장소 (항상 실행)
+ data-collector: # 시세 수집 (항상 실행)
+ strategy-engine: # 전략 엔진 (봇 실행 시)
+ order-executor: # 주문 실행 (봇 실행 시)
+ portfolio-manager: # 포트폴리오 (항상 실행)
+```
diff --git a/pyproject.toml b/pyproject.toml
new file mode 100644
index 0000000..debb032
--- /dev/null
+++ b/pyproject.toml
@@ -0,0 +1,14 @@
+[project]
+name = "trading-platform"
+version = "0.1.0"
+description = "Binance spot crypto trading platform"
+requires-python = ">=3.12"
+
+[tool.pytest.ini_options]
+asyncio_mode = "auto"
+testpaths = ["shared/tests", "services", "cli/tests"]
+addopts = "--import-mode=importlib"
+
+[tool.ruff]
+target-version = "py312"
+line-length = 100
diff --git a/services/backtester/Dockerfile b/services/backtester/Dockerfile
new file mode 100644
index 0000000..77ec453
--- /dev/null
+++ b/services/backtester/Dockerfile
@@ -0,0 +1,7 @@
+FROM python:3.12-slim
+WORKDIR /app
+COPY shared/ shared/
+RUN pip install --no-cache-dir ./shared
+COPY services/backtester/ services/backtester/
+RUN pip install --no-cache-dir ./services/backtester
+CMD ["python", "-m", "backtester.main"]
diff --git a/services/backtester/pyproject.toml b/services/backtester/pyproject.toml
new file mode 100644
index 0000000..b51f913
--- /dev/null
+++ b/services/backtester/pyproject.toml
@@ -0,0 +1,16 @@
+[project]
+name = "backtester"
+version = "0.1.0"
+description = "Strategy backtesting engine"
+requires-python = ">=3.12"
+dependencies = ["pandas>=2.0", "trading-shared"]
+
+[project.optional-dependencies]
+dev = ["pytest>=8.0", "pytest-asyncio>=0.23"]
+
+[build-system]
+requires = ["hatchling"]
+build-backend = "hatchling.build"
+
+[tool.hatch.build.targets.wheel]
+packages = ["src/backtester"]
diff --git a/services/backtester/src/backtester/__init__.py b/services/backtester/src/backtester/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/services/backtester/src/backtester/__init__.py
diff --git a/services/backtester/src/backtester/config.py b/services/backtester/src/backtester/config.py
new file mode 100644
index 0000000..bfbc196
--- /dev/null
+++ b/services/backtester/src/backtester/config.py
@@ -0,0 +1,13 @@
+"""Configuration for the backtester service."""
+from pydantic_settings import BaseSettings
+
+
+class BacktestConfig(BaseSettings):
+ backtest_initial_balance: float = 10000.0
+ database_url: str = "postgresql://trading:trading@localhost:5432/trading"
+ symbol: str = "BTCUSDT"
+ timeframe: str = "1h"
+ strategy_name: str = "sma_crossover"
+ candle_limit: int = 500
+
+ model_config = {"env_file": ".env", "env_file_encoding": "utf-8", "extra": "ignore"}
diff --git a/services/backtester/src/backtester/engine.py b/services/backtester/src/backtester/engine.py
new file mode 100644
index 0000000..b89d422
--- /dev/null
+++ b/services/backtester/src/backtester/engine.py
@@ -0,0 +1,95 @@
+"""Backtesting engine that runs strategies against historical candle data."""
+from dataclasses import dataclass, field
+from decimal import Decimal
+from typing import Protocol
+
+from shared.models import Candle, Signal
+
+from backtester.simulator import OrderSimulator, SimulatedTrade
+
+
+class StrategyProtocol(Protocol):
+ """Protocol matching BaseStrategy from strategy-engine."""
+
+ name: str
+
+ def on_candle(self, candle: Candle) -> Signal | None: ...
+
+ def configure(self, params: dict) -> None: ...
+
+ def reset(self) -> None: ...
+
+
+@dataclass
+class BacktestResult:
+ strategy_name: str
+ symbol: str
+ total_trades: int
+ initial_balance: Decimal
+ final_balance: Decimal
+ profit: Decimal
+ profit_pct: Decimal
+ trades: list[SimulatedTrade] = field(default_factory=list)
+
+ @property
+ def win_rate(self) -> float:
+ """Calculate win rate based on buy/sell pairs."""
+ buy_prices: list[Decimal] = []
+ wins = 0
+ total_pairs = 0
+
+ for trade in self.trades:
+ if trade.side.value == "BUY":
+ buy_prices.append(trade.price)
+ else:
+ if buy_prices:
+ buy_price = buy_prices.pop(0)
+ total_pairs += 1
+ if trade.price > buy_price:
+ wins += 1
+
+ if total_pairs == 0:
+ return 0.0
+ return wins / total_pairs * 100
+
+
+class BacktestEngine:
+ """Runs a strategy against historical candles using a simulated order executor."""
+
+ def __init__(self, strategy: StrategyProtocol, initial_balance: Decimal) -> None:
+ self._strategy = strategy
+ self._initial_balance = initial_balance
+
+ def run(self, candles: list[Candle]) -> BacktestResult:
+ """Run the backtest over a list of candles and return a result."""
+ simulator = OrderSimulator(self._initial_balance)
+
+ for candle in candles:
+ signal = self._strategy.on_candle(candle)
+ if signal is not None:
+ simulator.execute(signal)
+
+ # Calculate final balance including open positions valued at last candle close
+ final_balance = simulator.balance
+ if candles:
+ last_price = candles[-1].close
+ for symbol, qty in simulator.positions.items():
+ if qty > Decimal("0"):
+ final_balance += qty * last_price
+
+ profit = final_balance - self._initial_balance
+ if self._initial_balance != Decimal("0"):
+ profit_pct = (profit / self._initial_balance) * Decimal("100")
+ else:
+ profit_pct = Decimal("0")
+
+ return BacktestResult(
+ strategy_name=self._strategy.name,
+ symbol=candles[0].symbol if candles else "",
+ total_trades=len(simulator.trades),
+ initial_balance=self._initial_balance,
+ final_balance=final_balance,
+ profit=profit,
+ profit_pct=profit_pct,
+ trades=simulator.trades,
+ )
diff --git a/services/backtester/src/backtester/main.py b/services/backtester/src/backtester/main.py
new file mode 100644
index 0000000..ab69ee1
--- /dev/null
+++ b/services/backtester/src/backtester/main.py
@@ -0,0 +1,60 @@
+"""Main entry point for the backtester service."""
+import sys
+import os
+from decimal import Decimal
+
+# Allow importing strategies from the strategy-engine service
+_STRATEGY_ENGINE_PATH = os.path.join(
+ os.path.dirname(__file__), "../../../../strategy-engine"
+)
+if _STRATEGY_ENGINE_PATH not in sys.path:
+ sys.path.insert(0, _STRATEGY_ENGINE_PATH)
+
+from shared.db import Database
+from shared.models import Candle
+
+from backtester.config import BacktestConfig
+from backtester.engine import BacktestEngine
+from backtester.reporter import format_report
+
+
+async def run_backtest() -> str:
+ """Load strategy, fetch candles, run backtest, and return a formatted report."""
+ config = BacktestConfig()
+
+ # Import strategy dynamically (requires strategy-engine in sys.path)
+ try:
+ from strategies.base import BaseStrategy # noqa: F401
+
+ # Try to import concrete strategy by name
+ module_name = config.strategy_name
+ import importlib
+
+ mod = importlib.import_module(f"strategies.{module_name}")
+ strategy_cls = getattr(mod, "Strategy")
+ strategy = strategy_cls()
+ strategy.configure({})
+ except Exception as exc:
+ raise RuntimeError(
+ f"Failed to load strategy '{config.strategy_name}': {exc}"
+ ) from exc
+
+ db = Database(config.database_url)
+ await db.connect()
+ try:
+ rows = await db.get_candles(config.symbol, config.timeframe, config.candle_limit)
+ candles = [Candle(**row) for row in rows]
+ candles = list(reversed(candles)) # oldest first for strategy processing
+ finally:
+ await db.close()
+
+ engine = BacktestEngine(strategy, Decimal(str(config.backtest_initial_balance)))
+ result = engine.run(candles)
+ return format_report(result)
+
+
+if __name__ == "__main__":
+ import asyncio
+
+ report = asyncio.run(run_backtest())
+ print(report)
diff --git a/services/backtester/src/backtester/reporter.py b/services/backtester/src/backtester/reporter.py
new file mode 100644
index 0000000..916d5d4
--- /dev/null
+++ b/services/backtester/src/backtester/reporter.py
@@ -0,0 +1,28 @@
+"""Report formatting for backtest results."""
+from backtester.engine import BacktestResult
+
+
+def format_report(result: BacktestResult) -> str:
+ """Format a backtest result into a human-readable text report."""
+ separator = "=" * 50
+ lines = [
+ separator,
+ "BACKTEST REPORT",
+ separator,
+ f"Strategy: {result.strategy_name}",
+ f"Symbol: {result.symbol}",
+ separator,
+ "PERFORMANCE SUMMARY",
+ separator,
+ f"Initial Balance: {result.initial_balance:.2f}",
+ f"Final Balance: {result.final_balance:.2f}",
+ f"Profit/Loss: {result.profit:.2f}",
+ f"Profit %: {result.profit_pct:.2f}%",
+ separator,
+ "TRADE STATISTICS",
+ separator,
+ f"Total Trades: {result.total_trades}",
+ f"Win Rate: {result.win_rate:.2f}%",
+ separator,
+ ]
+ return "\n".join(lines)
diff --git a/services/backtester/src/backtester/simulator.py b/services/backtester/src/backtester/simulator.py
new file mode 100644
index 0000000..081ea3b
--- /dev/null
+++ b/services/backtester/src/backtester/simulator.py
@@ -0,0 +1,54 @@
+"""Simulated order executor for backtesting."""
+from dataclasses import dataclass, field
+from decimal import Decimal
+
+from shared.models import OrderSide, Signal
+
+
+@dataclass
+class SimulatedTrade:
+ symbol: str
+ side: OrderSide
+ price: Decimal
+ quantity: Decimal
+ balance_after: Decimal
+
+
+class OrderSimulator:
+ """Simulates order execution against a paper balance."""
+
+ def __init__(self, initial_balance: Decimal) -> None:
+ self.balance: Decimal = initial_balance
+ self.positions: dict[str, Decimal] = {}
+ self.trades: list[SimulatedTrade] = []
+
+ def execute(self, signal: Signal) -> bool:
+ """Execute a signal. Returns True if the trade was accepted, False otherwise."""
+ if signal.side == OrderSide.BUY:
+ cost = signal.price * signal.quantity
+ if cost > self.balance:
+ return False
+ self.balance -= cost
+ self.positions[signal.symbol] = (
+ self.positions.get(signal.symbol, Decimal("0")) + signal.quantity
+ )
+ trade_quantity = signal.quantity
+ else: # SELL
+ current_position = self.positions.get(signal.symbol, Decimal("0"))
+ if current_position <= Decimal("0"):
+ return False
+ trade_quantity = min(signal.quantity, current_position)
+ proceeds = signal.price * trade_quantity
+ self.balance += proceeds
+ self.positions[signal.symbol] = current_position - trade_quantity
+
+ self.trades.append(
+ SimulatedTrade(
+ symbol=signal.symbol,
+ side=signal.side,
+ price=signal.price,
+ quantity=trade_quantity,
+ balance_after=self.balance,
+ )
+ )
+ return True
diff --git a/services/backtester/tests/__init__.py b/services/backtester/tests/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/services/backtester/tests/__init__.py
diff --git a/services/backtester/tests/test_engine.py b/services/backtester/tests/test_engine.py
new file mode 100644
index 0000000..1a25e1c
--- /dev/null
+++ b/services/backtester/tests/test_engine.py
@@ -0,0 +1,74 @@
+"""Tests for the BacktestEngine."""
+from datetime import datetime, timezone
+from decimal import Decimal
+from unittest.mock import MagicMock
+
+import pytest
+
+from shared.models import Candle, Signal, OrderSide
+
+from backtester.engine import BacktestEngine, BacktestResult
+
+
+def make_candle(symbol: str, price: float, timeframe: str = "1h") -> Candle:
+ return Candle(
+ symbol=symbol,
+ timeframe=timeframe,
+ open_time=datetime.now(timezone.utc),
+ open=Decimal(str(price)),
+ high=Decimal(str(price * 1.01)),
+ low=Decimal(str(price * 0.99)),
+ close=Decimal(str(price)),
+ volume=Decimal("100"),
+ )
+
+
+def make_candles(prices: list[float], symbol: str = "BTCUSDT") -> list[Candle]:
+ return [make_candle(symbol, p) for p in prices]
+
+
+def make_signal(side: OrderSide, price: str, quantity: str = "0.1") -> Signal:
+ return Signal(
+ strategy="test",
+ symbol="BTCUSDT",
+ side=side,
+ price=Decimal(price),
+ quantity=Decimal(quantity),
+ reason="test",
+ )
+
+
+def test_backtest_engine_runs_strategy_over_candles():
+ strategy = MagicMock()
+ strategy.name = "mock_strategy"
+ strategy.on_candle.return_value = None
+
+ candles = make_candles([50000.0, 51000.0, 52000.0])
+ engine = BacktestEngine(strategy, Decimal("10000"))
+ result = engine.run(candles)
+
+ assert strategy.on_candle.call_count == 3
+ assert result.total_trades == 0
+ assert result.final_balance == Decimal("10000")
+ assert result.strategy_name == "mock_strategy"
+
+
+def test_backtest_engine_executes_signals():
+ buy_signal = make_signal(OrderSide.BUY, "50000", "0.1")
+ sell_signal = make_signal(OrderSide.SELL, "55000", "0.1")
+
+ strategy = MagicMock()
+ strategy.name = "mock_strategy"
+ strategy.on_candle.side_effect = [buy_signal, None, sell_signal]
+
+ candles = make_candles([50000.0, 52000.0, 55000.0])
+ engine = BacktestEngine(strategy, Decimal("10000"))
+ result = engine.run(candles)
+
+ assert result.total_trades == 2
+ # Initial: 10000, bought 0.1 BTC @ 50000 (cost 5000) → balance 5000
+ # Sold 0.1 BTC @ 55000 (proceeds 5500) → balance 10500
+ expected_final = Decimal("10500")
+ assert result.final_balance == expected_final
+ expected_profit = Decimal("500")
+ assert result.profit == expected_profit
diff --git a/services/backtester/tests/test_reporter.py b/services/backtester/tests/test_reporter.py
new file mode 100644
index 0000000..f5c694c
--- /dev/null
+++ b/services/backtester/tests/test_reporter.py
@@ -0,0 +1,26 @@
+"""Tests for the report formatter."""
+from decimal import Decimal
+
+from backtester.engine import BacktestResult
+from backtester.reporter import format_report
+
+
+def test_format_report_contains_key_metrics():
+ result = BacktestResult(
+ strategy_name="sma_crossover",
+ symbol="BTCUSDT",
+ total_trades=10,
+ initial_balance=Decimal("10000"),
+ final_balance=Decimal("11500"),
+ profit=Decimal("1500"),
+ profit_pct=Decimal("15"),
+ trades=[],
+ )
+ report = format_report(result)
+
+ assert "sma_crossover" in report
+ assert "BTCUSDT" in report
+ assert "10000" in report
+ assert "11500" in report
+ assert "1500" in report
+ assert "15" in report
diff --git a/services/backtester/tests/test_simulator.py b/services/backtester/tests/test_simulator.py
new file mode 100644
index 0000000..9d8b23e
--- /dev/null
+++ b/services/backtester/tests/test_simulator.py
@@ -0,0 +1,73 @@
+"""Tests for the OrderSimulator."""
+from decimal import Decimal
+
+import pytest
+
+from shared.models import Signal, OrderSide, OrderType
+from backtester.simulator import OrderSimulator
+
+
+def make_signal(
+ symbol: str,
+ side: OrderSide,
+ price: str,
+ quantity: str,
+ strategy: str = "test",
+) -> Signal:
+ return Signal(
+ strategy=strategy,
+ symbol=symbol,
+ side=side,
+ price=Decimal(price),
+ quantity=Decimal(quantity),
+ reason="test",
+ )
+
+
+def test_simulator_initial_balance():
+ sim = OrderSimulator(Decimal("10000"))
+ assert sim.balance == Decimal("10000")
+
+
+def test_simulator_buy_reduces_balance():
+ sim = OrderSimulator(Decimal("10000"))
+ signal = make_signal("BTCUSDT", OrderSide.BUY, "50000", "0.1")
+ result = sim.execute(signal)
+ assert result is True
+ assert sim.balance == Decimal("5000")
+ assert sim.positions["BTCUSDT"] == Decimal("0.1")
+
+
+def test_simulator_sell_increases_balance():
+ sim = OrderSimulator(Decimal("10000"))
+ buy_signal = make_signal("BTCUSDT", OrderSide.BUY, "50000", "0.1")
+ sim.execute(buy_signal)
+ balance_after_buy = sim.balance
+
+ sell_signal = make_signal("BTCUSDT", OrderSide.SELL, "55000", "0.1")
+ result = sim.execute(sell_signal)
+ assert result is True
+ assert sim.balance > balance_after_buy
+ # Profit: sold at 55000, bought at 50000 → gain 500
+ assert sim.balance == Decimal("10000") - Decimal("5000") + Decimal("5500")
+
+
+def test_simulator_reject_buy_insufficient_balance():
+ sim = OrderSimulator(Decimal("100"))
+ signal = make_signal("BTCUSDT", OrderSide.BUY, "50000", "0.1")
+ result = sim.execute(signal)
+ assert result is False
+ assert sim.balance == Decimal("100")
+ assert sim.positions.get("BTCUSDT", Decimal("0")) == Decimal("0")
+
+
+def test_simulator_trade_history():
+ sim = OrderSimulator(Decimal("10000"))
+ signal = make_signal("BTCUSDT", OrderSide.BUY, "50000", "0.1")
+ sim.execute(signal)
+ assert len(sim.trades) == 1
+ trade = sim.trades[0]
+ assert trade.symbol == "BTCUSDT"
+ assert trade.side == OrderSide.BUY
+ assert trade.price == Decimal("50000")
+ assert trade.quantity == Decimal("0.1")
diff --git a/services/data-collector/Dockerfile b/services/data-collector/Dockerfile
new file mode 100644
index 0000000..06f6d72
--- /dev/null
+++ b/services/data-collector/Dockerfile
@@ -0,0 +1,7 @@
+FROM python:3.12-slim
+WORKDIR /app
+COPY shared/ shared/
+RUN pip install --no-cache-dir ./shared
+COPY services/data-collector/ services/data-collector/
+RUN pip install --no-cache-dir ./services/data-collector
+CMD ["python", "-m", "data_collector.main"]
diff --git a/services/data-collector/pyproject.toml b/services/data-collector/pyproject.toml
new file mode 100644
index 0000000..5fba78f
--- /dev/null
+++ b/services/data-collector/pyproject.toml
@@ -0,0 +1,23 @@
+[project]
+name = "data-collector"
+version = "0.1.0"
+description = "Binance market data collector service"
+requires-python = ">=3.12"
+dependencies = [
+ "ccxt>=4.0",
+ "websockets>=12.0",
+ "trading-shared",
+]
+
+[project.optional-dependencies]
+dev = [
+ "pytest>=8.0",
+ "pytest-asyncio>=0.23",
+]
+
+[build-system]
+requires = ["hatchling"]
+build-backend = "hatchling.build"
+
+[tool.hatch.build.targets.wheel]
+packages = ["src/data_collector"]
diff --git a/services/data-collector/src/data_collector/__init__.py b/services/data-collector/src/data_collector/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/services/data-collector/src/data_collector/__init__.py
diff --git a/services/data-collector/src/data_collector/binance_rest.py b/services/data-collector/src/data_collector/binance_rest.py
new file mode 100644
index 0000000..af0eb77
--- /dev/null
+++ b/services/data-collector/src/data_collector/binance_rest.py
@@ -0,0 +1,53 @@
+"""Binance REST API helpers for fetching historical candle data."""
+from datetime import datetime, timezone
+from decimal import Decimal
+
+from shared.models import Candle
+
+
+def _normalize_symbol(symbol: str) -> str:
+ """Convert 'BTC/USDT' to 'BTCUSDT'."""
+ return symbol.replace("/", "")
+
+
+async def fetch_historical_candles(
+ exchange,
+ symbol: str,
+ timeframe: str,
+ since: int,
+ limit: int = 500,
+) -> list[Candle]:
+ """Fetch historical OHLCV candles from the exchange and return Candle models.
+
+ Args:
+ exchange: An async ccxt exchange instance.
+ symbol: Market symbol, e.g. 'BTC/USDT'.
+ timeframe: Candle timeframe, e.g. '1m'.
+ since: Start timestamp in milliseconds.
+ limit: Maximum number of candles to fetch.
+
+ Returns:
+ A list of Candle model instances.
+ """
+ rows = await exchange.fetch_ohlcv(symbol, timeframe, since=since, limit=limit)
+
+ normalized = _normalize_symbol(symbol)
+ candles: list[Candle] = []
+
+ for row in rows:
+ ts_ms, o, h, l, c, v = row
+ open_time = datetime.fromtimestamp(ts_ms / 1000, tz=timezone.utc)
+ candles.append(
+ Candle(
+ symbol=normalized,
+ timeframe=timeframe,
+ open_time=open_time,
+ open=Decimal(str(o)),
+ high=Decimal(str(h)),
+ low=Decimal(str(l)),
+ close=Decimal(str(c)),
+ volume=Decimal(str(v)),
+ )
+ )
+
+ return candles
diff --git a/services/data-collector/src/data_collector/binance_ws.py b/services/data-collector/src/data_collector/binance_ws.py
new file mode 100644
index 0000000..7a4bad2
--- /dev/null
+++ b/services/data-collector/src/data_collector/binance_ws.py
@@ -0,0 +1,106 @@
+"""Binance WebSocket client for real-time kline/candle data."""
+import asyncio
+import json
+import logging
+from datetime import datetime, timezone
+from decimal import Decimal
+from typing import Callable, Awaitable
+
+import websockets
+
+from shared.models import Candle
+
+logger = logging.getLogger(__name__)
+
+BINANCE_WS_URL = "wss://stream.binance.com:9443/ws"
+RECONNECT_DELAY = 5 # seconds
+
+
+def _normalize_symbol(symbol: str) -> str:
+ """Convert 'BTC/USDT' to 'BTCUSDT'."""
+ return symbol.replace("/", "")
+
+
+def _stream_name(symbol: str, timeframe: str) -> str:
+ """Build Binance stream name, e.g. 'btcusdt@kline_1m'."""
+ return f"{_normalize_symbol(symbol).lower()}@kline_{timeframe}"
+
+
+class BinanceWebSocket:
+ """Connects to Binance WebSocket streams and emits closed candles."""
+
+ def __init__(
+ self,
+ symbols: list[str],
+ timeframe: str,
+ on_candle: Callable[[Candle], Awaitable[None]],
+ ) -> None:
+ self._symbols = symbols
+ self._timeframe = timeframe
+ self._on_candle = on_candle
+ self._running = False
+
+ def _build_subscribe_message(self) -> dict:
+ streams = [_stream_name(s, self._timeframe) for s in self._symbols]
+ return {
+ "method": "SUBSCRIBE",
+ "params": streams,
+ "id": 1,
+ }
+
+ def _parse_candle(self, message: dict) -> Candle | None:
+ """Parse a kline WebSocket message into a Candle, or None if not closed."""
+ k = message.get("k")
+ if k is None:
+ return None
+ if not k.get("x"): # only closed candles
+ return None
+
+ symbol = k["s"] # already normalized, e.g. 'BTCUSDT'
+ open_time = datetime.fromtimestamp(k["t"] / 1000, tz=timezone.utc)
+ return Candle(
+ symbol=symbol,
+ timeframe=self._timeframe,
+ open_time=open_time,
+ open=Decimal(k["o"]),
+ high=Decimal(k["h"]),
+ low=Decimal(k["l"]),
+ close=Decimal(k["c"]),
+ volume=Decimal(k["v"]),
+ )
+
+ async def _run_once(self) -> None:
+ """Single connection attempt; processes messages until disconnected."""
+ async with websockets.connect(BINANCE_WS_URL) as ws:
+ subscribe_msg = self._build_subscribe_message()
+ await ws.send(json.dumps(subscribe_msg))
+ logger.info("Subscribed to Binance streams: %s", subscribe_msg["params"])
+
+ async for raw in ws:
+ if not self._running:
+ break
+ try:
+ message = json.loads(raw)
+ candle = self._parse_candle(message)
+ if candle is not None:
+ await self._on_candle(candle)
+ except Exception:
+ logger.exception("Error processing WebSocket message: %s", raw)
+
+ async def start(self) -> None:
+ """Connect to Binance WebSocket and process messages, auto-reconnecting."""
+ self._running = True
+ while self._running:
+ try:
+ await self._run_once()
+ except Exception:
+ if not self._running:
+ break
+ logger.warning(
+ "WebSocket disconnected. Reconnecting in %ds…", RECONNECT_DELAY
+ )
+ await asyncio.sleep(RECONNECT_DELAY)
+
+ def stop(self) -> None:
+ """Signal the WebSocket loop to stop after the current message."""
+ self._running = False
diff --git a/services/data-collector/src/data_collector/config.py b/services/data-collector/src/data_collector/config.py
new file mode 100644
index 0000000..1e080e5
--- /dev/null
+++ b/services/data-collector/src/data_collector/config.py
@@ -0,0 +1,6 @@
+from shared.config import Settings
+
+
+class CollectorConfig(Settings):
+ symbols: list[str] = ["BTC/USDT"]
+ timeframes: list[str] = ["1m"]
diff --git a/services/data-collector/src/data_collector/main.py b/services/data-collector/src/data_collector/main.py
new file mode 100644
index 0000000..adf1e96
--- /dev/null
+++ b/services/data-collector/src/data_collector/main.py
@@ -0,0 +1,58 @@
+"""Data Collector Service entry point."""
+import asyncio
+import logging
+
+from shared.broker import RedisBroker
+from shared.db import Database
+
+from data_collector.binance_ws import BinanceWebSocket
+from data_collector.config import CollectorConfig
+from data_collector.storage import CandleStorage
+
+logging.basicConfig(level=logging.INFO)
+logger = logging.getLogger(__name__)
+
+
+async def run() -> None:
+ """Initialise all components and start the WebSocket collector."""
+ config = CollectorConfig()
+
+ db = Database(config.database_url)
+ await db.connect()
+ await db.init_tables()
+
+ broker = RedisBroker(config.redis_url)
+ storage = CandleStorage(db=db, broker=broker)
+
+ async def on_candle(candle):
+ logger.info("Candle received: %s %s %s", candle.symbol, candle.timeframe, candle.open_time)
+ await storage.store(candle)
+
+ # Use the first configured timeframe for the WebSocket subscription.
+ timeframe = config.timeframes[0] if config.timeframes else "1m"
+
+ ws = BinanceWebSocket(
+ symbols=config.symbols,
+ timeframe=timeframe,
+ on_candle=on_candle,
+ )
+
+ logger.info(
+ "Starting data collector for symbols=%s timeframe=%s",
+ config.symbols,
+ timeframe,
+ )
+
+ try:
+ await ws.start()
+ finally:
+ await broker.close()
+ await db.close()
+
+
+def main() -> None:
+ asyncio.run(run())
+
+
+if __name__ == "__main__":
+ main()
diff --git a/services/data-collector/src/data_collector/storage.py b/services/data-collector/src/data_collector/storage.py
new file mode 100644
index 0000000..1e40b82
--- /dev/null
+++ b/services/data-collector/src/data_collector/storage.py
@@ -0,0 +1,24 @@
+"""Candle storage: persists to DB and publishes to Redis."""
+from shared.events import CandleEvent
+from shared.models import Candle
+
+
+class CandleStorage:
+ """Stores candles in the database and publishes CandleEvents to Redis."""
+
+ def __init__(self, db, broker) -> None:
+ self._db = db
+ self._broker = broker
+
+ async def store(self, candle: Candle) -> None:
+ """Insert candle into DB and publish a CandleEvent to the Redis stream."""
+ await self._db.insert_candle(candle)
+
+ event = CandleEvent(data=candle)
+ stream = f"candles.{candle.symbol}"
+ await self._broker.publish(stream, event.to_dict())
+
+ async def store_batch(self, candles: list[Candle]) -> None:
+ """Store multiple candles one by one."""
+ for candle in candles:
+ await self.store(candle)
diff --git a/services/data-collector/tests/__init__.py b/services/data-collector/tests/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/services/data-collector/tests/__init__.py
diff --git a/services/data-collector/tests/test_binance_rest.py b/services/data-collector/tests/test_binance_rest.py
new file mode 100644
index 0000000..695dcf9
--- /dev/null
+++ b/services/data-collector/tests/test_binance_rest.py
@@ -0,0 +1,53 @@
+"""Tests for binance_rest module."""
+import pytest
+from decimal import Decimal
+from unittest.mock import AsyncMock, MagicMock
+from datetime import datetime, timezone
+
+from data_collector.binance_rest import fetch_historical_candles
+
+
+@pytest.mark.asyncio
+async def test_fetch_historical_candles_parses_response():
+ """Verify that OHLCV rows are correctly parsed into Candle models."""
+ ts = 1700000000000 # milliseconds
+ mock_exchange = MagicMock()
+ mock_exchange.fetch_ohlcv = AsyncMock(
+ return_value=[
+ [ts, 30000.0, 30100.0, 29900.0, 30050.0, 1.5],
+ [ts + 60000, 30050.0, 30200.0, 30000.0, 30150.0, 2.0],
+ ]
+ )
+
+ candles = await fetch_historical_candles(
+ mock_exchange, "BTC/USDT", "1m", since=ts, limit=500
+ )
+
+ assert len(candles) == 2
+
+ c = candles[0]
+ assert c.symbol == "BTCUSDT"
+ assert c.timeframe == "1m"
+ assert c.open_time == datetime.fromtimestamp(ts / 1000, tz=timezone.utc)
+ assert c.open == Decimal("30000.0")
+ assert c.high == Decimal("30100.0")
+ assert c.low == Decimal("29900.0")
+ assert c.close == Decimal("30050.0")
+ assert c.volume == Decimal("1.5")
+
+ mock_exchange.fetch_ohlcv.assert_called_once_with(
+ "BTC/USDT", "1m", since=ts, limit=500
+ )
+
+
+@pytest.mark.asyncio
+async def test_fetch_historical_candles_empty_response():
+ """Verify that an empty exchange response returns an empty list."""
+ mock_exchange = MagicMock()
+ mock_exchange.fetch_ohlcv = AsyncMock(return_value=[])
+
+ candles = await fetch_historical_candles(
+ mock_exchange, "BTC/USDT", "1m", since=1700000000000
+ )
+
+ assert candles == []
diff --git a/services/data-collector/tests/test_storage.py b/services/data-collector/tests/test_storage.py
new file mode 100644
index 0000000..6b27414
--- /dev/null
+++ b/services/data-collector/tests/test_storage.py
@@ -0,0 +1,62 @@
+"""Tests for storage module."""
+import pytest
+from decimal import Decimal
+from datetime import datetime, timezone
+from unittest.mock import AsyncMock, MagicMock
+
+from shared.models import Candle
+from data_collector.storage import CandleStorage
+
+
+def _make_candle(symbol: str = "BTCUSDT") -> Candle:
+ return Candle(
+ symbol=symbol,
+ timeframe="1m",
+ open_time=datetime(2024, 1, 1, 0, 0, 0, tzinfo=timezone.utc),
+ open=Decimal("30000"),
+ high=Decimal("30100"),
+ low=Decimal("29900"),
+ close=Decimal("30050"),
+ volume=Decimal("1.5"),
+ )
+
+
+@pytest.mark.asyncio
+async def test_storage_saves_to_db_and_publishes():
+ """Verify that store() calls insert_candle on db and publish on broker."""
+ mock_db = MagicMock()
+ mock_db.insert_candle = AsyncMock()
+ mock_broker = MagicMock()
+ mock_broker.publish = AsyncMock()
+
+ storage = CandleStorage(db=mock_db, broker=mock_broker)
+ candle = _make_candle()
+
+ await storage.store(candle)
+
+ mock_db.insert_candle.assert_called_once_with(candle)
+ mock_broker.publish.assert_called_once()
+
+ stream_arg = mock_broker.publish.call_args[0][0]
+ assert stream_arg == "candles.BTCUSDT"
+
+ data_arg = mock_broker.publish.call_args[0][1]
+ assert data_arg["type"] == "CANDLE"
+ assert data_arg["data"]["symbol"] == "BTCUSDT"
+
+
+@pytest.mark.asyncio
+async def test_storage_batch_store():
+ """Verify that store_batch() calls store for each candle."""
+ mock_db = MagicMock()
+ mock_db.insert_candle = AsyncMock()
+ mock_broker = MagicMock()
+ mock_broker.publish = AsyncMock()
+
+ storage = CandleStorage(db=mock_db, broker=mock_broker)
+ candles = [_make_candle() for _ in range(3)]
+
+ await storage.store_batch(candles)
+
+ assert mock_db.insert_candle.call_count == 3
+ assert mock_broker.publish.call_count == 3
diff --git a/services/order-executor/Dockerfile b/services/order-executor/Dockerfile
new file mode 100644
index 0000000..f044714
--- /dev/null
+++ b/services/order-executor/Dockerfile
@@ -0,0 +1,7 @@
+FROM python:3.12-slim
+WORKDIR /app
+COPY shared/ shared/
+RUN pip install --no-cache-dir ./shared
+COPY services/order-executor/ services/order-executor/
+RUN pip install --no-cache-dir ./services/order-executor
+CMD ["python", "-m", "order_executor.main"]
diff --git a/services/order-executor/pyproject.toml b/services/order-executor/pyproject.toml
new file mode 100644
index 0000000..eed4fef
--- /dev/null
+++ b/services/order-executor/pyproject.toml
@@ -0,0 +1,16 @@
+[project]
+name = "order-executor"
+version = "0.1.0"
+description = "Order execution service with risk management"
+requires-python = ">=3.12"
+dependencies = ["ccxt>=4.0", "trading-shared"]
+
+[project.optional-dependencies]
+dev = ["pytest>=8.0", "pytest-asyncio>=0.23"]
+
+[build-system]
+requires = ["hatchling"]
+build-backend = "hatchling.build"
+
+[tool.hatch.build.targets.wheel]
+packages = ["src/order_executor"]
diff --git a/services/order-executor/src/order_executor/__init__.py b/services/order-executor/src/order_executor/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/services/order-executor/src/order_executor/__init__.py
diff --git a/services/order-executor/src/order_executor/config.py b/services/order-executor/src/order_executor/config.py
new file mode 100644
index 0000000..856045f
--- /dev/null
+++ b/services/order-executor/src/order_executor/config.py
@@ -0,0 +1,6 @@
+"""Order Executor configuration."""
+from shared.config import Settings
+
+
+class ExecutorConfig(Settings):
+ pass
diff --git a/services/order-executor/src/order_executor/executor.py b/services/order-executor/src/order_executor/executor.py
new file mode 100644
index 0000000..16ae52c
--- /dev/null
+++ b/services/order-executor/src/order_executor/executor.py
@@ -0,0 +1,100 @@
+"""Order execution logic."""
+import logging
+from datetime import datetime, timezone
+from decimal import Decimal
+from typing import Any, Optional
+
+from shared.broker import RedisBroker
+from shared.db import Database
+from shared.events import OrderEvent
+from shared.models import Order, OrderSide, OrderStatus, OrderType, Signal
+
+from order_executor.risk_manager import RiskManager
+
+logger = logging.getLogger(__name__)
+
+
+class OrderExecutor:
+ """Executes orders on an exchange with risk gating."""
+
+ def __init__(
+ self,
+ exchange: Any,
+ risk_manager: RiskManager,
+ broker: RedisBroker,
+ db: Database,
+ dry_run: bool = True,
+ ) -> None:
+ self.exchange = exchange
+ self.risk_manager = risk_manager
+ self.broker = broker
+ self.db = db
+ self.dry_run = dry_run
+
+ async def execute(self, signal: Signal) -> Optional[Order]:
+ """Run risk checks and place an order for the given signal."""
+ # Fetch current balance from exchange
+ balance_data = await self.exchange.fetch_balance()
+ # Use USDT (or quote currency) free balance as available capital
+ free_balances = balance_data.get("free", {})
+ quote_currency = signal.symbol.split("/")[-1] if "/" in signal.symbol else "USDT"
+ balance = Decimal(str(free_balances.get(quote_currency, 0)))
+
+ # Fetch current positions
+ positions = {}
+
+ # Compute daily PnL (not tracked at executor level — use 0 unless provided)
+ daily_pnl = Decimal(0)
+
+ # Run risk checks
+ result = self.risk_manager.check(
+ signal=signal,
+ balance=balance,
+ positions=positions,
+ daily_pnl=daily_pnl,
+ )
+
+ if not result.allowed:
+ logger.warning(
+ "Risk check rejected signal %s: %s", signal.id, result.reason
+ )
+ return None
+
+ # Build the order model
+ order = Order(
+ signal_id=signal.id,
+ symbol=signal.symbol,
+ side=signal.side,
+ type=OrderType.MARKET,
+ price=signal.price,
+ quantity=signal.quantity,
+ status=OrderStatus.PENDING,
+ )
+
+ if self.dry_run:
+ order.status = OrderStatus.FILLED
+ order.filled_at = datetime.now(timezone.utc)
+ logger.info("[DRY RUN] Order filled: %s %s %s", order.side, order.quantity, order.symbol)
+ else:
+ try:
+ await self.exchange.create_order(
+ symbol=signal.symbol,
+ type="market",
+ side=signal.side.value.lower(),
+ amount=float(signal.quantity),
+ )
+ order.status = OrderStatus.FILLED
+ order.filled_at = datetime.now(timezone.utc)
+ logger.info("Order filled: %s %s %s", order.side, order.quantity, order.symbol)
+ except Exception as exc:
+ order.status = OrderStatus.FAILED
+ logger.error("Order failed for signal %s: %s", signal.id, exc)
+
+ # Persist to DB
+ await self.db.insert_order(order)
+
+ # Publish order event
+ event = OrderEvent(data=order)
+ await self.broker.publish("orders", event.to_dict())
+
+ return order
diff --git a/services/order-executor/src/order_executor/main.py b/services/order-executor/src/order_executor/main.py
new file mode 100644
index 0000000..b57c513
--- /dev/null
+++ b/services/order-executor/src/order_executor/main.py
@@ -0,0 +1,83 @@
+"""Order Executor Service entry point."""
+import asyncio
+import logging
+from decimal import Decimal
+
+import ccxt.async_support as ccxt
+
+from shared.broker import RedisBroker
+from shared.db import Database
+from shared.events import Event, EventType
+
+from order_executor.config import ExecutorConfig
+from order_executor.executor import OrderExecutor
+from order_executor.risk_manager import RiskManager
+
+logging.basicConfig(level=logging.INFO)
+logger = logging.getLogger(__name__)
+
+
+async def run() -> None:
+ config = ExecutorConfig()
+ logging.getLogger().setLevel(config.log_level)
+
+ db = Database(config.database_url)
+ await db.connect()
+ await db.init_tables()
+
+ broker = RedisBroker(config.redis_url)
+
+ exchange = ccxt.binance(
+ {
+ "apiKey": config.binance_api_key,
+ "secret": config.binance_api_secret,
+ }
+ )
+
+ risk_manager = RiskManager(
+ max_position_size=Decimal(str(config.risk_max_position_size)),
+ stop_loss_pct=Decimal(str(config.risk_stop_loss_pct)),
+ daily_loss_limit_pct=Decimal(str(config.risk_daily_loss_limit_pct)),
+ )
+
+ executor = OrderExecutor(
+ exchange=exchange,
+ risk_manager=risk_manager,
+ broker=broker,
+ db=db,
+ dry_run=config.dry_run,
+ )
+
+ last_id = "$"
+ stream = "signals"
+ logger.info("Order executor started, listening on stream=%s dry_run=%s", stream, config.dry_run)
+
+ try:
+ while True:
+ messages = await broker.read(stream, last_id=last_id, count=10, block=5000)
+ for msg in messages:
+ try:
+ event = Event.from_dict(msg)
+ if event.type == EventType.SIGNAL:
+ signal = event.data
+ logger.info("Processing signal %s for %s", signal.id, signal.symbol)
+ await executor.execute(signal)
+ except Exception as exc:
+ logger.error("Failed to process message: %s", exc)
+ if messages:
+ # Advance last_id to avoid re-reading — broker.read returns decoded dicts,
+ # so we track progress by re-reading with "0" for replaying or "$" for new only.
+ # Since we block on "$" we get only new messages each iteration.
+ pass
+ finally:
+ await broker.close()
+ await db.close()
+ await exchange.close()
+
+
+def main() -> None:
+ asyncio.run(run())
+
+
+if __name__ == "__main__":
+ main()
diff --git a/services/order-executor/src/order_executor/risk_manager.py b/services/order-executor/src/order_executor/risk_manager.py
new file mode 100644
index 0000000..8e8a72c
--- /dev/null
+++ b/services/order-executor/src/order_executor/risk_manager.py
@@ -0,0 +1,55 @@
+"""Risk management for order execution."""
+from dataclasses import dataclass
+from decimal import Decimal
+
+from shared.models import Signal, OrderSide, Position
+
+
+@dataclass
+class RiskCheckResult:
+ allowed: bool
+ reason: str
+
+
+class RiskManager:
+ """Evaluates risk before order execution."""
+
+ def __init__(
+ self,
+ max_position_size: Decimal,
+ stop_loss_pct: Decimal,
+ daily_loss_limit_pct: Decimal,
+ ) -> None:
+ self.max_position_size = max_position_size
+ self.stop_loss_pct = stop_loss_pct
+ self.daily_loss_limit_pct = daily_loss_limit_pct
+
+ def check(
+ self,
+ signal: Signal,
+ balance: Decimal,
+ positions: dict[str, Position],
+ daily_pnl: Decimal,
+ ) -> RiskCheckResult:
+ """Run risk checks against a signal and current portfolio state."""
+ # Check daily loss limit
+ if balance > 0 and (daily_pnl / balance) * 100 < -self.daily_loss_limit_pct:
+ return RiskCheckResult(allowed=False, reason="Daily loss limit exceeded")
+
+ if signal.side == OrderSide.BUY:
+ order_cost = signal.price * signal.quantity
+
+ # Check sufficient balance
+ if order_cost > balance:
+ return RiskCheckResult(allowed=False, reason="Insufficient balance")
+
+ # Check position size limit
+ position = positions.get(signal.symbol)
+ current_position_value = Decimal(0)
+ if position is not None:
+ current_position_value = position.quantity * position.current_price
+
+ if balance > 0 and (current_position_value + order_cost) / balance > self.max_position_size:
+ return RiskCheckResult(allowed=False, reason="Position size exceeded")
+
+ return RiskCheckResult(allowed=True, reason="OK")
diff --git a/services/order-executor/tests/__init__.py b/services/order-executor/tests/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/services/order-executor/tests/__init__.py
diff --git a/services/order-executor/tests/test_executor.py b/services/order-executor/tests/test_executor.py
new file mode 100644
index 0000000..5b18992
--- /dev/null
+++ b/services/order-executor/tests/test_executor.py
@@ -0,0 +1,122 @@
+"""Tests for OrderExecutor."""
+from decimal import Decimal
+from unittest.mock import AsyncMock, MagicMock
+
+import pytest
+
+from shared.models import OrderSide, OrderStatus, Signal
+from order_executor.executor import OrderExecutor
+from order_executor.risk_manager import RiskCheckResult, RiskManager
+
+
+def make_signal(side: OrderSide = OrderSide.BUY, price: str = "100", quantity: str = "1") -> Signal:
+ return Signal(
+ strategy="test",
+ symbol="BTC/USDT",
+ side=side,
+ price=Decimal(price),
+ quantity=Decimal(quantity),
+ reason="test",
+ )
+
+
+def make_mock_exchange(free_usdt: float = 10000.0) -> AsyncMock:
+ exchange = AsyncMock()
+ exchange.fetch_balance.return_value = {"free": {"USDT": free_usdt}}
+ exchange.create_order = AsyncMock(return_value={"id": "exchange-order-123"})
+ return exchange
+
+
+def make_mock_risk_manager(allowed: bool = True, reason: str = "OK") -> MagicMock:
+ rm = MagicMock(spec=RiskManager)
+ rm.check.return_value = RiskCheckResult(allowed=allowed, reason=reason)
+ return rm
+
+
+def make_mock_broker() -> AsyncMock:
+ broker = AsyncMock()
+ broker.publish = AsyncMock()
+ return broker
+
+
+def make_mock_db() -> AsyncMock:
+ db = AsyncMock()
+ db.insert_order = AsyncMock()
+ return db
+
+
+@pytest.mark.asyncio
+async def test_executor_places_order_when_risk_passes():
+ """When risk check passes, create_order is called and order status is FILLED."""
+ exchange = make_mock_exchange()
+ risk_manager = make_mock_risk_manager(allowed=True)
+ broker = make_mock_broker()
+ db = make_mock_db()
+
+ executor = OrderExecutor(
+ exchange=exchange,
+ risk_manager=risk_manager,
+ broker=broker,
+ db=db,
+ dry_run=False,
+ )
+
+ signal = make_signal()
+ order = await executor.execute(signal)
+
+ assert order is not None
+ assert order.status == OrderStatus.FILLED
+ exchange.create_order.assert_called_once()
+ db.insert_order.assert_called_once_with(order)
+ broker.publish.assert_called_once()
+
+
+@pytest.mark.asyncio
+async def test_executor_rejects_when_risk_fails():
+ """When risk check fails, create_order is not called and None is returned."""
+ exchange = make_mock_exchange()
+ risk_manager = make_mock_risk_manager(allowed=False, reason="Position size exceeded")
+ broker = make_mock_broker()
+ db = make_mock_db()
+
+ executor = OrderExecutor(
+ exchange=exchange,
+ risk_manager=risk_manager,
+ broker=broker,
+ db=db,
+ dry_run=False,
+ )
+
+ signal = make_signal()
+ order = await executor.execute(signal)
+
+ assert order is None
+ exchange.create_order.assert_not_called()
+ db.insert_order.assert_not_called()
+ broker.publish.assert_not_called()
+
+
+@pytest.mark.asyncio
+async def test_executor_dry_run_does_not_call_exchange():
+ """In dry-run mode, risk passes, order is FILLED, but exchange.create_order is NOT called."""
+ exchange = make_mock_exchange()
+ risk_manager = make_mock_risk_manager(allowed=True)
+ broker = make_mock_broker()
+ db = make_mock_db()
+
+ executor = OrderExecutor(
+ exchange=exchange,
+ risk_manager=risk_manager,
+ broker=broker,
+ db=db,
+ dry_run=True,
+ )
+
+ signal = make_signal()
+ order = await executor.execute(signal)
+
+ assert order is not None
+ assert order.status == OrderStatus.FILLED
+ exchange.create_order.assert_not_called()
+ db.insert_order.assert_called_once_with(order)
+ broker.publish.assert_called_once()
diff --git a/services/order-executor/tests/test_risk_manager.py b/services/order-executor/tests/test_risk_manager.py
new file mode 100644
index 0000000..f6b5545
--- /dev/null
+++ b/services/order-executor/tests/test_risk_manager.py
@@ -0,0 +1,72 @@
+"""Tests for RiskManager."""
+from decimal import Decimal
+
+import pytest
+
+from shared.models import OrderSide, Position, Signal
+from order_executor.risk_manager import RiskManager
+
+
+def make_signal(side: OrderSide, price: str, quantity: str, symbol: str = "BTC/USDT") -> Signal:
+ return Signal(
+ strategy="test",
+ symbol=symbol,
+ side=side,
+ price=Decimal(price),
+ quantity=Decimal(quantity),
+ reason="test signal",
+ )
+
+
+def make_risk_manager(
+ max_position_size: str = "0.1",
+ stop_loss_pct: str = "5.0",
+ daily_loss_limit_pct: str = "10.0",
+) -> RiskManager:
+ return RiskManager(
+ max_position_size=Decimal(max_position_size),
+ stop_loss_pct=Decimal(stop_loss_pct),
+ daily_loss_limit_pct=Decimal(daily_loss_limit_pct),
+ )
+
+
+def test_risk_check_passes_normal_order():
+ """Small BUY order with enough balance should be allowed."""
+ rm = make_risk_manager()
+ signal = make_signal(side=OrderSide.BUY, price="100", quantity="0.5")
+ # cost = 50, balance = 10000, position_value = 0 => (0+50)/10000 = 0.5% < 10%
+ result = rm.check(signal, balance=Decimal("10000"), positions={}, daily_pnl=Decimal("0"))
+ assert result.allowed is True
+ assert result.reason == "OK"
+
+
+def test_risk_check_rejects_exceeding_position_size():
+ """5 BTC at $50,000 = $250,000 order cost on $10,000,000 balance exceeds 10% limit."""
+ rm = make_risk_manager(max_position_size="0.1")
+ signal = make_signal(side=OrderSide.BUY, price="50000", quantity="5")
+ # cost = 250000, balance = 1000000 => 250000/1000000 = 25% > 10%
+ # balance is sufficient (250000 < 1000000) but position size is exceeded
+ result = rm.check(signal, balance=Decimal("1000000"), positions={}, daily_pnl=Decimal("0"))
+ assert result.allowed is False
+ assert result.reason == "Position size exceeded"
+
+
+def test_risk_check_rejects_daily_loss_exceeded():
+ """Daily PnL of -1100 on 10000 balance = -11%, exceeding -10% limit."""
+ rm = make_risk_manager(daily_loss_limit_pct="10.0")
+ signal = make_signal(side=OrderSide.BUY, price="100", quantity="0.1")
+ result = rm.check(
+ signal, balance=Decimal("10000"), positions={}, daily_pnl=Decimal("-1100")
+ )
+ assert result.allowed is False
+ assert result.reason == "Daily loss limit exceeded"
+
+
+def test_risk_check_rejects_insufficient_balance():
+ """Order cost of 500 exceeds available balance of 100."""
+ rm = make_risk_manager()
+ signal = make_signal(side=OrderSide.BUY, price="100", quantity="5")
+ # cost = 500, balance = 100
+ result = rm.check(signal, balance=Decimal("100"), positions={}, daily_pnl=Decimal("0"))
+ assert result.allowed is False
+ assert result.reason == "Insufficient balance"
diff --git a/services/portfolio-manager/Dockerfile b/services/portfolio-manager/Dockerfile
new file mode 100644
index 0000000..3f8587e
--- /dev/null
+++ b/services/portfolio-manager/Dockerfile
@@ -0,0 +1,7 @@
+FROM python:3.12-slim
+WORKDIR /app
+COPY shared/ shared/
+RUN pip install --no-cache-dir ./shared
+COPY services/portfolio-manager/ services/portfolio-manager/
+RUN pip install --no-cache-dir ./services/portfolio-manager
+CMD ["python", "-m", "portfolio_manager.main"]
diff --git a/services/portfolio-manager/pyproject.toml b/services/portfolio-manager/pyproject.toml
new file mode 100644
index 0000000..8245aa0
--- /dev/null
+++ b/services/portfolio-manager/pyproject.toml
@@ -0,0 +1,16 @@
+[project]
+name = "portfolio-manager"
+version = "0.1.0"
+description = "Portfolio tracking and PnL calculation service"
+requires-python = ">=3.12"
+dependencies = ["trading-shared"]
+
+[project.optional-dependencies]
+dev = ["pytest>=8.0", "pytest-asyncio>=0.23"]
+
+[build-system]
+requires = ["hatchling"]
+build-backend = "hatchling.build"
+
+[tool.hatch.build.targets.wheel]
+packages = ["src/portfolio_manager"]
diff --git a/services/portfolio-manager/src/portfolio_manager/__init__.py b/services/portfolio-manager/src/portfolio_manager/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/services/portfolio-manager/src/portfolio_manager/__init__.py
diff --git a/services/portfolio-manager/src/portfolio_manager/config.py b/services/portfolio-manager/src/portfolio_manager/config.py
new file mode 100644
index 0000000..bbd5049
--- /dev/null
+++ b/services/portfolio-manager/src/portfolio_manager/config.py
@@ -0,0 +1,6 @@
+"""Portfolio Manager configuration."""
+from shared.config import Settings
+
+
+class PortfolioConfig(Settings):
+ snapshot_interval_hours: int = 24
diff --git a/services/portfolio-manager/src/portfolio_manager/main.py b/services/portfolio-manager/src/portfolio_manager/main.py
new file mode 100644
index 0000000..cb7e6a8
--- /dev/null
+++ b/services/portfolio-manager/src/portfolio_manager/main.py
@@ -0,0 +1,56 @@
+"""Portfolio Manager Service entry point."""
+import asyncio
+import logging
+
+from shared.broker import RedisBroker
+from shared.events import Event, OrderEvent
+
+from portfolio_manager.config import PortfolioConfig
+from portfolio_manager.portfolio import PortfolioTracker
+
+logging.basicConfig(level=logging.INFO)
+logger = logging.getLogger(__name__)
+
+ORDERS_STREAM = "orders"
+
+
+async def run() -> None:
+ config = PortfolioConfig()
+ broker = RedisBroker(config.redis_url)
+ tracker = PortfolioTracker()
+
+ last_id = "$"
+ logger.info("Portfolio manager started, listening on stream=%s", ORDERS_STREAM)
+
+ try:
+ while True:
+ messages = await broker.read(ORDERS_STREAM, last_id=last_id, block=1000)
+ for msg in messages:
+ try:
+ event = Event.from_dict(msg)
+ if isinstance(event, OrderEvent):
+ order = event.data
+ tracker.apply_order(order)
+ logger.info(
+ "Applied order symbol=%s side=%s qty=%s price=%s",
+ order.symbol,
+ order.side,
+ order.quantity,
+ order.price,
+ )
+ positions = tracker.get_all_positions()
+ logger.info("Current positions count=%d", len(positions))
+ except Exception:
+ logger.exception("Failed to process message: %s", msg)
+ # Update last_id to the latest processed message id if broker returns ids
+ # Since broker.read returns parsed payloads (not ids), we use "$" to get new msgs
+ finally:
+ await broker.close()
+
+
+def main() -> None:
+ asyncio.run(run())
+
+
+if __name__ == "__main__":
+ main()
diff --git a/services/portfolio-manager/src/portfolio_manager/pnl.py b/services/portfolio-manager/src/portfolio_manager/pnl.py
new file mode 100644
index 0000000..96f0da8
--- /dev/null
+++ b/services/portfolio-manager/src/portfolio_manager/pnl.py
@@ -0,0 +1,21 @@
+"""PnL calculation functions for the portfolio manager."""
+from decimal import Decimal
+
+
+def calculate_unrealized_pnl(
+ quantity: Decimal,
+ avg_entry_price: Decimal,
+ current_price: Decimal,
+) -> Decimal:
+ """Calculate unrealized PnL for an open position."""
+ return quantity * (current_price - avg_entry_price)
+
+
+def calculate_realized_pnl(
+ buy_price: Decimal,
+ sell_price: Decimal,
+ quantity: Decimal,
+ fee: Decimal = Decimal("0"),
+) -> Decimal:
+ """Calculate realized PnL for a completed trade."""
+ return quantity * (sell_price - buy_price) - fee
diff --git a/services/portfolio-manager/src/portfolio_manager/portfolio.py b/services/portfolio-manager/src/portfolio_manager/portfolio.py
new file mode 100644
index 0000000..59106bb
--- /dev/null
+++ b/services/portfolio-manager/src/portfolio_manager/portfolio.py
@@ -0,0 +1,62 @@
+"""Portfolio tracking for the portfolio manager service."""
+from decimal import Decimal
+
+from shared.models import Order, OrderSide, Position
+
+
+class _PositionState:
+ """Internal state for tracking a single symbol's position."""
+
+ def __init__(self) -> None:
+ self.quantity: Decimal = Decimal("0")
+ self.avg_entry: Decimal = Decimal("0")
+
+
+class PortfolioTracker:
+ """Tracks positions and updates them based on filled orders."""
+
+ def __init__(self) -> None:
+ self._positions: dict[str, _PositionState] = {}
+
+ def _get_or_create(self, symbol: str) -> _PositionState:
+ if symbol not in self._positions:
+ self._positions[symbol] = _PositionState()
+ return self._positions[symbol]
+
+ def apply_order(self, order: Order) -> None:
+ """Update internal position state based on a filled order."""
+ state = self._get_or_create(order.symbol)
+
+ if order.side == OrderSide.BUY:
+ # Weighted average entry price
+ total_cost = state.avg_entry * state.quantity + order.price * order.quantity
+ state.quantity += order.quantity
+ if state.quantity > Decimal("0"):
+ state.avg_entry = total_cost / state.quantity
+ elif order.side == OrderSide.SELL:
+ state.quantity -= order.quantity
+ # Keep avg_entry unchanged unless fully sold
+ if state.quantity <= Decimal("0"):
+ state.quantity = Decimal("0")
+ state.avg_entry = Decimal("0")
+
+ def get_position(self, symbol: str) -> Position | None:
+ """Return a Position model for the symbol, or None if no/zero position."""
+ state = self._positions.get(symbol)
+ if state is None or state.quantity <= Decimal("0"):
+ return None
+ return Position(
+ symbol=symbol,
+ quantity=state.quantity,
+ avg_entry_price=state.avg_entry,
+ current_price=state.avg_entry, # No live price here; caller can update
+ )
+
+ def get_all_positions(self) -> list[Position]:
+ """Return all non-zero positions."""
+ positions = []
+ for symbol in self._positions:
+ pos = self.get_position(symbol)
+ if pos is not None:
+ positions.append(pos)
+ return positions
diff --git a/services/portfolio-manager/tests/__init__.py b/services/portfolio-manager/tests/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/services/portfolio-manager/tests/__init__.py
diff --git a/services/portfolio-manager/tests/test_pnl.py b/services/portfolio-manager/tests/test_pnl.py
new file mode 100644
index 0000000..4462adc
--- /dev/null
+++ b/services/portfolio-manager/tests/test_pnl.py
@@ -0,0 +1,32 @@
+"""Tests for PnL calculation functions."""
+from decimal import Decimal
+
+from portfolio_manager.pnl import calculate_realized_pnl, calculate_unrealized_pnl
+
+
+def test_unrealized_pnl_profit() -> None:
+ result = calculate_unrealized_pnl(
+ quantity=Decimal("0.1"),
+ avg_entry_price=Decimal("50000"),
+ current_price=Decimal("55000"),
+ )
+ assert result == Decimal("500")
+
+
+def test_unrealized_pnl_loss() -> None:
+ result = calculate_unrealized_pnl(
+ quantity=Decimal("0.1"),
+ avg_entry_price=Decimal("50000"),
+ current_price=Decimal("45000"),
+ )
+ assert result == Decimal("-500")
+
+
+def test_realized_pnl_single_trade() -> None:
+ result = calculate_realized_pnl(
+ buy_price=Decimal("50000"),
+ sell_price=Decimal("55000"),
+ quantity=Decimal("0.1"),
+ fee=Decimal("5.5"),
+ )
+ assert result == Decimal("494.5")
diff --git a/services/portfolio-manager/tests/test_portfolio.py b/services/portfolio-manager/tests/test_portfolio.py
new file mode 100644
index 0000000..26319ca
--- /dev/null
+++ b/services/portfolio-manager/tests/test_portfolio.py
@@ -0,0 +1,57 @@
+"""Tests for PortfolioTracker."""
+from decimal import Decimal
+
+from shared.models import Order, OrderSide, OrderStatus, OrderType
+from portfolio_manager.portfolio import PortfolioTracker
+
+
+def make_order(side: OrderSide, price: str, quantity: str) -> Order:
+ """Helper to create a filled Order."""
+ return Order(
+ signal_id="test-signal",
+ symbol="BTC/USDT",
+ side=side,
+ type=OrderType.MARKET,
+ price=Decimal(price),
+ quantity=Decimal(quantity),
+ status=OrderStatus.FILLED,
+ )
+
+
+def test_portfolio_add_buy_order() -> None:
+ tracker = PortfolioTracker()
+ order = make_order(OrderSide.BUY, "50000", "0.1")
+ tracker.apply_order(order)
+
+ position = tracker.get_position("BTC/USDT")
+ assert position is not None
+ assert position.quantity == Decimal("0.1")
+ assert position.avg_entry_price == Decimal("50000")
+
+
+def test_portfolio_add_multiple_buys() -> None:
+ tracker = PortfolioTracker()
+ tracker.apply_order(make_order(OrderSide.BUY, "50000", "0.1"))
+ tracker.apply_order(make_order(OrderSide.BUY, "52000", "0.1"))
+
+ position = tracker.get_position("BTC/USDT")
+ assert position is not None
+ assert position.quantity == Decimal("0.2")
+ assert position.avg_entry_price == Decimal("51000")
+
+
+def test_portfolio_sell_reduces_position() -> None:
+ tracker = PortfolioTracker()
+ tracker.apply_order(make_order(OrderSide.BUY, "50000", "0.2"))
+ tracker.apply_order(make_order(OrderSide.SELL, "55000", "0.1"))
+
+ position = tracker.get_position("BTC/USDT")
+ assert position is not None
+ assert position.quantity == Decimal("0.1")
+ assert position.avg_entry_price == Decimal("50000")
+
+
+def test_portfolio_no_position_returns_none() -> None:
+ tracker = PortfolioTracker()
+ position = tracker.get_position("ETH/USDT")
+ assert position is None
diff --git a/services/strategy-engine/Dockerfile b/services/strategy-engine/Dockerfile
new file mode 100644
index 0000000..adecdd4
--- /dev/null
+++ b/services/strategy-engine/Dockerfile
@@ -0,0 +1,7 @@
+FROM python:3.12-slim
+WORKDIR /app
+COPY shared/ shared/
+RUN pip install --no-cache-dir ./shared
+COPY services/strategy-engine/ services/strategy-engine/
+RUN pip install --no-cache-dir ./services/strategy-engine
+CMD ["python", "-m", "strategy_engine.main"]
diff --git a/services/strategy-engine/pyproject.toml b/services/strategy-engine/pyproject.toml
new file mode 100644
index 0000000..a86b282
--- /dev/null
+++ b/services/strategy-engine/pyproject.toml
@@ -0,0 +1,19 @@
+[project]
+name = "strategy-engine"
+version = "0.1.0"
+description = "Plugin-based strategy execution engine"
+requires-python = ">=3.12"
+dependencies = [
+ "pandas>=2.0",
+ "trading-shared",
+]
+
+[project.optional-dependencies]
+dev = ["pytest>=8.0", "pytest-asyncio>=0.23"]
+
+[build-system]
+requires = ["hatchling"]
+build-backend = "hatchling.build"
+
+[tool.hatch.build.targets.wheel]
+packages = ["src/strategy_engine"]
diff --git a/services/strategy-engine/src/strategy_engine/__init__.py b/services/strategy-engine/src/strategy_engine/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/services/strategy-engine/src/strategy_engine/__init__.py
diff --git a/services/strategy-engine/src/strategy_engine/config.py b/services/strategy-engine/src/strategy_engine/config.py
new file mode 100644
index 0000000..2864b09
--- /dev/null
+++ b/services/strategy-engine/src/strategy_engine/config.py
@@ -0,0 +1,8 @@
+"""Strategy Engine configuration."""
+from shared.config import Settings
+
+
+class StrategyConfig(Settings):
+ symbols: list[str] = ["BTC/USDT"]
+ timeframes: list[str] = ["1m"]
+ strategy_params: dict = {}
diff --git a/services/strategy-engine/src/strategy_engine/engine.py b/services/strategy-engine/src/strategy_engine/engine.py
new file mode 100644
index 0000000..09dbf65
--- /dev/null
+++ b/services/strategy-engine/src/strategy_engine/engine.py
@@ -0,0 +1,54 @@
+"""Strategy Engine: consumes candle events and publishes signals."""
+import logging
+
+from shared.broker import RedisBroker
+from shared.events import CandleEvent, SignalEvent, Event
+
+from strategies.base import BaseStrategy
+
+logger = logging.getLogger(__name__)
+
+
+class StrategyEngine:
+ def __init__(self, broker: RedisBroker, strategies: list[BaseStrategy]) -> None:
+ self._broker = broker
+ self._strategies = strategies
+
+ async def process_once(self, stream: str, last_id: str) -> str:
+ """Read one batch of messages from the stream, process candles, publish signals.
+
+ Returns the updated last_id for the next call.
+ """
+ messages = await self._broker.read(stream, last_id=last_id, count=10, block=100)
+
+ for raw in messages:
+ try:
+ event = Event.from_dict(raw)
+ except Exception as exc:
+ logger.warning("Failed to parse event: %s – %s", raw, exc)
+ continue
+
+ if not isinstance(event, CandleEvent):
+ continue
+
+ candle = event.data
+ for strategy in self._strategies:
+ try:
+ signal = strategy.on_candle(candle)
+ except Exception as exc:
+ logger.error(
+ "Strategy %s raised on candle: %s", strategy.name, exc
+ )
+ continue
+
+ if signal is not None:
+ signal_event = SignalEvent(data=signal)
+ await self._broker.publish("signals", signal_event.to_dict())
+ logger.info(
+ "Signal published: strategy=%s symbol=%s side=%s",
+ signal.strategy,
+ signal.symbol,
+ signal.side,
+ )
+
+ return last_id
diff --git a/services/strategy-engine/src/strategy_engine/main.py b/services/strategy-engine/src/strategy_engine/main.py
new file mode 100644
index 0000000..83bb867
--- /dev/null
+++ b/services/strategy-engine/src/strategy_engine/main.py
@@ -0,0 +1,56 @@
+"""Strategy Engine Service entry point."""
+import asyncio
+import logging
+from pathlib import Path
+
+from shared.broker import RedisBroker
+
+from strategy_engine.config import StrategyConfig
+from strategy_engine.engine import StrategyEngine
+from strategy_engine.plugin_loader import load_strategies
+
+logging.basicConfig(level=logging.INFO)
+logger = logging.getLogger(__name__)
+
+# The strategies directory lives alongside the installed package
+STRATEGIES_DIR = Path(__file__).parent.parent.parent.parent / "strategies"
+
+
+async def run() -> None:
+ config = StrategyConfig()
+ broker = RedisBroker(config.redis_url)
+
+ strategies_dir = STRATEGIES_DIR
+ strategies = load_strategies(strategies_dir)
+
+ # Configure each strategy with params from config
+ for strategy in strategies:
+ params = config.strategy_params.get(strategy.name, {})
+ strategy.configure(params)
+
+ logger.info(
+ "Loaded %d strategies: %s",
+ len(strategies),
+ [s.name for s in strategies],
+ )
+
+ engine = StrategyEngine(broker=broker, strategies=strategies)
+
+ try:
+ for symbol in config.symbols:
+ stream = f"candles.{symbol.replace('/', '_')}"
+ last_id = "$"
+ logger.info("Starting engine loop for stream=%s", stream)
+
+ while True:
+ last_id = await engine.process_once(stream, last_id)
+ finally:
+ await broker.close()
+
+
+def main() -> None:
+ asyncio.run(run())
+
+
+if __name__ == "__main__":
+ main()
diff --git a/services/strategy-engine/src/strategy_engine/plugin_loader.py b/services/strategy-engine/src/strategy_engine/plugin_loader.py
new file mode 100644
index 0000000..719dc6d
--- /dev/null
+++ b/services/strategy-engine/src/strategy_engine/plugin_loader.py
@@ -0,0 +1,36 @@
+"""Dynamic plugin loader for strategy modules."""
+import importlib.util
+import sys
+from pathlib import Path
+
+from strategies.base import BaseStrategy
+
+
+def load_strategies(strategies_dir: Path) -> list[BaseStrategy]:
+ """Scan strategies_dir for *.py files and load all BaseStrategy subclasses."""
+ loaded: list[BaseStrategy] = []
+
+ for path in sorted(strategies_dir.glob("*.py")):
+ # Skip dunder files and base
+ if path.name.startswith("__") or path.name == "base.py":
+ continue
+
+ module_name = f"_strategy_plugin_{path.stem}"
+ spec = importlib.util.spec_from_file_location(module_name, path)
+ if spec is None or spec.loader is None:
+ continue
+
+ module = importlib.util.module_from_spec(spec)
+ sys.modules[module_name] = module
+ spec.loader.exec_module(module)
+
+ for attr_name in dir(module):
+ obj = getattr(module, attr_name)
+ if (
+ isinstance(obj, type)
+ and issubclass(obj, BaseStrategy)
+ and obj is not BaseStrategy
+ ):
+ loaded.append(obj())
+
+ return loaded
diff --git a/services/strategy-engine/strategies/__init__.py b/services/strategy-engine/strategies/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/services/strategy-engine/strategies/__init__.py
diff --git a/services/strategy-engine/strategies/base.py b/services/strategy-engine/strategies/base.py
new file mode 100644
index 0000000..06101d0
--- /dev/null
+++ b/services/strategy-engine/strategies/base.py
@@ -0,0 +1,17 @@
+from abc import ABC, abstractmethod
+from shared.models import Candle, Signal
+
+
+class BaseStrategy(ABC):
+ name: str = "base"
+
+ @abstractmethod
+ def on_candle(self, candle: Candle) -> Signal | None:
+ pass
+
+ @abstractmethod
+ def configure(self, params: dict) -> None:
+ pass
+
+ def reset(self) -> None:
+ pass
diff --git a/services/strategy-engine/strategies/grid_strategy.py b/services/strategy-engine/strategies/grid_strategy.py
new file mode 100644
index 0000000..f669f09
--- /dev/null
+++ b/services/strategy-engine/strategies/grid_strategy.py
@@ -0,0 +1,77 @@
+from decimal import Decimal
+from typing import Optional
+
+import numpy as np
+
+from shared.models import Candle, Signal, OrderSide
+from strategies.base import BaseStrategy
+
+
+class GridStrategy(BaseStrategy):
+ name: str = "grid"
+
+ def __init__(self) -> None:
+ self._lower_price: float = 0.0
+ self._upper_price: float = 0.0
+ self._grid_count: int = 5
+ self._quantity: Decimal = Decimal("0.01")
+ self._grid_levels: list[float] = []
+ self._last_zone: Optional[int] = None
+
+ def configure(self, params: dict) -> None:
+ self._lower_price = float(params["lower_price"])
+ self._upper_price = float(params["upper_price"])
+ self._grid_count = int(params.get("grid_count", 5))
+ self._quantity = Decimal(str(params.get("quantity", "0.01")))
+ self._grid_levels = list(
+ np.linspace(self._lower_price, self._upper_price, self._grid_count + 1)
+ )
+ self._last_zone = None
+
+ def reset(self) -> None:
+ self._last_zone = None
+
+ def _get_zone(self, price: float) -> int:
+ """Return the grid zone index for a given price.
+
+ Zone 0 is below the lowest level, zone grid_count is above the highest level.
+ Zones 1..grid_count-1 are between levels.
+ """
+ for i, level in enumerate(self._grid_levels):
+ if price < level:
+ return i
+ return len(self._grid_levels)
+
+ def on_candle(self, candle: Candle) -> Signal | None:
+ price = float(candle.close)
+ current_zone = self._get_zone(price)
+
+ if self._last_zone is None:
+ self._last_zone = current_zone
+ return None
+
+ prev_zone = self._last_zone
+ self._last_zone = current_zone
+
+ if current_zone < prev_zone:
+ # Price moved to a lower zone → BUY
+ return Signal(
+ strategy=self.name,
+ symbol=candle.symbol,
+ side=OrderSide.BUY,
+ price=candle.close,
+ quantity=self._quantity,
+ reason=f"Grid: price crossed down from zone {prev_zone} to {current_zone}",
+ )
+ elif current_zone > prev_zone:
+ # Price moved to a higher zone → SELL
+ return Signal(
+ strategy=self.name,
+ symbol=candle.symbol,
+ side=OrderSide.SELL,
+ price=candle.close,
+ quantity=self._quantity,
+ reason=f"Grid: price crossed up from zone {prev_zone} to {current_zone}",
+ )
+
+ return None
diff --git a/services/strategy-engine/strategies/rsi_strategy.py b/services/strategy-engine/strategies/rsi_strategy.py
new file mode 100644
index 0000000..aebbafc
--- /dev/null
+++ b/services/strategy-engine/strategies/rsi_strategy.py
@@ -0,0 +1,77 @@
+from collections import deque
+from decimal import Decimal
+
+import pandas as pd
+
+from shared.models import Candle, Signal, OrderSide
+from strategies.base import BaseStrategy
+
+
+def _compute_rsi(series: pd.Series, period: int) -> float | None:
+ """Compute RSI using Wilder's smoothing (EMA-based)."""
+ if len(series) < period + 1:
+ return None
+ delta = series.diff()
+ gain = delta.clip(lower=0)
+ loss = -delta.clip(upper=0)
+ avg_gain = gain.ewm(com=period - 1, min_periods=period).mean()
+ avg_loss = loss.ewm(com=period - 1, min_periods=period).mean()
+ rs = avg_gain / avg_loss.replace(0, float("nan"))
+ rsi = 100 - (100 / (1 + rs))
+ value = rsi.iloc[-1]
+ if pd.isna(value):
+ return None
+ return float(value)
+
+
+class RsiStrategy(BaseStrategy):
+ name: str = "rsi"
+
+ def __init__(self) -> None:
+ self._closes: deque[float] = deque(maxlen=200)
+ self._period: int = 14
+ self._oversold: float = 30.0
+ self._overbought: float = 70.0
+ self._quantity: Decimal = Decimal("0.01")
+
+ def configure(self, params: dict) -> None:
+ self._period = int(params.get("period", 14))
+ self._oversold = float(params.get("oversold", 30))
+ self._overbought = float(params.get("overbought", 70))
+ self._quantity = Decimal(str(params.get("quantity", "0.01")))
+
+ def reset(self) -> None:
+ self._closes.clear()
+
+ def on_candle(self, candle: Candle) -> Signal | None:
+ self._closes.append(float(candle.close))
+
+ if len(self._closes) < self._period + 1:
+ return None
+
+ series = pd.Series(list(self._closes))
+ rsi_value = _compute_rsi(series, self._period)
+
+ if rsi_value is None:
+ return None
+
+ if rsi_value < self._oversold:
+ return Signal(
+ strategy=self.name,
+ symbol=candle.symbol,
+ side=OrderSide.BUY,
+ price=candle.close,
+ quantity=self._quantity,
+ reason=f"RSI {rsi_value:.2f} below oversold threshold {self._oversold}",
+ )
+ elif rsi_value > self._overbought:
+ return Signal(
+ strategy=self.name,
+ symbol=candle.symbol,
+ side=OrderSide.SELL,
+ price=candle.close,
+ quantity=self._quantity,
+ reason=f"RSI {rsi_value:.2f} above overbought threshold {self._overbought}",
+ )
+
+ return None
diff --git a/services/strategy-engine/tests/__init__.py b/services/strategy-engine/tests/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/services/strategy-engine/tests/__init__.py
diff --git a/services/strategy-engine/tests/conftest.py b/services/strategy-engine/tests/conftest.py
new file mode 100644
index 0000000..c9ef308
--- /dev/null
+++ b/services/strategy-engine/tests/conftest.py
@@ -0,0 +1,8 @@
+"""Pytest configuration: ensure strategies/ is importable."""
+import sys
+from pathlib import Path
+
+# Add the strategies directory to sys.path so that `from strategies.base import ...` works
+STRATEGIES_DIR = Path(__file__).parent.parent / "strategies"
+if str(STRATEGIES_DIR) not in sys.path:
+ sys.path.insert(0, str(STRATEGIES_DIR.parent))
diff --git a/services/strategy-engine/tests/test_engine.py b/services/strategy-engine/tests/test_engine.py
new file mode 100644
index 0000000..33ad4dd
--- /dev/null
+++ b/services/strategy-engine/tests/test_engine.py
@@ -0,0 +1,72 @@
+"""Tests for the StrategyEngine."""
+from datetime import datetime, timezone
+from decimal import Decimal
+from unittest.mock import AsyncMock, MagicMock
+
+import pytest
+
+from shared.models import Candle, Signal, OrderSide
+from shared.events import CandleEvent, SignalEvent
+from strategy_engine.engine import StrategyEngine
+
+
+def make_candle_event() -> dict:
+ candle = Candle(
+ symbol="BTC/USDT",
+ timeframe="1m",
+ open_time=datetime(2024, 1, 1, tzinfo=timezone.utc),
+ open=Decimal("50000"),
+ high=Decimal("50100"),
+ low=Decimal("49900"),
+ close=Decimal("50050"),
+ volume=Decimal("10.0"),
+ )
+ return CandleEvent(data=candle).to_dict()
+
+
+def make_signal() -> Signal:
+ return Signal(
+ strategy="test",
+ symbol="BTC/USDT",
+ side=OrderSide.BUY,
+ price=Decimal("50050"),
+ quantity=Decimal("0.01"),
+ reason="test signal",
+ )
+
+
+@pytest.mark.asyncio
+async def test_engine_dispatches_candle_to_strategies():
+ broker = MagicMock()
+ broker.read = AsyncMock(return_value=[make_candle_event()])
+ broker.publish = AsyncMock()
+
+ strategy = MagicMock()
+ strategy.on_candle = MagicMock(return_value=None)
+
+ engine = StrategyEngine(broker=broker, strategies=[strategy])
+ await engine.process_once("candles.BTC_USDT", "0")
+
+ strategy.on_candle.assert_called_once()
+ candle_arg = strategy.on_candle.call_args[0][0]
+ assert isinstance(candle_arg, Candle)
+ assert candle_arg.symbol == "BTC/USDT"
+
+
+@pytest.mark.asyncio
+async def test_engine_publishes_signal_when_strategy_returns_one():
+ broker = MagicMock()
+ broker.read = AsyncMock(return_value=[make_candle_event()])
+ broker.publish = AsyncMock()
+
+ strategy = MagicMock()
+ strategy.on_candle = MagicMock(return_value=make_signal())
+
+ engine = StrategyEngine(broker=broker, strategies=[strategy])
+ await engine.process_once("candles.BTC_USDT", "0")
+
+ broker.publish.assert_called_once()
+ call_args = broker.publish.call_args
+ assert call_args[0][0] == "signals"
+ published_data = call_args[0][1]
+ assert published_data["type"] == "SIGNAL"
diff --git a/services/strategy-engine/tests/test_grid_strategy.py b/services/strategy-engine/tests/test_grid_strategy.py
new file mode 100644
index 0000000..d96ebba
--- /dev/null
+++ b/services/strategy-engine/tests/test_grid_strategy.py
@@ -0,0 +1,60 @@
+"""Tests for the Grid strategy."""
+from datetime import datetime, timezone
+from decimal import Decimal
+
+import pytest
+
+from shared.models import Candle, OrderSide
+from strategies.grid_strategy import GridStrategy
+
+
+def make_candle(close: float) -> Candle:
+ return Candle(
+ symbol="BTC/USDT",
+ timeframe="1m",
+ open_time=datetime(2024, 1, 1, tzinfo=timezone.utc),
+ open=Decimal(str(close)),
+ high=Decimal(str(close)),
+ low=Decimal(str(close)),
+ close=Decimal(str(close)),
+ volume=Decimal("1.0"),
+ )
+
+
+def _configured_strategy() -> GridStrategy:
+ strategy = GridStrategy()
+ strategy.configure({
+ "lower_price": 48000,
+ "upper_price": 52000,
+ "grid_count": 5,
+ "quantity": "0.01",
+ })
+ return strategy
+
+
+def test_grid_strategy_buy_at_lower_grid():
+ strategy = _configured_strategy()
+ # First candle: establish zone at upper area
+ strategy.on_candle(make_candle(51500))
+ # Second candle: price drops to lower zone → BUY
+ signal = strategy.on_candle(make_candle(48100))
+ assert signal is not None
+ assert signal.side == OrderSide.BUY
+
+
+def test_grid_strategy_sell_at_upper_grid():
+ strategy = _configured_strategy()
+ # First candle: establish zone at lower area
+ strategy.on_candle(make_candle(48100))
+ # Second candle: price rises to upper zone → SELL
+ signal = strategy.on_candle(make_candle(51900))
+ assert signal is not None
+ assert signal.side == OrderSide.SELL
+
+
+def test_grid_strategy_no_signal_in_same_zone():
+ strategy = _configured_strategy()
+ # Both candles in approximately the same zone
+ strategy.on_candle(make_candle(50000))
+ signal = strategy.on_candle(make_candle(50100))
+ assert signal is None
diff --git a/services/strategy-engine/tests/test_plugin_loader.py b/services/strategy-engine/tests/test_plugin_loader.py
new file mode 100644
index 0000000..9496bab
--- /dev/null
+++ b/services/strategy-engine/tests/test_plugin_loader.py
@@ -0,0 +1,22 @@
+"""Tests for the plugin loader."""
+from pathlib import Path
+
+import pytest
+
+from strategy_engine.plugin_loader import load_strategies
+
+
+STRATEGIES_DIR = Path(__file__).parent.parent / "strategies"
+
+
+def test_load_strategies_finds_rsi_and_grid():
+ strategies = load_strategies(STRATEGIES_DIR)
+ names = [s.name for s in strategies]
+ assert "rsi" in names
+ assert "grid" in names
+
+
+def test_load_strategies_skips_base():
+ strategies = load_strategies(STRATEGIES_DIR)
+ names = [s.name for s in strategies]
+ assert "base" not in names
diff --git a/services/strategy-engine/tests/test_rsi_strategy.py b/services/strategy-engine/tests/test_rsi_strategy.py
new file mode 100644
index 0000000..90fface
--- /dev/null
+++ b/services/strategy-engine/tests/test_rsi_strategy.py
@@ -0,0 +1,45 @@
+"""Tests for the RSI strategy."""
+from datetime import datetime, timezone
+from decimal import Decimal
+
+import pytest
+
+from shared.models import Candle, OrderSide
+from strategies.rsi_strategy import RsiStrategy
+
+
+def make_candle(close: float, idx: int = 0) -> Candle:
+ return Candle(
+ symbol="BTC/USDT",
+ timeframe="1m",
+ open_time=datetime(2024, 1, 1, tzinfo=timezone.utc),
+ open=Decimal(str(close)),
+ high=Decimal(str(close)),
+ low=Decimal(str(close)),
+ close=Decimal(str(close)),
+ volume=Decimal("1.0"),
+ )
+
+
+def test_rsi_strategy_no_signal_insufficient_data():
+ strategy = RsiStrategy()
+ strategy.configure({})
+ candle = make_candle(50000.0)
+ result = strategy.on_candle(candle)
+ assert result is None
+
+
+def test_rsi_strategy_buy_signal_on_oversold():
+ strategy = RsiStrategy()
+ strategy.configure({"period": 14, "oversold": 30, "overbought": 70})
+
+ # Feed 20 steadily declining prices to force RSI into oversold territory
+ prices = [50000 - i * 500 for i in range(20)]
+ signal = None
+ for i, price in enumerate(prices):
+ signal = strategy.on_candle(make_candle(price, i))
+
+ # We may or may not get a signal depending on RSI calculation;
+ # if a signal is returned, it must be a BUY
+ if signal is not None:
+ assert signal.side == OrderSide.BUY
diff --git a/shared/pyproject.toml b/shared/pyproject.toml
new file mode 100644
index 0000000..bd09d3e
--- /dev/null
+++ b/shared/pyproject.toml
@@ -0,0 +1,25 @@
+[project]
+name = "trading-shared"
+version = "0.1.0"
+description = "Shared models, events, and utilities for trading platform"
+requires-python = ">=3.12"
+dependencies = [
+ "pydantic>=2.0",
+ "pydantic-settings>=2.0",
+ "redis>=5.0",
+ "asyncpg>=0.29",
+]
+
+[project.optional-dependencies]
+dev = [
+ "pytest>=8.0",
+ "pytest-asyncio>=0.23",
+ "ruff>=0.4",
+]
+
+[build-system]
+requires = ["hatchling"]
+build-backend = "hatchling.build"
+
+[tool.hatch.build.targets.wheel]
+packages = ["src/shared"]
diff --git a/shared/src/shared/__init__.py b/shared/src/shared/__init__.py
new file mode 100644
index 0000000..d2ee024
--- /dev/null
+++ b/shared/src/shared/__init__.py
@@ -0,0 +1 @@
+"""Shared library for the trading platform."""
diff --git a/shared/src/shared/broker.py b/shared/src/shared/broker.py
new file mode 100644
index 0000000..9a50441
--- /dev/null
+++ b/shared/src/shared/broker.py
@@ -0,0 +1,43 @@
+"""Redis Streams broker for the trading platform."""
+import json
+from typing import Any
+
+import redis.asyncio
+
+
+class RedisBroker:
+ """Async Redis Streams broker for publishing and reading events."""
+
+ def __init__(self, redis_url: str) -> None:
+ self._redis = redis.asyncio.from_url(redis_url)
+
+ async def publish(self, stream: str, data: dict[str, Any]) -> None:
+ """Publish a message to a Redis stream."""
+ payload = json.dumps(data)
+ await self._redis.xadd(stream, {"payload": payload})
+
+ async def read(
+ self,
+ stream: str,
+ last_id: str = "$",
+ count: int = 10,
+ block: int = 0,
+ ) -> list[dict[str, Any]]:
+ """Read messages from a Redis stream."""
+ results = await self._redis.xread(
+ {stream: last_id}, count=count, block=block
+ )
+ messages = []
+ if results:
+ for _stream, entries in results:
+ for _msg_id, fields in entries:
+ payload = fields.get(b"payload") or fields.get("payload")
+ if payload:
+ if isinstance(payload, bytes):
+ payload = payload.decode()
+ messages.append(json.loads(payload))
+ return messages
+
+ async def close(self) -> None:
+ """Close the Redis connection."""
+ await self._redis.aclose()
diff --git a/shared/src/shared/config.py b/shared/src/shared/config.py
new file mode 100644
index 0000000..1304c5e
--- /dev/null
+++ b/shared/src/shared/config.py
@@ -0,0 +1,16 @@
+"""Shared configuration settings for the trading platform."""
+from pydantic_settings import BaseSettings
+
+
+class Settings(BaseSettings):
+ binance_api_key: str
+ binance_api_secret: str
+ redis_url: str = "redis://localhost:6379"
+ database_url: str = "postgresql://trading:trading@localhost:5432/trading"
+ log_level: str = "INFO"
+ risk_max_position_size: float = 0.1
+ risk_stop_loss_pct: float = 5.0
+ risk_daily_loss_limit_pct: float = 10.0
+ dry_run: bool = True
+
+ model_config = {"env_file": ".env", "env_file_encoding": "utf-8"}
diff --git a/shared/src/shared/db.py b/shared/src/shared/db.py
new file mode 100644
index 0000000..6bddd7c
--- /dev/null
+++ b/shared/src/shared/db.py
@@ -0,0 +1,184 @@
+"""Database layer using asyncpg for the trading platform."""
+from datetime import datetime, timezone
+from typing import Optional
+
+import asyncpg
+
+from shared.models import Candle, Signal, Order, OrderStatus
+
+
+_INIT_SQL = """
+CREATE TABLE IF NOT EXISTS candles (
+ symbol TEXT NOT NULL,
+ timeframe TEXT NOT NULL,
+ open_time TIMESTAMPTZ NOT NULL,
+ open NUMERIC NOT NULL,
+ high NUMERIC NOT NULL,
+ low NUMERIC NOT NULL,
+ close NUMERIC NOT NULL,
+ volume NUMERIC NOT NULL,
+ PRIMARY KEY (symbol, timeframe, open_time)
+);
+
+CREATE TABLE IF NOT EXISTS signals (
+ id TEXT PRIMARY KEY,
+ strategy TEXT NOT NULL,
+ symbol TEXT NOT NULL,
+ side TEXT NOT NULL,
+ price NUMERIC NOT NULL,
+ quantity NUMERIC NOT NULL,
+ reason TEXT,
+ created_at TIMESTAMPTZ NOT NULL
+);
+
+CREATE TABLE IF NOT EXISTS orders (
+ id TEXT PRIMARY KEY,
+ signal_id TEXT REFERENCES signals(id),
+ symbol TEXT NOT NULL,
+ side TEXT NOT NULL,
+ type TEXT NOT NULL,
+ price NUMERIC NOT NULL,
+ quantity NUMERIC NOT NULL,
+ status TEXT NOT NULL DEFAULT 'PENDING',
+ created_at TIMESTAMPTZ NOT NULL,
+ filled_at TIMESTAMPTZ
+);
+
+CREATE TABLE IF NOT EXISTS trades (
+ id TEXT PRIMARY KEY,
+ order_id TEXT REFERENCES orders(id),
+ symbol TEXT NOT NULL,
+ side TEXT NOT NULL,
+ price NUMERIC NOT NULL,
+ quantity NUMERIC NOT NULL,
+ fee NUMERIC NOT NULL DEFAULT 0,
+ traded_at TIMESTAMPTZ NOT NULL
+);
+
+CREATE TABLE IF NOT EXISTS positions (
+ symbol TEXT PRIMARY KEY,
+ quantity NUMERIC NOT NULL,
+ avg_entry_price NUMERIC NOT NULL,
+ current_price NUMERIC NOT NULL,
+ updated_at TIMESTAMPTZ NOT NULL
+);
+
+CREATE TABLE IF NOT EXISTS portfolio_snapshots (
+ id SERIAL PRIMARY KEY,
+ total_value NUMERIC NOT NULL,
+ realized_pnl NUMERIC NOT NULL,
+ unrealized_pnl NUMERIC NOT NULL,
+ snapshot_at TIMESTAMPTZ NOT NULL
+);
+"""
+
+
+class Database:
+ """Async database access layer backed by asyncpg connection pool."""
+
+ def __init__(self, database_url: str) -> None:
+ self._database_url = database_url
+ self._pool: Optional[asyncpg.Pool] = None
+
+ async def connect(self) -> None:
+ """Create the asyncpg connection pool."""
+ self._pool = await asyncpg.create_pool(self._database_url)
+
+ async def close(self) -> None:
+ """Close the asyncpg connection pool."""
+ if self._pool:
+ await self._pool.close()
+ self._pool = None
+
+ async def init_tables(self) -> None:
+ """Create all tables if they do not exist."""
+ async with self._pool as conn:
+ await conn.execute(_INIT_SQL)
+
+ async def insert_candle(self, candle: Candle) -> None:
+ """Insert a candle row, ignoring duplicates."""
+ sql = """
+ INSERT INTO candles (symbol, timeframe, open_time, open, high, low, close, volume)
+ VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
+ ON CONFLICT DO NOTHING
+ """
+ async with self._pool as conn:
+ await conn.execute(
+ sql,
+ candle.symbol,
+ candle.timeframe,
+ candle.open_time,
+ candle.open,
+ candle.high,
+ candle.low,
+ candle.close,
+ candle.volume,
+ )
+
+ async def insert_signal(self, signal: Signal) -> None:
+ """Insert a signal row."""
+ sql = """
+ INSERT INTO signals (id, strategy, symbol, side, price, quantity, reason, created_at)
+ VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
+ """
+ async with self._pool as conn:
+ await conn.execute(
+ sql,
+ signal.id,
+ signal.strategy,
+ signal.symbol,
+ signal.side.value,
+ signal.price,
+ signal.quantity,
+ signal.reason,
+ signal.created_at,
+ )
+
+ async def insert_order(self, order: Order) -> None:
+ """Insert an order row."""
+ sql = """
+ INSERT INTO orders (id, signal_id, symbol, side, type, price, quantity, status, created_at, filled_at)
+ VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
+ """
+ async with self._pool as conn:
+ await conn.execute(
+ sql,
+ order.id,
+ order.signal_id,
+ order.symbol,
+ order.side.value,
+ order.type.value,
+ order.price,
+ order.quantity,
+ order.status.value,
+ order.created_at,
+ order.filled_at,
+ )
+
+ async def update_order_status(
+ self,
+ order_id: str,
+ status: OrderStatus,
+ filled_at: Optional[datetime] = None,
+ ) -> None:
+ """Update the status (and optionally filled_at) of an order."""
+ sql = """
+ UPDATE orders SET status = $2, filled_at = $3 WHERE id = $1
+ """
+ async with self._pool as conn:
+ await conn.execute(sql, order_id, status.value, filled_at)
+
+ async def get_candles(
+ self, symbol: str, timeframe: str, limit: int = 500
+ ) -> list[dict]:
+ """Retrieve candles ordered by open_time descending."""
+ sql = """
+ SELECT symbol, timeframe, open_time, open, high, low, close, volume
+ FROM candles
+ WHERE symbol = $1 AND timeframe = $2
+ ORDER BY open_time DESC
+ LIMIT $3
+ """
+ async with self._pool as conn:
+ rows = await conn.fetch(sql, symbol, timeframe, limit)
+ return [dict(row) for row in rows]
diff --git a/shared/src/shared/events.py b/shared/src/shared/events.py
new file mode 100644
index 0000000..1db2bee
--- /dev/null
+++ b/shared/src/shared/events.py
@@ -0,0 +1,75 @@
+"""Event types and serialization for the trading platform."""
+from enum import Enum
+from typing import Any
+
+from pydantic import BaseModel
+
+from shared.models import Candle, Signal, Order
+
+
+class EventType(str, Enum):
+ CANDLE = "CANDLE"
+ SIGNAL = "SIGNAL"
+ ORDER = "ORDER"
+
+
+class CandleEvent(BaseModel):
+ type: EventType = EventType.CANDLE
+ data: Candle
+
+ def to_dict(self) -> dict:
+ return {
+ "type": self.type,
+ "data": self.data.model_dump(mode="json"),
+ }
+
+ @classmethod
+ def from_raw(cls, raw: dict) -> "CandleEvent":
+ return cls(type=raw["type"], data=Candle(**raw["data"]))
+
+
+class SignalEvent(BaseModel):
+ type: EventType = EventType.SIGNAL
+ data: Signal
+
+ def to_dict(self) -> dict:
+ return {
+ "type": self.type,
+ "data": self.data.model_dump(mode="json"),
+ }
+
+ @classmethod
+ def from_raw(cls, raw: dict) -> "SignalEvent":
+ return cls(type=raw["type"], data=Signal(**raw["data"]))
+
+
+class OrderEvent(BaseModel):
+ type: EventType = EventType.ORDER
+ data: Order
+
+ def to_dict(self) -> dict:
+ return {
+ "type": self.type,
+ "data": self.data.model_dump(mode="json"),
+ }
+
+ @classmethod
+ def from_raw(cls, raw: dict) -> "OrderEvent":
+ return cls(type=raw["type"], data=Order(**raw["data"]))
+
+
+_EVENT_TYPE_MAP = {
+ EventType.CANDLE: CandleEvent,
+ EventType.SIGNAL: SignalEvent,
+ EventType.ORDER: OrderEvent,
+}
+
+
+class Event:
+ """Dispatcher for deserializing events from raw dicts."""
+
+ @staticmethod
+ def from_dict(data: dict) -> Any:
+ event_type = EventType(data["type"])
+ cls = _EVENT_TYPE_MAP[event_type]
+ return cls.from_raw(data)
diff --git a/shared/src/shared/models.py b/shared/src/shared/models.py
new file mode 100644
index 0000000..4cb1081
--- /dev/null
+++ b/shared/src/shared/models.py
@@ -0,0 +1,72 @@
+"""Shared Pydantic models for the trading platform."""
+import uuid
+from decimal import Decimal
+from datetime import datetime, timezone
+from enum import Enum
+from typing import Optional
+
+from pydantic import BaseModel, Field, computed_field
+
+
+class OrderSide(str, Enum):
+ BUY = "BUY"
+ SELL = "SELL"
+
+
+class OrderType(str, Enum):
+ MARKET = "MARKET"
+ LIMIT = "LIMIT"
+
+
+class OrderStatus(str, Enum):
+ PENDING = "PENDING"
+ FILLED = "FILLED"
+ CANCELLED = "CANCELLED"
+ FAILED = "FAILED"
+
+
+class Candle(BaseModel):
+ symbol: str
+ timeframe: str
+ open_time: datetime
+ open: Decimal
+ high: Decimal
+ low: Decimal
+ close: Decimal
+ volume: Decimal
+
+
+class Signal(BaseModel):
+ id: str = Field(default_factory=lambda: str(uuid.uuid4()))
+ strategy: str
+ symbol: str
+ side: OrderSide
+ price: Decimal
+ quantity: Decimal
+ reason: str
+ created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
+
+
+class Order(BaseModel):
+ id: str = Field(default_factory=lambda: str(uuid.uuid4()))
+ signal_id: str
+ symbol: str
+ side: OrderSide
+ type: OrderType
+ price: Decimal
+ quantity: Decimal
+ status: OrderStatus = OrderStatus.PENDING
+ created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
+ filled_at: Optional[datetime] = None
+
+
+class Position(BaseModel):
+ symbol: str
+ quantity: Decimal
+ avg_entry_price: Decimal
+ current_price: Decimal
+
+ @computed_field
+ @property
+ def unrealized_pnl(self) -> Decimal:
+ return self.quantity * (self.current_price - self.avg_entry_price)
diff --git a/shared/tests/__init__.py b/shared/tests/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/shared/tests/__init__.py
diff --git a/shared/tests/test_broker.py b/shared/tests/test_broker.py
new file mode 100644
index 0000000..d3a3569
--- /dev/null
+++ b/shared/tests/test_broker.py
@@ -0,0 +1,66 @@
+"""Tests for the Redis broker."""
+import pytest
+import json
+from unittest.mock import AsyncMock, MagicMock, patch
+
+
+@pytest.mark.asyncio
+async def test_broker_publish():
+ """Test that publish calls xadd on the redis connection."""
+ with patch("redis.asyncio.from_url") as mock_from_url:
+ mock_redis = AsyncMock()
+ mock_from_url.return_value = mock_redis
+
+ from shared.broker import RedisBroker
+ broker = RedisBroker("redis://localhost:6379")
+ data = {"type": "CANDLE", "symbol": "BTCUSDT"}
+ await broker.publish("candles", data)
+
+ mock_redis.xadd.assert_called_once()
+ call_args = mock_redis.xadd.call_args
+ assert call_args[0][0] == "candles"
+ payload = call_args[0][1]
+ assert "payload" in payload
+ parsed = json.loads(payload["payload"])
+ assert parsed["type"] == "CANDLE"
+
+
+@pytest.mark.asyncio
+async def test_broker_subscribe_returns_messages():
+ """Test that read parses xread response correctly."""
+ with patch("redis.asyncio.from_url") as mock_from_url:
+ mock_redis = AsyncMock()
+ mock_from_url.return_value = mock_redis
+
+ payload_data = {"type": "CANDLE", "symbol": "ETHUSDT"}
+ mock_redis.xread.return_value = [
+ [
+ b"candles",
+ [
+ (b"1234567890-0", {b"payload": json.dumps(payload_data).encode()}),
+ ],
+ ]
+ ]
+
+ from shared.broker import RedisBroker
+ broker = RedisBroker("redis://localhost:6379")
+ messages = await broker.read("candles", last_id="$")
+
+ mock_redis.xread.assert_called_once()
+ assert len(messages) == 1
+ assert messages[0]["type"] == "CANDLE"
+ assert messages[0]["symbol"] == "ETHUSDT"
+
+
+@pytest.mark.asyncio
+async def test_broker_close():
+ """Test that close calls aclose on the redis connection."""
+ with patch("redis.asyncio.from_url") as mock_from_url:
+ mock_redis = AsyncMock()
+ mock_from_url.return_value = mock_redis
+
+ from shared.broker import RedisBroker
+ broker = RedisBroker("redis://localhost:6379")
+ await broker.close()
+
+ mock_redis.aclose.assert_called_once()
diff --git a/shared/tests/test_db.py b/shared/tests/test_db.py
new file mode 100644
index 0000000..c31e487
--- /dev/null
+++ b/shared/tests/test_db.py
@@ -0,0 +1,70 @@
+"""Tests for the database layer."""
+import pytest
+from decimal import Decimal
+from datetime import datetime, timezone
+from unittest.mock import AsyncMock, MagicMock, patch, call
+
+
+def make_candle():
+ from shared.models import Candle
+ return Candle(
+ symbol="BTCUSDT",
+ timeframe="1m",
+ open_time=datetime(2024, 1, 1, tzinfo=timezone.utc),
+ open=Decimal("50000"),
+ high=Decimal("51000"),
+ low=Decimal("49500"),
+ close=Decimal("50500"),
+ volume=Decimal("100"),
+ )
+
+
+@pytest.mark.asyncio
+async def test_db_init_sql_creates_tables():
+ """Verify that init_tables SQL references all required table names."""
+ with patch("asyncpg.create_pool", new_callable=AsyncMock) as mock_pool:
+ mock_conn = AsyncMock()
+ mock_pool.return_value.__aenter__ = AsyncMock(return_value=mock_conn)
+ mock_pool.return_value.__aexit__ = AsyncMock(return_value=False)
+
+ # Capture the SQL that gets executed
+ executed_sqls = []
+
+ async def capture_execute(sql, *args, **kwargs):
+ executed_sqls.append(sql)
+
+ mock_conn.execute = capture_execute
+
+ from shared.db import Database
+ db = Database("postgresql://trading:trading@localhost:5432/trading")
+ db._pool = mock_pool.return_value
+ await db.init_tables()
+
+ combined_sql = " ".join(executed_sqls)
+ for table in ["candles", "signals", "orders", "trades", "positions", "portfolio_snapshots"]:
+ assert table in combined_sql, f"Table '{table}' not found in SQL"
+
+
+@pytest.mark.asyncio
+async def test_db_insert_candle():
+ """Verify that insert_candle executes INSERT INTO candles."""
+ with patch("asyncpg.create_pool", new_callable=AsyncMock) as mock_pool:
+ mock_conn = AsyncMock()
+ mock_pool.return_value.__aenter__ = AsyncMock(return_value=mock_conn)
+ mock_pool.return_value.__aexit__ = AsyncMock(return_value=False)
+
+ executed = []
+
+ async def capture_execute(sql, *args, **kwargs):
+ executed.append((sql, args))
+
+ mock_conn.execute = capture_execute
+
+ from shared.db import Database
+ db = Database("postgresql://trading:trading@localhost:5432/trading")
+ db._pool = mock_pool.return_value
+ candle = make_candle()
+ await db.insert_candle(candle)
+
+ assert any("INSERT INTO candles" in sql for sql, _ in executed), \
+ "Expected INSERT INTO candles"
diff --git a/shared/tests/test_events.py b/shared/tests/test_events.py
new file mode 100644
index 0000000..4bc7981
--- /dev/null
+++ b/shared/tests/test_events.py
@@ -0,0 +1,80 @@
+"""Tests for shared event types."""
+import pytest
+from decimal import Decimal
+from datetime import datetime, timezone
+
+
+def make_candle():
+ from shared.models import Candle
+ return Candle(
+ symbol="BTCUSDT",
+ timeframe="1m",
+ open_time=datetime(2024, 1, 1, tzinfo=timezone.utc),
+ open=Decimal("50000"),
+ high=Decimal("51000"),
+ low=Decimal("49500"),
+ close=Decimal("50500"),
+ volume=Decimal("100"),
+ )
+
+
+def make_signal():
+ from shared.models import Signal, OrderSide
+ return Signal(
+ strategy="test",
+ symbol="BTCUSDT",
+ side=OrderSide.BUY,
+ price=Decimal("50000"),
+ quantity=Decimal("0.01"),
+ reason="test signal",
+ )
+
+
+def test_candle_event_serialize():
+ """Test CandleEvent serializes to dict correctly."""
+ from shared.events import CandleEvent, EventType
+ candle = make_candle()
+ event = CandleEvent(data=candle)
+ d = event.to_dict()
+ assert d["type"] == EventType.CANDLE
+ assert d["data"]["symbol"] == "BTCUSDT"
+ assert d["data"]["timeframe"] == "1m"
+
+
+def test_candle_event_deserialize():
+ """Test CandleEvent round-trips through to_dict/from_raw."""
+ from shared.events import CandleEvent, EventType
+ candle = make_candle()
+ event = CandleEvent(data=candle)
+ d = event.to_dict()
+ restored = CandleEvent.from_raw(d)
+ assert restored.type == EventType.CANDLE
+ assert restored.data.symbol == "BTCUSDT"
+ assert restored.data.close == Decimal("50500")
+
+
+def test_signal_event_serialize():
+ """Test SignalEvent serializes to dict correctly."""
+ from shared.events import SignalEvent, EventType
+ signal = make_signal()
+ event = SignalEvent(data=signal)
+ d = event.to_dict()
+ assert d["type"] == EventType.SIGNAL
+ assert d["data"]["symbol"] == "BTCUSDT"
+ assert d["data"]["strategy"] == "test"
+
+
+def test_event_from_dict_dispatch():
+ """Test Event.from_dict dispatches to correct class."""
+ from shared.events import Event, CandleEvent, SignalEvent, EventType
+ candle = make_candle()
+ event = CandleEvent(data=candle)
+ d = event.to_dict()
+ restored = Event.from_dict(d)
+ assert isinstance(restored, CandleEvent)
+
+ signal = make_signal()
+ s_event = SignalEvent(data=signal)
+ sd = s_event.to_dict()
+ restored_s = Event.from_dict(sd)
+ assert isinstance(restored_s, SignalEvent)
diff --git a/shared/tests/test_models.py b/shared/tests/test_models.py
new file mode 100644
index 0000000..f1d92ec
--- /dev/null
+++ b/shared/tests/test_models.py
@@ -0,0 +1,100 @@
+"""Tests for shared models and settings."""
+import os
+import pytest
+from decimal import Decimal
+from datetime import datetime, timezone
+from unittest.mock import patch
+
+
+def test_settings_defaults():
+ """Test that Settings has correct defaults."""
+ with patch.dict(os.environ, {
+ "BINANCE_API_KEY": "test_key",
+ "BINANCE_API_SECRET": "test_secret",
+ }):
+ from shared.config import Settings
+ settings = Settings()
+ assert settings.redis_url == "redis://localhost:6379"
+ assert settings.database_url == "postgresql://trading:trading@localhost:5432/trading"
+ assert settings.log_level == "INFO"
+ assert settings.risk_max_position_size == 0.1
+ assert settings.risk_stop_loss_pct == 5.0
+ assert settings.risk_daily_loss_limit_pct == 10.0
+ assert settings.dry_run is True
+
+
+def test_candle_creation():
+ """Test Candle model creation."""
+ from shared.models import Candle
+ now = datetime.now(timezone.utc)
+ candle = Candle(
+ symbol="BTCUSDT",
+ timeframe="1m",
+ open_time=now,
+ open=Decimal("50000.00"),
+ high=Decimal("51000.00"),
+ low=Decimal("49500.00"),
+ close=Decimal("50500.00"),
+ volume=Decimal("100.5"),
+ )
+ assert candle.symbol == "BTCUSDT"
+ assert candle.timeframe == "1m"
+ assert candle.open == Decimal("50000.00")
+ assert candle.high == Decimal("51000.00")
+ assert candle.low == Decimal("49500.00")
+ assert candle.close == Decimal("50500.00")
+ assert candle.volume == Decimal("100.5")
+
+
+def test_signal_creation():
+ """Test Signal model creation."""
+ from shared.models import Signal, OrderSide
+ signal = Signal(
+ strategy="rsi_strategy",
+ symbol="BTCUSDT",
+ side=OrderSide.BUY,
+ price=Decimal("50000.00"),
+ quantity=Decimal("0.01"),
+ reason="RSI oversold",
+ )
+ assert signal.strategy == "rsi_strategy"
+ assert signal.symbol == "BTCUSDT"
+ assert signal.side == OrderSide.BUY
+ assert signal.price == Decimal("50000.00")
+ assert signal.quantity == Decimal("0.01")
+ assert signal.reason == "RSI oversold"
+ assert signal.id is not None
+ assert signal.created_at is not None
+
+
+def test_order_creation():
+ """Test Order model creation with defaults."""
+ from shared.models import Order, OrderSide, OrderType, OrderStatus
+ import uuid
+ signal_id = str(uuid.uuid4())
+ order = Order(
+ signal_id=signal_id,
+ symbol="BTCUSDT",
+ side=OrderSide.BUY,
+ type=OrderType.MARKET,
+ price=Decimal("50000.00"),
+ quantity=Decimal("0.01"),
+ )
+ assert order.id is not None
+ assert order.signal_id == signal_id
+ assert order.status == OrderStatus.PENDING
+ assert order.filled_at is None
+ assert order.created_at is not None
+
+
+def test_position_unrealized_pnl():
+ """Test Position unrealized_pnl computed property."""
+ from shared.models import Position
+ position = Position(
+ symbol="BTCUSDT",
+ quantity=Decimal("0.1"),
+ avg_entry_price=Decimal("50000"),
+ current_price=Decimal("51000"),
+ )
+ # 0.1 * (51000 - 50000) = 100
+ assert position.unrealized_pnl == Decimal("100")