summaryrefslogtreecommitdiff
path: root/services/order-executor/src/order_executor/main.py
diff options
context:
space:
mode:
Diffstat (limited to 'services/order-executor/src/order_executor/main.py')
-rw-r--r--services/order-executor/src/order_executor/main.py83
1 files changed, 83 insertions, 0 deletions
diff --git a/services/order-executor/src/order_executor/main.py b/services/order-executor/src/order_executor/main.py
new file mode 100644
index 0000000..b57c513
--- /dev/null
+++ b/services/order-executor/src/order_executor/main.py
@@ -0,0 +1,83 @@
+"""Order Executor Service entry point."""
+import asyncio
+import logging
+from decimal import Decimal
+
+import ccxt.async_support as ccxt
+
+from shared.broker import RedisBroker
+from shared.db import Database
+from shared.events import Event, EventType
+
+from order_executor.config import ExecutorConfig
+from order_executor.executor import OrderExecutor
+from order_executor.risk_manager import RiskManager
+
+logging.basicConfig(level=logging.INFO)
+logger = logging.getLogger(__name__)
+
+
+async def run() -> None:
+ config = ExecutorConfig()
+ logging.getLogger().setLevel(config.log_level)
+
+ db = Database(config.database_url)
+ await db.connect()
+ await db.init_tables()
+
+ broker = RedisBroker(config.redis_url)
+
+ exchange = ccxt.binance(
+ {
+ "apiKey": config.binance_api_key,
+ "secret": config.binance_api_secret,
+ }
+ )
+
+ risk_manager = RiskManager(
+ max_position_size=Decimal(str(config.risk_max_position_size)),
+ stop_loss_pct=Decimal(str(config.risk_stop_loss_pct)),
+ daily_loss_limit_pct=Decimal(str(config.risk_daily_loss_limit_pct)),
+ )
+
+ executor = OrderExecutor(
+ exchange=exchange,
+ risk_manager=risk_manager,
+ broker=broker,
+ db=db,
+ dry_run=config.dry_run,
+ )
+
+ last_id = "$"
+ stream = "signals"
+ logger.info("Order executor started, listening on stream=%s dry_run=%s", stream, config.dry_run)
+
+ try:
+ while True:
+ messages = await broker.read(stream, last_id=last_id, count=10, block=5000)
+ for msg in messages:
+ try:
+ event = Event.from_dict(msg)
+ if event.type == EventType.SIGNAL:
+ signal = event.data
+ logger.info("Processing signal %s for %s", signal.id, signal.symbol)
+ await executor.execute(signal)
+ except Exception as exc:
+ logger.error("Failed to process message: %s", exc)
+ if messages:
+ # Advance last_id to avoid re-reading — broker.read returns decoded dicts,
+ # so we track progress by re-reading with "0" for replaying or "$" for new only.
+ # Since we block on "$" we get only new messages each iteration.
+ pass
+ finally:
+ await broker.close()
+ await db.close()
+ await exchange.close()
+
+
+def main() -> None:
+ asyncio.run(run())
+
+
+if __name__ == "__main__":
+ main()