summaryrefslogtreecommitdiff
path: root/shared/tests
diff options
context:
space:
mode:
authorTheSiahxyz <164138827+TheSiahxyz@users.noreply.github.com>2026-04-01 17:56:50 +0900
committerTheSiahxyz <164138827+TheSiahxyz@users.noreply.github.com>2026-04-01 17:56:50 +0900
commit13a9b2c80bb3eb1353cf2d49bdbf7d0dbd858ccc (patch)
tree4595b83f1ba4fe5d1bdf4694f53496120956085a /shared/tests
parentfa7e1dc44787592da647bdda0a63310be0cfcc8b (diff)
feat(broker): add Redis consumer groups for reliable message processing
Diffstat (limited to 'shared/tests')
-rw-r--r--shared/tests/test_broker.py120
1 files changed, 120 insertions, 0 deletions
diff --git a/shared/tests/test_broker.py b/shared/tests/test_broker.py
index ea8b148..c33f6ec 100644
--- a/shared/tests/test_broker.py
+++ b/shared/tests/test_broker.py
@@ -2,6 +2,7 @@
import pytest
import json
+import redis
from unittest.mock import AsyncMock, patch
@@ -68,3 +69,122 @@ async def test_broker_close():
await broker.close()
mock_redis.aclose.assert_called_once()
+
+
+@pytest.mark.asyncio
+async def test_broker_ensure_group():
+ """Test that ensure_group creates a consumer group."""
+ from shared.broker import RedisBroker
+
+ mock_redis = AsyncMock()
+ broker = RedisBroker.__new__(RedisBroker)
+ broker._redis = mock_redis
+
+ await broker.ensure_group("test-stream", "test-group")
+ mock_redis.xgroup_create.assert_called_once_with(
+ "test-stream", "test-group", id="0", mkstream=True
+ )
+
+
+@pytest.mark.asyncio
+async def test_broker_ensure_group_already_exists():
+ """Test that ensure_group ignores BUSYGROUP error."""
+ from shared.broker import RedisBroker
+
+ mock_redis = AsyncMock()
+ mock_redis.xgroup_create = AsyncMock(
+ side_effect=redis.ResponseError("BUSYGROUP Consumer Group name already exists")
+ )
+ broker = RedisBroker.__new__(RedisBroker)
+ broker._redis = mock_redis
+
+ # Should not raise
+ await broker.ensure_group("test-stream", "test-group")
+
+
+@pytest.mark.asyncio
+async def test_broker_read_group():
+ """Test that read_group parses xreadgroup response correctly."""
+ from shared.broker import RedisBroker
+
+ mock_redis = AsyncMock()
+ mock_redis.xreadgroup = AsyncMock(
+ return_value=[
+ (b"stream", [(b"1-0", {b"payload": b'{"type": "test"}'})])
+ ]
+ )
+ broker = RedisBroker.__new__(RedisBroker)
+ broker._redis = mock_redis
+
+ messages = await broker.read_group("stream", "group", "consumer")
+ assert len(messages) == 1
+ assert messages[0][0] == "1-0"
+ assert messages[0][1] == {"type": "test"}
+
+
+@pytest.mark.asyncio
+async def test_broker_ack():
+ """Test that ack calls xack on the redis connection."""
+ from shared.broker import RedisBroker
+
+ mock_redis = AsyncMock()
+ broker = RedisBroker.__new__(RedisBroker)
+ broker._redis = mock_redis
+
+ await broker.ack("stream", "group", "1-0", "2-0")
+ mock_redis.xack.assert_called_once_with("stream", "group", "1-0", "2-0")
+
+
+@pytest.mark.asyncio
+async def test_broker_read_pending():
+ """Test that read_pending reads unacknowledged messages."""
+ from shared.broker import RedisBroker
+
+ mock_redis = AsyncMock()
+ mock_redis.xreadgroup = AsyncMock(
+ return_value=[
+ (b"stream", [(b"1-0", {b"payload": b'{"type": "pending"}'})])
+ ]
+ )
+ broker = RedisBroker.__new__(RedisBroker)
+ broker._redis = mock_redis
+
+ messages = await broker.read_pending("stream", "group", "consumer")
+ assert len(messages) == 1
+ assert messages[0][0] == "1-0"
+ assert messages[0][1] == {"type": "pending"}
+ # Verify it uses "0" (not ">") to read pending
+ mock_redis.xreadgroup.assert_called_once_with(
+ "group", "consumer", {"stream": "0"}, count=10
+ )
+
+
+@pytest.mark.asyncio
+async def test_broker_read_pending_skips_empty_fields():
+ """Test that read_pending skips already-acknowledged entries with empty fields."""
+ from shared.broker import RedisBroker
+
+ mock_redis = AsyncMock()
+ mock_redis.xreadgroup = AsyncMock(
+ return_value=[
+ (b"stream", [(b"1-0", {})])
+ ]
+ )
+ broker = RedisBroker.__new__(RedisBroker)
+ broker._redis = mock_redis
+
+ messages = await broker.read_pending("stream", "group", "consumer")
+ assert len(messages) == 0
+
+
+@pytest.mark.asyncio
+async def test_broker_ack_no_ids():
+ """Test that ack does nothing when no message IDs are provided."""
+ from shared.broker import RedisBroker
+
+ mock_redis = AsyncMock()
+ broker = RedisBroker.__new__(RedisBroker)
+ broker._redis = mock_redis
+
+ await broker.ack("stream", "group")
+ mock_redis.xack.assert_not_called()