summary | refs | log | tree | commit | diff
path: root/services/data-collector
diff options
context:
space:
mode:
Diffstat (limited to 'services/data-collector')
-rw-r--r-- services/data-collector/Dockerfile 9
-rw-r--r-- services/data-collector/src/data_collector/binance_rest.py 54
-rw-r--r-- services/data-collector/src/data_collector/binance_ws.py 109
-rw-r--r-- services/data-collector/src/data_collector/config.py 1
-rw-r--r-- services/data-collector/src/data_collector/main.py 34
-rw-r--r-- services/data-collector/src/data_collector/ws_factory.py 34
-rw-r--r-- services/data-collector/tests/test_binance_rest.py 48
-rw-r--r-- services/data-collector/tests/test_storage.py 15
-rw-r--r-- services/data-collector/tests/test_ws_factory.py 21
9 files changed, 38 insertions, 287 deletions
diff --git a/services/data-collector/Dockerfile b/services/data-collector/Dockerfile
index 8cb8af4..4d154c5 100644
--- a/services/data-collector/Dockerfile
+++ b/services/data-collector/Dockerfile
@@ -1,8 +1,15 @@
-FROM python:3.12-slim
+FROM python:3.12-slim AS builder
WORKDIR /app
COPY shared/ shared/
RUN pip install --no-cache-dir ./shared
COPY services/data-collector/ services/data-collector/
RUN pip install --no-cache-dir ./services/data-collector
+
+FROM python:3.12-slim
+RUN useradd -r -s /bin/false appuser
+WORKDIR /app
+COPY --from=builder /usr/local/lib/python3.12/site-packages /usr/local/lib/python3.12/site-packages
+COPY --from=builder /usr/local/bin /usr/local/bin
ENV PYTHONPATH=/app
+USER appuser
CMD ["python", "-m", "data_collector.main"]
diff --git a/services/data-collector/src/data_collector/binance_rest.py b/services/data-collector/src/data_collector/binance_rest.py
deleted file mode 100644
index eaf4e30..0000000
--- a/services/data-collector/src/data_collector/binance_rest.py
+++ /dev/null
@@ -1,54 +0,0 @@
-"""Binance REST API helpers for fetching historical candle data."""
-
-from datetime import datetime, timezone
-from decimal import Decimal
-
-from shared.models import Candle
-
-
-def _normalize_symbol(symbol: str) -> str:
- """Convert 'BTC/USDT' to 'BTCUSDT'."""
- return symbol.replace("/", "")
-
-
-async def fetch_historical_candles(
- exchange,
- symbol: str,
- timeframe: str,
- since: int,
- limit: int = 500,
-) -> list[Candle]:
- """Fetch historical OHLCV candles from the exchange and return Candle models.
-
- Args:
- exchange: An async ccxt exchange instance.
- symbol: Market symbol, e.g. 'BTC/USDT'.
- timeframe: Candle timeframe, e.g. '1m'.
- since: Start timestamp in milliseconds.
- limit: Maximum number of candles to fetch.
-
- Returns:
- A list of Candle model instances.
- """
- rows = await exchange.fetch_ohlcv(symbol, timeframe, since=since, limit=limit)
-
- normalized = _normalize_symbol(symbol)
- candles: list[Candle] = []
-
- for row in rows:
- ts_ms, o, h, low, c, v = row
- open_time = datetime.fromtimestamp(ts_ms / 1000, tz=timezone.utc)
- candles.append(
- Candle(
- symbol=normalized,
- timeframe=timeframe,
- open_time=open_time,
- open=Decimal(str(o)),
- high=Decimal(str(h)),
- low=Decimal(str(low)),
- close=Decimal(str(c)),
- volume=Decimal(str(v)),
- )
- )
-
- return candles
diff --git a/services/data-collector/src/data_collector/binance_ws.py b/services/data-collector/src/data_collector/binance_ws.py
deleted file mode 100644
index e25e7a6..0000000
--- a/services/data-collector/src/data_collector/binance_ws.py
+++ /dev/null
@@ -1,109 +0,0 @@
-"""Binance WebSocket client for real-time kline/candle data.
-
-NOTE: This module is Binance-specific (uses Binance WebSocket URL and message format).
-Multi-exchange WebSocket support would require exchange-specific implementations.
-"""
-
-import asyncio
-import json
-import logging
-from datetime import datetime, timezone
-from decimal import Decimal
-from typing import Callable, Awaitable
-
-import websockets
-
-from shared.models import Candle
-
-logger = logging.getLogger(__name__)
-
-BINANCE_WS_URL = "wss://stream.binance.com:9443/ws"
-RECONNECT_DELAY = 5 # seconds
-
-
-def _normalize_symbol(symbol: str) -> str:
- """Convert 'BTC/USDT' to 'BTCUSDT'."""
- return symbol.replace("/", "")
-
-
-def _stream_name(symbol: str, timeframe: str) -> str:
- """Build Binance stream name, e.g. 'btcusdt@kline_1m'."""
- return f"{_normalize_symbol(symbol).lower()}@kline_{timeframe}"
-
-
-class BinanceWebSocket:
- """Connects to Binance WebSocket streams and emits closed candles."""
-
- def __init__(
- self,
- symbols: list[str],
- timeframe: str,
- on_candle: Callable[[Candle], Awaitable[None]],
- ) -> None:
- self._symbols = symbols
- self._timeframe = timeframe
- self._on_candle = on_candle
- self._running = False
-
- def _build_subscribe_message(self) -> dict:
- streams = [_stream_name(s, self._timeframe) for s in self._symbols]
- return {
- "method": "SUBSCRIBE",
- "params": streams,
- "id": 1,
- }
-
- def _parse_candle(self, message: dict) -> Candle | None:
- """Parse a kline WebSocket message into a Candle, or None if not closed."""
- k = message.get("k")
- if k is None:
- return None
- if not k.get("x"): # only closed candles
- return None
-
- symbol = k["s"] # already normalized, e.g. 'BTCUSDT'
- open_time = datetime.fromtimestamp(k["t"] / 1000, tz=timezone.utc)
- return Candle(
- symbol=symbol,
- timeframe=self._timeframe,
- open_time=open_time,
- open=Decimal(k["o"]),
- high=Decimal(k["h"]),
- low=Decimal(k["l"]),
- close=Decimal(k["c"]),
- volume=Decimal(k["v"]),
- )
-
- async def _run_once(self) -> None:
- """Single connection attempt; processes messages until disconnected."""
- async with websockets.connect(BINANCE_WS_URL) as ws:
- subscribe_msg = self._build_subscribe_message()
- await ws.send(json.dumps(subscribe_msg))
- logger.info("Subscribed to Binance streams: %s", subscribe_msg["params"])
-
- async for raw in ws:
- if not self._running:
- break
- try:
- message = json.loads(raw)
- candle = self._parse_candle(message)
- if candle is not None:
- await self._on_candle(candle)
- except Exception:
- logger.exception("Error processing WebSocket message: %s", raw)
-
- async def start(self) -> None:
- """Connect to Binance WebSocket and process messages, auto-reconnecting."""
- self._running = True
- while self._running:
- try:
- await self._run_once()
- except Exception:
- if not self._running:
- break
- logger.warning("WebSocket disconnected. Reconnecting in %ds…", RECONNECT_DELAY)
- await asyncio.sleep(RECONNECT_DELAY)
-
- def stop(self) -> None:
- """Signal the WebSocket loop to stop after the current message."""
- self._running = False
diff --git a/services/data-collector/src/data_collector/config.py b/services/data-collector/src/data_collector/config.py
index 4761013..dd430e6 100644
--- a/services/data-collector/src/data_collector/config.py
+++ b/services/data-collector/src/data_collector/config.py
@@ -1,4 +1,5 @@
"""Data Collector configuration."""
+
from shared.config import Settings
diff --git a/services/data-collector/src/data_collector/main.py b/services/data-collector/src/data_collector/main.py
index 38f8759..2d44848 100644
--- a/services/data-collector/src/data_collector/main.py
+++ b/services/data-collector/src/data_collector/main.py
@@ -1,9 +1,12 @@
"""Data Collector Service — fetches US stock data from Alpaca."""
+
import asyncio
+import aiohttp
+
+from data_collector.config import CollectorConfig
from shared.alpaca import AlpacaClient
from shared.broker import RedisBroker
-from shared.config import Settings
from shared.db import Database
from shared.events import CandleEvent
from shared.healthcheck import HealthCheckServer
@@ -11,8 +14,7 @@ from shared.logging import setup_logging
from shared.metrics import ServiceMetrics
from shared.models import Candle
from shared.notifier import TelegramNotifier
-
-from data_collector.config import CollectorConfig
+from shared.shutdown import GracefulShutdown
# Health check port: base + 0
HEALTH_PORT_OFFSET = 0
@@ -33,6 +35,7 @@ async def fetch_latest_bars(
bar = bars[-1]
from datetime import datetime
from decimal import Decimal
+
candle = Candle(
symbol=symbol,
timeframe=timeframe,
@@ -44,8 +47,10 @@ async def fetch_latest_bars(
volume=Decimal(str(bar["v"])),
)
candles.append(candle)
- except Exception as exc:
- log.warning("fetch_bar_failed", symbol=symbol, error=str(exc))
+ except (aiohttp.ClientError, ConnectionError, TimeoutError) as exc:
+ log.warning("fetch_bar_network_error", symbol=symbol, error=str(exc))
+ except (ValueError, KeyError, TypeError) as exc:
+ log.warning("fetch_bar_parse_error", symbol=symbol, error=str(exc))
return candles
@@ -55,18 +60,18 @@ async def run() -> None:
metrics = ServiceMetrics("data_collector")
notifier = TelegramNotifier(
- bot_token=config.telegram_bot_token,
+ bot_token=config.telegram_bot_token.get_secret_value(),
chat_id=config.telegram_chat_id,
)
- db = Database(config.database_url)
+ db = Database(config.database_url.get_secret_value())
await db.connect()
- broker = RedisBroker(config.redis_url)
+ broker = RedisBroker(config.redis_url.get_secret_value())
alpaca = AlpacaClient(
- api_key=config.alpaca_api_key,
- api_secret=config.alpaca_api_secret,
+ api_key=config.alpaca_api_key.get_secret_value(),
+ api_secret=config.alpaca_api_secret.get_secret_value(),
paper=config.alpaca_paper,
)
@@ -82,14 +87,17 @@ async def run() -> None:
symbols = config.symbols
timeframe = config.timeframes[0] if config.timeframes else "1Day"
+ shutdown = GracefulShutdown()
+ shutdown.install_handlers()
+
log.info("starting", symbols=symbols, timeframe=timeframe, poll_interval=poll_interval)
try:
- while True:
+ while not shutdown.is_shutting_down:
# Check if market is open
try:
is_open = await alpaca.is_market_open()
- except Exception:
+ except (aiohttp.ClientError, ConnectionError, TimeoutError):
is_open = False
if is_open:
@@ -108,7 +116,7 @@ async def run() -> None:
await asyncio.sleep(poll_interval)
except Exception as exc:
- log.error("fatal_error", error=str(exc))
+ log.error("fatal_error", error=str(exc), exc_info=True)
await notifier.send_error(str(exc), "data-collector")
raise
finally:
diff --git a/services/data-collector/src/data_collector/ws_factory.py b/services/data-collector/src/data_collector/ws_factory.py
deleted file mode 100644
index e068399..0000000
--- a/services/data-collector/src/data_collector/ws_factory.py
+++ /dev/null
@@ -1,34 +0,0 @@
-"""WebSocket factory for exchange-specific connections."""
-
-import logging
-
-from data_collector.binance_ws import BinanceWebSocket
-
-logger = logging.getLogger(__name__)
-
-# Supported exchanges for WebSocket streaming
-SUPPORTED_WS = {"binance": BinanceWebSocket}
-
-
-def create_websocket(exchange_id: str, **kwargs):
- """Create an exchange-specific WebSocket handler.
-
- Args:
- exchange_id: Exchange identifier (e.g. 'binance')
- **kwargs: Passed to the WebSocket constructor (symbols, timeframe, on_candle)
-
- Returns:
- WebSocket handler instance
-
- Raises:
- ValueError: If exchange is not supported for WebSocket streaming
- """
- ws_cls = SUPPORTED_WS.get(exchange_id)
- if ws_cls is None:
- supported = ", ".join(sorted(SUPPORTED_WS.keys()))
- raise ValueError(
- f"WebSocket streaming not supported for '{exchange_id}'. "
- f"Supported: {supported}. "
- f"Use REST polling as fallback for unsupported exchanges."
- )
- return ws_cls(**kwargs)
diff --git a/services/data-collector/tests/test_binance_rest.py b/services/data-collector/tests/test_binance_rest.py
deleted file mode 100644
index bf88210..0000000
--- a/services/data-collector/tests/test_binance_rest.py
+++ /dev/null
@@ -1,48 +0,0 @@
-"""Tests for binance_rest module."""
-
-import pytest
-from decimal import Decimal
-from unittest.mock import AsyncMock, MagicMock
-from datetime import datetime, timezone
-
-from data_collector.binance_rest import fetch_historical_candles
-
-
-@pytest.mark.asyncio
-async def test_fetch_historical_candles_parses_response():
- """Verify that OHLCV rows are correctly parsed into Candle models."""
- ts = 1700000000000 # milliseconds
- mock_exchange = MagicMock()
- mock_exchange.fetch_ohlcv = AsyncMock(
- return_value=[
- [ts, 30000.0, 30100.0, 29900.0, 30050.0, 1.5],
- [ts + 60000, 30050.0, 30200.0, 30000.0, 30150.0, 2.0],
- ]
- )
-
- candles = await fetch_historical_candles(mock_exchange, "BTC/USDT", "1m", since=ts, limit=500)
-
- assert len(candles) == 2
-
- c = candles[0]
- assert c.symbol == "BTCUSDT"
- assert c.timeframe == "1m"
- assert c.open_time == datetime.fromtimestamp(ts / 1000, tz=timezone.utc)
- assert c.open == Decimal("30000.0")
- assert c.high == Decimal("30100.0")
- assert c.low == Decimal("29900.0")
- assert c.close == Decimal("30050.0")
- assert c.volume == Decimal("1.5")
-
- mock_exchange.fetch_ohlcv.assert_called_once_with("BTC/USDT", "1m", since=ts, limit=500)
-
-
-@pytest.mark.asyncio
-async def test_fetch_historical_candles_empty_response():
- """Verify that an empty exchange response returns an empty list."""
- mock_exchange = MagicMock()
- mock_exchange.fetch_ohlcv = AsyncMock(return_value=[])
-
- candles = await fetch_historical_candles(mock_exchange, "BTC/USDT", "1m", since=1700000000000)
-
- assert candles == []
diff --git a/services/data-collector/tests/test_storage.py b/services/data-collector/tests/test_storage.py
index be85578..51f3aee 100644
--- a/services/data-collector/tests/test_storage.py
+++ b/services/data-collector/tests/test_storage.py
@@ -1,19 +1,20 @@
"""Tests for storage module."""
-import pytest
+from datetime import UTC, datetime
from decimal import Decimal
-from datetime import datetime, timezone
from unittest.mock import AsyncMock, MagicMock
-from shared.models import Candle
+import pytest
from data_collector.storage import CandleStorage
+from shared.models import Candle
+
-def _make_candle(symbol: str = "BTCUSDT") -> Candle:
+def _make_candle(symbol: str = "AAPL") -> Candle:
return Candle(
symbol=symbol,
timeframe="1m",
- open_time=datetime(2024, 1, 1, 0, 0, 0, tzinfo=timezone.utc),
+ open_time=datetime(2024, 1, 1, 0, 0, 0, tzinfo=UTC),
open=Decimal("30000"),
high=Decimal("30100"),
low=Decimal("29900"),
@@ -39,11 +40,11 @@ async def test_storage_saves_to_db_and_publishes():
mock_broker.publish.assert_called_once()
stream_arg = mock_broker.publish.call_args[0][0]
- assert stream_arg == "candles.BTCUSDT"
+ assert stream_arg == "candles.AAPL"
data_arg = mock_broker.publish.call_args[0][1]
assert data_arg["type"] == "CANDLE"
- assert data_arg["data"]["symbol"] == "BTCUSDT"
+ assert data_arg["data"]["symbol"] == "AAPL"
@pytest.mark.asyncio
diff --git a/services/data-collector/tests/test_ws_factory.py b/services/data-collector/tests/test_ws_factory.py
deleted file mode 100644
index cdddcca..0000000
--- a/services/data-collector/tests/test_ws_factory.py
+++ /dev/null
@@ -1,21 +0,0 @@
-"""Tests for WebSocket factory."""
-
-import pytest
-from data_collector.ws_factory import create_websocket, SUPPORTED_WS
-from data_collector.binance_ws import BinanceWebSocket
-
-
-def test_create_binance_ws():
- ws = create_websocket("binance", symbols=["BTCUSDT"], timeframe="1m", on_candle=lambda c: None)
- assert isinstance(ws, BinanceWebSocket)
-
-
-def test_create_unsupported_exchange():
- with pytest.raises(ValueError, match="not supported"):
- create_websocket(
- "unsupported_exchange", symbols=["BTCUSDT"], timeframe="1m", on_candle=lambda c: None
- )
-
-
-def test_supported_exchanges():
- assert "binance" in SUPPORTED_WS