diff options
Diffstat (limited to 'services/news-collector/src/news_collector/collectors/fed.py')
| -rw-r--r-- | services/news-collector/src/news_collector/collectors/fed.py | 104 |
1 files changed, 104 insertions, 0 deletions
diff --git a/services/news-collector/src/news_collector/collectors/fed.py b/services/news-collector/src/news_collector/collectors/fed.py new file mode 100644 index 0000000..47b70f5 --- /dev/null +++ b/services/news-collector/src/news_collector/collectors/fed.py @@ -0,0 +1,104 @@ +"""Federal Reserve RSS collector with hawkish/dovish/neutral stance detection.""" + +import asyncio +import logging +from calendar import timegm +from datetime import datetime, timezone + +import feedparser +from nltk.sentiment.vader import SentimentIntensityAnalyzer + +from shared.models import NewsCategory, NewsItem + +from .base import BaseCollector + +logger = logging.getLogger(__name__) + +_FED_RSS_URL = "https://www.federalreserve.gov/feeds/press_all.xml" + +_HAWKISH_KEYWORDS = [ + "rate hike", "interest rate increase", "tighten", "tightening", "inflation", + "hawkish", "restrictive", "raise rates", "hike rates", +] +_DOVISH_KEYWORDS = [ + "rate cut", "interest rate decrease", "easing", "ease", "stimulus", + "dovish", "accommodative", "lower rates", "cut rates", "quantitative easing", +] + + +def _detect_stance(text: str) -> str: + lower = text.lower() + hawkish_hits = sum(1 for kw in _HAWKISH_KEYWORDS if kw in lower) + dovish_hits = sum(1 for kw in _DOVISH_KEYWORDS if kw in lower) + if hawkish_hits > dovish_hits: + return "hawkish" + if dovish_hits > hawkish_hits: + return "dovish" + return "neutral" + + +class FedCollector(BaseCollector): + name: str = "fed" + poll_interval: int = 3600 + + def __init__(self) -> None: + self._vader = SentimentIntensityAnalyzer() + + async def is_available(self) -> bool: + return True + + async def _fetch_fed_rss(self) -> list[dict]: + loop = asyncio.get_event_loop() + try: + parsed = await loop.run_in_executor(None, feedparser.parse, _FED_RSS_URL) + return parsed.get("entries", []) + except Exception as exc: + logger.error("Fed RSS fetch failed: %s", exc) + return [] + + def _parse_published(self, entry: dict) -> datetime: + published_parsed = entry.get("published_parsed") + if published_parsed: + try: + ts = timegm(published_parsed) + return datetime.fromtimestamp(ts, tz=timezone.utc) + except Exception: + pass + return datetime.now(timezone.utc) + + async def collect(self) -> list[NewsItem]: + try: + entries = await self._fetch_fed_rss() + except Exception as exc: + logger.error("Fed collector error: %s", exc) + return [] + + items: list[NewsItem] = [] + + for entry in entries: + title = entry.get("title", "").strip() + if not title: + continue + + summary = entry.get("summary", "") or "" + combined = f"{title} {summary}" + + sentiment = self._vader.polarity_scores(combined)["compound"] + stance = _detect_stance(combined) + published_at = self._parse_published(entry) + + items.append( + NewsItem( + source=self.name, + headline=title, + summary=summary or None, + url=entry.get("link") or None, + published_at=published_at, + symbols=[], + sentiment=sentiment, + category=NewsCategory.FED, + raw_data={"stance": stance, **dict(entry)}, + ) + ) + + return items |
