summaryrefslogtreecommitdiff
path: root/services/news-collector/src/news_collector/collectors/rss.py
diff options
context:
space:
mode:
Diffstat (limited to 'services/news-collector/src/news_collector/collectors/rss.py')
-rw-r--r--services/news-collector/src/news_collector/collectors/rss.py105
1 files changed, 105 insertions, 0 deletions
diff --git a/services/news-collector/src/news_collector/collectors/rss.py b/services/news-collector/src/news_collector/collectors/rss.py
new file mode 100644
index 0000000..bca0e9f
--- /dev/null
+++ b/services/news-collector/src/news_collector/collectors/rss.py
@@ -0,0 +1,105 @@
+"""RSS news collector using feedparser with VADER sentiment analysis."""
+
+import asyncio
+import logging
+import re
+from datetime import UTC, datetime
+from time import mktime
+
+import feedparser
+from nltk.sentiment.vader import SentimentIntensityAnalyzer
+
+from shared.models import NewsCategory, NewsItem
+
+from .base import BaseCollector
+
+logger = logging.getLogger(__name__)
+
+_DEFAULT_FEEDS = [
+ "https://finance.yahoo.com/news/rssindex",
+ "https://news.google.com/rss/search?q=stock+market+finance&hl=en-US&gl=US&ceid=US:en",
+ "https://feeds.marketwatch.com/marketwatch/topstories/",
+]
+
+_TICKER_PATTERN = re.compile(
+ r"\b(AAPL|MSFT|GOOGL|GOOG|AMZN|TSLA|NVDA|META|BRK\.?[AB]|JPM|V|UNH|XOM|"
+ r"JNJ|WMT|MA|PG|HD|CVX|MRK|LLY|ABBV|PFE|BAC|KO|AVGO|COST|MCD|TMO|"
+ r"CSCO|ACN|ABT|DHR|TXN|NEE|NFLX|PM|UPS|RTX|HON|QCOM|AMGN|LOW|IBM|"
+ r"INTC|AMD|PYPL|GS|MS|BLK|SPGI|CAT|DE|GE|MMM|BA|F|GM|DIS|CMCSA)\b"
+)
+
+
+class RSSCollector(BaseCollector):
+ name: str = "rss"
+ poll_interval: int = 600
+
+ def __init__(self, feeds: list[str] | None = None) -> None:
+ self._feeds = feeds if feeds is not None else _DEFAULT_FEEDS
+ self._vader = SentimentIntensityAnalyzer()
+
+ async def is_available(self) -> bool:
+ return True
+
+ async def _fetch_feeds(self) -> list[dict]:
+ loop = asyncio.get_event_loop()
+ results = []
+ for url in self._feeds:
+ try:
+ parsed = await loop.run_in_executor(None, feedparser.parse, url)
+ results.append(parsed)
+ except Exception as exc:
+ logger.error("RSS fetch failed for %s: %s", url, exc)
+ return results
+
+ def _parse_published(self, entry: dict) -> datetime:
+ parsed_time = entry.get("published_parsed")
+ if parsed_time:
+ try:
+ ts = mktime(parsed_time)
+ return datetime.fromtimestamp(ts, tz=UTC)
+ except Exception:
+ pass
+ return datetime.now(UTC)
+
+ async def collect(self) -> list[NewsItem]:
+ try:
+ feeds = await self._fetch_feeds()
+ except Exception as exc:
+ logger.error("RSS collector error: %s", exc)
+ return []
+
+ seen_titles: set[str] = set()
+ items: list[NewsItem] = []
+
+ for feed in feeds:
+ for entry in feed.get("entries", []):
+ title = entry.get("title", "").strip()
+ if not title or title in seen_titles:
+ continue
+ seen_titles.add(title)
+
+ summary = entry.get("summary", "") or ""
+ combined = f"{title} {summary}"
+
+ sentiment_scores = self._vader.polarity_scores(combined)
+ sentiment = sentiment_scores["compound"]
+
+ symbols = list(dict.fromkeys(_TICKER_PATTERN.findall(combined)))
+
+ published_at = self._parse_published(entry)
+
+ items.append(
+ NewsItem(
+ source=self.name,
+ headline=title,
+ summary=summary or None,
+ url=entry.get("link") or None,
+ published_at=published_at,
+ symbols=symbols,
+ sentiment=sentiment,
+ category=NewsCategory.MACRO,
+ raw_data=dict(entry),
+ )
+ )
+
+ return items