"""Click CLI commands for collecting, downloading, and listing market data."""

import asyncio
import sys
from datetime import UTC, datetime
from pathlib import Path

import click
from rich.console import Console
from rich.table import Table

# Sixth ancestor directory of this file; presumably the repository root, since
# `collect` joins it with "services/data-collector" — TODO confirm if the file moves.
_ROOT = Path(__file__).resolve().parents[5]


@click.group()
def data():
    """Data collection and management commands."""


@data.command()
@click.option("--symbol", required=True, help="Trading symbol (e.g. AAPL)")
@click.option("--timeframe", default="1m", show_default=True, help="Candle timeframe")
def collect(symbol, timeframe):
    """Start collecting live stock market data for a symbol."""
    # This command does not collect anything itself; it only prints how to run
    # the data-collector service (via docker compose or directly).
    click.echo(f"To collect live data for {symbol} at {timeframe}, run the data-collector service:")
    click.echo()
    click.echo(" docker compose up -d data-collector")
    click.echo()
    click.echo("Or run directly:")
    click.echo()
    click.echo(f" cd {_ROOT / 'services' / 'data-collector'}")
    click.echo(" python -m data_collector.main")


def _parse_since(since):
    """Parse the ``--from`` option into a timezone-aware UTC datetime.

    Args:
        since: ISO-8601 date/datetime string, or None/empty for "no start".

    Returns:
        None when *since* is falsy. Otherwise an aware datetime in UTC.
        A naive value is interpreted as UTC. A value carrying an explicit
        offset is *converted* to UTC; its offset is not overwritten.

    Exits:
        Prints an error to stderr and exits with status 1 if *since* is not
        valid ISO format.
    """
    if not since:
        return None
    try:
        parsed = datetime.fromisoformat(since)
    except ValueError:
        click.echo(
            f"Error: Invalid date format '{since}'. Use ISO format (e.g. 2024-01-01).",
            err=True,
        )
        sys.exit(1)
    if parsed.tzinfo is None:
        return parsed.replace(tzinfo=UTC)
    # Convert rather than relabel, so "+05:00" input keeps the same instant.
    return parsed.astimezone(UTC)


@data.command()
@click.option("--symbol", required=True, help="Trading symbol (e.g. AAPL)")
@click.option("--timeframe", default="1Day", show_default=True, help="Bar timeframe")
@click.option("--from", "since", default=None, help="Start date (ISO format)")
@click.option("--limit", default=1000, show_default=True, help="Number of bars to fetch")
def history(symbol, timeframe, since, limit):
    """Download historical stock market data for a symbol."""
    # Project modules are imported lazily; a missing module is reported as a
    # CLI error with exit status 1 instead of a traceback.
    try:
        from shared.alpaca_client import AlpacaClient
        from shared.config import Settings
        from shared.db import Database
    except ImportError as e:
        click.echo(f"Error: Could not import required modules: {e}", err=True)
        sys.exit(1)

    # Validate --from before any connection is opened, so a bad date cannot
    # leave a database connection dangling.
    start = _parse_since(since)

    async def _fetch():
        """Fetch bars from Alpaca and insert each one via ``db.insert_candle``."""
        settings = Settings()
        db = Database(settings.database_url.get_secret_value())
        await db.connect()
        try:
            # Created inside the outer try so a construction failure still
            # closes the database connection.
            client = AlpacaClient(
                api_key=settings.alpaca_api_key.get_secret_value(),
                api_secret=settings.alpaca_api_secret.get_secret_value(),
                base_url=getattr(settings, "alpaca_base_url", "https://paper-api.alpaca.markets"),
            )
            try:
                candles = await client.get_historical_bars(
                    symbol=symbol,
                    timeframe=timeframe,
                    start=start,
                    limit=limit,
                )
                count = 0
                for candle in candles:
                    await db.insert_candle(candle)
                    count += 1
                click.echo(f"Saved {count} bars for {symbol} ({timeframe}) to database.")
            except Exception as e:
                # CLI boundary: any fetch or insert failure is reported and
                # turned into exit status 1. SystemExit is not an Exception,
                # so the finally blocks still run on the way out.
                click.echo(f"Error fetching bars: {e}", err=True)
                sys.exit(1)
            finally:
                await client.close()
        finally:
            await db.close()

    asyncio.run(_fetch())


@data.command("list")
def list_():
    """List available data streams and symbols."""
    # Named list_ to avoid shadowing the builtin; exposed to the CLI as "list".
    try:
        from sqlalchemy import func, select

        from shared.config import Settings
        from shared.db import Database
        from shared.sa_models import CandleRow
    except ImportError as e:
        click.echo(f"Error: Could not import required modules: {e}", err=True)
        sys.exit(1)

    async def _list():
        """Query per-(symbol, timeframe) candle counts and date range, then print a table."""
        settings = Settings()
        db = Database(settings.database_url.get_secret_value())
        await db.connect()
        try:
            # One row per (symbol, timeframe): row count plus earliest/latest open_time.
            stmt = (
                select(
                    CandleRow.symbol,
                    CandleRow.timeframe,
                    func.count().label("count"),
                    func.min(CandleRow.open_time).label("earliest"),
                    func.max(CandleRow.open_time).label("latest"),
                )
                .group_by(CandleRow.symbol, CandleRow.timeframe)
                .order_by(CandleRow.symbol, CandleRow.timeframe)
            )
            async with db.get_session() as session:
                result = await session.execute(stmt)
                rows = result.all()

            if not rows:
                click.echo("No data collected yet.")
                return

            console = Console()
            table = Table(title="Collected Data", show_header=True, header_style="bold cyan")
            table.add_column("Symbol", style="bold")
            table.add_column("Timeframe")
            table.add_column("Candles", justify="right")
            table.add_column("From")
            table.add_column("To")
            for row in rows:
                table.add_row(
                    row.symbol,
                    row.timeframe,
                    str(row.count),
                    row.earliest.strftime("%Y-%m-%d %H:%M") if row.earliest else "-",
                    row.latest.strftime("%Y-%m-%d %H:%M") if row.latest else "-",
                )
            console.print(table)
        finally:
            await db.close()

    asyncio.run(_list())