Track 1.1 of master plan: expand from sexp-only tests to cover DTOs, HTTP signatures, HMAC auth, URL utilities, Jinja filters, calendar helpers, config freeze, activity bus registry, parse utilities, sexp helpers, error classes, and jinja bridge render API. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
115 lines
4.4 KiB
Python
115 lines
4.4 KiB
Python
"""Tests for HMAC-based internal service-to-service authentication."""
|
|
from __future__ import annotations
|
|
|
|
import hashlib
|
|
import hmac
|
|
import time
|
|
from unittest.mock import patch
|
|
|
|
from shared.infrastructure.internal_auth import sign_internal_headers
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# sign_internal_headers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestSignInternalHeaders:
    """Checks on the header mapping returned by ``sign_internal_headers``."""

    def test_returns_required_headers(self):
        """The timestamp, app and signature headers are all present."""
        headers = sign_internal_headers("cart")
        assert "X-Internal-Timestamp" in headers
        assert "X-Internal-App" in headers
        assert "X-Internal-Signature" in headers

    def test_app_name_in_header(self):
        """The app name passed in is echoed back in ``X-Internal-App``."""
        headers = sign_internal_headers("blog")
        assert headers["X-Internal-App"] == "blog"

    def test_timestamp_is_recent(self):
        """The timestamp header parses as an int within 5 seconds of now."""
        headers = sign_internal_headers("events")
        ts = int(headers["X-Internal-Timestamp"])
        now = int(time.time())
        assert abs(now - ts) < 5

    def test_signature_is_hex(self):
        """The signature is exactly 64 hexadecimal digits."""
        headers = sign_internal_headers("cart")
        sig = headers["X-Internal-Signature"]
        # SHA-256 hex is 64 chars
        assert len(sig) == 64
        # Check the characters explicitly: int(sig, 16) would also accept a
        # "0x" prefix, a leading sign, underscores and surrounding whitespace,
        # so a malformed 64-character value could slip through.
        assert set(sig) <= set("0123456789abcdefABCDEF")

    def test_different_apps_different_signatures(self):
        """Signing for two different app names yields different signatures."""
        h1 = sign_internal_headers("cart")
        h2 = sign_internal_headers("blog")
        assert h1["X-Internal-Signature"] != h2["X-Internal-Signature"]
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Round-trip: sign then validate
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestSignAndValidate:
    """Test the HMAC signing logic directly without needing a Quart request context."""

    @staticmethod
    def _secret():
        """Return the signing secret via the module's private ``_get_secret``."""
        from shared.infrastructure.internal_auth import _get_secret

        return _get_secret()

    def _validate_headers(self, headers: dict[str, str], secret: bytes, max_age: int = 300) -> bool:
        """Replicate validate_internal_request logic without Quart request context.

        Returns ``False`` when any of the three headers is missing or empty,
        when the timestamp is not an integer, when the timestamp is more than
        ``max_age`` seconds away from the current time, or when the signature
        does not match the HMAC-SHA256 of ``"<timestamp>:<app>"`` under
        ``secret``. Returns ``True`` otherwise.
        """
        timestamp = headers.get("X-Internal-Timestamp", "")
        caller = headers.get("X-Internal-App", "")
        signature = headers.get("X-Internal-Signature", "")

        # All three headers must be present and non-empty.
        if not (timestamp and caller and signature):
            return False

        try:
            issued_at = int(timestamp)
        except (ValueError, TypeError):
            return False

        # Reject timestamps too far from "now" in either direction.
        drift = abs(int(time.time()) - issued_at)
        if drift > max_age:
            return False

        message = f"{timestamp}:{caller}".encode()
        digest = hmac.new(secret, message, hashlib.sha256).hexdigest()
        return hmac.compare_digest(signature, digest)

    def test_valid_signature(self):
        """Freshly signed headers validate against the real secret."""
        key = self._secret()
        signed = sign_internal_headers("relations")
        assert self._validate_headers(signed, key) is True

    def test_tampered_signature_fails(self):
        """Overwriting the signature with zeros makes validation fail."""
        key = self._secret()
        signed = sign_internal_headers("cart")
        signed["X-Internal-Signature"] = "0" * 64
        assert self._validate_headers(signed, key) is False

    def test_wrong_secret_fails(self):
        """Validating with a different secret fails."""
        signed = sign_internal_headers("cart")
        assert self._validate_headers(signed, b"wrong-secret") is False

    def test_expired_timestamp_fails(self):
        """A correctly signed but 10-minute-old timestamp is rejected."""
        key = self._secret()
        signed = sign_internal_headers("cart")
        # Set timestamp to 10 minutes ago
        stale_ts = str(int(time.time()) - 600)
        signed["X-Internal-Timestamp"] = stale_ts
        # Re-sign with old timestamp (so the signature matches the old ts)
        stale_message = f"{stale_ts}:cart".encode()
        stale_digest = hmac.new(key, stale_message, hashlib.sha256).hexdigest()
        signed["X-Internal-Signature"] = stale_digest
        assert self._validate_headers(signed, key) is False

    def test_missing_headers_fail(self):
        """An empty mapping and a timestamp-only mapping are both rejected."""
        key = self._secret()
        for incomplete in ({}, {"X-Internal-Timestamp": "123"}):
            assert self._validate_headers(incomplete, key) is False

    def test_invalid_timestamp_fails(self):
        """A non-numeric timestamp is rejected."""
        key = self._secret()
        bogus = {
            "X-Internal-Timestamp": "not-a-number",
            "X-Internal-App": "cart",
            "X-Internal-Signature": "abc",
        }
        assert self._validate_headers(bogus, key) is False
|