diff options
Diffstat (limited to 'services/data-collector/src')
6 files changed, 320 insertions, 0 deletions
diff --git a/services/data-collector/src/data_collector/__init__.py b/services/data-collector/src/data_collector/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/services/data-collector/src/data_collector/__init__.py
diff --git a/services/data-collector/src/data_collector/binance_rest.py b/services/data-collector/src/data_collector/binance_rest.py
new file mode 100644
index 0000000..af0eb77
--- /dev/null
+++ b/services/data-collector/src/data_collector/binance_rest.py
@@ -0,0 +1,52 @@
+"""Binance REST API helpers for fetching historical candle data."""
+from datetime import datetime, timezone
+from decimal import Decimal
+
+from shared.models import Candle
+
+
+def _normalize_symbol(symbol: str) -> str:
+    """Convert 'BTC/USDT' to 'BTCUSDT'."""
+    return symbol.replace("/", "")
+
+
+async def fetch_historical_candles(
+    exchange,
+    symbol: str,
+    timeframe: str,
+    since: int,
+    limit: int = 500,
+) -> list[Candle]:
+    """Fetch historical OHLCV candles from the exchange and return Candle models.
+
+    Args:
+        exchange: An async ccxt exchange instance.
+        symbol: Market symbol, e.g. 'BTC/USDT'.
+        timeframe: Candle timeframe, e.g. '1m'.
+        since: Start timestamp in milliseconds.
+        limit: Maximum number of candles to fetch.
+
+    Returns:
+        A list of Candle model instances, one per row returned by the
+        exchange, with the symbol normalized (e.g. 'BTCUSDT') and a
+        timezone-aware UTC open time.
+    """
+    rows = await exchange.fetch_ohlcv(symbol, timeframe, since=since, limit=limit)
+    normalized = _normalize_symbol(symbol)
+
+    # Each row unpacks as (timestamp_ms, open, high, low, close, volume).
+    # Values go through str() before Decimal() so a float input keeps its
+    # short printed form instead of its exact binary expansion.
+    return [
+        Candle(
+            symbol=normalized,
+            timeframe=timeframe,
+            open_time=datetime.fromtimestamp(ts_ms / 1000, tz=timezone.utc),
+            open=Decimal(str(open_)),
+            high=Decimal(str(high)),
+            low=Decimal(str(low)),
+            close=Decimal(str(close)),
+            volume=Decimal(str(volume)),
+        )
+        for ts_ms, open_, high, low, close, volume in rows
+    ]
diff --git a/services/data-collector/src/data_collector/binance_ws.py b/services/data-collector/src/data_collector/binance_ws.py
new file mode 100644
index 0000000..7a4bad2
--- /dev/null
+++ b/services/data-collector/src/data_collector/binance_ws.py
@@ -0,0 +1,143 @@
+"""Binance WebSocket client for real-time kline/candle data."""
+import asyncio
+import json
+import logging
+from datetime import datetime, timezone
+from decimal import Decimal
+from typing import Callable, Awaitable
+
+import websockets
+
+from shared.models import Candle
+
+logger = logging.getLogger(__name__)
+
+BINANCE_WS_URL = "wss://stream.binance.com:9443/ws"
+RECONNECT_DELAY = 5  # seconds
+
+
+def _normalize_symbol(symbol: str) -> str:
+    """Convert 'BTC/USDT' to 'BTCUSDT'."""
+    return symbol.replace("/", "")
+
+
+def _stream_name(symbol: str, timeframe: str) -> str:
+    """Build Binance stream name, e.g. 'btcusdt@kline_1m'."""
+    return f"{_normalize_symbol(symbol).lower()}@kline_{timeframe}"
+
+
+class BinanceWebSocket:
+    """Connects to Binance WebSocket streams and emits closed candles.
+
+    One connection subscribes to the kline stream of every symbol for a
+    single timeframe. Each closed candle is passed to ``on_candle``.
+    """
+
+    def __init__(
+        self,
+        symbols: list[str],
+        timeframe: str,
+        on_candle: Callable[[Candle], Awaitable[None]],
+    ) -> None:
+        """Store the subscription parameters; no connection is opened here.
+
+        Args:
+            symbols: Market symbols, e.g. ['BTC/USDT'].
+            timeframe: Candle timeframe, e.g. '1m'.
+            on_candle: Coroutine function awaited with each closed candle.
+        """
+        self._symbols = symbols
+        self._timeframe = timeframe
+        self._on_candle = on_candle
+        self._running = False
+
+    def _build_subscribe_message(self) -> dict:
+        """Return the SUBSCRIBE request covering every configured symbol."""
+        streams = [_stream_name(s, self._timeframe) for s in self._symbols]
+        return {
+            "method": "SUBSCRIBE",
+            "params": streams,
+            "id": 1,
+        }
+
+    def _parse_candle(self, message: dict) -> Candle | None:
+        """Parse a kline WebSocket message into a Candle, or None if not closed.
+
+        Messages without a "k" payload also yield None.
+        """
+        k = message.get("k")
+        if k is None:
+            return None
+        if not k.get("x"):  # only closed candles
+            return None
+
+        symbol = k["s"]  # already normalized, e.g. 'BTCUSDT'
+        open_time = datetime.fromtimestamp(k["t"] / 1000, tz=timezone.utc)
+        return Candle(
+            symbol=symbol,
+            timeframe=self._timeframe,
+            open_time=open_time,
+            open=Decimal(k["o"]),
+            high=Decimal(k["h"]),
+            low=Decimal(k["l"]),
+            close=Decimal(k["c"]),
+            volume=Decimal(k["v"]),
+        )
+
+    async def _run_once(self) -> None:
+        """Single connection attempt; processes messages until disconnected.
+
+        Returns normally when the message iterator ends, or when a message
+        arrives after stop() was called. A failure while handling one
+        message is logged and does not end the connection.
+        """
+        async with websockets.connect(BINANCE_WS_URL) as ws:
+            subscribe_msg = self._build_subscribe_message()
+            await ws.send(json.dumps(subscribe_msg))
+            logger.info("Subscribed to Binance streams: %s", subscribe_msg["params"])
+
+            async for raw in ws:
+                if not self._running:
+                    break
+                try:
+                    message = json.loads(raw)
+                    candle = self._parse_candle(message)
+                    if candle is not None:
+                        await self._on_candle(candle)
+                except Exception:
+                    logger.exception("Error processing WebSocket message: %s", raw)
+
+    async def start(self) -> None:
+        """Connect to Binance WebSocket and process messages, auto-reconnecting.
+
+        Runs until stop() is called. After a connection ends, whether by an
+        exception or by a clean close, waits RECONNECT_DELAY seconds before
+        connecting again.
+        """
+        self._running = True
+        while self._running:
+            try:
+                await self._run_once()
+            except Exception:
+                if not self._running:
+                    break
+                # exc_info keeps the cause of the disconnect in the log.
+                logger.warning(
+                    "WebSocket disconnected. Reconnecting in %ds…",
+                    RECONNECT_DELAY,
+                    exc_info=True,
+                )
+            else:
+                if not self._running:
+                    break
+                # A clean close makes _run_once() return instead of raising;
+                # pausing here too avoids reconnecting in a tight loop.
+                logger.warning(
+                    "WebSocket connection closed. Reconnecting in %ds…",
+                    RECONNECT_DELAY,
+                )
+            await asyncio.sleep(RECONNECT_DELAY)
+
+    def stop(self) -> None:
+        """Signal the WebSocket loop to stop after the current message."""
+        self._running = False
diff --git a/services/data-collector/src/data_collector/config.py b/services/data-collector/src/data_collector/config.py
new file mode 100644
index 0000000..1e080e5
--- /dev/null
+++ b/services/data-collector/src/data_collector/config.py
@@ -0,0 +1,11 @@
+"""Configuration for the data collector service."""
+from shared.config import Settings
+
+
+class CollectorConfig(Settings):
+    """Collector settings layered on the shared Settings base."""
+
+    # Market symbols in slash form, e.g. 'BTC/USDT'.
+    symbols: list[str] = ["BTC/USDT"]
+    # Candle timeframes, e.g. '1m'; main.py subscribes to the first one only.
+    timeframes: list[str] = ["1m"]
diff --git a/services/data-collector/src/data_collector/main.py b/services/data-collector/src/data_collector/main.py
new file mode 100644
index 0000000..adf1e96
--- /dev/null
+++ b/services/data-collector/src/data_collector/main.py
@@ -0,0 +1,76 @@
+"""Data Collector Service entry point."""
+import asyncio
+import logging
+
+from shared.broker import RedisBroker
+from shared.db import Database
+
+from data_collector.binance_ws import BinanceWebSocket
+from data_collector.config import CollectorConfig
+from data_collector.storage import CandleStorage
+
+logging.basicConfig(level=logging.INFO)
+logger = logging.getLogger(__name__)
+
+
+async def run() -> None:
+    """Initialise all components and start the WebSocket collector.
+
+    The broker and the database are closed on the way out, including when
+    start-up fails after the database connection was opened.
+    """
+    config = CollectorConfig()
+
+    # Use the first configured timeframe for the WebSocket subscription.
+    timeframe = config.timeframes[0] if config.timeframes else "1m"
+    if config.timeframes and len(config.timeframes) > 1:
+        # Say so instead of dropping the remaining entries silently.
+        logger.warning(
+            "Only the first timeframe is collected; ignoring %s",
+            config.timeframes[1:],
+        )
+
+    db = Database(config.database_url)
+    await db.connect()
+    try:
+        await db.init_tables()
+
+        broker = RedisBroker(config.redis_url)
+        try:
+            storage = CandleStorage(db=db, broker=broker)
+
+            async def on_candle(candle):
+                logger.info(
+                    "Candle received: %s %s %s",
+                    candle.symbol,
+                    candle.timeframe,
+                    candle.open_time,
+                )
+                await storage.store(candle)
+
+            ws = BinanceWebSocket(
+                symbols=config.symbols,
+                timeframe=timeframe,
+                on_candle=on_candle,
+            )
+
+            logger.info(
+                "Starting data collector for symbols=%s timeframe=%s",
+                config.symbols,
+                timeframe,
+            )
+            await ws.start()
+        finally:
+            await broker.close()
+    finally:
+        # Outer finally: the database is closed even if broker.close() raised.
+        await db.close()
+
+
+def main() -> None:
+    """Run the collector inside a new asyncio event loop."""
+    asyncio.run(run())
+
+
+if __name__ == "__main__":
+    main()
diff --git a/services/data-collector/src/data_collector/storage.py b/services/data-collector/src/data_collector/storage.py
new file mode 100644
index 0000000..1e40b82
--- /dev/null
+++ b/services/data-collector/src/data_collector/storage.py
@@ -0,0 +1,38 @@
+"""Candle storage: persists to DB and publishes to Redis."""
+from shared.events import CandleEvent
+from shared.models import Candle
+
+
+class CandleStorage:
+    """Stores candles in the database and publishes CandleEvents to Redis."""
+
+    def __init__(self, db, broker) -> None:
+        """Keep the database and broker handles used by store().
+
+        Args:
+            db: Object providing an async ``insert_candle(candle)``.
+            broker: Object providing an async ``publish(stream, payload)``.
+        """
+        self._db = db
+        self._broker = broker
+
+    async def store(self, candle: Candle) -> None:
+        """Insert candle into DB and publish a CandleEvent to the Redis stream.
+
+        The insert happens first, so a failed insert publishes nothing. The
+        stream name is ``candles.<symbol>``.
+        """
+        await self._db.insert_candle(candle)
+
+        event = CandleEvent(data=candle)
+        stream = f"candles.{candle.symbol}"
+        await self._broker.publish(stream, event.to_dict())
+
+    async def store_batch(self, candles: list[Candle]) -> None:
+        """Store multiple candles one by one, in the order given.
+
+        An exception from store() stops the batch; candles stored before it
+        are not rolled back here.
+        """
+        for candle in candles:
+            await self.store(candle)
