""" Declarative page registry and blueprint mounting. Supports ``defpage`` s-expressions that define GET route handlers in .sx files instead of Python. Each page is a self-contained declaration with path, auth, layout, data, and content slots. Usage:: from shared.sx.pages import load_page_file, mount_pages # Load page definitions from .sx files load_page_file("blog/sx/pages/admin.sx", "blog") # Mount page routes onto an existing blueprint mount_pages(bp, "blog") """ from __future__ import annotations import inspect import logging import os from typing import Any import traceback from .types import EvalError, PageDef logger = logging.getLogger("sx.pages") def _eval_error_sx(e: EvalError, context: str) -> str: """Render an EvalError as SX content that's visible to the developer.""" from .ref.sx_ref import escape_html as _esc msg = _esc(str(e)) ctx = _esc(context) return ( f'(div :class "sx-eval-error" :style ' f'"background:#fef2f2;border:1px solid #fca5a5;' f'color:#991b1b;padding:1rem;margin:1rem 0;' f'border-radius:0.5rem;font-family:monospace;white-space:pre-wrap"' f' (p :style "font-weight:700;margin:0 0 0.5rem" "SX EvalError in {ctx}")' f' (p :style "margin:0" "{msg}"))' ) # --------------------------------------------------------------------------- # Registry — service → page-name → PageDef # --------------------------------------------------------------------------- _PAGE_REGISTRY: dict[str, dict[str, PageDef]] = {} _PAGE_HELPERS: dict[str, dict[str, Any]] = {} # service → name → callable def register_page(service: str, page_def: PageDef) -> None: """Register a page definition for a service.""" if service not in _PAGE_REGISTRY: _PAGE_REGISTRY[service] = {} _PAGE_REGISTRY[service][page_def.name] = page_def logger.debug("Registered page %s:%s path=%s", service, page_def.name, page_def.path) def get_page(service: str, name: str) -> PageDef | None: """Look up a registered page by service and name.""" return _PAGE_REGISTRY.get(service, {}).get(name) def get_all_pages(service: str) -> dict[str, PageDef]: """Return all pages for a service.""" return dict(_PAGE_REGISTRY.get(service, {})) def clear_pages(service: str | None = None) -> None: """Clear page registry. If service given, clear only that service.""" if service is None: _PAGE_REGISTRY.clear() else: _PAGE_REGISTRY.pop(service, None) def register_page_helpers(service: str, helpers: dict[str, Any]) -> None: """Register Python functions available in defpage content expressions. These are injected into the evaluation environment when executing defpage content, allowing defpage to call into Python:: register_page_helpers("sx", { "docs-content": docs_content_sx, "reference-content": reference_content_sx, }) Then in .sx:: (defpage docs-page :path "/language/docs/" :auth :public :content (docs-content slug)) """ from .boundary import validate_helper, validate_boundary_value import asyncio import functools for name in helpers: validate_helper(service, name) # Wrap helpers to validate return values at the boundary. # Async generators pass through unwrapped — their yields are validated # by the streaming infrastructure, not at the helper boundary. wrapped: dict[str, Any] = {} for name, fn in helpers.items(): if inspect.isasyncgenfunction(fn): # Async generator: pass through (streaming infra validates yields) wrapped[name] = fn elif asyncio.iscoroutinefunction(fn): @functools.wraps(fn) async def _async_wrap(*a, _fn=fn, _name=name, **kw): result = await _fn(*a, **kw) validate_boundary_value(result, context=f"helper {_name!r}") return result wrapped[name] = _async_wrap else: @functools.wraps(fn) def _sync_wrap(*a, _fn=fn, _name=name, **kw): result = _fn(*a, **kw) validate_boundary_value(result, context=f"helper {_name!r}") return result wrapped[name] = _sync_wrap if service not in _PAGE_HELPERS: _PAGE_HELPERS[service] = {} _PAGE_HELPERS[service].update(wrapped) def get_page_helpers(service: str) -> dict[str, Any]: """Return registered page helpers for a service.""" return dict(_PAGE_HELPERS.get(service, {})) # --------------------------------------------------------------------------- # Loading — parse .sx files and collect PageDef instances # --------------------------------------------------------------------------- def load_page_file(filepath: str, service_name: str) -> list[PageDef]: """Parse an .sx file, evaluate it, and register any PageDef values.""" from .parser import parse_all from .ref.sx_ref import eval_expr as _raw_eval, trampoline as _trampoline _eval = lambda expr, env: _trampoline(_raw_eval(expr, env)) from .jinja_bridge import get_component_env with open(filepath, encoding="utf-8") as f: source = f.read() # Seed env with component definitions so pages can reference components env = dict(get_component_env()) exprs = parse_all(source) pages: list[PageDef] = [] for expr in exprs: _eval(expr, env) # Collect all PageDef values from the env for key, val in env.items(): if isinstance(val, PageDef): register_page(service_name, val) pages.append(val) return pages def load_page_dir(directory: str, service_name: str) -> list[PageDef]: """Load all .sx files from a directory and register pages.""" import glob as glob_mod pages: list[PageDef] = [] for filepath in sorted(glob_mod.glob(os.path.join(directory, "*.sx"))): pages.extend(load_page_file(filepath, service_name)) return pages # --------------------------------------------------------------------------- # Page execution # --------------------------------------------------------------------------- def _wrap_with_env(expr: Any, env: dict) -> str: """Serialize an expression wrapped with let-bindings from env. Injects page env values (URL params, data results) as let-bindings so the OCaml kernel can evaluate the expression with those bindings. Only injects non-component, non-callable values that pages add dynamically. """ from .parser import serialize from .ocaml_bridge import _serialize_for_ocaml from .types import Symbol, Keyword, NIL body = serialize(expr) bindings = [] for k, v in env.items(): # Skip component definitions — already loaded in kernel if k.startswith("~") or callable(v): continue # Skip env keys that are component-env infrastructure if isinstance(v, (type, type(None))) and v is not None: continue # Serialize the value if v is NIL or v is None: sv = "nil" elif isinstance(v, bool): sv = "true" if v else "false" elif isinstance(v, (int, float)): sv = str(int(v)) if isinstance(v, float) and v == int(v) else str(v) elif isinstance(v, str): sv = _serialize_for_ocaml(v) elif isinstance(v, (list, dict)): sv = _serialize_for_ocaml(v) else: # Component, Lambda, etc — skip, already in kernel continue bindings.append(f"({k} {sv})") if not bindings: return body return f"(let ({' '.join(bindings)}) {body})" async def _eval_slot(expr: Any, env: dict, ctx: Any) -> str: """Evaluate a page slot expression and return an sx source string. Expands component calls (so IO in the body executes) but serializes the result as SX wire format, not HTML. """ import os if os.environ.get("SX_USE_OCAML") == "1": from .ocaml_bridge import get_bridge from .parser import serialize bridge = await get_bridge() # Wrap expression with let-bindings for env values that pages # inject (URL params, data results, etc.) sx_text = _wrap_with_env(expr, env) service = ctx.get("_helper_service", "") if isinstance(ctx, dict) else "" return await bridge.aser_slot(sx_text, ctx={"_helper_service": service}) if os.environ.get("SX_USE_REF") == "1": from .ref.async_eval_ref import async_eval_slot_to_sx else: from .async_eval import async_eval_slot_to_sx return await async_eval_slot_to_sx(expr, env, ctx) def _replace_suspense_sexp(sx: str, stream_id: str, replacement: str) -> str: """Replace a rendered ~shared:pages/suspense div in SX source with replacement content. After _eval_slot, ~shared:pages/suspense expands to: (div :id "sx-suspense-{id}" :data-suspense "{id}" :style "display:contents" ...) This finds the balanced s-expression containing :data-suspense "{id}" and replaces it with the given replacement string. """ marker = f':data-suspense "{stream_id}"' idx = sx.find(marker) if idx < 0: return sx # Walk backwards to find the opening paren of the containing (div ...) start = sx.rfind("(", 0, idx) if start < 0: return sx # Walk forward from start to find matching close paren (balanced) depth = 0 i = start while i < len(sx): ch = sx[i] if ch == "(": depth += 1 elif ch == ")": depth -= 1 if depth == 0: return sx[:start] + replacement + sx[i + 1:] elif ch == '"': # Skip string contents i += 1 while i < len(sx) and sx[i] != '"': if sx[i] == "\\": i += 1 # skip escaped char i += 1 i += 1 return sx async def execute_page( page_def: PageDef, service_name: str, url_params: dict[str, Any] | None = None, ) -> str: """Execute a declarative page and return the full response string. 1. Build env from component env + page closure + URL params 2. If :data — async_eval(data_expr) → merge result dict into env 3. Render slots: async_eval_to_sx(content_expr) etc. 4. get_template_context() for header construction 5. Resolve layout → header rows 6. Branch: full_page_sx() vs oob_page_sx() based on is_htmx_request() """ from .jinja_bridge import get_component_env, _get_request_context from .page import get_template_context from .helpers import full_page_sx, oob_page_sx, sx_response from .layouts import get_layout from shared.browser.app.utils.htmx import is_htmx_request _use_ocaml = os.environ.get("SX_USE_OCAML") == "1" if _use_ocaml: from .ocaml_bridge import get_bridge from .parser import serialize, parse_all from .ocaml_bridge import _serialize_for_ocaml else: from .async_eval import async_eval if url_params is None: url_params = {} # Build environment — closure first (page-local defines), then fresh # component env on top so hot-reloaded components take priority. env = dict(page_def.closure) env.update(get_component_env()) env.update(get_page_helpers(service_name)) # Inject URL params as kebab-case symbols for key, val in url_params.items(): kebab = key.replace("_", "-") env[kebab] = val env[key] = val # also provide snake_case for convenience # Get request context for I/O primitives ctx = _get_request_context() # Evaluate :data expression if present _multi_stream_content = None if page_def.data_expr is not None: if _use_ocaml: bridge = await get_bridge() sx_text = _wrap_with_env(page_def.data_expr, env) ocaml_ctx = {"_helper_service": service_name} raw = await bridge.eval(sx_text, ctx=ocaml_ctx) # Parse result back to Python dict/value if raw: parsed = parse_all(raw) data_result = parsed[0] if parsed else {} else: data_result = {} else: data_result = await async_eval(page_def.data_expr, env, ctx) if hasattr(data_result, '__aiter__'): # Multi-stream: consume generator, eval :content per chunk, # combine into shell with resolved suspense slots. chunks = [] async for chunk in data_result: if not isinstance(chunk, dict): continue chunk = dict(chunk) stream_id = chunk.pop("stream-id", "stream-content") chunk_env = dict(env) for k, v in chunk.items(): chunk_env[k.replace("_", "-")] = v chunk_sx = await _eval_slot(page_def.content_expr, chunk_env, ctx) if page_def.content_expr else "" chunks.append((stream_id, chunk_sx)) # Build content: if :shell exists, render it and inline resolved chunks if page_def.shell_expr is not None: shell_sx = await _eval_slot(page_def.shell_expr, env, ctx) # Replace each rendered suspense div with resolved content. # _eval_slot expands ~shared:pages/suspense into: # (div :id "sx-suspense-X" :data-suspense "X" :style "display:contents" ...) # We find the balanced s-expr containing :data-suspense "X" and replace it. for stream_id, chunk_sx in chunks: shell_sx = _replace_suspense_sexp(shell_sx, stream_id, chunk_sx) _multi_stream_content = shell_sx else: # No shell: just concatenate all chunks in a fragment parts = " ".join(sx for _, sx in chunks) _multi_stream_content = f"(<> {parts})" elif isinstance(data_result, dict): # Merge with kebab-case keys so SX symbols can reference them for k, v in data_result.items(): env[k.replace("_", "-")] = v # Render content slot (required) if _multi_stream_content is not None: content_sx = _multi_stream_content else: content_sx = await _eval_slot(page_def.content_expr, env, ctx) # Render optional slots filter_sx = "" if page_def.filter_expr is not None: filter_sx = await _eval_slot(page_def.filter_expr, env, ctx) aside_sx = "" if page_def.aside_expr is not None: aside_sx = await _eval_slot(page_def.aside_expr, env, ctx) menu_sx = "" if page_def.menu_expr is not None: menu_sx = await _eval_slot(page_def.menu_expr, env, ctx) # Resolve layout → header rows + mobile menu fallback tctx = await get_template_context() header_rows = "" oob_headers = "" layout_kwargs: dict[str, Any] = {} if page_def.layout is not None: if isinstance(page_def.layout, str): layout_name = page_def.layout elif isinstance(page_def.layout, list): # (:layout-name :key val ...) — unevaluated list from defpage # Evaluate each value in the current env to resolve dynamic bindings from .types import Keyword as SxKeyword, Symbol as SxSymbol raw = page_def.layout # First element is layout name (keyword or symbol or string) first = raw[0] if isinstance(first, SxKeyword): layout_name = first.name elif isinstance(first, SxSymbol): layout_name = first.name elif isinstance(first, str): layout_name = first else: layout_name = str(first) # Parse keyword args — evaluate values at request time i = 1 while i < len(raw): k = raw[i] if isinstance(k, SxKeyword) and i + 1 < len(raw): raw_val = raw[i + 1] if _use_ocaml: bridge = await get_bridge() sx_text = _wrap_with_env(raw_val, env) ocaml_ctx = {"_helper_service": service_name} raw_result = await bridge.eval(sx_text, ctx=ocaml_ctx) if raw_result: parsed = parse_all(raw_result) resolved = parsed[0] if parsed else None else: resolved = None else: resolved = await async_eval(raw_val, env, ctx) layout_kwargs[k.name.replace("-", "_")] = resolved i += 2 else: i += 1 else: layout_name = str(page_def.layout) layout = get_layout(layout_name) if layout is not None: header_rows = await layout.full_headers(tctx, **layout_kwargs) oob_headers = await layout.oob_headers(tctx, **layout_kwargs) if not menu_sx: menu_sx = await layout.mobile_menu(tctx, **layout_kwargs) # Branch on request type is_htmx = is_htmx_request() if is_htmx: # Compute content expression deps so the server sends component # definitions the client needs for future client-side routing extra_deps: set[str] | None = None if page_def.content_expr is not None and page_def.data_expr is None: from .deps import components_needed from .parser import serialize try: content_src = serialize(page_def.content_expr) extra_deps = components_needed(content_src, get_component_env()) except Exception: pass # non-critical — client will just fall back to server return sx_response(await oob_page_sx( oobs=oob_headers if oob_headers else "", filter=filter_sx, aside=aside_sx, content=content_sx, menu=menu_sx, ), extra_component_names=extra_deps) else: return await full_page_sx( tctx, header_rows=header_rows, filter=filter_sx, aside=aside_sx, content=content_sx, menu=menu_sx, ) # --------------------------------------------------------------------------- # Streaming page execution (Phase 6: Streaming & Suspense) # --------------------------------------------------------------------------- async def execute_page_streaming( page_def: PageDef, service_name: str, url_params: dict[str, Any] | None = None, ): """Execute a page with streaming response. All context-dependent setup (g, request, current_app access) runs in this regular async function — called while the request context is live. Returns an async generator that yields pre-computed HTML chunks and awaits already-created tasks (no further context access needed). """ import asyncio from .jinja_bridge import get_component_env, _get_request_context from .async_eval import async_eval from .page import get_template_context from .helpers import ( render_to_html as _helpers_render_to_html, sx_page_streaming_parts, sx_streaming_resolve_script, ) from .parser import SxExpr, serialize as sx_serialize from .layouts import get_layout if url_params is None: url_params = {} env = dict(page_def.closure) env.update(get_component_env()) env.update(get_page_helpers(service_name)) for key, val in url_params.items(): kebab = key.replace("_", "-") env[kebab] = val env[key] = val ctx = _get_request_context() tctx = await get_template_context() # Build fallback expressions if page_def.fallback_expr is not None: fallback_sx = sx_serialize(page_def.fallback_expr) else: fallback_sx = ( '(div :class "p-8 animate-pulse"' ' (div :class "h-8 bg-stone-200 rounded mb-4 w-1/3")' ' (div :class "h-64 bg-stone-200 rounded"))' ) header_fallback = '(div :class "h-12 bg-stone-200 animate-pulse")' # Resolve layout layout = None layout_kwargs: dict[str, Any] = {} if page_def.layout is not None: if isinstance(page_def.layout, str): layout_name = page_def.layout elif isinstance(page_def.layout, list): from .types import Keyword as SxKeyword, Symbol as SxSymbol raw = page_def.layout first = raw[0] layout_name = ( first.name if isinstance(first, (SxKeyword, SxSymbol)) else str(first) ) i = 1 while i < len(raw): k = raw[i] if isinstance(k, SxKeyword) and i + 1 < len(raw): resolved = await async_eval(raw[i + 1], env, ctx) layout_kwargs[k.name.replace("-", "_")] = resolved i += 2 else: i += 1 else: layout_name = str(page_def.layout) layout = get_layout(layout_name) # --- Launch concurrent IO tasks (inherit context via create_task) --- _stream_queue: asyncio.Queue = asyncio.Queue() _multi_stream = False async def _eval_data_and_content(): """Evaluate :data then :content. If :data returns an async generator (multi-stream mode), iterate it and push each (stream_id, content_sx) to _stream_queue incrementally. The main stream loop drains the queue and sends resolve scripts as items arrive — giving true staggered streaming. """ nonlocal _multi_stream try: data_env = dict(env) if page_def.data_expr is not None: data_result = await async_eval(page_def.data_expr, data_env, ctx) # Async generator: multi-stream mode if hasattr(data_result, '__aiter__'): _multi_stream = True async for chunk in data_result: if not isinstance(chunk, dict): continue chunk = dict(chunk) # copy so pop doesn't mutate stream_id = chunk.pop("stream-id", "stream-content") chunk_env = dict(env) for k, v in chunk.items(): chunk_env[k.replace("_", "-")] = v content_sx = await _eval_slot(page_def.content_expr, chunk_env, ctx) if page_def.content_expr else "" await _stream_queue.put(("data", stream_id, content_sx)) await _stream_queue.put(("data-done",)) return if isinstance(data_result, dict): for k, v in data_result.items(): data_env[k.replace("_", "-")] = v content_sx = await _eval_slot(page_def.content_expr, data_env, ctx) if page_def.content_expr else "" filter_sx = await _eval_slot(page_def.filter_expr, data_env, ctx) if page_def.filter_expr else "" aside_sx = await _eval_slot(page_def.aside_expr, data_env, ctx) if page_def.aside_expr else "" menu_sx = await _eval_slot(page_def.menu_expr, data_env, ctx) if page_def.menu_expr else "" await _stream_queue.put(("data-single", content_sx, filter_sx, aside_sx, menu_sx)) except EvalError as e: logger.error("Streaming data task failed (EvalError): %s\n%s", e, traceback.format_exc()) error_sx = _eval_error_sx(e, "page content") await _stream_queue.put(("data-single", error_sx, "", "", "")) except Exception as e: logger.error("Streaming data task failed: %s\n%s", e, traceback.format_exc()) await _stream_queue.put(("data-done",)) async def _eval_headers(): try: if layout is None: await _stream_queue.put(("headers", "", "")) return rows = await layout.full_headers(tctx, **layout_kwargs) menu = await layout.mobile_menu(tctx, **layout_kwargs) await _stream_queue.put(("headers", rows, menu)) except Exception as e: logger.error("Streaming headers task failed: %s\n%s", e, traceback.format_exc()) await _stream_queue.put(("headers", "", "")) data_task = asyncio.create_task(_eval_data_and_content()) header_task = asyncio.create_task(_eval_headers()) # --- Build initial shell as HTML (still in request context) --- # Render to HTML so [data-suspense] elements are real DOM immediately. # No dependency on sx-browser.js boot timing for the initial shell. suspense_header_sx = f'(~shared:pages/suspense :id "stream-headers" :fallback {header_fallback})' # When :shell is provided, it renders directly as the content slot # (it contains its own ~shared:pages/suspense for the data-dependent part). # Otherwise, wrap the entire :content in a single suspense. if page_def.shell_expr is not None: shell_content_sx = await _eval_slot(page_def.shell_expr, env, ctx) suspense_content_sx = shell_content_sx else: suspense_content_sx = f'(~shared:pages/suspense :id "stream-content" :fallback {fallback_sx})' initial_page_html = await _helpers_render_to_html("shared:layout/app-body", header_rows=SxExpr(suspense_header_sx), content=SxExpr(suspense_content_sx), ) # Include layout component refs + page content so the scan picks up # their transitive deps (e.g. ~shared:fragments/cart-mini, ~auth-menu in headers). layout_refs = "" if layout is not None and hasattr(layout, "component_names"): layout_refs = " ".join(f"({n})" for n in layout.component_names) content_ref = "" if page_def.content_expr is not None: content_ref = sx_serialize(page_def.content_expr) shell_ref = "" if page_def.shell_expr is not None: shell_ref = sx_serialize(page_def.shell_expr) page_sx_for_scan = f'(<> {layout_refs} {content_ref} {shell_ref} (~shared:layout/app-body :header-rows {suspense_header_sx} :content {suspense_content_sx}))' shell, tail = sx_page_streaming_parts( tctx, initial_page_html, page_sx=page_sx_for_scan, ) # Capture component env + extras scanner while we still have context. # Resolved SX may reference components not in the initial scan # (e.g. ~shared:fragments/cart-mini from IO-generated header content). from .jinja_bridge import components_for_page as _comp_scan from quart import current_app as _ca _service = _ca.name # Track which components were already sent in the shell _shell_scan = page_sx_for_scan def _extra_defs(sx_source: str) -> str: """Return component defs needed by sx_source but not in shell.""" from .deps import components_needed comp_env = dict(get_component_env()) shell_needed = components_needed(_shell_scan, comp_env) resolve_needed = components_needed(sx_source, comp_env) extra = resolve_needed - shell_needed if not extra: return "" from .parser import serialize from .types import Component parts = [] for key, val in comp_env.items(): if isinstance(val, Component) and (f"~{val.name}" in extra or key in extra): param_strs = ["&key"] + list(val.params) if val.has_children: param_strs.extend(["&rest", "children"]) params_sx = "(" + " ".join(param_strs) + ")" body_sx = serialize(val.body, pretty=True) parts.append(f"(defcomp ~{val.name} {params_sx} {body_sx})") return "\n".join(parts) # --- Return async generator that yields chunks --- # No context access needed below — just awaiting tasks and yielding strings. async def _stream_chunks(): yield shell + tail # Both tasks push tagged items onto _stream_queue. We drain until # both are done. Items: ("headers", rows, menu), ("data-single", ...), # ("data", stream_id, sx), ("data-done",). remaining = 2 # waiting for: headers + data while remaining > 0: item = await _stream_queue.get() kind = item[0] try: if kind == "headers": _, header_rows, header_menu = item remaining -= 1 if header_rows: extras = _extra_defs(header_rows) yield sx_streaming_resolve_script("stream-headers", header_rows, extras) elif kind == "data-single": _, content_sx, filter_sx, aside_sx, menu_sx = item remaining -= 1 extras = _extra_defs(content_sx) yield sx_streaming_resolve_script("stream-content", content_sx, extras) elif kind == "data": _, stream_id, content_sx = item extras = _extra_defs(content_sx) yield sx_streaming_resolve_script(stream_id, content_sx, extras) elif kind == "data-done": remaining -= 1 except Exception as e: logger.error("Streaming resolve failed for %s: %s\n%s", kind, e, traceback.format_exc()) yield "\n\n" return _stream_chunks() async def execute_page_streaming_oob( page_def: PageDef, service_name: str, url_params: dict[str, Any] | None = None, ): """Execute a streaming page for HTMX/SX requests. Like execute_page_streaming but yields OOB SX swap format instead of a full HTML document: 1. First yield: OOB SX with shell content (suspense skeletons) + CSS + defs 2. Subsequent yields: __sxResolve script tags as data resolves The client uses streaming fetch (ReadableStream) to process the OOB swap immediately and then execute resolve scripts as they arrive. """ import asyncio from .jinja_bridge import get_component_env, _get_request_context from .async_eval import async_eval from .page import get_template_context from .helpers import ( oob_page_sx, sx_streaming_resolve_script, components_for_request, SxExpr, ) from .parser import serialize as sx_serialize from .layouts import get_layout if url_params is None: url_params = {} env = dict(page_def.closure) env.update(get_component_env()) env.update(get_page_helpers(service_name)) for key, val in url_params.items(): kebab = key.replace("_", "-") env[kebab] = val env[key] = val ctx = _get_request_context() # Evaluate shell with suspense skeletons (no data yet) shell_sx = "" if page_def.shell_expr is not None: shell_sx = await _eval_slot(page_def.shell_expr, env, ctx) # Build initial OOB response with shell as content tctx = await get_template_context() # Resolve layout for OOB headers layout = None layout_kwargs: dict[str, Any] = {} if page_def.layout is not None: if isinstance(page_def.layout, str): layout_name = page_def.layout elif isinstance(page_def.layout, list): from .types import Keyword as SxKeyword, Symbol as SxSymbol raw = page_def.layout first = raw[0] layout_name = ( first.name if isinstance(first, (SxKeyword, SxSymbol)) else str(first) ) i = 1 while i < len(raw): k = raw[i] if isinstance(k, SxKeyword) and i + 1 < len(raw): resolved = await async_eval(raw[i + 1], env, ctx) layout_kwargs[k.name.replace("-", "_")] = resolved i += 2 else: i += 1 else: layout_name = str(page_def.layout) layout = get_layout(layout_name) # Launch concurrent tasks _stream_queue: asyncio.Queue = asyncio.Queue() async def _eval_data(): try: if page_def.data_expr is not None: data_result = await async_eval(page_def.data_expr, env, ctx) if hasattr(data_result, '__aiter__'): async for chunk in data_result: if not isinstance(chunk, dict): continue chunk = dict(chunk) stream_id = chunk.pop("stream-id", "stream-content") chunk_env = dict(env) for k, v in chunk.items(): chunk_env[k.replace("_", "-")] = v content_sx = await _eval_slot(page_def.content_expr, chunk_env, ctx) if page_def.content_expr else "" await _stream_queue.put(("data", stream_id, content_sx)) await _stream_queue.put(("data-done",)) return await _stream_queue.put(("data-done",)) except EvalError as e: logger.error("Streaming OOB data task failed (EvalError): %s\n%s", e, traceback.format_exc()) error_sx = _eval_error_sx(e, "page content") await _stream_queue.put(("data", "stream-content", error_sx)) await _stream_queue.put(("data-done",)) except Exception as e: logger.error("Streaming OOB data task failed: %s\n%s", e, traceback.format_exc()) await _stream_queue.put(("data-done",)) async def _eval_oob_headers(): try: if layout is not None: oob_headers = await layout.oob_headers(tctx, **layout_kwargs) await _stream_queue.put(("headers", oob_headers)) else: await _stream_queue.put(("headers", "")) except Exception as e: logger.error("Streaming OOB headers task failed: %s\n%s", e, traceback.format_exc()) await _stream_queue.put(("headers", "")) data_task = asyncio.create_task(_eval_data()) header_task = asyncio.create_task(_eval_oob_headers()) # Build initial OOB body with shell content (skeletons in place) oob_body = await oob_page_sx( oobs="", # headers will arrive via resolve script content=shell_sx, ) # Prepend component definitions + CSS (like sx_response does) from quart import request comp_defs = components_for_request(oob_body) body = oob_body if comp_defs: body = (f'\n{body}') from .css_registry import scan_classes_from_sx, lookup_rules, registry_loaded if registry_loaded(): new_classes = scan_classes_from_sx(oob_body) if comp_defs: new_classes.update(scan_classes_from_sx(comp_defs)) known_raw = request.headers.get("SX-Css", "") if known_raw: from .css_registry import lookup_css_hash if len(known_raw) <= 16: looked_up = lookup_css_hash(known_raw) known_classes = looked_up if looked_up is not None else set() else: known_classes = set(known_raw.split(",")) new_classes -= known_classes if new_classes: new_rules = lookup_rules(new_classes) if new_rules: body = f'\n{body}' # Capture component env for extra defs in resolve chunks from .jinja_bridge import components_for_page as _comp_scan _base_scan = oob_body def _extra_defs(sx_source: str) -> str: from .deps import components_needed comp_env = dict(get_component_env()) base_needed = components_needed(_base_scan, comp_env) resolve_needed = components_needed(sx_source, comp_env) extra = resolve_needed - base_needed if not extra: return "" from .parser import serialize from .types import Component parts = [] for key, val in comp_env.items(): if isinstance(val, Component) and (f"~{val.name}" in extra or key in extra): param_strs = ["&key"] + list(val.params) if val.has_children: param_strs.extend(["&rest", "children"]) params_sx = "(" + " ".join(param_strs) + ")" body_sx = serialize(val.body, pretty=True) parts.append(f"(defcomp ~{val.name} {params_sx} {body_sx})") return "\n".join(parts) # Yield chunks async def _stream_oob_chunks(): # First chunk: OOB swap with skeletons yield body # Drain queue for resolve scripts remaining = 2 # headers + data while remaining > 0: item = await _stream_queue.get() kind = item[0] try: if kind == "headers": _, oob_hdr = item remaining -= 1 # Headers don't need resolve scripts for OOB — they're # handled by OOB swap attributes in the SX content itself. # But if we have header content, send a resolve for it. if oob_hdr: extras = _extra_defs(oob_hdr) yield sx_streaming_resolve_script("stream-headers", oob_hdr, extras) elif kind == "data": _, stream_id, content_sx = item extras = _extra_defs(content_sx) yield sx_streaming_resolve_script(stream_id, content_sx, extras) elif kind == "data-done": remaining -= 1 except Exception as e: logger.error("Streaming OOB resolve failed for %s: %s\n%s", kind, e, traceback.format_exc()) return _stream_oob_chunks() # --------------------------------------------------------------------------- # Blueprint mounting # --------------------------------------------------------------------------- def compute_page_render_plans(service_name: str) -> None: """Pre-compute and cache render plans for all pages in a service. Must be called after components are loaded (compute_all_deps/io_refs done) and pages are registered. Stores plans on PageDef.render_plan. """ import time from .parser import serialize from .deps import page_render_plan, get_all_io_names from .jinja_bridge import _COMPONENT_ENV t0 = time.monotonic() io_names = get_all_io_names() pages = get_all_pages(service_name) count = 0 for page_def in pages.values(): if page_def.content_expr is not None: content_src = serialize(page_def.content_expr) page_def.render_plan = page_render_plan(content_src, _COMPONENT_ENV, io_names) count += 1 elapsed = (time.monotonic() - t0) * 1000 logger.info("Computed render plans for %d pages in %s (%.1fms)", count, service_name, elapsed) def auto_mount_pages(app: Any, service_name: str) -> None: """Auto-mount all registered defpages for a service directly on the app. Pages must have absolute paths (from the service URL root). Called once per service in app.py after setup_*_pages(). Also mounts the /sx/data/ endpoint for client-side data fetching. """ pages = get_all_pages(service_name) # Pre-compute render plans (which components render where) compute_page_render_plans(service_name) for page_def in pages.values(): _mount_one_page(app, service_name, page_def) logger.info("Auto-mounted %d defpages for %s", len(pages), service_name) # Mount page data endpoint for client-side rendering of :data pages has_data_pages = any(p.data_expr is not None for p in pages.values()) if has_data_pages: auto_mount_page_data(app, service_name) # Mount IO proxy endpoint for Phase 5: client-side IO primitives mount_io_endpoint(app, service_name) # Mount service worker at root scope for offline data layer mount_service_worker(app) # Mount action endpoint for Phase 7c: optimistic data mutations mount_action_endpoint(app, service_name) def mount_pages(bp: Any, service_name: str, names: set[str] | list[str] | None = None) -> None: """Mount registered PageDef routes onto a Quart Blueprint. For each PageDef, adds a GET route with appropriate auth/cache decorators. Coexists with existing Python routes on the same blueprint. If *names* is given, only mount pages whose name is in the set. """ from quart import make_response pages = get_all_pages(service_name) for page_def in pages.values(): if names is not None and page_def.name not in names: continue _mount_one_page(bp, service_name, page_def) def _mount_one_page(bp: Any, service_name: str, page_def: PageDef) -> None: """Mount a single PageDef as a GET route on the blueprint.""" from quart import make_response, Response if page_def.stream: # Streaming response: yields chunks as IO resolves async def page_view(**kwargs: Any) -> Any: from shared.browser.app.utils.htmx import is_htmx_request current = get_page(service_name, page_def.name) or page_def if is_htmx_request(): # Streaming OOB: shell with skeletons first, then resolve scripts gen = await execute_page_streaming_oob( current, service_name, url_params=kwargs, ) return Response(gen, content_type="text/sx; charset=utf-8") # Full page streaming: HTML document with inline resolve scripts gen = await execute_page_streaming( current, service_name, url_params=kwargs, ) return Response(gen, content_type="text/html; charset=utf-8") else: # Standard non-streaming response async def page_view(**kwargs: Any) -> Any: current = get_page(service_name, page_def.name) or page_def result = await execute_page(current, service_name, url_params=kwargs) if hasattr(result, "status_code"): return result return await make_response(result, 200) # Give the view function a unique name for Quart's routing page_view.__name__ = f"defpage_{page_def.name.replace('-', '_')}" page_view.__qualname__ = page_view.__name__ # Apply auth decorator view_fn = _apply_auth(page_view, page_def.auth) # Apply cache decorator if page_def.cache: view_fn = _apply_cache(view_fn, page_def.cache) # Register the route bp.add_url_rule( page_def.path, endpoint=page_view.__name__, view_func=view_fn, methods=["GET"], ) logger.info("Mounted defpage %s:%s → GET %s", service_name, page_def.name, page_def.path) def _apply_auth(fn: Any, auth: str | list) -> Any: """Wrap a view function with the appropriate auth decorator.""" if auth == "public": return fn if auth == "login": from shared.browser.app.authz import require_login return require_login(fn) if auth == "admin": from shared.browser.app.authz import require_admin return require_admin(fn) if auth == "post_author": from shared.browser.app.authz import require_post_author return require_post_author(fn) if isinstance(auth, list) and auth and auth[0] == "rights": from shared.browser.app.authz import require_rights return require_rights(*auth[1:])(fn) return fn def _apply_cache(fn: Any, cache: dict) -> Any: """Wrap a view function with cache_page decorator.""" from shared.browser.app.redis_cacher import cache_page ttl = cache.get("ttl", 0) tag = cache.get("tag") scope = cache.get("scope", "user") return cache_page(ttl=ttl, tag=tag, scope=scope)(fn) async def _check_page_auth(auth: str | list) -> Any | None: """Check auth for the data endpoint. Returns None if OK, or a response.""" from quart import g, abort as quart_abort if auth == "public": return None user = g.get("user") if auth == "login": if not user: quart_abort(401) elif auth == "admin": if not user or not user.get("rights", {}).get("admin"): quart_abort(403) elif isinstance(auth, list) and auth and auth[0] == "rights": if not user: quart_abort(401) user_rights = set(user.get("rights", {}).keys()) required = set(auth[1:]) if not required.issubset(user_rights): quart_abort(403) return None # --------------------------------------------------------------------------- # Page data endpoint — evaluate :data expression, return SX # --------------------------------------------------------------------------- async def evaluate_page_data( page_def: PageDef, service_name: str, url_params: dict[str, Any] | None = None, ) -> str: """Evaluate a defpage's :data expression and return result as SX. This is the data-only counterpart to execute_page(). The client fetches this when it has all component definitions but needs the data bindings to render a :data page client-side. Returns SX wire format (e.g. ``{:posts (list ...) :count 42}``), parsed by the client's SX parser and merged into the eval env. """ from .jinja_bridge import get_component_env, _get_request_context from .async_eval import async_eval from .parser import serialize if page_def.data_expr is None: return "nil" if url_params is None: url_params = {} # Build environment (same as execute_page) env = dict(page_def.closure) env.update(get_component_env()) env.update(get_page_helpers(service_name)) for key, val in url_params.items(): kebab = key.replace("_", "-") env[kebab] = val env[key] = val ctx = _get_request_context() data_result = await async_eval(page_def.data_expr, env, ctx) # Multi-stream: async generator can't be serialized as a single dict. # Return nil to signal the client to fall back to server-side rendering. if hasattr(data_result, '__aiter__'): # Close the generator cleanly await data_result.aclose() return "nil" # Kebab-case dict keys (matching execute_page) if isinstance(data_result, dict): data_result = { k.replace("_", "-"): v for k, v in data_result.items() } # Serialize the result as SX return serialize(data_result) def auto_mount_page_data(app: Any, service_name: str) -> None: """Mount a single /sx/data/ endpoint that serves page data as SX. For each defpage with :data, the client can GET /sx/data/ (with URL params as query args) and receive the evaluated :data result serialized as SX wire format (text/sx). Auth is enforced per-page: the endpoint looks up the page's auth setting and checks it before evaluating the data expression. """ from quart import make_response, request, abort as quart_abort async def page_data_view(page_name: str) -> Any: page_def = get_page(service_name, page_name) if page_def is None: quart_abort(404) if page_def.data_expr is None: quart_abort(404) # Check auth — same enforcement as the page route itself auth_error = await _check_page_auth(page_def.auth) if auth_error is not None: return auth_error # Extract URL params from query string url_params = dict(request.args) result_sx = await evaluate_page_data( page_def, service_name, url_params=url_params, ) resp = await make_response(result_sx, 200) resp.content_type = "text/sx; charset=utf-8" return resp page_data_view.__name__ = "sx_page_data" page_data_view.__qualname__ = "sx_page_data" app.add_url_rule( "/sx/data/", endpoint="sx_page_data", view_func=page_data_view, methods=["GET"], ) logger.info("Mounted page data endpoint for %s at /sx/data/", service_name) def mount_io_endpoint(app: Any, service_name: str) -> None: """Mount /sx/io/ endpoint for client-side IO primitive calls. The client can call any allowed IO primitive or page helper via GET/POST. Result is returned as SX wire format (text/sx). Falls back to page helpers when the name isn't a global IO primitive, so service-specific functions like ``highlight`` work via the proxy. """ import asyncio as _asyncio from quart import make_response, request, abort as quart_abort from .primitives_io import IO_PRIMITIVES, execute_io from .jinja_bridge import _get_request_context from .parser import serialize # Build allowlist from all component IO refs across this service from .jinja_bridge import _COMPONENT_ENV from .types import Component as _Comp _ALLOWED_IO: set[str] = set() for _val in _COMPONENT_ENV.values(): if isinstance(_val, _Comp) and _val.io_refs: _ALLOWED_IO.update(_val.io_refs) from shared.browser.app.csrf import csrf_exempt @csrf_exempt async def io_proxy(name: str) -> Any: if name not in _ALLOWED_IO: quart_abort(403) # Parse args from query string or JSON body args: list = [] kwargs: dict = {} if request.method == "GET": for k, v in request.args.items(): if k.startswith("_arg"): args.append(v) else: kwargs[k] = v else: data = await request.get_json(silent=True) or {} args = data.get("args", []) kwargs = data.get("kwargs", {}) # Try global IO primitives first if name in IO_PRIMITIVES: ctx = _get_request_context() result = await execute_io(name, args, kwargs, ctx) else: # Fall back to page helpers (service-specific functions) helpers = get_page_helpers(service_name) helper_fn = helpers.get(name) if helper_fn is None: quart_abort(404) result = helper_fn(*args, **kwargs) if kwargs else helper_fn(*args) if _asyncio.iscoroutine(result): result = await result result_sx = serialize(result) if result is not None else "nil" resp = await make_response(result_sx, 200) resp.content_type = "text/sx; charset=utf-8" resp.headers["Cache-Control"] = "public, max-age=300" return resp io_proxy.__name__ = "sx_io_proxy" io_proxy.__qualname__ = "sx_io_proxy" app.add_url_rule( "/sx/io/", endpoint="sx_io_proxy", view_func=io_proxy, methods=["GET", "POST"], ) logger.info("Mounted IO proxy for %s: %s", service_name, sorted(_ALLOWED_IO)) # --------------------------------------------------------------------------- # Service Worker mount # --------------------------------------------------------------------------- _SW_MOUNTED = False def mount_service_worker(app: Any) -> None: """Mount the SX service worker at /sx-sw.js (root scope). The service worker provides offline data caching: - /sx/data/* and /sx/io/* responses cached in IndexedDB - /static/* assets cached via Cache API (stale-while-revalidate) - Everything else passes through to network """ global _SW_MOUNTED if _SW_MOUNTED: return _SW_MOUNTED = True sw_path = os.path.join( os.path.dirname(__file__), "..", "static", "scripts", "sx-sw.js" ) sw_path = os.path.abspath(sw_path) async def serve_sw(): from quart import send_file return await send_file( sw_path, mimetype="application/javascript", cache_timeout=0, # no caching — SW updates checked by browser ) app.add_url_rule("/sx-sw.js", endpoint="sx_service_worker", view_func=serve_sw) logger.info("Mounted service worker at /sx-sw.js") def mount_action_endpoint(app: Any, service_name: str) -> None: """Mount /sx/action/ endpoint for client-side data mutations. The client can POST to trigger a named action (registered via register_page_helpers with an 'action:' prefix). The action receives the JSON payload, performs the mutation, and returns the new page data as SX wire format. This is the server counterpart to submit-mutation in orchestration.sx. """ from quart import make_response, request, abort as quart_abort from .parser import serialize from shared.browser.app.csrf import csrf_exempt @csrf_exempt async def action_handler(name: str) -> Any: # Look up action helper helpers = get_page_helpers(service_name) action_fn = helpers.get(f"action:{name}") if action_fn is None: quart_abort(404) # Parse JSON payload import asyncio as _asyncio data = await request.get_json(silent=True) or {} try: result = action_fn(**data) if _asyncio.iscoroutine(result): result = await result except Exception as e: logger.warning("Action %s failed: %s", name, e) resp = await make_response(f'(dict "error" "{e}")', 500) resp.content_type = "text/sx; charset=utf-8" return resp result_sx = serialize(result) if result is not None else "nil" resp = await make_response(result_sx, 200) resp.content_type = "text/sx; charset=utf-8" return resp action_handler.__name__ = "sx_action" action_handler.__qualname__ = "sx_action" app.add_url_rule( "/sx/action/", endpoint="sx_action", view_func=action_handler, methods=["POST"], ) logger.info("Mounted action endpoint for %s at /sx/action/", service_name)