"""Truth Social collector using Mastodon-compatible API with VADER sentiment analysis."""

import logging
import re
from datetime import UTC, datetime

import aiohttp
from nltk.sentiment.vader import SentimentIntensityAnalyzer

from shared.models import NewsCategory, NewsItem

from .base import BaseCollector

logger = logging.getLogger(__name__)

# Account id polled on Truth Social (Mastodon-compatible statuses endpoint).
_TRUMP_ACCOUNT_ID = "107780257626128497"
_API_URL = f"https://truthsocial.com/api/v1/accounts/{_TRUMP_ACCOUNT_ID}/statuses"

# Statuses arrive as HTML fragments; this strips any <tag> to recover plain text.
_HTML_TAG_PATTERN = re.compile(r"<[^>]+>")


def _strip_html(text: str) -> str:
    """Return *text* with HTML tags removed and surrounding whitespace stripped."""
    return _HTML_TAG_PATTERN.sub("", text).strip()


class TruthSocialCollector(BaseCollector):
    """Collects recent Truth Social statuses and scores each with VADER sentiment."""

    name: str = "truth_social"
    poll_interval: int = 900  # seconds between polls

    def __init__(self) -> None:
        self._vader = SentimentIntensityAnalyzer()

    async def is_available(self) -> bool:
        """Always available: the statuses endpoint requires no credentials."""
        return True

    async def _fetch_posts(self) -> list[dict]:
        """Fetch the account's recent statuses.

        Returns:
            The decoded JSON list of status dicts, or ``[]`` on any failure
            (network error, timeout, non-200 status, or a non-list payload).
        """
        headers = {"User-Agent": "TradingPlatform/1.0 (research@example.com)"}
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(
                    _API_URL, headers=headers, timeout=aiohttp.ClientTimeout(total=10)
                ) as resp:
                    if resp.status == 200:
                        data = await resp.json()
                        # API error payloads are dicts, not lists; returning one
                        # unchecked would crash collect() at post.get().
                        return data if isinstance(data, list) else []
                    # Previously non-200 responses were dropped silently.
                    logger.warning("Truth Social returned HTTP %s", resp.status)
        except Exception as exc:  # best-effort collector: log and degrade to []
            logger.error("Truth Social fetch failed: %s", exc)
        return []

    async def collect(self) -> list[NewsItem]:
        """Build a NewsItem for every non-empty recent post.

        Returns:
            NewsItems with VADER compound sentiment, POLICY category, and the
            raw status dict attached; ``[]`` when fetching fails.
        """
        try:
            posts = await self._fetch_posts()
        except Exception as exc:
            logger.error("Truth Social collector error: %s", exc)
            return []

        items: list[NewsItem] = []
        for post in posts:
            raw_content = post.get("content", "") or ""
            content = _strip_html(raw_content)
            if not content:
                continue  # media-only/empty posts carry no text to score

            sentiment = self._vader.polarity_scores(content)["compound"]

            created_at_str = post.get("created_at", "")
            try:
                # "Z" -> "+00:00" so fromisoformat accepts the suffix on
                # Python versions before 3.11.
                published_at = datetime.fromisoformat(created_at_str.replace("Z", "+00:00"))
            except (ValueError, AttributeError):
                # Missing or malformed timestamp: fall back to "now" in UTC.
                published_at = datetime.now(UTC)

            items.append(
                NewsItem(
                    source=self.name,
                    headline=content[:200],
                    # Full text only when the headline was truncated.
                    summary=content if len(content) > 200 else None,
                    url=post.get("url") or None,
                    published_at=published_at,
                    symbols=[],
                    sentiment=sentiment,
                    category=NewsCategory.POLICY,
                    raw_data=post,
                )
            )
        return items