from collections import deque from decimal import Decimal import pandas as pd from shared.models import Candle, Signal, OrderSide from strategies.base import BaseStrategy def _compute_rsi(series: pd.Series, period: int) -> float | None: """Compute RSI using Wilder's smoothing (EMA-based).""" if len(series) < period + 1: return None delta = series.diff() gain = delta.clip(lower=0) loss = -delta.clip(upper=0) avg_gain = gain.ewm(com=period - 1, min_periods=period).mean() avg_loss = loss.ewm(com=period - 1, min_periods=period).mean() rs = avg_gain / avg_loss.replace(0, float("nan")) rsi = 100 - (100 / (1 + rs)) value = rsi.iloc[-1] if pd.isna(value): return None return float(value) class RsiStrategy(BaseStrategy): name: str = "rsi" def __init__(self) -> None: super().__init__() self._closes: deque[float] = deque(maxlen=200) self._period: int = 14 self._oversold: float = 30.0 self._overbought: float = 70.0 self._quantity: Decimal = Decimal("0.01") # Divergence detection state self._price_lows: deque[float] = deque(maxlen=5) self._price_highs: deque[float] = deque(maxlen=5) self._rsi_at_lows: deque[float] = deque(maxlen=5) self._rsi_at_highs: deque[float] = deque(maxlen=5) self._prev_close: float | None = None self._prev_prev_close: float | None = None self._prev_rsi: float | None = None @property def warmup_period(self) -> int: return self._period + 1 def configure(self, params: dict) -> None: self._period = int(params.get("period", 14)) self._oversold = float(params.get("oversold", 30)) self._overbought = float(params.get("overbought", 70)) self._quantity = Decimal(str(params.get("quantity", "0.01"))) if self._period < 2: raise ValueError(f"RSI period must be >= 2, got {self._period}") if not (0 < self._oversold < self._overbought < 100): raise ValueError( f"RSI thresholds must be 0 < oversold < overbought < 100, " f"got oversold={self._oversold}, overbought={self._overbought}" ) if self._quantity <= 0: raise ValueError(f"Quantity must be positive, got {self._quantity}") self._init_filters( require_trend=False, adx_threshold=float(params.get("adx_threshold", 25.0)), min_volume_ratio=float(params.get("min_volume_ratio", 0.5)), atr_stop_multiplier=float(params.get("atr_stop_multiplier", 2.0)), atr_tp_multiplier=float(params.get("atr_tp_multiplier", 3.0)), ) def reset(self) -> None: self._closes.clear() self._price_lows.clear() self._price_highs.clear() self._rsi_at_lows.clear() self._rsi_at_highs.clear() self._prev_close = None self._prev_prev_close = None self._prev_rsi = None def _rsi_conviction(self, rsi_value: float) -> float: """Map RSI value to conviction strength (0.0-1.0). For BUY (oversold): lower RSI = higher conviction. For SELL (overbought): higher RSI = higher conviction. Linear scale from the threshold to the extreme (0 or 100). """ if rsi_value < self._oversold: # RSI 0 -> 1.0, RSI at oversold threshold -> 0.0 return min(1.0, max(0.1, (self._oversold - rsi_value) / self._oversold)) elif rsi_value > self._overbought: # RSI 100 -> 1.0, RSI at overbought threshold -> 0.0 return min(1.0, max(0.1, (rsi_value - self._overbought) / (100.0 - self._overbought))) return 0.0 def on_candle(self, candle: Candle) -> Signal | None: self._update_filter_data(candle) self._closes.append(float(candle.close)) if len(self._closes) < self._period + 1: self._prev_prev_close = self._prev_close self._prev_close = float(candle.close) return None series = pd.Series(list(self._closes)) rsi_value = _compute_rsi(series, self._period) if rsi_value is None: self._prev_prev_close = self._prev_close self._prev_close = float(candle.close) return None close = float(candle.close) # Detect swing points for divergence if self._prev_close is not None and self._prev_prev_close is not None: # Swing low: prev_close < both neighbors if self._prev_close < self._prev_prev_close and self._prev_close < close: self._price_lows.append(self._prev_close) self._rsi_at_lows.append( self._prev_rsi if self._prev_rsi is not None else rsi_value ) # Swing high: prev_close > both neighbors if self._prev_close > self._prev_prev_close and self._prev_close > close: self._price_highs.append(self._prev_close) self._rsi_at_highs.append( self._prev_rsi if self._prev_rsi is not None else rsi_value ) # Check bullish divergence: price lower low, RSI higher low if len(self._price_lows) >= 2: if ( self._price_lows[-1] < self._price_lows[-2] and self._rsi_at_lows[-1] > self._rsi_at_lows[-2] ): signal = Signal( strategy=self.name, symbol=candle.symbol, side=OrderSide.BUY, price=candle.close, quantity=self._quantity, conviction=0.9, reason="RSI bullish divergence", ) self._prev_rsi = rsi_value self._prev_prev_close = self._prev_close self._prev_close = close return self._apply_filters(signal) # Check bearish divergence: price higher high, RSI lower high if len(self._price_highs) >= 2: if ( self._price_highs[-1] > self._price_highs[-2] and self._rsi_at_highs[-1] < self._rsi_at_highs[-2] ): signal = Signal( strategy=self.name, symbol=candle.symbol, side=OrderSide.SELL, price=candle.close, quantity=self._quantity, conviction=0.9, reason="RSI bearish divergence", ) self._prev_rsi = rsi_value self._prev_prev_close = self._prev_close self._prev_close = close return self._apply_filters(signal) # Existing oversold/overbought logic (secondary signals) if rsi_value < self._oversold: signal = Signal( strategy=self.name, symbol=candle.symbol, side=OrderSide.BUY, price=candle.close, quantity=self._quantity, conviction=self._rsi_conviction(rsi_value), reason=f"RSI {rsi_value:.2f} below oversold threshold {self._oversold}", ) self._prev_rsi = rsi_value self._prev_prev_close = self._prev_close self._prev_close = close return self._apply_filters(signal) elif rsi_value > self._overbought: signal = Signal( strategy=self.name, symbol=candle.symbol, side=OrderSide.SELL, price=candle.close, quantity=self._quantity, conviction=self._rsi_conviction(rsi_value), reason=f"RSI {rsi_value:.2f} above overbought threshold {self._overbought}", ) self._prev_rsi = rsi_value self._prev_prev_close = self._prev_close self._prev_close = close return self._apply_filters(signal) self._prev_rsi = rsi_value self._prev_prev_close = self._prev_close self._prev_close = close return None