"""Portfolio Manager Service entry point."""

import asyncio

from shared.broker import RedisBroker
from shared.db import Database
from shared.events import Event, OrderEvent
from shared.healthcheck import HealthCheckServer
from shared.logging import setup_logging
from shared.metrics import ServiceMetrics
from shared.notifier import TelegramNotifier

from portfolio_manager.config import PortfolioConfig
from portfolio_manager.portfolio import PortfolioTracker

ORDERS_STREAM = "orders"

# Name used for logging, the health check server and metric labels.
SERVICE_NAME = "portfolio-manager"

# Consumer group / consumer identity used when reading ORDERS_STREAM.
CONSUMER_GROUP = "portfolio-manager"
CONSUMER_NAME = "portfolio-1"

# Health check port: base (HEALTH_PORT, default 8080) + offset
# data-collector: +0 (8080), strategy-engine: +1 (8081)
# order-executor: +2 (8082), portfolio-manager: +3 (8083)
HEALTH_PORT_OFFSET = 3


async def save_snapshot(
    db: Database,
    tracker: PortfolioTracker,
    notifier: TelegramNotifier,
    log,
) -> None:
    """Compute and store a portfolio snapshot, then send the summary.

    Args:
        db: Database the snapshot row is inserted into.
        tracker: Source of the current positions and realized PnL.
        notifier: Notifier whose ``send_daily_summary`` is called after the
            insert succeeds.
        log: Logger accepting an event name plus keyword fields.

    Raises:
        Exception: Whatever the insert or the notification raises; nothing is
            caught here. If the notification fails the snapshot row has
            already been written.
    """
    positions = tracker.get_all_positions()
    total_value = sum(p.quantity * p.current_price for p in positions)
    unrealized = sum(p.unrealized_pnl for p in positions)

    await db.insert_portfolio_snapshot(
        total_value=total_value,
        realized_pnl=tracker.realized_pnl,
        unrealized_pnl=unrealized,
    )
    await notifier.send_daily_summary(positions, total_value, unrealized)

    log.info("snapshot_saved", total_value=str(total_value), positions=len(positions))


async def snapshot_loop(
    db: Database,
    tracker: PortfolioTracker,
    notifier: TelegramNotifier,
    interval_hours: int,
    log,
) -> None:
    """Save a snapshot immediately and then every ``interval_hours`` hours.

    A failing snapshot is logged as ``snapshot_failed`` and does not stop the
    loop. The loop only ends when the surrounding task is cancelled.
    """
    while True:
        try:
            await save_snapshot(db, tracker, notifier, log)
        except Exception as exc:
            log.error("snapshot_failed", error=str(exc))
        await asyncio.sleep(interval_hours * 3600)


async def _process_message(
    broker: RedisBroker,
    tracker: PortfolioTracker,
    metrics: ServiceMetrics,
    log,
    msg_id,
    msg,
    *,
    applied_event: str,
    failed_event: str,
    log_positions: bool = False,
) -> None:
    """Apply one stream message to the tracker and acknowledge it.

    Order events are applied to ``tracker`` and counted in
    ``metrics.events_processed``. Any failure is logged under ``failed_event``
    and counted in ``metrics.errors_total``; the message is then left
    unacknowledged and the exception is not re-raised, so one bad message
    cannot stop the consumer.

    Args:
        broker: Broker used to acknowledge the message.
        tracker: Portfolio state the order is applied to.
        metrics: Counters for processed events and processing errors.
        log: Logger accepting an event name plus keyword fields.
        msg_id: Stream id of the message, used for the ack and in error logs.
        msg: Raw message payload passed to ``Event.from_dict``.
        applied_event: Log event name emitted after an order is applied.
        failed_event: Log event name emitted when processing fails.
        log_positions: Also log the position count after applying an order.
    """
    try:
        event = Event.from_dict(msg)
        if isinstance(event, OrderEvent):
            order = event.data
            tracker.apply_order(order)
            log.info(
                applied_event,
                symbol=order.symbol,
                side=str(order.side),
                quantity=str(order.quantity),
                price=str(order.price),
            )
            if log_positions:
                positions = tracker.get_all_positions()
                log.info("positions_updated", count=len(positions))
            metrics.events_processed.labels(
                service=SERVICE_NAME, event_type="order"
            ).inc()
        # Acknowledge every successfully handled message, order event or not,
        # so that unrelated events do not accumulate as pending.
        await broker.ack(ORDERS_STREAM, CONSUMER_GROUP, msg_id)
    except Exception as exc:
        # log.exception keeps the traceback for both replayed and live messages.
        log.exception(failed_event, error=str(exc), msg_id=msg_id)
        metrics.errors_total.labels(
            service=SERVICE_NAME, error_type="processing"
        ).inc()


async def _close_quietly(name: str, closer, log) -> None:
    """Await ``closer()`` and log, rather than propagate, any failure."""
    try:
        await closer()
    except Exception as exc:
        log.error("cleanup_failed", resource=name, error=str(exc))


async def run() -> None:
    """Run the portfolio manager until a fatal error or cancellation.

    Starts the health check server, connects to the database, launches the
    periodic snapshot task, replays this consumer's pending messages and then
    consumes ``ORDERS_STREAM`` indefinitely.

    Raises:
        Exception: Any error from group creation, the pending replay or the
            read loop is logged as ``fatal_error``, reported through the
            notifier and re-raised after cleanup.
    """
    config = PortfolioConfig()
    log = setup_logging(SERVICE_NAME, config.log_level, config.log_format)
    metrics = ServiceMetrics("portfolio_manager")
    notifier = TelegramNotifier(
        bot_token=config.telegram_bot_token, chat_id=config.telegram_chat_id
    )
    broker = RedisBroker(config.redis_url)
    tracker = PortfolioTracker()

    # NOTE(review): the health server is started but never stopped in this
    # file; confirm whether HealthCheckServer exposes a stop/close hook.
    health = HealthCheckServer(
        SERVICE_NAME,
        port=config.health_port + HEALTH_PORT_OFFSET,
        auth_token=config.metrics_auth_token,
    )
    health.register_check("redis", broker.ping)
    await health.start()
    metrics.service_up.labels(service=SERVICE_NAME).set(1)

    db = Database(config.database_url)
    await db.connect()

    # NOTE(review): the first snapshot runs as soon as this task is scheduled,
    # i.e. concurrently with the pending replay below; confirm that an early
    # (possibly empty) snapshot is acceptable.
    snapshot_task = asyncio.create_task(
        snapshot_loop(db, tracker, notifier, config.snapshot_interval_hours, log)
    )

    log.info("service_started", stream=ORDERS_STREAM)

    try:
        await broker.ensure_group(ORDERS_STREAM, CONSUMER_GROUP)

        # Process pending messages first (from previous crash)
        pending = await broker.read_pending(
            ORDERS_STREAM, CONSUMER_GROUP, CONSUMER_NAME
        )
        for msg_id, msg in pending:
            await _process_message(
                broker,
                tracker,
                metrics,
                log,
                msg_id,
                msg,
                applied_event="pending_order_applied",
                failed_event="pending_process_failed",
            )

        while True:
            messages = await broker.read_group(
                ORDERS_STREAM, CONSUMER_GROUP, CONSUMER_NAME, count=10, block=1000
            )
            for msg_id, msg in messages:
                await _process_message(
                    broker,
                    tracker,
                    metrics,
                    log,
                    msg_id,
                    msg,
                    applied_event="order_applied",
                    failed_event="message_processing_failed",
                    log_positions=True,
                )
    except Exception as exc:
        log.error("fatal_error", error=str(exc))
        try:
            await notifier.send_error(str(exc), SERVICE_NAME)
        except Exception as notify_exc:
            # A broken notifier must not replace the original fatal error.
            log.error("fatal_error_notification_failed", error=str(notify_exc))
        raise
    finally:
        snapshot_task.cancel()
        # Wait for the cancellation to complete so the snapshot loop is no
        # longer using the database when it is closed below.
        await asyncio.gather(snapshot_task, return_exceptions=True)
        metrics.service_up.labels(service=SERVICE_NAME).set(0)
        # Close each resource independently: one failing close must not
        # prevent the remaining ones from being released.
        for name, closer in (
            ("notifier", notifier.close),
            ("broker", broker.close),
            ("database", db.close),
        ):
            await _close_quietly(name, closer, log)


def main() -> None:
    """Synchronous entry point: run the service on a new event loop."""
    asyncio.run(run())


if __name__ == "__main__":
    main()