"""News Collector Service — fetches news from multiple sources and aggregates sentiment."""

import asyncio
from datetime import UTC, datetime

import aiohttp

from news_collector.collectors.fear_greed import FearGreedCollector
from news_collector.collectors.fed import FedCollector
from news_collector.collectors.finnhub import FinnhubCollector
from news_collector.collectors.reddit import RedditCollector
from news_collector.collectors.rss import RSSCollector
from news_collector.collectors.sec_edgar import SecEdgarCollector
from news_collector.collectors.truth_social import TruthSocialCollector
from news_collector.config import NewsCollectorConfig
from shared.broker import RedisBroker
from shared.db import Database
from shared.events import NewsEvent
from shared.healthcheck import HealthCheckServer
from shared.logging import setup_logging
from shared.metrics import ServiceMetrics
from shared.models import NewsItem
from shared.notifier import TelegramNotifier
from shared.sentiment import SentimentAggregator
from shared.sentiment_models import MarketSentiment
from shared.shutdown import GracefulShutdown

# Health check port: base + 4
# NOTE(review): not referenced in this module — run() takes the port from
# config.health_port. Kept because other modules may import it.
HEALTH_PORT_OFFSET = 4


async def run_collector_once(collector, db: Database, broker: RedisBroker) -> int:
    """Run a single collector, store results in DB, publish to Redis.

    Each item returned by ``collector.collect()`` is inserted into the
    database and then published as a ``NewsEvent`` on the Redis stream
    ``news.<item.category.value>``.

    Returns the number of items collected.
    """
    items: list[NewsItem] = await collector.collect()
    count = 0
    for item in items:
        await db.insert_news_item(item)
        event = NewsEvent(data=item)
        stream = f"news.{item.category.value}"
        await broker.publish(stream, event.to_dict())
        count += 1
    return count


async def run_collector_loop(collector, db: Database, broker: RedisBroker, log) -> None:
    """Run a collector repeatedly on its configured poll_interval.

    Errors from a single run are logged and the loop continues after
    ``collector.poll_interval`` seconds; the loop only ends on cancellation.
    """
    while True:
        try:
            count = await run_collector_once(collector, db, broker)
            log.info(
                "collector_ran",
                collector=collector.name,
                count=count,
            )
        except (aiohttp.ClientError, ConnectionError, TimeoutError) as exc:
            log.warning(
                "collector_network_error",
                collector=collector.name,
                error=str(exc),
            )
        except (ValueError, KeyError, TypeError) as exc:
            log.warning(
                "collector_parse_error",
                collector=collector.name,
                error=str(exc),
            )
        except Exception as exc:
            # Last-resort guard: any other error (e.g. a database driver error)
            # would otherwise end this task silently, because run() never
            # inspects the task's result. CancelledError derives from
            # BaseException, so shutdown cancellation still propagates.
            log.error(
                "collector_unexpected_error",
                collector=collector.name,
                error=str(exc),
                exc_info=True,
            )
        await asyncio.sleep(collector.poll_interval)


async def run_fear_greed_loop(collector: FearGreedCollector, db: Database, log) -> None:
    """Fetch Fear & Greed index on its interval and update MarketSentiment in DB.

    When the collector returns a result, a ``MarketSentiment`` row is upserted
    with ``vix=None`` and ``fed_stance="neutral"``, and a regime derived from
    the fear/greed value alone.
    """
    while True:
        try:
            result = await collector.collect()
            if result is not None:
                ms = MarketSentiment(
                    fear_greed=result.fear_greed,
                    fear_greed_label=result.fear_greed_label,
                    vix=None,
                    fed_stance="neutral",
                    market_regime=_determine_regime(result.fear_greed, None),
                    updated_at=datetime.now(UTC),
                )
                await db.upsert_market_sentiment(ms)
                log.info(
                    "fear_greed_updated",
                    value=result.fear_greed,
                    label=result.fear_greed_label,
                )
        except (aiohttp.ClientError, ConnectionError, TimeoutError) as exc:
            log.warning("fear_greed_network_error", error=str(exc))
        except (ValueError, KeyError, TypeError) as exc:
            log.warning("fear_greed_parse_error", error=str(exc))
        except Exception as exc:
            # Keep the loop alive on unexpected errors (see run_collector_loop).
            log.error("fear_greed_unexpected_error", error=str(exc), exc_info=True)
        await asyncio.sleep(collector.poll_interval)


async def run_aggregator_loop(db: Database, interval: int, log) -> None:
    """Run SentimentAggregator every interval seconds and persist scores.

    The loop sleeps first, then aggregates news from the last 24 hours and
    upserts one score per entry returned by ``aggregator.aggregate``.
    """
    aggregator = SentimentAggregator()
    while True:
        await asyncio.sleep(interval)
        try:
            now = datetime.now(UTC)
            news_items = await db.get_recent_news(hours=24)
            scores = aggregator.aggregate(news_items, now)
            for score in scores.values():
                await db.upsert_symbol_score(score)
            log.info("aggregation_complete", symbols=len(scores))
        except (ConnectionError, TimeoutError) as exc:
            log.warning("aggregator_network_error", error=str(exc))
        except (ValueError, KeyError, TypeError) as exc:
            log.warning("aggregator_parse_error", error=str(exc))
        except Exception as exc:
            # Keep the loop alive on unexpected errors (see run_collector_loop).
            log.error("aggregator_unexpected_error", error=str(exc), exc_info=True)


def _determine_regime(fear_greed: int, vix: float | None) -> str:
    """Classify market regime from fear/greed index and optional VIX."""
    aggregator = SentimentAggregator()
    return aggregator.determine_regime(fear_greed, vix)


async def run() -> None:
    """Start all collector/aggregator loops and run until a shutdown signal.

    On exit (normal shutdown or fatal error) the background tasks are
    cancelled and awaited before the notifier, broker and database are closed.
    """
    config = NewsCollectorConfig()
    log = setup_logging("news-collector", config.log_level, config.log_format)
    metrics = ServiceMetrics("news_collector")

    notifier = TelegramNotifier(
        bot_token=config.telegram_bot_token.get_secret_value(),
        chat_id=config.telegram_chat_id,
    )

    db = Database(config.database_url.get_secret_value())
    await db.connect()

    broker = RedisBroker(config.redis_url.get_secret_value())

    health = HealthCheckServer(
        "news-collector",
        port=config.health_port,
        auth_token=config.metrics_auth_token,
    )
    # NOTE(review): the health server is never stopped in this function —
    # confirm whether HealthCheckServer needs an explicit shutdown call.
    await health.start()
    metrics.service_up.labels(service="news-collector").set(1)

    # Build collectors
    finnhub = FinnhubCollector(api_key=config.finnhub_api_key.get_secret_value())
    rss = RSSCollector()
    sec = SecEdgarCollector()
    truth = TruthSocialCollector()
    reddit = RedditCollector()
    fear_greed = FearGreedCollector()
    fed = FedCollector()

    news_collectors = [finnhub, rss, sec, truth, reddit, fed]

    shutdown = GracefulShutdown()
    shutdown.install_handlers()

    log.info(
        "starting",
        collectors=[c.name for c in news_collectors],
        poll_interval=config.news_poll_interval,
        aggregate_interval=config.sentiment_aggregate_interval,
    )

    # Bound before the try so the finally block can always iterate it, even if
    # task creation itself fails.
    tasks: list[asyncio.Task] = []
    try:
        tasks.extend(
            asyncio.create_task(
                run_collector_loop(collector, db, broker, log),
                name=f"collector-{collector.name}",
            )
            for collector in news_collectors
        )
        tasks.append(
            asyncio.create_task(
                run_fear_greed_loop(fear_greed, db, log),
                name="fear-greed-loop",
            )
        )
        tasks.append(
            asyncio.create_task(
                run_aggregator_loop(db, config.sentiment_aggregate_interval, log),
                name="aggregator-loop",
            )
        )
        await shutdown.wait()
    except Exception as exc:
        log.error("fatal_error", error=str(exc), exc_info=True)
        await notifier.send_error(str(exc), "news-collector")
        raise
    finally:
        metrics.service_up.labels(service="news-collector").set(0)
        for task in tasks:
            task.cancel()
        # Let the cancellations finish before closing the clients the tasks
        # use; return_exceptions=True collects CancelledError (and any other
        # task error) instead of raising here.
        await asyncio.gather(*tasks, return_exceptions=True)
        await notifier.close()
        await broker.close()
        await db.close()


def main() -> None:
    """Synchronous entry point: run the service's event loop."""
    asyncio.run(run())


if __name__ == "__main__":
    main()