summaryrefslogtreecommitdiff
path: root/shared/src
diff options
context:
space:
mode:
Diffstat (limited to 'shared/src')
-rw-r--r--shared/src/shared/config.py1
-rw-r--r--shared/src/shared/healthcheck.py13
2 files changed, 14 insertions, 0 deletions
diff --git a/shared/src/shared/config.py b/shared/src/shared/config.py
index 47bc2b1..7b34d78 100644
--- a/shared/src/shared/config.py
+++ b/shared/src/shared/config.py
@@ -20,5 +20,6 @@ class Settings(BaseSettings):
health_port: int = 8080
circuit_breaker_threshold: int = 5
circuit_breaker_timeout: int = 60
+ metrics_auth_token: str = "" # If set, /health and /metrics require Bearer token
model_config = {"env_file": ".env", "env_file_encoding": "utf-8"}
diff --git a/shared/src/shared/healthcheck.py b/shared/src/shared/healthcheck.py
index be02712..7411e8a 100644
--- a/shared/src/shared/healthcheck.py
+++ b/shared/src/shared/healthcheck.py
@@ -17,12 +17,14 @@ class HealthCheckServer:
service_name: str,
port: int = 8080,
*,
+ auth_token: str = "",
registry: CollectorRegistry | None = None,
) -> None:
self.service_name = service_name
self.port = port
self._checks: dict[str, Callable[[], Awaitable[bool]]] = {}
self._start_time = time.monotonic()
+ self._auth_token = auth_token
self._registry = registry or REGISTRY
def register_check(self, name: str, check_fn: Callable[[], Awaitable[bool]]) -> None:
@@ -53,14 +55,25 @@ class HealthCheckServer:
"checks": checks,
}
+ def _check_auth(self, request: web.Request) -> bool:
+ """Return True if the request is authorised (or no token is configured)."""
+ if not self._auth_token:
+ return True
+ auth = request.headers.get("Authorization", "")
+ return auth == f"Bearer {self._auth_token}"
+
async def _handle_health(self, request: web.Request) -> web.Response:
"""GET /health — JSON health status."""
+ if not self._check_auth(request):
+ return web.json_response({"error": "unauthorized"}, status=401)
result = await self.run_checks()
status_code = 200 if result["status"] == "ok" else 503
return web.json_response(result, status=status_code)
async def _handle_metrics(self, request: web.Request) -> web.Response:
"""GET /metrics — Prometheus text exposition."""
+ if not self._check_auth(request):
+ return web.Response(text="Unauthorized", status=401)
output = generate_latest(self._registry)
return web.Response(body=output, content_type=CONTENT_TYPE_LATEST)