summaryrefslogtreecommitdiff
path: root/services/data-collector/src/data_collector
diff options
context:
space:
mode:
authorTheSiahxyz <164138827+TheSiahxyz@users.noreply.github.com>2026-04-01 16:12:40 +0900
committerTheSiahxyz <164138827+TheSiahxyz@users.noreply.github.com>2026-04-01 16:12:40 +0900
commitc3560fdd637c4f034cac4f7371aeed65d018bc91 (patch)
treec50d3318c0c37ca634012b7113a80c0d6e356844 /services/data-collector/src/data_collector
parentec792a3d379c911165038d8da5b339df6ca3fccd (diff)
feat(services): integrate structlog, healthcheck, metrics, and Telegram
Replace logging.basicConfig/getLogger with structlog setup_logging in all four service entry points. Add HealthCheckServer, ServiceMetrics, and TelegramNotifier initialization to each service. Update OrderExecutor to accept a notifier parameter and send order notifications. Add RedisBroker.ping() for health checks. Update executor tests with notifier=AsyncMock().
Diffstat (limited to 'services/data-collector/src/data_collector')
-rw-r--r--services/data-collector/src/data_collector/main.py33
1 files changed, 24 insertions, 9 deletions
diff --git a/services/data-collector/src/data_collector/main.py b/services/data-collector/src/data_collector/main.py
index adf1e96..c778503 100644
--- a/services/data-collector/src/data_collector/main.py
+++ b/services/data-collector/src/data_collector/main.py
@@ -1,21 +1,24 @@
"""Data Collector Service entry point."""
import asyncio
-import logging
from shared.broker import RedisBroker
from shared.db import Database
+from shared.healthcheck import HealthCheckServer
+from shared.logging import setup_logging
+from shared.metrics import ServiceMetrics
+from shared.notifier import TelegramNotifier
from data_collector.binance_ws import BinanceWebSocket
from data_collector.config import CollectorConfig
from data_collector.storage import CandleStorage
-logging.basicConfig(level=logging.INFO)
-logger = logging.getLogger(__name__)
-
async def run() -> None:
"""Initialise all components and start the WebSocket collector."""
config = CollectorConfig()
+ log = setup_logging("data-collector", config.log_level, config.log_format)
+ metrics = ServiceMetrics("data_collector")
+ notifier = TelegramNotifier(bot_token=config.telegram_bot_token, chat_id=config.telegram_chat_id)
db = Database(config.database_url)
await db.connect()
@@ -25,8 +28,9 @@ async def run() -> None:
storage = CandleStorage(db=db, broker=broker)
async def on_candle(candle):
- logger.info("Candle received: %s %s %s", candle.symbol, candle.timeframe, candle.open_time)
+ log.info("candle_received", symbol=candle.symbol, timeframe=candle.timeframe, open_time=str(candle.open_time))
await storage.store(candle)
+ metrics.events_processed.labels(service="data-collector", event_type="candle").inc()
# Use the first configured timeframe for the WebSocket subscription.
timeframe = config.timeframes[0] if config.timeframes else "1m"
@@ -37,15 +41,26 @@ async def run() -> None:
on_candle=on_candle,
)
- logger.info(
- "Starting data collector for symbols=%s timeframe=%s",
- config.symbols,
- timeframe,
+ health = HealthCheckServer("data-collector", port=config.health_port)
+ health.register_check("redis", broker.ping)
+ await health.start()
+ metrics.service_up.labels(service="data-collector").set(1)
+
+ log.info(
+ "service_started",
+ symbols=config.symbols,
+ timeframe=timeframe,
)
try:
await ws.start()
+ except Exception as exc:
+ log.error("fatal_error", error=str(exc))
+ await notifier.send_error(str(exc), "data-collector")
+ raise
finally:
+ metrics.service_up.labels(service="data-collector").set(0)
+ await notifier.close()
await broker.close()
await db.close()