Bootstrap shift/reset to both Python and JS targets. The implementation uses exception-based capture with re-evaluation: reset wraps in try/catch for ShiftSignal, shift raises to the nearest reset, and continuation invocation pushes a resume value and re-evaluates the body. - Add Continuation type and _ShiftSignal to shared/sx/types.py - Add sf_reset/sf_shift to hand-written evaluator.py - Add async versions to async_eval.py - Add shift/reset dispatch to eval.sx spec - Bootstrap to Python: FIXUPS_PY with sf_reset/sf_shift, regenerate sx_ref.py - Bootstrap to JS: Continuation/ShiftSignal types, sfReset/sfShift in fixups - Add continuation? primitive to both bootstrappers and primitives.sx - Allow callables (including Continuation) in hand-written HO map - 44 unit tests (22 per evaluator) covering: passthrough, abort, invoke, double invoke, predicate, stored continuation, nested reset, practical patterns - Update continuations essay to reflect implemented status with examples Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1044 lines
33 KiB
Python
1044 lines
33 KiB
Python
"""
|
|
S-expression evaluator.
|
|
|
|
Walks a parsed s-expression tree and evaluates it in an environment.
|
|
|
|
Special forms:
|
|
(if cond then else?)
|
|
(when cond body)
|
|
(cond clause...) — Scheme-style ((test body)...) or Clojure-style (test body...)
|
|
(case expr val body... :else default)
|
|
(and expr...) (or expr...)
|
|
(let ((name val)...) body) or (let (name val name val...) body)
|
|
(lambda (params...) body) or (fn (params...) body)
|
|
(define name value)
|
|
(defcomp ~name (&key param...) body)
|
|
(defrelation :name :from "type" :to "type" :cardinality :card ...)
|
|
(begin expr...)
|
|
(quote expr)
|
|
(do expr...) — alias for begin
|
|
(-> val form...) — thread-first macro
|
|
|
|
Higher-order forms (operate on lambdas):
|
|
(map fn coll)
|
|
(map-indexed fn coll)
|
|
(filter fn coll)
|
|
(reduce fn init coll)
|
|
(some fn coll)
|
|
(every? fn coll)
|
|
(for-each fn coll)
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
from typing import Any
|
|
|
|
from .types import Component, Continuation, HandlerDef, Keyword, Lambda, Macro, NIL, PageDef, RelationDef, Symbol, _ShiftSignal
|
|
from .primitives import _PRIMITIVES
|
|
|
|
|
|
class EvalError(Exception):
|
|
"""Error during expression evaluation."""
|
|
pass
|
|
|
|
|
|
class _Thunk:
|
|
"""Deferred evaluation — returned from tail positions for TCO."""
|
|
__slots__ = ("expr", "env")
|
|
|
|
def __init__(self, expr: Any, env: dict[str, Any]):
|
|
self.expr = expr
|
|
self.env = env
|
|
|
|
|
|
def _trampoline(val: Any) -> Any:
|
|
"""Unwrap thunks by re-entering the evaluator until we get an actual value."""
|
|
while isinstance(val, _Thunk):
|
|
val = _eval(val.expr, val.env)
|
|
return val
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Public API
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def evaluate(expr: Any, env: dict[str, Any] | None = None) -> Any:
|
|
"""Evaluate *expr* in *env* and return the result."""
|
|
if env is None:
|
|
env = {}
|
|
result = _eval(expr, env)
|
|
while isinstance(result, _Thunk):
|
|
result = _eval(result.expr, result.env)
|
|
return result
|
|
|
|
|
|
def make_env(**kwargs: Any) -> dict[str, Any]:
|
|
"""Convenience: create an environment dict with initial bindings."""
|
|
return dict(kwargs)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Internal evaluator
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _eval(expr: Any, env: dict[str, Any]) -> Any:
|
|
# --- literals ---------------------------------------------------------
|
|
if isinstance(expr, (int, float, str, bool)):
|
|
return expr
|
|
if expr is None or expr is NIL:
|
|
return NIL
|
|
|
|
# --- symbol lookup ----------------------------------------------------
|
|
if isinstance(expr, Symbol):
|
|
name = expr.name
|
|
if name in env:
|
|
return env[name]
|
|
if name in _PRIMITIVES:
|
|
return _PRIMITIVES[name]
|
|
if name == "true":
|
|
return True
|
|
if name == "false":
|
|
return False
|
|
if name == "nil":
|
|
return NIL
|
|
raise EvalError(f"Undefined symbol: {name}")
|
|
|
|
# --- keyword → its string name ----------------------------------------
|
|
if isinstance(expr, Keyword):
|
|
return expr.name
|
|
|
|
# --- dict literal -----------------------------------------------------
|
|
if isinstance(expr, dict):
|
|
return {k: _trampoline(_eval(v, env)) for k, v in expr.items()}
|
|
|
|
# --- list = call or special form --------------------------------------
|
|
if not isinstance(expr, list):
|
|
return expr
|
|
|
|
if not expr:
|
|
return []
|
|
|
|
head = expr[0]
|
|
|
|
# If head is not a symbol/lambda/list, treat entire list as data
|
|
if not isinstance(head, (Symbol, Lambda, list)):
|
|
return [_trampoline(_eval(x, env)) for x in expr]
|
|
|
|
# --- special forms ----------------------------------------------------
|
|
if isinstance(head, Symbol):
|
|
name = head.name
|
|
handler = _SPECIAL_FORMS.get(name)
|
|
if handler is not None:
|
|
return handler(expr, env)
|
|
|
|
# Higher-order forms (need lazy eval of lambda arg)
|
|
ho = _HO_FORMS.get(name)
|
|
if ho is not None:
|
|
return ho(expr, env)
|
|
|
|
# Macro expansion — if head resolves to a Macro, expand then eval
|
|
if name in env:
|
|
val = env[name]
|
|
if isinstance(val, Macro):
|
|
expanded = _expand_macro(val, expr[1:], env)
|
|
return _Thunk(expanded, env)
|
|
|
|
# --- function / lambda call -------------------------------------------
|
|
fn = _trampoline(_eval(head, env))
|
|
args = [_trampoline(_eval(a, env)) for a in expr[1:]]
|
|
|
|
if callable(fn) and not isinstance(fn, (Lambda, Component)):
|
|
return fn(*args)
|
|
|
|
if isinstance(fn, Lambda):
|
|
return _call_lambda(fn, args, env)
|
|
|
|
if isinstance(fn, Component):
|
|
return _call_component(fn, expr[1:], env)
|
|
|
|
raise EvalError(f"Not callable: {fn!r}")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Lambda / component invocation
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _call_lambda(fn: Lambda, args: list[Any], caller_env: dict[str, Any]) -> Any:
|
|
if len(args) != len(fn.params):
|
|
raise EvalError(f"{fn!r} expects {len(fn.params)} args, got {len(args)}")
|
|
local = dict(fn.closure)
|
|
local.update(caller_env)
|
|
for p, v in zip(fn.params, args):
|
|
local[p] = v
|
|
return _Thunk(fn.body, local)
|
|
|
|
|
|
def _call_component(comp: Component, raw_args: list[Any], env: dict[str, Any]) -> Any:
|
|
"""Evaluate a component invocation with keyword arguments.
|
|
|
|
``(~card :title "Hello" (p "child"))``
|
|
→ comp.params gets ``title="Hello"``, comp children gets ``[(p "child")]``
|
|
"""
|
|
kwargs: dict[str, Any] = {}
|
|
children: list[Any] = []
|
|
i = 0
|
|
while i < len(raw_args):
|
|
arg = raw_args[i]
|
|
if isinstance(arg, Keyword) and i + 1 < len(raw_args):
|
|
kwargs[arg.name] = _trampoline(_eval(raw_args[i + 1], env))
|
|
i += 2
|
|
else:
|
|
children.append(_trampoline(_eval(arg, env)))
|
|
i += 1
|
|
|
|
local = dict(comp.closure)
|
|
local.update(env)
|
|
for p in comp.params:
|
|
if p in kwargs:
|
|
local[p] = kwargs[p]
|
|
else:
|
|
local[p] = NIL
|
|
if comp.has_children:
|
|
local["children"] = children
|
|
return _Thunk(comp.body, local)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Special forms
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _sf_if(expr: list, env: dict) -> Any:
|
|
if len(expr) < 3:
|
|
raise EvalError("if requires condition and then-branch")
|
|
cond = _trampoline(_eval(expr[1], env))
|
|
if cond and cond is not NIL:
|
|
return _Thunk(expr[2], env)
|
|
if len(expr) > 3:
|
|
return _Thunk(expr[3], env)
|
|
return NIL
|
|
|
|
|
|
def _sf_when(expr: list, env: dict) -> Any:
|
|
if len(expr) < 3:
|
|
raise EvalError("when requires condition and body")
|
|
cond = _trampoline(_eval(expr[1], env))
|
|
if cond and cond is not NIL:
|
|
for body_expr in expr[2:-1]:
|
|
_trampoline(_eval(body_expr, env))
|
|
return _Thunk(expr[-1], env)
|
|
return NIL
|
|
|
|
|
|
def _sf_cond(expr: list, env: dict) -> Any:
|
|
clauses = expr[1:]
|
|
if not clauses:
|
|
return NIL
|
|
# Detect scheme-style: first clause is a 2-element list that isn't a comparison
|
|
if (
|
|
isinstance(clauses[0], list)
|
|
and len(clauses[0]) == 2
|
|
and not (isinstance(clauses[0][0], Symbol) and clauses[0][0].name in (
|
|
"=", "<", ">", "<=", ">=", "!=", "and", "or",
|
|
))
|
|
):
|
|
for clause in clauses:
|
|
if not isinstance(clause, list) or len(clause) < 2:
|
|
raise EvalError("cond clause must be (test result)")
|
|
test = clause[0]
|
|
if isinstance(test, Symbol) and test.name in ("else", ":else"):
|
|
return _Thunk(clause[1], env)
|
|
if isinstance(test, Keyword) and test.name == "else":
|
|
return _Thunk(clause[1], env)
|
|
if _trampoline(_eval(test, env)):
|
|
return _Thunk(clause[1], env)
|
|
else:
|
|
i = 0
|
|
while i < len(clauses) - 1:
|
|
test = clauses[i]
|
|
result = clauses[i + 1]
|
|
if isinstance(test, Keyword) and test.name == "else":
|
|
return _Thunk(result, env)
|
|
if isinstance(test, Symbol) and test.name in (":else", "else"):
|
|
return _Thunk(result, env)
|
|
if _trampoline(_eval(test, env)):
|
|
return _Thunk(result, env)
|
|
i += 2
|
|
return NIL
|
|
|
|
|
|
def _sf_case(expr: list, env: dict) -> Any:
|
|
if len(expr) < 2:
|
|
raise EvalError("case requires expression to match")
|
|
match_val = _trampoline(_eval(expr[1], env))
|
|
clauses = expr[2:]
|
|
i = 0
|
|
while i < len(clauses) - 1:
|
|
test = clauses[i]
|
|
result = clauses[i + 1]
|
|
if isinstance(test, Keyword) and test.name == "else":
|
|
return _Thunk(result, env)
|
|
if isinstance(test, Symbol) and test.name in (":else", "else"):
|
|
return _Thunk(result, env)
|
|
if match_val == _trampoline(_eval(test, env)):
|
|
return _Thunk(result, env)
|
|
i += 2
|
|
return NIL
|
|
|
|
|
|
def _sf_and(expr: list, env: dict) -> Any:
|
|
result: Any = True
|
|
for arg in expr[1:]:
|
|
result = _trampoline(_eval(arg, env))
|
|
if not result:
|
|
return result
|
|
return result
|
|
|
|
|
|
def _sf_or(expr: list, env: dict) -> Any:
|
|
result: Any = False
|
|
for arg in expr[1:]:
|
|
result = _trampoline(_eval(arg, env))
|
|
if result:
|
|
return result
|
|
return result
|
|
|
|
|
|
def _sf_let(expr: list, env: dict) -> Any:
|
|
if len(expr) < 3:
|
|
raise EvalError("let requires bindings and body")
|
|
bindings = expr[1]
|
|
local = dict(env)
|
|
|
|
if isinstance(bindings, list):
|
|
if bindings and isinstance(bindings[0], list):
|
|
# Scheme-style: ((name val) ...)
|
|
for binding in bindings:
|
|
if len(binding) != 2:
|
|
raise EvalError("let binding must be (name value)")
|
|
var = binding[0]
|
|
vname = var.name if isinstance(var, Symbol) else var
|
|
local[vname] = _trampoline(_eval(binding[1], local))
|
|
elif len(bindings) % 2 == 0:
|
|
# Clojure-style: (name val name val ...)
|
|
for i in range(0, len(bindings), 2):
|
|
var = bindings[i]
|
|
vname = var.name if isinstance(var, Symbol) else var
|
|
local[vname] = _trampoline(_eval(bindings[i + 1], local))
|
|
else:
|
|
raise EvalError("let bindings must be (name val ...) pairs")
|
|
else:
|
|
raise EvalError("let bindings must be a list")
|
|
|
|
# Evaluate body expressions — all but last non-tail, last is tail
|
|
body = expr[2:]
|
|
for body_expr in body[:-1]:
|
|
_trampoline(_eval(body_expr, local))
|
|
return _Thunk(body[-1], local)
|
|
|
|
|
|
def _sf_lambda(expr: list, env: dict) -> Lambda:
|
|
if len(expr) < 3:
|
|
raise EvalError("lambda requires params and body")
|
|
params_expr = expr[1]
|
|
if not isinstance(params_expr, list):
|
|
raise EvalError("lambda params must be a list")
|
|
param_names = []
|
|
for p in params_expr:
|
|
if isinstance(p, Symbol):
|
|
param_names.append(p.name)
|
|
elif isinstance(p, str):
|
|
param_names.append(p)
|
|
else:
|
|
raise EvalError(f"Invalid lambda param: {p}")
|
|
return Lambda(param_names, expr[2], dict(env))
|
|
|
|
|
|
def _sf_define(expr: list, env: dict) -> Any:
|
|
if len(expr) < 3:
|
|
raise EvalError("define requires name and value")
|
|
name_sym = expr[1]
|
|
if not isinstance(name_sym, Symbol):
|
|
raise EvalError(f"define name must be symbol, got {type(name_sym).__name__}")
|
|
value = _trampoline(_eval(expr[2], env))
|
|
if isinstance(value, Lambda) and value.name is None:
|
|
value.name = name_sym.name
|
|
env[name_sym.name] = value
|
|
return value
|
|
|
|
|
|
def _sf_defstyle(expr: list, env: dict) -> Any:
|
|
"""``(defstyle card-base (css :rounded-xl :bg-white :shadow))``
|
|
|
|
Evaluates body → StyleValue, binds to name in env.
|
|
"""
|
|
if len(expr) < 3:
|
|
raise EvalError("defstyle requires name and body")
|
|
name_sym = expr[1]
|
|
if not isinstance(name_sym, Symbol):
|
|
raise EvalError(f"defstyle name must be symbol, got {type(name_sym).__name__}")
|
|
value = _trampoline(_eval(expr[2], env))
|
|
env[name_sym.name] = value
|
|
return value
|
|
|
|
|
|
def _sf_defkeyframes(expr: list, env: dict) -> Any:
|
|
"""``(defkeyframes fade-in (from (css :opacity-0)) (to (css :opacity-100)))``
|
|
|
|
Builds @keyframes rule from steps, registers it, and binds the animation.
|
|
"""
|
|
from .types import StyleValue
|
|
from .css_registry import register_generated_rule
|
|
from .style_dict import KEYFRAMES
|
|
|
|
if len(expr) < 3:
|
|
raise EvalError("defkeyframes requires name and at least one step")
|
|
name_sym = expr[1]
|
|
if not isinstance(name_sym, Symbol):
|
|
raise EvalError(f"defkeyframes name must be symbol, got {type(name_sym).__name__}")
|
|
|
|
kf_name = name_sym.name
|
|
|
|
# Build @keyframes rule from steps
|
|
steps: list[str] = []
|
|
for step_expr in expr[2:]:
|
|
if not isinstance(step_expr, list) or len(step_expr) < 2:
|
|
raise EvalError("defkeyframes step must be (selector (css ...))")
|
|
selector = step_expr[0]
|
|
if isinstance(selector, Symbol):
|
|
selector = selector.name
|
|
else:
|
|
selector = str(selector)
|
|
body = _trampoline(_eval(step_expr[1], env))
|
|
if isinstance(body, StyleValue):
|
|
decls = body.declarations
|
|
elif isinstance(body, str):
|
|
decls = body
|
|
else:
|
|
raise EvalError(f"defkeyframes step body must be css/string, got {type(body).__name__}")
|
|
steps.append(f"{selector}{{{decls}}}")
|
|
|
|
kf_rule = f"@keyframes {kf_name}{{{' '.join(steps)}}}"
|
|
|
|
# Register in KEYFRAMES so animate-{name} works
|
|
KEYFRAMES[kf_name] = kf_rule
|
|
# Clear resolver cache so new keyframes are picked up
|
|
from .style_resolver import _resolve_cached
|
|
_resolve_cached.cache_clear()
|
|
|
|
# Create a StyleValue for the animation property
|
|
import hashlib
|
|
h = hashlib.sha256(kf_rule.encode()).hexdigest()[:6]
|
|
sv = StyleValue(
|
|
class_name=f"sx-{h}",
|
|
declarations=f"animation-name:{kf_name}",
|
|
keyframes=((kf_name, kf_rule),),
|
|
)
|
|
register_generated_rule(sv)
|
|
env[kf_name] = sv
|
|
return sv
|
|
|
|
|
|
def _sf_defcomp(expr: list, env: dict) -> Component:
|
|
"""``(defcomp ~name (&key param1 param2 &rest children) body)``"""
|
|
if len(expr) < 4:
|
|
raise EvalError("defcomp requires name, params, and body")
|
|
name_sym = expr[1]
|
|
if not isinstance(name_sym, Symbol):
|
|
raise EvalError(f"defcomp name must be symbol, got {type(name_sym).__name__}")
|
|
comp_name = name_sym.name.lstrip("~")
|
|
|
|
params_expr = expr[2]
|
|
if not isinstance(params_expr, list):
|
|
raise EvalError("defcomp params must be a list")
|
|
|
|
params: list[str] = []
|
|
has_children = False
|
|
in_key = False
|
|
for p in params_expr:
|
|
if isinstance(p, Symbol):
|
|
if p.name == "&key":
|
|
in_key = True
|
|
continue
|
|
if p.name == "&rest":
|
|
has_children = True
|
|
continue
|
|
if in_key or has_children:
|
|
if not has_children:
|
|
params.append(p.name)
|
|
else:
|
|
params.append(p.name)
|
|
# Skip children param name after &rest
|
|
elif isinstance(p, str):
|
|
params.append(p)
|
|
|
|
comp = Component(
|
|
name=comp_name,
|
|
params=params,
|
|
has_children=has_children,
|
|
body=expr[3],
|
|
closure=dict(env),
|
|
)
|
|
env[name_sym.name] = comp
|
|
return comp
|
|
|
|
|
|
def _sf_begin(expr: list, env: dict) -> Any:
|
|
if len(expr) < 2:
|
|
return NIL
|
|
for sub in expr[1:-1]:
|
|
_trampoline(_eval(sub, env))
|
|
return _Thunk(expr[-1], env)
|
|
|
|
|
|
def _sf_quote(expr: list, _env: dict) -> Any:
|
|
return expr[1] if len(expr) > 1 else NIL
|
|
|
|
|
|
def _sf_thread_first(expr: list, env: dict) -> Any:
|
|
"""``(-> val (f a) (g b))`` → ``(g (f val a) b)``"""
|
|
if len(expr) < 2:
|
|
raise EvalError("-> requires at least a value")
|
|
result = _trampoline(_eval(expr[1], env))
|
|
for form in expr[2:]:
|
|
if isinstance(form, list):
|
|
fn = _trampoline(_eval(form[0], env))
|
|
args = [result] + [_trampoline(_eval(a, env)) for a in form[1:]]
|
|
else:
|
|
fn = _trampoline(_eval(form, env))
|
|
args = [result]
|
|
if callable(fn) and not isinstance(fn, (Lambda, Component)):
|
|
result = fn(*args)
|
|
elif isinstance(fn, Lambda):
|
|
result = _trampoline(_call_lambda(fn, args, env))
|
|
else:
|
|
raise EvalError(f"-> form not callable: {fn!r}")
|
|
return result
|
|
|
|
|
|
def _sf_defmacro(expr: list, env: dict) -> Macro:
|
|
"""``(defmacro name (params... &rest rest) body)``"""
|
|
if len(expr) < 4:
|
|
raise EvalError("defmacro requires name, params, and body")
|
|
name_sym = expr[1]
|
|
if not isinstance(name_sym, Symbol):
|
|
raise EvalError(f"defmacro name must be symbol, got {type(name_sym).__name__}")
|
|
|
|
params_expr = expr[2]
|
|
if not isinstance(params_expr, list):
|
|
raise EvalError("defmacro params must be a list")
|
|
|
|
params: list[str] = []
|
|
rest_param: str | None = None
|
|
i = 0
|
|
while i < len(params_expr):
|
|
p = params_expr[i]
|
|
if isinstance(p, Symbol) and p.name == "&rest":
|
|
if i + 1 < len(params_expr):
|
|
rp = params_expr[i + 1]
|
|
rest_param = rp.name if isinstance(rp, Symbol) else str(rp)
|
|
break
|
|
if isinstance(p, Symbol):
|
|
params.append(p.name)
|
|
elif isinstance(p, str):
|
|
params.append(p)
|
|
i += 1
|
|
|
|
macro = Macro(
|
|
params=params,
|
|
rest_param=rest_param,
|
|
body=expr[3],
|
|
closure=dict(env),
|
|
name=name_sym.name,
|
|
)
|
|
env[name_sym.name] = macro
|
|
return macro
|
|
|
|
|
|
def _sf_quasiquote(expr: list, env: dict) -> Any:
|
|
"""``(quasiquote template)`` — process quasiquote template."""
|
|
if len(expr) < 2:
|
|
raise EvalError("quasiquote requires a template")
|
|
return _qq_expand(expr[1], env)
|
|
|
|
|
|
def _qq_expand(template: Any, env: dict) -> Any:
|
|
"""Walk a quasiquote template, replacing unquote/splice-unquote."""
|
|
if not isinstance(template, list):
|
|
return template
|
|
if not template:
|
|
return []
|
|
# Check for (unquote x) or (splice-unquote x)
|
|
head = template[0]
|
|
if isinstance(head, Symbol):
|
|
if head.name == "unquote":
|
|
if len(template) < 2:
|
|
raise EvalError("unquote requires an expression")
|
|
return _trampoline(_eval(template[1], env))
|
|
if head.name == "splice-unquote":
|
|
raise EvalError("splice-unquote not inside a list")
|
|
# Walk children, handling splice-unquote
|
|
result: list[Any] = []
|
|
for item in template:
|
|
if isinstance(item, list) and len(item) == 2 and isinstance(item[0], Symbol) and item[0].name == "splice-unquote":
|
|
spliced = _trampoline(_eval(item[1], env))
|
|
if isinstance(spliced, list):
|
|
result.extend(spliced)
|
|
elif spliced is not None and spliced is not NIL:
|
|
result.append(spliced)
|
|
else:
|
|
result.append(_qq_expand(item, env))
|
|
return result
|
|
|
|
|
|
def _expand_macro(macro: Macro, raw_args: list[Any], env: dict) -> Any:
|
|
"""Expand a macro: bind unevaluated args, evaluate body to get new AST."""
|
|
local = dict(macro.closure)
|
|
local.update(env)
|
|
|
|
# Bind positional params
|
|
for i, param in enumerate(macro.params):
|
|
if i < len(raw_args):
|
|
local[param] = raw_args[i]
|
|
else:
|
|
local[param] = NIL
|
|
|
|
# Bind &rest param
|
|
if macro.rest_param is not None:
|
|
rest_start = len(macro.params)
|
|
local[macro.rest_param] = list(raw_args[rest_start:])
|
|
|
|
return _trampoline(_eval(macro.body, local))
|
|
|
|
|
|
def _sf_defhandler(expr: list, env: dict) -> HandlerDef:
|
|
"""``(defhandler name (&key param...) body)``"""
|
|
if len(expr) < 4:
|
|
raise EvalError("defhandler requires name, params, and body")
|
|
name_sym = expr[1]
|
|
if not isinstance(name_sym, Symbol):
|
|
raise EvalError(f"defhandler name must be symbol, got {type(name_sym).__name__}")
|
|
|
|
params_expr = expr[2]
|
|
if not isinstance(params_expr, list):
|
|
raise EvalError("defhandler params must be a list")
|
|
|
|
params: list[str] = []
|
|
in_key = False
|
|
for p in params_expr:
|
|
if isinstance(p, Symbol):
|
|
if p.name == "&key":
|
|
in_key = True
|
|
continue
|
|
if in_key:
|
|
params.append(p.name)
|
|
elif isinstance(p, str):
|
|
params.append(p)
|
|
|
|
handler = HandlerDef(
|
|
name=name_sym.name,
|
|
params=params,
|
|
body=expr[3],
|
|
closure=dict(env),
|
|
)
|
|
env[f"handler:{name_sym.name}"] = handler
|
|
return handler
|
|
|
|
|
|
def _parse_key_params(params_expr: list) -> list[str]:
|
|
"""Parse ``(&key param1 param2 ...)`` into a list of param name strings."""
|
|
params: list[str] = []
|
|
in_key = False
|
|
for p in params_expr:
|
|
if isinstance(p, Symbol):
|
|
if p.name == "&key":
|
|
in_key = True
|
|
continue
|
|
if in_key:
|
|
params.append(p.name)
|
|
elif isinstance(p, str):
|
|
params.append(p)
|
|
return params
|
|
|
|
|
|
def _sf_defquery(expr: list, env: dict):
|
|
"""``(defquery name (&key param...) "docstring" body)``"""
|
|
from .types import QueryDef
|
|
if len(expr) < 4:
|
|
raise EvalError("defquery requires name, params, and body")
|
|
name_sym = expr[1]
|
|
if not isinstance(name_sym, Symbol):
|
|
raise EvalError(f"defquery name must be symbol, got {type(name_sym).__name__}")
|
|
params_expr = expr[2]
|
|
if not isinstance(params_expr, list):
|
|
raise EvalError("defquery params must be a list")
|
|
params = _parse_key_params(params_expr)
|
|
# Optional docstring before body
|
|
if len(expr) >= 5 and isinstance(expr[3], str):
|
|
doc = expr[3]
|
|
body = expr[4]
|
|
else:
|
|
doc = ""
|
|
body = expr[3]
|
|
qdef = QueryDef(
|
|
name=name_sym.name, params=params, doc=doc,
|
|
body=body, closure=dict(env),
|
|
)
|
|
env[f"query:{name_sym.name}"] = qdef
|
|
return qdef
|
|
|
|
|
|
def _sf_defaction(expr: list, env: dict):
|
|
"""``(defaction name (&key param...) "docstring" body)``"""
|
|
from .types import ActionDef
|
|
if len(expr) < 4:
|
|
raise EvalError("defaction requires name, params, and body")
|
|
name_sym = expr[1]
|
|
if not isinstance(name_sym, Symbol):
|
|
raise EvalError(f"defaction name must be symbol, got {type(name_sym).__name__}")
|
|
params_expr = expr[2]
|
|
if not isinstance(params_expr, list):
|
|
raise EvalError("defaction params must be a list")
|
|
params = _parse_key_params(params_expr)
|
|
if len(expr) >= 5 and isinstance(expr[3], str):
|
|
doc = expr[3]
|
|
body = expr[4]
|
|
else:
|
|
doc = ""
|
|
body = expr[3]
|
|
adef = ActionDef(
|
|
name=name_sym.name, params=params, doc=doc,
|
|
body=body, closure=dict(env),
|
|
)
|
|
env[f"action:{name_sym.name}"] = adef
|
|
return adef
|
|
|
|
|
|
def _sf_set_bang(expr: list, env: dict) -> Any:
|
|
"""``(set! name value)`` — mutate existing binding."""
|
|
if len(expr) != 3:
|
|
raise EvalError("set! requires name and value")
|
|
name_sym = expr[1]
|
|
if not isinstance(name_sym, Symbol):
|
|
raise EvalError(f"set! name must be symbol, got {type(name_sym).__name__}")
|
|
value = _trampoline(_eval(expr[2], env))
|
|
# Walk up scope if using Env objects; for plain dicts just overwrite
|
|
env[name_sym.name] = value
|
|
return value
|
|
|
|
|
|
_VALID_CARDINALITIES = {"one-to-one", "one-to-many", "many-to-many"}
|
|
_VALID_NAV = {"submenu", "tab", "badge", "inline", "hidden"}
|
|
|
|
|
|
def _sf_defrelation(expr: list, env: dict) -> RelationDef:
|
|
"""``(defrelation :name :from "t" :to "t" :cardinality :card ...)``"""
|
|
if len(expr) < 2:
|
|
raise EvalError("defrelation requires a name")
|
|
|
|
name_kw = expr[1]
|
|
if not isinstance(name_kw, Keyword):
|
|
raise EvalError(f"defrelation name must be a keyword, got {type(name_kw).__name__}")
|
|
rel_name = name_kw.name
|
|
|
|
# Parse keyword args from remaining elements
|
|
kwargs: dict[str, str | None] = {}
|
|
i = 2
|
|
while i < len(expr):
|
|
key = expr[i]
|
|
if isinstance(key, Keyword):
|
|
if i + 1 < len(expr):
|
|
val = expr[i + 1]
|
|
if isinstance(val, Keyword):
|
|
kwargs[key.name] = val.name
|
|
else:
|
|
kwargs[key.name] = _trampoline(_eval(val, env)) if not isinstance(val, str) else val
|
|
i += 2
|
|
else:
|
|
kwargs[key.name] = None
|
|
i += 1
|
|
else:
|
|
i += 1
|
|
|
|
for field in ("from", "to", "cardinality"):
|
|
if field not in kwargs:
|
|
raise EvalError(f"defrelation {rel_name} missing required :{field}")
|
|
|
|
card = kwargs["cardinality"]
|
|
if card not in _VALID_CARDINALITIES:
|
|
raise EvalError(
|
|
f"defrelation {rel_name}: invalid cardinality {card!r}, "
|
|
f"expected one of {_VALID_CARDINALITIES}"
|
|
)
|
|
|
|
nav = kwargs.get("nav", "hidden")
|
|
if nav not in _VALID_NAV:
|
|
raise EvalError(
|
|
f"defrelation {rel_name}: invalid nav {nav!r}, "
|
|
f"expected one of {_VALID_NAV}"
|
|
)
|
|
|
|
defn = RelationDef(
|
|
name=rel_name,
|
|
from_type=kwargs["from"],
|
|
to_type=kwargs["to"],
|
|
cardinality=card,
|
|
inverse=kwargs.get("inverse"),
|
|
nav=nav,
|
|
nav_icon=kwargs.get("nav-icon"),
|
|
nav_label=kwargs.get("nav-label"),
|
|
)
|
|
|
|
from .relations import register_relation
|
|
register_relation(defn)
|
|
|
|
env[f"relation:{rel_name}"] = defn
|
|
return defn
|
|
|
|
|
|
def _sf_defpage(expr: list, env: dict) -> PageDef:
|
|
"""``(defpage name :path "/..." :auth :public :content expr ...)``
|
|
|
|
Parses keyword args from the expression. All slot values are stored
|
|
as unevaluated AST — they are resolved at request time by execute_page().
|
|
"""
|
|
if len(expr) < 2:
|
|
raise EvalError("defpage requires a name")
|
|
name_sym = expr[1]
|
|
if not isinstance(name_sym, Symbol):
|
|
raise EvalError(f"defpage name must be symbol, got {type(name_sym).__name__}")
|
|
|
|
# Parse keyword args — values are NOT evaluated (stored as AST)
|
|
slots: dict[str, Any] = {}
|
|
i = 2
|
|
while i < len(expr):
|
|
key = expr[i]
|
|
if isinstance(key, Keyword) and i + 1 < len(expr):
|
|
slots[key.name] = expr[i + 1]
|
|
i += 2
|
|
else:
|
|
i += 1
|
|
|
|
# Required fields
|
|
path = slots.get("path")
|
|
if path is None:
|
|
raise EvalError(f"defpage {name_sym.name} missing required :path")
|
|
if not isinstance(path, str):
|
|
raise EvalError(f"defpage {name_sym.name} :path must be a string")
|
|
|
|
auth_val = slots.get("auth", "public")
|
|
if isinstance(auth_val, Keyword):
|
|
auth: str | list = auth_val.name
|
|
elif isinstance(auth_val, list):
|
|
# (:rights "a" "b") → ["rights", "a", "b"]
|
|
auth = []
|
|
for item in auth_val:
|
|
if isinstance(item, Keyword):
|
|
auth.append(item.name)
|
|
elif isinstance(item, str):
|
|
auth.append(item)
|
|
else:
|
|
auth.append(_trampoline(_eval(item, env)))
|
|
else:
|
|
auth = str(auth_val) if auth_val else "public"
|
|
|
|
# Layout — keep unevaluated
|
|
layout = slots.get("layout")
|
|
if isinstance(layout, Keyword):
|
|
layout = layout.name
|
|
elif isinstance(layout, list):
|
|
# Keep as unevaluated list for execute_page to resolve at request time
|
|
pass
|
|
|
|
# Cache — evaluate if present (it's a static config dict)
|
|
cache_val = slots.get("cache")
|
|
cache = None
|
|
if cache_val is not None:
|
|
cache_result = _trampoline(_eval(cache_val, env))
|
|
if isinstance(cache_result, dict):
|
|
cache = cache_result
|
|
|
|
page = PageDef(
|
|
name=name_sym.name,
|
|
path=path,
|
|
auth=auth,
|
|
layout=layout,
|
|
cache=cache,
|
|
data_expr=slots.get("data"),
|
|
content_expr=slots.get("content"),
|
|
filter_expr=slots.get("filter"),
|
|
aside_expr=slots.get("aside"),
|
|
menu_expr=slots.get("menu"),
|
|
closure=dict(env),
|
|
)
|
|
env[f"page:{name_sym.name}"] = page
|
|
return page
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Delimited continuations — shift / reset
|
|
# ---------------------------------------------------------------------------
|
|
|
|
_RESET_RESUME = [] # stack of resume values; empty = not resuming
|
|
|
|
_RESET_SENTINEL = object()
|
|
|
|
|
|
def _sf_reset(expr, env):
|
|
"""(reset body) — establish a continuation delimiter."""
|
|
body = expr[1]
|
|
try:
|
|
return _trampoline(_eval(body, env))
|
|
except _ShiftSignal as sig:
|
|
def cont_fn(value=NIL):
|
|
_RESET_RESUME.append(value)
|
|
try:
|
|
return _trampoline(_eval(body, env))
|
|
finally:
|
|
_RESET_RESUME.pop()
|
|
k = Continuation(cont_fn)
|
|
sig_env = dict(sig.env)
|
|
sig_env[sig.k_name] = k
|
|
return _trampoline(_eval(sig.body, sig_env))
|
|
|
|
|
|
def _sf_shift(expr, env):
|
|
"""(shift k body) — capture continuation to nearest reset."""
|
|
if _RESET_RESUME:
|
|
return _RESET_RESUME[-1]
|
|
k_name = expr[1].name # symbol
|
|
body = expr[2]
|
|
raise _ShiftSignal(k_name, body, env)
|
|
|
|
|
|
_SPECIAL_FORMS: dict[str, Any] = {
|
|
"if": _sf_if,
|
|
"when": _sf_when,
|
|
"cond": _sf_cond,
|
|
"case": _sf_case,
|
|
"and": _sf_and,
|
|
"or": _sf_or,
|
|
"let": _sf_let,
|
|
"let*": _sf_let,
|
|
"lambda": _sf_lambda,
|
|
"fn": _sf_lambda,
|
|
"define": _sf_define,
|
|
"defstyle": _sf_defstyle,
|
|
"defkeyframes": _sf_defkeyframes,
|
|
"defcomp": _sf_defcomp,
|
|
"defrelation": _sf_defrelation,
|
|
"begin": _sf_begin,
|
|
"do": _sf_begin,
|
|
"quote": _sf_quote,
|
|
"->": _sf_thread_first,
|
|
"set!": _sf_set_bang,
|
|
"defmacro": _sf_defmacro,
|
|
"quasiquote": _sf_quasiquote,
|
|
"defhandler": _sf_defhandler,
|
|
"defpage": _sf_defpage,
|
|
"defquery": _sf_defquery,
|
|
"defaction": _sf_defaction,
|
|
"reset": _sf_reset,
|
|
"shift": _sf_shift,
|
|
}
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Higher-order forms (need to evaluate the fn arg first)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _ho_map(expr: list, env: dict) -> list:
|
|
if len(expr) != 3:
|
|
raise EvalError("map requires fn and collection")
|
|
fn = _trampoline(_eval(expr[1], env))
|
|
coll = _trampoline(_eval(expr[2], env))
|
|
if isinstance(fn, Lambda):
|
|
return [_trampoline(_call_lambda(fn, [item], env)) for item in coll]
|
|
if callable(fn):
|
|
return [fn(item) for item in coll]
|
|
raise EvalError(f"map requires lambda, got {type(fn).__name__}")
|
|
|
|
|
|
def _ho_map_indexed(expr: list, env: dict) -> list:
|
|
if len(expr) != 3:
|
|
raise EvalError("map-indexed requires fn and collection")
|
|
fn = _trampoline(_eval(expr[1], env))
|
|
coll = _trampoline(_eval(expr[2], env))
|
|
if not isinstance(fn, Lambda):
|
|
raise EvalError(f"map-indexed requires lambda, got {type(fn).__name__}")
|
|
if len(fn.params) < 2:
|
|
raise EvalError("map-indexed lambda needs (i item) params")
|
|
return [_trampoline(_call_lambda(fn, [i, item], env)) for i, item in enumerate(coll)]
|
|
|
|
|
|
def _ho_filter(expr: list, env: dict) -> list:
|
|
if len(expr) != 3:
|
|
raise EvalError("filter requires fn and collection")
|
|
fn = _trampoline(_eval(expr[1], env))
|
|
coll = _trampoline(_eval(expr[2], env))
|
|
if not isinstance(fn, Lambda):
|
|
raise EvalError(f"filter requires lambda, got {type(fn).__name__}")
|
|
return [item for item in coll if _trampoline(_call_lambda(fn, [item], env))]
|
|
|
|
|
|
def _ho_reduce(expr: list, env: dict) -> Any:
|
|
if len(expr) != 4:
|
|
raise EvalError("reduce requires fn, init, and collection")
|
|
fn = _trampoline(_eval(expr[1], env))
|
|
acc = _trampoline(_eval(expr[2], env))
|
|
coll = _trampoline(_eval(expr[3], env))
|
|
if not isinstance(fn, Lambda):
|
|
raise EvalError(f"reduce requires lambda, got {type(fn).__name__}")
|
|
for item in coll:
|
|
acc = _trampoline(_call_lambda(fn, [acc, item], env))
|
|
return acc
|
|
|
|
|
|
def _ho_some(expr: list, env: dict) -> Any:
|
|
if len(expr) != 3:
|
|
raise EvalError("some requires fn and collection")
|
|
fn = _trampoline(_eval(expr[1], env))
|
|
coll = _trampoline(_eval(expr[2], env))
|
|
if not isinstance(fn, Lambda):
|
|
raise EvalError(f"some requires lambda, got {type(fn).__name__}")
|
|
for item in coll:
|
|
result = _trampoline(_call_lambda(fn, [item], env))
|
|
if result:
|
|
return result
|
|
return NIL
|
|
|
|
|
|
def _ho_every(expr: list, env: dict) -> bool:
|
|
if len(expr) != 3:
|
|
raise EvalError("every? requires fn and collection")
|
|
fn = _trampoline(_eval(expr[1], env))
|
|
coll = _trampoline(_eval(expr[2], env))
|
|
if not isinstance(fn, Lambda):
|
|
raise EvalError(f"every? requires lambda, got {type(fn).__name__}")
|
|
for item in coll:
|
|
if not _trampoline(_call_lambda(fn, [item], env)):
|
|
return False
|
|
return True
|
|
|
|
|
|
def _ho_for_each(expr: list, env: dict) -> Any:
|
|
if len(expr) != 3:
|
|
raise EvalError("for-each requires fn and collection")
|
|
fn = _trampoline(_eval(expr[1], env))
|
|
coll = _trampoline(_eval(expr[2], env))
|
|
if not isinstance(fn, Lambda):
|
|
raise EvalError(f"for-each requires lambda, got {type(fn).__name__}")
|
|
for item in coll:
|
|
_trampoline(_call_lambda(fn, [item], env))
|
|
return NIL
|
|
|
|
|
|
_HO_FORMS: dict[str, Any] = {
|
|
"map": _ho_map,
|
|
"map-indexed": _ho_map_indexed,
|
|
"filter": _ho_filter,
|
|
"reduce": _ho_reduce,
|
|
"some": _ho_some,
|
|
"every?": _ho_every,
|
|
"for-each": _ho_for_each,
|
|
}
|