Add adapter-sx.sx transpilation, async wrapper, and SX_USE_REF switching

- Transpile adapter-sx.sx (aser) alongside adapter-html.sx for SX wire format
- Add platform functions: serialize, escape_string, is_special_form, is_ho_form,
  aser_special (with proper control-flow-through-aser dispatch)
- SxExpr wrapping prevents double-quoting in aser output
- async_eval_ref.py: async wrapper with I/O primitives, RequestContext,
  async_render, async_eval_to_sx, async_eval_slot_to_sx
- SX_USE_REF=1 env var switches shared.sx imports to transpiled backend
- 68 comparison tests (test_sx_ref.py), 289 total tests passing

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-03-05 22:05:35 +00:00
parent 4534fb9fee
commit 7982a07f94
6 changed files with 1872 additions and 7 deletions

View File

@@ -19,6 +19,7 @@ from typing import Any
from shared.sx.types import (
NIL, Symbol, Keyword, Lambda, Component, Macro, StyleValue,
)
from shared.sx.parser import SxExpr
# =========================================================================
@@ -105,6 +106,8 @@ def type_of(x):
return "boolean"
if isinstance(x, (int, float)):
return "number"
if isinstance(x, SxExpr):
return "sx-expr"
if isinstance(x, str):
return "string"
if isinstance(x, Symbol):
@@ -376,6 +379,228 @@ def _sx_cell_set(cells, name, val):
return val
def escape_string(s):
"""Escape a string for SX serialization."""
return (str(s)
.replace("\\", "\\\\")
.replace('"', '\\"')
.replace("\n", "\\n")
.replace("\t", "\\t")
.replace("</script", "<\\/script"))
def serialize(val):
"""Serialize an SX value to SX source text."""
t = type_of(val)
if t == "sx-expr":
return val.source
if t == "nil":
return "nil"
if t == "boolean":
return "true" if val else "false"
if t == "number":
return str(val)
if t == "string":
return '"' + escape_string(val) + '"'
if t == "symbol":
return symbol_name(val)
if t == "keyword":
return ":" + keyword_name(val)
if t == "raw-html":
escaped = escape_string(raw_html_content(val))
return '(raw! "' + escaped + '")'
if t == "style-value":
return '"' + style_value_class(val) + '"'
if t == "list":
if not val:
return "()"
items = [serialize(x) for x in val]
return "(" + " ".join(items) + ")"
if t == "dict":
items = []
for k, v in val.items():
items.append(":" + str(k))
items.append(serialize(v))
return "{" + " ".join(items) + "}"
if callable(val):
return "nil"
return str(val)
_SPECIAL_FORM_NAMES = frozenset([
"if", "when", "cond", "case", "and", "or",
"let", "let*", "lambda", "fn",
"define", "defcomp", "defmacro", "defstyle", "defkeyframes",
"defhandler", "defpage", "defquery", "defaction", "defrelation",
"begin", "do", "quote", "quasiquote",
"->", "set!",
])
_HO_FORM_NAMES = frozenset([
"map", "map-indexed", "filter", "reduce",
"some", "every?", "for-each",
])
def is_special_form(name):
return name in _SPECIAL_FORM_NAMES
def is_ho_form(name):
return name in _HO_FORM_NAMES
def aser_special(name, expr, env):
"""Evaluate a special/HO form in aser mode.
Control flow forms evaluate conditions normally but render branches
through aser (serializing tags/components instead of rendering HTML).
Definition forms evaluate for side effects and return nil.
"""
# Control flow — evaluate conditions, aser branches
args = expr[1:]
if name == "if":
cond_val = trampoline(eval_expr(args[0], env))
if sx_truthy(cond_val):
return aser(args[1], env)
return aser(args[2], env) if _b_len(args) > 2 else NIL
if name == "when":
cond_val = trampoline(eval_expr(args[0], env))
if sx_truthy(cond_val):
result = NIL
for body in args[1:]:
result = aser(body, env)
return result
return NIL
if name == "cond":
clauses = args
if clauses and isinstance(clauses[0], _b_list) and _b_len(clauses[0]) == 2:
for clause in clauses:
test = clause[0]
if isinstance(test, Symbol) and test.name in ("else", ":else"):
return aser(clause[1], env)
if isinstance(test, Keyword) and test.name == "else":
return aser(clause[1], env)
if sx_truthy(trampoline(eval_expr(test, env))):
return aser(clause[1], env)
else:
i = 0
while i < _b_len(clauses) - 1:
test = clauses[i]
result = clauses[i + 1]
if isinstance(test, Keyword) and test.name == "else":
return aser(result, env)
if isinstance(test, Symbol) and test.name in (":else", "else"):
return aser(result, env)
if sx_truthy(trampoline(eval_expr(test, env))):
return aser(result, env)
i += 2
return NIL
if name == "case":
match_val = trampoline(eval_expr(args[0], env))
clauses = args[1:]
i = 0
while i < _b_len(clauses) - 1:
test = clauses[i]
result = clauses[i + 1]
if isinstance(test, Keyword) and test.name == "else":
return aser(result, env)
if isinstance(test, Symbol) and test.name in (":else", "else"):
return aser(result, env)
if match_val == trampoline(eval_expr(test, env)):
return aser(result, env)
i += 2
return NIL
if name in ("let", "let*"):
bindings = args[0]
local = _b_dict(env)
if isinstance(bindings, _b_list):
if bindings and isinstance(bindings[0], _b_list):
for b in bindings:
var = b[0]
vname = var.name if isinstance(var, Symbol) else var
local[vname] = trampoline(eval_expr(b[1], local))
else:
for i in _b_range(0, _b_len(bindings), 2):
var = bindings[i]
vname = var.name if isinstance(var, Symbol) else var
local[vname] = trampoline(eval_expr(bindings[i + 1], local))
result = NIL
for body in args[1:]:
result = aser(body, local)
return result
if name in ("begin", "do"):
result = NIL
for body in args:
result = aser(body, env)
return result
if name == "and":
result = True
for arg in args:
result = trampoline(eval_expr(arg, env))
if not sx_truthy(result):
return result
return result
if name == "or":
result = False
for arg in args:
result = trampoline(eval_expr(arg, env))
if sx_truthy(result):
return result
return result
# HO forms in aser mode — map/for-each render through aser
if name == "map":
fn = trampoline(eval_expr(args[0], env))
coll = trampoline(eval_expr(args[1], env))
results = []
for item in coll:
if isinstance(fn, Lambda):
local = _b_dict(fn.closure)
local.update(env)
local[fn.params[0]] = item
results.append(aser(fn.body, local))
elif callable(fn):
results.append(fn(item))
else:
raise EvalError("map requires callable")
return results
if name == "map-indexed":
fn = trampoline(eval_expr(args[0], env))
coll = trampoline(eval_expr(args[1], env))
results = []
for i, item in enumerate(coll):
if isinstance(fn, Lambda):
local = _b_dict(fn.closure)
local.update(env)
local[fn.params[0]] = i
local[fn.params[1]] = item
results.append(aser(fn.body, local))
elif callable(fn):
results.append(fn(i, item))
else:
raise EvalError("map-indexed requires callable")
return results
if name == "for-each":
fn = trampoline(eval_expr(args[0], env))
coll = trampoline(eval_expr(args[1], env))
results = []
for item in coll:
if isinstance(fn, Lambda):
local = _b_dict(fn.closure)
local.update(env)
local[fn.params[0]] = item
results.append(aser(fn.body, local))
elif callable(fn):
fn(item)
return results if results else NIL
# Definition forms — evaluate for side effects
if name in ("define", "defcomp", "defmacro", "defstyle", "defkeyframes",
"defhandler", "defpage", "defquery", "defaction", "defrelation"):
trampoline(eval_expr(expr, env))
return NIL
# Lambda/fn, quote, quasiquote, set!, -> : evaluate normally
result = eval_expr(expr, env)
return trampoline(result)
# =========================================================================
# Primitives
# =========================================================================
@@ -471,7 +696,7 @@ PRIMITIVES["zero?"] = lambda n: n == 0
# Collections
PRIMITIVES["list"] = lambda *args: _b_list(args)
PRIMITIVES["dict"] = lambda *args: {args[i]: args[i+1] for i in _b_range(0, _b_len(args)-1, 2)}
PRIMITIVES["range"] = lambda a, b, step=1: _b_list(_b_range(a, b, step))
PRIMITIVES["range"] = lambda a, b, step=1: _b_list(_b_range(_b_int(a), _b_int(b), _b_int(step)))
PRIMITIVES["get"] = lambda c, k, default=NIL: c.get(k, default) if isinstance(c, _b_dict) else (c[k] if isinstance(c, (_b_list, str)) and isinstance(k, _b_int) and 0 <= k < _b_len(c) else default)
PRIMITIVES["len"] = lambda c: _b_len(c) if c is not None and c is not NIL else 0
PRIMITIVES["first"] = lambda c: c[0] if c and _b_len(c) > 0 else NIL
@@ -782,6 +1007,24 @@ render_html_component = lambda comp, args, env: (lambda kwargs: (lambda children
render_html_element = lambda tag, args, env: (lambda parsed: (lambda attrs: (lambda children: (lambda is_void: sx_str('<', tag, render_attrs(attrs), (' />' if sx_truthy(is_void) else sx_str('>', join('', map(lambda c: render_to_html(c, env), children)), '</', tag, '>'))))(contains_p(VOID_ELEMENTS, tag)))(nth(parsed, 1)))(first(parsed)))(parse_element_args(args, env))
# === Transpiled from adapter-sx ===
# render-to-sx
render_to_sx = lambda expr, env: (lambda result: (result if sx_truthy((type_of(result) == 'string')) else serialize(result)))(aser(expr, env))
# aser
aser = lambda expr, env: _sx_case(type_of(expr), [('number', lambda: expr), ('string', lambda: expr), ('boolean', lambda: expr), ('nil', lambda: NIL), ('symbol', lambda: (lambda name: (env_get(env, name) if sx_truthy(env_has(env, name)) else (get_primitive(name) if sx_truthy(is_primitive(name)) else (True if sx_truthy((name == 'true')) else (False if sx_truthy((name == 'false')) else (NIL if sx_truthy((name == 'nil')) else error(sx_str('Undefined symbol: ', name))))))))(symbol_name(expr))), ('keyword', lambda: keyword_name(expr)), ('list', lambda: ([] if sx_truthy(empty_p(expr)) else aser_list(expr, env))), (None, lambda: expr)])
# aser-list
aser_list = lambda expr, env: (lambda head: (lambda args: (map(lambda x: aser(x, env), expr) if sx_truthy((not sx_truthy((type_of(head) == 'symbol')))) else (lambda name: (aser_fragment(args, env) if sx_truthy((name == '<>')) else (aser_call(name, args, env) if sx_truthy(starts_with_p(name, '~')) else (aser_call(name, args, env) if sx_truthy(contains_p(HTML_TAGS, name)) else (aser_special(name, expr, env) if sx_truthy((is_special_form(name) if sx_truthy(is_special_form(name)) else is_ho_form(name))) else (aser(expand_macro(env_get(env, name), args, env), env) if sx_truthy((env_has(env, name) if not sx_truthy(env_has(env, name)) else is_macro(env_get(env, name)))) else (lambda f: (lambda evaled_args: (apply(f, evaled_args) if sx_truthy((is_callable(f) if not sx_truthy(is_callable(f)) else ((not sx_truthy(is_lambda(f))) if not sx_truthy((not sx_truthy(is_lambda(f)))) else (not sx_truthy(is_component(f)))))) else (trampoline(call_lambda(f, evaled_args, env)) if sx_truthy(is_lambda(f)) else (aser_call(sx_str('~', component_name(f)), args, env) if sx_truthy(is_component(f)) else error(sx_str('Not callable: ', inspect(f)))))))(map(lambda a: trampoline(eval_expr(a, env)), args)))(trampoline(eval_expr(head, env)))))))))(symbol_name(head))))(rest(expr)))(first(expr))
# aser-fragment
aser_fragment = lambda children, env: (lambda parts: ('' if sx_truthy(empty_p(parts)) else sx_str('(<> ', join(' ', map(serialize, parts)), ')')))(filter(lambda x: (not sx_truthy(is_nil(x))), map(lambda c: aser(c, env), children)))
# aser-call
aser_call = lambda name, args, env: (lambda parts: _sx_begin(reduce(lambda state, arg: (lambda skip: (assoc(state, 'skip', False, 'i', (get(state, 'i') + 1)) if sx_truthy(skip) else ((lambda val: _sx_begin((_sx_begin(_sx_append(parts, sx_str(':', keyword_name(arg))), _sx_append(parts, serialize(val))) if sx_truthy((not sx_truthy(is_nil(val)))) else NIL), assoc(state, 'skip', True, 'i', (get(state, 'i') + 1))))(aser(nth(args, (get(state, 'i') + 1)), env)) if sx_truthy(((type_of(arg) == 'keyword') if not sx_truthy((type_of(arg) == 'keyword')) else ((get(state, 'i') + 1) < len(args)))) else (lambda val: _sx_begin((_sx_append(parts, serialize(val)) if sx_truthy((not sx_truthy(is_nil(val)))) else NIL), assoc(state, 'i', (get(state, 'i') + 1))))(aser(arg, env)))))(get(state, 'skip')), {'i': 0, 'skip': False}, args), sx_str('(', join(' ', parts), ')')))([name])
# =========================================================================
# Fixups -- wire up render adapter dispatch
# =========================================================================
@@ -795,10 +1038,32 @@ def _setup_sx_adapter():
_render_expr_fn = lambda expr, env: aser_list(expr, env)
# Wrap aser_call and aser_fragment to return SxExpr
# so serialize() won't double-quote them
_orig_aser_call = None
_orig_aser_fragment = None
def _wrap_aser_outputs():
global aser_call, aser_fragment, _orig_aser_call, _orig_aser_fragment
_orig_aser_call = aser_call
_orig_aser_fragment = aser_fragment
def _aser_call_wrapped(name, args, env):
result = _orig_aser_call(name, args, env)
return SxExpr(result) if isinstance(result, str) else result
def _aser_fragment_wrapped(children, env):
result = _orig_aser_fragment(children, env)
return SxExpr(result) if isinstance(result, str) else result
aser_call = _aser_call_wrapped
aser_fragment = _aser_fragment_wrapped
# =========================================================================
# Public API
# =========================================================================
# Wrap aser outputs to return SxExpr
_wrap_aser_outputs()
# Set HTML as default adapter
_setup_html_adapter()