"""Portfolio Manager Service entry point."""

import asyncio

from shared.broker import RedisBroker
from shared.events import Event, OrderEvent
from shared.healthcheck import HealthCheckServer
from shared.logging import setup_logging
from shared.metrics import ServiceMetrics
from shared.notifier import TelegramNotifier

from portfolio_manager.config import PortfolioConfig
from portfolio_manager.portfolio import PortfolioTracker

ORDERS_STREAM = "orders"

# Name used for logging, the health server, metric labels and error alerts.
_SERVICE_NAME = "portfolio-manager"

# Added to ``config.health_port`` to obtain this service's health-check port.
# NOTE(review): presumably each service gets its own offset from a shared base
# port -- confirm against the other services' entry points.
_HEALTH_PORT_OFFSET = 3


def _process_message(msg, tracker: PortfolioTracker, log, metrics: ServiceMetrics) -> None:
    """Decode one stream message and, if it is an order, apply it to *tracker*.

    Any exception raised while decoding or applying the message is logged and
    counted in ``metrics.errors_total``; it is never propagated, so a single
    bad message cannot stop the consumer loop.

    Args:
        msg: Raw payload as returned by ``broker.read`` and accepted by
            ``Event.from_dict``.
        tracker: Portfolio tracker that receives the order.
        log: Logger returned by ``setup_logging``.
        metrics: Service metrics used for the processed/error counters.
    """
    try:
        event = Event.from_dict(msg)
        if not isinstance(event, OrderEvent):
            # Only order events affect the portfolio; everything else is skipped.
            return

        order = event.data
        tracker.apply_order(order)
        log.info(
            "order_applied",
            symbol=order.symbol,
            side=str(order.side),
            quantity=str(order.quantity),
            price=str(order.price),
        )

        positions = tracker.get_all_positions()
        log.info("positions_updated", count=len(positions))
        metrics.events_processed.labels(
            service=_SERVICE_NAME, event_type="order"
        ).inc()
    except Exception as exc:
        log.exception("message_processing_failed", error=str(exc))
        metrics.errors_total.labels(
            service=_SERVICE_NAME, error_type="processing"
        ).inc()


async def run() -> None:
    """Run the portfolio manager until a fatal error occurs.

    Builds the service dependencies, starts the health-check server, then
    reads the orders stream in a loop and applies each order to the portfolio
    tracker.

    Raises:
        Exception: Any error escaping the read loop is logged, reported via
            the notifier on a best-effort basis, and re-raised after cleanup.
    """
    config = PortfolioConfig()
    log = setup_logging(_SERVICE_NAME, config.log_level, config.log_format)
    metrics = ServiceMetrics("portfolio_manager")
    notifier = TelegramNotifier(
        bot_token=config.telegram_bot_token, chat_id=config.telegram_chat_id
    )
    broker = RedisBroker(config.redis_url)
    tracker = PortfolioTracker()

    health = HealthCheckServer(
        _SERVICE_NAME, port=config.health_port + _HEALTH_PORT_OFFSET
    )
    health.register_check("redis", broker.ping)
    await health.start()

    metrics.service_up.labels(service=_SERVICE_NAME).set(1)

    last_id = "$"
    log.info("service_started", stream=ORDERS_STREAM)

    try:
        while True:
            messages = await broker.read(ORDERS_STREAM, last_id=last_id, block=1000)
            for msg in messages:
                _process_message(msg, tracker, log, metrics)
            # broker.read returns parsed payloads (not ids), so last_id is
            # never advanced and every read asks for "$" again.
            # NOTE(review): if this maps to Redis XREAD semantics, messages
            # that arrive between two reads may be missed -- confirm against
            # RedisBroker.read and track the last delivered id if possible.
    except Exception as exc:
        log.error("fatal_error", error=str(exc))
        try:
            await notifier.send_error(str(exc), _SERVICE_NAME)
        except Exception as notify_exc:
            # A failed alert must not replace the original fatal error.
            log.exception("fatal_error_notification_failed", error=str(notify_exc))
        raise
    finally:
        metrics.service_up.labels(service=_SERVICE_NAME).set(0)
        try:
            await notifier.close()
        finally:
            # Close the broker even when closing the notifier fails.
            await broker.close()


def main() -> None:
    """Synchronous entry point: run the service on a fresh event loop."""
    asyncio.run(run())


if __name__ == "__main__":
    main()