diff options
Diffstat (limited to 'services/news-collector/src/news_collector')
12 files changed, 886 insertions, 0 deletions
diff --git a/services/news-collector/src/news_collector/__init__.py b/services/news-collector/src/news_collector/__init__.py new file mode 100644 index 0000000..5547af2 --- /dev/null +++ b/services/news-collector/src/news_collector/__init__.py @@ -0,0 +1 @@ +"""News collector service.""" diff --git a/services/news-collector/src/news_collector/collectors/__init__.py b/services/news-collector/src/news_collector/collectors/__init__.py new file mode 100644 index 0000000..5ef36a7 --- /dev/null +++ b/services/news-collector/src/news_collector/collectors/__init__.py @@ -0,0 +1 @@ +"""News collectors.""" diff --git a/services/news-collector/src/news_collector/collectors/base.py b/services/news-collector/src/news_collector/collectors/base.py new file mode 100644 index 0000000..bb43fd6 --- /dev/null +++ b/services/news-collector/src/news_collector/collectors/base.py @@ -0,0 +1,18 @@ +"""Base class for all news collectors.""" + +from abc import ABC, abstractmethod + +from shared.models import NewsItem + + +class BaseCollector(ABC): + name: str = "base" + poll_interval: int = 300 # seconds + + @abstractmethod + async def collect(self) -> list[NewsItem]: + """Collect news items from the source.""" + + @abstractmethod + async def is_available(self) -> bool: + """Check if this data source is accessible.""" diff --git a/services/news-collector/src/news_collector/collectors/fear_greed.py b/services/news-collector/src/news_collector/collectors/fear_greed.py new file mode 100644 index 0000000..42e8f88 --- /dev/null +++ b/services/news-collector/src/news_collector/collectors/fear_greed.py @@ -0,0 +1,62 @@ +"""CNN Fear & Greed Index collector.""" + +import logging +from dataclasses import dataclass + +import aiohttp + +from news_collector.collectors.base import BaseCollector + +logger = logging.getLogger(__name__) + +FEAR_GREED_URL = "https://production.dataviz.cnn.io/index/fearandgreed/graphdata" + + +@dataclass +class FearGreedResult: + fear_greed: int + fear_greed_label: str + + 
+class FearGreedCollector(BaseCollector): + name = "fear_greed" + poll_interval = 3600 # 1 hour + + async def is_available(self) -> bool: + return True + + async def _fetch_index(self) -> dict | None: + headers = {"User-Agent": "Mozilla/5.0"} + try: + async with aiohttp.ClientSession() as session: + async with session.get( + FEAR_GREED_URL, headers=headers, timeout=aiohttp.ClientTimeout(total=10) + ) as resp: + if resp.status != 200: + return None + return await resp.json() + except Exception: + return None + + def _classify(self, score: int) -> str: + if score <= 20: + return "Extreme Fear" + if score <= 40: + return "Fear" + if score <= 60: + return "Neutral" + if score <= 80: + return "Greed" + return "Extreme Greed" + + async def collect(self) -> FearGreedResult | None: + data = await self._fetch_index() + if data is None: + return None + try: + fg = data["fear_and_greed"] + score = int(fg["score"]) + label = fg.get("rating", self._classify(score)) + return FearGreedResult(fear_greed=score, fear_greed_label=label) + except (KeyError, ValueError, TypeError): + return None diff --git a/services/news-collector/src/news_collector/collectors/fed.py b/services/news-collector/src/news_collector/collectors/fed.py new file mode 100644 index 0000000..52128e5 --- /dev/null +++ b/services/news-collector/src/news_collector/collectors/fed.py @@ -0,0 +1,119 @@ +"""Federal Reserve RSS collector with hawkish/dovish/neutral stance detection.""" + +import asyncio +import logging +from calendar import timegm +from datetime import UTC, datetime + +import feedparser +from nltk.sentiment.vader import SentimentIntensityAnalyzer + +from shared.models import NewsCategory, NewsItem + +from .base import BaseCollector + +logger = logging.getLogger(__name__) + +_FED_RSS_URL = "https://www.federalreserve.gov/feeds/press_all.xml" + +_HAWKISH_KEYWORDS = [ + "rate hike", + "interest rate increase", + "tighten", + "tightening", + "inflation", + "hawkish", + "restrictive", + "raise rates", + 
"hike rates", +] +_DOVISH_KEYWORDS = [ + "rate cut", + "interest rate decrease", + "easing", + "ease", + "stimulus", + "dovish", + "accommodative", + "lower rates", + "cut rates", + "quantitative easing", +] + + +def _detect_stance(text: str) -> str: + lower = text.lower() + hawkish_hits = sum(1 for kw in _HAWKISH_KEYWORDS if kw in lower) + dovish_hits = sum(1 for kw in _DOVISH_KEYWORDS if kw in lower) + if hawkish_hits > dovish_hits: + return "hawkish" + if dovish_hits > hawkish_hits: + return "dovish" + return "neutral" + + +class FedCollector(BaseCollector): + name: str = "fed" + poll_interval: int = 3600 + + def __init__(self) -> None: + self._vader = SentimentIntensityAnalyzer() + + async def is_available(self) -> bool: + return True + + async def _fetch_fed_rss(self) -> list[dict]: + loop = asyncio.get_event_loop() + try: + parsed = await loop.run_in_executor(None, feedparser.parse, _FED_RSS_URL) + return parsed.get("entries", []) + except Exception as exc: + logger.error("Fed RSS fetch failed: %s", exc) + return [] + + def _parse_published(self, entry: dict) -> datetime: + published_parsed = entry.get("published_parsed") + if published_parsed: + try: + ts = timegm(published_parsed) + return datetime.fromtimestamp(ts, tz=UTC) + except Exception: + pass + return datetime.now(UTC) + + async def collect(self) -> list[NewsItem]: + try: + entries = await self._fetch_fed_rss() + except Exception as exc: + logger.error("Fed collector error: %s", exc) + return [] + + items: list[NewsItem] = [] + + for entry in entries: + title = entry.get("title", "").strip() + if not title: + continue + + summary = entry.get("summary", "") or "" + combined = f"{title} {summary}" + + sentiment = self._vader.polarity_scores(combined)["compound"] + stance = _detect_stance(combined) + published_at = self._parse_published(entry) + + items.append( + NewsItem( + source=self.name, + headline=title, + summary=summary or None, + url=entry.get("link") or None, + published_at=published_at, + 
symbols=[], + sentiment=sentiment, + category=NewsCategory.FED, + raw_data={"stance": stance, **dict(entry)}, + ) + ) + + return items diff --git a/services/news-collector/src/news_collector/collectors/finnhub.py b/services/news-collector/src/news_collector/collectors/finnhub.py new file mode 100644 index 0000000..67cb455 --- /dev/null +++ b/services/news-collector/src/news_collector/collectors/finnhub.py @@ -0,0 +1,88 @@ +"""Finnhub news collector with VADER sentiment analysis.""" + +import logging +from datetime import UTC, datetime + +import aiohttp +from nltk.sentiment.vader import SentimentIntensityAnalyzer + +from shared.models import NewsCategory, NewsItem + +from .base import BaseCollector + +logger = logging.getLogger(__name__) + +_CATEGORY_KEYWORDS: dict[NewsCategory, list[str]] = { + NewsCategory.FED: ["fed", "fomc", "rate", "federal reserve"], + NewsCategory.POLICY: ["tariff", "trump", "regulation", "policy", "trade war"], + NewsCategory.EARNINGS: ["earnings", "revenue", "profit", "eps", "guidance", "quarter"], +} + + +def _categorize(text: str) -> NewsCategory: + lower = text.lower() + for category, keywords in _CATEGORY_KEYWORDS.items(): + if any(kw in lower for kw in keywords): + return category + return NewsCategory.MACRO + + +class FinnhubCollector(BaseCollector): + name: str = "finnhub" + poll_interval: int = 300 + + _BASE_URL = "https://finnhub.io/api/v1/news" + + def __init__(self, api_key: str) -> None: + self._api_key = api_key + self._vader = SentimentIntensityAnalyzer() + + async def is_available(self) -> bool: + return bool(self._api_key) + + async def _fetch_news(self) -> list[dict]: + url = f"{self._BASE_URL}?category=general&token={self._api_key}" + async with aiohttp.ClientSession() as session: + async with session.get(url) as resp: + resp.raise_for_status() + return await resp.json() + + async def collect(self) -> list[NewsItem]: + try: + raw_items = await self._fetch_news() + except Exception as exc: + logger.error("Finnhub fetch 
failed: %s", exc) + return [] + + items: list[NewsItem] = [] + for article in raw_items: + headline = article.get("headline", "") + summary = article.get("summary", "") + combined = f"{headline} {summary}" + + sentiment_scores = self._vader.polarity_scores(combined) + sentiment = sentiment_scores["compound"] + + ts = article.get("datetime", 0) + published_at = datetime.fromtimestamp(ts, tz=UTC) + + related = article.get("related", "") + symbols = [t.strip() for t in related.split(",") if t.strip()] if related else [] + + category = _categorize(combined) + + items.append( + NewsItem( + source=self.name, + headline=headline, + summary=summary or None, + url=article.get("url") or None, + published_at=published_at, + symbols=symbols, + sentiment=sentiment, + category=category, + raw_data=article, + ) + ) + + return items diff --git a/services/news-collector/src/news_collector/collectors/reddit.py b/services/news-collector/src/news_collector/collectors/reddit.py new file mode 100644 index 0000000..4e9d6f5 --- /dev/null +++ b/services/news-collector/src/news_collector/collectors/reddit.py @@ -0,0 +1,97 @@ +"""Reddit social sentiment collector using JSON API with VADER sentiment analysis.""" + +import logging +import re +from datetime import UTC, datetime + +import aiohttp +from nltk.sentiment.vader import SentimentIntensityAnalyzer + +from shared.models import NewsCategory, NewsItem + +from .base import BaseCollector + +logger = logging.getLogger(__name__) + +_SUBREDDITS = ["wallstreetbets", "stocks", "investing"] +_MIN_SCORE = 50 + +_TICKER_PATTERN = re.compile( + r"\b(AAPL|MSFT|GOOGL|GOOG|AMZN|TSLA|NVDA|META|BRK\.?[AB]|JPM|V|UNH|XOM|" + r"JNJ|WMT|MA|PG|HD|CVX|MRK|LLY|ABBV|PFE|BAC|KO|AVGO|COST|MCD|TMO|" + r"CSCO|ACN|ABT|DHR|TXN|NEE|NFLX|PM|UPS|RTX|HON|QCOM|AMGN|LOW|IBM|" + r"INTC|AMD|PYPL|GS|MS|BLK|SPGI|CAT|DE|GE|MMM|BA|F|GM|DIS|CMCSA)\b" +) + + +class RedditCollector(BaseCollector): + name: str = "reddit" + poll_interval: int = 900 + + def __init__(self) -> None: + 
self._vader = SentimentIntensityAnalyzer() + + async def is_available(self) -> bool: + return True + + async def _fetch_subreddit(self, subreddit: str) -> list[dict]: + url = f"https://www.reddit.com/r/{subreddit}/hot.json?limit=25" + headers = {"User-Agent": "TradingPlatform/1.0 (research@example.com)"} + try: + async with aiohttp.ClientSession() as session: + async with session.get( + url, headers=headers, timeout=aiohttp.ClientTimeout(total=10) + ) as resp: + if resp.status == 200: + data = await resp.json() + return data.get("data", {}).get("children", []) + except Exception as exc: + logger.error("Reddit fetch failed for r/%s: %s", subreddit, exc) + return [] + + async def collect(self) -> list[NewsItem]: + seen_titles: set[str] = set() + items: list[NewsItem] = [] + + for subreddit in _SUBREDDITS: + try: + posts = await self._fetch_subreddit(subreddit) + except Exception as exc: + logger.error("Reddit collector error for r/%s: %s", subreddit, exc) + continue + + for post in posts: + post_data = post.get("data", {}) + title = post_data.get("title", "").strip() + score = post_data.get("score", 0) + + if not title or score < _MIN_SCORE: + continue + if title in seen_titles: + continue + seen_titles.add(title) + + selftext = post_data.get("selftext", "") or "" + combined = f"{title} {selftext}" + + sentiment = self._vader.polarity_scores(combined)["compound"] + symbols = list(dict.fromkeys(_TICKER_PATTERN.findall(combined))) + + created_utc = post_data.get("created_utc", 0) + published_at = datetime.fromtimestamp(created_utc, tz=UTC) + + items.append( + NewsItem( + source=self.name, + headline=title, + summary=selftext or None, + url=post_data.get("url") or None, + published_at=published_at, + symbols=symbols, + sentiment=sentiment, + category=NewsCategory.SOCIAL, + raw_data=post_data, + ) + ) + + return items diff --git a/services/news-collector/src/news_collector/collectors/rss.py b/services/news-collector/src/news_collector/collectors/rss.py new file mode 
100644 index 0000000..bca0e9f
--- /dev/null
+++ b/services/news-collector/src/news_collector/collectors/rss.py
@@ -0,0 +1,105 @@
+"""RSS news collector using feedparser with VADER sentiment analysis."""
+
+import asyncio
+import logging
+import re
+from calendar import timegm
+from datetime import UTC, datetime
+
+import feedparser
+from nltk.sentiment.vader import SentimentIntensityAnalyzer
+
+from shared.models import NewsCategory, NewsItem
+
+from .base import BaseCollector
+
+logger = logging.getLogger(__name__)
+
+_DEFAULT_FEEDS = [
+    "https://finance.yahoo.com/news/rssindex",
+    "https://news.google.com/rss/search?q=stock+market+finance&hl=en-US&gl=US&ceid=US:en",
+    "https://feeds.marketwatch.com/marketwatch/topstories/",
+]
+
+_TICKER_PATTERN = re.compile(
+    r"\b(AAPL|MSFT|GOOGL|GOOG|AMZN|TSLA|NVDA|META|BRK\.?[AB]|JPM|V|UNH|XOM|"
+    r"JNJ|WMT|MA|PG|HD|CVX|MRK|LLY|ABBV|PFE|BAC|KO|AVGO|COST|MCD|TMO|"
+    r"CSCO|ACN|ABT|DHR|TXN|NEE|NFLX|PM|UPS|RTX|HON|QCOM|AMGN|LOW|IBM|"
+    r"INTC|AMD|PYPL|GS|MS|BLK|SPGI|CAT|DE|GE|MMM|BA|F|GM|DIS|CMCSA)\b"
+)
+
+
+class RSSCollector(BaseCollector):
+    name: str = "rss"
+    poll_interval: int = 600
+
+    def __init__(self, feeds: list[str] | None = None) -> None:
+        self._feeds = feeds if feeds is not None else _DEFAULT_FEEDS
+        self._vader = SentimentIntensityAnalyzer()
+
+    async def is_available(self) -> bool:
+        return True
+
+    async def _fetch_feeds(self) -> list[dict]:
+        # get_running_loop() is the non-deprecated form inside a coroutine.
+        loop = asyncio.get_running_loop()
+        results = []
+        for url in self._feeds:
+            try:
+                parsed = await loop.run_in_executor(None, feedparser.parse, url)
+                results.append(parsed)
+            except Exception as exc:
+                logger.error("RSS fetch failed for %s: %s", url, exc)
+        return results
+
+    def _parse_published(self, entry: dict) -> datetime:
+        parsed_time = entry.get("published_parsed")
+        if parsed_time:
+            try:
+                # feedparser normalizes published_parsed to a UTC struct_time,
+                # so convert with calendar.timegm (UTC), not time.mktime
+                # (which assumes local time and skews by the host UTC offset).
+                ts = timegm(parsed_time)
+                return datetime.fromtimestamp(ts, tz=UTC)
+            except Exception:
+                pass
+        return datetime.now(UTC)
+
+    async def collect(self) -> list[NewsItem]:
+        try:
feeds = await self._fetch_feeds() + except Exception as exc: + logger.error("RSS collector error: %s", exc) + return [] + + seen_titles: set[str] = set() + items: list[NewsItem] = [] + + for feed in feeds: + for entry in feed.get("entries", []): + title = entry.get("title", "").strip() + if not title or title in seen_titles: + continue + seen_titles.add(title) + + summary = entry.get("summary", "") or "" + combined = f"{title} {summary}" + + sentiment_scores = self._vader.polarity_scores(combined) + sentiment = sentiment_scores["compound"] + + symbols = list(dict.fromkeys(_TICKER_PATTERN.findall(combined))) + + published_at = self._parse_published(entry) + + items.append( + NewsItem( + source=self.name, + headline=title, + summary=summary or None, + url=entry.get("link") or None, + published_at=published_at, + symbols=symbols, + sentiment=sentiment, + category=NewsCategory.MACRO, + raw_data=dict(entry), + ) + ) + + return items diff --git a/services/news-collector/src/news_collector/collectors/sec_edgar.py b/services/news-collector/src/news_collector/collectors/sec_edgar.py new file mode 100644 index 0000000..d88518f --- /dev/null +++ b/services/news-collector/src/news_collector/collectors/sec_edgar.py @@ -0,0 +1,98 @@ +"""SEC EDGAR filing collector (free, no API key required).""" + +import logging +from datetime import UTC, datetime + +import aiohttp +from nltk.sentiment.vader import SentimentIntensityAnalyzer + +from news_collector.collectors.base import BaseCollector +from shared.models import NewsCategory, NewsItem + +logger = logging.getLogger(__name__) + +TRACKED_CIKS = { + "0000320193": "AAPL", + "0000789019": "MSFT", + "0001652044": "GOOGL", + "0001018724": "AMZN", + "0001318605": "TSLA", + "0001045810": "NVDA", + "0001326801": "META", + "0000019617": "JPM", + "0000078003": "PFE", + "0000021344": "KO", +} + +SEC_USER_AGENT = "TradingPlatform research@example.com" + + +class SecEdgarCollector(BaseCollector): + name = "sec_edgar" + poll_interval = 1800 # 30 
minutes
+
+    def __init__(self) -> None:
+        self._vader = SentimentIntensityAnalyzer()
+
+    async def is_available(self) -> bool:
+        return True
+
+    async def _fetch_recent_filings(self) -> list[dict]:
+        results = []
+        headers = {"User-Agent": SEC_USER_AGENT}
+        async with aiohttp.ClientSession() as session:
+            for cik, ticker in TRACKED_CIKS.items():
+                try:
+                    url = f"https://data.sec.gov/submissions/CIK{cik}.json"
+                    async with session.get(
+                        url, headers=headers, timeout=aiohttp.ClientTimeout(total=10)
+                    ) as resp:
+                        if resp.status == 200:
+                            data = await resp.json()
+                            data["tickers"] = [{"ticker": ticker}]
+                            results.append(data)
+                except Exception as exc:
+                    # This module's logger is a stdlib logging.Logger, which
+                    # rejects structlog-style keyword arguments (TypeError);
+                    # use lazy %-style args instead.
+                    logger.warning("sec_fetch_failed cik=%s error=%s", cik, exc)
+        return results
+
+    async def collect(self) -> list[NewsItem]:
+        filings_data = await self._fetch_recent_filings()
+        items = []
+        today = datetime.now(UTC).strftime("%Y-%m-%d")
+
+        for company_data in filings_data:
+            tickers = [t["ticker"] for t in company_data.get("tickers", [])]
+            company_name = company_data.get("name", "Unknown")
+            recent = company_data.get("filings", {}).get("recent", {})
+
+            forms = recent.get("form", [])
+            dates = recent.get("filingDate", [])
+            descriptions = recent.get("primaryDocDescription", [])
+            accessions = recent.get("accessionNumber", [])
+
+            for i, form in enumerate(forms):
+                if form != "8-K":
+                    continue
+                filing_date = dates[i] if i < len(dates) else ""
+                if filing_date != today:
+                    continue
+
+                desc = descriptions[i] if i < len(descriptions) else "8-K Filing"
+                accession = accessions[i] if i < len(accessions) else ""
+                headline = f"{company_name} ({', '.join(tickers)}): {form} - {desc}"
+
+                items.append(
+                    NewsItem(
+                        source=self.name,
+                        headline=headline,
+                        summary=desc,
+                        url=f"https://www.sec.gov/cgi-bin/browse-edgar?action=getcompany&accession={accession}",
+                        published_at=datetime.strptime(filing_date, "%Y-%m-%d").replace(tzinfo=UTC),
+                        symbols=tickers,
+                        sentiment=self._vader.polarity_scores(headline)["compound"],
category=NewsCategory.FILING, + raw_data={"form": form, "accession": accession}, + ) + ) + + return items diff --git a/services/news-collector/src/news_collector/collectors/truth_social.py b/services/news-collector/src/news_collector/collectors/truth_social.py new file mode 100644 index 0000000..e2acd88 --- /dev/null +++ b/services/news-collector/src/news_collector/collectors/truth_social.py @@ -0,0 +1,86 @@ +"""Truth Social collector using Mastodon-compatible API with VADER sentiment analysis.""" + +import logging +import re +from datetime import UTC, datetime + +import aiohttp +from nltk.sentiment.vader import SentimentIntensityAnalyzer + +from shared.models import NewsCategory, NewsItem + +from .base import BaseCollector + +logger = logging.getLogger(__name__) + +_TRUMP_ACCOUNT_ID = "107780257626128497" +_API_URL = f"https://truthsocial.com/api/v1/accounts/{_TRUMP_ACCOUNT_ID}/statuses" + +_HTML_TAG_PATTERN = re.compile(r"<[^>]+>") + + +def _strip_html(text: str) -> str: + return _HTML_TAG_PATTERN.sub("", text).strip() + + +class TruthSocialCollector(BaseCollector): + name: str = "truth_social" + poll_interval: int = 900 + + def __init__(self) -> None: + self._vader = SentimentIntensityAnalyzer() + + async def is_available(self) -> bool: + return True + + async def _fetch_posts(self) -> list[dict]: + headers = {"User-Agent": "TradingPlatform/1.0 (research@example.com)"} + try: + async with aiohttp.ClientSession() as session: + async with session.get( + _API_URL, headers=headers, timeout=aiohttp.ClientTimeout(total=10) + ) as resp: + if resp.status == 200: + return await resp.json() + except Exception as exc: + logger.error("Truth Social fetch failed: %s", exc) + return [] + + async def collect(self) -> list[NewsItem]: + try: + posts = await self._fetch_posts() + except Exception as exc: + logger.error("Truth Social collector error: %s", exc) + return [] + + items: list[NewsItem] = [] + + for post in posts: + raw_content = post.get("content", "") or "" + content = 
_strip_html(raw_content) + if not content: + continue + + sentiment = self._vader.polarity_scores(content)["compound"] + + created_at_str = post.get("created_at", "") + try: + published_at = datetime.fromisoformat(created_at_str.replace("Z", "+00:00")) + except Exception: + published_at = datetime.now(UTC) + + items.append( + NewsItem( + source=self.name, + headline=content[:200], + summary=content if len(content) > 200 else None, + url=post.get("url") or None, + published_at=published_at, + symbols=[], + sentiment=sentiment, + category=NewsCategory.POLICY, + raw_data=post, + ) + ) + + return items diff --git a/services/news-collector/src/news_collector/config.py b/services/news-collector/src/news_collector/config.py new file mode 100644 index 0000000..6e78eba --- /dev/null +++ b/services/news-collector/src/news_collector/config.py @@ -0,0 +1,7 @@ +"""News Collector configuration.""" + +from shared.config import Settings + + +class NewsCollectorConfig(Settings): + health_port: int = 8084 diff --git a/services/news-collector/src/news_collector/main.py b/services/news-collector/src/news_collector/main.py new file mode 100644 index 0000000..c39fa67 --- /dev/null +++ b/services/news-collector/src/news_collector/main.py @@ -0,0 +1,204 @@ +"""News Collector Service — fetches news from multiple sources and aggregates sentiment.""" + +import asyncio +from datetime import UTC, datetime + +import aiohttp + +from news_collector.collectors.fear_greed import FearGreedCollector +from news_collector.collectors.fed import FedCollector +from news_collector.collectors.finnhub import FinnhubCollector +from news_collector.collectors.reddit import RedditCollector +from news_collector.collectors.rss import RSSCollector +from news_collector.collectors.sec_edgar import SecEdgarCollector +from news_collector.collectors.truth_social import TruthSocialCollector +from news_collector.config import NewsCollectorConfig +from shared.broker import RedisBroker +from shared.db import Database +from 
shared.events import NewsEvent +from shared.healthcheck import HealthCheckServer +from shared.logging import setup_logging +from shared.metrics import ServiceMetrics +from shared.models import NewsItem +from shared.notifier import TelegramNotifier +from shared.sentiment import SentimentAggregator +from shared.sentiment_models import MarketSentiment +from shared.shutdown import GracefulShutdown + + +async def run_collector_once(collector, db: Database, broker: RedisBroker) -> int: + """Run a single collector, store results in DB, publish to Redis. + + Returns the number of items collected. + """ + items: list[NewsItem] = await collector.collect() + count = 0 + for item in items: + await db.insert_news_item(item) + event = NewsEvent(data=item) + stream = f"news.{item.category.value}" + await broker.publish(stream, event.to_dict()) + count += 1 + return count + + +async def run_collector_loop(collector, db: Database, broker: RedisBroker, log) -> None: + """Run a collector repeatedly on its configured poll_interval.""" + while True: + try: + count = await run_collector_once(collector, db, broker) + log.info( + "collector_ran", + collector=collector.name, + count=count, + ) + except (aiohttp.ClientError, ConnectionError, TimeoutError) as exc: + log.warning( + "collector_network_error", + collector=collector.name, + error=str(exc), + ) + except (ValueError, KeyError, TypeError) as exc: + log.warning( + "collector_parse_error", + collector=collector.name, + error=str(exc), + ) + await asyncio.sleep(collector.poll_interval) + + +async def run_fear_greed_loop(collector: FearGreedCollector, db: Database, log) -> None: + """Fetch Fear & Greed index on its interval and update MarketSentiment in DB.""" + while True: + try: + result = await collector.collect() + if result is not None: + ms = MarketSentiment( + fear_greed=result.fear_greed, + fear_greed_label=result.fear_greed_label, + vix=None, + fed_stance="neutral", + market_regime=_determine_regime(result.fear_greed, None), + 
updated_at=datetime.now(UTC), + ) + await db.upsert_market_sentiment(ms) + log.info( + "fear_greed_updated", + value=result.fear_greed, + label=result.fear_greed_label, + ) + except (aiohttp.ClientError, ConnectionError, TimeoutError) as exc: + log.warning("fear_greed_network_error", error=str(exc)) + except (ValueError, KeyError, TypeError) as exc: + log.warning("fear_greed_parse_error", error=str(exc)) + await asyncio.sleep(collector.poll_interval) + + +async def run_aggregator_loop(db: Database, interval: int, log) -> None: + """Run SentimentAggregator every interval seconds and persist scores.""" + aggregator = SentimentAggregator() + while True: + await asyncio.sleep(interval) + try: + now = datetime.now(UTC) + news_items = await db.get_recent_news(hours=24) + scores = aggregator.aggregate(news_items, now) + for score in scores.values(): + await db.upsert_symbol_score(score) + log.info("aggregation_complete", symbols=len(scores)) + except (ConnectionError, TimeoutError) as exc: + log.warning("aggregator_network_error", error=str(exc)) + except (ValueError, KeyError, TypeError) as exc: + log.warning("aggregator_parse_error", error=str(exc)) + + +def _determine_regime(fear_greed: int, vix: float | None) -> str: + """Classify market regime from fear/greed index and optional VIX.""" + aggregator = SentimentAggregator() + return aggregator.determine_regime(fear_greed, vix) + + +async def run() -> None: + config = NewsCollectorConfig() + log = setup_logging("news-collector", config.log_level, config.log_format) + metrics = ServiceMetrics("news_collector") + + notifier = TelegramNotifier( + bot_token=config.telegram_bot_token.get_secret_value(), + chat_id=config.telegram_chat_id, + ) + + db = Database(config.database_url.get_secret_value()) + await db.connect() + + broker = RedisBroker(config.redis_url.get_secret_value()) + + health = HealthCheckServer( + "news-collector", + port=config.health_port, + auth_token=config.metrics_auth_token, + ) + await health.start() + 
metrics.service_up.labels(service="news-collector").set(1)
+
+    # Build collectors
+    finnhub = FinnhubCollector(api_key=config.finnhub_api_key.get_secret_value())
+    rss = RSSCollector()
+    sec = SecEdgarCollector()
+    truth = TruthSocialCollector()
+    reddit = RedditCollector()
+    fear_greed = FearGreedCollector()
+    fed = FedCollector()
+
+    news_collectors = [finnhub, rss, sec, truth, reddit, fed]
+
+    shutdown = GracefulShutdown()
+    shutdown.install_handlers()
+
+    log.info(
+        "starting",
+        collectors=[c.name for c in news_collectors],
+        poll_interval=config.news_poll_interval,
+        aggregate_interval=config.sentiment_aggregate_interval,
+    )
+
+    # Pre-bind so the finally block cannot hit NameError if task creation
+    # fails partway (the original bound `tasks` only inside the try body).
+    tasks: list[asyncio.Task] = []
+    try:
+        tasks = [
+            asyncio.create_task(
+                run_collector_loop(collector, db, broker, log),
+                name=f"collector-{collector.name}",
+            )
+            for collector in news_collectors
+        ]
+        tasks.append(
+            asyncio.create_task(
+                run_fear_greed_loop(fear_greed, db, log),
+                name="fear-greed-loop",
+            )
+        )
+        tasks.append(
+            asyncio.create_task(
+                run_aggregator_loop(db, config.sentiment_aggregate_interval, log),
+                name="aggregator-loop",
+            )
+        )
+        await shutdown.wait()
+    except Exception as exc:
+        log.error("fatal_error", error=str(exc), exc_info=True)
+        await notifier.send_error(str(exc), "news-collector")
+        raise
+    finally:
+        metrics.service_up.labels(service="news-collector").set(0)
+        for task in tasks:
+            task.cancel()
+        # Drain cancellations before closing shared resources so in-flight
+        # loops cannot touch a closed broker/db.
+        await asyncio.gather(*tasks, return_exceptions=True)
+        await notifier.close()
+        await broker.close()
+        await db.close()
+
+
+def main() -> None:
+    asyncio.run(run())
+
+
+if __name__ == "__main__":
+    main()
