"""Data Collector Service — fetches US stock data from Alpaca."""

import asyncio
from datetime import datetime
from decimal import Decimal

from shared.alpaca import AlpacaClient
from shared.broker import RedisBroker
from shared.db import Database
from shared.events import CandleEvent
from shared.healthcheck import HealthCheckServer
from shared.logging import setup_logging
from shared.metrics import ServiceMetrics
from shared.models import Candle
from shared.notifier import TelegramNotifier

from data_collector.config import CollectorConfig

# Health check port: base + 0
HEALTH_PORT_OFFSET = 0

# Service label used for logging, the health check server and metric labels.
_SERVICE_NAME = "data-collector"


def _bar_to_candle(symbol: str, timeframe: str, bar: dict) -> Candle:
    """Convert one raw bar mapping (keys t/o/h/l/c/v) into a ``Candle``.

    Raises:
        KeyError: if one of the expected keys is missing from ``bar``.
        ValueError / decimal.InvalidOperation: if a field cannot be parsed.
    """
    return Candle(
        symbol=symbol,
        timeframe=timeframe,
        # datetime.fromisoformat() only accepts a trailing "Z" from Python
        # 3.11 on, so the UTC offset is spelled out explicitly.
        open_time=datetime.fromisoformat(bar["t"].replace("Z", "+00:00")),
        # Go through str() so a float price does not leak its binary
        # representation error into the Decimal.
        open=Decimal(str(bar["o"])),
        high=Decimal(str(bar["h"])),
        low=Decimal(str(bar["l"])),
        close=Decimal(str(bar["c"])),
        volume=Decimal(str(bar["v"])),
    )


async def fetch_latest_bars(
    alpaca: AlpacaClient,
    symbols: list[str],
    timeframe: str,
    log,
) -> list[Candle]:
    """Fetch latest bar for each symbol from Alpaca.

    Symbols are queried one at a time with ``limit=1``. The fetch is
    best-effort per symbol: any error while fetching or parsing a bar is
    logged as ``fetch_bar_failed`` and that symbol is skipped, so one bad
    symbol never prevents the others from being collected.

    Args:
        alpaca: Client used to request bars.
        symbols: Ticker symbols to query.
        timeframe: Timeframe passed through to ``alpaca.get_bars``.
        log: Logger accepting an event name plus keyword fields.

    Returns:
        One ``Candle`` per symbol that returned at least one parsable bar,
        in the order of ``symbols``.
    """
    candles: list[Candle] = []
    for symbol in symbols:
        try:
            bars = await alpaca.get_bars(symbol, timeframe=timeframe, limit=1)
            if bars:
                candles.append(_bar_to_candle(symbol, timeframe, bars[-1]))
        except Exception as exc:
            log.warning("fetch_bar_failed", symbol=symbol, error=str(exc))
    return candles


async def _store_and_publish(candle: Candle, db, broker, metrics, log) -> None:
    """Persist one candle, publish it on its per-symbol stream and count it."""
    await db.insert_candle(candle)
    event = CandleEvent(data=candle)
    await broker.publish(f"candles.{candle.symbol}", event.to_dict())
    metrics.events_processed.labels(
        service=_SERVICE_NAME, event_type="candle"
    ).inc()
    log.info("candle_stored", symbol=candle.symbol, close=str(candle.close))


async def _close_quietly(name: str, resource, log) -> None:
    """Close ``resource``, logging instead of raising if the close fails.

    Used during shutdown so that one failing ``close()`` neither skips the
    remaining resources nor replaces the exception that caused the shutdown.
    """
    try:
        await resource.close()
    except Exception as exc:
        log.warning("close_failed", resource=name, error=str(exc))


async def run() -> None:
    """Run the collector loop until a fatal error or cancellation.

    Each cycle checks whether the market is open; if so, the latest bar of
    every configured symbol is fetched, stored and published. The loop then
    sleeps for the configured poll interval (``poll_interval_seconds`` on the
    config, defaulting to 60).

    Raises:
        Exception: any error escaping the loop is logged as ``fatal_error``,
            reported through the notifier and re-raised after cleanup.
    """
    config = CollectorConfig()
    log = setup_logging(_SERVICE_NAME, config.log_level, config.log_format)
    metrics = ServiceMetrics("data_collector")

    notifier = TelegramNotifier(
        bot_token=config.telegram_bot_token,
        chat_id=config.telegram_chat_id,
    )

    db = Database(config.database_url)
    await db.connect()

    broker = RedisBroker(config.redis_url)
    alpaca = AlpacaClient(
        api_key=config.alpaca_api_key,
        api_secret=config.alpaca_api_secret,
        paper=config.alpaca_paper,
    )
    health = HealthCheckServer(
        _SERVICE_NAME,
        port=config.health_port + HEALTH_PORT_OFFSET,
        auth_token=config.metrics_auth_token,
    )

    try:
        # Startup happens inside the try so that a failure here still
        # releases the connections opened above.
        await health.start()
        metrics.service_up.labels(service=_SERVICE_NAME).set(1)

        poll_interval = int(getattr(config, "poll_interval_seconds", 60))
        symbols = config.symbols
        timeframe = config.timeframes[0] if config.timeframes else "1Day"

        log.info(
            "starting",
            symbols=symbols,
            timeframe=timeframe,
            poll_interval=poll_interval,
        )

        while True:
            # Check if market is open. A failed check is treated as "closed"
            # for this cycle, but is logged so an outage is not mistaken for
            # a quiet market.
            try:
                is_open = await alpaca.is_market_open()
            except Exception as exc:
                log.warning("market_status_check_failed", error=str(exc))
                is_open = False

            if is_open:
                candles = await fetch_latest_bars(alpaca, symbols, timeframe, log)
                for candle in candles:
                    await _store_and_publish(candle, db, broker, metrics, log)
            else:
                log.debug("market_closed")

            await asyncio.sleep(poll_interval)
    except Exception as exc:
        log.error("fatal_error", error=str(exc))
        # A notification failure must not mask the original error.
        try:
            await notifier.send_error(str(exc), _SERVICE_NAME)
        except Exception as notify_exc:
            log.warning("notify_failed", error=str(notify_exc))
        raise
    finally:
        metrics.service_up.labels(service=_SERVICE_NAME).set(0)
        # NOTE(review): the health check server is started but never stopped
        # here; its shutdown API is not visible from this module.
        for name, resource in (
            ("notifier", notifier),
            ("alpaca", alpaca),
            ("broker", broker),
            ("db", db),
        ):
            await _close_quietly(name, resource, log)


def main() -> None:
    """Synchronous entry point: run the collector on a fresh event loop."""
    asyncio.run(run())


if __name__ == "__main__":
    main()