1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
|
"""Portfolio Manager Service entry point."""
import asyncio
from decimal import Decimal
from shared.broker import RedisBroker
from shared.db import Database
from shared.events import Event, OrderEvent
from shared.healthcheck import HealthCheckServer
from shared.logging import setup_logging
from shared.metrics import ServiceMetrics
from shared.notifier import TelegramNotifier
from portfolio_manager.config import PortfolioConfig
from portfolio_manager.portfolio import PortfolioTracker
# Stream name passed to broker.read() to receive order events.
ORDERS_STREAM = "orders"
# Health check port: base (HEALTH_PORT, default 8080) + offset
# data-collector: +0 (8080), strategy-engine: +1 (8081)
# order-executor: +2 (8082), portfolio-manager: +3 (8083)
# This offset is added to config.health_port when the health check server is created.
HEALTH_PORT_OFFSET = 3
async def save_snapshot(
    db: Database,
    tracker: PortfolioTracker,
    notifier: TelegramNotifier,
    log,
) -> None:
    """Persist a portfolio snapshot, then send the Telegram summary.

    Totals are computed over every position returned by
    ``tracker.get_all_positions()``: total value is the sum of
    ``quantity * current_price`` and unrealized PnL is the sum of each
    position's ``unrealized_pnl``. Realized PnL is not tracked yet and is
    always stored as zero.

    Args:
        db: Database handle; ``insert_portfolio_snapshot`` is awaited on it.
        tracker: Source of the current positions.
        notifier: Notifier; ``send_daily_summary`` is awaited on it.
        log: Logger accepting an event name plus keyword fields.

    Raises:
        Exception: Anything raised by the database insert or the notifier
            call propagates unchanged. If the insert fails, no summary is
            sent and nothing is logged here.
    """
    positions = tracker.get_all_positions()
    if positions:
        total_value = sum(p.quantity * p.current_price for p in positions)
        unrealized = sum(p.unrealized_pnl for p in positions)
    else:
        # sum() over an empty sequence returns the int 0. Use an explicit
        # Decimal zero so an empty portfolio is stored and reported with the
        # same numeric type as realized_pnl below.
        total_value = unrealized = Decimal("0")

    await db.insert_portfolio_snapshot(
        total_value=total_value,
        realized_pnl=Decimal("0"),  # TODO: track realized PnL
        unrealized_pnl=unrealized,
    )
    await notifier.send_daily_summary(positions, total_value, unrealized)
    log.info("snapshot_saved", total_value=str(total_value), positions=len(positions))
async def snapshot_loop(
    db: Database,
    tracker: PortfolioTracker,
    notifier: TelegramNotifier,
    interval_hours: int,
    log,
) -> None:
    """Call ``save_snapshot`` forever, sleeping ``interval_hours`` between runs.

    The first snapshot is attempted immediately; the sleep happens after each
    attempt. Any ``Exception`` raised by a snapshot is logged as
    ``snapshot_failed`` and the loop carries on, so this coroutine never
    returns normally. It ends only when something not derived from
    ``Exception`` propagates (e.g. task cancellation on Python 3.8+).

    Args:
        db: Forwarded to ``save_snapshot``.
        tracker: Forwarded to ``save_snapshot``.
        notifier: Forwarded to ``save_snapshot``.
        interval_hours: Pause between snapshot attempts, in hours.
        log: Logger; forwarded to ``save_snapshot`` and used for failures.
    """
    seconds_per_hour = 3600
    while True:
        try:
            await save_snapshot(db, tracker, notifier, log)
        except Exception as err:
            # Best effort: one failed snapshot must not stop future ones.
            log.error("snapshot_failed", error=str(err))
        await asyncio.sleep(interval_hours * seconds_per_hour)
async def run() -> None:
    """Start the portfolio manager and consume order events until a fatal error.

    Startup builds the config, logger, metrics, notifier, broker and tracker,
    starts the health check server (with a ``redis`` check bound to
    ``broker.ping``), connects to the database and launches the background
    snapshot task. The main loop then reads ``ORDERS_STREAM`` and applies
    every ``OrderEvent`` to the tracker.

    A failure while handling a single message is logged and counted, and the
    loop continues. An ``Exception`` escaping the read loop itself is logged,
    reported through the notifier and re-raised. In all cases the snapshot
    task is cancelled and awaited, the ``service_up`` gauge is set to 0, and
    the notifier, broker and database are closed.

    Raises:
        Exception: Any exception raised during startup, or re-raised from the
            read loop after it has been logged and reported.
    """
    config = PortfolioConfig()
    log = setup_logging("portfolio-manager", config.log_level, config.log_format)
    metrics = ServiceMetrics("portfolio_manager")
    notifier = TelegramNotifier(
        bot_token=config.telegram_bot_token, chat_id=config.telegram_chat_id
    )
    broker = RedisBroker(config.redis_url)
    tracker = PortfolioTracker()

    health = HealthCheckServer(
        "portfolio-manager",
        port=config.health_port + HEALTH_PORT_OFFSET,
        auth_token=config.metrics_auth_token,
    )
    health.register_check("redis", broker.ping)
    await health.start()
    metrics.service_up.labels(service="portfolio-manager").set(1)

    db = Database(config.database_url)
    await db.connect()

    snapshot_task = asyncio.create_task(
        snapshot_loop(db, tracker, notifier, config.snapshot_interval_hours, log)
    )

    # NOTE(review): last_id is never advanced. broker.read returns parsed
    # payloads rather than message ids, so every read asks for "$" again.
    # Presumably "$" means "only messages newer than this call", in which case
    # messages arriving between two reads may be skipped. Confirm against the
    # RedisBroker implementation.
    last_id = "$"
    log.info("service_started", stream=ORDERS_STREAM)

    try:
        while True:
            messages = await broker.read(ORDERS_STREAM, last_id=last_id, block=1000)
            for msg in messages:
                try:
                    event = Event.from_dict(msg)
                    if isinstance(event, OrderEvent):
                        order = event.data
                        tracker.apply_order(order)
                        log.info(
                            "order_applied",
                            symbol=order.symbol,
                            side=str(order.side),
                            quantity=str(order.quantity),
                            price=str(order.price),
                        )
                        positions = tracker.get_all_positions()
                        log.info("positions_updated", count=len(positions))
                        metrics.events_processed.labels(
                            service="portfolio-manager", event_type="order"
                        ).inc()
                except Exception as exc:
                    # One bad message must not take the service down.
                    log.exception("message_processing_failed", error=str(exc))
                    metrics.errors_total.labels(
                        service="portfolio-manager", error_type="processing"
                    ).inc()
    except Exception as exc:
        log.error("fatal_error", error=str(exc))
        await notifier.send_error(str(exc), "portfolio-manager")
        raise
    finally:
        snapshot_task.cancel()
        # cancel() only requests cancellation. Wait for the task to actually
        # finish before closing the database and notifier it uses;
        # return_exceptions=True absorbs the task's own CancelledError.
        await asyncio.gather(snapshot_task, return_exceptions=True)
        metrics.service_up.labels(service="portfolio-manager").set(0)
        await notifier.close()
        await broker.close()
        await db.close()
def main() -> None:
    """Run the service coroutine to completion via ``asyncio.run``."""
    service = run()
    asyncio.run(service)
# Start the service only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|