diff options
Diffstat (limited to 'services/news-collector/src/news_collector')
| -rw-r--r-- | services/news-collector/src/news_collector/main.py | 193 |
1 files changed, 193 insertions, 0 deletions
"""News Collector Service — fetches news from multiple sources and aggregates sentiment."""

import asyncio
from datetime import datetime, timezone

from shared.broker import RedisBroker
from shared.db import Database
from shared.events import NewsEvent
from shared.healthcheck import HealthCheckServer
from shared.logging import setup_logging
from shared.metrics import ServiceMetrics
from shared.models import NewsItem
from shared.notifier import TelegramNotifier
from shared.sentiment_models import MarketSentiment
from shared.sentiment import SentimentAggregator

from news_collector.config import NewsCollectorConfig
from news_collector.collectors.finnhub import FinnhubCollector
from news_collector.collectors.rss import RSSCollector
from news_collector.collectors.sec_edgar import SecEdgarCollector
from news_collector.collectors.truth_social import TruthSocialCollector
from news_collector.collectors.reddit import RedditCollector
from news_collector.collectors.fear_greed import FearGreedCollector
from news_collector.collectors.fed import FedCollector

# Health check port: base + 4
HEALTH_PORT_OFFSET = 4


async def run_collector_once(collector, db: Database, broker: RedisBroker) -> int:
    """Run a single collector, store results in DB, publish to Redis.

    Args:
        collector: Any collector exposing an async ``collect() -> list[NewsItem]``.
        db: Connected database handle used to persist each item.
        broker: Redis broker that receives one event per collected item.

    Returns:
        The number of items collected and published.
    """
    items: list[NewsItem] = await collector.collect()
    count = 0
    for item in items:
        await db.insert_news_item(item)
        event = NewsEvent(data=item)
        # Route each item to a per-category stream (e.g. "news.<category>").
        stream = f"news.{item.category.value}"
        await broker.publish(stream, event.to_dict())
        count += 1
    return count


async def run_collector_loop(collector, db: Database, broker: RedisBroker, log) -> None:
    """Run a collector repeatedly on its configured poll_interval.

    A failing cycle is logged and swallowed so a single bad poll (network
    blip, rate limit, parse error) never kills the loop; the next cycle
    retries from scratch after the usual sleep.
    """
    while True:
        try:
            count = await run_collector_once(collector, db, broker)
            log.info(
                "collector_ran",
                collector=collector.name,
                count=count,
            )
        except Exception as exc:
            log.warning(
                "collector_error",
                collector=collector.name,
                error=str(exc),
            )
        await asyncio.sleep(collector.poll_interval)


async def run_fear_greed_loop(collector: FearGreedCollector, db: Database, log) -> None:
    """Fetch Fear & Greed index on its interval and update MarketSentiment in DB."""
    while True:
        try:
            result = await collector.collect()
            # collect() may yield nothing (e.g. upstream unavailable); skip the upsert then.
            if result is not None:
                ms = MarketSentiment(
                    fear_greed=result.fear_greed,
                    fear_greed_label=result.fear_greed_label,
                    vix=None,  # VIX is not sourced by this service
                    fed_stance="neutral",
                    market_regime=_determine_regime(result.fear_greed, None),
                    updated_at=datetime.now(timezone.utc),
                )
                await db.upsert_market_sentiment(ms)
                log.info(
                    "fear_greed_updated",
                    value=result.fear_greed,
                    label=result.fear_greed_label,
                )
        except Exception as exc:
            log.warning("fear_greed_error", error=str(exc))
        await asyncio.sleep(collector.poll_interval)


async def run_aggregator_loop(db: Database, interval: int, log) -> None:
    """Run SentimentAggregator every ``interval`` seconds and persist scores.

    Sleeps *before* the first pass so the collector loops have a chance to
    populate the news table before the initial aggregation.
    """
    aggregator = SentimentAggregator()
    while True:
        await asyncio.sleep(interval)
        try:
            now = datetime.now(timezone.utc)
            news_items = await db.get_recent_news(hours=24)
            scores = aggregator.aggregate(news_items, now)
            for score in scores.values():
                await db.upsert_symbol_score(score)
            log.info("aggregation_complete", symbols=len(scores))
        except Exception as exc:
            log.warning("aggregator_error", error=str(exc))


def _determine_regime(fear_greed: int, vix: float | None) -> str:
    """Classify market regime from fear/greed index and optional VIX."""
    aggregator = SentimentAggregator()
    return aggregator.determine_regime(fear_greed, vix)


async def run() -> None:
    """Wire up config, storage, broker and health server, then supervise all loops.

    Spawns one task per news collector plus the fear/greed and aggregation
    loops, and runs until one of them raises (loops swallow their own
    per-cycle errors, so reaching the except path means a genuine fatal).
    On the way out it flips the service-up gauge, cancels and drains the
    tasks, and closes every owned resource.
    """
    config = NewsCollectorConfig()
    log = setup_logging("news-collector", config.log_level, config.log_format)
    metrics = ServiceMetrics("news_collector")

    notifier = TelegramNotifier(
        bot_token=config.telegram_bot_token,
        chat_id=config.telegram_chat_id,
    )

    db = Database(config.database_url)
    await db.connect()

    broker = RedisBroker(config.redis_url)

    health = HealthCheckServer(
        "news-collector",
        port=config.health_port,
        auth_token=config.metrics_auth_token,
    )
    await health.start()
    metrics.service_up.labels(service="news-collector").set(1)

    # Build collectors
    finnhub = FinnhubCollector(api_key=config.finnhub_api_key)
    rss = RSSCollector()
    sec = SecEdgarCollector()
    truth = TruthSocialCollector()
    reddit = RedditCollector()
    fear_greed = FearGreedCollector()
    fed = FedCollector()

    news_collectors = [finnhub, rss, sec, truth, reddit, fed]

    log.info(
        "starting",
        collectors=[c.name for c in news_collectors],
        poll_interval=config.news_poll_interval,
        aggregate_interval=config.sentiment_aggregate_interval,
    )

    # Bound before the try so the finally below can never hit a NameError,
    # even if the very first create_task were to raise.
    tasks: list[asyncio.Task] = []
    try:
        for collector in news_collectors:
            tasks.append(
                asyncio.create_task(
                    run_collector_loop(collector, db, broker, log),
                    name=f"collector-{collector.name}",
                )
            )
        tasks.append(
            asyncio.create_task(
                run_fear_greed_loop(fear_greed, db, log),
                name="fear-greed-loop",
            )
        )
        tasks.append(
            asyncio.create_task(
                run_aggregator_loop(db, config.sentiment_aggregate_interval, log),
                name="aggregator-loop",
            )
        )
        await asyncio.gather(*tasks)
    except Exception as exc:
        log.error("fatal_error", error=str(exc))
        await notifier.send_error(str(exc), "news-collector")
        raise
    finally:
        metrics.service_up.labels(service="news-collector").set(0)
        # Cancel *and drain* the loops so they finish unwinding before the
        # connections they use are closed below; cancelling without awaiting
        # let tasks race against broker/db teardown.
        for task in tasks:
            task.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)
        await notifier.close()
        await broker.close()
        await db.close()


def main() -> None:
    """Synchronous console entry point."""
    asyncio.run(run())


if __name__ == "__main__":
    main()
