- Add \uXXXX unicode escape support to parser.py and parser.sx spec - Add char-from-code primitive (Python chr(), JS String.fromCharCode()) - Fix variadic infix operators in both bootstrappers (js.sx, py.sx) — (+ a b c d) was silently dropping terms, now left-folds correctly - Rebootstrap sx_ref.py and sx-browser.js with all fixes - Fix 3 pre-existing map-dict test failures in shared/sx/tests/run.py - Add live demos alongside examples in spreads essay (side-by-side layout) - Add scoped-effects plan: algebraic effects as unified foundation for spread/collect/island/lake/signal/context - Add foundations plan: CEK machine, the computational floor, three-axis model (depth/topology/linearity), Curry-Howard correspondence - Route both plans in page-functions.sx and nav-data.sx Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1057 lines
38 KiB
Python
1057 lines
38 KiB
Python
#!/usr/bin/env python3
|
|
"""Run SX test specs against the Python SX evaluator.
|
|
|
|
The Python evaluator parses and evaluates test specs — SX tests itself.
|
|
This script provides only platform functions (error catching, reporting).
|
|
|
|
Usage:
|
|
python shared/sx/tests/run.py # run all available specs
|
|
python shared/sx/tests/run.py eval # run only test-eval.sx
|
|
python shared/sx/tests/run.py eval parser router # run specific specs
|
|
python shared/sx/tests/run.py --legacy # run monolithic test.sx
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import os
|
|
import sys
|
|
|
|
# Directory containing this script.
_HERE = os.path.dirname(os.path.abspath(__file__))
# Three directory levels above this script; placed first on sys.path so the
# `shared.*` imports below resolve when the script is run directly.
_PROJECT = os.path.abspath(os.path.join(_HERE, "..", "..", ".."))
sys.path.insert(0, _PROJECT)
|
|
|
|
from shared.sx.parser import parse_all
|
|
from shared.sx.ref.sx_ref import eval_expr as _eval, trampoline as _trampoline, call_lambda as _call_lambda
|
|
from shared.sx.types import Symbol, Keyword, Lambda, NIL, Component, Island
|
|
|
|
# --- Test state ---
|
|
# Names of the currently open suites, outermost first (push_suite / pop_suite).
suite_stack: list[str] = []
# Running totals updated by report_pass / report_fail.
passed = 0
failed = 0
# Sequence number of the most recently reported test (printed in TAP lines).
test_num = 0
|
|
|
|
|
|
def try_call(thunk):
    """Invoke a zero-argument SX thunk and report whether it raised.

    The thunk is applied by evaluating the one-element form ``[thunk]`` in the
    module-level ``env``.

    Returns:
        ``{"ok": True}`` on success, otherwise
        ``{"ok": False, "error": <str(exception)>}``.
    """
    try:
        _trampoline(_eval([thunk], env))
    except Exception as exc:
        outcome = {"ok": False, "error": str(exc)}
    else:
        outcome = {"ok": True}
    return outcome
|
|
|
|
|
|
def report_pass(name):
    """Count a passing test and print its TAP ``ok`` line.

    The printed name is the open suite names followed by *name*, joined
    with " > ".
    """
    global passed, test_num
    passed += 1
    test_num += 1
    label = " > ".join([*suite_stack, name])
    print(f"ok {test_num} - {label}")
|
|
|
|
|
|
def report_fail(name, error):
    """Count a failing test and print its TAP ``not ok`` line plus diagnostics.

    Args:
        name: Test name; prefixed with the open suite names joined by " > ".
        error: Failure description. Converted with ``str()`` and emitted as
            one ``#`` diagnostic line per line of text.
    """
    global failed, test_num
    test_num += 1
    failed += 1
    full_name = " > ".join(suite_stack + [name])
    print(f"not ok {test_num} - {full_name}")
    # Prefix every line of the message: a multi-line error would otherwise
    # put unprefixed text into the TAP stream, which a consumer may misparse.
    for line in str(error).splitlines() or [""]:
        print(f" # {line}")
|
|
|
|
|
|
def push_suite(name):
    """Open a suite: append *name* to the module-level suite stack."""
    suite_stack.append(name)
|
|
|
|
|
|
def pop_suite():
    """Close the innermost suite by dropping the last suite-stack entry.

    Raises IndexError when no suite is open.
    """
    del suite_stack[-1]
|
|
|
|
|
|
# --- Parser platform functions ---
|
|
|
|
def sx_parse(source):
    """Return the AST expressions that ``parse_all`` produces for *source*."""
    expressions = parse_all(source)
    return expressions
|
|
|
|
|
|
def sx_serialize(val):
    """Turn an AST value back into SX source text.

    Handled, in this order: None/NIL -> ``nil``; bool -> ``true``/``false``;
    int/float -> ``str(val)``; str -> double-quoted with backslash and
    double-quote escaped; Symbol -> its name; Keyword -> ``:name``;
    list -> parenthesised, space-separated items; dict -> ``{:k v ...}``.
    Anything else falls back to ``str(val)``.
    """
    if val is None or val is NIL:
        return "nil"

    # bool is tested before int because bool is a subclass of int.
    if isinstance(val, bool):
        if val:
            return "true"
        return "false"

    if isinstance(val, (int, float)):
        return str(val)

    if isinstance(val, str):
        body = val.replace("\\", "\\\\").replace('"', '\\"')
        return f'"{body}"'

    if isinstance(val, Symbol):
        return val.name

    if isinstance(val, Keyword):
        return f":{val.name}"

    if isinstance(val, list):
        return "(" + " ".join(map(sx_serialize, val)) + ")"

    if isinstance(val, dict):
        pieces = []
        for key, item in val.items():
            pieces += [f":{key}", sx_serialize(item)]
        return "{" + " ".join(pieces) + "}"

    return str(val)
|
|
|
|
|
|
def make_symbol(name):
    """Build a ``Symbol`` with the given name."""
    symbol = Symbol(name)
    return symbol
|
|
|
|
|
|
def make_keyword(name):
    """Build a ``Keyword`` with the given name."""
    keyword = Keyword(name)
    return keyword
|
|
|
|
|
|
def symbol_name(sym):
    """Return ``sym.name`` for a Symbol; any other value is passed to ``str()``."""
    return sym.name if isinstance(sym, Symbol) else str(sym)
|
|
|
|
|
|
def keyword_name(kw):
    """Return ``kw.name`` for a Keyword; any other value is passed to ``str()``."""
    return kw.name if isinstance(kw, Keyword) else str(kw)
|
|
|
|
|
|
# --- Render platform function ---
|
|
|
|
def render_html(sx_source):
    """Render SX source text to HTML with the bootstrapped evaluator.

    Each top-level expression is rendered in turn and the outputs are
    concatenated.

    Raises:
        RuntimeError: ``render_to_html`` cannot be imported from
            ``shared.sx.ref.sx_ref``.
    """
    try:
        from shared.sx.ref.sx_ref import render_to_html as _render_to_html
    except ImportError:
        raise RuntimeError("render-to-html not available — sx_ref.py not built")

    nodes = parse_all(sx_source)
    # Wrap a copy of the bindings in an Env (not a flat dict) so tests
    # exercise the real scope chain path.
    scope = _Env(dict(env))
    html = ""
    for node in nodes:
        html += _render_to_html(node, scope)
    return html
|
|
|
|
|
|
# --- Render SX (aser) platform function ---
|
|
|
|
def render_sx(sx_source):
    """Render SX source text to SX wire format with the bootstrapped evaluator.

    Each top-level expression goes through ``aser``. String results are
    appended as-is, None/NIL results are skipped, and anything else is passed
    through ``serialize`` before being appended.

    Raises:
        RuntimeError: ``aser``/``serialize`` cannot be imported from
            ``shared.sx.ref.sx_ref``.
    """
    try:
        from shared.sx.ref.sx_ref import aser as _aser, serialize as _serialize
    except ImportError:
        raise RuntimeError("aser not available — sx_ref.py not built")

    nodes = parse_all(sx_source)
    # Use Env (not flat dict) so tests exercise the real scope chain path.
    # Using dict(env) hides bugs where merge() drops Env parent scopes.
    scope = _Env(dict(env))
    wire = ""
    for node in nodes:
        emitted = _aser(node, scope)
        if isinstance(emitted, str):
            wire += emitted
            continue
        if emitted is None or emitted is NIL:
            continue
        wire += _serialize(emitted)
    return wire
|
|
|
|
|
|
# --- Signal platform primitives ---
|
|
# Implements the signal runtime platform interface for testing signals.sx
|
|
|
|
class Signal:
    """Reactive value holder used by the signal platform primitives.

    Attributes (fixed by ``__slots__``):
        value: the current value.
        subscribers: callables registered on this signal.
        deps: signals this one depends on (used for computed signals).
    """

    __slots__ = ("value", "subscribers", "deps")

    def __init__(self, value):
        self.value = value
        # Each instance gets its own fresh lists.
        self.subscribers, self.deps = [], []
|
|
|
|
|
|
class TrackingContext:
    """Collects the signals read while an effect or computed body runs.

    Attributes (fixed by ``__slots__``):
        notify_fn: the callable handed to the constructor.
        deps: list of dependencies recorded so far; starts empty.
    """

    __slots__ = ("notify_fn", "deps")

    def __init__(self, notify_fn):
        self.deps = []
        self.notify_fn = notify_fn
|
|
|
|
|
|
# Single-element list holding the active TrackingContext (or None); a list is
# used so functions can rebind slot 0 without a `global` statement.
_tracking_context = [None]  # mutable cell
|
|
|
|
|
|
def _make_signal(value):
    """Create a new ``Signal`` holding *value*."""
    return Signal(value)
|
|
|
|
|
|
def _signal_p(x):
    """Predicate: is *x* a ``Signal`` instance?"""
    is_signal = isinstance(x, Signal)
    return is_signal
|
|
|
|
|
|
def _signal_value(s):
|
|
return s.value
|
|
|
|
|
|
def _signal_set_value(s, v):
    """Store *v* as the value of *s* (subscribers are not notified here).

    Returns NIL.
    """
    setattr(s, "value", v)
    return NIL
|
|
|
|
|
|
def _signal_subscribers(s):
|
|
return list(s.subscribers)
|
|
|
|
|
|
def _signal_add_sub(s, fn):
    """Register *fn* on *s* unless it is already subscribed. Returns NIL."""
    registered = s.subscribers
    if fn in registered:
        return NIL
    registered.append(fn)
    return NIL
|
|
|
|
|
|
def _signal_remove_sub(s, fn):
    """Unregister *fn* from *s* if it is subscribed. Returns NIL."""
    registered = s.subscribers
    if fn not in registered:
        return NIL
    registered.remove(fn)
    return NIL
|
|
|
|
|
|
def _signal_deps(s):
|
|
return list(s.deps)
|
|
|
|
|
|
def _signal_set_deps(s, deps):
    """Replace the dependencies of *s* with a list copy of *deps*. Returns NIL."""
    copied = list(deps)
    s.deps = copied
    return NIL
|
|
|
|
|
|
def _set_tracking_context(ctx):
    """Install *ctx* as the active tracking context. Returns NIL."""
    _tracking_context[:] = [ctx]
    return NIL
|
|
|
|
|
|
def _get_tracking_context():
    """Return the active tracking context, or NIL when the slot is falsy."""
    active = _tracking_context[0]
    return active if active else NIL
|
|
|
|
|
|
def _make_tracking_context(notify_fn):
    """Create a ``TrackingContext`` that carries *notify_fn*."""
    context = TrackingContext(notify_fn)
    return context
|
|
|
|
|
|
def _tracking_context_deps(ctx):
    """Return the context's own ``deps`` list (not a copy).

    Any value that is not a ``TrackingContext`` yields a new empty list.
    """
    return ctx.deps if isinstance(ctx, TrackingContext) else []
|
|
|
|
|
|
def _tracking_context_add_dep(ctx, s):
    """Record *s* as a dependency of *ctx* once; ignore non-contexts.

    Returns NIL.
    """
    if not isinstance(ctx, TrackingContext):
        return NIL
    if s not in ctx.deps:
        ctx.deps.append(s)
    return NIL
|
|
|
|
|
|
def _tracking_context_notify_fn(ctx):
    """Return the context's ``notify_fn``; NIL when *ctx* is not a TrackingContext."""
    return ctx.notify_fn if isinstance(ctx, TrackingContext) else NIL
|
|
|
|
|
|
def _identical(a, b):
|
|
return a is b
|
|
|
|
|
|
def _island_p(x):
    """Predicate: is *x* an ``Island`` instance?"""
    is_island = isinstance(x, Island)
    return is_island
|
|
|
|
|
|
def _make_island(name, params, has_children, body, closure):
    """Construct an ``Island`` from its parts.

    *params* is copied into a list. *closure* is copied when it is a dict;
    any other value is replaced by an empty dict.
    """
    param_list = list(params)
    if isinstance(closure, dict):
        captured = dict(closure)
    else:
        captured = {}
    return Island(
        name=name,
        params=param_list,
        has_children=has_children,
        body=body,
        closure=captured,
    )
|
|
|
|
|
|
# --- Spec registry ---
|
|
|
|
# Registry of runnable specs, in default run order. Each entry maps a spec
# name to its test file and the env keys that must exist for it to run.
SPECS = {
    spec_name: {"file": sx_file, "needs": list(required)}
    for spec_name, sx_file, required in (
        ("eval", "test-eval.sx", ()),
        ("parser", "test-parser.sx", ("sx-parse",)),
        ("router", "test-router.sx", ()),
        ("render", "test-render.sx", ("render-html",)),
        ("aser", "test-aser.sx", ("render-sx",)),
        ("deps", "test-deps.sx", ()),
        ("engine", "test-engine.sx", ()),
        ("orchestration", "test-orchestration.sx", ()),
        ("signals", "test-signals.sx", ("make-signal",)),
        ("types", "test-types.sx", ()),
    )
}
|
|
|
|
# Directory eval_file() loads .sx files from: the sibling "ref" directory of
# this script's directory (the path is left un-normalized).
REF_DIR = os.path.join(_HERE, "..", "ref")
|
|
|
|
|
|
def eval_file(filename, env):
    """Load an SX file from REF_DIR and evaluate each top-level expression.

    Args:
        filename: File name relative to REF_DIR.
        env: Environment every expression is evaluated in.

    If the file does not exist, a ``# SKIP`` comment is printed and the
    function returns without evaluating anything.
    """
    filepath = os.path.join(REF_DIR, filename)
    if not os.path.exists(filepath):
        print(f"# SKIP {filename} (file not found)")
        return
    # Decode explicitly as UTF-8 so spec files containing non-ASCII text are
    # read the same way on every platform, not via the locale default.
    with open(filepath, encoding="utf-8") as f:
        src = f.read()
    for expr in parse_all(src):
        _trampoline(_eval(expr, env))
|
|
|
|
|
|
# --- Build env ---
|
|
from shared.sx.env import Env as _Env
|
|
|
|
# Shared evaluation environment: maps the SX names used by the test specs to
# the Python platform functions defined in this file.
env = _Env({
    # Test-framework hooks
    "try-call": try_call,
    "report-pass": report_pass,
    "report-fail": report_fail,
    "push-suite": push_suite,
    "pop-suite": pop_suite,
    # Parser platform functions
    "sx-parse": sx_parse,
    "sx-serialize": sx_serialize,
    "make-symbol": make_symbol,
    "make-keyword": make_keyword,
    "symbol-name": symbol_name,
    "keyword-name": keyword_name,
    # Render platform functions
    "render-html": render_html,
    "render-sx": render_sx,
    # Extra primitives needed by spec modules (router.sx, deps.sx)
    "for-each-indexed": "_deferred",  # replaced below
    "dict-set!": "_deferred",
    "dict-has?": "_deferred",
    "dict-get": "_deferred",
    "append!": "_deferred",
    "inc": lambda n: n + 1,
    # Component accessor for affinity (Phase 7)
    "component-affinity": lambda c: getattr(c, 'affinity', 'auto'),
    # Signal platform primitives
    "make-signal": _make_signal,
    "signal?": _signal_p,
    "signal-value": _signal_value,
    "signal-set-value!": _signal_set_value,
    "signal-subscribers": _signal_subscribers,
    "signal-add-sub!": _signal_add_sub,
    "signal-remove-sub!": _signal_remove_sub,
    "signal-deps": _signal_deps,
    "signal-set-deps!": _signal_set_deps,
    "set-tracking-context!": _set_tracking_context,
    "get-tracking-context": _get_tracking_context,
    "make-tracking-context": _make_tracking_context,
    "tracking-context-deps": _tracking_context_deps,
    "tracking-context-add-dep!": _tracking_context_add_dep,
    "tracking-context-notify-fn": _tracking_context_notify_fn,
    "identical?": _identical,
    # Island platform primitives
    "island?": _island_p,
    "make-island": _make_island,
    # Component accessors: each falls back to a default when the attribute
    # is missing on the object.
    "component-name": lambda c: getattr(c, 'name', ''),
    "component-params": lambda c: list(getattr(c, 'params', [])),
    "component-body": lambda c: getattr(c, 'body', NIL),
    "component-closure": lambda c: dict(getattr(c, 'closure', {})),
    "component-has-children?": lambda c: getattr(c, 'has_children', False),
})
|
|
|
|
|
|
def _call_sx(fn, args, caller_env):
    """Apply *fn* to *args*, whether it is an SX Lambda or a Python callable.

    Lambdas go through ``_call_lambda`` (with *caller_env*) and the result is
    trampolined; anything else is called directly with the unpacked args.
    """
    if not isinstance(fn, Lambda):
        return fn(*args)
    pending = _call_lambda(fn, list(args), caller_env)
    return _trampoline(pending)
|
|
|
|
|
|
def _for_each_indexed(fn, coll):
    """Call *fn* with ``(index, item)`` for every item of *coll*.

    The original note for this function says it exists to respect ``set!`` in
    lambda closures: the hand-written evaluator copies envs on lambda calls,
    which breaks ``set!`` mutation of outer scope, so this evaluates directly
    in the closure to match the bootstrapped (cell-based) semantics.

    For a Lambda, the parameters are therefore bound directly in
    ``fn.closure`` (no copy) and the body is evaluated there. A falsy *coll*
    is treated as empty. Returns NIL.
    """
    items = coll or []

    if not isinstance(fn, Lambda):
        for index, element in enumerate(items):
            fn(index, element)
        return NIL

    scope = fn.closure
    for pair in enumerate(items):
        # Bind params directly in the closure (no copy)
        for param, value in zip(fn.params, pair):
            scope[param] = value
        _trampoline(_eval(fn.body, scope))
    return NIL
|
|
|
|
|
|
def _dict_set(d, k, v):
    """Set ``d[k] = v`` when *d* is a dict; otherwise do nothing. Returns NIL."""
    if not isinstance(d, dict):
        return NIL
    d[k] = v
    return NIL
|
|
|
|
|
|
def _dict_has(d, k):
|
|
return isinstance(d, dict) and k in d
|
|
|
|
|
|
def _dict_get(d, k):
    """Look up *k* in *d*; NIL when the key is absent or *d* is not a dict."""
    return d.get(k, NIL) if isinstance(d, dict) else NIL
|
|
|
|
|
|
def _append_mut(lst, item):
    """Append *item* to *lst* in place when it is a list. Returns NIL."""
    if not isinstance(lst, list):
        return NIL
    lst.append(item)
    return NIL
|
|
|
|
|
|
# Swap the "_deferred" placeholders in env for the real implementations,
# which are defined after env was built.
env["for-each-indexed"] = _for_each_indexed
env["dict-set!"] = _dict_set
env["dict-has?"] = _dict_has
env["dict-get"] = _dict_get
env["append!"] = _append_mut
|
|
|
|
|
|
def _load_router_from_bootstrap(env):
    """Bind the router functions from the bootstrapped sx_ref.py into *env*.

    Per the original notes, the hand-written evaluator can't run router.sx
    faithfully because:
      1. set! inside lambda closures doesn't propagate to outer scopes
      2. Deep recursive function chains exceed Python stack depth
    The bootstrapped code compiles to native Python, avoiding both issues.

    Build sx_ref.py with --spec-modules=router to include these functions.
    If the import fails, router.sx is evaluated directly instead.
    """
    try:
        from shared.sx.ref.sx_ref import (
            # Original route matching
            split_path_segments,
            parse_route_pattern,
            match_route_segments,
            match_route,
            find_matching_route,
            make_route_segment,
            # SX URL conversion
            sx_url_to_path,
            _fn_to_segment,
            # Relative URL resolution
            resolve_relative_url,
            relative_sx_url_p,
            _normalize_relative,
            _count_leading_dots,
            _strip_trailing_close,
            _last_index_of,
            _pop_sx_url_level,
            _pop_sx_url_levels,
            # Keyword operations
            _extract_innermost,
            _find_keyword_value,
            _find_kw_in_tokens,
            _set_keyword_in_content,
            _replace_kw_in_tokens,
            _is_delta_value_p,
            _apply_delta,
            _apply_kw_pairs,
            _apply_keywords_to_url,
            _parse_relative_body,
            _split_pos_kw,
            # URL special forms
            parse_sx_url,
            url_special_form_p,
            url_special_form_name,
            url_special_form_inner,
            _url_special_forms,
        )
    except ImportError:
        # Fallback: eval router.sx directly (may fail on set!/recursion)
        eval_file("router.sx", env)
        return

    # (SX name, bootstrapped implementation), in registration order.
    bindings = (
        ("split-path-segments", split_path_segments),
        ("parse-route-pattern", parse_route_pattern),
        ("match-route-segments", match_route_segments),
        ("match-route", match_route),
        ("find-matching-route", find_matching_route),
        ("make-route-segment", make_route_segment),
        ("sx-url-to-path", sx_url_to_path),
        ("_fn-to-segment", _fn_to_segment),
        ("resolve-relative-url", resolve_relative_url),
        ("relative-sx-url?", relative_sx_url_p),
        ("_normalize-relative", _normalize_relative),
        ("_count-leading-dots", _count_leading_dots),
        ("_strip-trailing-close", _strip_trailing_close),
        ("_last-index-of", _last_index_of),
        ("_pop-sx-url-level", _pop_sx_url_level),
        ("_pop-sx-url-levels", _pop_sx_url_levels),
        ("_extract-innermost", _extract_innermost),
        ("_find-keyword-value", _find_keyword_value),
        ("_find-kw-in-tokens", _find_kw_in_tokens),
        ("_set-keyword-in-content", _set_keyword_in_content),
        ("_replace-kw-in-tokens", _replace_kw_in_tokens),
        ("_is-delta-value?", _is_delta_value_p),
        ("_apply-delta", _apply_delta),
        ("_apply-kw-pairs", _apply_kw_pairs),
        ("_apply-keywords-to-url", _apply_keywords_to_url),
        ("_parse-relative-body", _parse_relative_body),
        ("_split-pos-kw", _split_pos_kw),
        ("parse-sx-url", parse_sx_url),
        ("url-special-form?", url_special_form_p),
        ("url-special-form-name", url_special_form_name),
        ("url-special-form-inner", url_special_form_inner),
        ("_url-special-forms", _url_special_forms),
    )
    for sx_name, impl in bindings:
        env[sx_name] = impl
|
|
|
|
|
|
def _load_deps_from_bootstrap(env):
    """Bind the deps functions from the bootstrapped sx_ref.py into *env*.

    If the import fails, deps.sx is evaluated directly instead. In both
    cases ``test-env`` ends up bound to a zero-argument function that
    returns *env*.
    """
    try:
        from shared.sx.ref.sx_ref import (
            scan_refs,
            scan_components_from_source,
            transitive_deps,
            compute_all_deps,
            components_needed,
            page_component_bundle,
            page_css_classes,
            scan_io_refs,
            transitive_io_refs,
            compute_all_io_refs,
            component_pure_p,
            render_target,
            page_render_plan,
        )
    except ImportError:
        eval_file("deps.sx", env)
    else:
        # (SX name, bootstrapped implementation), in registration order.
        bindings = (
            ("scan-refs", scan_refs),
            ("scan-components-from-source", scan_components_from_source),
            ("transitive-deps", transitive_deps),
            ("compute-all-deps", compute_all_deps),
            ("components-needed", components_needed),
            ("page-component-bundle", page_component_bundle),
            ("page-css-classes", page_css_classes),
            ("scan-io-refs", scan_io_refs),
            ("transitive-io-refs", transitive_io_refs),
            ("compute-all-io-refs", compute_all_io_refs),
            ("component-pure?", component_pure_p),
            ("render-target", render_target),
            ("page-render-plan", page_render_plan),
        )
        for sx_name, impl in bindings:
            env[sx_name] = impl
    env["test-env"] = lambda: env
|
|
|
|
|
|
def _load_engine_from_bootstrap(env):
    """Bind the engine pure functions from the bootstrapped sx_ref.py into *env*.

    If the import fails, engine.sx is evaluated directly instead.
    """
    try:
        from shared.sx.ref.sx_ref import (
            parse_time,
            parse_trigger_spec,
            default_trigger,
            parse_swap_spec,
            parse_retry_spec,
            next_retry_ms,
            filter_params,
        )
    except ImportError:
        eval_file("engine.sx", env)
        return

    # (SX name, bootstrapped implementation), in registration order.
    bindings = (
        ("parse-time", parse_time),
        ("parse-trigger-spec", parse_trigger_spec),
        ("default-trigger", default_trigger),
        ("parse-swap-spec", parse_swap_spec),
        ("parse-retry-spec", parse_retry_spec),
        ("next-retry-ms", next_retry_ms),
        ("filter-params", filter_params),
    )
    for sx_name, impl in bindings:
        env[sx_name] = impl
|
|
|
|
|
|
def _load_orchestration(env):
    """Load orchestration.sx into *env* with mocked platform functions.

    Per the original notes: orchestration defines many browser-wiring
    functions (DOM, fetch, etc.) but the Phase 7c/7d tests only exercise the
    cache, optimistic, and offline functions. Lambda bodies referencing
    DOM/fetch are never called, so only the functions actually invoked by the
    tests need real mocks: now-ms, log-info, log-warn, execute-action,
    try-rerender-page.

    The engine functions are loaded first, then orchestration.sx is evaluated.
    """
    clock = [1000]  # mutable so mock can advance time

    def fake_now_ms():
        return clock[0]

    def do_nothing(*_a, **_kw):
        return NIL

    def fake_execute_action(action, payload, on_success, on_error):
        """Mock: immediately calls on_success with payload as 'server truth'."""
        _call_sx(on_success, [payload], env)
        return NIL

    def delete_key(d, k):
        if not isinstance(d, dict):
            return NIL
        if k in d:
            del d[k]
        return NIL

    mocks = {
        "now-ms": fake_now_ms,
        "log-info": do_nothing,
        "log-warn": do_nothing,
        "execute-action": fake_execute_action,
        "try-rerender-page": do_nothing,
        "persist-offline-data": do_nothing,
        "retrieve-offline-data": lambda: NIL,
        "dict-delete!": delete_key,
    }
    for sx_name, impl in mocks.items():
        env[sx_name] = impl

    # DOM / browser stubs (never called by tests, but referenced in lambdas
    # that the evaluator might try to resolve at call time)
    browser_stubs = (
        "try-parse-json", "dom-dispatch", "dom-query-selector",
        "dom-get-attribute", "dom-set-attribute", "dom-set-text-content",
        "dom-append", "dom-insert-html-adjacent", "dom-remove",
        "dom-outer-html", "dom-inner-html", "dom-create-element",
        "dom-set-inner-html", "dom-morph", "dom-get-tag",
        "dom-query-selector-all", "dom-add-event-listener",
        "dom-set-timeout", "dom-prevent-default", "dom-closest",
        "dom-matches", "dom-get-id", "dom-set-id", "dom-form-data",
        "dom-is-form", "browser-location-href", "browser-push-state",
        "browser-replace-state", "sx-hydrate-elements", "render-to-dom",
        "hoist-head-elements-full", "url-pathname",
    )
    for stub_name in browser_stubs:
        # Never overwrite a binding that already exists.
        if stub_name in env:
            continue
        env[stub_name] = do_nothing

    # Load engine.sx first (orchestration depends on it)
    _load_engine_from_bootstrap(env)
    # Load orchestration.sx
    eval_file("orchestration.sx", env)
|
|
|
|
|
|
def _load_forms_from_bootstrap(env):
    """Bind the forms functions (including streaming protocol) from sx_ref.py.

    If the import fails, forms.sx is evaluated directly instead.
    """
    try:
        from shared.sx.ref.sx_ref import (
            stream_chunk_id,
            stream_chunk_bindings,
            normalize_binding_key,
            bind_stream_chunk,
            validate_stream_data,
        )
    except ImportError:
        eval_file("forms.sx", env)
        return

    # (SX name, bootstrapped implementation), in registration order.
    bindings = (
        ("stream-chunk-id", stream_chunk_id),
        ("stream-chunk-bindings", stream_chunk_bindings),
        ("normalize-binding-key", normalize_binding_key),
        ("bind-stream-chunk", bind_stream_chunk),
        ("validate-stream-data", validate_stream_data),
    )
    for sx_name, impl in bindings:
        env[sx_name] = impl
|
|
|
|
|
|
def _load_signals(env):
    """Load signals.sx into *env*, then override selected functions natively.

    Per the original notes: the hand-written evaluator doesn't support &rest
    in define/fn, so swap! is overridden with a native implementation after
    loading. Functions that must call Lambda subscribers are overridden too,
    because Lambda objects can't be called directly from Python and need
    ``_call_lambda``.

    Overridden names: notify-subscribers, flush-subscribers, reset!, swap!,
    computed, effect, batch.

    The tracking context and the batch depth are restored in ``finally``
    blocks, so an exception raised by a user-supplied function does not leave
    stale state behind for later tests.
    """
    # callable? is needed by effect (to check if return value is cleanup fn)
    env["callable?"] = lambda x: callable(x) or isinstance(x, Lambda)
    eval_file("signals.sx", env)

    def _call_sx_fn(fn, args):
        """Call an SX function (Lambda or native) from Python."""
        if isinstance(fn, Lambda):
            return _trampoline(_call_lambda(fn, list(args), env))
        if callable(fn):
            return fn(*args)
        return NIL

    def _flush_subscribers(s):
        # Iterate over a copy: subscribers may (un)subscribe while running.
        for sub in list(s.subscribers):
            _call_sx_fn(sub, [])
        return NIL

    def _notify_subscribers(s):
        batch_depth = env.get("*batch-depth*", 0)
        if batch_depth and batch_depth > 0:
            # Inside a batch: queue the signal once instead of notifying now.
            batch_queue = env.get("*batch-queue*", [])
            if s not in batch_queue:
                batch_queue.append(s)
            return NIL
        _flush_subscribers(s)
        return NIL

    env["notify-subscribers"] = _notify_subscribers
    env["flush-subscribers"] = _flush_subscribers

    def _reset_bang(s, value):
        if not isinstance(s, Signal):
            return NIL
        old = s.value
        # Identity comparison: only a different object triggers notification.
        if old is not value:
            s.value = value
            _notify_subscribers(s)
        return NIL

    env["reset!"] = _reset_bang

    def _swap_bang(s, f, *args):
        if not isinstance(s, Signal):
            return NIL
        old = s.value
        all_args = [old] + list(args)
        new_val = _call_sx_fn(f, all_args)
        if old is not new_val:
            s.value = new_val
            _notify_subscribers(s)
        return NIL

    env["swap!"] = _swap_bang

    def _computed(compute_fn):
        s = Signal(NIL)

        def recompute():
            # Unsubscribe from old deps
            for dep in s.deps:
                if recompute in dep.subscribers:
                    dep.subscribers.remove(recompute)
            s.deps = []

            # Create tracking context
            ctx = TrackingContext(recompute)
            prev = _tracking_context[0]
            _tracking_context[0] = ctx
            try:
                new_val = _call_sx_fn(compute_fn, [])
            finally:
                # Always restore, even if compute_fn raised.
                _tracking_context[0] = prev
            s.deps = list(ctx.deps)

            old = s.value
            s.value = new_val
            if old is not new_val:
                _flush_subscribers(s)

        recompute()
        return s

    env["computed"] = _computed

    def _effect(effect_fn):
        deps = []
        disposed = [False]
        cleanup_fn = [None]

        def run_effect():
            if disposed[0]:
                return NIL
            # Run previous cleanup
            if cleanup_fn[0]:
                _call_sx_fn(cleanup_fn[0], [])
                cleanup_fn[0] = None

            # Unsubscribe from old deps
            for dep in deps:
                if run_effect in dep.subscribers:
                    dep.subscribers.remove(run_effect)
            deps.clear()

            # Track new deps
            ctx = TrackingContext(run_effect)
            prev = _tracking_context[0]
            _tracking_context[0] = ctx
            try:
                result = _call_sx_fn(effect_fn, [])
            finally:
                # Always restore, even if effect_fn raised.
                _tracking_context[0] = prev
            deps.extend(ctx.deps)

            # If effect returns a callable, it's cleanup
            if callable(result) or isinstance(result, Lambda):
                cleanup_fn[0] = result

            return NIL

        run_effect()

        def dispose():
            disposed[0] = True
            if cleanup_fn[0]:
                _call_sx_fn(cleanup_fn[0], [])
            for dep in deps:
                if run_effect in dep.subscribers:
                    dep.subscribers.remove(run_effect)
            deps.clear()
            return NIL

        return dispose

    env["effect"] = _effect

    def _batch(thunk):
        depth = env.get("*batch-depth*", 0)
        env["*batch-depth*"] = depth + 1
        try:
            _call_sx_fn(thunk, [])
        finally:
            # Always unwind the depth; otherwise a raising thunk would leave
            # every later notification queued forever. If the thunk raised,
            # the exception propagates and the queue is not flushed here.
            env["*batch-depth*"] = env["*batch-depth*"] - 1
        if env["*batch-depth*"] == 0:
            queue = env.get("*batch-queue*", [])
            env["*batch-queue*"] = []
            # Collect unique subscribers across all queued signals
            seen = set()
            pending = []
            for s in queue:
                for sub in s.subscribers:
                    sub_id = id(sub)
                    if sub_id not in seen:
                        seen.add(sub_id)
                        pending.append(sub)
            # Notify each unique subscriber exactly once
            for sub in pending:
                _call_sx_fn(sub, [])
        return NIL

    env["batch"] = _batch
|
|
|
|
|
|
def _load_types(env):
    """Load the types.sx spec (gradual type system) into *env*.

    Registers the platform functions and test fixtures the type checker needs
    (component param-type accessors, ``test-prim-types``,
    ``test-prim-param-types``, ``test-env``, and several fallbacks that are
    only added when missing). It then binds the bootstrapped type functions
    from sx_ref.py, or evaluates types.sx directly if that import fails.
    """

    def _component_param_types(c):
        return getattr(c, 'param_types', None)

    def _component_set_param_types(c, d):
        c.param_types = d

    env["component-param-types"] = _component_param_types
    env["component-set-param-types!"] = _component_set_param_types

    # test-prim-types: a minimal type registry for testing
    def _test_prim_types():
        return {
            "+": "number", "-": "number", "*": "number", "/": "number",
            "mod": "number", "abs": "number", "floor": "number",
            "ceil": "number", "round": "number", "min": "number",
            "max": "number", "parse-int": "number", "parse-float": "number",
            "=": "boolean", "!=": "boolean", "<": "boolean", ">": "boolean",
            "<=": "boolean", ">=": "boolean",
            "str": "string", "string-length": "number",
            "substring": "string", "upcase": "string", "downcase": "string",
            "trim": "string", "split": "list", "join": "string",
            "string-contains?": "boolean", "starts-with?": "boolean",
            "ends-with?": "boolean", "replace": "string",
            "not": "boolean", "nil?": "boolean", "number?": "boolean",
            "string?": "boolean", "list?": "boolean", "dict?": "boolean",
            "boolean?": "boolean", "symbol?": "boolean", "empty?": "boolean",
            "list": "list", "first": "any", "rest": "list", "nth": "any",
            "last": "any", "cons": "list", "append": "list",
            "reverse": "list", "len": "number", "contains?": "boolean",
            "flatten": "list", "concat": "list", "slice": "list",
            "range": "list", "sort": "list", "sort-by": "list",
            "map": "list", "filter": "list", "reduce": "any",
            "some": "boolean", "every?": "boolean",
            "dict": "dict", "assoc": "dict", "dissoc": "dict",
            "get": "any", "keys": "list", "vals": "list",
            "has-key?": "boolean", "merge": "dict",
        }

    env["test-prim-types"] = _test_prim_types

    # test-prim-param-types: param type signatures for primitive call checking
    def _test_prim_param_types():
        # Each entry: {"positional": [["name", "type"|None], ...], "rest-type": "type"|None}
        return {
            "+": {"positional": [], "rest-type": "number"},
            "-": {"positional": [["a", "number"]], "rest-type": "number"},
            "*": {"positional": [], "rest-type": "number"},
            "/": {"positional": [["a", "number"], ["b", "number"]], "rest-type": None},
            "mod": {"positional": [["a", "number"], ["b", "number"]], "rest-type": None},
            "sqrt": {"positional": [["x", "number"]], "rest-type": None},
            "pow": {"positional": [["x", "number"], ["n", "number"]], "rest-type": None},
            "abs": {"positional": [["x", "number"]], "rest-type": None},
            "floor": {"positional": [["x", "number"]], "rest-type": None},
            "ceil": {"positional": [["x", "number"]], "rest-type": None},
            "round": {"positional": [["x", "number"]], "rest-type": "number"},
            "min": {"positional": [], "rest-type": "number"},
            "max": {"positional": [], "rest-type": "number"},
            "clamp": {"positional": [["x", "number"], ["lo", "number"], ["hi", "number"]], "rest-type": None},
            "inc": {"positional": [["n", "number"]], "rest-type": None},
            "dec": {"positional": [["n", "number"]], "rest-type": None},
            "<": {"positional": [["a", "number"], ["b", "number"]], "rest-type": None},
            ">": {"positional": [["a", "number"], ["b", "number"]], "rest-type": None},
            "<=": {"positional": [["a", "number"], ["b", "number"]], "rest-type": None},
            ">=": {"positional": [["a", "number"], ["b", "number"]], "rest-type": None},
            "odd?": {"positional": [["n", "number"]], "rest-type": None},
            "even?": {"positional": [["n", "number"]], "rest-type": None},
            "zero?": {"positional": [["n", "number"]], "rest-type": None},
            "upper": {"positional": [["s", "string"]], "rest-type": None},
            "upcase": {"positional": [["s", "string"]], "rest-type": None},
            "lower": {"positional": [["s", "string"]], "rest-type": None},
            "downcase": {"positional": [["s", "string"]], "rest-type": None},
            "string-length": {"positional": [["s", "string"]], "rest-type": None},
            "substring": {"positional": [["s", "string"], ["start", "number"], ["end", "number"]], "rest-type": None},
            "string-contains?": {"positional": [["s", "string"], ["needle", "string"]], "rest-type": None},
            "trim": {"positional": [["s", "string"]], "rest-type": None},
            "split": {"positional": [["s", "string"]], "rest-type": "string"},
            "join": {"positional": [["sep", "string"], ["coll", "list"]], "rest-type": None},
            "replace": {"positional": [["s", "string"], ["old", "string"], ["new", "string"]], "rest-type": None},
            "index-of": {"positional": [["s", "string"], ["needle", "string"]], "rest-type": "number"},
            "starts-with?": {"positional": [["s", "string"], ["prefix", "string"]], "rest-type": None},
            "ends-with?": {"positional": [["s", "string"], ["suffix", "string"]], "rest-type": None},
            "concat": {"positional": [], "rest-type": "list"},
            "range": {"positional": [["start", "number"], ["end", "number"]], "rest-type": "number"},
            "first": {"positional": [["coll", "list"]], "rest-type": None},
            "last": {"positional": [["coll", "list"]], "rest-type": None},
            "rest": {"positional": [["coll", "list"]], "rest-type": None},
            "nth": {"positional": [["coll", "list"], ["n", "number"]], "rest-type": None},
            "cons": {"positional": [["x", None], ["coll", "list"]], "rest-type": None},
            "append": {"positional": [["coll", "list"]], "rest-type": None},
            "append!": {"positional": [["coll", "list"]], "rest-type": None},
            "reverse": {"positional": [["coll", "list"]], "rest-type": None},
            "flatten": {"positional": [["coll", "list"]], "rest-type": None},
            "chunk-every": {"positional": [["coll", "list"], ["n", "number"]], "rest-type": None},
            "zip-pairs": {"positional": [["coll", "list"]], "rest-type": None},
            "keys": {"positional": [["d", "dict"]], "rest-type": None},
            "vals": {"positional": [["d", "dict"]], "rest-type": None},
            "merge": {"positional": [], "rest-type": "dict"},
            "has-key?": {"positional": [["d", "dict"]], "rest-type": None},
            "assoc": {"positional": [["d", "dict"]], "rest-type": None},
            "dissoc": {"positional": [["d", "dict"]], "rest-type": None},
            "dict-set!": {"positional": [["d", "dict"]], "rest-type": None},
            "format-date": {"positional": [["date-str", "string"], ["fmt", "string"]], "rest-type": None},
            "format-decimal": {"positional": [["val", "number"]], "rest-type": "number"},
            "parse-datetime": {"positional": [["s", "string"]], "rest-type": None},
            "pluralize": {"positional": [["count", "number"]], "rest-type": "string"},
            "escape": {"positional": [["s", "string"]], "rest-type": None},
            "strip-tags": {"positional": [["s", "string"]], "rest-type": None},
            "symbol-name": {"positional": [["sym", "symbol"]], "rest-type": None},
            "keyword-name": {"positional": [["kw", "keyword"]], "rest-type": None},
            "sx-parse": {"positional": [["source", "string"]], "rest-type": None},
        }

    env["test-prim-param-types"] = _test_prim_param_types
    env["test-env"] = lambda: env

    # Platform functions needed by types.sx check-body-walk
    if "env-get" not in env:
        env["env-get"] = lambda e, k: e.get(k) if hasattr(e, 'get') else None
    if "env-has?" not in env:
        env["env-has?"] = lambda e, k: k in e
    if "dict-has?" not in env:
        env["dict-has?"] = lambda d, k: k in d if isinstance(d, dict) else False
    if "dict-get" not in env:
        env["dict-get"] = lambda d, k, *default: d.get(k, default[0] if default else None) if isinstance(d, dict) else (default[0] if default else None)
    # types.sx uses component-has-children (no ?), test runner has component-has-children?
    if "component-has-children" not in env:
        env["component-has-children"] = lambda c: getattr(c, 'has_children', False)
    # types.sx uses map-dict for record type resolution
    if "map-dict" not in env:
        def _map_dict(fn, d):
            """Return a new dict mapping each key k to fn(k, v)."""
            result = {}
            for k, v in d.items():
                # Apply fn to the values directly. Building the form
                # [fn, k, v] and evaluating it would evaluate k and v a
                # second time (a list value would be treated as a call).
                result[k] = _call_sx(fn, [k, v], env)
            return result
        env["map-dict"] = _map_dict

    # Try bootstrapped types first, fall back to eval
    try:
        from shared.sx.ref.sx_ref import (
            subtype_p, type_union, narrow_type,
            infer_type, check_component_call, check_component,
            check_all, build_type_registry, type_any_p,
            type_never_p, type_nullable_p, nullable_base,
            narrow_exclude_nil, narrow_exclude,
        )
        env["subtype?"] = subtype_p
        env["type-union"] = type_union
        env["narrow-type"] = narrow_type
        env["infer-type"] = infer_type
        env["check-component-call"] = check_component_call
        env["check-component"] = check_component
        env["check-all"] = check_all
        env["build-type-registry"] = build_type_registry
        env["type-any?"] = type_any_p
        env["type-never?"] = type_never_p
        env["type-nullable?"] = type_nullable_p
        env["nullable-base"] = nullable_base
        env["narrow-exclude-nil"] = narrow_exclude_nil
        env["narrow-exclude"] = narrow_exclude
    except ImportError:
        eval_file("types.sx", env)
|
|
|
|
|
|
def main():
    """Run the selected test specs and print a TAP report.

    With ``--legacy`` as the first argument the monolithic test.sx is run.
    Otherwise the command-line arguments name the specs to run (all specs in
    SPECS when none are given). Unknown specs, and specs whose required
    platform functions are missing from env, are reported as ``# SKIP``
    comments. Exits with status 1 when any test failed.
    """
    global passed, failed, test_num

    cli_args = sys.argv[1:]
    legacy_mode = bool(cli_args) and cli_args[0] == "--legacy"

    print("TAP version 13")

    if legacy_mode:
        eval_file("test.sx", env)
    else:
        selected = cli_args or list(SPECS.keys())

        # Always load framework first
        eval_file("test-framework.sx", env)

        # Prerequisite spec modules, keyed by the spec that needs them.
        prerequisite_loaders = {
            "eval": _load_forms_from_bootstrap,
            "router": _load_router_from_bootstrap,
            "deps": _load_deps_from_bootstrap,
            "engine": _load_engine_from_bootstrap,
            "orchestration": _load_orchestration,
            "signals": _load_signals,
            "types": _load_types,
        }

        for spec_name in selected:
            spec = SPECS.get(spec_name)
            if not spec:
                print(f"# SKIP unknown spec: {spec_name}")
                continue

            # Check platform requirements; report only the first one missing.
            missing = next(
                (need for need in spec["needs"] if need not in env), None
            )
            if missing is not None:
                print(f"# SKIP {spec_name} (missing: {missing})")
                continue

            loader = prerequisite_loaders.get(spec_name)
            if loader is not None:
                loader(env)

            print(f"# --- {spec_name} ---")
            eval_file(spec["file"], env)

            # Reset render state after render/aser tests to avoid leaking
            # into subsequent specs (bootstrapped evaluator checks render_active)
            if spec_name in ("render", "aser"):
                try:
                    from shared.sx.ref.sx_ref import set_render_active_b
                    set_render_active_b(False)
                except ImportError:
                    pass

    # Summary
    print()
    print(f"1..{test_num}")
    print(f"# tests {passed + failed}")
    print(f"# pass {passed}")
    if failed <= 0:
        return
    print(f"# fail {failed}")
    sys.exit(1)
|
|
|
|
|
|
# Run the test runner only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
|