rose-ash/shared/infrastructure/internal_auth.py
giles c015f3f02f
Security audit: fix IDOR, add rate limiting, HMAC auth, token hashing, XSS sanitization
Critical: Add ownership checks to all order routes (IDOR fix).
High: Redis rate limiting on auth endpoints, HMAC-signed internal
service calls replacing header-presence-only checks, nh3 HTML
sanitization on ghost_sync and product import, internal auth on
market API endpoints, SHA-256 hashed OAuth grant/code tokens.
Medium: SECRET_KEY production guard, AP signature enforcement,
is_admin param removal, cart_sid validation, SSRF protection on
remote actor fetch.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-26 13:30:27 +00:00

93 lines
2.8 KiB
Python

"""HMAC-based authentication for internal service-to-service calls.

Replaces the previous header-presence-only check with a signed token
that includes a timestamp to limit the window for replay attacks.

Signing side (data_client.py / actions.py)::

    from shared.infrastructure.internal_auth import sign_internal_headers
    headers = sign_internal_headers("cart")

Validation side (before_request guards, csrf.py)::

    from shared.infrastructure.internal_auth import validate_internal_request
    if not validate_internal_request():
        abort(403)
"""
from __future__ import annotations

import hashlib
import hmac
import os
import time

from quart import request

# Shared secret: MUST be set in production. INTERNAL_HMAC_SECRET takes
# precedence; SECRET_KEY is used if it is unset or empty.
_SECRET = os.getenv("INTERNAL_HMAC_SECRET", "").encode() or os.getenv("SECRET_KEY", "").encode()

# Maximum age of a signed request (seconds)
_MAX_AGE = 300  # 5 minutes


def _get_secret() -> bytes:
    # Falls back to a hard-coded development key when neither variable is set.
    return _SECRET or os.getenv("SECRET_KEY", "dev-secret-key-change-me-777").encode()


def sign_internal_headers(app_name: str) -> dict[str, str]:
    """Generate signed headers for an internal request.

    Returns a dict of headers to include in the request.
    """
    ts = str(int(time.time()))
    payload = f"{ts}:{app_name}".encode()
    sig = hmac.new(_get_secret(), payload, hashlib.sha256).hexdigest()
    return {
        "X-Internal-Timestamp": ts,
        "X-Internal-App": app_name,
        "X-Internal-Signature": sig,
    }


def validate_internal_request() -> bool:
    """Validate that an incoming request has a valid HMAC signature.

    Checks X-Internal-Timestamp, X-Internal-App, and X-Internal-Signature
    headers. Returns True if valid, False otherwise.
    """
    ts = request.headers.get("X-Internal-Timestamp", "")
    app_name = request.headers.get("X-Internal-App", "")
    sig = request.headers.get("X-Internal-Signature", "")
    if not ts or not app_name or not sig:
        return False

    # Check timestamp freshness
    try:
        req_time = int(ts)
    except (ValueError, TypeError):
        return False
    now = int(time.time())
    if abs(now - req_time) > _MAX_AGE:
        return False

    # Verify signature
    payload = f"{ts}:{app_name}".encode()
    expected = hmac.new(_get_secret(), payload, hashlib.sha256).hexdigest()
    # Compare as bytes: compare_digest raises TypeError for str arguments
    # containing non-ASCII characters, which a hostile header could supply.
    return hmac.compare_digest(sig.encode(), expected.encode())


def is_internal_request() -> bool:
    """Check if the current request is a signed internal request.

    Only HMAC-signed requests are accepted. Legacy presence-only
    headers are not treated as internal.
    """
    # New HMAC-signed headers
    if request.headers.get("X-Internal-Signature"):
        return validate_internal_request()
    # Legacy presence-only headers are rejected; callers must be
    # updated to use signed headers.
    return False
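
The sign/validate round trip above can be exercised without a running Quart app. The following is a minimal sketch, not part of the module: `SECRET`, `sign`, and `verify` are hypothetical stand-ins that take the headers and the current time as plain arguments, so the freshness window and tamper detection are easy to see. It assumes the same payload layout (`"{ts}:{app_name}"`) and the same 300-second limit as the code above.

```python
import hashlib
import hmac
import time
from typing import Dict, Optional

SECRET = b"example-secret-for-illustration-only"  # hypothetical value
MAX_AGE = 300  # seconds, mirrors _MAX_AGE in the module


def sign(app_name: str, now: Optional[int] = None) -> Dict[str, str]:
    """Build the three X-Internal-* headers for app_name."""
    ts = str(int(time.time()) if now is None else now)
    payload = f"{ts}:{app_name}".encode()
    sig = hmac.new(SECRET, payload, hashlib.sha256).hexdigest()
    return {
        "X-Internal-Timestamp": ts,
        "X-Internal-App": app_name,
        "X-Internal-Signature": sig,
    }


def verify(headers: Dict[str, str], now: Optional[int] = None) -> bool:
    """Return True only for a fresh, correctly signed header set."""
    ts = headers.get("X-Internal-Timestamp", "")
    app_name = headers.get("X-Internal-App", "")
    sig = headers.get("X-Internal-Signature", "")
    if not ts or not app_name or not sig:
        return False
    try:
        req_time = int(ts)
    except ValueError:
        return False
    current = int(time.time()) if now is None else now
    if abs(current - req_time) > MAX_AGE:
        return False
    payload = f"{ts}:{app_name}".encode()
    expected = hmac.new(SECRET, payload, hashlib.sha256).hexdigest()
    return hmac.compare_digest(sig.encode(), expected.encode())


# Usage: a fixed clock makes the freshness check deterministic.
headers = sign("cart", now=1_000_000)
fresh = verify(headers, now=1_000_000 + 300)      # exactly at the limit
stale = verify(headers, now=1_000_000 + 301)      # one second past it
forged = verify({**headers, "X-Internal-App": "market"}, now=1_000_000)
```

Note that the signature covers only the timestamp and app name, not the request path or body, so a captured header set stays usable against any internal endpoint until it ages out of the window.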