1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
|
"""Reddit social sentiment collector using JSON API with VADER sentiment analysis."""
import logging
import re
from datetime import datetime, timezone
import aiohttp
from nltk.sentiment.vader import SentimentIntensityAnalyzer
from shared.models import NewsCategory, NewsItem
from .base import BaseCollector
logger = logging.getLogger(__name__)
# Subreddits whose "hot" listings are polled on each collection cycle.
_SUBREDDITS = ["wallstreetbets", "stocks", "investing"]
# Minimum Reddit score (net upvotes) a post needs to be collected; filters
# out low-signal chatter.
_MIN_SCORE = 50
# Matches well-known large-cap US ticker symbols as standalone words.
# The whole alternation is one capturing group, so findall() returns the
# matched symbol text itself (e.g. "AAPL", "BRK.A", "BRKB").
_TICKER_PATTERN = re.compile(
    r"\b(AAPL|MSFT|GOOGL|GOOG|AMZN|TSLA|NVDA|META|BRK\.?[AB]|JPM|V|UNH|XOM|"
    r"JNJ|WMT|MA|PG|HD|CVX|MRK|LLY|ABBV|PFE|BAC|KO|AVGO|COST|MCD|TMO|"
    r"CSCO|ACN|ABT|DHR|TXN|NEE|NFLX|PM|UPS|RTX|HON|QCOM|AMGN|LOW|IBM|"
    r"INTC|AMD|PYPL|GS|MS|BLK|SPGI|CAT|DE|GE|MMM|BA|F|GM|DIS|CMCSA)\b"
)
class RedditCollector(BaseCollector):
    """Social-sentiment collector for finance subreddits.

    Pulls the "hot" listing of each subreddit in ``_SUBREDDITS`` via Reddit's
    public (unauthenticated) JSON API, filters out low-score posts,
    deduplicates by title across subreddits, extracts ticker symbols, and
    attaches a VADER compound sentiment score to each resulting ``NewsItem``.
    """

    name: str = "reddit"
    # Poll every 15 minutes: the hot listing changes slowly and the
    # unauthenticated API is rate limited.
    poll_interval: int = 900

    def __init__(self) -> None:
        # Run BaseCollector initialization, if any, before our own setup.
        super().__init__()
        self._vader = SentimentIntensityAnalyzer()

    async def is_available(self) -> bool:
        """Always available: the public JSON endpoints need no credentials."""
        return True

    async def _fetch_subreddit(
        self,
        subreddit: str,
        session: aiohttp.ClientSession | None = None,
    ) -> list[dict]:
        """Fetch the raw "children" post list of r/<subreddit>'s hot page.

        Args:
            subreddit: Subreddit name without the ``r/`` prefix.
            session: Optional shared HTTP session. When omitted, a temporary
                session is created (kept for backward compatibility with the
                previous signature).

        Returns:
            The raw post wrapper dicts, or ``[]`` on any failure. Errors are
            logged, never raised, so one bad subreddit cannot abort a cycle.
        """
        url = f"https://www.reddit.com/r/{subreddit}/hot.json?limit=25"
        headers = {"User-Agent": "TradingPlatform/1.0 (research@example.com)"}
        try:
            if session is None:
                async with aiohttp.ClientSession() as own_session:
                    return await self._get_children(own_session, url, headers, subreddit)
            return await self._get_children(session, url, headers, subreddit)
        except Exception as exc:
            logger.error("Reddit fetch failed for r/%s: %s", subreddit, exc)
        return []

    @staticmethod
    async def _get_children(
        session: aiohttp.ClientSession,
        url: str,
        headers: dict,
        subreddit: str,
    ) -> list[dict]:
        """Perform one GET and unwrap the Reddit listing envelope."""
        async with session.get(
            url, headers=headers, timeout=aiohttp.ClientTimeout(total=10)
        ) as resp:
            if resp.status != 200:
                # Previously a non-200 was dropped silently; log it so rate
                # limiting (429) and blocks (403) are visible in operations.
                logger.warning("Reddit returned HTTP %s for r/%s", resp.status, subreddit)
                return []
            data = await resp.json()
            return data.get("data", {}).get("children", [])

    async def collect(self) -> list[NewsItem]:
        """Collect, filter, and sentiment-score hot posts from all subreddits.

        Returns:
            ``NewsItem`` objects for posts with score >= ``_MIN_SCORE``,
            deduplicated by exact title within this call.
        """
        seen_titles: set[str] = set()
        items: list[NewsItem] = []
        # One shared session for the whole cycle: aiohttp recommends reusing
        # a ClientSession rather than opening one per request.
        async with aiohttp.ClientSession() as session:
            for subreddit in _SUBREDDITS:
                # NOTE: _fetch_subreddit never raises (it catches and logs
                # internally), so the former per-subreddit try/except here
                # was dead code and has been removed.
                posts = await self._fetch_subreddit(subreddit, session)
                for post in posts:
                    post_data = post.get("data", {})
                    title = post_data.get("title", "").strip()
                    score = post_data.get("score", 0)
                    if not title or score < _MIN_SCORE:
                        continue
                    if title in seen_titles:
                        continue
                    seen_titles.add(title)
                    selftext = post_data.get("selftext", "") or ""
                    combined = f"{title} {selftext}"
                    # VADER compound score in [-1, 1].
                    sentiment = self._vader.polarity_scores(combined)["compound"]
                    # dict.fromkeys dedupes while preserving first-seen order.
                    symbols = list(dict.fromkeys(_TICKER_PATTERN.findall(combined)))
                    # A missing created_utc yields the Unix epoch; kept as-is
                    # to preserve existing downstream behavior.
                    created_utc = post_data.get("created_utc", 0)
                    published_at = datetime.fromtimestamp(created_utc, tz=timezone.utc)
                    items.append(
                        NewsItem(
                            source=self.name,
                            headline=title,
                            # "url" is the post's link target (may be external,
                            # not the comments permalink) — matches prior behavior.
                            summary=selftext or None,
                            url=post_data.get("url") or None,
                            published_at=published_at,
                            symbols=symbols,
                            sentiment=sentiment,
                            category=NewsCategory.SOCIAL,
                            raw_data=post_data,
                        )
                    )
        return items
|