summaryrefslogtreecommitdiff
path: root/services/portfolio-manager/src/portfolio_manager/main.py
diff options
context:
space:
mode:
authorTheSiahxyz <164138827+TheSiahxyz@users.noreply.github.com>2026-04-01 17:56:50 +0900
committerTheSiahxyz <164138827+TheSiahxyz@users.noreply.github.com>2026-04-01 17:56:50 +0900
commit13a9b2c80bb3eb1353cf2d49bdbf7d0dbd858ccc (patch)
tree4595b83f1ba4fe5d1bdf4694f53496120956085a /services/portfolio-manager/src/portfolio_manager/main.py
parentfa7e1dc44787592da647bdda0a63310be0cfcc8b (diff)
feat(broker): add Redis consumer groups for reliable message processing
Diffstat (limited to 'services/portfolio-manager/src/portfolio_manager/main.py')
-rw-r--r--services/portfolio-manager/src/portfolio_manager/main.py41
1 files changed, 35 insertions, 6 deletions
diff --git a/services/portfolio-manager/src/portfolio_manager/main.py b/services/portfolio-manager/src/portfolio_manager/main.py
index a7f1a14..87e4c64 100644
--- a/services/portfolio-manager/src/portfolio_manager/main.py
+++ b/services/portfolio-manager/src/portfolio_manager/main.py
@@ -84,13 +84,43 @@ async def run() -> None:
snapshot_loop(db, tracker, notifier, config.snapshot_interval_hours, log)
)
- last_id = "$"
+ GROUP = "portfolio-manager"
+ CONSUMER = "portfolio-1"
log.info("service_started", stream=ORDERS_STREAM)
+ await broker.ensure_group(ORDERS_STREAM, GROUP)
+
+ # Process pending messages first (from previous crash)
+ pending = await broker.read_pending(ORDERS_STREAM, GROUP, CONSUMER)
+ for msg_id, msg in pending:
+ try:
+ event = Event.from_dict(msg)
+ if isinstance(event, OrderEvent):
+ order = event.data
+ tracker.apply_order(order)
+ log.info(
+ "pending_order_applied",
+ symbol=order.symbol,
+ side=str(order.side),
+ quantity=str(order.quantity),
+ price=str(order.price),
+ )
+ metrics.events_processed.labels(
+ service="portfolio-manager", event_type="order"
+ ).inc()
+ await broker.ack(ORDERS_STREAM, GROUP, msg_id)
+ except Exception as exc:
+ log.error("pending_process_failed", error=str(exc), msg_id=msg_id)
+ metrics.errors_total.labels(
+ service="portfolio-manager", error_type="processing"
+ ).inc()
+
try:
while True:
- messages = await broker.read(ORDERS_STREAM, last_id=last_id, block=1000)
- for msg in messages:
+ messages = await broker.read_group(
+ ORDERS_STREAM, GROUP, CONSUMER, count=10, block=1000
+ )
+ for msg_id, msg in messages:
try:
event = Event.from_dict(msg)
if isinstance(event, OrderEvent):
@@ -108,13 +138,12 @@ async def run() -> None:
metrics.events_processed.labels(
service="portfolio-manager", event_type="order"
).inc()
+ await broker.ack(ORDERS_STREAM, GROUP, msg_id)
except Exception as exc:
- log.exception("message_processing_failed", error=str(exc))
+ log.exception("message_processing_failed", error=str(exc), msg_id=msg_id)
metrics.errors_total.labels(
service="portfolio-manager", error_type="processing"
).inc()
- # Update last_id to the latest processed message id if broker returns ids
- # Since broker.read returns parsed payloads (not ids), we use "$" to get new msgs
except Exception as exc:
log.error("fatal_error", error=str(exc))
await notifier.send_error(str(exc), "portfolio-manager")