#!/usr/bin/env python3
"""
test262-runner — run the official TC39 test262 suite against our JS-on-SX
runtime.

Walks lib/js/test262-upstream/test/**/*.js, parses YAML-ish frontmatter,
batches tests through sx_server.exe, and emits a JSON + Markdown scoreboard.

Usage:
    python3 lib/js/test262-runner.py                    # full run
    python3 lib/js/test262-runner.py --limit 2000       # first 2000 tests only
    python3 lib/js/test262-runner.py --filter built-ins/Math
    python3 lib/js/test262-runner.py --batch-size 200   # tests per sx_server boot

Outputs:
    lib/js/test262-scoreboard.json — per-category stats + top failure modes
    lib/js/test262-scoreboard.md   — human-readable summary (worst first)

Pinned to commit (see test262-upstream/.git/HEAD after clone). Update:
    rm -rf lib/js/test262-upstream
    git -C lib/js clone --depth 1 https://github.com/tc39/test262.git test262-upstream

Timeouts:
    per-test wallclock:  5s
    per-batch wallclock: 120s
"""
from __future__ import annotations

import argparse
import dataclasses
import json
import re
import subprocess
import sys
import time
from collections import Counter, defaultdict
from pathlib import Path

REPO = Path(__file__).resolve().parents[2]
SX_SERVER = REPO / "hosts" / "ocaml" / "_build" / "default" / "bin" / "sx_server.exe"
UPSTREAM = REPO / "lib" / "js" / "test262-upstream"
TEST_ROOT = UPSTREAM / "test"
HARNESS_DIR = UPSTREAM / "harness"

# Default harness files every test implicitly gets (per INTERPRETING.md).
DEFAULT_HARNESS = ["assert.js", "sta.js"]

# Per-batch timeout (seconds). Each batch runs N tests; if sx_server hangs on
# one, we kill the whole batch and mark the remaining tests as timeouts.
BATCH_TIMEOUT_S = 120

# Nominal per-test wallclock budget (seconds). Enforced indirectly: each
# batch's timeout is sized from PER_TEST_S * batch length (capped at
# BATCH_TIMEOUT_S), so one hung test cannot stall the run indefinitely.
PER_TEST_S = 5

# Target batch size — tune to balance sx_server startup cost (~500ms) against
# memory use and the risk of one bad test killing many.
DEFAULT_BATCH_SIZE = 200

# ---------------------------------------------------------------------------
# Frontmatter parsing
# ---------------------------------------------------------------------------

FRONTMATTER_RE = re.compile(r"/\*---(.*?)---\*/", re.DOTALL)


@dataclasses.dataclass
class Frontmatter:
    description: str = ""
    flags: list[str] = dataclasses.field(default_factory=list)
    includes: list[str] = dataclasses.field(default_factory=list)
    features: list[str] = dataclasses.field(default_factory=list)
    negative_phase: str | None = None
    negative_type: str | None = None
    esid: str | None = None
    es5id: str | None = None
    es6id: str | None = None


def _parse_yaml_list(s: str) -> list[str]:
    """Parse a `[a, b, c]` style list.

    Loose — test262 YAML uses this form almost exclusively."""
    s = s.strip()
    if s.startswith("[") and s.endswith("]"):
        s = s[1:-1]
    return [item.strip().strip('"').strip("'") for item in s.split(",") if item.strip()]


def parse_frontmatter(src: str) -> Frontmatter:
    """Parse test262 YAML-ish frontmatter.

    Lenient — handles the subset actually in use."""
    fm = Frontmatter()
    m = FRONTMATTER_RE.search(src)
    if not m:
        return fm
    body = m.group(1)
    # Walk lines, tracking indent for nested negative: {phase, type}.
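    # For reference, the frontmatter shape we handle looks like this
    # (hypothetical test, for illustration only):
    #
    #   /*---
    #   description: sample description text
    #   esid: sec-example
    #   flags: [onlyStrict]
    #   includes: [propertyHelper.js]
    #   features: [Symbol.iterator]
    #   negative:
    #     phase: parse
    #     type: SyntaxError
    #   ---*/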
    lines = body.split("\n")
    i = 0
    current_key = None
    while i < len(lines):
        line = lines[i]
        stripped = line.strip()
        if not stripped or stripped.startswith("#"):
            i += 1
            continue

        # Top-level key: value
        m2 = re.match(r"^([a-zA-Z_][a-zA-Z0-9_]*)\s*:\s*(.*)$", line)
        if m2 and not line.startswith(" ") and not line.startswith("\t"):
            key, value = m2.group(1), m2.group(2).strip()
            if key == "description":
                # Multi-line description supported via `>` or `|`
                if value in (">", "|"):
                    desc_lines: list[str] = []
                    j = i + 1
                    while j < len(lines):
                        nxt = lines[j]
                        if nxt.startswith(" ") or nxt.startswith("\t") or not nxt.strip():
                            desc_lines.append(nxt.strip())
                            j += 1
                        else:
                            break
                    fm.description = " ".join(d for d in desc_lines if d)
                    i = j
                    continue
                fm.description = value
            elif key == "flags":
                fm.flags = _parse_yaml_list(value)
            elif key == "includes":
                fm.includes = _parse_yaml_list(value)
            elif key == "features":
                fm.features = _parse_yaml_list(value)
            elif key == "negative":
                # Either `negative: {phase: parse, type: SyntaxError}` (inline)
                # or spans two indented lines.
                if value.startswith("{"):
                    # Inline dict
                    inner = value.strip("{}")
                    for part in inner.split(","):
                        if ":" in part:
                            pk, pv = part.split(":", 1)
                            pk = pk.strip()
                            pv = pv.strip().strip('"').strip("'")
                            if pk == "phase":
                                fm.negative_phase = pv
                            elif pk == "type":
                                fm.negative_type = pv
                else:
                    current_key = "negative"
            elif key == "esid":
                fm.esid = value
            elif key == "es5id":
                fm.es5id = value
            elif key == "es6id":
                fm.es6id = value
            i += 1
            continue

        # Indented continuation: the two-line `negative:` form, with phase and
        # type on their own indented lines.
        if current_key == "negative":
            m3 = re.match(r"^\s+([a-zA-Z_]+)\s*:\s*(.*)$", line)
            if m3:
                pk, pv = m3.group(1), m3.group(2).strip().strip('"').strip("'")
                if pk == "phase":
                    fm.negative_phase = pv
                elif pk == "type":
                    fm.negative_type = pv
            else:
                current_key = None
        i += 1
    return fm


# ---------------------------------------------------------------------------
# Harness loading
# ---------------------------------------------------------------------------

_HARNESS_CACHE: dict[str, str] = {}


def load_harness(name: str) -> str:
    if name not in _HARNESS_CACHE:
        p = HARNESS_DIR / name
        if p.exists():
            _HARNESS_CACHE[name] = p.read_text(encoding="utf-8")
        else:
            _HARNESS_CACHE[name] = ""
    return _HARNESS_CACHE[name]


# ---------------------------------------------------------------------------
# Categories
# ---------------------------------------------------------------------------

def test_category(test_path: Path) -> str:
    """Derive a category like 'built-ins/Math' from the test path."""
    rel = test_path.relative_to(TEST_ROOT).as_posix()
    parts = rel.split("/")
    # Use at most 2 levels; e.g. built-ins/Math/abs/foo.js → built-ins/Math
    if len(parts) >= 2:
        return "/".join(parts[:2])
    return parts[0]


# ---------------------------------------------------------------------------
# SX escaping
# ---------------------------------------------------------------------------

def sx_escape_double(s: str) -> str:
    """Escape for a single SX string literal.

    Turn characters that break SX parsing into escapes."""
    return (
        s.replace("\\", "\\\\")
        .replace('"', '\\"')
        .replace("\n", "\\n")
        .replace("\r", "\\r")
        .replace("\t", "\\t")
    )


def sx_double_escape(s: str) -> str:
    """Escape a JS source string for the nested `(eval "(js-eval \"...\")")` form.

    Two levels of SX string-literal escaping. Matches conformance.sh.
    """
    inner = sx_escape_double(s)
    # The inner string gets consumed by the outer `(eval "...")`, so we need
    # to escape backslashes and quotes again.
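    # Worked example (illustrative): a two-line JS source
    #     1;
    #     2;
    # becomes  1;\n2;  after the inner escape, then  1;\\n2;  after this
    # outer escape, and is finally emitted by build_batch_script as
    #     (eval "(js-eval \"1;\\n2;\")")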
    outer = inner.replace("\\", "\\\\").replace('"', '\\"')
    return outer


# ---------------------------------------------------------------------------
# Test assembly
# ---------------------------------------------------------------------------

# Note on the harness: the real test262 assert.js does
# `assert.sameValue = function(...){}`, which requires function-property
# support. Our runtime doesn't have that yet, so many tests will fail — that's
# the point of the scoreboard.
#
# We don't prepend a shim or patch anything; we run the real harness as-is so
# the numbers reflect reality.

def assemble_source(test_src: str, includes: list[str]) -> str:
    """Assemble the full JS source for a test: harness preludes + test."""
    chunks: list[str] = []
    for h in DEFAULT_HARNESS:
        chunks.append(load_harness(h))
    for inc in includes:
        chunks.append(load_harness(inc))
    chunks.append(test_src)
    return "\n".join(chunks)


# ---------------------------------------------------------------------------
# Output parsing
# ---------------------------------------------------------------------------

# Output from sx_server looks like:
#   (ready)
#   (ok 1 2)           -- short value: (ok EPOCH VALUE)
#   (ok-len 100 42)    -- long value: next line has the value
#   NEXT_LINE_WITH_VALUE
#   (error 101 "msg")  -- epoch errored
#
# For our purposes, each test has an epoch. We look up the ok/error result
# and classify as pass/fail.

def parse_output(output: str) -> dict[int, tuple[str, str]]:
    """Return {epoch: (kind, payload)} where kind is 'ok' | 'error'.

    Epochs that produced no output at all simply have no entry."""
    results: dict[int, tuple[str, str]] = {}
    lines = output.split("\n")
    i = 0
    while i < len(lines):
        line = lines[i]
        m_ok = re.match(r"^\(ok (\d+) (.*)\)$", line)
        m_oklen = re.match(r"^\(ok-len (\d+) \d+\)$", line)
        m_err = re.match(r"^\(error (\d+) (.*)\)$", line)
        if m_ok:
            epoch = int(m_ok.group(1))
            results[epoch] = ("ok", m_ok.group(2))
        elif m_oklen:
            epoch = int(m_oklen.group(1))
            val = lines[i + 1] if i + 1 < len(lines) else ""
            results[epoch] = ("ok", val)
            i += 1
        elif m_err:
            epoch = int(m_err.group(1))
            results[epoch] = ("error", m_err.group(2))
        i += 1
    return results


# ---------------------------------------------------------------------------
# Classification
# ---------------------------------------------------------------------------

def classify_error(msg: str) -> str:
    """Bucket an error message into a failure mode."""
    m = msg.lower()
    if "syntaxerror" in m or "parse" in m or ("expected" in m and "got" in m):
        return "SyntaxError (parse/unsupported syntax)"
    if "referenceerror" in m or "undefined symbol" in m or "unbound" in m:
        return "ReferenceError (undefined symbol)"
    if "typeerror" in m and "not a function" in m:
        return "TypeError: not a function"
    if "typeerror" in m:
        return "TypeError (other)"
    if "rangeerror" in m:
        return "RangeError"
    if "test262error" in m:
        return "Test262Error (assertion failed)"
    if "timeout" in m:
        return "Timeout"
    if "killed" in m or "crash" in m:
        return "Crash"
    if "unhandled exception" in m:
        # Could be almost anything — extract the inner message.
        inner = re.search(r"Unhandled exception:\s*\\?\"([^\"]{0,80})", msg)
        if inner:
            return f"Unhandled: {inner.group(1)[:60]}"
        return "Unhandled exception"
    return f"Other: {msg[:80]}"


def classify_negative_result(
    fm: Frontmatter, kind: str, payload: str
) -> tuple[bool, str]:
    """For negative tests: pass if the right error was thrown."""
    expected_type = fm.negative_type or ""
    if kind == "error":
        # We throw; check if it matches. Our error messages look like:
        #   Unhandled exception: "...TypeError..."
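        # For example (hypothetical payload, for illustration), a payload of
        #   Unhandled exception: "TypeError: cannot read property"
        # would match an expected negative type of TypeError.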
        if expected_type and expected_type.lower() in payload.lower():
            return True, f"negative: threw {expected_type} as expected"
        # We could also count a Test262Error here (assertion failed instead of
        # the expected throw), since some negative tests assert more than just
        # the throw, but for now any mismatch is a fail.
        return False, f"negative: expected {expected_type}, got: {payload[:100]}"
    # ok → the test ran without throwing; that's a fail for negative tests
    return False, f"negative: expected {expected_type}, but test completed normally"


def classify_positive_result(kind: str, payload: str) -> tuple[bool, str]:
    """For positive tests: pass if no error thrown."""
    if kind == "ok":
        return True, "passed"
    return False, classify_error(payload)


# ---------------------------------------------------------------------------
# Batch execution
# ---------------------------------------------------------------------------

@dataclasses.dataclass
class TestCase:
    path: Path
    rel: str
    category: str
    fm: Frontmatter
    src: str  # Test source (pre-harness); full source assembled at run time.


@dataclasses.dataclass
class TestResult:
    rel: str
    category: str
    status: str  # pass | fail | skip | timeout
    reason: str
    elapsed_ms: int = 0


def build_batch_script(tests: list[TestCase], start_epoch: int) -> tuple[str, list[int]]:
    """Build one big SX script that loads the kernel once, then runs each test
    in its own epoch. Returns (script, [epoch_per_test])."""
    lines = []
    lines.append("(epoch 1)")
    lines.append('(load "lib/r7rs.sx")')
    lines.append("(epoch 2)")
    lines.append('(load "lib/js/lexer.sx")')
    lines.append("(epoch 3)")
    lines.append('(load "lib/js/parser.sx")')
    lines.append("(epoch 4)")
    lines.append('(load "lib/js/transpile.sx")')
    lines.append("(epoch 5)")
    lines.append('(load "lib/js/runtime.sx")')
    epochs: list[int] = []
    epoch = start_epoch
    for t in tests:
        full_src = assemble_source(t.src, t.fm.includes)
        escaped = sx_double_escape(full_src)
        lines.append(f"(epoch {epoch})")
        lines.append(f'(eval "(js-eval \\"{escaped}\\")")')
        epochs.append(epoch)
        epoch += 1
    return "\n".join(lines) + "\n", epochs


def run_batch(
    tests: list[TestCase], start_epoch: int, timeout_s: int
) -> tuple[dict[int, tuple[str, str]], bool, float]:
    """Run a batch; return (results, timed_out, elapsed_s)."""
    script, epochs = build_batch_script(tests, start_epoch)
    start = time.monotonic()
    try:
        proc = subprocess.run(
            [str(SX_SERVER)],
            input=script,
            capture_output=True,
            text=True,
            timeout=timeout_s,
            cwd=str(REPO),
        )
        elapsed = time.monotonic() - start
        return parse_output(proc.stdout), False, elapsed
    except subprocess.TimeoutExpired as e:
        elapsed = time.monotonic() - start
        # Partial output may still be parseable.
        stdout = (
            (e.stdout or b"").decode("utf-8", errors="replace")
            if isinstance(e.stdout, bytes)
            else (e.stdout or "")
        )
        return parse_output(stdout), True, elapsed


# ---------------------------------------------------------------------------
# Main loop
# ---------------------------------------------------------------------------

def discover_tests(filter_prefix: str | None) -> list[Path]:
    """Walk test262/test/**/*.js, skipping _FIXTURE files and _FIXTURE dirs."""
    tests: list[Path] = []
    for p in TEST_ROOT.rglob("*.js"):
        if p.name.endswith("_FIXTURE.js"):
            continue
        if "_FIXTURE" in p.parts:
            continue
        if filter_prefix:
            rel = p.relative_to(TEST_ROOT).as_posix()
            if not rel.startswith(filter_prefix):
                continue
        tests.append(p)
    tests.sort()
    return tests


def load_test(path: Path) -> TestCase | None:
    """Load + parse frontmatter.

    Returns None on read error."""
    try:
        src = path.read_text(encoding="utf-8")
    except Exception:
        return None
    fm = parse_frontmatter(src)
    return TestCase(
        path=path,
        rel=path.relative_to(TEST_ROOT).as_posix(),
        category=test_category(path),
        fm=fm,
        src=src,
    )


def should_skip(t: TestCase) -> tuple[bool, str]:
    """Skip tests we know we can't run or are explicitly excluded."""
    # Strict-mode tests — we don't support strict mode, so these are noise.
    if "onlyStrict" in t.fm.flags:
        return True, "strict-mode only (not supported)"
    # module flag — ESM tests not supported
    if "module" in t.fm.flags:
        return True, "ESM module (not supported)"
    # async tests time out easily without a proper event loop
    if "async" in t.fm.flags:
        # Let them run; the executor handles timeouts per-batch.
        pass
    # raw tests — they don't load the harness; we can't use assert.* at all.
    # Still run them — some raw tests just check syntax via parse.
    return False, ""


def aggregate(results: list[TestResult]) -> dict:
    """Build the scoreboard dict."""
    by_cat: dict[str, dict] = defaultdict(
        lambda: {"pass": 0, "fail": 0, "skip": 0, "timeout": 0, "total": 0, "failures": Counter()}
    )
    totals = {"pass": 0, "fail": 0, "skip": 0, "timeout": 0, "total": 0}
    failure_modes: Counter[str] = Counter()
    for r in results:
        cat = by_cat[r.category]
        cat[r.status] += 1
        cat["total"] += 1
        totals[r.status] += 1
        totals["total"] += 1
        if r.status == "fail":
            cat["failures"][r.reason] += 1
            failure_modes[r.reason] += 1

    # Build the scoreboard
    categories = []
    for name, stats in sorted(by_cat.items()):
        total = stats["total"]
        passed = stats["pass"]
        runnable = total - stats["skip"]
        pass_rate = (passed / runnable * 100.0) if runnable else 0.0
        categories.append(
            {
                "category": name,
                "total": total,
                "pass": passed,
                "fail": stats["fail"],
                "skip": stats["skip"],
                "timeout": stats["timeout"],
                "pass_rate": round(pass_rate, 1),
                "top_failures": stats["failures"].most_common(5),
            }
        )

    pass_rate = (
        (totals["pass"] / (totals["total"] - totals["skip"]) * 100.0)
        if totals["total"] - totals["skip"]
        else 0.0
    )
    return {
        "totals": {**totals, "pass_rate": round(pass_rate, 1)},
        "categories": categories,
        "top_failure_modes": failure_modes.most_common(20),
    }


def write_markdown(scoreboard: dict, path: Path, pinned_commit: str) -> None:
    t = scoreboard["totals"]
    lines = [
        "# test262 scoreboard",
        "",
        f"Pinned commit: `{pinned_commit}`",
        "",
        f"**Total:** {t['pass']}/{t['total']} passed ({t['pass_rate']}%), "
        f"{t['fail']} failed, {t['skip']} skipped, {t['timeout']} timeouts.",
        "",
        "## Top failure modes",
        "",
    ]
    for mode, count in scoreboard["top_failure_modes"]:
        lines.append(f"- **{count}x** {mode}")
    lines.extend(["", "## Categories (worst pass-rate first)", ""])
    lines.append("| Category | Pass | Fail | Skip | Timeout | Total | Pass % |")
    lines.append("|---|---:|---:|---:|---:|---:|---:|")
    # Sort: worst pass rate first, breaking ties by total desc
    cats = sorted(scoreboard["categories"], key=lambda c: (c["pass_rate"], -c["total"]))
    for c in cats:
        lines.append(
            f"| {c['category']} | {c['pass']} | {c['fail']} | {c['skip']} | "
            f"{c['timeout']} | {c['total']} | {c['pass_rate']}% |"
        )
    lines.append("")
    lines.append("## Per-category top failures")
    lines.append("")
    for c in cats:
        if not c["top_failures"]:
            continue
        lines.append(f"### {c['category']}")
        lines.append("")
        for reason, count in c["top_failures"]:
            lines.append(f"- **{count}x** {reason}")
        lines.append("")
    path.write_text("\n".join(lines), encoding="utf-8")


def main(argv: list[str]) -> int:
    ap = argparse.ArgumentParser()
    ap.add_argument(
        "--limit", type=int, default=0, help="max tests to run (0 = all)"
    )
    ap.add_argument("--filter", type=str, default=None, help="path prefix filter")
    ap.add_argument("--batch-size", type=int, default=DEFAULT_BATCH_SIZE)
    ap.add_argument(
        "--output-json",
        type=str,
        default=str(REPO / "lib" / "js" / "test262-scoreboard.json"),
    )
    ap.add_argument(
        "--output-md",
        type=str,
        default=str(REPO / "lib" / "js" / "test262-scoreboard.md"),
    )
    ap.add_argument("--progress", action="store_true", help="print per-batch progress")
    args = ap.parse_args(argv)

    if not SX_SERVER.exists():
        print(f"ERROR: sx_server.exe not found at {SX_SERVER}", file=sys.stderr)
        print("Build with: cd hosts/ocaml && dune build", file=sys.stderr)
        return 1
    if not UPSTREAM.exists():
        print(f"ERROR: test262-upstream not found at {UPSTREAM}", file=sys.stderr)
        print(
            "Clone with: cd lib/js && git clone --depth 1 "
            "https://github.com/tc39/test262.git test262-upstream",
            file=sys.stderr,
        )
        return 1

    pinned_commit = ""
    try:
        pinned_commit = subprocess.check_output(
            ["git", "-C", str(UPSTREAM), "rev-parse", "HEAD"], text=True
        ).strip()
    except Exception:
        pass

    all_paths = discover_tests(args.filter)
    if args.limit:
        all_paths = all_paths[: args.limit]
    print(f"Discovered {len(all_paths)} test files.", file=sys.stderr)

    # Load all (parse frontmatter, decide skips up front)
    tests: list[TestCase] = []
    skipped: list[TestResult] = []
    for p in all_paths:
        t = load_test(p)
        if not t:
            continue
        skip, why = should_skip(t)
        if skip:
            skipped.append(
                TestResult(rel=t.rel, category=t.category, status="skip", reason=why)
            )
            continue
        tests.append(t)
    print(
        f"Will run {len(tests)} tests ({len(skipped)} skipped up front).",
        file=sys.stderr,
    )

    results: list[TestResult] = list(skipped)
    batch_size = args.batch_size
    epoch_start = 100
    n_batches = (len(tests) + batch_size - 1) // batch_size
    t_run_start = time.monotonic()

    for bi in range(n_batches):
        batch = tests[bi * batch_size : (bi + 1) * batch_size]
        timeout_s = min(BATCH_TIMEOUT_S, max(30, len(batch) * PER_TEST_S))
        epoch_map, timed_out, elapsed = run_batch(batch, epoch_start, timeout_s)

        for idx, t in enumerate(batch):
            epoch = epoch_start + idx
            res = epoch_map.get(epoch)
            if res is None:
                # No result for this epoch — batch probably timed out before
                # reaching it, or sx_server died.
                status = "timeout" if timed_out else "fail"
                reason = (
                    "batch timeout before epoch"
                    if timed_out
                    else "no result from sx_server"
                )
                results.append(
                    TestResult(
                        rel=t.rel, category=t.category, status=status, reason=reason
                    )
                )
                continue
            kind, payload = res
            if t.fm.negative_phase:
                ok, why = classify_negative_result(t.fm, kind, payload)
            else:
                ok, why = classify_positive_result(kind, payload)
            results.append(
                TestResult(
                    rel=t.rel,
                    category=t.category,
                    status="pass" if ok else "fail",
                    reason=why,
                )
            )

        epoch_start += batch_size
        if args.progress or bi % 10 == 0:
            done_n = min((bi + 1) * batch_size, len(tests))
            pass_so_far = sum(1 for r in results if r.status == "pass")
            print(
                f" [batch {bi + 1}/{n_batches}] {done_n}/{len(tests)} tests "
                f"{elapsed:.1f}s{' TIMEOUT' if timed_out else ''} "
                f"running-pass={pass_so_far}",
                file=sys.stderr,
            )

    t_run_elapsed = time.monotonic() - t_run_start
    print(f"\nFinished run in {t_run_elapsed:.1f}s", file=sys.stderr)

    scoreboard = aggregate(results)
    scoreboard["pinned_commit"] = pinned_commit
    scoreboard["elapsed_seconds"] = round(t_run_elapsed, 1)

    # Per-test detail is too large — omit from JSON by default; the aggregated
    # scoreboard is what's useful.
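    # Rough shape of the JSON written below (illustrative values only):
    #   {
    #     "totals": {"pass": 1200, "fail": 800, "skip": 150, "timeout": 4,
    #                "total": 2154, "pass_rate": 59.9},
    #     "categories": [{"category": "built-ins/Math", ...}],
    #     "top_failure_modes": [["SyntaxError (parse/unsupported syntax)", 312], ...],
    #     "pinned_commit": "...",
    #     "elapsed_seconds": 184.2
    #   }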
    out_json = Path(args.output_json)
    out_json.parent.mkdir(parents=True, exist_ok=True)
    out_json.write_text(json.dumps(scoreboard, indent=2), encoding="utf-8")

    out_md = Path(args.output_md)
    write_markdown(scoreboard, out_md, pinned_commit)

    t = scoreboard["totals"]
    print(
        f"\nScoreboard: {t['pass']}/{t['total']} passed ({t['pass_rate']}%) "
        f"fail={t['fail']} skip={t['skip']} timeout={t['timeout']}",
        file=sys.stderr,
    )
    print(f"JSON: {out_json}", file=sys.stderr)
    print(f"MD: {out_md}", file=sys.stderr)
    return 0


if __name__ == "__main__":
    sys.exit(main(sys.argv[1:]))