summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTheSiahxyz <164138827+TheSiahxyz@users.noreply.github.com>2026-04-02 09:17:54 +0900
committerTheSiahxyz <164138827+TheSiahxyz@users.noreply.github.com>2026-04-02 09:17:54 +0900
commit828682de5904c8c1d05664a961f7931ebe60fabd (patch)
treee4a5075c7a5881406785b95f6d1912399332419a
parent71e5942632a5a8c7cd555b2d52e5632a67186a8d (diff)
feat(strategy): add Volume Profile HVN/LVN and Combined adaptive weighting
-rw-r--r--services/strategy-engine/strategies/combined_strategy.py39
-rw-r--r--services/strategy-engine/strategies/volume_profile_strategy.py52
-rw-r--r--services/strategy-engine/tests/test_combined_strategy.py57
-rw-r--r--services/strategy-engine/tests/test_volume_profile_strategy.py53
4 files changed, 191 insertions, 10 deletions
diff --git a/services/strategy-engine/strategies/combined_strategy.py b/services/strategy-engine/strategies/combined_strategy.py
index be1cbed..907d9c5 100644
--- a/services/strategy-engine/strategies/combined_strategy.py
+++ b/services/strategy-engine/strategies/combined_strategy.py
@@ -20,6 +20,9 @@ class CombinedStrategy(BaseStrategy):
self._strategies: list[tuple[BaseStrategy, float]] = [] # (strategy, weight)
self._threshold: float = 0.5
self._quantity: Decimal = Decimal("0.01")
+ self._trade_history: dict[str, list[bool]] = {} # strategy_name -> [win, loss, ...]
+ self._adaptive_weights: bool = False
+ self._history_window: int = 20 # Last N signals to evaluate
@property
def warmup_period(self) -> int:
@@ -30,6 +33,8 @@ class CombinedStrategy(BaseStrategy):
def configure(self, params: dict) -> None:
self._threshold = float(params.get("threshold", 0.5))
self._quantity = Decimal(str(params.get("quantity", "0.01")))
+ self._adaptive_weights = bool(params.get("adaptive_weights", False))
+ self._history_window = int(params.get("history_window", 20))
if self._threshold <= 0:
raise ValueError(f"Threshold must be positive, got {self._threshold}")
if self._quantity <= 0:
@@ -41,6 +46,29 @@ class CombinedStrategy(BaseStrategy):
raise ValueError(f"Weight must be positive, got {weight}")
self._strategies.append((strategy, weight))
+ def record_result(self, strategy_name: str, is_win: bool) -> None:
+ """Record a trade result for adaptive weighting."""
+ if strategy_name not in self._trade_history:
+ self._trade_history[strategy_name] = []
+ self._trade_history[strategy_name].append(is_win)
+ # Keep only last N results
+ if len(self._trade_history[strategy_name]) > self._history_window:
+ self._trade_history[strategy_name] = self._trade_history[strategy_name][-self._history_window:]
+
+ def _get_adaptive_weight(self, strategy_name: str, base_weight: float) -> float:
+ """Get weight adjusted by recent performance."""
+ if not self._adaptive_weights:
+ return base_weight
+
+ history = self._trade_history.get(strategy_name, [])
+ if len(history) < 5: # Not enough data, use base weight
+ return base_weight
+
+ win_rate = sum(1 for w in history if w) / len(history)
+ # Scale weight: 0.5x at 20% win rate, 1.0x at 50%, 1.5x at 80%
+ scale = 0.5 + win_rate # Range: 0.5 to 1.5
+ return base_weight * scale
+
def reset(self) -> None:
for strategy, _ in self._strategies:
strategy.reset()
@@ -49,7 +77,7 @@ class CombinedStrategy(BaseStrategy):
if not self._strategies:
return None
- total_weight = sum(w for _, w in self._strategies)
+ total_weight = sum(self._get_adaptive_weight(s.name, w) for s, w in self._strategies)
if total_weight == 0:
return None
@@ -59,12 +87,13 @@ class CombinedStrategy(BaseStrategy):
for strategy, weight in self._strategies:
signal = strategy.on_candle(candle)
if signal is not None:
+ effective_weight = self._get_adaptive_weight(strategy.name, weight)
if signal.side == OrderSide.BUY:
- score += weight * signal.conviction
- reasons.append(f"{strategy.name}:BUY({weight}*{signal.conviction:.2f})")
+ score += effective_weight * signal.conviction
+ reasons.append(f"{strategy.name}:BUY({effective_weight}*{signal.conviction:.2f})")
elif signal.side == OrderSide.SELL:
- score -= weight * signal.conviction
- reasons.append(f"{strategy.name}:SELL({weight}*{signal.conviction:.2f})")
+ score -= effective_weight * signal.conviction
+ reasons.append(f"{strategy.name}:SELL({effective_weight}*{signal.conviction:.2f})")
normalized = score / total_weight # Range: -1.0 to 1.0
diff --git a/services/strategy-engine/strategies/volume_profile_strategy.py b/services/strategy-engine/strategies/volume_profile_strategy.py
index 324f1c2..ef2ae14 100644
--- a/services/strategy-engine/strategies/volume_profile_strategy.py
+++ b/services/strategy-engine/strategies/volume_profile_strategy.py
@@ -56,7 +56,8 @@ class VolumeProfileStrategy(BaseStrategy):
self._was_below_va = False
self._was_above_va = False
- def _compute_value_area(self) -> tuple[float, float, float] | None:
+ def _compute_value_area(self) -> tuple[float, float, float, list[float], list[float]] | None:
+ """Compute POC, VA low, VA high, HVN levels, LVN levels."""
data = list(self._candles)
if len(data) < self._lookback_period:
return None
@@ -67,7 +68,7 @@ class VolumeProfileStrategy(BaseStrategy):
min_price = prices.min()
max_price = prices.max()
if min_price == max_price:
- return (float(min_price), float(min_price), float(max_price))
+ return (float(min_price), float(min_price), float(max_price), [], [])
bin_edges = np.linspace(min_price, max_price, self._num_bins + 1)
vol_profile = np.zeros(self._num_bins)
@@ -84,7 +85,7 @@ class VolumeProfileStrategy(BaseStrategy):
# Value Area: expand from POC outward
total_volume = vol_profile.sum()
if total_volume == 0:
- return (poc, float(bin_edges[0]), float(bin_edges[-1]))
+ return (poc, float(bin_edges[0]), float(bin_edges[-1]), [], [])
target_volume = self._value_area_pct * total_volume
accumulated = vol_profile[poc_idx]
@@ -111,7 +112,20 @@ class VolumeProfileStrategy(BaseStrategy):
va_low = float(bin_edges[low_idx])
va_high = float(bin_edges[high_idx + 1])
- return (poc, va_low, va_high)
+ # HVN/LVN detection
+ mean_vol = vol_profile.mean()
+ std_vol = vol_profile.std()
+
+ hvn_levels: list[float] = []
+ lvn_levels: list[float] = []
+ for i in range(len(vol_profile)):
+ mid = float((bin_edges[i] + bin_edges[i + 1]) / 2)
+ if vol_profile[i] > mean_vol + std_vol:
+ hvn_levels.append(mid)
+ elif vol_profile[i] < mean_vol - 0.5 * std_vol and vol_profile[i] > 0:
+ lvn_levels.append(mid)
+
+ return (poc, va_low, va_high, hvn_levels, lvn_levels)
def on_candle(self, candle: Candle) -> Signal | None:
self._update_filter_data(candle)
@@ -123,13 +137,41 @@ class VolumeProfileStrategy(BaseStrategy):
if result is None:
return None
- poc, va_low, va_high = result
+ poc, va_low, va_high, hvn_levels, lvn_levels = result
if close < va_low:
self._was_below_va = True
if close > va_high:
self._was_above_va = True
+ # HVN bounce signals (stronger than regular VA bounces)
+ for hvn in hvn_levels:
+ if abs(close - hvn) / hvn < 0.005: # Within 0.5% of HVN
+ if self._was_below_va and close >= va_low:
+ self._was_below_va = False
+ signal = Signal(
+ strategy=self.name,
+ symbol=candle.symbol,
+ side=OrderSide.BUY,
+ price=candle.close,
+ quantity=self._quantity,
+ conviction=0.85,
+ reason=f"Price near HVN {hvn:.2f}, bounced from below VA low {va_low:.2f} to {close:.2f}",
+ )
+ return self._apply_filters(signal)
+ if self._was_above_va and close <= va_high:
+ self._was_above_va = False
+ signal = Signal(
+ strategy=self.name,
+ symbol=candle.symbol,
+ side=OrderSide.SELL,
+ price=candle.close,
+ quantity=self._quantity,
+ conviction=0.85,
+ reason=f"Price near HVN {hvn:.2f}, rejected from above VA high {va_high:.2f} to {close:.2f}",
+ )
+ return self._apply_filters(signal)
+
# BUY: was below VA, price bounces back between va_low and poc
if self._was_below_va and va_low <= close <= poc:
self._was_below_va = False
diff --git a/services/strategy-engine/tests/test_combined_strategy.py b/services/strategy-engine/tests/test_combined_strategy.py
index 3408a89..20a572e 100644
--- a/services/strategy-engine/tests/test_combined_strategy.py
+++ b/services/strategy-engine/tests/test_combined_strategy.py
@@ -167,3 +167,60 @@ def test_combined_invalid_weight():
c.configure({})
with pytest.raises(ValueError):
c.add_strategy(AlwaysBuyStrategy(), weight=-1.0)
+
+
+def test_combined_record_result():
+ """Verify trade history tracking works correctly."""
+ c = CombinedStrategy()
+ c.configure({"adaptive_weights": True, "history_window": 5})
+
+ c.record_result("test_strat", True)
+ c.record_result("test_strat", False)
+ c.record_result("test_strat", True)
+
+ assert len(c._trade_history["test_strat"]) == 3
+ assert c._trade_history["test_strat"] == [True, False, True]
+
+ # Fill beyond window size to test trimming
+ for _ in range(5):
+ c.record_result("test_strat", False)
+
+ assert len(c._trade_history["test_strat"]) == 5 # Trimmed to history_window
+
+
+def test_combined_adaptive_weight_increases_for_winners():
+ """Strategy with high win rate gets higher effective weight."""
+ c = CombinedStrategy()
+ c.configure({"threshold": 0.3, "adaptive_weights": True, "history_window": 20})
+ c.add_strategy(AlwaysBuyStrategy(), weight=1.0)
+
+ # Record high win rate for always_buy (80% wins)
+ for _ in range(8):
+ c.record_result("always_buy", True)
+ for _ in range(2):
+ c.record_result("always_buy", False)
+
+ # Adaptive weight should be > base weight (1.0)
+ adaptive_w = c._get_adaptive_weight("always_buy", 1.0)
+ assert adaptive_w > 1.0
+ # 80% win rate -> scale = 0.5 + 0.8 = 1.3 -> weight = 1.3
+ assert abs(adaptive_w - 1.3) < 0.01
+
+
+def test_combined_adaptive_weight_decreases_for_losers():
+ """Strategy with low win rate gets lower effective weight."""
+ c = CombinedStrategy()
+ c.configure({"threshold": 0.3, "adaptive_weights": True, "history_window": 20})
+ c.add_strategy(AlwaysBuyStrategy(), weight=1.0)
+
+ # Record low win rate for always_buy (20% wins)
+ for _ in range(2):
+ c.record_result("always_buy", True)
+ for _ in range(8):
+ c.record_result("always_buy", False)
+
+ # Adaptive weight should be < base weight (1.0)
+ adaptive_w = c._get_adaptive_weight("always_buy", 1.0)
+ assert adaptive_w < 1.0
+ # 20% win rate -> scale = 0.5 + 0.2 = 0.7 -> weight = 0.7
+ assert abs(adaptive_w - 0.7) < 0.01
diff --git a/services/strategy-engine/tests/test_volume_profile_strategy.py b/services/strategy-engine/tests/test_volume_profile_strategy.py
index 71f0eca..f40261c 100644
--- a/services/strategy-engine/tests/test_volume_profile_strategy.py
+++ b/services/strategy-engine/tests/test_volume_profile_strategy.py
@@ -125,3 +125,56 @@ def test_volume_profile_reset_clears_state():
# After reset, should not have enough data
result = strategy.on_candle(make_candle(100.0, 10.0))
assert result is None
+
+
+def test_volume_profile_hvn_detection():
+ """Feed clustered volume at specific price levels to produce HVN nodes."""
+ strategy = VolumeProfileStrategy()
+ strategy.configure({"lookback_period": 20, "num_bins": 10, "value_area_pct": 0.7})
+
+ # Create a profile with very high volume at price ~100 and low volume elsewhere
+ # Prices range from 90 to 110, heavy volume concentrated at 100
+ candles_data = []
+ # Low volume at extremes
+ for p in [90, 91, 92, 109, 110]:
+ candles_data.append((p, 1.0))
+ # Very high volume around 100
+ for _ in range(15):
+ candles_data.append((100, 100.0))
+
+ for price, vol in candles_data:
+ strategy.on_candle(make_candle(price, vol))
+
+ # Access the internal method to verify HVN detection
+ result = strategy._compute_value_area()
+ assert result is not None
+ poc, va_low, va_high, hvn_levels, lvn_levels = result
+
+ # The bin containing price ~100 should have very high volume -> HVN
+ assert len(hvn_levels) > 0
+ # At least one HVN should be near 100
+ assert any(abs(h - 100) < 5 for h in hvn_levels)
+
+
+def test_volume_profile_reset_thorough():
+ """Verify all state is cleared on reset."""
+ strategy = VolumeProfileStrategy()
+ strategy.configure({"lookback_period": 10, "num_bins": 5})
+
+ # Build up state
+ for _ in range(10):
+ strategy.on_candle(make_candle(100.0, 10.0))
+ # Set below/above VA flags
+ strategy.on_candle(make_candle(50.0, 1.0)) # below VA
+ strategy.on_candle(make_candle(200.0, 1.0)) # above VA
+
+ strategy.reset()
+
+ # Verify all state cleared
+ assert len(strategy._candles) == 0
+ assert strategy._was_below_va is False
+ assert strategy._was_above_va is False
+
+ # Should not produce signal since no data
+ result = strategy.on_candle(make_candle(100.0, 10.0))
+ assert result is None