Critical: Add ownership checks to all order routes (IDOR fix). High: Redis rate limiting on auth endpoints, HMAC-signed internal service calls replacing header-presence-only checks, nh3 HTML sanitization on ghost_sync and product import, internal auth on market API endpoints, SHA-256 hashed OAuth grant/code tokens. Medium: SECRET_KEY production guard, AP signature enforcement, is_admin param removal, cart_sid validation, SSRF protection on remote actor fetch. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
93 lines
2.8 KiB
Python
93 lines
2.8 KiB
Python
"""HMAC-based authentication for internal service-to-service calls.
|
|
|
|
Replaces the previous header-presence-only check with a signed token
|
|
that includes a timestamp to prevent replay attacks.
|
|
|
|
Signing side (data_client.py / actions.py)::
|
|
|
|
from shared.infrastructure.internal_auth import sign_internal_headers
|
|
headers = sign_internal_headers("cart")
|
|
|
|
Validation side (before_request guards, csrf.py)::
|
|
|
|
from shared.infrastructure.internal_auth import validate_internal_request
|
|
if not validate_internal_request():
|
|
abort(403)
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import hashlib
|
|
import hmac
|
|
import os
|
|
import time
|
|
|
|
from quart import request
|
|
|
|
# Shared secret — MUST be set in production
|
|
_SECRET = os.getenv("INTERNAL_HMAC_SECRET", "").encode() or os.getenv("SECRET_KEY", "").encode()
|
|
|
|
# Maximum age of a signed request (seconds)
|
|
_MAX_AGE = 300 # 5 minutes
|
|
|
|
|
|
def _get_secret() -> bytes:
|
|
return _SECRET or os.getenv("SECRET_KEY", "dev-secret-key-change-me-777").encode()
|
|
|
|
|
|
def sign_internal_headers(app_name: str) -> dict[str, str]:
|
|
"""Generate signed headers for an internal request.
|
|
|
|
Returns a dict of headers to include in the request.
|
|
"""
|
|
ts = str(int(time.time()))
|
|
payload = f"{ts}:{app_name}".encode()
|
|
sig = hmac.new(_get_secret(), payload, hashlib.sha256).hexdigest()
|
|
return {
|
|
"X-Internal-Timestamp": ts,
|
|
"X-Internal-App": app_name,
|
|
"X-Internal-Signature": sig,
|
|
}
|
|
|
|
|
|
def validate_internal_request() -> bool:
|
|
"""Validate that an incoming request has a valid HMAC signature.
|
|
|
|
Checks X-Internal-Timestamp, X-Internal-App, and X-Internal-Signature
|
|
headers. Returns True if valid, False otherwise.
|
|
"""
|
|
ts = request.headers.get("X-Internal-Timestamp", "")
|
|
app_name = request.headers.get("X-Internal-App", "")
|
|
sig = request.headers.get("X-Internal-Signature", "")
|
|
|
|
if not ts or not app_name or not sig:
|
|
return False
|
|
|
|
# Check timestamp freshness
|
|
try:
|
|
req_time = int(ts)
|
|
except (ValueError, TypeError):
|
|
return False
|
|
|
|
now = int(time.time())
|
|
if abs(now - req_time) > _MAX_AGE:
|
|
return False
|
|
|
|
# Verify signature
|
|
payload = f"{ts}:{app_name}".encode()
|
|
expected = hmac.new(_get_secret(), payload, hashlib.sha256).hexdigest()
|
|
return hmac.compare_digest(sig, expected)
|
|
|
|
|
|
def is_internal_request() -> bool:
|
|
"""Check if the current request is a signed internal request.
|
|
|
|
This is a convenience that checks for any of the internal headers
|
|
(legacy or new HMAC-signed).
|
|
"""
|
|
# New HMAC-signed headers
|
|
if request.headers.get("X-Internal-Signature"):
|
|
return validate_internal_request()
|
|
# Legacy: presence-only headers (still accepted during migration,
|
|
# but callers should be updated to use signed headers)
|
|
return False
|