#!/usr/bin/env python3 """Run test-signals.sx using the bootstrapped evaluator with signal primitives. Uses bootstrapped signal functions from sx_ref.py directly, patching apply to handle SX lambdas from the interpreter (test expressions create lambdas that need evaluator dispatch). """ from __future__ import annotations import os, sys _HERE = os.path.dirname(os.path.abspath(__file__)) _PROJECT = os.path.abspath(os.path.join(_HERE, "..", "..", "..")) _SPEC_TESTS = os.path.join(_PROJECT, "spec", "tests") _WEB_TESTS = os.path.join(_PROJECT, "web", "tests") sys.path.insert(0, _PROJECT) from shared.sx.ref.sx_ref import sx_parse as parse_all from shared.sx.ref import sx_ref from shared.sx.ref.sx_ref import make_env, scope_push, scope_pop, sx_context from shared.sx.types import NIL, Island, Lambda # Use tree-walk evaluator for interpreting .sx test files. eval_expr = sx_ref._tree_walk_eval_expr trampoline = sx_ref._tree_walk_trampoline sx_ref.eval_expr = eval_expr sx_ref.trampoline = trampoline # Build env with primitives env = make_env() # --- Patch apply BEFORE anything else --- # Test expressions create SX Lambdas that bootstrapped code calls via apply. # Patch the module-level function so all bootstrapped functions see it. # apply is used by swap! and other forms to call functions with arg lists def _apply(f, args): if isinstance(f, Lambda): return trampoline(eval_expr([f] + list(args), env)) return f(*args) sx_ref.__dict__["apply"] = _apply # cons needs to handle tuples from Python *args (swap! passes &rest as tuple) _orig_cons = sx_ref.PRIMITIVES.get("cons") def _cons(x, c): if isinstance(c, tuple): c = list(c) return [x] + (c or []) sx_ref.__dict__["cons"] = _cons sx_ref.PRIMITIVES["cons"] = _cons # Platform test functions _suite_stack: list[str] = [] _pass_count = 0 _fail_count = 0 def _try_call(thunk): try: trampoline(eval_expr([thunk], env)) return {"ok": True} except Exception as e: return {"ok": False, "error": str(e)} def _report_pass(name): global _pass_count _pass_count += 1 ctx = " > ".join(_suite_stack) print(f" PASS: {ctx} > {name}") return NIL def _report_fail(name, error): global _fail_count _fail_count += 1 ctx = " > ".join(_suite_stack) print(f" FAIL: {ctx} > {name}: {error}") return NIL def _push_suite(name): _suite_stack.append(name) print(f"{' ' * (len(_suite_stack)-1)}Suite: {name}") return NIL def _pop_suite(): if _suite_stack: _suite_stack.pop() return NIL env["try-call"] = _try_call env["report-pass"] = _report_pass env["report-fail"] = _report_fail env["push-suite"] = _push_suite env["pop-suite"] = _pop_suite # Signal functions are now pure SX (transpiled into sx_ref.py from signals.sx) # Wire both low-level dict-based signal functions and high-level API env["identical?"] = sx_ref.is_identical env["island?"] = lambda x: isinstance(x, Island) # Scope primitives (used by signals.sx for reactive tracking) env["scope-push!"] = scope_push env["scope-pop!"] = scope_pop env["context"] = sx_context # Low-level signal functions (now pure SX, transpiled from signals.sx) env["make-signal"] = sx_ref.make_signal env["signal?"] = sx_ref.is_signal env["signal-value"] = sx_ref.signal_value env["signal-set-value!"] = sx_ref.signal_set_value env["signal-subscribers"] = sx_ref.signal_subscribers env["signal-add-sub!"] = sx_ref.signal_add_sub env["signal-remove-sub!"] = sx_ref.signal_remove_sub env["signal-deps"] = sx_ref.signal_deps env["signal-set-deps!"] = sx_ref.signal_set_deps # Bootstrapped signal functions from sx_ref.py env["signal"] = sx_ref.signal env["deref"] = sx_ref.deref env["reset!"] = sx_ref.reset_b env["swap!"] = sx_ref.swap_b env["computed"] = sx_ref.computed env["effect"] = sx_ref.effect # batch has a bootstrapper issue with _batch_depth global variable access. # Wrap it to work correctly in the test context. def _batch(thunk): sx_ref._batch_depth = getattr(sx_ref, '_batch_depth', 0) + 1 sx_ref.cek_call(thunk, None) sx_ref._batch_depth -= 1 if sx_ref._batch_depth == 0: queue = list(sx_ref._batch_queue) sx_ref._batch_queue = [] seen = [] pending = [] for s in queue: for sub in sx_ref.signal_subscribers(s): if sub not in seen: seen.append(sub) pending.append(sub) for sub in pending: sub() return NIL env["batch"] = _batch env["notify-subscribers"] = sx_ref.notify_subscribers env["flush-subscribers"] = sx_ref.flush_subscribers env["dispose-computed"] = sx_ref.dispose_computed env["with-island-scope"] = sx_ref.with_island_scope env["register-in-scope"] = sx_ref.register_in_scope env["callable?"] = sx_ref.is_callable # Load test framework with open(os.path.join(_SPEC_TESTS, "test-framework.sx")) as f: for expr in parse_all(f.read()): trampoline(eval_expr(expr, env)) # Run tests print("=" * 60) print("Running test-signals.sx") print("=" * 60) with open(os.path.join(_WEB_TESTS, "test-signals.sx")) as f: for expr in parse_all(f.read()): trampoline(eval_expr(expr, env)) print("=" * 60) print(f"Results: {_pass_count} passed, {_fail_count} failed") print("=" * 60) sys.exit(1 if _fail_count > 0 else 0)