"""Binance WebSocket client for real-time kline/candle data.""" import asyncio import json import logging from datetime import datetime, timezone from decimal import Decimal from typing import Callable, Awaitable import websockets from shared.models import Candle logger = logging.getLogger(__name__) BINANCE_WS_URL = "wss://stream.binance.com:9443/ws" RECONNECT_DELAY = 5 # seconds def _normalize_symbol(symbol: str) -> str: """Convert 'BTC/USDT' to 'BTCUSDT'.""" return symbol.replace("/", "") def _stream_name(symbol: str, timeframe: str) -> str: """Build Binance stream name, e.g. 'btcusdt@kline_1m'.""" return f"{_normalize_symbol(symbol).lower()}@kline_{timeframe}" class BinanceWebSocket: """Connects to Binance WebSocket streams and emits closed candles.""" def __init__( self, symbols: list[str], timeframe: str, on_candle: Callable[[Candle], Awaitable[None]], ) -> None: self._symbols = symbols self._timeframe = timeframe self._on_candle = on_candle self._running = False def _build_subscribe_message(self) -> dict: streams = [_stream_name(s, self._timeframe) for s in self._symbols] return { "method": "SUBSCRIBE", "params": streams, "id": 1, } def _parse_candle(self, message: dict) -> Candle | None: """Parse a kline WebSocket message into a Candle, or None if not closed.""" k = message.get("k") if k is None: return None if not k.get("x"): # only closed candles return None symbol = k["s"] # already normalized, e.g. 'BTCUSDT' open_time = datetime.fromtimestamp(k["t"] / 1000, tz=timezone.utc) return Candle( symbol=symbol, timeframe=self._timeframe, open_time=open_time, open=Decimal(k["o"]), high=Decimal(k["h"]), low=Decimal(k["l"]), close=Decimal(k["c"]), volume=Decimal(k["v"]), ) async def _run_once(self) -> None: """Single connection attempt; processes messages until disconnected.""" async with websockets.connect(BINANCE_WS_URL) as ws: subscribe_msg = self._build_subscribe_message() await ws.send(json.dumps(subscribe_msg)) logger.info("Subscribed to Binance streams: %s", subscribe_msg["params"]) async for raw in ws: if not self._running: break try: message = json.loads(raw) candle = self._parse_candle(message) if candle is not None: await self._on_candle(candle) except Exception: logger.exception("Error processing WebSocket message: %s", raw) async def start(self) -> None: """Connect to Binance WebSocket and process messages, auto-reconnecting.""" self._running = True while self._running: try: await self._run_once() except Exception: if not self._running: break logger.warning( "WebSocket disconnected. Reconnecting in %ds…", RECONNECT_DELAY ) await asyncio.sleep(RECONNECT_DELAY) def stop(self) -> None: """Signal the WebSocket loop to stop after the current message.""" self._running = False