blob: ea8b148e6348bf196c92e29afe2e453436ea8960 (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
|
"""Tests for the Redis broker."""
import pytest
import json
from unittest.mock import AsyncMock, patch
@pytest.mark.asyncio
async def test_broker_publish():
    """Test that publish awaits xadd on the redis connection.

    ``redis.asyncio.from_url`` is patched so the client it returns is an
    ``AsyncMock``. The test then inspects the single ``xadd`` await: the
    first positional argument must be the stream name and the second a
    mapping whose ``"payload"`` entry is JSON that decodes back to the
    published message.
    """
    with patch("redis.asyncio.from_url") as mock_from_url:
        mock_redis = AsyncMock()
        mock_from_url.return_value = mock_redis

        # Imported while the patch is active.
        from shared.broker import RedisBroker

        broker = RedisBroker("redis://localhost:6379")
        data = {"type": "CANDLE", "symbol": "BTCUSDT"}
        await broker.publish("candles", data)

        # assert_awaited_once (not assert_called_once) also fails when the
        # coroutine returned by xadd was created but never awaited.
        mock_redis.xadd.assert_awaited_once()
        positional = mock_redis.xadd.await_args.args
        assert positional[0] == "candles"

        payload = positional[1]
        assert "payload" in payload
        parsed = json.loads(payload["payload"])
        assert parsed["type"] == "CANDLE"
@pytest.mark.asyncio
async def test_broker_subscribe_returns_messages():
    """Test that read parses the xread response correctly.

    Despite the function name, the method exercised here is
    ``RedisBroker.read``. The mocked ``xread`` returns one stream
    (``b"candles"``) holding one entry whose ``b"payload"`` field is a
    JSON-encoded message. ``read`` is expected to return a list with that
    message decoded into a dict.
    """
    with patch("redis.asyncio.from_url") as mock_from_url:
        mock_redis = AsyncMock()
        mock_from_url.return_value = mock_redis

        payload_data = {"type": "CANDLE", "symbol": "ETHUSDT"}
        # Canned reply: [[stream_name, [(entry_id, {field: value}), ...]], ...]
        # with bytes for the stream name, entry id, field name and value.
        mock_redis.xread.return_value = [
            [
                b"candles",
                [
                    (b"1234567890-0", {b"payload": json.dumps(payload_data).encode()}),
                ],
            ]
        ]

        # Imported while the patch is active.
        from shared.broker import RedisBroker

        broker = RedisBroker("redis://localhost:6379")
        messages = await broker.read("candles", last_id="$")

        # assert_awaited_once (not assert_called_once) also fails when the
        # coroutine returned by xread was created but never awaited.
        mock_redis.xread.assert_awaited_once()
        assert len(messages) == 1
        assert messages[0]["type"] == "CANDLE"
        assert messages[0]["symbol"] == "ETHUSDT"
@pytest.mark.asyncio
async def test_broker_close():
    """Test that close awaits aclose on the redis connection.

    ``redis.asyncio.from_url`` is patched so the client it returns is an
    ``AsyncMock``; after ``broker.close()`` that client's ``aclose`` must
    have been awaited exactly once.
    """
    with patch("redis.asyncio.from_url") as mock_from_url:
        mock_redis = AsyncMock()
        mock_from_url.return_value = mock_redis

        # Imported while the patch is active.
        from shared.broker import RedisBroker

        broker = RedisBroker("redis://localhost:6379")
        await broker.close()

        # A merely *called* aclose() would leave an un-awaited coroutine and
        # an open connection, so require that it was actually awaited.
        mock_redis.aclose.assert_awaited_once()
|