Files
rose-ash/shared/sx/primitives_io.py
giles f77d7350dd Refactor SX primitives: modular, isomorphic, general-purpose
Spec modularization:
- Add (define-module :name) markers to primitives.sx creating 11 modules
  (7 core, 4 stdlib). Bootstrappers can now selectively include modules.
- Add parse_primitives_by_module() to boundary_parser.py.
- Remove split-ids primitive; inline at 4 call sites in blog/market queries.

Python file split:
- primitives.py: slimmed to registry + core primitives only (~350 lines)
- primitives_stdlib.py: NEW — stdlib primitives (format, text, style, debug)
- primitives_ctx.py: NEW — extracted 12 page context builders from IO
- primitives_io.py: add register_io_handler decorator, auto-derive
  IO_PRIMITIVES from registry, move sync IO bridges here

JS parity fixes:
- = uses === (strict equality), != uses !==
- round supports optional ndigits parameter
- concat uses nil-check not falsy-check (preserves 0, "", false)
- escape adds single quote entity (&#39;) matching Python/markupsafe
- assert added (was missing from JS entirely)

Bootstrapper modularization:
- PRIMITIVES_JS_MODULES / PRIMITIVES_PY_MODULES dicts keyed by module
- --modules CLI flag for selective inclusion (core.* always included)
- Regenerated sx-ref.js and sx_ref.py with all fixes

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-06 01:45:29 +00:00

503 lines
16 KiB
Python

"""
Async I/O primitives for the s-expression resolver.
These wrap rose-ash's inter-service communication layer so that s-expressions
can fetch fragments, query data, call actions, and access request context.
Unlike pure primitives (primitives.py), these are **async** and are executed
by the resolver rather than the evaluator. They are identified by name
during the tree-walk phase and dispatched via ``asyncio.gather()``.
Usage in s-expressions::

    (frag "blog" "link-card" :slug "apple")
    (query "market" "products-by-ids" :ids "1,2,3")
    (action "market" "create-marketplace" :name "Farm Shop" :slug "farm")
    (current-user)
    (htmx-request?)
"""
from __future__ import annotations
import contextvars
from typing import Any
# ---------------------------------------------------------------------------
# Registry of async IO handlers (name → coroutine)
# ---------------------------------------------------------------------------
# Maps each I/O primitive name to the function that implements it.
_IO_HANDLERS: dict[str, Any] = {}


def register_io_handler(name: str):
    """Build a decorator that files a function under *name* in ``_IO_HANDLERS``.

    The decorated function is stored as-is and handed back unchanged, so it
    stays callable under its own Python name.  Registering a second function
    under the same *name* replaces the earlier entry.
    """
    def _register(handler):
        _IO_HANDLERS.update({name: handler})
        return handler

    return _register
# ---------------------------------------------------------------------------
# Request context (set per-request by the resolver)
# ---------------------------------------------------------------------------
# Holds the domain service bound for the current context; ``None`` when unset.
_handler_service: contextvars.ContextVar[Any] = contextvars.ContextVar(
    "_handler_service",
    default=None,
)


def set_handler_service(service_obj: Any) -> None:
    """Store *service_obj* as the service used by ``(service ...)`` calls.

    The value is written to the ``_handler_service`` context variable; the
    token returned by ``ContextVar.set`` is discarded.
    """
    _ = _handler_service.set(service_obj)


def get_handler_service() -> Any:
    """Return the service stored in ``_handler_service``, or ``None`` if unset."""
    bound = _handler_service.get(None)
    return bound
class RequestContext:
    """Per-request context handed to every I/O primitive handler.

    Attributes are restricted to the three slots below:

    * ``user``    -- user dict for the request, or ``None``.
    * ``is_htmx`` -- boolean flag supplied by the caller.
    * ``extras``  -- free-form dict of additional per-request values.
    """

    __slots__ = ("user", "is_htmx", "extras")

    def __init__(
        self,
        user: dict[str, Any] | None = None,
        is_htmx: bool = False,
        extras: dict[str, Any] | None = None,
    ):
        self.user = user
        self.is_htmx = is_htmx
        # Substitute a fresh dict only when nothing was supplied.  Testing
        # truthiness (``extras or {}``) would also discard a caller's *empty*
        # dict, so anything the caller added to it later would never be
        # visible through this context.
        self.extras = {} if extras is None else extras
# ---------------------------------------------------------------------------
# I/O dispatch
# ---------------------------------------------------------------------------
async def execute_io(
    name: str,
    args: list[Any],
    kwargs: dict[str, Any],
    ctx: RequestContext,
) -> Any:
    """Look up the handler registered for *name* and await it.

    The handler is called as ``handler(args, kwargs, ctx)`` and its awaited
    result is returned unchanged.

    Raises:
        RuntimeError: if no handler is registered under *name*.
    """
    coro_fn = _IO_HANDLERS.get(name)
    if coro_fn is not None:
        return await coro_fn(args, kwargs, ctx)
    raise RuntimeError(f"Unknown I/O primitive: {name}")
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _clean_kwargs(kwargs: dict[str, Any]) -> dict[str, Any]:
    """Return a copy of *kwargs* without entries whose value is ``None`` or ``NIL``.

    Values are compared by identity, so falsy values such as ``0``, ``""`` and
    ``False`` are kept.
    """
    from .types import NIL

    kept: dict[str, Any] = {}
    for key, value in kwargs.items():
        if value is None or value is NIL:
            continue
        kept[key] = value
    return kept
def _dto_to_dict(obj: Any) -> dict[str, Any]:
    """Turn a DTO-like object into a plain dict.

    * Dataclass instances are delegated to ``shared.contracts.dtos.dto_to_dict``
      and returned exactly as that helper produces them.
    * Objects exposing ``_asdict`` (namedtuples) use that mapping.
    * Other objects with a ``__dict__`` contribute their attributes whose
      names do not start with an underscore.
    * Anything else is wrapped as ``{"value": obj}``.

    For the namedtuple and ``__dict__`` cases, every value that has both a
    ``year`` attribute and a ``strftime`` attribute also gets three derived
    entries: ``<key>_year``, ``<key>_month`` and ``<key>_day``.
    """
    if hasattr(obj, "__dataclass_fields__"):
        from shared.contracts.dtos import dto_to_dict
        return dto_to_dict(obj)

    if hasattr(obj, "_asdict"):
        fields = dict(obj._asdict())
    elif hasattr(obj, "__dict__"):
        fields = {
            attr: value
            for attr, value in obj.__dict__.items()
            if not attr.startswith("_")
        }
    else:
        return {"value": obj}

    # Collect derived date parts separately, then merge, so the dict being
    # scanned is never modified during the scan.
    date_parts: dict[str, Any] = {}
    for attr, value in fields.items():
        if hasattr(value, "year") and hasattr(value, "strftime"):
            date_parts[f"{attr}_year"] = value.year
            date_parts[f"{attr}_month"] = value.month
            date_parts[f"{attr}_day"] = value.day
    fields.update(date_parts)
    return fields
def _convert_result(result: Any) -> Any:
    """Convert a service method result into values suitable for sx code.

    Conversion rules, applied in order:

    1. ``None`` becomes ``NIL``.
    2. ``int``, ``float``, ``str`` and ``bool`` pass through unchanged.
    3. Objects with a callable ``isoformat`` are replaced by its result.
    4. Dicts are rebuilt with every value converted recursively.
    5. Dataclass instances and objects exposing ``_asdict`` (namedtuples) go
       through ``_dto_to_dict``.
    6. Tuples and lists become lists with every item converted recursively.
    7. Anything else is returned unchanged.
    """
    if result is None:
        from .types import NIL
        return NIL
    if isinstance(result, (int, float, str, bool)):
        return result
    if hasattr(result, "isoformat") and callable(result.isoformat):
        return result.isoformat()
    if isinstance(result, dict):
        return {k: _convert_result(v) for k, v in result.items()}
    # This check must come before the sequence branch: a namedtuple is a
    # tuple subclass, so testing ``isinstance(result, tuple)`` first would
    # flatten it to a list and drop its field names.  Checking here also means
    # a DTO is converted the same way at top level as inside a list.
    if hasattr(result, "__dataclass_fields__") or hasattr(result, "_asdict"):
        return _dto_to_dict(result)
    if isinstance(result, (tuple, list)):
        return [_convert_result(item) for item in result]
    return result
# ---------------------------------------------------------------------------
# Generic IO handlers
# ---------------------------------------------------------------------------
@register_io_handler("frag")
async def _io_frag(
    args: list[Any], kwargs: dict[str, Any], ctx: RequestContext
) -> str:
    """Handle ``(frag "service" "type" :key val ...)``.

    The first two positional arguments are stringified and passed to
    ``fetch_fragment`` as the service and fragment type.  Keyword arguments
    are filtered through ``_clean_kwargs`` and sent as ``params``; ``None``
    is sent instead when nothing remains after filtering.

    Raises:
        ValueError: if fewer than two positional arguments are given.
    """
    if len(args) < 2:
        raise ValueError("frag requires service and fragment type")
    service_name, fragment_kind = str(args[0]), str(args[1])
    cleaned = _clean_kwargs(kwargs)
    from shared.infrastructure.fragments import fetch_fragment
    return await fetch_fragment(
        service_name,
        fragment_kind,
        params=cleaned if cleaned else None,
    )
@register_io_handler("query")
async def _io_query(
    args: list[Any], kwargs: dict[str, Any], ctx: RequestContext
) -> Any:
    """Handle ``(query "service" "query-name" :key val ...)``.

    Stringifies the service and query name, filters the keyword arguments
    with ``_clean_kwargs`` and awaits ``fetch_data``.  ``params`` is ``None``
    when no keyword arguments survive the filtering.

    Raises:
        ValueError: if fewer than two positional arguments are given.
    """
    if len(args) < 2:
        raise ValueError("query requires service and query name")
    target = str(args[0])
    query_id = str(args[1])
    filtered = _clean_kwargs(kwargs)
    from shared.infrastructure.data_client import fetch_data
    if not filtered:
        filtered = None
    return await fetch_data(target, query_id, params=filtered)
@register_io_handler("action")
async def _io_action(
    args: list[Any], kwargs: dict[str, Any], ctx: RequestContext
) -> Any:
    """Handle ``(action "service" "action-name" :key val ...)``.

    Awaits ``call_action`` with the stringified service and action name.  The
    keyword arguments, filtered by ``_clean_kwargs``, form the ``payload``;
    an empty payload is sent as ``None``.

    Raises:
        ValueError: if fewer than two positional arguments are given.
    """
    if len(args) < 2:
        raise ValueError("action requires service and action name")
    target, action_id = (str(part) for part in args[:2])
    body = _clean_kwargs(kwargs)
    from shared.infrastructure.actions import call_action
    return await call_action(target, action_id, payload=body or None)
@register_io_handler("current-user")
async def _io_current_user(
    args: list[Any], kwargs: dict[str, Any], ctx: RequestContext
) -> dict[str, Any] | None:
    """Handle ``(current-user)`` by returning ``ctx.user`` unchanged.

    Positional and keyword arguments are ignored.
    """
    current = ctx.user
    return current
@register_io_handler("htmx-request?")
async def _io_htmx_request(
    args: list[Any], kwargs: dict[str, Any], ctx: RequestContext
) -> bool:
    """Handle ``(htmx-request?)`` by returning the context's ``is_htmx`` flag.

    Positional and keyword arguments are ignored.
    """
    flag = ctx.is_htmx
    return flag
@register_io_handler("service")
async def _io_service(
    args: list[Any], kwargs: dict[str, Any], ctx: RequestContext
) -> Any:
    """Handle ``(service "svc-name" "method-name" :key val ...)``.

    With two or more positional arguments, the first names an attribute of
    the shared service registry and the second names the method.  With a
    single argument, the method is looked up on the service returned by
    ``get_handler_service()``.  Hyphens in service, method and keyword names
    are replaced by underscores, and ``NIL`` keyword values become ``None``.

    The method is awaited as ``method(g.s, **kwargs)`` and its result is
    passed through ``_convert_result``.

    Raises:
        ValueError: if no positional argument is given.
        RuntimeError: if the service cannot be resolved or lacks the method.
    """
    if not args:
        raise ValueError("service requires at least a method name")

    if len(args) < 2:
        # Single-argument form: use the service bound for this context.
        target = get_handler_service()
        if target is None:
            raise RuntimeError(
                "No handler service bound — cannot call (service ...)")
        raw_method = args[0]
    else:
        from shared.services.registry import services as svc_registry
        registry_attr = str(args[0]).replace("-", "_")
        target = getattr(svc_registry, registry_attr, None)
        if target is None:
            raise RuntimeError(f"No service registered as: {registry_attr}")
        raw_method = args[1]

    method_name = str(raw_method).replace("-", "_")
    bound_method = getattr(target, method_name, None)
    if bound_method is None:
        raise RuntimeError(f"Service has no method: {method_name}")

    from .types import NIL
    call_kwargs = {}
    for key, value in kwargs.items():
        call_kwargs[key.replace("-", "_")] = None if value is NIL else value

    from quart import g
    outcome = await bound_method(g.s, **call_kwargs)
    return _convert_result(outcome)
@register_io_handler("request-arg")
async def _io_request_arg(
    args: list[Any], kwargs: dict[str, Any], ctx: RequestContext
) -> Any:
    """Handle ``(request-arg "name" default?)``.

    Returns ``request.args.get(name, default)`` where *default* is the second
    positional argument, or ``None`` when it is omitted.

    Raises:
        ValueError: if no name is given.
    """
    if not args:
        raise ValueError("request-arg requires a name")
    from quart import request
    arg_name = str(args[0])
    fallback = None
    if len(args) > 1:
        fallback = args[1]
    return request.args.get(arg_name, fallback)
@register_io_handler("request-path")
async def _io_request_path(
    args: list[Any], kwargs: dict[str, Any], ctx: RequestContext
) -> str:
    """Handle ``(request-path)`` by returning ``request.path`` from Quart.

    All arguments are ignored.
    """
    from quart import request as current_request
    return current_request.path
@register_io_handler("nav-tree")
async def _io_nav_tree(
    args: list[Any], kwargs: dict[str, Any], ctx: RequestContext
) -> list[dict[str, Any]]:
    """Handle ``(nav-tree)``.

    Awaits ``get_navigation_tree(g.s)`` and converts every returned node with
    ``_dto_to_dict``.  All arguments are ignored.
    """
    from quart import g
    from shared.services.navigation import get_navigation_tree
    tree = await get_navigation_tree(g.s)
    converted = []
    for entry in tree:
        converted.append(_dto_to_dict(entry))
    return converted
@register_io_handler("get-children")
async def _io_get_children(
    args: list[Any], kwargs: dict[str, Any], ctx: RequestContext
) -> list[dict[str, Any]]:
    """Handle ``(get-children :parent-type "page" :parent-id 1 ...)``.

    Keyword names have hyphens replaced by underscores and are forwarded,
    values untouched, to ``get_children(g.s, **kwargs)``.  Each returned child
    is converted with ``_dto_to_dict``.  Positional arguments are ignored.
    """
    from quart import g
    from shared.services.relationships import get_children
    py_kwargs = {}
    for key, value in kwargs.items():
        py_kwargs[key.replace("-", "_")] = value
    found = await get_children(g.s, **py_kwargs)
    return list(map(_dto_to_dict, found))
@register_io_handler("g")
async def _io_g(
    args: list[Any], kwargs: dict[str, Any], ctx: RequestContext
) -> Any:
    """Handle ``(g "key")`` by reading an attribute from Quart's ``g``.

    Hyphens in the key are replaced by underscores.  A missing attribute, or
    a call without arguments (which looks up the empty name), yields ``None``.
    """
    from quart import g
    if args:
        attr_name = str(args[0]).replace("-", "_")
    else:
        attr_name = ""
    return getattr(g, attr_name, None)
@register_io_handler("csrf-token")
async def _io_csrf_token(
    args: list[Any], kwargs: dict[str, Any], ctx: RequestContext
) -> str:
    """Handle ``(csrf-token)``.

    Looks up ``csrf_token`` in the current app's Jinja globals.  If the entry
    is callable its return value is the result; otherwise the result is the
    empty string.  All arguments are ignored.
    """
    from quart import current_app
    token_factory = current_app.jinja_env.globals.get("csrf_token")
    return token_factory() if callable(token_factory) else ""
@register_io_handler("abort")
async def _io_abort(
    args: list[Any], kwargs: dict[str, Any], ctx: RequestContext
) -> Any:
    """Handle ``(abort 403 "message")`` by calling Quart's ``abort``.

    The first argument is converted with ``int`` and the optional second one
    with ``str`` (empty string when omitted).

    Raises:
        ValueError: if no status code is given.
    """
    if not args:
        raise ValueError("abort requires a status code")
    from quart import abort
    status_code = int(args[0])
    detail = ""
    if len(args) > 1:
        detail = str(args[1])
    abort(status_code, detail)
@register_io_handler("url-for")
async def _io_url_for(
    args: list[Any], kwargs: dict[str, Any], ctx: RequestContext
) -> str:
    """Handle ``(url-for "endpoint" :key val ...)``.

    Keyword arguments are filtered through ``_clean_kwargs``, their names get
    hyphens replaced by underscores, and string values made up solely of
    decimal digits are converted to ``int`` before everything is passed to
    Quart's ``url_for(endpoint, **kwargs)``.

    Raises:
        ValueError: if no endpoint name is given.
    """
    if not args:
        raise ValueError("url-for requires an endpoint name")
    from quart import url_for
    endpoint = str(args[0])
    params: dict[str, Any] = {}
    for key, value in _clean_kwargs(kwargs).items():
        # ``isdecimal`` rather than ``isdigit``: the latter also accepts
        # characters such as superscript digits ("²"), which ``int()``
        # rejects with ValueError.  Every string ``int()`` could convert
        # before is still converted.
        if isinstance(value, str) and value.isdecimal():
            value = int(value)
        params[key.replace("-", "_")] = value
    return url_for(endpoint, **params)
@register_io_handler("route-prefix")
async def _io_route_prefix(
    args: list[Any], kwargs: dict[str, Any], ctx: RequestContext
) -> str:
    """Handle ``(route-prefix)`` by returning ``shared.utils.route_prefix()``.

    All arguments are ignored.
    """
    from shared.utils import route_prefix as _current_prefix
    return _current_prefix()
@register_io_handler("request-view-args")
async def _io_request_view_args(
    args: list[Any], kwargs: dict[str, Any], ctx: RequestContext
) -> Any:
    """Handle ``(request-view-args "key")``.

    Returns the entry for *key* in ``request.view_args``, or ``None`` when the
    key is absent or ``view_args`` itself is empty or ``None``.

    Raises:
        ValueError: if no key is given.
    """
    if not args:
        raise ValueError("request-view-args requires a key")
    from quart import request
    wanted = str(args[0])
    view_args = request.view_args or {}
    return view_args.get(wanted)
@register_io_handler("app-url")
async def _io_app_url(
    args: list[Any], kwargs: dict[str, Any], ctx: RequestContext
) -> str:
    """Handle ``(app-url "blog" "/my-post/")``.

    Calls ``app_url(service, path)`` with both values stringified; the path
    defaults to ``"/"`` when only the service is given.

    Raises:
        ValueError: if no service name is given.
    """
    if not args:
        raise ValueError("app-url requires a service name")
    from shared.infrastructure.urls import app_url
    target = str(args[0])
    target_path = "/"
    if len(args) > 1:
        target_path = str(args[1])
    return app_url(target, target_path)
@register_io_handler("asset-url")
async def _io_asset_url(
    args: list[Any], kwargs: dict[str, Any], ctx: RequestContext
) -> str:
    """Handle ``(asset-url "/img/logo.png")``.

    Passes the stringified first argument, or the empty string when there is
    none, to ``asset_url`` and returns its result.
    """
    from shared.infrastructure.urls import asset_url
    if args:
        asset_path = str(args[0])
    else:
        asset_path = ""
    return asset_url(asset_path)
@register_io_handler("config")
async def _io_config(
    args: list[Any], kwargs: dict[str, Any], ctx: RequestContext
) -> Any:
    """Handle ``(config "key")``.

    Returns ``config().get(key)`` from ``shared.config``, with the key
    stringified first.

    Raises:
        ValueError: if no key is given.
    """
    if not args:
        raise ValueError("config requires a key")
    from shared.config import config
    return config().get(str(args[0]))
@register_io_handler("jinja-global")
async def _io_jinja_global(
    args: list[Any], kwargs: dict[str, Any], ctx: RequestContext
) -> Any:
    """Handle ``(jinja-global "key" default?)``.

    Reads *key* from the current app's Jinja globals, falling back to the
    optional second positional argument (``None`` when omitted).

    Raises:
        ValueError: if no key is given.
    """
    if not args:
        raise ValueError("jinja-global requires a key")
    from quart import current_app
    global_name = str(args[0])
    fallback = None
    if len(args) > 1:
        fallback = args[1]
    jinja_globals = current_app.jinja_env.globals
    return jinja_globals.get(global_name, fallback)
@register_io_handler("relations-from")
async def _io_relations_from(
    args: list[Any], kwargs: dict[str, Any], ctx: RequestContext
) -> list[dict]:
    """Handle ``(relations-from "page")``.

    Calls ``relations_from`` with the stringified entity type and returns one
    dict per definition, copying the seven attributes listed below.

    Raises:
        ValueError: if no entity type is given.
    """
    if not args:
        raise ValueError("relations-from requires an entity type")
    from shared.sx.relations import relations_from
    exported = (
        "name", "from_type", "to_type",
        "cardinality", "nav",
        "nav_icon", "nav_label",
    )
    rows = []
    for definition in relations_from(str(args[0])):
        rows.append({attr: getattr(definition, attr) for attr in exported})
    return rows
# ---------------------------------------------------------------------------
# Import context handlers (registers into _IO_HANDLERS via decorator)
# ---------------------------------------------------------------------------
from . import primitives_ctx # noqa: E402, F401
# ---------------------------------------------------------------------------
# Auto-derive IO_PRIMITIVES from registered handlers
# ---------------------------------------------------------------------------
# Snapshot of the handler names registered at the moment this line runs;
# handlers added to _IO_HANDLERS afterwards are not reflected here.
IO_PRIMITIVES: frozenset[str] = frozenset(_IO_HANDLERS)
# ---------------------------------------------------------------------------
# Sync IO bridge primitives
#
# These are declared in boundary.sx (I/O tier), NOT primitives.sx.
# They must be evaluator-visible because they're called inline in .sx
# code (inside let, filter, etc.) where the async IO interceptor can't
# reach them — particularly in sx_ref.py which only intercepts IO at
# the top level.
# ---------------------------------------------------------------------------
from .primitives import _PRIMITIVES # noqa: E402
def _bridge_app_url(service, *path_parts):
    """Synchronous ``app-url``: return ``app_url(service, path)``.

    Only the first element of *path_parts* is used; the path is ``"/"`` when
    none is given.  Both values are stringified.
    """
    from shared.infrastructure.urls import app_url
    if path_parts:
        target_path = str(path_parts[0])
    else:
        target_path = "/"
    return app_url(str(service), target_path)
def _bridge_asset_url(*path_parts):
    """Synchronous ``asset-url``: return ``asset_url(path)``.

    Only the first element of *path_parts* is used (stringified); the path is
    the empty string when none is given.
    """
    from shared.infrastructure.urls import asset_url
    if path_parts:
        asset_path = str(path_parts[0])
    else:
        asset_path = ""
    return asset_url(asset_path)
def _bridge_config(key):
    """Synchronous ``config``: return ``config().get(str(key))``."""
    from shared.config import config
    return config().get(str(key))
def _bridge_jinja_global(key, *default):
    """Synchronous ``jinja-global``: read *key* from the app's Jinja globals.

    The first extra positional argument, if any, is the fallback value;
    otherwise the fallback is ``None``.
    """
    from quart import current_app
    fallback = None
    if default:
        fallback = default[0]
    jinja_globals = current_app.jinja_env.globals
    return jinja_globals.get(str(key), fallback)
def _bridge_relations_from(entity_type):
    """Synchronous ``relations-from``: list relation definitions as dicts.

    Calls ``relations_from(str(entity_type))`` and copies the seven attributes
    listed below from each returned definition into a plain dict.
    """
    from shared.sx.relations import relations_from
    exported = (
        "name", "from_type", "to_type",
        "cardinality", "nav",
        "nav_icon", "nav_label",
    )
    rows = []
    for definition in relations_from(str(entity_type)):
        rows.append({attr: getattr(definition, attr) for attr in exported})
    return rows
# Publish the synchronous bridges in the evaluator's primitive table,
# replacing any entries already present under these names.
_PRIMITIVES.update({
    "app-url": _bridge_app_url,
    "asset-url": _bridge_asset_url,
    "config": _bridge_config,
    "jinja-global": _bridge_jinja_global,
    "relations-from": _bridge_relations_from,
})
# ---------------------------------------------------------------------------
# Validate all IO handlers against boundary.sx
# ---------------------------------------------------------------------------
def _validate_io_handlers() -> None:
    """Pass the name of every registered I/O handler to ``validate_io``.

    ``validate_io`` comes from the sibling ``boundary`` module; whatever it
    raises propagates to the caller.
    """
    from .boundary import validate_io as _check
    for handler_name in _IO_HANDLERS:
        _check(handler_name)


# Runs once, at import time.
_validate_io_handlers()