From 33b14aaa2344b0fd95d1629627c3d135b24ae102 Mon Sep 17 00:00:00 2001 From: TheSiahxyz <164138827+TheSiahxyz@users.noreply.github.com> Date: Wed, 1 Apr 2026 15:56:35 +0900 Subject: feat: initial trading platform implementation Binance spot crypto trading platform with microservices architecture: - shared: Pydantic models, Redis Streams broker, asyncpg DB layer - data-collector: Binance WebSocket/REST market data collection - strategy-engine: Plugin-based strategy execution (RSI, Grid) - order-executor: Order execution with risk management - portfolio-manager: Position tracking and PnL calculation - backtester: Historical strategy testing with simulator - cli: Click-based CLI for all operations - Docker Compose orchestration with Redis and PostgreSQL - 24 test files covering all modules --- services/data-collector/src/data_collector/main.py | 58 ++++++++++++++++++++++ 1 file changed, 58 insertions(+) create mode 100644 services/data-collector/src/data_collector/main.py (limited to 'services/data-collector/src/data_collector/main.py') diff --git a/services/data-collector/src/data_collector/main.py b/services/data-collector/src/data_collector/main.py new file mode 100644 index 0000000..adf1e96 --- /dev/null +++ b/services/data-collector/src/data_collector/main.py @@ -0,0 +1,58 @@ +"""Data Collector Service entry point.""" +import asyncio +import logging + +from shared.broker import RedisBroker +from shared.db import Database + +from data_collector.binance_ws import BinanceWebSocket +from data_collector.config import CollectorConfig +from data_collector.storage import CandleStorage + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + + +async def run() -> None: + """Initialise all components and start the WebSocket collector.""" + config = CollectorConfig() + + db = Database(config.database_url) + await db.connect() + await db.init_tables() + + broker = RedisBroker(config.redis_url) + storage = CandleStorage(db=db, broker=broker) + + async def on_candle(candle): + logger.info("Candle received: %s %s %s", candle.symbol, candle.timeframe, candle.open_time) + await storage.store(candle) + + # Use the first configured timeframe for the WebSocket subscription. + timeframe = config.timeframes[0] if config.timeframes else "1m" + + ws = BinanceWebSocket( + symbols=config.symbols, + timeframe=timeframe, + on_candle=on_candle, + ) + + logger.info( + "Starting data collector for symbols=%s timeframe=%s", + config.symbols, + timeframe, + ) + + try: + await ws.start() + finally: + await broker.close() + await db.close() + + +def main() -> None: + asyncio.run(run()) + + +if __name__ == "__main__": + main() -- cgit v1.2.3