diff options
| author | TheSiahxyz <164138827+TheSiahxyz@users.noreply.github.com> | 2026-04-02 15:33:50 +0900 |
|---|---|---|
| committer | TheSiahxyz <164138827+TheSiahxyz@users.noreply.github.com> | 2026-04-02 15:33:50 +0900 |
| commit | be7dc5311328d5d4bcb16cd613bcc88c26eaffa2 (patch) | |
| tree | 9299e55f84d6b0fdd8da0517c300b53eccaafa3e | |
| parent | 78d48f99f93b6738f4239c3f4bb718d3aa5cbb56 (diff) | |
feat: add retry and resilience to Redis broker with keepalive
| -rw-r--r-- | shared/src/shared/broker.py | 12 |
1 files changed, 11 insertions, 1 deletions
diff --git a/shared/src/shared/broker.py b/shared/src/shared/broker.py index fbe4576..2b96714 100644 --- a/shared/src/shared/broker.py +++ b/shared/src/shared/broker.py @@ -5,13 +5,21 @@ from typing import Any import redis.asyncio +from shared.resilience import retry_async + class RedisBroker: """Async Redis Streams broker for publishing and reading events.""" def __init__(self, redis_url: str) -> None: - self._redis = redis.asyncio.from_url(redis_url) + self._redis = redis.asyncio.from_url( + redis_url, + socket_keepalive=True, + health_check_interval=30, + retry_on_timeout=True, + ) + @retry_async(max_retries=3, base_delay=0.5, exclude=(ValueError,)) async def publish(self, stream: str, data: dict[str, Any]) -> None: """Publish a message to a Redis stream.""" payload = json.dumps(data) @@ -25,6 +33,7 @@ class RedisBroker: if "BUSYGROUP" not in str(e): raise + @retry_async(max_retries=3, base_delay=0.5, exclude=(ValueError,)) async def read_group( self, stream: str, @@ -99,6 +108,7 @@ class RedisBroker: messages.append(json.loads(payload)) return messages + @retry_async(max_retries=2, base_delay=0.5) async def ping(self) -> bool: """Ping the Redis server; return True if reachable.""" return await self._redis.ping() |
