#!/usr/bin/env python3 """ test262-runner — run the official TC39 test262 suite against our JS-on-SX runtime. Walks lib/js/test262-upstream/test/**/*.js, parses YAML-ish frontmatter, runs tests via a long-lived sx_server.exe subprocess (one harness load, one `js-eval` call per test), and emits JSON + Markdown scoreboards. Usage: python3 lib/js/test262-runner.py # full run (skips strict/module/etc) python3 lib/js/test262-runner.py --limit 2000 python3 lib/js/test262-runner.py --filter built-ins/Math python3 lib/js/test262-runner.py --per-test-timeout 3 Outputs: lib/js/test262-scoreboard.json lib/js/test262-scoreboard.md Pinned to the commit currently checked out in test262-upstream/. Update: rm -rf lib/js/test262-upstream git -C lib/js clone --depth 1 https://github.com/tc39/test262.git test262-upstream Why a custom harness stub instead of assert.js + sta.js? Our JS parser doesn't handle `i++` yet, which the real assert.js uses. The stub here implements the assert entry points that >99% of tests actually touch (sameValue, notSameValue, throws, _isSameValue, _toString) plus Test262Error — using syntax our parser handles. Tests that reach into obscure assert.* paths will fail and show up on the scoreboard, which is the point. """ from __future__ import annotations import argparse import dataclasses import json import os import re import subprocess import sys import threading import time from collections import Counter, defaultdict from pathlib import Path REPO = Path(__file__).resolve().parents[2] SX_SERVER = REPO / "hosts" / "ocaml" / "_build" / "default" / "bin" / "sx_server.exe" UPSTREAM = REPO / "lib" / "js" / "test262-upstream" TEST_ROOT = UPSTREAM / "test" HARNESS_DIR = UPSTREAM / "harness" DEFAULT_PER_TEST_TIMEOUT_S = 5.0 DEFAULT_BATCH_TIMEOUT_S = 120 # --------------------------------------------------------------------------- # Harness stub — replaces assert.js + sta.js with something our parser handles. # --------------------------------------------------------------------------- HARNESS_STUB = r""" function Test262Error(message) { this.message = message || ""; this.name = "Test262Error"; } Test262Error.thrower = function (message) { throw new Test262Error(message); }; function $DONOTEVALUATE() { throw "Test262: This statement should not be evaluated."; } var assert = {}; assert._isSameValue = function (a, b) { if (a === b) { return (a !== 0) || ((1/a) === (1/b)); } return (a !== a) && (b !== b); }; assert._toString = function (v) { if (v === null) { return "null"; } if (v === undefined) { return "undefined"; } if (typeof v === "string") { return "\"" + v + "\""; } return "" + v; }; assert.sameValue = function (actual, expected, message) { if (assert._isSameValue(actual, expected)) { return; } var msg = message || ""; throw new Test262Error(msg + " Expected SameValue(" + assert._toString(actual) + ", " + assert._toString(expected) + ")"); }; assert.notSameValue = function (actual, unexpected, message) { if (!assert._isSameValue(actual, unexpected)) { return; } var msg = message || ""; throw new Test262Error(msg + " Expected different values, both were " + assert._toString(actual)); }; assert.throws = function (errCtor, fn, message) { var msg = message || ""; try { fn(); } catch (e) { if (typeof e !== "object" || e === null) { throw new Test262Error(msg + " thrown value not an object"); } if (e.constructor === errCtor) { return; } throw new Test262Error(msg + " expected " + errCtor.name + " got " + (e.name || "other")); } throw new Test262Error(msg + " no exception thrown, expected " + errCtor.name); }; assert.throws.early = function (errCtor, code) { // We can't truly early-parse so fall back to runtime throw check. throw new Test262Error("assert.throws.early not supported"); }; // assert() direct call — loose-check truthiness (not strict === true like real harness) var __assert_call__ = function (b, m) { if (b) { return; } throw new Test262Error(m || "assertion failed"); }; // compareArray stub — minimal for cases that only compareArray arrays of primitives assert.compareArray = function (a, b, m) { var msg = m || ""; if (a === b) { return; } if (a == null || b == null) { throw new Test262Error(msg + " compareArray null"); } if (a.length !== b.length) { throw new Test262Error(msg + " compareArray length differs"); } for (var i = 0; i < a.length; i = i + 1) { if (!assert._isSameValue(a[i], b[i])) { throw new Test262Error(msg + " compareArray index " + i); } } }; // propertyHelper stubs — verifyProperty checks just existence + value for now. var verifyProperty = function (obj, name, desc, opts) { if (desc && (desc.value !== undefined)) { assert.sameValue(obj[name], desc.value, name + " value"); } }; var verifyPrimordialProperty = verifyProperty; var verifyNotEnumerable = function (o, n) { }; var verifyNotWritable = function (o, n) { }; var verifyNotConfigurable = function (o, n) { }; var verifyEnumerable = function (o, n) { }; var verifyWritable = function (o, n) { }; var verifyConfigurable = function (o, n) { }; // isConstructor stub — we can't actually probe; assume falsy constructor for arrows/functions var isConstructor = function (f) { if (typeof f !== "function") { return false; } // Best-effort: built-in functions and arrows aren't; declared `function` decls are. return false; }; // Trivial helper for tests that use Array.isArray-like functionality // (many tests reach for it via compareArray) """ # --------------------------------------------------------------------------- # Frontmatter parsing # --------------------------------------------------------------------------- FRONTMATTER_RE = re.compile(r"/\*---(.*?)---\*/", re.DOTALL) @dataclasses.dataclass class Frontmatter: description: str = "" flags: list[str] = dataclasses.field(default_factory=list) includes: list[str] = dataclasses.field(default_factory=list) features: list[str] = dataclasses.field(default_factory=list) negative_phase: str | None = None negative_type: str | None = None esid: str | None = None def _parse_yaml_list(s: str) -> list[str]: s = s.strip() if s.startswith("[") and s.endswith("]"): s = s[1:-1] return [item.strip().strip('"').strip("'") for item in s.split(",") if item.strip()] def parse_frontmatter(src: str) -> Frontmatter: fm = Frontmatter() m = FRONTMATTER_RE.search(src) if not m: return fm body = m.group(1) lines = body.split("\n") i = 0 current_key = None while i < len(lines): line = lines[i] stripped = line.strip() if not stripped or stripped.startswith("#"): i += 1 continue m2 = re.match(r"^([a-zA-Z_][a-zA-Z0-9_]*)\s*:\s*(.*)$", line) if m2 and not line.startswith(" ") and not line.startswith("\t"): key, value = m2.group(1), m2.group(2).strip() if key == "description": if value in (">", "|"): desc_lines: list[str] = [] j = i + 1 while j < len(lines): nxt = lines[j] if nxt.startswith(" ") or nxt.startswith("\t") or not nxt.strip(): desc_lines.append(nxt.strip()) j += 1 else: break fm.description = " ".join(d for d in desc_lines if d) i = j continue fm.description = value elif key == "flags": fm.flags = _parse_yaml_list(value) elif key == "includes": fm.includes = _parse_yaml_list(value) elif key == "features": fm.features = _parse_yaml_list(value) elif key == "negative": if value.startswith("{"): inner = value.strip("{}") for part in inner.split(","): if ":" in part: pk, pv = part.split(":", 1) pk = pk.strip() pv = pv.strip().strip('"').strip("'") if pk == "phase": fm.negative_phase = pv elif pk == "type": fm.negative_type = pv else: current_key = "negative" elif key == "esid": fm.esid = value i += 1 continue if current_key == "negative": m3 = re.match(r"^\s+([a-zA-Z_]+)\s*:\s*(.*)$", line) if m3: pk, pv = m3.group(1), m3.group(2).strip().strip('"').strip("'") if pk == "phase": fm.negative_phase = pv elif pk == "type": fm.negative_type = pv else: current_key = None i += 1 return fm # --------------------------------------------------------------------------- # Categorisation # --------------------------------------------------------------------------- def test_category(test_path: Path) -> str: rel = test_path.relative_to(TEST_ROOT).as_posix() parts = rel.split("/") if len(parts) >= 2: return "/".join(parts[:2]) return parts[0] # --------------------------------------------------------------------------- # SX escaping — escape a JS source string for the nested `(eval "(js-eval \"...\")")` form # --------------------------------------------------------------------------- def sx_escape_for_nested_eval(s: str) -> str: """Return a string ready to be embedded as the JS source inside `(eval "(js-eval \"...\")")`. Two-level escape: the outer `(eval "...")` consumes one layer, the inner `(js-eval \"...\")` consumes another. """ # Level 1 — inside the inner string literal inner = ( s.replace("\\", "\\\\") .replace('"', '\\"') .replace("\n", "\\n") .replace("\r", "\\r") .replace("\t", "\\t") ) # Level 2 — the whole inner form is itself a string in the outer outer = inner.replace("\\", "\\\\").replace('"', '\\"') return outer # --------------------------------------------------------------------------- # Output parsing # --------------------------------------------------------------------------- # Server output forms: # (ready) # (ok N VALUE) -- single-line result # (ok-len N SIZE) -- next line is the result (multi-line or long) # VALUE # (error N "message") -- epoch errored # # We read line-by-line off stdout so we can advance tests one-at-a-time # and kill the server if it hangs. RX_OK_INLINE = re.compile(r"^\(ok (\d+) (.*)\)\s*$") RX_OK_LEN = re.compile(r"^\(ok-len (\d+) \d+\)\s*$") RX_ERR = re.compile(r"^\(error (\d+) (.*)\)\s*$") # --------------------------------------------------------------------------- # Classification # --------------------------------------------------------------------------- def classify_error(msg: str) -> str: m = msg.lower() if "expected" in m and "got" in m: return "SyntaxError (parse/unsupported syntax)" if "syntaxerror" in m or "parse" in m: return "SyntaxError (parse/unsupported syntax)" if "undefined symbol" in m or "unbound" in m: return "ReferenceError (undefined symbol)" if "referenceerror" in m: return "ReferenceError (undefined symbol)" if "typeerror" in m and "not a function" in m: return "TypeError: not a function" if "typeerror" in m: return "TypeError (other)" if "rangeerror" in m: return "RangeError" if "test262error" in m: return "Test262Error (assertion failed)" if "timeout" in m: return "Timeout" if "killed" in m or "crash" in m: return "Crash" if "unhandled exception" in m: inner = re.search(r"Unhandled exception:\s*\\?\"([^\"]{0,80})", msg) if inner: return f"Unhandled: {inner.group(1)[:60]}" return "Unhandled exception" return f"Other: {msg[:80]}" def classify_negative_result(fm: Frontmatter, kind: str, payload: str) -> tuple[bool, str]: expected_type = fm.negative_type or "" if kind == "error": if expected_type and expected_type.lower() in payload.lower(): return True, f"negative: threw {expected_type} as expected" return False, f"negative: expected {expected_type}, got: {payload[:100]}" return False, f"negative: expected {expected_type}, but test completed normally" def classify_positive_result(kind: str, payload: str) -> tuple[bool, str]: if kind == "ok": return True, "passed" return False, classify_error(payload) # --------------------------------------------------------------------------- # Skip rules # --------------------------------------------------------------------------- UNSUPPORTED_FEATURES = { "Atomics", "SharedArrayBuffer", "BigInt", "Proxy", "Reflect", "Reflect.construct", "Symbol", "Symbol.iterator", "Symbol.asyncIterator", "Symbol.hasInstance", "Symbol.isConcatSpreadable", "Symbol.match", "Symbol.matchAll", "Symbol.replace", "Symbol.search", "Symbol.species", "Symbol.split", "Symbol.toPrimitive", "Symbol.toStringTag", "Symbol.unscopables", "TypedArray", "DataView", "WeakRef", "WeakMap", "WeakSet", "FinalizationRegistry", "async-functions", # we support but conformance shape iffy "async-iteration", "async-generators", "generators", "regexp-named-groups", "regexp-unicode-property-escapes", "regexp-dotall", "regexp-lookbehind", "regexp-match-indices", "regexp-modifiers", "regexp-v-flag", "regexp-duplicate-named-groups", "numeric-separator-literal", "class-fields-private", "class-fields-public", "class-methods-private", "class-static-fields-private", "class-static-fields-public", "class-static-methods-private", "decorators", "destructuring-binding-patterns", "destructuring-assignment", "error-cause", "optional-chaining", "optional-catch-binding", "logical-assignment-operators", "numeric-separator-literal", "hashbang", "import-assertions", "import-attributes", "import.meta", "dynamic-import", "json-modules", "json-parse-with-source", "Intl.DisplayNames", "Intl.ListFormat", "Intl.Locale", "Intl.NumberFormat-unified", "Intl.Segmenter", "Intl-enumeration", "Temporal", "IteratorClose", "Iterator", "iterator-helpers", "async-explicit-resource-management", "explicit-resource-management", "set-methods", "Map.prototype.upsert", "array-grouping", "Array.fromAsync", "promise-with-resolvers", "Promise.try", "Promise.any", "Promise.allSettled", "ShadowRealm", "tail-call-optimization", "legacy-regexp", "uint8array-base64", } def should_skip(t: "TestCase") -> tuple[bool, str]: if "onlyStrict" in t.fm.flags: return True, "strict-mode only" if "module" in t.fm.flags: return True, "ESM module" if "raw" in t.fm.flags: return True, "raw (no harness)" if "CanBlockIsFalse" in t.fm.flags or "CanBlockIsTrue" in t.fm.flags: return True, "shared-memory flag" for f in t.fm.features: if f in UNSUPPORTED_FEATURES: return True, f"feature:{f}" # Skip anything under Intl/Temporal/etc. path — these categories are 100% unsupported p = t.rel for prefix in ( "intl402/", "staging/", "built-ins/Atomics/", "built-ins/SharedArrayBuffer/", "built-ins/BigInt/", "built-ins/Proxy/", "built-ins/Reflect/", "built-ins/Symbol/", "built-ins/WeakRef/", "built-ins/WeakMap/", "built-ins/WeakSet/", "built-ins/FinalizationRegistry/", "built-ins/TypedArrayConstructors/", "built-ins/Temporal/", "built-ins/Int8Array/", "built-ins/Int16Array/", "built-ins/Int32Array/", "built-ins/Uint8Array/", "built-ins/Uint8ClampedArray/", "built-ins/Uint16Array/", "built-ins/Uint32Array/", "built-ins/Float16Array/", "built-ins/Float32Array/", "built-ins/Float64Array/", "built-ins/BigInt64Array/", "built-ins/BigUint64Array/", "built-ins/DataView/", "built-ins/ArrayBuffer/", "built-ins/ArrayIteratorPrototype/", "built-ins/AsyncFromSyncIteratorPrototype/", "built-ins/AsyncGeneratorFunction/", "built-ins/AsyncGeneratorPrototype/", "built-ins/AsyncIteratorPrototype/", "built-ins/GeneratorFunction/", "built-ins/GeneratorPrototype/", "built-ins/MapIteratorPrototype/", "built-ins/SetIteratorPrototype/", "built-ins/StringIteratorPrototype/", "built-ins/RegExpStringIteratorPrototype/", "built-ins/AbstractModuleSource/", "built-ins/AggregateError/", "built-ins/DisposableStack/", "built-ins/AsyncDisposableStack/", "built-ins/SuppressedError/", "built-ins/Iterator/", "built-ins/AsyncIterator/", "built-ins/ShadowRealm/", "annexB/", ): if p.startswith(prefix): return True, f"unsupported path:{prefix.rstrip('/')}" return False, "" # --------------------------------------------------------------------------- # Test case loading # --------------------------------------------------------------------------- @dataclasses.dataclass class TestCase: path: Path rel: str category: str fm: Frontmatter src: str @dataclasses.dataclass class TestResult: rel: str category: str status: str # pass | fail | skip | timeout reason: str elapsed_ms: int = 0 def discover_tests(filter_prefixes: list[str] | None) -> list[Path]: tests: list[Path] = [] for p in TEST_ROOT.rglob("*.js"): if p.name.endswith("_FIXTURE.js"): continue if "_FIXTURE" in p.parts: continue if filter_prefixes: rel = p.relative_to(TEST_ROOT).as_posix() if not any(rel.startswith(prefix) for prefix in filter_prefixes): continue tests.append(p) tests.sort() return tests def load_test(path: Path) -> TestCase | None: try: src = path.read_text(encoding="utf-8") except Exception: return None fm = parse_frontmatter(src) return TestCase( path=path, rel=path.relative_to(TEST_ROOT).as_posix(), category=test_category(path), fm=fm, src=src, ) # --------------------------------------------------------------------------- # Long-lived server session # --------------------------------------------------------------------------- class ServerSession: """Wrap a long-lived sx_server.exe subprocess; feed it one-liner commands, collect results per-epoch. Restart on hang/crash. """ def __init__(self, per_test_timeout: float): self.per_test_timeout = per_test_timeout self.proc: subprocess.Popen | None = None self.lock = threading.Lock() def start(self) -> None: self.proc = subprocess.Popen( [str(SX_SERVER)], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, cwd=str(REPO), text=True, bufsize=1, ) self._wait_for("(ready)", timeout=10.0) # Load kernel libraries self._run_and_collect(1, '(load "lib/r7rs.sx")', timeout=30.0) self._run_and_collect(2, '(load "lib/js/lexer.sx")', timeout=30.0) self._run_and_collect(3, '(load "lib/js/parser.sx")', timeout=30.0) self._run_and_collect(4, '(load "lib/js/transpile.sx")', timeout=30.0) self._run_and_collect(5, '(load "lib/js/runtime.sx")', timeout=30.0) # Preload the stub harness as one big js-eval stub_escaped = sx_escape_for_nested_eval(HARNESS_STUB) self._run_and_collect( 6, f'(eval "(js-eval \\"{stub_escaped}\\")")', timeout=30.0, ) def stop(self) -> None: if self.proc is not None: try: self.proc.stdin.close() except Exception: pass try: self.proc.terminate() self.proc.wait(timeout=3) except Exception: try: self.proc.kill() except Exception: pass self.proc = None def _wait_for(self, token: str, timeout: float) -> None: assert self.proc and self.proc.stdout start = time.monotonic() while time.monotonic() - start < timeout: line = self.proc.stdout.readline() if not line: raise RuntimeError("sx_server closed stdout before ready") if token in line: return raise TimeoutError(f"timeout waiting for {token}") def _run_and_collect(self, epoch: int, cmd: str, timeout: float) -> tuple[str, str]: """Write `(epoch N)\n\n` and read until we see ok/ok-len/error for that epoch. Returns (kind, payload). Raises TimeoutError if the server hangs. """ assert self.proc and self.proc.stdin and self.proc.stdout self.proc.stdin.write(f"(epoch {epoch})\n{cmd}\n") self.proc.stdin.flush() deadline = time.monotonic() + timeout while time.monotonic() < deadline: remaining = deadline - time.monotonic() if remaining <= 0: raise TimeoutError(f"epoch {epoch} exceeded timeout {timeout}") line = self._readline_with_timeout(remaining) if not line: raise RuntimeError("sx_server closed stdout mid-epoch") m = RX_OK_INLINE.match(line) if m: e = int(m.group(1)) if e == epoch: return "ok", m.group(2) continue m = RX_OK_LEN.match(line) if m: e = int(m.group(1)) val = self._readline_with_timeout(remaining) if val is None: val = "" val = val.rstrip("\n") if e == epoch: return "ok", val continue m = RX_ERR.match(line) if m: e = int(m.group(1)) if e == epoch: return "error", m.group(2) continue # Other output — (ready), comment, noise — ignore raise TimeoutError(f"epoch {epoch} exceeded timeout {timeout}") def _readline_with_timeout(self, timeout: float) -> str | None: """Read one line with a timeout. On Linux we use a thread-wrapped read since there's no portable non-blocking readline on a subprocess pipe. """ assert self.proc and self.proc.stdout result: list[str | None] = [None] done = threading.Event() def reader() -> None: try: result[0] = self.proc.stdout.readline() # type: ignore[union-attr] except Exception: result[0] = None finally: done.set() th = threading.Thread(target=reader, daemon=True) th.start() done.wait(timeout=timeout) if not done.is_set(): # Hang — kill the process; caller will restart try: self.proc.kill() except Exception: pass raise TimeoutError("readline timeout") return result[0] def run_test(self, epoch: int, js_source: str) -> tuple[str, str]: escaped = sx_escape_for_nested_eval(js_source) cmd = f'(eval "(js-eval \\"{escaped}\\")")' return self._run_and_collect(epoch, cmd, timeout=self.per_test_timeout) # --------------------------------------------------------------------------- # Run driver # --------------------------------------------------------------------------- def assemble_source(t: TestCase) -> str: """Return JS source to feed to js-eval. Harness is preloaded, so we only append the test source (plus negative-test prep if needed). """ return t.src def aggregate(results: list[TestResult]) -> dict: by_cat: dict[str, dict] = defaultdict( lambda: {"pass": 0, "fail": 0, "skip": 0, "timeout": 0, "total": 0, "failures": Counter()} ) totals = {"pass": 0, "fail": 0, "skip": 0, "timeout": 0, "total": 0} failure_modes: Counter[str] = Counter() for r in results: cat = by_cat[r.category] cat[r.status] += 1 cat["total"] += 1 totals[r.status] += 1 totals["total"] += 1 if r.status == "fail": cat["failures"][r.reason] += 1 failure_modes[r.reason] += 1 elif r.status == "timeout": cat["failures"]["Timeout"] += 1 failure_modes["Timeout"] += 1 categories = [] for name, stats in sorted(by_cat.items()): total = stats["total"] passed = stats["pass"] runnable = total - stats["skip"] pass_rate = (passed / runnable * 100.0) if runnable else 0.0 categories.append( { "category": name, "total": total, "pass": passed, "fail": stats["fail"], "skip": stats["skip"], "timeout": stats["timeout"], "pass_rate": round(pass_rate, 1), "top_failures": stats["failures"].most_common(5), } ) runnable_total = totals["total"] - totals["skip"] pass_rate = (totals["pass"] / runnable_total * 100.0) if runnable_total else 0.0 return { "totals": {**totals, "runnable": runnable_total, "pass_rate": round(pass_rate, 1)}, "categories": categories, "top_failure_modes": failure_modes.most_common(20), } def write_markdown(scoreboard: dict, path: Path, pinned_commit: str, elapsed_s: float) -> None: t = scoreboard["totals"] lines = [ "# test262 scoreboard", "", f"Pinned commit: `{pinned_commit}`", f"Wall time: {elapsed_s:.1f}s", "", f"**Total:** {t['pass']}/{t['runnable']} runnable passed ({t['pass_rate']}%). " f"Raw: pass={t['pass']} fail={t['fail']} skip={t['skip']} timeout={t['timeout']} total={t['total']}.", "", "## Top failure modes", "", ] for mode, count in scoreboard["top_failure_modes"]: lines.append(f"- **{count}x** {mode}") lines.extend(["", "## Categories (worst pass-rate first, min 10 runnable)", ""]) lines.append("| Category | Pass | Fail | Skip | Timeout | Total | Pass % |") lines.append("|---|---:|---:|---:|---:|---:|---:|") cats = [c for c in scoreboard["categories"] if (c["total"] - c["skip"]) >= 10] cats.sort(key=lambda c: (c["pass_rate"], -c["total"])) for c in cats: lines.append( f"| {c['category']} | {c['pass']} | {c['fail']} | {c['skip']} | " f"{c['timeout']} | {c['total']} | {c['pass_rate']}% |" ) lines.append("") lines.append("## Per-category top failures (min 10 runnable, worst first)") lines.append("") for c in cats: if not c["top_failures"]: continue lines.append(f"### {c['category']} ({c['pass']}/{c['total']-c['skip']} — {c['pass_rate']}%)") lines.append("") for reason, count in c["top_failures"]: lines.append(f"- **{count}x** {reason}") lines.append("") path.write_text("\n".join(lines), encoding="utf-8") def main(argv: list[str]) -> int: ap = argparse.ArgumentParser() ap.add_argument("--limit", type=int, default=0, help="max tests to run (0 = all)") ap.add_argument("--filter", type=str, action="append", default=None, help="path prefix filter (repeatable; OR'd together)") ap.add_argument("--per-test-timeout", type=float, default=DEFAULT_PER_TEST_TIMEOUT_S) ap.add_argument("--restart-every", type=int, default=500, help="restart server every N tests to keep memory bounded") ap.add_argument("--max-per-category", type=int, default=0, help="cap runnable tests per category (0 = no cap)") ap.add_argument("--output-json", type=str, default=str(REPO / "lib" / "js" / "test262-scoreboard.json")) ap.add_argument("--output-md", type=str, default=str(REPO / "lib" / "js" / "test262-scoreboard.md")) ap.add_argument("--progress-every", type=int, default=100) args = ap.parse_args(argv) if not SX_SERVER.exists(): print(f"ERROR: sx_server.exe not found at {SX_SERVER}", file=sys.stderr) return 1 if not UPSTREAM.exists(): print(f"ERROR: test262-upstream not found at {UPSTREAM}", file=sys.stderr) return 1 pinned_commit = "" try: pinned_commit = subprocess.check_output( ["git", "-C", str(UPSTREAM), "rev-parse", "HEAD"], text=True ).strip() except Exception: pass all_paths = discover_tests(args.filter) if args.limit: all_paths = all_paths[: args.limit] print(f"Discovered {len(all_paths)} test files.", file=sys.stderr) tests: list[TestCase] = [] results: list[TestResult] = [] per_cat_count: dict[str, int] = defaultdict(int) for p in all_paths: t = load_test(p) if not t: continue skip, why = should_skip(t) if skip: results.append(TestResult(rel=t.rel, category=t.category, status="skip", reason=why)) continue if args.max_per_category > 0 and per_cat_count[t.category] >= args.max_per_category: results.append(TestResult(rel=t.rel, category=t.category, status="skip", reason=f"capped at --max-per-category={args.max_per_category}")) continue per_cat_count[t.category] += 1 tests.append(t) print(f"Will run {len(tests)} tests ({len(results)} skipped up front).", file=sys.stderr) t_run_start = time.monotonic() session: ServerSession | None = None def ensure_session() -> ServerSession: nonlocal session if session is None: session = ServerSession(per_test_timeout=args.per_test_timeout) session.start() return session def restart_session() -> None: nonlocal session if session is not None: session.stop() session = None epoch = 100 done_n = 0 try: for t in tests: epoch += 1 done_n += 1 source = assemble_source(t) try: sess = ensure_session() kind, payload = sess.run_test(epoch, source) if t.fm.negative_phase: ok, why = classify_negative_result(t.fm, kind, payload) else: ok, why = classify_positive_result(kind, payload) results.append( TestResult( rel=t.rel, category=t.category, status="pass" if ok else "fail", reason=why, ) ) except TimeoutError: results.append( TestResult(rel=t.rel, category=t.category, status="timeout", reason="per-test timeout") ) restart_session() except Exception as e: results.append( TestResult(rel=t.rel, category=t.category, status="fail", reason=f"runner-error: {e}") ) restart_session() # Periodic restart to keep server healthy if args.restart_every > 0 and done_n % args.restart_every == 0: restart_session() if done_n % args.progress_every == 0: pass_so_far = sum(1 for r in results if r.status == "pass") fail_so_far = sum(1 for r in results if r.status == "fail") to_so_far = sum(1 for r in results if r.status == "timeout") el = time.monotonic() - t_run_start print( f" [{done_n}/{len(tests)}] pass={pass_so_far} fail={fail_so_far} " f"timeout={to_so_far} elapsed={el:.1f}s " f"rate={done_n/max(el,0.001):.1f}/s", file=sys.stderr, ) finally: if session is not None: session.stop() t_run_elapsed = time.monotonic() - t_run_start print(f"\nFinished run in {t_run_elapsed:.1f}s", file=sys.stderr) scoreboard = aggregate(results) scoreboard["pinned_commit"] = pinned_commit scoreboard["elapsed_seconds"] = round(t_run_elapsed, 1) out_json = Path(args.output_json) out_json.parent.mkdir(parents=True, exist_ok=True) out_json.write_text(json.dumps(scoreboard, indent=2), encoding="utf-8") out_md = Path(args.output_md) write_markdown(scoreboard, out_md, pinned_commit, t_run_elapsed) t = scoreboard["totals"] print( f"\nScoreboard: {t['pass']}/{t['runnable']} runnable passed ({t['pass_rate']}%) " f"fail={t['fail']} skip={t['skip']} timeout={t['timeout']} total={t['total']}", file=sys.stderr, ) print(f"JSON: {out_json}", file=sys.stderr) print(f"MD: {out_md}", file=sys.stderr) return 0 if __name__ == "__main__": sys.exit(main(sys.argv[1:]))