summaryrefslogtreecommitdiff
path: root/services/data-collector/src/data_collector/binance_ws.py
diff options
context:
space:
mode:
authorTheSiahxyz <164138827+TheSiahxyz@users.noreply.github.com>2026-04-01 15:56:35 +0900
committerTheSiahxyz <164138827+TheSiahxyz@users.noreply.github.com>2026-04-01 15:56:35 +0900
commit33b14aaa2344b0fd95d1629627c3d135b24ae102 (patch)
tree90b214758bc3b076baa7711226a1a1be6268e72e /services/data-collector/src/data_collector/binance_ws.py
parent9360f1a800aa29b40399a2f3bfbfcf215a04e279 (diff)
feat: initial trading platform implementation
Binance spot crypto trading platform with microservices architecture: - shared: Pydantic models, Redis Streams broker, asyncpg DB layer - data-collector: Binance WebSocket/REST market data collection - strategy-engine: Plugin-based strategy execution (RSI, Grid) - order-executor: Order execution with risk management - portfolio-manager: Position tracking and PnL calculation - backtester: Historical strategy testing with simulator - cli: Click-based CLI for all operations - Docker Compose orchestration with Redis and PostgreSQL - 24 test files covering all modules
Diffstat (limited to 'services/data-collector/src/data_collector/binance_ws.py')
-rw-r--r--services/data-collector/src/data_collector/binance_ws.py106
1 files changed, 106 insertions, 0 deletions
diff --git a/services/data-collector/src/data_collector/binance_ws.py b/services/data-collector/src/data_collector/binance_ws.py
new file mode 100644
index 0000000..7a4bad2
--- /dev/null
+++ b/services/data-collector/src/data_collector/binance_ws.py
@@ -0,0 +1,106 @@
+"""Binance WebSocket client for real-time kline/candle data."""
+import asyncio
+import json
+import logging
+from datetime import datetime, timezone
+from decimal import Decimal
+from typing import Callable, Awaitable
+
+import websockets
+
+from shared.models import Candle
+
+logger = logging.getLogger(__name__)
+
+BINANCE_WS_URL = "wss://stream.binance.com:9443/ws"
+RECONNECT_DELAY = 5 # seconds
+
+
+def _normalize_symbol(symbol: str) -> str:
+ """Convert 'BTC/USDT' to 'BTCUSDT'."""
+ return symbol.replace("/", "")
+
+
+def _stream_name(symbol: str, timeframe: str) -> str:
+ """Build Binance stream name, e.g. 'btcusdt@kline_1m'."""
+ return f"{_normalize_symbol(symbol).lower()}@kline_{timeframe}"
+
+
+class BinanceWebSocket:
+ """Connects to Binance WebSocket streams and emits closed candles."""
+
+ def __init__(
+ self,
+ symbols: list[str],
+ timeframe: str,
+ on_candle: Callable[[Candle], Awaitable[None]],
+ ) -> None:
+ self._symbols = symbols
+ self._timeframe = timeframe
+ self._on_candle = on_candle
+ self._running = False
+
+ def _build_subscribe_message(self) -> dict:
+ streams = [_stream_name(s, self._timeframe) for s in self._symbols]
+ return {
+ "method": "SUBSCRIBE",
+ "params": streams,
+ "id": 1,
+ }
+
+ def _parse_candle(self, message: dict) -> Candle | None:
+ """Parse a kline WebSocket message into a Candle, or None if not closed."""
+ k = message.get("k")
+ if k is None:
+ return None
+ if not k.get("x"): # only closed candles
+ return None
+
+ symbol = k["s"] # already normalized, e.g. 'BTCUSDT'
+ open_time = datetime.fromtimestamp(k["t"] / 1000, tz=timezone.utc)
+ return Candle(
+ symbol=symbol,
+ timeframe=self._timeframe,
+ open_time=open_time,
+ open=Decimal(k["o"]),
+ high=Decimal(k["h"]),
+ low=Decimal(k["l"]),
+ close=Decimal(k["c"]),
+ volume=Decimal(k["v"]),
+ )
+
+ async def _run_once(self) -> None:
+ """Single connection attempt; processes messages until disconnected."""
+ async with websockets.connect(BINANCE_WS_URL) as ws:
+ subscribe_msg = self._build_subscribe_message()
+ await ws.send(json.dumps(subscribe_msg))
+ logger.info("Subscribed to Binance streams: %s", subscribe_msg["params"])
+
+ async for raw in ws:
+ if not self._running:
+ break
+ try:
+ message = json.loads(raw)
+ candle = self._parse_candle(message)
+ if candle is not None:
+ await self._on_candle(candle)
+ except Exception:
+ logger.exception("Error processing WebSocket message: %s", raw)
+
+ async def start(self) -> None:
+ """Connect to Binance WebSocket and process messages, auto-reconnecting."""
+ self._running = True
+ while self._running:
+ try:
+ await self._run_once()
+ except Exception:
+ if not self._running:
+ break
+ logger.warning(
+ "WebSocket disconnected. Reconnecting in %ds…", RECONNECT_DELAY
+ )
+ await asyncio.sleep(RECONNECT_DELAY)
+
+ def stop(self) -> None:
+ """Signal the WebSocket loop to stop after the current message."""
+ self._running = False