diff options
Diffstat (limited to 'services/order-executor/src/order_executor')
3 files changed, 55 insertions, 32 deletions
diff --git a/services/order-executor/src/order_executor/executor.py b/services/order-executor/src/order_executor/executor.py index a71e762..fd502cd 100644 --- a/services/order-executor/src/order_executor/executor.py +++ b/services/order-executor/src/order_executor/executor.py @@ -1,18 +1,18 @@ """Order execution logic.""" -import structlog -from datetime import datetime, timezone +from datetime import UTC, datetime from decimal import Decimal -from typing import Any, Optional +from typing import Any + +import structlog +from order_executor.risk_manager import RiskManager from shared.broker import RedisBroker from shared.db import Database from shared.events import OrderEvent from shared.models import Order, OrderStatus, OrderType, Signal from shared.notifier import TelegramNotifier -from order_executor.risk_manager import RiskManager - logger = structlog.get_logger() @@ -35,7 +35,7 @@ class OrderExecutor: self.notifier = notifier self.dry_run = dry_run - async def execute(self, signal: Signal) -> Optional[Order]: + async def execute(self, signal: Signal) -> Order | None: """Run risk checks and place an order for the given signal.""" # Fetch buying power from Alpaca balance = await self.exchange.get_buying_power() @@ -71,7 +71,7 @@ class OrderExecutor: if self.dry_run: order.status = OrderStatus.FILLED - order.filled_at = datetime.now(timezone.utc) + order.filled_at = datetime.now(UTC) logger.info( "order_filled_dry_run", side=str(order.side), @@ -87,7 +87,7 @@ class OrderExecutor: type="market", ) order.status = OrderStatus.FILLED - order.filled_at = datetime.now(timezone.utc) + order.filled_at = datetime.now(UTC) logger.info( "order_filled", side=str(order.side), diff --git a/services/order-executor/src/order_executor/main.py b/services/order-executor/src/order_executor/main.py index 51ab286..99f88e1 100644 --- a/services/order-executor/src/order_executor/main.py +++ b/services/order-executor/src/order_executor/main.py @@ -3,6 +3,11 @@ import asyncio from decimal import Decimal +import aiohttp + +from order_executor.config import ExecutorConfig +from order_executor.executor import OrderExecutor +from order_executor.risk_manager import RiskManager from shared.alpaca import AlpacaClient from shared.broker import RedisBroker from shared.db import Database @@ -11,10 +16,7 @@ from shared.healthcheck import HealthCheckServer from shared.logging import setup_logging from shared.metrics import ServiceMetrics from shared.notifier import TelegramNotifier - -from order_executor.config import ExecutorConfig -from order_executor.executor import OrderExecutor -from order_executor.risk_manager import RiskManager +from shared.shutdown import GracefulShutdown # Health check port: base + 2 HEALTH_PORT_OFFSET = 2 @@ -26,18 +28,18 @@ async def run() -> None: metrics = ServiceMetrics("order_executor") notifier = TelegramNotifier( - bot_token=config.telegram_bot_token, + bot_token=config.telegram_bot_token.get_secret_value(), chat_id=config.telegram_chat_id, ) - db = Database(config.database_url) + db = Database(config.database_url.get_secret_value()) await db.connect() - broker = RedisBroker(config.redis_url) + broker = RedisBroker(config.redis_url.get_secret_value()) alpaca = AlpacaClient( - api_key=config.alpaca_api_key, - api_secret=config.alpaca_api_secret, + api_key=config.alpaca_api_key.get_secret_value(), + api_secret=config.alpaca_api_secret.get_secret_value(), paper=config.alpaca_paper, ) @@ -83,6 +85,9 @@ async def run() -> None: await broker.ensure_group(stream, GROUP) + shutdown = GracefulShutdown() + shutdown.install_handlers() + log.info("started", stream=stream, dry_run=config.dry_run) try: @@ -94,10 +99,15 @@ async def run() -> None: if event.type == EventType.SIGNAL: await executor.execute(event.data) await broker.ack(stream, GROUP, msg_id) + except (aiohttp.ClientError, ConnectionError, TimeoutError) as exc: + log.warning("pending_network_error", error=str(exc), msg_id=msg_id) + except (ValueError, KeyError, TypeError) as exc: + log.warning("pending_parse_error", error=str(exc), msg_id=msg_id) + await broker.ack(stream, GROUP, msg_id) except Exception as exc: - log.error("pending_failed", error=str(exc), msg_id=msg_id) + log.error("pending_failed", error=str(exc), msg_id=msg_id, exc_info=True) - while True: + while not shutdown.is_shutting_down: messages = await broker.read_group(stream, GROUP, CONSUMER, count=10, block=5000) for msg_id, msg in messages: try: @@ -110,8 +120,19 @@ async def run() -> None: service="order-executor", event_type="signal" ).inc() await broker.ack(stream, GROUP, msg_id) + except (aiohttp.ClientError, ConnectionError, TimeoutError) as exc: + log.warning("process_network_error", error=str(exc)) + metrics.errors_total.labels( + service="order-executor", error_type="network" + ).inc() + except (ValueError, KeyError, TypeError) as exc: + log.warning("process_parse_error", error=str(exc)) + await broker.ack(stream, GROUP, msg_id) + metrics.errors_total.labels( + service="order-executor", error_type="validation" + ).inc() except Exception as exc: - log.error("process_failed", error=str(exc)) + log.error("process_failed", error=str(exc), exc_info=True) metrics.errors_total.labels( service="order-executor", error_type="processing" ).inc() diff --git a/services/order-executor/src/order_executor/risk_manager.py b/services/order-executor/src/order_executor/risk_manager.py index 5a05746..811a862 100644 --- a/services/order-executor/src/order_executor/risk_manager.py +++ b/services/order-executor/src/order_executor/risk_manager.py @@ -1,12 +1,12 @@ """Risk management for order execution.""" +import math +from collections import deque from dataclasses import dataclass -from datetime import datetime, timezone, timedelta +from datetime import UTC, datetime, timedelta from decimal import Decimal -from collections import deque -import math -from shared.models import Signal, OrderSide, Position +from shared.models import OrderSide, Position, Signal @dataclass @@ -123,15 +123,13 @@ class RiskManager: else: self._consecutive_losses += 1 if self._consecutive_losses >= self._max_consecutive_losses: - self._paused_until = datetime.now(timezone.utc) + timedelta( - minutes=self._loss_pause_minutes - ) + self._paused_until = datetime.now(UTC) + timedelta(minutes=self._loss_pause_minutes) def is_paused(self) -> bool: """Check if trading is paused due to consecutive losses.""" if self._paused_until is None: return False - if datetime.now(timezone.utc) >= self._paused_until: + if datetime.now(UTC) >= self._paused_until: self._paused_until = None self._consecutive_losses = 0 return False @@ -233,9 +231,9 @@ class RiskManager: mean_a = sum(returns_a) / len(returns_a) mean_b = sum(returns_b) / len(returns_b) - cov = sum((a - mean_a) * (b - mean_b) for a, b in zip(returns_a, returns_b)) / len( - returns_a - ) + cov = sum( + (a - mean_a) * (b - mean_b) for a, b in zip(returns_a, returns_b, strict=True) + ) / len(returns_a) std_a = math.sqrt(sum((a - mean_a) ** 2 for a in returns_a) / len(returns_a)) std_b = math.sqrt(sum((b - mean_b) ** 2 for b in returns_b) / len(returns_b)) @@ -280,7 +278,11 @@ class RiskManager: min_len = min(len(r) for r in all_returns) portfolio_returns = [] for i in range(min_len): - pr = sum(w * r[-(min_len - i)] for w, r in zip(weights, all_returns) if len(r) > i) + pr = sum( + w * r[-(min_len - i)] + for w, r in zip(weights, all_returns, strict=False) + if len(r) > i + ) portfolio_returns.append(pr) if not portfolio_returns: |
