diff options
Diffstat (limited to 'services/order-executor/src')
5 files changed, 244 insertions, 0 deletions
diff --git a/services/order-executor/src/order_executor/__init__.py b/services/order-executor/src/order_executor/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/services/order-executor/src/order_executor/__init__.py diff --git a/services/order-executor/src/order_executor/config.py b/services/order-executor/src/order_executor/config.py new file mode 100644 index 0000000..856045f --- /dev/null +++ b/services/order-executor/src/order_executor/config.py @@ -0,0 +1,6 @@ +"""Order Executor configuration.""" +from shared.config import Settings + + +class ExecutorConfig(Settings): + pass diff --git a/services/order-executor/src/order_executor/executor.py b/services/order-executor/src/order_executor/executor.py new file mode 100644 index 0000000..16ae52c --- /dev/null +++ b/services/order-executor/src/order_executor/executor.py @@ -0,0 +1,100 @@ +"""Order execution logic.""" +import logging +from datetime import datetime, timezone +from decimal import Decimal +from typing import Any, Optional + +from shared.broker import RedisBroker +from shared.db import Database +from shared.events import OrderEvent +from shared.models import Order, OrderSide, OrderStatus, OrderType, Signal + +from order_executor.risk_manager import RiskManager + +logger = logging.getLogger(__name__) + + +class OrderExecutor: + """Executes orders on an exchange with risk gating.""" + + def __init__( + self, + exchange: Any, + risk_manager: RiskManager, + broker: RedisBroker, + db: Database, + dry_run: bool = True, + ) -> None: + self.exchange = exchange + self.risk_manager = risk_manager + self.broker = broker + self.db = db + self.dry_run = dry_run + + async def execute(self, signal: Signal) -> Optional[Order]: + """Run risk checks and place an order for the given signal.""" + # Fetch current balance from exchange + balance_data = await self.exchange.fetch_balance() + # Use USDT (or quote currency) free balance as available capital + free_balances = balance_data.get("free", {}) + quote_currency = signal.symbol.split("/")[-1] if "/" in signal.symbol else "USDT" + balance = Decimal(str(free_balances.get(quote_currency, 0))) + + # Fetch current positions + positions = {} + + # Compute daily PnL (not tracked at executor level — use 0 unless provided) + daily_pnl = Decimal(0) + + # Run risk checks + result = self.risk_manager.check( + signal=signal, + balance=balance, + positions=positions, + daily_pnl=daily_pnl, + ) + + if not result.allowed: + logger.warning( + "Risk check rejected signal %s: %s", signal.id, result.reason + ) + return None + + # Build the order model + order = Order( + signal_id=signal.id, + symbol=signal.symbol, + side=signal.side, + type=OrderType.MARKET, + price=signal.price, + quantity=signal.quantity, + status=OrderStatus.PENDING, + ) + + if self.dry_run: + order.status = OrderStatus.FILLED + order.filled_at = datetime.now(timezone.utc) + logger.info("[DRY RUN] Order filled: %s %s %s", order.side, order.quantity, order.symbol) + else: + try: + await self.exchange.create_order( + symbol=signal.symbol, + type="market", + side=signal.side.value.lower(), + amount=float(signal.quantity), + ) + order.status = OrderStatus.FILLED + order.filled_at = datetime.now(timezone.utc) + logger.info("Order filled: %s %s %s", order.side, order.quantity, order.symbol) + except Exception as exc: + order.status = OrderStatus.FAILED + logger.error("Order failed for signal %s: %s", signal.id, exc) + + # Persist to DB + await self.db.insert_order(order) + + # Publish order event + event = OrderEvent(data=order) + await self.broker.publish("orders", event.to_dict()) + + return order diff --git a/services/order-executor/src/order_executor/main.py b/services/order-executor/src/order_executor/main.py new file mode 100644 index 0000000..b57c513 --- /dev/null +++ b/services/order-executor/src/order_executor/main.py @@ -0,0 +1,83 @@ +"""Order Executor Service entry point.""" +import asyncio +import logging +from decimal import Decimal + +import ccxt.async_support as ccxt + +from shared.broker import RedisBroker +from shared.db import Database +from shared.events import Event, EventType + +from order_executor.config import ExecutorConfig +from order_executor.executor import OrderExecutor +from order_executor.risk_manager import RiskManager + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + + +async def run() -> None: + config = ExecutorConfig() + logging.getLogger().setLevel(config.log_level) + + db = Database(config.database_url) + await db.connect() + await db.init_tables() + + broker = RedisBroker(config.redis_url) + + exchange = ccxt.binance( + { + "apiKey": config.binance_api_key, + "secret": config.binance_api_secret, + } + ) + + risk_manager = RiskManager( + max_position_size=Decimal(str(config.risk_max_position_size)), + stop_loss_pct=Decimal(str(config.risk_stop_loss_pct)), + daily_loss_limit_pct=Decimal(str(config.risk_daily_loss_limit_pct)), + ) + + executor = OrderExecutor( + exchange=exchange, + risk_manager=risk_manager, + broker=broker, + db=db, + dry_run=config.dry_run, + ) + + last_id = "$" + stream = "signals" + logger.info("Order executor started, listening on stream=%s dry_run=%s", stream, config.dry_run) + + try: + while True: + messages = await broker.read(stream, last_id=last_id, count=10, block=5000) + for msg in messages: + try: + event = Event.from_dict(msg) + if event.type == EventType.SIGNAL: + signal = event.data + logger.info("Processing signal %s for %s", signal.id, signal.symbol) + await executor.execute(signal) + except Exception as exc: + logger.error("Failed to process message: %s", exc) + if messages: + # Advance last_id to avoid re-reading — broker.read returns decoded dicts, + # so we track progress by re-reading with "0" for replaying or "$" for new only. + # Since we block on "$" we get only new messages each iteration. + pass + finally: + await broker.close() + await db.close() + await exchange.close() + + +def main() -> None: + asyncio.run(run()) + + +if __name__ == "__main__": + main() diff --git a/services/order-executor/src/order_executor/risk_manager.py b/services/order-executor/src/order_executor/risk_manager.py new file mode 100644 index 0000000..8e8a72c --- /dev/null +++ b/services/order-executor/src/order_executor/risk_manager.py @@ -0,0 +1,55 @@ +"""Risk management for order execution.""" +from dataclasses import dataclass +from decimal import Decimal + +from shared.models import Signal, OrderSide, Position + + +@dataclass +class RiskCheckResult: + allowed: bool + reason: str + + +class RiskManager: + """Evaluates risk before order execution.""" + + def __init__( + self, + max_position_size: Decimal, + stop_loss_pct: Decimal, + daily_loss_limit_pct: Decimal, + ) -> None: + self.max_position_size = max_position_size + self.stop_loss_pct = stop_loss_pct + self.daily_loss_limit_pct = daily_loss_limit_pct + + def check( + self, + signal: Signal, + balance: Decimal, + positions: dict[str, Position], + daily_pnl: Decimal, + ) -> RiskCheckResult: + """Run risk checks against a signal and current portfolio state.""" + # Check daily loss limit + if balance > 0 and (daily_pnl / balance) * 100 < -self.daily_loss_limit_pct: + return RiskCheckResult(allowed=False, reason="Daily loss limit exceeded") + + if signal.side == OrderSide.BUY: + order_cost = signal.price * signal.quantity + + # Check sufficient balance + if order_cost > balance: + return RiskCheckResult(allowed=False, reason="Insufficient balance") + + # Check position size limit + position = positions.get(signal.symbol) + current_position_value = Decimal(0) + if position is not None: + current_position_value = position.quantity * position.current_price + + if balance > 0 and (current_position_value + order_cost) / balance > self.max_position_size: + return RiskCheckResult(allowed=False, reason="Position size exceeded") + + return RiskCheckResult(allowed=True, reason="OK") |
