"""Federal Reserve RSS collector with hawkish/dovish/neutral stance detection."""

import asyncio
import logging
import re
from calendar import timegm
from datetime import UTC, datetime

import feedparser
from nltk.sentiment.vader import SentimentIntensityAnalyzer

from shared.models import NewsCategory, NewsItem

from .base import BaseCollector

logger = logging.getLogger(__name__)

_FED_RSS_URL = "https://www.federalreserve.gov/feeds/press_all.xml"

_HAWKISH_KEYWORDS = [
    "rate hike",
    "interest rate increase",
    "tighten",
    "tightening",
    "inflation",
    "hawkish",
    "restrictive",
    "raise rates",
    "hike rates",
]

_DOVISH_KEYWORDS = [
    "rate cut",
    "interest rate decrease",
    "easing",
    "ease",
    "stimulus",
    "dovish",
    "accommodative",
    "lower rates",
    "cut rates",
    "quantitative easing",
]


def _compile_keywords(keywords: list[str]) -> re.Pattern[str]:
    """Build one regex that matches any of *keywords* at the start of a word.

    Alternatives are ordered longest-first so that, at a given position, a
    longer keyword ("tightening") wins over a keyword that is its prefix
    ("tighten"). Only a leading word boundary is required, so inflected forms
    such as "eased" or "rate hikes" still match, while a keyword buried inside
    another word ("ease" in "release" / "increase") does not.
    """
    ordered = sorted(keywords, key=len, reverse=True)
    alternation = "|".join(re.escape(kw) for kw in ordered)
    return re.compile(rf"\b(?:{alternation})")


_HAWKISH_PATTERN = _compile_keywords(_HAWKISH_KEYWORDS)
_DOVISH_PATTERN = _compile_keywords(_DOVISH_KEYWORDS)


def _count_keyword_hits(pattern: re.Pattern[str], text: str) -> int:
    """Return how many distinct keywords from *pattern* occur in *text*."""
    return len({match.group(0) for match in pattern.finditer(text)})


def _detect_stance(text: str) -> str:
    """Classify *text* as ``"hawkish"``, ``"dovish"`` or ``"neutral"``.

    The text is lower-cased and its whitespace collapsed (so multi-word
    keywords match across line breaks), then the number of distinct hawkish
    and dovish keywords present is compared. A tie, including no hits at all,
    yields ``"neutral"``.
    """
    normalized = " ".join(text.lower().split())
    hawkish_hits = _count_keyword_hits(_HAWKISH_PATTERN, normalized)
    dovish_hits = _count_keyword_hits(_DOVISH_PATTERN, normalized)
    if hawkish_hits > dovish_hits:
        return "hawkish"
    if dovish_hits > hawkish_hits:
        return "dovish"
    return "neutral"


class FedCollector(BaseCollector):
    """Collector that turns Federal Reserve press-release RSS entries into NewsItems."""

    name: str = "fed"
    # Presumably seconds between polls; the unit is defined by the scheduler
    # that consumes it -- TODO confirm against BaseCollector.
    poll_interval: int = 3600

    def __init__(self) -> None:
        """Create the VADER analyzer used to score each entry's sentiment."""
        # NOTE(review): BaseCollector.__init__ is not invoked here, matching
        # the existing behaviour -- confirm the base class needs no setup.
        self._vader = SentimentIntensityAnalyzer()

    async def is_available(self) -> bool:
        """Report this collector as always available."""
        return True

    async def _fetch_fed_rss(self) -> list[dict]:
        """Download and parse the Fed RSS feed, returning its entries.

        ``feedparser.parse`` is blocking, so it runs in a worker thread.
        Any exception is logged and results in an empty list.
        """
        try:
            parsed = await asyncio.to_thread(feedparser.parse, _FED_RSS_URL)
        except Exception as exc:  # best-effort boundary: never break the poll loop
            logger.error("Fed RSS fetch failed: %s", exc)
            return []

        entries = parsed.get("entries", [])
        # feedparser flags fetch/parse problems via ``bozo`` instead of
        # raising; surface that when it left us with nothing to process.
        if parsed.get("bozo") and not entries:
            logger.warning(
                "Fed RSS feed returned no entries: %s",
                parsed.get("bozo_exception"),
            )
        return entries

    def _parse_published(self, entry: dict) -> datetime:
        """Return the entry's publication time as a timezone-aware UTC datetime.

        Reads ``entry["published_parsed"]`` and interprets it as a UTC time
        tuple. Falls back to the current UTC time when the field is missing,
        empty, or cannot be converted.
        """
        published_parsed = entry.get("published_parsed")
        if published_parsed:
            try:
                return datetime.fromtimestamp(timegm(published_parsed), tz=UTC)
            except (TypeError, ValueError, OverflowError, OSError) as exc:
                logger.debug(
                    "Unparseable published_parsed %r: %s", published_parsed, exc
                )
        return datetime.now(UTC)

    async def collect(self) -> list[NewsItem]:
        """Fetch the feed and convert every titled entry into a ``NewsItem``.

        Entries with a missing or blank title are skipped. Sentiment is the
        VADER compound score of "title summary"; the detected stance is stored
        under ``raw_data["stance"]`` alongside a copy of the raw entry.
        Returns an empty list when the fetch fails.
        """
        try:
            entries = await self._fetch_fed_rss()
        except Exception as exc:
            logger.error("Fed collector error: %s", exc)
            return []

        items: list[NewsItem] = []
        for entry in entries:
            # ``or ""`` guards against a title key that is present but None.
            title = (entry.get("title") or "").strip()
            if not title:
                continue

            summary = entry.get("summary", "") or ""
            combined = f"{title} {summary}"
            sentiment = self._vader.polarity_scores(combined)["compound"]
            stance = _detect_stance(combined)
            published_at = self._parse_published(entry)

            items.append(
                NewsItem(
                    source=self.name,
                    headline=title,
                    summary=summary or None,
                    url=entry.get("link") or None,
                    published_at=published_at,
                    symbols=[],
                    sentiment=sentiment,
                    category=NewsCategory.FED,
                    # Computed stance goes last so a feed-supplied "stance"
                    # field can never overwrite it.
                    raw_data={**dict(entry), "stance": stance},
                )
            )
        return items