Major architectural change: page function dispatch and handler execution
now go through the OCaml kernel instead of the Python bootstrapped evaluator.
OCaml integration:
- Page dispatch: bridge.eval() evaluates SX URL expressions (geography, marshes, etc.)
- Handler aser: bridge.aser() serializes handler responses as SX wire format
- _ensure_components loads all .sx files into OCaml kernel (spec, web adapter, handlers)
- defhandler/defpage registered as no-op special forms so handler files load
- helper IO primitive dispatches to Python page helpers + IO handlers
- ok-raw response format for SX wire format (no double-escaping)
- Natural list serialization in eval (no (list ...) wrapper)
- Clean pipe: _read_until_ok always sends io-response on error
SX adapter (aser):
- scope-emit!/scope-peek aliases to avoid CEK special form conflict
- aser-fragment/aser-call: strings starting with "(" pass through unserialized
- Registered cond-scheme?, is-else-clause?, primitive?, get-primitive in kernel
- random-int, parse-int as kernel primitives; json-encode, into via IO bridge
Handler migration:
- All IO calls converted to (helper "name" args...) pattern
- request-arg, request-form, state-get, state-set!, now, component-source etc.
- Fixed bare (effect ...) in island bodies leaking disposer functions as text
- Fixed lower-case → lower, ~search-results → ~examples/search-results
Reactive islands:
- sx-hydrate-islands called after client-side navigation swap
- force-dispose-islands-in for outerHTML swaps (clears hydration markers)
- clear-processed! platform primitive for re-hydration
Content restructuring:
- Design, event bridge, named stores, phase 2 consolidated into reactive overview
- Marshes split into overview + 5 example sub-pages
- Nav links use sx-get/sx-target for client-side navigation
Playwright test suite (sx/tests/test_demos.py):
- 83 tests covering hypermedia demos, reactive islands, marshes, spec explorer
- Server-side rendering, handler interactions, island hydration, navigation
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
749 lines
24 KiB
Python
749 lines
24 KiB
Python
"""
|
|
Async I/O primitives for the s-expression resolver.
|
|
|
|
These wrap rose-ash's inter-service communication layer so that s-expressions
|
|
can fetch fragments, query data, call actions, and access request context.
|
|
|
|
Unlike pure primitives (primitives.py), these are **async** and are executed
|
|
by the resolver rather than the evaluator. They are identified by name
|
|
during the tree-walk phase and dispatched via ``asyncio.gather()``.
|
|
|
|
Usage in s-expressions::
|
|
|
|
(frag "blog" "link-card" :slug "apple")
|
|
(query "market" "products-by-ids" :ids "1,2,3")
|
|
(action "market" "create-marketplace" :name "Farm Shop" :slug "farm")
|
|
(current-user)
|
|
(htmx-request?)
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import contextvars
|
|
from typing import Any
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Registry of async IO handlers (name → coroutine)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# Maps each IO primitive name to the function registered to handle it.
_IO_HANDLERS: dict[str, Any] = {}


def register_io_handler(name: str):
    """Return a decorator that files a handler under *name* in ``_IO_HANDLERS``.

    The decorated function is stored as-is and handed back unchanged, so it
    remains directly callable. Registering a second handler under the same
    name replaces the first.
    """
    def _register(handler):
        _IO_HANDLERS[name] = handler
        return handler

    return _register
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Request context (set per-request by the resolver)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# Domain service object used by single-argument ``(service "method" ...)`` calls.
_handler_service: contextvars.ContextVar[Any]
_handler_service = contextvars.ContextVar("_handler_service", default=None)

# Response headers/status requested by handler primitives; None until one is set.
_response_meta: contextvars.ContextVar[dict | None]
_response_meta = contextvars.ContextVar("_response_meta", default=None)

# Ephemeral per-process state — resets on restart. For demos/testing only.
_ephemeral_state: dict[str, Any] = dict()
|
|
|
|
|
|
def set_handler_service(service_obj: Any) -> None:
    """Store *service_obj* as the handler service for the current context.

    Single-argument ``(service "method" ...)`` calls are dispatched to the
    object bound here.
    """
    _handler_service.set(service_obj)
|
|
|
|
|
|
def get_handler_service() -> Any:
    """Return the handler service bound in the current context, or None."""
    bound_service = _handler_service.get(None)
    return bound_service
|
|
|
|
|
|
def reset_response_meta() -> None:
    """Clear any response meta so a new request starts with none set."""
    _response_meta.set(None)
|
|
|
|
|
|
def get_response_meta() -> dict | None:
    """Return the response meta dict (``headers``/``status``), or None if unset.

    The dict is created by the ``set-response-header`` and
    ``set-response-status`` primitives the first time one of them runs.
    """
    current_meta = _response_meta.get(None)
    return current_meta
|
|
|
|
|
|
class RequestContext:
    """Per-request context provided to I/O primitives.

    Attributes are plain data:

    * ``user``    — user dict, or None when no user is known.
    * ``is_htmx`` — flag returned by the ``(htmx-request?)`` primitive.
    * ``extras``  — free-form dict of additional per-request values.
    """

    __slots__ = ("user", "is_htmx", "extras")

    def __init__(
        self,
        user: dict[str, Any] | None = None,
        is_htmx: bool = False,
        extras: dict[str, Any] | None = None,
    ):
        self.user = user
        self.is_htmx = is_htmx
        # Only substitute a fresh dict when nothing was passed. Testing
        # truthiness (``extras or {}``) would swap a caller-supplied empty
        # dict for a new one, so later writes would not be shared.
        self.extras = extras if extras is not None else {}
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# I/O dispatch
|
|
# ---------------------------------------------------------------------------
|
|
|
|
async def execute_io(
    name: str,
    args: list[Any],
    kwargs: dict[str, Any],
    ctx: RequestContext,
) -> Any:
    """Await the IO handler registered under *name* and return its result.

    The handler is called as ``handler(args, kwargs, ctx)``.

    Raises:
        RuntimeError: if no handler is registered under *name*.
    """
    if name not in _IO_HANDLERS:
        raise RuntimeError(f"Unknown I/O primitive: {name}")
    io_fn = _IO_HANDLERS[name]
    return await io_fn(args, kwargs, ctx)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _clean_kwargs(kwargs: dict[str, Any]) -> dict[str, Any]:
    """Return a copy of *kwargs* without entries whose value is None or NIL."""
    from .types import NIL

    kept: dict[str, Any] = {}
    for key, value in kwargs.items():
        if value is None or value is NIL:
            continue
        kept[key] = value
    return kept
|
|
|
|
|
|
def _dto_to_dict(obj: Any) -> dict[str, Any]:
    """Turn a DTO-like object into a plain dict.

    * Dataclasses are delegated to ``shared.contracts.dtos.dto_to_dict``.
    * Objects with ``_asdict`` (e.g. namedtuples) use that mapping.
    * Other objects with a ``__dict__`` contribute their attributes whose
      names do not start with an underscore.
    * Anything else is wrapped as ``{"value": obj}``.

    For the ``_asdict`` and ``__dict__`` cases, every value that has both
    ``year`` and ``strftime`` attributes also gets ``<key>_year``,
    ``<key>_month`` and ``<key>_day`` entries.
    """
    if hasattr(obj, "__dataclass_fields__"):
        from shared.contracts.dtos import dto_to_dict
        return dto_to_dict(obj)

    if hasattr(obj, "_asdict"):
        fields = dict(obj._asdict())
    elif hasattr(obj, "__dict__"):
        fields = {
            name: value
            for name, value in obj.__dict__.items()
            if not name.startswith("_")
        }
    else:
        return {"value": obj}

    # Collect the date parts separately so the dict is not resized while
    # it is being walked, then merge them in the same order.
    date_parts: dict[str, Any] = {}
    for name, value in fields.items():
        if hasattr(value, "year") and hasattr(value, "strftime"):
            date_parts[f"{name}_year"] = value.year
            date_parts[f"{name}_month"] = value.month
            date_parts[f"{name}_day"] = value.day
    fields.update(date_parts)
    return fields
|
|
|
|
|
|
def _convert_result(result: Any) -> Any:
    """Convert a service method result into plain values for sx consumption.

    Rules, applied in this order:

    * ``None`` → ``NIL``
    * ``int`` / ``float`` / ``str`` / ``bool`` → returned unchanged
    * objects with a callable ``isoformat`` → the ``isoformat()`` string
    * ``dict`` → new dict with every value converted recursively
    * dataclasses and objects with ``_asdict`` → dict via ``_dto_to_dict``
    * ``tuple`` / ``list`` → list with every item converted recursively
    * anything else → returned unchanged
    """
    if result is None:
        from .types import NIL
        return NIL
    if isinstance(result, (int, float, str, bool)):
        return result
    if hasattr(result, "isoformat") and callable(result.isoformat):
        return result.isoformat()
    if isinstance(result, dict):
        return {key: _convert_result(value) for key, value in result.items()}
    # The DTO check has to come before the tuple check: namedtuples are
    # tuple subclasses, so testing ``tuple`` first would flatten them into
    # lists and lose their field names.
    if hasattr(result, "__dataclass_fields__") or hasattr(result, "_asdict"):
        return _dto_to_dict(result)
    if isinstance(result, (tuple, list)):
        # DTO items are handled by the recursive call above.
        return [_convert_result(item) for item in result]
    return result
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Generic IO handlers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@register_io_handler("frag")
async def _io_frag(
    args: list[Any], kwargs: dict[str, Any], ctx: RequestContext
) -> str:
    """``(frag "service" "type" :key val ...)`` → fetch_fragment.

    The first two positional args are the service and fragment type (both
    stringified). Keyword args, minus None/NIL values, are passed as
    ``params``; ``None`` is passed when none remain.

    Raises:
        ValueError: if fewer than two positional args are given.
    """
    if len(args) < 2:
        raise ValueError("frag requires service and fragment type")
    service, frag_type = str(args[0]), str(args[1])
    remaining = _clean_kwargs(kwargs)

    from shared.infrastructure.fragments import fetch_fragment
    return await fetch_fragment(
        service, frag_type, params=remaining if remaining else None
    )
|
|
|
|
|
|
@register_io_handler("query")
async def _io_query(
    args: list[Any], kwargs: dict[str, Any], ctx: RequestContext
) -> Any:
    """``(query "service" "query-name" :key val ...)`` → fetch_data.

    The first two positional args are the service and query name (both
    stringified). Keyword args, minus None/NIL values, are passed as
    ``params``; ``None`` is passed when none remain.

    Raises:
        ValueError: if fewer than two positional args are given.
    """
    if len(args) < 2:
        raise ValueError("query requires service and query name")
    service, query_name = str(args[0]), str(args[1])
    remaining = _clean_kwargs(kwargs)

    from shared.infrastructure.data_client import fetch_data
    return await fetch_data(
        service, query_name, params=remaining if remaining else None
    )
|
|
|
|
|
|
@register_io_handler("action")
async def _io_action(
    args: list[Any], kwargs: dict[str, Any], ctx: RequestContext
) -> Any:
    """``(action "service" "action-name" :key val ...)`` → call_action.

    The first two positional args are the service and action name (both
    stringified). Keyword args, minus None/NIL values, are passed as
    ``payload``; ``None`` is passed when none remain.

    Raises:
        ValueError: if fewer than two positional args are given.
    """
    if len(args) < 2:
        raise ValueError("action requires service and action name")
    service, action_name = str(args[0]), str(args[1])
    remaining = _clean_kwargs(kwargs)

    from shared.infrastructure.actions import call_action
    return await call_action(
        service, action_name, payload=remaining if remaining else None
    )
|
|
|
|
|
|
@register_io_handler("current-user")
async def _io_current_user(
    args: list[Any], kwargs: dict[str, Any], ctx: RequestContext
) -> dict[str, Any] | None:
    """``(current-user)`` → the ``user`` value held by the request context.

    Positional and keyword args are ignored.
    """
    current = ctx.user
    return current
|
|
|
|
|
|
@register_io_handler("htmx-request?")
async def _io_htmx_request(
    args: list[Any], kwargs: dict[str, Any], ctx: RequestContext
) -> bool:
    """``(htmx-request?)`` → the ``is_htmx`` flag of the request context.

    Positional and keyword args are ignored.
    """
    flag = ctx.is_htmx
    return flag
|
|
|
|
|
|
@register_io_handler("service")
async def _io_service(
    args: list[Any], kwargs: dict[str, Any], ctx: RequestContext
) -> Any:
    """``(service "svc-name" "method-name" :key val ...)`` → call domain service.

    With two or more positional args the service is looked up by name on the
    shared service registry. With a single positional arg the service bound
    via ``set_handler_service`` is used and the arg is the method name.
    Hyphens in service, method and keyword names become underscores, NIL
    keyword values become None, and the method is awaited with ``g.s`` as
    its first argument. The result goes through ``_convert_result``.

    Raises:
        ValueError: if no positional args are given.
        RuntimeError: if the service or the method cannot be found.
    """
    if not args:
        raise ValueError("service requires at least a method name")

    if len(args) >= 2:
        from shared.services.registry import services as svc_registry
        svc_name = str(args[0]).replace("-", "_")
        svc = getattr(svc_registry, svc_name, None)
        if svc is None:
            raise RuntimeError(f"No service registered as: {svc_name}")
        raw_method = args[1]
    else:
        svc = get_handler_service()
        if svc is None:
            raise RuntimeError(
                "No handler service bound — cannot call (service ...)")
        raw_method = args[0]

    method_name = str(raw_method).replace("-", "_")
    method = getattr(svc, method_name, None)
    if method is None:
        raise RuntimeError(f"Service has no method: {method_name}")

    from .types import NIL
    call_kwargs: dict[str, Any] = {}
    for key, value in kwargs.items():
        call_kwargs[key.replace("-", "_")] = None if value is NIL else value

    from quart import g
    outcome = await method(g.s, **call_kwargs)
    return _convert_result(outcome)
|
|
|
|
|
|
@register_io_handler("request-arg")
async def _io_request_arg(
    args: list[Any], kwargs: dict[str, Any], ctx: RequestContext
) -> Any:
    """``(request-arg "name" default?)`` → request.args.get(name, default).

    The default is None when no second positional arg is supplied.

    Raises:
        ValueError: if no name is given.
    """
    if not args:
        raise ValueError("request-arg requires a name")
    from quart import request

    param = str(args[0])
    fallback = None if len(args) < 2 else args[1]
    return request.args.get(param, fallback)
|
|
|
|
|
|
@register_io_handler("request-path")
async def _io_request_path(
    args: list[Any], kwargs: dict[str, Any], ctx: RequestContext
) -> str:
    """``(request-path)`` → the ``path`` of the current Quart request."""
    from quart import request

    current_path = request.path
    return current_path
|
|
|
|
|
|
@register_io_handler("nav-tree")
async def _io_nav_tree(
    args: list[Any], kwargs: dict[str, Any], ctx: RequestContext
) -> list[dict[str, Any]]:
    """``(nav-tree)`` → list of navigation menu node dicts.

    Awaits ``get_navigation_tree(g.s)`` and converts each returned node
    with ``_dto_to_dict``.
    """
    from quart import g
    from shared.services.navigation import get_navigation_tree

    tree = await get_navigation_tree(g.s)
    return list(map(_dto_to_dict, tree))
|
|
|
|
|
|
@register_io_handler("get-children")
async def _io_get_children(
    args: list[Any], kwargs: dict[str, Any], ctx: RequestContext
) -> list[dict[str, Any]]:
    """``(get-children :parent-type "page" :parent-id 1 ...)``

    Keyword names have hyphens turned into underscores and are forwarded to
    ``get_children(g.s, ...)``; each result is converted with
    ``_dto_to_dict``.
    """
    from quart import g
    from shared.services.relationships import get_children

    filters: dict[str, Any] = {}
    for key, value in kwargs.items():
        filters[key.replace("-", "_")] = value
    found = await get_children(g.s, **filters)
    return list(map(_dto_to_dict, found))
|
|
|
|
|
|
@register_io_handler("g")
async def _io_g(
    args: list[Any], kwargs: dict[str, Any], ctx: RequestContext
) -> Any:
    """``(g "key")`` → getattr(g, key, None).

    Hyphens in the key become underscores. With no args the lookup key is
    the empty string.
    """
    from quart import g

    if args:
        attr = str(args[0]).replace("-", "_")
    else:
        attr = ""
    return getattr(g, attr, None)
|
|
|
|
|
|
@register_io_handler("now")
async def _io_now(
    args: list[Any], kwargs: dict[str, Any], ctx: RequestContext
) -> str:
    """``(now)`` or ``(now "%H:%M:%S")`` → formatted timestamp string.

    Uses ``datetime.now()``. A non-empty first arg is used as a
    ``strftime`` format; otherwise the ISO format is returned.
    """
    from datetime import datetime

    pattern = str(args[0]) if args else None
    moment = datetime.now()
    if pattern:
        return moment.strftime(pattern)
    return moment.isoformat()
|
|
|
|
|
|
@register_io_handler("sleep")
async def _io_sleep(
    args: list[Any], kwargs: dict[str, Any], ctx: RequestContext
) -> Any:
    """``(sleep 800)`` → pause for 800ms, then return NIL.

    Raises:
        ValueError: if no duration is given.
    """
    import asyncio
    from .types import NIL

    if not args:
        raise ValueError("sleep requires milliseconds")
    millis = int(args[0])
    seconds = millis / 1000.0
    await asyncio.sleep(seconds)
    return NIL
|
|
|
|
|
|
@register_io_handler("request-form")
async def _io_request_form(
    args: list[Any], kwargs: dict[str, Any], ctx: RequestContext
) -> Any:
    """``(request-form "name" default?)`` → read a form field.

    Returns the optional second arg (NIL if absent) when the field is
    missing.

    Raises:
        ValueError: if no field name is given.
    """
    if not args:
        raise ValueError("request-form requires a field name")
    from quart import request
    from .types import NIL

    field = str(args[0])
    fallback = NIL if len(args) < 2 else args[1]
    submitted = await request.form
    return submitted.get(field, fallback)
|
|
|
|
|
|
@register_io_handler("request-json")
async def _io_request_json(
    args: list[Any], kwargs: dict[str, Any], ctx: RequestContext
) -> Any:
    """``(request-json)`` → parsed JSON body, or NIL.

    The body is read with ``get_json(silent=True)``; a ``None`` result is
    reported as NIL.
    """
    from quart import request
    from .types import NIL

    body = await request.get_json(silent=True)
    if body is None:
        return NIL
    return body
|
|
|
|
|
|
@register_io_handler("request-header")
async def _io_request_header(
    args: list[Any], kwargs: dict[str, Any], ctx: RequestContext
) -> Any:
    """``(request-header "name" default?)`` → request header value.

    Returns the optional second arg (NIL if absent) when the header is
    missing.

    Raises:
        ValueError: if no header name is given.
    """
    if not args:
        raise ValueError("request-header requires a header name")
    from quart import request
    from .types import NIL

    header = str(args[0])
    fallback = NIL if len(args) < 2 else args[1]
    return request.headers.get(header, fallback)
|
|
|
|
|
|
@register_io_handler("request-content-type")
async def _io_request_content_type(
    args: list[Any], kwargs: dict[str, Any], ctx: RequestContext
) -> Any:
    """``(request-content-type)`` → content-type string, or NIL when falsy."""
    from quart import request
    from .types import NIL

    content_type = request.content_type
    return content_type if content_type else NIL
|
|
|
|
|
|
@register_io_handler("request-args-all")
async def _io_request_args_all(
    args: list[Any], kwargs: dict[str, Any], ctx: RequestContext
) -> dict:
    """``(request-args-all)`` → ``dict(request.args)``, all query params."""
    from quart import request

    query_params = dict(request.args)
    return query_params
|
|
|
|
|
|
@register_io_handler("request-form-all")
async def _io_request_form_all(
    args: list[Any], kwargs: dict[str, Any], ctx: RequestContext
) -> dict:
    """``(request-form-all)`` → ``dict`` built from the awaited request form."""
    from quart import request

    submitted = await request.form
    fields = dict(submitted)
    return fields
|
|
|
|
|
|
@register_io_handler("request-form-list")
async def _io_request_form_list(
    args: list[Any], kwargs: dict[str, Any], ctx: RequestContext
) -> list:
    """``(request-form-list "field")`` → all values for a multi-value form field.

    Raises:
        ValueError: if no field name is given.
    """
    if not args:
        raise ValueError("request-form-list requires a field name")
    from quart import request

    submitted = await request.form
    field = str(args[0])
    return submitted.getlist(field)
|
|
|
|
|
|
@register_io_handler("request-headers-all")
async def _io_request_headers_all(
    args: list[Any], kwargs: dict[str, Any], ctx: RequestContext
) -> dict:
    """``(request-headers-all)`` → all headers as dict (lowercase keys).

    When two headers differ only by case, the one iterated later wins.
    """
    from quart import request

    lowered: dict = {}
    for header_name, header_value in request.headers:
        lowered[header_name.lower()] = header_value
    return lowered
|
|
|
|
|
|
@register_io_handler("request-file-name")
async def _io_request_file_name(
    args: list[Any], kwargs: dict[str, Any], ctx: RequestContext
) -> Any:
    """``(request-file-name "field")`` → uploaded file's filename, or NIL.

    NIL is returned when the field has no file or the file object is falsy.

    Raises:
        ValueError: if no field name is given.
    """
    if not args:
        raise ValueError("request-file-name requires a field name")
    from quart import request
    from .types import NIL

    uploads = await request.files
    upload = uploads.get(str(args[0]))
    if not upload:
        return NIL
    return upload.filename
|
|
|
|
|
|
@register_io_handler("set-response-header")
async def _io_set_response_header(
    args: list[Any], kwargs: dict[str, Any], ctx: RequestContext
) -> Any:
    """``(set-response-header "Name" "value")`` → record a response header.

    The header is stored (name and value stringified) in the response meta
    dict, which is created on first use. Returns NIL.

    Raises:
        ValueError: if fewer than two positional args are given.
    """
    if len(args) < 2:
        raise ValueError("set-response-header requires name and value")
    from .types import NIL

    pending = _response_meta.get(None)
    if pending is None:
        pending = {"headers": {}, "status": None}
        _response_meta.set(pending)
    header_value = str(args[1])
    header_name = str(args[0])
    pending["headers"][header_name] = header_value
    return NIL
|
|
|
|
|
|
@register_io_handler("set-response-status")
async def _io_set_response_status(
    args: list[Any], kwargs: dict[str, Any], ctx: RequestContext
) -> Any:
    """``(set-response-status 503)`` → record the response status code.

    The code is stored as an int in the response meta dict, which is
    created on first use. Returns NIL.

    Raises:
        ValueError: if no status code is given.
    """
    if not args:
        raise ValueError("set-response-status requires a status code")
    from .types import NIL

    pending = _response_meta.get(None)
    if pending is None:
        pending = {"headers": {}, "status": None}
        _response_meta.set(pending)
    status_code = int(args[0])
    pending["status"] = status_code
    return NIL
|
|
|
|
|
|
@register_io_handler("state-get")
async def _io_state_get(
    args: list[Any], kwargs: dict[str, Any], ctx: RequestContext
) -> Any:
    """``(state-get "key" default?)`` → read from ephemeral state.

    Returns the optional second arg (NIL if absent) when the key is not
    present.

    Raises:
        ValueError: if no key is given.
    """
    if not args:
        raise ValueError("state-get requires a key")
    from .types import NIL

    slot = str(args[0])
    fallback = NIL if len(args) < 2 else args[1]
    if slot in _ephemeral_state:
        return _ephemeral_state[slot]
    return fallback
|
|
|
|
|
|
@register_io_handler("state-set!")
async def _io_state_set(
    args: list[Any], kwargs: dict[str, Any], ctx: RequestContext
) -> Any:
    """``(state-set! "key" value)`` → write to ephemeral state, return NIL.

    The key is stringified; the value is stored as given.

    Raises:
        ValueError: if fewer than two positional args are given.
    """
    if len(args) < 2:
        raise ValueError("state-set! requires key and value")
    from .types import NIL

    value = args[1]
    slot = str(args[0])
    _ephemeral_state[slot] = value
    return NIL
|
|
|
|
|
|
@register_io_handler("csrf-token")
async def _io_csrf_token(
    args: list[Any], kwargs: dict[str, Any], ctx: RequestContext
) -> str:
    """``(csrf-token)`` → current CSRF token string.

    Calls the ``csrf_token`` Jinja global of the current app; returns an
    empty string when that global is missing or not callable.
    """
    from quart import current_app

    token_fn = current_app.jinja_env.globals.get("csrf_token")
    return token_fn() if callable(token_fn) else ""
|
|
|
|
|
|
@register_io_handler("abort")
async def _io_abort(
    args: list[Any], kwargs: dict[str, Any], ctx: RequestContext
) -> Any:
    """``(abort 403 "message")`` — raise HTTP error from SX.

    The status is converted with ``int``; the message defaults to an empty
    string. Both are passed to Quart's ``abort``.

    Raises:
        ValueError: if no status code is given.
    """
    if not args:
        raise ValueError("abort requires a status code")
    from quart import abort

    code = int(args[0])
    if len(args) > 1:
        description = str(args[1])
    else:
        description = ""
    abort(code, description)
|
|
|
|
|
|
@register_io_handler("url-for")
async def _io_url_for(
    args: list[Any], kwargs: dict[str, Any], ctx: RequestContext
) -> str:
    """``(url-for "endpoint" :key val ...)`` → url_for(endpoint, **kwargs).

    Keyword args with None/NIL values are dropped, hyphens in keyword names
    become underscores, and string values made up solely of decimal digits
    are converted to ``int`` before being passed to Quart's ``url_for``.

    Raises:
        ValueError: if no endpoint name is given.
    """
    if not args:
        raise ValueError("url-for requires an endpoint name")
    from quart import url_for

    endpoint = str(args[0])
    values: dict[str, Any] = {}
    for key, value in _clean_kwargs(kwargs).items():
        # isdecimal(), not isdigit(): isdigit() also accepts characters such
        # as superscript digits, which int() rejects with ValueError.
        if isinstance(value, str) and value.isdecimal():
            value = int(value)
        values[key.replace("-", "_")] = value
    return url_for(endpoint, **values)
|
|
|
|
|
|
@register_io_handler("route-prefix")
async def _io_route_prefix(
    args: list[Any], kwargs: dict[str, Any], ctx: RequestContext
) -> str:
    """``(route-prefix)`` → result of ``shared.utils.route_prefix()``."""
    from shared.utils import route_prefix

    prefix = route_prefix()
    return prefix
|
|
|
|
|
|
@register_io_handler("request-view-args")
async def _io_request_view_args(
    args: list[Any], kwargs: dict[str, Any], ctx: RequestContext
) -> Any:
    """``(request-view-args "key")`` → request.view_args[key].

    Returns None when the key is absent or the request has no view args.

    Raises:
        ValueError: if no key is given.
    """
    if not args:
        raise ValueError("request-view-args requires a key")
    from quart import request

    wanted = str(args[0])
    view_args = request.view_args or {}
    return view_args.get(wanted)
|
|
|
|
|
|
@register_io_handler("app-url")
async def _io_app_url(
    args: list[Any], kwargs: dict[str, Any], ctx: RequestContext
) -> str:
    """``(app-url "blog" "/my-post/")`` → full URL for service.

    The path defaults to ``"/"`` when only the service name is given.

    Raises:
        ValueError: if no service name is given.
    """
    if not args:
        raise ValueError("app-url requires a service name")
    from shared.infrastructure.urls import app_url

    service = str(args[0])
    if len(args) > 1:
        path = str(args[1])
    else:
        path = "/"
    return app_url(service, path)
|
|
|
|
|
|
@register_io_handler("asset-url")
async def _io_asset_url(
    args: list[Any], kwargs: dict[str, Any], ctx: RequestContext
) -> str:
    """``(asset-url "/img/logo.png")`` → versioned static URL.

    Delegates to the ``asset_url`` Jinja global of the current app; the
    path defaults to an empty string.

    Raises:
        RuntimeError: if the ``asset_url`` global is not registered.
    """
    from quart import current_app

    resolver = current_app.jinja_env.globals.get("asset_url")
    if resolver is None:
        raise RuntimeError("asset_url Jinja global not registered")
    if args:
        path = str(args[0])
    else:
        path = ""
    return resolver(path)
|
|
|
|
|
|
@register_io_handler("config")
async def _io_config(
    args: list[Any], kwargs: dict[str, Any], ctx: RequestContext
) -> Any:
    """``(config "key")`` → ``shared.config.config().get(key)``.

    Raises:
        ValueError: if no key is given.
    """
    if not args:
        raise ValueError("config requires a key")
    from shared.config import config

    settings = config()
    wanted = str(args[0])
    return settings.get(wanted)
|
|
|
|
|
|
@register_io_handler("jinja-global")
async def _io_jinja_global(
    args: list[Any], kwargs: dict[str, Any], ctx: RequestContext
) -> Any:
    """``(jinja-global "key" default?)`` → current_app.jinja_env.globals.get(key, default).

    The default is None when no second positional arg is supplied.

    Raises:
        ValueError: if no key is given.
    """
    if not args:
        raise ValueError("jinja-global requires a key")
    from quart import current_app

    wanted = str(args[0])
    fallback = None if len(args) < 2 else args[1]
    return current_app.jinja_env.globals.get(wanted, fallback)
|
|
|
|
|
|
@register_io_handler("relations-from")
async def _io_relations_from(
    args: list[Any], kwargs: dict[str, Any], ctx: RequestContext
) -> list[dict]:
    """``(relations-from "page")`` → list of RelationDef dicts.

    Each dict carries the ``name``, ``from_type``, ``to_type``,
    ``cardinality``, ``nav``, ``nav_icon`` and ``nav_label`` attributes of
    one definition returned by ``relations_from``.

    Raises:
        ValueError: if no entity type is given.
    """
    if not args:
        raise ValueError("relations-from requires an entity type")
    from shared.sx.relations import relations_from

    exported = (
        "name", "from_type", "to_type",
        "cardinality", "nav",
        "nav_icon", "nav_label",
    )
    rows = []
    for definition in relations_from(str(args[0])):
        rows.append({field: getattr(definition, field) for field in exported})
    return rows
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Import context handlers (registers into _IO_HANDLERS via decorator)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
from . import primitives_ctx # noqa: E402, F401
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Auto-derive IO_PRIMITIVES from registered handlers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# Placeholder — rebuilt at end of file after all handlers are registered
|
|
# Empty here; reassigned from the keys of _IO_HANDLERS further down in this module.
IO_PRIMITIVES: frozenset[str] = frozenset()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Sync IO bridge primitives
|
|
#
|
|
# These are declared in boundary.sx (I/O tier), NOT primitives.sx.
|
|
# They must be evaluator-visible because they're called inline in .sx
|
|
# code (inside let, filter, etc.) where the async IO interceptor can't
|
|
# reach them — particularly in sx_ref.py which only intercepts IO at
|
|
# the top level.
|
|
# ---------------------------------------------------------------------------
|
|
|
|
from .primitives import _PRIMITIVES # noqa: E402
|
|
|
|
|
|
def _bridge_app_url(service, *path_parts):
    """Synchronous ``app-url``: full URL for *service* and an optional path.

    Only the first extra positional arg is used as the path; it defaults to
    ``"/"``.
    """
    from shared.infrastructure.urls import app_url

    if path_parts:
        path = str(path_parts[0])
    else:
        path = "/"
    return app_url(str(service), path)
|
|
|
|
def _bridge_asset_url(*path_parts):
    """Synchronous ``asset-url``: delegate to the app's ``asset_url`` Jinja global.

    Only the first positional arg is used as the path; it defaults to an
    empty string.

    Raises:
        RuntimeError: if the ``asset_url`` global is not registered.
    """
    from quart import current_app

    resolver = current_app.jinja_env.globals.get("asset_url")
    if resolver is None:
        raise RuntimeError("asset_url Jinja global not registered")
    if path_parts:
        path = str(path_parts[0])
    else:
        path = ""
    return resolver(path)
|
|
|
|
def _bridge_config(key):
    """Synchronous ``config``: return ``shared.config.config().get(str(key))``."""
    from shared.config import config

    settings = config()
    wanted = str(key)
    return settings.get(wanted)
|
|
|
|
def _bridge_jinja_global(key, *default):
    """Synchronous ``jinja-global``: look up a Jinja global on the current app.

    The first extra positional arg, if any, is the fallback value;
    otherwise the fallback is None.
    """
    from quart import current_app

    if default:
        fallback = default[0]
    else:
        fallback = None
    return current_app.jinja_env.globals.get(str(key), fallback)
|
|
|
|
def _bridge_relations_from(entity_type):
    """Synchronous ``relations-from``: relation definitions as plain dicts.

    Each dict carries the ``name``, ``from_type``, ``to_type``,
    ``cardinality``, ``nav``, ``nav_icon`` and ``nav_label`` attributes of
    one definition returned by ``relations_from``.
    """
    from shared.sx.relations import relations_from

    exported = (
        "name", "from_type", "to_type",
        "cardinality", "nav",
        "nav_icon", "nav_label",
    )
    rows = []
    for definition in relations_from(str(entity_type)):
        rows.append({field: getattr(definition, field) for field in exported})
    return rows
|
|
|
|
# Make the synchronous bridge functions evaluator-visible under their sx names.
for _bridge_name, _bridge_fn in (
    ("app-url", _bridge_app_url),
    ("asset-url", _bridge_asset_url),
    ("config", _bridge_config),
    ("jinja-global", _bridge_jinja_global),
    ("relations-from", _bridge_relations_from),
):
    _PRIMITIVES[_bridge_name] = _bridge_fn
# Do not leave the loop variables behind as module attributes.
del _bridge_name, _bridge_fn
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Universal (helper ...) dispatcher, then validation of all IO handlers against boundary.sx
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@register_io_handler("helper")
async def _io_helper(
    args: list[Any], kwargs: dict[str, Any], ctx: RequestContext
) -> Any:
    """``(helper "name" args...)`` → dispatch to page helpers or IO handlers.

    Universal IO dispatcher — same interface as the OCaml kernel's helper
    IO primitive. Checks page helpers first, then IO handlers.

    A page helper is called with the remaining positional args only; if it
    returns an awaitable, that is awaited. An IO handler receives the
    remaining positional args together with the caller's keyword args and
    the request context.

    Raises:
        ValueError: if no name is given, or if *name* matches neither a
            page helper nor an IO handler.
    """
    if not args:
        raise ValueError("helper requires a name")
    name = str(args[0])
    helper_args = args[1:]

    # Check page helpers first
    from .pages import get_page_helpers
    page_helper = get_page_helpers("sx").get(name)
    if page_helper is not None:
        import inspect
        result = page_helper(*helper_args)
        # Helpers may be sync or async; isawaitable also covers futures and
        # tasks, which asyncio.iscoroutine would hand back un-awaited.
        if inspect.isawaitable(result):
            result = await result
        return result

    # Fall back to IO handlers
    io_handler = _IO_HANDLERS.get(name)
    if io_handler is not None:
        # Forward keyword args: keyword-driven handlers (frag, query,
        # url-for, ...) would otherwise silently lose their parameters.
        return await io_handler(helper_args, kwargs, ctx)

    raise ValueError(f"Unknown helper: {name!r}")
|
|
|
|
|
|
def _validate_io_handlers() -> None:
    """Pass the name of every registered IO handler to ``boundary.validate_io``.

    NOTE(review): presumably ``validate_io`` raises for names not declared
    in boundary.sx — confirm against the boundary module.
    """
    from .boundary import validate_io

    for handler_name in _IO_HANDLERS:
        validate_io(handler_name)
|
|
|
|
_validate_io_handlers()

# Every handler (including helper) is registered by now, so the final set of
# IO primitive names can be taken from the registry.
IO_PRIMITIVES = frozenset(_IO_HANDLERS)
|