diff options
Diffstat (limited to 'services/data-collector/src/data_collector/main.py')
| -rw-r--r-- | services/data-collector/src/data_collector/main.py | 111 |
1 files changed, 73 insertions, 38 deletions
diff --git a/services/data-collector/src/data_collector/main.py b/services/data-collector/src/data_collector/main.py index eebe14a..38f8759 100644 --- a/services/data-collector/src/data_collector/main.py +++ b/services/data-collector/src/data_collector/main.py @@ -1,59 +1,73 @@ -"""Data Collector Service entry point.""" - +"""Data Collector Service — fetches US stock data from Alpaca.""" import asyncio +from shared.alpaca import AlpacaClient from shared.broker import RedisBroker +from shared.config import Settings from shared.db import Database +from shared.events import CandleEvent from shared.healthcheck import HealthCheckServer from shared.logging import setup_logging from shared.metrics import ServiceMetrics +from shared.models import Candle from shared.notifier import TelegramNotifier from data_collector.config import CollectorConfig -from data_collector.storage import CandleStorage -from data_collector.ws_factory import create_websocket - -# Health check port: base (HEALTH_PORT, default 8080) + offset -# data-collector: +0 (8080), strategy-engine: +1 (8081) -# order-executor: +2 (8082), portfolio-manager: +3 (8083) +# Health check port: base + 0 HEALTH_PORT_OFFSET = 0 +async def fetch_latest_bars( + alpaca: AlpacaClient, + symbols: list[str], + timeframe: str, + log, +) -> list[Candle]: + """Fetch latest bar for each symbol from Alpaca.""" + candles = [] + for symbol in symbols: + try: + bars = await alpaca.get_bars(symbol, timeframe=timeframe, limit=1) + if bars: + bar = bars[-1] + from datetime import datetime + from decimal import Decimal + candle = Candle( + symbol=symbol, + timeframe=timeframe, + open_time=datetime.fromisoformat(bar["t"].replace("Z", "+00:00")), + open=Decimal(str(bar["o"])), + high=Decimal(str(bar["h"])), + low=Decimal(str(bar["l"])), + close=Decimal(str(bar["c"])), + volume=Decimal(str(bar["v"])), + ) + candles.append(candle) + except Exception as exc: + log.warning("fetch_bar_failed", symbol=symbol, error=str(exc)) + return candles + + async def run() -> None: - """Initialise all components and start the WebSocket collector.""" config = CollectorConfig() log = setup_logging("data-collector", config.log_level, config.log_format) metrics = ServiceMetrics("data_collector") + notifier = TelegramNotifier( - bot_token=config.telegram_bot_token, chat_id=config.telegram_chat_id + bot_token=config.telegram_bot_token, + chat_id=config.telegram_chat_id, ) db = Database(config.database_url) await db.connect() - await db.init_tables() broker = RedisBroker(config.redis_url) - storage = CandleStorage(db=db, broker=broker) - - async def on_candle(candle): - log.info( - "candle_received", - symbol=candle.symbol, - timeframe=candle.timeframe, - open_time=str(candle.open_time), - ) - await storage.store(candle) - metrics.events_processed.labels(service="data-collector", event_type="candle").inc() - - # Use the first configured timeframe for the WebSocket subscription. - timeframe = config.timeframes[0] if config.timeframes else "1m" - - ws = create_websocket( - exchange_id=config.exchange_id, - symbols=config.symbols, - timeframe=timeframe, - on_candle=on_candle, + + alpaca = AlpacaClient( + api_key=config.alpaca_api_key, + api_secret=config.alpaca_api_secret, + paper=config.alpaca_paper, ) health = HealthCheckServer( @@ -61,18 +75,38 @@ async def run() -> None: port=config.health_port + HEALTH_PORT_OFFSET, auth_token=config.metrics_auth_token, ) - health.register_check("redis", broker.ping) await health.start() metrics.service_up.labels(service="data-collector").set(1) - log.info( - "service_started", - symbols=config.symbols, - timeframe=timeframe, - ) + poll_interval = int(getattr(config, "poll_interval_seconds", 60)) + symbols = config.symbols + timeframe = config.timeframes[0] if config.timeframes else "1Day" + + log.info("starting", symbols=symbols, timeframe=timeframe, poll_interval=poll_interval) try: - await ws.start() + while True: + # Check if market is open + try: + is_open = await alpaca.is_market_open() + except Exception: + is_open = False + + if is_open: + candles = await fetch_latest_bars(alpaca, symbols, timeframe, log) + for candle in candles: + await db.insert_candle(candle) + event = CandleEvent(data=candle) + stream = f"candles.{candle.symbol}" + await broker.publish(stream, event.to_dict()) + metrics.events_processed.labels( + service="data-collector", event_type="candle" + ).inc() + log.info("candle_stored", symbol=candle.symbol, close=str(candle.close)) + else: + log.debug("market_closed") + + await asyncio.sleep(poll_interval) except Exception as exc: log.error("fatal_error", error=str(exc)) await notifier.send_error(str(exc), "data-collector") @@ -80,6 +114,7 @@ async def run() -> None: finally: metrics.service_up.labels(service="data-collector").set(0) await notifier.close() + await alpaca.close() await broker.close() await db.close() |
