"""Declarative handler registry and blueprint factory.

Supports ``defhandler`` s-expressions that define fragment handlers in .sx
files instead of Python.  Each handler is a self-contained s-expression with
a bounded primitive vocabulary, providing a clear security boundary and AI
legibility.

Usage::

    from shared.sx.handlers import create_handler_blueprint, load_handler_file

    # Load handler definitions from .sx files
    load_handler_file("blog/sx/handlers/link-card.sx", "blog")

    # Create a blueprint that dispatches to both sx and Python handlers
    bp = create_handler_blueprint("blog")
    bp.add_python_handler("nav-tree", nav_tree_handler_fn)
"""

from __future__ import annotations

import logging
import os
from typing import Any, Callable, Awaitable

from werkzeug.routing import BaseConverter

from .types import HandlerDef

logger = logging.getLogger("sx.handlers")


class SxAtomConverter(BaseConverter):
    """URL converter for SX atoms inside expression URLs.

    Matches a single atom — stops at dots, parens, slashes, and query chars
    (the pattern also excludes spaces).  Registered under the converter key
    ``sx``, so use it as ``<sx:name>`` in route patterns like::

        /(geography.(hypermedia.(reference.(api.(item.<sx:item_id>)))))

    (``item_id`` above is only an illustrative variable name.)
    """

    regex = r"[^./)(?&= ]+"


# ---------------------------------------------------------------------------
# Registry — service → handler-name → HandlerDef
# ---------------------------------------------------------------------------

_HANDLER_REGISTRY: dict[str, dict[str, HandlerDef]] = {}


def register_handler(service: str, handler_def: HandlerDef) -> None:
    """Register a handler definition for a service.

    A handler already registered under the same name for that service is
    replaced.
    """
    _HANDLER_REGISTRY.setdefault(service, {})[handler_def.name] = handler_def
    logger.debug("Registered handler %s:%s", service, handler_def.name)


def get_handler(service: str, name: str) -> HandlerDef | None:
    """Look up a registered handler by service and name.

    Returns ``None`` when either the service or the handler is unknown.
    """
    return _HANDLER_REGISTRY.get(service, {}).get(name)


def get_all_handlers(service: str) -> dict[str, HandlerDef]:
    """Return all handlers for a service.

    The result is a shallow copy, so mutating it does not affect the registry.
    """
    return dict(_HANDLER_REGISTRY.get(service, {}))


def clear_handlers(service: str | None = None) -> None:
    """Clear handler registry.

    If *service* is given, clear only that service; otherwise clear everything.
    """
    if service is None:
        _HANDLER_REGISTRY.clear()
    else:
        _HANDLER_REGISTRY.pop(service, None)


# ---------------------------------------------------------------------------
# Loading — parse .sx files and collect HandlerDef instances
# ---------------------------------------------------------------------------

def load_handler_file(filepath: str, service_name: str) -> list[HandlerDef]:
    """Parse an .sx file, evaluate it, and register any HandlerDef values.

    Args:
        filepath: Path of the .sx file to read (decoded as UTF-8).
        service_name: Service under which discovered handlers are registered.

    Returns:
        Every ``HandlerDef`` found in the evaluation environment after all
        expressions in the file have been evaluated.
    """
    from .parser import parse_all
    from .ref.sx_ref import eval_expr as _raw_eval, trampoline as _trampoline
    from .jinja_bridge import get_component_env

    with open(filepath, encoding="utf-8") as f:
        source = f.read()

    # Seed env with component definitions so handlers can reference components
    env = dict(get_component_env())

    for expr in parse_all(source):
        # eval_expr's result is passed through trampoline to obtain the
        # final value; evaluation is done for its effect on ``env``.
        _trampoline(_raw_eval(expr, env))

    # Collect all HandlerDef values from the env
    handlers: list[HandlerDef] = []
    for val in env.values():
        if isinstance(val, HandlerDef):
            register_handler(service_name, val)
            handlers.append(val)

    return handlers


def load_handler_dir(directory: str, service_name: str) -> list[HandlerDef]:
    """Load all .sx files from a directory and register handlers.

    Files are processed in sorted path order; subdirectories are not searched.
    Returns the concatenated handler lists of every file loaded.
    """
    import glob as glob_mod

    handlers: list[HandlerDef] = []
    for filepath in sorted(glob_mod.glob(os.path.join(directory, "*.sx"))):
        handlers.extend(load_handler_file(filepath, service_name))
    return handlers


# ---------------------------------------------------------------------------
# Handler execution
# ---------------------------------------------------------------------------

def _param_value(args: dict[str, str], param: str) -> Any:
    """Return the argument bound to *param*, or ``NIL`` when absent.

    The exact parameter name is tried first, then the same name with hyphens
    replaced by underscores.
    """
    from .types import NIL

    return args.get(param, args.get(param.replace("-", "_"), NIL))


async def execute_handler(
    handler_def: HandlerDef,
    service_name: str,
    args: dict[str, str] | None = None,
) -> str:
    """Execute a declarative handler and return SX wire format (``SxExpr``).

    Uses the async evaluator so I/O primitives (``query``, ``service``,
    ``request-arg``, etc.) are awaited inline within control flow.

    Returns ``SxExpr`` — pre-built sx source.  Callers like ``fetch_fragment``
    check ``content-type: text/sx`` and wrap the response in ``SxExpr`` when
    consuming cross-service fragments.

    1. Build env from component env + handler closure
    2. Bind handler params from args (typically request.args)
    3. Evaluate via OCaml kernel (or Python fallback)
    4. Return ``SxExpr`` wire format

    Args:
        handler_def: The handler to run.
        service_name: Owning service; forwarded to the evaluator as the
            helper service (OCaml) or used to pick page helpers (Python).
        args: Values for the handler's params; missing params bind to ``NIL``.

    The evaluator is chosen from the environment: ``SX_USE_OCAML=1`` selects
    the OCaml bridge; otherwise ``SX_USE_REF=1`` selects the reference async
    evaluator over the default one.
    """
    from .jinja_bridge import get_component_env, _get_request_context
    from .pages import get_page_helpers
    from .parser import serialize
    from .types import SxExpr

    if args is None:
        args = {}

    if os.environ.get("SX_USE_OCAML") == "1":
        from .ocaml_bridge import get_bridge

        # Serialize handler body with bound params as a let expression.
        # Constants (define) and defcomps from the handler file are available
        # in the kernel's global env (loaded by _ensure_components).
        param_bindings = [
            f"({param} {serialize(_param_value(args, param))})"
            for param in handler_def.params
        ]
        body_sx = serialize(handler_def.body)
        if param_bindings:
            sx_text = f"(let ({' '.join(param_bindings)}) {body_sx})"
        else:
            sx_text = body_sx

        bridge = await get_bridge()
        ocaml_ctx = {"_helper_service": service_name}
        result_sx = await bridge.aser(sx_text, ctx=ocaml_ctx)
        return SxExpr(result_sx or "")

    # Python fallback
    if os.environ.get("SX_USE_REF") == "1":
        from .ref.async_eval_ref import async_eval_to_sx
    else:
        from .async_eval import async_eval_to_sx

    # Later updates win: page helpers override components, the handler's
    # closure overrides both, and bound params override everything.
    env = dict(get_component_env())
    env.update(get_page_helpers(service_name))
    env.update(handler_def.closure)
    for param in handler_def.params:
        env[param] = _param_value(args, param)

    ctx = _get_request_context()
    return await async_eval_to_sx(handler_def.body, env, ctx)


# ---------------------------------------------------------------------------
# Blueprint factory
# ---------------------------------------------------------------------------

def create_handler_blueprint(service_name: str) -> Any:
    """Create a Quart Blueprint that dispatches fragment requests to both
    sx-defined handlers and Python handler functions.

    The blueprint is mounted under ``/internal/fragments`` and serves
    ``/internal/fragments/<fragment_type>``.  Requests without the fragment
    header are rejected with 403.  An unknown fragment type yields an empty
    200 ``text/sx`` response.

    Usage::

        bp = create_handler_blueprint("blog")
        bp.add_python_handler("nav-tree", my_python_handler)
        app.register_blueprint(bp)
    """
    from quart import Blueprint, Response, request
    from shared.infrastructure.fragments import FRAGMENT_HEADER

    bp = Blueprint(
        f"sx_handlers_{service_name}",
        __name__,
        url_prefix="/internal/fragments",
    )

    # Python-side handler overrides
    _python_handlers: dict[str, Callable[[], Awaitable[str]]] = {}

    @bp.before_request
    async def _require_fragment_header():
        if not request.headers.get(FRAGMENT_HEADER):
            return Response("", status=403)

    # The URL variable must be declared in the rule so that the view
    # receives ``fragment_type`` as a keyword argument.
    @bp.get("/<fragment_type>")
    async def get_fragment(fragment_type: str):
        # 1. Check Python handlers first (manual overrides)
        py_handler = _python_handlers.get(fragment_type)
        if py_handler is not None:
            result = await py_handler()
            return Response(result, status=200, content_type="text/sx")

        # 2. Check sx handler registry
        handler_def = get_handler(service_name, fragment_type)
        if handler_def is not None:
            result = await execute_handler(
                handler_def, service_name, args=dict(request.args),
            )
            return Response(result, status=200, content_type="text/sx")

        # 3. No handler found — return empty
        return Response("", status=200, content_type="text/sx")

    def add_python_handler(name: str, fn: Callable[[], Awaitable[str]]) -> None:
        """Register a Python async function as a fragment handler."""
        _python_handlers[name] = fn

    bp.add_python_handler = add_python_handler  # type: ignore[attr-defined]
    bp._python_handlers = _python_handlers  # type: ignore[attr-defined]

    return bp


# ---------------------------------------------------------------------------
# Public route registration — handlers with :path get mounted as routes
# ---------------------------------------------------------------------------

def register_route_handlers(app_or_bp: Any, service_name: str) -> int:
    """Register public routes for all handlers with :path defined.

    For each registered handler of *service_name* whose ``is_route`` is true,
    a view is mounted at the handler's ``path`` using the registration method
    named after its HTTP method (``app_or_bp.get``, ``.post``, ...).  Handlers
    whose method has no such registration function are skipped with a warning.

    Returns the number of routes registered.
    """
    from quart import request
    from shared.browser.app.csrf import csrf_exempt

    # Register SX atom converter for expression URL parameters.  Only done
    # when the target exposes ``url_map``; an already-registered ``sx``
    # converter is left untouched.
    if hasattr(app_or_bp, 'url_map'):
        app_or_bp.url_map.converters.setdefault('sx', SxAtomConverter)

    count = 0
    for name, hdef in get_all_handlers(service_name).items():
        if not hdef.is_route:
            continue

        # Bind the current handler as a default argument so each view keeps
        # its own handler rather than the loop's last value.
        async def _route_view(_h=hdef, **path_kwargs):
            from shared.sx.helpers import sx_response
            from shared.sx.primitives_io import reset_response_meta, get_response_meta

            reset_response_meta()
            # Path variables take precedence over same-named query args.
            args = dict(request.args)
            args.update(path_kwargs)
            result = await execute_handler(_h, service_name, args=args)
            resp = sx_response(result)
            # Apply any status/headers recorded during handler execution.
            meta = get_response_meta()
            if meta:
                if meta.get("status"):
                    resp.status_code = meta["status"]
                for k, v in meta.get("headers", {}).items():
                    resp.headers[k] = v
            return resp

        endpoint = f"sx_route_{name}"
        view_fn = _route_view
        if not hdef.csrf:
            view_fn = csrf_exempt(view_fn)

        route_reg = getattr(app_or_bp, hdef.method.lower(), None)
        if route_reg is None:
            logger.warning("Unsupported HTTP method %s for handler %s",
                           hdef.method, name)
            continue

        route_reg(hdef.path, endpoint=endpoint)(view_fn)
        logger.info("Registered route %s %s → handler:%s",
                    hdef.method.upper(), hdef.path, name)
        count += 1

    return count


# ---------------------------------------------------------------------------
# Direct app mount — replaces per-service fragment blueprint boilerplate
# ---------------------------------------------------------------------------

def auto_mount_fragment_handlers(app: Any, service_name: str) -> Callable:
    """Mount ``/internal/fragments/<fragment_type>`` directly on the app.

    Requests without the fragment header are rejected with 403.  An unknown
    fragment type yields an empty 200 ``text/sx`` response.

    Returns an ``add_handler(name, fn, content_type)`` function for
    registering Python handler overrides (checked before SX handlers).
    Overrides registered with ``content_type="text/html"`` are served as
    ``text/html``; all others are served as ``text/sx``.
    """
    from quart import Response, request
    from shared.infrastructure.fragments import FRAGMENT_HEADER

    python_handlers: dict[str, Callable[[], Awaitable[str]]] = {}
    html_types: set[str] = set()

    # The URL variable must be declared in the rule so that the view
    # receives ``fragment_type`` as a keyword argument.
    @app.get("/internal/fragments/<fragment_type>")
    async def _fragment_dispatch(fragment_type: str):
        if not request.headers.get(FRAGMENT_HEADER):
            return Response("", status=403)

        # Python overrides win over sx-defined handlers.
        py = python_handlers.get(fragment_type)
        if py is not None:
            result = await py()
            ct = "text/html" if fragment_type in html_types else "text/sx"
            return Response(result, status=200, content_type=ct)

        hdef = get_handler(service_name, fragment_type)
        if hdef is not None:
            result = await execute_handler(hdef, service_name, args=dict(request.args))
            return Response(result, status=200, content_type="text/sx")

        return Response("", status=200, content_type="text/sx")

    def add_handler(name: str, fn: Callable[[], Awaitable[str]], content_type: str = "text/sx") -> None:
        """Register *fn* as the Python override for fragment *name*."""
        python_handlers[name] = fn
        if content_type == "text/html":
            html_types.add(name)

    return add_handler