summaryrefslogtreecommitdiff
path: root/services/portfolio-manager/src/portfolio_manager/main.py
diff options
context:
space:
mode:
Diffstat (limited to 'services/portfolio-manager/src/portfolio_manager/main.py')
-rw-r--r--services/portfolio-manager/src/portfolio_manager/main.py43
1 files changed, 32 insertions, 11 deletions
diff --git a/services/portfolio-manager/src/portfolio_manager/main.py b/services/portfolio-manager/src/portfolio_manager/main.py
index a6823ae..f885aa8 100644
--- a/services/portfolio-manager/src/portfolio_manager/main.py
+++ b/services/portfolio-manager/src/portfolio_manager/main.py
@@ -2,6 +2,10 @@
import asyncio
+import sqlalchemy.exc
+
+from portfolio_manager.config import PortfolioConfig
+from portfolio_manager.portfolio import PortfolioTracker
from shared.broker import RedisBroker
from shared.db import Database
from shared.events import Event, OrderEvent
@@ -9,9 +13,7 @@ from shared.healthcheck import HealthCheckServer
from shared.logging import setup_logging
from shared.metrics import ServiceMetrics
from shared.notifier import TelegramNotifier
-
-from portfolio_manager.config import PortfolioConfig
-from portfolio_manager.portfolio import PortfolioTracker
+from shared.shutdown import GracefulShutdown
ORDERS_STREAM = "orders"
@@ -51,8 +53,12 @@ async def snapshot_loop(
while True:
try:
await save_snapshot(db, tracker, notifier, log)
+ except (sqlalchemy.exc.OperationalError, ConnectionError, TimeoutError) as exc:
+ log.warning("snapshot_db_error", error=str(exc))
+ except (ValueError, KeyError, TypeError) as exc:
+ log.warning("snapshot_data_error", error=str(exc))
except Exception as exc:
- log.error("snapshot_failed", error=str(exc))
+ log.error("snapshot_failed", error=str(exc), exc_info=True)
await asyncio.sleep(interval_hours * 3600)
@@ -61,10 +67,10 @@ async def run() -> None:
log = setup_logging("portfolio-manager", config.log_level, config.log_format)
metrics = ServiceMetrics("portfolio_manager")
notifier = TelegramNotifier(
- bot_token=config.telegram_bot_token, chat_id=config.telegram_chat_id
+ bot_token=config.telegram_bot_token.get_secret_value(), chat_id=config.telegram_chat_id
)
- broker = RedisBroker(config.redis_url)
+ broker = RedisBroker(config.redis_url.get_secret_value())
tracker = PortfolioTracker()
health = HealthCheckServer(
@@ -76,13 +82,16 @@ async def run() -> None:
await health.start()
metrics.service_up.labels(service="portfolio-manager").set(1)
- db = Database(config.database_url)
+ db = Database(config.database_url.get_secret_value())
await db.connect()
snapshot_task = asyncio.create_task(
snapshot_loop(db, tracker, notifier, config.snapshot_interval_hours, log)
)
+ shutdown = GracefulShutdown()
+ shutdown.install_handlers()
+
GROUP = "portfolio-manager"
CONSUMER = "portfolio-1"
log.info("service_started", stream=ORDERS_STREAM)
@@ -108,12 +117,16 @@ async def run() -> None:
service="portfolio-manager", event_type="order"
).inc()
await broker.ack(ORDERS_STREAM, GROUP, msg_id)
+ except (ValueError, KeyError, TypeError) as exc:
+ log.warning("pending_parse_error", error=str(exc), msg_id=msg_id)
+ await broker.ack(ORDERS_STREAM, GROUP, msg_id)
+ metrics.errors_total.labels(service="portfolio-manager", error_type="validation").inc()
except Exception as exc:
- log.error("pending_process_failed", error=str(exc), msg_id=msg_id)
+ log.error("pending_process_failed", error=str(exc), msg_id=msg_id, exc_info=True)
metrics.errors_total.labels(service="portfolio-manager", error_type="processing").inc()
try:
- while True:
+ while not shutdown.is_shutting_down:
messages = await broker.read_group(ORDERS_STREAM, GROUP, CONSUMER, count=10, block=1000)
for msg_id, msg in messages:
try:
@@ -134,13 +147,21 @@ async def run() -> None:
service="portfolio-manager", event_type="order"
).inc()
await broker.ack(ORDERS_STREAM, GROUP, msg_id)
+ except (ValueError, KeyError, TypeError) as exc:
+ log.warning("message_parse_error", error=str(exc), msg_id=msg_id)
+ await broker.ack(ORDERS_STREAM, GROUP, msg_id)
+ metrics.errors_total.labels(
+ service="portfolio-manager", error_type="validation"
+ ).inc()
except Exception as exc:
- log.exception("message_processing_failed", error=str(exc), msg_id=msg_id)
+ log.error(
+ "message_processing_failed", error=str(exc), msg_id=msg_id, exc_info=True
+ )
metrics.errors_total.labels(
service="portfolio-manager", error_type="processing"
).inc()
except Exception as exc:
- log.error("fatal_error", error=str(exc))
+ log.error("fatal_error", error=str(exc), exc_info=True)
await notifier.send_error(str(exc), "portfolio-manager")
raise
finally: