diff options
Diffstat (limited to 'services/portfolio-manager/src')
5 files changed, 145 insertions, 0 deletions
diff --git a/services/portfolio-manager/src/portfolio_manager/__init__.py b/services/portfolio-manager/src/portfolio_manager/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/services/portfolio-manager/src/portfolio_manager/__init__.py diff --git a/services/portfolio-manager/src/portfolio_manager/config.py b/services/portfolio-manager/src/portfolio_manager/config.py new file mode 100644 index 0000000..bbd5049 --- /dev/null +++ b/services/portfolio-manager/src/portfolio_manager/config.py @@ -0,0 +1,6 @@ +"""Portfolio Manager configuration.""" +from shared.config import Settings + + +class PortfolioConfig(Settings): + snapshot_interval_hours: int = 24 diff --git a/services/portfolio-manager/src/portfolio_manager/main.py b/services/portfolio-manager/src/portfolio_manager/main.py new file mode 100644 index 0000000..cb7e6a8 --- /dev/null +++ b/services/portfolio-manager/src/portfolio_manager/main.py @@ -0,0 +1,56 @@ +"""Portfolio Manager Service entry point.""" +import asyncio +import logging + +from shared.broker import RedisBroker +from shared.events import Event, OrderEvent + +from portfolio_manager.config import PortfolioConfig +from portfolio_manager.portfolio import PortfolioTracker + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + +ORDERS_STREAM = "orders" + + +async def run() -> None: + config = PortfolioConfig() + broker = RedisBroker(config.redis_url) + tracker = PortfolioTracker() + + last_id = "$" + logger.info("Portfolio manager started, listening on stream=%s", ORDERS_STREAM) + + try: + while True: + messages = await broker.read(ORDERS_STREAM, last_id=last_id, block=1000) + for msg in messages: + try: + event = Event.from_dict(msg) + if isinstance(event, OrderEvent): + order = event.data + tracker.apply_order(order) + logger.info( + "Applied order symbol=%s side=%s qty=%s price=%s", + order.symbol, + order.side, + order.quantity, + order.price, + ) + positions = tracker.get_all_positions() + logger.info("Current positions count=%d", len(positions)) + except Exception: + logger.exception("Failed to process message: %s", msg) + # Update last_id to the latest processed message id if broker returns ids + # Since broker.read returns parsed payloads (not ids), we use "$" to get new msgs + finally: + await broker.close() + + +def main() -> None: + asyncio.run(run()) + + +if __name__ == "__main__": + main() diff --git a/services/portfolio-manager/src/portfolio_manager/pnl.py b/services/portfolio-manager/src/portfolio_manager/pnl.py new file mode 100644 index 0000000..96f0da8 --- /dev/null +++ b/services/portfolio-manager/src/portfolio_manager/pnl.py @@ -0,0 +1,21 @@ +"""PnL calculation functions for the portfolio manager.""" +from decimal import Decimal + + +def calculate_unrealized_pnl( + quantity: Decimal, + avg_entry_price: Decimal, + current_price: Decimal, +) -> Decimal: + """Calculate unrealized PnL for an open position.""" + return quantity * (current_price - avg_entry_price) + + +def calculate_realized_pnl( + buy_price: Decimal, + sell_price: Decimal, + quantity: Decimal, + fee: Decimal = Decimal("0"), +) -> Decimal: + """Calculate realized PnL for a completed trade.""" + return quantity * (sell_price - buy_price) - fee diff --git a/services/portfolio-manager/src/portfolio_manager/portfolio.py b/services/portfolio-manager/src/portfolio_manager/portfolio.py new file mode 100644 index 0000000..59106bb --- /dev/null +++ b/services/portfolio-manager/src/portfolio_manager/portfolio.py @@ -0,0 +1,62 @@ +"""Portfolio tracking for the portfolio manager service.""" +from decimal import Decimal + +from shared.models import Order, OrderSide, Position + + +class _PositionState: + """Internal state for tracking a single symbol's position.""" + + def __init__(self) -> None: + self.quantity: Decimal = Decimal("0") + self.avg_entry: Decimal = Decimal("0") + + +class PortfolioTracker: + """Tracks positions and updates them based on filled orders.""" + + def __init__(self) -> None: + self._positions: dict[str, _PositionState] = {} + + def _get_or_create(self, symbol: str) -> _PositionState: + if symbol not in self._positions: + self._positions[symbol] = _PositionState() + return self._positions[symbol] + + def apply_order(self, order: Order) -> None: + """Update internal position state based on a filled order.""" + state = self._get_or_create(order.symbol) + + if order.side == OrderSide.BUY: + # Weighted average entry price + total_cost = state.avg_entry * state.quantity + order.price * order.quantity + state.quantity += order.quantity + if state.quantity > Decimal("0"): + state.avg_entry = total_cost / state.quantity + elif order.side == OrderSide.SELL: + state.quantity -= order.quantity + # Keep avg_entry unchanged unless fully sold + if state.quantity <= Decimal("0"): + state.quantity = Decimal("0") + state.avg_entry = Decimal("0") + + def get_position(self, symbol: str) -> Position | None: + """Return a Position model for the symbol, or None if no/zero position.""" + state = self._positions.get(symbol) + if state is None or state.quantity <= Decimal("0"): + return None + return Position( + symbol=symbol, + quantity=state.quantity, + avg_entry_price=state.avg_entry, + current_price=state.avg_entry, # No live price here; caller can update + ) + + def get_all_positions(self) -> list[Position]: + """Return all non-zero positions.""" + positions = [] + for symbol in self._positions: + pos = self.get_position(symbol) + if pos is not None: + positions.append(pos) + return positions |
