diff options
| author | TheSiahxyz <164138827+TheSiahxyz@users.noreply.github.com> | 2026-04-01 16:12:40 +0900 |
|---|---|---|
| committer | TheSiahxyz <164138827+TheSiahxyz@users.noreply.github.com> | 2026-04-01 16:12:40 +0900 |
| commit | c3560fdd637c4f034cac4f7371aeed65d018bc91 (patch) | |
| tree | c50d3318c0c37ca634012b7113a80c0d6e356844 /services/portfolio-manager/src/portfolio_manager/main.py | |
| parent | ec792a3d379c911165038d8da5b339df6ca3fccd (diff) | |
feat(services): integrate structlog, healthcheck, metrics, and Telegram
Replace logging.basicConfig/getLogger with structlog setup_logging in all
four service entry points. Add HealthCheckServer, ServiceMetrics, and
TelegramNotifier initialization to each service. Update OrderExecutor to
accept a notifier parameter and send order notifications. Add
RedisBroker.ping() for health checks. Update executor tests with
notifier=AsyncMock().
Diffstat (limited to 'services/portfolio-manager/src/portfolio_manager/main.py')
| -rw-r--r-- | services/portfolio-manager/src/portfolio_manager/main.py | 45 |
1 files changed, 31 insertions, 14 deletions
diff --git a/services/portfolio-manager/src/portfolio_manager/main.py b/services/portfolio-manager/src/portfolio_manager/main.py index cb7e6a8..56624f7 100644 --- a/services/portfolio-manager/src/portfolio_manager/main.py +++ b/services/portfolio-manager/src/portfolio_manager/main.py @@ -1,26 +1,35 @@ """Portfolio Manager Service entry point.""" import asyncio -import logging from shared.broker import RedisBroker from shared.events import Event, OrderEvent +from shared.healthcheck import HealthCheckServer +from shared.logging import setup_logging +from shared.metrics import ServiceMetrics +from shared.notifier import TelegramNotifier from portfolio_manager.config import PortfolioConfig from portfolio_manager.portfolio import PortfolioTracker -logging.basicConfig(level=logging.INFO) -logger = logging.getLogger(__name__) - ORDERS_STREAM = "orders" async def run() -> None: config = PortfolioConfig() + log = setup_logging("portfolio-manager", config.log_level, config.log_format) + metrics = ServiceMetrics("portfolio_manager") + notifier = TelegramNotifier(bot_token=config.telegram_bot_token, chat_id=config.telegram_chat_id) + broker = RedisBroker(config.redis_url) tracker = PortfolioTracker() + health = HealthCheckServer("portfolio-manager", port=config.health_port + 3) + health.register_check("redis", broker.ping) + await health.start() + metrics.service_up.labels(service="portfolio-manager").set(1) + last_id = "$" - logger.info("Portfolio manager started, listening on stream=%s", ORDERS_STREAM) + log.info("service_started", stream=ORDERS_STREAM) try: while True: @@ -31,20 +40,28 @@ async def run() -> None: if isinstance(event, OrderEvent): order = event.data tracker.apply_order(order) - logger.info( - "Applied order symbol=%s side=%s qty=%s price=%s", - order.symbol, - order.side, - order.quantity, - order.price, + log.info( + "order_applied", + symbol=order.symbol, + side=str(order.side), + quantity=str(order.quantity), + price=str(order.price), ) positions = tracker.get_all_positions() - logger.info("Current positions count=%d", len(positions)) - except Exception: - logger.exception("Failed to process message: %s", msg) + log.info("positions_updated", count=len(positions)) + metrics.events_processed.labels(service="portfolio-manager", event_type="order").inc() + except Exception as exc: + log.exception("message_processing_failed", error=str(exc)) + metrics.errors_total.labels(service="portfolio-manager", error_type="processing").inc() # Update last_id to the latest processed message id if broker returns ids # Since broker.read returns parsed payloads (not ids), we use "$" to get new msgs + except Exception as exc: + log.error("fatal_error", error=str(exc)) + await notifier.send_error(str(exc), "portfolio-manager") + raise finally: + metrics.service_up.labels(service="portfolio-manager").set(0) + await notifier.close() await broker.close() |
