1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
|
"""Portfolio Manager Service entry point."""
import asyncio
from shared.broker import RedisBroker
from shared.events import Event, OrderEvent
from shared.healthcheck import HealthCheckServer
from shared.logging import setup_logging
from shared.metrics import ServiceMetrics
from shared.notifier import TelegramNotifier
from portfolio_manager.config import PortfolioConfig
from portfolio_manager.portfolio import PortfolioTracker
# Name of the broker stream that run() polls for order events.
ORDERS_STREAM = "orders"
async def run() -> None:
    """Run the portfolio manager service loop.

    Builds the service dependencies (config, logger, metrics, notifier,
    broker, tracker, health-check server) and then polls ``ORDERS_STREAM``
    in an endless loop. Every message is decoded with ``Event.from_dict``;
    when the result is an ``OrderEvent`` its payload is applied to the
    ``PortfolioTracker`` and the processed-events counter is incremented.

    A failure while handling a single message is logged and counted, and
    the loop moves on to the next message. Any other exception escaping
    the loop is logged, reported through the notifier, and re-raised.

    Raises:
        Exception: whatever error escaped the read loop, re-raised after
            logging and a best-effort notification.
    """
    config = PortfolioConfig()
    log = setup_logging("portfolio-manager", config.log_level, config.log_format)
    metrics = ServiceMetrics("portfolio_manager")
    notifier = TelegramNotifier(
        bot_token=config.telegram_bot_token, chat_id=config.telegram_chat_id
    )
    broker = RedisBroker(config.redis_url)
    tracker = PortfolioTracker()

    # The health endpoint listens on the configured base port plus 3 and
    # reports the result of broker.ping under the "redis" check.
    health = HealthCheckServer("portfolio-manager", port=config.health_port + 3)
    health.register_check("redis", broker.ping)
    await health.start()

    metrics.service_up.labels(service="portfolio-manager").set(1)

    # NOTE(review): last_id is never advanced, so every read is issued with
    # "$". Presumably the broker forwards this to Redis XREAD, where "$"
    # means "only entries added after this call" -- in that case orders
    # published while a batch is being processed would be skipped. Advancing
    # the cursor needs broker.read to expose message ids; confirm against
    # RedisBroker.read.
    last_id = "$"
    log.info("service_started", stream=ORDERS_STREAM)

    try:
        while True:
            messages = await broker.read(ORDERS_STREAM, last_id=last_id, block=1000)
            for msg in messages:
                # Isolate each message so one bad payload cannot stop the loop.
                try:
                    event = Event.from_dict(msg)
                    if isinstance(event, OrderEvent):
                        order = event.data
                        tracker.apply_order(order)
                        log.info(
                            "order_applied",
                            symbol=order.symbol,
                            side=str(order.side),
                            quantity=str(order.quantity),
                            price=str(order.price),
                        )
                        positions = tracker.get_all_positions()
                        log.info("positions_updated", count=len(positions))
                        metrics.events_processed.labels(
                            service="portfolio-manager", event_type="order"
                        ).inc()
                except Exception as exc:
                    log.exception("message_processing_failed", error=str(exc))
                    metrics.errors_total.labels(
                        service="portfolio-manager", error_type="processing"
                    ).inc()
    except Exception as exc:
        log.error("fatal_error", error=str(exc))
        # Notification is best-effort: a failure to send it must not replace
        # the original error that is re-raised below.
        try:
            await notifier.send_error(str(exc), "portfolio-manager")
        except Exception as notify_exc:
            log.error("fatal_error_notification_failed", error=str(notify_exc))
        raise
    finally:
        metrics.service_up.labels(service="portfolio-manager").set(0)
        # Nested finally so the broker is closed even if closing the
        # notifier raises.
        try:
            await notifier.close()
        finally:
            await broker.close()
def main() -> None:
    """Synchronous entry point: drive ``run()`` to completion with asyncio."""
    service = run()
    asyncio.run(service)
# Start the service only when this file is executed as a script, so that
# importing the module has no side effects.
if __name__ == "__main__":
    main()
|