summaryrefslogtreecommitdiff
path: root/services/order-executor
diff options
context:
space:
mode:
Diffstat (limited to 'services/order-executor')
-rw-r--r--services/order-executor/Dockerfile7
-rw-r--r--services/order-executor/pyproject.toml16
-rw-r--r--services/order-executor/src/order_executor/__init__.py0
-rw-r--r--services/order-executor/src/order_executor/config.py6
-rw-r--r--services/order-executor/src/order_executor/executor.py100
-rw-r--r--services/order-executor/src/order_executor/main.py83
-rw-r--r--services/order-executor/src/order_executor/risk_manager.py55
-rw-r--r--services/order-executor/tests/__init__.py0
-rw-r--r--services/order-executor/tests/test_executor.py122
-rw-r--r--services/order-executor/tests/test_risk_manager.py72
10 files changed, 461 insertions, 0 deletions
diff --git a/services/order-executor/Dockerfile b/services/order-executor/Dockerfile
new file mode 100644
index 0000000..f044714
--- /dev/null
+++ b/services/order-executor/Dockerfile
@@ -0,0 +1,7 @@
+FROM python:3.12-slim
+WORKDIR /app
+COPY shared/ shared/
+RUN pip install --no-cache-dir ./shared
+COPY services/order-executor/ services/order-executor/
+RUN pip install --no-cache-dir ./services/order-executor
+CMD ["python", "-m", "order_executor.main"]
diff --git a/services/order-executor/pyproject.toml b/services/order-executor/pyproject.toml
new file mode 100644
index 0000000..eed4fef
--- /dev/null
+++ b/services/order-executor/pyproject.toml
@@ -0,0 +1,16 @@
+[project]
+name = "order-executor"
+version = "0.1.0"
+description = "Order execution service with risk management"
+requires-python = ">=3.12"
+dependencies = ["ccxt>=4.0", "trading-shared"]
+
+[project.optional-dependencies]
+dev = ["pytest>=8.0", "pytest-asyncio>=0.23"]
+
+[build-system]
+requires = ["hatchling"]
+build-backend = "hatchling.build"
+
+[tool.hatch.build.targets.wheel]
+packages = ["src/order_executor"]
diff --git a/services/order-executor/src/order_executor/__init__.py b/services/order-executor/src/order_executor/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/services/order-executor/src/order_executor/__init__.py
diff --git a/services/order-executor/src/order_executor/config.py b/services/order-executor/src/order_executor/config.py
new file mode 100644
index 0000000..856045f
--- /dev/null
+++ b/services/order-executor/src/order_executor/config.py
@@ -0,0 +1,6 @@
+"""Order Executor configuration."""
+from shared.config import Settings
+
+
+class ExecutorConfig(Settings):
+ pass
diff --git a/services/order-executor/src/order_executor/executor.py b/services/order-executor/src/order_executor/executor.py
new file mode 100644
index 0000000..16ae52c
--- /dev/null
+++ b/services/order-executor/src/order_executor/executor.py
@@ -0,0 +1,100 @@
+"""Order execution logic."""
+import logging
+from datetime import datetime, timezone
+from decimal import Decimal
+from typing import Any, Optional
+
+from shared.broker import RedisBroker
+from shared.db import Database
+from shared.events import OrderEvent
+from shared.models import Order, OrderSide, OrderStatus, OrderType, Signal
+
+from order_executor.risk_manager import RiskManager
+
+logger = logging.getLogger(__name__)
+
+
+class OrderExecutor:
+ """Executes orders on an exchange with risk gating."""
+
+ def __init__(
+ self,
+ exchange: Any,
+ risk_manager: RiskManager,
+ broker: RedisBroker,
+ db: Database,
+ dry_run: bool = True,
+ ) -> None:
+ self.exchange = exchange
+ self.risk_manager = risk_manager
+ self.broker = broker
+ self.db = db
+ self.dry_run = dry_run
+
+ async def execute(self, signal: Signal) -> Optional[Order]:
+ """Run risk checks and place an order for the given signal."""
+ # Fetch current balance from exchange
+ balance_data = await self.exchange.fetch_balance()
+ # Use USDT (or quote currency) free balance as available capital
+ free_balances = balance_data.get("free", {})
+ quote_currency = signal.symbol.split("/")[-1] if "/" in signal.symbol else "USDT"
+ balance = Decimal(str(free_balances.get(quote_currency, 0)))
+
+ # Fetch current positions
+ positions = {}
+
+ # Compute daily PnL (not tracked at executor level — use 0 unless provided)
+ daily_pnl = Decimal(0)
+
+ # Run risk checks
+ result = self.risk_manager.check(
+ signal=signal,
+ balance=balance,
+ positions=positions,
+ daily_pnl=daily_pnl,
+ )
+
+ if not result.allowed:
+ logger.warning(
+ "Risk check rejected signal %s: %s", signal.id, result.reason
+ )
+ return None
+
+ # Build the order model
+ order = Order(
+ signal_id=signal.id,
+ symbol=signal.symbol,
+ side=signal.side,
+ type=OrderType.MARKET,
+ price=signal.price,
+ quantity=signal.quantity,
+ status=OrderStatus.PENDING,
+ )
+
+ if self.dry_run:
+ order.status = OrderStatus.FILLED
+ order.filled_at = datetime.now(timezone.utc)
+ logger.info("[DRY RUN] Order filled: %s %s %s", order.side, order.quantity, order.symbol)
+ else:
+ try:
+ await self.exchange.create_order(
+ symbol=signal.symbol,
+ type="market",
+ side=signal.side.value.lower(),
+ amount=float(signal.quantity),
+ )
+ order.status = OrderStatus.FILLED
+ order.filled_at = datetime.now(timezone.utc)
+ logger.info("Order filled: %s %s %s", order.side, order.quantity, order.symbol)
+ except Exception as exc:
+ order.status = OrderStatus.FAILED
+ logger.error("Order failed for signal %s: %s", signal.id, exc)
+
+ # Persist to DB
+ await self.db.insert_order(order)
+
+ # Publish order event
+ event = OrderEvent(data=order)
+ await self.broker.publish("orders", event.to_dict())
+
+ return order
diff --git a/services/order-executor/src/order_executor/main.py b/services/order-executor/src/order_executor/main.py
new file mode 100644
index 0000000..b57c513
--- /dev/null
+++ b/services/order-executor/src/order_executor/main.py
@@ -0,0 +1,83 @@
+"""Order Executor Service entry point."""
+import asyncio
+import logging
+from decimal import Decimal
+
+import ccxt.async_support as ccxt
+
+from shared.broker import RedisBroker
+from shared.db import Database
+from shared.events import Event, EventType
+
+from order_executor.config import ExecutorConfig
+from order_executor.executor import OrderExecutor
+from order_executor.risk_manager import RiskManager
+
+logging.basicConfig(level=logging.INFO)
+logger = logging.getLogger(__name__)
+
+
+async def run() -> None:
+ config = ExecutorConfig()
+ logging.getLogger().setLevel(config.log_level)
+
+ db = Database(config.database_url)
+ await db.connect()
+ await db.init_tables()
+
+ broker = RedisBroker(config.redis_url)
+
+ exchange = ccxt.binance(
+ {
+ "apiKey": config.binance_api_key,
+ "secret": config.binance_api_secret,
+ }
+ )
+
+ risk_manager = RiskManager(
+ max_position_size=Decimal(str(config.risk_max_position_size)),
+ stop_loss_pct=Decimal(str(config.risk_stop_loss_pct)),
+ daily_loss_limit_pct=Decimal(str(config.risk_daily_loss_limit_pct)),
+ )
+
+ executor = OrderExecutor(
+ exchange=exchange,
+ risk_manager=risk_manager,
+ broker=broker,
+ db=db,
+ dry_run=config.dry_run,
+ )
+
+ last_id = "$"
+ stream = "signals"
+ logger.info("Order executor started, listening on stream=%s dry_run=%s", stream, config.dry_run)
+
+ try:
+ while True:
+ messages = await broker.read(stream, last_id=last_id, count=10, block=5000)
+ for msg in messages:
+ try:
+ event = Event.from_dict(msg)
+ if event.type == EventType.SIGNAL:
+ signal = event.data
+ logger.info("Processing signal %s for %s", signal.id, signal.symbol)
+ await executor.execute(signal)
+ except Exception as exc:
+ logger.error("Failed to process message: %s", exc)
+ if messages:
+ # Advance last_id to avoid re-reading — broker.read returns decoded dicts,
+ # so we track progress by re-reading with "0" for replaying or "$" for new only.
+ # Since we block on "$" we get only new messages each iteration.
+ pass
+ finally:
+ await broker.close()
+ await db.close()
+ await exchange.close()
+
+
+def main() -> None:
+ asyncio.run(run())
+
+
+if __name__ == "__main__":
+ main()
diff --git a/services/order-executor/src/order_executor/risk_manager.py b/services/order-executor/src/order_executor/risk_manager.py
new file mode 100644
index 0000000..8e8a72c
--- /dev/null
+++ b/services/order-executor/src/order_executor/risk_manager.py
@@ -0,0 +1,55 @@
+"""Risk management for order execution."""
+from dataclasses import dataclass
+from decimal import Decimal
+
+from shared.models import Signal, OrderSide, Position
+
+
+@dataclass
+class RiskCheckResult:
+ allowed: bool
+ reason: str
+
+
+class RiskManager:
+ """Evaluates risk before order execution."""
+
+ def __init__(
+ self,
+ max_position_size: Decimal,
+ stop_loss_pct: Decimal,
+ daily_loss_limit_pct: Decimal,
+ ) -> None:
+ self.max_position_size = max_position_size
+ self.stop_loss_pct = stop_loss_pct
+ self.daily_loss_limit_pct = daily_loss_limit_pct
+
+ def check(
+ self,
+ signal: Signal,
+ balance: Decimal,
+ positions: dict[str, Position],
+ daily_pnl: Decimal,
+ ) -> RiskCheckResult:
+ """Run risk checks against a signal and current portfolio state."""
+ # Check daily loss limit
+ if balance > 0 and (daily_pnl / balance) * 100 < -self.daily_loss_limit_pct:
+ return RiskCheckResult(allowed=False, reason="Daily loss limit exceeded")
+
+ if signal.side == OrderSide.BUY:
+ order_cost = signal.price * signal.quantity
+
+ # Check sufficient balance
+ if order_cost > balance:
+ return RiskCheckResult(allowed=False, reason="Insufficient balance")
+
+ # Check position size limit
+ position = positions.get(signal.symbol)
+ current_position_value = Decimal(0)
+ if position is not None:
+ current_position_value = position.quantity * position.current_price
+
+ if balance > 0 and (current_position_value + order_cost) / balance > self.max_position_size:
+ return RiskCheckResult(allowed=False, reason="Position size exceeded")
+
+ return RiskCheckResult(allowed=True, reason="OK")
diff --git a/services/order-executor/tests/__init__.py b/services/order-executor/tests/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/services/order-executor/tests/__init__.py
diff --git a/services/order-executor/tests/test_executor.py b/services/order-executor/tests/test_executor.py
new file mode 100644
index 0000000..5b18992
--- /dev/null
+++ b/services/order-executor/tests/test_executor.py
@@ -0,0 +1,122 @@
+"""Tests for OrderExecutor."""
+from decimal import Decimal
+from unittest.mock import AsyncMock, MagicMock
+
+import pytest
+
+from shared.models import OrderSide, OrderStatus, Signal
+from order_executor.executor import OrderExecutor
+from order_executor.risk_manager import RiskCheckResult, RiskManager
+
+
+def make_signal(side: OrderSide = OrderSide.BUY, price: str = "100", quantity: str = "1") -> Signal:
+ return Signal(
+ strategy="test",
+ symbol="BTC/USDT",
+ side=side,
+ price=Decimal(price),
+ quantity=Decimal(quantity),
+ reason="test",
+ )
+
+
+def make_mock_exchange(free_usdt: float = 10000.0) -> AsyncMock:
+ exchange = AsyncMock()
+ exchange.fetch_balance.return_value = {"free": {"USDT": free_usdt}}
+ exchange.create_order = AsyncMock(return_value={"id": "exchange-order-123"})
+ return exchange
+
+
+def make_mock_risk_manager(allowed: bool = True, reason: str = "OK") -> MagicMock:
+ rm = MagicMock(spec=RiskManager)
+ rm.check.return_value = RiskCheckResult(allowed=allowed, reason=reason)
+ return rm
+
+
+def make_mock_broker() -> AsyncMock:
+ broker = AsyncMock()
+ broker.publish = AsyncMock()
+ return broker
+
+
+def make_mock_db() -> AsyncMock:
+ db = AsyncMock()
+ db.insert_order = AsyncMock()
+ return db
+
+
+@pytest.mark.asyncio
+async def test_executor_places_order_when_risk_passes():
+ """When risk check passes, create_order is called and order status is FILLED."""
+ exchange = make_mock_exchange()
+ risk_manager = make_mock_risk_manager(allowed=True)
+ broker = make_mock_broker()
+ db = make_mock_db()
+
+ executor = OrderExecutor(
+ exchange=exchange,
+ risk_manager=risk_manager,
+ broker=broker,
+ db=db,
+ dry_run=False,
+ )
+
+ signal = make_signal()
+ order = await executor.execute(signal)
+
+ assert order is not None
+ assert order.status == OrderStatus.FILLED
+ exchange.create_order.assert_called_once()
+ db.insert_order.assert_called_once_with(order)
+ broker.publish.assert_called_once()
+
+
+@pytest.mark.asyncio
+async def test_executor_rejects_when_risk_fails():
+ """When risk check fails, create_order is not called and None is returned."""
+ exchange = make_mock_exchange()
+ risk_manager = make_mock_risk_manager(allowed=False, reason="Position size exceeded")
+ broker = make_mock_broker()
+ db = make_mock_db()
+
+ executor = OrderExecutor(
+ exchange=exchange,
+ risk_manager=risk_manager,
+ broker=broker,
+ db=db,
+ dry_run=False,
+ )
+
+ signal = make_signal()
+ order = await executor.execute(signal)
+
+ assert order is None
+ exchange.create_order.assert_not_called()
+ db.insert_order.assert_not_called()
+ broker.publish.assert_not_called()
+
+
+@pytest.mark.asyncio
+async def test_executor_dry_run_does_not_call_exchange():
+ """In dry-run mode, risk passes, order is FILLED, but exchange.create_order is NOT called."""
+ exchange = make_mock_exchange()
+ risk_manager = make_mock_risk_manager(allowed=True)
+ broker = make_mock_broker()
+ db = make_mock_db()
+
+ executor = OrderExecutor(
+ exchange=exchange,
+ risk_manager=risk_manager,
+ broker=broker,
+ db=db,
+ dry_run=True,
+ )
+
+ signal = make_signal()
+ order = await executor.execute(signal)
+
+ assert order is not None
+ assert order.status == OrderStatus.FILLED
+ exchange.create_order.assert_not_called()
+ db.insert_order.assert_called_once_with(order)
+ broker.publish.assert_called_once()
diff --git a/services/order-executor/tests/test_risk_manager.py b/services/order-executor/tests/test_risk_manager.py
new file mode 100644
index 0000000..f6b5545
--- /dev/null
+++ b/services/order-executor/tests/test_risk_manager.py
@@ -0,0 +1,72 @@
+"""Tests for RiskManager."""
+from decimal import Decimal
+
+import pytest
+
+from shared.models import OrderSide, Position, Signal
+from order_executor.risk_manager import RiskManager
+
+
+def make_signal(side: OrderSide, price: str, quantity: str, symbol: str = "BTC/USDT") -> Signal:
+ return Signal(
+ strategy="test",
+ symbol=symbol,
+ side=side,
+ price=Decimal(price),
+ quantity=Decimal(quantity),
+ reason="test signal",
+ )
+
+
+def make_risk_manager(
+ max_position_size: str = "0.1",
+ stop_loss_pct: str = "5.0",
+ daily_loss_limit_pct: str = "10.0",
+) -> RiskManager:
+ return RiskManager(
+ max_position_size=Decimal(max_position_size),
+ stop_loss_pct=Decimal(stop_loss_pct),
+ daily_loss_limit_pct=Decimal(daily_loss_limit_pct),
+ )
+
+
+def test_risk_check_passes_normal_order():
+ """Small BUY order with enough balance should be allowed."""
+ rm = make_risk_manager()
+ signal = make_signal(side=OrderSide.BUY, price="100", quantity="0.5")
+ # cost = 50, balance = 10000, position_value = 0 => (0+50)/10000 = 0.5% < 10%
+ result = rm.check(signal, balance=Decimal("10000"), positions={}, daily_pnl=Decimal("0"))
+ assert result.allowed is True
+ assert result.reason == "OK"
+
+
+def test_risk_check_rejects_exceeding_position_size():
+ """5 BTC at $50,000 = $250,000 order cost on $10,000,000 balance exceeds 10% limit."""
+ rm = make_risk_manager(max_position_size="0.1")
+ signal = make_signal(side=OrderSide.BUY, price="50000", quantity="5")
+ # cost = 250000, balance = 1000000 => 250000/1000000 = 25% > 10%
+ # balance is sufficient (250000 < 1000000) but position size is exceeded
+ result = rm.check(signal, balance=Decimal("1000000"), positions={}, daily_pnl=Decimal("0"))
+ assert result.allowed is False
+ assert result.reason == "Position size exceeded"
+
+
+def test_risk_check_rejects_daily_loss_exceeded():
+ """Daily PnL of -1100 on 10000 balance = -11%, exceeding -10% limit."""
+ rm = make_risk_manager(daily_loss_limit_pct="10.0")
+ signal = make_signal(side=OrderSide.BUY, price="100", quantity="0.1")
+ result = rm.check(
+ signal, balance=Decimal("10000"), positions={}, daily_pnl=Decimal("-1100")
+ )
+ assert result.allowed is False
+ assert result.reason == "Daily loss limit exceeded"
+
+
+def test_risk_check_rejects_insufficient_balance():
+ """Order cost of 500 exceeds available balance of 100."""
+ rm = make_risk_manager()
+ signal = make_signal(side=OrderSide.BUY, price="100", quantity="5")
+ # cost = 500, balance = 100
+ result = rm.check(signal, balance=Decimal("100"), positions={}, daily_pnl=Decimal("0"))
+ assert result.allowed is False
+ assert result.reason == "Insufficient balance"