From 33b14aaa2344b0fd95d1629627c3d135b24ae102 Mon Sep 17 00:00:00 2001 From: TheSiahxyz <164138827+TheSiahxyz@users.noreply.github.com> Date: Wed, 1 Apr 2026 15:56:35 +0900 Subject: feat: initial trading platform implementation Binance spot crypto trading platform with microservices architecture: - shared: Pydantic models, Redis Streams broker, asyncpg DB layer - data-collector: Binance WebSocket/REST market data collection - strategy-engine: Plugin-based strategy execution (RSI, Grid) - order-executor: Order execution with risk management - portfolio-manager: Position tracking and PnL calculation - backtester: Historical strategy testing with simulator - cli: Click-based CLI for all operations - Docker Compose orchestration with Redis and PostgreSQL - 24 test files covering all modules --- services/order-executor/src/order_executor/main.py | 83 ++++++++++++++++++++++ 1 file changed, 83 insertions(+) create mode 100644 services/order-executor/src/order_executor/main.py (limited to 'services/order-executor/src/order_executor/main.py') diff --git a/services/order-executor/src/order_executor/main.py b/services/order-executor/src/order_executor/main.py new file mode 100644 index 0000000..b57c513 --- /dev/null +++ b/services/order-executor/src/order_executor/main.py @@ -0,0 +1,83 @@ +"""Order Executor Service entry point.""" +import asyncio +import logging +from decimal import Decimal + +import ccxt.async_support as ccxt + +from shared.broker import RedisBroker +from shared.db import Database +from shared.events import Event, EventType + +from order_executor.config import ExecutorConfig +from order_executor.executor import OrderExecutor +from order_executor.risk_manager import RiskManager + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + + +async def run() -> None: + config = ExecutorConfig() + logging.getLogger().setLevel(config.log_level) + + db = Database(config.database_url) + await db.connect() + await db.init_tables() + + broker = RedisBroker(config.redis_url) + + exchange = ccxt.binance( + { + "apiKey": config.binance_api_key, + "secret": config.binance_api_secret, + } + ) + + risk_manager = RiskManager( + max_position_size=Decimal(str(config.risk_max_position_size)), + stop_loss_pct=Decimal(str(config.risk_stop_loss_pct)), + daily_loss_limit_pct=Decimal(str(config.risk_daily_loss_limit_pct)), + ) + + executor = OrderExecutor( + exchange=exchange, + risk_manager=risk_manager, + broker=broker, + db=db, + dry_run=config.dry_run, + ) + + last_id = "$" + stream = "signals" + logger.info("Order executor started, listening on stream=%s dry_run=%s", stream, config.dry_run) + + try: + while True: + messages = await broker.read(stream, last_id=last_id, count=10, block=5000) + for msg in messages: + try: + event = Event.from_dict(msg) + if event.type == EventType.SIGNAL: + signal = event.data + logger.info("Processing signal %s for %s", signal.id, signal.symbol) + await executor.execute(signal) + except Exception as exc: + logger.error("Failed to process message: %s", exc) + if messages: + # Advance last_id to avoid re-reading — broker.read returns decoded dicts, + # so we track progress by re-reading with "0" for replaying or "$" for new only. + # Since we block on "$" we get only new messages each iteration. + pass + finally: + await broker.close() + await db.close() + await exchange.close() + + +def main() -> None: + asyncio.run(run()) + + +if __name__ == "__main__": + main() -- cgit v1.2.3