summaryrefslogtreecommitdiff
path: root/services/news-collector/src/news_collector/main.py
diff options
context:
space:
mode:
Diffstat (limited to 'services/news-collector/src/news_collector/main.py')
-rw-r--r--services/news-collector/src/news_collector/main.py81
1 files changed, 46 insertions, 35 deletions
diff --git a/services/news-collector/src/news_collector/main.py b/services/news-collector/src/news_collector/main.py
index 3493f7c..c39fa67 100644
--- a/services/news-collector/src/news_collector/main.py
+++ b/services/news-collector/src/news_collector/main.py
@@ -1,8 +1,18 @@
"""News Collector Service — fetches news from multiple sources and aggregates sentiment."""
import asyncio
-from datetime import datetime, timezone
+from datetime import UTC, datetime
+import aiohttp
+
+from news_collector.collectors.fear_greed import FearGreedCollector
+from news_collector.collectors.fed import FedCollector
+from news_collector.collectors.finnhub import FinnhubCollector
+from news_collector.collectors.reddit import RedditCollector
+from news_collector.collectors.rss import RSSCollector
+from news_collector.collectors.sec_edgar import SecEdgarCollector
+from news_collector.collectors.truth_social import TruthSocialCollector
+from news_collector.config import NewsCollectorConfig
from shared.broker import RedisBroker
from shared.db import Database
from shared.events import NewsEvent
@@ -11,20 +21,9 @@ from shared.logging import setup_logging
from shared.metrics import ServiceMetrics
from shared.models import NewsItem
from shared.notifier import TelegramNotifier
-from shared.sentiment_models import MarketSentiment
from shared.sentiment import SentimentAggregator
-
-from news_collector.config import NewsCollectorConfig
-from news_collector.collectors.finnhub import FinnhubCollector
-from news_collector.collectors.rss import RSSCollector
-from news_collector.collectors.sec_edgar import SecEdgarCollector
-from news_collector.collectors.truth_social import TruthSocialCollector
-from news_collector.collectors.reddit import RedditCollector
-from news_collector.collectors.fear_greed import FearGreedCollector
-from news_collector.collectors.fed import FedCollector
-
-# Health check port: base + 4
-HEALTH_PORT_OFFSET = 4
+from shared.sentiment_models import MarketSentiment
+from shared.shutdown import GracefulShutdown
async def run_collector_once(collector, db: Database, broker: RedisBroker) -> int:
@@ -53,9 +52,15 @@ async def run_collector_loop(collector, db: Database, broker: RedisBroker, log)
collector=collector.name,
count=count,
)
- except Exception as exc:
+ except (aiohttp.ClientError, ConnectionError, TimeoutError) as exc:
log.warning(
- "collector_error",
+ "collector_network_error",
+ collector=collector.name,
+ error=str(exc),
+ )
+ except (ValueError, KeyError, TypeError) as exc:
+ log.warning(
+ "collector_parse_error",
collector=collector.name,
error=str(exc),
)
@@ -74,7 +79,7 @@ async def run_fear_greed_loop(collector: FearGreedCollector, db: Database, log)
vix=None,
fed_stance="neutral",
market_regime=_determine_regime(result.fear_greed, None),
- updated_at=datetime.now(timezone.utc),
+ updated_at=datetime.now(UTC),
)
await db.upsert_market_sentiment(ms)
log.info(
@@ -82,8 +87,10 @@ async def run_fear_greed_loop(collector: FearGreedCollector, db: Database, log)
value=result.fear_greed,
label=result.fear_greed_label,
)
- except Exception as exc:
- log.warning("fear_greed_error", error=str(exc))
+ except (aiohttp.ClientError, ConnectionError, TimeoutError) as exc:
+ log.warning("fear_greed_network_error", error=str(exc))
+ except (ValueError, KeyError, TypeError) as exc:
+ log.warning("fear_greed_parse_error", error=str(exc))
await asyncio.sleep(collector.poll_interval)
@@ -93,14 +100,16 @@ async def run_aggregator_loop(db: Database, interval: int, log) -> None:
while True:
await asyncio.sleep(interval)
try:
- now = datetime.now(timezone.utc)
+ now = datetime.now(UTC)
news_items = await db.get_recent_news(hours=24)
scores = aggregator.aggregate(news_items, now)
for score in scores.values():
await db.upsert_symbol_score(score)
log.info("aggregation_complete", symbols=len(scores))
- except Exception as exc:
- log.warning("aggregator_error", error=str(exc))
+ except (ConnectionError, TimeoutError) as exc:
+ log.warning("aggregator_network_error", error=str(exc))
+ except (ValueError, KeyError, TypeError) as exc:
+ log.warning("aggregator_parse_error", error=str(exc))
def _determine_regime(fear_greed: int, vix: float | None) -> str:
@@ -115,14 +124,14 @@ async def run() -> None:
metrics = ServiceMetrics("news_collector")
notifier = TelegramNotifier(
- bot_token=config.telegram_bot_token,
+ bot_token=config.telegram_bot_token.get_secret_value(),
chat_id=config.telegram_chat_id,
)
- db = Database(config.database_url)
+ db = Database(config.database_url.get_secret_value())
await db.connect()
- broker = RedisBroker(config.redis_url)
+ broker = RedisBroker(config.redis_url.get_secret_value())
health = HealthCheckServer(
"news-collector",
@@ -133,7 +142,7 @@ async def run() -> None:
metrics.service_up.labels(service="news-collector").set(1)
# Build collectors
- finnhub = FinnhubCollector(api_key=config.finnhub_api_key)
+ finnhub = FinnhubCollector(api_key=config.finnhub_api_key.get_secret_value())
rss = RSSCollector()
sec = SecEdgarCollector()
truth = TruthSocialCollector()
@@ -143,6 +152,9 @@ async def run() -> None:
news_collectors = [finnhub, rss, sec, truth, reddit, fed]
+ shutdown = GracefulShutdown()
+ shutdown.install_handlers()
+
log.info(
"starting",
collectors=[c.name for c in news_collectors],
@@ -151,14 +163,13 @@ async def run() -> None:
)
try:
- tasks = []
- for collector in news_collectors:
- tasks.append(
- asyncio.create_task(
- run_collector_loop(collector, db, broker, log),
- name=f"collector-{collector.name}",
- )
+ tasks = [
+ asyncio.create_task(
+ run_collector_loop(collector, db, broker, log),
+ name=f"collector-{collector.name}",
)
+ for collector in news_collectors
+ ]
tasks.append(
asyncio.create_task(
run_fear_greed_loop(fear_greed, db, log),
@@ -171,9 +182,9 @@ async def run() -> None:
name="aggregator-loop",
)
)
- await asyncio.gather(*tasks)
+ await shutdown.wait()
except Exception as exc:
- log.error("fatal_error", error=str(exc))
+ log.error("fatal_error", error=str(exc), exc_info=True)
await notifier.send_error(str(exc), "news-collector")
raise
finally: