1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
|
"""Integration test: candle -> strategy engine -> signal."""
import sys
from pathlib import Path
sys.path.insert(
0, str(Path(__file__).resolve().parents[2] / "services" / "strategy-engine" / "src")
)
sys.path.insert(0, str(Path(__file__).resolve().parents[2] / "services" / "strategy-engine"))
from datetime import UTC, datetime
from decimal import Decimal
from unittest.mock import AsyncMock
import pytest
from strategy_engine.engine import StrategyEngine
from shared.events import CandleEvent
from shared.models import Candle
@pytest.fixture
def candles():
"""Generate a series of declining candles that should trigger RSI oversold."""
base = []
for i in range(20):
price = Decimal(str(100 - i * 2)) # 100, 98, 96...
base.append(
Candle(
symbol="AAPL",
timeframe="1m",
open_time=datetime(2025, 1, 1, i, 0, tzinfo=UTC),
open=price,
high=price + 1,
low=price - 1,
close=price,
volume=Decimal("10"),
)
)
return base
@pytest.mark.asyncio
async def test_strategy_engine_produces_signals_from_candles(candles):
"""Feed candles into strategy engine and verify signals are published."""
from strategies.rsi_strategy import RsiStrategy
broker = AsyncMock()
# Mock broker.read to return candle events one at a time, then empty
events = [CandleEvent(data=c).to_dict() for c in candles]
broker.read = AsyncMock(side_effect=[events, []])
strategy = RsiStrategy()
strategy.configure({"period": 14, "oversold": 30, "overbought": 70, "quantity": "0.01"})
engine = StrategyEngine(broker=broker, strategies=[strategy])
await engine.process_once("candles.AAPL", "$")
# With 20 declining candles (100->62), RSI should be very low
# Check if broker.publish was called with a signal
if broker.publish.called:
call_args = broker.publish.call_args_list
for call in call_args:
assert call[0][0] == "signals" # published to signals stream
|