diff options
Diffstat (limited to 'services/strategy-engine/strategies/indicators/trend.py')
| -rw-r--r-- | services/strategy-engine/strategies/indicators/trend.py | 104 |
1 files changed, 104 insertions, 0 deletions
diff --git a/services/strategy-engine/strategies/indicators/trend.py b/services/strategy-engine/strategies/indicators/trend.py new file mode 100644 index 0000000..10b69fa --- /dev/null +++ b/services/strategy-engine/strategies/indicators/trend.py @@ -0,0 +1,104 @@ +"""Trend indicators: EMA, SMA, MACD, ADX.""" +import pandas as pd +import numpy as np + + +def sma(series: pd.Series, period: int) -> pd.Series: + """Simple Moving Average.""" + return series.rolling(window=period).mean() + + +def ema(series: pd.Series, period: int) -> pd.Series: + """Exponential Moving Average.""" + return series.ewm(span=period, adjust=False).mean() + + +def macd( + closes: pd.Series, + fast: int = 12, + slow: int = 26, + signal: int = 9, +) -> tuple[pd.Series, pd.Series, pd.Series]: + """MACD indicator. + + Returns: (macd_line, signal_line, histogram) + """ + fast_ema = ema(closes, fast) + slow_ema = ema(closes, slow) + macd_line = fast_ema - slow_ema + signal_line = ema(macd_line, signal) + histogram = macd_line - signal_line + return macd_line, signal_line, histogram + + +def adx( + highs: pd.Series, + lows: pd.Series, + closes: pd.Series, + period: int = 14, +) -> pd.Series: + """Average Directional Index (ADX). + + Returns ADX series. Values > 25 indicate strong trend, < 20 indicate ranging. + """ + high = highs.values + low = lows.values + close = closes.values + n = len(close) + + if n < period + 1: + return pd.Series([np.nan] * n) + + # True Range + tr = np.zeros(n) + tr[0] = high[0] - low[0] + for i in range(1, n): + tr[i] = max( + high[i] - low[i], + abs(high[i] - close[i - 1]), + abs(low[i] - close[i - 1]), + ) + + # Directional Movement + plus_dm = np.zeros(n) + minus_dm = np.zeros(n) + for i in range(1, n): + up = high[i] - high[i - 1] + down = low[i - 1] - low[i] + plus_dm[i] = up if up > down and up > 0 else 0.0 + minus_dm[i] = down if down > up and down > 0 else 0.0 + + # Smoothed with Wilder's method + atr_vals = np.zeros(n) + plus_di = np.zeros(n) + minus_di = np.zeros(n) + + atr_vals[period] = np.mean(tr[1 : period + 1]) + plus_smooth = np.mean(plus_dm[1 : period + 1]) + minus_smooth = np.mean(minus_dm[1 : period + 1]) + + if atr_vals[period] > 0: + plus_di[period] = 100 * plus_smooth / atr_vals[period] + minus_di[period] = 100 * minus_smooth / atr_vals[period] + + for i in range(period + 1, n): + atr_vals[i] = (atr_vals[i - 1] * (period - 1) + tr[i]) / period + plus_smooth = (plus_smooth * (period - 1) + plus_dm[i]) / period + minus_smooth = (minus_smooth * (period - 1) + minus_dm[i]) / period + if atr_vals[i] > 0: + plus_di[i] = 100 * plus_smooth / atr_vals[i] + minus_di[i] = 100 * minus_smooth / atr_vals[i] + + # DX and ADX + dx = np.zeros(n) + for i in range(period, n): + di_sum = plus_di[i] + minus_di[i] + dx[i] = 100 * abs(plus_di[i] - minus_di[i]) / di_sum if di_sum > 0 else 0.0 + + adx_vals = np.full(n, np.nan) + if 2 * period < n: + adx_vals[2 * period] = np.mean(dx[period : 2 * period + 1]) + for i in range(2 * period + 1, n): + adx_vals[i] = (adx_vals[i - 1] * (period - 1) + dx[i]) / period + + return pd.Series(adx_vals, index=closes.index if hasattr(closes, 'index') else None) |
