summaryrefslogtreecommitdiff
path: root/services/news-collector/src/news_collector/collectors/fed.py
diff options
context:
space:
mode:
Diffstat (limited to 'services/news-collector/src/news_collector/collectors/fed.py')
-rw-r--r--services/news-collector/src/news_collector/collectors/fed.py104
1 files changed, 104 insertions, 0 deletions
diff --git a/services/news-collector/src/news_collector/collectors/fed.py b/services/news-collector/src/news_collector/collectors/fed.py
new file mode 100644
index 0000000..47b70f5
--- /dev/null
+++ b/services/news-collector/src/news_collector/collectors/fed.py
@@ -0,0 +1,104 @@
+"""Federal Reserve RSS collector with hawkish/dovish/neutral stance detection."""
+
+import asyncio
+import logging
+from calendar import timegm
+from datetime import datetime, timezone
+
+import feedparser
+from nltk.sentiment.vader import SentimentIntensityAnalyzer
+
+from shared.models import NewsCategory, NewsItem
+
+from .base import BaseCollector
+
+logger = logging.getLogger(__name__)
+
+_FED_RSS_URL = "https://www.federalreserve.gov/feeds/press_all.xml"
+
+_HAWKISH_KEYWORDS = [
+ "rate hike", "interest rate increase", "tighten", "tightening", "inflation",
+ "hawkish", "restrictive", "raise rates", "hike rates",
+]
+_DOVISH_KEYWORDS = [
+ "rate cut", "interest rate decrease", "easing", "ease", "stimulus",
+ "dovish", "accommodative", "lower rates", "cut rates", "quantitative easing",
+]
+
+
+def _detect_stance(text: str) -> str:
+ lower = text.lower()
+ hawkish_hits = sum(1 for kw in _HAWKISH_KEYWORDS if kw in lower)
+ dovish_hits = sum(1 for kw in _DOVISH_KEYWORDS if kw in lower)
+ if hawkish_hits > dovish_hits:
+ return "hawkish"
+ if dovish_hits > hawkish_hits:
+ return "dovish"
+ return "neutral"
+
+
+class FedCollector(BaseCollector):
+ name: str = "fed"
+ poll_interval: int = 3600
+
+ def __init__(self) -> None:
+ self._vader = SentimentIntensityAnalyzer()
+
+ async def is_available(self) -> bool:
+ return True
+
+ async def _fetch_fed_rss(self) -> list[dict]:
+ loop = asyncio.get_event_loop()
+ try:
+ parsed = await loop.run_in_executor(None, feedparser.parse, _FED_RSS_URL)
+ return parsed.get("entries", [])
+ except Exception as exc:
+ logger.error("Fed RSS fetch failed: %s", exc)
+ return []
+
+ def _parse_published(self, entry: dict) -> datetime:
+ published_parsed = entry.get("published_parsed")
+ if published_parsed:
+ try:
+ ts = timegm(published_parsed)
+ return datetime.fromtimestamp(ts, tz=timezone.utc)
+ except Exception:
+ pass
+ return datetime.now(timezone.utc)
+
+ async def collect(self) -> list[NewsItem]:
+ try:
+ entries = await self._fetch_fed_rss()
+ except Exception as exc:
+ logger.error("Fed collector error: %s", exc)
+ return []
+
+ items: list[NewsItem] = []
+
+ for entry in entries:
+ title = entry.get("title", "").strip()
+ if not title:
+ continue
+
+ summary = entry.get("summary", "") or ""
+ combined = f"{title} {summary}"
+
+ sentiment = self._vader.polarity_scores(combined)["compound"]
+ stance = _detect_stance(combined)
+ published_at = self._parse_published(entry)
+
+ items.append(
+ NewsItem(
+ source=self.name,
+ headline=title,
+ summary=summary or None,
+ url=entry.get("link") or None,
+ published_at=published_at,
+ symbols=[],
+ sentiment=sentiment,
+ category=NewsCategory.FED,
+ raw_data={"stance": stance, **dict(entry)},
+ )
+ )
+
+ return items