1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
|
"""Finnhub news collector with VADER sentiment analysis."""
import logging
from datetime import datetime, timezone
import aiohttp
from nltk.sentiment.vader import SentimentIntensityAnalyzer
from shared.models import NewsCategory, NewsItem
from .base import BaseCollector
logger = logging.getLogger(__name__)
# Keyword lists used to bucket headlines into coarse categories.
# Insertion order matters: _categorize returns the FIRST category whose
# keyword list matches, so earlier entries take priority over later ones.
_CATEGORY_KEYWORDS: dict[NewsCategory, list[str]] = {
NewsCategory.FED: ["fed", "fomc", "rate", "federal reserve"],
NewsCategory.POLICY: ["tariff", "trump", "regulation", "policy", "trade war"],
NewsCategory.EARNINGS: ["earnings", "revenue", "profit", "eps", "guidance", "quarter"],
}
def _categorize(text: str) -> NewsCategory:
    """Assign a coarse category by case-insensitive keyword match.

    Categories are tried in _CATEGORY_KEYWORDS insertion order; the first
    one with any keyword present in *text* wins. Falls back to MACRO when
    nothing matches.
    """
    haystack = text.lower()
    match = next(
        (
            category
            for category, keywords in _CATEGORY_KEYWORDS.items()
            if any(keyword in haystack for keyword in keywords)
        ),
        None,
    )
    return NewsCategory.MACRO if match is None else match
class FinnhubCollector(BaseCollector):
    """Collects general-market news from Finnhub and scores it with VADER.

    Each collect() call fetches the ``general`` news feed, attaches a VADER
    compound sentiment score and a keyword-derived category to every article,
    and returns normalized NewsItem records. Fetch failures are logged and
    yield an empty list (best-effort collector).
    """

    name: str = "finnhub"
    poll_interval: int = 300  # seconds between polls
    _BASE_URL = "https://finnhub.io/api/v1/news"

    def __init__(self, api_key: str) -> None:
        self._api_key = api_key
        # Reused across collect() calls. NOTE(review): requires the nltk
        # "vader_lexicon" resource to be available — confirm it is downloaded
        # at deploy time.
        self._vader = SentimentIntensityAnalyzer()

    async def is_available(self) -> bool:
        """Usable whenever a non-empty API key was supplied."""
        return bool(self._api_key)

    async def _fetch_news(self) -> list[dict]:
        """Fetch the raw 'general' news feed from Finnhub.

        Raises:
            aiohttp.ClientResponseError: on HTTP error statuses.
        """
        # Pass the token via `params` instead of interpolating it into the
        # URL: it gets URL-encoded correctly and is less likely to leak into
        # logs that record the request URL string.
        params = {"category": "general", "token": self._api_key}
        async with aiohttp.ClientSession() as session:
            async with session.get(self._BASE_URL, params=params) as resp:
                resp.raise_for_status()
                return await resp.json()

    async def collect(self) -> list[NewsItem]:
        """Fetch, score, and normalize news; returns [] on any fetch failure."""
        try:
            raw_items = await self._fetch_news()
        except Exception:
            # Best-effort: keep the traceback in the log, but never let a
            # transient network/API error propagate to the scheduler.
            logger.exception("Finnhub fetch failed")
            return []
        # Finnhub returns a JSON object (e.g. an error body) for some
        # failure modes; iterating it as a list would be nonsense.
        if not isinstance(raw_items, list):
            logger.warning(
                "Unexpected Finnhub payload type: %s", type(raw_items).__name__
            )
            return []
        items: list[NewsItem] = []
        for article in raw_items:
            headline = article.get("headline", "")
            summary = article.get("summary", "")
            combined = f"{headline} {summary}"
            # VADER "compound" is a normalized score in [-1, 1].
            sentiment = self._vader.polarity_scores(combined)["compound"]
            # `datetime` is a Unix timestamp in seconds; `or 0` also covers a
            # present-but-null value, falling back to the epoch.
            ts = article.get("datetime") or 0
            published_at = datetime.fromtimestamp(ts, tz=timezone.utc)
            # `related` is a comma-separated ticker string, e.g. "AAPL,MSFT";
            # `or ""` guards against an explicit null value.
            related = article.get("related") or ""
            symbols = [t.strip() for t in related.split(",") if t.strip()]
            items.append(
                NewsItem(
                    source=self.name,
                    headline=headline,
                    summary=summary or None,
                    url=article.get("url") or None,
                    published_at=published_at,
                    symbols=symbols,
                    sentiment=sentiment,
                    category=_categorize(combined),
                    raw_data=article,
                )
            )
        return items
|