1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
|
import asyncio
import sys
from pathlib import Path
import click
from rich.console import Console
from rich.table import Table
# Sixth ancestor directory of this file (parents[0] is the containing folder).
# Used below to locate ``services/data-collector``.
# NOTE(review): presumably the repository root -- confirm the depth of 5
# still matches where this module lives in the tree.
_ROOT = Path(__file__).resolve().parents[5]
# Root command group; the subcommands in this module attach themselves to it
# through ``@data.command``. The docstring is what click prints as the
# group's help text, so treat it as user-facing output.
@click.group()
def data():
    """Data collection and management commands."""
# Informational only: this command prints how to launch the data-collector
# service and does not start anything itself. The docstring is the help text
# click shows for the command.
@data.command()
@click.option("--symbol", required=True, help="Trading symbol (e.g. BTCUSDT)")
@click.option("--timeframe", default="1m", show_default=True, help="Candle timeframe")
def collect(symbol, timeframe):
    """Start collecting live market data for a symbol."""
    service_dir = _ROOT / "services" / "data-collector"
    # One entry per output line; empty strings become blank separator lines.
    instructions = (
        f"To collect live data for {symbol} at {timeframe}, run the data-collector service:",
        "",
        " docker compose up -d data-collector",
        "",
        "Or run directly:",
        "",
        f" cd {service_dir}",
        " python -m data_collector.main",
    )
    for line in instructions:
        click.echo(line)
def _parse_since_ms(since):
    """Convert an ISO-format date/datetime string to epoch milliseconds.

    Returns ``None`` when *since* is empty or ``None``. Values without a UTC
    offset are interpreted as UTC; values that carry their own offset keep it
    (the previous code overwrote the offset, shifting the instant).

    Raises:
        ValueError: If *since* is not a valid ISO-format string.
    """
    from datetime import datetime, timezone

    if not since:
        return None
    # ``datetime.fromisoformat`` only accepts a trailing "Z" on Python 3.11+,
    # so spell the UTC designator as an explicit offset.
    text = f"{since[:-1]}+00:00" if since.endswith("Z") else since
    moment = datetime.fromisoformat(text)
    if moment.tzinfo is None:
        moment = moment.replace(tzinfo=timezone.utc)
    return int(moment.timestamp() * 1000)


def _to_ccxt_symbol(symbol):
    """Return *symbol* as ``BASE/USDT`` when it is a USDT-quoted pair.

    Symbols that already contain "/", that do not end in "USDT", or that are
    exactly "USDT" are returned unchanged. Only the trailing quote currency
    is split off, so a "USDT" appearing elsewhere in the symbol is preserved.
    """
    quote = "USDT"
    if "/" in symbol or symbol == quote or not symbol.endswith(quote):
        return symbol
    return f"{symbol[:-len(quote)]}/{quote}"


# The docstring below is the help text click shows for the command.
@data.command()
@click.option("--symbol", required=True, help="Trading symbol (e.g. BTCUSDT)")
@click.option("--timeframe", default="1m", show_default=True, help="Candle timeframe")
@click.option("--from", "since", default=None, help="Start date (ISO format)")
@click.option("--limit", default=1000, show_default=True, help="Number of candles to fetch")
def history(symbol, timeframe, since, limit):
    """Download historical market data for a symbol."""
    # Make the data-collector sources importable (only once per process).
    collector_src = str(_ROOT / "services" / "data-collector" / "src")
    if collector_src not in sys.path:
        sys.path.insert(0, collector_src)

    try:
        import ccxt.async_support as ccxt
        from data_collector.binance_rest import fetch_historical_candles
        from shared.db import Database
        from shared.config import Settings
    except ImportError as e:
        click.echo(f"Error: Could not import required modules: {e}", err=True)
        sys.exit(1)

    # Validate user input before any connection is opened, so a bad date
    # cannot leave a database connection behind.
    try:
        since_ms = _parse_since_ms(since)
    except ValueError:
        click.echo(f"Error: Invalid date format '{since}'. Use ISO format (e.g. 2024-01-01).", err=True)
        sys.exit(1)

    ccxt_symbol = _to_ccxt_symbol(symbol)

    async def _fetch():
        settings = Settings()
        db = Database(settings.database_url)
        await db.connect()
        exchange = None
        try:
            exchange = ccxt.binance({
                "apiKey": settings.binance_api_key,
                "secret": settings.binance_api_secret,
            })
            # Without --from, no "since" is passed and the start point is
            # left to fetch_historical_candles.
            kwargs = {"limit": limit}
            if since_ms is not None:
                kwargs["since"] = since_ms
            candles = await fetch_historical_candles(
                exchange, ccxt_symbol, timeframe, **kwargs
            )
            count = 0
            for candle in candles:
                await db.insert_candle(candle)
                count += 1
            click.echo(f"Saved {count} candles for {symbol} ({timeframe}) to database.")
        except Exception as e:
            # Top-level CLI boundary: report the failure and exit non-zero.
            click.echo(f"Error fetching candles: {e}", err=True)
            sys.exit(1)
        finally:
            # Nested so the database is closed even if closing the exchange fails.
            try:
                if exchange is not None:
                    await exchange.close()
            finally:
                await db.close()

    asyncio.run(_fetch())
# The docstring below is the help text click shows for the command.
@data.command("list")
def list_():
    """List available data streams and symbols."""
    try:
        from shared.db import Database
        from shared.config import Settings
        from shared.sa_models import CandleRow
        from sqlalchemy import select, func
    except ImportError as e:
        click.echo(f"Error: Could not import required modules: {e}", err=True)
        sys.exit(1)

    def _fmt(moment):
        # Render a timestamp for the table; "-" when the aggregate is empty.
        return moment.strftime("%Y-%m-%d %H:%M") if moment else "-"

    async def _list():
        settings = Settings()
        db = Database(settings.database_url)
        await db.connect()
        try:
            # One summary row per (symbol, timeframe): candle count plus the
            # earliest and latest open_time.
            stmt = (
                select(
                    CandleRow.symbol,
                    CandleRow.timeframe,
                    func.count().label("count"),
                    func.min(CandleRow.open_time).label("earliest"),
                    func.max(CandleRow.open_time).label("latest"),
                )
                .group_by(CandleRow.symbol, CandleRow.timeframe)
                .order_by(CandleRow.symbol, CandleRow.timeframe)
            )
            async with db.get_session() as session:
                result = await session.execute(stmt)
                rows = result.all()

            if not rows:
                click.echo("No data collected yet.")
                return

            console = Console()
            table = Table(title="Collected Data", show_header=True, header_style="bold cyan")
            table.add_column("Symbol", style="bold")
            table.add_column("Timeframe")
            table.add_column("Candles", justify="right")
            table.add_column("From")
            table.add_column("To")
            # Unpack positionally (same order as the select): attribute access
            # ``row.count`` would resolve to the sequence method ``Row.count``
            # rather than the labelled column.
            for symbol, timeframe, candle_count, earliest, latest in rows:
                table.add_row(
                    symbol,
                    timeframe,
                    str(candle_count),
                    _fmt(earliest),
                    _fmt(latest),
                )
            console.print(table)
        finally:
            await db.close()

    asyncio.run(_list())
|