"""HMAC-based authentication for internal service-to-service calls.

Replaces the previous header-presence-only check with a signed token that
includes a timestamp to limit replay attacks.

Signing side (data_client.py / actions.py)::

    from shared.infrastructure.internal_auth import sign_internal_headers
    headers = sign_internal_headers("cart")

Validation side (before_request guards, csrf.py)::

    from shared.infrastructure.internal_auth import validate_internal_request
    if not validate_internal_request():
        abort(403)
"""
from __future__ import annotations

import hashlib
import hmac
import os
import time

from quart import request

# Shared secret — MUST be set in production.
# Read once at import time: INTERNAL_HMAC_SECRET wins, SECRET_KEY is the
# fallback. Empty (b"") when neither variable holds a non-empty value.
_SECRET = (
    os.getenv("INTERNAL_HMAC_SECRET", "").encode()
    or os.getenv("SECRET_KEY", "").encode()
)

# Maximum age of a signed request (seconds)
_MAX_AGE = 300  # 5 minutes


def _get_secret() -> bytes:
    """Return the HMAC key used for both signing and validation.

    Uses the import-time ``_SECRET`` when it is non-empty; otherwise
    re-reads ``SECRET_KEY`` from the environment at call time and, if that
    variable is not set at all, falls back to a hard-coded development key.
    """
    # NOTE(review): the fallback key is hard-coded in source, so any
    # deployment that reaches it is signing with a publicly known secret.
    # Also, a SECRET_KEY that is set but empty yields an empty HMAC key here.
    return _SECRET or os.getenv("SECRET_KEY", "dev-secret-key-change-me-777").encode()


def _compute_signature(ts: str, app_name: str) -> str:
    """Return the hex HMAC-SHA256 of ``"<ts>:<app_name>"`` under the shared key."""
    payload = f"{ts}:{app_name}".encode()
    return hmac.new(_get_secret(), payload, hashlib.sha256).hexdigest()


def sign_internal_headers(app_name: str) -> dict[str, str]:
    """Generate signed headers for an internal request.

    Args:
        app_name: Name of the calling application; it is sent in
            ``X-Internal-App`` and covered by the signature.

    Returns:
        A dict with ``X-Internal-Timestamp`` (current Unix time in whole
        seconds, as a string), ``X-Internal-App`` and
        ``X-Internal-Signature`` (hex HMAC-SHA256 over ``"<ts>:<app_name>"``),
        to be included in the outgoing request.
    """
    ts = str(int(time.time()))
    return {
        "X-Internal-Timestamp": ts,
        "X-Internal-App": app_name,
        "X-Internal-Signature": _compute_signature(ts, app_name),
    }


def validate_internal_request() -> bool:
    """Validate that the incoming request has a valid HMAC signature.

    Checks the ``X-Internal-Timestamp``, ``X-Internal-App`` and
    ``X-Internal-Signature`` headers of the current Quart request.

    Returns:
        True if all three headers are present, the timestamp is within
        ``_MAX_AGE`` seconds of the current time (in either direction), and
        the signature matches; False otherwise. Malformed header values
        yield False rather than raising.
    """
    ts = request.headers.get("X-Internal-Timestamp", "")
    app_name = request.headers.get("X-Internal-App", "")
    sig = request.headers.get("X-Internal-Signature", "")

    if not ts or not app_name or not sig:
        return False

    # Check timestamp freshness
    try:
        req_time = int(ts)
    except (ValueError, TypeError):
        return False

    now = int(time.time())
    if abs(now - req_time) > _MAX_AGE:
        return False

    # hmac.compare_digest raises TypeError when given a str containing
    # non-ASCII characters. The signature header is attacker-controlled, so
    # compare bytes instead; a genuine hex digest is always pure ASCII.
    try:
        supplied = sig.encode("ascii")
    except UnicodeEncodeError:
        return False

    # The payload is rebuilt from the raw header strings, exactly as the
    # signer built it, so the comparison covers the values as received.
    expected = _compute_signature(ts, app_name).encode("ascii")
    return hmac.compare_digest(supplied, expected)


def is_internal_request() -> bool:
    """Check if the current request is a signed internal request.

    Convenience wrapper: when an ``X-Internal-Signature`` header is present
    the result of ``validate_internal_request()`` is returned; a request
    without that header is never treated as internal.
    """
    # New HMAC-signed headers
    if request.headers.get("X-Internal-Signature"):
        return validate_internal_request()

    # Legacy presence-only headers are not accepted here: without a
    # signature the request is rejected, so callers must send signed headers.
    return False