From 8eaf4026abf1d038eb9edada408d8ffdfe49f145 Mon Sep 17 00:00:00 2001 From: giles Date: Wed, 4 Mar 2026 16:51:57 +0000 Subject: [PATCH] Slim sxc/pages/__init__.py for federation, test, cart, blog MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Move render functions, layouts, helpers, and utils from __init__.py to sub-modules (renders.py, layouts.py, helpers.py, utils.py). Update all bp route imports to point at sub-modules directly. Each __init__.py is now ≤20 lines of setup + registration. Co-Authored-By: Claude Opus 4.6 --- blog/bp/blog/routes.py | 8 +- blog/sxc/pages/__init__.py | 1103 +----------------------------- blog/sxc/pages/helpers.py | 713 +++++++++++++++++++ blog/sxc/pages/layouts.py | 217 ++++++ blog/sxc/pages/renders.py | 177 +++++ cart/bp/cart/global_routes.py | 4 +- cart/bp/cart/page_routes.py | 2 +- cart/bp/order/routes.py | 4 +- cart/bp/orders/routes.py | 2 +- cart/bp/page_admin/routes.py | 2 +- cart/sxc/pages/__init__.py | 308 +-------- cart/sxc/pages/layouts.py | 138 ++++ cart/sxc/pages/renders.py | 133 ++++ cart/sxc/pages/utils.py | 40 ++ federation/bp/auth/routes.py | 2 +- federation/bp/identity/routes.py | 2 +- federation/bp/social/routes.py | 10 +- federation/sxc/pages/__init__.py | 79 +-- federation/sxc/pages/utils.py | 71 ++ test/bp/dashboard/routes.py | 8 +- test/sxc/pages/__init__.py | 148 +--- test/sxc/pages/renders.py | 145 ++++ 22 files changed, 1668 insertions(+), 1648 deletions(-) create mode 100644 blog/sxc/pages/helpers.py create mode 100644 blog/sxc/pages/layouts.py create mode 100644 blog/sxc/pages/renders.py create mode 100644 cart/sxc/pages/layouts.py create mode 100644 cart/sxc/pages/renders.py create mode 100644 cart/sxc/pages/utils.py create mode 100644 federation/sxc/pages/utils.py create mode 100644 test/sxc/pages/renders.py diff --git a/blog/bp/blog/routes.py b/blog/bp/blog/routes.py index 3152c6f..a9a0ed1 100644 --- a/blog/bp/blog/routes.py +++ b/blog/bp/blog/routes.py @@ -229,7 +229,7 @@ def register(url_prefix, title): lexical_doc = json.loads(lexical_raw) except (json.JSONDecodeError, TypeError): from shared.sx.page import get_template_context - from sxc.pages import render_editor_panel + from sxc.pages.renders import render_editor_panel tctx = await get_template_context() tctx["editor_html"] = await render_editor_panel(save_error="Invalid JSON in editor content.") html = await _render_new_post_page(tctx) @@ -238,7 +238,7 @@ def register(url_prefix, title): ok, reason = validate_lexical(lexical_doc) if not ok: from shared.sx.page import get_template_context - from sxc.pages import render_editor_panel + from sxc.pages.renders import render_editor_panel tctx = await get_template_context() tctx["editor_html"] = await render_editor_panel(save_error=reason) html = await _render_new_post_page(tctx) @@ -285,7 +285,7 @@ def register(url_prefix, title): lexical_doc = json.loads(lexical_raw) except (json.JSONDecodeError, TypeError): from shared.sx.page import get_template_context - from sxc.pages import render_editor_panel + from sxc.pages.renders import render_editor_panel tctx = await get_template_context() tctx["editor_html"] = await render_editor_panel(save_error="Invalid JSON in editor content.", is_page=True) tctx["is_page"] = True @@ -295,7 +295,7 @@ def register(url_prefix, title): ok, reason = validate_lexical(lexical_doc) if not ok: from shared.sx.page import get_template_context - from sxc.pages import render_editor_panel + from sxc.pages.renders import render_editor_panel tctx = await get_template_context() tctx["editor_html"] = await render_editor_panel(save_error=reason, is_page=True) tctx["is_page"] = True diff --git a/blog/sxc/pages/__init__.py b/blog/sxc/pages/__init__.py index d4077bb..af6ee10 100644 --- a/blog/sxc/pages/__init__.py +++ b/blog/sxc/pages/__init__.py @@ -1,11 +1,11 @@ """Blog defpage setup — registers layouts, page helpers, and loads .sx pages.""" from __future__ import annotations -from typing import Any - def setup_blog_pages() -> None: """Register blog-specific layouts, page helpers, and load page definitions.""" + from .layouts import _register_blog_layouts + from .helpers import _register_blog_helpers _register_blog_layouts() _register_blog_helpers() _load_blog_page_files() @@ -15,1105 +15,6 @@ def _load_blog_page_files() -> None: import os from shared.sx.pages import load_page_dir from shared.sx.jinja_bridge import load_service_components - # Load blog .sx component definitions + handler definitions - # __file__ = blog/sxc/pages/__init__.py → blog root is 3 levels up blog_dir = os.path.dirname(os.path.dirname(os.path.dirname(__file__))) load_service_components(blog_dir, service_name="blog") load_page_dir(os.path.dirname(__file__), "blog") - - -# --------------------------------------------------------------------------- -# Shared hydration helpers -# --------------------------------------------------------------------------- - -def _add_to_defpage_ctx(**kwargs: Any) -> None: - from quart import g - if not hasattr(g, '_defpage_ctx'): - g._defpage_ctx = {} - g._defpage_ctx.update(kwargs) - - -async def _ensure_post_data(slug: str | None) -> None: - """Load post data and set g.post_data + defpage context. - - Replicates post bp's hydrate_post_data + context_processor. - """ - from quart import g, abort - - if hasattr(g, 'post_data') and g.post_data: - await _inject_post_context(g.post_data) - return - - if not slug: - abort(404) - - from bp.post.services.post_data import post_data - - is_admin = bool((g.get("rights") or {}).get("admin")) - p_data = await post_data(slug, g.s, include_drafts=True) - if not p_data: - abort(404) - - # Draft access control - if p_data["post"].get("status") != "published": - if is_admin: - pass - elif g.user and p_data["post"].get("user_id") == g.user.id: - pass - else: - abort(404) - - g.post_data = p_data - g.post_slug = slug - await _inject_post_context(p_data) - - -async def _inject_post_context(p_data: dict) -> None: - """Add post context_processor data to defpage context.""" - from shared.config import config - from shared.infrastructure.fragments import fetch_fragment - from shared.infrastructure.data_client import fetch_data - from shared.contracts.dtos import CartSummaryDTO, dto_from_dict - from shared.infrastructure.cart_identity import current_cart_identity - - db_post_id = p_data["post"]["id"] - post_slug = p_data["post"]["slug"] - - container_nav = await fetch_fragment("relations", "container-nav", params={ - "container_type": "page", - "container_id": str(db_post_id), - "post_slug": post_slug, - }) - - ctx: dict = { - **p_data, - "base_title": config()["title"], - "container_nav": container_nav, - } - - if p_data["post"].get("is_page"): - ident = current_cart_identity() - summary_params: dict = {"page_slug": post_slug} - if ident["user_id"] is not None: - summary_params["user_id"] = ident["user_id"] - if ident["session_id"] is not None: - summary_params["session_id"] = ident["session_id"] - raw_summary = await fetch_data( - "cart", "cart-summary", params=summary_params, required=False, - ) - page_summary = dto_from_dict(CartSummaryDTO, raw_summary) if raw_summary else CartSummaryDTO() - ctx["page_cart_count"] = ( - page_summary.count + page_summary.calendar_count + page_summary.ticket_count - ) - ctx["page_cart_total"] = float( - page_summary.total + page_summary.calendar_total + page_summary.ticket_total - ) - - _add_to_defpage_ctx(**ctx) - - -# --------------------------------------------------------------------------- -# Header helpers (moved from sx_components — thin render_to_sx wrappers) -# --------------------------------------------------------------------------- - -async def _blog_header_sx(ctx: dict, *, oob: bool = False) -> str: - from shared.sx.helpers import render_to_sx - from shared.sx.parser import SxExpr - return await render_to_sx("menu-row-sx", - id="blog-row", level=1, - link_label_content=SxExpr("(div)"), - child_id="blog-header-child", oob=oob) - - -async def _settings_header_sx(ctx: dict, *, oob: bool = False) -> str: - from shared.sx.helpers import render_to_sx - from shared.sx.parser import SxExpr - from quart import url_for as qurl - - settings_href = qurl("settings.defpage_settings_home") - label_sx = await render_to_sx("blog-admin-label") - nav_sx = await _settings_nav_sx(ctx) - - return await render_to_sx("menu-row-sx", - id="root-settings-row", level=1, - link_href=settings_href, - link_label_content=SxExpr(label_sx), - nav=SxExpr(nav_sx) if nav_sx else None, - child_id="root-settings-header-child", oob=oob) - - -async def _settings_nav_sx(ctx: dict) -> str: - from shared.sx.helpers import render_to_sx - return await render_to_sx("blog-settings-nav", - select_colours=ctx.get("select_colours", "")) - - -async def _sub_settings_header_sx(row_id: str, child_id: str, href: str, - icon: str, label: str, ctx: dict, - *, oob: bool = False, nav_sx: str = "") -> str: - from shared.sx.helpers import render_to_sx - from shared.sx.parser import SxExpr - - label_sx = await render_to_sx("blog-sub-settings-label", - icon=f"fa fa-{icon}", label=label) - return await render_to_sx("menu-row-sx", - id=row_id, level=2, - link_href=href, - link_label_content=SxExpr(label_sx), - nav=SxExpr(nav_sx) if nav_sx else None, - child_id=child_id, oob=oob) - - -# --------------------------------------------------------------------------- -# Layouts -# --------------------------------------------------------------------------- - -def _register_blog_layouts() -> None: - from shared.sx.layouts import register_custom_layout - register_custom_layout("blog", _blog_full, _blog_oob) - register_custom_layout("blog-settings", _settings_full, _settings_oob, - mobile_fn=_settings_mobile) - register_custom_layout("blog-cache", _cache_full, _cache_oob) - register_custom_layout("blog-snippets", _snippets_full, _snippets_oob) - register_custom_layout("blog-menu-items", _menu_items_full, _menu_items_oob) - register_custom_layout("blog-tag-groups", _tag_groups_full, _tag_groups_oob) - register_custom_layout("blog-tag-group-edit", - _tag_group_edit_full, _tag_group_edit_oob) - - -# --- Blog layout (root + blog header) --- - -async def _blog_full(ctx: dict, **kw: Any) -> str: - from shared.sx.helpers import render_to_sx_with_env, _ctx_to_env - from shared.sx.parser import SxExpr - return await render_to_sx_with_env("blog-layout-full", _ctx_to_env(ctx), - blog_header=SxExpr(await _blog_header_sx(ctx))) - - -async def _blog_oob(ctx: dict, **kw: Any) -> str: - from shared.sx.helpers import render_to_sx_with_env, _ctx_to_env, oob_header_sx - from shared.sx.parser import SxExpr - rows = await render_to_sx_with_env("blog-layout-full", _ctx_to_env(ctx), - blog_header=SxExpr(await _blog_header_sx(ctx))) - return await oob_header_sx("root-header-child", "blog-header-child", rows) - - -# --- Settings layout (root + settings header) --- - -async def _settings_full(ctx: dict, **kw: Any) -> str: - from shared.sx.helpers import render_to_sx_with_env, _ctx_to_env - from shared.sx.parser import SxExpr - return await render_to_sx_with_env("settings-layout-full", _ctx_to_env(ctx), - settings_header=SxExpr(await _settings_header_sx(ctx))) - - -async def _settings_oob(ctx: dict, **kw: Any) -> str: - from shared.sx.helpers import render_to_sx_with_env, _ctx_to_env, oob_header_sx - from shared.sx.parser import SxExpr - rows = await render_to_sx_with_env("settings-layout-full", _ctx_to_env(ctx), - settings_header=SxExpr(await _settings_header_sx(ctx))) - return await oob_header_sx("root-header-child", "root-settings-header-child", rows) - - -async def _settings_mobile(ctx: dict, **kw: Any) -> str: - return await _settings_nav_sx(ctx) - - -# --- Sub-settings helpers --- - -async def _sub_settings_full(ctx: dict, row_id: str, child_id: str, - endpoint: str, icon: str, label: str) -> str: - from shared.sx.helpers import render_to_sx_with_env, _ctx_to_env - from shared.sx.parser import SxExpr - from quart import url_for as qurl - return await render_to_sx_with_env("sub-settings-layout-full", _ctx_to_env(ctx), - settings_header=SxExpr(await _settings_header_sx(ctx)), - sub_header=SxExpr(await _sub_settings_header_sx( - row_id, child_id, qurl(endpoint), icon, label, ctx))) - - -async def _sub_settings_oob(ctx: dict, row_id: str, child_id: str, - endpoint: str, icon: str, label: str) -> str: - from shared.sx.helpers import oob_header_sx, render_to_sx - from shared.sx.parser import SxExpr - from quart import url_for as qurl - settings_hdr_oob = await _settings_header_sx(ctx, oob=True) - sub_hdr = await _sub_settings_header_sx( - row_id, child_id, qurl(endpoint), icon, label, ctx) - sub_oob = await oob_header_sx("root-settings-header-child", child_id, sub_hdr) - return await render_to_sx("sub-settings-layout-oob", - settings_header_oob=SxExpr(settings_hdr_oob), - sub_header_oob=SxExpr(sub_oob)) - - -# --- Cache --- - -async def _cache_full(ctx: dict, **kw: Any) -> str: - return await _sub_settings_full(ctx, "cache-row", "cache-header-child", - "defpage_cache_page", "refresh", "Cache") - - -async def _cache_oob(ctx: dict, **kw: Any) -> str: - return await _sub_settings_oob(ctx, "cache-row", "cache-header-child", - "defpage_cache_page", "refresh", "Cache") - - -# --- Snippets --- - -async def _snippets_full(ctx: dict, **kw: Any) -> str: - return await _sub_settings_full(ctx, "snippets-row", "snippets-header-child", - "defpage_snippets_page", "puzzle-piece", "Snippets") - - -async def _snippets_oob(ctx: dict, **kw: Any) -> str: - return await _sub_settings_oob(ctx, "snippets-row", "snippets-header-child", - "defpage_snippets_page", "puzzle-piece", "Snippets") - - -# --- Menu Items --- - -async def _menu_items_full(ctx: dict, **kw: Any) -> str: - return await _sub_settings_full(ctx, "menu_items-row", "menu_items-header-child", - "defpage_menu_items_page", "bars", "Menu Items") - - -async def _menu_items_oob(ctx: dict, **kw: Any) -> str: - return await _sub_settings_oob(ctx, "menu_items-row", "menu_items-header-child", - "defpage_menu_items_page", "bars", "Menu Items") - - -# --- Tag Groups --- - -async def _tag_groups_full(ctx: dict, **kw: Any) -> str: - return await _sub_settings_full(ctx, "tag-groups-row", "tag-groups-header-child", - "defpage_tag_groups_page", "tags", "Tag Groups") - - -async def _tag_groups_oob(ctx: dict, **kw: Any) -> str: - return await _sub_settings_oob(ctx, "tag-groups-row", "tag-groups-header-child", - "defpage_tag_groups_page", "tags", "Tag Groups") - - -# --- Tag Group Edit --- - -async def _tag_group_edit_full(ctx: dict, **kw: Any) -> str: - from quart import request, url_for as qurl - from shared.sx.helpers import render_to_sx_with_env, _ctx_to_env - from shared.sx.parser import SxExpr - g_id = (request.view_args or {}).get("id") - return await render_to_sx_with_env("sub-settings-layout-full", _ctx_to_env(ctx), - settings_header=SxExpr(await _settings_header_sx(ctx)), - sub_header=SxExpr(await _sub_settings_header_sx( - "tag-groups-row", "tag-groups-header-child", - qurl("defpage_tag_group_edit", id=g_id), - "tags", "Tag Groups", ctx))) - - -async def _tag_group_edit_oob(ctx: dict, **kw: Any) -> str: - from quart import request, url_for as qurl - from shared.sx.helpers import oob_header_sx, render_to_sx - from shared.sx.parser import SxExpr - g_id = (request.view_args or {}).get("id") - settings_hdr_oob = await _settings_header_sx(ctx, oob=True) - sub_hdr = await _sub_settings_header_sx( - "tag-groups-row", "tag-groups-header-child", - qurl("defpage_tag_group_edit", id=g_id), - "tags", "Tag Groups", ctx) - sub_oob = await oob_header_sx("root-settings-header-child", "tag-groups-header-child", sub_hdr) - return await render_to_sx("sub-settings-layout-oob", - settings_header_oob=SxExpr(settings_hdr_oob), - sub_header_oob=SxExpr(sub_oob)) - - -# --------------------------------------------------------------------------- -# Rendering helpers (moved from sx_components) -# --------------------------------------------------------------------------- - -def _raw_html_sx(html: str) -> str: - """Wrap raw HTML in (raw! "...") so it's valid inside sx source.""" - from shared.sx.parser import serialize as sx_serialize - if not html: - return "" - return "(raw! " + sx_serialize(html) + ")" - - -async def render_editor_panel(save_error: str | None = None, is_page: bool = False) -> str: - """Build the WYSIWYG editor panel HTML for new post/page creation.""" - import os - from quart import url_for as qurl, current_app - from shared.browser.app.csrf import generate_csrf_token - from shared.sx.helpers import render_to_sx - - csrf = generate_csrf_token() - asset_url_fn = current_app.jinja_env.globals.get("asset_url", lambda p: "") - editor_css = asset_url_fn("scripts/editor.css") - editor_js = asset_url_fn("scripts/editor.js") - sx_editor_js = asset_url_fn("scripts/sx-editor.js") - - upload_image_url = qurl("blog.editor_api.upload_image") - upload_media_url = qurl("blog.editor_api.upload_media") - upload_file_url = qurl("blog.editor_api.upload_file") - oembed_url = qurl("blog.editor_api.oembed_proxy") - snippets_url = qurl("blog.editor_api.list_snippets") - unsplash_key = os.environ.get("UNSPLASH_ACCESS_KEY", "") - - title_placeholder = "Page title..." if is_page else "Post title..." - create_label = "Create Page" if is_page else "Create Post" - - parts: list[str] = [] - - if save_error: - parts.append(await render_to_sx("blog-editor-error", error=str(save_error))) - - parts.append(await render_to_sx("blog-editor-form", - csrf=csrf, title_placeholder=title_placeholder, - create_label=create_label, - )) - - parts.append(await render_to_sx("blog-editor-styles", css_href=editor_css)) - parts.append(await render_to_sx("sx-editor-styles")) - - init_js = ( - "console.log('[EDITOR-DEBUG] init script running');\n" - "(function() {\n" - " console.log('[EDITOR-DEBUG] IIFE entered, mountEditor=', typeof window.mountEditor);\n" - " function init() {\n" - " var csrfToken = document.querySelector('input[name=\"csrf_token\"]').value;\n" - f" var uploadUrl = '{upload_image_url}';\n" - " var uploadUrls = {\n" - " image: uploadUrl,\n" - f" media: '{upload_media_url}',\n" - f" file: '{upload_file_url}',\n" - " };\n" - "\n" - " var fileInput = document.getElementById('feature-image-file');\n" - " var addBtn = document.getElementById('feature-image-add-btn');\n" - " var deleteBtn = document.getElementById('feature-image-delete-btn');\n" - " var preview = document.getElementById('feature-image-preview');\n" - " var emptyState = document.getElementById('feature-image-empty');\n" - " var filledState = document.getElementById('feature-image-filled');\n" - " var hiddenUrl = document.getElementById('feature-image-input');\n" - " var hiddenCaption = document.getElementById('feature-image-caption-input');\n" - " var captionInput = document.getElementById('feature-image-caption');\n" - " var uploading = document.getElementById('feature-image-uploading');\n" - "\n" - " function showFilled(url) {\n" - " preview.src = url;\n" - " hiddenUrl.value = url;\n" - " emptyState.classList.add('hidden');\n" - " filledState.classList.remove('hidden');\n" - " uploading.classList.add('hidden');\n" - " }\n" - "\n" - " function showEmpty() {\n" - " preview.src = '';\n" - " hiddenUrl.value = '';\n" - " hiddenCaption.value = '';\n" - " captionInput.value = '';\n" - " emptyState.classList.remove('hidden');\n" - " filledState.classList.add('hidden');\n" - " uploading.classList.add('hidden');\n" - " }\n" - "\n" - " function uploadFile(file) {\n" - " emptyState.classList.add('hidden');\n" - " uploading.classList.remove('hidden');\n" - " var fd = new FormData();\n" - " fd.append('file', file);\n" - " fetch(uploadUrl, {\n" - " method: 'POST',\n" - " body: fd,\n" - " headers: { 'X-CSRFToken': csrfToken },\n" - " })\n" - " .then(function(r) {\n" - " if (!r.ok) throw new Error('Upload failed (' + r.status + ')');\n" - " return r.json();\n" - " })\n" - " .then(function(data) {\n" - " var url = data.images && data.images[0] && data.images[0].url;\n" - " if (url) showFilled(url);\n" - " else { showEmpty(); alert('Upload succeeded but no image URL returned.'); }\n" - " })\n" - " .catch(function(e) {\n" - " showEmpty();\n" - " alert(e.message);\n" - " });\n" - " }\n" - "\n" - " addBtn.addEventListener('click', function() { fileInput.click(); });\n" - " preview.addEventListener('click', function() { fileInput.click(); });\n" - " deleteBtn.addEventListener('click', function(e) {\n" - " e.stopPropagation();\n" - " showEmpty();\n" - " });\n" - " fileInput.addEventListener('change', function() {\n" - " if (fileInput.files && fileInput.files[0]) {\n" - " uploadFile(fileInput.files[0]);\n" - " fileInput.value = '';\n" - " }\n" - " });\n" - " captionInput.addEventListener('input', function() {\n" - " hiddenCaption.value = captionInput.value;\n" - " });\n" - "\n" - " var excerpt = document.querySelector('textarea[name=\"custom_excerpt\"]');\n" - " function autoResize() {\n" - " excerpt.style.height = 'auto';\n" - " excerpt.style.height = excerpt.scrollHeight + 'px';\n" - " }\n" - " excerpt.addEventListener('input', autoResize);\n" - " autoResize();\n" - "\n" - " window.mountEditor('lexical-editor', {\n" - " initialJson: null,\n" - " csrfToken: csrfToken,\n" - " uploadUrls: uploadUrls,\n" - f" oembedUrl: '{oembed_url}',\n" - f" unsplashApiKey: '{unsplash_key}',\n" - f" snippetsUrl: '{snippets_url}',\n" - " });\n" - "\n" - " if (typeof SxEditor !== 'undefined') {\n" - " SxEditor.mount('sx-editor', {\n" - " initialSx: (document.getElementById('sx-content-input') || {}).value || null,\n" - " csrfToken: csrfToken,\n" - " uploadUrls: uploadUrls,\n" - f" oembedUrl: '{oembed_url}',\n" - " onChange: function(sx) {\n" - " document.getElementById('sx-content-input').value = sx;\n" - " }\n" - " });\n" - " }\n" - "\n" - " document.addEventListener('keydown', function(e) {\n" - " if ((e.ctrlKey || e.metaKey) && e.key === 's') {\n" - " e.preventDefault();\n" - " document.getElementById('post-new-form').requestSubmit();\n" - " }\n" - " });\n" - " }\n" - "\n" - " if (typeof window.mountEditor === 'function') {\n" - " init();\n" - " } else {\n" - " var _t = setInterval(function() {\n" - " if (typeof window.mountEditor === 'function') { clearInterval(_t); init(); }\n" - " }, 50);\n" - " }\n" - "})();\n" - ) - parts.append(await render_to_sx("blog-editor-scripts", - js_src=editor_js, - sx_editor_js_src=sx_editor_js, - init_js=init_js)) - - from shared.sx.parser import SxExpr - return await render_to_sx("blog-editor-panel", - parts=SxExpr("(<> " + " ".join(parts) + ")")) if parts else "" - - -# --------------------------------------------------------------------------- -# Page helpers (async functions available in .sx defpage expressions) -# --------------------------------------------------------------------------- - -def _register_blog_helpers() -> None: - from shared.sx.pages import register_page_helpers - register_page_helpers("blog", { - "editor-content": _h_editor_content, - "editor-page-content": _h_editor_page_content, - "post-admin-content": _h_post_admin_content, - "post-data-content": _h_post_data_content, - "post-preview-content": _h_post_preview_content, - "post-entries-content": _h_post_entries_content, - "post-settings-content": _h_post_settings_content, - "post-edit-content": _h_post_edit_content, - }) - - -# --- Editor helpers --- - -async def _h_editor_content(**kw): - return await render_editor_panel() - - -async def _h_editor_page_content(**kw): - return await render_editor_panel(is_page=True) - - -# --- Post admin helpers --- - -async def _h_post_admin_content(slug=None, **kw): - await _ensure_post_data(slug) - return '(div :class "pb-8")' - - -async def _h_post_data_content(slug=None, **kw): - await _ensure_post_data(slug) - from quart import g - from markupsafe import escape as esc - - original_post = getattr(g, "post_data", {}).get("original_post") - if original_post is None: - return _raw_html_sx('
No post data available.
') - - tablename = getattr(original_post, "__tablename__", "?") - - def _render_scalar_table(obj): - rows = [] - for col in obj.__mapper__.columns: - key = col.key - if key == "_sa_instance_state": - continue - val = getattr(obj, key, None) - if val is None: - val_html = '\u2014' - elif hasattr(val, "isoformat"): - val_html = f'
{esc(val.isoformat())}
' - elif isinstance(val, str): - val_html = f'
{esc(val)}
' - else: - val_html = f'
{esc(str(val))}
' - rows.append( - f'' - f'{esc(key)}' - f'{val_html}' - ) - return ( - '
' - '' - '' - '' - '' - '' + "".join(rows) + '
FieldValue
' - ) - - def _render_model(obj, depth=0, max_depth=2): - parts = [_render_scalar_table(obj)] - rel_parts = [] - for rel in obj.__mapper__.relationships: - rel_name = rel.key - loaded = rel_name in obj.__dict__ - value = getattr(obj, rel_name, None) if loaded else None - cardinality = "many" if rel.uselist else "one" - cls_name = rel.mapper.class_.__name__ - loaded_label = "" if loaded else " \u2022 not loaded" - - inner = "" - if value is None: - inner = '\u2014' - elif rel.uselist: - items = list(value) if value else [] - inner = f'
{len(items)} item{"" if len(items) == 1 else "s"}
' - if items and depth < max_depth: - sub_rows = [] - for i, it in enumerate(items, 1): - ident_parts = [] - for k in ("id", "ghost_id", "uuid", "slug", "name", "title"): - if k in it.__mapper__.c: - v = getattr(it, k, "") - ident_parts.append(f"{k}={v}") - summary = " \u2022 ".join(ident_parts) if ident_parts else str(it) - child_html = "" - if depth < max_depth: - child_html = f'
{_render_model(it, depth + 1, max_depth)}
' - else: - child_html = '
\u2026max depth reached\u2026
' - sub_rows.append( - f'' - f'{i}' - f'
{esc(summary)}
{child_html}' - ) - inner += ( - '
' - '' - '' - '' - + "".join(sub_rows) + '
#Summary
' - ) - else: - child = value - ident_parts = [] - for k in ("id", "ghost_id", "uuid", "slug", "name", "title"): - if k in child.__mapper__.c: - v = getattr(child, k, "") - ident_parts.append(f"{k}={v}") - summary = " \u2022 ".join(ident_parts) if ident_parts else str(child) - inner = f'
{esc(summary)}
' - if depth < max_depth: - inner += f'
{_render_model(child, depth + 1, max_depth)}
' - else: - inner += '
\u2026max depth reached\u2026
' - - rel_parts.append( - f'
' - f'
' - f'Relationship: {esc(rel_name)}' - f' {cardinality} \u2192 {esc(cls_name)}{loaded_label}
' - f'
{inner}
' - ) - if rel_parts: - parts.append('
' + "".join(rel_parts) + '
') - return '
' + "".join(parts) + '
' - - html = ( - f'
' - f'
Model: Post \u2022 Table: {esc(tablename)}
' - f'{_render_model(original_post, 0, 2)}
' - ) - return _raw_html_sx(html) - - -async def _h_post_preview_content(slug=None, **kw): - await _ensure_post_data(slug) - from quart import g - from shared.services.registry import services - from shared.sx.helpers import render_to_sx - from shared.sx.parser import SxExpr, serialize as sx_serialize - - preview = await services.blog_page.preview_data(g.s) - - sections: list[str] = [] - if preview.get("sx_pretty"): - sections.append(await render_to_sx("blog-preview-section", - title="S-Expression Source", content=SxExpr(preview["sx_pretty"]))) - if preview.get("json_pretty"): - sections.append(await render_to_sx("blog-preview-section", - title="Lexical JSON", content=SxExpr(preview["json_pretty"]))) - if preview.get("sx_rendered"): - rendered_sx = f'(div :class "blog-content prose max-w-none" (raw! {sx_serialize(preview["sx_rendered"])}))' - sections.append(await render_to_sx("blog-preview-section", - title="SX Rendered", content=SxExpr(rendered_sx))) - if preview.get("lex_rendered"): - rendered_sx = f'(div :class "blog-content prose max-w-none" (raw! {sx_serialize(preview["lex_rendered"])}))' - sections.append(await render_to_sx("blog-preview-section", - title="Lexical Rendered", content=SxExpr(rendered_sx))) - - if not sections: - return '(div :class "p-8 text-stone-500" "No content to preview.")' - - inner = " ".join(sections) - return await render_to_sx("blog-preview-panel", sections=SxExpr(f"(<> {inner})")) - - -async def _h_post_entries_content(slug=None, **kw): - await _ensure_post_data(slug) - from quart import g, url_for as qurl - from sqlalchemy import select - from markupsafe import escape as esc - from shared.models.calendars import Calendar - from shared.utils import host_url - from bp.post.services.entry_associations import get_post_entry_ids - from bp.post.admin.routes import _render_associated_entries - - post_id = g.post_data["post"]["id"] - post_slug = g.post_data["post"]["slug"] - associated_entry_ids = await get_post_entry_ids(post_id) - result = await g.s.execute( - select(Calendar) - .where(Calendar.deleted_at.is_(None)) - .order_by(Calendar.name.asc()) - ) - all_calendars = result.scalars().all() - for calendar in all_calendars: - await g.s.refresh(calendar, ["entries", "post"]) - - # Associated entries list - assoc_html = await _render_associated_entries(all_calendars, associated_entry_ids, post_slug) - - # Calendar browser - cal_items: list[str] = [] - for cal in all_calendars: - cal_post = getattr(cal, "post", None) - cal_fi = getattr(cal_post, "feature_image", None) if cal_post else None - cal_title = esc(getattr(cal_post, "title", "")) if cal_post else "" - cal_name = esc(getattr(cal, "name", "")) - cal_view_url = host_url(qurl("blog.post.admin.calendar_view", slug=post_slug, calendar_id=cal.id)) - - img_html = ( - f'{cal_title}' - if cal_fi else - '
' - ) - cal_items.append( - f'
' - f'' - f'{img_html}' - f'
' - f'
{cal_name}
' - f'
{cal_title}
' - f'
' - f'
' - f'
Loading calendar...
' - f'
' - ) - - if cal_items: - browser_html = ( - '

Browse Calendars

' - + "".join(cal_items) + '
' - ) - else: - browser_html = '

Browse Calendars

No calendars found.
' - - return ( - _raw_html_sx('
') - + assoc_html - + _raw_html_sx(browser_html + '
') - ) - - -async def _h_post_settings_content(slug=None, **kw): - await _ensure_post_data(slug) - from quart import g, request - from markupsafe import escape as esc - from models.ghost_content import Post - from sqlalchemy import select as sa_select - from sqlalchemy.orm import selectinload - from shared.browser.app.csrf import generate_csrf_token - from bp.post.admin.routes import _post_to_edit_dict - - post_id = g.post_data["post"]["id"] - post = (await g.s.execute( - sa_select(Post) - .where(Post.id == post_id) - .options(selectinload(Post.tags)) - )).scalar_one_or_none() - ghost_post = _post_to_edit_dict(post) if post else {} - save_success = request.args.get("saved") == "1" - csrf = generate_csrf_token() - - p = g.post_data.get("post", {}) if hasattr(g, "post_data") else {} - is_page = p.get("is_page", False) - gp = ghost_post - - def field_label(text, field_for=None): - for_attr = f' for="{field_for}"' if field_for else '' - return f'{esc(text)}' - - input_cls = ('w-full text-[14px] rounded-[6px] border border-stone-200 px-[10px] py-[7px] ' - 'bg-white text-stone-700 placeholder:text-stone-300 ' - 'focus:outline-none focus:border-stone-400 focus:ring-1 focus:ring-stone-300') - textarea_cls = input_cls + ' resize-y' - - def text_input(name, value='', placeholder='', input_type='text', maxlength=None): - ml = f' maxlength="{maxlength}"' if maxlength else '' - return (f'') - - def textarea_input(name, value='', placeholder='', rows=3, maxlength=None): - ml = f' maxlength="{maxlength}"' if maxlength else '' - return (f'') - - def checkbox_input(name, checked=False, label=''): - chk = ' checked' if checked else '' - return (f'') - - def section(title, content, is_open=False): - open_attr = ' open' if is_open else '' - return (f'
' - f'{esc(title)}' - f'
{content}
') - - # General section - slug_placeholder = 'page-slug' if is_page else 'post-slug' - pub_at = gp.get("published_at") or "" - pub_at_val = pub_at[:16] if pub_at else "" - vis = gp.get("visibility") or "public" - vis_opts = "".join( - f'' - for v, l in [("public", "Public"), ("members", "Members"), ("paid", "Paid")] - ) - - general = ( - f'
{field_label("Slug", "settings-slug")}{text_input("slug", gp.get("slug") or "", slug_placeholder)}
' - f'
{field_label("Published at", "settings-published_at")}' - f'
' - f'
{checkbox_input("featured", gp.get("featured"), "Featured page" if is_page else "Featured post")}
' - f'
{field_label("Visibility", "settings-visibility")}' - f'
' - f'
{checkbox_input("email_only", gp.get("email_only"), "Email only")}
' - ) - - # Tags - tags = gp.get("tags") or [] - if tags: - tag_names = ", ".join(getattr(t, "name", t.get("name", "") if isinstance(t, dict) else str(t)) for t in tags) - else: - tag_names = "" - tags_sec = ( - f'
{field_label("Tags (comma-separated)", "settings-tags")}' - f'{text_input("tags", tag_names, "news, updates, featured")}' - f'

Unknown tags will be created automatically.

' - ) - - fi_sec = f'
{field_label("Alt text", "settings-feature_image_alt")}{text_input("feature_image_alt", gp.get("feature_image_alt") or "", "Describe the feature image")}
' - - seo_sec = ( - f'
{field_label("Meta title", "settings-meta_title")}{text_input("meta_title", gp.get("meta_title") or "", "SEO title", maxlength=300)}' - f'

Recommended: 70 characters. Max: 300.

' - f'
{field_label("Meta description", "settings-meta_description")}{textarea_input("meta_description", gp.get("meta_description") or "", "SEO description", rows=2, maxlength=500)}' - f'

Recommended: 156 characters.

' - f'
{field_label("Canonical URL", "settings-canonical_url")}{text_input("canonical_url", gp.get("canonical_url") or "", "https://example.com/original-post", input_type="url")}
' - ) - - og_sec = ( - f'
{field_label("OG title", "settings-og_title")}{text_input("og_title", gp.get("og_title") or "")}
' - f'
{field_label("OG description", "settings-og_description")}{textarea_input("og_description", gp.get("og_description") or "", rows=2)}
' - f'
{field_label("OG image URL", "settings-og_image")}{text_input("og_image", gp.get("og_image") or "", "https://...", input_type="url")}
' - ) - - tw_sec = ( - f'
{field_label("Twitter title", "settings-twitter_title")}{text_input("twitter_title", gp.get("twitter_title") or "")}
' - f'
{field_label("Twitter description", "settings-twitter_description")}{textarea_input("twitter_description", gp.get("twitter_description") or "", rows=2)}
' - f'
{field_label("Twitter image URL", "settings-twitter_image")}{text_input("twitter_image", gp.get("twitter_image") or "", "https://...", input_type="url")}
' - ) - - tmpl_placeholder = 'custom-page.hbs' if is_page else 'custom-post.hbs' - adv_sec = f'
{field_label("Custom template", "settings-custom_template")}{text_input("custom_template", gp.get("custom_template") or "", tmpl_placeholder)}
' - - sections = ( - section("General", general, is_open=True) - + section("Tags", tags_sec) - + section("Feature Image", fi_sec) - + section("SEO / Meta", seo_sec) - + section("Facebook / OpenGraph", og_sec) - + section("X / Twitter", tw_sec) - + section("Advanced", adv_sec) - ) - - saved_html = 'Saved.' if save_success else '' - - html = ( - f'
' - f'' - f'' - f'
{sections}
' - f'
' - f'' - f'{saved_html}
' - ) - return _raw_html_sx(html) - - -async def _h_post_edit_content(slug=None, **kw): - await _ensure_post_data(slug) - import os - from quart import g, request as qrequest, url_for as qurl, current_app - from models.ghost_content import Post - from sqlalchemy import select as sa_select - from sqlalchemy.orm import selectinload - from shared.infrastructure.data_client import fetch_data - from shared.browser.app.csrf import generate_csrf_token - from shared.sx.helpers import render_to_sx - from shared.sx.parser import SxExpr, serialize as sx_serialize - from bp.post.admin.routes import _post_to_edit_dict - - post_id = g.post_data["post"]["id"] - db_post = (await g.s.execute( - sa_select(Post) - .where(Post.id == post_id) - .options(selectinload(Post.tags)) - )).scalar_one_or_none() - ghost_post = _post_to_edit_dict(db_post) if db_post else {} - save_success = qrequest.args.get("saved") == "1" - save_error = qrequest.args.get("error", "") - raw_newsletters = await fetch_data("account", "newsletters", required=False) or [] - from types import SimpleNamespace - newsletters = [SimpleNamespace(**nl) for nl in raw_newsletters] - - csrf = generate_csrf_token() - asset_url_fn = current_app.jinja_env.globals.get("asset_url", lambda p: "") - editor_css = asset_url_fn("scripts/editor.css") - editor_js = asset_url_fn("scripts/editor.js") - sx_editor_js = asset_url_fn("scripts/sx-editor.js") - - upload_image_url = qurl("blog.editor_api.upload_image") - upload_media_url = qurl("blog.editor_api.upload_media") - upload_file_url = qurl("blog.editor_api.upload_file") - oembed_url = qurl("blog.editor_api.oembed_proxy") - snippets_url = qurl("blog.editor_api.list_snippets") - unsplash_key = os.environ.get("UNSPLASH_ACCESS_KEY", "") - - post = g.post_data.get("post", {}) if hasattr(g, "post_data") else {} - is_page = post.get("is_page", False) - - feature_image = ghost_post.get("feature_image") or "" - feature_image_caption = ghost_post.get("feature_image_caption") or "" - title_val = ghost_post.get("title") or "" - excerpt_val = ghost_post.get("custom_excerpt") or "" - updated_at = ghost_post.get("updated_at") or "" - status = ghost_post.get("status") or "draft" - lexical_json = ghost_post.get("lexical") or '{"root":{"children":[{"children":[],"direction":null,"format":"","indent":0,"type":"paragraph","version":1}],"direction":null,"format":"","indent":0,"type":"root","version":1}}' - sx_content = ghost_post.get("sx_content") or "" - has_sx = bool(sx_content) - - already_emailed = bool(ghost_post and ghost_post.get("email") and (ghost_post["email"] if isinstance(ghost_post["email"], dict) else {}).get("status")) - email_obj = ghost_post.get("email") - if email_obj and not isinstance(email_obj, dict): - already_emailed = bool(getattr(email_obj, "status", None)) - - title_placeholder = "Page title..." if is_page else "Post title..." - - # Newsletter options as SX fragment - nl_parts = ['(option :value "" "Select newsletter\u2026")'] - for nl in newsletters: - nl_slug = sx_serialize(getattr(nl, "slug", "")) - nl_name = sx_serialize(getattr(nl, "name", "")) - nl_parts.append(f"(option :value {nl_slug} {nl_name})") - nl_opts_sx = SxExpr("(<> " + " ".join(nl_parts) + ")") - - # Footer extra badges as SX fragment - badge_parts: list[str] = [] - if save_success: - badge_parts.append('(span :class "text-[14px] text-green-600" "Saved.")') - publish_requested = qrequest.args.get("publish_requested") if hasattr(qrequest, 'args') else None - if publish_requested: - badge_parts.append('(span :class "text-[14px] text-blue-600" "Publish requested \u2014 an admin will review.")') - if post.get("publish_requested"): - badge_parts.append('(span :class "inline-block px-2 py-0.5 rounded-full text-xs font-semibold bg-blue-100 text-blue-800" "Publish requested")') - if already_emailed: - nl_name = "" - newsletter = ghost_post.get("newsletter") - if newsletter: - nl_name = getattr(newsletter, "name", "") if not isinstance(newsletter, dict) else newsletter.get("name", "") - suffix = f" to {nl_name}" if nl_name else "" - badge_parts.append(f'(span :class "inline-block px-2 py-0.5 rounded-full text-xs font-semibold bg-green-100 text-green-800" "Emailed{suffix}")') - footer_extra_sx = SxExpr("(<> " + " ".join(badge_parts) + ")") if badge_parts else None - - parts: list[str] = [] - - if save_error: - parts.append(await render_to_sx("blog-editor-error", error=save_error)) - - parts.append(await render_to_sx("blog-editor-edit-form", - csrf=csrf, - updated_at=str(updated_at), - title_val=title_val, - excerpt_val=excerpt_val, - feature_image=feature_image, - feature_image_caption=feature_image_caption, - sx_content_val=sx_content, - lexical_json=lexical_json, - has_sx=has_sx, - title_placeholder=title_placeholder, - status=status, - already_emailed=already_emailed, - newsletter_options=nl_opts_sx, - footer_extra=footer_extra_sx, - )) - - parts.append(await render_to_sx("blog-editor-publish-js", already_emailed=already_emailed)) - parts.append(await render_to_sx("blog-editor-styles", css_href=editor_css)) - parts.append(await render_to_sx("sx-editor-styles")) - - init_js = ( - '(function() {' - " function applyEditorFontSize() {" - " document.documentElement.style.fontSize = '62.5%';" - " document.body.style.fontSize = '1.6rem';" - ' }' - " function restoreDefaultFontSize() {" - " document.documentElement.style.fontSize = '';" - " document.body.style.fontSize = '';" - ' }' - ' applyEditorFontSize();' - " document.body.addEventListener('htmx:beforeSwap', function cleanup(e) {" - " if (e.detail.target && e.detail.target.id === 'main-panel') {" - ' restoreDefaultFontSize();' - " document.body.removeEventListener('htmx:beforeSwap', cleanup);" - ' }' - ' });' - ' function init() {' - " var csrfToken = document.querySelector('input[name=\"csrf_token\"]').value;" - f" var uploadUrl = '{upload_image_url}';" - ' var uploadUrls = {' - ' image: uploadUrl,' - f" media: '{upload_media_url}'," - f" file: '{upload_file_url}'," - ' };' - " var fileInput = document.getElementById('feature-image-file');" - " var addBtn = document.getElementById('feature-image-add-btn');" - " var deleteBtn = document.getElementById('feature-image-delete-btn');" - " var preview = document.getElementById('feature-image-preview');" - " var emptyState = document.getElementById('feature-image-empty');" - " var filledState = document.getElementById('feature-image-filled');" - " var hiddenUrl = document.getElementById('feature-image-input');" - " var hiddenCaption = document.getElementById('feature-image-caption-input');" - " var captionInput = document.getElementById('feature-image-caption');" - " var uploading = document.getElementById('feature-image-uploading');" - ' function showFilled(url) {' - ' preview.src = url; hiddenUrl.value = url;' - " emptyState.classList.add('hidden'); filledState.classList.remove('hidden'); uploading.classList.add('hidden');" - ' }' - ' function showEmpty() {' - " preview.src = ''; hiddenUrl.value = ''; hiddenCaption.value = ''; captionInput.value = '';" - " emptyState.classList.remove('hidden'); filledState.classList.add('hidden'); uploading.classList.add('hidden');" - ' }' - ' function uploadFile(file) {' - " emptyState.classList.add('hidden'); uploading.classList.remove('hidden');" - " var fd = new FormData(); fd.append('file', file);" - " fetch(uploadUrl, { method: 'POST', body: fd, headers: { 'X-CSRFToken': csrfToken } })" - " .then(function(r) { if (!r.ok) throw new Error('Upload failed (' + r.status + ')'); return r.json(); })" - ' .then(function(data) {' - ' var url = data.images && data.images[0] && data.images[0].url;' - " if (url) showFilled(url); else { showEmpty(); alert('Upload succeeded but no image URL returned.'); }" - ' })' - ' .catch(function(e) { showEmpty(); alert(e.message); });' - ' }' - " addBtn.addEventListener('click', function() { fileInput.click(); });" - " preview.addEventListener('click', function() { fileInput.click(); });" - " deleteBtn.addEventListener('click', function(e) { e.stopPropagation(); showEmpty(); });" - " fileInput.addEventListener('change', function() {" - ' if (fileInput.files && fileInput.files[0]) { uploadFile(fileInput.files[0]); fileInput.value = \'\'; }' - ' });' - " captionInput.addEventListener('input', function() { hiddenCaption.value = captionInput.value; });" - " var excerpt = document.querySelector('textarea[name=\"custom_excerpt\"]');" - " function autoResize() { excerpt.style.height = 'auto'; excerpt.style.height = excerpt.scrollHeight + 'px'; }" - " excerpt.addEventListener('input', autoResize); autoResize();" - ' var dataEl = document.getElementById(\'lexical-initial-data\');' - ' var initialJson = dataEl ? dataEl.textContent.trim() : null;' - ' if (initialJson) { var hidden = document.getElementById(\'lexical-json-input\'); if (hidden) hidden.value = initialJson; }' - " window.mountEditor('lexical-editor', {" - ' initialJson: initialJson,' - ' csrfToken: csrfToken,' - ' uploadUrls: uploadUrls,' - f" oembedUrl: '{oembed_url}'," - f" unsplashApiKey: '{unsplash_key}'," - f" snippetsUrl: '{snippets_url}'," - ' });' - " if (typeof SxEditor !== 'undefined') {" - " SxEditor.mount('sx-editor', {" - " initialSx: (document.getElementById('sx-content-input') || {}).value || null," - ' csrfToken: csrfToken,' - ' uploadUrls: uploadUrls,' - f" oembedUrl: '{oembed_url}'," - ' onChange: function(sx) {' - " document.getElementById('sx-content-input').value = sx;" - ' }' - ' });' - ' }' - " document.addEventListener('keydown', function(e) {" - " if ((e.ctrlKey || e.metaKey) && e.key === 's') {" - " e.preventDefault(); document.getElementById('post-edit-form').requestSubmit();" - ' }' - ' });' - ' }' - " if (typeof window.mountEditor === 'function') { init(); }" - ' else { var _t = setInterval(function() {' - " if (typeof window.mountEditor === 'function') { clearInterval(_t); init(); }" - ' }, 50); }' - '})();' - ) - parts.append(await render_to_sx("blog-editor-scripts", - js_src=editor_js, - sx_editor_js_src=sx_editor_js, - init_js=init_js)) - - return await render_to_sx("blog-editor-panel", - parts=SxExpr("(<> " + " ".join(parts) + ")")) - - diff --git a/blog/sxc/pages/helpers.py b/blog/sxc/pages/helpers.py new file mode 100644 index 0000000..4c75d33 --- /dev/null +++ b/blog/sxc/pages/helpers.py @@ -0,0 +1,713 @@ +"""Blog page helpers — async functions available in .sx defpage expressions.""" +from __future__ import annotations + +from typing import Any + + +# --------------------------------------------------------------------------- +# Shared hydration helpers +# --------------------------------------------------------------------------- + +def _add_to_defpage_ctx(**kwargs: Any) -> None: + from quart import g + if not hasattr(g, '_defpage_ctx'): + g._defpage_ctx = {} + g._defpage_ctx.update(kwargs) + + +async def _ensure_post_data(slug: str | None) -> None: + """Load post data and set g.post_data + defpage context. + + Replicates post bp's hydrate_post_data + context_processor. + """ + from quart import g, abort + + if hasattr(g, 'post_data') and g.post_data: + await _inject_post_context(g.post_data) + return + + if not slug: + abort(404) + + from bp.post.services.post_data import post_data + + is_admin = bool((g.get("rights") or {}).get("admin")) + p_data = await post_data(slug, g.s, include_drafts=True) + if not p_data: + abort(404) + + # Draft access control + if p_data["post"].get("status") != "published": + if is_admin: + pass + elif g.user and p_data["post"].get("user_id") == g.user.id: + pass + else: + abort(404) + + g.post_data = p_data + g.post_slug = slug + await _inject_post_context(p_data) + + +async def _inject_post_context(p_data: dict) -> None: + """Add post context_processor data to defpage context.""" + from shared.config import config + from shared.infrastructure.fragments import fetch_fragment + from shared.infrastructure.data_client import fetch_data + from shared.contracts.dtos import CartSummaryDTO, dto_from_dict + from shared.infrastructure.cart_identity import current_cart_identity + + db_post_id = p_data["post"]["id"] + post_slug = p_data["post"]["slug"] + + container_nav = await fetch_fragment("relations", "container-nav", params={ + "container_type": "page", + "container_id": str(db_post_id), + "post_slug": post_slug, + }) + + ctx: dict = { + **p_data, + "base_title": config()["title"], + "container_nav": container_nav, + } + + if p_data["post"].get("is_page"): + ident = current_cart_identity() + summary_params: dict = {"page_slug": post_slug} + if ident["user_id"] is not None: + summary_params["user_id"] = ident["user_id"] + if ident["session_id"] is not None: + summary_params["session_id"] = ident["session_id"] + raw_summary = await fetch_data( + "cart", "cart-summary", params=summary_params, required=False, + ) + page_summary = dto_from_dict(CartSummaryDTO, raw_summary) if raw_summary else CartSummaryDTO() + ctx["page_cart_count"] = ( + page_summary.count + page_summary.calendar_count + page_summary.ticket_count + ) + ctx["page_cart_total"] = float( + page_summary.total + page_summary.calendar_total + page_summary.ticket_total + ) + + _add_to_defpage_ctx(**ctx) + + +# --------------------------------------------------------------------------- +# Rendering helpers (moved from sx_components) +# --------------------------------------------------------------------------- + +def _raw_html_sx(html: str) -> str: + """Wrap raw HTML in (raw! "...") so it's valid inside sx source.""" + from shared.sx.parser import serialize as sx_serialize + if not html: + return "" + return "(raw! " + sx_serialize(html) + ")" + + +# --------------------------------------------------------------------------- +# Page helpers (async functions available in .sx defpage expressions) +# --------------------------------------------------------------------------- + +def _register_blog_helpers() -> None: + from shared.sx.pages import register_page_helpers + register_page_helpers("blog", { + "editor-content": _h_editor_content, + "editor-page-content": _h_editor_page_content, + "post-admin-content": _h_post_admin_content, + "post-data-content": _h_post_data_content, + "post-preview-content": _h_post_preview_content, + "post-entries-content": _h_post_entries_content, + "post-settings-content": _h_post_settings_content, + "post-edit-content": _h_post_edit_content, + }) + + +# --- Editor helpers --- + +async def _h_editor_content(**kw): + from .renders import render_editor_panel + return await render_editor_panel() + + +async def _h_editor_page_content(**kw): + from .renders import render_editor_panel + return await render_editor_panel(is_page=True) + + +# --- Post admin helpers --- + +async def _h_post_admin_content(slug=None, **kw): + await _ensure_post_data(slug) + return '(div :class "pb-8")' + + +async def _h_post_data_content(slug=None, **kw): + await _ensure_post_data(slug) + from quart import g + from markupsafe import escape as esc + + original_post = getattr(g, "post_data", {}).get("original_post") + if original_post is None: + return _raw_html_sx('
No post data available.
') + + tablename = getattr(original_post, "__tablename__", "?") + + def _render_scalar_table(obj): + rows = [] + for col in obj.__mapper__.columns: + key = col.key + if key == "_sa_instance_state": + continue + val = getattr(obj, key, None) + if val is None: + val_html = '\u2014' + elif hasattr(val, "isoformat"): + val_html = f'
{esc(val.isoformat())}
' + elif isinstance(val, str): + val_html = f'
{esc(val)}
' + else: + val_html = f'
{esc(str(val))}
' + rows.append( + f'' + f'{esc(key)}' + f'{val_html}' + ) + return ( + '
' + '' + '' + '' + '' + '' + "".join(rows) + '
FieldValue
' + ) + + def _render_model(obj, depth=0, max_depth=2): + parts = [_render_scalar_table(obj)] + rel_parts = [] + for rel in obj.__mapper__.relationships: + rel_name = rel.key + loaded = rel_name in obj.__dict__ + value = getattr(obj, rel_name, None) if loaded else None + cardinality = "many" if rel.uselist else "one" + cls_name = rel.mapper.class_.__name__ + loaded_label = "" if loaded else " \u2022 not loaded" + + inner = "" + if value is None: + inner = '\u2014' + elif rel.uselist: + items = list(value) if value else [] + inner = f'
{len(items)} item{"" if len(items) == 1 else "s"}
' + if items and depth < max_depth: + sub_rows = [] + for i, it in enumerate(items, 1): + ident_parts = [] + for k in ("id", "ghost_id", "uuid", "slug", "name", "title"): + if k in it.__mapper__.c: + v = getattr(it, k, "") + ident_parts.append(f"{k}={v}") + summary = " \u2022 ".join(ident_parts) if ident_parts else str(it) + child_html = "" + if depth < max_depth: + child_html = f'
{_render_model(it, depth + 1, max_depth)}
' + else: + child_html = '
\u2026max depth reached\u2026
' + sub_rows.append( + f'' + f'{i}' + f'
{esc(summary)}
{child_html}' + ) + inner += ( + '
' + '' + '' + '' + + "".join(sub_rows) + '
#Summary
' + ) + else: + child = value + ident_parts = [] + for k in ("id", "ghost_id", "uuid", "slug", "name", "title"): + if k in child.__mapper__.c: + v = getattr(child, k, "") + ident_parts.append(f"{k}={v}") + summary = " \u2022 ".join(ident_parts) if ident_parts else str(child) + inner = f'
{esc(summary)}
' + if depth < max_depth: + inner += f'
{_render_model(child, depth + 1, max_depth)}
' + else: + inner += '
\u2026max depth reached\u2026
' + + rel_parts.append( + f'
' + f'
' + f'Relationship: {esc(rel_name)}' + f' {cardinality} \u2192 {esc(cls_name)}{loaded_label}
' + f'
{inner}
' + ) + if rel_parts: + parts.append('
' + "".join(rel_parts) + '
') + return '
' + "".join(parts) + '
' + + html = ( + f'
' + f'
Model: Post \u2022 Table: {esc(tablename)}
' + f'{_render_model(original_post, 0, 2)}
' + ) + return _raw_html_sx(html) + + +async def _h_post_preview_content(slug=None, **kw): + await _ensure_post_data(slug) + from quart import g + from shared.services.registry import services + from shared.sx.helpers import render_to_sx + from shared.sx.parser import SxExpr, serialize as sx_serialize + + preview = await services.blog_page.preview_data(g.s) + + sections: list[str] = [] + if preview.get("sx_pretty"): + sections.append(await render_to_sx("blog-preview-section", + title="S-Expression Source", content=SxExpr(preview["sx_pretty"]))) + if preview.get("json_pretty"): + sections.append(await render_to_sx("blog-preview-section", + title="Lexical JSON", content=SxExpr(preview["json_pretty"]))) + if preview.get("sx_rendered"): + rendered_sx = f'(div :class "blog-content prose max-w-none" (raw! {sx_serialize(preview["sx_rendered"])}))' + sections.append(await render_to_sx("blog-preview-section", + title="SX Rendered", content=SxExpr(rendered_sx))) + if preview.get("lex_rendered"): + rendered_sx = f'(div :class "blog-content prose max-w-none" (raw! {sx_serialize(preview["lex_rendered"])}))' + sections.append(await render_to_sx("blog-preview-section", + title="Lexical Rendered", content=SxExpr(rendered_sx))) + + if not sections: + return '(div :class "p-8 text-stone-500" "No content to preview.")' + + inner = " ".join(sections) + return await render_to_sx("blog-preview-panel", sections=SxExpr(f"(<> {inner})")) + + +async def _h_post_entries_content(slug=None, **kw): + await _ensure_post_data(slug) + from quart import g, url_for as qurl + from sqlalchemy import select + from markupsafe import escape as esc + from shared.models.calendars import Calendar + from shared.utils import host_url + from bp.post.services.entry_associations import get_post_entry_ids + from bp.post.admin.routes import _render_associated_entries + + post_id = g.post_data["post"]["id"] + post_slug = g.post_data["post"]["slug"] + associated_entry_ids = await get_post_entry_ids(post_id) + result = await g.s.execute( + select(Calendar) + .where(Calendar.deleted_at.is_(None)) + .order_by(Calendar.name.asc()) + ) + all_calendars = result.scalars().all() + for calendar in all_calendars: + await g.s.refresh(calendar, ["entries", "post"]) + + # Associated entries list + assoc_html = await _render_associated_entries(all_calendars, associated_entry_ids, post_slug) + + # Calendar browser + cal_items: list[str] = [] + for cal in all_calendars: + cal_post = getattr(cal, "post", None) + cal_fi = getattr(cal_post, "feature_image", None) if cal_post else None + cal_title = esc(getattr(cal_post, "title", "")) if cal_post else "" + cal_name = esc(getattr(cal, "name", "")) + cal_view_url = host_url(qurl("blog.post.admin.calendar_view", slug=post_slug, calendar_id=cal.id)) + + img_html = ( + f'{cal_title}' + if cal_fi else + '
' + ) + cal_items.append( + f'
' + f'' + f'{img_html}' + f'
' + f'
{cal_name}
' + f'
{cal_title}
' + f'
' + f'
' + f'
Loading calendar...
' + f'
' + ) + + if cal_items: + browser_html = ( + '

Browse Calendars

' + + "".join(cal_items) + '
' + ) + else: + browser_html = '

Browse Calendars

No calendars found.
' + + return ( + _raw_html_sx('
') + + assoc_html + + _raw_html_sx(browser_html + '
') + ) + + +async def _h_post_settings_content(slug=None, **kw): + await _ensure_post_data(slug) + from quart import g, request + from markupsafe import escape as esc + from models.ghost_content import Post + from sqlalchemy import select as sa_select + from sqlalchemy.orm import selectinload + from shared.browser.app.csrf import generate_csrf_token + from bp.post.admin.routes import _post_to_edit_dict + + post_id = g.post_data["post"]["id"] + post = (await g.s.execute( + sa_select(Post) + .where(Post.id == post_id) + .options(selectinload(Post.tags)) + )).scalar_one_or_none() + ghost_post = _post_to_edit_dict(post) if post else {} + save_success = request.args.get("saved") == "1" + csrf = generate_csrf_token() + + p = g.post_data.get("post", {}) if hasattr(g, "post_data") else {} + is_page = p.get("is_page", False) + gp = ghost_post + + def field_label(text, field_for=None): + for_attr = f' for="{field_for}"' if field_for else '' + return f'{esc(text)}' + + input_cls = ('w-full text-[14px] rounded-[6px] border border-stone-200 px-[10px] py-[7px] ' + 'bg-white text-stone-700 placeholder:text-stone-300 ' + 'focus:outline-none focus:border-stone-400 focus:ring-1 focus:ring-stone-300') + textarea_cls = input_cls + ' resize-y' + + def text_input(name, value='', placeholder='', input_type='text', maxlength=None): + ml = f' maxlength="{maxlength}"' if maxlength else '' + return (f'') + + def textarea_input(name, value='', placeholder='', rows=3, maxlength=None): + ml = f' maxlength="{maxlength}"' if maxlength else '' + return (f'') + + def checkbox_input(name, checked=False, label=''): + chk = ' checked' if checked else '' + return (f'') + + def section(title, content, is_open=False): + open_attr = ' open' if is_open else '' + return (f'
' + f'{esc(title)}' + f'
{content}
') + + # General section + slug_placeholder = 'page-slug' if is_page else 'post-slug' + pub_at = gp.get("published_at") or "" + pub_at_val = pub_at[:16] if pub_at else "" + vis = gp.get("visibility") or "public" + vis_opts = "".join( + f'' + for v, l in [("public", "Public"), ("members", "Members"), ("paid", "Paid")] + ) + + general = ( + f'
{field_label("Slug", "settings-slug")}{text_input("slug", gp.get("slug") or "", slug_placeholder)}
' + f'
{field_label("Published at", "settings-published_at")}' + f'
' + f'
{checkbox_input("featured", gp.get("featured"), "Featured page" if is_page else "Featured post")}
' + f'
{field_label("Visibility", "settings-visibility")}' + f'
' + f'
{checkbox_input("email_only", gp.get("email_only"), "Email only")}
' + ) + + # Tags + tags = gp.get("tags") or [] + if tags: + tag_names = ", ".join(getattr(t, "name", t.get("name", "") if isinstance(t, dict) else str(t)) for t in tags) + else: + tag_names = "" + tags_sec = ( + f'
{field_label("Tags (comma-separated)", "settings-tags")}' + f'{text_input("tags", tag_names, "news, updates, featured")}' + f'

Unknown tags will be created automatically.

' + ) + + fi_sec = f'
{field_label("Alt text", "settings-feature_image_alt")}{text_input("feature_image_alt", gp.get("feature_image_alt") or "", "Describe the feature image")}
' + + seo_sec = ( + f'
{field_label("Meta title", "settings-meta_title")}{text_input("meta_title", gp.get("meta_title") or "", "SEO title", maxlength=300)}' + f'

Recommended: 70 characters. Max: 300.

' + f'
{field_label("Meta description", "settings-meta_description")}{textarea_input("meta_description", gp.get("meta_description") or "", "SEO description", rows=2, maxlength=500)}' + f'

Recommended: 156 characters.

' + f'
{field_label("Canonical URL", "settings-canonical_url")}{text_input("canonical_url", gp.get("canonical_url") or "", "https://example.com/original-post", input_type="url")}
' + ) + + og_sec = ( + f'
{field_label("OG title", "settings-og_title")}{text_input("og_title", gp.get("og_title") or "")}
' + f'
{field_label("OG description", "settings-og_description")}{textarea_input("og_description", gp.get("og_description") or "", rows=2)}
' + f'
{field_label("OG image URL", "settings-og_image")}{text_input("og_image", gp.get("og_image") or "", "https://...", input_type="url")}
' + ) + + tw_sec = ( + f'
{field_label("Twitter title", "settings-twitter_title")}{text_input("twitter_title", gp.get("twitter_title") or "")}
' + f'
{field_label("Twitter description", "settings-twitter_description")}{textarea_input("twitter_description", gp.get("twitter_description") or "", rows=2)}
' + f'
{field_label("Twitter image URL", "settings-twitter_image")}{text_input("twitter_image", gp.get("twitter_image") or "", "https://...", input_type="url")}
' + ) + + tmpl_placeholder = 'custom-page.hbs' if is_page else 'custom-post.hbs' + adv_sec = f'
{field_label("Custom template", "settings-custom_template")}{text_input("custom_template", gp.get("custom_template") or "", tmpl_placeholder)}
' + + sections = ( + section("General", general, is_open=True) + + section("Tags", tags_sec) + + section("Feature Image", fi_sec) + + section("SEO / Meta", seo_sec) + + section("Facebook / OpenGraph", og_sec) + + section("X / Twitter", tw_sec) + + section("Advanced", adv_sec) + ) + + saved_html = 'Saved.' if save_success else '' + + html = ( + f'
' + f'' + f'' + f'
{sections}
' + f'
' + f'' + f'{saved_html}
' + ) + return _raw_html_sx(html) + + +async def _h_post_edit_content(slug=None, **kw): + await _ensure_post_data(slug) + import os + from quart import g, request as qrequest, url_for as qurl, current_app + from models.ghost_content import Post + from sqlalchemy import select as sa_select + from sqlalchemy.orm import selectinload + from shared.infrastructure.data_client import fetch_data + from shared.browser.app.csrf import generate_csrf_token + from shared.sx.helpers import render_to_sx + from shared.sx.parser import SxExpr, serialize as sx_serialize + from bp.post.admin.routes import _post_to_edit_dict + + post_id = g.post_data["post"]["id"] + db_post = (await g.s.execute( + sa_select(Post) + .where(Post.id == post_id) + .options(selectinload(Post.tags)) + )).scalar_one_or_none() + ghost_post = _post_to_edit_dict(db_post) if db_post else {} + save_success = qrequest.args.get("saved") == "1" + save_error = qrequest.args.get("error", "") + raw_newsletters = await fetch_data("account", "newsletters", required=False) or [] + from types import SimpleNamespace + newsletters = [SimpleNamespace(**nl) for nl in raw_newsletters] + + csrf = generate_csrf_token() + asset_url_fn = current_app.jinja_env.globals.get("asset_url", lambda p: "") + editor_css = asset_url_fn("scripts/editor.css") + editor_js = asset_url_fn("scripts/editor.js") + sx_editor_js = asset_url_fn("scripts/sx-editor.js") + + upload_image_url = qurl("blog.editor_api.upload_image") + upload_media_url = qurl("blog.editor_api.upload_media") + upload_file_url = qurl("blog.editor_api.upload_file") + oembed_url = qurl("blog.editor_api.oembed_proxy") + snippets_url = qurl("blog.editor_api.list_snippets") + unsplash_key = os.environ.get("UNSPLASH_ACCESS_KEY", "") + + post = g.post_data.get("post", {}) if hasattr(g, "post_data") else {} + is_page = post.get("is_page", False) + + feature_image = ghost_post.get("feature_image") or "" + feature_image_caption = ghost_post.get("feature_image_caption") or "" + title_val = ghost_post.get("title") or "" + excerpt_val = ghost_post.get("custom_excerpt") or "" + updated_at = ghost_post.get("updated_at") or "" + status = ghost_post.get("status") or "draft" + lexical_json = ghost_post.get("lexical") or '{"root":{"children":[{"children":[],"direction":null,"format":"","indent":0,"type":"paragraph","version":1}],"direction":null,"format":"","indent":0,"type":"root","version":1}}' + sx_content = ghost_post.get("sx_content") or "" + has_sx = bool(sx_content) + + already_emailed = bool(ghost_post and ghost_post.get("email") and (ghost_post["email"] if isinstance(ghost_post["email"], dict) else {}).get("status")) + email_obj = ghost_post.get("email") + if email_obj and not isinstance(email_obj, dict): + already_emailed = bool(getattr(email_obj, "status", None)) + + title_placeholder = "Page title..." if is_page else "Post title..." + + # Newsletter options as SX fragment + nl_parts = ['(option :value "" "Select newsletter\u2026")'] + for nl in newsletters: + nl_slug = sx_serialize(getattr(nl, "slug", "")) + nl_name = sx_serialize(getattr(nl, "name", "")) + nl_parts.append(f"(option :value {nl_slug} {nl_name})") + nl_opts_sx = SxExpr("(<> " + " ".join(nl_parts) + ")") + + # Footer extra badges as SX fragment + badge_parts: list[str] = [] + if save_success: + badge_parts.append('(span :class "text-[14px] text-green-600" "Saved.")') + publish_requested = qrequest.args.get("publish_requested") if hasattr(qrequest, 'args') else None + if publish_requested: + badge_parts.append('(span :class "text-[14px] text-blue-600" "Publish requested \u2014 an admin will review.")') + if post.get("publish_requested"): + badge_parts.append('(span :class "inline-block px-2 py-0.5 rounded-full text-xs font-semibold bg-blue-100 text-blue-800" "Publish requested")') + if already_emailed: + nl_name = "" + newsletter = ghost_post.get("newsletter") + if newsletter: + nl_name = getattr(newsletter, "name", "") if not isinstance(newsletter, dict) else newsletter.get("name", "") + suffix = f" to {nl_name}" if nl_name else "" + badge_parts.append(f'(span :class "inline-block px-2 py-0.5 rounded-full text-xs font-semibold bg-green-100 text-green-800" "Emailed{suffix}")') + footer_extra_sx = SxExpr("(<> " + " ".join(badge_parts) + ")") if badge_parts else None + + parts: list[str] = [] + + if save_error: + parts.append(await render_to_sx("blog-editor-error", error=save_error)) + + parts.append(await render_to_sx("blog-editor-edit-form", + csrf=csrf, + updated_at=str(updated_at), + title_val=title_val, + excerpt_val=excerpt_val, + feature_image=feature_image, + feature_image_caption=feature_image_caption, + sx_content_val=sx_content, + lexical_json=lexical_json, + has_sx=has_sx, + title_placeholder=title_placeholder, + status=status, + already_emailed=already_emailed, + newsletter_options=nl_opts_sx, + footer_extra=footer_extra_sx, + )) + + parts.append(await render_to_sx("blog-editor-publish-js", already_emailed=already_emailed)) + parts.append(await render_to_sx("blog-editor-styles", css_href=editor_css)) + parts.append(await render_to_sx("sx-editor-styles")) + + init_js = ( + '(function() {' + " function applyEditorFontSize() {" + " document.documentElement.style.fontSize = '62.5%';" + " document.body.style.fontSize = '1.6rem';" + ' }' + " function restoreDefaultFontSize() {" + " document.documentElement.style.fontSize = '';" + " document.body.style.fontSize = '';" + ' }' + ' applyEditorFontSize();' + " document.body.addEventListener('htmx:beforeSwap', function cleanup(e) {" + " if (e.detail.target && e.detail.target.id === 'main-panel') {" + ' restoreDefaultFontSize();' + " document.body.removeEventListener('htmx:beforeSwap', cleanup);" + ' }' + ' });' + ' function init() {' + " var csrfToken = document.querySelector('input[name=\"csrf_token\"]').value;" + f" var uploadUrl = '{upload_image_url}';" + ' var uploadUrls = {' + ' image: uploadUrl,' + f" media: '{upload_media_url}'," + f" file: '{upload_file_url}'," + ' };' + " var fileInput = document.getElementById('feature-image-file');" + " var addBtn = document.getElementById('feature-image-add-btn');" + " var deleteBtn = document.getElementById('feature-image-delete-btn');" + " var preview = document.getElementById('feature-image-preview');" + " var emptyState = document.getElementById('feature-image-empty');" + " var filledState = document.getElementById('feature-image-filled');" + " var hiddenUrl = document.getElementById('feature-image-input');" + " var hiddenCaption = document.getElementById('feature-image-caption-input');" + " var captionInput = document.getElementById('feature-image-caption');" + " var uploading = document.getElementById('feature-image-uploading');" + ' function showFilled(url) {' + ' preview.src = url; hiddenUrl.value = url;' + " emptyState.classList.add('hidden'); filledState.classList.remove('hidden'); uploading.classList.add('hidden');" + ' }' + ' function showEmpty() {' + " preview.src = ''; hiddenUrl.value = ''; hiddenCaption.value = ''; captionInput.value = '';" + " emptyState.classList.remove('hidden'); filledState.classList.add('hidden'); uploading.classList.add('hidden');" + ' }' + ' function uploadFile(file) {' + " emptyState.classList.add('hidden'); uploading.classList.remove('hidden');" + " var fd = new FormData(); fd.append('file', file);" + " fetch(uploadUrl, { method: 'POST', body: fd, headers: { 'X-CSRFToken': csrfToken } })" + " .then(function(r) { if (!r.ok) throw new Error('Upload failed (' + r.status + ')'); return r.json(); })" + ' .then(function(data) {' + ' var url = data.images && data.images[0] && data.images[0].url;' + " if (url) showFilled(url); else { showEmpty(); alert('Upload succeeded but no image URL returned.'); }" + ' })' + ' .catch(function(e) { showEmpty(); alert(e.message); });' + ' }' + " addBtn.addEventListener('click', function() { fileInput.click(); });" + " preview.addEventListener('click', function() { fileInput.click(); });" + " deleteBtn.addEventListener('click', function(e) { e.stopPropagation(); showEmpty(); });" + " fileInput.addEventListener('change', function() {" + ' if (fileInput.files && fileInput.files[0]) { uploadFile(fileInput.files[0]); fileInput.value = \'\'; }' + ' });' + " captionInput.addEventListener('input', function() { hiddenCaption.value = captionInput.value; });" + " var excerpt = document.querySelector('textarea[name=\"custom_excerpt\"]');" + " function autoResize() { excerpt.style.height = 'auto'; excerpt.style.height = excerpt.scrollHeight + 'px'; }" + " excerpt.addEventListener('input', autoResize); autoResize();" + ' var dataEl = document.getElementById(\'lexical-initial-data\');' + ' var initialJson = dataEl ? dataEl.textContent.trim() : null;' + ' if (initialJson) { var hidden = document.getElementById(\'lexical-json-input\'); if (hidden) hidden.value = initialJson; }' + " window.mountEditor('lexical-editor', {" + ' initialJson: initialJson,' + ' csrfToken: csrfToken,' + ' uploadUrls: uploadUrls,' + f" oembedUrl: '{oembed_url}'," + f" unsplashApiKey: '{unsplash_key}'," + f" snippetsUrl: '{snippets_url}'," + ' });' + " if (typeof SxEditor !== 'undefined') {" + " SxEditor.mount('sx-editor', {" + " initialSx: (document.getElementById('sx-content-input') || {}).value || null," + ' csrfToken: csrfToken,' + ' uploadUrls: uploadUrls,' + f" oembedUrl: '{oembed_url}'," + ' onChange: function(sx) {' + " document.getElementById('sx-content-input').value = sx;" + ' }' + ' });' + ' }' + " document.addEventListener('keydown', function(e) {" + " if ((e.ctrlKey || e.metaKey) && e.key === 's') {" + " e.preventDefault(); document.getElementById('post-edit-form').requestSubmit();" + ' }' + ' });' + ' }' + " if (typeof window.mountEditor === 'function') { init(); }" + ' else { var _t = setInterval(function() {' + " if (typeof window.mountEditor === 'function') { clearInterval(_t); init(); }" + ' }, 50); }' + '})();' + ) + parts.append(await render_to_sx("blog-editor-scripts", + js_src=editor_js, + sx_editor_js_src=sx_editor_js, + init_js=init_js)) + + return await render_to_sx("blog-editor-panel", + parts=SxExpr("(<> " + " ".join(parts) + ")")) diff --git a/blog/sxc/pages/layouts.py b/blog/sxc/pages/layouts.py new file mode 100644 index 0000000..aeb22c6 --- /dev/null +++ b/blog/sxc/pages/layouts.py @@ -0,0 +1,217 @@ +"""Blog layout functions for defpage rendering.""" +from __future__ import annotations + +from typing import Any + + +# --------------------------------------------------------------------------- +# Header helpers (moved from sx_components — thin render_to_sx wrappers) +# --------------------------------------------------------------------------- + +async def _blog_header_sx(ctx: dict, *, oob: bool = False) -> str: + from shared.sx.helpers import render_to_sx + from shared.sx.parser import SxExpr + return await render_to_sx("menu-row-sx", + id="blog-row", level=1, + link_label_content=SxExpr("(div)"), + child_id="blog-header-child", oob=oob) + + +async def _settings_header_sx(ctx: dict, *, oob: bool = False) -> str: + from shared.sx.helpers import render_to_sx + from shared.sx.parser import SxExpr + from quart import url_for as qurl + + settings_href = qurl("settings.defpage_settings_home") + label_sx = await render_to_sx("blog-admin-label") + nav_sx = await _settings_nav_sx(ctx) + + return await render_to_sx("menu-row-sx", + id="root-settings-row", level=1, + link_href=settings_href, + link_label_content=SxExpr(label_sx), + nav=SxExpr(nav_sx) if nav_sx else None, + child_id="root-settings-header-child", oob=oob) + + +async def _settings_nav_sx(ctx: dict) -> str: + from shared.sx.helpers import render_to_sx + return await render_to_sx("blog-settings-nav", + select_colours=ctx.get("select_colours", "")) + + +async def _sub_settings_header_sx(row_id: str, child_id: str, href: str, + icon: str, label: str, ctx: dict, + *, oob: bool = False, nav_sx: str = "") -> str: + from shared.sx.helpers import render_to_sx + from shared.sx.parser import SxExpr + + label_sx = await render_to_sx("blog-sub-settings-label", + icon=f"fa fa-{icon}", label=label) + return await render_to_sx("menu-row-sx", + id=row_id, level=2, + link_href=href, + link_label_content=SxExpr(label_sx), + nav=SxExpr(nav_sx) if nav_sx else None, + child_id=child_id, oob=oob) + + +# --------------------------------------------------------------------------- +# Layouts +# --------------------------------------------------------------------------- + +def _register_blog_layouts() -> None: + from shared.sx.layouts import register_custom_layout + register_custom_layout("blog", _blog_full, _blog_oob) + register_custom_layout("blog-settings", _settings_full, _settings_oob, + mobile_fn=_settings_mobile) + register_custom_layout("blog-cache", _cache_full, _cache_oob) + register_custom_layout("blog-snippets", _snippets_full, _snippets_oob) + register_custom_layout("blog-menu-items", _menu_items_full, _menu_items_oob) + register_custom_layout("blog-tag-groups", _tag_groups_full, _tag_groups_oob) + register_custom_layout("blog-tag-group-edit", + _tag_group_edit_full, _tag_group_edit_oob) + + +# --- Blog layout (root + blog header) --- + +async def _blog_full(ctx: dict, **kw: Any) -> str: + from shared.sx.helpers import render_to_sx_with_env, _ctx_to_env + from shared.sx.parser import SxExpr + return await render_to_sx_with_env("blog-layout-full", _ctx_to_env(ctx), + blog_header=SxExpr(await _blog_header_sx(ctx))) + + +async def _blog_oob(ctx: dict, **kw: Any) -> str: + from shared.sx.helpers import render_to_sx_with_env, _ctx_to_env, oob_header_sx + from shared.sx.parser import SxExpr + rows = await render_to_sx_with_env("blog-layout-full", _ctx_to_env(ctx), + blog_header=SxExpr(await _blog_header_sx(ctx))) + return await oob_header_sx("root-header-child", "blog-header-child", rows) + + +# --- Settings layout (root + settings header) --- + +async def _settings_full(ctx: dict, **kw: Any) -> str: + from shared.sx.helpers import render_to_sx_with_env, _ctx_to_env + from shared.sx.parser import SxExpr + return await render_to_sx_with_env("settings-layout-full", _ctx_to_env(ctx), + settings_header=SxExpr(await _settings_header_sx(ctx))) + + +async def _settings_oob(ctx: dict, **kw: Any) -> str: + from shared.sx.helpers import render_to_sx_with_env, _ctx_to_env, oob_header_sx + from shared.sx.parser import SxExpr + rows = await render_to_sx_with_env("settings-layout-full", _ctx_to_env(ctx), + settings_header=SxExpr(await _settings_header_sx(ctx))) + return await oob_header_sx("root-header-child", "root-settings-header-child", rows) + + +async def _settings_mobile(ctx: dict, **kw: Any) -> str: + return await _settings_nav_sx(ctx) + + +# --- Sub-settings helpers --- + +async def _sub_settings_full(ctx: dict, row_id: str, child_id: str, + endpoint: str, icon: str, label: str) -> str: + from shared.sx.helpers import render_to_sx_with_env, _ctx_to_env + from shared.sx.parser import SxExpr + from quart import url_for as qurl + return await render_to_sx_with_env("sub-settings-layout-full", _ctx_to_env(ctx), + settings_header=SxExpr(await _settings_header_sx(ctx)), + sub_header=SxExpr(await _sub_settings_header_sx( + row_id, child_id, qurl(endpoint), icon, label, ctx))) + + +async def _sub_settings_oob(ctx: dict, row_id: str, child_id: str, + endpoint: str, icon: str, label: str) -> str: + from shared.sx.helpers import oob_header_sx, render_to_sx + from shared.sx.parser import SxExpr + from quart import url_for as qurl + settings_hdr_oob = await _settings_header_sx(ctx, oob=True) + sub_hdr = await _sub_settings_header_sx( + row_id, child_id, qurl(endpoint), icon, label, ctx) + sub_oob = await oob_header_sx("root-settings-header-child", child_id, sub_hdr) + return await render_to_sx("sub-settings-layout-oob", + settings_header_oob=SxExpr(settings_hdr_oob), + sub_header_oob=SxExpr(sub_oob)) + + +# --- Cache --- + +async def _cache_full(ctx: dict, **kw: Any) -> str: + return await _sub_settings_full(ctx, "cache-row", "cache-header-child", + "defpage_cache_page", "refresh", "Cache") + + +async def _cache_oob(ctx: dict, **kw: Any) -> str: + return await _sub_settings_oob(ctx, "cache-row", "cache-header-child", + "defpage_cache_page", "refresh", "Cache") + + +# --- Snippets --- + +async def _snippets_full(ctx: dict, **kw: Any) -> str: + return await _sub_settings_full(ctx, "snippets-row", "snippets-header-child", + "defpage_snippets_page", "puzzle-piece", "Snippets") + + +async def _snippets_oob(ctx: dict, **kw: Any) -> str: + return await _sub_settings_oob(ctx, "snippets-row", "snippets-header-child", + "defpage_snippets_page", "puzzle-piece", "Snippets") + + +# --- Menu Items --- + +async def _menu_items_full(ctx: dict, **kw: Any) -> str: + return await _sub_settings_full(ctx, "menu_items-row", "menu_items-header-child", + "defpage_menu_items_page", "bars", "Menu Items") + + +async def _menu_items_oob(ctx: dict, **kw: Any) -> str: + return await _sub_settings_oob(ctx, "menu_items-row", "menu_items-header-child", + "defpage_menu_items_page", "bars", "Menu Items") + + +# --- Tag Groups --- + +async def _tag_groups_full(ctx: dict, **kw: Any) -> str: + return await _sub_settings_full(ctx, "tag-groups-row", "tag-groups-header-child", + "defpage_tag_groups_page", "tags", "Tag Groups") + + +async def _tag_groups_oob(ctx: dict, **kw: Any) -> str: + return await _sub_settings_oob(ctx, "tag-groups-row", "tag-groups-header-child", + "defpage_tag_groups_page", "tags", "Tag Groups") + + +# --- Tag Group Edit --- + +async def _tag_group_edit_full(ctx: dict, **kw: Any) -> str: + from quart import request, url_for as qurl + from shared.sx.helpers import render_to_sx_with_env, _ctx_to_env + from shared.sx.parser import SxExpr + g_id = (request.view_args or {}).get("id") + return await render_to_sx_with_env("sub-settings-layout-full", _ctx_to_env(ctx), + settings_header=SxExpr(await _settings_header_sx(ctx)), + sub_header=SxExpr(await _sub_settings_header_sx( + "tag-groups-row", "tag-groups-header-child", + qurl("defpage_tag_group_edit", id=g_id), + "tags", "Tag Groups", ctx))) + + +async def _tag_group_edit_oob(ctx: dict, **kw: Any) -> str: + from quart import request, url_for as qurl + from shared.sx.helpers import oob_header_sx, render_to_sx + from shared.sx.parser import SxExpr + g_id = (request.view_args or {}).get("id") + settings_hdr_oob = await _settings_header_sx(ctx, oob=True) + sub_hdr = await _sub_settings_header_sx( + "tag-groups-row", "tag-groups-header-child", + qurl("defpage_tag_group_edit", id=g_id), + "tags", "Tag Groups", ctx) + sub_oob = await oob_header_sx("root-settings-header-child", "tag-groups-header-child", sub_hdr) + return await render_to_sx("sub-settings-layout-oob", + settings_header_oob=SxExpr(settings_hdr_oob), + sub_header_oob=SxExpr(sub_oob)) diff --git a/blog/sxc/pages/renders.py b/blog/sxc/pages/renders.py new file mode 100644 index 0000000..fd08d62 --- /dev/null +++ b/blog/sxc/pages/renders.py @@ -0,0 +1,177 @@ +"""Blog editor panel rendering.""" +from __future__ import annotations + + +async def render_editor_panel(save_error: str | None = None, is_page: bool = False) -> str: + """Build the WYSIWYG editor panel HTML for new post/page creation.""" + import os + from quart import url_for as qurl, current_app + from shared.browser.app.csrf import generate_csrf_token + from shared.sx.helpers import render_to_sx + + csrf = generate_csrf_token() + asset_url_fn = current_app.jinja_env.globals.get("asset_url", lambda p: "") + editor_css = asset_url_fn("scripts/editor.css") + editor_js = asset_url_fn("scripts/editor.js") + sx_editor_js = asset_url_fn("scripts/sx-editor.js") + + upload_image_url = qurl("blog.editor_api.upload_image") + upload_media_url = qurl("blog.editor_api.upload_media") + upload_file_url = qurl("blog.editor_api.upload_file") + oembed_url = qurl("blog.editor_api.oembed_proxy") + snippets_url = qurl("blog.editor_api.list_snippets") + unsplash_key = os.environ.get("UNSPLASH_ACCESS_KEY", "") + + title_placeholder = "Page title..." if is_page else "Post title..." + create_label = "Create Page" if is_page else "Create Post" + + parts: list[str] = [] + + if save_error: + parts.append(await render_to_sx("blog-editor-error", error=str(save_error))) + + parts.append(await render_to_sx("blog-editor-form", + csrf=csrf, title_placeholder=title_placeholder, + create_label=create_label, + )) + + parts.append(await render_to_sx("blog-editor-styles", css_href=editor_css)) + parts.append(await render_to_sx("sx-editor-styles")) + + init_js = ( + "console.log('[EDITOR-DEBUG] init script running');\n" + "(function() {\n" + " console.log('[EDITOR-DEBUG] IIFE entered, mountEditor=', typeof window.mountEditor);\n" + " function init() {\n" + " var csrfToken = document.querySelector('input[name=\"csrf_token\"]').value;\n" + f" var uploadUrl = '{upload_image_url}';\n" + " var uploadUrls = {\n" + " image: uploadUrl,\n" + f" media: '{upload_media_url}',\n" + f" file: '{upload_file_url}',\n" + " };\n" + "\n" + " var fileInput = document.getElementById('feature-image-file');\n" + " var addBtn = document.getElementById('feature-image-add-btn');\n" + " var deleteBtn = document.getElementById('feature-image-delete-btn');\n" + " var preview = document.getElementById('feature-image-preview');\n" + " var emptyState = document.getElementById('feature-image-empty');\n" + " var filledState = document.getElementById('feature-image-filled');\n" + " var hiddenUrl = document.getElementById('feature-image-input');\n" + " var hiddenCaption = document.getElementById('feature-image-caption-input');\n" + " var captionInput = document.getElementById('feature-image-caption');\n" + " var uploading = document.getElementById('feature-image-uploading');\n" + "\n" + " function showFilled(url) {\n" + " preview.src = url;\n" + " hiddenUrl.value = url;\n" + " emptyState.classList.add('hidden');\n" + " filledState.classList.remove('hidden');\n" + " uploading.classList.add('hidden');\n" + " }\n" + "\n" + " function showEmpty() {\n" + " preview.src = '';\n" + " hiddenUrl.value = '';\n" + " hiddenCaption.value = '';\n" + " captionInput.value = '';\n" + " emptyState.classList.remove('hidden');\n" + " filledState.classList.add('hidden');\n" + " uploading.classList.add('hidden');\n" + " }\n" + "\n" + " function uploadFile(file) {\n" + " emptyState.classList.add('hidden');\n" + " uploading.classList.remove('hidden');\n" + " var fd = new FormData();\n" + " fd.append('file', file);\n" + " fetch(uploadUrl, {\n" + " method: 'POST',\n" + " body: fd,\n" + " headers: { 'X-CSRFToken': csrfToken },\n" + " })\n" + " .then(function(r) {\n" + " if (!r.ok) throw new Error('Upload failed (' + r.status + ')');\n" + " return r.json();\n" + " })\n" + " .then(function(data) {\n" + " var url = data.images && data.images[0] && data.images[0].url;\n" + " if (url) showFilled(url);\n" + " else { showEmpty(); alert('Upload succeeded but no image URL returned.'); }\n" + " })\n" + " .catch(function(e) {\n" + " showEmpty();\n" + " alert(e.message);\n" + " });\n" + " }\n" + "\n" + " addBtn.addEventListener('click', function() { fileInput.click(); });\n" + " preview.addEventListener('click', function() { fileInput.click(); });\n" + " deleteBtn.addEventListener('click', function(e) {\n" + " e.stopPropagation();\n" + " showEmpty();\n" + " });\n" + " fileInput.addEventListener('change', function() {\n" + " if (fileInput.files && fileInput.files[0]) {\n" + " uploadFile(fileInput.files[0]);\n" + " fileInput.value = '';\n" + " }\n" + " });\n" + " captionInput.addEventListener('input', function() {\n" + " hiddenCaption.value = captionInput.value;\n" + " });\n" + "\n" + " var excerpt = document.querySelector('textarea[name=\"custom_excerpt\"]');\n" + " function autoResize() {\n" + " excerpt.style.height = 'auto';\n" + " excerpt.style.height = excerpt.scrollHeight + 'px';\n" + " }\n" + " excerpt.addEventListener('input', autoResize);\n" + " autoResize();\n" + "\n" + " window.mountEditor('lexical-editor', {\n" + " initialJson: null,\n" + " csrfToken: csrfToken,\n" + " uploadUrls: uploadUrls,\n" + f" oembedUrl: '{oembed_url}',\n" + f" unsplashApiKey: '{unsplash_key}',\n" + f" snippetsUrl: '{snippets_url}',\n" + " });\n" + "\n" + " if (typeof SxEditor !== 'undefined') {\n" + " SxEditor.mount('sx-editor', {\n" + " initialSx: (document.getElementById('sx-content-input') || {}).value || null,\n" + " csrfToken: csrfToken,\n" + " uploadUrls: uploadUrls,\n" + f" oembedUrl: '{oembed_url}',\n" + " onChange: function(sx) {\n" + " document.getElementById('sx-content-input').value = sx;\n" + " }\n" + " });\n" + " }\n" + "\n" + " document.addEventListener('keydown', function(e) {\n" + " if ((e.ctrlKey || e.metaKey) && e.key === 's') {\n" + " e.preventDefault();\n" + " document.getElementById('post-new-form').requestSubmit();\n" + " }\n" + " });\n" + " }\n" + "\n" + " if (typeof window.mountEditor === 'function') {\n" + " init();\n" + " } else {\n" + " var _t = setInterval(function() {\n" + " if (typeof window.mountEditor === 'function') { clearInterval(_t); init(); }\n" + " }, 50);\n" + " }\n" + "})();\n" + ) + parts.append(await render_to_sx("blog-editor-scripts", + js_src=editor_js, + sx_editor_js_src=sx_editor_js, + init_js=init_js)) + + from shared.sx.parser import SxExpr + return await render_to_sx("blog-editor-panel", + parts=SxExpr("(<> " + " ".join(parts) + ")")) if parts else "" diff --git a/cart/bp/cart/global_routes.py b/cart/bp/cart/global_routes.py index 4a8b855..fa1bebc 100644 --- a/cart/bp/cart/global_routes.py +++ b/cart/bp/cart/global_routes.py @@ -151,7 +151,7 @@ def register(url_prefix: str) -> Blueprint: page_config = await resolve_page_config(g.s, cart, calendar_entries, tickets) except ValueError as e: from shared.sx.page import get_template_context - from sxc.pages import render_checkout_error_page + from sxc.pages.renders import render_checkout_error_page tctx = await get_template_context() html = await render_checkout_error_page(tctx, error=str(e)) return await make_response(html, 400) @@ -208,7 +208,7 @@ def register(url_prefix: str) -> Blueprint: if not hosted_url: from shared.sx.page import get_template_context - from sxc.pages import render_checkout_error_page + from sxc.pages.renders import render_checkout_error_page tctx = await get_template_context() html = await render_checkout_error_page(tctx, error="No hosted checkout URL returned from SumUp.") return await make_response(html, 500) diff --git a/cart/bp/cart/page_routes.py b/cart/bp/cart/page_routes.py index 41e3d28..527cb42 100644 --- a/cart/bp/cart/page_routes.py +++ b/cart/bp/cart/page_routes.py @@ -73,7 +73,7 @@ def register(url_prefix: str) -> Blueprint: if not hosted_url: from shared.sx.page import get_template_context - from sxc.pages import render_checkout_error_page + from sxc.pages.renders import render_checkout_error_page tctx = await get_template_context() html = await render_checkout_error_page(tctx, error="No hosted checkout URL returned from SumUp.") return await make_response(html, 500) diff --git a/cart/bp/order/routes.py b/cart/bp/order/routes.py index 8852ce9..fb6d5ef 100644 --- a/cart/bp/order/routes.py +++ b/cart/bp/order/routes.py @@ -57,7 +57,7 @@ def register() -> Blueprint: if not order: return await make_response("Order not found", 404) from shared.sx.page import get_template_context - from sxc.pages import render_order_page, render_order_oob + from sxc.pages.renders import render_order_page, render_order_oob ctx = await get_template_context() calendar_entries = ctx.get("calendar_entries") @@ -122,7 +122,7 @@ def register() -> Blueprint: if not hosted_url: from shared.sx.page import get_template_context - from sxc.pages import render_checkout_error_page + from sxc.pages.renders import render_checkout_error_page tctx = await get_template_context() html = await render_checkout_error_page(tctx, error="No hosted checkout URL returned from SumUp when trying to reopen payment.", order=order) return await make_response(html, 500) diff --git a/cart/bp/orders/routes.py b/cart/bp/orders/routes.py index aa08254..22150a2 100644 --- a/cart/bp/orders/routes.py +++ b/cart/bp/orders/routes.py @@ -138,7 +138,7 @@ def register(url_prefix: str) -> Blueprint: orders = result.scalars().all() from shared.sx.page import get_template_context - from sxc.pages import ( + from sxc.pages.renders import ( render_orders_page, render_orders_rows, render_orders_oob, diff --git a/cart/bp/page_admin/routes.py b/cart/bp/page_admin/routes.py index 95ae714..934e003 100644 --- a/cart/bp/page_admin/routes.py +++ b/cart/bp/page_admin/routes.py @@ -47,7 +47,7 @@ def register(): g.page_config = SimpleNamespace(**raw_pc) if raw_pc else None from shared.sx.page import get_template_context - from sxc.pages import render_cart_payments_panel + from sxc.pages.renders import render_cart_payments_panel ctx = await get_template_context() html = await render_cart_payments_panel(ctx) return sx_response(html) diff --git a/cart/sxc/pages/__init__.py b/cart/sxc/pages/__init__.py index 06a36f5..439693d 100644 --- a/cart/sxc/pages/__init__.py +++ b/cart/sxc/pages/__init__.py @@ -1,14 +1,10 @@ """Cart defpage setup — registers layouts and loads .sx pages.""" from __future__ import annotations -from typing import Any - -from markupsafe import escape -from shared.sx.parser import SxExpr - def setup_cart_pages() -> None: """Register cart-specific layouts and load page definitions.""" + from .layouts import _register_cart_layouts _register_cart_layouts() _load_cart_page_files() @@ -17,305 +13,3 @@ def _load_cart_page_files() -> None: import os from shared.sx.pages import load_page_dir load_page_dir(os.path.dirname(__file__), "cart") - - -# --------------------------------------------------------------------------- -# Header helpers (still needed by layouts and render functions) -# --------------------------------------------------------------------------- - -def _ensure_post_ctx(ctx: dict, page_post: Any) -> dict: - """Ensure ctx has a 'post' dict from page_post DTO.""" - if ctx.get("post") or not page_post: - return ctx - return {**ctx, "post": { - "id": getattr(page_post, "id", None), - "slug": getattr(page_post, "slug", ""), - "title": getattr(page_post, "title", ""), - "feature_image": getattr(page_post, "feature_image", None), - }} - - -async def _ensure_container_nav(ctx: dict) -> dict: - """Fetch container_nav if not already present.""" - if ctx.get("container_nav"): - return ctx - post = ctx.get("post") or {} - post_id = post.get("id") - if not post_id: - return ctx - slug = post.get("slug", "") - from shared.infrastructure.fragments import fetch_fragments - nav_params = { - "container_type": "page", - "container_id": str(post_id), - "post_slug": slug, - } - events_nav, market_nav = await fetch_fragments([ - ("events", "container-nav", nav_params), - ("market", "container-nav", nav_params), - ], required=False) - return {**ctx, "container_nav": events_nav + market_nav} - - -async def _post_header_sx(ctx: dict, page_post: Any, *, oob: bool = False) -> str: - from shared.sx.helpers import post_header_sx as _shared_post_header_sx - ctx = _ensure_post_ctx(ctx, page_post) - ctx = await _ensure_container_nav(ctx) - return await _shared_post_header_sx(ctx, oob=oob) - - -async def _cart_header_sx(ctx: dict, *, oob: bool = False) -> str: - from shared.sx.helpers import render_to_sx, call_url - return await render_to_sx( - "menu-row-sx", - id="cart-row", level=1, colour="sky", - link_href=call_url(ctx, "cart_url", "/"), - link_label="cart", icon="fa fa-shopping-cart", - child_id="cart-header-child", oob=oob, - ) - - -async def _page_cart_header_sx(ctx: dict, page_post: Any, *, oob: bool = False) -> str: - from shared.sx.helpers import render_to_sx, call_url - slug = page_post.slug if page_post else "" - title = ((page_post.title if page_post else None) or "")[:160] - label_parts = [] - if page_post and page_post.feature_image: - label_parts.append(await render_to_sx("cart-page-label-img", src=page_post.feature_image)) - label_parts.append(f'(span "{escape(title)}")') - label_sx = "(<> " + " ".join(label_parts) + ")" - nav_sx = await render_to_sx("cart-all-carts-link", href=call_url(ctx, "cart_url", "/")) - return await render_to_sx( - "menu-row-sx", - id="page-cart-row", level=2, colour="sky", - link_href=call_url(ctx, "cart_url", f"/{slug}/"), - link_label_content=SxExpr(label_sx), - nav=SxExpr(nav_sx), oob=oob, - ) - - -# --------------------------------------------------------------------------- -# Order serialization helpers -# --------------------------------------------------------------------------- - -def _serialize_order(order: Any) -> dict: - from shared.infrastructure.urls import market_product_url - created = order.created_at.strftime("%-d %b %Y, %H:%M") if order.created_at else "\u2014" - items = [] - if order.items: - for item in order.items: - items.append({ - "product_image": item.product_image, - "product_title": item.product_title or "Unknown product", - "product_id": item.product_id, - "product_slug": item.product_slug, - "product_url": market_product_url(item.product_slug), - "quantity": item.quantity, - "unit_price_formatted": f"{item.unit_price or 0:.2f}", - "currency": item.currency or order.currency or "GBP", - }) - return { - "id": order.id, - "status": order.status or "pending", - "created_at_formatted": created, - "description": order.description or "", - "total_formatted": f"{order.total_amount or 0:.2f}", - "total_amount": float(order.total_amount or 0), - "currency": order.currency or "GBP", - "items": items, - } - - -def _serialize_calendar_entry(e: Any) -> dict: - st = e.state or "" - ds = e.start_at.strftime("%-d %b %Y, %H:%M") if e.start_at else "" - if e.end_at: - ds += f" \u2013 {e.end_at.strftime('%-d %b %Y, %H:%M')}" - return {"name": e.name, "state": st, "date_str": ds, "cost_formatted": f"{e.cost or 0:.2f}"} - - -# --------------------------------------------------------------------------- -# Render functions (called by routes) — delegate header composition to .sx -# --------------------------------------------------------------------------- - -async def render_orders_page(ctx, orders, page, total_pages, search, search_count, url_for_fn, qs_fn): - from shared.sx.helpers import render_to_sx, render_to_sx_with_env, _ctx_to_env, search_desktop_sx, search_mobile_sx, full_page_sx - from shared.utils import route_prefix - ctx["search"] = search - ctx["search_count"] = search_count - pfx = route_prefix() - list_url = pfx + url_for_fn("orders.list_orders") - detail_url_prefix = pfx + url_for_fn("orders.order.order_detail", order_id=0).rsplit("0/", 1)[0] - order_dicts = [_serialize_order(o) for o in orders] - content = await render_to_sx("orders-list-content", orders=order_dicts, - page=page, total_pages=total_pages, rows_url=list_url, detail_url_prefix=detail_url_prefix) - header_rows = await render_to_sx_with_env("cart-orders-layout-full", _ctx_to_env(ctx), - list_url=list_url, - ) - filt = await render_to_sx("order-list-header", search_mobile=SxExpr(await search_mobile_sx(ctx))) - return await full_page_sx(ctx, header_rows=header_rows, filter=filt, - aside=await search_desktop_sx(ctx), content=content) - - -async def render_orders_rows(ctx, orders, page, total_pages, url_for_fn, qs_fn): - from shared.sx.helpers import render_to_sx - from shared.utils import route_prefix - pfx = route_prefix() - list_url = pfx + url_for_fn("orders.list_orders") - detail_url_prefix = pfx + url_for_fn("orders.order.order_detail", order_id=0).rsplit("0/", 1)[0] - order_dicts = [_serialize_order(o) for o in orders] - parts = [] - for od in order_dicts: - parts.append(await render_to_sx("order-row-pair", order=od, detail_url_prefix=detail_url_prefix)) - next_scroll = "" - if page < total_pages: - next_url = list_url + qs_fn(page=page + 1) - next_scroll = await render_to_sx("infinite-scroll", url=next_url, page=page, - total_pages=total_pages, id_prefix="orders", colspan=5) - else: - next_scroll = await render_to_sx("order-end-row") - return await render_to_sx("cart-orders-rows", - rows=SxExpr("(<> " + " ".join(parts) + ")"), - next_scroll=SxExpr(next_scroll), - ) - - -async def render_orders_oob(ctx, orders, page, total_pages, search, search_count, url_for_fn, qs_fn): - from shared.sx.helpers import render_to_sx, render_to_sx_with_env, _ctx_to_env, search_desktop_sx, search_mobile_sx, oob_page_sx - from shared.utils import route_prefix - ctx["search"] = search - ctx["search_count"] = search_count - pfx = route_prefix() - list_url = pfx + url_for_fn("orders.list_orders") - detail_url_prefix = pfx + url_for_fn("orders.order.order_detail", order_id=0).rsplit("0/", 1)[0] - order_dicts = [_serialize_order(o) for o in orders] - content = await render_to_sx("orders-list-content", orders=order_dicts, - page=page, total_pages=total_pages, rows_url=list_url, detail_url_prefix=detail_url_prefix) - oobs = await render_to_sx_with_env("cart-orders-layout-oob", _ctx_to_env(ctx, oob=True), - list_url=list_url, - ) - filt = await render_to_sx("order-list-header", search_mobile=SxExpr(await search_mobile_sx(ctx))) - return await oob_page_sx(oobs=oobs, filter=filt, aside=await search_desktop_sx(ctx), content=content) - - -async def render_order_page(ctx, order, calendar_entries, url_for_fn): - from shared.sx.helpers import render_to_sx, render_to_sx_with_env, _ctx_to_env, full_page_sx - from shared.utils import route_prefix - from shared.browser.app.csrf import generate_csrf_token - pfx = route_prefix() - detail_url = pfx + url_for_fn("orders.order.order_detail", order_id=order.id) - list_url = pfx + url_for_fn("orders.list_orders") - recheck_url = pfx + url_for_fn("orders.order.order_recheck", order_id=order.id) - pay_url = pfx + url_for_fn("orders.order.order_pay", order_id=order.id) - order_data = _serialize_order(order) - cal_data = [_serialize_calendar_entry(e) for e in (calendar_entries or [])] - main = await render_to_sx("order-detail-content", order=order_data, calendar_entries=cal_data) - filt = await render_to_sx("order-detail-filter-content", order=order_data, - list_url=list_url, recheck_url=recheck_url, pay_url=pay_url, csrf=generate_csrf_token()) - header_rows = await render_to_sx_with_env("cart-order-detail-layout-full", _ctx_to_env(ctx), - list_url=list_url, detail_url=detail_url, - order_label=f"Order {order.id}", - ) - return await full_page_sx(ctx, header_rows=header_rows, filter=filt, content=main) - - -async def render_order_oob(ctx, order, calendar_entries, url_for_fn): - from shared.sx.helpers import render_to_sx, render_to_sx_with_env, _ctx_to_env, oob_page_sx - from shared.utils import route_prefix - from shared.browser.app.csrf import generate_csrf_token - pfx = route_prefix() - detail_url = pfx + url_for_fn("orders.order.order_detail", order_id=order.id) - list_url = pfx + url_for_fn("orders.list_orders") - recheck_url = pfx + url_for_fn("orders.order.order_recheck", order_id=order.id) - pay_url = pfx + url_for_fn("orders.order.order_pay", order_id=order.id) - order_data = _serialize_order(order) - cal_data = [_serialize_calendar_entry(e) for e in (calendar_entries or [])] - main = await render_to_sx("order-detail-content", order=order_data, calendar_entries=cal_data) - filt = await render_to_sx("order-detail-filter-content", order=order_data, - list_url=list_url, recheck_url=recheck_url, pay_url=pay_url, csrf=generate_csrf_token()) - oobs = await render_to_sx_with_env("cart-order-detail-layout-oob", _ctx_to_env(ctx, oob=True), - detail_url=detail_url, - order_label=f"Order {order.id}", - ) - return await oob_page_sx(oobs=oobs, filter=filt, content=main) - - -async def render_checkout_error_page(ctx, error=None, order=None): - from shared.sx.helpers import render_to_sx, render_to_sx_with_env, _ctx_to_env, full_page_sx - from shared.infrastructure.urls import cart_url - err_msg = error or "Unexpected error while creating the hosted checkout session." - order_sx = await render_to_sx("checkout-error-order-id", oid=f"#{order.id}") if order else None - hdr = await render_to_sx_with_env("layout-root-full", _ctx_to_env(ctx)) - filt = await render_to_sx("checkout-error-header") - content = await render_to_sx("checkout-error-content", msg=err_msg, - order=SxExpr(order_sx) if order_sx else None, back_url=cart_url("/")) - return await full_page_sx(ctx, header_rows=hdr, filter=filt, content=content) - - -async def render_cart_payments_panel(ctx): - from shared.sx.helpers import render_to_sx - page_config = ctx.get("page_config") - pc_data = None - if page_config: - pc_data = { - "sumup_api_key": bool(getattr(page_config, "sumup_api_key", None)), - "sumup_merchant_code": getattr(page_config, "sumup_merchant_code", None) or "", - "sumup_checkout_prefix": getattr(page_config, "sumup_checkout_prefix", None) or "", - } - return await render_to_sx("cart-payments-content", page_config=pc_data) - - -# --------------------------------------------------------------------------- -# Layouts — thin wrappers delegating to .sx defcomps in cart/sx/layouts.sx -# --------------------------------------------------------------------------- - -def _register_cart_layouts() -> None: - from shared.sx.layouts import register_custom_layout - register_custom_layout("cart-page", _cart_page_full, _cart_page_oob) - register_custom_layout("cart-admin", _cart_admin_full, _cart_admin_oob) - - -async def _cart_page_full(ctx: dict, **kw: Any) -> str: - from shared.sx.helpers import render_to_sx_with_env, _ctx_to_env - page_post = ctx.get("page_post") - env = _ctx_to_env(ctx) - return await render_to_sx_with_env("cart-page-layout-full", env, - cart_row=SxExpr(await _cart_header_sx(ctx)), - page_cart_row=SxExpr(await _page_cart_header_sx(ctx, page_post)), - ) - - -async def _cart_page_oob(ctx: dict, **kw: Any) -> str: - from shared.sx.helpers import render_to_sx_with_env, _ctx_to_env, root_header_sx - page_post = ctx.get("page_post") - env = _ctx_to_env(ctx, oob=True) - return await render_to_sx_with_env("cart-page-layout-oob", env, - root_header_oob=SxExpr(await root_header_sx(ctx, oob=True)), - cart_row_oob=SxExpr(await _cart_header_sx(ctx, oob=True)), - page_cart_row=SxExpr(await _page_cart_header_sx(ctx, page_post)), - ) - - -async def _cart_admin_full(ctx: dict, **kw: Any) -> str: - from shared.sx.helpers import render_to_sx_with_env, _ctx_to_env - page_post = ctx.get("page_post") - selected = kw.get("selected", "") - env = _ctx_to_env(ctx) - return await render_to_sx_with_env("cart-admin-layout-full", env, - post_header=SxExpr(await _post_header_sx(ctx, page_post)), - admin_header=SxExpr(await _cart_page_admin_header_sx(ctx, page_post, selected=selected)), - ) - - -async def _cart_admin_oob(ctx: dict, **kw: Any) -> str: - page_post = ctx.get("page_post") - selected = kw.get("selected", "") - return await _cart_page_admin_header_sx(ctx, page_post, oob=True, selected=selected) - - -async def _cart_page_admin_header_sx(ctx: dict, page_post: Any, *, oob: bool = False, - selected: str = "") -> str: - from shared.sx.helpers import post_admin_header_sx - slug = page_post.slug if page_post else "" - ctx = _ensure_post_ctx(ctx, page_post) - return await post_admin_header_sx(ctx, slug, oob=oob, selected=selected) diff --git a/cart/sxc/pages/layouts.py b/cart/sxc/pages/layouts.py new file mode 100644 index 0000000..1591129 --- /dev/null +++ b/cart/sxc/pages/layouts.py @@ -0,0 +1,138 @@ +"""Cart layout registration and header builders.""" +from __future__ import annotations + +from typing import Any + +from markupsafe import escape +from shared.sx.parser import SxExpr + + +def _register_cart_layouts() -> None: + from shared.sx.layouts import register_custom_layout + register_custom_layout("cart-page", _cart_page_full, _cart_page_oob) + register_custom_layout("cart-admin", _cart_admin_full, _cart_admin_oob) + + +# --------------------------------------------------------------------------- +# Header helpers +# --------------------------------------------------------------------------- + +def _ensure_post_ctx(ctx: dict, page_post: Any) -> dict: + """Ensure ctx has a 'post' dict from page_post DTO.""" + if ctx.get("post") or not page_post: + return ctx + return {**ctx, "post": { + "id": getattr(page_post, "id", None), + "slug": getattr(page_post, "slug", ""), + "title": getattr(page_post, "title", ""), + "feature_image": getattr(page_post, "feature_image", None), + }} + + +async def _ensure_container_nav(ctx: dict) -> dict: + """Fetch container_nav if not already present.""" + if ctx.get("container_nav"): + return ctx + post = ctx.get("post") or {} + post_id = post.get("id") + if not post_id: + return ctx + slug = post.get("slug", "") + from shared.infrastructure.fragments import fetch_fragments + nav_params = { + "container_type": "page", + "container_id": str(post_id), + "post_slug": slug, + } + events_nav, market_nav = await fetch_fragments([ + ("events", "container-nav", nav_params), + ("market", "container-nav", nav_params), + ], required=False) + return {**ctx, "container_nav": events_nav + market_nav} + + +async def _post_header_sx(ctx: dict, page_post: Any, *, oob: bool = False) -> str: + from shared.sx.helpers import post_header_sx as _shared_post_header_sx + ctx = _ensure_post_ctx(ctx, page_post) + ctx = await _ensure_container_nav(ctx) + return await _shared_post_header_sx(ctx, oob=oob) + + +async def _cart_header_sx(ctx: dict, *, oob: bool = False) -> str: + from shared.sx.helpers import render_to_sx, call_url + return await render_to_sx( + "menu-row-sx", + id="cart-row", level=1, colour="sky", + link_href=call_url(ctx, "cart_url", "/"), + link_label="cart", icon="fa fa-shopping-cart", + child_id="cart-header-child", oob=oob, + ) + + +async def _page_cart_header_sx(ctx: dict, page_post: Any, *, oob: bool = False) -> str: + from shared.sx.helpers import render_to_sx, call_url + slug = page_post.slug if page_post else "" + title = ((page_post.title if page_post else None) or "")[:160] + label_parts = [] + if page_post and page_post.feature_image: + label_parts.append(await render_to_sx("cart-page-label-img", src=page_post.feature_image)) + label_parts.append(f'(span "{escape(title)}")') + label_sx = "(<> " + " ".join(label_parts) + ")" + nav_sx = await render_to_sx("cart-all-carts-link", href=call_url(ctx, "cart_url", "/")) + return await render_to_sx( + "menu-row-sx", + id="page-cart-row", level=2, colour="sky", + link_href=call_url(ctx, "cart_url", f"/{slug}/"), + link_label_content=SxExpr(label_sx), + nav=SxExpr(nav_sx), oob=oob, + ) + + +async def _cart_page_admin_header_sx(ctx: dict, page_post: Any, *, oob: bool = False, + selected: str = "") -> str: + from shared.sx.helpers import post_admin_header_sx + slug = page_post.slug if page_post else "" + ctx = _ensure_post_ctx(ctx, page_post) + return await post_admin_header_sx(ctx, slug, oob=oob, selected=selected) + + +# --------------------------------------------------------------------------- +# Layout functions +# --------------------------------------------------------------------------- + +async def _cart_page_full(ctx: dict, **kw: Any) -> str: + from shared.sx.helpers import render_to_sx_with_env, _ctx_to_env + page_post = ctx.get("page_post") + env = _ctx_to_env(ctx) + return await render_to_sx_with_env("cart-page-layout-full", env, + cart_row=SxExpr(await _cart_header_sx(ctx)), + page_cart_row=SxExpr(await _page_cart_header_sx(ctx, page_post)), + ) + + +async def _cart_page_oob(ctx: dict, **kw: Any) -> str: + from shared.sx.helpers import render_to_sx_with_env, _ctx_to_env, root_header_sx + page_post = ctx.get("page_post") + env = _ctx_to_env(ctx, oob=True) + return await render_to_sx_with_env("cart-page-layout-oob", env, + root_header_oob=SxExpr(await root_header_sx(ctx, oob=True)), + cart_row_oob=SxExpr(await _cart_header_sx(ctx, oob=True)), + page_cart_row=SxExpr(await _page_cart_header_sx(ctx, page_post)), + ) + + +async def _cart_admin_full(ctx: dict, **kw: Any) -> str: + from shared.sx.helpers import render_to_sx_with_env, _ctx_to_env + page_post = ctx.get("page_post") + selected = kw.get("selected", "") + env = _ctx_to_env(ctx) + return await render_to_sx_with_env("cart-admin-layout-full", env, + post_header=SxExpr(await _post_header_sx(ctx, page_post)), + admin_header=SxExpr(await _cart_page_admin_header_sx(ctx, page_post, selected=selected)), + ) + + +async def _cart_admin_oob(ctx: dict, **kw: Any) -> str: + page_post = ctx.get("page_post") + selected = kw.get("selected", "") + return await _cart_page_admin_header_sx(ctx, page_post, oob=True, selected=selected) diff --git a/cart/sxc/pages/renders.py b/cart/sxc/pages/renders.py new file mode 100644 index 0000000..f53027e --- /dev/null +++ b/cart/sxc/pages/renders.py @@ -0,0 +1,133 @@ +"""Cart render functions — called from bp routes.""" +from __future__ import annotations + +from shared.sx.parser import SxExpr + +from .utils import _serialize_order, _serialize_calendar_entry + + +async def render_orders_page(ctx, orders, page, total_pages, search, search_count, url_for_fn, qs_fn): + from shared.sx.helpers import render_to_sx, render_to_sx_with_env, _ctx_to_env, search_desktop_sx, search_mobile_sx, full_page_sx + from shared.utils import route_prefix + ctx["search"] = search + ctx["search_count"] = search_count + pfx = route_prefix() + list_url = pfx + url_for_fn("orders.list_orders") + detail_url_prefix = pfx + url_for_fn("orders.order.order_detail", order_id=0).rsplit("0/", 1)[0] + order_dicts = [_serialize_order(o) for o in orders] + content = await render_to_sx("orders-list-content", orders=order_dicts, + page=page, total_pages=total_pages, rows_url=list_url, detail_url_prefix=detail_url_prefix) + header_rows = await render_to_sx_with_env("cart-orders-layout-full", _ctx_to_env(ctx), + list_url=list_url, + ) + filt = await render_to_sx("order-list-header", search_mobile=SxExpr(await search_mobile_sx(ctx))) + return await full_page_sx(ctx, header_rows=header_rows, filter=filt, + aside=await search_desktop_sx(ctx), content=content) + + +async def render_orders_rows(ctx, orders, page, total_pages, url_for_fn, qs_fn): + from shared.sx.helpers import render_to_sx + from shared.utils import route_prefix + pfx = route_prefix() + list_url = pfx + url_for_fn("orders.list_orders") + detail_url_prefix = pfx + url_for_fn("orders.order.order_detail", order_id=0).rsplit("0/", 1)[0] + order_dicts = [_serialize_order(o) for o in orders] + parts = [] + for od in order_dicts: + parts.append(await render_to_sx("order-row-pair", order=od, detail_url_prefix=detail_url_prefix)) + next_scroll = "" + if page < total_pages: + next_url = list_url + qs_fn(page=page + 1) + next_scroll = await render_to_sx("infinite-scroll", url=next_url, page=page, + total_pages=total_pages, id_prefix="orders", colspan=5) + else: + next_scroll = await render_to_sx("order-end-row") + return await render_to_sx("cart-orders-rows", + rows=SxExpr("(<> " + " ".join(parts) + ")"), + next_scroll=SxExpr(next_scroll), + ) + + +async def render_orders_oob(ctx, orders, page, total_pages, search, search_count, url_for_fn, qs_fn): + from shared.sx.helpers import render_to_sx, render_to_sx_with_env, _ctx_to_env, search_desktop_sx, search_mobile_sx, oob_page_sx + from shared.utils import route_prefix + ctx["search"] = search + ctx["search_count"] = search_count + pfx = route_prefix() + list_url = pfx + url_for_fn("orders.list_orders") + detail_url_prefix = pfx + url_for_fn("orders.order.order_detail", order_id=0).rsplit("0/", 1)[0] + order_dicts = [_serialize_order(o) for o in orders] + content = await render_to_sx("orders-list-content", orders=order_dicts, + page=page, total_pages=total_pages, rows_url=list_url, detail_url_prefix=detail_url_prefix) + oobs = await render_to_sx_with_env("cart-orders-layout-oob", _ctx_to_env(ctx, oob=True), + list_url=list_url, + ) + filt = await render_to_sx("order-list-header", search_mobile=SxExpr(await search_mobile_sx(ctx))) + return await oob_page_sx(oobs=oobs, filter=filt, aside=await search_desktop_sx(ctx), content=content) + + +async def render_order_page(ctx, order, calendar_entries, url_for_fn): + from shared.sx.helpers import render_to_sx, render_to_sx_with_env, _ctx_to_env, full_page_sx + from shared.utils import route_prefix + from shared.browser.app.csrf import generate_csrf_token + pfx = route_prefix() + detail_url = pfx + url_for_fn("orders.order.order_detail", order_id=order.id) + list_url = pfx + url_for_fn("orders.list_orders") + recheck_url = pfx + url_for_fn("orders.order.order_recheck", order_id=order.id) + pay_url = pfx + url_for_fn("orders.order.order_pay", order_id=order.id) + order_data = _serialize_order(order) + cal_data = [_serialize_calendar_entry(e) for e in (calendar_entries or [])] + main = await render_to_sx("order-detail-content", order=order_data, calendar_entries=cal_data) + filt = await render_to_sx("order-detail-filter-content", order=order_data, + list_url=list_url, recheck_url=recheck_url, pay_url=pay_url, csrf=generate_csrf_token()) + header_rows = await render_to_sx_with_env("cart-order-detail-layout-full", _ctx_to_env(ctx), + list_url=list_url, detail_url=detail_url, + order_label=f"Order {order.id}", + ) + return await full_page_sx(ctx, header_rows=header_rows, filter=filt, content=main) + + +async def render_order_oob(ctx, order, calendar_entries, url_for_fn): + from shared.sx.helpers import render_to_sx, render_to_sx_with_env, _ctx_to_env, oob_page_sx + from shared.utils import route_prefix + from shared.browser.app.csrf import generate_csrf_token + pfx = route_prefix() + detail_url = pfx + url_for_fn("orders.order.order_detail", order_id=order.id) + list_url = pfx + url_for_fn("orders.list_orders") + recheck_url = pfx + url_for_fn("orders.order.order_recheck", order_id=order.id) + pay_url = pfx + url_for_fn("orders.order.order_pay", order_id=order.id) + order_data = _serialize_order(order) + cal_data = [_serialize_calendar_entry(e) for e in (calendar_entries or [])] + main = await render_to_sx("order-detail-content", order=order_data, calendar_entries=cal_data) + filt = await render_to_sx("order-detail-filter-content", order=order_data, + list_url=list_url, recheck_url=recheck_url, pay_url=pay_url, csrf=generate_csrf_token()) + oobs = await render_to_sx_with_env("cart-order-detail-layout-oob", _ctx_to_env(ctx, oob=True), + detail_url=detail_url, + order_label=f"Order {order.id}", + ) + return await oob_page_sx(oobs=oobs, filter=filt, content=main) + + +async def render_checkout_error_page(ctx, error=None, order=None): + from shared.sx.helpers import render_to_sx, render_to_sx_with_env, _ctx_to_env, full_page_sx + from shared.infrastructure.urls import cart_url + err_msg = error or "Unexpected error while creating the hosted checkout session." + order_sx = await render_to_sx("checkout-error-order-id", oid=f"#{order.id}") if order else None + hdr = await render_to_sx_with_env("layout-root-full", _ctx_to_env(ctx)) + filt = await render_to_sx("checkout-error-header") + content = await render_to_sx("checkout-error-content", msg=err_msg, + order=SxExpr(order_sx) if order_sx else None, back_url=cart_url("/")) + return await full_page_sx(ctx, header_rows=hdr, filter=filt, content=content) + + +async def render_cart_payments_panel(ctx): + from shared.sx.helpers import render_to_sx + page_config = ctx.get("page_config") + pc_data = None + if page_config: + pc_data = { + "sumup_api_key": bool(getattr(page_config, "sumup_api_key", None)), + "sumup_merchant_code": getattr(page_config, "sumup_merchant_code", None) or "", + "sumup_checkout_prefix": getattr(page_config, "sumup_checkout_prefix", None) or "", + } + return await render_to_sx("cart-payments-content", page_config=pc_data) diff --git a/cart/sxc/pages/utils.py b/cart/sxc/pages/utils.py new file mode 100644 index 0000000..f9afe9e --- /dev/null +++ b/cart/sxc/pages/utils.py @@ -0,0 +1,40 @@ +"""Cart page utilities — serializers and formatters.""" +from __future__ import annotations + +from typing import Any + + +def _serialize_order(order: Any) -> dict: + from shared.infrastructure.urls import market_product_url + created = order.created_at.strftime("%-d %b %Y, %H:%M") if order.created_at else "\u2014" + items = [] + if order.items: + for item in order.items: + items.append({ + "product_image": item.product_image, + "product_title": item.product_title or "Unknown product", + "product_id": item.product_id, + "product_slug": item.product_slug, + "product_url": market_product_url(item.product_slug), + "quantity": item.quantity, + "unit_price_formatted": f"{item.unit_price or 0:.2f}", + "currency": item.currency or order.currency or "GBP", + }) + return { + "id": order.id, + "status": order.status or "pending", + "created_at_formatted": created, + "description": order.description or "", + "total_formatted": f"{order.total_amount or 0:.2f}", + "total_amount": float(order.total_amount or 0), + "currency": order.currency or "GBP", + "items": items, + } + + +def _serialize_calendar_entry(e: Any) -> dict: + st = e.state or "" + ds = e.start_at.strftime("%-d %b %Y, %H:%M") if e.start_at else "" + if e.end_at: + ds += f" \u2013 {e.end_at.strftime('%-d %b %Y, %H:%M')}" + return {"name": e.name, "state": st, "date_str": ds, "cost_formatted": f"{e.cost or 0:.2f}"} diff --git a/federation/bp/auth/routes.py b/federation/bp/auth/routes.py index 16139fd..4e25ba4 100644 --- a/federation/bp/auth/routes.py +++ b/federation/bp/auth/routes.py @@ -46,7 +46,7 @@ async def _render_social_auth_page(component: str, title: str, **kwargs) -> str: """Render an auth page with social layout — replaces sx_components helpers.""" from shared.sx.helpers import render_to_sx from shared.sx.page import get_template_context - from sxc.pages import _social_page + from sxc.pages.utils import _social_page ctx = await get_template_context() content = await render_to_sx(component, **{k: v for k, v in kwargs.items() if v}) return await _social_page(ctx, None, content=content, title=title) diff --git a/federation/bp/identity/routes.py b/federation/bp/identity/routes.py index d101c5c..497303e 100644 --- a/federation/bp/identity/routes.py +++ b/federation/bp/identity/routes.py @@ -33,7 +33,7 @@ async def _render_choose_username(*, actor=None, error="", username=""): from shared.sx.helpers import render_to_sx from shared.sx.parser import SxExpr from shared.sx.page import get_template_context - from sxc.pages import _social_page + from sxc.pages.utils import _social_page from markupsafe import escape ctx = await get_template_context() diff --git a/federation/bp/social/routes.py b/federation/bp/social/routes.py index 1967955..56c3582 100644 --- a/federation/bp/social/routes.py +++ b/federation/bp/social/routes.py @@ -95,7 +95,7 @@ def register(url_prefix="/social"): @bp.get("/search/page") async def search_page(): - from sxc.pages import _serialize_remote_actor, _serialize_actor + from sxc.pages.utils import _serialize_remote_actor, _serialize_actor actor = getattr(g, "_social_actor", None) query = request.args.get("q", "").strip() @@ -154,7 +154,7 @@ def register(url_prefix="/social"): async def _actor_card_response(actor, remote_actor_url, is_followed): """Re-render a single actor card after follow/unfollow via HTMX.""" - from sxc.pages import _serialize_remote_actor, _serialize_actor + from sxc.pages.utils import _serialize_remote_actor, _serialize_actor remote_dto = await services.federation.get_or_fetch_remote_actor( g.s, remote_actor_url, @@ -298,7 +298,7 @@ def register(url_prefix="/social"): @bp.get("/following/page") async def following_list_page(): - from sxc.pages import _serialize_remote_actor, _serialize_actor + from sxc.pages.utils import _serialize_remote_actor, _serialize_actor actor = _require_actor() page = request.args.get("page", 1, type=int) @@ -320,7 +320,7 @@ def register(url_prefix="/social"): @bp.get("/followers/page") async def followers_list_page(): - from sxc.pages import _serialize_remote_actor, _serialize_actor + from sxc.pages.utils import _serialize_remote_actor, _serialize_actor actor = _require_actor() page = request.args.get("page", 1, type=int) @@ -387,7 +387,7 @@ def register(url_prefix="/social"): async def _render_timeline_items(items, timeline_type, actor, actor_id=None): """Render timeline pagination items as SX fragment.""" - from sxc.pages import _serialize_timeline_item, _serialize_actor + from sxc.pages.utils import _serialize_timeline_item, _serialize_actor item_dicts = [_serialize_timeline_item(i) for i in items] actor_data = _serialize_actor(actor) diff --git a/federation/sxc/pages/__init__.py b/federation/sxc/pages/__init__.py index b8af21f..983922b 100644 --- a/federation/sxc/pages/__init__.py +++ b/federation/sxc/pages/__init__.py @@ -1,8 +1,6 @@ """Federation defpage setup — registers layouts and loads .sx pages.""" from __future__ import annotations -from typing import Any - def setup_federation_pages() -> None: """Register federation-specific layouts and load page definitions.""" @@ -16,82 +14,7 @@ def _load_federation_page_files() -> None: load_page_dir(os.path.dirname(__file__), "federation") -# --------------------------------------------------------------------------- -# Layouts — .sx defcomps read free variables from env -# --------------------------------------------------------------------------- - def _register_federation_layouts() -> None: from shared.sx.layouts import register_custom_layout + from .utils import _social_full, _social_oob register_custom_layout("social", _social_full, _social_oob) - - -def _actor_data(ctx: dict) -> dict | None: - actor = ctx.get("actor") - if not actor: - return None - from services.federation_page import _serialize_actor - return _serialize_actor(actor) - - -async def _social_full(ctx: dict, **kw: Any) -> str: - from shared.sx.helpers import render_to_sx_with_env, _ctx_to_env - env = _ctx_to_env(ctx) - env["actor"] = kw.get("actor") or _actor_data(ctx) - return await render_to_sx_with_env("social-layout-full", env) - - -async def _social_oob(ctx: dict, **kw: Any) -> str: - from shared.sx.helpers import render_to_sx_with_env, _ctx_to_env - env = _ctx_to_env(ctx, oob=True) - env["actor"] = kw.get("actor") or _actor_data(ctx) - return await render_to_sx_with_env("social-layout-oob", env) - - -# --------------------------------------------------------------------------- -# Helpers still used by route handlers -# --------------------------------------------------------------------------- - -def _serialize_actor(actor) -> dict | None: - """Serialize an actor profile to a dict for sx defcomps.""" - from services.federation_page import _serialize_actor as _impl - return _impl(actor) - - -def _serialize_timeline_item(item) -> dict: - """Serialize a timeline item DTO to a dict for sx defcomps.""" - from services.federation_page import _serialize_timeline_item as _impl - return _impl(item) - - -def _serialize_remote_actor(a) -> dict: - """Serialize a remote actor DTO to a dict for sx defcomps.""" - from services.federation_page import _serialize_remote_actor as _impl - return _impl(a) - - -async def _social_page(ctx: dict, actor, *, content: str, - title: str = "Rose Ash", meta_html: str = "") -> str: - """Build a full social page with social header.""" - from shared.sx.helpers import render_to_sx_with_env, _ctx_to_env, full_page_sx - from markupsafe import escape - - env = _ctx_to_env(ctx) - env["actor"] = _serialize_actor(actor) if actor else None - header_rows = await render_to_sx_with_env("social-layout-full", env) - return await full_page_sx(ctx, header_rows=header_rows, content=content, - meta_html=meta_html or f'{escape(title)}') - - -def _get_actor(): - """Return current user's actor or None.""" - from quart import g - return getattr(g, "_social_actor", None) - - -def _require_actor(): - """Return current user's actor or abort 403.""" - from quart import abort - actor = _get_actor() - if not actor: - abort(403, "You need to choose a federation username first") - return actor diff --git a/federation/sxc/pages/utils.py b/federation/sxc/pages/utils.py new file mode 100644 index 0000000..97b1f01 --- /dev/null +++ b/federation/sxc/pages/utils.py @@ -0,0 +1,71 @@ +"""Federation page utilities — serializers, actor helpers, social page builder.""" +from __future__ import annotations + +from typing import Any + + +def _serialize_actor(actor) -> dict | None: + """Serialize an actor profile to a dict for sx defcomps.""" + from services.federation_page import _serialize_actor as _impl + return _impl(actor) + + +def _serialize_timeline_item(item) -> dict: + """Serialize a timeline item DTO to a dict for sx defcomps.""" + from services.federation_page import _serialize_timeline_item as _impl + return _impl(item) + + +def _serialize_remote_actor(a) -> dict: + """Serialize a remote actor DTO to a dict for sx defcomps.""" + from services.federation_page import _serialize_remote_actor as _impl + return _impl(a) + + +async def _social_page(ctx: dict, actor, *, content: str, + title: str = "Rose Ash", meta_html: str = "") -> str: + """Build a full social page with social header.""" + from shared.sx.helpers import render_to_sx_with_env, _ctx_to_env, full_page_sx + from markupsafe import escape + + env = _ctx_to_env(ctx) + env["actor"] = _serialize_actor(actor) if actor else None + header_rows = await render_to_sx_with_env("social-layout-full", env) + return await full_page_sx(ctx, header_rows=header_rows, content=content, + meta_html=meta_html or f'{escape(title)}') + + +def _get_actor(): + """Return current user's actor or None.""" + from quart import g + return getattr(g, "_social_actor", None) + + +def _require_actor(): + """Return current user's actor or abort 403.""" + from quart import abort + actor = _get_actor() + if not actor: + abort(403, "You need to choose a federation username first") + return actor + + +def _actor_data(ctx: dict) -> dict | None: + actor = ctx.get("actor") + if not actor: + return None + return _serialize_actor(actor) + + +async def _social_full(ctx: dict, **kw: Any) -> str: + from shared.sx.helpers import render_to_sx_with_env, _ctx_to_env + env = _ctx_to_env(ctx) + env["actor"] = kw.get("actor") or _actor_data(ctx) + return await render_to_sx_with_env("social-layout-full", env) + + +async def _social_oob(ctx: dict, **kw: Any) -> str: + from shared.sx.helpers import render_to_sx_with_env, _ctx_to_env + env = _ctx_to_env(ctx, oob=True) + env["actor"] = kw.get("actor") or _actor_data(ctx) + return await render_to_sx_with_env("social-layout-oob", env) diff --git a/test/bp/dashboard/routes.py b/test/bp/dashboard/routes.py index 25fb744..b44a0eb 100644 --- a/test/bp/dashboard/routes.py +++ b/test/bp/dashboard/routes.py @@ -14,7 +14,7 @@ def register(url_prefix: str = "/") -> Blueprint: """Full page dashboard with last results.""" from shared.sx.page import get_template_context from shared.browser.app.csrf import generate_csrf_token - from sxc.pages import render_dashboard_page_sx + from sxc.pages.renders import render_dashboard_page_sx import runner ctx = await get_template_context() @@ -63,12 +63,12 @@ def register(url_prefix: str = "/") -> Blueprint: if is_htmx: # S-expression wire format — sx.js renders client-side from shared.sx.helpers import sx_response - from sxc.pages import test_detail_sx + from sxc.pages.renders import test_detail_sx return sx_response(await test_detail_sx(test)) # Full page render (direct navigation / refresh) from shared.sx.page import get_template_context - from sxc.pages import render_test_detail_page_sx + from sxc.pages.renders import render_test_detail_page_sx ctx = await get_template_context() html = await render_test_detail_page_sx(ctx, test) @@ -78,7 +78,7 @@ def register(url_prefix: str = "/") -> Blueprint: async def results(): """HTMX partial — poll target for results table.""" from shared.browser.app.csrf import generate_csrf_token - from sxc.pages import render_results_partial_sx + from sxc.pages.renders import render_results_partial_sx import runner result = runner.get_results() diff --git a/test/sxc/pages/__init__.py b/test/sxc/pages/__init__.py index 299d9db..3d0ba3a 100644 --- a/test/sxc/pages/__init__.py +++ b/test/sxc/pages/__init__.py @@ -1,145 +1,13 @@ -"""Test service s-expression page components.""" +"""Test service defpage setup.""" from __future__ import annotations -import os -from datetime import datetime -from shared.sx.jinja_bridge import load_service_components -from shared.sx.helpers import render_to_sx, SxExpr, render_to_sx_with_env, _ctx_to_env, full_page_sx - -# Load test-specific .sx components at import time -load_service_components(os.path.dirname(os.path.dirname(os.path.dirname(__file__)))) +def setup_test_pages() -> None: + """Load test page definitions.""" + _load_test_page_files() -def _format_time(ts: float | None) -> str: - """Format a unix timestamp for display.""" - if not ts: - return "never" - return datetime.fromtimestamp(ts).strftime("%-d %b %Y, %H:%M:%S") - - -_FILTER_MAP = { - "passed": "passed", - "failed": "failed", - "errors": "error", - "skipped": "skipped", -} - - -def _filter_tests(tests: list[dict], active_filter: str | None, - active_service: str | None) -> list[dict]: - """Filter tests by outcome and/or service.""" - from runner import _service_from_nodeid - filtered = tests - if active_filter and active_filter in _FILTER_MAP: - outcome = _FILTER_MAP[active_filter] - filtered = [t for t in filtered if t["outcome"] == outcome] - if active_service: - filtered = [t for t in filtered if _service_from_nodeid(t["nodeid"]) == active_service] - return filtered - - -def _service_list() -> list[str]: - from runner import _SERVICE_ORDER - return list(_SERVICE_ORDER) - - -def _build_summary_data(result: dict | None, running: bool, csrf: str, - active_filter: str | None) -> dict: - """Prepare summary data dict for the ~test-results-partial defcomp.""" - if running and not result: - return dict(state="running", status="running", passed="0", failed="0", - errors="0", skipped="0", total="0", duration="...", - last_run="in progress", running=True, csrf=csrf, - active_filter=active_filter) - if not result: - return dict(state="no-results", status=None, passed="0", failed="0", - errors="0", skipped="0", total="0", duration="0", - last_run="never", running=running, csrf=csrf, - active_filter=active_filter) - status = "running" if running else result["status"] - return dict( - state="running" if running else "has-results", - status=status, - passed=str(result["passed"]), - failed=str(result["failed"]), - errors=str(result["errors"]), - skipped=str(result.get("skipped", 0)), - total=str(result["total"]), - duration=str(result["duration"]), - last_run=_format_time(result["finished_at"]) if not running else "in progress", - running=running, csrf=csrf, - active_filter=active_filter, - ) - - -async def test_detail_sx(test: dict) -> str: - """Return s-expression wire format for a test detail view.""" - return await render_to_sx("test-detail-section", test=test) - - -async def render_dashboard_page_sx(ctx: dict, result: dict | None, - running: bool, csrf: str, - active_filter: str | None = None, - active_service: str | None = None) -> str: - """Full page: test dashboard (sx wire format).""" - from runner import group_tests_by_service - - summary_data = _build_summary_data(result, running, csrf, active_filter) - sections = [] - has_failures = "false" - if result and not running: - tests = _filter_tests(result.get("tests", []), active_filter, active_service) - if tests: - sections = group_tests_by_service(tests) - has_failures = str(result["failed"] > 0 or result["errors"] > 0).lower() - else: - summary_data["state"] = "empty-filtered" - - inner = await render_to_sx("test-results-partial", - summary_data=summary_data, sections=sections, has_failures=has_failures) - content = await render_to_sx("test-results-wrap", running=running, inner=SxExpr(inner)) - hdr = await render_to_sx_with_env("test-layout-full", _ctx_to_env(ctx), - services=_service_list(), - active_service=active_service, - ) - return await full_page_sx(ctx, header_rows=hdr, content=content) - - -async def render_results_partial_sx(result: dict | None, running: bool, - csrf: str, - active_filter: str | None = None, - active_service: str | None = None) -> str: - """HTMX partial: results section (sx wire format).""" - from runner import group_tests_by_service - - summary_data = _build_summary_data(result, running, csrf, active_filter) - sections = [] - has_failures = "false" - if result and not running: - tests = _filter_tests(result.get("tests", []), active_filter, active_service) - if tests: - sections = group_tests_by_service(tests) - has_failures = str(result["failed"] > 0 or result["errors"] > 0).lower() - else: - summary_data["state"] = "empty-filtered" - - inner = await render_to_sx("test-results-partial", - summary_data=summary_data, sections=sections, has_failures=has_failures) - return await render_to_sx("test-results-wrap", running=running, inner=SxExpr(inner)) - - -async def render_test_detail_page_sx(ctx: dict, test: dict) -> str: - """Full page: test detail (sx wire format).""" - hdr = await render_to_sx_with_env("test-detail-layout-full", _ctx_to_env(ctx), - services=_service_list(), - test_nodeid=test["nodeid"], - test_label=test["nodeid"].rsplit("::", 1)[-1], - ) - content = await render_to_sx("test-detail", - nodeid=test["nodeid"], - outcome=test["outcome"], - duration=str(test["duration"]), - longrepr=test.get("longrepr", ""), - ) - return await full_page_sx(ctx, header_rows=hdr, content=content) +def _load_test_page_files() -> None: + import os + from shared.sx.pages import load_page_dir + load_page_dir(os.path.dirname(__file__), "test") diff --git a/test/sxc/pages/renders.py b/test/sxc/pages/renders.py new file mode 100644 index 0000000..c907963 --- /dev/null +++ b/test/sxc/pages/renders.py @@ -0,0 +1,145 @@ +"""Test service render functions — called from bp routes.""" +from __future__ import annotations + +import os +from datetime import datetime + +from shared.sx.jinja_bridge import load_service_components +from shared.sx.helpers import render_to_sx, SxExpr, render_to_sx_with_env, _ctx_to_env, full_page_sx + +# Load test-specific .sx components at import time +load_service_components(os.path.dirname(os.path.dirname(os.path.dirname(__file__)))) + + +def _format_time(ts: float | None) -> str: + """Format a unix timestamp for display.""" + if not ts: + return "never" + return datetime.fromtimestamp(ts).strftime("%-d %b %Y, %H:%M:%S") + + +_FILTER_MAP = { + "passed": "passed", + "failed": "failed", + "errors": "error", + "skipped": "skipped", +} + + +def _filter_tests(tests: list[dict], active_filter: str | None, + active_service: str | None) -> list[dict]: + """Filter tests by outcome and/or service.""" + from runner import _service_from_nodeid + filtered = tests + if active_filter and active_filter in _FILTER_MAP: + outcome = _FILTER_MAP[active_filter] + filtered = [t for t in filtered if t["outcome"] == outcome] + if active_service: + filtered = [t for t in filtered if _service_from_nodeid(t["nodeid"]) == active_service] + return filtered + + +def _service_list() -> list[str]: + from runner import _SERVICE_ORDER + return list(_SERVICE_ORDER) + + +def _build_summary_data(result: dict | None, running: bool, csrf: str, + active_filter: str | None) -> dict: + """Prepare summary data dict for the ~test-results-partial defcomp.""" + if running and not result: + return dict(state="running", status="running", passed="0", failed="0", + errors="0", skipped="0", total="0", duration="...", + last_run="in progress", running=True, csrf=csrf, + active_filter=active_filter) + if not result: + return dict(state="no-results", status=None, passed="0", failed="0", + errors="0", skipped="0", total="0", duration="0", + last_run="never", running=running, csrf=csrf, + active_filter=active_filter) + status = "running" if running else result["status"] + return dict( + state="running" if running else "has-results", + status=status, + passed=str(result["passed"]), + failed=str(result["failed"]), + errors=str(result["errors"]), + skipped=str(result.get("skipped", 0)), + total=str(result["total"]), + duration=str(result["duration"]), + last_run=_format_time(result["finished_at"]) if not running else "in progress", + running=running, csrf=csrf, + active_filter=active_filter, + ) + + +async def test_detail_sx(test: dict) -> str: + """Return s-expression wire format for a test detail view.""" + return await render_to_sx("test-detail-section", test=test) + + +async def render_dashboard_page_sx(ctx: dict, result: dict | None, + running: bool, csrf: str, + active_filter: str | None = None, + active_service: str | None = None) -> str: + """Full page: test dashboard (sx wire format).""" + from runner import group_tests_by_service + + summary_data = _build_summary_data(result, running, csrf, active_filter) + sections = [] + has_failures = "false" + if result and not running: + tests = _filter_tests(result.get("tests", []), active_filter, active_service) + if tests: + sections = group_tests_by_service(tests) + has_failures = str(result["failed"] > 0 or result["errors"] > 0).lower() + else: + summary_data["state"] = "empty-filtered" + + inner = await render_to_sx("test-results-partial", + summary_data=summary_data, sections=sections, has_failures=has_failures) + content = await render_to_sx("test-results-wrap", running=running, inner=SxExpr(inner)) + hdr = await render_to_sx_with_env("test-layout-full", _ctx_to_env(ctx), + services=_service_list(), + active_service=active_service, + ) + return await full_page_sx(ctx, header_rows=hdr, content=content) + + +async def render_results_partial_sx(result: dict | None, running: bool, + csrf: str, + active_filter: str | None = None, + active_service: str | None = None) -> str: + """HTMX partial: results section (sx wire format).""" + from runner import group_tests_by_service + + summary_data = _build_summary_data(result, running, csrf, active_filter) + sections = [] + has_failures = "false" + if result and not running: + tests = _filter_tests(result.get("tests", []), active_filter, active_service) + if tests: + sections = group_tests_by_service(tests) + has_failures = str(result["failed"] > 0 or result["errors"] > 0).lower() + else: + summary_data["state"] = "empty-filtered" + + inner = await render_to_sx("test-results-partial", + summary_data=summary_data, sections=sections, has_failures=has_failures) + return await render_to_sx("test-results-wrap", running=running, inner=SxExpr(inner)) + + +async def render_test_detail_page_sx(ctx: dict, test: dict) -> str: + """Full page: test detail (sx wire format).""" + hdr = await render_to_sx_with_env("test-detail-layout-full", _ctx_to_env(ctx), + services=_service_list(), + test_nodeid=test["nodeid"], + test_label=test["nodeid"].rsplit("::", 1)[-1], + ) + content = await render_to_sx("test-detail", + nodeid=test["nodeid"], + outcome=test["outcome"], + duration=str(test["duration"]), + longrepr=test.get("longrepr", ""), + ) + return await full_page_sx(ctx, header_rows=hdr, content=content)