summaryrefslogtreecommitdiff
path: root/services/data-collector/src/data_collector/binance_ws.py
diff options
context:
space:
mode:
authorTheSiahxyz <164138827+TheSiahxyz@users.noreply.github.com>2026-04-02 10:26:52 +0900
committerTheSiahxyz <164138827+TheSiahxyz@users.noreply.github.com>2026-04-02 10:26:52 +0900
commit53cadcf7e34f05f77082e84f0696b56bcbcbae36 (patch)
treee02650e10c4d5727bc1e32e27788c17327fa46f7 /services/data-collector/src/data_collector/binance_ws.py
parentf5521da2876a2c19afc24f370b3258f2be95f81a (diff)
refactor: remove all crypto/Binance code, update to US stock symbols
Diffstat (limited to 'services/data-collector/src/data_collector/binance_ws.py')
-rw-r--r--services/data-collector/src/data_collector/binance_ws.py109
1 files changed, 0 insertions, 109 deletions
diff --git a/services/data-collector/src/data_collector/binance_ws.py b/services/data-collector/src/data_collector/binance_ws.py
deleted file mode 100644
index e25e7a6..0000000
--- a/services/data-collector/src/data_collector/binance_ws.py
+++ /dev/null
@@ -1,109 +0,0 @@
-"""Binance WebSocket client for real-time kline/candle data.
-
-NOTE: This module is Binance-specific (uses Binance WebSocket URL and message format).
-Multi-exchange WebSocket support would require exchange-specific implementations.
-"""
-
-import asyncio
-import json
-import logging
-from datetime import datetime, timezone
-from decimal import Decimal
-from typing import Callable, Awaitable
-
-import websockets
-
-from shared.models import Candle
-
-logger = logging.getLogger(__name__)
-
-BINANCE_WS_URL = "wss://stream.binance.com:9443/ws"
-RECONNECT_DELAY = 5 # seconds
-
-
-def _normalize_symbol(symbol: str) -> str:
- """Convert 'BTC/USDT' to 'BTCUSDT'."""
- return symbol.replace("/", "")
-
-
-def _stream_name(symbol: str, timeframe: str) -> str:
- """Build Binance stream name, e.g. 'btcusdt@kline_1m'."""
- return f"{_normalize_symbol(symbol).lower()}@kline_{timeframe}"
-
-
-class BinanceWebSocket:
- """Connects to Binance WebSocket streams and emits closed candles."""
-
- def __init__(
- self,
- symbols: list[str],
- timeframe: str,
- on_candle: Callable[[Candle], Awaitable[None]],
- ) -> None:
- self._symbols = symbols
- self._timeframe = timeframe
- self._on_candle = on_candle
- self._running = False
-
- def _build_subscribe_message(self) -> dict:
- streams = [_stream_name(s, self._timeframe) for s in self._symbols]
- return {
- "method": "SUBSCRIBE",
- "params": streams,
- "id": 1,
- }
-
- def _parse_candle(self, message: dict) -> Candle | None:
- """Parse a kline WebSocket message into a Candle, or None if not closed."""
- k = message.get("k")
- if k is None:
- return None
- if not k.get("x"): # only closed candles
- return None
-
- symbol = k["s"] # already normalized, e.g. 'BTCUSDT'
- open_time = datetime.fromtimestamp(k["t"] / 1000, tz=timezone.utc)
- return Candle(
- symbol=symbol,
- timeframe=self._timeframe,
- open_time=open_time,
- open=Decimal(k["o"]),
- high=Decimal(k["h"]),
- low=Decimal(k["l"]),
- close=Decimal(k["c"]),
- volume=Decimal(k["v"]),
- )
-
- async def _run_once(self) -> None:
- """Single connection attempt; processes messages until disconnected."""
- async with websockets.connect(BINANCE_WS_URL) as ws:
- subscribe_msg = self._build_subscribe_message()
- await ws.send(json.dumps(subscribe_msg))
- logger.info("Subscribed to Binance streams: %s", subscribe_msg["params"])
-
- async for raw in ws:
- if not self._running:
- break
- try:
- message = json.loads(raw)
- candle = self._parse_candle(message)
- if candle is not None:
- await self._on_candle(candle)
- except Exception:
- logger.exception("Error processing WebSocket message: %s", raw)
-
- async def start(self) -> None:
- """Connect to Binance WebSocket and process messages, auto-reconnecting."""
- self._running = True
- while self._running:
- try:
- await self._run_once()
- except Exception:
- if not self._running:
- break
- logger.warning("WebSocket disconnected. Reconnecting in %ds…", RECONNECT_DELAY)
- await asyncio.sleep(RECONNECT_DELAY)
-
- def stop(self) -> None:
- """Signal the WebSocket loop to stop after the current message."""
- self._running = False