From 13a9b2c80bb3eb1353cf2d49bdbf7d0dbd858ccc Mon Sep 17 00:00:00 2001 From: TheSiahxyz <164138827+TheSiahxyz@users.noreply.github.com> Date: Wed, 1 Apr 2026 17:56:50 +0900 Subject: feat(broker): add Redis consumer groups for reliable message processing --- services/order-executor/src/order_executor/main.py | 40 ++++++++++++++++----- .../src/portfolio_manager/main.py | 41 ++++++++++++++++++---- 2 files changed, 66 insertions(+), 15 deletions(-) (limited to 'services') diff --git a/services/order-executor/src/order_executor/main.py b/services/order-executor/src/order_executor/main.py index 4a51d5d..930517e 100644 --- a/services/order-executor/src/order_executor/main.py +++ b/services/order-executor/src/order_executor/main.py @@ -62,7 +62,8 @@ async def run() -> None: dry_run=config.dry_run, ) - last_id = "$" + GROUP = "order-executor" + CONSUMER = "executor-1" stream = "signals" health = HealthCheckServer( @@ -76,10 +77,35 @@ async def run() -> None: log.info("service_started", stream=stream, dry_run=config.dry_run) + await broker.ensure_group(stream, GROUP) + + # Process pending messages first (from previous crash) + pending = await broker.read_pending(stream, GROUP, CONSUMER) + for msg_id, msg in pending: + try: + event = Event.from_dict(msg) + if event.type == EventType.SIGNAL: + signal = event.data + log.info( + "processing_pending_signal", signal_id=str(signal.id), symbol=signal.symbol + ) + await executor.execute(signal) + metrics.events_processed.labels( + service="order-executor", event_type="signal" + ).inc() + await broker.ack(stream, GROUP, msg_id) + except Exception as exc: + log.error("pending_process_failed", error=str(exc), msg_id=msg_id) + metrics.errors_total.labels( + service="order-executor", error_type="processing" + ).inc() + try: while True: - messages = await broker.read(stream, last_id=last_id, count=10, block=5000) - for msg in messages: + messages = await broker.read_group( + stream, GROUP, CONSUMER, count=10, block=5000 + ) + for msg_id, msg in messages: try: event = Event.from_dict(msg) if event.type == EventType.SIGNAL: @@ -91,16 +117,12 @@ async def run() -> None: metrics.events_processed.labels( service="order-executor", event_type="signal" ).inc() + await broker.ack(stream, GROUP, msg_id) except Exception as exc: - log.error("message_processing_failed", error=str(exc)) + log.error("message_processing_failed", error=str(exc), msg_id=msg_id) metrics.errors_total.labels( service="order-executor", error_type="processing" ).inc() - if messages: - # Advance last_id to avoid re-reading — broker.read returns decoded dicts, - # so we track progress by re-reading with "0" for replaying or "$" for new only. - # Since we block on "$" we get only new messages each iteration. - pass except Exception as exc: log.error("fatal_error", error=str(exc)) await notifier.send_error(str(exc), "order-executor") diff --git a/services/portfolio-manager/src/portfolio_manager/main.py b/services/portfolio-manager/src/portfolio_manager/main.py index a7f1a14..87e4c64 100644 --- a/services/portfolio-manager/src/portfolio_manager/main.py +++ b/services/portfolio-manager/src/portfolio_manager/main.py @@ -84,13 +84,43 @@ async def run() -> None: snapshot_loop(db, tracker, notifier, config.snapshot_interval_hours, log) ) - last_id = "$" + GROUP = "portfolio-manager" + CONSUMER = "portfolio-1" log.info("service_started", stream=ORDERS_STREAM) + await broker.ensure_group(ORDERS_STREAM, GROUP) + + # Process pending messages first (from previous crash) + pending = await broker.read_pending(ORDERS_STREAM, GROUP, CONSUMER) + for msg_id, msg in pending: + try: + event = Event.from_dict(msg) + if isinstance(event, OrderEvent): + order = event.data + tracker.apply_order(order) + log.info( + "pending_order_applied", + symbol=order.symbol, + side=str(order.side), + quantity=str(order.quantity), + price=str(order.price), + ) + metrics.events_processed.labels( + service="portfolio-manager", event_type="order" + ).inc() + await broker.ack(ORDERS_STREAM, GROUP, msg_id) + except Exception as exc: + log.error("pending_process_failed", error=str(exc), msg_id=msg_id) + metrics.errors_total.labels( + service="portfolio-manager", error_type="processing" + ).inc() + try: while True: - messages = await broker.read(ORDERS_STREAM, last_id=last_id, block=1000) - for msg in messages: + messages = await broker.read_group( + ORDERS_STREAM, GROUP, CONSUMER, count=10, block=1000 + ) + for msg_id, msg in messages: try: event = Event.from_dict(msg) if isinstance(event, OrderEvent): @@ -108,13 +138,12 @@ async def run() -> None: metrics.events_processed.labels( service="portfolio-manager", event_type="order" ).inc() + await broker.ack(ORDERS_STREAM, GROUP, msg_id) except Exception as exc: - log.exception("message_processing_failed", error=str(exc)) + log.exception("message_processing_failed", error=str(exc), msg_id=msg_id) metrics.errors_total.labels( service="portfolio-manager", error_type="processing" ).inc() - # Update last_id to the latest processed message id if broker returns ids - # Since broker.read returns parsed payloads (not ids), we use "$" to get new msgs except Exception as exc: log.error("fatal_error", error=str(exc)) await notifier.send_error(str(exc), "portfolio-manager") -- cgit v1.2.3