summaryrefslogtreecommitdiff
path: root/cli/src/trading_cli/commands/data.py
blob: 2810a07df22e7c18ae986fe273e537e3f4c28a15 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
import asyncio
import sys
from pathlib import Path

import click
from rich.console import Console
from rich.table import Table

# Base directory from which sibling service folders are located
# (the commands below join "services/data-collector" onto it).
# NOTE(review): parents[5] is five levels above this file's own directory;
# presumably that is meant to be the repository root — confirm against the
# actual checkout/install layout, as the index looks one level too high for
# a cli/src/trading_cli/commands/ path.
_ROOT = Path(__file__).resolve().parents[5]


@click.group()
def data():
    """Data collection and management commands."""
    # Pure container group: it does no work itself. Subcommands register
    # against it through the ``@data.command()`` decorator.


@data.command()
@click.option("--symbol", required=True, help="Trading symbol (e.g. BTCUSDT)")
@click.option("--timeframe", default="1m", show_default=True, help="Candle timeframe")
def collect(symbol, timeframe):
    """Start collecting live market data for a symbol."""
    # This command does not launch anything itself: it only prints the two
    # ways of starting the data-collector service. A ``None`` entry is
    # echoed as a blank separator line.
    collector_dir = _ROOT / "services" / "data-collector"
    instructions = (
        f"To collect live data for {symbol} at {timeframe}, run the data-collector service:",
        None,
        "  docker compose up -d data-collector",
        None,
        "Or run directly:",
        None,
        f"  cd {collector_dir}",
        "  python -m data_collector.main",
    )
    for line in instructions:
        click.echo(line)


def _parse_since_ms(since):
    """Convert an ISO-format date/datetime string to epoch milliseconds.

    A value without a UTC offset is interpreted as UTC. A value that carries
    its own offset keeps it, so the instant it names is preserved rather than
    being relabelled as UTC.

    Args:
        since: String accepted by ``datetime.fromisoformat``, or ``None``/empty
            when no start date was supplied.

    Returns:
        Milliseconds since the Unix epoch, or ``None`` when ``since`` is falsy.

    Raises:
        ValueError: If ``since`` is not a valid ISO-format string.
    """
    from datetime import datetime, timezone

    if not since:
        return None
    dt = datetime.fromisoformat(since)
    if dt.utcoffset() is None:
        # Naive input: treat the wall-clock value as UTC.
        dt = dt.replace(tzinfo=timezone.utc)
    return int(dt.timestamp() * 1000)


def _to_ccxt_symbol(symbol):
    """Return ``symbol`` in ``BASE/QUOTE`` form (``BTCUSDT`` -> ``BTC/USDT``).

    Only a trailing ``USDT`` quote is split off. Symbols that already contain
    ``/``, do not end in ``USDT``, or consist of ``USDT`` alone are returned
    unchanged.
    """
    quote = "USDT"
    if "/" in symbol or not symbol.endswith(quote) or len(symbol) == len(quote):
        return symbol
    return f"{symbol[: -len(quote)]}/{quote}"


@data.command()
@click.option("--symbol", required=True, help="Trading symbol (e.g. BTCUSDT)")
@click.option("--timeframe", default="1m", show_default=True, help="Candle timeframe")
@click.option("--from", "since", default=None, help="Start date (ISO format)")
@click.option("--limit", default=1000, show_default=True, help="Number of candles to fetch")
def history(symbol, timeframe, since, limit):
    """Download historical market data for a symbol."""
    # Make the data-collector sources importable for the imports below,
    # without stacking duplicate entries on repeated invocations.
    collector_src = str(_ROOT / "services" / "data-collector" / "src")
    if collector_src not in sys.path:
        sys.path.insert(0, collector_src)

    try:
        import ccxt.async_support as ccxt
        from data_collector.binance_rest import fetch_historical_candles
        from shared.db import Database
        from shared.config import Settings
    except ImportError as e:
        click.echo(f"Error: Could not import required modules: {e}", err=True)
        sys.exit(1)

    # Validate user input before any connection is opened, so a bad date
    # cannot leave a database connection behind.
    try:
        since_ms = _parse_since_ms(since)
    except ValueError:
        click.echo(
            f"Error: Invalid date format '{since}'. Use ISO format (e.g. 2024-01-01).",
            err=True,
        )
        sys.exit(1)

    ccxt_symbol = _to_ccxt_symbol(symbol)

    async def _fetch():
        settings = Settings()
        db = Database(settings.database_url)
        await db.connect()
        try:
            exchange = ccxt.binance(
                {
                    "apiKey": settings.binance_api_key,
                    "secret": settings.binance_api_secret,
                }
            )
            try:
                # Without --from, no ``since`` is passed and the start point
                # is left to fetch_historical_candles.
                kwargs = {"limit": limit}
                if since_ms is not None:
                    kwargs["since"] = since_ms

                candles = await fetch_historical_candles(
                    exchange, ccxt_symbol, timeframe, **kwargs
                )

                count = 0
                for candle in candles:
                    await db.insert_candle(candle)
                    count += 1

                click.echo(f"Saved {count} candles for {symbol} ({timeframe}) to database.")
            except Exception as e:
                # CLI boundary: report the failure and exit non-zero.
                click.echo(f"Error fetching candles: {e}", err=True)
                sys.exit(1)
            finally:
                await exchange.close()
        finally:
            # Runs even if closing the exchange fails.
            await db.close()

    asyncio.run(_fetch())


@data.command("list")
def list_():
    """List available data streams and symbols."""
    try:
        from shared.db import Database
        from shared.config import Settings
        from shared.sa_models import CandleRow
        from sqlalchemy import select, func
    except ImportError as e:
        click.echo(f"Error: Could not import required modules: {e}", err=True)
        sys.exit(1)

    async def _list():
        settings = Settings()
        db = Database(settings.database_url)
        await db.connect()
        try:
            # One summary row per (symbol, timeframe): candle count plus the
            # earliest and latest open_time.
            stmt = (
                select(
                    CandleRow.symbol,
                    CandleRow.timeframe,
                    func.count().label("candle_count"),
                    func.min(CandleRow.open_time).label("earliest"),
                    func.max(CandleRow.open_time).label("latest"),
                )
                .group_by(CandleRow.symbol, CandleRow.timeframe)
                .order_by(CandleRow.symbol, CandleRow.timeframe)
            )
            async with db.get_session() as session:
                result = await session.execute(stmt)
                rows = result.all()

            if not rows:
                click.echo("No data collected yet.")
                return

            console = Console()
            table = Table(title="Collected Data", show_header=True, header_style="bold cyan")
            table.add_column("Symbol", style="bold")
            table.add_column("Timeframe")
            table.add_column("Candles", justify="right")
            table.add_column("From")
            table.add_column("To")

            # Unpack positionally (same order as the select above). Result
            # rows are tuple-like, so attribute access to a column called
            # ``count`` would pick up the sequence ``count`` method instead
            # of the value.
            for symbol, timeframe, candle_count, earliest, latest in rows:
                table.add_row(
                    symbol,
                    timeframe,
                    str(candle_count),
                    earliest.strftime("%Y-%m-%d %H:%M") if earliest else "-",
                    latest.strftime("%Y-%m-%d %H:%M") if latest else "-",
                )

            console.print(table)
        finally:
            await db.close()

    asyncio.run(_list())