EvalError moved to types.py. All 27 files updated to import eval_expr, trampoline, call_lambda, etc. directly from shared.sx.ref.sx_ref instead of through the evaluator.py indirection layer. 320/320 spec tests pass. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1290 lines
48 KiB
Python
1290 lines
48 KiB
Python
"""
|
|
Declarative page registry and blueprint mounting.
|
|
|
|
Supports ``defpage`` s-expressions that define GET route handlers
|
|
in .sx files instead of Python. Each page is a self-contained
|
|
declaration with path, auth, layout, data, and content slots.
|
|
|
|
Usage::
|
|
|
|
from shared.sx.pages import load_page_file, mount_pages
|
|
|
|
# Load page definitions from .sx files
|
|
load_page_file("blog/sx/pages/admin.sx", "blog")
|
|
|
|
# Mount page routes onto an existing blueprint
|
|
mount_pages(bp, "blog")
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import inspect
|
|
import logging
|
|
import os
|
|
from typing import Any
|
|
|
|
from .types import PageDef
|
|
|
|
logger = logging.getLogger("sx.pages")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Registry — service → page-name → PageDef
|
|
# ---------------------------------------------------------------------------
|
|
|
|
_PAGE_REGISTRY: dict[str, dict[str, PageDef]] = {}
|
|
_PAGE_HELPERS: dict[str, dict[str, Any]] = {} # service → name → callable
|
|
|
|
|
|
def register_page(service: str, page_def: PageDef) -> None:
    """Add *page_def* to the registry for *service*, replacing any same-named page."""
    _PAGE_REGISTRY.setdefault(service, {})[page_def.name] = page_def
    logger.debug("Registered page %s:%s path=%s", service, page_def.name, page_def.path)
|
|
|
|
|
|
def get_page(service: str, name: str) -> PageDef | None:
    """Return the PageDef registered as *name* under *service*, or None if absent."""
    service_pages = _PAGE_REGISTRY.get(service)
    if service_pages is None:
        return None
    return service_pages.get(name)
|
|
|
|
|
|
def get_all_pages(service: str) -> dict[str, PageDef]:
    """Return a shallow copy of every page registered for *service*."""
    registered = _PAGE_REGISTRY.get(service, {})
    return {name: page for name, page in registered.items()}
|
|
|
|
|
|
def clear_pages(service: str | None = None) -> None:
    """Clear page registry. If service given, clear only that service."""
    if service is not None:
        _PAGE_REGISTRY.pop(service, None)
    else:
        _PAGE_REGISTRY.clear()
|
|
|
|
|
|
def register_page_helpers(service: str, helpers: dict[str, Any]) -> None:
    """Register Python functions available in defpage content expressions.

    These are injected into the evaluation environment when executing
    defpage content, allowing defpage to call into Python::

        register_page_helpers("sx", {
            "docs-content": docs_content_sx,
            "reference-content": reference_content_sx,
        })

    Then in .sx::

        (defpage docs-page
          :path "/docs/<slug>"
          :auth :public
          :content (docs-content slug))
    """
    import asyncio
    import functools

    from .boundary import validate_helper, validate_boundary_value

    # Reject disallowed helper names before wrapping anything.
    for helper_name in helpers:
        validate_helper(service, helper_name)

    def _wrap_sync(fn, name):
        """Wrap a sync helper to validate its return value at the boundary."""
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            result = fn(*args, **kwargs)
            validate_boundary_value(result, context=f"helper {name!r}")
            return result
        return wrapper

    def _wrap_async(fn, name):
        """Wrap a coroutine helper to validate its awaited result at the boundary."""
        @functools.wraps(fn)
        async def wrapper(*args, **kwargs):
            result = await fn(*args, **kwargs)
            validate_boundary_value(result, context=f"helper {name!r}")
            return result
        return wrapper

    # Async generators pass through unwrapped — their yields are validated
    # by the streaming infrastructure, not at the helper boundary.
    wrapped: dict[str, Any] = {}
    for name, fn in helpers.items():
        if inspect.isasyncgenfunction(fn):
            wrapped[name] = fn
        elif asyncio.iscoroutinefunction(fn):
            wrapped[name] = _wrap_async(fn, name)
        else:
            wrapped[name] = _wrap_sync(fn, name)

    _PAGE_HELPERS.setdefault(service, {}).update(wrapped)
|
|
|
|
|
|
def get_page_helpers(service: str) -> dict[str, Any]:
    """Return a copy of the helper mapping registered for *service* (empty if none)."""
    registered = _PAGE_HELPERS.get(service)
    return dict(registered) if registered is not None else {}
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Loading — parse .sx files and collect PageDef instances
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def load_page_file(filepath: str, service_name: str) -> list[PageDef]:
    """Parse an .sx file, evaluate it, and register any PageDef values."""
    from .parser import parse_all
    from .ref.sx_ref import eval_expr as _raw_eval, trampoline as _trampoline
    from .jinja_bridge import get_component_env

    def _eval(expr, env):
        # Trampoline the raw evaluator result so tail calls unwind fully.
        return _trampoline(_raw_eval(expr, env))

    with open(filepath, encoding="utf-8") as f:
        source = f.read()

    # Seed env with component definitions so pages can reference components
    env = dict(get_component_env())

    for expr in parse_all(source):
        _eval(expr, env)

    # Collect all PageDef values left behind in the env and register them.
    pages = [val for val in env.values() if isinstance(val, PageDef)]
    for page in pages:
        register_page(service_name, page)
    return pages
|
|
|
|
|
|
def load_page_dir(directory: str, service_name: str) -> list[PageDef]:
    """Load all .sx files from a directory and register pages."""
    import glob as glob_mod

    pattern = os.path.join(directory, "*.sx")
    collected: list[PageDef] = []
    # Sorted for deterministic load (and therefore registration) order.
    for path in sorted(glob_mod.glob(pattern)):
        collected.extend(load_page_file(path, service_name))
    return collected
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Page execution
|
|
# ---------------------------------------------------------------------------
|
|
|
|
async def _eval_slot(expr: Any, env: dict, ctx: Any) -> str:
    """Evaluate a page slot expression and return an sx source string.

    Component calls are expanded (so any IO in their bodies runs), but the
    result is serialized as SX wire format rather than HTML.
    """
    from .async_eval import async_eval_slot_to_sx

    return await async_eval_slot_to_sx(expr, env, ctx)
|
|
|
|
|
|
def _replace_suspense_sexp(sx: str, stream_id: str, replacement: str) -> str:
|
|
"""Replace a rendered ~suspense div in SX source with replacement content.
|
|
|
|
After _eval_slot, ~suspense expands to:
|
|
(div :id "sx-suspense-{id}" :data-suspense "{id}" :style "display:contents" ...)
|
|
This finds the balanced s-expression containing :data-suspense "{id}" and
|
|
replaces it with the given replacement string.
|
|
"""
|
|
marker = f':data-suspense "{stream_id}"'
|
|
idx = sx.find(marker)
|
|
if idx < 0:
|
|
return sx
|
|
# Walk backwards to find the opening paren of the containing (div ...)
|
|
start = sx.rfind("(", 0, idx)
|
|
if start < 0:
|
|
return sx
|
|
# Walk forward from start to find matching close paren (balanced)
|
|
depth = 0
|
|
i = start
|
|
while i < len(sx):
|
|
ch = sx[i]
|
|
if ch == "(":
|
|
depth += 1
|
|
elif ch == ")":
|
|
depth -= 1
|
|
if depth == 0:
|
|
return sx[:start] + replacement + sx[i + 1:]
|
|
elif ch == '"':
|
|
# Skip string contents
|
|
i += 1
|
|
while i < len(sx) and sx[i] != '"':
|
|
if sx[i] == "\\":
|
|
i += 1 # skip escaped char
|
|
i += 1
|
|
i += 1
|
|
return sx
|
|
|
|
|
|
async def execute_page(
    page_def: PageDef,
    service_name: str,
    url_params: dict[str, Any] | None = None,
) -> str:
    """Execute a declarative page and return the full response string.

    1. Build env from component env + page closure + URL params
    2. If :data — async_eval(data_expr) → merge result dict into env
    3. Render slots: async_eval_to_sx(content_expr) etc.
    4. get_template_context() for header construction
    5. Resolve layout → header rows
    6. Branch: full_page_sx() vs oob_page_sx() based on is_htmx_request()

    Must run inside a live request context (uses _get_request_context and
    is_htmx_request).
    """
    from .jinja_bridge import get_component_env, _get_request_context
    from .async_eval import async_eval
    from .page import get_template_context
    from .helpers import full_page_sx, oob_page_sx, sx_response
    from .layouts import get_layout
    from shared.browser.app.utils.htmx import is_htmx_request

    if url_params is None:
        url_params = {}

    # Build environment — closure first (page-local defines), then fresh
    # component env on top so hot-reloaded components take priority.
    env = dict(page_def.closure)
    env.update(get_component_env())
    env.update(get_page_helpers(service_name))

    # Inject URL params as kebab-case symbols
    for key, val in url_params.items():
        kebab = key.replace("_", "-")
        env[kebab] = val
        env[key] = val  # also provide snake_case for convenience

    # Get request context for I/O primitives
    ctx = _get_request_context()

    # Evaluate :data expression if present.
    # _multi_stream_content is set only on the async-generator (:data) path
    # and, when set, wins over the normal :content evaluation below.
    _multi_stream_content = None
    if page_def.data_expr is not None:
        data_result = await async_eval(page_def.data_expr, env, ctx)
        if hasattr(data_result, '__aiter__'):
            # Multi-stream: consume generator, eval :content per chunk,
            # combine into shell with resolved suspense slots.
            chunks = []
            async for chunk in data_result:
                # Non-dict chunks are silently skipped.
                if not isinstance(chunk, dict):
                    continue
                chunk = dict(chunk)  # copy so pop doesn't mutate caller data
                stream_id = chunk.pop("stream-id", "stream-content")
                chunk_env = dict(env)
                for k, v in chunk.items():
                    chunk_env[k.replace("_", "-")] = v
                chunk_sx = await _eval_slot(page_def.content_expr, chunk_env, ctx) if page_def.content_expr else ""
                chunks.append((stream_id, chunk_sx))
            # Build content: if :shell exists, render it and inline resolved chunks
            if page_def.shell_expr is not None:
                shell_sx = await _eval_slot(page_def.shell_expr, env, ctx)
                # Replace each rendered suspense div with resolved content.
                # _eval_slot expands ~suspense into:
                #   (div :id "sx-suspense-X" :data-suspense "X" :style "display:contents" ...)
                # We find the balanced s-expr containing :data-suspense "X" and replace it.
                for stream_id, chunk_sx in chunks:
                    shell_sx = _replace_suspense_sexp(shell_sx, stream_id, chunk_sx)
                _multi_stream_content = shell_sx
            else:
                # No shell: just concatenate all chunks in a fragment
                parts = " ".join(sx for _, sx in chunks)
                _multi_stream_content = f"(<> {parts})"
        elif isinstance(data_result, dict):
            # Merge with kebab-case keys so SX symbols can reference them
            for k, v in data_result.items():
                env[k.replace("_", "-")] = v

    # Render content slot (required).
    # Multi-stream content (already resolved above) takes precedence.
    if _multi_stream_content is not None:
        content_sx = _multi_stream_content
    else:
        content_sx = await _eval_slot(page_def.content_expr, env, ctx)

    # Render optional slots
    filter_sx = ""
    if page_def.filter_expr is not None:
        filter_sx = await _eval_slot(page_def.filter_expr, env, ctx)

    aside_sx = ""
    if page_def.aside_expr is not None:
        aside_sx = await _eval_slot(page_def.aside_expr, env, ctx)

    menu_sx = ""
    if page_def.menu_expr is not None:
        menu_sx = await _eval_slot(page_def.menu_expr, env, ctx)

    # Resolve layout → header rows + mobile menu fallback
    tctx = await get_template_context()
    header_rows = ""
    oob_headers = ""
    layout_kwargs: dict[str, Any] = {}

    if page_def.layout is not None:
        if isinstance(page_def.layout, str):
            layout_name = page_def.layout
        elif isinstance(page_def.layout, list):
            # (:layout-name :key val ...) — unevaluated list from defpage
            # Evaluate each value in the current env to resolve dynamic bindings
            from .types import Keyword as SxKeyword, Symbol as SxSymbol
            raw = page_def.layout
            # First element is layout name (keyword or symbol or string)
            first = raw[0]
            if isinstance(first, SxKeyword):
                layout_name = first.name
            elif isinstance(first, SxSymbol):
                layout_name = first.name
            elif isinstance(first, str):
                layout_name = first
            else:
                layout_name = str(first)
            # Parse keyword args — evaluate values at request time.
            # A keyword without a following value is skipped.
            i = 1
            while i < len(raw):
                k = raw[i]
                if isinstance(k, SxKeyword) and i + 1 < len(raw):
                    raw_val = raw[i + 1]
                    resolved = await async_eval(raw_val, env, ctx)
                    layout_kwargs[k.name.replace("-", "_")] = resolved
                    i += 2
                else:
                    i += 1
        else:
            layout_name = str(page_def.layout)

        # Unknown layout name → headers stay empty (no error raised here).
        layout = get_layout(layout_name)
        if layout is not None:
            header_rows = await layout.full_headers(tctx, **layout_kwargs)
            oob_headers = await layout.oob_headers(tctx, **layout_kwargs)
            # Layout's mobile menu is only a fallback; an explicit :menu wins.
            if not menu_sx:
                menu_sx = await layout.mobile_menu(tctx, **layout_kwargs)

    # Branch on request type
    is_htmx = is_htmx_request()

    if is_htmx:
        # Compute content expression deps so the server sends component
        # definitions the client needs for future client-side routing.
        # Only done for data-less pages — :data pages resolve server-side.
        extra_deps: set[str] | None = None
        if page_def.content_expr is not None and page_def.data_expr is None:
            from .deps import components_needed
            from .parser import serialize
            try:
                content_src = serialize(page_def.content_expr)
                extra_deps = components_needed(content_src, get_component_env())
            except Exception:
                pass  # non-critical — client will just fall back to server

        return sx_response(await oob_page_sx(
            oobs=oob_headers if oob_headers else "",
            filter=filter_sx,
            aside=aside_sx,
            content=content_sx,
            menu=menu_sx,
        ), extra_component_names=extra_deps)
    else:
        return await full_page_sx(
            tctx,
            header_rows=header_rows,
            filter=filter_sx,
            aside=aside_sx,
            content=content_sx,
            menu=menu_sx,
        )
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Streaming page execution (Phase 6: Streaming & Suspense)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
async def execute_page_streaming(
    page_def: PageDef,
    service_name: str,
    url_params: dict[str, Any] | None = None,
):
    """Execute a page with streaming response.

    All context-dependent setup (g, request, current_app access) runs in
    this regular async function — called while the request context is live.
    Returns an async generator that yields pre-computed HTML chunks and
    awaits already-created tasks (no further context access needed).
    """
    import asyncio
    from .jinja_bridge import get_component_env, _get_request_context
    from .async_eval import async_eval
    from .page import get_template_context
    from .helpers import (
        render_to_html as _helpers_render_to_html,
        sx_page_streaming_parts,
        sx_streaming_resolve_script,
    )
    from .parser import SxExpr, serialize as sx_serialize
    from .layouts import get_layout

    if url_params is None:
        url_params = {}

    # Env: page closure, then components (hot-reload wins), then helpers.
    env = dict(page_def.closure)
    env.update(get_component_env())
    env.update(get_page_helpers(service_name))
    for key, val in url_params.items():
        kebab = key.replace("_", "-")
        env[kebab] = val
        env[key] = val

    ctx = _get_request_context()
    tctx = await get_template_context()

    # Build fallback expressions (skeleton shown while data streams in).
    if page_def.fallback_expr is not None:
        fallback_sx = sx_serialize(page_def.fallback_expr)
    else:
        fallback_sx = (
            '(div :class "p-8 animate-pulse"'
            ' (div :class "h-8 bg-stone-200 rounded mb-4 w-1/3")'
            ' (div :class "h-64 bg-stone-200 rounded"))'
        )
    header_fallback = '(div :class "h-12 bg-stone-200 animate-pulse")'

    # Resolve layout (same shapes as execute_page: str, list, or other)
    layout = None
    layout_kwargs: dict[str, Any] = {}
    if page_def.layout is not None:
        if isinstance(page_def.layout, str):
            layout_name = page_def.layout
        elif isinstance(page_def.layout, list):
            from .types import Keyword as SxKeyword, Symbol as SxSymbol
            raw = page_def.layout
            first = raw[0]
            layout_name = (
                first.name if isinstance(first, (SxKeyword, SxSymbol))
                else str(first)
            )
            # Keyword args are evaluated at request time.
            i = 1
            while i < len(raw):
                k = raw[i]
                if isinstance(k, SxKeyword) and i + 1 < len(raw):
                    resolved = await async_eval(raw[i + 1], env, ctx)
                    layout_kwargs[k.name.replace("-", "_")] = resolved
                    i += 2
                else:
                    i += 1
        else:
            layout_name = str(page_def.layout)
        layout = get_layout(layout_name)

    # --- Launch concurrent IO tasks (inherit context via create_task) ---

    _stream_queue: asyncio.Queue = asyncio.Queue()
    _multi_stream = False

    async def _eval_data_and_content():
        """Evaluate :data then :content.

        If :data returns an async generator (multi-stream mode), iterate it
        and push each (stream_id, content_sx) to _stream_queue incrementally.
        The main stream loop drains the queue and sends resolve scripts as
        items arrive — giving true staggered streaming.
        """
        nonlocal _multi_stream
        try:
            data_env = dict(env)
            if page_def.data_expr is not None:
                data_result = await async_eval(page_def.data_expr, data_env, ctx)
                # Async generator: multi-stream mode
                if hasattr(data_result, '__aiter__'):
                    _multi_stream = True
                    async for chunk in data_result:
                        if not isinstance(chunk, dict):
                            continue
                        chunk = dict(chunk)  # copy so pop doesn't mutate
                        stream_id = chunk.pop("stream-id", "stream-content")
                        chunk_env = dict(env)
                        for k, v in chunk.items():
                            chunk_env[k.replace("_", "-")] = v
                        content_sx = await _eval_slot(page_def.content_expr, chunk_env, ctx) if page_def.content_expr else ""
                        await _stream_queue.put(("data", stream_id, content_sx))
                    await _stream_queue.put(("data-done",))
                    return
                if isinstance(data_result, dict):
                    for k, v in data_result.items():
                        data_env[k.replace("_", "-")] = v
            # Single-shot path: render all slots, push one "data-single" item.
            content_sx = await _eval_slot(page_def.content_expr, data_env, ctx) if page_def.content_expr else ""
            filter_sx = await _eval_slot(page_def.filter_expr, data_env, ctx) if page_def.filter_expr else ""
            aside_sx = await _eval_slot(page_def.aside_expr, data_env, ctx) if page_def.aside_expr else ""
            menu_sx = await _eval_slot(page_def.menu_expr, data_env, ctx) if page_def.menu_expr else ""
            await _stream_queue.put(("data-single", content_sx, filter_sx, aside_sx, menu_sx))
        except Exception as e:
            # Always signal completion so the drain loop below can't hang.
            logger.error("Streaming data task failed: %s", e)
            await _stream_queue.put(("data-done",))

    async def _eval_headers():
        # Renders layout headers + mobile menu concurrently with the data task.
        try:
            if layout is None:
                await _stream_queue.put(("headers", "", ""))
                return
            rows = await layout.full_headers(tctx, **layout_kwargs)
            menu = await layout.mobile_menu(tctx, **layout_kwargs)
            await _stream_queue.put(("headers", rows, menu))
        except Exception as e:
            logger.error("Streaming headers task failed: %s", e)
            await _stream_queue.put(("headers", "", ""))

    # References kept so the tasks aren't garbage-collected mid-flight.
    data_task = asyncio.create_task(_eval_data_and_content())
    header_task = asyncio.create_task(_eval_headers())

    # --- Build initial shell as HTML (still in request context) ---
    # Render to HTML so [data-suspense] elements are real DOM immediately.
    # No dependency on sx-browser.js boot timing for the initial shell.

    suspense_header_sx = f'(~suspense :id "stream-headers" :fallback {header_fallback})'

    # When :shell is provided, it renders directly as the content slot
    # (it contains its own ~suspense for the data-dependent part).
    # Otherwise, wrap the entire :content in a single suspense.
    if page_def.shell_expr is not None:
        shell_content_sx = await _eval_slot(page_def.shell_expr, env, ctx)
        suspense_content_sx = shell_content_sx
    else:
        suspense_content_sx = f'(~suspense :id "stream-content" :fallback {fallback_sx})'

    initial_page_html = await _helpers_render_to_html("app-body",
        header_rows=SxExpr(suspense_header_sx),
        content=SxExpr(suspense_content_sx),
    )

    # Include layout component refs + page content so the scan picks up
    # their transitive deps (e.g. ~cart-mini, ~auth-menu in headers).
    layout_refs = ""
    if layout is not None and hasattr(layout, "component_names"):
        layout_refs = " ".join(f"({n})" for n in layout.component_names)
    content_ref = ""
    if page_def.content_expr is not None:
        content_ref = sx_serialize(page_def.content_expr)
    shell_ref = ""
    if page_def.shell_expr is not None:
        shell_ref = sx_serialize(page_def.shell_expr)
    page_sx_for_scan = f'(<> {layout_refs} {content_ref} {shell_ref} (~app-body :header-rows {suspense_header_sx} :content {suspense_content_sx}))'
    shell, tail = sx_page_streaming_parts(
        tctx, initial_page_html, page_sx=page_sx_for_scan,
    )

    # Capture component env + extras scanner while we still have context.
    # Resolved SX may reference components not in the initial scan
    # (e.g. ~cart-mini from IO-generated header content).
    # NOTE(review): _comp_scan and _service are captured but unused in the
    # visible code — confirm whether they're needed before removing.
    from .jinja_bridge import components_for_page as _comp_scan
    from quart import current_app as _ca
    _service = _ca.name
    # Track which components were already sent in the shell
    _shell_scan = page_sx_for_scan

    def _extra_defs(sx_source: str) -> str:
        """Return component defs needed by sx_source but not in shell."""
        from .deps import components_needed
        comp_env = dict(get_component_env())
        shell_needed = components_needed(_shell_scan, comp_env)
        resolve_needed = components_needed(sx_source, comp_env)
        extra = resolve_needed - shell_needed
        if not extra:
            return ""
        from .parser import serialize
        from .types import Component
        parts = []
        for key, val in comp_env.items():
            if isinstance(val, Component) and (f"~{val.name}" in extra or key in extra):
                # Re-emit the component as a (defcomp ...) form for the client.
                param_strs = ["&key"] + list(val.params)
                if val.has_children:
                    param_strs.extend(["&rest", "children"])
                params_sx = "(" + " ".join(param_strs) + ")"
                body_sx = serialize(val.body, pretty=True)
                parts.append(f"(defcomp ~{val.name} {params_sx} {body_sx})")
        return "\n".join(parts)

    # --- Return async generator that yields chunks ---
    # No context access needed below — just awaiting tasks and yielding strings.

    async def _stream_chunks():
        yield shell + tail

        # Both tasks push tagged items onto _stream_queue. We drain until
        # both are done. Items: ("headers", rows, menu), ("data-single", ...),
        # ("data", stream_id, sx), ("data-done",).
        remaining = 2  # waiting for: headers + data
        while remaining > 0:
            item = await _stream_queue.get()
            kind = item[0]
            try:
                if kind == "headers":
                    _, header_rows, header_menu = item
                    remaining -= 1
                    if header_rows:
                        extras = _extra_defs(header_rows)
                        yield sx_streaming_resolve_script("stream-headers", header_rows, extras)
                elif kind == "data-single":
                    # NOTE(review): filter/aside/menu slots are computed but
                    # not streamed here — only content resolves. Confirm
                    # whether that's intentional for streaming pages.
                    _, content_sx, filter_sx, aside_sx, menu_sx = item
                    remaining -= 1
                    extras = _extra_defs(content_sx)
                    yield sx_streaming_resolve_script("stream-content", content_sx, extras)
                elif kind == "data":
                    # Multi-stream chunk: does not decrement `remaining`;
                    # the producer sends a final ("data-done",) sentinel.
                    _, stream_id, content_sx = item
                    extras = _extra_defs(content_sx)
                    yield sx_streaming_resolve_script(stream_id, content_sx, extras)
                elif kind == "data-done":
                    remaining -= 1
            except Exception as e:
                logger.error("Streaming resolve failed for %s: %s", kind, e)

        yield "\n</body>\n</html>"

    return _stream_chunks()
|
|
|
|
|
|
async def execute_page_streaming_oob(
    page_def: PageDef,
    service_name: str,
    url_params: dict[str, Any] | None = None,
):
    """Execute a streaming page for HTMX/SX requests.

    Like execute_page_streaming but yields OOB SX swap format instead of a
    full HTML document:
    1. First yield: OOB SX with shell content (suspense skeletons) + CSS + defs
    2. Subsequent yields: __sxResolve script tags as data resolves

    The client uses streaming fetch (ReadableStream) to process the OOB swap
    immediately and then execute resolve scripts as they arrive.
    """
    import asyncio
    from .jinja_bridge import get_component_env, _get_request_context
    from .async_eval import async_eval
    from .page import get_template_context
    from .helpers import (
        oob_page_sx,
        sx_streaming_resolve_script,
        components_for_request,
        SxExpr,
    )
    from .parser import serialize as sx_serialize
    from .layouts import get_layout

    if url_params is None:
        url_params = {}

    # Env: page closure, then components (hot-reload wins), then helpers.
    env = dict(page_def.closure)
    env.update(get_component_env())
    env.update(get_page_helpers(service_name))
    for key, val in url_params.items():
        kebab = key.replace("_", "-")
        env[kebab] = val
        env[key] = val

    ctx = _get_request_context()

    # Evaluate shell with suspense skeletons (no data yet)
    shell_sx = ""
    if page_def.shell_expr is not None:
        shell_sx = await _eval_slot(page_def.shell_expr, env, ctx)

    # Build initial OOB response with shell as content
    tctx = await get_template_context()

    # Resolve layout for OOB headers (same shapes as execute_page)
    layout = None
    layout_kwargs: dict[str, Any] = {}
    if page_def.layout is not None:
        if isinstance(page_def.layout, str):
            layout_name = page_def.layout
        elif isinstance(page_def.layout, list):
            from .types import Keyword as SxKeyword, Symbol as SxSymbol
            raw = page_def.layout
            first = raw[0]
            layout_name = (
                first.name if isinstance(first, (SxKeyword, SxSymbol))
                else str(first)
            )
            i = 1
            while i < len(raw):
                k = raw[i]
                if isinstance(k, SxKeyword) and i + 1 < len(raw):
                    resolved = await async_eval(raw[i + 1], env, ctx)
                    layout_kwargs[k.name.replace("-", "_")] = resolved
                    i += 2
                else:
                    i += 1
        else:
            layout_name = str(page_def.layout)
        layout = get_layout(layout_name)

    # Launch concurrent tasks
    _stream_queue: asyncio.Queue = asyncio.Queue()

    async def _eval_data():
        # NOTE(review): a non-generator :data result (e.g. a plain dict) is
        # not rendered here — only the done sentinel is sent, so content
        # stays on the shell skeletons. Presumably OOB streaming is only
        # used with async-generator :data; confirm against callers.
        try:
            if page_def.data_expr is not None:
                data_result = await async_eval(page_def.data_expr, env, ctx)
                if hasattr(data_result, '__aiter__'):
                    async for chunk in data_result:
                        if not isinstance(chunk, dict):
                            continue
                        chunk = dict(chunk)  # copy so pop doesn't mutate
                        stream_id = chunk.pop("stream-id", "stream-content")
                        chunk_env = dict(env)
                        for k, v in chunk.items():
                            chunk_env[k.replace("_", "-")] = v
                        content_sx = await _eval_slot(page_def.content_expr, chunk_env, ctx) if page_def.content_expr else ""
                        await _stream_queue.put(("data", stream_id, content_sx))
                    await _stream_queue.put(("data-done",))
                    return
            await _stream_queue.put(("data-done",))
        except Exception as e:
            # Always signal done so the drain loop can't hang.
            logger.error("Streaming OOB data task failed: %s", e)
            await _stream_queue.put(("data-done",))

    async def _eval_oob_headers():
        try:
            if layout is not None:
                oob_headers = await layout.oob_headers(tctx, **layout_kwargs)
                await _stream_queue.put(("headers", oob_headers))
            else:
                await _stream_queue.put(("headers", ""))
        except Exception as e:
            logger.error("Streaming OOB headers task failed: %s", e)
            await _stream_queue.put(("headers", ""))

    # References kept so the tasks aren't garbage-collected mid-flight.
    data_task = asyncio.create_task(_eval_data())
    header_task = asyncio.create_task(_eval_oob_headers())

    # Build initial OOB body with shell content (skeletons in place)
    oob_body = await oob_page_sx(
        oobs="",  # headers will arrive via resolve script
        content=shell_sx,
    )

    # Prepend component definitions + CSS (like sx_response does)
    from quart import request
    comp_defs = components_for_request(oob_body)
    body = oob_body
    if comp_defs:
        body = (f'<script type="text/sx" data-components>'
                f'{comp_defs}</script>\n{body}')

    from .css_registry import scan_classes_from_sx, lookup_rules, registry_loaded
    if registry_loaded():
        new_classes = scan_classes_from_sx(oob_body)
        if comp_defs:
            new_classes.update(scan_classes_from_sx(comp_defs))
        # SX-Css header: either a short registry hash (<= 16 chars) or a
        # comma-separated list of classes the client already has.
        known_raw = request.headers.get("SX-Css", "")
        if known_raw:
            from .css_registry import lookup_css_hash
            if len(known_raw) <= 16:
                looked_up = lookup_css_hash(known_raw)
                known_classes = looked_up if looked_up is not None else set()
            else:
                known_classes = set(known_raw.split(","))
            new_classes -= known_classes
        if new_classes:
            new_rules = lookup_rules(new_classes)
            if new_rules:
                body = f'<style data-sx-css>{new_rules}</style>\n{body}'

    # Capture component env for extra defs in resolve chunks
    # NOTE(review): _comp_scan is imported but unused in the visible code —
    # confirm before removing.
    from .jinja_bridge import components_for_page as _comp_scan
    _base_scan = oob_body

    def _extra_defs(sx_source: str) -> str:
        """Return component defs needed by sx_source but not already sent in the base body."""
        from .deps import components_needed
        comp_env = dict(get_component_env())
        base_needed = components_needed(_base_scan, comp_env)
        resolve_needed = components_needed(sx_source, comp_env)
        extra = resolve_needed - base_needed
        if not extra:
            return ""
        from .parser import serialize
        from .types import Component
        parts = []
        for key, val in comp_env.items():
            if isinstance(val, Component) and (f"~{val.name}" in extra or key in extra):
                # Re-emit the component as a (defcomp ...) form for the client.
                param_strs = ["&key"] + list(val.params)
                if val.has_children:
                    param_strs.extend(["&rest", "children"])
                params_sx = "(" + " ".join(param_strs) + ")"
                body_sx = serialize(val.body, pretty=True)
                parts.append(f"(defcomp ~{val.name} {params_sx} {body_sx})")
        return "\n".join(parts)

    # Yield chunks
    async def _stream_oob_chunks():
        # First chunk: OOB swap with skeletons
        yield body

        # Drain queue for resolve scripts
        remaining = 2  # headers + data
        while remaining > 0:
            item = await _stream_queue.get()
            kind = item[0]
            try:
                if kind == "headers":
                    _, oob_hdr = item
                    remaining -= 1
                    # Headers don't need resolve scripts for OOB — they're
                    # handled by OOB swap attributes in the SX content itself.
                    # But if we have header content, send a resolve for it.
                    if oob_hdr:
                        extras = _extra_defs(oob_hdr)
                        yield sx_streaming_resolve_script("stream-headers", oob_hdr, extras)
                elif kind == "data":
                    # Multi-stream chunk: producer signals completion with
                    # a separate ("data-done",) sentinel.
                    _, stream_id, content_sx = item
                    extras = _extra_defs(content_sx)
                    yield sx_streaming_resolve_script(stream_id, content_sx, extras)
                elif kind == "data-done":
                    remaining -= 1
            except Exception as e:
                logger.error("Streaming OOB resolve failed for %s: %s", kind, e)

    return _stream_oob_chunks()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Blueprint mounting
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def compute_page_render_plans(service_name: str) -> None:
    """Pre-compute and cache render plans for all pages in a service.

    Must be called after components are loaded (compute_all_deps/io_refs done)
    and pages are registered. Stores plans on PageDef.render_plan.
    """
    import time
    from .parser import serialize
    from .deps import page_render_plan, get_all_io_names
    from .jinja_bridge import _COMPONENT_ENV

    started = time.monotonic()
    io_names = get_all_io_names()
    planned = 0
    for page_def in get_all_pages(service_name).values():
        if page_def.content_expr is None:
            continue  # nothing to plan without a :content slot
        content_src = serialize(page_def.content_expr)
        page_def.render_plan = page_render_plan(content_src, _COMPONENT_ENV, io_names)
        planned += 1
    elapsed = (time.monotonic() - started) * 1000
    logger.info("Computed render plans for %d pages in %s (%.1fms)", planned, service_name, elapsed)
|
|
|
|
|
|
def auto_mount_pages(app: Any, service_name: str) -> None:
    """Auto-mount all registered defpages for a service directly on the app.

    Pages must have absolute paths (from the service URL root).
    Called once per service in app.py after setup_*_pages().

    Also mounts the /sx/data/ endpoint for client-side data fetching.
    """
    registry = get_all_pages(service_name)

    # Pre-compute render plans (which components render where)
    compute_page_render_plans(service_name)

    for pdef in registry.values():
        _mount_one_page(app, service_name, pdef)
    logger.info("Auto-mounted %d defpages for %s", len(registry), service_name)

    # Mount page data endpoint only when at least one page declares :data
    if any(p.data_expr is not None for p in registry.values()):
        auto_mount_page_data(app, service_name)

    # Mount IO proxy endpoint for Phase 5: client-side IO primitives
    mount_io_endpoint(app, service_name)

    # Mount service worker at root scope for offline data layer
    mount_service_worker(app)

    # Mount action endpoint for Phase 7c: optimistic data mutations
    mount_action_endpoint(app, service_name)
|
|
|
|
|
|
def mount_pages(bp: Any, service_name: str,
                names: set[str] | list[str] | None = None) -> None:
    """Mount registered PageDef routes onto a Quart Blueprint.

    For each PageDef, adds a GET route with appropriate auth/cache
    decorators. Coexists with existing Python routes on the same blueprint.

    If *names* is given, only mount pages whose name is in the set.
    """
    # NOTE: the previous version imported quart.make_response here but never
    # used it — the response handling lives in _mount_one_page.
    pages = get_all_pages(service_name)

    for page_def in pages.values():
        # Honour the optional allowlist of page names.
        if names is not None and page_def.name not in names:
            continue
        _mount_one_page(bp, service_name, page_def)
|
|
|
|
|
|
def _mount_one_page(bp: Any, service_name: str, page_def: PageDef) -> None:
    """Mount a single PageDef as a GET route on the blueprint.

    Chooses a streaming or standard view based on ``page_def.stream``,
    then layers the auth decorator and (optionally) the cache decorator
    around it before registering the route. The view re-fetches the page
    from the registry on every request (``get_page(...) or page_def``) —
    presumably so a reloaded PageDef takes effect without remounting;
    confirm against the registry reload path.
    """
    from quart import make_response, Response

    if page_def.stream:
        # Streaming response: yields chunks as IO resolves
        async def page_view(**kwargs: Any) -> Any:
            from shared.browser.app.utils.htmx import is_htmx_request
            # Prefer the current registry entry; fall back to the
            # PageDef captured at mount time.
            current = get_page(service_name, page_def.name) or page_def
            if is_htmx_request():
                # Streaming OOB: shell with skeletons first, then resolve scripts
                gen = await execute_page_streaming_oob(
                    current, service_name, url_params=kwargs,
                )
                # HTMX partial updates are delivered as SX wire format.
                return Response(gen, content_type="text/sx; charset=utf-8")
            # Full page streaming: HTML document with inline resolve scripts
            gen = await execute_page_streaming(
                current, service_name, url_params=kwargs,
            )
            return Response(gen, content_type="text/html; charset=utf-8")
    else:
        # Standard non-streaming response
        async def page_view(**kwargs: Any) -> Any:
            current = get_page(service_name, page_def.name) or page_def
            result = await execute_page(current, service_name, url_params=kwargs)
            # execute_page may already return a full response object
            # (detected via status_code); pass it through untouched.
            if hasattr(result, "status_code"):
                return result
            return await make_response(result, 200)

    # Give the view function a unique name for Quart's routing
    # (Quart derives the endpoint from the function name by default).
    page_view.__name__ = f"defpage_{page_def.name.replace('-', '_')}"
    page_view.__qualname__ = page_view.__name__

    # Apply auth decorator first so auth runs before cache lookups.
    view_fn = _apply_auth(page_view, page_def.auth)

    # Apply cache decorator (outermost wrapper when present).
    if page_def.cache:
        view_fn = _apply_cache(view_fn, page_def.cache)

    # Register the route
    bp.add_url_rule(
        page_def.path,
        endpoint=page_view.__name__,
        view_func=view_fn,
        methods=["GET"],
    )
    logger.info("Mounted defpage %s:%s → GET %s", service_name, page_def.name, page_def.path)
|
|
|
|
|
|
def _apply_auth(fn: Any, auth: str | list) -> Any:
|
|
"""Wrap a view function with the appropriate auth decorator."""
|
|
if auth == "public":
|
|
return fn
|
|
if auth == "login":
|
|
from shared.browser.app.authz import require_login
|
|
return require_login(fn)
|
|
if auth == "admin":
|
|
from shared.browser.app.authz import require_admin
|
|
return require_admin(fn)
|
|
if auth == "post_author":
|
|
from shared.browser.app.authz import require_post_author
|
|
return require_post_author(fn)
|
|
if isinstance(auth, list) and auth and auth[0] == "rights":
|
|
from shared.browser.app.authz import require_rights
|
|
return require_rights(*auth[1:])(fn)
|
|
return fn
|
|
|
|
|
|
def _apply_cache(fn: Any, cache: dict) -> Any:
    """Wrap *fn* with the redis-backed ``cache_page`` decorator.

    Missing keys fall back to defaults: ttl=0, tag=None, scope="user".
    """
    from shared.browser.app.redis_cacher import cache_page

    return cache_page(
        ttl=cache.get("ttl", 0),
        tag=cache.get("tag"),
        scope=cache.get("scope", "user"),
    )(fn)
|
|
|
|
|
|
async def _check_page_auth(auth: str | list) -> Any | None:
    """Enforce a page's auth setting for the /sx/data endpoint.

    Returns ``None`` when access is allowed. On failure it does NOT
    return a response — it aborts the request via quart's ``abort``
    with 401 (not logged in) or 403 (insufficient rights).

    NOTE(review): ``"post_author"`` auth is not handled here, unlike
    _apply_auth, so data for post_author pages is gated only by the
    checks below — confirm this is intentional.
    """
    from quart import g, abort as quart_abort

    if auth == "public":
        return None
    user = g.get("user")
    if auth == "login":
        if not user:
            quart_abort(401)
    elif auth == "admin":
        # Admin requires the "admin" right on the session user.
        if not user or not user.get("rights", {}).get("admin"):
            quart_abort(403)
    elif isinstance(auth, list) and auth and auth[0] == "rights":
        if not user:
            quart_abort(401)
        # Every right listed after "rights" must be present on the user.
        user_rights = set(user.get("rights", {}).keys())
        required = set(auth[1:])
        if not required.issubset(user_rights):
            quart_abort(403)
    return None
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Page data endpoint — evaluate :data expression, return SX
|
|
# ---------------------------------------------------------------------------
|
|
|
|
async def evaluate_page_data(
    page_def: PageDef,
    service_name: str,
    url_params: dict[str, Any] | None = None,
) -> str:
    """Evaluate a defpage's :data expression and return the result as SX.

    Data-only counterpart to execute_page(): the client fetches this when
    it already holds all component definitions but still needs the data
    bindings to render a :data page client-side.

    Returns SX wire format (e.g. ``{:posts (list ...) :count 42}``),
    parsed by the client's SX parser and merged into the eval env.
    """
    from .jinja_bridge import get_component_env, _get_request_context
    from .async_eval import async_eval
    from .parser import serialize

    if page_def.data_expr is None:
        return "nil"

    params = url_params or {}

    # Build the evaluation environment (mirrors execute_page)
    env = dict(page_def.closure)
    env.update(get_component_env())
    env.update(get_page_helpers(service_name))

    # Expose URL params under both kebab-case and original names.
    for key, val in params.items():
        env[key.replace("_", "-")] = val
        env[key] = val

    result = await async_eval(page_def.data_expr, env, _get_request_context())

    # Multi-stream: async generator can't be serialized as a single dict.
    # Return nil to signal the client to fall back to server-side rendering.
    if hasattr(result, '__aiter__'):
        await result.aclose()  # close the generator cleanly
        return "nil"

    # Kebab-case dict keys (matching execute_page)
    if isinstance(result, dict):
        result = {k.replace("_", "-"): v for k, v in result.items()}

    # Serialize the evaluated data as SX wire format
    return serialize(result)
|
|
|
|
|
|
def auto_mount_page_data(app: Any, service_name: str) -> None:
    """Mount a single /sx/data/ endpoint that serves page data as SX.

    For each defpage with :data, the client can GET /sx/data/<page-name>
    (with URL params as query args) and receive the evaluated :data
    result serialized as SX wire format (text/sx).

    Auth is enforced per-page: the endpoint looks up the page's auth
    setting and checks it before evaluating the data expression.
    """
    from quart import make_response, request, abort as quart_abort

    async def page_data_view(page_name: str) -> Any:
        page_def = get_page(service_name, page_name)
        # Unknown page, or a page without a :data expression → 404.
        if page_def is None or page_def.data_expr is None:
            quart_abort(404)

        # Check auth — same enforcement as the page route itself
        auth_error = await _check_page_auth(page_def.auth)
        if auth_error is not None:
            return auth_error

        # URL params arrive as query-string args
        result_sx = await evaluate_page_data(
            page_def, service_name, url_params=dict(request.args),
        )

        resp = await make_response(result_sx, 200)
        resp.content_type = "text/sx; charset=utf-8"
        return resp

    page_data_view.__name__ = "sx_page_data"
    page_data_view.__qualname__ = "sx_page_data"

    app.add_url_rule(
        "/sx/data/<page_name>",
        endpoint="sx_page_data",
        view_func=page_data_view,
        methods=["GET"],
    )
    logger.info("Mounted page data endpoint for %s at /sx/data/<page_name>", service_name)
|
|
|
|
|
|
def mount_io_endpoint(app: Any, service_name: str) -> None:
    """Mount /sx/io/<name> endpoint for client-side IO primitive calls.

    The client can call any allowed IO primitive or page helper via GET/POST.
    Result is returned as SX wire format (text/sx).

    Falls back to page helpers when the name isn't a global IO primitive,
    so service-specific functions like ``highlight`` work via the proxy.

    NOTE(review): the allowlist below is a snapshot taken at mount time;
    components registered after this call will not extend it — confirm
    mount ordering guarantees all components are loaded first.
    """
    import asyncio as _asyncio
    from quart import make_response, request, abort as quart_abort
    from .primitives_io import IO_PRIMITIVES, execute_io
    from .jinja_bridge import _get_request_context
    from .parser import serialize

    # Build allowlist from all component IO refs across this service
    from .jinja_bridge import _COMPONENT_ENV
    from .types import Component as _Comp
    # Only names some component actually references may be proxied.
    _ALLOWED_IO: set[str] = set()
    for _val in _COMPONENT_ENV.values():
        if isinstance(_val, _Comp) and _val.io_refs:
            _ALLOWED_IO.update(_val.io_refs)

    from shared.browser.app.csrf import csrf_exempt

    @csrf_exempt
    async def io_proxy(name: str) -> Any:
        # 403 for anything outside the component-derived allowlist.
        if name not in _ALLOWED_IO:
            quart_abort(403)

        # Parse args from query string or JSON body
        args: list = []
        kwargs: dict = {}
        if request.method == "GET":
            # Query params named "_arg*" become positional args (as
            # strings); everything else becomes a keyword argument.
            # NOTE(review): positional ordering follows query-string
            # order — confirm the client always emits _arg0, _arg1, ...
            # in sequence.
            for k, v in request.args.items():
                if k.startswith("_arg"):
                    args.append(v)
                else:
                    kwargs[k] = v
        else:
            data = await request.get_json(silent=True) or {}
            args = data.get("args", [])
            kwargs = data.get("kwargs", {})

        # Try global IO primitives first
        if name in IO_PRIMITIVES:
            ctx = _get_request_context()
            result = await execute_io(name, args, kwargs, ctx)
        else:
            # Fall back to page helpers (service-specific functions)
            helpers = get_page_helpers(service_name)
            helper_fn = helpers.get(name)
            if helper_fn is None:
                quart_abort(404)
            result = helper_fn(*args, **kwargs) if kwargs else helper_fn(*args)
            # Helpers may be sync or async; await only coroutines.
            if _asyncio.iscoroutine(result):
                result = await result

        result_sx = serialize(result) if result is not None else "nil"
        resp = await make_response(result_sx, 200)
        resp.content_type = "text/sx; charset=utf-8"
        # NOTE(review): "public" caching is applied to every response,
        # including POSTs — confirm no proxied IO returns user-specific
        # data that a shared cache must not store.
        resp.headers["Cache-Control"] = "public, max-age=300"
        return resp

    io_proxy.__name__ = "sx_io_proxy"
    io_proxy.__qualname__ = "sx_io_proxy"

    app.add_url_rule(
        "/sx/io/<name>",
        endpoint="sx_io_proxy",
        view_func=io_proxy,
        methods=["GET", "POST"],
    )
    logger.info("Mounted IO proxy for %s: %s", service_name, sorted(_ALLOWED_IO))
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Service Worker mount
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# Module-level guard: the service worker route is app-global, so mount it
# at most once even if several services call mount_service_worker().
_SW_MOUNTED = False


def mount_service_worker(app: Any) -> None:
    """Mount the SX service worker at /sx-sw.js (root scope).

    The service worker provides offline data caching:
    - /sx/data/* and /sx/io/* responses cached in IndexedDB
    - /static/* assets cached via Cache API (stale-while-revalidate)
    - Everything else passes through to network
    """
    global _SW_MOUNTED
    if _SW_MOUNTED:
        return
    _SW_MOUNTED = True

    script_path = os.path.abspath(
        os.path.join(os.path.dirname(__file__), "..", "static", "scripts", "sx-sw.js")
    )

    async def serve_sw():
        from quart import send_file
        # cache_timeout=0: no caching — SW updates checked by browser
        return await send_file(
            script_path,
            mimetype="application/javascript",
            cache_timeout=0,
        )

    app.add_url_rule("/sx-sw.js", endpoint="sx_service_worker", view_func=serve_sw)
    logger.info("Mounted service worker at /sx-sw.js")
|
|
|
|
|
|
def mount_action_endpoint(app: Any, service_name: str) -> None:
    """Mount /sx/action/<name> endpoint for client-side data mutations.

    The client can POST to trigger a named action (registered via
    register_page_helpers with an 'action:' prefix). The action receives
    the JSON payload, performs the mutation, and returns the new page data
    as SX wire format.

    This is the server counterpart to submit-mutation in orchestration.sx.
    """
    import asyncio as _asyncio
    from quart import make_response, request, abort as quart_abort
    from .parser import serialize
    from shared.browser.app.csrf import csrf_exempt

    @csrf_exempt
    async def action_handler(name: str) -> Any:
        # Look up action helper — actions are page helpers registered
        # under an "action:" prefix.
        helpers = get_page_helpers(service_name)
        action_fn = helpers.get(f"action:{name}")
        if action_fn is None:
            quart_abort(404)

        # Parse JSON payload; a missing or invalid body becomes {}.
        data = await request.get_json(silent=True) or {}

        try:
            result = action_fn(**data)
            # Actions may be sync or async; await only coroutines.
            if _asyncio.iscoroutine(result):
                result = await result
        except Exception as e:
            logger.warning("Action %s failed: %s", name, e)
            # Escape backslashes and double quotes so the exception text
            # cannot break out of the SX string literal (previously an
            # unescaped f-string produced malformed/injectable SX when
            # the message contained a quote).
            msg = str(e).replace("\\", "\\\\").replace('"', '\\"')
            resp = await make_response(f'(dict "error" "{msg}")', 500)
            resp.content_type = "text/sx; charset=utf-8"
            return resp

        result_sx = serialize(result) if result is not None else "nil"
        resp = await make_response(result_sx, 200)
        resp.content_type = "text/sx; charset=utf-8"
        return resp

    action_handler.__name__ = "sx_action"
    action_handler.__qualname__ = "sx_action"

    app.add_url_rule(
        "/sx/action/<name>",
        endpoint="sx_action",
        view_func=action_handler,
        methods=["POST"],
    )
    logger.info("Mounted action endpoint for %s at /sx/action/<name>", service_name)
|