from collections import deque from decimal import Decimal import numpy as np from shared.models import Candle, OrderSide, Signal from strategies.base import BaseStrategy class VolumeProfileStrategy(BaseStrategy): name: str = "volume_profile" def __init__(self) -> None: super().__init__() self._lookback_period: int = 100 self._num_bins: int = 50 self._value_area_pct: float = 0.7 self._quantity: Decimal = Decimal("0.01") self._candles: deque[tuple[float, float]] = deque(maxlen=500) self._was_below_va: bool = False self._was_above_va: bool = False @property def warmup_period(self) -> int: return self._lookback_period def configure(self, params: dict) -> None: self._lookback_period = int(params.get("lookback_period", 100)) self._num_bins = int(params.get("num_bins", 50)) self._value_area_pct = float(params.get("value_area_pct", 0.7)) self._quantity = Decimal(str(params.get("quantity", "0.01"))) if self._lookback_period < 2: raise ValueError( f"Volume profile lookback_period must be >= 2, got {self._lookback_period}" ) if self._num_bins < 2: raise ValueError(f"Volume profile num_bins must be >= 2, got {self._num_bins}") if not (0 < self._value_area_pct <= 1): raise ValueError( f"Volume profile value_area_pct must be 0 < pct <= 1, got {self._value_area_pct}" ) if self._quantity <= 0: raise ValueError(f"Quantity must be positive, got {self._quantity}") self._init_filters( require_trend=False, adx_threshold=float(params.get("adx_threshold", 25.0)), min_volume_ratio=float(params.get("min_volume_ratio", 0.5)), atr_stop_multiplier=float(params.get("atr_stop_multiplier", 2.0)), atr_tp_multiplier=float(params.get("atr_tp_multiplier", 3.0)), ) def reset(self) -> None: self._candles.clear() self._was_below_va = False self._was_above_va = False def _compute_value_area(self) -> tuple[float, float, float, list[float], list[float]] | None: """Compute POC, VA low, VA high, HVN levels, LVN levels.""" data = list(self._candles) if len(data) < self._lookback_period: return None recent = data[-self._lookback_period :] prices = np.array([c[0] for c in recent]) min_price = prices.min() max_price = prices.max() if min_price == max_price: return (float(min_price), float(min_price), float(max_price), [], []) bin_edges = np.linspace(min_price, max_price, self._num_bins + 1) vol_profile = np.zeros(self._num_bins) for price, volume in recent: idx = int((price - min_price) / (max_price - min_price) * self._num_bins) idx = min(idx, self._num_bins - 1) vol_profile[idx] += volume # POC: bin with max volume poc_idx = int(np.argmax(vol_profile)) poc = float((bin_edges[poc_idx] + bin_edges[poc_idx + 1]) / 2) # Value Area: expand from POC outward total_volume = vol_profile.sum() if total_volume == 0: return (poc, float(bin_edges[0]), float(bin_edges[-1]), [], []) target_volume = self._value_area_pct * total_volume accumulated = vol_profile[poc_idx] low_idx = poc_idx high_idx = poc_idx while accumulated < target_volume: expand_low = low_idx > 0 expand_high = high_idx < self._num_bins - 1 if not expand_low and not expand_high: break low_vol = vol_profile[low_idx - 1] if expand_low else -1.0 high_vol = vol_profile[high_idx + 1] if expand_high else -1.0 if low_vol >= high_vol: low_idx -= 1 accumulated += vol_profile[low_idx] else: high_idx += 1 accumulated += vol_profile[high_idx] va_low = float(bin_edges[low_idx]) va_high = float(bin_edges[high_idx + 1]) # HVN/LVN detection mean_vol = vol_profile.mean() std_vol = vol_profile.std() hvn_levels: list[float] = [] lvn_levels: list[float] = [] for i in range(len(vol_profile)): mid = float((bin_edges[i] + bin_edges[i + 1]) / 2) if vol_profile[i] > mean_vol + std_vol: hvn_levels.append(mid) elif vol_profile[i] < mean_vol - 0.5 * std_vol and vol_profile[i] > 0: lvn_levels.append(mid) return (poc, va_low, va_high, hvn_levels, lvn_levels) def on_candle(self, candle: Candle) -> Signal | None: self._update_filter_data(candle) close = float(candle.close) volume = float(candle.volume) self._candles.append((close, volume)) result = self._compute_value_area() if result is None: return None poc, va_low, va_high, hvn_levels, _lvn_levels = result if close < va_low: self._was_below_va = True if close > va_high: self._was_above_va = True # HVN bounce signals (stronger than regular VA bounces) for hvn in hvn_levels: if abs(close - hvn) / hvn < 0.005: # Within 0.5% of HVN if self._was_below_va and close >= va_low: self._was_below_va = False signal = Signal( strategy=self.name, symbol=candle.symbol, side=OrderSide.BUY, price=candle.close, quantity=self._quantity, conviction=0.85, reason=f"Price near HVN {hvn:.2f}, bounced from below VA low {va_low:.2f} to {close:.2f}", ) return self._apply_filters(signal) if self._was_above_va and close <= va_high: self._was_above_va = False signal = Signal( strategy=self.name, symbol=candle.symbol, side=OrderSide.SELL, price=candle.close, quantity=self._quantity, conviction=0.85, reason=f"Price near HVN {hvn:.2f}, rejected from above VA high {va_high:.2f} to {close:.2f}", ) return self._apply_filters(signal) # BUY: was below VA, price bounces back between va_low and poc if self._was_below_va and va_low <= close <= poc: self._was_below_va = False signal = Signal( strategy=self.name, symbol=candle.symbol, side=OrderSide.BUY, price=candle.close, quantity=self._quantity, conviction=0.6, reason=f"Price bounced from below VA low {va_low:.2f} to {close:.2f} (POC {poc:.2f})", ) return self._apply_filters(signal) # SELL: was above VA, price pulls back between poc and va_high if self._was_above_va and poc <= close <= va_high: self._was_above_va = False signal = Signal( strategy=self.name, symbol=candle.symbol, side=OrderSide.SELL, price=candle.close, quantity=self._quantity, conviction=0.6, reason=f"Price rejected from above VA high {va_high:.2f} to {close:.2f} (POC {poc:.2f})", ) return self._apply_filters(signal) return None