"""Trend indicators: EMA, SMA, MACD, ADX.""" import pandas as pd import numpy as np def sma(series: pd.Series, period: int) -> pd.Series: """Simple Moving Average.""" return series.rolling(window=period).mean() def ema(series: pd.Series, period: int) -> pd.Series: """Exponential Moving Average.""" return series.ewm(span=period, adjust=False).mean() def macd( closes: pd.Series, fast: int = 12, slow: int = 26, signal: int = 9, ) -> tuple[pd.Series, pd.Series, pd.Series]: """MACD indicator. Returns: (macd_line, signal_line, histogram) """ fast_ema = ema(closes, fast) slow_ema = ema(closes, slow) macd_line = fast_ema - slow_ema signal_line = ema(macd_line, signal) histogram = macd_line - signal_line return macd_line, signal_line, histogram def adx( highs: pd.Series, lows: pd.Series, closes: pd.Series, period: int = 14, ) -> pd.Series: """Average Directional Index (ADX). Returns ADX series. Values > 25 indicate strong trend, < 20 indicate ranging. """ high = highs.values low = lows.values close = closes.values n = len(close) if n < period + 1: return pd.Series([np.nan] * n) # True Range tr = np.zeros(n) tr[0] = high[0] - low[0] for i in range(1, n): tr[i] = max( high[i] - low[i], abs(high[i] - close[i - 1]), abs(low[i] - close[i - 1]), ) # Directional Movement plus_dm = np.zeros(n) minus_dm = np.zeros(n) for i in range(1, n): up = high[i] - high[i - 1] down = low[i - 1] - low[i] plus_dm[i] = up if up > down and up > 0 else 0.0 minus_dm[i] = down if down > up and down > 0 else 0.0 # Smoothed with Wilder's method atr_vals = np.zeros(n) plus_di = np.zeros(n) minus_di = np.zeros(n) atr_vals[period] = np.mean(tr[1 : period + 1]) plus_smooth = np.mean(plus_dm[1 : period + 1]) minus_smooth = np.mean(minus_dm[1 : period + 1]) if atr_vals[period] > 0: plus_di[period] = 100 * plus_smooth / atr_vals[period] minus_di[period] = 100 * minus_smooth / atr_vals[period] for i in range(period + 1, n): atr_vals[i] = (atr_vals[i - 1] * (period - 1) + tr[i]) / period plus_smooth = (plus_smooth * (period - 1) + plus_dm[i]) / period minus_smooth = (minus_smooth * (period - 1) + minus_dm[i]) / period if atr_vals[i] > 0: plus_di[i] = 100 * plus_smooth / atr_vals[i] minus_di[i] = 100 * minus_smooth / atr_vals[i] # DX and ADX dx = np.zeros(n) for i in range(period, n): di_sum = plus_di[i] + minus_di[i] dx[i] = 100 * abs(plus_di[i] - minus_di[i]) / di_sum if di_sum > 0 else 0.0 adx_vals = np.full(n, np.nan) if 2 * period < n: adx_vals[2 * period] = np.mean(dx[period : 2 * period + 1]) for i in range(2 * period + 1, n): adx_vals[i] = (adx_vals[i - 1] * (period - 1) + dx[i]) / period return pd.Series(adx_vals, index=closes.index if hasattr(closes, 'index') else None)