diff options
Diffstat (limited to 'services/portfolio-manager/src')
| -rw-r--r-- | services/portfolio-manager/src/portfolio_manager/main.py | 41 |
1 files changed, 35 insertions, 6 deletions
diff --git a/services/portfolio-manager/src/portfolio_manager/main.py b/services/portfolio-manager/src/portfolio_manager/main.py index a7f1a14..87e4c64 100644 --- a/services/portfolio-manager/src/portfolio_manager/main.py +++ b/services/portfolio-manager/src/portfolio_manager/main.py @@ -84,13 +84,43 @@ async def run() -> None: snapshot_loop(db, tracker, notifier, config.snapshot_interval_hours, log) ) - last_id = "$" + GROUP = "portfolio-manager" + CONSUMER = "portfolio-1" log.info("service_started", stream=ORDERS_STREAM) + await broker.ensure_group(ORDERS_STREAM, GROUP) + + # Process pending messages first (from previous crash) + pending = await broker.read_pending(ORDERS_STREAM, GROUP, CONSUMER) + for msg_id, msg in pending: + try: + event = Event.from_dict(msg) + if isinstance(event, OrderEvent): + order = event.data + tracker.apply_order(order) + log.info( + "pending_order_applied", + symbol=order.symbol, + side=str(order.side), + quantity=str(order.quantity), + price=str(order.price), + ) + metrics.events_processed.labels( + service="portfolio-manager", event_type="order" + ).inc() + await broker.ack(ORDERS_STREAM, GROUP, msg_id) + except Exception as exc: + log.error("pending_process_failed", error=str(exc), msg_id=msg_id) + metrics.errors_total.labels( + service="portfolio-manager", error_type="processing" + ).inc() + try: while True: - messages = await broker.read(ORDERS_STREAM, last_id=last_id, block=1000) - for msg in messages: + messages = await broker.read_group( + ORDERS_STREAM, GROUP, CONSUMER, count=10, block=1000 + ) + for msg_id, msg in messages: try: event = Event.from_dict(msg) if isinstance(event, OrderEvent): @@ -108,13 +138,12 @@ async def run() -> None: metrics.events_processed.labels( service="portfolio-manager", event_type="order" ).inc() + await broker.ack(ORDERS_STREAM, GROUP, msg_id) except Exception as exc: - log.exception("message_processing_failed", error=str(exc)) + log.exception("message_processing_failed", error=str(exc), msg_id=msg_id) metrics.errors_total.labels( service="portfolio-manager", error_type="processing" ).inc() - # Update last_id to the latest processed message id if broker returns ids - # Since broker.read returns parsed payloads (not ids), we use "$" to get new msgs except Exception as exc: log.error("fatal_error", error=str(exc)) await notifier.send_error(str(exc), "portfolio-manager") |
