summaryrefslogtreecommitdiff
path: root/services/data-collector/src/data_collector/binance_ws.py
blob: e25e7a6178d42eeb15b8c5f7d9d44bed3aa3d384 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
"""Binance WebSocket client for real-time kline/candle data.

NOTE: This module is Binance-specific (uses Binance WebSocket URL and message format).
Multi-exchange WebSocket support would require exchange-specific implementations.
"""

import asyncio
import json
import logging
from datetime import datetime, timezone
from decimal import Decimal
from typing import Callable, Awaitable

import websockets

from shared.models import Candle

logger = logging.getLogger(__name__)

BINANCE_WS_URL = "wss://stream.binance.com:9443/ws"
RECONNECT_DELAY = 5  # seconds


def _normalize_symbol(symbol: str) -> str:
    """Convert 'BTC/USDT' to 'BTCUSDT'."""
    return symbol.replace("/", "")


def _stream_name(symbol: str, timeframe: str) -> str:
    """Build Binance stream name, e.g. 'btcusdt@kline_1m'."""
    return f"{_normalize_symbol(symbol).lower()}@kline_{timeframe}"


class BinanceWebSocket:
    """Connects to Binance WebSocket streams and emits closed candles."""

    def __init__(
        self,
        symbols: list[str],
        timeframe: str,
        on_candle: Callable[[Candle], Awaitable[None]],
    ) -> None:
        self._symbols = symbols
        self._timeframe = timeframe
        self._on_candle = on_candle
        self._running = False

    def _build_subscribe_message(self) -> dict:
        streams = [_stream_name(s, self._timeframe) for s in self._symbols]
        return {
            "method": "SUBSCRIBE",
            "params": streams,
            "id": 1,
        }

    def _parse_candle(self, message: dict) -> Candle | None:
        """Parse a kline WebSocket message into a Candle, or None if not closed."""
        k = message.get("k")
        if k is None:
            return None
        if not k.get("x"):  # only closed candles
            return None

        symbol = k["s"]  # already normalized, e.g. 'BTCUSDT'
        open_time = datetime.fromtimestamp(k["t"] / 1000, tz=timezone.utc)
        return Candle(
            symbol=symbol,
            timeframe=self._timeframe,
            open_time=open_time,
            open=Decimal(k["o"]),
            high=Decimal(k["h"]),
            low=Decimal(k["l"]),
            close=Decimal(k["c"]),
            volume=Decimal(k["v"]),
        )

    async def _run_once(self) -> None:
        """Single connection attempt; processes messages until disconnected."""
        async with websockets.connect(BINANCE_WS_URL) as ws:
            subscribe_msg = self._build_subscribe_message()
            await ws.send(json.dumps(subscribe_msg))
            logger.info("Subscribed to Binance streams: %s", subscribe_msg["params"])

            async for raw in ws:
                if not self._running:
                    break
                try:
                    message = json.loads(raw)
                    candle = self._parse_candle(message)
                    if candle is not None:
                        await self._on_candle(candle)
                except Exception:
                    logger.exception("Error processing WebSocket message: %s", raw)

    async def start(self) -> None:
        """Connect to Binance WebSocket and process messages, auto-reconnecting."""
        self._running = True
        while self._running:
            try:
                await self._run_once()
            except Exception:
                if not self._running:
                    break
                logger.warning("WebSocket disconnected. Reconnecting in %ds…", RECONNECT_DELAY)
                await asyncio.sleep(RECONNECT_DELAY)

    def stop(self) -> None:
        """Signal the WebSocket loop to stop after the current message."""
        self._running = False