summary | refs | log | tree | commit | diff
path: root/shared/src
diff options
context:
space:
mode:
Diffstat (limited to 'shared/src')
-rw-r--r-- shared/src/shared/healthcheck.py 76
-rw-r--r-- shared/src/shared/metrics.py 41
2 files changed, 117 insertions, 0 deletions
diff --git a/shared/src/shared/healthcheck.py b/shared/src/shared/healthcheck.py
new file mode 100644
index 0000000..8294294
--- /dev/null
+++ b/shared/src/shared/healthcheck.py
@@ -0,0 +1,76 @@
+"""Health check HTTP server with Prometheus metrics endpoint."""
+from __future__ import annotations
+
+import time
+from typing import Any, Callable, Awaitable
+
+from aiohttp import web
+from prometheus_client import CollectorRegistry, REGISTRY, generate_latest, CONTENT_TYPE_LATEST
+
+
+class HealthCheckServer:
+    """Lightweight aiohttp server exposing ``/health`` and ``/metrics``.
+
+    ``/health`` awaits every registered check and reports the aggregate
+    result as JSON; ``/metrics`` serves the Prometheus text exposition of
+    the configured registry.
+
+    Args:
+        service_name: Name echoed in the ``service`` field of the health payload.
+        port: TCP port the server listens on.
+        registry: Prometheus registry to expose; the process-wide default
+            ``REGISTRY`` is used when omitted.
+        host: Interface to bind to. Defaults to all interfaces.
+    """
+
+    def __init__(
+        self,
+        service_name: str,
+        port: int = 8080,
+        *,
+        registry: CollectorRegistry | None = None,
+        host: str = "0.0.0.0",
+    ) -> None:
+        self.service_name = service_name
+        self.port = port
+        self.host = host
+        self._checks: dict[str, Callable[[], Awaitable[bool]]] = {}
+        # Uptime is measured from construction, not from start().
+        self._start_time = time.monotonic()
+        # Explicit None test: a caller-supplied registry is always honoured.
+        self._registry = registry if registry is not None else REGISTRY
+
+    def register_check(self, name: str, check_fn: Callable[[], Awaitable[bool]]) -> None:
+        """Register a named async health check.
+
+        Registering the same ``name`` again replaces the earlier check.
+        """
+        self._checks[name] = check_fn
+
+    async def run_checks(self) -> dict[str, Any]:
+        """Await every registered check in registration order.
+
+        Returns:
+            A dict with ``status`` (``"ok"`` when every check returned a
+            truthy value, otherwise ``"degraded"``), ``service``,
+            ``uptime_seconds`` and a per-check ``checks`` mapping whose
+            values are ``"ok"``, ``"fail"`` or ``"fail: <exception text>"``.
+        """
+        checks: dict[str, str] = {}
+
+        for name, fn in self._checks.items():
+            try:
+                passed = await fn()
+            except Exception as exc:
+                # A raising check must not take the endpoint down; it is
+                # reported as a failure together with its message.
+                checks[name] = f"fail: {exc}"
+            else:
+                checks[name] = "ok" if passed else "fail"
+
+        all_ok = all(outcome == "ok" for outcome in checks.values())
+        return {
+            "status": "ok" if all_ok else "degraded",
+            "service": self.service_name,
+            "uptime_seconds": round(time.monotonic() - self._start_time, 2),
+            "checks": checks,
+        }
+
+    async def _handle_health(self, request: web.Request) -> web.Response:
+        """GET /health — JSON health status (200 when ok, 503 when degraded)."""
+        result = await self.run_checks()
+        status_code = 200 if result["status"] == "ok" else 503
+        return web.json_response(result, status=status_code)
+
+    async def _handle_metrics(self, request: web.Request) -> web.Response:
+        """GET /metrics — Prometheus text exposition."""
+        output = generate_latest(self._registry)
+        # CONTENT_TYPE_LATEST carries a "charset=" parameter, and aiohttp
+        # raises ValueError when one is passed via ``content_type=``, so the
+        # full value is sent as a raw header instead.
+        return web.Response(body=output, headers={"Content-Type": CONTENT_TYPE_LATEST})
+
+    async def start(self) -> web.AppRunner:
+        """Create and start the aiohttp application.
+
+        Returns:
+            The running ``AppRunner``; the caller is responsible for
+            awaiting ``runner.cleanup()`` on shutdown.
+
+        Raises:
+            OSError: If the listening socket cannot be bound. The runner is
+                cleaned up before the error propagates.
+        """
+        app = web.Application()
+        app.router.add_get("/health", self._handle_health)
+        app.router.add_get("/metrics", self._handle_metrics)
+
+        runner = web.AppRunner(app)
+        await runner.setup()
+        try:
+            await web.TCPSite(runner, self.host, self.port).start()
+        except BaseException:
+            # Bind failure or cancellation: release the runner that was
+            # already set up, then let the original error propagate.
+            await runner.cleanup()
+            raise
+        return runner
diff --git a/shared/src/shared/metrics.py b/shared/src/shared/metrics.py
new file mode 100644
index 0000000..3b00c5d
--- /dev/null
+++ b/shared/src/shared/metrics.py
@@ -0,0 +1,41 @@
+"""Prometheus metrics for trading platform services."""
+from __future__ import annotations
+
+from prometheus_client import Counter, Gauge, Histogram, CollectorRegistry, REGISTRY
+
+
+class ServiceMetrics:
+    """Bundle of Prometheus collectors whose names share a service-derived prefix.
+
+    Attributes set on construction:
+        service_name: The given name with ``-`` replaced by ``_``.
+        errors_total: Counter labelled by ``service`` and ``error_type``.
+        events_processed: Counter labelled by ``service`` and ``event_type``.
+        processing_seconds: Histogram labelled by ``service``.
+        service_up: Gauge labelled by ``service``.
+    """
+
+    def __init__(self, service_name: str, *, registry: CollectorRegistry | None = None) -> None:
+        # Hyphens are swapped for underscores before the name is used as
+        # the metric-name prefix.
+        normalized = service_name.replace("-", "_")
+        self.service_name = normalized
+        target = registry or REGISTRY
+
+        def named(suffix: str) -> str:
+            """Return the prefixed metric name for ``suffix``."""
+            return f"{normalized}_{suffix}"
+
+        # Collectors are created in a fixed order so registration order in
+        # the target registry stays the same.
+        self.errors_total = Counter(
+            named("errors_total"),
+            "Total error count",
+            labelnames=["service", "error_type"],
+            registry=target,
+        )
+        self.events_processed = Counter(
+            named("events_processed_total"),
+            "Total events processed",
+            labelnames=["service", "event_type"],
+            registry=target,
+        )
+        self.processing_seconds = Histogram(
+            named("processing_seconds"),
+            "Processing duration in seconds",
+            labelnames=["service"],
+            registry=target,
+        )
+        self.service_up = Gauge(
+            named("service_up"),
+            "Whether the service is up (1) or down (0)",
+            labelnames=["service"],
+            registry=target,
+        )