1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
|
"""Tests for TelegramNotifier failure modes."""
import logging
import sys
from pathlib import Path
from unittest.mock import AsyncMock, MagicMock, patch
import aiohttp
import pytest
sys.path.insert(0, str(Path(__file__).resolve().parents[2] / "services" / "strategy-engine"))
sys.path.insert(0, str(Path(__file__).resolve().parents[2] / "services" / "backtester" / "src"))
from shared.notifier import TelegramNotifier
@pytest.fixture
def notifier():
    """Provide a TelegramNotifier built from placeholder credentials."""
    instance = TelegramNotifier(bot_token="fake-token", chat_id="12345")
    return instance
class TestSendConnectionError:
    """TelegramNotifier.send() must not crash when session.post raises aiohttp.ClientError."""

    @pytest.mark.asyncio
    async def test_connection_error_does_not_crash(self, notifier, caplog):
        """send() is expected to return normally when session.post fails.

        The injected session's ``post`` raises ``aiohttp.ClientError``. The
        test passes only if ``send()`` lets no exception escape and actually
        attempted the request.
        """
        mock_session = AsyncMock(spec=aiohttp.ClientSession)
        # Report the session as open.
        # NOTE(review): presumably this keeps the notifier from replacing the
        # injected session -- confirm against the notifier implementation.
        mock_session.closed = False
        # A plain MagicMock makes the error surface at call time, before any
        # response object exists.
        mock_session.post = MagicMock(side_effect=aiohttp.ClientError("Connection refused"))
        notifier._session = mock_session

        with caplog.at_level(logging.WARNING):
            # Any exception escaping send() fails the test here.
            await notifier.send("test message")

        # Guard against a vacuous pass: without this check the test would
        # also succeed if send() returned without ever touching the session.
        mock_session.post.assert_called()
        # NOTE(review): the failure is presumably logged as well; no caplog
        # assertion is made because the log level/message are not visible here.
class TestSendRateLimited:
    """Retry behaviour of TelegramNotifier.send() when the API answers 429."""

    @pytest.mark.asyncio
    async def test_rate_limit_retries(self, notifier):
        """Two 429 replies followed by a 200 must result in three post calls."""

        def as_context_manager(response):
            # Make the mock usable in ``async with``, yielding itself.
            response.__aenter__ = AsyncMock(return_value=response)
            response.__aexit__ = AsyncMock(return_value=False)
            return response

        throttled = AsyncMock()
        throttled.status = 429
        throttled.json = AsyncMock(return_value={"description": "Too Many Requests"})
        as_context_manager(throttled)

        accepted = AsyncMock()
        accepted.status = 200
        as_context_manager(accepted)

        session = AsyncMock(spec=aiohttp.ClientSession)
        session.closed = False
        # Scripted replies, in order: 429, 429, then 200.
        session.post = MagicMock(side_effect=[throttled, throttled, accepted])
        notifier._session = session

        # Stub asyncio.sleep as reached through shared.notifier; presumably
        # this keeps any retry delay from slowing the test down.
        sleep_stub = patch("shared.notifier.asyncio.sleep", new_callable=AsyncMock)
        with sleep_stub:
            await notifier.send("test message")

        # Two rate-limited attempts plus the successful one.
        assert session.post.call_count == 3
class TestCloseAlreadyClosed:
    """TelegramNotifier.close() must tolerate a missing or already closed session."""

    @pytest.mark.asyncio
    async def test_close_no_session(self):
        """close() on a freshly constructed notifier returns without raising."""
        fresh = TelegramNotifier(bot_token="fake", chat_id="123")
        # Nothing in this test has given the notifier a session; close() must
        # still return normally.
        await fresh.close()

    @pytest.mark.asyncio
    async def test_close_already_closed_session(self, notifier):
        """Calling close() twice in a row on a mocked session must not raise."""
        session = AsyncMock(spec=aiohttp.ClientSession)
        session.close = AsyncMock()
        notifier._session = session

        # The second pass is the actual subject of the test.
        for _ in range(2):
            await notifier.close()
|