From 71e5942632a5a8c7cd555b2d52e5632a67186a8d Mon Sep 17 00:00:00 2001 From: TheSiahxyz <164138827+TheSiahxyz@users.noreply.github.com> Date: Thu, 2 Apr 2026 09:17:35 +0900 Subject: feat(strategy): add Grid trend guard and Bollinger squeeze detection --- .../strategies/bollinger_strategy.py | 61 +++++++++++++++++++--- .../strategy-engine/strategies/grid_strategy.py | 31 +++++++++++ 2 files changed, 85 insertions(+), 7 deletions(-) (limited to 'services/strategy-engine/strategies') diff --git a/services/strategy-engine/strategies/bollinger_strategy.py b/services/strategy-engine/strategies/bollinger_strategy.py index e53ecaa..a195cb8 100644 --- a/services/strategy-engine/strategies/bollinger_strategy.py +++ b/services/strategy-engine/strategies/bollinger_strategy.py @@ -19,6 +19,9 @@ class BollingerStrategy(BaseStrategy): self._quantity: Decimal = Decimal("0.01") self._was_below_lower: bool = False self._was_above_upper: bool = False + self._squeeze_threshold: float = 0.01 # Bandwidth below this = squeeze + self._in_squeeze: bool = False + self._squeeze_bars: int = 0 # How many bars in squeeze @property def warmup_period(self) -> int: @@ -28,6 +31,7 @@ class BollingerStrategy(BaseStrategy): self._period = int(params.get("period", 20)) self._num_std = float(params.get("num_std", 2.0)) self._min_bandwidth = float(params.get("min_bandwidth", 0.02)) + self._squeeze_threshold = float(params.get("squeeze_threshold", 0.01)) self._quantity = Decimal(str(params.get("quantity", "0.01"))) if self._period < 2: @@ -46,9 +50,12 @@ class BollingerStrategy(BaseStrategy): ) def reset(self) -> None: + super().reset() self._closes.clear() self._was_below_lower = False self._was_above_upper = False + self._in_squeeze = False + self._squeeze_bars = 0 def _bollinger_conviction(self, price: float, band: float, sma: float) -> float: """Map distance from band to conviction (0.1-1.0). @@ -75,12 +82,52 @@ class BollingerStrategy(BaseStrategy): upper = sma + self._num_std * std lower = sma - self._num_std * std + price = float(candle.close) + + # %B calculation + bandwidth = (upper - lower) / sma if sma > 0 else 0 + pct_b = (price - lower) / (upper - lower) if (upper - lower) > 0 else 0.5 + + # Squeeze detection + if bandwidth < self._squeeze_threshold: + self._in_squeeze = True + self._squeeze_bars += 1 + return None # Don't trade during squeeze, wait for breakout + elif self._in_squeeze: + # Squeeze just ended — breakout! + self._in_squeeze = False + squeeze_duration = self._squeeze_bars + self._squeeze_bars = 0 + + if price > sma: + # Breakout upward + conv = min(0.5 + squeeze_duration * 0.1, 1.0) + return self._apply_filters(Signal( + strategy=self.name, + symbol=candle.symbol, + side=OrderSide.BUY, + price=candle.close, + quantity=self._quantity, + conviction=conv, + reason=f"Bollinger squeeze breakout UP after {squeeze_duration} bars", + )) + else: + # Breakout downward + conv = min(0.5 + squeeze_duration * 0.1, 1.0) + return self._apply_filters(Signal( + strategy=self.name, + symbol=candle.symbol, + side=OrderSide.SELL, + price=candle.close, + quantity=self._quantity, + conviction=conv, + reason=f"Bollinger squeeze breakout DOWN after {squeeze_duration} bars", + )) + # Bandwidth filter: skip sideways markets - if sma != 0 and (upper - lower) / sma < self._min_bandwidth: + if sma != 0 and bandwidth < self._min_bandwidth: return None - price = float(candle.close) - # Track band penetration if price < lower: self._was_below_lower = True @@ -90,14 +137,14 @@ class BollingerStrategy(BaseStrategy): # BUY: was below lower band and recovered back inside if self._was_below_lower and price >= lower: self._was_below_lower = False - conviction = self._bollinger_conviction(price, lower, sma) + conv = max(1.0 - pct_b, 0.3) # Closer to lower band = higher conviction signal = Signal( strategy=self.name, symbol=candle.symbol, side=OrderSide.BUY, price=candle.close, quantity=self._quantity, - conviction=conviction, + conviction=conv, reason=f"Price recovered above lower Bollinger Band ({lower:.2f})", ) return self._apply_filters(signal) @@ -105,14 +152,14 @@ class BollingerStrategy(BaseStrategy): # SELL: was above upper band and recovered back inside if self._was_above_upper and price <= upper: self._was_above_upper = False - conviction = self._bollinger_conviction(price, upper, sma) + conv = max(pct_b, 0.3) # Closer to upper band = higher conviction signal = Signal( strategy=self.name, symbol=candle.symbol, side=OrderSide.SELL, price=candle.close, quantity=self._quantity, - conviction=conviction, + conviction=conv, reason=f"Price recovered below upper Bollinger Band ({upper:.2f})", ) return self._apply_filters(signal) diff --git a/services/strategy-engine/strategies/grid_strategy.py b/services/strategy-engine/strategies/grid_strategy.py index 70443ec..07ccaba 100644 --- a/services/strategy-engine/strategies/grid_strategy.py +++ b/services/strategy-engine/strategies/grid_strategy.py @@ -18,6 +18,9 @@ class GridStrategy(BaseStrategy): self._quantity: Decimal = Decimal("0.01") self._grid_levels: list[float] = [] self._last_zone: Optional[int] = None + self._exit_threshold_pct: float = 5.0 + self._out_of_range: bool = False + self._in_position: bool = False # Track if we have any grid positions @property def warmup_period(self) -> int: @@ -29,11 +32,17 @@ class GridStrategy(BaseStrategy): self._grid_count = int(params.get("grid_count", 5)) self._quantity = Decimal(str(params.get("quantity", "0.01"))) + self._exit_threshold_pct = float(params.get("exit_threshold_pct", 5.0)) + if self._lower_price >= self._upper_price: raise ValueError( f"Grid lower_price must be < upper_price, " f"got lower={self._lower_price}, upper={self._upper_price}" ) + if self._exit_threshold_pct <= 0: + raise ValueError( + f"exit_threshold_pct must be > 0, got {self._exit_threshold_pct}" + ) if self._grid_count < 2: raise ValueError(f"Grid grid_count must be >= 2, got {self._grid_count}") if self._quantity <= 0: @@ -53,7 +62,9 @@ class GridStrategy(BaseStrategy): ) def reset(self) -> None: + super().reset() self._last_zone = None + self._out_of_range = False def _get_zone(self, price: float) -> int: """Return the grid zone index for a given price. @@ -69,6 +80,26 @@ class GridStrategy(BaseStrategy): def on_candle(self, candle: Candle) -> Signal | None: self._update_filter_data(candle) price = float(candle.close) + + # Check if price is out of grid range + if self._grid_levels: + lower_bound = self._grid_levels[0] * (1 - self._exit_threshold_pct / 100) + upper_bound = self._grid_levels[-1] * (1 + self._exit_threshold_pct / 100) + + if price < lower_bound or price > upper_bound: + if not self._out_of_range: + self._out_of_range = True + # Exit signal — close positions + return self._apply_filters(Signal( + strategy=self.name, symbol=candle.symbol, + side=OrderSide.SELL, price=candle.close, + quantity=self._quantity, conviction=0.8, + reason=f"Grid: price {price:.2f} broke out of range [{self._grid_levels[0]:.2f}, {self._grid_levels[-1]:.2f}]", + )) + return None # Already out of range, no more signals + else: + self._out_of_range = False + current_zone = self._get_zone(price) if self._last_zone is None: -- cgit v1.2.3