"""Data Collector Service — fetches US stock data from Alpaca."""

import asyncio
from datetime import datetime
from decimal import Decimal, InvalidOperation

import aiohttp

from data_collector.config import CollectorConfig
from shared.alpaca import AlpacaClient
from shared.broker import RedisBroker
from shared.db import Database
from shared.events import CandleEvent
from shared.healthcheck import HealthCheckServer
from shared.logging import setup_logging
from shared.metrics import ServiceMetrics
from shared.models import Candle
from shared.notifier import TelegramNotifier
from shared.shutdown import GracefulShutdown

# Health check port: base + 0
HEALTH_PORT_OFFSET = 0

# Transient transport failures. ``asyncio.TimeoutError`` is listed explicitly
# because it only became an alias of the builtin ``TimeoutError`` in
# Python 3.11; on older interpreters it would otherwise escape the handlers.
_NETWORK_ERRORS = (
    aiohttp.ClientError,
    ConnectionError,
    TimeoutError,
    asyncio.TimeoutError,
)

# Failures caused by a malformed bar payload. ``Decimal("garbage")`` raises
# ``InvalidOperation`` (an ``ArithmeticError``, not a ``ValueError``), and a
# non-string timestamp fails on ``.replace`` with ``AttributeError``.
_PARSE_ERRORS = (ValueError, KeyError, TypeError, AttributeError, InvalidOperation)


def _bar_to_candle(symbol: str, timeframe: str, bar: dict) -> Candle:
    """Convert one raw bar mapping (keys t/o/h/l/c/v) into a ``Candle``."""
    return Candle(
        symbol=symbol,
        timeframe=timeframe,
        # fromisoformat() rejects a trailing "Z" before Python 3.11, so it is
        # rewritten as an explicit UTC offset.
        open_time=datetime.fromisoformat(bar["t"].replace("Z", "+00:00")),
        # Go through str() so numeric values are converted from their textual
        # form rather than from a binary float.
        open=Decimal(str(bar["o"])),
        high=Decimal(str(bar["h"])),
        low=Decimal(str(bar["l"])),
        close=Decimal(str(bar["c"])),
        volume=Decimal(str(bar["v"])),
    )


async def fetch_latest_bars(
    alpaca: AlpacaClient,
    symbols: list[str],
    timeframe: str,
    log,
) -> list[Candle]:
    """Fetch latest bar for each symbol from Alpaca.

    Symbols are requested one at a time with ``limit=1``. A symbol whose
    request fails with a network error, or whose bar cannot be parsed, is
    logged as a warning and skipped so the remaining symbols are still
    processed.

    Args:
        alpaca: Client used to request bars via ``get_bars``.
        symbols: Ticker symbols to query.
        timeframe: Bar timeframe passed through to ``get_bars``.
        log: Logger accepting an event name plus keyword fields.

    Returns:
        One ``Candle`` per symbol that returned a parseable bar, in the
        order of ``symbols``.
    """
    candles: list[Candle] = []
    for symbol in symbols:
        try:
            bars = await alpaca.get_bars(symbol, timeframe=timeframe, limit=1)
            if bars:
                candles.append(_bar_to_candle(symbol, timeframe, bars[-1]))
        except _NETWORK_ERRORS as exc:
            log.warning("fetch_bar_network_error", symbol=symbol, error=str(exc))
        except _PARSE_ERRORS as exc:
            log.warning("fetch_bar_parse_error", symbol=symbol, error=str(exc))
    return candles


async def run() -> None:
    """Run the collector loop until shutdown is requested.

    Builds the service dependencies from ``CollectorConfig``, then on every
    poll interval checks whether the market is open; when it is, the latest
    bar of each configured symbol is stored in the database and published to
    the ``candles.<symbol>`` stream. Any unexpected exception is logged,
    reported through the notifier and re-raised. Clients are closed on exit.
    """
    config = CollectorConfig()
    log = setup_logging("data-collector", config.log_level, config.log_format)
    metrics = ServiceMetrics("data_collector")

    notifier = TelegramNotifier(
        bot_token=config.telegram_bot_token.get_secret_value(),
        chat_id=config.telegram_chat_id,
    )

    db = Database(config.database_url.get_secret_value())
    await db.connect()

    broker = RedisBroker(config.redis_url.get_secret_value())

    alpaca = AlpacaClient(
        api_key=config.alpaca_api_key.get_secret_value(),
        api_secret=config.alpaca_api_secret.get_secret_value(),
        paper=config.alpaca_paper,
    )

    health = HealthCheckServer(
        "data-collector",
        port=config.health_port + HEALTH_PORT_OFFSET,
        auth_token=config.metrics_auth_token,
    )
    await health.start()
    metrics.service_up.labels(service="data-collector").set(1)

    # Falls back to 60 seconds when the config does not define the setting.
    poll_interval = int(getattr(config, "poll_interval_seconds", 60))
    symbols = config.symbols
    # Only the first configured timeframe is collected.
    timeframe = config.timeframes[0] if config.timeframes else "1Day"

    shutdown = GracefulShutdown()
    shutdown.install_handlers()

    log.info("starting", symbols=symbols, timeframe=timeframe, poll_interval=poll_interval)

    try:
        while not shutdown.is_shutting_down:
            # Check if market is open
            try:
                is_open = await alpaca.is_market_open()
            except _NETWORK_ERRORS as exc:
                # Treat an unreachable clock endpoint as "closed" for this
                # cycle, but record it so an outage is not mistaken for a
                # normally closed market.
                log.warning("market_check_failed", error=str(exc))
                is_open = False

            if is_open:
                candles = await fetch_latest_bars(alpaca, symbols, timeframe, log)
                for candle in candles:
                    await db.insert_candle(candle)
                    event = CandleEvent(data=candle)
                    stream = f"candles.{candle.symbol}"
                    await broker.publish(stream, event.to_dict())
                    metrics.events_processed.labels(
                        service="data-collector", event_type="candle"
                    ).inc()
                    log.info("candle_stored", symbol=candle.symbol, close=str(candle.close))
            else:
                log.debug("market_closed")

            await asyncio.sleep(poll_interval)
    except Exception as exc:
        log.error("fatal_error", error=str(exc), exc_info=True)
        await notifier.send_error(str(exc), "data-collector")
        raise
    finally:
        metrics.service_up.labels(service="data-collector").set(0)
        # Close every client independently: a failure in one close() must not
        # skip the remaining ones or replace the exception being propagated.
        closers = (
            ("notifier", notifier.close),
            ("alpaca", alpaca.close),
            ("broker", broker.close),
            ("db", db.close),
        )
        for resource, close in closers:
            try:
                await close()
            except Exception as exc:
                log.warning("close_failed", resource=resource, error=str(exc))


def main() -> None:
    """Synchronous entry point: run the collector on a new event loop."""
    asyncio.run(run())


if __name__ == "__main__":
    main()