diff options
Diffstat (limited to 'shared/tests/test_healthcheck.py')
| -rw-r--r-- | shared/tests/test_healthcheck.py | 42 |
1 files changed, 40 insertions, 2 deletions
diff --git a/shared/tests/test_healthcheck.py b/shared/tests/test_healthcheck.py index 6970a8f..2f79757 100644 --- a/shared/tests/test_healthcheck.py +++ b/shared/tests/test_healthcheck.py @@ -1,6 +1,9 @@ """Tests for health check server.""" +from unittest.mock import MagicMock + import pytest +from multidict import CIMultiDict from prometheus_client import CollectorRegistry @@ -9,10 +12,17 @@ def registry(): return CollectorRegistry() -def make_server(service_name="test-service", port=8080, registry=None): +def make_server(service_name="test-service", port=8080, registry=None, auth_token=""): from shared.healthcheck import HealthCheckServer - return HealthCheckServer(service_name, port=port, registry=registry) + return HealthCheckServer(service_name, port=port, auth_token=auth_token, registry=registry) + + +def _fake_request(headers: dict | None = None) -> MagicMock: + """Create a minimal mock that quacks like aiohttp.web.Request.""" + req = MagicMock() + req.headers = CIMultiDict(headers or {}) + return req def test_init_defaults(registry): @@ -87,3 +97,31 @@ async def test_run_checks_false_is_fail(registry): result = await server.run_checks() assert result["status"] == "degraded" assert result["checks"]["cache"] == "fail" + + +# ── Bearer-token auth tests ──────────────────────────────────────── + + +def test_healthcheck_no_auth_when_token_empty(registry): + """When auth_token is empty, all requests pass auth regardless of headers.""" + server = make_server(registry=registry, auth_token="") + assert server._check_auth(_fake_request()) is True + assert server._check_auth(_fake_request({"Authorization": "Bearer wrong"})) is True + + +def test_healthcheck_auth_required_when_token_set(registry): + """When auth_token is set, a matching Bearer header passes auth.""" + server = make_server(registry=registry, auth_token="s3cret") + req = _fake_request({"Authorization": "Bearer s3cret"}) + assert server._check_auth(req) is True + + +def test_healthcheck_rejects_wrong_token(registry): + """When auth_token is set, a wrong or missing Bearer header is rejected.""" + server = make_server(registry=registry, auth_token="s3cret") + # Wrong token + assert server._check_auth(_fake_request({"Authorization": "Bearer bad"})) is False + # Missing header entirely + assert server._check_auth(_fake_request()) is False + # Malformed header + assert server._check_auth(_fake_request({"Authorization": "Token s3cret"})) is False |
