summaryrefslogtreecommitdiff
path: root/services/portfolio-manager/src/portfolio_manager/main.py
diff options
context:
space:
mode:
authorTheSiahxyz <164138827+TheSiahxyz@users.noreply.github.com>2026-04-01 15:56:35 +0900
committerTheSiahxyz <164138827+TheSiahxyz@users.noreply.github.com>2026-04-01 15:56:35 +0900
commit33b14aaa2344b0fd95d1629627c3d135b24ae102 (patch)
tree90b214758bc3b076baa7711226a1a1be6268e72e /services/portfolio-manager/src/portfolio_manager/main.py
parent9360f1a800aa29b40399a2f3bfbfcf215a04e279 (diff)
feat: initial trading platform implementation
Binance spot crypto trading platform with microservices architecture: - shared: Pydantic models, Redis Streams broker, asyncpg DB layer - data-collector: Binance WebSocket/REST market data collection - strategy-engine: Plugin-based strategy execution (RSI, Grid) - order-executor: Order execution with risk management - portfolio-manager: Position tracking and PnL calculation - backtester: Historical strategy testing with simulator - cli: Click-based CLI for all operations - Docker Compose orchestration with Redis and PostgreSQL - 24 test files covering all modules
Diffstat (limited to 'services/portfolio-manager/src/portfolio_manager/main.py')
-rw-r--r--services/portfolio-manager/src/portfolio_manager/main.py56
1 files changed, 56 insertions, 0 deletions
diff --git a/services/portfolio-manager/src/portfolio_manager/main.py b/services/portfolio-manager/src/portfolio_manager/main.py
new file mode 100644
index 0000000..cb7e6a8
--- /dev/null
+++ b/services/portfolio-manager/src/portfolio_manager/main.py
@@ -0,0 +1,56 @@
+"""Portfolio Manager Service entry point."""
+import asyncio
+import logging
+
+from shared.broker import RedisBroker
+from shared.events import Event, OrderEvent
+
+from portfolio_manager.config import PortfolioConfig
+from portfolio_manager.portfolio import PortfolioTracker
+
+logging.basicConfig(level=logging.INFO)
+logger = logging.getLogger(__name__)
+
+ORDERS_STREAM = "orders"
+
+
+async def run() -> None:
+ config = PortfolioConfig()
+ broker = RedisBroker(config.redis_url)
+ tracker = PortfolioTracker()
+
+ last_id = "$"
+ logger.info("Portfolio manager started, listening on stream=%s", ORDERS_STREAM)
+
+ try:
+ while True:
+ messages = await broker.read(ORDERS_STREAM, last_id=last_id, block=1000)
+ for msg in messages:
+ try:
+ event = Event.from_dict(msg)
+ if isinstance(event, OrderEvent):
+ order = event.data
+ tracker.apply_order(order)
+ logger.info(
+ "Applied order symbol=%s side=%s qty=%s price=%s",
+ order.symbol,
+ order.side,
+ order.quantity,
+ order.price,
+ )
+ positions = tracker.get_all_positions()
+ logger.info("Current positions count=%d", len(positions))
+ except Exception:
+ logger.exception("Failed to process message: %s", msg)
+ # Update last_id to the latest processed message id if broker returns ids
+ # Since broker.read returns parsed payloads (not ids), we use "$" to get new msgs
+ finally:
+ await broker.close()
+
+
+def main() -> None:
+ asyncio.run(run())
+
+
+if __name__ == "__main__":
+ main()