diff options
Diffstat (limited to 'services/order-executor/src/order_executor/main.py')
| -rw-r--r-- | services/order-executor/src/order_executor/main.py | 45 |
1 files changed, 33 insertions, 12 deletions
diff --git a/services/order-executor/src/order_executor/main.py b/services/order-executor/src/order_executor/main.py index 51ab286..99f88e1 100644 --- a/services/order-executor/src/order_executor/main.py +++ b/services/order-executor/src/order_executor/main.py @@ -3,6 +3,11 @@ import asyncio from decimal import Decimal +import aiohttp + +from order_executor.config import ExecutorConfig +from order_executor.executor import OrderExecutor +from order_executor.risk_manager import RiskManager from shared.alpaca import AlpacaClient from shared.broker import RedisBroker from shared.db import Database @@ -11,10 +16,7 @@ from shared.healthcheck import HealthCheckServer from shared.logging import setup_logging from shared.metrics import ServiceMetrics from shared.notifier import TelegramNotifier - -from order_executor.config import ExecutorConfig -from order_executor.executor import OrderExecutor -from order_executor.risk_manager import RiskManager +from shared.shutdown import GracefulShutdown # Health check port: base + 2 HEALTH_PORT_OFFSET = 2 @@ -26,18 +28,18 @@ async def run() -> None: metrics = ServiceMetrics("order_executor") notifier = TelegramNotifier( - bot_token=config.telegram_bot_token, + bot_token=config.telegram_bot_token.get_secret_value(), chat_id=config.telegram_chat_id, ) - db = Database(config.database_url) + db = Database(config.database_url.get_secret_value()) await db.connect() - broker = RedisBroker(config.redis_url) + broker = RedisBroker(config.redis_url.get_secret_value()) alpaca = AlpacaClient( - api_key=config.alpaca_api_key, - api_secret=config.alpaca_api_secret, + api_key=config.alpaca_api_key.get_secret_value(), + api_secret=config.alpaca_api_secret.get_secret_value(), paper=config.alpaca_paper, ) @@ -83,6 +85,9 @@ async def run() -> None: await broker.ensure_group(stream, GROUP) + shutdown = GracefulShutdown() + shutdown.install_handlers() + log.info("started", stream=stream, dry_run=config.dry_run) try: @@ -94,10 +99,15 @@ async def run() -> None: if event.type == EventType.SIGNAL: await executor.execute(event.data) await broker.ack(stream, GROUP, msg_id) + except (aiohttp.ClientError, ConnectionError, TimeoutError) as exc: + log.warning("pending_network_error", error=str(exc), msg_id=msg_id) + except (ValueError, KeyError, TypeError) as exc: + log.warning("pending_parse_error", error=str(exc), msg_id=msg_id) + await broker.ack(stream, GROUP, msg_id) except Exception as exc: - log.error("pending_failed", error=str(exc), msg_id=msg_id) + log.error("pending_failed", error=str(exc), msg_id=msg_id, exc_info=True) - while True: + while not shutdown.is_shutting_down: messages = await broker.read_group(stream, GROUP, CONSUMER, count=10, block=5000) for msg_id, msg in messages: try: @@ -110,8 +120,19 @@ async def run() -> None: service="order-executor", event_type="signal" ).inc() await broker.ack(stream, GROUP, msg_id) + except (aiohttp.ClientError, ConnectionError, TimeoutError) as exc: + log.warning("process_network_error", error=str(exc)) + metrics.errors_total.labels( + service="order-executor", error_type="network" + ).inc() + except (ValueError, KeyError, TypeError) as exc: + log.warning("process_parse_error", error=str(exc)) + await broker.ack(stream, GROUP, msg_id) + metrics.errors_total.labels( + service="order-executor", error_type="validation" + ).inc() except Exception as exc: - log.error("process_failed", error=str(exc)) + log.error("process_failed", error=str(exc), exc_info=True) metrics.errors_total.labels( service="order-executor", error_type="processing" ).inc() |
