diff options
| author | TheSiahxyz <164138827+TheSiahxyz@users.noreply.github.com> | 2026-04-01 16:12:40 +0900 |
|---|---|---|
| committer | TheSiahxyz <164138827+TheSiahxyz@users.noreply.github.com> | 2026-04-01 16:12:40 +0900 |
| commit | c3560fdd637c4f034cac4f7371aeed65d018bc91 (patch) | |
| tree | c50d3318c0c37ca634012b7113a80c0d6e356844 /services | |
| parent | ec792a3d379c911165038d8da5b339df6ca3fccd (diff) | |
feat(services): integrate structlog, healthcheck, metrics, and Telegram
Replace logging.basicConfig/getLogger with structlog setup_logging in all
four service entry points. Add HealthCheckServer, ServiceMetrics, and
TelegramNotifier initialization to each service. Update OrderExecutor to
accept a notifier parameter and send order notifications. Add
RedisBroker.ping() for health checks. Update executor tests with
notifier=AsyncMock().
Diffstat (limited to 'services')
6 files changed, 120 insertions, 46 deletions
diff --git a/services/data-collector/src/data_collector/main.py b/services/data-collector/src/data_collector/main.py index adf1e96..c778503 100644 --- a/services/data-collector/src/data_collector/main.py +++ b/services/data-collector/src/data_collector/main.py @@ -1,21 +1,24 @@ """Data Collector Service entry point.""" import asyncio -import logging from shared.broker import RedisBroker from shared.db import Database +from shared.healthcheck import HealthCheckServer +from shared.logging import setup_logging +from shared.metrics import ServiceMetrics +from shared.notifier import TelegramNotifier from data_collector.binance_ws import BinanceWebSocket from data_collector.config import CollectorConfig from data_collector.storage import CandleStorage -logging.basicConfig(level=logging.INFO) -logger = logging.getLogger(__name__) - async def run() -> None: """Initialise all components and start the WebSocket collector.""" config = CollectorConfig() + log = setup_logging("data-collector", config.log_level, config.log_format) + metrics = ServiceMetrics("data_collector") + notifier = TelegramNotifier(bot_token=config.telegram_bot_token, chat_id=config.telegram_chat_id) db = Database(config.database_url) await db.connect() @@ -25,8 +28,9 @@ async def run() -> None: storage = CandleStorage(db=db, broker=broker) async def on_candle(candle): - logger.info("Candle received: %s %s %s", candle.symbol, candle.timeframe, candle.open_time) + log.info("candle_received", symbol=candle.symbol, timeframe=candle.timeframe, open_time=str(candle.open_time)) await storage.store(candle) + metrics.events_processed.labels(service="data-collector", event_type="candle").inc() # Use the first configured timeframe for the WebSocket subscription. timeframe = config.timeframes[0] if config.timeframes else "1m" @@ -37,15 +41,26 @@ async def run() -> None: on_candle=on_candle, ) - logger.info( - "Starting data collector for symbols=%s timeframe=%s", - config.symbols, - timeframe, + health = HealthCheckServer("data-collector", port=config.health_port) + health.register_check("redis", broker.ping) + await health.start() + metrics.service_up.labels(service="data-collector").set(1) + + log.info( + "service_started", + symbols=config.symbols, + timeframe=timeframe, ) try: await ws.start() + except Exception as exc: + log.error("fatal_error", error=str(exc)) + await notifier.send_error(str(exc), "data-collector") + raise finally: + metrics.service_up.labels(service="data-collector").set(0) + await notifier.close() await broker.close() await db.close() diff --git a/services/order-executor/src/order_executor/executor.py b/services/order-executor/src/order_executor/executor.py index 16ae52c..099520d 100644 --- a/services/order-executor/src/order_executor/executor.py +++ b/services/order-executor/src/order_executor/executor.py @@ -1,5 +1,5 @@ """Order execution logic.""" -import logging +import structlog from datetime import datetime, timezone from decimal import Decimal from typing import Any, Optional @@ -8,10 +8,11 @@ from shared.broker import RedisBroker from shared.db import Database from shared.events import OrderEvent from shared.models import Order, OrderSide, OrderStatus, OrderType, Signal +from shared.notifier import TelegramNotifier from order_executor.risk_manager import RiskManager -logger = logging.getLogger(__name__) +logger = structlog.get_logger() class OrderExecutor: @@ -23,12 +24,14 @@ class OrderExecutor: risk_manager: RiskManager, broker: RedisBroker, db: Database, + notifier: TelegramNotifier, dry_run: bool = True, ) -> None: self.exchange = exchange self.risk_manager = risk_manager self.broker = broker self.db = db + self.notifier = notifier self.dry_run = dry_run async def execute(self, signal: Signal) -> Optional[Order]: @@ -56,7 +59,7 @@ class OrderExecutor: if not result.allowed: logger.warning( - "Risk check rejected signal %s: %s", signal.id, result.reason + "risk_check_rejected", signal_id=str(signal.id), reason=result.reason ) return None @@ -74,7 +77,7 @@ class OrderExecutor: if self.dry_run: order.status = OrderStatus.FILLED order.filled_at = datetime.now(timezone.utc) - logger.info("[DRY RUN] Order filled: %s %s %s", order.side, order.quantity, order.symbol) + logger.info("order_filled_dry_run", side=str(order.side), quantity=str(order.quantity), symbol=order.symbol) else: try: await self.exchange.create_order( @@ -85,10 +88,10 @@ class OrderExecutor: ) order.status = OrderStatus.FILLED order.filled_at = datetime.now(timezone.utc) - logger.info("Order filled: %s %s %s", order.side, order.quantity, order.symbol) + logger.info("order_filled", side=str(order.side), quantity=str(order.quantity), symbol=order.symbol) except Exception as exc: order.status = OrderStatus.FAILED - logger.error("Order failed for signal %s: %s", signal.id, exc) + logger.error("order_failed", signal_id=str(signal.id), error=str(exc)) # Persist to DB await self.db.insert_order(order) @@ -97,4 +100,7 @@ class OrderExecutor: event = OrderEvent(data=order) await self.broker.publish("orders", event.to_dict()) + # Notify via Telegram + await self.notifier.send_order(order) + return order diff --git a/services/order-executor/src/order_executor/main.py b/services/order-executor/src/order_executor/main.py index b57c513..7f0578d 100644 --- a/services/order-executor/src/order_executor/main.py +++ b/services/order-executor/src/order_executor/main.py @@ -1,6 +1,5 @@ """Order Executor Service entry point.""" import asyncio -import logging from decimal import Decimal import ccxt.async_support as ccxt @@ -8,18 +7,21 @@ import ccxt.async_support as ccxt from shared.broker import RedisBroker from shared.db import Database from shared.events import Event, EventType +from shared.healthcheck import HealthCheckServer +from shared.logging import setup_logging +from shared.metrics import ServiceMetrics +from shared.notifier import TelegramNotifier from order_executor.config import ExecutorConfig from order_executor.executor import OrderExecutor from order_executor.risk_manager import RiskManager -logging.basicConfig(level=logging.INFO) -logger = logging.getLogger(__name__) - async def run() -> None: config = ExecutorConfig() - logging.getLogger().setLevel(config.log_level) + log = setup_logging("order-executor", config.log_level, config.log_format) + metrics = ServiceMetrics("order_executor") + notifier = TelegramNotifier(bot_token=config.telegram_bot_token, chat_id=config.telegram_chat_id) db = Database(config.database_url) await db.connect() @@ -45,12 +47,19 @@ async def run() -> None: risk_manager=risk_manager, broker=broker, db=db, + notifier=notifier, dry_run=config.dry_run, ) last_id = "$" stream = "signals" - logger.info("Order executor started, listening on stream=%s dry_run=%s", stream, config.dry_run) + + health = HealthCheckServer("order-executor", port=config.health_port + 2) + health.register_check("redis", broker.ping) + await health.start() + metrics.service_up.labels(service="order-executor").set(1) + + log.info("service_started", stream=stream, dry_run=config.dry_run) try: while True: @@ -60,16 +69,24 @@ async def run() -> None: event = Event.from_dict(msg) if event.type == EventType.SIGNAL: signal = event.data - logger.info("Processing signal %s for %s", signal.id, signal.symbol) + log.info("processing_signal", signal_id=str(signal.id), symbol=signal.symbol) await executor.execute(signal) + metrics.events_processed.labels(service="order-executor", event_type="signal").inc() except Exception as exc: - logger.error("Failed to process message: %s", exc) + log.error("message_processing_failed", error=str(exc)) + metrics.errors_total.labels(service="order-executor", error_type="processing").inc() if messages: # Advance last_id to avoid re-reading — broker.read returns decoded dicts, # so we track progress by re-reading with "0" for replaying or "$" for new only. # Since we block on "$" we get only new messages each iteration. pass + except Exception as exc: + log.error("fatal_error", error=str(exc)) + await notifier.send_error(str(exc), "order-executor") + raise finally: + metrics.service_up.labels(service="order-executor").set(0) + await notifier.close() await broker.close() await db.close() await exchange.close() diff --git a/services/order-executor/tests/test_executor.py b/services/order-executor/tests/test_executor.py index 5b18992..4836ffb 100644 --- a/services/order-executor/tests/test_executor.py +++ b/services/order-executor/tests/test_executor.py @@ -58,6 +58,7 @@ async def test_executor_places_order_when_risk_passes(): risk_manager=risk_manager, broker=broker, db=db, + notifier=AsyncMock(), dry_run=False, ) @@ -84,6 +85,7 @@ async def test_executor_rejects_when_risk_fails(): risk_manager=risk_manager, broker=broker, db=db, + notifier=AsyncMock(), dry_run=False, ) @@ -109,6 +111,7 @@ async def test_executor_dry_run_does_not_call_exchange(): risk_manager=risk_manager, broker=broker, db=db, + notifier=AsyncMock(), dry_run=True, ) diff --git a/services/portfolio-manager/src/portfolio_manager/main.py b/services/portfolio-manager/src/portfolio_manager/main.py index cb7e6a8..56624f7 100644 --- a/services/portfolio-manager/src/portfolio_manager/main.py +++ b/services/portfolio-manager/src/portfolio_manager/main.py @@ -1,26 +1,35 @@ """Portfolio Manager Service entry point.""" import asyncio -import logging from shared.broker import RedisBroker from shared.events import Event, OrderEvent +from shared.healthcheck import HealthCheckServer +from shared.logging import setup_logging +from shared.metrics import ServiceMetrics +from shared.notifier import TelegramNotifier from portfolio_manager.config import PortfolioConfig from portfolio_manager.portfolio import PortfolioTracker -logging.basicConfig(level=logging.INFO) -logger = logging.getLogger(__name__) - ORDERS_STREAM = "orders" async def run() -> None: config = PortfolioConfig() + log = setup_logging("portfolio-manager", config.log_level, config.log_format) + metrics = ServiceMetrics("portfolio_manager") + notifier = TelegramNotifier(bot_token=config.telegram_bot_token, chat_id=config.telegram_chat_id) + broker = RedisBroker(config.redis_url) tracker = PortfolioTracker() + health = HealthCheckServer("portfolio-manager", port=config.health_port + 3) + health.register_check("redis", broker.ping) + await health.start() + metrics.service_up.labels(service="portfolio-manager").set(1) + last_id = "$" - logger.info("Portfolio manager started, listening on stream=%s", ORDERS_STREAM) + log.info("service_started", stream=ORDERS_STREAM) try: while True: @@ -31,20 +40,28 @@ async def run() -> None: if isinstance(event, OrderEvent): order = event.data tracker.apply_order(order) - logger.info( - "Applied order symbol=%s side=%s qty=%s price=%s", - order.symbol, - order.side, - order.quantity, - order.price, + log.info( + "order_applied", + symbol=order.symbol, + side=str(order.side), + quantity=str(order.quantity), + price=str(order.price), ) positions = tracker.get_all_positions() - logger.info("Current positions count=%d", len(positions)) - except Exception: - logger.exception("Failed to process message: %s", msg) + log.info("positions_updated", count=len(positions)) + metrics.events_processed.labels(service="portfolio-manager", event_type="order").inc() + except Exception as exc: + log.exception("message_processing_failed", error=str(exc)) + metrics.errors_total.labels(service="portfolio-manager", error_type="processing").inc() # Update last_id to the latest processed message id if broker returns ids # Since broker.read returns parsed payloads (not ids), we use "$" to get new msgs + except Exception as exc: + log.error("fatal_error", error=str(exc)) + await notifier.send_error(str(exc), "portfolio-manager") + raise finally: + metrics.service_up.labels(service="portfolio-manager").set(0) + await notifier.close() await broker.close() diff --git a/services/strategy-engine/src/strategy_engine/main.py b/services/strategy-engine/src/strategy_engine/main.py index 83bb867..2e3c4ac 100644 --- a/services/strategy-engine/src/strategy_engine/main.py +++ b/services/strategy-engine/src/strategy_engine/main.py @@ -1,23 +1,27 @@ """Strategy Engine Service entry point.""" import asyncio -import logging from pathlib import Path from shared.broker import RedisBroker +from shared.healthcheck import HealthCheckServer +from shared.logging import setup_logging +from shared.metrics import ServiceMetrics +from shared.notifier import TelegramNotifier from strategy_engine.config import StrategyConfig from strategy_engine.engine import StrategyEngine from strategy_engine.plugin_loader import load_strategies -logging.basicConfig(level=logging.INFO) -logger = logging.getLogger(__name__) - # The strategies directory lives alongside the installed package STRATEGIES_DIR = Path(__file__).parent.parent.parent.parent / "strategies" async def run() -> None: config = StrategyConfig() + log = setup_logging("strategy-engine", config.log_level, config.log_format) + metrics = ServiceMetrics("strategy_engine") + notifier = TelegramNotifier(bot_token=config.telegram_bot_token, chat_id=config.telegram_chat_id) + broker = RedisBroker(config.redis_url) strategies_dir = STRATEGIES_DIR @@ -28,23 +32,35 @@ async def run() -> None: params = config.strategy_params.get(strategy.name, {}) strategy.configure(params) - logger.info( - "Loaded %d strategies: %s", - len(strategies), - [s.name for s in strategies], + log.info( + "strategies_loaded", + count=len(strategies), + names=[s.name for s in strategies], ) engine = StrategyEngine(broker=broker, strategies=strategies) + health = HealthCheckServer("strategy-engine", port=config.health_port + 1) + health.register_check("redis", broker.ping) + await health.start() + metrics.service_up.labels(service="strategy-engine").set(1) + try: for symbol in config.symbols: stream = f"candles.{symbol.replace('/', '_')}" last_id = "$" - logger.info("Starting engine loop for stream=%s", stream) + log.info("engine_loop_started", stream=stream) while True: last_id = await engine.process_once(stream, last_id) + metrics.events_processed.labels(service="strategy-engine", event_type="candle").inc() + except Exception as exc: + log.error("fatal_error", error=str(exc)) + await notifier.send_error(str(exc), "strategy-engine") + raise finally: + metrics.service_up.labels(service="strategy-engine").set(0) + await notifier.close() await broker.close() |
