diff options
| author | TheSiahxyz <164138827+TheSiahxyz@users.noreply.github.com> | 2026-04-01 17:56:50 +0900 |
|---|---|---|
| committer | TheSiahxyz <164138827+TheSiahxyz@users.noreply.github.com> | 2026-04-01 17:56:50 +0900 |
| commit | 13a9b2c80bb3eb1353cf2d49bdbf7d0dbd858ccc (patch) | |
| tree | 4595b83f1ba4fe5d1bdf4694f53496120956085a /shared/src | |
| parent | fa7e1dc44787592da647bdda0a63310be0cfcc8b (diff) | |
feat(broker): add Redis consumer groups for reliable message processing
Diffstat (limited to 'shared/src')
| -rw-r--r-- | shared/src/shared/broker.py | 66 |
1 files changed, 65 insertions, 1 deletions
diff --git a/shared/src/shared/broker.py b/shared/src/shared/broker.py index 9c6c4c6..c060c24 100644 --- a/shared/src/shared/broker.py +++ b/shared/src/shared/broker.py @@ -17,6 +17,70 @@ class RedisBroker: payload = json.dumps(data) await self._redis.xadd(stream, {"payload": payload}) + async def ensure_group(self, stream: str, group: str) -> None: + """Create a consumer group if it doesn't exist.""" + try: + await self._redis.xgroup_create(stream, group, id="0", mkstream=True) + except redis.ResponseError as e: + if "BUSYGROUP" not in str(e): + raise + + async def read_group( + self, + stream: str, + group: str, + consumer: str, + count: int = 10, + block: int = 0, + ) -> list[tuple[str, dict[str, Any]]]: + """Read messages from a consumer group. Returns list of (message_id, data).""" + results = await self._redis.xreadgroup( + group, consumer, {stream: ">"}, count=count, block=block + ) + messages = [] + if results: + for _stream, entries in results: + for msg_id, fields in entries: + payload = fields.get(b"payload") or fields.get("payload") + if payload: + if isinstance(payload, bytes): + payload = payload.decode() + if isinstance(msg_id, bytes): + msg_id = msg_id.decode() + messages.append((msg_id, json.loads(payload))) + return messages + + async def ack(self, stream: str, group: str, *msg_ids: str) -> None: + """Acknowledge messages in a consumer group.""" + if msg_ids: + await self._redis.xack(stream, group, *msg_ids) + + async def read_pending( + self, + stream: str, + group: str, + consumer: str, + count: int = 10, + ) -> list[tuple[str, dict[str, Any]]]: + """Read pending (unacknowledged) messages for this consumer.""" + results = await self._redis.xreadgroup( + group, consumer, {stream: "0"}, count=count + ) + messages = [] + if results: + for _stream, entries in results: + for msg_id, fields in entries: + if not fields: # Empty fields means already acknowledged + continue + payload = fields.get(b"payload") or fields.get("payload") + if payload: + if isinstance(payload, bytes): + payload = payload.decode() + if isinstance(msg_id, bytes): + msg_id = msg_id.decode() + messages.append((msg_id, json.loads(payload))) + return messages + async def read( self, stream: str, @@ -24,7 +88,7 @@ class RedisBroker: count: int = 10, block: int = 0, ) -> list[dict[str, Any]]: - """Read messages from a Redis stream.""" + """Read messages (original method, kept for backward compatibility).""" results = await self._redis.xread({stream: last_id}, count=count, block=block) messages = [] if results: |
