summaryrefslogtreecommitdiff
path: root/services/strategy-engine/strategies/indicators/trend.py
blob: 10b69fab7b2bd8c51866b0b87850dd56dd07961b (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
"""Trend indicators: EMA, SMA, MACD, ADX."""
import pandas as pd
import numpy as np


def sma(series: pd.Series, period: int) -> pd.Series:
    """Simple Moving Average.

    Each output value is the arithmetic mean of the trailing ``period``
    observations of ``series``. Positions that do not yet have a full
    window are NaN (pandas' default ``min_periods`` for a fixed window).
    """
    trailing_window = series.rolling(period)
    return trailing_window.mean()


def ema(series: pd.Series, period: int) -> pd.Series:
    """Exponential Moving Average.

    Uses the recursive form (``adjust=False``) with the smoothing factor
    derived from ``span=period``, i.e. ``alpha = 2 / (period + 1)``; the
    first output equals the first input value.
    """
    weighted = series.ewm(adjust=False, span=period)
    return weighted.mean()


def macd(
    closes: pd.Series,
    fast: int = 12,
    slow: int = 26,
    signal: int = 9,
) -> tuple[pd.Series, pd.Series, pd.Series]:
    """Moving Average Convergence/Divergence.

    The MACD line is the ``fast`` EMA of ``closes`` minus the ``slow`` EMA;
    the signal line is the ``signal``-period EMA of the MACD line; the
    histogram is the gap between the two.

    Returns: (macd_line, signal_line, histogram)
    """
    line = ema(closes, fast) - ema(closes, slow)
    trigger = ema(line, signal)
    return line, trigger, line - trigger


def adx(
    highs: pd.Series,
    lows: pd.Series,
    closes: pd.Series,
    period: int = 14,
) -> pd.Series:
    """Average Directional Index (ADX).

    Computes True Range and +/- Directional Movement per bar, smooths them
    with Wilder's running average, derives the directional indicators
    (+DI / -DI) and DX, and finally Wilder-smooths DX into ADX.

    Args:
        highs: Bar highs.
        lows: Bar lows.
        closes: Bar closes. When this is a ``pd.Series`` its index is
            reused for the result.
        period: Wilder smoothing length; must be >= 1.

    Returns:
        ADX series of the same length as ``closes``. The first
        ``2 * period`` positions are NaN (warm-up); if there are not more
        than ``2 * period`` bars the whole series is NaN. Values > 25
        indicate strong trend, < 20 indicate ranging.

    Raises:
        ValueError: If ``period`` is < 1 or the three inputs differ in length.
    """
    if period < 1:
        raise ValueError(f"period must be >= 1, got {period}")

    # Keep the caller's index on every return path so the result always
    # aligns with the price data it was computed from.
    index = closes.index if isinstance(closes, pd.Series) else None

    high = np.asarray(highs, dtype=float)
    low = np.asarray(lows, dtype=float)
    close = np.asarray(closes, dtype=float)
    n = len(close)
    if len(high) != n or len(low) != n:
        raise ValueError(
            "highs, lows and closes must have the same length "
            f"(got {len(high)}, {len(low)}, {n})"
        )

    adx_vals = np.full(n, np.nan)
    # The first ADX value lands at position 2 * period, so anything shorter
    # (including the n < period + 1 case) is warm-up only.
    if n <= 2 * period:
        return pd.Series(adx_vals, index=index)

    # True Range; position 0 has no previous close and is never read below.
    prev_close = close[:-1]
    tr = np.zeros(n)
    tr[1:] = np.maximum(
        np.maximum(high[1:] - low[1:], np.abs(high[1:] - prev_close)),
        np.abs(low[1:] - prev_close),
    )

    # Directional Movement: only the larger, strictly positive move counts.
    up = high[1:] - high[:-1]
    down = low[:-1] - low[1:]
    plus_dm = np.zeros(n)
    minus_dm = np.zeros(n)
    plus_dm[1:] = np.where((up > down) & (up > 0), up, 0.0)
    minus_dm[1:] = np.where((down > up) & (down > 0), down, 0.0)

    # Wilder smoothing, seeded with the plain mean of the first `period` bars.
    atr = np.mean(tr[1 : period + 1])
    plus_smooth = np.mean(plus_dm[1 : period + 1])
    minus_smooth = np.mean(minus_dm[1 : period + 1])

    dx = np.zeros(n)
    for i in range(period, n):
        if i > period:
            atr = (atr * (period - 1) + tr[i]) / period
            plus_smooth = (plus_smooth * (period - 1) + plus_dm[i]) / period
            minus_smooth = (minus_smooth * (period - 1) + minus_dm[i]) / period

        # A zero (or NaN) ATR means no usable range: treat both DIs as 0.
        if atr > 0:
            plus_di = 100 * plus_smooth / atr
            minus_di = 100 * minus_smooth / atr
        else:
            plus_di = minus_di = 0.0

        di_sum = plus_di + minus_di
        dx[i] = 100 * abs(plus_di - minus_di) / di_sum if di_sum > 0 else 0.0

    # NOTE(review): the seed averages period + 1 DX values (positions
    # period..2*period inclusive); some ADX references seed from exactly
    # `period` values. Kept as-is so existing outputs do not shift.
    adx_vals[2 * period] = np.mean(dx[period : 2 * period + 1])
    for i in range(2 * period + 1, n):
        adx_vals[i] = (adx_vals[i - 1] * (period - 1) + dx[i]) / period

    return pd.Series(adx_vals, index=index)