summaryrefslogtreecommitdiff
path: root/services/strategy-engine/tests/test_engine.py
blob: ac9a5969dbb2baf7651a37771aa9f588ddd7c65b (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
"""Tests for the StrategyEngine."""

from datetime import datetime, timezone
from decimal import Decimal
from unittest.mock import AsyncMock, MagicMock

import pytest

from shared.models import Candle, Signal, OrderSide
from shared.events import CandleEvent
from strategy_engine.engine import StrategyEngine


def make_candle_event() -> dict:
    candle = Candle(
        symbol="BTC/USDT",
        timeframe="1m",
        open_time=datetime(2024, 1, 1, tzinfo=timezone.utc),
        open=Decimal("50000"),
        high=Decimal("50100"),
        low=Decimal("49900"),
        close=Decimal("50050"),
        volume=Decimal("10.0"),
    )
    return CandleEvent(data=candle).to_dict()


def make_signal() -> Signal:
    return Signal(
        strategy="test",
        symbol="BTC/USDT",
        side=OrderSide.BUY,
        price=Decimal("50050"),
        quantity=Decimal("0.01"),
        reason="test signal",
    )


@pytest.mark.asyncio
async def test_engine_dispatches_candle_to_strategies():
    broker = MagicMock()
    broker.read = AsyncMock(return_value=[make_candle_event()])
    broker.publish = AsyncMock()

    strategy = MagicMock()
    strategy.on_candle = MagicMock(return_value=None)

    engine = StrategyEngine(broker=broker, strategies=[strategy])
    await engine.process_once("candles.BTC_USDT", "0")

    strategy.on_candle.assert_called_once()
    candle_arg = strategy.on_candle.call_args[0][0]
    assert isinstance(candle_arg, Candle)
    assert candle_arg.symbol == "BTC/USDT"


@pytest.mark.asyncio
async def test_engine_publishes_signal_when_strategy_returns_one():
    broker = MagicMock()
    broker.read = AsyncMock(return_value=[make_candle_event()])
    broker.publish = AsyncMock()

    strategy = MagicMock()
    strategy.on_candle = MagicMock(return_value=make_signal())

    engine = StrategyEngine(broker=broker, strategies=[strategy])
    await engine.process_once("candles.BTC_USDT", "0")

    broker.publish.assert_called_once()
    call_args = broker.publish.call_args
    assert call_args[0][0] == "signals"
    published_data = call_args[0][1]
    assert published_data["type"] == "SIGNAL"