#!/usr/bin/env python3
"""Run test-cek-reactive.sx — tests for deref-as-shift reactive rendering."""
from __future__ import annotations

import os
import sys

_HERE = os.path.dirname(os.path.abspath(__file__))
_PROJECT = os.path.abspath(os.path.join(_HERE, "..", "..", ".."))
_SPEC_TESTS = os.path.join(_PROJECT, "spec", "tests")
_WEB_TESTS = os.path.join(_PROJECT, "web", "tests")
# The project root must be importable before the `shared.*` imports below.
sys.path.insert(0, _PROJECT)
sys.setrecursionlimit(20000)

from shared.sx.parser import parse_all
from shared.sx.ref import sx_ref
from shared.sx.ref.sx_ref import (
    make_env, env_get, env_has, env_set, env_extend, env_merge,
)

# Use tree-walk evaluator for interpreting .sx test files.
# The CEK override (eval_expr = cek_run) would cause the interpreted cek.sx
# to delegate to the transpiled CEK, not the interpreted one being tested.
# Override both the local names AND the module-level names so that transpiled
# functions (ho_map, call_lambda, etc.) also use tree-walk internally.
eval_expr = sx_ref._tree_walk_eval_expr
trampoline = sx_ref._tree_walk_trampoline
sx_ref.eval_expr = eval_expr
sx_ref.trampoline = trampoline

from shared.sx.types import (
    NIL, Symbol, Keyword, Lambda, Component, Island, Continuation, Macro,
    _ShiftSignal,
)

# Build env with primitives
env = make_env()

# Platform test functions
_suite_stack: list[str] = []
_pass_count = 0
_fail_count = 0


def _try_call(thunk):
    """Evaluate the one-element form ``[thunk]`` in the test env.

    Returns ``{"ok": True}`` when evaluation completes, otherwise
    ``{"ok": False, "error": <message>}`` with the stringified exception.
    The broad ``except Exception`` is deliberate: any failure raised while
    running a test body is reported as a test result, not a runner crash.
    """
    try:
        trampoline(eval_expr([thunk], env))
        return {"ok": True}
    except Exception as e:
        return {"ok": False, "error": str(e)}


def _report_pass(name):
    """Count one passing test and print it with its suite path."""
    global _pass_count
    _pass_count += 1
    ctx = " > ".join(_suite_stack)
    print(f" PASS: {ctx} > {name}")
    return NIL


def _report_fail(name, error):
    """Count one failing test and print it with its suite path and error."""
    global _fail_count
    _fail_count += 1
    ctx = " > ".join(_suite_stack)
    print(f" FAIL: {ctx} > {name}: {error}")
    return NIL


def _push_suite(name):
    """Enter a (possibly nested) suite and print its indented header."""
    _suite_stack.append(name)
    print(f"{' ' * (len(_suite_stack)-1)}Suite: {name}")
    return NIL


def _pop_suite():
    """Leave the innermost suite; a no-op when no suite is open."""
    if _suite_stack:
        _suite_stack.pop()
    return NIL


def _test_env():
    """Return the shared evaluation environment."""
    return env


def _sx_parse(source):
    """Parse *source* into a list of expressions."""
    return parse_all(source)


def _sx_parse_one(source):
    """Parse a single expression (NIL when *source* has none)."""
    exprs = parse_all(source)
    return exprs[0] if exprs else NIL


def _make_continuation(fn):
    """Wrap *fn* in a ``Continuation``."""
    return Continuation(fn)


env["try-call"] = _try_call
env["report-pass"] = _report_pass
env["report-fail"] = _report_fail
env["push-suite"] = _push_suite
env["pop-suite"] = _pop_suite
env["test-env"] = _test_env
env["sx-parse"] = _sx_parse
env["sx-parse-one"] = _sx_parse_one
env["env-get"] = env_get
env["env-has?"] = env_has
env["env-set!"] = env_set
env["env-extend"] = env_extend
env["make-continuation"] = _make_continuation
env["continuation?"] = lambda x: isinstance(x, Continuation)
env["continuation-fn"] = lambda c: c.fn


def _make_cek_continuation_with_data(captured, rest_kont):
    """Build an identity ``Continuation`` carrying CEK capture data.

    The data is stored on the ``_cek_data`` attribute as a dict with the
    keys ``"captured"`` and ``"rest-kont"``; ``continuation-data`` reads it.
    """
    c = Continuation(lambda v=NIL: v)
    c._cek_data = {"captured": captured, "rest-kont": rest_kont}
    return c


env["make-cek-continuation"] = _make_cek_continuation_with_data
env["continuation-data"] = lambda c: getattr(c, '_cek_data', {})

# Type predicates and constructors
env["callable?"] = lambda x: callable(x) or isinstance(x, (Lambda, Component, Island, Continuation))
env["lambda?"] = lambda x: isinstance(x, Lambda)
env["component?"] = lambda x: isinstance(x, Component)
env["island?"] = lambda x: isinstance(x, Island)
env["macro?"] = lambda x: isinstance(x, Macro)
env["thunk?"] = sx_ref.is_thunk
env["thunk-expr"] = sx_ref.thunk_expr
env["thunk-env"] = sx_ref.thunk_env
env["make-thunk"] = sx_ref.make_thunk
env["make-lambda"] = sx_ref.make_lambda
env["make-component"] = sx_ref.make_component
env["make-island"] = sx_ref.make_island
env["make-macro"] = sx_ref.make_macro
env["make-symbol"] = lambda n: Symbol(n)
env["lambda-params"] = lambda f: f.params
env["lambda-body"] = lambda f: f.body
env["lambda-closure"] = lambda f: f.closure
env["lambda-name"] = lambda f: f.name
# setattr() returns None, so `or NIL` makes these setters yield NIL.
env["set-lambda-name!"] = lambda f, n: setattr(f, 'name', n) or NIL
env["component-params"] = lambda c: c.params
env["component-body"] = lambda c: c.body
env["component-closure"] = lambda c: c.closure
env["component-has-children?"] = lambda c: c.has_children
env["component-affinity"] = lambda c: getattr(c, 'affinity', 'auto')
env["component-set-param-types!"] = lambda c, t: setattr(c, 'param_types', t) or NIL
env["macro-params"] = lambda m: m.params
env["macro-rest-param"] = lambda m: m.rest_param
env["macro-body"] = lambda m: m.body
env["macro-closure"] = lambda m: m.closure
env["env-merge"] = env_merge
env["symbol-name"] = lambda s: s.name if isinstance(s, Symbol) else str(s)
env["keyword-name"] = lambda k: k.name if isinstance(k, Keyword) else str(k)
env["type-of"] = sx_ref.type_of
env["primitive?"] = sx_ref.is_primitive
env["get-primitive"] = sx_ref.get_primitive
env["strip-prefix"] = lambda s, p: s[len(p):] if s.startswith(p) else s
env["inspect"] = repr
env["debug-log"] = lambda *args: None
env["error"] = sx_ref.error
env["apply"] = lambda f, args: f(*args)

# Functions from eval.sx that cek.sx references
env["trampoline"] = trampoline
env["eval-expr"] = eval_expr
env["eval-list"] = sx_ref.eval_list
env["eval-call"] = sx_ref.eval_call
env["call-lambda"] = sx_ref.call_lambda
env["call-component"] = sx_ref.call_component
env["parse-keyword-args"] = sx_ref.parse_keyword_args
env["sf-lambda"] = sx_ref.sf_lambda
env["sf-defcomp"] = sx_ref.sf_defcomp
env["sf-defisland"] = sx_ref.sf_defisland
env["sf-defmacro"] = sx_ref.sf_defmacro
env["sf-defstyle"] = sx_ref.sf_defstyle
env["sf-deftype"] = sx_ref.sf_deftype
env["sf-defeffect"] = sx_ref.sf_defeffect
env["sf-letrec"] = sx_ref.sf_letrec
env["sf-named-let"] = sx_ref.sf_named_let
env["sf-dynamic-wind"] = sx_ref.sf_dynamic_wind
env["sf-scope"] = sx_ref.sf_scope
env["sf-provide"] = sx_ref.sf_provide
env["qq-expand"] = sx_ref.qq_expand
env["expand-macro"] = sx_ref.expand_macro
env["cond-scheme?"] = sx_ref.cond_scheme_p

# Higher-order form handlers
env["ho-map"] = sx_ref.ho_map
env["ho-map-indexed"] = sx_ref.ho_map_indexed
env["ho-filter"] = sx_ref.ho_filter
env["ho-reduce"] = sx_ref.ho_reduce
env["ho-some"] = sx_ref.ho_some
env["ho-every"] = sx_ref.ho_every
env["ho-for-each"] = sx_ref.ho_for_each
env["call-fn"] = sx_ref.call_fn

# Render-related (stub for testing — no active rendering)
env["render-active?"] = lambda: False
env["is-render-expr?"] = lambda expr: False
env["render-expr"] = lambda expr, env: NIL

# Scope primitives (needed for reactive-shift-deref island cleanup)
env["scope-push!"] = sx_ref.PRIMITIVES.get("scope-push!", lambda *a: NIL)
env["scope-pop!"] = sx_ref.PRIMITIVES.get("scope-pop!", lambda *a: NIL)
env["context"] = sx_ref.PRIMITIVES.get("context", lambda *a: NIL)
env["emit!"] = sx_ref.PRIMITIVES.get("emit!", lambda *a: NIL)
env["emitted"] = sx_ref.PRIMITIVES.get("emitted", lambda *a: [])

# Dynamic wind
env["push-wind!"] = lambda before, after: NIL
env["pop-wind!"] = lambda: NIL
env["call-thunk"] = lambda f, e: f() if callable(f) else trampoline(eval_expr([f], e))

# Mutation helpers
env["dict-get"] = lambda d, k: d.get(k, NIL) if isinstance(d, dict) else NIL
env["identical?"] = lambda a, b: a is b

# defhandler, defpage, defquery, defaction stubs
for name in ["sf-defhandler", "sf-defpage", "sf-defquery", "sf-defaction"]:
    pyname = name.replace("-", "_")
    fn = getattr(sx_ref, pyname, None)
    if fn:
        env[name] = fn
    else:
        # `_n=name` binds the current name at definition time, so the stub
        # does not depend on the loop variable's final value.
        env[name] = lambda args, e, _n=name: NIL


def _load_sx_file(path):
    """Parse the .sx file at *path* and evaluate each expression in ``env``.

    The file is decoded as UTF-8 explicitly so that loading does not depend
    on the platform's locale encoding. Expressions are evaluated in file
    order; any exception raised by parsing or evaluation propagates.
    """
    with open(path, encoding="utf-8") as f:
        source = f.read()
    for expr in parse_all(source):
        trampoline(eval_expr(expr, env))


# Load test framework
_load_sx_file(os.path.join(_SPEC_TESTS, "test-framework.sx"))

# Load signals module
print("Loading signals.sx ...")
_load_sx_file(os.path.join(_PROJECT, "web", "signals.sx"))

# Load frames module
print("Loading frames.sx ...")
_load_sx_file(os.path.join(_PROJECT, "spec", "frames.sx"))

# Load CEK module
print("Loading cek.sx ...")
_load_sx_file(os.path.join(_PROJECT, "spec", "cek.sx"))

# Run tests
print("=" * 60)
print("Running test-cek-reactive.sx")
print("=" * 60)

_load_sx_file(os.path.join(_WEB_TESTS, "test-cek-reactive.sx"))

print("=" * 60)
print(f"Results: {_pass_count} passed, {_fail_count} failed")
print("=" * 60)
# Non-zero exit status signals failure to the caller.
sys.exit(1 if _fail_count > 0 else 0)