1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
|
import asyncio
import sys
from datetime import UTC
from pathlib import Path
import click
from rich.console import Console
from rich.table import Table
# Absolute location of this module; its ancestors are used to locate the
# repository checkout. parents[0] is the directory containing this file, so
# parents[5] is six directory levels up.
# NOTE(review): this depends on the module's depth in the source tree and
# raises IndexError at import time if the file lives somewhere shallower.
_MODULE_PATH = Path(__file__).resolve()
_ROOT = _MODULE_PATH.parents[5]


@click.group()
def data():
    """Data collection and management commands."""
@data.command()
@click.option("--symbol", required=True, help="Trading symbol (e.g. AAPL)")
@click.option("--timeframe", default="1m", show_default=True, help="Candle timeframe")
def collect(symbol, timeframe):
    """Start collecting live stock market data for a symbol."""
    # This command does not collect anything itself: it only prints how to
    # start the separate data-collector service (via docker compose or by
    # running the module from its directory under the repository root).
    # NOTE(review): symbol/timeframe are echoed back but not passed on to
    # the service by anything visible here.
    service_dir = _ROOT / "services" / "data-collector"
    # Empty strings print as blank separator lines.
    message_lines = (
        f"To collect live data for {symbol} at {timeframe}, run the data-collector service:",
        "",
        " docker compose up -d data-collector",
        "",
        "Or run directly:",
        "",
        f" cd {service_dir}",
        " python -m data_collector.main",
    )
    for text in message_lines:
        click.echo(text)
def _parse_since(since):
    """Convert the ``--from`` option value into a timezone-aware UTC datetime.

    Args:
        since: ISO-8601 date or datetime string, or ``None``/empty string
            meaning "no lower bound".

    Returns:
        ``None`` when *since* is falsy; otherwise an aware ``datetime`` in UTC.
        Naive values are interpreted as UTC. Values that carry their own
        offset are *converted* to UTC. Merely relabelling their tzinfo
        would shift the requested instant.

    Raises:
        SystemExit: with status 1, after printing an error to stderr, when
            *since* is not valid ISO format.
    """
    from datetime import datetime

    if not since:
        return None
    try:
        parsed = datetime.fromisoformat(since)
    except ValueError:
        click.echo(
            f"Error: Invalid date format '{since}'. Use ISO format (e.g. 2024-01-01).",
            err=True,
        )
        sys.exit(1)
    if parsed.tzinfo is None:
        return parsed.replace(tzinfo=UTC)
    return parsed.astimezone(UTC)


@data.command()
@click.option("--symbol", required=True, help="Trading symbol (e.g. AAPL)")
@click.option("--timeframe", default="1Day", show_default=True, help="Bar timeframe")
@click.option("--from", "since", default=None, help="Start date (ISO format)")
@click.option("--limit", default=1000, show_default=True, help="Number of bars to fetch")
def history(symbol, timeframe, since, limit):
    """Download historical stock market data for a symbol."""
    # Project modules are imported inside the command. If any is missing,
    # the user gets a one-line error and exit status 1.
    try:
        from shared.alpaca_client import AlpacaClient
        from shared.config import Settings
        from shared.db import Database
    except ImportError as e:
        click.echo(f"Error: Could not import required modules: {e}", err=True)
        sys.exit(1)

    # Validate user input before any connection is opened. Otherwise a bad
    # --from value exits while the database connection is still open.
    start = _parse_since(since)

    async def _fetch():
        """Fetch bars from Alpaca and insert them one by one into the database."""
        settings = Settings()
        db = Database(settings.database_url.get_secret_value())
        await db.connect()
        try:
            # The client is built inside the outer try so the database is
            # still closed if construction raises.
            client = AlpacaClient(
                api_key=settings.alpaca_api_key.get_secret_value(),
                api_secret=settings.alpaca_api_secret.get_secret_value(),
                base_url=getattr(settings, "alpaca_base_url", "https://paper-api.alpaca.markets"),
            )
            try:
                candles = await client.get_historical_bars(
                    symbol=symbol,
                    timeframe=timeframe,
                    start=start,
                    limit=limit,
                )
                count = 0
                for candle in candles:
                    await db.insert_candle(candle)
                    count += 1
                click.echo(f"Saved {count} bars for {symbol} ({timeframe}) to database.")
            except Exception as e:
                # NOTE: this also catches failures from db.insert_candle, not
                # only from the fetch itself.
                click.echo(f"Error fetching bars: {e}", err=True)
                sys.exit(1)
            finally:
                await client.close()
        finally:
            await db.close()

    asyncio.run(_fetch())
@data.command("list")
def list_():
    """List available data streams and symbols."""
    # Project modules and SQLAlchemy are imported inside the command. If any
    # is missing, the user gets a one-line error and exit status 1.
    try:
        from sqlalchemy import func, select
        from shared.config import Settings
        from shared.db import Database
        from shared.sa_models import CandleRow
    except ImportError as e:
        click.echo(f"Error: Could not import required modules: {e}", err=True)
        sys.exit(1)

    # (header, keyword options for Table.add_column), in display order.
    column_specs = (
        ("Symbol", {"style": "bold"}),
        ("Timeframe", {}),
        ("Candles", {"justify": "right"}),
        ("From", {}),
        ("To", {}),
    )

    def _fmt_time(moment):
        # Falsy/missing timestamps are shown as a dash.
        if not moment:
            return "-"
        return moment.strftime("%Y-%m-%d %H:%M")

    async def _list():
        """Summarise stored candles per (symbol, timeframe) and print a table."""
        settings = Settings()
        db = Database(settings.database_url.get_secret_value())
        await db.connect()
        try:
            # One row per (symbol, timeframe): candle count plus the earliest
            # and latest open_time.
            summary_query = (
                select(
                    CandleRow.symbol,
                    CandleRow.timeframe,
                    func.count().label("count"),
                    func.min(CandleRow.open_time).label("earliest"),
                    func.max(CandleRow.open_time).label("latest"),
                )
                .group_by(CandleRow.symbol, CandleRow.timeframe)
                .order_by(CandleRow.symbol, CandleRow.timeframe)
            )
            async with db.get_session() as session:
                summary = (await session.execute(summary_query)).all()

            if not summary:
                click.echo("No data collected yet.")
                return

            console = Console()
            table = Table(title="Collected Data", show_header=True, header_style="bold cyan")
            for header, options in column_specs:
                table.add_column(header, **options)
            for entry in summary:
                table.add_row(
                    entry.symbol,
                    entry.timeframe,
                    str(entry.count),
                    _fmt_time(entry.earliest),
                    _fmt_time(entry.latest),
                )
            console.print(table)
        finally:
            # Runs on every path, including the early "no data" return.
            await db.close()

    asyncio.run(_list())
|