diff options
Diffstat (limited to 'services/data-collector/src')
4 files changed, 16 insertions, 7 deletions
diff --git a/services/data-collector/src/data_collector/binance_rest.py b/services/data-collector/src/data_collector/binance_rest.py index af0eb77..eaf4e30 100644 --- a/services/data-collector/src/data_collector/binance_rest.py +++ b/services/data-collector/src/data_collector/binance_rest.py @@ -1,4 +1,5 @@ """Binance REST API helpers for fetching historical candle data.""" + from datetime import datetime, timezone from decimal import Decimal @@ -35,7 +36,7 @@ async def fetch_historical_candles( candles: list[Candle] = [] for row in rows: - ts_ms, o, h, l, c, v = row + ts_ms, o, h, low, c, v = row open_time = datetime.fromtimestamp(ts_ms / 1000, tz=timezone.utc) candles.append( Candle( @@ -44,7 +45,7 @@ async def fetch_historical_candles( open_time=open_time, open=Decimal(str(o)), high=Decimal(str(h)), - low=Decimal(str(l)), + low=Decimal(str(low)), close=Decimal(str(c)), volume=Decimal(str(v)), ) diff --git a/services/data-collector/src/data_collector/binance_ws.py b/services/data-collector/src/data_collector/binance_ws.py index 7a4bad2..a1c81d6 100644 --- a/services/data-collector/src/data_collector/binance_ws.py +++ b/services/data-collector/src/data_collector/binance_ws.py @@ -1,4 +1,5 @@ """Binance WebSocket client for real-time kline/candle data.""" + import asyncio import json import logging @@ -96,9 +97,7 @@ class BinanceWebSocket: except Exception: if not self._running: break - logger.warning( - "WebSocket disconnected. Reconnecting in %ds…", RECONNECT_DELAY - ) + logger.warning("WebSocket disconnected. Reconnecting in %ds…", RECONNECT_DELAY) await asyncio.sleep(RECONNECT_DELAY) def stop(self) -> None: diff --git a/services/data-collector/src/data_collector/main.py b/services/data-collector/src/data_collector/main.py index c778503..170e8b1 100644 --- a/services/data-collector/src/data_collector/main.py +++ b/services/data-collector/src/data_collector/main.py @@ -1,4 +1,5 @@ """Data Collector Service entry point.""" + import asyncio from shared.broker import RedisBroker @@ -18,7 +19,9 @@ async def run() -> None: config = CollectorConfig() log = setup_logging("data-collector", config.log_level, config.log_format) metrics = ServiceMetrics("data_collector") - notifier = TelegramNotifier(bot_token=config.telegram_bot_token, chat_id=config.telegram_chat_id) + notifier = TelegramNotifier( + bot_token=config.telegram_bot_token, chat_id=config.telegram_chat_id + ) db = Database(config.database_url) await db.connect() @@ -28,7 +31,12 @@ async def run() -> None: storage = CandleStorage(db=db, broker=broker) async def on_candle(candle): - log.info("candle_received", symbol=candle.symbol, timeframe=candle.timeframe, open_time=str(candle.open_time)) + log.info( + "candle_received", + symbol=candle.symbol, + timeframe=candle.timeframe, + open_time=str(candle.open_time), + ) await storage.store(candle) metrics.events_processed.labels(service="data-collector", event_type="candle").inc() diff --git a/services/data-collector/src/data_collector/storage.py b/services/data-collector/src/data_collector/storage.py index 1e40b82..aeeaaed 100644 --- a/services/data-collector/src/data_collector/storage.py +++ b/services/data-collector/src/data_collector/storage.py @@ -1,4 +1,5 @@ """Candle storage: persists to DB and publishes to Redis.""" + from shared.events import CandleEvent from shared.models import Candle |
