diff --git a/blog/bp/blog/routes.py b/blog/bp/blog/routes.py
index 3152c6f..a9a0ed1 100644
--- a/blog/bp/blog/routes.py
+++ b/blog/bp/blog/routes.py
@@ -229,7 +229,7 @@ def register(url_prefix, title):
lexical_doc = json.loads(lexical_raw)
except (json.JSONDecodeError, TypeError):
from shared.sx.page import get_template_context
- from sxc.pages import render_editor_panel
+ from sxc.pages.renders import render_editor_panel
tctx = await get_template_context()
tctx["editor_html"] = await render_editor_panel(save_error="Invalid JSON in editor content.")
html = await _render_new_post_page(tctx)
@@ -238,7 +238,7 @@ def register(url_prefix, title):
ok, reason = validate_lexical(lexical_doc)
if not ok:
from shared.sx.page import get_template_context
- from sxc.pages import render_editor_panel
+ from sxc.pages.renders import render_editor_panel
tctx = await get_template_context()
tctx["editor_html"] = await render_editor_panel(save_error=reason)
html = await _render_new_post_page(tctx)
@@ -285,7 +285,7 @@ def register(url_prefix, title):
lexical_doc = json.loads(lexical_raw)
except (json.JSONDecodeError, TypeError):
from shared.sx.page import get_template_context
- from sxc.pages import render_editor_panel
+ from sxc.pages.renders import render_editor_panel
tctx = await get_template_context()
tctx["editor_html"] = await render_editor_panel(save_error="Invalid JSON in editor content.", is_page=True)
tctx["is_page"] = True
@@ -295,7 +295,7 @@ def register(url_prefix, title):
ok, reason = validate_lexical(lexical_doc)
if not ok:
from shared.sx.page import get_template_context
- from sxc.pages import render_editor_panel
+ from sxc.pages.renders import render_editor_panel
tctx = await get_template_context()
tctx["editor_html"] = await render_editor_panel(save_error=reason, is_page=True)
tctx["is_page"] = True
diff --git a/blog/sxc/pages/__init__.py b/blog/sxc/pages/__init__.py
index d4077bb..af6ee10 100644
--- a/blog/sxc/pages/__init__.py
+++ b/blog/sxc/pages/__init__.py
@@ -1,11 +1,11 @@
"""Blog defpage setup — registers layouts, page helpers, and loads .sx pages."""
from __future__ import annotations
-from typing import Any
-
def setup_blog_pages() -> None:
"""Register blog-specific layouts, page helpers, and load page definitions."""
+ from .layouts import _register_blog_layouts
+ from .helpers import _register_blog_helpers
_register_blog_layouts()
_register_blog_helpers()
_load_blog_page_files()
@@ -15,1105 +15,6 @@ def _load_blog_page_files() -> None:
import os
from shared.sx.pages import load_page_dir
from shared.sx.jinja_bridge import load_service_components
- # Load blog .sx component definitions + handler definitions
- # __file__ = blog/sxc/pages/__init__.py → blog root is 3 levels up
blog_dir = os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
load_service_components(blog_dir, service_name="blog")
load_page_dir(os.path.dirname(__file__), "blog")
-
-
-# ---------------------------------------------------------------------------
-# Shared hydration helpers
-# ---------------------------------------------------------------------------
-
-def _add_to_defpage_ctx(**kwargs: Any) -> None:
- from quart import g
- if not hasattr(g, '_defpage_ctx'):
- g._defpage_ctx = {}
- g._defpage_ctx.update(kwargs)
-
-
-async def _ensure_post_data(slug: str | None) -> None:
- """Load post data and set g.post_data + defpage context.
-
- Replicates post bp's hydrate_post_data + context_processor.
- """
- from quart import g, abort
-
- if hasattr(g, 'post_data') and g.post_data:
- await _inject_post_context(g.post_data)
- return
-
- if not slug:
- abort(404)
-
- from bp.post.services.post_data import post_data
-
- is_admin = bool((g.get("rights") or {}).get("admin"))
- p_data = await post_data(slug, g.s, include_drafts=True)
- if not p_data:
- abort(404)
-
- # Draft access control
- if p_data["post"].get("status") != "published":
- if is_admin:
- pass
- elif g.user and p_data["post"].get("user_id") == g.user.id:
- pass
- else:
- abort(404)
-
- g.post_data = p_data
- g.post_slug = slug
- await _inject_post_context(p_data)
-
-
-async def _inject_post_context(p_data: dict) -> None:
- """Add post context_processor data to defpage context."""
- from shared.config import config
- from shared.infrastructure.fragments import fetch_fragment
- from shared.infrastructure.data_client import fetch_data
- from shared.contracts.dtos import CartSummaryDTO, dto_from_dict
- from shared.infrastructure.cart_identity import current_cart_identity
-
- db_post_id = p_data["post"]["id"]
- post_slug = p_data["post"]["slug"]
-
- container_nav = await fetch_fragment("relations", "container-nav", params={
- "container_type": "page",
- "container_id": str(db_post_id),
- "post_slug": post_slug,
- })
-
- ctx: dict = {
- **p_data,
- "base_title": config()["title"],
- "container_nav": container_nav,
- }
-
- if p_data["post"].get("is_page"):
- ident = current_cart_identity()
- summary_params: dict = {"page_slug": post_slug}
- if ident["user_id"] is not None:
- summary_params["user_id"] = ident["user_id"]
- if ident["session_id"] is not None:
- summary_params["session_id"] = ident["session_id"]
- raw_summary = await fetch_data(
- "cart", "cart-summary", params=summary_params, required=False,
- )
- page_summary = dto_from_dict(CartSummaryDTO, raw_summary) if raw_summary else CartSummaryDTO()
- ctx["page_cart_count"] = (
- page_summary.count + page_summary.calendar_count + page_summary.ticket_count
- )
- ctx["page_cart_total"] = float(
- page_summary.total + page_summary.calendar_total + page_summary.ticket_total
- )
-
- _add_to_defpage_ctx(**ctx)
-
-
-# ---------------------------------------------------------------------------
-# Header helpers (moved from sx_components — thin render_to_sx wrappers)
-# ---------------------------------------------------------------------------
-
-async def _blog_header_sx(ctx: dict, *, oob: bool = False) -> str:
- from shared.sx.helpers import render_to_sx
- from shared.sx.parser import SxExpr
- return await render_to_sx("menu-row-sx",
- id="blog-row", level=1,
- link_label_content=SxExpr("(div)"),
- child_id="blog-header-child", oob=oob)
-
-
-async def _settings_header_sx(ctx: dict, *, oob: bool = False) -> str:
- from shared.sx.helpers import render_to_sx
- from shared.sx.parser import SxExpr
- from quart import url_for as qurl
-
- settings_href = qurl("settings.defpage_settings_home")
- label_sx = await render_to_sx("blog-admin-label")
- nav_sx = await _settings_nav_sx(ctx)
-
- return await render_to_sx("menu-row-sx",
- id="root-settings-row", level=1,
- link_href=settings_href,
- link_label_content=SxExpr(label_sx),
- nav=SxExpr(nav_sx) if nav_sx else None,
- child_id="root-settings-header-child", oob=oob)
-
-
-async def _settings_nav_sx(ctx: dict) -> str:
- from shared.sx.helpers import render_to_sx
- return await render_to_sx("blog-settings-nav",
- select_colours=ctx.get("select_colours", ""))
-
-
-async def _sub_settings_header_sx(row_id: str, child_id: str, href: str,
- icon: str, label: str, ctx: dict,
- *, oob: bool = False, nav_sx: str = "") -> str:
- from shared.sx.helpers import render_to_sx
- from shared.sx.parser import SxExpr
-
- label_sx = await render_to_sx("blog-sub-settings-label",
- icon=f"fa fa-{icon}", label=label)
- return await render_to_sx("menu-row-sx",
- id=row_id, level=2,
- link_href=href,
- link_label_content=SxExpr(label_sx),
- nav=SxExpr(nav_sx) if nav_sx else None,
- child_id=child_id, oob=oob)
-
-
-# ---------------------------------------------------------------------------
-# Layouts
-# ---------------------------------------------------------------------------
-
-def _register_blog_layouts() -> None:
- from shared.sx.layouts import register_custom_layout
- register_custom_layout("blog", _blog_full, _blog_oob)
- register_custom_layout("blog-settings", _settings_full, _settings_oob,
- mobile_fn=_settings_mobile)
- register_custom_layout("blog-cache", _cache_full, _cache_oob)
- register_custom_layout("blog-snippets", _snippets_full, _snippets_oob)
- register_custom_layout("blog-menu-items", _menu_items_full, _menu_items_oob)
- register_custom_layout("blog-tag-groups", _tag_groups_full, _tag_groups_oob)
- register_custom_layout("blog-tag-group-edit",
- _tag_group_edit_full, _tag_group_edit_oob)
-
-
-# --- Blog layout (root + blog header) ---
-
-async def _blog_full(ctx: dict, **kw: Any) -> str:
- from shared.sx.helpers import render_to_sx_with_env, _ctx_to_env
- from shared.sx.parser import SxExpr
- return await render_to_sx_with_env("blog-layout-full", _ctx_to_env(ctx),
- blog_header=SxExpr(await _blog_header_sx(ctx)))
-
-
-async def _blog_oob(ctx: dict, **kw: Any) -> str:
- from shared.sx.helpers import render_to_sx_with_env, _ctx_to_env, oob_header_sx
- from shared.sx.parser import SxExpr
- rows = await render_to_sx_with_env("blog-layout-full", _ctx_to_env(ctx),
- blog_header=SxExpr(await _blog_header_sx(ctx)))
- return await oob_header_sx("root-header-child", "blog-header-child", rows)
-
-
-# --- Settings layout (root + settings header) ---
-
-async def _settings_full(ctx: dict, **kw: Any) -> str:
- from shared.sx.helpers import render_to_sx_with_env, _ctx_to_env
- from shared.sx.parser import SxExpr
- return await render_to_sx_with_env("settings-layout-full", _ctx_to_env(ctx),
- settings_header=SxExpr(await _settings_header_sx(ctx)))
-
-
-async def _settings_oob(ctx: dict, **kw: Any) -> str:
- from shared.sx.helpers import render_to_sx_with_env, _ctx_to_env, oob_header_sx
- from shared.sx.parser import SxExpr
- rows = await render_to_sx_with_env("settings-layout-full", _ctx_to_env(ctx),
- settings_header=SxExpr(await _settings_header_sx(ctx)))
- return await oob_header_sx("root-header-child", "root-settings-header-child", rows)
-
-
-async def _settings_mobile(ctx: dict, **kw: Any) -> str:
- return await _settings_nav_sx(ctx)
-
-
-# --- Sub-settings helpers ---
-
-async def _sub_settings_full(ctx: dict, row_id: str, child_id: str,
- endpoint: str, icon: str, label: str) -> str:
- from shared.sx.helpers import render_to_sx_with_env, _ctx_to_env
- from shared.sx.parser import SxExpr
- from quart import url_for as qurl
- return await render_to_sx_with_env("sub-settings-layout-full", _ctx_to_env(ctx),
- settings_header=SxExpr(await _settings_header_sx(ctx)),
- sub_header=SxExpr(await _sub_settings_header_sx(
- row_id, child_id, qurl(endpoint), icon, label, ctx)))
-
-
-async def _sub_settings_oob(ctx: dict, row_id: str, child_id: str,
- endpoint: str, icon: str, label: str) -> str:
- from shared.sx.helpers import oob_header_sx, render_to_sx
- from shared.sx.parser import SxExpr
- from quart import url_for as qurl
- settings_hdr_oob = await _settings_header_sx(ctx, oob=True)
- sub_hdr = await _sub_settings_header_sx(
- row_id, child_id, qurl(endpoint), icon, label, ctx)
- sub_oob = await oob_header_sx("root-settings-header-child", child_id, sub_hdr)
- return await render_to_sx("sub-settings-layout-oob",
- settings_header_oob=SxExpr(settings_hdr_oob),
- sub_header_oob=SxExpr(sub_oob))
-
-
-# --- Cache ---
-
-async def _cache_full(ctx: dict, **kw: Any) -> str:
- return await _sub_settings_full(ctx, "cache-row", "cache-header-child",
- "defpage_cache_page", "refresh", "Cache")
-
-
-async def _cache_oob(ctx: dict, **kw: Any) -> str:
- return await _sub_settings_oob(ctx, "cache-row", "cache-header-child",
- "defpage_cache_page", "refresh", "Cache")
-
-
-# --- Snippets ---
-
-async def _snippets_full(ctx: dict, **kw: Any) -> str:
- return await _sub_settings_full(ctx, "snippets-row", "snippets-header-child",
- "defpage_snippets_page", "puzzle-piece", "Snippets")
-
-
-async def _snippets_oob(ctx: dict, **kw: Any) -> str:
- return await _sub_settings_oob(ctx, "snippets-row", "snippets-header-child",
- "defpage_snippets_page", "puzzle-piece", "Snippets")
-
-
-# --- Menu Items ---
-
-async def _menu_items_full(ctx: dict, **kw: Any) -> str:
- return await _sub_settings_full(ctx, "menu_items-row", "menu_items-header-child",
- "defpage_menu_items_page", "bars", "Menu Items")
-
-
-async def _menu_items_oob(ctx: dict, **kw: Any) -> str:
- return await _sub_settings_oob(ctx, "menu_items-row", "menu_items-header-child",
- "defpage_menu_items_page", "bars", "Menu Items")
-
-
-# --- Tag Groups ---
-
-async def _tag_groups_full(ctx: dict, **kw: Any) -> str:
- return await _sub_settings_full(ctx, "tag-groups-row", "tag-groups-header-child",
- "defpage_tag_groups_page", "tags", "Tag Groups")
-
-
-async def _tag_groups_oob(ctx: dict, **kw: Any) -> str:
- return await _sub_settings_oob(ctx, "tag-groups-row", "tag-groups-header-child",
- "defpage_tag_groups_page", "tags", "Tag Groups")
-
-
-# --- Tag Group Edit ---
-
-async def _tag_group_edit_full(ctx: dict, **kw: Any) -> str:
- from quart import request, url_for as qurl
- from shared.sx.helpers import render_to_sx_with_env, _ctx_to_env
- from shared.sx.parser import SxExpr
- g_id = (request.view_args or {}).get("id")
- return await render_to_sx_with_env("sub-settings-layout-full", _ctx_to_env(ctx),
- settings_header=SxExpr(await _settings_header_sx(ctx)),
- sub_header=SxExpr(await _sub_settings_header_sx(
- "tag-groups-row", "tag-groups-header-child",
- qurl("defpage_tag_group_edit", id=g_id),
- "tags", "Tag Groups", ctx)))
-
-
-async def _tag_group_edit_oob(ctx: dict, **kw: Any) -> str:
- from quart import request, url_for as qurl
- from shared.sx.helpers import oob_header_sx, render_to_sx
- from shared.sx.parser import SxExpr
- g_id = (request.view_args or {}).get("id")
- settings_hdr_oob = await _settings_header_sx(ctx, oob=True)
- sub_hdr = await _sub_settings_header_sx(
- "tag-groups-row", "tag-groups-header-child",
- qurl("defpage_tag_group_edit", id=g_id),
- "tags", "Tag Groups", ctx)
- sub_oob = await oob_header_sx("root-settings-header-child", "tag-groups-header-child", sub_hdr)
- return await render_to_sx("sub-settings-layout-oob",
- settings_header_oob=SxExpr(settings_hdr_oob),
- sub_header_oob=SxExpr(sub_oob))
-
-
-# ---------------------------------------------------------------------------
-# Rendering helpers (moved from sx_components)
-# ---------------------------------------------------------------------------
-
-def _raw_html_sx(html: str) -> str:
- """Wrap raw HTML in (raw! "...") so it's valid inside sx source."""
- from shared.sx.parser import serialize as sx_serialize
- if not html:
- return ""
- return "(raw! " + sx_serialize(html) + ")"
-
-
-async def render_editor_panel(save_error: str | None = None, is_page: bool = False) -> str:
- """Build the WYSIWYG editor panel HTML for new post/page creation."""
- import os
- from quart import url_for as qurl, current_app
- from shared.browser.app.csrf import generate_csrf_token
- from shared.sx.helpers import render_to_sx
-
- csrf = generate_csrf_token()
- asset_url_fn = current_app.jinja_env.globals.get("asset_url", lambda p: "")
- editor_css = asset_url_fn("scripts/editor.css")
- editor_js = asset_url_fn("scripts/editor.js")
- sx_editor_js = asset_url_fn("scripts/sx-editor.js")
-
- upload_image_url = qurl("blog.editor_api.upload_image")
- upload_media_url = qurl("blog.editor_api.upload_media")
- upload_file_url = qurl("blog.editor_api.upload_file")
- oembed_url = qurl("blog.editor_api.oembed_proxy")
- snippets_url = qurl("blog.editor_api.list_snippets")
- unsplash_key = os.environ.get("UNSPLASH_ACCESS_KEY", "")
-
- title_placeholder = "Page title..." if is_page else "Post title..."
- create_label = "Create Page" if is_page else "Create Post"
-
- parts: list[str] = []
-
- if save_error:
- parts.append(await render_to_sx("blog-editor-error", error=str(save_error)))
-
- parts.append(await render_to_sx("blog-editor-form",
- csrf=csrf, title_placeholder=title_placeholder,
- create_label=create_label,
- ))
-
- parts.append(await render_to_sx("blog-editor-styles", css_href=editor_css))
- parts.append(await render_to_sx("sx-editor-styles"))
-
- init_js = (
- "console.log('[EDITOR-DEBUG] init script running');\n"
- "(function() {\n"
- " console.log('[EDITOR-DEBUG] IIFE entered, mountEditor=', typeof window.mountEditor);\n"
- " function init() {\n"
- " var csrfToken = document.querySelector('input[name=\"csrf_token\"]').value;\n"
- f" var uploadUrl = '{upload_image_url}';\n"
- " var uploadUrls = {\n"
- " image: uploadUrl,\n"
- f" media: '{upload_media_url}',\n"
- f" file: '{upload_file_url}',\n"
- " };\n"
- "\n"
- " var fileInput = document.getElementById('feature-image-file');\n"
- " var addBtn = document.getElementById('feature-image-add-btn');\n"
- " var deleteBtn = document.getElementById('feature-image-delete-btn');\n"
- " var preview = document.getElementById('feature-image-preview');\n"
- " var emptyState = document.getElementById('feature-image-empty');\n"
- " var filledState = document.getElementById('feature-image-filled');\n"
- " var hiddenUrl = document.getElementById('feature-image-input');\n"
- " var hiddenCaption = document.getElementById('feature-image-caption-input');\n"
- " var captionInput = document.getElementById('feature-image-caption');\n"
- " var uploading = document.getElementById('feature-image-uploading');\n"
- "\n"
- " function showFilled(url) {\n"
- " preview.src = url;\n"
- " hiddenUrl.value = url;\n"
- " emptyState.classList.add('hidden');\n"
- " filledState.classList.remove('hidden');\n"
- " uploading.classList.add('hidden');\n"
- " }\n"
- "\n"
- " function showEmpty() {\n"
- " preview.src = '';\n"
- " hiddenUrl.value = '';\n"
- " hiddenCaption.value = '';\n"
- " captionInput.value = '';\n"
- " emptyState.classList.remove('hidden');\n"
- " filledState.classList.add('hidden');\n"
- " uploading.classList.add('hidden');\n"
- " }\n"
- "\n"
- " function uploadFile(file) {\n"
- " emptyState.classList.add('hidden');\n"
- " uploading.classList.remove('hidden');\n"
- " var fd = new FormData();\n"
- " fd.append('file', file);\n"
- " fetch(uploadUrl, {\n"
- " method: 'POST',\n"
- " body: fd,\n"
- " headers: { 'X-CSRFToken': csrfToken },\n"
- " })\n"
- " .then(function(r) {\n"
- " if (!r.ok) throw new Error('Upload failed (' + r.status + ')');\n"
- " return r.json();\n"
- " })\n"
- " .then(function(data) {\n"
- " var url = data.images && data.images[0] && data.images[0].url;\n"
- " if (url) showFilled(url);\n"
- " else { showEmpty(); alert('Upload succeeded but no image URL returned.'); }\n"
- " })\n"
- " .catch(function(e) {\n"
- " showEmpty();\n"
- " alert(e.message);\n"
- " });\n"
- " }\n"
- "\n"
- " addBtn.addEventListener('click', function() { fileInput.click(); });\n"
- " preview.addEventListener('click', function() { fileInput.click(); });\n"
- " deleteBtn.addEventListener('click', function(e) {\n"
- " e.stopPropagation();\n"
- " showEmpty();\n"
- " });\n"
- " fileInput.addEventListener('change', function() {\n"
- " if (fileInput.files && fileInput.files[0]) {\n"
- " uploadFile(fileInput.files[0]);\n"
- " fileInput.value = '';\n"
- " }\n"
- " });\n"
- " captionInput.addEventListener('input', function() {\n"
- " hiddenCaption.value = captionInput.value;\n"
- " });\n"
- "\n"
- " var excerpt = document.querySelector('textarea[name=\"custom_excerpt\"]');\n"
- " function autoResize() {\n"
- " excerpt.style.height = 'auto';\n"
- " excerpt.style.height = excerpt.scrollHeight + 'px';\n"
- " }\n"
- " excerpt.addEventListener('input', autoResize);\n"
- " autoResize();\n"
- "\n"
- " window.mountEditor('lexical-editor', {\n"
- " initialJson: null,\n"
- " csrfToken: csrfToken,\n"
- " uploadUrls: uploadUrls,\n"
- f" oembedUrl: '{oembed_url}',\n"
- f" unsplashApiKey: '{unsplash_key}',\n"
- f" snippetsUrl: '{snippets_url}',\n"
- " });\n"
- "\n"
- " if (typeof SxEditor !== 'undefined') {\n"
- " SxEditor.mount('sx-editor', {\n"
- " initialSx: (document.getElementById('sx-content-input') || {}).value || null,\n"
- " csrfToken: csrfToken,\n"
- " uploadUrls: uploadUrls,\n"
- f" oembedUrl: '{oembed_url}',\n"
- " onChange: function(sx) {\n"
- " document.getElementById('sx-content-input').value = sx;\n"
- " }\n"
- " });\n"
- " }\n"
- "\n"
- " document.addEventListener('keydown', function(e) {\n"
- " if ((e.ctrlKey || e.metaKey) && e.key === 's') {\n"
- " e.preventDefault();\n"
- " document.getElementById('post-new-form').requestSubmit();\n"
- " }\n"
- " });\n"
- " }\n"
- "\n"
- " if (typeof window.mountEditor === 'function') {\n"
- " init();\n"
- " } else {\n"
- " var _t = setInterval(function() {\n"
- " if (typeof window.mountEditor === 'function') { clearInterval(_t); init(); }\n"
- " }, 50);\n"
- " }\n"
- "})();\n"
- )
- parts.append(await render_to_sx("blog-editor-scripts",
- js_src=editor_js,
- sx_editor_js_src=sx_editor_js,
- init_js=init_js))
-
- from shared.sx.parser import SxExpr
- return await render_to_sx("blog-editor-panel",
- parts=SxExpr("(<> " + " ".join(parts) + ")")) if parts else ""
-
-
-# ---------------------------------------------------------------------------
-# Page helpers (async functions available in .sx defpage expressions)
-# ---------------------------------------------------------------------------
-
-def _register_blog_helpers() -> None:
- from shared.sx.pages import register_page_helpers
- register_page_helpers("blog", {
- "editor-content": _h_editor_content,
- "editor-page-content": _h_editor_page_content,
- "post-admin-content": _h_post_admin_content,
- "post-data-content": _h_post_data_content,
- "post-preview-content": _h_post_preview_content,
- "post-entries-content": _h_post_entries_content,
- "post-settings-content": _h_post_settings_content,
- "post-edit-content": _h_post_edit_content,
- })
-
-
-# --- Editor helpers ---
-
-async def _h_editor_content(**kw):
- return await render_editor_panel()
-
-
-async def _h_editor_page_content(**kw):
- return await render_editor_panel(is_page=True)
-
-
-# --- Post admin helpers ---
-
-async def _h_post_admin_content(slug=None, **kw):
- await _ensure_post_data(slug)
- return '(div :class "pb-8")'
-
-
-async def _h_post_data_content(slug=None, **kw):
- await _ensure_post_data(slug)
- from quart import g
- from markupsafe import escape as esc
-
- original_post = getattr(g, "post_data", {}).get("original_post")
- if original_post is None:
- return _raw_html_sx('
No post data available.
')
-
- tablename = getattr(original_post, "__tablename__", "?")
-
- def _render_scalar_table(obj):
- rows = []
- for col in obj.__mapper__.columns:
- key = col.key
- if key == "_sa_instance_state":
- continue
- val = getattr(obj, key, None)
- if val is None:
- val_html = '\u2014 '
- elif hasattr(val, "isoformat"):
- val_html = f'{esc(val.isoformat())} '
- elif isinstance(val, str):
- val_html = f'{esc(val)} '
- else:
- val_html = f'{esc(str(val))} '
- rows.append(
- f''
- f'{esc(key)} '
- f'{val_html} '
- )
- return (
- ''
- '
'
- ''
- 'Field '
- 'Value '
- ' ' + "".join(rows) + '
'
- )
-
- def _render_model(obj, depth=0, max_depth=2):
- parts = [_render_scalar_table(obj)]
- rel_parts = []
- for rel in obj.__mapper__.relationships:
- rel_name = rel.key
- loaded = rel_name in obj.__dict__
- value = getattr(obj, rel_name, None) if loaded else None
- cardinality = "many" if rel.uselist else "one"
- cls_name = rel.mapper.class_.__name__
- loaded_label = "" if loaded else " \u2022 not loaded "
-
- inner = ""
- if value is None:
- inner = '\u2014 '
- elif rel.uselist:
- items = list(value) if value else []
- inner = f'{len(items)} item{"" if len(items) == 1 else "s"}
'
- if items and depth < max_depth:
- sub_rows = []
- for i, it in enumerate(items, 1):
- ident_parts = []
- for k in ("id", "ghost_id", "uuid", "slug", "name", "title"):
- if k in it.__mapper__.c:
- v = getattr(it, k, "")
- ident_parts.append(f"{k}={v}")
- summary = " \u2022 ".join(ident_parts) if ident_parts else str(it)
- child_html = ""
- if depth < max_depth:
- child_html = f'{_render_model(it, depth + 1, max_depth)}
'
- else:
- child_html = '\u2026max depth reached\u2026
'
- sub_rows.append(
- f''
- f'{i} '
- f'{esc(summary)} {child_html} '
- )
- inner += (
- ''
- '
'
- '# '
- 'Summary '
- + "".join(sub_rows) + '
'
- )
- else:
- child = value
- ident_parts = []
- for k in ("id", "ghost_id", "uuid", "slug", "name", "title"):
- if k in child.__mapper__.c:
- v = getattr(child, k, "")
- ident_parts.append(f"{k}={v}")
- summary = " \u2022 ".join(ident_parts) if ident_parts else str(child)
- inner = f'{esc(summary)} '
- if depth < max_depth:
- inner += f'{_render_model(child, depth + 1, max_depth)}
'
- else:
- inner += '\u2026max depth reached\u2026
'
-
- rel_parts.append(
- f''
- f'
'
- f'Relationship: {esc(rel_name)} '
- f' {cardinality} \u2192 {esc(cls_name)}{loaded_label}
'
- f'
{inner}
'
- )
- if rel_parts:
- parts.append('' + "".join(rel_parts) + '
')
- return '' + "".join(parts) + '
'
-
- html = (
- f''
- f'
Model: Post \u2022 Table: {esc(tablename)}
'
- f'{_render_model(original_post, 0, 2)}
'
- )
- return _raw_html_sx(html)
-
-
-async def _h_post_preview_content(slug=None, **kw):
- await _ensure_post_data(slug)
- from quart import g
- from shared.services.registry import services
- from shared.sx.helpers import render_to_sx
- from shared.sx.parser import SxExpr, serialize as sx_serialize
-
- preview = await services.blog_page.preview_data(g.s)
-
- sections: list[str] = []
- if preview.get("sx_pretty"):
- sections.append(await render_to_sx("blog-preview-section",
- title="S-Expression Source", content=SxExpr(preview["sx_pretty"])))
- if preview.get("json_pretty"):
- sections.append(await render_to_sx("blog-preview-section",
- title="Lexical JSON", content=SxExpr(preview["json_pretty"])))
- if preview.get("sx_rendered"):
- rendered_sx = f'(div :class "blog-content prose max-w-none" (raw! {sx_serialize(preview["sx_rendered"])}))'
- sections.append(await render_to_sx("blog-preview-section",
- title="SX Rendered", content=SxExpr(rendered_sx)))
- if preview.get("lex_rendered"):
- rendered_sx = f'(div :class "blog-content prose max-w-none" (raw! {sx_serialize(preview["lex_rendered"])}))'
- sections.append(await render_to_sx("blog-preview-section",
- title="Lexical Rendered", content=SxExpr(rendered_sx)))
-
- if not sections:
- return '(div :class "p-8 text-stone-500" "No content to preview.")'
-
- inner = " ".join(sections)
- return await render_to_sx("blog-preview-panel", sections=SxExpr(f"(<> {inner})"))
-
-
-async def _h_post_entries_content(slug=None, **kw):
- await _ensure_post_data(slug)
- from quart import g, url_for as qurl
- from sqlalchemy import select
- from markupsafe import escape as esc
- from shared.models.calendars import Calendar
- from shared.utils import host_url
- from bp.post.services.entry_associations import get_post_entry_ids
- from bp.post.admin.routes import _render_associated_entries
-
- post_id = g.post_data["post"]["id"]
- post_slug = g.post_data["post"]["slug"]
- associated_entry_ids = await get_post_entry_ids(post_id)
- result = await g.s.execute(
- select(Calendar)
- .where(Calendar.deleted_at.is_(None))
- .order_by(Calendar.name.asc())
- )
- all_calendars = result.scalars().all()
- for calendar in all_calendars:
- await g.s.refresh(calendar, ["entries", "post"])
-
- # Associated entries list
- assoc_html = await _render_associated_entries(all_calendars, associated_entry_ids, post_slug)
-
- # Calendar browser
- cal_items: list[str] = []
- for cal in all_calendars:
- cal_post = getattr(cal, "post", None)
- cal_fi = getattr(cal_post, "feature_image", None) if cal_post else None
- cal_title = esc(getattr(cal_post, "title", "")) if cal_post else ""
- cal_name = esc(getattr(cal, "name", ""))
- cal_view_url = host_url(qurl("blog.post.admin.calendar_view", slug=post_slug, calendar_id=cal.id))
-
- img_html = (
- f' '
- if cal_fi else
- '
'
- )
- cal_items.append(
- f''
- f''
- f'{img_html}'
- f''
- f'
{cal_name}
'
- f'
{cal_title}
'
- f'
'
- f''
- f'
Loading calendar...
'
- f'
'
- )
-
- if cal_items:
- browser_html = (
- '
Browse Calendars '
- + "".join(cal_items) + ''
- )
- else:
- browser_html = 'Browse Calendars No calendars found.
'
-
- return (
- _raw_html_sx('')
- + assoc_html
- + _raw_html_sx(browser_html + '
')
- )
-
-
-async def _h_post_settings_content(slug=None, **kw):
- await _ensure_post_data(slug)
- from quart import g, request
- from markupsafe import escape as esc
- from models.ghost_content import Post
- from sqlalchemy import select as sa_select
- from sqlalchemy.orm import selectinload
- from shared.browser.app.csrf import generate_csrf_token
- from bp.post.admin.routes import _post_to_edit_dict
-
- post_id = g.post_data["post"]["id"]
- post = (await g.s.execute(
- sa_select(Post)
- .where(Post.id == post_id)
- .options(selectinload(Post.tags))
- )).scalar_one_or_none()
- ghost_post = _post_to_edit_dict(post) if post else {}
- save_success = request.args.get("saved") == "1"
- csrf = generate_csrf_token()
-
- p = g.post_data.get("post", {}) if hasattr(g, "post_data") else {}
- is_page = p.get("is_page", False)
- gp = ghost_post
-
- def field_label(text, field_for=None):
- for_attr = f' for="{field_for}"' if field_for else ''
- return f'{esc(text)} '
-
- input_cls = ('w-full text-[14px] rounded-[6px] border border-stone-200 px-[10px] py-[7px] '
- 'bg-white text-stone-700 placeholder:text-stone-300 '
- 'focus:outline-none focus:border-stone-400 focus:ring-1 focus:ring-stone-300')
- textarea_cls = input_cls + ' resize-y'
-
- def text_input(name, value='', placeholder='', input_type='text', maxlength=None):
- ml = f' maxlength="{maxlength}"' if maxlength else ''
- return (f' ')
-
- def textarea_input(name, value='', placeholder='', rows=3, maxlength=None):
- ml = f' maxlength="{maxlength}"' if maxlength else ''
- return (f'')
-
- def checkbox_input(name, checked=False, label=''):
- chk = ' checked' if checked else ''
- return (f''
- f' '
- f'{esc(label)} ')
-
- def section(title, content, is_open=False):
- open_attr = ' open' if is_open else ''
- return (f''
- f'{esc(title)} '
- f'{content}
')
-
- # General section
- slug_placeholder = 'page-slug' if is_page else 'post-slug'
- pub_at = gp.get("published_at") or ""
- pub_at_val = pub_at[:16] if pub_at else ""
- vis = gp.get("visibility") or "public"
- vis_opts = "".join(
- f'{l} '
- for v, l in [("public", "Public"), ("members", "Members"), ("paid", "Paid")]
- )
-
- general = (
- f'{field_label("Slug", "settings-slug")}{text_input("slug", gp.get("slug") or "", slug_placeholder)}
'
- f'{field_label("Published at", "settings-published_at")}'
- f'
'
- f'{checkbox_input("featured", gp.get("featured"), "Featured page" if is_page else "Featured post")}
'
- f'{field_label("Visibility", "settings-visibility")}'
- f'{vis_opts}
'
- f'{checkbox_input("email_only", gp.get("email_only"), "Email only")}
'
- )
-
- # Tags
- tags = gp.get("tags") or []
- if tags:
- tag_names = ", ".join(getattr(t, "name", t.get("name", "") if isinstance(t, dict) else str(t)) for t in tags)
- else:
- tag_names = ""
- tags_sec = (
- f'{field_label("Tags (comma-separated)", "settings-tags")}'
- f'{text_input("tags", tag_names, "news, updates, featured")}'
- f'
Unknown tags will be created automatically.
'
- )
-
- fi_sec = f'{field_label("Alt text", "settings-feature_image_alt")}{text_input("feature_image_alt", gp.get("feature_image_alt") or "", "Describe the feature image")}
'
-
- seo_sec = (
- f'{field_label("Meta title", "settings-meta_title")}{text_input("meta_title", gp.get("meta_title") or "", "SEO title", maxlength=300)}'
- f'
Recommended: 70 characters. Max: 300.
'
- f'{field_label("Meta description", "settings-meta_description")}{textarea_input("meta_description", gp.get("meta_description") or "", "SEO description", rows=2, maxlength=500)}'
- f'
Recommended: 156 characters.
'
- f'{field_label("Canonical URL", "settings-canonical_url")}{text_input("canonical_url", gp.get("canonical_url") or "", "https://example.com/original-post", input_type="url")}
'
- )
-
- og_sec = (
- f'{field_label("OG title", "settings-og_title")}{text_input("og_title", gp.get("og_title") or "")}
'
- f'{field_label("OG description", "settings-og_description")}{textarea_input("og_description", gp.get("og_description") or "", rows=2)}
'
- f'{field_label("OG image URL", "settings-og_image")}{text_input("og_image", gp.get("og_image") or "", "https://...", input_type="url")}
'
- )
-
- tw_sec = (
- f'{field_label("Twitter title", "settings-twitter_title")}{text_input("twitter_title", gp.get("twitter_title") or "")}
'
- f'{field_label("Twitter description", "settings-twitter_description")}{textarea_input("twitter_description", gp.get("twitter_description") or "", rows=2)}
'
- f'{field_label("Twitter image URL", "settings-twitter_image")}{text_input("twitter_image", gp.get("twitter_image") or "", "https://...", input_type="url")}
'
- )
-
- tmpl_placeholder = 'custom-page.hbs' if is_page else 'custom-post.hbs'
- adv_sec = f'{field_label("Custom template", "settings-custom_template")}{text_input("custom_template", gp.get("custom_template") or "", tmpl_placeholder)}
'
-
- sections = (
- section("General", general, is_open=True)
- + section("Tags", tags_sec)
- + section("Feature Image", fi_sec)
- + section("SEO / Meta", seo_sec)
- + section("Facebook / OpenGraph", og_sec)
- + section("X / Twitter", tw_sec)
- + section("Advanced", adv_sec)
- )
-
- saved_html = 'Saved. ' if save_success else ''
-
- html = (
- f''
- )
- return _raw_html_sx(html)
-
-
-async def _h_post_edit_content(slug=None, **kw):
- await _ensure_post_data(slug)
- import os
- from quart import g, request as qrequest, url_for as qurl, current_app
- from models.ghost_content import Post
- from sqlalchemy import select as sa_select
- from sqlalchemy.orm import selectinload
- from shared.infrastructure.data_client import fetch_data
- from shared.browser.app.csrf import generate_csrf_token
- from shared.sx.helpers import render_to_sx
- from shared.sx.parser import SxExpr, serialize as sx_serialize
- from bp.post.admin.routes import _post_to_edit_dict
-
- post_id = g.post_data["post"]["id"]
- db_post = (await g.s.execute(
- sa_select(Post)
- .where(Post.id == post_id)
- .options(selectinload(Post.tags))
- )).scalar_one_or_none()
- ghost_post = _post_to_edit_dict(db_post) if db_post else {}
- save_success = qrequest.args.get("saved") == "1"
- save_error = qrequest.args.get("error", "")
- raw_newsletters = await fetch_data("account", "newsletters", required=False) or []
- from types import SimpleNamespace
- newsletters = [SimpleNamespace(**nl) for nl in raw_newsletters]
-
- csrf = generate_csrf_token()
- asset_url_fn = current_app.jinja_env.globals.get("asset_url", lambda p: "")
- editor_css = asset_url_fn("scripts/editor.css")
- editor_js = asset_url_fn("scripts/editor.js")
- sx_editor_js = asset_url_fn("scripts/sx-editor.js")
-
- upload_image_url = qurl("blog.editor_api.upload_image")
- upload_media_url = qurl("blog.editor_api.upload_media")
- upload_file_url = qurl("blog.editor_api.upload_file")
- oembed_url = qurl("blog.editor_api.oembed_proxy")
- snippets_url = qurl("blog.editor_api.list_snippets")
- unsplash_key = os.environ.get("UNSPLASH_ACCESS_KEY", "")
-
- post = g.post_data.get("post", {}) if hasattr(g, "post_data") else {}
- is_page = post.get("is_page", False)
-
- feature_image = ghost_post.get("feature_image") or ""
- feature_image_caption = ghost_post.get("feature_image_caption") or ""
- title_val = ghost_post.get("title") or ""
- excerpt_val = ghost_post.get("custom_excerpt") or ""
- updated_at = ghost_post.get("updated_at") or ""
- status = ghost_post.get("status") or "draft"
- lexical_json = ghost_post.get("lexical") or '{"root":{"children":[{"children":[],"direction":null,"format":"","indent":0,"type":"paragraph","version":1}],"direction":null,"format":"","indent":0,"type":"root","version":1}}'
- sx_content = ghost_post.get("sx_content") or ""
- has_sx = bool(sx_content)
-
- already_emailed = bool(ghost_post and ghost_post.get("email") and (ghost_post["email"] if isinstance(ghost_post["email"], dict) else {}).get("status"))
- email_obj = ghost_post.get("email")
- if email_obj and not isinstance(email_obj, dict):
- already_emailed = bool(getattr(email_obj, "status", None))
-
- title_placeholder = "Page title..." if is_page else "Post title..."
-
- # Newsletter options as SX fragment
- nl_parts = ['(option :value "" "Select newsletter\u2026")']
- for nl in newsletters:
- nl_slug = sx_serialize(getattr(nl, "slug", ""))
- nl_name = sx_serialize(getattr(nl, "name", ""))
- nl_parts.append(f"(option :value {nl_slug} {nl_name})")
- nl_opts_sx = SxExpr("(<> " + " ".join(nl_parts) + ")")
-
- # Footer extra badges as SX fragment
- badge_parts: list[str] = []
- if save_success:
- badge_parts.append('(span :class "text-[14px] text-green-600" "Saved.")')
- publish_requested = qrequest.args.get("publish_requested") if hasattr(qrequest, 'args') else None
- if publish_requested:
- badge_parts.append('(span :class "text-[14px] text-blue-600" "Publish requested \u2014 an admin will review.")')
- if post.get("publish_requested"):
- badge_parts.append('(span :class "inline-block px-2 py-0.5 rounded-full text-xs font-semibold bg-blue-100 text-blue-800" "Publish requested")')
- if already_emailed:
- nl_name = ""
- newsletter = ghost_post.get("newsletter")
- if newsletter:
- nl_name = getattr(newsletter, "name", "") if not isinstance(newsletter, dict) else newsletter.get("name", "")
- suffix = f" to {nl_name}" if nl_name else ""
- badge_parts.append(f'(span :class "inline-block px-2 py-0.5 rounded-full text-xs font-semibold bg-green-100 text-green-800" "Emailed{suffix}")')
- footer_extra_sx = SxExpr("(<> " + " ".join(badge_parts) + ")") if badge_parts else None
-
- parts: list[str] = []
-
- if save_error:
- parts.append(await render_to_sx("blog-editor-error", error=save_error))
-
- parts.append(await render_to_sx("blog-editor-edit-form",
- csrf=csrf,
- updated_at=str(updated_at),
- title_val=title_val,
- excerpt_val=excerpt_val,
- feature_image=feature_image,
- feature_image_caption=feature_image_caption,
- sx_content_val=sx_content,
- lexical_json=lexical_json,
- has_sx=has_sx,
- title_placeholder=title_placeholder,
- status=status,
- already_emailed=already_emailed,
- newsletter_options=nl_opts_sx,
- footer_extra=footer_extra_sx,
- ))
-
- parts.append(await render_to_sx("blog-editor-publish-js", already_emailed=already_emailed))
- parts.append(await render_to_sx("blog-editor-styles", css_href=editor_css))
- parts.append(await render_to_sx("sx-editor-styles"))
-
- init_js = (
- '(function() {'
- " function applyEditorFontSize() {"
- " document.documentElement.style.fontSize = '62.5%';"
- " document.body.style.fontSize = '1.6rem';"
- ' }'
- " function restoreDefaultFontSize() {"
- " document.documentElement.style.fontSize = '';"
- " document.body.style.fontSize = '';"
- ' }'
- ' applyEditorFontSize();'
- " document.body.addEventListener('htmx:beforeSwap', function cleanup(e) {"
- " if (e.detail.target && e.detail.target.id === 'main-panel') {"
- ' restoreDefaultFontSize();'
- " document.body.removeEventListener('htmx:beforeSwap', cleanup);"
- ' }'
- ' });'
- ' function init() {'
- " var csrfToken = document.querySelector('input[name=\"csrf_token\"]').value;"
- f" var uploadUrl = '{upload_image_url}';"
- ' var uploadUrls = {'
- ' image: uploadUrl,'
- f" media: '{upload_media_url}',"
- f" file: '{upload_file_url}',"
- ' };'
- " var fileInput = document.getElementById('feature-image-file');"
- " var addBtn = document.getElementById('feature-image-add-btn');"
- " var deleteBtn = document.getElementById('feature-image-delete-btn');"
- " var preview = document.getElementById('feature-image-preview');"
- " var emptyState = document.getElementById('feature-image-empty');"
- " var filledState = document.getElementById('feature-image-filled');"
- " var hiddenUrl = document.getElementById('feature-image-input');"
- " var hiddenCaption = document.getElementById('feature-image-caption-input');"
- " var captionInput = document.getElementById('feature-image-caption');"
- " var uploading = document.getElementById('feature-image-uploading');"
- ' function showFilled(url) {'
- ' preview.src = url; hiddenUrl.value = url;'
- " emptyState.classList.add('hidden'); filledState.classList.remove('hidden'); uploading.classList.add('hidden');"
- ' }'
- ' function showEmpty() {'
- " preview.src = ''; hiddenUrl.value = ''; hiddenCaption.value = ''; captionInput.value = '';"
- " emptyState.classList.remove('hidden'); filledState.classList.add('hidden'); uploading.classList.add('hidden');"
- ' }'
- ' function uploadFile(file) {'
- " emptyState.classList.add('hidden'); uploading.classList.remove('hidden');"
- " var fd = new FormData(); fd.append('file', file);"
- " fetch(uploadUrl, { method: 'POST', body: fd, headers: { 'X-CSRFToken': csrfToken } })"
- " .then(function(r) { if (!r.ok) throw new Error('Upload failed (' + r.status + ')'); return r.json(); })"
- ' .then(function(data) {'
- ' var url = data.images && data.images[0] && data.images[0].url;'
- " if (url) showFilled(url); else { showEmpty(); alert('Upload succeeded but no image URL returned.'); }"
- ' })'
- ' .catch(function(e) { showEmpty(); alert(e.message); });'
- ' }'
- " addBtn.addEventListener('click', function() { fileInput.click(); });"
- " preview.addEventListener('click', function() { fileInput.click(); });"
- " deleteBtn.addEventListener('click', function(e) { e.stopPropagation(); showEmpty(); });"
- " fileInput.addEventListener('change', function() {"
- ' if (fileInput.files && fileInput.files[0]) { uploadFile(fileInput.files[0]); fileInput.value = \'\'; }'
- ' });'
- " captionInput.addEventListener('input', function() { hiddenCaption.value = captionInput.value; });"
- " var excerpt = document.querySelector('textarea[name=\"custom_excerpt\"]');"
- " function autoResize() { excerpt.style.height = 'auto'; excerpt.style.height = excerpt.scrollHeight + 'px'; }"
- " excerpt.addEventListener('input', autoResize); autoResize();"
- ' var dataEl = document.getElementById(\'lexical-initial-data\');'
- ' var initialJson = dataEl ? dataEl.textContent.trim() : null;'
- ' if (initialJson) { var hidden = document.getElementById(\'lexical-json-input\'); if (hidden) hidden.value = initialJson; }'
- " window.mountEditor('lexical-editor', {"
- ' initialJson: initialJson,'
- ' csrfToken: csrfToken,'
- ' uploadUrls: uploadUrls,'
- f" oembedUrl: '{oembed_url}',"
- f" unsplashApiKey: '{unsplash_key}',"
- f" snippetsUrl: '{snippets_url}',"
- ' });'
- " if (typeof SxEditor !== 'undefined') {"
- " SxEditor.mount('sx-editor', {"
- " initialSx: (document.getElementById('sx-content-input') || {}).value || null,"
- ' csrfToken: csrfToken,'
- ' uploadUrls: uploadUrls,'
- f" oembedUrl: '{oembed_url}',"
- ' onChange: function(sx) {'
- " document.getElementById('sx-content-input').value = sx;"
- ' }'
- ' });'
- ' }'
- " document.addEventListener('keydown', function(e) {"
- " if ((e.ctrlKey || e.metaKey) && e.key === 's') {"
- " e.preventDefault(); document.getElementById('post-edit-form').requestSubmit();"
- ' }'
- ' });'
- ' }'
- " if (typeof window.mountEditor === 'function') { init(); }"
- ' else { var _t = setInterval(function() {'
- " if (typeof window.mountEditor === 'function') { clearInterval(_t); init(); }"
- ' }, 50); }'
- '})();'
- )
- parts.append(await render_to_sx("blog-editor-scripts",
- js_src=editor_js,
- sx_editor_js_src=sx_editor_js,
- init_js=init_js))
-
- return await render_to_sx("blog-editor-panel",
- parts=SxExpr("(<> " + " ".join(parts) + ")"))
-
-
diff --git a/blog/sxc/pages/helpers.py b/blog/sxc/pages/helpers.py
new file mode 100644
index 0000000..4c75d33
--- /dev/null
+++ b/blog/sxc/pages/helpers.py
@@ -0,0 +1,713 @@
+"""Blog page helpers — async functions available in .sx defpage expressions."""
+from __future__ import annotations
+
+from typing import Any
+
+
+# ---------------------------------------------------------------------------
+# Shared hydration helpers
+# ---------------------------------------------------------------------------
+
+def _add_to_defpage_ctx(**kwargs: Any) -> None:
+ from quart import g
+ if not hasattr(g, '_defpage_ctx'):
+ g._defpage_ctx = {}
+ g._defpage_ctx.update(kwargs)
+
+
+async def _ensure_post_data(slug: str | None) -> None:
+ """Load post data and set g.post_data + defpage context.
+
+ Replicates post bp's hydrate_post_data + context_processor.
+ """
+ from quart import g, abort
+
+ if hasattr(g, 'post_data') and g.post_data:
+ await _inject_post_context(g.post_data)
+ return
+
+ if not slug:
+ abort(404)
+
+ from bp.post.services.post_data import post_data
+
+ is_admin = bool((g.get("rights") or {}).get("admin"))
+ p_data = await post_data(slug, g.s, include_drafts=True)
+ if not p_data:
+ abort(404)
+
+ # Draft access control
+ if p_data["post"].get("status") != "published":
+ if is_admin:
+ pass
+ elif g.user and p_data["post"].get("user_id") == g.user.id:
+ pass
+ else:
+ abort(404)
+
+ g.post_data = p_data
+ g.post_slug = slug
+ await _inject_post_context(p_data)
+
+
+async def _inject_post_context(p_data: dict) -> None:
+ """Add post context_processor data to defpage context."""
+ from shared.config import config
+ from shared.infrastructure.fragments import fetch_fragment
+ from shared.infrastructure.data_client import fetch_data
+ from shared.contracts.dtos import CartSummaryDTO, dto_from_dict
+ from shared.infrastructure.cart_identity import current_cart_identity
+
+ db_post_id = p_data["post"]["id"]
+ post_slug = p_data["post"]["slug"]
+
+ container_nav = await fetch_fragment("relations", "container-nav", params={
+ "container_type": "page",
+ "container_id": str(db_post_id),
+ "post_slug": post_slug,
+ })
+
+ ctx: dict = {
+ **p_data,
+ "base_title": config()["title"],
+ "container_nav": container_nav,
+ }
+
+ if p_data["post"].get("is_page"):
+ ident = current_cart_identity()
+ summary_params: dict = {"page_slug": post_slug}
+ if ident["user_id"] is not None:
+ summary_params["user_id"] = ident["user_id"]
+ if ident["session_id"] is not None:
+ summary_params["session_id"] = ident["session_id"]
+ raw_summary = await fetch_data(
+ "cart", "cart-summary", params=summary_params, required=False,
+ )
+ page_summary = dto_from_dict(CartSummaryDTO, raw_summary) if raw_summary else CartSummaryDTO()
+ ctx["page_cart_count"] = (
+ page_summary.count + page_summary.calendar_count + page_summary.ticket_count
+ )
+ ctx["page_cart_total"] = float(
+ page_summary.total + page_summary.calendar_total + page_summary.ticket_total
+ )
+
+ _add_to_defpage_ctx(**ctx)
+
+
+# ---------------------------------------------------------------------------
+# Rendering helpers (moved from sx_components)
+# ---------------------------------------------------------------------------
+
+def _raw_html_sx(html: str) -> str:
+ """Wrap raw HTML in (raw! "...") so it's valid inside sx source."""
+ from shared.sx.parser import serialize as sx_serialize
+ if not html:
+ return ""
+ return "(raw! " + sx_serialize(html) + ")"
+
+
+# ---------------------------------------------------------------------------
+# Page helpers (async functions available in .sx defpage expressions)
+# ---------------------------------------------------------------------------
+
+def _register_blog_helpers() -> None:
+ from shared.sx.pages import register_page_helpers
+ register_page_helpers("blog", {
+ "editor-content": _h_editor_content,
+ "editor-page-content": _h_editor_page_content,
+ "post-admin-content": _h_post_admin_content,
+ "post-data-content": _h_post_data_content,
+ "post-preview-content": _h_post_preview_content,
+ "post-entries-content": _h_post_entries_content,
+ "post-settings-content": _h_post_settings_content,
+ "post-edit-content": _h_post_edit_content,
+ })
+
+
+# --- Editor helpers ---
+
+async def _h_editor_content(**kw):
+ from .renders import render_editor_panel
+ return await render_editor_panel()
+
+
+async def _h_editor_page_content(**kw):
+ from .renders import render_editor_panel
+ return await render_editor_panel(is_page=True)
+
+
+# --- Post admin helpers ---
+
+async def _h_post_admin_content(slug=None, **kw):
+ await _ensure_post_data(slug)
+ return '(div :class "pb-8")'
+
+
+async def _h_post_data_content(slug=None, **kw):
+ await _ensure_post_data(slug)
+ from quart import g
+ from markupsafe import escape as esc
+
+ original_post = getattr(g, "post_data", {}).get("original_post")
+ if original_post is None:
+ return _raw_html_sx('No post data available.
')
+
+ tablename = getattr(original_post, "__tablename__", "?")
+
+ def _render_scalar_table(obj):
+ rows = []
+ for col in obj.__mapper__.columns:
+ key = col.key
+ if key == "_sa_instance_state":
+ continue
+ val = getattr(obj, key, None)
+ if val is None:
+ val_html = '\u2014 '
+ elif hasattr(val, "isoformat"):
+ val_html = f'{esc(val.isoformat())} '
+ elif isinstance(val, str):
+ val_html = f'{esc(val)} '
+ else:
+ val_html = f'{esc(str(val))} '
+ rows.append(
+ f''
+ f'{esc(key)} '
+ f'{val_html} '
+ )
+ return (
+ ''
+ '
'
+ ''
+ 'Field '
+ 'Value '
+ ' ' + "".join(rows) + '
'
+ )
+
+ def _render_model(obj, depth=0, max_depth=2):
+ parts = [_render_scalar_table(obj)]
+ rel_parts = []
+ for rel in obj.__mapper__.relationships:
+ rel_name = rel.key
+ loaded = rel_name in obj.__dict__
+ value = getattr(obj, rel_name, None) if loaded else None
+ cardinality = "many" if rel.uselist else "one"
+ cls_name = rel.mapper.class_.__name__
+ loaded_label = "" if loaded else " \u2022 not loaded "
+
+ inner = ""
+ if value is None:
+ inner = '\u2014 '
+ elif rel.uselist:
+ items = list(value) if value else []
+ inner = f'{len(items)} item{"" if len(items) == 1 else "s"}
'
+ if items and depth < max_depth:
+ sub_rows = []
+ for i, it in enumerate(items, 1):
+ ident_parts = []
+ for k in ("id", "ghost_id", "uuid", "slug", "name", "title"):
+ if k in it.__mapper__.c:
+ v = getattr(it, k, "")
+ ident_parts.append(f"{k}={v}")
+ summary = " \u2022 ".join(ident_parts) if ident_parts else str(it)
+ child_html = ""
+ if depth < max_depth:
+ child_html = f'{_render_model(it, depth + 1, max_depth)}
'
+ else:
+ child_html = '\u2026max depth reached\u2026
'
+ sub_rows.append(
+ f''
+ f'{i} '
+ f'{esc(summary)} {child_html} '
+ )
+ inner += (
+ ''
+ '
'
+ '# '
+ 'Summary '
+ + "".join(sub_rows) + '
'
+ )
+ else:
+ child = value
+ ident_parts = []
+ for k in ("id", "ghost_id", "uuid", "slug", "name", "title"):
+ if k in child.__mapper__.c:
+ v = getattr(child, k, "")
+ ident_parts.append(f"{k}={v}")
+ summary = " \u2022 ".join(ident_parts) if ident_parts else str(child)
+ inner = f'{esc(summary)} '
+ if depth < max_depth:
+ inner += f'{_render_model(child, depth + 1, max_depth)}
'
+ else:
+ inner += '\u2026max depth reached\u2026
'
+
+ rel_parts.append(
+ f''
+ f'
'
+ f'Relationship: {esc(rel_name)} '
+ f' {cardinality} \u2192 {esc(cls_name)}{loaded_label}
'
+ f'
{inner}
'
+ )
+ if rel_parts:
+ parts.append('' + "".join(rel_parts) + '
')
+ return '' + "".join(parts) + '
'
+
+ html = (
+ f''
+ f'
Model: Post \u2022 Table: {esc(tablename)}
'
+ f'{_render_model(original_post, 0, 2)}
'
+ )
+ return _raw_html_sx(html)
+
+
+async def _h_post_preview_content(slug=None, **kw):
+ await _ensure_post_data(slug)
+ from quart import g
+ from shared.services.registry import services
+ from shared.sx.helpers import render_to_sx
+ from shared.sx.parser import SxExpr, serialize as sx_serialize
+
+ preview = await services.blog_page.preview_data(g.s)
+
+ sections: list[str] = []
+ if preview.get("sx_pretty"):
+ sections.append(await render_to_sx("blog-preview-section",
+ title="S-Expression Source", content=SxExpr(preview["sx_pretty"])))
+ if preview.get("json_pretty"):
+ sections.append(await render_to_sx("blog-preview-section",
+ title="Lexical JSON", content=SxExpr(preview["json_pretty"])))
+ if preview.get("sx_rendered"):
+ rendered_sx = f'(div :class "blog-content prose max-w-none" (raw! {sx_serialize(preview["sx_rendered"])}))'
+ sections.append(await render_to_sx("blog-preview-section",
+ title="SX Rendered", content=SxExpr(rendered_sx)))
+ if preview.get("lex_rendered"):
+ rendered_sx = f'(div :class "blog-content prose max-w-none" (raw! {sx_serialize(preview["lex_rendered"])}))'
+ sections.append(await render_to_sx("blog-preview-section",
+ title="Lexical Rendered", content=SxExpr(rendered_sx)))
+
+ if not sections:
+ return '(div :class "p-8 text-stone-500" "No content to preview.")'
+
+ inner = " ".join(sections)
+ return await render_to_sx("blog-preview-panel", sections=SxExpr(f"(<> {inner})"))
+
+
+async def _h_post_entries_content(slug=None, **kw):
+ await _ensure_post_data(slug)
+ from quart import g, url_for as qurl
+ from sqlalchemy import select
+ from markupsafe import escape as esc
+ from shared.models.calendars import Calendar
+ from shared.utils import host_url
+ from bp.post.services.entry_associations import get_post_entry_ids
+ from bp.post.admin.routes import _render_associated_entries
+
+ post_id = g.post_data["post"]["id"]
+ post_slug = g.post_data["post"]["slug"]
+ associated_entry_ids = await get_post_entry_ids(post_id)
+ result = await g.s.execute(
+ select(Calendar)
+ .where(Calendar.deleted_at.is_(None))
+ .order_by(Calendar.name.asc())
+ )
+ all_calendars = result.scalars().all()
+ for calendar in all_calendars:
+ await g.s.refresh(calendar, ["entries", "post"])
+
+ # Associated entries list
+ assoc_html = await _render_associated_entries(all_calendars, associated_entry_ids, post_slug)
+
+ # Calendar browser
+ cal_items: list[str] = []
+ for cal in all_calendars:
+ cal_post = getattr(cal, "post", None)
+ cal_fi = getattr(cal_post, "feature_image", None) if cal_post else None
+ cal_title = esc(getattr(cal_post, "title", "")) if cal_post else ""
+ cal_name = esc(getattr(cal, "name", ""))
+ cal_view_url = host_url(qurl("blog.post.admin.calendar_view", slug=post_slug, calendar_id=cal.id))
+
+ img_html = (
+ f' '
+ if cal_fi else
+ '
'
+ )
+ cal_items.append(
+ f''
+ f''
+ f'{img_html}'
+ f''
+ f'
{cal_name}
'
+ f'
{cal_title}
'
+ f'
'
+ f''
+ f'
Loading calendar...
'
+ f'
'
+ )
+
+ if cal_items:
+ browser_html = (
+ '
Browse Calendars '
+ + "".join(cal_items) + ''
+ )
+ else:
+ browser_html = 'Browse Calendars No calendars found.
'
+
+ return (
+ _raw_html_sx('')
+ + assoc_html
+ + _raw_html_sx(browser_html + '
')
+ )
+
+
+async def _h_post_settings_content(slug=None, **kw):
+ await _ensure_post_data(slug)
+ from quart import g, request
+ from markupsafe import escape as esc
+ from models.ghost_content import Post
+ from sqlalchemy import select as sa_select
+ from sqlalchemy.orm import selectinload
+ from shared.browser.app.csrf import generate_csrf_token
+ from bp.post.admin.routes import _post_to_edit_dict
+
+ post_id = g.post_data["post"]["id"]
+ post = (await g.s.execute(
+ sa_select(Post)
+ .where(Post.id == post_id)
+ .options(selectinload(Post.tags))
+ )).scalar_one_or_none()
+ ghost_post = _post_to_edit_dict(post) if post else {}
+ save_success = request.args.get("saved") == "1"
+ csrf = generate_csrf_token()
+
+ p = g.post_data.get("post", {}) if hasattr(g, "post_data") else {}
+ is_page = p.get("is_page", False)
+ gp = ghost_post
+
+ def field_label(text, field_for=None):
+ for_attr = f' for="{field_for}"' if field_for else ''
+ return f'{esc(text)} '
+
+ input_cls = ('w-full text-[14px] rounded-[6px] border border-stone-200 px-[10px] py-[7px] '
+ 'bg-white text-stone-700 placeholder:text-stone-300 '
+ 'focus:outline-none focus:border-stone-400 focus:ring-1 focus:ring-stone-300')
+ textarea_cls = input_cls + ' resize-y'
+
+ def text_input(name, value='', placeholder='', input_type='text', maxlength=None):
+ ml = f' maxlength="{maxlength}"' if maxlength else ''
+ return (f' ')
+
+ def textarea_input(name, value='', placeholder='', rows=3, maxlength=None):
+ ml = f' maxlength="{maxlength}"' if maxlength else ''
+ return (f'')
+
+ def checkbox_input(name, checked=False, label=''):
+ chk = ' checked' if checked else ''
+ return (f''
+ f' '
+ f'{esc(label)} ')
+
+ def section(title, content, is_open=False):
+ open_attr = ' open' if is_open else ''
+ return (f''
+ f'{esc(title)} '
+ f'{content}
')
+
+ # General section
+ slug_placeholder = 'page-slug' if is_page else 'post-slug'
+ pub_at = gp.get("published_at") or ""
+ pub_at_val = pub_at[:16] if pub_at else ""
+ vis = gp.get("visibility") or "public"
+ vis_opts = "".join(
+ f'{l} '
+ for v, l in [("public", "Public"), ("members", "Members"), ("paid", "Paid")]
+ )
+
+ general = (
+ f'{field_label("Slug", "settings-slug")}{text_input("slug", gp.get("slug") or "", slug_placeholder)}
'
+ f'{field_label("Published at", "settings-published_at")}'
+ f'
'
+ f'{checkbox_input("featured", gp.get("featured"), "Featured page" if is_page else "Featured post")}
'
+ f'{field_label("Visibility", "settings-visibility")}'
+ f'{vis_opts}
'
+ f'{checkbox_input("email_only", gp.get("email_only"), "Email only")}
'
+ )
+
+ # Tags
+ tags = gp.get("tags") or []
+ if tags:
+ tag_names = ", ".join(getattr(t, "name", t.get("name", "") if isinstance(t, dict) else str(t)) for t in tags)
+ else:
+ tag_names = ""
+ tags_sec = (
+ f'{field_label("Tags (comma-separated)", "settings-tags")}'
+ f'{text_input("tags", tag_names, "news, updates, featured")}'
+ f'
Unknown tags will be created automatically.
'
+ )
+
+ fi_sec = f'{field_label("Alt text", "settings-feature_image_alt")}{text_input("feature_image_alt", gp.get("feature_image_alt") or "", "Describe the feature image")}
'
+
+ seo_sec = (
+ f'{field_label("Meta title", "settings-meta_title")}{text_input("meta_title", gp.get("meta_title") or "", "SEO title", maxlength=300)}'
+ f'
Recommended: 70 characters. Max: 300.
'
+ f'{field_label("Meta description", "settings-meta_description")}{textarea_input("meta_description", gp.get("meta_description") or "", "SEO description", rows=2, maxlength=500)}'
+ f'
Recommended: 156 characters.
'
+ f'{field_label("Canonical URL", "settings-canonical_url")}{text_input("canonical_url", gp.get("canonical_url") or "", "https://example.com/original-post", input_type="url")}
'
+ )
+
+ og_sec = (
+ f'{field_label("OG title", "settings-og_title")}{text_input("og_title", gp.get("og_title") or "")}
'
+ f'{field_label("OG description", "settings-og_description")}{textarea_input("og_description", gp.get("og_description") or "", rows=2)}
'
+ f'{field_label("OG image URL", "settings-og_image")}{text_input("og_image", gp.get("og_image") or "", "https://...", input_type="url")}
'
+ )
+
+ tw_sec = (
+ f'{field_label("Twitter title", "settings-twitter_title")}{text_input("twitter_title", gp.get("twitter_title") or "")}
'
+ f'{field_label("Twitter description", "settings-twitter_description")}{textarea_input("twitter_description", gp.get("twitter_description") or "", rows=2)}
'
+ f'{field_label("Twitter image URL", "settings-twitter_image")}{text_input("twitter_image", gp.get("twitter_image") or "", "https://...", input_type="url")}
'
+ )
+
+ tmpl_placeholder = 'custom-page.hbs' if is_page else 'custom-post.hbs'
+ adv_sec = f'{field_label("Custom template", "settings-custom_template")}{text_input("custom_template", gp.get("custom_template") or "", tmpl_placeholder)}
'
+
+ sections = (
+ section("General", general, is_open=True)
+ + section("Tags", tags_sec)
+ + section("Feature Image", fi_sec)
+ + section("SEO / Meta", seo_sec)
+ + section("Facebook / OpenGraph", og_sec)
+ + section("X / Twitter", tw_sec)
+ + section("Advanced", adv_sec)
+ )
+
+ saved_html = 'Saved. ' if save_success else ''
+
+ html = (
+ f''
+ )
+ return _raw_html_sx(html)
+
+
+async def _h_post_edit_content(slug=None, **kw):
+ await _ensure_post_data(slug)
+ import os
+ from quart import g, request as qrequest, url_for as qurl, current_app
+ from models.ghost_content import Post
+ from sqlalchemy import select as sa_select
+ from sqlalchemy.orm import selectinload
+ from shared.infrastructure.data_client import fetch_data
+ from shared.browser.app.csrf import generate_csrf_token
+ from shared.sx.helpers import render_to_sx
+ from shared.sx.parser import SxExpr, serialize as sx_serialize
+ from bp.post.admin.routes import _post_to_edit_dict
+
+ post_id = g.post_data["post"]["id"]
+ db_post = (await g.s.execute(
+ sa_select(Post)
+ .where(Post.id == post_id)
+ .options(selectinload(Post.tags))
+ )).scalar_one_or_none()
+ ghost_post = _post_to_edit_dict(db_post) if db_post else {}
+ save_success = qrequest.args.get("saved") == "1"
+ save_error = qrequest.args.get("error", "")
+ raw_newsletters = await fetch_data("account", "newsletters", required=False) or []
+ from types import SimpleNamespace
+ newsletters = [SimpleNamespace(**nl) for nl in raw_newsletters]
+
+ csrf = generate_csrf_token()
+ asset_url_fn = current_app.jinja_env.globals.get("asset_url", lambda p: "")
+ editor_css = asset_url_fn("scripts/editor.css")
+ editor_js = asset_url_fn("scripts/editor.js")
+ sx_editor_js = asset_url_fn("scripts/sx-editor.js")
+
+ upload_image_url = qurl("blog.editor_api.upload_image")
+ upload_media_url = qurl("blog.editor_api.upload_media")
+ upload_file_url = qurl("blog.editor_api.upload_file")
+ oembed_url = qurl("blog.editor_api.oembed_proxy")
+ snippets_url = qurl("blog.editor_api.list_snippets")
+ unsplash_key = os.environ.get("UNSPLASH_ACCESS_KEY", "")
+
+ post = g.post_data.get("post", {}) if hasattr(g, "post_data") else {}
+ is_page = post.get("is_page", False)
+
+ feature_image = ghost_post.get("feature_image") or ""
+ feature_image_caption = ghost_post.get("feature_image_caption") or ""
+ title_val = ghost_post.get("title") or ""
+ excerpt_val = ghost_post.get("custom_excerpt") or ""
+ updated_at = ghost_post.get("updated_at") or ""
+ status = ghost_post.get("status") or "draft"
+ lexical_json = ghost_post.get("lexical") or '{"root":{"children":[{"children":[],"direction":null,"format":"","indent":0,"type":"paragraph","version":1}],"direction":null,"format":"","indent":0,"type":"root","version":1}}'
+ sx_content = ghost_post.get("sx_content") or ""
+ has_sx = bool(sx_content)
+
+ already_emailed = bool(ghost_post and ghost_post.get("email") and (ghost_post["email"] if isinstance(ghost_post["email"], dict) else {}).get("status"))
+ email_obj = ghost_post.get("email")
+ if email_obj and not isinstance(email_obj, dict):
+ already_emailed = bool(getattr(email_obj, "status", None))
+
+ title_placeholder = "Page title..." if is_page else "Post title..."
+
+ # Newsletter options as SX fragment
+ nl_parts = ['(option :value "" "Select newsletter\u2026")']
+ for nl in newsletters:
+ nl_slug = sx_serialize(getattr(nl, "slug", ""))
+ nl_name = sx_serialize(getattr(nl, "name", ""))
+ nl_parts.append(f"(option :value {nl_slug} {nl_name})")
+ nl_opts_sx = SxExpr("(<> " + " ".join(nl_parts) + ")")
+
+ # Footer extra badges as SX fragment
+ badge_parts: list[str] = []
+ if save_success:
+ badge_parts.append('(span :class "text-[14px] text-green-600" "Saved.")')
+ publish_requested = qrequest.args.get("publish_requested") if hasattr(qrequest, 'args') else None
+ if publish_requested:
+ badge_parts.append('(span :class "text-[14px] text-blue-600" "Publish requested \u2014 an admin will review.")')
+ if post.get("publish_requested"):
+ badge_parts.append('(span :class "inline-block px-2 py-0.5 rounded-full text-xs font-semibold bg-blue-100 text-blue-800" "Publish requested")')
+ if already_emailed:
+ nl_name = ""
+ newsletter = ghost_post.get("newsletter")
+ if newsletter:
+ nl_name = getattr(newsletter, "name", "") if not isinstance(newsletter, dict) else newsletter.get("name", "")
+ suffix = f" to {nl_name}" if nl_name else ""
+ badge_parts.append(f'(span :class "inline-block px-2 py-0.5 rounded-full text-xs font-semibold bg-green-100 text-green-800" "Emailed{suffix}")')
+ footer_extra_sx = SxExpr("(<> " + " ".join(badge_parts) + ")") if badge_parts else None
+
+ parts: list[str] = []
+
+ if save_error:
+ parts.append(await render_to_sx("blog-editor-error", error=save_error))
+
+ parts.append(await render_to_sx("blog-editor-edit-form",
+ csrf=csrf,
+ updated_at=str(updated_at),
+ title_val=title_val,
+ excerpt_val=excerpt_val,
+ feature_image=feature_image,
+ feature_image_caption=feature_image_caption,
+ sx_content_val=sx_content,
+ lexical_json=lexical_json,
+ has_sx=has_sx,
+ title_placeholder=title_placeholder,
+ status=status,
+ already_emailed=already_emailed,
+ newsletter_options=nl_opts_sx,
+ footer_extra=footer_extra_sx,
+ ))
+
+ parts.append(await render_to_sx("blog-editor-publish-js", already_emailed=already_emailed))
+ parts.append(await render_to_sx("blog-editor-styles", css_href=editor_css))
+ parts.append(await render_to_sx("sx-editor-styles"))
+
+ init_js = (
+ '(function() {'
+ " function applyEditorFontSize() {"
+ " document.documentElement.style.fontSize = '62.5%';"
+ " document.body.style.fontSize = '1.6rem';"
+ ' }'
+ " function restoreDefaultFontSize() {"
+ " document.documentElement.style.fontSize = '';"
+ " document.body.style.fontSize = '';"
+ ' }'
+ ' applyEditorFontSize();'
+ " document.body.addEventListener('htmx:beforeSwap', function cleanup(e) {"
+ " if (e.detail.target && e.detail.target.id === 'main-panel') {"
+ ' restoreDefaultFontSize();'
+ " document.body.removeEventListener('htmx:beforeSwap', cleanup);"
+ ' }'
+ ' });'
+ ' function init() {'
+ " var csrfToken = document.querySelector('input[name=\"csrf_token\"]').value;"
+ f" var uploadUrl = '{upload_image_url}';"
+ ' var uploadUrls = {'
+ ' image: uploadUrl,'
+ f" media: '{upload_media_url}',"
+ f" file: '{upload_file_url}',"
+ ' };'
+ " var fileInput = document.getElementById('feature-image-file');"
+ " var addBtn = document.getElementById('feature-image-add-btn');"
+ " var deleteBtn = document.getElementById('feature-image-delete-btn');"
+ " var preview = document.getElementById('feature-image-preview');"
+ " var emptyState = document.getElementById('feature-image-empty');"
+ " var filledState = document.getElementById('feature-image-filled');"
+ " var hiddenUrl = document.getElementById('feature-image-input');"
+ " var hiddenCaption = document.getElementById('feature-image-caption-input');"
+ " var captionInput = document.getElementById('feature-image-caption');"
+ " var uploading = document.getElementById('feature-image-uploading');"
+ ' function showFilled(url) {'
+ ' preview.src = url; hiddenUrl.value = url;'
+ " emptyState.classList.add('hidden'); filledState.classList.remove('hidden'); uploading.classList.add('hidden');"
+ ' }'
+ ' function showEmpty() {'
+ " preview.src = ''; hiddenUrl.value = ''; hiddenCaption.value = ''; captionInput.value = '';"
+ " emptyState.classList.remove('hidden'); filledState.classList.add('hidden'); uploading.classList.add('hidden');"
+ ' }'
+ ' function uploadFile(file) {'
+ " emptyState.classList.add('hidden'); uploading.classList.remove('hidden');"
+ " var fd = new FormData(); fd.append('file', file);"
+ " fetch(uploadUrl, { method: 'POST', body: fd, headers: { 'X-CSRFToken': csrfToken } })"
+ " .then(function(r) { if (!r.ok) throw new Error('Upload failed (' + r.status + ')'); return r.json(); })"
+ ' .then(function(data) {'
+ ' var url = data.images && data.images[0] && data.images[0].url;'
+ " if (url) showFilled(url); else { showEmpty(); alert('Upload succeeded but no image URL returned.'); }"
+ ' })'
+ ' .catch(function(e) { showEmpty(); alert(e.message); });'
+ ' }'
+ " addBtn.addEventListener('click', function() { fileInput.click(); });"
+ " preview.addEventListener('click', function() { fileInput.click(); });"
+ " deleteBtn.addEventListener('click', function(e) { e.stopPropagation(); showEmpty(); });"
+ " fileInput.addEventListener('change', function() {"
+ ' if (fileInput.files && fileInput.files[0]) { uploadFile(fileInput.files[0]); fileInput.value = \'\'; }'
+ ' });'
+ " captionInput.addEventListener('input', function() { hiddenCaption.value = captionInput.value; });"
+ " var excerpt = document.querySelector('textarea[name=\"custom_excerpt\"]');"
+ " function autoResize() { excerpt.style.height = 'auto'; excerpt.style.height = excerpt.scrollHeight + 'px'; }"
+ " excerpt.addEventListener('input', autoResize); autoResize();"
+ ' var dataEl = document.getElementById(\'lexical-initial-data\');'
+ ' var initialJson = dataEl ? dataEl.textContent.trim() : null;'
+ ' if (initialJson) { var hidden = document.getElementById(\'lexical-json-input\'); if (hidden) hidden.value = initialJson; }'
+ " window.mountEditor('lexical-editor', {"
+ ' initialJson: initialJson,'
+ ' csrfToken: csrfToken,'
+ ' uploadUrls: uploadUrls,'
+ f" oembedUrl: '{oembed_url}',"
+ f" unsplashApiKey: '{unsplash_key}',"
+ f" snippetsUrl: '{snippets_url}',"
+ ' });'
+ " if (typeof SxEditor !== 'undefined') {"
+ " SxEditor.mount('sx-editor', {"
+ " initialSx: (document.getElementById('sx-content-input') || {}).value || null,"
+ ' csrfToken: csrfToken,'
+ ' uploadUrls: uploadUrls,'
+ f" oembedUrl: '{oembed_url}',"
+ ' onChange: function(sx) {'
+ " document.getElementById('sx-content-input').value = sx;"
+ ' }'
+ ' });'
+ ' }'
+ " document.addEventListener('keydown', function(e) {"
+ " if ((e.ctrlKey || e.metaKey) && e.key === 's') {"
+ " e.preventDefault(); document.getElementById('post-edit-form').requestSubmit();"
+ ' }'
+ ' });'
+ ' }'
+ " if (typeof window.mountEditor === 'function') { init(); }"
+ ' else { var _t = setInterval(function() {'
+ " if (typeof window.mountEditor === 'function') { clearInterval(_t); init(); }"
+ ' }, 50); }'
+ '})();'
+ )
+ parts.append(await render_to_sx("blog-editor-scripts",
+ js_src=editor_js,
+ sx_editor_js_src=sx_editor_js,
+ init_js=init_js))
+
+ return await render_to_sx("blog-editor-panel",
+ parts=SxExpr("(<> " + " ".join(parts) + ")"))
diff --git a/blog/sxc/pages/layouts.py b/blog/sxc/pages/layouts.py
new file mode 100644
index 0000000..aeb22c6
--- /dev/null
+++ b/blog/sxc/pages/layouts.py
@@ -0,0 +1,217 @@
+"""Blog layout functions for defpage rendering."""
+from __future__ import annotations
+
+from typing import Any
+
+
+# ---------------------------------------------------------------------------
+# Header helpers (moved from sx_components — thin render_to_sx wrappers)
+# ---------------------------------------------------------------------------
+
+async def _blog_header_sx(ctx: dict, *, oob: bool = False) -> str:
+ from shared.sx.helpers import render_to_sx
+ from shared.sx.parser import SxExpr
+ return await render_to_sx("menu-row-sx",
+ id="blog-row", level=1,
+ link_label_content=SxExpr("(div)"),
+ child_id="blog-header-child", oob=oob)
+
+
+async def _settings_header_sx(ctx: dict, *, oob: bool = False) -> str:
+ from shared.sx.helpers import render_to_sx
+ from shared.sx.parser import SxExpr
+ from quart import url_for as qurl
+
+ settings_href = qurl("settings.defpage_settings_home")
+ label_sx = await render_to_sx("blog-admin-label")
+ nav_sx = await _settings_nav_sx(ctx)
+
+ return await render_to_sx("menu-row-sx",
+ id="root-settings-row", level=1,
+ link_href=settings_href,
+ link_label_content=SxExpr(label_sx),
+ nav=SxExpr(nav_sx) if nav_sx else None,
+ child_id="root-settings-header-child", oob=oob)
+
+
+async def _settings_nav_sx(ctx: dict) -> str:
+ from shared.sx.helpers import render_to_sx
+ return await render_to_sx("blog-settings-nav",
+ select_colours=ctx.get("select_colours", ""))
+
+
+async def _sub_settings_header_sx(row_id: str, child_id: str, href: str,
+ icon: str, label: str, ctx: dict,
+ *, oob: bool = False, nav_sx: str = "") -> str:
+ from shared.sx.helpers import render_to_sx
+ from shared.sx.parser import SxExpr
+
+ label_sx = await render_to_sx("blog-sub-settings-label",
+ icon=f"fa fa-{icon}", label=label)
+ return await render_to_sx("menu-row-sx",
+ id=row_id, level=2,
+ link_href=href,
+ link_label_content=SxExpr(label_sx),
+ nav=SxExpr(nav_sx) if nav_sx else None,
+ child_id=child_id, oob=oob)
+
+
+# ---------------------------------------------------------------------------
+# Layouts
+# ---------------------------------------------------------------------------
+
+def _register_blog_layouts() -> None:
+ from shared.sx.layouts import register_custom_layout
+ register_custom_layout("blog", _blog_full, _blog_oob)
+ register_custom_layout("blog-settings", _settings_full, _settings_oob,
+ mobile_fn=_settings_mobile)
+ register_custom_layout("blog-cache", _cache_full, _cache_oob)
+ register_custom_layout("blog-snippets", _snippets_full, _snippets_oob)
+ register_custom_layout("blog-menu-items", _menu_items_full, _menu_items_oob)
+ register_custom_layout("blog-tag-groups", _tag_groups_full, _tag_groups_oob)
+ register_custom_layout("blog-tag-group-edit",
+ _tag_group_edit_full, _tag_group_edit_oob)
+
+
+# --- Blog layout (root + blog header) ---
+
+async def _blog_full(ctx: dict, **kw: Any) -> str:
+ from shared.sx.helpers import render_to_sx_with_env, _ctx_to_env
+ from shared.sx.parser import SxExpr
+ return await render_to_sx_with_env("blog-layout-full", _ctx_to_env(ctx),
+ blog_header=SxExpr(await _blog_header_sx(ctx)))
+
+
+async def _blog_oob(ctx: dict, **kw: Any) -> str:
+ from shared.sx.helpers import render_to_sx_with_env, _ctx_to_env, oob_header_sx
+ from shared.sx.parser import SxExpr
+ rows = await render_to_sx_with_env("blog-layout-full", _ctx_to_env(ctx),
+ blog_header=SxExpr(await _blog_header_sx(ctx)))
+ return await oob_header_sx("root-header-child", "blog-header-child", rows)
+
+
+# --- Settings layout (root + settings header) ---
+
+async def _settings_full(ctx: dict, **kw: Any) -> str:
+ from shared.sx.helpers import render_to_sx_with_env, _ctx_to_env
+ from shared.sx.parser import SxExpr
+ return await render_to_sx_with_env("settings-layout-full", _ctx_to_env(ctx),
+ settings_header=SxExpr(await _settings_header_sx(ctx)))
+
+
+async def _settings_oob(ctx: dict, **kw: Any) -> str:
+ from shared.sx.helpers import render_to_sx_with_env, _ctx_to_env, oob_header_sx
+ from shared.sx.parser import SxExpr
+ rows = await render_to_sx_with_env("settings-layout-full", _ctx_to_env(ctx),
+ settings_header=SxExpr(await _settings_header_sx(ctx)))
+ return await oob_header_sx("root-header-child", "root-settings-header-child", rows)
+
+
+async def _settings_mobile(ctx: dict, **kw: Any) -> str:
+ return await _settings_nav_sx(ctx)
+
+
+# --- Sub-settings helpers ---
+
+async def _sub_settings_full(ctx: dict, row_id: str, child_id: str,
+ endpoint: str, icon: str, label: str) -> str:
+ from shared.sx.helpers import render_to_sx_with_env, _ctx_to_env
+ from shared.sx.parser import SxExpr
+ from quart import url_for as qurl
+ return await render_to_sx_with_env("sub-settings-layout-full", _ctx_to_env(ctx),
+ settings_header=SxExpr(await _settings_header_sx(ctx)),
+ sub_header=SxExpr(await _sub_settings_header_sx(
+ row_id, child_id, qurl(endpoint), icon, label, ctx)))
+
+
+async def _sub_settings_oob(ctx: dict, row_id: str, child_id: str,
+ endpoint: str, icon: str, label: str) -> str:
+ from shared.sx.helpers import oob_header_sx, render_to_sx
+ from shared.sx.parser import SxExpr
+ from quart import url_for as qurl
+ settings_hdr_oob = await _settings_header_sx(ctx, oob=True)
+ sub_hdr = await _sub_settings_header_sx(
+ row_id, child_id, qurl(endpoint), icon, label, ctx)
+ sub_oob = await oob_header_sx("root-settings-header-child", child_id, sub_hdr)
+ return await render_to_sx("sub-settings-layout-oob",
+ settings_header_oob=SxExpr(settings_hdr_oob),
+ sub_header_oob=SxExpr(sub_oob))
+
+
+# --- Cache ---
+
+async def _cache_full(ctx: dict, **kw: Any) -> str:
+ return await _sub_settings_full(ctx, "cache-row", "cache-header-child",
+ "defpage_cache_page", "refresh", "Cache")
+
+
+async def _cache_oob(ctx: dict, **kw: Any) -> str:
+ return await _sub_settings_oob(ctx, "cache-row", "cache-header-child",
+ "defpage_cache_page", "refresh", "Cache")
+
+
+# --- Snippets ---
+
+async def _snippets_full(ctx: dict, **kw: Any) -> str:
+ return await _sub_settings_full(ctx, "snippets-row", "snippets-header-child",
+ "defpage_snippets_page", "puzzle-piece", "Snippets")
+
+
+async def _snippets_oob(ctx: dict, **kw: Any) -> str:
+ return await _sub_settings_oob(ctx, "snippets-row", "snippets-header-child",
+ "defpage_snippets_page", "puzzle-piece", "Snippets")
+
+
+# --- Menu Items ---
+
+async def _menu_items_full(ctx: dict, **kw: Any) -> str:
+ return await _sub_settings_full(ctx, "menu_items-row", "menu_items-header-child",
+ "defpage_menu_items_page", "bars", "Menu Items")
+
+
+async def _menu_items_oob(ctx: dict, **kw: Any) -> str:
+ return await _sub_settings_oob(ctx, "menu_items-row", "menu_items-header-child",
+ "defpage_menu_items_page", "bars", "Menu Items")
+
+
+# --- Tag Groups ---
+
+async def _tag_groups_full(ctx: dict, **kw: Any) -> str:
+ return await _sub_settings_full(ctx, "tag-groups-row", "tag-groups-header-child",
+ "defpage_tag_groups_page", "tags", "Tag Groups")
+
+
+async def _tag_groups_oob(ctx: dict, **kw: Any) -> str:
+ return await _sub_settings_oob(ctx, "tag-groups-row", "tag-groups-header-child",
+ "defpage_tag_groups_page", "tags", "Tag Groups")
+
+
+# --- Tag Group Edit ---
+
+async def _tag_group_edit_full(ctx: dict, **kw: Any) -> str:
+ from quart import request, url_for as qurl
+ from shared.sx.helpers import render_to_sx_with_env, _ctx_to_env
+ from shared.sx.parser import SxExpr
+ g_id = (request.view_args or {}).get("id")
+ return await render_to_sx_with_env("sub-settings-layout-full", _ctx_to_env(ctx),
+ settings_header=SxExpr(await _settings_header_sx(ctx)),
+ sub_header=SxExpr(await _sub_settings_header_sx(
+ "tag-groups-row", "tag-groups-header-child",
+ qurl("defpage_tag_group_edit", id=g_id),
+ "tags", "Tag Groups", ctx)))
+
+
+async def _tag_group_edit_oob(ctx: dict, **kw: Any) -> str:
+ from quart import request, url_for as qurl
+ from shared.sx.helpers import oob_header_sx, render_to_sx
+ from shared.sx.parser import SxExpr
+ g_id = (request.view_args or {}).get("id")
+ settings_hdr_oob = await _settings_header_sx(ctx, oob=True)
+ sub_hdr = await _sub_settings_header_sx(
+ "tag-groups-row", "tag-groups-header-child",
+ qurl("defpage_tag_group_edit", id=g_id),
+ "tags", "Tag Groups", ctx)
+ sub_oob = await oob_header_sx("root-settings-header-child", "tag-groups-header-child", sub_hdr)
+ return await render_to_sx("sub-settings-layout-oob",
+ settings_header_oob=SxExpr(settings_hdr_oob),
+ sub_header_oob=SxExpr(sub_oob))
diff --git a/blog/sxc/pages/renders.py b/blog/sxc/pages/renders.py
new file mode 100644
index 0000000..fd08d62
--- /dev/null
+++ b/blog/sxc/pages/renders.py
@@ -0,0 +1,177 @@
+"""Blog editor panel rendering."""
+from __future__ import annotations
+
+
+async def render_editor_panel(save_error: str | None = None, is_page: bool = False) -> str:
+ """Build the WYSIWYG editor panel HTML for new post/page creation."""
+ import os
+ from quart import url_for as qurl, current_app
+ from shared.browser.app.csrf import generate_csrf_token
+ from shared.sx.helpers import render_to_sx
+
+ csrf = generate_csrf_token()
+ asset_url_fn = current_app.jinja_env.globals.get("asset_url", lambda p: "")
+ editor_css = asset_url_fn("scripts/editor.css")
+ editor_js = asset_url_fn("scripts/editor.js")
+ sx_editor_js = asset_url_fn("scripts/sx-editor.js")
+
+ upload_image_url = qurl("blog.editor_api.upload_image")
+ upload_media_url = qurl("blog.editor_api.upload_media")
+ upload_file_url = qurl("blog.editor_api.upload_file")
+ oembed_url = qurl("blog.editor_api.oembed_proxy")
+ snippets_url = qurl("blog.editor_api.list_snippets")
+ unsplash_key = os.environ.get("UNSPLASH_ACCESS_KEY", "")
+
+ title_placeholder = "Page title..." if is_page else "Post title..."
+ create_label = "Create Page" if is_page else "Create Post"
+
+ parts: list[str] = []
+
+ if save_error:
+ parts.append(await render_to_sx("blog-editor-error", error=str(save_error)))
+
+ parts.append(await render_to_sx("blog-editor-form",
+ csrf=csrf, title_placeholder=title_placeholder,
+ create_label=create_label,
+ ))
+
+ parts.append(await render_to_sx("blog-editor-styles", css_href=editor_css))
+ parts.append(await render_to_sx("sx-editor-styles"))
+
+ init_js = (
+ "console.log('[EDITOR-DEBUG] init script running');\n"
+ "(function() {\n"
+ " console.log('[EDITOR-DEBUG] IIFE entered, mountEditor=', typeof window.mountEditor);\n"
+ " function init() {\n"
+ " var csrfToken = document.querySelector('input[name=\"csrf_token\"]').value;\n"
+ f" var uploadUrl = '{upload_image_url}';\n"
+ " var uploadUrls = {\n"
+ " image: uploadUrl,\n"
+ f" media: '{upload_media_url}',\n"
+ f" file: '{upload_file_url}',\n"
+ " };\n"
+ "\n"
+ " var fileInput = document.getElementById('feature-image-file');\n"
+ " var addBtn = document.getElementById('feature-image-add-btn');\n"
+ " var deleteBtn = document.getElementById('feature-image-delete-btn');\n"
+ " var preview = document.getElementById('feature-image-preview');\n"
+ " var emptyState = document.getElementById('feature-image-empty');\n"
+ " var filledState = document.getElementById('feature-image-filled');\n"
+ " var hiddenUrl = document.getElementById('feature-image-input');\n"
+ " var hiddenCaption = document.getElementById('feature-image-caption-input');\n"
+ " var captionInput = document.getElementById('feature-image-caption');\n"
+ " var uploading = document.getElementById('feature-image-uploading');\n"
+ "\n"
+ " function showFilled(url) {\n"
+ " preview.src = url;\n"
+ " hiddenUrl.value = url;\n"
+ " emptyState.classList.add('hidden');\n"
+ " filledState.classList.remove('hidden');\n"
+ " uploading.classList.add('hidden');\n"
+ " }\n"
+ "\n"
+ " function showEmpty() {\n"
+ " preview.src = '';\n"
+ " hiddenUrl.value = '';\n"
+ " hiddenCaption.value = '';\n"
+ " captionInput.value = '';\n"
+ " emptyState.classList.remove('hidden');\n"
+ " filledState.classList.add('hidden');\n"
+ " uploading.classList.add('hidden');\n"
+ " }\n"
+ "\n"
+ " function uploadFile(file) {\n"
+ " emptyState.classList.add('hidden');\n"
+ " uploading.classList.remove('hidden');\n"
+ " var fd = new FormData();\n"
+ " fd.append('file', file);\n"
+ " fetch(uploadUrl, {\n"
+ " method: 'POST',\n"
+ " body: fd,\n"
+ " headers: { 'X-CSRFToken': csrfToken },\n"
+ " })\n"
+ " .then(function(r) {\n"
+ " if (!r.ok) throw new Error('Upload failed (' + r.status + ')');\n"
+ " return r.json();\n"
+ " })\n"
+ " .then(function(data) {\n"
+ " var url = data.images && data.images[0] && data.images[0].url;\n"
+ " if (url) showFilled(url);\n"
+ " else { showEmpty(); alert('Upload succeeded but no image URL returned.'); }\n"
+ " })\n"
+ " .catch(function(e) {\n"
+ " showEmpty();\n"
+ " alert(e.message);\n"
+ " });\n"
+ " }\n"
+ "\n"
+ " addBtn.addEventListener('click', function() { fileInput.click(); });\n"
+ " preview.addEventListener('click', function() { fileInput.click(); });\n"
+ " deleteBtn.addEventListener('click', function(e) {\n"
+ " e.stopPropagation();\n"
+ " showEmpty();\n"
+ " });\n"
+ " fileInput.addEventListener('change', function() {\n"
+ " if (fileInput.files && fileInput.files[0]) {\n"
+ " uploadFile(fileInput.files[0]);\n"
+ " fileInput.value = '';\n"
+ " }\n"
+ " });\n"
+ " captionInput.addEventListener('input', function() {\n"
+ " hiddenCaption.value = captionInput.value;\n"
+ " });\n"
+ "\n"
+ " var excerpt = document.querySelector('textarea[name=\"custom_excerpt\"]');\n"
+ " function autoResize() {\n"
+ " excerpt.style.height = 'auto';\n"
+ " excerpt.style.height = excerpt.scrollHeight + 'px';\n"
+ " }\n"
+ " excerpt.addEventListener('input', autoResize);\n"
+ " autoResize();\n"
+ "\n"
+ " window.mountEditor('lexical-editor', {\n"
+ " initialJson: null,\n"
+ " csrfToken: csrfToken,\n"
+ " uploadUrls: uploadUrls,\n"
+ f" oembedUrl: '{oembed_url}',\n"
+ f" unsplashApiKey: '{unsplash_key}',\n"
+ f" snippetsUrl: '{snippets_url}',\n"
+ " });\n"
+ "\n"
+ " if (typeof SxEditor !== 'undefined') {\n"
+ " SxEditor.mount('sx-editor', {\n"
+ " initialSx: (document.getElementById('sx-content-input') || {}).value || null,\n"
+ " csrfToken: csrfToken,\n"
+ " uploadUrls: uploadUrls,\n"
+ f" oembedUrl: '{oembed_url}',\n"
+ " onChange: function(sx) {\n"
+ " document.getElementById('sx-content-input').value = sx;\n"
+ " }\n"
+ " });\n"
+ " }\n"
+ "\n"
+ " document.addEventListener('keydown', function(e) {\n"
+ " if ((e.ctrlKey || e.metaKey) && e.key === 's') {\n"
+ " e.preventDefault();\n"
+ " document.getElementById('post-new-form').requestSubmit();\n"
+ " }\n"
+ " });\n"
+ " }\n"
+ "\n"
+ " if (typeof window.mountEditor === 'function') {\n"
+ " init();\n"
+ " } else {\n"
+ " var _t = setInterval(function() {\n"
+ " if (typeof window.mountEditor === 'function') { clearInterval(_t); init(); }\n"
+ " }, 50);\n"
+ " }\n"
+ "})();\n"
+ )
+ parts.append(await render_to_sx("blog-editor-scripts",
+ js_src=editor_js,
+ sx_editor_js_src=sx_editor_js,
+ init_js=init_js))
+
+ from shared.sx.parser import SxExpr
+ return await render_to_sx("blog-editor-panel",
+ parts=SxExpr("(<> " + " ".join(parts) + ")")) if parts else ""
diff --git a/cart/bp/cart/global_routes.py b/cart/bp/cart/global_routes.py
index 4a8b855..fa1bebc 100644
--- a/cart/bp/cart/global_routes.py
+++ b/cart/bp/cart/global_routes.py
@@ -151,7 +151,7 @@ def register(url_prefix: str) -> Blueprint:
page_config = await resolve_page_config(g.s, cart, calendar_entries, tickets)
except ValueError as e:
from shared.sx.page import get_template_context
- from sxc.pages import render_checkout_error_page
+ from sxc.pages.renders import render_checkout_error_page
tctx = await get_template_context()
html = await render_checkout_error_page(tctx, error=str(e))
return await make_response(html, 400)
@@ -208,7 +208,7 @@ def register(url_prefix: str) -> Blueprint:
if not hosted_url:
from shared.sx.page import get_template_context
- from sxc.pages import render_checkout_error_page
+ from sxc.pages.renders import render_checkout_error_page
tctx = await get_template_context()
html = await render_checkout_error_page(tctx, error="No hosted checkout URL returned from SumUp.")
return await make_response(html, 500)
diff --git a/cart/bp/cart/page_routes.py b/cart/bp/cart/page_routes.py
index 41e3d28..527cb42 100644
--- a/cart/bp/cart/page_routes.py
+++ b/cart/bp/cart/page_routes.py
@@ -73,7 +73,7 @@ def register(url_prefix: str) -> Blueprint:
if not hosted_url:
from shared.sx.page import get_template_context
- from sxc.pages import render_checkout_error_page
+ from sxc.pages.renders import render_checkout_error_page
tctx = await get_template_context()
html = await render_checkout_error_page(tctx, error="No hosted checkout URL returned from SumUp.")
return await make_response(html, 500)
diff --git a/cart/bp/order/routes.py b/cart/bp/order/routes.py
index 8852ce9..fb6d5ef 100644
--- a/cart/bp/order/routes.py
+++ b/cart/bp/order/routes.py
@@ -57,7 +57,7 @@ def register() -> Blueprint:
if not order:
return await make_response("Order not found", 404)
from shared.sx.page import get_template_context
- from sxc.pages import render_order_page, render_order_oob
+ from sxc.pages.renders import render_order_page, render_order_oob
ctx = await get_template_context()
calendar_entries = ctx.get("calendar_entries")
@@ -122,7 +122,7 @@ def register() -> Blueprint:
if not hosted_url:
from shared.sx.page import get_template_context
- from sxc.pages import render_checkout_error_page
+ from sxc.pages.renders import render_checkout_error_page
tctx = await get_template_context()
html = await render_checkout_error_page(tctx, error="No hosted checkout URL returned from SumUp when trying to reopen payment.", order=order)
return await make_response(html, 500)
diff --git a/cart/bp/orders/routes.py b/cart/bp/orders/routes.py
index aa08254..22150a2 100644
--- a/cart/bp/orders/routes.py
+++ b/cart/bp/orders/routes.py
@@ -138,7 +138,7 @@ def register(url_prefix: str) -> Blueprint:
orders = result.scalars().all()
from shared.sx.page import get_template_context
- from sxc.pages import (
+ from sxc.pages.renders import (
render_orders_page,
render_orders_rows,
render_orders_oob,
diff --git a/cart/bp/page_admin/routes.py b/cart/bp/page_admin/routes.py
index 95ae714..934e003 100644
--- a/cart/bp/page_admin/routes.py
+++ b/cart/bp/page_admin/routes.py
@@ -47,7 +47,7 @@ def register():
g.page_config = SimpleNamespace(**raw_pc) if raw_pc else None
from shared.sx.page import get_template_context
- from sxc.pages import render_cart_payments_panel
+ from sxc.pages.renders import render_cart_payments_panel
ctx = await get_template_context()
html = await render_cart_payments_panel(ctx)
return sx_response(html)
diff --git a/cart/sxc/pages/__init__.py b/cart/sxc/pages/__init__.py
index 06a36f5..439693d 100644
--- a/cart/sxc/pages/__init__.py
+++ b/cart/sxc/pages/__init__.py
@@ -1,14 +1,10 @@
"""Cart defpage setup — registers layouts and loads .sx pages."""
from __future__ import annotations
-from typing import Any
-
-from markupsafe import escape
-from shared.sx.parser import SxExpr
-
def setup_cart_pages() -> None:
"""Register cart-specific layouts and load page definitions."""
+ from .layouts import _register_cart_layouts
_register_cart_layouts()
_load_cart_page_files()
@@ -17,305 +13,3 @@ def _load_cart_page_files() -> None:
import os
from shared.sx.pages import load_page_dir
load_page_dir(os.path.dirname(__file__), "cart")
-
-
-# ---------------------------------------------------------------------------
-# Header helpers (still needed by layouts and render functions)
-# ---------------------------------------------------------------------------
-
-def _ensure_post_ctx(ctx: dict, page_post: Any) -> dict:
- """Ensure ctx has a 'post' dict from page_post DTO."""
- if ctx.get("post") or not page_post:
- return ctx
- return {**ctx, "post": {
- "id": getattr(page_post, "id", None),
- "slug": getattr(page_post, "slug", ""),
- "title": getattr(page_post, "title", ""),
- "feature_image": getattr(page_post, "feature_image", None),
- }}
-
-
-async def _ensure_container_nav(ctx: dict) -> dict:
- """Fetch container_nav if not already present."""
- if ctx.get("container_nav"):
- return ctx
- post = ctx.get("post") or {}
- post_id = post.get("id")
- if not post_id:
- return ctx
- slug = post.get("slug", "")
- from shared.infrastructure.fragments import fetch_fragments
- nav_params = {
- "container_type": "page",
- "container_id": str(post_id),
- "post_slug": slug,
- }
- events_nav, market_nav = await fetch_fragments([
- ("events", "container-nav", nav_params),
- ("market", "container-nav", nav_params),
- ], required=False)
- return {**ctx, "container_nav": events_nav + market_nav}
-
-
-async def _post_header_sx(ctx: dict, page_post: Any, *, oob: bool = False) -> str:
- from shared.sx.helpers import post_header_sx as _shared_post_header_sx
- ctx = _ensure_post_ctx(ctx, page_post)
- ctx = await _ensure_container_nav(ctx)
- return await _shared_post_header_sx(ctx, oob=oob)
-
-
-async def _cart_header_sx(ctx: dict, *, oob: bool = False) -> str:
- from shared.sx.helpers import render_to_sx, call_url
- return await render_to_sx(
- "menu-row-sx",
- id="cart-row", level=1, colour="sky",
- link_href=call_url(ctx, "cart_url", "/"),
- link_label="cart", icon="fa fa-shopping-cart",
- child_id="cart-header-child", oob=oob,
- )
-
-
-async def _page_cart_header_sx(ctx: dict, page_post: Any, *, oob: bool = False) -> str:
- from shared.sx.helpers import render_to_sx, call_url
- slug = page_post.slug if page_post else ""
- title = ((page_post.title if page_post else None) or "")[:160]
- label_parts = []
- if page_post and page_post.feature_image:
- label_parts.append(await render_to_sx("cart-page-label-img", src=page_post.feature_image))
- label_parts.append(f'(span "{escape(title)}")')
- label_sx = "(<> " + " ".join(label_parts) + ")"
- nav_sx = await render_to_sx("cart-all-carts-link", href=call_url(ctx, "cart_url", "/"))
- return await render_to_sx(
- "menu-row-sx",
- id="page-cart-row", level=2, colour="sky",
- link_href=call_url(ctx, "cart_url", f"/{slug}/"),
- link_label_content=SxExpr(label_sx),
- nav=SxExpr(nav_sx), oob=oob,
- )
-
-
-# ---------------------------------------------------------------------------
-# Order serialization helpers
-# ---------------------------------------------------------------------------
-
-def _serialize_order(order: Any) -> dict:
- from shared.infrastructure.urls import market_product_url
- created = order.created_at.strftime("%-d %b %Y, %H:%M") if order.created_at else "\u2014"
- items = []
- if order.items:
- for item in order.items:
- items.append({
- "product_image": item.product_image,
- "product_title": item.product_title or "Unknown product",
- "product_id": item.product_id,
- "product_slug": item.product_slug,
- "product_url": market_product_url(item.product_slug),
- "quantity": item.quantity,
- "unit_price_formatted": f"{item.unit_price or 0:.2f}",
- "currency": item.currency or order.currency or "GBP",
- })
- return {
- "id": order.id,
- "status": order.status or "pending",
- "created_at_formatted": created,
- "description": order.description or "",
- "total_formatted": f"{order.total_amount or 0:.2f}",
- "total_amount": float(order.total_amount or 0),
- "currency": order.currency or "GBP",
- "items": items,
- }
-
-
-def _serialize_calendar_entry(e: Any) -> dict:
- st = e.state or ""
- ds = e.start_at.strftime("%-d %b %Y, %H:%M") if e.start_at else ""
- if e.end_at:
- ds += f" \u2013 {e.end_at.strftime('%-d %b %Y, %H:%M')}"
- return {"name": e.name, "state": st, "date_str": ds, "cost_formatted": f"{e.cost or 0:.2f}"}
-
-
-# ---------------------------------------------------------------------------
-# Render functions (called by routes) — delegate header composition to .sx
-# ---------------------------------------------------------------------------
-
-async def render_orders_page(ctx, orders, page, total_pages, search, search_count, url_for_fn, qs_fn):
- from shared.sx.helpers import render_to_sx, render_to_sx_with_env, _ctx_to_env, search_desktop_sx, search_mobile_sx, full_page_sx
- from shared.utils import route_prefix
- ctx["search"] = search
- ctx["search_count"] = search_count
- pfx = route_prefix()
- list_url = pfx + url_for_fn("orders.list_orders")
- detail_url_prefix = pfx + url_for_fn("orders.order.order_detail", order_id=0).rsplit("0/", 1)[0]
- order_dicts = [_serialize_order(o) for o in orders]
- content = await render_to_sx("orders-list-content", orders=order_dicts,
- page=page, total_pages=total_pages, rows_url=list_url, detail_url_prefix=detail_url_prefix)
- header_rows = await render_to_sx_with_env("cart-orders-layout-full", _ctx_to_env(ctx),
- list_url=list_url,
- )
- filt = await render_to_sx("order-list-header", search_mobile=SxExpr(await search_mobile_sx(ctx)))
- return await full_page_sx(ctx, header_rows=header_rows, filter=filt,
- aside=await search_desktop_sx(ctx), content=content)
-
-
-async def render_orders_rows(ctx, orders, page, total_pages, url_for_fn, qs_fn):
- from shared.sx.helpers import render_to_sx
- from shared.utils import route_prefix
- pfx = route_prefix()
- list_url = pfx + url_for_fn("orders.list_orders")
- detail_url_prefix = pfx + url_for_fn("orders.order.order_detail", order_id=0).rsplit("0/", 1)[0]
- order_dicts = [_serialize_order(o) for o in orders]
- parts = []
- for od in order_dicts:
- parts.append(await render_to_sx("order-row-pair", order=od, detail_url_prefix=detail_url_prefix))
- next_scroll = ""
- if page < total_pages:
- next_url = list_url + qs_fn(page=page + 1)
- next_scroll = await render_to_sx("infinite-scroll", url=next_url, page=page,
- total_pages=total_pages, id_prefix="orders", colspan=5)
- else:
- next_scroll = await render_to_sx("order-end-row")
- return await render_to_sx("cart-orders-rows",
- rows=SxExpr("(<> " + " ".join(parts) + ")"),
- next_scroll=SxExpr(next_scroll),
- )
-
-
-async def render_orders_oob(ctx, orders, page, total_pages, search, search_count, url_for_fn, qs_fn):
- from shared.sx.helpers import render_to_sx, render_to_sx_with_env, _ctx_to_env, search_desktop_sx, search_mobile_sx, oob_page_sx
- from shared.utils import route_prefix
- ctx["search"] = search
- ctx["search_count"] = search_count
- pfx = route_prefix()
- list_url = pfx + url_for_fn("orders.list_orders")
- detail_url_prefix = pfx + url_for_fn("orders.order.order_detail", order_id=0).rsplit("0/", 1)[0]
- order_dicts = [_serialize_order(o) for o in orders]
- content = await render_to_sx("orders-list-content", orders=order_dicts,
- page=page, total_pages=total_pages, rows_url=list_url, detail_url_prefix=detail_url_prefix)
- oobs = await render_to_sx_with_env("cart-orders-layout-oob", _ctx_to_env(ctx, oob=True),
- list_url=list_url,
- )
- filt = await render_to_sx("order-list-header", search_mobile=SxExpr(await search_mobile_sx(ctx)))
- return await oob_page_sx(oobs=oobs, filter=filt, aside=await search_desktop_sx(ctx), content=content)
-
-
-async def render_order_page(ctx, order, calendar_entries, url_for_fn):
- from shared.sx.helpers import render_to_sx, render_to_sx_with_env, _ctx_to_env, full_page_sx
- from shared.utils import route_prefix
- from shared.browser.app.csrf import generate_csrf_token
- pfx = route_prefix()
- detail_url = pfx + url_for_fn("orders.order.order_detail", order_id=order.id)
- list_url = pfx + url_for_fn("orders.list_orders")
- recheck_url = pfx + url_for_fn("orders.order.order_recheck", order_id=order.id)
- pay_url = pfx + url_for_fn("orders.order.order_pay", order_id=order.id)
- order_data = _serialize_order(order)
- cal_data = [_serialize_calendar_entry(e) for e in (calendar_entries or [])]
- main = await render_to_sx("order-detail-content", order=order_data, calendar_entries=cal_data)
- filt = await render_to_sx("order-detail-filter-content", order=order_data,
- list_url=list_url, recheck_url=recheck_url, pay_url=pay_url, csrf=generate_csrf_token())
- header_rows = await render_to_sx_with_env("cart-order-detail-layout-full", _ctx_to_env(ctx),
- list_url=list_url, detail_url=detail_url,
- order_label=f"Order {order.id}",
- )
- return await full_page_sx(ctx, header_rows=header_rows, filter=filt, content=main)
-
-
-async def render_order_oob(ctx, order, calendar_entries, url_for_fn):
- from shared.sx.helpers import render_to_sx, render_to_sx_with_env, _ctx_to_env, oob_page_sx
- from shared.utils import route_prefix
- from shared.browser.app.csrf import generate_csrf_token
- pfx = route_prefix()
- detail_url = pfx + url_for_fn("orders.order.order_detail", order_id=order.id)
- list_url = pfx + url_for_fn("orders.list_orders")
- recheck_url = pfx + url_for_fn("orders.order.order_recheck", order_id=order.id)
- pay_url = pfx + url_for_fn("orders.order.order_pay", order_id=order.id)
- order_data = _serialize_order(order)
- cal_data = [_serialize_calendar_entry(e) for e in (calendar_entries or [])]
- main = await render_to_sx("order-detail-content", order=order_data, calendar_entries=cal_data)
- filt = await render_to_sx("order-detail-filter-content", order=order_data,
- list_url=list_url, recheck_url=recheck_url, pay_url=pay_url, csrf=generate_csrf_token())
- oobs = await render_to_sx_with_env("cart-order-detail-layout-oob", _ctx_to_env(ctx, oob=True),
- detail_url=detail_url,
- order_label=f"Order {order.id}",
- )
- return await oob_page_sx(oobs=oobs, filter=filt, content=main)
-
-
-async def render_checkout_error_page(ctx, error=None, order=None):
- from shared.sx.helpers import render_to_sx, render_to_sx_with_env, _ctx_to_env, full_page_sx
- from shared.infrastructure.urls import cart_url
- err_msg = error or "Unexpected error while creating the hosted checkout session."
- order_sx = await render_to_sx("checkout-error-order-id", oid=f"#{order.id}") if order else None
- hdr = await render_to_sx_with_env("layout-root-full", _ctx_to_env(ctx))
- filt = await render_to_sx("checkout-error-header")
- content = await render_to_sx("checkout-error-content", msg=err_msg,
- order=SxExpr(order_sx) if order_sx else None, back_url=cart_url("/"))
- return await full_page_sx(ctx, header_rows=hdr, filter=filt, content=content)
-
-
-async def render_cart_payments_panel(ctx):
- from shared.sx.helpers import render_to_sx
- page_config = ctx.get("page_config")
- pc_data = None
- if page_config:
- pc_data = {
- "sumup_api_key": bool(getattr(page_config, "sumup_api_key", None)),
- "sumup_merchant_code": getattr(page_config, "sumup_merchant_code", None) or "",
- "sumup_checkout_prefix": getattr(page_config, "sumup_checkout_prefix", None) or "",
- }
- return await render_to_sx("cart-payments-content", page_config=pc_data)
-
-
-# ---------------------------------------------------------------------------
-# Layouts — thin wrappers delegating to .sx defcomps in cart/sx/layouts.sx
-# ---------------------------------------------------------------------------
-
-def _register_cart_layouts() -> None:
- from shared.sx.layouts import register_custom_layout
- register_custom_layout("cart-page", _cart_page_full, _cart_page_oob)
- register_custom_layout("cart-admin", _cart_admin_full, _cart_admin_oob)
-
-
-async def _cart_page_full(ctx: dict, **kw: Any) -> str:
- from shared.sx.helpers import render_to_sx_with_env, _ctx_to_env
- page_post = ctx.get("page_post")
- env = _ctx_to_env(ctx)
- return await render_to_sx_with_env("cart-page-layout-full", env,
- cart_row=SxExpr(await _cart_header_sx(ctx)),
- page_cart_row=SxExpr(await _page_cart_header_sx(ctx, page_post)),
- )
-
-
-async def _cart_page_oob(ctx: dict, **kw: Any) -> str:
- from shared.sx.helpers import render_to_sx_with_env, _ctx_to_env, root_header_sx
- page_post = ctx.get("page_post")
- env = _ctx_to_env(ctx, oob=True)
- return await render_to_sx_with_env("cart-page-layout-oob", env,
- root_header_oob=SxExpr(await root_header_sx(ctx, oob=True)),
- cart_row_oob=SxExpr(await _cart_header_sx(ctx, oob=True)),
- page_cart_row=SxExpr(await _page_cart_header_sx(ctx, page_post)),
- )
-
-
-async def _cart_admin_full(ctx: dict, **kw: Any) -> str:
- from shared.sx.helpers import render_to_sx_with_env, _ctx_to_env
- page_post = ctx.get("page_post")
- selected = kw.get("selected", "")
- env = _ctx_to_env(ctx)
- return await render_to_sx_with_env("cart-admin-layout-full", env,
- post_header=SxExpr(await _post_header_sx(ctx, page_post)),
- admin_header=SxExpr(await _cart_page_admin_header_sx(ctx, page_post, selected=selected)),
- )
-
-
-async def _cart_admin_oob(ctx: dict, **kw: Any) -> str:
- page_post = ctx.get("page_post")
- selected = kw.get("selected", "")
- return await _cart_page_admin_header_sx(ctx, page_post, oob=True, selected=selected)
-
-
-async def _cart_page_admin_header_sx(ctx: dict, page_post: Any, *, oob: bool = False,
- selected: str = "") -> str:
- from shared.sx.helpers import post_admin_header_sx
- slug = page_post.slug if page_post else ""
- ctx = _ensure_post_ctx(ctx, page_post)
- return await post_admin_header_sx(ctx, slug, oob=oob, selected=selected)
diff --git a/cart/sxc/pages/layouts.py b/cart/sxc/pages/layouts.py
new file mode 100644
index 0000000..1591129
--- /dev/null
+++ b/cart/sxc/pages/layouts.py
@@ -0,0 +1,138 @@
+"""Cart layout registration and header builders."""
+from __future__ import annotations
+
+from typing import Any
+
+from markupsafe import escape
+from shared.sx.parser import SxExpr
+
+
+def _register_cart_layouts() -> None:
+ from shared.sx.layouts import register_custom_layout
+ register_custom_layout("cart-page", _cart_page_full, _cart_page_oob)
+ register_custom_layout("cart-admin", _cart_admin_full, _cart_admin_oob)
+
+
+# ---------------------------------------------------------------------------
+# Header helpers
+# ---------------------------------------------------------------------------
+
+def _ensure_post_ctx(ctx: dict, page_post: Any) -> dict:
+ """Ensure ctx has a 'post' dict from page_post DTO."""
+ if ctx.get("post") or not page_post:
+ return ctx
+ return {**ctx, "post": {
+ "id": getattr(page_post, "id", None),
+ "slug": getattr(page_post, "slug", ""),
+ "title": getattr(page_post, "title", ""),
+ "feature_image": getattr(page_post, "feature_image", None),
+ }}
+
+
+async def _ensure_container_nav(ctx: dict) -> dict:
+ """Fetch container_nav if not already present."""
+ if ctx.get("container_nav"):
+ return ctx
+ post = ctx.get("post") or {}
+ post_id = post.get("id")
+ if not post_id:
+ return ctx
+ slug = post.get("slug", "")
+ from shared.infrastructure.fragments import fetch_fragments
+ nav_params = {
+ "container_type": "page",
+ "container_id": str(post_id),
+ "post_slug": slug,
+ }
+ events_nav, market_nav = await fetch_fragments([
+ ("events", "container-nav", nav_params),
+ ("market", "container-nav", nav_params),
+ ], required=False)
+ return {**ctx, "container_nav": events_nav + market_nav}
+
+
+async def _post_header_sx(ctx: dict, page_post: Any, *, oob: bool = False) -> str:
+ from shared.sx.helpers import post_header_sx as _shared_post_header_sx
+ ctx = _ensure_post_ctx(ctx, page_post)
+ ctx = await _ensure_container_nav(ctx)
+ return await _shared_post_header_sx(ctx, oob=oob)
+
+
+async def _cart_header_sx(ctx: dict, *, oob: bool = False) -> str:
+ from shared.sx.helpers import render_to_sx, call_url
+ return await render_to_sx(
+ "menu-row-sx",
+ id="cart-row", level=1, colour="sky",
+ link_href=call_url(ctx, "cart_url", "/"),
+ link_label="cart", icon="fa fa-shopping-cart",
+ child_id="cart-header-child", oob=oob,
+ )
+
+
+async def _page_cart_header_sx(ctx: dict, page_post: Any, *, oob: bool = False) -> str:
+ from shared.sx.helpers import render_to_sx, call_url
+ slug = page_post.slug if page_post else ""
+ title = ((page_post.title if page_post else None) or "")[:160]
+ label_parts = []
+ if page_post and page_post.feature_image:
+ label_parts.append(await render_to_sx("cart-page-label-img", src=page_post.feature_image))
+ label_parts.append(f'(span "{escape(title)}")')
+ label_sx = "(<> " + " ".join(label_parts) + ")"
+ nav_sx = await render_to_sx("cart-all-carts-link", href=call_url(ctx, "cart_url", "/"))
+ return await render_to_sx(
+ "menu-row-sx",
+ id="page-cart-row", level=2, colour="sky",
+ link_href=call_url(ctx, "cart_url", f"/{slug}/"),
+ link_label_content=SxExpr(label_sx),
+ nav=SxExpr(nav_sx), oob=oob,
+ )
+
+
+async def _cart_page_admin_header_sx(ctx: dict, page_post: Any, *, oob: bool = False,
+ selected: str = "") -> str:
+ from shared.sx.helpers import post_admin_header_sx
+ slug = page_post.slug if page_post else ""
+ ctx = _ensure_post_ctx(ctx, page_post)
+ return await post_admin_header_sx(ctx, slug, oob=oob, selected=selected)
+
+
+# ---------------------------------------------------------------------------
+# Layout functions
+# ---------------------------------------------------------------------------
+
+async def _cart_page_full(ctx: dict, **kw: Any) -> str:
+ from shared.sx.helpers import render_to_sx_with_env, _ctx_to_env
+ page_post = ctx.get("page_post")
+ env = _ctx_to_env(ctx)
+ return await render_to_sx_with_env("cart-page-layout-full", env,
+ cart_row=SxExpr(await _cart_header_sx(ctx)),
+ page_cart_row=SxExpr(await _page_cart_header_sx(ctx, page_post)),
+ )
+
+
+async def _cart_page_oob(ctx: dict, **kw: Any) -> str:
+ from shared.sx.helpers import render_to_sx_with_env, _ctx_to_env, root_header_sx
+ page_post = ctx.get("page_post")
+ env = _ctx_to_env(ctx, oob=True)
+ return await render_to_sx_with_env("cart-page-layout-oob", env,
+ root_header_oob=SxExpr(await root_header_sx(ctx, oob=True)),
+ cart_row_oob=SxExpr(await _cart_header_sx(ctx, oob=True)),
+ page_cart_row=SxExpr(await _page_cart_header_sx(ctx, page_post)),
+ )
+
+
+async def _cart_admin_full(ctx: dict, **kw: Any) -> str:
+ from shared.sx.helpers import render_to_sx_with_env, _ctx_to_env
+ page_post = ctx.get("page_post")
+ selected = kw.get("selected", "")
+ env = _ctx_to_env(ctx)
+ return await render_to_sx_with_env("cart-admin-layout-full", env,
+ post_header=SxExpr(await _post_header_sx(ctx, page_post)),
+ admin_header=SxExpr(await _cart_page_admin_header_sx(ctx, page_post, selected=selected)),
+ )
+
+
+async def _cart_admin_oob(ctx: dict, **kw: Any) -> str:
+ page_post = ctx.get("page_post")
+ selected = kw.get("selected", "")
+ return await _cart_page_admin_header_sx(ctx, page_post, oob=True, selected=selected)
diff --git a/cart/sxc/pages/renders.py b/cart/sxc/pages/renders.py
new file mode 100644
index 0000000..f53027e
--- /dev/null
+++ b/cart/sxc/pages/renders.py
@@ -0,0 +1,133 @@
+"""Cart render functions — called from bp routes."""
+from __future__ import annotations
+
+from shared.sx.parser import SxExpr
+
+from .utils import _serialize_order, _serialize_calendar_entry
+
+
+async def render_orders_page(ctx, orders, page, total_pages, search, search_count, url_for_fn, qs_fn):
+ from shared.sx.helpers import render_to_sx, render_to_sx_with_env, _ctx_to_env, search_desktop_sx, search_mobile_sx, full_page_sx
+ from shared.utils import route_prefix
+ ctx["search"] = search
+ ctx["search_count"] = search_count
+ pfx = route_prefix()
+ list_url = pfx + url_for_fn("orders.list_orders")
+ detail_url_prefix = pfx + url_for_fn("orders.order.order_detail", order_id=0).rsplit("0/", 1)[0]
+ order_dicts = [_serialize_order(o) for o in orders]
+ content = await render_to_sx("orders-list-content", orders=order_dicts,
+ page=page, total_pages=total_pages, rows_url=list_url, detail_url_prefix=detail_url_prefix)
+ header_rows = await render_to_sx_with_env("cart-orders-layout-full", _ctx_to_env(ctx),
+ list_url=list_url,
+ )
+ filt = await render_to_sx("order-list-header", search_mobile=SxExpr(await search_mobile_sx(ctx)))
+ return await full_page_sx(ctx, header_rows=header_rows, filter=filt,
+ aside=await search_desktop_sx(ctx), content=content)
+
+
+async def render_orders_rows(ctx, orders, page, total_pages, url_for_fn, qs_fn):
+ from shared.sx.helpers import render_to_sx
+ from shared.utils import route_prefix
+ pfx = route_prefix()
+ list_url = pfx + url_for_fn("orders.list_orders")
+ detail_url_prefix = pfx + url_for_fn("orders.order.order_detail", order_id=0).rsplit("0/", 1)[0]
+ order_dicts = [_serialize_order(o) for o in orders]
+ parts = []
+ for od in order_dicts:
+ parts.append(await render_to_sx("order-row-pair", order=od, detail_url_prefix=detail_url_prefix))
+ next_scroll = ""
+ if page < total_pages:
+ next_url = list_url + qs_fn(page=page + 1)
+ next_scroll = await render_to_sx("infinite-scroll", url=next_url, page=page,
+ total_pages=total_pages, id_prefix="orders", colspan=5)
+ else:
+ next_scroll = await render_to_sx("order-end-row")
+ return await render_to_sx("cart-orders-rows",
+ rows=SxExpr("(<> " + " ".join(parts) + ")"),
+ next_scroll=SxExpr(next_scroll),
+ )
+
+
+async def render_orders_oob(ctx, orders, page, total_pages, search, search_count, url_for_fn, qs_fn):
+ from shared.sx.helpers import render_to_sx, render_to_sx_with_env, _ctx_to_env, search_desktop_sx, search_mobile_sx, oob_page_sx
+ from shared.utils import route_prefix
+ ctx["search"] = search
+ ctx["search_count"] = search_count
+ pfx = route_prefix()
+ list_url = pfx + url_for_fn("orders.list_orders")
+ detail_url_prefix = pfx + url_for_fn("orders.order.order_detail", order_id=0).rsplit("0/", 1)[0]
+ order_dicts = [_serialize_order(o) for o in orders]
+ content = await render_to_sx("orders-list-content", orders=order_dicts,
+ page=page, total_pages=total_pages, rows_url=list_url, detail_url_prefix=detail_url_prefix)
+ oobs = await render_to_sx_with_env("cart-orders-layout-oob", _ctx_to_env(ctx, oob=True),
+ list_url=list_url,
+ )
+ filt = await render_to_sx("order-list-header", search_mobile=SxExpr(await search_mobile_sx(ctx)))
+ return await oob_page_sx(oobs=oobs, filter=filt, aside=await search_desktop_sx(ctx), content=content)
+
+
+async def render_order_page(ctx, order, calendar_entries, url_for_fn):
+ from shared.sx.helpers import render_to_sx, render_to_sx_with_env, _ctx_to_env, full_page_sx
+ from shared.utils import route_prefix
+ from shared.browser.app.csrf import generate_csrf_token
+ pfx = route_prefix()
+ detail_url = pfx + url_for_fn("orders.order.order_detail", order_id=order.id)
+ list_url = pfx + url_for_fn("orders.list_orders")
+ recheck_url = pfx + url_for_fn("orders.order.order_recheck", order_id=order.id)
+ pay_url = pfx + url_for_fn("orders.order.order_pay", order_id=order.id)
+ order_data = _serialize_order(order)
+ cal_data = [_serialize_calendar_entry(e) for e in (calendar_entries or [])]
+ main = await render_to_sx("order-detail-content", order=order_data, calendar_entries=cal_data)
+ filt = await render_to_sx("order-detail-filter-content", order=order_data,
+ list_url=list_url, recheck_url=recheck_url, pay_url=pay_url, csrf=generate_csrf_token())
+ header_rows = await render_to_sx_with_env("cart-order-detail-layout-full", _ctx_to_env(ctx),
+ list_url=list_url, detail_url=detail_url,
+ order_label=f"Order {order.id}",
+ )
+ return await full_page_sx(ctx, header_rows=header_rows, filter=filt, content=main)
+
+
+async def render_order_oob(ctx, order, calendar_entries, url_for_fn):
+ from shared.sx.helpers import render_to_sx, render_to_sx_with_env, _ctx_to_env, oob_page_sx
+ from shared.utils import route_prefix
+ from shared.browser.app.csrf import generate_csrf_token
+ pfx = route_prefix()
+ detail_url = pfx + url_for_fn("orders.order.order_detail", order_id=order.id)
+ list_url = pfx + url_for_fn("orders.list_orders")
+ recheck_url = pfx + url_for_fn("orders.order.order_recheck", order_id=order.id)
+ pay_url = pfx + url_for_fn("orders.order.order_pay", order_id=order.id)
+ order_data = _serialize_order(order)
+ cal_data = [_serialize_calendar_entry(e) for e in (calendar_entries or [])]
+ main = await render_to_sx("order-detail-content", order=order_data, calendar_entries=cal_data)
+ filt = await render_to_sx("order-detail-filter-content", order=order_data,
+ list_url=list_url, recheck_url=recheck_url, pay_url=pay_url, csrf=generate_csrf_token())
+ oobs = await render_to_sx_with_env("cart-order-detail-layout-oob", _ctx_to_env(ctx, oob=True),
+ detail_url=detail_url,
+ order_label=f"Order {order.id}",
+ )
+ return await oob_page_sx(oobs=oobs, filter=filt, content=main)
+
+
+async def render_checkout_error_page(ctx, error=None, order=None):
+ from shared.sx.helpers import render_to_sx, render_to_sx_with_env, _ctx_to_env, full_page_sx
+ from shared.infrastructure.urls import cart_url
+ err_msg = error or "Unexpected error while creating the hosted checkout session."
+ order_sx = await render_to_sx("checkout-error-order-id", oid=f"#{order.id}") if order else None
+ hdr = await render_to_sx_with_env("layout-root-full", _ctx_to_env(ctx))
+ filt = await render_to_sx("checkout-error-header")
+ content = await render_to_sx("checkout-error-content", msg=err_msg,
+ order=SxExpr(order_sx) if order_sx else None, back_url=cart_url("/"))
+ return await full_page_sx(ctx, header_rows=hdr, filter=filt, content=content)
+
+
+async def render_cart_payments_panel(ctx):
+ from shared.sx.helpers import render_to_sx
+ page_config = ctx.get("page_config")
+ pc_data = None
+ if page_config:
+ pc_data = {
+ "sumup_api_key": bool(getattr(page_config, "sumup_api_key", None)),
+ "sumup_merchant_code": getattr(page_config, "sumup_merchant_code", None) or "",
+ "sumup_checkout_prefix": getattr(page_config, "sumup_checkout_prefix", None) or "",
+ }
+ return await render_to_sx("cart-payments-content", page_config=pc_data)
diff --git a/cart/sxc/pages/utils.py b/cart/sxc/pages/utils.py
new file mode 100644
index 0000000..f9afe9e
--- /dev/null
+++ b/cart/sxc/pages/utils.py
@@ -0,0 +1,40 @@
+"""Cart page utilities — serializers and formatters."""
+from __future__ import annotations
+
+from typing import Any
+
+
+def _serialize_order(order: Any) -> dict:
+ from shared.infrastructure.urls import market_product_url
+ created = order.created_at.strftime("%-d %b %Y, %H:%M") if order.created_at else "\u2014"
+ items = []
+ if order.items:
+ for item in order.items:
+ items.append({
+ "product_image": item.product_image,
+ "product_title": item.product_title or "Unknown product",
+ "product_id": item.product_id,
+ "product_slug": item.product_slug,
+ "product_url": market_product_url(item.product_slug),
+ "quantity": item.quantity,
+ "unit_price_formatted": f"{item.unit_price or 0:.2f}",
+ "currency": item.currency or order.currency or "GBP",
+ })
+ return {
+ "id": order.id,
+ "status": order.status or "pending",
+ "created_at_formatted": created,
+ "description": order.description or "",
+ "total_formatted": f"{order.total_amount or 0:.2f}",
+ "total_amount": float(order.total_amount or 0),
+ "currency": order.currency or "GBP",
+ "items": items,
+ }
+
+
+def _serialize_calendar_entry(e: Any) -> dict:
+ st = e.state or ""
+ ds = e.start_at.strftime("%-d %b %Y, %H:%M") if e.start_at else ""
+ if e.end_at:
+ ds += f" \u2013 {e.end_at.strftime('%-d %b %Y, %H:%M')}"
+ return {"name": e.name, "state": st, "date_str": ds, "cost_formatted": f"{e.cost or 0:.2f}"}
diff --git a/federation/bp/auth/routes.py b/federation/bp/auth/routes.py
index 16139fd..4e25ba4 100644
--- a/federation/bp/auth/routes.py
+++ b/federation/bp/auth/routes.py
@@ -46,7 +46,7 @@ async def _render_social_auth_page(component: str, title: str, **kwargs) -> str:
"""Render an auth page with social layout — replaces sx_components helpers."""
from shared.sx.helpers import render_to_sx
from shared.sx.page import get_template_context
- from sxc.pages import _social_page
+ from sxc.pages.utils import _social_page
ctx = await get_template_context()
content = await render_to_sx(component, **{k: v for k, v in kwargs.items() if v})
return await _social_page(ctx, None, content=content, title=title)
diff --git a/federation/bp/identity/routes.py b/federation/bp/identity/routes.py
index d101c5c..497303e 100644
--- a/federation/bp/identity/routes.py
+++ b/federation/bp/identity/routes.py
@@ -33,7 +33,7 @@ async def _render_choose_username(*, actor=None, error="", username=""):
from shared.sx.helpers import render_to_sx
from shared.sx.parser import SxExpr
from shared.sx.page import get_template_context
- from sxc.pages import _social_page
+ from sxc.pages.utils import _social_page
from markupsafe import escape
ctx = await get_template_context()
diff --git a/federation/bp/social/routes.py b/federation/bp/social/routes.py
index 1967955..56c3582 100644
--- a/federation/bp/social/routes.py
+++ b/federation/bp/social/routes.py
@@ -95,7 +95,7 @@ def register(url_prefix="/social"):
@bp.get("/search/page")
async def search_page():
- from sxc.pages import _serialize_remote_actor, _serialize_actor
+ from sxc.pages.utils import _serialize_remote_actor, _serialize_actor
actor = getattr(g, "_social_actor", None)
query = request.args.get("q", "").strip()
@@ -154,7 +154,7 @@ def register(url_prefix="/social"):
async def _actor_card_response(actor, remote_actor_url, is_followed):
"""Re-render a single actor card after follow/unfollow via HTMX."""
- from sxc.pages import _serialize_remote_actor, _serialize_actor
+ from sxc.pages.utils import _serialize_remote_actor, _serialize_actor
remote_dto = await services.federation.get_or_fetch_remote_actor(
g.s, remote_actor_url,
@@ -298,7 +298,7 @@ def register(url_prefix="/social"):
@bp.get("/following/page")
async def following_list_page():
- from sxc.pages import _serialize_remote_actor, _serialize_actor
+ from sxc.pages.utils import _serialize_remote_actor, _serialize_actor
actor = _require_actor()
page = request.args.get("page", 1, type=int)
@@ -320,7 +320,7 @@ def register(url_prefix="/social"):
@bp.get("/followers/page")
async def followers_list_page():
- from sxc.pages import _serialize_remote_actor, _serialize_actor
+ from sxc.pages.utils import _serialize_remote_actor, _serialize_actor
actor = _require_actor()
page = request.args.get("page", 1, type=int)
@@ -387,7 +387,7 @@ def register(url_prefix="/social"):
async def _render_timeline_items(items, timeline_type, actor, actor_id=None):
"""Render timeline pagination items as SX fragment."""
- from sxc.pages import _serialize_timeline_item, _serialize_actor
+ from sxc.pages.utils import _serialize_timeline_item, _serialize_actor
item_dicts = [_serialize_timeline_item(i) for i in items]
actor_data = _serialize_actor(actor)
diff --git a/federation/sxc/pages/__init__.py b/federation/sxc/pages/__init__.py
index b8af21f..983922b 100644
--- a/federation/sxc/pages/__init__.py
+++ b/federation/sxc/pages/__init__.py
@@ -1,8 +1,6 @@
"""Federation defpage setup — registers layouts and loads .sx pages."""
from __future__ import annotations
-from typing import Any
-
def setup_federation_pages() -> None:
"""Register federation-specific layouts and load page definitions."""
@@ -16,82 +14,7 @@ def _load_federation_page_files() -> None:
load_page_dir(os.path.dirname(__file__), "federation")
-# ---------------------------------------------------------------------------
-# Layouts — .sx defcomps read free variables from env
-# ---------------------------------------------------------------------------
-
def _register_federation_layouts() -> None:
from shared.sx.layouts import register_custom_layout
+ from .utils import _social_full, _social_oob
register_custom_layout("social", _social_full, _social_oob)
-
-
-def _actor_data(ctx: dict) -> dict | None:
- actor = ctx.get("actor")
- if not actor:
- return None
- from services.federation_page import _serialize_actor
- return _serialize_actor(actor)
-
-
-async def _social_full(ctx: dict, **kw: Any) -> str:
- from shared.sx.helpers import render_to_sx_with_env, _ctx_to_env
- env = _ctx_to_env(ctx)
- env["actor"] = kw.get("actor") or _actor_data(ctx)
- return await render_to_sx_with_env("social-layout-full", env)
-
-
-async def _social_oob(ctx: dict, **kw: Any) -> str:
- from shared.sx.helpers import render_to_sx_with_env, _ctx_to_env
- env = _ctx_to_env(ctx, oob=True)
- env["actor"] = kw.get("actor") or _actor_data(ctx)
- return await render_to_sx_with_env("social-layout-oob", env)
-
-
-# ---------------------------------------------------------------------------
-# Helpers still used by route handlers
-# ---------------------------------------------------------------------------
-
-def _serialize_actor(actor) -> dict | None:
- """Serialize an actor profile to a dict for sx defcomps."""
- from services.federation_page import _serialize_actor as _impl
- return _impl(actor)
-
-
-def _serialize_timeline_item(item) -> dict:
- """Serialize a timeline item DTO to a dict for sx defcomps."""
- from services.federation_page import _serialize_timeline_item as _impl
- return _impl(item)
-
-
-def _serialize_remote_actor(a) -> dict:
- """Serialize a remote actor DTO to a dict for sx defcomps."""
- from services.federation_page import _serialize_remote_actor as _impl
- return _impl(a)
-
-
-async def _social_page(ctx: dict, actor, *, content: str,
- title: str = "Rose Ash", meta_html: str = "") -> str:
- """Build a full social page with social header."""
- from shared.sx.helpers import render_to_sx_with_env, _ctx_to_env, full_page_sx
- from markupsafe import escape
-
- env = _ctx_to_env(ctx)
- env["actor"] = _serialize_actor(actor) if actor else None
- header_rows = await render_to_sx_with_env("social-layout-full", env)
- return await full_page_sx(ctx, header_rows=header_rows, content=content,
- meta_html=meta_html or f'{escape(title)} ')
-
-
-def _get_actor():
- """Return current user's actor or None."""
- from quart import g
- return getattr(g, "_social_actor", None)
-
-
-def _require_actor():
- """Return current user's actor or abort 403."""
- from quart import abort
- actor = _get_actor()
- if not actor:
- abort(403, "You need to choose a federation username first")
- return actor
diff --git a/federation/sxc/pages/utils.py b/federation/sxc/pages/utils.py
new file mode 100644
index 0000000..97b1f01
--- /dev/null
+++ b/federation/sxc/pages/utils.py
@@ -0,0 +1,71 @@
+"""Federation page utilities — serializers, actor helpers, social page builder."""
+from __future__ import annotations
+
+from typing import Any
+
+
+def _serialize_actor(actor) -> dict | None:
+ """Serialize an actor profile to a dict for sx defcomps."""
+ from services.federation_page import _serialize_actor as _impl
+ return _impl(actor)
+
+
+def _serialize_timeline_item(item) -> dict:
+ """Serialize a timeline item DTO to a dict for sx defcomps."""
+ from services.federation_page import _serialize_timeline_item as _impl
+ return _impl(item)
+
+
+def _serialize_remote_actor(a) -> dict:
+ """Serialize a remote actor DTO to a dict for sx defcomps."""
+ from services.federation_page import _serialize_remote_actor as _impl
+ return _impl(a)
+
+
+async def _social_page(ctx: dict, actor, *, content: str,
+ title: str = "Rose Ash", meta_html: str = "") -> str:
+ """Build a full social page with social header."""
+ from shared.sx.helpers import render_to_sx_with_env, _ctx_to_env, full_page_sx
+ from markupsafe import escape
+
+ env = _ctx_to_env(ctx)
+ env["actor"] = _serialize_actor(actor) if actor else None
+ header_rows = await render_to_sx_with_env("social-layout-full", env)
+ return await full_page_sx(ctx, header_rows=header_rows, content=content,
+ meta_html=meta_html or f'{escape(title)} ')
+
+
+def _get_actor():
+ """Return current user's actor or None."""
+ from quart import g
+ return getattr(g, "_social_actor", None)
+
+
+def _require_actor():
+ """Return current user's actor or abort 403."""
+ from quart import abort
+ actor = _get_actor()
+ if not actor:
+ abort(403, "You need to choose a federation username first")
+ return actor
+
+
+def _actor_data(ctx: dict) -> dict | None:
+ actor = ctx.get("actor")
+ if not actor:
+ return None
+ return _serialize_actor(actor)
+
+
+async def _social_full(ctx: dict, **kw: Any) -> str:
+ from shared.sx.helpers import render_to_sx_with_env, _ctx_to_env
+ env = _ctx_to_env(ctx)
+ env["actor"] = kw.get("actor") or _actor_data(ctx)
+ return await render_to_sx_with_env("social-layout-full", env)
+
+
+async def _social_oob(ctx: dict, **kw: Any) -> str:
+ from shared.sx.helpers import render_to_sx_with_env, _ctx_to_env
+ env = _ctx_to_env(ctx, oob=True)
+ env["actor"] = kw.get("actor") or _actor_data(ctx)
+ return await render_to_sx_with_env("social-layout-oob", env)
diff --git a/test/bp/dashboard/routes.py b/test/bp/dashboard/routes.py
index 25fb744..b44a0eb 100644
--- a/test/bp/dashboard/routes.py
+++ b/test/bp/dashboard/routes.py
@@ -14,7 +14,7 @@ def register(url_prefix: str = "/") -> Blueprint:
"""Full page dashboard with last results."""
from shared.sx.page import get_template_context
from shared.browser.app.csrf import generate_csrf_token
- from sxc.pages import render_dashboard_page_sx
+ from sxc.pages.renders import render_dashboard_page_sx
import runner
ctx = await get_template_context()
@@ -63,12 +63,12 @@ def register(url_prefix: str = "/") -> Blueprint:
if is_htmx:
# S-expression wire format — sx.js renders client-side
from shared.sx.helpers import sx_response
- from sxc.pages import test_detail_sx
+ from sxc.pages.renders import test_detail_sx
return sx_response(await test_detail_sx(test))
# Full page render (direct navigation / refresh)
from shared.sx.page import get_template_context
- from sxc.pages import render_test_detail_page_sx
+ from sxc.pages.renders import render_test_detail_page_sx
ctx = await get_template_context()
html = await render_test_detail_page_sx(ctx, test)
@@ -78,7 +78,7 @@ def register(url_prefix: str = "/") -> Blueprint:
async def results():
"""HTMX partial — poll target for results table."""
from shared.browser.app.csrf import generate_csrf_token
- from sxc.pages import render_results_partial_sx
+ from sxc.pages.renders import render_results_partial_sx
import runner
result = runner.get_results()
diff --git a/test/sxc/pages/__init__.py b/test/sxc/pages/__init__.py
index 299d9db..3d0ba3a 100644
--- a/test/sxc/pages/__init__.py
+++ b/test/sxc/pages/__init__.py
@@ -1,145 +1,13 @@
-"""Test service s-expression page components."""
+"""Test service defpage setup."""
from __future__ import annotations
-import os
-from datetime import datetime
-from shared.sx.jinja_bridge import load_service_components
-from shared.sx.helpers import render_to_sx, SxExpr, render_to_sx_with_env, _ctx_to_env, full_page_sx
-
-# Load test-specific .sx components at import time
-load_service_components(os.path.dirname(os.path.dirname(os.path.dirname(__file__))))
+def setup_test_pages() -> None:
+ """Load test page definitions."""
+ _load_test_page_files()
-def _format_time(ts: float | None) -> str:
- """Format a unix timestamp for display."""
- if not ts:
- return "never"
- return datetime.fromtimestamp(ts).strftime("%-d %b %Y, %H:%M:%S")
-
-
-_FILTER_MAP = {
- "passed": "passed",
- "failed": "failed",
- "errors": "error",
- "skipped": "skipped",
-}
-
-
-def _filter_tests(tests: list[dict], active_filter: str | None,
- active_service: str | None) -> list[dict]:
- """Filter tests by outcome and/or service."""
- from runner import _service_from_nodeid
- filtered = tests
- if active_filter and active_filter in _FILTER_MAP:
- outcome = _FILTER_MAP[active_filter]
- filtered = [t for t in filtered if t["outcome"] == outcome]
- if active_service:
- filtered = [t for t in filtered if _service_from_nodeid(t["nodeid"]) == active_service]
- return filtered
-
-
-def _service_list() -> list[str]:
- from runner import _SERVICE_ORDER
- return list(_SERVICE_ORDER)
-
-
-def _build_summary_data(result: dict | None, running: bool, csrf: str,
- active_filter: str | None) -> dict:
- """Prepare summary data dict for the ~test-results-partial defcomp."""
- if running and not result:
- return dict(state="running", status="running", passed="0", failed="0",
- errors="0", skipped="0", total="0", duration="...",
- last_run="in progress", running=True, csrf=csrf,
- active_filter=active_filter)
- if not result:
- return dict(state="no-results", status=None, passed="0", failed="0",
- errors="0", skipped="0", total="0", duration="0",
- last_run="never", running=running, csrf=csrf,
- active_filter=active_filter)
- status = "running" if running else result["status"]
- return dict(
- state="running" if running else "has-results",
- status=status,
- passed=str(result["passed"]),
- failed=str(result["failed"]),
- errors=str(result["errors"]),
- skipped=str(result.get("skipped", 0)),
- total=str(result["total"]),
- duration=str(result["duration"]),
- last_run=_format_time(result["finished_at"]) if not running else "in progress",
- running=running, csrf=csrf,
- active_filter=active_filter,
- )
-
-
-async def test_detail_sx(test: dict) -> str:
- """Return s-expression wire format for a test detail view."""
- return await render_to_sx("test-detail-section", test=test)
-
-
-async def render_dashboard_page_sx(ctx: dict, result: dict | None,
- running: bool, csrf: str,
- active_filter: str | None = None,
- active_service: str | None = None) -> str:
- """Full page: test dashboard (sx wire format)."""
- from runner import group_tests_by_service
-
- summary_data = _build_summary_data(result, running, csrf, active_filter)
- sections = []
- has_failures = "false"
- if result and not running:
- tests = _filter_tests(result.get("tests", []), active_filter, active_service)
- if tests:
- sections = group_tests_by_service(tests)
- has_failures = str(result["failed"] > 0 or result["errors"] > 0).lower()
- else:
- summary_data["state"] = "empty-filtered"
-
- inner = await render_to_sx("test-results-partial",
- summary_data=summary_data, sections=sections, has_failures=has_failures)
- content = await render_to_sx("test-results-wrap", running=running, inner=SxExpr(inner))
- hdr = await render_to_sx_with_env("test-layout-full", _ctx_to_env(ctx),
- services=_service_list(),
- active_service=active_service,
- )
- return await full_page_sx(ctx, header_rows=hdr, content=content)
-
-
-async def render_results_partial_sx(result: dict | None, running: bool,
- csrf: str,
- active_filter: str | None = None,
- active_service: str | None = None) -> str:
- """HTMX partial: results section (sx wire format)."""
- from runner import group_tests_by_service
-
- summary_data = _build_summary_data(result, running, csrf, active_filter)
- sections = []
- has_failures = "false"
- if result and not running:
- tests = _filter_tests(result.get("tests", []), active_filter, active_service)
- if tests:
- sections = group_tests_by_service(tests)
- has_failures = str(result["failed"] > 0 or result["errors"] > 0).lower()
- else:
- summary_data["state"] = "empty-filtered"
-
- inner = await render_to_sx("test-results-partial",
- summary_data=summary_data, sections=sections, has_failures=has_failures)
- return await render_to_sx("test-results-wrap", running=running, inner=SxExpr(inner))
-
-
-async def render_test_detail_page_sx(ctx: dict, test: dict) -> str:
- """Full page: test detail (sx wire format)."""
- hdr = await render_to_sx_with_env("test-detail-layout-full", _ctx_to_env(ctx),
- services=_service_list(),
- test_nodeid=test["nodeid"],
- test_label=test["nodeid"].rsplit("::", 1)[-1],
- )
- content = await render_to_sx("test-detail",
- nodeid=test["nodeid"],
- outcome=test["outcome"],
- duration=str(test["duration"]),
- longrepr=test.get("longrepr", ""),
- )
- return await full_page_sx(ctx, header_rows=hdr, content=content)
+def _load_test_page_files() -> None:
+ import os
+ from shared.sx.pages import load_page_dir
+ load_page_dir(os.path.dirname(__file__), "test")
diff --git a/test/sxc/pages/renders.py b/test/sxc/pages/renders.py
new file mode 100644
index 0000000..c907963
--- /dev/null
+++ b/test/sxc/pages/renders.py
@@ -0,0 +1,145 @@
+"""Test service render functions — called from bp routes."""
+from __future__ import annotations
+
+import os
+from datetime import datetime
+
+from shared.sx.jinja_bridge import load_service_components
+from shared.sx.helpers import render_to_sx, SxExpr, render_to_sx_with_env, _ctx_to_env, full_page_sx
+
+# Load test-specific .sx components at import time
+load_service_components(os.path.dirname(os.path.dirname(os.path.dirname(__file__))))
+
+
+def _format_time(ts: float | None) -> str:
+ """Format a unix timestamp for display."""
+ if not ts:
+ return "never"
+ return datetime.fromtimestamp(ts).strftime("%-d %b %Y, %H:%M:%S")
+
+
+_FILTER_MAP = {
+ "passed": "passed",
+ "failed": "failed",
+ "errors": "error",
+ "skipped": "skipped",
+}
+
+
+def _filter_tests(tests: list[dict], active_filter: str | None,
+ active_service: str | None) -> list[dict]:
+ """Filter tests by outcome and/or service."""
+ from runner import _service_from_nodeid
+ filtered = tests
+ if active_filter and active_filter in _FILTER_MAP:
+ outcome = _FILTER_MAP[active_filter]
+ filtered = [t for t in filtered if t["outcome"] == outcome]
+ if active_service:
+ filtered = [t for t in filtered if _service_from_nodeid(t["nodeid"]) == active_service]
+ return filtered
+
+
+def _service_list() -> list[str]:
+ from runner import _SERVICE_ORDER
+ return list(_SERVICE_ORDER)
+
+
+def _build_summary_data(result: dict | None, running: bool, csrf: str,
+ active_filter: str | None) -> dict:
+ """Prepare summary data dict for the ~test-results-partial defcomp."""
+ if running and not result:
+ return dict(state="running", status="running", passed="0", failed="0",
+ errors="0", skipped="0", total="0", duration="...",
+ last_run="in progress", running=True, csrf=csrf,
+ active_filter=active_filter)
+ if not result:
+ return dict(state="no-results", status=None, passed="0", failed="0",
+ errors="0", skipped="0", total="0", duration="0",
+ last_run="never", running=running, csrf=csrf,
+ active_filter=active_filter)
+ status = "running" if running else result["status"]
+ return dict(
+ state="running" if running else "has-results",
+ status=status,
+ passed=str(result["passed"]),
+ failed=str(result["failed"]),
+ errors=str(result["errors"]),
+ skipped=str(result.get("skipped", 0)),
+ total=str(result["total"]),
+ duration=str(result["duration"]),
+ last_run=_format_time(result["finished_at"]) if not running else "in progress",
+ running=running, csrf=csrf,
+ active_filter=active_filter,
+ )
+
+
+async def test_detail_sx(test: dict) -> str:
+ """Return s-expression wire format for a test detail view."""
+ return await render_to_sx("test-detail-section", test=test)
+
+
+async def render_dashboard_page_sx(ctx: dict, result: dict | None,
+ running: bool, csrf: str,
+ active_filter: str | None = None,
+ active_service: str | None = None) -> str:
+ """Full page: test dashboard (sx wire format)."""
+ from runner import group_tests_by_service
+
+ summary_data = _build_summary_data(result, running, csrf, active_filter)
+ sections = []
+ has_failures = "false"
+ if result and not running:
+ tests = _filter_tests(result.get("tests", []), active_filter, active_service)
+ if tests:
+ sections = group_tests_by_service(tests)
+ has_failures = str(result["failed"] > 0 or result["errors"] > 0).lower()
+ else:
+ summary_data["state"] = "empty-filtered"
+
+ inner = await render_to_sx("test-results-partial",
+ summary_data=summary_data, sections=sections, has_failures=has_failures)
+ content = await render_to_sx("test-results-wrap", running=running, inner=SxExpr(inner))
+ hdr = await render_to_sx_with_env("test-layout-full", _ctx_to_env(ctx),
+ services=_service_list(),
+ active_service=active_service,
+ )
+ return await full_page_sx(ctx, header_rows=hdr, content=content)
+
+
+async def render_results_partial_sx(result: dict | None, running: bool,
+ csrf: str,
+ active_filter: str | None = None,
+ active_service: str | None = None) -> str:
+ """HTMX partial: results section (sx wire format)."""
+ from runner import group_tests_by_service
+
+ summary_data = _build_summary_data(result, running, csrf, active_filter)
+ sections = []
+ has_failures = "false"
+ if result and not running:
+ tests = _filter_tests(result.get("tests", []), active_filter, active_service)
+ if tests:
+ sections = group_tests_by_service(tests)
+ has_failures = str(result["failed"] > 0 or result["errors"] > 0).lower()
+ else:
+ summary_data["state"] = "empty-filtered"
+
+ inner = await render_to_sx("test-results-partial",
+ summary_data=summary_data, sections=sections, has_failures=has_failures)
+ return await render_to_sx("test-results-wrap", running=running, inner=SxExpr(inner))
+
+
+async def render_test_detail_page_sx(ctx: dict, test: dict) -> str:
+ """Full page: test detail (sx wire format)."""
+ hdr = await render_to_sx_with_env("test-detail-layout-full", _ctx_to_env(ctx),
+ services=_service_list(),
+ test_nodeid=test["nodeid"],
+ test_label=test["nodeid"].rsplit("::", 1)[-1],
+ )
+ content = await render_to_sx("test-detail",
+ nodeid=test["nodeid"],
+ outcome=test["outcome"],
+ duration=str(test["duration"]),
+ longrepr=test.get("longrepr", ""),
+ )
+ return await full_page_sx(ctx, header_rows=hdr, content=content)