summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTheSiahxyz <164138827+TheSiahxyz@users.noreply.github.com>2026-04-02 09:17:35 +0900
committerTheSiahxyz <164138827+TheSiahxyz@users.noreply.github.com>2026-04-02 09:17:35 +0900
commit71e5942632a5a8c7cd555b2d52e5632a67186a8d (patch)
tree668a2c80c04b40b43dac39c6e22efa5cf1aad9a7
parenta841b3a1f2f08caa7f82a1516c47bb5f3c4b7356 (diff)
feat(strategy): add Grid trend guard and Bollinger squeeze detection
-rw-r--r--services/strategy-engine/strategies/bollinger_strategy.py61
-rw-r--r--services/strategy-engine/strategies/grid_strategy.py31
-rw-r--r--services/strategy-engine/tests/test_bollinger_strategy.py72
-rw-r--r--services/strategy-engine/tests/test_grid_strategy.py38
4 files changed, 194 insertions, 8 deletions
diff --git a/services/strategy-engine/strategies/bollinger_strategy.py b/services/strategy-engine/strategies/bollinger_strategy.py
index e53ecaa..a195cb8 100644
--- a/services/strategy-engine/strategies/bollinger_strategy.py
+++ b/services/strategy-engine/strategies/bollinger_strategy.py
@@ -19,6 +19,9 @@ class BollingerStrategy(BaseStrategy):
self._quantity: Decimal = Decimal("0.01")
self._was_below_lower: bool = False
self._was_above_upper: bool = False
+ self._squeeze_threshold: float = 0.01 # Bandwidth below this = squeeze
+ self._in_squeeze: bool = False
+ self._squeeze_bars: int = 0 # How many bars in squeeze
@property
def warmup_period(self) -> int:
@@ -28,6 +31,7 @@ class BollingerStrategy(BaseStrategy):
self._period = int(params.get("period", 20))
self._num_std = float(params.get("num_std", 2.0))
self._min_bandwidth = float(params.get("min_bandwidth", 0.02))
+ self._squeeze_threshold = float(params.get("squeeze_threshold", 0.01))
self._quantity = Decimal(str(params.get("quantity", "0.01")))
if self._period < 2:
@@ -46,9 +50,12 @@ class BollingerStrategy(BaseStrategy):
)
def reset(self) -> None:
+ super().reset()
self._closes.clear()
self._was_below_lower = False
self._was_above_upper = False
+ self._in_squeeze = False
+ self._squeeze_bars = 0
def _bollinger_conviction(self, price: float, band: float, sma: float) -> float:
"""Map distance from band to conviction (0.1-1.0).
@@ -75,12 +82,52 @@ class BollingerStrategy(BaseStrategy):
upper = sma + self._num_std * std
lower = sma - self._num_std * std
+ price = float(candle.close)
+
+ # %B calculation
+ bandwidth = (upper - lower) / sma if sma > 0 else 0
+ pct_b = (price - lower) / (upper - lower) if (upper - lower) > 0 else 0.5
+
+ # Squeeze detection
+ if bandwidth < self._squeeze_threshold:
+ self._in_squeeze = True
+ self._squeeze_bars += 1
+ return None # Don't trade during squeeze, wait for breakout
+ elif self._in_squeeze:
+ # Squeeze just ended — breakout!
+ self._in_squeeze = False
+ squeeze_duration = self._squeeze_bars
+ self._squeeze_bars = 0
+
+ if price > sma:
+ # Breakout upward
+ conv = min(0.5 + squeeze_duration * 0.1, 1.0)
+ return self._apply_filters(Signal(
+ strategy=self.name,
+ symbol=candle.symbol,
+ side=OrderSide.BUY,
+ price=candle.close,
+ quantity=self._quantity,
+ conviction=conv,
+ reason=f"Bollinger squeeze breakout UP after {squeeze_duration} bars",
+ ))
+ else:
+ # Breakout downward
+ conv = min(0.5 + squeeze_duration * 0.1, 1.0)
+ return self._apply_filters(Signal(
+ strategy=self.name,
+ symbol=candle.symbol,
+ side=OrderSide.SELL,
+ price=candle.close,
+ quantity=self._quantity,
+ conviction=conv,
+ reason=f"Bollinger squeeze breakout DOWN after {squeeze_duration} bars",
+ ))
+
# Bandwidth filter: skip sideways markets
- if sma != 0 and (upper - lower) / sma < self._min_bandwidth:
+ if sma != 0 and bandwidth < self._min_bandwidth:
return None
- price = float(candle.close)
-
# Track band penetration
if price < lower:
self._was_below_lower = True
@@ -90,14 +137,14 @@ class BollingerStrategy(BaseStrategy):
# BUY: was below lower band and recovered back inside
if self._was_below_lower and price >= lower:
self._was_below_lower = False
- conviction = self._bollinger_conviction(price, lower, sma)
+ conv = max(1.0 - pct_b, 0.3) # Closer to lower band = higher conviction
signal = Signal(
strategy=self.name,
symbol=candle.symbol,
side=OrderSide.BUY,
price=candle.close,
quantity=self._quantity,
- conviction=conviction,
+ conviction=conv,
reason=f"Price recovered above lower Bollinger Band ({lower:.2f})",
)
return self._apply_filters(signal)
@@ -105,14 +152,14 @@ class BollingerStrategy(BaseStrategy):
# SELL: was above upper band and recovered back inside
if self._was_above_upper and price <= upper:
self._was_above_upper = False
- conviction = self._bollinger_conviction(price, upper, sma)
+ conv = max(pct_b, 0.3) # Closer to upper band = higher conviction
signal = Signal(
strategy=self.name,
symbol=candle.symbol,
side=OrderSide.SELL,
price=candle.close,
quantity=self._quantity,
- conviction=conviction,
+ conviction=conv,
reason=f"Price recovered below upper Bollinger Band ({upper:.2f})",
)
return self._apply_filters(signal)
diff --git a/services/strategy-engine/strategies/grid_strategy.py b/services/strategy-engine/strategies/grid_strategy.py
index 70443ec..07ccaba 100644
--- a/services/strategy-engine/strategies/grid_strategy.py
+++ b/services/strategy-engine/strategies/grid_strategy.py
@@ -18,6 +18,9 @@ class GridStrategy(BaseStrategy):
self._quantity: Decimal = Decimal("0.01")
self._grid_levels: list[float] = []
self._last_zone: Optional[int] = None
+ self._exit_threshold_pct: float = 5.0
+ self._out_of_range: bool = False
+ self._in_position: bool = False # Track if we have any grid positions
@property
def warmup_period(self) -> int:
@@ -29,11 +32,17 @@ class GridStrategy(BaseStrategy):
self._grid_count = int(params.get("grid_count", 5))
self._quantity = Decimal(str(params.get("quantity", "0.01")))
+ self._exit_threshold_pct = float(params.get("exit_threshold_pct", 5.0))
+
if self._lower_price >= self._upper_price:
raise ValueError(
f"Grid lower_price must be < upper_price, "
f"got lower={self._lower_price}, upper={self._upper_price}"
)
+ if self._exit_threshold_pct <= 0:
+ raise ValueError(
+ f"exit_threshold_pct must be > 0, got {self._exit_threshold_pct}"
+ )
if self._grid_count < 2:
raise ValueError(f"Grid grid_count must be >= 2, got {self._grid_count}")
if self._quantity <= 0:
@@ -53,7 +62,9 @@ class GridStrategy(BaseStrategy):
)
def reset(self) -> None:
+ super().reset()
self._last_zone = None
+ self._out_of_range = False
def _get_zone(self, price: float) -> int:
"""Return the grid zone index for a given price.
@@ -69,6 +80,26 @@ class GridStrategy(BaseStrategy):
def on_candle(self, candle: Candle) -> Signal | None:
self._update_filter_data(candle)
price = float(candle.close)
+
+ # Check if price is out of grid range
+ if self._grid_levels:
+ lower_bound = self._grid_levels[0] * (1 - self._exit_threshold_pct / 100)
+ upper_bound = self._grid_levels[-1] * (1 + self._exit_threshold_pct / 100)
+
+ if price < lower_bound or price > upper_bound:
+ if not self._out_of_range:
+ self._out_of_range = True
+ # Exit signal — close positions
+ return self._apply_filters(Signal(
+ strategy=self.name, symbol=candle.symbol,
+ side=OrderSide.SELL, price=candle.close,
+ quantity=self._quantity, conviction=0.8,
+ reason=f"Grid: price {price:.2f} broke out of range [{self._grid_levels[0]:.2f}, {self._grid_levels[-1]:.2f}]",
+ ))
+ return None # Already out of range, no more signals
+ else:
+ self._out_of_range = False
+
current_zone = self._get_zone(price)
if self._last_zone is None:
diff --git a/services/strategy-engine/tests/test_bollinger_strategy.py b/services/strategy-engine/tests/test_bollinger_strategy.py
index 348a9e0..473d9b4 100644
--- a/services/strategy-engine/tests/test_bollinger_strategy.py
+++ b/services/strategy-engine/tests/test_bollinger_strategy.py
@@ -23,7 +23,7 @@ def make_candle(close: float) -> Candle:
def _make_strategy() -> BollingerStrategy:
s = BollingerStrategy()
- s.configure({"period": 5, "num_std": 1.0, "min_bandwidth": 0.0})
+ s.configure({"period": 5, "num_std": 1.0, "min_bandwidth": 0.0, "squeeze_threshold": 0.0})
return s
@@ -99,3 +99,73 @@ def test_bollinger_reset_clears_state():
assert len(strategy._closes) == 1
assert strategy._was_below_lower is False
assert strategy._was_above_upper is False
+ assert strategy._in_squeeze is False
+ assert strategy._squeeze_bars == 0
+
+
+def test_bollinger_squeeze_detection():
+ """Tight bandwidth → no signal during squeeze."""
+ # Use a strategy with a high squeeze threshold so constant prices trigger squeeze
+ s = BollingerStrategy()
+ s.configure({
+ "period": 5,
+ "num_std": 2.0,
+ "min_bandwidth": 0.0,
+ "squeeze_threshold": 0.5, # Very high threshold to ensure squeeze triggers
+ })
+
+ # Feed identical prices → bandwidth = 0 (below any threshold)
+ for _ in range(6):
+ result = s.on_candle(make_candle(100.0))
+
+ # With identical prices, std=0, bandwidth=0 < 0.5 → squeeze, no signal
+ assert s._in_squeeze is True
+ assert result is None
+
+
+def test_bollinger_squeeze_breakout_buy():
+ """Squeeze ends with price above SMA → BUY signal."""
+ s = BollingerStrategy()
+ s.configure({
+ "period": 5,
+ "num_std": 1.0,
+ "min_bandwidth": 0.0,
+ "squeeze_threshold": 0.01,
+ })
+
+ # Feed identical prices to create a squeeze (bandwidth = 0)
+ for _ in range(6):
+ s.on_candle(make_candle(100.0))
+
+ assert s._in_squeeze is True
+
+ # Now feed a price that creates enough spread to exit squeeze AND is above SMA
+ signal = s.on_candle(make_candle(120.0))
+ assert signal is not None
+ assert signal.side == OrderSide.BUY
+ assert "squeeze breakout UP" in signal.reason
+
+
+def test_bollinger_pct_b_conviction():
+ """Signals near band extremes have higher conviction via %B."""
+ s = BollingerStrategy()
+ s.configure({
+ "period": 5,
+ "num_std": 1.0,
+ "min_bandwidth": 0.0,
+ "squeeze_threshold": 0.0, # Disable squeeze for this test
+ })
+
+ # Build up with stable prices
+ for _ in range(5):
+ s.on_candle(make_candle(100.0))
+
+ # Drop below lower band
+ s.on_candle(make_candle(50.0))
+
+ # Recover just at the lower band edge — %B close to 0 → high conviction
+ signal = s.on_candle(make_candle(100.0))
+ assert signal is not None
+ assert signal.side == OrderSide.BUY
+ # conviction = max(1.0 - pct_b, 0.3), with pct_b near lower → conviction should be >= 0.3
+ assert signal.conviction >= 0.3
diff --git a/services/strategy-engine/tests/test_grid_strategy.py b/services/strategy-engine/tests/test_grid_strategy.py
index 79eb22a..9823f98 100644
--- a/services/strategy-engine/tests/test_grid_strategy.py
+++ b/services/strategy-engine/tests/test_grid_strategy.py
@@ -60,3 +60,41 @@ def test_grid_strategy_no_signal_in_same_zone():
strategy.on_candle(make_candle(50000))
signal = strategy.on_candle(make_candle(50100))
assert signal is None
+
+
+def test_grid_exit_on_trend_break():
+ """Price drops well below grid range → SELL signal emitted."""
+ strategy = _configured_strategy()
+ # Grid range is 48000-52000, exit_threshold_pct defaults to 5%
+ # Lower bound = 48000 * 0.95 = 45600
+ # Establish a zone first
+ strategy.on_candle(make_candle(50000))
+ # Price drops far below the grid range
+ signal = strategy.on_candle(make_candle(45000))
+ assert signal is not None
+ assert signal.side == OrderSide.SELL
+ assert "broke out of range" in signal.reason
+
+
+def test_grid_no_signal_while_out_of_range():
+ """After exit signal, no more grid signals until price returns to range."""
+ strategy = _configured_strategy()
+ # Establish a zone
+ strategy.on_candle(make_candle(50000))
+ # First out-of-range candle → SELL exit signal
+ signal = strategy.on_candle(make_candle(45000))
+ assert signal is not None
+ assert signal.side == OrderSide.SELL
+
+ # Subsequent out-of-range candles → no signals
+ signal = strategy.on_candle(make_candle(44000))
+ assert signal is None
+
+ signal = strategy.on_candle(make_candle(43000))
+ assert signal is None
+
+ # Price returns to grid range → grid signals resume
+ strategy.on_candle(make_candle(50000))
+ signal = strategy.on_candle(make_candle(48100))
+ assert signal is not None
+ assert signal.side == OrderSide.BUY