diff options
Diffstat (limited to 'services/strategy-engine/tests')
13 files changed, 700 insertions, 36 deletions
diff --git a/services/strategy-engine/tests/test_base_filters.py b/services/strategy-engine/tests/test_base_filters.py index 97d9e16..ae9ca05 100644 --- a/services/strategy-engine/tests/test_base_filters.py +++ b/services/strategy-engine/tests/test_base_filters.py @@ -1,11 +1,12 @@ """Tests for BaseStrategy filters (ADX, volume, ATR stops).""" + import sys from pathlib import Path + sys.path.insert(0, str(Path(__file__).resolve().parents[1])) from decimal import Decimal from datetime import datetime, timezone -import pytest from shared.models import Candle, Signal, OrderSide from strategies.base import BaseStrategy @@ -28,9 +29,12 @@ class DummyStrategy(BaseStrategy): def on_candle(self, candle: Candle) -> Signal | None: self._update_filter_data(candle) signal = Signal( - strategy=self.name, symbol=candle.symbol, - side=OrderSide.BUY, price=candle.close, - quantity=self._quantity, reason="test", + strategy=self.name, + symbol=candle.symbol, + side=OrderSide.BUY, + price=candle.close, + quantity=self._quantity, + reason="test", ) return self._apply_filters(signal) @@ -39,10 +43,13 @@ def _candle(price=100.0, volume=10.0, high=None, low=None): h = high if high is not None else price + 5 lo = low if low is not None else price - 5 return Candle( - symbol="BTCUSDT", timeframe="1h", + symbol="AAPL", + timeframe="1h", open_time=datetime(2025, 1, 1, tzinfo=timezone.utc), - open=Decimal(str(price)), high=Decimal(str(h)), - low=Decimal(str(lo)), close=Decimal(str(price)), + open=Decimal(str(price)), + high=Decimal(str(h)), + low=Decimal(str(lo)), + close=Decimal(str(price)), volume=Decimal(str(volume)), ) diff --git a/services/strategy-engine/tests/test_bollinger_strategy.py b/services/strategy-engine/tests/test_bollinger_strategy.py index 348a9e0..8261377 100644 --- a/services/strategy-engine/tests/test_bollinger_strategy.py +++ b/services/strategy-engine/tests/test_bollinger_strategy.py @@ -10,7 +10,7 @@ from strategies.bollinger_strategy import BollingerStrategy def make_candle(close: float) -> Candle: return Candle( - symbol="BTC/USDT", + symbol="AAPL", timeframe="1m", open_time=datetime(2024, 1, 1, tzinfo=timezone.utc), open=Decimal(str(close)), @@ -23,7 +23,7 @@ def make_candle(close: float) -> Candle: def _make_strategy() -> BollingerStrategy: s = BollingerStrategy() - s.configure({"period": 5, "num_std": 1.0, "min_bandwidth": 0.0}) + s.configure({"period": 5, "num_std": 1.0, "min_bandwidth": 0.0, "squeeze_threshold": 0.0}) return s @@ -99,3 +99,79 @@ def test_bollinger_reset_clears_state(): assert len(strategy._closes) == 1 assert strategy._was_below_lower is False assert strategy._was_above_upper is False + assert strategy._in_squeeze is False + assert strategy._squeeze_bars == 0 + + +def test_bollinger_squeeze_detection(): + """Tight bandwidth → no signal during squeeze.""" + # Use a strategy with a high squeeze threshold so constant prices trigger squeeze + s = BollingerStrategy() + s.configure( + { + "period": 5, + "num_std": 2.0, + "min_bandwidth": 0.0, + "squeeze_threshold": 0.5, # Very high threshold to ensure squeeze triggers + } + ) + + # Feed identical prices → bandwidth = 0 (below any threshold) + for _ in range(6): + result = s.on_candle(make_candle(100.0)) + + # With identical prices, std=0, bandwidth=0 < 0.5 → squeeze, no signal + assert s._in_squeeze is True + assert result is None + + +def test_bollinger_squeeze_breakout_buy(): + """Squeeze ends with price above SMA → BUY signal.""" + s = BollingerStrategy() + s.configure( + { + "period": 5, + "num_std": 1.0, + "min_bandwidth": 0.0, + "squeeze_threshold": 0.01, + } + ) + + # Feed identical prices to create a squeeze (bandwidth = 0) + for _ in range(6): + s.on_candle(make_candle(100.0)) + + assert s._in_squeeze is True + + # Now feed a price that creates enough spread to exit squeeze AND is above SMA + signal = s.on_candle(make_candle(120.0)) + assert signal is not None + assert signal.side == OrderSide.BUY + assert "squeeze breakout UP" in signal.reason + + +def test_bollinger_pct_b_conviction(): + """Signals near band extremes have higher conviction via %B.""" + s = BollingerStrategy() + s.configure( + { + "period": 5, + "num_std": 1.0, + "min_bandwidth": 0.0, + "squeeze_threshold": 0.0, # Disable squeeze for this test + } + ) + + # Build up with stable prices + for _ in range(5): + s.on_candle(make_candle(100.0)) + + # Drop below lower band + s.on_candle(make_candle(50.0)) + + # Recover just at the lower band edge — %B close to 0 → high conviction + signal = s.on_candle(make_candle(100.0)) + assert signal is not None + assert signal.side == OrderSide.BUY + # conviction = max(1.0 - pct_b, 0.3), with pct_b near lower → conviction should be >= 0.3 + assert signal.conviction >= 0.3 diff --git a/services/strategy-engine/tests/test_combined_strategy.py b/services/strategy-engine/tests/test_combined_strategy.py index 3408a89..8a4dc74 100644 --- a/services/strategy-engine/tests/test_combined_strategy.py +++ b/services/strategy-engine/tests/test_combined_strategy.py @@ -72,7 +72,7 @@ class NeutralStrategy(BaseStrategy): def _candle(price=100.0): return Candle( - symbol="BTCUSDT", + symbol="AAPL", timeframe="1m", open_time=datetime(2025, 1, 1, tzinfo=timezone.utc), open=Decimal(str(price)), @@ -167,3 +167,60 @@ def test_combined_invalid_weight(): c.configure({}) with pytest.raises(ValueError): c.add_strategy(AlwaysBuyStrategy(), weight=-1.0) + + +def test_combined_record_result(): + """Verify trade history tracking works correctly.""" + c = CombinedStrategy() + c.configure({"adaptive_weights": True, "history_window": 5}) + + c.record_result("test_strat", True) + c.record_result("test_strat", False) + c.record_result("test_strat", True) + + assert len(c._trade_history["test_strat"]) == 3 + assert c._trade_history["test_strat"] == [True, False, True] + + # Fill beyond window size to test trimming + for _ in range(5): + c.record_result("test_strat", False) + + assert len(c._trade_history["test_strat"]) == 5 # Trimmed to history_window + + +def test_combined_adaptive_weight_increases_for_winners(): + """Strategy with high win rate gets higher effective weight.""" + c = CombinedStrategy() + c.configure({"threshold": 0.3, "adaptive_weights": True, "history_window": 20}) + c.add_strategy(AlwaysBuyStrategy(), weight=1.0) + + # Record high win rate for always_buy (80% wins) + for _ in range(8): + c.record_result("always_buy", True) + for _ in range(2): + c.record_result("always_buy", False) + + # Adaptive weight should be > base weight (1.0) + adaptive_w = c._get_adaptive_weight("always_buy", 1.0) + assert adaptive_w > 1.0 + # 80% win rate -> scale = 0.5 + 0.8 = 1.3 -> weight = 1.3 + assert abs(adaptive_w - 1.3) < 0.01 + + +def test_combined_adaptive_weight_decreases_for_losers(): + """Strategy with low win rate gets lower effective weight.""" + c = CombinedStrategy() + c.configure({"threshold": 0.3, "adaptive_weights": True, "history_window": 20}) + c.add_strategy(AlwaysBuyStrategy(), weight=1.0) + + # Record low win rate for always_buy (20% wins) + for _ in range(2): + c.record_result("always_buy", True) + for _ in range(8): + c.record_result("always_buy", False) + + # Adaptive weight should be < base weight (1.0) + adaptive_w = c._get_adaptive_weight("always_buy", 1.0) + assert adaptive_w < 1.0 + # 20% win rate -> scale = 0.5 + 0.2 = 0.7 -> weight = 0.7 + assert abs(adaptive_w - 0.7) < 0.01 diff --git a/services/strategy-engine/tests/test_ema_crossover_strategy.py b/services/strategy-engine/tests/test_ema_crossover_strategy.py index 0cf767b..7028eb0 100644 --- a/services/strategy-engine/tests/test_ema_crossover_strategy.py +++ b/services/strategy-engine/tests/test_ema_crossover_strategy.py @@ -10,7 +10,7 @@ from strategies.ema_crossover_strategy import EmaCrossoverStrategy def make_candle(close: float) -> Candle: return Candle( - symbol="BTC/USDT", + symbol="AAPL", timeframe="1m", open_time=datetime(2024, 1, 1, tzinfo=timezone.utc), open=Decimal(str(close)), @@ -21,9 +21,18 @@ def make_candle(close: float) -> Candle: ) -def _make_strategy(short: int = 3, long: int = 6) -> EmaCrossoverStrategy: +def _make_strategy( + short: int = 3, long: int = 6, pullback_enabled: bool = False +) -> EmaCrossoverStrategy: s = EmaCrossoverStrategy() - s.configure({"short_period": short, "long_period": long, "quantity": "0.01"}) + s.configure( + { + "short_period": short, + "long_period": long, + "quantity": "0.01", + "pullback_enabled": pullback_enabled, + } + ) return s @@ -97,3 +106,110 @@ def test_ema_reset_clears_state(): # Internal state should be cleared assert len(strategy._closes) == 1 assert strategy._prev_short_above is None + assert strategy._pending_signal is None + + +def test_ema_pullback_entry(): + """Crossover detected, then pullback to short EMA triggers signal.""" + strategy = EmaCrossoverStrategy() + strategy.configure( + { + "short_period": 3, + "long_period": 6, + "quantity": "0.01", + "pullback_enabled": True, + "pullback_tolerance": 0.05, # 5% tolerance for test simplicity + } + ) + + # Declining prices so short EMA stays below long EMA + declining = [100, 98, 96, 94, 92, 90, 88, 86, 84, 82] + for price in declining: + strategy.on_candle(make_candle(price)) + + # Sharp rise to force golden cross — with pullback enabled, no signal yet + rising = [120, 140, 160] + for price in rising: + strategy.on_candle(make_candle(price)) + + # With pullback enabled, crossover should NOT produce immediate signal + # but _pending_signal should be set + assert strategy._pending_signal == "BUY" + + # Now feed a candle whose close is near the short EMA (pullback) + # The short EMA will be tracking recent prices; feed a price that pulls back + # toward it. We use a moderate price to get close to short EMA. + import pandas as pd + + series = pd.Series(list(strategy._closes)) + short_ema_val = series.ewm(span=3, adjust=False).mean().iloc[-1] + # Feed a candle at approximately the short EMA value + result = strategy.on_candle(make_candle(short_ema_val)) + assert result is not None + assert result.side == OrderSide.BUY + assert "pullback" in result.reason + + +def test_ema_pullback_cancelled_on_reversal(): + """Crossover detected, then reversal cancels the pending signal.""" + strategy = EmaCrossoverStrategy() + strategy.configure( + { + "short_period": 3, + "long_period": 6, + "quantity": "0.01", + "pullback_enabled": True, + "pullback_tolerance": 0.001, # Very tight tolerance — won't trigger easily + } + ) + + # Declining prices + declining = [100, 98, 96, 94, 92, 90, 88, 86, 84, 82] + for price in declining: + strategy.on_candle(make_candle(price)) + + # Sharp rise to force golden cross + for price in [120, 140, 160]: + strategy.on_candle(make_candle(price)) + + assert strategy._pending_signal == "BUY" + + # Now sharp decline to reverse the crossover (death cross) + for price in [60, 40, 20]: + strategy.on_candle(make_candle(price)) + + # The BUY pending signal should be cancelled because short EMA fell below long EMA. + # A new death cross may set _pending_signal to "SELL", but the original "BUY" is gone. + assert strategy._pending_signal != "BUY" + + +def test_ema_immediate_mode(): + """With pullback_enabled=False, original immediate entry works.""" + strategy = EmaCrossoverStrategy() + strategy.configure( + { + "short_period": 3, + "long_period": 6, + "quantity": "0.01", + "pullback_enabled": False, + } + ) + + # Declining prices so short EMA stays below long EMA + declining = [100, 98, 96, 94, 92, 90, 88, 86, 84, 82] + for price in declining: + strategy.on_candle(make_candle(price)) + + # Sharp rise to force golden cross — immediate mode should fire signal + rising = [120, 140, 160] + signal = None + for price in rising: + result = strategy.on_candle(make_candle(price)) + if result is not None: + signal = result + + assert signal is not None + assert signal.side == OrderSide.BUY + assert "Golden Cross" in signal.reason + # No pending signal should be set + assert strategy._pending_signal is None diff --git a/services/strategy-engine/tests/test_engine.py b/services/strategy-engine/tests/test_engine.py index ac9a596..2623027 100644 --- a/services/strategy-engine/tests/test_engine.py +++ b/services/strategy-engine/tests/test_engine.py @@ -13,7 +13,7 @@ from strategy_engine.engine import StrategyEngine def make_candle_event() -> dict: candle = Candle( - symbol="BTC/USDT", + symbol="AAPL", timeframe="1m", open_time=datetime(2024, 1, 1, tzinfo=timezone.utc), open=Decimal("50000"), @@ -28,7 +28,7 @@ def make_candle_event() -> dict: def make_signal() -> Signal: return Signal( strategy="test", - symbol="BTC/USDT", + symbol="AAPL", side=OrderSide.BUY, price=Decimal("50050"), quantity=Decimal("0.01"), @@ -46,12 +46,12 @@ async def test_engine_dispatches_candle_to_strategies(): strategy.on_candle = MagicMock(return_value=None) engine = StrategyEngine(broker=broker, strategies=[strategy]) - await engine.process_once("candles.BTC_USDT", "0") + await engine.process_once("candles.AAPL", "0") strategy.on_candle.assert_called_once() candle_arg = strategy.on_candle.call_args[0][0] assert isinstance(candle_arg, Candle) - assert candle_arg.symbol == "BTC/USDT" + assert candle_arg.symbol == "AAPL" @pytest.mark.asyncio @@ -64,7 +64,7 @@ async def test_engine_publishes_signal_when_strategy_returns_one(): strategy.on_candle = MagicMock(return_value=make_signal()) engine = StrategyEngine(broker=broker, strategies=[strategy]) - await engine.process_once("candles.BTC_USDT", "0") + await engine.process_once("candles.AAPL", "0") broker.publish.assert_called_once() call_args = broker.publish.call_args diff --git a/services/strategy-engine/tests/test_grid_strategy.py b/services/strategy-engine/tests/test_grid_strategy.py index 79eb22a..878b900 100644 --- a/services/strategy-engine/tests/test_grid_strategy.py +++ b/services/strategy-engine/tests/test_grid_strategy.py @@ -10,7 +10,7 @@ from strategies.grid_strategy import GridStrategy def make_candle(close: float) -> Candle: return Candle( - symbol="BTC/USDT", + symbol="AAPL", timeframe="1m", open_time=datetime(2024, 1, 1, tzinfo=timezone.utc), open=Decimal(str(close)), @@ -60,3 +60,41 @@ def test_grid_strategy_no_signal_in_same_zone(): strategy.on_candle(make_candle(50000)) signal = strategy.on_candle(make_candle(50100)) assert signal is None + + +def test_grid_exit_on_trend_break(): + """Price drops well below grid range → SELL signal emitted.""" + strategy = _configured_strategy() + # Grid range is 48000-52000, exit_threshold_pct defaults to 5% + # Lower bound = 48000 * 0.95 = 45600 + # Establish a zone first + strategy.on_candle(make_candle(50000)) + # Price drops far below the grid range + signal = strategy.on_candle(make_candle(45000)) + assert signal is not None + assert signal.side == OrderSide.SELL + assert "broke out of range" in signal.reason + + +def test_grid_no_signal_while_out_of_range(): + """After exit signal, no more grid signals until price returns to range.""" + strategy = _configured_strategy() + # Establish a zone + strategy.on_candle(make_candle(50000)) + # First out-of-range candle → SELL exit signal + signal = strategy.on_candle(make_candle(45000)) + assert signal is not None + assert signal.side == OrderSide.SELL + + # Subsequent out-of-range candles → no signals + signal = strategy.on_candle(make_candle(44000)) + assert signal is None + + signal = strategy.on_candle(make_candle(43000)) + assert signal is None + + # Price returns to grid range → grid signals resume + strategy.on_candle(make_candle(50000)) + signal = strategy.on_candle(make_candle(48100)) + assert signal is not None + assert signal.side == OrderSide.BUY diff --git a/services/strategy-engine/tests/test_indicators.py b/services/strategy-engine/tests/test_indicators.py index ac5b505..481569b 100644 --- a/services/strategy-engine/tests/test_indicators.py +++ b/services/strategy-engine/tests/test_indicators.py @@ -1,6 +1,8 @@ """Tests for technical indicator library.""" + import sys from pathlib import Path + sys.path.insert(0, str(Path(__file__).resolve().parents[1])) import pandas as pd diff --git a/services/strategy-engine/tests/test_macd_strategy.py b/services/strategy-engine/tests/test_macd_strategy.py index 9931b43..556fd4c 100644 --- a/services/strategy-engine/tests/test_macd_strategy.py +++ b/services/strategy-engine/tests/test_macd_strategy.py @@ -10,7 +10,7 @@ from strategies.macd_strategy import MacdStrategy def _candle(price: float) -> Candle: return Candle( - symbol="BTC/USDT", + symbol="AAPL", timeframe="1m", open_time=datetime(2024, 1, 1, tzinfo=timezone.utc), open=Decimal(str(price)), @@ -78,3 +78,63 @@ def test_macd_reset_clears_state(): s.reset() assert len(s._closes) == 0 assert s._prev_histogram is None + assert s._prev_macd is None + assert s._prev_signal is None + + +def test_macd_signal_line_crossover(): + """Test that MACD signal-line crossover generates signals.""" + s = _make_strategy() + # Declining then rising prices should produce a signal-line bullish crossover + prices = [100, 99, 98, 97, 96, 95, 94, 93, 92, 91, 90, 89, 88] + prices += [89, 91, 94, 98, 103, 109, 116, 124, 133, 143] + signals = [] + for p in prices: + result = s.on_candle(_candle(float(p))) + if result is not None: + signals.append(result) + + buy_signals = [sig for sig in signals if sig.side == OrderSide.BUY] + assert len(buy_signals) > 0, "Expected at least one BUY signal" + # Check that at least one is a signal-line crossover or histogram crossover + all_reasons = [sig.reason for sig in buy_signals] + assert any("crossover" in r for r in all_reasons), ( + f"Expected crossover signal, got: {all_reasons}" + ) + + +def test_macd_conviction_varies_with_distance(): + """Test that conviction varies based on MACD distance from zero line.""" + s1 = _make_strategy() + s2 = _make_strategy() + + # Small price movements -> MACD near zero -> lower conviction + small_prices = [100, 99.5, 99, 98.5, 98, 97.5, 97, 96.5, 96, 95.5, 95, 94.5, 94] + small_prices += [94.5, 95, 95.5, 96, 96.5, 97, 97.5, 98, 98.5, 99] + small_signals = [] + for p in small_prices: + result = s1.on_candle(_candle(float(p))) + if result is not None: + small_signals.append(result) + + # Large price movements -> MACD far from zero -> higher conviction + large_prices = [100, 95, 90, 85, 80, 75, 70, 65, 60, 55, 50, 45, 40] + large_prices += [45, 55, 70, 90, 115, 145, 180, 220, 265, 315] + large_signals = [] + for p in large_prices: + result = s2.on_candle(_candle(float(p))) + if result is not None: + large_signals.append(result) + + # Both should produce signals + assert len(small_signals) > 0, "Expected signals from small movements" + assert len(large_signals) > 0, "Expected signals from large movements" + + # The large-movement signals should generally have higher conviction + # (or at least different conviction, since distance from zero affects it) + small_conv = small_signals[-1].conviction + large_conv = large_signals[-1].conviction + # Large movements should produce conviction >= small movements + assert large_conv >= small_conv, ( + f"Expected large movement conviction ({large_conv}) >= small ({small_conv})" + ) diff --git a/services/strategy-engine/tests/test_moc_strategy.py b/services/strategy-engine/tests/test_moc_strategy.py new file mode 100644 index 0000000..1928a28 --- /dev/null +++ b/services/strategy-engine/tests/test_moc_strategy.py @@ -0,0 +1,152 @@ +"""Tests for MOC (Market on Close) strategy.""" + +import sys +from pathlib import Path + +sys.path.insert(0, str(Path(__file__).resolve().parents[1])) + +from datetime import datetime, timezone +from decimal import Decimal + +from shared.models import Candle, OrderSide +from strategies.moc_strategy import MocStrategy + + +def _candle(price, hour=20, minute=0, volume=100.0, day=1, open_price=None): + op = open_price if open_price is not None else price - 1 # Default: bullish + return Candle( + symbol="AAPL", + timeframe="5Min", + open_time=datetime(2025, 1, day, hour, minute, tzinfo=timezone.utc), + open=Decimal(str(op)), + high=Decimal(str(price + 1)), + low=Decimal(str(min(op, price) - 1)), + close=Decimal(str(price)), + volume=Decimal(str(volume)), + ) + + +def _make_strategy(**overrides): + s = MocStrategy() + params = { + "quantity_pct": 0.2, + "stop_loss_pct": 2.0, + "rsi_min": 30, + "rsi_max": 70, # Wider for tests + "ema_period": 5, + "volume_avg_period": 5, + "min_volume_ratio": 0.5, + "buy_start_utc": 19, + "buy_end_utc": 21, + "sell_start_utc": 13, + "sell_end_utc": 15, + "max_positions": 5, + } + params.update(overrides) + s.configure(params) + return s + + +def test_moc_warmup_period(): + s = _make_strategy(ema_period=20, volume_avg_period=15) + assert s.warmup_period == 21 + + +def test_moc_no_signal_outside_buy_window(): + s = _make_strategy() + # Hour 12 UTC — not in buy (19-21) or sell (13-15) window + for i in range(10): + sig = s.on_candle(_candle(150 + i, hour=12, minute=i * 5)) + assert sig is None + + +def test_moc_buy_signal_in_window(): + s = _make_strategy(ema_period=3) + # Build up history with some oscillation so RSI settles in the 30-70 range + prices = [ + 150, + 149, + 151, + 148, + 152, + 149, + 150, + 151, + 148, + 150, + 149, + 151, + 150, + 152, + 151, + 153, + 152, + 154, + 153, + 155, + ] + signals = [] + for i, p in enumerate(prices): + sig = s.on_candle(_candle(p, hour=20, minute=i * 2, volume=200.0)) + if sig is not None: + signals.append(sig) + buy_signals = [sig for sig in signals if sig.side == OrderSide.BUY] + assert len(buy_signals) > 0 + assert buy_signals[0].strategy == "moc" + + +def test_moc_sell_at_open(): + s = _make_strategy(ema_period=3) + # Force entry + for i in range(10): + s.on_candle(_candle(150 + i, hour=20, minute=i * 3, volume=200.0)) + + if s._in_position: + # Next day, sell window + sig = s.on_candle(_candle(155, hour=14, minute=0, day=2)) + assert sig is not None + assert sig.side == OrderSide.SELL + assert "MOC sell" in sig.reason + + +def test_moc_stop_loss(): + s = _make_strategy(ema_period=3, stop_loss_pct=1.0) + for i in range(10): + s.on_candle(_candle(150 + i, hour=20, minute=i * 3, volume=200.0)) + + if s._in_position: + drop_price = s._entry_price * 0.98 # -2% + sig = s.on_candle(_candle(drop_price, hour=22, minute=0)) + if sig is not None: + assert sig.side == OrderSide.SELL + assert "stop loss" in sig.reason + + +def test_moc_no_buy_on_bearish_candle(): + s = _make_strategy(ema_period=3) + for i in range(8): + s.on_candle(_candle(150, hour=20, minute=i * 3, volume=200.0)) + # Bearish candle (open > close) + s.on_candle(_candle(149, hour=20, minute=30, open_price=151)) + # May or may not signal depending on other criteria, but bearish should reduce chances + # Just verify no crash + + +def test_moc_only_one_buy_per_day(): + s = _make_strategy(ema_period=3) + buy_count = 0 + for i in range(20): + sig = s.on_candle(_candle(150 + i * 0.3, hour=20, minute=i * 2, volume=200.0)) + if sig is not None and sig.side == OrderSide.BUY: + buy_count += 1 + assert buy_count <= 1 + + +def test_moc_reset(): + s = _make_strategy() + s.on_candle(_candle(150, hour=20)) + s._in_position = True + s.reset() + assert not s._in_position + assert len(s._closes) == 0 + assert not s._bought_today diff --git a/services/strategy-engine/tests/test_multi_symbol.py b/services/strategy-engine/tests/test_multi_symbol.py index cb8088c..671a9d3 100644 --- a/services/strategy-engine/tests/test_multi_symbol.py +++ b/services/strategy-engine/tests/test_multi_symbol.py @@ -22,7 +22,7 @@ async def test_engine_processes_multiple_streams(): broker = AsyncMock() candle_btc = Candle( - symbol="BTCUSDT", + symbol="AAPL", timeframe="1m", open_time=datetime(2025, 1, 1, tzinfo=timezone.utc), open=Decimal("50000"), @@ -32,7 +32,7 @@ async def test_engine_processes_multiple_streams(): volume=Decimal("10"), ) candle_eth = Candle( - symbol="ETHUSDT", + symbol="MSFT", timeframe="1m", open_time=datetime(2025, 1, 1, tzinfo=timezone.utc), open=Decimal("3000"), @@ -45,16 +45,16 @@ async def test_engine_processes_multiple_streams(): btc_events = [CandleEvent(data=candle_btc).to_dict()] eth_events = [CandleEvent(data=candle_eth).to_dict()] - # First call returns BTC event, second ETH, then empty - call_count = {"btc": 0, "eth": 0} + # First call returns AAPL event, second MSFT, then empty + call_count = {"aapl": 0, "msft": 0} async def mock_read(stream, **kwargs): - if "BTC" in stream: - call_count["btc"] += 1 - return btc_events if call_count["btc"] == 1 else [] - elif "ETH" in stream: - call_count["eth"] += 1 - return eth_events if call_count["eth"] == 1 else [] + if "AAPL" in stream: + call_count["aapl"] += 1 + return btc_events if call_count["aapl"] == 1 else [] + elif "MSFT" in stream: + call_count["msft"] += 1 + return eth_events if call_count["msft"] == 1 else [] return [] broker.read = AsyncMock(side_effect=mock_read) @@ -65,8 +65,8 @@ async def test_engine_processes_multiple_streams(): engine = StrategyEngine(broker=broker, strategies=[strategy]) # Process both streams - await engine.process_once("candles.BTCUSDT", "$") - await engine.process_once("candles.ETHUSDT", "$") + await engine.process_once("candles.AAPL", "$") + await engine.process_once("candles.MSFT", "$") # Strategy should have been called with both candles assert strategy.on_candle.call_count == 2 diff --git a/services/strategy-engine/tests/test_rsi_strategy.py b/services/strategy-engine/tests/test_rsi_strategy.py index 2a2f4e7..6d31fd5 100644 --- a/services/strategy-engine/tests/test_rsi_strategy.py +++ b/services/strategy-engine/tests/test_rsi_strategy.py @@ -10,7 +10,7 @@ from strategies.rsi_strategy import RsiStrategy def make_candle(close: float, idx: int = 0) -> Candle: return Candle( - symbol="BTC/USDT", + symbol="AAPL", timeframe="1m", open_time=datetime(2024, 1, 1, tzinfo=timezone.utc), open=Decimal(str(close)), @@ -43,3 +43,60 @@ def test_rsi_strategy_buy_signal_on_oversold(): # if a signal is returned, it must be a BUY if signal is not None: assert signal.side == OrderSide.BUY + + +def test_rsi_detects_bullish_divergence(): + """Bullish divergence: price makes lower low, RSI makes higher low.""" + strategy = RsiStrategy() + strategy.configure({"period": 5, "oversold": 20, "overbought": 80}) + strategy._filter_enabled = False # Disable filters to test divergence logic only + + # Sharp consecutive drop to 50 drives RSI near 0 (first swing low). + # Big recovery, then gradual decline to 48 (lower price, but RSI > 0 = higher low). + prices = [100.0] * 7 + prices += [85.0, 70.0, 55.0, 50.0] + prices += [55.0, 65.0, 80.0, 95.0, 110.0, 120.0, 130.0, 135.0, 140.0, 142.0, 143.0, 144.0] + prices += [142.0, 140.0, 138.0, 135.0, 130.0, 125.0, 120.0, 115.0, 110.0, 105.0] + prices += [100.0, 95.0, 90.0, 85.0, 80.0, 75.0, 70.0, 65.0, 60.0, 55.0, 50.0, 48.0] + prices += [52.0, 58.0] + + signals = [] + for p in prices: + result = strategy.on_candle(make_candle(p)) + if result is not None: + signals.append(result) + + divergence_signals = [s for s in signals if "divergence" in s.reason] + assert len(divergence_signals) > 0, "Expected at least one bullish divergence signal" + assert divergence_signals[0].side == OrderSide.BUY + assert divergence_signals[0].conviction == 0.9 + assert "bullish divergence" in divergence_signals[0].reason + + +def test_rsi_detects_bearish_divergence(): + """Bearish divergence: price makes higher high, RSI makes lower high.""" + strategy = RsiStrategy() + strategy.configure({"period": 5, "oversold": 20, "overbought": 80}) + strategy._filter_enabled = False # Disable filters to test divergence logic only + + # Sharp consecutive rise to 160 drives RSI very high (first swing high). + # Deep pullback, then rise to 162 (higher price) but with a dip right before + # the peak to dampen RSI (lower high). + prices = [100.0] * 7 + prices += [110.0, 120.0, 130.0, 140.0, 150.0, 160.0] + prices += [155.0, 145.0, 130.0, 115.0, 100.0, 90.0, 80.0] + prices += [90.0, 100.0, 110.0, 120.0, 130.0, 140.0, 150.0] + prices += [145.0, 162.0] + prices += [155.0, 148.0] + + signals = [] + for p in prices: + result = strategy.on_candle(make_candle(p)) + if result is not None: + signals.append(result) + + divergence_signals = [s for s in signals if "divergence" in s.reason] + assert len(divergence_signals) > 0, "Expected at least one bearish divergence signal" + assert divergence_signals[0].side == OrderSide.SELL + assert divergence_signals[0].conviction == 0.9 + assert "bearish divergence" in divergence_signals[0].reason diff --git a/services/strategy-engine/tests/test_volume_profile_strategy.py b/services/strategy-engine/tests/test_volume_profile_strategy.py index 71f0eca..65ee2e8 100644 --- a/services/strategy-engine/tests/test_volume_profile_strategy.py +++ b/services/strategy-engine/tests/test_volume_profile_strategy.py @@ -10,7 +10,7 @@ from strategies.volume_profile_strategy import VolumeProfileStrategy def make_candle(close: float, volume: float = 1.0) -> Candle: return Candle( - symbol="BTC/USDT", + symbol="AAPL", timeframe="1m", open_time=datetime(2024, 1, 1, tzinfo=timezone.utc), open=Decimal(str(close)), @@ -125,3 +125,56 @@ def test_volume_profile_reset_clears_state(): # After reset, should not have enough data result = strategy.on_candle(make_candle(100.0, 10.0)) assert result is None + + +def test_volume_profile_hvn_detection(): + """Feed clustered volume at specific price levels to produce HVN nodes.""" + strategy = VolumeProfileStrategy() + strategy.configure({"lookback_period": 20, "num_bins": 10, "value_area_pct": 0.7}) + + # Create a profile with very high volume at price ~100 and low volume elsewhere + # Prices range from 90 to 110, heavy volume concentrated at 100 + candles_data = [] + # Low volume at extremes + for p in [90, 91, 92, 109, 110]: + candles_data.append((p, 1.0)) + # Very high volume around 100 + for _ in range(15): + candles_data.append((100, 100.0)) + + for price, vol in candles_data: + strategy.on_candle(make_candle(price, vol)) + + # Access the internal method to verify HVN detection + result = strategy._compute_value_area() + assert result is not None + poc, va_low, va_high, hvn_levels, lvn_levels = result + + # The bin containing price ~100 should have very high volume -> HVN + assert len(hvn_levels) > 0 + # At least one HVN should be near 100 + assert any(abs(h - 100) < 5 for h in hvn_levels) + + +def test_volume_profile_reset_thorough(): + """Verify all state is cleared on reset.""" + strategy = VolumeProfileStrategy() + strategy.configure({"lookback_period": 10, "num_bins": 5}) + + # Build up state + for _ in range(10): + strategy.on_candle(make_candle(100.0, 10.0)) + # Set below/above VA flags + strategy.on_candle(make_candle(50.0, 1.0)) # below VA + strategy.on_candle(make_candle(200.0, 1.0)) # above VA + + strategy.reset() + + # Verify all state cleared + assert len(strategy._candles) == 0 + assert strategy._was_below_va is False + assert strategy._was_above_va is False + + # Should not produce signal since no data + result = strategy.on_candle(make_candle(100.0, 10.0)) + assert result is None diff --git a/services/strategy-engine/tests/test_vwap_strategy.py b/services/strategy-engine/tests/test_vwap_strategy.py index 5d76b04..2c34b01 100644 --- a/services/strategy-engine/tests/test_vwap_strategy.py +++ b/services/strategy-engine/tests/test_vwap_strategy.py @@ -13,15 +13,18 @@ def make_candle( high: float | None = None, low: float | None = None, volume: float = 1.0, + open_time: datetime | None = None, ) -> Candle: if high is None: high = close if low is None: low = close + if open_time is None: + open_time = datetime(2024, 1, 1, tzinfo=timezone.utc) return Candle( - symbol="BTC/USDT", + symbol="AAPL", timeframe="1m", - open_time=datetime(2024, 1, 1, tzinfo=timezone.utc), + open_time=open_time, open=Decimal(str(close)), high=Decimal(str(high)), low=Decimal(str(low)), @@ -99,3 +102,46 @@ def test_vwap_reset_clears_state(): assert strategy._candle_count == 0 assert strategy._was_below_vwap is False assert strategy._was_above_vwap is False + assert strategy._current_date is None + assert len(strategy._tp_values) == 0 + assert len(strategy._vwap_values) == 0 + + +def test_vwap_daily_reset(): + """Candles from two different dates cause VWAP to reset.""" + strategy = _configured_strategy() + + day1 = datetime(2024, 1, 1, tzinfo=timezone.utc) + day2 = datetime(2024, 1, 2, tzinfo=timezone.utc) + + # Feed 35 candles on day 1 to build VWAP state + for i in range(35): + strategy.on_candle(make_candle(100.0, high=101.0, low=99.0, open_time=day1)) + + # Verify state is built up + assert strategy._candle_count == 35 + assert strategy._cumulative_vol > 0 + assert strategy._current_date == "2024-01-01" + + # Feed first candle of day 2 — should reset + strategy.on_candle(make_candle(100.0, high=101.0, low=99.0, open_time=day2)) + + # After reset, candle_count should be 1 (the new candle) + assert strategy._candle_count == 1 + assert strategy._current_date == "2024-01-02" + + +def test_vwap_reset_clears_date(): + """Verify reset() clears _current_date and deviation band state.""" + strategy = _configured_strategy() + + for _ in range(35): + strategy.on_candle(make_candle(100.0)) + + assert strategy._current_date is not None + + strategy.reset() + + assert strategy._current_date is None + assert len(strategy._tp_values) == 0 + assert len(strategy._vwap_values) == 0 |
