summaryrefslogtreecommitdiff
path: root/services/strategy-engine
diff options
context:
space:
mode:
authorTheSiahxyz <164138827+TheSiahxyz@users.noreply.github.com>2026-04-01 16:16:31 +0900
committerTheSiahxyz <164138827+TheSiahxyz@users.noreply.github.com>2026-04-01 16:16:31 +0900
commit49e5baaebf2f9ca1ba7b85a80c3451c5789edde4 (patch)
treea756ccbf4166df6ee6b3cabd9b17e6c485dc5fe2 /services/strategy-engine
parent2faec2f1f1631bd286a5c55051e582f8abe2898c (diff)
feat(strategy): add VWAP strategy
Diffstat (limited to 'services/strategy-engine')
-rw-r--r--services/strategy-engine/strategies/config/vwap_strategy.yaml2
-rw-r--r--services/strategy-engine/strategies/vwap_strategy.py83
-rw-r--r--services/strategy-engine/tests/test_vwap_strategy.py101
3 files changed, 186 insertions, 0 deletions
diff --git a/services/strategy-engine/strategies/config/vwap_strategy.yaml b/services/strategy-engine/strategies/config/vwap_strategy.yaml
new file mode 100644
index 0000000..e12d5e7
--- /dev/null
+++ b/services/strategy-engine/strategies/config/vwap_strategy.yaml
@@ -0,0 +1,2 @@
+deviation_threshold: 0.002
+quantity: "0.01"
diff --git a/services/strategy-engine/strategies/vwap_strategy.py b/services/strategy-engine/strategies/vwap_strategy.py
new file mode 100644
index 0000000..d1b86b5
--- /dev/null
+++ b/services/strategy-engine/strategies/vwap_strategy.py
@@ -0,0 +1,83 @@
+from decimal import Decimal
+
+from shared.models import Candle, Signal, OrderSide
+from strategies.base import BaseStrategy
+
+
+class VwapStrategy(BaseStrategy):
+ name: str = "vwap"
+
+ def __init__(self) -> None:
+ self._deviation_threshold: float = 0.002
+ self._quantity: Decimal = Decimal("0.01")
+ self._cumulative_tp_vol: float = 0.0
+ self._cumulative_vol: float = 0.0
+ self._candle_count: int = 0
+ self._was_below_vwap: bool = False
+ self._was_above_vwap: bool = False
+
+ @property
+ def warmup_period(self) -> int:
+ return 30
+
+ def configure(self, params: dict) -> None:
+ self._deviation_threshold = float(params.get("deviation_threshold", 0.002))
+ self._quantity = Decimal(str(params.get("quantity", "0.01")))
+
+ def reset(self) -> None:
+ self._cumulative_tp_vol = 0.0
+ self._cumulative_vol = 0.0
+ self._candle_count = 0
+ self._was_below_vwap = False
+ self._was_above_vwap = False
+
+ def on_candle(self, candle: Candle) -> Signal | None:
+ high = float(candle.high)
+ low = float(candle.low)
+ close = float(candle.close)
+ volume = float(candle.volume)
+
+ typical_price = (high + low + close) / 3.0
+ self._cumulative_tp_vol += typical_price * volume
+ self._cumulative_vol += volume
+ self._candle_count += 1
+
+ if self._candle_count < self.warmup_period:
+ return None
+
+ if self._cumulative_vol == 0.0:
+ return None
+
+ vwap = self._cumulative_tp_vol / self._cumulative_vol
+ deviation = (close - vwap) / vwap
+
+ if deviation < -self._deviation_threshold:
+ self._was_below_vwap = True
+ if deviation > self._deviation_threshold:
+ self._was_above_vwap = True
+
+ # Mean reversion from below: was below VWAP, now back near it
+ if self._was_below_vwap and abs(deviation) <= self._deviation_threshold:
+ self._was_below_vwap = False
+ return Signal(
+ strategy=self.name,
+ symbol=candle.symbol,
+ side=OrderSide.BUY,
+ price=candle.close,
+ quantity=self._quantity,
+ reason=f"VWAP mean reversion BUY: deviation {deviation:.4f} within threshold {self._deviation_threshold}",
+ )
+
+ # Mean reversion from above: was above VWAP, now back near it
+ if self._was_above_vwap and abs(deviation) <= self._deviation_threshold:
+ self._was_above_vwap = False
+ return Signal(
+ strategy=self.name,
+ symbol=candle.symbol,
+ side=OrderSide.SELL,
+ price=candle.close,
+ quantity=self._quantity,
+ reason=f"VWAP mean reversion SELL: deviation {deviation:.4f} within threshold {self._deviation_threshold}",
+ )
+
+ return None
diff --git a/services/strategy-engine/tests/test_vwap_strategy.py b/services/strategy-engine/tests/test_vwap_strategy.py
new file mode 100644
index 0000000..37d35bc
--- /dev/null
+++ b/services/strategy-engine/tests/test_vwap_strategy.py
@@ -0,0 +1,101 @@
+"""Tests for the VWAP strategy."""
+from datetime import datetime, timezone
+from decimal import Decimal
+
+import pytest
+
+from shared.models import Candle, OrderSide
+from strategies.vwap_strategy import VwapStrategy
+
+
+def make_candle(
+ close: float,
+ high: float | None = None,
+ low: float | None = None,
+ volume: float = 1.0,
+) -> Candle:
+ if high is None:
+ high = close
+ if low is None:
+ low = close
+ return Candle(
+ symbol="BTC/USDT",
+ timeframe="1m",
+ open_time=datetime(2024, 1, 1, tzinfo=timezone.utc),
+ open=Decimal(str(close)),
+ high=Decimal(str(high)),
+ low=Decimal(str(low)),
+ close=Decimal(str(close)),
+ volume=Decimal(str(volume)),
+ )
+
+
+def _configured_strategy() -> VwapStrategy:
+ strategy = VwapStrategy()
+ strategy.configure({"deviation_threshold": 0.01, "quantity": "0.01"})
+ return strategy
+
+
+def test_vwap_warmup_period():
+ strategy = VwapStrategy()
+ assert strategy.warmup_period == 30
+
+
+def test_vwap_no_signal_insufficient_data():
+ strategy = _configured_strategy()
+ # Feed fewer candles than warmup_period
+ for _ in range(29):
+ signal = strategy.on_candle(make_candle(100.0))
+ assert signal is None
+
+
+def test_vwap_buy_signal_below_vwap_recovery():
+ strategy = _configured_strategy()
+
+ # Build VWAP around 100 with 30 candles (satisfy warmup)
+ for _ in range(30):
+ strategy.on_candle(make_candle(100.0, high=101.0, low=99.0))
+
+ # Drop price well below VWAP to trigger _was_below_vwap
+ for _ in range(3):
+ strategy.on_candle(make_candle(95.0, high=96.0, low=94.0))
+
+ # Recover back to VWAP (close ~100, deviation within threshold)
+ signal = strategy.on_candle(make_candle(100.0, high=101.0, low=99.0))
+ assert signal is not None
+ assert signal.side == OrderSide.BUY
+ assert "VWAP" in signal.reason
+
+
+def test_vwap_sell_signal_above_vwap_recovery():
+ strategy = _configured_strategy()
+
+ # Build VWAP around 100 with 30 candles (satisfy warmup)
+ for _ in range(30):
+ strategy.on_candle(make_candle(100.0, high=101.0, low=99.0))
+
+ # Rise price well above VWAP to trigger _was_above_vwap
+ for _ in range(3):
+ strategy.on_candle(make_candle(105.0, high=106.0, low=104.0))
+
+ # Recover back to VWAP (close ~100, deviation within threshold)
+ signal = strategy.on_candle(make_candle(100.0, high=101.0, low=99.0))
+ assert signal is not None
+ assert signal.side == OrderSide.SELL
+ assert "VWAP" in signal.reason
+
+
+def test_vwap_reset_clears_state():
+ strategy = _configured_strategy()
+
+ # Build some state
+ for _ in range(35):
+ strategy.on_candle(make_candle(100.0))
+
+ strategy.reset()
+
+ assert strategy._cumulative_tp_vol == 0.0
+ assert strategy._cumulative_vol == 0.0
+ assert strategy._candle_count == 0
+ assert strategy._was_below_vwap is False
+ assert strategy._was_above_vwap is False