"""Order Executor Service entry point."""

import asyncio
import logging
from decimal import Decimal

import ccxt.async_support as ccxt

from shared.broker import RedisBroker
from shared.db import Database
from shared.events import Event, EventType

from order_executor.config import ExecutorConfig
from order_executor.executor import OrderExecutor
from order_executor.risk_manager import RiskManager

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


async def _process_message(executor: OrderExecutor, msg: dict) -> None:
    """Decode one stream message and execute it when it is a signal event.

    Any ``Exception`` raised while decoding or executing is logged with its
    traceback and swallowed, so one bad message does not stop the consumer
    loop. Events whose type is not ``EventType.SIGNAL`` are ignored.
    """
    try:
        event = Event.from_dict(msg)
        if event.type == EventType.SIGNAL:
            signal = event.data
            logger.info("Processing signal %s for %s", signal.id, signal.symbol)
            await executor.execute(signal)
    except Exception as exc:
        # logger.exception keeps the original message text and adds the
        # traceback, which logger.error would drop.
        logger.exception("Failed to process message: %s", exc)


async def _close_all(*named_resources: tuple) -> None:
    """Await ``close()`` on every ``(name, resource)`` pair, in order.

    A failing ``close()`` is logged and does not prevent the remaining
    resources from being closed.
    """
    for name, resource in named_resources:
        try:
            await resource.close()
        except Exception:
            logger.exception("Failed to close %s during shutdown", name)


async def run() -> None:
    """Wire up the executor's dependencies and consume the signal stream.

    Builds the database, broker, exchange client, risk manager and order
    executor from ``ExecutorConfig``, then reads the ``signals`` stream in
    batches forever, handing each message to the executor. The loop has no
    normal exit: it ends only when an exception (including task
    cancellation) propagates, at which point the broker, database and
    exchange are closed.
    """
    config = ExecutorConfig()
    logging.getLogger().setLevel(config.log_level)

    db = Database(config.database_url)
    await db.connect()
    await db.init_tables()

    broker = RedisBroker(config.redis_url)

    exchange = ccxt.binance(
        {
            "apiKey": config.binance_api_key,
            "secret": config.binance_api_secret,
        }
    )

    # Config values go through str() before Decimal() so the Decimal is built
    # from the value's printed form rather than from a binary float.
    risk_manager = RiskManager(
        max_position_size=Decimal(str(config.risk_max_position_size)),
        stop_loss_pct=Decimal(str(config.risk_stop_loss_pct)),
        daily_loss_limit_pct=Decimal(str(config.risk_daily_loss_limit_pct)),
    )

    executor = OrderExecutor(
        exchange=exchange,
        risk_manager=risk_manager,
        broker=broker,
        db=db,
        dry_run=config.dry_run,
    )

    # NOTE(review): last_id is never advanced; every read asks for "$".
    # If RedisBroker.read forwards this value verbatim to Redis XREAD, entries
    # added between two consecutive reads would be skipped. broker.read
    # returns decoded dicts, so the entry ID is not available here to track
    # progress -- confirm against RedisBroker.read.
    last_id = "$"
    stream = "signals"

    logger.info(
        "Order executor started, listening on stream=%s dry_run=%s",
        stream,
        config.dry_run,
    )

    try:
        while True:
            messages = await broker.read(
                stream, last_id=last_id, count=10, block=5000
            )
            for msg in messages:
                await _process_message(executor, msg)
    finally:
        # This point is reached only with an exception in flight (the loop
        # never breaks), so close failures are logged rather than raised to
        # keep the original error as the one that propagates, and so that
        # every resource still gets its close() attempt.
        await _close_all(
            ("broker", broker),
            ("database", db),
            ("exchange", exchange),
        )


def main() -> None:
    """Run the order executor service until it is interrupted or fails."""
    asyncio.run(run())


if __name__ == "__main__":
    main()