summaryrefslogtreecommitdiff
path: root/services/order-executor/src/order_executor
diff options
context:
space:
mode:
authorTheSiahxyz <164138827+TheSiahxyz@users.noreply.github.com>2026-04-01 15:56:35 +0900
committerTheSiahxyz <164138827+TheSiahxyz@users.noreply.github.com>2026-04-01 15:56:35 +0900
commit33b14aaa2344b0fd95d1629627c3d135b24ae102 (patch)
tree90b214758bc3b076baa7711226a1a1be6268e72e /services/order-executor/src/order_executor
parent9360f1a800aa29b40399a2f3bfbfcf215a04e279 (diff)
feat: initial trading platform implementation
Binance spot crypto trading platform with microservices architecture: - shared: Pydantic models, Redis Streams broker, asyncpg DB layer - data-collector: Binance WebSocket/REST market data collection - strategy-engine: Plugin-based strategy execution (RSI, Grid) - order-executor: Order execution with risk management - portfolio-manager: Position tracking and PnL calculation - backtester: Historical strategy testing with simulator - cli: Click-based CLI for all operations - Docker Compose orchestration with Redis and PostgreSQL - 24 test files covering all modules
Diffstat (limited to 'services/order-executor/src/order_executor')
-rw-r--r--services/order-executor/src/order_executor/__init__.py0
-rw-r--r--services/order-executor/src/order_executor/config.py6
-rw-r--r--services/order-executor/src/order_executor/executor.py100
-rw-r--r--services/order-executor/src/order_executor/main.py83
-rw-r--r--services/order-executor/src/order_executor/risk_manager.py55
5 files changed, 244 insertions, 0 deletions
diff --git a/services/order-executor/src/order_executor/__init__.py b/services/order-executor/src/order_executor/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/services/order-executor/src/order_executor/__init__.py
diff --git a/services/order-executor/src/order_executor/config.py b/services/order-executor/src/order_executor/config.py
new file mode 100644
index 0000000..856045f
--- /dev/null
+++ b/services/order-executor/src/order_executor/config.py
@@ -0,0 +1,6 @@
+"""Order Executor configuration."""
+from shared.config import Settings
+
+
+class ExecutorConfig(Settings):
+ pass
diff --git a/services/order-executor/src/order_executor/executor.py b/services/order-executor/src/order_executor/executor.py
new file mode 100644
index 0000000..16ae52c
--- /dev/null
+++ b/services/order-executor/src/order_executor/executor.py
@@ -0,0 +1,100 @@
+"""Order execution logic."""
+import logging
+from datetime import datetime, timezone
+from decimal import Decimal
+from typing import Any, Optional
+
+from shared.broker import RedisBroker
+from shared.db import Database
+from shared.events import OrderEvent
+from shared.models import Order, OrderSide, OrderStatus, OrderType, Signal
+
+from order_executor.risk_manager import RiskManager
+
+logger = logging.getLogger(__name__)
+
+
+class OrderExecutor:
+ """Executes orders on an exchange with risk gating."""
+
+ def __init__(
+ self,
+ exchange: Any,
+ risk_manager: RiskManager,
+ broker: RedisBroker,
+ db: Database,
+ dry_run: bool = True,
+ ) -> None:
+ self.exchange = exchange
+ self.risk_manager = risk_manager
+ self.broker = broker
+ self.db = db
+ self.dry_run = dry_run
+
+ async def execute(self, signal: Signal) -> Optional[Order]:
+ """Run risk checks and place an order for the given signal."""
+ # Fetch current balance from exchange
+ balance_data = await self.exchange.fetch_balance()
+ # Use USDT (or quote currency) free balance as available capital
+ free_balances = balance_data.get("free", {})
+ quote_currency = signal.symbol.split("/")[-1] if "/" in signal.symbol else "USDT"
+ balance = Decimal(str(free_balances.get(quote_currency, 0)))
+
+ # Fetch current positions
+ positions = {}
+
+ # Compute daily PnL (not tracked at executor level — use 0 unless provided)
+ daily_pnl = Decimal(0)
+
+ # Run risk checks
+ result = self.risk_manager.check(
+ signal=signal,
+ balance=balance,
+ positions=positions,
+ daily_pnl=daily_pnl,
+ )
+
+ if not result.allowed:
+ logger.warning(
+ "Risk check rejected signal %s: %s", signal.id, result.reason
+ )
+ return None
+
+ # Build the order model
+ order = Order(
+ signal_id=signal.id,
+ symbol=signal.symbol,
+ side=signal.side,
+ type=OrderType.MARKET,
+ price=signal.price,
+ quantity=signal.quantity,
+ status=OrderStatus.PENDING,
+ )
+
+ if self.dry_run:
+ order.status = OrderStatus.FILLED
+ order.filled_at = datetime.now(timezone.utc)
+ logger.info("[DRY RUN] Order filled: %s %s %s", order.side, order.quantity, order.symbol)
+ else:
+ try:
+ await self.exchange.create_order(
+ symbol=signal.symbol,
+ type="market",
+ side=signal.side.value.lower(),
+ amount=float(signal.quantity),
+ )
+ order.status = OrderStatus.FILLED
+ order.filled_at = datetime.now(timezone.utc)
+ logger.info("Order filled: %s %s %s", order.side, order.quantity, order.symbol)
+ except Exception as exc:
+ order.status = OrderStatus.FAILED
+ logger.error("Order failed for signal %s: %s", signal.id, exc)
+
+ # Persist to DB
+ await self.db.insert_order(order)
+
+ # Publish order event
+ event = OrderEvent(data=order)
+ await self.broker.publish("orders", event.to_dict())
+
+ return order
diff --git a/services/order-executor/src/order_executor/main.py b/services/order-executor/src/order_executor/main.py
new file mode 100644
index 0000000..b57c513
--- /dev/null
+++ b/services/order-executor/src/order_executor/main.py
@@ -0,0 +1,83 @@
+"""Order Executor Service entry point."""
+import asyncio
+import logging
+from decimal import Decimal
+
+import ccxt.async_support as ccxt
+
+from shared.broker import RedisBroker
+from shared.db import Database
+from shared.events import Event, EventType
+
+from order_executor.config import ExecutorConfig
+from order_executor.executor import OrderExecutor
+from order_executor.risk_manager import RiskManager
+
+logging.basicConfig(level=logging.INFO)
+logger = logging.getLogger(__name__)
+
+
+async def run() -> None:
+ config = ExecutorConfig()
+ logging.getLogger().setLevel(config.log_level)
+
+ db = Database(config.database_url)
+ await db.connect()
+ await db.init_tables()
+
+ broker = RedisBroker(config.redis_url)
+
+ exchange = ccxt.binance(
+ {
+ "apiKey": config.binance_api_key,
+ "secret": config.binance_api_secret,
+ }
+ )
+
+ risk_manager = RiskManager(
+ max_position_size=Decimal(str(config.risk_max_position_size)),
+ stop_loss_pct=Decimal(str(config.risk_stop_loss_pct)),
+ daily_loss_limit_pct=Decimal(str(config.risk_daily_loss_limit_pct)),
+ )
+
+ executor = OrderExecutor(
+ exchange=exchange,
+ risk_manager=risk_manager,
+ broker=broker,
+ db=db,
+ dry_run=config.dry_run,
+ )
+
+ last_id = "$"
+ stream = "signals"
+ logger.info("Order executor started, listening on stream=%s dry_run=%s", stream, config.dry_run)
+
+ try:
+ while True:
+ messages = await broker.read(stream, last_id=last_id, count=10, block=5000)
+ for msg in messages:
+ try:
+ event = Event.from_dict(msg)
+ if event.type == EventType.SIGNAL:
+ signal = event.data
+ logger.info("Processing signal %s for %s", signal.id, signal.symbol)
+ await executor.execute(signal)
+ except Exception as exc:
+ logger.error("Failed to process message: %s", exc)
+ if messages:
+ # Advance last_id to avoid re-reading — broker.read returns decoded dicts,
+ # so we track progress by re-reading with "0" for replaying or "$" for new only.
+ # Since we block on "$" we get only new messages each iteration.
+ pass
+ finally:
+ await broker.close()
+ await db.close()
+ await exchange.close()
+
+
+def main() -> None:
+ asyncio.run(run())
+
+
+if __name__ == "__main__":
+ main()
diff --git a/services/order-executor/src/order_executor/risk_manager.py b/services/order-executor/src/order_executor/risk_manager.py
new file mode 100644
index 0000000..8e8a72c
--- /dev/null
+++ b/services/order-executor/src/order_executor/risk_manager.py
@@ -0,0 +1,55 @@
+"""Risk management for order execution."""
+from dataclasses import dataclass
+from decimal import Decimal
+
+from shared.models import Signal, OrderSide, Position
+
+
+@dataclass
+class RiskCheckResult:
+ allowed: bool
+ reason: str
+
+
+class RiskManager:
+ """Evaluates risk before order execution."""
+
+ def __init__(
+ self,
+ max_position_size: Decimal,
+ stop_loss_pct: Decimal,
+ daily_loss_limit_pct: Decimal,
+ ) -> None:
+ self.max_position_size = max_position_size
+ self.stop_loss_pct = stop_loss_pct
+ self.daily_loss_limit_pct = daily_loss_limit_pct
+
+ def check(
+ self,
+ signal: Signal,
+ balance: Decimal,
+ positions: dict[str, Position],
+ daily_pnl: Decimal,
+ ) -> RiskCheckResult:
+ """Run risk checks against a signal and current portfolio state."""
+ # Check daily loss limit
+ if balance > 0 and (daily_pnl / balance) * 100 < -self.daily_loss_limit_pct:
+ return RiskCheckResult(allowed=False, reason="Daily loss limit exceeded")
+
+ if signal.side == OrderSide.BUY:
+ order_cost = signal.price * signal.quantity
+
+ # Check sufficient balance
+ if order_cost > balance:
+ return RiskCheckResult(allowed=False, reason="Insufficient balance")
+
+ # Check position size limit
+ position = positions.get(signal.symbol)
+ current_position_value = Decimal(0)
+ if position is not None:
+ current_position_value = position.quantity * position.current_price
+
+ if balance > 0 and (current_position_value + order_cost) / balance > self.max_position_size:
+ return RiskCheckResult(allowed=False, reason="Position size exceeded")
+
+ return RiskCheckResult(allowed=True, reason="OK")