From 65d4c7063899d7ca27b6f0ff87372c78ab2f3da7 Mon Sep 17 00:00:00 2001 From: giles Date: Fri, 24 Apr 2026 06:18:48 +0000 Subject: [PATCH] js-on-sx: parallel test262 runner with raw-fd line buffer Rework test262-runner.py to support --workers N parallel shards, each running a long-lived sx_server session. Replace thread-per-readline with a select-based raw-fd line buffer. On 2-core machines, 1 worker still beats 2 (OCaml eval is CPU-bound and starves when shared). Auto-defaults n_workers=1 on <=2 CPU, nproc-1 (up to 8) otherwise. Throughput baseline: ~1.1 Math tests/s serial on 2-core (unchanged; the evaluator dominates). The runner framework is now ready to scale on bigger machines without further code changes. Co-Authored-By: Claude Opus 4.7 (1M context) --- lib/js/test262-runner.py | 381 +++++++++++++++++++++------------ lib/js/test262-scoreboard.json | 31 ++- lib/js/test262-scoreboard.md | 17 +- 3 files changed, 261 insertions(+), 168 deletions(-) diff --git a/lib/js/test262-runner.py b/lib/js/test262-runner.py index b511333f..e87e2405 100644 --- a/lib/js/test262-runner.py +++ b/lib/js/test262-runner.py @@ -3,14 +3,16 @@ test262-runner — run the official TC39 test262 suite against our JS-on-SX runtime. Walks lib/js/test262-upstream/test/**/*.js, parses YAML-ish frontmatter, runs -tests via a long-lived sx_server.exe subprocess (one harness load, one `js-eval` -call per test), and emits JSON + Markdown scoreboards. +tests via a pool of long-lived sx_server.exe subprocesses (each worker loads +the harness once, then runs `js-eval` per test on a persistent stdin channel), +and emits JSON + Markdown scoreboards. 
Usage: python3 lib/js/test262-runner.py # full run (skips strict/module/etc) python3 lib/js/test262-runner.py --limit 2000 python3 lib/js/test262-runner.py --filter built-ins/Math python3 lib/js/test262-runner.py --per-test-timeout 3 + python3 lib/js/test262-runner.py --workers 4 # parallel workers (default: auto, based on CPU count) Outputs: lib/js/test262-scoreboard.json @@ -34,11 +36,12 @@ from __future__ import annotations import argparse import dataclasses import json +import multiprocessing as mp import os import re +import select import subprocess import sys -import threading import time from collections import Counter, defaultdict from pathlib import Path @@ -152,15 +155,15 @@ FRONTMATTER_RE = re.compile(r"/\*---(.*?)---\*/", re.DOTALL) @dataclasses.dataclass class Frontmatter: description: str = "" - flags: list[str] = dataclasses.field(default_factory=list) - includes: list[str] = dataclasses.field(default_factory=list) - features: list[str] = dataclasses.field(default_factory=list) - negative_phase: str | None = None - negative_type: str | None = None - esid: str | None = None + flags: list = dataclasses.field(default_factory=list) + includes: list = dataclasses.field(default_factory=list) + features: list = dataclasses.field(default_factory=list) + negative_phase: "str | None" = None + negative_type: "str | None" = None + esid: "str | None" = None -def _parse_yaml_list(s: str) -> list[str]: +def _parse_yaml_list(s: str) -> list: s = s.strip() if s.startswith("[") and s.endswith("]"): s = s[1:-1] @@ -187,7 +190,7 @@ def parse_frontmatter(src: str) -> Frontmatter: key, value = m2.group(1), m2.group(2).strip() if key == "description": if value in (">", "|"): - desc_lines: list[str] = [] + desc_lines = [] j = i + 1 while j < len(lines): nxt = lines[j] @@ -328,7 +331,7 @@ def classify_error(msg: str) -> str: return f"Other: {msg[:80]}" -def classify_negative_result(fm: Frontmatter, kind: str, payload: str) -> tuple[bool, str]: +def classify_negative_result(fm: Frontmatter, kind: 
str, payload: str): expected_type = fm.negative_type or "" if kind == "error": if expected_type and expected_type.lower() in payload.lower(): @@ -337,7 +340,7 @@ def classify_negative_result(fm: Frontmatter, kind: str, payload: str) -> tuple[ return False, f"negative: expected {expected_type}, but test completed normally" -def classify_positive_result(kind: str, payload: str) -> tuple[bool, str]: +def classify_positive_result(kind: str, payload: str): if kind == "ok": return True, "passed" return False, classify_error(payload) @@ -435,7 +438,7 @@ UNSUPPORTED_FEATURES = { } -def should_skip(t: "TestCase") -> tuple[bool, str]: +def should_skip(t): if "onlyStrict" in t.fm.flags: return True, "strict-mode only" if "module" in t.fm.flags: @@ -527,8 +530,8 @@ class TestResult: elapsed_ms: int = 0 -def discover_tests(filter_prefixes: list[str] | None) -> list[Path]: - tests: list[Path] = [] +def discover_tests(filter_prefixes): + tests = [] for p in TEST_ROOT.rglob("*.js"): if p.name.endswith("_FIXTURE.js"): continue @@ -543,7 +546,7 @@ def discover_tests(filter_prefixes: list[str] | None) -> list[Path]: return tests -def load_test(path: Path) -> TestCase | None: +def load_test(path: Path): try: src = path.read_text(encoding="utf-8") except Exception: @@ -566,12 +569,15 @@ def load_test(path: Path) -> TestCase | None: class ServerSession: """Wrap a long-lived sx_server.exe subprocess; feed it one-liner commands, collect results per-epoch. Restart on hang/crash. + + Uses a raw-fd line buffer + select() to avoid spawning a thread per read. 
""" def __init__(self, per_test_timeout: float): self.per_test_timeout = per_test_timeout - self.proc: subprocess.Popen | None = None - self.lock = threading.Lock() + self.proc = None + self._buf = b"" + self._fd = -1 def start(self) -> None: self.proc = subprocess.Popen( @@ -580,22 +586,24 @@ class ServerSession: stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, cwd=str(REPO), - text=True, - bufsize=1, + bufsize=0, # binary, unbuffered — we do our own line parsing ) - self._wait_for("(ready)", timeout=10.0) + self._fd = self.proc.stdout.fileno() + self._buf = b"" + os.set_blocking(self._fd, False) + self._wait_for("(ready)", timeout=15.0) # Load kernel libraries - self._run_and_collect(1, '(load "lib/r7rs.sx")', timeout=30.0) - self._run_and_collect(2, '(load "lib/js/lexer.sx")', timeout=30.0) - self._run_and_collect(3, '(load "lib/js/parser.sx")', timeout=30.0) - self._run_and_collect(4, '(load "lib/js/transpile.sx")', timeout=30.0) - self._run_and_collect(5, '(load "lib/js/runtime.sx")', timeout=30.0) + self._run_and_collect(1, '(load "lib/r7rs.sx")', timeout=60.0) + self._run_and_collect(2, '(load "lib/js/lexer.sx")', timeout=60.0) + self._run_and_collect(3, '(load "lib/js/parser.sx")', timeout=60.0) + self._run_and_collect(4, '(load "lib/js/transpile.sx")', timeout=60.0) + self._run_and_collect(5, '(load "lib/js/runtime.sx")', timeout=60.0) # Preload the stub harness as one big js-eval stub_escaped = sx_escape_for_nested_eval(HARNESS_STUB) self._run_and_collect( 6, f'(eval "(js-eval \\"{stub_escaped}\\")")', - timeout=30.0, + timeout=60.0, ) def stop(self) -> None: @@ -614,31 +622,77 @@ class ServerSession: pass self.proc = None + def _readline_raw(self, timeout: float): + """Read one line (including trailing \\n) from the subprocess's stdout. + Returns bytes or None on EOF. Raises TimeoutError if no newline appears + within `timeout` seconds. 
+ """ + deadline = time.monotonic() + timeout + while True: + nl = self._buf.find(b"\n") + if nl >= 0: + line = self._buf[: nl + 1] + self._buf = self._buf[nl + 1 :] + return line + remaining = deadline - time.monotonic() + if remaining <= 0: + raise TimeoutError("readline timeout") + try: + rlist, _, _ = select.select([self._fd], [], [], remaining) + except (OSError, ValueError): + return None + if not rlist: + raise TimeoutError("readline timeout") + try: + chunk = os.read(self._fd, 65536) + except (BlockingIOError, InterruptedError): + continue + except OSError: + return None + if not chunk: + if self._buf: + line = self._buf + self._buf = b"" + return line + return None + self._buf += chunk + + def _readline(self, timeout: float): + b = self._readline_raw(timeout) + if b is None: + return None + try: + return b.decode("utf-8", errors="replace") + except Exception: + return "" + def _wait_for(self, token: str, timeout: float) -> None: - assert self.proc and self.proc.stdout start = time.monotonic() while time.monotonic() - start < timeout: - line = self.proc.stdout.readline() - if not line: + line = self._readline(timeout - (time.monotonic() - start)) + if line is None: raise RuntimeError("sx_server closed stdout before ready") if token in line: return raise TimeoutError(f"timeout waiting for {token}") - def _run_and_collect(self, epoch: int, cmd: str, timeout: float) -> tuple[str, str]: - """Write `(epoch N)\n\n` and read until we see ok/ok-len/error for that epoch. + def _run_and_collect(self, epoch: int, cmd: str, timeout: float): + """Write `(epoch N)\\n\\n` and read until we see ok/ok-len/error for that epoch. Returns (kind, payload). Raises TimeoutError if the server hangs. 
""" - assert self.proc and self.proc.stdin and self.proc.stdout - self.proc.stdin.write(f"(epoch {epoch})\n{cmd}\n") - self.proc.stdin.flush() + payload = f"(epoch {epoch})\n{cmd}\n".encode("utf-8") + try: + self.proc.stdin.write(payload) + self.proc.stdin.flush() + except (BrokenPipeError, OSError): + raise RuntimeError("sx_server stdin closed") deadline = time.monotonic() + timeout while time.monotonic() < deadline: remaining = deadline - time.monotonic() if remaining <= 0: raise TimeoutError(f"epoch {epoch} exceeded timeout {timeout}") - line = self._readline_with_timeout(remaining) - if not line: + line = self._readline(remaining) + if line is None: raise RuntimeError("sx_server closed stdout mid-epoch") m = RX_OK_INLINE.match(line) if m: @@ -649,7 +703,10 @@ class ServerSession: m = RX_OK_LEN.match(line) if m: e = int(m.group(1)) - val = self._readline_with_timeout(remaining) + remaining2 = deadline - time.monotonic() + if remaining2 <= 0: + raise TimeoutError(f"epoch {epoch} exceeded timeout {timeout}") + val = self._readline(remaining2) if val is None: val = "" val = val.rstrip("\n") @@ -665,58 +722,107 @@ class ServerSession: # Other output — (ready), comment, noise — ignore raise TimeoutError(f"epoch {epoch} exceeded timeout {timeout}") - def _readline_with_timeout(self, timeout: float) -> str | None: - """Read one line with a timeout. On Linux we use a thread-wrapped read - since there's no portable non-blocking readline on a subprocess pipe. 
- """ - assert self.proc and self.proc.stdout - result: list[str | None] = [None] - done = threading.Event() - - def reader() -> None: - try: - result[0] = self.proc.stdout.readline() # type: ignore[union-attr] - except Exception: - result[0] = None - finally: - done.set() - - th = threading.Thread(target=reader, daemon=True) - th.start() - done.wait(timeout=timeout) - if not done.is_set(): - # Hang — kill the process; caller will restart - try: - self.proc.kill() - except Exception: - pass - raise TimeoutError("readline timeout") - return result[0] - - def run_test(self, epoch: int, js_source: str) -> tuple[str, str]: + def run_test(self, epoch: int, js_source: str): escaped = sx_escape_for_nested_eval(js_source) cmd = f'(eval "(js-eval \\"{escaped}\\")")' return self._run_and_collect(epoch, cmd, timeout=self.per_test_timeout) +# --------------------------------------------------------------------------- +# Parallel workers +# --------------------------------------------------------------------------- + + +def _worker_run(args): + """Run a shard of tests in this process. Returns list of (rel, category, status, reason). + + Each worker keeps its own long-lived ServerSession. Restarts on timeout/crash. 
+ """ + shard_tests, per_test_timeout, restart_every, worker_id = args + + session = None + results = [] + + def get_session(): + nonlocal session + if session is None: + session = ServerSession(per_test_timeout=per_test_timeout) + session.start() + return session + + def restart(): + nonlocal session + if session is not None: + try: + session.stop() + except Exception: + pass + session = None + + try: + epoch = 100 + worker_id * 10000 + done_n = 0 + for t_data in shard_tests: + rel, category, src, negative_phase, negative_type = t_data + epoch += 1 + done_n += 1 + try: + sess = get_session() + kind, payload = sess.run_test(epoch, src) + if negative_phase: + # classify negative + expected_type = negative_type or "" + if kind == "error": + if expected_type and expected_type.lower() in payload.lower(): + status, reason = "pass", f"negative: threw {expected_type} as expected" + else: + status, reason = "fail", f"negative: expected {expected_type}, got: {payload[:100]}" + else: + status, reason = "fail", f"negative: expected {expected_type}, but test completed normally" + else: + if kind == "ok": + status, reason = "pass", "passed" + else: + status, reason = "fail", classify_error(payload) + results.append((rel, category, status, reason)) + except TimeoutError: + results.append((rel, category, "timeout", "per-test timeout")) + restart() + except Exception as e: + results.append((rel, category, "fail", f"runner-error: {e}")) + restart() + + # Periodic restart to keep server healthy (memory bounded) + if restart_every > 0 and done_n % restart_every == 0: + restart() + finally: + if session is not None: + try: + session.stop() + except Exception: + pass + + return results + + # --------------------------------------------------------------------------- # Run driver # --------------------------------------------------------------------------- -def assemble_source(t: TestCase) -> str: +def assemble_source(t): """Return JS source to feed to js-eval. 
Harness is preloaded, so we only append the test source (plus negative-test prep if needed). """ return t.src -def aggregate(results: list[TestResult]) -> dict: - by_cat: dict[str, dict] = defaultdict( +def aggregate(results): + by_cat = defaultdict( lambda: {"pass": 0, "fail": 0, "skip": 0, "timeout": 0, "total": 0, "failures": Counter()} ) totals = {"pass": 0, "fail": 0, "skip": 0, "timeout": 0, "total": 0} - failure_modes: Counter[str] = Counter() + failure_modes = Counter() for r in results: cat = by_cat[r.category] cat[r.status] += 1 @@ -756,7 +862,7 @@ def aggregate(results: list[TestResult]) -> dict: } -def write_markdown(scoreboard: dict, path: Path, pinned_commit: str, elapsed_s: float) -> None: +def write_markdown(scoreboard, path: Path, pinned_commit: str, elapsed_s: float) -> None: t = scoreboard["totals"] lines = [ "# test262 scoreboard", @@ -796,16 +902,18 @@ def write_markdown(scoreboard: dict, path: Path, pinned_commit: str, elapsed_s: path.write_text("\n".join(lines), encoding="utf-8") -def main(argv: list[str]) -> int: +def main(argv): ap = argparse.ArgumentParser() ap.add_argument("--limit", type=int, default=0, help="max tests to run (0 = all)") ap.add_argument("--filter", type=str, action="append", default=None, help="path prefix filter (repeatable; OR'd together)") ap.add_argument("--per-test-timeout", type=float, default=DEFAULT_PER_TEST_TIMEOUT_S) ap.add_argument("--restart-every", type=int, default=500, - help="restart server every N tests to keep memory bounded") + help="restart worker server every N tests (keeps memory bounded)") ap.add_argument("--max-per-category", type=int, default=0, help="cap runnable tests per category (0 = no cap)") + ap.add_argument("--workers", type=int, default=0, + help="number of parallel workers (0 = auto; min(nproc, 4))") ap.add_argument("--output-json", type=str, default=str(REPO / "lib" / "js" / "test262-scoreboard.json")) ap.add_argument("--output-md", type=str, @@ -833,9 +941,9 @@ def main(argv: 
list[str]) -> int: all_paths = all_paths[: args.limit] print(f"Discovered {len(all_paths)} test files.", file=sys.stderr) - tests: list[TestCase] = [] - results: list[TestResult] = [] - per_cat_count: dict[str, int] = defaultdict(int) + tests = [] + results = [] + per_cat_count = defaultdict(int) for p in all_paths: t = load_test(p) if not t: @@ -853,74 +961,62 @@ def main(argv: list[str]) -> int: print(f"Will run {len(tests)} tests ({len(results)} skipped up front).", file=sys.stderr) + # Worker count + # Auto-default: on <=2-core machines, 1 worker beats 2 because OCaml eval is + # CPU-bound and two processes starve each other. On 4+ cores, use nproc-1 + # (leave one core for OS/Python). Cap at 8 to avoid resource thrash. + n_workers = args.workers + if n_workers <= 0: + try: + cpu = os.cpu_count() or 2 + except Exception: + cpu = 2 + if cpu <= 2: + n_workers = 1 + else: + n_workers = max(1, min(cpu - 1, 8)) + n_workers = max(1, min(n_workers, len(tests))) if tests else 1 + print(f"Using {n_workers} parallel worker(s).", file=sys.stderr) + + # Shard tests across workers (round-robin so categories spread evenly) + shards = [[] for _ in range(n_workers)] + for i, t in enumerate(tests): + shards[i % n_workers].append( + (t.rel, t.category, t.src, t.fm.negative_phase, t.fm.negative_type) + ) + t_run_start = time.monotonic() - session: ServerSession | None = None - - def ensure_session() -> ServerSession: - nonlocal session - if session is None: - session = ServerSession(per_test_timeout=args.per_test_timeout) - session.start() - return session - - def restart_session() -> None: - nonlocal session - if session is not None: - session.stop() - session = None - - epoch = 100 - done_n = 0 - try: - for t in tests: - epoch += 1 - done_n += 1 - source = assemble_source(t) - try: - sess = ensure_session() - kind, payload = sess.run_test(epoch, source) - if t.fm.negative_phase: - ok, why = classify_negative_result(t.fm, kind, payload) - else: - ok, why = 
classify_positive_result(kind, payload) - results.append( - TestResult( - rel=t.rel, - category=t.category, - status="pass" if ok else "fail", - reason=why, + if n_workers == 1: + # Serial path — avoids multiprocessing overhead + worker_results = [_worker_run((shards[0], args.per_test_timeout, args.restart_every, 0))] + else: + with mp.Pool(n_workers) as pool: + worker_args = [ + (shards[i], args.per_test_timeout, args.restart_every, i) + for i in range(n_workers) + ] + # imap_unordered so progress prints show up sooner + collected = [] + total_tests = len(tests) + last_print = time.monotonic() + for shard_out in pool.imap_unordered(_worker_run, worker_args): + collected.append(shard_out) + now = time.monotonic() + if now - last_print >= 5.0: + done_so_far = sum(len(s) for s in collected) + el = now - t_run_start + print( + f" worker returned: {done_so_far}/{total_tests} tests " + f"elapsed={el:.1f}s rate={done_so_far/max(el,0.001):.1f}/s", + file=sys.stderr, ) - ) - except TimeoutError: - results.append( - TestResult(rel=t.rel, category=t.category, status="timeout", reason="per-test timeout") - ) - restart_session() - except Exception as e: - results.append( - TestResult(rel=t.rel, category=t.category, status="fail", reason=f"runner-error: {e}") - ) - restart_session() + last_print = now + worker_results = collected - # Periodic restart to keep server healthy - if args.restart_every > 0 and done_n % args.restart_every == 0: - restart_session() - - if done_n % args.progress_every == 0: - pass_so_far = sum(1 for r in results if r.status == "pass") - fail_so_far = sum(1 for r in results if r.status == "fail") - to_so_far = sum(1 for r in results if r.status == "timeout") - el = time.monotonic() - t_run_start - print( - f" [{done_n}/{len(tests)}] pass={pass_so_far} fail={fail_so_far} " - f"timeout={to_so_far} elapsed={el:.1f}s " - f"rate={done_n/max(el,0.001):.1f}/s", - file=sys.stderr, - ) - finally: - if session is not None: - session.stop() + for shard_out in 
worker_results: + for rel, category, status, reason in shard_out: + results.append(TestResult(rel=rel, category=category, status=status, reason=reason)) t_run_elapsed = time.monotonic() - t_run_start print(f"\nFinished run in {t_run_elapsed:.1f}s", file=sys.stderr) @@ -928,6 +1024,7 @@ def main(argv: list[str]) -> int: scoreboard = aggregate(results) scoreboard["pinned_commit"] = pinned_commit scoreboard["elapsed_seconds"] = round(t_run_elapsed, 1) + scoreboard["workers"] = n_workers out_json = Path(args.output_json) out_json.parent.mkdir(parents=True, exist_ok=True) diff --git a/lib/js/test262-scoreboard.json b/lib/js/test262-scoreboard.json index fb8311e7..1653fbc9 100644 --- a/lib/js/test262-scoreboard.json +++ b/lib/js/test262-scoreboard.json @@ -1,22 +1,22 @@ { "totals": { - "pass": 66, - "fail": 206, + "pass": 67, + "fail": 204, "skip": 39, - "timeout": 16, + "timeout": 17, "total": 327, "runnable": 288, - "pass_rate": 22.9 + "pass_rate": 23.3 }, "categories": [ { "category": "built-ins/Math", "total": 327, - "pass": 66, - "fail": 206, + "pass": 67, + "fail": 204, "skip": 39, - "timeout": 16, - "pass_rate": 22.9, + "timeout": 17, + "pass_rate": 23.3, "top_failures": [ [ "ReferenceError (undefined symbol)", @@ -28,11 +28,11 @@ ], [ "TypeError: not a function", - 31 + 30 ], [ "Timeout", - 16 + 17 ], [ "Unhandled: Not callable: {:random :trunc :trunc :trunc :trunc