summaryrefslogtreecommitdiff
path: root/services/data-collector/src/data_collector
diff options
context:
space:
mode:
Diffstat (limited to 'services/data-collector/src/data_collector')
-rw-r--r--services/data-collector/src/data_collector/binance_rest.py5
-rw-r--r--services/data-collector/src/data_collector/binance_ws.py5
-rw-r--r--services/data-collector/src/data_collector/main.py12
-rw-r--r--services/data-collector/src/data_collector/storage.py1
4 files changed, 16 insertions, 7 deletions
diff --git a/services/data-collector/src/data_collector/binance_rest.py b/services/data-collector/src/data_collector/binance_rest.py
index af0eb77..eaf4e30 100644
--- a/services/data-collector/src/data_collector/binance_rest.py
+++ b/services/data-collector/src/data_collector/binance_rest.py
@@ -1,4 +1,5 @@
"""Binance REST API helpers for fetching historical candle data."""
+
from datetime import datetime, timezone
from decimal import Decimal
@@ -35,7 +36,7 @@ async def fetch_historical_candles(
candles: list[Candle] = []
for row in rows:
- ts_ms, o, h, l, c, v = row
+ ts_ms, o, h, low, c, v = row
open_time = datetime.fromtimestamp(ts_ms / 1000, tz=timezone.utc)
candles.append(
Candle(
@@ -44,7 +45,7 @@ async def fetch_historical_candles(
open_time=open_time,
open=Decimal(str(o)),
high=Decimal(str(h)),
- low=Decimal(str(l)),
+ low=Decimal(str(low)),
close=Decimal(str(c)),
volume=Decimal(str(v)),
)
diff --git a/services/data-collector/src/data_collector/binance_ws.py b/services/data-collector/src/data_collector/binance_ws.py
index 7a4bad2..a1c81d6 100644
--- a/services/data-collector/src/data_collector/binance_ws.py
+++ b/services/data-collector/src/data_collector/binance_ws.py
@@ -1,4 +1,5 @@
"""Binance WebSocket client for real-time kline/candle data."""
+
import asyncio
import json
import logging
@@ -96,9 +97,7 @@ class BinanceWebSocket:
except Exception:
if not self._running:
break
- logger.warning(
- "WebSocket disconnected. Reconnecting in %ds…", RECONNECT_DELAY
- )
+ logger.warning("WebSocket disconnected. Reconnecting in %ds…", RECONNECT_DELAY)
await asyncio.sleep(RECONNECT_DELAY)
def stop(self) -> None:
diff --git a/services/data-collector/src/data_collector/main.py b/services/data-collector/src/data_collector/main.py
index c778503..170e8b1 100644
--- a/services/data-collector/src/data_collector/main.py
+++ b/services/data-collector/src/data_collector/main.py
@@ -1,4 +1,5 @@
"""Data Collector Service entry point."""
+
import asyncio
from shared.broker import RedisBroker
@@ -18,7 +19,9 @@ async def run() -> None:
config = CollectorConfig()
log = setup_logging("data-collector", config.log_level, config.log_format)
metrics = ServiceMetrics("data_collector")
- notifier = TelegramNotifier(bot_token=config.telegram_bot_token, chat_id=config.telegram_chat_id)
+ notifier = TelegramNotifier(
+ bot_token=config.telegram_bot_token, chat_id=config.telegram_chat_id
+ )
db = Database(config.database_url)
await db.connect()
@@ -28,7 +31,12 @@ async def run() -> None:
storage = CandleStorage(db=db, broker=broker)
async def on_candle(candle):
- log.info("candle_received", symbol=candle.symbol, timeframe=candle.timeframe, open_time=str(candle.open_time))
+ log.info(
+ "candle_received",
+ symbol=candle.symbol,
+ timeframe=candle.timeframe,
+ open_time=str(candle.open_time),
+ )
await storage.store(candle)
metrics.events_processed.labels(service="data-collector", event_type="candle").inc()
diff --git a/services/data-collector/src/data_collector/storage.py b/services/data-collector/src/data_collector/storage.py
index 1e40b82..aeeaaed 100644
--- a/services/data-collector/src/data_collector/storage.py
+++ b/services/data-collector/src/data_collector/storage.py
@@ -1,4 +1,5 @@
"""Candle storage: persists to DB and publishes to Redis."""
+
from shared.events import CandleEvent
from shared.models import Candle