"""Tests for TelegramNotifier failure modes.""" import asyncio import logging import sys from pathlib import Path from unittest.mock import AsyncMock, MagicMock, patch import aiohttp import pytest sys.path.insert(0, str(Path(__file__).resolve().parents[2] / "services" / "strategy-engine")) sys.path.insert(0, str(Path(__file__).resolve().parents[2] / "services" / "backtester" / "src")) from shared.notifier import TelegramNotifier @pytest.fixture def notifier(): return TelegramNotifier(bot_token="fake-token", chat_id="12345") class TestSendConnectionError: """TelegramNotifier.send() when session.post raises ConnectionError should not crash.""" @pytest.mark.asyncio async def test_connection_error_does_not_crash(self, notifier, caplog): mock_session = AsyncMock(spec=aiohttp.ClientSession) mock_session.closed = False mock_session.post = MagicMock(side_effect=aiohttp.ClientError("Connection refused")) notifier._session = mock_session with caplog.at_level(logging.WARNING): await notifier.send("test message") # Should not raise, should log the error class TestSendRateLimited: """TelegramNotifier.send() when API returns 429 should retry.""" @pytest.mark.asyncio async def test_rate_limit_retries(self, notifier): mock_response_429 = AsyncMock() mock_response_429.status = 429 mock_response_429.json = AsyncMock(return_value={"description": "Too Many Requests"}) mock_response_429.__aenter__ = AsyncMock(return_value=mock_response_429) mock_response_429.__aexit__ = AsyncMock(return_value=False) mock_response_200 = AsyncMock() mock_response_200.status = 200 mock_response_200.__aenter__ = AsyncMock(return_value=mock_response_200) mock_response_200.__aexit__ = AsyncMock(return_value=False) mock_session = AsyncMock(spec=aiohttp.ClientSession) mock_session.closed = False # First two calls return 429, third returns 200 mock_session.post = MagicMock( side_effect=[mock_response_429, mock_response_429, mock_response_200] ) notifier._session = mock_session with patch("shared.notifier.asyncio.sleep", new_callable=AsyncMock): await notifier.send("test message") # Should have been called 3 times (2 retries + 1 success) assert mock_session.post.call_count == 3 class TestCloseAlreadyClosed: """TelegramNotifier.close() when session already closed should not crash.""" @pytest.mark.asyncio async def test_close_no_session(self): notifier = TelegramNotifier(bot_token="fake", chat_id="123") # No session created yet await notifier.close() # Should not raise @pytest.mark.asyncio async def test_close_already_closed_session(self, notifier): mock_session = AsyncMock(spec=aiohttp.ClientSession) mock_session.close = AsyncMock() notifier._session = mock_session await notifier.close() await notifier.close() # Second close should not crash