summaryrefslogtreecommitdiff
path: root/services/order-executor/src/order_executor/main.py
blob: 63d93bca54b529ec7ccc7ff9b4b89b93b7e0c1dd (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
"""Order Executor Service entry point."""

import asyncio
from decimal import Decimal

from shared.alpaca import AlpacaClient
from shared.broker import RedisBroker
from shared.db import Database
from shared.events import Event, EventType
from shared.healthcheck import HealthCheckServer
from shared.logging import setup_logging
from shared.metrics import ServiceMetrics
from shared.notifier import TelegramNotifier
from shared.shutdown import GracefulShutdown

from order_executor.config import ExecutorConfig
from order_executor.executor import OrderExecutor
from order_executor.risk_manager import RiskManager

# Health check port: base + 2
HEALTH_PORT_OFFSET = 2


async def run() -> None:
    config = ExecutorConfig()
    log = setup_logging("order-executor", config.log_level, config.log_format)
    metrics = ServiceMetrics("order_executor")

    notifier = TelegramNotifier(
        bot_token=config.telegram_bot_token.get_secret_value(),
        chat_id=config.telegram_chat_id,
    )

    db = Database(config.database_url.get_secret_value())
    await db.connect()

    broker = RedisBroker(config.redis_url.get_secret_value())

    alpaca = AlpacaClient(
        api_key=config.alpaca_api_key.get_secret_value(),
        api_secret=config.alpaca_api_secret.get_secret_value(),
        paper=config.alpaca_paper,
    )

    risk_manager = RiskManager(
        max_position_size=Decimal(str(config.risk_max_position_size)),
        stop_loss_pct=Decimal(str(config.risk_stop_loss_pct)),
        daily_loss_limit_pct=Decimal(str(config.risk_daily_loss_limit_pct)),
        trailing_stop_pct=Decimal(str(config.risk_trailing_stop_pct)),
        max_open_positions=config.risk_max_open_positions,
        volatility_lookback=config.risk_volatility_lookback,
        volatility_scale=config.risk_volatility_scale,
        max_portfolio_exposure=config.risk_max_portfolio_exposure,
        max_correlated_exposure=config.risk_max_correlated_exposure,
        correlation_threshold=config.risk_correlation_threshold,
        var_confidence=config.risk_var_confidence,
        var_limit_pct=config.risk_var_limit_pct,
        drawdown_reduction_threshold=config.risk_drawdown_reduction_threshold,
        drawdown_halt_threshold=config.risk_drawdown_halt_threshold,
        max_consecutive_losses=config.risk_max_consecutive_losses,
        loss_pause_minutes=config.risk_loss_pause_minutes,
    )

    executor = OrderExecutor(
        exchange=alpaca,
        risk_manager=risk_manager,
        broker=broker,
        db=db,
        notifier=notifier,
        dry_run=config.dry_run,
    )

    health = HealthCheckServer(
        "order-executor",
        port=config.health_port + HEALTH_PORT_OFFSET,
        auth_token=config.metrics_auth_token,
    )
    await health.start()
    metrics.service_up.labels(service="order-executor").set(1)

    GROUP = "order-executor"
    CONSUMER = "executor-1"
    stream = "signals"

    await broker.ensure_group(stream, GROUP)

    shutdown = GracefulShutdown()
    shutdown.install_handlers()

    log.info("started", stream=stream, dry_run=config.dry_run)

    try:
        # Process pending messages first
        pending = await broker.read_pending(stream, GROUP, CONSUMER)
        for msg_id, msg in pending:
            try:
                event = Event.from_dict(msg)
                if event.type == EventType.SIGNAL:
                    await executor.execute(event.data)
                await broker.ack(stream, GROUP, msg_id)
            except Exception as exc:
                log.error("pending_failed", error=str(exc), msg_id=msg_id)

        while not shutdown.is_shutting_down:
            messages = await broker.read_group(stream, GROUP, CONSUMER, count=10, block=5000)
            for msg_id, msg in messages:
                try:
                    event = Event.from_dict(msg)
                    if event.type == EventType.SIGNAL:
                        signal = event.data
                        log.info("processing_signal", signal_id=signal.id, symbol=signal.symbol)
                        await executor.execute(signal)
                        metrics.events_processed.labels(
                            service="order-executor", event_type="signal"
                        ).inc()
                    await broker.ack(stream, GROUP, msg_id)
                except Exception as exc:
                    log.error("process_failed", error=str(exc))
                    metrics.errors_total.labels(
                        service="order-executor", error_type="processing"
                    ).inc()
    finally:
        metrics.service_up.labels(service="order-executor").set(0)
        await notifier.close()
        await broker.close()
        await db.close()
        await alpaca.close()


def main() -> None:
    asyncio.run(run())


if __name__ == "__main__":
    main()