"""Combined strategy that aggregates signals from multiple sub-strategies.""" from decimal import Decimal from shared.models import Candle, OrderSide, Signal from strategies.base import BaseStrategy class CombinedStrategy(BaseStrategy): """Combines multiple strategies using weighted signal voting. Each sub-strategy votes BUY (+weight), SELL (-weight), or HOLD (0). The combined signal fires when the weighted sum exceeds a threshold. """ name: str = "combined" def __init__(self) -> None: super().__init__() self._strategies: list[tuple[BaseStrategy, float]] = [] # (strategy, weight) self._threshold: float = 0.5 self._quantity: Decimal = Decimal("0.01") self._trade_history: dict[str, list[bool]] = {} # strategy_name -> [win, loss, ...] self._adaptive_weights: bool = False self._history_window: int = 20 # Last N signals to evaluate @property def warmup_period(self) -> int: if not self._strategies: return 0 return max(s.warmup_period for s, _ in self._strategies) def configure(self, params: dict) -> None: self._threshold = float(params.get("threshold", 0.5)) self._quantity = Decimal(str(params.get("quantity", "0.01"))) self._adaptive_weights = bool(params.get("adaptive_weights", False)) self._history_window = int(params.get("history_window", 20)) if self._threshold <= 0: raise ValueError(f"Threshold must be positive, got {self._threshold}") if self._quantity <= 0: raise ValueError(f"Quantity must be positive, got {self._quantity}") def add_strategy(self, strategy: BaseStrategy, weight: float = 1.0) -> None: """Add a sub-strategy with a weight.""" if weight <= 0: raise ValueError(f"Weight must be positive, got {weight}") self._strategies.append((strategy, weight)) def record_result(self, strategy_name: str, is_win: bool) -> None: """Record a trade result for adaptive weighting.""" if strategy_name not in self._trade_history: self._trade_history[strategy_name] = [] self._trade_history[strategy_name].append(is_win) # Keep only last N results if len(self._trade_history[strategy_name]) > self._history_window: self._trade_history[strategy_name] = self._trade_history[strategy_name][ -self._history_window : ] def _get_adaptive_weight(self, strategy_name: str, base_weight: float) -> float: """Get weight adjusted by recent performance.""" if not self._adaptive_weights: return base_weight history = self._trade_history.get(strategy_name, []) if len(history) < 5: # Not enough data, use base weight return base_weight win_rate = sum(1 for w in history if w) / len(history) # Scale weight: 0.5x at 20% win rate, 1.0x at 50%, 1.5x at 80% scale = 0.5 + win_rate # Range: 0.5 to 1.5 return base_weight * scale def reset(self) -> None: for strategy, _ in self._strategies: strategy.reset() def on_candle(self, candle: Candle) -> Signal | None: if not self._strategies: return None total_weight = sum(self._get_adaptive_weight(s.name, w) for s, w in self._strategies) if total_weight == 0: return None score = 0.0 reasons = [] for strategy, weight in self._strategies: signal = strategy.on_candle(candle) if signal is not None: effective_weight = self._get_adaptive_weight(strategy.name, weight) if signal.side == OrderSide.BUY: score += effective_weight * signal.conviction reasons.append( f"{strategy.name}:BUY({effective_weight}*{signal.conviction:.2f})" ) elif signal.side == OrderSide.SELL: score -= effective_weight * signal.conviction reasons.append( f"{strategy.name}:SELL({effective_weight}*{signal.conviction:.2f})" ) normalized = score / total_weight # Range: -1.0 to 1.0 if normalized >= self._threshold: return Signal( strategy=self.name, symbol=candle.symbol, side=OrderSide.BUY, price=candle.close, quantity=self._quantity, reason=f"Combined score {normalized:.2f} >= {self._threshold} [{', '.join(reasons)}]", ) elif normalized <= -self._threshold: return Signal( strategy=self.name, symbol=candle.symbol, side=OrderSide.SELL, price=candle.close, quantity=self._quantity, reason=f"Combined score {normalized:.2f} <= -{self._threshold} [{', '.join(reasons)}]", ) return None