summaryrefslogtreecommitdiff
path: root/services/order-executor/src/order_executor/main.py
blob: 930517eec2b4989d668f8781dad87bda2bc81dea (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
"""Order Executor Service entry point."""

import asyncio
from decimal import Decimal

from shared.broker import RedisBroker
from shared.db import Database
from shared.events import Event, EventType
from shared.healthcheck import HealthCheckServer
from shared.exchange import create_exchange
from shared.logging import setup_logging
from shared.metrics import ServiceMetrics
from shared.notifier import TelegramNotifier

from order_executor.config import ExecutorConfig
from order_executor.executor import OrderExecutor
from order_executor.risk_manager import RiskManager

# Health check port: base (HEALTH_PORT, default 8080) + offset
# data-collector: +0 (8080), strategy-engine: +1 (8081)
# order-executor: +2 (8082), portfolio-manager: +3 (8083)
HEALTH_PORT_OFFSET = 2


async def run() -> None:
    config = ExecutorConfig()
    log = setup_logging("order-executor", config.log_level, config.log_format)
    metrics = ServiceMetrics("order_executor")
    notifier = TelegramNotifier(
        bot_token=config.telegram_bot_token, chat_id=config.telegram_chat_id
    )

    db = Database(config.database_url)
    await db.connect()
    await db.init_tables()

    broker = RedisBroker(config.redis_url)

    exchange = create_exchange(
        exchange_id=config.exchange_id,
        api_key=config.binance_api_key,
        api_secret=config.binance_api_secret,
        sandbox=config.exchange_sandbox,
    )

    risk_manager = RiskManager(
        max_position_size=Decimal(str(config.risk_max_position_size)),
        stop_loss_pct=Decimal(str(config.risk_stop_loss_pct)),
        daily_loss_limit_pct=Decimal(str(config.risk_daily_loss_limit_pct)),
        trailing_stop_pct=Decimal(str(config.risk_trailing_stop_pct)),
        max_open_positions=config.risk_max_open_positions,
        volatility_lookback=config.risk_volatility_lookback,
        volatility_scale=config.risk_volatility_scale,
    )

    executor = OrderExecutor(
        exchange=exchange,
        risk_manager=risk_manager,
        broker=broker,
        db=db,
        notifier=notifier,
        dry_run=config.dry_run,
    )

    GROUP = "order-executor"
    CONSUMER = "executor-1"
    stream = "signals"

    health = HealthCheckServer(
        "order-executor",
        port=config.health_port + HEALTH_PORT_OFFSET,
        auth_token=config.metrics_auth_token,
    )
    health.register_check("redis", broker.ping)
    await health.start()
    metrics.service_up.labels(service="order-executor").set(1)

    log.info("service_started", stream=stream, dry_run=config.dry_run)

    await broker.ensure_group(stream, GROUP)

    # Process pending messages first (from previous crash)
    pending = await broker.read_pending(stream, GROUP, CONSUMER)
    for msg_id, msg in pending:
        try:
            event = Event.from_dict(msg)
            if event.type == EventType.SIGNAL:
                signal = event.data
                log.info(
                    "processing_pending_signal", signal_id=str(signal.id), symbol=signal.symbol
                )
                await executor.execute(signal)
                metrics.events_processed.labels(
                    service="order-executor", event_type="signal"
                ).inc()
            await broker.ack(stream, GROUP, msg_id)
        except Exception as exc:
            log.error("pending_process_failed", error=str(exc), msg_id=msg_id)
            metrics.errors_total.labels(
                service="order-executor", error_type="processing"
            ).inc()

    try:
        while True:
            messages = await broker.read_group(
                stream, GROUP, CONSUMER, count=10, block=5000
            )
            for msg_id, msg in messages:
                try:
                    event = Event.from_dict(msg)
                    if event.type == EventType.SIGNAL:
                        signal = event.data
                        log.info(
                            "processing_signal", signal_id=str(signal.id), symbol=signal.symbol
                        )
                        await executor.execute(signal)
                        metrics.events_processed.labels(
                            service="order-executor", event_type="signal"
                        ).inc()
                    await broker.ack(stream, GROUP, msg_id)
                except Exception as exc:
                    log.error("message_processing_failed", error=str(exc), msg_id=msg_id)
                    metrics.errors_total.labels(
                        service="order-executor", error_type="processing"
                    ).inc()
    except Exception as exc:
        log.error("fatal_error", error=str(exc))
        await notifier.send_error(str(exc), "order-executor")
        raise
    finally:
        metrics.service_up.labels(service="order-executor").set(0)
        await notifier.close()
        await broker.close()
        await db.close()
        await exchange.close()


def main() -> None:
    asyncio.run(run())


if __name__ == "__main__":
    main()