"""Reddit social sentiment collector using JSON API with VADER sentiment analysis."""

import logging
import re
from contextlib import AsyncExitStack
from datetime import datetime, timezone
from typing import Optional

import aiohttp
from nltk.sentiment.vader import SentimentIntensityAnalyzer

from shared.models import NewsCategory, NewsItem

from .base import BaseCollector

logger = logging.getLogger(__name__)

# Subreddits polled on every collection pass, in this order.
_SUBREDDITS = ["wallstreetbets", "stocks", "investing"]
# Posts whose Reddit score is below this threshold are ignored.
_MIN_SCORE = 50

# Whole-word, case-sensitive match against a fixed list of large-cap tickers.
# NOTE(review): because the match is case-sensitive on bare words, short or
# word-like tickers (V, F, MA, GE, DE, MS, BA, PM, LOW, COST, CAT, DIS, ...)
# also match ordinary upper-case text, so expect false positives -- consider
# requiring a "$" cashtag for those. Berkshire is returned verbatim, so the
# same company can appear as either "BRKB" or "BRK.B".
_TICKER_PATTERN = re.compile(
    r"\b(AAPL|MSFT|GOOGL|GOOG|AMZN|TSLA|NVDA|META|BRK\.?[AB]|JPM|V|UNH|XOM|"
    r"JNJ|WMT|MA|PG|HD|CVX|MRK|LLY|ABBV|PFE|BAC|KO|AVGO|COST|MCD|TMO|"
    r"CSCO|ACN|ABT|DHR|TXN|NEE|NFLX|PM|UPS|RTX|HON|QCOM|AMGN|LOW|IBM|"
    r"INTC|AMD|PYPL|GS|MS|BLK|SPGI|CAT|DE|GE|MMM|BA|F|GM|DIS|CMCSA)\b"
)

# Errors that indicate a single malformed post (null fields, bad timestamp,
# model validation failure); such a post is skipped, not fatal to the batch.
_MALFORMED_POST_ERRORS = (AttributeError, TypeError, ValueError, OverflowError, OSError)


class RedditCollector(BaseCollector):
    """Turn high-scoring "hot" posts from finance subreddits into NewsItems.

    Each kept post's title plus self-text is scored with VADER (the
    ``compound`` polarity score) and scanned for tickers matched by
    ``_TICKER_PATTERN``.
    """

    name: str = "reddit"
    # Presumably seconds (900 = 15 minutes) -- confirm against BaseCollector.
    poll_interval: int = 900

    def __init__(self) -> None:
        self._vader = SentimentIntensityAnalyzer()

    async def is_available(self) -> bool:
        """Always report available; the public JSON endpoint is used unauthenticated."""
        return True

    async def _fetch_subreddit(
        self,
        subreddit: str,
        session: Optional[aiohttp.ClientSession] = None,
    ) -> list[dict]:
        """Return the raw ``children`` list of ``r/<subreddit>/hot.json``.

        Args:
            subreddit: Subreddit name without the ``r/`` prefix.
            session: Session to reuse. When omitted, a short-lived session is
                opened for this one request and closed afterwards.

        Returns:
            The listing's ``data.children`` entries (each expected to carry a
            ``"data"`` dict), or ``[]`` on a non-200 status or any request or
            decoding error. Failures are logged, never raised.
        """
        url = f"https://www.reddit.com/r/{subreddit}/hot.json?limit=25"
        headers = {"User-Agent": "TradingPlatform/1.0 (research@example.com)"}
        try:
            async with AsyncExitStack() as stack:
                if session is None:
                    # Stand-alone call: own a session and close it on exit.
                    session = await stack.enter_async_context(aiohttp.ClientSession())
                async with session.get(
                    url, headers=headers, timeout=aiohttp.ClientTimeout(total=10)
                ) as resp:
                    if resp.status != 200:
                        # Previously dropped silently; surface rate limiting
                        # (e.g. 429) and blocked requests.
                        logger.warning(
                            "Reddit returned HTTP %s for r/%s; skipping",
                            resp.status,
                            subreddit,
                        )
                        return []
                    data = await resp.json()
            return data.get("data", {}).get("children", [])
        except Exception as exc:
            logger.error("Reddit fetch failed for r/%s: %s", subreddit, exc)
            return []

    def _build_item(self, post_data: dict, title: str) -> NewsItem:
        """Score and tag one already-filtered post and wrap it in a NewsItem.

        Args:
            post_data: The post's ``data`` dict from the listing.
            title: The post title, already stripped and non-empty.

        Raises:
            Any of ``_MALFORMED_POST_ERRORS`` when a field has an unusable
            value (e.g. an out-of-range ``created_utc``).
        """
        selftext = post_data.get("selftext") or ""
        combined = f"{title} {selftext}"
        # dict.fromkeys de-duplicates while keeping first-seen order.
        symbols = list(dict.fromkeys(_TICKER_PATTERN.findall(combined)))
        # A missing/null timestamp falls back to the Unix epoch, as before.
        published_at = datetime.fromtimestamp(
            post_data.get("created_utc") or 0, tz=timezone.utc
        )
        return NewsItem(
            source=self.name,
            headline=title,
            summary=selftext or None,
            url=post_data.get("url") or None,
            published_at=published_at,
            symbols=symbols,
            sentiment=self._vader.polarity_scores(combined)["compound"],
            category=NewsCategory.SOCIAL,
            raw_data=post_data,
        )

    async def collect(self) -> list[NewsItem]:
        """Gather hot posts from every subreddit in ``_SUBREDDITS``.

        Kept posts meet three conditions:

        - The title is non-empty.
        - The score is at least ``_MIN_SCORE``.
        - The title was not already taken earlier in this pass (e.g. the same
          title posted to several subreddits).

        A subreddit that cannot be fetched, or an individual post that cannot
        be parsed, is logged and skipped so the remaining data still returns.

        Returns:
            NewsItems in subreddit order, then listing order within each.
        """
        seen_titles: set[str] = set()
        items: list[NewsItem] = []
        # One session for the whole pass instead of one per subreddit, so the
        # connection pool is reused across requests.
        async with aiohttp.ClientSession() as session:
            for subreddit in _SUBREDDITS:
                try:
                    posts = await self._fetch_subreddit(subreddit, session)
                except Exception as exc:
                    logger.error("Reddit collector error for r/%s: %s", subreddit, exc)
                    continue
                for post in posts:
                    try:
                        # `or` fallbacks tolerate keys present with a null value.
                        post_data = post.get("data") or {}
                        title = (post_data.get("title") or "").strip()
                        score = post_data.get("score") or 0
                        if not title or score < _MIN_SCORE or title in seen_titles:
                            continue
                        item = self._build_item(post_data, title)
                    except _MALFORMED_POST_ERRORS as exc:
                        # One bad record must not discard the whole batch.
                        logger.warning(
                            "Skipping malformed Reddit post in r/%s: %s", subreddit, exc
                        )
                        continue
                    # Mark the title seen only once its item was built.
                    seen_titles.add(title)
                    items.append(item)
        return items