Migrate all apps to defpage declarative page routes
All checks were successful
Build and Deploy / build-and-deploy (push) Successful in 3m41s

Replace Python GET page handlers with declarative defpage definitions in .sx
files across all 8 apps (sx docs, orders, account, market, cart, federation,
events, blog). Each app now has sxc/pages/ with setup functions, layout
registrations, page helpers, and .sx defpage declarations.

Core infrastructure: add g I/O primitive, PageDef support for auth/layout/
data/content/filter/aside/menu slots, post_author auth level, and custom
layout registration. Remove ~1400 lines of render_*_page/render_*_oob
boilerplate. Update all endpoint references in routes, sx_components, and
templates to defpage_* naming.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-03-03 14:52:34 +00:00
parent 5b4cacaf19
commit c243d17eeb
108 changed files with 3598 additions and 2851 deletions

View File

@@ -55,51 +55,154 @@ def _post_to_edit_dict(post) -> dict:
def register():
bp = Blueprint("admin", __name__, url_prefix='/admin')
@bp.get("/")
@require_admin
async def admin(slug: str):
from shared.browser.app.utils.htmx import is_htmx_request
from sqlalchemy import select
from shared.models.page_config import PageConfig
@bp.before_request
async def _prepare_page_data():
ep = request.endpoint or ""
if "defpage_post_admin" in ep:
from sqlalchemy import select
from shared.models.page_config import PageConfig
post = (g.post_data or {}).get("post", {})
features = {}
sumup_configured = False
sumup_merchant_code = ""
sumup_checkout_prefix = ""
if post.get("is_page"):
pc = (await g.s.execute(
select(PageConfig).where(
PageConfig.container_type == "page",
PageConfig.container_id == post["id"],
)
)).scalar_one_or_none()
if pc:
features = pc.features or {}
sumup_configured = bool(pc.sumup_api_key)
sumup_merchant_code = pc.sumup_merchant_code or ""
sumup_checkout_prefix = pc.sumup_checkout_prefix or ""
from shared.sx.page import get_template_context
from sx.sx_components import _post_admin_main_panel_sx
tctx = await get_template_context()
tctx.update({
"features": features,
"sumup_configured": sumup_configured,
"sumup_merchant_code": sumup_merchant_code,
"sumup_checkout_prefix": sumup_checkout_prefix,
})
g.post_admin_content = _post_admin_main_panel_sx(tctx)
# Load features for page admin (page_configs now lives in db_blog)
post = (g.post_data or {}).get("post", {})
features = {}
sumup_configured = False
sumup_merchant_code = ""
sumup_checkout_prefix = ""
if post.get("is_page"):
pc = (await g.s.execute(
select(PageConfig).where(
PageConfig.container_type == "page",
PageConfig.container_id == post["id"],
)
)).scalar_one_or_none()
if pc:
features = pc.features or {}
sumup_configured = bool(pc.sumup_api_key)
sumup_merchant_code = pc.sumup_merchant_code or ""
sumup_checkout_prefix = pc.sumup_checkout_prefix or ""
elif "defpage_post_data" in ep:
from shared.sx.page import get_template_context
from sx.sx_components import _post_data_content_sx
tctx = await get_template_context()
g.post_data_content = _post_data_content_sx(tctx)
ctx = {
"features": features,
"sumup_configured": sumup_configured,
"sumup_merchant_code": sumup_merchant_code,
"sumup_checkout_prefix": sumup_checkout_prefix,
}
elif "defpage_post_preview" in ep:
from models.ghost_content import Post
from sqlalchemy import select as sa_select
post_id = g.post_data["post"]["id"]
post = (await g.s.execute(
sa_select(Post).where(Post.id == post_id)
)).scalar_one_or_none()
preview_ctx = {}
sx_content = getattr(post, "sx_content", None) or ""
if sx_content:
from shared.sx.prettify import sx_to_pretty_sx
preview_ctx["sx_pretty"] = sx_to_pretty_sx(sx_content)
lexical_raw = getattr(post, "lexical", None) or ""
if lexical_raw:
from shared.sx.prettify import json_to_pretty_sx
preview_ctx["json_pretty"] = json_to_pretty_sx(lexical_raw)
if sx_content:
from shared.sx.parser import parse as sx_parse
from shared.sx.html import render as sx_html_render
from shared.sx.jinja_bridge import _COMPONENT_ENV
try:
parsed = sx_parse(sx_content)
preview_ctx["sx_rendered"] = sx_html_render(parsed, dict(_COMPONENT_ENV))
except Exception:
preview_ctx["sx_rendered"] = "<em>Error rendering sx</em>"
if lexical_raw:
from bp.blog.ghost.lexical_renderer import render_lexical
try:
preview_ctx["lex_rendered"] = render_lexical(lexical_raw)
except Exception:
preview_ctx["lex_rendered"] = "<em>Error rendering lexical</em>"
from shared.sx.page import get_template_context
from sx.sx_components import _preview_main_panel_sx
tctx = await get_template_context()
tctx.update(preview_ctx)
g.post_preview_content = _preview_main_panel_sx(tctx)
from shared.sx.page import get_template_context
from sx.sx_components import render_post_admin_page, render_post_admin_oob
elif "defpage_post_entries" in ep:
from sqlalchemy import select
from shared.models.calendars import Calendar
from ..services.entry_associations import get_post_entry_ids
post_id = g.post_data["post"]["id"]
associated_entry_ids = await get_post_entry_ids(post_id)
result = await g.s.execute(
select(Calendar)
.where(Calendar.deleted_at.is_(None))
.order_by(Calendar.name.asc())
)
all_calendars = result.scalars().all()
for calendar in all_calendars:
await g.s.refresh(calendar, ["entries", "post"])
from shared.sx.page import get_template_context
from sx.sx_components import _post_entries_content_sx
tctx = await get_template_context()
tctx["all_calendars"] = all_calendars
tctx["associated_entry_ids"] = associated_entry_ids
g.post_entries_content = _post_entries_content_sx(tctx)
tctx = await get_template_context()
tctx.update(ctx)
if not is_htmx_request():
html = await render_post_admin_page(tctx)
return await make_response(html)
else:
sx_src = await render_post_admin_oob(tctx)
return sx_response(sx_src)
elif "defpage_post_settings" in ep:
from models.ghost_content import Post
from sqlalchemy import select as sa_select
from sqlalchemy.orm import selectinload
post_id = g.post_data["post"]["id"]
post = (await g.s.execute(
sa_select(Post)
.where(Post.id == post_id)
.options(selectinload(Post.tags))
)).scalar_one_or_none()
ghost_post = _post_to_edit_dict(post) if post else {}
save_success = request.args.get("saved") == "1"
from shared.sx.page import get_template_context
from sx.sx_components import _post_settings_content_sx
tctx = await get_template_context()
tctx["ghost_post"] = ghost_post
tctx["save_success"] = save_success
g.post_settings_content = _post_settings_content_sx(tctx)
elif "defpage_post_edit" in ep:
from models.ghost_content import Post
from sqlalchemy import select as sa_select
from sqlalchemy.orm import selectinload
from shared.infrastructure.data_client import fetch_data
post_id = g.post_data["post"]["id"]
post = (await g.s.execute(
sa_select(Post)
.where(Post.id == post_id)
.options(selectinload(Post.tags))
)).scalar_one_or_none()
ghost_post = _post_to_edit_dict(post) if post else {}
save_success = request.args.get("saved") == "1"
save_error = request.args.get("error", "")
raw_newsletters = await fetch_data("account", "newsletters", required=False) or []
from types import SimpleNamespace
newsletters = [SimpleNamespace(**nl) for nl in raw_newsletters]
from shared.sx.page import get_template_context
from sx.sx_components import _post_edit_content_sx
tctx = await get_template_context()
tctx["ghost_post"] = ghost_post
tctx["save_success"] = save_success
tctx["save_error"] = save_error
tctx["newsletters"] = newsletters
g.post_edit_content = _post_edit_content_sx(tctx)
from shared.sx.pages import mount_pages
mount_pages(bp, "blog", names=[
"post-admin", "post-data", "post-preview",
"post-entries", "post-settings", "post-edit",
])
@bp.put("/features/")
@require_admin
@@ -184,77 +287,6 @@ def register():
)
return sx_response(html)
@bp.get("/data/")
@require_admin
async def data(slug: str):
from shared.sx.page import get_template_context
from sx.sx_components import render_post_data_page, render_post_data_oob
tctx = await get_template_context()
if not is_htmx_request():
html = await render_post_data_page(tctx)
return await make_response(html)
else:
sx_src = await render_post_data_oob(tctx)
return sx_response(sx_src)
@bp.get("/preview/")
@require_admin
async def preview(slug: str):
from models.ghost_content import Post
from sqlalchemy import select as sa_select
from shared.sx.page import get_template_context
from sx.sx_components import render_post_preview_page, render_post_preview_oob
post_id = g.post_data["post"]["id"]
post = (await g.s.execute(
sa_select(Post).where(Post.id == post_id)
)).scalar_one_or_none()
# Build the 4 preview views
preview_ctx = {}
# 1. Prettified sx source
sx_content = getattr(post, "sx_content", None) or ""
if sx_content:
from shared.sx.prettify import sx_to_pretty_sx
preview_ctx["sx_pretty"] = sx_to_pretty_sx(sx_content)
# 2. Prettified lexical JSON
lexical_raw = getattr(post, "lexical", None) or ""
if lexical_raw:
from shared.sx.prettify import json_to_pretty_sx
preview_ctx["json_pretty"] = json_to_pretty_sx(lexical_raw)
# 3. SX rendered preview
if sx_content:
from shared.sx.parser import parse as sx_parse
from shared.sx.html import render as sx_html_render
from shared.sx.jinja_bridge import _COMPONENT_ENV
try:
parsed = sx_parse(sx_content)
preview_ctx["sx_rendered"] = sx_html_render(parsed, dict(_COMPONENT_ENV))
except Exception:
preview_ctx["sx_rendered"] = "<em>Error rendering sx</em>"
# 4. Lexical rendered preview
if lexical_raw:
from bp.blog.ghost.lexical_renderer import render_lexical
try:
preview_ctx["lex_rendered"] = render_lexical(lexical_raw)
except Exception:
preview_ctx["lex_rendered"] = "<em>Error rendering lexical</em>"
tctx = await get_template_context()
tctx.update(preview_ctx)
if not is_htmx_request():
html = await render_post_preview_page(tctx)
return await make_response(html)
else:
sx_src = await render_post_preview_oob(tctx)
return sx_response(sx_src)
@bp.get("/entries/calendar/<int:calendar_id>/")
@require_admin
async def calendar_view(slug: str, calendar_id: int):
@@ -330,40 +362,6 @@ def register():
)
return sx_response(html)
@bp.get("/entries/")
@require_admin
async def entries(slug: str):
from ..services.entry_associations import get_post_entry_ids
from shared.models.calendars import Calendar
from sqlalchemy import select
post_id = g.post_data["post"]["id"]
associated_entry_ids = await get_post_entry_ids(post_id)
# Load ALL calendars (not just this post's calendars)
result = await g.s.execute(
select(Calendar)
.where(Calendar.deleted_at.is_(None))
.order_by(Calendar.name.asc())
)
all_calendars = result.scalars().all()
# Load entries and post for each calendar
for calendar in all_calendars:
await g.s.refresh(calendar, ["entries", "post"])
from shared.sx.page import get_template_context
from sx.sx_components import render_post_entries_page, render_post_entries_oob
tctx = await get_template_context()
tctx["all_calendars"] = all_calendars
tctx["associated_entry_ids"] = associated_entry_ids
if not is_htmx_request():
html = await render_post_entries_page(tctx)
return await make_response(html)
else:
sx_src = await render_post_entries_oob(tctx)
return sx_response(sx_src)
@bp.post("/entries/<int:entry_id>/toggle/")
@require_admin
async def toggle_entry(slug: str, entry_id: int):
@@ -416,36 +414,6 @@ def register():
return sx_response(admin_list + nav_entries_html)
@bp.get("/settings/")
@require_post_author
async def settings(slug: str):
from models.ghost_content import Post
from sqlalchemy import select as sa_select
from sqlalchemy.orm import selectinload
post_id = g.post_data["post"]["id"]
post = (await g.s.execute(
sa_select(Post)
.where(Post.id == post_id)
.options(selectinload(Post.tags))
)).scalar_one_or_none()
ghost_post = _post_to_edit_dict(post) if post else {}
save_success = request.args.get("saved") == "1"
from shared.sx.page import get_template_context
from sx.sx_components import render_post_settings_page, render_post_settings_oob
tctx = await get_template_context()
tctx["ghost_post"] = ghost_post
tctx["save_success"] = save_success
if not is_htmx_request():
html = await render_post_settings_page(tctx)
return await make_response(html)
else:
sx_src = await render_post_settings_oob(tctx)
return sx_response(sx_src)
@bp.post("/settings/")
@require_post_author
async def settings_save(slug: str):
@@ -500,7 +468,7 @@ def register():
except OptimisticLockError:
from urllib.parse import quote
return redirect(
host_url(url_for("blog.post.admin.settings", slug=slug))
host_url(url_for("blog.post.admin.defpage_post_settings", slug=slug))
+ "?error=" + quote("Someone else edited this post. Please reload and try again.")
)
@@ -511,46 +479,7 @@ def register():
await invalidate_tag_cache("post.post_detail")
# Redirect using the (possibly new) slug
return redirect(host_url(url_for("blog.post.admin.settings", slug=post.slug)) + "?saved=1")
@bp.get("/edit/")
@require_post_author
async def edit(slug: str):
from models.ghost_content import Post
from sqlalchemy import select as sa_select
from sqlalchemy.orm import selectinload
from shared.infrastructure.data_client import fetch_data
post_id = g.post_data["post"]["id"]
post = (await g.s.execute(
sa_select(Post)
.where(Post.id == post_id)
.options(selectinload(Post.tags))
)).scalar_one_or_none()
ghost_post = _post_to_edit_dict(post) if post else {}
save_success = request.args.get("saved") == "1"
save_error = request.args.get("error", "")
# Newsletters live in db_account — fetch via HTTP
raw_newsletters = await fetch_data("account", "newsletters", required=False) or []
from types import SimpleNamespace
newsletters = [SimpleNamespace(**nl) for nl in raw_newsletters]
from shared.sx.page import get_template_context
from sx.sx_components import render_post_edit_page, render_post_edit_oob
tctx = await get_template_context()
tctx["ghost_post"] = ghost_post
tctx["save_success"] = save_success
tctx["save_error"] = save_error
tctx["newsletters"] = newsletters
if not is_htmx_request():
html = await render_post_edit_page(tctx)
return await make_response(html)
else:
sx_src = await render_post_edit_oob(tctx)
return sx_response(sx_src)
return redirect(host_url(url_for("blog.post.admin.defpage_post_settings", slug=post.slug)) + "?saved=1")
@bp.post("/edit/")
@require_post_author
@@ -575,11 +504,11 @@ def register():
try:
lexical_doc = json.loads(lexical_raw)
except (json.JSONDecodeError, TypeError):
return redirect(host_url(url_for("blog.post.admin.edit", slug=slug)) + "?error=" + quote("Invalid JSON in editor content."))
return redirect(host_url(url_for("blog.post.admin.defpage_post_edit", slug=slug)) + "?error=" + quote("Invalid JSON in editor content."))
ok, reason = validate_lexical(lexical_doc)
if not ok:
return redirect(host_url(url_for("blog.post.admin.edit", slug=slug)) + "?error=" + quote(reason))
return redirect(host_url(url_for("blog.post.admin.defpage_post_edit", slug=slug)) + "?error=" + quote(reason))
# Publish workflow
is_admin = bool((g.get("rights") or {}).get("admin"))
@@ -615,7 +544,7 @@ def register():
)
except OptimisticLockError:
return redirect(
host_url(url_for("blog.post.admin.edit", slug=slug))
host_url(url_for("blog.post.admin.defpage_post_edit", slug=slug))
+ "?error=" + quote("Someone else edited this post. Please reload and try again.")
)
@@ -631,7 +560,7 @@ def register():
await invalidate_tag_cache("post.post_detail")
# Redirect to GET (PRG pattern) — use post.slug in case it changed
redirect_url = host_url(url_for("blog.post.admin.edit", slug=post.slug)) + "?saved=1"
redirect_url = host_url(url_for("blog.post.admin.defpage_post_edit", slug=post.slug)) + "?saved=1"
if publish_requested_msg:
redirect_url += "&publish_requested=1"
return redirect(redirect_url)