Some checks failed
Build and Deploy / build-and-deploy (push) Has been cancelled
Phase 1 — Macros: defmacro + quasiquote syntax (`, ,, ,@) in parser,
evaluator, HTML renderer, and JS mirror. Macro type, expansion, and
round-trip serialization.
Phase 2 — Expanded primitives: app-url, url-for, asset-url, config,
format-date, parse-int (pure); service, request-arg, request-path,
nav-tree, get-children (I/O); jinja-global, relations-from (pure).
Updated _io_service to accept (service "registry-name" "method" :kwargs)
with auto kebab→snake conversion. DTO-to-dict now expands datetime fields
into year/month/day convenience keys. Tuple returns converted to lists.
Phase 3 — Declarative handlers: HandlerDef type, defhandler special form,
handler registry (service → name → HandlerDef), async evaluator+renderer
(async_eval.py) that awaits I/O primitives inline within control flow.
Handler loading from .sx files, execute_handler, blueprint factory.
Phase 4 — Convert all fragment routes: 13 Python fragment handlers across
8 services replaced with declarative .sx handler files. All routes.py
simplified to uniform sx dispatch pattern. Two Jinja HTML handlers
(events/container-cards, events/account-page) kept as Python.
New files: shared/sx/async_eval.py, shared/sx/handlers.py,
shared/sx/tests/test_handlers.py, plus 13 handler .sx files under
{service}/sx/handlers/. MarketService.product_by_slug() added.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
665 lines
25 KiB
Python
665 lines
25 KiB
Python
"""
|
|
Shared helper functions for s-expression page rendering.
|
|
|
|
These are used by per-service sx_components.py files to build common
|
|
page elements (headers, search, etc.) from template context.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import hashlib
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
from markupsafe import escape
|
|
|
|
from .page import SEARCH_HEADERS_MOBILE, SEARCH_HEADERS_DESKTOP
|
|
from .parser import SxExpr
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Pre-computed CSS classes for inline sx built by Python helpers
|
|
# ---------------------------------------------------------------------------
|
|
# These :class strings appear in post_header_sx / post_admin_header_sx etc.
|
|
# They're static — scan once at import time so they aren't re-scanned per request.
|
|
|
|
_HELPER_CLASS_SOURCES = [
|
|
':class "flex flex-col sm:flex-row sm:items-center gap-2 border-r border-stone-200 mr-2 sm:max-w-2xl"',
|
|
':class "relative nav-group"',
|
|
':class "justify-center cursor-pointer flex flex-row items-center gap-2 rounded bg-stone-200 text-black p-3"',
|
|
':class "!bg-stone-500 !text-white"',
|
|
':class "fa fa-cog"',
|
|
':class "fa fa-shield-halved"',
|
|
':class "text-white"',
|
|
':class "justify-center cursor-pointer flex flex-row items-center gap-2 rounded !bg-stone-500 !text-white p-3"',
|
|
]
|
|
|
|
|
|
def _scan_helper_classes() -> frozenset[str]:
|
|
"""Scan the static class strings from helper functions once."""
|
|
from .css_registry import scan_classes_from_sx
|
|
combined = " ".join(_HELPER_CLASS_SOURCES)
|
|
return frozenset(scan_classes_from_sx(combined))
|
|
|
|
|
|
HELPER_CSS_CLASSES: frozenset[str] = _scan_helper_classes()
|
|
|
|
|
|
def call_url(ctx: dict, key: str, path: str = "/") -> str:
|
|
"""Call a URL helper from context (e.g., blog_url, account_url)."""
|
|
fn = ctx.get(key)
|
|
if callable(fn):
|
|
return fn(path)
|
|
return str(fn or "") + path
|
|
|
|
|
|
def get_asset_url(ctx: dict) -> str:
|
|
"""Extract the asset URL base from context."""
|
|
au = ctx.get("asset_url")
|
|
if callable(au):
|
|
result = au("")
|
|
return result.rsplit("/", 1)[0] if "/" in result else result
|
|
return au or ""
|
|
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Sx-native helper functions — return sx source (not HTML)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _as_sx(val: Any) -> SxExpr | None:
|
|
"""Coerce a fragment value to SxExpr.
|
|
|
|
If *val* is already a ``SxExpr`` (from a ``text/sx`` fragment),
|
|
return it as-is. If it's a non-empty string (HTML from a
|
|
``text/html`` fragment), wrap it in ``~rich-text``. Otherwise
|
|
return ``None``.
|
|
"""
|
|
if not val:
|
|
return None
|
|
if isinstance(val, SxExpr):
|
|
return val
|
|
html = str(val)
|
|
escaped = html.replace("\\", "\\\\").replace('"', '\\"')
|
|
return SxExpr(f'(~rich-text :html "{escaped}")')
|
|
|
|
|
|
def root_header_sx(ctx: dict, *, oob: bool = False) -> str:
|
|
"""Build the root header row as a sx call string."""
|
|
rights = ctx.get("rights") or {}
|
|
is_admin = rights.get("admin") if isinstance(rights, dict) else getattr(rights, "admin", False)
|
|
settings_url = call_url(ctx, "blog_url", "/settings/") if is_admin else ""
|
|
return sx_call("header-row-sx",
|
|
cart_mini=_as_sx(ctx.get("cart_mini")),
|
|
blog_url=call_url(ctx, "blog_url", ""),
|
|
site_title=ctx.get("base_title", ""),
|
|
app_label=ctx.get("app_label", ""),
|
|
nav_tree=_as_sx(ctx.get("nav_tree")),
|
|
auth_menu=_as_sx(ctx.get("auth_menu")),
|
|
nav_panel=_as_sx(ctx.get("nav_panel")),
|
|
settings_url=settings_url,
|
|
is_admin=is_admin,
|
|
oob=oob,
|
|
)
|
|
|
|
|
|
def search_mobile_sx(ctx: dict) -> str:
|
|
"""Build mobile search input as sx call string."""
|
|
return sx_call("search-mobile",
|
|
current_local_href=ctx.get("current_local_href", "/"),
|
|
search=ctx.get("search", ""),
|
|
search_count=ctx.get("search_count", ""),
|
|
hx_select=ctx.get("hx_select", "#main-panel"),
|
|
search_headers_mobile=SEARCH_HEADERS_MOBILE,
|
|
)
|
|
|
|
|
|
def search_desktop_sx(ctx: dict) -> str:
|
|
"""Build desktop search input as sx call string."""
|
|
return sx_call("search-desktop",
|
|
current_local_href=ctx.get("current_local_href", "/"),
|
|
search=ctx.get("search", ""),
|
|
search_count=ctx.get("search_count", ""),
|
|
hx_select=ctx.get("hx_select", "#main-panel"),
|
|
search_headers_desktop=SEARCH_HEADERS_DESKTOP,
|
|
)
|
|
|
|
|
|
def post_header_sx(ctx: dict, *, oob: bool = False) -> str:
|
|
"""Build the post-level header row as sx call string."""
|
|
post = ctx.get("post") or {}
|
|
slug = post.get("slug", "")
|
|
if not slug:
|
|
return ""
|
|
title = (post.get("title") or "")[:160]
|
|
feature_image = post.get("feature_image")
|
|
|
|
label_sx = sx_call("post-label", feature_image=feature_image, title=title)
|
|
|
|
nav_parts: list[str] = []
|
|
page_cart_count = ctx.get("page_cart_count", 0)
|
|
if page_cart_count and page_cart_count > 0:
|
|
cart_href = call_url(ctx, "cart_url", f"/{slug}/")
|
|
nav_parts.append(sx_call("page-cart-badge", href=cart_href, count=str(page_cart_count)))
|
|
|
|
container_nav = ctx.get("container_nav")
|
|
if container_nav:
|
|
nav_parts.append(
|
|
f'(div :id "entries-calendars-nav-wrapper"'
|
|
f' :class "flex flex-col sm:flex-row sm:items-center gap-2 border-r border-stone-200 mr-2 sm:max-w-2xl"'
|
|
f' {container_nav})'
|
|
)
|
|
|
|
# Admin cog
|
|
admin_nav = ctx.get("post_admin_nav")
|
|
if not admin_nav:
|
|
rights = ctx.get("rights") or {}
|
|
has_admin = rights.get("admin") if isinstance(rights, dict) else getattr(rights, "admin", False)
|
|
if has_admin and slug:
|
|
from quart import request
|
|
admin_href = call_url(ctx, "blog_url", f"/{slug}/admin/")
|
|
is_admin_page = ctx.get("is_admin_section") or "/admin" in request.path
|
|
sel_cls = "!bg-stone-500 !text-white" if is_admin_page else ""
|
|
base_cls = ("justify-center cursor-pointer flex flex-row"
|
|
" items-center gap-2 rounded bg-stone-200 text-black p-3")
|
|
admin_nav = (
|
|
f'(div :class "relative nav-group"'
|
|
f' (a :href "{admin_href}"'
|
|
f' :class "{base_cls} {sel_cls}"'
|
|
f' (i :class "fa fa-cog" :aria-hidden "true")))'
|
|
)
|
|
if admin_nav:
|
|
nav_parts.append(admin_nav)
|
|
|
|
nav_sx = "(<> " + " ".join(nav_parts) + ")" if nav_parts else None
|
|
link_href = call_url(ctx, "blog_url", f"/{slug}/")
|
|
|
|
return sx_call("menu-row-sx",
|
|
id="post-row", level=1,
|
|
link_href=link_href,
|
|
link_label_content=SxExpr(label_sx),
|
|
nav=SxExpr(nav_sx) if nav_sx else None,
|
|
child_id="post-header-child",
|
|
oob=oob, external=True,
|
|
)
|
|
|
|
|
|
def post_admin_header_sx(ctx: dict, slug: str, *, oob: bool = False,
|
|
selected: str = "", admin_href: str = "") -> str:
|
|
"""Post admin header row as sx call string."""
|
|
# Label
|
|
label_parts = ['(i :class "fa fa-shield-halved" :aria-hidden "true")', '" admin"']
|
|
if selected:
|
|
label_parts.append(f'(span :class "text-white" "{escape(selected)}")')
|
|
label_sx = "(<> " + " ".join(label_parts) + ")"
|
|
|
|
# Nav items
|
|
select_colours = ctx.get("select_colours", "")
|
|
base_cls = ("justify-center cursor-pointer flex flex-row items-center"
|
|
" gap-2 rounded bg-stone-200 text-black p-3")
|
|
selected_cls = ("justify-center cursor-pointer flex flex-row items-center"
|
|
" gap-2 rounded !bg-stone-500 !text-white p-3")
|
|
nav_parts: list[str] = []
|
|
items = [
|
|
("events_url", f"/{slug}/admin/", "calendars"),
|
|
("market_url", f"/{slug}/admin/", "markets"),
|
|
("cart_url", f"/{slug}/admin/payments/", "payments"),
|
|
("blog_url", f"/{slug}/admin/entries/", "entries"),
|
|
("blog_url", f"/{slug}/admin/data/", "data"),
|
|
("blog_url", f"/{slug}/admin/preview/", "preview"),
|
|
("blog_url", f"/{slug}/admin/edit/", "edit"),
|
|
("blog_url", f"/{slug}/admin/settings/", "settings"),
|
|
]
|
|
for url_key, path, label in items:
|
|
url_fn = ctx.get(url_key)
|
|
if not callable(url_fn):
|
|
continue
|
|
href = url_fn(path)
|
|
is_sel = label == selected
|
|
cls = selected_cls if is_sel else base_cls
|
|
aria = "true" if is_sel else None
|
|
nav_parts.append(
|
|
f'(div :class "relative nav-group"'
|
|
f' (a :href "{escape(href)}"'
|
|
+ (f' :aria-selected "true"' if aria else "")
|
|
+ f' :class "{cls} {escape(select_colours)}"'
|
|
+ f' "{escape(label)}"))'
|
|
)
|
|
nav_sx = "(<> " + " ".join(nav_parts) + ")" if nav_parts else None
|
|
|
|
if not admin_href:
|
|
blog_fn = ctx.get("blog_url")
|
|
admin_href = blog_fn(f"/{slug}/admin/") if callable(blog_fn) else f"/{slug}/admin/"
|
|
|
|
return sx_call("menu-row-sx",
|
|
id="post-admin-row", level=2,
|
|
link_href=admin_href,
|
|
link_label_content=SxExpr(label_sx),
|
|
nav=SxExpr(nav_sx) if nav_sx else None,
|
|
child_id="post-admin-header-child", oob=oob,
|
|
)
|
|
|
|
|
|
def oob_header_sx(parent_id: str, child_id: str, row_sx: str) -> str:
|
|
"""Wrap a header row sx in an OOB swap.
|
|
|
|
child_id is accepted for call-site compatibility but no longer used —
|
|
the child placeholder is created by ~menu-row-sx itself.
|
|
"""
|
|
return sx_call("oob-header-sx",
|
|
parent_id=parent_id,
|
|
row=SxExpr(row_sx),
|
|
)
|
|
|
|
|
|
def header_child_sx(inner_sx: str, *, id: str = "root-header-child") -> str:
|
|
"""Wrap inner sx in a header-child div."""
|
|
return sx_call("header-child-sx",
|
|
id=id, inner=SxExpr(f"(<> {inner_sx})"),
|
|
)
|
|
|
|
|
|
def oob_page_sx(*, oobs: str = "", filter: str = "", aside: str = "",
|
|
content: str = "", menu: str = "") -> str:
|
|
"""Build OOB response as sx call string."""
|
|
return sx_call("oob-sx",
|
|
oobs=SxExpr(f"(<> {oobs})") if oobs else None,
|
|
filter=SxExpr(filter) if filter else None,
|
|
aside=SxExpr(aside) if aside else None,
|
|
menu=SxExpr(menu) if menu else None,
|
|
content=SxExpr(content) if content else None,
|
|
)
|
|
|
|
|
|
def full_page_sx(ctx: dict, *, header_rows: str,
|
|
filter: str = "", aside: str = "",
|
|
content: str = "", menu: str = "",
|
|
meta_html: str = "", meta: str = "") -> str:
|
|
"""Build a full page using sx_page() with ~app-body.
|
|
|
|
meta_html: raw HTML injected into the <head> shell (legacy).
|
|
meta: sx source for meta tags — auto-hoisted to <head> by sx.js.
|
|
"""
|
|
body_sx = sx_call("app-body",
|
|
header_rows=SxExpr(f"(<> {header_rows})") if header_rows else None,
|
|
filter=SxExpr(filter) if filter else None,
|
|
aside=SxExpr(aside) if aside else None,
|
|
menu=SxExpr(menu) if menu else None,
|
|
content=SxExpr(content) if content else None,
|
|
)
|
|
if meta:
|
|
# Wrap body + meta in a fragment so sx.js renders both;
|
|
# auto-hoist moves meta/title/link elements to <head>.
|
|
body_sx = "(<> " + meta + " " + body_sx + ")"
|
|
return sx_page(ctx, body_sx, meta_html=meta_html)
|
|
|
|
|
|
def sx_call(component_name: str, **kwargs: Any) -> str:
|
|
"""Build an s-expression component call string from Python kwargs.
|
|
|
|
Converts snake_case to kebab-case automatically::
|
|
|
|
sx_call("test-row", nodeid="foo", outcome="passed")
|
|
# => '(~test-row :nodeid "foo" :outcome "passed")'
|
|
|
|
Values are serialized: strings are quoted, None becomes nil,
|
|
bools become true/false, numbers stay as-is.
|
|
"""
|
|
from .parser import serialize
|
|
name = component_name if component_name.startswith("~") else f"~{component_name}"
|
|
parts = [name]
|
|
for key, val in kwargs.items():
|
|
parts.append(f":{key.replace('_', '-')}")
|
|
parts.append(serialize(val))
|
|
return "(" + " ".join(parts) + ")"
|
|
|
|
|
|
def components_for_request() -> str:
|
|
"""Return defcomp/defmacro source for definitions the client doesn't have yet.
|
|
|
|
Reads the ``SX-Components`` header (comma-separated component names
|
|
like ``~card,~nav-item``) and returns only the definitions the client
|
|
is missing. If the header is absent, returns all defs.
|
|
"""
|
|
from quart import request
|
|
from .jinja_bridge import client_components_tag, _COMPONENT_ENV
|
|
from .types import Component, Macro
|
|
from .parser import serialize
|
|
|
|
loaded_raw = request.headers.get("SX-Components", "")
|
|
if not loaded_raw:
|
|
# Client has nothing — send all
|
|
tag = client_components_tag()
|
|
if not tag:
|
|
return ""
|
|
start = tag.find(">") + 1
|
|
end = tag.rfind("</script>")
|
|
return tag[start:end] if start > 0 and end > start else ""
|
|
|
|
loaded = set(loaded_raw.split(","))
|
|
parts = []
|
|
for key, val in _COMPONENT_ENV.items():
|
|
if isinstance(val, Component):
|
|
# Skip components the client already has
|
|
if f"~{val.name}" in loaded or val.name in loaded:
|
|
continue
|
|
# Reconstruct defcomp source
|
|
param_strs = ["&key"] + list(val.params)
|
|
if val.has_children:
|
|
param_strs.extend(["&rest", "children"])
|
|
params_sx = "(" + " ".join(param_strs) + ")"
|
|
body_sx = serialize(val.body, pretty=True)
|
|
parts.append(f"(defcomp ~{val.name} {params_sx} {body_sx})")
|
|
elif isinstance(val, Macro):
|
|
if val.name in loaded:
|
|
continue
|
|
param_strs = list(val.params)
|
|
if val.rest_param:
|
|
param_strs.extend(["&rest", val.rest_param])
|
|
params_sx = "(" + " ".join(param_strs) + ")"
|
|
body_sx = serialize(val.body, pretty=True)
|
|
parts.append(f"(defmacro {val.name} {params_sx} {body_sx})")
|
|
return "\n".join(parts)
|
|
|
|
|
|
def sx_response(source_or_component: str, status: int = 200,
|
|
headers: dict | None = None, **kwargs: Any):
|
|
"""Return an s-expression wire-format response.
|
|
|
|
Can be called with a raw sx string::
|
|
|
|
return sx_response('(~test-row :nodeid "foo")')
|
|
|
|
Or with a component name + kwargs (builds the sx call)::
|
|
|
|
return sx_response("test-row", nodeid="foo", outcome="passed")
|
|
|
|
For SX requests, missing component definitions are prepended as a
|
|
``<script type="text/sx" data-components>`` block so the client
|
|
can process them before rendering OOB content.
|
|
"""
|
|
from quart import request, Response
|
|
if kwargs:
|
|
source = sx_call(source_or_component, **kwargs)
|
|
else:
|
|
source = source_or_component
|
|
|
|
body = source
|
|
# Validate the sx source parses as a single expression
|
|
try:
|
|
from .parser import parse as _parse_check
|
|
_parse_check(source)
|
|
except Exception as _e:
|
|
import logging
|
|
logging.getLogger("sx").error("sx_response parse error: %s\nSource (first 500): %s", _e, source[:500])
|
|
|
|
# For SX requests, prepend missing component definitions
|
|
comp_defs = ""
|
|
if request.headers.get("SX-Request"):
|
|
comp_defs = components_for_request()
|
|
if comp_defs:
|
|
body = (f'<script type="text/sx" data-components>'
|
|
f'{comp_defs}</script>\n{body}')
|
|
|
|
# On-demand CSS: scan source for classes, send only new rules
|
|
from .css_registry import scan_classes_from_sx, lookup_rules, registry_loaded, lookup_css_hash, store_css_hash
|
|
from .jinja_bridge import _COMPONENT_ENV
|
|
from .types import Component as _Component
|
|
new_classes: set[str] = set()
|
|
cumulative_classes: set[str] = set()
|
|
if registry_loaded():
|
|
new_classes = scan_classes_from_sx(source)
|
|
# Include pre-computed helper classes (menu bars, admin nav, etc.)
|
|
new_classes.update(HELPER_CSS_CLASSES)
|
|
if comp_defs:
|
|
# Use pre-computed classes for components being sent
|
|
for key, val in _COMPONENT_ENV.items():
|
|
if isinstance(val, _Component) and val.css_classes:
|
|
new_classes.update(val.css_classes)
|
|
|
|
# Resolve known classes from SX-Css header (hash or full list)
|
|
known_classes: set[str] = set()
|
|
known_raw = request.headers.get("SX-Css", "")
|
|
if known_raw:
|
|
if len(known_raw) <= 16:
|
|
# Treat as hash
|
|
looked_up = lookup_css_hash(known_raw)
|
|
if looked_up is not None:
|
|
known_classes = looked_up
|
|
else:
|
|
# Cache miss — send all classes (safe fallback)
|
|
known_classes = set()
|
|
else:
|
|
known_classes = set(known_raw.split(","))
|
|
|
|
cumulative_classes = known_classes | new_classes
|
|
new_classes -= known_classes
|
|
|
|
if new_classes:
|
|
new_rules = lookup_rules(new_classes)
|
|
if new_rules:
|
|
body = f'<style data-sx-css>{new_rules}</style>\n{body}'
|
|
|
|
# Dev mode: pretty-print sx source for readable Network tab responses
|
|
if _is_dev_mode():
|
|
body = _pretty_print_sx_body(body)
|
|
|
|
resp = Response(body, status=status, content_type="text/sx")
|
|
if new_classes:
|
|
resp.headers["SX-Css-Add"] = ",".join(sorted(new_classes))
|
|
if cumulative_classes:
|
|
resp.headers["SX-Css-Hash"] = store_css_hash(cumulative_classes)
|
|
if headers:
|
|
for k, v in headers.items():
|
|
resp.headers[k] = v
|
|
return resp
|
|
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Sx wire-format full page shell
|
|
# ---------------------------------------------------------------------------
|
|
|
|
_SX_PAGE_TEMPLATE = """\
|
|
<!doctype html>
|
|
<html lang="en">
|
|
<head>
|
|
<meta charset="utf-8">
|
|
<meta name="viewport" content="width=device-width, initial-scale=1">
|
|
<meta name="robots" content="index,follow">
|
|
<meta name="theme-color" content="#ffffff">
|
|
<title>{title}</title>
|
|
{meta_html}
|
|
<style>@media (min-width: 768px) {{ .js-mobile-sentinel {{ display:none !important; }} }}</style>
|
|
<meta name="csrf-token" content="{csrf}">
|
|
<style id="sx-css">{sx_css}</style>
|
|
<meta name="sx-css-classes" content="{sx_css_classes}">
|
|
<script src="https://unpkg.com/prismjs/prism.js"></script>
|
|
<script src="https://unpkg.com/prismjs/components/prism-javascript.min.js"></script>
|
|
<script src="https://unpkg.com/prismjs/components/prism-python.min.js"></script>
|
|
<script src="https://unpkg.com/prismjs/components/prism-bash.min.js"></script>
|
|
<script src="https://cdn.jsdelivr.net/npm/sweetalert2@11"></script>
|
|
<script>if(matchMedia('(hover:hover) and (pointer:fine)').matches){{document.documentElement.classList.add('hover-capable')}}</script>
|
|
<script>document.addEventListener('click',function(e){{var t=e.target.closest('[data-close-details]');if(!t)return;var d=t.closest('details');if(d)d.removeAttribute('open')}})</script>
|
|
<style>
|
|
details[data-toggle-group="mobile-panels"]>summary{{list-style:none}}
|
|
details[data-toggle-group="mobile-panels"]>summary::-webkit-details-marker{{display:none}}
|
|
@media(min-width:768px){{.nav-group:focus-within .submenu,.nav-group:hover .submenu{{display:block}}}}
|
|
img{{max-width:100%;height:auto}}
|
|
.clamp-2{{display:-webkit-box;-webkit-line-clamp:2;-webkit-box-orient:vertical;overflow:hidden}}
|
|
.clamp-3{{display:-webkit-box;-webkit-line-clamp:3;-webkit-box-orient:vertical;overflow:hidden}}
|
|
.no-scrollbar::-webkit-scrollbar{{display:none}}.no-scrollbar{{-ms-overflow-style:none;scrollbar-width:none}}
|
|
details.group{{overflow:hidden}}details.group>summary{{list-style:none}}details.group>summary::-webkit-details-marker{{display:none}}
|
|
.sx-indicator{{display:none}}.sx-request .sx-indicator{{display:inline-flex}}
|
|
.sx-error .sx-indicator{{display:none}}.sx-loading .sx-indicator{{display:inline-flex}}
|
|
.js-wrap.open .js-pop{{display:block}}.js-wrap.open .js-backdrop{{display:block}}
|
|
</style>
|
|
</head>
|
|
<body class="bg-stone-50 text-stone-900">
|
|
<script type="text/sx" data-components data-hash="{component_hash}">{component_defs}</script>
|
|
<script type="text/sx" data-mount="body">{page_sx}</script>
|
|
<script src="{asset_url}/scripts/sx.js?v={sx_js_hash}"></script>
|
|
<script src="{asset_url}/scripts/body.js?v={body_js_hash}"></script>
|
|
</body>
|
|
</html>"""
|
|
|
|
|
|
def sx_page(ctx: dict, page_sx: str, *,
|
|
meta_html: str = "") -> str:
|
|
"""Return a minimal HTML shell that boots the page from sx source.
|
|
|
|
The browser loads component definitions and page sx, then sx.js
|
|
renders everything client-side. CSS rules are scanned from the sx
|
|
source and component defs, then injected as a <style> block.
|
|
"""
|
|
from .jinja_bridge import client_components_tag, _COMPONENT_ENV, get_component_hash
|
|
from .css_registry import scan_classes_from_sx, lookup_rules, get_preamble, registry_loaded, store_css_hash
|
|
from .types import Component
|
|
|
|
component_hash = get_component_hash()
|
|
|
|
# Check if client already has this version cached (via cookie)
|
|
client_hash = _get_sx_comp_cookie()
|
|
if client_hash and client_hash == component_hash:
|
|
# Client has current components cached — send empty source
|
|
component_defs = ""
|
|
else:
|
|
components_tag = client_components_tag()
|
|
# Extract just the inner source from the <script> tag
|
|
component_defs = ""
|
|
if components_tag:
|
|
start = components_tag.find(">") + 1
|
|
end = components_tag.rfind("</script>")
|
|
if start > 0 and end > start:
|
|
component_defs = components_tag[start:end]
|
|
|
|
# Scan for CSS classes — use pre-computed sets for components, scan page sx at request time
|
|
sx_css = ""
|
|
sx_css_classes = ""
|
|
sx_css_hash = ""
|
|
if registry_loaded():
|
|
# Union pre-computed component classes instead of re-scanning source
|
|
classes: set[str] = set()
|
|
for val in _COMPONENT_ENV.values():
|
|
if isinstance(val, Component) and val.css_classes:
|
|
classes.update(val.css_classes)
|
|
# Include pre-computed helper classes (menu bars, admin nav, etc.)
|
|
classes.update(HELPER_CSS_CLASSES)
|
|
# Page sx is unique per request — scan it
|
|
classes.update(scan_classes_from_sx(page_sx))
|
|
# Always include body classes
|
|
classes.update(["bg-stone-50", "text-stone-900"])
|
|
rules = lookup_rules(classes)
|
|
sx_css = get_preamble() + rules
|
|
sx_css_hash = store_css_hash(classes)
|
|
sx_css_classes = sx_css_hash
|
|
|
|
asset_url = get_asset_url(ctx)
|
|
title = ctx.get("base_title", "Rose Ash")
|
|
csrf = _get_csrf_token()
|
|
|
|
# Dev mode: pretty-print page sx for readable View Source
|
|
if _is_dev_mode() and page_sx and page_sx.startswith("("):
|
|
from .parser import parse as _parse, serialize as _serialize
|
|
try:
|
|
page_sx = _serialize(_parse(page_sx), pretty=True)
|
|
except Exception:
|
|
pass
|
|
|
|
return _SX_PAGE_TEMPLATE.format(
|
|
title=_html_escape(title),
|
|
asset_url=asset_url,
|
|
meta_html=meta_html,
|
|
csrf=_html_escape(csrf),
|
|
component_hash=component_hash,
|
|
component_defs=component_defs,
|
|
page_sx=page_sx,
|
|
sx_css=sx_css,
|
|
sx_css_classes=sx_css_classes,
|
|
sx_js_hash=_script_hash("sx.js"),
|
|
body_js_hash=_script_hash("body.js"),
|
|
)
|
|
|
|
|
|
_SCRIPT_HASH_CACHE: dict[str, str] = {}
|
|
|
|
|
|
def _script_hash(filename: str) -> str:
|
|
"""Compute MD5 hash of a static script file, cached for process lifetime."""
|
|
if filename not in _SCRIPT_HASH_CACHE:
|
|
try:
|
|
data = (Path("static") / "scripts" / filename).read_bytes()
|
|
_SCRIPT_HASH_CACHE[filename] = hashlib.md5(data).hexdigest()[:8]
|
|
except Exception:
|
|
_SCRIPT_HASH_CACHE[filename] = "dev"
|
|
return _SCRIPT_HASH_CACHE[filename]
|
|
|
|
|
|
def _get_csrf_token() -> str:
|
|
"""Get the CSRF token from the current request context."""
|
|
try:
|
|
from quart import g
|
|
return getattr(g, "csrf_token", "")
|
|
except Exception:
|
|
return ""
|
|
|
|
|
|
def _get_sx_comp_cookie() -> str:
|
|
"""Read the sx-comp-hash cookie from the current request."""
|
|
try:
|
|
from quart import request
|
|
return request.cookies.get("sx-comp-hash", "")
|
|
except Exception:
|
|
return ""
|
|
|
|
|
|
def _is_dev_mode() -> bool:
|
|
"""Check if running in dev mode (RELOAD=true)."""
|
|
import os
|
|
return os.getenv("RELOAD") == "true"
|
|
|
|
|
|
def _pretty_print_sx_body(body: str) -> str:
|
|
"""Pretty-print the sx portion of a response body, preserving HTML blocks."""
|
|
import re
|
|
from .parser import parse_all as _parse_all, serialize as _serialize
|
|
|
|
# Split HTML prefix blocks (<style>, <script>) from the sx tail
|
|
# These are always at the start, each on its own line
|
|
parts: list[str] = []
|
|
rest = body
|
|
while rest.startswith("<"):
|
|
end = rest.find(">", rest.find("</")) + 1
|
|
if end <= 0:
|
|
break
|
|
# Find end of the closing tag
|
|
tag_match = re.match(r'<(style|script)[^>]*>[\s\S]*?</\1>', rest)
|
|
if tag_match:
|
|
parts.append(tag_match.group(0))
|
|
rest = rest[tag_match.end():].lstrip("\n")
|
|
else:
|
|
break
|
|
|
|
sx_source = rest.strip()
|
|
if not sx_source or sx_source[0] != "(":
|
|
return body
|
|
|
|
try:
|
|
exprs = _parse_all(sx_source)
|
|
if len(exprs) == 1:
|
|
parts.append(_serialize(exprs[0], pretty=True))
|
|
else:
|
|
# Multiple top-level expressions — indent each
|
|
pretty_parts = [_serialize(expr, pretty=True) for expr in exprs]
|
|
parts.append("\n\n".join(pretty_parts))
|
|
return "\n\n".join(parts)
|
|
except Exception:
|
|
return body
|
|
|
|
|
|
def _html_escape(s: str) -> str:
|
|
"""Minimal HTML escaping for attribute values."""
|
|
return (s.replace("&", "&")
|
|
.replace("<", "<")
|
|
.replace(">", ">")
|
|
.replace('"', """))
|