diff options
Diffstat (limited to 'services/strategy-engine/strategies')
| -rw-r--r-- | services/strategy-engine/strategies/config/volume_profile_strategy.yaml | 4 | ||||
| -rw-r--r-- | services/strategy-engine/strategies/volume_profile_strategy.py | 135 |
2 files changed, 139 insertions, 0 deletions
diff --git a/services/strategy-engine/strategies/config/volume_profile_strategy.yaml b/services/strategy-engine/strategies/config/volume_profile_strategy.yaml new file mode 100644 index 0000000..635b000 --- /dev/null +++ b/services/strategy-engine/strategies/config/volume_profile_strategy.yaml @@ -0,0 +1,4 @@ +lookback_period: 100 +num_bins: 50 +value_area_pct: 0.7 +quantity: "0.01" diff --git a/services/strategy-engine/strategies/volume_profile_strategy.py b/services/strategy-engine/strategies/volume_profile_strategy.py new file mode 100644 index 0000000..684c33c --- /dev/null +++ b/services/strategy-engine/strategies/volume_profile_strategy.py @@ -0,0 +1,135 @@ +from collections import deque +from decimal import Decimal + +import numpy as np + +from shared.models import Candle, Signal, OrderSide +from strategies.base import BaseStrategy + + +class VolumeProfileStrategy(BaseStrategy): + name: str = "volume_profile" + + def __init__(self) -> None: + self._lookback_period: int = 100 + self._num_bins: int = 50 + self._value_area_pct: float = 0.7 + self._quantity: Decimal = Decimal("0.01") + self._candles: deque[tuple[float, float]] = deque(maxlen=500) + self._was_below_va: bool = False + self._was_above_va: bool = False + + @property + def warmup_period(self) -> int: + return self._lookback_period + + def configure(self, params: dict) -> None: + self._lookback_period = int(params.get("lookback_period", 100)) + self._num_bins = int(params.get("num_bins", 50)) + self._value_area_pct = float(params.get("value_area_pct", 0.7)) + self._quantity = Decimal(str(params.get("quantity", "0.01"))) + + def reset(self) -> None: + self._candles.clear() + self._was_below_va = False + self._was_above_va = False + + def _compute_value_area(self) -> tuple[float, float, float] | None: + data = list(self._candles) + if len(data) < self._lookback_period: + return None + + recent = data[-self._lookback_period:] + prices = np.array([c[0] for c in recent]) + volumes = np.array([c[1] for c in recent]) + + min_price = prices.min() + max_price = prices.max() + if min_price == max_price: + return (float(min_price), float(min_price), float(max_price)) + + bin_edges = np.linspace(min_price, max_price, self._num_bins + 1) + vol_profile = np.zeros(self._num_bins) + + for price, volume in recent: + idx = int((price - min_price) / (max_price - min_price) * self._num_bins) + idx = min(idx, self._num_bins - 1) + vol_profile[idx] += volume + + # POC: bin with max volume + poc_idx = int(np.argmax(vol_profile)) + poc = float((bin_edges[poc_idx] + bin_edges[poc_idx + 1]) / 2) + + # Value Area: expand from POC outward + total_volume = vol_profile.sum() + if total_volume == 0: + return (poc, float(bin_edges[0]), float(bin_edges[-1])) + + target_volume = self._value_area_pct * total_volume + accumulated = vol_profile[poc_idx] + low_idx = poc_idx + high_idx = poc_idx + + while accumulated < target_volume: + expand_low = low_idx > 0 + expand_high = high_idx < self._num_bins - 1 + + if not expand_low and not expand_high: + break + + low_vol = vol_profile[low_idx - 1] if expand_low else -1.0 + high_vol = vol_profile[high_idx + 1] if expand_high else -1.0 + + if low_vol >= high_vol: + low_idx -= 1 + accumulated += vol_profile[low_idx] + else: + high_idx += 1 + accumulated += vol_profile[high_idx] + + va_low = float(bin_edges[low_idx]) + va_high = float(bin_edges[high_idx + 1]) + + return (poc, va_low, va_high) + + def on_candle(self, candle: Candle) -> Signal | None: + close = float(candle.close) + volume = float(candle.volume) + self._candles.append((close, volume)) + + result = self._compute_value_area() + if result is None: + return None + + poc, va_low, va_high = result + + if close < va_low: + self._was_below_va = True + if close > va_high: + self._was_above_va = True + + # BUY: was below VA, price bounces back between va_low and poc + if self._was_below_va and va_low <= close <= poc: + self._was_below_va = False + return Signal( + strategy=self.name, + symbol=candle.symbol, + side=OrderSide.BUY, + price=candle.close, + quantity=self._quantity, + reason=f"Price bounced from below VA low {va_low:.2f} to {close:.2f} (POC {poc:.2f})", + ) + + # SELL: was above VA, price pulls back between poc and va_high + if self._was_above_va and poc <= close <= va_high: + self._was_above_va = False + return Signal( + strategy=self.name, + symbol=candle.symbol, + side=OrderSide.SELL, + price=candle.close, + quantity=self._quantity, + reason=f"Price rejected from above VA high {va_high:.2f} to {close:.2f} (POC {poc:.2f})", + ) + + return None |
