summaryrefslogtreecommitdiff
path: root/services/data-collector/src/data_collector/storage.py
diff options
context:
space:
mode:
Diffstat (limited to 'services/data-collector/src/data_collector/storage.py')
-rw-r--r--services/data-collector/src/data_collector/storage.py24
1 files changed, 24 insertions, 0 deletions
diff --git a/services/data-collector/src/data_collector/storage.py b/services/data-collector/src/data_collector/storage.py
new file mode 100644
index 0000000..1e40b82
--- /dev/null
+++ b/services/data-collector/src/data_collector/storage.py
@@ -0,0 +1,24 @@
+"""Candle storage: persists to DB and publishes to Redis."""
+from shared.events import CandleEvent
+from shared.models import Candle
+
+
+class CandleStorage:
+ """Stores candles in the database and publishes CandleEvents to Redis."""
+
+ def __init__(self, db, broker) -> None:
+ self._db = db
+ self._broker = broker
+
+ async def store(self, candle: Candle) -> None:
+ """Insert candle into DB and publish a CandleEvent to the Redis stream."""
+ await self._db.insert_candle(candle)
+
+ event = CandleEvent(data=candle)
+ stream = f"candles.{candle.symbol}"
+ await self._broker.publish(stream, event.to_dict())
+
+ async def store_batch(self, candles: list[Candle]) -> None:
+ """Store multiple candles one by one."""
+ for candle in candles:
+ await self.store(candle)