summaryrefslogtreecommitdiff
path: root/services/portfolio-manager
diff options
context:
space:
mode:
Diffstat (limited to 'services/portfolio-manager')
-rw-r--r--services/portfolio-manager/Dockerfile9
-rw-r--r--services/portfolio-manager/src/portfolio_manager/main.py43
-rw-r--r--services/portfolio-manager/tests/test_portfolio.py27
-rw-r--r--services/portfolio-manager/tests/test_snapshot.py5
4 files changed, 57 insertions, 27 deletions
diff --git a/services/portfolio-manager/Dockerfile b/services/portfolio-manager/Dockerfile
index b1a7681..0fa3f35 100644
--- a/services/portfolio-manager/Dockerfile
+++ b/services/portfolio-manager/Dockerfile
@@ -1,8 +1,15 @@
-FROM python:3.12-slim
+FROM python:3.12-slim AS builder
WORKDIR /app
COPY shared/ shared/
RUN pip install --no-cache-dir ./shared
COPY services/portfolio-manager/ services/portfolio-manager/
RUN pip install --no-cache-dir ./services/portfolio-manager
+
+FROM python:3.12-slim
+RUN useradd -r -s /bin/false appuser
+WORKDIR /app
+COPY --from=builder /usr/local/lib/python3.12/site-packages /usr/local/lib/python3.12/site-packages
+COPY --from=builder /usr/local/bin /usr/local/bin
ENV PYTHONPATH=/app
+USER appuser
CMD ["python", "-m", "portfolio_manager.main"]
diff --git a/services/portfolio-manager/src/portfolio_manager/main.py b/services/portfolio-manager/src/portfolio_manager/main.py
index a6823ae..f885aa8 100644
--- a/services/portfolio-manager/src/portfolio_manager/main.py
+++ b/services/portfolio-manager/src/portfolio_manager/main.py
@@ -2,6 +2,10 @@
import asyncio
+import sqlalchemy.exc
+
+from portfolio_manager.config import PortfolioConfig
+from portfolio_manager.portfolio import PortfolioTracker
from shared.broker import RedisBroker
from shared.db import Database
from shared.events import Event, OrderEvent
@@ -9,9 +13,7 @@ from shared.healthcheck import HealthCheckServer
from shared.logging import setup_logging
from shared.metrics import ServiceMetrics
from shared.notifier import TelegramNotifier
-
-from portfolio_manager.config import PortfolioConfig
-from portfolio_manager.portfolio import PortfolioTracker
+from shared.shutdown import GracefulShutdown
ORDERS_STREAM = "orders"
@@ -51,8 +53,12 @@ async def snapshot_loop(
while True:
try:
await save_snapshot(db, tracker, notifier, log)
+ except (sqlalchemy.exc.OperationalError, ConnectionError, TimeoutError) as exc:
+ log.warning("snapshot_db_error", error=str(exc))
+ except (ValueError, KeyError, TypeError) as exc:
+ log.warning("snapshot_data_error", error=str(exc))
except Exception as exc:
- log.error("snapshot_failed", error=str(exc))
+ log.error("snapshot_failed", error=str(exc), exc_info=True)
await asyncio.sleep(interval_hours * 3600)
@@ -61,10 +67,10 @@ async def run() -> None:
log = setup_logging("portfolio-manager", config.log_level, config.log_format)
metrics = ServiceMetrics("portfolio_manager")
notifier = TelegramNotifier(
- bot_token=config.telegram_bot_token, chat_id=config.telegram_chat_id
+ bot_token=config.telegram_bot_token.get_secret_value(), chat_id=config.telegram_chat_id
)
- broker = RedisBroker(config.redis_url)
+ broker = RedisBroker(config.redis_url.get_secret_value())
tracker = PortfolioTracker()
health = HealthCheckServer(
@@ -76,13 +82,16 @@ async def run() -> None:
await health.start()
metrics.service_up.labels(service="portfolio-manager").set(1)
- db = Database(config.database_url)
+ db = Database(config.database_url.get_secret_value())
await db.connect()
snapshot_task = asyncio.create_task(
snapshot_loop(db, tracker, notifier, config.snapshot_interval_hours, log)
)
+ shutdown = GracefulShutdown()
+ shutdown.install_handlers()
+
GROUP = "portfolio-manager"
CONSUMER = "portfolio-1"
log.info("service_started", stream=ORDERS_STREAM)
@@ -108,12 +117,16 @@ async def run() -> None:
service="portfolio-manager", event_type="order"
).inc()
await broker.ack(ORDERS_STREAM, GROUP, msg_id)
+ except (ValueError, KeyError, TypeError) as exc:
+ log.warning("pending_parse_error", error=str(exc), msg_id=msg_id)
+ await broker.ack(ORDERS_STREAM, GROUP, msg_id)
+ metrics.errors_total.labels(service="portfolio-manager", error_type="validation").inc()
except Exception as exc:
- log.error("pending_process_failed", error=str(exc), msg_id=msg_id)
+ log.error("pending_process_failed", error=str(exc), msg_id=msg_id, exc_info=True)
metrics.errors_total.labels(service="portfolio-manager", error_type="processing").inc()
try:
- while True:
+ while not shutdown.is_shutting_down:
messages = await broker.read_group(ORDERS_STREAM, GROUP, CONSUMER, count=10, block=1000)
for msg_id, msg in messages:
try:
@@ -134,13 +147,21 @@ async def run() -> None:
service="portfolio-manager", event_type="order"
).inc()
await broker.ack(ORDERS_STREAM, GROUP, msg_id)
+ except (ValueError, KeyError, TypeError) as exc:
+ log.warning("message_parse_error", error=str(exc), msg_id=msg_id)
+ await broker.ack(ORDERS_STREAM, GROUP, msg_id)
+ metrics.errors_total.labels(
+ service="portfolio-manager", error_type="validation"
+ ).inc()
except Exception as exc:
- log.exception("message_processing_failed", error=str(exc), msg_id=msg_id)
+ log.error(
+ "message_processing_failed", error=str(exc), msg_id=msg_id, exc_info=True
+ )
metrics.errors_total.labels(
service="portfolio-manager", error_type="processing"
).inc()
except Exception as exc:
- log.error("fatal_error", error=str(exc))
+ log.error("fatal_error", error=str(exc), exc_info=True)
await notifier.send_error(str(exc), "portfolio-manager")
raise
finally:
diff --git a/services/portfolio-manager/tests/test_portfolio.py b/services/portfolio-manager/tests/test_portfolio.py
index 768e071..c8a6894 100644
--- a/services/portfolio-manager/tests/test_portfolio.py
+++ b/services/portfolio-manager/tests/test_portfolio.py
@@ -2,15 +2,16 @@
from decimal import Decimal
-from shared.models import Order, OrderSide, OrderStatus, OrderType
from portfolio_manager.portfolio import PortfolioTracker
+from shared.models import Order, OrderSide, OrderStatus, OrderType
+
def make_order(side: OrderSide, price: str, quantity: str) -> Order:
"""Helper to create a filled Order."""
return Order(
signal_id="test-signal",
- symbol="BTC/USDT",
+ symbol="AAPL",
side=side,
type=OrderType.MARKET,
price=Decimal(price),
@@ -24,7 +25,7 @@ def test_portfolio_add_buy_order() -> None:
order = make_order(OrderSide.BUY, "50000", "0.1")
tracker.apply_order(order)
- position = tracker.get_position("BTC/USDT")
+ position = tracker.get_position("AAPL")
assert position is not None
assert position.quantity == Decimal("0.1")
assert position.avg_entry_price == Decimal("50000")
@@ -35,7 +36,7 @@ def test_portfolio_add_multiple_buys() -> None:
tracker.apply_order(make_order(OrderSide.BUY, "50000", "0.1"))
tracker.apply_order(make_order(OrderSide.BUY, "52000", "0.1"))
- position = tracker.get_position("BTC/USDT")
+ position = tracker.get_position("AAPL")
assert position is not None
assert position.quantity == Decimal("0.2")
assert position.avg_entry_price == Decimal("51000")
@@ -46,7 +47,7 @@ def test_portfolio_sell_reduces_position() -> None:
tracker.apply_order(make_order(OrderSide.BUY, "50000", "0.2"))
tracker.apply_order(make_order(OrderSide.SELL, "55000", "0.1"))
- position = tracker.get_position("BTC/USDT")
+ position = tracker.get_position("AAPL")
assert position is not None
assert position.quantity == Decimal("0.1")
assert position.avg_entry_price == Decimal("50000")
@@ -54,7 +55,7 @@ def test_portfolio_sell_reduces_position() -> None:
def test_portfolio_no_position_returns_none() -> None:
tracker = PortfolioTracker()
- position = tracker.get_position("ETH/USDT")
+ position = tracker.get_position("MSFT")
assert position is None
@@ -66,7 +67,7 @@ def test_realized_pnl_on_sell() -> None:
tracker.apply_order(
Order(
signal_id="s1",
- symbol="BTCUSDT",
+ symbol="AAPL",
side=OrderSide.BUY,
type=OrderType.MARKET,
price=Decimal("50000"),
@@ -80,7 +81,7 @@ def test_realized_pnl_on_sell() -> None:
tracker.apply_order(
Order(
signal_id="s2",
- symbol="BTCUSDT",
+ symbol="AAPL",
side=OrderSide.SELL,
type=OrderType.MARKET,
price=Decimal("55000"),
@@ -98,7 +99,7 @@ def test_realized_pnl_on_loss() -> None:
tracker.apply_order(
Order(
signal_id="s1",
- symbol="BTCUSDT",
+ symbol="AAPL",
side=OrderSide.BUY,
type=OrderType.MARKET,
price=Decimal("50000"),
@@ -109,7 +110,7 @@ def test_realized_pnl_on_loss() -> None:
tracker.apply_order(
Order(
signal_id="s2",
- symbol="BTCUSDT",
+ symbol="AAPL",
side=OrderSide.SELL,
type=OrderType.MARKET,
price=Decimal("45000"),
@@ -128,7 +129,7 @@ def test_realized_pnl_accumulates() -> None:
tracker.apply_order(
Order(
signal_id="s1",
- symbol="BTCUSDT",
+ symbol="AAPL",
side=OrderSide.BUY,
type=OrderType.MARKET,
price=Decimal("50000"),
@@ -141,7 +142,7 @@ def test_realized_pnl_accumulates() -> None:
tracker.apply_order(
Order(
signal_id="s2",
- symbol="BTCUSDT",
+ symbol="AAPL",
side=OrderSide.SELL,
type=OrderType.MARKET,
price=Decimal("55000"),
@@ -154,7 +155,7 @@ def test_realized_pnl_accumulates() -> None:
tracker.apply_order(
Order(
signal_id="s3",
- symbol="BTCUSDT",
+ symbol="AAPL",
side=OrderSide.SELL,
type=OrderType.MARKET,
price=Decimal("60000"),
diff --git a/services/portfolio-manager/tests/test_snapshot.py b/services/portfolio-manager/tests/test_snapshot.py
index a464599..f2026e2 100644
--- a/services/portfolio-manager/tests/test_snapshot.py
+++ b/services/portfolio-manager/tests/test_snapshot.py
@@ -1,9 +1,10 @@
"""Tests for save_snapshot in portfolio-manager."""
-import pytest
from decimal import Decimal
from unittest.mock import AsyncMock, MagicMock
+import pytest
+
from shared.models import Position
@@ -13,7 +14,7 @@ class TestSaveSnapshot:
from portfolio_manager.main import save_snapshot
pos = Position(
- symbol="BTCUSDT",
+ symbol="AAPL",
quantity=Decimal("0.5"),
avg_entry_price=Decimal("50000"),
current_price=Decimal("52000"),