summaryrefslogtreecommitdiff
path: root/services/data-collector/src
diff options
context:
space:
mode:
Diffstat (limited to 'services/data-collector/src')
-rw-r--r--services/data-collector/src/data_collector/binance_rest.py54
-rw-r--r--services/data-collector/src/data_collector/binance_ws.py109
-rw-r--r--services/data-collector/src/data_collector/config.py7
-rw-r--r--services/data-collector/src/data_collector/main.py127
-rw-r--r--services/data-collector/src/data_collector/ws_factory.py34
5 files changed, 90 insertions, 241 deletions
diff --git a/services/data-collector/src/data_collector/binance_rest.py b/services/data-collector/src/data_collector/binance_rest.py
deleted file mode 100644
index eaf4e30..0000000
--- a/services/data-collector/src/data_collector/binance_rest.py
+++ /dev/null
@@ -1,54 +0,0 @@
-"""Binance REST API helpers for fetching historical candle data."""
-
-from datetime import datetime, timezone
-from decimal import Decimal
-
-from shared.models import Candle
-
-
-def _normalize_symbol(symbol: str) -> str:
- """Convert 'BTC/USDT' to 'BTCUSDT'."""
- return symbol.replace("/", "")
-
-
-async def fetch_historical_candles(
- exchange,
- symbol: str,
- timeframe: str,
- since: int,
- limit: int = 500,
-) -> list[Candle]:
- """Fetch historical OHLCV candles from the exchange and return Candle models.
-
- Args:
- exchange: An async ccxt exchange instance.
- symbol: Market symbol, e.g. 'BTC/USDT'.
- timeframe: Candle timeframe, e.g. '1m'.
- since: Start timestamp in milliseconds.
- limit: Maximum number of candles to fetch.
-
- Returns:
- A list of Candle model instances.
- """
- rows = await exchange.fetch_ohlcv(symbol, timeframe, since=since, limit=limit)
-
- normalized = _normalize_symbol(symbol)
- candles: list[Candle] = []
-
- for row in rows:
- ts_ms, o, h, low, c, v = row
- open_time = datetime.fromtimestamp(ts_ms / 1000, tz=timezone.utc)
- candles.append(
- Candle(
- symbol=normalized,
- timeframe=timeframe,
- open_time=open_time,
- open=Decimal(str(o)),
- high=Decimal(str(h)),
- low=Decimal(str(low)),
- close=Decimal(str(c)),
- volume=Decimal(str(v)),
- )
- )
-
- return candles
diff --git a/services/data-collector/src/data_collector/binance_ws.py b/services/data-collector/src/data_collector/binance_ws.py
deleted file mode 100644
index e25e7a6..0000000
--- a/services/data-collector/src/data_collector/binance_ws.py
+++ /dev/null
@@ -1,109 +0,0 @@
-"""Binance WebSocket client for real-time kline/candle data.
-
-NOTE: This module is Binance-specific (uses Binance WebSocket URL and message format).
-Multi-exchange WebSocket support would require exchange-specific implementations.
-"""
-
-import asyncio
-import json
-import logging
-from datetime import datetime, timezone
-from decimal import Decimal
-from typing import Callable, Awaitable
-
-import websockets
-
-from shared.models import Candle
-
-logger = logging.getLogger(__name__)
-
-BINANCE_WS_URL = "wss://stream.binance.com:9443/ws"
-RECONNECT_DELAY = 5 # seconds
-
-
-def _normalize_symbol(symbol: str) -> str:
- """Convert 'BTC/USDT' to 'BTCUSDT'."""
- return symbol.replace("/", "")
-
-
-def _stream_name(symbol: str, timeframe: str) -> str:
- """Build Binance stream name, e.g. 'btcusdt@kline_1m'."""
- return f"{_normalize_symbol(symbol).lower()}@kline_{timeframe}"
-
-
-class BinanceWebSocket:
- """Connects to Binance WebSocket streams and emits closed candles."""
-
- def __init__(
- self,
- symbols: list[str],
- timeframe: str,
- on_candle: Callable[[Candle], Awaitable[None]],
- ) -> None:
- self._symbols = symbols
- self._timeframe = timeframe
- self._on_candle = on_candle
- self._running = False
-
- def _build_subscribe_message(self) -> dict:
- streams = [_stream_name(s, self._timeframe) for s in self._symbols]
- return {
- "method": "SUBSCRIBE",
- "params": streams,
- "id": 1,
- }
-
- def _parse_candle(self, message: dict) -> Candle | None:
- """Parse a kline WebSocket message into a Candle, or None if not closed."""
- k = message.get("k")
- if k is None:
- return None
- if not k.get("x"): # only closed candles
- return None
-
- symbol = k["s"] # already normalized, e.g. 'BTCUSDT'
- open_time = datetime.fromtimestamp(k["t"] / 1000, tz=timezone.utc)
- return Candle(
- symbol=symbol,
- timeframe=self._timeframe,
- open_time=open_time,
- open=Decimal(k["o"]),
- high=Decimal(k["h"]),
- low=Decimal(k["l"]),
- close=Decimal(k["c"]),
- volume=Decimal(k["v"]),
- )
-
- async def _run_once(self) -> None:
- """Single connection attempt; processes messages until disconnected."""
- async with websockets.connect(BINANCE_WS_URL) as ws:
- subscribe_msg = self._build_subscribe_message()
- await ws.send(json.dumps(subscribe_msg))
- logger.info("Subscribed to Binance streams: %s", subscribe_msg["params"])
-
- async for raw in ws:
- if not self._running:
- break
- try:
- message = json.loads(raw)
- candle = self._parse_candle(message)
- if candle is not None:
- await self._on_candle(candle)
- except Exception:
- logger.exception("Error processing WebSocket message: %s", raw)
-
- async def start(self) -> None:
- """Connect to Binance WebSocket and process messages, auto-reconnecting."""
- self._running = True
- while self._running:
- try:
- await self._run_once()
- except Exception:
- if not self._running:
- break
- logger.warning("WebSocket disconnected. Reconnecting in %ds…", RECONNECT_DELAY)
- await asyncio.sleep(RECONNECT_DELAY)
-
- def stop(self) -> None:
- """Signal the WebSocket loop to stop after the current message."""
- self._running = False
diff --git a/services/data-collector/src/data_collector/config.py b/services/data-collector/src/data_collector/config.py
index 1e080e5..dd430e6 100644
--- a/services/data-collector/src/data_collector/config.py
+++ b/services/data-collector/src/data_collector/config.py
@@ -1,6 +1,9 @@
+"""Data Collector configuration."""
+
from shared.config import Settings
class CollectorConfig(Settings):
- symbols: list[str] = ["BTC/USDT"]
- timeframes: list[str] = ["1m"]
+ symbols: list[str] = ["AAPL", "MSFT", "GOOGL", "AMZN", "TSLA"]
+ timeframes: list[str] = ["5Min"]
+ poll_interval_seconds: int = 60
diff --git a/services/data-collector/src/data_collector/main.py b/services/data-collector/src/data_collector/main.py
index eebe14a..2d44848 100644
--- a/services/data-collector/src/data_collector/main.py
+++ b/services/data-collector/src/data_collector/main.py
@@ -1,59 +1,78 @@
-"""Data Collector Service entry point."""
+"""Data Collector Service — fetches US stock data from Alpaca."""
import asyncio
+import aiohttp
+
+from data_collector.config import CollectorConfig
+from shared.alpaca import AlpacaClient
from shared.broker import RedisBroker
from shared.db import Database
+from shared.events import CandleEvent
from shared.healthcheck import HealthCheckServer
from shared.logging import setup_logging
from shared.metrics import ServiceMetrics
+from shared.models import Candle
from shared.notifier import TelegramNotifier
+from shared.shutdown import GracefulShutdown
-from data_collector.config import CollectorConfig
-from data_collector.storage import CandleStorage
-from data_collector.ws_factory import create_websocket
+# Health check port: base + 0
+HEALTH_PORT_OFFSET = 0
-# Health check port: base (HEALTH_PORT, default 8080) + offset
-# data-collector: +0 (8080), strategy-engine: +1 (8081)
-# order-executor: +2 (8082), portfolio-manager: +3 (8083)
-HEALTH_PORT_OFFSET = 0
+async def fetch_latest_bars(
+ alpaca: AlpacaClient,
+ symbols: list[str],
+ timeframe: str,
+ log,
+) -> list[Candle]:
+ """Fetch latest bar for each symbol from Alpaca."""
+ candles = []
+ for symbol in symbols:
+ try:
+ bars = await alpaca.get_bars(symbol, timeframe=timeframe, limit=1)
+ if bars:
+ bar = bars[-1]
+ from datetime import datetime
+ from decimal import Decimal
+
+ candle = Candle(
+ symbol=symbol,
+ timeframe=timeframe,
+ open_time=datetime.fromisoformat(bar["t"].replace("Z", "+00:00")),
+ open=Decimal(str(bar["o"])),
+ high=Decimal(str(bar["h"])),
+ low=Decimal(str(bar["l"])),
+ close=Decimal(str(bar["c"])),
+ volume=Decimal(str(bar["v"])),
+ )
+ candles.append(candle)
+ except (aiohttp.ClientError, ConnectionError, TimeoutError) as exc:
+ log.warning("fetch_bar_network_error", symbol=symbol, error=str(exc))
+ except (ValueError, KeyError, TypeError) as exc:
+ log.warning("fetch_bar_parse_error", symbol=symbol, error=str(exc))
+ return candles
async def run() -> None:
- """Initialise all components and start the WebSocket collector."""
config = CollectorConfig()
log = setup_logging("data-collector", config.log_level, config.log_format)
metrics = ServiceMetrics("data_collector")
+
notifier = TelegramNotifier(
- bot_token=config.telegram_bot_token, chat_id=config.telegram_chat_id
+ bot_token=config.telegram_bot_token.get_secret_value(),
+ chat_id=config.telegram_chat_id,
)
- db = Database(config.database_url)
+ db = Database(config.database_url.get_secret_value())
await db.connect()
- await db.init_tables()
-
- broker = RedisBroker(config.redis_url)
- storage = CandleStorage(db=db, broker=broker)
-
- async def on_candle(candle):
- log.info(
- "candle_received",
- symbol=candle.symbol,
- timeframe=candle.timeframe,
- open_time=str(candle.open_time),
- )
- await storage.store(candle)
- metrics.events_processed.labels(service="data-collector", event_type="candle").inc()
-
- # Use the first configured timeframe for the WebSocket subscription.
- timeframe = config.timeframes[0] if config.timeframes else "1m"
-
- ws = create_websocket(
- exchange_id=config.exchange_id,
- symbols=config.symbols,
- timeframe=timeframe,
- on_candle=on_candle,
+
+ broker = RedisBroker(config.redis_url.get_secret_value())
+
+ alpaca = AlpacaClient(
+ api_key=config.alpaca_api_key.get_secret_value(),
+ api_secret=config.alpaca_api_secret.get_secret_value(),
+ paper=config.alpaca_paper,
)
health = HealthCheckServer(
@@ -61,25 +80,49 @@ async def run() -> None:
port=config.health_port + HEALTH_PORT_OFFSET,
auth_token=config.metrics_auth_token,
)
- health.register_check("redis", broker.ping)
await health.start()
metrics.service_up.labels(service="data-collector").set(1)
- log.info(
- "service_started",
- symbols=config.symbols,
- timeframe=timeframe,
- )
+ poll_interval = int(getattr(config, "poll_interval_seconds", 60))
+ symbols = config.symbols
+ timeframe = config.timeframes[0] if config.timeframes else "1Day"
+
+ shutdown = GracefulShutdown()
+ shutdown.install_handlers()
+
+ log.info("starting", symbols=symbols, timeframe=timeframe, poll_interval=poll_interval)
try:
- await ws.start()
+ while not shutdown.is_shutting_down:
+ # Check if market is open
+ try:
+ is_open = await alpaca.is_market_open()
+ except (aiohttp.ClientError, ConnectionError, TimeoutError):
+ is_open = False
+
+ if is_open:
+ candles = await fetch_latest_bars(alpaca, symbols, timeframe, log)
+ for candle in candles:
+ await db.insert_candle(candle)
+ event = CandleEvent(data=candle)
+ stream = f"candles.{candle.symbol}"
+ await broker.publish(stream, event.to_dict())
+ metrics.events_processed.labels(
+ service="data-collector", event_type="candle"
+ ).inc()
+ log.info("candle_stored", symbol=candle.symbol, close=str(candle.close))
+ else:
+ log.debug("market_closed")
+
+ await asyncio.sleep(poll_interval)
except Exception as exc:
- log.error("fatal_error", error=str(exc))
+ log.error("fatal_error", error=str(exc), exc_info=True)
await notifier.send_error(str(exc), "data-collector")
raise
finally:
metrics.service_up.labels(service="data-collector").set(0)
await notifier.close()
+ await alpaca.close()
await broker.close()
await db.close()
diff --git a/services/data-collector/src/data_collector/ws_factory.py b/services/data-collector/src/data_collector/ws_factory.py
deleted file mode 100644
index e068399..0000000
--- a/services/data-collector/src/data_collector/ws_factory.py
+++ /dev/null
@@ -1,34 +0,0 @@
-"""WebSocket factory for exchange-specific connections."""
-
-import logging
-
-from data_collector.binance_ws import BinanceWebSocket
-
-logger = logging.getLogger(__name__)
-
-# Supported exchanges for WebSocket streaming
-SUPPORTED_WS = {"binance": BinanceWebSocket}
-
-
-def create_websocket(exchange_id: str, **kwargs):
- """Create an exchange-specific WebSocket handler.
-
- Args:
- exchange_id: Exchange identifier (e.g. 'binance')
- **kwargs: Passed to the WebSocket constructor (symbols, timeframe, on_candle)
-
- Returns:
- WebSocket handler instance
-
- Raises:
- ValueError: If exchange is not supported for WebSocket streaming
- """
- ws_cls = SUPPORTED_WS.get(exchange_id)
- if ws_cls is None:
- supported = ", ".join(sorted(SUPPORTED_WS.keys()))
- raise ValueError(
- f"WebSocket streaming not supported for '{exchange_id}'. "
- f"Supported: {supported}. "
- f"Use REST polling as fallback for unsupported exchanges."
- )
- return ws_cls(**kwargs)