1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
|
"""Data Collector Service — fetches US stock data from Alpaca."""
import asyncio
import aiohttp
from shared.alpaca import AlpacaClient
from shared.broker import RedisBroker
from shared.db import Database
from shared.events import CandleEvent
from shared.healthcheck import HealthCheckServer
from shared.logging import setup_logging
from shared.metrics import ServiceMetrics
from shared.models import Candle
from shared.notifier import TelegramNotifier
from shared.shutdown import GracefulShutdown
from data_collector.config import CollectorConfig
# Offset added to ``config.health_port`` to derive this service's health-check
# port; 0 means the data collector listens on the base port itself.
HEALTH_PORT_OFFSET = 0
async def fetch_latest_bars(
    alpaca: AlpacaClient,
    symbols: list[str],
    timeframe: str,
    log,
) -> list[Candle]:
    """Fetch the most recent bar for each symbol and convert it to a ``Candle``.

    Symbols are queried one at a time with ``alpaca.get_bars(..., limit=1)``.
    A symbol whose request fails, or whose payload cannot be parsed, is logged
    at warning level and skipped so that one bad symbol does not abort the
    whole batch.

    Args:
        alpaca: Client exposing an awaitable
            ``get_bars(symbol, timeframe=..., limit=...)``.
        symbols: Ticker symbols to query; results keep this order.
        timeframe: Bar timeframe, forwarded to the client and stored on
            each candle.
        log: Logger called as ``log.warning(event, symbol=..., error=...)``.

    Returns:
        One ``Candle`` per symbol that yielded a parseable bar. Symbols with
        no bars, network failures or malformed payloads are omitted.
    """
    # Local imports: the module does not import these at file level. They are
    # resolved once per call instead of once per loop iteration.
    from datetime import datetime
    from decimal import Decimal, InvalidOperation

    candles = []
    for symbol in symbols:
        try:
            bars = await alpaca.get_bars(symbol, timeframe=timeframe, limit=1)
            if not bars:
                continue
            bar = bars[-1]
            candle = Candle(
                symbol=symbol,
                timeframe=timeframe,
                # Rewrite a trailing "Z" as an explicit UTC offset;
                # fromisoformat() only accepts "Z" natively from Python 3.11.
                open_time=datetime.fromisoformat(bar["t"].replace("Z", "+00:00")),
                # Go through str() so float prices do not carry binary
                # floating-point noise into the Decimal value.
                open=Decimal(str(bar["o"])),
                high=Decimal(str(bar["h"])),
                low=Decimal(str(bar["l"])),
                close=Decimal(str(bar["c"])),
                volume=Decimal(str(bar["v"])),
            )
            candles.append(candle)
        except (aiohttp.ClientError, ConnectionError, TimeoutError, asyncio.TimeoutError) as exc:
            log.warning("fetch_bar_network_error", symbol=symbol, error=str(exc))
        except (ValueError, KeyError, TypeError, InvalidOperation) as exc:
            # InvalidOperation is what Decimal() raises for a malformed or
            # null numeric field; it is an ArithmeticError, not a ValueError,
            # so it must be listed explicitly to be treated as a parse error.
            log.warning("fetch_bar_parse_error", symbol=symbol, error=str(exc))
    return candles
async def run() -> None:
    """Run the collector polling loop until shutdown is requested.

    Builds the service dependencies from ``CollectorConfig`` and then, on
    every cycle: checks whether the market is open, fetches the latest bar for
    each configured symbol, stores each candle in the database, publishes it
    to the ``candles.<symbol>`` stream, and sleeps for the poll interval.

    The loop ends when ``GracefulShutdown.is_shutting_down`` becomes true.

    Raises:
        Exception: Any unexpected error raised inside the loop is logged,
            reported through the notifier and re-raised once the clients
            have been closed.
    """
    config = CollectorConfig()
    log = setup_logging("data-collector", config.log_level, config.log_format)
    metrics = ServiceMetrics("data_collector")

    notifier = TelegramNotifier(
        bot_token=config.telegram_bot_token.get_secret_value(),
        chat_id=config.telegram_chat_id,
    )

    db = Database(config.database_url.get_secret_value())
    await db.connect()

    broker = RedisBroker(config.redis_url.get_secret_value())

    alpaca = AlpacaClient(
        api_key=config.alpaca_api_key.get_secret_value(),
        api_secret=config.alpaca_api_secret.get_secret_value(),
        paper=config.alpaca_paper,
    )

    health = HealthCheckServer(
        "data-collector",
        port=config.health_port + HEALTH_PORT_OFFSET,
        auth_token=config.metrics_auth_token,
    )
    await health.start()
    metrics.service_up.labels(service="data-collector").set(1)

    # Fall back to 60 seconds when the config has no poll_interval_seconds.
    poll_interval = int(getattr(config, "poll_interval_seconds", 60))
    symbols = config.symbols
    # Only the first configured timeframe is collected; default to daily bars.
    timeframe = config.timeframes[0] if config.timeframes else "1Day"

    shutdown = GracefulShutdown()
    shutdown.install_handlers()

    log.info("starting", symbols=symbols, timeframe=timeframe, poll_interval=poll_interval)

    try:
        while not shutdown.is_shutting_down:
            # Check if market is open
            try:
                is_open = await alpaca.is_market_open()
            except (aiohttp.ClientError, ConnectionError, TimeoutError, asyncio.TimeoutError) as exc:
                # Treat an unreachable market-status endpoint as "closed" for
                # this cycle, but record the reason so an API outage is not
                # mistaken for an ordinary closed-market poll.
                log.warning("market_status_check_failed", error=str(exc))
                is_open = False

            if is_open:
                candles = await fetch_latest_bars(alpaca, symbols, timeframe, log)
                for candle in candles:
                    await db.insert_candle(candle)
                    event = CandleEvent(data=candle)
                    stream = f"candles.{candle.symbol}"
                    await broker.publish(stream, event.to_dict())
                    metrics.events_processed.labels(
                        service="data-collector", event_type="candle"
                    ).inc()
                    log.info("candle_stored", symbol=candle.symbol, close=str(candle.close))
            else:
                log.debug("market_closed")

            await asyncio.sleep(poll_interval)
    except Exception as exc:
        # Top-level boundary: log, notify, then re-raise so the process exits
        # with the original error.
        log.error("fatal_error", error=str(exc), exc_info=True)
        await notifier.send_error(str(exc), "data-collector")
        raise
    finally:
        metrics.service_up.labels(service="data-collector").set(0)
        await notifier.close()
        await alpaca.close()
        await broker.close()
        await db.close()
def main() -> None:
    """Synchronous entry point: drive the ``run()`` coroutine to completion."""
    collector = run()
    asyncio.run(collector)


# Start the collector only when this module is executed as a script.
if __name__ == "__main__":
    main()
|