"""Order Executor Service entry point.""" import asyncio from decimal import Decimal from shared.broker import RedisBroker from shared.db import Database from shared.events import Event, EventType from shared.healthcheck import HealthCheckServer from shared.exchange import create_exchange from shared.logging import setup_logging from shared.metrics import ServiceMetrics from shared.notifier import TelegramNotifier from order_executor.config import ExecutorConfig from order_executor.executor import OrderExecutor from order_executor.risk_manager import RiskManager # Health check port: base (HEALTH_PORT, default 8080) + offset # data-collector: +0 (8080), strategy-engine: +1 (8081) # order-executor: +2 (8082), portfolio-manager: +3 (8083) HEALTH_PORT_OFFSET = 2 async def run() -> None: config = ExecutorConfig() log = setup_logging("order-executor", config.log_level, config.log_format) metrics = ServiceMetrics("order_executor") notifier = TelegramNotifier( bot_token=config.telegram_bot_token, chat_id=config.telegram_chat_id ) db = Database(config.database_url) await db.connect() await db.init_tables() broker = RedisBroker(config.redis_url) exchange = create_exchange( exchange_id=config.exchange_id, api_key=config.binance_api_key, api_secret=config.binance_api_secret, sandbox=config.exchange_sandbox, ) risk_manager = RiskManager( max_position_size=Decimal(str(config.risk_max_position_size)), stop_loss_pct=Decimal(str(config.risk_stop_loss_pct)), daily_loss_limit_pct=Decimal(str(config.risk_daily_loss_limit_pct)), trailing_stop_pct=Decimal(str(config.risk_trailing_stop_pct)), max_open_positions=config.risk_max_open_positions, volatility_lookback=config.risk_volatility_lookback, volatility_scale=config.risk_volatility_scale, ) executor = OrderExecutor( exchange=exchange, risk_manager=risk_manager, broker=broker, db=db, notifier=notifier, dry_run=config.dry_run, ) GROUP = "order-executor" CONSUMER = "executor-1" stream = "signals" health = HealthCheckServer( "order-executor", port=config.health_port + HEALTH_PORT_OFFSET, auth_token=config.metrics_auth_token, ) health.register_check("redis", broker.ping) await health.start() metrics.service_up.labels(service="order-executor").set(1) log.info("service_started", stream=stream, dry_run=config.dry_run) await broker.ensure_group(stream, GROUP) # Process pending messages first (from previous crash) pending = await broker.read_pending(stream, GROUP, CONSUMER) for msg_id, msg in pending: try: event = Event.from_dict(msg) if event.type == EventType.SIGNAL: signal = event.data log.info( "processing_pending_signal", signal_id=str(signal.id), symbol=signal.symbol ) await executor.execute(signal) metrics.events_processed.labels( service="order-executor", event_type="signal" ).inc() await broker.ack(stream, GROUP, msg_id) except Exception as exc: log.error("pending_process_failed", error=str(exc), msg_id=msg_id) metrics.errors_total.labels( service="order-executor", error_type="processing" ).inc() try: while True: messages = await broker.read_group( stream, GROUP, CONSUMER, count=10, block=5000 ) for msg_id, msg in messages: try: event = Event.from_dict(msg) if event.type == EventType.SIGNAL: signal = event.data log.info( "processing_signal", signal_id=str(signal.id), symbol=signal.symbol ) await executor.execute(signal) metrics.events_processed.labels( service="order-executor", event_type="signal" ).inc() await broker.ack(stream, GROUP, msg_id) except Exception as exc: log.error("message_processing_failed", error=str(exc), msg_id=msg_id) metrics.errors_total.labels( service="order-executor", error_type="processing" ).inc() except Exception as exc: log.error("fatal_error", error=str(exc)) await notifier.send_error(str(exc), "order-executor") raise finally: metrics.service_up.labels(service="order-executor").set(0) await notifier.close() await broker.close() await db.close() await exchange.close() def main() -> None: asyncio.run(run()) if __name__ == "__main__": main()