summaryrefslogtreecommitdiff
path: root/services/news-collector/src/news_collector/collectors/truth_social.py
blob: e2acd886981f4a16951630322b0cea5884ba2b74 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
"""Truth Social collector using Mastodon-compatible API with VADER sentiment analysis."""

import html
import logging
import re
from datetime import UTC, datetime

import aiohttp
from nltk.sentiment.vader import SentimentIntensityAnalyzer

from shared.models import NewsCategory, NewsItem

from .base import BaseCollector

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Id of the account whose statuses are polled. Presumably Donald Trump's
# Truth Social account, going by the constant name — verify before relying on it.
_TRUMP_ACCOUNT_ID = "107780257626128497"
# Statuses endpoint for the account above, built once at import time.
_API_URL = f"https://truthsocial.com/api/v1/accounts/{_TRUMP_ACCOUNT_ID}/statuses"

_HTML_TAG_PATTERN = re.compile(r"<[^>]+>")


def _strip_html(text: str) -> str:
    return _HTML_TAG_PATTERN.sub("", text).strip()


class TruthSocialCollector(BaseCollector):
    """Collect posts from a Truth Social account and score them with VADER.

    Posts are fetched from the statuses endpoint in ``_API_URL``, reduced to
    plain text with ``_strip_html`` and converted into ``NewsItem`` objects
    that carry VADER's compound sentiment score.
    """

    name: str = "truth_social"
    # Presumably the number of seconds between polls — confirm against BaseCollector.
    poll_interval: int = 900

    def __init__(self) -> None:
        self._vader = SentimentIntensityAnalyzer()

    async def is_available(self) -> bool:
        """Always report the collector as available; nothing is probed."""
        return True

    async def _fetch_posts(self) -> list[dict]:
        """Fetch the account's statuses.

        Returns:
            The decoded status objects, or an empty list when the request
            fails, the server answers with a non-200 status, or the payload
            is not a JSON array. Entries that are not JSON objects are
            dropped. Failures are logged and never raised.
        """
        headers = {"User-Agent": "TradingPlatform/1.0 (research@example.com)"}
        timeout = aiohttp.ClientTimeout(total=10)
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(_API_URL, headers=headers, timeout=timeout) as resp:
                    if resp.status != 200:
                        logger.warning("Truth Social fetch returned HTTP %s", resp.status)
                        return []
                    payload = await resp.json()
        except Exception as exc:
            # Best-effort collector: network, timeout and decoding errors all
            # degrade to "no posts this cycle".
            logger.error("Truth Social fetch failed: %s", exc)
            return []

        if not isinstance(payload, list):
            # An error body is typically a JSON object; iterating it would
            # yield its keys instead of posts.
            logger.warning(
                "Truth Social returned unexpected payload type: %s",
                type(payload).__name__,
            )
            return []
        return [post for post in payload if isinstance(post, dict)]

    @staticmethod
    def _parse_published_at(value: object) -> datetime:
        """Parse an ISO-8601 timestamp into a timezone-aware datetime.

        A trailing ``Z`` is accepted as UTC. A timestamp without an offset is
        treated as UTC so the result is never naive. Missing, non-string or
        unparseable values fall back to the current UTC time.
        """
        if isinstance(value, str):
            try:
                parsed = datetime.fromisoformat(value.replace("Z", "+00:00"))
            except ValueError:
                pass
            else:
                if parsed.tzinfo is None:
                    return parsed.replace(tzinfo=UTC)
                return parsed
        return datetime.now(UTC)

    async def collect(self) -> list[NewsItem]:
        """Fetch the latest posts and convert them into ``NewsItem`` objects.

        Posts whose content is missing, not a string, or empty after HTML
        stripping are skipped.

        Returns:
            One ``NewsItem`` per usable post; an empty list when fetching fails.
        """
        try:
            posts = await self._fetch_posts()
        except Exception as exc:
            # _fetch_posts already handles its own errors; this guards
            # against overrides that do not.
            logger.error("Truth Social collector error: %s", exc)
            return []

        items: list[NewsItem] = []

        for post in posts:
            if not isinstance(post, dict):
                continue

            raw_content = post.get("content")
            if not isinstance(raw_content, str):
                continue
            content = _strip_html(raw_content)
            if not content:
                continue

            sentiment = self._vader.polarity_scores(content)["compound"]

            items.append(
                NewsItem(
                    source=self.name,
                    # The headline is capped at 200 characters; the full text
                    # is kept in the summary only when it was truncated.
                    headline=content[:200],
                    summary=content if len(content) > 200 else None,
                    url=post.get("url") or None,
                    published_at=self._parse_published_at(post.get("created_at")),
                    symbols=[],
                    sentiment=sentiment,
                    category=NewsCategory.POLICY,
                    raw_data=post,
                )
            )

        return items