diff options
Diffstat (limited to 'services/data-collector')
| -rw-r--r-- | services/data-collector/Dockerfile | 7 | ||||
| -rw-r--r-- | services/data-collector/pyproject.toml | 23 | ||||
| -rw-r--r-- | services/data-collector/src/data_collector/__init__.py | 0 | ||||
| -rw-r--r-- | services/data-collector/src/data_collector/binance_rest.py | 53 | ||||
| -rw-r--r-- | services/data-collector/src/data_collector/binance_ws.py | 106 | ||||
| -rw-r--r-- | services/data-collector/src/data_collector/config.py | 6 | ||||
| -rw-r--r-- | services/data-collector/src/data_collector/main.py | 58 | ||||
| -rw-r--r-- | services/data-collector/src/data_collector/storage.py | 24 | ||||
| -rw-r--r-- | services/data-collector/tests/__init__.py | 0 | ||||
| -rw-r--r-- | services/data-collector/tests/test_binance_rest.py | 53 | ||||
| -rw-r--r-- | services/data-collector/tests/test_storage.py | 62 |
11 files changed, 392 insertions, 0 deletions
diff --git a/services/data-collector/Dockerfile b/services/data-collector/Dockerfile
new file mode 100644
index 0000000..06f6d72
--- /dev/null
+++ b/services/data-collector/Dockerfile
@@ -0,0 +1,7 @@
+FROM python:3.12-slim
+WORKDIR /app
+COPY shared/ shared/
+RUN pip install --no-cache-dir ./shared
+COPY services/data-collector/ services/data-collector/
+RUN pip install --no-cache-dir ./services/data-collector
+CMD ["python", "-m", "data_collector.main"]
diff --git a/services/data-collector/pyproject.toml b/services/data-collector/pyproject.toml
new file mode 100644
index 0000000..5fba78f
--- /dev/null
+++ b/services/data-collector/pyproject.toml
@@ -0,0 +1,23 @@
+[project]
+name = "data-collector"
+version = "0.1.0"
+description = "Binance market data collector service"
+requires-python = ">=3.12"
+dependencies = [
+    "ccxt>=4.0",
+    "websockets>=12.0",
+    "trading-shared",
+]
+
+[project.optional-dependencies]
+dev = [
+    "pytest>=8.0",
+    "pytest-asyncio>=0.23",
+]
+
+[build-system]
+requires = ["hatchling"]
+build-backend = "hatchling.build"
+
+[tool.hatch.build.targets.wheel]
+packages = ["src/data_collector"]
diff --git a/services/data-collector/src/data_collector/__init__.py b/services/data-collector/src/data_collector/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/services/data-collector/src/data_collector/__init__.py
diff --git a/services/data-collector/src/data_collector/binance_rest.py b/services/data-collector/src/data_collector/binance_rest.py
new file mode 100644
index 0000000..af0eb77
--- /dev/null
+++ b/services/data-collector/src/data_collector/binance_rest.py
@@ -0,0 +1,53 @@
+"""Binance REST API helpers for fetching historical candle data."""
+from datetime import datetime, timezone
+from decimal import Decimal
+
+from shared.models import Candle
+
+
+def _normalize_symbol(symbol: str) -> str:
+    """Convert 'BTC/USDT' to 'BTCUSDT'."""
+    return symbol.replace("/", "")
+
+
+async def fetch_historical_candles(
+    exchange,
+    symbol: str,
+    timeframe: str,
+    since: int,
+    limit: int = 500,
+) -> list[Candle]:
+    """Fetch historical OHLCV candles from the exchange and return Candle models.
+
+    Args:
+        exchange: An async ccxt exchange instance.
+        symbol: Market symbol, e.g. 'BTC/USDT'.
+        timeframe: Candle timeframe, e.g. '1m'.
+        since: Start timestamp in milliseconds.
+        limit: Maximum number of candles to fetch.
+
+    Returns:
+        A list of Candle model instances.
+    """
+    rows = await exchange.fetch_ohlcv(symbol, timeframe, since=since, limit=limit)
+
+    normalized = _normalize_symbol(symbol)
+    candles: list[Candle] = []
+
+    for row in rows:
+        ts_ms, o, h, l, c, v = row
+        open_time = datetime.fromtimestamp(ts_ms / 1000, tz=timezone.utc)
+        candles.append(
+            Candle(
+                symbol=normalized,
+                timeframe=timeframe,
+                open_time=open_time,
+                open=Decimal(str(o)),
+                high=Decimal(str(h)),
+                low=Decimal(str(l)),
+                close=Decimal(str(c)),
+                volume=Decimal(str(v)),
+            )
+        )
+
+    return candles
diff --git a/services/data-collector/src/data_collector/binance_ws.py b/services/data-collector/src/data_collector/binance_ws.py
new file mode 100644
index 0000000..7a4bad2
--- /dev/null
+++ b/services/data-collector/src/data_collector/binance_ws.py
@@ -0,0 +1,106 @@
+"""Binance WebSocket client for real-time kline/candle data."""
+import asyncio
+import json
+import logging
+from datetime import datetime, timezone
+from decimal import Decimal
+from typing import Callable, Awaitable
+
+import websockets
+
+from shared.models import Candle
+
+logger = logging.getLogger(__name__)
+
+BINANCE_WS_URL = "wss://stream.binance.com:9443/ws"
+RECONNECT_DELAY = 5  # seconds
+
+
+def _normalize_symbol(symbol: str) -> str:
+    """Convert 'BTC/USDT' to 'BTCUSDT'."""
+    return symbol.replace("/", "")
+
+
+def _stream_name(symbol: str, timeframe: str) -> str:
+    """Build Binance stream name, e.g. 'btcusdt@kline_1m'."""
+    return f"{_normalize_symbol(symbol).lower()}@kline_{timeframe}"
+
+
+class BinanceWebSocket:
+    """Connects to Binance WebSocket streams and emits closed candles."""
+
+    def __init__(
+        self,
+        symbols: list[str],
+        timeframe: str,
+        on_candle: Callable[[Candle], Awaitable[None]],
+    ) -> None:
+        self._symbols = symbols
+        self._timeframe = timeframe
+        self._on_candle = on_candle
+        self._running = False
+
+    def _build_subscribe_message(self) -> dict:
+        streams = [_stream_name(s, self._timeframe) for s in self._symbols]
+        return {
+            "method": "SUBSCRIBE",
+            "params": streams,
+            "id": 1,
+        }
+
+    def _parse_candle(self, message: dict) -> Candle | None:
+        """Parse a kline WebSocket message into a Candle, or None if not closed."""
+        k = message.get("k")
+        if k is None:
+            return None
+        if not k.get("x"):  # only closed candles
+            return None
+
+        symbol = k["s"]  # already normalized, e.g. 'BTCUSDT'
+        open_time = datetime.fromtimestamp(k["t"] / 1000, tz=timezone.utc)
+        return Candle(
+            symbol=symbol,
+            timeframe=self._timeframe,
+            open_time=open_time,
+            open=Decimal(k["o"]),
+            high=Decimal(k["h"]),
+            low=Decimal(k["l"]),
+            close=Decimal(k["c"]),
+            volume=Decimal(k["v"]),
+        )
+
+    async def _run_once(self) -> None:
+        """Single connection attempt; processes messages until disconnected."""
+        async with websockets.connect(BINANCE_WS_URL) as ws:
+            subscribe_msg = self._build_subscribe_message()
+            await ws.send(json.dumps(subscribe_msg))
+            logger.info("Subscribed to Binance streams: %s", subscribe_msg["params"])
+
+            async for raw in ws:
+                if not self._running:
+                    break
+                try:
+                    message = json.loads(raw)
+                    candle = self._parse_candle(message)
+                    if candle is not None:
+                        await self._on_candle(candle)
+                except Exception:
+                    logger.exception("Error processing WebSocket message: %s", raw)
+
+    async def start(self) -> None:
+        """Connect to Binance WebSocket and process messages, auto-reconnecting."""
+        self._running = True
+        while self._running:
+            try:
+                await self._run_once()
+            except Exception:
+                if not self._running:
+                    break
+                logger.warning(
+                    "WebSocket disconnected. Reconnecting in %ds…", RECONNECT_DELAY
+                )
+                await asyncio.sleep(RECONNECT_DELAY)
+
+    def stop(self) -> None:
+        """Signal the WebSocket loop to stop after the current message."""
+        self._running = False
diff --git a/services/data-collector/src/data_collector/config.py b/services/data-collector/src/data_collector/config.py
new file mode 100644
index 0000000..1e080e5
--- /dev/null
+++ b/services/data-collector/src/data_collector/config.py
@@ -0,0 +1,6 @@
+from shared.config import Settings
+
+
+class CollectorConfig(Settings):
+    symbols: list[str] = ["BTC/USDT"]
+    timeframes: list[str] = ["1m"]
diff --git a/services/data-collector/src/data_collector/main.py b/services/data-collector/src/data_collector/main.py
new file mode 100644
index 0000000..adf1e96
--- /dev/null
+++ b/services/data-collector/src/data_collector/main.py
@@ -0,0 +1,58 @@
+"""Data Collector Service entry point."""
+import asyncio
+import logging
+
+from shared.broker import RedisBroker
+from shared.db import Database
+
+from data_collector.binance_ws import BinanceWebSocket
+from data_collector.config import CollectorConfig
+from data_collector.storage import CandleStorage
+
+logging.basicConfig(level=logging.INFO)
+logger = logging.getLogger(__name__)
+
+
+async def run() -> None:
+    """Initialise all components and start the WebSocket collector."""
+    config = CollectorConfig()
+
+    db = Database(config.database_url)
+    await db.connect()
+    await db.init_tables()
+
+    broker = RedisBroker(config.redis_url)
+    storage = CandleStorage(db=db, broker=broker)
+
+    async def on_candle(candle):
+        logger.info("Candle received: %s %s %s", candle.symbol, candle.timeframe, candle.open_time)
+        await storage.store(candle)
+
+    # Use the first configured timeframe for the WebSocket subscription.
+    timeframe = config.timeframes[0] if config.timeframes else "1m"
+
+    ws = BinanceWebSocket(
+        symbols=config.symbols,
+        timeframe=timeframe,
+        on_candle=on_candle,
+    )
+
+    logger.info(
+        "Starting data collector for symbols=%s timeframe=%s",
+        config.symbols,
+        timeframe,
+    )
+
+    try:
+        await ws.start()
+    finally:
+        await broker.close()
+        await db.close()
+
+
+def main() -> None:
+    asyncio.run(run())
+
+
+if __name__ == "__main__":
+    main()
diff --git a/services/data-collector/src/data_collector/storage.py b/services/data-collector/src/data_collector/storage.py
new file mode 100644
index 0000000..1e40b82
--- /dev/null
+++ b/services/data-collector/src/data_collector/storage.py
@@ -0,0 +1,24 @@
+"""Candle storage: persists to DB and publishes to Redis."""
+from shared.events import CandleEvent
+from shared.models import Candle
+
+
+class CandleStorage:
+    """Stores candles in the database and publishes CandleEvents to Redis."""
+
+    def __init__(self, db, broker) -> None:
+        self._db = db
+        self._broker = broker
+
+    async def store(self, candle: Candle) -> None:
+        """Insert candle into DB and publish a CandleEvent to the Redis stream."""
+        await self._db.insert_candle(candle)
+
+        event = CandleEvent(data=candle)
+        stream = f"candles.{candle.symbol}"
+        await self._broker.publish(stream, event.to_dict())
+
+    async def store_batch(self, candles: list[Candle]) -> None:
+        """Store multiple candles one by one."""
+        for candle in candles:
+            await self.store(candle)
diff --git a/services/data-collector/tests/__init__.py b/services/data-collector/tests/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/services/data-collector/tests/__init__.py
diff --git a/services/data-collector/tests/test_binance_rest.py b/services/data-collector/tests/test_binance_rest.py
new file mode 100644
index 0000000..695dcf9
--- /dev/null
+++ b/services/data-collector/tests/test_binance_rest.py
@@ -0,0 +1,53 @@
+"""Tests for binance_rest module."""
+import pytest
+from decimal import Decimal
+from unittest.mock import AsyncMock, MagicMock
+from datetime import datetime, timezone
+
+from data_collector.binance_rest import fetch_historical_candles
+
+
+@pytest.mark.asyncio
+async def test_fetch_historical_candles_parses_response():
+    """Verify that OHLCV rows are correctly parsed into Candle models."""
+    ts = 1700000000000  # milliseconds
+    mock_exchange = MagicMock()
+    mock_exchange.fetch_ohlcv = AsyncMock(
+        return_value=[
+            [ts, 30000.0, 30100.0, 29900.0, 30050.0, 1.5],
+            [ts + 60000, 30050.0, 30200.0, 30000.0, 30150.0, 2.0],
+        ]
+    )
+
+    candles = await fetch_historical_candles(
+        mock_exchange, "BTC/USDT", "1m", since=ts, limit=500
+    )
+
+    assert len(candles) == 2
+
+    c = candles[0]
+    assert c.symbol == "BTCUSDT"
+    assert c.timeframe == "1m"
+    assert c.open_time == datetime.fromtimestamp(ts / 1000, tz=timezone.utc)
+    assert c.open == Decimal("30000.0")
+    assert c.high == Decimal("30100.0")
+    assert c.low == Decimal("29900.0")
+    assert c.close == Decimal("30050.0")
+    assert c.volume == Decimal("1.5")
+
+    mock_exchange.fetch_ohlcv.assert_called_once_with(
+        "BTC/USDT", "1m", since=ts, limit=500
+    )
+
+
+@pytest.mark.asyncio
+async def test_fetch_historical_candles_empty_response():
+    """Verify that an empty exchange response returns an empty list."""
+    mock_exchange = MagicMock()
+    mock_exchange.fetch_ohlcv = AsyncMock(return_value=[])
+
+    candles = await fetch_historical_candles(
+        mock_exchange, "BTC/USDT", "1m", since=1700000000000
+    )
+
+    assert candles == []
diff --git a/services/data-collector/tests/test_storage.py b/services/data-collector/tests/test_storage.py
new file mode 100644
index 0000000..6b27414
--- /dev/null
+++ b/services/data-collector/tests/test_storage.py
@@ -0,0 +1,62 @@
+"""Tests for storage module."""
+import pytest
+from decimal import Decimal
+from datetime import datetime, timezone
+from unittest.mock import AsyncMock, MagicMock
+
+from shared.models import Candle
+from data_collector.storage import CandleStorage
+
+
+def _make_candle(symbol: str = "BTCUSDT") -> Candle:
+    return Candle(
+        symbol=symbol,
+        timeframe="1m",
+        open_time=datetime(2024, 1, 1, 0, 0, 0, tzinfo=timezone.utc),
+        open=Decimal("30000"),
+        high=Decimal("30100"),
+        low=Decimal("29900"),
+        close=Decimal("30050"),
+        volume=Decimal("1.5"),
+    )
+
+
+@pytest.mark.asyncio
+async def test_storage_saves_to_db_and_publishes():
+    """Verify that store() calls insert_candle on db and publish on broker."""
+    mock_db = MagicMock()
+    mock_db.insert_candle = AsyncMock()
+    mock_broker = MagicMock()
+    mock_broker.publish = AsyncMock()
+
+    storage = CandleStorage(db=mock_db, broker=mock_broker)
+    candle = _make_candle()
+
+    await storage.store(candle)
+
+    mock_db.insert_candle.assert_called_once_with(candle)
+    mock_broker.publish.assert_called_once()
+
+    stream_arg = mock_broker.publish.call_args[0][0]
+    assert stream_arg == "candles.BTCUSDT"
+
+    data_arg = mock_broker.publish.call_args[0][1]
+    assert data_arg["type"] == "CANDLE"
+    assert data_arg["data"]["symbol"] == "BTCUSDT"
+
+
+@pytest.mark.asyncio
+async def test_storage_batch_store():
+    """Verify that store_batch() calls store for each candle."""
+    mock_db = MagicMock()
+    mock_db.insert_candle = AsyncMock()
+    mock_broker = MagicMock()
+    mock_broker.publish = AsyncMock()
+
+    storage = CandleStorage(db=mock_db, broker=mock_broker)
+    candles = [_make_candle() for _ in range(3)]
+
+    await storage.store_batch(candles)
+
+    assert mock_db.insert_candle.call_count == 3
+    assert mock_broker.publish.call_count == 3
