diff options
Diffstat (limited to 'services/news-collector')
| -rw-r--r-- | services/news-collector/src/news_collector/collectors/rss.py | 134 | ||||
| -rw-r--r-- | services/news-collector/tests/test_rss.py | 50 |
2 files changed, 184 insertions, 0 deletions
diff --git a/services/news-collector/src/news_collector/collectors/rss.py b/services/news-collector/src/news_collector/collectors/rss.py
new file mode 100644
index 0000000..ddf8503
--- /dev/null
+++ b/services/news-collector/src/news_collector/collectors/rss.py
@@ -0,0 +1,134 @@
+"""RSS news collector using feedparser with VADER sentiment analysis."""
+
+import asyncio
+import calendar
+import logging
+import re
+from datetime import datetime, timezone
+
+import feedparser
+from nltk.sentiment.vader import SentimentIntensityAnalyzer
+
+from shared.models import NewsCategory, NewsItem
+
+from .base import BaseCollector
+
+logger = logging.getLogger(__name__)
+
+_DEFAULT_FEEDS = [
+    "https://finance.yahoo.com/news/rssindex",
+    "https://news.google.com/rss/search?q=stock+market+finance&hl=en-US&gl=US&ceid=US:en",
+    "https://feeds.marketwatch.com/marketwatch/topstories/",
+]
+
+# Whole-word, case-sensitive match against a fixed list of ticker symbols.
+_TICKER_PATTERN = re.compile(
+    r"\b(AAPL|MSFT|GOOGL|GOOG|AMZN|TSLA|NVDA|META|BRK\.?[AB]|JPM|V|UNH|XOM|"
+    r"JNJ|WMT|MA|PG|HD|CVX|MRK|LLY|ABBV|PFE|BAC|KO|AVGO|COST|MCD|TMO|"
+    r"CSCO|ACN|ABT|DHR|TXN|NEE|NFLX|PM|UPS|RTX|HON|QCOM|AMGN|LOW|IBM|"
+    r"INTC|AMD|PYPL|GS|MS|BLK|SPGI|CAT|DE|GE|MMM|BA|F|GM|DIS|CMCSA)\b"
+)
+
+
+class RSSCollector(BaseCollector):
+    """Collect news from RSS feeds and score each entry with VADER."""
+
+    name: str = "rss"
+    poll_interval: int = 600
+
+    def __init__(self, feeds: list[str] | None = None) -> None:
+        """Use ``feeds`` when given, otherwise the built-in default feed list."""
+        self._feeds = feeds if feeds is not None else _DEFAULT_FEEDS
+        self._vader = SentimentIntensityAnalyzer()
+
+    async def is_available(self) -> bool:
+        """Always report the collector as available."""
+        return True
+
+    async def _fetch_feeds(self) -> list[dict]:
+        """Download and parse every configured feed concurrently.
+
+        ``feedparser.parse`` blocks, so each call runs in a worker thread.
+        A feed whose fetch raises is logged and left out; the remaining
+        results keep the order of the configured URLs.
+        """
+        outcomes = await asyncio.gather(
+            *(asyncio.to_thread(feedparser.parse, url) for url in self._feeds),
+            return_exceptions=True,
+        )
+        results = []
+        for url, outcome in zip(self._feeds, outcomes):
+            if isinstance(outcome, BaseException):
+                logger.error("RSS fetch failed for %s: %s", url, outcome)
+            else:
+                results.append(outcome)
+        return results
+
+    def _parse_published(self, entry: dict) -> datetime:
+        """Return the entry's publication time as an aware UTC datetime.
+
+        feedparser normalises ``published_parsed`` to a UTC ``struct_time``,
+        so it is converted with ``calendar.timegm``; ``time.mktime`` would
+        treat it as local time and shift the result by the host's UTC offset.
+        Falls back to the current UTC time when the field is missing or
+        cannot be converted.
+        """
+        parsed_time = entry.get("published_parsed")
+        if parsed_time:
+            try:
+                ts = calendar.timegm(parsed_time)
+                return datetime.fromtimestamp(ts, tz=timezone.utc)
+            except (TypeError, ValueError, OverflowError, OSError) as exc:
+                logger.debug("Unusable published_parsed %r: %s", parsed_time, exc)
+        return datetime.now(timezone.utc)
+
+    async def collect(self) -> list[NewsItem]:
+        """Fetch all feeds and convert their entries into ``NewsItem`` objects.
+
+        Entries with an empty title, or a title already seen in this run, are
+        skipped. Sentiment is VADER's compound score for title plus summary;
+        symbols are the listed tickers found in that text, in first-seen order.
+        Returns an empty list if fetching the feeds fails.
+        """
+        try:
+            feeds = await self._fetch_feeds()
+        except Exception as exc:
+            logger.error("RSS collector error: %s", exc)
+            return []
+
+        seen_titles: set[str] = set()
+        items: list[NewsItem] = []
+
+        for feed in feeds:
+            for entry in feed.get("entries", []):
+                # ``or ""`` also covers entries whose title is present but None.
+                title = (entry.get("title") or "").strip()
+                if not title or title in seen_titles:
+                    continue
+                seen_titles.add(title)
+
+                summary = entry.get("summary", "") or ""
+                combined = f"{title} {summary}"
+
+                sentiment_scores = self._vader.polarity_scores(combined)
+                sentiment = sentiment_scores["compound"]
+
+                symbols = list(dict.fromkeys(_TICKER_PATTERN.findall(combined)))
+
+                published_at = self._parse_published(entry)
+
+                items.append(
+                    NewsItem(
+                        source=self.name,
+                        headline=title,
+                        summary=summary or None,
+                        url=entry.get("link") or None,
+                        published_at=published_at,
+                        symbols=symbols,
+                        sentiment=sentiment,
+                        category=NewsCategory.MACRO,
+                        raw_data=dict(entry),
+                    )
+                )
+
+        return items
diff --git a/services/news-collector/tests/test_rss.py b/services/news-collector/tests/test_rss.py
new file mode 100644
index 0000000..58c5f7c
--- /dev/null
+++ b/services/news-collector/tests/test_rss.py
@@ -0,0 +1,50 @@
+"""Tests for RSS news collector."""
+
+import pytest
+from unittest.mock import AsyncMock, patch
+from datetime import datetime, timezone
+
+from news_collector.collectors.rss import RSSCollector
+
+
+@pytest.fixture
+def collector():
+    return RSSCollector()
+
+
+def test_collector_name(collector):
+    assert collector.name == "rss"
+    assert collector.poll_interval == 600
+
+
+async def test_is_available(collector):
+    assert await collector.is_available() is True
+
+
+async def test_collect_parses_feed(collector):
+    mock_feed = {
+        "entries": [
+            {
+                "title": "NVDA surges on AI demand",
+                "link": "https://example.com/nvda",
+                "published_parsed": (2026, 4, 2, 12, 0, 0, 0, 0, 0),
+                "summary": "Nvidia stock jumped 5%...",
+            },
+            {
+                "title": "Markets rally on jobs data",
+                "link": "https://example.com/market",
+                "published_parsed": (2026, 4, 2, 11, 0, 0, 0, 0, 0),
+                "summary": "The S&P 500 rose...",
+            },
+        ],
+    }
+
+    with patch.object(collector, "_fetch_feeds", new_callable=AsyncMock, return_value=[mock_feed]):
+        items = await collector.collect()
+
+    assert len(items) == 2
+    assert items[0].source == "rss"
+    assert items[0].headline == "NVDA surges on AI demand"
+    assert isinstance(items[0].sentiment, float)
+    # published_parsed is UTC, so the result must not depend on the host timezone.
+    assert items[0].published_at == datetime(2026, 4, 2, 12, 0, tzinfo=timezone.utc)
