summaryrefslogtreecommitdiff
path: root/services/portfolio-manager/src/portfolio_manager/main.py
blob: a1c73be23c4e0edeecf2918ac5096fda5c907fab (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
"""Portfolio Manager Service entry point."""

import asyncio

from shared.broker import RedisBroker
from shared.events import Event, OrderEvent
from shared.healthcheck import HealthCheckServer
from shared.logging import setup_logging
from shared.metrics import ServiceMetrics
from shared.notifier import TelegramNotifier

from portfolio_manager.config import PortfolioConfig
from portfolio_manager.portfolio import PortfolioTracker

ORDERS_STREAM = "orders"


async def run() -> None:
    config = PortfolioConfig()
    log = setup_logging("portfolio-manager", config.log_level, config.log_format)
    metrics = ServiceMetrics("portfolio_manager")
    notifier = TelegramNotifier(
        bot_token=config.telegram_bot_token, chat_id=config.telegram_chat_id
    )

    broker = RedisBroker(config.redis_url)
    tracker = PortfolioTracker()

    health = HealthCheckServer("portfolio-manager", port=config.health_port + 3)
    health.register_check("redis", broker.ping)
    await health.start()
    metrics.service_up.labels(service="portfolio-manager").set(1)

    last_id = "$"
    log.info("service_started", stream=ORDERS_STREAM)

    try:
        while True:
            messages = await broker.read(ORDERS_STREAM, last_id=last_id, block=1000)
            for msg in messages:
                try:
                    event = Event.from_dict(msg)
                    if isinstance(event, OrderEvent):
                        order = event.data
                        tracker.apply_order(order)
                        log.info(
                            "order_applied",
                            symbol=order.symbol,
                            side=str(order.side),
                            quantity=str(order.quantity),
                            price=str(order.price),
                        )
                        positions = tracker.get_all_positions()
                        log.info("positions_updated", count=len(positions))
                        metrics.events_processed.labels(
                            service="portfolio-manager", event_type="order"
                        ).inc()
                except Exception as exc:
                    log.exception("message_processing_failed", error=str(exc))
                    metrics.errors_total.labels(
                        service="portfolio-manager", error_type="processing"
                    ).inc()
            # Update last_id to the latest processed message id if broker returns ids
            # Since broker.read returns parsed payloads (not ids), we use "$" to get new msgs
    except Exception as exc:
        log.error("fatal_error", error=str(exc))
        await notifier.send_error(str(exc), "portfolio-manager")
        raise
    finally:
        metrics.service_up.labels(service="portfolio-manager").set(0)
        await notifier.close()
        await broker.close()


def main() -> None:
    asyncio.run(run())


if __name__ == "__main__":
    main()