Merge branch 'worktree-macros-essays' into macros
This commit is contained in:
@@ -229,7 +229,7 @@ def register(url_prefix, title):
|
||||
lexical_doc = json.loads(lexical_raw)
|
||||
except (json.JSONDecodeError, TypeError):
|
||||
from shared.sx.page import get_template_context
|
||||
from sxc.pages import render_editor_panel
|
||||
from sxc.pages.renders import render_editor_panel
|
||||
tctx = await get_template_context()
|
||||
tctx["editor_html"] = await render_editor_panel(save_error="Invalid JSON in editor content.")
|
||||
html = await _render_new_post_page(tctx)
|
||||
@@ -238,7 +238,7 @@ def register(url_prefix, title):
|
||||
ok, reason = validate_lexical(lexical_doc)
|
||||
if not ok:
|
||||
from shared.sx.page import get_template_context
|
||||
from sxc.pages import render_editor_panel
|
||||
from sxc.pages.renders import render_editor_panel
|
||||
tctx = await get_template_context()
|
||||
tctx["editor_html"] = await render_editor_panel(save_error=reason)
|
||||
html = await _render_new_post_page(tctx)
|
||||
@@ -285,7 +285,7 @@ def register(url_prefix, title):
|
||||
lexical_doc = json.loads(lexical_raw)
|
||||
except (json.JSONDecodeError, TypeError):
|
||||
from shared.sx.page import get_template_context
|
||||
from sxc.pages import render_editor_panel
|
||||
from sxc.pages.renders import render_editor_panel
|
||||
tctx = await get_template_context()
|
||||
tctx["editor_html"] = await render_editor_panel(save_error="Invalid JSON in editor content.", is_page=True)
|
||||
tctx["is_page"] = True
|
||||
@@ -295,7 +295,7 @@ def register(url_prefix, title):
|
||||
ok, reason = validate_lexical(lexical_doc)
|
||||
if not ok:
|
||||
from shared.sx.page import get_template_context
|
||||
from sxc.pages import render_editor_panel
|
||||
from sxc.pages.renders import render_editor_panel
|
||||
tctx = await get_template_context()
|
||||
tctx["editor_html"] = await render_editor_panel(save_error=reason, is_page=True)
|
||||
tctx["is_page"] = True
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
713
blog/sxc/pages/helpers.py
Normal file
713
blog/sxc/pages/helpers.py
Normal file
@@ -0,0 +1,713 @@
|
||||
"""Blog page helpers — async functions available in .sx defpage expressions."""
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import Any
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Shared hydration helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _add_to_defpage_ctx(**kwargs: Any) -> None:
|
||||
from quart import g
|
||||
if not hasattr(g, '_defpage_ctx'):
|
||||
g._defpage_ctx = {}
|
||||
g._defpage_ctx.update(kwargs)
|
||||
|
||||
|
||||
async def _ensure_post_data(slug: str | None) -> None:
|
||||
"""Load post data and set g.post_data + defpage context.
|
||||
|
||||
Replicates post bp's hydrate_post_data + context_processor.
|
||||
"""
|
||||
from quart import g, abort
|
||||
|
||||
if hasattr(g, 'post_data') and g.post_data:
|
||||
await _inject_post_context(g.post_data)
|
||||
return
|
||||
|
||||
if not slug:
|
||||
abort(404)
|
||||
|
||||
from bp.post.services.post_data import post_data
|
||||
|
||||
is_admin = bool((g.get("rights") or {}).get("admin"))
|
||||
p_data = await post_data(slug, g.s, include_drafts=True)
|
||||
if not p_data:
|
||||
abort(404)
|
||||
|
||||
# Draft access control
|
||||
if p_data["post"].get("status") != "published":
|
||||
if is_admin:
|
||||
pass
|
||||
elif g.user and p_data["post"].get("user_id") == g.user.id:
|
||||
pass
|
||||
else:
|
||||
abort(404)
|
||||
|
||||
g.post_data = p_data
|
||||
g.post_slug = slug
|
||||
await _inject_post_context(p_data)
|
||||
|
||||
|
||||
async def _inject_post_context(p_data: dict) -> None:
|
||||
"""Add post context_processor data to defpage context."""
|
||||
from shared.config import config
|
||||
from shared.infrastructure.fragments import fetch_fragment
|
||||
from shared.infrastructure.data_client import fetch_data
|
||||
from shared.contracts.dtos import CartSummaryDTO, dto_from_dict
|
||||
from shared.infrastructure.cart_identity import current_cart_identity
|
||||
|
||||
db_post_id = p_data["post"]["id"]
|
||||
post_slug = p_data["post"]["slug"]
|
||||
|
||||
container_nav = await fetch_fragment("relations", "container-nav", params={
|
||||
"container_type": "page",
|
||||
"container_id": str(db_post_id),
|
||||
"post_slug": post_slug,
|
||||
})
|
||||
|
||||
ctx: dict = {
|
||||
**p_data,
|
||||
"base_title": config()["title"],
|
||||
"container_nav": container_nav,
|
||||
}
|
||||
|
||||
if p_data["post"].get("is_page"):
|
||||
ident = current_cart_identity()
|
||||
summary_params: dict = {"page_slug": post_slug}
|
||||
if ident["user_id"] is not None:
|
||||
summary_params["user_id"] = ident["user_id"]
|
||||
if ident["session_id"] is not None:
|
||||
summary_params["session_id"] = ident["session_id"]
|
||||
raw_summary = await fetch_data(
|
||||
"cart", "cart-summary", params=summary_params, required=False,
|
||||
)
|
||||
page_summary = dto_from_dict(CartSummaryDTO, raw_summary) if raw_summary else CartSummaryDTO()
|
||||
ctx["page_cart_count"] = (
|
||||
page_summary.count + page_summary.calendar_count + page_summary.ticket_count
|
||||
)
|
||||
ctx["page_cart_total"] = float(
|
||||
page_summary.total + page_summary.calendar_total + page_summary.ticket_total
|
||||
)
|
||||
|
||||
_add_to_defpage_ctx(**ctx)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Rendering helpers (moved from sx_components)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _raw_html_sx(html: str) -> str:
|
||||
"""Wrap raw HTML in (raw! "...") so it's valid inside sx source."""
|
||||
from shared.sx.parser import serialize as sx_serialize
|
||||
if not html:
|
||||
return ""
|
||||
return "(raw! " + sx_serialize(html) + ")"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Page helpers (async functions available in .sx defpage expressions)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _register_blog_helpers() -> None:
|
||||
from shared.sx.pages import register_page_helpers
|
||||
register_page_helpers("blog", {
|
||||
"editor-content": _h_editor_content,
|
||||
"editor-page-content": _h_editor_page_content,
|
||||
"post-admin-content": _h_post_admin_content,
|
||||
"post-data-content": _h_post_data_content,
|
||||
"post-preview-content": _h_post_preview_content,
|
||||
"post-entries-content": _h_post_entries_content,
|
||||
"post-settings-content": _h_post_settings_content,
|
||||
"post-edit-content": _h_post_edit_content,
|
||||
})
|
||||
|
||||
|
||||
# --- Editor helpers ---
|
||||
|
||||
async def _h_editor_content(**kw):
|
||||
from .renders import render_editor_panel
|
||||
return await render_editor_panel()
|
||||
|
||||
|
||||
async def _h_editor_page_content(**kw):
|
||||
from .renders import render_editor_panel
|
||||
return await render_editor_panel(is_page=True)
|
||||
|
||||
|
||||
# --- Post admin helpers ---
|
||||
|
||||
async def _h_post_admin_content(slug=None, **kw):
|
||||
await _ensure_post_data(slug)
|
||||
return '(div :class "pb-8")'
|
||||
|
||||
|
||||
async def _h_post_data_content(slug=None, **kw):
|
||||
await _ensure_post_data(slug)
|
||||
from quart import g
|
||||
from markupsafe import escape as esc
|
||||
|
||||
original_post = getattr(g, "post_data", {}).get("original_post")
|
||||
if original_post is None:
|
||||
return _raw_html_sx('<div class="px-4 py-8 text-stone-400">No post data available.</div>')
|
||||
|
||||
tablename = getattr(original_post, "__tablename__", "?")
|
||||
|
||||
def _render_scalar_table(obj):
|
||||
rows = []
|
||||
for col in obj.__mapper__.columns:
|
||||
key = col.key
|
||||
if key == "_sa_instance_state":
|
||||
continue
|
||||
val = getattr(obj, key, None)
|
||||
if val is None:
|
||||
val_html = '<span class="text-neutral-400">\u2014</span>'
|
||||
elif hasattr(val, "isoformat"):
|
||||
val_html = f'<pre class="whitespace-pre-wrap break-words break-all text-xs"><code>{esc(val.isoformat())}</code></pre>'
|
||||
elif isinstance(val, str):
|
||||
val_html = f'<pre class="whitespace-pre-wrap break-words break-all text-xs">{esc(val)}</pre>'
|
||||
else:
|
||||
val_html = f'<pre class="whitespace-pre-wrap break-words break-all text-xs"><code>{esc(str(val))}</code></pre>'
|
||||
rows.append(
|
||||
f'<tr class="border-t border-neutral-200 align-top">'
|
||||
f'<td class="px-3 py-2 whitespace-nowrap text-neutral-600 align-top">{esc(key)}</td>'
|
||||
f'<td class="px-3 py-2 align-top">{val_html}</td></tr>'
|
||||
)
|
||||
return (
|
||||
'<div class="w-full overflow-x-auto sm:overflow-visible">'
|
||||
'<table class="w-full table-fixed text-sm border border-neutral-200 rounded-xl overflow-hidden">'
|
||||
'<thead class="bg-neutral-50/70"><tr>'
|
||||
'<th class="px-3 py-2 text-left font-medium w-40 sm:w-56">Field</th>'
|
||||
'<th class="px-3 py-2 text-left font-medium">Value</th>'
|
||||
'</tr></thead><tbody>' + "".join(rows) + '</tbody></table></div>'
|
||||
)
|
||||
|
||||
def _render_model(obj, depth=0, max_depth=2):
|
||||
parts = [_render_scalar_table(obj)]
|
||||
rel_parts = []
|
||||
for rel in obj.__mapper__.relationships:
|
||||
rel_name = rel.key
|
||||
loaded = rel_name in obj.__dict__
|
||||
value = getattr(obj, rel_name, None) if loaded else None
|
||||
cardinality = "many" if rel.uselist else "one"
|
||||
cls_name = rel.mapper.class_.__name__
|
||||
loaded_label = "" if loaded else " \u2022 <em>not loaded</em>"
|
||||
|
||||
inner = ""
|
||||
if value is None:
|
||||
inner = '<span class="text-neutral-400">\u2014</span>'
|
||||
elif rel.uselist:
|
||||
items = list(value) if value else []
|
||||
inner = f'<div class="text-neutral-500 mb-2">{len(items)} item{"" if len(items) == 1 else "s"}</div>'
|
||||
if items and depth < max_depth:
|
||||
sub_rows = []
|
||||
for i, it in enumerate(items, 1):
|
||||
ident_parts = []
|
||||
for k in ("id", "ghost_id", "uuid", "slug", "name", "title"):
|
||||
if k in it.__mapper__.c:
|
||||
v = getattr(it, k, "")
|
||||
ident_parts.append(f"{k}={v}")
|
||||
summary = " \u2022 ".join(ident_parts) if ident_parts else str(it)
|
||||
child_html = ""
|
||||
if depth < max_depth:
|
||||
child_html = f'<div class="mt-2 pl-3 border-l border-neutral-200">{_render_model(it, depth + 1, max_depth)}</div>'
|
||||
else:
|
||||
child_html = '<div class="mt-1 text-xs text-neutral-500">\u2026max depth reached\u2026</div>'
|
||||
sub_rows.append(
|
||||
f'<tr class="border-t border-neutral-200 align-top">'
|
||||
f'<td class="px-2 py-1 whitespace-nowrap align-top">{i}</td>'
|
||||
f'<td class="px-2 py-1 align-top"><pre class="whitespace-pre-wrap break-words break-all text-xs"><code>{esc(summary)}</code></pre>{child_html}</td></tr>'
|
||||
)
|
||||
inner += (
|
||||
'<div class="w-full overflow-x-auto sm:overflow-visible">'
|
||||
'<table class="w-full table-fixed text-sm border border-neutral-200 rounded-lg overflow-hidden">'
|
||||
'<thead class="bg-neutral-50/70"><tr><th class="px-2 py-1 text-left w-10">#</th>'
|
||||
'<th class="px-2 py-1 text-left">Summary</th></tr></thead><tbody>'
|
||||
+ "".join(sub_rows) + '</tbody></table></div>'
|
||||
)
|
||||
else:
|
||||
child = value
|
||||
ident_parts = []
|
||||
for k in ("id", "ghost_id", "uuid", "slug", "name", "title"):
|
||||
if k in child.__mapper__.c:
|
||||
v = getattr(child, k, "")
|
||||
ident_parts.append(f"{k}={v}")
|
||||
summary = " \u2022 ".join(ident_parts) if ident_parts else str(child)
|
||||
inner = f'<pre class="whitespace-pre-wrap break-words break-all text-xs mb-2"><code>{esc(summary)}</code></pre>'
|
||||
if depth < max_depth:
|
||||
inner += f'<div class="pl-3 border-l border-neutral-200">{_render_model(child, depth + 1, max_depth)}</div>'
|
||||
else:
|
||||
inner += '<div class="text-xs text-neutral-500">\u2026max depth reached\u2026</div>'
|
||||
|
||||
rel_parts.append(
|
||||
f'<div class="rounded-xl border border-neutral-200">'
|
||||
f'<div class="px-3 py-2 bg-neutral-50/70 text-sm font-medium">'
|
||||
f'Relationship: <span class="font-semibold">{esc(rel_name)}</span>'
|
||||
f' <span class="ml-2 text-xs text-neutral-500">{cardinality} \u2192 {esc(cls_name)}{loaded_label}</span></div>'
|
||||
f'<div class="p-3 text-sm">{inner}</div></div>'
|
||||
)
|
||||
if rel_parts:
|
||||
parts.append('<div class="space-y-3">' + "".join(rel_parts) + '</div>')
|
||||
return '<div class="space-y-4">' + "".join(parts) + '</div>'
|
||||
|
||||
html = (
|
||||
f'<div class="px-4 py-8">'
|
||||
f'<div class="mb-6 text-sm text-neutral-500">Model: <code>Post</code> \u2022 Table: <code>{esc(tablename)}</code></div>'
|
||||
f'{_render_model(original_post, 0, 2)}</div>'
|
||||
)
|
||||
return _raw_html_sx(html)
|
||||
|
||||
|
||||
async def _h_post_preview_content(slug=None, **kw):
|
||||
await _ensure_post_data(slug)
|
||||
from quart import g
|
||||
from shared.services.registry import services
|
||||
from shared.sx.helpers import render_to_sx
|
||||
from shared.sx.parser import SxExpr, serialize as sx_serialize
|
||||
|
||||
preview = await services.blog_page.preview_data(g.s)
|
||||
|
||||
sections: list[str] = []
|
||||
if preview.get("sx_pretty"):
|
||||
sections.append(await render_to_sx("blog-preview-section",
|
||||
title="S-Expression Source", content=SxExpr(preview["sx_pretty"])))
|
||||
if preview.get("json_pretty"):
|
||||
sections.append(await render_to_sx("blog-preview-section",
|
||||
title="Lexical JSON", content=SxExpr(preview["json_pretty"])))
|
||||
if preview.get("sx_rendered"):
|
||||
rendered_sx = f'(div :class "blog-content prose max-w-none" (raw! {sx_serialize(preview["sx_rendered"])}))'
|
||||
sections.append(await render_to_sx("blog-preview-section",
|
||||
title="SX Rendered", content=SxExpr(rendered_sx)))
|
||||
if preview.get("lex_rendered"):
|
||||
rendered_sx = f'(div :class "blog-content prose max-w-none" (raw! {sx_serialize(preview["lex_rendered"])}))'
|
||||
sections.append(await render_to_sx("blog-preview-section",
|
||||
title="Lexical Rendered", content=SxExpr(rendered_sx)))
|
||||
|
||||
if not sections:
|
||||
return '(div :class "p-8 text-stone-500" "No content to preview.")'
|
||||
|
||||
inner = " ".join(sections)
|
||||
return await render_to_sx("blog-preview-panel", sections=SxExpr(f"(<> {inner})"))
|
||||
|
||||
|
||||
async def _h_post_entries_content(slug=None, **kw):
|
||||
await _ensure_post_data(slug)
|
||||
from quart import g, url_for as qurl
|
||||
from sqlalchemy import select
|
||||
from markupsafe import escape as esc
|
||||
from shared.models.calendars import Calendar
|
||||
from shared.utils import host_url
|
||||
from bp.post.services.entry_associations import get_post_entry_ids
|
||||
from bp.post.admin.routes import _render_associated_entries
|
||||
|
||||
post_id = g.post_data["post"]["id"]
|
||||
post_slug = g.post_data["post"]["slug"]
|
||||
associated_entry_ids = await get_post_entry_ids(post_id)
|
||||
result = await g.s.execute(
|
||||
select(Calendar)
|
||||
.where(Calendar.deleted_at.is_(None))
|
||||
.order_by(Calendar.name.asc())
|
||||
)
|
||||
all_calendars = result.scalars().all()
|
||||
for calendar in all_calendars:
|
||||
await g.s.refresh(calendar, ["entries", "post"])
|
||||
|
||||
# Associated entries list
|
||||
assoc_html = await _render_associated_entries(all_calendars, associated_entry_ids, post_slug)
|
||||
|
||||
# Calendar browser
|
||||
cal_items: list[str] = []
|
||||
for cal in all_calendars:
|
||||
cal_post = getattr(cal, "post", None)
|
||||
cal_fi = getattr(cal_post, "feature_image", None) if cal_post else None
|
||||
cal_title = esc(getattr(cal_post, "title", "")) if cal_post else ""
|
||||
cal_name = esc(getattr(cal, "name", ""))
|
||||
cal_view_url = host_url(qurl("blog.post.admin.calendar_view", slug=post_slug, calendar_id=cal.id))
|
||||
|
||||
img_html = (
|
||||
f'<img src="{esc(cal_fi)}" alt="{cal_title}" class="w-12 h-12 rounded object-cover flex-shrink-0" />'
|
||||
if cal_fi else
|
||||
'<div class="w-12 h-12 rounded bg-stone-200 flex-shrink-0"></div>'
|
||||
)
|
||||
cal_items.append(
|
||||
f'<details class="border rounded-lg bg-white" data-toggle-group="calendar-browser">'
|
||||
f'<summary class="p-4 cursor-pointer hover:bg-stone-50 flex items-center gap-3">'
|
||||
f'{img_html}'
|
||||
f'<div class="flex-1">'
|
||||
f'<div class="font-semibold flex items-center gap-2"><i class="fa fa-calendar text-stone-500"></i> {cal_name}</div>'
|
||||
f'<div class="text-sm text-stone-600">{cal_title}</div>'
|
||||
f'</div></summary>'
|
||||
f'<div class="p-4 border-t" sx-get="{esc(cal_view_url)}" sx-trigger="intersect once" sx-swap="innerHTML">'
|
||||
f'<div class="text-sm text-stone-400">Loading calendar...</div>'
|
||||
f'</div></details>'
|
||||
)
|
||||
|
||||
if cal_items:
|
||||
browser_html = (
|
||||
'<div class="space-y-3"><h3 class="text-lg font-semibold">Browse Calendars</h3>'
|
||||
+ "".join(cal_items) + '</div>'
|
||||
)
|
||||
else:
|
||||
browser_html = '<div class="space-y-3"><h3 class="text-lg font-semibold">Browse Calendars</h3><div class="text-sm text-stone-400">No calendars found.</div></div>'
|
||||
|
||||
return (
|
||||
_raw_html_sx('<div id="post-entries-content" class="space-y-6 p-4">')
|
||||
+ assoc_html
|
||||
+ _raw_html_sx(browser_html + '</div>')
|
||||
)
|
||||
|
||||
|
||||
async def _h_post_settings_content(slug=None, **kw):
|
||||
await _ensure_post_data(slug)
|
||||
from quart import g, request
|
||||
from markupsafe import escape as esc
|
||||
from models.ghost_content import Post
|
||||
from sqlalchemy import select as sa_select
|
||||
from sqlalchemy.orm import selectinload
|
||||
from shared.browser.app.csrf import generate_csrf_token
|
||||
from bp.post.admin.routes import _post_to_edit_dict
|
||||
|
||||
post_id = g.post_data["post"]["id"]
|
||||
post = (await g.s.execute(
|
||||
sa_select(Post)
|
||||
.where(Post.id == post_id)
|
||||
.options(selectinload(Post.tags))
|
||||
)).scalar_one_or_none()
|
||||
ghost_post = _post_to_edit_dict(post) if post else {}
|
||||
save_success = request.args.get("saved") == "1"
|
||||
csrf = generate_csrf_token()
|
||||
|
||||
p = g.post_data.get("post", {}) if hasattr(g, "post_data") else {}
|
||||
is_page = p.get("is_page", False)
|
||||
gp = ghost_post
|
||||
|
||||
def field_label(text, field_for=None):
|
||||
for_attr = f' for="{field_for}"' if field_for else ''
|
||||
return f'<label{for_attr} class="block text-[13px] font-medium text-stone-500 mb-[4px]">{esc(text)}</label>'
|
||||
|
||||
input_cls = ('w-full text-[14px] rounded-[6px] border border-stone-200 px-[10px] py-[7px] '
|
||||
'bg-white text-stone-700 placeholder:text-stone-300 '
|
||||
'focus:outline-none focus:border-stone-400 focus:ring-1 focus:ring-stone-300')
|
||||
textarea_cls = input_cls + ' resize-y'
|
||||
|
||||
def text_input(name, value='', placeholder='', input_type='text', maxlength=None):
|
||||
ml = f' maxlength="{maxlength}"' if maxlength else ''
|
||||
return (f'<input type="{input_type}" name="{name}" id="settings-{name}" value="{esc(value)}"'
|
||||
f' placeholder="{esc(placeholder)}"{ml} class="{input_cls}">')
|
||||
|
||||
def textarea_input(name, value='', placeholder='', rows=3, maxlength=None):
|
||||
ml = f' maxlength="{maxlength}"' if maxlength else ''
|
||||
return (f'<textarea name="{name}" id="settings-{name}" rows="{rows}"'
|
||||
f' placeholder="{esc(placeholder)}"{ml} class="{textarea_cls}">{esc(value)}</textarea>')
|
||||
|
||||
def checkbox_input(name, checked=False, label=''):
|
||||
chk = ' checked' if checked else ''
|
||||
return (f'<label class="inline-flex items-center gap-[8px] cursor-pointer">'
|
||||
f'<input type="checkbox" name="{name}" id="settings-{name}"{chk}'
|
||||
f' class="rounded border-stone-300 text-stone-600 focus:ring-stone-300">'
|
||||
f'<span class="text-[14px] text-stone-600">{esc(label)}</span></label>')
|
||||
|
||||
def section(title, content, is_open=False):
|
||||
open_attr = ' open' if is_open else ''
|
||||
return (f'<details class="border border-stone-200 rounded-[8px] overflow-hidden"{open_attr}>'
|
||||
f'<summary class="px-[16px] py-[10px] bg-stone-50 text-[14px] font-medium text-stone-600 cursor-pointer select-none hover:bg-stone-100 transition-colors">{esc(title)}</summary>'
|
||||
f'<div class="px-[16px] py-[12px] space-y-[12px]">{content}</div></details>')
|
||||
|
||||
# General section
|
||||
slug_placeholder = 'page-slug' if is_page else 'post-slug'
|
||||
pub_at = gp.get("published_at") or ""
|
||||
pub_at_val = pub_at[:16] if pub_at else ""
|
||||
vis = gp.get("visibility") or "public"
|
||||
vis_opts = "".join(
|
||||
f'<option value="{v}"{" selected" if vis == v else ""}>{l}</option>'
|
||||
for v, l in [("public", "Public"), ("members", "Members"), ("paid", "Paid")]
|
||||
)
|
||||
|
||||
general = (
|
||||
f'<div>{field_label("Slug", "settings-slug")}{text_input("slug", gp.get("slug") or "", slug_placeholder)}</div>'
|
||||
f'<div>{field_label("Published at", "settings-published_at")}'
|
||||
f'<input type="datetime-local" name="published_at" id="settings-published_at" value="{esc(pub_at_val)}" class="{input_cls}"></div>'
|
||||
f'<div>{checkbox_input("featured", gp.get("featured"), "Featured page" if is_page else "Featured post")}</div>'
|
||||
f'<div>{field_label("Visibility", "settings-visibility")}'
|
||||
f'<select name="visibility" id="settings-visibility" class="{input_cls}">{vis_opts}</select></div>'
|
||||
f'<div>{checkbox_input("email_only", gp.get("email_only"), "Email only")}</div>'
|
||||
)
|
||||
|
||||
# Tags
|
||||
tags = gp.get("tags") or []
|
||||
if tags:
|
||||
tag_names = ", ".join(getattr(t, "name", t.get("name", "") if isinstance(t, dict) else str(t)) for t in tags)
|
||||
else:
|
||||
tag_names = ""
|
||||
tags_sec = (
|
||||
f'<div>{field_label("Tags (comma-separated)", "settings-tags")}'
|
||||
f'{text_input("tags", tag_names, "news, updates, featured")}'
|
||||
f'<p class="text-[12px] text-stone-400 mt-[4px]">Unknown tags will be created automatically.</p></div>'
|
||||
)
|
||||
|
||||
fi_sec = f'<div>{field_label("Alt text", "settings-feature_image_alt")}{text_input("feature_image_alt", gp.get("feature_image_alt") or "", "Describe the feature image")}</div>'
|
||||
|
||||
seo_sec = (
|
||||
f'<div>{field_label("Meta title", "settings-meta_title")}{text_input("meta_title", gp.get("meta_title") or "", "SEO title", maxlength=300)}'
|
||||
f'<p class="text-[12px] text-stone-400 mt-[2px]">Recommended: 70 characters. Max: 300.</p></div>'
|
||||
f'<div>{field_label("Meta description", "settings-meta_description")}{textarea_input("meta_description", gp.get("meta_description") or "", "SEO description", rows=2, maxlength=500)}'
|
||||
f'<p class="text-[12px] text-stone-400 mt-[2px]">Recommended: 156 characters.</p></div>'
|
||||
f'<div>{field_label("Canonical URL", "settings-canonical_url")}{text_input("canonical_url", gp.get("canonical_url") or "", "https://example.com/original-post", input_type="url")}</div>'
|
||||
)
|
||||
|
||||
og_sec = (
|
||||
f'<div>{field_label("OG title", "settings-og_title")}{text_input("og_title", gp.get("og_title") or "")}</div>'
|
||||
f'<div>{field_label("OG description", "settings-og_description")}{textarea_input("og_description", gp.get("og_description") or "", rows=2)}</div>'
|
||||
f'<div>{field_label("OG image URL", "settings-og_image")}{text_input("og_image", gp.get("og_image") or "", "https://...", input_type="url")}</div>'
|
||||
)
|
||||
|
||||
tw_sec = (
|
||||
f'<div>{field_label("Twitter title", "settings-twitter_title")}{text_input("twitter_title", gp.get("twitter_title") or "")}</div>'
|
||||
f'<div>{field_label("Twitter description", "settings-twitter_description")}{textarea_input("twitter_description", gp.get("twitter_description") or "", rows=2)}</div>'
|
||||
f'<div>{field_label("Twitter image URL", "settings-twitter_image")}{text_input("twitter_image", gp.get("twitter_image") or "", "https://...", input_type="url")}</div>'
|
||||
)
|
||||
|
||||
tmpl_placeholder = 'custom-page.hbs' if is_page else 'custom-post.hbs'
|
||||
adv_sec = f'<div>{field_label("Custom template", "settings-custom_template")}{text_input("custom_template", gp.get("custom_template") or "", tmpl_placeholder)}</div>'
|
||||
|
||||
sections = (
|
||||
section("General", general, is_open=True)
|
||||
+ section("Tags", tags_sec)
|
||||
+ section("Feature Image", fi_sec)
|
||||
+ section("SEO / Meta", seo_sec)
|
||||
+ section("Facebook / OpenGraph", og_sec)
|
||||
+ section("X / Twitter", tw_sec)
|
||||
+ section("Advanced", adv_sec)
|
||||
)
|
||||
|
||||
saved_html = '<span class="text-[14px] text-green-600">Saved.</span>' if save_success else ''
|
||||
|
||||
html = (
|
||||
f'<form method="post" class="max-w-[640px] mx-auto pb-[48px] px-[16px]">'
|
||||
f'<input type="hidden" name="csrf_token" value="{csrf}">'
|
||||
f'<input type="hidden" name="updated_at" value="{esc(gp.get("updated_at") or "")}">'
|
||||
f'<div class="space-y-[12px] mt-[16px]">{sections}</div>'
|
||||
f'<div class="flex items-center gap-[16px] mt-[24px] pt-[16px] border-t border-stone-200">'
|
||||
f'<button type="submit" class="px-[20px] py-[6px] bg-stone-700 text-white text-[14px] rounded-[8px] hover:bg-stone-800 transition-colors cursor-pointer">Save settings</button>'
|
||||
f'{saved_html}</div></form>'
|
||||
)
|
||||
return _raw_html_sx(html)
|
||||
|
||||
|
||||
async def _h_post_edit_content(slug=None, **kw):
|
||||
await _ensure_post_data(slug)
|
||||
import os
|
||||
from quart import g, request as qrequest, url_for as qurl, current_app
|
||||
from models.ghost_content import Post
|
||||
from sqlalchemy import select as sa_select
|
||||
from sqlalchemy.orm import selectinload
|
||||
from shared.infrastructure.data_client import fetch_data
|
||||
from shared.browser.app.csrf import generate_csrf_token
|
||||
from shared.sx.helpers import render_to_sx
|
||||
from shared.sx.parser import SxExpr, serialize as sx_serialize
|
||||
from bp.post.admin.routes import _post_to_edit_dict
|
||||
|
||||
post_id = g.post_data["post"]["id"]
|
||||
db_post = (await g.s.execute(
|
||||
sa_select(Post)
|
||||
.where(Post.id == post_id)
|
||||
.options(selectinload(Post.tags))
|
||||
)).scalar_one_or_none()
|
||||
ghost_post = _post_to_edit_dict(db_post) if db_post else {}
|
||||
save_success = qrequest.args.get("saved") == "1"
|
||||
save_error = qrequest.args.get("error", "")
|
||||
raw_newsletters = await fetch_data("account", "newsletters", required=False) or []
|
||||
from types import SimpleNamespace
|
||||
newsletters = [SimpleNamespace(**nl) for nl in raw_newsletters]
|
||||
|
||||
csrf = generate_csrf_token()
|
||||
asset_url_fn = current_app.jinja_env.globals.get("asset_url", lambda p: "")
|
||||
editor_css = asset_url_fn("scripts/editor.css")
|
||||
editor_js = asset_url_fn("scripts/editor.js")
|
||||
sx_editor_js = asset_url_fn("scripts/sx-editor.js")
|
||||
|
||||
upload_image_url = qurl("blog.editor_api.upload_image")
|
||||
upload_media_url = qurl("blog.editor_api.upload_media")
|
||||
upload_file_url = qurl("blog.editor_api.upload_file")
|
||||
oembed_url = qurl("blog.editor_api.oembed_proxy")
|
||||
snippets_url = qurl("blog.editor_api.list_snippets")
|
||||
unsplash_key = os.environ.get("UNSPLASH_ACCESS_KEY", "")
|
||||
|
||||
post = g.post_data.get("post", {}) if hasattr(g, "post_data") else {}
|
||||
is_page = post.get("is_page", False)
|
||||
|
||||
feature_image = ghost_post.get("feature_image") or ""
|
||||
feature_image_caption = ghost_post.get("feature_image_caption") or ""
|
||||
title_val = ghost_post.get("title") or ""
|
||||
excerpt_val = ghost_post.get("custom_excerpt") or ""
|
||||
updated_at = ghost_post.get("updated_at") or ""
|
||||
status = ghost_post.get("status") or "draft"
|
||||
lexical_json = ghost_post.get("lexical") or '{"root":{"children":[{"children":[],"direction":null,"format":"","indent":0,"type":"paragraph","version":1}],"direction":null,"format":"","indent":0,"type":"root","version":1}}'
|
||||
sx_content = ghost_post.get("sx_content") or ""
|
||||
has_sx = bool(sx_content)
|
||||
|
||||
already_emailed = bool(ghost_post and ghost_post.get("email") and (ghost_post["email"] if isinstance(ghost_post["email"], dict) else {}).get("status"))
|
||||
email_obj = ghost_post.get("email")
|
||||
if email_obj and not isinstance(email_obj, dict):
|
||||
already_emailed = bool(getattr(email_obj, "status", None))
|
||||
|
||||
title_placeholder = "Page title..." if is_page else "Post title..."
|
||||
|
||||
# Newsletter options as SX fragment
|
||||
nl_parts = ['(option :value "" "Select newsletter\u2026")']
|
||||
for nl in newsletters:
|
||||
nl_slug = sx_serialize(getattr(nl, "slug", ""))
|
||||
nl_name = sx_serialize(getattr(nl, "name", ""))
|
||||
nl_parts.append(f"(option :value {nl_slug} {nl_name})")
|
||||
nl_opts_sx = SxExpr("(<> " + " ".join(nl_parts) + ")")
|
||||
|
||||
# Footer extra badges as SX fragment
|
||||
badge_parts: list[str] = []
|
||||
if save_success:
|
||||
badge_parts.append('(span :class "text-[14px] text-green-600" "Saved.")')
|
||||
publish_requested = qrequest.args.get("publish_requested") if hasattr(qrequest, 'args') else None
|
||||
if publish_requested:
|
||||
badge_parts.append('(span :class "text-[14px] text-blue-600" "Publish requested \u2014 an admin will review.")')
|
||||
if post.get("publish_requested"):
|
||||
badge_parts.append('(span :class "inline-block px-2 py-0.5 rounded-full text-xs font-semibold bg-blue-100 text-blue-800" "Publish requested")')
|
||||
if already_emailed:
|
||||
nl_name = ""
|
||||
newsletter = ghost_post.get("newsletter")
|
||||
if newsletter:
|
||||
nl_name = getattr(newsletter, "name", "") if not isinstance(newsletter, dict) else newsletter.get("name", "")
|
||||
suffix = f" to {nl_name}" if nl_name else ""
|
||||
badge_parts.append(f'(span :class "inline-block px-2 py-0.5 rounded-full text-xs font-semibold bg-green-100 text-green-800" "Emailed{suffix}")')
|
||||
footer_extra_sx = SxExpr("(<> " + " ".join(badge_parts) + ")") if badge_parts else None
|
||||
|
||||
parts: list[str] = []
|
||||
|
||||
if save_error:
|
||||
parts.append(await render_to_sx("blog-editor-error", error=save_error))
|
||||
|
||||
parts.append(await render_to_sx("blog-editor-edit-form",
|
||||
csrf=csrf,
|
||||
updated_at=str(updated_at),
|
||||
title_val=title_val,
|
||||
excerpt_val=excerpt_val,
|
||||
feature_image=feature_image,
|
||||
feature_image_caption=feature_image_caption,
|
||||
sx_content_val=sx_content,
|
||||
lexical_json=lexical_json,
|
||||
has_sx=has_sx,
|
||||
title_placeholder=title_placeholder,
|
||||
status=status,
|
||||
already_emailed=already_emailed,
|
||||
newsletter_options=nl_opts_sx,
|
||||
footer_extra=footer_extra_sx,
|
||||
))
|
||||
|
||||
parts.append(await render_to_sx("blog-editor-publish-js", already_emailed=already_emailed))
|
||||
parts.append(await render_to_sx("blog-editor-styles", css_href=editor_css))
|
||||
parts.append(await render_to_sx("sx-editor-styles"))
|
||||
|
||||
init_js = (
|
||||
'(function() {'
|
||||
" function applyEditorFontSize() {"
|
||||
" document.documentElement.style.fontSize = '62.5%';"
|
||||
" document.body.style.fontSize = '1.6rem';"
|
||||
' }'
|
||||
" function restoreDefaultFontSize() {"
|
||||
" document.documentElement.style.fontSize = '';"
|
||||
" document.body.style.fontSize = '';"
|
||||
' }'
|
||||
' applyEditorFontSize();'
|
||||
" document.body.addEventListener('htmx:beforeSwap', function cleanup(e) {"
|
||||
" if (e.detail.target && e.detail.target.id === 'main-panel') {"
|
||||
' restoreDefaultFontSize();'
|
||||
" document.body.removeEventListener('htmx:beforeSwap', cleanup);"
|
||||
' }'
|
||||
' });'
|
||||
' function init() {'
|
||||
" var csrfToken = document.querySelector('input[name=\"csrf_token\"]').value;"
|
||||
f" var uploadUrl = '{upload_image_url}';"
|
||||
' var uploadUrls = {'
|
||||
' image: uploadUrl,'
|
||||
f" media: '{upload_media_url}',"
|
||||
f" file: '{upload_file_url}',"
|
||||
' };'
|
||||
" var fileInput = document.getElementById('feature-image-file');"
|
||||
" var addBtn = document.getElementById('feature-image-add-btn');"
|
||||
" var deleteBtn = document.getElementById('feature-image-delete-btn');"
|
||||
" var preview = document.getElementById('feature-image-preview');"
|
||||
" var emptyState = document.getElementById('feature-image-empty');"
|
||||
" var filledState = document.getElementById('feature-image-filled');"
|
||||
" var hiddenUrl = document.getElementById('feature-image-input');"
|
||||
" var hiddenCaption = document.getElementById('feature-image-caption-input');"
|
||||
" var captionInput = document.getElementById('feature-image-caption');"
|
||||
" var uploading = document.getElementById('feature-image-uploading');"
|
||||
' function showFilled(url) {'
|
||||
' preview.src = url; hiddenUrl.value = url;'
|
||||
" emptyState.classList.add('hidden'); filledState.classList.remove('hidden'); uploading.classList.add('hidden');"
|
||||
' }'
|
||||
' function showEmpty() {'
|
||||
" preview.src = ''; hiddenUrl.value = ''; hiddenCaption.value = ''; captionInput.value = '';"
|
||||
" emptyState.classList.remove('hidden'); filledState.classList.add('hidden'); uploading.classList.add('hidden');"
|
||||
' }'
|
||||
' function uploadFile(file) {'
|
||||
" emptyState.classList.add('hidden'); uploading.classList.remove('hidden');"
|
||||
" var fd = new FormData(); fd.append('file', file);"
|
||||
" fetch(uploadUrl, { method: 'POST', body: fd, headers: { 'X-CSRFToken': csrfToken } })"
|
||||
" .then(function(r) { if (!r.ok) throw new Error('Upload failed (' + r.status + ')'); return r.json(); })"
|
||||
' .then(function(data) {'
|
||||
' var url = data.images && data.images[0] && data.images[0].url;'
|
||||
" if (url) showFilled(url); else { showEmpty(); alert('Upload succeeded but no image URL returned.'); }"
|
||||
' })'
|
||||
' .catch(function(e) { showEmpty(); alert(e.message); });'
|
||||
' }'
|
||||
" addBtn.addEventListener('click', function() { fileInput.click(); });"
|
||||
" preview.addEventListener('click', function() { fileInput.click(); });"
|
||||
" deleteBtn.addEventListener('click', function(e) { e.stopPropagation(); showEmpty(); });"
|
||||
" fileInput.addEventListener('change', function() {"
|
||||
' if (fileInput.files && fileInput.files[0]) { uploadFile(fileInput.files[0]); fileInput.value = \'\'; }'
|
||||
' });'
|
||||
" captionInput.addEventListener('input', function() { hiddenCaption.value = captionInput.value; });"
|
||||
" var excerpt = document.querySelector('textarea[name=\"custom_excerpt\"]');"
|
||||
" function autoResize() { excerpt.style.height = 'auto'; excerpt.style.height = excerpt.scrollHeight + 'px'; }"
|
||||
" excerpt.addEventListener('input', autoResize); autoResize();"
|
||||
' var dataEl = document.getElementById(\'lexical-initial-data\');'
|
||||
' var initialJson = dataEl ? dataEl.textContent.trim() : null;'
|
||||
' if (initialJson) { var hidden = document.getElementById(\'lexical-json-input\'); if (hidden) hidden.value = initialJson; }'
|
||||
" window.mountEditor('lexical-editor', {"
|
||||
' initialJson: initialJson,'
|
||||
' csrfToken: csrfToken,'
|
||||
' uploadUrls: uploadUrls,'
|
||||
f" oembedUrl: '{oembed_url}',"
|
||||
f" unsplashApiKey: '{unsplash_key}',"
|
||||
f" snippetsUrl: '{snippets_url}',"
|
||||
' });'
|
||||
" if (typeof SxEditor !== 'undefined') {"
|
||||
" SxEditor.mount('sx-editor', {"
|
||||
" initialSx: (document.getElementById('sx-content-input') || {}).value || null,"
|
||||
' csrfToken: csrfToken,'
|
||||
' uploadUrls: uploadUrls,'
|
||||
f" oembedUrl: '{oembed_url}',"
|
||||
' onChange: function(sx) {'
|
||||
" document.getElementById('sx-content-input').value = sx;"
|
||||
' }'
|
||||
' });'
|
||||
' }'
|
||||
" document.addEventListener('keydown', function(e) {"
|
||||
" if ((e.ctrlKey || e.metaKey) && e.key === 's') {"
|
||||
" e.preventDefault(); document.getElementById('post-edit-form').requestSubmit();"
|
||||
' }'
|
||||
' });'
|
||||
' }'
|
||||
" if (typeof window.mountEditor === 'function') { init(); }"
|
||||
' else { var _t = setInterval(function() {'
|
||||
" if (typeof window.mountEditor === 'function') { clearInterval(_t); init(); }"
|
||||
' }, 50); }'
|
||||
'})();'
|
||||
)
|
||||
parts.append(await render_to_sx("blog-editor-scripts",
|
||||
js_src=editor_js,
|
||||
sx_editor_js_src=sx_editor_js,
|
||||
init_js=init_js))
|
||||
|
||||
return await render_to_sx("blog-editor-panel",
|
||||
parts=SxExpr("(<> " + " ".join(parts) + ")"))
|
||||
217
blog/sxc/pages/layouts.py
Normal file
217
blog/sxc/pages/layouts.py
Normal file
@@ -0,0 +1,217 @@
|
||||
"""Blog layout functions for defpage rendering."""
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import Any
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Header helpers (moved from sx_components — thin render_to_sx wrappers)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
async def _blog_header_sx(ctx: dict, *, oob: bool = False) -> str:
|
||||
from shared.sx.helpers import render_to_sx
|
||||
from shared.sx.parser import SxExpr
|
||||
return await render_to_sx("menu-row-sx",
|
||||
id="blog-row", level=1,
|
||||
link_label_content=SxExpr("(div)"),
|
||||
child_id="blog-header-child", oob=oob)
|
||||
|
||||
|
||||
async def _settings_header_sx(ctx: dict, *, oob: bool = False) -> str:
|
||||
from shared.sx.helpers import render_to_sx
|
||||
from shared.sx.parser import SxExpr
|
||||
from quart import url_for as qurl
|
||||
|
||||
settings_href = qurl("settings.defpage_settings_home")
|
||||
label_sx = await render_to_sx("blog-admin-label")
|
||||
nav_sx = await _settings_nav_sx(ctx)
|
||||
|
||||
return await render_to_sx("menu-row-sx",
|
||||
id="root-settings-row", level=1,
|
||||
link_href=settings_href,
|
||||
link_label_content=SxExpr(label_sx),
|
||||
nav=SxExpr(nav_sx) if nav_sx else None,
|
||||
child_id="root-settings-header-child", oob=oob)
|
||||
|
||||
|
||||
async def _settings_nav_sx(ctx: dict) -> str:
|
||||
from shared.sx.helpers import render_to_sx
|
||||
return await render_to_sx("blog-settings-nav",
|
||||
select_colours=ctx.get("select_colours", ""))
|
||||
|
||||
|
||||
async def _sub_settings_header_sx(row_id: str, child_id: str, href: str,
|
||||
icon: str, label: str, ctx: dict,
|
||||
*, oob: bool = False, nav_sx: str = "") -> str:
|
||||
from shared.sx.helpers import render_to_sx
|
||||
from shared.sx.parser import SxExpr
|
||||
|
||||
label_sx = await render_to_sx("blog-sub-settings-label",
|
||||
icon=f"fa fa-{icon}", label=label)
|
||||
return await render_to_sx("menu-row-sx",
|
||||
id=row_id, level=2,
|
||||
link_href=href,
|
||||
link_label_content=SxExpr(label_sx),
|
||||
nav=SxExpr(nav_sx) if nav_sx else None,
|
||||
child_id=child_id, oob=oob)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Layouts
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _register_blog_layouts() -> None:
|
||||
from shared.sx.layouts import register_custom_layout
|
||||
register_custom_layout("blog", _blog_full, _blog_oob)
|
||||
register_custom_layout("blog-settings", _settings_full, _settings_oob,
|
||||
mobile_fn=_settings_mobile)
|
||||
register_custom_layout("blog-cache", _cache_full, _cache_oob)
|
||||
register_custom_layout("blog-snippets", _snippets_full, _snippets_oob)
|
||||
register_custom_layout("blog-menu-items", _menu_items_full, _menu_items_oob)
|
||||
register_custom_layout("blog-tag-groups", _tag_groups_full, _tag_groups_oob)
|
||||
register_custom_layout("blog-tag-group-edit",
|
||||
_tag_group_edit_full, _tag_group_edit_oob)
|
||||
|
||||
|
||||
# --- Blog layout (root + blog header) ---
|
||||
|
||||
async def _blog_full(ctx: dict, **kw: Any) -> str:
|
||||
from shared.sx.helpers import render_to_sx_with_env, _ctx_to_env
|
||||
from shared.sx.parser import SxExpr
|
||||
return await render_to_sx_with_env("blog-layout-full", _ctx_to_env(ctx),
|
||||
blog_header=SxExpr(await _blog_header_sx(ctx)))
|
||||
|
||||
|
||||
async def _blog_oob(ctx: dict, **kw: Any) -> str:
|
||||
from shared.sx.helpers import render_to_sx_with_env, _ctx_to_env, oob_header_sx
|
||||
from shared.sx.parser import SxExpr
|
||||
rows = await render_to_sx_with_env("blog-layout-full", _ctx_to_env(ctx),
|
||||
blog_header=SxExpr(await _blog_header_sx(ctx)))
|
||||
return await oob_header_sx("root-header-child", "blog-header-child", rows)
|
||||
|
||||
|
||||
# --- Settings layout (root + settings header) ---
|
||||
|
||||
async def _settings_full(ctx: dict, **kw: Any) -> str:
|
||||
from shared.sx.helpers import render_to_sx_with_env, _ctx_to_env
|
||||
from shared.sx.parser import SxExpr
|
||||
return await render_to_sx_with_env("settings-layout-full", _ctx_to_env(ctx),
|
||||
settings_header=SxExpr(await _settings_header_sx(ctx)))
|
||||
|
||||
|
||||
async def _settings_oob(ctx: dict, **kw: Any) -> str:
|
||||
from shared.sx.helpers import render_to_sx_with_env, _ctx_to_env, oob_header_sx
|
||||
from shared.sx.parser import SxExpr
|
||||
rows = await render_to_sx_with_env("settings-layout-full", _ctx_to_env(ctx),
|
||||
settings_header=SxExpr(await _settings_header_sx(ctx)))
|
||||
return await oob_header_sx("root-header-child", "root-settings-header-child", rows)
|
||||
|
||||
|
||||
async def _settings_mobile(ctx: dict, **kw: Any) -> str:
|
||||
return await _settings_nav_sx(ctx)
|
||||
|
||||
|
||||
# --- Sub-settings helpers ---
|
||||
|
||||
async def _sub_settings_full(ctx: dict, row_id: str, child_id: str,
|
||||
endpoint: str, icon: str, label: str) -> str:
|
||||
from shared.sx.helpers import render_to_sx_with_env, _ctx_to_env
|
||||
from shared.sx.parser import SxExpr
|
||||
from quart import url_for as qurl
|
||||
return await render_to_sx_with_env("sub-settings-layout-full", _ctx_to_env(ctx),
|
||||
settings_header=SxExpr(await _settings_header_sx(ctx)),
|
||||
sub_header=SxExpr(await _sub_settings_header_sx(
|
||||
row_id, child_id, qurl(endpoint), icon, label, ctx)))
|
||||
|
||||
|
||||
async def _sub_settings_oob(ctx: dict, row_id: str, child_id: str,
|
||||
endpoint: str, icon: str, label: str) -> str:
|
||||
from shared.sx.helpers import oob_header_sx, render_to_sx
|
||||
from shared.sx.parser import SxExpr
|
||||
from quart import url_for as qurl
|
||||
settings_hdr_oob = await _settings_header_sx(ctx, oob=True)
|
||||
sub_hdr = await _sub_settings_header_sx(
|
||||
row_id, child_id, qurl(endpoint), icon, label, ctx)
|
||||
sub_oob = await oob_header_sx("root-settings-header-child", child_id, sub_hdr)
|
||||
return await render_to_sx("sub-settings-layout-oob",
|
||||
settings_header_oob=SxExpr(settings_hdr_oob),
|
||||
sub_header_oob=SxExpr(sub_oob))
|
||||
|
||||
|
||||
# --- Cache ---
|
||||
|
||||
async def _cache_full(ctx: dict, **kw: Any) -> str:
|
||||
return await _sub_settings_full(ctx, "cache-row", "cache-header-child",
|
||||
"defpage_cache_page", "refresh", "Cache")
|
||||
|
||||
|
||||
async def _cache_oob(ctx: dict, **kw: Any) -> str:
|
||||
return await _sub_settings_oob(ctx, "cache-row", "cache-header-child",
|
||||
"defpage_cache_page", "refresh", "Cache")
|
||||
|
||||
|
||||
# --- Snippets ---
|
||||
|
||||
async def _snippets_full(ctx: dict, **kw: Any) -> str:
|
||||
return await _sub_settings_full(ctx, "snippets-row", "snippets-header-child",
|
||||
"defpage_snippets_page", "puzzle-piece", "Snippets")
|
||||
|
||||
|
||||
async def _snippets_oob(ctx: dict, **kw: Any) -> str:
|
||||
return await _sub_settings_oob(ctx, "snippets-row", "snippets-header-child",
|
||||
"defpage_snippets_page", "puzzle-piece", "Snippets")
|
||||
|
||||
|
||||
# --- Menu Items ---
|
||||
|
||||
async def _menu_items_full(ctx: dict, **kw: Any) -> str:
|
||||
return await _sub_settings_full(ctx, "menu_items-row", "menu_items-header-child",
|
||||
"defpage_menu_items_page", "bars", "Menu Items")
|
||||
|
||||
|
||||
async def _menu_items_oob(ctx: dict, **kw: Any) -> str:
|
||||
return await _sub_settings_oob(ctx, "menu_items-row", "menu_items-header-child",
|
||||
"defpage_menu_items_page", "bars", "Menu Items")
|
||||
|
||||
|
||||
# --- Tag Groups ---
|
||||
|
||||
async def _tag_groups_full(ctx: dict, **kw: Any) -> str:
|
||||
return await _sub_settings_full(ctx, "tag-groups-row", "tag-groups-header-child",
|
||||
"defpage_tag_groups_page", "tags", "Tag Groups")
|
||||
|
||||
|
||||
async def _tag_groups_oob(ctx: dict, **kw: Any) -> str:
|
||||
return await _sub_settings_oob(ctx, "tag-groups-row", "tag-groups-header-child",
|
||||
"defpage_tag_groups_page", "tags", "Tag Groups")
|
||||
|
||||
|
||||
# --- Tag Group Edit ---
|
||||
|
||||
async def _tag_group_edit_full(ctx: dict, **kw: Any) -> str:
|
||||
from quart import request, url_for as qurl
|
||||
from shared.sx.helpers import render_to_sx_with_env, _ctx_to_env
|
||||
from shared.sx.parser import SxExpr
|
||||
g_id = (request.view_args or {}).get("id")
|
||||
return await render_to_sx_with_env("sub-settings-layout-full", _ctx_to_env(ctx),
|
||||
settings_header=SxExpr(await _settings_header_sx(ctx)),
|
||||
sub_header=SxExpr(await _sub_settings_header_sx(
|
||||
"tag-groups-row", "tag-groups-header-child",
|
||||
qurl("defpage_tag_group_edit", id=g_id),
|
||||
"tags", "Tag Groups", ctx)))
|
||||
|
||||
|
||||
async def _tag_group_edit_oob(ctx: dict, **kw: Any) -> str:
|
||||
from quart import request, url_for as qurl
|
||||
from shared.sx.helpers import oob_header_sx, render_to_sx
|
||||
from shared.sx.parser import SxExpr
|
||||
g_id = (request.view_args or {}).get("id")
|
||||
settings_hdr_oob = await _settings_header_sx(ctx, oob=True)
|
||||
sub_hdr = await _sub_settings_header_sx(
|
||||
"tag-groups-row", "tag-groups-header-child",
|
||||
qurl("defpage_tag_group_edit", id=g_id),
|
||||
"tags", "Tag Groups", ctx)
|
||||
sub_oob = await oob_header_sx("root-settings-header-child", "tag-groups-header-child", sub_hdr)
|
||||
return await render_to_sx("sub-settings-layout-oob",
|
||||
settings_header_oob=SxExpr(settings_hdr_oob),
|
||||
sub_header_oob=SxExpr(sub_oob))
|
||||
177
blog/sxc/pages/renders.py
Normal file
177
blog/sxc/pages/renders.py
Normal file
@@ -0,0 +1,177 @@
|
||||
"""Blog editor panel rendering."""
|
||||
from __future__ import annotations
|
||||
|
||||
|
||||
async def render_editor_panel(save_error: str | None = None, is_page: bool = False) -> str:
|
||||
"""Build the WYSIWYG editor panel HTML for new post/page creation."""
|
||||
import os
|
||||
from quart import url_for as qurl, current_app
|
||||
from shared.browser.app.csrf import generate_csrf_token
|
||||
from shared.sx.helpers import render_to_sx
|
||||
|
||||
csrf = generate_csrf_token()
|
||||
asset_url_fn = current_app.jinja_env.globals.get("asset_url", lambda p: "")
|
||||
editor_css = asset_url_fn("scripts/editor.css")
|
||||
editor_js = asset_url_fn("scripts/editor.js")
|
||||
sx_editor_js = asset_url_fn("scripts/sx-editor.js")
|
||||
|
||||
upload_image_url = qurl("blog.editor_api.upload_image")
|
||||
upload_media_url = qurl("blog.editor_api.upload_media")
|
||||
upload_file_url = qurl("blog.editor_api.upload_file")
|
||||
oembed_url = qurl("blog.editor_api.oembed_proxy")
|
||||
snippets_url = qurl("blog.editor_api.list_snippets")
|
||||
unsplash_key = os.environ.get("UNSPLASH_ACCESS_KEY", "")
|
||||
|
||||
title_placeholder = "Page title..." if is_page else "Post title..."
|
||||
create_label = "Create Page" if is_page else "Create Post"
|
||||
|
||||
parts: list[str] = []
|
||||
|
||||
if save_error:
|
||||
parts.append(await render_to_sx("blog-editor-error", error=str(save_error)))
|
||||
|
||||
parts.append(await render_to_sx("blog-editor-form",
|
||||
csrf=csrf, title_placeholder=title_placeholder,
|
||||
create_label=create_label,
|
||||
))
|
||||
|
||||
parts.append(await render_to_sx("blog-editor-styles", css_href=editor_css))
|
||||
parts.append(await render_to_sx("sx-editor-styles"))
|
||||
|
||||
init_js = (
|
||||
"console.log('[EDITOR-DEBUG] init script running');\n"
|
||||
"(function() {\n"
|
||||
" console.log('[EDITOR-DEBUG] IIFE entered, mountEditor=', typeof window.mountEditor);\n"
|
||||
" function init() {\n"
|
||||
" var csrfToken = document.querySelector('input[name=\"csrf_token\"]').value;\n"
|
||||
f" var uploadUrl = '{upload_image_url}';\n"
|
||||
" var uploadUrls = {\n"
|
||||
" image: uploadUrl,\n"
|
||||
f" media: '{upload_media_url}',\n"
|
||||
f" file: '{upload_file_url}',\n"
|
||||
" };\n"
|
||||
"\n"
|
||||
" var fileInput = document.getElementById('feature-image-file');\n"
|
||||
" var addBtn = document.getElementById('feature-image-add-btn');\n"
|
||||
" var deleteBtn = document.getElementById('feature-image-delete-btn');\n"
|
||||
" var preview = document.getElementById('feature-image-preview');\n"
|
||||
" var emptyState = document.getElementById('feature-image-empty');\n"
|
||||
" var filledState = document.getElementById('feature-image-filled');\n"
|
||||
" var hiddenUrl = document.getElementById('feature-image-input');\n"
|
||||
" var hiddenCaption = document.getElementById('feature-image-caption-input');\n"
|
||||
" var captionInput = document.getElementById('feature-image-caption');\n"
|
||||
" var uploading = document.getElementById('feature-image-uploading');\n"
|
||||
"\n"
|
||||
" function showFilled(url) {\n"
|
||||
" preview.src = url;\n"
|
||||
" hiddenUrl.value = url;\n"
|
||||
" emptyState.classList.add('hidden');\n"
|
||||
" filledState.classList.remove('hidden');\n"
|
||||
" uploading.classList.add('hidden');\n"
|
||||
" }\n"
|
||||
"\n"
|
||||
" function showEmpty() {\n"
|
||||
" preview.src = '';\n"
|
||||
" hiddenUrl.value = '';\n"
|
||||
" hiddenCaption.value = '';\n"
|
||||
" captionInput.value = '';\n"
|
||||
" emptyState.classList.remove('hidden');\n"
|
||||
" filledState.classList.add('hidden');\n"
|
||||
" uploading.classList.add('hidden');\n"
|
||||
" }\n"
|
||||
"\n"
|
||||
" function uploadFile(file) {\n"
|
||||
" emptyState.classList.add('hidden');\n"
|
||||
" uploading.classList.remove('hidden');\n"
|
||||
" var fd = new FormData();\n"
|
||||
" fd.append('file', file);\n"
|
||||
" fetch(uploadUrl, {\n"
|
||||
" method: 'POST',\n"
|
||||
" body: fd,\n"
|
||||
" headers: { 'X-CSRFToken': csrfToken },\n"
|
||||
" })\n"
|
||||
" .then(function(r) {\n"
|
||||
" if (!r.ok) throw new Error('Upload failed (' + r.status + ')');\n"
|
||||
" return r.json();\n"
|
||||
" })\n"
|
||||
" .then(function(data) {\n"
|
||||
" var url = data.images && data.images[0] && data.images[0].url;\n"
|
||||
" if (url) showFilled(url);\n"
|
||||
" else { showEmpty(); alert('Upload succeeded but no image URL returned.'); }\n"
|
||||
" })\n"
|
||||
" .catch(function(e) {\n"
|
||||
" showEmpty();\n"
|
||||
" alert(e.message);\n"
|
||||
" });\n"
|
||||
" }\n"
|
||||
"\n"
|
||||
" addBtn.addEventListener('click', function() { fileInput.click(); });\n"
|
||||
" preview.addEventListener('click', function() { fileInput.click(); });\n"
|
||||
" deleteBtn.addEventListener('click', function(e) {\n"
|
||||
" e.stopPropagation();\n"
|
||||
" showEmpty();\n"
|
||||
" });\n"
|
||||
" fileInput.addEventListener('change', function() {\n"
|
||||
" if (fileInput.files && fileInput.files[0]) {\n"
|
||||
" uploadFile(fileInput.files[0]);\n"
|
||||
" fileInput.value = '';\n"
|
||||
" }\n"
|
||||
" });\n"
|
||||
" captionInput.addEventListener('input', function() {\n"
|
||||
" hiddenCaption.value = captionInput.value;\n"
|
||||
" });\n"
|
||||
"\n"
|
||||
" var excerpt = document.querySelector('textarea[name=\"custom_excerpt\"]');\n"
|
||||
" function autoResize() {\n"
|
||||
" excerpt.style.height = 'auto';\n"
|
||||
" excerpt.style.height = excerpt.scrollHeight + 'px';\n"
|
||||
" }\n"
|
||||
" excerpt.addEventListener('input', autoResize);\n"
|
||||
" autoResize();\n"
|
||||
"\n"
|
||||
" window.mountEditor('lexical-editor', {\n"
|
||||
" initialJson: null,\n"
|
||||
" csrfToken: csrfToken,\n"
|
||||
" uploadUrls: uploadUrls,\n"
|
||||
f" oembedUrl: '{oembed_url}',\n"
|
||||
f" unsplashApiKey: '{unsplash_key}',\n"
|
||||
f" snippetsUrl: '{snippets_url}',\n"
|
||||
" });\n"
|
||||
"\n"
|
||||
" if (typeof SxEditor !== 'undefined') {\n"
|
||||
" SxEditor.mount('sx-editor', {\n"
|
||||
" initialSx: (document.getElementById('sx-content-input') || {}).value || null,\n"
|
||||
" csrfToken: csrfToken,\n"
|
||||
" uploadUrls: uploadUrls,\n"
|
||||
f" oembedUrl: '{oembed_url}',\n"
|
||||
" onChange: function(sx) {\n"
|
||||
" document.getElementById('sx-content-input').value = sx;\n"
|
||||
" }\n"
|
||||
" });\n"
|
||||
" }\n"
|
||||
"\n"
|
||||
" document.addEventListener('keydown', function(e) {\n"
|
||||
" if ((e.ctrlKey || e.metaKey) && e.key === 's') {\n"
|
||||
" e.preventDefault();\n"
|
||||
" document.getElementById('post-new-form').requestSubmit();\n"
|
||||
" }\n"
|
||||
" });\n"
|
||||
" }\n"
|
||||
"\n"
|
||||
" if (typeof window.mountEditor === 'function') {\n"
|
||||
" init();\n"
|
||||
" } else {\n"
|
||||
" var _t = setInterval(function() {\n"
|
||||
" if (typeof window.mountEditor === 'function') { clearInterval(_t); init(); }\n"
|
||||
" }, 50);\n"
|
||||
" }\n"
|
||||
"})();\n"
|
||||
)
|
||||
parts.append(await render_to_sx("blog-editor-scripts",
|
||||
js_src=editor_js,
|
||||
sx_editor_js_src=sx_editor_js,
|
||||
init_js=init_js))
|
||||
|
||||
from shared.sx.parser import SxExpr
|
||||
return await render_to_sx("blog-editor-panel",
|
||||
parts=SxExpr("(<> " + " ".join(parts) + ")")) if parts else ""
|
||||
@@ -151,7 +151,7 @@ def register(url_prefix: str) -> Blueprint:
|
||||
page_config = await resolve_page_config(g.s, cart, calendar_entries, tickets)
|
||||
except ValueError as e:
|
||||
from shared.sx.page import get_template_context
|
||||
from sxc.pages import render_checkout_error_page
|
||||
from sxc.pages.renders import render_checkout_error_page
|
||||
tctx = await get_template_context()
|
||||
html = await render_checkout_error_page(tctx, error=str(e))
|
||||
return await make_response(html, 400)
|
||||
@@ -208,7 +208,7 @@ def register(url_prefix: str) -> Blueprint:
|
||||
|
||||
if not hosted_url:
|
||||
from shared.sx.page import get_template_context
|
||||
from sxc.pages import render_checkout_error_page
|
||||
from sxc.pages.renders import render_checkout_error_page
|
||||
tctx = await get_template_context()
|
||||
html = await render_checkout_error_page(tctx, error="No hosted checkout URL returned from SumUp.")
|
||||
return await make_response(html, 500)
|
||||
|
||||
@@ -73,7 +73,7 @@ def register(url_prefix: str) -> Blueprint:
|
||||
|
||||
if not hosted_url:
|
||||
from shared.sx.page import get_template_context
|
||||
from sxc.pages import render_checkout_error_page
|
||||
from sxc.pages.renders import render_checkout_error_page
|
||||
tctx = await get_template_context()
|
||||
html = await render_checkout_error_page(tctx, error="No hosted checkout URL returned from SumUp.")
|
||||
return await make_response(html, 500)
|
||||
|
||||
@@ -57,7 +57,7 @@ def register() -> Blueprint:
|
||||
if not order:
|
||||
return await make_response("Order not found", 404)
|
||||
from shared.sx.page import get_template_context
|
||||
from sxc.pages import render_order_page, render_order_oob
|
||||
from sxc.pages.renders import render_order_page, render_order_oob
|
||||
|
||||
ctx = await get_template_context()
|
||||
calendar_entries = ctx.get("calendar_entries")
|
||||
@@ -122,7 +122,7 @@ def register() -> Blueprint:
|
||||
|
||||
if not hosted_url:
|
||||
from shared.sx.page import get_template_context
|
||||
from sxc.pages import render_checkout_error_page
|
||||
from sxc.pages.renders import render_checkout_error_page
|
||||
tctx = await get_template_context()
|
||||
html = await render_checkout_error_page(tctx, error="No hosted checkout URL returned from SumUp when trying to reopen payment.", order=order)
|
||||
return await make_response(html, 500)
|
||||
|
||||
@@ -138,7 +138,7 @@ def register(url_prefix: str) -> Blueprint:
|
||||
orders = result.scalars().all()
|
||||
|
||||
from shared.sx.page import get_template_context
|
||||
from sxc.pages import (
|
||||
from sxc.pages.renders import (
|
||||
render_orders_page,
|
||||
render_orders_rows,
|
||||
render_orders_oob,
|
||||
|
||||
@@ -47,7 +47,7 @@ def register():
|
||||
g.page_config = SimpleNamespace(**raw_pc) if raw_pc else None
|
||||
|
||||
from shared.sx.page import get_template_context
|
||||
from sxc.pages import render_cart_payments_panel
|
||||
from sxc.pages.renders import render_cart_payments_panel
|
||||
ctx = await get_template_context()
|
||||
html = await render_cart_payments_panel(ctx)
|
||||
return sx_response(html)
|
||||
|
||||
@@ -1,14 +1,10 @@
|
||||
"""Cart defpage setup — registers layouts and loads .sx pages."""
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import Any
|
||||
|
||||
from markupsafe import escape
|
||||
from shared.sx.parser import SxExpr
|
||||
|
||||
|
||||
def setup_cart_pages() -> None:
|
||||
"""Register cart-specific layouts and load page definitions."""
|
||||
from .layouts import _register_cart_layouts
|
||||
_register_cart_layouts()
|
||||
_load_cart_page_files()
|
||||
|
||||
@@ -17,305 +13,3 @@ def _load_cart_page_files() -> None:
|
||||
import os
|
||||
from shared.sx.pages import load_page_dir
|
||||
load_page_dir(os.path.dirname(__file__), "cart")
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Header helpers (still needed by layouts and render functions)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _ensure_post_ctx(ctx: dict, page_post: Any) -> dict:
|
||||
"""Ensure ctx has a 'post' dict from page_post DTO."""
|
||||
if ctx.get("post") or not page_post:
|
||||
return ctx
|
||||
return {**ctx, "post": {
|
||||
"id": getattr(page_post, "id", None),
|
||||
"slug": getattr(page_post, "slug", ""),
|
||||
"title": getattr(page_post, "title", ""),
|
||||
"feature_image": getattr(page_post, "feature_image", None),
|
||||
}}
|
||||
|
||||
|
||||
async def _ensure_container_nav(ctx: dict) -> dict:
|
||||
"""Fetch container_nav if not already present."""
|
||||
if ctx.get("container_nav"):
|
||||
return ctx
|
||||
post = ctx.get("post") or {}
|
||||
post_id = post.get("id")
|
||||
if not post_id:
|
||||
return ctx
|
||||
slug = post.get("slug", "")
|
||||
from shared.infrastructure.fragments import fetch_fragments
|
||||
nav_params = {
|
||||
"container_type": "page",
|
||||
"container_id": str(post_id),
|
||||
"post_slug": slug,
|
||||
}
|
||||
events_nav, market_nav = await fetch_fragments([
|
||||
("events", "container-nav", nav_params),
|
||||
("market", "container-nav", nav_params),
|
||||
], required=False)
|
||||
return {**ctx, "container_nav": events_nav + market_nav}
|
||||
|
||||
|
||||
async def _post_header_sx(ctx: dict, page_post: Any, *, oob: bool = False) -> str:
|
||||
from shared.sx.helpers import post_header_sx as _shared_post_header_sx
|
||||
ctx = _ensure_post_ctx(ctx, page_post)
|
||||
ctx = await _ensure_container_nav(ctx)
|
||||
return await _shared_post_header_sx(ctx, oob=oob)
|
||||
|
||||
|
||||
async def _cart_header_sx(ctx: dict, *, oob: bool = False) -> str:
|
||||
from shared.sx.helpers import render_to_sx, call_url
|
||||
return await render_to_sx(
|
||||
"menu-row-sx",
|
||||
id="cart-row", level=1, colour="sky",
|
||||
link_href=call_url(ctx, "cart_url", "/"),
|
||||
link_label="cart", icon="fa fa-shopping-cart",
|
||||
child_id="cart-header-child", oob=oob,
|
||||
)
|
||||
|
||||
|
||||
async def _page_cart_header_sx(ctx: dict, page_post: Any, *, oob: bool = False) -> str:
|
||||
from shared.sx.helpers import render_to_sx, call_url
|
||||
slug = page_post.slug if page_post else ""
|
||||
title = ((page_post.title if page_post else None) or "")[:160]
|
||||
label_parts = []
|
||||
if page_post and page_post.feature_image:
|
||||
label_parts.append(await render_to_sx("cart-page-label-img", src=page_post.feature_image))
|
||||
label_parts.append(f'(span "{escape(title)}")')
|
||||
label_sx = "(<> " + " ".join(label_parts) + ")"
|
||||
nav_sx = await render_to_sx("cart-all-carts-link", href=call_url(ctx, "cart_url", "/"))
|
||||
return await render_to_sx(
|
||||
"menu-row-sx",
|
||||
id="page-cart-row", level=2, colour="sky",
|
||||
link_href=call_url(ctx, "cart_url", f"/{slug}/"),
|
||||
link_label_content=SxExpr(label_sx),
|
||||
nav=SxExpr(nav_sx), oob=oob,
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Order serialization helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _serialize_order(order: Any) -> dict:
|
||||
from shared.infrastructure.urls import market_product_url
|
||||
created = order.created_at.strftime("%-d %b %Y, %H:%M") if order.created_at else "\u2014"
|
||||
items = []
|
||||
if order.items:
|
||||
for item in order.items:
|
||||
items.append({
|
||||
"product_image": item.product_image,
|
||||
"product_title": item.product_title or "Unknown product",
|
||||
"product_id": item.product_id,
|
||||
"product_slug": item.product_slug,
|
||||
"product_url": market_product_url(item.product_slug),
|
||||
"quantity": item.quantity,
|
||||
"unit_price_formatted": f"{item.unit_price or 0:.2f}",
|
||||
"currency": item.currency or order.currency or "GBP",
|
||||
})
|
||||
return {
|
||||
"id": order.id,
|
||||
"status": order.status or "pending",
|
||||
"created_at_formatted": created,
|
||||
"description": order.description or "",
|
||||
"total_formatted": f"{order.total_amount or 0:.2f}",
|
||||
"total_amount": float(order.total_amount or 0),
|
||||
"currency": order.currency or "GBP",
|
||||
"items": items,
|
||||
}
|
||||
|
||||
|
||||
def _serialize_calendar_entry(e: Any) -> dict:
|
||||
st = e.state or ""
|
||||
ds = e.start_at.strftime("%-d %b %Y, %H:%M") if e.start_at else ""
|
||||
if e.end_at:
|
||||
ds += f" \u2013 {e.end_at.strftime('%-d %b %Y, %H:%M')}"
|
||||
return {"name": e.name, "state": st, "date_str": ds, "cost_formatted": f"{e.cost or 0:.2f}"}
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Render functions (called by routes) — delegate header composition to .sx
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
async def render_orders_page(ctx, orders, page, total_pages, search, search_count, url_for_fn, qs_fn):
|
||||
from shared.sx.helpers import render_to_sx, render_to_sx_with_env, _ctx_to_env, search_desktop_sx, search_mobile_sx, full_page_sx
|
||||
from shared.utils import route_prefix
|
||||
ctx["search"] = search
|
||||
ctx["search_count"] = search_count
|
||||
pfx = route_prefix()
|
||||
list_url = pfx + url_for_fn("orders.list_orders")
|
||||
detail_url_prefix = pfx + url_for_fn("orders.order.order_detail", order_id=0).rsplit("0/", 1)[0]
|
||||
order_dicts = [_serialize_order(o) for o in orders]
|
||||
content = await render_to_sx("orders-list-content", orders=order_dicts,
|
||||
page=page, total_pages=total_pages, rows_url=list_url, detail_url_prefix=detail_url_prefix)
|
||||
header_rows = await render_to_sx_with_env("cart-orders-layout-full", _ctx_to_env(ctx),
|
||||
list_url=list_url,
|
||||
)
|
||||
filt = await render_to_sx("order-list-header", search_mobile=SxExpr(await search_mobile_sx(ctx)))
|
||||
return await full_page_sx(ctx, header_rows=header_rows, filter=filt,
|
||||
aside=await search_desktop_sx(ctx), content=content)
|
||||
|
||||
|
||||
async def render_orders_rows(ctx, orders, page, total_pages, url_for_fn, qs_fn):
|
||||
from shared.sx.helpers import render_to_sx
|
||||
from shared.utils import route_prefix
|
||||
pfx = route_prefix()
|
||||
list_url = pfx + url_for_fn("orders.list_orders")
|
||||
detail_url_prefix = pfx + url_for_fn("orders.order.order_detail", order_id=0).rsplit("0/", 1)[0]
|
||||
order_dicts = [_serialize_order(o) for o in orders]
|
||||
parts = []
|
||||
for od in order_dicts:
|
||||
parts.append(await render_to_sx("order-row-pair", order=od, detail_url_prefix=detail_url_prefix))
|
||||
next_scroll = ""
|
||||
if page < total_pages:
|
||||
next_url = list_url + qs_fn(page=page + 1)
|
||||
next_scroll = await render_to_sx("infinite-scroll", url=next_url, page=page,
|
||||
total_pages=total_pages, id_prefix="orders", colspan=5)
|
||||
else:
|
||||
next_scroll = await render_to_sx("order-end-row")
|
||||
return await render_to_sx("cart-orders-rows",
|
||||
rows=SxExpr("(<> " + " ".join(parts) + ")"),
|
||||
next_scroll=SxExpr(next_scroll),
|
||||
)
|
||||
|
||||
|
||||
async def render_orders_oob(ctx, orders, page, total_pages, search, search_count, url_for_fn, qs_fn):
|
||||
from shared.sx.helpers import render_to_sx, render_to_sx_with_env, _ctx_to_env, search_desktop_sx, search_mobile_sx, oob_page_sx
|
||||
from shared.utils import route_prefix
|
||||
ctx["search"] = search
|
||||
ctx["search_count"] = search_count
|
||||
pfx = route_prefix()
|
||||
list_url = pfx + url_for_fn("orders.list_orders")
|
||||
detail_url_prefix = pfx + url_for_fn("orders.order.order_detail", order_id=0).rsplit("0/", 1)[0]
|
||||
order_dicts = [_serialize_order(o) for o in orders]
|
||||
content = await render_to_sx("orders-list-content", orders=order_dicts,
|
||||
page=page, total_pages=total_pages, rows_url=list_url, detail_url_prefix=detail_url_prefix)
|
||||
oobs = await render_to_sx_with_env("cart-orders-layout-oob", _ctx_to_env(ctx, oob=True),
|
||||
list_url=list_url,
|
||||
)
|
||||
filt = await render_to_sx("order-list-header", search_mobile=SxExpr(await search_mobile_sx(ctx)))
|
||||
return await oob_page_sx(oobs=oobs, filter=filt, aside=await search_desktop_sx(ctx), content=content)
|
||||
|
||||
|
||||
async def render_order_page(ctx, order, calendar_entries, url_for_fn):
|
||||
from shared.sx.helpers import render_to_sx, render_to_sx_with_env, _ctx_to_env, full_page_sx
|
||||
from shared.utils import route_prefix
|
||||
from shared.browser.app.csrf import generate_csrf_token
|
||||
pfx = route_prefix()
|
||||
detail_url = pfx + url_for_fn("orders.order.order_detail", order_id=order.id)
|
||||
list_url = pfx + url_for_fn("orders.list_orders")
|
||||
recheck_url = pfx + url_for_fn("orders.order.order_recheck", order_id=order.id)
|
||||
pay_url = pfx + url_for_fn("orders.order.order_pay", order_id=order.id)
|
||||
order_data = _serialize_order(order)
|
||||
cal_data = [_serialize_calendar_entry(e) for e in (calendar_entries or [])]
|
||||
main = await render_to_sx("order-detail-content", order=order_data, calendar_entries=cal_data)
|
||||
filt = await render_to_sx("order-detail-filter-content", order=order_data,
|
||||
list_url=list_url, recheck_url=recheck_url, pay_url=pay_url, csrf=generate_csrf_token())
|
||||
header_rows = await render_to_sx_with_env("cart-order-detail-layout-full", _ctx_to_env(ctx),
|
||||
list_url=list_url, detail_url=detail_url,
|
||||
order_label=f"Order {order.id}",
|
||||
)
|
||||
return await full_page_sx(ctx, header_rows=header_rows, filter=filt, content=main)
|
||||
|
||||
|
||||
async def render_order_oob(ctx, order, calendar_entries, url_for_fn):
|
||||
from shared.sx.helpers import render_to_sx, render_to_sx_with_env, _ctx_to_env, oob_page_sx
|
||||
from shared.utils import route_prefix
|
||||
from shared.browser.app.csrf import generate_csrf_token
|
||||
pfx = route_prefix()
|
||||
detail_url = pfx + url_for_fn("orders.order.order_detail", order_id=order.id)
|
||||
list_url = pfx + url_for_fn("orders.list_orders")
|
||||
recheck_url = pfx + url_for_fn("orders.order.order_recheck", order_id=order.id)
|
||||
pay_url = pfx + url_for_fn("orders.order.order_pay", order_id=order.id)
|
||||
order_data = _serialize_order(order)
|
||||
cal_data = [_serialize_calendar_entry(e) for e in (calendar_entries or [])]
|
||||
main = await render_to_sx("order-detail-content", order=order_data, calendar_entries=cal_data)
|
||||
filt = await render_to_sx("order-detail-filter-content", order=order_data,
|
||||
list_url=list_url, recheck_url=recheck_url, pay_url=pay_url, csrf=generate_csrf_token())
|
||||
oobs = await render_to_sx_with_env("cart-order-detail-layout-oob", _ctx_to_env(ctx, oob=True),
|
||||
detail_url=detail_url,
|
||||
order_label=f"Order {order.id}",
|
||||
)
|
||||
return await oob_page_sx(oobs=oobs, filter=filt, content=main)
|
||||
|
||||
|
||||
async def render_checkout_error_page(ctx, error=None, order=None):
|
||||
from shared.sx.helpers import render_to_sx, render_to_sx_with_env, _ctx_to_env, full_page_sx
|
||||
from shared.infrastructure.urls import cart_url
|
||||
err_msg = error or "Unexpected error while creating the hosted checkout session."
|
||||
order_sx = await render_to_sx("checkout-error-order-id", oid=f"#{order.id}") if order else None
|
||||
hdr = await render_to_sx_with_env("layout-root-full", _ctx_to_env(ctx))
|
||||
filt = await render_to_sx("checkout-error-header")
|
||||
content = await render_to_sx("checkout-error-content", msg=err_msg,
|
||||
order=SxExpr(order_sx) if order_sx else None, back_url=cart_url("/"))
|
||||
return await full_page_sx(ctx, header_rows=hdr, filter=filt, content=content)
|
||||
|
||||
|
||||
async def render_cart_payments_panel(ctx):
|
||||
from shared.sx.helpers import render_to_sx
|
||||
page_config = ctx.get("page_config")
|
||||
pc_data = None
|
||||
if page_config:
|
||||
pc_data = {
|
||||
"sumup_api_key": bool(getattr(page_config, "sumup_api_key", None)),
|
||||
"sumup_merchant_code": getattr(page_config, "sumup_merchant_code", None) or "",
|
||||
"sumup_checkout_prefix": getattr(page_config, "sumup_checkout_prefix", None) or "",
|
||||
}
|
||||
return await render_to_sx("cart-payments-content", page_config=pc_data)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Layouts — thin wrappers delegating to .sx defcomps in cart/sx/layouts.sx
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _register_cart_layouts() -> None:
|
||||
from shared.sx.layouts import register_custom_layout
|
||||
register_custom_layout("cart-page", _cart_page_full, _cart_page_oob)
|
||||
register_custom_layout("cart-admin", _cart_admin_full, _cart_admin_oob)
|
||||
|
||||
|
||||
async def _cart_page_full(ctx: dict, **kw: Any) -> str:
|
||||
from shared.sx.helpers import render_to_sx_with_env, _ctx_to_env
|
||||
page_post = ctx.get("page_post")
|
||||
env = _ctx_to_env(ctx)
|
||||
return await render_to_sx_with_env("cart-page-layout-full", env,
|
||||
cart_row=SxExpr(await _cart_header_sx(ctx)),
|
||||
page_cart_row=SxExpr(await _page_cart_header_sx(ctx, page_post)),
|
||||
)
|
||||
|
||||
|
||||
async def _cart_page_oob(ctx: dict, **kw: Any) -> str:
|
||||
from shared.sx.helpers import render_to_sx_with_env, _ctx_to_env, root_header_sx
|
||||
page_post = ctx.get("page_post")
|
||||
env = _ctx_to_env(ctx, oob=True)
|
||||
return await render_to_sx_with_env("cart-page-layout-oob", env,
|
||||
root_header_oob=SxExpr(await root_header_sx(ctx, oob=True)),
|
||||
cart_row_oob=SxExpr(await _cart_header_sx(ctx, oob=True)),
|
||||
page_cart_row=SxExpr(await _page_cart_header_sx(ctx, page_post)),
|
||||
)
|
||||
|
||||
|
||||
async def _cart_admin_full(ctx: dict, **kw: Any) -> str:
|
||||
from shared.sx.helpers import render_to_sx_with_env, _ctx_to_env
|
||||
page_post = ctx.get("page_post")
|
||||
selected = kw.get("selected", "")
|
||||
env = _ctx_to_env(ctx)
|
||||
return await render_to_sx_with_env("cart-admin-layout-full", env,
|
||||
post_header=SxExpr(await _post_header_sx(ctx, page_post)),
|
||||
admin_header=SxExpr(await _cart_page_admin_header_sx(ctx, page_post, selected=selected)),
|
||||
)
|
||||
|
||||
|
||||
async def _cart_admin_oob(ctx: dict, **kw: Any) -> str:
|
||||
page_post = ctx.get("page_post")
|
||||
selected = kw.get("selected", "")
|
||||
return await _cart_page_admin_header_sx(ctx, page_post, oob=True, selected=selected)
|
||||
|
||||
|
||||
async def _cart_page_admin_header_sx(ctx: dict, page_post: Any, *, oob: bool = False,
|
||||
selected: str = "") -> str:
|
||||
from shared.sx.helpers import post_admin_header_sx
|
||||
slug = page_post.slug if page_post else ""
|
||||
ctx = _ensure_post_ctx(ctx, page_post)
|
||||
return await post_admin_header_sx(ctx, slug, oob=oob, selected=selected)
|
||||
|
||||
138
cart/sxc/pages/layouts.py
Normal file
138
cart/sxc/pages/layouts.py
Normal file
@@ -0,0 +1,138 @@
|
||||
"""Cart layout registration and header builders."""
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import Any
|
||||
|
||||
from markupsafe import escape
|
||||
from shared.sx.parser import SxExpr
|
||||
|
||||
|
||||
def _register_cart_layouts() -> None:
|
||||
from shared.sx.layouts import register_custom_layout
|
||||
register_custom_layout("cart-page", _cart_page_full, _cart_page_oob)
|
||||
register_custom_layout("cart-admin", _cart_admin_full, _cart_admin_oob)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Header helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _ensure_post_ctx(ctx: dict, page_post: Any) -> dict:
|
||||
"""Ensure ctx has a 'post' dict from page_post DTO."""
|
||||
if ctx.get("post") or not page_post:
|
||||
return ctx
|
||||
return {**ctx, "post": {
|
||||
"id": getattr(page_post, "id", None),
|
||||
"slug": getattr(page_post, "slug", ""),
|
||||
"title": getattr(page_post, "title", ""),
|
||||
"feature_image": getattr(page_post, "feature_image", None),
|
||||
}}
|
||||
|
||||
|
||||
async def _ensure_container_nav(ctx: dict) -> dict:
|
||||
"""Fetch container_nav if not already present."""
|
||||
if ctx.get("container_nav"):
|
||||
return ctx
|
||||
post = ctx.get("post") or {}
|
||||
post_id = post.get("id")
|
||||
if not post_id:
|
||||
return ctx
|
||||
slug = post.get("slug", "")
|
||||
from shared.infrastructure.fragments import fetch_fragments
|
||||
nav_params = {
|
||||
"container_type": "page",
|
||||
"container_id": str(post_id),
|
||||
"post_slug": slug,
|
||||
}
|
||||
events_nav, market_nav = await fetch_fragments([
|
||||
("events", "container-nav", nav_params),
|
||||
("market", "container-nav", nav_params),
|
||||
], required=False)
|
||||
return {**ctx, "container_nav": events_nav + market_nav}
|
||||
|
||||
|
||||
async def _post_header_sx(ctx: dict, page_post: Any, *, oob: bool = False) -> str:
|
||||
from shared.sx.helpers import post_header_sx as _shared_post_header_sx
|
||||
ctx = _ensure_post_ctx(ctx, page_post)
|
||||
ctx = await _ensure_container_nav(ctx)
|
||||
return await _shared_post_header_sx(ctx, oob=oob)
|
||||
|
||||
|
||||
async def _cart_header_sx(ctx: dict, *, oob: bool = False) -> str:
|
||||
from shared.sx.helpers import render_to_sx, call_url
|
||||
return await render_to_sx(
|
||||
"menu-row-sx",
|
||||
id="cart-row", level=1, colour="sky",
|
||||
link_href=call_url(ctx, "cart_url", "/"),
|
||||
link_label="cart", icon="fa fa-shopping-cart",
|
||||
child_id="cart-header-child", oob=oob,
|
||||
)
|
||||
|
||||
|
||||
async def _page_cart_header_sx(ctx: dict, page_post: Any, *, oob: bool = False) -> str:
|
||||
from shared.sx.helpers import render_to_sx, call_url
|
||||
slug = page_post.slug if page_post else ""
|
||||
title = ((page_post.title if page_post else None) or "")[:160]
|
||||
label_parts = []
|
||||
if page_post and page_post.feature_image:
|
||||
label_parts.append(await render_to_sx("cart-page-label-img", src=page_post.feature_image))
|
||||
label_parts.append(f'(span "{escape(title)}")')
|
||||
label_sx = "(<> " + " ".join(label_parts) + ")"
|
||||
nav_sx = await render_to_sx("cart-all-carts-link", href=call_url(ctx, "cart_url", "/"))
|
||||
return await render_to_sx(
|
||||
"menu-row-sx",
|
||||
id="page-cart-row", level=2, colour="sky",
|
||||
link_href=call_url(ctx, "cart_url", f"/{slug}/"),
|
||||
link_label_content=SxExpr(label_sx),
|
||||
nav=SxExpr(nav_sx), oob=oob,
|
||||
)
|
||||
|
||||
|
||||
async def _cart_page_admin_header_sx(ctx: dict, page_post: Any, *, oob: bool = False,
|
||||
selected: str = "") -> str:
|
||||
from shared.sx.helpers import post_admin_header_sx
|
||||
slug = page_post.slug if page_post else ""
|
||||
ctx = _ensure_post_ctx(ctx, page_post)
|
||||
return await post_admin_header_sx(ctx, slug, oob=oob, selected=selected)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Layout functions
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
async def _cart_page_full(ctx: dict, **kw: Any) -> str:
|
||||
from shared.sx.helpers import render_to_sx_with_env, _ctx_to_env
|
||||
page_post = ctx.get("page_post")
|
||||
env = _ctx_to_env(ctx)
|
||||
return await render_to_sx_with_env("cart-page-layout-full", env,
|
||||
cart_row=SxExpr(await _cart_header_sx(ctx)),
|
||||
page_cart_row=SxExpr(await _page_cart_header_sx(ctx, page_post)),
|
||||
)
|
||||
|
||||
|
||||
async def _cart_page_oob(ctx: dict, **kw: Any) -> str:
|
||||
from shared.sx.helpers import render_to_sx_with_env, _ctx_to_env, root_header_sx
|
||||
page_post = ctx.get("page_post")
|
||||
env = _ctx_to_env(ctx, oob=True)
|
||||
return await render_to_sx_with_env("cart-page-layout-oob", env,
|
||||
root_header_oob=SxExpr(await root_header_sx(ctx, oob=True)),
|
||||
cart_row_oob=SxExpr(await _cart_header_sx(ctx, oob=True)),
|
||||
page_cart_row=SxExpr(await _page_cart_header_sx(ctx, page_post)),
|
||||
)
|
||||
|
||||
|
||||
async def _cart_admin_full(ctx: dict, **kw: Any) -> str:
|
||||
from shared.sx.helpers import render_to_sx_with_env, _ctx_to_env
|
||||
page_post = ctx.get("page_post")
|
||||
selected = kw.get("selected", "")
|
||||
env = _ctx_to_env(ctx)
|
||||
return await render_to_sx_with_env("cart-admin-layout-full", env,
|
||||
post_header=SxExpr(await _post_header_sx(ctx, page_post)),
|
||||
admin_header=SxExpr(await _cart_page_admin_header_sx(ctx, page_post, selected=selected)),
|
||||
)
|
||||
|
||||
|
||||
async def _cart_admin_oob(ctx: dict, **kw: Any) -> str:
|
||||
page_post = ctx.get("page_post")
|
||||
selected = kw.get("selected", "")
|
||||
return await _cart_page_admin_header_sx(ctx, page_post, oob=True, selected=selected)
|
||||
133
cart/sxc/pages/renders.py
Normal file
133
cart/sxc/pages/renders.py
Normal file
@@ -0,0 +1,133 @@
|
||||
"""Cart render functions — called from bp routes."""
|
||||
from __future__ import annotations
|
||||
|
||||
from shared.sx.parser import SxExpr
|
||||
|
||||
from .utils import _serialize_order, _serialize_calendar_entry
|
||||
|
||||
|
||||
async def render_orders_page(ctx, orders, page, total_pages, search, search_count, url_for_fn, qs_fn):
|
||||
from shared.sx.helpers import render_to_sx, render_to_sx_with_env, _ctx_to_env, search_desktop_sx, search_mobile_sx, full_page_sx
|
||||
from shared.utils import route_prefix
|
||||
ctx["search"] = search
|
||||
ctx["search_count"] = search_count
|
||||
pfx = route_prefix()
|
||||
list_url = pfx + url_for_fn("orders.list_orders")
|
||||
detail_url_prefix = pfx + url_for_fn("orders.order.order_detail", order_id=0).rsplit("0/", 1)[0]
|
||||
order_dicts = [_serialize_order(o) for o in orders]
|
||||
content = await render_to_sx("orders-list-content", orders=order_dicts,
|
||||
page=page, total_pages=total_pages, rows_url=list_url, detail_url_prefix=detail_url_prefix)
|
||||
header_rows = await render_to_sx_with_env("cart-orders-layout-full", _ctx_to_env(ctx),
|
||||
list_url=list_url,
|
||||
)
|
||||
filt = await render_to_sx("order-list-header", search_mobile=SxExpr(await search_mobile_sx(ctx)))
|
||||
return await full_page_sx(ctx, header_rows=header_rows, filter=filt,
|
||||
aside=await search_desktop_sx(ctx), content=content)
|
||||
|
||||
|
||||
async def render_orders_rows(ctx, orders, page, total_pages, url_for_fn, qs_fn):
|
||||
from shared.sx.helpers import render_to_sx
|
||||
from shared.utils import route_prefix
|
||||
pfx = route_prefix()
|
||||
list_url = pfx + url_for_fn("orders.list_orders")
|
||||
detail_url_prefix = pfx + url_for_fn("orders.order.order_detail", order_id=0).rsplit("0/", 1)[0]
|
||||
order_dicts = [_serialize_order(o) for o in orders]
|
||||
parts = []
|
||||
for od in order_dicts:
|
||||
parts.append(await render_to_sx("order-row-pair", order=od, detail_url_prefix=detail_url_prefix))
|
||||
next_scroll = ""
|
||||
if page < total_pages:
|
||||
next_url = list_url + qs_fn(page=page + 1)
|
||||
next_scroll = await render_to_sx("infinite-scroll", url=next_url, page=page,
|
||||
total_pages=total_pages, id_prefix="orders", colspan=5)
|
||||
else:
|
||||
next_scroll = await render_to_sx("order-end-row")
|
||||
return await render_to_sx("cart-orders-rows",
|
||||
rows=SxExpr("(<> " + " ".join(parts) + ")"),
|
||||
next_scroll=SxExpr(next_scroll),
|
||||
)
|
||||
|
||||
|
||||
async def render_orders_oob(ctx, orders, page, total_pages, search, search_count, url_for_fn, qs_fn):
|
||||
from shared.sx.helpers import render_to_sx, render_to_sx_with_env, _ctx_to_env, search_desktop_sx, search_mobile_sx, oob_page_sx
|
||||
from shared.utils import route_prefix
|
||||
ctx["search"] = search
|
||||
ctx["search_count"] = search_count
|
||||
pfx = route_prefix()
|
||||
list_url = pfx + url_for_fn("orders.list_orders")
|
||||
detail_url_prefix = pfx + url_for_fn("orders.order.order_detail", order_id=0).rsplit("0/", 1)[0]
|
||||
order_dicts = [_serialize_order(o) for o in orders]
|
||||
content = await render_to_sx("orders-list-content", orders=order_dicts,
|
||||
page=page, total_pages=total_pages, rows_url=list_url, detail_url_prefix=detail_url_prefix)
|
||||
oobs = await render_to_sx_with_env("cart-orders-layout-oob", _ctx_to_env(ctx, oob=True),
|
||||
list_url=list_url,
|
||||
)
|
||||
filt = await render_to_sx("order-list-header", search_mobile=SxExpr(await search_mobile_sx(ctx)))
|
||||
return await oob_page_sx(oobs=oobs, filter=filt, aside=await search_desktop_sx(ctx), content=content)
|
||||
|
||||
|
||||
async def render_order_page(ctx, order, calendar_entries, url_for_fn):
|
||||
from shared.sx.helpers import render_to_sx, render_to_sx_with_env, _ctx_to_env, full_page_sx
|
||||
from shared.utils import route_prefix
|
||||
from shared.browser.app.csrf import generate_csrf_token
|
||||
pfx = route_prefix()
|
||||
detail_url = pfx + url_for_fn("orders.order.order_detail", order_id=order.id)
|
||||
list_url = pfx + url_for_fn("orders.list_orders")
|
||||
recheck_url = pfx + url_for_fn("orders.order.order_recheck", order_id=order.id)
|
||||
pay_url = pfx + url_for_fn("orders.order.order_pay", order_id=order.id)
|
||||
order_data = _serialize_order(order)
|
||||
cal_data = [_serialize_calendar_entry(e) for e in (calendar_entries or [])]
|
||||
main = await render_to_sx("order-detail-content", order=order_data, calendar_entries=cal_data)
|
||||
filt = await render_to_sx("order-detail-filter-content", order=order_data,
|
||||
list_url=list_url, recheck_url=recheck_url, pay_url=pay_url, csrf=generate_csrf_token())
|
||||
header_rows = await render_to_sx_with_env("cart-order-detail-layout-full", _ctx_to_env(ctx),
|
||||
list_url=list_url, detail_url=detail_url,
|
||||
order_label=f"Order {order.id}",
|
||||
)
|
||||
return await full_page_sx(ctx, header_rows=header_rows, filter=filt, content=main)
|
||||
|
||||
|
||||
async def render_order_oob(ctx, order, calendar_entries, url_for_fn):
|
||||
from shared.sx.helpers import render_to_sx, render_to_sx_with_env, _ctx_to_env, oob_page_sx
|
||||
from shared.utils import route_prefix
|
||||
from shared.browser.app.csrf import generate_csrf_token
|
||||
pfx = route_prefix()
|
||||
detail_url = pfx + url_for_fn("orders.order.order_detail", order_id=order.id)
|
||||
list_url = pfx + url_for_fn("orders.list_orders")
|
||||
recheck_url = pfx + url_for_fn("orders.order.order_recheck", order_id=order.id)
|
||||
pay_url = pfx + url_for_fn("orders.order.order_pay", order_id=order.id)
|
||||
order_data = _serialize_order(order)
|
||||
cal_data = [_serialize_calendar_entry(e) for e in (calendar_entries or [])]
|
||||
main = await render_to_sx("order-detail-content", order=order_data, calendar_entries=cal_data)
|
||||
filt = await render_to_sx("order-detail-filter-content", order=order_data,
|
||||
list_url=list_url, recheck_url=recheck_url, pay_url=pay_url, csrf=generate_csrf_token())
|
||||
oobs = await render_to_sx_with_env("cart-order-detail-layout-oob", _ctx_to_env(ctx, oob=True),
|
||||
detail_url=detail_url,
|
||||
order_label=f"Order {order.id}",
|
||||
)
|
||||
return await oob_page_sx(oobs=oobs, filter=filt, content=main)
|
||||
|
||||
|
||||
async def render_checkout_error_page(ctx, error=None, order=None):
|
||||
from shared.sx.helpers import render_to_sx, render_to_sx_with_env, _ctx_to_env, full_page_sx
|
||||
from shared.infrastructure.urls import cart_url
|
||||
err_msg = error or "Unexpected error while creating the hosted checkout session."
|
||||
order_sx = await render_to_sx("checkout-error-order-id", oid=f"#{order.id}") if order else None
|
||||
hdr = await render_to_sx_with_env("layout-root-full", _ctx_to_env(ctx))
|
||||
filt = await render_to_sx("checkout-error-header")
|
||||
content = await render_to_sx("checkout-error-content", msg=err_msg,
|
||||
order=SxExpr(order_sx) if order_sx else None, back_url=cart_url("/"))
|
||||
return await full_page_sx(ctx, header_rows=hdr, filter=filt, content=content)
|
||||
|
||||
|
||||
async def render_cart_payments_panel(ctx):
|
||||
from shared.sx.helpers import render_to_sx
|
||||
page_config = ctx.get("page_config")
|
||||
pc_data = None
|
||||
if page_config:
|
||||
pc_data = {
|
||||
"sumup_api_key": bool(getattr(page_config, "sumup_api_key", None)),
|
||||
"sumup_merchant_code": getattr(page_config, "sumup_merchant_code", None) or "",
|
||||
"sumup_checkout_prefix": getattr(page_config, "sumup_checkout_prefix", None) or "",
|
||||
}
|
||||
return await render_to_sx("cart-payments-content", page_config=pc_data)
|
||||
40
cart/sxc/pages/utils.py
Normal file
40
cart/sxc/pages/utils.py
Normal file
@@ -0,0 +1,40 @@
|
||||
"""Cart page utilities — serializers and formatters."""
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import Any
|
||||
|
||||
|
||||
def _serialize_order(order: Any) -> dict:
|
||||
from shared.infrastructure.urls import market_product_url
|
||||
created = order.created_at.strftime("%-d %b %Y, %H:%M") if order.created_at else "\u2014"
|
||||
items = []
|
||||
if order.items:
|
||||
for item in order.items:
|
||||
items.append({
|
||||
"product_image": item.product_image,
|
||||
"product_title": item.product_title or "Unknown product",
|
||||
"product_id": item.product_id,
|
||||
"product_slug": item.product_slug,
|
||||
"product_url": market_product_url(item.product_slug),
|
||||
"quantity": item.quantity,
|
||||
"unit_price_formatted": f"{item.unit_price or 0:.2f}",
|
||||
"currency": item.currency or order.currency or "GBP",
|
||||
})
|
||||
return {
|
||||
"id": order.id,
|
||||
"status": order.status or "pending",
|
||||
"created_at_formatted": created,
|
||||
"description": order.description or "",
|
||||
"total_formatted": f"{order.total_amount or 0:.2f}",
|
||||
"total_amount": float(order.total_amount or 0),
|
||||
"currency": order.currency or "GBP",
|
||||
"items": items,
|
||||
}
|
||||
|
||||
|
||||
def _serialize_calendar_entry(e: Any) -> dict:
|
||||
st = e.state or ""
|
||||
ds = e.start_at.strftime("%-d %b %Y, %H:%M") if e.start_at else ""
|
||||
if e.end_at:
|
||||
ds += f" \u2013 {e.end_at.strftime('%-d %b %Y, %H:%M')}"
|
||||
return {"name": e.name, "state": st, "date_str": ds, "cost_formatted": f"{e.cost or 0:.2f}"}
|
||||
@@ -46,7 +46,7 @@ async def _render_social_auth_page(component: str, title: str, **kwargs) -> str:
|
||||
"""Render an auth page with social layout — replaces sx_components helpers."""
|
||||
from shared.sx.helpers import render_to_sx
|
||||
from shared.sx.page import get_template_context
|
||||
from sxc.pages import _social_page
|
||||
from sxc.pages.utils import _social_page
|
||||
ctx = await get_template_context()
|
||||
content = await render_to_sx(component, **{k: v for k, v in kwargs.items() if v})
|
||||
return await _social_page(ctx, None, content=content, title=title)
|
||||
|
||||
@@ -33,7 +33,7 @@ async def _render_choose_username(*, actor=None, error="", username=""):
|
||||
from shared.sx.helpers import render_to_sx
|
||||
from shared.sx.parser import SxExpr
|
||||
from shared.sx.page import get_template_context
|
||||
from sxc.pages import _social_page
|
||||
from sxc.pages.utils import _social_page
|
||||
from markupsafe import escape
|
||||
|
||||
ctx = await get_template_context()
|
||||
|
||||
@@ -95,7 +95,7 @@ def register(url_prefix="/social"):
|
||||
|
||||
@bp.get("/search/page")
|
||||
async def search_page():
|
||||
from sxc.pages import _serialize_remote_actor, _serialize_actor
|
||||
from sxc.pages.utils import _serialize_remote_actor, _serialize_actor
|
||||
|
||||
actor = getattr(g, "_social_actor", None)
|
||||
query = request.args.get("q", "").strip()
|
||||
@@ -154,7 +154,7 @@ def register(url_prefix="/social"):
|
||||
|
||||
async def _actor_card_response(actor, remote_actor_url, is_followed):
|
||||
"""Re-render a single actor card after follow/unfollow via HTMX."""
|
||||
from sxc.pages import _serialize_remote_actor, _serialize_actor
|
||||
from sxc.pages.utils import _serialize_remote_actor, _serialize_actor
|
||||
|
||||
remote_dto = await services.federation.get_or_fetch_remote_actor(
|
||||
g.s, remote_actor_url,
|
||||
@@ -298,7 +298,7 @@ def register(url_prefix="/social"):
|
||||
|
||||
@bp.get("/following/page")
|
||||
async def following_list_page():
|
||||
from sxc.pages import _serialize_remote_actor, _serialize_actor
|
||||
from sxc.pages.utils import _serialize_remote_actor, _serialize_actor
|
||||
|
||||
actor = _require_actor()
|
||||
page = request.args.get("page", 1, type=int)
|
||||
@@ -320,7 +320,7 @@ def register(url_prefix="/social"):
|
||||
|
||||
@bp.get("/followers/page")
|
||||
async def followers_list_page():
|
||||
from sxc.pages import _serialize_remote_actor, _serialize_actor
|
||||
from sxc.pages.utils import _serialize_remote_actor, _serialize_actor
|
||||
|
||||
actor = _require_actor()
|
||||
page = request.args.get("page", 1, type=int)
|
||||
@@ -387,7 +387,7 @@ def register(url_prefix="/social"):
|
||||
|
||||
async def _render_timeline_items(items, timeline_type, actor, actor_id=None):
|
||||
"""Render timeline pagination items as SX fragment."""
|
||||
from sxc.pages import _serialize_timeline_item, _serialize_actor
|
||||
from sxc.pages.utils import _serialize_timeline_item, _serialize_actor
|
||||
|
||||
item_dicts = [_serialize_timeline_item(i) for i in items]
|
||||
actor_data = _serialize_actor(actor)
|
||||
|
||||
@@ -1,8 +1,6 @@
|
||||
"""Federation defpage setup — registers layouts and loads .sx pages."""
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import Any
|
||||
|
||||
|
||||
def setup_federation_pages() -> None:
|
||||
"""Register federation-specific layouts and load page definitions."""
|
||||
@@ -16,82 +14,7 @@ def _load_federation_page_files() -> None:
|
||||
load_page_dir(os.path.dirname(__file__), "federation")
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Layouts — .sx defcomps read free variables from env
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _register_federation_layouts() -> None:
|
||||
from shared.sx.layouts import register_custom_layout
|
||||
from .utils import _social_full, _social_oob
|
||||
register_custom_layout("social", _social_full, _social_oob)
|
||||
|
||||
|
||||
def _actor_data(ctx: dict) -> dict | None:
|
||||
actor = ctx.get("actor")
|
||||
if not actor:
|
||||
return None
|
||||
from services.federation_page import _serialize_actor
|
||||
return _serialize_actor(actor)
|
||||
|
||||
|
||||
async def _social_full(ctx: dict, **kw: Any) -> str:
|
||||
from shared.sx.helpers import render_to_sx_with_env, _ctx_to_env
|
||||
env = _ctx_to_env(ctx)
|
||||
env["actor"] = kw.get("actor") or _actor_data(ctx)
|
||||
return await render_to_sx_with_env("social-layout-full", env)
|
||||
|
||||
|
||||
async def _social_oob(ctx: dict, **kw: Any) -> str:
|
||||
from shared.sx.helpers import render_to_sx_with_env, _ctx_to_env
|
||||
env = _ctx_to_env(ctx, oob=True)
|
||||
env["actor"] = kw.get("actor") or _actor_data(ctx)
|
||||
return await render_to_sx_with_env("social-layout-oob", env)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Helpers still used by route handlers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _serialize_actor(actor) -> dict | None:
|
||||
"""Serialize an actor profile to a dict for sx defcomps."""
|
||||
from services.federation_page import _serialize_actor as _impl
|
||||
return _impl(actor)
|
||||
|
||||
|
||||
def _serialize_timeline_item(item) -> dict:
|
||||
"""Serialize a timeline item DTO to a dict for sx defcomps."""
|
||||
from services.federation_page import _serialize_timeline_item as _impl
|
||||
return _impl(item)
|
||||
|
||||
|
||||
def _serialize_remote_actor(a) -> dict:
|
||||
"""Serialize a remote actor DTO to a dict for sx defcomps."""
|
||||
from services.federation_page import _serialize_remote_actor as _impl
|
||||
return _impl(a)
|
||||
|
||||
|
||||
async def _social_page(ctx: dict, actor, *, content: str,
|
||||
title: str = "Rose Ash", meta_html: str = "") -> str:
|
||||
"""Build a full social page with social header."""
|
||||
from shared.sx.helpers import render_to_sx_with_env, _ctx_to_env, full_page_sx
|
||||
from markupsafe import escape
|
||||
|
||||
env = _ctx_to_env(ctx)
|
||||
env["actor"] = _serialize_actor(actor) if actor else None
|
||||
header_rows = await render_to_sx_with_env("social-layout-full", env)
|
||||
return await full_page_sx(ctx, header_rows=header_rows, content=content,
|
||||
meta_html=meta_html or f'<title>{escape(title)}</title>')
|
||||
|
||||
|
||||
def _get_actor():
|
||||
"""Return current user's actor or None."""
|
||||
from quart import g
|
||||
return getattr(g, "_social_actor", None)
|
||||
|
||||
|
||||
def _require_actor():
|
||||
"""Return current user's actor or abort 403."""
|
||||
from quart import abort
|
||||
actor = _get_actor()
|
||||
if not actor:
|
||||
abort(403, "You need to choose a federation username first")
|
||||
return actor
|
||||
|
||||
71
federation/sxc/pages/utils.py
Normal file
71
federation/sxc/pages/utils.py
Normal file
@@ -0,0 +1,71 @@
|
||||
"""Federation page utilities — serializers, actor helpers, social page builder."""
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import Any
|
||||
|
||||
|
||||
def _serialize_actor(actor) -> dict | None:
|
||||
"""Serialize an actor profile to a dict for sx defcomps."""
|
||||
from services.federation_page import _serialize_actor as _impl
|
||||
return _impl(actor)
|
||||
|
||||
|
||||
def _serialize_timeline_item(item) -> dict:
|
||||
"""Serialize a timeline item DTO to a dict for sx defcomps."""
|
||||
from services.federation_page import _serialize_timeline_item as _impl
|
||||
return _impl(item)
|
||||
|
||||
|
||||
def _serialize_remote_actor(a) -> dict:
|
||||
"""Serialize a remote actor DTO to a dict for sx defcomps."""
|
||||
from services.federation_page import _serialize_remote_actor as _impl
|
||||
return _impl(a)
|
||||
|
||||
|
||||
async def _social_page(ctx: dict, actor, *, content: str,
|
||||
title: str = "Rose Ash", meta_html: str = "") -> str:
|
||||
"""Build a full social page with social header."""
|
||||
from shared.sx.helpers import render_to_sx_with_env, _ctx_to_env, full_page_sx
|
||||
from markupsafe import escape
|
||||
|
||||
env = _ctx_to_env(ctx)
|
||||
env["actor"] = _serialize_actor(actor) if actor else None
|
||||
header_rows = await render_to_sx_with_env("social-layout-full", env)
|
||||
return await full_page_sx(ctx, header_rows=header_rows, content=content,
|
||||
meta_html=meta_html or f'<title>{escape(title)}</title>')
|
||||
|
||||
|
||||
def _get_actor():
|
||||
"""Return current user's actor or None."""
|
||||
from quart import g
|
||||
return getattr(g, "_social_actor", None)
|
||||
|
||||
|
||||
def _require_actor():
|
||||
"""Return current user's actor or abort 403."""
|
||||
from quart import abort
|
||||
actor = _get_actor()
|
||||
if not actor:
|
||||
abort(403, "You need to choose a federation username first")
|
||||
return actor
|
||||
|
||||
|
||||
def _actor_data(ctx: dict) -> dict | None:
|
||||
actor = ctx.get("actor")
|
||||
if not actor:
|
||||
return None
|
||||
return _serialize_actor(actor)
|
||||
|
||||
|
||||
async def _social_full(ctx: dict, **kw: Any) -> str:
|
||||
from shared.sx.helpers import render_to_sx_with_env, _ctx_to_env
|
||||
env = _ctx_to_env(ctx)
|
||||
env["actor"] = kw.get("actor") or _actor_data(ctx)
|
||||
return await render_to_sx_with_env("social-layout-full", env)
|
||||
|
||||
|
||||
async def _social_oob(ctx: dict, **kw: Any) -> str:
|
||||
from shared.sx.helpers import render_to_sx_with_env, _ctx_to_env
|
||||
env = _ctx_to_env(ctx, oob=True)
|
||||
env["actor"] = kw.get("actor") or _actor_data(ctx)
|
||||
return await render_to_sx_with_env("social-layout-oob", env)
|
||||
@@ -14,7 +14,7 @@ def register(url_prefix: str = "/") -> Blueprint:
|
||||
"""Full page dashboard with last results."""
|
||||
from shared.sx.page import get_template_context
|
||||
from shared.browser.app.csrf import generate_csrf_token
|
||||
from sxc.pages import render_dashboard_page_sx
|
||||
from sxc.pages.renders import render_dashboard_page_sx
|
||||
import runner
|
||||
|
||||
ctx = await get_template_context()
|
||||
@@ -63,12 +63,12 @@ def register(url_prefix: str = "/") -> Blueprint:
|
||||
if is_htmx:
|
||||
# S-expression wire format — sx.js renders client-side
|
||||
from shared.sx.helpers import sx_response
|
||||
from sxc.pages import test_detail_sx
|
||||
from sxc.pages.renders import test_detail_sx
|
||||
return sx_response(await test_detail_sx(test))
|
||||
|
||||
# Full page render (direct navigation / refresh)
|
||||
from shared.sx.page import get_template_context
|
||||
from sxc.pages import render_test_detail_page_sx
|
||||
from sxc.pages.renders import render_test_detail_page_sx
|
||||
|
||||
ctx = await get_template_context()
|
||||
html = await render_test_detail_page_sx(ctx, test)
|
||||
@@ -78,7 +78,7 @@ def register(url_prefix: str = "/") -> Blueprint:
|
||||
async def results():
|
||||
"""HTMX partial — poll target for results table."""
|
||||
from shared.browser.app.csrf import generate_csrf_token
|
||||
from sxc.pages import render_results_partial_sx
|
||||
from sxc.pages.renders import render_results_partial_sx
|
||||
import runner
|
||||
|
||||
result = runner.get_results()
|
||||
|
||||
@@ -1,145 +1,13 @@
|
||||
"""Test service s-expression page components."""
|
||||
"""Test service defpage setup."""
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
from datetime import datetime
|
||||
|
||||
from shared.sx.jinja_bridge import load_service_components
|
||||
from shared.sx.helpers import render_to_sx, SxExpr, render_to_sx_with_env, _ctx_to_env, full_page_sx
|
||||
|
||||
# Load test-specific .sx components at import time
|
||||
load_service_components(os.path.dirname(os.path.dirname(os.path.dirname(__file__))))
|
||||
def setup_test_pages() -> None:
|
||||
"""Load test page definitions."""
|
||||
_load_test_page_files()
|
||||
|
||||
|
||||
def _format_time(ts: float | None) -> str:
|
||||
"""Format a unix timestamp for display."""
|
||||
if not ts:
|
||||
return "never"
|
||||
return datetime.fromtimestamp(ts).strftime("%-d %b %Y, %H:%M:%S")
|
||||
|
||||
|
||||
_FILTER_MAP = {
|
||||
"passed": "passed",
|
||||
"failed": "failed",
|
||||
"errors": "error",
|
||||
"skipped": "skipped",
|
||||
}
|
||||
|
||||
|
||||
def _filter_tests(tests: list[dict], active_filter: str | None,
|
||||
active_service: str | None) -> list[dict]:
|
||||
"""Filter tests by outcome and/or service."""
|
||||
from runner import _service_from_nodeid
|
||||
filtered = tests
|
||||
if active_filter and active_filter in _FILTER_MAP:
|
||||
outcome = _FILTER_MAP[active_filter]
|
||||
filtered = [t for t in filtered if t["outcome"] == outcome]
|
||||
if active_service:
|
||||
filtered = [t for t in filtered if _service_from_nodeid(t["nodeid"]) == active_service]
|
||||
return filtered
|
||||
|
||||
|
||||
def _service_list() -> list[str]:
|
||||
from runner import _SERVICE_ORDER
|
||||
return list(_SERVICE_ORDER)
|
||||
|
||||
|
||||
def _build_summary_data(result: dict | None, running: bool, csrf: str,
|
||||
active_filter: str | None) -> dict:
|
||||
"""Prepare summary data dict for the ~test-results-partial defcomp."""
|
||||
if running and not result:
|
||||
return dict(state="running", status="running", passed="0", failed="0",
|
||||
errors="0", skipped="0", total="0", duration="...",
|
||||
last_run="in progress", running=True, csrf=csrf,
|
||||
active_filter=active_filter)
|
||||
if not result:
|
||||
return dict(state="no-results", status=None, passed="0", failed="0",
|
||||
errors="0", skipped="0", total="0", duration="0",
|
||||
last_run="never", running=running, csrf=csrf,
|
||||
active_filter=active_filter)
|
||||
status = "running" if running else result["status"]
|
||||
return dict(
|
||||
state="running" if running else "has-results",
|
||||
status=status,
|
||||
passed=str(result["passed"]),
|
||||
failed=str(result["failed"]),
|
||||
errors=str(result["errors"]),
|
||||
skipped=str(result.get("skipped", 0)),
|
||||
total=str(result["total"]),
|
||||
duration=str(result["duration"]),
|
||||
last_run=_format_time(result["finished_at"]) if not running else "in progress",
|
||||
running=running, csrf=csrf,
|
||||
active_filter=active_filter,
|
||||
)
|
||||
|
||||
|
||||
async def test_detail_sx(test: dict) -> str:
|
||||
"""Return s-expression wire format for a test detail view."""
|
||||
return await render_to_sx("test-detail-section", test=test)
|
||||
|
||||
|
||||
async def render_dashboard_page_sx(ctx: dict, result: dict | None,
|
||||
running: bool, csrf: str,
|
||||
active_filter: str | None = None,
|
||||
active_service: str | None = None) -> str:
|
||||
"""Full page: test dashboard (sx wire format)."""
|
||||
from runner import group_tests_by_service
|
||||
|
||||
summary_data = _build_summary_data(result, running, csrf, active_filter)
|
||||
sections = []
|
||||
has_failures = "false"
|
||||
if result and not running:
|
||||
tests = _filter_tests(result.get("tests", []), active_filter, active_service)
|
||||
if tests:
|
||||
sections = group_tests_by_service(tests)
|
||||
has_failures = str(result["failed"] > 0 or result["errors"] > 0).lower()
|
||||
else:
|
||||
summary_data["state"] = "empty-filtered"
|
||||
|
||||
inner = await render_to_sx("test-results-partial",
|
||||
summary_data=summary_data, sections=sections, has_failures=has_failures)
|
||||
content = await render_to_sx("test-results-wrap", running=running, inner=SxExpr(inner))
|
||||
hdr = await render_to_sx_with_env("test-layout-full", _ctx_to_env(ctx),
|
||||
services=_service_list(),
|
||||
active_service=active_service,
|
||||
)
|
||||
return await full_page_sx(ctx, header_rows=hdr, content=content)
|
||||
|
||||
|
||||
async def render_results_partial_sx(result: dict | None, running: bool,
|
||||
csrf: str,
|
||||
active_filter: str | None = None,
|
||||
active_service: str | None = None) -> str:
|
||||
"""HTMX partial: results section (sx wire format)."""
|
||||
from runner import group_tests_by_service
|
||||
|
||||
summary_data = _build_summary_data(result, running, csrf, active_filter)
|
||||
sections = []
|
||||
has_failures = "false"
|
||||
if result and not running:
|
||||
tests = _filter_tests(result.get("tests", []), active_filter, active_service)
|
||||
if tests:
|
||||
sections = group_tests_by_service(tests)
|
||||
has_failures = str(result["failed"] > 0 or result["errors"] > 0).lower()
|
||||
else:
|
||||
summary_data["state"] = "empty-filtered"
|
||||
|
||||
inner = await render_to_sx("test-results-partial",
|
||||
summary_data=summary_data, sections=sections, has_failures=has_failures)
|
||||
return await render_to_sx("test-results-wrap", running=running, inner=SxExpr(inner))
|
||||
|
||||
|
||||
async def render_test_detail_page_sx(ctx: dict, test: dict) -> str:
|
||||
"""Full page: test detail (sx wire format)."""
|
||||
hdr = await render_to_sx_with_env("test-detail-layout-full", _ctx_to_env(ctx),
|
||||
services=_service_list(),
|
||||
test_nodeid=test["nodeid"],
|
||||
test_label=test["nodeid"].rsplit("::", 1)[-1],
|
||||
)
|
||||
content = await render_to_sx("test-detail",
|
||||
nodeid=test["nodeid"],
|
||||
outcome=test["outcome"],
|
||||
duration=str(test["duration"]),
|
||||
longrepr=test.get("longrepr", ""),
|
||||
)
|
||||
return await full_page_sx(ctx, header_rows=hdr, content=content)
|
||||
def _load_test_page_files() -> None:
|
||||
import os
|
||||
from shared.sx.pages import load_page_dir
|
||||
load_page_dir(os.path.dirname(__file__), "test")
|
||||
|
||||
145
test/sxc/pages/renders.py
Normal file
145
test/sxc/pages/renders.py
Normal file
@@ -0,0 +1,145 @@
|
||||
"""Test service render functions — called from bp routes."""
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
from datetime import datetime
|
||||
|
||||
from shared.sx.jinja_bridge import load_service_components
|
||||
from shared.sx.helpers import render_to_sx, SxExpr, render_to_sx_with_env, _ctx_to_env, full_page_sx
|
||||
|
||||
# Load test-specific .sx components at import time
|
||||
load_service_components(os.path.dirname(os.path.dirname(os.path.dirname(__file__))))
|
||||
|
||||
|
||||
def _format_time(ts: float | None) -> str:
|
||||
"""Format a unix timestamp for display."""
|
||||
if not ts:
|
||||
return "never"
|
||||
return datetime.fromtimestamp(ts).strftime("%-d %b %Y, %H:%M:%S")
|
||||
|
||||
|
||||
_FILTER_MAP = {
|
||||
"passed": "passed",
|
||||
"failed": "failed",
|
||||
"errors": "error",
|
||||
"skipped": "skipped",
|
||||
}
|
||||
|
||||
|
||||
def _filter_tests(tests: list[dict], active_filter: str | None,
|
||||
active_service: str | None) -> list[dict]:
|
||||
"""Filter tests by outcome and/or service."""
|
||||
from runner import _service_from_nodeid
|
||||
filtered = tests
|
||||
if active_filter and active_filter in _FILTER_MAP:
|
||||
outcome = _FILTER_MAP[active_filter]
|
||||
filtered = [t for t in filtered if t["outcome"] == outcome]
|
||||
if active_service:
|
||||
filtered = [t for t in filtered if _service_from_nodeid(t["nodeid"]) == active_service]
|
||||
return filtered
|
||||
|
||||
|
||||
def _service_list() -> list[str]:
|
||||
from runner import _SERVICE_ORDER
|
||||
return list(_SERVICE_ORDER)
|
||||
|
||||
|
||||
def _build_summary_data(result: dict | None, running: bool, csrf: str,
|
||||
active_filter: str | None) -> dict:
|
||||
"""Prepare summary data dict for the ~test-results-partial defcomp."""
|
||||
if running and not result:
|
||||
return dict(state="running", status="running", passed="0", failed="0",
|
||||
errors="0", skipped="0", total="0", duration="...",
|
||||
last_run="in progress", running=True, csrf=csrf,
|
||||
active_filter=active_filter)
|
||||
if not result:
|
||||
return dict(state="no-results", status=None, passed="0", failed="0",
|
||||
errors="0", skipped="0", total="0", duration="0",
|
||||
last_run="never", running=running, csrf=csrf,
|
||||
active_filter=active_filter)
|
||||
status = "running" if running else result["status"]
|
||||
return dict(
|
||||
state="running" if running else "has-results",
|
||||
status=status,
|
||||
passed=str(result["passed"]),
|
||||
failed=str(result["failed"]),
|
||||
errors=str(result["errors"]),
|
||||
skipped=str(result.get("skipped", 0)),
|
||||
total=str(result["total"]),
|
||||
duration=str(result["duration"]),
|
||||
last_run=_format_time(result["finished_at"]) if not running else "in progress",
|
||||
running=running, csrf=csrf,
|
||||
active_filter=active_filter,
|
||||
)
|
||||
|
||||
|
||||
async def test_detail_sx(test: dict) -> str:
|
||||
"""Return s-expression wire format for a test detail view."""
|
||||
return await render_to_sx("test-detail-section", test=test)
|
||||
|
||||
|
||||
async def render_dashboard_page_sx(ctx: dict, result: dict | None,
|
||||
running: bool, csrf: str,
|
||||
active_filter: str | None = None,
|
||||
active_service: str | None = None) -> str:
|
||||
"""Full page: test dashboard (sx wire format)."""
|
||||
from runner import group_tests_by_service
|
||||
|
||||
summary_data = _build_summary_data(result, running, csrf, active_filter)
|
||||
sections = []
|
||||
has_failures = "false"
|
||||
if result and not running:
|
||||
tests = _filter_tests(result.get("tests", []), active_filter, active_service)
|
||||
if tests:
|
||||
sections = group_tests_by_service(tests)
|
||||
has_failures = str(result["failed"] > 0 or result["errors"] > 0).lower()
|
||||
else:
|
||||
summary_data["state"] = "empty-filtered"
|
||||
|
||||
inner = await render_to_sx("test-results-partial",
|
||||
summary_data=summary_data, sections=sections, has_failures=has_failures)
|
||||
content = await render_to_sx("test-results-wrap", running=running, inner=SxExpr(inner))
|
||||
hdr = await render_to_sx_with_env("test-layout-full", _ctx_to_env(ctx),
|
||||
services=_service_list(),
|
||||
active_service=active_service,
|
||||
)
|
||||
return await full_page_sx(ctx, header_rows=hdr, content=content)
|
||||
|
||||
|
||||
async def render_results_partial_sx(result: dict | None, running: bool,
|
||||
csrf: str,
|
||||
active_filter: str | None = None,
|
||||
active_service: str | None = None) -> str:
|
||||
"""HTMX partial: results section (sx wire format)."""
|
||||
from runner import group_tests_by_service
|
||||
|
||||
summary_data = _build_summary_data(result, running, csrf, active_filter)
|
||||
sections = []
|
||||
has_failures = "false"
|
||||
if result and not running:
|
||||
tests = _filter_tests(result.get("tests", []), active_filter, active_service)
|
||||
if tests:
|
||||
sections = group_tests_by_service(tests)
|
||||
has_failures = str(result["failed"] > 0 or result["errors"] > 0).lower()
|
||||
else:
|
||||
summary_data["state"] = "empty-filtered"
|
||||
|
||||
inner = await render_to_sx("test-results-partial",
|
||||
summary_data=summary_data, sections=sections, has_failures=has_failures)
|
||||
return await render_to_sx("test-results-wrap", running=running, inner=SxExpr(inner))
|
||||
|
||||
|
||||
async def render_test_detail_page_sx(ctx: dict, test: dict) -> str:
|
||||
"""Full page: test detail (sx wire format)."""
|
||||
hdr = await render_to_sx_with_env("test-detail-layout-full", _ctx_to_env(ctx),
|
||||
services=_service_list(),
|
||||
test_nodeid=test["nodeid"],
|
||||
test_label=test["nodeid"].rsplit("::", 1)[-1],
|
||||
)
|
||||
content = await render_to_sx("test-detail",
|
||||
nodeid=test["nodeid"],
|
||||
outcome=test["outcome"],
|
||||
duration=str(test["duration"]),
|
||||
longrepr=test.get("longrepr", ""),
|
||||
)
|
||||
return await full_page_sx(ctx, header_rows=hdr, content=content)
|
||||
Reference in New Issue
Block a user