diff options
Diffstat (limited to 'services/portfolio-manager/src/portfolio_manager/main.py')
| -rw-r--r-- | services/portfolio-manager/src/portfolio_manager/main.py | 43 |
1 files changed, 32 insertions, 11 deletions
diff --git a/services/portfolio-manager/src/portfolio_manager/main.py b/services/portfolio-manager/src/portfolio_manager/main.py index a6823ae..f885aa8 100644 --- a/services/portfolio-manager/src/portfolio_manager/main.py +++ b/services/portfolio-manager/src/portfolio_manager/main.py @@ -2,6 +2,10 @@ import asyncio +import sqlalchemy.exc + +from portfolio_manager.config import PortfolioConfig +from portfolio_manager.portfolio import PortfolioTracker from shared.broker import RedisBroker from shared.db import Database from shared.events import Event, OrderEvent @@ -9,9 +13,7 @@ from shared.healthcheck import HealthCheckServer from shared.logging import setup_logging from shared.metrics import ServiceMetrics from shared.notifier import TelegramNotifier - -from portfolio_manager.config import PortfolioConfig -from portfolio_manager.portfolio import PortfolioTracker +from shared.shutdown import GracefulShutdown ORDERS_STREAM = "orders" @@ -51,8 +53,12 @@ async def snapshot_loop( while True: try: await save_snapshot(db, tracker, notifier, log) + except (sqlalchemy.exc.OperationalError, ConnectionError, TimeoutError) as exc: + log.warning("snapshot_db_error", error=str(exc)) + except (ValueError, KeyError, TypeError) as exc: + log.warning("snapshot_data_error", error=str(exc)) except Exception as exc: - log.error("snapshot_failed", error=str(exc)) + log.error("snapshot_failed", error=str(exc), exc_info=True) await asyncio.sleep(interval_hours * 3600) @@ -61,10 +67,10 @@ async def run() -> None: log = setup_logging("portfolio-manager", config.log_level, config.log_format) metrics = ServiceMetrics("portfolio_manager") notifier = TelegramNotifier( - bot_token=config.telegram_bot_token, chat_id=config.telegram_chat_id + bot_token=config.telegram_bot_token.get_secret_value(), chat_id=config.telegram_chat_id ) - broker = RedisBroker(config.redis_url) + broker = RedisBroker(config.redis_url.get_secret_value()) tracker = PortfolioTracker() health = HealthCheckServer( @@ -76,13 +82,16 @@ async def run() -> None: await health.start() metrics.service_up.labels(service="portfolio-manager").set(1) - db = Database(config.database_url) + db = Database(config.database_url.get_secret_value()) await db.connect() snapshot_task = asyncio.create_task( snapshot_loop(db, tracker, notifier, config.snapshot_interval_hours, log) ) + shutdown = GracefulShutdown() + shutdown.install_handlers() + GROUP = "portfolio-manager" CONSUMER = "portfolio-1" log.info("service_started", stream=ORDERS_STREAM) @@ -108,12 +117,16 @@ async def run() -> None: service="portfolio-manager", event_type="order" ).inc() await broker.ack(ORDERS_STREAM, GROUP, msg_id) + except (ValueError, KeyError, TypeError) as exc: + log.warning("pending_parse_error", error=str(exc), msg_id=msg_id) + await broker.ack(ORDERS_STREAM, GROUP, msg_id) + metrics.errors_total.labels(service="portfolio-manager", error_type="validation").inc() except Exception as exc: - log.error("pending_process_failed", error=str(exc), msg_id=msg_id) + log.error("pending_process_failed", error=str(exc), msg_id=msg_id, exc_info=True) metrics.errors_total.labels(service="portfolio-manager", error_type="processing").inc() try: - while True: + while not shutdown.is_shutting_down: messages = await broker.read_group(ORDERS_STREAM, GROUP, CONSUMER, count=10, block=1000) for msg_id, msg in messages: try: @@ -134,13 +147,21 @@ async def run() -> None: service="portfolio-manager", event_type="order" ).inc() await broker.ack(ORDERS_STREAM, GROUP, msg_id) + except (ValueError, KeyError, TypeError) as exc: + log.warning("message_parse_error", error=str(exc), msg_id=msg_id) + await broker.ack(ORDERS_STREAM, GROUP, msg_id) + metrics.errors_total.labels( + service="portfolio-manager", error_type="validation" + ).inc() except Exception as exc: - log.exception("message_processing_failed", error=str(exc), msg_id=msg_id) + log.error( + "message_processing_failed", error=str(exc), msg_id=msg_id, exc_info=True + ) metrics.errors_total.labels( service="portfolio-manager", error_type="processing" ).inc() except Exception as exc: - log.error("fatal_error", error=str(exc)) + log.error("fatal_error", error=str(exc), exc_info=True) await notifier.send_error(str(exc), "portfolio-manager") raise finally: |
