#!/usr/bin/env python3
"""Run SX test specs against the Python SX evaluator.

The Python evaluator parses and evaluates test specs — SX tests itself.
This script provides only platform functions (error catching, reporting).

Usage:
    python shared/sx/tests/run.py                    # run all available specs
    python shared/sx/tests/run.py eval               # run only test-eval.sx
    python shared/sx/tests/run.py eval parser router # run specific specs
    python shared/sx/tests/run.py --legacy           # run monolithic test.sx
"""
from __future__ import annotations

import os
import sys

# Make the project root importable so ``shared.sx`` resolves when this file is
# executed directly as a script.
_HERE = os.path.dirname(os.path.abspath(__file__))
_PROJECT = os.path.abspath(os.path.join(_HERE, "..", "..", ".."))
sys.path.insert(0, _PROJECT)

from shared.sx.parser import parse_all
from shared.sx.evaluator import _eval, _trampoline, _call_lambda
from shared.sx.types import Symbol, Keyword, Lambda, NIL, Component, Island


# --- Test state ---

suite_stack: list[str] = []  # names of the currently open (nested) suites
passed = 0
failed = 0
test_num = 0


def try_call(thunk):
    """Call an SX thunk, catching errors.

    Returns ``{"ok": True}`` on success, or ``{"ok": False, "error": <str>}``
    when evaluating the thunk raises any ``Exception``.
    """
    try:
        # ``[thunk]`` is evaluated as a zero-argument call expression.
        _trampoline(_eval([thunk], env))
        return {"ok": True}
    except Exception as e:  # deliberate boundary: a failing test must not abort the run
        return {"ok": False, "error": str(e)}


def report_pass(name):
    """Record a passing test and print its TAP ``ok`` line."""
    global passed, test_num
    test_num += 1
    passed += 1
    full_name = " > ".join(suite_stack + [name])
    print(f"ok {test_num} - {full_name}")


def report_fail(name, error):
    """Record a failing test and print its TAP ``not ok`` line plus the error."""
    global failed, test_num
    test_num += 1
    failed += 1
    full_name = " > ".join(suite_stack + [name])
    print(f"not ok {test_num} - {full_name}")
    print(f" # {error}")


def push_suite(name):
    """Enter a nested suite; its name prefixes subsequently reported tests."""
    suite_stack.append(name)


def pop_suite():
    """Leave the innermost suite."""
    suite_stack.pop()


# --- Parser platform functions ---

def sx_parse(source):
    """Parse SX source string into list of AST expressions."""
    return parse_all(source)


def sx_serialize(val):
    """Serialize an AST value to SX source text.

    ``bool`` is tested before ``int`` because ``bool`` is an ``int`` subclass.
    Values of any unrecognised type fall back to ``str(val)``.
    """
    if val is None or val is NIL:
        return "nil"
    if isinstance(val, bool):
        return "true" if val else "false"
    if isinstance(val, (int, float)):
        return str(val)
    if isinstance(val, str):
        # Escape backslashes first so the quotes' escapes are not doubled.
        escaped = val.replace("\\", "\\\\").replace('"', '\\"')
        return f'"{escaped}"'
    if isinstance(val, Symbol):
        return val.name
    if isinstance(val, Keyword):
        return f":{val.name}"
    if isinstance(val, list):
        inner = " ".join(sx_serialize(x) for x in val)
        return f"({inner})"
    if isinstance(val, dict):
        parts = []
        for k, v in val.items():
            parts.append(f":{k}")
            parts.append(sx_serialize(v))
        return "{" + " ".join(parts) + "}"
    return str(val)


def make_symbol(name):
    """Construct a ``Symbol`` from *name*."""
    return Symbol(name)


def make_keyword(name):
    """Construct a ``Keyword`` from *name*."""
    return Keyword(name)


def symbol_name(sym):
    """Return the name of a ``Symbol``; other values are stringified."""
    if isinstance(sym, Symbol):
        return sym.name
    return str(sym)


def keyword_name(kw):
    """Return the name of a ``Keyword``; other values are stringified."""
    if isinstance(kw, Keyword):
        return kw.name
    return str(kw)


# --- Render platform function ---

def render_html(sx_source):
    """Parse SX source and render to HTML via the bootstrapped evaluator.

    Every top-level expression is rendered against a shallow copy of ``env``
    and the resulting fragments are concatenated.

    Raises:
        RuntimeError: if ``shared.sx.ref.sx_ref`` cannot be imported.
    """
    try:
        from shared.sx.ref.sx_ref import render_to_html as _render_to_html
    except ImportError as exc:
        # Chain the cause so the underlying import failure stays visible.
        raise RuntimeError(
            "render-to-html not available — sx_ref.py not built"
        ) from exc
    exprs = parse_all(sx_source)
    render_env = dict(env)
    return "".join(_render_to_html(expr, render_env) for expr in exprs)


# --- Signal platform primitives ---
# Implements the signal runtime platform interface for testing signals.sx

class Signal:
    """A reactive signal container."""

    __slots__ = ("value", "subscribers", "deps")

    def __init__(self, value):
        self.value = value
        self.subscribers = []  # list of callables
        self.deps = []  # list of Signal (for computed)


class TrackingContext:
    """Tracks signal dependencies during effect/computed evaluation."""

    __slots__ = ("notify_fn", "deps")

    def __init__(self, notify_fn):
        self.notify_fn = notify_fn
        self.deps = []


_tracking_context = [None]  # mutable cell


def _make_signal(value):
    """Create a new ``Signal`` holding *value*."""
    return Signal(value)


def _signal_p(x):
    """Return True when *x* is a ``Signal``."""
    return isinstance(x, Signal)


def _signal_value(s):
    """Return the signal's current value (no dependency tracking here)."""
    return s.value


def _signal_set_value(s, v):
    """Store *v* in the signal without notifying subscribers."""
    s.value = v
    return NIL


def _signal_subscribers(s):
    """Return a copy of the signal's subscriber list."""
    return list(s.subscribers)


def _signal_add_sub(s, fn):
    """Subscribe *fn* to the signal unless it is already subscribed."""
    if fn not in s.subscribers:
        s.subscribers.append(fn)
    return NIL


def _signal_remove_sub(s, fn):
    """Unsubscribe *fn* from the signal if present."""
    if fn in s.subscribers:
        s.subscribers.remove(fn)
    return NIL


def _signal_deps(s):
    """Return a copy of the signal's dependency list."""
    return list(s.deps)


def _signal_set_deps(s, deps):
    """Replace the signal's dependency list with a copy of *deps*."""
    s.deps = list(deps)
    return NIL


def _set_tracking_context(ctx):
    """Install *ctx* as the current tracking context."""
    _tracking_context[0] = ctx
    return NIL


def _get_tracking_context():
    """Return the current tracking context, or ``NIL`` when none is set."""
    return _tracking_context[0] or NIL


def _make_tracking_context(notify_fn):
    """Create a ``TrackingContext`` that carries *notify_fn*."""
    return TrackingContext(notify_fn)


def _tracking_context_deps(ctx):
    """Return the context's live dependency list (``[]`` for non-contexts)."""
    if isinstance(ctx, TrackingContext):
        return ctx.deps
    return []


def _tracking_context_add_dep(ctx, s):
    """Record *s* as a dependency of *ctx*, ignoring duplicates."""
    if isinstance(ctx, TrackingContext) and s not in ctx.deps:
        ctx.deps.append(s)
    return NIL


def _tracking_context_notify_fn(ctx):
    """Return the context's notify function (``NIL`` for non-contexts)."""
    if isinstance(ctx, TrackingContext):
        return ctx.notify_fn
    return NIL


def _identical(a, b):
    """Object-identity comparison."""
    return a is b


def _island_p(x):
    """Return True when *x* is an ``Island``."""
    return isinstance(x, Island)


def _make_island(name, params, has_children, body, closure):
    """Build an ``Island``; a non-dict *closure* becomes an empty dict."""
    return Island(
        name=name,
        params=list(params),
        has_children=has_children,
        body=body,
        closure=dict(closure) if isinstance(closure, dict) else {},
    )


# --- Spec registry ---
# "needs" lists env keys that must be present for the spec to run.

SPECS = {
    "eval": {"file": "test-eval.sx", "needs": []},
    "parser": {"file": "test-parser.sx", "needs": ["sx-parse"]},
    "router": {"file": "test-router.sx", "needs": []},
    "render": {"file": "test-render.sx", "needs": ["render-html"]},
    "deps": {"file": "test-deps.sx", "needs": []},
    "engine": {"file": "test-engine.sx", "needs": []},
    "orchestration": {"file": "test-orchestration.sx", "needs": []},
    "signals": {"file": "test-signals.sx", "needs": ["make-signal"]},
}

REF_DIR = os.path.join(_HERE, "..", "ref")


def eval_file(filename, env):
    """Load and evaluate an SX file.

    *filename* is resolved relative to ``REF_DIR``. A missing file is reported
    as a TAP ``# SKIP`` comment and otherwise ignored.
    """
    filepath = os.path.join(REF_DIR, filename)
    if not os.path.exists(filepath):
        print(f"# SKIP {filename} (file not found)")
        return
    # Explicit UTF-8 so reading does not depend on the platform locale.
    with open(filepath, encoding="utf-8") as f:
        src = f.read()
    exprs = parse_all(src)
    for expr in exprs:
        _trampoline(_eval(expr, env))


# --- Build env ---

env = {
    "try-call": try_call,
    "report-pass": report_pass,
    "report-fail": report_fail,
    "push-suite": push_suite,
    "pop-suite": pop_suite,
    # Parser platform functions
    "sx-parse": sx_parse,
    "sx-serialize": sx_serialize,
    "make-symbol": make_symbol,
    "make-keyword": make_keyword,
    "symbol-name": symbol_name,
    "keyword-name": keyword_name,
    # Render platform function
    "render-html": render_html,
    # Extra primitives needed by spec modules (router.sx, deps.sx)
    "for-each-indexed": "_deferred",  # replaced below
    "dict-set!": "_deferred",
    "dict-has?": "_deferred",
    "dict-get": "_deferred",
    "append!": "_deferred",
    "inc": lambda n: n + 1,
    # Component accessor for affinity (Phase 7)
    "component-affinity": lambda c: getattr(c, 'affinity', 'auto'),
    # Signal platform primitives
    "make-signal": _make_signal,
    "signal?": _signal_p,
    "signal-value": _signal_value,
    "signal-set-value!": _signal_set_value,
    "signal-subscribers": _signal_subscribers,
    "signal-add-sub!": _signal_add_sub,
    "signal-remove-sub!": _signal_remove_sub,
    "signal-deps": _signal_deps,
    "signal-set-deps!": _signal_set_deps,
    "set-tracking-context!": _set_tracking_context,
    "get-tracking-context": _get_tracking_context,
    "make-tracking-context": _make_tracking_context,
    "tracking-context-deps": _tracking_context_deps,
    "tracking-context-add-dep!": _tracking_context_add_dep,
    "tracking-context-notify-fn": _tracking_context_notify_fn,
    "identical?": _identical,
    # Island platform primitives
    "island?": _island_p,
    "make-island": _make_island,
    "component-name": lambda c: getattr(c, 'name', ''),
    "component-params": lambda c: list(getattr(c, 'params', [])),
    "component-body": lambda c: getattr(c, 'body', NIL),
    "component-closure": lambda c: dict(getattr(c, 'closure', {})),
    "component-has-children?": lambda c: getattr(c, 'has_children', False),
}


def _call_sx(fn, args, caller_env):
    """Call an SX lambda or native function with args."""
    if isinstance(fn, Lambda):
        return _trampoline(_call_lambda(fn, list(args), caller_env))
    return fn(*args)


def _for_each_indexed(fn, coll):
    """for-each-indexed that respects set! in lambda closures.

    The hand-written evaluator copies envs on lambda calls, which breaks set!
    mutation of outer scope. We eval directly in the closure dict to match the
    bootstrapped semantics (cell-based mutation).
    """
    if isinstance(fn, Lambda):
        closure = fn.closure
        for i, item in enumerate(coll or []):
            # Bind params directly in the closure (no copy)
            for p, v in zip(fn.params, [i, item]):
                closure[p] = v
            _trampoline(_eval(fn.body, closure))
    else:
        for i, item in enumerate(coll or []):
            fn(i, item)
    return NIL


def _dict_set(d, k, v):
    """Set ``d[k] = v`` when *d* is a dict; otherwise do nothing."""
    if isinstance(d, dict):
        d[k] = v
    return NIL


def _dict_has(d, k):
    """Return True when *d* is a dict containing key *k*."""
    return isinstance(d, dict) and k in d


def _dict_get(d, k):
    """Return ``d[k]``, or ``NIL`` when the key is missing or *d* is not a dict."""
    if isinstance(d, dict):
        return d.get(k, NIL)
    return NIL


def _append_mut(lst, item):
    """Append *item* in place when *lst* is a list; otherwise do nothing."""
    if isinstance(lst, list):
        lst.append(item)
    return NIL


# Replace the "_deferred" placeholders registered in ``env`` above.
env["for-each-indexed"] = _for_each_indexed
env["dict-set!"] = _dict_set
env["dict-has?"] = _dict_has
env["dict-get"] = _dict_get
env["append!"] = _append_mut


def _load_router_from_bootstrap(env):
    """Load router functions from the bootstrapped sx_ref.py.

    The hand-written evaluator can't run router.sx faithfully because set!
    inside lambda closures doesn't propagate to outer scopes (the evaluator
    uses dict copies, not cells). The bootstrapped code compiles set! to
    cell-based mutation, so we import from there.
    """
    try:
        from shared.sx.ref.sx_ref import (
            split_path_segments,
            parse_route_pattern,
            match_route_segments,
            match_route,
            find_matching_route,
            make_route_segment,
        )
        env["split-path-segments"] = split_path_segments
        env["parse-route-pattern"] = parse_route_pattern
        env["match-route-segments"] = match_route_segments
        env["match-route"] = match_route
        env["find-matching-route"] = find_matching_route
        env["make-route-segment"] = make_route_segment
    except ImportError:
        # Fallback: eval router.sx directly (may fail on set! scoping)
        eval_file("router.sx", env)


def _load_deps_from_bootstrap(env):
    """Load deps functions from the bootstrapped sx_ref.py.

    Falls back to evaluating ``deps.sx`` when the import fails. Either way a
    ``test-env`` function returning *env* itself is registered.
    """
    try:
        from shared.sx.ref.sx_ref import (
            scan_refs,
            scan_components_from_source,
            transitive_deps,
            compute_all_deps,
            components_needed,
            page_component_bundle,
            page_css_classes,
            scan_io_refs,
            transitive_io_refs,
            compute_all_io_refs,
            component_pure_p,
            render_target,
            page_render_plan,
        )
        env["scan-refs"] = scan_refs
        env["scan-components-from-source"] = scan_components_from_source
        env["transitive-deps"] = transitive_deps
        env["compute-all-deps"] = compute_all_deps
        env["components-needed"] = components_needed
        env["page-component-bundle"] = page_component_bundle
        env["page-css-classes"] = page_css_classes
        env["scan-io-refs"] = scan_io_refs
        env["transitive-io-refs"] = transitive_io_refs
        env["compute-all-io-refs"] = compute_all_io_refs
        env["component-pure?"] = component_pure_p
        env["render-target"] = render_target
        env["page-render-plan"] = page_render_plan
        env["test-env"] = lambda: env
    except ImportError:
        eval_file("deps.sx", env)
        env["test-env"] = lambda: env


def _load_engine_from_bootstrap(env):
    """Load engine pure functions from the bootstrapped sx_ref.py.

    Falls back to evaluating ``engine.sx`` when the import fails.
    """
    try:
        from shared.sx.ref.sx_ref import (
            parse_time,
            parse_trigger_spec,
            default_trigger,
            parse_swap_spec,
            parse_retry_spec,
            next_retry_ms,
            filter_params,
        )
        env["parse-time"] = parse_time
        env["parse-trigger-spec"] = parse_trigger_spec
        env["default-trigger"] = default_trigger
        env["parse-swap-spec"] = parse_swap_spec
        env["parse-retry-spec"] = parse_retry_spec
        env["next-retry-ms"] = next_retry_ms
        env["filter-params"] = filter_params
    except ImportError:
        eval_file("engine.sx", env)


def _load_orchestration(env):
    """Load orchestration.sx with mocked platform functions for testing.

    Orchestration defines many browser-wiring functions (DOM, fetch, etc.)
    but the Phase 7c/7d tests only exercise the cache, optimistic, and
    offline functions. Lambda bodies referencing DOM/fetch are never called,
    so we only need to mock the functions actually invoked by the tests:
    now-ms, log-info, log-warn, execute-action, try-rerender-page.
    """
    _mock_ts = [1000]  # mutable so mock can advance time

    def _mock_now_ms():
        return _mock_ts[0]

    def _noop(*_a, **_kw):
        return NIL

    def _mock_execute_action(action, payload, on_success, on_error):
        """Mock: immediately calls on_success with payload as 'server truth'."""
        on_success(payload)
        return NIL

    def _dict_delete(d, k):
        if isinstance(d, dict) and k in d:
            del d[k]
        return NIL

    env["now-ms"] = _mock_now_ms
    env["log-info"] = _noop
    env["log-warn"] = _noop
    env["execute-action"] = _mock_execute_action
    env["try-rerender-page"] = _noop
    env["persist-offline-data"] = _noop
    env["retrieve-offline-data"] = lambda: NIL
    env["dict-delete!"] = _dict_delete

    # DOM / browser stubs (never called by tests, but referenced in lambdas
    # that the evaluator might try to resolve at call time)
    for stub in [
        "try-parse-json",
        "dom-dispatch",
        "dom-query-selector",
        "dom-get-attribute",
        "dom-set-attribute",
        "dom-set-text-content",
        "dom-append",
        "dom-insert-html-adjacent",
        "dom-remove",
        "dom-outer-html",
        "dom-inner-html",
        "dom-create-element",
        "dom-set-inner-html",
        "dom-morph",
        "dom-get-tag",
        "dom-query-selector-all",
        "dom-add-event-listener",
        "dom-set-timeout",
        "dom-prevent-default",
        "dom-closest",
        "dom-matches",
        "dom-get-id",
        "dom-set-id",
        "dom-form-data",
        "dom-is-form",
        "browser-location-href",
        "browser-push-state",
        "browser-replace-state",
        "sx-hydrate-elements",
        "render-to-dom",
        "hoist-head-elements-full",
        "url-pathname",
    ]:
        # Never clobber a real implementation that is already registered.
        if stub not in env:
            env[stub] = _noop

    # Load engine.sx first (orchestration depends on it)
    _load_engine_from_bootstrap(env)
    # Load orchestration.sx
    eval_file("orchestration.sx", env)


def _load_forms_from_bootstrap(env):
    """Load forms functions (including streaming protocol) from sx_ref.py.

    Falls back to evaluating ``forms.sx`` when the import fails.
    """
    try:
        from shared.sx.ref.sx_ref import (
            stream_chunk_id,
            stream_chunk_bindings,
            normalize_binding_key,
            bind_stream_chunk,
            validate_stream_data,
        )
        env["stream-chunk-id"] = stream_chunk_id
        env["stream-chunk-bindings"] = stream_chunk_bindings
        env["normalize-binding-key"] = normalize_binding_key
        env["bind-stream-chunk"] = bind_stream_chunk
        env["validate-stream-data"] = validate_stream_data
    except ImportError:
        eval_file("forms.sx", env)


def _load_signals(env):
    """Load signals.sx spec — defines signal, deref, reset!, swap!, etc.

    The hand-written evaluator doesn't support &rest in define/fn, so we
    override swap! with a native implementation after loading.
    """
    # callable? is needed by effect (to check if return value is cleanup fn)
    env["callable?"] = lambda x: callable(x) or isinstance(x, Lambda)

    eval_file("signals.sx", env)

    # Override signal functions that need to call Lambda subscribers.
    # The hand-written evaluator's Lambda objects can't be called directly
    # from Python — they need _call_lambda. So we provide native versions
    # of functions that bridge native→Lambda calls.

    def _call_sx_fn(fn, args):
        """Call an SX function (Lambda or native) from Python."""
        if isinstance(fn, Lambda):
            return _trampoline(_call_lambda(fn, list(args), env))
        if callable(fn):
            return fn(*args)
        return NIL

    def _flush_subscribers(s):
        """Invoke every subscriber of *s* (iterating over a snapshot)."""
        for sub in list(s.subscribers):
            _call_sx_fn(sub, [])
        return NIL

    def _notify_subscribers(s):
        """Notify subscribers of *s* now, or queue *s* while a batch is open."""
        batch_depth = env.get("*batch-depth*", 0)
        if batch_depth and batch_depth > 0:
            # setdefault (not get) so a queue created here is actually stored
            # in env and later drained by ``batch``.
            batch_queue = env.setdefault("*batch-queue*", [])
            if s not in batch_queue:
                batch_queue.append(s)
            return NIL
        _flush_subscribers(s)
        return NIL

    env["notify-subscribers"] = _notify_subscribers
    env["flush-subscribers"] = _flush_subscribers

    def _reset_bang(s, value):
        """Set the signal's value; notify only if the object identity changed."""
        if not isinstance(s, Signal):
            return NIL
        old = s.value
        if old is not value:
            s.value = value
            _notify_subscribers(s)
        return NIL

    env["reset!"] = _reset_bang

    def _swap_bang(s, f, *args):
        """Replace the signal's value with ``f(old, *args)``; notify on change."""
        if not isinstance(s, Signal):
            return NIL
        old = s.value
        all_args = [old] + list(args)
        new_val = _call_sx_fn(f, all_args)
        if old is not new_val:
            s.value = new_val
            _notify_subscribers(s)
        return NIL

    env["swap!"] = _swap_bang

    def _computed(compute_fn):
        """Create a signal whose value is derived by calling *compute_fn*."""
        s = Signal(NIL)

        def recompute():
            # Unsubscribe from old deps
            for dep in s.deps:
                if recompute in dep.subscribers:
                    dep.subscribers.remove(recompute)
            s.deps = []
            # Create tracking context
            ctx = TrackingContext(recompute)
            prev = _tracking_context[0]
            _tracking_context[0] = ctx
            try:
                new_val = _call_sx_fn(compute_fn, [])
            finally:
                # Always restore, even if compute_fn raises: try_call swallows
                # the error and later tests must not see a stale context.
                _tracking_context[0] = prev
            s.deps = list(ctx.deps)
            old = s.value
            s.value = new_val
            if old is not new_val:
                _flush_subscribers(s)

        recompute()
        return s

    env["computed"] = _computed

    def _effect(effect_fn):
        """Run *effect_fn* now and return a ``dispose`` function for it."""
        deps = []
        disposed = [False]
        cleanup_fn = [None]

        def run_effect():
            if disposed[0]:
                return NIL
            # Run previous cleanup
            if cleanup_fn[0]:
                _call_sx_fn(cleanup_fn[0], [])
                cleanup_fn[0] = None
            # Unsubscribe from old deps
            for dep in deps:
                if run_effect in dep.subscribers:
                    dep.subscribers.remove(run_effect)
            deps.clear()
            # Track new deps
            ctx = TrackingContext(run_effect)
            prev = _tracking_context[0]
            _tracking_context[0] = ctx
            try:
                result = _call_sx_fn(effect_fn, [])
            finally:
                # Always restore, even if effect_fn raises (see recompute).
                _tracking_context[0] = prev
            deps.extend(ctx.deps)
            # If effect returns a callable, it's cleanup
            if callable(result) or isinstance(result, Lambda):
                cleanup_fn[0] = result
            return NIL

        run_effect()

        def dispose():
            disposed[0] = True
            if cleanup_fn[0]:
                _call_sx_fn(cleanup_fn[0], [])
            for dep in deps:
                if run_effect in dep.subscribers:
                    dep.subscribers.remove(run_effect)
            deps.clear()
            return NIL

        return dispose

    env["effect"] = _effect

    def _batch(thunk):
        """Run *thunk* with notifications deferred until the outermost batch ends."""
        depth = env.get("*batch-depth*", 0)
        env["*batch-depth*"] = depth + 1
        try:
            _call_sx_fn(thunk, [])
        finally:
            # Always unwind the depth; otherwise one failing thunk would leave
            # every later notification queued and never delivered.
            env["*batch-depth*"] = env["*batch-depth*"] - 1
        # Reached only when thunk completed normally; after a failure the
        # queue is left in place and drained by the next successful batch.
        if env["*batch-depth*"] == 0:
            queue = env.get("*batch-queue*", [])
            env["*batch-queue*"] = []
            # Collect unique subscribers across all queued signals
            seen = set()
            pending = []
            for s in queue:
                for sub in s.subscribers:
                    sub_id = id(sub)
                    if sub_id not in seen:
                        seen.add(sub_id)
                        pending.append(sub)
            # Notify each unique subscriber exactly once
            for sub in pending:
                _call_sx_fn(sub, [])
        return NIL

    env["batch"] = _batch


def main():
    """Run the requested specs, print a TAP report, exit 1 on any failure."""
    global passed, failed, test_num

    args = sys.argv[1:]

    # Legacy mode
    if args and args[0] == "--legacy":
        print("TAP version 13")
        eval_file("test.sx", env)
    else:
        # Determine which specs to run
        specs_to_run = args if args else list(SPECS.keys())

        print("TAP version 13")

        # Always load framework first
        eval_file("test-framework.sx", env)

        for spec_name in specs_to_run:
            spec = SPECS.get(spec_name)
            if not spec:
                print(f"# SKIP unknown spec: {spec_name}")
                continue

            # Check platform requirements
            can_run = True
            for need in spec["needs"]:
                if need not in env:
                    print(f"# SKIP {spec_name} (missing: {need})")
                    can_run = False
                    break
            if not can_run:
                continue

            # Load prerequisite spec modules
            if spec_name == "eval":
                _load_forms_from_bootstrap(env)
            if spec_name == "router":
                _load_router_from_bootstrap(env)
            if spec_name == "deps":
                _load_deps_from_bootstrap(env)
            if spec_name == "engine":
                _load_engine_from_bootstrap(env)
            if spec_name == "orchestration":
                _load_orchestration(env)
            if spec_name == "signals":
                _load_signals(env)

            print(f"# --- {spec_name} ---")
            eval_file(spec["file"], env)

    # Summary
    print()
    print(f"1..{test_num}")
    print(f"# tests {passed + failed}")
    print(f"# pass {passed}")
    if failed > 0:
        print(f"# fail {failed}")
        sys.exit(1)


if __name__ == "__main__":
    main()