import asyncio
import sys
from pathlib import Path

import click
from rich.console import Console
from rich.table import Table

# Ancestor directory that contains the ``services/`` tree; used below to
# locate the data-collector service sources.
_ROOT = Path(__file__).resolve().parents[5]

_QUOTE_ASSET = "USDT"
_TIMESTAMP_FORMAT = "%Y-%m-%d %H:%M"


def _parse_since_ms(since):
    """Convert an ISO-format date/datetime string to epoch milliseconds.

    A naive value is interpreted as UTC. A value that carries its own UTC
    offset keeps it (it is *not* overwritten with UTC).

    Returns:
        The timestamp in milliseconds, or ``None`` when *since* is empty.

    Raises:
        ValueError: if *since* is not a valid ISO-format string.
    """
    if not since:
        return None

    from datetime import datetime, timezone

    parsed = datetime.fromisoformat(since)
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)
    return int(parsed.timestamp() * 1000)


def _to_ccxt_symbol(symbol):
    """Return *symbol* in ``BASE/QUOTE`` form (e.g. BTCUSDT -> BTC/USDT).

    Only a trailing ``USDT`` is treated as the quote asset; symbols that
    already contain ``/`` or that do not end in ``USDT`` are returned as-is.
    """
    if "/" in symbol or symbol == _QUOTE_ASSET:
        return symbol
    if not symbol.endswith(_QUOTE_ASSET):
        return symbol
    base = symbol[: -len(_QUOTE_ASSET)]
    return f"{base}/{_QUOTE_ASSET}"


def _format_timestamp(value):
    """Format a datetime for table display, or ``-`` when it is missing."""
    return value.strftime(_TIMESTAMP_FORMAT) if value else "-"


@click.group()
def data():
    """Data collection and management commands."""


@data.command()
@click.option("--symbol", required=True, help="Trading symbol (e.g. BTCUSDT)")
@click.option("--timeframe", default="1m", show_default=True, help="Candle timeframe")
def collect(symbol, timeframe):
    """Start collecting live market data for a symbol."""
    # This command does not start anything itself: it only prints the
    # commands the user should run to launch the data-collector service.
    click.echo(
        f"To collect live data for {symbol} at {timeframe}, run the data-collector service:"
    )
    click.echo()
    click.echo("  docker compose up -d data-collector")
    click.echo()
    click.echo("Or run directly:")
    click.echo()
    click.echo(f"  cd {_ROOT / 'services' / 'data-collector'}")
    click.echo("  python -m data_collector.main")


@data.command()
@click.option("--symbol", required=True, help="Trading symbol (e.g. BTCUSDT)")
@click.option("--timeframe", default="1m", show_default=True, help="Candle timeframe")
@click.option("--from", "since", default=None, help="Start date (ISO format)")
@click.option("--limit", default=1000, show_default=True, help="Number of candles to fetch")
def history(symbol, timeframe, since, limit):
    """Download historical market data for a symbol."""
    # Make the data-collector package importable; avoid stacking duplicate
    # entries when the command is invoked more than once per process.
    collector_src = str(_ROOT / "services" / "data-collector" / "src")
    if collector_src not in sys.path:
        sys.path.insert(0, collector_src)

    try:
        import ccxt.async_support as ccxt
        from data_collector.binance_rest import fetch_historical_candles
        from shared.db import Database
        from shared.config import Settings
    except ImportError as e:
        click.echo(f"Error: Could not import required modules: {e}", err=True)
        sys.exit(1)

    # Validate user input before any connection is opened so that a bad
    # --from value cannot leave a database connection dangling.
    try:
        since_ms = _parse_since_ms(since)
    except ValueError:
        click.echo(
            f"Error: Invalid date format '{since}'. Use ISO format (e.g. 2024-01-01).",
            err=True,
        )
        sys.exit(1)

    ccxt_symbol = _to_ccxt_symbol(symbol)

    async def _fetch():
        settings = Settings()
        db = Database(settings.database_url)
        await db.connect()

        exchange = None
        try:
            exchange = ccxt.binance({
                "apiKey": settings.binance_api_key,
                "secret": settings.binance_api_secret,
            })

            # When no start date was given, ``since`` is omitted entirely and
            # the fetch helper decides the starting point.
            kwargs = {"limit": limit}
            if since_ms is not None:
                kwargs["since"] = since_ms

            candles = await fetch_historical_candles(
                exchange, ccxt_symbol, timeframe, **kwargs
            )

            count = 0
            for candle in candles:
                await db.insert_candle(candle)
                count += 1

            click.echo(f"Saved {count} candles for {symbol} ({timeframe}) to database.")
        except Exception as e:
            click.echo(f"Error fetching candles: {e}", err=True)
            sys.exit(1)
        finally:
            # Close the database even if closing the exchange fails.
            try:
                if exchange is not None:
                    await exchange.close()
            finally:
                await db.close()

    asyncio.run(_fetch())


@data.command("list")
def list_():
    """List available data streams and symbols."""
    try:
        from shared.db import Database
        from shared.config import Settings
        from shared.sa_models import CandleRow
        from sqlalchemy import select, func
    except ImportError as e:
        click.echo(f"Error: Could not import required modules: {e}", err=True)
        sys.exit(1)

    async def _list():
        settings = Settings()
        db = Database(settings.database_url)
        await db.connect()
        try:
            stmt = (
                select(
                    CandleRow.symbol,
                    CandleRow.timeframe,
                    func.count().label("candle_count"),
                    func.min(CandleRow.open_time).label("earliest"),
                    func.max(CandleRow.open_time).label("latest"),
                )
                .group_by(CandleRow.symbol, CandleRow.timeframe)
                .order_by(CandleRow.symbol, CandleRow.timeframe)
            )
            async with db.get_session() as session:
                result = await session.execute(stmt)
                rows = result.all()

            if not rows:
                click.echo("No data collected yet.")
                return

            console = Console()
            table = Table(title="Collected Data", show_header=True, header_style="bold cyan")
            table.add_column("Symbol", style="bold")
            table.add_column("Timeframe")
            table.add_column("Candles", justify="right")
            table.add_column("From")
            table.add_column("To")

            # Unpack positionally: attribute access such as ``row.count``
            # would resolve to the sequence method ``count`` on the result
            # row instead of the selected column value.
            for symbol, timeframe, candle_count, earliest, latest in rows:
                table.add_row(
                    symbol,
                    timeframe,
                    str(candle_count),
                    _format_timestamp(earliest),
                    _format_timestamp(latest),
                )

            console.print(table)
        finally:
            await db.close()

    asyncio.run(_list())