1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
|
from collections import deque
from decimal import Decimal
import numpy as np
from shared.models import Candle, Signal, OrderSide
from strategies.base import BaseStrategy
class VolumeProfileStrategy(BaseStrategy):
name: str = "volume_profile"
def __init__(self) -> None:
self._lookback_period: int = 100
self._num_bins: int = 50
self._value_area_pct: float = 0.7
self._quantity: Decimal = Decimal("0.01")
self._candles: deque[tuple[float, float]] = deque(maxlen=500)
self._was_below_va: bool = False
self._was_above_va: bool = False
@property
def warmup_period(self) -> int:
return self._lookback_period
def configure(self, params: dict) -> None:
self._lookback_period = int(params.get("lookback_period", 100))
self._num_bins = int(params.get("num_bins", 50))
self._value_area_pct = float(params.get("value_area_pct", 0.7))
self._quantity = Decimal(str(params.get("quantity", "0.01")))
if self._lookback_period < 2:
raise ValueError(
f"Volume profile lookback_period must be >= 2, got {self._lookback_period}"
)
if self._num_bins < 2:
raise ValueError(f"Volume profile num_bins must be >= 2, got {self._num_bins}")
if not (0 < self._value_area_pct <= 1):
raise ValueError(
f"Volume profile value_area_pct must be 0 < pct <= 1, got {self._value_area_pct}"
)
if self._quantity <= 0:
raise ValueError(f"Quantity must be positive, got {self._quantity}")
def reset(self) -> None:
self._candles.clear()
self._was_below_va = False
self._was_above_va = False
def _compute_value_area(self) -> tuple[float, float, float] | None:
data = list(self._candles)
if len(data) < self._lookback_period:
return None
recent = data[-self._lookback_period :]
prices = np.array([c[0] for c in recent])
min_price = prices.min()
max_price = prices.max()
if min_price == max_price:
return (float(min_price), float(min_price), float(max_price))
bin_edges = np.linspace(min_price, max_price, self._num_bins + 1)
vol_profile = np.zeros(self._num_bins)
for price, volume in recent:
idx = int((price - min_price) / (max_price - min_price) * self._num_bins)
idx = min(idx, self._num_bins - 1)
vol_profile[idx] += volume
# POC: bin with max volume
poc_idx = int(np.argmax(vol_profile))
poc = float((bin_edges[poc_idx] + bin_edges[poc_idx + 1]) / 2)
# Value Area: expand from POC outward
total_volume = vol_profile.sum()
if total_volume == 0:
return (poc, float(bin_edges[0]), float(bin_edges[-1]))
target_volume = self._value_area_pct * total_volume
accumulated = vol_profile[poc_idx]
low_idx = poc_idx
high_idx = poc_idx
while accumulated < target_volume:
expand_low = low_idx > 0
expand_high = high_idx < self._num_bins - 1
if not expand_low and not expand_high:
break
low_vol = vol_profile[low_idx - 1] if expand_low else -1.0
high_vol = vol_profile[high_idx + 1] if expand_high else -1.0
if low_vol >= high_vol:
low_idx -= 1
accumulated += vol_profile[low_idx]
else:
high_idx += 1
accumulated += vol_profile[high_idx]
va_low = float(bin_edges[low_idx])
va_high = float(bin_edges[high_idx + 1])
return (poc, va_low, va_high)
def on_candle(self, candle: Candle) -> Signal | None:
close = float(candle.close)
volume = float(candle.volume)
self._candles.append((close, volume))
result = self._compute_value_area()
if result is None:
return None
poc, va_low, va_high = result
if close < va_low:
self._was_below_va = True
if close > va_high:
self._was_above_va = True
# BUY: was below VA, price bounces back between va_low and poc
if self._was_below_va and va_low <= close <= poc:
self._was_below_va = False
return Signal(
strategy=self.name,
symbol=candle.symbol,
side=OrderSide.BUY,
price=candle.close,
quantity=self._quantity,
reason=f"Price bounced from below VA low {va_low:.2f} to {close:.2f} (POC {poc:.2f})",
)
# SELL: was above VA, price pulls back between poc and va_high
if self._was_above_va and poc <= close <= va_high:
self._was_above_va = False
return Signal(
strategy=self.name,
symbol=candle.symbol,
side=OrderSide.SELL,
price=candle.close,
quantity=self._quantity,
reason=f"Price rejected from above VA high {va_high:.2f} to {close:.2f} (POC {poc:.2f})",
)
return None
|