From 49e5baaebf2f9ca1ba7b85a80c3451c5789edde4 Mon Sep 17 00:00:00 2001 From: TheSiahxyz <164138827+TheSiahxyz@users.noreply.github.com> Date: Wed, 1 Apr 2026 16:16:31 +0900 Subject: feat(strategy): add VWAP strategy --- .../strategies/config/vwap_strategy.yaml | 2 + .../strategy-engine/strategies/vwap_strategy.py | 83 +++++++++++++++++ .../strategy-engine/tests/test_vwap_strategy.py | 101 +++++++++++++++++++++ 3 files changed, 186 insertions(+) create mode 100644 services/strategy-engine/strategies/config/vwap_strategy.yaml create mode 100644 services/strategy-engine/strategies/vwap_strategy.py create mode 100644 services/strategy-engine/tests/test_vwap_strategy.py diff --git a/services/strategy-engine/strategies/config/vwap_strategy.yaml b/services/strategy-engine/strategies/config/vwap_strategy.yaml new file mode 100644 index 0000000..e12d5e7 --- /dev/null +++ b/services/strategy-engine/strategies/config/vwap_strategy.yaml @@ -0,0 +1,2 @@ +deviation_threshold: 0.002 +quantity: "0.01" diff --git a/services/strategy-engine/strategies/vwap_strategy.py b/services/strategy-engine/strategies/vwap_strategy.py new file mode 100644 index 0000000..d1b86b5 --- /dev/null +++ b/services/strategy-engine/strategies/vwap_strategy.py @@ -0,0 +1,83 @@ +from decimal import Decimal + +from shared.models import Candle, Signal, OrderSide +from strategies.base import BaseStrategy + + +class VwapStrategy(BaseStrategy): + name: str = "vwap" + + def __init__(self) -> None: + self._deviation_threshold: float = 0.002 + self._quantity: Decimal = Decimal("0.01") + self._cumulative_tp_vol: float = 0.0 + self._cumulative_vol: float = 0.0 + self._candle_count: int = 0 + self._was_below_vwap: bool = False + self._was_above_vwap: bool = False + + @property + def warmup_period(self) -> int: + return 30 + + def configure(self, params: dict) -> None: + self._deviation_threshold = float(params.get("deviation_threshold", 0.002)) + self._quantity = Decimal(str(params.get("quantity", "0.01"))) + + def reset(self) -> None: + self._cumulative_tp_vol = 0.0 + self._cumulative_vol = 0.0 + self._candle_count = 0 + self._was_below_vwap = False + self._was_above_vwap = False + + def on_candle(self, candle: Candle) -> Signal | None: + high = float(candle.high) + low = float(candle.low) + close = float(candle.close) + volume = float(candle.volume) + + typical_price = (high + low + close) / 3.0 + self._cumulative_tp_vol += typical_price * volume + self._cumulative_vol += volume + self._candle_count += 1 + + if self._candle_count < self.warmup_period: + return None + + if self._cumulative_vol == 0.0: + return None + + vwap = self._cumulative_tp_vol / self._cumulative_vol + deviation = (close - vwap) / vwap + + if deviation < -self._deviation_threshold: + self._was_below_vwap = True + if deviation > self._deviation_threshold: + self._was_above_vwap = True + + # Mean reversion from below: was below VWAP, now back near it + if self._was_below_vwap and abs(deviation) <= self._deviation_threshold: + self._was_below_vwap = False + return Signal( + strategy=self.name, + symbol=candle.symbol, + side=OrderSide.BUY, + price=candle.close, + quantity=self._quantity, + reason=f"VWAP mean reversion BUY: deviation {deviation:.4f} within threshold {self._deviation_threshold}", + ) + + # Mean reversion from above: was above VWAP, now back near it + if self._was_above_vwap and abs(deviation) <= self._deviation_threshold: + self._was_above_vwap = False + return Signal( + strategy=self.name, + symbol=candle.symbol, + side=OrderSide.SELL, + price=candle.close, + quantity=self._quantity, + reason=f"VWAP mean reversion SELL: deviation {deviation:.4f} within threshold {self._deviation_threshold}", + ) + + return None diff --git a/services/strategy-engine/tests/test_vwap_strategy.py b/services/strategy-engine/tests/test_vwap_strategy.py new file mode 100644 index 0000000..37d35bc --- /dev/null +++ b/services/strategy-engine/tests/test_vwap_strategy.py @@ -0,0 +1,101 @@ +"""Tests for the VWAP strategy.""" +from datetime import datetime, timezone +from decimal import Decimal + +import pytest + +from shared.models import Candle, OrderSide +from strategies.vwap_strategy import VwapStrategy + + +def make_candle( + close: float, + high: float | None = None, + low: float | None = None, + volume: float = 1.0, +) -> Candle: + if high is None: + high = close + if low is None: + low = close + return Candle( + symbol="BTC/USDT", + timeframe="1m", + open_time=datetime(2024, 1, 1, tzinfo=timezone.utc), + open=Decimal(str(close)), + high=Decimal(str(high)), + low=Decimal(str(low)), + close=Decimal(str(close)), + volume=Decimal(str(volume)), + ) + + +def _configured_strategy() -> VwapStrategy: + strategy = VwapStrategy() + strategy.configure({"deviation_threshold": 0.01, "quantity": "0.01"}) + return strategy + + +def test_vwap_warmup_period(): + strategy = VwapStrategy() + assert strategy.warmup_period == 30 + + +def test_vwap_no_signal_insufficient_data(): + strategy = _configured_strategy() + # Feed fewer candles than warmup_period + for _ in range(29): + signal = strategy.on_candle(make_candle(100.0)) + assert signal is None + + +def test_vwap_buy_signal_below_vwap_recovery(): + strategy = _configured_strategy() + + # Build VWAP around 100 with 30 candles (satisfy warmup) + for _ in range(30): + strategy.on_candle(make_candle(100.0, high=101.0, low=99.0)) + + # Drop price well below VWAP to trigger _was_below_vwap + for _ in range(3): + strategy.on_candle(make_candle(95.0, high=96.0, low=94.0)) + + # Recover back to VWAP (close ~100, deviation within threshold) + signal = strategy.on_candle(make_candle(100.0, high=101.0, low=99.0)) + assert signal is not None + assert signal.side == OrderSide.BUY + assert "VWAP" in signal.reason + + +def test_vwap_sell_signal_above_vwap_recovery(): + strategy = _configured_strategy() + + # Build VWAP around 100 with 30 candles (satisfy warmup) + for _ in range(30): + strategy.on_candle(make_candle(100.0, high=101.0, low=99.0)) + + # Rise price well above VWAP to trigger _was_above_vwap + for _ in range(3): + strategy.on_candle(make_candle(105.0, high=106.0, low=104.0)) + + # Recover back to VWAP (close ~100, deviation within threshold) + signal = strategy.on_candle(make_candle(100.0, high=101.0, low=99.0)) + assert signal is not None + assert signal.side == OrderSide.SELL + assert "VWAP" in signal.reason + + +def test_vwap_reset_clears_state(): + strategy = _configured_strategy() + + # Build some state + for _ in range(35): + strategy.on_candle(make_candle(100.0)) + + strategy.reset() + + assert strategy._cumulative_tp_vol == 0.0 + assert strategy._cumulative_vol == 0.0 + assert strategy._candle_count == 0 + assert strategy._was_below_vwap is False + assert strategy._was_above_vwap is False -- cgit v1.2.3