"""Tests for the Redis broker.""" import pytest import json import redis from unittest.mock import AsyncMock, patch @pytest.mark.asyncio async def test_broker_publish(): """Test that publish calls xadd on the redis connection.""" with patch("redis.asyncio.from_url") as mock_from_url: mock_redis = AsyncMock() mock_from_url.return_value = mock_redis from shared.broker import RedisBroker broker = RedisBroker("redis://localhost:6379") data = {"type": "CANDLE", "symbol": "BTCUSDT"} await broker.publish("candles", data) mock_redis.xadd.assert_called_once() call_args = mock_redis.xadd.call_args assert call_args[0][0] == "candles" payload = call_args[0][1] assert "payload" in payload parsed = json.loads(payload["payload"]) assert parsed["type"] == "CANDLE" @pytest.mark.asyncio async def test_broker_subscribe_returns_messages(): """Test that read parses xread response correctly.""" with patch("redis.asyncio.from_url") as mock_from_url: mock_redis = AsyncMock() mock_from_url.return_value = mock_redis payload_data = {"type": "CANDLE", "symbol": "ETHUSDT"} mock_redis.xread.return_value = [ [ b"candles", [ (b"1234567890-0", {b"payload": json.dumps(payload_data).encode()}), ], ] ] from shared.broker import RedisBroker broker = RedisBroker("redis://localhost:6379") messages = await broker.read("candles", last_id="$") mock_redis.xread.assert_called_once() assert len(messages) == 1 assert messages[0]["type"] == "CANDLE" assert messages[0]["symbol"] == "ETHUSDT" @pytest.mark.asyncio async def test_broker_close(): """Test that close calls aclose on the redis connection.""" with patch("redis.asyncio.from_url") as mock_from_url: mock_redis = AsyncMock() mock_from_url.return_value = mock_redis from shared.broker import RedisBroker broker = RedisBroker("redis://localhost:6379") await broker.close() mock_redis.aclose.assert_called_once() @pytest.mark.asyncio async def test_broker_ensure_group(): """Test that ensure_group creates a consumer group.""" from shared.broker import RedisBroker mock_redis = AsyncMock() broker = RedisBroker.__new__(RedisBroker) broker._redis = mock_redis await broker.ensure_group("test-stream", "test-group") mock_redis.xgroup_create.assert_called_once_with( "test-stream", "test-group", id="0", mkstream=True ) @pytest.mark.asyncio async def test_broker_ensure_group_already_exists(): """Test that ensure_group ignores BUSYGROUP error.""" from shared.broker import RedisBroker mock_redis = AsyncMock() mock_redis.xgroup_create = AsyncMock( side_effect=redis.ResponseError("BUSYGROUP Consumer Group name already exists") ) broker = RedisBroker.__new__(RedisBroker) broker._redis = mock_redis # Should not raise await broker.ensure_group("test-stream", "test-group") @pytest.mark.asyncio async def test_broker_read_group(): """Test that read_group parses xreadgroup response correctly.""" from shared.broker import RedisBroker mock_redis = AsyncMock() mock_redis.xreadgroup = AsyncMock( return_value=[ (b"stream", [(b"1-0", {b"payload": b'{"type": "test"}'})]) ] ) broker = RedisBroker.__new__(RedisBroker) broker._redis = mock_redis messages = await broker.read_group("stream", "group", "consumer") assert len(messages) == 1 assert messages[0][0] == "1-0" assert messages[0][1] == {"type": "test"} @pytest.mark.asyncio async def test_broker_ack(): """Test that ack calls xack on the redis connection.""" from shared.broker import RedisBroker mock_redis = AsyncMock() broker = RedisBroker.__new__(RedisBroker) broker._redis = mock_redis await broker.ack("stream", "group", "1-0", "2-0") mock_redis.xack.assert_called_once_with("stream", "group", "1-0", "2-0") @pytest.mark.asyncio async def test_broker_read_pending(): """Test that read_pending reads unacknowledged messages.""" from shared.broker import RedisBroker mock_redis = AsyncMock() mock_redis.xreadgroup = AsyncMock( return_value=[ (b"stream", [(b"1-0", {b"payload": b'{"type": "pending"}'})]) ] ) broker = RedisBroker.__new__(RedisBroker) broker._redis = mock_redis messages = await broker.read_pending("stream", "group", "consumer") assert len(messages) == 1 assert messages[0][0] == "1-0" assert messages[0][1] == {"type": "pending"} # Verify it uses "0" (not ">") to read pending mock_redis.xreadgroup.assert_called_once_with( "group", "consumer", {"stream": "0"}, count=10 ) @pytest.mark.asyncio async def test_broker_read_pending_skips_empty_fields(): """Test that read_pending skips already-acknowledged entries with empty fields.""" from shared.broker import RedisBroker mock_redis = AsyncMock() mock_redis.xreadgroup = AsyncMock( return_value=[ (b"stream", [(b"1-0", {})]) ] ) broker = RedisBroker.__new__(RedisBroker) broker._redis = mock_redis messages = await broker.read_pending("stream", "group", "consumer") assert len(messages) == 0 @pytest.mark.asyncio async def test_broker_ack_no_ids(): """Test that ack does nothing when no message IDs are provided.""" from shared.broker import RedisBroker mock_redis = AsyncMock() broker = RedisBroker.__new__(RedisBroker) broker._redis = mock_redis await broker.ack("stream", "group") mock_redis.xack.assert_not_called()