diff options
| author | TheSiahxyz <164138827+TheSiahxyz@users.noreply.github.com> | 2026-04-02 09:17:54 +0900 |
|---|---|---|
| committer | TheSiahxyz <164138827+TheSiahxyz@users.noreply.github.com> | 2026-04-02 09:17:54 +0900 |
| commit | 828682de5904c8c1d05664a961f7931ebe60fabd (patch) | |
| tree | e4a5075c7a5881406785b95f6d1912399332419a /services/strategy-engine/strategies | |
| parent | 71e5942632a5a8c7cd555b2d52e5632a67186a8d (diff) | |
feat(strategy): add Volume Profile HVN/LVN and Combined adaptive weighting
Diffstat (limited to 'services/strategy-engine/strategies')
| -rw-r--r-- | services/strategy-engine/strategies/combined_strategy.py | 39 | ||||
| -rw-r--r-- | services/strategy-engine/strategies/volume_profile_strategy.py | 52 |
2 files changed, 81 insertions, 10 deletions
diff --git a/services/strategy-engine/strategies/combined_strategy.py b/services/strategy-engine/strategies/combined_strategy.py index be1cbed..907d9c5 100644 --- a/services/strategy-engine/strategies/combined_strategy.py +++ b/services/strategy-engine/strategies/combined_strategy.py @@ -20,6 +20,9 @@ class CombinedStrategy(BaseStrategy): self._strategies: list[tuple[BaseStrategy, float]] = [] # (strategy, weight) self._threshold: float = 0.5 self._quantity: Decimal = Decimal("0.01") + self._trade_history: dict[str, list[bool]] = {} # strategy_name -> [win, loss, ...] + self._adaptive_weights: bool = False + self._history_window: int = 20 # Last N signals to evaluate @property def warmup_period(self) -> int: @@ -30,6 +33,8 @@ class CombinedStrategy(BaseStrategy): def configure(self, params: dict) -> None: self._threshold = float(params.get("threshold", 0.5)) self._quantity = Decimal(str(params.get("quantity", "0.01"))) + self._adaptive_weights = bool(params.get("adaptive_weights", False)) + self._history_window = int(params.get("history_window", 20)) if self._threshold <= 0: raise ValueError(f"Threshold must be positive, got {self._threshold}") if self._quantity <= 0: @@ -41,6 +46,29 @@ class CombinedStrategy(BaseStrategy): raise ValueError(f"Weight must be positive, got {weight}") self._strategies.append((strategy, weight)) + def record_result(self, strategy_name: str, is_win: bool) -> None: + """Record a trade result for adaptive weighting.""" + if strategy_name not in self._trade_history: + self._trade_history[strategy_name] = [] + self._trade_history[strategy_name].append(is_win) + # Keep only last N results + if len(self._trade_history[strategy_name]) > self._history_window: + self._trade_history[strategy_name] = self._trade_history[strategy_name][-self._history_window:] + + def _get_adaptive_weight(self, strategy_name: str, base_weight: float) -> float: + """Get weight adjusted by recent performance.""" + if not self._adaptive_weights: + return base_weight + + history = self._trade_history.get(strategy_name, []) + if len(history) < 5: # Not enough data, use base weight + return base_weight + + win_rate = sum(1 for w in history if w) / len(history) + # Scale weight: 0.5x at 20% win rate, 1.0x at 50%, 1.5x at 80% + scale = 0.5 + win_rate # Range: 0.5 to 1.5 + return base_weight * scale + def reset(self) -> None: for strategy, _ in self._strategies: strategy.reset() @@ -49,7 +77,7 @@ class CombinedStrategy(BaseStrategy): if not self._strategies: return None - total_weight = sum(w for _, w in self._strategies) + total_weight = sum(self._get_adaptive_weight(s.name, w) for s, w in self._strategies) if total_weight == 0: return None @@ -59,12 +87,13 @@ class CombinedStrategy(BaseStrategy): for strategy, weight in self._strategies: signal = strategy.on_candle(candle) if signal is not None: + effective_weight = self._get_adaptive_weight(strategy.name, weight) if signal.side == OrderSide.BUY: - score += weight * signal.conviction - reasons.append(f"{strategy.name}:BUY({weight}*{signal.conviction:.2f})") + score += effective_weight * signal.conviction + reasons.append(f"{strategy.name}:BUY({effective_weight}*{signal.conviction:.2f})") elif signal.side == OrderSide.SELL: - score -= weight * signal.conviction - reasons.append(f"{strategy.name}:SELL({weight}*{signal.conviction:.2f})") + score -= effective_weight * signal.conviction + reasons.append(f"{strategy.name}:SELL({effective_weight}*{signal.conviction:.2f})") normalized = score / total_weight # Range: -1.0 to 1.0 diff --git a/services/strategy-engine/strategies/volume_profile_strategy.py b/services/strategy-engine/strategies/volume_profile_strategy.py index 324f1c2..ef2ae14 100644 --- a/services/strategy-engine/strategies/volume_profile_strategy.py +++ b/services/strategy-engine/strategies/volume_profile_strategy.py @@ -56,7 +56,8 @@ class VolumeProfileStrategy(BaseStrategy): self._was_below_va = False self._was_above_va = False - def _compute_value_area(self) -> tuple[float, float, float] | None: + def _compute_value_area(self) -> tuple[float, float, float, list[float], list[float]] | None: + """Compute POC, VA low, VA high, HVN levels, LVN levels.""" data = list(self._candles) if len(data) < self._lookback_period: return None @@ -67,7 +68,7 @@ class VolumeProfileStrategy(BaseStrategy): min_price = prices.min() max_price = prices.max() if min_price == max_price: - return (float(min_price), float(min_price), float(max_price)) + return (float(min_price), float(min_price), float(max_price), [], []) bin_edges = np.linspace(min_price, max_price, self._num_bins + 1) vol_profile = np.zeros(self._num_bins) @@ -84,7 +85,7 @@ class VolumeProfileStrategy(BaseStrategy): # Value Area: expand from POC outward total_volume = vol_profile.sum() if total_volume == 0: - return (poc, float(bin_edges[0]), float(bin_edges[-1])) + return (poc, float(bin_edges[0]), float(bin_edges[-1]), [], []) target_volume = self._value_area_pct * total_volume accumulated = vol_profile[poc_idx] @@ -111,7 +112,20 @@ class VolumeProfileStrategy(BaseStrategy): va_low = float(bin_edges[low_idx]) va_high = float(bin_edges[high_idx + 1]) - return (poc, va_low, va_high) + # HVN/LVN detection + mean_vol = vol_profile.mean() + std_vol = vol_profile.std() + + hvn_levels: list[float] = [] + lvn_levels: list[float] = [] + for i in range(len(vol_profile)): + mid = float((bin_edges[i] + bin_edges[i + 1]) / 2) + if vol_profile[i] > mean_vol + std_vol: + hvn_levels.append(mid) + elif vol_profile[i] < mean_vol - 0.5 * std_vol and vol_profile[i] > 0: + lvn_levels.append(mid) + + return (poc, va_low, va_high, hvn_levels, lvn_levels) def on_candle(self, candle: Candle) -> Signal | None: self._update_filter_data(candle) @@ -123,13 +137,41 @@ class VolumeProfileStrategy(BaseStrategy): if result is None: return None - poc, va_low, va_high = result + poc, va_low, va_high, hvn_levels, lvn_levels = result if close < va_low: self._was_below_va = True if close > va_high: self._was_above_va = True + # HVN bounce signals (stronger than regular VA bounces) + for hvn in hvn_levels: + if abs(close - hvn) / hvn < 0.005: # Within 0.5% of HVN + if self._was_below_va and close >= va_low: + self._was_below_va = False + signal = Signal( + strategy=self.name, + symbol=candle.symbol, + side=OrderSide.BUY, + price=candle.close, + quantity=self._quantity, + conviction=0.85, + reason=f"Price near HVN {hvn:.2f}, bounced from below VA low {va_low:.2f} to {close:.2f}", + ) + return self._apply_filters(signal) + if self._was_above_va and close <= va_high: + self._was_above_va = False + signal = Signal( + strategy=self.name, + symbol=candle.symbol, + side=OrderSide.SELL, + price=candle.close, + quantity=self._quantity, + conviction=0.85, + reason=f"Price near HVN {hvn:.2f}, rejected from above VA high {va_high:.2f} to {close:.2f}", + ) + return self._apply_filters(signal) + # BUY: was below VA, price bounces back between va_low and poc if self._was_below_va and va_low <= close <= poc: self._was_below_va = False |
