diff options
| author | TheSiahxyz <164138827+TheSiahxyz@users.noreply.github.com> | 2026-04-01 18:40:32 +0900 |
|---|---|---|
| committer | TheSiahxyz <164138827+TheSiahxyz@users.noreply.github.com> | 2026-04-01 18:40:32 +0900 |
| commit | 0b0aace94fa633cd7a90c95ee89658167a8afd35 (patch) | |
| tree | 4f5dc36c301608ed3af4a1bba9b1a924ca99581c /services/strategy-engine/strategies | |
| parent | 8b0cf4e574390738ee33f7ff334dd5f5109b7819 (diff) | |
feat(strategy): add ADX regime filter, volume confirmation, and ATR stops to BaseStrategy
Diffstat (limited to 'services/strategy-engine/strategies')
9 files changed, 161 insertions, 3 deletions
diff --git a/services/strategy-engine/strategies/base.py b/services/strategy-engine/strategies/base.py index cf5e6e4..d5be675 100644 --- a/services/strategy-engine/strategies/base.py +++ b/services/strategy-engine/strategies/base.py @@ -1,10 +1,156 @@ from abc import ABC, abstractmethod +from collections import deque +from decimal import Decimal +from typing import Optional + +import pandas as pd + from shared.models import Candle, Signal +from strategies.indicators.trend import adx +from strategies.indicators.volatility import atr +from strategies.indicators.volume import volume_ratio class BaseStrategy(ABC): name: str = "base" + def __init__(self) -> None: + # Filter state — subclasses can enable by calling _init_filters() in their __init__ + self._filter_enabled: bool = False + self._highs: deque[float] = deque(maxlen=500) + self._lows: deque[float] = deque(maxlen=500) + self._closes_filter: deque[float] = deque(maxlen=500) + self._volumes: deque[float] = deque(maxlen=500) + # Filter config + self._adx_period: int = 14 + self._adx_threshold: float = 25.0 + self._require_trend: bool = True # True = trend-following, False = mean-reversion + self._volume_period: int = 20 + self._min_volume_ratio: float = 0.5 + self._atr_period: int = 14 + self._atr_stop_multiplier: float = 2.0 + self._atr_tp_multiplier: float = 3.0 + + def _init_filters( + self, + adx_period: int = 14, + adx_threshold: float = 25.0, + require_trend: bool = True, + volume_period: int = 20, + min_volume_ratio: float = 0.5, + atr_period: int = 14, + atr_stop_multiplier: float = 2.0, + atr_tp_multiplier: float = 3.0, + ) -> None: + """Enable filters. Call from subclass __init__ or configure().""" + self._filter_enabled = True + self._adx_period = adx_period + self._adx_threshold = adx_threshold + self._require_trend = require_trend + self._volume_period = volume_period + self._min_volume_ratio = min_volume_ratio + self._atr_period = atr_period + self._atr_stop_multiplier = atr_stop_multiplier + self._atr_tp_multiplier = atr_tp_multiplier + + def _update_filter_data(self, candle: Candle) -> None: + """Call at the start of on_candle() to track filter data.""" + if self._filter_enabled: + self._highs.append(float(candle.high)) + self._lows.append(float(candle.low)) + self._closes_filter.append(float(candle.close)) + self._volumes.append(float(candle.volume)) + + def _check_regime(self) -> bool: + """Check if current market regime matches strategy type. + + Returns True if signal should be allowed. + - require_trend=True (trend strategies): only trade when ADX > threshold + - require_trend=False (mean-reversion): only trade when ADX < threshold + """ + if not self._filter_enabled: + return True + if len(self._closes_filter) < self._adx_period * 2 + 1: + return True # Not enough data, allow by default + + highs = pd.Series(list(self._highs)) + lows = pd.Series(list(self._lows)) + closes = pd.Series(list(self._closes_filter)) + adx_value = adx(highs, lows, closes, self._adx_period).iloc[-1] + + if pd.isna(adx_value): + return True + + if self._require_trend: + return adx_value >= self._adx_threshold + return adx_value < self._adx_threshold + + def _check_volume(self) -> bool: + """Check if current volume is sufficient (above minimum ratio of average).""" + if not self._filter_enabled: + return True + if len(self._volumes) < self._volume_period + 1: + return True + + volumes = pd.Series(list(self._volumes)) + ratio = volume_ratio(volumes, self._volume_period).iloc[-1] + + if pd.isna(ratio): + return True + + return ratio >= self._min_volume_ratio + + def _calculate_atr_stops( + self, entry_price: Decimal, side: str + ) -> tuple[Optional[Decimal], Optional[Decimal]]: + """Calculate ATR-based stop-loss and take-profit. + + Returns (stop_loss, take_profit) as Decimal or (None, None) if not enough data. + """ + if not self._filter_enabled: + return None, None + if len(self._closes_filter) < self._atr_period + 1: + return None, None + + highs = pd.Series(list(self._highs)) + lows = pd.Series(list(self._lows)) + closes = pd.Series(list(self._closes_filter)) + atr_value = atr(highs, lows, closes, self._atr_period).iloc[-1] + + if pd.isna(atr_value) or atr_value == 0: + return None, None + + atr_dec = Decimal(str(atr_value)) + + if side == "BUY": + sl = entry_price - atr_dec * Decimal(str(self._atr_stop_multiplier)) + tp = entry_price + atr_dec * Decimal(str(self._atr_tp_multiplier)) + else: # SELL + sl = entry_price + atr_dec * Decimal(str(self._atr_stop_multiplier)) + tp = entry_price - atr_dec * Decimal(str(self._atr_tp_multiplier)) + + return sl, tp + + def _apply_filters(self, signal: Signal) -> Optional[Signal]: + """Apply all filters to a signal. Returns signal with SL/TP or None if filtered out.""" + if signal is None: + return None + + if not self._check_regime(): + return None + + if not self._check_volume(): + return None + + # Add ATR-based stops + sl, tp = self._calculate_atr_stops(signal.price, signal.side.value) + if sl is not None: + signal.stop_loss = sl + if tp is not None: + signal.take_profit = tp + + return signal + @property @abstractmethod def warmup_period(self) -> int: @@ -18,9 +164,13 @@ class BaseStrategy(ABC): def configure(self, params: dict) -> None: pass - def reset(self) -> None: - pass - def validate_params(self, params: dict) -> list[str]: """Validate parameters and return list of error messages. Empty = valid.""" return [] + + def reset(self) -> None: + if self._filter_enabled: + self._highs.clear() + self._lows.clear() + self._closes_filter.clear() + self._volumes.clear() diff --git a/services/strategy-engine/strategies/bollinger_strategy.py b/services/strategy-engine/strategies/bollinger_strategy.py index 4aceee4..1354182 100644 --- a/services/strategy-engine/strategies/bollinger_strategy.py +++ b/services/strategy-engine/strategies/bollinger_strategy.py @@ -11,6 +11,7 @@ class BollingerStrategy(BaseStrategy): name: str = "bollinger" def __init__(self) -> None: + super().__init__() self._closes: deque[float] = deque(maxlen=500) self._period: int = 20 self._num_std: float = 2.0 diff --git a/services/strategy-engine/strategies/combined_strategy.py b/services/strategy-engine/strategies/combined_strategy.py index 507ef5b..c70538d 100644 --- a/services/strategy-engine/strategies/combined_strategy.py +++ b/services/strategy-engine/strategies/combined_strategy.py @@ -16,6 +16,7 @@ class CombinedStrategy(BaseStrategy): name: str = "combined" def __init__(self) -> None: + super().__init__() self._strategies: list[tuple[BaseStrategy, float]] = [] # (strategy, weight) self._threshold: float = 0.5 self._quantity: Decimal = Decimal("0.01") diff --git a/services/strategy-engine/strategies/ema_crossover_strategy.py b/services/strategy-engine/strategies/ema_crossover_strategy.py index b0ccbbf..bc36f36 100644 --- a/services/strategy-engine/strategies/ema_crossover_strategy.py +++ b/services/strategy-engine/strategies/ema_crossover_strategy.py @@ -11,6 +11,7 @@ class EmaCrossoverStrategy(BaseStrategy): name: str = "ema_crossover" def __init__(self) -> None: + super().__init__() self._closes: deque[float] = deque(maxlen=500) self._short_period: int = 9 self._long_period: int = 21 diff --git a/services/strategy-engine/strategies/grid_strategy.py b/services/strategy-engine/strategies/grid_strategy.py index b65264c..1244eda 100644 --- a/services/strategy-engine/strategies/grid_strategy.py +++ b/services/strategy-engine/strategies/grid_strategy.py @@ -11,6 +11,7 @@ class GridStrategy(BaseStrategy): name: str = "grid" def __init__(self) -> None: + super().__init__() self._lower_price: float = 0.0 self._upper_price: float = 0.0 self._grid_count: int = 5 diff --git a/services/strategy-engine/strategies/macd_strategy.py b/services/strategy-engine/strategies/macd_strategy.py index e3bb35c..bf30ed3 100644 --- a/services/strategy-engine/strategies/macd_strategy.py +++ b/services/strategy-engine/strategies/macd_strategy.py @@ -11,6 +11,7 @@ class MacdStrategy(BaseStrategy): name: str = "macd" def __init__(self) -> None: + super().__init__() self._fast_period: int = 12 self._slow_period: int = 26 self._signal_period: int = 9 diff --git a/services/strategy-engine/strategies/rsi_strategy.py b/services/strategy-engine/strategies/rsi_strategy.py index 59946f4..490a8a9 100644 --- a/services/strategy-engine/strategies/rsi_strategy.py +++ b/services/strategy-engine/strategies/rsi_strategy.py @@ -28,6 +28,7 @@ class RsiStrategy(BaseStrategy): name: str = "rsi" def __init__(self) -> None: + super().__init__() self._closes: deque[float] = deque(maxlen=200) self._period: int = 14 self._oversold: float = 30.0 diff --git a/services/strategy-engine/strategies/volume_profile_strategy.py b/services/strategy-engine/strategies/volume_profile_strategy.py index b91e107..2cfa87a 100644 --- a/services/strategy-engine/strategies/volume_profile_strategy.py +++ b/services/strategy-engine/strategies/volume_profile_strategy.py @@ -11,6 +11,7 @@ class VolumeProfileStrategy(BaseStrategy): name: str = "volume_profile" def __init__(self) -> None: + super().__init__() self._lookback_period: int = 100 self._num_bins: int = 50 self._value_area_pct: float = 0.7 diff --git a/services/strategy-engine/strategies/vwap_strategy.py b/services/strategy-engine/strategies/vwap_strategy.py index f371c32..d220832 100644 --- a/services/strategy-engine/strategies/vwap_strategy.py +++ b/services/strategy-engine/strategies/vwap_strategy.py @@ -8,6 +8,7 @@ class VwapStrategy(BaseStrategy): name: str = "vwap" def __init__(self) -> None: + super().__init__() self._deviation_threshold: float = 0.002 self._quantity: Decimal = Decimal("0.01") self._cumulative_tp_vol: float = 0.0 |
