"""Portfolio Manager Service entry point.""" import asyncio import logging from shared.broker import RedisBroker from shared.events import Event, OrderEvent from portfolio_manager.config import PortfolioConfig from portfolio_manager.portfolio import PortfolioTracker logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) ORDERS_STREAM = "orders" async def run() -> None: config = PortfolioConfig() broker = RedisBroker(config.redis_url) tracker = PortfolioTracker() last_id = "$" logger.info("Portfolio manager started, listening on stream=%s", ORDERS_STREAM) try: while True: messages = await broker.read(ORDERS_STREAM, last_id=last_id, block=1000) for msg in messages: try: event = Event.from_dict(msg) if isinstance(event, OrderEvent): order = event.data tracker.apply_order(order) logger.info( "Applied order symbol=%s side=%s qty=%s price=%s", order.symbol, order.side, order.quantity, order.price, ) positions = tracker.get_all_positions() logger.info("Current positions count=%d", len(positions)) except Exception: logger.exception("Failed to process message: %s", msg) # Update last_id to the latest processed message id if broker returns ids # Since broker.read returns parsed payloads (not ids), we use "$" to get new msgs finally: await broker.close() def main() -> None: asyncio.run(run()) if __name__ == "__main__": main()