diff options
Diffstat (limited to 'shared/src')
| -rw-r--r-- | shared/src/shared/healthcheck.py | 76 | ||||
| -rw-r--r-- | shared/src/shared/metrics.py | 41 |
2 files changed, 117 insertions, 0 deletions
"""Health check HTTP server and Prometheus metrics for platform services.

This text was recovered from a patch that adds two new modules; they are kept
as separate sections below:

* ``shared/src/shared/healthcheck.py`` -- :class:`HealthCheckServer`
* ``shared/src/shared/metrics.py`` -- :class:`ServiceMetrics`
"""
from __future__ import annotations

import re
import time
from typing import Any, Awaitable, Callable

from aiohttp import web
from prometheus_client import (
    CONTENT_TYPE_LATEST,
    REGISTRY,
    CollectorRegistry,
    Counter,
    Gauge,
    Histogram,
    generate_latest,
)


# ---------------------------------------------------------------------------
# shared/src/shared/healthcheck.py
# Health check HTTP server with Prometheus metrics endpoint.
# ---------------------------------------------------------------------------


class HealthCheckServer:
    """Lightweight aiohttp server exposing ``/health`` and ``/metrics``.

    Args:
        service_name: Name reported in the ``service`` field of ``/health``.
        port: TCP port the server listens on (default 8080).
        registry: Prometheus registry rendered by ``/metrics``. When omitted,
            the ``prometheus_client`` default ``REGISTRY`` is used.
    """

    def __init__(
        self,
        service_name: str,
        port: int = 8080,
        *,
        registry: CollectorRegistry | None = None,
    ) -> None:
        self.service_name = service_name
        self.port = port
        self._checks: dict[str, Callable[[], Awaitable[bool]]] = {}
        # Uptime is measured from construction of this object, not from the
        # moment start() is called.
        self._start_time = time.monotonic()
        # Explicit None test so the choice never depends on the truthiness
        # of a registry object.
        self._registry = registry if registry is not None else REGISTRY

    def register_check(self, name: str, check_fn: Callable[[], Awaitable[bool]]) -> None:
        """Register a named async health check function.

        Registering a second check under an existing ``name`` replaces the
        earlier one.
        """
        self._checks[name] = check_fn

    async def run_checks(self) -> dict[str, Any]:
        """Execute all registered checks and return a status dict.

        Checks are awaited one after another in registration order. A check
        passes when it returns a truthy value; a falsy return is recorded as
        ``"fail"`` and a raised exception as ``"fail: <message>"``.

        Returns:
            A dict with the keys ``status`` (``"ok"`` if every check passed,
            otherwise ``"degraded"``), ``service``, ``uptime_seconds`` and
            ``checks`` (per-check result strings).
        """
        checks: dict[str, str] = {}
        all_ok = True

        for name, fn in self._checks.items():
            try:
                passed = await fn()
            except Exception as exc:
                # One broken check must not take down the whole endpoint;
                # report it as a failure and keep going.
                checks[name] = f"fail: {exc}"
                all_ok = False
                continue

            if passed:
                checks[name] = "ok"
            else:
                checks[name] = "fail"
                all_ok = False

        return {
            "status": "ok" if all_ok else "degraded",
            "service": self.service_name,
            "uptime_seconds": round(time.monotonic() - self._start_time, 2),
            "checks": checks,
        }

    async def _handle_health(self, request: web.Request) -> web.Response:
        """GET /health — JSON health status (200 when ok, 503 otherwise)."""
        result = await self.run_checks()
        status_code = 200 if result["status"] == "ok" else 503
        return web.json_response(result, status=status_code)

    async def _handle_metrics(self, request: web.Request) -> web.Response:
        """GET /metrics — Prometheus text exposition."""
        payload = generate_latest(self._registry)
        # CONTENT_TYPE_LATEST includes a "charset=..." parameter, and aiohttp
        # raises ValueError when a charset appears in the ``content_type``
        # argument. Send the value as a raw header instead.
        return web.Response(body=payload, headers={"Content-Type": CONTENT_TYPE_LATEST})

    async def start(self) -> web.AppRunner:
        """Create and start the aiohttp application, returning the runner.

        The server listens on all interfaces (``0.0.0.0``) at ``self.port``.
        The caller owns the returned runner and should ``await
        runner.cleanup()`` on shutdown.

        Raises:
            Exception: Whatever ``site.start()`` raises (for example
                ``OSError`` when the port is already in use). The runner is
                cleaned up before the error propagates.
        """
        app = web.Application()
        app.router.add_get("/health", self._handle_health)
        app.router.add_get("/metrics", self._handle_metrics)

        runner = web.AppRunner(app)
        await runner.setup()
        site = web.TCPSite(runner, "0.0.0.0", self.port)
        try:
            await site.start()
        except Exception:
            # Do not leak the already set-up runner when binding fails.
            await runner.cleanup()
            raise
        return runner


# ---------------------------------------------------------------------------
# shared/src/shared/metrics.py
# Prometheus metrics for trading platform services.
# ---------------------------------------------------------------------------

# Characters that may not appear in a Prometheus metric name.
_INVALID_METRIC_CHARS = re.compile(r"[^a-zA-Z0-9_:]")


def _metric_prefix(service_name: str) -> str:
    """Turn *service_name* into a valid Prometheus metric-name prefix."""
    prefix = _INVALID_METRIC_CHARS.sub("_", service_name)
    # A metric name must not start with a digit.
    if prefix[:1].isdigit():
        prefix = f"_{prefix}"
    return prefix


class ServiceMetrics:
    """Creates Prometheus metrics with a service-name prefix.

    The prefix is ``service_name`` with every character that is not valid in
    a Prometheus metric name (for example ``-`` or ``.``) replaced by ``_``;
    the sanitized value is also stored as ``self.service_name``.

    Every metric carries a ``service`` label, so callers select a child with
    ``.labels(service=..., ...)`` before recording a value.

    Args:
        service_name: Service name used to build the metric-name prefix.
        registry: Registry the metrics are registered on. When omitted, the
            ``prometheus_client`` default ``REGISTRY`` is used.

    Attributes:
        errors_total: Counter labelled by ``service`` and ``error_type``.
        events_processed: Counter labelled by ``service`` and ``event_type``.
        processing_seconds: Histogram labelled by ``service``.
        service_up: Gauge labelled by ``service``; 1 means up, 0 means down.
    """

    def __init__(self, service_name: str, *, registry: CollectorRegistry | None = None) -> None:
        self.service_name = _metric_prefix(service_name)
        reg = registry if registry is not None else REGISTRY
        prefix = self.service_name

        self.errors_total = Counter(
            f"{prefix}_errors_total",
            "Total error count",
            labelnames=["service", "error_type"],
            registry=reg,
        )

        self.events_processed = Counter(
            f"{prefix}_events_processed_total",
            "Total events processed",
            labelnames=["service", "event_type"],
            registry=reg,
        )

        self.processing_seconds = Histogram(
            f"{prefix}_processing_seconds",
            "Processing duration in seconds",
            labelnames=["service"],
            registry=reg,
        )

        self.service_up = Gauge(
            f"{prefix}_service_up",
            "Whether the service is up (1) or down (0)",
            labelnames=["service"],
            registry=reg,
        )
