summaryrefslogtreecommitdiff
path: root/services/order-executor
diff options
context:
space:
mode:
Diffstat (limited to 'services/order-executor')
-rw-r--r--services/order-executor/src/order_executor/executor.py16
-rw-r--r--services/order-executor/src/order_executor/main.py21
-rw-r--r--services/order-executor/src/order_executor/risk_manager.py26
-rw-r--r--services/order-executor/tests/test_executor.py4
-rw-r--r--services/order-executor/tests/test_risk_manager.py2
5 files changed, 30 insertions, 39 deletions
diff --git a/services/order-executor/src/order_executor/executor.py b/services/order-executor/src/order_executor/executor.py
index a71e762..fd502cd 100644
--- a/services/order-executor/src/order_executor/executor.py
+++ b/services/order-executor/src/order_executor/executor.py
@@ -1,18 +1,18 @@
"""Order execution logic."""
-import structlog
-from datetime import datetime, timezone
+from datetime import UTC, datetime
from decimal import Decimal
-from typing import Any, Optional
+from typing import Any
+
+import structlog
+from order_executor.risk_manager import RiskManager
from shared.broker import RedisBroker
from shared.db import Database
from shared.events import OrderEvent
from shared.models import Order, OrderStatus, OrderType, Signal
from shared.notifier import TelegramNotifier
-from order_executor.risk_manager import RiskManager
-
logger = structlog.get_logger()
@@ -35,7 +35,7 @@ class OrderExecutor:
self.notifier = notifier
self.dry_run = dry_run
- async def execute(self, signal: Signal) -> Optional[Order]:
+ async def execute(self, signal: Signal) -> Order | None:
"""Run risk checks and place an order for the given signal."""
# Fetch buying power from Alpaca
balance = await self.exchange.get_buying_power()
@@ -71,7 +71,7 @@ class OrderExecutor:
if self.dry_run:
order.status = OrderStatus.FILLED
- order.filled_at = datetime.now(timezone.utc)
+ order.filled_at = datetime.now(UTC)
logger.info(
"order_filled_dry_run",
side=str(order.side),
@@ -87,7 +87,7 @@ class OrderExecutor:
type="market",
)
order.status = OrderStatus.FILLED
- order.filled_at = datetime.now(timezone.utc)
+ order.filled_at = datetime.now(UTC)
logger.info(
"order_filled",
side=str(order.side),
diff --git a/services/order-executor/src/order_executor/main.py b/services/order-executor/src/order_executor/main.py
index d9e2373..99f88e1 100644
--- a/services/order-executor/src/order_executor/main.py
+++ b/services/order-executor/src/order_executor/main.py
@@ -5,6 +5,9 @@ from decimal import Decimal
import aiohttp
+from order_executor.config import ExecutorConfig
+from order_executor.executor import OrderExecutor
+from order_executor.risk_manager import RiskManager
from shared.alpaca import AlpacaClient
from shared.broker import RedisBroker
from shared.db import Database
@@ -15,10 +18,6 @@ from shared.metrics import ServiceMetrics
from shared.notifier import TelegramNotifier
from shared.shutdown import GracefulShutdown
-from order_executor.config import ExecutorConfig
-from order_executor.executor import OrderExecutor
-from order_executor.risk_manager import RiskManager
-
# Health check port: base + 2
HEALTH_PORT_OFFSET = 2
@@ -100,12 +99,7 @@ async def run() -> None:
if event.type == EventType.SIGNAL:
await executor.execute(event.data)
await broker.ack(stream, GROUP, msg_id)
- except (
- aiohttp.ClientError,
- ConnectionError,
- TimeoutError,
- asyncio.TimeoutError,
- ) as exc:
+ except (aiohttp.ClientError, ConnectionError, TimeoutError) as exc:
log.warning("pending_network_error", error=str(exc), msg_id=msg_id)
except (ValueError, KeyError, TypeError) as exc:
log.warning("pending_parse_error", error=str(exc), msg_id=msg_id)
@@ -126,12 +120,7 @@ async def run() -> None:
service="order-executor", event_type="signal"
).inc()
await broker.ack(stream, GROUP, msg_id)
- except (
- aiohttp.ClientError,
- ConnectionError,
- TimeoutError,
- asyncio.TimeoutError,
- ) as exc:
+ except (aiohttp.ClientError, ConnectionError, TimeoutError) as exc:
log.warning("process_network_error", error=str(exc))
metrics.errors_total.labels(
service="order-executor", error_type="network"
diff --git a/services/order-executor/src/order_executor/risk_manager.py b/services/order-executor/src/order_executor/risk_manager.py
index 5a05746..811a862 100644
--- a/services/order-executor/src/order_executor/risk_manager.py
+++ b/services/order-executor/src/order_executor/risk_manager.py
@@ -1,12 +1,12 @@
"""Risk management for order execution."""
+import math
+from collections import deque
from dataclasses import dataclass
-from datetime import datetime, timezone, timedelta
+from datetime import UTC, datetime, timedelta
from decimal import Decimal
-from collections import deque
-import math
-from shared.models import Signal, OrderSide, Position
+from shared.models import OrderSide, Position, Signal
@dataclass
@@ -123,15 +123,13 @@ class RiskManager:
else:
self._consecutive_losses += 1
if self._consecutive_losses >= self._max_consecutive_losses:
- self._paused_until = datetime.now(timezone.utc) + timedelta(
- minutes=self._loss_pause_minutes
- )
+ self._paused_until = datetime.now(UTC) + timedelta(minutes=self._loss_pause_minutes)
def is_paused(self) -> bool:
"""Check if trading is paused due to consecutive losses."""
if self._paused_until is None:
return False
- if datetime.now(timezone.utc) >= self._paused_until:
+ if datetime.now(UTC) >= self._paused_until:
self._paused_until = None
self._consecutive_losses = 0
return False
@@ -233,9 +231,9 @@ class RiskManager:
mean_a = sum(returns_a) / len(returns_a)
mean_b = sum(returns_b) / len(returns_b)
- cov = sum((a - mean_a) * (b - mean_b) for a, b in zip(returns_a, returns_b)) / len(
- returns_a
- )
+ cov = sum(
+ (a - mean_a) * (b - mean_b) for a, b in zip(returns_a, returns_b, strict=True)
+ ) / len(returns_a)
std_a = math.sqrt(sum((a - mean_a) ** 2 for a in returns_a) / len(returns_a))
std_b = math.sqrt(sum((b - mean_b) ** 2 for b in returns_b) / len(returns_b))
@@ -280,7 +278,11 @@ class RiskManager:
min_len = min(len(r) for r in all_returns)
portfolio_returns = []
for i in range(min_len):
- pr = sum(w * r[-(min_len - i)] for w, r in zip(weights, all_returns) if len(r) > i)
+ pr = sum(
+ w * r[-(min_len - i)]
+ for w, r in zip(weights, all_returns, strict=False)
+ if len(r) > i
+ )
portfolio_returns.append(pr)
if not portfolio_returns:
diff --git a/services/order-executor/tests/test_executor.py b/services/order-executor/tests/test_executor.py
index dd823d7..cda6b72 100644
--- a/services/order-executor/tests/test_executor.py
+++ b/services/order-executor/tests/test_executor.py
@@ -4,11 +4,11 @@ from decimal import Decimal
from unittest.mock import AsyncMock, MagicMock
import pytest
-
-from shared.models import OrderSide, OrderStatus, Signal
from order_executor.executor import OrderExecutor
from order_executor.risk_manager import RiskCheckResult, RiskManager
+from shared.models import OrderSide, OrderStatus, Signal
+
def make_signal(side: OrderSide = OrderSide.BUY, price: str = "100", quantity: str = "1") -> Signal:
return Signal(
diff --git a/services/order-executor/tests/test_risk_manager.py b/services/order-executor/tests/test_risk_manager.py
index 3d5175b..66e769c 100644
--- a/services/order-executor/tests/test_risk_manager.py
+++ b/services/order-executor/tests/test_risk_manager.py
@@ -2,9 +2,9 @@
from decimal import Decimal
+from order_executor.risk_manager import RiskManager
from shared.models import OrderSide, Position, Signal
-from order_executor.risk_manager import RiskManager
def make_signal(side: OrderSide, price: str, quantity: str, symbol: str = "AAPL") -> Signal: