summaryrefslogtreecommitdiff
path: root/shared/src
diff options
context:
space:
mode:
authorTheSiahxyz <164138827+TheSiahxyz@users.noreply.github.com>2026-04-01 17:56:50 +0900
committerTheSiahxyz <164138827+TheSiahxyz@users.noreply.github.com>2026-04-01 17:56:50 +0900
commit13a9b2c80bb3eb1353cf2d49bdbf7d0dbd858ccc (patch)
tree4595b83f1ba4fe5d1bdf4694f53496120956085a /shared/src
parentfa7e1dc44787592da647bdda0a63310be0cfcc8b (diff)
feat(broker): add Redis consumer groups for reliable message processing
Diffstat (limited to 'shared/src')
-rw-r--r--shared/src/shared/broker.py66
1 files changed, 65 insertions, 1 deletions
diff --git a/shared/src/shared/broker.py b/shared/src/shared/broker.py
index 9c6c4c6..c060c24 100644
--- a/shared/src/shared/broker.py
+++ b/shared/src/shared/broker.py
@@ -17,6 +17,70 @@ class RedisBroker:
payload = json.dumps(data)
await self._redis.xadd(stream, {"payload": payload})
+ async def ensure_group(self, stream: str, group: str) -> None:
+ """Create a consumer group if it doesn't exist."""
+ try:
+ await self._redis.xgroup_create(stream, group, id="0", mkstream=True)
+ except redis.ResponseError as e:
+ if "BUSYGROUP" not in str(e):
+ raise
+
+ async def read_group(
+ self,
+ stream: str,
+ group: str,
+ consumer: str,
+ count: int = 10,
+ block: int = 0,
+ ) -> list[tuple[str, dict[str, Any]]]:
+ """Read messages from a consumer group. Returns list of (message_id, data)."""
+ results = await self._redis.xreadgroup(
+ group, consumer, {stream: ">"}, count=count, block=block
+ )
+ messages = []
+ if results:
+ for _stream, entries in results:
+ for msg_id, fields in entries:
+ payload = fields.get(b"payload") or fields.get("payload")
+ if payload:
+ if isinstance(payload, bytes):
+ payload = payload.decode()
+ if isinstance(msg_id, bytes):
+ msg_id = msg_id.decode()
+ messages.append((msg_id, json.loads(payload)))
+ return messages
+
+ async def ack(self, stream: str, group: str, *msg_ids: str) -> None:
+ """Acknowledge messages in a consumer group."""
+ if msg_ids:
+ await self._redis.xack(stream, group, *msg_ids)
+
+ async def read_pending(
+ self,
+ stream: str,
+ group: str,
+ consumer: str,
+ count: int = 10,
+ ) -> list[tuple[str, dict[str, Any]]]:
+ """Read pending (unacknowledged) messages for this consumer."""
+ results = await self._redis.xreadgroup(
+ group, consumer, {stream: "0"}, count=count
+ )
+ messages = []
+ if results:
+ for _stream, entries in results:
+ for msg_id, fields in entries:
+ if not fields: # Empty fields means already acknowledged
+ continue
+ payload = fields.get(b"payload") or fields.get("payload")
+ if payload:
+ if isinstance(payload, bytes):
+ payload = payload.decode()
+ if isinstance(msg_id, bytes):
+ msg_id = msg_id.decode()
+ messages.append((msg_id, json.loads(payload)))
+ return messages
+
async def read(
self,
stream: str,
@@ -24,7 +88,7 @@ class RedisBroker:
count: int = 10,
block: int = 0,
) -> list[dict[str, Any]]:
- """Read messages from a Redis stream."""
+ """Read messages (original method, kept for backward compatibility)."""
results = await self._redis.xread({stream: last_id}, count=count, block=block)
messages = []
if results: