diff options
Diffstat (limited to 'cli/src/trading_cli/commands/data.py')
| -rw-r--r-- | cli/src/trading_cli/commands/data.py | 144 |
1 files changed, 137 insertions, 7 deletions
diff --git a/cli/src/trading_cli/commands/data.py b/cli/src/trading_cli/commands/data.py index 25d1693..5c6f274 100644 --- a/cli/src/trading_cli/commands/data.py +++ b/cli/src/trading_cli/commands/data.py @@ -1,4 +1,12 @@ +import asyncio +import sys +from pathlib import Path + import click +from rich.console import Console +from rich.table import Table + +_ROOT = Path(__file__).resolve().parents[5] @click.group() @@ -12,7 +20,16 @@ def data(): @click.option("--timeframe", default="1m", show_default=True, help="Candle timeframe") def collect(symbol, timeframe): """Start collecting live market data for a symbol.""" - click.echo(f"Starting data collection for {symbol} at {timeframe} timeframe...") + click.echo( + f"To collect live data for {symbol} at {timeframe}, run the data-collector service:" + ) + click.echo() + click.echo(" docker compose up -d data-collector") + click.echo() + click.echo("Or run directly:") + click.echo() + click.echo(f" cd {_ROOT / 'services' / 'data-collector'}") + click.echo(" python -m data_collector.main") @data.command() @@ -22,14 +39,127 @@ def collect(symbol, timeframe): @click.option("--limit", default=1000, show_default=True, help="Number of candles to fetch") def history(symbol, timeframe, since, limit): """Download historical market data for a symbol.""" - click.echo( - f"Downloading {limit} {timeframe} candles for {symbol}" - + (f" since {since}" if since else "") - + "..." - ) + sys.path.insert(0, str(_ROOT / "services" / "data-collector" / "src")) + + try: + from data_collector.binance_rest import fetch_historical_candles + from shared.db import Database + from shared.config import Settings + except ImportError as e: + click.echo(f"Error: Could not import required modules: {e}", err=True) + sys.exit(1) + + async def _fetch(): + import ccxt.async_support as ccxt + from datetime import datetime, timezone + + settings = Settings() + db = Database(settings.database_url) + await db.connect() + + # Parse the since date to a timestamp in ms + if since: + try: + dt = datetime.fromisoformat(since).replace(tzinfo=timezone.utc) + since_ms = int(dt.timestamp() * 1000) + except ValueError: + click.echo(f"Error: Invalid date format '{since}'. Use ISO format (e.g. 2024-01-01).", err=True) + sys.exit(1) + else: + # Default: fetch from 1000 candles ago (approximate) + since_ms = None + + # Normalize symbol for ccxt (BTCUSDT -> BTC/USDT) + ccxt_symbol = symbol + if "/" not in symbol and "USDT" in symbol: + base = symbol.replace("USDT", "") + ccxt_symbol = f"{base}/USDT" + + exchange = ccxt.binance({ + "apiKey": settings.binance_api_key, + "secret": settings.binance_api_secret, + }) + + try: + kwargs = {"limit": limit} + if since_ms is not None: + kwargs["since"] = since_ms + + candles = await fetch_historical_candles( + exchange, ccxt_symbol, timeframe, **kwargs + ) + + count = 0 + for candle in candles: + await db.insert_candle(candle) + count += 1 + + click.echo(f"Saved {count} candles for {symbol} ({timeframe}) to database.") + except Exception as e: + click.echo(f"Error fetching candles: {e}", err=True) + sys.exit(1) + finally: + await exchange.close() + await db.close() + + asyncio.run(_fetch()) @data.command("list") def list_(): """List available data streams and symbols.""" - click.echo("Fetching available data streams and collected symbols...") + try: + from shared.db import Database + from shared.config import Settings + from shared.sa_models import CandleRow + from sqlalchemy import select, func + except ImportError as e: + click.echo(f"Error: Could not import required modules: {e}", err=True) + sys.exit(1) + + async def _list(): + settings = Settings() + db = Database(settings.database_url) + await db.connect() + try: + stmt = ( + select( + CandleRow.symbol, + CandleRow.timeframe, + func.count().label("count"), + func.min(CandleRow.open_time).label("earliest"), + func.max(CandleRow.open_time).label("latest"), + ) + .group_by(CandleRow.symbol, CandleRow.timeframe) + .order_by(CandleRow.symbol, CandleRow.timeframe) + ) + async with db.get_session() as session: + result = await session.execute(stmt) + rows = result.all() + + if not rows: + click.echo("No data collected yet.") + return + + console = Console() + table = Table(title="Collected Data", show_header=True, header_style="bold cyan") + table.add_column("Symbol", style="bold") + table.add_column("Timeframe") + table.add_column("Candles", justify="right") + table.add_column("From") + table.add_column("To") + + for row in rows: + table.add_row( + row.symbol, + row.timeframe, + str(row.count), + row.earliest.strftime("%Y-%m-%d %H:%M") if row.earliest else "-", + row.latest.strftime("%Y-%m-%d %H:%M") if row.latest else "-", + ) + + console.print(table) + finally: + await db.close() + + asyncio.run(_list()) |
