diff options
Diffstat (limited to 'services/news-collector/src/news_collector/collectors')
9 files changed, 674 insertions, 0 deletions
# --- services/news-collector/src/news_collector/collectors/__init__.py ---
"""News collectors."""

# --- services/news-collector/src/news_collector/collectors/base.py -------
"""Base class for all news collectors."""

from abc import ABC, abstractmethod

from shared.models import NewsItem


class BaseCollector(ABC):
    """Abstract interface implemented by every news collector."""

    # Identifier stored as NewsItem.source and used in logs.
    name: str = "base"
    # How often the scheduler should invoke collect(), in seconds.
    poll_interval: int = 300

    @abstractmethod
    async def collect(self) -> list[NewsItem]:
        """Collect news items from the source."""

    @abstractmethod
    async def is_available(self) -> bool:
        """Check if this data source is accessible."""


# --- services/news-collector/src/news_collector/collectors/fear_greed.py -
"""CNN Fear & Greed Index collector."""

import logging
from dataclasses import dataclass

import aiohttp

from news_collector.collectors.base import BaseCollector

logger = logging.getLogger(__name__)

FEAR_GREED_URL = "https://production.dataviz.cnn.io/index/fearandgreed/graphdata"


@dataclass
class FearGreedResult:
    """One index reading: integer score 0-100 plus its text label."""

    fear_greed: int
    fear_greed_label: str


class FearGreedCollector(BaseCollector):
    """Polls CNN's Fear & Greed endpoint hourly.

    NOTE(review): collect() returns ``FearGreedResult | None`` rather than
    the ``list[NewsItem]`` declared by BaseCollector.collect — callers must
    special-case this collector; confirm the divergence is intentional.
    """

    name = "fear_greed"
    poll_interval = 3600  # 1 hour

    async def is_available(self) -> bool:
        # Public, unauthenticated endpoint; assume reachable.
        return True

    async def _fetch_index(self) -> dict | None:
        """Fetch the raw graphdata payload, or None on any failure."""
        # CNN rejects requests without a browser-like User-Agent.
        headers = {"User-Agent": "Mozilla/5.0"}
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(
                    FEAR_GREED_URL, headers=headers, timeout=aiohttp.ClientTimeout(total=10)
                ) as resp:
                    if resp.status != 200:
                        # Previously silent; log so outages are diagnosable.
                        logger.warning("Fear & Greed fetch returned HTTP %s", resp.status)
                        return None
                    return await resp.json()
        except Exception as exc:
            # Previously swallowed without a trace; keep the best-effort
            # contract (return None) but record the cause.
            logger.error("Fear & Greed fetch failed: %s", exc)
            return None

    def _classify(self, score: int) -> str:
        """Map a 0-100 score to CNN's five-bucket label."""
        if score <= 20:
            return "Extreme Fear"
        if score <= 40:
            return "Fear"
        if score <= 60:
            return "Neutral"
        if score <= 80:
            return "Greed"
        return "Extreme Greed"

    async def collect(self) -> FearGreedResult | None:
        """Return the current index reading, or None if unavailable/malformed."""
        data = await self._fetch_index()
        if data is None:
            return None
        try:
            fg = data["fear_and_greed"]
            score = int(fg["score"])
            # Prefer CNN's own rating; fall back to our bucketing.
            label = fg.get("rating", self._classify(score))
            return FearGreedResult(fear_greed=score, fear_greed_label=label)
        except (KeyError, ValueError, TypeError) as exc:
            logger.error("Unexpected Fear & Greed payload shape: %s", exc)
            return None
# --- services/news-collector/src/news_collector/collectors/fed.py --------
"""Federal Reserve RSS collector with hawkish/dovish/neutral stance detection."""

import asyncio
import logging
from calendar import timegm
from datetime import UTC, datetime

import feedparser
from nltk.sentiment.vader import SentimentIntensityAnalyzer

from shared.models import NewsCategory, NewsItem

from .base import BaseCollector

logger = logging.getLogger(__name__)

_FED_RSS_URL = "https://www.federalreserve.gov/feeds/press_all.xml"

# Phrases suggesting tighter monetary policy.
_HAWKISH_KEYWORDS = [
    "rate hike",
    "interest rate increase",
    "tighten",
    "tightening",
    "inflation",
    "hawkish",
    "restrictive",
    "raise rates",
    "hike rates",
]
# Phrases suggesting looser monetary policy.
_DOVISH_KEYWORDS = [
    "rate cut",
    "interest rate decrease",
    "easing",
    "ease",
    "stimulus",
    "dovish",
    "accommodative",
    "lower rates",
    "cut rates",
    "quantitative easing",
]


def _detect_stance(text: str) -> str:
    """Classify *text* as "hawkish", "dovish", or "neutral" by keyword counts.

    Ties — including zero hits on both sides — are reported as "neutral".
    """
    lower = text.lower()
    hawkish_hits = sum(1 for kw in _HAWKISH_KEYWORDS if kw in lower)
    dovish_hits = sum(1 for kw in _DOVISH_KEYWORDS if kw in lower)
    if hawkish_hits > dovish_hits:
        return "hawkish"
    if dovish_hits > hawkish_hits:
        return "dovish"
    return "neutral"


class FedCollector(BaseCollector):
    """Collects Fed press releases and tags each with a policy stance."""

    name: str = "fed"
    poll_interval: int = 3600  # Fed releases are infrequent; hourly suffices.

    def __init__(self) -> None:
        self._vader = SentimentIntensityAnalyzer()

    async def is_available(self) -> bool:
        # Public RSS feed, no credentials required.
        return True

    async def _fetch_fed_rss(self) -> list[dict]:
        """Parse the Fed RSS feed off the event loop; [] on failure."""
        # feedparser is blocking, so run it in the default executor.
        # get_running_loop() replaces the deprecated get_event_loop():
        # this coroutine is always executed inside a running loop.
        loop = asyncio.get_running_loop()
        try:
            parsed = await loop.run_in_executor(None, feedparser.parse, _FED_RSS_URL)
            return parsed.get("entries", [])
        except Exception as exc:
            logger.error("Fed RSS fetch failed: %s", exc)
            return []

    def _parse_published(self, entry: dict) -> datetime:
        """Convert the entry's UTC ``published_parsed`` struct_time to an
        aware datetime; fall back to "now" when missing or malformed."""
        published_parsed = entry.get("published_parsed")
        if published_parsed:
            try:
                # timegm treats the struct_time as UTC, matching feedparser.
                ts = timegm(published_parsed)
                return datetime.fromtimestamp(ts, tz=UTC)
            except Exception:
                pass
        return datetime.now(UTC)

    async def collect(self) -> list[NewsItem]:
        """Return one NewsItem per press release currently in the feed."""
        try:
            entries = await self._fetch_fed_rss()
        except Exception as exc:
            logger.error("Fed collector error: %s", exc)
            return []

        items: list[NewsItem] = []

        for entry in entries:
            title = entry.get("title", "").strip()
            if not title:
                continue

            summary = entry.get("summary", "") or ""
            combined = f"{title} {summary}"

            sentiment = self._vader.polarity_scores(combined)["compound"]
            stance = _detect_stance(combined)
            published_at = self._parse_published(entry)

            items.append(
                NewsItem(
                    source=self.name,
                    headline=title,
                    summary=summary or None,
                    url=entry.get("link") or None,
                    published_at=published_at,
                    symbols=[],
                    sentiment=sentiment,
                    category=NewsCategory.FED,
                    # Computed stance goes LAST so a feed field that happens
                    # to be named "stance" can never clobber it (the old
                    # order {"stance": ..., **entry} let the entry win).
                    raw_data={**dict(entry), "stance": stance},
                )
            )

        return items
# --- services/news-collector/src/news_collector/collectors/finnhub.py ----
"""Finnhub news collector with VADER sentiment analysis."""

import logging
from datetime import UTC, datetime

import aiohttp
from nltk.sentiment.vader import SentimentIntensityAnalyzer

from shared.models import NewsCategory, NewsItem

from .base import BaseCollector

logger = logging.getLogger(__name__)

# First matching category wins; anything unmatched falls through to MACRO.
_CATEGORY_KEYWORDS: dict[NewsCategory, list[str]] = {
    NewsCategory.FED: ["fed", "fomc", "rate", "federal reserve"],
    NewsCategory.POLICY: ["tariff", "trump", "regulation", "policy", "trade war"],
    NewsCategory.EARNINGS: ["earnings", "revenue", "profit", "eps", "guidance", "quarter"],
}


def _categorize(text: str) -> NewsCategory:
    """Keyword-match *text* (case-insensitive) to a category; MACRO default."""
    lower = text.lower()
    for category, keywords in _CATEGORY_KEYWORDS.items():
        if any(kw in lower for kw in keywords):
            return category
    return NewsCategory.MACRO


class FinnhubCollector(BaseCollector):
    """Collects Finnhub's general-news feed (requires an API key)."""

    name: str = "finnhub"
    poll_interval: int = 300

    _BASE_URL = "https://finnhub.io/api/v1/news"

    def __init__(self, api_key: str) -> None:
        self._api_key = api_key
        self._vader = SentimentIntensityAnalyzer()

    async def is_available(self) -> bool:
        # Without a key every request would be rejected.
        return bool(self._api_key)

    async def _fetch_news(self) -> list[dict]:
        """GET the general-news feed; raises on HTTP errors.

        The token is passed via ``params`` rather than interpolated into the
        URL string, so it is properly encoded and less likely to leak into
        logs that print raw URL literals.
        """
        params = {"category": "general", "token": self._api_key}
        async with aiohttp.ClientSession() as session:
            async with session.get(
                self._BASE_URL,
                params=params,
                # 10s cap, matching every other collector in this package
                # (previously this was the only one with no timeout).
                timeout=aiohttp.ClientTimeout(total=10),
            ) as resp:
                resp.raise_for_status()
                data = await resp.json()
                # Error payloads are JSON objects, not lists; treat as empty.
                return data if isinstance(data, list) else []

    async def collect(self) -> list[NewsItem]:
        """Return one NewsItem per article; [] on any fetch failure."""
        try:
            raw_items = await self._fetch_news()
        except Exception as exc:
            logger.error("Finnhub fetch failed: %s", exc)
            return []

        items: list[NewsItem] = []
        for article in raw_items:
            headline = article.get("headline", "")
            summary = article.get("summary", "")
            combined = f"{headline} {summary}"

            sentiment_scores = self._vader.polarity_scores(combined)
            sentiment = sentiment_scores["compound"]

            # Finnhub "datetime" is a Unix timestamp; 0 → epoch fallback.
            ts = article.get("datetime", 0)
            published_at = datetime.fromtimestamp(ts, tz=UTC)

            # "related" is a comma-separated ticker list, e.g. "AAPL,MSFT".
            related = article.get("related", "")
            symbols = [t.strip() for t in related.split(",") if t.strip()] if related else []

            category = _categorize(combined)

            items.append(
                NewsItem(
                    source=self.name,
                    headline=headline,
                    summary=summary or None,
                    url=article.get("url") or None,
                    published_at=published_at,
                    symbols=symbols,
                    sentiment=sentiment,
                    category=category,
                    raw_data=article,
                )
            )

        return items
# --- services/news-collector/src/news_collector/collectors/reddit.py -----
"""Reddit social sentiment collector using JSON API with VADER sentiment analysis."""

import logging
import re
from datetime import UTC, datetime

import aiohttp
from nltk.sentiment.vader import SentimentIntensityAnalyzer

from shared.models import NewsCategory, NewsItem

from .base import BaseCollector

logger = logging.getLogger(__name__)

_SUBREDDITS = ["wallstreetbets", "stocks", "investing"]
# Posts below this score are treated as noise and skipped.
_MIN_SCORE = 50

# Fixed whitelist of large-cap tickers; anything else is ignored.
_TICKER_PATTERN = re.compile(
    r"\b(AAPL|MSFT|GOOGL|GOOG|AMZN|TSLA|NVDA|META|BRK\.?[AB]|JPM|V|UNH|XOM|"
    r"JNJ|WMT|MA|PG|HD|CVX|MRK|LLY|ABBV|PFE|BAC|KO|AVGO|COST|MCD|TMO|"
    r"CSCO|ACN|ABT|DHR|TXN|NEE|NFLX|PM|UPS|RTX|HON|QCOM|AMGN|LOW|IBM|"
    r"INTC|AMD|PYPL|GS|MS|BLK|SPGI|CAT|DE|GE|MMM|BA|F|GM|DIS|CMCSA)\b"
)


class RedditCollector(BaseCollector):
    """Collects hot posts from finance subreddits via the public JSON API."""

    name: str = "reddit"
    poll_interval: int = 900

    def __init__(self) -> None:
        self._vader = SentimentIntensityAnalyzer()

    async def is_available(self) -> bool:
        # Public JSON endpoints; no credentials required.
        return True

    async def _fetch_subreddit(self, subreddit: str) -> list[dict]:
        """Fetch the top-25 hot posts of *subreddit*; [] on any failure.

        Never raises: all exceptions are logged and mapped to [].
        """
        url = f"https://www.reddit.com/r/{subreddit}/hot.json?limit=25"
        # Reddit throttles/blocks requests without a descriptive User-Agent.
        headers = {"User-Agent": "TradingPlatform/1.0 (research@example.com)"}
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(
                    url, headers=headers, timeout=aiohttp.ClientTimeout(total=10)
                ) as resp:
                    if resp.status == 200:
                        data = await resp.json()
                        return data.get("data", {}).get("children", [])
        except Exception as exc:
            logger.error("Reddit fetch failed for r/%s: %s", subreddit, exc)
        return []

    async def collect(self) -> list[NewsItem]:
        """Return NewsItems for high-score posts, de-duplicated by title."""
        seen_titles: set[str] = set()
        items: list[NewsItem] = []

        for subreddit in _SUBREDDITS:
            # No try/except needed: _fetch_subreddit already catches every
            # Exception and returns [] (the previous inner handler was
            # unreachable dead code).
            posts = await self._fetch_subreddit(subreddit)

            for post in posts:
                post_data = post.get("data", {})
                title = post_data.get("title", "").strip()
                score = post_data.get("score", 0)

                if not title or score < _MIN_SCORE:
                    continue
                if title in seen_titles:
                    continue
                seen_titles.add(title)

                selftext = post_data.get("selftext", "") or ""
                combined = f"{title} {selftext}"

                sentiment = self._vader.polarity_scores(combined)["compound"]
                # dict.fromkeys de-duplicates while preserving first-seen order.
                symbols = list(dict.fromkeys(_TICKER_PATTERN.findall(combined)))

                created_utc = post_data.get("created_utc", 0)
                published_at = datetime.fromtimestamp(created_utc, tz=UTC)

                items.append(
                    NewsItem(
                        source=self.name,
                        headline=title,
                        summary=selftext or None,
                        url=post_data.get("url") or None,
                        published_at=published_at,
                        symbols=symbols,
                        sentiment=sentiment,
                        category=NewsCategory.SOCIAL,
                        raw_data=post_data,
                    )
                )

        return items
# --- services/news-collector/src/news_collector/collectors/rss.py --------
"""RSS news collector using feedparser with VADER sentiment analysis."""

import asyncio
import logging
import re
from calendar import timegm
from datetime import UTC, datetime

import feedparser
from nltk.sentiment.vader import SentimentIntensityAnalyzer

from shared.models import NewsCategory, NewsItem

from .base import BaseCollector

logger = logging.getLogger(__name__)

_DEFAULT_FEEDS = [
    "https://finance.yahoo.com/news/rssindex",
    "https://news.google.com/rss/search?q=stock+market+finance&hl=en-US&gl=US&ceid=US:en",
    "https://feeds.marketwatch.com/marketwatch/topstories/",
]

# Fixed whitelist of large-cap tickers; anything else is ignored.
_TICKER_PATTERN = re.compile(
    r"\b(AAPL|MSFT|GOOGL|GOOG|AMZN|TSLA|NVDA|META|BRK\.?[AB]|JPM|V|UNH|XOM|"
    r"JNJ|WMT|MA|PG|HD|CVX|MRK|LLY|ABBV|PFE|BAC|KO|AVGO|COST|MCD|TMO|"
    r"CSCO|ACN|ABT|DHR|TXN|NEE|NFLX|PM|UPS|RTX|HON|QCOM|AMGN|LOW|IBM|"
    r"INTC|AMD|PYPL|GS|MS|BLK|SPGI|CAT|DE|GE|MMM|BA|F|GM|DIS|CMCSA)\b"
)


class RSSCollector(BaseCollector):
    """Collects headlines from a configurable list of RSS feeds."""

    name: str = "rss"
    poll_interval: int = 600

    def __init__(self, feeds: list[str] | None = None) -> None:
        # None → defaults; an explicit empty list disables all feeds.
        self._feeds = feeds if feeds is not None else _DEFAULT_FEEDS
        self._vader = SentimentIntensityAnalyzer()

    async def is_available(self) -> bool:
        # Public feeds, no credentials required.
        return True

    async def _fetch_feeds(self) -> list[dict]:
        """Parse each configured feed off the event loop; failures are skipped."""
        loop = asyncio.get_event_loop()
        results = []
        for url in self._feeds:
            try:
                # feedparser is blocking; run it in the default executor.
                parsed = await loop.run_in_executor(None, feedparser.parse, url)
                results.append(parsed)
            except Exception as exc:
                logger.error("RSS fetch failed for %s: %s", url, exc)
        return results

    def _parse_published(self, entry: dict) -> datetime:
        """Convert the entry's ``published_parsed`` to an aware UTC datetime.

        BUGFIX: feedparser normalizes ``published_parsed`` to a UTC
        struct_time, so it must be converted with ``calendar.timegm`` (UTC),
        not ``time.mktime`` (which interprets it as *local* time and skewed
        every timestamp by the host's UTC offset). This now matches the
        handling in the sibling Fed collector.
        """
        parsed_time = entry.get("published_parsed")
        if parsed_time:
            try:
                ts = timegm(parsed_time)
                return datetime.fromtimestamp(ts, tz=UTC)
            except Exception:
                pass
        return datetime.now(UTC)

    async def collect(self) -> list[NewsItem]:
        """Return NewsItems for all feed entries, de-duplicated by title."""
        try:
            feeds = await self._fetch_feeds()
        except Exception as exc:
            logger.error("RSS collector error: %s", exc)
            return []

        seen_titles: set[str] = set()
        items: list[NewsItem] = []

        for feed in feeds:
            for entry in feed.get("entries", []):
                title = entry.get("title", "").strip()
                if not title or title in seen_titles:
                    continue
                seen_titles.add(title)

                summary = entry.get("summary", "") or ""
                combined = f"{title} {summary}"

                sentiment_scores = self._vader.polarity_scores(combined)
                sentiment = sentiment_scores["compound"]

                # dict.fromkeys de-duplicates, preserving first-seen order.
                symbols = list(dict.fromkeys(_TICKER_PATTERN.findall(combined)))

                published_at = self._parse_published(entry)

                items.append(
                    NewsItem(
                        source=self.name,
                        headline=title,
                        summary=summary or None,
                        url=entry.get("link") or None,
                        published_at=published_at,
                        symbols=symbols,
                        sentiment=sentiment,
                        category=NewsCategory.MACRO,
                        raw_data=dict(entry),
                    )
                )

        return items
# --- services/news-collector/src/news_collector/collectors/sec_edgar.py --
"""SEC EDGAR filing collector (free, no API key required)."""

import logging
from datetime import UTC, datetime

import aiohttp
from nltk.sentiment.vader import SentimentIntensityAnalyzer

from news_collector.collectors.base import BaseCollector
from shared.models import NewsCategory, NewsItem

logger = logging.getLogger(__name__)

# Zero-padded CIK -> ticker for the companies we track.
TRACKED_CIKS = {
    "0000320193": "AAPL",
    "0000789019": "MSFT",
    "0001652044": "GOOGL",
    "0001018724": "AMZN",
    "0001318605": "TSLA",
    "0001045810": "NVDA",
    "0001326801": "META",
    "0000019617": "JPM",
    "0000078003": "PFE",
    "0000021344": "KO",
}

# SEC's fair-access policy requires a declared contact in the User-Agent.
SEC_USER_AGENT = "TradingPlatform research@example.com"


class SecEdgarCollector(BaseCollector):
    """Collects today's 8-K filings for tracked companies from EDGAR."""

    name = "sec_edgar"
    poll_interval = 1800  # 30 minutes

    def __init__(self) -> None:
        self._vader = SentimentIntensityAnalyzer()

    async def is_available(self) -> bool:
        # Public EDGAR endpoint; no credentials required.
        return True

    async def _fetch_recent_filings(self) -> list[dict]:
        """Fetch the submissions JSON for each tracked CIK; failures skipped."""
        results = []
        headers = {"User-Agent": SEC_USER_AGENT}
        async with aiohttp.ClientSession() as session:
            for cik, ticker in TRACKED_CIKS.items():
                try:
                    url = f"https://data.sec.gov/submissions/CIK{cik}.json"
                    async with session.get(
                        url, headers=headers, timeout=aiohttp.ClientTimeout(total=10)
                    ) as resp:
                        if resp.status == 200:
                            data = await resp.json()
                            # Overwrite with our canonical ticker so collect()
                            # has a uniform shape to read.
                            data["tickers"] = [{"ticker": ticker}]
                            results.append(data)
                except Exception as exc:
                    # BUGFIX: the old call used structlog-style kwargs
                    # (cik=..., error=...), which stdlib logging rejects with
                    # a TypeError — crashing the very error path it guarded.
                    logger.warning("SEC fetch failed for CIK %s: %s", cik, exc)
        return results

    async def collect(self) -> list[NewsItem]:
        """Return one NewsItem per 8-K filed *today* by a tracked company."""
        filings_data = await self._fetch_recent_filings()
        items = []
        today = datetime.now(UTC).strftime("%Y-%m-%d")

        for company_data in filings_data:
            tickers = [t["ticker"] for t in company_data.get("tickers", [])]
            company_name = company_data.get("name", "Unknown")
            recent = company_data.get("filings", {}).get("recent", {})

            # EDGAR returns parallel arrays indexed by filing position.
            forms = recent.get("form", [])
            dates = recent.get("filingDate", [])
            descriptions = recent.get("primaryDocDescription", [])
            accessions = recent.get("accessionNumber", [])

            for i, form in enumerate(forms):
                if form != "8-K":
                    continue
                filing_date = dates[i] if i < len(dates) else ""
                if filing_date != today:
                    continue

                desc = descriptions[i] if i < len(descriptions) else "8-K Filing"
                accession = accessions[i] if i < len(accessions) else ""
                headline = f"{company_name} ({', '.join(tickers)}): {form} - {desc}"

                items.append(
                    NewsItem(
                        source=self.name,
                        headline=headline,
                        summary=desc,
                        url=f"https://www.sec.gov/cgi-bin/browse-edgar?action=getcompany&accession={accession}",
                        published_at=datetime.strptime(filing_date, "%Y-%m-%d").replace(tzinfo=UTC),
                        symbols=tickers,
                        sentiment=self._vader.polarity_scores(headline)["compound"],
                        category=NewsCategory.FILING,
                        raw_data={"form": form, "accession": accession},
                    )
                )

        return items
# --- services/news-collector/src/news_collector/collectors/truth_social.py
"""Truth Social collector using Mastodon-compatible API with VADER sentiment analysis."""

import html
import logging
import re
from datetime import UTC, datetime

import aiohttp
from nltk.sentiment.vader import SentimentIntensityAnalyzer

from shared.models import NewsCategory, NewsItem

from .base import BaseCollector

logger = logging.getLogger(__name__)

_TRUMP_ACCOUNT_ID = "107780257626128497"
_API_URL = f"https://truthsocial.com/api/v1/accounts/{_TRUMP_ACCOUNT_ID}/statuses"

_HTML_TAG_PATTERN = re.compile(r"<[^>]+>")


def _strip_html(text: str) -> str:
    """Strip HTML tags and decode entities from Mastodon-style content.

    BUGFIX: the tag regex alone left entities (``&amp;``, ``&#39;`` …) in the
    text that feeds headlines and sentiment scoring; ``html.unescape``
    decodes them after tags are removed.
    """
    return html.unescape(_HTML_TAG_PATTERN.sub("", text)).strip()


class TruthSocialCollector(BaseCollector):
    """Collects recent statuses from one account via the public statuses API."""

    name: str = "truth_social"
    poll_interval: int = 900

    def __init__(self) -> None:
        self._vader = SentimentIntensityAnalyzer()

    async def is_available(self) -> bool:
        # Public endpoint; no credentials required.
        return True

    async def _fetch_posts(self) -> list[dict]:
        """Fetch recent statuses; [] on any failure or non-200 response."""
        headers = {"User-Agent": "TradingPlatform/1.0 (research@example.com)"}
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(
                    _API_URL, headers=headers, timeout=aiohttp.ClientTimeout(total=10)
                ) as resp:
                    if resp.status == 200:
                        data = await resp.json()
                        # Error payloads are JSON objects; iterating one in
                        # collect() would crash — normalize to [].
                        return data if isinstance(data, list) else []
        except Exception as exc:
            logger.error("Truth Social fetch failed: %s", exc)
        return []

    async def collect(self) -> list[NewsItem]:
        """Return one NewsItem per non-empty post."""
        try:
            posts = await self._fetch_posts()
        except Exception as exc:
            logger.error("Truth Social collector error: %s", exc)
            return []

        items: list[NewsItem] = []

        for post in posts:
            raw_content = post.get("content", "") or ""
            content = _strip_html(raw_content)
            if not content:
                continue

            sentiment = self._vader.polarity_scores(content)["compound"]

            # created_at is ISO-8601 with a trailing "Z"; fromisoformat
            # (pre-3.11) needs the explicit "+00:00" offset.
            created_at_str = post.get("created_at", "")
            try:
                published_at = datetime.fromisoformat(created_at_str.replace("Z", "+00:00"))
            except Exception:
                published_at = datetime.now(UTC)

            items.append(
                NewsItem(
                    source=self.name,
                    # Headline is the first 200 chars; full text goes to
                    # summary only when it was actually truncated.
                    headline=content[:200],
                    summary=content if len(content) > 200 else None,
                    url=post.get("url") or None,
                    published_at=published_at,
                    symbols=[],
                    sentiment=sentiment,
                    category=NewsCategory.POLICY,
                    raw_data=post,
                )
            )

        return items
