"""RSS news collector using feedparser with VADER sentiment analysis."""

import asyncio
import calendar
import logging
import re
from datetime import datetime, timezone
from time import mktime  # noqa: F401  (no longer used below; import left in place)

import feedparser
from nltk.sentiment.vader import SentimentIntensityAnalyzer

from shared.models import NewsCategory, NewsItem

from .base import BaseCollector

logger = logging.getLogger(__name__)

# Feeds polled when the caller does not supply its own list.
_DEFAULT_FEEDS = [
    "https://finance.yahoo.com/news/rssindex",
    "https://news.google.com/rss/search?q=stock+market+finance&hl=en-US&gl=US&ceid=US:en",
    "https://feeds.marketwatch.com/marketwatch/topstories/",
]

# Case-sensitive, whole-word match against a fixed list of ticker symbols.
# NOTE(review): the short symbols (V, F, MA, MS, PM, GE, ...) also match any
# identical upper-case token in ordinary text, so false positives are possible.
_TICKER_PATTERN = re.compile(
    r"\b(AAPL|MSFT|GOOGL|GOOG|AMZN|TSLA|NVDA|META|BRK\.?[AB]|JPM|V|UNH|XOM|"
    r"JNJ|WMT|MA|PG|HD|CVX|MRK|LLY|ABBV|PFE|BAC|KO|AVGO|COST|MCD|TMO|"
    r"CSCO|ACN|ABT|DHR|TXN|NEE|NFLX|PM|UPS|RTX|HON|QCOM|AMGN|LOW|IBM|"
    r"INTC|AMD|PYPL|GS|MS|BLK|SPGI|CAT|DE|GE|MMM|BA|F|GM|DIS|CMCSA)\b"
)

# Entry keys tried, in order, when looking for a parsed timestamp.
_TIMESTAMP_KEYS = ("published_parsed", "updated_parsed")


class RSSCollector(BaseCollector):
    """Collect news items from RSS/Atom feeds and score them with VADER.

    Each feed entry becomes one ``NewsItem`` whose ``sentiment`` is the VADER
    compound score of the title plus summary and whose ``symbols`` are the
    tickers from ``_TICKER_PATTERN`` found in that same text.
    """

    name: str = "rss"
    # presumably seconds between polls; the consumer lives outside this file
    poll_interval: int = 600

    def __init__(self, feeds: list[str] | None = None) -> None:
        """Create a collector.

        Args:
            feeds: Feed URLs to poll. ``None`` selects ``_DEFAULT_FEEDS``; an
                explicit empty list is honoured and yields no items.
        """
        self._feeds = feeds if feeds is not None else _DEFAULT_FEEDS
        self._vader = SentimentIntensityAnalyzer()

    async def is_available(self) -> bool:
        """Return ``True`` unconditionally; no availability probe is made."""
        return True

    async def _fetch_feeds(self) -> list[dict]:
        """Download and parse every configured feed.

        ``feedparser.parse`` is blocking, so each call runs in the default
        executor. The calls are started together and awaited as a group, so
        one slow feed does not hold up the others.

        Returns:
            The parsed feeds, in the same order as ``self._feeds``, with feeds
            whose fetch raised left out (each failure is logged).
        """
        loop = asyncio.get_running_loop()
        outcomes = await asyncio.gather(
            *(
                loop.run_in_executor(None, feedparser.parse, url)
                for url in self._feeds
            ),
            return_exceptions=True,
        )

        results = []
        for url, outcome in zip(self._feeds, outcomes):
            if isinstance(outcome, BaseException):
                logger.error("RSS fetch failed for %s: %s", url, outcome)
                continue
            results.append(outcome)
        return results

    def _parse_published(self, entry: dict) -> datetime:
        """Return the entry's timestamp as a timezone-aware UTC datetime.

        ``published_parsed`` is tried first, then ``updated_parsed``. If
        neither is present or convertible, the current UTC time is returned.

        Args:
            entry: A feed entry (any mapping with ``.get``).
        """
        for key in _TIMESTAMP_KEYS:
            parsed_time = entry.get(key)
            if not parsed_time:
                continue
            try:
                # feedparser normalises parsed dates to UTC, so the tuple must
                # be read as UTC (timegm); mktime would treat it as local time
                # and shift the result by the host's UTC offset.
                ts = calendar.timegm(parsed_time)
                return datetime.fromtimestamp(ts, tz=timezone.utc)
            except (TypeError, ValueError, OverflowError, OSError) as exc:
                logger.debug("Unusable %s value %r: %s", key, parsed_time, exc)
        return datetime.now(timezone.utc)

    async def collect(self) -> list[NewsItem]:
        """Fetch all feeds and convert their entries into ``NewsItem`` objects.

        Entries with an empty title, or a title already seen during this call
        (across all feeds), are skipped.

        Returns:
            One ``NewsItem`` per unique titled entry; an empty list if
            fetching fails as a whole.
        """
        try:
            feeds = await self._fetch_feeds()
        except Exception as exc:
            logger.error("RSS collector error: %s", exc)
            return []

        seen_titles: set[str] = set()
        items: list[NewsItem] = []

        for feed in feeds:
            for entry in feed.get("entries", []):
                # `or ""` also covers a title key that is present but None.
                title = (entry.get("title") or "").strip()
                if not title or title in seen_titles:
                    continue
                seen_titles.add(title)

                summary = entry.get("summary", "") or ""
                combined = f"{title} {summary}"

                sentiment = self._vader.polarity_scores(combined)["compound"]
                # dict.fromkeys de-duplicates while keeping first-seen order.
                symbols = list(dict.fromkeys(_TICKER_PATTERN.findall(combined)))

                items.append(
                    NewsItem(
                        source=self.name,
                        headline=title,
                        summary=summary or None,
                        url=entry.get("link") or None,
                        published_at=self._parse_published(entry),
                        symbols=symbols,
                        sentiment=sentiment,
                        category=NewsCategory.MACRO,
                        raw_data=dict(entry),
                    )
                )

        return items