summaryrefslogtreecommitdiff
path: root/services/news-collector/src/news_collector/collectors/fed.py
blob: 47b70f5cbb0bae0e2dc091015a1b643023e30480 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
"""Federal Reserve RSS collector with hawkish/dovish/neutral stance detection."""

import asyncio
import logging
import re
from calendar import timegm
from datetime import datetime, timezone

import feedparser
from nltk.sentiment.vader import SentimentIntensityAnalyzer

from shared.models import NewsCategory, NewsItem

from .base import BaseCollector

logger = logging.getLogger(__name__)

_FED_RSS_URL = "https://www.federalreserve.gov/feeds/press_all.xml"

_HAWKISH_KEYWORDS = [
    "rate hike", "interest rate increase", "tighten", "tightening", "inflation",
    "hawkish", "restrictive", "raise rates", "hike rates",
]
_DOVISH_KEYWORDS = [
    "rate cut", "interest rate decrease", "easing", "ease", "stimulus",
    "dovish", "accommodative", "lower rates", "cut rates", "quantitative easing",
]


def _detect_stance(text: str) -> str:
    lower = text.lower()
    hawkish_hits = sum(1 for kw in _HAWKISH_KEYWORDS if kw in lower)
    dovish_hits = sum(1 for kw in _DOVISH_KEYWORDS if kw in lower)
    if hawkish_hits > dovish_hits:
        return "hawkish"
    if dovish_hits > hawkish_hits:
        return "dovish"
    return "neutral"


class FedCollector(BaseCollector):
    """Collects Federal Reserve press releases from the public RSS feed.

    Each entry is turned into a :class:`NewsItem` annotated with a VADER
    compound sentiment score and a hawkish/dovish/neutral stance label
    (stored in ``raw_data["stance"]``).
    """

    name: str = "fed"
    poll_interval: int = 3600  # seconds between polls

    def __init__(self) -> None:
        # NOTE(review): BaseCollector.__init__ is not visible from this file;
        # if it performs setup, super().__init__() may be required — confirm.
        self._vader = SentimentIntensityAnalyzer()

    async def is_available(self) -> bool:
        """The public Fed RSS feed needs no credentials, so always available."""
        return True

    async def _fetch_fed_rss(self) -> list[dict]:
        """Fetch and parse the Fed press feed, returning its entries.

        ``feedparser.parse`` performs blocking network I/O, so it is run in
        the default thread-pool executor. Returns ``[]`` on any failure
        (best-effort; errors are logged, never raised).
        """
        # get_event_loop() is deprecated inside coroutines since 3.10;
        # get_running_loop() is the correct call here.
        loop = asyncio.get_running_loop()
        try:
            parsed = await loop.run_in_executor(None, feedparser.parse, _FED_RSS_URL)
            return parsed.get("entries", [])
        except Exception as exc:
            logger.error("Fed RSS fetch failed: %s", exc)
            return []

    def _parse_published(self, entry: dict) -> datetime:
        """Return the entry's publication time as an aware UTC datetime.

        Falls back to the current UTC time when the entry carries no
        parseable timestamp.
        """
        published_parsed = entry.get("published_parsed")
        if published_parsed:
            try:
                # feedparser normalizes published_parsed to a UTC struct_time,
                # so timegm (not mktime, which assumes local time) is correct.
                ts = timegm(published_parsed)
                return datetime.fromtimestamp(ts, tz=timezone.utc)
            except (TypeError, ValueError, OverflowError, OSError):
                # Malformed struct_time or out-of-range timestamp; fall back.
                pass
        return datetime.now(timezone.utc)

    async def collect(self) -> list[NewsItem]:
        """Collect current Fed press releases as scored ``NewsItem`` objects.

        Entries without a title are skipped. Never raises on fetch problems:
        ``_fetch_fed_rss`` already degrades to an empty list.
        """
        entries = await self._fetch_fed_rss()

        items: list[NewsItem] = []
        for entry in entries:
            # Guard against a missing OR explicitly-None title field.
            title = (entry.get("title") or "").strip()
            if not title:
                continue

            summary = entry.get("summary") or ""
            combined = f"{title} {summary}"

            # VADER "compound" is a normalized score in [-1, 1].
            sentiment = self._vader.polarity_scores(combined)["compound"]
            stance = _detect_stance(combined)
            published_at = self._parse_published(entry)

            items.append(
                NewsItem(
                    source=self.name,
                    headline=title,
                    summary=summary or None,
                    url=entry.get("link") or None,
                    published_at=published_at,
                    symbols=[],
                    sentiment=sentiment,
                    category=NewsCategory.FED,
                    # "stance" goes last so a feed field of the same name
                    # cannot overwrite the computed value.
                    raw_data={**dict(entry), "stance": stance},
                )
            )

        return items