1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
|
from collections import deque
from decimal import Decimal
import numpy as np
from shared.models import Candle, Signal, OrderSide
from strategies.base import BaseStrategy
class VolumeProfileStrategy(BaseStrategy):
    """Trade re-entries into the value area of a rolling volume profile.

    Every candle contributes one ``(close, volume)`` sample. Once
    ``lookback_period`` samples are available, the most recent ones are
    binned by close price into a volume histogram, from which are derived:

    * the point of control (POC): midpoint of the bin with the most volume;
    * the value area (VA): the contiguous bin range grown outward from the
      POC until it holds ``value_area_pct`` of the total volume;
    * high/low volume nodes (HVN/LVN): bins whose volume is well above or
      below the mean bin volume.

    A BUY is proposed when a close was previously seen below the value area
    and the current close is back at or above its low edge; a SELL is the
    mirror image at the high edge. Re-entries that happen close to an HVN
    carry a conviction of 0.85, plain re-entries 0.6. Every proposed signal
    is passed through ``_apply_filters`` before it is returned.
    """

    name: str = "volume_profile"

    # Minimum number of (close, volume) samples kept in the rolling history.
    _MIN_HISTORY = 500
    # Largest relative distance between the close and an HVN level for the
    # close to count as being "near" that level (0.5%).
    _HVN_PROXIMITY = 0.005

    def __init__(self) -> None:
        """Set default parameters and start with an empty sample history."""
        super().__init__()
        self._lookback_period: int = 100
        self._num_bins: int = 50
        self._value_area_pct: float = 0.7
        self._quantity: Decimal = Decimal("0.01")
        self._candles: deque[tuple[float, float]] = deque(maxlen=self._MIN_HISTORY)
        self._was_below_va: bool = False
        self._was_above_va: bool = False

    @property
    def warmup_period(self) -> int:
        """Number of candles needed before a volume profile can be computed."""
        return self._lookback_period

    def configure(self, params: dict) -> None:
        """Apply strategy parameters, falling back to defaults for missing keys.

        Recognised keys: ``lookback_period``, ``num_bins``, ``value_area_pct``,
        ``quantity``, ``adx_threshold``, ``min_volume_ratio``,
        ``atr_stop_multiplier`` and ``atr_tp_multiplier``.

        Raises:
            ValueError: if ``lookback_period`` or ``num_bins`` is below 2,
                ``value_area_pct`` is outside ``(0, 1]``, or ``quantity`` is
                not positive.
        """
        lookback_period = int(params.get("lookback_period", 100))
        num_bins = int(params.get("num_bins", 50))
        value_area_pct = float(params.get("value_area_pct", 0.7))
        quantity = Decimal(str(params.get("quantity", "0.01")))

        # Validate everything before touching instance state so a rejected
        # configuration does not leave the strategy half-reconfigured.
        if lookback_period < 2:
            raise ValueError(
                f"Volume profile lookback_period must be >= 2, got {lookback_period}"
            )
        if num_bins < 2:
            raise ValueError(f"Volume profile num_bins must be >= 2, got {num_bins}")
        if not (0 < value_area_pct <= 1):
            raise ValueError(
                f"Volume profile value_area_pct must be 0 < pct <= 1, got {value_area_pct}"
            )
        if quantity <= 0:
            raise ValueError(f"Quantity must be positive, got {quantity}")

        self._lookback_period = lookback_period
        self._num_bins = num_bins
        self._value_area_pct = value_area_pct
        self._quantity = quantity

        # The history must be able to hold a full lookback window; with a
        # fixed cap, a lookback larger than the cap could never warm up.
        capacity = max(self._MIN_HISTORY, lookback_period)
        if self._candles.maxlen != capacity:
            self._candles = deque(self._candles, maxlen=capacity)

        self._init_filters(
            require_trend=False,
            adx_threshold=float(params.get("adx_threshold", 25.0)),
            min_volume_ratio=float(params.get("min_volume_ratio", 0.5)),
            atr_stop_multiplier=float(params.get("atr_stop_multiplier", 2.0)),
            atr_tp_multiplier=float(params.get("atr_tp_multiplier", 3.0)),
        )

    def reset(self) -> None:
        """Forget all collected samples and both value-area excursion flags."""
        self._candles.clear()
        self._was_below_va = False
        self._was_above_va = False

    def _compute_value_area(self) -> tuple[float, float, float, list[float], list[float]] | None:
        """Compute POC, VA low, VA high, HVN levels, LVN levels.

        Returns:
            ``None`` while fewer than ``lookback_period`` samples are stored.
            Otherwise ``(poc, va_low, va_high, hvn_levels, lvn_levels)`` for
            the most recent ``lookback_period`` samples. If all closes are
            equal, or the window has no volume at all, both level lists are
            empty.
        """
        lookback = self._lookback_period
        if len(self._candles) < lookback:
            return None

        window = np.asarray(list(self._candles)[-lookback:], dtype=float)
        prices = window[:, 0]
        volumes = window[:, 1]

        min_price = prices.min()
        max_price = prices.max()
        if min_price == max_price:
            # A flat window has no price range to bin.
            return (float(min_price), float(min_price), float(max_price), [], [])

        num_bins = self._num_bins
        bin_edges = np.linspace(min_price, max_price, num_bins + 1)
        bin_mids = (bin_edges[:-1] + bin_edges[1:]) / 2

        # Map each close to its bin; the maximum price would land one past
        # the end, so it is clamped into the last bin.
        bin_idx = ((prices - min_price) / (max_price - min_price) * num_bins).astype(np.intp)
        np.minimum(bin_idx, num_bins - 1, out=bin_idx)
        vol_profile = np.bincount(bin_idx, weights=volumes, minlength=num_bins)

        # POC: bin with max volume
        poc_idx = int(np.argmax(vol_profile))
        poc = float(bin_mids[poc_idx])

        total_volume = vol_profile.sum()
        if total_volume == 0:
            return (poc, float(bin_edges[0]), float(bin_edges[-1]), [], [])

        # Value Area: expand from POC outward, each step taking the
        # neighbouring bin with more volume (ties go to the lower side).
        target_volume = self._value_area_pct * total_volume
        accumulated = vol_profile[poc_idx]
        low_idx = poc_idx
        high_idx = poc_idx
        while accumulated < target_volume:
            can_go_low = low_idx > 0
            can_go_high = high_idx < num_bins - 1
            if not (can_go_low or can_go_high):
                break
            # -1.0 marks an exhausted side so it can never win the comparison.
            low_vol = vol_profile[low_idx - 1] if can_go_low else -1.0
            high_vol = vol_profile[high_idx + 1] if can_go_high else -1.0
            if low_vol >= high_vol:
                low_idx -= 1
                accumulated += vol_profile[low_idx]
            else:
                high_idx += 1
                accumulated += vol_profile[high_idx]

        va_low = float(bin_edges[low_idx])
        va_high = float(bin_edges[high_idx + 1])

        # HVN: more than one std above the mean bin volume.
        # LVN: non-empty bins more than half a std below the mean.
        mean_vol = vol_profile.mean()
        std_vol = vol_profile.std()
        hvn_mask = vol_profile > mean_vol + std_vol
        lvn_mask = ~hvn_mask & (vol_profile < mean_vol - 0.5 * std_vol) & (vol_profile > 0)
        hvn_levels: list[float] = bin_mids[hvn_mask].tolist()
        lvn_levels: list[float] = bin_mids[lvn_mask].tolist()

        return (poc, va_low, va_high, hvn_levels, lvn_levels)

    def _build_filtered_signal(
        self, candle: Candle, side: OrderSide, conviction: float, reason: str
    ) -> Signal | None:
        """Build a signal at the candle's close and return the result of ``_apply_filters``."""
        signal = Signal(
            strategy=self.name,
            symbol=candle.symbol,
            side=side,
            price=candle.close,
            quantity=self._quantity,
            conviction=conviction,
            reason=reason,
        )
        return self._apply_filters(signal)

    def on_candle(self, candle: Candle) -> Signal | None:
        """Record the candle and propose a signal on a value-area re-entry.

        Returns:
            Whatever ``_apply_filters`` returns for the proposed BUY/SELL
            signal, or ``None`` when the profile is not yet available or no
            re-entry condition is met.
        """
        self._update_filter_data(candle)
        close = float(candle.close)
        self._candles.append((close, float(candle.volume)))

        profile = self._compute_value_area()
        if profile is None:
            return None
        poc, va_low, va_high, hvn_levels, _lvn_levels = profile

        # Remember excursions outside the value area; a flag is cleared only
        # when the matching signal is proposed.
        # NOTE(review): an excursion flag is not cleared when price leaves
        # through the opposite side, so it can persist across moves - confirm
        # this is intended.
        if close < va_low:
            self._was_below_va = True
        if close > va_high:
            self._was_above_va = True

        # HVN bounce signals (stronger than regular VA bounces). The re-entry
        # conditions do not depend on which HVN matched, so the first level
        # within range is sufficient. A zero level is skipped because relative
        # distance to it is undefined.
        near_hvn = next(
            (
                level
                for level in hvn_levels
                if level != 0 and abs(close - level) / abs(level) < self._HVN_PROXIMITY
            ),
            None,
        )
        if near_hvn is not None:
            if self._was_below_va and close >= va_low:
                self._was_below_va = False
                return self._build_filtered_signal(
                    candle,
                    OrderSide.BUY,
                    0.85,
                    f"Price near HVN {near_hvn:.2f}, bounced from below VA low {va_low:.2f} to {close:.2f}",
                )
            if self._was_above_va and close <= va_high:
                self._was_above_va = False
                return self._build_filtered_signal(
                    candle,
                    OrderSide.SELL,
                    0.85,
                    f"Price near HVN {near_hvn:.2f}, rejected from above VA high {va_high:.2f} to {close:.2f}",
                )

        # BUY: was below VA, price bounces back between va_low and poc
        if self._was_below_va and va_low <= close <= poc:
            self._was_below_va = False
            return self._build_filtered_signal(
                candle,
                OrderSide.BUY,
                0.6,
                f"Price bounced from below VA low {va_low:.2f} to {close:.2f} (POC {poc:.2f})",
            )

        # SELL: was above VA, price pulls back between poc and va_high
        if self._was_above_va and poc <= close <= va_high:
            self._was_above_va = False
            return self._build_filtered_signal(
                candle,
                OrderSide.SELL,
                0.6,
                f"Price rejected from above VA high {va_high:.2f} to {close:.2f} (POC {poc:.2f})",
            )

        return None
|