summary | refs | log | tree | commit | diff
path: root/services/strategy-engine/strategies/combined_strategy.py
blob: ba9248555dc91e856623320252b88e88f64f20a1 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
"""Combined strategy that aggregates signals from multiple sub-strategies."""

from decimal import Decimal

from shared.models import Candle, Signal, OrderSide
from strategies.base import BaseStrategy


class CombinedStrategy(BaseStrategy):
    """Combine several sub-strategies through weighted signal voting.

    On every candle each registered sub-strategy is asked for a signal:

    * a BUY signal adds ``weight * signal.conviction`` to the score,
    * a SELL signal subtracts ``weight * signal.conviction``,
    * no signal (or any other side) contributes nothing.

    The score is divided by the total weight of *all* registered
    sub-strategies (including the ones that stayed silent), and a combined
    BUY/SELL signal is emitted when that normalized score reaches
    ``+threshold`` / ``-threshold``.

    With adaptive weighting enabled, each base weight is scaled by the
    sub-strategy's recent win rate as reported through ``record_result``.
    """

    name: str = "combined"

    # Minimum number of recorded results before a win rate is trusted enough
    # to adjust a weight. Note that a ``history_window`` smaller than this
    # means the adaptive scaling can never kick in.
    _MIN_HISTORY_FOR_ADAPTATION: int = 5

    def __init__(self) -> None:
        super().__init__()
        self._strategies: list[tuple[BaseStrategy, float]] = []  # (strategy, weight)
        self._threshold: float = 0.5
        self._quantity: Decimal = Decimal("0.01")
        self._trade_history: dict[str, list[bool]] = {}  # strategy_name -> [win, loss, ...]
        self._adaptive_weights: bool = False
        self._history_window: int = 20  # Last N results kept per strategy

    @property
    def warmup_period(self) -> int:
        """Largest warmup period among the sub-strategies (0 when there are none)."""
        return max((s.warmup_period for s, _ in self._strategies), default=0)

    def configure(self, params: dict) -> None:
        """Apply configuration values, falling back to defaults for missing keys.

        Recognized keys: ``threshold`` (float, default 0.5), ``quantity``
        (anything ``Decimal(str(...))`` accepts, default "0.01"),
        ``adaptive_weights`` (truthy/falsy, default False) and
        ``history_window`` (int, default 20).

        All values are parsed and validated before any attribute is
        assigned, so a rejected configuration leaves the previous settings
        untouched.

        Raises:
            ValueError: if ``threshold``, ``quantity`` or ``history_window``
                is not strictly positive (a NaN threshold is rejected too).
        """
        threshold = float(params.get("threshold", 0.5))
        quantity = Decimal(str(params.get("quantity", "0.01")))
        adaptive_weights = bool(params.get("adaptive_weights", False))
        history_window = int(params.get("history_window", 20))

        # ``not (x > 0)`` rather than ``x <= 0`` so that NaN is rejected as
        # well; a NaN threshold would otherwise silently disable every signal.
        if not (threshold > 0):
            raise ValueError(f"Threshold must be positive, got {threshold}")
        if quantity <= 0:
            raise ValueError(f"Quantity must be positive, got {quantity}")
        # A zero or negative window would make "keep the last N results"
        # meaningless.
        if history_window <= 0:
            raise ValueError(f"History window must be positive, got {history_window}")

        self._threshold = threshold
        self._quantity = quantity
        self._adaptive_weights = adaptive_weights
        self._history_window = history_window

    def add_strategy(self, strategy: BaseStrategy, weight: float = 1.0) -> None:
        """Register a sub-strategy with a strictly positive base weight.

        Raises:
            ValueError: if ``weight`` is zero or negative.
        """
        if weight <= 0:
            raise ValueError(f"Weight must be positive, got {weight}")
        self._strategies.append((strategy, weight))

    def record_result(self, strategy_name: str, is_win: bool) -> None:
        """Record a trade outcome for ``strategy_name`` (used by adaptive weighting).

        Only the most recent ``history_window`` outcomes are retained.
        """
        history = self._trade_history.setdefault(strategy_name, [])
        history.append(is_win)
        # Drop the oldest entries by count instead of slicing with a negative
        # index: ``history[-0:]`` is the whole list, so a slice-based trim
        # would never shrink anything for a window of 0.
        overflow = len(history) - self._history_window
        if overflow > 0:
            del history[:overflow]

    def _get_adaptive_weight(self, strategy_name: str, base_weight: float) -> float:
        """Return ``base_weight`` scaled by the strategy's recent win rate.

        The base weight is returned unchanged when adaptive weighting is off
        or fewer than ``_MIN_HISTORY_FOR_ADAPTATION`` results are recorded.
        """
        if not self._adaptive_weights:
            return base_weight

        history = self._trade_history.get(strategy_name, [])
        if len(history) < self._MIN_HISTORY_FOR_ADAPTATION:
            # Not enough data to judge performance; use the base weight.
            return base_weight

        win_rate = sum(1 for w in history if w) / len(history)
        # Linear scale: 0.5x at 0% win rate, 1.0x at 50%, 1.5x at 100%
        # (e.g. 0.7x at 20%, 1.3x at 80%).
        scale = 0.5 + win_rate
        return base_weight * scale

    def reset(self) -> None:
        """Reset every sub-strategy. Recorded trade history is left as is."""
        for strategy, _ in self._strategies:
            strategy.reset()

    def on_candle(self, candle: Candle) -> Signal | None:
        """Feed ``candle`` to every sub-strategy and aggregate their votes.

        Returns:
            A BUY or SELL ``Signal`` for ``candle.symbol`` at ``candle.close``
            when the normalized score reaches the threshold, else ``None``.
        """
        if not self._strategies:
            return None

        # Resolve each effective weight once so the normalizing total and the
        # per-vote weights are guaranteed to agree.
        weighted = [
            (strategy, self._get_adaptive_weight(strategy.name, base_weight))
            for strategy, base_weight in self._strategies
        ]
        total_weight = sum(weight for _, weight in weighted)
        if total_weight == 0:
            return None

        score = 0.0
        reasons = []

        # Every sub-strategy is always called, even if the outcome is already
        # decided, so each of them sees every candle.
        for strategy, effective_weight in weighted:
            signal = strategy.on_candle(candle)
            if signal is None:
                continue
            if signal.side == OrderSide.BUY:
                score += effective_weight * signal.conviction
                reasons.append(
                    f"{strategy.name}:BUY({effective_weight}*{signal.conviction:.2f})"
                )
            elif signal.side == OrderSide.SELL:
                score -= effective_weight * signal.conviction
                reasons.append(
                    f"{strategy.name}:SELL({effective_weight}*{signal.conviction:.2f})"
                )

        # Stays within [-1.0, 1.0] provided each conviction is within [0, 1]
        # -- NOTE(review): confirm that bound against shared.models.Signal.
        normalized = score / total_weight

        if normalized >= self._threshold:
            return Signal(
                strategy=self.name,
                symbol=candle.symbol,
                side=OrderSide.BUY,
                price=candle.close,
                quantity=self._quantity,
                reason=f"Combined score {normalized:.2f} >= {self._threshold} [{', '.join(reasons)}]",
            )
        elif normalized <= -self._threshold:
            return Signal(
                strategy=self.name,
                symbol=candle.symbol,
                side=OrderSide.SELL,
                price=candle.close,
                quantity=self._quantity,
                reason=f"Combined score {normalized:.2f} <= -{self._threshold} [{', '.join(reasons)}]",
            )

        return None