summaryrefslogtreecommitdiff
path: root/tests
diff options
context:
space:
mode:
authorTheSiahxyz <164138827+TheSiahxyz@users.noreply.github.com>2026-04-01 17:11:51 +0900
committerTheSiahxyz <164138827+TheSiahxyz@users.noreply.github.com>2026-04-01 17:11:51 +0900
commit66368d580cf569b50a33e438f2287a977e6fc704 (patch)
tree8a3b9e538333abf4564846849affec1ef1279e05 /tests
parent2d1530f210f4b4f679a5d3b3597c4815904398a7 (diff)
test: add edge case tests for zero volume, empty data, extreme values
Diffstat (limited to 'tests')
-rw-r--r--tests/edge_cases/__init__.py0
-rw-r--r--tests/edge_cases/test_empty_data.py113
-rw-r--r--tests/edge_cases/test_extreme_values.py172
-rw-r--r--tests/edge_cases/test_notifier_failures.py85
-rw-r--r--tests/edge_cases/test_strategy_reset.py141
-rw-r--r--tests/edge_cases/test_zero_volume.py79
6 files changed, 590 insertions, 0 deletions
diff --git a/tests/edge_cases/__init__.py b/tests/edge_cases/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/tests/edge_cases/__init__.py
diff --git a/tests/edge_cases/test_empty_data.py b/tests/edge_cases/test_empty_data.py
new file mode 100644
index 0000000..2449b90
--- /dev/null
+++ b/tests/edge_cases/test_empty_data.py
@@ -0,0 +1,113 @@
+"""Tests for empty/zero data edge cases."""
+
+import sys
+from datetime import datetime, timezone, timedelta
+from decimal import Decimal
+from pathlib import Path
+
+sys.path.insert(0, str(Path(__file__).resolve().parents[2] / "services" / "strategy-engine"))
+sys.path.insert(0, str(Path(__file__).resolve().parents[2] / "services" / "backtester" / "src"))
+sys.path.insert(0, str(Path(__file__).resolve().parents[2] / "services" / "order-executor" / "src"))
+sys.path.insert(0, str(Path(__file__).resolve().parents[2] / "services" / "portfolio-manager" / "src"))
+
+from shared.models import Candle, Signal, OrderSide, Position
+from backtester.engine import BacktestEngine
+from backtester.metrics import TradeRecord, compute_detailed_metrics
+from portfolio_manager.portfolio import PortfolioTracker
+from order_executor.risk_manager import RiskManager
+from strategies.rsi_strategy import RsiStrategy
+
+
+class TestBacktestEngineEmptyCandles:
+ """BacktestEngine.run([]) should return valid result with 0 trades."""
+
+ def test_run_empty_candles(self):
+ strategy = RsiStrategy()
+ engine = BacktestEngine(strategy, initial_balance=Decimal("10000"))
+ result = engine.run([])
+ assert result.total_trades == 0
+ assert result.initial_balance == Decimal("10000")
+ assert result.final_balance == Decimal("10000")
+ assert result.profit == Decimal("0")
+ assert result.symbol == ""
+
+
+class TestPortfolioTrackerEmpty:
+ """PortfolioTracker.get_all_positions() on fresh tracker returns empty list."""
+
+ def test_fresh_tracker_returns_empty(self):
+ tracker = PortfolioTracker()
+ positions = tracker.get_all_positions()
+ assert positions == []
+
+ def test_get_position_returns_none_for_unknown_symbol(self):
+ tracker = PortfolioTracker()
+ assert tracker.get_position("BTCUSDT") is None
+
+
+class TestRiskManagerZeroBalance:
+ """RiskManager.check with zero balance should reject BUY signals."""
+
+ def test_zero_balance_rejects_buy(self):
+ rm = RiskManager(
+ max_position_size=Decimal("0.5"),
+ stop_loss_pct=Decimal("5"),
+ daily_loss_limit_pct=Decimal("3"),
+ )
+ signal = Signal(
+ strategy="test",
+ symbol="BTCUSDT",
+ side=OrderSide.BUY,
+ price=Decimal("50000"),
+ quantity=Decimal("0.01"),
+ reason="test buy",
+ )
+ result = rm.check(
+ signal=signal,
+ balance=Decimal("0"),
+ positions={},
+ daily_pnl=Decimal("0"),
+ )
+ assert result.allowed is False
+
+ def test_zero_balance_allows_sell(self):
+ rm = RiskManager(
+ max_position_size=Decimal("0.5"),
+ stop_loss_pct=Decimal("5"),
+ daily_loss_limit_pct=Decimal("3"),
+ )
+ signal = Signal(
+ strategy="test",
+ symbol="BTCUSDT",
+ side=OrderSide.SELL,
+ price=Decimal("50000"),
+ quantity=Decimal("0.01"),
+ reason="test sell",
+ )
+ result = rm.check(
+ signal=signal,
+ balance=Decimal("0"),
+ positions={},
+ daily_pnl=Decimal("0"),
+ )
+ # SELL doesn't require balance, so should be allowed
+ assert result.allowed is True
+
+
+class TestComputeDetailedMetricsEmpty:
+ """compute_detailed_metrics with empty trades returns zeroed metrics."""
+
+ def test_empty_trades(self):
+ metrics = compute_detailed_metrics(
+ trades=[],
+ initial_balance=Decimal("10000"),
+ final_balance=Decimal("10000"),
+ )
+ assert metrics.total_return == 0.0
+ assert metrics.total_trades == 0
+ assert metrics.winning_trades == 0
+ assert metrics.losing_trades == 0
+ assert metrics.win_rate == 0.0
+ assert metrics.sharpe_ratio == 0.0
+ assert metrics.max_drawdown == 0.0
+ assert metrics.max_drawdown_duration == timedelta(0)
diff --git a/tests/edge_cases/test_extreme_values.py b/tests/edge_cases/test_extreme_values.py
new file mode 100644
index 0000000..fe9dc1a
--- /dev/null
+++ b/tests/edge_cases/test_extreme_values.py
@@ -0,0 +1,172 @@
+"""Tests for extreme value edge cases."""
+
+import sys
+from datetime import datetime, timezone
+from decimal import Decimal
+from pathlib import Path
+
+sys.path.insert(0, str(Path(__file__).resolve().parents[2] / "services" / "strategy-engine"))
+sys.path.insert(0, str(Path(__file__).resolve().parents[2] / "services" / "backtester" / "src"))
+sys.path.insert(0, str(Path(__file__).resolve().parents[2] / "services" / "order-executor" / "src"))
+
+from shared.models import Candle, Signal, OrderSide
+from strategies.rsi_strategy import RsiStrategy
+from strategies.vwap_strategy import VwapStrategy
+from strategies.bollinger_strategy import BollingerStrategy
+from backtester.engine import BacktestEngine
+from backtester.simulator import OrderSimulator
+from order_executor.risk_manager import RiskManager
+
+
+def _candle(close: str, volume: str = "1000", idx: int = 0) -> Candle:
+ from datetime import timedelta
+ base = datetime(2025, 1, 1, tzinfo=timezone.utc)
+ return Candle(
+ symbol="BTCUSDT",
+ timeframe="1h",
+ open_time=base + timedelta(hours=idx),
+ open=Decimal(close),
+ high=Decimal(close),
+ low=Decimal(close),
+ close=Decimal(close),
+ volume=Decimal(volume),
+ )
+
+
+class TestZeroPriceCandle:
+ """Strategy should handle Decimal('0') price without crashing."""
+
+ def test_rsi_zero_price(self):
+ strategy = RsiStrategy()
+ for i in range(20):
+ strategy.on_candle(_candle("0", idx=i))
+ # Should not crash; RSI is undefined with constant price
+ result = strategy.on_candle(_candle("0", idx=20))
+ # With all-zero prices, diffs are zero, RSI is NaN -> returns None
+ assert result is None
+
+ def test_vwap_zero_price(self):
+ strategy = VwapStrategy()
+ for i in range(40):
+ result = strategy.on_candle(_candle("0", "100", idx=i))
+ # VWAP of zero-price candles is 0; deviation calc is 0/0 -> should handle gracefully
+ # This tests the division by vwap when vwap == 0
+ assert result is None
+
+
+class TestVeryLargePrice:
+ """Strategy should handle extremely large prices."""
+
+ def test_rsi_large_price(self):
+ strategy = RsiStrategy()
+ for i in range(20):
+ strategy.on_candle(_candle("999999999", idx=i))
+ result = strategy.on_candle(_candle("999999999", idx=20))
+ # Constant large price -> no signal
+ assert result is None
+
+ def test_bollinger_large_price(self):
+ strategy = BollingerStrategy()
+ for i in range(25):
+ strategy.on_candle(_candle("999999999", idx=i))
+ # Should not overflow; constant price -> std=0, bandwidth=0 -> skip
+ result = strategy.on_candle(_candle("999999999", idx=25))
+ assert result is None
+
+
+class TestZeroInitialBalance:
+ """Backtester with Decimal('0') initial balance."""
+
+ def test_backtest_zero_balance(self):
+ strategy = RsiStrategy()
+ engine = BacktestEngine(strategy, initial_balance=Decimal("0"))
+ candles = [_candle(str(100 - i), idx=i) for i in range(30)]
+ result = engine.run(candles)
+ # Cannot buy with 0 balance, so 0 trades
+ assert result.total_trades == 0
+ assert result.final_balance == Decimal("0")
+ assert result.profit_pct == Decimal("0")
+
+
+class TestOrderQuantityZero:
+ """Order with quantity=0."""
+
+ def test_simulator_zero_quantity_buy(self):
+ sim = OrderSimulator(initial_balance=Decimal("10000"))
+ signal = Signal(
+ strategy="test",
+ symbol="BTCUSDT",
+ side=OrderSide.BUY,
+ price=Decimal("50000"),
+ quantity=Decimal("0"),
+ reason="test zero qty",
+ )
+ result = sim.execute(signal)
+ # cost = 50000 * 0 = 0; this is technically valid (no cost)
+ assert result is True
+ # Balance unchanged
+ assert sim.balance == Decimal("10000")
+
+ def test_simulator_zero_quantity_sell(self):
+ sim = OrderSimulator(initial_balance=Decimal("10000"))
+ signal = Signal(
+ strategy="test",
+ symbol="BTCUSDT",
+ side=OrderSide.SELL,
+ price=Decimal("50000"),
+ quantity=Decimal("0"),
+ reason="test zero qty sell",
+ )
+ result = sim.execute(signal)
+ # No position to sell -> rejected
+ assert result is False
+
+
+class TestRiskManagerZeroDailyLossLimit:
+ """RiskManager with 0% daily loss limit should reject everything on any loss."""
+
+ def test_zero_daily_loss_limit_rejects_on_loss(self):
+ rm = RiskManager(
+ max_position_size=Decimal("0.5"),
+ stop_loss_pct=Decimal("5"),
+ daily_loss_limit_pct=Decimal("0"),
+ )
+ signal = Signal(
+ strategy="test",
+ symbol="BTCUSDT",
+ side=OrderSide.BUY,
+ price=Decimal("50000"),
+ quantity=Decimal("0.01"),
+ reason="test",
+ )
+ # Any negative pnl with 0% limit -> should reject
+ result = rm.check(
+ signal=signal,
+ balance=Decimal("10000"),
+ positions={},
+ daily_pnl=Decimal("-1"), # any loss at all
+ )
+ assert result.allowed is False
+
+ def test_zero_daily_loss_limit_allows_no_loss(self):
+ rm = RiskManager(
+ max_position_size=Decimal("0.5"),
+ stop_loss_pct=Decimal("5"),
+ daily_loss_limit_pct=Decimal("0"),
+ )
+ signal = Signal(
+ strategy="test",
+ symbol="BTCUSDT",
+ side=OrderSide.BUY,
+ price=Decimal("100"),
+ quantity=Decimal("0.01"),
+ reason="test",
+ )
+ # No loss -> should allow
+ result = rm.check(
+ signal=signal,
+ balance=Decimal("10000"),
+ positions={},
+ daily_pnl=Decimal("0"),
+ )
+ assert result.allowed is True
diff --git a/tests/edge_cases/test_notifier_failures.py b/tests/edge_cases/test_notifier_failures.py
new file mode 100644
index 0000000..4ba781f
--- /dev/null
+++ b/tests/edge_cases/test_notifier_failures.py
@@ -0,0 +1,85 @@
+"""Tests for TelegramNotifier failure modes."""
+
+import asyncio
+import logging
+import sys
+from pathlib import Path
+from unittest.mock import AsyncMock, MagicMock, patch
+
+import aiohttp
+import pytest
+
+sys.path.insert(0, str(Path(__file__).resolve().parents[2] / "services" / "strategy-engine"))
+sys.path.insert(0, str(Path(__file__).resolve().parents[2] / "services" / "backtester" / "src"))
+
+from shared.notifier import TelegramNotifier
+
+
+@pytest.fixture
+def notifier():
+ return TelegramNotifier(bot_token="fake-token", chat_id="12345")
+
+
+class TestSendConnectionError:
+ """TelegramNotifier.send() when session.post raises ConnectionError should not crash."""
+
+ @pytest.mark.asyncio
+ async def test_connection_error_does_not_crash(self, notifier, caplog):
+ mock_session = AsyncMock(spec=aiohttp.ClientSession)
+ mock_session.closed = False
+ mock_session.post = MagicMock(side_effect=aiohttp.ClientError("Connection refused"))
+ notifier._session = mock_session
+
+ with caplog.at_level(logging.WARNING):
+ await notifier.send("test message")
+ # Should not raise, should log the error
+
+
+class TestSendRateLimited:
+ """TelegramNotifier.send() when API returns 429 should retry."""
+
+ @pytest.mark.asyncio
+ async def test_rate_limit_retries(self, notifier):
+ mock_response_429 = AsyncMock()
+ mock_response_429.status = 429
+ mock_response_429.json = AsyncMock(return_value={"description": "Too Many Requests"})
+ mock_response_429.__aenter__ = AsyncMock(return_value=mock_response_429)
+ mock_response_429.__aexit__ = AsyncMock(return_value=False)
+
+ mock_response_200 = AsyncMock()
+ mock_response_200.status = 200
+ mock_response_200.__aenter__ = AsyncMock(return_value=mock_response_200)
+ mock_response_200.__aexit__ = AsyncMock(return_value=False)
+
+ mock_session = AsyncMock(spec=aiohttp.ClientSession)
+ mock_session.closed = False
+ # First two calls return 429, third returns 200
+ mock_session.post = MagicMock(
+ side_effect=[mock_response_429, mock_response_429, mock_response_200]
+ )
+ notifier._session = mock_session
+
+ with patch("shared.notifier.asyncio.sleep", new_callable=AsyncMock):
+ await notifier.send("test message")
+
+ # Should have been called 3 times (2 retries + 1 success)
+ assert mock_session.post.call_count == 3
+
+
+class TestCloseAlreadyClosed:
+ """TelegramNotifier.close() when session already closed should not crash."""
+
+ @pytest.mark.asyncio
+ async def test_close_no_session(self):
+ notifier = TelegramNotifier(bot_token="fake", chat_id="123")
+ # No session created yet
+ await notifier.close() # Should not raise
+
+ @pytest.mark.asyncio
+ async def test_close_already_closed_session(self, notifier):
+ mock_session = AsyncMock(spec=aiohttp.ClientSession)
+ mock_session.close = AsyncMock()
+ notifier._session = mock_session
+
+ await notifier.close()
+ await notifier.close() # Second close should not crash
diff --git a/tests/edge_cases/test_strategy_reset.py b/tests/edge_cases/test_strategy_reset.py
new file mode 100644
index 0000000..f84adf0
--- /dev/null
+++ b/tests/edge_cases/test_strategy_reset.py
@@ -0,0 +1,141 @@
+"""Tests that strategy reset() properly clears internal state."""
+
+import sys
+from datetime import datetime, timezone
+from decimal import Decimal
+from pathlib import Path
+
+sys.path.insert(0, str(Path(__file__).resolve().parents[2] / "services" / "strategy-engine"))
+sys.path.insert(0, str(Path(__file__).resolve().parents[2] / "services" / "backtester" / "src"))
+
+from shared.models import Candle
+from strategies.rsi_strategy import RsiStrategy
+from strategies.grid_strategy import GridStrategy
+from strategies.macd_strategy import MacdStrategy
+from strategies.bollinger_strategy import BollingerStrategy
+from strategies.ema_crossover_strategy import EmaCrossoverStrategy
+from strategies.vwap_strategy import VwapStrategy
+from strategies.volume_profile_strategy import VolumeProfileStrategy
+
+
+def _make_candles(count: int, base_price: float = 100.0) -> list[Candle]:
+ """Generate a list of candles with slight price variation."""
+ candles = []
+ for i in range(count):
+ # Oscillating price to potentially trigger signals
+ price = base_price + (i % 10) - 5
+ candles.append(
+ Candle(
+ symbol="BTCUSDT",
+ timeframe="1h",
+ open_time=datetime(2025, 1, 1, i % 24, tzinfo=timezone.utc),
+ open=Decimal(str(price)),
+ high=Decimal(str(price + 1)),
+ low=Decimal(str(price - 1)),
+ close=Decimal(str(price)),
+ volume=Decimal("1000"),
+ )
+ )
+ return candles
+
+
+def _collect_signals(strategy, candles):
+ """Feed candles through a strategy and collect all signals."""
+ signals = []
+ for c in candles:
+ s = strategy.on_candle(c)
+ if s is not None:
+ signals.append(s)
+ return signals
+
+
+class TestRsiReset:
+ def test_reset_produces_same_signals(self):
+ strategy = RsiStrategy()
+ candles = _make_candles(50)
+ signals1 = _collect_signals(strategy, candles)
+ strategy.reset()
+ signals2 = _collect_signals(strategy, candles)
+ assert len(signals1) == len(signals2)
+ for s1, s2 in zip(signals1, signals2):
+ assert s1.side == s2.side
+ assert s1.price == s2.price
+
+
+class TestGridReset:
+ def test_reset_produces_same_signals(self):
+ strategy = GridStrategy()
+ strategy.configure({"lower_price": 90, "upper_price": 110, "grid_count": 5})
+ candles = _make_candles(50)
+ signals1 = _collect_signals(strategy, candles)
+ strategy.reset()
+ signals2 = _collect_signals(strategy, candles)
+ assert len(signals1) == len(signals2)
+ for s1, s2 in zip(signals1, signals2):
+ assert s1.side == s2.side
+ assert s1.price == s2.price
+
+
+class TestMacdReset:
+ def test_reset_produces_same_signals(self):
+ strategy = MacdStrategy()
+ candles = _make_candles(60)
+ signals1 = _collect_signals(strategy, candles)
+ strategy.reset()
+ signals2 = _collect_signals(strategy, candles)
+ assert len(signals1) == len(signals2)
+ for s1, s2 in zip(signals1, signals2):
+ assert s1.side == s2.side
+ assert s1.price == s2.price
+
+
+class TestBollingerReset:
+ def test_reset_produces_same_signals(self):
+ strategy = BollingerStrategy()
+ candles = _make_candles(50)
+ signals1 = _collect_signals(strategy, candles)
+ strategy.reset()
+ signals2 = _collect_signals(strategy, candles)
+ assert len(signals1) == len(signals2)
+ for s1, s2 in zip(signals1, signals2):
+ assert s1.side == s2.side
+ assert s1.price == s2.price
+
+
+class TestEmaCrossoverReset:
+ def test_reset_produces_same_signals(self):
+ strategy = EmaCrossoverStrategy()
+ candles = _make_candles(50)
+ signals1 = _collect_signals(strategy, candles)
+ strategy.reset()
+ signals2 = _collect_signals(strategy, candles)
+ assert len(signals1) == len(signals2)
+ for s1, s2 in zip(signals1, signals2):
+ assert s1.side == s2.side
+ assert s1.price == s2.price
+
+
+class TestVwapReset:
+ def test_reset_produces_same_signals(self):
+ strategy = VwapStrategy()
+ candles = _make_candles(50)
+ signals1 = _collect_signals(strategy, candles)
+ strategy.reset()
+ signals2 = _collect_signals(strategy, candles)
+ assert len(signals1) == len(signals2)
+ for s1, s2 in zip(signals1, signals2):
+ assert s1.side == s2.side
+ assert s1.price == s2.price
+
+
+class TestVolumeProfileReset:
+ def test_reset_produces_same_signals(self):
+ strategy = VolumeProfileStrategy()
+ candles = _make_candles(150)
+ signals1 = _collect_signals(strategy, candles)
+ strategy.reset()
+ signals2 = _collect_signals(strategy, candles)
+ assert len(signals1) == len(signals2)
+ for s1, s2 in zip(signals1, signals2):
+ assert s1.side == s2.side
+ assert s1.price == s2.price
diff --git a/tests/edge_cases/test_zero_volume.py b/tests/edge_cases/test_zero_volume.py
new file mode 100644
index 0000000..0aefa07
--- /dev/null
+++ b/tests/edge_cases/test_zero_volume.py
@@ -0,0 +1,79 @@
+"""Tests for strategies handling zero-volume candles gracefully."""
+
+import sys
+from datetime import datetime, timezone
+from decimal import Decimal
+from pathlib import Path
+
+sys.path.insert(0, str(Path(__file__).resolve().parents[2] / "services" / "strategy-engine"))
+sys.path.insert(0, str(Path(__file__).resolve().parents[2] / "services" / "backtester" / "src"))
+
+from shared.models import Candle
+from strategies.vwap_strategy import VwapStrategy
+from strategies.volume_profile_strategy import VolumeProfileStrategy
+from strategies.rsi_strategy import RsiStrategy
+
+
+def _candle(close: str, volume: str = "0", idx: int = 0) -> Candle:
+ base = datetime(2025, 1, 1, tzinfo=timezone.utc)
+ from datetime import timedelta
+ return Candle(
+ symbol="BTCUSDT",
+ timeframe="1h",
+ open_time=base + timedelta(hours=idx),
+ open=Decimal(close),
+ high=Decimal(close),
+ low=Decimal(close),
+ close=Decimal(close),
+ volume=Decimal(volume),
+ )
+
+
+class TestVwapZeroVolume:
+ """VWAP strategy must not divide by zero when all volume is 0."""
+
+ def test_all_zero_volume_no_crash(self):
+ strategy = VwapStrategy()
+ for i in range(50):
+ result = strategy.on_candle(_candle("100", "0", i))
+ # Should return None (no signal) rather than raising ZeroDivisionError
+ assert result is None
+
+ def test_mixed_zero_and_nonzero_volume(self):
+ strategy = VwapStrategy()
+ # First 30 candles with volume to pass warmup, then zero volume
+ for i in range(30):
+ strategy.on_candle(_candle("100", "10", i))
+ # Now feed zero-volume candles; cumulative_vol > 0 so VWAP still defined
+ result = strategy.on_candle(_candle("100", "0", 30))
+ # Should not crash
+ assert result is None or result is not None # just checking no exception
+
+
+class TestVolumeProfileZeroVolume:
+ """Volume Profile strategy with all-zero volumes should return None."""
+
+ def test_all_zero_volume_returns_none(self):
+ strategy = VolumeProfileStrategy()
+ for i in range(150):
+ result = strategy.on_candle(_candle(str(100 + i % 10), "0", i))
+ # With zero total volume, should return None (no meaningful signal)
+ assert result is None
+
+
+class TestRsiZeroVolume:
+ """RSI strategy should process price correctly regardless of volume."""
+
+ def test_rsi_with_zero_volume_processes_price(self):
+ strategy = RsiStrategy()
+ # Feed descending prices to trigger oversold RSI
+ signals = []
+ for i in range(30):
+ price = str(100 - i)
+ result = strategy.on_candle(_candle(price, "0", i))
+ if result is not None:
+ signals.append(result)
+ # RSI uses only close prices, volume is irrelevant; should work fine
+ # With steadily dropping prices, RSI should go below oversold threshold
+ assert len(signals) >= 1
+ assert signals[0].side.value == "BUY" # oversold signal