"""News Collector Service — fetches news from multiple sources and aggregates sentiment.""" import asyncio from datetime import datetime, timezone import aiohttp from shared.broker import RedisBroker from shared.db import Database from shared.events import NewsEvent from shared.healthcheck import HealthCheckServer from shared.logging import setup_logging from shared.metrics import ServiceMetrics from shared.models import NewsItem from shared.notifier import TelegramNotifier from shared.sentiment_models import MarketSentiment from shared.sentiment import SentimentAggregator from shared.shutdown import GracefulShutdown from news_collector.config import NewsCollectorConfig from news_collector.collectors.finnhub import FinnhubCollector from news_collector.collectors.rss import RSSCollector from news_collector.collectors.sec_edgar import SecEdgarCollector from news_collector.collectors.truth_social import TruthSocialCollector from news_collector.collectors.reddit import RedditCollector from news_collector.collectors.fear_greed import FearGreedCollector from news_collector.collectors.fed import FedCollector # Health check port: base + 4 HEALTH_PORT_OFFSET = 4 async def run_collector_once(collector, db: Database, broker: RedisBroker) -> int: """Run a single collector, store results in DB, publish to Redis. Returns the number of items collected. """ items: list[NewsItem] = await collector.collect() count = 0 for item in items: await db.insert_news_item(item) event = NewsEvent(data=item) stream = f"news.{item.category.value}" await broker.publish(stream, event.to_dict()) count += 1 return count async def run_collector_loop(collector, db: Database, broker: RedisBroker, log) -> None: """Run a collector repeatedly on its configured poll_interval.""" while True: try: count = await run_collector_once(collector, db, broker) log.info( "collector_ran", collector=collector.name, count=count, ) except (aiohttp.ClientError, ConnectionError, TimeoutError, asyncio.TimeoutError) as exc: log.warning( "collector_network_error", collector=collector.name, error=str(exc), ) except (ValueError, KeyError, TypeError) as exc: log.warning( "collector_parse_error", collector=collector.name, error=str(exc), ) await asyncio.sleep(collector.poll_interval) async def run_fear_greed_loop(collector: FearGreedCollector, db: Database, log) -> None: """Fetch Fear & Greed index on its interval and update MarketSentiment in DB.""" while True: try: result = await collector.collect() if result is not None: ms = MarketSentiment( fear_greed=result.fear_greed, fear_greed_label=result.fear_greed_label, vix=None, fed_stance="neutral", market_regime=_determine_regime(result.fear_greed, None), updated_at=datetime.now(timezone.utc), ) await db.upsert_market_sentiment(ms) log.info( "fear_greed_updated", value=result.fear_greed, label=result.fear_greed_label, ) except (aiohttp.ClientError, ConnectionError, TimeoutError, asyncio.TimeoutError) as exc: log.warning("fear_greed_network_error", error=str(exc)) except (ValueError, KeyError, TypeError) as exc: log.warning("fear_greed_parse_error", error=str(exc)) await asyncio.sleep(collector.poll_interval) async def run_aggregator_loop(db: Database, interval: int, log) -> None: """Run SentimentAggregator every interval seconds and persist scores.""" aggregator = SentimentAggregator() while True: await asyncio.sleep(interval) try: now = datetime.now(timezone.utc) news_items = await db.get_recent_news(hours=24) scores = aggregator.aggregate(news_items, now) for score in scores.values(): await db.upsert_symbol_score(score) log.info("aggregation_complete", symbols=len(scores)) except (ConnectionError, TimeoutError, asyncio.TimeoutError) as exc: log.warning("aggregator_network_error", error=str(exc)) except (ValueError, KeyError, TypeError) as exc: log.warning("aggregator_parse_error", error=str(exc)) def _determine_regime(fear_greed: int, vix: float | None) -> str: """Classify market regime from fear/greed index and optional VIX.""" aggregator = SentimentAggregator() return aggregator.determine_regime(fear_greed, vix) async def run() -> None: config = NewsCollectorConfig() log = setup_logging("news-collector", config.log_level, config.log_format) metrics = ServiceMetrics("news_collector") notifier = TelegramNotifier( bot_token=config.telegram_bot_token.get_secret_value(), chat_id=config.telegram_chat_id, ) db = Database(config.database_url.get_secret_value()) await db.connect() broker = RedisBroker(config.redis_url.get_secret_value()) health = HealthCheckServer( "news-collector", port=config.health_port, auth_token=config.metrics_auth_token, ) await health.start() metrics.service_up.labels(service="news-collector").set(1) # Build collectors finnhub = FinnhubCollector(api_key=config.finnhub_api_key.get_secret_value()) rss = RSSCollector() sec = SecEdgarCollector() truth = TruthSocialCollector() reddit = RedditCollector() fear_greed = FearGreedCollector() fed = FedCollector() news_collectors = [finnhub, rss, sec, truth, reddit, fed] shutdown = GracefulShutdown() shutdown.install_handlers() log.info( "starting", collectors=[c.name for c in news_collectors], poll_interval=config.news_poll_interval, aggregate_interval=config.sentiment_aggregate_interval, ) try: tasks = [] for collector in news_collectors: tasks.append( asyncio.create_task( run_collector_loop(collector, db, broker, log), name=f"collector-{collector.name}", ) ) tasks.append( asyncio.create_task( run_fear_greed_loop(fear_greed, db, log), name="fear-greed-loop", ) ) tasks.append( asyncio.create_task( run_aggregator_loop(db, config.sentiment_aggregate_interval, log), name="aggregator-loop", ) ) await shutdown.wait() except Exception as exc: log.error("fatal_error", error=str(exc), exc_info=True) await notifier.send_error(str(exc), "news-collector") raise finally: metrics.service_up.labels(service="news-collector").set(0) for task in tasks: task.cancel() await notifier.close() await broker.close() await db.close() def main() -> None: asyncio.run(run()) if __name__ == "__main__": main()