summaryrefslogtreecommitdiff
path: root/services/news-collector/src/news_collector/collectors/rss.py
blob: ddf8503488b1eb6fb788994d3552d4b139e8d7d9 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
"""RSS news collector using feedparser with VADER sentiment analysis."""

import asyncio
import calendar
import logging
import re
from datetime import datetime, timezone
from time import mktime

import feedparser
from nltk.sentiment.vader import SentimentIntensityAnalyzer

from shared.models import NewsCategory, NewsItem

from .base import BaseCollector

logger = logging.getLogger(__name__)

# Feeds polled when the caller does not supply its own list.
_DEFAULT_FEEDS = [
    "https://finance.yahoo.com/news/rssindex",
    "https://news.google.com/rss/search?q=stock+market+finance&hl=en-US&gl=US&ceid=US:en",
    "https://feeds.marketwatch.com/marketwatch/topstories/",
]

# Whole-word match for a hand-curated set of large-cap US tickers, used to tag
# headlines with the symbols they mention. NOTE(review): symbols outside this
# list are never detected; common words that double as tickers (F, V, MA, GE,
# PM, ...) can produce false positives in all-caps text.
_TICKER_PATTERN = re.compile(
    r"\b(AAPL|MSFT|GOOGL|GOOG|AMZN|TSLA|NVDA|META|BRK\.?[AB]|JPM|V|UNH|XOM|"
    r"JNJ|WMT|MA|PG|HD|CVX|MRK|LLY|ABBV|PFE|BAC|KO|AVGO|COST|MCD|TMO|"
    r"CSCO|ACN|ABT|DHR|TXN|NEE|NFLX|PM|UPS|RTX|HON|QCOM|AMGN|LOW|IBM|"
    r"INTC|AMD|PYPL|GS|MS|BLK|SPGI|CAT|DE|GE|MMM|BA|F|GM|DIS|CMCSA)\b"
)


class RSSCollector(BaseCollector):
    """Collects financial headlines from RSS feeds and scores them with VADER.

    Each poll fetches all configured feeds, deduplicates entries by exact
    title, tags mentioned large-cap tickers via ``_TICKER_PATTERN``, and
    attaches the VADER compound sentiment score.
    """

    name: str = "rss"
    poll_interval: int = 600  # seconds between polls

    def __init__(self, feeds: list[str] | None = None) -> None:
        """Create a collector.

        Args:
            feeds: Optional feed URLs; defaults to ``_DEFAULT_FEEDS``.
        """
        # Copy so per-instance mutation can never leak into the shared
        # module-level default list (or into the caller's list).
        self._feeds: list[str] = list(feeds) if feeds is not None else list(_DEFAULT_FEEDS)
        self._vader = SentimentIntensityAnalyzer()

    async def is_available(self) -> bool:
        # Public RSS needs no credentials, so the collector is always usable.
        return True

    async def _fetch_feeds(self) -> list[dict]:
        """Fetch and parse all configured feeds, skipping failures.

        ``feedparser.parse`` is blocking, so each fetch runs in the default
        executor; gathering the futures overlaps the network waits instead of
        fetching feeds one after another. Failed feeds are logged and dropped
        rather than aborting the whole batch. Result order follows feed order.
        """
        # get_event_loop() is deprecated inside coroutines (Python 3.10+).
        loop = asyncio.get_running_loop()
        outcomes = await asyncio.gather(
            *(loop.run_in_executor(None, feedparser.parse, url) for url in self._feeds),
            return_exceptions=True,
        )
        results: list[dict] = []
        for url, outcome in zip(self._feeds, outcomes):
            if isinstance(outcome, BaseException):
                logger.error("RSS fetch failed for %s: %s", url, outcome)
            else:
                results.append(outcome)
        return results

    def _parse_published(self, entry: dict) -> datetime:
        """Return the entry's publication time as an aware UTC datetime.

        feedparser normalizes ``*_parsed`` struct_time fields to UTC, so the
        conversion must use ``calendar.timegm`` — ``time.mktime`` would treat
        the tuple as *local* time and skew every timestamp by the host's UTC
        offset. Falls back to ``updated_parsed`` and finally to "now" when the
        field is missing or invalid.
        """
        parsed_time = entry.get("published_parsed") or entry.get("updated_parsed")
        if parsed_time:
            try:
                ts = calendar.timegm(parsed_time)
                return datetime.fromtimestamp(ts, tz=timezone.utc)
            except (TypeError, ValueError, OverflowError, OSError):
                pass
        return datetime.now(timezone.utc)

    async def collect(self) -> list[NewsItem]:
        """Poll all feeds and return deduplicated, sentiment-scored items.

        Returns an empty list (never raises) when fetching fails outright;
        per-feed failures are already absorbed inside ``_fetch_feeds``.
        """
        try:
            feeds = await self._fetch_feeds()
        except Exception as exc:
            logger.error("RSS collector error: %s", exc)
            return []

        seen_titles: set[str] = set()
        items: list[NewsItem] = []

        for feed in feeds:
            for entry in feed.get("entries", []):
                title = entry.get("title", "").strip()
                # Dedupe across feeds by exact title; empty titles are useless.
                if not title or title in seen_titles:
                    continue
                seen_titles.add(title)

                summary = entry.get("summary", "") or ""
                combined = f"{title} {summary}"

                # VADER compound score in [-1, 1].
                sentiment = self._vader.polarity_scores(combined)["compound"]

                # dict.fromkeys keeps first-seen order while removing duplicates.
                symbols = list(dict.fromkeys(_TICKER_PATTERN.findall(combined)))

                items.append(
                    NewsItem(
                        source=self.name,
                        headline=title,
                        summary=summary or None,
                        url=entry.get("link") or None,
                        published_at=self._parse_published(entry),
                        symbols=symbols,
                        sentiment=sentiment,
                        category=NewsCategory.MACRO,
                        raw_data=dict(entry),
                    )
                )

        return items