summaryrefslogtreecommitdiff
path: root/services/order-executor/src
diff options
context:
space:
mode:
Diffstat (limited to 'services/order-executor/src')
-rw-r--r--services/order-executor/src/order_executor/executor.py16
-rw-r--r--services/order-executor/src/order_executor/main.py45
-rw-r--r--services/order-executor/src/order_executor/risk_manager.py26
3 files changed, 55 insertions, 32 deletions
diff --git a/services/order-executor/src/order_executor/executor.py b/services/order-executor/src/order_executor/executor.py
index a71e762..fd502cd 100644
--- a/services/order-executor/src/order_executor/executor.py
+++ b/services/order-executor/src/order_executor/executor.py
@@ -1,18 +1,18 @@
"""Order execution logic."""
-import structlog
-from datetime import datetime, timezone
+from datetime import UTC, datetime
from decimal import Decimal
-from typing import Any, Optional
+from typing import Any
+
+import structlog
+from order_executor.risk_manager import RiskManager
from shared.broker import RedisBroker
from shared.db import Database
from shared.events import OrderEvent
from shared.models import Order, OrderStatus, OrderType, Signal
from shared.notifier import TelegramNotifier
-from order_executor.risk_manager import RiskManager
-
logger = structlog.get_logger()
@@ -35,7 +35,7 @@ class OrderExecutor:
self.notifier = notifier
self.dry_run = dry_run
- async def execute(self, signal: Signal) -> Optional[Order]:
+ async def execute(self, signal: Signal) -> Order | None:
"""Run risk checks and place an order for the given signal."""
# Fetch buying power from Alpaca
balance = await self.exchange.get_buying_power()
@@ -71,7 +71,7 @@ class OrderExecutor:
if self.dry_run:
order.status = OrderStatus.FILLED
- order.filled_at = datetime.now(timezone.utc)
+ order.filled_at = datetime.now(UTC)
logger.info(
"order_filled_dry_run",
side=str(order.side),
@@ -87,7 +87,7 @@ class OrderExecutor:
type="market",
)
order.status = OrderStatus.FILLED
- order.filled_at = datetime.now(timezone.utc)
+ order.filled_at = datetime.now(UTC)
logger.info(
"order_filled",
side=str(order.side),
diff --git a/services/order-executor/src/order_executor/main.py b/services/order-executor/src/order_executor/main.py
index 51ab286..99f88e1 100644
--- a/services/order-executor/src/order_executor/main.py
+++ b/services/order-executor/src/order_executor/main.py
@@ -3,6 +3,11 @@
import asyncio
from decimal import Decimal
+import aiohttp
+
+from order_executor.config import ExecutorConfig
+from order_executor.executor import OrderExecutor
+from order_executor.risk_manager import RiskManager
from shared.alpaca import AlpacaClient
from shared.broker import RedisBroker
from shared.db import Database
@@ -11,10 +16,7 @@ from shared.healthcheck import HealthCheckServer
from shared.logging import setup_logging
from shared.metrics import ServiceMetrics
from shared.notifier import TelegramNotifier
-
-from order_executor.config import ExecutorConfig
-from order_executor.executor import OrderExecutor
-from order_executor.risk_manager import RiskManager
+from shared.shutdown import GracefulShutdown
# Health check port: base + 2
HEALTH_PORT_OFFSET = 2
@@ -26,18 +28,18 @@ async def run() -> None:
metrics = ServiceMetrics("order_executor")
notifier = TelegramNotifier(
- bot_token=config.telegram_bot_token,
+ bot_token=config.telegram_bot_token.get_secret_value(),
chat_id=config.telegram_chat_id,
)
- db = Database(config.database_url)
+ db = Database(config.database_url.get_secret_value())
await db.connect()
- broker = RedisBroker(config.redis_url)
+ broker = RedisBroker(config.redis_url.get_secret_value())
alpaca = AlpacaClient(
- api_key=config.alpaca_api_key,
- api_secret=config.alpaca_api_secret,
+ api_key=config.alpaca_api_key.get_secret_value(),
+ api_secret=config.alpaca_api_secret.get_secret_value(),
paper=config.alpaca_paper,
)
@@ -83,6 +85,9 @@ async def run() -> None:
await broker.ensure_group(stream, GROUP)
+ shutdown = GracefulShutdown()
+ shutdown.install_handlers()
+
log.info("started", stream=stream, dry_run=config.dry_run)
try:
@@ -94,10 +99,15 @@ async def run() -> None:
if event.type == EventType.SIGNAL:
await executor.execute(event.data)
await broker.ack(stream, GROUP, msg_id)
+ except (aiohttp.ClientError, ConnectionError, TimeoutError) as exc:
+ log.warning("pending_network_error", error=str(exc), msg_id=msg_id)
+ except (ValueError, KeyError, TypeError) as exc:
+ log.warning("pending_parse_error", error=str(exc), msg_id=msg_id)
+ await broker.ack(stream, GROUP, msg_id)
except Exception as exc:
- log.error("pending_failed", error=str(exc), msg_id=msg_id)
+ log.error("pending_failed", error=str(exc), msg_id=msg_id, exc_info=True)
- while True:
+ while not shutdown.is_shutting_down:
messages = await broker.read_group(stream, GROUP, CONSUMER, count=10, block=5000)
for msg_id, msg in messages:
try:
@@ -110,8 +120,19 @@ async def run() -> None:
service="order-executor", event_type="signal"
).inc()
await broker.ack(stream, GROUP, msg_id)
+ except (aiohttp.ClientError, ConnectionError, TimeoutError) as exc:
+ log.warning("process_network_error", error=str(exc))
+ metrics.errors_total.labels(
+ service="order-executor", error_type="network"
+ ).inc()
+ except (ValueError, KeyError, TypeError) as exc:
+ log.warning("process_parse_error", error=str(exc))
+ await broker.ack(stream, GROUP, msg_id)
+ metrics.errors_total.labels(
+ service="order-executor", error_type="validation"
+ ).inc()
except Exception as exc:
- log.error("process_failed", error=str(exc))
+ log.error("process_failed", error=str(exc), exc_info=True)
metrics.errors_total.labels(
service="order-executor", error_type="processing"
).inc()
diff --git a/services/order-executor/src/order_executor/risk_manager.py b/services/order-executor/src/order_executor/risk_manager.py
index 5a05746..811a862 100644
--- a/services/order-executor/src/order_executor/risk_manager.py
+++ b/services/order-executor/src/order_executor/risk_manager.py
@@ -1,12 +1,12 @@
"""Risk management for order execution."""
+import math
+from collections import deque
from dataclasses import dataclass
-from datetime import datetime, timezone, timedelta
+from datetime import UTC, datetime, timedelta
from decimal import Decimal
-from collections import deque
-import math
-from shared.models import Signal, OrderSide, Position
+from shared.models import OrderSide, Position, Signal
@dataclass
@@ -123,15 +123,13 @@ class RiskManager:
else:
self._consecutive_losses += 1
if self._consecutive_losses >= self._max_consecutive_losses:
- self._paused_until = datetime.now(timezone.utc) + timedelta(
- minutes=self._loss_pause_minutes
- )
+ self._paused_until = datetime.now(UTC) + timedelta(minutes=self._loss_pause_minutes)
def is_paused(self) -> bool:
"""Check if trading is paused due to consecutive losses."""
if self._paused_until is None:
return False
- if datetime.now(timezone.utc) >= self._paused_until:
+ if datetime.now(UTC) >= self._paused_until:
self._paused_until = None
self._consecutive_losses = 0
return False
@@ -233,9 +231,9 @@ class RiskManager:
mean_a = sum(returns_a) / len(returns_a)
mean_b = sum(returns_b) / len(returns_b)
- cov = sum((a - mean_a) * (b - mean_b) for a, b in zip(returns_a, returns_b)) / len(
- returns_a
- )
+ cov = sum(
+ (a - mean_a) * (b - mean_b) for a, b in zip(returns_a, returns_b, strict=True)
+ ) / len(returns_a)
std_a = math.sqrt(sum((a - mean_a) ** 2 for a in returns_a) / len(returns_a))
std_b = math.sqrt(sum((b - mean_b) ** 2 for b in returns_b) / len(returns_b))
@@ -280,7 +278,11 @@ class RiskManager:
min_len = min(len(r) for r in all_returns)
portfolio_returns = []
for i in range(min_len):
- pr = sum(w * r[-(min_len - i)] for w, r in zip(weights, all_returns) if len(r) > i)
+ pr = sum(
+ w * r[-(min_len - i)]
+ for w, r in zip(weights, all_returns, strict=False)
+ if len(r) > i
+ )
portfolio_returns.append(pr)
if not portfolio_returns: