1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
|
"""Order Executor Service entry point."""
import asyncio
from decimal import Decimal
from shared.broker import RedisBroker
from shared.db import Database
from shared.events import Event, EventType
from shared.healthcheck import HealthCheckServer
from shared.exchange import create_exchange
from shared.logging import setup_logging
from shared.metrics import ServiceMetrics
from shared.notifier import TelegramNotifier
from order_executor.config import ExecutorConfig
from order_executor.executor import OrderExecutor
from order_executor.risk_manager import RiskManager
# Health check port: base (HEALTH_PORT, default 8080) + offset
# data-collector: +0 (8080), strategy-engine: +1 (8081)
# order-executor: +2 (8082), portfolio-manager: +3 (8083)
HEALTH_PORT_OFFSET = 2
async def run() -> None:
config = ExecutorConfig()
log = setup_logging("order-executor", config.log_level, config.log_format)
metrics = ServiceMetrics("order_executor")
notifier = TelegramNotifier(
bot_token=config.telegram_bot_token, chat_id=config.telegram_chat_id
)
db = Database(config.database_url)
await db.connect()
await db.init_tables()
broker = RedisBroker(config.redis_url)
exchange = create_exchange(
exchange_id=config.exchange_id,
api_key=config.binance_api_key,
api_secret=config.binance_api_secret,
sandbox=config.exchange_sandbox,
)
risk_manager = RiskManager(
max_position_size=Decimal(str(config.risk_max_position_size)),
stop_loss_pct=Decimal(str(config.risk_stop_loss_pct)),
daily_loss_limit_pct=Decimal(str(config.risk_daily_loss_limit_pct)),
trailing_stop_pct=Decimal(str(config.risk_trailing_stop_pct)),
max_open_positions=config.risk_max_open_positions,
volatility_lookback=config.risk_volatility_lookback,
volatility_scale=config.risk_volatility_scale,
)
executor = OrderExecutor(
exchange=exchange,
risk_manager=risk_manager,
broker=broker,
db=db,
notifier=notifier,
dry_run=config.dry_run,
)
last_id = "$"
stream = "signals"
health = HealthCheckServer(
"order-executor", port=config.health_port + HEALTH_PORT_OFFSET, auth_token=config.metrics_auth_token
)
health.register_check("redis", broker.ping)
await health.start()
metrics.service_up.labels(service="order-executor").set(1)
log.info("service_started", stream=stream, dry_run=config.dry_run)
try:
while True:
messages = await broker.read(stream, last_id=last_id, count=10, block=5000)
for msg in messages:
try:
event = Event.from_dict(msg)
if event.type == EventType.SIGNAL:
signal = event.data
log.info(
"processing_signal", signal_id=str(signal.id), symbol=signal.symbol
)
await executor.execute(signal)
metrics.events_processed.labels(
service="order-executor", event_type="signal"
).inc()
except Exception as exc:
log.error("message_processing_failed", error=str(exc))
metrics.errors_total.labels(
service="order-executor", error_type="processing"
).inc()
if messages:
# Advance last_id to avoid re-reading — broker.read returns decoded dicts,
# so we track progress by re-reading with "0" for replaying or "$" for new only.
# Since we block on "$" we get only new messages each iteration.
pass
except Exception as exc:
log.error("fatal_error", error=str(exc))
await notifier.send_error(str(exc), "order-executor")
raise
finally:
metrics.service_up.labels(service="order-executor").set(0)
await notifier.close()
await broker.close()
await db.close()
await exchange.close()
def main() -> None:
asyncio.run(run())
if __name__ == "__main__":
main()
|