summaryrefslogtreecommitdiff
path: root/services/order-executor/src
diff options
context:
space:
mode:
Diffstat (limited to 'services/order-executor/src')
-rw-r--r--services/order-executor/src/order_executor/executor.py14
-rw-r--r--services/order-executor/src/order_executor/main.py80
-rw-r--r--services/order-executor/src/order_executor/risk_manager.py251
3 files changed, 293 insertions, 52 deletions
diff --git a/services/order-executor/src/order_executor/executor.py b/services/order-executor/src/order_executor/executor.py
index 80f441d..a71e762 100644
--- a/services/order-executor/src/order_executor/executor.py
+++ b/services/order-executor/src/order_executor/executor.py
@@ -37,12 +37,8 @@ class OrderExecutor:
async def execute(self, signal: Signal) -> Optional[Order]:
"""Run risk checks and place an order for the given signal."""
- # Fetch current balance from exchange
- balance_data = await self.exchange.fetch_balance()
- # Use USDT (or quote currency) free balance as available capital
- free_balances = balance_data.get("free", {})
- quote_currency = signal.symbol.split("/")[-1] if "/" in signal.symbol else "USDT"
- balance = Decimal(str(free_balances.get(quote_currency, 0)))
+ # Fetch buying power from Alpaca
+ balance = await self.exchange.get_buying_power()
# Fetch current positions
positions = {}
@@ -84,11 +80,11 @@ class OrderExecutor:
)
else:
try:
- await self.exchange.create_order(
+ await self.exchange.submit_order(
symbol=signal.symbol,
- type="market",
+ qty=float(signal.quantity),
side=signal.side.value.lower(),
- amount=float(signal.quantity),
+ type="market",
)
order.status = OrderStatus.FILLED
order.filled_at = datetime.now(timezone.utc)
diff --git a/services/order-executor/src/order_executor/main.py b/services/order-executor/src/order_executor/main.py
index 3fe4c12..51ab286 100644
--- a/services/order-executor/src/order_executor/main.py
+++ b/services/order-executor/src/order_executor/main.py
@@ -3,11 +3,11 @@
import asyncio
from decimal import Decimal
+from shared.alpaca import AlpacaClient
from shared.broker import RedisBroker
from shared.db import Database
from shared.events import Event, EventType
from shared.healthcheck import HealthCheckServer
-from shared.exchange import create_exchange
from shared.logging import setup_logging
from shared.metrics import ServiceMetrics
from shared.notifier import TelegramNotifier
@@ -16,9 +16,7 @@ from order_executor.config import ExecutorConfig
from order_executor.executor import OrderExecutor
from order_executor.risk_manager import RiskManager
-# Health check port: base (HEALTH_PORT, default 8080) + offset
-# data-collector: +0 (8080), strategy-engine: +1 (8081)
-# order-executor: +2 (8082), portfolio-manager: +3 (8083)
+# Health check port: base + 2
HEALTH_PORT_OFFSET = 2
@@ -26,21 +24,21 @@ async def run() -> None:
config = ExecutorConfig()
log = setup_logging("order-executor", config.log_level, config.log_format)
metrics = ServiceMetrics("order_executor")
+
notifier = TelegramNotifier(
- bot_token=config.telegram_bot_token, chat_id=config.telegram_chat_id
+ bot_token=config.telegram_bot_token,
+ chat_id=config.telegram_chat_id,
)
db = Database(config.database_url)
await db.connect()
- await db.init_tables()
broker = RedisBroker(config.redis_url)
- exchange = create_exchange(
- exchange_id=config.exchange_id,
- api_key=config.binance_api_key,
- api_secret=config.binance_api_secret,
- sandbox=config.exchange_sandbox,
+ alpaca = AlpacaClient(
+ api_key=config.alpaca_api_key,
+ api_secret=config.alpaca_api_secret,
+ paper=config.alpaca_paper,
)
risk_manager = RiskManager(
@@ -51,10 +49,19 @@ async def run() -> None:
max_open_positions=config.risk_max_open_positions,
volatility_lookback=config.risk_volatility_lookback,
volatility_scale=config.risk_volatility_scale,
+ max_portfolio_exposure=config.risk_max_portfolio_exposure,
+ max_correlated_exposure=config.risk_max_correlated_exposure,
+ correlation_threshold=config.risk_correlation_threshold,
+ var_confidence=config.risk_var_confidence,
+ var_limit_pct=config.risk_var_limit_pct,
+ drawdown_reduction_threshold=config.risk_drawdown_reduction_threshold,
+ drawdown_halt_threshold=config.risk_drawdown_halt_threshold,
+ max_consecutive_losses=config.risk_max_consecutive_losses,
+ loss_pause_minutes=config.risk_loss_pause_minutes,
)
executor = OrderExecutor(
- exchange=exchange,
+ exchange=alpaca,
risk_manager=risk_manager,
broker=broker,
db=db,
@@ -62,41 +69,34 @@ async def run() -> None:
dry_run=config.dry_run,
)
- GROUP = "order-executor"
- CONSUMER = "executor-1"
- stream = "signals"
-
health = HealthCheckServer(
"order-executor",
port=config.health_port + HEALTH_PORT_OFFSET,
auth_token=config.metrics_auth_token,
)
- health.register_check("redis", broker.ping)
await health.start()
metrics.service_up.labels(service="order-executor").set(1)
- log.info("service_started", stream=stream, dry_run=config.dry_run)
+ GROUP = "order-executor"
+ CONSUMER = "executor-1"
+ stream = "signals"
await broker.ensure_group(stream, GROUP)
- # Process pending messages first (from previous crash)
- pending = await broker.read_pending(stream, GROUP, CONSUMER)
- for msg_id, msg in pending:
- try:
- event = Event.from_dict(msg)
- if event.type == EventType.SIGNAL:
- signal = event.data
- log.info(
- "processing_pending_signal", signal_id=str(signal.id), symbol=signal.symbol
- )
- await executor.execute(signal)
- metrics.events_processed.labels(service="order-executor", event_type="signal").inc()
- await broker.ack(stream, GROUP, msg_id)
- except Exception as exc:
- log.error("pending_process_failed", error=str(exc), msg_id=msg_id)
- metrics.errors_total.labels(service="order-executor", error_type="processing").inc()
+ log.info("started", stream=stream, dry_run=config.dry_run)
try:
+ # Process pending messages first
+ pending = await broker.read_pending(stream, GROUP, CONSUMER)
+ for msg_id, msg in pending:
+ try:
+ event = Event.from_dict(msg)
+ if event.type == EventType.SIGNAL:
+ await executor.execute(event.data)
+ await broker.ack(stream, GROUP, msg_id)
+ except Exception as exc:
+ log.error("pending_failed", error=str(exc), msg_id=msg_id)
+
while True:
messages = await broker.read_group(stream, GROUP, CONSUMER, count=10, block=5000)
for msg_id, msg in messages:
@@ -104,29 +104,23 @@ async def run() -> None:
event = Event.from_dict(msg)
if event.type == EventType.SIGNAL:
signal = event.data
- log.info(
- "processing_signal", signal_id=str(signal.id), symbol=signal.symbol
- )
+ log.info("processing_signal", signal_id=signal.id, symbol=signal.symbol)
await executor.execute(signal)
metrics.events_processed.labels(
service="order-executor", event_type="signal"
).inc()
await broker.ack(stream, GROUP, msg_id)
except Exception as exc:
- log.error("message_processing_failed", error=str(exc), msg_id=msg_id)
+ log.error("process_failed", error=str(exc))
metrics.errors_total.labels(
service="order-executor", error_type="processing"
).inc()
- except Exception as exc:
- log.error("fatal_error", error=str(exc))
- await notifier.send_error(str(exc), "order-executor")
- raise
finally:
metrics.service_up.labels(service="order-executor").set(0)
await notifier.close()
await broker.close()
await db.close()
- await exchange.close()
+ await alpaca.close()
def main() -> None:
diff --git a/services/order-executor/src/order_executor/risk_manager.py b/services/order-executor/src/order_executor/risk_manager.py
index c3578a7..5a05746 100644
--- a/services/order-executor/src/order_executor/risk_manager.py
+++ b/services/order-executor/src/order_executor/risk_manager.py
@@ -1,6 +1,7 @@
"""Risk management for order execution."""
from dataclasses import dataclass
+from datetime import datetime, timezone, timedelta
from decimal import Decimal
from collections import deque
import math
@@ -46,6 +47,15 @@ class RiskManager:
max_open_positions: int = 10,
volatility_lookback: int = 20,
volatility_scale: bool = False,
+ max_portfolio_exposure: float = 0.8,
+ max_correlated_exposure: float = 0.5,
+ correlation_threshold: float = 0.7,
+ var_confidence: float = 0.95,
+ var_limit_pct: float = 5.0,
+ drawdown_reduction_threshold: float = 0.1, # Start reducing at 10% drawdown
+ drawdown_halt_threshold: float = 0.2, # Halt trading at 20% drawdown
+ max_consecutive_losses: int = 5, # Pause after 5 consecutive losses
+ loss_pause_minutes: int = 60, # Pause for 60 minutes after consecutive losses
) -> None:
self.max_position_size = max_position_size
self.stop_loss_pct = stop_loss_pct
@@ -57,6 +67,75 @@ class RiskManager:
self._trailing_stops: dict[str, TrailingStop] = {}
self._price_history: dict[str, deque[float]] = {}
+ self._return_history: dict[str, list[float]] = {}
+ self._max_portfolio_exposure = Decimal(str(max_portfolio_exposure))
+ self._max_correlated_exposure = Decimal(str(max_correlated_exposure))
+ self._correlation_threshold = correlation_threshold
+ self._var_confidence = var_confidence
+ self._var_limit_pct = Decimal(str(var_limit_pct))
+
+ self._drawdown_reduction_threshold = drawdown_reduction_threshold
+ self._drawdown_halt_threshold = drawdown_halt_threshold
+ self._max_consecutive_losses = max_consecutive_losses
+ self._loss_pause_minutes = loss_pause_minutes
+
+ self._peak_balance: Decimal = Decimal("0")
+ self._consecutive_losses: int = 0
+ self._paused_until: datetime | None = None
+
+ def update_balance(self, current_balance: Decimal) -> None:
+ """Track peak balance for drawdown calculation."""
+ if current_balance > self._peak_balance:
+ self._peak_balance = current_balance
+
+ def get_current_drawdown(self, current_balance: Decimal) -> float:
+ """Calculate current drawdown from peak as a fraction (0.0 to 1.0)."""
+ if self._peak_balance <= 0:
+ return 0.0
+ dd = float((self._peak_balance - current_balance) / self._peak_balance)
+ return max(dd, 0.0)
+
+ def get_position_scale(self, current_balance: Decimal) -> float:
+ """Get position size multiplier based on current drawdown.
+
+ Returns 1.0 (full size) when no drawdown.
+ Linearly reduces to 0.25 between reduction threshold and halt threshold.
+ Returns 0.0 at or beyond halt threshold.
+ """
+ dd = self.get_current_drawdown(current_balance)
+
+ if dd >= self._drawdown_halt_threshold:
+ return 0.0
+
+ if dd >= self._drawdown_reduction_threshold:
+ # Linear interpolation from 1.0 to 0.25
+ range_pct = (dd - self._drawdown_reduction_threshold) / (
+ self._drawdown_halt_threshold - self._drawdown_reduction_threshold
+ )
+ return max(1.0 - 0.75 * range_pct, 0.25)
+
+ return 1.0
+
+ def record_trade_result(self, is_win: bool) -> None:
+ """Record a trade result for consecutive loss tracking."""
+ if is_win:
+ self._consecutive_losses = 0
+ else:
+ self._consecutive_losses += 1
+ if self._consecutive_losses >= self._max_consecutive_losses:
+ self._paused_until = datetime.now(timezone.utc) + timedelta(
+ minutes=self._loss_pause_minutes
+ )
+
+ def is_paused(self) -> bool:
+ """Check if trading is paused due to consecutive losses."""
+ if self._paused_until is None:
+ return False
+ if datetime.now(timezone.utc) >= self._paused_until:
+ self._paused_until = None
+ self._consecutive_losses = 0
+ return False
+ return True
def update_price(self, symbol: str, price: Decimal) -> None:
"""Update price tracking for trailing stops and volatility."""
@@ -120,6 +199,145 @@ class RiskManager:
scale = min(target_vol / vol, 2.0) # Cap at 2x
return base_size * Decimal(str(scale))
+ def calculate_correlation(self, symbol_a: str, symbol_b: str) -> float | None:
+ """Calculate Pearson correlation between two symbols' returns."""
+ hist_a = self._price_history.get(symbol_a)
+ hist_b = self._price_history.get(symbol_b)
+ if not hist_a or not hist_b or len(hist_a) < 5 or len(hist_b) < 5:
+ return None
+
+ prices_a = list(hist_a)
+ prices_b = list(hist_b)
+ min_len = min(len(prices_a), len(prices_b))
+ prices_a = prices_a[-min_len:]
+ prices_b = prices_b[-min_len:]
+
+ returns_a = [
+ (prices_a[i] - prices_a[i - 1]) / prices_a[i - 1]
+ for i in range(1, len(prices_a))
+ if prices_a[i - 1] != 0
+ ]
+ returns_b = [
+ (prices_b[i] - prices_b[i - 1]) / prices_b[i - 1]
+ for i in range(1, len(prices_b))
+ if prices_b[i - 1] != 0
+ ]
+
+ if len(returns_a) < 3 or len(returns_b) < 3:
+ return None
+
+ min_len = min(len(returns_a), len(returns_b))
+ returns_a = returns_a[-min_len:]
+ returns_b = returns_b[-min_len:]
+
+ mean_a = sum(returns_a) / len(returns_a)
+ mean_b = sum(returns_b) / len(returns_b)
+
+ cov = sum((a - mean_a) * (b - mean_b) for a, b in zip(returns_a, returns_b)) / len(
+ returns_a
+ )
+ std_a = math.sqrt(sum((a - mean_a) ** 2 for a in returns_a) / len(returns_a))
+ std_b = math.sqrt(sum((b - mean_b) ** 2 for b in returns_b) / len(returns_b))
+
+ if std_a == 0 or std_b == 0:
+ return None
+
+ return cov / (std_a * std_b)
+
+ def calculate_portfolio_var(self, positions: dict[str, Position], balance: Decimal) -> float:
+ """Calculate portfolio VaR using historical simulation.
+
+ Returns VaR as a percentage of balance (e.g., 3.5 for 3.5%).
+ """
+ if not positions or balance <= 0:
+ return 0.0
+
+ # Collect returns for all positioned symbols
+ all_returns: list[list[float]] = []
+ weights: list[float] = []
+
+ for symbol, pos in positions.items():
+ if pos.quantity <= 0:
+ continue
+ hist = self._price_history.get(symbol)
+ if not hist or len(hist) < 5:
+ continue
+ prices = list(hist)
+ returns = [
+ (prices[i] - prices[i - 1]) / prices[i - 1]
+ for i in range(1, len(prices))
+ if prices[i - 1] != 0
+ ]
+ if returns:
+ all_returns.append(returns)
+ weight = float(pos.quantity * pos.current_price / balance)
+ weights.append(weight)
+
+ if not all_returns:
+ return 0.0
+
+ # Portfolio returns (weighted sum)
+ min_len = min(len(r) for r in all_returns)
+ portfolio_returns = []
+ for i in range(min_len):
+ pr = sum(w * r[-(min_len - i)] for w, r in zip(weights, all_returns) if len(r) > i)
+ portfolio_returns.append(pr)
+
+ if not portfolio_returns:
+ return 0.0
+
+ # Historical VaR: sort returns, take the (1-confidence) percentile
+ sorted_returns = sorted(portfolio_returns)
+ index = int((1 - self._var_confidence) * len(sorted_returns))
+ index = max(0, min(index, len(sorted_returns) - 1))
+ var_return = sorted_returns[index]
+
+ return abs(var_return) * 100 # As percentage
+
+ def check_portfolio_exposure(
+ self, positions: dict[str, Position], balance: Decimal
+ ) -> RiskCheckResult:
+ """Check total portfolio exposure."""
+ if balance <= 0:
+ return RiskCheckResult(allowed=True, reason="OK")
+
+ total_exposure = sum(
+ pos.quantity * pos.current_price for pos in positions.values() if pos.quantity > 0
+ )
+
+ exposure_ratio = total_exposure / balance
+ if exposure_ratio > self._max_portfolio_exposure:
+ return RiskCheckResult(
+ allowed=False,
+ reason=f"Portfolio exposure {float(exposure_ratio):.1%} exceeds max {float(self._max_portfolio_exposure):.1%}",
+ )
+
+ return RiskCheckResult(allowed=True, reason="OK")
+
+ def check_correlation_risk(
+ self, signal: Signal, positions: dict[str, Position], balance: Decimal
+ ) -> RiskCheckResult:
+ """Check if adding this position creates too much correlated exposure."""
+ if signal.side != OrderSide.BUY or balance <= 0:
+ return RiskCheckResult(allowed=True, reason="OK")
+
+ correlated_value = signal.price * signal.quantity
+
+ for symbol, pos in positions.items():
+ if pos.quantity <= 0 or symbol == signal.symbol:
+ continue
+ corr = self.calculate_correlation(signal.symbol, symbol)
+ if corr is not None and abs(corr) >= self._correlation_threshold:
+ correlated_value += pos.quantity * pos.current_price
+
+ if correlated_value / balance > self._max_correlated_exposure:
+ return RiskCheckResult(
+ allowed=False,
+ reason=f"Correlated exposure would exceed {float(self._max_correlated_exposure):.1%}",
+ )
+
+ return RiskCheckResult(allowed=True, reason="OK")
+
def check(
self,
signal: Signal,
@@ -128,6 +346,21 @@ class RiskManager:
daily_pnl: Decimal,
) -> RiskCheckResult:
"""Run risk checks against a signal and current portfolio state."""
+ # Check if paused due to consecutive losses
+ if self.is_paused():
+ return RiskCheckResult(
+ allowed=False,
+ reason=f"Trading paused until {self._paused_until.isoformat()} after {self._max_consecutive_losses} consecutive losses",
+ )
+
+ # Check drawdown halt
+ dd = self.get_current_drawdown(balance)
+ if dd >= self._drawdown_halt_threshold:
+ return RiskCheckResult(
+ allowed=False,
+ reason=f"Trading halted: drawdown {dd:.1%} exceeds {self._drawdown_halt_threshold:.1%}",
+ )
+
# Check daily loss limit
if balance > 0 and (daily_pnl / balance) * 100 < -self.daily_loss_limit_pct:
return RiskCheckResult(allowed=False, reason="Daily loss limit exceeded")
@@ -165,4 +398,22 @@ class RiskManager:
):
return RiskCheckResult(allowed=False, reason="Position size exceeded")
+ # Portfolio-level checks
+ exposure_check = self.check_portfolio_exposure(positions, balance)
+ if not exposure_check.allowed:
+ return exposure_check
+
+ corr_check = self.check_correlation_risk(signal, positions, balance)
+ if not corr_check.allowed:
+ return corr_check
+
+ # VaR check
+ if positions:
+ var = self.calculate_portfolio_var(positions, balance)
+ if var > float(self._var_limit_pct):
+ return RiskCheckResult(
+ allowed=False,
+ reason=f"Portfolio VaR {var:.1f}% exceeds limit {float(self._var_limit_pct):.1f}%",
+ )
+
return RiskCheckResult(allowed=True, reason="OK")