diff options
Diffstat (limited to 'services/news-collector/src/news_collector/main.py')
| -rw-r--r-- | services/news-collector/src/news_collector/main.py | 81 |
1 files changed, 46 insertions, 35 deletions
diff --git a/services/news-collector/src/news_collector/main.py b/services/news-collector/src/news_collector/main.py index 3493f7c..c39fa67 100644 --- a/services/news-collector/src/news_collector/main.py +++ b/services/news-collector/src/news_collector/main.py @@ -1,8 +1,18 @@ """News Collector Service — fetches news from multiple sources and aggregates sentiment.""" import asyncio -from datetime import datetime, timezone +from datetime import UTC, datetime +import aiohttp + +from news_collector.collectors.fear_greed import FearGreedCollector +from news_collector.collectors.fed import FedCollector +from news_collector.collectors.finnhub import FinnhubCollector +from news_collector.collectors.reddit import RedditCollector +from news_collector.collectors.rss import RSSCollector +from news_collector.collectors.sec_edgar import SecEdgarCollector +from news_collector.collectors.truth_social import TruthSocialCollector +from news_collector.config import NewsCollectorConfig from shared.broker import RedisBroker from shared.db import Database from shared.events import NewsEvent @@ -11,20 +21,9 @@ from shared.logging import setup_logging from shared.metrics import ServiceMetrics from shared.models import NewsItem from shared.notifier import TelegramNotifier -from shared.sentiment_models import MarketSentiment from shared.sentiment import SentimentAggregator - -from news_collector.config import NewsCollectorConfig -from news_collector.collectors.finnhub import FinnhubCollector -from news_collector.collectors.rss import RSSCollector -from news_collector.collectors.sec_edgar import SecEdgarCollector -from news_collector.collectors.truth_social import TruthSocialCollector -from news_collector.collectors.reddit import RedditCollector -from news_collector.collectors.fear_greed import FearGreedCollector -from news_collector.collectors.fed import FedCollector - -# Health check port: base + 4 -HEALTH_PORT_OFFSET = 4 +from shared.sentiment_models import MarketSentiment +from shared.shutdown import GracefulShutdown async def run_collector_once(collector, db: Database, broker: RedisBroker) -> int: @@ -53,9 +52,15 @@ async def run_collector_loop(collector, db: Database, broker: RedisBroker, log) collector=collector.name, count=count, ) - except Exception as exc: + except (aiohttp.ClientError, ConnectionError, TimeoutError) as exc: log.warning( - "collector_error", + "collector_network_error", + collector=collector.name, + error=str(exc), + ) + except (ValueError, KeyError, TypeError) as exc: + log.warning( + "collector_parse_error", collector=collector.name, error=str(exc), ) @@ -74,7 +79,7 @@ async def run_fear_greed_loop(collector: FearGreedCollector, db: Database, log) vix=None, fed_stance="neutral", market_regime=_determine_regime(result.fear_greed, None), - updated_at=datetime.now(timezone.utc), + updated_at=datetime.now(UTC), ) await db.upsert_market_sentiment(ms) log.info( @@ -82,8 +87,10 @@ async def run_fear_greed_loop(collector: FearGreedCollector, db: Database, log) value=result.fear_greed, label=result.fear_greed_label, ) - except Exception as exc: - log.warning("fear_greed_error", error=str(exc)) + except (aiohttp.ClientError, ConnectionError, TimeoutError) as exc: + log.warning("fear_greed_network_error", error=str(exc)) + except (ValueError, KeyError, TypeError) as exc: + log.warning("fear_greed_parse_error", error=str(exc)) await asyncio.sleep(collector.poll_interval) @@ -93,14 +100,16 @@ async def run_aggregator_loop(db: Database, interval: int, log) -> None: while True: await asyncio.sleep(interval) try: - now = datetime.now(timezone.utc) + now = datetime.now(UTC) news_items = await db.get_recent_news(hours=24) scores = aggregator.aggregate(news_items, now) for score in scores.values(): await db.upsert_symbol_score(score) log.info("aggregation_complete", symbols=len(scores)) - except Exception as exc: - log.warning("aggregator_error", error=str(exc)) + except (ConnectionError, TimeoutError) as exc: + log.warning("aggregator_network_error", error=str(exc)) + except (ValueError, KeyError, TypeError) as exc: + log.warning("aggregator_parse_error", error=str(exc)) def _determine_regime(fear_greed: int, vix: float | None) -> str: @@ -115,14 +124,14 @@ async def run() -> None: metrics = ServiceMetrics("news_collector") notifier = TelegramNotifier( - bot_token=config.telegram_bot_token, + bot_token=config.telegram_bot_token.get_secret_value(), chat_id=config.telegram_chat_id, ) - db = Database(config.database_url) + db = Database(config.database_url.get_secret_value()) await db.connect() - broker = RedisBroker(config.redis_url) + broker = RedisBroker(config.redis_url.get_secret_value()) health = HealthCheckServer( "news-collector", @@ -133,7 +142,7 @@ async def run() -> None: metrics.service_up.labels(service="news-collector").set(1) # Build collectors - finnhub = FinnhubCollector(api_key=config.finnhub_api_key) + finnhub = FinnhubCollector(api_key=config.finnhub_api_key.get_secret_value()) rss = RSSCollector() sec = SecEdgarCollector() truth = TruthSocialCollector() @@ -143,6 +152,9 @@ async def run() -> None: news_collectors = [finnhub, rss, sec, truth, reddit, fed] + shutdown = GracefulShutdown() + shutdown.install_handlers() + log.info( "starting", collectors=[c.name for c in news_collectors], @@ -151,14 +163,13 @@ async def run() -> None: ) try: - tasks = [] - for collector in news_collectors: - tasks.append( - asyncio.create_task( - run_collector_loop(collector, db, broker, log), - name=f"collector-{collector.name}", - ) + tasks = [ + asyncio.create_task( + run_collector_loop(collector, db, broker, log), + name=f"collector-{collector.name}", ) + for collector in news_collectors + ] tasks.append( asyncio.create_task( run_fear_greed_loop(fear_greed, db, log), @@ -171,9 +182,9 @@ async def run() -> None: name="aggregator-loop", ) ) - await asyncio.gather(*tasks) + await shutdown.wait() except Exception as exc: - log.error("fatal_error", error=str(exc)) + log.error("fatal_error", error=str(exc), exc_info=True) await notifier.send_error(str(exc), "news-collector") raise finally: |
