diff options
Diffstat (limited to 'services/data-collector')
| -rw-r--r-- | services/data-collector/Dockerfile | 9 | ||||
| -rw-r--r-- | services/data-collector/src/data_collector/main.py | 31 | ||||
| -rw-r--r-- | services/data-collector/tests/test_storage.py | 9 |
3 files changed, 32 insertions, 17 deletions
diff --git a/services/data-collector/Dockerfile b/services/data-collector/Dockerfile index 8cb8af4..4d154c5 100644 --- a/services/data-collector/Dockerfile +++ b/services/data-collector/Dockerfile @@ -1,8 +1,15 @@ -FROM python:3.12-slim +FROM python:3.12-slim AS builder WORKDIR /app COPY shared/ shared/ RUN pip install --no-cache-dir ./shared COPY services/data-collector/ services/data-collector/ RUN pip install --no-cache-dir ./services/data-collector + +FROM python:3.12-slim +RUN useradd -r -s /bin/false appuser +WORKDIR /app +COPY --from=builder /usr/local/lib/python3.12/site-packages /usr/local/lib/python3.12/site-packages +COPY --from=builder /usr/local/bin /usr/local/bin ENV PYTHONPATH=/app +USER appuser CMD ["python", "-m", "data_collector.main"] diff --git a/services/data-collector/src/data_collector/main.py b/services/data-collector/src/data_collector/main.py index b42b34c..2d44848 100644 --- a/services/data-collector/src/data_collector/main.py +++ b/services/data-collector/src/data_collector/main.py @@ -2,6 +2,9 @@ import asyncio +import aiohttp + +from data_collector.config import CollectorConfig from shared.alpaca import AlpacaClient from shared.broker import RedisBroker from shared.db import Database @@ -11,8 +14,7 @@ from shared.logging import setup_logging from shared.metrics import ServiceMetrics from shared.models import Candle from shared.notifier import TelegramNotifier - -from data_collector.config import CollectorConfig +from shared.shutdown import GracefulShutdown # Health check port: base + 0 HEALTH_PORT_OFFSET = 0 @@ -45,8 +47,10 @@ async def fetch_latest_bars( volume=Decimal(str(bar["v"])), ) candles.append(candle) - except Exception as exc: - log.warning("fetch_bar_failed", symbol=symbol, error=str(exc)) + except (aiohttp.ClientError, ConnectionError, TimeoutError) as exc: + log.warning("fetch_bar_network_error", symbol=symbol, error=str(exc)) + except (ValueError, KeyError, TypeError) as exc: + log.warning("fetch_bar_parse_error", symbol=symbol, error=str(exc)) return candles @@ -56,18 +60,18 @@ async def run() -> None: metrics = ServiceMetrics("data_collector") notifier = TelegramNotifier( - bot_token=config.telegram_bot_token, + bot_token=config.telegram_bot_token.get_secret_value(), chat_id=config.telegram_chat_id, ) - db = Database(config.database_url) + db = Database(config.database_url.get_secret_value()) await db.connect() - broker = RedisBroker(config.redis_url) + broker = RedisBroker(config.redis_url.get_secret_value()) alpaca = AlpacaClient( - api_key=config.alpaca_api_key, - api_secret=config.alpaca_api_secret, + api_key=config.alpaca_api_key.get_secret_value(), + api_secret=config.alpaca_api_secret.get_secret_value(), paper=config.alpaca_paper, ) @@ -83,14 +87,17 @@ async def run() -> None: symbols = config.symbols timeframe = config.timeframes[0] if config.timeframes else "1Day" + shutdown = GracefulShutdown() + shutdown.install_handlers() + log.info("starting", symbols=symbols, timeframe=timeframe, poll_interval=poll_interval) try: - while True: + while not shutdown.is_shutting_down: # Check if market is open try: is_open = await alpaca.is_market_open() - except Exception: + except (aiohttp.ClientError, ConnectionError, TimeoutError): is_open = False if is_open: @@ -109,7 +116,7 @@ async def run() -> None: await asyncio.sleep(poll_interval) except Exception as exc: - log.error("fatal_error", error=str(exc)) + log.error("fatal_error", error=str(exc), exc_info=True) await notifier.send_error(str(exc), "data-collector") raise finally: diff --git a/services/data-collector/tests/test_storage.py b/services/data-collector/tests/test_storage.py index ffffa40..51f3aee 100644 --- a/services/data-collector/tests/test_storage.py +++ b/services/data-collector/tests/test_storage.py @@ -1,19 +1,20 @@ """Tests for storage module.""" -import pytest +from datetime import UTC, datetime from decimal import Decimal -from datetime import datetime, timezone from unittest.mock import AsyncMock, MagicMock -from shared.models import Candle +import pytest from data_collector.storage import CandleStorage +from shared.models import Candle + def _make_candle(symbol: str = "AAPL") -> Candle: return Candle( symbol=symbol, timeframe="1m", - open_time=datetime(2024, 1, 1, 0, 0, 0, tzinfo=timezone.utc), + open_time=datetime(2024, 1, 1, 0, 0, 0, tzinfo=UTC), open=Decimal("30000"), high=Decimal("30100"), low=Decimal("29900"), |
