diff options
Diffstat (limited to 'services/data-collector/src')
3 files changed, 0 insertions, 197 deletions
diff --git a/services/data-collector/src/data_collector/binance_rest.py b/services/data-collector/src/data_collector/binance_rest.py deleted file mode 100644 index eaf4e30..0000000 --- a/services/data-collector/src/data_collector/binance_rest.py +++ /dev/null @@ -1,54 +0,0 @@ -"""Binance REST API helpers for fetching historical candle data.""" - -from datetime import datetime, timezone -from decimal import Decimal - -from shared.models import Candle - - -def _normalize_symbol(symbol: str) -> str: - """Convert 'BTC/USDT' to 'BTCUSDT'.""" - return symbol.replace("/", "") - - -async def fetch_historical_candles( - exchange, - symbol: str, - timeframe: str, - since: int, - limit: int = 500, -) -> list[Candle]: - """Fetch historical OHLCV candles from the exchange and return Candle models. - - Args: - exchange: An async ccxt exchange instance. - symbol: Market symbol, e.g. 'BTC/USDT'. - timeframe: Candle timeframe, e.g. '1m'. - since: Start timestamp in milliseconds. - limit: Maximum number of candles to fetch. - - Returns: - A list of Candle model instances. - """ - rows = await exchange.fetch_ohlcv(symbol, timeframe, since=since, limit=limit) - - normalized = _normalize_symbol(symbol) - candles: list[Candle] = [] - - for row in rows: - ts_ms, o, h, low, c, v = row - open_time = datetime.fromtimestamp(ts_ms / 1000, tz=timezone.utc) - candles.append( - Candle( - symbol=normalized, - timeframe=timeframe, - open_time=open_time, - open=Decimal(str(o)), - high=Decimal(str(h)), - low=Decimal(str(low)), - close=Decimal(str(c)), - volume=Decimal(str(v)), - ) - ) - - return candles diff --git a/services/data-collector/src/data_collector/binance_ws.py b/services/data-collector/src/data_collector/binance_ws.py deleted file mode 100644 index e25e7a6..0000000 --- a/services/data-collector/src/data_collector/binance_ws.py +++ /dev/null @@ -1,109 +0,0 @@ -"""Binance WebSocket client for real-time kline/candle data. - -NOTE: This module is Binance-specific (uses Binance WebSocket URL and message format). -Multi-exchange WebSocket support would require exchange-specific implementations. -""" - -import asyncio -import json -import logging -from datetime import datetime, timezone -from decimal import Decimal -from typing import Callable, Awaitable - -import websockets - -from shared.models import Candle - -logger = logging.getLogger(__name__) - -BINANCE_WS_URL = "wss://stream.binance.com:9443/ws" -RECONNECT_DELAY = 5 # seconds - - -def _normalize_symbol(symbol: str) -> str: - """Convert 'BTC/USDT' to 'BTCUSDT'.""" - return symbol.replace("/", "") - - -def _stream_name(symbol: str, timeframe: str) -> str: - """Build Binance stream name, e.g. 'btcusdt@kline_1m'.""" - return f"{_normalize_symbol(symbol).lower()}@kline_{timeframe}" - - -class BinanceWebSocket: - """Connects to Binance WebSocket streams and emits closed candles.""" - - def __init__( - self, - symbols: list[str], - timeframe: str, - on_candle: Callable[[Candle], Awaitable[None]], - ) -> None: - self._symbols = symbols - self._timeframe = timeframe - self._on_candle = on_candle - self._running = False - - def _build_subscribe_message(self) -> dict: - streams = [_stream_name(s, self._timeframe) for s in self._symbols] - return { - "method": "SUBSCRIBE", - "params": streams, - "id": 1, - } - - def _parse_candle(self, message: dict) -> Candle | None: - """Parse a kline WebSocket message into a Candle, or None if not closed.""" - k = message.get("k") - if k is None: - return None - if not k.get("x"): # only closed candles - return None - - symbol = k["s"] # already normalized, e.g. 'BTCUSDT' - open_time = datetime.fromtimestamp(k["t"] / 1000, tz=timezone.utc) - return Candle( - symbol=symbol, - timeframe=self._timeframe, - open_time=open_time, - open=Decimal(k["o"]), - high=Decimal(k["h"]), - low=Decimal(k["l"]), - close=Decimal(k["c"]), - volume=Decimal(k["v"]), - ) - - async def _run_once(self) -> None: - """Single connection attempt; processes messages until disconnected.""" - async with websockets.connect(BINANCE_WS_URL) as ws: - subscribe_msg = self._build_subscribe_message() - await ws.send(json.dumps(subscribe_msg)) - logger.info("Subscribed to Binance streams: %s", subscribe_msg["params"]) - - async for raw in ws: - if not self._running: - break - try: - message = json.loads(raw) - candle = self._parse_candle(message) - if candle is not None: - await self._on_candle(candle) - except Exception: - logger.exception("Error processing WebSocket message: %s", raw) - - async def start(self) -> None: - """Connect to Binance WebSocket and process messages, auto-reconnecting.""" - self._running = True - while self._running: - try: - await self._run_once() - except Exception: - if not self._running: - break - logger.warning("WebSocket disconnected. Reconnecting in %ds…", RECONNECT_DELAY) - await asyncio.sleep(RECONNECT_DELAY) - - def stop(self) -> None: - """Signal the WebSocket loop to stop after the current message.""" - self._running = False diff --git a/services/data-collector/src/data_collector/ws_factory.py b/services/data-collector/src/data_collector/ws_factory.py deleted file mode 100644 index e068399..0000000 --- a/services/data-collector/src/data_collector/ws_factory.py +++ /dev/null @@ -1,34 +0,0 @@ -"""WebSocket factory for exchange-specific connections.""" - -import logging - -from data_collector.binance_ws import BinanceWebSocket - -logger = logging.getLogger(__name__) - -# Supported exchanges for WebSocket streaming -SUPPORTED_WS = {"binance": BinanceWebSocket} - - -def create_websocket(exchange_id: str, **kwargs): - """Create an exchange-specific WebSocket handler. - - Args: - exchange_id: Exchange identifier (e.g. 'binance') - **kwargs: Passed to the WebSocket constructor (symbols, timeframe, on_candle) - - Returns: - WebSocket handler instance - - Raises: - ValueError: If exchange is not supported for WebSocket streaming - """ - ws_cls = SUPPORTED_WS.get(exchange_id) - if ws_cls is None: - supported = ", ".join(sorted(SUPPORTED_WS.keys())) - raise ValueError( - f"WebSocket streaming not supported for '{exchange_id}'. " - f"Supported: {supported}. " - f"Use REST polling as fallback for unsupported exchanges." - ) - return ws_cls(**kwargs) |
