summaryrefslogtreecommitdiff
path: root/services/strategy-engine/strategies/volume_profile_strategy.py
blob: 324f1c294c1f4dbfa285ab84c731ce0639929c70 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
from collections import deque
from decimal import Decimal

import numpy as np

from shared.models import Candle, Signal, OrderSide
from strategies.base import BaseStrategy


class VolumeProfileStrategy(BaseStrategy):
    name: str = "volume_profile"

    def __init__(self) -> None:
        super().__init__()
        self._lookback_period: int = 100
        self._num_bins: int = 50
        self._value_area_pct: float = 0.7
        self._quantity: Decimal = Decimal("0.01")
        self._candles: deque[tuple[float, float]] = deque(maxlen=500)
        self._was_below_va: bool = False
        self._was_above_va: bool = False

    @property
    def warmup_period(self) -> int:
        return self._lookback_period

    def configure(self, params: dict) -> None:
        self._lookback_period = int(params.get("lookback_period", 100))
        self._num_bins = int(params.get("num_bins", 50))
        self._value_area_pct = float(params.get("value_area_pct", 0.7))
        self._quantity = Decimal(str(params.get("quantity", "0.01")))

        if self._lookback_period < 2:
            raise ValueError(
                f"Volume profile lookback_period must be >= 2, got {self._lookback_period}"
            )
        if self._num_bins < 2:
            raise ValueError(f"Volume profile num_bins must be >= 2, got {self._num_bins}")
        if not (0 < self._value_area_pct <= 1):
            raise ValueError(
                f"Volume profile value_area_pct must be 0 < pct <= 1, got {self._value_area_pct}"
            )
        if self._quantity <= 0:
            raise ValueError(f"Quantity must be positive, got {self._quantity}")

        self._init_filters(
            require_trend=False,
            adx_threshold=float(params.get("adx_threshold", 25.0)),
            min_volume_ratio=float(params.get("min_volume_ratio", 0.5)),
            atr_stop_multiplier=float(params.get("atr_stop_multiplier", 2.0)),
            atr_tp_multiplier=float(params.get("atr_tp_multiplier", 3.0)),
        )

    def reset(self) -> None:
        self._candles.clear()
        self._was_below_va = False
        self._was_above_va = False

    def _compute_value_area(self) -> tuple[float, float, float] | None:
        data = list(self._candles)
        if len(data) < self._lookback_period:
            return None

        recent = data[-self._lookback_period :]
        prices = np.array([c[0] for c in recent])

        min_price = prices.min()
        max_price = prices.max()
        if min_price == max_price:
            return (float(min_price), float(min_price), float(max_price))

        bin_edges = np.linspace(min_price, max_price, self._num_bins + 1)
        vol_profile = np.zeros(self._num_bins)

        for price, volume in recent:
            idx = int((price - min_price) / (max_price - min_price) * self._num_bins)
            idx = min(idx, self._num_bins - 1)
            vol_profile[idx] += volume

        # POC: bin with max volume
        poc_idx = int(np.argmax(vol_profile))
        poc = float((bin_edges[poc_idx] + bin_edges[poc_idx + 1]) / 2)

        # Value Area: expand from POC outward
        total_volume = vol_profile.sum()
        if total_volume == 0:
            return (poc, float(bin_edges[0]), float(bin_edges[-1]))

        target_volume = self._value_area_pct * total_volume
        accumulated = vol_profile[poc_idx]
        low_idx = poc_idx
        high_idx = poc_idx

        while accumulated < target_volume:
            expand_low = low_idx > 0
            expand_high = high_idx < self._num_bins - 1

            if not expand_low and not expand_high:
                break

            low_vol = vol_profile[low_idx - 1] if expand_low else -1.0
            high_vol = vol_profile[high_idx + 1] if expand_high else -1.0

            if low_vol >= high_vol:
                low_idx -= 1
                accumulated += vol_profile[low_idx]
            else:
                high_idx += 1
                accumulated += vol_profile[high_idx]

        va_low = float(bin_edges[low_idx])
        va_high = float(bin_edges[high_idx + 1])

        return (poc, va_low, va_high)

    def on_candle(self, candle: Candle) -> Signal | None:
        self._update_filter_data(candle)
        close = float(candle.close)
        volume = float(candle.volume)
        self._candles.append((close, volume))

        result = self._compute_value_area()
        if result is None:
            return None

        poc, va_low, va_high = result

        if close < va_low:
            self._was_below_va = True
        if close > va_high:
            self._was_above_va = True

        # BUY: was below VA, price bounces back between va_low and poc
        if self._was_below_va and va_low <= close <= poc:
            self._was_below_va = False
            signal = Signal(
                strategy=self.name,
                symbol=candle.symbol,
                side=OrderSide.BUY,
                price=candle.close,
                quantity=self._quantity,
                conviction=0.6,
                reason=f"Price bounced from below VA low {va_low:.2f} to {close:.2f} (POC {poc:.2f})",
            )
            return self._apply_filters(signal)

        # SELL: was above VA, price pulls back between poc and va_high
        if self._was_above_va and poc <= close <= va_high:
            self._was_above_va = False
            signal = Signal(
                strategy=self.name,
                symbol=candle.symbol,
                side=OrderSide.SELL,
                price=candle.close,
                quantity=self._quantity,
                conviction=0.6,
                reason=f"Price rejected from above VA high {va_high:.2f} to {close:.2f} (POC {poc:.2f})",
            )
            return self._apply_filters(signal)

        return None