- Settings form: ~135 lines raw HTML → ~blog-settings-form-content defcomp - Data introspection: ~110 lines raw HTML → ~blog-data-table-content with recursive ~blog-data-model-content defcomps, Python extracts ORM data only - Preview: sx_call composition → ~blog-preview-content defcomp - Entries browser: ~65 lines raw HTML → ~blog-entries-browser-content + ~blog-calendar-browser-item + ~blog-associated-entries-from-data defcomps - Editor panels: sx_call composition in both helpers.py and renders.py → ~blog-editor-content and ~blog-edit-content composition defcomps - renders.py: 178 → 25 lines (87% reduction) - routes.py _render_associated_entries: data extraction → single sx_call Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
714 lines
29 KiB
Python
714 lines
29 KiB
Python
"""Blog page helpers — async functions available in .sx defpage expressions."""
|
|
from __future__ import annotations
|
|
|
|
from typing import Any
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Shared hydration helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _add_to_defpage_ctx(**kwargs: Any) -> None:
    """Merge *kwargs* into the defpage context dict stored on ``quart.g``.

    The dict is kept in ``g._defpage_ctx`` and is created on first use.
    """
    from quart import g

    try:
        ctx = g._defpage_ctx
    except AttributeError:
        # Nothing has contributed yet: start from an empty dict.
        ctx = {}
        g._defpage_ctx = ctx
    ctx.update(kwargs)
|
|
|
|
|
|
async def _ensure_post_data(slug: str | None) -> None:
    """Populate ``g.post_data`` / ``g.post_slug`` and the defpage context.

    Replicates post bp's hydrate_post_data + context_processor.

    When ``g.post_data`` is already set and truthy it is reused and only the
    context injection is repeated. Otherwise the post is loaded by *slug*
    with drafts included. Aborts with 404 when *slug* is empty, when no post
    data is returned, or when the post is not published and the caller is
    neither an admin nor the user whose id matches the post's ``user_id``.
    """
    from quart import abort, g

    existing = getattr(g, "post_data", None)
    if existing:
        await _inject_post_context(existing)
        return

    if not slug:
        abort(404)

    from bp.post.services.post_data import post_data

    rights = g.get("rights") or {}
    is_admin = bool(rights.get("admin"))

    loaded = await post_data(slug, g.s, include_drafts=True)
    if not loaded:
        abort(404)

    # Draft access control: g.user is only consulted for non-admins.
    post = loaded["post"]
    if post.get("status") != "published" and not is_admin:
        if not (g.user and post.get("user_id") == g.user.id):
            abort(404)

    g.post_data = loaded
    g.post_slug = slug
    await _inject_post_context(loaded)
|
|
|
|
|
|
async def _inject_post_context(p_data: dict) -> None:
    """Add post context_processor data to the defpage context.

    The context contains everything in *p_data* plus ``base_title`` (from
    config) and the fetched ``container_nav`` fragment. When the post is
    flagged ``is_page`` the cart summary for that page is fetched as well
    and exposed as ``page_cart_count`` / ``page_cart_total``.
    """
    from shared.config import config
    from shared.infrastructure.fragments import fetch_fragment
    from shared.infrastructure.data_client import fetch_data
    from shared.contracts.dtos import CartSummaryDTO, dto_from_dict
    from shared.infrastructure.cart_identity import current_cart_identity

    post = p_data["post"]
    db_post_id = post["id"]
    post_slug = post["slug"]

    nav_params = {
        "container_type": "page",
        "container_id": str(db_post_id),
        "post_slug": post_slug,
    }
    container_nav = await fetch_fragment(
        "relations", "container-nav", params=nav_params,
    )

    ctx: dict = dict(p_data)
    ctx["base_title"] = config()["title"]
    ctx["container_nav"] = container_nav

    if post.get("is_page"):
        ident = current_cart_identity()
        summary_params: dict = {"page_slug": post_slug}
        # Only forward the identity parts that are actually set.
        for key in ("user_id", "session_id"):
            if ident[key] is not None:
                summary_params[key] = ident[key]

        raw_summary = await fetch_data(
            "cart", "cart-summary", params=summary_params, required=False,
        )
        if raw_summary:
            page_summary = dto_from_dict(CartSummaryDTO, raw_summary)
        else:
            page_summary = CartSummaryDTO()

        count = page_summary.count + page_summary.calendar_count + page_summary.ticket_count
        total = page_summary.total + page_summary.calendar_total + page_summary.ticket_total
        ctx["page_cart_count"] = count
        ctx["page_cart_total"] = float(total)

    _add_to_defpage_ctx(**ctx)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Page helpers (async functions available in .sx defpage expressions)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _register_blog_helpers() -> None:
    """Register this module's page helpers under the name "blog".

    Keys are the helper names passed to ``register_page_helpers``; values
    are the ``_h_*`` callables defined in this module.
    """
    from shared.sx.pages import register_page_helpers

    helpers = {
        "editor-content": _h_editor_content,
        "editor-page-content": _h_editor_page_content,
        "post-admin-content": _h_post_admin_content,
        "post-data-content": _h_post_data_content,
        "post-preview-content": _h_post_preview_content,
        "post-entries-content": _h_post_entries_content,
        "post-settings-content": _h_post_settings_content,
        "post-edit-content": _h_post_edit_content,
    }
    register_page_helpers("blog", helpers)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Editor helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _editor_init_js(urls: dict, *, form_id: str = "post-edit-form",
                    has_initial_json: bool = True) -> str:
    """Build the editor initialization JavaScript string.

    URLs dict must contain: upload_image, upload_media, upload_file, oembed,
    snippets, unsplash_key.

    *form_id* is the id of the form submitted by the Ctrl/Cmd+S handler.
    When *has_initial_json* is true the script reads the initial document
    from the ``lexical-initial-data`` element; otherwise ``initialJson`` is
    passed to the editor as ``null``.

    The values are interpolated into single-quoted JS literals as-is.
    """
    (upload_image, upload_media, upload_file,
     oembed, unsplash_key, snippets) = (
        urls[name]
        for name in ("upload_image", "upload_media", "upload_file",
                     "oembed", "unsplash_key", "snippets")
    )

    # Font-size preamble: applied on load, reverted when htmx swaps main-panel.
    chunks = [
        "(function() {",
        " function applyEditorFontSize() {",
        " document.documentElement.style.fontSize = '62.5%';",
        " document.body.style.fontSize = '1.6rem';",
        " }",
        " function restoreDefaultFontSize() {",
        " document.documentElement.style.fontSize = '';",
        " document.body.style.fontSize = '';",
        " }",
        " applyEditorFontSize();",
        " document.body.addEventListener('htmx:beforeSwap', function cleanup(e) {",
        " if (e.detail.target && e.detail.target.id === 'main-panel') {",
        " restoreDefaultFontSize();",
        " document.body.removeEventListener('htmx:beforeSwap', cleanup);",
        " }",
        " });",
    ]

    # init(): feature-image widget, caption sync and excerpt auto-resize.
    chunks += [
        " function init() {",
        " var csrfToken = document.querySelector('input[name=\"csrf_token\"]').value;",
        f" var uploadUrl = '{upload_image}';",
        " var uploadUrls = {",
        " image: uploadUrl,",
        f" media: '{upload_media}',",
        f" file: '{upload_file}',",
        " };",
        " var fileInput = document.getElementById('feature-image-file');",
        " var addBtn = document.getElementById('feature-image-add-btn');",
        " var deleteBtn = document.getElementById('feature-image-delete-btn');",
        " var preview = document.getElementById('feature-image-preview');",
        " var emptyState = document.getElementById('feature-image-empty');",
        " var filledState = document.getElementById('feature-image-filled');",
        " var hiddenUrl = document.getElementById('feature-image-input');",
        " var hiddenCaption = document.getElementById('feature-image-caption-input');",
        " var captionInput = document.getElementById('feature-image-caption');",
        " var uploading = document.getElementById('feature-image-uploading');",
        " function showFilled(url) {",
        " preview.src = url; hiddenUrl.value = url;",
        " emptyState.classList.add('hidden'); filledState.classList.remove('hidden'); uploading.classList.add('hidden');",
        " }",
        " function showEmpty() {",
        " preview.src = ''; hiddenUrl.value = ''; hiddenCaption.value = ''; captionInput.value = '';",
        " emptyState.classList.remove('hidden'); filledState.classList.add('hidden'); uploading.classList.add('hidden');",
        " }",
        " function uploadFile(file) {",
        " emptyState.classList.add('hidden'); uploading.classList.remove('hidden');",
        " var fd = new FormData(); fd.append('file', file);",
        " fetch(uploadUrl, { method: 'POST', body: fd, headers: { 'X-CSRFToken': csrfToken } })",
        " .then(function(r) { if (!r.ok) throw new Error('Upload failed (' + r.status + ')'); return r.json(); })",
        " .then(function(data) {",
        " var url = data.images && data.images[0] && data.images[0].url;",
        " if (url) showFilled(url); else { showEmpty(); alert('Upload succeeded but no image URL returned.'); }",
        " })",
        " .catch(function(e) { showEmpty(); alert(e.message); });",
        " }",
        " addBtn.addEventListener('click', function() { fileInput.click(); });",
        " preview.addEventListener('click', function() { fileInput.click(); });",
        " deleteBtn.addEventListener('click', function(e) { e.stopPropagation(); showEmpty(); });",
        " fileInput.addEventListener('change', function() {",
        " if (fileInput.files && fileInput.files[0]) { uploadFile(fileInput.files[0]); fileInput.value = ''; }",
        " });",
        " captionInput.addEventListener('input', function() { hiddenCaption.value = captionInput.value; });",
        " var excerpt = document.querySelector('textarea[name=\"custom_excerpt\"]');",
        " function autoResize() { excerpt.style.height = 'auto'; excerpt.style.height = excerpt.scrollHeight + 'px'; }",
        " excerpt.addEventListener('input', autoResize); autoResize();",
    ]

    if has_initial_json:
        # Seed the hidden lexical input from the embedded initial-data element.
        chunks += [
            " var dataEl = document.getElementById('lexical-initial-data');",
            " var initialJson = dataEl ? dataEl.textContent.trim() : null;",
            " if (initialJson) { var hidden = document.getElementById('lexical-json-input'); if (hidden) hidden.value = initialJson; }",
        ]
        initial_json_line = " initialJson: initialJson,"
    else:
        initial_json_line = " initialJson: null,"

    # Mount both editors, wire the save shortcut, then poll every 50 ms
    # until window.mountEditor is available.
    chunks += [
        " window.mountEditor('lexical-editor', {",
        initial_json_line,
        " csrfToken: csrfToken,",
        " uploadUrls: uploadUrls,",
        f" oembedUrl: '{oembed}',",
        f" unsplashApiKey: '{unsplash_key}',",
        f" snippetsUrl: '{snippets}',",
        " });",
        " if (typeof SxEditor !== 'undefined') {",
        " SxEditor.mount('sx-editor', {",
        " initialSx: (document.getElementById('sx-content-input') || {}).value || null,",
        " csrfToken: csrfToken,",
        " uploadUrls: uploadUrls,",
        f" oembedUrl: '{oembed}',",
        " onChange: function(sx) {",
        " document.getElementById('sx-content-input').value = sx;",
        " }",
        " });",
        " }",
        " document.addEventListener('keydown', function(e) {",
        " if ((e.ctrlKey || e.metaKey) && e.key === 's') {",
        f" e.preventDefault(); document.getElementById('{form_id}').requestSubmit();",
        " }",
        " });",
        " }",
        " if (typeof window.mountEditor === 'function') { init(); }",
        " else { var _t = setInterval(function() {",
        " if (typeof window.mountEditor === 'function') { clearInterval(_t); init(); }",
        " }, 50); }",
        "})();",
    ]

    return "".join(chunks)
|
|
|
|
|
|
def _editor_urls() -> dict:
    """Extract editor API URLs and asset paths.

    Returns a dict with the editor API endpoints resolved via ``url_for``,
    the ``UNSPLASH_ACCESS_KEY`` environment value (``""`` when unset) and
    the editor CSS/JS asset URLs.
    """
    import os
    from quart import url_for as qurl, current_app

    # If no "asset_url" Jinja global is registered, asset URLs become "".
    asset_url_fn = current_app.jinja_env.globals.get("asset_url", lambda p: "")

    endpoints = (
        ("upload_image", "blog.editor_api.upload_image"),
        ("upload_media", "blog.editor_api.upload_media"),
        ("upload_file", "blog.editor_api.upload_file"),
        ("oembed", "blog.editor_api.oembed_proxy"),
        ("snippets", "blog.editor_api.list_snippets"),
    )
    assets = (
        ("css_href", "scripts/editor.css"),
        ("js_src", "scripts/editor.js"),
        ("sx_editor_js_src", "scripts/sx-editor.js"),
    )

    urls = {key: qurl(endpoint) for key, endpoint in endpoints}
    urls["unsplash_key"] = os.environ.get("UNSPLASH_ACCESS_KEY", "")
    for key, path in assets:
        urls[key] = asset_url_fn(path)
    return urls
|
|
|
|
|
|
def _h_editor_content(**kw):
    """New post editor panel.

    Renders the ``blog-editor-content`` component with post-specific labels.
    Keyword arguments are accepted but not used.
    """
    from shared.sx.helpers import sx_call
    from shared.browser.app.csrf import generate_csrf_token

    urls = _editor_urls()
    csrf = generate_csrf_token()
    # A new post has no stored document, so no initial JSON is read.
    init_js = _editor_init_js(urls, form_id="post-new-form", has_initial_json=False)

    params = {
        "csrf": csrf,
        "title_placeholder": "Post title...",
        "create_label": "Create Post",
        "css_href": urls["css_href"],
        "js_src": urls["js_src"],
        "sx_editor_js_src": urls["sx_editor_js_src"],
        "init_js": init_js,
    }
    return sx_call("blog-editor-content", **params)
|
|
|
|
|
|
def _h_editor_page_content(**kw):
    """New page editor panel.

    Renders the ``blog-editor-content`` component with page-specific labels.
    Keyword arguments are accepted but not used.
    """
    from shared.sx.helpers import sx_call
    from shared.browser.app.csrf import generate_csrf_token

    urls = _editor_urls()
    csrf = generate_csrf_token()
    # A new page has no stored document, so no initial JSON is read.
    init_js = _editor_init_js(urls, form_id="post-new-form", has_initial_json=False)

    params = {
        "csrf": csrf,
        "title_placeholder": "Page title...",
        "create_label": "Create Page",
        "css_href": urls["css_href"],
        "js_src": urls["js_src"],
        "sx_editor_js_src": urls["sx_editor_js_src"],
        "init_js": init_js,
    }
    return sx_call("blog-editor-content", **params)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Post admin helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
async def _h_post_admin_content(slug=None, **kw):
    """Hydrate post data for *slug*, then render ``blog-admin-placeholder``."""
    from shared.sx.helpers import sx_call

    await _ensure_post_data(slug)
    placeholder = sx_call("blog-admin-placeholder")
    return placeholder
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Data introspection
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _extract_model_data(obj, depth=0, max_depth=2) -> dict:
    """Recursively extract ORM model data into a nested dict for .sx rendering.

    Returns ``{"columns": [...], "relationships": [...]}``. Column values are
    HTML-escaped strings tagged with a type of ``nil``, ``date``, ``str`` or
    ``other``. Relationships are only read when already present in
    ``obj.__dict__``; related objects are expanded recursively while
    ``depth < max_depth``.
    """
    from markupsafe import escape as esc

    mapper = obj.__mapper__
    can_descend = depth < max_depth

    def describe_column(key):
        """Return the escaped, type-tagged entry for one column attribute."""
        raw = getattr(obj, key, None)
        if raw is None:
            text, kind = "", "nil"
        elif hasattr(raw, "isoformat"):
            text, kind = str(esc(raw.isoformat())), "date"
        elif isinstance(raw, str):
            text, kind = str(esc(raw)), "str"
        else:
            text, kind = str(esc(str(raw))), "other"
        return {"key": str(key), "value": text, "type": kind}

    def describe_related(target):
        """Return (summary, children) for one related object."""
        summary = _obj_summary(target)
        children = _extract_model_data(target, depth + 1, max_depth) if can_descend else None
        return summary, children

    # Scalar columns
    columns = [
        describe_column(col.key)
        for col in mapper.columns
        if col.key != "_sa_instance_state"
    ]

    # Relationships
    relationships = []
    for rel in mapper.relationships:
        rel_name = rel.key
        # Only touch the attribute if it is already in the instance dict.
        loaded = rel_name in obj.__dict__
        related = getattr(obj, rel_name, None) if loaded else None
        cardinality = "many" if rel.uselist else "one"
        cls_name = rel.mapper.class_.__name__

        if related is None:
            payload = None
        elif rel.uselist:
            members = list(related) if related else []
            payload = {"is_list": True, "count": len(members)}
            if members and can_descend:
                items = []
                for position, member in enumerate(members, 1):
                    summary, children = describe_related(member)
                    items.append({"index": position, "summary": summary, "children": children})
                payload["items"] = items
        else:
            summary, children = describe_related(related)
            payload = {"is_list": False, "summary": summary, "children": children}

        relationships.append({
            "name": rel_name,
            "cardinality": cardinality,
            "class_name": cls_name,
            "loaded": loaded,
            "value": payload,
        })

    return {"columns": columns, "relationships": relationships}
|
|
|
|
|
|
def _obj_summary(obj) -> str:
    """Build a summary string for an ORM object.

    Joins ``key=value`` pairs for whichever of id, ghost_id, uuid, slug,
    name and title are mapped columns; falls back to ``str(obj)`` when none
    are. The result is HTML-escaped.
    """
    from markupsafe import escape as esc

    mapped = obj.__mapper__.c
    labels = [
        f"{field}={getattr(obj, field, '')}"
        for field in ("id", "ghost_id", "uuid", "slug", "name", "title")
        if field in mapped
    ]
    if labels:
        text = " \u2022 ".join(labels)
    else:
        text = str(obj)
    return str(esc(text))
|
|
|
|
|
|
async def _h_post_data_content(slug=None, **kw):
    """Render the data-introspection table for the post's ORM object.

    Uses ``g.post_data["original_post"]``; when it is absent the component
    is rendered without arguments.
    """
    await _ensure_post_data(slug)
    from quart import g
    from shared.sx.helpers import sx_call

    post_data = getattr(g, "post_data", {})
    original_post = post_data.get("original_post")
    if original_post is not None:
        tablename = getattr(original_post, "__tablename__", "?")
        model_data = _extract_model_data(original_post, 0, 2)
        return sx_call("blog-data-table-content",
                       tablename=tablename, model_data=model_data)

    return sx_call("blog-data-table-content")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Preview content
|
|
# ---------------------------------------------------------------------------
|
|
|
|
async def _h_post_preview_content(slug=None, **kw):
    """Render the preview panel from the blog page service's preview data.

    ``sx_pretty`` and ``json_pretty`` are wrapped in ``SxExpr`` when
    present; any missing or empty field is passed as ``None``.
    """
    await _ensure_post_data(slug)
    from quart import g
    from shared.services.registry import services
    from shared.sx.helpers import sx_call
    from shared.sx.parser import SxExpr

    preview = await services.blog_page.preview_data(g.s)

    def as_expr(key):
        """Wrap a truthy preview field in SxExpr, else return None."""
        return SxExpr(preview[key]) if preview.get(key) else None

    return sx_call(
        "blog-preview-content",
        sx_pretty=as_expr("sx_pretty"),
        json_pretty=as_expr("json_pretty"),
        sx_rendered=preview.get("sx_rendered") or None,
        lex_rendered=preview.get("lex_rendered") or None,
    )
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Entries browser
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _extract_associated_entries_data(all_calendars, associated_entry_ids, post_slug: str) -> list:
    """Extract associated entry data for .sx rendering.

    Walks every calendar's entries and keeps those whose id is in
    *associated_entry_ids* and whose ``deleted_at`` is unset. Each result
    dict carries the entry name, a confirmation text, the toggle URL for
    *post_slug*, the calendar post's image/title and a display date string.
    """
    from quart import url_for as qurl
    from shared.utils import host_url

    entries = []
    for calendar in all_calendars:
        cal_entries = getattr(calendar, "entries", []) or []
        cal_name = getattr(calendar, "name", "")
        cal_post = getattr(calendar, "post", None)
        cal_image = (getattr(cal_post, "feature_image", None) if cal_post else None) or ""
        cal_title = getattr(cal_post, "title", "") if cal_post else ""

        for entry in cal_entries:
            entry_id = getattr(entry, "id", None)
            if entry_id not in associated_entry_ids:
                continue
            if getattr(entry, "deleted_at", None) is not None:
                continue

            name = getattr(entry, "name", "")
            starts = getattr(entry, "start_at", None)
            ends = getattr(entry, "end_at", None)

            toggle_url = host_url(qurl("blog.post.admin.toggle_entry",
                                       slug=post_slug, entry_id=entry_id))

            # "<start>" optionally followed by an en dash and the end time.
            when = starts.strftime("%A, %B %d, %Y at %H:%M") if starts else ""
            if ends:
                when = f"{when} \u2013 {ends.strftime('%H:%M')}"

            entries.append({
                "name": name,
                "confirm_text": f"This will remove {name} from this post",
                "toggle_url": toggle_url,
                "cal_image": cal_image,
                "cal_title": cal_title,
                "date_str": f"{cal_name} \u2022 {when}",
            })

    return entries
|
|
|
|
|
|
def _extract_calendar_browser_data(all_calendars, post_slug: str) -> list:
    """Extract calendar browser data for .sx rendering.

    Produces one dict per calendar with its name, the linked post's title
    and feature image (``""`` when missing) and the admin calendar-view URL
    for *post_slug*.
    """
    from quart import url_for as qurl
    from shared.utils import host_url

    def describe(cal):
        """Build the browser entry for a single calendar."""
        linked_post = getattr(cal, "post", None)
        image = getattr(linked_post, "feature_image", None) if linked_post else None
        title = getattr(linked_post, "title", "") if linked_post else ""
        name = getattr(cal, "name", "")
        view_url = host_url(qurl("blog.post.admin.calendar_view",
                                 slug=post_slug, calendar_id=cal.id))
        return {
            "name": name,
            "title": title,
            "image": image or "",
            "view_url": view_url,
        }

    return [describe(cal) for cal in all_calendars]
|
|
|
|
|
|
async def _h_post_entries_content(slug=None, **kw):
    """Render the entries browser for the current post.

    Loads the ids of entries associated with the post and all non-deleted
    calendars ordered by name, refreshes each calendar's ``entries`` and
    ``post`` attributes, then composes the associated-entries panel inside
    the browser component.
    """
    await _ensure_post_data(slug)
    from quart import g
    from sqlalchemy import select
    from shared.models.calendars import Calendar
    from shared.sx.helpers import sx_call
    from shared.sx.parser import SxExpr
    from shared.browser.app.csrf import generate_csrf_token
    from bp.post.services.entry_associations import get_post_entry_ids

    post = g.post_data["post"]
    post_id = post["id"]
    post_slug = post["slug"]
    associated_entry_ids = await get_post_entry_ids(post_id)

    stmt = (
        select(Calendar)
        .where(Calendar.deleted_at.is_(None))
        .order_by(Calendar.name.asc())
    )
    all_calendars = (await g.s.execute(stmt)).scalars().all()
    # Refresh the attributes the extract helpers read, one calendar at a time.
    for calendar in all_calendars:
        await g.s.refresh(calendar, ["entries", "post"])

    csrf = generate_csrf_token()
    entry_data = _extract_associated_entries_data(
        all_calendars, associated_entry_ids, post_slug)
    calendar_data = _extract_calendar_browser_data(all_calendars, post_slug)

    entries_panel = sx_call("blog-associated-entries-from-data",
                            entries=entry_data, csrf=csrf)
    return sx_call("blog-entries-browser-content",
                   entries_panel=SxExpr(entries_panel),
                   calendars=calendar_data)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Settings form
|
|
# ---------------------------------------------------------------------------
|
|
|
|
async def _h_post_settings_content(slug=None, **kw):
    """Render the post settings form.

    Reloads the post (with tags) from the database, converts it with
    ``_post_to_edit_dict`` and passes the individual settings to the
    ``blog-settings-form-content`` component. Missing text fields become
    ``""``; visibility falls back to ``"public"``.
    """
    await _ensure_post_data(slug)
    from quart import g, request
    from models.ghost_content import Post
    from sqlalchemy import select as sa_select
    from sqlalchemy.orm import selectinload
    from shared.browser.app.csrf import generate_csrf_token
    from shared.sx.helpers import sx_call
    from bp.post.admin.routes import _post_to_edit_dict

    post_id = g.post_data["post"]["id"]
    stmt = (
        sa_select(Post)
        .where(Post.id == post_id)
        .options(selectinload(Post.tags))
    )
    post = (await g.s.execute(stmt)).scalar_one_or_none()
    gp = _post_to_edit_dict(post) if post else {}
    save_success = request.args.get("saved") == "1"
    csrf = generate_csrf_token()

    page_meta = g.post_data.get("post", {}) if hasattr(g, "post_data") else {}

    def tag_label(tag):
        """Return a tag's name from an object, a dict, or its str() form."""
        if isinstance(tag, dict):
            fallback = tag.get("name", "")
        else:
            fallback = str(tag)
        return getattr(tag, "name", fallback)

    # Extract tag names
    tag_names = ", ".join(tag_label(tag) for tag in (gp.get("tags") or []))

    # Published at — trim to datetime-local format
    published = gp.get("published_at") or ""

    form = {
        "csrf": csrf,
        "updated_at": gp.get("updated_at") or "",
        "is_page": page_meta.get("is_page", False),
        "save_success": save_success,
        "slug": gp.get("slug") or "",
        "published_at": published[:16],
        "featured": bool(gp.get("featured")),
        "visibility": gp.get("visibility") or "public",
        "email_only": bool(gp.get("email_only")),
        "tags": tag_names,
    }
    # Plain text fields: passed through, empty string when unset.
    for field in (
        "feature_image_alt",
        "meta_title",
        "meta_description",
        "canonical_url",
        "og_title",
        "og_description",
        "og_image",
        "twitter_title",
        "twitter_description",
        "twitter_image",
        "custom_template",
    ):
        form[field] = gp.get(field) or ""

    return sx_call("blog-settings-form-content", **form)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Post edit content
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _extract_newsletter_options(newsletters) -> list:
    """Extract newsletter data for .sx rendering.

    Returns one ``{"slug": ..., "name": ...}`` dict per newsletter object;
    a missing attribute is reported as ``""``.
    """
    options = []
    for newsletter in newsletters:
        slug = getattr(newsletter, "slug", "")
        name = getattr(newsletter, "name", "")
        options.append({"slug": slug, "name": name})
    return options
|
|
|
|
|
|
def _extract_footer_badges(ghost_post: dict, post: dict, save_success: bool,
                           publish_requested: bool, already_emailed: bool) -> list:
    """Extract footer badge data for .sx rendering.

    Returns a list of ``{"cls": ..., "text": ...}`` dicts in a fixed order:
    the "Saved." note, the publish-request note, the publish-requested pill
    (from ``post["publish_requested"]``) and the "Emailed" pill, which is
    suffixed with the newsletter name when ``ghost_post["newsletter"]``
    provides one (as a dict or as an object).
    """
    def badge(cls, text):
        return {"cls": cls, "text": text}

    badges = []
    if save_success:
        badges.append(badge("text-[14px] text-green-600", "Saved."))
    if publish_requested:
        badges.append(badge("text-[14px] text-blue-600",
                            "Publish requested \u2014 an admin will review."))
    if post.get("publish_requested"):
        badges.append(badge(
            "inline-block px-2 py-0.5 rounded-full text-xs font-semibold bg-blue-100 text-blue-800",
            "Publish requested"))

    if not already_emailed:
        return badges

    newsletter = ghost_post.get("newsletter")
    if not newsletter:
        nl_name = ""
    elif isinstance(newsletter, dict):
        nl_name = newsletter.get("name", "")
    else:
        nl_name = getattr(newsletter, "name", "")

    text = f"Emailed to {nl_name}" if nl_name else "Emailed"
    badges.append(badge(
        "inline-block px-2 py-0.5 rounded-full text-xs font-semibold bg-green-100 text-green-800",
        text))
    return badges
|
|
|
|
|
|
async def _h_post_edit_content(slug=None, **kw):
    """Render the post/page edit panel.

    Reloads the post (with tags), converts it with ``_post_to_edit_dict``,
    fetches the newsletter list, and passes the field values, newsletter
    ``option`` elements, footer badges and editor init script to the
    ``blog-edit-content`` component.
    """
    await _ensure_post_data(slug)
    from types import SimpleNamespace
    from quart import g, request as qrequest
    from models.ghost_content import Post
    from sqlalchemy import select as sa_select
    from sqlalchemy.orm import selectinload
    from shared.infrastructure.data_client import fetch_data
    from shared.browser.app.csrf import generate_csrf_token
    from shared.sx.helpers import sx_call
    from shared.sx.parser import SxExpr, serialize as sx_serialize
    from bp.post.admin.routes import _post_to_edit_dict

    post_id = g.post_data["post"]["id"]
    stmt = (
        sa_select(Post)
        .where(Post.id == post_id)
        .options(selectinload(Post.tags))
    )
    db_post = (await g.s.execute(stmt)).scalar_one_or_none()
    ghost_post = _post_to_edit_dict(db_post) if db_post else {}

    query = qrequest.args
    save_success = query.get("saved") == "1"
    save_error = query.get("error", "")

    raw_newsletters = await fetch_data("account", "newsletters", required=False) or []
    newsletters = [SimpleNamespace(**nl) for nl in raw_newsletters]

    csrf = generate_csrf_token()
    urls = _editor_urls()

    post = g.post_data.get("post", {}) if hasattr(g, "post_data") else {}
    is_page = post.get("is_page", False)

    def field(name, default=""):
        """Return the edit-dict value for *name*, or *default* when falsy."""
        return ghost_post.get(name) or default

    # Document used when the post has no stored lexical content.
    empty_lexical = '{"root":{"children":[{"children":[],"direction":null,"format":"","indent":0,"type":"paragraph","version":1}],"direction":null,"format":"","indent":0,"type":"root","version":1}}'
    sx_content = field("sx_content")

    # The email record may be a dict or an object; either way a truthy
    # "status" marks the post as already emailed.
    email_obj = ghost_post.get("email")
    if not email_obj:
        already_emailed = False
    elif isinstance(email_obj, dict):
        already_emailed = bool(email_obj.get("status"))
    else:
        already_emailed = bool(getattr(email_obj, "status", None))

    # Newsletter options as SX fragment
    option_forms = ['(option :value "" "Select newsletter\u2026")']
    for nl in newsletters:
        nl_slug = sx_serialize(getattr(nl, "slug", ""))
        nl_name = sx_serialize(getattr(nl, "name", ""))
        option_forms.append(f"(option :value {nl_slug} {nl_name})")
    nl_opts_sx = SxExpr("(<> " + " ".join(option_forms) + ")")

    # Footer extra badges as SX fragment
    publish_requested = bool(query.get("publish_requested"))
    badges = _extract_footer_badges(ghost_post, post, save_success,
                                    publish_requested, already_emailed)
    footer_extra_sx = None
    if badges:
        spans = " ".join(
            f'(span :class "{b["cls"]}" {sx_serialize(b["text"])})'
            for b in badges
        )
        footer_extra_sx = SxExpr("(<> " + spans + ")")

    init_js = _editor_init_js(urls, form_id="post-edit-form", has_initial_json=True)

    params = {
        "csrf": csrf,
        "updated_at": str(field("updated_at")),
        "title_val": field("title"),
        "excerpt_val": field("custom_excerpt"),
        "feature_image": field("feature_image"),
        "feature_image_caption": field("feature_image_caption"),
        "sx_content_val": sx_content,
        "lexical_json": field("lexical", empty_lexical),
        "has_sx": bool(sx_content),
        "title_placeholder": "Page title..." if is_page else "Post title...",
        "status": field("status", "draft"),
        "already_emailed": already_emailed,
        "newsletter_options": nl_opts_sx,
        "footer_extra": footer_extra_sx,
        "css_href": urls["css_href"],
        "js_src": urls["js_src"],
        "sx_editor_js_src": urls["sx_editor_js_src"],
        "init_js": init_js,
        "save_error": save_error or None,
    }
    return sx_call("blog-edit-content", **params)
|