summaryrefslogtreecommitdiff
path: root/services/order-executor/src/order_executor/main.py
diff options
context:
space:
mode:
Diffstat (limited to 'services/order-executor/src/order_executor/main.py')
-rw-r--r--services/order-executor/src/order_executor/main.py45
1 files changed, 33 insertions, 12 deletions
diff --git a/services/order-executor/src/order_executor/main.py b/services/order-executor/src/order_executor/main.py
index 51ab286..99f88e1 100644
--- a/services/order-executor/src/order_executor/main.py
+++ b/services/order-executor/src/order_executor/main.py
@@ -3,6 +3,11 @@
import asyncio
from decimal import Decimal
+import aiohttp
+
+from order_executor.config import ExecutorConfig
+from order_executor.executor import OrderExecutor
+from order_executor.risk_manager import RiskManager
from shared.alpaca import AlpacaClient
from shared.broker import RedisBroker
from shared.db import Database
@@ -11,10 +16,7 @@ from shared.healthcheck import HealthCheckServer
from shared.logging import setup_logging
from shared.metrics import ServiceMetrics
from shared.notifier import TelegramNotifier
-
-from order_executor.config import ExecutorConfig
-from order_executor.executor import OrderExecutor
-from order_executor.risk_manager import RiskManager
+from shared.shutdown import GracefulShutdown
# Health check port: base + 2
HEALTH_PORT_OFFSET = 2
@@ -26,18 +28,18 @@ async def run() -> None:
metrics = ServiceMetrics("order_executor")
notifier = TelegramNotifier(
- bot_token=config.telegram_bot_token,
+ bot_token=config.telegram_bot_token.get_secret_value(),
chat_id=config.telegram_chat_id,
)
- db = Database(config.database_url)
+ db = Database(config.database_url.get_secret_value())
await db.connect()
- broker = RedisBroker(config.redis_url)
+ broker = RedisBroker(config.redis_url.get_secret_value())
alpaca = AlpacaClient(
- api_key=config.alpaca_api_key,
- api_secret=config.alpaca_api_secret,
+ api_key=config.alpaca_api_key.get_secret_value(),
+ api_secret=config.alpaca_api_secret.get_secret_value(),
paper=config.alpaca_paper,
)
@@ -83,6 +85,9 @@ async def run() -> None:
await broker.ensure_group(stream, GROUP)
+ shutdown = GracefulShutdown()
+ shutdown.install_handlers()
+
log.info("started", stream=stream, dry_run=config.dry_run)
try:
@@ -94,10 +99,15 @@ async def run() -> None:
if event.type == EventType.SIGNAL:
await executor.execute(event.data)
await broker.ack(stream, GROUP, msg_id)
+ except (aiohttp.ClientError, ConnectionError, TimeoutError) as exc:
+ log.warning("pending_network_error", error=str(exc), msg_id=msg_id)
+ except (ValueError, KeyError, TypeError) as exc:
+ log.warning("pending_parse_error", error=str(exc), msg_id=msg_id)
+ await broker.ack(stream, GROUP, msg_id)
except Exception as exc:
- log.error("pending_failed", error=str(exc), msg_id=msg_id)
+ log.error("pending_failed", error=str(exc), msg_id=msg_id, exc_info=True)
- while True:
+ while not shutdown.is_shutting_down:
messages = await broker.read_group(stream, GROUP, CONSUMER, count=10, block=5000)
for msg_id, msg in messages:
try:
@@ -110,8 +120,19 @@ async def run() -> None:
service="order-executor", event_type="signal"
).inc()
await broker.ack(stream, GROUP, msg_id)
+ except (aiohttp.ClientError, ConnectionError, TimeoutError) as exc:
+ log.warning("process_network_error", error=str(exc))
+ metrics.errors_total.labels(
+ service="order-executor", error_type="network"
+ ).inc()
+ except (ValueError, KeyError, TypeError) as exc:
+ log.warning("process_parse_error", error=str(exc))
+ await broker.ack(stream, GROUP, msg_id)
+ metrics.errors_total.labels(
+ service="order-executor", error_type="validation"
+ ).inc()
except Exception as exc:
- log.error("process_failed", error=str(exc))
+ log.error("process_failed", error=str(exc), exc_info=True)
metrics.errors_total.labels(
service="order-executor", error_type="processing"
).inc()