Files
rose-ash/shared/sx/handlers.py
giles ee868f686b Migrate 6 reactive demo handlers from Python f-strings to SX defhandlers
Moved flash-sale, settle-data, search-products/events/posts, and catalog
endpoints from bp/pages/routes.py into sx/sx/handlers/reactive-api.sx.
routes.py now contains only the SSE endpoint (async generators need Python).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-24 13:26:25 +00:00

355 lines
12 KiB
Python

"""
Declarative handler registry and blueprint factory.
Supports ``defhandler`` s-expressions that define fragment handlers
in .sx files instead of Python. Each handler is a self-contained
s-expression with a bounded primitive vocabulary, providing a clear
security boundary and AI legibility.
Usage::
from shared.sx.handlers import create_handler_blueprint, load_handler_file
# Load handler definitions from .sx files
load_handler_file("blog/sx/handlers/link-card.sx", "blog")
# Create a blueprint that dispatches to both sx and Python handlers
bp = create_handler_blueprint("blog")
bp.add_python_handler("nav-tree", nav_tree_handler_fn)
"""
from __future__ import annotations
import logging
import os
from typing import Any, Callable, Awaitable
from werkzeug.routing import BaseConverter
from .types import HandlerDef
logger = logging.getLogger("sx.handlers")
class SxAtomConverter(BaseConverter):
    """Werkzeug URL converter that captures one SX atom from an expression URL.

    The pattern accepts a run of one or more characters and ends at the
    first dot, slash, parenthesis, ``?``, ``&``, ``=`` or space, so a captured
    value never extends past the atom it started in.

    Reference it as ``<sx:param>`` inside a route pattern, for example::

        /(geography.(hypermedia.(reference.(api.(item.<sx:item_id>)))))
    """

    regex = r"[^./)(?&= ]+"
# ---------------------------------------------------------------------------
# Registry — service → handler-name → HandlerDef
# ---------------------------------------------------------------------------
# Module-level registry: service name -> handler name -> HandlerDef.
# Populated by register_handler() and emptied by clear_handlers().
_HANDLER_REGISTRY: dict[str, dict[str, HandlerDef]] = {}
def register_handler(service: str, handler_def: HandlerDef) -> None:
    """Store *handler_def* in the registry under *service*.

    The per-service table is created on first use.  A definition already
    registered under the same name for that service is overwritten.
    """
    service_table = _HANDLER_REGISTRY.setdefault(service, {})
    service_table[handler_def.name] = handler_def
    logger.debug("Registered handler %s:%s", service, handler_def.name)
def get_handler(service: str, name: str) -> HandlerDef | None:
    """Return the handler registered as *name* for *service*, or ``None``.

    ``None`` is returned both when the service has no table in the registry
    and when its table has no entry with that name.
    """
    service_table = _HANDLER_REGISTRY.get(service)
    if service_table is None:
        return None
    return service_table.get(name)
def get_all_handlers(service: str) -> dict[str, HandlerDef]:
    """Return a shallow copy of the name -> handler table for *service*.

    The result is a new dict on every call, so callers may mutate it without
    touching the registry.  An unknown service yields an empty dict.
    """
    service_table = _HANDLER_REGISTRY.get(service)
    if service_table is None:
        return {}
    return dict(service_table)
def clear_handlers(service: str | None = None) -> None:
    """Drop registered handlers.

    With a *service*, only that service's table is removed (a no-op when the
    service is unknown).  Without one, the whole registry is emptied.
    """
    if service is not None:
        _HANDLER_REGISTRY.pop(service, None)
        return
    _HANDLER_REGISTRY.clear()
# ---------------------------------------------------------------------------
# Loading — parse .sx files and collect HandlerDef instances
# ---------------------------------------------------------------------------
def load_handler_file(filepath: str, service_name: str) -> list[HandlerDef]:
    """Parse an .sx file, evaluate it, and register the handlers it yields.

    Every top-level expression in the file is evaluated, in order, against
    a fresh dict copied from ``get_component_env()``, so the file can refer
    to components that are already defined.  After evaluation, every
    ``HandlerDef`` value found in that dict is registered under
    *service_name*.

    Args:
        filepath: Path of the .sx source file (read as UTF-8).
        service_name: Service the handlers are registered under.

    Returns:
        The ``HandlerDef`` objects that were registered, in the iteration
        order of the evaluation environment.

    Raises:
        OSError: If the file cannot be opened or read.
    """
    from .parser import parse_all
    from .ref.sx_ref import eval_expr, trampoline
    from .jinja_bridge import get_component_env

    with open(filepath, encoding="utf-8") as f:
        source = f.read()

    # Seed env with component definitions so handlers can reference components
    env = dict(get_component_env())
    for expr in parse_all(source):
        # eval_expr's result is passed through trampoline to obtain the
        # final value; it is discarded because only the bindings left in
        # env are used.
        trampoline(eval_expr(expr, env))

    # Collect all HandlerDef values from the env
    handlers = [val for val in env.values() if isinstance(val, HandlerDef)]
    for handler_def in handlers:
        register_handler(service_name, handler_def)
    return handlers
def load_handler_dir(directory: str, service_name: str) -> list[HandlerDef]:
    """Load every ``*.sx`` file directly inside *directory*.

    Files are processed in sorted path order and each one is handed to
    ``load_handler_file``.  The returned list concatenates the handlers
    reported for each file; it is empty when no file matches.
    """
    from glob import glob

    pattern = os.path.join(directory, "*.sx")
    collected: list[HandlerDef] = []
    for sx_path in sorted(glob(pattern)):
        collected += load_handler_file(sx_path, service_name)
    return collected
# ---------------------------------------------------------------------------
# Handler execution
# ---------------------------------------------------------------------------
async def execute_handler(
    handler_def: HandlerDef,
    service_name: str,
    args: dict[str, str] | None = None,
) -> str:
    """Evaluate a declarative handler and return the resulting SX source.

    Each name in ``handler_def.params`` is bound from *args*: the name is
    looked up as written, then with ``-`` replaced by ``_``, and falls back
    to ``NIL`` when neither key is present.

    Two evaluation paths exist, selected by environment variables:

    * ``SX_USE_OCAML=1`` — the handler body is serialized, wrapped in a
      ``let`` form carrying the bound params (when there are any), and sent
      to the OCaml bridge's ``aser`` with ``_helper_service`` set to
      *service_name*.  The reply is wrapped in ``SxExpr``; a falsy reply
      becomes ``SxExpr("")``.
    * otherwise — the body is evaluated in Python by ``async_eval_to_sx``
      (taken from ``ref.async_eval_ref`` when ``SX_USE_REF=1``, else from
      ``async_eval``).  The env is layered as: component env, page helpers
      for the service, the handler closure, then the bound params.

    Args:
        handler_def: The handler to run.
        service_name: Service the handler belongs to.
        args: Argument values, typically the request's query args.
            ``None`` is treated as an empty mapping.

    Returns:
        SX wire-format text.  The OCaml path always returns an ``SxExpr``;
        the Python path returns whatever ``async_eval_to_sx`` produces.
    """
    from .jinja_bridge import get_component_env, _get_request_context
    from .pages import get_page_helpers
    from .parser import serialize
    from .types import NIL, SxExpr

    supplied = {} if args is None else args

    def lookup(param):
        # Accept the param name as declared, or its underscore spelling.
        return supplied.get(param, supplied.get(param.replace("-", "_"), NIL))

    if os.environ.get("SX_USE_OCAML") != "1":
        # Python fallback
        if os.environ.get("SX_USE_REF") == "1":
            from .ref.async_eval_ref import async_eval_to_sx
        else:
            from .async_eval import async_eval_to_sx

        # Later layers override earlier ones: params win over closure,
        # closure over page helpers, page helpers over components.
        env = dict(get_component_env())
        env.update(get_page_helpers(service_name))
        env.update(handler_def.closure)
        for param in handler_def.params:
            env[param] = lookup(param)
        ctx = _get_request_context()
        return await async_eval_to_sx(handler_def.body, env, ctx)

    from .ocaml_bridge import get_bridge

    # Only the params are bound here.  Constants and defcomps from the
    # handler file are expected to already live in the kernel's global env.
    bindings = " ".join(
        f"({param} {serialize(lookup(param))})" for param in handler_def.params
    )
    body_sx = serialize(handler_def.body)
    sx_text = f"(let ({bindings}) {body_sx})" if bindings else body_sx

    bridge = await get_bridge()
    ocaml_ctx = {"_helper_service": service_name}
    result_sx = await bridge.aser(sx_text, ctx=ocaml_ctx)
    return SxExpr(result_sx or "")
# ---------------------------------------------------------------------------
# Blueprint factory
# ---------------------------------------------------------------------------
def create_handler_blueprint(service_name: str) -> Any:
    """Build a Quart Blueprint serving ``/internal/fragments/<fragment_type>``.

    Every request must carry the ``FRAGMENT_HEADER`` header; requests without
    it receive an empty 403.  A fragment request is resolved in this order:

    1. a Python handler registered through ``bp.add_python_handler``;
    2. an sx handler registered for *service_name* under the fragment name;
    3. otherwise an empty 200 response.

    All successful responses use content type ``text/sx``.

    Usage::

        bp = create_handler_blueprint("blog")
        bp.add_python_handler("nav-tree", my_python_handler)
        app.register_blueprint(bp)

    The returned blueprint also exposes the override table as
    ``bp._python_handlers``.
    """
    from quart import Blueprint, Response, request
    from shared.infrastructure.fragments import FRAGMENT_HEADER

    # Python-side handler overrides, keyed by fragment name
    overrides: dict[str, Callable[[], Awaitable[str]]] = {}

    blueprint = Blueprint(
        f"sx_handlers_{service_name}",
        __name__,
        url_prefix="/internal/fragments",
    )

    def _sx_reply(payload):
        return Response(payload, status=200, content_type="text/sx")

    @blueprint.before_request
    async def _require_fragment_header():
        if request.headers.get(FRAGMENT_HEADER):
            return None
        return Response("", status=403)

    @blueprint.get("/<fragment_type>")
    async def get_fragment(fragment_type: str):
        # 1. Python handlers take precedence (manual overrides)
        override = overrides.get(fragment_type)
        if override is not None:
            return _sx_reply(await override())

        # 2. Fall back to the sx handler registry
        handler_def = get_handler(service_name, fragment_type)
        if handler_def is None:
            # 3. No handler found — return empty
            return _sx_reply("")

        rendered = await execute_handler(
            handler_def,
            service_name,
            args=dict(request.args),
        )
        return _sx_reply(rendered)

    def add_python_handler(name: str, fn: Callable[[], Awaitable[str]]) -> None:
        """Register a Python async function as a fragment handler."""
        overrides[name] = fn

    blueprint.add_python_handler = add_python_handler  # type: ignore[attr-defined]
    blueprint._python_handlers = overrides  # type: ignore[attr-defined]
    return blueprint
# ---------------------------------------------------------------------------
# Public route registration — handlers with :path get mounted as routes
# ---------------------------------------------------------------------------
def register_route_handlers(app_or_bp: Any, service_name: str) -> int:
    """Mount a public route for each handler of *service_name* that is a route.

    Handlers whose ``is_route`` is falsy are skipped.  For the rest, the view
    is attached with the method decorator of the same name on *app_or_bp*
    (``hdef.method`` lower-cased).  When no such attribute exists, a warning
    is logged and the handler is skipped.  Views for handlers with a falsy
    ``csrf`` are wrapped in ``csrf_exempt``.

    Each view merges query args with path parameters (path parameters win),
    runs the handler, and applies any status and headers found in the
    response meta to the response.

    Returns:
        The number of routes registered.
    """
    from quart import Response, request
    from shared.browser.app.csrf import csrf_exempt

    # Register SX atom converter for expression URL parameters; only objects
    # that expose a url_map can take it.
    if hasattr(app_or_bp, 'url_map'):
        app_or_bp.url_map.converters.setdefault('sx', SxAtomConverter)

    mounted = 0
    for name, hdef in get_all_handlers(service_name).items():
        if not hdef.is_route:
            continue

        # hdef is bound as a default argument so each view keeps its own
        # handler instead of the loop variable's final value.
        async def _route_view(_h=hdef, **path_kwargs):
            from shared.sx.helpers import sx_response
            from shared.sx.primitives_io import reset_response_meta, get_response_meta

            reset_response_meta()
            merged_args = dict(request.args)
            merged_args.update(path_kwargs)
            rendered = await execute_handler(_h, service_name, args=merged_args)
            resp = sx_response(rendered)

            meta = get_response_meta()
            if not meta:
                return resp
            if meta.get("status"):
                resp.status_code = meta["status"]
            for header, value in meta.get("headers", {}).items():
                resp.headers[header] = value
            return resp

        view_fn = _route_view if hdef.csrf else csrf_exempt(_route_view)

        route_reg = getattr(app_or_bp, hdef.method.lower(), None)
        if route_reg is None:
            logger.warning("Unsupported HTTP method %s for handler %s",
                           hdef.method, name)
            continue

        route_reg(hdef.path, endpoint=f"sx_route_{name}")(view_fn)
        logger.info("Registered route %s %s → handler:%s",
                    hdef.method.upper(), hdef.path, name)
        mounted += 1
    return mounted
# ---------------------------------------------------------------------------
# Direct app mount — replaces per-service fragment blueprint boilerplate
# ---------------------------------------------------------------------------
def auto_mount_fragment_handlers(app: Any, service_name: str) -> Callable:
    """Mount ``/internal/fragments/<type>`` directly on the app.

    Requests lacking the ``FRAGMENT_HEADER`` header get an empty 403.
    Otherwise a Python override registered for the fragment type is tried
    first, then the sx handler registered for *service_name*.  If neither
    exists, an empty 200 ``text/sx`` response is returned.

    Args:
        app: Object providing a ``get`` route decorator (the Quart app).
        service_name: Service whose sx handlers are served.

    Returns:
        An ``add_handler(name, fn, content_type="text/sx")`` function for
        registering Python handler overrides.  An override registered with
        ``content_type="text/html"`` is served as ``text/html``; any other
        value is served as ``text/sx``.  Re-registering a name replaces both
        the function and the content type.
    """
    from quart import Response, request
    from shared.infrastructure.fragments import FRAGMENT_HEADER

    python_handlers: dict[str, Callable[[], Awaitable[str]]] = {}
    html_types: set[str] = set()

    @app.get("/internal/fragments/<fragment_type>")
    async def _fragment_dispatch(fragment_type: str):
        if not request.headers.get(FRAGMENT_HEADER):
            return Response("", status=403)

        py = python_handlers.get(fragment_type)
        if py is not None:
            result = await py()
            ct = "text/html" if fragment_type in html_types else "text/sx"
            return Response(result, status=200, content_type=ct)

        hdef = get_handler(service_name, fragment_type)
        if hdef is not None:
            result = await execute_handler(hdef, service_name, args=dict(request.args))
            return Response(result, status=200, content_type="text/sx")

        return Response("", status=200, content_type="text/sx")

    def add_handler(name: str, fn: Callable[[], Awaitable[str]], content_type: str = "text/sx") -> None:
        python_handlers[name] = fn
        if content_type == "text/html":
            html_types.add(name)
        else:
            # Clear a stale HTML flag left by an earlier registration of
            # the same name; otherwise the replacement handler would still
            # be served as text/html.
            html_types.discard(name)

    return add_handler