"""Order Executor Service entry point.""" import asyncio from decimal import Decimal from shared.broker import RedisBroker from shared.db import Database from shared.events import Event, EventType from shared.healthcheck import HealthCheckServer from shared.exchange import create_exchange from shared.logging import setup_logging from shared.metrics import ServiceMetrics from shared.notifier import TelegramNotifier from order_executor.config import ExecutorConfig from order_executor.executor import OrderExecutor from order_executor.risk_manager import RiskManager async def run() -> None: config = ExecutorConfig() log = setup_logging("order-executor", config.log_level, config.log_format) metrics = ServiceMetrics("order_executor") notifier = TelegramNotifier( bot_token=config.telegram_bot_token, chat_id=config.telegram_chat_id ) db = Database(config.database_url) await db.connect() await db.init_tables() broker = RedisBroker(config.redis_url) exchange = create_exchange( exchange_id=config.exchange_id, api_key=config.binance_api_key, api_secret=config.binance_api_secret, sandbox=config.exchange_sandbox, ) risk_manager = RiskManager( max_position_size=Decimal(str(config.risk_max_position_size)), stop_loss_pct=Decimal(str(config.risk_stop_loss_pct)), daily_loss_limit_pct=Decimal(str(config.risk_daily_loss_limit_pct)), ) executor = OrderExecutor( exchange=exchange, risk_manager=risk_manager, broker=broker, db=db, notifier=notifier, dry_run=config.dry_run, ) last_id = "$" stream = "signals" health = HealthCheckServer("order-executor", port=config.health_port + 2, auth_token=config.metrics_auth_token) health.register_check("redis", broker.ping) await health.start() metrics.service_up.labels(service="order-executor").set(1) log.info("service_started", stream=stream, dry_run=config.dry_run) try: while True: messages = await broker.read(stream, last_id=last_id, count=10, block=5000) for msg in messages: try: event = Event.from_dict(msg) if event.type == EventType.SIGNAL: signal = event.data log.info( "processing_signal", signal_id=str(signal.id), symbol=signal.symbol ) await executor.execute(signal) metrics.events_processed.labels( service="order-executor", event_type="signal" ).inc() except Exception as exc: log.error("message_processing_failed", error=str(exc)) metrics.errors_total.labels( service="order-executor", error_type="processing" ).inc() if messages: # Advance last_id to avoid re-reading — broker.read returns decoded dicts, # so we track progress by re-reading with "0" for replaying or "$" for new only. # Since we block on "$" we get only new messages each iteration. pass except Exception as exc: log.error("fatal_error", error=str(exc)) await notifier.send_error(str(exc), "order-executor") raise finally: metrics.service_up.labels(service="order-executor").set(0) await notifier.close() await broker.close() await db.close() await exchange.close() def main() -> None: asyncio.run(run()) if __name__ == "__main__": main()