From c3560fdd637c4f034cac4f7371aeed65d018bc91 Mon Sep 17 00:00:00 2001 From: TheSiahxyz <164138827+TheSiahxyz@users.noreply.github.com> Date: Wed, 1 Apr 2026 16:12:40 +0900 Subject: feat(services): integrate structlog, healthcheck, metrics, and Telegram Replace logging.basicConfig/getLogger with structlog setup_logging in all four service entry points. Add HealthCheckServer, ServiceMetrics, and TelegramNotifier initialization to each service. Update OrderExecutor to accept a notifier parameter and send order notifications. Add RedisBroker.ping() for health checks. Update executor tests with notifier=AsyncMock(). --- services/order-executor/src/order_executor/main.py | 33 ++++++++++++++++------ 1 file changed, 25 insertions(+), 8 deletions(-) (limited to 'services/order-executor/src/order_executor/main.py') diff --git a/services/order-executor/src/order_executor/main.py b/services/order-executor/src/order_executor/main.py index b57c513..7f0578d 100644 --- a/services/order-executor/src/order_executor/main.py +++ b/services/order-executor/src/order_executor/main.py @@ -1,6 +1,5 @@ """Order Executor Service entry point.""" import asyncio -import logging from decimal import Decimal import ccxt.async_support as ccxt @@ -8,18 +7,21 @@ import ccxt.async_support as ccxt from shared.broker import RedisBroker from shared.db import Database from shared.events import Event, EventType +from shared.healthcheck import HealthCheckServer +from shared.logging import setup_logging +from shared.metrics import ServiceMetrics +from shared.notifier import TelegramNotifier from order_executor.config import ExecutorConfig from order_executor.executor import OrderExecutor from order_executor.risk_manager import RiskManager -logging.basicConfig(level=logging.INFO) -logger = logging.getLogger(__name__) - async def run() -> None: config = ExecutorConfig() - logging.getLogger().setLevel(config.log_level) + log = setup_logging("order-executor", config.log_level, config.log_format) + metrics = ServiceMetrics("order_executor") + notifier = TelegramNotifier(bot_token=config.telegram_bot_token, chat_id=config.telegram_chat_id) db = Database(config.database_url) await db.connect() @@ -45,12 +47,19 @@ async def run() -> None: risk_manager=risk_manager, broker=broker, db=db, + notifier=notifier, dry_run=config.dry_run, ) last_id = "$" stream = "signals" - logger.info("Order executor started, listening on stream=%s dry_run=%s", stream, config.dry_run) + + health = HealthCheckServer("order-executor", port=config.health_port + 2) + health.register_check("redis", broker.ping) + await health.start() + metrics.service_up.labels(service="order-executor").set(1) + + log.info("service_started", stream=stream, dry_run=config.dry_run) try: while True: @@ -60,16 +69,24 @@ async def run() -> None: event = Event.from_dict(msg) if event.type == EventType.SIGNAL: signal = event.data - logger.info("Processing signal %s for %s", signal.id, signal.symbol) + log.info("processing_signal", signal_id=str(signal.id), symbol=signal.symbol) await executor.execute(signal) + metrics.events_processed.labels(service="order-executor", event_type="signal").inc() except Exception as exc: - logger.error("Failed to process message: %s", exc) + log.error("message_processing_failed", error=str(exc)) + metrics.errors_total.labels(service="order-executor", error_type="processing").inc() if messages: # Advance last_id to avoid re-reading — broker.read returns decoded dicts, # so we track progress by re-reading with "0" for replaying or "$" for new only. # Since we block on "$" we get only new messages each iteration. pass + except Exception as exc: + log.error("fatal_error", error=str(exc)) + await notifier.send_error(str(exc), "order-executor") + raise finally: + metrics.service_up.labels(service="order-executor").set(0) + await notifier.close() await broker.close() await db.close() await exchange.close() -- cgit v1.2.3