summaryrefslogtreecommitdiff
path: root/services/portfolio-manager/src/portfolio_manager
diff options
context:
space:
mode:
Diffstat (limited to 'services/portfolio-manager/src/portfolio_manager')
-rw-r--r--services/portfolio-manager/src/portfolio_manager/main.py46
1 files changed, 46 insertions, 0 deletions
diff --git a/services/portfolio-manager/src/portfolio_manager/main.py b/services/portfolio-manager/src/portfolio_manager/main.py
index a1c73be..02df5d2 100644
--- a/services/portfolio-manager/src/portfolio_manager/main.py
+++ b/services/portfolio-manager/src/portfolio_manager/main.py
@@ -1,8 +1,10 @@
"""Portfolio Manager Service entry point."""
import asyncio
+from decimal import Decimal
from shared.broker import RedisBroker
+from shared.db import Database
from shared.events import Event, OrderEvent
from shared.healthcheck import HealthCheckServer
from shared.logging import setup_logging
@@ -15,6 +17,41 @@ from portfolio_manager.portfolio import PortfolioTracker
ORDERS_STREAM = "orders"
+async def save_snapshot(
+ db: Database,
+ tracker: PortfolioTracker,
+ notifier: TelegramNotifier,
+ log,
+) -> None:
+ """Compute and save a portfolio snapshot, then send a daily Telegram summary."""
+ positions = tracker.get_all_positions()
+ total_value = sum(p.quantity * p.current_price for p in positions)
+ unrealized = sum(p.unrealized_pnl for p in positions)
+ await db.insert_portfolio_snapshot(
+ total_value=total_value,
+ realized_pnl=Decimal("0"), # TODO: track realized PnL
+ unrealized_pnl=unrealized,
+ )
+ await notifier.send_daily_summary(positions, total_value, unrealized)
+ log.info("snapshot_saved", total_value=str(total_value), positions=len(positions))
+
+
+async def snapshot_loop(
+ db: Database,
+ tracker: PortfolioTracker,
+ notifier: TelegramNotifier,
+ interval_hours: int,
+ log,
+) -> None:
+ """Periodically save portfolio snapshots and send daily summary."""
+ while True:
+ await asyncio.sleep(interval_hours * 3600)
+ try:
+ await save_snapshot(db, tracker, notifier, log)
+ except Exception as exc:
+ log.error("snapshot_failed", error=str(exc))
+
+
async def run() -> None:
config = PortfolioConfig()
log = setup_logging("portfolio-manager", config.log_level, config.log_format)
@@ -31,6 +68,13 @@ async def run() -> None:
await health.start()
metrics.service_up.labels(service="portfolio-manager").set(1)
+ db = Database(config.database_url)
+ await db.connect()
+
+ snapshot_task = asyncio.create_task(
+ snapshot_loop(db, tracker, notifier, config.snapshot_interval_hours, log)
+ )
+
last_id = "$"
log.info("service_started", stream=ORDERS_STREAM)
@@ -67,9 +111,11 @@ async def run() -> None:
await notifier.send_error(str(exc), "portfolio-manager")
raise
finally:
+ snapshot_task.cancel()
metrics.service_up.labels(service="portfolio-manager").set(0)
await notifier.close()
await broker.close()
+ await db.close()
def main() -> None: