Security audit: fix IDOR, add rate limiting, HMAC auth, token hashing, XSS sanitization
All checks were successful
Build and Deploy / build-and-deploy (push) Successful in 3m22s

Critical: Add ownership checks to all order routes (IDOR fix).
High: Redis rate limiting on auth endpoints, HMAC-signed internal
service calls replacing header-presence-only checks, nh3 HTML
sanitization on ghost_sync and product import, internal auth on
market API endpoints, SHA-256 hashed OAuth grant/code tokens.
Medium: SECRET_KEY production guard, AP signature enforcement,
is_admin param removal, cart_sid validation, SSRF protection on
remote actor fetch.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-02-26 13:30:27 +00:00
parent 404449fcab
commit c015f3f02f
27 changed files with 607 additions and 33 deletions

View File

@@ -17,6 +17,9 @@ def register() -> Blueprint:
async def _require_action_header():
if not request.headers.get(ACTION_HEADER):
return jsonify({"error": "forbidden"}), 403
from shared.infrastructure.internal_auth import validate_internal_request
if not validate_internal_request():
return jsonify({"error": "forbidden"}), 403
_handlers: dict[str, object] = {}

View File

@@ -15,6 +15,7 @@ from html import escape as html_escape
from typing import Dict, Any, Optional
import httpx
import nh3
from sqlalchemy import select, delete
from sqlalchemy.ext.asyncio import AsyncSession
@@ -29,6 +30,35 @@ GHOST_ADMIN_API_URL = os.environ["GHOST_ADMIN_API_URL"]
from shared.browser.app.utils import utcnow
def _sanitize_html(html: str | None) -> str | None:
"""Sanitize HTML content using nh3, allowing safe formatting tags."""
if not html:
return html
return nh3.clean(
html,
tags={
"a", "abbr", "acronym", "b", "blockquote", "br", "code",
"div", "em", "figcaption", "figure", "h1", "h2", "h3",
"h4", "h5", "h6", "hr", "i", "img", "li", "ol", "p",
"pre", "span", "strong", "sub", "sup", "table", "tbody",
"td", "th", "thead", "tr", "ul", "video", "source",
"picture", "iframe", "audio",
},
attributes={
"*": {"class", "id", "style"},
"a": {"href", "title", "target", "rel"},
"img": {"src", "alt", "title", "width", "height", "loading"},
"video": {"src", "controls", "width", "height", "poster"},
"audio": {"src", "controls"},
"source": {"src", "type"},
"iframe": {"src", "width", "height", "frameborder", "allowfullscreen"},
"td": {"colspan", "rowspan"},
"th": {"colspan", "rowspan"},
},
url_schemes={"http", "https", "mailto"},
)
def _auth_header() -> dict[str, str]:
return {"Authorization": f"Ghost {make_ghost_admin_jwt()}"}
@@ -99,13 +129,13 @@ def _apply_ghost_fields(obj: Post, gp: Dict[str, Any], author_map: Dict[str, Aut
obj.uuid = gp.get("uuid") or obj.uuid
obj.slug = gp.get("slug") or obj.slug
obj.title = gp.get("title") or obj.title
obj.html = gp.get("html")
obj.html = _sanitize_html(gp.get("html"))
obj.plaintext = gp.get("plaintext")
obj.mobiledoc = gp.get("mobiledoc")
obj.lexical = gp.get("lexical")
obj.feature_image = gp.get("feature_image")
obj.feature_image_alt = gp.get("feature_image_alt")
obj.feature_image_caption = gp.get("feature_image_caption")
obj.feature_image_caption = _sanitize_html(gp.get("feature_image_caption"))
obj.excerpt = gp.get("excerpt")
obj.custom_excerpt = gp.get("custom_excerpt")
obj.visibility = gp.get("visibility") or obj.visibility

View File

@@ -35,6 +35,9 @@ def register() -> Blueprint:
async def _require_data_header():
if not request.headers.get(DATA_HEADER):
return jsonify({"error": "forbidden"}), 403
from shared.infrastructure.internal_auth import validate_internal_request
if not validate_internal_request():
return jsonify({"error": "forbidden"}), 403
_handlers: dict[str, object] = {}