summaryrefslogtreecommitdiff
path: root/services/strategy-engine
diff options
context:
space:
mode:
Diffstat (limited to 'services/strategy-engine')
-rw-r--r--services/strategy-engine/src/strategy_engine/main.py36
-rw-r--r--services/strategy-engine/strategies/asian_session_rsi.py21
-rw-r--r--services/strategy-engine/strategies/config/asian_session_rsi.yaml2
-rw-r--r--services/strategy-engine/tests/test_asian_session_rsi.py29
-rw-r--r--services/strategy-engine/tests/test_sentiment_wiring.py31
5 files changed, 117 insertions, 2 deletions
diff --git a/services/strategy-engine/src/strategy_engine/main.py b/services/strategy-engine/src/strategy_engine/main.py
index 4549f70..ac28d6b 100644
--- a/services/strategy-engine/src/strategy_engine/main.py
+++ b/services/strategy-engine/src/strategy_engine/main.py
@@ -8,6 +8,7 @@ from shared.healthcheck import HealthCheckServer
from shared.logging import setup_logging
from shared.metrics import ServiceMetrics
from shared.notifier import TelegramNotifier
+from shared.sentiment import SentimentProvider, SentimentData
from strategy_engine.config import StrategyConfig
from strategy_engine.engine import StrategyEngine
@@ -21,6 +22,28 @@ STRATEGIES_DIR = Path(__file__).parent.parent.parent.parent / "strategies"
# order-executor: +2 (8082), portfolio-manager: +3 (8083)
HEALTH_PORT_OFFSET = 1
+SENTIMENT_REFRESH_INTERVAL = 300 # 5 minutes
+
+
+async def sentiment_loop(provider: SentimentProvider, strategies: list, log) -> None:
+ """Periodically fetch sentiment and update strategies that support it."""
+ while True:
+ try:
+ sentiment = await provider.get_sentiment("SOL")
+ log.info(
+ "sentiment_updated",
+ fear_greed=sentiment.fear_greed_value,
+ news=sentiment.news_sentiment,
+ netflow=sentiment.exchange_netflow,
+ should_block=sentiment.should_block,
+ )
+ for strategy in strategies:
+ if hasattr(strategy, "update_sentiment"):
+ strategy.update_sentiment(sentiment)
+ except Exception as exc:
+ log.warning("sentiment_fetch_failed", error=str(exc))
+ await asyncio.sleep(SENTIMENT_REFRESH_INTERVAL)
+
async def process_symbol(engine: StrategyEngine, stream: str, log) -> None:
"""Process candles for a single symbol stream."""
@@ -51,6 +74,11 @@ async def run() -> None:
engine = StrategyEngine(broker=broker, strategies=strategies)
+ provider = SentimentProvider(
+ cryptopanic_api_key=config.cryptopanic_api_key,
+ cryptoquant_api_key=config.cryptoquant_api_key,
+ )
+
health = HealthCheckServer(
"strategy-engine",
port=config.health_port + HEALTH_PORT_OFFSET,
@@ -62,12 +90,17 @@ async def run() -> None:
tasks = []
try:
+ # Sentiment updater
+ tasks.append(asyncio.create_task(
+ sentiment_loop(provider, strategies, log)
+ ))
+ # Symbol processors
for symbol in config.symbols:
stream = f"candles.{symbol.replace('/', '_')}"
task = asyncio.create_task(process_symbol(engine, stream, log))
tasks.append(task)
- # Wait for all symbol processors (they run forever until cancelled)
+ # Wait for all tasks (they run forever until cancelled)
await asyncio.gather(*tasks)
except Exception as exc:
log.error("fatal_error", error=str(exc))
@@ -78,6 +111,7 @@ async def run() -> None:
task.cancel()
metrics.service_up.labels(service="strategy-engine").set(0)
await notifier.close()
+ await provider.close()
await broker.close()
diff --git a/services/strategy-engine/strategies/asian_session_rsi.py b/services/strategy-engine/strategies/asian_session_rsi.py
index 741cd63..1874591 100644
--- a/services/strategy-engine/strategies/asian_session_rsi.py
+++ b/services/strategy-engine/strategies/asian_session_rsi.py
@@ -36,6 +36,9 @@ class AsianSessionRsiStrategy(BaseStrategy):
self._max_trades_per_day: int = 3
self._max_consecutive_losses: int = 2
self._use_sentiment: bool = True
+ self._ema_period: int = 20
+ self._require_bullish_candle: bool = True
+ self._prev_candle_bullish: bool = False
# Sentiment (updated externally before each session)
self._sentiment: SentimentData | None = None
# State
@@ -63,6 +66,8 @@ class AsianSessionRsiStrategy(BaseStrategy):
self._max_trades_per_day = int(params.get("max_trades_per_day", 3))
self._max_consecutive_losses = int(params.get("max_consecutive_losses", 2))
self._use_sentiment = bool(params.get("use_sentiment", True))
+ self._ema_period = int(params.get("ema_period", 20))
+ self._require_bullish_candle = bool(params.get("require_bullish_candle", True))
if self._quantity <= 0:
raise ValueError(f"Quantity must be positive, got {self._quantity}")
@@ -89,6 +94,7 @@ class AsianSessionRsiStrategy(BaseStrategy):
self._in_position = False
self._entry_price = 0.0
self._sentiment = None
+ self._prev_candle_bullish = False
def update_sentiment(self, sentiment: SentimentData) -> None:
"""Update sentiment data. Call before each trading session."""
@@ -130,6 +136,14 @@ class AsianSessionRsiStrategy(BaseStrategy):
avg = sum(self._volumes) / len(self._volumes)
return self._volumes[-1] >= avg
+ def _price_above_ema(self) -> bool:
+ """Check if current price is above short-term EMA."""
+ if len(self._closes) < self._ema_period:
+ return True # Not enough data, allow by default
+ series = pd.Series(list(self._closes))
+ ema_val = series.ewm(span=self._ema_period, adjust=False).mean().iloc[-1]
+ return self._closes[-1] >= ema_val
+
def on_candle(self, candle: Candle) -> Signal | None:
self._update_filter_data(candle)
@@ -137,6 +151,9 @@ class AsianSessionRsiStrategy(BaseStrategy):
self._closes.append(close)
self._volumes.append(float(candle.volume))
+ # Track candle direction for bullish confirmation
+ is_bullish = float(candle.close) >= float(candle.open)
+
# Daily reset
day = candle.open_time.strftime("%Y-%m-%d")
if self._today != day:
@@ -218,7 +235,9 @@ class AsianSessionRsiStrategy(BaseStrategy):
if rsi is None:
return None
- if rsi < self._rsi_oversold and self._volume_above_average():
+ if rsi < self._rsi_oversold and self._volume_above_average() and self._price_above_ema():
+ if self._require_bullish_candle and not is_bullish:
+ return None # Wait for bullish candle confirmation
self._in_position = True
self._entry_price = close
self._trades_today += 1
diff --git a/services/strategy-engine/strategies/config/asian_session_rsi.yaml b/services/strategy-engine/strategies/config/asian_session_rsi.yaml
index 21d7715..bc7c5c9 100644
--- a/services/strategy-engine/strategies/config/asian_session_rsi.yaml
+++ b/services/strategy-engine/strategies/config/asian_session_rsi.yaml
@@ -10,3 +10,5 @@ session_start_utc: 0 # UTC 0시 = KST 9시
session_end_utc: 2 # UTC 2시 = KST 11시
max_trades_per_day: 3 # 하루 최대 3회
max_consecutive_losses: 2 # 2연패 시 중단
+ema_period: 20
+require_bullish_candle: true
diff --git a/services/strategy-engine/tests/test_asian_session_rsi.py b/services/strategy-engine/tests/test_asian_session_rsi.py
index b311220..db031f0 100644
--- a/services/strategy-engine/tests/test_asian_session_rsi.py
+++ b/services/strategy-engine/tests/test_asian_session_rsi.py
@@ -159,3 +159,32 @@ def test_reset_clears_all():
def test_warmup_period():
s = _make_strategy(rsi_period=14)
assert s.warmup_period == 15
+
+
+def test_ema_filter_blocks_below_ema():
+ """Entry blocked when price is below EMA."""
+ s = AsianSessionRsiStrategy()
+ s._rsi_period = 5
+ s._rsi_oversold = 40
+ s._quantity = Decimal("0.5")
+ s._take_profit_pct = 1.5
+ s._stop_loss_pct = 0.7
+ s._session_start_utc = 0
+ s._session_end_utc = 2
+ s._max_trades_per_day = 3
+ s._max_consecutive_losses = 10
+ s._ema_period = 5
+ s._require_bullish_candle = False # Test EMA only
+
+ # Feed rising prices to set EMA high, then sharp drop
+ for i in range(10):
+ s.on_candle(_candle(200 + i * 5, hour=0, minute=i * 5))
+ # Now feed low price -- below EMA, RSI should be low
+ signals = []
+ for i in range(5):
+ sig = s.on_candle(_candle(100 - i * 5, hour=0, minute=(15 + i) * 5 % 60))
+ if sig is not None:
+ signals.append(sig)
+ # Should have no BUY signals because price is way below EMA
+ buy_sigs = [s for s in signals if s.side == OrderSide.BUY]
+ assert len(buy_sigs) == 0
diff --git a/services/strategy-engine/tests/test_sentiment_wiring.py b/services/strategy-engine/tests/test_sentiment_wiring.py
new file mode 100644
index 0000000..f1a816f
--- /dev/null
+++ b/services/strategy-engine/tests/test_sentiment_wiring.py
@@ -0,0 +1,31 @@
+"""Test sentiment is wired into strategy engine."""
+import sys
+from pathlib import Path
+
+sys.path.insert(0, str(Path(__file__).resolve().parents[1] / "src"))
+sys.path.insert(0, str(Path(__file__).resolve().parents[1]))
+
+from shared.sentiment import SentimentData
+from strategies.asian_session_rsi import AsianSessionRsiStrategy
+
+
+def test_strategy_accepts_sentiment():
+ s = AsianSessionRsiStrategy()
+ data = SentimentData(fear_greed_value=20, fear_greed_label="Extreme Fear")
+ s.update_sentiment(data)
+ assert s._sentiment is not None
+ assert s._sentiment.fear_greed_value == 20
+
+
+def test_strategy_blocks_on_extreme_greed():
+ s = AsianSessionRsiStrategy()
+ data = SentimentData(fear_greed_value=85)
+ s.update_sentiment(data)
+ assert not s._check_sentiment()
+
+
+def test_strategy_allows_on_fear():
+ s = AsianSessionRsiStrategy()
+ data = SentimentData(fear_greed_value=20)
+ s.update_sentiment(data)
+ assert s._check_sentiment()