summaryrefslogtreecommitdiff
path: root/shared/tests/test_broker.py
blob: c33f6ecd16e94cf0fa241dff13223c50826539f3 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
"""Tests for the Redis broker."""

import pytest
import json
import redis
from unittest.mock import AsyncMock, patch


@pytest.mark.asyncio
async def test_broker_publish():
    """Test that publish calls xadd on the redis connection."""
    with patch("redis.asyncio.from_url") as mock_from_url:
        mock_redis = AsyncMock()
        mock_from_url.return_value = mock_redis

        from shared.broker import RedisBroker

        broker = RedisBroker("redis://localhost:6379")
        data = {"type": "CANDLE", "symbol": "BTCUSDT"}
        await broker.publish("candles", data)

        mock_redis.xadd.assert_called_once()
        call_args = mock_redis.xadd.call_args
        assert call_args[0][0] == "candles"
        payload = call_args[0][1]
        assert "payload" in payload
        parsed = json.loads(payload["payload"])
        assert parsed["type"] == "CANDLE"


@pytest.mark.asyncio
async def test_broker_subscribe_returns_messages():
    """Test that read parses xread response correctly."""
    with patch("redis.asyncio.from_url") as mock_from_url:
        mock_redis = AsyncMock()
        mock_from_url.return_value = mock_redis

        payload_data = {"type": "CANDLE", "symbol": "ETHUSDT"}
        mock_redis.xread.return_value = [
            [
                b"candles",
                [
                    (b"1234567890-0", {b"payload": json.dumps(payload_data).encode()}),
                ],
            ]
        ]

        from shared.broker import RedisBroker

        broker = RedisBroker("redis://localhost:6379")
        messages = await broker.read("candles", last_id="$")

        mock_redis.xread.assert_called_once()
        assert len(messages) == 1
        assert messages[0]["type"] == "CANDLE"
        assert messages[0]["symbol"] == "ETHUSDT"


@pytest.mark.asyncio
async def test_broker_close():
    """Test that close calls aclose on the redis connection."""
    with patch("redis.asyncio.from_url") as mock_from_url:
        mock_redis = AsyncMock()
        mock_from_url.return_value = mock_redis

        from shared.broker import RedisBroker

        broker = RedisBroker("redis://localhost:6379")
        await broker.close()

        mock_redis.aclose.assert_called_once()


@pytest.mark.asyncio
async def test_broker_ensure_group():
    """Test that ensure_group creates a consumer group."""
    from shared.broker import RedisBroker

    mock_redis = AsyncMock()
    broker = RedisBroker.__new__(RedisBroker)
    broker._redis = mock_redis

    await broker.ensure_group("test-stream", "test-group")
    mock_redis.xgroup_create.assert_called_once_with(
        "test-stream", "test-group", id="0", mkstream=True
    )


@pytest.mark.asyncio
async def test_broker_ensure_group_already_exists():
    """Test that ensure_group ignores BUSYGROUP error."""
    from shared.broker import RedisBroker

    mock_redis = AsyncMock()
    mock_redis.xgroup_create = AsyncMock(
        side_effect=redis.ResponseError("BUSYGROUP Consumer Group name already exists")
    )
    broker = RedisBroker.__new__(RedisBroker)
    broker._redis = mock_redis

    # Should not raise
    await broker.ensure_group("test-stream", "test-group")


@pytest.mark.asyncio
async def test_broker_read_group():
    """Test that read_group parses xreadgroup response correctly."""
    from shared.broker import RedisBroker

    mock_redis = AsyncMock()
    mock_redis.xreadgroup = AsyncMock(
        return_value=[
            (b"stream", [(b"1-0", {b"payload": b'{"type": "test"}'})])
        ]
    )
    broker = RedisBroker.__new__(RedisBroker)
    broker._redis = mock_redis

    messages = await broker.read_group("stream", "group", "consumer")
    assert len(messages) == 1
    assert messages[0][0] == "1-0"
    assert messages[0][1] == {"type": "test"}


@pytest.mark.asyncio
async def test_broker_ack():
    """Test that ack calls xack on the redis connection."""
    from shared.broker import RedisBroker

    mock_redis = AsyncMock()
    broker = RedisBroker.__new__(RedisBroker)
    broker._redis = mock_redis

    await broker.ack("stream", "group", "1-0", "2-0")
    mock_redis.xack.assert_called_once_with("stream", "group", "1-0", "2-0")


@pytest.mark.asyncio
async def test_broker_read_pending():
    """Test that read_pending reads unacknowledged messages."""
    from shared.broker import RedisBroker

    mock_redis = AsyncMock()
    mock_redis.xreadgroup = AsyncMock(
        return_value=[
            (b"stream", [(b"1-0", {b"payload": b'{"type": "pending"}'})])
        ]
    )
    broker = RedisBroker.__new__(RedisBroker)
    broker._redis = mock_redis

    messages = await broker.read_pending("stream", "group", "consumer")
    assert len(messages) == 1
    assert messages[0][0] == "1-0"
    assert messages[0][1] == {"type": "pending"}
    # Verify it uses "0" (not ">") to read pending
    mock_redis.xreadgroup.assert_called_once_with(
        "group", "consumer", {"stream": "0"}, count=10
    )


@pytest.mark.asyncio
async def test_broker_read_pending_skips_empty_fields():
    """Test that read_pending skips already-acknowledged entries with empty fields."""
    from shared.broker import RedisBroker

    mock_redis = AsyncMock()
    mock_redis.xreadgroup = AsyncMock(
        return_value=[
            (b"stream", [(b"1-0", {})])
        ]
    )
    broker = RedisBroker.__new__(RedisBroker)
    broker._redis = mock_redis

    messages = await broker.read_pending("stream", "group", "consumer")
    assert len(messages) == 0


@pytest.mark.asyncio
async def test_broker_ack_no_ids():
    """Test that ack does nothing when no message IDs are provided."""
    from shared.broker import RedisBroker

    mock_redis = AsyncMock()
    broker = RedisBroker.__new__(RedisBroker)
    broker._redis = mock_redis

    await broker.ack("stream", "group")
    mock_redis.xack.assert_not_called()