1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
|
"""Trend indicators: EMA, SMA, MACD, ADX."""
import pandas as pd
import numpy as np
def sma(series: pd.Series, period: int) -> pd.Series:
    """Return the simple moving average of ``series``.

    Each output value is the mean of a rolling window of ``period``
    observations, computed with ``Series.rolling(window=period).mean()``.
    """
    windowed = series.rolling(window=period)
    return windowed.mean()
def ema(series: pd.Series, period: int) -> pd.Series:
    """Return the exponential moving average of ``series``.

    Uses pandas' exponentially weighted window with ``span=period`` and
    ``adjust=False`` (the recursive form of the average).
    """
    smoother = series.ewm(span=period, adjust=False)
    return smoother.mean()
def macd(
    closes: pd.Series,
    fast: int = 12,
    slow: int = 26,
    signal: int = 9,
) -> tuple[pd.Series, pd.Series, pd.Series]:
    """Compute the MACD indicator for a series of closing prices.

    The MACD line is the ``fast`` EMA minus the ``slow`` EMA of ``closes``;
    the signal line is the ``signal``-period EMA of the MACD line; the
    histogram is the MACD line minus the signal line.

    Returns:
        ``(macd_line, signal_line, histogram)``
    """
    line = ema(closes, fast) - ema(closes, slow)
    trigger = ema(line, signal)
    return line, trigger, line - trigger
def adx(
    highs: pd.Series,
    lows: pd.Series,
    closes: pd.Series,
    period: int = 14,
) -> pd.Series:
    """Compute the Average Directional Index (ADX) using Wilder smoothing.

    Values above 25 indicate a strong trend; values below 20 indicate a
    ranging market.

    Args:
        highs: Bar highs.
        lows: Bar lows.
        closes: Bar closes. When it carries a pandas index, that index is
            reused for the returned series.
        period: Smoothing window; must be at least 1.

    Returns:
        A float series with one value per bar. The first ``2 * period``
        entries are NaN (warm-up). If fewer than ``period + 1`` bars are
        supplied, every entry is NaN.

    Raises:
        ValueError: If ``period`` is less than 1.
    """
    if period < 1:
        raise ValueError(f"period must be >= 1, got {period}")

    # Reuse the caller's index on every return path so the result stays
    # aligned with the input. ``list.index`` is a method, hence the
    # callable check for plain sequences.
    index = getattr(closes, "index", None)
    if callable(index):
        index = None

    high = np.asarray(highs, dtype=float)
    low = np.asarray(lows, dtype=float)
    close = np.asarray(closes, dtype=float)
    n = len(close)

    adx_vals = np.full(n, np.nan)
    if n < period + 1:
        return pd.Series(adx_vals, index=index)

    # True range: the widest of the bar's own range and its distance from
    # the previous close. The first bar has no previous close.
    prev_close = close[:-1]
    tr = np.empty(n)
    tr[0] = high[0] - low[0]
    tr[1:] = np.maximum(
        high[1:] - low[1:],
        np.maximum(np.abs(high[1:] - prev_close), np.abs(low[1:] - prev_close)),
    )

    # Directional movement: only the larger of the up-move / down-move
    # counts, and only when it is positive.
    up = high[1:] - high[:-1]
    down = low[:-1] - low[1:]
    plus_dm = np.zeros(n)
    minus_dm = np.zeros(n)
    plus_dm[1:] = np.where((up > down) & (up > 0), up, 0.0)
    minus_dm[1:] = np.where((down > up) & (down > 0), down, 0.0)

    # Wilder smoothing is recursive, so it stays an explicit loop. It is
    # seeded with a plain mean over bars 1..period.
    plus_di = np.zeros(n)
    minus_di = np.zeros(n)
    atr = tr[1 : period + 1].mean()
    plus_smooth = plus_dm[1 : period + 1].mean()
    minus_smooth = minus_dm[1 : period + 1].mean()
    for i in range(period, n):
        if i > period:
            atr = (atr * (period - 1) + tr[i]) / period
            plus_smooth = (plus_smooth * (period - 1) + plus_dm[i]) / period
            minus_smooth = (minus_smooth * (period - 1) + minus_dm[i]) / period
        if atr > 0:  # a flat (zero-range) market leaves both DIs at 0
            plus_di[i] = 100 * plus_smooth / atr
            minus_di[i] = 100 * minus_smooth / atr

    # DX is left at 0 wherever both DIs are 0, which avoids dividing by zero.
    di_sum = plus_di + minus_di
    dx = np.zeros(n)
    np.divide(100 * np.abs(plus_di - minus_di), di_sum, out=dx, where=di_sum > 0)

    # NOTE(review): the seed averages period + 1 DX values (indices
    # period..2*period inclusive); confirm this is intended rather than
    # the first `period` values.
    seed = 2 * period
    if seed < n:
        adx_vals[seed] = dx[period : seed + 1].mean()
        for i in range(seed + 1, n):
            adx_vals[i] = (adx_vals[i - 1] * (period - 1) + dx[i]) / period

    return pd.Series(adx_vals, index=index)
|