Quart's ASGI layer doesn't propagate request/app context into async generator iterations. Wrap execute_page_streaming with stream_with_context so current_app and request remain accessible after each yield. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
782 lines
28 KiB
Python
782 lines
28 KiB
Python
"""
|
|
Declarative page registry and blueprint mounting.

Supports ``defpage`` s-expressions that define GET route handlers
in .sx files instead of Python. Each page is a self-contained
declaration with path, auth, layout, data, and content slots.

Usage::

    from shared.sx.pages import load_page_file, mount_pages

    # Load page definitions from .sx files
    load_page_file("blog/sx/pages/admin.sx", "blog")

    # Mount page routes onto an existing blueprint
    mount_pages(bp, "blog")
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
import os
|
|
from typing import Any
|
|
|
|
from .types import PageDef
|
|
|
|
# Module-level logger; records are emitted under the "sx.pages" name.
logger = logging.getLogger("sx.pages")


# ---------------------------------------------------------------------------
# Registry — service → page-name → PageDef
# ---------------------------------------------------------------------------

# Registered page definitions, bucketed per service name.
_PAGE_REGISTRY: dict[str, dict[str, PageDef]] = {}
# Helper callables made available to defpage expressions, bucketed per service.
_PAGE_HELPERS: dict[str, dict[str, Any]] = {} # service → name → callable
|
|
|
|
|
|
def register_page(service: str, page_def: PageDef) -> None:
    """Store *page_def* in the registry bucket of *service*.

    The per-service bucket is created on first use. A page already
    registered under the same name for that service is overwritten.
    """
    service_pages = _PAGE_REGISTRY.setdefault(service, {})
    service_pages[page_def.name] = page_def
    logger.debug("Registered page %s:%s path=%s", service, page_def.name, page_def.path)
|
|
|
|
|
|
def get_page(service: str, name: str) -> PageDef | None:
    """Return the page registered as *name* for *service*, or ``None``.

    ``None`` is returned both when the service has no registered pages
    and when the service has no page of that name.
    """
    service_pages = _PAGE_REGISTRY.get(service)
    if service_pages is None:
        return None
    return service_pages.get(name)
|
|
|
|
|
|
def get_all_pages(service: str) -> dict[str, PageDef]:
    """Return a shallow copy of the name → PageDef mapping for *service*.

    The result is a new dict, so mutating it does not touch the registry.
    An unknown service yields an empty dict.
    """
    service_pages = _PAGE_REGISTRY.get(service)
    if service_pages is None:
        return {}
    return dict(service_pages)
|
|
|
|
|
|
def clear_pages(service: str | None = None) -> None:
    """Drop registered pages.

    With a *service*, only that service's pages are removed (unknown
    services are ignored). Without one, the whole registry is emptied.
    """
    if service is not None:
        _PAGE_REGISTRY.pop(service, None)
        return
    _PAGE_REGISTRY.clear()
|
|
|
|
|
|
def register_page_helpers(service: str, helpers: dict[str, Any]) -> None:
    """Register Python functions available in defpage content expressions.

    These are injected into the evaluation environment when executing
    defpage content, allowing defpage to call into Python::

        register_page_helpers("sx", {
            "docs-content": docs_content_sx,
            "reference-content": reference_content_sx,
        })

    Then in .sx::

        (defpage docs-page
          :path "/docs/<slug>"
          :auth :public
          :content (docs-content slug))

    Every helper name is first passed to ``validate_helper``. Each helper
    is then wrapped so its return value goes through
    ``validate_boundary_value`` before reaching the caller. Coroutine
    functions get an ``async`` wrapper, everything else a plain one.
    Helpers already registered for *service* under the same name are
    replaced; other existing helpers are kept.

    Args:
        service: Service name the helpers belong to.
        helpers: Mapping of helper name to callable.
    """
    from .boundary import validate_helper, validate_boundary_value
    import asyncio
    import functools

    def _guard(name: str, fn: Any) -> Any:
        """Return *fn* wrapped so its result is validated at the boundary."""
        # The helper and its label are captured by closure rather than as
        # keyword defaults, so a caller-supplied keyword argument can never
        # replace them and every keyword reaches the helper untouched.
        context = f"helper {name!r}"

        if asyncio.iscoroutinefunction(fn):
            @functools.wraps(fn)
            async def _async_wrap(*a, **kw):
                result = await fn(*a, **kw)
                validate_boundary_value(result, context=context)
                return result
            return _async_wrap

        @functools.wraps(fn)
        def _sync_wrap(*a, **kw):
            result = fn(*a, **kw)
            validate_boundary_value(result, context=context)
            return result
        return _sync_wrap

    # Check every name before wrapping anything, so a rejected name is
    # reported before the registry is touched.
    for name in helpers:
        validate_helper(service, name)

    wrapped: dict[str, Any] = {
        name: _guard(name, fn) for name, fn in helpers.items()
    }
    _PAGE_HELPERS.setdefault(service, {}).update(wrapped)
|
|
|
|
|
|
def get_page_helpers(service: str) -> dict[str, Any]:
    """Return a shallow copy of the helpers registered for *service*.

    An unknown service yields an empty dict. The returned dict is new,
    so callers may mutate it freely.
    """
    service_helpers = _PAGE_HELPERS.get(service)
    if service_helpers is None:
        return {}
    return dict(service_helpers)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Loading — parse .sx files and collect PageDef instances
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def load_page_file(filepath: str, service_name: str) -> list[PageDef]:
    """Evaluate one .sx file and register every PageDef it leaves behind.

    The file is read as UTF-8, parsed, and each top-level expression is
    evaluated in an env seeded with a copy of the component env, so pages
    can reference components. Every ``PageDef`` value found in that env
    afterwards is registered under *service_name*.

    Returns:
        The registered PageDef objects, in env iteration order.
    """
    from .parser import parse_all
    from .evaluator import _eval as _raw_eval, _trampoline
    from .jinja_bridge import get_component_env

    def _evaluate(expr: Any, scope: dict) -> Any:
        # The raw evaluator result is run through the trampoline, as before.
        return _trampoline(_raw_eval(expr, scope))

    with open(filepath, encoding="utf-8") as handle:
        source = handle.read()

    # Work on a copy so evaluating the file never mutates the shared env.
    env = dict(get_component_env())
    for expr in parse_all(source):
        _evaluate(expr, env)

    found: list[PageDef] = [
        value for value in env.values() if isinstance(value, PageDef)
    ]
    for page in found:
        register_page(service_name, page)
    return found
|
|
|
|
|
|
def load_page_dir(directory: str, service_name: str) -> list[PageDef]:
    """Load every ``*.sx`` file directly inside *directory*.

    Files are processed in sorted path order through ``load_page_file``.
    The per-file results are concatenated into one list. A directory with
    no matching files (or one that does not exist) yields an empty list.
    """
    from glob import glob

    pattern = os.path.join(directory, "*.sx")
    collected: list[PageDef] = []
    for sx_path in sorted(glob(pattern)):
        collected += load_page_file(sx_path, service_name)
    return collected
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Page execution
|
|
# ---------------------------------------------------------------------------
|
|
|
|
async def _eval_slot(expr: Any, env: dict, ctx: Any) -> str:
    """Render a single page slot expression to SX source text.

    Delegates to ``async_eval_slot_to_sx``. Per the module's own notes,
    component calls are expanded (so IO in their bodies executes) while
    the result is serialized as SX wire format rather than HTML.
    """
    from .async_eval import async_eval_slot_to_sx

    sx_source = await async_eval_slot_to_sx(expr, env, ctx)
    return sx_source
|
|
|
|
|
|
async def execute_page(
    page_def: PageDef,
    service_name: str,
    url_params: dict[str, Any] | None = None,
) -> str:
    """Execute a declarative page and return the response for it.

    Steps:

    1. Build the env from component env + page helpers + page closure,
       then inject URL params (both kebab-case and original spelling).
    2. If the page has ``:data``, evaluate it; a dict result is merged
       into the env under kebab-case keys.
    3. Render the required content slot and the optional filter, aside
       and menu slots.
    4. Fetch the template context and resolve the layout into full header
       rows, OOB headers and, when the page has no menu of its own, the
       layout's mobile menu.
    5. Branch on ``is_htmx_request()``: ``oob_page_sx()`` wrapped by
       ``sx_response()`` for HTMX requests, ``full_page_sx()`` otherwise.

    Args:
        page_def: The page definition to execute.
        service_name: Service whose page helpers are added to the env.
        url_params: Route parameters; ``None`` is treated as empty.

    Returns:
        The value of ``full_page_sx()`` for full page loads. For HTMX
        requests the value of ``sx_response()`` is returned instead.
        NOTE(review): callers in this module test the result for a
        ``status_code`` attribute, so the ``str`` annotation appears to
        describe only the full-page branch — confirm.
    """
    from .jinja_bridge import get_component_env, _get_request_context
    from .async_eval import async_eval
    from .page import get_template_context
    from .helpers import full_page_sx, oob_page_sx, sx_response
    from .layouts import get_layout
    from shared.browser.app.utils.htmx import is_htmx_request

    if url_params is None:
        url_params = {}

    # Build environment; later layers override earlier ones
    env = dict(get_component_env())
    env.update(get_page_helpers(service_name))
    env.update(page_def.closure)

    # Inject URL params as kebab-case symbols, plus the original spelling
    for key, val in url_params.items():
        env[key.replace("_", "-")] = val
        env[key] = val

    # Request context for I/O primitives
    ctx = _get_request_context()

    # Evaluate :data expression if present
    if page_def.data_expr is not None:
        data_result = await async_eval(page_def.data_expr, env, ctx)
        if isinstance(data_result, dict):
            # Merge with kebab-case keys so SX symbols can reference them
            for k, v in data_result.items():
                env[k.replace("_", "-")] = v

    async def _optional_slot(expr: Any) -> str:
        """Render *expr* if the page defines it, else return ""."""
        if expr is None:
            return ""
        return await _eval_slot(expr, env, ctx)

    # Content slot is required; the others are optional
    content_sx = await _eval_slot(page_def.content_expr, env, ctx)
    filter_sx = await _optional_slot(page_def.filter_expr)
    aside_sx = await _optional_slot(page_def.aside_expr)
    menu_sx = await _optional_slot(page_def.menu_expr)

    # Resolve layout → header rows + mobile menu fallback
    tctx = await get_template_context()
    header_rows = ""
    oob_headers = ""
    layout_kwargs: dict[str, Any] = {}

    if page_def.layout is not None:
        if isinstance(page_def.layout, list):
            # (:layout-name :key val ...) — unevaluated list from defpage
            from .types import Keyword as SxKeyword, Symbol as SxSymbol
            raw = page_def.layout
            # First element is the layout name (keyword, symbol or string)
            first = raw[0]
            if isinstance(first, (SxKeyword, SxSymbol)):
                layout_name = first.name
            else:
                layout_name = str(first)
            # Parse keyword args — values are evaluated at request time so
            # they can refer to dynamic bindings in the env
            i = 1
            while i < len(raw):
                k = raw[i]
                if isinstance(k, SxKeyword) and i + 1 < len(raw):
                    resolved = await async_eval(raw[i + 1], env, ctx)
                    layout_kwargs[k.name.replace("-", "_")] = resolved
                    i += 2
                else:
                    # Not a keyword (or a trailing keyword without a value)
                    i += 1
        else:
            layout_name = str(page_def.layout)

        layout = get_layout(layout_name)
        if layout is not None:
            header_rows = await layout.full_headers(tctx, **layout_kwargs)
            oob_headers = await layout.oob_headers(tctx, **layout_kwargs)
            if not menu_sx:
                menu_sx = await layout.mobile_menu(tctx, **layout_kwargs)

    # Branch on request type
    if is_htmx_request():
        # Compute content expression deps so the server sends component
        # definitions the client needs for future client-side routing
        extra_deps: set[str] | None = None
        if page_def.content_expr is not None and page_def.data_expr is None:
            from .deps import components_needed
            from .parser import serialize
            try:
                content_src = serialize(page_def.content_expr)
                extra_deps = components_needed(content_src, get_component_env())
            except Exception:
                # Non-critical — the client will just fall back to the
                # server — but leave a trace instead of hiding the failure.
                logger.debug(
                    "Could not compute component deps for page %s",
                    page_def.name, exc_info=True,
                )

        return sx_response(await oob_page_sx(
            oobs=oob_headers if oob_headers else "",
            filter=filter_sx,
            aside=aside_sx,
            content=content_sx,
            menu=menu_sx,
        ), extra_component_names=extra_deps)

    return await full_page_sx(
        tctx,
        header_rows=header_rows,
        filter=filter_sx,
        aside=aside_sx,
        content=content_sx,
        menu=menu_sx,
    )
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Streaming page execution (Phase 6: Streaming & Suspense)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
async def execute_page_streaming(
    page_def: PageDef,
    service_name: str,
    url_params: dict[str, Any] | None = None,
):
    """Execute a page with streaming response.

    This is an async generator that yields HTML chunks:

    1. HTML shell with suspense placeholders (immediate)
    2. Resolution <script> tags as IO completes
    3. Closing </body></html>

    Each suspense placeholder renders a loading skeleton. As data and
    header IO resolve, the server streams inline scripts (built by
    ``sx_streaming_resolve_script``) that replace the placeholder content.

    Data/content evaluation and header rendering run as two concurrent
    tasks. Their results are streamed in completion order. A failing task
    is logged and skipped, so its placeholder keeps showing the fallback.
    Tasks still running when the generator is closed (for example because
    the consumer stopped iterating) or when an error escapes are cancelled.

    Args:
        page_def: The page definition to execute.
        service_name: Service whose page helpers are added to the env.
        url_params: Route parameters; ``None`` is treated as empty.
    """
    import asyncio
    from .jinja_bridge import get_component_env, _get_request_context
    from .async_eval import async_eval
    from .page import get_template_context
    from .helpers import (
        _render_to_sx, sx_page_streaming_parts,
        sx_streaming_resolve_script,
    )
    from .parser import SxExpr, serialize as sx_serialize
    from .layouts import get_layout

    if url_params is None:
        url_params = {}

    env = dict(get_component_env())
    env.update(get_page_helpers(service_name))
    env.update(page_def.closure)
    for key, val in url_params.items():
        env[key.replace("_", "-")] = val
        env[key] = val

    ctx = _get_request_context()
    tctx = await get_template_context()

    # Build fallback expressions
    if page_def.fallback_expr is not None:
        fallback_sx = sx_serialize(page_def.fallback_expr)
    else:
        fallback_sx = (
            '(div :class "p-8 animate-pulse"'
            ' (div :class "h-8 bg-stone-200 rounded mb-4 w-1/3")'
            ' (div :class "h-64 bg-stone-200 rounded"))'
        )
    header_fallback = '(div :class "h-12 bg-stone-200 animate-pulse")'

    # Resolve layout
    layout = None
    layout_kwargs: dict[str, Any] = {}
    if page_def.layout is not None:
        if isinstance(page_def.layout, str):
            layout_name = page_def.layout
        elif isinstance(page_def.layout, list):
            from .types import Keyword as SxKeyword, Symbol as SxSymbol
            raw = page_def.layout
            first = raw[0]
            layout_name = (
                first.name if isinstance(first, (SxKeyword, SxSymbol))
                else str(first)
            )
            # Keyword args are evaluated now, against the request env
            i = 1
            while i < len(raw):
                k = raw[i]
                if isinstance(k, SxKeyword) and i + 1 < len(raw):
                    resolved = await async_eval(raw[i + 1], env, ctx)
                    layout_kwargs[k.name.replace("-", "_")] = resolved
                    i += 2
                else:
                    i += 1
        else:
            layout_name = str(page_def.layout)
        layout = get_layout(layout_name)

    # --- Concurrent IO tasks ---

    async def _eval_data_and_content():
        # Private env copy: :data bindings stay local to this task
        data_env = dict(env)
        if page_def.data_expr is not None:
            data_result = await async_eval(page_def.data_expr, data_env, ctx)
            if isinstance(data_result, dict):
                for k, v in data_result.items():
                    data_env[k.replace("_", "-")] = v
        content_sx = await _eval_slot(page_def.content_expr, data_env, ctx) if page_def.content_expr else ""
        filter_sx = await _eval_slot(page_def.filter_expr, data_env, ctx) if page_def.filter_expr else ""
        aside_sx = await _eval_slot(page_def.aside_expr, data_env, ctx) if page_def.aside_expr else ""
        menu_sx = await _eval_slot(page_def.menu_expr, data_env, ctx) if page_def.menu_expr else ""
        return content_sx, filter_sx, aside_sx, menu_sx

    async def _eval_headers():
        if layout is None:
            return "", ""
        rows = await layout.full_headers(tctx, **layout_kwargs)
        menu = await layout.mobile_menu(tctx, **layout_kwargs)
        return rows, menu

    # Start both tasks before rendering the shell so IO overlaps with it
    tasks = {
        asyncio.create_task(_eval_data_and_content()): "data",
        asyncio.create_task(_eval_headers()): "headers",
    }

    try:
        # --- Build initial page SX with suspense placeholders ---
        initial_page_sx = await _render_to_sx(
            "app-body",
            header_rows=SxExpr(
                f'(~suspense :id "stream-headers" :fallback {header_fallback})'
            ),
            content=SxExpr(
                f'(~suspense :id "stream-content" :fallback {fallback_sx})'
            ),
        )

        shell, tail = sx_page_streaming_parts(tctx, initial_page_sx)

        # --- Yield initial shell + scripts ---
        yield shell + tail

        # --- Yield resolution chunks in completion order ---
        pending = set(tasks)
        while pending:
            done, pending = await asyncio.wait(
                pending, return_when=asyncio.FIRST_COMPLETED,
            )
            for task in done:
                label = tasks[task]
                try:
                    result = task.result()
                except Exception as e:
                    # Keep streaming the other part; include the traceback
                    logger.error(
                        "Streaming %s task failed: %s", label, e, exc_info=e,
                    )
                    continue

                if label == "data":
                    # NOTE(review): the filter/aside/menu slots are
                    # evaluated by the task but only content is streamed.
                    content_sx, _filter_sx, _aside_sx, _menu_sx = result
                    yield sx_streaming_resolve_script("stream-content", content_sx)
                elif label == "headers":
                    header_rows, _header_menu = result
                    if header_rows:
                        yield sx_streaming_resolve_script("stream-headers", header_rows)

        yield "\n</body>\n</html>"
    finally:
        # Runs on normal completion, on errors, and when the generator is
        # closed early: never leave background IO running unattended.
        for task in tasks:
            if not task.done():
                task.cancel()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Blueprint mounting
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def auto_mount_pages(app: Any, service_name: str) -> None:
    """Mount every registered defpage of a service directly on the app.

    Pages must have absolute paths (from the service URL root).
    Called once per service in app.py after setup_*_pages().

    Besides the page routes this mounts the page data endpoint when at
    least one page has a ``:data`` expression, and always mounts the IO
    proxy endpoint.
    """
    registered = get_all_pages(service_name)

    for definition in registered.values():
        _mount_one_page(app, service_name, definition)
    logger.info("Auto-mounted %d defpages for %s", len(registered), service_name)

    # The data endpoint is only needed for client-side rendering of :data
    # pages; one such page is enough to require it.
    for definition in registered.values():
        if definition.data_expr is not None:
            auto_mount_page_data(app, service_name)
            break

    # IO proxy endpoint for Phase 5: client-side IO primitives
    mount_io_endpoint(app, service_name)
|
|
|
|
|
|
def mount_pages(bp: Any, service_name: str,
                names: set[str] | list[str] | None = None) -> None:
    """Mount registered PageDef routes onto a Quart Blueprint.

    Each selected PageDef is handed to ``_mount_one_page``, which adds a
    GET route with the page's auth/cache decorators. Coexists with
    existing Python routes on the same blueprint.

    Args:
        bp: Blueprint (or app) exposing ``add_url_rule``.
        service_name: Service whose registered pages are mounted.
        names: If given, only pages whose name is in this collection are
            mounted; ``None`` mounts every page of the service.
    """
    # Normalise once so membership tests do not rescan a list per page
    wanted = None if names is None else frozenset(names)

    for page_def in get_all_pages(service_name).values():
        if wanted is not None and page_def.name not in wanted:
            continue
        _mount_one_page(bp, service_name, page_def)
|
|
|
|
|
|
def _mount_one_page(bp: Any, service_name: str, page_def: PageDef) -> None:
    """Mount a single PageDef as a GET route on the blueprint.

    The view looks the page up in the registry on every request and falls
    back to *page_def* when it is no longer registered, so a re-registered
    definition takes effect without re-mounting.

    Pages flagged ``stream`` answer full page loads with a streaming
    response built from ``execute_page_streaming``. HTMX requests to such
    pages, and all requests to non-streaming pages, use ``execute_page``.

    The view is wrapped with the page's auth decorator and, when the page
    configures caching, the cache decorator. It is registered under the
    endpoint name ``defpage_<page name with '-' replaced by '_'>``.
    """
    from quart import make_response, Response, stream_with_context

    async def _buffered(current: PageDef, params: dict[str, Any]) -> Any:
        """Run the non-streaming pipeline and coerce the result to a response."""
        result = await execute_page(current, service_name, url_params=params)
        # Response-like results (they carry a status_code) pass through as-is
        if hasattr(result, "status_code"):
            return result
        return await make_response(result, 200)

    if page_def.stream:
        # Streaming response: yields HTML chunks as IO resolves
        async def page_view(**kwargs: Any) -> Any:
            from shared.browser.app.utils.htmx import is_htmx_request
            current = get_page(service_name, page_def.name) or page_def
            # Only stream for full page loads (not SX/HTMX requests)
            if is_htmx_request():
                return await _buffered(current, kwargs)
            # stream_with_context keeps app/request context available
            # across async generator yields. It is applied to the generator
            # *function* (its decorator form) and the wrapped function is
            # then called; handing it an already-created generator object
            # would yield a callable rather than an iterable body.
            stream = stream_with_context(execute_page_streaming)
            return Response(
                stream(current, service_name, url_params=kwargs),
                content_type="text/html; charset=utf-8",
            )
    else:
        # Standard non-streaming response
        async def page_view(**kwargs: Any) -> Any:
            current = get_page(service_name, page_def.name) or page_def
            return await _buffered(current, kwargs)

    # Give the view function a unique name for Quart's routing
    page_view.__name__ = f"defpage_{page_def.name.replace('-', '_')}"
    page_view.__qualname__ = page_view.__name__

    # Apply auth decorator
    view_fn = _apply_auth(page_view, page_def.auth)

    # Apply cache decorator
    if page_def.cache:
        view_fn = _apply_cache(view_fn, page_def.cache)

    # Register the route
    bp.add_url_rule(
        page_def.path,
        endpoint=page_view.__name__,
        view_func=view_fn,
        methods=["GET"],
    )
    logger.info("Mounted defpage %s:%s → GET %s", service_name, page_def.name, page_def.path)
|
|
|
|
|
|
def _apply_auth(fn: Any, auth: str | list) -> Any:
    """Return *fn* wrapped with the decorator matching *auth*.

    Recognised values are ``"public"`` (no wrapping), ``"login"``,
    ``"admin"``, ``"post_author"`` and a list of the form
    ``["rights", <right>, ...]``. Any other value leaves *fn* unwrapped.
    The authz decorators are imported lazily, only for the branch taken.
    """
    wrapped = fn
    if auth == "public":
        pass
    elif auth == "login":
        from shared.browser.app.authz import require_login
        wrapped = require_login(fn)
    elif auth == "admin":
        from shared.browser.app.authz import require_admin
        wrapped = require_admin(fn)
    elif auth == "post_author":
        from shared.browser.app.authz import require_post_author
        wrapped = require_post_author(fn)
    elif isinstance(auth, list) and auth and auth[0] == "rights":
        from shared.browser.app.authz import require_rights
        # Everything after the "rights" tag is a required right
        required = auth[1:]
        wrapped = require_rights(*required)(fn)
    return wrapped
|
|
|
|
|
|
def _apply_cache(fn: Any, cache: dict) -> Any:
    """Return *fn* wrapped with ``cache_page`` configured from *cache*.

    Reads ``ttl`` (default ``0``), ``tag`` (default ``None``) and
    ``scope`` (default ``"user"``) from the mapping.
    """
    from shared.browser.app.redis_cacher import cache_page

    options = {
        "ttl": cache.get("ttl", 0),
        "tag": cache.get("tag"),
        "scope": cache.get("scope", "user"),
    }
    decorator = cache_page(**options)
    return decorator(fn)
|
|
|
|
|
|
async def _check_page_auth(auth: str | list) -> Any | None:
    """Check a page's auth setting for the data endpoint.

    Returns ``None`` when access is allowed; otherwise the request is
    aborted with 401 (no signed-in user) or 403 (insufficient rights).
    The user is read from ``g.user`` and is expected to be dict-like with
    an optional ``rights`` mapping.

    Handled settings:

    * ``"public"`` — always allowed.
    * ``"login"`` — requires a user.
    * ``"post_author"`` — requires a user. The page route also enforces
      this setting through ``require_post_author``; previously the data
      endpoint let such pages through without any check.
      NOTE(review): the author/ownership check itself is not replicated
      here — confirm whether the data endpoint needs it too.
    * ``"admin"`` — requires a user with a truthy ``admin`` right.
    * ``["rights", ...]`` — requires a user holding every listed right.

    Any other value is allowed through, matching ``_apply_auth``, which
    leaves unrecognised settings unwrapped.
    """
    from quart import g, abort as quart_abort

    if auth == "public":
        return None

    user = g.get("user")

    if auth == "login" or auth == "post_author":
        if not user:
            quart_abort(401)
    elif auth == "admin":
        if not user or not user.get("rights", {}).get("admin"):
            quart_abort(403)
    elif isinstance(auth, list) and auth and auth[0] == "rights":
        if not user:
            quart_abort(401)
        user_rights = set(user.get("rights", {}).keys())
        required = set(auth[1:])
        if not required.issubset(user_rights):
            quart_abort(403)
    return None
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Page data endpoint — evaluate :data expression, return SX
|
|
# ---------------------------------------------------------------------------
|
|
|
|
async def evaluate_page_data(
    page_def: PageDef,
    service_name: str,
    url_params: dict[str, Any] | None = None,
) -> str:
    """Evaluate a defpage's :data expression and serialize the result as SX.

    Data-only counterpart to ``execute_page()``: the client fetches this
    when it already has the component definitions and only needs the data
    bindings to render a :data page itself.

    The env is assembled the same way as in ``execute_page()`` (component
    env, page helpers, page closure, URL params in kebab-case and original
    spelling). A dict result has its keys converted to kebab-case before
    serialization.

    Returns:
        SX wire format (e.g. ``{:posts (list ...) :count 42}``), or the
        string ``"nil"`` when the page has no :data expression.
    """
    from .jinja_bridge import get_component_env, _get_request_context
    from .async_eval import async_eval
    from .parser import serialize

    if page_def.data_expr is None:
        return "nil"

    params = url_params if url_params is not None else {}

    # Layered env: later layers win over earlier ones
    env: dict[str, Any] = dict(get_component_env())
    for layer in (get_page_helpers(service_name), page_def.closure):
        env.update(layer)

    for name, value in params.items():
        env[name.replace("_", "-")] = value
        env[name] = value

    data = await async_eval(page_def.data_expr, env, _get_request_context())

    # Same key convention execute_page() uses when merging :data results
    if isinstance(data, dict):
        data = {key.replace("_", "-"): value for key, value in data.items()}

    return serialize(data)
|
|
|
|
|
|
def auto_mount_page_data(app: Any, service_name: str) -> None:
    """Mount a single /sx/data/ endpoint that serves page data as SX.

    For each defpage with :data, the client can GET /sx/data/<page-name>
    (with URL params as query args) and receive the evaluated :data
    result serialized as SX wire format (text/sx).

    Unknown pages and pages without a :data expression answer 404.
    Auth is enforced per-page: the endpoint looks up the page's auth
    setting and checks it before evaluating the data expression.
    """
    from quart import make_response, request, abort as quart_abort

    async def page_data_view(page_name: str) -> Any:
        page_def = get_page(service_name, page_name)
        if page_def is None or page_def.data_expr is None:
            quart_abort(404)

        # Check auth — same setting as the page route itself
        auth_error = await _check_page_auth(page_def.auth)
        if auth_error is not None:
            return auth_error

        # URL params arrive as query args. to_dict() gives one plain value
        # per key (the first), whereas dict() over a multi-value mapping
        # can expose per-key lists.
        url_params = request.args.to_dict()

        result_sx = await evaluate_page_data(
            page_def, service_name, url_params=url_params,
        )

        resp = await make_response(result_sx, 200)
        resp.content_type = "text/sx; charset=utf-8"
        return resp

    page_data_view.__name__ = "sx_page_data"
    page_data_view.__qualname__ = "sx_page_data"

    app.add_url_rule(
        "/sx/data/<page_name>",
        endpoint="sx_page_data",
        view_func=page_data_view,
        methods=["GET"],
    )
    logger.info("Mounted page data endpoint for %s at /sx/data/<page_name>", service_name)
|
|
|
|
|
|
def mount_io_endpoint(app: Any, service_name: str) -> None:
    """Mount /sx/io/<name> endpoint for client-side IO primitive calls.

    The client can call any allowed IO primitive or page helper via
    GET/POST. The result is returned as SX wire format (text/sx).

    The allowlist is computed once, at mount time, from the ``io_refs``
    of every component in the component env; names outside it answer 403.
    Allowed names that are not global IO primitives fall back to the
    service's page helpers, so service-specific functions like
    ``highlight`` work via the proxy. Unknown helpers answer 404.

    Arguments come from the query string on GET (keys starting with
    ``_arg`` are positional, in query order; all others are keyword
    arguments) or from a JSON object ``{"args": [...], "kwargs": {...}}``
    on POST. A POST body of any other shape answers 400.
    """
    import asyncio as _asyncio
    from quart import make_response, request, abort as quart_abort
    from .primitives_io import IO_PRIMITIVES, execute_io
    from .jinja_bridge import _get_request_context
    from .parser import serialize

    # Build allowlist from all component IO refs across this service
    from .jinja_bridge import _COMPONENT_ENV
    from .types import Component as _Comp
    allowed_io: set[str] = set()
    for component in _COMPONENT_ENV.values():
        if isinstance(component, _Comp) and component.io_refs:
            allowed_io.update(component.io_refs)

    from shared.browser.app.csrf import csrf_exempt

    @csrf_exempt
    async def io_proxy(name: str) -> Any:
        if name not in allowed_io:
            quart_abort(403)

        # Parse args from query string or JSON body
        args: list = []
        kwargs: dict = {}
        if request.method == "GET":
            for k, v in request.args.items():
                if k.startswith("_arg"):
                    args.append(v)
                else:
                    kwargs[k] = v
        else:
            data = await request.get_json(silent=True) or {}
            # The body is client-controlled: reject malformed shapes with
            # a 400 instead of failing later with an unhandled error.
            if not isinstance(data, dict):
                quart_abort(400)
            args = data.get("args", [])
            kwargs = data.get("kwargs", {})
            if not isinstance(args, list) or not isinstance(kwargs, dict):
                quart_abort(400)

        # Try global IO primitives first
        if name in IO_PRIMITIVES:
            ctx = _get_request_context()
            result = await execute_io(name, args, kwargs, ctx)
        else:
            # Fall back to page helpers (service-specific functions)
            helper_fn = get_page_helpers(service_name).get(name)
            if helper_fn is None:
                quart_abort(404)
            result = helper_fn(*args, **kwargs)
            if _asyncio.iscoroutine(result):
                result = await result

        result_sx = serialize(result) if result is not None else "nil"
        resp = await make_response(result_sx, 200)
        resp.content_type = "text/sx; charset=utf-8"
        # NOTE(review): this header is set on every response, POST and
        # request-context-dependent results included — confirm that
        # "public" caching is intended for all of them.
        resp.headers["Cache-Control"] = "public, max-age=300"
        return resp

    io_proxy.__name__ = "sx_io_proxy"
    io_proxy.__qualname__ = "sx_io_proxy"

    app.add_url_rule(
        "/sx/io/<name>",
        endpoint="sx_io_proxy",
        view_func=io_proxy,
        methods=["GET", "POST"],
    )
    logger.info("Mounted IO proxy for %s: %s", service_name, sorted(allowed_io))
|