"""Order Executor Service entry point."""

import asyncio
from contextlib import AsyncExitStack
from decimal import Decimal
from functools import partial

import aiohttp

from order_executor.config import ExecutorConfig
from order_executor.executor import OrderExecutor
from order_executor.risk_manager import RiskManager
from shared.alpaca import AlpacaClient
from shared.broker import RedisBroker
from shared.db import Database
from shared.events import Event, EventType
from shared.healthcheck import HealthCheckServer
from shared.logging import setup_logging
from shared.metrics import ServiceMetrics
from shared.notifier import TelegramNotifier
from shared.shutdown import GracefulShutdown

# Offset added to the configured base health port for this service (base + 2).
HEALTH_PORT_OFFSET = 2

# Name used for logging, the health server, the metric ``service`` label and
# the consumer group.
_SERVICE_NAME = "order-executor"
_STREAM = "signals"
_GROUP = _SERVICE_NAME
_CONSUMER = "executor-1"

# Failures treated as transient network problems. ``asyncio.TimeoutError`` is
# listed explicitly because before Python 3.11 it is not a subclass of the
# builtin ``TimeoutError``; on 3.11+ it is the same class, so this is a no-op.
_NETWORK_ERRORS = (
    aiohttp.ClientError,
    ConnectionError,
    TimeoutError,
    asyncio.TimeoutError,
)

# Failures treated as a malformed message.
_VALIDATION_ERRORS = (ValueError, KeyError, TypeError)


async def _handle_message(
    msg_id,
    msg,
    *,
    executor: OrderExecutor,
    broker: RedisBroker,
    metrics: ServiceMetrics,
    log,
    log_prefix: str,
) -> None:
    """Decode one stream message, execute it if it is a signal, and ack it.

    Args:
        msg_id: Identifier of the message as yielded by the broker; passed
            back unchanged to ``broker.ack``.
        msg: Raw message payload, decoded with ``Event.from_dict``.
        executor: Executor that receives the signal carried by the event.
        broker: Broker used to acknowledge the message.
        metrics: Metrics holder whose ``events_processed`` and
            ``errors_total`` counters are incremented.
        log: Logger returned by ``setup_logging``.
        log_prefix: Prefix for error log event names: ``"pending"`` yields
            ``pending_network_error`` / ``pending_parse_error`` /
            ``pending_failed``; ``"process"`` yields the ``process_*`` names.

    Acknowledgement policy:
        * success, or an event that is not a signal -> acked;
        * validation error -> acked, since retrying a malformed message
          cannot succeed;
        * network error or any other exception -> NOT acked, so the message
          stays unacknowledged (presumably picked up again by
          ``read_pending`` on the next start -- confirm against the broker).

    An exception raised by ``broker.ack`` inside the validation-error branch
    is not handled here and propagates to the caller.
    """
    try:
        event = Event.from_dict(msg)
        if event.type == EventType.SIGNAL:
            signal = event.data
            log.info("processing_signal", signal_id=signal.id, symbol=signal.symbol)
            await executor.execute(signal)
            metrics.events_processed.labels(
                service=_SERVICE_NAME, event_type="signal"
            ).inc()
        # NOTE(review): the ack is placed outside the ``if`` so that events of
        # other types are acknowledged too instead of staying pending forever;
        # confirm this matches the intended layout of the original code.
        await broker.ack(_STREAM, _GROUP, msg_id)
    except _NETWORK_ERRORS as exc:
        log.warning(f"{log_prefix}_network_error", error=str(exc), msg_id=msg_id)
        metrics.errors_total.labels(service=_SERVICE_NAME, error_type="network").inc()
    except _VALIDATION_ERRORS as exc:
        log.warning(f"{log_prefix}_parse_error", error=str(exc), msg_id=msg_id)
        await broker.ack(_STREAM, _GROUP, msg_id)
        metrics.errors_total.labels(
            service=_SERVICE_NAME, error_type="validation"
        ).inc()
    except Exception as exc:
        # Top-level boundary: one bad message must not stop the consumer.
        log.error(
            f"{log_prefix}_failed", error=str(exc), msg_id=msg_id, exc_info=True
        )
        metrics.errors_total.labels(
            service=_SERVICE_NAME, error_type="processing"
        ).inc()


async def run() -> None:
    """Wire up the service and consume the ``signals`` stream until shutdown.

    Builds the notifier, database, broker, Alpaca client, risk manager and
    executor from ``ExecutorConfig``, starts the health check server, replays
    messages returned by ``broker.read_pending`` and then polls
    ``broker.read_group`` until ``GracefulShutdown`` reports shutdown.

    Every closeable resource is registered on an ``AsyncExitStack`` as soon
    as it has been acquired. On exit (normal, error or cancellation) the
    stack runs the callbacks in reverse registration order and keeps going if
    one of them raises, so a failing ``close()`` cannot prevent the others
    from running, and a failure during start-up still closes what was
    already opened. An exception raised by a close callback still propagates
    once all callbacks have run.
    """
    config = ExecutorConfig()
    log = setup_logging(_SERVICE_NAME, config.log_level, config.log_format)
    metrics = ServiceMetrics("order_executor")

    async with AsyncExitStack() as cleanup:
        notifier = TelegramNotifier(
            bot_token=config.telegram_bot_token.get_secret_value(),
            chat_id=config.telegram_chat_id,
        )
        cleanup.push_async_callback(notifier.close)

        db = Database(config.database_url.get_secret_value())
        await db.connect()
        cleanup.push_async_callback(db.close)

        broker = RedisBroker(config.redis_url.get_secret_value())
        cleanup.push_async_callback(broker.close)

        alpaca = AlpacaClient(
            api_key=config.alpaca_api_key.get_secret_value(),
            api_secret=config.alpaca_api_secret.get_secret_value(),
            paper=config.alpaca_paper,
        )
        cleanup.push_async_callback(alpaca.close)

        risk_manager = RiskManager(
            max_position_size=Decimal(str(config.risk_max_position_size)),
            stop_loss_pct=Decimal(str(config.risk_stop_loss_pct)),
            daily_loss_limit_pct=Decimal(str(config.risk_daily_loss_limit_pct)),
            trailing_stop_pct=Decimal(str(config.risk_trailing_stop_pct)),
            max_open_positions=config.risk_max_open_positions,
            volatility_lookback=config.risk_volatility_lookback,
            volatility_scale=config.risk_volatility_scale,
            max_portfolio_exposure=config.risk_max_portfolio_exposure,
            max_correlated_exposure=config.risk_max_correlated_exposure,
            correlation_threshold=config.risk_correlation_threshold,
            var_confidence=config.risk_var_confidence,
            var_limit_pct=config.risk_var_limit_pct,
            drawdown_reduction_threshold=config.risk_drawdown_reduction_threshold,
            drawdown_halt_threshold=config.risk_drawdown_halt_threshold,
            max_consecutive_losses=config.risk_max_consecutive_losses,
            loss_pause_minutes=config.risk_loss_pause_minutes,
        )

        executor = OrderExecutor(
            exchange=alpaca,
            risk_manager=risk_manager,
            broker=broker,
            db=db,
            notifier=notifier,
            dry_run=config.dry_run,
        )

        # NOTE(review): the health server is started but never stopped in this
        # module; confirm whether HealthCheckServer needs an explicit shutdown.
        health = HealthCheckServer(
            _SERVICE_NAME,
            port=config.health_port + HEALTH_PORT_OFFSET,
            auth_token=config.metrics_auth_token,
        )
        await health.start()

        service_up = metrics.service_up.labels(service=_SERVICE_NAME)
        service_up.set(1)
        # Registered last, so the gauge is reset first during teardown.
        cleanup.callback(service_up.set, 0)

        await broker.ensure_group(_STREAM, _GROUP)

        shutdown = GracefulShutdown()
        shutdown.install_handlers()

        log.info("started", stream=_STREAM, dry_run=config.dry_run)

        handle = partial(
            _handle_message,
            executor=executor,
            broker=broker,
            metrics=metrics,
            log=log,
        )

        # Process pending messages first
        pending = await broker.read_pending(_STREAM, _GROUP, _CONSUMER)
        for msg_id, msg in pending:
            await handle(msg_id, msg, log_prefix="pending")

        while not shutdown.is_shutting_down:
            messages = await broker.read_group(
                _STREAM, _GROUP, _CONSUMER, count=10, block=5000
            )
            for msg_id, msg in messages:
                await handle(msg_id, msg, log_prefix="process")


def main() -> None:
    """Run the service on a fresh asyncio event loop."""
    asyncio.run(run())


if __name__ == "__main__":
    main()