diff options
Diffstat (limited to 'services/news-collector')
19 files changed, 93 insertions, 80 deletions
diff --git a/services/news-collector/Dockerfile b/services/news-collector/Dockerfile index a8e5902..7accee2 100644 --- a/services/news-collector/Dockerfile +++ b/services/news-collector/Dockerfile @@ -1,9 +1,17 @@ -FROM python:3.12-slim +FROM python:3.12-slim AS builder WORKDIR /app COPY shared/ shared/ RUN pip install --no-cache-dir ./shared COPY services/news-collector/ services/news-collector/ RUN pip install --no-cache-dir ./services/news-collector -RUN python -c "import nltk; nltk.download('vader_lexicon', quiet=True)" +RUN python -c "import nltk; nltk.download('vader_lexicon', download_dir='/usr/local/nltk_data')" + +FROM python:3.12-slim +RUN useradd -r -s /bin/false appuser +WORKDIR /app +COPY --from=builder /usr/local/lib/python3.12/site-packages /usr/local/lib/python3.12/site-packages +COPY --from=builder /usr/local/bin /usr/local/bin +COPY --from=builder /usr/local/nltk_data /usr/local/nltk_data ENV PYTHONPATH=/app +USER appuser CMD ["python", "-m", "news_collector.main"] diff --git a/services/news-collector/pyproject.toml b/services/news-collector/pyproject.toml index 14c856a..6e62b70 100644 --- a/services/news-collector/pyproject.toml +++ b/services/news-collector/pyproject.toml @@ -3,12 +3,7 @@ name = "news-collector" version = "0.1.0" description = "News and sentiment data collector service" requires-python = ">=3.12" -dependencies = [ - "trading-shared", - "feedparser>=6.0", - "nltk>=3.8", - "aiohttp>=3.9", -] +dependencies = ["trading-shared", "feedparser>=6.0,<7", "nltk>=3.8,<4", "aiohttp>=3.9,<4"] [project.optional-dependencies] dev = [ diff --git a/services/news-collector/src/news_collector/collectors/fear_greed.py b/services/news-collector/src/news_collector/collectors/fear_greed.py index f79f716..42e8f88 100644 --- a/services/news-collector/src/news_collector/collectors/fear_greed.py +++ b/services/news-collector/src/news_collector/collectors/fear_greed.py @@ -2,7 +2,6 @@ import logging from dataclasses import dataclass -from typing import Optional import aiohttp @@ -26,7 +25,7 @@ class FearGreedCollector(BaseCollector): async def is_available(self) -> bool: return True - async def _fetch_index(self) -> Optional[dict]: + async def _fetch_index(self) -> dict | None: headers = {"User-Agent": "Mozilla/5.0"} try: async with aiohttp.ClientSession() as session: @@ -50,7 +49,7 @@ class FearGreedCollector(BaseCollector): return "Greed" return "Extreme Greed" - async def collect(self) -> Optional[FearGreedResult]: + async def collect(self) -> FearGreedResult | None: data = await self._fetch_index() if data is None: return None diff --git a/services/news-collector/src/news_collector/collectors/fed.py b/services/news-collector/src/news_collector/collectors/fed.py index fce4842..52128e5 100644 --- a/services/news-collector/src/news_collector/collectors/fed.py +++ b/services/news-collector/src/news_collector/collectors/fed.py @@ -3,7 +3,7 @@ import asyncio import logging from calendar import timegm -from datetime import datetime, timezone +from datetime import UTC, datetime import feedparser from nltk.sentiment.vader import SentimentIntensityAnalyzer @@ -76,10 +76,10 @@ class FedCollector(BaseCollector): if published_parsed: try: ts = timegm(published_parsed) - return datetime.fromtimestamp(ts, tz=timezone.utc) + return datetime.fromtimestamp(ts, tz=UTC) except Exception: pass - return datetime.now(timezone.utc) + return datetime.now(UTC) async def collect(self) -> list[NewsItem]: try: diff --git a/services/news-collector/src/news_collector/collectors/finnhub.py b/services/news-collector/src/news_collector/collectors/finnhub.py index 13e3602..67cb455 100644 --- a/services/news-collector/src/news_collector/collectors/finnhub.py +++ b/services/news-collector/src/news_collector/collectors/finnhub.py @@ -1,7 +1,7 @@ """Finnhub news collector with VADER sentiment analysis.""" import logging -from datetime import datetime, timezone +from datetime import UTC, datetime import aiohttp from nltk.sentiment.vader import SentimentIntensityAnalyzer @@ -64,7 +64,7 @@ class FinnhubCollector(BaseCollector): sentiment = sentiment_scores["compound"] ts = article.get("datetime", 0) - published_at = datetime.fromtimestamp(ts, tz=timezone.utc) + published_at = datetime.fromtimestamp(ts, tz=UTC) related = article.get("related", "") symbols = [t.strip() for t in related.split(",") if t.strip()] if related else [] diff --git a/services/news-collector/src/news_collector/collectors/reddit.py b/services/news-collector/src/news_collector/collectors/reddit.py index 226a2f9..4e9d6f5 100644 --- a/services/news-collector/src/news_collector/collectors/reddit.py +++ b/services/news-collector/src/news_collector/collectors/reddit.py @@ -2,7 +2,7 @@ import logging import re -from datetime import datetime, timezone +from datetime import UTC, datetime import aiohttp from nltk.sentiment.vader import SentimentIntensityAnalyzer @@ -78,7 +78,7 @@ class RedditCollector(BaseCollector): symbols = list(dict.fromkeys(_TICKER_PATTERN.findall(combined))) created_utc = post_data.get("created_utc", 0) - published_at = datetime.fromtimestamp(created_utc, tz=timezone.utc) + published_at = datetime.fromtimestamp(created_utc, tz=UTC) items.append( NewsItem( diff --git a/services/news-collector/src/news_collector/collectors/rss.py b/services/news-collector/src/news_collector/collectors/rss.py index ddf8503..bca0e9f 100644 --- a/services/news-collector/src/news_collector/collectors/rss.py +++ b/services/news-collector/src/news_collector/collectors/rss.py @@ -3,7 +3,7 @@ import asyncio import logging import re -from datetime import datetime, timezone +from datetime import UTC, datetime from time import mktime import feedparser @@ -56,10 +56,10 @@ class RSSCollector(BaseCollector): if parsed_time: try: ts = mktime(parsed_time) - return datetime.fromtimestamp(ts, tz=timezone.utc) + return datetime.fromtimestamp(ts, tz=UTC) except Exception: pass - return datetime.now(timezone.utc) + return datetime.now(UTC) async def collect(self) -> list[NewsItem]: try: diff --git a/services/news-collector/src/news_collector/collectors/sec_edgar.py b/services/news-collector/src/news_collector/collectors/sec_edgar.py index ca1d070..d88518f 100644 --- a/services/news-collector/src/news_collector/collectors/sec_edgar.py +++ b/services/news-collector/src/news_collector/collectors/sec_edgar.py @@ -1,13 +1,13 @@ """SEC EDGAR filing collector (free, no API key required).""" import logging -from datetime import datetime, timezone +from datetime import UTC, datetime import aiohttp from nltk.sentiment.vader import SentimentIntensityAnalyzer -from shared.models import NewsCategory, NewsItem from news_collector.collectors.base import BaseCollector +from shared.models import NewsCategory, NewsItem logger = logging.getLogger(__name__) @@ -58,7 +58,7 @@ class SecEdgarCollector(BaseCollector): async def collect(self) -> list[NewsItem]: filings_data = await self._fetch_recent_filings() items = [] - today = datetime.now(timezone.utc).strftime("%Y-%m-%d") + today = datetime.now(UTC).strftime("%Y-%m-%d") for company_data in filings_data: tickers = [t["ticker"] for t in company_data.get("tickers", [])] @@ -87,9 +87,7 @@ class SecEdgarCollector(BaseCollector): headline=headline, summary=desc, url=f"https://www.sec.gov/cgi-bin/browse-edgar?action=getcompany&accession={accession}", - published_at=datetime.strptime(filing_date, "%Y-%m-%d").replace( - tzinfo=timezone.utc - ), + published_at=datetime.strptime(filing_date, "%Y-%m-%d").replace(tzinfo=UTC), symbols=tickers, sentiment=self._vader.polarity_scores(headline)["compound"], category=NewsCategory.FILING, diff --git a/services/news-collector/src/news_collector/collectors/truth_social.py b/services/news-collector/src/news_collector/collectors/truth_social.py index 33ebc86..e2acd88 100644 --- a/services/news-collector/src/news_collector/collectors/truth_social.py +++ b/services/news-collector/src/news_collector/collectors/truth_social.py @@ -2,7 +2,7 @@ import logging import re -from datetime import datetime, timezone +from datetime import UTC, datetime import aiohttp from nltk.sentiment.vader import SentimentIntensityAnalyzer @@ -67,7 +67,7 @@ class TruthSocialCollector(BaseCollector): try: published_at = datetime.fromisoformat(created_at_str.replace("Z", "+00:00")) except Exception: - published_at = datetime.now(timezone.utc) + published_at = datetime.now(UTC) items.append( NewsItem( diff --git a/services/news-collector/src/news_collector/config.py b/services/news-collector/src/news_collector/config.py index 70d98f1..6e78eba 100644 --- a/services/news-collector/src/news_collector/config.py +++ b/services/news-collector/src/news_collector/config.py @@ -5,6 +5,3 @@ from shared.config import Settings class NewsCollectorConfig(Settings): health_port: int = 8084 - finnhub_api_key: str = "" - news_poll_interval: int = 300 - sentiment_aggregate_interval: int = 900 diff --git a/services/news-collector/src/news_collector/main.py b/services/news-collector/src/news_collector/main.py index 3493f7c..c39fa67 100644 --- a/services/news-collector/src/news_collector/main.py +++ b/services/news-collector/src/news_collector/main.py @@ -1,8 +1,18 @@ """News Collector Service — fetches news from multiple sources and aggregates sentiment.""" import asyncio -from datetime import datetime, timezone +from datetime import UTC, datetime +import aiohttp + +from news_collector.collectors.fear_greed import FearGreedCollector +from news_collector.collectors.fed import FedCollector +from news_collector.collectors.finnhub import FinnhubCollector +from news_collector.collectors.reddit import RedditCollector +from news_collector.collectors.rss import RSSCollector +from news_collector.collectors.sec_edgar import SecEdgarCollector +from news_collector.collectors.truth_social import TruthSocialCollector +from news_collector.config import NewsCollectorConfig from shared.broker import RedisBroker from shared.db import Database from shared.events import NewsEvent @@ -11,20 +21,9 @@ from shared.logging import setup_logging from shared.metrics import ServiceMetrics from shared.models import NewsItem from shared.notifier import TelegramNotifier -from shared.sentiment_models import MarketSentiment from shared.sentiment import SentimentAggregator - -from news_collector.config import NewsCollectorConfig -from news_collector.collectors.finnhub import FinnhubCollector -from news_collector.collectors.rss import RSSCollector -from news_collector.collectors.sec_edgar import SecEdgarCollector -from news_collector.collectors.truth_social import TruthSocialCollector -from news_collector.collectors.reddit import RedditCollector -from news_collector.collectors.fear_greed import FearGreedCollector -from news_collector.collectors.fed import FedCollector - -# Health check port: base + 4 -HEALTH_PORT_OFFSET = 4 +from shared.sentiment_models import MarketSentiment +from shared.shutdown import GracefulShutdown async def run_collector_once(collector, db: Database, broker: RedisBroker) -> int: @@ -53,9 +52,15 @@ async def run_collector_loop(collector, db: Database, broker: RedisBroker, log) collector=collector.name, count=count, ) - except Exception as exc: + except (aiohttp.ClientError, ConnectionError, TimeoutError) as exc: log.warning( - "collector_error", + "collector_network_error", + collector=collector.name, + error=str(exc), + ) + except (ValueError, KeyError, TypeError) as exc: + log.warning( + "collector_parse_error", collector=collector.name, error=str(exc), ) @@ -74,7 +79,7 @@ async def run_fear_greed_loop(collector: FearGreedCollector, db: Database, log) vix=None, fed_stance="neutral", market_regime=_determine_regime(result.fear_greed, None), - updated_at=datetime.now(timezone.utc), + updated_at=datetime.now(UTC), ) await db.upsert_market_sentiment(ms) log.info( @@ -82,8 +87,10 @@ async def run_fear_greed_loop(collector: FearGreedCollector, db: Database, log) value=result.fear_greed, label=result.fear_greed_label, ) - except Exception as exc: - log.warning("fear_greed_error", error=str(exc)) + except (aiohttp.ClientError, ConnectionError, TimeoutError) as exc: + log.warning("fear_greed_network_error", error=str(exc)) + except (ValueError, KeyError, TypeError) as exc: + log.warning("fear_greed_parse_error", error=str(exc)) await asyncio.sleep(collector.poll_interval) @@ -93,14 +100,16 @@ async def run_aggregator_loop(db: Database, interval: int, log) -> None: while True: await asyncio.sleep(interval) try: - now = datetime.now(timezone.utc) + now = datetime.now(UTC) news_items = await db.get_recent_news(hours=24) scores = aggregator.aggregate(news_items, now) for score in scores.values(): await db.upsert_symbol_score(score) log.info("aggregation_complete", symbols=len(scores)) - except Exception as exc: - log.warning("aggregator_error", error=str(exc)) + except (ConnectionError, TimeoutError) as exc: + log.warning("aggregator_network_error", error=str(exc)) + except (ValueError, KeyError, TypeError) as exc: + log.warning("aggregator_parse_error", error=str(exc)) def _determine_regime(fear_greed: int, vix: float | None) -> str: @@ -115,14 +124,14 @@ async def run() -> None: metrics = ServiceMetrics("news_collector") notifier = TelegramNotifier( - bot_token=config.telegram_bot_token, + bot_token=config.telegram_bot_token.get_secret_value(), chat_id=config.telegram_chat_id, ) - db = Database(config.database_url) + db = Database(config.database_url.get_secret_value()) await db.connect() - broker = RedisBroker(config.redis_url) + broker = RedisBroker(config.redis_url.get_secret_value()) health = HealthCheckServer( "news-collector", @@ -133,7 +142,7 @@ async def run() -> None: metrics.service_up.labels(service="news-collector").set(1) # Build collectors - finnhub = FinnhubCollector(api_key=config.finnhub_api_key) + finnhub = FinnhubCollector(api_key=config.finnhub_api_key.get_secret_value()) rss = RSSCollector() sec = SecEdgarCollector() truth = TruthSocialCollector() @@ -143,6 +152,9 @@ async def run() -> None: news_collectors = [finnhub, rss, sec, truth, reddit, fed] + shutdown = GracefulShutdown() + shutdown.install_handlers() + log.info( "starting", collectors=[c.name for c in news_collectors], @@ -151,14 +163,13 @@ async def run() -> None: ) try: - tasks = [] - for collector in news_collectors: - tasks.append( - asyncio.create_task( - run_collector_loop(collector, db, broker, log), - name=f"collector-{collector.name}", - ) + tasks = [ + asyncio.create_task( + run_collector_loop(collector, db, broker, log), + name=f"collector-{collector.name}", ) + for collector in news_collectors + ] tasks.append( asyncio.create_task( run_fear_greed_loop(fear_greed, db, log), @@ -171,9 +182,9 @@ async def run() -> None: name="aggregator-loop", ) ) - await asyncio.gather(*tasks) + await shutdown.wait() except Exception as exc: - log.error("fatal_error", error=str(exc)) + log.error("fatal_error", error=str(exc), exc_info=True) await notifier.send_error(str(exc), "news-collector") raise finally: diff --git a/services/news-collector/tests/test_fear_greed.py b/services/news-collector/tests/test_fear_greed.py index d483aa6..e8bd8f0 100644 --- a/services/news-collector/tests/test_fear_greed.py +++ b/services/news-collector/tests/test_fear_greed.py @@ -1,8 +1,8 @@ """Tests for CNN Fear & Greed Index collector.""" -import pytest from unittest.mock import AsyncMock, patch +import pytest from news_collector.collectors.fear_greed import FearGreedCollector diff --git a/services/news-collector/tests/test_fed.py b/services/news-collector/tests/test_fed.py index d1a736b..7f1c46c 100644 --- a/services/news-collector/tests/test_fed.py +++ b/services/news-collector/tests/test_fed.py @@ -1,7 +1,8 @@ """Tests for Federal Reserve collector.""" -import pytest from unittest.mock import AsyncMock, patch + +import pytest from news_collector.collectors.fed import FedCollector diff --git a/services/news-collector/tests/test_finnhub.py b/services/news-collector/tests/test_finnhub.py index a4cf169..3af65b8 100644 --- a/services/news-collector/tests/test_finnhub.py +++ b/services/news-collector/tests/test_finnhub.py @@ -1,8 +1,8 @@ """Tests for Finnhub news collector.""" -import pytest from unittest.mock import AsyncMock, patch +import pytest from news_collector.collectors.finnhub import FinnhubCollector diff --git a/services/news-collector/tests/test_main.py b/services/news-collector/tests/test_main.py index 66190dc..f85569a 100644 --- a/services/news-collector/tests/test_main.py +++ b/services/news-collector/tests/test_main.py @@ -1,16 +1,18 @@ """Tests for news collector scheduler.""" +from datetime import UTC, datetime from unittest.mock import AsyncMock, MagicMock -from datetime import datetime, timezone -from shared.models import NewsCategory, NewsItem + from news_collector.main import run_collector_once +from shared.models import NewsCategory, NewsItem + async def test_run_collector_once_stores_and_publishes(): mock_item = NewsItem( source="test", headline="Test news", - published_at=datetime(2026, 4, 2, tzinfo=timezone.utc), + published_at=datetime(2026, 4, 2, tzinfo=UTC), sentiment=0.5, category=NewsCategory.MACRO, ) diff --git a/services/news-collector/tests/test_reddit.py b/services/news-collector/tests/test_reddit.py index 440b173..31b1dc1 100644 --- a/services/news-collector/tests/test_reddit.py +++ b/services/news-collector/tests/test_reddit.py @@ -1,7 +1,8 @@ """Tests for Reddit collector.""" -import pytest from unittest.mock import AsyncMock, patch + +import pytest from news_collector.collectors.reddit import RedditCollector diff --git a/services/news-collector/tests/test_rss.py b/services/news-collector/tests/test_rss.py index e03250a..7242c75 100644 --- a/services/news-collector/tests/test_rss.py +++ b/services/news-collector/tests/test_rss.py @@ -1,8 +1,8 @@ """Tests for RSS news collector.""" -import pytest from unittest.mock import AsyncMock, patch +import pytest from news_collector.collectors.rss import RSSCollector diff --git a/services/news-collector/tests/test_sec_edgar.py b/services/news-collector/tests/test_sec_edgar.py index 5d4f69f..b0faf18 100644 --- a/services/news-collector/tests/test_sec_edgar.py +++ b/services/news-collector/tests/test_sec_edgar.py @@ -1,9 +1,9 @@ """Tests for SEC EDGAR filing collector.""" -import pytest -from datetime import datetime, timezone -from unittest.mock import AsyncMock, patch, MagicMock +from datetime import UTC, datetime +from unittest.mock import AsyncMock, MagicMock, patch +import pytest from news_collector.collectors.sec_edgar import SecEdgarCollector @@ -37,7 +37,7 @@ async def test_collect_parses_filings(collector): } mock_datetime = MagicMock(spec=datetime) - mock_datetime.now.return_value = datetime(2026, 4, 2, tzinfo=timezone.utc) + mock_datetime.now.return_value = datetime(2026, 4, 2, tzinfo=UTC) mock_datetime.strptime = datetime.strptime with patch.object( diff --git a/services/news-collector/tests/test_truth_social.py b/services/news-collector/tests/test_truth_social.py index 91ddb9d..52f1e46 100644 --- a/services/news-collector/tests/test_truth_social.py +++ b/services/news-collector/tests/test_truth_social.py @@ -1,7 +1,8 @@ """Tests for Truth Social collector.""" -import pytest from unittest.mock import AsyncMock, patch + +import pytest from news_collector.collectors.truth_social import TruthSocialCollector |
