summaryrefslogtreecommitdiff
path: root/services/data-collector/src/data_collector/main.py
blob: 608d6cde7584f25b5f3bdf5c637a13809bc462ed (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
"""Data Collector Service — fetches US stock data from Alpaca."""

import asyncio

from shared.alpaca import AlpacaClient
from shared.broker import RedisBroker
from shared.db import Database
from shared.events import CandleEvent
from shared.healthcheck import HealthCheckServer
from shared.logging import setup_logging
from shared.metrics import ServiceMetrics
from shared.models import Candle
from shared.notifier import TelegramNotifier

from data_collector.config import CollectorConfig

# Offset added to config.health_port to obtain this service's health-check
# port; 0 means the configured base port is used as-is.
HEALTH_PORT_OFFSET = 0


async def fetch_latest_bars(
    alpaca: AlpacaClient,
    symbols: list[str],
    timeframe: str,
    log,
) -> list[Candle]:
    """Fetch the most recent bar for each symbol and convert it to a ``Candle``.

    Symbols are queried one at a time with ``alpaca.get_bars(..., limit=1)``.
    Any failure for a single symbol (request or bar parsing) is logged as a
    ``fetch_bar_failed`` warning and does not stop the remaining symbols.
    Symbols for which no bars are returned are skipped.

    Args:
        alpaca: Client used to request bars.
        symbols: Symbols to query, processed in the given order.
        timeframe: Timeframe passed to ``get_bars`` and stored on each candle.
        log: Logger called as ``log.warning(event, **fields)``.

    Returns:
        One ``Candle`` per symbol that produced a bar, in input order.
    """
    # Imported once per call instead of once per loop iteration.
    from datetime import datetime
    from decimal import Decimal

    candles: list[Candle] = []
    for symbol in symbols:
        try:
            bars = await alpaca.get_bars(symbol, timeframe=timeframe, limit=1)
            if not bars:
                continue

            bar = bars[-1]
            candle = Candle(
                symbol=symbol,
                timeframe=timeframe,
                # A trailing "Z" is rewritten as an explicit UTC offset because
                # fromisoformat() on older Python versions rejects the "Z" form.
                open_time=datetime.fromisoformat(bar["t"].replace("Z", "+00:00")),
                # Going through str() avoids Decimal's exact binary expansion
                # should the incoming value be a float.
                open=Decimal(str(bar["o"])),
                high=Decimal(str(bar["h"])),
                low=Decimal(str(bar["l"])),
                close=Decimal(str(bar["c"])),
                volume=Decimal(str(bar["v"])),
            )
            candles.append(candle)
        except Exception as exc:
            # Best effort: one bad symbol must not block the others.
            log.warning("fetch_bar_failed", symbol=symbol, error=str(exc))
    return candles


async def run() -> None:
    """Run the data-collector polling loop until it fails or is cancelled.

    Builds the config, logger, metrics, notifier, database connection, broker,
    Alpaca client and health-check server, then loops forever. On each pass,
    if the market is open, the latest bar for every configured symbol is
    inserted into the database and published to the ``candles.<symbol>``
    stream. The loop then sleeps for the poll interval.

    Raises:
        Exception: Any error escaping the polling loop is logged as
            ``fatal_error``, reported through the notifier and re-raised.
    """
    config = CollectorConfig()
    log = setup_logging("data-collector", config.log_level, config.log_format)
    metrics = ServiceMetrics("data_collector")

    notifier = TelegramNotifier(
        bot_token=config.telegram_bot_token.get_secret_value(),
        chat_id=config.telegram_chat_id,
    )

    db = Database(config.database_url.get_secret_value())
    await db.connect()

    broker = RedisBroker(config.redis_url.get_secret_value())

    alpaca = AlpacaClient(
        api_key=config.alpaca_api_key.get_secret_value(),
        api_secret=config.alpaca_api_secret.get_secret_value(),
        paper=config.alpaca_paper,
    )

    # NOTE(review): the health server is started here but never stopped in the
    # cleanup below; confirm whether HealthCheckServer needs explicit shutdown.
    health = HealthCheckServer(
        "data-collector",
        port=config.health_port + HEALTH_PORT_OFFSET,
        auth_token=config.metrics_auth_token,
    )
    await health.start()
    metrics.service_up.labels(service="data-collector").set(1)

    # poll_interval_seconds is optional on the config; fall back to 60.
    poll_interval = int(getattr(config, "poll_interval_seconds", 60))
    symbols = config.symbols
    # Only the first configured timeframe is collected.
    timeframe = config.timeframes[0] if config.timeframes else "1Day"

    log.info("starting", symbols=symbols, timeframe=timeframe, poll_interval=poll_interval)

    try:
        while True:
            # Check if market is open
            try:
                is_open = await alpaca.is_market_open()
            except Exception as status_exc:
                # Still treated as "closed" so the loop keeps running, but the
                # failure is logged so an outage is not mistaken for a closed
                # market.
                log.warning("market_status_check_failed", error=str(status_exc))
                is_open = False

            if is_open:
                candles = await fetch_latest_bars(alpaca, symbols, timeframe, log)
                for candle in candles:
                    await db.insert_candle(candle)
                    event = CandleEvent(data=candle)
                    stream = f"candles.{candle.symbol}"
                    await broker.publish(stream, event.to_dict())
                    metrics.events_processed.labels(
                        service="data-collector", event_type="candle"
                    ).inc()
                    log.info("candle_stored", symbol=candle.symbol, close=str(candle.close))
            else:
                log.debug("market_closed")

            await asyncio.sleep(poll_interval)
    except Exception as exc:
        log.error("fatal_error", error=str(exc))
        await notifier.send_error(str(exc), "data-collector")
        raise
    finally:
        # Runs on failure and on cancellation alike: mark the service down and
        # close the clients.
        metrics.service_up.labels(service="data-collector").set(0)
        await notifier.close()
        await alpaca.close()
        await broker.close()
        await db.close()


def main() -> None:
    """Synchronous entry point: run the collector coroutine to completion."""
    service = run()
    asyncio.run(service)


# Start the service when this module is executed directly as a script.
if __name__ == "__main__":
    main()