summaryrefslogtreecommitdiff
path: root/services/news-collector/src/news_collector/collectors/reddit.py
blob: 226a2f9301c9ee3aee249534ff14752d0b641beb (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
"""Reddit social sentiment collector using JSON API with VADER sentiment analysis."""

import logging
import re
from datetime import datetime, timezone

import aiohttp
from nltk.sentiment.vader import SentimentIntensityAnalyzer

from shared.models import NewsCategory, NewsItem

from .base import BaseCollector

# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)

# Subreddits whose "hot" listing RedditCollector.collect() fetches, in order.
_SUBREDDITS = ["wallstreetbets", "stocks", "investing"]
# Posts with a Reddit score below this threshold are skipped by collect().
_MIN_SCORE = 50

# Fixed whitelist of ticker symbols searched for in a post's title + selftext.
# Matching is case-sensitive (no re.IGNORECASE) and anchored on word
# boundaries; "BRK.A"/"BRK.B" are accepted with or without the dot.
# NOTE(review): single-letter and short symbols (V, F, MA, MS, GE, DE, ...)
# will also match ordinary capitalised words/initials in free text, so the
# extracted symbol list may contain false positives — confirm this is acceptable.
_TICKER_PATTERN = re.compile(
    r"\b(AAPL|MSFT|GOOGL|GOOG|AMZN|TSLA|NVDA|META|BRK\.?[AB]|JPM|V|UNH|XOM|"
    r"JNJ|WMT|MA|PG|HD|CVX|MRK|LLY|ABBV|PFE|BAC|KO|AVGO|COST|MCD|TMO|"
    r"CSCO|ACN|ABT|DHR|TXN|NEE|NFLX|PM|UPS|RTX|HON|QCOM|AMGN|LOW|IBM|"
    r"INTC|AMD|PYPL|GS|MS|BLK|SPGI|CAT|DE|GE|MMM|BA|F|GM|DIS|CMCSA)\b"
)


class RedditCollector(BaseCollector):
    """Collect popular Reddit posts and score them with VADER sentiment.

    For every subreddit in ``_SUBREDDITS`` the collector downloads the public
    ``hot.json`` listing, keeps posts whose score is at least ``_MIN_SCORE``,
    de-duplicates them by title across subreddits, and converts each one into
    a ``NewsItem`` tagged ``NewsCategory.SOCIAL``.
    """

    name: str = "reddit"
    # NOTE(review): presumably seconds between polls — confirm against BaseCollector.
    poll_interval: int = 900

    def __init__(self) -> None:
        """Create the VADER analyzer once so every poll reuses it."""
        self._vader = SentimentIntensityAnalyzer()

    async def is_available(self) -> bool:
        """Return ``True`` unconditionally; no credentials or probe are checked."""
        return True

    async def _fetch_subreddit(self, subreddit: str) -> list[dict]:
        """Fetch the raw "hot" listing entries for one subreddit.

        Args:
            subreddit: Subreddit name without the ``r/`` prefix.

        Returns:
            The ``data.children`` list from Reddit's JSON response, or an
            empty list when the request fails, returns a non-200 status, or
            the payload does not have the expected shape. Never raises.
        """
        url = f"https://www.reddit.com/r/{subreddit}/hot.json?limit=25"
        headers = {"User-Agent": "TradingPlatform/1.0 (research@example.com)"}
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(
                    url, headers=headers, timeout=aiohttp.ClientTimeout(total=10)
                ) as resp:
                    if resp.status != 200:
                        # Surface rate limiting / blocking instead of
                        # silently producing an empty result.
                        logger.warning(
                            "Reddit returned HTTP %s for r/%s",
                            resp.status,
                            subreddit,
                        )
                        return []
                    payload = await resp.json()
                    # ``or`` guards against explicit JSON nulls, which
                    # ``dict.get(key, default)`` would pass through as None.
                    listing = payload.get("data") or {}
                    children = listing.get("children") or []
                    return children if isinstance(children, list) else []
        except Exception as exc:
            # Best-effort collector: any network/parse failure is logged and
            # treated as "no posts" so other subreddits are still processed.
            logger.error("Reddit fetch failed for r/%s: %s", subreddit, exc)
        return []

    async def collect(self) -> list[NewsItem]:
        """Collect qualifying posts from all configured subreddits.

        Posts are skipped when they have no title, score below
        ``_MIN_SCORE``, a title already seen in this run, or an unusable
        ``created_utc`` timestamp. Malformed entries are skipped individually
        so one bad post cannot abort the whole poll.

        Returns:
            ``NewsItem`` objects in the order the posts were encountered.
        """
        seen_titles: set[str] = set()
        items: list[NewsItem] = []

        for subreddit in _SUBREDDITS:
            try:
                posts = await self._fetch_subreddit(subreddit)
            except Exception as exc:
                # Defensive: _fetch_subreddit handles its own errors, but an
                # overriding implementation might not.
                logger.error("Reddit collector error for r/%s: %s", subreddit, exc)
                continue

            for post in posts:
                post_data = post.get("data") if isinstance(post, dict) else None
                if not isinstance(post_data, dict):
                    continue

                raw_title = post_data.get("title")
                title = raw_title.strip() if isinstance(raw_title, str) else ""
                raw_score = post_data.get("score")
                # A null / non-numeric score is treated like a missing one (0).
                score = raw_score if isinstance(raw_score, (int, float)) else 0

                if not title or score < _MIN_SCORE:
                    continue
                if title in seen_titles:
                    continue

                # A missing or null timestamp falls back to the epoch, as
                # before; an unparseable one skips just this post.
                created_utc = post_data.get("created_utc") or 0
                try:
                    published_at = datetime.fromtimestamp(
                        created_utc, tz=timezone.utc
                    )
                except (TypeError, ValueError, OverflowError, OSError):
                    logger.warning(
                        "Skipping Reddit post in r/%s with invalid created_utc: %r",
                        subreddit,
                        created_utc,
                    )
                    continue

                # Only mark the title as seen once the post is known to be
                # usable, so a valid duplicate is not shadowed by a bad one.
                seen_titles.add(title)

                raw_selftext = post_data.get("selftext")
                selftext = raw_selftext if isinstance(raw_selftext, str) else ""
                combined = f"{title} {selftext}"

                sentiment = self._vader.polarity_scores(combined)["compound"]
                # dict.fromkeys de-duplicates while preserving first-seen order.
                symbols = list(dict.fromkeys(_TICKER_PATTERN.findall(combined)))

                items.append(
                    NewsItem(
                        source=self.name,
                        headline=title,
                        summary=selftext or None,
                        url=post_data.get("url") or None,
                        published_at=published_at,
                        symbols=symbols,
                        sentiment=sentiment,
                        category=NewsCategory.SOCIAL,
                        raw_data=post_data,
                    )
                )

        return items