diff options
| -rw-r--r-- | services/data-collector/src/data_collector/main.py | 6 | ||||
| -rw-r--r-- | services/news-collector/src/news_collector/main.py | 6 | ||||
| -rw-r--r-- | services/order-executor/src/order_executor/main.py | 6 | ||||
| -rw-r--r-- | services/portfolio-manager/src/portfolio_manager/main.py | 6 | ||||
| -rw-r--r-- | services/strategy-engine/src/strategy_engine/main.py | 6 | ||||
| -rw-r--r-- | shared/src/shared/shutdown.py | 30 |
6 files changed, 55 insertions, 5 deletions
diff --git a/services/data-collector/src/data_collector/main.py b/services/data-collector/src/data_collector/main.py index 608d6cd..171db52 100644 --- a/services/data-collector/src/data_collector/main.py +++ b/services/data-collector/src/data_collector/main.py @@ -11,6 +11,7 @@ from shared.logging import setup_logging from shared.metrics import ServiceMetrics from shared.models import Candle from shared.notifier import TelegramNotifier +from shared.shutdown import GracefulShutdown from data_collector.config import CollectorConfig @@ -83,10 +84,13 @@ async def run() -> None: symbols = config.symbols timeframe = config.timeframes[0] if config.timeframes else "1Day" + shutdown = GracefulShutdown() + shutdown.install_handlers() + log.info("starting", symbols=symbols, timeframe=timeframe, poll_interval=poll_interval) try: - while True: + while not shutdown.is_shutting_down: # Check if market is open try: is_open = await alpaca.is_market_open() diff --git a/services/news-collector/src/news_collector/main.py b/services/news-collector/src/news_collector/main.py index f56914f..837a397 100644 --- a/services/news-collector/src/news_collector/main.py +++ b/services/news-collector/src/news_collector/main.py @@ -13,6 +13,7 @@ from shared.models import NewsItem from shared.notifier import TelegramNotifier from shared.sentiment_models import MarketSentiment from shared.sentiment import SentimentAggregator +from shared.shutdown import GracefulShutdown from news_collector.config import NewsCollectorConfig from news_collector.collectors.finnhub import FinnhubCollector @@ -143,6 +144,9 @@ async def run() -> None: news_collectors = [finnhub, rss, sec, truth, reddit, fed] + shutdown = GracefulShutdown() + shutdown.install_handlers() + log.info( "starting", collectors=[c.name for c in news_collectors], @@ -171,7 +175,7 @@ async def run() -> None: name="aggregator-loop", ) ) - await asyncio.gather(*tasks) + await shutdown.wait() except Exception as exc: log.error("fatal_error", error=str(exc)) await notifier.send_error(str(exc), "news-collector") diff --git a/services/order-executor/src/order_executor/main.py b/services/order-executor/src/order_executor/main.py index 1d167ef..63d93bc 100644 --- a/services/order-executor/src/order_executor/main.py +++ b/services/order-executor/src/order_executor/main.py @@ -11,6 +11,7 @@ from shared.healthcheck import HealthCheckServer from shared.logging import setup_logging from shared.metrics import ServiceMetrics from shared.notifier import TelegramNotifier +from shared.shutdown import GracefulShutdown from order_executor.config import ExecutorConfig from order_executor.executor import OrderExecutor @@ -83,6 +84,9 @@ async def run() -> None: await broker.ensure_group(stream, GROUP) + shutdown = GracefulShutdown() + shutdown.install_handlers() + log.info("started", stream=stream, dry_run=config.dry_run) try: @@ -97,7 +101,7 @@ async def run() -> None: except Exception as exc: log.error("pending_failed", error=str(exc), msg_id=msg_id) - while True: + while not shutdown.is_shutting_down: messages = await broker.read_group(stream, GROUP, CONSUMER, count=10, block=5000) for msg_id, msg in messages: try: diff --git a/services/portfolio-manager/src/portfolio_manager/main.py b/services/portfolio-manager/src/portfolio_manager/main.py index 0214099..6cf248f 100644 --- a/services/portfolio-manager/src/portfolio_manager/main.py +++ b/services/portfolio-manager/src/portfolio_manager/main.py @@ -9,6 +9,7 @@ from shared.healthcheck import HealthCheckServer from shared.logging import setup_logging from shared.metrics import ServiceMetrics from shared.notifier import TelegramNotifier +from shared.shutdown import GracefulShutdown from portfolio_manager.config import PortfolioConfig from portfolio_manager.portfolio import PortfolioTracker @@ -83,6 +84,9 @@ async def run() -> None: snapshot_loop(db, tracker, notifier, config.snapshot_interval_hours, log) ) + shutdown = GracefulShutdown() + shutdown.install_handlers() + GROUP = "portfolio-manager" CONSUMER = "portfolio-1" log.info("service_started", stream=ORDERS_STREAM) @@ -113,7 +117,7 @@ async def run() -> None: metrics.errors_total.labels(service="portfolio-manager", error_type="processing").inc() try: - while True: + while not shutdown.is_shutting_down: messages = await broker.read_group(ORDERS_STREAM, GROUP, CONSUMER, count=10, block=1000) for msg_id, msg in messages: try: diff --git a/services/strategy-engine/src/strategy_engine/main.py b/services/strategy-engine/src/strategy_engine/main.py index 2635b7c..411c54b 100644 --- a/services/strategy-engine/src/strategy_engine/main.py +++ b/services/strategy-engine/src/strategy_engine/main.py @@ -13,6 +13,7 @@ from shared.logging import setup_logging from shared.metrics import ServiceMetrics from shared.notifier import TelegramNotifier from shared.sentiment_models import MarketSentiment +from shared.shutdown import GracefulShutdown from strategy_engine.config import StrategyConfig from strategy_engine.engine import StrategyEngine @@ -97,6 +98,9 @@ async def run() -> None: params = config.strategy_params.get(strategy.name, {}) strategy.configure(params) + shutdown = GracefulShutdown() + shutdown.install_handlers() + log.info("loaded_strategies", count=len(strategies), names=[s.name for s in strategies]) engine = StrategyEngine(broker=broker, strategies=strategies) @@ -131,7 +135,7 @@ async def run() -> None: ) log.info("stock_selector_enabled", time=config.selector_final_time) - await asyncio.gather(*tasks) + await shutdown.wait() except Exception as exc: log.error("fatal_error", error=str(exc)) await notifier.send_error(str(exc), "strategy-engine") diff --git a/shared/src/shared/shutdown.py b/shared/src/shared/shutdown.py new file mode 100644 index 0000000..4ed9aa7 --- /dev/null +++ b/shared/src/shared/shutdown.py @@ -0,0 +1,30 @@ +"""Graceful shutdown utilities for services.""" + +import asyncio +import logging +import signal + +logger = logging.getLogger(__name__) + + +class GracefulShutdown: + """Manages graceful shutdown via SIGTERM/SIGINT signals.""" + + def __init__(self) -> None: + self._event = asyncio.Event() + + @property + def is_shutting_down(self) -> bool: + return self._event.is_set() + + async def wait(self) -> None: + await self._event.wait() + + def trigger(self) -> None: + logger.info("shutdown_signal_received") + self._event.set() + + def install_handlers(self) -> None: + loop = asyncio.get_running_loop() + for sig in (signal.SIGTERM, signal.SIGINT): + loop.add_signal_handler(sig, self.trigger) |
