diff options
Diffstat (limited to 'services/data-collector/src/data_collector/binance_ws.py')
| -rw-r--r-- | services/data-collector/src/data_collector/binance_ws.py | 106 |
1 files changed, 106 insertions, 0 deletions
diff --git a/services/data-collector/src/data_collector/binance_ws.py b/services/data-collector/src/data_collector/binance_ws.py new file mode 100644 index 0000000..7a4bad2 --- /dev/null +++ b/services/data-collector/src/data_collector/binance_ws.py @@ -0,0 +1,106 @@ +"""Binance WebSocket client for real-time kline/candle data.""" +import asyncio +import json +import logging +from datetime import datetime, timezone +from decimal import Decimal +from typing import Callable, Awaitable + +import websockets + +from shared.models import Candle + +logger = logging.getLogger(__name__) + +BINANCE_WS_URL = "wss://stream.binance.com:9443/ws" +RECONNECT_DELAY = 5 # seconds + + +def _normalize_symbol(symbol: str) -> str: + """Convert 'BTC/USDT' to 'BTCUSDT'.""" + return symbol.replace("/", "") + + +def _stream_name(symbol: str, timeframe: str) -> str: + """Build Binance stream name, e.g. 'btcusdt@kline_1m'.""" + return f"{_normalize_symbol(symbol).lower()}@kline_{timeframe}" + + +class BinanceWebSocket: + """Connects to Binance WebSocket streams and emits closed candles.""" + + def __init__( + self, + symbols: list[str], + timeframe: str, + on_candle: Callable[[Candle], Awaitable[None]], + ) -> None: + self._symbols = symbols + self._timeframe = timeframe + self._on_candle = on_candle + self._running = False + + def _build_subscribe_message(self) -> dict: + streams = [_stream_name(s, self._timeframe) for s in self._symbols] + return { + "method": "SUBSCRIBE", + "params": streams, + "id": 1, + } + + def _parse_candle(self, message: dict) -> Candle | None: + """Parse a kline WebSocket message into a Candle, or None if not closed.""" + k = message.get("k") + if k is None: + return None + if not k.get("x"): # only closed candles + return None + + symbol = k["s"] # already normalized, e.g. 'BTCUSDT' + open_time = datetime.fromtimestamp(k["t"] / 1000, tz=timezone.utc) + return Candle( + symbol=symbol, + timeframe=self._timeframe, + open_time=open_time, + open=Decimal(k["o"]), + high=Decimal(k["h"]), + low=Decimal(k["l"]), + close=Decimal(k["c"]), + volume=Decimal(k["v"]), + ) + + async def _run_once(self) -> None: + """Single connection attempt; processes messages until disconnected.""" + async with websockets.connect(BINANCE_WS_URL) as ws: + subscribe_msg = self._build_subscribe_message() + await ws.send(json.dumps(subscribe_msg)) + logger.info("Subscribed to Binance streams: %s", subscribe_msg["params"]) + + async for raw in ws: + if not self._running: + break + try: + message = json.loads(raw) + candle = self._parse_candle(message) + if candle is not None: + await self._on_candle(candle) + except Exception: + logger.exception("Error processing WebSocket message: %s", raw) + + async def start(self) -> None: + """Connect to Binance WebSocket and process messages, auto-reconnecting.""" + self._running = True + while self._running: + try: + await self._run_once() + except Exception: + if not self._running: + break + logger.warning( + "WebSocket disconnected. Reconnecting in %ds…", RECONNECT_DELAY + ) + await asyncio.sleep(RECONNECT_DELAY) + + def stop(self) -> None: + """Signal the WebSocket loop to stop after the current message.""" + self._running = False |
