summaryrefslogtreecommitdiff
path: root/tests/integration/test_strategy_signal_flow.py
blob: 448329f8a5b1b140c840420759441edaabac1a0f (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
"""Integration test: candle -> strategy engine -> signal."""

import sys
from pathlib import Path

# Put the strategy-engine service directories on the import path so the test
# can import the service code directly from the source tree.
# parents[2] is two levels above this file's directory; with this file located
# at tests/integration/ that is the repository root.
# NOTE(review): presumably "src" holds the `strategy_engine` package and the
# service root holds `strategies` — confirm against the service layout.
sys.path.insert(
    0, str(Path(__file__).resolve().parents[2] / "services" / "strategy-engine" / "src")
)
# Inserted at index 0 after the "src" entry, so the service root ends up being
# searched before "src".
sys.path.insert(0, str(Path(__file__).resolve().parents[2] / "services" / "strategy-engine"))

import pytest
from decimal import Decimal
from datetime import datetime, timezone
from unittest.mock import AsyncMock

from shared.models import Candle
from shared.events import CandleEvent
from strategy_engine.engine import StrategyEngine


@pytest.fixture
def candles():
    """Build 20 BTCUSDT candles whose price falls by 2 per candle from 100.

    The uninterrupted decline is intended to push RSI into oversold territory.
    """

    def _declining_candle(step):
        # Flat candle body (open == close) with a +/-1 wick around the level.
        level = Decimal(str(100 - step * 2))  # 100, 98, 96, ..., 62
        return Candle(
            symbol="BTCUSDT",
            timeframe="1m",
            # NOTE(review): open times advance one hour per candle although the
            # timeframe is "1m" — confirm the spacing is irrelevant to the test.
            open_time=datetime(2025, 1, 1, step, 0, tzinfo=timezone.utc),
            open=level,
            high=level + 1,
            low=level - 1,
            close=level,
            volume=Decimal("10"),
        )

    return [_declining_candle(step) for step in range(20)]


@pytest.mark.asyncio
async def test_strategy_engine_produces_signals_from_candles(candles):
    """Run one engine pass over the candle batch and check the publish target."""
    from strategies.rsi_strategy import RsiStrategy

    # The mocked read returns the whole batch of serialized candle events on
    # its first call and an empty list on the second.
    candle_events = [CandleEvent(data=candle).to_dict() for candle in candles]
    broker = AsyncMock()
    broker.read = AsyncMock(side_effect=[candle_events, []])

    rsi = RsiStrategy()
    rsi.configure({"period": 14, "oversold": 30, "overbought": 70, "quantity": "0.01"})

    engine = StrategyEngine(broker=broker, strategies=[rsi])
    await engine.process_once("candles.BTCUSDT", "$")

    # 20 strictly declining closes (100 -> 62) should leave RSI very low.
    # NOTE(review): nothing here requires a publish to have happened, so the
    # test passes vacuously when no signal is emitted — consider asserting
    # that broker.publish was called once the engine's behaviour is confirmed.
    for publish_call in broker.publish.call_args_list:
        # First positional argument is the stream the signal was published to.
        assert publish_call.args[0] == "signals"