summaryrefslogtreecommitdiff
path: root/services/strategy-engine
diff options
context:
space:
mode:
Diffstat (limited to 'services/strategy-engine')
-rw-r--r--services/strategy-engine/src/strategy_engine/config.py2
-rw-r--r--services/strategy-engine/src/strategy_engine/main.py1
-rw-r--r--services/strategy-engine/strategies/bollinger_strategy.py65
-rw-r--r--services/strategy-engine/strategies/combined_strategy.py45
-rw-r--r--services/strategy-engine/strategies/config/grid_strategy.yaml6
-rw-r--r--services/strategy-engine/strategies/config/moc_strategy.yaml13
-rw-r--r--services/strategy-engine/strategies/ema_crossover_strategy.py82
-rw-r--r--services/strategy-engine/strategies/grid_strategy.py34
-rw-r--r--services/strategy-engine/strategies/indicators/__init__.py17
-rw-r--r--services/strategy-engine/strategies/indicators/momentum.py1
-rw-r--r--services/strategy-engine/strategies/indicators/trend.py3
-rw-r--r--services/strategy-engine/strategies/indicators/volatility.py4
-rw-r--r--services/strategy-engine/strategies/indicators/volume.py1
-rw-r--r--services/strategy-engine/strategies/macd_strategy.py50
-rw-r--r--services/strategy-engine/strategies/moc_strategy.py230
-rw-r--r--services/strategy-engine/strategies/rsi_strategy.py86
-rw-r--r--services/strategy-engine/strategies/volume_profile_strategy.py52
-rw-r--r--services/strategy-engine/strategies/vwap_strategy.py49
-rw-r--r--services/strategy-engine/tests/test_base_filters.py21
-rw-r--r--services/strategy-engine/tests/test_bollinger_strategy.py80
-rw-r--r--services/strategy-engine/tests/test_combined_strategy.py59
-rw-r--r--services/strategy-engine/tests/test_ema_crossover_strategy.py122
-rw-r--r--services/strategy-engine/tests/test_engine.py10
-rw-r--r--services/strategy-engine/tests/test_grid_strategy.py40
-rw-r--r--services/strategy-engine/tests/test_indicators.py2
-rw-r--r--services/strategy-engine/tests/test_macd_strategy.py62
-rw-r--r--services/strategy-engine/tests/test_moc_strategy.py152
-rw-r--r--services/strategy-engine/tests/test_multi_symbol.py24
-rw-r--r--services/strategy-engine/tests/test_rsi_strategy.py59
-rw-r--r--services/strategy-engine/tests/test_volume_profile_strategy.py55
-rw-r--r--services/strategy-engine/tests/test_vwap_strategy.py50
31 files changed, 1395 insertions, 82 deletions
diff --git a/services/strategy-engine/src/strategy_engine/config.py b/services/strategy-engine/src/strategy_engine/config.py
index e3a49c2..9fd9c49 100644
--- a/services/strategy-engine/src/strategy_engine/config.py
+++ b/services/strategy-engine/src/strategy_engine/config.py
@@ -4,6 +4,6 @@ from shared.config import Settings
class StrategyConfig(Settings):
- symbols: list[str] = ["BTC/USDT"]
+ symbols: list[str] = ["AAPL", "MSFT", "GOOGL", "AMZN", "TSLA"]
timeframes: list[str] = ["1m"]
strategy_params: dict = {}
diff --git a/services/strategy-engine/src/strategy_engine/main.py b/services/strategy-engine/src/strategy_engine/main.py
index 4549f70..30de528 100644
--- a/services/strategy-engine/src/strategy_engine/main.py
+++ b/services/strategy-engine/src/strategy_engine/main.py
@@ -67,7 +67,6 @@ async def run() -> None:
task = asyncio.create_task(process_symbol(engine, stream, log))
tasks.append(task)
- # Wait for all symbol processors (they run forever until cancelled)
await asyncio.gather(*tasks)
except Exception as exc:
log.error("fatal_error", error=str(exc))
diff --git a/services/strategy-engine/strategies/bollinger_strategy.py b/services/strategy-engine/strategies/bollinger_strategy.py
index e53ecaa..ebe7967 100644
--- a/services/strategy-engine/strategies/bollinger_strategy.py
+++ b/services/strategy-engine/strategies/bollinger_strategy.py
@@ -19,6 +19,9 @@ class BollingerStrategy(BaseStrategy):
self._quantity: Decimal = Decimal("0.01")
self._was_below_lower: bool = False
self._was_above_upper: bool = False
+ self._squeeze_threshold: float = 0.01 # Bandwidth below this = squeeze
+ self._in_squeeze: bool = False
+ self._squeeze_bars: int = 0 # How many bars in squeeze
@property
def warmup_period(self) -> int:
@@ -28,6 +31,7 @@ class BollingerStrategy(BaseStrategy):
self._period = int(params.get("period", 20))
self._num_std = float(params.get("num_std", 2.0))
self._min_bandwidth = float(params.get("min_bandwidth", 0.02))
+ self._squeeze_threshold = float(params.get("squeeze_threshold", 0.01))
self._quantity = Decimal(str(params.get("quantity", "0.01")))
if self._period < 2:
@@ -46,9 +50,12 @@ class BollingerStrategy(BaseStrategy):
)
def reset(self) -> None:
+ super().reset()
self._closes.clear()
self._was_below_lower = False
self._was_above_upper = False
+ self._in_squeeze = False
+ self._squeeze_bars = 0
def _bollinger_conviction(self, price: float, band: float, sma: float) -> float:
"""Map distance from band to conviction (0.1-1.0).
@@ -75,12 +82,56 @@ class BollingerStrategy(BaseStrategy):
upper = sma + self._num_std * std
lower = sma - self._num_std * std
+ price = float(candle.close)
+
+ # %B calculation
+ bandwidth = (upper - lower) / sma if sma > 0 else 0
+ pct_b = (price - lower) / (upper - lower) if (upper - lower) > 0 else 0.5
+
+ # Squeeze detection
+ if bandwidth < self._squeeze_threshold:
+ self._in_squeeze = True
+ self._squeeze_bars += 1
+ return None # Don't trade during squeeze, wait for breakout
+ elif self._in_squeeze:
+ # Squeeze just ended — breakout!
+ self._in_squeeze = False
+ squeeze_duration = self._squeeze_bars
+ self._squeeze_bars = 0
+
+ if price > sma:
+ # Breakout upward
+ conv = min(0.5 + squeeze_duration * 0.1, 1.0)
+ return self._apply_filters(
+ Signal(
+ strategy=self.name,
+ symbol=candle.symbol,
+ side=OrderSide.BUY,
+ price=candle.close,
+ quantity=self._quantity,
+ conviction=conv,
+ reason=f"Bollinger squeeze breakout UP after {squeeze_duration} bars",
+ )
+ )
+ else:
+ # Breakout downward
+ conv = min(0.5 + squeeze_duration * 0.1, 1.0)
+ return self._apply_filters(
+ Signal(
+ strategy=self.name,
+ symbol=candle.symbol,
+ side=OrderSide.SELL,
+ price=candle.close,
+ quantity=self._quantity,
+ conviction=conv,
+ reason=f"Bollinger squeeze breakout DOWN after {squeeze_duration} bars",
+ )
+ )
+
# Bandwidth filter: skip sideways markets
- if sma != 0 and (upper - lower) / sma < self._min_bandwidth:
+ if sma != 0 and bandwidth < self._min_bandwidth:
return None
- price = float(candle.close)
-
# Track band penetration
if price < lower:
self._was_below_lower = True
@@ -90,14 +141,14 @@ class BollingerStrategy(BaseStrategy):
# BUY: was below lower band and recovered back inside
if self._was_below_lower and price >= lower:
self._was_below_lower = False
- conviction = self._bollinger_conviction(price, lower, sma)
+ conv = max(1.0 - pct_b, 0.3) # Closer to lower band = higher conviction
signal = Signal(
strategy=self.name,
symbol=candle.symbol,
side=OrderSide.BUY,
price=candle.close,
quantity=self._quantity,
- conviction=conviction,
+ conviction=conv,
reason=f"Price recovered above lower Bollinger Band ({lower:.2f})",
)
return self._apply_filters(signal)
@@ -105,14 +156,14 @@ class BollingerStrategy(BaseStrategy):
# SELL: was above upper band and recovered back inside
if self._was_above_upper and price <= upper:
self._was_above_upper = False
- conviction = self._bollinger_conviction(price, upper, sma)
+ conv = max(pct_b, 0.3) # Closer to upper band = higher conviction
signal = Signal(
strategy=self.name,
symbol=candle.symbol,
side=OrderSide.SELL,
price=candle.close,
quantity=self._quantity,
- conviction=conviction,
+ conviction=conv,
reason=f"Price recovered below upper Bollinger Band ({upper:.2f})",
)
return self._apply_filters(signal)
diff --git a/services/strategy-engine/strategies/combined_strategy.py b/services/strategy-engine/strategies/combined_strategy.py
index be1cbed..ba92485 100644
--- a/services/strategy-engine/strategies/combined_strategy.py
+++ b/services/strategy-engine/strategies/combined_strategy.py
@@ -20,6 +20,9 @@ class CombinedStrategy(BaseStrategy):
self._strategies: list[tuple[BaseStrategy, float]] = [] # (strategy, weight)
self._threshold: float = 0.5
self._quantity: Decimal = Decimal("0.01")
+ self._trade_history: dict[str, list[bool]] = {} # strategy_name -> [win, loss, ...]
+ self._adaptive_weights: bool = False
+ self._history_window: int = 20 # Last N signals to evaluate
@property
def warmup_period(self) -> int:
@@ -30,6 +33,8 @@ class CombinedStrategy(BaseStrategy):
def configure(self, params: dict) -> None:
self._threshold = float(params.get("threshold", 0.5))
self._quantity = Decimal(str(params.get("quantity", "0.01")))
+ self._adaptive_weights = bool(params.get("adaptive_weights", False))
+ self._history_window = int(params.get("history_window", 20))
if self._threshold <= 0:
raise ValueError(f"Threshold must be positive, got {self._threshold}")
if self._quantity <= 0:
@@ -41,6 +46,31 @@ class CombinedStrategy(BaseStrategy):
raise ValueError(f"Weight must be positive, got {weight}")
self._strategies.append((strategy, weight))
+ def record_result(self, strategy_name: str, is_win: bool) -> None:
+ """Record a trade result for adaptive weighting."""
+ if strategy_name not in self._trade_history:
+ self._trade_history[strategy_name] = []
+ self._trade_history[strategy_name].append(is_win)
+ # Keep only last N results
+ if len(self._trade_history[strategy_name]) > self._history_window:
+ self._trade_history[strategy_name] = self._trade_history[strategy_name][
+ -self._history_window :
+ ]
+
+ def _get_adaptive_weight(self, strategy_name: str, base_weight: float) -> float:
+ """Get weight adjusted by recent performance."""
+ if not self._adaptive_weights:
+ return base_weight
+
+ history = self._trade_history.get(strategy_name, [])
+ if len(history) < 5: # Not enough data, use base weight
+ return base_weight
+
+ win_rate = sum(1 for w in history if w) / len(history)
+ # Scale weight: 0.5x at 20% win rate, 1.0x at 50%, 1.5x at 80%
+ scale = 0.5 + win_rate # Range: 0.5 to 1.5
+ return base_weight * scale
+
def reset(self) -> None:
for strategy, _ in self._strategies:
strategy.reset()
@@ -49,7 +79,7 @@ class CombinedStrategy(BaseStrategy):
if not self._strategies:
return None
- total_weight = sum(w for _, w in self._strategies)
+ total_weight = sum(self._get_adaptive_weight(s.name, w) for s, w in self._strategies)
if total_weight == 0:
return None
@@ -59,12 +89,17 @@ class CombinedStrategy(BaseStrategy):
for strategy, weight in self._strategies:
signal = strategy.on_candle(candle)
if signal is not None:
+ effective_weight = self._get_adaptive_weight(strategy.name, weight)
if signal.side == OrderSide.BUY:
- score += weight * signal.conviction
- reasons.append(f"{strategy.name}:BUY({weight}*{signal.conviction:.2f})")
+ score += effective_weight * signal.conviction
+ reasons.append(
+ f"{strategy.name}:BUY({effective_weight}*{signal.conviction:.2f})"
+ )
elif signal.side == OrderSide.SELL:
- score -= weight * signal.conviction
- reasons.append(f"{strategy.name}:SELL({weight}*{signal.conviction:.2f})")
+ score -= effective_weight * signal.conviction
+ reasons.append(
+ f"{strategy.name}:SELL({effective_weight}*{signal.conviction:.2f})"
+ )
normalized = score / total_weight # Range: -1.0 to 1.0
diff --git a/services/strategy-engine/strategies/config/grid_strategy.yaml b/services/strategy-engine/strategies/config/grid_strategy.yaml
index 607f3df..338bb4c 100644
--- a/services/strategy-engine/strategies/config/grid_strategy.yaml
+++ b/services/strategy-engine/strategies/config/grid_strategy.yaml
@@ -1,4 +1,4 @@
-lower_price: 60000
-upper_price: 70000
+lower_price: 170
+upper_price: 190
grid_count: 5
-quantity: "0.01"
+quantity: "1"
diff --git a/services/strategy-engine/strategies/config/moc_strategy.yaml b/services/strategy-engine/strategies/config/moc_strategy.yaml
new file mode 100644
index 0000000..349ae1b
--- /dev/null
+++ b/services/strategy-engine/strategies/config/moc_strategy.yaml
@@ -0,0 +1,13 @@
+# Market on Close (MOC) Strategy — US Stocks
+quantity_pct: 0.2 # 20% of capital per position
+stop_loss_pct: 2.0 # -2% stop loss
+rsi_min: 30 # RSI lower bound
+rsi_max: 60 # RSI upper bound (not overbought)
+ema_period: 20 # EMA for trend confirmation
+volume_avg_period: 20 # Volume average lookback
+min_volume_ratio: 1.0 # Volume must be >= average
+buy_start_utc: 19 # Buy window start (15:00 ET summer)
+buy_end_utc: 21 # Buy window end (16:00 ET)
+sell_start_utc: 13 # Sell window start (9:00 ET)
+sell_end_utc: 15 # Sell window end (10:00 ET)
+max_positions: 5 # Max simultaneous positions
diff --git a/services/strategy-engine/strategies/ema_crossover_strategy.py b/services/strategy-engine/strategies/ema_crossover_strategy.py
index a812eff..68d0ba3 100644
--- a/services/strategy-engine/strategies/ema_crossover_strategy.py
+++ b/services/strategy-engine/strategies/ema_crossover_strategy.py
@@ -17,6 +17,9 @@ class EmaCrossoverStrategy(BaseStrategy):
self._long_period: int = 21
self._quantity: Decimal = Decimal("0.01")
self._prev_short_above: bool | None = None
+ self._pending_signal: str | None = None # "BUY" or "SELL" if waiting for pullback
+ self._pullback_enabled: bool = True
+ self._pullback_tolerance: float = 0.002 # 0.2% tolerance around short EMA
@property
def warmup_period(self) -> int:
@@ -27,6 +30,9 @@ class EmaCrossoverStrategy(BaseStrategy):
self._long_period = int(params.get("long_period", 21))
self._quantity = Decimal(str(params.get("quantity", "0.01")))
+ self._pullback_enabled = bool(params.get("pullback_enabled", True))
+ self._pullback_tolerance = float(params.get("pullback_tolerance", 0.002))
+
if self._short_period >= self._long_period:
raise ValueError(
f"EMA short_period must be < long_period, "
@@ -48,8 +54,10 @@ class EmaCrossoverStrategy(BaseStrategy):
)
def reset(self) -> None:
+ super().reset()
self._closes.clear()
self._prev_short_above = None
+ self._pending_signal = None
def _ema_conviction(self, short_ema: float, long_ema: float, price: float) -> float:
"""Map EMA gap to conviction (0.1-1.0). Larger gap = stronger crossover."""
@@ -70,33 +78,87 @@ class EmaCrossoverStrategy(BaseStrategy):
short_ema = series.ewm(span=self._short_period, adjust=False).mean().iloc[-1]
long_ema = series.ewm(span=self._long_period, adjust=False).mean().iloc[-1]
+ close = float(candle.close)
short_above = short_ema > long_ema
signal = None
if self._prev_short_above is not None:
- conviction = self._ema_conviction(short_ema, long_ema, float(candle.close))
- if not self._prev_short_above and short_above:
+ prev = self._prev_short_above
+ conviction = self._ema_conviction(short_ema, long_ema, close)
+
+ # Golden Cross detected
+ if not prev and short_above:
+ if self._pullback_enabled:
+ self._pending_signal = "BUY"
+ # Don't signal yet — wait for pullback
+ else:
+ signal = Signal(
+ strategy=self.name,
+ symbol=candle.symbol,
+ side=OrderSide.BUY,
+ price=candle.close,
+ quantity=self._quantity,
+ conviction=conviction,
+ reason=f"Golden Cross: short EMA ({short_ema:.2f}) crossed above long EMA ({long_ema:.2f})",
+ )
+
+ # Death Cross detected
+ elif prev and not short_above:
+ if self._pullback_enabled:
+ self._pending_signal = "SELL"
+ else:
+ signal = Signal(
+ strategy=self.name,
+ symbol=candle.symbol,
+ side=OrderSide.SELL,
+ price=candle.close,
+ quantity=self._quantity,
+ conviction=conviction,
+ reason=f"Death Cross: short EMA ({short_ema:.2f}) crossed below long EMA ({long_ema:.2f})",
+ )
+
+ self._prev_short_above = short_above
+
+ if signal is not None:
+ return self._apply_filters(signal)
+
+ # Check for pullback entry
+ if self._pending_signal == "BUY":
+ distance = abs(close - short_ema) / short_ema if short_ema > 0 else 999
+ if distance <= self._pullback_tolerance:
+ self._pending_signal = None
+ conv = min(0.5 + (1.0 - distance / self._pullback_tolerance) * 0.5, 1.0)
signal = Signal(
strategy=self.name,
symbol=candle.symbol,
side=OrderSide.BUY,
price=candle.close,
quantity=self._quantity,
- conviction=conviction,
- reason=f"Golden Cross: short EMA ({short_ema:.2f}) crossed above long EMA ({long_ema:.2f})",
+ conviction=conv,
+ reason=f"EMA Golden Cross pullback entry (distance={distance:.4f})",
)
- elif self._prev_short_above and not short_above:
+ return self._apply_filters(signal)
+ # Cancel if crossover reverses
+ if not short_above:
+ self._pending_signal = None
+
+ if self._pending_signal == "SELL":
+ distance = abs(close - short_ema) / short_ema if short_ema > 0 else 999
+ if distance <= self._pullback_tolerance:
+ self._pending_signal = None
+ conv = min(0.5 + (1.0 - distance / self._pullback_tolerance) * 0.5, 1.0)
signal = Signal(
strategy=self.name,
symbol=candle.symbol,
side=OrderSide.SELL,
price=candle.close,
quantity=self._quantity,
- conviction=conviction,
- reason=f"Death Cross: short EMA ({short_ema:.2f}) crossed below long EMA ({long_ema:.2f})",
+ conviction=conv,
+ reason=f"EMA Death Cross pullback entry (distance={distance:.4f})",
)
+ return self._apply_filters(signal)
+ # Cancel if crossover reverses
+ if short_above:
+ self._pending_signal = None
- self._prev_short_above = short_above
- if signal is not None:
- return self._apply_filters(signal)
return None
diff --git a/services/strategy-engine/strategies/grid_strategy.py b/services/strategy-engine/strategies/grid_strategy.py
index 70443ec..283bfe5 100644
--- a/services/strategy-engine/strategies/grid_strategy.py
+++ b/services/strategy-engine/strategies/grid_strategy.py
@@ -18,6 +18,9 @@ class GridStrategy(BaseStrategy):
self._quantity: Decimal = Decimal("0.01")
self._grid_levels: list[float] = []
self._last_zone: Optional[int] = None
+ self._exit_threshold_pct: float = 5.0
+ self._out_of_range: bool = False
+ self._in_position: bool = False # Track if we have any grid positions
@property
def warmup_period(self) -> int:
@@ -29,11 +32,15 @@ class GridStrategy(BaseStrategy):
self._grid_count = int(params.get("grid_count", 5))
self._quantity = Decimal(str(params.get("quantity", "0.01")))
+ self._exit_threshold_pct = float(params.get("exit_threshold_pct", 5.0))
+
if self._lower_price >= self._upper_price:
raise ValueError(
f"Grid lower_price must be < upper_price, "
f"got lower={self._lower_price}, upper={self._upper_price}"
)
+ if self._exit_threshold_pct <= 0:
+ raise ValueError(f"exit_threshold_pct must be > 0, got {self._exit_threshold_pct}")
if self._grid_count < 2:
raise ValueError(f"Grid grid_count must be >= 2, got {self._grid_count}")
if self._quantity <= 0:
@@ -53,7 +60,9 @@ class GridStrategy(BaseStrategy):
)
def reset(self) -> None:
+ super().reset()
self._last_zone = None
+ self._out_of_range = False
def _get_zone(self, price: float) -> int:
"""Return the grid zone index for a given price.
@@ -69,6 +78,31 @@ class GridStrategy(BaseStrategy):
def on_candle(self, candle: Candle) -> Signal | None:
self._update_filter_data(candle)
price = float(candle.close)
+
+ # Check if price is out of grid range
+ if self._grid_levels:
+ lower_bound = self._grid_levels[0] * (1 - self._exit_threshold_pct / 100)
+ upper_bound = self._grid_levels[-1] * (1 + self._exit_threshold_pct / 100)
+
+ if price < lower_bound or price > upper_bound:
+ if not self._out_of_range:
+ self._out_of_range = True
+ # Exit signal — close positions
+ return self._apply_filters(
+ Signal(
+ strategy=self.name,
+ symbol=candle.symbol,
+ side=OrderSide.SELL,
+ price=candle.close,
+ quantity=self._quantity,
+ conviction=0.8,
+ reason=f"Grid: price {price:.2f} broke out of range [{self._grid_levels[0]:.2f}, {self._grid_levels[-1]:.2f}]",
+ )
+ )
+ return None # Already out of range, no more signals
+ else:
+ self._out_of_range = False
+
current_zone = self._get_zone(price)
if self._last_zone is None:
diff --git a/services/strategy-engine/strategies/indicators/__init__.py b/services/strategy-engine/strategies/indicators/__init__.py
index 1a54d59..3c713e6 100644
--- a/services/strategy-engine/strategies/indicators/__init__.py
+++ b/services/strategy-engine/strategies/indicators/__init__.py
@@ -1,12 +1,21 @@
"""Reusable technical indicator functions."""
+
from strategies.indicators.trend import ema, sma, macd, adx
from strategies.indicators.volatility import atr, bollinger_bands, keltner_channels
from strategies.indicators.momentum import rsi, stochastic
from strategies.indicators.volume import volume_sma, volume_ratio, obv
__all__ = [
- "ema", "sma", "macd", "adx",
- "atr", "bollinger_bands", "keltner_channels",
- "rsi", "stochastic",
- "volume_sma", "volume_ratio", "obv",
+ "ema",
+ "sma",
+ "macd",
+ "adx",
+ "atr",
+ "bollinger_bands",
+ "keltner_channels",
+ "rsi",
+ "stochastic",
+ "volume_sma",
+ "volume_ratio",
+ "obv",
]
diff --git a/services/strategy-engine/strategies/indicators/momentum.py b/services/strategy-engine/strategies/indicators/momentum.py
index 395c52d..c479452 100644
--- a/services/strategy-engine/strategies/indicators/momentum.py
+++ b/services/strategy-engine/strategies/indicators/momentum.py
@@ -1,4 +1,5 @@
"""Momentum indicators: RSI, Stochastic."""
+
import pandas as pd
import numpy as np
diff --git a/services/strategy-engine/strategies/indicators/trend.py b/services/strategy-engine/strategies/indicators/trend.py
index 10b69fa..c94a071 100644
--- a/services/strategy-engine/strategies/indicators/trend.py
+++ b/services/strategy-engine/strategies/indicators/trend.py
@@ -1,4 +1,5 @@
"""Trend indicators: EMA, SMA, MACD, ADX."""
+
import pandas as pd
import numpy as np
@@ -101,4 +102,4 @@ def adx(
for i in range(2 * period + 1, n):
adx_vals[i] = (adx_vals[i - 1] * (period - 1) + dx[i]) / period
- return pd.Series(adx_vals, index=closes.index if hasattr(closes, 'index') else None)
+ return pd.Series(adx_vals, index=closes.index if hasattr(closes, "index") else None)
diff --git a/services/strategy-engine/strategies/indicators/volatility.py b/services/strategy-engine/strategies/indicators/volatility.py
index d47eb86..c16143e 100644
--- a/services/strategy-engine/strategies/indicators/volatility.py
+++ b/services/strategy-engine/strategies/indicators/volatility.py
@@ -1,4 +1,5 @@
"""Volatility indicators: ATR, Bollinger Bands, Keltner Channels."""
+
import pandas as pd
import numpy as np
@@ -30,7 +31,7 @@ def atr(
for i in range(period, n):
atr_vals[i] = (atr_vals[i - 1] * (period - 1) + tr[i]) / period
- return pd.Series(atr_vals, index=closes.index if hasattr(closes, 'index') else None)
+ return pd.Series(atr_vals, index=closes.index if hasattr(closes, "index") else None)
def bollinger_bands(
@@ -62,6 +63,7 @@ def keltner_channels(
Returns: (upper_channel, middle_ema, lower_channel)
"""
from strategies.indicators.trend import ema as calc_ema
+
middle = calc_ema(closes, ema_period)
atr_vals = atr(highs, lows, closes, atr_period)
upper = middle + atr_multiplier * atr_vals
diff --git a/services/strategy-engine/strategies/indicators/volume.py b/services/strategy-engine/strategies/indicators/volume.py
index 323d427..502f1ce 100644
--- a/services/strategy-engine/strategies/indicators/volume.py
+++ b/services/strategy-engine/strategies/indicators/volume.py
@@ -1,4 +1,5 @@
"""Volume indicators: Volume SMA, Volume Ratio, OBV."""
+
import pandas as pd
import numpy as np
diff --git a/services/strategy-engine/strategies/macd_strategy.py b/services/strategy-engine/strategies/macd_strategy.py
index 67c5e44..356a42b 100644
--- a/services/strategy-engine/strategies/macd_strategy.py
+++ b/services/strategy-engine/strategies/macd_strategy.py
@@ -18,6 +18,8 @@ class MacdStrategy(BaseStrategy):
self._quantity: Decimal = Decimal("0.01")
self._closes: deque[float] = deque(maxlen=500)
self._prev_histogram: float | None = None
+ self._prev_macd: float | None = None
+ self._prev_signal: float | None = None
@property
def warmup_period(self) -> int:
@@ -54,6 +56,8 @@ class MacdStrategy(BaseStrategy):
def reset(self) -> None:
self._closes.clear()
self._prev_histogram = None
+ self._prev_macd = None
+ self._prev_signal = None
def _macd_conviction(self, histogram_value: float, price: float) -> float:
"""Map histogram magnitude to conviction (0.1-1.0).
@@ -81,13 +85,45 @@ class MacdStrategy(BaseStrategy):
histogram = macd_line - signal_line
current_histogram = float(histogram.iloc[-1])
- signal = None
+ macd_val = float(macd_line.iloc[-1])
+ signal_val = float(signal_line.iloc[-1])
+ result_signal = None
+
+ # Signal-line crossover detection (MACD crosses signal line directly)
+ if self._prev_macd is not None and self._prev_signal is not None:
+ # Bullish: MACD crosses above signal
+ if self._prev_macd <= self._prev_signal and macd_val > signal_val:
+ distance_from_zero = abs(macd_val) / float(candle.close) * 1000
+ conv = min(max(distance_from_zero, 0.3), 1.0)
+ result_signal = Signal(
+ strategy=self.name,
+ symbol=candle.symbol,
+ side=OrderSide.BUY,
+ price=candle.close,
+ quantity=self._quantity,
+ conviction=conv,
+ reason="MACD signal-line bullish crossover",
+ )
+ # Bearish: MACD crosses below signal
+ elif self._prev_macd >= self._prev_signal and macd_val < signal_val:
+ distance_from_zero = abs(macd_val) / float(candle.close) * 1000
+ conv = min(max(distance_from_zero, 0.3), 1.0)
+ result_signal = Signal(
+ strategy=self.name,
+ symbol=candle.symbol,
+ side=OrderSide.SELL,
+ price=candle.close,
+ quantity=self._quantity,
+ conviction=conv,
+ reason="MACD signal-line bearish crossover",
+ )
- if self._prev_histogram is not None:
+ # Histogram crossover detection (existing logic, as secondary signal)
+ if result_signal is None and self._prev_histogram is not None:
conviction = self._macd_conviction(current_histogram, float(candle.close))
# Bullish crossover: histogram crosses from negative to positive
if self._prev_histogram <= 0 and current_histogram > 0:
- signal = Signal(
+ result_signal = Signal(
strategy=self.name,
symbol=candle.symbol,
side=OrderSide.BUY,
@@ -98,7 +134,7 @@ class MacdStrategy(BaseStrategy):
)
# Bearish crossover: histogram crosses from positive to negative
elif self._prev_histogram >= 0 and current_histogram < 0:
- signal = Signal(
+ result_signal = Signal(
strategy=self.name,
symbol=candle.symbol,
side=OrderSide.SELL,
@@ -109,6 +145,8 @@ class MacdStrategy(BaseStrategy):
)
self._prev_histogram = current_histogram
- if signal is not None:
- return self._apply_filters(signal)
+ self._prev_macd = macd_val
+ self._prev_signal = signal_val
+ if result_signal is not None:
+ return self._apply_filters(result_signal)
return None
diff --git a/services/strategy-engine/strategies/moc_strategy.py b/services/strategy-engine/strategies/moc_strategy.py
new file mode 100644
index 0000000..7eaa59e
--- /dev/null
+++ b/services/strategy-engine/strategies/moc_strategy.py
@@ -0,0 +1,230 @@
+"""Market on Close (MOC) Strategy — US Stock 종가매매.
+
+Rules:
+- Buy: 15:50-16:00 ET (market close) when screening criteria met
+- Sell: 9:35-10:00 ET (market open next day)
+- Screening: bullish candle, volume above average, RSI 30-60, positive momentum
+- Risk: -2% stop loss, max 5 positions, 20% of capital per position
+"""
+
+from collections import deque
+from decimal import Decimal
+from datetime import datetime
+
+import pandas as pd
+
+from shared.models import Candle, Signal, OrderSide
+from strategies.base import BaseStrategy
+
+
+class MocStrategy(BaseStrategy):
+ """Market on Close strategy for overnight gap trading."""
+
+ name: str = "moc"
+
+ def __init__(self) -> None:
+ super().__init__()
+ # Parameters
+ self._quantity_pct: float = 0.2 # 20% of capital per trade
+ self._stop_loss_pct: float = 2.0
+ self._rsi_min: float = 30.0
+ self._rsi_max: float = 60.0
+ self._ema_period: int = 20
+ self._volume_avg_period: int = 20
+ self._min_volume_ratio: float = 1.0 # Volume must be above average
+ # Session times (UTC hours)
+ self._buy_start_utc: int = 19 # 15:00 ET = 19:00 UTC (summer) / 20:00 UTC (winter)
+ self._buy_end_utc: int = 21 # 16:00 ET = 20:00 UTC / 21:00 UTC
+ self._sell_start_utc: int = 13 # 9:00 ET = 13:00 UTC / 14:00 UTC
+ self._sell_end_utc: int = 15 # 10:00 ET = 14:00 UTC / 15:00 UTC
+ self._max_positions: int = 5
+ # State
+ self._closes: deque[float] = deque(maxlen=200)
+ self._volumes: deque[float] = deque(maxlen=200)
+ self._highs: deque[float] = deque(maxlen=200)
+ self._lows: deque[float] = deque(maxlen=200)
+ self._in_position: bool = False
+ self._entry_price: float = 0.0
+ self._today: str | None = None
+ self._bought_today: bool = False
+ self._sold_today: bool = False
+
+ @property
+ def warmup_period(self) -> int:
+ return max(self._ema_period, self._volume_avg_period) + 1
+
+ def configure(self, params: dict) -> None:
+ self._quantity_pct = float(params.get("quantity_pct", 0.2))
+ self._stop_loss_pct = float(params.get("stop_loss_pct", 2.0))
+ self._rsi_min = float(params.get("rsi_min", 30.0))
+ self._rsi_max = float(params.get("rsi_max", 60.0))
+ self._ema_period = int(params.get("ema_period", 20))
+ self._volume_avg_period = int(params.get("volume_avg_period", 20))
+ self._min_volume_ratio = float(params.get("min_volume_ratio", 1.0))
+ self._buy_start_utc = int(params.get("buy_start_utc", 19))
+ self._buy_end_utc = int(params.get("buy_end_utc", 21))
+ self._sell_start_utc = int(params.get("sell_start_utc", 13))
+ self._sell_end_utc = int(params.get("sell_end_utc", 15))
+ self._max_positions = int(params.get("max_positions", 5))
+
+ if self._quantity_pct <= 0 or self._quantity_pct > 1:
+ raise ValueError(f"quantity_pct must be 0-1, got {self._quantity_pct}")
+ if self._stop_loss_pct <= 0:
+ raise ValueError(f"stop_loss_pct must be positive, got {self._stop_loss_pct}")
+
+ def reset(self) -> None:
+ super().reset()
+ self._closes.clear()
+ self._volumes.clear()
+ self._highs.clear()
+ self._lows.clear()
+ self._in_position = False
+ self._entry_price = 0.0
+ self._today = None
+ self._bought_today = False
+ self._sold_today = False
+
+ def _is_buy_window(self, dt: datetime) -> bool:
+ """Check if in buy window (near market close)."""
+ hour = dt.hour
+ return self._buy_start_utc <= hour < self._buy_end_utc
+
+ def _is_sell_window(self, dt: datetime) -> bool:
+ """Check if in sell window (near market open)."""
+ hour = dt.hour
+ return self._sell_start_utc <= hour < self._sell_end_utc
+
+ def _compute_rsi(self, period: int = 14) -> float | None:
+ if len(self._closes) < period + 1:
+ return None
+ series = pd.Series(list(self._closes))
+ delta = series.diff()
+ gain = delta.clip(lower=0)
+ loss = -delta.clip(upper=0)
+ avg_gain = gain.ewm(com=period - 1, min_periods=period).mean()
+ avg_loss = loss.ewm(com=period - 1, min_periods=period).mean()
+ rs = avg_gain / avg_loss.replace(0, float("nan"))
+ rsi = 100 - (100 / (1 + rs))
+ val = rsi.iloc[-1]
+ return None if pd.isna(val) else float(val)
+
+ def _is_bullish_candle(self, candle: Candle) -> bool:
+ return float(candle.close) > float(candle.open)
+
+ def _price_above_ema(self) -> bool:
+ if len(self._closes) < self._ema_period:
+ return True
+ series = pd.Series(list(self._closes))
+ ema = series.ewm(span=self._ema_period, adjust=False).mean().iloc[-1]
+ return self._closes[-1] >= ema
+
+ def _volume_above_average(self) -> bool:
+ if len(self._volumes) < self._volume_avg_period:
+ return True
+ avg = sum(list(self._volumes)[-self._volume_avg_period :]) / self._volume_avg_period
+ return avg > 0 and self._volumes[-1] / avg >= self._min_volume_ratio
+
+ def _positive_momentum(self) -> bool:
+ """Check if price has positive short-term momentum (close > close 5 bars ago)."""
+ if len(self._closes) < 6:
+ return True
+ return self._closes[-1] > self._closes[-6]
+
+ def on_candle(self, candle: Candle) -> Signal | None:
+ self._update_filter_data(candle)
+
+ close = float(candle.close)
+ self._closes.append(close)
+ self._volumes.append(float(candle.volume))
+ self._highs.append(float(candle.high))
+ self._lows.append(float(candle.low))
+
+ # Daily reset
+ day = candle.open_time.strftime("%Y-%m-%d")
+ if self._today != day:
+ self._today = day
+ self._bought_today = False
+ self._sold_today = False
+
+ # --- SELL LOGIC (market open next day) ---
+ if self._in_position and self._is_sell_window(candle.open_time):
+ if not self._sold_today:
+ pnl_pct = (close - self._entry_price) / self._entry_price * 100
+ self._in_position = False
+ self._sold_today = True
+
+ conv = 0.8 if pnl_pct > 0 else 0.5
+ return self._apply_filters(
+ Signal(
+ strategy=self.name,
+ symbol=candle.symbol,
+ side=OrderSide.SELL,
+ price=candle.close,
+ quantity=Decimal(str(self._quantity_pct)),
+ conviction=conv,
+ reason=f"MOC sell at open, PnL {pnl_pct:.2f}%",
+ )
+ )
+
+ # --- STOP LOSS ---
+ if self._in_position:
+ pnl_pct = (close - self._entry_price) / self._entry_price * 100
+ if pnl_pct <= -self._stop_loss_pct:
+ self._in_position = False
+ return self._apply_filters(
+ Signal(
+ strategy=self.name,
+ symbol=candle.symbol,
+ side=OrderSide.SELL,
+ price=candle.close,
+ quantity=Decimal(str(self._quantity_pct)),
+ conviction=1.0,
+ stop_loss=candle.close,
+ reason=f"MOC stop loss {pnl_pct:.2f}% <= -{self._stop_loss_pct}%",
+ )
+ )
+
+ # --- BUY LOGIC (near market close) ---
+ if not self._in_position and self._is_buy_window(candle.open_time):
+ if self._bought_today:
+ return None
+
+ # Screening criteria
+ rsi = self._compute_rsi()
+ if rsi is None:
+ return None
+
+ checks = [
+ self._rsi_min <= rsi <= self._rsi_max, # RSI in sweet spot
+ self._is_bullish_candle(candle), # Bullish candle
+ self._price_above_ema(), # Above EMA (uptrend)
+ self._volume_above_average(), # Volume confirmation
+ self._positive_momentum(), # Short-term momentum
+ ]
+
+ if all(checks):
+ self._in_position = True
+ self._entry_price = close
+ self._bought_today = True
+
+ # Conviction based on RSI position within range
+ rsi_range = self._rsi_max - self._rsi_min
+ rsi_pos = (rsi - self._rsi_min) / rsi_range if rsi_range > 0 else 0.5
+ conv = 0.5 + (1.0 - rsi_pos) * 0.4 # Lower RSI = higher conviction
+
+ sl = candle.close * (1 - Decimal(str(self._stop_loss_pct / 100)))
+
+ return self._apply_filters(
+ Signal(
+ strategy=self.name,
+ symbol=candle.symbol,
+ side=OrderSide.BUY,
+ price=candle.close,
+ quantity=Decimal(str(self._quantity_pct)),
+ conviction=conv,
+ stop_loss=sl,
+ reason=f"MOC buy: RSI={rsi:.1f}, bullish candle, above EMA, vol OK",
+ )
+ )
+
+ return None
diff --git a/services/strategy-engine/strategies/rsi_strategy.py b/services/strategy-engine/strategies/rsi_strategy.py
index 0ec6780..0646d8c 100644
--- a/services/strategy-engine/strategies/rsi_strategy.py
+++ b/services/strategy-engine/strategies/rsi_strategy.py
@@ -34,6 +34,14 @@ class RsiStrategy(BaseStrategy):
self._oversold: float = 30.0
self._overbought: float = 70.0
self._quantity: Decimal = Decimal("0.01")
+ # Divergence detection state
+ self._price_lows: deque[float] = deque(maxlen=5)
+ self._price_highs: deque[float] = deque(maxlen=5)
+ self._rsi_at_lows: deque[float] = deque(maxlen=5)
+ self._rsi_at_highs: deque[float] = deque(maxlen=5)
+ self._prev_close: float | None = None
+ self._prev_prev_close: float | None = None
+ self._prev_rsi: float | None = None
@property
def warmup_period(self) -> int:
@@ -65,6 +73,13 @@ class RsiStrategy(BaseStrategy):
def reset(self) -> None:
self._closes.clear()
+ self._price_lows.clear()
+ self._price_highs.clear()
+ self._rsi_at_lows.clear()
+ self._rsi_at_highs.clear()
+ self._prev_close = None
+ self._prev_prev_close = None
+ self._prev_rsi = None
def _rsi_conviction(self, rsi_value: float) -> float:
"""Map RSI value to conviction strength (0.0-1.0).
@@ -86,14 +101,76 @@ class RsiStrategy(BaseStrategy):
self._closes.append(float(candle.close))
if len(self._closes) < self._period + 1:
+ self._prev_prev_close = self._prev_close
+ self._prev_close = float(candle.close)
return None
series = pd.Series(list(self._closes))
rsi_value = _compute_rsi(series, self._period)
if rsi_value is None:
+ self._prev_prev_close = self._prev_close
+ self._prev_close = float(candle.close)
return None
+ close = float(candle.close)
+
+ # Detect swing points for divergence
+ if self._prev_close is not None and self._prev_prev_close is not None:
+ # Swing low: prev_close < both neighbors
+ if self._prev_close < self._prev_prev_close and self._prev_close < close:
+ self._price_lows.append(self._prev_close)
+ self._rsi_at_lows.append(
+ self._prev_rsi if self._prev_rsi is not None else rsi_value
+ )
+ # Swing high: prev_close > both neighbors
+ if self._prev_close > self._prev_prev_close and self._prev_close > close:
+ self._price_highs.append(self._prev_close)
+ self._rsi_at_highs.append(
+ self._prev_rsi if self._prev_rsi is not None else rsi_value
+ )
+
+ # Check bullish divergence: price lower low, RSI higher low
+ if len(self._price_lows) >= 2:
+ if (
+ self._price_lows[-1] < self._price_lows[-2]
+ and self._rsi_at_lows[-1] > self._rsi_at_lows[-2]
+ ):
+ signal = Signal(
+ strategy=self.name,
+ symbol=candle.symbol,
+ side=OrderSide.BUY,
+ price=candle.close,
+ quantity=self._quantity,
+ conviction=0.9,
+ reason="RSI bullish divergence",
+ )
+ self._prev_rsi = rsi_value
+ self._prev_prev_close = self._prev_close
+ self._prev_close = close
+ return self._apply_filters(signal)
+
+ # Check bearish divergence: price higher high, RSI lower high
+ if len(self._price_highs) >= 2:
+ if (
+ self._price_highs[-1] > self._price_highs[-2]
+ and self._rsi_at_highs[-1] < self._rsi_at_highs[-2]
+ ):
+ signal = Signal(
+ strategy=self.name,
+ symbol=candle.symbol,
+ side=OrderSide.SELL,
+ price=candle.close,
+ quantity=self._quantity,
+ conviction=0.9,
+ reason="RSI bearish divergence",
+ )
+ self._prev_rsi = rsi_value
+ self._prev_prev_close = self._prev_close
+ self._prev_close = close
+ return self._apply_filters(signal)
+
+ # Existing oversold/overbought logic (secondary signals)
if rsi_value < self._oversold:
signal = Signal(
strategy=self.name,
@@ -104,6 +181,9 @@ class RsiStrategy(BaseStrategy):
conviction=self._rsi_conviction(rsi_value),
reason=f"RSI {rsi_value:.2f} below oversold threshold {self._oversold}",
)
+ self._prev_rsi = rsi_value
+ self._prev_prev_close = self._prev_close
+ self._prev_close = close
return self._apply_filters(signal)
elif rsi_value > self._overbought:
signal = Signal(
@@ -115,6 +195,12 @@ class RsiStrategy(BaseStrategy):
conviction=self._rsi_conviction(rsi_value),
reason=f"RSI {rsi_value:.2f} above overbought threshold {self._overbought}",
)
+ self._prev_rsi = rsi_value
+ self._prev_prev_close = self._prev_close
+ self._prev_close = close
return self._apply_filters(signal)
+ self._prev_rsi = rsi_value
+ self._prev_prev_close = self._prev_close
+ self._prev_close = close
return None
diff --git a/services/strategy-engine/strategies/volume_profile_strategy.py b/services/strategy-engine/strategies/volume_profile_strategy.py
index 324f1c2..ef2ae14 100644
--- a/services/strategy-engine/strategies/volume_profile_strategy.py
+++ b/services/strategy-engine/strategies/volume_profile_strategy.py
@@ -56,7 +56,8 @@ class VolumeProfileStrategy(BaseStrategy):
self._was_below_va = False
self._was_above_va = False
- def _compute_value_area(self) -> tuple[float, float, float] | None:
+ def _compute_value_area(self) -> tuple[float, float, float, list[float], list[float]] | None:
+ """Compute POC, VA low, VA high, HVN levels, LVN levels."""
data = list(self._candles)
if len(data) < self._lookback_period:
return None
@@ -67,7 +68,7 @@ class VolumeProfileStrategy(BaseStrategy):
min_price = prices.min()
max_price = prices.max()
if min_price == max_price:
- return (float(min_price), float(min_price), float(max_price))
+ return (float(min_price), float(min_price), float(max_price), [], [])
bin_edges = np.linspace(min_price, max_price, self._num_bins + 1)
vol_profile = np.zeros(self._num_bins)
@@ -84,7 +85,7 @@ class VolumeProfileStrategy(BaseStrategy):
# Value Area: expand from POC outward
total_volume = vol_profile.sum()
if total_volume == 0:
- return (poc, float(bin_edges[0]), float(bin_edges[-1]))
+ return (poc, float(bin_edges[0]), float(bin_edges[-1]), [], [])
target_volume = self._value_area_pct * total_volume
accumulated = vol_profile[poc_idx]
@@ -111,7 +112,20 @@ class VolumeProfileStrategy(BaseStrategy):
va_low = float(bin_edges[low_idx])
va_high = float(bin_edges[high_idx + 1])
- return (poc, va_low, va_high)
+ # HVN/LVN detection
+ mean_vol = vol_profile.mean()
+ std_vol = vol_profile.std()
+
+ hvn_levels: list[float] = []
+ lvn_levels: list[float] = []
+ for i in range(len(vol_profile)):
+ mid = float((bin_edges[i] + bin_edges[i + 1]) / 2)
+ if vol_profile[i] > mean_vol + std_vol:
+ hvn_levels.append(mid)
+ elif vol_profile[i] < mean_vol - 0.5 * std_vol and vol_profile[i] > 0:
+ lvn_levels.append(mid)
+
+ return (poc, va_low, va_high, hvn_levels, lvn_levels)
def on_candle(self, candle: Candle) -> Signal | None:
self._update_filter_data(candle)
@@ -123,13 +137,41 @@ class VolumeProfileStrategy(BaseStrategy):
if result is None:
return None
- poc, va_low, va_high = result
+ poc, va_low, va_high, hvn_levels, lvn_levels = result
if close < va_low:
self._was_below_va = True
if close > va_high:
self._was_above_va = True
+ # HVN bounce signals (stronger than regular VA bounces)
+ for hvn in hvn_levels:
+ if abs(close - hvn) / hvn < 0.005: # Within 0.5% of HVN
+ if self._was_below_va and close >= va_low:
+ self._was_below_va = False
+ signal = Signal(
+ strategy=self.name,
+ symbol=candle.symbol,
+ side=OrderSide.BUY,
+ price=candle.close,
+ quantity=self._quantity,
+ conviction=0.85,
+ reason=f"Price near HVN {hvn:.2f}, bounced from below VA low {va_low:.2f} to {close:.2f}",
+ )
+ return self._apply_filters(signal)
+ if self._was_above_va and close <= va_high:
+ self._was_above_va = False
+ signal = Signal(
+ strategy=self.name,
+ symbol=candle.symbol,
+ side=OrderSide.SELL,
+ price=candle.close,
+ quantity=self._quantity,
+ conviction=0.85,
+ reason=f"Price near HVN {hvn:.2f}, rejected from above VA high {va_high:.2f} to {close:.2f}",
+ )
+ return self._apply_filters(signal)
+
# BUY: was below VA, price bounces back between va_low and poc
if self._was_below_va and va_low <= close <= poc:
self._was_below_va = False
diff --git a/services/strategy-engine/strategies/vwap_strategy.py b/services/strategy-engine/strategies/vwap_strategy.py
index c525ff3..d64950e 100644
--- a/services/strategy-engine/strategies/vwap_strategy.py
+++ b/services/strategy-engine/strategies/vwap_strategy.py
@@ -1,3 +1,4 @@
+from collections import deque
from decimal import Decimal
from shared.models import Candle, Signal, OrderSide
@@ -16,6 +17,9 @@ class VwapStrategy(BaseStrategy):
self._candle_count: int = 0
self._was_below_vwap: bool = False
self._was_above_vwap: bool = False
+ self._current_date: str | None = None # Track date for daily reset
+ self._tp_values: deque[float] = deque(maxlen=500) # For std calculation
+ self._vwap_values: deque[float] = deque(maxlen=500)
@property
def warmup_period(self) -> int:
@@ -41,11 +45,15 @@ class VwapStrategy(BaseStrategy):
)
def reset(self) -> None:
+ super().reset()
self._cumulative_tp_vol = 0.0
self._cumulative_vol = 0.0
self._candle_count = 0
self._was_below_vwap = False
self._was_above_vwap = False
+ self._current_date = None
+ self._tp_values.clear()
+ self._vwap_values.clear()
def _vwap_conviction(self, deviation: float) -> float:
"""Map VWAP deviation magnitude to conviction (0.1-1.0).
@@ -58,6 +66,20 @@ class VwapStrategy(BaseStrategy):
def on_candle(self, candle: Candle) -> Signal | None:
self._update_filter_data(candle)
+
+ # Daily reset
+ candle_date = candle.open_time.strftime("%Y-%m-%d")
+ if self._current_date is not None and candle_date != self._current_date:
+ # New day — reset VWAP
+ self._cumulative_tp_vol = 0.0
+ self._cumulative_vol = 0.0
+ self._candle_count = 0
+ self._was_below_vwap = False
+ self._was_above_vwap = False
+ self._tp_values.clear()
+ self._vwap_values.clear()
+ self._current_date = candle_date
+
high = float(candle.high)
low = float(candle.low)
close = float(candle.close)
@@ -77,6 +99,19 @@ class VwapStrategy(BaseStrategy):
vwap = self._cumulative_tp_vol / self._cumulative_vol
if vwap == 0.0:
return None
+
+ # Track values for deviation band calculation
+ self._tp_values.append(typical_price)
+ self._vwap_values.append(vwap)
+
+ # Standard deviation of (TP - VWAP) for bands
+ std_dev = 0.0
+ if len(self._tp_values) >= 2:
+ diffs = [tp - v for tp, v in zip(self._tp_values, self._vwap_values)]
+ mean_diff = sum(diffs) / len(diffs)
+ variance = sum((d - mean_diff) ** 2 for d in diffs) / len(diffs)
+ std_dev = variance**0.5
+
deviation = (close - vwap) / vwap
if deviation < -self._deviation_threshold:
@@ -84,10 +119,20 @@ class VwapStrategy(BaseStrategy):
if deviation > self._deviation_threshold:
self._was_above_vwap = True
+ # Determine conviction based on deviation bands
+ def _band_conviction(price: float) -> float:
+ if std_dev > 0 and len(self._tp_values) >= 2:
+ dist_from_vwap = abs(price - vwap)
+ if dist_from_vwap >= 2 * std_dev:
+ return 0.9
+ elif dist_from_vwap >= std_dev:
+ return 0.6
+ return 0.5
+
# Mean reversion from below: was below VWAP, now back near it
if self._was_below_vwap and abs(deviation) <= self._deviation_threshold:
self._was_below_vwap = False
- conviction = self._vwap_conviction(deviation)
+ conviction = _band_conviction(close)
signal = Signal(
strategy=self.name,
symbol=candle.symbol,
@@ -102,7 +147,7 @@ class VwapStrategy(BaseStrategy):
# Mean reversion from above: was above VWAP, now back near it
if self._was_above_vwap and abs(deviation) <= self._deviation_threshold:
self._was_above_vwap = False
- conviction = self._vwap_conviction(deviation)
+ conviction = _band_conviction(close)
signal = Signal(
strategy=self.name,
symbol=candle.symbol,
diff --git a/services/strategy-engine/tests/test_base_filters.py b/services/strategy-engine/tests/test_base_filters.py
index 97d9e16..ae9ca05 100644
--- a/services/strategy-engine/tests/test_base_filters.py
+++ b/services/strategy-engine/tests/test_base_filters.py
@@ -1,11 +1,12 @@
"""Tests for BaseStrategy filters (ADX, volume, ATR stops)."""
+
import sys
from pathlib import Path
+
sys.path.insert(0, str(Path(__file__).resolve().parents[1]))
from decimal import Decimal
from datetime import datetime, timezone
-import pytest
from shared.models import Candle, Signal, OrderSide
from strategies.base import BaseStrategy
@@ -28,9 +29,12 @@ class DummyStrategy(BaseStrategy):
def on_candle(self, candle: Candle) -> Signal | None:
self._update_filter_data(candle)
signal = Signal(
- strategy=self.name, symbol=candle.symbol,
- side=OrderSide.BUY, price=candle.close,
- quantity=self._quantity, reason="test",
+ strategy=self.name,
+ symbol=candle.symbol,
+ side=OrderSide.BUY,
+ price=candle.close,
+ quantity=self._quantity,
+ reason="test",
)
return self._apply_filters(signal)
@@ -39,10 +43,13 @@ def _candle(price=100.0, volume=10.0, high=None, low=None):
h = high if high is not None else price + 5
lo = low if low is not None else price - 5
return Candle(
- symbol="BTCUSDT", timeframe="1h",
+ symbol="AAPL",
+ timeframe="1h",
open_time=datetime(2025, 1, 1, tzinfo=timezone.utc),
- open=Decimal(str(price)), high=Decimal(str(h)),
- low=Decimal(str(lo)), close=Decimal(str(price)),
+ open=Decimal(str(price)),
+ high=Decimal(str(h)),
+ low=Decimal(str(lo)),
+ close=Decimal(str(price)),
volume=Decimal(str(volume)),
)
diff --git a/services/strategy-engine/tests/test_bollinger_strategy.py b/services/strategy-engine/tests/test_bollinger_strategy.py
index 348a9e0..8261377 100644
--- a/services/strategy-engine/tests/test_bollinger_strategy.py
+++ b/services/strategy-engine/tests/test_bollinger_strategy.py
@@ -10,7 +10,7 @@ from strategies.bollinger_strategy import BollingerStrategy
def make_candle(close: float) -> Candle:
return Candle(
- symbol="BTC/USDT",
+ symbol="AAPL",
timeframe="1m",
open_time=datetime(2024, 1, 1, tzinfo=timezone.utc),
open=Decimal(str(close)),
@@ -23,7 +23,7 @@ def make_candle(close: float) -> Candle:
def _make_strategy() -> BollingerStrategy:
s = BollingerStrategy()
- s.configure({"period": 5, "num_std": 1.0, "min_bandwidth": 0.0})
+ s.configure({"period": 5, "num_std": 1.0, "min_bandwidth": 0.0, "squeeze_threshold": 0.0})
return s
@@ -99,3 +99,79 @@ def test_bollinger_reset_clears_state():
assert len(strategy._closes) == 1
assert strategy._was_below_lower is False
assert strategy._was_above_upper is False
+ assert strategy._in_squeeze is False
+ assert strategy._squeeze_bars == 0
+
+
+def test_bollinger_squeeze_detection():
+ """Tight bandwidth → no signal during squeeze."""
+ # Use a strategy with a high squeeze threshold so constant prices trigger squeeze
+ s = BollingerStrategy()
+ s.configure(
+ {
+ "period": 5,
+ "num_std": 2.0,
+ "min_bandwidth": 0.0,
+ "squeeze_threshold": 0.5, # Very high threshold to ensure squeeze triggers
+ }
+ )
+
+ # Feed identical prices → bandwidth = 0 (below any threshold)
+ for _ in range(6):
+ result = s.on_candle(make_candle(100.0))
+
+ # With identical prices, std=0, bandwidth=0 < 0.5 → squeeze, no signal
+ assert s._in_squeeze is True
+ assert result is None
+
+
+def test_bollinger_squeeze_breakout_buy():
+ """Squeeze ends with price above SMA → BUY signal."""
+ s = BollingerStrategy()
+ s.configure(
+ {
+ "period": 5,
+ "num_std": 1.0,
+ "min_bandwidth": 0.0,
+ "squeeze_threshold": 0.01,
+ }
+ )
+
+ # Feed identical prices to create a squeeze (bandwidth = 0)
+ for _ in range(6):
+ s.on_candle(make_candle(100.0))
+
+ assert s._in_squeeze is True
+
+ # Now feed a price that creates enough spread to exit squeeze AND is above SMA
+ signal = s.on_candle(make_candle(120.0))
+ assert signal is not None
+ assert signal.side == OrderSide.BUY
+ assert "squeeze breakout UP" in signal.reason
+
+
+def test_bollinger_pct_b_conviction():
+ """Signals near band extremes have higher conviction via %B."""
+ s = BollingerStrategy()
+ s.configure(
+ {
+ "period": 5,
+ "num_std": 1.0,
+ "min_bandwidth": 0.0,
+ "squeeze_threshold": 0.0, # Disable squeeze for this test
+ }
+ )
+
+ # Build up with stable prices
+ for _ in range(5):
+ s.on_candle(make_candle(100.0))
+
+ # Drop below lower band
+ s.on_candle(make_candle(50.0))
+
+ # Recover just at the lower band edge — %B close to 0 → high conviction
+ signal = s.on_candle(make_candle(100.0))
+ assert signal is not None
+ assert signal.side == OrderSide.BUY
+ # conviction = max(1.0 - pct_b, 0.3), with pct_b near lower → conviction should be >= 0.3
+ assert signal.conviction >= 0.3
diff --git a/services/strategy-engine/tests/test_combined_strategy.py b/services/strategy-engine/tests/test_combined_strategy.py
index 3408a89..8a4dc74 100644
--- a/services/strategy-engine/tests/test_combined_strategy.py
+++ b/services/strategy-engine/tests/test_combined_strategy.py
@@ -72,7 +72,7 @@ class NeutralStrategy(BaseStrategy):
def _candle(price=100.0):
return Candle(
- symbol="BTCUSDT",
+ symbol="AAPL",
timeframe="1m",
open_time=datetime(2025, 1, 1, tzinfo=timezone.utc),
open=Decimal(str(price)),
@@ -167,3 +167,60 @@ def test_combined_invalid_weight():
c.configure({})
with pytest.raises(ValueError):
c.add_strategy(AlwaysBuyStrategy(), weight=-1.0)
+
+
+def test_combined_record_result():
+ """Verify trade history tracking works correctly."""
+ c = CombinedStrategy()
+ c.configure({"adaptive_weights": True, "history_window": 5})
+
+ c.record_result("test_strat", True)
+ c.record_result("test_strat", False)
+ c.record_result("test_strat", True)
+
+ assert len(c._trade_history["test_strat"]) == 3
+ assert c._trade_history["test_strat"] == [True, False, True]
+
+ # Fill beyond window size to test trimming
+ for _ in range(5):
+ c.record_result("test_strat", False)
+
+ assert len(c._trade_history["test_strat"]) == 5 # Trimmed to history_window
+
+
+def test_combined_adaptive_weight_increases_for_winners():
+ """Strategy with high win rate gets higher effective weight."""
+ c = CombinedStrategy()
+ c.configure({"threshold": 0.3, "adaptive_weights": True, "history_window": 20})
+ c.add_strategy(AlwaysBuyStrategy(), weight=1.0)
+
+ # Record high win rate for always_buy (80% wins)
+ for _ in range(8):
+ c.record_result("always_buy", True)
+ for _ in range(2):
+ c.record_result("always_buy", False)
+
+ # Adaptive weight should be > base weight (1.0)
+ adaptive_w = c._get_adaptive_weight("always_buy", 1.0)
+ assert adaptive_w > 1.0
+ # 80% win rate -> scale = 0.5 + 0.8 = 1.3 -> weight = 1.3
+ assert abs(adaptive_w - 1.3) < 0.01
+
+
+def test_combined_adaptive_weight_decreases_for_losers():
+ """Strategy with low win rate gets lower effective weight."""
+ c = CombinedStrategy()
+ c.configure({"threshold": 0.3, "adaptive_weights": True, "history_window": 20})
+ c.add_strategy(AlwaysBuyStrategy(), weight=1.0)
+
+ # Record low win rate for always_buy (20% wins)
+ for _ in range(2):
+ c.record_result("always_buy", True)
+ for _ in range(8):
+ c.record_result("always_buy", False)
+
+ # Adaptive weight should be < base weight (1.0)
+ adaptive_w = c._get_adaptive_weight("always_buy", 1.0)
+ assert adaptive_w < 1.0
+ # 20% win rate -> scale = 0.5 + 0.2 = 0.7 -> weight = 0.7
+ assert abs(adaptive_w - 0.7) < 0.01
diff --git a/services/strategy-engine/tests/test_ema_crossover_strategy.py b/services/strategy-engine/tests/test_ema_crossover_strategy.py
index 0cf767b..7028eb0 100644
--- a/services/strategy-engine/tests/test_ema_crossover_strategy.py
+++ b/services/strategy-engine/tests/test_ema_crossover_strategy.py
@@ -10,7 +10,7 @@ from strategies.ema_crossover_strategy import EmaCrossoverStrategy
def make_candle(close: float) -> Candle:
return Candle(
- symbol="BTC/USDT",
+ symbol="AAPL",
timeframe="1m",
open_time=datetime(2024, 1, 1, tzinfo=timezone.utc),
open=Decimal(str(close)),
@@ -21,9 +21,18 @@ def make_candle(close: float) -> Candle:
)
-def _make_strategy(short: int = 3, long: int = 6) -> EmaCrossoverStrategy:
+def _make_strategy(
+ short: int = 3, long: int = 6, pullback_enabled: bool = False
+) -> EmaCrossoverStrategy:
s = EmaCrossoverStrategy()
- s.configure({"short_period": short, "long_period": long, "quantity": "0.01"})
+ s.configure(
+ {
+ "short_period": short,
+ "long_period": long,
+ "quantity": "0.01",
+ "pullback_enabled": pullback_enabled,
+ }
+ )
return s
@@ -97,3 +106,110 @@ def test_ema_reset_clears_state():
# Internal state should be cleared
assert len(strategy._closes) == 1
assert strategy._prev_short_above is None
+ assert strategy._pending_signal is None
+
+
+def test_ema_pullback_entry():
+ """Crossover detected, then pullback to short EMA triggers signal."""
+ strategy = EmaCrossoverStrategy()
+ strategy.configure(
+ {
+ "short_period": 3,
+ "long_period": 6,
+ "quantity": "0.01",
+ "pullback_enabled": True,
+ "pullback_tolerance": 0.05, # 5% tolerance for test simplicity
+ }
+ )
+
+ # Declining prices so short EMA stays below long EMA
+ declining = [100, 98, 96, 94, 92, 90, 88, 86, 84, 82]
+ for price in declining:
+ strategy.on_candle(make_candle(price))
+
+ # Sharp rise to force golden cross — with pullback enabled, no signal yet
+ rising = [120, 140, 160]
+ for price in rising:
+ strategy.on_candle(make_candle(price))
+
+ # With pullback enabled, crossover should NOT produce immediate signal
+ # but _pending_signal should be set
+ assert strategy._pending_signal == "BUY"
+
+ # Now feed a candle whose close is near the short EMA (pullback)
+ # The short EMA will be tracking recent prices; feed a price that pulls back
+ # toward it. We use a moderate price to get close to short EMA.
+ import pandas as pd
+
+ series = pd.Series(list(strategy._closes))
+ short_ema_val = series.ewm(span=3, adjust=False).mean().iloc[-1]
+ # Feed a candle at approximately the short EMA value
+ result = strategy.on_candle(make_candle(short_ema_val))
+ assert result is not None
+ assert result.side == OrderSide.BUY
+ assert "pullback" in result.reason
+
+
+def test_ema_pullback_cancelled_on_reversal():
+ """Crossover detected, then reversal cancels the pending signal."""
+ strategy = EmaCrossoverStrategy()
+ strategy.configure(
+ {
+ "short_period": 3,
+ "long_period": 6,
+ "quantity": "0.01",
+ "pullback_enabled": True,
+ "pullback_tolerance": 0.001, # Very tight tolerance — won't trigger easily
+ }
+ )
+
+ # Declining prices
+ declining = [100, 98, 96, 94, 92, 90, 88, 86, 84, 82]
+ for price in declining:
+ strategy.on_candle(make_candle(price))
+
+ # Sharp rise to force golden cross
+ for price in [120, 140, 160]:
+ strategy.on_candle(make_candle(price))
+
+ assert strategy._pending_signal == "BUY"
+
+ # Now sharp decline to reverse the crossover (death cross)
+ for price in [60, 40, 20]:
+ strategy.on_candle(make_candle(price))
+
+ # The BUY pending signal should be cancelled because short EMA fell below long EMA.
+ # A new death cross may set _pending_signal to "SELL", but the original "BUY" is gone.
+ assert strategy._pending_signal != "BUY"
+
+
+def test_ema_immediate_mode():
+ """With pullback_enabled=False, original immediate entry works."""
+ strategy = EmaCrossoverStrategy()
+ strategy.configure(
+ {
+ "short_period": 3,
+ "long_period": 6,
+ "quantity": "0.01",
+ "pullback_enabled": False,
+ }
+ )
+
+ # Declining prices so short EMA stays below long EMA
+ declining = [100, 98, 96, 94, 92, 90, 88, 86, 84, 82]
+ for price in declining:
+ strategy.on_candle(make_candle(price))
+
+ # Sharp rise to force golden cross — immediate mode should fire signal
+ rising = [120, 140, 160]
+ signal = None
+ for price in rising:
+ result = strategy.on_candle(make_candle(price))
+ if result is not None:
+ signal = result
+
+ assert signal is not None
+ assert signal.side == OrderSide.BUY
+ assert "Golden Cross" in signal.reason
+ # No pending signal should be set
+ assert strategy._pending_signal is None
diff --git a/services/strategy-engine/tests/test_engine.py b/services/strategy-engine/tests/test_engine.py
index ac9a596..2623027 100644
--- a/services/strategy-engine/tests/test_engine.py
+++ b/services/strategy-engine/tests/test_engine.py
@@ -13,7 +13,7 @@ from strategy_engine.engine import StrategyEngine
def make_candle_event() -> dict:
candle = Candle(
- symbol="BTC/USDT",
+ symbol="AAPL",
timeframe="1m",
open_time=datetime(2024, 1, 1, tzinfo=timezone.utc),
open=Decimal("50000"),
@@ -28,7 +28,7 @@ def make_candle_event() -> dict:
def make_signal() -> Signal:
return Signal(
strategy="test",
- symbol="BTC/USDT",
+ symbol="AAPL",
side=OrderSide.BUY,
price=Decimal("50050"),
quantity=Decimal("0.01"),
@@ -46,12 +46,12 @@ async def test_engine_dispatches_candle_to_strategies():
strategy.on_candle = MagicMock(return_value=None)
engine = StrategyEngine(broker=broker, strategies=[strategy])
- await engine.process_once("candles.BTC_USDT", "0")
+ await engine.process_once("candles.AAPL", "0")
strategy.on_candle.assert_called_once()
candle_arg = strategy.on_candle.call_args[0][0]
assert isinstance(candle_arg, Candle)
- assert candle_arg.symbol == "BTC/USDT"
+ assert candle_arg.symbol == "AAPL"
@pytest.mark.asyncio
@@ -64,7 +64,7 @@ async def test_engine_publishes_signal_when_strategy_returns_one():
strategy.on_candle = MagicMock(return_value=make_signal())
engine = StrategyEngine(broker=broker, strategies=[strategy])
- await engine.process_once("candles.BTC_USDT", "0")
+ await engine.process_once("candles.AAPL", "0")
broker.publish.assert_called_once()
call_args = broker.publish.call_args
diff --git a/services/strategy-engine/tests/test_grid_strategy.py b/services/strategy-engine/tests/test_grid_strategy.py
index 79eb22a..878b900 100644
--- a/services/strategy-engine/tests/test_grid_strategy.py
+++ b/services/strategy-engine/tests/test_grid_strategy.py
@@ -10,7 +10,7 @@ from strategies.grid_strategy import GridStrategy
def make_candle(close: float) -> Candle:
return Candle(
- symbol="BTC/USDT",
+ symbol="AAPL",
timeframe="1m",
open_time=datetime(2024, 1, 1, tzinfo=timezone.utc),
open=Decimal(str(close)),
@@ -60,3 +60,41 @@ def test_grid_strategy_no_signal_in_same_zone():
strategy.on_candle(make_candle(50000))
signal = strategy.on_candle(make_candle(50100))
assert signal is None
+
+
+def test_grid_exit_on_trend_break():
+ """Price drops well below grid range → SELL signal emitted."""
+ strategy = _configured_strategy()
+ # Grid range is 48000-52000, exit_threshold_pct defaults to 5%
+ # Lower bound = 48000 * 0.95 = 45600
+ # Establish a zone first
+ strategy.on_candle(make_candle(50000))
+ # Price drops far below the grid range
+ signal = strategy.on_candle(make_candle(45000))
+ assert signal is not None
+ assert signal.side == OrderSide.SELL
+ assert "broke out of range" in signal.reason
+
+
+def test_grid_no_signal_while_out_of_range():
+ """After exit signal, no more grid signals until price returns to range."""
+ strategy = _configured_strategy()
+ # Establish a zone
+ strategy.on_candle(make_candle(50000))
+ # First out-of-range candle → SELL exit signal
+ signal = strategy.on_candle(make_candle(45000))
+ assert signal is not None
+ assert signal.side == OrderSide.SELL
+
+ # Subsequent out-of-range candles → no signals
+ signal = strategy.on_candle(make_candle(44000))
+ assert signal is None
+
+ signal = strategy.on_candle(make_candle(43000))
+ assert signal is None
+
+ # Price returns to grid range → grid signals resume
+ strategy.on_candle(make_candle(50000))
+ signal = strategy.on_candle(make_candle(48100))
+ assert signal is not None
+ assert signal.side == OrderSide.BUY
diff --git a/services/strategy-engine/tests/test_indicators.py b/services/strategy-engine/tests/test_indicators.py
index ac5b505..481569b 100644
--- a/services/strategy-engine/tests/test_indicators.py
+++ b/services/strategy-engine/tests/test_indicators.py
@@ -1,6 +1,8 @@
"""Tests for technical indicator library."""
+
import sys
from pathlib import Path
+
sys.path.insert(0, str(Path(__file__).resolve().parents[1]))
import pandas as pd
diff --git a/services/strategy-engine/tests/test_macd_strategy.py b/services/strategy-engine/tests/test_macd_strategy.py
index 9931b43..556fd4c 100644
--- a/services/strategy-engine/tests/test_macd_strategy.py
+++ b/services/strategy-engine/tests/test_macd_strategy.py
@@ -10,7 +10,7 @@ from strategies.macd_strategy import MacdStrategy
def _candle(price: float) -> Candle:
return Candle(
- symbol="BTC/USDT",
+ symbol="AAPL",
timeframe="1m",
open_time=datetime(2024, 1, 1, tzinfo=timezone.utc),
open=Decimal(str(price)),
@@ -78,3 +78,63 @@ def test_macd_reset_clears_state():
s.reset()
assert len(s._closes) == 0
assert s._prev_histogram is None
+ assert s._prev_macd is None
+ assert s._prev_signal is None
+
+
+def test_macd_signal_line_crossover():
+ """Test that MACD signal-line crossover generates signals."""
+ s = _make_strategy()
+ # Declining then rising prices should produce a signal-line bullish crossover
+ prices = [100, 99, 98, 97, 96, 95, 94, 93, 92, 91, 90, 89, 88]
+ prices += [89, 91, 94, 98, 103, 109, 116, 124, 133, 143]
+ signals = []
+ for p in prices:
+ result = s.on_candle(_candle(float(p)))
+ if result is not None:
+ signals.append(result)
+
+ buy_signals = [sig for sig in signals if sig.side == OrderSide.BUY]
+ assert len(buy_signals) > 0, "Expected at least one BUY signal"
+ # Check that at least one is a signal-line crossover or histogram crossover
+ all_reasons = [sig.reason for sig in buy_signals]
+ assert any("crossover" in r for r in all_reasons), (
+ f"Expected crossover signal, got: {all_reasons}"
+ )
+
+
+def test_macd_conviction_varies_with_distance():
+ """Test that conviction varies based on MACD distance from zero line."""
+ s1 = _make_strategy()
+ s2 = _make_strategy()
+
+ # Small price movements -> MACD near zero -> lower conviction
+ small_prices = [100, 99.5, 99, 98.5, 98, 97.5, 97, 96.5, 96, 95.5, 95, 94.5, 94]
+ small_prices += [94.5, 95, 95.5, 96, 96.5, 97, 97.5, 98, 98.5, 99]
+ small_signals = []
+ for p in small_prices:
+ result = s1.on_candle(_candle(float(p)))
+ if result is not None:
+ small_signals.append(result)
+
+ # Large price movements -> MACD far from zero -> higher conviction
+ large_prices = [100, 95, 90, 85, 80, 75, 70, 65, 60, 55, 50, 45, 40]
+ large_prices += [45, 55, 70, 90, 115, 145, 180, 220, 265, 315]
+ large_signals = []
+ for p in large_prices:
+ result = s2.on_candle(_candle(float(p)))
+ if result is not None:
+ large_signals.append(result)
+
+ # Both should produce signals
+ assert len(small_signals) > 0, "Expected signals from small movements"
+ assert len(large_signals) > 0, "Expected signals from large movements"
+
+ # The large-movement signals should generally have higher conviction
+ # (or at least different conviction, since distance from zero affects it)
+ small_conv = small_signals[-1].conviction
+ large_conv = large_signals[-1].conviction
+ # Large movements should produce conviction >= small movements
+ assert large_conv >= small_conv, (
+ f"Expected large movement conviction ({large_conv}) >= small ({small_conv})"
+ )
diff --git a/services/strategy-engine/tests/test_moc_strategy.py b/services/strategy-engine/tests/test_moc_strategy.py
new file mode 100644
index 0000000..1928a28
--- /dev/null
+++ b/services/strategy-engine/tests/test_moc_strategy.py
@@ -0,0 +1,152 @@
+"""Tests for MOC (Market on Close) strategy."""
+
+import sys
+from pathlib import Path
+
+sys.path.insert(0, str(Path(__file__).resolve().parents[1]))
+
+from datetime import datetime, timezone
+from decimal import Decimal
+
+from shared.models import Candle, OrderSide
+from strategies.moc_strategy import MocStrategy
+
+
+def _candle(price, hour=20, minute=0, volume=100.0, day=1, open_price=None):
+ op = open_price if open_price is not None else price - 1 # Default: bullish
+ return Candle(
+ symbol="AAPL",
+ timeframe="5Min",
+ open_time=datetime(2025, 1, day, hour, minute, tzinfo=timezone.utc),
+ open=Decimal(str(op)),
+ high=Decimal(str(price + 1)),
+ low=Decimal(str(min(op, price) - 1)),
+ close=Decimal(str(price)),
+ volume=Decimal(str(volume)),
+ )
+
+
+def _make_strategy(**overrides):
+ s = MocStrategy()
+ params = {
+ "quantity_pct": 0.2,
+ "stop_loss_pct": 2.0,
+ "rsi_min": 30,
+ "rsi_max": 70, # Wider for tests
+ "ema_period": 5,
+ "volume_avg_period": 5,
+ "min_volume_ratio": 0.5,
+ "buy_start_utc": 19,
+ "buy_end_utc": 21,
+ "sell_start_utc": 13,
+ "sell_end_utc": 15,
+ "max_positions": 5,
+ }
+ params.update(overrides)
+ s.configure(params)
+ return s
+
+
+def test_moc_warmup_period():
+ s = _make_strategy(ema_period=20, volume_avg_period=15)
+ assert s.warmup_period == 21
+
+
+def test_moc_no_signal_outside_buy_window():
+ s = _make_strategy()
+ # Hour 12 UTC — not in buy (19-21) or sell (13-15) window
+ for i in range(10):
+ sig = s.on_candle(_candle(150 + i, hour=12, minute=i * 5))
+ assert sig is None
+
+
+def test_moc_buy_signal_in_window():
+ s = _make_strategy(ema_period=3)
+ # Build up history with some oscillation so RSI settles in the 30-70 range
+ prices = [
+ 150,
+ 149,
+ 151,
+ 148,
+ 152,
+ 149,
+ 150,
+ 151,
+ 148,
+ 150,
+ 149,
+ 151,
+ 150,
+ 152,
+ 151,
+ 153,
+ 152,
+ 154,
+ 153,
+ 155,
+ ]
+ signals = []
+ for i, p in enumerate(prices):
+ sig = s.on_candle(_candle(p, hour=20, minute=i * 2, volume=200.0))
+ if sig is not None:
+ signals.append(sig)
+ buy_signals = [sig for sig in signals if sig.side == OrderSide.BUY]
+ assert len(buy_signals) > 0
+ assert buy_signals[0].strategy == "moc"
+
+
+def test_moc_sell_at_open():
+ s = _make_strategy(ema_period=3)
+ # Force entry
+ for i in range(10):
+ s.on_candle(_candle(150 + i, hour=20, minute=i * 3, volume=200.0))
+
+ if s._in_position:
+ # Next day, sell window
+ sig = s.on_candle(_candle(155, hour=14, minute=0, day=2))
+ assert sig is not None
+ assert sig.side == OrderSide.SELL
+ assert "MOC sell" in sig.reason
+
+
+def test_moc_stop_loss():
+ s = _make_strategy(ema_period=3, stop_loss_pct=1.0)
+ for i in range(10):
+ s.on_candle(_candle(150 + i, hour=20, minute=i * 3, volume=200.0))
+
+ if s._in_position:
+ drop_price = s._entry_price * 0.98 # -2%
+ sig = s.on_candle(_candle(drop_price, hour=22, minute=0))
+ if sig is not None:
+ assert sig.side == OrderSide.SELL
+ assert "stop loss" in sig.reason
+
+
+def test_moc_no_buy_on_bearish_candle():
+ s = _make_strategy(ema_period=3)
+ for i in range(8):
+ s.on_candle(_candle(150, hour=20, minute=i * 3, volume=200.0))
+ # Bearish candle (open > close)
+ s.on_candle(_candle(149, hour=20, minute=30, open_price=151))
+ # May or may not signal depending on other criteria, but bearish should reduce chances
+ # Just verify no crash
+
+
+def test_moc_only_one_buy_per_day():
+ s = _make_strategy(ema_period=3)
+ buy_count = 0
+ for i in range(20):
+ sig = s.on_candle(_candle(150 + i * 0.3, hour=20, minute=i * 2, volume=200.0))
+ if sig is not None and sig.side == OrderSide.BUY:
+ buy_count += 1
+ assert buy_count <= 1
+
+
+def test_moc_reset():
+ s = _make_strategy()
+ s.on_candle(_candle(150, hour=20))
+ s._in_position = True
+ s.reset()
+ assert not s._in_position
+ assert len(s._closes) == 0
+ assert not s._bought_today
diff --git a/services/strategy-engine/tests/test_multi_symbol.py b/services/strategy-engine/tests/test_multi_symbol.py
index cb8088c..671a9d3 100644
--- a/services/strategy-engine/tests/test_multi_symbol.py
+++ b/services/strategy-engine/tests/test_multi_symbol.py
@@ -22,7 +22,7 @@ async def test_engine_processes_multiple_streams():
broker = AsyncMock()
candle_btc = Candle(
- symbol="BTCUSDT",
+ symbol="AAPL",
timeframe="1m",
open_time=datetime(2025, 1, 1, tzinfo=timezone.utc),
open=Decimal("50000"),
@@ -32,7 +32,7 @@ async def test_engine_processes_multiple_streams():
volume=Decimal("10"),
)
candle_eth = Candle(
- symbol="ETHUSDT",
+ symbol="MSFT",
timeframe="1m",
open_time=datetime(2025, 1, 1, tzinfo=timezone.utc),
open=Decimal("3000"),
@@ -45,16 +45,16 @@ async def test_engine_processes_multiple_streams():
btc_events = [CandleEvent(data=candle_btc).to_dict()]
eth_events = [CandleEvent(data=candle_eth).to_dict()]
- # First call returns BTC event, second ETH, then empty
- call_count = {"btc": 0, "eth": 0}
+ # First call returns AAPL event, second MSFT, then empty
+ call_count = {"aapl": 0, "msft": 0}
async def mock_read(stream, **kwargs):
- if "BTC" in stream:
- call_count["btc"] += 1
- return btc_events if call_count["btc"] == 1 else []
- elif "ETH" in stream:
- call_count["eth"] += 1
- return eth_events if call_count["eth"] == 1 else []
+ if "AAPL" in stream:
+ call_count["aapl"] += 1
+ return btc_events if call_count["aapl"] == 1 else []
+ elif "MSFT" in stream:
+ call_count["msft"] += 1
+ return eth_events if call_count["msft"] == 1 else []
return []
broker.read = AsyncMock(side_effect=mock_read)
@@ -65,8 +65,8 @@ async def test_engine_processes_multiple_streams():
engine = StrategyEngine(broker=broker, strategies=[strategy])
# Process both streams
- await engine.process_once("candles.BTCUSDT", "$")
- await engine.process_once("candles.ETHUSDT", "$")
+ await engine.process_once("candles.AAPL", "$")
+ await engine.process_once("candles.MSFT", "$")
# Strategy should have been called with both candles
assert strategy.on_candle.call_count == 2
diff --git a/services/strategy-engine/tests/test_rsi_strategy.py b/services/strategy-engine/tests/test_rsi_strategy.py
index 2a2f4e7..6d31fd5 100644
--- a/services/strategy-engine/tests/test_rsi_strategy.py
+++ b/services/strategy-engine/tests/test_rsi_strategy.py
@@ -10,7 +10,7 @@ from strategies.rsi_strategy import RsiStrategy
def make_candle(close: float, idx: int = 0) -> Candle:
return Candle(
- symbol="BTC/USDT",
+ symbol="AAPL",
timeframe="1m",
open_time=datetime(2024, 1, 1, tzinfo=timezone.utc),
open=Decimal(str(close)),
@@ -43,3 +43,60 @@ def test_rsi_strategy_buy_signal_on_oversold():
# if a signal is returned, it must be a BUY
if signal is not None:
assert signal.side == OrderSide.BUY
+
+
+def test_rsi_detects_bullish_divergence():
+ """Bullish divergence: price makes lower low, RSI makes higher low."""
+ strategy = RsiStrategy()
+ strategy.configure({"period": 5, "oversold": 20, "overbought": 80})
+ strategy._filter_enabled = False # Disable filters to test divergence logic only
+
+ # Sharp consecutive drop to 50 drives RSI near 0 (first swing low).
+ # Big recovery, then gradual decline to 48 (lower price, but RSI > 0 = higher low).
+ prices = [100.0] * 7
+ prices += [85.0, 70.0, 55.0, 50.0]
+ prices += [55.0, 65.0, 80.0, 95.0, 110.0, 120.0, 130.0, 135.0, 140.0, 142.0, 143.0, 144.0]
+ prices += [142.0, 140.0, 138.0, 135.0, 130.0, 125.0, 120.0, 115.0, 110.0, 105.0]
+ prices += [100.0, 95.0, 90.0, 85.0, 80.0, 75.0, 70.0, 65.0, 60.0, 55.0, 50.0, 48.0]
+ prices += [52.0, 58.0]
+
+ signals = []
+ for p in prices:
+ result = strategy.on_candle(make_candle(p))
+ if result is not None:
+ signals.append(result)
+
+ divergence_signals = [s for s in signals if "divergence" in s.reason]
+ assert len(divergence_signals) > 0, "Expected at least one bullish divergence signal"
+ assert divergence_signals[0].side == OrderSide.BUY
+ assert divergence_signals[0].conviction == 0.9
+ assert "bullish divergence" in divergence_signals[0].reason
+
+
+def test_rsi_detects_bearish_divergence():
+ """Bearish divergence: price makes higher high, RSI makes lower high."""
+ strategy = RsiStrategy()
+ strategy.configure({"period": 5, "oversold": 20, "overbought": 80})
+ strategy._filter_enabled = False # Disable filters to test divergence logic only
+
+ # Sharp consecutive rise to 160 drives RSI very high (first swing high).
+ # Deep pullback, then rise to 162 (higher price) but with a dip right before
+ # the peak to dampen RSI (lower high).
+ prices = [100.0] * 7
+ prices += [110.0, 120.0, 130.0, 140.0, 150.0, 160.0]
+ prices += [155.0, 145.0, 130.0, 115.0, 100.0, 90.0, 80.0]
+ prices += [90.0, 100.0, 110.0, 120.0, 130.0, 140.0, 150.0]
+ prices += [145.0, 162.0]
+ prices += [155.0, 148.0]
+
+ signals = []
+ for p in prices:
+ result = strategy.on_candle(make_candle(p))
+ if result is not None:
+ signals.append(result)
+
+ divergence_signals = [s for s in signals if "divergence" in s.reason]
+ assert len(divergence_signals) > 0, "Expected at least one bearish divergence signal"
+ assert divergence_signals[0].side == OrderSide.SELL
+ assert divergence_signals[0].conviction == 0.9
+ assert "bearish divergence" in divergence_signals[0].reason
diff --git a/services/strategy-engine/tests/test_volume_profile_strategy.py b/services/strategy-engine/tests/test_volume_profile_strategy.py
index 71f0eca..65ee2e8 100644
--- a/services/strategy-engine/tests/test_volume_profile_strategy.py
+++ b/services/strategy-engine/tests/test_volume_profile_strategy.py
@@ -10,7 +10,7 @@ from strategies.volume_profile_strategy import VolumeProfileStrategy
def make_candle(close: float, volume: float = 1.0) -> Candle:
return Candle(
- symbol="BTC/USDT",
+ symbol="AAPL",
timeframe="1m",
open_time=datetime(2024, 1, 1, tzinfo=timezone.utc),
open=Decimal(str(close)),
@@ -125,3 +125,56 @@ def test_volume_profile_reset_clears_state():
# After reset, should not have enough data
result = strategy.on_candle(make_candle(100.0, 10.0))
assert result is None
+
+
+def test_volume_profile_hvn_detection():
+ """Feed clustered volume at specific price levels to produce HVN nodes."""
+ strategy = VolumeProfileStrategy()
+ strategy.configure({"lookback_period": 20, "num_bins": 10, "value_area_pct": 0.7})
+
+ # Create a profile with very high volume at price ~100 and low volume elsewhere
+ # Prices range from 90 to 110, heavy volume concentrated at 100
+ candles_data = []
+ # Low volume at extremes
+ for p in [90, 91, 92, 109, 110]:
+ candles_data.append((p, 1.0))
+ # Very high volume around 100
+ for _ in range(15):
+ candles_data.append((100, 100.0))
+
+ for price, vol in candles_data:
+ strategy.on_candle(make_candle(price, vol))
+
+ # Access the internal method to verify HVN detection
+ result = strategy._compute_value_area()
+ assert result is not None
+ poc, va_low, va_high, hvn_levels, lvn_levels = result
+
+ # The bin containing price ~100 should have very high volume -> HVN
+ assert len(hvn_levels) > 0
+ # At least one HVN should be near 100
+ assert any(abs(h - 100) < 5 for h in hvn_levels)
+
+
+def test_volume_profile_reset_thorough():
+ """Verify all state is cleared on reset."""
+ strategy = VolumeProfileStrategy()
+ strategy.configure({"lookback_period": 10, "num_bins": 5})
+
+ # Build up state
+ for _ in range(10):
+ strategy.on_candle(make_candle(100.0, 10.0))
+ # Set below/above VA flags
+ strategy.on_candle(make_candle(50.0, 1.0)) # below VA
+ strategy.on_candle(make_candle(200.0, 1.0)) # above VA
+
+ strategy.reset()
+
+ # Verify all state cleared
+ assert len(strategy._candles) == 0
+ assert strategy._was_below_va is False
+ assert strategy._was_above_va is False
+
+ # Should not produce signal since no data
+ result = strategy.on_candle(make_candle(100.0, 10.0))
+ assert result is None
diff --git a/services/strategy-engine/tests/test_vwap_strategy.py b/services/strategy-engine/tests/test_vwap_strategy.py
index 5d76b04..2c34b01 100644
--- a/services/strategy-engine/tests/test_vwap_strategy.py
+++ b/services/strategy-engine/tests/test_vwap_strategy.py
@@ -13,15 +13,18 @@ def make_candle(
high: float | None = None,
low: float | None = None,
volume: float = 1.0,
+ open_time: datetime | None = None,
) -> Candle:
if high is None:
high = close
if low is None:
low = close
+ if open_time is None:
+ open_time = datetime(2024, 1, 1, tzinfo=timezone.utc)
return Candle(
- symbol="BTC/USDT",
+ symbol="AAPL",
timeframe="1m",
- open_time=datetime(2024, 1, 1, tzinfo=timezone.utc),
+ open_time=open_time,
open=Decimal(str(close)),
high=Decimal(str(high)),
low=Decimal(str(low)),
@@ -99,3 +102,46 @@ def test_vwap_reset_clears_state():
assert strategy._candle_count == 0
assert strategy._was_below_vwap is False
assert strategy._was_above_vwap is False
+ assert strategy._current_date is None
+ assert len(strategy._tp_values) == 0
+ assert len(strategy._vwap_values) == 0
+
+
+def test_vwap_daily_reset():
+ """Candles from two different dates cause VWAP to reset."""
+ strategy = _configured_strategy()
+
+ day1 = datetime(2024, 1, 1, tzinfo=timezone.utc)
+ day2 = datetime(2024, 1, 2, tzinfo=timezone.utc)
+
+ # Feed 35 candles on day 1 to build VWAP state
+ for i in range(35):
+ strategy.on_candle(make_candle(100.0, high=101.0, low=99.0, open_time=day1))
+
+ # Verify state is built up
+ assert strategy._candle_count == 35
+ assert strategy._cumulative_vol > 0
+ assert strategy._current_date == "2024-01-01"
+
+ # Feed first candle of day 2 — should reset
+ strategy.on_candle(make_candle(100.0, high=101.0, low=99.0, open_time=day2))
+
+ # After reset, candle_count should be 1 (the new candle)
+ assert strategy._candle_count == 1
+ assert strategy._current_date == "2024-01-02"
+
+
+def test_vwap_reset_clears_date():
+ """Verify reset() clears _current_date and deviation band state."""
+ strategy = _configured_strategy()
+
+ for _ in range(35):
+ strategy.on_candle(make_candle(100.0))
+
+ assert strategy._current_date is not None
+
+ strategy.reset()
+
+ assert strategy._current_date is None
+ assert len(strategy._tp_values) == 0
+ assert len(strategy._vwap_values) == 0