summary | refs | log | tree | commit | diff
path: root/services/news-collector
diff options
context:
space:
mode:
Diffstat (limited to 'services/news-collector')
-rw-r--r-- services/news-collector/Dockerfile | 12
-rw-r--r-- services/news-collector/pyproject.toml | 7
-rw-r--r-- services/news-collector/src/news_collector/collectors/fear_greed.py | 5
-rw-r--r-- services/news-collector/src/news_collector/collectors/fed.py | 6
-rw-r--r-- services/news-collector/src/news_collector/collectors/finnhub.py | 4
-rw-r--r-- services/news-collector/src/news_collector/collectors/reddit.py | 4
-rw-r--r-- services/news-collector/src/news_collector/collectors/rss.py | 6
-rw-r--r-- services/news-collector/src/news_collector/collectors/sec_edgar.py | 10
-rw-r--r-- services/news-collector/src/news_collector/collectors/truth_social.py | 4
-rw-r--r-- services/news-collector/src/news_collector/config.py | 3
-rw-r--r-- services/news-collector/src/news_collector/main.py | 81
-rw-r--r-- services/news-collector/tests/test_fear_greed.py | 2
-rw-r--r-- services/news-collector/tests/test_fed.py | 3
-rw-r--r-- services/news-collector/tests/test_finnhub.py | 2
-rw-r--r-- services/news-collector/tests/test_main.py | 8
-rw-r--r-- services/news-collector/tests/test_reddit.py | 3
-rw-r--r-- services/news-collector/tests/test_rss.py | 2
-rw-r--r-- services/news-collector/tests/test_sec_edgar.py | 8
-rw-r--r-- services/news-collector/tests/test_truth_social.py | 3
19 files changed, 93 insertions, 80 deletions
diff --git a/services/news-collector/Dockerfile b/services/news-collector/Dockerfile
index a8e5902..7accee2 100644
--- a/services/news-collector/Dockerfile
+++ b/services/news-collector/Dockerfile
@@ -1,9 +1,17 @@
-FROM python:3.12-slim
+FROM python:3.12-slim AS builder
WORKDIR /app
COPY shared/ shared/
RUN pip install --no-cache-dir ./shared
COPY services/news-collector/ services/news-collector/
RUN pip install --no-cache-dir ./services/news-collector
-RUN python -c "import nltk; nltk.download('vader_lexicon', quiet=True)"
+RUN python -c "import nltk; nltk.download('vader_lexicon', download_dir='/usr/local/nltk_data')"
+
+FROM python:3.12-slim
+RUN useradd -r -s /bin/false appuser
+WORKDIR /app
+COPY --from=builder /usr/local/lib/python3.12/site-packages /usr/local/lib/python3.12/site-packages
+COPY --from=builder /usr/local/bin /usr/local/bin
+COPY --from=builder /usr/local/nltk_data /usr/local/nltk_data
ENV PYTHONPATH=/app
+USER appuser
CMD ["python", "-m", "news_collector.main"]
diff --git a/services/news-collector/pyproject.toml b/services/news-collector/pyproject.toml
index 14c856a..6e62b70 100644
--- a/services/news-collector/pyproject.toml
+++ b/services/news-collector/pyproject.toml
@@ -3,12 +3,7 @@ name = "news-collector"
version = "0.1.0"
description = "News and sentiment data collector service"
requires-python = ">=3.12"
-dependencies = [
- "trading-shared",
- "feedparser>=6.0",
- "nltk>=3.8",
- "aiohttp>=3.9",
-]
+dependencies = ["trading-shared", "feedparser>=6.0,<7", "nltk>=3.8,<4", "aiohttp>=3.9,<4"]
[project.optional-dependencies]
dev = [
diff --git a/services/news-collector/src/news_collector/collectors/fear_greed.py b/services/news-collector/src/news_collector/collectors/fear_greed.py
index f79f716..42e8f88 100644
--- a/services/news-collector/src/news_collector/collectors/fear_greed.py
+++ b/services/news-collector/src/news_collector/collectors/fear_greed.py
@@ -2,7 +2,6 @@
import logging
from dataclasses import dataclass
-from typing import Optional
import aiohttp
@@ -26,7 +25,7 @@ class FearGreedCollector(BaseCollector):
async def is_available(self) -> bool:
return True
- async def _fetch_index(self) -> Optional[dict]:
+ async def _fetch_index(self) -> dict | None:
headers = {"User-Agent": "Mozilla/5.0"}
try:
async with aiohttp.ClientSession() as session:
@@ -50,7 +49,7 @@ class FearGreedCollector(BaseCollector):
return "Greed"
return "Extreme Greed"
- async def collect(self) -> Optional[FearGreedResult]:
+ async def collect(self) -> FearGreedResult | None:
data = await self._fetch_index()
if data is None:
return None
diff --git a/services/news-collector/src/news_collector/collectors/fed.py b/services/news-collector/src/news_collector/collectors/fed.py
index fce4842..52128e5 100644
--- a/services/news-collector/src/news_collector/collectors/fed.py
+++ b/services/news-collector/src/news_collector/collectors/fed.py
@@ -3,7 +3,7 @@
import asyncio
import logging
from calendar import timegm
-from datetime import datetime, timezone
+from datetime import UTC, datetime
import feedparser
from nltk.sentiment.vader import SentimentIntensityAnalyzer
@@ -76,10 +76,10 @@ class FedCollector(BaseCollector):
if published_parsed:
try:
ts = timegm(published_parsed)
- return datetime.fromtimestamp(ts, tz=timezone.utc)
+ return datetime.fromtimestamp(ts, tz=UTC)
except Exception:
pass
- return datetime.now(timezone.utc)
+ return datetime.now(UTC)
async def collect(self) -> list[NewsItem]:
try:
diff --git a/services/news-collector/src/news_collector/collectors/finnhub.py b/services/news-collector/src/news_collector/collectors/finnhub.py
index 13e3602..67cb455 100644
--- a/services/news-collector/src/news_collector/collectors/finnhub.py
+++ b/services/news-collector/src/news_collector/collectors/finnhub.py
@@ -1,7 +1,7 @@
"""Finnhub news collector with VADER sentiment analysis."""
import logging
-from datetime import datetime, timezone
+from datetime import UTC, datetime
import aiohttp
from nltk.sentiment.vader import SentimentIntensityAnalyzer
@@ -64,7 +64,7 @@ class FinnhubCollector(BaseCollector):
sentiment = sentiment_scores["compound"]
ts = article.get("datetime", 0)
- published_at = datetime.fromtimestamp(ts, tz=timezone.utc)
+ published_at = datetime.fromtimestamp(ts, tz=UTC)
related = article.get("related", "")
symbols = [t.strip() for t in related.split(",") if t.strip()] if related else []
diff --git a/services/news-collector/src/news_collector/collectors/reddit.py b/services/news-collector/src/news_collector/collectors/reddit.py
index 226a2f9..4e9d6f5 100644
--- a/services/news-collector/src/news_collector/collectors/reddit.py
+++ b/services/news-collector/src/news_collector/collectors/reddit.py
@@ -2,7 +2,7 @@
import logging
import re
-from datetime import datetime, timezone
+from datetime import UTC, datetime
import aiohttp
from nltk.sentiment.vader import SentimentIntensityAnalyzer
@@ -78,7 +78,7 @@ class RedditCollector(BaseCollector):
symbols = list(dict.fromkeys(_TICKER_PATTERN.findall(combined)))
created_utc = post_data.get("created_utc", 0)
- published_at = datetime.fromtimestamp(created_utc, tz=timezone.utc)
+ published_at = datetime.fromtimestamp(created_utc, tz=UTC)
items.append(
NewsItem(
diff --git a/services/news-collector/src/news_collector/collectors/rss.py b/services/news-collector/src/news_collector/collectors/rss.py
index ddf8503..bca0e9f 100644
--- a/services/news-collector/src/news_collector/collectors/rss.py
+++ b/services/news-collector/src/news_collector/collectors/rss.py
@@ -3,7 +3,7 @@
import asyncio
import logging
import re
-from datetime import datetime, timezone
+from datetime import UTC, datetime
from time import mktime
import feedparser
@@ -56,10 +56,10 @@ class RSSCollector(BaseCollector):
if parsed_time:
try:
ts = mktime(parsed_time)
- return datetime.fromtimestamp(ts, tz=timezone.utc)
+ return datetime.fromtimestamp(ts, tz=UTC)
except Exception:
pass
- return datetime.now(timezone.utc)
+ return datetime.now(UTC)
async def collect(self) -> list[NewsItem]:
try:
diff --git a/services/news-collector/src/news_collector/collectors/sec_edgar.py b/services/news-collector/src/news_collector/collectors/sec_edgar.py
index ca1d070..d88518f 100644
--- a/services/news-collector/src/news_collector/collectors/sec_edgar.py
+++ b/services/news-collector/src/news_collector/collectors/sec_edgar.py
@@ -1,13 +1,13 @@
"""SEC EDGAR filing collector (free, no API key required)."""
import logging
-from datetime import datetime, timezone
+from datetime import UTC, datetime
import aiohttp
from nltk.sentiment.vader import SentimentIntensityAnalyzer
-from shared.models import NewsCategory, NewsItem
from news_collector.collectors.base import BaseCollector
+from shared.models import NewsCategory, NewsItem
logger = logging.getLogger(__name__)
@@ -58,7 +58,7 @@ class SecEdgarCollector(BaseCollector):
async def collect(self) -> list[NewsItem]:
filings_data = await self._fetch_recent_filings()
items = []
- today = datetime.now(timezone.utc).strftime("%Y-%m-%d")
+ today = datetime.now(UTC).strftime("%Y-%m-%d")
for company_data in filings_data:
tickers = [t["ticker"] for t in company_data.get("tickers", [])]
@@ -87,9 +87,7 @@ class SecEdgarCollector(BaseCollector):
headline=headline,
summary=desc,
url=f"https://www.sec.gov/cgi-bin/browse-edgar?action=getcompany&accession={accession}",
- published_at=datetime.strptime(filing_date, "%Y-%m-%d").replace(
- tzinfo=timezone.utc
- ),
+ published_at=datetime.strptime(filing_date, "%Y-%m-%d").replace(tzinfo=UTC),
symbols=tickers,
sentiment=self._vader.polarity_scores(headline)["compound"],
category=NewsCategory.FILING,
diff --git a/services/news-collector/src/news_collector/collectors/truth_social.py b/services/news-collector/src/news_collector/collectors/truth_social.py
index 33ebc86..e2acd88 100644
--- a/services/news-collector/src/news_collector/collectors/truth_social.py
+++ b/services/news-collector/src/news_collector/collectors/truth_social.py
@@ -2,7 +2,7 @@
import logging
import re
-from datetime import datetime, timezone
+from datetime import UTC, datetime
import aiohttp
from nltk.sentiment.vader import SentimentIntensityAnalyzer
@@ -67,7 +67,7 @@ class TruthSocialCollector(BaseCollector):
try:
published_at = datetime.fromisoformat(created_at_str.replace("Z", "+00:00"))
except Exception:
- published_at = datetime.now(timezone.utc)
+ published_at = datetime.now(UTC)
items.append(
NewsItem(
diff --git a/services/news-collector/src/news_collector/config.py b/services/news-collector/src/news_collector/config.py
index 70d98f1..6e78eba 100644
--- a/services/news-collector/src/news_collector/config.py
+++ b/services/news-collector/src/news_collector/config.py
@@ -5,6 +5,3 @@ from shared.config import Settings
class NewsCollectorConfig(Settings):
health_port: int = 8084
- finnhub_api_key: str = ""
- news_poll_interval: int = 300
- sentiment_aggregate_interval: int = 900
diff --git a/services/news-collector/src/news_collector/main.py b/services/news-collector/src/news_collector/main.py
index 3493f7c..c39fa67 100644
--- a/services/news-collector/src/news_collector/main.py
+++ b/services/news-collector/src/news_collector/main.py
@@ -1,8 +1,18 @@
"""News Collector Service — fetches news from multiple sources and aggregates sentiment."""
import asyncio
-from datetime import datetime, timezone
+from datetime import UTC, datetime
+import aiohttp
+
+from news_collector.collectors.fear_greed import FearGreedCollector
+from news_collector.collectors.fed import FedCollector
+from news_collector.collectors.finnhub import FinnhubCollector
+from news_collector.collectors.reddit import RedditCollector
+from news_collector.collectors.rss import RSSCollector
+from news_collector.collectors.sec_edgar import SecEdgarCollector
+from news_collector.collectors.truth_social import TruthSocialCollector
+from news_collector.config import NewsCollectorConfig
from shared.broker import RedisBroker
from shared.db import Database
from shared.events import NewsEvent
@@ -11,20 +21,9 @@ from shared.logging import setup_logging
from shared.metrics import ServiceMetrics
from shared.models import NewsItem
from shared.notifier import TelegramNotifier
-from shared.sentiment_models import MarketSentiment
from shared.sentiment import SentimentAggregator
-
-from news_collector.config import NewsCollectorConfig
-from news_collector.collectors.finnhub import FinnhubCollector
-from news_collector.collectors.rss import RSSCollector
-from news_collector.collectors.sec_edgar import SecEdgarCollector
-from news_collector.collectors.truth_social import TruthSocialCollector
-from news_collector.collectors.reddit import RedditCollector
-from news_collector.collectors.fear_greed import FearGreedCollector
-from news_collector.collectors.fed import FedCollector
-
-# Health check port: base + 4
-HEALTH_PORT_OFFSET = 4
+from shared.sentiment_models import MarketSentiment
+from shared.shutdown import GracefulShutdown
async def run_collector_once(collector, db: Database, broker: RedisBroker) -> int:
@@ -53,9 +52,15 @@ async def run_collector_loop(collector, db: Database, broker: RedisBroker, log)
collector=collector.name,
count=count,
)
- except Exception as exc:
+ except (aiohttp.ClientError, ConnectionError, TimeoutError) as exc:
log.warning(
- "collector_error",
+ "collector_network_error",
+ collector=collector.name,
+ error=str(exc),
+ )
+ except (ValueError, KeyError, TypeError) as exc:
+ log.warning(
+ "collector_parse_error",
collector=collector.name,
error=str(exc),
)
@@ -74,7 +79,7 @@ async def run_fear_greed_loop(collector: FearGreedCollector, db: Database, log)
vix=None,
fed_stance="neutral",
market_regime=_determine_regime(result.fear_greed, None),
- updated_at=datetime.now(timezone.utc),
+ updated_at=datetime.now(UTC),
)
await db.upsert_market_sentiment(ms)
log.info(
@@ -82,8 +87,10 @@ async def run_fear_greed_loop(collector: FearGreedCollector, db: Database, log)
value=result.fear_greed,
label=result.fear_greed_label,
)
- except Exception as exc:
- log.warning("fear_greed_error", error=str(exc))
+ except (aiohttp.ClientError, ConnectionError, TimeoutError) as exc:
+ log.warning("fear_greed_network_error", error=str(exc))
+ except (ValueError, KeyError, TypeError) as exc:
+ log.warning("fear_greed_parse_error", error=str(exc))
await asyncio.sleep(collector.poll_interval)
@@ -93,14 +100,16 @@ async def run_aggregator_loop(db: Database, interval: int, log) -> None:
while True:
await asyncio.sleep(interval)
try:
- now = datetime.now(timezone.utc)
+ now = datetime.now(UTC)
news_items = await db.get_recent_news(hours=24)
scores = aggregator.aggregate(news_items, now)
for score in scores.values():
await db.upsert_symbol_score(score)
log.info("aggregation_complete", symbols=len(scores))
- except Exception as exc:
- log.warning("aggregator_error", error=str(exc))
+ except (ConnectionError, TimeoutError) as exc:
+ log.warning("aggregator_network_error", error=str(exc))
+ except (ValueError, KeyError, TypeError) as exc:
+ log.warning("aggregator_parse_error", error=str(exc))
def _determine_regime(fear_greed: int, vix: float | None) -> str:
@@ -115,14 +124,14 @@ async def run() -> None:
metrics = ServiceMetrics("news_collector")
notifier = TelegramNotifier(
- bot_token=config.telegram_bot_token,
+ bot_token=config.telegram_bot_token.get_secret_value(),
chat_id=config.telegram_chat_id,
)
- db = Database(config.database_url)
+ db = Database(config.database_url.get_secret_value())
await db.connect()
- broker = RedisBroker(config.redis_url)
+ broker = RedisBroker(config.redis_url.get_secret_value())
health = HealthCheckServer(
"news-collector",
@@ -133,7 +142,7 @@ async def run() -> None:
metrics.service_up.labels(service="news-collector").set(1)
# Build collectors
- finnhub = FinnhubCollector(api_key=config.finnhub_api_key)
+ finnhub = FinnhubCollector(api_key=config.finnhub_api_key.get_secret_value())
rss = RSSCollector()
sec = SecEdgarCollector()
truth = TruthSocialCollector()
@@ -143,6 +152,9 @@ async def run() -> None:
news_collectors = [finnhub, rss, sec, truth, reddit, fed]
+ shutdown = GracefulShutdown()
+ shutdown.install_handlers()
+
log.info(
"starting",
collectors=[c.name for c in news_collectors],
@@ -151,14 +163,13 @@ async def run() -> None:
)
try:
- tasks = []
- for collector in news_collectors:
- tasks.append(
- asyncio.create_task(
- run_collector_loop(collector, db, broker, log),
- name=f"collector-{collector.name}",
- )
+ tasks = [
+ asyncio.create_task(
+ run_collector_loop(collector, db, broker, log),
+ name=f"collector-{collector.name}",
)
+ for collector in news_collectors
+ ]
tasks.append(
asyncio.create_task(
run_fear_greed_loop(fear_greed, db, log),
@@ -171,9 +182,9 @@ async def run() -> None:
name="aggregator-loop",
)
)
- await asyncio.gather(*tasks)
+ await shutdown.wait()
except Exception as exc:
- log.error("fatal_error", error=str(exc))
+ log.error("fatal_error", error=str(exc), exc_info=True)
await notifier.send_error(str(exc), "news-collector")
raise
finally:
diff --git a/services/news-collector/tests/test_fear_greed.py b/services/news-collector/tests/test_fear_greed.py
index d483aa6..e8bd8f0 100644
--- a/services/news-collector/tests/test_fear_greed.py
+++ b/services/news-collector/tests/test_fear_greed.py
@@ -1,8 +1,8 @@
"""Tests for CNN Fear & Greed Index collector."""
-import pytest
from unittest.mock import AsyncMock, patch
+import pytest
from news_collector.collectors.fear_greed import FearGreedCollector
diff --git a/services/news-collector/tests/test_fed.py b/services/news-collector/tests/test_fed.py
index d1a736b..7f1c46c 100644
--- a/services/news-collector/tests/test_fed.py
+++ b/services/news-collector/tests/test_fed.py
@@ -1,7 +1,8 @@
"""Tests for Federal Reserve collector."""
-import pytest
from unittest.mock import AsyncMock, patch
+
+import pytest
from news_collector.collectors.fed import FedCollector
diff --git a/services/news-collector/tests/test_finnhub.py b/services/news-collector/tests/test_finnhub.py
index a4cf169..3af65b8 100644
--- a/services/news-collector/tests/test_finnhub.py
+++ b/services/news-collector/tests/test_finnhub.py
@@ -1,8 +1,8 @@
"""Tests for Finnhub news collector."""
-import pytest
from unittest.mock import AsyncMock, patch
+import pytest
from news_collector.collectors.finnhub import FinnhubCollector
diff --git a/services/news-collector/tests/test_main.py b/services/news-collector/tests/test_main.py
index 66190dc..f85569a 100644
--- a/services/news-collector/tests/test_main.py
+++ b/services/news-collector/tests/test_main.py
@@ -1,16 +1,18 @@
"""Tests for news collector scheduler."""
+from datetime import UTC, datetime
from unittest.mock import AsyncMock, MagicMock
-from datetime import datetime, timezone
-from shared.models import NewsCategory, NewsItem
+
from news_collector.main import run_collector_once
+from shared.models import NewsCategory, NewsItem
+
async def test_run_collector_once_stores_and_publishes():
mock_item = NewsItem(
source="test",
headline="Test news",
- published_at=datetime(2026, 4, 2, tzinfo=timezone.utc),
+ published_at=datetime(2026, 4, 2, tzinfo=UTC),
sentiment=0.5,
category=NewsCategory.MACRO,
)
diff --git a/services/news-collector/tests/test_reddit.py b/services/news-collector/tests/test_reddit.py
index 440b173..31b1dc1 100644
--- a/services/news-collector/tests/test_reddit.py
+++ b/services/news-collector/tests/test_reddit.py
@@ -1,7 +1,8 @@
"""Tests for Reddit collector."""
-import pytest
from unittest.mock import AsyncMock, patch
+
+import pytest
from news_collector.collectors.reddit import RedditCollector
diff --git a/services/news-collector/tests/test_rss.py b/services/news-collector/tests/test_rss.py
index e03250a..7242c75 100644
--- a/services/news-collector/tests/test_rss.py
+++ b/services/news-collector/tests/test_rss.py
@@ -1,8 +1,8 @@
"""Tests for RSS news collector."""
-import pytest
from unittest.mock import AsyncMock, patch
+import pytest
from news_collector.collectors.rss import RSSCollector
diff --git a/services/news-collector/tests/test_sec_edgar.py b/services/news-collector/tests/test_sec_edgar.py
index 5d4f69f..b0faf18 100644
--- a/services/news-collector/tests/test_sec_edgar.py
+++ b/services/news-collector/tests/test_sec_edgar.py
@@ -1,9 +1,9 @@
"""Tests for SEC EDGAR filing collector."""
-import pytest
-from datetime import datetime, timezone
-from unittest.mock import AsyncMock, patch, MagicMock
+from datetime import UTC, datetime
+from unittest.mock import AsyncMock, MagicMock, patch
+import pytest
from news_collector.collectors.sec_edgar import SecEdgarCollector
@@ -37,7 +37,7 @@ async def test_collect_parses_filings(collector):
}
mock_datetime = MagicMock(spec=datetime)
- mock_datetime.now.return_value = datetime(2026, 4, 2, tzinfo=timezone.utc)
+ mock_datetime.now.return_value = datetime(2026, 4, 2, tzinfo=UTC)
mock_datetime.strptime = datetime.strptime
with patch.object(
diff --git a/services/news-collector/tests/test_truth_social.py b/services/news-collector/tests/test_truth_social.py
index 91ddb9d..52f1e46 100644
--- a/services/news-collector/tests/test_truth_social.py
+++ b/services/news-collector/tests/test_truth_social.py
@@ -1,7 +1,8 @@
"""Tests for Truth Social collector."""
-import pytest
from unittest.mock import AsyncMock, patch
+
+import pytest
from news_collector.collectors.truth_social import TruthSocialCollector