"""Tests for HMAC-based internal service-to-service authentication.""" from __future__ import annotations import hashlib import hmac import time from unittest.mock import patch from shared.infrastructure.internal_auth import sign_internal_headers # --------------------------------------------------------------------------- # sign_internal_headers # --------------------------------------------------------------------------- class TestSignInternalHeaders: def test_returns_required_headers(self): headers = sign_internal_headers("cart") assert "X-Internal-Timestamp" in headers assert "X-Internal-App" in headers assert "X-Internal-Signature" in headers def test_app_name_in_header(self): headers = sign_internal_headers("blog") assert headers["X-Internal-App"] == "blog" def test_timestamp_is_recent(self): headers = sign_internal_headers("events") ts = int(headers["X-Internal-Timestamp"]) now = int(time.time()) assert abs(now - ts) < 5 def test_signature_is_hex(self): headers = sign_internal_headers("cart") sig = headers["X-Internal-Signature"] # SHA-256 hex is 64 chars assert len(sig) == 64 int(sig, 16) # should not raise def test_different_apps_different_signatures(self): h1 = sign_internal_headers("cart") h2 = sign_internal_headers("blog") assert h1["X-Internal-Signature"] != h2["X-Internal-Signature"] # --------------------------------------------------------------------------- # Round-trip: sign then validate # --------------------------------------------------------------------------- class TestSignAndValidate: """Test the HMAC signing logic directly without needing a Quart request context.""" def _validate_headers(self, headers: dict[str, str], secret: bytes, max_age: int = 300) -> bool: """Replicate validate_internal_request logic without Quart request context.""" ts = headers.get("X-Internal-Timestamp", "") app_name = headers.get("X-Internal-App", "") sig = headers.get("X-Internal-Signature", "") if not ts or not app_name or not sig: return False try: req_time = int(ts) except (ValueError, TypeError): return False now = int(time.time()) if abs(now - req_time) > max_age: return False payload = f"{ts}:{app_name}".encode() expected = hmac.new(secret, payload, hashlib.sha256).hexdigest() return hmac.compare_digest(sig, expected) def test_valid_signature(self): from shared.infrastructure.internal_auth import _get_secret secret = _get_secret() headers = sign_internal_headers("relations") assert self._validate_headers(headers, secret) is True def test_tampered_signature_fails(self): from shared.infrastructure.internal_auth import _get_secret secret = _get_secret() headers = sign_internal_headers("cart") headers["X-Internal-Signature"] = "0" * 64 assert self._validate_headers(headers, secret) is False def test_wrong_secret_fails(self): headers = sign_internal_headers("cart") assert self._validate_headers(headers, b"wrong-secret") is False def test_expired_timestamp_fails(self): from shared.infrastructure.internal_auth import _get_secret secret = _get_secret() headers = sign_internal_headers("cart") # Set timestamp to 10 minutes ago old_ts = str(int(time.time()) - 600) headers["X-Internal-Timestamp"] = old_ts # Re-sign with old timestamp (so the signature matches the old ts) payload = f"{old_ts}:cart".encode() headers["X-Internal-Signature"] = hmac.new(secret, payload, hashlib.sha256).hexdigest() assert self._validate_headers(headers, secret) is False def test_missing_headers_fail(self): from shared.infrastructure.internal_auth import _get_secret secret = _get_secret() assert self._validate_headers({}, secret) is False assert self._validate_headers({"X-Internal-Timestamp": "123"}, secret) is False def test_invalid_timestamp_fails(self): from shared.infrastructure.internal_auth import _get_secret secret = _get_secret() headers = {"X-Internal-Timestamp": "not-a-number", "X-Internal-App": "cart", "X-Internal-Signature": "abc"} assert self._validate_headers(headers, secret) is False