summaryrefslogtreecommitdiff
path: root/services/portfolio-manager
diff options
context:
space:
mode:
Diffstat (limited to 'services/portfolio-manager')
-rw-r--r--services/portfolio-manager/Dockerfile7
-rw-r--r--services/portfolio-manager/pyproject.toml16
-rw-r--r--services/portfolio-manager/src/portfolio_manager/__init__.py0
-rw-r--r--services/portfolio-manager/src/portfolio_manager/config.py6
-rw-r--r--services/portfolio-manager/src/portfolio_manager/main.py56
-rw-r--r--services/portfolio-manager/src/portfolio_manager/pnl.py21
-rw-r--r--services/portfolio-manager/src/portfolio_manager/portfolio.py62
-rw-r--r--services/portfolio-manager/tests/__init__.py0
-rw-r--r--services/portfolio-manager/tests/test_pnl.py32
-rw-r--r--services/portfolio-manager/tests/test_portfolio.py57
10 files changed, 257 insertions, 0 deletions
diff --git a/services/portfolio-manager/Dockerfile b/services/portfolio-manager/Dockerfile
new file mode 100644
index 0000000..3f8587e
--- /dev/null
+++ b/services/portfolio-manager/Dockerfile
@@ -0,0 +1,7 @@
+FROM python:3.12-slim
+WORKDIR /app
+COPY shared/ shared/
+RUN pip install --no-cache-dir ./shared
+COPY services/portfolio-manager/ services/portfolio-manager/
+RUN pip install --no-cache-dir ./services/portfolio-manager
+CMD ["python", "-m", "portfolio_manager.main"]
diff --git a/services/portfolio-manager/pyproject.toml b/services/portfolio-manager/pyproject.toml
new file mode 100644
index 0000000..8245aa0
--- /dev/null
+++ b/services/portfolio-manager/pyproject.toml
@@ -0,0 +1,16 @@
+[project]
+name = "portfolio-manager"
+version = "0.1.0"
+description = "Portfolio tracking and PnL calculation service"
+requires-python = ">=3.12"
+dependencies = ["trading-shared"]
+
+[project.optional-dependencies]
+dev = ["pytest>=8.0", "pytest-asyncio>=0.23"]
+
+[build-system]
+requires = ["hatchling"]
+build-backend = "hatchling.build"
+
+[tool.hatch.build.targets.wheel]
+packages = ["src/portfolio_manager"]
diff --git a/services/portfolio-manager/src/portfolio_manager/__init__.py b/services/portfolio-manager/src/portfolio_manager/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/services/portfolio-manager/src/portfolio_manager/__init__.py
diff --git a/services/portfolio-manager/src/portfolio_manager/config.py b/services/portfolio-manager/src/portfolio_manager/config.py
new file mode 100644
index 0000000..bbd5049
--- /dev/null
+++ b/services/portfolio-manager/src/portfolio_manager/config.py
@@ -0,0 +1,6 @@
+"""Portfolio Manager configuration."""
+from shared.config import Settings
+
+
+class PortfolioConfig(Settings):
+ snapshot_interval_hours: int = 24
diff --git a/services/portfolio-manager/src/portfolio_manager/main.py b/services/portfolio-manager/src/portfolio_manager/main.py
new file mode 100644
index 0000000..cb7e6a8
--- /dev/null
+++ b/services/portfolio-manager/src/portfolio_manager/main.py
@@ -0,0 +1,56 @@
+"""Portfolio Manager Service entry point."""
+import asyncio
+import logging
+
+from shared.broker import RedisBroker
+from shared.events import Event, OrderEvent
+
+from portfolio_manager.config import PortfolioConfig
+from portfolio_manager.portfolio import PortfolioTracker
+
+logging.basicConfig(level=logging.INFO)
+logger = logging.getLogger(__name__)
+
+ORDERS_STREAM = "orders"
+
+
+async def run() -> None:
+ config = PortfolioConfig()
+ broker = RedisBroker(config.redis_url)
+ tracker = PortfolioTracker()
+
+ last_id = "$"
+ logger.info("Portfolio manager started, listening on stream=%s", ORDERS_STREAM)
+
+ try:
+ while True:
+ messages = await broker.read(ORDERS_STREAM, last_id=last_id, block=1000)
+ for msg in messages:
+ try:
+ event = Event.from_dict(msg)
+ if isinstance(event, OrderEvent):
+ order = event.data
+ tracker.apply_order(order)
+ logger.info(
+ "Applied order symbol=%s side=%s qty=%s price=%s",
+ order.symbol,
+ order.side,
+ order.quantity,
+ order.price,
+ )
+ positions = tracker.get_all_positions()
+ logger.info("Current positions count=%d", len(positions))
+ except Exception:
+ logger.exception("Failed to process message: %s", msg)
+ # Update last_id to the latest processed message id if broker returns ids
+ # Since broker.read returns parsed payloads (not ids), we use "$" to get new msgs
+ finally:
+ await broker.close()
+
+
+def main() -> None:
+ asyncio.run(run())
+
+
+if __name__ == "__main__":
+ main()
diff --git a/services/portfolio-manager/src/portfolio_manager/pnl.py b/services/portfolio-manager/src/portfolio_manager/pnl.py
new file mode 100644
index 0000000..96f0da8
--- /dev/null
+++ b/services/portfolio-manager/src/portfolio_manager/pnl.py
@@ -0,0 +1,21 @@
+"""PnL calculation functions for the portfolio manager."""
+from decimal import Decimal
+
+
+def calculate_unrealized_pnl(
+ quantity: Decimal,
+ avg_entry_price: Decimal,
+ current_price: Decimal,
+) -> Decimal:
+ """Calculate unrealized PnL for an open position."""
+ return quantity * (current_price - avg_entry_price)
+
+
+def calculate_realized_pnl(
+ buy_price: Decimal,
+ sell_price: Decimal,
+ quantity: Decimal,
+ fee: Decimal = Decimal("0"),
+) -> Decimal:
+ """Calculate realized PnL for a completed trade."""
+ return quantity * (sell_price - buy_price) - fee
diff --git a/services/portfolio-manager/src/portfolio_manager/portfolio.py b/services/portfolio-manager/src/portfolio_manager/portfolio.py
new file mode 100644
index 0000000..59106bb
--- /dev/null
+++ b/services/portfolio-manager/src/portfolio_manager/portfolio.py
@@ -0,0 +1,62 @@
+"""Portfolio tracking for the portfolio manager service."""
+from decimal import Decimal
+
+from shared.models import Order, OrderSide, Position
+
+
+class _PositionState:
+ """Internal state for tracking a single symbol's position."""
+
+ def __init__(self) -> None:
+ self.quantity: Decimal = Decimal("0")
+ self.avg_entry: Decimal = Decimal("0")
+
+
+class PortfolioTracker:
+ """Tracks positions and updates them based on filled orders."""
+
+ def __init__(self) -> None:
+ self._positions: dict[str, _PositionState] = {}
+
+ def _get_or_create(self, symbol: str) -> _PositionState:
+ if symbol not in self._positions:
+ self._positions[symbol] = _PositionState()
+ return self._positions[symbol]
+
+ def apply_order(self, order: Order) -> None:
+ """Update internal position state based on a filled order."""
+ state = self._get_or_create(order.symbol)
+
+ if order.side == OrderSide.BUY:
+ # Weighted average entry price
+ total_cost = state.avg_entry * state.quantity + order.price * order.quantity
+ state.quantity += order.quantity
+ if state.quantity > Decimal("0"):
+ state.avg_entry = total_cost / state.quantity
+ elif order.side == OrderSide.SELL:
+ state.quantity -= order.quantity
+ # Keep avg_entry unchanged unless fully sold
+ if state.quantity <= Decimal("0"):
+ state.quantity = Decimal("0")
+ state.avg_entry = Decimal("0")
+
+ def get_position(self, symbol: str) -> Position | None:
+ """Return a Position model for the symbol, or None if no/zero position."""
+ state = self._positions.get(symbol)
+ if state is None or state.quantity <= Decimal("0"):
+ return None
+ return Position(
+ symbol=symbol,
+ quantity=state.quantity,
+ avg_entry_price=state.avg_entry,
+ current_price=state.avg_entry, # No live price here; caller can update
+ )
+
+ def get_all_positions(self) -> list[Position]:
+ """Return all non-zero positions."""
+ positions = []
+ for symbol in self._positions:
+ pos = self.get_position(symbol)
+ if pos is not None:
+ positions.append(pos)
+ return positions
diff --git a/services/portfolio-manager/tests/__init__.py b/services/portfolio-manager/tests/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/services/portfolio-manager/tests/__init__.py
diff --git a/services/portfolio-manager/tests/test_pnl.py b/services/portfolio-manager/tests/test_pnl.py
new file mode 100644
index 0000000..4462adc
--- /dev/null
+++ b/services/portfolio-manager/tests/test_pnl.py
@@ -0,0 +1,32 @@
+"""Tests for PnL calculation functions."""
+from decimal import Decimal
+
+from portfolio_manager.pnl import calculate_realized_pnl, calculate_unrealized_pnl
+
+
+def test_unrealized_pnl_profit() -> None:
+ result = calculate_unrealized_pnl(
+ quantity=Decimal("0.1"),
+ avg_entry_price=Decimal("50000"),
+ current_price=Decimal("55000"),
+ )
+ assert result == Decimal("500")
+
+
+def test_unrealized_pnl_loss() -> None:
+ result = calculate_unrealized_pnl(
+ quantity=Decimal("0.1"),
+ avg_entry_price=Decimal("50000"),
+ current_price=Decimal("45000"),
+ )
+ assert result == Decimal("-500")
+
+
+def test_realized_pnl_single_trade() -> None:
+ result = calculate_realized_pnl(
+ buy_price=Decimal("50000"),
+ sell_price=Decimal("55000"),
+ quantity=Decimal("0.1"),
+ fee=Decimal("5.5"),
+ )
+ assert result == Decimal("494.5")
diff --git a/services/portfolio-manager/tests/test_portfolio.py b/services/portfolio-manager/tests/test_portfolio.py
new file mode 100644
index 0000000..26319ca
--- /dev/null
+++ b/services/portfolio-manager/tests/test_portfolio.py
@@ -0,0 +1,57 @@
+"""Tests for PortfolioTracker."""
+from decimal import Decimal
+
+from shared.models import Order, OrderSide, OrderStatus, OrderType
+from portfolio_manager.portfolio import PortfolioTracker
+
+
+def make_order(side: OrderSide, price: str, quantity: str) -> Order:
+ """Helper to create a filled Order."""
+ return Order(
+ signal_id="test-signal",
+ symbol="BTC/USDT",
+ side=side,
+ type=OrderType.MARKET,
+ price=Decimal(price),
+ quantity=Decimal(quantity),
+ status=OrderStatus.FILLED,
+ )
+
+
+def test_portfolio_add_buy_order() -> None:
+ tracker = PortfolioTracker()
+ order = make_order(OrderSide.BUY, "50000", "0.1")
+ tracker.apply_order(order)
+
+ position = tracker.get_position("BTC/USDT")
+ assert position is not None
+ assert position.quantity == Decimal("0.1")
+ assert position.avg_entry_price == Decimal("50000")
+
+
+def test_portfolio_add_multiple_buys() -> None:
+ tracker = PortfolioTracker()
+ tracker.apply_order(make_order(OrderSide.BUY, "50000", "0.1"))
+ tracker.apply_order(make_order(OrderSide.BUY, "52000", "0.1"))
+
+ position = tracker.get_position("BTC/USDT")
+ assert position is not None
+ assert position.quantity == Decimal("0.2")
+ assert position.avg_entry_price == Decimal("51000")
+
+
+def test_portfolio_sell_reduces_position() -> None:
+ tracker = PortfolioTracker()
+ tracker.apply_order(make_order(OrderSide.BUY, "50000", "0.2"))
+ tracker.apply_order(make_order(OrderSide.SELL, "55000", "0.1"))
+
+ position = tracker.get_position("BTC/USDT")
+ assert position is not None
+ assert position.quantity == Decimal("0.1")
+ assert position.avg_entry_price == Decimal("50000")
+
+
+def test_portfolio_no_position_returns_none() -> None:
+ tracker = PortfolioTracker()
+ position = tracker.get_position("ETH/USDT")
+ assert position is None