summaryrefslogtreecommitdiff
path: root/services/data-collector/src/data_collector/main.py
blob: 2d44848e9a7d5247aadb3c30fa88e60834d3e5c7 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
"""Data Collector Service — fetches US stock data from Alpaca."""

import asyncio

import aiohttp

from data_collector.config import CollectorConfig
from shared.alpaca import AlpacaClient
from shared.broker import RedisBroker
from shared.db import Database
from shared.events import CandleEvent
from shared.healthcheck import HealthCheckServer
from shared.logging import setup_logging
from shared.metrics import ServiceMetrics
from shared.models import Candle
from shared.notifier import TelegramNotifier
from shared.shutdown import GracefulShutdown

# Offset added to config.health_port to obtain this service's health-check
# port (see run()); 0 means the configured base port is used unchanged.
HEALTH_PORT_OFFSET = 0


async def fetch_latest_bars(
    alpaca: AlpacaClient,
    symbols: list[str],
    timeframe: str,
    log,
) -> list[Candle]:
    """Fetch the latest bar for each symbol from Alpaca.

    Symbols are queried one at a time. A symbol whose request fails, returns
    no bars, or returns a bar that cannot be parsed is logged and skipped, so
    one bad symbol never prevents the others from being collected.

    Args:
        alpaca: Client whose ``get_bars(symbol, timeframe=..., limit=...)``
            coroutine returns a sequence of bar mappings.
        symbols: Ticker symbols to query.
        timeframe: Bar timeframe, forwarded to ``get_bars`` and recorded on
            each candle.
        log: Logger accepting ``warning(event, **fields)``.

    Returns:
        One ``Candle`` per symbol that yielded a parseable bar, in the order
        the symbols were given.
    """
    # Imported once per call rather than once per symbol inside the loop.
    from datetime import datetime
    from decimal import Decimal, InvalidOperation

    candles = []
    for symbol in symbols:
        try:
            bars = await alpaca.get_bars(symbol, timeframe=timeframe, limit=1)
            if not bars:
                continue

            bar = bars[-1]
            candle = Candle(
                symbol=symbol,
                timeframe=timeframe,
                # datetime.fromisoformat() only accepts a trailing "Z" from
                # Python 3.11 on, so spell the UTC offset out explicitly.
                open_time=datetime.fromisoformat(bar["t"].replace("Z", "+00:00")),
                # Going through str() avoids binary-float artefacts should
                # the client hand back floats.
                open=Decimal(str(bar["o"])),
                high=Decimal(str(bar["h"])),
                low=Decimal(str(bar["l"])),
                close=Decimal(str(bar["c"])),
                volume=Decimal(str(bar["v"])),
            )
            candles.append(candle)
        except (aiohttp.ClientError, ConnectionError, TimeoutError) as exc:
            log.warning("fetch_bar_network_error", symbol=symbol, error=str(exc))
        except (ValueError, KeyError, TypeError, InvalidOperation) as exc:
            # InvalidOperation is what Decimal raises for a malformed or null
            # numeric field; it derives from ArithmeticError, not ValueError,
            # so it has to be listed explicitly to be treated as a parse error.
            log.warning("fetch_bar_parse_error", symbol=symbol, error=str(exc))
    return candles


async def run() -> None:
    """Run the data-collector service until shutdown is requested.

    Builds the service dependencies from ``CollectorConfig``, connects the
    database, starts the health-check server, and then polls Alpaca every
    ``poll_interval`` seconds. While the market is open, the latest candle of
    every configured symbol is inserted into the database and published to
    the ``candles.<symbol>`` stream.

    Raises:
        Exception: Any error raised during startup or polling is logged,
            reported through the notifier, and re-raised once the cleanup in
            the ``finally`` block has run.
    """
    config = CollectorConfig()
    log = setup_logging("data-collector", config.log_level, config.log_format)
    metrics = ServiceMetrics("data_collector")

    notifier = TelegramNotifier(
        bot_token=config.telegram_bot_token.get_secret_value(),
        chat_id=config.telegram_chat_id,
    )

    db = Database(config.database_url.get_secret_value())

    broker = RedisBroker(config.redis_url.get_secret_value())

    alpaca = AlpacaClient(
        api_key=config.alpaca_api_key.get_secret_value(),
        api_secret=config.alpaca_api_secret.get_secret_value(),
        paper=config.alpaca_paper,
    )

    health = HealthCheckServer(
        "data-collector",
        port=config.health_port + HEALTH_PORT_OFFSET,
        auth_token=config.metrics_auth_token,
    )

    # Falls back to 60 seconds when the config does not define the attribute.
    poll_interval = int(getattr(config, "poll_interval_seconds", 60))
    symbols = config.symbols
    # Only the first configured timeframe is collected.
    timeframe = config.timeframes[0] if config.timeframes else "1Day"

    shutdown = GracefulShutdown()

    try:
        # Startup happens inside the try block so that a failure here (for
        # example the health port being unavailable) is still reported and
        # followed by the cleanup below.
        await db.connect()
        await health.start()
        metrics.service_up.labels(service="data-collector").set(1)

        shutdown.install_handlers()

        log.info("starting", symbols=symbols, timeframe=timeframe, poll_interval=poll_interval)

        # NOTE(review): the shutdown flag is only checked between polls, so
        # exit can lag by up to poll_interval seconds unless the installed
        # handlers also interrupt the sleep - confirm against GracefulShutdown.
        while not shutdown.is_shutting_down:
            # Check if market is open
            try:
                is_open = await alpaca.is_market_open()
            except (aiohttp.ClientError, ConnectionError, TimeoutError) as exc:
                # Treat an unreachable API as "closed" for this cycle, but
                # leave a trace so a persistent outage is visible in the logs.
                log.warning("market_status_check_failed", error=str(exc))
                is_open = False

            if is_open:
                candles = await fetch_latest_bars(alpaca, symbols, timeframe, log)
                for candle in candles:
                    await db.insert_candle(candle)
                    event = CandleEvent(data=candle)
                    stream = f"candles.{candle.symbol}"
                    await broker.publish(stream, event.to_dict())
                    metrics.events_processed.labels(
                        service="data-collector", event_type="candle"
                    ).inc()
                    log.info("candle_stored", symbol=candle.symbol, close=str(candle.close))
            else:
                log.debug("market_closed")

            await asyncio.sleep(poll_interval)
    except Exception as exc:
        log.error("fatal_error", error=str(exc), exc_info=True)
        try:
            await notifier.send_error(str(exc), "data-collector")
        except Exception as notify_exc:
            # A broken notifier must not replace the error that brought the
            # service down.
            log.warning("notify_failed", error=str(notify_exc))
        raise
    finally:
        metrics.service_up.labels(service="data-collector").set(0)
        # Close order: notifier, alpaca, broker, db. Each close is isolated so
        # that one failing resource does not leave the remaining ones open.
        # NOTE(review): the health-check server is never stopped here -
        # confirm whether HealthCheckServer needs an explicit shutdown call.
        for name, resource in (
            ("notifier", notifier),
            ("alpaca", alpaca),
            ("broker", broker),
            ("db", db),
        ):
            try:
                await resource.close()
            except Exception as close_exc:
                log.warning("close_failed", resource=name, error=str(close_exc))


def main() -> None:
    """Synchronous entry point: drive ``run()`` to completion on a new event loop."""
    service = run()
    asyncio.run(service)


# Start the service only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == "__main__":
    main()