summaryrefslogtreecommitdiff
path: root/shared
diff options
context:
space:
mode:
authorTheSiahxyz <164138827+TheSiahxyz@users.noreply.github.com>2026-04-02 15:33:50 +0900
committerTheSiahxyz <164138827+TheSiahxyz@users.noreply.github.com>2026-04-02 15:33:50 +0900
commitbe7dc5311328d5d4bcb16cd613bcc88c26eaffa2 (patch)
tree9299e55f84d6b0fdd8da0517c300b53eccaafa3e /shared
parent78d48f99f93b6738f4239c3f4bb718d3aa5cbb56 (diff)
feat: add retry and resilience to Redis broker with keepalive
Diffstat (limited to 'shared')
-rw-r--r--shared/src/shared/broker.py12
1 files changed, 11 insertions, 1 deletions
diff --git a/shared/src/shared/broker.py b/shared/src/shared/broker.py
index fbe4576..2b96714 100644
--- a/shared/src/shared/broker.py
+++ b/shared/src/shared/broker.py
@@ -5,13 +5,21 @@ from typing import Any
import redis.asyncio
+from shared.resilience import retry_async
+
class RedisBroker:
"""Async Redis Streams broker for publishing and reading events."""
def __init__(self, redis_url: str) -> None:
- self._redis = redis.asyncio.from_url(redis_url)
+ self._redis = redis.asyncio.from_url(
+ redis_url,
+ socket_keepalive=True,
+ health_check_interval=30,
+ retry_on_timeout=True,
+ )
+ @retry_async(max_retries=3, base_delay=0.5, exclude=(ValueError,))
async def publish(self, stream: str, data: dict[str, Any]) -> None:
"""Publish a message to a Redis stream."""
payload = json.dumps(data)
@@ -25,6 +33,7 @@ class RedisBroker:
if "BUSYGROUP" not in str(e):
raise
+ @retry_async(max_retries=3, base_delay=0.5, exclude=(ValueError,))
async def read_group(
self,
stream: str,
@@ -99,6 +108,7 @@ class RedisBroker:
messages.append(json.loads(payload))
return messages
+ @retry_async(max_retries=2, base_delay=0.5)
async def ping(self) -> bool:
"""Ping the Redis server; return True if reachable."""
return await self._redis.ping()