Files
rose-ash/shared/sx/helpers.py
giles d850f7c9c1 Fix page registry: escape braces for str.format()
pages_sx contains SX dict literals with {} (empty closures) which
Python's str.format() interprets as positional placeholders, causing
a KeyError that was silently caught. Escape braces before formatting.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-06 21:16:35 +00:00

917 lines
34 KiB
Python

"""
Shared helper functions for s-expression page rendering.
These are used by per-service sxc/pages modules to build common
page elements (headers, search, etc.) from template context.
"""
from __future__ import annotations
import hashlib
from pathlib import Path
from typing import Any
from markupsafe import escape
from .page import SEARCH_HEADERS_MOBILE, SEARCH_HEADERS_DESKTOP
from .parser import SxExpr
def _sx_fragment(*parts: str) -> SxExpr:
    """Combine already-serialized SX strings into one ``(<> ...)`` fragment.

    Falsy parts are dropped and the remainder is joined with single
    spaces.  When nothing is left an empty ``SxExpr`` is returned rather
    than an empty fragment wrapper.

    Infrastructure utility only: it composes SX wire-format text that has
    already been rendered.  To build SX from Python data use ``sx_call()``
    or ``_render_to_sx()`` instead.
    """
    body = " ".join(filter(None, parts))
    if not body:
        return SxExpr("")
    return SxExpr(f"(<> {body})")
def call_url(ctx: dict, key: str, path: str = "/") -> str:
    """Resolve *path* against the URL helper stored under *key* in *ctx*.

    When the context entry is callable (e.g. ``blog_url``,
    ``account_url``) it is invoked with *path* and its result returned.
    Otherwise the entry is treated as a base string — a missing or falsy
    entry counts as ``""`` — and *path* is appended to it.
    """
    helper = ctx.get(key)
    if not callable(helper):
        base = str(helper) if helper else ""
        return base + path
    return helper(path)
def get_asset_url(ctx: dict) -> str:
    """Return the asset base URL taken from ``ctx["asset_url"]``.

    A callable entry is invoked with an empty path; everything from the
    last ``/`` of its result onwards is dropped (a result containing no
    ``/`` is returned unchanged).  A non-callable entry is returned
    as-is, with falsy values normalised to ``""``.
    """
    asset = ctx.get("asset_url")
    if not callable(asset):
        return asset or ""
    base = asset("")
    if "/" not in base:
        return base
    head, _sep, _tail = base.rpartition("/")
    return head
# ---------------------------------------------------------------------------
# Sx-native helper functions — return sx source (not HTML)
# ---------------------------------------------------------------------------
def _as_sx(val: Any) -> SxExpr | None:
    """Normalise a fragment value into an ``SxExpr`` (or ``None``).

    * Falsy values give ``None``.
    * An ``SxExpr`` (from a ``text/sx`` fragment) is passed through,
      unless its ``source`` is empty, which also gives ``None``.
    * Anything else is stringified, treated as HTML from a ``text/html``
      fragment, and wrapped in a ``~rich-text`` component call.
    """
    if not val:
        return None
    if not isinstance(val, SxExpr):
        return sx_call("rich-text", html=str(val))
    if val.source:
        return val
    return None
async def root_header_sx(ctx: dict, *, oob: bool = False) -> str:
    """Render the root header row (``~header-row-sx``) as sx wire format.

    The site title, app label and the pre-rendered fragments
    (``cart_mini``, ``nav_tree``, ``auth_menu``, ``nav_panel``) are read
    from *ctx*.  The settings link is only populated when
    ``ctx["rights"]`` — either a dict or an attribute-style object —
    reports ``admin``.  *oob* is forwarded to the component unchanged.
    """
    rights = ctx.get("rights") or {}
    if isinstance(rights, dict):
        is_admin = rights.get("admin")
    else:
        is_admin = getattr(rights, "admin", False)

    settings_url = ""
    if is_admin:
        settings_url = call_url(ctx, "blog_url", "/settings/")

    return await _render_to_sx(
        "header-row-sx",
        cart_mini=_as_sx(ctx.get("cart_mini")),
        blog_url=call_url(ctx, "blog_url", ""),
        site_title=ctx.get("base_title", ""),
        app_label=ctx.get("app_label", ""),
        nav_tree=_as_sx(ctx.get("nav_tree")),
        auth_menu=_as_sx(ctx.get("auth_menu")),
        nav_panel=_as_sx(ctx.get("nav_panel")),
        settings_url=settings_url,
        is_admin=is_admin,
        oob=oob,
    )
def mobile_menu_sx(*sections: str) -> SxExpr:
    """Assemble the mobile menu from pre-built sections (deepest first).

    Falsy sections are ignored; if none remain an empty ``SxExpr`` is
    returned, otherwise the sections are wrapped in a single fragment.
    """
    present = tuple(filter(None, sections))
    if not present:
        return SxExpr("")
    return _sx_fragment(*present)
async def mobile_root_nav_sx(ctx: dict) -> str:
    """Render the root-level mobile nav through ``~mobile-root-nav``.

    Returns ``""`` when *ctx* has neither a ``nav_tree`` nor an
    ``auth_menu`` fragment; otherwise both are coerced with ``_as_sx``
    and handed to the component.
    """
    nav_tree = ctx.get("nav_tree") or ""
    auth_menu = ctx.get("auth_menu") or ""
    if nav_tree or auth_menu:
        return await _render_to_sx(
            "mobile-root-nav",
            nav_tree=_as_sx(nav_tree),
            auth_menu=_as_sx(auth_menu),
        )
    return ""
# ---------------------------------------------------------------------------
# Shared nav-item builders — used by BOTH desktop headers and mobile menus
# ---------------------------------------------------------------------------
async def _post_nav_items_sx(ctx: dict) -> SxExpr:
    """Build the post-level nav items shared by desktop and mobile.

    Used by ``post_header_sx`` (desktop) and ``post_mobile_nav_sx``
    (mobile).  In order, the result may contain:

    1. a ``~page-cart-badge`` when ``ctx["page_cart_count"]`` is positive;
    2. the ``container_nav`` fragment wrapped in ``~container-nav-wrapper``
       (skipped when it is only an empty ``(<> )`` wrapper);
    3. the admin cog — ``ctx["post_admin_nav"]`` if supplied, otherwise an
       ``~admin-cog-button`` rendered for users with the ``admin`` right.

    Returns an empty ``SxExpr`` when the post has no slug or when no item
    was produced.
    """
    post = ctx.get("post") or {}
    slug = post.get("slug", "")
    if not slug:
        return SxExpr("")

    items: list[str] = []

    cart_count = ctx.get("page_cart_count", 0)
    if cart_count and cart_count > 0:
        badge = await _render_to_sx(
            "page-cart-badge",
            href=call_url(ctx, "cart_url", f"/{slug}/"),
            count=str(cart_count),
        )
        items.append(badge)

    container_nav = str(ctx.get("container_nav") or "").strip()
    # Whatever survives once fragment punctuation is removed is real
    # content; an empty string or a bare "(<> )" leaves nothing behind.
    nav_content = container_nav.replace("(<>", "").replace(")", "").strip()
    if nav_content:
        wrapper = await _render_to_sx(
            "container-nav-wrapper",
            content=SxExpr(container_nav),
        )
        items.append(wrapper)

    # Admin cog: prefer a pre-rendered one from the context.
    admin_nav = ctx.get("post_admin_nav")
    if not admin_nav:
        rights = ctx.get("rights") or {}
        if isinstance(rights, dict):
            has_admin = rights.get("admin")
        else:
            has_admin = getattr(rights, "admin", False)
        # ``slug`` is known to be non-empty here (see the guard above).
        if has_admin:
            from quart import request

            admin_href = call_url(ctx, "blog_url", f"/{slug}/admin/")
            on_admin_page = (ctx.get("is_admin_section")
                             or "/admin" in request.path)
            admin_nav = await _render_to_sx(
                "admin-cog-button",
                href=admin_href,
                is_admin_page=on_admin_page or None,
            )
    if admin_nav:
        items.append(admin_nav)

    if not items:
        return SxExpr("")
    return _sx_fragment(*items)
async def _post_admin_nav_items_sx(ctx: dict, slug: str,
                                   selected: str = "") -> SxExpr:
    """Build the post-admin nav links (calendars, markets, payments, ...).

    Shared by ``post_admin_header_sx`` (desktop) and the mobile menu.
    One ``~nav-link`` is rendered per entry whose URL helper is present
    and callable in *ctx*; the entry whose label equals *selected* is
    flagged as selected.  Returns an empty ``SxExpr`` when no link could
    be built.
    """
    # (context key of the URL helper, path passed to it, link label)
    nav_table = (
        ("events_url", f"/{slug}/admin/", "calendars"),
        ("market_url", f"/{slug}/admin/", "markets"),
        ("cart_url", f"/{slug}/admin/payments/", "payments"),
        ("blog_url", f"/{slug}/admin/entries/", "entries"),
        ("blog_url", f"/{slug}/admin/data/", "data"),
        ("blog_url", f"/{slug}/admin/preview/", "preview"),
        ("blog_url", f"/{slug}/admin/edit/", "edit"),
        ("blog_url", f"/{slug}/admin/settings/", "settings"),
    )
    select_colours = ctx.get("select_colours", "")

    links: list[str] = []
    for helper_key, target_path, label in nav_table:
        helper = ctx.get(helper_key)
        if callable(helper):
            link = await _render_to_sx(
                "nav-link",
                href=helper(target_path),
                label=label,
                select_colours=select_colours,
                is_selected=(label == selected) or None,
            )
            links.append(link)

    if not links:
        return SxExpr("")
    return _sx_fragment(*links)
# ---------------------------------------------------------------------------
# Mobile menu section builders — wrap shared nav items for hamburger panel
# ---------------------------------------------------------------------------
async def post_mobile_nav_sx(ctx: dict) -> str:
    """Render the post-level section of the mobile menu.

    Returns ``""`` when there are no post nav items.  Otherwise the items
    are wrapped in a level-1 ``~mobile-menu-section`` labelled with the
    post title (falling back to the slug), truncated to 40 characters.
    """
    nav_items = await _post_nav_items_sx(ctx)
    if not nav_items:
        return ""

    post = ctx.get("post") or {}
    slug = post.get("slug", "")
    heading = post.get("title") or slug
    return await _render_to_sx(
        "mobile-menu-section",
        label=heading[:40],
        href=call_url(ctx, "blog_url", f"/{slug}/"),
        level=1,
        items=nav_items,
    )
async def search_mobile_sx(ctx: dict) -> str:
    """Render the mobile search input (``~search-mobile``) as sx wire format.

    Search state is read from *ctx*, defaulting to ``"/"`` for the
    current href and ``"#main-panel"`` for the swap selector.
    """
    params = {
        "current_local_href": ctx.get("current_local_href", "/"),
        "search": ctx.get("search", ""),
        "search_count": ctx.get("search_count", ""),
        "hx_select": ctx.get("hx_select", "#main-panel"),
        "search_headers_mobile": SEARCH_HEADERS_MOBILE,
    }
    return await _render_to_sx("search-mobile", **params)
async def search_desktop_sx(ctx: dict) -> str:
    """Render the desktop search input (``~search-desktop``) as sx wire format.

    Search state is read from *ctx*, defaulting to ``"/"`` for the
    current href and ``"#main-panel"`` for the swap selector.
    """
    params = {
        "current_local_href": ctx.get("current_local_href", "/"),
        "search": ctx.get("search", ""),
        "search_count": ctx.get("search_count", ""),
        "hx_select": ctx.get("hx_select", "#main-panel"),
        "search_headers_desktop": SEARCH_HEADERS_DESKTOP,
    }
    return await _render_to_sx("search-desktop", **params)
async def post_header_sx(ctx: dict, *, oob: bool = False, child: str = "") -> str:
    """Render the post-level header row (``~menu-row-sx``, level 1).

    Returns ``""`` when ``ctx["post"]`` has no slug.  The row label is a
    ``~post-label`` built from the post's feature image and its title
    (truncated to 160 characters).  *child*, when given, is passed as
    pre-rendered sx for the row's child slot; *oob* is forwarded as-is.
    """
    post = ctx.get("post") or {}
    slug = post.get("slug", "")
    if not slug:
        return ""

    label_sx = await _render_to_sx(
        "post-label",
        feature_image=post.get("feature_image"),
        title=(post.get("title") or "")[:160],
    )
    nav_items = await _post_nav_items_sx(ctx)
    child_sx = SxExpr(child) if child else None

    return await _render_to_sx(
        "menu-row-sx",
        id="post-row",
        level=1,
        link_href=call_url(ctx, "blog_url", f"/{slug}/"),
        link_label_content=label_sx,
        nav=nav_items or None,
        child_id="post-header-child",
        child=child_sx,
        oob=oob,
        external=True,
    )
async def post_admin_header_sx(ctx: dict, slug: str, *, oob: bool = False,
                               selected: str = "", admin_href: str = "") -> str:
    """Render the post-admin header row (``~menu-row-sx``, level 2).

    The label is a ``~post-admin-label`` showing the HTML-escaped
    *selected* section (or nothing when *selected* is empty).  When
    *admin_href* is not supplied it is derived from the ``blog_url``
    helper in *ctx*, falling back to the bare ``/<slug>/admin/`` path if
    that helper is not callable.
    """
    selected_label = str(escape(selected)) if selected else None
    label_sx = await _render_to_sx("post-admin-label", selected=selected_label)

    nav_items = await _post_admin_nav_items_sx(ctx, slug, selected)

    if not admin_href:
        admin_path = f"/{slug}/admin/"
        blog_url = ctx.get("blog_url")
        admin_href = blog_url(admin_path) if callable(blog_url) else admin_path

    return await _render_to_sx(
        "menu-row-sx",
        id="post-admin-row",
        level=2,
        link_href=admin_href,
        link_label_content=label_sx,
        nav=nav_items or None,
        child_id="post-admin-header-child",
        oob=oob,
    )
async def oob_header_sx(parent_id: str, child_id: str, row_sx: str) -> str:
    """Wrap an already-rendered header row in an OOB swap (``~oob-header-sx``).

    *child_id* is kept only so existing call sites keep working; it is
    not used, because ``~menu-row-sx`` creates the child placeholder
    itself.
    """
    row = SxExpr(row_sx)
    return await _render_to_sx("oob-header-sx", parent_id=parent_id, row=row)
async def header_child_sx(inner_sx: str, *, id: str = "root-header-child") -> str:
    """Wrap *inner_sx* in a ``~header-child-sx`` element with the given *id*.

    The inner sx is passed through ``_sx_fragment`` before being handed
    to the component.
    """
    inner = _sx_fragment(inner_sx)
    return await _render_to_sx("header-child-sx", id=id, inner=inner)
async def oob_page_sx(*, oobs: str = "", filter: str = "", aside: str = "",
                      content: str = "", menu: str = "") -> str:
    """Build an OOB response (``~oob-sx``) as sx wire format.

    Every argument is pre-rendered sx source.  Empty arguments are passed
    to the component as ``None``; *oobs* is wrapped in a fragment, the
    others are passed through as raw expressions.
    """
    def _expr_or_none(source: str):
        # Empty slots must reach the component as nil, not as "".
        return SxExpr(source) if source else None

    return await _render_to_sx(
        "oob-sx",
        oobs=_sx_fragment(oobs) if oobs else None,
        filter=_expr_or_none(filter),
        aside=_expr_or_none(aside),
        menu=_expr_or_none(menu),
        content=_expr_or_none(content),
    )
async def full_page_sx(ctx: dict, *, header_rows: str,
                       filter: str = "", aside: str = "",
                       content: str = "", menu: str = "",
                       meta_html: str = "", meta: str = "") -> str:
    """Build a full HTML page via ``sx_page()`` around an ``~app-body``.

    meta_html: raw HTML injected into the <head> shell (legacy).
    meta: sx source for meta tags — auto-hoisted to <head> by sx.js.

    When *menu* is empty the mobile nav is generated from *ctx*.  Empty
    slots are passed to ``~app-body`` as ``None``.
    """
    def _expr_or_none(source: str):
        # Empty slots must reach the component as nil, not as "".
        return SxExpr(source) if source else None

    # No explicit menu: fall back to the root mobile nav from the context.
    menu = menu or await mobile_root_nav_sx(ctx)

    body_sx = await _render_to_sx(
        "app-body",
        header_rows=_sx_fragment(header_rows) if header_rows else None,
        filter=_expr_or_none(filter),
        aside=_expr_or_none(aside),
        menu=_expr_or_none(menu),
        content=_expr_or_none(content),
    )

    if meta:
        # Put meta and body in one fragment so sx.js renders both; its
        # auto-hoist then moves meta/title/link elements into <head>.
        body_sx = _sx_fragment(meta, body_sx)

    return sx_page(ctx, body_sx, meta_html=meta_html)
def _build_component_ast(__name: str, **kwargs: Any) -> list:
    """Turn a component name plus Python kwargs into a component-call AST.

    Produces e.g.
    ``[Symbol("~card"), Keyword("title"), "hello", Keyword("count"), 3]``.
    The ``~`` prefix is added to the name when missing and keyword names
    are converted from snake_case to kebab-case.  ``None`` and empty
    ``SxExpr`` values become ``NIL``, non-empty ``SxExpr`` values are
    parsed into AST, and everything else stays a native Python object —
    no SX string is generated here.
    """
    from .parser import parse
    from .types import Symbol, Keyword, NIL

    def _to_node(value: Any) -> Any:
        if value is None:
            return NIL
        if isinstance(value, SxExpr):
            # Pre-serialized SX has to be parsed back into AST form.
            return parse(value.source) if value.source else NIL
        return value

    symbol_name = __name if __name.startswith("~") else f"~{__name}"
    ast: list = [Symbol(symbol_name)]
    for key, value in kwargs.items():
        ast.extend((Keyword(key.replace("_", "-")), _to_node(value)))
    return ast
async def _render_to_sx_with_env(__name: str, extra_env: dict, **kwargs: Any) -> str:
    """Variant of ``_render_to_sx`` that also exposes *extra_env* to the eval.

    Used by ``register_sx_layout`` so .sx defcomps can read ctx values as
    free variables.  *extra_env* is merged into the component environment
    and additionally passed as keyword arguments of the call.

    Evaluation goes through ``async_eval_slot_to_sx`` (not
    ``async_eval_to_sx``) so the top-level component body is expanded
    server-side — free variables from *extra_env* are resolved during
    expansion rather than being serialized as unresolved symbols for the
    client.

    **Private** — service code should use ``sx_call()`` or defmacros instead.
    """
    import os
    from .jinja_bridge import get_component_env, _get_request_context
    from .types import Symbol, Keyword, NIL as _NIL

    # SX_USE_REF=1 selects the reference evaluator.
    if os.environ.get("SX_USE_REF") == "1":
        from .ref.async_eval_ref import async_eval_slot_to_sx
    else:
        from .async_eval import async_eval_slot_to_sx

    def _or_nil(value: Any) -> Any:
        return _NIL if value is None else value

    symbol_name = __name if __name.startswith("~") else f"~{__name}"
    ast: list = [Symbol(symbol_name)]
    # extra_env entries go in as keyword args so _aser_component binds
    # them as params (otherwise it defaults all params to NIL).  Their
    # keys are used verbatim; only **kwargs keys are kebab-cased.
    for env_key, env_value in extra_env.items():
        ast += [Keyword(env_key), _or_nil(env_value)]
    for key, value in kwargs.items():
        ast += [Keyword(key.replace("_", "-")), _or_nil(value)]

    env = dict(get_component_env())
    env.update(extra_env)
    request_ctx = _get_request_context()
    rendered = await async_eval_slot_to_sx(ast, env, request_ctx)
    return SxExpr(rendered)
async def _render_to_sx(__name: str, **kwargs: Any) -> str:
    """Evaluate a defcomp call and return its SX wire format.

    The call is built as an AST from Python values (no SX string
    literals) and run through the SX evaluator, which resolves IO
    primitives and serializes component/tag calls as SX wire format.
    The result is wrapped in ``SxExpr``.

    **Private** — service code should use ``sx_call()`` or defmacros instead.
    Only infrastructure code (helpers.py, layouts.py) should call this.
    """
    import os
    from .jinja_bridge import get_component_env, _get_request_context

    # SX_USE_REF=1 selects the reference evaluator.
    if os.environ.get("SX_USE_REF") == "1":
        from .ref.async_eval_ref import async_eval_to_sx
    else:
        from .async_eval import async_eval_to_sx

    call_ast = _build_component_ast(__name, **kwargs)
    # Copy so evaluation cannot mutate the shared component environment.
    env = dict(get_component_env())
    request_ctx = _get_request_context()
    rendered = await async_eval_to_sx(call_ast, env, request_ctx)
    return SxExpr(rendered)
# Backwards-compat alias — layout infrastructure still imports the helper
# under this public (non-underscored) name.
# Will be removed once all layouts use register_sx_layout().
render_to_sx_with_env = _render_to_sx_with_env
async def render_to_html(__name: str, **kwargs: Any) -> str:
    """Evaluate a defcomp call and return rendered HTML.

    Builds the same AST as ``_render_to_sx()`` (no SX string literals)
    but renders it to HTML instead of SX wire format.  Used by route
    renders that need HTML (full pages, fragments).
    """
    import os
    from .jinja_bridge import get_component_env, _get_request_context

    # SX_USE_REF=1 selects the reference evaluator.
    if os.environ.get("SX_USE_REF") == "1":
        from .ref.async_eval_ref import async_render
    else:
        from .async_eval import async_render

    call_ast = _build_component_ast(__name, **kwargs)
    # Copy so rendering cannot mutate the shared component environment.
    env = dict(get_component_env())
    request_ctx = _get_request_context()
    return await async_render(call_ast, env, request_ctx)
def sx_call(component_name: str, **kwargs: Any) -> str:
    """Serialize a component call from Python kwargs into SX source.

    Keyword names are converted from snake_case to kebab-case and the
    ``~`` prefix is added to the component name when missing::

        sx_call("test-row", nodeid="foo", outcome="passed")
        # => '(~test-row :nodeid "foo" :outcome "passed")'

    Values go through the parser's ``serialize``: strings are quoted,
    None becomes nil, bools become true/false, numbers stay as-is.
    A list value is emitted as ``(list ...)`` with ``None`` items
    dropped (or ``nil`` if nothing is left), so the client gets an
    iterable array rather than a rendered fragment.  The result is
    wrapped in ``SxExpr``.
    """
    from .parser import serialize, SxExpr

    def _serialize_value(value: Any) -> str:
        if not isinstance(value, list):
            return serialize(value)
        rendered = [serialize(item) for item in value if item is not None]
        if not rendered:
            return "nil"
        return "(list " + " ".join(rendered) + ")"

    if component_name.startswith("~"):
        tokens = [component_name]
    else:
        tokens = [f"~{component_name}"]
    for key, value in kwargs.items():
        tokens.append(":" + key.replace("_", "-"))
        tokens.append(_serialize_value(value))
    return SxExpr("(" + " ".join(tokens) + ")")
def components_for_request(source: str = "") -> str:
    """Return defcomp/defmacro source for definitions the client lacks.

    Reads the ``SX-Components`` request header (comma-separated names
    such as ``~card,~nav-item``) and returns, one definition per line,
    only what the client is missing.  Whitespace around header entries is
    ignored, so ``~card, ~nav-item`` is understood as well.

    If *source* is provided, only components needed for that source
    (plus transitive deps, as computed by ``components_needed``) are
    considered.  If the header is absent, all needed definitions are
    returned.
    """
    from quart import request
    from .jinja_bridge import _COMPONENT_ENV
    from .deps import components_needed
    from .types import Component, Macro
    from .parser import serialize

    # ``None`` means "no filtering": every registered component qualifies.
    needed = components_needed(source, _COMPONENT_ENV) if source else None

    loaded_raw = request.headers.get("SX-Components", "")
    # Trim each entry: list-valued headers are commonly written with a
    # space after the comma, and an untrimmed " ~card" would never match.
    loaded = {name.strip() for name in loaded_raw.split(",")} - {""}

    parts = []
    for key, val in _COMPONENT_ENV.items():
        if isinstance(val, Component):
            comp_name = f"~{val.name}"
            # Skip if not needed for this page
            if needed is not None and comp_name not in needed and key not in needed:
                continue
            # Skip components the client already has (with or without "~")
            if comp_name in loaded or val.name in loaded:
                continue
            # Reconstruct defcomp source
            param_strs = ["&key", *val.params]
            if val.has_children:
                param_strs.extend(["&rest", "children"])
            params_sx = "(" + " ".join(param_strs) + ")"
            body_sx = serialize(val.body, pretty=True)
            parts.append(f"(defcomp ~{val.name} {params_sx} {body_sx})")
        elif isinstance(val, Macro):
            # NOTE(review): macros are not filtered by ``needed`` — every
            # macro the client lacks is sent.  Confirm this is intended.
            if val.name in loaded:
                continue
            param_strs = list(val.params)
            if val.rest_param:
                param_strs.extend(["&rest", val.rest_param])
            params_sx = "(" + " ".join(param_strs) + ")"
            body_sx = serialize(val.body, pretty=True)
            parts.append(f"(defmacro {val.name} {params_sx} {body_sx})")
    return "\n".join(parts)
def sx_response(source: str, status: int = 200,
                headers: dict | None = None):
    """Build an s-expression wire-format (``text/sx``) response.

    Takes a raw sx string::

        return sx_response('(~test-row :nodeid "foo")')

    A parse failure of *source* is logged but does not stop the response.
    For SX requests (``SX-Request`` header), missing component definitions
    are prepended as a ``<script type="text/sx" data-components>`` block
    so the client can process them before rendering OOB content.  When
    the CSS registry is loaded, rules for classes the client does not yet
    know (per its ``SX-Css`` header) are prepended in a
    ``<style data-sx-css>`` block and reported via the ``SX-Css-Add`` and
    ``SX-Css-Hash`` response headers.  Entries in *headers* are applied
    last.
    """
    from quart import request, Response

    request_headers = request.headers

    # Validate that the sx source parses; report, but still respond.
    try:
        from .parser import parse as _parse_check
        _parse_check(source)
    except Exception as parse_err:
        import logging
        logging.getLogger("sx").error(
            "sx_response parse error: %s\nSource (first 500): %s",
            parse_err, source[:500])

    body = source

    # For SX requests, prepend missing component definitions.
    comp_defs = ""
    if request_headers.get("SX-Request"):
        comp_defs = components_for_request(source)
    if comp_defs:
        body = f'<script type="text/sx" data-components>{comp_defs}</script>\n{body}'

    # On-demand CSS: scan source for classes, send only new rules.
    from .css_registry import scan_classes_from_sx, lookup_rules, registry_loaded, lookup_css_hash, store_css_hash
    new_classes: set[str] = set()
    cumulative_classes: set[str] = set()
    if registry_loaded():
        new_classes = scan_classes_from_sx(source)
        if comp_defs:
            # Only the component definitions actually being sent count.
            new_classes.update(scan_classes_from_sx(comp_defs))

        # The SX-Css header carries either a short hash of the known
        # class set (16 characters or fewer) or the full class list.
        known_raw = request_headers.get("SX-Css", "")
        if not known_raw:
            known_classes: set[str] = set()
        elif len(known_raw) > 16:
            known_classes = set(known_raw.split(","))
        else:
            cached = lookup_css_hash(known_raw)
            # Unknown hash: assume nothing is known and send every class
            # (safe fallback).
            known_classes = set() if cached is None else cached

        cumulative_classes = known_classes | new_classes
        new_classes -= known_classes

        new_rules = lookup_rules(new_classes) if new_classes else None
        if new_rules:
            body = f'<style data-sx-css>{new_rules}</style>\n{body}'

    # Dev mode: pretty-print sx source for readable Network tab responses.
    if _is_dev_mode():
        body = _pretty_print_sx_body(body)

    resp = Response(body, status=status, content_type="text/sx")
    if new_classes:
        resp.headers["SX-Css-Add"] = ",".join(sorted(new_classes))
    if cumulative_classes:
        resp.headers["SX-Css-Hash"] = store_css_hash(cumulative_classes)
    for header_name, header_value in (headers or {}).items():
        resp.headers[header_name] = header_value
    return resp
# ---------------------------------------------------------------------------
# Sx wire-format full page shell
# ---------------------------------------------------------------------------
# HTML shell filled in by ``sx_page()`` through ``str.format()``.
# Single-brace ``{name}`` fields are placeholders; literal CSS/JS braces
# are doubled (``{{`` / ``}}``) so that ``format()`` emits them as single
# braces.
_SX_PAGE_TEMPLATE = """\
<!doctype html>
<html lang="en">
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
<meta name="robots" content="index,follow">
<meta name="theme-color" content="#ffffff">
<title>{title}</title>
{meta_html}
<style>@media (min-width: 768px) {{ .js-mobile-sentinel {{ display:none !important; }} }}</style>
<meta name="csrf-token" content="{csrf}">
<style id="sx-css">{sx_css}</style>
<meta name="sx-css-classes" content="{sx_css_classes}">
<script src="https://unpkg.com/prismjs/prism.js"></script>
<script src="https://unpkg.com/prismjs/components/prism-javascript.min.js"></script>
<script src="https://unpkg.com/prismjs/components/prism-python.min.js"></script>
<script src="https://unpkg.com/prismjs/components/prism-bash.min.js"></script>
<script src="https://cdn.jsdelivr.net/npm/sweetalert2@11"></script>
<script>if(matchMedia('(hover:hover) and (pointer:fine)').matches){{document.documentElement.classList.add('hover-capable')}}</script>
<script>document.addEventListener('click',function(e){{var t=e.target.closest('[data-close-details]');if(!t)return;var d=t.closest('details');if(d)d.removeAttribute('open')}})</script>
<style>
details[data-toggle-group="mobile-panels"]>summary{{list-style:none}}
details[data-toggle-group="mobile-panels"]>summary::-webkit-details-marker{{display:none}}
@media(min-width:768px){{.nav-group:focus-within .submenu,.nav-group:hover .submenu{{display:block}}}}
img{{max-width:100%;height:auto}}
.clamp-2{{display:-webkit-box;-webkit-line-clamp:2;-webkit-box-orient:vertical;overflow:hidden}}
.clamp-3{{display:-webkit-box;-webkit-line-clamp:3;-webkit-box-orient:vertical;overflow:hidden}}
.no-scrollbar::-webkit-scrollbar{{display:none}}.no-scrollbar{{-ms-overflow-style:none;scrollbar-width:none}}
details.group{{overflow:hidden}}details.group>summary{{list-style:none}}details.group>summary::-webkit-details-marker{{display:none}}
.sx-indicator{{display:none}}.sx-request .sx-indicator{{display:inline-flex}}
.sx-error .sx-indicator{{display:none}}.sx-loading .sx-indicator{{display:inline-flex}}
.js-wrap.open .js-pop{{display:block}}.js-wrap.open .js-backdrop{{display:block}}
</style>
</head>
<body class="bg-stone-50 text-stone-900">
<script type="text/sx-styles" data-hash="{styles_hash}">{styles_json}</script>
<script type="text/sx" data-components data-hash="{component_hash}">{component_defs}</script>
<script type="text/sx-pages">{pages_sx}</script>
<script type="text/sx" data-mount="body">{page_sx}</script>
<script src="{asset_url}/scripts/sx-browser.js?v={sx_js_hash}"></script>
<script src="{asset_url}/scripts/body.js?v={body_js_hash}"></script>
</body>
</html>"""
def _build_pages_sx(service: str) -> str:
    """Build the SX page registry used for client-side routing.

    Returns one SX dict literal per page of *service*, newline-separated,
    parseable by the client's ``parse`` function.  Each dict has the keys
    ``name``, ``path``, ``auth``, ``has-data``, ``content`` and
    ``closure``.  Returns ``""`` when the service has no pages.

    Serializing a page's content expression is best-effort: on failure a
    warning is logged and the page is emitted with empty content.
    """
    import logging
    from .pages import get_all_pages
    from .parser import serialize as sx_serialize

    pages = get_all_pages(service)
    if not pages:
        return ""

    log = logging.getLogger("sx")
    entries = []
    for page_def in pages.values():
        content_src = ""
        if page_def.content_expr is not None:
            try:
                content_src = sx_serialize(page_def.content_expr)
            except Exception as exc:
                # Keep the page in the registry, but leave a trace of why
                # its content is empty instead of failing silently.
                log.warning("Failed to serialize content for page %r: %s",
                            page_def.name, exc)

        # Non-string auth values are reported to the client as "custom".
        auth = page_def.auth if isinstance(page_def.auth, str) else "custom"
        has_data = "true" if page_def.data_expr is not None else "false"

        # Build closure as SX dict; only plain scalar values are exported.
        closure_parts = [
            f":{k} {_sx_literal(v)}"
            for k, v in page_def.closure.items()
            if isinstance(v, (str, int, float, bool))
        ]
        closure_sx = "{" + " ".join(closure_parts) + "}"

        entries.append(
            "{:name " + _sx_literal(page_def.name)
            + " :path " + _sx_literal(page_def.path)
            + " :auth " + _sx_literal(auth)
            + " :has-data " + has_data
            + " :content " + _sx_literal(content_src)
            + " :closure " + closure_sx + "}"
        )
    return "\n".join(entries)
def _sx_literal(v: object) -> str:
    """Serialize a Python scalar as an SX literal.

    Bools become ``true``/``false``, ints and floats use ``str()``, and
    strings are double-quoted with backslash, double quote and newline
    escaped.  ``None`` and any unsupported type become ``nil``.
    """
    # bool must be tested before int: bool is a subclass of int.
    if isinstance(v, bool):
        return "true" if v else "false"
    if isinstance(v, (int, float)):
        return str(v)
    if isinstance(v, str):
        body = v
        # Backslashes first, so the escapes added below are not doubled.
        for raw, quoted in (("\\", "\\\\"), ('"', '\\"'), ("\n", "\\n")):
            body = body.replace(raw, quoted)
        return '"' + body + '"'
    return "nil"
def sx_page(ctx: dict, page_sx: str, *,
            meta_html: str = "") -> str:
    """Return a minimal HTML shell that boots the page from sx source.

    The browser loads component definitions and page sx, then sx.js
    renders everything client-side.  CSS rules are scanned from the sx
    source and component defs, then injected as a <style> block.

    Component definitions and the style dictionary are omitted when the
    client's cookies show it already holds the current versions (never in
    dev mode).  *meta_html* is inserted into ``<head>`` verbatim.
    """
    from .jinja_bridge import components_for_page, css_classes_for_page
    from .css_registry import lookup_rules, get_preamble, registry_loaded, store_css_hash

    dev_mode = _is_dev_mode()

    # Per-page component bundle: only definitions this page needs.
    component_defs, component_hash = components_for_page(page_sx)
    # If the client's cookie matches the current hash it already has the
    # components cached, so send an empty source.  In dev mode always
    # send the full source so edits are visible immediately.
    client_hash = _get_sx_comp_cookie()
    if not dev_mode and client_hash and client_hash == component_hash:
        component_defs = ""

    # Scan for CSS classes — only from components this page uses + page source.
    sx_css = ""
    sx_css_classes = ""
    if registry_loaded():
        classes = css_classes_for_page(page_sx)
        # Always include the classes used on <body> in the template.
        classes.update(["bg-stone-50", "text-stone-900"])
        sx_css = get_preamble() + lookup_rules(classes)
        # The meta tag carries the hash of the class set, not the list.
        sx_css_classes = store_css_hash(classes)

    asset_url = get_asset_url(ctx)
    title = ctx.get("base_title", "Rose Ash")
    csrf = _get_csrf_token()

    # Dev mode: pretty-print page sx for readable View Source.
    if dev_mode and page_sx and page_sx.startswith("("):
        from .parser import parse as _parse, serialize as _serialize
        try:
            page_sx = _serialize(_parse(page_sx), pretty=True)
        except Exception:
            # Cosmetic only — fall back to the unformatted source.
            pass

    # Style dictionary for client-side css primitive.
    styles_hash = _get_style_dict_hash()
    client_styles_hash = _get_sx_styles_cookie()
    if not dev_mode and client_styles_hash and client_styles_hash == styles_hash:
        styles_json = ""  # Client has cached version
    else:
        styles_json = _build_style_dict_json()

    # Page registry for client-side routing.
    pages_sx = ""
    try:
        from quart import current_app
        pages_sx = _build_pages_sx(current_app.name)
    except Exception as e:
        import logging
        logging.getLogger("sx").warning("Failed to build page registry: %s", e)

    # str.format() never re-parses the *values* it substitutes; only the
    # template itself is scanned for ``{...}`` fields.  The SX dict
    # literals in pages_sx must therefore be passed through untouched —
    # doubling their braces would emit ``{{...}}`` into the page and
    # corrupt the registry.
    return _SX_PAGE_TEMPLATE.format(
        title=_html_escape(title),
        asset_url=asset_url,
        meta_html=meta_html,
        csrf=_html_escape(csrf),
        component_hash=component_hash,
        component_defs=component_defs,
        styles_hash=styles_hash,
        styles_json=styles_json,
        pages_sx=pages_sx,
        page_sx=page_sx,
        sx_css=sx_css,
        sx_css_classes=sx_css_classes,
        sx_js_hash=_script_hash("sx-browser.js"),
        body_js_hash=_script_hash("body.js"),
    )
# Per-process cache filled by _script_hash(): script filename -> short hash
# (or "dev" when the file could not be hashed).
_SCRIPT_HASH_CACHE: dict[str, str] = {}
# Memoised style dictionary JSON and its short hash; both are set together
# by _build_style_dict_json() on first use ("" means "not built yet").
_STYLE_DICT_JSON: str = ""
_STYLE_DICT_HASH: str = ""
def _build_style_dict_json() -> str:
    """Return the compact JSON style dictionary for the client css primitive.

    The JSON is built once per process and memoised in
    ``_STYLE_DICT_JSON``; its short MD5-based hash is stored alongside in
    ``_STYLE_DICT_HASH``.  Subsequent calls return the cached string.
    """
    global _STYLE_DICT_JSON, _STYLE_DICT_HASH
    if _STYLE_DICT_JSON:
        return _STYLE_DICT_JSON

    import json
    from .style_dict import (
        STYLE_ATOMS, PSEUDO_VARIANTS, RESPONSIVE_BREAKPOINTS,
        KEYFRAMES, ARBITRARY_PATTERNS, CHILD_SELECTOR_ATOMS,
    )

    # Collect which of the known child-selector prefixes actually occur,
    # e.g. "space-y-4" → "space-y-", "divide-y-2" → "divide-y".  The
    # candidates are mutually exclusive, so each atom matches at most one.
    candidate_prefixes = ("space-x-", "space-y-", "divide-x", "divide-y")
    prefixes = {
        prefix
        for atom in CHILD_SELECTOR_ATOMS
        for prefix in candidate_prefixes
        if atom.startswith(prefix)
    }

    # Single-letter keys keep the payload small.
    payload = {
        "a": STYLE_ATOMS,
        "v": PSEUDO_VARIANTS,
        "b": RESPONSIVE_BREAKPOINTS,
        "k": KEYFRAMES,
        "p": ARBITRARY_PATTERNS,
        "c": sorted(prefixes),
    }
    encoded = json.dumps(payload, separators=(",", ":"))
    _STYLE_DICT_JSON = encoded
    _STYLE_DICT_HASH = hashlib.md5(encoded.encode()).hexdigest()[:8]
    return encoded
def _get_style_dict_hash() -> str:
    """Return the short hash of the style dictionary JSON.

    Builds the dictionary first if that has not happened yet.
    """
    if _STYLE_DICT_HASH:
        return _STYLE_DICT_HASH
    # Building the JSON populates the module-level hash as a side effect.
    _build_style_dict_json()
    return _STYLE_DICT_HASH
def _get_sx_styles_cookie() -> str:
    """Return the ``sx-styles-hash`` cookie of the current request.

    Any failure to read it (including the import) yields ``""``.
    """
    cookie_name = "sx-styles-hash"
    try:
        from quart import request
        value = request.cookies.get(cookie_name, "")
    except Exception:
        value = ""
    return value
def _script_hash(filename: str) -> str:
    """Return a short MD5-based hash of ``static/scripts/<filename>``.

    The result is cached in ``_SCRIPT_HASH_CACHE`` for the lifetime of
    the process.  If the file cannot be read or hashed, ``"dev"`` is
    cached and returned instead.
    """
    cached = _SCRIPT_HASH_CACHE.get(filename)
    if cached is not None:
        return cached
    try:
        script_path = Path("static") / "scripts" / filename
        digest = hashlib.md5(script_path.read_bytes()).hexdigest()[:8]
    except Exception:
        digest = "dev"
    _SCRIPT_HASH_CACHE[filename] = digest
    return digest
def _get_csrf_token() -> str:
    """Return ``g.csrf_token`` for the current request context.

    Falls back to ``""`` when the attribute is missing or when the
    request globals cannot be accessed at all.
    """
    try:
        from quart import g
        token = getattr(g, "csrf_token", "")
    except Exception:
        token = ""
    return token
def _get_sx_comp_cookie() -> str:
    """Return the ``sx-comp-hash`` cookie of the current request.

    Any failure to read it (including the import) yields ``""``.
    """
    cookie_name = "sx-comp-hash"
    try:
        from quart import request
        value = request.cookies.get(cookie_name, "")
    except Exception:
        value = ""
    return value
def _is_dev_mode() -> bool:
    """Report whether dev mode is on, i.e. the env var ``RELOAD`` is ``"true"``."""
    import os

    reload_flag = os.environ.get("RELOAD")
    return reload_flag == "true"
def _pretty_print_sx_body(body: str) -> str:
    """Pretty-print the sx portion of a response body, preserving HTML blocks.

    Leading ``<style>`` / ``<script>`` blocks are kept verbatim; the sx
    source that follows them is parsed and re-serialized with
    ``pretty=True``.  Blocks and expressions are separated by blank
    lines.  *body* is returned unchanged when no sx expression follows
    the HTML prefix or when parsing/serializing fails.
    """
    import re
    from .parser import parse_all as _parse_all, serialize as _serialize

    # One complete <style>...</style> or <script>...</script> block.
    html_block = re.compile(r'<(style|script)[^>]*>[\s\S]*?</\1>')

    # Peel the HTML prefix blocks off the front; each one is followed by
    # a newline before the next block or the sx tail.
    parts: list[str] = []
    rest = body
    while rest.startswith("<"):
        block_match = html_block.match(rest)
        if block_match is None:
            break
        parts.append(block_match.group(0))
        rest = rest[block_match.end():].lstrip("\n")

    sx_source = rest.strip()
    if not sx_source.startswith("("):
        return body

    try:
        exprs = _parse_all(sx_source)
        # Multiple top-level expressions are separated by a blank line.
        parts.append("\n\n".join(_serialize(expr, pretty=True) for expr in exprs))
        return "\n\n".join(parts)
    except Exception:
        # Pretty-printing is cosmetic; never let it break the response.
        return body
def _html_escape(s: str) -> str:
    """Minimal HTML escaping for attribute values.

    Replaces ``&``, ``<``, ``>`` and ``"`` with their entities; single
    quotes are left untouched.
    """
    entities = str.maketrans({
        "&": "&amp;",
        "<": "&lt;",
        ">": "&gt;",
        '"': "&quot;",
    })
    # A single translate pass never re-escapes the "&" of an entity it
    # has just produced.
    return s.translate(entities)