From be7dc5311328d5d4bcb16cd613bcc88c26eaffa2 Mon Sep 17 00:00:00 2001 From: TheSiahxyz <164138827+TheSiahxyz@users.noreply.github.com> Date: Thu, 2 Apr 2026 15:33:50 +0900 Subject: feat: add retry and resilience to Redis broker with keepalive --- shared/src/shared/broker.py | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) (limited to 'shared/src') diff --git a/shared/src/shared/broker.py b/shared/src/shared/broker.py index fbe4576..2b96714 100644 --- a/shared/src/shared/broker.py +++ b/shared/src/shared/broker.py @@ -5,13 +5,21 @@ from typing import Any import redis.asyncio +from shared.resilience import retry_async + class RedisBroker: """Async Redis Streams broker for publishing and reading events.""" def __init__(self, redis_url: str) -> None: - self._redis = redis.asyncio.from_url(redis_url) + self._redis = redis.asyncio.from_url( + redis_url, + socket_keepalive=True, + health_check_interval=30, + retry_on_timeout=True, + ) + @retry_async(max_retries=3, base_delay=0.5, exclude=(ValueError,)) async def publish(self, stream: str, data: dict[str, Any]) -> None: """Publish a message to a Redis stream.""" payload = json.dumps(data) @@ -25,6 +33,7 @@ class RedisBroker: if "BUSYGROUP" not in str(e): raise + @retry_async(max_retries=3, base_delay=0.5, exclude=(ValueError,)) async def read_group( self, stream: str, @@ -99,6 +108,7 @@ class RedisBroker: messages.append(json.loads(payload)) return messages + @retry_async(max_retries=2, base_delay=0.5) async def ping(self) -> bool: """Ping the Redis server; return True if reachable.""" return await self._redis.ping() -- cgit v1.2.3