"""Integration test: candle -> strategy engine -> signal.""" import sys from pathlib import Path sys.path.insert( 0, str(Path(__file__).resolve().parents[2] / "services" / "strategy-engine" / "src") ) sys.path.insert(0, str(Path(__file__).resolve().parents[2] / "services" / "strategy-engine")) import pytest from decimal import Decimal from datetime import datetime, timezone from unittest.mock import AsyncMock from shared.models import Candle from shared.events import CandleEvent from strategy_engine.engine import StrategyEngine @pytest.fixture def candles(): """Generate a series of declining candles that should trigger RSI oversold.""" base = [] for i in range(20): price = Decimal(str(100 - i * 2)) # 100, 98, 96... base.append( Candle( symbol="BTCUSDT", timeframe="1m", open_time=datetime(2025, 1, 1, i, 0, tzinfo=timezone.utc), open=price, high=price + 1, low=price - 1, close=price, volume=Decimal("10"), ) ) return base @pytest.mark.asyncio async def test_strategy_engine_produces_signals_from_candles(candles): """Feed candles into strategy engine and verify signals are published.""" from strategies.rsi_strategy import RsiStrategy broker = AsyncMock() # Mock broker.read to return candle events one at a time, then empty events = [CandleEvent(data=c).to_dict() for c in candles] broker.read = AsyncMock(side_effect=[events, []]) strategy = RsiStrategy() strategy.configure({"period": 14, "oversold": 30, "overbought": 70, "quantity": "0.01"}) engine = StrategyEngine(broker=broker, strategies=[strategy]) await engine.process_once("candles.BTCUSDT", "$") # With 20 declining candles (100->62), RSI should be very low # Check if broker.publish was called with a signal if broker.publish.called: call_args = broker.publish.call_args_list for call in call_args: assert call[0][0] == "signals" # published to signals stream