summaryrefslogtreecommitdiff
path: root/services/data-collector/src/data_collector/storage.py
blob: aeeaaedb90a7e56629481788e59d76638207689a (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
"""Candle storage: persists to DB and publishes to Redis."""

from shared.events import CandleEvent
from shared.models import Candle


class CandleStorage:
    """Write candles through a database handle and announce them on a broker.

    Both collaborators are injected. The database object must expose an
    awaitable ``insert_candle(candle)`` and the broker an awaitable
    ``publish(stream, payload)``.
    """

    def __init__(self, db, broker) -> None:
        # Keep references to the injected collaborators; nothing is
        # opened or connected here.
        self._db = db
        self._broker = broker

    async def store(self, candle: Candle) -> None:
        """Persist a single candle, then publish it as a ``CandleEvent``.

        The insert is awaited first, so an exception raised by it prevents
        the publish from running. The event is sent as a dict on the
        stream named ``candles.<symbol>``.
        """
        await self._db.insert_candle(candle)

        # Wrap the candle in its event envelope and route it by symbol.
        candle_event = CandleEvent(data=candle)
        stream_name = f"candles.{candle.symbol}"
        await self._broker.publish(stream_name, candle_event.to_dict())

    async def store_batch(self, candles: list[Candle]) -> None:
        """Store each candle in order by awaiting ``store`` for every item.

        Candles are processed sequentially; an exception from any item
        propagates and the remaining candles are not processed.
        """
        for item in candles:
            await self.store(item)