diff options
Diffstat (limited to 'services/data-collector')
| -rw-r--r-- | services/data-collector/pyproject.toml | 8 | ||||
| -rw-r--r-- | services/data-collector/src/data_collector/binance_rest.py | 54 | ||||
| -rw-r--r-- | services/data-collector/src/data_collector/binance_ws.py | 109 | ||||
| -rw-r--r-- | services/data-collector/src/data_collector/config.py | 7 | ||||
| -rw-r--r-- | services/data-collector/src/data_collector/main.py | 110 | ||||
| -rw-r--r-- | services/data-collector/src/data_collector/ws_factory.py | 34 | ||||
| -rw-r--r-- | services/data-collector/tests/test_binance_rest.py | 48 | ||||
| -rw-r--r-- | services/data-collector/tests/test_storage.py | 6 | ||||
| -rw-r--r-- | services/data-collector/tests/test_ws_factory.py | 21 |
9 files changed, 83 insertions, 314 deletions
diff --git a/services/data-collector/pyproject.toml b/services/data-collector/pyproject.toml index 5fba78f..48282c3 100644 --- a/services/data-collector/pyproject.toml +++ b/services/data-collector/pyproject.toml @@ -1,13 +1,9 @@ [project] name = "data-collector" version = "0.1.0" -description = "Binance market data collector service" +description = "Alpaca market data collector service" requires-python = ">=3.12" -dependencies = [ - "ccxt>=4.0", - "websockets>=12.0", - "trading-shared", -] +dependencies = ["trading-shared"] [project.optional-dependencies] dev = [ diff --git a/services/data-collector/src/data_collector/binance_rest.py b/services/data-collector/src/data_collector/binance_rest.py deleted file mode 100644 index eaf4e30..0000000 --- a/services/data-collector/src/data_collector/binance_rest.py +++ /dev/null @@ -1,54 +0,0 @@ -"""Binance REST API helpers for fetching historical candle data.""" - -from datetime import datetime, timezone -from decimal import Decimal - -from shared.models import Candle - - -def _normalize_symbol(symbol: str) -> str: - """Convert 'BTC/USDT' to 'BTCUSDT'.""" - return symbol.replace("/", "") - - -async def fetch_historical_candles( - exchange, - symbol: str, - timeframe: str, - since: int, - limit: int = 500, -) -> list[Candle]: - """Fetch historical OHLCV candles from the exchange and return Candle models. - - Args: - exchange: An async ccxt exchange instance. - symbol: Market symbol, e.g. 'BTC/USDT'. - timeframe: Candle timeframe, e.g. '1m'. - since: Start timestamp in milliseconds. - limit: Maximum number of candles to fetch. - - Returns: - A list of Candle model instances. - """ - rows = await exchange.fetch_ohlcv(symbol, timeframe, since=since, limit=limit) - - normalized = _normalize_symbol(symbol) - candles: list[Candle] = [] - - for row in rows: - ts_ms, o, h, low, c, v = row - open_time = datetime.fromtimestamp(ts_ms / 1000, tz=timezone.utc) - candles.append( - Candle( - symbol=normalized, - timeframe=timeframe, - open_time=open_time, - open=Decimal(str(o)), - high=Decimal(str(h)), - low=Decimal(str(low)), - close=Decimal(str(c)), - volume=Decimal(str(v)), - ) - ) - - return candles diff --git a/services/data-collector/src/data_collector/binance_ws.py b/services/data-collector/src/data_collector/binance_ws.py deleted file mode 100644 index e25e7a6..0000000 --- a/services/data-collector/src/data_collector/binance_ws.py +++ /dev/null @@ -1,109 +0,0 @@ -"""Binance WebSocket client for real-time kline/candle data. - -NOTE: This module is Binance-specific (uses Binance WebSocket URL and message format). -Multi-exchange WebSocket support would require exchange-specific implementations. -""" - -import asyncio -import json -import logging -from datetime import datetime, timezone -from decimal import Decimal -from typing import Callable, Awaitable - -import websockets - -from shared.models import Candle - -logger = logging.getLogger(__name__) - -BINANCE_WS_URL = "wss://stream.binance.com:9443/ws" -RECONNECT_DELAY = 5 # seconds - - -def _normalize_symbol(symbol: str) -> str: - """Convert 'BTC/USDT' to 'BTCUSDT'.""" - return symbol.replace("/", "") - - -def _stream_name(symbol: str, timeframe: str) -> str: - """Build Binance stream name, e.g. 'btcusdt@kline_1m'.""" - return f"{_normalize_symbol(symbol).lower()}@kline_{timeframe}" - - -class BinanceWebSocket: - """Connects to Binance WebSocket streams and emits closed candles.""" - - def __init__( - self, - symbols: list[str], - timeframe: str, - on_candle: Callable[[Candle], Awaitable[None]], - ) -> None: - self._symbols = symbols - self._timeframe = timeframe - self._on_candle = on_candle - self._running = False - - def _build_subscribe_message(self) -> dict: - streams = [_stream_name(s, self._timeframe) for s in self._symbols] - return { - "method": "SUBSCRIBE", - "params": streams, - "id": 1, - } - - def _parse_candle(self, message: dict) -> Candle | None: - """Parse a kline WebSocket message into a Candle, or None if not closed.""" - k = message.get("k") - if k is None: - return None - if not k.get("x"): # only closed candles - return None - - symbol = k["s"] # already normalized, e.g. 'BTCUSDT' - open_time = datetime.fromtimestamp(k["t"] / 1000, tz=timezone.utc) - return Candle( - symbol=symbol, - timeframe=self._timeframe, - open_time=open_time, - open=Decimal(k["o"]), - high=Decimal(k["h"]), - low=Decimal(k["l"]), - close=Decimal(k["c"]), - volume=Decimal(k["v"]), - ) - - async def _run_once(self) -> None: - """Single connection attempt; processes messages until disconnected.""" - async with websockets.connect(BINANCE_WS_URL) as ws: - subscribe_msg = self._build_subscribe_message() - await ws.send(json.dumps(subscribe_msg)) - logger.info("Subscribed to Binance streams: %s", subscribe_msg["params"]) - - async for raw in ws: - if not self._running: - break - try: - message = json.loads(raw) - candle = self._parse_candle(message) - if candle is not None: - await self._on_candle(candle) - except Exception: - logger.exception("Error processing WebSocket message: %s", raw) - - async def start(self) -> None: - """Connect to Binance WebSocket and process messages, auto-reconnecting.""" - self._running = True - while self._running: - try: - await self._run_once() - except Exception: - if not self._running: - break - logger.warning("WebSocket disconnected. Reconnecting in %ds…", RECONNECT_DELAY) - await asyncio.sleep(RECONNECT_DELAY) - - def stop(self) -> None: - """Signal the WebSocket loop to stop after the current message.""" - self._running = False diff --git a/services/data-collector/src/data_collector/config.py b/services/data-collector/src/data_collector/config.py index 1e080e5..dd430e6 100644 --- a/services/data-collector/src/data_collector/config.py +++ b/services/data-collector/src/data_collector/config.py @@ -1,6 +1,9 @@ +"""Data Collector configuration.""" + from shared.config import Settings class CollectorConfig(Settings): - symbols: list[str] = ["BTC/USDT"] - timeframes: list[str] = ["1m"] + symbols: list[str] = ["AAPL", "MSFT", "GOOGL", "AMZN", "TSLA"] + timeframes: list[str] = ["5Min"] + poll_interval_seconds: int = 60 diff --git a/services/data-collector/src/data_collector/main.py b/services/data-collector/src/data_collector/main.py index eebe14a..b42b34c 100644 --- a/services/data-collector/src/data_collector/main.py +++ b/services/data-collector/src/data_collector/main.py @@ -1,59 +1,74 @@ -"""Data Collector Service entry point.""" +"""Data Collector Service — fetches US stock data from Alpaca.""" import asyncio +from shared.alpaca import AlpacaClient from shared.broker import RedisBroker from shared.db import Database +from shared.events import CandleEvent from shared.healthcheck import HealthCheckServer from shared.logging import setup_logging from shared.metrics import ServiceMetrics +from shared.models import Candle from shared.notifier import TelegramNotifier from data_collector.config import CollectorConfig -from data_collector.storage import CandleStorage -from data_collector.ws_factory import create_websocket - -# Health check port: base (HEALTH_PORT, default 8080) + offset -# data-collector: +0 (8080), strategy-engine: +1 (8081) -# order-executor: +2 (8082), portfolio-manager: +3 (8083) +# Health check port: base + 0 HEALTH_PORT_OFFSET = 0 +async def fetch_latest_bars( + alpaca: AlpacaClient, + symbols: list[str], + timeframe: str, + log, +) -> list[Candle]: + """Fetch latest bar for each symbol from Alpaca.""" + candles = [] + for symbol in symbols: + try: + bars = await alpaca.get_bars(symbol, timeframe=timeframe, limit=1) + if bars: + bar = bars[-1] + from datetime import datetime + from decimal import Decimal + + candle = Candle( + symbol=symbol, + timeframe=timeframe, + open_time=datetime.fromisoformat(bar["t"].replace("Z", "+00:00")), + open=Decimal(str(bar["o"])), + high=Decimal(str(bar["h"])), + low=Decimal(str(bar["l"])), + close=Decimal(str(bar["c"])), + volume=Decimal(str(bar["v"])), + ) + candles.append(candle) + except Exception as exc: + log.warning("fetch_bar_failed", symbol=symbol, error=str(exc)) + return candles + + async def run() -> None: - """Initialise all components and start the WebSocket collector.""" config = CollectorConfig() log = setup_logging("data-collector", config.log_level, config.log_format) metrics = ServiceMetrics("data_collector") + notifier = TelegramNotifier( - bot_token=config.telegram_bot_token, chat_id=config.telegram_chat_id + bot_token=config.telegram_bot_token, + chat_id=config.telegram_chat_id, ) db = Database(config.database_url) await db.connect() - await db.init_tables() broker = RedisBroker(config.redis_url) - storage = CandleStorage(db=db, broker=broker) - - async def on_candle(candle): - log.info( - "candle_received", - symbol=candle.symbol, - timeframe=candle.timeframe, - open_time=str(candle.open_time), - ) - await storage.store(candle) - metrics.events_processed.labels(service="data-collector", event_type="candle").inc() - - # Use the first configured timeframe for the WebSocket subscription. - timeframe = config.timeframes[0] if config.timeframes else "1m" - - ws = create_websocket( - exchange_id=config.exchange_id, - symbols=config.symbols, - timeframe=timeframe, - on_candle=on_candle, + + alpaca = AlpacaClient( + api_key=config.alpaca_api_key, + api_secret=config.alpaca_api_secret, + paper=config.alpaca_paper, ) health = HealthCheckServer( @@ -61,18 +76,38 @@ async def run() -> None: port=config.health_port + HEALTH_PORT_OFFSET, auth_token=config.metrics_auth_token, ) - health.register_check("redis", broker.ping) await health.start() metrics.service_up.labels(service="data-collector").set(1) - log.info( - "service_started", - symbols=config.symbols, - timeframe=timeframe, - ) + poll_interval = int(getattr(config, "poll_interval_seconds", 60)) + symbols = config.symbols + timeframe = config.timeframes[0] if config.timeframes else "1Day" + + log.info("starting", symbols=symbols, timeframe=timeframe, poll_interval=poll_interval) try: - await ws.start() + while True: + # Check if market is open + try: + is_open = await alpaca.is_market_open() + except Exception: + is_open = False + + if is_open: + candles = await fetch_latest_bars(alpaca, symbols, timeframe, log) + for candle in candles: + await db.insert_candle(candle) + event = CandleEvent(data=candle) + stream = f"candles.{candle.symbol}" + await broker.publish(stream, event.to_dict()) + metrics.events_processed.labels( + service="data-collector", event_type="candle" + ).inc() + log.info("candle_stored", symbol=candle.symbol, close=str(candle.close)) + else: + log.debug("market_closed") + + await asyncio.sleep(poll_interval) except Exception as exc: log.error("fatal_error", error=str(exc)) await notifier.send_error(str(exc), "data-collector") @@ -80,6 +115,7 @@ async def run() -> None: finally: metrics.service_up.labels(service="data-collector").set(0) await notifier.close() + await alpaca.close() await broker.close() await db.close() diff --git a/services/data-collector/src/data_collector/ws_factory.py b/services/data-collector/src/data_collector/ws_factory.py deleted file mode 100644 index e068399..0000000 --- a/services/data-collector/src/data_collector/ws_factory.py +++ /dev/null @@ -1,34 +0,0 @@ -"""WebSocket factory for exchange-specific connections.""" - -import logging - -from data_collector.binance_ws import BinanceWebSocket - -logger = logging.getLogger(__name__) - -# Supported exchanges for WebSocket streaming -SUPPORTED_WS = {"binance": BinanceWebSocket} - - -def create_websocket(exchange_id: str, **kwargs): - """Create an exchange-specific WebSocket handler. - - Args: - exchange_id: Exchange identifier (e.g. 'binance') - **kwargs: Passed to the WebSocket constructor (symbols, timeframe, on_candle) - - Returns: - WebSocket handler instance - - Raises: - ValueError: If exchange is not supported for WebSocket streaming - """ - ws_cls = SUPPORTED_WS.get(exchange_id) - if ws_cls is None: - supported = ", ".join(sorted(SUPPORTED_WS.keys())) - raise ValueError( - f"WebSocket streaming not supported for '{exchange_id}'. " - f"Supported: {supported}. " - f"Use REST polling as fallback for unsupported exchanges." - ) - return ws_cls(**kwargs) diff --git a/services/data-collector/tests/test_binance_rest.py b/services/data-collector/tests/test_binance_rest.py deleted file mode 100644 index bf88210..0000000 --- a/services/data-collector/tests/test_binance_rest.py +++ /dev/null @@ -1,48 +0,0 @@ -"""Tests for binance_rest module.""" - -import pytest -from decimal import Decimal -from unittest.mock import AsyncMock, MagicMock -from datetime import datetime, timezone - -from data_collector.binance_rest import fetch_historical_candles - - -@pytest.mark.asyncio -async def test_fetch_historical_candles_parses_response(): - """Verify that OHLCV rows are correctly parsed into Candle models.""" - ts = 1700000000000 # milliseconds - mock_exchange = MagicMock() - mock_exchange.fetch_ohlcv = AsyncMock( - return_value=[ - [ts, 30000.0, 30100.0, 29900.0, 30050.0, 1.5], - [ts + 60000, 30050.0, 30200.0, 30000.0, 30150.0, 2.0], - ] - ) - - candles = await fetch_historical_candles(mock_exchange, "BTC/USDT", "1m", since=ts, limit=500) - - assert len(candles) == 2 - - c = candles[0] - assert c.symbol == "BTCUSDT" - assert c.timeframe == "1m" - assert c.open_time == datetime.fromtimestamp(ts / 1000, tz=timezone.utc) - assert c.open == Decimal("30000.0") - assert c.high == Decimal("30100.0") - assert c.low == Decimal("29900.0") - assert c.close == Decimal("30050.0") - assert c.volume == Decimal("1.5") - - mock_exchange.fetch_ohlcv.assert_called_once_with("BTC/USDT", "1m", since=ts, limit=500) - - -@pytest.mark.asyncio -async def test_fetch_historical_candles_empty_response(): - """Verify that an empty exchange response returns an empty list.""" - mock_exchange = MagicMock() - mock_exchange.fetch_ohlcv = AsyncMock(return_value=[]) - - candles = await fetch_historical_candles(mock_exchange, "BTC/USDT", "1m", since=1700000000000) - - assert candles == [] diff --git a/services/data-collector/tests/test_storage.py b/services/data-collector/tests/test_storage.py index be85578..ffffa40 100644 --- a/services/data-collector/tests/test_storage.py +++ b/services/data-collector/tests/test_storage.py @@ -9,7 +9,7 @@ from shared.models import Candle from data_collector.storage import CandleStorage -def _make_candle(symbol: str = "BTCUSDT") -> Candle: +def _make_candle(symbol: str = "AAPL") -> Candle: return Candle( symbol=symbol, timeframe="1m", @@ -39,11 +39,11 @@ async def test_storage_saves_to_db_and_publishes(): mock_broker.publish.assert_called_once() stream_arg = mock_broker.publish.call_args[0][0] - assert stream_arg == "candles.BTCUSDT" + assert stream_arg == "candles.AAPL" data_arg = mock_broker.publish.call_args[0][1] assert data_arg["type"] == "CANDLE" - assert data_arg["data"]["symbol"] == "BTCUSDT" + assert data_arg["data"]["symbol"] == "AAPL" @pytest.mark.asyncio diff --git a/services/data-collector/tests/test_ws_factory.py b/services/data-collector/tests/test_ws_factory.py deleted file mode 100644 index cdddcca..0000000 --- a/services/data-collector/tests/test_ws_factory.py +++ /dev/null @@ -1,21 +0,0 @@ -"""Tests for WebSocket factory.""" - -import pytest -from data_collector.ws_factory import create_websocket, SUPPORTED_WS -from data_collector.binance_ws import BinanceWebSocket - - -def test_create_binance_ws(): - ws = create_websocket("binance", symbols=["BTCUSDT"], timeframe="1m", on_candle=lambda c: None) - assert isinstance(ws, BinanceWebSocket) - - -def test_create_unsupported_exchange(): - with pytest.raises(ValueError, match="not supported"): - create_websocket( - "unsupported_exchange", symbols=["BTCUSDT"], timeframe="1m", on_candle=lambda c: None - ) - - -def test_supported_exchanges(): - assert "binance" in SUPPORTED_WS |
