summaryrefslogtreecommitdiff
path: root/services/data-collector/src/data_collector/main.py
diff options
context:
space:
mode:
Diffstat (limited to 'services/data-collector/src/data_collector/main.py')
-rw-r--r--services/data-collector/src/data_collector/main.py58
1 files changed, 58 insertions, 0 deletions
diff --git a/services/data-collector/src/data_collector/main.py b/services/data-collector/src/data_collector/main.py
new file mode 100644
index 0000000..adf1e96
--- /dev/null
+++ b/services/data-collector/src/data_collector/main.py
@@ -0,0 +1,58 @@
+"""Data Collector Service entry point."""
+import asyncio
+import logging
+
+from shared.broker import RedisBroker
+from shared.db import Database
+
+from data_collector.binance_ws import BinanceWebSocket
+from data_collector.config import CollectorConfig
+from data_collector.storage import CandleStorage
+
+logging.basicConfig(level=logging.INFO)
+logger = logging.getLogger(__name__)
+
+
+async def run() -> None:
+ """Initialise all components and start the WebSocket collector."""
+ config = CollectorConfig()
+
+ db = Database(config.database_url)
+ await db.connect()
+ await db.init_tables()
+
+ broker = RedisBroker(config.redis_url)
+ storage = CandleStorage(db=db, broker=broker)
+
+ async def on_candle(candle):
+ logger.info("Candle received: %s %s %s", candle.symbol, candle.timeframe, candle.open_time)
+ await storage.store(candle)
+
+ # Use the first configured timeframe for the WebSocket subscription.
+ timeframe = config.timeframes[0] if config.timeframes else "1m"
+
+ ws = BinanceWebSocket(
+ symbols=config.symbols,
+ timeframe=timeframe,
+ on_candle=on_candle,
+ )
+
+ logger.info(
+ "Starting data collector for symbols=%s timeframe=%s",
+ config.symbols,
+ timeframe,
+ )
+
+ try:
+ await ws.start()
+ finally:
+ await broker.close()
+ await db.close()
+
+
+def main() -> None:
+ asyncio.run(run())
+
+
+if __name__ == "__main__":
+ main()