summary | refs | log | tree | commit | diff
path: root/services/data-collector
diff options
context:
space:
mode:
Diffstat (limited to 'services/data-collector')
-rw-r--r-- services/data-collector/Dockerfile | 7
-rw-r--r-- services/data-collector/pyproject.toml | 23
-rw-r--r-- services/data-collector/src/data_collector/__init__.py | 0
-rw-r--r-- services/data-collector/src/data_collector/binance_rest.py | 53
-rw-r--r-- services/data-collector/src/data_collector/binance_ws.py | 106
-rw-r--r-- services/data-collector/src/data_collector/config.py | 6
-rw-r--r-- services/data-collector/src/data_collector/main.py | 58
-rw-r--r-- services/data-collector/src/data_collector/storage.py | 24
-rw-r--r-- services/data-collector/tests/__init__.py | 0
-rw-r--r-- services/data-collector/tests/test_binance_rest.py | 53
-rw-r--r-- services/data-collector/tests/test_storage.py | 62
11 files changed, 392 insertions, 0 deletions
diff --git a/services/data-collector/Dockerfile b/services/data-collector/Dockerfile
new file mode 100644
index 0000000..06f6d72
--- /dev/null
+++ b/services/data-collector/Dockerfile
@@ -0,0 +1,7 @@
+# Container image for the data-collector service (slim Python 3.12 base).
+FROM python:3.12-slim
+WORKDIR /app
+# The COPY sources are relative to the build context, so the context must
+# contain both shared/ and services/data-collector/ (presumably the
+# repository root -- confirm against the build/compose configuration).
+COPY shared/ shared/
+# Installed before the service; presumably this provides the "trading-shared"
+# dependency declared in the service's pyproject.toml -- confirm.
+RUN pip install --no-cache-dir ./shared
+COPY services/data-collector/ services/data-collector/
+RUN pip install --no-cache-dir ./services/data-collector
+# Start the service through its module entry point.
+CMD ["python", "-m", "data_collector.main"]
diff --git a/services/data-collector/pyproject.toml b/services/data-collector/pyproject.toml
new file mode 100644
index 0000000..5fba78f
--- /dev/null
+++ b/services/data-collector/pyproject.toml
@@ -0,0 +1,23 @@
+# Packaging metadata for the data-collector service.
+[project]
+name = "data-collector"
+version = "0.1.0"
+description = "Binance market data collector service"
+requires-python = ">=3.12"
+dependencies = [
+ "ccxt>=4.0",
+ "websockets>=12.0",
+ # No version constraint; presumably the in-repo shared package that the
+ # Dockerfile installs from ./shared -- confirm its distribution name.
+ "trading-shared",
+]
+
+# Test tooling, installed only with the "dev" extra.
+[project.optional-dependencies]
+dev = [
+ "pytest>=8.0",
+ "pytest-asyncio>=0.23",
+]
+
+[build-system]
+requires = ["hatchling"]
+build-backend = "hatchling.build"
+
+# The package lives under src/, so the wheel target names it explicitly.
+[tool.hatch.build.targets.wheel]
+packages = ["src/data_collector"]
diff --git a/services/data-collector/src/data_collector/__init__.py b/services/data-collector/src/data_collector/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/services/data-collector/src/data_collector/__init__.py
diff --git a/services/data-collector/src/data_collector/binance_rest.py b/services/data-collector/src/data_collector/binance_rest.py
new file mode 100644
index 0000000..af0eb77
--- /dev/null
+++ b/services/data-collector/src/data_collector/binance_rest.py
@@ -0,0 +1,53 @@
+"""Binance REST API helpers for fetching historical candle data."""
+from datetime import datetime, timezone
+from decimal import Decimal
+
+from shared.models import Candle
+
+
+def _normalize_symbol(symbol: str) -> str:
+    """Return *symbol* with every '/' removed, e.g. 'BTC/USDT' -> 'BTCUSDT'."""
+    return "".join(symbol.split("/"))
+
+
+async def fetch_historical_candles(
+    exchange,
+    symbol: str,
+    timeframe: str,
+    since: int,
+    limit: int = 500,
+) -> list[Candle]:
+    """Download OHLCV rows from *exchange* and convert them into Candle models.
+
+    Args:
+        exchange: Object exposing an awaitable
+            ``fetch_ohlcv(symbol, timeframe, since=..., limit=...)``
+            (an async ccxt exchange instance).
+        symbol: Market symbol as passed to the exchange, e.g. 'BTC/USDT'.
+        timeframe: Candle timeframe, e.g. '1m'.
+        since: Start timestamp in milliseconds.
+        limit: Maximum number of candles to request.
+
+    Returns:
+        One Candle per returned row, in the order the exchange returned them.
+        Each carries the slash-free symbol, a UTC ``open_time`` and Decimal
+        price/volume fields.
+    """
+    ohlcv_rows = await exchange.fetch_ohlcv(
+        symbol, timeframe, since=since, limit=limit
+    )
+    market = _normalize_symbol(symbol)
+
+    # Each value goes through str() first so the Decimal holds the printed
+    # number rather than the exact binary expansion of a float.
+    return [
+        Candle(
+            symbol=market,
+            timeframe=timeframe,
+            open_time=datetime.fromtimestamp(ts_ms / 1000, tz=timezone.utc),
+            open=Decimal(str(open_)),
+            high=Decimal(str(high)),
+            low=Decimal(str(low)),
+            close=Decimal(str(close)),
+            volume=Decimal(str(volume)),
+        )
+        for ts_ms, open_, high, low, close, volume in ohlcv_rows
+    ]
diff --git a/services/data-collector/src/data_collector/binance_ws.py b/services/data-collector/src/data_collector/binance_ws.py
new file mode 100644
index 0000000..7a4bad2
--- /dev/null
+++ b/services/data-collector/src/data_collector/binance_ws.py
@@ -0,0 +1,106 @@
+"""Binance WebSocket client for real-time kline/candle data."""
+import asyncio
+import json
+import logging
+from datetime import datetime, timezone
+from decimal import Decimal
+from typing import Callable, Awaitable
+
+import websockets
+
+from shared.models import Candle
+
+# Module-level logger named after this module.
+logger = logging.getLogger(__name__)
+
+# Endpoint the client connects to before sending its SUBSCRIBE request.
+BINANCE_WS_URL = "wss://stream.binance.com:9443/ws"
+# Pause before the next connection attempt after the connection fails.
+RECONNECT_DELAY = 5 # seconds
+
+
+def _normalize_symbol(symbol: str) -> str:
+    """Strip the '/' separator from a market symbol ('BTC/USDT' -> 'BTCUSDT')."""
+    return symbol.translate({ord("/"): None})
+
+
+def _stream_name(symbol: str, timeframe: str) -> str:
+    """Return the kline stream identifier for a market, e.g. 'btcusdt@kline_1m'."""
+    market = _normalize_symbol(symbol).lower()
+    return f"{market}@kline_{timeframe}"
+
+
+class BinanceWebSocket:
+    """Streams Binance kline updates and forwards closed candles to a callback.
+
+    A single connection subscribes to one kline stream per configured symbol,
+    all for the same timeframe. Klines that are not flagged as closed are
+    ignored. ``start()`` keeps reconnecting until ``stop()`` is called.
+    """
+
+    def __init__(
+        self,
+        symbols: list[str],
+        timeframe: str,
+        on_candle: Callable[[Candle], Awaitable[None]],
+    ) -> None:
+        """Store the subscription parameters; no connection is opened here.
+
+        Args:
+            symbols: Market symbols, e.g. 'BTC/USDT'; slashes are stripped
+                when the stream names are built.
+            timeframe: Kline interval used for every symbol, e.g. '1m'.
+            on_candle: Coroutine function awaited once per closed candle.
+        """
+        self._symbols = symbols
+        self._timeframe = timeframe
+        self._on_candle = on_candle
+        self._running = False
+
+    def _build_subscribe_message(self) -> dict:
+        """Return the SUBSCRIBE request listing one kline stream per symbol."""
+        streams = [_stream_name(s, self._timeframe) for s in self._symbols]
+        return {
+            "method": "SUBSCRIBE",
+            "params": streams,
+            "id": 1,
+        }
+
+    def _parse_candle(self, message: dict) -> Candle | None:
+        """Convert a kline message into a Candle.
+
+        Returns:
+            The parsed Candle, or None when the message has no "k" payload
+            (i.e. it is not a kline update) or when the kline's "x" flag is
+            not set, meaning the candle has not closed yet.
+        """
+        k = message.get("k")
+        if k is None:
+            return None
+        if not k.get("x"):  # only closed candles
+            return None
+
+        symbol = k["s"]  # already normalized, e.g. 'BTCUSDT'
+        open_time = datetime.fromtimestamp(k["t"] / 1000, tz=timezone.utc)
+        # Price/volume fields are handed to Decimal exactly as received
+        # (presumably decimal strings, per the Binance kline payload).
+        return Candle(
+            symbol=symbol,
+            timeframe=self._timeframe,
+            open_time=open_time,
+            open=Decimal(k["o"]),
+            high=Decimal(k["h"]),
+            low=Decimal(k["l"]),
+            close=Decimal(k["c"]),
+            volume=Decimal(k["v"]),
+        )
+
+    async def _run_once(self) -> None:
+        """Open one connection, subscribe, and process messages until it ends.
+
+        Returns normally when the message iterator finishes or a stop was
+        requested; connection errors propagate to the caller.
+        """
+        async with websockets.connect(BINANCE_WS_URL) as ws:
+            subscribe_msg = self._build_subscribe_message()
+            await ws.send(json.dumps(subscribe_msg))
+            logger.info("Subscribed to Binance streams: %s", subscribe_msg["params"])
+
+            async for raw in ws:
+                if not self._running:
+                    break
+                # Best effort per message: a malformed payload or a failing
+                # callback is logged and the stream keeps going.
+                try:
+                    message = json.loads(raw)
+                    candle = self._parse_candle(message)
+                    if candle is not None:
+                        await self._on_candle(candle)
+                except Exception:
+                    logger.exception("Error processing WebSocket message: %s", raw)
+
+    async def start(self) -> None:
+        """Run the collector, reconnecting after every disconnect until stopped.
+
+        The reconnect delay is applied both when the connection fails with an
+        exception and when it ends without one, so a server that keeps closing
+        the connection cleanly cannot trigger a tight reconnect loop.
+        """
+        self._running = True
+        while self._running:
+            try:
+                await self._run_once()
+            except Exception:
+                if not self._running:
+                    break
+                # exc_info keeps the cause of the disconnect in the log.
+                logger.warning(
+                    "WebSocket disconnected. Reconnecting in %ds…",
+                    RECONNECT_DELAY,
+                    exc_info=True,
+                )
+            else:
+                if not self._running:
+                    break
+                logger.warning(
+                    "WebSocket connection closed. Reconnecting in %ds…",
+                    RECONNECT_DELAY,
+                )
+            await asyncio.sleep(RECONNECT_DELAY)
+
+    def stop(self) -> None:
+        """Ask the loop to stop.
+
+        The flag is checked when the next message arrives and before each
+        reconnect, so a connection that receives no messages is not
+        interrupted by this call alone.
+        """
+        self._running = False
diff --git a/services/data-collector/src/data_collector/config.py b/services/data-collector/src/data_collector/config.py
new file mode 100644
index 0000000..1e080e5
--- /dev/null
+++ b/services/data-collector/src/data_collector/config.py
@@ -0,0 +1,6 @@
+from shared.config import Settings
+
+
+class CollectorConfig(Settings):
+ """Settings for the data-collector service, extending the shared ``Settings``."""
+
+ # Markets to collect, written in slash form (e.g. "BTC/USDT").
+ # NOTE(review): list defaults are shared mutable class attributes unless
+ # Settings copies them per instance (presumably a pydantic model) -- confirm.
+ symbols: list[str] = ["BTC/USDT"]
+ # Candle intervals; main.py subscribes using only the first entry.
+ timeframes: list[str] = ["1m"]
diff --git a/services/data-collector/src/data_collector/main.py b/services/data-collector/src/data_collector/main.py
new file mode 100644
index 0000000..adf1e96
--- /dev/null
+++ b/services/data-collector/src/data_collector/main.py
@@ -0,0 +1,58 @@
+"""Data Collector Service entry point."""
+import asyncio
+import logging
+
+from shared.broker import RedisBroker
+from shared.db import Database
+
+from data_collector.binance_ws import BinanceWebSocket
+from data_collector.config import CollectorConfig
+from data_collector.storage import CandleStorage
+
+# Root logging is configured at import time so INFO-level messages are emitted.
+logging.basicConfig(level=logging.INFO)
+# Module-level logger named after this module.
+logger = logging.getLogger(__name__)
+
+
+async def run() -> None:
+    """Wire up the database, broker and storage, then stream candles.
+
+    The broker and the database connection are closed on the way out, also
+    when table initialisation, broker construction or the WebSocket loop
+    raises, and the database is closed even if closing the broker fails.
+    """
+    config = CollectorConfig()
+
+    # The WebSocket client handles a single timeframe, so only the first
+    # configured one is used; report the rest instead of dropping them silently.
+    timeframe = config.timeframes[0] if config.timeframes else "1m"
+    if len(config.timeframes) > 1:
+        logger.warning(
+            "Only the first configured timeframe (%s) is collected; ignoring %s",
+            timeframe,
+            config.timeframes[1:],
+        )
+
+    db = Database(config.database_url)
+    await db.connect()
+    try:
+        await db.init_tables()
+
+        broker = RedisBroker(config.redis_url)
+        try:
+            storage = CandleStorage(db=db, broker=broker)
+
+            async def on_candle(candle):
+                logger.info(
+                    "Candle received: %s %s %s",
+                    candle.symbol,
+                    candle.timeframe,
+                    candle.open_time,
+                )
+                await storage.store(candle)
+
+            ws = BinanceWebSocket(
+                symbols=config.symbols,
+                timeframe=timeframe,
+                on_candle=on_candle,
+            )
+
+            logger.info(
+                "Starting data collector for symbols=%s timeframe=%s",
+                config.symbols,
+                timeframe,
+            )
+
+            await ws.start()
+        finally:
+            await broker.close()
+    finally:
+        # Reached even when closing the broker raised.
+        await db.close()
+
+
+def main() -> None:
+    """Synchronous entry point: drive the async collector to completion."""
+    collector = run()
+    asyncio.run(collector)
+
+
+if __name__ == "__main__":
+    main()
diff --git a/services/data-collector/src/data_collector/storage.py b/services/data-collector/src/data_collector/storage.py
new file mode 100644
index 0000000..1e40b82
--- /dev/null
+++ b/services/data-collector/src/data_collector/storage.py
@@ -0,0 +1,24 @@
+"""Candle storage: persists to DB and publishes to Redis."""
+from shared.events import CandleEvent
+from shared.models import Candle
+
+
+class CandleStorage:
+    """Persists candles through the database and announces them on the broker."""
+
+    def __init__(self, db, broker) -> None:
+        """Keep references to the database and broker used by ``store``."""
+        self._db = db
+        self._broker = broker
+
+    async def store(self, candle: Candle) -> None:
+        """Insert *candle* into the DB, then publish it as a CandleEvent.
+
+        The event is published to the stream ``candles.<symbol>``. The insert
+        happens first, so nothing is published if it raises.
+        """
+        await self._db.insert_candle(candle)
+
+        event = CandleEvent(data=candle)
+        channel = f"candles.{candle.symbol}"
+        payload = event.to_dict()
+        await self._broker.publish(channel, payload)
+
+    async def store_batch(self, candles: list[Candle]) -> None:
+        """Store each candle in order, awaiting ``store`` one at a time."""
+        for item in candles:
+            await self.store(item)
diff --git a/services/data-collector/tests/__init__.py b/services/data-collector/tests/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/services/data-collector/tests/__init__.py
diff --git a/services/data-collector/tests/test_binance_rest.py b/services/data-collector/tests/test_binance_rest.py
new file mode 100644
index 0000000..695dcf9
--- /dev/null
+++ b/services/data-collector/tests/test_binance_rest.py
@@ -0,0 +1,53 @@
+"""Tests for binance_rest module."""
+import pytest
+from decimal import Decimal
+from unittest.mock import AsyncMock, MagicMock
+from datetime import datetime, timezone
+
+from data_collector.binance_rest import fetch_historical_candles
+
+
+@pytest.mark.asyncio
+async def test_fetch_historical_candles_parses_response():
+    """OHLCV rows from the exchange become correctly populated Candle models."""
+    ts = 1700000000000  # milliseconds
+    ohlcv_rows = [
+        [ts, 30000.0, 30100.0, 29900.0, 30050.0, 1.5],
+        [ts + 60000, 30050.0, 30200.0, 30000.0, 30150.0, 2.0],
+    ]
+    exchange = MagicMock()
+    exchange.fetch_ohlcv = AsyncMock(return_value=ohlcv_rows)
+
+    result = await fetch_historical_candles(
+        exchange, "BTC/USDT", "1m", since=ts, limit=500
+    )
+
+    assert len(result) == 2
+
+    first = result[0]
+    expected_open_time = datetime.fromtimestamp(ts / 1000, tz=timezone.utc)
+    assert first.symbol == "BTCUSDT"
+    assert first.timeframe == "1m"
+    assert first.open_time == expected_open_time
+    assert first.open == Decimal("30000.0")
+    assert first.high == Decimal("30100.0")
+    assert first.low == Decimal("29900.0")
+    assert first.close == Decimal("30050.0")
+    assert first.volume == Decimal("1.5")
+
+    # The exchange is queried with the original, slash-form symbol.
+    exchange.fetch_ohlcv.assert_called_once_with(
+        "BTC/USDT", "1m", since=ts, limit=500
+    )
+
+
+@pytest.mark.asyncio
+async def test_fetch_historical_candles_empty_response():
+    """An exchange that returns no rows yields an empty candle list."""
+    exchange = MagicMock()
+    exchange.fetch_ohlcv = AsyncMock(return_value=[])
+
+    result = await fetch_historical_candles(
+        exchange, "BTC/USDT", "1m", since=1700000000000
+    )
+
+    assert result == []
diff --git a/services/data-collector/tests/test_storage.py b/services/data-collector/tests/test_storage.py
new file mode 100644
index 0000000..6b27414
--- /dev/null
+++ b/services/data-collector/tests/test_storage.py
@@ -0,0 +1,62 @@
+"""Tests for storage module."""
+import pytest
+from decimal import Decimal
+from datetime import datetime, timezone
+from unittest.mock import AsyncMock, MagicMock
+
+from shared.models import Candle
+from data_collector.storage import CandleStorage
+
+
+def _make_candle(symbol: str = "BTCUSDT") -> Candle:
+    """Build a fixed 1m candle for *symbol*, opening at 2024-01-01 00:00 UTC."""
+    fields = {
+        "symbol": symbol,
+        "timeframe": "1m",
+        "open_time": datetime(2024, 1, 1, 0, 0, 0, tzinfo=timezone.utc),
+        "open": Decimal("30000"),
+        "high": Decimal("30100"),
+        "low": Decimal("29900"),
+        "close": Decimal("30050"),
+        "volume": Decimal("1.5"),
+    }
+    return Candle(**fields)
+
+
+@pytest.mark.asyncio
+async def test_storage_saves_to_db_and_publishes():
+    """store() inserts the candle once and publishes one CANDLE event for it."""
+    db = MagicMock()
+    db.insert_candle = AsyncMock()
+    broker = MagicMock()
+    broker.publish = AsyncMock()
+    sample = _make_candle()
+
+    await CandleStorage(db=db, broker=broker).store(sample)
+
+    db.insert_candle.assert_called_once_with(sample)
+    broker.publish.assert_called_once()
+
+    # publish() receives the stream name and the payload positionally.
+    positional = broker.publish.call_args[0]
+    assert positional[0] == "candles.BTCUSDT"
+
+    payload = positional[1]
+    assert payload["type"] == "CANDLE"
+    assert payload["data"]["symbol"] == "BTCUSDT"
+
+
+@pytest.mark.asyncio
+async def test_storage_batch_store():
+    """store_batch() inserts and publishes once per candle in the batch."""
+    db = MagicMock()
+    db.insert_candle = AsyncMock()
+    broker = MagicMock()
+    broker.publish = AsyncMock()
+    batch = [_make_candle() for _ in range(3)]
+
+    await CandleStorage(db=db, broker=broker).store_batch(batch)
+
+    assert db.insert_candle.call_count == 3
+    assert broker.publish.call_count == 3