summaryrefslogtreecommitdiff
path: root/cli/src/trading_cli/commands/data.py
diff options
context:
space:
mode:
Diffstat (limited to 'cli/src/trading_cli/commands/data.py')
-rw-r--r--cli/src/trading_cli/commands/data.py144
1 files changed, 137 insertions, 7 deletions
diff --git a/cli/src/trading_cli/commands/data.py b/cli/src/trading_cli/commands/data.py
index 25d1693..5c6f274 100644
--- a/cli/src/trading_cli/commands/data.py
+++ b/cli/src/trading_cli/commands/data.py
@@ -1,4 +1,12 @@
+import asyncio
+import sys
+from pathlib import Path
+
import click
+from rich.console import Console
+from rich.table import Table
+
+_ROOT = Path(__file__).resolve().parents[5]
@click.group()
@@ -12,7 +20,16 @@ def data():
@click.option("--timeframe", default="1m", show_default=True, help="Candle timeframe")
def collect(symbol, timeframe):
"""Start collecting live market data for a symbol."""
- click.echo(f"Starting data collection for {symbol} at {timeframe} timeframe...")
+ click.echo(
+ f"To collect live data for {symbol} at {timeframe}, run the data-collector service:"
+ )
+ click.echo()
+ click.echo(" docker compose up -d data-collector")
+ click.echo()
+ click.echo("Or run directly:")
+ click.echo()
+ click.echo(f" cd {_ROOT / 'services' / 'data-collector'}")
+ click.echo(" python -m data_collector.main")
@data.command()
@@ -22,14 +39,127 @@ def collect(symbol, timeframe):
@click.option("--limit", default=1000, show_default=True, help="Number of candles to fetch")
def history(symbol, timeframe, since, limit):
"""Download historical market data for a symbol."""
- click.echo(
- f"Downloading {limit} {timeframe} candles for {symbol}"
- + (f" since {since}" if since else "")
- + "..."
- )
+ sys.path.insert(0, str(_ROOT / "services" / "data-collector" / "src"))
+
+ try:
+ from data_collector.binance_rest import fetch_historical_candles
+ from shared.db import Database
+ from shared.config import Settings
+ except ImportError as e:
+ click.echo(f"Error: Could not import required modules: {e}", err=True)
+ sys.exit(1)
+
+ async def _fetch():
+ import ccxt.async_support as ccxt
+ from datetime import datetime, timezone
+
+ settings = Settings()
+ db = Database(settings.database_url)
+ await db.connect()
+
+ # Parse the since date to a timestamp in ms
+ if since:
+ try:
+ dt = datetime.fromisoformat(since).replace(tzinfo=timezone.utc)
+ since_ms = int(dt.timestamp() * 1000)
+ except ValueError:
+ click.echo(f"Error: Invalid date format '{since}'. Use ISO format (e.g. 2024-01-01).", err=True)
+ sys.exit(1)
+ else:
+ # Default: fetch from 1000 candles ago (approximate)
+ since_ms = None
+
+ # Normalize symbol for ccxt (BTCUSDT -> BTC/USDT)
+ ccxt_symbol = symbol
+ if "/" not in symbol and "USDT" in symbol:
+ base = symbol.replace("USDT", "")
+ ccxt_symbol = f"{base}/USDT"
+
+ exchange = ccxt.binance({
+ "apiKey": settings.binance_api_key,
+ "secret": settings.binance_api_secret,
+ })
+
+ try:
+ kwargs = {"limit": limit}
+ if since_ms is not None:
+ kwargs["since"] = since_ms
+
+ candles = await fetch_historical_candles(
+ exchange, ccxt_symbol, timeframe, **kwargs
+ )
+
+ count = 0
+ for candle in candles:
+ await db.insert_candle(candle)
+ count += 1
+
+ click.echo(f"Saved {count} candles for {symbol} ({timeframe}) to database.")
+ except Exception as e:
+ click.echo(f"Error fetching candles: {e}", err=True)
+ sys.exit(1)
+ finally:
+ await exchange.close()
+ await db.close()
+
+ asyncio.run(_fetch())
@data.command("list")
def list_():
"""List available data streams and symbols."""
- click.echo("Fetching available data streams and collected symbols...")
+ try:
+ from shared.db import Database
+ from shared.config import Settings
+ from shared.sa_models import CandleRow
+ from sqlalchemy import select, func
+ except ImportError as e:
+ click.echo(f"Error: Could not import required modules: {e}", err=True)
+ sys.exit(1)
+
+ async def _list():
+ settings = Settings()
+ db = Database(settings.database_url)
+ await db.connect()
+ try:
+ stmt = (
+ select(
+ CandleRow.symbol,
+ CandleRow.timeframe,
+ func.count().label("count"),
+ func.min(CandleRow.open_time).label("earliest"),
+ func.max(CandleRow.open_time).label("latest"),
+ )
+ .group_by(CandleRow.symbol, CandleRow.timeframe)
+ .order_by(CandleRow.symbol, CandleRow.timeframe)
+ )
+ async with db.get_session() as session:
+ result = await session.execute(stmt)
+ rows = result.all()
+
+ if not rows:
+ click.echo("No data collected yet.")
+ return
+
+ console = Console()
+ table = Table(title="Collected Data", show_header=True, header_style="bold cyan")
+ table.add_column("Symbol", style="bold")
+ table.add_column("Timeframe")
+ table.add_column("Candles", justify="right")
+ table.add_column("From")
+ table.add_column("To")
+
+ for row in rows:
+ table.add_row(
+ row.symbol,
+ row.timeframe,
+ str(row.count),
+ row.earliest.strftime("%Y-%m-%d %H:%M") if row.earliest else "-",
+ row.latest.strftime("%Y-%m-%d %H:%M") if row.latest else "-",
+ )
+
+ console.print(table)
+ finally:
+ await db.close()
+
+ asyncio.run(_list())