Files
rose-ash/shared/sx/pages.py
giles 0ce3f95d6c Phase 7c+7d: cache invalidation + offline data layer
7c: Client data cache management via element attributes
(sx-cache-invalidate) and response headers (SX-Cache-Invalidate,
SX-Cache-Update). Programmatic API: invalidate-page-cache,
invalidate-all-page-cache, update-page-cache.

7d: Service Worker (sx-sw.js) with IndexedDB for offline-capable
data caching. Network-first for /sx/data/ and /sx/io/, stale-while-
revalidate for /static/. Cache invalidation propagates from
in-memory cache to SW via postMessage.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-08 00:45:33 +00:00

1228 lines
46 KiB
Python

"""
Declarative page registry and blueprint mounting.
Supports ``defpage`` s-expressions that define GET route handlers
in .sx files instead of Python. Each page is a self-contained
declaration with path, auth, layout, data, and content slots.
Usage::
from shared.sx.pages import load_page_file, mount_pages
# Load page definitions from .sx files
load_page_file("blog/sx/pages/admin.sx", "blog")
# Mount page routes onto an existing blueprint
mount_pages(bp, "blog")
"""
from __future__ import annotations
import inspect
import logging
import os
from typing import Any
from .types import PageDef
logger = logging.getLogger("sx.pages")
# ---------------------------------------------------------------------------
# Registry — service → page-name → PageDef
# ---------------------------------------------------------------------------
_PAGE_REGISTRY: dict[str, dict[str, PageDef]] = {}
_PAGE_HELPERS: dict[str, dict[str, Any]] = {} # service → name → callable
def register_page(service: str, page_def: PageDef) -> None:
"""Register a page definition for a service."""
if service not in _PAGE_REGISTRY:
_PAGE_REGISTRY[service] = {}
_PAGE_REGISTRY[service][page_def.name] = page_def
logger.debug("Registered page %s:%s path=%s", service, page_def.name, page_def.path)
def get_page(service: str, name: str) -> PageDef | None:
"""Look up a registered page by service and name."""
return _PAGE_REGISTRY.get(service, {}).get(name)
def get_all_pages(service: str) -> dict[str, PageDef]:
"""Return all pages for a service."""
return dict(_PAGE_REGISTRY.get(service, {}))
def clear_pages(service: str | None = None) -> None:
"""Clear page registry. If service given, clear only that service."""
if service is None:
_PAGE_REGISTRY.clear()
else:
_PAGE_REGISTRY.pop(service, None)
def register_page_helpers(service: str, helpers: dict[str, Any]) -> None:
"""Register Python functions available in defpage content expressions.
These are injected into the evaluation environment when executing
defpage content, allowing defpage to call into Python::
register_page_helpers("sx", {
"docs-content": docs_content_sx,
"reference-content": reference_content_sx,
})
Then in .sx::
(defpage docs-page
:path "/docs/<slug>"
:auth :public
:content (docs-content slug))
"""
from .boundary import validate_helper, validate_boundary_value
import asyncio
import functools
for name in helpers:
validate_helper(service, name)
# Wrap helpers to validate return values at the boundary.
# Async generators pass through unwrapped — their yields are validated
# by the streaming infrastructure, not at the helper boundary.
wrapped: dict[str, Any] = {}
for name, fn in helpers.items():
if inspect.isasyncgenfunction(fn):
# Async generator: pass through (streaming infra validates yields)
wrapped[name] = fn
elif asyncio.iscoroutinefunction(fn):
@functools.wraps(fn)
async def _async_wrap(*a, _fn=fn, _name=name, **kw):
result = await _fn(*a, **kw)
validate_boundary_value(result, context=f"helper {_name!r}")
return result
wrapped[name] = _async_wrap
else:
@functools.wraps(fn)
def _sync_wrap(*a, _fn=fn, _name=name, **kw):
result = _fn(*a, **kw)
validate_boundary_value(result, context=f"helper {_name!r}")
return result
wrapped[name] = _sync_wrap
if service not in _PAGE_HELPERS:
_PAGE_HELPERS[service] = {}
_PAGE_HELPERS[service].update(wrapped)
def get_page_helpers(service: str) -> dict[str, Any]:
"""Return registered page helpers for a service."""
return dict(_PAGE_HELPERS.get(service, {}))
# ---------------------------------------------------------------------------
# Loading — parse .sx files and collect PageDef instances
# ---------------------------------------------------------------------------
def load_page_file(filepath: str, service_name: str) -> list[PageDef]:
"""Parse an .sx file, evaluate it, and register any PageDef values."""
from .parser import parse_all
from .evaluator import _eval as _raw_eval, _trampoline
_eval = lambda expr, env: _trampoline(_raw_eval(expr, env))
from .jinja_bridge import get_component_env
with open(filepath, encoding="utf-8") as f:
source = f.read()
# Seed env with component definitions so pages can reference components
env = dict(get_component_env())
exprs = parse_all(source)
pages: list[PageDef] = []
for expr in exprs:
_eval(expr, env)
# Collect all PageDef values from the env
for key, val in env.items():
if isinstance(val, PageDef):
register_page(service_name, val)
pages.append(val)
return pages
def load_page_dir(directory: str, service_name: str) -> list[PageDef]:
"""Load all .sx files from a directory and register pages."""
import glob as glob_mod
pages: list[PageDef] = []
for filepath in sorted(glob_mod.glob(os.path.join(directory, "*.sx"))):
pages.extend(load_page_file(filepath, service_name))
return pages
# ---------------------------------------------------------------------------
# Page execution
# ---------------------------------------------------------------------------
async def _eval_slot(expr: Any, env: dict, ctx: Any) -> str:
"""Evaluate a page slot expression and return an sx source string.
Expands component calls (so IO in the body executes) but serializes
the result as SX wire format, not HTML.
"""
from .async_eval import async_eval_slot_to_sx
return await async_eval_slot_to_sx(expr, env, ctx)
def _replace_suspense_sexp(sx: str, stream_id: str, replacement: str) -> str:
"""Replace a rendered ~suspense div in SX source with replacement content.
After _eval_slot, ~suspense expands to:
(div :id "sx-suspense-{id}" :data-suspense "{id}" :style "display:contents" ...)
This finds the balanced s-expression containing :data-suspense "{id}" and
replaces it with the given replacement string.
"""
marker = f':data-suspense "{stream_id}"'
idx = sx.find(marker)
if idx < 0:
return sx
# Walk backwards to find the opening paren of the containing (div ...)
start = sx.rfind("(", 0, idx)
if start < 0:
return sx
# Walk forward from start to find matching close paren (balanced)
depth = 0
i = start
while i < len(sx):
ch = sx[i]
if ch == "(":
depth += 1
elif ch == ")":
depth -= 1
if depth == 0:
return sx[:start] + replacement + sx[i + 1:]
elif ch == '"':
# Skip string contents
i += 1
while i < len(sx) and sx[i] != '"':
if sx[i] == "\\":
i += 1 # skip escaped char
i += 1
i += 1
return sx
async def execute_page(
page_def: PageDef,
service_name: str,
url_params: dict[str, Any] | None = None,
) -> str:
"""Execute a declarative page and return the full response string.
1. Build env from component env + page closure + URL params
2. If :data — async_eval(data_expr) → merge result dict into env
3. Render slots: async_eval_to_sx(content_expr) etc.
4. get_template_context() for header construction
5. Resolve layout → header rows
6. Branch: full_page_sx() vs oob_page_sx() based on is_htmx_request()
"""
from .jinja_bridge import get_component_env, _get_request_context
from .async_eval import async_eval
from .page import get_template_context
from .helpers import full_page_sx, oob_page_sx, sx_response
from .layouts import get_layout
from shared.browser.app.utils.htmx import is_htmx_request
if url_params is None:
url_params = {}
# Build environment
env = dict(get_component_env())
env.update(get_page_helpers(service_name))
env.update(page_def.closure)
# Inject URL params as kebab-case symbols
for key, val in url_params.items():
kebab = key.replace("_", "-")
env[kebab] = val
env[key] = val # also provide snake_case for convenience
# Get request context for I/O primitives
ctx = _get_request_context()
# Evaluate :data expression if present
_multi_stream_content = None
if page_def.data_expr is not None:
data_result = await async_eval(page_def.data_expr, env, ctx)
if hasattr(data_result, '__aiter__'):
# Multi-stream: consume generator, eval :content per chunk,
# combine into shell with resolved suspense slots.
chunks = []
async for chunk in data_result:
if not isinstance(chunk, dict):
continue
chunk = dict(chunk)
stream_id = chunk.pop("stream-id", "stream-content")
chunk_env = dict(env)
for k, v in chunk.items():
chunk_env[k.replace("_", "-")] = v
chunk_sx = await _eval_slot(page_def.content_expr, chunk_env, ctx) if page_def.content_expr else ""
chunks.append((stream_id, chunk_sx))
# Build content: if :shell exists, render it and inline resolved chunks
if page_def.shell_expr is not None:
shell_sx = await _eval_slot(page_def.shell_expr, env, ctx)
# Replace each rendered suspense div with resolved content.
# _eval_slot expands ~suspense into:
# (div :id "sx-suspense-X" :data-suspense "X" :style "display:contents" ...)
# We find the balanced s-expr containing :data-suspense "X" and replace it.
for stream_id, chunk_sx in chunks:
shell_sx = _replace_suspense_sexp(shell_sx, stream_id, chunk_sx)
_multi_stream_content = shell_sx
else:
# No shell: just concatenate all chunks in a fragment
parts = " ".join(sx for _, sx in chunks)
_multi_stream_content = f"(<> {parts})"
elif isinstance(data_result, dict):
# Merge with kebab-case keys so SX symbols can reference them
for k, v in data_result.items():
env[k.replace("_", "-")] = v
# Render content slot (required)
if _multi_stream_content is not None:
content_sx = _multi_stream_content
else:
content_sx = await _eval_slot(page_def.content_expr, env, ctx)
# Render optional slots
filter_sx = ""
if page_def.filter_expr is not None:
filter_sx = await _eval_slot(page_def.filter_expr, env, ctx)
aside_sx = ""
if page_def.aside_expr is not None:
aside_sx = await _eval_slot(page_def.aside_expr, env, ctx)
menu_sx = ""
if page_def.menu_expr is not None:
menu_sx = await _eval_slot(page_def.menu_expr, env, ctx)
# Resolve layout → header rows + mobile menu fallback
tctx = await get_template_context()
header_rows = ""
oob_headers = ""
layout_kwargs: dict[str, Any] = {}
if page_def.layout is not None:
if isinstance(page_def.layout, str):
layout_name = page_def.layout
elif isinstance(page_def.layout, list):
# (:layout-name :key val ...) — unevaluated list from defpage
# Evaluate each value in the current env to resolve dynamic bindings
from .types import Keyword as SxKeyword, Symbol as SxSymbol
raw = page_def.layout
# First element is layout name (keyword or symbol or string)
first = raw[0]
if isinstance(first, SxKeyword):
layout_name = first.name
elif isinstance(first, SxSymbol):
layout_name = first.name
elif isinstance(first, str):
layout_name = first
else:
layout_name = str(first)
# Parse keyword args — evaluate values at request time
i = 1
while i < len(raw):
k = raw[i]
if isinstance(k, SxKeyword) and i + 1 < len(raw):
raw_val = raw[i + 1]
resolved = await async_eval(raw_val, env, ctx)
layout_kwargs[k.name.replace("-", "_")] = resolved
i += 2
else:
i += 1
else:
layout_name = str(page_def.layout)
layout = get_layout(layout_name)
if layout is not None:
header_rows = await layout.full_headers(tctx, **layout_kwargs)
oob_headers = await layout.oob_headers(tctx, **layout_kwargs)
if not menu_sx:
menu_sx = await layout.mobile_menu(tctx, **layout_kwargs)
# Branch on request type
is_htmx = is_htmx_request()
if is_htmx:
# Compute content expression deps so the server sends component
# definitions the client needs for future client-side routing
extra_deps: set[str] | None = None
if page_def.content_expr is not None and page_def.data_expr is None:
from .deps import components_needed
from .parser import serialize
try:
content_src = serialize(page_def.content_expr)
extra_deps = components_needed(content_src, get_component_env())
except Exception:
pass # non-critical — client will just fall back to server
return sx_response(await oob_page_sx(
oobs=oob_headers if oob_headers else "",
filter=filter_sx,
aside=aside_sx,
content=content_sx,
menu=menu_sx,
), extra_component_names=extra_deps)
else:
return await full_page_sx(
tctx,
header_rows=header_rows,
filter=filter_sx,
aside=aside_sx,
content=content_sx,
menu=menu_sx,
)
# ---------------------------------------------------------------------------
# Streaming page execution (Phase 6: Streaming & Suspense)
# ---------------------------------------------------------------------------
async def execute_page_streaming(
page_def: PageDef,
service_name: str,
url_params: dict[str, Any] | None = None,
):
"""Execute a page with streaming response.
All context-dependent setup (g, request, current_app access) runs in
this regular async function — called while the request context is live.
Returns an async generator that yields pre-computed HTML chunks and
awaits already-created tasks (no further context access needed).
"""
import asyncio
from .jinja_bridge import get_component_env, _get_request_context
from .async_eval import async_eval
from .page import get_template_context
from .helpers import (
render_to_html as _helpers_render_to_html,
sx_page_streaming_parts,
sx_streaming_resolve_script,
)
from .parser import SxExpr, serialize as sx_serialize
from .layouts import get_layout
if url_params is None:
url_params = {}
env = dict(get_component_env())
env.update(get_page_helpers(service_name))
env.update(page_def.closure)
for key, val in url_params.items():
kebab = key.replace("_", "-")
env[kebab] = val
env[key] = val
ctx = _get_request_context()
tctx = await get_template_context()
# Build fallback expressions
if page_def.fallback_expr is not None:
fallback_sx = sx_serialize(page_def.fallback_expr)
else:
fallback_sx = (
'(div :class "p-8 animate-pulse"'
' (div :class "h-8 bg-stone-200 rounded mb-4 w-1/3")'
' (div :class "h-64 bg-stone-200 rounded"))'
)
header_fallback = '(div :class "h-12 bg-stone-200 animate-pulse")'
# Resolve layout
layout = None
layout_kwargs: dict[str, Any] = {}
if page_def.layout is not None:
if isinstance(page_def.layout, str):
layout_name = page_def.layout
elif isinstance(page_def.layout, list):
from .types import Keyword as SxKeyword, Symbol as SxSymbol
raw = page_def.layout
first = raw[0]
layout_name = (
first.name if isinstance(first, (SxKeyword, SxSymbol))
else str(first)
)
i = 1
while i < len(raw):
k = raw[i]
if isinstance(k, SxKeyword) and i + 1 < len(raw):
resolved = await async_eval(raw[i + 1], env, ctx)
layout_kwargs[k.name.replace("-", "_")] = resolved
i += 2
else:
i += 1
else:
layout_name = str(page_def.layout)
layout = get_layout(layout_name)
# --- Launch concurrent IO tasks (inherit context via create_task) ---
_stream_queue: asyncio.Queue = asyncio.Queue()
_multi_stream = False
async def _eval_data_and_content():
"""Evaluate :data then :content.
If :data returns an async generator (multi-stream mode), iterate it
and push each (stream_id, content_sx) to _stream_queue incrementally.
The main stream loop drains the queue and sends resolve scripts as
items arrive — giving true staggered streaming.
"""
nonlocal _multi_stream
try:
data_env = dict(env)
if page_def.data_expr is not None:
data_result = await async_eval(page_def.data_expr, data_env, ctx)
# Async generator: multi-stream mode
if hasattr(data_result, '__aiter__'):
_multi_stream = True
async for chunk in data_result:
if not isinstance(chunk, dict):
continue
chunk = dict(chunk) # copy so pop doesn't mutate
stream_id = chunk.pop("stream-id", "stream-content")
chunk_env = dict(env)
for k, v in chunk.items():
chunk_env[k.replace("_", "-")] = v
content_sx = await _eval_slot(page_def.content_expr, chunk_env, ctx) if page_def.content_expr else ""
await _stream_queue.put(("data", stream_id, content_sx))
await _stream_queue.put(("data-done",))
return
if isinstance(data_result, dict):
for k, v in data_result.items():
data_env[k.replace("_", "-")] = v
content_sx = await _eval_slot(page_def.content_expr, data_env, ctx) if page_def.content_expr else ""
filter_sx = await _eval_slot(page_def.filter_expr, data_env, ctx) if page_def.filter_expr else ""
aside_sx = await _eval_slot(page_def.aside_expr, data_env, ctx) if page_def.aside_expr else ""
menu_sx = await _eval_slot(page_def.menu_expr, data_env, ctx) if page_def.menu_expr else ""
await _stream_queue.put(("data-single", content_sx, filter_sx, aside_sx, menu_sx))
except Exception as e:
logger.error("Streaming data task failed: %s", e)
await _stream_queue.put(("data-done",))
async def _eval_headers():
try:
if layout is None:
await _stream_queue.put(("headers", "", ""))
return
rows = await layout.full_headers(tctx, **layout_kwargs)
menu = await layout.mobile_menu(tctx, **layout_kwargs)
await _stream_queue.put(("headers", rows, menu))
except Exception as e:
logger.error("Streaming headers task failed: %s", e)
await _stream_queue.put(("headers", "", ""))
data_task = asyncio.create_task(_eval_data_and_content())
header_task = asyncio.create_task(_eval_headers())
# --- Build initial shell as HTML (still in request context) ---
# Render to HTML so [data-suspense] elements are real DOM immediately.
# No dependency on sx-browser.js boot timing for the initial shell.
suspense_header_sx = f'(~suspense :id "stream-headers" :fallback {header_fallback})'
# When :shell is provided, it renders directly as the content slot
# (it contains its own ~suspense for the data-dependent part).
# Otherwise, wrap the entire :content in a single suspense.
if page_def.shell_expr is not None:
shell_content_sx = await _eval_slot(page_def.shell_expr, env, ctx)
suspense_content_sx = shell_content_sx
else:
suspense_content_sx = f'(~suspense :id "stream-content" :fallback {fallback_sx})'
initial_page_html = await _helpers_render_to_html("app-body",
header_rows=SxExpr(suspense_header_sx),
content=SxExpr(suspense_content_sx),
)
# Include layout component refs + page content so the scan picks up
# their transitive deps (e.g. ~cart-mini, ~auth-menu in headers).
layout_refs = ""
if layout is not None and hasattr(layout, "component_names"):
layout_refs = " ".join(f"({n})" for n in layout.component_names)
content_ref = ""
if page_def.content_expr is not None:
content_ref = sx_serialize(page_def.content_expr)
shell_ref = ""
if page_def.shell_expr is not None:
shell_ref = sx_serialize(page_def.shell_expr)
page_sx_for_scan = f'(<> {layout_refs} {content_ref} {shell_ref} (~app-body :header-rows {suspense_header_sx} :content {suspense_content_sx}))'
shell, tail = sx_page_streaming_parts(
tctx, initial_page_html, page_sx=page_sx_for_scan,
)
# Capture component env + extras scanner while we still have context.
# Resolved SX may reference components not in the initial scan
# (e.g. ~cart-mini from IO-generated header content).
from .jinja_bridge import components_for_page as _comp_scan
from quart import current_app as _ca
_service = _ca.name
# Track which components were already sent in the shell
_shell_scan = page_sx_for_scan
def _extra_defs(sx_source: str) -> str:
"""Return component defs needed by sx_source but not in shell."""
from .deps import components_needed
comp_env = dict(get_component_env())
shell_needed = components_needed(_shell_scan, comp_env)
resolve_needed = components_needed(sx_source, comp_env)
extra = resolve_needed - shell_needed
if not extra:
return ""
from .parser import serialize
from .types import Component
parts = []
for key, val in comp_env.items():
if isinstance(val, Component) and (f"~{val.name}" in extra or key in extra):
param_strs = ["&key"] + list(val.params)
if val.has_children:
param_strs.extend(["&rest", "children"])
params_sx = "(" + " ".join(param_strs) + ")"
body_sx = serialize(val.body, pretty=True)
parts.append(f"(defcomp ~{val.name} {params_sx} {body_sx})")
return "\n".join(parts)
# --- Return async generator that yields chunks ---
# No context access needed below — just awaiting tasks and yielding strings.
async def _stream_chunks():
yield shell + tail
# Both tasks push tagged items onto _stream_queue. We drain until
# both are done. Items: ("headers", rows, menu), ("data-single", ...),
# ("data", stream_id, sx), ("data-done",).
remaining = 2 # waiting for: headers + data
while remaining > 0:
item = await _stream_queue.get()
kind = item[0]
try:
if kind == "headers":
_, header_rows, header_menu = item
remaining -= 1
if header_rows:
extras = _extra_defs(header_rows)
yield sx_streaming_resolve_script("stream-headers", header_rows, extras)
elif kind == "data-single":
_, content_sx, filter_sx, aside_sx, menu_sx = item
remaining -= 1
extras = _extra_defs(content_sx)
yield sx_streaming_resolve_script("stream-content", content_sx, extras)
elif kind == "data":
_, stream_id, content_sx = item
extras = _extra_defs(content_sx)
yield sx_streaming_resolve_script(stream_id, content_sx, extras)
elif kind == "data-done":
remaining -= 1
except Exception as e:
logger.error("Streaming resolve failed for %s: %s", kind, e)
yield "\n</body>\n</html>"
return _stream_chunks()
async def execute_page_streaming_oob(
page_def: PageDef,
service_name: str,
url_params: dict[str, Any] | None = None,
):
"""Execute a streaming page for HTMX/SX requests.
Like execute_page_streaming but yields OOB SX swap format instead of a
full HTML document:
1. First yield: OOB SX with shell content (suspense skeletons) + CSS + defs
2. Subsequent yields: __sxResolve script tags as data resolves
The client uses streaming fetch (ReadableStream) to process the OOB swap
immediately and then execute resolve scripts as they arrive.
"""
import asyncio
from .jinja_bridge import get_component_env, _get_request_context
from .async_eval import async_eval
from .page import get_template_context
from .helpers import (
oob_page_sx,
sx_streaming_resolve_script,
components_for_request,
SxExpr,
)
from .parser import serialize as sx_serialize
from .layouts import get_layout
if url_params is None:
url_params = {}
env = dict(get_component_env())
env.update(get_page_helpers(service_name))
env.update(page_def.closure)
for key, val in url_params.items():
kebab = key.replace("_", "-")
env[kebab] = val
env[key] = val
ctx = _get_request_context()
# Evaluate shell with suspense skeletons (no data yet)
shell_sx = ""
if page_def.shell_expr is not None:
shell_sx = await _eval_slot(page_def.shell_expr, env, ctx)
# Build initial OOB response with shell as content
tctx = await get_template_context()
# Resolve layout for OOB headers
layout = None
layout_kwargs: dict[str, Any] = {}
if page_def.layout is not None:
if isinstance(page_def.layout, str):
layout_name = page_def.layout
elif isinstance(page_def.layout, list):
from .types import Keyword as SxKeyword, Symbol as SxSymbol
raw = page_def.layout
first = raw[0]
layout_name = (
first.name if isinstance(first, (SxKeyword, SxSymbol))
else str(first)
)
i = 1
while i < len(raw):
k = raw[i]
if isinstance(k, SxKeyword) and i + 1 < len(raw):
resolved = await async_eval(raw[i + 1], env, ctx)
layout_kwargs[k.name.replace("-", "_")] = resolved
i += 2
else:
i += 1
else:
layout_name = str(page_def.layout)
layout = get_layout(layout_name)
# Launch concurrent tasks
_stream_queue: asyncio.Queue = asyncio.Queue()
async def _eval_data():
try:
if page_def.data_expr is not None:
data_result = await async_eval(page_def.data_expr, env, ctx)
if hasattr(data_result, '__aiter__'):
async for chunk in data_result:
if not isinstance(chunk, dict):
continue
chunk = dict(chunk)
stream_id = chunk.pop("stream-id", "stream-content")
chunk_env = dict(env)
for k, v in chunk.items():
chunk_env[k.replace("_", "-")] = v
content_sx = await _eval_slot(page_def.content_expr, chunk_env, ctx) if page_def.content_expr else ""
await _stream_queue.put(("data", stream_id, content_sx))
await _stream_queue.put(("data-done",))
return
await _stream_queue.put(("data-done",))
except Exception as e:
logger.error("Streaming OOB data task failed: %s", e)
await _stream_queue.put(("data-done",))
async def _eval_oob_headers():
try:
if layout is not None:
oob_headers = await layout.oob_headers(tctx, **layout_kwargs)
await _stream_queue.put(("headers", oob_headers))
else:
await _stream_queue.put(("headers", ""))
except Exception as e:
logger.error("Streaming OOB headers task failed: %s", e)
await _stream_queue.put(("headers", ""))
data_task = asyncio.create_task(_eval_data())
header_task = asyncio.create_task(_eval_oob_headers())
# Build initial OOB body with shell content (skeletons in place)
oob_body = await oob_page_sx(
oobs="", # headers will arrive via resolve script
content=shell_sx,
)
# Prepend component definitions + CSS (like sx_response does)
from quart import request
comp_defs = components_for_request(oob_body)
body = oob_body
if comp_defs:
body = (f'<script type="text/sx" data-components>'
f'{comp_defs}</script>\n{body}')
from .css_registry import scan_classes_from_sx, lookup_rules, registry_loaded
if registry_loaded():
new_classes = scan_classes_from_sx(oob_body)
if comp_defs:
new_classes.update(scan_classes_from_sx(comp_defs))
known_raw = request.headers.get("SX-Css", "")
if known_raw:
from .css_registry import lookup_css_hash
if len(known_raw) <= 16:
looked_up = lookup_css_hash(known_raw)
known_classes = looked_up if looked_up is not None else set()
else:
known_classes = set(known_raw.split(","))
new_classes -= known_classes
if new_classes:
new_rules = lookup_rules(new_classes)
if new_rules:
body = f'<style data-sx-css>{new_rules}</style>\n{body}'
# Capture component env for extra defs in resolve chunks
from .jinja_bridge import components_for_page as _comp_scan
_base_scan = oob_body
def _extra_defs(sx_source: str) -> str:
from .deps import components_needed
comp_env = dict(get_component_env())
base_needed = components_needed(_base_scan, comp_env)
resolve_needed = components_needed(sx_source, comp_env)
extra = resolve_needed - base_needed
if not extra:
return ""
from .parser import serialize
from .types import Component
parts = []
for key, val in comp_env.items():
if isinstance(val, Component) and (f"~{val.name}" in extra or key in extra):
param_strs = ["&key"] + list(val.params)
if val.has_children:
param_strs.extend(["&rest", "children"])
params_sx = "(" + " ".join(param_strs) + ")"
body_sx = serialize(val.body, pretty=True)
parts.append(f"(defcomp ~{val.name} {params_sx} {body_sx})")
return "\n".join(parts)
# Yield chunks
async def _stream_oob_chunks():
# First chunk: OOB swap with skeletons
yield body
# Drain queue for resolve scripts
remaining = 2 # headers + data
while remaining > 0:
item = await _stream_queue.get()
kind = item[0]
try:
if kind == "headers":
_, oob_hdr = item
remaining -= 1
# Headers don't need resolve scripts for OOB — they're
# handled by OOB swap attributes in the SX content itself.
# But if we have header content, send a resolve for it.
if oob_hdr:
extras = _extra_defs(oob_hdr)
yield sx_streaming_resolve_script("stream-headers", oob_hdr, extras)
elif kind == "data":
_, stream_id, content_sx = item
extras = _extra_defs(content_sx)
yield sx_streaming_resolve_script(stream_id, content_sx, extras)
elif kind == "data-done":
remaining -= 1
except Exception as e:
logger.error("Streaming OOB resolve failed for %s: %s", kind, e)
return _stream_oob_chunks()
# ---------------------------------------------------------------------------
# Blueprint mounting
# ---------------------------------------------------------------------------
def compute_page_render_plans(service_name: str) -> None:
"""Pre-compute and cache render plans for all pages in a service.
Must be called after components are loaded (compute_all_deps/io_refs done)
and pages are registered. Stores plans on PageDef.render_plan.
"""
from .parser import serialize
from .deps import page_render_plan, get_all_io_names
from .jinja_bridge import _COMPONENT_ENV
io_names = get_all_io_names()
pages = get_all_pages(service_name)
for page_def in pages.values():
if page_def.content_expr is not None:
content_src = serialize(page_def.content_expr)
page_def.render_plan = page_render_plan(content_src, _COMPONENT_ENV, io_names)
logger.info("Computed render plans for %d pages in %s", len(pages), service_name)
def auto_mount_pages(app: Any, service_name: str) -> None:
"""Auto-mount all registered defpages for a service directly on the app.
Pages must have absolute paths (from the service URL root).
Called once per service in app.py after setup_*_pages().
Also mounts the /sx/data/ endpoint for client-side data fetching.
"""
pages = get_all_pages(service_name)
# Pre-compute render plans (which components render where)
compute_page_render_plans(service_name)
for page_def in pages.values():
_mount_one_page(app, service_name, page_def)
logger.info("Auto-mounted %d defpages for %s", len(pages), service_name)
# Mount page data endpoint for client-side rendering of :data pages
has_data_pages = any(p.data_expr is not None for p in pages.values())
if has_data_pages:
auto_mount_page_data(app, service_name)
# Mount IO proxy endpoint for Phase 5: client-side IO primitives
mount_io_endpoint(app, service_name)
# Mount service worker at root scope for offline data layer
mount_service_worker(app)
def mount_pages(bp: Any, service_name: str,
names: set[str] | list[str] | None = None) -> None:
"""Mount registered PageDef routes onto a Quart Blueprint.
For each PageDef, adds a GET route with appropriate auth/cache
decorators. Coexists with existing Python routes on the same blueprint.
If *names* is given, only mount pages whose name is in the set.
"""
from quart import make_response
pages = get_all_pages(service_name)
for page_def in pages.values():
if names is not None and page_def.name not in names:
continue
_mount_one_page(bp, service_name, page_def)
def _mount_one_page(bp: Any, service_name: str, page_def: PageDef) -> None:
"""Mount a single PageDef as a GET route on the blueprint."""
from quart import make_response, Response
if page_def.stream:
# Streaming response: yields chunks as IO resolves
async def page_view(**kwargs: Any) -> Any:
from shared.browser.app.utils.htmx import is_htmx_request
current = get_page(service_name, page_def.name) or page_def
if is_htmx_request():
# Streaming OOB: shell with skeletons first, then resolve scripts
gen = await execute_page_streaming_oob(
current, service_name, url_params=kwargs,
)
return Response(gen, content_type="text/sx; charset=utf-8")
# Full page streaming: HTML document with inline resolve scripts
gen = await execute_page_streaming(
current, service_name, url_params=kwargs,
)
return Response(gen, content_type="text/html; charset=utf-8")
else:
# Standard non-streaming response
async def page_view(**kwargs: Any) -> Any:
current = get_page(service_name, page_def.name) or page_def
result = await execute_page(current, service_name, url_params=kwargs)
if hasattr(result, "status_code"):
return result
return await make_response(result, 200)
# Give the view function a unique name for Quart's routing
page_view.__name__ = f"defpage_{page_def.name.replace('-', '_')}"
page_view.__qualname__ = page_view.__name__
# Apply auth decorator
view_fn = _apply_auth(page_view, page_def.auth)
# Apply cache decorator
if page_def.cache:
view_fn = _apply_cache(view_fn, page_def.cache)
# Register the route
bp.add_url_rule(
page_def.path,
endpoint=page_view.__name__,
view_func=view_fn,
methods=["GET"],
)
logger.info("Mounted defpage %s:%s → GET %s", service_name, page_def.name, page_def.path)
def _apply_auth(fn: Any, auth: str | list) -> Any:
"""Wrap a view function with the appropriate auth decorator."""
if auth == "public":
return fn
if auth == "login":
from shared.browser.app.authz import require_login
return require_login(fn)
if auth == "admin":
from shared.browser.app.authz import require_admin
return require_admin(fn)
if auth == "post_author":
from shared.browser.app.authz import require_post_author
return require_post_author(fn)
if isinstance(auth, list) and auth and auth[0] == "rights":
from shared.browser.app.authz import require_rights
return require_rights(*auth[1:])(fn)
return fn
def _apply_cache(fn: Any, cache: dict) -> Any:
"""Wrap a view function with cache_page decorator."""
from shared.browser.app.redis_cacher import cache_page
ttl = cache.get("ttl", 0)
tag = cache.get("tag")
scope = cache.get("scope", "user")
return cache_page(ttl=ttl, tag=tag, scope=scope)(fn)
async def _check_page_auth(auth: str | list) -> Any | None:
"""Check auth for the data endpoint. Returns None if OK, or a response."""
from quart import g, abort as quart_abort
if auth == "public":
return None
user = g.get("user")
if auth == "login":
if not user:
quart_abort(401)
elif auth == "admin":
if not user or not user.get("rights", {}).get("admin"):
quart_abort(403)
elif isinstance(auth, list) and auth and auth[0] == "rights":
if not user:
quart_abort(401)
user_rights = set(user.get("rights", {}).keys())
required = set(auth[1:])
if not required.issubset(user_rights):
quart_abort(403)
return None
# ---------------------------------------------------------------------------
# Page data endpoint — evaluate :data expression, return SX
# ---------------------------------------------------------------------------
async def evaluate_page_data(
page_def: PageDef,
service_name: str,
url_params: dict[str, Any] | None = None,
) -> str:
"""Evaluate a defpage's :data expression and return result as SX.
This is the data-only counterpart to execute_page(). The client
fetches this when it has all component definitions but needs the
data bindings to render a :data page client-side.
Returns SX wire format (e.g. ``{:posts (list ...) :count 42}``),
parsed by the client's SX parser and merged into the eval env.
"""
from .jinja_bridge import get_component_env, _get_request_context
from .async_eval import async_eval
from .parser import serialize
if page_def.data_expr is None:
return "nil"
if url_params is None:
url_params = {}
# Build environment (same as execute_page)
env = dict(get_component_env())
env.update(get_page_helpers(service_name))
env.update(page_def.closure)
for key, val in url_params.items():
kebab = key.replace("_", "-")
env[kebab] = val
env[key] = val
ctx = _get_request_context()
data_result = await async_eval(page_def.data_expr, env, ctx)
# Multi-stream: async generator can't be serialized as a single dict.
# Return nil to signal the client to fall back to server-side rendering.
if hasattr(data_result, '__aiter__'):
# Close the generator cleanly
await data_result.aclose()
return "nil"
# Kebab-case dict keys (matching execute_page)
if isinstance(data_result, dict):
data_result = {
k.replace("_", "-"): v for k, v in data_result.items()
}
# Serialize the result as SX
return serialize(data_result)
def auto_mount_page_data(app: Any, service_name: str) -> None:
"""Mount a single /sx/data/ endpoint that serves page data as SX.
For each defpage with :data, the client can GET /sx/data/<page-name>
(with URL params as query args) and receive the evaluated :data
result serialized as SX wire format (text/sx).
Auth is enforced per-page: the endpoint looks up the page's auth
setting and checks it before evaluating the data expression.
"""
from quart import make_response, request, abort as quart_abort
async def page_data_view(page_name: str) -> Any:
page_def = get_page(service_name, page_name)
if page_def is None:
quart_abort(404)
if page_def.data_expr is None:
quart_abort(404)
# Check auth — same enforcement as the page route itself
auth_error = await _check_page_auth(page_def.auth)
if auth_error is not None:
return auth_error
# Extract URL params from query string
url_params = dict(request.args)
result_sx = await evaluate_page_data(
page_def, service_name, url_params=url_params,
)
resp = await make_response(result_sx, 200)
resp.content_type = "text/sx; charset=utf-8"
return resp
page_data_view.__name__ = "sx_page_data"
page_data_view.__qualname__ = "sx_page_data"
app.add_url_rule(
"/sx/data/<page_name>",
endpoint="sx_page_data",
view_func=page_data_view,
methods=["GET"],
)
logger.info("Mounted page data endpoint for %s at /sx/data/<page_name>", service_name)
def mount_io_endpoint(app: Any, service_name: str) -> None:
"""Mount /sx/io/<name> endpoint for client-side IO primitive calls.
The client can call any allowed IO primitive or page helper via GET/POST.
Result is returned as SX wire format (text/sx).
Falls back to page helpers when the name isn't a global IO primitive,
so service-specific functions like ``highlight`` work via the proxy.
"""
import asyncio as _asyncio
from quart import make_response, request, abort as quart_abort
from .primitives_io import IO_PRIMITIVES, execute_io
from .jinja_bridge import _get_request_context
from .parser import serialize
# Build allowlist from all component IO refs across this service
from .jinja_bridge import _COMPONENT_ENV
from .types import Component as _Comp
_ALLOWED_IO: set[str] = set()
for _val in _COMPONENT_ENV.values():
if isinstance(_val, _Comp) and _val.io_refs:
_ALLOWED_IO.update(_val.io_refs)
from shared.browser.app.csrf import csrf_exempt
@csrf_exempt
async def io_proxy(name: str) -> Any:
if name not in _ALLOWED_IO:
quart_abort(403)
# Parse args from query string or JSON body
args: list = []
kwargs: dict = {}
if request.method == "GET":
for k, v in request.args.items():
if k.startswith("_arg"):
args.append(v)
else:
kwargs[k] = v
else:
data = await request.get_json(silent=True) or {}
args = data.get("args", [])
kwargs = data.get("kwargs", {})
# Try global IO primitives first
if name in IO_PRIMITIVES:
ctx = _get_request_context()
result = await execute_io(name, args, kwargs, ctx)
else:
# Fall back to page helpers (service-specific functions)
helpers = get_page_helpers(service_name)
helper_fn = helpers.get(name)
if helper_fn is None:
quart_abort(404)
result = helper_fn(*args, **kwargs) if kwargs else helper_fn(*args)
if _asyncio.iscoroutine(result):
result = await result
result_sx = serialize(result) if result is not None else "nil"
resp = await make_response(result_sx, 200)
resp.content_type = "text/sx; charset=utf-8"
resp.headers["Cache-Control"] = "public, max-age=300"
return resp
io_proxy.__name__ = "sx_io_proxy"
io_proxy.__qualname__ = "sx_io_proxy"
app.add_url_rule(
"/sx/io/<name>",
endpoint="sx_io_proxy",
view_func=io_proxy,
methods=["GET", "POST"],
)
logger.info("Mounted IO proxy for %s: %s", service_name, sorted(_ALLOWED_IO))
# ---------------------------------------------------------------------------
# Service Worker mount
# ---------------------------------------------------------------------------
_SW_MOUNTED = False
def mount_service_worker(app: Any) -> None:
"""Mount the SX service worker at /sx-sw.js (root scope).
The service worker provides offline data caching:
- /sx/data/* and /sx/io/* responses cached in IndexedDB
- /static/* assets cached via Cache API (stale-while-revalidate)
- Everything else passes through to network
"""
global _SW_MOUNTED
if _SW_MOUNTED:
return
_SW_MOUNTED = True
sw_path = os.path.join(
os.path.dirname(__file__), "..", "static", "scripts", "sx-sw.js"
)
sw_path = os.path.abspath(sw_path)
async def serve_sw():
from quart import send_file
return await send_file(
sw_path,
mimetype="application/javascript",
cache_timeout=0, # no caching — SW updates checked by browser
)
app.add_url_rule("/sx-sw.js", endpoint="sx_service_worker", view_func=serve_sw)
logger.info("Mounted service worker at /sx-sw.js")