summaryrefslogtreecommitdiff
path: root/services/portfolio-manager
diff options
context:
space:
mode:
Diffstat (limited to 'services/portfolio-manager')
-rw-r--r--services/portfolio-manager/Dockerfile9
-rw-r--r--services/portfolio-manager/src/portfolio_manager/main.py43
-rw-r--r--services/portfolio-manager/tests/test_portfolio.py3
-rw-r--r--services/portfolio-manager/tests/test_snapshot.py3
4 files changed, 44 insertions, 14 deletions
diff --git a/services/portfolio-manager/Dockerfile b/services/portfolio-manager/Dockerfile
index b1a7681..0fa3f35 100644
--- a/services/portfolio-manager/Dockerfile
+++ b/services/portfolio-manager/Dockerfile
@@ -1,8 +1,15 @@
-FROM python:3.12-slim
+FROM python:3.12-slim AS builder
WORKDIR /app
COPY shared/ shared/
RUN pip install --no-cache-dir ./shared
COPY services/portfolio-manager/ services/portfolio-manager/
RUN pip install --no-cache-dir ./services/portfolio-manager
+
+FROM python:3.12-slim
+RUN useradd -r -s /bin/false appuser
+WORKDIR /app
+COPY --from=builder /usr/local/lib/python3.12/site-packages /usr/local/lib/python3.12/site-packages
+COPY --from=builder /usr/local/bin /usr/local/bin
ENV PYTHONPATH=/app
+USER appuser
CMD ["python", "-m", "portfolio_manager.main"]
diff --git a/services/portfolio-manager/src/portfolio_manager/main.py b/services/portfolio-manager/src/portfolio_manager/main.py
index a6823ae..f885aa8 100644
--- a/services/portfolio-manager/src/portfolio_manager/main.py
+++ b/services/portfolio-manager/src/portfolio_manager/main.py
@@ -2,6 +2,10 @@
import asyncio
+import sqlalchemy.exc
+
+from portfolio_manager.config import PortfolioConfig
+from portfolio_manager.portfolio import PortfolioTracker
from shared.broker import RedisBroker
from shared.db import Database
from shared.events import Event, OrderEvent
@@ -9,9 +13,7 @@ from shared.healthcheck import HealthCheckServer
from shared.logging import setup_logging
from shared.metrics import ServiceMetrics
from shared.notifier import TelegramNotifier
-
-from portfolio_manager.config import PortfolioConfig
-from portfolio_manager.portfolio import PortfolioTracker
+from shared.shutdown import GracefulShutdown
ORDERS_STREAM = "orders"
@@ -51,8 +53,12 @@ async def snapshot_loop(
while True:
try:
await save_snapshot(db, tracker, notifier, log)
+ except (sqlalchemy.exc.OperationalError, ConnectionError, TimeoutError) as exc:
+ log.warning("snapshot_db_error", error=str(exc))
+ except (ValueError, KeyError, TypeError) as exc:
+ log.warning("snapshot_data_error", error=str(exc))
except Exception as exc:
- log.error("snapshot_failed", error=str(exc))
+ log.error("snapshot_failed", error=str(exc), exc_info=True)
await asyncio.sleep(interval_hours * 3600)
@@ -61,10 +67,10 @@ async def run() -> None:
log = setup_logging("portfolio-manager", config.log_level, config.log_format)
metrics = ServiceMetrics("portfolio_manager")
notifier = TelegramNotifier(
- bot_token=config.telegram_bot_token, chat_id=config.telegram_chat_id
+ bot_token=config.telegram_bot_token.get_secret_value(), chat_id=config.telegram_chat_id
)
- broker = RedisBroker(config.redis_url)
+ broker = RedisBroker(config.redis_url.get_secret_value())
tracker = PortfolioTracker()
health = HealthCheckServer(
@@ -76,13 +82,16 @@ async def run() -> None:
await health.start()
metrics.service_up.labels(service="portfolio-manager").set(1)
- db = Database(config.database_url)
+ db = Database(config.database_url.get_secret_value())
await db.connect()
snapshot_task = asyncio.create_task(
snapshot_loop(db, tracker, notifier, config.snapshot_interval_hours, log)
)
+ shutdown = GracefulShutdown()
+ shutdown.install_handlers()
+
GROUP = "portfolio-manager"
CONSUMER = "portfolio-1"
log.info("service_started", stream=ORDERS_STREAM)
@@ -108,12 +117,16 @@ async def run() -> None:
service="portfolio-manager", event_type="order"
).inc()
await broker.ack(ORDERS_STREAM, GROUP, msg_id)
+ except (ValueError, KeyError, TypeError) as exc:
+ log.warning("pending_parse_error", error=str(exc), msg_id=msg_id)
+ await broker.ack(ORDERS_STREAM, GROUP, msg_id)
+ metrics.errors_total.labels(service="portfolio-manager", error_type="validation").inc()
except Exception as exc:
- log.error("pending_process_failed", error=str(exc), msg_id=msg_id)
+ log.error("pending_process_failed", error=str(exc), msg_id=msg_id, exc_info=True)
metrics.errors_total.labels(service="portfolio-manager", error_type="processing").inc()
try:
- while True:
+ while not shutdown.is_shutting_down:
messages = await broker.read_group(ORDERS_STREAM, GROUP, CONSUMER, count=10, block=1000)
for msg_id, msg in messages:
try:
@@ -134,13 +147,21 @@ async def run() -> None:
service="portfolio-manager", event_type="order"
).inc()
await broker.ack(ORDERS_STREAM, GROUP, msg_id)
+ except (ValueError, KeyError, TypeError) as exc:
+ log.warning("message_parse_error", error=str(exc), msg_id=msg_id)
+ await broker.ack(ORDERS_STREAM, GROUP, msg_id)
+ metrics.errors_total.labels(
+ service="portfolio-manager", error_type="validation"
+ ).inc()
except Exception as exc:
- log.exception("message_processing_failed", error=str(exc), msg_id=msg_id)
+ log.error(
+ "message_processing_failed", error=str(exc), msg_id=msg_id, exc_info=True
+ )
metrics.errors_total.labels(
service="portfolio-manager", error_type="processing"
).inc()
except Exception as exc:
- log.error("fatal_error", error=str(exc))
+ log.error("fatal_error", error=str(exc), exc_info=True)
await notifier.send_error(str(exc), "portfolio-manager")
raise
finally:
diff --git a/services/portfolio-manager/tests/test_portfolio.py b/services/portfolio-manager/tests/test_portfolio.py
index 365dc1a..c8a6894 100644
--- a/services/portfolio-manager/tests/test_portfolio.py
+++ b/services/portfolio-manager/tests/test_portfolio.py
@@ -2,9 +2,10 @@
from decimal import Decimal
-from shared.models import Order, OrderSide, OrderStatus, OrderType
from portfolio_manager.portfolio import PortfolioTracker
+from shared.models import Order, OrderSide, OrderStatus, OrderType
+
def make_order(side: OrderSide, price: str, quantity: str) -> Order:
"""Helper to create a filled Order."""
diff --git a/services/portfolio-manager/tests/test_snapshot.py b/services/portfolio-manager/tests/test_snapshot.py
index ec5e92d..f2026e2 100644
--- a/services/portfolio-manager/tests/test_snapshot.py
+++ b/services/portfolio-manager/tests/test_snapshot.py
@@ -1,9 +1,10 @@
"""Tests for save_snapshot in portfolio-manager."""
-import pytest
from decimal import Decimal
from unittest.mock import AsyncMock, MagicMock
+import pytest
+
from shared.models import Position