Files
rose-ash/lib/js/test262-runner.py
giles 7cffae2148 js-on-sx: exponent notation in js-string-to-number (+3 Number tests)
js-num-from-string now finds an e/E split, parses mantissa and exponent
separately, and combines via js-pow-int (positive-exp loop for >=0, 1/
reciprocal for negative). Previously `.12345e-3` parsed as 0.12345 and
"1e3" returned NaN — the parser walked decimals/dots only.

New helpers:
- js-find-exp-char / -loop : linear scan for e/E, returns -1 if absent
- js-pow-int base exp : integer-exp power, handles negative

Also fixed `js-string-trim` typo → `js-trim` in the rewritten num-from-
string, and corrected test 903's expected part count (3, not 2 — the
lexer has always split `hi ${x}!` into str+expr+str, the test just had
the wrong count).

Unit: 521/522 (was 520/522, 934 still blocked on SX \` escape).
Conformance: 148/148 unchanged.
Number scoreboard: 43/100 → 46/100 (+3).

Impacted test262 paths (sample): built-ins/Number/S9.3.1_A11.js and
A12/A16/A17 (".12345e-3", scientific notation round-trips).
2026-04-24 11:36:56 +00:00

1269 lines
46 KiB
Python

#!/usr/bin/env python3
"""
test262-runner — run the official TC39 test262 suite against our JS-on-SX runtime.
Walks lib/js/test262-upstream/test/**/*.js, parses YAML-ish frontmatter, runs
tests via a pool of long-lived sx_server.exe subprocesses (each worker loads
the harness once, then runs `js-eval` per test on a persistent stdin channel),
and emits JSON + Markdown scoreboards.
Usage:
python3 lib/js/test262-runner.py # full run (skips strict/module/etc)
python3 lib/js/test262-runner.py --limit 2000
python3 lib/js/test262-runner.py --filter built-ins/Math
python3 lib/js/test262-runner.py --per-test-timeout 3
python3 lib/js/test262-runner.py --workers 4 # parallel workers (default: 2)
Outputs:
lib/js/test262-scoreboard.json
lib/js/test262-scoreboard.md
Pinned to the commit currently checked out in test262-upstream/. Update:
rm -rf lib/js/test262-upstream
git -C lib/js clone --depth 1 https://github.com/tc39/test262.git test262-upstream
Why a custom harness stub instead of assert.js + sta.js?
Our JS parser doesn't handle `i++` yet, which the real assert.js uses. The
stub here implements the assert entry points that >99% of tests actually
touch (sameValue, notSameValue, throws, _isSameValue, _toString) plus
Test262Error — using syntax our parser handles. Tests that reach into
obscure assert.* paths will fail and show up on the scoreboard, which is
the point.
"""
from __future__ import annotations
import argparse
import dataclasses
import json
import multiprocessing as mp
import os
import re
import select
import subprocess
import sys
import time
from collections import Counter, defaultdict
from pathlib import Path
REPO = Path(__file__).resolve().parents[2]
SX_SERVER = REPO / "hosts" / "ocaml" / "_build" / "default" / "bin" / "sx_server.exe"
UPSTREAM = REPO / "lib" / "js" / "test262-upstream"
TEST_ROOT = UPSTREAM / "test"
HARNESS_DIR = UPSTREAM / "harness"
DEFAULT_PER_TEST_TIMEOUT_S = 5.0
DEFAULT_BATCH_TIMEOUT_S = 120
# Cache dir for precomputed SX source of harness JS (one file per Python run).
# Written once in main(), read via (load ...) by every worker session.
HARNESS_CACHE_DIR = REPO / "lib" / "js" / ".harness-cache"
# ---------------------------------------------------------------------------
# Harness stub — replaces assert.js + sta.js with something our parser handles.
# ---------------------------------------------------------------------------
HARNESS_STUB = r"""
function Test262Error(message) {
this.message = message || "";
this.name = "Test262Error";
}
Test262Error.thrower = function (message) { throw new Test262Error(message); };
function $DONOTEVALUATE() { throw "Test262: This statement should not be evaluated."; }
var assert = {};
assert._isSameValue = function (a, b) {
if (a === b) { return (a !== 0) || ((1/a) === (1/b)); }
return (a !== a) && (b !== b);
};
assert._toString = function (v) {
if (v === null) { return "null"; }
if (v === undefined) { return "undefined"; }
if (typeof v === "string") { return "\"" + v + "\""; }
return "" + v;
};
assert.sameValue = function (actual, expected, message) {
if (assert._isSameValue(actual, expected)) { return; }
var msg = message || "";
throw new Test262Error(msg + " Expected SameValue(" + assert._toString(actual) + ", " + assert._toString(expected) + ")");
};
assert.notSameValue = function (actual, unexpected, message) {
if (!assert._isSameValue(actual, unexpected)) { return; }
var msg = message || "";
throw new Test262Error(msg + " Expected different values, both were " + assert._toString(actual));
};
assert.throws = function (errCtor, fn, message) {
var msg = message || "";
try { fn(); } catch (e) {
if (typeof e !== "object" || e === null) {
throw new Test262Error(msg + " thrown value not an object");
}
if (e.constructor === errCtor) { return; }
throw new Test262Error(msg + " expected " + errCtor.name + " got " + (e.name || "other"));
}
throw new Test262Error(msg + " no exception thrown, expected " + errCtor.name);
};
assert.throws.early = function (errCtor, code) {
// We can't truly early-parse so fall back to runtime throw check.
throw new Test262Error("assert.throws.early not supported");
};
// assert() direct call — loose-check truthiness (not strict === true like real harness)
var __assert_call__ = function (b, m) {
if (b) { return; }
throw new Test262Error(m || "assertion failed");
};
// Make `assert` itself callable — many tests write `assert(x, "msg")`.
assert.__callable__ = __assert_call__;
// compareArray stub — minimal for cases that only compareArray arrays of primitives
assert.compareArray = function (a, b, m) {
var msg = m || "";
if (a === b) { return; }
if (a == null || b == null) { throw new Test262Error(msg + " compareArray null"); }
if (a.length !== b.length) { throw new Test262Error(msg + " compareArray length differs"); }
for (var i = 0; i < a.length; i = i + 1) {
if (!assert._isSameValue(a[i], b[i])) {
throw new Test262Error(msg + " compareArray index " + i);
}
}
};
// propertyHelper stubs — verifyProperty checks just existence + value for now.
var verifyProperty = function (obj, name, desc, opts) {
if (desc && (desc.value !== undefined)) {
assert.sameValue(obj[name], desc.value, name + " value");
}
};
var verifyPrimordialProperty = verifyProperty;
var verifyNotEnumerable = function (o, n, v, w, x) { };
var verifyNotWritable = function (o, n, v, w, x) { };
var verifyNotConfigurable = function (o, n, v, w, x) { };
var verifyEnumerable = function (o, n, v, w, x) { };
var verifyWritable = function (o, n, v, w, x) { };
var verifyConfigurable = function (o, n, v, w, x) { };
// isConstructor stub — we can't actually probe; assume falsy constructor for arrows/functions
var isConstructor = function (f) {
if (typeof f !== "function") { return false; }
// Best-effort: built-in functions and arrows aren't; declared `function` decls are.
return false;
};
// Trivial helper for tests that use Array.isArray-like functionality
// (many tests reach for it via compareArray)
"""
# ---------------------------------------------------------------------------
# Frontmatter parsing
# ---------------------------------------------------------------------------
FRONTMATTER_RE = re.compile(r"/\*---(.*?)---\*/", re.DOTALL)
@dataclasses.dataclass
class Frontmatter:
    """Parsed /*--- ... ---*/ metadata of a single test262 file.

    Only the keys this runner consumes are represented; everything else in
    the frontmatter is ignored by parse_frontmatter().
    """
    # One-line (or folded block-scalar) human description of the test.
    description: str = ""
    # YAML `flags:` list, e.g. ["onlyStrict", "module", "raw"].
    flags: list = dataclasses.field(default_factory=list)
    # Harness files the test expects to be included (e.g. "compareArray.js").
    includes: list = dataclasses.field(default_factory=list)
    # Language/library feature tags used by the skip rules.
    features: list = dataclasses.field(default_factory=list)
    # For `negative:` tests: when the error must occur ("parse"/"resolution"/"runtime").
    negative_phase: "str | None" = None
    # For `negative:` tests: the expected error constructor name (e.g. "SyntaxError").
    negative_type: "str | None" = None
    # ECMAScript spec section id, informational only.
    esid: "str | None" = None
def _parse_yaml_list(s: str) -> list:
s = s.strip()
if s.startswith("[") and s.endswith("]"):
s = s[1:-1]
return [item.strip().strip('"').strip("'") for item in s.split(",") if item.strip()]
def parse_frontmatter(src: str) -> Frontmatter:
    """Parse the /*--- ... ---*/ YAML-ish frontmatter block out of a test262 file.

    Handles only the keys this runner consumes (description, flags, includes,
    features, negative, esid); other keys are ignored. Returns a default
    Frontmatter when no frontmatter block is present.
    """
    fm = Frontmatter()
    m = FRONTMATTER_RE.search(src)
    if not m:
        return fm
    body = m.group(1)
    lines = body.split("\n")
    i = 0
    # Tracks a multi-line `negative:` mapping whose phase/type follow on
    # indented continuation lines.
    current_key = None
    while i < len(lines):
        line = lines[i]
        stripped = line.strip()
        if not stripped or stripped.startswith("#"):
            i += 1
            continue
        m2 = re.match(r"^([a-zA-Z_][a-zA-Z0-9_]*)\s*:\s*(.*)$", line)
        # Top-level key only when the line is not indented (indented `key:`
        # lines belong to a block scalar or the negative mapping below).
        if m2 and not line.startswith(" ") and not line.startswith("\t"):
            key, value = m2.group(1), m2.group(2).strip()
            if key == "description":
                if value in (">", "|"):
                    # YAML block scalar: gather the following indented (or
                    # blank) lines and fold them into one space-joined string.
                    desc_lines = []
                    j = i + 1
                    while j < len(lines):
                        nxt = lines[j]
                        if nxt.startswith(" ") or nxt.startswith("\t") or not nxt.strip():
                            desc_lines.append(nxt.strip())
                            j += 1
                        else:
                            break
                    fm.description = " ".join(d for d in desc_lines if d)
                    i = j
                    continue
                fm.description = value
            elif key == "flags":
                fm.flags = _parse_yaml_list(value)
            elif key == "includes":
                fm.includes = _parse_yaml_list(value)
            elif key == "features":
                fm.features = _parse_yaml_list(value)
            elif key == "negative":
                if value.startswith("{"):
                    # Inline form: negative: {phase: parse, type: SyntaxError}
                    inner = value.strip("{}")
                    for part in inner.split(","):
                        if ":" in part:
                            pk, pv = part.split(":", 1)
                            pk = pk.strip()
                            pv = pv.strip().strip('"').strip("'")
                            if pk == "phase":
                                fm.negative_phase = pv
                            elif pk == "type":
                                fm.negative_type = pv
                else:
                    # Multi-line form: phase/type arrive on indented lines,
                    # consumed by the `current_key` branch below.
                    current_key = "negative"
            elif key == "esid":
                fm.esid = value
            i += 1
            continue
        if current_key == "negative":
            m3 = re.match(r"^\s+([a-zA-Z_]+)\s*:\s*(.*)$", line)
            if m3:
                pk, pv = m3.group(1), m3.group(2).strip().strip('"').strip("'")
                if pk == "phase":
                    fm.negative_phase = pv
                elif pk == "type":
                    fm.negative_type = pv
            else:
                # Non-matching line ends the negative mapping.
                current_key = None
        i += 1
    return fm
# ---------------------------------------------------------------------------
# Categorisation
# ---------------------------------------------------------------------------
def test_category(test_path: Path) -> str:
    """Return the scoreboard category for a test: the first two components of
    its path relative to TEST_ROOT (e.g. "built-ins/Number"), or the single
    component when the test sits directly under test/.
    """
    parts = test_path.relative_to(TEST_ROOT).as_posix().split("/")
    return "/".join(parts[:2]) if len(parts) >= 2 else parts[0]
# ---------------------------------------------------------------------------
# SX escaping — escape a JS source string for the nested `(eval "(js-eval \"...\")")` form
# ---------------------------------------------------------------------------
def sx_escape_for_nested_eval(s: str) -> str:
    """Return a string ready to be embedded as the JS source inside
    `(eval "(js-eval \"...\")")`. Two escaping layers are applied: the outer
    `(eval "...")` string consumes one, the inner `(js-eval \"...\")` string
    consumes the other.
    """
    # Layer 1 — make `s` a valid inner string literal. Backslash must be
    # doubled first so later replacements aren't re-escaped.
    replacements = (
        ("\\", "\\\\"),
        ('"', '\\"'),
        ("\n", "\\n"),
        ("\r", "\\r"),
        ("\t", "\\t"),
    )
    inner = s
    for old, new in replacements:
        inner = inner.replace(old, new)
    # Layer 2 — the inner form is itself embedded in the outer string.
    return inner.replace("\\", "\\\\").replace('"', '\\"')
# ---------------------------------------------------------------------------
# Output parsing
# ---------------------------------------------------------------------------
# Server output forms:
# (ready)
# (ok N VALUE) -- single-line result
# (ok-len N SIZE) -- next line is the result (multi-line or long)
# VALUE
# (error N "message") -- epoch errored
#
# We read line-by-line off stdout so we can advance tests one-at-a-time
# and kill the server if it hangs.
RX_OK_INLINE = re.compile(r"^\(ok (\d+) (.*)\)\s*$")
RX_OK_LEN = re.compile(r"^\(ok-len (\d+) \d+\)\s*$")
RX_ERR = re.compile(r"^\(error (\d+) (.*)\)\s*$")
# ---------------------------------------------------------------------------
# Classification
# ---------------------------------------------------------------------------
def classify_error(msg: str) -> str:
    """Bucket a raw server error message into a coarse failure-mode label
    for the scoreboard. Checks are ordered: parse/syntax signals first, then
    reference/type/range errors, harness assertions, and process-level
    failures; anything unrecognised falls through to "Other: <prefix>".
    """
    low = msg.lower()
    syntax = "SyntaxError (parse/unsupported syntax)"
    if "expected" in low and "got" in low:
        return syntax
    if "unexpected token" in low or "unexpected char" in low:
        return syntax
    if any(tag in low for tag in ("expected ident", "expected punct", "expected keyword")):
        return syntax
    if "syntaxerror" in low or "parse" in low:
        return syntax
    if "undefined symbol" in low or "unbound" in low or "referenceerror" in low:
        return "ReferenceError (undefined symbol)"
    if "typeerror" in low:
        return "TypeError: not a function" if "not a function" in low else "TypeError (other)"
    if "rangeerror" in low:
        return "RangeError"
    if "test262error" in low:
        return "Test262Error (assertion failed)"
    if "timeout" in low:
        return "Timeout"
    if "killed" in low or "crash" in low:
        return "Crash"
    if "unhandled exception" in low:
        # Pull a short excerpt of the thrown value out of the original-case text.
        detail = re.search(r"Unhandled exception:\s*\\?\"([^\"]{0,80})", msg)
        if detail:
            return f"Unhandled: {detail.group(1)[:60]}"
        return "Unhandled exception"
    return f"Other: {msg[:80]}"
def classify_negative_result(fm: Frontmatter, kind: str, payload: str):
    """Judge a `negative:` test: it passes only if evaluation errored with
    (something mappable to) the expected error type. Returns (passed, reason).
    """
    want = fm.negative_type or ""
    if kind != "error":
        # Negative tests must throw; a clean completion is a failure.
        return False, f"negative: expected {want}, but test completed normally"
    low = payload.lower()
    if want and want.lower() in low:
        return True, f"negative: threw {want} as expected"
    if want == "SyntaxError":
        # Our parser reports syntax problems in its own vocabulary; map the
        # known markers onto test262's SyntaxError expectation.
        parser_markers = (
            "unexpected token",
            "unexpected char",
            "expected ident",
            "expected punct",
            "expected keyword",
            "js-transpile-unop",
            "js-transpile-binop",
            "js-compound-update",
            "parse",
        )
        if any(tag in low for tag in parser_markers) or ("expected" in low and "got" in low):
            return True, f"negative: threw {want} (mapped from parser error) as expected"
    if want == "ReferenceError" and "undefined symbol" in low:
        return True, f"negative: threw {want} (mapped) as expected"
    if want == "TypeError" and "typeerror" in low:
        return True, f"negative: threw {want} as expected"
    return False, f"negative: expected {want}, got: {payload[:100]}"
def classify_positive_result(kind: str, payload: str):
    """Judge a normal (non-negative) test: "ok" passes, anything else is
    classified into a failure bucket. Returns (passed, reason).
    """
    if kind != "ok":
        return False, classify_error(payload)
    return True, "passed"
# ---------------------------------------------------------------------------
# Skip rules
# ---------------------------------------------------------------------------
# Feature tags (from frontmatter `features:`) that the runtime does not
# support; any test declaring one of these is skipped up front.
# NOTE: fix — the original literal listed "numeric-separator-literal" twice
# (harmless in a set, but misleading); the duplicate is removed.
UNSUPPORTED_FEATURES = {
    "Atomics",
    "SharedArrayBuffer",
    "BigInt",
    "Proxy",
    "Reflect",
    "Reflect.construct",
    "Symbol",
    "Symbol.iterator",
    "Symbol.asyncIterator",
    "Symbol.hasInstance",
    "Symbol.isConcatSpreadable",
    "Symbol.match",
    "Symbol.matchAll",
    "Symbol.replace",
    "Symbol.search",
    "Symbol.species",
    "Symbol.split",
    "Symbol.toPrimitive",
    "Symbol.toStringTag",
    "Symbol.unscopables",
    "TypedArray",
    "DataView",
    "WeakRef",
    "WeakMap",
    "WeakSet",
    "FinalizationRegistry",
    "async-functions",  # we support but conformance shape iffy
    "async-iteration",
    "async-generators",
    "generators",
    "regexp-named-groups",
    "regexp-unicode-property-escapes",
    "regexp-dotall",
    "regexp-lookbehind",
    "regexp-match-indices",
    "regexp-modifiers",
    "regexp-v-flag",
    "regexp-duplicate-named-groups",
    "numeric-separator-literal",
    "class-fields-private",
    "class-fields-public",
    "class-methods-private",
    "class-static-fields-private",
    "class-static-fields-public",
    "class-static-methods-private",
    "decorators",
    "destructuring-binding-patterns",
    "destructuring-assignment",
    "error-cause",
    "optional-chaining",
    "optional-catch-binding",
    "logical-assignment-operators",
    "hashbang",
    "import-assertions",
    "import-attributes",
    "import.meta",
    "dynamic-import",
    "json-modules",
    "json-parse-with-source",
    "Intl.DisplayNames",
    "Intl.ListFormat",
    "Intl.Locale",
    "Intl.NumberFormat-unified",
    "Intl.Segmenter",
    "Intl-enumeration",
    "Temporal",
    "IteratorClose",
    "Iterator",
    "iterator-helpers",
    "async-explicit-resource-management",
    "explicit-resource-management",
    "set-methods",
    "Map.prototype.upsert",
    "array-grouping",
    "Array.fromAsync",
    "promise-with-resolvers",
    "Promise.try",
    "Promise.any",
    "Promise.allSettled",
    "ShadowRealm",
    "tail-call-optimization",
    "legacy-regexp",
    "uint8array-base64",
}
def should_skip(t):
    """Return (skip?, reason) for TestCase `t`.

    Skips strict-only / module / raw / shared-memory tests by flag, then any
    test declaring an unsupported feature tag, then entire path subtrees that
    are 100% unsupported (Intl, typed arrays, iterators, annexB, ...).
    """
    if "onlyStrict" in t.fm.flags:
        return True, "strict-mode only"
    if "module" in t.fm.flags:
        return True, "ESM module"
    if "raw" in t.fm.flags:
        # `raw` tests must run without any harness prelude, which this runner
        # always injects.
        return True, "raw (no harness)"
    if "CanBlockIsFalse" in t.fm.flags or "CanBlockIsTrue" in t.fm.flags:
        return True, "shared-memory flag"
    for f in t.fm.features:
        if f in UNSUPPORTED_FEATURES:
            return True, f"feature:{f}"
    # Skip anything under Intl/Temporal/etc. path — these categories are 100% unsupported
    p = t.rel
    for prefix in (
        "intl402/",
        "staging/",
        "built-ins/Atomics/",
        "built-ins/SharedArrayBuffer/",
        "built-ins/BigInt/",
        "built-ins/Proxy/",
        "built-ins/Reflect/",
        "built-ins/Symbol/",
        "built-ins/WeakRef/",
        "built-ins/WeakMap/",
        "built-ins/WeakSet/",
        "built-ins/FinalizationRegistry/",
        "built-ins/TypedArrayConstructors/",
        "built-ins/Temporal/",
        "built-ins/Int8Array/",
        "built-ins/Int16Array/",
        "built-ins/Int32Array/",
        "built-ins/Uint8Array/",
        "built-ins/Uint8ClampedArray/",
        "built-ins/Uint16Array/",
        "built-ins/Uint32Array/",
        "built-ins/Float16Array/",
        "built-ins/Float32Array/",
        "built-ins/Float64Array/",
        "built-ins/BigInt64Array/",
        "built-ins/BigUint64Array/",
        "built-ins/DataView/",
        "built-ins/ArrayBuffer/",
        "built-ins/ArrayIteratorPrototype/",
        "built-ins/AsyncFromSyncIteratorPrototype/",
        "built-ins/AsyncGeneratorFunction/",
        "built-ins/AsyncGeneratorPrototype/",
        "built-ins/AsyncIteratorPrototype/",
        "built-ins/GeneratorFunction/",
        "built-ins/GeneratorPrototype/",
        "built-ins/MapIteratorPrototype/",
        "built-ins/SetIteratorPrototype/",
        "built-ins/StringIteratorPrototype/",
        "built-ins/RegExpStringIteratorPrototype/",
        "built-ins/AbstractModuleSource/",
        "built-ins/AggregateError/",
        "built-ins/DisposableStack/",
        "built-ins/AsyncDisposableStack/",
        "built-ins/SuppressedError/",
        "built-ins/Iterator/",
        "built-ins/AsyncIterator/",
        "built-ins/ShadowRealm/",
        "annexB/",
    ):
        if p.startswith(prefix):
            return True, f"unsupported path:{prefix.rstrip('/')}"
    return False, ""
# ---------------------------------------------------------------------------
# Test case loading
# ---------------------------------------------------------------------------
@dataclasses.dataclass
class TestCase:
    """One discovered test262 file, loaded and with frontmatter parsed."""
    # Absolute path to the .js file.
    path: Path
    # Path relative to TEST_ROOT, POSIX-style (used as the stable test id).
    rel: str
    # Scoreboard bucket, e.g. "built-ins/Number" (see test_category()).
    category: str
    # Parsed frontmatter metadata.
    fm: Frontmatter
    # Full JS source text of the file.
    src: str
@dataclasses.dataclass
class TestResult:
    """Outcome of running (or skipping) one test."""
    # Test id: path relative to TEST_ROOT.
    rel: str
    # Scoreboard bucket the result is aggregated under.
    category: str
    status: str  # pass | fail | skip | timeout
    # Human-readable explanation (skip reason or failure classification).
    reason: str
    elapsed_ms: int = 0
def discover_tests(filter_prefixes):
    """Collect every runnable *.js under TEST_ROOT, sorted.

    Fixture files (support files, not tests) are excluded. When
    `filter_prefixes` is non-empty, only tests whose TEST_ROOT-relative path
    starts with one of the prefixes are kept.
    """
    found = []
    for candidate in TEST_ROOT.rglob("*.js"):
        if candidate.name.endswith("_FIXTURE.js") or "_FIXTURE" in candidate.parts:
            continue
        if filter_prefixes:
            rel = candidate.relative_to(TEST_ROOT).as_posix()
            if not any(rel.startswith(pfx) for pfx in filter_prefixes):
                continue
        found.append(candidate)
    return sorted(found)
def load_test(path: Path):
    """Read a test file and build its TestCase; returns None when the file
    cannot be read (e.g. racing a checkout update or bad encoding).
    """
    try:
        src = path.read_text(encoding="utf-8")
    except Exception:
        return None
    return TestCase(
        path=path,
        rel=path.relative_to(TEST_ROOT).as_posix(),
        category=test_category(path),
        fm=parse_frontmatter(src),
        src=src,
    )
# ---------------------------------------------------------------------------
# Harness cache — transpile HARNESS_STUB once, write SX to disk.
# Every worker then loads the cached .sx (a few ms) instead of re-running
# js-tokenize + js-parse + js-transpile (15+ s).
# ---------------------------------------------------------------------------
# Remembered across the Python process. None until we've run the precompute.
_HARNESS_CACHE_PATH: "Path | None" = None
# Per-filename include cache: maps 'compareArray.js' -> Path of cached .sx.
_EXTRA_HARNESS_CACHE: dict = {}
def _harness_cache_rel_path() -> "str | None":
    """Return the harness-cache path as a REPO-relative POSIX string for use
    in `(load "...")` commands, or None when the cache hasn't been built.
    Falls back to the absolute path if the cache lives outside REPO.
    """
    cached = _HARNESS_CACHE_PATH
    if cached is None:
        return None
    try:
        return cached.relative_to(REPO).as_posix()
    except ValueError:
        return str(cached)
def _precompute_sx(js_source: str, timeout_s: float = 120.0) -> str:
    """Run one throwaway sx_server to turn a chunk of JS into the SX text that
    js-eval would have evaluated. Returns the raw SX source (no outer quotes).
    """
    proc = subprocess.Popen(
        [str(SX_SERVER)],
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=subprocess.DEVNULL,
        cwd=str(REPO),
        bufsize=0,  # unbuffered: we parse lines ourselves off the raw fd
    )
    fd = proc.stdout.fileno()
    os.set_blocking(fd, False)
    # One-element list so the nested closures below can mutate the buffer.
    buf = [b""]

    def readline(timeout: float):
        # Read one newline-terminated line from the server within `timeout`
        # seconds; returns None on EOF, raises TimeoutError on deadline.
        deadline = time.monotonic() + timeout
        while True:
            nl = buf[0].find(b"\n")
            if nl >= 0:
                line = buf[0][: nl + 1]
                buf[0] = buf[0][nl + 1 :]
                return line
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                raise TimeoutError("precompute readline timeout")
            rlist, _, _ = select.select([fd], [], [], remaining)
            if not rlist:
                raise TimeoutError("precompute readline timeout")
            try:
                chunk = os.read(fd, 65536)
            except (BlockingIOError, InterruptedError):
                continue
            if not chunk:
                return None
            buf[0] += chunk

    def run(epoch: int, cmd: str, to: float = 60.0):
        # Send one command under `epoch` and wait for its (ok ...)/(ok-len
        # ...)/(error ...) reply, ignoring output tagged with other epochs.
        proc.stdin.write(f"(epoch {epoch})\n{cmd}\n".encode("utf-8"))
        proc.stdin.flush()
        deadline = time.monotonic() + to
        while time.monotonic() < deadline:
            line = readline(deadline - time.monotonic())
            if line is None:
                raise RuntimeError("precompute: sx_server closed stdout")
            m = RX_OK_INLINE.match(line.decode("utf-8", "replace"))
            if m and int(m.group(1)) == epoch:
                return "ok", m.group(2)
            m = RX_OK_LEN.match(line.decode("utf-8", "replace"))
            if m and int(m.group(1)) == epoch:
                # ok-len: the actual value arrives on the next line.
                val = readline(deadline - time.monotonic())
                return "ok", (val or b"").decode("utf-8", "replace").rstrip("\n")
            m = RX_ERR.match(line.decode("utf-8", "replace"))
            if m and int(m.group(1)) == epoch:
                return "error", m.group(2)
        raise TimeoutError(f"precompute epoch {epoch}")

    try:
        # Wait for ready
        deadline = time.monotonic() + 15.0
        while time.monotonic() < deadline:
            line = readline(deadline - time.monotonic())
            if line is None:
                raise RuntimeError("precompute: sx_server closed before ready")
            if b"(ready)" in line:
                break
        # Load JS kernel
        run(1, '(load "lib/r7rs.sx")')
        run(2, '(load "lib/js/lexer.sx")')
        run(3, '(load "lib/js/parser.sx")')
        run(4, '(load "lib/js/transpile.sx")')
        # Transpile to SX source via inspect
        # Same two-level escaping as sx_escape_for_nested_eval: one layer for
        # the inner string literal, one for the outer (eval "...") string.
        inner = js_source.replace("\\", "\\\\").replace('"', '\\"')
        inner = inner.replace("\n", "\\n").replace("\r", "\\r").replace("\t", "\\t")
        outer = inner.replace("\\", "\\\\").replace('"', '\\"')
        cmd = f'(eval "(inspect (js-transpile (js-parse (js-tokenize \\"{outer}\\"))))")'
        kind, payload = run(5, cmd, timeout_s)
        if kind != "ok":
            raise RuntimeError(f"precompute error: {payload[:200]}")
        # payload is an SX string-literal — peel one layer of quoting.
        # (JSON string syntax is close enough to decode the escapes.)
        import json as _json
        if payload.startswith('"') and payload.endswith('"'):
            return _json.loads(payload)
        return payload
    finally:
        try:
            proc.stdin.close()
        except Exception:
            pass
        try:
            proc.terminate()
            proc.wait(timeout=3)
        except Exception:
            try:
                proc.kill()
            except Exception:
                pass
def _harness_fingerprint() -> str:
    """Return a short cache key for the precomputed harness SX.

    Hashes the harness stub text together with the lexer/parser/transpiler
    sources, so editing any of those invalidates the cache automatically.
    Missing source files are tolerated (they simply don't contribute).
    """
    import hashlib
    digest = hashlib.sha256(HARNESS_STUB.encode("utf-8"))
    for rel in ("lib/js/lexer.sx", "lib/js/parser.sx", "lib/js/transpile.sx"):
        try:
            digest.update((REPO / rel).read_bytes())
        except Exception:
            pass
    return digest.hexdigest()[:16]
def precompute_harness_cache() -> Path:
    """Populate _HARNESS_CACHE_PATH by transpiling HARNESS_STUB once and
    writing it to disk. Every worker session then does (load <path>) instead.
    Reuses a prior cache file from a previous `python3 test262-runner.py`
    run when the fingerprint (harness text + transpiler source hash) still
    matches — that covers the common case of re-running scoreboards back-to-back
    without touching transpile.sx.
    """
    global _HARNESS_CACHE_PATH
    HARNESS_CACHE_DIR.mkdir(parents=True, exist_ok=True)
    fp = _harness_fingerprint()
    dst = HARNESS_CACHE_DIR / f"stub.{fp}.sx"
    stable = HARNESS_CACHE_DIR / "stub.sx"
    if dst.exists() and dst.stat().st_size > 0:
        # Expose both the canonical and fingerprinted names — sessions load
        # the canonical one.
        stable.write_bytes(dst.read_bytes())
        _HARNESS_CACHE_PATH = stable
        print(f"harness cache: reused {dst.name} ({dst.stat().st_size} bytes)",
              file=sys.stderr)
        return stable
    # Cache miss: do the expensive transpile once and write both names.
    t0 = time.monotonic()
    sx = _precompute_sx(HARNESS_STUB)
    dst.write_text(sx, encoding="utf-8")
    stable.write_text(sx, encoding="utf-8")
    _HARNESS_CACHE_PATH = stable
    dt = time.monotonic() - t0
    print(f"harness cache: {len(HARNESS_STUB)} JS chars → {len(sx)} SX chars "
          f"at {stable.relative_to(REPO)} (fp={fp}, {dt:.2f}s)", file=sys.stderr)
    return stable
# ---------------------------------------------------------------------------
# Long-lived server session
# ---------------------------------------------------------------------------
class ServerSession:
    """Wrap a long-lived sx_server.exe subprocess; feed it one-liner commands,
    collect results per-epoch. Restart on hang/crash.
    Uses a raw-fd line buffer + select() to avoid spawning a thread per read.
    """

    def __init__(self, per_test_timeout: float):
        # Seconds allowed per js-eval of a single test (see run_test()).
        self.per_test_timeout = per_test_timeout
        self.proc = None
        # Raw bytes read from the child's stdout, not yet split into lines.
        self._buf = b""
        self._fd = -1

    def start(self) -> None:
        """Spawn the server, wait for (ready), load the JS runtime libraries
        and preload the assert/verify harness stub.
        """
        self.proc = subprocess.Popen(
            [str(SX_SERVER)],
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.DEVNULL,
            cwd=str(REPO),
            bufsize=0,  # binary, unbuffered — we do our own line parsing
        )
        self._fd = self.proc.stdout.fileno()
        self._buf = b""
        os.set_blocking(self._fd, False)
        self._wait_for("(ready)", timeout=15.0)
        # Load kernel libraries
        self._run_and_collect(1, '(load "lib/r7rs.sx")', timeout=60.0)
        self._run_and_collect(2, '(load "lib/js/lexer.sx")', timeout=60.0)
        self._run_and_collect(3, '(load "lib/js/parser.sx")', timeout=60.0)
        self._run_and_collect(4, '(load "lib/js/transpile.sx")', timeout=60.0)
        self._run_and_collect(5, '(load "lib/js/runtime.sx")', timeout=60.0)
        # Preload the stub harness — use precomputed SX cache when available
        # (huge win: ~15s js-eval HARNESS_STUB → ~0s load precomputed .sx).
        cache_rel = _harness_cache_rel_path()
        if cache_rel is not None:
            self._run_and_collect(6, f'(load "{cache_rel}")', timeout=60.0)
        else:
            stub_escaped = sx_escape_for_nested_eval(HARNESS_STUB)
            self._run_and_collect(
                6,
                f'(eval "(js-eval \\"{stub_escaped}\\")")',
                timeout=60.0,
            )

    def stop(self) -> None:
        """Shut the subprocess down, escalating terminate → kill; never raises."""
        if self.proc is not None:
            try:
                self.proc.stdin.close()
            except Exception:
                pass
            try:
                self.proc.terminate()
                self.proc.wait(timeout=3)
            except Exception:
                try:
                    self.proc.kill()
                except Exception:
                    pass
            self.proc = None

    def _readline_raw(self, timeout: float):
        """Read one line (including trailing \\n) from the subprocess's stdout.
        Returns bytes or None on EOF. Raises TimeoutError if no newline appears
        within `timeout` seconds.
        """
        deadline = time.monotonic() + timeout
        while True:
            nl = self._buf.find(b"\n")
            if nl >= 0:
                line = self._buf[: nl + 1]
                self._buf = self._buf[nl + 1 :]
                return line
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                raise TimeoutError("readline timeout")
            try:
                rlist, _, _ = select.select([self._fd], [], [], remaining)
            except (OSError, ValueError):
                # fd closed/invalid underneath us — treat as EOF.
                return None
            if not rlist:
                raise TimeoutError("readline timeout")
            try:
                chunk = os.read(self._fd, 65536)
            except (BlockingIOError, InterruptedError):
                continue
            except OSError:
                return None
            if not chunk:
                # EOF: flush any unterminated trailing data as a final line.
                if self._buf:
                    line = self._buf
                    self._buf = b""
                    return line
                return None
            self._buf += chunk

    def _readline(self, timeout: float):
        """Like _readline_raw but decoded to str (utf-8, errors replaced)."""
        b = self._readline_raw(timeout)
        if b is None:
            return None
        try:
            return b.decode("utf-8", errors="replace")
        except Exception:
            return ""

    def _wait_for(self, token: str, timeout: float) -> None:
        """Consume stdout lines until one contains `token` (e.g. "(ready)")."""
        start = time.monotonic()
        while time.monotonic() - start < timeout:
            line = self._readline(timeout - (time.monotonic() - start))
            if line is None:
                raise RuntimeError("sx_server closed stdout before ready")
            if token in line:
                return
        raise TimeoutError(f"timeout waiting for {token}")

    def _run_and_collect(self, epoch: int, cmd: str, timeout: float):
        """Write `(epoch N)\\n<cmd>\\n` and read until we see ok/ok-len/error for that epoch.
        Returns (kind, payload). Raises TimeoutError if the server hangs.
        """
        payload = f"(epoch {epoch})\n{cmd}\n".encode("utf-8")
        try:
            self.proc.stdin.write(payload)
            self.proc.stdin.flush()
        except (BrokenPipeError, OSError):
            raise RuntimeError("sx_server stdin closed")
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                raise TimeoutError(f"epoch {epoch} exceeded timeout {timeout}")
            line = self._readline(remaining)
            if line is None:
                raise RuntimeError("sx_server closed stdout mid-epoch")
            m = RX_OK_INLINE.match(line)
            if m:
                e = int(m.group(1))
                if e == epoch:
                    return "ok", m.group(2)
                # Stale result from an earlier epoch — drop it.
                continue
            m = RX_OK_LEN.match(line)
            if m:
                e = int(m.group(1))
                remaining2 = deadline - time.monotonic()
                if remaining2 <= 0:
                    raise TimeoutError(f"epoch {epoch} exceeded timeout {timeout}")
                # ok-len: the value itself is on the following line, which we
                # must consume even if it belongs to a stale epoch.
                val = self._readline(remaining2)
                if val is None:
                    val = ""
                val = val.rstrip("\n")
                if e == epoch:
                    return "ok", val
                continue
            m = RX_ERR.match(line)
            if m:
                e = int(m.group(1))
                if e == epoch:
                    return "error", m.group(2)
                continue
            # Other output — (ready), comment, noise — ignore
        raise TimeoutError(f"epoch {epoch} exceeded timeout {timeout}")

    def run_test(self, epoch: int, js_source: str):
        """Evaluate one test's JS source under `epoch`; returns (kind, payload)."""
        escaped = sx_escape_for_nested_eval(js_source)
        cmd = f'(eval "(js-eval \\"{escaped}\\")")'
        return self._run_and_collect(epoch, cmd, timeout=self.per_test_timeout)
# ---------------------------------------------------------------------------
# Parallel workers
# ---------------------------------------------------------------------------
def _worker_run(args):
    """Run a shard of tests in this process. Returns list of (rel, category, status, reason).
    Each worker keeps its own long-lived ServerSession. Restarts on timeout/crash.
    """
    shard_tests, per_test_timeout, restart_every, worker_id = args
    session = None
    results = []

    def get_session():
        # Lazily (re)create the server session on first use / after restart().
        nonlocal session
        if session is None:
            session = ServerSession(per_test_timeout=per_test_timeout)
            session.start()
        return session

    def restart():
        # Tear the session down; the next get_session() spawns a fresh one.
        nonlocal session
        if session is not None:
            try:
                session.stop()
            except Exception:
                pass
        session = None

    try:
        # Epoch numbers are partitioned per worker so stale replies from a
        # previous test can never be mistaken for the current one.
        epoch = 100 + worker_id * 10000
        done_n = 0
        for t_data in shard_tests:
            rel, category, src, negative_phase, negative_type = t_data
            epoch += 1
            done_n += 1
            try:
                sess = get_session()
                kind, payload = sess.run_test(epoch, src)
                if negative_phase:
                    # Rebuild a minimal Frontmatter so the negative-result
                    # classifier can consult the expected error type.
                    fake_fm = Frontmatter()
                    fake_fm.negative_phase = negative_phase
                    fake_fm.negative_type = negative_type
                    ok, reason = classify_negative_result(fake_fm, kind, payload)
                    status = "pass" if ok else "fail"
                else:
                    if kind == "ok":
                        status, reason = "pass", "passed"
                    else:
                        status, reason = "fail", classify_error(payload)
                results.append((rel, category, status, reason))
            except TimeoutError:
                results.append((rel, category, "timeout", "per-test timeout"))
                restart()
            except Exception as e:
                results.append((rel, category, "fail", f"runner-error: {e}"))
                restart()
            # Periodic restart to keep server healthy (memory bounded)
            if restart_every > 0 and done_n % restart_every == 0:
                restart()
    finally:
        if session is not None:
            try:
                session.stop()
            except Exception:
                pass
    return results
# ---------------------------------------------------------------------------
# Run driver
# ---------------------------------------------------------------------------
def assemble_source(t):
    """Return the JS source to feed to js-eval for TestCase `t`.

    The harness is preloaded into every session, so the test's own source is
    all that needs to be sent.
    """
    return t.src
def aggregate(results):
    """Fold TestResult records into the scoreboard structure.

    Returns a dict with "totals" (raw counts + runnable + pass_rate),
    "categories" (per-category stats sorted by name, with top-5 failure
    reasons), and "top_failure_modes" (top-20 reasons across all categories).
    Pass rates are computed over runnable tests (total minus skips).
    """
    def _fresh_bucket():
        return {"pass": 0, "fail": 0, "skip": 0, "timeout": 0, "total": 0, "failures": Counter()}

    by_cat = defaultdict(_fresh_bucket)
    totals = {"pass": 0, "fail": 0, "skip": 0, "timeout": 0, "total": 0}
    failure_modes = Counter()
    for res in results:
        bucket = by_cat[res.category]
        bucket[res.status] += 1
        bucket["total"] += 1
        totals[res.status] += 1
        totals["total"] += 1
        if res.status == "fail":
            bucket["failures"][res.reason] += 1
            failure_modes[res.reason] += 1
        elif res.status == "timeout":
            # Timeouts count as a distinct failure mode for reporting.
            bucket["failures"]["Timeout"] += 1
            failure_modes["Timeout"] += 1

    categories = []
    for name in sorted(by_cat):
        stats = by_cat[name]
        runnable = stats["total"] - stats["skip"]
        rate = (stats["pass"] / runnable * 100.0) if runnable else 0.0
        categories.append(
            {
                "category": name,
                "total": stats["total"],
                "pass": stats["pass"],
                "fail": stats["fail"],
                "skip": stats["skip"],
                "timeout": stats["timeout"],
                "pass_rate": round(rate, 1),
                "top_failures": stats["failures"].most_common(5),
            }
        )

    runnable_total = totals["total"] - totals["skip"]
    overall_rate = (totals["pass"] / runnable_total * 100.0) if runnable_total else 0.0
    return {
        "totals": {**totals, "runnable": runnable_total, "pass_rate": round(overall_rate, 1)},
        "categories": categories,
        "top_failure_modes": failure_modes.most_common(20),
    }
def write_markdown(scoreboard, path: Path, pinned_commit: str, elapsed_s: float) -> None:
    """Render the aggregated scoreboard dict as a Markdown report at *path*.

    Sections: run metadata (pinned commit, wall time), overall totals, top
    failure modes, a category table (worst pass-rate first, only categories
    with >= 10 runnable tests), and per-category top failure reasons for
    those same categories.
    """
    t = scoreboard["totals"]
    lines = [
        "# test262 scoreboard",
        "",
        f"Pinned commit: `{pinned_commit}`",
        f"Wall time: {elapsed_s:.1f}s",
        "",
        f"**Total:** {t['pass']}/{t['runnable']} runnable passed ({t['pass_rate']}%). "
        f"Raw: pass={t['pass']} fail={t['fail']} skip={t['skip']} timeout={t['timeout']} total={t['total']}.",
        "",
        "## Top failure modes",
        "",
    ]
    for mode, count in scoreboard["top_failure_modes"]:
        lines.append(f"- **{count}x** {mode}")
    lines.extend(["", "## Categories (worst pass-rate first, min 10 runnable)", ""])
    lines.append("| Category | Pass | Fail | Skip | Timeout | Total | Pass % |")
    lines.append("|---|---:|---:|---:|---:|---:|---:|")
    # Only categories with at least 10 runnable (non-skip) tests make the cut.
    cats = [c for c in scoreboard["categories"] if (c["total"] - c["skip"]) >= 10]
    cats.sort(key=lambda c: (c["pass_rate"], -c["total"]))
    for c in cats:
        lines.append(
            f"| {c['category']} | {c['pass']} | {c['fail']} | {c['skip']} | "
            f"{c['timeout']} | {c['total']} | {c['pass_rate']}% |"
        )
    lines.append("")
    lines.append("## Per-category top failures (min 10 runnable, worst first)")
    lines.append("")
    for c in cats:
        if not c["top_failures"]:
            continue
        # BUG FIX: the runnable count and pass-rate were previously
        # concatenated with no separator (e.g. "(5/1094.2%)"); a ", "
        # now separates them.
        lines.append(f"### {c['category']} ({c['pass']}/{c['total']-c['skip']}, {c['pass_rate']}%)")
        lines.append("")
        for reason, count in c["top_failures"]:
            lines.append(f"- **{count}x** {reason}")
        lines.append("")
    path.write_text("\n".join(lines), encoding="utf-8")
def main(argv) -> int:
    """CLI entry point: discover, run, and score test262 against the runtime.

    Pipeline: parse args -> check prerequisites (server binary, upstream
    checkout) -> discover test files -> pre-skip unsupported/capped tests ->
    shard the rest across worker processes -> run -> aggregate and write
    JSON/Markdown scoreboards. Returns a process exit code (0 on success,
    1 on missing prerequisites).
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--limit", type=int, default=0, help="max tests to run (0 = all)")
    ap.add_argument("--filter", type=str, action="append", default=None,
                    help="path prefix filter (repeatable; OR'd together)")
    ap.add_argument("--per-test-timeout", type=float, default=DEFAULT_PER_TEST_TIMEOUT_S)
    ap.add_argument("--restart-every", type=int, default=500,
                    help="restart worker server every N tests (keeps memory bounded)")
    ap.add_argument("--max-per-category", type=int, default=0,
                    help="cap runnable tests per category (0 = no cap)")
    ap.add_argument("--workers", type=int, default=0,
                    help="number of parallel workers (0 = auto; min(nproc, 4))")
    ap.add_argument("--output-json", type=str,
                    default=str(REPO / "lib" / "js" / "test262-scoreboard.json"))
    ap.add_argument("--output-md", type=str,
                    default=str(REPO / "lib" / "js" / "test262-scoreboard.md"))
    ap.add_argument("--progress-every", type=int, default=100)
    ap.add_argument("--dump-failures", type=str, default=None,
                    help="if set, write every failed test's rel path + reason to this file")
    args = ap.parse_args(argv)
    # Hard prerequisites: the eval server binary and the pinned test262
    # checkout must both exist before anything else happens.
    if not SX_SERVER.exists():
        print(f"ERROR: sx_server.exe not found at {SX_SERVER}", file=sys.stderr)
        return 1
    if not UPSTREAM.exists():
        print(f"ERROR: test262-upstream not found at {UPSTREAM}", file=sys.stderr)
        return 1
    # Record the pinned test262 commit for the scoreboard header
    # (best-effort: stays blank if git is unavailable or the call fails).
    pinned_commit = ""
    try:
        pinned_commit = subprocess.check_output(
            ["git", "-C", str(UPSTREAM), "rev-parse", "HEAD"], text=True
        ).strip()
    except Exception:
        pass
    all_paths = discover_tests(args.filter)
    if args.limit:
        all_paths = all_paths[: args.limit]
    print(f"Discovered {len(all_paths)} test files.", file=sys.stderr)
    # Precompute harness cache once per run. Workers (forked) inherit module
    # globals, so the cache path is visible to every session.start() call.
    try:
        precompute_harness_cache()
    except Exception as e:
        print(f"harness cache precompute failed ({e}); falling back to js-eval per session",
              file=sys.stderr)
    # Pre-filter: record up-front skips (unsupported features, per-category
    # cap) as results immediately; everything else is queued to run.
    tests = []
    results = []
    per_cat_count = defaultdict(int)
    for p in all_paths:
        t = load_test(p)
        if not t:
            continue
        skip, why = should_skip(t)
        if skip:
            results.append(TestResult(rel=t.rel, category=t.category, status="skip", reason=why))
            continue
        if args.max_per_category > 0 and per_cat_count[t.category] >= args.max_per_category:
            results.append(TestResult(rel=t.rel, category=t.category, status="skip",
                                      reason=f"capped at --max-per-category={args.max_per_category}"))
            continue
        per_cat_count[t.category] += 1
        tests.append(t)
    print(f"Will run {len(tests)} tests ({len(results)} skipped up front).", file=sys.stderr)
    # Worker count
    # Auto-default: on <=2-core machines, 1 worker beats 2 because OCaml eval is
    # CPU-bound and two processes starve each other. On 4+ cores, use nproc-1
    # (leave one core for OS/Python). Cap at 8 to avoid resource thrash.
    n_workers = args.workers
    if n_workers <= 0:
        try:
            cpu = os.cpu_count() or 2
        except Exception:
            cpu = 2
        if cpu <= 2:
            n_workers = 1
        else:
            n_workers = max(1, min(cpu - 1, 8))
    # Never spawn more workers than there are tests to run.
    n_workers = max(1, min(n_workers, len(tests))) if tests else 1
    print(f"Using {n_workers} parallel worker(s).", file=sys.stderr)
    # Shard tests across workers (round-robin so categories spread evenly)
    shards = [[] for _ in range(n_workers)]
    for i, t in enumerate(tests):
        shards[i % n_workers].append(
            (t.rel, t.category, t.src, t.fm.negative_phase, t.fm.negative_type)
        )
    t_run_start = time.monotonic()
    if n_workers == 1:
        # Serial path — avoids multiprocessing overhead
        worker_results = [_worker_run((shards[0], args.per_test_timeout, args.restart_every, 0))]
    else:
        with mp.Pool(n_workers) as pool:
            worker_args = [
                (shards[i], args.per_test_timeout, args.restart_every, i)
                for i in range(n_workers)
            ]
            # imap_unordered so progress prints show up sooner
            collected = []
            total_tests = len(tests)
            last_print = time.monotonic()
            for shard_out in pool.imap_unordered(_worker_run, worker_args):
                collected.append(shard_out)
                now = time.monotonic()
                # Throttle progress output to at most one line every 5 seconds.
                if now - last_print >= 5.0:
                    done_so_far = sum(len(s) for s in collected)
                    el = now - t_run_start
                    print(
                        f"  worker returned: {done_so_far}/{total_tests} tests "
                        f"elapsed={el:.1f}s rate={done_so_far/max(el,0.001):.1f}/s",
                        file=sys.stderr,
                    )
                    last_print = now
            worker_results = collected
    # Flatten worker tuples back into TestResult records alongside the
    # up-front skips collected earlier.
    for shard_out in worker_results:
        for rel, category, status, reason in shard_out:
            results.append(TestResult(rel=rel, category=category, status=status, reason=reason))
    t_run_elapsed = time.monotonic() - t_run_start
    print(f"\nFinished run in {t_run_elapsed:.1f}s", file=sys.stderr)
    scoreboard = aggregate(results)
    scoreboard["pinned_commit"] = pinned_commit
    scoreboard["elapsed_seconds"] = round(t_run_elapsed, 1)
    scoreboard["workers"] = n_workers
    # Emit both machine-readable (JSON) and human-readable (Markdown) outputs.
    out_json = Path(args.output_json)
    out_json.parent.mkdir(parents=True, exist_ok=True)
    out_json.write_text(json.dumps(scoreboard, indent=2), encoding="utf-8")
    out_md = Path(args.output_md)
    write_markdown(scoreboard, out_md, pinned_commit, t_run_elapsed)
    # Optional tab-separated dump of every fail/timeout, for triage tooling.
    if args.dump_failures:
        out_fail = Path(args.dump_failures)
        out_fail.parent.mkdir(parents=True, exist_ok=True)
        with out_fail.open("w", encoding="utf-8") as f:
            for r in results:
                if r.status in ("fail", "timeout"):
                    f.write(f"{r.status}\t{r.rel}\t{r.reason}\n")
        print(f"failures dumped to {out_fail}", file=sys.stderr)
    t = scoreboard["totals"]
    print(
        f"\nScoreboard: {t['pass']}/{t['runnable']} runnable passed ({t['pass_rate']}%) "
        f"fail={t['fail']} skip={t['skip']} timeout={t['timeout']} total={t['total']}",
        file=sys.stderr,
    )
    print(f"JSON: {out_json}", file=sys.stderr)
    print(f"MD:   {out_md}", file=sys.stderr)
    return 0
if __name__ == "__main__":
    # Script entry point: run the CLI and propagate its exit status.
    raise SystemExit(main(sys.argv[1:]))