from collections import deque
from decimal import Decimal

import numpy as np

from shared.models import Candle, Signal, OrderSide
from strategies.base import BaseStrategy

# Minimum number of (close, volume) samples kept in the rolling history.
# The buffer grows beyond this when a larger lookback period is configured.
_MIN_HISTORY = 500


class VolumeProfileStrategy(BaseStrategy):
    """Mean-reversion strategy driven by a rolling volume profile.

    For every candle the strategy bins the last ``lookback_period`` closes
    into ``num_bins`` equal-width price bins weighted by volume, then derives:

    * the point of control (POC): midpoint of the bin with the most volume;
    * the value area: the contiguous bin range around the POC that holds at
      least ``value_area_pct`` of the total volume.

    A BUY is emitted when the close re-enters ``[va_low, poc]`` after having
    closed below the value area; a SELL is emitted when the close re-enters
    ``[poc, va_high]`` after having closed above it.
    """

    name: str = "volume_profile"

    def __init__(self) -> None:
        super().__init__()
        self._lookback_period: int = 100
        self._num_bins: int = 50
        self._value_area_pct: float = 0.7
        self._quantity: Decimal = Decimal("0.01")
        # Rolling history of (close, volume) pairs, oldest first.
        self._candles: deque[tuple[float, float]] = deque(maxlen=_MIN_HISTORY)
        # Latched once a close lands outside the value area; cleared when the
        # matching re-entry signal fires or on reset().
        self._was_below_va: bool = False
        self._was_above_va: bool = False

    @property
    def warmup_period(self) -> int:
        """Number of candles needed before a value area can be computed."""
        return self._lookback_period

    def configure(self, params: dict) -> None:
        """Apply strategy parameters from *params*.

        Recognised keys (all optional): ``lookback_period`` (default 100),
        ``num_bins`` (default 50), ``value_area_pct`` (default 0.7) and
        ``quantity`` (default "0.01").

        All values are parsed and validated before any of them is stored, so
        a rejected configuration leaves the current settings untouched.

        Raises:
            ValueError: if ``lookback_period`` < 2, ``num_bins`` < 2,
                ``value_area_pct`` is outside ``(0, 1]`` or ``quantity`` is
                not positive.
        """
        lookback_period = int(params.get("lookback_period", 100))
        num_bins = int(params.get("num_bins", 50))
        value_area_pct = float(params.get("value_area_pct", 0.7))
        quantity = Decimal(str(params.get("quantity", "0.01")))

        if lookback_period < 2:
            raise ValueError(
                f"Volume profile lookback_period must be >= 2, got {lookback_period}"
            )
        if num_bins < 2:
            raise ValueError(f"Volume profile num_bins must be >= 2, got {num_bins}")
        if not (0 < value_area_pct <= 1):
            raise ValueError(
                f"Volume profile value_area_pct must be 0 < pct <= 1, got {value_area_pct}"
            )
        if quantity <= 0:
            raise ValueError(f"Quantity must be positive, got {quantity}")

        self._lookback_period = lookback_period
        self._num_bins = num_bins
        self._value_area_pct = value_area_pct
        self._quantity = quantity

        # The history buffer must hold at least `lookback_period` samples,
        # otherwise the warm-up check in _compute_value_area can never pass.
        # Rebuilding from the existing deque keeps the most recent samples.
        required = max(_MIN_HISTORY, lookback_period)
        if self._candles.maxlen != required:
            self._candles = deque(self._candles, maxlen=required)

    def reset(self) -> None:
        """Drop the candle history and clear both breakout flags."""
        self._candles.clear()
        self._was_below_va = False
        self._was_above_va = False

    def _compute_value_area(self) -> tuple[float, float, float] | None:
        """Compute ``(poc, va_low, va_high)`` over the lookback window.

        Returns:
            ``None`` while fewer than ``lookback_period`` samples are stored.
            Otherwise a tuple of the point of control and the lower/upper
            price bounds of the value area. When every close in the window
            is identical, all three values equal that price. When the window
            carries no volume, the value area spans the full price range.
        """
        if len(self._candles) < self._lookback_period:
            return None

        recent = list(self._candles)[-self._lookback_period :]
        prices = np.array([close for close, _ in recent])
        min_price = prices.min()
        max_price = prices.max()

        # Degenerate window: no price range to bin over.
        if min_price == max_price:
            return (float(min_price), float(min_price), float(max_price))

        num_bins = self._num_bins
        bin_edges = np.linspace(min_price, max_price, num_bins + 1)
        vol_profile = np.zeros(num_bins)
        price_span = max_price - min_price
        for price, volume in recent:
            idx = int((price - min_price) / price_span * num_bins)
            # The maximum price maps to index `num_bins`; fold it into the
            # last bin so the top edge is inclusive.
            idx = min(idx, num_bins - 1)
            vol_profile[idx] += volume

        # POC: bin with max volume
        poc_idx = int(np.argmax(vol_profile))
        poc = float((bin_edges[poc_idx] + bin_edges[poc_idx + 1]) / 2)

        # Value Area: expand from POC outward
        total_volume = vol_profile.sum()
        if total_volume == 0:
            return (poc, float(bin_edges[0]), float(bin_edges[-1]))

        target_volume = self._value_area_pct * total_volume
        accumulated = vol_profile[poc_idx]
        low_idx = poc_idx
        high_idx = poc_idx
        while accumulated < target_volume:
            expand_low = low_idx > 0
            expand_high = high_idx < num_bins - 1
            if not expand_low and not expand_high:
                break
            # -1.0 marks a side that cannot be extended so it never wins the
            # comparison against a real (non-negative) bin volume.
            low_vol = vol_profile[low_idx - 1] if expand_low else -1.0
            high_vol = vol_profile[high_idx + 1] if expand_high else -1.0
            # Greedily absorb the heavier neighbour; ties go to the low side.
            if low_vol >= high_vol:
                low_idx -= 1
                accumulated += vol_profile[low_idx]
            else:
                high_idx += 1
                accumulated += vol_profile[high_idx]

        va_low = float(bin_edges[low_idx])
        va_high = float(bin_edges[high_idx + 1])
        return (poc, va_low, va_high)

    def on_candle(self, candle: Candle) -> Signal | None:
        """Record *candle* and return a re-entry signal if one is triggered.

        Returns:
            A BUY or SELL ``Signal`` priced at the candle close, or ``None``
            during warm-up and when no re-entry condition is met. When both
            conditions hold on the same candle, BUY takes precedence.
        """
        close = float(candle.close)
        volume = float(candle.volume)
        self._candles.append((close, volume))

        result = self._compute_value_area()
        if result is None:
            return None
        poc, va_low, va_high = result

        if close < va_low:
            self._was_below_va = True
        if close > va_high:
            self._was_above_va = True

        # BUY: was below VA, price bounces back between va_low and poc
        if self._was_below_va and va_low <= close <= poc:
            self._was_below_va = False
            return Signal(
                strategy=self.name,
                symbol=candle.symbol,
                side=OrderSide.BUY,
                price=candle.close,
                quantity=self._quantity,
                reason=f"Price bounced from below VA low {va_low:.2f} to {close:.2f} (POC {poc:.2f})",
            )

        # SELL: was above VA, price pulls back between poc and va_high
        if self._was_above_va and poc <= close <= va_high:
            self._was_above_va = False
            return Signal(
                strategy=self.name,
                symbol=candle.symbol,
                side=OrderSide.SELL,
                price=candle.close,
                quantity=self._quantity,
                reason=f"Price rejected from above VA high {va_high:.2f} to {close:.2f} (POC {poc:.2f})",
            )

        return None