"""
S-expression evaluator.

Walks a parsed s-expression tree and evaluates it in an environment.

Special forms:
    (if cond then else?)
    (when cond body)
    (cond clause...)  — Scheme-style ((test body)...) or
                        Clojure-style (test body...)
    (case expr val body... :else default)
    (and expr...)
    (or expr...)
    (let ((name val)...) body) or (let (name val name val...) body)
    (lambda (params...) body) or (fn (params...) body)
    (define name value)
    (defcomp ~name (&key param...) body)
    (defrelation :name :from "type" :to "type" :cardinality :card ...)
    (begin expr...)
    (quote expr)
    (do expr...)  — alias for begin
    (-> val form...)  — thread-first macro

Higher-order forms (operate on lambdas):
    (map fn coll)
    (map-indexed fn coll)
    (filter fn coll)
    (reduce fn init coll)
    (some fn coll)
    (every? fn coll)
    (for-each fn coll)
"""

from __future__ import annotations

from typing import Any

from .types import Component, Continuation, HandlerDef, Keyword, Lambda, Macro, NIL, PageDef, RelationDef, Symbol, _ShiftSignal
from .primitives import _PRIMITIVES


class EvalError(Exception):
    """Error during expression evaluation."""
    pass


class _Thunk:
    """Deferred evaluation — returned from tail positions for TCO."""

    __slots__ = ("expr", "env")

    def __init__(self, expr: Any, env: dict[str, Any]):
        self.expr = expr
        self.env = env


def _trampoline(val: Any) -> Any:
    """Unwrap thunks by re-entering the evaluator until we get an actual value."""
    while isinstance(val, _Thunk):
        val = _eval(val.expr, val.env)
    return val


# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------

def evaluate(expr: Any, env: dict[str, Any] | None = None) -> Any:
    """Evaluate *expr* in *env* and return the fully resolved result.

    A fresh empty environment is created when *env* is ``None``.  Any
    tail-call thunk produced by the evaluator is unwrapped before returning.
    """
    if env is None:
        env = {}
    return _trampoline(_eval(expr, env))


def make_env(**kwargs: Any) -> dict[str, Any]:
    """Convenience: create an environment dict with initial bindings."""
    return dict(kwargs)


# ---------------------------------------------------------------------------
# Internal evaluator
# ---------------------------------------------------------------------------

def _eval(expr: Any, env: dict[str, Any]) -> Any:
    """Evaluate one expression; may return a ``_Thunk`` from tail positions.

    Callers that need a concrete value must pass the result through
    ``_trampoline``.

    Raises:
        EvalError: on an undefined symbol or when the head of a call does
            not evaluate to something callable.
    """
    # --- literals ---------------------------------------------------------
    if isinstance(expr, (int, float, str, bool)):
        return expr
    if expr is None or expr is NIL:
        return NIL

    # --- symbol lookup ----------------------------------------------------
    if isinstance(expr, Symbol):
        name = expr.name
        if name in env:
            return env[name]
        if name in _PRIMITIVES:
            return _PRIMITIVES[name]
        if name == "true":
            return True
        if name == "false":
            return False
        if name == "nil":
            return NIL
        raise EvalError(f"Undefined symbol: {name}")

    # --- keyword → its string name ----------------------------------------
    if isinstance(expr, Keyword):
        return expr.name

    # --- dict literal -----------------------------------------------------
    if isinstance(expr, dict):
        return {k: _trampoline(_eval(v, env)) for k, v in expr.items()}

    # --- list = call or special form --------------------------------------
    if not isinstance(expr, list):
        return expr
    if not expr:
        return []

    head = expr[0]

    # If head is not a symbol/lambda/list, treat entire list as data
    if not isinstance(head, (Symbol, Lambda, list)):
        return [_trampoline(_eval(x, env)) for x in expr]

    # --- special forms ----------------------------------------------------
    if isinstance(head, Symbol):
        name = head.name
        handler = _SPECIAL_FORMS.get(name)
        if handler is not None:
            return handler(expr, env)

        # Higher-order forms (need lazy eval of lambda arg)
        ho = _HO_FORMS.get(name)
        if ho is not None:
            return ho(expr, env)

        # Macro expansion — if head resolves to a Macro, expand then eval
        if name in env:
            val = env[name]
            if isinstance(val, Macro):
                expanded = _expand_macro(val, expr[1:], env)
                return _Thunk(expanded, env)

    # --- function / lambda call -------------------------------------------
    fn = _trampoline(_eval(head, env))

    # Components receive their *raw* argument forms and evaluate them inside
    # _call_component, so dispatch before evaluating arguments here —
    # otherwise every argument would be evaluated twice.
    if isinstance(fn, Component):
        return _call_component(fn, expr[1:], env)

    args = [_trampoline(_eval(a, env)) for a in expr[1:]]
    if isinstance(fn, Lambda):
        return _call_lambda(fn, args, env)
    if callable(fn):
        return fn(*args)
    raise EvalError(f"Not callable: {fn!r}")


# ---------------------------------------------------------------------------
# Lambda / component invocation
# ---------------------------------------------------------------------------

def _call_lambda(fn: Lambda, args: list[Any], caller_env: dict[str, Any]) -> Any:
    """Bind *args* to the lambda's params and return a thunk for its body.

    The call environment is the lambda's closure overlaid with *caller_env*
    (caller bindings win over captured ones), then the parameters.

    Raises:
        EvalError: if the number of arguments differs from the number of
            parameters.
    """
    if len(args) != len(fn.params):
        raise EvalError(f"{fn!r} expects {len(fn.params)} args, got {len(args)}")
    local = dict(fn.closure)
    local.update(caller_env)
    for p, v in zip(fn.params, args):
        local[p] = v
    return _Thunk(fn.body, local)


def _call_component(comp: Component, raw_args: list[Any], env: dict[str, Any]) -> Any:
    """Evaluate a component invocation with keyword arguments.

    ``(~card :title "Hello" (p "child"))`` →
    comp.params gets ``title="Hello"``, comp children gets ``[(p "child")]``

    *raw_args* are unevaluated forms; each is evaluated exactly once here.
    Declared params that were not supplied are bound to ``NIL``.
    """
    kwargs: dict[str, Any] = {}
    children: list[Any] = []
    i = 0
    while i < len(raw_args):
        arg = raw_args[i]
        if isinstance(arg, Keyword) and i + 1 < len(raw_args):
            # Keyword followed by a value: a named argument.
            kwargs[arg.name] = _trampoline(_eval(raw_args[i + 1], env))
            i += 2
        else:
            children.append(_trampoline(_eval(arg, env)))
            i += 1

    local = dict(comp.closure)
    local.update(env)
    for p in comp.params:
        if p in kwargs:
            local[p] = kwargs[p]
        else:
            local[p] = NIL
    if comp.has_children:
        local["children"] = children
    return _Thunk(comp.body, local)


# ---------------------------------------------------------------------------
# Special forms
# ---------------------------------------------------------------------------

def _sf_if(expr: list, env: dict) -> Any:
    """``(if cond then else?)`` — branches are returned as tail thunks."""
    if len(expr) < 3:
        raise EvalError("if requires condition and then-branch")
    cond = _trampoline(_eval(expr[1], env))
    if cond and cond is not NIL:
        return _Thunk(expr[2], env)
    if len(expr) > 3:
        return _Thunk(expr[3], env)
    return NIL


def _sf_when(expr: list, env: dict) -> Any:
    """``(when cond body...)`` — run the body only when *cond* is truthy."""
    if len(expr) < 3:
        raise EvalError("when requires condition and body")
    cond = _trampoline(_eval(expr[1], env))
    if cond and cond is not NIL:
        # All but the last body form are evaluated for effect; the last is
        # in tail position.
        for body_expr in expr[2:-1]:
            _trampoline(_eval(body_expr, env))
        return _Thunk(expr[-1], env)
    return NIL


def _sf_cond(expr: list, env: dict) -> Any:
    """``(cond ...)`` in Scheme style ``((test body)...)`` or Clojure style
    ``(test body ...)``.  Returns ``NIL`` when no clause matches.
    """
    clauses = expr[1:]
    if not clauses:
        return NIL

    # Detect scheme-style: first clause is a 2-element list that isn't a comparison
    if (
        isinstance(clauses[0], list)
        and len(clauses[0]) == 2
        and not (isinstance(clauses[0][0], Symbol) and clauses[0][0].name in (
            "=", "<", ">", "<=", ">=", "!=", "and", "or",
        ))
    ):
        for clause in clauses:
            if not isinstance(clause, list) or len(clause) < 2:
                raise EvalError("cond clause must be (test result)")
            test = clause[0]
            if isinstance(test, Symbol) and test.name in ("else", ":else"):
                return _Thunk(clause[1], env)
            if isinstance(test, Keyword) and test.name == "else":
                return _Thunk(clause[1], env)
            if _trampoline(_eval(test, env)):
                return _Thunk(clause[1], env)
    else:
        # Clojure-style: flat alternating test / result pairs.
        i = 0
        while i < len(clauses) - 1:
            test = clauses[i]
            result = clauses[i + 1]
            if isinstance(test, Keyword) and test.name == "else":
                return _Thunk(result, env)
            if isinstance(test, Symbol) and test.name in (":else", "else"):
                return _Thunk(result, env)
            if _trampoline(_eval(test, env)):
                return _Thunk(result, env)
            i += 2
    return NIL


def _sf_case(expr: list, env: dict) -> Any:
    """``(case expr val body... :else default)`` — equality dispatch."""
    if len(expr) < 2:
        raise EvalError("case requires expression to match")
    match_val = _trampoline(_eval(expr[1], env))
    clauses = expr[2:]
    i = 0
    while i < len(clauses) - 1:
        test = clauses[i]
        result = clauses[i + 1]
        if isinstance(test, Keyword) and test.name == "else":
            return _Thunk(result, env)
        if isinstance(test, Symbol) and test.name in (":else", "else"):
            return _Thunk(result, env)
        if match_val == _trampoline(_eval(test, env)):
            return _Thunk(result, env)
        i += 2
    return NIL


def _sf_and(expr: list, env: dict) -> Any:
    """``(and expr...)`` — first falsy value, else the last value (``True``
    when there are no operands)."""
    result: Any = True
    for arg in expr[1:]:
        result = _trampoline(_eval(arg, env))
        if not result:
            return result
    return result


def _sf_or(expr: list, env: dict) -> Any:
    """``(or expr...)`` — first truthy value, else the last value (``False``
    when there are no operands)."""
    result: Any = False
    for arg in expr[1:]:
        result = _trampoline(_eval(arg, env))
        if result:
            return result
    return result


def _sf_let(expr: list, env: dict) -> Any:
    """``(let bindings body...)`` with sequential bindings.

    Bindings may be Scheme-style ``((name val) ...)`` or Clojure-style
    ``(name val name val ...)``.  Each value is evaluated in the scope that
    already contains the earlier bindings.  A symbol in the bindings
    position selects the named-let form.
    """
    if len(expr) < 3:
        raise EvalError("let requires bindings and body")

    # Named let: (let name ((x 0) ...) body)
    if isinstance(expr[1], Symbol):
        return _sf_named_let(expr, env)

    bindings = expr[1]
    local = dict(env)

    if isinstance(bindings, list):
        if bindings and isinstance(bindings[0], list):
            # Scheme-style: ((name val) ...)
            for binding in bindings:
                if len(binding) != 2:
                    raise EvalError("let binding must be (name value)")
                var = binding[0]
                vname = var.name if isinstance(var, Symbol) else var
                local[vname] = _trampoline(_eval(binding[1], local))
        elif len(bindings) % 2 == 0:
            # Clojure-style: (name val name val ...)
            for i in range(0, len(bindings), 2):
                var = bindings[i]
                vname = var.name if isinstance(var, Symbol) else var
                local[vname] = _trampoline(_eval(bindings[i + 1], local))
        else:
            raise EvalError("let bindings must be (name val ...) pairs")
    else:
        raise EvalError("let bindings must be a list")

    # Evaluate body expressions — all but last non-tail, last is tail
    body = expr[2:]
    for body_expr in body[:-1]:
        _trampoline(_eval(body_expr, local))
    return _Thunk(body[-1], local)


def _sf_named_let(expr: list, env: dict) -> Any:
    """``(let name ((x 0) (y 1)) body...)`` — self-recursive loop.

    Desugars to a lambda bound to *name* whose closure includes itself,
    called with the initial values.  Tail calls to *name* produce TCO thunks.
    """
    loop_name = expr[1].name
    bindings = expr[2]
    body = expr[3:]

    params: list[str] = []
    inits: list[Any] = []

    if isinstance(bindings, list):
        if bindings and isinstance(bindings[0], list):
            for binding in bindings:
                var = binding[0]
                params.append(var.name if isinstance(var, Symbol) else var)
                inits.append(binding[1])
        elif len(bindings) % 2 == 0:
            for i in range(0, len(bindings), 2):
                var = bindings[i]
                params.append(var.name if isinstance(var, Symbol) else var)
                inits.append(bindings[i + 1])

    # Build loop body (wrap in begin if multiple expressions)
    loop_body = body[0] if len(body) == 1 else [Symbol("begin")] + list(body)

    # Create self-recursive lambda
    loop_fn = Lambda(params, loop_body, dict(env), name=loop_name)
    loop_fn.closure[loop_name] = loop_fn

    # Evaluate initial values in enclosing env, then call
    init_vals = [_trampoline(_eval(init, env)) for init in inits]
    return _call_lambda(loop_fn, init_vals, env)


def _sf_letrec(expr: list, env: dict) -> Any:
    """``(letrec ((name1 val1) ...) body)`` — mutually recursive bindings.

    All names are bound to NIL first, then values are evaluated (so they
    can reference each other), then lambda closures are patched.
    """
    if len(expr) < 3:
        raise EvalError("letrec requires bindings and body")

    bindings = expr[1]
    local = dict(env)
    names: list[str] = []
    val_exprs: list[Any] = []

    if isinstance(bindings, list):
        if bindings and isinstance(bindings[0], list):
            for binding in bindings:
                var = binding[0]
                vname = var.name if isinstance(var, Symbol) else var
                names.append(vname)
                val_exprs.append(binding[1])
                local[vname] = NIL
        elif len(bindings) % 2 == 0:
            for i in range(0, len(bindings), 2):
                var = bindings[i]
                vname = var.name if isinstance(var, Symbol) else var
                names.append(vname)
                val_exprs.append(bindings[i + 1])
                local[vname] = NIL

    # Evaluate all values — they can see each other's names (initially NIL)
    values = [_trampoline(_eval(ve, local)) for ve in val_exprs]

    # Bind final values
    for name, val in zip(names, values):
        local[name] = val

    # Patch lambda closures so they see the final bindings
    for val in values:
        if isinstance(val, Lambda):
            for name in names:
                val.closure[name] = local[name]

    body = expr[2:]
    for body_expr in body[:-1]:
        _trampoline(_eval(body_expr, local))
    return _Thunk(body[-1], local)


def _sf_dynamic_wind(expr: list, env: dict) -> Any:
    """``(dynamic-wind before body after)`` — entry/exit guards.

    All three arguments are thunks (zero-arg functions).  *before* is called
    on entry, *after* is always called on exit (even on error).  The wind
    stack is maintained for future continuation support.
    """
    if len(expr) != 4:
        raise EvalError("dynamic-wind requires 3 arguments (before, body, after)")

    before = _trampoline(_eval(expr[1], env))
    body_fn = _trampoline(_eval(expr[2], env))
    after = _trampoline(_eval(expr[3], env))

    def _call_thunk(fn: Any) -> Any:
        if isinstance(fn, Lambda):
            return _trampoline(_call_lambda(fn, [], env))
        if callable(fn):
            return fn()
        raise EvalError(f"dynamic-wind: expected thunk, got {type(fn).__name__}")

    # Entry
    _call_thunk(before)
    _WIND_STACK.append((before, after))
    try:
        result = _call_thunk(body_fn)
    finally:
        _WIND_STACK.pop()
        _call_thunk(after)
    return result


# Wind stack for dynamic-wind (thread-safe enough for sync evaluator)
_WIND_STACK: list[tuple] = []


def _sf_lambda(expr: list, env: dict) -> Lambda:
    """``(lambda (params...) body)`` / ``(fn (params...) body)``.

    The new lambda captures a snapshot copy of *env* as its closure.
    """
    if len(expr) < 3:
        raise EvalError("lambda requires params and body")
    params_expr = expr[1]
    if not isinstance(params_expr, list):
        raise EvalError("lambda params must be a list")

    param_names = []
    for p in params_expr:
        if isinstance(p, Symbol):
            param_names.append(p.name)
        elif isinstance(p, str):
            param_names.append(p)
        else:
            raise EvalError(f"Invalid lambda param: {p}")
    return Lambda(param_names, expr[2], dict(env))


def _sf_define(expr: list, env: dict) -> Any:
    """``(define name value)`` — bind *name* in *env* and return the value.

    An anonymous lambda value is given the defined name.
    """
    if len(expr) < 3:
        raise EvalError("define requires name and value")
    name_sym = expr[1]
    if not isinstance(name_sym, Symbol):
        raise EvalError(f"define name must be symbol, got {type(name_sym).__name__}")
    value = _trampoline(_eval(expr[2], env))
    if isinstance(value, Lambda) and value.name is None:
        value.name = name_sym.name
    env[name_sym.name] = value
    return value


def _sf_defstyle(expr: list, env: dict) -> Any:
    """``(defstyle card-base (css :rounded-xl :bg-white :shadow))``

    Evaluates body → StyleValue, binds to name in env.
    """
    if len(expr) < 3:
        raise EvalError("defstyle requires name and body")
    name_sym = expr[1]
    if not isinstance(name_sym, Symbol):
        raise EvalError(f"defstyle name must be symbol, got {type(name_sym).__name__}")
    value = _trampoline(_eval(expr[2], env))
    env[name_sym.name] = value
    return value


def _sf_defkeyframes(expr: list, env: dict) -> Any:
    """``(defkeyframes fade-in (from (css :opacity-0)) (to (css :opacity-100)))``

    Builds @keyframes rule from steps, registers it, and binds the animation.
    """
    from .types import StyleValue
    from .css_registry import register_generated_rule
    from .style_dict import KEYFRAMES

    if len(expr) < 3:
        raise EvalError("defkeyframes requires name and at least one step")
    name_sym = expr[1]
    if not isinstance(name_sym, Symbol):
        raise EvalError(f"defkeyframes name must be symbol, got {type(name_sym).__name__}")

    kf_name = name_sym.name

    # Build @keyframes rule from steps
    steps: list[str] = []
    for step_expr in expr[2:]:
        if not isinstance(step_expr, list) or len(step_expr) < 2:
            raise EvalError("defkeyframes step must be (selector (css ...))")
        selector = step_expr[0]
        if isinstance(selector, Symbol):
            selector = selector.name
        else:
            selector = str(selector)
        body = _trampoline(_eval(step_expr[1], env))
        if isinstance(body, StyleValue):
            decls = body.declarations
        elif isinstance(body, str):
            decls = body
        else:
            raise EvalError(f"defkeyframes step body must be css/string, got {type(body).__name__}")
        steps.append(f"{selector}{{{decls}}}")

    kf_rule = f"@keyframes {kf_name}{{{' '.join(steps)}}}"

    # Register in KEYFRAMES so animate-{name} works
    KEYFRAMES[kf_name] = kf_rule
    # Clear resolver cache so new keyframes are picked up
    from .style_resolver import _resolve_cached
    _resolve_cached.cache_clear()

    # Create a StyleValue for the animation property
    import hashlib
    h = hashlib.sha256(kf_rule.encode()).hexdigest()[:6]
    sv = StyleValue(
        class_name=f"sx-{h}",
        declarations=f"animation-name:{kf_name}",
        keyframes=((kf_name, kf_rule),),
    )
    register_generated_rule(sv)
    env[kf_name] = sv
    return sv


def _sf_defcomp(expr: list, env: dict) -> Component:
    """``(defcomp ~name (&key param1 param2 &rest children) body)``

    Binds the component in *env* under the symbol's full name; the
    component's own ``name`` has the leading ``~`` stripped.
    """
    if len(expr) < 4:
        raise EvalError("defcomp requires name, params, and body")
    name_sym = expr[1]
    if not isinstance(name_sym, Symbol):
        raise EvalError(f"defcomp name must be symbol, got {type(name_sym).__name__}")
    comp_name = name_sym.name.lstrip("~")

    params_expr = expr[2]
    if not isinstance(params_expr, list):
        raise EvalError("defcomp params must be a list")

    params: list[str] = []
    has_children = False
    for p in params_expr:
        if isinstance(p, Symbol):
            if p.name == "&key":
                continue
            if p.name == "&rest":
                has_children = True
                continue
            # Skip children param name after &rest: children are always
            # bound under the fixed name "children" at call time.
            if not has_children:
                params.append(p.name)
        elif isinstance(p, str):
            params.append(p)

    comp = Component(
        name=comp_name,
        params=params,
        has_children=has_children,
        body=expr[3],
        closure=dict(env),
    )
    env[name_sym.name] = comp
    return comp


def _sf_begin(expr: list, env: dict) -> Any:
    """``(begin expr...)`` / ``(do expr...)`` — sequence; last form is the
    tail.  An empty body yields ``NIL``."""
    if len(expr) < 2:
        return NIL
    for sub in expr[1:-1]:
        _trampoline(_eval(sub, env))
    return _Thunk(expr[-1], env)


def _sf_quote(expr: list, _env: dict) -> Any:
    """``(quote expr)`` — return the form unevaluated (``NIL`` if absent)."""
    return expr[1] if len(expr) > 1 else NIL


def _sf_thread_first(expr: list, env: dict) -> Any:
    """``(-> val (f a) (g b))`` → ``(g (f val a) b)``"""
    if len(expr) < 2:
        raise EvalError("-> requires at least a value")
    result = _trampoline(_eval(expr[1], env))
    for form in expr[2:]:
        if isinstance(form, list):
            fn = _trampoline(_eval(form[0], env))
            args = [result] + [_trampoline(_eval(a, env)) for a in form[1:]]
        else:
            # Bare function: called with the threaded value only.
            fn = _trampoline(_eval(form, env))
            args = [result]
        if callable(fn) and not isinstance(fn, (Lambda, Component)):
            result = fn(*args)
        elif isinstance(fn, Lambda):
            result = _trampoline(_call_lambda(fn, args, env))
        else:
            raise EvalError(f"-> form not callable: {fn!r}")
    return result


def _sf_defmacro(expr: list, env: dict) -> Macro:
    """``(defmacro name (params... &rest rest) body)``"""
    if len(expr) < 4:
        raise EvalError("defmacro requires name, params, and body")
    name_sym = expr[1]
    if not isinstance(name_sym, Symbol):
        raise EvalError(f"defmacro name must be symbol, got {type(name_sym).__name__}")

    params_expr = expr[2]
    if not isinstance(params_expr, list):
        raise EvalError("defmacro params must be a list")

    params: list[str] = []
    rest_param: str | None = None
    i = 0
    while i < len(params_expr):
        p = params_expr[i]
        if isinstance(p, Symbol) and p.name == "&rest":
            if i + 1 < len(params_expr):
                rp = params_expr[i + 1]
                rest_param = rp.name if isinstance(rp, Symbol) else str(rp)
            # Nothing after the &rest parameter is considered.
            break
        if isinstance(p, Symbol):
            params.append(p.name)
        elif isinstance(p, str):
            params.append(p)
        i += 1

    macro = Macro(
        params=params,
        rest_param=rest_param,
        body=expr[3],
        closure=dict(env),
        name=name_sym.name,
    )
    env[name_sym.name] = macro
    return macro


def _sf_quasiquote(expr: list, env: dict) -> Any:
    """``(quasiquote template)`` — process quasiquote template."""
    if len(expr) < 2:
        raise EvalError("quasiquote requires a template")
    return _qq_expand(expr[1], env)


def _qq_expand(template: Any, env: dict) -> Any:
    """Walk a quasiquote template, replacing unquote/splice-unquote."""
    if not isinstance(template, list):
        return template
    if not template:
        return []

    # Check for (unquote x) or (splice-unquote x)
    head = template[0]
    if isinstance(head, Symbol):
        if head.name == "unquote":
            if len(template) < 2:
                raise EvalError("unquote requires an expression")
            return _trampoline(_eval(template[1], env))
        if head.name == "splice-unquote":
            raise EvalError("splice-unquote not inside a list")

    # Walk children, handling splice-unquote
    result: list[Any] = []
    for item in template:
        if (
            isinstance(item, list)
            and len(item) == 2
            and isinstance(item[0], Symbol)
            and item[0].name == "splice-unquote"
        ):
            spliced = _trampoline(_eval(item[1], env))
            if isinstance(spliced, list):
                result.extend(spliced)
            elif spliced is not None and spliced is not NIL:
                result.append(spliced)
        else:
            result.append(_qq_expand(item, env))
    return result


def _expand_macro(macro: Macro, raw_args: list[Any], env: dict) -> Any:
    """Expand a macro: bind unevaluated args, evaluate body to get new AST."""
    local = dict(macro.closure)
    local.update(env)

    # Bind positional params
    for i, param in enumerate(macro.params):
        if i < len(raw_args):
            local[param] = raw_args[i]
        else:
            local[param] = NIL

    # Bind &rest param
    if macro.rest_param is not None:
        rest_start = len(macro.params)
        local[macro.rest_param] = list(raw_args[rest_start:])

    return _trampoline(_eval(macro.body, local))


def _sf_defhandler(expr: list, env: dict) -> HandlerDef:
    """``(defhandler name (&key param...) body)``

    The handler is stored in *env* under ``handler:<name>``.
    """
    if len(expr) < 4:
        raise EvalError("defhandler requires name, params, and body")
    name_sym = expr[1]
    if not isinstance(name_sym, Symbol):
        raise EvalError(f"defhandler name must be symbol, got {type(name_sym).__name__}")

    params_expr = expr[2]
    if not isinstance(params_expr, list):
        raise EvalError("defhandler params must be a list")

    # Same (&key ...) parsing as defquery/defaction.
    params = _parse_key_params(params_expr)

    handler = HandlerDef(
        name=name_sym.name,
        params=params,
        body=expr[3],
        closure=dict(env),
    )
    env[f"handler:{name_sym.name}"] = handler
    return handler


def _parse_key_params(params_expr: list) -> list[str]:
    """Parse ``(&key param1 param2 ...)`` into a list of param name strings.

    Symbols appearing before ``&key`` are ignored; plain strings are always
    accepted as parameter names.
    """
    params: list[str] = []
    in_key = False
    for p in params_expr:
        if isinstance(p, Symbol):
            if p.name == "&key":
                in_key = True
                continue
            if in_key:
                params.append(p.name)
        elif isinstance(p, str):
            params.append(p)
    return params


def _sf_defquery(expr: list, env: dict):
    """``(defquery name (&key param...) "docstring" body)``

    The query is stored in *env* under ``query:<name>``.
    """
    from .types import QueryDef

    if len(expr) < 4:
        raise EvalError("defquery requires name, params, and body")
    name_sym = expr[1]
    if not isinstance(name_sym, Symbol):
        raise EvalError(f"defquery name must be symbol, got {type(name_sym).__name__}")

    params_expr = expr[2]
    if not isinstance(params_expr, list):
        raise EvalError("defquery params must be a list")
    params = _parse_key_params(params_expr)

    # Optional docstring before body
    if len(expr) >= 5 and isinstance(expr[3], str):
        doc = expr[3]
        body = expr[4]
    else:
        doc = ""
        body = expr[3]

    qdef = QueryDef(
        name=name_sym.name,
        params=params,
        doc=doc,
        body=body,
        closure=dict(env),
    )
    env[f"query:{name_sym.name}"] = qdef
    return qdef


def _sf_defaction(expr: list, env: dict):
    """``(defaction name (&key param...) "docstring" body)``

    The action is stored in *env* under ``action:<name>``.
    """
    from .types import ActionDef

    if len(expr) < 4:
        raise EvalError("defaction requires name, params, and body")
    name_sym = expr[1]
    if not isinstance(name_sym, Symbol):
        raise EvalError(f"defaction name must be symbol, got {type(name_sym).__name__}")

    params_expr = expr[2]
    if not isinstance(params_expr, list):
        raise EvalError("defaction params must be a list")
    params = _parse_key_params(params_expr)

    # Optional docstring before body
    if len(expr) >= 5 and isinstance(expr[3], str):
        doc = expr[3]
        body = expr[4]
    else:
        doc = ""
        body = expr[3]

    adef = ActionDef(
        name=name_sym.name,
        params=params,
        doc=doc,
        body=body,
        closure=dict(env),
    )
    env[f"action:{name_sym.name}"] = adef
    return adef


def _sf_set_bang(expr: list, env: dict) -> Any:
    """``(set! name value)`` — mutate existing binding."""
    if len(expr) != 3:
        raise EvalError("set! requires name and value")
    name_sym = expr[1]
    if not isinstance(name_sym, Symbol):
        raise EvalError(f"set! name must be symbol, got {type(name_sym).__name__}")
    value = _trampoline(_eval(expr[2], env))
    # Walk up scope if using Env objects; for plain dicts just overwrite
    env[name_sym.name] = value
    return value


_VALID_CARDINALITIES = {"one-to-one", "one-to-many", "many-to-many"}
_VALID_NAV = {"submenu", "tab", "badge", "inline", "hidden"}


def _sf_defrelation(expr: list, env: dict) -> RelationDef:
    """``(defrelation :name :from "t" :to "t" :cardinality :card ...)``

    Validates the required fields, registers the relation, and stores it in
    *env* under ``relation:<name>``.
    """
    if len(expr) < 2:
        raise EvalError("defrelation requires a name")
    name_kw = expr[1]
    if not isinstance(name_kw, Keyword):
        raise EvalError(f"defrelation name must be a keyword, got {type(name_kw).__name__}")
    rel_name = name_kw.name

    # Parse keyword args from remaining elements
    kwargs: dict[str, str | None] = {}
    i = 2
    while i < len(expr):
        key = expr[i]
        if isinstance(key, Keyword):
            if i + 1 < len(expr):
                val = expr[i + 1]
                if isinstance(val, Keyword):
                    kwargs[key.name] = val.name
                else:
                    kwargs[key.name] = _trampoline(_eval(val, env)) if not isinstance(val, str) else val
                i += 2
            else:
                # Trailing keyword with no value.
                kwargs[key.name] = None
                i += 1
        else:
            i += 1

    for field in ("from", "to", "cardinality"):
        if field not in kwargs:
            raise EvalError(f"defrelation {rel_name} missing required :{field}")

    card = kwargs["cardinality"]
    if card not in _VALID_CARDINALITIES:
        raise EvalError(
            f"defrelation {rel_name}: invalid cardinality {card!r}, "
            f"expected one of {_VALID_CARDINALITIES}"
        )

    nav = kwargs.get("nav", "hidden")
    if nav not in _VALID_NAV:
        raise EvalError(
            f"defrelation {rel_name}: invalid nav {nav!r}, "
            f"expected one of {_VALID_NAV}"
        )

    defn = RelationDef(
        name=rel_name,
        from_type=kwargs["from"],
        to_type=kwargs["to"],
        cardinality=card,
        inverse=kwargs.get("inverse"),
        nav=nav,
        nav_icon=kwargs.get("nav-icon"),
        nav_label=kwargs.get("nav-label"),
    )

    from .relations import register_relation
    register_relation(defn)

    env[f"relation:{rel_name}"] = defn
    return defn


def _sf_defpage(expr: list, env: dict) -> PageDef:
    """``(defpage name :path "/..." :auth :public :content expr ...)``

    Parses keyword args from the expression.  All slot values are stored
    as unevaluated AST — they are resolved at request time by execute_page().
    """
    if len(expr) < 2:
        raise EvalError("defpage requires a name")
    name_sym = expr[1]
    if not isinstance(name_sym, Symbol):
        raise EvalError(f"defpage name must be symbol, got {type(name_sym).__name__}")

    # Parse keyword args — values are NOT evaluated (stored as AST)
    slots: dict[str, Any] = {}
    i = 2
    while i < len(expr):
        key = expr[i]
        if isinstance(key, Keyword) and i + 1 < len(expr):
            slots[key.name] = expr[i + 1]
            i += 2
        else:
            i += 1

    # Required fields
    path = slots.get("path")
    if path is None:
        raise EvalError(f"defpage {name_sym.name} missing required :path")
    if not isinstance(path, str):
        raise EvalError(f"defpage {name_sym.name} :path must be a string")

    auth_val = slots.get("auth", "public")
    if isinstance(auth_val, Keyword):
        auth: str | list = auth_val.name
    elif isinstance(auth_val, list):
        # (:rights "a" "b") → ["rights", "a", "b"]
        auth = []
        for item in auth_val:
            if isinstance(item, Keyword):
                auth.append(item.name)
            elif isinstance(item, str):
                auth.append(item)
            else:
                auth.append(_trampoline(_eval(item, env)))
    else:
        auth = str(auth_val) if auth_val else "public"

    # Layout — keep unevaluated
    layout = slots.get("layout")
    if isinstance(layout, Keyword):
        layout = layout.name
    elif isinstance(layout, list):
        # Keep as unevaluated list for execute_page to resolve at request time
        pass

    # Cache — evaluate if present (it's a static config dict)
    cache_val = slots.get("cache")
    cache = None
    if cache_val is not None:
        cache_result = _trampoline(_eval(cache_val, env))
        if isinstance(cache_result, dict):
            cache = cache_result

    # Stream — evaluate (it's a static boolean)
    stream_val = slots.get("stream")
    stream = False
    if stream_val is not None:
        stream_result = _trampoline(_eval(stream_val, env))
        stream = bool(stream_result)

    page = PageDef(
        name=name_sym.name,
        path=path,
        auth=auth,
        layout=layout,
        cache=cache,
        data_expr=slots.get("data"),
        content_expr=slots.get("content"),
        filter_expr=slots.get("filter"),
        aside_expr=slots.get("aside"),
        menu_expr=slots.get("menu"),
        stream=stream,
        fallback_expr=slots.get("fallback"),
        closure=dict(env),
    )
    env[f"page:{name_sym.name}"] = page
    return page


# ---------------------------------------------------------------------------
# Delimited continuations — shift / reset
# ---------------------------------------------------------------------------

_RESET_RESUME = []  # stack of resume values; empty = not resuming
_RESET_SENTINEL = object()


def _sf_reset(expr: list, env: dict) -> Any:
    """(reset body) — establish a continuation delimiter.

    If evaluating *body* raises ``_ShiftSignal``, a ``Continuation`` is
    built that re-evaluates *body* with the resume value pushed on
    ``_RESET_RESUME``; the shift body is then evaluated with that
    continuation bound to the name given to ``shift``.

    Raises:
        EvalError: if the body form is missing.
    """
    if len(expr) < 2:
        raise EvalError("reset requires a body")
    body = expr[1]
    try:
        return _trampoline(_eval(body, env))
    except _ShiftSignal as sig:
        def cont_fn(value=NIL):
            _RESET_RESUME.append(value)
            try:
                return _trampoline(_eval(body, env))
            finally:
                _RESET_RESUME.pop()

        k = Continuation(cont_fn)
        sig_env = dict(sig.env)
        sig_env[sig.k_name] = k
        return _trampoline(_eval(sig.body, sig_env))


def _sf_shift(expr: list, env: dict) -> Any:
    """(shift k body) — capture continuation to nearest reset.

    While a continuation is being resumed (``_RESET_RESUME`` non-empty) the
    form simply yields the resume value; otherwise it raises
    ``_ShiftSignal`` for the enclosing ``reset`` to catch.

    Raises:
        EvalError: if the continuation name or body is missing, or the
            name is not a symbol.
    """
    if _RESET_RESUME:
        return _RESET_RESUME[-1]
    if len(expr) < 3:
        raise EvalError("shift requires a continuation name and body")
    k_sym = expr[1]
    if not isinstance(k_sym, Symbol):
        raise EvalError(f"shift name must be symbol, got {type(k_sym).__name__}")
    raise _ShiftSignal(k_sym.name, expr[2], env)


_SPECIAL_FORMS: dict[str, Any] = {
    "if": _sf_if,
    "when": _sf_when,
    "cond": _sf_cond,
    "case": _sf_case,
    "and": _sf_and,
    "or": _sf_or,
    "let": _sf_let,
    "let*": _sf_let,
    "letrec": _sf_letrec,
    "lambda": _sf_lambda,
    "fn": _sf_lambda,
    "define": _sf_define,
    "defstyle": _sf_defstyle,
    "defkeyframes": _sf_defkeyframes,
    "defcomp": _sf_defcomp,
    "defrelation": _sf_defrelation,
    "begin": _sf_begin,
    "do": _sf_begin,
    "quote": _sf_quote,
    "->": _sf_thread_first,
    "set!": _sf_set_bang,
    "dynamic-wind": _sf_dynamic_wind,
    "defmacro": _sf_defmacro,
    "quasiquote": _sf_quasiquote,
    "defhandler": _sf_defhandler,
    "defpage": _sf_defpage,
    "defquery": _sf_defquery,
    "defaction": _sf_defaction,
    "reset": _sf_reset,
    "shift": _sf_shift,
}


# ---------------------------------------------------------------------------
# Higher-order
# forms (need to evaluate the fn arg first)
# ---------------------------------------------------------------------------

def _ho_resolve(form: str, fn: Any, env: dict) -> Any:
    """Turn an evaluated function value into a plain Python callable.

    A ``Lambda`` is wrapped so each call goes through ``_call_lambda`` and is
    trampolined to a concrete value; any other Python callable (e.g. a
    primitive) is returned as-is.

    Raises:
        EvalError: if *fn* is neither a ``Lambda`` nor callable.  The check
            runs before any iteration, so it fires even for an empty
            collection.
    """
    if isinstance(fn, Lambda):
        return lambda *args: _trampoline(_call_lambda(fn, list(args), env))
    if callable(fn):
        return fn
    raise EvalError(f"{form} requires lambda, got {type(fn).__name__}")


def _ho_map(expr: list, env: dict) -> list:
    """``(map fn coll)`` — list of ``fn(item)`` for each item."""
    if len(expr) != 3:
        raise EvalError("map requires fn and collection")
    fn = _trampoline(_eval(expr[1], env))
    coll = _trampoline(_eval(expr[2], env))
    call = _ho_resolve("map", fn, env)
    return [call(item) for item in coll]


def _ho_map_indexed(expr: list, env: dict) -> list:
    """``(map-indexed fn coll)`` — list of ``fn(i, item)`` with a 0-based
    index.  A lambda must declare at least two parameters."""
    if len(expr) != 3:
        raise EvalError("map-indexed requires fn and collection")
    fn = _trampoline(_eval(expr[1], env))
    coll = _trampoline(_eval(expr[2], env))
    call = _ho_resolve("map-indexed", fn, env)
    if isinstance(fn, Lambda) and len(fn.params) < 2:
        raise EvalError("map-indexed lambda needs (i item) params")
    return [call(i, item) for i, item in enumerate(coll)]


def _ho_filter(expr: list, env: dict) -> list:
    """``(filter fn coll)`` — items for which ``fn(item)`` is truthy."""
    if len(expr) != 3:
        raise EvalError("filter requires fn and collection")
    fn = _trampoline(_eval(expr[1], env))
    coll = _trampoline(_eval(expr[2], env))
    call = _ho_resolve("filter", fn, env)
    return [item for item in coll if call(item)]


def _ho_reduce(expr: list, env: dict) -> Any:
    """``(reduce fn init coll)`` — left fold: ``acc = fn(acc, item)``."""
    if len(expr) != 4:
        raise EvalError("reduce requires fn, init, and collection")
    fn = _trampoline(_eval(expr[1], env))
    acc = _trampoline(_eval(expr[2], env))
    coll = _trampoline(_eval(expr[3], env))
    call = _ho_resolve("reduce", fn, env)
    for item in coll:
        acc = call(acc, item)
    return acc


def _ho_some(expr: list, env: dict) -> Any:
    """``(some fn coll)`` — first truthy ``fn(item)`` result, else ``NIL``."""
    if len(expr) != 3:
        raise EvalError("some requires fn and collection")
    fn = _trampoline(_eval(expr[1], env))
    coll = _trampoline(_eval(expr[2], env))
    call = _ho_resolve("some", fn, env)
    for item in coll:
        result = call(item)
        if result:
            return result
    return NIL


def _ho_every(expr: list, env: dict) -> bool:
    """``(every? fn coll)`` — ``True`` when ``fn(item)`` is truthy for every
    item (vacuously ``True`` for an empty collection)."""
    if len(expr) != 3:
        raise EvalError("every? requires fn and collection")
    fn = _trampoline(_eval(expr[1], env))
    coll = _trampoline(_eval(expr[2], env))
    call = _ho_resolve("every?", fn, env)
    for item in coll:
        if not call(item):
            return False
    return True


def _ho_for_each(expr: list, env: dict) -> Any:
    """``(for-each fn coll)`` — call ``fn(item)`` for effect; returns ``NIL``."""
    if len(expr) != 3:
        raise EvalError("for-each requires fn and collection")
    fn = _trampoline(_eval(expr[1], env))
    coll = _trampoline(_eval(expr[2], env))
    call = _ho_resolve("for-each", fn, env)
    for item in coll:
        call(item)
    return NIL


_HO_FORMS: dict[str, Any] = {
    "map": _ho_map,
    "map-indexed": _ho_map_indexed,
    "filter": _ho_filter,
    "reduce": _ho_reduce,
    "some": _ho_some,
    "every?": _ho_every,
    "for-each": _ho_for_each,
}