Monorepo: consolidate 7 repos into one
All checks were successful
Build and Deploy / build-and-deploy (push) Successful in 1m5s

Combines shared, blog, market, cart, events, federation, and account
into a single repository. Eliminates submodule sync, sibling model
copying at build time, and per-app CI orchestration.

Changes:
- Remove per-app .git, .gitmodules, .gitea, submodule shared/ dirs
- Remove stale sibling model copies from each app
- Update all 6 Dockerfiles for monorepo build context (root = .)
- Add build directives to docker-compose.yml
- Add single .gitea/workflows/ci.yml with change detection
- Add .dockerignore for monorepo build context
- Create __init__.py for federation and account (cross-app imports)
This commit is contained in:
giles
2026-02-24 19:44:17 +00:00
commit f42042ccb7
895 changed files with 61147 additions and 0 deletions

View File

@@ -0,0 +1,256 @@
"""
Editor API proxy image/media/file uploads and oembed.
Forwards requests to the Ghost Admin API with JWT auth so the browser
never needs direct Ghost access.
"""
from __future__ import annotations
import logging
import os
import httpx
from quart import Blueprint, request, jsonify, g
from sqlalchemy import select, or_
from shared.browser.app.authz import require_admin, require_login
from models import Snippet
from .ghost_admin_token import make_ghost_admin_jwt
log = logging.getLogger(__name__)
GHOST_ADMIN_API_URL = os.environ["GHOST_ADMIN_API_URL"]
MAX_IMAGE_SIZE = 10 * 1024 * 1024 # 10 MB
MAX_MEDIA_SIZE = 100 * 1024 * 1024 # 100 MB
MAX_FILE_SIZE = 50 * 1024 * 1024 # 50 MB
ALLOWED_IMAGE_MIMETYPES = frozenset({
"image/jpeg", "image/png", "image/gif", "image/webp", "image/svg+xml",
})
ALLOWED_MEDIA_MIMETYPES = frozenset({
"audio/mpeg", "audio/ogg", "audio/wav", "audio/mp4", "audio/aac",
"video/mp4", "video/webm", "video/ogg",
})
editor_api_bp = Blueprint("editor_api", __name__, url_prefix="/editor-api")
def _auth_header() -> dict[str, str]:
return {"Authorization": f"Ghost {make_ghost_admin_jwt()}"}
@editor_api_bp.post("/images/upload/")
@require_admin
async def upload_image():
"""Proxy image upload to Ghost Admin API."""
files = await request.files
uploaded = files.get("file")
if not uploaded:
return jsonify({"errors": [{"message": "No file provided"}]}), 400
content = uploaded.read()
if len(content) > MAX_IMAGE_SIZE:
return jsonify({"errors": [{"message": "File too large (max 10 MB)"}]}), 413
if uploaded.content_type not in ALLOWED_IMAGE_MIMETYPES:
return jsonify({"errors": [{"message": f"Unsupported file type: {uploaded.content_type}"}]}), 415
url = f"{GHOST_ADMIN_API_URL}/images/upload/"
async with httpx.AsyncClient(timeout=30) as client:
resp = await client.post(
url,
headers=_auth_header(),
files={"file": (uploaded.filename, content, uploaded.content_type)},
)
if not resp.is_success:
log.error("Ghost image upload failed %s: %s", resp.status_code, resp.text[:500])
return resp.json(), resp.status_code
@editor_api_bp.post("/media/upload/")
@require_admin
async def upload_media():
"""Proxy audio/video upload to Ghost Admin API."""
files = await request.files
uploaded = files.get("file")
if not uploaded:
return jsonify({"errors": [{"message": "No file provided"}]}), 400
content = uploaded.read()
if len(content) > MAX_MEDIA_SIZE:
return jsonify({"errors": [{"message": "File too large (max 100 MB)"}]}), 413
if uploaded.content_type not in ALLOWED_MEDIA_MIMETYPES:
return jsonify({"errors": [{"message": f"Unsupported media type: {uploaded.content_type}"}]}), 415
ghost_files = {"file": (uploaded.filename, content, uploaded.content_type)}
# Optional video thumbnail
thumbnail = files.get("thumbnail")
if thumbnail:
thumb_content = thumbnail.read()
ghost_files["thumbnail"] = (thumbnail.filename, thumb_content, thumbnail.content_type)
url = f"{GHOST_ADMIN_API_URL}/media/upload/"
async with httpx.AsyncClient(timeout=60) as client:
resp = await client.post(url, headers=_auth_header(), files=ghost_files)
if not resp.is_success:
log.error("Ghost media upload failed %s: %s", resp.status_code, resp.text[:500])
return resp.json(), resp.status_code
@editor_api_bp.post("/files/upload/")
@require_admin
async def upload_file():
"""Proxy file upload to Ghost Admin API."""
files = await request.files
uploaded = files.get("file")
if not uploaded:
return jsonify({"errors": [{"message": "No file provided"}]}), 400
content = uploaded.read()
if len(content) > MAX_FILE_SIZE:
return jsonify({"errors": [{"message": "File too large (max 50 MB)"}]}), 413
url = f"{GHOST_ADMIN_API_URL}/files/upload/"
async with httpx.AsyncClient(timeout=60) as client:
resp = await client.post(
url,
headers=_auth_header(),
files={"file": (uploaded.filename, content, uploaded.content_type)},
)
if not resp.is_success:
log.error("Ghost file upload failed %s: %s", resp.status_code, resp.text[:500])
return resp.json(), resp.status_code
@editor_api_bp.get("/oembed/")
@require_admin
async def oembed_proxy():
"""Proxy oembed lookups to Ghost Admin API."""
params = dict(request.args)
if not params.get("url"):
return jsonify({"errors": [{"message": "url parameter required"}]}), 400
url = f"{GHOST_ADMIN_API_URL}/oembed/"
async with httpx.AsyncClient(timeout=30) as client:
resp = await client.get(url, headers=_auth_header(), params=params)
if not resp.is_success:
log.error("Ghost oembed failed %s: %s", resp.status_code, resp.text[:500])
return resp.json(), resp.status_code
# ── Snippets ────────────────────────────────────────────────────────
VALID_VISIBILITY = frozenset({"private", "shared", "admin"})
@editor_api_bp.get("/snippets/")
@require_login
async def list_snippets():
"""Return snippets visible to the current user."""
uid = g.user.id
is_admin = g.rights.get("admin")
filters = [Snippet.user_id == uid, Snippet.visibility == "shared"]
if is_admin:
filters.append(Snippet.visibility == "admin")
rows = (await g.s.execute(
select(Snippet).where(or_(*filters)).order_by(Snippet.name)
)).scalars().all()
return jsonify([
{"id": s.id, "name": s.name, "value": s.value, "visibility": s.visibility}
for s in rows
])
@editor_api_bp.post("/snippets/")
@require_login
async def create_snippet():
"""Create or upsert a snippet by (user_id, name)."""
data = await request.get_json(force=True)
name = (data.get("name") or "").strip()
value = data.get("value")
visibility = data.get("visibility", "private")
if not name or value is None:
return jsonify({"error": "name and value are required"}), 400
if visibility not in VALID_VISIBILITY:
return jsonify({"error": f"visibility must be one of {sorted(VALID_VISIBILITY)}"}), 400
if visibility != "private" and not g.rights.get("admin"):
visibility = "private"
uid = g.user.id
existing = (await g.s.execute(
select(Snippet).where(Snippet.user_id == uid, Snippet.name == name)
)).scalar_one_or_none()
if existing:
existing.value = value
existing.visibility = visibility
snippet = existing
else:
snippet = Snippet(user_id=uid, name=name, value=value, visibility=visibility)
g.s.add(snippet)
await g.s.flush()
return jsonify({
"id": snippet.id, "name": snippet.name,
"value": snippet.value, "visibility": snippet.visibility,
}), 200 if existing else 201
@editor_api_bp.patch("/snippets/<int:snippet_id>/")
@require_login
async def patch_snippet(snippet_id: int):
"""Update snippet visibility. Only admins may set shared/admin."""
snippet = await g.s.get(Snippet, snippet_id)
if not snippet:
return jsonify({"error": "not found"}), 404
is_admin = g.rights.get("admin")
if snippet.user_id != g.user.id and not is_admin:
return jsonify({"error": "forbidden"}), 403
data = await request.get_json(force=True)
visibility = data.get("visibility")
if visibility is not None:
if visibility not in VALID_VISIBILITY:
return jsonify({"error": f"visibility must be one of {sorted(VALID_VISIBILITY)}"}), 400
if visibility != "private" and not is_admin:
return jsonify({"error": "only admins may set shared/admin visibility"}), 403
snippet.visibility = visibility
await g.s.flush()
return jsonify({
"id": snippet.id, "name": snippet.name,
"value": snippet.value, "visibility": snippet.visibility,
})
@editor_api_bp.delete("/snippets/<int:snippet_id>/")
@require_login
async def delete_snippet(snippet_id: int):
"""Delete a snippet. Owners can delete their own; admins can delete any."""
snippet = await g.s.get(Snippet, snippet_id)
if not snippet:
return jsonify({"error": "not found"}), 404
if snippet.user_id != g.user.id and not g.rights.get("admin"):
return jsonify({"error": "forbidden"}), 403
await g.s.delete(snippet)
await g.s.flush()
return jsonify({"ok": True})

View File

@@ -0,0 +1,46 @@
import os
import time
import jwt # PyJWT
from typing import Tuple
def _split_key(raw_key: str) -> Tuple[str, bytes]:
"""
raw_key is the 'id:secret' from Ghost.
Returns (id, secret_bytes)
"""
key_id, key_secret_hex = raw_key.split(':', 1)
secret_bytes = bytes.fromhex(key_secret_hex)
return key_id, secret_bytes
def make_ghost_admin_jwt() -> str:
"""
Generate a short-lived JWT suitable for Authorization: Ghost <token>
"""
raw_key = os.environ["GHOST_ADMIN_API_KEY"]
key_id, secret_bytes = _split_key(raw_key)
now = int(time.time())
payload = {
"iat": now,
"exp": now + 5 * 60, # now + 5 minutes
"aud": "/admin/",
}
headers = {
"alg": "HS256",
"kid": key_id,
"typ": "JWT",
}
token = jwt.encode(
payload,
secret_bytes,
algorithm="HS256",
headers=headers,
)
# PyJWT returns str in recent versions; Ghost expects bare token string
return token

View File

@@ -0,0 +1,204 @@
"""
Ghost Admin API post CRUD.
Uses the same JWT auth and httpx patterns as ghost_sync.py.
"""
from __future__ import annotations
import logging
import os
import httpx
from .ghost_admin_token import make_ghost_admin_jwt
log = logging.getLogger(__name__)
GHOST_ADMIN_API_URL = os.environ["GHOST_ADMIN_API_URL"]
def _auth_header() -> dict[str, str]:
return {"Authorization": f"Ghost {make_ghost_admin_jwt()}"}
def _check(resp: httpx.Response) -> None:
"""Raise with the Ghost error body so callers see what went wrong."""
if resp.is_success:
return
body = resp.text[:2000]
log.error("Ghost API %s %s%s: %s", resp.request.method, resp.request.url, resp.status_code, body)
resp.raise_for_status()
async def get_post_for_edit(ghost_id: str, *, is_page: bool = False) -> dict | None:
"""Fetch a single post/page by Ghost ID, including lexical source."""
resource = "pages" if is_page else "posts"
url = (
f"{GHOST_ADMIN_API_URL}/{resource}/{ghost_id}/"
"?formats=lexical,html,mobiledoc&include=newsletters"
)
async with httpx.AsyncClient(timeout=30) as client:
resp = await client.get(url, headers=_auth_header())
if resp.status_code == 404:
return None
_check(resp)
return resp.json()[resource][0]
async def create_post(
title: str,
lexical_json: str,
status: str = "draft",
feature_image: str | None = None,
custom_excerpt: str | None = None,
feature_image_caption: str | None = None,
) -> dict:
"""Create a new post in Ghost. Returns the created post dict."""
post_body: dict = {
"title": title,
"lexical": lexical_json,
"mobiledoc": None,
"status": status,
}
if feature_image:
post_body["feature_image"] = feature_image
if custom_excerpt:
post_body["custom_excerpt"] = custom_excerpt
if feature_image_caption is not None:
post_body["feature_image_caption"] = feature_image_caption
payload = {"posts": [post_body]}
url = f"{GHOST_ADMIN_API_URL}/posts/"
async with httpx.AsyncClient(timeout=30) as client:
resp = await client.post(url, json=payload, headers=_auth_header())
_check(resp)
return resp.json()["posts"][0]
async def create_page(
title: str,
lexical_json: str,
status: str = "draft",
feature_image: str | None = None,
custom_excerpt: str | None = None,
feature_image_caption: str | None = None,
) -> dict:
"""Create a new page in Ghost (via /pages/ endpoint). Returns the created page dict."""
page_body: dict = {
"title": title,
"lexical": lexical_json,
"mobiledoc": None,
"status": status,
}
if feature_image:
page_body["feature_image"] = feature_image
if custom_excerpt:
page_body["custom_excerpt"] = custom_excerpt
if feature_image_caption is not None:
page_body["feature_image_caption"] = feature_image_caption
payload = {"pages": [page_body]}
url = f"{GHOST_ADMIN_API_URL}/pages/"
async with httpx.AsyncClient(timeout=30) as client:
resp = await client.post(url, json=payload, headers=_auth_header())
_check(resp)
return resp.json()["pages"][0]
async def update_post(
ghost_id: str,
lexical_json: str,
title: str | None,
updated_at: str,
feature_image: str | None = None,
custom_excerpt: str | None = None,
feature_image_caption: str | None = None,
status: str | None = None,
newsletter_slug: str | None = None,
email_segment: str | None = None,
email_only: bool | None = None,
is_page: bool = False,
) -> dict:
"""Update an existing Ghost post. Returns the updated post dict.
``updated_at`` is Ghost's optimistic-locking token pass the value
you received from ``get_post_for_edit``.
When ``newsletter_slug`` is set the publish request also triggers an
email send via Ghost's query-parameter API:
``?newsletter={slug}&email_segment={segment}``.
"""
post_body: dict = {
"lexical": lexical_json,
"mobiledoc": None,
"updated_at": updated_at,
}
if title is not None:
post_body["title"] = title
if feature_image is not None:
post_body["feature_image"] = feature_image or None
if custom_excerpt is not None:
post_body["custom_excerpt"] = custom_excerpt or None
if feature_image_caption is not None:
post_body["feature_image_caption"] = feature_image_caption
if status is not None:
post_body["status"] = status
if email_only:
post_body["email_only"] = True
resource = "pages" if is_page else "posts"
payload = {resource: [post_body]}
url = f"{GHOST_ADMIN_API_URL}/{resource}/{ghost_id}/"
if newsletter_slug:
url += f"?newsletter={newsletter_slug}"
if email_segment:
url += f"&email_segment={email_segment}"
async with httpx.AsyncClient(timeout=30) as client:
resp = await client.put(url, json=payload, headers=_auth_header())
_check(resp)
return resp.json()[resource][0]
_SETTINGS_FIELDS = (
"slug",
"published_at",
"featured",
"visibility",
"email_only",
"custom_template",
"meta_title",
"meta_description",
"canonical_url",
"og_image",
"og_title",
"og_description",
"twitter_image",
"twitter_title",
"twitter_description",
"tags",
"feature_image_alt",
)
async def update_post_settings(
ghost_id: str,
updated_at: str,
is_page: bool = False,
**kwargs,
) -> dict:
"""Update Ghost post/page settings (slug, tags, SEO, social, etc.).
Only non-None keyword args are included in the PUT payload.
Accepts any key from ``_SETTINGS_FIELDS``.
"""
resource = "pages" if is_page else "posts"
post_body: dict = {"updated_at": updated_at}
for key in _SETTINGS_FIELDS:
val = kwargs.get(key)
if val is not None:
post_body[key] = val
payload = {resource: [post_body]}
url = f"{GHOST_ADMIN_API_URL}/{resource}/{ghost_id}/"
async with httpx.AsyncClient(timeout=30) as client:
resp = await client.put(url, json=payload, headers=_auth_header())
_check(resp)
return resp.json()[resource][0]

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,668 @@
"""
Lexical JSON → HTML renderer.
Produces HTML matching Ghost's ``kg-*`` class conventions so the existing
``cards.css`` stylesheet works unchanged.
Public API
----------
render_lexical(doc) Lexical JSON (dict or string) → HTML string
"""
from __future__ import annotations
import html
import json
from typing import Callable
import mistune
# ---------------------------------------------------------------------------
# Registry
# ---------------------------------------------------------------------------
_RENDERERS: dict[str, Callable[[dict], str]] = {}
def _renderer(node_type: str):
"""Decorator — register a function as the renderer for *node_type*."""
def decorator(fn: Callable[[dict], str]) -> Callable[[dict], str]:
_RENDERERS[node_type] = fn
return fn
return decorator
# ---------------------------------------------------------------------------
# Public entry point
# ---------------------------------------------------------------------------
def render_lexical(doc: dict | str) -> str:
"""Render a Lexical JSON document to an HTML string."""
if isinstance(doc, str):
doc = json.loads(doc)
root = doc.get("root", doc)
return _render_children(root.get("children", []))
# ---------------------------------------------------------------------------
# Core dispatch
# ---------------------------------------------------------------------------
def _render_node(node: dict) -> str:
node_type = node.get("type", "")
renderer = _RENDERERS.get(node_type)
if renderer:
return renderer(node)
return ""
def _render_children(children: list[dict]) -> str:
return "".join(_render_node(c) for c in children)
# ---------------------------------------------------------------------------
# Text formatting
# ---------------------------------------------------------------------------
# Lexical format bitmask
_FORMAT_BOLD = 1
_FORMAT_ITALIC = 2
_FORMAT_STRIKETHROUGH = 4
_FORMAT_UNDERLINE = 8
_FORMAT_CODE = 16
_FORMAT_SUBSCRIPT = 32
_FORMAT_SUPERSCRIPT = 64
_FORMAT_HIGHLIGHT = 128
_FORMAT_TAGS: list[tuple[int, str, str]] = [
(_FORMAT_BOLD, "<strong>", "</strong>"),
(_FORMAT_ITALIC, "<em>", "</em>"),
(_FORMAT_STRIKETHROUGH, "<s>", "</s>"),
(_FORMAT_UNDERLINE, "<u>", "</u>"),
(_FORMAT_CODE, "<code>", "</code>"),
(_FORMAT_SUBSCRIPT, "<sub>", "</sub>"),
(_FORMAT_SUPERSCRIPT, "<sup>", "</sup>"),
(_FORMAT_HIGHLIGHT, "<mark>", "</mark>"),
]
# Element-level alignment from ``format`` field
_ALIGN_MAP = {
1: "text-align: left",
2: "text-align: center",
3: "text-align: right",
4: "text-align: justify",
}
def _align_style(node: dict) -> str:
fmt = node.get("format")
if isinstance(fmt, int) and fmt in _ALIGN_MAP:
return f' style="{_ALIGN_MAP[fmt]}"'
if isinstance(fmt, str) and fmt:
return f' style="text-align: {fmt}"'
return ""
def _wrap_format(text: str, fmt: int) -> str:
for mask, open_tag, close_tag in _FORMAT_TAGS:
if fmt & mask:
text = f"{open_tag}{text}{close_tag}"
return text
# ---------------------------------------------------------------------------
# Tier 1 — text nodes
# ---------------------------------------------------------------------------
@_renderer("text")
def _text(node: dict) -> str:
text = html.escape(node.get("text", ""))
fmt = node.get("format", 0)
if isinstance(fmt, int) and fmt:
text = _wrap_format(text, fmt)
return text
@_renderer("linebreak")
def _linebreak(_node: dict) -> str:
return "<br>"
@_renderer("tab")
def _tab(_node: dict) -> str:
return "\t"
@_renderer("paragraph")
def _paragraph(node: dict) -> str:
inner = _render_children(node.get("children", []))
if not inner:
inner = "<br>"
style = _align_style(node)
return f"<p{style}>{inner}</p>"
@_renderer("extended-text")
def _extended_text(node: dict) -> str:
return _paragraph(node)
@_renderer("heading")
def _heading(node: dict) -> str:
tag = node.get("tag", "h2")
inner = _render_children(node.get("children", []))
style = _align_style(node)
return f"<{tag}{style}>{inner}</{tag}>"
@_renderer("extended-heading")
def _extended_heading(node: dict) -> str:
return _heading(node)
@_renderer("quote")
def _quote(node: dict) -> str:
inner = _render_children(node.get("children", []))
return f"<blockquote>{inner}</blockquote>"
@_renderer("extended-quote")
def _extended_quote(node: dict) -> str:
return _quote(node)
@_renderer("aside")
def _aside(node: dict) -> str:
inner = _render_children(node.get("children", []))
return f"<aside>{inner}</aside>"
@_renderer("link")
def _link(node: dict) -> str:
href = html.escape(node.get("url", ""), quote=True)
target = node.get("target", "")
rel = node.get("rel", "")
inner = _render_children(node.get("children", []))
attrs = f' href="{href}"'
if target:
attrs += f' target="{html.escape(target, quote=True)}"'
if rel:
attrs += f' rel="{html.escape(rel, quote=True)}"'
return f"<a{attrs}>{inner}</a>"
@_renderer("autolink")
def _autolink(node: dict) -> str:
return _link(node)
@_renderer("at-link")
def _at_link(node: dict) -> str:
return _link(node)
@_renderer("list")
def _list(node: dict) -> str:
tag = "ol" if node.get("listType") == "number" else "ul"
start = node.get("start")
inner = _render_children(node.get("children", []))
attrs = ""
if tag == "ol" and start and start != 1:
attrs = f' start="{start}"'
return f"<{tag}{attrs}>{inner}</{tag}>"
@_renderer("listitem")
def _listitem(node: dict) -> str:
inner = _render_children(node.get("children", []))
return f"<li>{inner}</li>"
@_renderer("horizontalrule")
def _horizontalrule(_node: dict) -> str:
return "<hr>"
@_renderer("code")
def _code(node: dict) -> str:
# Inline code nodes from Lexical — just render inner text
inner = _render_children(node.get("children", []))
return f"<code>{inner}</code>"
@_renderer("codeblock")
def _codeblock(node: dict) -> str:
lang = node.get("language", "")
code = html.escape(node.get("code", ""))
cls = f' class="language-{html.escape(lang)}"' if lang else ""
return f'<pre><code{cls}>{code}</code></pre>'
@_renderer("code-highlight")
def _code_highlight(node: dict) -> str:
text = html.escape(node.get("text", ""))
highlight_type = node.get("highlightType", "")
if highlight_type:
return f'<span class="token {html.escape(highlight_type)}">{text}</span>'
return text
# ---------------------------------------------------------------------------
# Tier 2 — common cards
# ---------------------------------------------------------------------------
@_renderer("image")
def _image(node: dict) -> str:
src = node.get("src", "")
alt = node.get("alt", "")
caption = node.get("caption", "")
width = node.get("cardWidth", "") or node.get("width", "")
href = node.get("href", "")
width_class = ""
if width == "wide":
width_class = " kg-width-wide"
elif width == "full":
width_class = " kg-width-full"
img_tag = f'<img src="{html.escape(src, quote=True)}" alt="{html.escape(alt, quote=True)}" loading="lazy">'
if href:
img_tag = f'<a href="{html.escape(href, quote=True)}">{img_tag}</a>'
parts = [f'<figure class="kg-card kg-image-card{width_class}">']
parts.append(img_tag)
if caption:
parts.append(f"<figcaption>{caption}</figcaption>")
parts.append("</figure>")
return "".join(parts)
@_renderer("gallery")
def _gallery(node: dict) -> str:
images = node.get("images", [])
if not images:
return ""
rows = []
for i in range(0, len(images), 3):
row_imgs = images[i:i + 3]
row_cls = f"kg-gallery-row" if len(row_imgs) <= 3 else "kg-gallery-row"
imgs_html = []
for img in row_imgs:
src = img.get("src", "")
alt = img.get("alt", "")
caption = img.get("caption", "")
img_tag = f'<img src="{html.escape(src, quote=True)}" alt="{html.escape(alt, quote=True)}" loading="lazy">'
fig = f'<figure class="kg-gallery-image">{img_tag}'
if caption:
fig += f"<figcaption>{caption}</figcaption>"
fig += "</figure>"
imgs_html.append(fig)
rows.append(f'<div class="{row_cls}">{"".join(imgs_html)}</div>')
caption = node.get("caption", "")
caption_html = f"<figcaption>{caption}</figcaption>" if caption else ""
return (
f'<figure class="kg-card kg-gallery-card kg-width-wide">'
f'<div class="kg-gallery-container">{"".join(rows)}</div>'
f"{caption_html}</figure>"
)
@_renderer("html")
def _html_card(node: dict) -> str:
raw = node.get("html", "")
return f"<!--kg-card-begin: html-->{raw}<!--kg-card-end: html-->"
@_renderer("markdown")
def _markdown(node: dict) -> str:
md_text = node.get("markdown", "")
rendered = mistune.html(md_text)
return f"<!--kg-card-begin: markdown-->{rendered}<!--kg-card-end: markdown-->"
@_renderer("embed")
def _embed(node: dict) -> str:
embed_html = node.get("html", "")
caption = node.get("caption", "")
url = node.get("url", "")
caption_html = f"<figcaption>{caption}</figcaption>" if caption else ""
return (
f'<figure class="kg-card kg-embed-card">'
f"{embed_html}{caption_html}</figure>"
)
@_renderer("bookmark")
def _bookmark(node: dict) -> str:
url = node.get("url", "")
title = html.escape(node.get("metadata", {}).get("title", "") or node.get("title", ""))
description = html.escape(node.get("metadata", {}).get("description", "") or node.get("description", ""))
icon = node.get("metadata", {}).get("icon", "") or node.get("icon", "")
author = html.escape(node.get("metadata", {}).get("author", "") or node.get("author", ""))
publisher = html.escape(node.get("metadata", {}).get("publisher", "") or node.get("publisher", ""))
thumbnail = node.get("metadata", {}).get("thumbnail", "") or node.get("thumbnail", "")
caption = node.get("caption", "")
icon_html = f'<img class="kg-bookmark-icon" src="{html.escape(icon, quote=True)}" alt="">' if icon else ""
thumbnail_html = (
f'<div class="kg-bookmark-thumbnail">'
f'<img src="{html.escape(thumbnail, quote=True)}" alt=""></div>'
) if thumbnail else ""
meta_parts = []
if icon_html:
meta_parts.append(icon_html)
if author:
meta_parts.append(f'<span class="kg-bookmark-author">{author}</span>')
if publisher:
meta_parts.append(f'<span class="kg-bookmark-publisher">{publisher}</span>')
metadata_html = f'<span class="kg-bookmark-metadata">{"".join(meta_parts)}</span>' if meta_parts else ""
caption_html = f"<figcaption>{caption}</figcaption>" if caption else ""
return (
f'<figure class="kg-card kg-bookmark-card">'
f'<a class="kg-bookmark-container" href="{html.escape(url, quote=True)}">'
f'<div class="kg-bookmark-content">'
f'<div class="kg-bookmark-title">{title}</div>'
f'<div class="kg-bookmark-description">{description}</div>'
f'{metadata_html}'
f'</div>'
f'{thumbnail_html}'
f'</a>'
f'{caption_html}'
f'</figure>'
)
@_renderer("callout")
def _callout(node: dict) -> str:
color = node.get("backgroundColor", "grey")
emoji = node.get("calloutEmoji", "")
inner = _render_children(node.get("children", []))
emoji_html = f'<div class="kg-callout-emoji">{emoji}</div>' if emoji else ""
return (
f'<div class="kg-card kg-callout-card kg-callout-card-{html.escape(color)}">'
f'{emoji_html}'
f'<div class="kg-callout-text">{inner}</div>'
f'</div>'
)
@_renderer("button")
def _button(node: dict) -> str:
text = html.escape(node.get("buttonText", ""))
url = html.escape(node.get("buttonUrl", ""), quote=True)
alignment = node.get("alignment", "center")
return (
f'<div class="kg-card kg-button-card kg-align-{alignment}">'
f'<a href="{url}" class="kg-btn kg-btn-accent">{text}</a>'
f'</div>'
)
@_renderer("toggle")
def _toggle(node: dict) -> str:
heading = node.get("heading", "")
# Toggle content is in children
inner = _render_children(node.get("children", []))
return (
f'<div class="kg-card kg-toggle-card" data-kg-toggle-state="close">'
f'<div class="kg-toggle-heading">'
f'<h4 class="kg-toggle-heading-text">{heading}</h4>'
f'<button class="kg-toggle-card-icon">'
f'<svg viewBox="0 0 14 14"><path d="M7 0a.5.5 0 0 1 .5.5v6h6a.5.5 0 1 1 0 1h-6v6a.5.5 0 1 1-1 0v-6h-6a.5.5 0 0 1 0-1h6v-6A.5.5 0 0 1 7 0Z" fill="currentColor"/></svg>'
f'</button>'
f'</div>'
f'<div class="kg-toggle-content">{inner}</div>'
f'</div>'
)
# ---------------------------------------------------------------------------
# Tier 3 — media & remaining cards
# ---------------------------------------------------------------------------
@_renderer("audio")
def _audio(node: dict) -> str:
src = node.get("src", "")
title = html.escape(node.get("title", ""))
duration = node.get("duration", 0)
thumbnail = node.get("thumbnailSrc", "")
duration_min = int(duration) // 60
duration_sec = int(duration) % 60
duration_str = f"{duration_min}:{duration_sec:02d}"
if thumbnail:
thumb_html = (
f'<img src="{html.escape(thumbnail, quote=True)}" alt="audio-thumbnail" '
f'class="kg-audio-thumbnail">'
)
else:
thumb_html = (
'<div class="kg-audio-thumbnail placeholder">'
'<svg viewBox="0 0 24 24"><path d="M2 12C2 6.48 6.48 2 12 2s10 4.48 10 10-4.48 10-10 10S2 17.52 2 12zm7.5 5.25L16 12 9.5 6.75v10.5z" fill="currentColor"/></svg>'
'</div>'
)
return (
f'<div class="kg-card kg-audio-card">'
f'{thumb_html}'
f'<div class="kg-audio-player-container">'
f'<div class="kg-audio-title">{title}</div>'
f'<div class="kg-audio-player">'
f'<button class="kg-audio-play-icon"><svg viewBox="0 0 24 24"><path d="M8 5v14l11-7z" fill="currentColor"/></svg></button>'
f'<div class="kg-audio-current-time">0:00</div>'
f'<div class="kg-audio-time">/ {duration_str}</div>'
f'<input type="range" class="kg-audio-seek-slider" max="100" value="0">'
f'<button class="kg-audio-playback-rate">1&#215;</button>'
f'<button class="kg-audio-unmute-icon"><svg viewBox="0 0 24 24"><path d="M3 9v6h4l5 5V4L7 9H3zm13.5 3c0-1.77-1.02-3.29-2.5-4.03v8.05c1.48-.73 2.5-2.25 2.5-4.02zM14 3.23v2.06c2.89.86 5 3.54 5 6.71s-2.11 5.85-5 6.71v2.06c4.01-.91 7-4.49 7-8.77s-2.99-7.86-7-8.77z" fill="currentColor"/></svg></button>'
f'<input type="range" class="kg-audio-volume-slider" max="100" value="100">'
f'</div>'
f'</div>'
f'<audio src="{html.escape(src, quote=True)}" preload="metadata"></audio>'
f'</div>'
)
@_renderer("video")
def _video(node: dict) -> str:
src = node.get("src", "")
caption = node.get("caption", "")
width = node.get("cardWidth", "")
thumbnail = node.get("thumbnailSrc", "") or node.get("customThumbnailSrc", "")
loop = node.get("loop", False)
width_class = ""
if width == "wide":
width_class = " kg-width-wide"
elif width == "full":
width_class = " kg-width-full"
loop_attr = " loop" if loop else ""
poster_attr = f' poster="{html.escape(thumbnail, quote=True)}"' if thumbnail else ""
caption_html = f"<figcaption>{caption}</figcaption>" if caption else ""
return (
f'<figure class="kg-card kg-video-card{width_class}">'
f'<div class="kg-video-container">'
f'<video src="{html.escape(src, quote=True)}" controls preload="metadata"{poster_attr}{loop_attr}></video>'
f'</div>'
f'{caption_html}'
f'</figure>'
)
@_renderer("file")
def _file(node: dict) -> str:
src = node.get("src", "")
title = html.escape(node.get("fileName", "") or node.get("title", ""))
caption = node.get("caption", "")
file_size = node.get("fileSize", 0)
file_name = html.escape(node.get("fileName", ""))
# Format size
if file_size:
kb = file_size / 1024
if kb < 1024:
size_str = f"{kb:.0f} KB"
else:
size_str = f"{kb / 1024:.1f} MB"
else:
size_str = ""
caption_html = f'<div class="kg-file-card-caption">{caption}</div>' if caption else ""
size_html = f'<div class="kg-file-card-filesize">{size_str}</div>' if size_str else ""
return (
f'<div class="kg-card kg-file-card">'
f'<a class="kg-file-card-container" href="{html.escape(src, quote=True)}" download="{file_name}">'
f'<div class="kg-file-card-contents">'
f'<div class="kg-file-card-title">{title}</div>'
f'{size_html}'
f'</div>'
f'<div class="kg-file-card-icon">'
f'<svg viewBox="0 0 24 24"><path d="M19 9h-4V3H9v6H5l7 7 7-7zM5 18v2h14v-2H5z" fill="currentColor"/></svg>'
f'</div>'
f'</a>'
f'{caption_html}'
f'</div>'
)
@_renderer("paywall")
def _paywall(_node: dict) -> str:
return "<!--members-only-->"
@_renderer("header")
def _header(node: dict) -> str:
heading = node.get("heading", "")
subheading = node.get("subheading", "")
size = node.get("size", "small")
style = node.get("style", "dark")
bg_image = node.get("backgroundImageSrc", "")
button_text = node.get("buttonText", "")
button_url = node.get("buttonUrl", "")
bg_style = f' style="background-image: url({html.escape(bg_image, quote=True)})"' if bg_image else ""
heading_html = f"<h2>{heading}</h2>" if heading else ""
subheading_html = f"<p>{subheading}</p>" if subheading else ""
button_html = (
f'<a class="kg-header-card-button" href="{html.escape(button_url, quote=True)}">{html.escape(button_text)}</a>'
if button_text and button_url else ""
)
return (
f'<div class="kg-card kg-header-card kg-style-{html.escape(style)} kg-size-{html.escape(size)}"{bg_style}>'
f'{heading_html}{subheading_html}{button_html}'
f'</div>'
)
@_renderer("signup")
def _signup(node: dict) -> str:
heading = node.get("heading", "")
subheading = node.get("subheading", "")
disclaimer = node.get("disclaimer", "")
button_text = html.escape(node.get("buttonText", "Subscribe"))
button_color = node.get("buttonColor", "")
bg_color = node.get("backgroundColor", "")
bg_image = node.get("backgroundImageSrc", "")
style = node.get("style", "dark")
bg_style_parts = []
if bg_color:
bg_style_parts.append(f"background-color: {bg_color}")
if bg_image:
bg_style_parts.append(f"background-image: url({html.escape(bg_image, quote=True)})")
style_attr = f' style="{"; ".join(bg_style_parts)}"' if bg_style_parts else ""
heading_html = f"<h2>{heading}</h2>" if heading else ""
subheading_html = f"<p>{subheading}</p>" if subheading else ""
disclaimer_html = f'<p class="kg-signup-card-disclaimer">{disclaimer}</p>' if disclaimer else ""
btn_style = f' style="background-color: {button_color}"' if button_color else ""
return (
f'<div class="kg-card kg-signup-card kg-style-{html.escape(style)}"{style_attr}>'
f'{heading_html}{subheading_html}'
f'<form class="kg-signup-card-form" data-members-form>'
f'<input type="email" placeholder="Your email" required>'
f'<button type="submit" class="kg-signup-card-button"{btn_style}>{button_text}</button>'
f'</form>'
f'{disclaimer_html}'
f'</div>'
)
@_renderer("product")
def _product(node: dict) -> str:
title = html.escape(node.get("productTitle", "") or node.get("title", ""))
description = node.get("productDescription", "") or node.get("description", "")
img_src = node.get("productImageSrc", "")
button_text = html.escape(node.get("buttonText", ""))
button_url = node.get("buttonUrl", "")
rating = node.get("rating", 0)
img_html = (
f'<img class="kg-product-card-image" src="{html.escape(img_src, quote=True)}" alt="">'
if img_src else ""
)
button_html = (
f'<a class="kg-product-card-button kg-btn kg-btn-accent" href="{html.escape(button_url, quote=True)}">{button_text}</a>'
if button_text and button_url else ""
)
stars = ""
if rating:
active = int(rating)
stars_html = []
for i in range(5):
cls = "kg-product-card-rating-active" if i < active else ""
stars_html.append(
f'<svg class="kg-product-card-rating-star {cls}" viewBox="0 0 24 24">'
f'<path d="M12 .587l3.668 7.568 8.332 1.151-6.064 5.828 1.48 8.279L12 19.771l-7.416 3.642 1.48-8.279L0 9.306l8.332-1.151z" fill="currentColor"/>'
f'</svg>'
)
stars = f'<div class="kg-product-card-rating">{"".join(stars_html)}</div>'
return (
f'<div class="kg-card kg-product-card">'
f'{img_html}'
f'<div class="kg-product-card-container">'
f'<h4 class="kg-product-card-title">{title}</h4>'
f'{stars}'
f'<div class="kg-product-card-description">{description}</div>'
f'{button_html}'
f'</div>'
f'</div>'
)
@_renderer("email")
def _email(node: dict) -> str:
raw_html = node.get("html", "")
return f"<!--kg-card-begin: email-->{raw_html}<!--kg-card-end: email-->"
@_renderer("email-cta")
def _email_cta(node: dict) -> str:
raw_html = node.get("html", "")
return f"<!--kg-card-begin: email-cta-->{raw_html}<!--kg-card-end: email-cta-->"
@_renderer("call-to-action")
def _call_to_action(node: dict) -> str:
raw_html = node.get("html", "")
sponsor_label = node.get("sponsorLabel", "")
label_html = (
f'<span class="kg-cta-sponsor-label">{html.escape(sponsor_label)}</span>'
if sponsor_label else ""
)
return (
f'<div class="kg-card kg-cta-card">'
f'{label_html}{raw_html}'
f'</div>'
)

View File

@@ -0,0 +1,86 @@
"""
Server-side validation for Lexical editor JSON.
Walk the document tree and reject any node whose ``type`` is not in
ALLOWED_NODE_TYPES. This is a belt-and-braces check: the Lexical
client already restricts which nodes can be created, but we validate
server-side too.
"""
from __future__ import annotations
ALLOWED_NODE_TYPES: frozenset[str] = frozenset(
{
# Standard Lexical nodes
"root",
"paragraph",
"heading",
"quote",
"list",
"listitem",
"link",
"autolink",
"code",
"code-highlight",
"linebreak",
"text",
"horizontalrule",
"image",
"tab",
# Ghost "extended-*" variants
"extended-text",
"extended-heading",
"extended-quote",
# Ghost card types
"html",
"gallery",
"embed",
"bookmark",
"markdown",
"email",
"email-cta",
"button",
"callout",
"toggle",
"video",
"audio",
"file",
"product",
"header",
"signup",
"aside",
"codeblock",
"call-to-action",
"at-link",
"paywall",
}
)
def validate_lexical(doc: dict) -> tuple[bool, str | None]:
"""Recursively validate a Lexical JSON document.
Returns ``(True, None)`` when the document is valid, or
``(False, reason)`` when an unknown node type is found.
"""
if not isinstance(doc, dict):
return False, "Document must be a JSON object"
root = doc.get("root")
if not isinstance(root, dict):
return False, "Document must contain a 'root' object"
return _walk(root)
def _walk(node: dict) -> tuple[bool, str | None]:
node_type = node.get("type")
if node_type is not None and node_type not in ALLOWED_NODE_TYPES:
return False, f"Disallowed node type: {node_type}"
for child in node.get("children", []):
if isinstance(child, dict):
ok, reason = _walk(child)
if not ok:
return False, reason
return True, None