diff options
| author | TheSiahxyz <164138827+TheSiahxyz@users.noreply.github.com> | 2026-04-01 15:56:35 +0900 |
|---|---|---|
| committer | TheSiahxyz <164138827+TheSiahxyz@users.noreply.github.com> | 2026-04-01 15:56:35 +0900 |
| commit | 33b14aaa2344b0fd95d1629627c3d135b24ae102 (patch) | |
| tree | 90b214758bc3b076baa7711226a1a1be6268e72e /services/order-executor | |
| parent | 9360f1a800aa29b40399a2f3bfbfcf215a04e279 (diff) | |
feat: initial trading platform implementation
Binance spot crypto trading platform with microservices architecture:
- shared: Pydantic models, Redis Streams broker, asyncpg DB layer
- data-collector: Binance WebSocket/REST market data collection
- strategy-engine: Plugin-based strategy execution (RSI, Grid)
- order-executor: Order execution with risk management
- portfolio-manager: Position tracking and PnL calculation
- backtester: Historical strategy testing with simulator
- cli: Click-based CLI for all operations
- Docker Compose orchestration with Redis and PostgreSQL
- 24 test files covering all modules
Diffstat (limited to 'services/order-executor')
| -rw-r--r-- | services/order-executor/Dockerfile | 7 | ||||
| -rw-r--r-- | services/order-executor/pyproject.toml | 16 | ||||
| -rw-r--r-- | services/order-executor/src/order_executor/__init__.py | 0 | ||||
| -rw-r--r-- | services/order-executor/src/order_executor/config.py | 6 | ||||
| -rw-r--r-- | services/order-executor/src/order_executor/executor.py | 100 | ||||
| -rw-r--r-- | services/order-executor/src/order_executor/main.py | 83 | ||||
| -rw-r--r-- | services/order-executor/src/order_executor/risk_manager.py | 55 | ||||
| -rw-r--r-- | services/order-executor/tests/__init__.py | 0 | ||||
| -rw-r--r-- | services/order-executor/tests/test_executor.py | 122 | ||||
| -rw-r--r-- | services/order-executor/tests/test_risk_manager.py | 72 |
10 files changed, 461 insertions, 0 deletions
diff --git a/services/order-executor/Dockerfile b/services/order-executor/Dockerfile new file mode 100644 index 0000000..f044714 --- /dev/null +++ b/services/order-executor/Dockerfile @@ -0,0 +1,7 @@ +FROM python:3.12-slim +WORKDIR /app +COPY shared/ shared/ +RUN pip install --no-cache-dir ./shared +COPY services/order-executor/ services/order-executor/ +RUN pip install --no-cache-dir ./services/order-executor +CMD ["python", "-m", "order_executor.main"] diff --git a/services/order-executor/pyproject.toml b/services/order-executor/pyproject.toml new file mode 100644 index 0000000..eed4fef --- /dev/null +++ b/services/order-executor/pyproject.toml @@ -0,0 +1,16 @@ +[project] +name = "order-executor" +version = "0.1.0" +description = "Order execution service with risk management" +requires-python = ">=3.12" +dependencies = ["ccxt>=4.0", "trading-shared"] + +[project.optional-dependencies] +dev = ["pytest>=8.0", "pytest-asyncio>=0.23"] + +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[tool.hatch.build.targets.wheel] +packages = ["src/order_executor"] diff --git a/services/order-executor/src/order_executor/__init__.py b/services/order-executor/src/order_executor/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/services/order-executor/src/order_executor/__init__.py diff --git a/services/order-executor/src/order_executor/config.py b/services/order-executor/src/order_executor/config.py new file mode 100644 index 0000000..856045f --- /dev/null +++ b/services/order-executor/src/order_executor/config.py @@ -0,0 +1,6 @@ +"""Order Executor configuration.""" +from shared.config import Settings + + +class ExecutorConfig(Settings): + pass diff --git a/services/order-executor/src/order_executor/executor.py b/services/order-executor/src/order_executor/executor.py new file mode 100644 index 0000000..16ae52c --- /dev/null +++ b/services/order-executor/src/order_executor/executor.py @@ -0,0 +1,100 @@ +"""Order execution logic.""" +import logging +from datetime import datetime, timezone +from decimal import Decimal +from typing import Any, Optional + +from shared.broker import RedisBroker +from shared.db import Database +from shared.events import OrderEvent +from shared.models import Order, OrderSide, OrderStatus, OrderType, Signal + +from order_executor.risk_manager import RiskManager + +logger = logging.getLogger(__name__) + + +class OrderExecutor: + """Executes orders on an exchange with risk gating.""" + + def __init__( + self, + exchange: Any, + risk_manager: RiskManager, + broker: RedisBroker, + db: Database, + dry_run: bool = True, + ) -> None: + self.exchange = exchange + self.risk_manager = risk_manager + self.broker = broker + self.db = db + self.dry_run = dry_run + + async def execute(self, signal: Signal) -> Optional[Order]: + """Run risk checks and place an order for the given signal.""" + # Fetch current balance from exchange + balance_data = await self.exchange.fetch_balance() + # Use USDT (or quote currency) free balance as available capital + free_balances = balance_data.get("free", {}) + quote_currency = signal.symbol.split("/")[-1] if "/" in signal.symbol else "USDT" + balance = Decimal(str(free_balances.get(quote_currency, 0))) + + # Fetch current positions + positions = {} + + # Compute daily PnL (not tracked at executor level — use 0 unless provided) + daily_pnl = Decimal(0) + + # Run risk checks + result = self.risk_manager.check( + signal=signal, + balance=balance, + positions=positions, + daily_pnl=daily_pnl, + ) + + if not result.allowed: + logger.warning( + "Risk check rejected signal %s: %s", signal.id, result.reason + ) + return None + + # Build the order model + order = Order( + signal_id=signal.id, + symbol=signal.symbol, + side=signal.side, + type=OrderType.MARKET, + price=signal.price, + quantity=signal.quantity, + status=OrderStatus.PENDING, + ) + + if self.dry_run: + order.status = OrderStatus.FILLED + order.filled_at = datetime.now(timezone.utc) + logger.info("[DRY RUN] Order filled: %s %s %s", order.side, order.quantity, order.symbol) + else: + try: + await self.exchange.create_order( + symbol=signal.symbol, + type="market", + side=signal.side.value.lower(), + amount=float(signal.quantity), + ) + order.status = OrderStatus.FILLED + order.filled_at = datetime.now(timezone.utc) + logger.info("Order filled: %s %s %s", order.side, order.quantity, order.symbol) + except Exception as exc: + order.status = OrderStatus.FAILED + logger.error("Order failed for signal %s: %s", signal.id, exc) + + # Persist to DB + await self.db.insert_order(order) + + # Publish order event + event = OrderEvent(data=order) + await self.broker.publish("orders", event.to_dict()) + + return order diff --git a/services/order-executor/src/order_executor/main.py b/services/order-executor/src/order_executor/main.py new file mode 100644 index 0000000..b57c513 --- /dev/null +++ b/services/order-executor/src/order_executor/main.py @@ -0,0 +1,83 @@ +"""Order Executor Service entry point.""" +import asyncio +import logging +from decimal import Decimal + +import ccxt.async_support as ccxt + +from shared.broker import RedisBroker +from shared.db import Database +from shared.events import Event, EventType + +from order_executor.config import ExecutorConfig +from order_executor.executor import OrderExecutor +from order_executor.risk_manager import RiskManager + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + + +async def run() -> None: + config = ExecutorConfig() + logging.getLogger().setLevel(config.log_level) + + db = Database(config.database_url) + await db.connect() + await db.init_tables() + + broker = RedisBroker(config.redis_url) + + exchange = ccxt.binance( + { + "apiKey": config.binance_api_key, + "secret": config.binance_api_secret, + } + ) + + risk_manager = RiskManager( + max_position_size=Decimal(str(config.risk_max_position_size)), + stop_loss_pct=Decimal(str(config.risk_stop_loss_pct)), + daily_loss_limit_pct=Decimal(str(config.risk_daily_loss_limit_pct)), + ) + + executor = OrderExecutor( + exchange=exchange, + risk_manager=risk_manager, + broker=broker, + db=db, + dry_run=config.dry_run, + ) + + last_id = "$" + stream = "signals" + logger.info("Order executor started, listening on stream=%s dry_run=%s", stream, config.dry_run) + + try: + while True: + messages = await broker.read(stream, last_id=last_id, count=10, block=5000) + for msg in messages: + try: + event = Event.from_dict(msg) + if event.type == EventType.SIGNAL: + signal = event.data + logger.info("Processing signal %s for %s", signal.id, signal.symbol) + await executor.execute(signal) + except Exception as exc: + logger.error("Failed to process message: %s", exc) + if messages: + # Advance last_id to avoid re-reading — broker.read returns decoded dicts, + # so we track progress by re-reading with "0" for replaying or "$" for new only. + # Since we block on "$" we get only new messages each iteration. + pass + finally: + await broker.close() + await db.close() + await exchange.close() + + +def main() -> None: + asyncio.run(run()) + + +if __name__ == "__main__": + main() diff --git a/services/order-executor/src/order_executor/risk_manager.py b/services/order-executor/src/order_executor/risk_manager.py new file mode 100644 index 0000000..8e8a72c --- /dev/null +++ b/services/order-executor/src/order_executor/risk_manager.py @@ -0,0 +1,55 @@ +"""Risk management for order execution.""" +from dataclasses import dataclass +from decimal import Decimal + +from shared.models import Signal, OrderSide, Position + + +@dataclass +class RiskCheckResult: + allowed: bool + reason: str + + +class RiskManager: + """Evaluates risk before order execution.""" + + def __init__( + self, + max_position_size: Decimal, + stop_loss_pct: Decimal, + daily_loss_limit_pct: Decimal, + ) -> None: + self.max_position_size = max_position_size + self.stop_loss_pct = stop_loss_pct + self.daily_loss_limit_pct = daily_loss_limit_pct + + def check( + self, + signal: Signal, + balance: Decimal, + positions: dict[str, Position], + daily_pnl: Decimal, + ) -> RiskCheckResult: + """Run risk checks against a signal and current portfolio state.""" + # Check daily loss limit + if balance > 0 and (daily_pnl / balance) * 100 < -self.daily_loss_limit_pct: + return RiskCheckResult(allowed=False, reason="Daily loss limit exceeded") + + if signal.side == OrderSide.BUY: + order_cost = signal.price * signal.quantity + + # Check sufficient balance + if order_cost > balance: + return RiskCheckResult(allowed=False, reason="Insufficient balance") + + # Check position size limit + position = positions.get(signal.symbol) + current_position_value = Decimal(0) + if position is not None: + current_position_value = position.quantity * position.current_price + + if balance > 0 and (current_position_value + order_cost) / balance > self.max_position_size: + return RiskCheckResult(allowed=False, reason="Position size exceeded") + + return RiskCheckResult(allowed=True, reason="OK") diff --git a/services/order-executor/tests/__init__.py b/services/order-executor/tests/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/services/order-executor/tests/__init__.py diff --git a/services/order-executor/tests/test_executor.py b/services/order-executor/tests/test_executor.py new file mode 100644 index 0000000..5b18992 --- /dev/null +++ b/services/order-executor/tests/test_executor.py @@ -0,0 +1,122 @@ +"""Tests for OrderExecutor.""" +from decimal import Decimal +from unittest.mock import AsyncMock, MagicMock + +import pytest + +from shared.models import OrderSide, OrderStatus, Signal +from order_executor.executor import OrderExecutor +from order_executor.risk_manager import RiskCheckResult, RiskManager + + +def make_signal(side: OrderSide = OrderSide.BUY, price: str = "100", quantity: str = "1") -> Signal: + return Signal( + strategy="test", + symbol="BTC/USDT", + side=side, + price=Decimal(price), + quantity=Decimal(quantity), + reason="test", + ) + + +def make_mock_exchange(free_usdt: float = 10000.0) -> AsyncMock: + exchange = AsyncMock() + exchange.fetch_balance.return_value = {"free": {"USDT": free_usdt}} + exchange.create_order = AsyncMock(return_value={"id": "exchange-order-123"}) + return exchange + + +def make_mock_risk_manager(allowed: bool = True, reason: str = "OK") -> MagicMock: + rm = MagicMock(spec=RiskManager) + rm.check.return_value = RiskCheckResult(allowed=allowed, reason=reason) + return rm + + +def make_mock_broker() -> AsyncMock: + broker = AsyncMock() + broker.publish = AsyncMock() + return broker + + +def make_mock_db() -> AsyncMock: + db = AsyncMock() + db.insert_order = AsyncMock() + return db + + +@pytest.mark.asyncio +async def test_executor_places_order_when_risk_passes(): + """When risk check passes, create_order is called and order status is FILLED.""" + exchange = make_mock_exchange() + risk_manager = make_mock_risk_manager(allowed=True) + broker = make_mock_broker() + db = make_mock_db() + + executor = OrderExecutor( + exchange=exchange, + risk_manager=risk_manager, + broker=broker, + db=db, + dry_run=False, + ) + + signal = make_signal() + order = await executor.execute(signal) + + assert order is not None + assert order.status == OrderStatus.FILLED + exchange.create_order.assert_called_once() + db.insert_order.assert_called_once_with(order) + broker.publish.assert_called_once() + + +@pytest.mark.asyncio +async def test_executor_rejects_when_risk_fails(): + """When risk check fails, create_order is not called and None is returned.""" + exchange = make_mock_exchange() + risk_manager = make_mock_risk_manager(allowed=False, reason="Position size exceeded") + broker = make_mock_broker() + db = make_mock_db() + + executor = OrderExecutor( + exchange=exchange, + risk_manager=risk_manager, + broker=broker, + db=db, + dry_run=False, + ) + + signal = make_signal() + order = await executor.execute(signal) + + assert order is None + exchange.create_order.assert_not_called() + db.insert_order.assert_not_called() + broker.publish.assert_not_called() + + +@pytest.mark.asyncio +async def test_executor_dry_run_does_not_call_exchange(): + """In dry-run mode, risk passes, order is FILLED, but exchange.create_order is NOT called.""" + exchange = make_mock_exchange() + risk_manager = make_mock_risk_manager(allowed=True) + broker = make_mock_broker() + db = make_mock_db() + + executor = OrderExecutor( + exchange=exchange, + risk_manager=risk_manager, + broker=broker, + db=db, + dry_run=True, + ) + + signal = make_signal() + order = await executor.execute(signal) + + assert order is not None + assert order.status == OrderStatus.FILLED + exchange.create_order.assert_not_called() + db.insert_order.assert_called_once_with(order) + broker.publish.assert_called_once() diff --git a/services/order-executor/tests/test_risk_manager.py b/services/order-executor/tests/test_risk_manager.py new file mode 100644 index 0000000..f6b5545 --- /dev/null +++ b/services/order-executor/tests/test_risk_manager.py @@ -0,0 +1,72 @@ +"""Tests for RiskManager.""" +from decimal import Decimal + +import pytest + +from shared.models import OrderSide, Position, Signal +from order_executor.risk_manager import RiskManager + + +def make_signal(side: OrderSide, price: str, quantity: str, symbol: str = "BTC/USDT") -> Signal: + return Signal( + strategy="test", + symbol=symbol, + side=side, + price=Decimal(price), + quantity=Decimal(quantity), + reason="test signal", + ) + + +def make_risk_manager( + max_position_size: str = "0.1", + stop_loss_pct: str = "5.0", + daily_loss_limit_pct: str = "10.0", +) -> RiskManager: + return RiskManager( + max_position_size=Decimal(max_position_size), + stop_loss_pct=Decimal(stop_loss_pct), + daily_loss_limit_pct=Decimal(daily_loss_limit_pct), + ) + + +def test_risk_check_passes_normal_order(): + """Small BUY order with enough balance should be allowed.""" + rm = make_risk_manager() + signal = make_signal(side=OrderSide.BUY, price="100", quantity="0.5") + # cost = 50, balance = 10000, position_value = 0 => (0+50)/10000 = 0.5% < 10% + result = rm.check(signal, balance=Decimal("10000"), positions={}, daily_pnl=Decimal("0")) + assert result.allowed is True + assert result.reason == "OK" + + +def test_risk_check_rejects_exceeding_position_size(): + """5 BTC at $50,000 = $250,000 order cost on $10,000,000 balance exceeds 10% limit.""" + rm = make_risk_manager(max_position_size="0.1") + signal = make_signal(side=OrderSide.BUY, price="50000", quantity="5") + # cost = 250000, balance = 1000000 => 250000/1000000 = 25% > 10% + # balance is sufficient (250000 < 1000000) but position size is exceeded + result = rm.check(signal, balance=Decimal("1000000"), positions={}, daily_pnl=Decimal("0")) + assert result.allowed is False + assert result.reason == "Position size exceeded" + + +def test_risk_check_rejects_daily_loss_exceeded(): + """Daily PnL of -1100 on 10000 balance = -11%, exceeding -10% limit.""" + rm = make_risk_manager(daily_loss_limit_pct="10.0") + signal = make_signal(side=OrderSide.BUY, price="100", quantity="0.1") + result = rm.check( + signal, balance=Decimal("10000"), positions={}, daily_pnl=Decimal("-1100") + ) + assert result.allowed is False + assert result.reason == "Daily loss limit exceeded" + + +def test_risk_check_rejects_insufficient_balance(): + """Order cost of 500 exceeds available balance of 100.""" + rm = make_risk_manager() + signal = make_signal(side=OrderSide.BUY, price="100", quantity="5") + # cost = 500, balance = 100 + result = rm.check(signal, balance=Decimal("100"), positions={}, daily_pnl=Decimal("0")) + assert result.allowed is False + assert result.reason == "Insufficient balance" |
