diff options
| author | TheSiahxyz <164138827+TheSiahxyz@users.noreply.github.com> | 2026-04-01 16:12:40 +0900 |
|---|---|---|
| committer | TheSiahxyz <164138827+TheSiahxyz@users.noreply.github.com> | 2026-04-01 16:12:40 +0900 |
| commit | c3560fdd637c4f034cac4f7371aeed65d018bc91 (patch) | |
| tree | c50d3318c0c37ca634012b7113a80c0d6e356844 /services/data-collector/src/data_collector/main.py | |
| parent | ec792a3d379c911165038d8da5b339df6ca3fccd (diff) | |
feat(services): integrate structlog, healthcheck, metrics, and Telegram
Replace logging.basicConfig/getLogger with structlog setup_logging in all
four service entry points. Add HealthCheckServer, ServiceMetrics, and
TelegramNotifier initialization to each service. Update OrderExecutor to
accept a notifier parameter and send order notifications. Add
RedisBroker.ping() for health checks. Update executor tests with
notifier=AsyncMock().
Diffstat (limited to 'services/data-collector/src/data_collector/main.py')
| -rw-r--r-- | services/data-collector/src/data_collector/main.py | 33 |
1 files changed, 24 insertions, 9 deletions
diff --git a/services/data-collector/src/data_collector/main.py b/services/data-collector/src/data_collector/main.py index adf1e96..c778503 100644 --- a/services/data-collector/src/data_collector/main.py +++ b/services/data-collector/src/data_collector/main.py @@ -1,21 +1,24 @@ """Data Collector Service entry point.""" import asyncio -import logging from shared.broker import RedisBroker from shared.db import Database +from shared.healthcheck import HealthCheckServer +from shared.logging import setup_logging +from shared.metrics import ServiceMetrics +from shared.notifier import TelegramNotifier from data_collector.binance_ws import BinanceWebSocket from data_collector.config import CollectorConfig from data_collector.storage import CandleStorage -logging.basicConfig(level=logging.INFO) -logger = logging.getLogger(__name__) - async def run() -> None: """Initialise all components and start the WebSocket collector.""" config = CollectorConfig() + log = setup_logging("data-collector", config.log_level, config.log_format) + metrics = ServiceMetrics("data_collector") + notifier = TelegramNotifier(bot_token=config.telegram_bot_token, chat_id=config.telegram_chat_id) db = Database(config.database_url) await db.connect() @@ -25,8 +28,9 @@ async def run() -> None: storage = CandleStorage(db=db, broker=broker) async def on_candle(candle): - logger.info("Candle received: %s %s %s", candle.symbol, candle.timeframe, candle.open_time) + log.info("candle_received", symbol=candle.symbol, timeframe=candle.timeframe, open_time=str(candle.open_time)) await storage.store(candle) + metrics.events_processed.labels(service="data-collector", event_type="candle").inc() # Use the first configured timeframe for the WebSocket subscription. timeframe = config.timeframes[0] if config.timeframes else "1m" @@ -37,15 +41,26 @@ async def run() -> None: on_candle=on_candle, ) - logger.info( - "Starting data collector for symbols=%s timeframe=%s", - config.symbols, - timeframe, + health = HealthCheckServer("data-collector", port=config.health_port) + health.register_check("redis", broker.ping) + await health.start() + metrics.service_up.labels(service="data-collector").set(1) + + log.info( + "service_started", + symbols=config.symbols, + timeframe=timeframe, ) try: await ws.start() + except Exception as exc: + log.error("fatal_error", error=str(exc)) + await notifier.send_error(str(exc), "data-collector") + raise finally: + metrics.service_up.labels(service="data-collector").set(0) + await notifier.close() await broker.close() await db.close() |
