summaryrefslogtreecommitdiff
path: root/services/portfolio-manager
diff options
context:
space:
mode:
Diffstat (limited to 'services/portfolio-manager')
-rw-r--r--services/portfolio-manager/src/portfolio_manager/main.py45
1 files changed, 31 insertions, 14 deletions
diff --git a/services/portfolio-manager/src/portfolio_manager/main.py b/services/portfolio-manager/src/portfolio_manager/main.py
index cb7e6a8..56624f7 100644
--- a/services/portfolio-manager/src/portfolio_manager/main.py
+++ b/services/portfolio-manager/src/portfolio_manager/main.py
@@ -1,26 +1,35 @@
"""Portfolio Manager Service entry point."""
import asyncio
-import logging
from shared.broker import RedisBroker
from shared.events import Event, OrderEvent
+from shared.healthcheck import HealthCheckServer
+from shared.logging import setup_logging
+from shared.metrics import ServiceMetrics
+from shared.notifier import TelegramNotifier
from portfolio_manager.config import PortfolioConfig
from portfolio_manager.portfolio import PortfolioTracker
-logging.basicConfig(level=logging.INFO)
-logger = logging.getLogger(__name__)
-
ORDERS_STREAM = "orders"
async def run() -> None:
config = PortfolioConfig()
+ log = setup_logging("portfolio-manager", config.log_level, config.log_format)
+ metrics = ServiceMetrics("portfolio_manager")
+ notifier = TelegramNotifier(bot_token=config.telegram_bot_token, chat_id=config.telegram_chat_id)
+
broker = RedisBroker(config.redis_url)
tracker = PortfolioTracker()
+ health = HealthCheckServer("portfolio-manager", port=config.health_port + 3)
+ health.register_check("redis", broker.ping)
+ await health.start()
+ metrics.service_up.labels(service="portfolio-manager").set(1)
+
last_id = "$"
- logger.info("Portfolio manager started, listening on stream=%s", ORDERS_STREAM)
+ log.info("service_started", stream=ORDERS_STREAM)
try:
while True:
@@ -31,20 +40,28 @@ async def run() -> None:
if isinstance(event, OrderEvent):
order = event.data
tracker.apply_order(order)
- logger.info(
- "Applied order symbol=%s side=%s qty=%s price=%s",
- order.symbol,
- order.side,
- order.quantity,
- order.price,
+ log.info(
+ "order_applied",
+ symbol=order.symbol,
+ side=str(order.side),
+ quantity=str(order.quantity),
+ price=str(order.price),
)
positions = tracker.get_all_positions()
- logger.info("Current positions count=%d", len(positions))
- except Exception:
- logger.exception("Failed to process message: %s", msg)
+ log.info("positions_updated", count=len(positions))
+ metrics.events_processed.labels(service="portfolio-manager", event_type="order").inc()
+ except Exception as exc:
+ log.exception("message_processing_failed", error=str(exc))
+ metrics.errors_total.labels(service="portfolio-manager", error_type="processing").inc()
# Update last_id to the latest processed message id if broker returns ids
# Since broker.read returns parsed payloads (not ids), we use "$" to get new msgs
+ except Exception as exc:
+ log.error("fatal_error", error=str(exc))
+ await notifier.send_error(str(exc), "portfolio-manager")
+ raise
finally:
+ metrics.service_up.labels(service="portfolio-manager").set(0)
+ await notifier.close()
await broker.close()