summaryrefslogtreecommitdiff
path: root/services/strategy-engine/tests
diff options
context:
space:
mode:
Diffstat (limited to 'services/strategy-engine/tests')
-rw-r--r--services/strategy-engine/tests/test_base_filters.py21
-rw-r--r--services/strategy-engine/tests/test_bollinger_strategy.py80
-rw-r--r--services/strategy-engine/tests/test_combined_strategy.py59
-rw-r--r--services/strategy-engine/tests/test_ema_crossover_strategy.py122
-rw-r--r--services/strategy-engine/tests/test_engine.py10
-rw-r--r--services/strategy-engine/tests/test_grid_strategy.py40
-rw-r--r--services/strategy-engine/tests/test_indicators.py2
-rw-r--r--services/strategy-engine/tests/test_macd_strategy.py62
-rw-r--r--services/strategy-engine/tests/test_moc_strategy.py152
-rw-r--r--services/strategy-engine/tests/test_multi_symbol.py24
-rw-r--r--services/strategy-engine/tests/test_rsi_strategy.py59
-rw-r--r--services/strategy-engine/tests/test_volume_profile_strategy.py55
-rw-r--r--services/strategy-engine/tests/test_vwap_strategy.py50
13 files changed, 700 insertions, 36 deletions
diff --git a/services/strategy-engine/tests/test_base_filters.py b/services/strategy-engine/tests/test_base_filters.py
index 97d9e16..ae9ca05 100644
--- a/services/strategy-engine/tests/test_base_filters.py
+++ b/services/strategy-engine/tests/test_base_filters.py
@@ -1,11 +1,12 @@
"""Tests for BaseStrategy filters (ADX, volume, ATR stops)."""
+
import sys
from pathlib import Path
+
sys.path.insert(0, str(Path(__file__).resolve().parents[1]))
from decimal import Decimal
from datetime import datetime, timezone
-import pytest
from shared.models import Candle, Signal, OrderSide
from strategies.base import BaseStrategy
@@ -28,9 +29,12 @@ class DummyStrategy(BaseStrategy):
def on_candle(self, candle: Candle) -> Signal | None:
self._update_filter_data(candle)
signal = Signal(
- strategy=self.name, symbol=candle.symbol,
- side=OrderSide.BUY, price=candle.close,
- quantity=self._quantity, reason="test",
+ strategy=self.name,
+ symbol=candle.symbol,
+ side=OrderSide.BUY,
+ price=candle.close,
+ quantity=self._quantity,
+ reason="test",
)
return self._apply_filters(signal)
@@ -39,10 +43,13 @@ def _candle(price=100.0, volume=10.0, high=None, low=None):
h = high if high is not None else price + 5
lo = low if low is not None else price - 5
return Candle(
- symbol="BTCUSDT", timeframe="1h",
+ symbol="AAPL",
+ timeframe="1h",
open_time=datetime(2025, 1, 1, tzinfo=timezone.utc),
- open=Decimal(str(price)), high=Decimal(str(h)),
- low=Decimal(str(lo)), close=Decimal(str(price)),
+ open=Decimal(str(price)),
+ high=Decimal(str(h)),
+ low=Decimal(str(lo)),
+ close=Decimal(str(price)),
volume=Decimal(str(volume)),
)
diff --git a/services/strategy-engine/tests/test_bollinger_strategy.py b/services/strategy-engine/tests/test_bollinger_strategy.py
index 348a9e0..8261377 100644
--- a/services/strategy-engine/tests/test_bollinger_strategy.py
+++ b/services/strategy-engine/tests/test_bollinger_strategy.py
@@ -10,7 +10,7 @@ from strategies.bollinger_strategy import BollingerStrategy
def make_candle(close: float) -> Candle:
return Candle(
- symbol="BTC/USDT",
+ symbol="AAPL",
timeframe="1m",
open_time=datetime(2024, 1, 1, tzinfo=timezone.utc),
open=Decimal(str(close)),
@@ -23,7 +23,7 @@ def make_candle(close: float) -> Candle:
def _make_strategy() -> BollingerStrategy:
s = BollingerStrategy()
- s.configure({"period": 5, "num_std": 1.0, "min_bandwidth": 0.0})
+ s.configure({"period": 5, "num_std": 1.0, "min_bandwidth": 0.0, "squeeze_threshold": 0.0})
return s
@@ -99,3 +99,79 @@ def test_bollinger_reset_clears_state():
assert len(strategy._closes) == 1
assert strategy._was_below_lower is False
assert strategy._was_above_upper is False
+ assert strategy._in_squeeze is False
+ assert strategy._squeeze_bars == 0
+
+
+def test_bollinger_squeeze_detection():
+ """Tight bandwidth → no signal during squeeze."""
+ # Use a strategy with a high squeeze threshold so constant prices trigger squeeze
+ s = BollingerStrategy()
+ s.configure(
+ {
+ "period": 5,
+ "num_std": 2.0,
+ "min_bandwidth": 0.0,
+ "squeeze_threshold": 0.5, # Very high threshold to ensure squeeze triggers
+ }
+ )
+
+ # Feed identical prices → bandwidth = 0 (below any threshold)
+ for _ in range(6):
+ result = s.on_candle(make_candle(100.0))
+
+ # With identical prices, std=0, bandwidth=0 < 0.5 → squeeze, no signal
+ assert s._in_squeeze is True
+ assert result is None
+
+
+def test_bollinger_squeeze_breakout_buy():
+ """Squeeze ends with price above SMA → BUY signal."""
+ s = BollingerStrategy()
+ s.configure(
+ {
+ "period": 5,
+ "num_std": 1.0,
+ "min_bandwidth": 0.0,
+ "squeeze_threshold": 0.01,
+ }
+ )
+
+ # Feed identical prices to create a squeeze (bandwidth = 0)
+ for _ in range(6):
+ s.on_candle(make_candle(100.0))
+
+ assert s._in_squeeze is True
+
+ # Now feed a price that creates enough spread to exit squeeze AND is above SMA
+ signal = s.on_candle(make_candle(120.0))
+ assert signal is not None
+ assert signal.side == OrderSide.BUY
+ assert "squeeze breakout UP" in signal.reason
+
+
+def test_bollinger_pct_b_conviction():
+ """Signals near band extremes have higher conviction via %B."""
+ s = BollingerStrategy()
+ s.configure(
+ {
+ "period": 5,
+ "num_std": 1.0,
+ "min_bandwidth": 0.0,
+ "squeeze_threshold": 0.0, # Disable squeeze for this test
+ }
+ )
+
+ # Build up with stable prices
+ for _ in range(5):
+ s.on_candle(make_candle(100.0))
+
+ # Drop below lower band
+ s.on_candle(make_candle(50.0))
+
+ # Recover just at the lower band edge — %B close to 0 → high conviction
+ signal = s.on_candle(make_candle(100.0))
+ assert signal is not None
+ assert signal.side == OrderSide.BUY
+ # conviction = max(1.0 - pct_b, 0.3), with pct_b near lower → conviction should be >= 0.3
+ assert signal.conviction >= 0.3
diff --git a/services/strategy-engine/tests/test_combined_strategy.py b/services/strategy-engine/tests/test_combined_strategy.py
index 3408a89..8a4dc74 100644
--- a/services/strategy-engine/tests/test_combined_strategy.py
+++ b/services/strategy-engine/tests/test_combined_strategy.py
@@ -72,7 +72,7 @@ class NeutralStrategy(BaseStrategy):
def _candle(price=100.0):
return Candle(
- symbol="BTCUSDT",
+ symbol="AAPL",
timeframe="1m",
open_time=datetime(2025, 1, 1, tzinfo=timezone.utc),
open=Decimal(str(price)),
@@ -167,3 +167,60 @@ def test_combined_invalid_weight():
c.configure({})
with pytest.raises(ValueError):
c.add_strategy(AlwaysBuyStrategy(), weight=-1.0)
+
+
+def test_combined_record_result():
+ """Verify trade history tracking works correctly."""
+ c = CombinedStrategy()
+ c.configure({"adaptive_weights": True, "history_window": 5})
+
+ c.record_result("test_strat", True)
+ c.record_result("test_strat", False)
+ c.record_result("test_strat", True)
+
+ assert len(c._trade_history["test_strat"]) == 3
+ assert c._trade_history["test_strat"] == [True, False, True]
+
+ # Fill beyond window size to test trimming
+ for _ in range(5):
+ c.record_result("test_strat", False)
+
+ assert len(c._trade_history["test_strat"]) == 5 # Trimmed to history_window
+
+
+def test_combined_adaptive_weight_increases_for_winners():
+ """Strategy with high win rate gets higher effective weight."""
+ c = CombinedStrategy()
+ c.configure({"threshold": 0.3, "adaptive_weights": True, "history_window": 20})
+ c.add_strategy(AlwaysBuyStrategy(), weight=1.0)
+
+ # Record high win rate for always_buy (80% wins)
+ for _ in range(8):
+ c.record_result("always_buy", True)
+ for _ in range(2):
+ c.record_result("always_buy", False)
+
+ # Adaptive weight should be > base weight (1.0)
+ adaptive_w = c._get_adaptive_weight("always_buy", 1.0)
+ assert adaptive_w > 1.0
+ # 80% win rate -> scale = 0.5 + 0.8 = 1.3 -> weight = 1.3
+ assert abs(adaptive_w - 1.3) < 0.01
+
+
+def test_combined_adaptive_weight_decreases_for_losers():
+ """Strategy with low win rate gets lower effective weight."""
+ c = CombinedStrategy()
+ c.configure({"threshold": 0.3, "adaptive_weights": True, "history_window": 20})
+ c.add_strategy(AlwaysBuyStrategy(), weight=1.0)
+
+ # Record low win rate for always_buy (20% wins)
+ for _ in range(2):
+ c.record_result("always_buy", True)
+ for _ in range(8):
+ c.record_result("always_buy", False)
+
+ # Adaptive weight should be < base weight (1.0)
+ adaptive_w = c._get_adaptive_weight("always_buy", 1.0)
+ assert adaptive_w < 1.0
+ # 20% win rate -> scale = 0.5 + 0.2 = 0.7 -> weight = 0.7
+ assert abs(adaptive_w - 0.7) < 0.01
diff --git a/services/strategy-engine/tests/test_ema_crossover_strategy.py b/services/strategy-engine/tests/test_ema_crossover_strategy.py
index 0cf767b..7028eb0 100644
--- a/services/strategy-engine/tests/test_ema_crossover_strategy.py
+++ b/services/strategy-engine/tests/test_ema_crossover_strategy.py
@@ -10,7 +10,7 @@ from strategies.ema_crossover_strategy import EmaCrossoverStrategy
def make_candle(close: float) -> Candle:
return Candle(
- symbol="BTC/USDT",
+ symbol="AAPL",
timeframe="1m",
open_time=datetime(2024, 1, 1, tzinfo=timezone.utc),
open=Decimal(str(close)),
@@ -21,9 +21,18 @@ def make_candle(close: float) -> Candle:
)
-def _make_strategy(short: int = 3, long: int = 6) -> EmaCrossoverStrategy:
+def _make_strategy(
+ short: int = 3, long: int = 6, pullback_enabled: bool = False
+) -> EmaCrossoverStrategy:
s = EmaCrossoverStrategy()
- s.configure({"short_period": short, "long_period": long, "quantity": "0.01"})
+ s.configure(
+ {
+ "short_period": short,
+ "long_period": long,
+ "quantity": "0.01",
+ "pullback_enabled": pullback_enabled,
+ }
+ )
return s
@@ -97,3 +106,110 @@ def test_ema_reset_clears_state():
# Internal state should be cleared
assert len(strategy._closes) == 1
assert strategy._prev_short_above is None
+ assert strategy._pending_signal is None
+
+
+def test_ema_pullback_entry():
+ """Crossover detected, then pullback to short EMA triggers signal."""
+ strategy = EmaCrossoverStrategy()
+ strategy.configure(
+ {
+ "short_period": 3,
+ "long_period": 6,
+ "quantity": "0.01",
+ "pullback_enabled": True,
+ "pullback_tolerance": 0.05, # 5% tolerance for test simplicity
+ }
+ )
+
+ # Declining prices so short EMA stays below long EMA
+ declining = [100, 98, 96, 94, 92, 90, 88, 86, 84, 82]
+ for price in declining:
+ strategy.on_candle(make_candle(price))
+
+ # Sharp rise to force golden cross — with pullback enabled, no signal yet
+ rising = [120, 140, 160]
+ for price in rising:
+ strategy.on_candle(make_candle(price))
+
+ # With pullback enabled, crossover should NOT produce immediate signal
+ # but _pending_signal should be set
+ assert strategy._pending_signal == "BUY"
+
+ # Now feed a candle whose close is near the short EMA (pullback)
+ # The short EMA will be tracking recent prices; feed a price that pulls back
+ # toward it. We use a moderate price to get close to short EMA.
+ import pandas as pd
+
+ series = pd.Series(list(strategy._closes))
+ short_ema_val = series.ewm(span=3, adjust=False).mean().iloc[-1]
+ # Feed a candle at approximately the short EMA value
+ result = strategy.on_candle(make_candle(short_ema_val))
+ assert result is not None
+ assert result.side == OrderSide.BUY
+ assert "pullback" in result.reason
+
+
+def test_ema_pullback_cancelled_on_reversal():
+ """Crossover detected, then reversal cancels the pending signal."""
+ strategy = EmaCrossoverStrategy()
+ strategy.configure(
+ {
+ "short_period": 3,
+ "long_period": 6,
+ "quantity": "0.01",
+ "pullback_enabled": True,
+ "pullback_tolerance": 0.001, # Very tight tolerance — won't trigger easily
+ }
+ )
+
+ # Declining prices
+ declining = [100, 98, 96, 94, 92, 90, 88, 86, 84, 82]
+ for price in declining:
+ strategy.on_candle(make_candle(price))
+
+ # Sharp rise to force golden cross
+ for price in [120, 140, 160]:
+ strategy.on_candle(make_candle(price))
+
+ assert strategy._pending_signal == "BUY"
+
+ # Now sharp decline to reverse the crossover (death cross)
+ for price in [60, 40, 20]:
+ strategy.on_candle(make_candle(price))
+
+ # The BUY pending signal should be cancelled because short EMA fell below long EMA.
+ # A new death cross may set _pending_signal to "SELL", but the original "BUY" is gone.
+ assert strategy._pending_signal != "BUY"
+
+
+def test_ema_immediate_mode():
+ """With pullback_enabled=False, original immediate entry works."""
+ strategy = EmaCrossoverStrategy()
+ strategy.configure(
+ {
+ "short_period": 3,
+ "long_period": 6,
+ "quantity": "0.01",
+ "pullback_enabled": False,
+ }
+ )
+
+ # Declining prices so short EMA stays below long EMA
+ declining = [100, 98, 96, 94, 92, 90, 88, 86, 84, 82]
+ for price in declining:
+ strategy.on_candle(make_candle(price))
+
+ # Sharp rise to force golden cross — immediate mode should fire signal
+ rising = [120, 140, 160]
+ signal = None
+ for price in rising:
+ result = strategy.on_candle(make_candle(price))
+ if result is not None:
+ signal = result
+
+ assert signal is not None
+ assert signal.side == OrderSide.BUY
+ assert "Golden Cross" in signal.reason
+ # No pending signal should be set
+ assert strategy._pending_signal is None
diff --git a/services/strategy-engine/tests/test_engine.py b/services/strategy-engine/tests/test_engine.py
index ac9a596..2623027 100644
--- a/services/strategy-engine/tests/test_engine.py
+++ b/services/strategy-engine/tests/test_engine.py
@@ -13,7 +13,7 @@ from strategy_engine.engine import StrategyEngine
def make_candle_event() -> dict:
candle = Candle(
- symbol="BTC/USDT",
+ symbol="AAPL",
timeframe="1m",
open_time=datetime(2024, 1, 1, tzinfo=timezone.utc),
open=Decimal("50000"),
@@ -28,7 +28,7 @@ def make_candle_event() -> dict:
def make_signal() -> Signal:
return Signal(
strategy="test",
- symbol="BTC/USDT",
+ symbol="AAPL",
side=OrderSide.BUY,
price=Decimal("50050"),
quantity=Decimal("0.01"),
@@ -46,12 +46,12 @@ async def test_engine_dispatches_candle_to_strategies():
strategy.on_candle = MagicMock(return_value=None)
engine = StrategyEngine(broker=broker, strategies=[strategy])
- await engine.process_once("candles.BTC_USDT", "0")
+ await engine.process_once("candles.AAPL", "0")
strategy.on_candle.assert_called_once()
candle_arg = strategy.on_candle.call_args[0][0]
assert isinstance(candle_arg, Candle)
- assert candle_arg.symbol == "BTC/USDT"
+ assert candle_arg.symbol == "AAPL"
@pytest.mark.asyncio
@@ -64,7 +64,7 @@ async def test_engine_publishes_signal_when_strategy_returns_one():
strategy.on_candle = MagicMock(return_value=make_signal())
engine = StrategyEngine(broker=broker, strategies=[strategy])
- await engine.process_once("candles.BTC_USDT", "0")
+ await engine.process_once("candles.AAPL", "0")
broker.publish.assert_called_once()
call_args = broker.publish.call_args
diff --git a/services/strategy-engine/tests/test_grid_strategy.py b/services/strategy-engine/tests/test_grid_strategy.py
index 79eb22a..878b900 100644
--- a/services/strategy-engine/tests/test_grid_strategy.py
+++ b/services/strategy-engine/tests/test_grid_strategy.py
@@ -10,7 +10,7 @@ from strategies.grid_strategy import GridStrategy
def make_candle(close: float) -> Candle:
return Candle(
- symbol="BTC/USDT",
+ symbol="AAPL",
timeframe="1m",
open_time=datetime(2024, 1, 1, tzinfo=timezone.utc),
open=Decimal(str(close)),
@@ -60,3 +60,41 @@ def test_grid_strategy_no_signal_in_same_zone():
strategy.on_candle(make_candle(50000))
signal = strategy.on_candle(make_candle(50100))
assert signal is None
+
+
+def test_grid_exit_on_trend_break():
+ """Price drops well below grid range → SELL signal emitted."""
+ strategy = _configured_strategy()
+ # Grid range is 48000-52000, exit_threshold_pct defaults to 5%
+ # Lower bound = 48000 * 0.95 = 45600
+ # Establish a zone first
+ strategy.on_candle(make_candle(50000))
+ # Price drops far below the grid range
+ signal = strategy.on_candle(make_candle(45000))
+ assert signal is not None
+ assert signal.side == OrderSide.SELL
+ assert "broke out of range" in signal.reason
+
+
+def test_grid_no_signal_while_out_of_range():
+ """After exit signal, no more grid signals until price returns to range."""
+ strategy = _configured_strategy()
+ # Establish a zone
+ strategy.on_candle(make_candle(50000))
+ # First out-of-range candle → SELL exit signal
+ signal = strategy.on_candle(make_candle(45000))
+ assert signal is not None
+ assert signal.side == OrderSide.SELL
+
+ # Subsequent out-of-range candles → no signals
+ signal = strategy.on_candle(make_candle(44000))
+ assert signal is None
+
+ signal = strategy.on_candle(make_candle(43000))
+ assert signal is None
+
+ # Price returns to grid range → grid signals resume
+ strategy.on_candle(make_candle(50000))
+ signal = strategy.on_candle(make_candle(48100))
+ assert signal is not None
+ assert signal.side == OrderSide.BUY
diff --git a/services/strategy-engine/tests/test_indicators.py b/services/strategy-engine/tests/test_indicators.py
index ac5b505..481569b 100644
--- a/services/strategy-engine/tests/test_indicators.py
+++ b/services/strategy-engine/tests/test_indicators.py
@@ -1,6 +1,8 @@
"""Tests for technical indicator library."""
+
import sys
from pathlib import Path
+
sys.path.insert(0, str(Path(__file__).resolve().parents[1]))
import pandas as pd
diff --git a/services/strategy-engine/tests/test_macd_strategy.py b/services/strategy-engine/tests/test_macd_strategy.py
index 9931b43..556fd4c 100644
--- a/services/strategy-engine/tests/test_macd_strategy.py
+++ b/services/strategy-engine/tests/test_macd_strategy.py
@@ -10,7 +10,7 @@ from strategies.macd_strategy import MacdStrategy
def _candle(price: float) -> Candle:
return Candle(
- symbol="BTC/USDT",
+ symbol="AAPL",
timeframe="1m",
open_time=datetime(2024, 1, 1, tzinfo=timezone.utc),
open=Decimal(str(price)),
@@ -78,3 +78,63 @@ def test_macd_reset_clears_state():
s.reset()
assert len(s._closes) == 0
assert s._prev_histogram is None
+ assert s._prev_macd is None
+ assert s._prev_signal is None
+
+
+def test_macd_signal_line_crossover():
+ """Test that MACD signal-line crossover generates signals."""
+ s = _make_strategy()
+ # Declining then rising prices should produce a signal-line bullish crossover
+ prices = [100, 99, 98, 97, 96, 95, 94, 93, 92, 91, 90, 89, 88]
+ prices += [89, 91, 94, 98, 103, 109, 116, 124, 133, 143]
+ signals = []
+ for p in prices:
+ result = s.on_candle(_candle(float(p)))
+ if result is not None:
+ signals.append(result)
+
+ buy_signals = [sig for sig in signals if sig.side == OrderSide.BUY]
+ assert len(buy_signals) > 0, "Expected at least one BUY signal"
+ # Check that at least one is a signal-line crossover or histogram crossover
+ all_reasons = [sig.reason for sig in buy_signals]
+ assert any("crossover" in r for r in all_reasons), (
+ f"Expected crossover signal, got: {all_reasons}"
+ )
+
+
+def test_macd_conviction_varies_with_distance():
+ """Test that conviction varies based on MACD distance from zero line."""
+ s1 = _make_strategy()
+ s2 = _make_strategy()
+
+ # Small price movements -> MACD near zero -> lower conviction
+ small_prices = [100, 99.5, 99, 98.5, 98, 97.5, 97, 96.5, 96, 95.5, 95, 94.5, 94]
+ small_prices += [94.5, 95, 95.5, 96, 96.5, 97, 97.5, 98, 98.5, 99]
+ small_signals = []
+ for p in small_prices:
+ result = s1.on_candle(_candle(float(p)))
+ if result is not None:
+ small_signals.append(result)
+
+ # Large price movements -> MACD far from zero -> higher conviction
+ large_prices = [100, 95, 90, 85, 80, 75, 70, 65, 60, 55, 50, 45, 40]
+ large_prices += [45, 55, 70, 90, 115, 145, 180, 220, 265, 315]
+ large_signals = []
+ for p in large_prices:
+ result = s2.on_candle(_candle(float(p)))
+ if result is not None:
+ large_signals.append(result)
+
+ # Both should produce signals
+ assert len(small_signals) > 0, "Expected signals from small movements"
+ assert len(large_signals) > 0, "Expected signals from large movements"
+
+ # The large-movement signals should generally have higher conviction
+ # (or at least different conviction, since distance from zero affects it)
+ small_conv = small_signals[-1].conviction
+ large_conv = large_signals[-1].conviction
+ # Large movements should produce conviction >= small movements
+ assert large_conv >= small_conv, (
+ f"Expected large movement conviction ({large_conv}) >= small ({small_conv})"
+ )
diff --git a/services/strategy-engine/tests/test_moc_strategy.py b/services/strategy-engine/tests/test_moc_strategy.py
new file mode 100644
index 0000000..1928a28
--- /dev/null
+++ b/services/strategy-engine/tests/test_moc_strategy.py
@@ -0,0 +1,152 @@
+"""Tests for MOC (Market on Close) strategy."""
+
+import sys
+from pathlib import Path
+
+sys.path.insert(0, str(Path(__file__).resolve().parents[1]))
+
+from datetime import datetime, timezone
+from decimal import Decimal
+
+from shared.models import Candle, OrderSide
+from strategies.moc_strategy import MocStrategy
+
+
+def _candle(price, hour=20, minute=0, volume=100.0, day=1, open_price=None):
+ op = open_price if open_price is not None else price - 1 # Default: bullish
+ return Candle(
+ symbol="AAPL",
+ timeframe="5Min",
+ open_time=datetime(2025, 1, day, hour, minute, tzinfo=timezone.utc),
+ open=Decimal(str(op)),
+ high=Decimal(str(price + 1)),
+ low=Decimal(str(min(op, price) - 1)),
+ close=Decimal(str(price)),
+ volume=Decimal(str(volume)),
+ )
+
+
+def _make_strategy(**overrides):
+ s = MocStrategy()
+ params = {
+ "quantity_pct": 0.2,
+ "stop_loss_pct": 2.0,
+ "rsi_min": 30,
+ "rsi_max": 70, # Wider for tests
+ "ema_period": 5,
+ "volume_avg_period": 5,
+ "min_volume_ratio": 0.5,
+ "buy_start_utc": 19,
+ "buy_end_utc": 21,
+ "sell_start_utc": 13,
+ "sell_end_utc": 15,
+ "max_positions": 5,
+ }
+ params.update(overrides)
+ s.configure(params)
+ return s
+
+
+def test_moc_warmup_period():
+ s = _make_strategy(ema_period=20, volume_avg_period=15)
+ assert s.warmup_period == 21
+
+
+def test_moc_no_signal_outside_buy_window():
+ s = _make_strategy()
+ # Hour 12 UTC — not in buy (19-21) or sell (13-15) window
+ for i in range(10):
+ sig = s.on_candle(_candle(150 + i, hour=12, minute=i * 5))
+ assert sig is None
+
+
+def test_moc_buy_signal_in_window():
+ s = _make_strategy(ema_period=3)
+ # Build up history with some oscillation so RSI settles in the 30-70 range
+ prices = [
+ 150,
+ 149,
+ 151,
+ 148,
+ 152,
+ 149,
+ 150,
+ 151,
+ 148,
+ 150,
+ 149,
+ 151,
+ 150,
+ 152,
+ 151,
+ 153,
+ 152,
+ 154,
+ 153,
+ 155,
+ ]
+ signals = []
+ for i, p in enumerate(prices):
+ sig = s.on_candle(_candle(p, hour=20, minute=i * 2, volume=200.0))
+ if sig is not None:
+ signals.append(sig)
+ buy_signals = [sig for sig in signals if sig.side == OrderSide.BUY]
+ assert len(buy_signals) > 0
+ assert buy_signals[0].strategy == "moc"
+
+
+def test_moc_sell_at_open():
+ s = _make_strategy(ema_period=3)
+ # Force entry
+ for i in range(10):
+ s.on_candle(_candle(150 + i, hour=20, minute=i * 3, volume=200.0))
+
+ if s._in_position:
+ # Next day, sell window
+ sig = s.on_candle(_candle(155, hour=14, minute=0, day=2))
+ assert sig is not None
+ assert sig.side == OrderSide.SELL
+ assert "MOC sell" in sig.reason
+
+
+def test_moc_stop_loss():
+ s = _make_strategy(ema_period=3, stop_loss_pct=1.0)
+ for i in range(10):
+ s.on_candle(_candle(150 + i, hour=20, minute=i * 3, volume=200.0))
+
+ if s._in_position:
+ drop_price = s._entry_price * 0.98 # -2%
+ sig = s.on_candle(_candle(drop_price, hour=22, minute=0))
+ if sig is not None:
+ assert sig.side == OrderSide.SELL
+ assert "stop loss" in sig.reason
+
+
+def test_moc_no_buy_on_bearish_candle():
+ s = _make_strategy(ema_period=3)
+ for i in range(8):
+ s.on_candle(_candle(150, hour=20, minute=i * 3, volume=200.0))
+ # Bearish candle (open > close)
+ s.on_candle(_candle(149, hour=20, minute=30, open_price=151))
+ # May or may not signal depending on other criteria, but bearish should reduce chances
+ # Just verify no crash
+
+
+def test_moc_only_one_buy_per_day():
+ s = _make_strategy(ema_period=3)
+ buy_count = 0
+ for i in range(20):
+ sig = s.on_candle(_candle(150 + i * 0.3, hour=20, minute=i * 2, volume=200.0))
+ if sig is not None and sig.side == OrderSide.BUY:
+ buy_count += 1
+ assert buy_count <= 1
+
+
+def test_moc_reset():
+ s = _make_strategy()
+ s.on_candle(_candle(150, hour=20))
+ s._in_position = True
+ s.reset()
+ assert not s._in_position
+ assert len(s._closes) == 0
+ assert not s._bought_today
diff --git a/services/strategy-engine/tests/test_multi_symbol.py b/services/strategy-engine/tests/test_multi_symbol.py
index cb8088c..671a9d3 100644
--- a/services/strategy-engine/tests/test_multi_symbol.py
+++ b/services/strategy-engine/tests/test_multi_symbol.py
@@ -22,7 +22,7 @@ async def test_engine_processes_multiple_streams():
broker = AsyncMock()
candle_btc = Candle(
- symbol="BTCUSDT",
+ symbol="AAPL",
timeframe="1m",
open_time=datetime(2025, 1, 1, tzinfo=timezone.utc),
open=Decimal("50000"),
@@ -32,7 +32,7 @@ async def test_engine_processes_multiple_streams():
volume=Decimal("10"),
)
candle_eth = Candle(
- symbol="ETHUSDT",
+ symbol="MSFT",
timeframe="1m",
open_time=datetime(2025, 1, 1, tzinfo=timezone.utc),
open=Decimal("3000"),
@@ -45,16 +45,16 @@ async def test_engine_processes_multiple_streams():
btc_events = [CandleEvent(data=candle_btc).to_dict()]
eth_events = [CandleEvent(data=candle_eth).to_dict()]
- # First call returns BTC event, second ETH, then empty
- call_count = {"btc": 0, "eth": 0}
+ # First call returns AAPL event, second MSFT, then empty
+ call_count = {"aapl": 0, "msft": 0}
async def mock_read(stream, **kwargs):
- if "BTC" in stream:
- call_count["btc"] += 1
- return btc_events if call_count["btc"] == 1 else []
- elif "ETH" in stream:
- call_count["eth"] += 1
- return eth_events if call_count["eth"] == 1 else []
+ if "AAPL" in stream:
+ call_count["aapl"] += 1
+ return btc_events if call_count["aapl"] == 1 else []
+ elif "MSFT" in stream:
+ call_count["msft"] += 1
+ return eth_events if call_count["msft"] == 1 else []
return []
broker.read = AsyncMock(side_effect=mock_read)
@@ -65,8 +65,8 @@ async def test_engine_processes_multiple_streams():
engine = StrategyEngine(broker=broker, strategies=[strategy])
# Process both streams
- await engine.process_once("candles.BTCUSDT", "$")
- await engine.process_once("candles.ETHUSDT", "$")
+ await engine.process_once("candles.AAPL", "$")
+ await engine.process_once("candles.MSFT", "$")
# Strategy should have been called with both candles
assert strategy.on_candle.call_count == 2
diff --git a/services/strategy-engine/tests/test_rsi_strategy.py b/services/strategy-engine/tests/test_rsi_strategy.py
index 2a2f4e7..6d31fd5 100644
--- a/services/strategy-engine/tests/test_rsi_strategy.py
+++ b/services/strategy-engine/tests/test_rsi_strategy.py
@@ -10,7 +10,7 @@ from strategies.rsi_strategy import RsiStrategy
def make_candle(close: float, idx: int = 0) -> Candle:
return Candle(
- symbol="BTC/USDT",
+ symbol="AAPL",
timeframe="1m",
open_time=datetime(2024, 1, 1, tzinfo=timezone.utc),
open=Decimal(str(close)),
@@ -43,3 +43,60 @@ def test_rsi_strategy_buy_signal_on_oversold():
# if a signal is returned, it must be a BUY
if signal is not None:
assert signal.side == OrderSide.BUY
+
+
+def test_rsi_detects_bullish_divergence():
+ """Bullish divergence: price makes lower low, RSI makes higher low."""
+ strategy = RsiStrategy()
+ strategy.configure({"period": 5, "oversold": 20, "overbought": 80})
+ strategy._filter_enabled = False # Disable filters to test divergence logic only
+
+ # Sharp consecutive drop to 50 drives RSI near 0 (first swing low).
+ # Big recovery, then gradual decline to 48 (lower price, but RSI > 0 = higher low).
+ prices = [100.0] * 7
+ prices += [85.0, 70.0, 55.0, 50.0]
+ prices += [55.0, 65.0, 80.0, 95.0, 110.0, 120.0, 130.0, 135.0, 140.0, 142.0, 143.0, 144.0]
+ prices += [142.0, 140.0, 138.0, 135.0, 130.0, 125.0, 120.0, 115.0, 110.0, 105.0]
+ prices += [100.0, 95.0, 90.0, 85.0, 80.0, 75.0, 70.0, 65.0, 60.0, 55.0, 50.0, 48.0]
+ prices += [52.0, 58.0]
+
+ signals = []
+ for p in prices:
+ result = strategy.on_candle(make_candle(p))
+ if result is not None:
+ signals.append(result)
+
+ divergence_signals = [s for s in signals if "divergence" in s.reason]
+ assert len(divergence_signals) > 0, "Expected at least one bullish divergence signal"
+ assert divergence_signals[0].side == OrderSide.BUY
+ assert divergence_signals[0].conviction == 0.9
+ assert "bullish divergence" in divergence_signals[0].reason
+
+
+def test_rsi_detects_bearish_divergence():
+ """Bearish divergence: price makes higher high, RSI makes lower high."""
+ strategy = RsiStrategy()
+ strategy.configure({"period": 5, "oversold": 20, "overbought": 80})
+ strategy._filter_enabled = False # Disable filters to test divergence logic only
+
+ # Sharp consecutive rise to 160 drives RSI very high (first swing high).
+ # Deep pullback, then rise to 162 (higher price) but with a dip right before
+ # the peak to dampen RSI (lower high).
+ prices = [100.0] * 7
+ prices += [110.0, 120.0, 130.0, 140.0, 150.0, 160.0]
+ prices += [155.0, 145.0, 130.0, 115.0, 100.0, 90.0, 80.0]
+ prices += [90.0, 100.0, 110.0, 120.0, 130.0, 140.0, 150.0]
+ prices += [145.0, 162.0]
+ prices += [155.0, 148.0]
+
+ signals = []
+ for p in prices:
+ result = strategy.on_candle(make_candle(p))
+ if result is not None:
+ signals.append(result)
+
+ divergence_signals = [s for s in signals if "divergence" in s.reason]
+ assert len(divergence_signals) > 0, "Expected at least one bearish divergence signal"
+ assert divergence_signals[0].side == OrderSide.SELL
+ assert divergence_signals[0].conviction == 0.9
+ assert "bearish divergence" in divergence_signals[0].reason
diff --git a/services/strategy-engine/tests/test_volume_profile_strategy.py b/services/strategy-engine/tests/test_volume_profile_strategy.py
index 71f0eca..65ee2e8 100644
--- a/services/strategy-engine/tests/test_volume_profile_strategy.py
+++ b/services/strategy-engine/tests/test_volume_profile_strategy.py
@@ -10,7 +10,7 @@ from strategies.volume_profile_strategy import VolumeProfileStrategy
def make_candle(close: float, volume: float = 1.0) -> Candle:
return Candle(
- symbol="BTC/USDT",
+ symbol="AAPL",
timeframe="1m",
open_time=datetime(2024, 1, 1, tzinfo=timezone.utc),
open=Decimal(str(close)),
@@ -125,3 +125,56 @@ def test_volume_profile_reset_clears_state():
# After reset, should not have enough data
result = strategy.on_candle(make_candle(100.0, 10.0))
assert result is None
+
+
+def test_volume_profile_hvn_detection():
+ """Feed clustered volume at specific price levels to produce HVN nodes."""
+ strategy = VolumeProfileStrategy()
+ strategy.configure({"lookback_period": 20, "num_bins": 10, "value_area_pct": 0.7})
+
+ # Create a profile with very high volume at price ~100 and low volume elsewhere
+ # Prices range from 90 to 110, heavy volume concentrated at 100
+ candles_data = []
+ # Low volume at extremes
+ for p in [90, 91, 92, 109, 110]:
+ candles_data.append((p, 1.0))
+ # Very high volume around 100
+ for _ in range(15):
+ candles_data.append((100, 100.0))
+
+ for price, vol in candles_data:
+ strategy.on_candle(make_candle(price, vol))
+
+ # Access the internal method to verify HVN detection
+ result = strategy._compute_value_area()
+ assert result is not None
+ poc, va_low, va_high, hvn_levels, lvn_levels = result
+
+ # The bin containing price ~100 should have very high volume -> HVN
+ assert len(hvn_levels) > 0
+ # At least one HVN should be near 100
+ assert any(abs(h - 100) < 5 for h in hvn_levels)
+
+
+def test_volume_profile_reset_thorough():
+ """Verify all state is cleared on reset."""
+ strategy = VolumeProfileStrategy()
+ strategy.configure({"lookback_period": 10, "num_bins": 5})
+
+ # Build up state
+ for _ in range(10):
+ strategy.on_candle(make_candle(100.0, 10.0))
+ # Set below/above VA flags
+ strategy.on_candle(make_candle(50.0, 1.0)) # below VA
+ strategy.on_candle(make_candle(200.0, 1.0)) # above VA
+
+ strategy.reset()
+
+ # Verify all state cleared
+ assert len(strategy._candles) == 0
+ assert strategy._was_below_va is False
+ assert strategy._was_above_va is False
+
+ # Should not produce signal since no data
+ result = strategy.on_candle(make_candle(100.0, 10.0))
+ assert result is None
diff --git a/services/strategy-engine/tests/test_vwap_strategy.py b/services/strategy-engine/tests/test_vwap_strategy.py
index 5d76b04..2c34b01 100644
--- a/services/strategy-engine/tests/test_vwap_strategy.py
+++ b/services/strategy-engine/tests/test_vwap_strategy.py
@@ -13,15 +13,18 @@ def make_candle(
high: float | None = None,
low: float | None = None,
volume: float = 1.0,
+ open_time: datetime | None = None,
) -> Candle:
if high is None:
high = close
if low is None:
low = close
+ if open_time is None:
+ open_time = datetime(2024, 1, 1, tzinfo=timezone.utc)
return Candle(
- symbol="BTC/USDT",
+ symbol="AAPL",
timeframe="1m",
- open_time=datetime(2024, 1, 1, tzinfo=timezone.utc),
+ open_time=open_time,
open=Decimal(str(close)),
high=Decimal(str(high)),
low=Decimal(str(low)),
@@ -99,3 +102,46 @@ def test_vwap_reset_clears_state():
assert strategy._candle_count == 0
assert strategy._was_below_vwap is False
assert strategy._was_above_vwap is False
+ assert strategy._current_date is None
+ assert len(strategy._tp_values) == 0
+ assert len(strategy._vwap_values) == 0
+
+
+def test_vwap_daily_reset():
+ """Candles from two different dates cause VWAP to reset."""
+ strategy = _configured_strategy()
+
+ day1 = datetime(2024, 1, 1, tzinfo=timezone.utc)
+ day2 = datetime(2024, 1, 2, tzinfo=timezone.utc)
+
+ # Feed 35 candles on day 1 to build VWAP state
+ for i in range(35):
+ strategy.on_candle(make_candle(100.0, high=101.0, low=99.0, open_time=day1))
+
+ # Verify state is built up
+ assert strategy._candle_count == 35
+ assert strategy._cumulative_vol > 0
+ assert strategy._current_date == "2024-01-01"
+
+ # Feed first candle of day 2 — should reset
+ strategy.on_candle(make_candle(100.0, high=101.0, low=99.0, open_time=day2))
+
+ # After reset, candle_count should be 1 (the new candle)
+ assert strategy._candle_count == 1
+ assert strategy._current_date == "2024-01-02"
+
+
+def test_vwap_reset_clears_date():
+ """Verify reset() clears _current_date and deviation band state."""
+ strategy = _configured_strategy()
+
+ for _ in range(35):
+ strategy.on_candle(make_candle(100.0))
+
+ assert strategy._current_date is not None
+
+ strategy.reset()
+
+ assert strategy._current_date is None
+ assert len(strategy._tp_values) == 0
+ assert len(strategy._vwap_values) == 0