summaryrefslogtreecommitdiff
path: root/services/data-collector/src/data_collector
diff options
context:
space:
mode:
Diffstat (limited to 'services/data-collector/src/data_collector')
-rw-r--r--services/data-collector/src/data_collector/config.py6
-rw-r--r--services/data-collector/src/data_collector/main.py111
2 files changed, 77 insertions, 40 deletions
diff --git a/services/data-collector/src/data_collector/config.py b/services/data-collector/src/data_collector/config.py
index 1e080e5..4761013 100644
--- a/services/data-collector/src/data_collector/config.py
+++ b/services/data-collector/src/data_collector/config.py
@@ -1,6 +1,8 @@
+"""Data Collector configuration."""
from shared.config import Settings
class CollectorConfig(Settings):
- symbols: list[str] = ["BTC/USDT"]
- timeframes: list[str] = ["1m"]
+ symbols: list[str] = ["AAPL", "MSFT", "GOOGL", "AMZN", "TSLA"]
+ timeframes: list[str] = ["5Min"]
+ poll_interval_seconds: int = 60
diff --git a/services/data-collector/src/data_collector/main.py b/services/data-collector/src/data_collector/main.py
index eebe14a..38f8759 100644
--- a/services/data-collector/src/data_collector/main.py
+++ b/services/data-collector/src/data_collector/main.py
@@ -1,59 +1,73 @@
-"""Data Collector Service entry point."""
-
+"""Data Collector Service — fetches US stock data from Alpaca."""
import asyncio
+from shared.alpaca import AlpacaClient
from shared.broker import RedisBroker
+from shared.config import Settings
from shared.db import Database
+from shared.events import CandleEvent
from shared.healthcheck import HealthCheckServer
from shared.logging import setup_logging
from shared.metrics import ServiceMetrics
+from shared.models import Candle
from shared.notifier import TelegramNotifier
from data_collector.config import CollectorConfig
-from data_collector.storage import CandleStorage
-from data_collector.ws_factory import create_websocket
-
-# Health check port: base (HEALTH_PORT, default 8080) + offset
-# data-collector: +0 (8080), strategy-engine: +1 (8081)
-# order-executor: +2 (8082), portfolio-manager: +3 (8083)
+# Health check port: base + 0
HEALTH_PORT_OFFSET = 0
+async def fetch_latest_bars(
+ alpaca: AlpacaClient,
+ symbols: list[str],
+ timeframe: str,
+ log,
+) -> list[Candle]:
+ """Fetch latest bar for each symbol from Alpaca."""
+ candles = []
+ for symbol in symbols:
+ try:
+ bars = await alpaca.get_bars(symbol, timeframe=timeframe, limit=1)
+ if bars:
+ bar = bars[-1]
+ from datetime import datetime
+ from decimal import Decimal
+ candle = Candle(
+ symbol=symbol,
+ timeframe=timeframe,
+ open_time=datetime.fromisoformat(bar["t"].replace("Z", "+00:00")),
+ open=Decimal(str(bar["o"])),
+ high=Decimal(str(bar["h"])),
+ low=Decimal(str(bar["l"])),
+ close=Decimal(str(bar["c"])),
+ volume=Decimal(str(bar["v"])),
+ )
+ candles.append(candle)
+ except Exception as exc:
+ log.warning("fetch_bar_failed", symbol=symbol, error=str(exc))
+ return candles
+
+
async def run() -> None:
- """Initialise all components and start the WebSocket collector."""
config = CollectorConfig()
log = setup_logging("data-collector", config.log_level, config.log_format)
metrics = ServiceMetrics("data_collector")
+
notifier = TelegramNotifier(
- bot_token=config.telegram_bot_token, chat_id=config.telegram_chat_id
+ bot_token=config.telegram_bot_token,
+ chat_id=config.telegram_chat_id,
)
db = Database(config.database_url)
await db.connect()
- await db.init_tables()
broker = RedisBroker(config.redis_url)
- storage = CandleStorage(db=db, broker=broker)
-
- async def on_candle(candle):
- log.info(
- "candle_received",
- symbol=candle.symbol,
- timeframe=candle.timeframe,
- open_time=str(candle.open_time),
- )
- await storage.store(candle)
- metrics.events_processed.labels(service="data-collector", event_type="candle").inc()
-
- # Use the first configured timeframe for the WebSocket subscription.
- timeframe = config.timeframes[0] if config.timeframes else "1m"
-
- ws = create_websocket(
- exchange_id=config.exchange_id,
- symbols=config.symbols,
- timeframe=timeframe,
- on_candle=on_candle,
+
+ alpaca = AlpacaClient(
+ api_key=config.alpaca_api_key,
+ api_secret=config.alpaca_api_secret,
+ paper=config.alpaca_paper,
)
health = HealthCheckServer(
@@ -61,18 +75,38 @@ async def run() -> None:
port=config.health_port + HEALTH_PORT_OFFSET,
auth_token=config.metrics_auth_token,
)
- health.register_check("redis", broker.ping)
await health.start()
metrics.service_up.labels(service="data-collector").set(1)
- log.info(
- "service_started",
- symbols=config.symbols,
- timeframe=timeframe,
- )
+ poll_interval = int(getattr(config, "poll_interval_seconds", 60))
+ symbols = config.symbols
+ timeframe = config.timeframes[0] if config.timeframes else "1Day"
+
+ log.info("starting", symbols=symbols, timeframe=timeframe, poll_interval=poll_interval)
try:
- await ws.start()
+ while True:
+ # Check if market is open
+ try:
+ is_open = await alpaca.is_market_open()
+ except Exception:
+ is_open = False
+
+ if is_open:
+ candles = await fetch_latest_bars(alpaca, symbols, timeframe, log)
+ for candle in candles:
+ await db.insert_candle(candle)
+ event = CandleEvent(data=candle)
+ stream = f"candles.{candle.symbol}"
+ await broker.publish(stream, event.to_dict())
+ metrics.events_processed.labels(
+ service="data-collector", event_type="candle"
+ ).inc()
+ log.info("candle_stored", symbol=candle.symbol, close=str(candle.close))
+ else:
+ log.debug("market_closed")
+
+ await asyncio.sleep(poll_interval)
except Exception as exc:
log.error("fatal_error", error=str(exc))
await notifier.send_error(str(exc), "data-collector")
@@ -80,6 +114,7 @@ async def run() -> None:
finally:
metrics.service_up.labels(service="data-collector").set(0)
await notifier.close()
+ await alpaca.close()
await broker.close()
await db.close()