This was silently masking the str.format() braces bug. If page registry building fails, it should crash visibly, not serve a broken page with 0 routes. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
912 lines
34 KiB
Python
912 lines
34 KiB
Python
"""
|
|
Shared helper functions for s-expression page rendering.

These are used by per-service sxc/pages modules to build common
page elements (headers, search, etc.) from template context.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import hashlib
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
from markupsafe import escape
|
|
|
|
from .page import SEARCH_HEADERS_MOBILE, SEARCH_HEADERS_DESKTOP
|
|
from .parser import SxExpr
|
|
|
|
|
|
def _sx_fragment(*parts: str) -> SxExpr:
    """Combine already-serialized SX strings into a single ``(<> ...)`` fragment.

    Falsy parts are dropped. When nothing is left, an empty ``SxExpr`` is
    returned instead of an empty fragment wrapper.

    Infrastructure utility for composing pre-rendered SX wire format only —
    NOT for building SX from Python data (use sx_call() or _render_to_sx()).
    """
    body = " ".join(filter(None, parts))
    if not body:
        return SxExpr("")
    return SxExpr(f"(<> {body})")
|
|
|
|
|
|
def call_url(ctx: dict, key: str, path: str = "/") -> str:
    """Resolve *path* through the URL helper stored under *key* in *ctx*.

    When ``ctx[key]`` is callable (e.g. ``blog_url``, ``account_url``) it is
    called with *path* and its result returned. Otherwise the value is treated
    as a base string (missing/falsy values count as empty) and *path* is
    appended to it.
    """
    helper = ctx.get(key)
    if not callable(helper):
        base = str(helper) if helper else ""
        return base + path
    return helper(path)
|
|
|
|
|
|
def get_asset_url(ctx: dict) -> str:
    """Return the asset URL base found under ``asset_url`` in *ctx*.

    A callable helper is invoked with an empty path and everything from the
    last ``/`` onwards is cut off (the result is returned untouched when it
    contains no ``/``). A non-callable value is returned as-is, with falsy
    values mapped to ``""``.
    """
    helper = ctx.get("asset_url")
    if not callable(helper):
        return helper or ""
    resolved = helper("")
    head, slash, _tail = resolved.rpartition("/")
    return head if slash else resolved
|
|
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Sx-native helper functions — return sx source (not HTML)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _as_sx(val: Any) -> SxExpr | None:
    """Normalise a fragment value into an ``SxExpr`` (or ``None``).

    * falsy values                    -> ``None``
    * ``SxExpr`` with a source        -> returned unchanged
    * ``SxExpr`` with an empty source -> ``None``
    * anything else                   -> stringified and wrapped in a
      ``~rich-text`` component call (treated as HTML from a ``text/html``
      fragment)
    """
    if not val:
        return None
    if not isinstance(val, SxExpr):
        return sx_call("rich-text", html=str(val))
    if val.source:
        return val
    return None
|
|
|
|
|
|
async def root_header_sx(ctx: dict, *, oob: bool = False) -> str:
    """Render the root header row (``header-row-sx``) as sx wire format.

    The admin flag is read from ``ctx["rights"]``, which may be a dict or an
    object exposing an ``admin`` attribute. The settings URL is only filled
    in for admins; everyone else gets an empty string.
    """
    rights = ctx.get("rights") or {}
    if isinstance(rights, dict):
        is_admin = rights.get("admin")
    else:
        is_admin = getattr(rights, "admin", False)

    settings_url = ""
    if is_admin:
        settings_url = call_url(ctx, "blog_url", "/settings/")

    # Keyword order is kept stable: it is the order the component call is built in.
    props = {
        "cart_mini": _as_sx(ctx.get("cart_mini")),
        "blog_url": call_url(ctx, "blog_url", ""),
        "site_title": ctx.get("base_title", ""),
        "app_label": ctx.get("app_label", ""),
        "nav_tree": _as_sx(ctx.get("nav_tree")),
        "auth_menu": _as_sx(ctx.get("auth_menu")),
        "nav_panel": _as_sx(ctx.get("nav_panel")),
        "settings_url": settings_url,
        "is_admin": is_admin,
        "oob": oob,
    }
    return await _render_to_sx("header-row-sx", **props)
|
|
|
|
|
|
def mobile_menu_sx(*sections: str) -> SxExpr:
    """Join pre-built mobile menu sections (deepest first) into one fragment.

    Falsy sections are ignored; with no usable section an empty ``SxExpr``
    is returned.
    """
    usable = list(filter(None, sections))
    if not usable:
        return SxExpr("")
    return _sx_fragment(*usable)
|
|
|
|
|
|
async def mobile_root_nav_sx(ctx: dict) -> str:
    """Render the root-level mobile nav via the ``mobile-root-nav`` component.

    Returns ``""`` when *ctx* carries neither ``nav_tree`` nor ``auth_menu``.
    """
    tree = ctx.get("nav_tree") or ""
    auth = ctx.get("auth_menu") or ""
    if not (tree or auth):
        return ""
    return await _render_to_sx(
        "mobile-root-nav",
        nav_tree=_as_sx(tree),
        auth_menu=_as_sx(auth),
    )
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Shared nav-item builders — used by BOTH desktop headers and mobile menus
|
|
# ---------------------------------------------------------------------------
|
|
|
|
async def _post_nav_items_sx(ctx: dict) -> SxExpr:
    """Assemble the post-level nav items shared by desktop and mobile menus.

    Used by ``post_header_sx`` (desktop) and ``post_mobile_nav_sx`` (mobile).
    Emits, in order:

    1. a page cart badge when ``page_cart_count`` is positive,
    2. the wrapped ``container_nav`` when it holds more than an empty fragment,
    3. an admin cog — the pre-built ``post_admin_nav`` from *ctx* if present,
       otherwise one rendered here for users whose ``rights`` include admin.

    Returns an empty ``SxExpr`` when the post has no slug or nothing applies.
    """
    slug = (ctx.get("post") or {}).get("slug", "")
    if not slug:
        return SxExpr("")

    items: list[str] = []

    cart_count = ctx.get("page_cart_count", 0)
    if cart_count and cart_count > 0:
        badge_href = call_url(ctx, "cart_url", f"/{slug}/")
        badge = await _render_to_sx(
            "page-cart-badge", href=badge_href, count=str(cart_count)
        )
        items.append(badge)

    raw_nav = str(ctx.get("container_nav") or "").strip()
    # An empty fragment wrapper such as "(<> )" carries no content — skip it.
    nav_payload = raw_nav.replace("(<>", "").replace(")", "").strip()
    if nav_payload:
        wrapped = await _render_to_sx(
            "container-nav-wrapper", content=SxExpr(raw_nav)
        )
        items.append(wrapped)

    cog = ctx.get("post_admin_nav")
    if not cog:
        rights = ctx.get("rights") or {}
        if isinstance(rights, dict):
            may_admin = rights.get("admin")
        else:
            may_admin = getattr(rights, "admin", False)
        # slug is guaranteed non-empty here (see the early return above).
        if may_admin:
            from quart import request

            cog_href = call_url(ctx, "blog_url", f"/{slug}/admin/")
            on_admin_page = ctx.get("is_admin_section") or "/admin" in request.path
            cog = await _render_to_sx(
                "admin-cog-button",
                href=cog_href,
                is_admin_page=on_admin_page or None,
            )
    if cog:
        items.append(cog)

    if not items:
        return SxExpr("")
    return _sx_fragment(*items)
|
|
|
|
|
|
async def _post_admin_nav_items_sx(ctx: dict, slug: str,
                                   selected: str = "") -> SxExpr:
    """Build the post-admin nav links (calendars, markets, payments, ...).

    Shared by ``post_admin_header_sx`` (desktop) and the mobile menu. Each
    entry is rendered as a ``nav-link`` through the URL helper named by its
    context key; entries whose helper is missing or not callable are skipped.
    The entry whose label equals *selected* is flagged as selected.
    """
    admin_base = f"/{slug}/admin/"
    # (context key of the URL helper, path below the admin base, label)
    entries = (
        ("events_url", "", "calendars"),
        ("market_url", "", "markets"),
        ("cart_url", "payments/", "payments"),
        ("blog_url", "entries/", "entries"),
        ("blog_url", "data/", "data"),
        ("blog_url", "preview/", "preview"),
        ("blog_url", "edit/", "edit"),
        ("blog_url", "settings/", "settings"),
    )
    colours = ctx.get("select_colours", "")

    links: list[str] = []
    for helper_key, suffix, label in entries:
        helper = ctx.get(helper_key)
        if callable(helper):
            link = await _render_to_sx(
                "nav-link",
                href=helper(admin_base + suffix),
                label=label,
                select_colours=colours,
                # Pass nil rather than false for unselected entries.
                is_selected=(label == selected) or None,
            )
            links.append(link)

    if not links:
        return SxExpr("")
    return _sx_fragment(*links)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Mobile menu section builders — wrap shared nav items for hamburger panel
|
|
# ---------------------------------------------------------------------------
|
|
|
|
async def post_mobile_nav_sx(ctx: dict) -> str:
    """Render the post-level section of the mobile menu.

    Returns ``""`` when the post has no nav items. The section label is the
    post title (falling back to the slug), cut to 40 characters.
    """
    nav_items = await _post_nav_items_sx(ctx)
    if not nav_items:
        return ""

    post = ctx.get("post") or {}
    slug = post.get("slug", "")
    heading = post.get("title") or slug
    return await _render_to_sx(
        "mobile-menu-section",
        label=heading[:40],
        href=call_url(ctx, "blog_url", f"/{slug}/"),
        level=1,
        items=nav_items,
    )
|
|
|
|
|
|
|
|
async def search_mobile_sx(ctx: dict) -> str:
    """Render the mobile search input (``search-mobile``) as sx wire format.

    Search state is read from *ctx*, with defaults for missing keys.
    """
    props = {
        "current_local_href": ctx.get("current_local_href", "/"),
        "search": ctx.get("search", ""),
        "search_count": ctx.get("search_count", ""),
        "hx_select": ctx.get("hx_select", "#main-panel"),
        "search_headers_mobile": SEARCH_HEADERS_MOBILE,
    }
    return await _render_to_sx("search-mobile", **props)
|
|
|
|
|
|
async def search_desktop_sx(ctx: dict) -> str:
    """Render the desktop search input (``search-desktop``) as sx wire format.

    Search state is read from *ctx*, with defaults for missing keys.
    """
    props = {
        "current_local_href": ctx.get("current_local_href", "/"),
        "search": ctx.get("search", ""),
        "search_count": ctx.get("search_count", ""),
        "hx_select": ctx.get("hx_select", "#main-panel"),
        "search_headers_desktop": SEARCH_HEADERS_DESKTOP,
    }
    return await _render_to_sx("search-desktop", **props)
|
|
|
|
|
|
async def post_header_sx(ctx: dict, *, oob: bool = False, child: str = "") -> str:
    """Render the post-level header row (``menu-row-sx``) as sx wire format.

    Returns ``""`` when the post in *ctx* has no slug. *child*, when given,
    is treated as sx source and passed as the row's child.
    """
    post = ctx.get("post") or {}
    slug = post.get("slug", "")
    if not slug:
        return ""

    label = await _render_to_sx(
        "post-label",
        feature_image=post.get("feature_image"),
        title=(post.get("title") or "")[:160],
    )
    # An empty nav expression is passed as nil.
    nav = await _post_nav_items_sx(ctx) or None
    post_href = call_url(ctx, "blog_url", f"/{slug}/")

    row_props = {
        "id": "post-row",
        "level": 1,
        "link_href": post_href,
        "link_label_content": label,
        "nav": nav,
        "child_id": "post-header-child",
        "child": SxExpr(child) if child else None,
        "oob": oob,
        "external": True,
    }
    return await _render_to_sx("menu-row-sx", **row_props)
|
|
|
|
|
|
async def post_admin_header_sx(ctx: dict, slug: str, *, oob: bool = False,
                               selected: str = "", admin_href: str = "") -> str:
    """Render the post admin header row (``menu-row-sx``) as sx wire format.

    When *admin_href* is not supplied it is derived from the ``blog_url``
    helper in *ctx*, falling back to the bare ``/{slug}/admin/`` path when
    that helper is not callable.
    """
    escaped_selected = str(escape(selected)) if selected else None
    label = await _render_to_sx("post-admin-label", selected=escaped_selected)

    # An empty nav expression is passed as nil.
    nav = await _post_admin_nav_items_sx(ctx, slug, selected) or None

    if not admin_href:
        admin_path = f"/{slug}/admin/"
        blog_helper = ctx.get("blog_url")
        if callable(blog_helper):
            admin_href = blog_helper(admin_path)
        else:
            admin_href = admin_path

    return await _render_to_sx(
        "menu-row-sx",
        id="post-admin-row",
        level=2,
        link_href=admin_href,
        link_label_content=label,
        nav=nav,
        child_id="post-admin-header-child",
        oob=oob,
    )
|
|
|
|
|
|
async def oob_header_sx(parent_id: str, child_id: str, row_sx: str) -> str:
    """Wrap a header row's sx in an out-of-band swap (``oob-header-sx``).

    *child_id* is kept only so existing call sites keep working; it is not
    used here — the child placeholder is created by ~menu-row-sx itself.
    """
    row_expr = SxExpr(row_sx)
    return await _render_to_sx("oob-header-sx", parent_id=parent_id, row=row_expr)
|
|
|
|
|
|
async def header_child_sx(inner_sx: str, *, id: str = "root-header-child") -> str:
    """Wrap *inner_sx* in a header-child container (``header-child-sx``)."""
    wrapped_inner = _sx_fragment(inner_sx)
    return await _render_to_sx("header-child-sx", id=id, inner=wrapped_inner)
|
|
|
|
|
|
async def oob_page_sx(*, oobs: str = "", filter: str = "", aside: str = "",
                      content: str = "", menu: str = "") -> str:
    """Build an out-of-band response (``oob-sx``) as sx wire format.

    Every slot is optional; an empty slot is passed to the component as nil.
    *oobs* is wrapped in a fragment, the other slots are taken as sx source.
    """
    def _slot(source: str):
        # Empty slot -> nil, otherwise raw sx source.
        return SxExpr(source) if source else None

    return await _render_to_sx(
        "oob-sx",
        oobs=_sx_fragment(oobs) if oobs else None,
        filter=_slot(filter),
        aside=_slot(aside),
        menu=_slot(menu),
        content=_slot(content),
    )
|
|
|
|
|
|
async def full_page_sx(ctx: dict, *, header_rows: str,
                       filter: str = "", aside: str = "",
                       content: str = "", menu: str = "",
                       meta_html: str = "", meta: str = "") -> str:
    """Build a full page: an ``app-body`` component call wrapped by sx_page().

    meta_html: raw HTML injected into the <head> shell (legacy).
    meta: sx source for meta tags — auto-hoisted to <head> by sx.js.

    When *menu* is empty, the mobile nav is generated from *ctx*.
    """
    def _slot(source: str):
        # Empty slot -> nil, otherwise raw sx source.
        return SxExpr(source) if source else None

    if not menu:
        menu = await mobile_root_nav_sx(ctx)

    page_body = await _render_to_sx(
        "app-body",
        header_rows=_sx_fragment(header_rows) if header_rows else None,
        filter=_slot(filter),
        aside=_slot(aside),
        menu=_slot(menu),
        content=_slot(content),
    )

    if meta:
        # Put meta and body into one fragment so sx.js renders both; its
        # auto-hoist then moves meta/title/link elements to <head>.
        page_body = _sx_fragment(meta, page_body)

    return sx_page(ctx, page_body, meta_html=meta_html)
|
|
|
|
|
|
def _build_component_ast(__name: str, **kwargs: Any) -> list:
    """Turn a component name plus Python kwargs into a component-call AST.

    Returns e.g. [Symbol("~card"), Keyword("title"), "hello", Keyword("count"), 3]

    Keyword names are converted from snake_case to kebab-case. ``None`` and
    empty ``SxExpr`` values become NIL, non-empty ``SxExpr`` values are parsed
    into AST, and every other value is kept as the native Python object — no
    SX string generation happens here.
    """
    from .types import Symbol, Keyword, NIL

    def _node(value: Any) -> Any:
        if value is None:
            return NIL
        if isinstance(value, SxExpr):
            # Pre-serialized SX has to be parsed back into AST form.
            from .parser import parse
            return parse(value.source) if value.source else NIL
        return value

    head = __name if __name.startswith("~") else f"~{__name}"
    call: list = [Symbol(head)]
    for key, value in kwargs.items():
        call.append(Keyword(key.replace("_", "-")))
        call.append(_node(value))
    return call
|
|
|
|
|
|
async def _render_to_sx_with_env(__name: str, extra_env: dict, **kwargs: Any) -> str:
    """Variant of ``_render_to_sx`` that also feeds *extra_env* to the evaluator.

    Used by ``register_sx_layout`` so .sx defcomps can read ctx values as
    free variables.

    Evaluation goes through ``async_eval_slot_to_sx`` (not
    ``async_eval_to_sx``) so the top-level component body is expanded
    server-side: free variables from *extra_env* are resolved during
    expansion instead of being serialized as unresolved symbols for the
    client.

    **Private** — service code should use ``sx_call()`` or defmacros instead.
    """
    from .jinja_bridge import get_component_env, _get_request_context
    import os
    # SX_USE_REF=1 switches to the reference evaluator implementation.
    if os.environ.get("SX_USE_REF") == "1":
        from .ref.async_eval_ref import async_eval_slot_to_sx
    else:
        from .async_eval import async_eval_slot_to_sx
    from .types import Symbol, Keyword, NIL as _NIL

    def _nil_if_none(value: Any) -> Any:
        return _NIL if value is None else value

    # extra_env entries are passed as keyword args too, so _aser_component
    # binds them as params (otherwise it defaults all params to NIL).
    head = __name if __name.startswith("~") else f"~{__name}"
    call: list = [Symbol(head)]
    for env_key, env_value in extra_env.items():
        call += [Keyword(env_key), _nil_if_none(env_value)]
    for arg_key, arg_value in kwargs.items():
        call += [Keyword(arg_key.replace("_", "-")), _nil_if_none(arg_value)]

    scope = dict(get_component_env())
    scope.update(extra_env)
    request_ctx = _get_request_context()
    return SxExpr(await async_eval_slot_to_sx(call, scope, request_ctx))
|
|
|
|
|
|
async def _render_to_sx(__name: str, **kwargs: Any) -> str:
    """Evaluate a defcomp call and return its SX wire format.

    The call is built as an AST straight from the Python values (no SX string
    literals) and run through the SX evaluator, which resolves IO primitives
    and serializes component/tag calls as SX wire format.

    **Private** — service code should use ``sx_call()`` or defmacros instead.
    Only infrastructure code (helpers.py, layouts.py) should call this.
    """
    from .jinja_bridge import get_component_env, _get_request_context
    import os
    # SX_USE_REF=1 switches to the reference evaluator implementation.
    if os.environ.get("SX_USE_REF") == "1":
        from .ref.async_eval_ref import async_eval_to_sx
    else:
        from .async_eval import async_eval_to_sx

    call = _build_component_ast(__name, **kwargs)
    scope = dict(get_component_env())
    request_ctx = _get_request_context()
    wire = await async_eval_to_sx(call, scope, request_ctx)
    return SxExpr(wire)
|
|
|
|
|
|
# Backwards-compat alias — layout infrastructure still imports the public name.
# Will be removed once all layouts use register_sx_layout().
render_to_sx_with_env = _render_to_sx_with_env
|
|
|
|
|
|
async def render_to_html(__name: str, **kwargs: Any) -> str:
    """Evaluate a defcomp call and return rendered HTML.

    Builds the same AST as ``_render_to_sx`` (no SX string literals) but
    renders it to HTML instead of SX wire format. Used by route renders that
    need HTML (full pages, fragments).
    """
    from .jinja_bridge import get_component_env, _get_request_context
    import os
    # SX_USE_REF=1 switches to the reference evaluator implementation.
    if os.environ.get("SX_USE_REF") == "1":
        from .ref.async_eval_ref import async_render
    else:
        from .async_eval import async_render

    call = _build_component_ast(__name, **kwargs)
    scope = dict(get_component_env())
    request_ctx = _get_request_context()
    return await async_render(call, scope, request_ctx)
|
|
|
|
|
|
def sx_call(component_name: str, **kwargs: Any) -> str:
    """Serialize a component call from Python kwargs into SX source.

    Keyword names are converted from snake_case to kebab-case::

        sx_call("test-row", nodeid="foo", outcome="passed")
        # => '(~test-row :nodeid "foo" :outcome "passed")'

    Each value goes through the parser's ``serialize``. List values are
    emitted as ``(list ...)`` — with ``None`` items dropped — so the client
    gets an iterable array rather than a rendered fragment; a list with no
    remaining items becomes ``nil``. The result is wrapped in ``SxExpr``.
    """
    from .parser import serialize, SxExpr

    def _render_value(value: Any) -> str:
        if not isinstance(value, list):
            return serialize(value)
        rendered = [serialize(item) for item in value if item is not None]
        if not rendered:
            return "nil"
        return "(list " + " ".join(rendered) + ")"

    if component_name.startswith("~"):
        tokens = [component_name]
    else:
        tokens = [f"~{component_name}"]
    for key, value in kwargs.items():
        tokens.append(":" + key.replace("_", "-"))
        tokens.append(_render_value(value))
    return SxExpr("(" + " ".join(tokens) + ")")
|
|
|
|
|
|
|
|
def components_for_request(source: str = "") -> str:
    """Return defcomp/defmacro source for definitions the client doesn't have yet.

    Reads the ``SX-Components`` request header (comma-separated component
    names like ``~card,~nav-item``) and skips every definition listed there.
    Whitespace around the names and empty entries are ignored, so
    ``~card, ~nav-item`` and a trailing comma are handled as well.

    If *source* is provided, only components needed for that source (plus
    transitive deps, as computed by ``components_needed``) are considered;
    macros are not filtered by *source*. If the header is absent, all needed
    definitions are returned.

    Returns the reconstructed definitions joined by newlines (``""`` if none).
    """
    from quart import request
    from .jinja_bridge import _COMPONENT_ENV
    from .deps import components_needed
    from .types import Component, Macro
    from .parser import serialize

    # None means "no filtering": every registered component is a candidate.
    needed = components_needed(source, _COMPONENT_ENV) if source else None

    loaded_raw = request.headers.get("SX-Components", "")
    # Trim each name: list-style header values may carry spaces after commas.
    loaded = {name.strip() for name in loaded_raw.split(",") if name.strip()}

    parts = []
    for key, val in _COMPONENT_ENV.items():
        if isinstance(val, Component):
            comp_name = f"~{val.name}"
            # Skip if not needed for this page
            if needed is not None and comp_name not in needed and key not in needed:
                continue
            # Skip components the client already has (with or without the "~")
            if comp_name in loaded or val.name in loaded:
                continue
            # Reconstruct defcomp source
            param_strs = ["&key"] + list(val.params)
            if val.has_children:
                param_strs.extend(["&rest", "children"])
            params_sx = "(" + " ".join(param_strs) + ")"
            body_sx = serialize(val.body, pretty=True)
            parts.append(f"(defcomp ~{val.name} {params_sx} {body_sx})")
        elif isinstance(val, Macro):
            if val.name in loaded:
                continue
            # Reconstruct defmacro source
            param_strs = list(val.params)
            if val.rest_param:
                param_strs.extend(["&rest", val.rest_param])
            params_sx = "(" + " ".join(param_strs) + ")"
            body_sx = serialize(val.body, pretty=True)
            parts.append(f"(defmacro {val.name} {params_sx} {body_sx})")
    return "\n".join(parts)
|
|
|
|
|
|
def sx_response(source: str, status: int = 200,
                headers: dict | None = None):
    """Return an s-expression wire-format response (``text/sx``).

    Takes a raw sx string::

        return sx_response('(~test-row :nodeid "foo")')

    Processing steps:

    * *source* is parse-checked; a failure is logged on the ``sx`` logger but
      the response is still sent.
    * For SX requests (``SX-Request`` header), missing component definitions
      are prepended as a ``<script type="text/sx" data-components>`` block so
      the client can process them before rendering OOB content.
    * When the CSS registry is loaded, classes used by the source (and by the
      component definitions being sent) are compared with what the client
      reports in ``SX-Css`` — a short hash or a comma-separated class list.
      Rules for new classes are prepended as a ``<style data-sx-css>`` block
      and announced via ``SX-Css-Add`` / ``SX-Css-Hash`` response headers.
    * In dev mode the body is pretty-printed.

    *headers*, if given, are copied onto the response last.
    """
    from quart import request, Response

    body = source
    # Validate the sx source parses as a single expression (log only).
    try:
        from .parser import parse as _parse_check
        _parse_check(source)
    except Exception as _e:
        import logging
        logging.getLogger("sx").error("sx_response parse error: %s\nSource (first 500): %s", _e, source[:500])

    # For SX requests, prepend missing component definitions
    comp_defs = ""
    if request.headers.get("SX-Request"):
        comp_defs = components_for_request(source)
        if comp_defs:
            body = f'<script type="text/sx" data-components>{comp_defs}</script>\n{body}'

    # On-demand CSS: scan source for classes, send only new rules
    from .css_registry import scan_classes_from_sx, lookup_rules, registry_loaded, lookup_css_hash, store_css_hash
    new_classes: set[str] = set()
    cumulative_classes: set[str] = set()
    if registry_loaded():
        new_classes = scan_classes_from_sx(source)
        if comp_defs:
            # Scan only the component definitions actually being sent
            new_classes.update(scan_classes_from_sx(comp_defs))

        # Resolve known classes from SX-Css header (hash or full list)
        known_classes: set[str] = set()
        known_raw = request.headers.get("SX-Css", "")
        if known_raw:
            if len(known_raw) <= 16:
                # Short value: treat as a hash of a previously stored class set
                looked_up = lookup_css_hash(known_raw)
                # Cache miss keeps the empty set — all classes are sent (safe fallback)
                if looked_up is not None:
                    known_classes = looked_up
            else:
                # Trim entries and drop empties so padded lists still match
                # and no blank/padded names end up in the cumulative set.
                known_classes = {
                    name.strip() for name in known_raw.split(",") if name.strip()
                }

        cumulative_classes = known_classes | new_classes
        new_classes -= known_classes

        if new_classes:
            new_rules = lookup_rules(new_classes)
            if new_rules:
                body = f'<style data-sx-css>{new_rules}</style>\n{body}'

    # Dev mode: pretty-print sx source for readable Network tab responses
    if _is_dev_mode():
        body = _pretty_print_sx_body(body)

    resp = Response(body, status=status, content_type="text/sx")
    if new_classes:
        resp.headers["SX-Css-Add"] = ",".join(sorted(new_classes))
    if cumulative_classes:
        resp.headers["SX-Css-Hash"] = store_css_hash(cumulative_classes)
    if headers:
        for k, v in headers.items():
            resp.headers[k] = v
    return resp
|
|
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Sx wire-format full page shell
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# HTML shell used by sx_page(). It is a str.format() template: literal braces
# in the inline CSS/JS are doubled, and the single-brace fields are
#   title, meta_html, csrf, sx_css, sx_css_classes, styles_hash, styles_json,
#   component_hash, component_defs, pages_sx, page_sx, asset_url,
#   sx_js_hash, body_js_hash
# Substituted values are inserted verbatim; they are not re-parsed by format().
_SX_PAGE_TEMPLATE = """\
<!doctype html>
<html lang="en">
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
<meta name="robots" content="index,follow">
<meta name="theme-color" content="#ffffff">
<title>{title}</title>
{meta_html}
<style>@media (min-width: 768px) {{ .js-mobile-sentinel {{ display:none !important; }} }}</style>
<meta name="csrf-token" content="{csrf}">
<style id="sx-css">{sx_css}</style>
<meta name="sx-css-classes" content="{sx_css_classes}">
<script src="https://unpkg.com/prismjs/prism.js"></script>
<script src="https://unpkg.com/prismjs/components/prism-javascript.min.js"></script>
<script src="https://unpkg.com/prismjs/components/prism-python.min.js"></script>
<script src="https://unpkg.com/prismjs/components/prism-bash.min.js"></script>
<script src="https://cdn.jsdelivr.net/npm/sweetalert2@11"></script>
<script>if(matchMedia('(hover:hover) and (pointer:fine)').matches){{document.documentElement.classList.add('hover-capable')}}</script>
<script>document.addEventListener('click',function(e){{var t=e.target.closest('[data-close-details]');if(!t)return;var d=t.closest('details');if(d)d.removeAttribute('open')}})</script>
<style>
details[data-toggle-group="mobile-panels"]>summary{{list-style:none}}
details[data-toggle-group="mobile-panels"]>summary::-webkit-details-marker{{display:none}}
@media(min-width:768px){{.nav-group:focus-within .submenu,.nav-group:hover .submenu{{display:block}}}}
img{{max-width:100%;height:auto}}
.clamp-2{{display:-webkit-box;-webkit-line-clamp:2;-webkit-box-orient:vertical;overflow:hidden}}
.clamp-3{{display:-webkit-box;-webkit-line-clamp:3;-webkit-box-orient:vertical;overflow:hidden}}
.no-scrollbar::-webkit-scrollbar{{display:none}}.no-scrollbar{{-ms-overflow-style:none;scrollbar-width:none}}
details.group{{overflow:hidden}}details.group>summary{{list-style:none}}details.group>summary::-webkit-details-marker{{display:none}}
.sx-indicator{{display:none}}.sx-request .sx-indicator{{display:inline-flex}}
.sx-error .sx-indicator{{display:none}}.sx-loading .sx-indicator{{display:inline-flex}}
.js-wrap.open .js-pop{{display:block}}.js-wrap.open .js-backdrop{{display:block}}
</style>
</head>
<body class="bg-stone-50 text-stone-900">
<script type="text/sx-styles" data-hash="{styles_hash}">{styles_json}</script>
<script type="text/sx" data-components data-hash="{component_hash}">{component_defs}</script>
<script type="text/sx-pages">{pages_sx}</script>
<script type="text/sx" data-mount="body">{page_sx}</script>
<script src="{asset_url}/scripts/sx-browser.js?v={sx_js_hash}"></script>
<script src="{asset_url}/scripts/body.js?v={body_js_hash}"></script>
</body>
</html>"""
|
|
|
|
|
|
def _build_pages_sx(service: str) -> str:
    """Build SX page registry for client-side routing.

    Returns SX dict literals (one per page, newline-separated) parseable by
    the client's ``parse`` function. Each dict has keys: name, path, auth,
    has-data, content, closure. Returns ``""`` when *service* has no pages.

    * ``auth`` is the page's auth value when it is a string, else "custom".
    * ``content`` is the serialized content expression as a string literal.
      If serialization fails the entry is still emitted with empty content,
      and the failure is logged on the ``sx`` logger instead of being
      silently dropped.
    * ``closure`` only carries str/int/float/bool values.
    """
    from .pages import get_all_pages
    from .parser import serialize as sx_serialize

    pages = get_all_pages(service)
    if not pages:
        return ""

    entries = []
    for page_def in pages.values():
        content_src = ""
        if page_def.content_expr is not None:
            try:
                content_src = sx_serialize(page_def.content_expr)
            except Exception:
                # Best effort: keep the route, but make the failure visible.
                import logging
                logging.getLogger("sx").exception(
                    "Failed to serialize content for page %r", page_def.name
                )

        auth = page_def.auth if isinstance(page_def.auth, str) else "custom"
        has_data = "true" if page_def.data_expr is not None else "false"

        # Build closure as SX dict (simple scalar values only)
        closure_parts = [
            f":{k} {_sx_literal(v)}"
            for k, v in page_def.closure.items()
            if isinstance(v, (str, int, float, bool))
        ]
        closure_sx = "{" + " ".join(closure_parts) + "}"

        entry = (
            "{:name " + _sx_literal(page_def.name)
            + " :path " + _sx_literal(page_def.path)
            + " :auth " + _sx_literal(auth)
            + " :has-data " + has_data
            + " :content " + _sx_literal(content_src)
            + " :closure " + closure_sx + "}"
        )
        entries.append(entry)

    return "\n".join(entries)
|
|
|
|
|
|
def _sx_literal(v: object) -> str:
    """Render a Python scalar as an SX literal.

    Booleans become ``true``/``false``, ints and floats use ``str()``, and
    strings are double-quoted with backslash, double quote and newline
    escaped. ``None`` and every unsupported type become ``nil``.
    """
    # bool is tested before int because bool is a subclass of int.
    if isinstance(v, bool):
        return "true" if v else "false"
    if isinstance(v, (int, float)):
        return str(v)
    if isinstance(v, str):
        text = v
        # Backslashes go first so later escapes are not escaped again.
        for raw, replacement in (("\\", "\\\\"), ('"', '\\"'), ("\n", "\\n")):
            text = text.replace(raw, replacement)
        return '"' + text + '"'
    return "nil"
|
|
|
|
|
|
def sx_page(ctx: dict, page_sx: str, *,
            meta_html: str = "") -> str:
    """Return a minimal HTML shell that boots the page from sx source.

    The browser loads component definitions and page sx, then sx.js
    renders everything client-side. CSS rules are scanned from the sx
    source and component defs, then injected as a <style> block.

    Component definitions and the style dictionary are left out when the
    client's cookie hash matches the current one (never in dev mode, so edits
    are visible immediately).

    The page registry from ``_build_pages_sx`` is inserted verbatim:
    ``str.format()`` does not re-parse substituted values, so its braces must
    NOT be doubled — doing so would send literal ``{{...}}`` to the client
    instead of SX dict literals.
    """
    from .jinja_bridge import components_for_page, css_classes_for_page
    from .css_registry import lookup_rules, get_preamble, registry_loaded, store_css_hash

    dev_mode = _is_dev_mode()

    # Per-page component bundle: only definitions this page needs
    component_defs, component_hash = components_for_page(page_sx)

    # Check if client already has this version cached (via cookie)
    client_hash = _get_sx_comp_cookie()
    if not dev_mode and client_hash and client_hash == component_hash:
        # Client has current components cached — send empty source
        component_defs = ""

    # Scan for CSS classes — only from components this page uses + page source
    sx_css = ""
    sx_css_classes = ""
    if registry_loaded():
        classes = css_classes_for_page(page_sx)
        # Always include body classes
        classes.update(["bg-stone-50", "text-stone-900"])
        rules = lookup_rules(classes)
        sx_css = get_preamble() + rules
        # The meta tag carries the hash of the class set, not the full list.
        sx_css_classes = store_css_hash(classes)

    asset_url = get_asset_url(ctx)
    title = ctx.get("base_title", "Rose Ash")
    csrf = _get_csrf_token()

    # Dev mode: pretty-print page sx for readable View Source
    if dev_mode and page_sx and page_sx.startswith("("):
        from .parser import parse as _parse, serialize as _serialize
        try:
            page_sx = _serialize(_parse(page_sx), pretty=True)
        except Exception:
            # Cosmetic only — fall back to the unformatted source.
            pass

    # Style dictionary for client-side css primitive
    styles_hash = _get_style_dict_hash()
    client_styles_hash = _get_sx_styles_cookie()
    if not dev_mode and client_styles_hash and client_styles_hash == styles_hash:
        styles_json = ""  # Client has cached version
    else:
        styles_json = _build_style_dict_json()

    # Page registry for client-side routing
    from quart import current_app
    pages_sx = _build_pages_sx(current_app.name)

    return _SX_PAGE_TEMPLATE.format(
        title=_html_escape(title),
        asset_url=asset_url,
        meta_html=meta_html,
        csrf=_html_escape(csrf),
        component_hash=component_hash,
        component_defs=component_defs,
        styles_hash=styles_hash,
        styles_json=styles_json,
        pages_sx=pages_sx,
        page_sx=page_sx,
        sx_css=sx_css,
        sx_css_classes=sx_css_classes,
        sx_js_hash=_script_hash("sx-browser.js"),
        body_js_hash=_script_hash("body.js"),
    )
|
|
|
|
|
|
# Script filename -> short content hash; filled lazily by _script_hash().
_SCRIPT_HASH_CACHE: dict[str, str] = {}
# Memoized style dictionary JSON and its short hash; both are set together
# by _build_style_dict_json() on first use.
_STYLE_DICT_JSON: str = ""
_STYLE_DICT_HASH: str = ""
|
|
|
|
|
|
def _build_style_dict_json() -> str:
    """Return the compact JSON style dictionary for the client-side css primitive.

    The JSON is built once and memoized in ``_STYLE_DICT_JSON``; its short
    MD5 hash is stored in ``_STYLE_DICT_HASH`` at the same time.
    """
    global _STYLE_DICT_JSON, _STYLE_DICT_HASH
    if _STYLE_DICT_JSON:
        return _STYLE_DICT_JSON

    import json
    from .style_dict import (
        STYLE_ATOMS, PSEUDO_VARIANTS, RESPONSIVE_BREAKPOINTS,
        KEYFRAMES, ARBITRARY_PATTERNS, CHILD_SELECTOR_ATOMS,
    )

    # Child-selector prefixes actually present in CHILD_SELECTOR_ATOMS,
    # e.g. "space-y-4" -> "space-y-", "divide-y" -> "divide-y".
    # The candidates are mutually exclusive, so an atom matches at most one.
    candidates = ("space-x-", "space-y-", "divide-x", "divide-y")
    child_prefixes = {
        prefix
        for atom in CHILD_SELECTOR_ATOMS
        for prefix in candidates
        if atom.startswith(prefix)
    }

    payload = {
        "a": STYLE_ATOMS,
        "v": PSEUDO_VARIANTS,
        "b": RESPONSIVE_BREAKPOINTS,
        "k": KEYFRAMES,
        "p": ARBITRARY_PATTERNS,
        "c": sorted(child_prefixes),
    }
    encoded = json.dumps(payload, separators=(",", ":"))
    _STYLE_DICT_JSON = encoded
    _STYLE_DICT_HASH = hashlib.md5(encoded.encode()).hexdigest()[:8]
    return _STYLE_DICT_JSON
|
|
|
|
|
|
def _get_style_dict_hash() -> str:
    """Return the short hash of the style dictionary JSON, building it on first use."""
    if _STYLE_DICT_HASH:
        return _STYLE_DICT_HASH
    # Building the JSON also fills in the module-level hash.
    _build_style_dict_json()
    return _STYLE_DICT_HASH
|
|
|
|
|
|
def _get_sx_styles_cookie() -> str:
    """Return the ``sx-styles-hash`` cookie of the current request.

    Any failure while importing quart or reading the request yields ``""``.
    """
    try:
        from quart import request
        cookie_value = request.cookies.get("sx-styles-hash", "")
    except Exception:
        cookie_value = ""
    return cookie_value
|
|
|
|
|
|
def _script_hash(filename: str) -> str:
    """Return a short MD5 hash of ``static/scripts/<filename>``.

    Results are cached in ``_SCRIPT_HASH_CACHE`` for the process lifetime.
    If the file cannot be read, the placeholder ``"dev"`` is cached instead.
    """
    cached = _SCRIPT_HASH_CACHE.get(filename)
    if cached is None:
        try:
            script_path = Path("static") / "scripts" / filename
            cached = hashlib.md5(script_path.read_bytes()).hexdigest()[:8]
        except Exception:
            cached = "dev"
        _SCRIPT_HASH_CACHE[filename] = cached
    return cached
|
|
|
|
|
|
def _get_csrf_token() -> str:
    """Return ``g.csrf_token`` for the current request context.

    Yields ``""`` when the attribute is missing or when quart / the context
    cannot be accessed.
    """
    try:
        from quart import g as request_globals
        token = getattr(request_globals, "csrf_token", "")
    except Exception:
        token = ""
    return token
|
|
|
|
|
|
def _get_sx_comp_cookie() -> str:
    """Return the ``sx-comp-hash`` cookie of the current request.

    Any failure while importing quart or reading the request yields ``""``.
    """
    try:
        from quart import request
        cookie_value = request.cookies.get("sx-comp-hash", "")
    except Exception:
        cookie_value = ""
    return cookie_value
|
|
|
|
|
|
def _is_dev_mode() -> bool:
    """Tell whether dev mode is on, i.e. the ``RELOAD`` env var is exactly ``"true"``."""
    import os

    reload_flag = os.environ.get("RELOAD")
    return reload_flag == "true"
|
|
|
|
|
|
def _pretty_print_sx_body(body: str) -> str:
    """Pretty-print the sx tail of a response body, keeping HTML blocks intact.

    Leading ``<style>`` / ``<script>`` blocks are peeled off unchanged, the
    remaining sx source is parsed and re-serialized in pretty form, and all
    pieces are joined with blank lines. *body* is returned untouched when the
    remainder does not start with ``(`` or cannot be parsed/serialized.
    """
    import re
    from .parser import parse_all as _parse_all, serialize as _serialize

    # Split HTML prefix blocks (<style>, <script>) from the sx tail.
    # These are always at the start, each on its own line.
    pieces: list[str] = []
    remainder = body
    while remainder.startswith("<"):
        block_match = re.match(r'<(style|script)[^>]*>[\s\S]*?</\1>', remainder)
        if block_match is None:
            break
        pieces.append(block_match.group(0))
        remainder = remainder[block_match.end():].lstrip("\n")

    sx_source = remainder.strip()
    if not sx_source.startswith("("):
        return body

    try:
        # One or many top-level expressions: pretty-print each, separated
        # by a blank line.
        pretty = [_serialize(expr, pretty=True) for expr in _parse_all(sx_source)]
        pieces.append("\n\n".join(pretty))
        return "\n\n".join(pieces)
    except Exception:
        return body
|
|
|
|
|
|
def _html_escape(s: str) -> str:
    """Minimal HTML escaping for text and double-quoted attribute values.

    Replaces ``&``, ``<``, ``>`` and ``"`` with their named entities.
    Single quotes are left untouched.
    """
    # "&" must be replaced first, otherwise the ampersands introduced by the
    # other entities would be escaped a second time.
    return (s.replace("&", "&amp;")
             .replace("<", "&lt;")
             .replace(">", "&gt;")
             .replace('"', "&quot;"))
|