diff options
Diffstat (limited to 'services')
3 files changed, 392 insertions, 0 deletions
diff --git a/services/strategy-engine/strategies/asian_session_rsi.py b/services/strategy-engine/strategies/asian_session_rsi.py new file mode 100644 index 0000000..f22c3eb --- /dev/null +++ b/services/strategy-engine/strategies/asian_session_rsi.py @@ -0,0 +1,219 @@ +"""Asian Session RSI Strategy — 한국시간 9:00~11:00 단타. + +규칙: +- SOL/USDT 5분봉 +- 매수: RSI(14) < 25 + 볼륨 > 평균 +- 익절: +1.5%, 손절: -0.7%, 시간청산: 11:00 KST (02:00 UTC) +- 하루 최대 3회, 2연패 시 중단 +""" + +from collections import deque +from decimal import Decimal +from datetime import datetime + +import pandas as pd + +from shared.models import Candle, Signal, OrderSide +from strategies.base import BaseStrategy + + +class AsianSessionRsiStrategy(BaseStrategy): + name: str = "asian_session_rsi" + + def __init__(self) -> None: + super().__init__() + self._rsi_period: int = 14 + self._rsi_oversold: float = 25.0 + self._rsi_overbought: float = 75.0 + self._quantity: Decimal = Decimal("0.1") + self._take_profit_pct: float = 1.5 + self._stop_loss_pct: float = 0.7 + # Session: 00:00~02:00 UTC = 09:00~11:00 KST + self._session_start_utc: int = 0 + self._session_end_utc: int = 2 + self._max_trades_per_day: int = 3 + self._max_consecutive_losses: int = 2 + # State + self._closes: deque[float] = deque(maxlen=200) + self._volumes: deque[float] = deque(maxlen=50) + self._today: str | None = None + self._trades_today: int = 0 + self._consecutive_losses: int = 0 + self._in_position: bool = False + self._entry_price: float = 0.0 + + @property + def warmup_period(self) -> int: + return self._rsi_period + 1 + + def configure(self, params: dict) -> None: + self._rsi_period = int(params.get("rsi_period", 14)) + self._rsi_oversold = float(params.get("rsi_oversold", 25.0)) + self._rsi_overbought = float(params.get("rsi_overbought", 75.0)) + self._quantity = Decimal(str(params.get("quantity", "0.1"))) + self._take_profit_pct = float(params.get("take_profit_pct", 1.5)) + self._stop_loss_pct = float(params.get("stop_loss_pct", 0.7)) + self._session_start_utc = int(params.get("session_start_utc", 0)) + self._session_end_utc = int(params.get("session_end_utc", 2)) + self._max_trades_per_day = int(params.get("max_trades_per_day", 3)) + self._max_consecutive_losses = int(params.get("max_consecutive_losses", 2)) + + if self._quantity <= 0: + raise ValueError(f"Quantity must be positive, got {self._quantity}") + if self._stop_loss_pct <= 0: + raise ValueError(f"Stop loss must be positive, got {self._stop_loss_pct}") + if self._take_profit_pct <= 0: + raise ValueError(f"Take profit must be positive, got {self._take_profit_pct}") + + self._init_filters( + require_trend=False, + adx_threshold=25.0, + min_volume_ratio=0.5, + atr_stop_multiplier=1.5, + atr_tp_multiplier=2.0, + ) + + def reset(self) -> None: + super().reset() + self._closes.clear() + self._volumes.clear() + self._today = None + self._trades_today = 0 + self._consecutive_losses = 0 + self._in_position = False + self._entry_price = 0.0 + + def _is_session_active(self, dt: datetime) -> bool: + """Check if current time is within trading session.""" + hour = dt.hour + if self._session_start_utc <= self._session_end_utc: + return self._session_start_utc <= hour < self._session_end_utc + # Wrap around midnight + return hour >= self._session_start_utc or hour < self._session_end_utc + + def _compute_rsi(self) -> float | None: + if len(self._closes) < self._rsi_period + 1: + return None + series = pd.Series(list(self._closes)) + delta = series.diff() + gain = delta.clip(lower=0) + loss = -delta.clip(upper=0) + avg_gain = gain.ewm(com=self._rsi_period - 1, min_periods=self._rsi_period).mean() + avg_loss = loss.ewm(com=self._rsi_period - 1, min_periods=self._rsi_period).mean() + rs = avg_gain / avg_loss.replace(0, float("nan")) + rsi = 100 - (100 / (1 + rs)) + val = rsi.iloc[-1] + if pd.isna(val): + return None + return float(val) + + def _volume_above_average(self) -> bool: + if len(self._volumes) < 20: + return True # Not enough data, allow + avg = sum(self._volumes) / len(self._volumes) + return self._volumes[-1] >= avg + + def on_candle(self, candle: Candle) -> Signal | None: + self._update_filter_data(candle) + + close = float(candle.close) + self._closes.append(close) + self._volumes.append(float(candle.volume)) + + # Daily reset + day = candle.open_time.strftime("%Y-%m-%d") + if self._today != day: + self._today = day + self._trades_today = 0 + # Don't reset consecutive_losses — carries across days + + # Check exit conditions first (if in position) + if self._in_position: + pnl_pct = (close - self._entry_price) / self._entry_price * 100 + + # Take profit + if pnl_pct >= self._take_profit_pct: + self._in_position = False + self._consecutive_losses = 0 + return self._apply_filters(Signal( + strategy=self.name, + symbol=candle.symbol, + side=OrderSide.SELL, + price=candle.close, + quantity=self._quantity, + conviction=0.9, + reason=f"Take profit {pnl_pct:.2f}% >= {self._take_profit_pct}%", + )) + + # Stop loss + if pnl_pct <= -self._stop_loss_pct: + self._in_position = False + self._consecutive_losses += 1 + return self._apply_filters(Signal( + strategy=self.name, + symbol=candle.symbol, + side=OrderSide.SELL, + price=candle.close, + quantity=self._quantity, + conviction=1.0, + reason=f"Stop loss {pnl_pct:.2f}% <= -{self._stop_loss_pct}%", + )) + + # Time exit: session ended while in position + if not self._is_session_active(candle.open_time): + self._in_position = False + if pnl_pct < 0: + self._consecutive_losses += 1 + else: + self._consecutive_losses = 0 + return self._apply_filters(Signal( + strategy=self.name, + symbol=candle.symbol, + side=OrderSide.SELL, + price=candle.close, + quantity=self._quantity, + conviction=0.5, + reason=f"Time exit (session ended), PnL {pnl_pct:.2f}%", + )) + + return None # Still in position, no action + + # Entry conditions + if not self._is_session_active(candle.open_time): + return None # Outside trading hours + + if self._trades_today >= self._max_trades_per_day: + return None # Daily limit reached + + if self._consecutive_losses >= self._max_consecutive_losses: + return None # Consecutive loss limit + + rsi = self._compute_rsi() + if rsi is None: + return None + + if rsi < self._rsi_oversold and self._volume_above_average(): + self._in_position = True + self._entry_price = close + self._trades_today += 1 + + # Conviction: lower RSI = stronger signal + conv = min((self._rsi_oversold - rsi) / self._rsi_oversold, 1.0) + conv = max(conv, 0.3) + + sl = candle.close * (1 - Decimal(str(self._stop_loss_pct / 100))) + tp = candle.close * (1 + Decimal(str(self._take_profit_pct / 100))) + + return self._apply_filters(Signal( + strategy=self.name, + symbol=candle.symbol, + side=OrderSide.BUY, + price=candle.close, + quantity=self._quantity, + conviction=conv, + stop_loss=sl, + take_profit=tp, + reason=f"RSI {rsi:.1f} < {self._rsi_oversold} (session active, vol OK)", + )) + + return None diff --git a/services/strategy-engine/strategies/config/asian_session_rsi.yaml b/services/strategy-engine/strategies/config/asian_session_rsi.yaml new file mode 100644 index 0000000..21d7715 --- /dev/null +++ b/services/strategy-engine/strategies/config/asian_session_rsi.yaml @@ -0,0 +1,12 @@ +# Asian Session RSI — SOL/USDT 5분봉 단타 +# 한국시간 9:00~11:00 (UTC 0:00~2:00) +rsi_period: 14 +rsi_oversold: 25 +rsi_overbought: 75 +quantity: "0.5" # SOL 0.5개 (~$75, 100만원의 10%) +take_profit_pct: 1.5 # 익절 1.5% +stop_loss_pct: 0.7 # 손절 0.7% +session_start_utc: 0 # UTC 0시 = KST 9시 +session_end_utc: 2 # UTC 2시 = KST 11시 +max_trades_per_day: 3 # 하루 최대 3회 +max_consecutive_losses: 2 # 2연패 시 중단 diff --git a/services/strategy-engine/tests/test_asian_session_rsi.py b/services/strategy-engine/tests/test_asian_session_rsi.py new file mode 100644 index 0000000..b311220 --- /dev/null +++ b/services/strategy-engine/tests/test_asian_session_rsi.py @@ -0,0 +1,161 @@ +"""Tests for Asian Session RSI strategy.""" + +import sys +from pathlib import Path + +sys.path.insert(0, str(Path(__file__).resolve().parents[1])) + +from datetime import datetime, timezone +from decimal import Decimal + +from shared.models import Candle, OrderSide +from strategies.asian_session_rsi import AsianSessionRsiStrategy + + +def _candle(price, hour=0, minute=30, volume=100.0, day=1): + return Candle( + symbol="SOLUSDT", + timeframe="5m", + open_time=datetime(2025, 1, day, hour, minute, tzinfo=timezone.utc), + open=Decimal(str(price)), + high=Decimal(str(price + 1)), + low=Decimal(str(price - 1)), + close=Decimal(str(price)), + volume=Decimal(str(volume)), + ) + + +def _make_strategy(**overrides): + s = AsianSessionRsiStrategy() + params = { + "rsi_period": 5, + "rsi_oversold": 30, + "rsi_overbought": 70, + "quantity": "0.5", + "take_profit_pct": 1.5, + "stop_loss_pct": 0.7, + "session_start_utc": 0, + "session_end_utc": 2, + "max_trades_per_day": 3, + "max_consecutive_losses": 2, + } + params.update(overrides) + s.configure(params) + return s + + +def test_no_signal_outside_session(): + s = _make_strategy() + # Hour 5 UTC = outside session (0-2 UTC) + for i in range(10): + sig = s.on_candle(_candle(100 - i * 3, hour=5)) + assert sig is None + + +def test_buy_signal_during_session_on_oversold(): + s = AsianSessionRsiStrategy() + s._rsi_period = 5 + s._rsi_oversold = 30 + s._quantity = Decimal("0.5") + s._take_profit_pct = 1.5 + s._stop_loss_pct = 0.7 + s._session_start_utc = 0 + s._session_end_utc = 2 + s._max_trades_per_day = 3 + s._max_consecutive_losses = 10 # High limit so test isn't blocked + + # Feed declining prices — collect all signals + signals = [] + for i in range(10): + sig = s.on_candle(_candle(100 - i * 3, hour=0, minute=i * 5)) + if sig is not None: + signals.append(sig) + + # Should have generated at least one BUY signal + buy_signals = [s for s in signals if s.side == OrderSide.BUY] + assert len(buy_signals) > 0 + assert buy_signals[0].strategy == "asian_session_rsi" + + +def test_take_profit_exit(): + s = _make_strategy(rsi_period=5, rsi_oversold=40) + # Force entry + for i in range(8): + s.on_candle(_candle(100 - i * 2, hour=0, minute=i * 5)) + + # Should be in position now — push price up for TP + sig = s.on_candle(_candle(100, hour=0, minute=50)) # entry ~around 84-86 + if s._in_position: + tp_price = s._entry_price * (1 + s._take_profit_pct / 100) + sig = s.on_candle(_candle(tp_price + 1, hour=1, minute=0)) + if sig is not None: + assert sig.side == OrderSide.SELL + assert "Take profit" in sig.reason + + +def test_stop_loss_exit(): + s = _make_strategy(rsi_period=5, rsi_oversold=40) + for i in range(8): + s.on_candle(_candle(100 - i * 2, hour=0, minute=i * 5)) + + if s._in_position: + sl_price = s._entry_price * (1 - s._stop_loss_pct / 100) + sig = s.on_candle(_candle(sl_price - 1, hour=1, minute=0)) + if sig is not None: + assert sig.side == OrderSide.SELL + assert "Stop loss" in sig.reason + + +def test_time_exit_when_session_ends(): + s = _make_strategy(rsi_period=5, rsi_oversold=40) + for i in range(8): + s.on_candle(_candle(100 - i * 2, hour=0, minute=i * 5)) + + if s._in_position: + # Session ends at hour 2 + sig = s.on_candle(_candle(s._entry_price, hour=3, minute=0)) + if sig is not None: + assert sig.side == OrderSide.SELL + assert "Time exit" in sig.reason + + +def test_max_trades_per_day(): + s = _make_strategy(rsi_period=3, rsi_oversold=40, max_trades_per_day=1) + # Force one trade + for i in range(6): + s.on_candle(_candle(100 - i * 5, hour=0, minute=i * 5)) + # Exit + if s._in_position: + s.on_candle(_candle(200, hour=0, minute=35)) # TP exit + # Try to enter again — should be blocked + for i in range(6): + s.on_candle(_candle(100 - i * 5, hour=1, minute=i * 5)) + # After 1 trade, no more allowed + assert not s._in_position or s._trades_today >= 1 + + +def test_consecutive_losses_stop(): + s = _make_strategy(rsi_period=3, rsi_oversold=40, max_consecutive_losses=2) + # Simulate 2 losses + s._consecutive_losses = 2 + # Even with valid conditions, should not enter + for i in range(6): + sig = s.on_candle(_candle(100 - i * 5, hour=0, minute=i * 5)) + assert sig is None + + +def test_reset_clears_all(): + s = _make_strategy() + s.on_candle(_candle(100, hour=0)) + s._in_position = True + s._trades_today = 2 + s._consecutive_losses = 1 + s.reset() + assert not s._in_position + assert s._trades_today == 0 + assert len(s._closes) == 0 + + +def test_warmup_period(): + s = _make_strategy(rsi_period=14) + assert s.warmup_period == 15 |
