diff options
Diffstat (limited to 'shared/tests/test_broker.py')
| -rw-r--r-- | shared/tests/test_broker.py | 120 |
1 files changed, 120 insertions, 0 deletions
diff --git a/shared/tests/test_broker.py b/shared/tests/test_broker.py index ea8b148..c33f6ec 100644 --- a/shared/tests/test_broker.py +++ b/shared/tests/test_broker.py @@ -2,6 +2,7 @@ import pytest import json +import redis from unittest.mock import AsyncMock, patch @@ -68,3 +69,122 @@ async def test_broker_close(): await broker.close() mock_redis.aclose.assert_called_once() + + +@pytest.mark.asyncio +async def test_broker_ensure_group(): + """Test that ensure_group creates a consumer group.""" + from shared.broker import RedisBroker + + mock_redis = AsyncMock() + broker = RedisBroker.__new__(RedisBroker) + broker._redis = mock_redis + + await broker.ensure_group("test-stream", "test-group") + mock_redis.xgroup_create.assert_called_once_with( + "test-stream", "test-group", id="0", mkstream=True + ) + + +@pytest.mark.asyncio +async def test_broker_ensure_group_already_exists(): + """Test that ensure_group ignores BUSYGROUP error.""" + from shared.broker import RedisBroker + + mock_redis = AsyncMock() + mock_redis.xgroup_create = AsyncMock( + side_effect=redis.ResponseError("BUSYGROUP Consumer Group name already exists") + ) + broker = RedisBroker.__new__(RedisBroker) + broker._redis = mock_redis + + # Should not raise + await broker.ensure_group("test-stream", "test-group") + + +@pytest.mark.asyncio +async def test_broker_read_group(): + """Test that read_group parses xreadgroup response correctly.""" + from shared.broker import RedisBroker + + mock_redis = AsyncMock() + mock_redis.xreadgroup = AsyncMock( + return_value=[ + (b"stream", [(b"1-0", {b"payload": b'{"type": "test"}'})]) + ] + ) + broker = RedisBroker.__new__(RedisBroker) + broker._redis = mock_redis + + messages = await broker.read_group("stream", "group", "consumer") + assert len(messages) == 1 + assert messages[0][0] == "1-0" + assert messages[0][1] == {"type": "test"} + + +@pytest.mark.asyncio +async def test_broker_ack(): + """Test that ack calls xack on the redis connection.""" + from shared.broker import RedisBroker + + mock_redis = AsyncMock() + broker = RedisBroker.__new__(RedisBroker) + broker._redis = mock_redis + + await broker.ack("stream", "group", "1-0", "2-0") + mock_redis.xack.assert_called_once_with("stream", "group", "1-0", "2-0") + + +@pytest.mark.asyncio +async def test_broker_read_pending(): + """Test that read_pending reads unacknowledged messages.""" + from shared.broker import RedisBroker + + mock_redis = AsyncMock() + mock_redis.xreadgroup = AsyncMock( + return_value=[ + (b"stream", [(b"1-0", {b"payload": b'{"type": "pending"}'})]) + ] + ) + broker = RedisBroker.__new__(RedisBroker) + broker._redis = mock_redis + + messages = await broker.read_pending("stream", "group", "consumer") + assert len(messages) == 1 + assert messages[0][0] == "1-0" + assert messages[0][1] == {"type": "pending"} + # Verify it uses "0" (not ">") to read pending + mock_redis.xreadgroup.assert_called_once_with( + "group", "consumer", {"stream": "0"}, count=10 + ) + + +@pytest.mark.asyncio +async def test_broker_read_pending_skips_empty_fields(): + """Test that read_pending skips already-acknowledged entries with empty fields.""" + from shared.broker import RedisBroker + + mock_redis = AsyncMock() + mock_redis.xreadgroup = AsyncMock( + return_value=[ + (b"stream", [(b"1-0", {})]) + ] + ) + broker = RedisBroker.__new__(RedisBroker) + broker._redis = mock_redis + + messages = await broker.read_pending("stream", "group", "consumer") + assert len(messages) == 0 + + +@pytest.mark.asyncio +async def test_broker_ack_no_ids(): + """Test that ack does nothing when no message IDs are provided.""" + from shared.broker import RedisBroker + + mock_redis = AsyncMock() + broker = RedisBroker.__new__(RedisBroker) + broker._redis = mock_redis + + await broker.ack("stream", "group") + mock_redis.xack.assert_not_called() |
