summaryrefslogtreecommitdiff
path: root/services
diff options
context:
space:
mode:
Diffstat (limited to 'services')
-rw-r--r--services/data-collector/src/data_collector/main.py6
-rw-r--r--services/news-collector/src/news_collector/main.py6
-rw-r--r--services/order-executor/src/order_executor/main.py6
-rw-r--r--services/portfolio-manager/src/portfolio_manager/main.py6
-rw-r--r--services/strategy-engine/src/strategy_engine/main.py6
5 files changed, 25 insertions, 5 deletions
diff --git a/services/data-collector/src/data_collector/main.py b/services/data-collector/src/data_collector/main.py
index 608d6cd..171db52 100644
--- a/services/data-collector/src/data_collector/main.py
+++ b/services/data-collector/src/data_collector/main.py
@@ -11,6 +11,7 @@ from shared.logging import setup_logging
from shared.metrics import ServiceMetrics
from shared.models import Candle
from shared.notifier import TelegramNotifier
+from shared.shutdown import GracefulShutdown
from data_collector.config import CollectorConfig
@@ -83,10 +84,13 @@ async def run() -> None:
symbols = config.symbols
timeframe = config.timeframes[0] if config.timeframes else "1Day"
+ shutdown = GracefulShutdown()
+ shutdown.install_handlers()
+
log.info("starting", symbols=symbols, timeframe=timeframe, poll_interval=poll_interval)
try:
- while True:
+ while not shutdown.is_shutting_down:
# Check if market is open
try:
is_open = await alpaca.is_market_open()
diff --git a/services/news-collector/src/news_collector/main.py b/services/news-collector/src/news_collector/main.py
index f56914f..837a397 100644
--- a/services/news-collector/src/news_collector/main.py
+++ b/services/news-collector/src/news_collector/main.py
@@ -13,6 +13,7 @@ from shared.models import NewsItem
from shared.notifier import TelegramNotifier
from shared.sentiment_models import MarketSentiment
from shared.sentiment import SentimentAggregator
+from shared.shutdown import GracefulShutdown
from news_collector.config import NewsCollectorConfig
from news_collector.collectors.finnhub import FinnhubCollector
@@ -143,6 +144,9 @@ async def run() -> None:
news_collectors = [finnhub, rss, sec, truth, reddit, fed]
+ shutdown = GracefulShutdown()
+ shutdown.install_handlers()
+
log.info(
"starting",
collectors=[c.name for c in news_collectors],
@@ -171,7 +175,7 @@ async def run() -> None:
name="aggregator-loop",
)
)
- await asyncio.gather(*tasks)
+ await shutdown.wait()
except Exception as exc:
log.error("fatal_error", error=str(exc))
await notifier.send_error(str(exc), "news-collector")
diff --git a/services/order-executor/src/order_executor/main.py b/services/order-executor/src/order_executor/main.py
index 1d167ef..63d93bc 100644
--- a/services/order-executor/src/order_executor/main.py
+++ b/services/order-executor/src/order_executor/main.py
@@ -11,6 +11,7 @@ from shared.healthcheck import HealthCheckServer
from shared.logging import setup_logging
from shared.metrics import ServiceMetrics
from shared.notifier import TelegramNotifier
+from shared.shutdown import GracefulShutdown
from order_executor.config import ExecutorConfig
from order_executor.executor import OrderExecutor
@@ -83,6 +84,9 @@ async def run() -> None:
await broker.ensure_group(stream, GROUP)
+ shutdown = GracefulShutdown()
+ shutdown.install_handlers()
+
log.info("started", stream=stream, dry_run=config.dry_run)
try:
@@ -97,7 +101,7 @@ async def run() -> None:
except Exception as exc:
log.error("pending_failed", error=str(exc), msg_id=msg_id)
- while True:
+ while not shutdown.is_shutting_down:
messages = await broker.read_group(stream, GROUP, CONSUMER, count=10, block=5000)
for msg_id, msg in messages:
try:
diff --git a/services/portfolio-manager/src/portfolio_manager/main.py b/services/portfolio-manager/src/portfolio_manager/main.py
index 0214099..6cf248f 100644
--- a/services/portfolio-manager/src/portfolio_manager/main.py
+++ b/services/portfolio-manager/src/portfolio_manager/main.py
@@ -9,6 +9,7 @@ from shared.healthcheck import HealthCheckServer
from shared.logging import setup_logging
from shared.metrics import ServiceMetrics
from shared.notifier import TelegramNotifier
+from shared.shutdown import GracefulShutdown
from portfolio_manager.config import PortfolioConfig
from portfolio_manager.portfolio import PortfolioTracker
@@ -83,6 +84,9 @@ async def run() -> None:
snapshot_loop(db, tracker, notifier, config.snapshot_interval_hours, log)
)
+ shutdown = GracefulShutdown()
+ shutdown.install_handlers()
+
GROUP = "portfolio-manager"
CONSUMER = "portfolio-1"
log.info("service_started", stream=ORDERS_STREAM)
@@ -113,7 +117,7 @@ async def run() -> None:
metrics.errors_total.labels(service="portfolio-manager", error_type="processing").inc()
try:
- while True:
+ while not shutdown.is_shutting_down:
messages = await broker.read_group(ORDERS_STREAM, GROUP, CONSUMER, count=10, block=1000)
for msg_id, msg in messages:
try:
diff --git a/services/strategy-engine/src/strategy_engine/main.py b/services/strategy-engine/src/strategy_engine/main.py
index 2635b7c..411c54b 100644
--- a/services/strategy-engine/src/strategy_engine/main.py
+++ b/services/strategy-engine/src/strategy_engine/main.py
@@ -13,6 +13,7 @@ from shared.logging import setup_logging
from shared.metrics import ServiceMetrics
from shared.notifier import TelegramNotifier
from shared.sentiment_models import MarketSentiment
+from shared.shutdown import GracefulShutdown
from strategy_engine.config import StrategyConfig
from strategy_engine.engine import StrategyEngine
@@ -97,6 +98,9 @@ async def run() -> None:
params = config.strategy_params.get(strategy.name, {})
strategy.configure(params)
+ shutdown = GracefulShutdown()
+ shutdown.install_handlers()
+
log.info("loaded_strategies", count=len(strategies), names=[s.name for s in strategies])
engine = StrategyEngine(broker=broker, strategies=strategies)
@@ -131,7 +135,7 @@ async def run() -> None:
)
log.info("stock_selector_enabled", time=config.selector_final_time)
- await asyncio.gather(*tasks)
+ await shutdown.wait()
except Exception as exc:
log.error("fatal_error", error=str(exc))
await notifier.send_error(str(exc), "strategy-engine")