summaryrefslogtreecommitdiff
path: root/services/portfolio-manager
diff options
context:
space:
mode:
Diffstat (limited to 'services/portfolio-manager')
-rw-r--r--services/portfolio-manager/src/portfolio_manager/main.py46
-rw-r--r--services/portfolio-manager/tests/test_snapshot.py67
2 files changed, 113 insertions, 0 deletions
diff --git a/services/portfolio-manager/src/portfolio_manager/main.py b/services/portfolio-manager/src/portfolio_manager/main.py
index a1c73be..02df5d2 100644
--- a/services/portfolio-manager/src/portfolio_manager/main.py
+++ b/services/portfolio-manager/src/portfolio_manager/main.py
@@ -1,8 +1,10 @@
"""Portfolio Manager Service entry point."""
import asyncio
+from decimal import Decimal
from shared.broker import RedisBroker
+from shared.db import Database
from shared.events import Event, OrderEvent
from shared.healthcheck import HealthCheckServer
from shared.logging import setup_logging
@@ -15,6 +17,41 @@ from portfolio_manager.portfolio import PortfolioTracker
ORDERS_STREAM = "orders"
+async def save_snapshot(
+ db: Database,
+ tracker: PortfolioTracker,
+ notifier: TelegramNotifier,
+ log,
+) -> None:
+ """Compute and save a portfolio snapshot, then send a daily Telegram summary."""
+ positions = tracker.get_all_positions()
+ total_value = sum(p.quantity * p.current_price for p in positions)
+ unrealized = sum(p.unrealized_pnl for p in positions)
+ await db.insert_portfolio_snapshot(
+ total_value=total_value,
+ realized_pnl=Decimal("0"), # TODO: track realized PnL
+ unrealized_pnl=unrealized,
+ )
+ await notifier.send_daily_summary(positions, total_value, unrealized)
+ log.info("snapshot_saved", total_value=str(total_value), positions=len(positions))
+
+
+async def snapshot_loop(
+ db: Database,
+ tracker: PortfolioTracker,
+ notifier: TelegramNotifier,
+ interval_hours: int,
+ log,
+) -> None:
+ """Periodically save portfolio snapshots and send daily summary."""
+ while True:
+ await asyncio.sleep(interval_hours * 3600)
+ try:
+ await save_snapshot(db, tracker, notifier, log)
+ except Exception as exc:
+ log.error("snapshot_failed", error=str(exc))
+
+
async def run() -> None:
config = PortfolioConfig()
log = setup_logging("portfolio-manager", config.log_level, config.log_format)
@@ -31,6 +68,13 @@ async def run() -> None:
await health.start()
metrics.service_up.labels(service="portfolio-manager").set(1)
+ db = Database(config.database_url)
+ await db.connect()
+
+ snapshot_task = asyncio.create_task(
+ snapshot_loop(db, tracker, notifier, config.snapshot_interval_hours, log)
+ )
+
last_id = "$"
log.info("service_started", stream=ORDERS_STREAM)
@@ -67,9 +111,11 @@ async def run() -> None:
await notifier.send_error(str(exc), "portfolio-manager")
raise
finally:
+ snapshot_task.cancel()
metrics.service_up.labels(service="portfolio-manager").set(0)
await notifier.close()
await broker.close()
+ await db.close()
def main() -> None:
diff --git a/services/portfolio-manager/tests/test_snapshot.py b/services/portfolio-manager/tests/test_snapshot.py
new file mode 100644
index 0000000..89d23d7
--- /dev/null
+++ b/services/portfolio-manager/tests/test_snapshot.py
@@ -0,0 +1,67 @@
+"""Tests for save_snapshot in portfolio-manager."""
+
+import pytest
+from decimal import Decimal
+from unittest.mock import AsyncMock, MagicMock
+
+from shared.models import Position
+
+
+class TestSaveSnapshot:
+ @pytest.mark.asyncio
+ async def test_save_snapshot_saves_to_db_and_notifies(self):
+ from portfolio_manager.main import save_snapshot
+
+ pos = Position(
+ symbol="BTCUSDT",
+ quantity=Decimal("0.5"),
+ avg_entry_price=Decimal("50000"),
+ current_price=Decimal("52000"),
+ )
+
+ tracker = MagicMock()
+ tracker.get_all_positions.return_value = [pos]
+
+ db = AsyncMock()
+ notifier = AsyncMock()
+ log = MagicMock()
+
+ await save_snapshot(db, tracker, notifier, log)
+
+ expected_total = Decimal("0.5") * Decimal("52000") # 26000
+ expected_unrealized = Decimal("0.5") * (Decimal("52000") - Decimal("50000")) # 1000
+
+ db.insert_portfolio_snapshot.assert_awaited_once_with(
+ total_value=expected_total,
+ realized_pnl=Decimal("0"),
+ unrealized_pnl=expected_unrealized,
+ )
+ notifier.send_daily_summary.assert_awaited_once_with(
+ [pos], expected_total, expected_unrealized
+ )
+ log.info.assert_called_once_with(
+ "snapshot_saved",
+ total_value=str(expected_total),
+ positions=1,
+ )
+
+ @pytest.mark.asyncio
+ async def test_save_snapshot_empty_positions(self):
+ from portfolio_manager.main import save_snapshot
+
+ tracker = MagicMock()
+ tracker.get_all_positions.return_value = []
+
+ db = AsyncMock()
+ notifier = AsyncMock()
+ log = MagicMock()
+
+ await save_snapshot(db, tracker, notifier, log)
+
+ db.insert_portfolio_snapshot.assert_awaited_once_with(
+ total_value=Decimal("0"),
+ realized_pnl=Decimal("0"),
+ unrealized_pnl=Decimal("0"),
+ )
+ notifier.send_daily_summary.assert_awaited_once()
+ log.info.assert_called_once()