diff options
| author | TheSiahxyz <164138827+TheSiahxyz@users.noreply.github.com> | 2026-04-01 15:56:35 +0900 |
|---|---|---|
| committer | TheSiahxyz <164138827+TheSiahxyz@users.noreply.github.com> | 2026-04-01 15:56:35 +0900 |
| commit | 33b14aaa2344b0fd95d1629627c3d135b24ae102 (patch) | |
| tree | 90b214758bc3b076baa7711226a1a1be6268e72e /services/portfolio-manager/src/portfolio_manager/main.py | |
| parent | 9360f1a800aa29b40399a2f3bfbfcf215a04e279 (diff) | |
feat: initial trading platform implementation
Binance spot crypto trading platform with microservices architecture:
- shared: Pydantic models, Redis Streams broker, asyncpg DB layer
- data-collector: Binance WebSocket/REST market data collection
- strategy-engine: Plugin-based strategy execution (RSI, Grid)
- order-executor: Order execution with risk management
- portfolio-manager: Position tracking and PnL calculation
- backtester: Historical strategy testing with simulator
- cli: Click-based CLI for all operations
- Docker Compose orchestration with Redis and PostgreSQL
- 24 test files covering all modules
Diffstat (limited to 'services/portfolio-manager/src/portfolio_manager/main.py')
| -rw-r--r-- | services/portfolio-manager/src/portfolio_manager/main.py | 56 |
1 files changed, 56 insertions, 0 deletions
diff --git a/services/portfolio-manager/src/portfolio_manager/main.py b/services/portfolio-manager/src/portfolio_manager/main.py new file mode 100644 index 0000000..cb7e6a8 --- /dev/null +++ b/services/portfolio-manager/src/portfolio_manager/main.py @@ -0,0 +1,56 @@ +"""Portfolio Manager Service entry point.""" +import asyncio +import logging + +from shared.broker import RedisBroker +from shared.events import Event, OrderEvent + +from portfolio_manager.config import PortfolioConfig +from portfolio_manager.portfolio import PortfolioTracker + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + +ORDERS_STREAM = "orders" + + +async def run() -> None: + config = PortfolioConfig() + broker = RedisBroker(config.redis_url) + tracker = PortfolioTracker() + + last_id = "$" + logger.info("Portfolio manager started, listening on stream=%s", ORDERS_STREAM) + + try: + while True: + messages = await broker.read(ORDERS_STREAM, last_id=last_id, block=1000) + for msg in messages: + try: + event = Event.from_dict(msg) + if isinstance(event, OrderEvent): + order = event.data + tracker.apply_order(order) + logger.info( + "Applied order symbol=%s side=%s qty=%s price=%s", + order.symbol, + order.side, + order.quantity, + order.price, + ) + positions = tracker.get_all_positions() + logger.info("Current positions count=%d", len(positions)) + except Exception: + logger.exception("Failed to process message: %s", msg) + # Update last_id to the latest processed message id if broker returns ids + # Since broker.read returns parsed payloads (not ids), we use "$" to get new msgs + finally: + await broker.close() + + +def main() -> None: + asyncio.run(run()) + + +if __name__ == "__main__": + main() |
