summary | refs | log | tree | commit | diff
path: root/services/data-collector
diff options
context:
space:
mode:
Diffstat (limited to 'services/data-collector')
-rw-r--r-- services/data-collector/src/data_collector/binance_rest.py 54
-rw-r--r-- services/data-collector/src/data_collector/binance_ws.py 109
-rw-r--r-- services/data-collector/src/data_collector/ws_factory.py 34
-rw-r--r-- services/data-collector/tests/test_binance_rest.py 48
-rw-r--r-- services/data-collector/tests/test_ws_factory.py 21
5 files changed, 0 insertions, 266 deletions
diff --git a/services/data-collector/src/data_collector/binance_rest.py b/services/data-collector/src/data_collector/binance_rest.py
deleted file mode 100644
index eaf4e30..0000000
--- a/services/data-collector/src/data_collector/binance_rest.py
+++ /dev/null
@@ -1,54 +0,0 @@
-"""Binance REST API helpers for fetching historical candle data."""
-
-from datetime import datetime, timezone
-from decimal import Decimal
-
-from shared.models import Candle
-
-
-def _normalize_symbol(symbol: str) -> str:
- """Convert 'BTC/USDT' to 'BTCUSDT'."""
- return symbol.replace("/", "")
-
-
async def fetch_historical_candles(
    exchange,
    symbol: str,
    timeframe: str,
    since: int,
    limit: int = 500,
) -> list[Candle]:
    """Download OHLCV rows from the exchange and convert them to Candle models.

    Args:
        exchange: Object exposing an awaitable ``fetch_ohlcv`` method
            (an async ccxt exchange instance).
        symbol: Market symbol as passed to the exchange, e.g. 'BTC/USDT'.
        timeframe: Candle timeframe, e.g. '1m'.
        since: Start timestamp in milliseconds.
        limit: Maximum number of candles to request.

    Returns:
        One Candle per returned row, in the order the exchange returned them.
        The Candle symbol has the '/' removed (e.g. 'BTCUSDT').
    """
    ohlcv_rows = await exchange.fetch_ohlcv(symbol, timeframe, since=since, limit=limit)
    market = _normalize_symbol(symbol)

    def _to_candle(row) -> Candle:
        # Each row is [timestamp_ms, open, high, low, close, volume].
        ts_ms, open_px, high_px, low_px, close_px, vol = row
        return Candle(
            symbol=market,
            timeframe=timeframe,
            # Exchange timestamps are milliseconds; store as aware UTC datetimes.
            open_time=datetime.fromtimestamp(ts_ms / 1000, tz=timezone.utc),
            # Go through str() so the Decimal holds the printed value of the
            # number rather than its binary float expansion.
            open=Decimal(str(open_px)),
            high=Decimal(str(high_px)),
            low=Decimal(str(low_px)),
            close=Decimal(str(close_px)),
            volume=Decimal(str(vol)),
        )

    return [_to_candle(row) for row in ohlcv_rows]
diff --git a/services/data-collector/src/data_collector/binance_ws.py b/services/data-collector/src/data_collector/binance_ws.py
deleted file mode 100644
index e25e7a6..0000000
--- a/services/data-collector/src/data_collector/binance_ws.py
+++ /dev/null
@@ -1,109 +0,0 @@
-"""Binance WebSocket client for real-time kline/candle data.
-
-NOTE: This module is Binance-specific (uses Binance WebSocket URL and message format).
-Multi-exchange WebSocket support would require exchange-specific implementations.
-"""
-
-import asyncio
-import json
-import logging
-from datetime import datetime, timezone
-from decimal import Decimal
-from typing import Callable, Awaitable
-
-import websockets
-
-from shared.models import Candle
-
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Endpoint the WebSocket client connects to.
BINANCE_WS_URL = "wss://stream.binance.com:9443/ws"
# Pause before a reconnection attempt, in seconds.
RECONNECT_DELAY = 5
-
-
-def _normalize_symbol(symbol: str) -> str:
- """Convert 'BTC/USDT' to 'BTCUSDT'."""
- return symbol.replace("/", "")
-
-
def _stream_name(symbol: str, timeframe: str) -> str:
    """Compose the kline stream name for a symbol and timeframe.

    The symbol is normalized (slash removed) and lower-cased, so
    ('BTC/USDT', '1m') yields 'btcusdt@kline_1m'.
    """
    market = _normalize_symbol(symbol).lower()
    return "{}@kline_{}".format(market, timeframe)
-
-
class BinanceWebSocket:
    """Connects to Binance WebSocket streams and emits closed candles.

    One kline stream per symbol is subscribed for a single timeframe. Every
    message whose kline is flagged as closed is converted to a ``Candle`` and
    awaited through the ``on_candle`` callback.
    """

    def __init__(
        self,
        symbols: list[str],
        timeframe: str,
        on_candle: Callable[[Candle], Awaitable[None]],
    ) -> None:
        """Store the subscription settings; no connection is opened here.

        Args:
            symbols: Market symbols to stream, e.g. 'BTC/USDT'.
            timeframe: Kline interval used in the stream names, e.g. '1m'.
            on_candle: Async callback awaited with each closed candle.
        """
        self._symbols = symbols
        self._timeframe = timeframe
        self._on_candle = on_candle
        # Loop flag: set by start(), cleared by stop().
        self._running = False

    def _build_subscribe_message(self) -> dict:
        """Return the SUBSCRIBE request covering every configured symbol."""
        streams = [_stream_name(s, self._timeframe) for s in self._symbols]
        return {
            "method": "SUBSCRIBE",
            "params": streams,
            "id": 1,
        }

    def _parse_candle(self, message: dict) -> Candle | None:
        """Parse a kline WebSocket message into a Candle.

        Returns:
            A Candle for a closed kline, or None when the message has no
            'k' payload (e.g. a non-kline message) or the kline is still open.
        """
        k = message.get("k")
        if k is None:
            return None
        if not k.get("x"):  # only closed candles
            return None

        symbol = k["s"]  # already normalized, e.g. 'BTCUSDT'
        # 't' is divided by 1000 here, i.e. treated as epoch milliseconds.
        open_time = datetime.fromtimestamp(k["t"] / 1000, tz=timezone.utc)
        return Candle(
            symbol=symbol,
            timeframe=self._timeframe,
            open_time=open_time,
            open=Decimal(k["o"]),
            high=Decimal(k["h"]),
            low=Decimal(k["l"]),
            close=Decimal(k["c"]),
            volume=Decimal(k["v"]),
        )

    async def _run_once(self) -> None:
        """Single connection attempt; processes messages until disconnected.

        Connection-level errors propagate to the caller. Errors raised while
        handling an individual message are logged and the loop continues.
        """
        async with websockets.connect(BINANCE_WS_URL) as ws:
            subscribe_msg = self._build_subscribe_message()
            await ws.send(json.dumps(subscribe_msg))
            logger.info("Subscribed to Binance streams: %s", subscribe_msg["params"])

            async for raw in ws:
                if not self._running:
                    break
                try:
                    message = json.loads(raw)
                    candle = self._parse_candle(message)
                    if candle is not None:
                        await self._on_candle(candle)
                except Exception:
                    # One bad message (or a failing callback) must not tear
                    # down the whole stream.
                    logger.exception("Error processing WebSocket message: %s", raw)

    async def start(self) -> None:
        """Connect to Binance WebSocket and process messages, auto-reconnecting.

        Runs until stop() is called. After any disconnect, whether
        ``_run_once`` raised or returned because the connection was closed
        cleanly, the loop waits RECONNECT_DELAY seconds before reconnecting,
        so a server that keeps closing the connection cannot cause a tight
        reconnect loop.
        """
        self._running = True
        while self._running:
            failure = None
            try:
                await self._run_once()
            except Exception as exc:
                # Keep the cause so the warning below can report it.
                failure = exc
            if not self._running:
                break
            logger.warning(
                "WebSocket disconnected. Reconnecting in %ds…",
                RECONNECT_DELAY,
                exc_info=failure,
            )
            await asyncio.sleep(RECONNECT_DELAY)

    def stop(self) -> None:
        """Signal the WebSocket loop to stop after the current message."""
        self._running = False
diff --git a/services/data-collector/src/data_collector/ws_factory.py b/services/data-collector/src/data_collector/ws_factory.py
deleted file mode 100644
index e068399..0000000
--- a/services/data-collector/src/data_collector/ws_factory.py
+++ /dev/null
@@ -1,34 +0,0 @@
-"""WebSocket factory for exchange-specific connections."""
-
-import logging
-
-from data_collector.binance_ws import BinanceWebSocket
-
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Registry of exchange id -> WebSocket handler class used for streaming.
SUPPORTED_WS = dict(binance=BinanceWebSocket)
-
-
def create_websocket(exchange_id: str, **kwargs):
    """Instantiate the WebSocket handler registered for an exchange.

    Args:
        exchange_id: Exchange identifier looked up in SUPPORTED_WS
            (e.g. 'binance').
        **kwargs: Forwarded unchanged to the handler constructor
            (symbols, timeframe, on_candle).

    Returns:
        A new instance of the handler class registered for ``exchange_id``.

    Raises:
        ValueError: If no handler is registered for ``exchange_id``.
    """
    if exchange_id in SUPPORTED_WS:
        handler_cls = SUPPORTED_WS[exchange_id]
        return handler_cls(**kwargs)

    known_ids = ", ".join(sorted(SUPPORTED_WS))
    message = (
        f"WebSocket streaming not supported for '{exchange_id}'. "
        f"Supported: {known_ids}. "
        "Use REST polling as fallback for unsupported exchanges."
    )
    raise ValueError(message)
diff --git a/services/data-collector/tests/test_binance_rest.py b/services/data-collector/tests/test_binance_rest.py
deleted file mode 100644
index bf88210..0000000
--- a/services/data-collector/tests/test_binance_rest.py
+++ /dev/null
@@ -1,48 +0,0 @@
-"""Tests for binance_rest module."""
-
-import pytest
-from decimal import Decimal
-from unittest.mock import AsyncMock, MagicMock
-from datetime import datetime, timezone
-
-from data_collector.binance_rest import fetch_historical_candles
-
-
@pytest.mark.asyncio
async def test_fetch_historical_candles_parses_response():
    """OHLCV rows returned by the exchange are converted into Candle models."""
    start_ms = 1700000000000  # milliseconds
    ohlcv_rows = [
        [start_ms, 30000.0, 30100.0, 29900.0, 30050.0, 1.5],
        [start_ms + 60000, 30050.0, 30200.0, 30000.0, 30150.0, 2.0],
    ]
    exchange = MagicMock()
    exchange.fetch_ohlcv = AsyncMock(return_value=ohlcv_rows)

    result = await fetch_historical_candles(
        exchange, "BTC/USDT", "1m", since=start_ms, limit=500
    )

    assert len(result) == 2

    # Spot-check every field of the first candle.
    first = result[0]
    assert first.symbol == "BTCUSDT"
    assert first.timeframe == "1m"
    assert first.open_time == datetime.fromtimestamp(start_ms / 1000, tz=timezone.utc)
    assert first.open == Decimal("30000.0")
    assert first.high == Decimal("30100.0")
    assert first.low == Decimal("29900.0")
    assert first.close == Decimal("30050.0")
    assert first.volume == Decimal("1.5")

    # The original (slash-separated) symbol must be what reaches the exchange.
    exchange.fetch_ohlcv.assert_called_once_with("BTC/USDT", "1m", since=start_ms, limit=500)
-
-
@pytest.mark.asyncio
async def test_fetch_historical_candles_empty_response():
    """An exchange that returns no rows produces an empty candle list."""
    exchange = MagicMock()
    exchange.fetch_ohlcv = AsyncMock(return_value=[])

    result = await fetch_historical_candles(exchange, "BTC/USDT", "1m", since=1700000000000)

    assert result == []
diff --git a/services/data-collector/tests/test_ws_factory.py b/services/data-collector/tests/test_ws_factory.py
deleted file mode 100644
index cdddcca..0000000
--- a/services/data-collector/tests/test_ws_factory.py
+++ /dev/null
@@ -1,21 +0,0 @@
-"""Tests for WebSocket factory."""
-
-import pytest
-from data_collector.ws_factory import create_websocket, SUPPORTED_WS
-from data_collector.binance_ws import BinanceWebSocket
-
-
def test_create_binance_ws():
    """The 'binance' id yields a BinanceWebSocket instance."""
    handler = create_websocket(
        "binance", symbols=["BTCUSDT"], timeframe="1m", on_candle=lambda c: None
    )
    assert isinstance(handler, BinanceWebSocket)
-
-
def test_create_unsupported_exchange():
    """An unregistered exchange id is rejected with a ValueError."""
    handler_kwargs = {
        "symbols": ["BTCUSDT"],
        "timeframe": "1m",
        "on_candle": lambda c: None,
    }
    with pytest.raises(ValueError, match="not supported"):
        create_websocket("unsupported_exchange", **handler_kwargs)
-
-
def test_supported_exchanges():
    """The registry of streaming-capable exchanges includes 'binance'."""
    registered_ids = set(SUPPORTED_WS)
    assert "binance" in registered_ids