From 33b14aaa2344b0fd95d1629627c3d135b24ae102 Mon Sep 17 00:00:00 2001 From: TheSiahxyz <164138827+TheSiahxyz@users.noreply.github.com> Date: Wed, 1 Apr 2026 15:56:35 +0900 Subject: feat: initial trading platform implementation Binance spot crypto trading platform with microservices architecture: - shared: Pydantic models, Redis Streams broker, asyncpg DB layer - data-collector: Binance WebSocket/REST market data collection - strategy-engine: Plugin-based strategy execution (RSI, Grid) - order-executor: Order execution with risk management - portfolio-manager: Position tracking and PnL calculation - backtester: Historical strategy testing with simulator - cli: Click-based CLI for all operations - Docker Compose orchestration with Redis and PostgreSQL - 24 test files covering all modules --- .../data-collector/src/data_collector/storage.py | 24 ++++++++++++++++++++++ 1 file changed, 24 insertions(+) create mode 100644 services/data-collector/src/data_collector/storage.py (limited to 'services/data-collector/src/data_collector/storage.py') diff --git a/services/data-collector/src/data_collector/storage.py b/services/data-collector/src/data_collector/storage.py new file mode 100644 index 0000000..1e40b82 --- /dev/null +++ b/services/data-collector/src/data_collector/storage.py @@ -0,0 +1,24 @@ +"""Candle storage: persists to DB and publishes to Redis.""" +from shared.events import CandleEvent +from shared.models import Candle + + +class CandleStorage: + """Stores candles in the database and publishes CandleEvents to Redis.""" + + def __init__(self, db, broker) -> None: + self._db = db + self._broker = broker + + async def store(self, candle: Candle) -> None: + """Insert candle into DB and publish a CandleEvent to the Redis stream.""" + await self._db.insert_candle(candle) + + event = CandleEvent(data=candle) + stream = f"candles.{candle.symbol}" + await self._broker.publish(stream, event.to_dict()) + + async def store_batch(self, candles: list[Candle]) -> None: + """Store multiple candles one by one.""" + for candle in candles: + await self.store(candle) -- cgit v1.2.3