"""RSS news collector using feedparser with VADER sentiment analysis."""
import asyncio
import logging
import re
from calendar import timegm
from datetime import datetime, timezone
from time import mktime

import feedparser
from nltk.sentiment.vader import SentimentIntensityAnalyzer

from shared.models import NewsCategory, NewsItem

from .base import BaseCollector
logger = logging.getLogger(__name__)
# Default RSS sources polled when no feed list is passed to RSSCollector:
# Yahoo Finance news, a Google News query for stock-market/finance stories,
# and MarketWatch top stories.
_DEFAULT_FEEDS = [
    "https://finance.yahoo.com/news/rssindex",
    "https://news.google.com/rss/search?q=stock+market+finance&hl=en-US&gl=US&ceid=US:en",
    "https://feeds.marketwatch.com/marketwatch/topstories/",
]
# Whole-word matcher for a hard-coded set of large-cap US ticker symbols;
# BRK\.?[AB] also accepts the dotless forms BRKA/BRKB.
# NOTE(review): short symbols ("F", "V", "MA", "GM", "PM", ...) collide with
# ordinary uppercase words, so matches can include false positives — confirm
# whether downstream consumers tolerate that.
_TICKER_PATTERN = re.compile(
    r"\b(AAPL|MSFT|GOOGL|GOOG|AMZN|TSLA|NVDA|META|BRK\.?[AB]|JPM|V|UNH|XOM|"
    r"JNJ|WMT|MA|PG|HD|CVX|MRK|LLY|ABBV|PFE|BAC|KO|AVGO|COST|MCD|TMO|"
    r"CSCO|ACN|ABT|DHR|TXN|NEE|NFLX|PM|UPS|RTX|HON|QCOM|AMGN|LOW|IBM|"
    r"INTC|AMD|PYPL|GS|MS|BLK|SPGI|CAT|DE|GE|MMM|BA|F|GM|DIS|CMCSA)\b"
)
class RSSCollector(BaseCollector):
    """Collect financial news from RSS feeds and score headline sentiment.

    Feeds are parsed with feedparser (run in an executor, since it blocks on
    network + XML parsing), deduplicated by headline across feeds, tagged
    with any known large-cap tickers found in the text, and scored with
    VADER's compound sentiment.
    """

    name: str = "rss"
    # Seconds between collection cycles — presumably consumed by the
    # scheduler that drives BaseCollector subclasses; confirm against caller.
    poll_interval: int = 600

    def __init__(self, feeds: list[str] | None = None) -> None:
        """Initialize with an optional feed-URL list (defaults to _DEFAULT_FEEDS).

        Both branches copy the input list so later mutation of self._feeds
        cannot corrupt the caller's list or the shared module-level default.
        """
        self._feeds = list(feeds) if feeds is not None else list(_DEFAULT_FEEDS)
        self._vader = SentimentIntensityAnalyzer()

    async def is_available(self) -> bool:
        """Always available: public RSS needs no credentials or API keys."""
        return True

    async def _fetch_feeds(self) -> list[dict]:
        """Fetch and parse all configured feeds, overlapping the blocking work.

        All executor jobs are dispatched before any is awaited, so feeds
        download concurrently. Failures are logged and skipped; the result
        preserves configuration order for the feeds that succeeded.
        """
        # get_event_loop() is deprecated inside coroutines (Python 3.10+);
        # get_running_loop() is the supported way to reach the current loop.
        loop = asyncio.get_running_loop()
        futures = [
            loop.run_in_executor(None, feedparser.parse, url) for url in self._feeds
        ]
        results: list[dict] = []
        for url, future in zip(self._feeds, futures):
            try:
                results.append(await future)
            except Exception as exc:
                logger.error("RSS fetch failed for %s: %s", url, exc)
        return results

    def _parse_published(self, entry: dict) -> datetime:
        """Return the entry's publication time as an aware UTC datetime.

        feedparser normalizes ``published_parsed`` to a UTC struct_time, so
        it must be converted with calendar.timegm (UTC), not time.mktime —
        mktime reads the struct as *local* time and would skew every
        timestamp by the host's UTC offset. Falls back to "now" when the
        field is absent or malformed.
        """
        parsed_time = entry.get("published_parsed")
        if parsed_time:
            try:
                return datetime.fromtimestamp(timegm(parsed_time), tz=timezone.utc)
            except Exception:
                pass
        return datetime.now(timezone.utc)

    async def collect(self) -> list[NewsItem]:
        """Fetch all feeds and return deduplicated, sentiment-scored NewsItems.

        Returns an empty list (never raises) if fetching fails wholesale.
        """
        try:
            feeds = await self._fetch_feeds()
        except Exception as exc:
            logger.error("RSS collector error: %s", exc)
            return []
        seen_titles: set[str] = set()
        items: list[NewsItem] = []
        for feed in feeds:
            for entry in feed.get("entries", []):
                title = entry.get("title", "").strip()
                # Dedupe across feeds: aggregators republish identical headlines.
                if not title or title in seen_titles:
                    continue
                seen_titles.add(title)
                summary = entry.get("summary", "") or ""
                combined = f"{title} {summary}"
                # VADER compound score lies in [-1, 1]; negative = bearish tone.
                sentiment = self._vader.polarity_scores(combined)["compound"]
                # dict.fromkeys removes duplicates while keeping first-seen order.
                symbols = list(dict.fromkeys(_TICKER_PATTERN.findall(combined)))
                items.append(
                    NewsItem(
                        source=self.name,
                        headline=title,
                        summary=summary or None,
                        url=entry.get("link") or None,
                        published_at=self._parse_published(entry),
                        symbols=symbols,
                        sentiment=sentiment,
                        category=NewsCategory.MACRO,
                        raw_data=dict(entry),
                    )
                )
        return items