summaryrefslogtreecommitdiff
path: root/services/portfolio-manager/src
diff options
context:
space:
mode:
authorTheSiahxyz <164138827+TheSiahxyz@users.noreply.github.com>2026-04-01 15:56:35 +0900
committerTheSiahxyz <164138827+TheSiahxyz@users.noreply.github.com>2026-04-01 15:56:35 +0900
commit33b14aaa2344b0fd95d1629627c3d135b24ae102 (patch)
tree90b214758bc3b076baa7711226a1a1be6268e72e /services/portfolio-manager/src
parent9360f1a800aa29b40399a2f3bfbfcf215a04e279 (diff)
feat: initial trading platform implementation
Binance spot crypto trading platform with microservices architecture: - shared: Pydantic models, Redis Streams broker, asyncpg DB layer - data-collector: Binance WebSocket/REST market data collection - strategy-engine: Plugin-based strategy execution (RSI, Grid) - order-executor: Order execution with risk management - portfolio-manager: Position tracking and PnL calculation - backtester: Historical strategy testing with simulator - cli: Click-based CLI for all operations - Docker Compose orchestration with Redis and PostgreSQL - 24 test files covering all modules
Diffstat (limited to 'services/portfolio-manager/src')
-rw-r--r--services/portfolio-manager/src/portfolio_manager/__init__.py0
-rw-r--r--services/portfolio-manager/src/portfolio_manager/config.py6
-rw-r--r--services/portfolio-manager/src/portfolio_manager/main.py56
-rw-r--r--services/portfolio-manager/src/portfolio_manager/pnl.py21
-rw-r--r--services/portfolio-manager/src/portfolio_manager/portfolio.py62
5 files changed, 145 insertions, 0 deletions
diff --git a/services/portfolio-manager/src/portfolio_manager/__init__.py b/services/portfolio-manager/src/portfolio_manager/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/services/portfolio-manager/src/portfolio_manager/__init__.py
diff --git a/services/portfolio-manager/src/portfolio_manager/config.py b/services/portfolio-manager/src/portfolio_manager/config.py
new file mode 100644
index 0000000..bbd5049
--- /dev/null
+++ b/services/portfolio-manager/src/portfolio_manager/config.py
@@ -0,0 +1,6 @@
+"""Portfolio Manager configuration."""
+from shared.config import Settings
+
+
+class PortfolioConfig(Settings):
+ snapshot_interval_hours: int = 24
diff --git a/services/portfolio-manager/src/portfolio_manager/main.py b/services/portfolio-manager/src/portfolio_manager/main.py
new file mode 100644
index 0000000..cb7e6a8
--- /dev/null
+++ b/services/portfolio-manager/src/portfolio_manager/main.py
@@ -0,0 +1,56 @@
+"""Portfolio Manager Service entry point."""
+import asyncio
+import logging
+
+from shared.broker import RedisBroker
+from shared.events import Event, OrderEvent
+
+from portfolio_manager.config import PortfolioConfig
+from portfolio_manager.portfolio import PortfolioTracker
+
+logging.basicConfig(level=logging.INFO)
+logger = logging.getLogger(__name__)
+
+ORDERS_STREAM = "orders"
+
+
+async def run() -> None:
+ config = PortfolioConfig()
+ broker = RedisBroker(config.redis_url)
+ tracker = PortfolioTracker()
+
+ last_id = "$"
+ logger.info("Portfolio manager started, listening on stream=%s", ORDERS_STREAM)
+
+ try:
+ while True:
+ messages = await broker.read(ORDERS_STREAM, last_id=last_id, block=1000)
+ for msg in messages:
+ try:
+ event = Event.from_dict(msg)
+ if isinstance(event, OrderEvent):
+ order = event.data
+ tracker.apply_order(order)
+ logger.info(
+ "Applied order symbol=%s side=%s qty=%s price=%s",
+ order.symbol,
+ order.side,
+ order.quantity,
+ order.price,
+ )
+ positions = tracker.get_all_positions()
+ logger.info("Current positions count=%d", len(positions))
+ except Exception:
+ logger.exception("Failed to process message: %s", msg)
+ # Update last_id to the latest processed message id if broker returns ids
+ # Since broker.read returns parsed payloads (not ids), we use "$" to get new msgs
+ finally:
+ await broker.close()
+
+
+def main() -> None:
+ asyncio.run(run())
+
+
+if __name__ == "__main__":
+ main()
diff --git a/services/portfolio-manager/src/portfolio_manager/pnl.py b/services/portfolio-manager/src/portfolio_manager/pnl.py
new file mode 100644
index 0000000..96f0da8
--- /dev/null
+++ b/services/portfolio-manager/src/portfolio_manager/pnl.py
@@ -0,0 +1,21 @@
+"""PnL calculation functions for the portfolio manager."""
+from decimal import Decimal
+
+
+def calculate_unrealized_pnl(
+ quantity: Decimal,
+ avg_entry_price: Decimal,
+ current_price: Decimal,
+) -> Decimal:
+ """Calculate unrealized PnL for an open position."""
+ return quantity * (current_price - avg_entry_price)
+
+
+def calculate_realized_pnl(
+ buy_price: Decimal,
+ sell_price: Decimal,
+ quantity: Decimal,
+ fee: Decimal = Decimal("0"),
+) -> Decimal:
+ """Calculate realized PnL for a completed trade."""
+ return quantity * (sell_price - buy_price) - fee
diff --git a/services/portfolio-manager/src/portfolio_manager/portfolio.py b/services/portfolio-manager/src/portfolio_manager/portfolio.py
new file mode 100644
index 0000000..59106bb
--- /dev/null
+++ b/services/portfolio-manager/src/portfolio_manager/portfolio.py
@@ -0,0 +1,62 @@
+"""Portfolio tracking for the portfolio manager service."""
+from decimal import Decimal
+
+from shared.models import Order, OrderSide, Position
+
+
+class _PositionState:
+ """Internal state for tracking a single symbol's position."""
+
+ def __init__(self) -> None:
+ self.quantity: Decimal = Decimal("0")
+ self.avg_entry: Decimal = Decimal("0")
+
+
+class PortfolioTracker:
+ """Tracks positions and updates them based on filled orders."""
+
+ def __init__(self) -> None:
+ self._positions: dict[str, _PositionState] = {}
+
+ def _get_or_create(self, symbol: str) -> _PositionState:
+ if symbol not in self._positions:
+ self._positions[symbol] = _PositionState()
+ return self._positions[symbol]
+
+ def apply_order(self, order: Order) -> None:
+ """Update internal position state based on a filled order."""
+ state = self._get_or_create(order.symbol)
+
+ if order.side == OrderSide.BUY:
+ # Weighted average entry price
+ total_cost = state.avg_entry * state.quantity + order.price * order.quantity
+ state.quantity += order.quantity
+ if state.quantity > Decimal("0"):
+ state.avg_entry = total_cost / state.quantity
+ elif order.side == OrderSide.SELL:
+ state.quantity -= order.quantity
+ # Keep avg_entry unchanged unless fully sold
+ if state.quantity <= Decimal("0"):
+ state.quantity = Decimal("0")
+ state.avg_entry = Decimal("0")
+
+ def get_position(self, symbol: str) -> Position | None:
+ """Return a Position model for the symbol, or None if no/zero position."""
+ state = self._positions.get(symbol)
+ if state is None or state.quantity <= Decimal("0"):
+ return None
+ return Position(
+ symbol=symbol,
+ quantity=state.quantity,
+ avg_entry_price=state.avg_entry,
+ current_price=state.avg_entry, # No live price here; caller can update
+ )
+
+ def get_all_positions(self) -> list[Position]:
+ """Return all non-zero positions."""
+ positions = []
+ for symbol in self._positions:
+ pos = self.get_position(symbol)
+ if pos is not None:
+ positions.append(pos)
+ return positions