1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
|
"""Portfolio Manager Service entry point."""
import asyncio
import sqlalchemy.exc
from shared.broker import RedisBroker
from shared.db import Database
from shared.events import Event, OrderEvent
from shared.healthcheck import HealthCheckServer
from shared.logging import setup_logging
from shared.metrics import ServiceMetrics
from shared.notifier import TelegramNotifier
from shared.shutdown import GracefulShutdown
from portfolio_manager.config import PortfolioConfig
from portfolio_manager.portfolio import PortfolioTracker
# Name of the broker stream this service consumes order events from.
ORDERS_STREAM = "orders"
# Health check port = shared base port (HEALTH_PORT, default 8080) + a
# per-service offset, so every service gets its own port:
#   data-collector:  +0 (8080), strategy-engine:   +1 (8081)
#   order-executor:  +2 (8082), portfolio-manager: +3 (8083)
HEALTH_PORT_OFFSET = 3
async def save_snapshot(
    db: Database,
    tracker: PortfolioTracker,
    notifier: TelegramNotifier,
    log,
) -> None:
    """Persist the current portfolio valuation and push a summary to Telegram.

    Args:
        db: Database handle; receives one ``insert_portfolio_snapshot`` call.
        tracker: Source of the open positions and of the realized PnL figure.
        notifier: Telegram notifier; receives one ``send_daily_summary`` call
            after the snapshot has been written.
        log: Logger that accepts an event name plus keyword fields.
    """
    holdings = tracker.get_all_positions()

    # Mark-to-market value of every open position, then the portfolio total.
    market_values = [holding.quantity * holding.current_price for holding in holdings]
    portfolio_value = sum(market_values)

    # PnL that has not been locked in yet, summed across all positions.
    open_pnl = sum([holding.unrealized_pnl for holding in holdings])

    booked_pnl = tracker.realized_pnl
    await db.insert_portfolio_snapshot(
        total_value=portfolio_value,
        realized_pnl=booked_pnl,
        unrealized_pnl=open_pnl,
    )

    # The summary is sent only once the snapshot row has been stored.
    await notifier.send_daily_summary(holdings, portfolio_value, open_pnl)

    position_count = len(holdings)
    log.info("snapshot_saved", total_value=str(portfolio_value), positions=position_count)
async def snapshot_loop(
    db: Database,
    tracker: PortfolioTracker,
    notifier: TelegramNotifier,
    interval_hours: int,
    log,
) -> None:
    """Run ``save_snapshot`` forever, pausing ``interval_hours`` between runs.

    A failed snapshot never ends the loop: the failure is logged and the next
    attempt happens after the usual pause. The coroutine has no exit of its
    own; it runs until the task it lives in is cancelled.

    Args:
        db: Database handle forwarded to ``save_snapshot``.
        tracker: Portfolio tracker forwarded to ``save_snapshot``.
        notifier: Telegram notifier forwarded to ``save_snapshot``.
        interval_hours: Pause between two snapshot attempts, in hours.
        log: Logger that accepts an event name plus keyword fields.
    """
    # Connectivity-style failures and bad-data failures are logged as warnings;
    # anything else is logged as an error together with its traceback.
    connectivity_errors = (sqlalchemy.exc.OperationalError, ConnectionError, TimeoutError)
    data_errors = (ValueError, KeyError, TypeError)

    while True:
        try:
            await save_snapshot(db, tracker, notifier, log)
        except connectivity_errors as err:
            log.warning("snapshot_db_error", error=str(err))
        except data_errors as err:
            log.warning("snapshot_data_error", error=str(err))
        except Exception as err:
            log.error("snapshot_failed", error=str(err), exc_info=True)

        pause_seconds = interval_hours * 3600
        await asyncio.sleep(pause_seconds)
async def _process_order_message(
    broker: RedisBroker,
    tracker: PortfolioTracker,
    metrics: ServiceMetrics,
    log,
    group: str,
    msg_id,
    msg,
    *,
    applied_event: str,
    parse_error_event: str,
    failure_event: str,
    log_position_count: bool,
) -> None:
    """Apply one message from the orders stream to the tracker and acknowledge it.

    Shared by the start-up recovery pass and the live consume loop, which only
    differ in the log event names they emit and in whether the position count
    is logged afterwards.

    Args:
        broker: Broker used to acknowledge the message.
        tracker: Portfolio tracker that the order is applied to.
        metrics: Service metrics (``events_processed`` / ``errors_total``).
        log: Logger that accepts an event name plus keyword fields.
        group: Consumer group the message is acknowledged for.
        msg_id: Stream id of the message.
        msg: Raw message payload, parsed with ``Event.from_dict``.
        applied_event: Log event name emitted after an order was applied.
        parse_error_event: Log event name for ValueError/KeyError/TypeError.
        failure_event: Log event name for any other exception.
        log_position_count: When true, also log ``positions_updated`` with the
            number of tracked positions after applying the order.
    """
    try:
        event = Event.from_dict(msg)
        if isinstance(event, OrderEvent):
            order = event.data
            tracker.apply_order(order)
            log.info(
                applied_event,
                symbol=order.symbol,
                side=str(order.side),
                quantity=str(order.quantity),
                price=str(order.price),
            )
            if log_position_count:
                positions = tracker.get_all_positions()
                log.info("positions_updated", count=len(positions))
            metrics.events_processed.labels(
                service="portfolio-manager", event_type="order"
            ).inc()
        # NOTE(review): events that are not OrderEvent are acknowledged without
        # being applied -- confirm this matches the intended nesting.
        await broker.ack(ORDERS_STREAM, group, msg_id)
    except (ValueError, KeyError, TypeError) as exc:
        # A malformed message is acknowledged anyway so it is not delivered again.
        log.warning(parse_error_event, error=str(exc), msg_id=msg_id)
        await broker.ack(ORDERS_STREAM, group, msg_id)
        metrics.errors_total.labels(service="portfolio-manager", error_type="validation").inc()
    except Exception as exc:
        # Deliberately not acknowledged here; presumably the message then stays
        # pending and is retried by the recovery pass on the next start.
        log.error(failure_event, error=str(exc), msg_id=msg_id, exc_info=True)
        metrics.errors_total.labels(service="portfolio-manager", error_type="processing").inc()


async def run() -> None:
    """Run the portfolio-manager service until shutdown is requested.

    Steps, in order:

    1. Build config, logging, metrics, notifier, broker and tracker.
    2. Start the health check server and mark the service as up.
    3. Connect to the database and start the background snapshot loop.
    4. Ensure the consumer group exists and process messages that are still
       pending for this consumer.
    5. Consume the orders stream until ``shutdown.is_shutting_down`` is true.

    From step 4 onwards, any failure is logged as ``fatal_error``, reported
    through the notifier and re-raised. The cleanup in ``finally`` runs on
    success and on failure alike.

    Raises:
        Exception: Whatever escaped the recovery pass or the consume loop,
            re-raised after it has been logged and reported.
    """
    config = PortfolioConfig()
    log = setup_logging("portfolio-manager", config.log_level, config.log_format)
    metrics = ServiceMetrics("portfolio_manager")

    notifier = TelegramNotifier(
        bot_token=config.telegram_bot_token.get_secret_value(), chat_id=config.telegram_chat_id
    )
    broker = RedisBroker(config.redis_url.get_secret_value())
    tracker = PortfolioTracker()

    health = HealthCheckServer(
        "portfolio-manager",
        port=config.health_port + HEALTH_PORT_OFFSET,
        auth_token=config.metrics_auth_token,
    )
    health.register_check("redis", broker.ping)
    await health.start()
    metrics.service_up.labels(service="portfolio-manager").set(1)

    db = Database(config.database_url.get_secret_value())
    await db.connect()

    snapshot_task = asyncio.create_task(
        snapshot_loop(db, tracker, notifier, config.snapshot_interval_hours, log)
    )

    shutdown = GracefulShutdown()
    shutdown.install_handlers()

    group = "portfolio-manager"
    consumer = "portfolio-1"

    log.info("service_started", stream=ORDERS_STREAM)

    try:
        await broker.ensure_group(ORDERS_STREAM, group)

        # Process pending messages first (from previous crash).
        pending = await broker.read_pending(ORDERS_STREAM, group, consumer)
        for msg_id, msg in pending:
            await _process_order_message(
                broker,
                tracker,
                metrics,
                log,
                group,
                msg_id,
                msg,
                applied_event="pending_order_applied",
                parse_error_event="pending_parse_error",
                failure_event="pending_process_failed",
                log_position_count=False,
            )

        # The shutdown flag is re-checked after every read returns; block=1000
        # is presumably a timeout in milliseconds -- confirm against RedisBroker.
        while not shutdown.is_shutting_down:
            messages = await broker.read_group(ORDERS_STREAM, group, consumer, count=10, block=1000)
            for msg_id, msg in messages:
                await _process_order_message(
                    broker,
                    tracker,
                    metrics,
                    log,
                    group,
                    msg_id,
                    msg,
                    applied_event="order_applied",
                    parse_error_event="message_parse_error",
                    failure_event="message_processing_failed",
                    log_position_count=True,
                )
    except Exception as exc:
        log.error("fatal_error", error=str(exc), exc_info=True)
        await notifier.send_error(str(exc), "portfolio-manager")
        raise
    finally:
        snapshot_task.cancel()
        # Wait until the snapshot task has really finished before closing the
        # database it writes to. return_exceptions=True turns the task's own
        # CancelledError into a plain result instead of raising it here.
        await asyncio.gather(snapshot_task, return_exceptions=True)
        metrics.service_up.labels(service="portfolio-manager").set(0)
        await notifier.close()
        await broker.close()
        await db.close()
def main() -> None:
    """Synchronous entry point: execute the async ``run`` coroutine to completion."""
    service = run()
    asyncio.run(service)
# Start the service only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|