diff options
Diffstat (limited to 'services/strategy-engine')
4 files changed, 191 insertions, 10 deletions
diff --git a/services/strategy-engine/strategies/combined_strategy.py b/services/strategy-engine/strategies/combined_strategy.py index be1cbed..907d9c5 100644 --- a/services/strategy-engine/strategies/combined_strategy.py +++ b/services/strategy-engine/strategies/combined_strategy.py @@ -20,6 +20,9 @@ class CombinedStrategy(BaseStrategy): self._strategies: list[tuple[BaseStrategy, float]] = [] # (strategy, weight) self._threshold: float = 0.5 self._quantity: Decimal = Decimal("0.01") + self._trade_history: dict[str, list[bool]] = {} # strategy_name -> [win, loss, ...] + self._adaptive_weights: bool = False + self._history_window: int = 20 # Last N signals to evaluate @property def warmup_period(self) -> int: @@ -30,6 +33,8 @@ class CombinedStrategy(BaseStrategy): def configure(self, params: dict) -> None: self._threshold = float(params.get("threshold", 0.5)) self._quantity = Decimal(str(params.get("quantity", "0.01"))) + self._adaptive_weights = bool(params.get("adaptive_weights", False)) + self._history_window = int(params.get("history_window", 20)) if self._threshold <= 0: raise ValueError(f"Threshold must be positive, got {self._threshold}") if self._quantity <= 0: @@ -41,6 +46,29 @@ class CombinedStrategy(BaseStrategy): raise ValueError(f"Weight must be positive, got {weight}") self._strategies.append((strategy, weight)) + def record_result(self, strategy_name: str, is_win: bool) -> None: + """Record a trade result for adaptive weighting.""" + if strategy_name not in self._trade_history: + self._trade_history[strategy_name] = [] + self._trade_history[strategy_name].append(is_win) + # Keep only last N results + if len(self._trade_history[strategy_name]) > self._history_window: + self._trade_history[strategy_name] = self._trade_history[strategy_name][-self._history_window:] + + def _get_adaptive_weight(self, strategy_name: str, base_weight: float) -> float: + """Get weight adjusted by recent performance.""" + if not self._adaptive_weights: + return base_weight + + history = self._trade_history.get(strategy_name, []) + if len(history) < 5: # Not enough data, use base weight + return base_weight + + win_rate = sum(1 for w in history if w) / len(history) + # Scale weight: 0.5x at 20% win rate, 1.0x at 50%, 1.5x at 80% + scale = 0.5 + win_rate # Range: 0.5 to 1.5 + return base_weight * scale + def reset(self) -> None: for strategy, _ in self._strategies: strategy.reset() @@ -49,7 +77,7 @@ class CombinedStrategy(BaseStrategy): if not self._strategies: return None - total_weight = sum(w for _, w in self._strategies) + total_weight = sum(self._get_adaptive_weight(s.name, w) for s, w in self._strategies) if total_weight == 0: return None @@ -59,12 +87,13 @@ class CombinedStrategy(BaseStrategy): for strategy, weight in self._strategies: signal = strategy.on_candle(candle) if signal is not None: + effective_weight = self._get_adaptive_weight(strategy.name, weight) if signal.side == OrderSide.BUY: - score += weight * signal.conviction - reasons.append(f"{strategy.name}:BUY({weight}*{signal.conviction:.2f})") + score += effective_weight * signal.conviction + reasons.append(f"{strategy.name}:BUY({effective_weight}*{signal.conviction:.2f})") elif signal.side == OrderSide.SELL: - score -= weight * signal.conviction - reasons.append(f"{strategy.name}:SELL({weight}*{signal.conviction:.2f})") + score -= effective_weight * signal.conviction + reasons.append(f"{strategy.name}:SELL({effective_weight}*{signal.conviction:.2f})") normalized = score / total_weight # Range: -1.0 to 1.0 diff --git a/services/strategy-engine/strategies/volume_profile_strategy.py b/services/strategy-engine/strategies/volume_profile_strategy.py index 324f1c2..ef2ae14 100644 --- a/services/strategy-engine/strategies/volume_profile_strategy.py +++ b/services/strategy-engine/strategies/volume_profile_strategy.py @@ -56,7 +56,8 @@ class VolumeProfileStrategy(BaseStrategy): self._was_below_va = False self._was_above_va = False - def _compute_value_area(self) -> tuple[float, float, float] | None: + def _compute_value_area(self) -> tuple[float, float, float, list[float], list[float]] | None: + """Compute POC, VA low, VA high, HVN levels, LVN levels.""" data = list(self._candles) if len(data) < self._lookback_period: return None @@ -67,7 +68,7 @@ class VolumeProfileStrategy(BaseStrategy): min_price = prices.min() max_price = prices.max() if min_price == max_price: - return (float(min_price), float(min_price), float(max_price)) + return (float(min_price), float(min_price), float(max_price), [], []) bin_edges = np.linspace(min_price, max_price, self._num_bins + 1) vol_profile = np.zeros(self._num_bins) @@ -84,7 +85,7 @@ class VolumeProfileStrategy(BaseStrategy): # Value Area: expand from POC outward total_volume = vol_profile.sum() if total_volume == 0: - return (poc, float(bin_edges[0]), float(bin_edges[-1])) + return (poc, float(bin_edges[0]), float(bin_edges[-1]), [], []) target_volume = self._value_area_pct * total_volume accumulated = vol_profile[poc_idx] @@ -111,7 +112,20 @@ class VolumeProfileStrategy(BaseStrategy): va_low = float(bin_edges[low_idx]) va_high = float(bin_edges[high_idx + 1]) - return (poc, va_low, va_high) + # HVN/LVN detection + mean_vol = vol_profile.mean() + std_vol = vol_profile.std() + + hvn_levels: list[float] = [] + lvn_levels: list[float] = [] + for i in range(len(vol_profile)): + mid = float((bin_edges[i] + bin_edges[i + 1]) / 2) + if vol_profile[i] > mean_vol + std_vol: + hvn_levels.append(mid) + elif vol_profile[i] < mean_vol - 0.5 * std_vol and vol_profile[i] > 0: + lvn_levels.append(mid) + + return (poc, va_low, va_high, hvn_levels, lvn_levels) def on_candle(self, candle: Candle) -> Signal | None: self._update_filter_data(candle) @@ -123,13 +137,41 @@ class VolumeProfileStrategy(BaseStrategy): if result is None: return None - poc, va_low, va_high = result + poc, va_low, va_high, hvn_levels, lvn_levels = result if close < va_low: self._was_below_va = True if close > va_high: self._was_above_va = True + # HVN bounce signals (stronger than regular VA bounces) + for hvn in hvn_levels: + if abs(close - hvn) / hvn < 0.005: # Within 0.5% of HVN + if self._was_below_va and close >= va_low: + self._was_below_va = False + signal = Signal( + strategy=self.name, + symbol=candle.symbol, + side=OrderSide.BUY, + price=candle.close, + quantity=self._quantity, + conviction=0.85, + reason=f"Price near HVN {hvn:.2f}, bounced from below VA low {va_low:.2f} to {close:.2f}", + ) + return self._apply_filters(signal) + if self._was_above_va and close <= va_high: + self._was_above_va = False + signal = Signal( + strategy=self.name, + symbol=candle.symbol, + side=OrderSide.SELL, + price=candle.close, + quantity=self._quantity, + conviction=0.85, + reason=f"Price near HVN {hvn:.2f}, rejected from above VA high {va_high:.2f} to {close:.2f}", + ) + return self._apply_filters(signal) + # BUY: was below VA, price bounces back between va_low and poc if self._was_below_va and va_low <= close <= poc: self._was_below_va = False diff --git a/services/strategy-engine/tests/test_combined_strategy.py b/services/strategy-engine/tests/test_combined_strategy.py index 3408a89..20a572e 100644 --- a/services/strategy-engine/tests/test_combined_strategy.py +++ b/services/strategy-engine/tests/test_combined_strategy.py @@ -167,3 +167,60 @@ def test_combined_invalid_weight(): c.configure({}) with pytest.raises(ValueError): c.add_strategy(AlwaysBuyStrategy(), weight=-1.0) + + +def test_combined_record_result(): + """Verify trade history tracking works correctly.""" + c = CombinedStrategy() + c.configure({"adaptive_weights": True, "history_window": 5}) + + c.record_result("test_strat", True) + c.record_result("test_strat", False) + c.record_result("test_strat", True) + + assert len(c._trade_history["test_strat"]) == 3 + assert c._trade_history["test_strat"] == [True, False, True] + + # Fill beyond window size to test trimming + for _ in range(5): + c.record_result("test_strat", False) + + assert len(c._trade_history["test_strat"]) == 5 # Trimmed to history_window + + +def test_combined_adaptive_weight_increases_for_winners(): + """Strategy with high win rate gets higher effective weight.""" + c = CombinedStrategy() + c.configure({"threshold": 0.3, "adaptive_weights": True, "history_window": 20}) + c.add_strategy(AlwaysBuyStrategy(), weight=1.0) + + # Record high win rate for always_buy (80% wins) + for _ in range(8): + c.record_result("always_buy", True) + for _ in range(2): + c.record_result("always_buy", False) + + # Adaptive weight should be > base weight (1.0) + adaptive_w = c._get_adaptive_weight("always_buy", 1.0) + assert adaptive_w > 1.0 + # 80% win rate -> scale = 0.5 + 0.8 = 1.3 -> weight = 1.3 + assert abs(adaptive_w - 1.3) < 0.01 + + +def test_combined_adaptive_weight_decreases_for_losers(): + """Strategy with low win rate gets lower effective weight.""" + c = CombinedStrategy() + c.configure({"threshold": 0.3, "adaptive_weights": True, "history_window": 20}) + c.add_strategy(AlwaysBuyStrategy(), weight=1.0) + + # Record low win rate for always_buy (20% wins) + for _ in range(2): + c.record_result("always_buy", True) + for _ in range(8): + c.record_result("always_buy", False) + + # Adaptive weight should be < base weight (1.0) + adaptive_w = c._get_adaptive_weight("always_buy", 1.0) + assert adaptive_w < 1.0 + # 20% win rate -> scale = 0.5 + 0.2 = 0.7 -> weight = 0.7 + assert abs(adaptive_w - 0.7) < 0.01 diff --git a/services/strategy-engine/tests/test_volume_profile_strategy.py b/services/strategy-engine/tests/test_volume_profile_strategy.py index 71f0eca..f40261c 100644 --- a/services/strategy-engine/tests/test_volume_profile_strategy.py +++ b/services/strategy-engine/tests/test_volume_profile_strategy.py @@ -125,3 +125,56 @@ def test_volume_profile_reset_clears_state(): # After reset, should not have enough data result = strategy.on_candle(make_candle(100.0, 10.0)) assert result is None + + +def test_volume_profile_hvn_detection(): + """Feed clustered volume at specific price levels to produce HVN nodes.""" + strategy = VolumeProfileStrategy() + strategy.configure({"lookback_period": 20, "num_bins": 10, "value_area_pct": 0.7}) + + # Create a profile with very high volume at price ~100 and low volume elsewhere + # Prices range from 90 to 110, heavy volume concentrated at 100 + candles_data = [] + # Low volume at extremes + for p in [90, 91, 92, 109, 110]: + candles_data.append((p, 1.0)) + # Very high volume around 100 + for _ in range(15): + candles_data.append((100, 100.0)) + + for price, vol in candles_data: + strategy.on_candle(make_candle(price, vol)) + + # Access the internal method to verify HVN detection + result = strategy._compute_value_area() + assert result is not None + poc, va_low, va_high, hvn_levels, lvn_levels = result + + # The bin containing price ~100 should have very high volume -> HVN + assert len(hvn_levels) > 0 + # At least one HVN should be near 100 + assert any(abs(h - 100) < 5 for h in hvn_levels) + + +def test_volume_profile_reset_thorough(): + """Verify all state is cleared on reset.""" + strategy = VolumeProfileStrategy() + strategy.configure({"lookback_period": 10, "num_bins": 5}) + + # Build up state + for _ in range(10): + strategy.on_candle(make_candle(100.0, 10.0)) + # Set below/above VA flags + strategy.on_candle(make_candle(50.0, 1.0)) # below VA + strategy.on_candle(make_candle(200.0, 1.0)) # above VA + + strategy.reset() + + # Verify all state cleared + assert len(strategy._candles) == 0 + assert strategy._was_below_va is False + assert strategy._was_above_va is False + + # Should not produce signal since no data + result = strategy.on_candle(make_candle(100.0, 10.0)) + assert result is None |
