"""Order Executor Service entry point.""" import asyncio from decimal import Decimal import ccxt.async_support as ccxt from shared.broker import RedisBroker from shared.db import Database from shared.events import Event, EventType from shared.healthcheck import HealthCheckServer from shared.logging import setup_logging from shared.metrics import ServiceMetrics from shared.notifier import TelegramNotifier from order_executor.config import ExecutorConfig from order_executor.executor import OrderExecutor from order_executor.risk_manager import RiskManager async def run() -> None: config = ExecutorConfig() log = setup_logging("order-executor", config.log_level, config.log_format) metrics = ServiceMetrics("order_executor") notifier = TelegramNotifier( bot_token=config.telegram_bot_token, chat_id=config.telegram_chat_id ) db = Database(config.database_url) await db.connect() await db.init_tables() broker = RedisBroker(config.redis_url) exchange = ccxt.binance( { "apiKey": config.binance_api_key, "secret": config.binance_api_secret, } ) risk_manager = RiskManager( max_position_size=Decimal(str(config.risk_max_position_size)), stop_loss_pct=Decimal(str(config.risk_stop_loss_pct)), daily_loss_limit_pct=Decimal(str(config.risk_daily_loss_limit_pct)), ) executor = OrderExecutor( exchange=exchange, risk_manager=risk_manager, broker=broker, db=db, notifier=notifier, dry_run=config.dry_run, ) last_id = "$" stream = "signals" health = HealthCheckServer("order-executor", port=config.health_port + 2) health.register_check("redis", broker.ping) await health.start() metrics.service_up.labels(service="order-executor").set(1) log.info("service_started", stream=stream, dry_run=config.dry_run) try: while True: messages = await broker.read(stream, last_id=last_id, count=10, block=5000) for msg in messages: try: event = Event.from_dict(msg) if event.type == EventType.SIGNAL: signal = event.data log.info( "processing_signal", signal_id=str(signal.id), symbol=signal.symbol ) await executor.execute(signal) metrics.events_processed.labels( service="order-executor", event_type="signal" ).inc() except Exception as exc: log.error("message_processing_failed", error=str(exc)) metrics.errors_total.labels( service="order-executor", error_type="processing" ).inc() if messages: # Advance last_id to avoid re-reading — broker.read returns decoded dicts, # so we track progress by re-reading with "0" for replaying or "$" for new only. # Since we block on "$" we get only new messages each iteration. pass except Exception as exc: log.error("fatal_error", error=str(exc)) await notifier.send_error(str(exc), "order-executor") raise finally: metrics.service_up.labels(service="order-executor").set(0) await notifier.close() await broker.close() await db.close() await exchange.close() def main() -> None: asyncio.run(run()) if __name__ == "__main__": main()