Files
rose-ash/shared/tests/test_internal_auth.py
giles 00efbc2a35 Add unit test coverage for shared pure-logic modules (240 tests)
Track 1.1 of master plan: expand from sexp-only tests to cover
DTOs, HTTP signatures, HMAC auth, URL utilities, Jinja filters,
calendar helpers, config freeze, activity bus registry, parse
utilities, sexp helpers, error classes, and jinja bridge render API.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-28 19:34:37 +00:00

115 lines
4.4 KiB
Python

"""Tests for HMAC-based internal service-to-service authentication."""
from __future__ import annotations
import hashlib
import hmac
import time
from unittest.mock import patch
from shared.infrastructure.internal_auth import sign_internal_headers
# ---------------------------------------------------------------------------
# sign_internal_headers
# ---------------------------------------------------------------------------
class TestSignInternalHeaders:
    """Checks on the header mapping returned by ``sign_internal_headers``."""

    def test_returns_required_headers(self):
        """The timestamp, app and signature headers are all present."""
        headers = sign_internal_headers("cart")
        assert "X-Internal-Timestamp" in headers
        assert "X-Internal-App" in headers
        assert "X-Internal-Signature" in headers

    def test_app_name_in_header(self):
        """The app name passed in is echoed back in ``X-Internal-App``."""
        headers = sign_internal_headers("blog")
        assert headers["X-Internal-App"] == "blog"

    def test_timestamp_is_recent(self):
        """The timestamp header parses as an int within 5 seconds of now."""
        headers = sign_internal_headers("events")
        ts = int(headers["X-Internal-Timestamp"])
        now = int(time.time())
        assert abs(now - ts) < 5

    def test_signature_is_hex(self):
        """The signature is exactly 64 hexadecimal characters."""
        headers = sign_internal_headers("cart")
        sig = headers["X-Internal-Signature"]
        # SHA-256 hex is 64 chars
        assert len(sig) == 64
        # Check the alphabet explicitly: int(sig, 16) would also accept a
        # "0x" prefix, underscores, a sign, or surrounding whitespace.
        assert all(ch in "0123456789abcdefABCDEF" for ch in sig)

    def test_different_apps_different_signatures(self):
        """Signing for two different app names yields different signatures."""
        h1 = sign_internal_headers("cart")
        h2 = sign_internal_headers("blog")
        assert h1["X-Internal-Signature"] != h2["X-Internal-Signature"]
# ---------------------------------------------------------------------------
# Round-trip: sign then validate
# ---------------------------------------------------------------------------
class TestSignAndValidate:
    """Test the HMAC signing logic directly without needing a Quart request context."""

    def _validate_headers(self, headers: dict[str, str], secret: bytes, max_age: int = 300) -> bool:
        """Replicate validate_internal_request logic without Quart request context.

        Args:
            headers: Mapping holding the ``X-Internal-*`` header values.
            secret: HMAC key used to recompute the expected signature.
            max_age: Largest allowed absolute difference, in seconds, between
                the header timestamp and the current time.

        Returns:
            True when all three headers are present, the timestamp parses as
            an int within ``max_age`` of now, and the signature equals the
            HMAC-SHA256 hex digest of ``"<timestamp>:<app>"``; False otherwise.
        """
        ts = headers.get("X-Internal-Timestamp", "")
        app_name = headers.get("X-Internal-App", "")
        sig = headers.get("X-Internal-Signature", "")
        if not ts or not app_name or not sig:
            return False
        try:
            req_time = int(ts)
        except (ValueError, TypeError):
            return False
        now = int(time.time())
        # abs() rejects timestamps too far in the future as well as the past.
        if abs(now - req_time) > max_age:
            return False
        payload = f"{ts}:{app_name}".encode()
        expected = hmac.new(secret, payload, hashlib.sha256).hexdigest()
        return hmac.compare_digest(sig, expected)

    def test_valid_signature(self):
        """Freshly signed headers validate against the real secret."""
        from shared.infrastructure.internal_auth import _get_secret
        secret = _get_secret()
        headers = sign_internal_headers("relations")
        assert self._validate_headers(headers, secret) is True

    def test_tampered_signature_fails(self):
        """Replacing the signature with 64 zeros is rejected."""
        from shared.infrastructure.internal_auth import _get_secret
        secret = _get_secret()
        headers = sign_internal_headers("cart")
        headers["X-Internal-Signature"] = "0" * 64
        assert self._validate_headers(headers, secret) is False

    def test_wrong_secret_fails(self):
        """Validating with a different key is rejected."""
        headers = sign_internal_headers("cart")
        assert self._validate_headers(headers, b"wrong-secret") is False

    def test_expired_timestamp_fails(self):
        """A correctly signed but 10-minute-old timestamp is rejected."""
        from shared.infrastructure.internal_auth import _get_secret
        secret = _get_secret()
        headers = sign_internal_headers("cart")
        # Set timestamp to 10 minutes ago
        old_ts = str(int(time.time()) - 600)
        headers["X-Internal-Timestamp"] = old_ts
        # Re-sign with old timestamp (so the signature matches the old ts)
        payload = f"{old_ts}:cart".encode()
        headers["X-Internal-Signature"] = hmac.new(secret, payload, hashlib.sha256).hexdigest()
        assert self._validate_headers(headers, secret) is False

    def test_missing_headers_fail(self):
        """Any absent header causes validation to fail."""
        from shared.infrastructure.internal_auth import _get_secret
        secret = _get_secret()
        assert self._validate_headers({}, secret) is False
        assert self._validate_headers({"X-Internal-Timestamp": "123"}, secret) is False
        # Dropping any single header from a freshly signed set must also fail.
        signed = sign_internal_headers("cart")
        for missing in ("X-Internal-Timestamp", "X-Internal-App", "X-Internal-Signature"):
            partial = {key: value for key, value in signed.items() if key != missing}
            assert self._validate_headers(partial, secret) is False

    def test_invalid_timestamp_fails(self):
        """A non-numeric timestamp is rejected rather than raising."""
        from shared.infrastructure.internal_auth import _get_secret
        secret = _get_secret()
        headers = {"X-Internal-Timestamp": "not-a-number", "X-Internal-App": "cart", "X-Internal-Signature": "abc"}
        assert self._validate_headers(headers, secret) is False