diff options
| author | TheSiahxyz <164138827+TheSiahxyz@users.noreply.github.com> | 2026-04-02 09:19:31 +0900 |
|---|---|---|
| committer | TheSiahxyz <164138827+TheSiahxyz@users.noreply.github.com> | 2026-04-02 09:19:31 +0900 |
| commit | 3a256abb8c04ef07f125b0fb41f8f9090d97b136 (patch) | |
| tree | 4ae95445bff10b2e74b589fd55a0015c44d66cb5 | |
| parent | da6c9598f92057e2fcbb206aa7466b6997a455f3 (diff) | |
feat(strategy): add RSI divergence detection and MACD signal-line crossover
4 files changed, 245 insertions, 6 deletions
diff --git a/services/strategy-engine/strategies/macd_strategy.py b/services/strategy-engine/strategies/macd_strategy.py index 67c5e44..4ce0737 100644 --- a/services/strategy-engine/strategies/macd_strategy.py +++ b/services/strategy-engine/strategies/macd_strategy.py @@ -18,6 +18,8 @@ class MacdStrategy(BaseStrategy): self._quantity: Decimal = Decimal("0.01") self._closes: deque[float] = deque(maxlen=500) self._prev_histogram: float | None = None + self._prev_macd: float | None = None + self._prev_signal: float | None = None @property def warmup_period(self) -> int: @@ -54,6 +56,8 @@ class MacdStrategy(BaseStrategy): def reset(self) -> None: self._closes.clear() self._prev_histogram = None + self._prev_macd = None + self._prev_signal = None def _macd_conviction(self, histogram_value: float, price: float) -> float: """Map histogram magnitude to conviction (0.1-1.0). @@ -81,13 +85,45 @@ class MacdStrategy(BaseStrategy): histogram = macd_line - signal_line current_histogram = float(histogram.iloc[-1]) - signal = None + macd_val = float(macd_line.iloc[-1]) + signal_val = float(signal_line.iloc[-1]) + result_signal = None + + # Signal-line crossover detection (MACD crosses signal line directly) + if self._prev_macd is not None and self._prev_signal is not None: + # Bullish: MACD crosses above signal + if self._prev_macd <= self._prev_signal and macd_val > signal_val: + distance_from_zero = abs(macd_val) / float(candle.close) * 1000 + conv = min(max(distance_from_zero, 0.3), 1.0) + result_signal = Signal( + strategy=self.name, + symbol=candle.symbol, + side=OrderSide.BUY, + price=candle.close, + quantity=self._quantity, + conviction=conv, + reason=f"MACD signal-line bullish crossover", + ) + # Bearish: MACD crosses below signal + elif self._prev_macd >= self._prev_signal and macd_val < signal_val: + distance_from_zero = abs(macd_val) / float(candle.close) * 1000 + conv = min(max(distance_from_zero, 0.3), 1.0) + result_signal = Signal( + strategy=self.name, + symbol=candle.symbol, + side=OrderSide.SELL, + price=candle.close, + quantity=self._quantity, + conviction=conv, + reason=f"MACD signal-line bearish crossover", + ) - if self._prev_histogram is not None: + # Histogram crossover detection (existing logic, as secondary signal) + if result_signal is None and self._prev_histogram is not None: conviction = self._macd_conviction(current_histogram, float(candle.close)) # Bullish crossover: histogram crosses from negative to positive if self._prev_histogram <= 0 and current_histogram > 0: - signal = Signal( + result_signal = Signal( strategy=self.name, symbol=candle.symbol, side=OrderSide.BUY, @@ -98,7 +134,7 @@ class MacdStrategy(BaseStrategy): ) # Bearish crossover: histogram crosses from positive to negative elif self._prev_histogram >= 0 and current_histogram < 0: - signal = Signal( + result_signal = Signal( strategy=self.name, symbol=candle.symbol, side=OrderSide.SELL, @@ -109,6 +145,8 @@ class MacdStrategy(BaseStrategy): ) self._prev_histogram = current_histogram - if signal is not None: - return self._apply_filters(signal) + self._prev_macd = macd_val + self._prev_signal = signal_val + if result_signal is not None: + return self._apply_filters(result_signal) return None diff --git a/services/strategy-engine/strategies/rsi_strategy.py b/services/strategy-engine/strategies/rsi_strategy.py index 0ec6780..0646d8c 100644 --- a/services/strategy-engine/strategies/rsi_strategy.py +++ b/services/strategy-engine/strategies/rsi_strategy.py @@ -34,6 +34,14 @@ class RsiStrategy(BaseStrategy): self._oversold: float = 30.0 self._overbought: float = 70.0 self._quantity: Decimal = Decimal("0.01") + # Divergence detection state + self._price_lows: deque[float] = deque(maxlen=5) + self._price_highs: deque[float] = deque(maxlen=5) + self._rsi_at_lows: deque[float] = deque(maxlen=5) + self._rsi_at_highs: deque[float] = deque(maxlen=5) + self._prev_close: float | None = None + self._prev_prev_close: float | None = None + self._prev_rsi: float | None = None @property def warmup_period(self) -> int: @@ -65,6 +73,13 @@ class RsiStrategy(BaseStrategy): def reset(self) -> None: self._closes.clear() + self._price_lows.clear() + self._price_highs.clear() + self._rsi_at_lows.clear() + self._rsi_at_highs.clear() + self._prev_close = None + self._prev_prev_close = None + self._prev_rsi = None def _rsi_conviction(self, rsi_value: float) -> float: """Map RSI value to conviction strength (0.0-1.0). @@ -86,14 +101,76 @@ class RsiStrategy(BaseStrategy): self._closes.append(float(candle.close)) if len(self._closes) < self._period + 1: + self._prev_prev_close = self._prev_close + self._prev_close = float(candle.close) return None series = pd.Series(list(self._closes)) rsi_value = _compute_rsi(series, self._period) if rsi_value is None: + self._prev_prev_close = self._prev_close + self._prev_close = float(candle.close) return None + close = float(candle.close) + + # Detect swing points for divergence + if self._prev_close is not None and self._prev_prev_close is not None: + # Swing low: prev_close < both neighbors + if self._prev_close < self._prev_prev_close and self._prev_close < close: + self._price_lows.append(self._prev_close) + self._rsi_at_lows.append( + self._prev_rsi if self._prev_rsi is not None else rsi_value + ) + # Swing high: prev_close > both neighbors + if self._prev_close > self._prev_prev_close and self._prev_close > close: + self._price_highs.append(self._prev_close) + self._rsi_at_highs.append( + self._prev_rsi if self._prev_rsi is not None else rsi_value + ) + + # Check bullish divergence: price lower low, RSI higher low + if len(self._price_lows) >= 2: + if ( + self._price_lows[-1] < self._price_lows[-2] + and self._rsi_at_lows[-1] > self._rsi_at_lows[-2] + ): + signal = Signal( + strategy=self.name, + symbol=candle.symbol, + side=OrderSide.BUY, + price=candle.close, + quantity=self._quantity, + conviction=0.9, + reason="RSI bullish divergence", + ) + self._prev_rsi = rsi_value + self._prev_prev_close = self._prev_close + self._prev_close = close + return self._apply_filters(signal) + + # Check bearish divergence: price higher high, RSI lower high + if len(self._price_highs) >= 2: + if ( + self._price_highs[-1] > self._price_highs[-2] + and self._rsi_at_highs[-1] < self._rsi_at_highs[-2] + ): + signal = Signal( + strategy=self.name, + symbol=candle.symbol, + side=OrderSide.SELL, + price=candle.close, + quantity=self._quantity, + conviction=0.9, + reason="RSI bearish divergence", + ) + self._prev_rsi = rsi_value + self._prev_prev_close = self._prev_close + self._prev_close = close + return self._apply_filters(signal) + + # Existing oversold/overbought logic (secondary signals) if rsi_value < self._oversold: signal = Signal( strategy=self.name, @@ -104,6 +181,9 @@ class RsiStrategy(BaseStrategy): conviction=self._rsi_conviction(rsi_value), reason=f"RSI {rsi_value:.2f} below oversold threshold {self._oversold}", ) + self._prev_rsi = rsi_value + self._prev_prev_close = self._prev_close + self._prev_close = close return self._apply_filters(signal) elif rsi_value > self._overbought: signal = Signal( @@ -115,6 +195,12 @@ class RsiStrategy(BaseStrategy): conviction=self._rsi_conviction(rsi_value), reason=f"RSI {rsi_value:.2f} above overbought threshold {self._overbought}", ) + self._prev_rsi = rsi_value + self._prev_prev_close = self._prev_close + self._prev_close = close return self._apply_filters(signal) + self._prev_rsi = rsi_value + self._prev_prev_close = self._prev_close + self._prev_close = close return None diff --git a/services/strategy-engine/tests/test_macd_strategy.py b/services/strategy-engine/tests/test_macd_strategy.py index 9931b43..cd24ee0 100644 --- a/services/strategy-engine/tests/test_macd_strategy.py +++ b/services/strategy-engine/tests/test_macd_strategy.py @@ -78,3 +78,61 @@ def test_macd_reset_clears_state(): s.reset() assert len(s._closes) == 0 assert s._prev_histogram is None + assert s._prev_macd is None + assert s._prev_signal is None + + +def test_macd_signal_line_crossover(): + """Test that MACD signal-line crossover generates signals.""" + s = _make_strategy() + # Declining then rising prices should produce a signal-line bullish crossover + prices = [100, 99, 98, 97, 96, 95, 94, 93, 92, 91, 90, 89, 88] + prices += [89, 91, 94, 98, 103, 109, 116, 124, 133, 143] + signals = [] + for p in prices: + result = s.on_candle(_candle(float(p))) + if result is not None: + signals.append(result) + + buy_signals = [sig for sig in signals if sig.side == OrderSide.BUY] + assert len(buy_signals) > 0, "Expected at least one BUY signal" + # Check that at least one is a signal-line crossover or histogram crossover + all_reasons = [sig.reason for sig in buy_signals] + assert any("crossover" in r for r in all_reasons), f"Expected crossover signal, got: {all_reasons}" + + +def test_macd_conviction_varies_with_distance(): + """Test that conviction varies based on MACD distance from zero line.""" + s1 = _make_strategy() + s2 = _make_strategy() + + # Small price movements -> MACD near zero -> lower conviction + small_prices = [100, 99.5, 99, 98.5, 98, 97.5, 97, 96.5, 96, 95.5, 95, 94.5, 94] + small_prices += [94.5, 95, 95.5, 96, 96.5, 97, 97.5, 98, 98.5, 99] + small_signals = [] + for p in small_prices: + result = s1.on_candle(_candle(float(p))) + if result is not None: + small_signals.append(result) + + # Large price movements -> MACD far from zero -> higher conviction + large_prices = [100, 95, 90, 85, 80, 75, 70, 65, 60, 55, 50, 45, 40] + large_prices += [45, 55, 70, 90, 115, 145, 180, 220, 265, 315] + large_signals = [] + for p in large_prices: + result = s2.on_candle(_candle(float(p))) + if result is not None: + large_signals.append(result) + + # Both should produce signals + assert len(small_signals) > 0, "Expected signals from small movements" + assert len(large_signals) > 0, "Expected signals from large movements" + + # The large-movement signals should generally have higher conviction + # (or at least different conviction, since distance from zero affects it) + small_conv = small_signals[-1].conviction + large_conv = large_signals[-1].conviction + # Large movements should produce conviction >= small movements + assert large_conv >= small_conv, ( + f"Expected large movement conviction ({large_conv}) >= small ({small_conv})" + ) diff --git a/services/strategy-engine/tests/test_rsi_strategy.py b/services/strategy-engine/tests/test_rsi_strategy.py index 2a2f4e7..b2aecc9 100644 --- a/services/strategy-engine/tests/test_rsi_strategy.py +++ b/services/strategy-engine/tests/test_rsi_strategy.py @@ -43,3 +43,60 @@ def test_rsi_strategy_buy_signal_on_oversold(): # if a signal is returned, it must be a BUY if signal is not None: assert signal.side == OrderSide.BUY + + +def test_rsi_detects_bullish_divergence(): + """Bullish divergence: price makes lower low, RSI makes higher low.""" + strategy = RsiStrategy() + strategy.configure({"period": 5, "oversold": 20, "overbought": 80}) + strategy._filter_enabled = False # Disable filters to test divergence logic only + + # Sharp consecutive drop to 50 drives RSI near 0 (first swing low). + # Big recovery, then gradual decline to 48 (lower price, but RSI > 0 = higher low). + prices = [100.0] * 7 + prices += [85.0, 70.0, 55.0, 50.0] + prices += [55.0, 65.0, 80.0, 95.0, 110.0, 120.0, 130.0, 135.0, 140.0, 142.0, 143.0, 144.0] + prices += [142.0, 140.0, 138.0, 135.0, 130.0, 125.0, 120.0, 115.0, 110.0, 105.0] + prices += [100.0, 95.0, 90.0, 85.0, 80.0, 75.0, 70.0, 65.0, 60.0, 55.0, 50.0, 48.0] + prices += [52.0, 58.0] + + signals = [] + for p in prices: + result = strategy.on_candle(make_candle(p)) + if result is not None: + signals.append(result) + + divergence_signals = [s for s in signals if "divergence" in s.reason] + assert len(divergence_signals) > 0, "Expected at least one bullish divergence signal" + assert divergence_signals[0].side == OrderSide.BUY + assert divergence_signals[0].conviction == 0.9 + assert "bullish divergence" in divergence_signals[0].reason + + +def test_rsi_detects_bearish_divergence(): + """Bearish divergence: price makes higher high, RSI makes lower high.""" + strategy = RsiStrategy() + strategy.configure({"period": 5, "oversold": 20, "overbought": 80}) + strategy._filter_enabled = False # Disable filters to test divergence logic only + + # Sharp consecutive rise to 160 drives RSI very high (first swing high). + # Deep pullback, then rise to 162 (higher price) but with a dip right before + # the peak to dampen RSI (lower high). + prices = [100.0] * 7 + prices += [110.0, 120.0, 130.0, 140.0, 150.0, 160.0] + prices += [155.0, 145.0, 130.0, 115.0, 100.0, 90.0, 80.0] + prices += [90.0, 100.0, 110.0, 120.0, 130.0, 140.0, 150.0] + prices += [145.0, 162.0] + prices += [155.0, 148.0] + + signals = [] + for p in prices: + result = strategy.on_candle(make_candle(p)) + if result is not None: + signals.append(result) + + divergence_signals = [s for s in signals if "divergence" in s.reason] + assert len(divergence_signals) > 0, "Expected at least one bearish divergence signal" + assert divergence_signals[0].side == OrderSide.SELL + assert divergence_signals[0].conviction == 0.9 + assert "bearish divergence" in divergence_signals[0].reason |
