"""Order Executor Service entry point.""" import asyncio from decimal import Decimal from shared.alpaca import AlpacaClient from shared.broker import RedisBroker from shared.db import Database from shared.events import Event, EventType from shared.healthcheck import HealthCheckServer from shared.logging import setup_logging from shared.metrics import ServiceMetrics from shared.notifier import TelegramNotifier from shared.shutdown import GracefulShutdown from order_executor.config import ExecutorConfig from order_executor.executor import OrderExecutor from order_executor.risk_manager import RiskManager # Health check port: base + 2 HEALTH_PORT_OFFSET = 2 async def run() -> None: config = ExecutorConfig() log = setup_logging("order-executor", config.log_level, config.log_format) metrics = ServiceMetrics("order_executor") notifier = TelegramNotifier( bot_token=config.telegram_bot_token.get_secret_value(), chat_id=config.telegram_chat_id, ) db = Database(config.database_url.get_secret_value()) await db.connect() broker = RedisBroker(config.redis_url.get_secret_value()) alpaca = AlpacaClient( api_key=config.alpaca_api_key.get_secret_value(), api_secret=config.alpaca_api_secret.get_secret_value(), paper=config.alpaca_paper, ) risk_manager = RiskManager( max_position_size=Decimal(str(config.risk_max_position_size)), stop_loss_pct=Decimal(str(config.risk_stop_loss_pct)), daily_loss_limit_pct=Decimal(str(config.risk_daily_loss_limit_pct)), trailing_stop_pct=Decimal(str(config.risk_trailing_stop_pct)), max_open_positions=config.risk_max_open_positions, volatility_lookback=config.risk_volatility_lookback, volatility_scale=config.risk_volatility_scale, max_portfolio_exposure=config.risk_max_portfolio_exposure, max_correlated_exposure=config.risk_max_correlated_exposure, correlation_threshold=config.risk_correlation_threshold, var_confidence=config.risk_var_confidence, var_limit_pct=config.risk_var_limit_pct, drawdown_reduction_threshold=config.risk_drawdown_reduction_threshold, drawdown_halt_threshold=config.risk_drawdown_halt_threshold, max_consecutive_losses=config.risk_max_consecutive_losses, loss_pause_minutes=config.risk_loss_pause_minutes, ) executor = OrderExecutor( exchange=alpaca, risk_manager=risk_manager, broker=broker, db=db, notifier=notifier, dry_run=config.dry_run, ) health = HealthCheckServer( "order-executor", port=config.health_port + HEALTH_PORT_OFFSET, auth_token=config.metrics_auth_token, ) await health.start() metrics.service_up.labels(service="order-executor").set(1) GROUP = "order-executor" CONSUMER = "executor-1" stream = "signals" await broker.ensure_group(stream, GROUP) shutdown = GracefulShutdown() shutdown.install_handlers() log.info("started", stream=stream, dry_run=config.dry_run) try: # Process pending messages first pending = await broker.read_pending(stream, GROUP, CONSUMER) for msg_id, msg in pending: try: event = Event.from_dict(msg) if event.type == EventType.SIGNAL: await executor.execute(event.data) await broker.ack(stream, GROUP, msg_id) except Exception as exc: log.error("pending_failed", error=str(exc), msg_id=msg_id) while not shutdown.is_shutting_down: messages = await broker.read_group(stream, GROUP, CONSUMER, count=10, block=5000) for msg_id, msg in messages: try: event = Event.from_dict(msg) if event.type == EventType.SIGNAL: signal = event.data log.info("processing_signal", signal_id=signal.id, symbol=signal.symbol) await executor.execute(signal) metrics.events_processed.labels( service="order-executor", event_type="signal" ).inc() await broker.ack(stream, GROUP, msg_id) except Exception as exc: log.error("process_failed", error=str(exc)) metrics.errors_total.labels( service="order-executor", error_type="processing" ).inc() finally: metrics.service_up.labels(service="order-executor").set(0) await notifier.close() await broker.close() await db.close() await alpaca.close() def main() -> None: asyncio.run(run()) if __name__ == "__main__": main()