From 35120795147adf53de59b7f2a3c8aa14adec9a56 Mon Sep 17 00:00:00 2001 From: TheSiahxyz <164138827+TheSiahxyz@users.noreply.github.com> Date: Thu, 2 Apr 2026 10:12:06 +0900 Subject: refactor: update data-collector and order-executor for Alpaca API --- services/order-executor/src/order_executor/main.py | 72 +++++++++------------- 1 file changed, 28 insertions(+), 44 deletions(-) (limited to 'services/order-executor/src/order_executor/main.py') diff --git a/services/order-executor/src/order_executor/main.py b/services/order-executor/src/order_executor/main.py index 68e14aa..3e098c3 100644 --- a/services/order-executor/src/order_executor/main.py +++ b/services/order-executor/src/order_executor/main.py @@ -1,13 +1,12 @@ """Order Executor Service entry point.""" - import asyncio from decimal import Decimal +from shared.alpaca import AlpacaClient from shared.broker import RedisBroker from shared.db import Database from shared.events import Event, EventType from shared.healthcheck import HealthCheckServer -from shared.exchange import create_exchange from shared.logging import setup_logging from shared.metrics import ServiceMetrics from shared.notifier import TelegramNotifier @@ -16,9 +15,7 @@ from order_executor.config import ExecutorConfig from order_executor.executor import OrderExecutor from order_executor.risk_manager import RiskManager -# Health check port: base (HEALTH_PORT, default 8080) + offset -# data-collector: +0 (8080), strategy-engine: +1 (8081) -# order-executor: +2 (8082), portfolio-manager: +3 (8083) +# Health check port: base + 2 HEALTH_PORT_OFFSET = 2 @@ -26,21 +23,21 @@ async def run() -> None: config = ExecutorConfig() log = setup_logging("order-executor", config.log_level, config.log_format) metrics = ServiceMetrics("order_executor") + notifier = TelegramNotifier( - bot_token=config.telegram_bot_token, chat_id=config.telegram_chat_id + bot_token=config.telegram_bot_token, + chat_id=config.telegram_chat_id, ) db = Database(config.database_url) await db.connect() - await db.init_tables() broker = RedisBroker(config.redis_url) - exchange = create_exchange( - exchange_id=config.exchange_id, - api_key=config.binance_api_key, - api_secret=config.binance_api_secret, - sandbox=config.exchange_sandbox, + alpaca = AlpacaClient( + api_key=config.alpaca_api_key, + api_secret=config.alpaca_api_secret, + paper=config.alpaca_paper, ) risk_manager = RiskManager( @@ -63,7 +60,7 @@ async def run() -> None: ) executor = OrderExecutor( - exchange=exchange, + exchange=alpaca, risk_manager=risk_manager, broker=broker, db=db, @@ -71,41 +68,34 @@ async def run() -> None: dry_run=config.dry_run, ) - GROUP = "order-executor" - CONSUMER = "executor-1" - stream = "signals" - health = HealthCheckServer( "order-executor", port=config.health_port + HEALTH_PORT_OFFSET, auth_token=config.metrics_auth_token, ) - health.register_check("redis", broker.ping) await health.start() metrics.service_up.labels(service="order-executor").set(1) - log.info("service_started", stream=stream, dry_run=config.dry_run) + GROUP = "order-executor" + CONSUMER = "executor-1" + stream = "signals" await broker.ensure_group(stream, GROUP) - # Process pending messages first (from previous crash) - pending = await broker.read_pending(stream, GROUP, CONSUMER) - for msg_id, msg in pending: - try: - event = Event.from_dict(msg) - if event.type == EventType.SIGNAL: - signal = event.data - log.info( - "processing_pending_signal", signal_id=str(signal.id), symbol=signal.symbol - ) - await executor.execute(signal) - metrics.events_processed.labels(service="order-executor", event_type="signal").inc() - await broker.ack(stream, GROUP, msg_id) - except Exception as exc: - log.error("pending_process_failed", error=str(exc), msg_id=msg_id) - metrics.errors_total.labels(service="order-executor", error_type="processing").inc() + log.info("started", stream=stream, dry_run=config.dry_run) try: + # Process pending messages first + pending = await broker.read_pending(stream, GROUP, CONSUMER) + for msg_id, msg in pending: + try: + event = Event.from_dict(msg) + if event.type == EventType.SIGNAL: + await executor.execute(event.data) + await broker.ack(stream, GROUP, msg_id) + except Exception as exc: + log.error("pending_failed", error=str(exc), msg_id=msg_id) + while True: messages = await broker.read_group(stream, GROUP, CONSUMER, count=10, block=5000) for msg_id, msg in messages: @@ -113,29 +103,23 @@ async def run() -> None: event = Event.from_dict(msg) if event.type == EventType.SIGNAL: signal = event.data - log.info( - "processing_signal", signal_id=str(signal.id), symbol=signal.symbol - ) + log.info("processing_signal", signal_id=signal.id, symbol=signal.symbol) await executor.execute(signal) metrics.events_processed.labels( service="order-executor", event_type="signal" ).inc() await broker.ack(stream, GROUP, msg_id) except Exception as exc: - log.error("message_processing_failed", error=str(exc), msg_id=msg_id) + log.error("process_failed", error=str(exc)) metrics.errors_total.labels( service="order-executor", error_type="processing" ).inc() - except Exception as exc: - log.error("fatal_error", error=str(exc)) - await notifier.send_error(str(exc), "order-executor") - raise finally: metrics.service_up.labels(service="order-executor").set(0) await notifier.close() await broker.close() await db.close() - await exchange.close() + await alpaca.close() def main() -> None: -- cgit v1.2.3