summaryrefslogtreecommitdiff
path: root/services/order-executor/src/order_executor/main.py
diff options
context:
space:
mode:
Diffstat (limited to 'services/order-executor/src/order_executor/main.py')
-rw-r--r--services/order-executor/src/order_executor/main.py33
1 files changed, 25 insertions, 8 deletions
diff --git a/services/order-executor/src/order_executor/main.py b/services/order-executor/src/order_executor/main.py
index b57c513..7f0578d 100644
--- a/services/order-executor/src/order_executor/main.py
+++ b/services/order-executor/src/order_executor/main.py
@@ -1,6 +1,5 @@
"""Order Executor Service entry point."""
import asyncio
-import logging
from decimal import Decimal
import ccxt.async_support as ccxt
@@ -8,18 +7,21 @@ import ccxt.async_support as ccxt
from shared.broker import RedisBroker
from shared.db import Database
from shared.events import Event, EventType
+from shared.healthcheck import HealthCheckServer
+from shared.logging import setup_logging
+from shared.metrics import ServiceMetrics
+from shared.notifier import TelegramNotifier
from order_executor.config import ExecutorConfig
from order_executor.executor import OrderExecutor
from order_executor.risk_manager import RiskManager
-logging.basicConfig(level=logging.INFO)
-logger = logging.getLogger(__name__)
-
async def run() -> None:
config = ExecutorConfig()
- logging.getLogger().setLevel(config.log_level)
+ log = setup_logging("order-executor", config.log_level, config.log_format)
+ metrics = ServiceMetrics("order_executor")
+ notifier = TelegramNotifier(bot_token=config.telegram_bot_token, chat_id=config.telegram_chat_id)
db = Database(config.database_url)
await db.connect()
@@ -45,12 +47,19 @@ async def run() -> None:
risk_manager=risk_manager,
broker=broker,
db=db,
+ notifier=notifier,
dry_run=config.dry_run,
)
last_id = "$"
stream = "signals"
- logger.info("Order executor started, listening on stream=%s dry_run=%s", stream, config.dry_run)
+
+ health = HealthCheckServer("order-executor", port=config.health_port + 2)
+ health.register_check("redis", broker.ping)
+ await health.start()
+ metrics.service_up.labels(service="order-executor").set(1)
+
+ log.info("service_started", stream=stream, dry_run=config.dry_run)
try:
while True:
@@ -60,16 +69,24 @@ async def run() -> None:
event = Event.from_dict(msg)
if event.type == EventType.SIGNAL:
signal = event.data
- logger.info("Processing signal %s for %s", signal.id, signal.symbol)
+ log.info("processing_signal", signal_id=str(signal.id), symbol=signal.symbol)
await executor.execute(signal)
+ metrics.events_processed.labels(service="order-executor", event_type="signal").inc()
except Exception as exc:
- logger.error("Failed to process message: %s", exc)
+ log.error("message_processing_failed", error=str(exc))
+ metrics.errors_total.labels(service="order-executor", error_type="processing").inc()
if messages:
# Advance last_id to avoid re-reading — broker.read returns decoded dicts,
# so we track progress by re-reading with "0" for replaying or "$" for new only.
# Since we block on "$" we get only new messages each iteration.
pass
+ except Exception as exc:
+ log.error("fatal_error", error=str(exc))
+ await notifier.send_error(str(exc), "order-executor")
+ raise
finally:
+ metrics.service_up.labels(service="order-executor").set(0)
+ await notifier.close()
await broker.close()
await db.close()
await exchange.close()