Merge branch 'worktree-bootstrap-py' into macros

Bootstrap Python transpiler: reads .sx spec files and emits standalone
Python evaluator (sx_ref.py) with both HTML and SX wire format adapters.
Includes async wrapper and SX_USE_REF=1 switching mechanism.
This commit is contained in:
2026-03-05 22:06:18 +00:00
6 changed files with 4220 additions and 5 deletions

View File

@@ -31,11 +31,21 @@ from .parser import (
parse_all,
serialize,
)
from .evaluator import (
EvalError,
evaluate,
make_env,
)
import os as _os
if _os.environ.get("SX_USE_REF") == "1":
from .ref.sx_ref import (
EvalError,
evaluate,
make_env,
)
else:
from .evaluator import (
EvalError,
evaluate,
make_env,
)
from .primitives import (
all_primitives,
get_primitive,

View File

@@ -0,0 +1,7 @@
"""Reference SX evaluator — transpiled from canonical .sx spec files.
This package provides the bootstrap-compiled evaluator as an alternative
backend to the hand-written evaluator.py / html.py / async_eval.py.
Enable by setting SX_USE_REF=1 environment variable.
"""

View File

@@ -0,0 +1,999 @@
"""Async evaluation wrapper for the transpiled reference evaluator.
Wraps the sync sx_ref.py evaluator with async I/O support, mirroring
the hand-written async_eval.py. Provides the same public API:
async_eval() — evaluate with I/O primitives
async_render() — render to HTML with I/O
async_eval_to_sx() — evaluate to SX wire format with I/O
async_eval_slot_to_sx() — expand components server-side, then serialize
The sync transpiled evaluator handles all control flow, special forms,
and lambda/component dispatch. This wrapper adds:
- RequestContext threading
- I/O primitive interception (query, service, request-arg, etc.)
- Async trampoline for thunks
- SxExpr wrapping for wire format output
DO NOT EDIT by hand — this is a thin wrapper; the actual eval logic
lives in sx_ref.py (generated) and the I/O primitives in primitives_io.py.
"""
from __future__ import annotations
import contextvars
import inspect
from typing import Any
from ..types import Component, Keyword, Lambda, Macro, NIL, StyleValue, Symbol
from ..parser import SxExpr, serialize
from ..primitives_io import IO_PRIMITIVES, RequestContext, execute_io
from ..html import (
HTML_TAGS, VOID_ELEMENTS, BOOLEAN_ATTRS,
escape_text, escape_attr, _RawHTML, css_class_collector, _svg_context,
)
from . import sx_ref
# Re-export EvalError from sx_ref
EvalError = sx_ref.EvalError
# When True, _aser expands known components server-side
_expand_components: contextvars.ContextVar[bool] = contextvars.ContextVar(
"_expand_components_ref", default=False
)
# ---------------------------------------------------------------------------
# Async TCO
# ---------------------------------------------------------------------------
class _AsyncThunk:
__slots__ = ("expr", "env", "ctx")
def __init__(self, expr, env, ctx):
self.expr = expr
self.env = env
self.ctx = ctx
async def _async_trampoline(val):
while isinstance(val, _AsyncThunk):
val = await _async_eval(val.expr, val.env, val.ctx)
return val
# ---------------------------------------------------------------------------
# Async evaluate — wraps transpiled sync eval with I/O support
# ---------------------------------------------------------------------------
async def async_eval(expr, env, ctx=None):
"""Public entry point: evaluate with I/O primitives."""
if ctx is None:
ctx = RequestContext()
result = await _async_eval(expr, env, ctx)
while isinstance(result, _AsyncThunk):
result = await _async_eval(result.expr, result.env, result.ctx)
return result
async def _async_eval(expr, env, ctx):
"""Internal async evaluator. Intercepts I/O primitives,
delegates everything else to the sync transpiled evaluator."""
# Intercept I/O primitive calls
if isinstance(expr, list) and expr:
head = expr[0]
if isinstance(head, Symbol) and head.name in IO_PRIMITIVES:
args, kwargs = await _parse_io_args(expr[1:], env, ctx)
return await execute_io(head.name, args, kwargs, ctx)
# For everything else, use the sync transpiled evaluator
result = sx_ref.eval_expr(expr, env)
return sx_ref.trampoline(result)
async def _parse_io_args(exprs, env, ctx):
"""Parse and evaluate I/O node args (keyword + positional)."""
args = []
kwargs = {}
i = 0
while i < len(exprs):
item = exprs[i]
if isinstance(item, Keyword) and i + 1 < len(exprs):
kwargs[item.name] = await async_eval(exprs[i + 1], env, ctx)
i += 2
else:
args.append(await async_eval(item, env, ctx))
i += 1
return args, kwargs
# ---------------------------------------------------------------------------
# Async HTML renderer
# ---------------------------------------------------------------------------
async def async_render(expr, env, ctx=None):
"""Render to HTML, awaiting I/O primitives inline."""
if ctx is None:
ctx = RequestContext()
return await _arender(expr, env, ctx)
async def _arender(expr, env, ctx):
if expr is None or expr is NIL or expr is False or expr is True:
return ""
if isinstance(expr, _RawHTML):
return expr.html
if isinstance(expr, str):
return escape_text(expr)
if isinstance(expr, (int, float)):
return escape_text(str(expr))
if isinstance(expr, Symbol):
val = await async_eval(expr, env, ctx)
return await _arender(val, env, ctx)
if isinstance(expr, Keyword):
return escape_text(expr.name)
if isinstance(expr, list):
if not expr:
return ""
return await _arender_list(expr, env, ctx)
if isinstance(expr, dict):
return ""
return escape_text(str(expr))
async def _arender_list(expr, env, ctx):
head = expr[0]
if isinstance(head, Symbol):
name = head.name
# I/O primitive
if name in IO_PRIMITIVES:
result = await async_eval(expr, env, ctx)
return await _arender(result, env, ctx)
# raw!
if name == "raw!":
parts = []
for arg in expr[1:]:
val = await async_eval(arg, env, ctx)
if isinstance(val, _RawHTML):
parts.append(val.html)
elif isinstance(val, str):
parts.append(val)
elif val is not None and val is not NIL:
parts.append(str(val))
return "".join(parts)
# Fragment
if name == "<>":
parts = [await _arender(c, env, ctx) for c in expr[1:]]
return "".join(parts)
# html: prefix
if name.startswith("html:"):
return await _arender_element(name[5:], expr[1:], env, ctx)
# Render-aware special forms
arsf = _ASYNC_RENDER_FORMS.get(name)
if arsf is not None:
if name in HTML_TAGS and (
(len(expr) > 1 and isinstance(expr[1], Keyword))
or _svg_context.get(False)
):
return await _arender_element(name, expr[1:], env, ctx)
return await arsf(expr, env, ctx)
# Macro expansion
if name in env:
val = env[name]
if isinstance(val, Macro):
expanded = sx_ref.trampoline(
sx_ref.expand_macro(val, expr[1:], env)
)
return await _arender(expanded, env, ctx)
# HTML tag
if name in HTML_TAGS:
return await _arender_element(name, expr[1:], env, ctx)
# Component
if name.startswith("~"):
val = env.get(name)
if isinstance(val, Component):
return await _arender_component(val, expr[1:], env, ctx)
# Custom element
if "-" in name and len(expr) > 1 and isinstance(expr[1], Keyword):
return await _arender_element(name, expr[1:], env, ctx)
# SVG context
if _svg_context.get(False):
return await _arender_element(name, expr[1:], env, ctx)
# Fallback — evaluate then render
result = await async_eval(expr, env, ctx)
return await _arender(result, env, ctx)
if isinstance(head, (Lambda, list)):
result = await async_eval(expr, env, ctx)
return await _arender(result, env, ctx)
# Data list
parts = [await _arender(item, env, ctx) for item in expr]
return "".join(parts)
async def _arender_element(tag, args, env, ctx):
attrs = {}
children = []
i = 0
while i < len(args):
arg = args[i]
if isinstance(arg, Keyword) and i + 1 < len(args):
attrs[arg.name] = await async_eval(args[i + 1], env, ctx)
i += 2
else:
children.append(arg)
i += 1
# StyleValue → class
style_val = attrs.get("style")
if isinstance(style_val, StyleValue):
from ..css_registry import register_generated_rule
register_generated_rule(style_val)
existing = attrs.get("class")
if existing and existing is not NIL and existing is not False:
attrs["class"] = f"{existing} {style_val.class_name}"
else:
attrs["class"] = style_val.class_name
del attrs["style"]
class_val = attrs.get("class")
if class_val is not None and class_val is not NIL and class_val is not False:
collector = css_class_collector.get(None)
if collector is not None:
collector.update(str(class_val).split())
parts = [f"<{tag}"]
for attr_name, attr_val in attrs.items():
if attr_val is None or attr_val is NIL or attr_val is False:
continue
if attr_name in BOOLEAN_ATTRS:
if attr_val:
parts.append(f" {attr_name}")
elif attr_val is True:
parts.append(f" {attr_name}")
else:
parts.append(f' {attr_name}="{escape_attr(str(attr_val))}"')
parts.append(">")
opening = "".join(parts)
if tag in VOID_ELEMENTS:
return opening
token = None
if tag in ("svg", "math"):
token = _svg_context.set(True)
try:
child_parts = [await _arender(c, env, ctx) for c in children]
finally:
if token is not None:
_svg_context.reset(token)
return f"{opening}{''.join(child_parts)}</{tag}>"
async def _arender_component(comp, args, env, ctx):
kwargs = {}
children = []
i = 0
while i < len(args):
arg = args[i]
if isinstance(arg, Keyword) and i + 1 < len(args):
kwargs[arg.name] = await async_eval(args[i + 1], env, ctx)
i += 2
else:
children.append(arg)
i += 1
local = dict(comp.closure)
local.update(env)
for p in comp.params:
local[p] = kwargs.get(p, NIL)
if comp.has_children:
child_html = [await _arender(c, env, ctx) for c in children]
local["children"] = _RawHTML("".join(child_html))
return await _arender(comp.body, local, ctx)
async def _arender_lambda(fn, args, env, ctx):
local = dict(fn.closure)
local.update(env)
for p, v in zip(fn.params, args):
local[p] = v
return await _arender(fn.body, local, ctx)
# ---------------------------------------------------------------------------
# Render-aware special forms
# ---------------------------------------------------------------------------
async def _arsf_if(expr, env, ctx):
cond = await async_eval(expr[1], env, ctx)
if cond and cond is not NIL:
return await _arender(expr[2], env, ctx)
return await _arender(expr[3], env, ctx) if len(expr) > 3 else ""
async def _arsf_when(expr, env, ctx):
cond = await async_eval(expr[1], env, ctx)
if cond and cond is not NIL:
return "".join([await _arender(b, env, ctx) for b in expr[2:]])
return ""
async def _arsf_cond(expr, env, ctx):
clauses = expr[1:]
if not clauses:
return ""
if isinstance(clauses[0], list) and len(clauses[0]) == 2:
for clause in clauses:
test = clause[0]
if isinstance(test, Symbol) and test.name in ("else", ":else"):
return await _arender(clause[1], env, ctx)
if isinstance(test, Keyword) and test.name == "else":
return await _arender(clause[1], env, ctx)
if await async_eval(test, env, ctx):
return await _arender(clause[1], env, ctx)
else:
i = 0
while i < len(clauses) - 1:
test, result = clauses[i], clauses[i + 1]
if isinstance(test, Keyword) and test.name == "else":
return await _arender(result, env, ctx)
if isinstance(test, Symbol) and test.name in (":else", "else"):
return await _arender(result, env, ctx)
if await async_eval(test, env, ctx):
return await _arender(result, env, ctx)
i += 2
return ""
async def _arsf_let(expr, env, ctx):
bindings = expr[1]
local = dict(env)
if isinstance(bindings, list):
if bindings and isinstance(bindings[0], list):
for b in bindings:
var = b[0]
vname = var.name if isinstance(var, Symbol) else var
local[vname] = await async_eval(b[1], local, ctx)
elif len(bindings) % 2 == 0:
for i in range(0, len(bindings), 2):
var = bindings[i]
vname = var.name if isinstance(var, Symbol) else var
local[vname] = await async_eval(bindings[i + 1], local, ctx)
return "".join([await _arender(b, local, ctx) for b in expr[2:]])
async def _arsf_begin(expr, env, ctx):
return "".join([await _arender(sub, env, ctx) for sub in expr[1:]])
async def _arsf_define(expr, env, ctx):
await async_eval(expr, env, ctx)
return ""
async def _arsf_map(expr, env, ctx):
fn = await async_eval(expr[1], env, ctx)
coll = await async_eval(expr[2], env, ctx)
parts = []
for item in coll:
if isinstance(fn, Lambda):
parts.append(await _arender_lambda(fn, (item,), env, ctx))
elif callable(fn):
r = fn(item)
if inspect.iscoroutine(r):
r = await r
parts.append(await _arender(r, env, ctx))
else:
parts.append(await _arender(item, env, ctx))
return "".join(parts)
async def _arsf_map_indexed(expr, env, ctx):
fn = await async_eval(expr[1], env, ctx)
coll = await async_eval(expr[2], env, ctx)
parts = []
for i, item in enumerate(coll):
if isinstance(fn, Lambda):
parts.append(await _arender_lambda(fn, (i, item), env, ctx))
elif callable(fn):
r = fn(i, item)
if inspect.iscoroutine(r):
r = await r
parts.append(await _arender(r, env, ctx))
else:
parts.append(await _arender(item, env, ctx))
return "".join(parts)
async def _arsf_filter(expr, env, ctx):
result = await async_eval(expr, env, ctx)
return await _arender(result, env, ctx)
async def _arsf_for_each(expr, env, ctx):
fn = await async_eval(expr[1], env, ctx)
coll = await async_eval(expr[2], env, ctx)
parts = []
for item in coll:
if isinstance(fn, Lambda):
parts.append(await _arender_lambda(fn, (item,), env, ctx))
elif callable(fn):
r = fn(item)
if inspect.iscoroutine(r):
r = await r
parts.append(await _arender(r, env, ctx))
else:
parts.append(await _arender(item, env, ctx))
return "".join(parts)
_ASYNC_RENDER_FORMS = {
"if": _arsf_if,
"when": _arsf_when,
"cond": _arsf_cond,
"let": _arsf_let,
"let*": _arsf_let,
"begin": _arsf_begin,
"do": _arsf_begin,
"define": _arsf_define,
"defstyle": _arsf_define,
"defkeyframes": _arsf_define,
"defcomp": _arsf_define,
"defmacro": _arsf_define,
"defhandler": _arsf_define,
"map": _arsf_map,
"map-indexed": _arsf_map_indexed,
"filter": _arsf_filter,
"for-each": _arsf_for_each,
}
# ---------------------------------------------------------------------------
# Async SX wire format (aser)
# ---------------------------------------------------------------------------
async def async_eval_to_sx(expr, env, ctx=None):
"""Evaluate and produce SX source string (wire format)."""
if ctx is None:
ctx = RequestContext()
result = await _aser(expr, env, ctx)
if isinstance(result, SxExpr):
return result
if result is None or result is NIL:
return SxExpr("")
if isinstance(result, str):
return SxExpr(result)
return SxExpr(serialize(result))
async def async_eval_slot_to_sx(expr, env, ctx=None):
"""Like async_eval_to_sx but expands component calls server-side."""
if ctx is None:
ctx = RequestContext()
token = _expand_components.set(True)
try:
return await _eval_slot_inner(expr, env, ctx)
finally:
_expand_components.reset(token)
async def _eval_slot_inner(expr, env, ctx):
if isinstance(expr, list) and expr:
head = expr[0]
if isinstance(head, Symbol) and head.name.startswith("~"):
comp = env.get(head.name)
if isinstance(comp, Component):
result = await _aser_component(comp, expr[1:], env, ctx)
if isinstance(result, SxExpr):
return result
if result is None or result is NIL:
return SxExpr("")
if isinstance(result, str):
return SxExpr(result)
return SxExpr(serialize(result))
result = await _aser(expr, env, ctx)
result = await _maybe_expand_component_result(result, env, ctx)
if isinstance(result, SxExpr):
return result
if result is None or result is NIL:
return SxExpr("")
if isinstance(result, str):
return SxExpr(result)
return SxExpr(serialize(result))
async def _maybe_expand_component_result(result, env, ctx):
raw = None
if isinstance(result, SxExpr):
raw = str(result).strip()
elif isinstance(result, str):
raw = result.strip()
if raw and raw.startswith("(~"):
from ..parser import parse_all
parsed = parse_all(raw)
if parsed:
return await async_eval_slot_to_sx(parsed[0], env, ctx)
return result
async def _aser(expr, env, ctx):
"""Evaluate for SX wire format — serialize rendering forms, evaluate control flow."""
if isinstance(expr, (int, float, bool)):
return expr
if isinstance(expr, SxExpr):
return expr
if isinstance(expr, str):
return expr
if expr is None or expr is NIL:
return NIL
if isinstance(expr, Symbol):
name = expr.name
if name in env:
return env[name]
if sx_ref.is_primitive(name):
return sx_ref.get_primitive(name)
if name == "true":
return True
if name == "false":
return False
if name == "nil":
return NIL
raise EvalError(f"Undefined symbol: {name}")
if isinstance(expr, Keyword):
return expr.name
if isinstance(expr, dict):
return {k: await _aser(v, env, ctx) for k, v in expr.items()}
if not isinstance(expr, list):
return expr
if not expr:
return []
head = expr[0]
if not isinstance(head, (Symbol, Lambda, list)):
return [await _aser(x, env, ctx) for x in expr]
if isinstance(head, Symbol):
name = head.name
# I/O primitives
if name in IO_PRIMITIVES:
args, kwargs = await _parse_io_args(expr[1:], env, ctx)
return await execute_io(name, args, kwargs, ctx)
# Fragment
if name == "<>":
return await _aser_fragment(expr[1:], env, ctx)
# raw!
if name == "raw!":
return await _aser_call("raw!", expr[1:], env, ctx)
# html: prefix
if name.startswith("html:"):
return await _aser_call(name[5:], expr[1:], env, ctx)
# Component call
if name.startswith("~"):
val = env.get(name)
if isinstance(val, Macro):
expanded = sx_ref.trampoline(
sx_ref.expand_macro(val, expr[1:], env)
)
return await _aser(expanded, env, ctx)
if isinstance(val, Component) and _expand_components.get():
return await _aser_component(val, expr[1:], env, ctx)
return await _aser_call(name, expr[1:], env, ctx)
# Serialize-mode special/HO forms
sf = _ASER_FORMS.get(name)
if sf is not None:
if name in HTML_TAGS and (
(len(expr) > 1 and isinstance(expr[1], Keyword))
or _svg_context.get(False)
):
return await _aser_call(name, expr[1:], env, ctx)
return await sf(expr, env, ctx)
# HTML tag
if name in HTML_TAGS:
return await _aser_call(name, expr[1:], env, ctx)
# Macro
if name in env:
val = env[name]
if isinstance(val, Macro):
expanded = sx_ref.trampoline(
sx_ref.expand_macro(val, expr[1:], env)
)
return await _aser(expanded, env, ctx)
# Custom element
if "-" in name and len(expr) > 1 and isinstance(expr[1], Keyword):
return await _aser_call(name, expr[1:], env, ctx)
# SVG context
if _svg_context.get(False):
return await _aser_call(name, expr[1:], env, ctx)
# Function/lambda call
fn = await async_eval(head, env, ctx)
args = [await async_eval(a, env, ctx) for a in expr[1:]]
if callable(fn) and not isinstance(fn, (Lambda, Component)):
result = fn(*args)
if inspect.iscoroutine(result):
return await result
return result
if isinstance(fn, Lambda):
local = dict(fn.closure)
local.update(env)
for p, v in zip(fn.params, args):
local[p] = v
return await _aser(fn.body, local, ctx)
if isinstance(fn, Component):
return await _aser_call(f"~{fn.name}", expr[1:], env, ctx)
raise EvalError(f"Not callable: {fn!r}")
async def _aser_fragment(children, env, ctx):
parts = []
for child in children:
result = await _aser(child, env, ctx)
if isinstance(result, list):
for item in result:
if item is not NIL and item is not None:
parts.append(serialize(item))
elif result is not NIL and result is not None:
parts.append(serialize(result))
if not parts:
return SxExpr("")
return SxExpr("(<> " + " ".join(parts) + ")")
async def _aser_component(comp, args, env, ctx):
kwargs = {}
children = []
i = 0
while i < len(args):
arg = args[i]
if isinstance(arg, Keyword) and i + 1 < len(args):
kwargs[arg.name] = await _aser(args[i + 1], env, ctx)
i += 2
else:
children.append(arg)
i += 1
local = dict(comp.closure)
local.update(env)
for p in comp.params:
local[p] = kwargs.get(p, NIL)
if comp.has_children:
child_parts = [serialize(await _aser(c, env, ctx)) for c in children]
local["children"] = SxExpr("(<> " + " ".join(child_parts) + ")")
return await _aser(comp.body, local, ctx)
async def _aser_call(name, args, env, ctx):
token = None
if name in ("svg", "math"):
token = _svg_context.set(True)
try:
parts = [name]
extra_class = None
i = 0
while i < len(args):
arg = args[i]
if isinstance(arg, Keyword) and i + 1 < len(args):
val = await _aser(args[i + 1], env, ctx)
if val is not NIL and val is not None:
if arg.name == "style" and isinstance(val, StyleValue):
from ..css_registry import register_generated_rule
register_generated_rule(val)
extra_class = val.class_name
else:
parts.append(f":{arg.name}")
if isinstance(val, list):
live = [v for v in val if v is not NIL and v is not None]
items = [serialize(v) for v in live]
if not items:
parts.append("nil")
elif any(isinstance(v, SxExpr) for v in live):
parts.append("(<> " + " ".join(items) + ")")
else:
parts.append("(list " + " ".join(items) + ")")
else:
parts.append(serialize(val))
i += 2
else:
result = await _aser(arg, env, ctx)
if result is not NIL and result is not None:
if isinstance(result, list):
for item in result:
if item is not NIL and item is not None:
parts.append(serialize(item))
else:
parts.append(serialize(result))
i += 1
if extra_class:
_merge_class_into_parts(parts, extra_class)
return SxExpr("(" + " ".join(parts) + ")")
finally:
if token is not None:
_svg_context.reset(token)
def _merge_class_into_parts(parts, class_name):
for i, p in enumerate(parts):
if p == ":class" and i + 1 < len(parts):
existing = parts[i + 1]
if existing.startswith('"') and existing.endswith('"'):
parts[i + 1] = existing[:-1] + " " + class_name + '"'
else:
parts[i + 1] = f'(str {existing} " {class_name}")'
return
parts.insert(1, f'"{class_name}"')
parts.insert(1, ":class")
# ---------------------------------------------------------------------------
# Aser-mode special forms
# ---------------------------------------------------------------------------
async def _assf_if(expr, env, ctx):
cond = await async_eval(expr[1], env, ctx)
if cond and cond is not NIL:
return await _aser(expr[2], env, ctx)
return await _aser(expr[3], env, ctx) if len(expr) > 3 else NIL
async def _assf_when(expr, env, ctx):
cond = await async_eval(expr[1], env, ctx)
if cond and cond is not NIL:
result = NIL
for body_expr in expr[2:]:
result = await _aser(body_expr, env, ctx)
return result
return NIL
async def _assf_let(expr, env, ctx):
bindings = expr[1]
local = dict(env)
if isinstance(bindings, list):
if bindings and isinstance(bindings[0], list):
for b in bindings:
var = b[0]
vname = var.name if isinstance(var, Symbol) else var
local[vname] = await _aser(b[1], local, ctx)
elif len(bindings) % 2 == 0:
for i in range(0, len(bindings), 2):
var = bindings[i]
vname = var.name if isinstance(var, Symbol) else var
local[vname] = await _aser(bindings[i + 1], local, ctx)
result = NIL
for body_expr in expr[2:]:
result = await _aser(body_expr, local, ctx)
return result
async def _assf_cond(expr, env, ctx):
clauses = expr[1:]
if not clauses:
return NIL
if isinstance(clauses[0], list) and len(clauses[0]) == 2:
for clause in clauses:
test = clause[0]
if isinstance(test, Symbol) and test.name in ("else", ":else"):
return await _aser(clause[1], env, ctx)
if isinstance(test, Keyword) and test.name == "else":
return await _aser(clause[1], env, ctx)
if await async_eval(test, env, ctx):
return await _aser(clause[1], env, ctx)
else:
i = 0
while i < len(clauses) - 1:
test, result = clauses[i], clauses[i + 1]
if isinstance(test, Keyword) and test.name == "else":
return await _aser(result, env, ctx)
if isinstance(test, Symbol) and test.name in (":else", "else"):
return await _aser(result, env, ctx)
if await async_eval(test, env, ctx):
return await _aser(result, env, ctx)
i += 2
return NIL
async def _assf_case(expr, env, ctx):
match_val = await async_eval(expr[1], env, ctx)
clauses = expr[2:]
i = 0
while i < len(clauses) - 1:
test, result = clauses[i], clauses[i + 1]
if isinstance(test, Keyword) and test.name == "else":
return await _aser(result, env, ctx)
if isinstance(test, Symbol) and test.name in (":else", "else"):
return await _aser(result, env, ctx)
if match_val == await async_eval(test, env, ctx):
return await _aser(result, env, ctx)
i += 2
return NIL
async def _assf_begin(expr, env, ctx):
result = NIL
for sub in expr[1:]:
result = await _aser(sub, env, ctx)
return result
async def _assf_define(expr, env, ctx):
await async_eval(expr, env, ctx)
return NIL
async def _assf_and(expr, env, ctx):
result = True
for arg in expr[1:]:
result = await async_eval(arg, env, ctx)
if not result:
return result
return result
async def _assf_or(expr, env, ctx):
result = False
for arg in expr[1:]:
result = await async_eval(arg, env, ctx)
if result:
return result
return result
async def _assf_lambda(expr, env, ctx):
params_expr = expr[1]
param_names = []
for p in params_expr:
if isinstance(p, Symbol):
param_names.append(p.name)
elif isinstance(p, str):
param_names.append(p)
return Lambda(param_names, expr[2], dict(env))
async def _assf_quote(expr, env, ctx):
return expr[1] if len(expr) > 1 else NIL
async def _assf_thread_first(expr, env, ctx):
result = await async_eval(expr[1], env, ctx)
for form in expr[2:]:
if isinstance(form, list):
fn = await async_eval(form[0], env, ctx)
fn_args = [result] + [await async_eval(a, env, ctx) for a in form[1:]]
else:
fn = await async_eval(form, env, ctx)
fn_args = [result]
if callable(fn) and not isinstance(fn, (Lambda, Component)):
result = fn(*fn_args)
if inspect.iscoroutine(result):
result = await result
elif isinstance(fn, Lambda):
local = dict(fn.closure)
local.update(env)
for p, v in zip(fn.params, fn_args):
local[p] = v
result = await async_eval(fn.body, local, ctx)
else:
raise EvalError(f"-> form not callable: {fn!r}")
return result
async def _assf_set_bang(expr, env, ctx):
value = await async_eval(expr[2], env, ctx)
env[expr[1].name] = value
return value
# Aser-mode HO forms
async def _asho_map(expr, env, ctx):
fn = await async_eval(expr[1], env, ctx)
coll = await async_eval(expr[2], env, ctx)
results = []
for item in coll:
if isinstance(fn, Lambda):
local = dict(fn.closure)
local.update(env)
local[fn.params[0]] = item
results.append(await _aser(fn.body, local, ctx))
elif callable(fn):
r = fn(item)
results.append(await r if inspect.iscoroutine(r) else r)
else:
raise EvalError(f"map requires callable, got {type(fn).__name__}")
return results
async def _asho_map_indexed(expr, env, ctx):
fn = await async_eval(expr[1], env, ctx)
coll = await async_eval(expr[2], env, ctx)
results = []
for i, item in enumerate(coll):
if isinstance(fn, Lambda):
local = dict(fn.closure)
local.update(env)
local[fn.params[0]] = i
local[fn.params[1]] = item
results.append(await _aser(fn.body, local, ctx))
elif callable(fn):
r = fn(i, item)
results.append(await r if inspect.iscoroutine(r) else r)
else:
raise EvalError(f"map-indexed requires callable, got {type(fn).__name__}")
return results
async def _asho_filter(expr, env, ctx):
return await async_eval(expr, env, ctx)
async def _asho_for_each(expr, env, ctx):
fn = await async_eval(expr[1], env, ctx)
coll = await async_eval(expr[2], env, ctx)
results = []
for item in coll:
if isinstance(fn, Lambda):
local = dict(fn.closure)
local.update(env)
local[fn.params[0]] = item
results.append(await _aser(fn.body, local, ctx))
elif callable(fn):
r = fn(item)
results.append(await r if inspect.iscoroutine(r) else r)
return results
_ASER_FORMS = {
"if": _assf_if,
"when": _assf_when,
"cond": _assf_cond,
"case": _assf_case,
"and": _assf_and,
"or": _assf_or,
"let": _assf_let,
"let*": _assf_let,
"lambda": _assf_lambda,
"fn": _assf_lambda,
"define": _assf_define,
"defstyle": _assf_define,
"defkeyframes": _assf_define,
"defcomp": _assf_define,
"defmacro": _assf_define,
"defhandler": _assf_define,
"begin": _assf_begin,
"do": _assf_begin,
"quote": _assf_quote,
"->": _assf_thread_first,
"set!": _assf_set_bang,
"map": _asho_map,
"map-indexed": _asho_map_indexed,
"filter": _asho_filter,
"for-each": _asho_for_each,
}

File diff suppressed because it is too large Load Diff

1089
shared/sx/ref/sx_ref.py Normal file

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,336 @@
"""Tests for the transpiled sx_ref.py evaluator.
Runs the same test cases as test_evaluator.py and test_html.py but
against the bootstrap-compiled evaluator to verify correctness.
"""
import pytest
from shared.sx.parser import parse
from shared.sx.types import Symbol, Keyword, NIL, Lambda, Component, Macro
from shared.sx.ref import sx_ref
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def ev(text, env=None):
"""Parse and evaluate a single expression via sx_ref."""
return sx_ref.evaluate(parse(text), env)
def render(text, env=None):
"""Parse and render via sx_ref."""
return sx_ref.render(parse(text), env)
# ---------------------------------------------------------------------------
# Literals and lookups
# ---------------------------------------------------------------------------
class TestLiterals:
def test_int(self):
assert ev("42") == 42
def test_string(self):
assert ev('"hello"') == "hello"
def test_true(self):
assert ev("true") is True
def test_nil(self):
assert ev("nil") is NIL
def test_symbol_lookup(self):
assert ev("x", {"x": 10}) == 10
def test_undefined_symbol(self):
with pytest.raises(sx_ref.EvalError, match="Undefined symbol"):
ev("xyz")
def test_keyword_evaluates_to_name(self):
assert ev(":foo") == "foo"
# ---------------------------------------------------------------------------
# Arithmetic
# ---------------------------------------------------------------------------
class TestArithmetic:
def test_add(self):
assert ev("(+ 1 2 3)") == 6
def test_sub(self):
assert ev("(- 10 3)") == 7
def test_mul(self):
assert ev("(* 2 3 4)") == 24
def test_div(self):
assert ev("(/ 10 4)") == 2.5
def test_mod(self):
assert ev("(mod 7 3)") == 1
# ---------------------------------------------------------------------------
# Special forms
# ---------------------------------------------------------------------------
class TestSpecialForms:
def test_if_true(self):
assert ev("(if true 1 2)") == 1
def test_if_false(self):
assert ev("(if false 1 2)") == 2
def test_if_no_else(self):
assert ev("(if false 1)") is NIL
def test_when_true(self):
assert ev("(when true 42)") == 42
def test_when_false(self):
assert ev("(when false 42)") is NIL
def test_and_short_circuit(self):
assert ev("(and true true 3)") == 3
assert ev("(and true false 3)") is False
def test_or_short_circuit(self):
assert ev("(or false false 3)") == 3
assert ev("(or false 2 3)") == 2
def test_let_scheme_style(self):
assert ev("(let ((x 10) (y 20)) (+ x y))") == 30
def test_let_clojure_style(self):
assert ev("(let (x 10 y 20) (+ x y))") == 30
def test_let_sequential(self):
assert ev("(let ((x 1) (y (+ x 1))) y)") == 2
def test_begin(self):
assert ev("(begin 1 2 3)") == 3
def test_quote(self):
result = ev("(quote (a b c))")
assert result == [Symbol("a"), Symbol("b"), Symbol("c")]
def test_cond_clojure(self):
assert ev("(cond false 1 true 2 :else 3)") == 2
def test_cond_else(self):
assert ev("(cond false 1 false 2 :else 99)") == 99
def test_case(self):
assert ev('(case 2 1 "one" 2 "two" :else "other")') == "two"
def test_thread_first(self):
assert ev("(-> 5 (+ 3) (* 2))") == 16
def test_define(self):
env = {}
ev("(define x 42)", env)
assert env["x"] == 42
# ---------------------------------------------------------------------------
# Lambda
# ---------------------------------------------------------------------------
class TestLambda:
def test_create_and_call(self):
assert ev("((fn (x) (* x x)) 5)") == 25
def test_closure(self):
result = ev("(let ((a 10)) ((fn (x) (+ x a)) 5))")
assert result == 15
def test_higher_order(self):
result = ev("(let ((double (fn (x) (* x 2)))) (double 7))")
assert result == 14
# ---------------------------------------------------------------------------
# Collections
# ---------------------------------------------------------------------------
class TestCollections:
def test_list_constructor(self):
assert ev("(list 1 2 3)") == [1, 2, 3]
def test_dict_constructor(self):
assert ev("(dict :a 1 :b 2)") == {"a": 1, "b": 2}
def test_get_dict(self):
assert ev('(get {:a 1 :b 2} "a")') == 1
def test_get_list(self):
assert ev("(get (list 10 20 30) 1)") == 20
def test_first_last_rest(self):
assert ev("(first (list 1 2 3))") == 1
assert ev("(last (list 1 2 3))") == 3
assert ev("(rest (list 1 2 3))") == [2, 3]
def test_len(self):
assert ev("(len (list 1 2 3))") == 3
def test_concat(self):
assert ev("(concat (list 1 2) (list 3 4))") == [1, 2, 3, 4]
def test_cons(self):
assert ev("(cons 0 (list 1 2))") == [0, 1, 2]
def test_merge(self):
assert ev("(merge {:a 1} {:b 2} {:a 3})") == {"a": 3, "b": 2}
def test_empty(self):
assert ev("(empty? (list))") is True
assert ev("(empty? (list 1))") is False
assert ev("(empty? nil)") is True
# ---------------------------------------------------------------------------
# Higher-order forms
# ---------------------------------------------------------------------------
class TestHigherOrder:
def test_map(self):
assert ev("(map (fn (x) (* x x)) (list 1 2 3 4))") == [1, 4, 9, 16]
def test_filter(self):
assert ev("(filter (fn (x) (> x 2)) (list 1 2 3 4))") == [3, 4]
def test_reduce(self):
assert ev("(reduce (fn (acc x) (+ acc x)) 0 (list 1 2 3))") == 6
def test_some(self):
assert ev("(some (fn (x) (> x 3)) (list 1 2 3 4 5))") is True
def test_for_each(self):
result = ev("(for-each (fn (x) x) (list 1 2 3))")
assert result is NIL
# ---------------------------------------------------------------------------
# String ops
# ---------------------------------------------------------------------------
class TestStrings:
def test_str(self):
assert ev('(str "hello" " " "world")') == "hello world"
def test_upper_lower(self):
assert ev('(upper "hello")') == "HELLO"
assert ev('(lower "HELLO")') == "hello"
def test_split_join(self):
assert ev('(split "a,b,c" ",")') == ["a", "b", "c"]
assert ev('(join "-" (list "a" "b"))') == "a-b"
def test_starts_ends(self):
assert ev('(starts-with? "hello" "hel")') is True
assert ev('(ends-with? "hello" "llo")') is True
# ---------------------------------------------------------------------------
# Components
# ---------------------------------------------------------------------------
class TestComponents:
def test_defcomp_and_render(self):
env = {}
ev("(defcomp ~box (&key title) (div :class \"box\" title))", env)
result = render("(~box :title \"hi\")", env)
assert result == '<div class="box">hi</div>'
def test_defcomp_with_children(self):
env = {}
ev("(defcomp ~wrap (&rest children) (div children))", env)
result = render('(~wrap (span "a") (span "b"))', env)
assert result == '<div><span>a</span><span>b</span></div>'
# ---------------------------------------------------------------------------
# HTML rendering
# ---------------------------------------------------------------------------
class TestHTMLRendering:
def test_basic_element(self):
assert render("(div)") == "<div></div>"
def test_text_content(self):
assert render('(p "hello")') == "<p>hello</p>"
def test_attributes(self):
result = render('(a :href "/about" "link")')
assert result == '<a href="/about">link</a>'
def test_void_element(self):
result = render('(br)')
assert result == "<br />"
def test_nested(self):
result = render('(div (p "a") (p "b"))')
assert result == "<div><p>a</p><p>b</p></div>"
def test_fragment(self):
result = render('(<> (span "a") (span "b"))')
assert result == "<span>a</span><span>b</span>"
def test_conditional_rendering(self):
result = render('(if true (span "yes") (span "no"))')
assert result == "<span>yes</span>"
def test_map_rendering(self):
result = render('(map (fn (x) (li x)) (list "a" "b"))')
assert result == "<li>a</li><li>b</li>"
def test_html_escaping(self):
result = render('(span "<b>bold</b>")')
assert result == "<span>&lt;b&gt;bold&lt;/b&gt;</span>"
# ---------------------------------------------------------------------------
# Aser (SX wire format)
# ---------------------------------------------------------------------------
class TestAser:
def test_render_to_sx_basic(self):
expr = parse("(div :class \"foo\" \"hello\")")
result = sx_ref.render_to_sx(expr, {})
assert result == '(div :class "foo" "hello")'
def test_component_not_expanded(self):
expr = parse('(~card :title "hi")')
result = sx_ref.render_to_sx(expr, {})
assert result == '(~card :title "hi")'
def test_fragment(self):
expr = parse('(<> "a" "b")')
result = sx_ref.render_to_sx(expr, {})
assert result == '(<> "a" "b")'
def test_let_evaluates(self):
expr = parse('(let ((x 5)) x)')
result = sx_ref.render_to_sx(expr, {})
assert result == "5"
def test_if_evaluates(self):
expr = parse('(if true "yes" "no")')
result = sx_ref.render_to_sx(expr, {})
assert result == 'yes' # strings pass through unserialized
# ---------------------------------------------------------------------------
# Macros
# ---------------------------------------------------------------------------
class TestMacros:
def test_defmacro_and_expand(self):
env = {}
ev("(defmacro unless (test body) (list (quote if) (list (quote not) test) body))", env)
result = ev('(unless false "ran")', env)
assert result == "ran"