From 8c11cae987292421840658585c0667706790c8ca Mon Sep 17 00:00:00 2001 From: TheSiahxyz <164138827+TheSiahxyz@users.noreply.github.com> Date: Wed, 1 Apr 2026 17:15:23 +0900 Subject: feat(portfolio): add periodic portfolio snapshots and daily Telegram summary --- .../src/portfolio_manager/main.py | 46 ++++++++++++++++++++++ 1 file changed, 46 insertions(+) (limited to 'services/portfolio-manager/src/portfolio_manager') diff --git a/services/portfolio-manager/src/portfolio_manager/main.py b/services/portfolio-manager/src/portfolio_manager/main.py index a1c73be..02df5d2 100644 --- a/services/portfolio-manager/src/portfolio_manager/main.py +++ b/services/portfolio-manager/src/portfolio_manager/main.py @@ -1,8 +1,10 @@ """Portfolio Manager Service entry point.""" import asyncio +from decimal import Decimal from shared.broker import RedisBroker +from shared.db import Database from shared.events import Event, OrderEvent from shared.healthcheck import HealthCheckServer from shared.logging import setup_logging @@ -15,6 +17,41 @@ from portfolio_manager.portfolio import PortfolioTracker ORDERS_STREAM = "orders" +async def save_snapshot( + db: Database, + tracker: PortfolioTracker, + notifier: TelegramNotifier, + log, +) -> None: + """Compute and save a portfolio snapshot, then send a daily Telegram summary.""" + positions = tracker.get_all_positions() + total_value = sum(p.quantity * p.current_price for p in positions) + unrealized = sum(p.unrealized_pnl for p in positions) + await db.insert_portfolio_snapshot( + total_value=total_value, + realized_pnl=Decimal("0"), # TODO: track realized PnL + unrealized_pnl=unrealized, + ) + await notifier.send_daily_summary(positions, total_value, unrealized) + log.info("snapshot_saved", total_value=str(total_value), positions=len(positions)) + + +async def snapshot_loop( + db: Database, + tracker: PortfolioTracker, + notifier: TelegramNotifier, + interval_hours: int, + log, +) -> None: + """Periodically save portfolio snapshots and send daily summary.""" + while True: + await asyncio.sleep(interval_hours * 3600) + try: + await save_snapshot(db, tracker, notifier, log) + except Exception as exc: + log.error("snapshot_failed", error=str(exc)) + + async def run() -> None: config = PortfolioConfig() log = setup_logging("portfolio-manager", config.log_level, config.log_format) @@ -31,6 +68,13 @@ async def run() -> None: await health.start() metrics.service_up.labels(service="portfolio-manager").set(1) + db = Database(config.database_url) + await db.connect() + + snapshot_task = asyncio.create_task( + snapshot_loop(db, tracker, notifier, config.snapshot_interval_hours, log) + ) + last_id = "$" log.info("service_started", stream=ORDERS_STREAM) @@ -67,9 +111,11 @@ async def run() -> None: await notifier.send_error(str(exc), "portfolio-manager") raise finally: + snapshot_task.cancel() metrics.service_up.labels(service="portfolio-manager").set(0) await notifier.close() await broker.close() + await db.close() def main() -> None: -- cgit v1.2.3