summaryrefslogtreecommitdiff
path: root/services/order-executor
diff options
context:
space:
mode:
Diffstat (limited to 'services/order-executor')
-rw-r--r--services/order-executor/src/order_executor/main.py40
1 files changed, 31 insertions, 9 deletions
diff --git a/services/order-executor/src/order_executor/main.py b/services/order-executor/src/order_executor/main.py
index 4a51d5d..930517e 100644
--- a/services/order-executor/src/order_executor/main.py
+++ b/services/order-executor/src/order_executor/main.py
@@ -62,7 +62,8 @@ async def run() -> None:
dry_run=config.dry_run,
)
- last_id = "$"
+ GROUP = "order-executor"
+ CONSUMER = "executor-1"
stream = "signals"
health = HealthCheckServer(
@@ -76,10 +77,35 @@ async def run() -> None:
log.info("service_started", stream=stream, dry_run=config.dry_run)
+ await broker.ensure_group(stream, GROUP)
+
+ # Process pending messages first (from previous crash)
+ pending = await broker.read_pending(stream, GROUP, CONSUMER)
+ for msg_id, msg in pending:
+ try:
+ event = Event.from_dict(msg)
+ if event.type == EventType.SIGNAL:
+ signal = event.data
+ log.info(
+ "processing_pending_signal", signal_id=str(signal.id), symbol=signal.symbol
+ )
+ await executor.execute(signal)
+ metrics.events_processed.labels(
+ service="order-executor", event_type="signal"
+ ).inc()
+ await broker.ack(stream, GROUP, msg_id)
+ except Exception as exc:
+ log.error("pending_process_failed", error=str(exc), msg_id=msg_id)
+ metrics.errors_total.labels(
+ service="order-executor", error_type="processing"
+ ).inc()
+
try:
while True:
- messages = await broker.read(stream, last_id=last_id, count=10, block=5000)
- for msg in messages:
+ messages = await broker.read_group(
+ stream, GROUP, CONSUMER, count=10, block=5000
+ )
+ for msg_id, msg in messages:
try:
event = Event.from_dict(msg)
if event.type == EventType.SIGNAL:
@@ -91,16 +117,12 @@ async def run() -> None:
metrics.events_processed.labels(
service="order-executor", event_type="signal"
).inc()
+ await broker.ack(stream, GROUP, msg_id)
except Exception as exc:
- log.error("message_processing_failed", error=str(exc))
+ log.error("message_processing_failed", error=str(exc), msg_id=msg_id)
metrics.errors_total.labels(
service="order-executor", error_type="processing"
).inc()
- if messages:
- # Advance last_id to avoid re-reading — broker.read returns decoded dicts,
- # so we track progress by re-reading with "0" for replaying or "$" for new only.
- # Since we block on "$" we get only new messages each iteration.
- pass
except Exception as exc:
log.error("fatal_error", error=str(exc))
await notifier.send_error(str(exc), "order-executor")