1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
|
"""Binance WebSocket client for real-time kline/candle data."""
import asyncio
import json
import logging
from datetime import datetime, timezone
from decimal import Decimal
from typing import Callable, Awaitable
import websockets
from shared.models import Candle
logger = logging.getLogger(__name__)
BINANCE_WS_URL = "wss://stream.binance.com:9443/ws"
RECONNECT_DELAY = 5 # seconds
def _normalize_symbol(symbol: str) -> str:
"""Convert 'BTC/USDT' to 'BTCUSDT'."""
return symbol.replace("/", "")
def _stream_name(symbol: str, timeframe: str) -> str:
"""Build Binance stream name, e.g. 'btcusdt@kline_1m'."""
return f"{_normalize_symbol(symbol).lower()}@kline_{timeframe}"
class BinanceWebSocket:
"""Connects to Binance WebSocket streams and emits closed candles."""
def __init__(
self,
symbols: list[str],
timeframe: str,
on_candle: Callable[[Candle], Awaitable[None]],
) -> None:
self._symbols = symbols
self._timeframe = timeframe
self._on_candle = on_candle
self._running = False
def _build_subscribe_message(self) -> dict:
streams = [_stream_name(s, self._timeframe) for s in self._symbols]
return {
"method": "SUBSCRIBE",
"params": streams,
"id": 1,
}
def _parse_candle(self, message: dict) -> Candle | None:
"""Parse a kline WebSocket message into a Candle, or None if not closed."""
k = message.get("k")
if k is None:
return None
if not k.get("x"): # only closed candles
return None
symbol = k["s"] # already normalized, e.g. 'BTCUSDT'
open_time = datetime.fromtimestamp(k["t"] / 1000, tz=timezone.utc)
return Candle(
symbol=symbol,
timeframe=self._timeframe,
open_time=open_time,
open=Decimal(k["o"]),
high=Decimal(k["h"]),
low=Decimal(k["l"]),
close=Decimal(k["c"]),
volume=Decimal(k["v"]),
)
async def _run_once(self) -> None:
"""Single connection attempt; processes messages until disconnected."""
async with websockets.connect(BINANCE_WS_URL) as ws:
subscribe_msg = self._build_subscribe_message()
await ws.send(json.dumps(subscribe_msg))
logger.info("Subscribed to Binance streams: %s", subscribe_msg["params"])
async for raw in ws:
if not self._running:
break
try:
message = json.loads(raw)
candle = self._parse_candle(message)
if candle is not None:
await self._on_candle(candle)
except Exception:
logger.exception("Error processing WebSocket message: %s", raw)
async def start(self) -> None:
"""Connect to Binance WebSocket and process messages, auto-reconnecting."""
self._running = True
while self._running:
try:
await self._run_once()
except Exception:
if not self._running:
break
logger.warning(
"WebSocket disconnected. Reconnecting in %ds…", RECONNECT_DELAY
)
await asyncio.sleep(RECONNECT_DELAY)
def stop(self) -> None:
"""Signal the WebSocket loop to stop after the current message."""
self._running = False
|