From 13a9b2c80bb3eb1353cf2d49bdbf7d0dbd858ccc Mon Sep 17 00:00:00 2001 From: TheSiahxyz <164138827+TheSiahxyz@users.noreply.github.com> Date: Wed, 1 Apr 2026 17:56:50 +0900 Subject: feat(broker): add Redis consumer groups for reliable message processing --- services/order-executor/src/order_executor/main.py | 40 +++++++++++++++++----- 1 file changed, 31 insertions(+), 9 deletions(-) (limited to 'services/order-executor/src/order_executor') diff --git a/services/order-executor/src/order_executor/main.py b/services/order-executor/src/order_executor/main.py index 4a51d5d..930517e 100644 --- a/services/order-executor/src/order_executor/main.py +++ b/services/order-executor/src/order_executor/main.py @@ -62,7 +62,8 @@ async def run() -> None: dry_run=config.dry_run, ) - last_id = "$" + GROUP = "order-executor" + CONSUMER = "executor-1" stream = "signals" health = HealthCheckServer( @@ -76,10 +77,35 @@ async def run() -> None: log.info("service_started", stream=stream, dry_run=config.dry_run) + await broker.ensure_group(stream, GROUP) + + # Process pending messages first (from previous crash) + pending = await broker.read_pending(stream, GROUP, CONSUMER) + for msg_id, msg in pending: + try: + event = Event.from_dict(msg) + if event.type == EventType.SIGNAL: + signal = event.data + log.info( + "processing_pending_signal", signal_id=str(signal.id), symbol=signal.symbol + ) + await executor.execute(signal) + metrics.events_processed.labels( + service="order-executor", event_type="signal" + ).inc() + await broker.ack(stream, GROUP, msg_id) + except Exception as exc: + log.error("pending_process_failed", error=str(exc), msg_id=msg_id) + metrics.errors_total.labels( + service="order-executor", error_type="processing" + ).inc() + try: while True: - messages = await broker.read(stream, last_id=last_id, count=10, block=5000) - for msg in messages: + messages = await broker.read_group( + stream, GROUP, CONSUMER, count=10, block=5000 + ) + for msg_id, msg in messages: try: event = Event.from_dict(msg) if event.type == EventType.SIGNAL: @@ -91,16 +117,12 @@ async def run() -> None: metrics.events_processed.labels( service="order-executor", event_type="signal" ).inc() + await broker.ack(stream, GROUP, msg_id) except Exception as exc: - log.error("message_processing_failed", error=str(exc)) + log.error("message_processing_failed", error=str(exc), msg_id=msg_id) metrics.errors_total.labels( service="order-executor", error_type="processing" ).inc() - if messages: - # Advance last_id to avoid re-reading — broker.read returns decoded dicts, - # so we track progress by re-reading with "0" for replaying or "$" for new only. - # Since we block on "$" we get only new messages each iteration. - pass except Exception as exc: log.error("fatal_error", error=str(exc)) await notifier.send_error(str(exc), "order-executor") -- cgit v1.2.3