summaryrefslogtreecommitdiff
path: root/services/news-collector/src/news_collector
diff options
context:
space:
mode:
Diffstat (limited to 'services/news-collector/src/news_collector')
-rw-r--r--services/news-collector/src/news_collector/main.py193
1 files changed, 193 insertions, 0 deletions
diff --git a/services/news-collector/src/news_collector/main.py b/services/news-collector/src/news_collector/main.py
new file mode 100644
index 0000000..3493f7c
--- /dev/null
+++ b/services/news-collector/src/news_collector/main.py
@@ -0,0 +1,193 @@
+"""News Collector Service — fetches news from multiple sources and aggregates sentiment."""
+
+import asyncio
+from datetime import datetime, timezone
+
+from shared.broker import RedisBroker
+from shared.db import Database
+from shared.events import NewsEvent
+from shared.healthcheck import HealthCheckServer
+from shared.logging import setup_logging
+from shared.metrics import ServiceMetrics
+from shared.models import NewsItem
+from shared.notifier import TelegramNotifier
+from shared.sentiment_models import MarketSentiment
+from shared.sentiment import SentimentAggregator
+
+from news_collector.config import NewsCollectorConfig
+from news_collector.collectors.finnhub import FinnhubCollector
+from news_collector.collectors.rss import RSSCollector
+from news_collector.collectors.sec_edgar import SecEdgarCollector
+from news_collector.collectors.truth_social import TruthSocialCollector
+from news_collector.collectors.reddit import RedditCollector
+from news_collector.collectors.fear_greed import FearGreedCollector
+from news_collector.collectors.fed import FedCollector
+
+# Health check port: base + 4
+HEALTH_PORT_OFFSET = 4
+
+
+async def run_collector_once(collector, db: Database, broker: RedisBroker) -> int:
+ """Run a single collector, store results in DB, publish to Redis.
+
+ Returns the number of items collected.
+ """
+ items: list[NewsItem] = await collector.collect()
+ count = 0
+ for item in items:
+ await db.insert_news_item(item)
+ event = NewsEvent(data=item)
+ stream = f"news.{item.category.value}"
+ await broker.publish(stream, event.to_dict())
+ count += 1
+ return count
+
+
+async def run_collector_loop(collector, db: Database, broker: RedisBroker, log) -> None:
+ """Run a collector repeatedly on its configured poll_interval."""
+ while True:
+ try:
+ count = await run_collector_once(collector, db, broker)
+ log.info(
+ "collector_ran",
+ collector=collector.name,
+ count=count,
+ )
+ except Exception as exc:
+ log.warning(
+ "collector_error",
+ collector=collector.name,
+ error=str(exc),
+ )
+ await asyncio.sleep(collector.poll_interval)
+
+
+async def run_fear_greed_loop(collector: FearGreedCollector, db: Database, log) -> None:
+ """Fetch Fear & Greed index on its interval and update MarketSentiment in DB."""
+ while True:
+ try:
+ result = await collector.collect()
+ if result is not None:
+ ms = MarketSentiment(
+ fear_greed=result.fear_greed,
+ fear_greed_label=result.fear_greed_label,
+ vix=None,
+ fed_stance="neutral",
+ market_regime=_determine_regime(result.fear_greed, None),
+ updated_at=datetime.now(timezone.utc),
+ )
+ await db.upsert_market_sentiment(ms)
+ log.info(
+ "fear_greed_updated",
+ value=result.fear_greed,
+ label=result.fear_greed_label,
+ )
+ except Exception as exc:
+ log.warning("fear_greed_error", error=str(exc))
+ await asyncio.sleep(collector.poll_interval)
+
+
+async def run_aggregator_loop(db: Database, interval: int, log) -> None:
+ """Run SentimentAggregator every interval seconds and persist scores."""
+ aggregator = SentimentAggregator()
+ while True:
+ await asyncio.sleep(interval)
+ try:
+ now = datetime.now(timezone.utc)
+ news_items = await db.get_recent_news(hours=24)
+ scores = aggregator.aggregate(news_items, now)
+ for score in scores.values():
+ await db.upsert_symbol_score(score)
+ log.info("aggregation_complete", symbols=len(scores))
+ except Exception as exc:
+ log.warning("aggregator_error", error=str(exc))
+
+
+def _determine_regime(fear_greed: int, vix: float | None) -> str:
+ """Classify market regime from fear/greed index and optional VIX."""
+ aggregator = SentimentAggregator()
+ return aggregator.determine_regime(fear_greed, vix)
+
+
+async def run() -> None:
+ config = NewsCollectorConfig()
+ log = setup_logging("news-collector", config.log_level, config.log_format)
+ metrics = ServiceMetrics("news_collector")
+
+ notifier = TelegramNotifier(
+ bot_token=config.telegram_bot_token,
+ chat_id=config.telegram_chat_id,
+ )
+
+ db = Database(config.database_url)
+ await db.connect()
+
+ broker = RedisBroker(config.redis_url)
+
+ health = HealthCheckServer(
+ "news-collector",
+ port=config.health_port,
+ auth_token=config.metrics_auth_token,
+ )
+ await health.start()
+ metrics.service_up.labels(service="news-collector").set(1)
+
+ # Build collectors
+ finnhub = FinnhubCollector(api_key=config.finnhub_api_key)
+ rss = RSSCollector()
+ sec = SecEdgarCollector()
+ truth = TruthSocialCollector()
+ reddit = RedditCollector()
+ fear_greed = FearGreedCollector()
+ fed = FedCollector()
+
+ news_collectors = [finnhub, rss, sec, truth, reddit, fed]
+
+ log.info(
+ "starting",
+ collectors=[c.name for c in news_collectors],
+ poll_interval=config.news_poll_interval,
+ aggregate_interval=config.sentiment_aggregate_interval,
+ )
+
+ try:
+ tasks = []
+ for collector in news_collectors:
+ tasks.append(
+ asyncio.create_task(
+ run_collector_loop(collector, db, broker, log),
+ name=f"collector-{collector.name}",
+ )
+ )
+ tasks.append(
+ asyncio.create_task(
+ run_fear_greed_loop(fear_greed, db, log),
+ name="fear-greed-loop",
+ )
+ )
+ tasks.append(
+ asyncio.create_task(
+ run_aggregator_loop(db, config.sentiment_aggregate_interval, log),
+ name="aggregator-loop",
+ )
+ )
+ await asyncio.gather(*tasks)
+ except Exception as exc:
+ log.error("fatal_error", error=str(exc))
+ await notifier.send_error(str(exc), "news-collector")
+ raise
+ finally:
+ metrics.service_up.labels(service="news-collector").set(0)
+ for task in tasks:
+ task.cancel()
+ await notifier.close()
+ await broker.close()
+ await db.close()
+
+
+def main() -> None:
+ asyncio.run(run())
+
+
+if __name__ == "__main__":
+ main()