summaryrefslogtreecommitdiff
path: root/shared/tests/test_db.py
blob: c31e487f158fa9ac3bac326530ca89e1b9f2c049 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
"""Tests for the database layer."""
import pytest
from decimal import Decimal
from datetime import datetime, timezone
from unittest.mock import AsyncMock, MagicMock, patch, call


def make_candle():
    """Return a fixed sample ``Candle`` (BTCUSDT, 1m) shared by the tests."""
    # Function-scope import: ``Candle`` is only resolved when the helper runs.
    from shared.models import Candle

    # Field values are listed in the same order the constructor receives them.
    fields = {
        "symbol": "BTCUSDT",
        "timeframe": "1m",
        "open_time": datetime(2024, 1, 1, tzinfo=timezone.utc),
        "open": Decimal("50000"),
        "high": Decimal("51000"),
        "low": Decimal("49500"),
        "close": Decimal("50500"),
        "volume": Decimal("100"),
    }
    return Candle(**fields)


@pytest.mark.asyncio
async def test_db_init_sql_creates_tables():
    """Check that the SQL issued by init_tables mentions every required table."""
    required_tables = (
        "candles",
        "signals",
        "orders",
        "trades",
        "positions",
        "portfolio_snapshots",
    )
    statements = []

    async def record_sql(sql, *args, **kwargs):
        # Only the statement text matters here; bind parameters are ignored.
        statements.append(sql)

    with patch("asyncpg.create_pool", new_callable=AsyncMock) as create_pool:
        fake_pool = create_pool.return_value

        # Mock connection whose ``execute`` records each statement it is given.
        connection = AsyncMock()
        connection.execute = record_sql

        # Entering the pool mock as an async context manager yields that
        # connection; ``__aexit__`` returns False so exceptions propagate.
        fake_pool.__aenter__ = AsyncMock(return_value=connection)
        fake_pool.__aexit__ = AsyncMock(return_value=False)

        from shared.db import Database

        database = Database("postgresql://trading:trading@localhost:5432/trading")
        # Hand the mocked pool straight to the instance before running it.
        database._pool = fake_pool
        await database.init_tables()

        all_sql = " ".join(statements)
        for name in required_tables:
            assert name in all_sql, f"Table '{name}' not found in SQL"


@pytest.mark.asyncio
async def test_db_insert_candle():
    """Check that insert_candle runs an ``INSERT INTO candles`` statement."""
    recorded_calls = []

    async def record_call(sql, *args, **kwargs):
        # Keep the statement together with its positional bind arguments.
        recorded_calls.append((sql, args))

    with patch("asyncpg.create_pool", new_callable=AsyncMock) as create_pool:
        fake_pool = create_pool.return_value

        # Mock connection whose ``execute`` records every call it receives.
        connection = AsyncMock()
        connection.execute = record_call

        # Entering the pool mock as an async context manager yields that
        # connection; ``__aexit__`` returns False so exceptions propagate.
        fake_pool.__aenter__ = AsyncMock(return_value=connection)
        fake_pool.__aexit__ = AsyncMock(return_value=False)

        from shared.db import Database

        database = Database("postgresql://trading:trading@localhost:5432/trading")
        # Hand the mocked pool straight to the instance before running it.
        database._pool = fake_pool
        sample = make_candle()
        await database.insert_candle(sample)

        insert_seen = any(
            "INSERT INTO candles" in statement
            for statement, _params in recorded_calls
        )
        assert insert_seen, "Expected INSERT INTO candles"