"""Portfolio Manager Service entry point.""" import asyncio from decimal import Decimal from shared.broker import RedisBroker from shared.db import Database from shared.events import Event, OrderEvent from shared.healthcheck import HealthCheckServer from shared.logging import setup_logging from shared.metrics import ServiceMetrics from shared.notifier import TelegramNotifier from portfolio_manager.config import PortfolioConfig from portfolio_manager.portfolio import PortfolioTracker ORDERS_STREAM = "orders" async def save_snapshot( db: Database, tracker: PortfolioTracker, notifier: TelegramNotifier, log, ) -> None: """Compute and save a portfolio snapshot, then send a daily Telegram summary.""" positions = tracker.get_all_positions() total_value = sum(p.quantity * p.current_price for p in positions) unrealized = sum(p.unrealized_pnl for p in positions) await db.insert_portfolio_snapshot( total_value=total_value, realized_pnl=Decimal("0"), # TODO: track realized PnL unrealized_pnl=unrealized, ) await notifier.send_daily_summary(positions, total_value, unrealized) log.info("snapshot_saved", total_value=str(total_value), positions=len(positions)) async def snapshot_loop( db: Database, tracker: PortfolioTracker, notifier: TelegramNotifier, interval_hours: int, log, ) -> None: """Periodically save portfolio snapshots and send daily summary.""" while True: await asyncio.sleep(interval_hours * 3600) try: await save_snapshot(db, tracker, notifier, log) except Exception as exc: log.error("snapshot_failed", error=str(exc)) async def run() -> None: config = PortfolioConfig() log = setup_logging("portfolio-manager", config.log_level, config.log_format) metrics = ServiceMetrics("portfolio_manager") notifier = TelegramNotifier( bot_token=config.telegram_bot_token, chat_id=config.telegram_chat_id ) broker = RedisBroker(config.redis_url) tracker = PortfolioTracker() health = HealthCheckServer( "portfolio-manager", port=config.health_port + 3, auth_token=config.metrics_auth_token ) health.register_check("redis", broker.ping) await health.start() metrics.service_up.labels(service="portfolio-manager").set(1) db = Database(config.database_url) await db.connect() snapshot_task = asyncio.create_task( snapshot_loop(db, tracker, notifier, config.snapshot_interval_hours, log) ) last_id = "$" log.info("service_started", stream=ORDERS_STREAM) try: while True: messages = await broker.read(ORDERS_STREAM, last_id=last_id, block=1000) for msg in messages: try: event = Event.from_dict(msg) if isinstance(event, OrderEvent): order = event.data tracker.apply_order(order) log.info( "order_applied", symbol=order.symbol, side=str(order.side), quantity=str(order.quantity), price=str(order.price), ) positions = tracker.get_all_positions() log.info("positions_updated", count=len(positions)) metrics.events_processed.labels( service="portfolio-manager", event_type="order" ).inc() except Exception as exc: log.exception("message_processing_failed", error=str(exc)) metrics.errors_total.labels( service="portfolio-manager", error_type="processing" ).inc() # Update last_id to the latest processed message id if broker returns ids # Since broker.read returns parsed payloads (not ids), we use "$" to get new msgs except Exception as exc: log.error("fatal_error", error=str(exc)) await notifier.send_error(str(exc), "portfolio-manager") raise finally: snapshot_task.cancel() metrics.service_up.labels(service="portfolio-manager").set(0) await notifier.close() await broker.close() await db.close() def main() -> None: asyncio.run(run()) if __name__ == "__main__": main()