1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
|
"""Federal Reserve RSS collector with hawkish/dovish/neutral stance detection."""
import asyncio
import logging
from calendar import timegm
from datetime import UTC, datetime
import feedparser
from nltk.sentiment.vader import SentimentIntensityAnalyzer
from shared.models import NewsCategory, NewsItem
from .base import BaseCollector
logger = logging.getLogger(__name__)
_FED_RSS_URL = "https://www.federalreserve.gov/feeds/press_all.xml"
_HAWKISH_KEYWORDS = [
"rate hike",
"interest rate increase",
"tighten",
"tightening",
"inflation",
"hawkish",
"restrictive",
"raise rates",
"hike rates",
]
_DOVISH_KEYWORDS = [
"rate cut",
"interest rate decrease",
"easing",
"ease",
"stimulus",
"dovish",
"accommodative",
"lower rates",
"cut rates",
"quantitative easing",
]
def _detect_stance(text: str) -> str:
lower = text.lower()
hawkish_hits = sum(1 for kw in _HAWKISH_KEYWORDS if kw in lower)
dovish_hits = sum(1 for kw in _DOVISH_KEYWORDS if kw in lower)
if hawkish_hits > dovish_hits:
return "hawkish"
if dovish_hits > hawkish_hits:
return "dovish"
return "neutral"
class FedCollector(BaseCollector):
name: str = "fed"
poll_interval: int = 3600
def __init__(self) -> None:
self._vader = SentimentIntensityAnalyzer()
async def is_available(self) -> bool:
return True
async def _fetch_fed_rss(self) -> list[dict]:
loop = asyncio.get_event_loop()
try:
parsed = await loop.run_in_executor(None, feedparser.parse, _FED_RSS_URL)
return parsed.get("entries", [])
except Exception as exc:
logger.error("Fed RSS fetch failed: %s", exc)
return []
def _parse_published(self, entry: dict) -> datetime:
published_parsed = entry.get("published_parsed")
if published_parsed:
try:
ts = timegm(published_parsed)
return datetime.fromtimestamp(ts, tz=UTC)
except Exception:
pass
return datetime.now(UTC)
async def collect(self) -> list[NewsItem]:
try:
entries = await self._fetch_fed_rss()
except Exception as exc:
logger.error("Fed collector error: %s", exc)
return []
items: list[NewsItem] = []
for entry in entries:
title = entry.get("title", "").strip()
if not title:
continue
summary = entry.get("summary", "") or ""
combined = f"{title} {summary}"
sentiment = self._vader.polarity_scores(combined)["compound"]
stance = _detect_stance(combined)
published_at = self._parse_published(entry)
items.append(
NewsItem(
source=self.name,
headline=title,
summary=summary or None,
url=entry.get("link") or None,
published_at=published_at,
symbols=[],
sentiment=sentiment,
category=NewsCategory.FED,
raw_data={"stance": stance, **dict(entry)},
)
)
return items
|