diff options
| author | TheSiahxyz <164138827+TheSiahxyz@users.noreply.github.com> | 2026-04-01 15:56:35 +0900 |
|---|---|---|
| committer | TheSiahxyz <164138827+TheSiahxyz@users.noreply.github.com> | 2026-04-01 15:56:35 +0900 |
| commit | 33b14aaa2344b0fd95d1629627c3d135b24ae102 (patch) | |
| tree | 90b214758bc3b076baa7711226a1a1be6268e72e /services/data-collector/src/data_collector/binance_ws.py | |
| parent | 9360f1a800aa29b40399a2f3bfbfcf215a04e279 (diff) | |
feat: initial trading platform implementation
Binance spot crypto trading platform with microservices architecture:
- shared: Pydantic models, Redis Streams broker, asyncpg DB layer
- data-collector: Binance WebSocket/REST market data collection
- strategy-engine: Plugin-based strategy execution (RSI, Grid)
- order-executor: Order execution with risk management
- portfolio-manager: Position tracking and PnL calculation
- backtester: Historical strategy testing with simulator
- cli: Click-based CLI for all operations
- Docker Compose orchestration with Redis and PostgreSQL
- 24 test files covering all modules
Diffstat (limited to 'services/data-collector/src/data_collector/binance_ws.py')
| -rw-r--r-- | services/data-collector/src/data_collector/binance_ws.py | 106 |
1 files changed, 106 insertions, 0 deletions
diff --git a/services/data-collector/src/data_collector/binance_ws.py b/services/data-collector/src/data_collector/binance_ws.py new file mode 100644 index 0000000..7a4bad2 --- /dev/null +++ b/services/data-collector/src/data_collector/binance_ws.py @@ -0,0 +1,106 @@ +"""Binance WebSocket client for real-time kline/candle data.""" +import asyncio +import json +import logging +from datetime import datetime, timezone +from decimal import Decimal +from typing import Callable, Awaitable + +import websockets + +from shared.models import Candle + +logger = logging.getLogger(__name__) + +BINANCE_WS_URL = "wss://stream.binance.com:9443/ws" +RECONNECT_DELAY = 5 # seconds + + +def _normalize_symbol(symbol: str) -> str: + """Convert 'BTC/USDT' to 'BTCUSDT'.""" + return symbol.replace("/", "") + + +def _stream_name(symbol: str, timeframe: str) -> str: + """Build Binance stream name, e.g. 'btcusdt@kline_1m'.""" + return f"{_normalize_symbol(symbol).lower()}@kline_{timeframe}" + + +class BinanceWebSocket: + """Connects to Binance WebSocket streams and emits closed candles.""" + + def __init__( + self, + symbols: list[str], + timeframe: str, + on_candle: Callable[[Candle], Awaitable[None]], + ) -> None: + self._symbols = symbols + self._timeframe = timeframe + self._on_candle = on_candle + self._running = False + + def _build_subscribe_message(self) -> dict: + streams = [_stream_name(s, self._timeframe) for s in self._symbols] + return { + "method": "SUBSCRIBE", + "params": streams, + "id": 1, + } + + def _parse_candle(self, message: dict) -> Candle | None: + """Parse a kline WebSocket message into a Candle, or None if not closed.""" + k = message.get("k") + if k is None: + return None + if not k.get("x"): # only closed candles + return None + + symbol = k["s"] # already normalized, e.g. 'BTCUSDT' + open_time = datetime.fromtimestamp(k["t"] / 1000, tz=timezone.utc) + return Candle( + symbol=symbol, + timeframe=self._timeframe, + open_time=open_time, + open=Decimal(k["o"]), + high=Decimal(k["h"]), + low=Decimal(k["l"]), + close=Decimal(k["c"]), + volume=Decimal(k["v"]), + ) + + async def _run_once(self) -> None: + """Single connection attempt; processes messages until disconnected.""" + async with websockets.connect(BINANCE_WS_URL) as ws: + subscribe_msg = self._build_subscribe_message() + await ws.send(json.dumps(subscribe_msg)) + logger.info("Subscribed to Binance streams: %s", subscribe_msg["params"]) + + async for raw in ws: + if not self._running: + break + try: + message = json.loads(raw) + candle = self._parse_candle(message) + if candle is not None: + await self._on_candle(candle) + except Exception: + logger.exception("Error processing WebSocket message: %s", raw) + + async def start(self) -> None: + """Connect to Binance WebSocket and process messages, auto-reconnecting.""" + self._running = True + while self._running: + try: + await self._run_once() + except Exception: + if not self._running: + break + logger.warning( + "WebSocket disconnected. Reconnecting in %ds…", RECONNECT_DELAY + ) + await asyncio.sleep(RECONNECT_DELAY) + + def stop(self) -> None: + """Signal the WebSocket loop to stop after the current message.""" + self._running = False |
