"""Portfolio Manager Service entry point."""

import asyncio
from decimal import Decimal

from shared.broker import RedisBroker
from shared.db import Database
from shared.events import Event, OrderEvent
from shared.healthcheck import HealthCheckServer
from shared.logging import setup_logging
from shared.metrics import ServiceMetrics
from shared.notifier import TelegramNotifier
from portfolio_manager.config import PortfolioConfig
from portfolio_manager.portfolio import PortfolioTracker

SERVICE_NAME = "portfolio-manager"
ORDERS_STREAM = "orders"
CONSUMER_GROUP = "portfolio-manager"
CONSUMER_NAME = "portfolio-1"

# Health check port: base (HEALTH_PORT, default 8080) + offset
# data-collector: +0 (8080), strategy-engine: +1 (8081)
# order-executor: +2 (8082), portfolio-manager: +3 (8083)
HEALTH_PORT_OFFSET = 3


async def save_snapshot(
    db: Database,
    tracker: PortfolioTracker,
    notifier: TelegramNotifier,
    log,
) -> None:
    """Compute and save a portfolio snapshot, then send a daily Telegram summary.

    Args:
        db: Database used to persist the snapshot row.
        tracker: Source of the current positions and the realized PnL.
        notifier: Telegram notifier that receives the summary.
        log: Logger accepting an event name plus keyword fields.
    """
    positions = tracker.get_all_positions()

    # An explicit Decimal start value keeps both totals Decimal when there are
    # no positions; a bare sum() would return the int 0 in that case.
    # NOTE(review): assumes quantity/current_price/unrealized_pnl are Decimal
    # (this module imports Decimal for it) -- confirm against the Position model.
    zero = Decimal(0)
    total_value = sum((p.quantity * p.current_price for p in positions), zero)
    unrealized = sum((p.unrealized_pnl for p in positions), zero)

    await db.insert_portfolio_snapshot(
        total_value=total_value,
        realized_pnl=tracker.realized_pnl,
        unrealized_pnl=unrealized,
    )
    await notifier.send_daily_summary(positions, total_value, unrealized)
    log.info("snapshot_saved", total_value=str(total_value), positions=len(positions))


async def snapshot_loop(
    db: Database,
    tracker: PortfolioTracker,
    notifier: TelegramNotifier,
    interval_hours: int,
    log,
) -> None:
    """Periodically save portfolio snapshots and send daily summary.

    The first snapshot is taken immediately; each later one follows after
    ``interval_hours`` hours. A failing snapshot is logged and the loop keeps
    running. The loop only ends when its task is cancelled.
    """
    while True:
        try:
            await save_snapshot(db, tracker, notifier, log)
        except Exception as exc:
            # One bad snapshot must not stop future ones.
            log.error("snapshot_failed", error=str(exc))
        await asyncio.sleep(interval_hours * 3600)


async def _process_message(
    broker: RedisBroker,
    tracker: PortfolioTracker,
    metrics: ServiceMetrics,
    log,
    msg_id,
    msg,
    *,
    applied_event: str,
    report_positions: bool,
) -> None:
    """Apply one stream message to the tracker and acknowledge it.

    Order events are applied to ``tracker``, logged under ``applied_event``
    and counted in the processed-events metric. Any other event type is
    acknowledged without further work. Exceptions propagate to the caller
    before the acknowledgement is sent, so a failed message is not acked.

    Args:
        broker: Broker used to acknowledge the message.
        tracker: Portfolio tracker the order is applied to.
        metrics: Service metrics holding the ``events_processed`` counter.
        log: Logger accepting an event name plus keyword fields.
        msg_id: Stream id of the message, as returned by the broker.
        msg: Raw message payload understood by ``Event.from_dict``.
        applied_event: Log event name emitted after the order is applied.
        report_positions: When true, also log the current position count.
    """
    event = Event.from_dict(msg)
    if isinstance(event, OrderEvent):
        order = event.data
        tracker.apply_order(order)
        log.info(
            applied_event,
            symbol=order.symbol,
            side=str(order.side),
            quantity=str(order.quantity),
            price=str(order.price),
        )
        if report_positions:
            positions = tracker.get_all_positions()
            log.info("positions_updated", count=len(positions))
        metrics.events_processed.labels(
            service=SERVICE_NAME, event_type="order"
        ).inc()
    # Acknowledge every successfully handled message, including non-order events.
    await broker.ack(ORDERS_STREAM, CONSUMER_GROUP, msg_id)


async def run() -> None:
    """Run the portfolio manager until it is cancelled or hits a fatal error.

    Starts the health check server, connects to the database, launches the
    periodic snapshot task, replays messages left pending for this consumer,
    and then consumes the orders stream indefinitely.

    Raises:
        Exception: Any error escaping the consume loop is logged, reported
            through the notifier and re-raised after cleanup.
    """
    config = PortfolioConfig()
    log = setup_logging(SERVICE_NAME, config.log_level, config.log_format)
    metrics = ServiceMetrics("portfolio_manager")
    notifier = TelegramNotifier(
        bot_token=config.telegram_bot_token, chat_id=config.telegram_chat_id
    )
    broker = RedisBroker(config.redis_url)
    tracker = PortfolioTracker()

    health = HealthCheckServer(
        SERVICE_NAME,
        port=config.health_port + HEALTH_PORT_OFFSET,
        auth_token=config.metrics_auth_token,
    )
    health.register_check("redis", broker.ping)
    await health.start()
    metrics.service_up.labels(service=SERVICE_NAME).set(1)

    db = Database(config.database_url)
    await db.connect()

    snapshot_task = asyncio.create_task(
        snapshot_loop(db, tracker, notifier, config.snapshot_interval_hours, log)
    )

    log.info("service_started", stream=ORDERS_STREAM)

    # Everything after the snapshot task exists runs under try/finally so a
    # failure during group setup or pending replay still releases resources.
    try:
        await broker.ensure_group(ORDERS_STREAM, CONSUMER_GROUP)

        # Process pending messages first (from previous crash)
        pending = await broker.read_pending(
            ORDERS_STREAM, CONSUMER_GROUP, CONSUMER_NAME
        )
        for msg_id, msg in pending:
            try:
                await _process_message(
                    broker,
                    tracker,
                    metrics,
                    log,
                    msg_id,
                    msg,
                    applied_event="pending_order_applied",
                    report_positions=False,
                )
            except Exception as exc:
                # log.exception (as in the live loop) keeps the traceback.
                log.exception(
                    "pending_process_failed", error=str(exc), msg_id=msg_id
                )
                metrics.errors_total.labels(
                    service=SERVICE_NAME, error_type="processing"
                ).inc()

        while True:
            messages = await broker.read_group(
                ORDERS_STREAM, CONSUMER_GROUP, CONSUMER_NAME, count=10, block=1000
            )
            for msg_id, msg in messages:
                try:
                    await _process_message(
                        broker,
                        tracker,
                        metrics,
                        log,
                        msg_id,
                        msg,
                        applied_event="order_applied",
                        report_positions=True,
                    )
                except Exception as exc:
                    # A bad message is logged and counted; the loop carries on.
                    log.exception(
                        "message_processing_failed", error=str(exc), msg_id=msg_id
                    )
                    metrics.errors_total.labels(
                        service=SERVICE_NAME, error_type="processing"
                    ).inc()
    except Exception as exc:
        log.error("fatal_error", error=str(exc))
        try:
            await notifier.send_error(str(exc), SERVICE_NAME)
        except Exception as notify_exc:
            # A failed notification must not mask the original fatal error.
            log.error("fatal_error_notify_failed", error=str(notify_exc))
        raise
    finally:
        snapshot_task.cancel()
        # Wait for the cancellation to complete before closing the database the
        # task uses; return_exceptions absorbs the task's own CancelledError.
        await asyncio.gather(snapshot_task, return_exceptions=True)
        metrics.service_up.labels(service=SERVICE_NAME).set(0)
        # NOTE(review): the health check server is started above but never
        # stopped; its shutdown API is not visible from this file.
        closers = (
            ("notifier", notifier.close),
            ("broker", broker.close),
            ("db", db.close),
        )
        for component, close in closers:
            try:
                await close()
            except Exception as exc:
                # Keep closing the remaining resources even if one fails.
                log.error("shutdown_failed", component=component, error=str(exc))


def main() -> None:
    """Synchronous entry point: run the service on a fresh event loop."""
    asyncio.run(run())


if __name__ == "__main__":
    main()