diff options
Diffstat (limited to 'services/order-executor')
| -rw-r--r-- | services/order-executor/Dockerfile | 9 | ||||
| -rw-r--r-- | services/order-executor/src/order_executor/executor.py | 16 | ||||
| -rw-r--r-- | services/order-executor/src/order_executor/main.py | 45 | ||||
| -rw-r--r-- | services/order-executor/src/order_executor/risk_manager.py | 26 | ||||
| -rw-r--r-- | services/order-executor/tests/test_executor.py | 4 | ||||
| -rw-r--r-- | services/order-executor/tests/test_risk_manager.py | 48 |
6 files changed, 89 insertions, 59 deletions
diff --git a/services/order-executor/Dockerfile b/services/order-executor/Dockerfile index bc8b21c..376afec 100644 --- a/services/order-executor/Dockerfile +++ b/services/order-executor/Dockerfile @@ -1,8 +1,15 @@ -FROM python:3.12-slim +FROM python:3.12-slim AS builder WORKDIR /app COPY shared/ shared/ RUN pip install --no-cache-dir ./shared COPY services/order-executor/ services/order-executor/ RUN pip install --no-cache-dir ./services/order-executor + +FROM python:3.12-slim +RUN useradd -r -s /bin/false appuser +WORKDIR /app +COPY --from=builder /usr/local/lib/python3.12/site-packages /usr/local/lib/python3.12/site-packages +COPY --from=builder /usr/local/bin /usr/local/bin ENV PYTHONPATH=/app +USER appuser CMD ["python", "-m", "order_executor.main"] diff --git a/services/order-executor/src/order_executor/executor.py b/services/order-executor/src/order_executor/executor.py index a71e762..fd502cd 100644 --- a/services/order-executor/src/order_executor/executor.py +++ b/services/order-executor/src/order_executor/executor.py @@ -1,18 +1,18 @@ """Order execution logic.""" -import structlog -from datetime import datetime, timezone +from datetime import UTC, datetime from decimal import Decimal -from typing import Any, Optional +from typing import Any + +import structlog +from order_executor.risk_manager import RiskManager from shared.broker import RedisBroker from shared.db import Database from shared.events import OrderEvent from shared.models import Order, OrderStatus, OrderType, Signal from shared.notifier import TelegramNotifier -from order_executor.risk_manager import RiskManager - logger = structlog.get_logger() @@ -35,7 +35,7 @@ class OrderExecutor: self.notifier = notifier self.dry_run = dry_run - async def execute(self, signal: Signal) -> Optional[Order]: + async def execute(self, signal: Signal) -> Order | None: """Run risk checks and place an order for the given signal.""" # Fetch buying power from Alpaca balance = await self.exchange.get_buying_power() @@ -71,7 +71,7 @@ class OrderExecutor: if self.dry_run: order.status = OrderStatus.FILLED - order.filled_at = datetime.now(timezone.utc) + order.filled_at = datetime.now(UTC) logger.info( "order_filled_dry_run", side=str(order.side), @@ -87,7 +87,7 @@ class OrderExecutor: type="market", ) order.status = OrderStatus.FILLED - order.filled_at = datetime.now(timezone.utc) + order.filled_at = datetime.now(UTC) logger.info( "order_filled", side=str(order.side), diff --git a/services/order-executor/src/order_executor/main.py b/services/order-executor/src/order_executor/main.py index 51ab286..99f88e1 100644 --- a/services/order-executor/src/order_executor/main.py +++ b/services/order-executor/src/order_executor/main.py @@ -3,6 +3,11 @@ import asyncio from decimal import Decimal +import aiohttp + +from order_executor.config import ExecutorConfig +from order_executor.executor import OrderExecutor +from order_executor.risk_manager import RiskManager from shared.alpaca import AlpacaClient from shared.broker import RedisBroker from shared.db import Database @@ -11,10 +16,7 @@ from shared.healthcheck import HealthCheckServer from shared.logging import setup_logging from shared.metrics import ServiceMetrics from shared.notifier import TelegramNotifier - -from order_executor.config import ExecutorConfig -from order_executor.executor import OrderExecutor -from order_executor.risk_manager import RiskManager +from shared.shutdown import GracefulShutdown # Health check port: base + 2 HEALTH_PORT_OFFSET = 2 @@ -26,18 +28,18 @@ async def run() -> None: metrics = ServiceMetrics("order_executor") notifier = TelegramNotifier( - bot_token=config.telegram_bot_token, + bot_token=config.telegram_bot_token.get_secret_value(), chat_id=config.telegram_chat_id, ) - db = Database(config.database_url) + db = Database(config.database_url.get_secret_value()) await db.connect() - broker = RedisBroker(config.redis_url) + broker = RedisBroker(config.redis_url.get_secret_value()) alpaca = AlpacaClient( - api_key=config.alpaca_api_key, - api_secret=config.alpaca_api_secret, + api_key=config.alpaca_api_key.get_secret_value(), + api_secret=config.alpaca_api_secret.get_secret_value(), paper=config.alpaca_paper, ) @@ -83,6 +85,9 @@ async def run() -> None: await broker.ensure_group(stream, GROUP) + shutdown = GracefulShutdown() + shutdown.install_handlers() + log.info("started", stream=stream, dry_run=config.dry_run) try: @@ -94,10 +99,15 @@ async def run() -> None: if event.type == EventType.SIGNAL: await executor.execute(event.data) await broker.ack(stream, GROUP, msg_id) + except (aiohttp.ClientError, ConnectionError, TimeoutError) as exc: + log.warning("pending_network_error", error=str(exc), msg_id=msg_id) + except (ValueError, KeyError, TypeError) as exc: + log.warning("pending_parse_error", error=str(exc), msg_id=msg_id) + await broker.ack(stream, GROUP, msg_id) except Exception as exc: - log.error("pending_failed", error=str(exc), msg_id=msg_id) + log.error("pending_failed", error=str(exc), msg_id=msg_id, exc_info=True) - while True: + while not shutdown.is_shutting_down: messages = await broker.read_group(stream, GROUP, CONSUMER, count=10, block=5000) for msg_id, msg in messages: try: @@ -110,8 +120,19 @@ async def run() -> None: service="order-executor", event_type="signal" ).inc() await broker.ack(stream, GROUP, msg_id) + except (aiohttp.ClientError, ConnectionError, TimeoutError) as exc: + log.warning("process_network_error", error=str(exc)) + metrics.errors_total.labels( + service="order-executor", error_type="network" + ).inc() + except (ValueError, KeyError, TypeError) as exc: + log.warning("process_parse_error", error=str(exc)) + await broker.ack(stream, GROUP, msg_id) + metrics.errors_total.labels( + service="order-executor", error_type="validation" + ).inc() except Exception as exc: - log.error("process_failed", error=str(exc)) + log.error("process_failed", error=str(exc), exc_info=True) metrics.errors_total.labels( service="order-executor", error_type="processing" ).inc() diff --git a/services/order-executor/src/order_executor/risk_manager.py b/services/order-executor/src/order_executor/risk_manager.py index 5a05746..811a862 100644 --- a/services/order-executor/src/order_executor/risk_manager.py +++ b/services/order-executor/src/order_executor/risk_manager.py @@ -1,12 +1,12 @@ """Risk management for order execution.""" +import math +from collections import deque from dataclasses import dataclass -from datetime import datetime, timezone, timedelta +from datetime import UTC, datetime, timedelta from decimal import Decimal -from collections import deque -import math -from shared.models import Signal, OrderSide, Position +from shared.models import OrderSide, Position, Signal @dataclass @@ -123,15 +123,13 @@ class RiskManager: else: self._consecutive_losses += 1 if self._consecutive_losses >= self._max_consecutive_losses: - self._paused_until = datetime.now(timezone.utc) + timedelta( - minutes=self._loss_pause_minutes - ) + self._paused_until = datetime.now(UTC) + timedelta(minutes=self._loss_pause_minutes) def is_paused(self) -> bool: """Check if trading is paused due to consecutive losses.""" if self._paused_until is None: return False - if datetime.now(timezone.utc) >= self._paused_until: + if datetime.now(UTC) >= self._paused_until: self._paused_until = None self._consecutive_losses = 0 return False @@ -233,9 +231,9 @@ class RiskManager: mean_a = sum(returns_a) / len(returns_a) mean_b = sum(returns_b) / len(returns_b) - cov = sum((a - mean_a) * (b - mean_b) for a, b in zip(returns_a, returns_b)) / len( - returns_a - ) + cov = sum( + (a - mean_a) * (b - mean_b) for a, b in zip(returns_a, returns_b, strict=True) + ) / len(returns_a) std_a = math.sqrt(sum((a - mean_a) ** 2 for a in returns_a) / len(returns_a)) std_b = math.sqrt(sum((b - mean_b) ** 2 for b in returns_b) / len(returns_b)) @@ -280,7 +278,11 @@ class RiskManager: min_len = min(len(r) for r in all_returns) portfolio_returns = [] for i in range(min_len): - pr = sum(w * r[-(min_len - i)] for w, r in zip(weights, all_returns) if len(r) > i) + pr = sum( + w * r[-(min_len - i)] + for w, r in zip(weights, all_returns, strict=False) + if len(r) > i + ) portfolio_returns.append(pr) if not portfolio_returns: diff --git a/services/order-executor/tests/test_executor.py b/services/order-executor/tests/test_executor.py index dd823d7..cda6b72 100644 --- a/services/order-executor/tests/test_executor.py +++ b/services/order-executor/tests/test_executor.py @@ -4,11 +4,11 @@ from decimal import Decimal from unittest.mock import AsyncMock, MagicMock import pytest - -from shared.models import OrderSide, OrderStatus, Signal from order_executor.executor import OrderExecutor from order_executor.risk_manager import RiskCheckResult, RiskManager +from shared.models import OrderSide, OrderStatus, Signal + def make_signal(side: OrderSide = OrderSide.BUY, price: str = "100", quantity: str = "1") -> Signal: return Signal( diff --git a/services/order-executor/tests/test_risk_manager.py b/services/order-executor/tests/test_risk_manager.py index 00a9ab4..66e769c 100644 --- a/services/order-executor/tests/test_risk_manager.py +++ b/services/order-executor/tests/test_risk_manager.py @@ -2,12 +2,12 @@ from decimal import Decimal +from order_executor.risk_manager import RiskManager from shared.models import OrderSide, Position, Signal -from order_executor.risk_manager import RiskManager -def make_signal(side: OrderSide, price: str, quantity: str, symbol: str = "BTC/USDT") -> Signal: +def make_signal(side: OrderSide, price: str, quantity: str, symbol: str = "AAPL") -> Signal: return Signal( strategy="test", symbol=symbol, @@ -93,7 +93,7 @@ def test_risk_check_rejects_insufficient_balance(): def test_trailing_stop_set_and_trigger(): """Trailing stop should trigger when price drops below stop level.""" rm = make_risk_manager(trailing_stop_pct="5") - rm.set_trailing_stop("BTC/USDT", Decimal("100")) + rm.set_trailing_stop("AAPL", Decimal("100")) signal = make_signal(side=OrderSide.BUY, price="94", quantity="0.01") result = rm.check(signal, balance=Decimal("10000"), positions={}, daily_pnl=Decimal("0")) @@ -104,10 +104,10 @@ def test_trailing_stop_set_and_trigger(): def test_trailing_stop_updates_highest_price(): """Trailing stop should track the highest price seen.""" rm = make_risk_manager(trailing_stop_pct="5") - rm.set_trailing_stop("BTC/USDT", Decimal("100")) + rm.set_trailing_stop("AAPL", Decimal("100")) # Price rises to 120 => stop at 114 - rm.update_price("BTC/USDT", Decimal("120")) + rm.update_price("AAPL", Decimal("120")) # Price at 115 is above stop (114), should be allowed signal = make_signal(side=OrderSide.BUY, price="115", quantity="0.01") @@ -124,7 +124,7 @@ def test_trailing_stop_updates_highest_price(): def test_trailing_stop_not_triggered_above_stop(): """Trailing stop should not trigger when price is above stop level.""" rm = make_risk_manager(trailing_stop_pct="5") - rm.set_trailing_stop("BTC/USDT", Decimal("100")) + rm.set_trailing_stop("AAPL", Decimal("100")) # Price at 96 is above stop (95), should be allowed signal = make_signal(side=OrderSide.BUY, price="96", quantity="0.01") @@ -140,11 +140,11 @@ def test_max_open_positions_check(): rm = make_risk_manager(max_open_positions=2) positions = { - "BTC/USDT": make_position("BTC/USDT", "1", "100", "100"), - "ETH/USDT": make_position("ETH/USDT", "10", "50", "50"), + "AAPL": make_position("AAPL", "1", "100", "100"), + "MSFT": make_position("MSFT", "10", "50", "50"), } - signal = make_signal(side=OrderSide.BUY, price="10", quantity="1", symbol="SOL/USDT") + signal = make_signal(side=OrderSide.BUY, price="10", quantity="1", symbol="TSLA") result = rm.check(signal, balance=Decimal("10000"), positions=positions, daily_pnl=Decimal("0")) assert result.allowed is False assert result.reason == "Max open positions reached" @@ -158,14 +158,14 @@ def test_volatility_calculation(): rm = make_risk_manager(volatility_lookback=5) # No history yet - assert rm.get_volatility("BTC/USDT") is None + assert rm.get_volatility("AAPL") is None # Feed prices prices = [100, 102, 98, 105, 101] for p in prices: - rm.update_price("BTC/USDT", Decimal(str(p))) + rm.update_price("AAPL", Decimal(str(p))) - vol = rm.get_volatility("BTC/USDT") + vol = rm.get_volatility("AAPL") assert vol is not None assert vol > 0 @@ -177,9 +177,9 @@ def test_position_size_with_volatility_scaling(): # Feed volatile prices prices = [100, 120, 80, 130, 70] for p in prices: - rm.update_price("BTC/USDT", Decimal(str(p))) + rm.update_price("AAPL", Decimal(str(p))) - size = rm.calculate_position_size("BTC/USDT", Decimal("10000")) + size = rm.calculate_position_size("AAPL", Decimal("10000")) base = Decimal("10000") * Decimal("0.1") # High volatility should reduce size below base @@ -192,9 +192,9 @@ def test_position_size_without_scaling(): prices = [100, 120, 80, 130, 70] for p in prices: - rm.update_price("BTC/USDT", Decimal(str(p))) + rm.update_price("AAPL", Decimal(str(p))) - size = rm.calculate_position_size("BTC/USDT", Decimal("10000")) + size = rm.calculate_position_size("AAPL", Decimal("10000")) base = Decimal("10000") * Decimal("0.1") assert size == base @@ -211,8 +211,8 @@ def test_portfolio_exposure_check_passes(): max_portfolio_exposure=0.8, ) positions = { - "BTCUSDT": Position( - symbol="BTCUSDT", + "AAPL": Position( + symbol="AAPL", quantity=Decimal("0.01"), avg_entry_price=Decimal("50000"), current_price=Decimal("50000"), @@ -230,8 +230,8 @@ def test_portfolio_exposure_check_rejects(): max_portfolio_exposure=0.3, ) positions = { - "BTCUSDT": Position( - symbol="BTCUSDT", + "AAPL": Position( + symbol="AAPL", quantity=Decimal("1"), avg_entry_price=Decimal("50000"), current_price=Decimal("50000"), @@ -263,10 +263,10 @@ def test_var_calculation(): daily_loss_limit_pct=Decimal("10"), ) for i in range(30): - rm.update_price("BTCUSDT", Decimal(str(100 + (i % 5) - 2))) + rm.update_price("AAPL", Decimal(str(100 + (i % 5) - 2))) positions = { - "BTCUSDT": Position( - symbol="BTCUSDT", + "AAPL": Position( + symbol="AAPL", quantity=Decimal("1"), avg_entry_price=Decimal("100"), current_price=Decimal("100"), @@ -357,7 +357,7 @@ def test_drawdown_check_rejects_in_check(): rm.update_balance(Decimal("10000")) signal = Signal( strategy="test", - symbol="BTC/USDT", + symbol="AAPL", side=OrderSide.BUY, price=Decimal("50000"), quantity=Decimal("0.01"), |
