diff options
Diffstat (limited to 'services/strategy-engine')
5 files changed, 117 insertions, 2 deletions
diff --git a/services/strategy-engine/src/strategy_engine/main.py b/services/strategy-engine/src/strategy_engine/main.py index 4549f70..ac28d6b 100644 --- a/services/strategy-engine/src/strategy_engine/main.py +++ b/services/strategy-engine/src/strategy_engine/main.py @@ -8,6 +8,7 @@ from shared.healthcheck import HealthCheckServer from shared.logging import setup_logging from shared.metrics import ServiceMetrics from shared.notifier import TelegramNotifier +from shared.sentiment import SentimentProvider, SentimentData from strategy_engine.config import StrategyConfig from strategy_engine.engine import StrategyEngine @@ -21,6 +22,28 @@ STRATEGIES_DIR = Path(__file__).parent.parent.parent.parent / "strategies" # order-executor: +2 (8082), portfolio-manager: +3 (8083) HEALTH_PORT_OFFSET = 1 +SENTIMENT_REFRESH_INTERVAL = 300 # 5 minutes + + +async def sentiment_loop(provider: SentimentProvider, strategies: list, log) -> None: + """Periodically fetch sentiment and update strategies that support it.""" + while True: + try: + sentiment = await provider.get_sentiment("SOL") + log.info( + "sentiment_updated", + fear_greed=sentiment.fear_greed_value, + news=sentiment.news_sentiment, + netflow=sentiment.exchange_netflow, + should_block=sentiment.should_block, + ) + for strategy in strategies: + if hasattr(strategy, "update_sentiment"): + strategy.update_sentiment(sentiment) + except Exception as exc: + log.warning("sentiment_fetch_failed", error=str(exc)) + await asyncio.sleep(SENTIMENT_REFRESH_INTERVAL) + async def process_symbol(engine: StrategyEngine, stream: str, log) -> None: """Process candles for a single symbol stream.""" @@ -51,6 +74,11 @@ async def run() -> None: engine = StrategyEngine(broker=broker, strategies=strategies) + provider = SentimentProvider( + cryptopanic_api_key=config.cryptopanic_api_key, + cryptoquant_api_key=config.cryptoquant_api_key, + ) + health = HealthCheckServer( "strategy-engine", port=config.health_port + HEALTH_PORT_OFFSET, @@ -62,12 +90,17 @@ async def run() -> None: tasks = [] try: + # Sentiment updater + tasks.append(asyncio.create_task( + sentiment_loop(provider, strategies, log) + )) + # Symbol processors for symbol in config.symbols: stream = f"candles.{symbol.replace('/', '_')}" task = asyncio.create_task(process_symbol(engine, stream, log)) tasks.append(task) - # Wait for all symbol processors (they run forever until cancelled) + # Wait for all tasks (they run forever until cancelled) await asyncio.gather(*tasks) except Exception as exc: log.error("fatal_error", error=str(exc)) @@ -78,6 +111,7 @@ async def run() -> None: task.cancel() metrics.service_up.labels(service="strategy-engine").set(0) await notifier.close() + await provider.close() await broker.close() diff --git a/services/strategy-engine/strategies/asian_session_rsi.py b/services/strategy-engine/strategies/asian_session_rsi.py index 741cd63..1874591 100644 --- a/services/strategy-engine/strategies/asian_session_rsi.py +++ b/services/strategy-engine/strategies/asian_session_rsi.py @@ -36,6 +36,9 @@ class AsianSessionRsiStrategy(BaseStrategy): self._max_trades_per_day: int = 3 self._max_consecutive_losses: int = 2 self._use_sentiment: bool = True + self._ema_period: int = 20 + self._require_bullish_candle: bool = True + self._prev_candle_bullish: bool = False # Sentiment (updated externally before each session) self._sentiment: SentimentData | None = None # State @@ -63,6 +66,8 @@ class AsianSessionRsiStrategy(BaseStrategy): self._max_trades_per_day = int(params.get("max_trades_per_day", 3)) self._max_consecutive_losses = int(params.get("max_consecutive_losses", 2)) self._use_sentiment = bool(params.get("use_sentiment", True)) + self._ema_period = int(params.get("ema_period", 20)) + self._require_bullish_candle = bool(params.get("require_bullish_candle", True)) if self._quantity <= 0: raise ValueError(f"Quantity must be positive, got {self._quantity}") @@ -89,6 +94,7 @@ class AsianSessionRsiStrategy(BaseStrategy): self._in_position = False self._entry_price = 0.0 self._sentiment = None + self._prev_candle_bullish = False def update_sentiment(self, sentiment: SentimentData) -> None: """Update sentiment data. Call before each trading session.""" @@ -130,6 +136,14 @@ class AsianSessionRsiStrategy(BaseStrategy): avg = sum(self._volumes) / len(self._volumes) return self._volumes[-1] >= avg + def _price_above_ema(self) -> bool: + """Check if current price is above short-term EMA.""" + if len(self._closes) < self._ema_period: + return True # Not enough data, allow by default + series = pd.Series(list(self._closes)) + ema_val = series.ewm(span=self._ema_period, adjust=False).mean().iloc[-1] + return self._closes[-1] >= ema_val + def on_candle(self, candle: Candle) -> Signal | None: self._update_filter_data(candle) @@ -137,6 +151,9 @@ class AsianSessionRsiStrategy(BaseStrategy): self._closes.append(close) self._volumes.append(float(candle.volume)) + # Track candle direction for bullish confirmation + is_bullish = float(candle.close) >= float(candle.open) + # Daily reset day = candle.open_time.strftime("%Y-%m-%d") if self._today != day: @@ -218,7 +235,9 @@ class AsianSessionRsiStrategy(BaseStrategy): if rsi is None: return None - if rsi < self._rsi_oversold and self._volume_above_average(): + if rsi < self._rsi_oversold and self._volume_above_average() and self._price_above_ema(): + if self._require_bullish_candle and not is_bullish: + return None # Wait for bullish candle confirmation self._in_position = True self._entry_price = close self._trades_today += 1 diff --git a/services/strategy-engine/strategies/config/asian_session_rsi.yaml b/services/strategy-engine/strategies/config/asian_session_rsi.yaml index 21d7715..bc7c5c9 100644 --- a/services/strategy-engine/strategies/config/asian_session_rsi.yaml +++ b/services/strategy-engine/strategies/config/asian_session_rsi.yaml @@ -10,3 +10,5 @@ session_start_utc: 0 # UTC 0시 = KST 9시 session_end_utc: 2 # UTC 2시 = KST 11시 max_trades_per_day: 3 # 하루 최대 3회 max_consecutive_losses: 2 # 2연패 시 중단 +ema_period: 20 +require_bullish_candle: true diff --git a/services/strategy-engine/tests/test_asian_session_rsi.py b/services/strategy-engine/tests/test_asian_session_rsi.py index b311220..db031f0 100644 --- a/services/strategy-engine/tests/test_asian_session_rsi.py +++ b/services/strategy-engine/tests/test_asian_session_rsi.py @@ -159,3 +159,32 @@ def test_reset_clears_all(): def test_warmup_period(): s = _make_strategy(rsi_period=14) assert s.warmup_period == 15 + + +def test_ema_filter_blocks_below_ema(): + """Entry blocked when price is below EMA.""" + s = AsianSessionRsiStrategy() + s._rsi_period = 5 + s._rsi_oversold = 40 + s._quantity = Decimal("0.5") + s._take_profit_pct = 1.5 + s._stop_loss_pct = 0.7 + s._session_start_utc = 0 + s._session_end_utc = 2 + s._max_trades_per_day = 3 + s._max_consecutive_losses = 10 + s._ema_period = 5 + s._require_bullish_candle = False # Test EMA only + + # Feed rising prices to set EMA high, then sharp drop + for i in range(10): + s.on_candle(_candle(200 + i * 5, hour=0, minute=i * 5)) + # Now feed low price -- below EMA, RSI should be low + signals = [] + for i in range(5): + sig = s.on_candle(_candle(100 - i * 5, hour=0, minute=(15 + i) * 5 % 60)) + if sig is not None: + signals.append(sig) + # Should have no BUY signals because price is way below EMA + buy_sigs = [s for s in signals if s.side == OrderSide.BUY] + assert len(buy_sigs) == 0 diff --git a/services/strategy-engine/tests/test_sentiment_wiring.py b/services/strategy-engine/tests/test_sentiment_wiring.py new file mode 100644 index 0000000..f1a816f --- /dev/null +++ b/services/strategy-engine/tests/test_sentiment_wiring.py @@ -0,0 +1,31 @@ +"""Test sentiment is wired into strategy engine.""" +import sys +from pathlib import Path + +sys.path.insert(0, str(Path(__file__).resolve().parents[1] / "src")) +sys.path.insert(0, str(Path(__file__).resolve().parents[1])) + +from shared.sentiment import SentimentData +from strategies.asian_session_rsi import AsianSessionRsiStrategy + + +def test_strategy_accepts_sentiment(): + s = AsianSessionRsiStrategy() + data = SentimentData(fear_greed_value=20, fear_greed_label="Extreme Fear") + s.update_sentiment(data) + assert s._sentiment is not None + assert s._sentiment.fear_greed_value == 20 + + +def test_strategy_blocks_on_extreme_greed(): + s = AsianSessionRsiStrategy() + data = SentimentData(fear_greed_value=85) + s.update_sentiment(data) + assert not s._check_sentiment() + + +def test_strategy_allows_on_fear(): + s = AsianSessionRsiStrategy() + data = SentimentData(fear_greed_value=20) + s.update_sentiment(data) + assert s._check_sentiment() |
