summaryrefslogtreecommitdiff
path: root/services/portfolio-manager/src/portfolio_manager/main.py
blob: d60e6c91a4bc54e12e13016471743b64feb12ae7 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
"""Portfolio Manager Service entry point."""

import asyncio
from decimal import Decimal

from shared.broker import RedisBroker
from shared.db import Database
from shared.events import Event, OrderEvent
from shared.healthcheck import HealthCheckServer
from shared.logging import setup_logging
from shared.metrics import ServiceMetrics
from shared.notifier import TelegramNotifier

from portfolio_manager.config import PortfolioConfig
from portfolio_manager.portfolio import PortfolioTracker

# Name of the broker stream this service reads order events from.
ORDERS_STREAM = "orders"


async def save_snapshot(
    db: Database,
    tracker: PortfolioTracker,
    notifier: TelegramNotifier,
    log,
) -> None:
    """Persist a portfolio snapshot, then send the Telegram summary.

    Total value is the sum of ``quantity * current_price`` over every position
    returned by the tracker; unrealized PnL is the sum of each position's
    ``unrealized_pnl``. Realized PnL is not tracked yet and is always stored
    as zero.

    The ``snapshot_saved`` entry is logged as soon as the database insert
    succeeds, so a later notification failure cannot be mistaken for a
    snapshot that was never written.

    Args:
        db: Database used to store the snapshot row.
        tracker: Source of the current positions.
        notifier: Telegram notifier that receives the summary.
        log: Logger accepting an event name plus keyword fields.

    Raises:
        Exception: Anything raised by the database insert or by the notifier
            propagates to the caller; nothing is caught here.
    """
    positions = tracker.get_all_positions()
    total_value = sum(p.quantity * p.current_price for p in positions)
    unrealized = sum(p.unrealized_pnl for p in positions)
    await db.insert_portfolio_snapshot(
        total_value=total_value,
        realized_pnl=Decimal("0"),  # TODO: track realized PnL
        unrealized_pnl=unrealized,
    )
    # Record success before notifying: the row is already persisted at this
    # point, whether or not the Telegram call below succeeds.
    log.info("snapshot_saved", total_value=str(total_value), positions=len(positions))
    await notifier.send_daily_summary(positions, total_value, unrealized)


async def snapshot_loop(
    db: Database,
    tracker: PortfolioTracker,
    notifier: TelegramNotifier,
    interval_hours: int,
    log,
) -> None:
    """Run ``save_snapshot`` once per interval until the task is cancelled.

    The loop sleeps first, so the first snapshot is taken one full interval
    after start-up. Any ``Exception`` raised by a snapshot attempt is logged
    as ``snapshot_failed`` and the loop carries on with the next interval.

    Args:
        db: Database passed through to ``save_snapshot``.
        tracker: Position tracker passed through to ``save_snapshot``.
        notifier: Telegram notifier passed through to ``save_snapshot``.
        interval_hours: Pause between snapshot attempts, in hours.
        log: Logger accepting an event name plus keyword fields.
    """
    pause_seconds = interval_hours * 3600
    while True:
        await asyncio.sleep(pause_seconds)
        try:
            await save_snapshot(db, tracker, notifier, log)
        except Exception as err:
            # One failed snapshot must not stop the periodic schedule.
            log.error("snapshot_failed", error=str(err))


async def _consume_orders(
    broker: RedisBroker,
    tracker: PortfolioTracker,
    metrics: ServiceMetrics,
    log,
) -> None:
    """Read the orders stream forever and apply each order event to ``tracker``.

    A failure while decoding or applying a single message is logged, counted
    in ``metrics.errors_total`` and skipped. An exception raised by
    ``broker.read`` itself propagates to the caller.
    """
    # The broker returns parsed payloads rather than stream ids, so the cursor
    # cannot be advanced and every read asks for "$" again.
    # NOTE(review): if RedisBroker.read maps onto Redis XREAD, "$" only yields
    # entries added after the call starts, so orders published between two
    # reads could be skipped -- confirm against the broker implementation.
    last_id = "$"
    while True:
        messages = await broker.read(ORDERS_STREAM, last_id=last_id, block=1000)
        for msg in messages:
            try:
                event = Event.from_dict(msg)
                if not isinstance(event, OrderEvent):
                    continue  # only order events are handled here
                order = event.data
                tracker.apply_order(order)
                log.info(
                    "order_applied",
                    symbol=order.symbol,
                    side=str(order.side),
                    quantity=str(order.quantity),
                    price=str(order.price),
                )
                positions = tracker.get_all_positions()
                log.info("positions_updated", count=len(positions))
                metrics.events_processed.labels(
                    service="portfolio-manager", event_type="order"
                ).inc()
            except Exception as exc:
                # A bad message must not take the whole consumer down.
                log.exception("message_processing_failed", error=str(exc))
                metrics.errors_total.labels(
                    service="portfolio-manager", error_type="processing"
                ).inc()


async def run() -> None:
    """Wire up the service, consume order events, and clean up on exit.

    Start-up builds the config, logger, metrics, notifier, broker, tracker and
    health-check server, connects the database, and starts the periodic
    snapshot task. The coroutine then consumes the orders stream until an
    error or cancellation ends it.

    Start-up runs inside the same ``try``/``finally`` as the consumer, so a
    failure in ``health.start()`` or ``db.connect()`` still closes the
    notifier and broker. On shutdown the snapshot task is cancelled *and
    awaited* before the database is closed, so it cannot still be unwinding
    against a closing connection.

    Raises:
        Exception: A fatal error from the consumer is logged, reported through
            the notifier and re-raised. Start-up errors are re-raised after
            cleanup without a notification.
    """
    config = PortfolioConfig()
    log = setup_logging("portfolio-manager", config.log_level, config.log_format)
    metrics = ServiceMetrics("portfolio_manager")
    notifier = TelegramNotifier(
        bot_token=config.telegram_bot_token, chat_id=config.telegram_chat_id
    )

    broker = RedisBroker(config.redis_url)
    tracker = PortfolioTracker()

    # NOTE(review): the "+ 3" is presumably a per-service port offset -- confirm.
    health = HealthCheckServer(
        "portfolio-manager", port=config.health_port + 3, auth_token=config.metrics_auth_token
    )
    health.register_check("redis", broker.ping)

    # Both stay None until the matching start-up step has succeeded, so the
    # cleanup below only touches what actually exists.
    db = None
    snapshot_task = None
    try:
        await health.start()
        metrics.service_up.labels(service="portfolio-manager").set(1)

        database = Database(config.database_url)
        await database.connect()
        db = database

        snapshot_task = asyncio.create_task(
            snapshot_loop(db, tracker, notifier, config.snapshot_interval_hours, log)
        )

        log.info("service_started", stream=ORDERS_STREAM)

        try:
            await _consume_orders(broker, tracker, metrics, log)
        except Exception as exc:
            log.error("fatal_error", error=str(exc))
            await notifier.send_error(str(exc), "portfolio-manager")
            raise
    finally:
        if snapshot_task is not None:
            snapshot_task.cancel()
            # Wait for the task to finish unwinding; return_exceptions=True
            # turns its CancelledError into a result instead of raising here.
            await asyncio.gather(snapshot_task, return_exceptions=True)
        metrics.service_up.labels(service="portfolio-manager").set(0)
        # NOTE(review): the health server is never stopped here; its shutdown
        # API is not visible in this file.
        await notifier.close()
        await broker.close()
        if db is not None:
            await db.close()


def main() -> None:
    """Synchronous entry point: drive the ``run`` coroutine to completion."""
    service = run()
    asyncio.run(service)


# Start the service only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()