"""Data Collector Service — fetches US stock data from Alpaca.""" import asyncio import aiohttp from shared.alpaca import AlpacaClient from shared.broker import RedisBroker from shared.db import Database from shared.events import CandleEvent from shared.healthcheck import HealthCheckServer from shared.logging import setup_logging from shared.metrics import ServiceMetrics from shared.models import Candle from shared.notifier import TelegramNotifier from shared.shutdown import GracefulShutdown from data_collector.config import CollectorConfig # Health check port: base + 0 HEALTH_PORT_OFFSET = 0 async def fetch_latest_bars( alpaca: AlpacaClient, symbols: list[str], timeframe: str, log, ) -> list[Candle]: """Fetch latest bar for each symbol from Alpaca.""" candles = [] for symbol in symbols: try: bars = await alpaca.get_bars(symbol, timeframe=timeframe, limit=1) if bars: bar = bars[-1] from datetime import datetime from decimal import Decimal candle = Candle( symbol=symbol, timeframe=timeframe, open_time=datetime.fromisoformat(bar["t"].replace("Z", "+00:00")), open=Decimal(str(bar["o"])), high=Decimal(str(bar["h"])), low=Decimal(str(bar["l"])), close=Decimal(str(bar["c"])), volume=Decimal(str(bar["v"])), ) candles.append(candle) except (aiohttp.ClientError, ConnectionError, TimeoutError, asyncio.TimeoutError) as exc: log.warning("fetch_bar_network_error", symbol=symbol, error=str(exc)) except (ValueError, KeyError, TypeError) as exc: log.warning("fetch_bar_parse_error", symbol=symbol, error=str(exc)) return candles async def run() -> None: config = CollectorConfig() log = setup_logging("data-collector", config.log_level, config.log_format) metrics = ServiceMetrics("data_collector") notifier = TelegramNotifier( bot_token=config.telegram_bot_token.get_secret_value(), chat_id=config.telegram_chat_id, ) db = Database(config.database_url.get_secret_value()) await db.connect() broker = RedisBroker(config.redis_url.get_secret_value()) alpaca = AlpacaClient( api_key=config.alpaca_api_key.get_secret_value(), api_secret=config.alpaca_api_secret.get_secret_value(), paper=config.alpaca_paper, ) health = HealthCheckServer( "data-collector", port=config.health_port + HEALTH_PORT_OFFSET, auth_token=config.metrics_auth_token, ) await health.start() metrics.service_up.labels(service="data-collector").set(1) poll_interval = int(getattr(config, "poll_interval_seconds", 60)) symbols = config.symbols timeframe = config.timeframes[0] if config.timeframes else "1Day" shutdown = GracefulShutdown() shutdown.install_handlers() log.info("starting", symbols=symbols, timeframe=timeframe, poll_interval=poll_interval) try: while not shutdown.is_shutting_down: # Check if market is open try: is_open = await alpaca.is_market_open() except (aiohttp.ClientError, ConnectionError, TimeoutError, asyncio.TimeoutError): is_open = False if is_open: candles = await fetch_latest_bars(alpaca, symbols, timeframe, log) for candle in candles: await db.insert_candle(candle) event = CandleEvent(data=candle) stream = f"candles.{candle.symbol}" await broker.publish(stream, event.to_dict()) metrics.events_processed.labels( service="data-collector", event_type="candle" ).inc() log.info("candle_stored", symbol=candle.symbol, close=str(candle.close)) else: log.debug("market_closed") await asyncio.sleep(poll_interval) except Exception as exc: log.error("fatal_error", error=str(exc), exc_info=True) await notifier.send_error(str(exc), "data-collector") raise finally: metrics.service_up.labels(service="data-collector").set(0) await notifier.close() await alpaca.close() await broker.close() await db.close() def main() -> None: asyncio.run(run()) if __name__ == "__main__": main()