From 71e5942632a5a8c7cd555b2d52e5632a67186a8d Mon Sep 17 00:00:00 2001 From: TheSiahxyz <164138827+TheSiahxyz@users.noreply.github.com> Date: Thu, 2 Apr 2026 09:17:35 +0900 Subject: feat(strategy): add Grid trend guard and Bollinger squeeze detection --- .../strategies/bollinger_strategy.py | 61 +++++++++++++++--- .../strategy-engine/strategies/grid_strategy.py | 31 ++++++++++ .../tests/test_bollinger_strategy.py | 72 +++++++++++++++++++++- .../strategy-engine/tests/test_grid_strategy.py | 38 ++++++++++++ 4 files changed, 194 insertions(+), 8 deletions(-) (limited to 'services/strategy-engine') diff --git a/services/strategy-engine/strategies/bollinger_strategy.py b/services/strategy-engine/strategies/bollinger_strategy.py index e53ecaa..a195cb8 100644 --- a/services/strategy-engine/strategies/bollinger_strategy.py +++ b/services/strategy-engine/strategies/bollinger_strategy.py @@ -19,6 +19,9 @@ class BollingerStrategy(BaseStrategy): self._quantity: Decimal = Decimal("0.01") self._was_below_lower: bool = False self._was_above_upper: bool = False + self._squeeze_threshold: float = 0.01 # Bandwidth below this = squeeze + self._in_squeeze: bool = False + self._squeeze_bars: int = 0 # How many bars in squeeze @property def warmup_period(self) -> int: @@ -28,6 +31,7 @@ class BollingerStrategy(BaseStrategy): self._period = int(params.get("period", 20)) self._num_std = float(params.get("num_std", 2.0)) self._min_bandwidth = float(params.get("min_bandwidth", 0.02)) + self._squeeze_threshold = float(params.get("squeeze_threshold", 0.01)) self._quantity = Decimal(str(params.get("quantity", "0.01"))) if self._period < 2: @@ -46,9 +50,12 @@ class BollingerStrategy(BaseStrategy): ) def reset(self) -> None: + super().reset() self._closes.clear() self._was_below_lower = False self._was_above_upper = False + self._in_squeeze = False + self._squeeze_bars = 0 def _bollinger_conviction(self, price: float, band: float, sma: float) -> float: """Map distance from band to conviction (0.1-1.0). @@ -75,12 +82,52 @@ class BollingerStrategy(BaseStrategy): upper = sma + self._num_std * std lower = sma - self._num_std * std + price = float(candle.close) + + # %B calculation + bandwidth = (upper - lower) / sma if sma > 0 else 0 + pct_b = (price - lower) / (upper - lower) if (upper - lower) > 0 else 0.5 + + # Squeeze detection + if bandwidth < self._squeeze_threshold: + self._in_squeeze = True + self._squeeze_bars += 1 + return None # Don't trade during squeeze, wait for breakout + elif self._in_squeeze: + # Squeeze just ended — breakout! + self._in_squeeze = False + squeeze_duration = self._squeeze_bars + self._squeeze_bars = 0 + + if price > sma: + # Breakout upward + conv = min(0.5 + squeeze_duration * 0.1, 1.0) + return self._apply_filters(Signal( + strategy=self.name, + symbol=candle.symbol, + side=OrderSide.BUY, + price=candle.close, + quantity=self._quantity, + conviction=conv, + reason=f"Bollinger squeeze breakout UP after {squeeze_duration} bars", + )) + else: + # Breakout downward + conv = min(0.5 + squeeze_duration * 0.1, 1.0) + return self._apply_filters(Signal( + strategy=self.name, + symbol=candle.symbol, + side=OrderSide.SELL, + price=candle.close, + quantity=self._quantity, + conviction=conv, + reason=f"Bollinger squeeze breakout DOWN after {squeeze_duration} bars", + )) + # Bandwidth filter: skip sideways markets - if sma != 0 and (upper - lower) / sma < self._min_bandwidth: + if sma != 0 and bandwidth < self._min_bandwidth: return None - price = float(candle.close) - # Track band penetration if price < lower: self._was_below_lower = True @@ -90,14 +137,14 @@ class BollingerStrategy(BaseStrategy): # BUY: was below lower band and recovered back inside if self._was_below_lower and price >= lower: self._was_below_lower = False - conviction = self._bollinger_conviction(price, lower, sma) + conv = max(1.0 - pct_b, 0.3) # Closer to lower band = higher conviction signal = Signal( strategy=self.name, symbol=candle.symbol, side=OrderSide.BUY, price=candle.close, quantity=self._quantity, - conviction=conviction, + conviction=conv, reason=f"Price recovered above lower Bollinger Band ({lower:.2f})", ) return self._apply_filters(signal) @@ -105,14 +152,14 @@ class BollingerStrategy(BaseStrategy): # SELL: was above upper band and recovered back inside if self._was_above_upper and price <= upper: self._was_above_upper = False - conviction = self._bollinger_conviction(price, upper, sma) + conv = max(pct_b, 0.3) # Closer to upper band = higher conviction signal = Signal( strategy=self.name, symbol=candle.symbol, side=OrderSide.SELL, price=candle.close, quantity=self._quantity, - conviction=conviction, + conviction=conv, reason=f"Price recovered below upper Bollinger Band ({upper:.2f})", ) return self._apply_filters(signal) diff --git a/services/strategy-engine/strategies/grid_strategy.py b/services/strategy-engine/strategies/grid_strategy.py index 70443ec..07ccaba 100644 --- a/services/strategy-engine/strategies/grid_strategy.py +++ b/services/strategy-engine/strategies/grid_strategy.py @@ -18,6 +18,9 @@ class GridStrategy(BaseStrategy): self._quantity: Decimal = Decimal("0.01") self._grid_levels: list[float] = [] self._last_zone: Optional[int] = None + self._exit_threshold_pct: float = 5.0 + self._out_of_range: bool = False + self._in_position: bool = False # Track if we have any grid positions @property def warmup_period(self) -> int: @@ -29,11 +32,17 @@ class GridStrategy(BaseStrategy): self._grid_count = int(params.get("grid_count", 5)) self._quantity = Decimal(str(params.get("quantity", "0.01"))) + self._exit_threshold_pct = float(params.get("exit_threshold_pct", 5.0)) + if self._lower_price >= self._upper_price: raise ValueError( f"Grid lower_price must be < upper_price, " f"got lower={self._lower_price}, upper={self._upper_price}" ) + if self._exit_threshold_pct <= 0: + raise ValueError( + f"exit_threshold_pct must be > 0, got {self._exit_threshold_pct}" + ) if self._grid_count < 2: raise ValueError(f"Grid grid_count must be >= 2, got {self._grid_count}") if self._quantity <= 0: @@ -53,7 +62,9 @@ class GridStrategy(BaseStrategy): ) def reset(self) -> None: + super().reset() self._last_zone = None + self._out_of_range = False def _get_zone(self, price: float) -> int: """Return the grid zone index for a given price. @@ -69,6 +80,26 @@ class GridStrategy(BaseStrategy): def on_candle(self, candle: Candle) -> Signal | None: self._update_filter_data(candle) price = float(candle.close) + + # Check if price is out of grid range + if self._grid_levels: + lower_bound = self._grid_levels[0] * (1 - self._exit_threshold_pct / 100) + upper_bound = self._grid_levels[-1] * (1 + self._exit_threshold_pct / 100) + + if price < lower_bound or price > upper_bound: + if not self._out_of_range: + self._out_of_range = True + # Exit signal — close positions + return self._apply_filters(Signal( + strategy=self.name, symbol=candle.symbol, + side=OrderSide.SELL, price=candle.close, + quantity=self._quantity, conviction=0.8, + reason=f"Grid: price {price:.2f} broke out of range [{self._grid_levels[0]:.2f}, {self._grid_levels[-1]:.2f}]", + )) + return None # Already out of range, no more signals + else: + self._out_of_range = False + current_zone = self._get_zone(price) if self._last_zone is None: diff --git a/services/strategy-engine/tests/test_bollinger_strategy.py b/services/strategy-engine/tests/test_bollinger_strategy.py index 348a9e0..473d9b4 100644 --- a/services/strategy-engine/tests/test_bollinger_strategy.py +++ b/services/strategy-engine/tests/test_bollinger_strategy.py @@ -23,7 +23,7 @@ def make_candle(close: float) -> Candle: def _make_strategy() -> BollingerStrategy: s = BollingerStrategy() - s.configure({"period": 5, "num_std": 1.0, "min_bandwidth": 0.0}) + s.configure({"period": 5, "num_std": 1.0, "min_bandwidth": 0.0, "squeeze_threshold": 0.0}) return s @@ -99,3 +99,73 @@ def test_bollinger_reset_clears_state(): assert len(strategy._closes) == 1 assert strategy._was_below_lower is False assert strategy._was_above_upper is False + assert strategy._in_squeeze is False + assert strategy._squeeze_bars == 0 + + +def test_bollinger_squeeze_detection(): + """Tight bandwidth → no signal during squeeze.""" + # Use a strategy with a high squeeze threshold so constant prices trigger squeeze + s = BollingerStrategy() + s.configure({ + "period": 5, + "num_std": 2.0, + "min_bandwidth": 0.0, + "squeeze_threshold": 0.5, # Very high threshold to ensure squeeze triggers + }) + + # Feed identical prices → bandwidth = 0 (below any threshold) + for _ in range(6): + result = s.on_candle(make_candle(100.0)) + + # With identical prices, std=0, bandwidth=0 < 0.5 → squeeze, no signal + assert s._in_squeeze is True + assert result is None + + +def test_bollinger_squeeze_breakout_buy(): + """Squeeze ends with price above SMA → BUY signal.""" + s = BollingerStrategy() + s.configure({ + "period": 5, + "num_std": 1.0, + "min_bandwidth": 0.0, + "squeeze_threshold": 0.01, + }) + + # Feed identical prices to create a squeeze (bandwidth = 0) + for _ in range(6): + s.on_candle(make_candle(100.0)) + + assert s._in_squeeze is True + + # Now feed a price that creates enough spread to exit squeeze AND is above SMA + signal = s.on_candle(make_candle(120.0)) + assert signal is not None + assert signal.side == OrderSide.BUY + assert "squeeze breakout UP" in signal.reason + + +def test_bollinger_pct_b_conviction(): + """Signals near band extremes have higher conviction via %B.""" + s = BollingerStrategy() + s.configure({ + "period": 5, + "num_std": 1.0, + "min_bandwidth": 0.0, + "squeeze_threshold": 0.0, # Disable squeeze for this test + }) + + # Build up with stable prices + for _ in range(5): + s.on_candle(make_candle(100.0)) + + # Drop below lower band + s.on_candle(make_candle(50.0)) + + # Recover just at the lower band edge — %B close to 0 → high conviction + signal = s.on_candle(make_candle(100.0)) + assert signal is not None + assert signal.side == OrderSide.BUY + # conviction = max(1.0 - pct_b, 0.3), with pct_b near lower → conviction should be >= 0.3 + assert signal.conviction >= 0.3 diff --git a/services/strategy-engine/tests/test_grid_strategy.py b/services/strategy-engine/tests/test_grid_strategy.py index 79eb22a..9823f98 100644 --- a/services/strategy-engine/tests/test_grid_strategy.py +++ b/services/strategy-engine/tests/test_grid_strategy.py @@ -60,3 +60,41 @@ def test_grid_strategy_no_signal_in_same_zone(): strategy.on_candle(make_candle(50000)) signal = strategy.on_candle(make_candle(50100)) assert signal is None + + +def test_grid_exit_on_trend_break(): + """Price drops well below grid range → SELL signal emitted.""" + strategy = _configured_strategy() + # Grid range is 48000-52000, exit_threshold_pct defaults to 5% + # Lower bound = 48000 * 0.95 = 45600 + # Establish a zone first + strategy.on_candle(make_candle(50000)) + # Price drops far below the grid range + signal = strategy.on_candle(make_candle(45000)) + assert signal is not None + assert signal.side == OrderSide.SELL + assert "broke out of range" in signal.reason + + +def test_grid_no_signal_while_out_of_range(): + """After exit signal, no more grid signals until price returns to range.""" + strategy = _configured_strategy() + # Establish a zone + strategy.on_candle(make_candle(50000)) + # First out-of-range candle → SELL exit signal + signal = strategy.on_candle(make_candle(45000)) + assert signal is not None + assert signal.side == OrderSide.SELL + + # Subsequent out-of-range candles → no signals + signal = strategy.on_candle(make_candle(44000)) + assert signal is None + + signal = strategy.on_candle(make_candle(43000)) + assert signal is None + + # Price returns to grid range → grid signals resume + strategy.on_candle(make_candle(50000)) + signal = strategy.on_candle(make_candle(48100)) + assert signal is not None + assert signal.side == OrderSide.BUY -- cgit v1.2.3