summaryrefslogtreecommitdiff
path: root/services/news-collector
diff options
context:
space:
mode:
Diffstat (limited to 'services/news-collector')
-rw-r--r--services/news-collector/src/news_collector/collectors/fed.py104
-rw-r--r--services/news-collector/src/news_collector/collectors/truth_social.py84
-rw-r--r--services/news-collector/tests/test_fed.py25
-rw-r--r--services/news-collector/tests/test_truth_social.py30
4 files changed, 243 insertions, 0 deletions
diff --git a/services/news-collector/src/news_collector/collectors/fed.py b/services/news-collector/src/news_collector/collectors/fed.py
new file mode 100644
index 0000000..47b70f5
--- /dev/null
+++ b/services/news-collector/src/news_collector/collectors/fed.py
@@ -0,0 +1,104 @@
+"""Federal Reserve RSS collector with hawkish/dovish/neutral stance detection."""
+
+import asyncio
+import logging
+from calendar import timegm
+from datetime import datetime, timezone
+
+import feedparser
+from nltk.sentiment.vader import SentimentIntensityAnalyzer
+
+from shared.models import NewsCategory, NewsItem
+
+from .base import BaseCollector
+
+logger = logging.getLogger(__name__)
+
+_FED_RSS_URL = "https://www.federalreserve.gov/feeds/press_all.xml"
+
+_HAWKISH_KEYWORDS = [
+ "rate hike", "interest rate increase", "tighten", "tightening", "inflation",
+ "hawkish", "restrictive", "raise rates", "hike rates",
+]
+_DOVISH_KEYWORDS = [
+ "rate cut", "interest rate decrease", "easing", "ease", "stimulus",
+ "dovish", "accommodative", "lower rates", "cut rates", "quantitative easing",
+]
+
+
+def _detect_stance(text: str) -> str:
+ lower = text.lower()
+ hawkish_hits = sum(1 for kw in _HAWKISH_KEYWORDS if kw in lower)
+ dovish_hits = sum(1 for kw in _DOVISH_KEYWORDS if kw in lower)
+ if hawkish_hits > dovish_hits:
+ return "hawkish"
+ if dovish_hits > hawkish_hits:
+ return "dovish"
+ return "neutral"
+
+
+class FedCollector(BaseCollector):
+ name: str = "fed"
+ poll_interval: int = 3600
+
+ def __init__(self) -> None:
+ self._vader = SentimentIntensityAnalyzer()
+
+ async def is_available(self) -> bool:
+ return True
+
+ async def _fetch_fed_rss(self) -> list[dict]:
+ loop = asyncio.get_event_loop()
+ try:
+ parsed = await loop.run_in_executor(None, feedparser.parse, _FED_RSS_URL)
+ return parsed.get("entries", [])
+ except Exception as exc:
+ logger.error("Fed RSS fetch failed: %s", exc)
+ return []
+
+ def _parse_published(self, entry: dict) -> datetime:
+ published_parsed = entry.get("published_parsed")
+ if published_parsed:
+ try:
+ ts = timegm(published_parsed)
+ return datetime.fromtimestamp(ts, tz=timezone.utc)
+ except Exception:
+ pass
+ return datetime.now(timezone.utc)
+
+ async def collect(self) -> list[NewsItem]:
+ try:
+ entries = await self._fetch_fed_rss()
+ except Exception as exc:
+ logger.error("Fed collector error: %s", exc)
+ return []
+
+ items: list[NewsItem] = []
+
+ for entry in entries:
+ title = entry.get("title", "").strip()
+ if not title:
+ continue
+
+ summary = entry.get("summary", "") or ""
+ combined = f"{title} {summary}"
+
+ sentiment = self._vader.polarity_scores(combined)["compound"]
+ stance = _detect_stance(combined)
+ published_at = self._parse_published(entry)
+
+ items.append(
+ NewsItem(
+ source=self.name,
+ headline=title,
+ summary=summary or None,
+ url=entry.get("link") or None,
+ published_at=published_at,
+ symbols=[],
+ sentiment=sentiment,
+ category=NewsCategory.FED,
+ raw_data={"stance": stance, **dict(entry)},
+ )
+ )
+
+ return items
diff --git a/services/news-collector/src/news_collector/collectors/truth_social.py b/services/news-collector/src/news_collector/collectors/truth_social.py
new file mode 100644
index 0000000..2205257
--- /dev/null
+++ b/services/news-collector/src/news_collector/collectors/truth_social.py
@@ -0,0 +1,84 @@
+"""Truth Social collector using Mastodon-compatible API with VADER sentiment analysis."""
+
+import logging
+import re
+from datetime import datetime, timezone
+
+import aiohttp
+from nltk.sentiment.vader import SentimentIntensityAnalyzer
+
+from shared.models import NewsCategory, NewsItem
+
+from .base import BaseCollector
+
+logger = logging.getLogger(__name__)
+
+_TRUMP_ACCOUNT_ID = "107780257626128497"
+_API_URL = f"https://truthsocial.com/api/v1/accounts/{_TRUMP_ACCOUNT_ID}/statuses"
+
+_HTML_TAG_PATTERN = re.compile(r"<[^>]+>")
+
+
+def _strip_html(text: str) -> str:
+ return _HTML_TAG_PATTERN.sub("", text).strip()
+
+
+class TruthSocialCollector(BaseCollector):
+ name: str = "truth_social"
+ poll_interval: int = 900
+
+ def __init__(self) -> None:
+ self._vader = SentimentIntensityAnalyzer()
+
+ async def is_available(self) -> bool:
+ return True
+
+ async def _fetch_posts(self) -> list[dict]:
+ headers = {"User-Agent": "TradingPlatform/1.0 (research@example.com)"}
+ try:
+ async with aiohttp.ClientSession() as session:
+ async with session.get(_API_URL, headers=headers, timeout=aiohttp.ClientTimeout(total=10)) as resp:
+ if resp.status == 200:
+ return await resp.json()
+ except Exception as exc:
+ logger.error("Truth Social fetch failed: %s", exc)
+ return []
+
+ async def collect(self) -> list[NewsItem]:
+ try:
+ posts = await self._fetch_posts()
+ except Exception as exc:
+ logger.error("Truth Social collector error: %s", exc)
+ return []
+
+ items: list[NewsItem] = []
+
+ for post in posts:
+ raw_content = post.get("content", "") or ""
+ content = _strip_html(raw_content)
+ if not content:
+ continue
+
+ sentiment = self._vader.polarity_scores(content)["compound"]
+
+ created_at_str = post.get("created_at", "")
+ try:
+ published_at = datetime.fromisoformat(created_at_str.replace("Z", "+00:00"))
+ except Exception:
+ published_at = datetime.now(timezone.utc)
+
+ items.append(
+ NewsItem(
+ source=self.name,
+ headline=content[:200],
+ summary=content if len(content) > 200 else None,
+ url=post.get("url") or None,
+ published_at=published_at,
+ symbols=[],
+ sentiment=sentiment,
+ category=NewsCategory.POLICY,
+ raw_data=post,
+ )
+ )
+
+ return items
diff --git a/services/news-collector/tests/test_fed.py b/services/news-collector/tests/test_fed.py
new file mode 100644
index 0000000..8acea5f
--- /dev/null
+++ b/services/news-collector/tests/test_fed.py
@@ -0,0 +1,25 @@
+"""Tests for Federal Reserve collector."""
+import pytest
+from unittest.mock import AsyncMock, patch
+from news_collector.collectors.fed import FedCollector
+
+@pytest.fixture
+def collector():
+ return FedCollector()
+
+def test_collector_name(collector):
+ assert collector.name == "fed"
+ assert collector.poll_interval == 3600
+
+async def test_is_available(collector):
+ assert await collector.is_available() is True
+
+async def test_collect_parses_rss(collector):
+ mock_entries = [
+ {"title": "Federal Reserve issues FOMC statement", "link": "https://www.federalreserve.gov/newsevents/pressreleases/monetary20260402a.htm", "published_parsed": (2026, 4, 2, 14, 0, 0, 0, 0, 0), "summary": "The Federal Open Market Committee decided to maintain the target range..."},
+ ]
+ with patch.object(collector, "_fetch_fed_rss", new_callable=AsyncMock, return_value=mock_entries):
+ items = await collector.collect()
+ assert len(items) == 1
+ assert items[0].source == "fed"
+ assert items[0].category.value == "fed"
diff --git a/services/news-collector/tests/test_truth_social.py b/services/news-collector/tests/test_truth_social.py
new file mode 100644
index 0000000..bcf8a8c
--- /dev/null
+++ b/services/news-collector/tests/test_truth_social.py
@@ -0,0 +1,30 @@
+"""Tests for Truth Social collector."""
+import pytest
+from unittest.mock import AsyncMock, patch
+from news_collector.collectors.truth_social import TruthSocialCollector
+
+@pytest.fixture
+def collector():
+ return TruthSocialCollector()
+
+def test_collector_name(collector):
+ assert collector.name == "truth_social"
+ assert collector.poll_interval == 900
+
+async def test_is_available(collector):
+ assert await collector.is_available() is True
+
+async def test_collect_parses_posts(collector):
+ mock_posts = [
+ {"content": "<p>We are imposing 25% tariffs on all steel imports!</p>", "created_at": "2026-04-02T12:00:00.000Z", "url": "https://truthsocial.com/@realDonaldTrump/12345", "id": "12345"},
+ ]
+ with patch.object(collector, "_fetch_posts", new_callable=AsyncMock, return_value=mock_posts):
+ items = await collector.collect()
+ assert len(items) == 1
+ assert items[0].source == "truth_social"
+ assert items[0].category.value == "policy"
+
+async def test_collect_handles_empty(collector):
+ with patch.object(collector, "_fetch_posts", new_callable=AsyncMock, return_value=[]):
+ items = await collector.collect()
+ assert items == []