js-on-sx: parallel test262 runner with raw-fd line buffer

Rework test262-runner.py to support --workers N parallel shards, each running
a long-lived sx_server session. Replace thread-per-readline with a select-based
raw-fd line buffer.

On 2-core machines, 1 worker still beats 2 (OCaml eval is CPU-bound and starves
when shared). Auto-defaults n_workers=1 on <=2 CPU, nproc-1 (up to 8) otherwise.

Throughput baseline: ~1.1 Math tests/s serial on 2-core (unchanged; the
evaluator dominates). The runner framework is now ready to scale on bigger
machines without further code changes.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-04-24 06:18:48 +00:00
parent 20a1a81d15
commit 65d4c70638
3 changed files with 261 additions and 168 deletions

View File

@@ -3,14 +3,16 @@
test262-runner — run the official TC39 test262 suite against our JS-on-SX runtime.
Walks lib/js/test262-upstream/test/**/*.js, parses YAML-ish frontmatter, runs
tests via a long-lived sx_server.exe subprocess (one harness load, one `js-eval`
call per test), and emits JSON + Markdown scoreboards.
tests via a pool of long-lived sx_server.exe subprocesses (each worker loads
the harness once, then runs `js-eval` per test on a persistent stdin channel),
and emits JSON + Markdown scoreboards.
Usage:
python3 lib/js/test262-runner.py # full run (skips strict/module/etc)
python3 lib/js/test262-runner.py --limit 2000
python3 lib/js/test262-runner.py --filter built-ins/Math
python3 lib/js/test262-runner.py --per-test-timeout 3
python3 lib/js/test262-runner.py --workers 4 # parallel workers (default: auto)
Outputs:
lib/js/test262-scoreboard.json
@@ -34,11 +36,12 @@ from __future__ import annotations
import argparse
import dataclasses
import json
import multiprocessing as mp
import os
import re
import select
import subprocess
import sys
import threading
import time
from collections import Counter, defaultdict
from pathlib import Path
@@ -152,15 +155,15 @@ FRONTMATTER_RE = re.compile(r"/\*---(.*?)---\*/", re.DOTALL)
@dataclasses.dataclass
class Frontmatter:
description: str = ""
flags: list[str] = dataclasses.field(default_factory=list)
includes: list[str] = dataclasses.field(default_factory=list)
features: list[str] = dataclasses.field(default_factory=list)
negative_phase: str | None = None
negative_type: str | None = None
esid: str | None = None
flags: list = dataclasses.field(default_factory=list)
includes: list = dataclasses.field(default_factory=list)
features: list = dataclasses.field(default_factory=list)
negative_phase: "str | None" = None
negative_type: "str | None" = None
esid: "str | None" = None
def _parse_yaml_list(s: str) -> list[str]:
def _parse_yaml_list(s: str) -> list:
s = s.strip()
if s.startswith("[") and s.endswith("]"):
s = s[1:-1]
@@ -187,7 +190,7 @@ def parse_frontmatter(src: str) -> Frontmatter:
key, value = m2.group(1), m2.group(2).strip()
if key == "description":
if value in (">", "|"):
desc_lines: list[str] = []
desc_lines = []
j = i + 1
while j < len(lines):
nxt = lines[j]
@@ -328,7 +331,7 @@ def classify_error(msg: str) -> str:
return f"Other: {msg[:80]}"
def classify_negative_result(fm: Frontmatter, kind: str, payload: str) -> tuple[bool, str]:
def classify_negative_result(fm: Frontmatter, kind: str, payload: str):
expected_type = fm.negative_type or ""
if kind == "error":
if expected_type and expected_type.lower() in payload.lower():
@@ -337,7 +340,7 @@ def classify_negative_result(fm: Frontmatter, kind: str, payload: str) -> tuple[
return False, f"negative: expected {expected_type}, but test completed normally"
def classify_positive_result(kind: str, payload: str) -> tuple[bool, str]:
def classify_positive_result(kind: str, payload: str):
    """Map a raw server result for a non-negative test to (ok, reason).

    `kind` is the server's epoch outcome tag; anything other than "ok" is
    treated as a failure and the payload is bucketed via classify_error.
    """
    if kind != "ok":
        return False, classify_error(payload)
    return True, "passed"
@@ -435,7 +438,7 @@ UNSUPPORTED_FEATURES = {
}
def should_skip(t: "TestCase") -> tuple[bool, str]:
def should_skip(t):
if "onlyStrict" in t.fm.flags:
return True, "strict-mode only"
if "module" in t.fm.flags:
@@ -527,8 +530,8 @@ class TestResult:
elapsed_ms: int = 0
def discover_tests(filter_prefixes: list[str] | None) -> list[Path]:
tests: list[Path] = []
def discover_tests(filter_prefixes):
tests = []
for p in TEST_ROOT.rglob("*.js"):
if p.name.endswith("_FIXTURE.js"):
continue
@@ -543,7 +546,7 @@ def discover_tests(filter_prefixes: list[str] | None) -> list[Path]:
return tests
def load_test(path: Path) -> TestCase | None:
def load_test(path: Path):
try:
src = path.read_text(encoding="utf-8")
except Exception:
@@ -566,12 +569,15 @@ def load_test(path: Path) -> TestCase | None:
class ServerSession:
"""Wrap a long-lived sx_server.exe subprocess; feed it one-liner commands,
collect results per-epoch. Restart on hang/crash.
Uses a raw-fd line buffer + select() to avoid spawning a thread per read.
"""
def __init__(self, per_test_timeout: float):
self.per_test_timeout = per_test_timeout
self.proc: subprocess.Popen | None = None
self.lock = threading.Lock()
self.proc = None
self._buf = b""
self._fd = -1
def start(self) -> None:
self.proc = subprocess.Popen(
@@ -580,22 +586,24 @@ class ServerSession:
stdout=subprocess.PIPE,
stderr=subprocess.DEVNULL,
cwd=str(REPO),
text=True,
bufsize=1,
bufsize=0, # binary, unbuffered — we do our own line parsing
)
self._wait_for("(ready)", timeout=10.0)
self._fd = self.proc.stdout.fileno()
self._buf = b""
os.set_blocking(self._fd, False)
self._wait_for("(ready)", timeout=15.0)
# Load kernel libraries
self._run_and_collect(1, '(load "lib/r7rs.sx")', timeout=30.0)
self._run_and_collect(2, '(load "lib/js/lexer.sx")', timeout=30.0)
self._run_and_collect(3, '(load "lib/js/parser.sx")', timeout=30.0)
self._run_and_collect(4, '(load "lib/js/transpile.sx")', timeout=30.0)
self._run_and_collect(5, '(load "lib/js/runtime.sx")', timeout=30.0)
self._run_and_collect(1, '(load "lib/r7rs.sx")', timeout=60.0)
self._run_and_collect(2, '(load "lib/js/lexer.sx")', timeout=60.0)
self._run_and_collect(3, '(load "lib/js/parser.sx")', timeout=60.0)
self._run_and_collect(4, '(load "lib/js/transpile.sx")', timeout=60.0)
self._run_and_collect(5, '(load "lib/js/runtime.sx")', timeout=60.0)
# Preload the stub harness as one big js-eval
stub_escaped = sx_escape_for_nested_eval(HARNESS_STUB)
self._run_and_collect(
6,
f'(eval "(js-eval \\"{stub_escaped}\\")")',
timeout=30.0,
timeout=60.0,
)
def stop(self) -> None:
@@ -614,31 +622,77 @@ class ServerSession:
pass
self.proc = None
def _readline_raw(self, timeout: float):
    """Read one line (including trailing \\n) from the subprocess's stdout.

    Returns bytes or None on EOF. Raises TimeoutError if no newline appears
    within `timeout` seconds. The fd is non-blocking (set in start()), so we
    wait with select() and accumulate chunks in self._buf.
    """
    deadline = time.monotonic() + timeout
    while True:
        # Serve a complete buffered line first, before touching the fd.
        nl = self._buf.find(b"\n")
        if nl >= 0:
            line = self._buf[: nl + 1]
            self._buf = self._buf[nl + 1 :]
            return line
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            raise TimeoutError("readline timeout")
        try:
            rlist, _, _ = select.select([self._fd], [], [], remaining)
        except (OSError, ValueError):
            # fd closed or invalid (e.g. process already reaped) — treat as EOF.
            return None
        if not rlist:
            # select() ran out the remaining time without readability.
            raise TimeoutError("readline timeout")
        try:
            chunk = os.read(self._fd, 65536)
        except (BlockingIOError, InterruptedError):
            # Spurious wakeup or signal — loop and re-select until the deadline.
            continue
        except OSError:
            return None
        if not chunk:
            # EOF: flush any partial (unterminated) final line once, then
            # report EOF on the next call when the buffer is empty.
            if self._buf:
                line = self._buf
                self._buf = b""
                return line
            return None
        self._buf += chunk
def _readline(self, timeout: float):
b = self._readline_raw(timeout)
if b is None:
return None
try:
return b.decode("utf-8", errors="replace")
except Exception:
return ""
def _wait_for(self, token: str, timeout: float) -> None:
assert self.proc and self.proc.stdout
start = time.monotonic()
while time.monotonic() - start < timeout:
line = self.proc.stdout.readline()
if not line:
line = self._readline(timeout - (time.monotonic() - start))
if line is None:
raise RuntimeError("sx_server closed stdout before ready")
if token in line:
return
raise TimeoutError(f"timeout waiting for {token}")
def _run_and_collect(self, epoch: int, cmd: str, timeout: float) -> tuple[str, str]:
"""Write `(epoch N)\n<cmd>\n` and read until we see ok/ok-len/error for that epoch.
def _run_and_collect(self, epoch: int, cmd: str, timeout: float):
"""Write `(epoch N)\\n<cmd>\\n` and read until we see ok/ok-len/error for that epoch.
Returns (kind, payload). Raises TimeoutError if the server hangs.
"""
assert self.proc and self.proc.stdin and self.proc.stdout
self.proc.stdin.write(f"(epoch {epoch})\n{cmd}\n")
self.proc.stdin.flush()
payload = f"(epoch {epoch})\n{cmd}\n".encode("utf-8")
try:
self.proc.stdin.write(payload)
self.proc.stdin.flush()
except (BrokenPipeError, OSError):
raise RuntimeError("sx_server stdin closed")
deadline = time.monotonic() + timeout
while time.monotonic() < deadline:
remaining = deadline - time.monotonic()
if remaining <= 0:
raise TimeoutError(f"epoch {epoch} exceeded timeout {timeout}")
line = self._readline_with_timeout(remaining)
if not line:
line = self._readline(remaining)
if line is None:
raise RuntimeError("sx_server closed stdout mid-epoch")
m = RX_OK_INLINE.match(line)
if m:
@@ -649,7 +703,10 @@ class ServerSession:
m = RX_OK_LEN.match(line)
if m:
e = int(m.group(1))
val = self._readline_with_timeout(remaining)
remaining2 = deadline - time.monotonic()
if remaining2 <= 0:
raise TimeoutError(f"epoch {epoch} exceeded timeout {timeout}")
val = self._readline(remaining2)
if val is None:
val = ""
val = val.rstrip("\n")
@@ -665,58 +722,107 @@ class ServerSession:
# Other output — (ready), comment, noise — ignore
raise TimeoutError(f"epoch {epoch} exceeded timeout {timeout}")
def _readline_with_timeout(self, timeout: float) -> str | None:
"""Read one line with a timeout. On Linux we use a thread-wrapped read
since there's no portable non-blocking readline on a subprocess pipe.
"""
assert self.proc and self.proc.stdout
result: list[str | None] = [None]
done = threading.Event()
def reader() -> None:
try:
result[0] = self.proc.stdout.readline() # type: ignore[union-attr]
except Exception:
result[0] = None
finally:
done.set()
th = threading.Thread(target=reader, daemon=True)
th.start()
done.wait(timeout=timeout)
if not done.is_set():
# Hang — kill the process; caller will restart
try:
self.proc.kill()
except Exception:
pass
raise TimeoutError("readline timeout")
return result[0]
def run_test(self, epoch: int, js_source: str) -> tuple[str, str]:
def run_test(self, epoch: int, js_source: str):
    """Evaluate one test's JS source in the live server session.

    Escapes the source for the nested command string, wraps it in the
    `(eval "(js-eval \"...\")")` form the sx_server protocol expects, and
    returns (kind, payload) from _run_and_collect using the per-test
    timeout configured at construction. May raise TimeoutError (server
    hung past the timeout) or RuntimeError (server pipe closed).
    """
    escaped = sx_escape_for_nested_eval(js_source)
    cmd = f'(eval "(js-eval \\"{escaped}\\")")'
    return self._run_and_collect(epoch, cmd, timeout=self.per_test_timeout)
# ---------------------------------------------------------------------------
# Parallel workers
# ---------------------------------------------------------------------------
def _worker_run(args):
    """Run a shard of tests in this process. Returns list of (rel, category, status, reason).

    Each worker keeps its own long-lived ServerSession and restarts it on
    timeout/crash. `args` is a single picklable tuple (so this function can be
    dispatched via multiprocessing.Pool.imap_unordered):
    (shard_tests, per_test_timeout, restart_every, worker_id), where each
    shard_tests entry is (rel, category, src, negative_phase, negative_type).
    """
    shard_tests, per_test_timeout, restart_every, worker_id = args
    session = None
    results = []

    def get_session():
        # Lazily start the server so a worker with an empty shard never spawns one.
        nonlocal session
        if session is None:
            session = ServerSession(per_test_timeout=per_test_timeout)
            session.start()
        return session

    def restart():
        # Drop the current session (best-effort stop); the next get_session()
        # call spawns a fresh server with a freshly loaded harness.
        nonlocal session
        if session is not None:
            try:
                session.stop()
            except Exception:
                pass
        session = None

    try:
        # Per-worker epoch base: each worker gets a disjoint 10000-wide range so
        # epoch numbers never repeat within one worker's server session.
        # NOTE(review): assumes a shard never exceeds ~9900 tests — harmless
        # today since each worker talks to its own server, but confirm if
        # epochs ever become globally meaningful.
        epoch = 100 + worker_id * 10000
        done_n = 0
        for t_data in shard_tests:
            rel, category, src, negative_phase, negative_type = t_data
            epoch += 1
            done_n += 1
            try:
                sess = get_session()
                kind, payload = sess.run_test(epoch, src)
                if negative_phase:
                    # classify negative — mirrors classify_negative_result, kept
                    # inline presumably because workers receive plain tuples
                    # rather than Frontmatter objects (TODO: confirm/unify).
                    expected_type = negative_type or ""
                    if kind == "error":
                        if expected_type and expected_type.lower() in payload.lower():
                            status, reason = "pass", f"negative: threw {expected_type} as expected"
                        else:
                            status, reason = "fail", f"negative: expected {expected_type}, got: {payload[:100]}"
                    else:
                        status, reason = "fail", f"negative: expected {expected_type}, but test completed normally"
                else:
                    # Positive test: mirrors classify_positive_result.
                    if kind == "ok":
                        status, reason = "pass", "passed"
                    else:
                        status, reason = "fail", classify_error(payload)
                results.append((rel, category, status, reason))
            except TimeoutError:
                # Server hung on this test — record it and restart the session
                # so one pathological test can't poison the rest of the shard.
                results.append((rel, category, "timeout", "per-test timeout"))
                restart()
            except Exception as e:
                # Any other runner-side failure (pipe closed, spawn error, ...).
                results.append((rel, category, "fail", f"runner-error: {e}"))
                restart()
            # Periodic restart to keep server healthy (memory bounded)
            if restart_every > 0 and done_n % restart_every == 0:
                restart()
    finally:
        # Always reap the subprocess, even if the loop raised.
        if session is not None:
            try:
                session.stop()
            except Exception:
                pass
    return results
# ---------------------------------------------------------------------------
# Run driver
# ---------------------------------------------------------------------------
def assemble_source(t: TestCase) -> str:
def assemble_source(t):
    """Build the JS payload handed to js-eval for one test.

    The harness is preloaded once per worker session, so the payload is
    simply the test's own source text.
    """
    payload = t.src
    return payload
def aggregate(results: list[TestResult]) -> dict:
by_cat: dict[str, dict] = defaultdict(
def aggregate(results):
by_cat = defaultdict(
lambda: {"pass": 0, "fail": 0, "skip": 0, "timeout": 0, "total": 0, "failures": Counter()}
)
totals = {"pass": 0, "fail": 0, "skip": 0, "timeout": 0, "total": 0}
failure_modes: Counter[str] = Counter()
failure_modes = Counter()
for r in results:
cat = by_cat[r.category]
cat[r.status] += 1
@@ -756,7 +862,7 @@ def aggregate(results: list[TestResult]) -> dict:
}
def write_markdown(scoreboard: dict, path: Path, pinned_commit: str, elapsed_s: float) -> None:
def write_markdown(scoreboard, path: Path, pinned_commit: str, elapsed_s: float) -> None:
t = scoreboard["totals"]
lines = [
"# test262 scoreboard",
@@ -796,16 +902,18 @@ def write_markdown(scoreboard: dict, path: Path, pinned_commit: str, elapsed_s:
path.write_text("\n".join(lines), encoding="utf-8")
def main(argv: list[str]) -> int:
def main(argv):
ap = argparse.ArgumentParser()
ap.add_argument("--limit", type=int, default=0, help="max tests to run (0 = all)")
ap.add_argument("--filter", type=str, action="append", default=None,
help="path prefix filter (repeatable; OR'd together)")
ap.add_argument("--per-test-timeout", type=float, default=DEFAULT_PER_TEST_TIMEOUT_S)
ap.add_argument("--restart-every", type=int, default=500,
help="restart server every N tests to keep memory bounded")
help="restart worker server every N tests (keeps memory bounded)")
ap.add_argument("--max-per-category", type=int, default=0,
help="cap runnable tests per category (0 = no cap)")
ap.add_argument("--workers", type=int, default=0,
help="number of parallel workers (0 = auto; min(nproc, 4))")
ap.add_argument("--output-json", type=str,
default=str(REPO / "lib" / "js" / "test262-scoreboard.json"))
ap.add_argument("--output-md", type=str,
@@ -833,9 +941,9 @@ def main(argv: list[str]) -> int:
all_paths = all_paths[: args.limit]
print(f"Discovered {len(all_paths)} test files.", file=sys.stderr)
tests: list[TestCase] = []
results: list[TestResult] = []
per_cat_count: dict[str, int] = defaultdict(int)
tests = []
results = []
per_cat_count = defaultdict(int)
for p in all_paths:
t = load_test(p)
if not t:
@@ -853,74 +961,62 @@ def main(argv: list[str]) -> int:
print(f"Will run {len(tests)} tests ({len(results)} skipped up front).", file=sys.stderr)
# Worker count
# Auto-default: on <=2-core machines, 1 worker beats 2 because OCaml eval is
# CPU-bound and two processes starve each other. On 4+ cores, use nproc-1
# (leave one core for OS/Python). Cap at 8 to avoid resource thrash.
n_workers = args.workers
if n_workers <= 0:
try:
cpu = os.cpu_count() or 2
except Exception:
cpu = 2
if cpu <= 2:
n_workers = 1
else:
n_workers = max(1, min(cpu - 1, 8))
n_workers = max(1, min(n_workers, len(tests))) if tests else 1
print(f"Using {n_workers} parallel worker(s).", file=sys.stderr)
# Shard tests across workers (round-robin so categories spread evenly)
shards = [[] for _ in range(n_workers)]
for i, t in enumerate(tests):
shards[i % n_workers].append(
(t.rel, t.category, t.src, t.fm.negative_phase, t.fm.negative_type)
)
t_run_start = time.monotonic()
session: ServerSession | None = None
def ensure_session() -> ServerSession:
nonlocal session
if session is None:
session = ServerSession(per_test_timeout=args.per_test_timeout)
session.start()
return session
def restart_session() -> None:
nonlocal session
if session is not None:
session.stop()
session = None
epoch = 100
done_n = 0
try:
for t in tests:
epoch += 1
done_n += 1
source = assemble_source(t)
try:
sess = ensure_session()
kind, payload = sess.run_test(epoch, source)
if t.fm.negative_phase:
ok, why = classify_negative_result(t.fm, kind, payload)
else:
ok, why = classify_positive_result(kind, payload)
results.append(
TestResult(
rel=t.rel,
category=t.category,
status="pass" if ok else "fail",
reason=why,
if n_workers == 1:
# Serial path — avoids multiprocessing overhead
worker_results = [_worker_run((shards[0], args.per_test_timeout, args.restart_every, 0))]
else:
with mp.Pool(n_workers) as pool:
worker_args = [
(shards[i], args.per_test_timeout, args.restart_every, i)
for i in range(n_workers)
]
# imap_unordered so progress prints show up sooner
collected = []
total_tests = len(tests)
last_print = time.monotonic()
for shard_out in pool.imap_unordered(_worker_run, worker_args):
collected.append(shard_out)
now = time.monotonic()
if now - last_print >= 5.0:
done_so_far = sum(len(s) for s in collected)
el = now - t_run_start
print(
f" worker returned: {done_so_far}/{total_tests} tests "
f"elapsed={el:.1f}s rate={done_so_far/max(el,0.001):.1f}/s",
file=sys.stderr,
)
)
except TimeoutError:
results.append(
TestResult(rel=t.rel, category=t.category, status="timeout", reason="per-test timeout")
)
restart_session()
except Exception as e:
results.append(
TestResult(rel=t.rel, category=t.category, status="fail", reason=f"runner-error: {e}")
)
restart_session()
last_print = now
worker_results = collected
# Periodic restart to keep server healthy
if args.restart_every > 0 and done_n % args.restart_every == 0:
restart_session()
if done_n % args.progress_every == 0:
pass_so_far = sum(1 for r in results if r.status == "pass")
fail_so_far = sum(1 for r in results if r.status == "fail")
to_so_far = sum(1 for r in results if r.status == "timeout")
el = time.monotonic() - t_run_start
print(
f" [{done_n}/{len(tests)}] pass={pass_so_far} fail={fail_so_far} "
f"timeout={to_so_far} elapsed={el:.1f}s "
f"rate={done_n/max(el,0.001):.1f}/s",
file=sys.stderr,
)
finally:
if session is not None:
session.stop()
for shard_out in worker_results:
for rel, category, status, reason in shard_out:
results.append(TestResult(rel=rel, category=category, status=status, reason=reason))
t_run_elapsed = time.monotonic() - t_run_start
print(f"\nFinished run in {t_run_elapsed:.1f}s", file=sys.stderr)
@@ -928,6 +1024,7 @@ def main(argv: list[str]) -> int:
scoreboard = aggregate(results)
scoreboard["pinned_commit"] = pinned_commit
scoreboard["elapsed_seconds"] = round(t_run_elapsed, 1)
scoreboard["workers"] = n_workers
out_json = Path(args.output_json)
out_json.parent.mkdir(parents=True, exist_ok=True)

View File

@@ -1,22 +1,22 @@
{
"totals": {
"pass": 66,
"fail": 206,
"pass": 67,
"fail": 204,
"skip": 39,
"timeout": 16,
"timeout": 17,
"total": 327,
"runnable": 288,
"pass_rate": 22.9
"pass_rate": 23.3
},
"categories": [
{
"category": "built-ins/Math",
"total": 327,
"pass": 66,
"fail": 206,
"pass": 67,
"fail": 204,
"skip": 39,
"timeout": 16,
"pass_rate": 22.9,
"timeout": 17,
"pass_rate": 23.3,
"top_failures": [
[
"ReferenceError (undefined symbol)",
@@ -28,11 +28,11 @@
],
[
"TypeError: not a function",
31
30
],
[
"Timeout",
16
17
],
[
"Unhandled: Not callable: {:random <js-math-random()> :trunc <js-math-tr",
@@ -52,21 +52,18 @@
],
[
"TypeError: not a function",
31
30
],
[
"Timeout",
16
17
],
[
"Unhandled: Not callable: {:random <js-math-random()> :trunc <js-math-tr",
1
],
[
"SyntaxError (parse/unsupported syntax)",
1
]
],
"pinned_commit": "d5e73fc8d2c663554fb72e2380a8c2bc1a318a33",
"elapsed_seconds": 275.0
"elapsed_seconds": 426.2,
"workers": 2
}

View File

@@ -1,31 +1,30 @@
# test262 scoreboard
Pinned commit: `d5e73fc8d2c663554fb72e2380a8c2bc1a318a33`
Wall time: 275.0s
Wall time: 426.2s
**Total:** 66/288 runnable passed (22.9%). Raw: pass=66 fail=206 skip=39 timeout=16 total=327.
**Total:** 67/288 runnable passed (23.3%). Raw: pass=67 fail=204 skip=39 timeout=17 total=327.
## Top failure modes
- **94x** ReferenceError (undefined symbol)
- **79x** Test262Error (assertion failed)
- **31x** TypeError: not a function
- **16x** Timeout
- **30x** TypeError: not a function
- **17x** Timeout
- **1x** Unhandled: Not callable: {:random <js-math-random()> :trunc <js-math-tr
- **1x** SyntaxError (parse/unsupported syntax)
## Categories (worst pass-rate first, min 10 runnable)
| Category | Pass | Fail | Skip | Timeout | Total | Pass % |
|---|---:|---:|---:|---:|---:|---:|
| built-ins/Math | 66 | 206 | 39 | 16 | 327 | 22.9% |
| built-ins/Math | 67 | 204 | 39 | 17 | 327 | 23.3% |
## Per-category top failures (min 10 runnable, worst first)
### built-ins/Math (66/288 — 22.9%)
### built-ins/Math (67/288 — 23.3%)
- **94x** ReferenceError (undefined symbol)
- **79x** Test262Error (assertion failed)
- **31x** TypeError: not a function
- **16x** Timeout
- **30x** TypeError: not a function
- **17x** Timeout
- **1x** Unhandled: Not callable: {:random <js-math-random()> :trunc <js-math-tr