blob: aeeaaedb90a7e56629481788e59d76638207689a (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
"""Candle storage: persists to DB and publishes to Redis."""
from shared.events import CandleEvent
from shared.models import Candle
class CandleStorage:
"""Stores candles in the database and publishes CandleEvents to Redis."""
def __init__(self, db, broker) -> None:
self._db = db
self._broker = broker
async def store(self, candle: Candle) -> None:
"""Insert candle into DB and publish a CandleEvent to the Redis stream."""
await self._db.insert_candle(candle)
event = CandleEvent(data=candle)
stream = f"candles.{candle.symbol}"
await self._broker.publish(stream, event.to_dict())
async def store_batch(self, candles: list[Candle]) -> None:
"""Store multiple candles one by one."""
for candle in candles:
await self.store(candle)
|