#!/usr/bin/env python3 """lua-conformance — run the PUC-Rio Lua 5.1 test suite against Lua-on-SX. Walks lib/lua/lua-tests/*.lua, evaluates each via `lua-eval-ast` on a long-lived sx_server.exe subprocess, classifies pass/fail/timeout per file, and writes lib/lua/scoreboard.{json,md}. Modelled on lib/js/test262-runner.py but much simpler: each Lua test file is its own unit (they're self-contained assertion scripts; they pass if they complete without raising). No harness stub, no frontmatter, no worker pool. Usage: python3 lib/lua/conformance.py python3 lib/lua/conformance.py --filter locals python3 lib/lua/conformance.py --per-test-timeout 3 -v """ from __future__ import annotations import argparse import json import os import re import select import subprocess import sys import time from collections import Counter from pathlib import Path REPO = Path(__file__).resolve().parents[2] SX_SERVER_PRIMARY = REPO / "hosts" / "ocaml" / "_build" / "default" / "bin" / "sx_server.exe" SX_SERVER_FALLBACK = Path("/root/rose-ash/hosts/ocaml/_build/default/bin/sx_server.exe") TESTS_DIR = REPO / "lib" / "lua" / "lua-tests" DEFAULT_TIMEOUT = 8.0 # Files that require facilities we don't (and won't soon) support. # Still classified as skip rather than fail so the scoreboard stays honest. HARDCODED_SKIP = { "all.lua": "driver uses dofile to chain other tests", "api.lua": "requires testC (C debug library)", "checktable.lua": "internal debug helpers", "code.lua": "bytecode inspection via debug library", "db.lua": "debug library", "files.lua": "io library", "gc.lua": "collectgarbage / finalisers", "main.lua": "standalone interpreter driver", } RX_OK_INLINE = re.compile(r"^\(ok (\d+) (.*)\)\s*$") RX_OK_LEN = re.compile(r"^\(ok-len (\d+) \d+\)\s*$") RX_ERR = re.compile(r"^\(error (\d+) (.*)\)\s*$") def pick_sx_server() -> Path: if SX_SERVER_PRIMARY.exists(): return SX_SERVER_PRIMARY return SX_SERVER_FALLBACK def sx_escape_nested(s: str) -> str: """Two-level escape: (eval "(lua-eval-ast \"\")"). Outer literal is consumed by `eval` then the inner literal by `lua-eval-ast`. """ inner = ( s.replace("\\", "\\\\") .replace('"', '\\"') .replace("\n", "\\n") .replace("\r", "\\r") .replace("\t", "\\t") ) return inner.replace("\\", "\\\\").replace('"', '\\"') def classify_error(msg: str) -> str: m = msg.lower() sym = re.search(r"undefined symbol:\s*\\?\"?([^\"\s)]+)", msg, re.I) if sym: return f"undefined symbol: {sym.group(1).strip(chr(34))}" if "undefined symbol" in m: return "undefined symbol" if "lua: arith" in m: return "arith type error" if "lua-transpile" in m: return "transpile: unsupported node" if "lua-parse" in m: return "parse error" if "lua-tokenize" in m: return "tokenize error" if "unknown node" in m: return "unknown AST node" if "not yet supported" in m: return "not yet supported" if "nth: index out" in m or "nth:" in m: return "nth index error" if "timeout" in m: return "timeout" # Strip SX-side wrapping and trim trimmed = msg.strip('"').strip() return f"other: {trimmed[:80]}" class Session: def __init__(self, sx_server: Path, timeout: float): self.sx_server = sx_server self.timeout = timeout self.proc: subprocess.Popen | None = None self._buf = b"" self._fd = -1 def start(self) -> None: self.proc = subprocess.Popen( [str(self.sx_server)], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, cwd=str(REPO), bufsize=0, ) self._fd = self.proc.stdout.fileno() self._buf = b"" os.set_blocking(self._fd, False) self._wait_for("(ready)", timeout=15.0) self._run(1, '(load "lib/lua/tokenizer.sx")', 60) self._run(2, '(load "lib/lua/parser.sx")', 60) self._run(3, '(load "lib/lua/runtime.sx")', 60) self._run(4, '(load "lib/lua/transpile.sx")', 60) def stop(self) -> None: if self.proc is None: return try: self.proc.stdin.close() except Exception: pass try: self.proc.terminate() self.proc.wait(timeout=3) except Exception: try: self.proc.kill() except Exception: pass self.proc = None def _readline(self, timeout: float) -> str | None: deadline = time.monotonic() + timeout while True: nl = self._buf.find(b"\n") if nl >= 0: line = self._buf[: nl + 1] self._buf = self._buf[nl + 1 :] return line.decode("utf-8", errors="replace") remaining = deadline - time.monotonic() if remaining <= 0: raise TimeoutError("readline timeout") try: rlist, _, _ = select.select([self._fd], [], [], remaining) except (OSError, ValueError): return None if not rlist: raise TimeoutError("readline timeout") try: chunk = os.read(self._fd, 65536) except (BlockingIOError, InterruptedError): continue except OSError: return None if not chunk: if self._buf: rv = self._buf.decode("utf-8", errors="replace") self._buf = b"" return rv return None self._buf += chunk def _wait_for(self, token: str, timeout: float) -> None: start = time.monotonic() while time.monotonic() - start < timeout: line = self._readline(timeout - (time.monotonic() - start)) if line is None: raise RuntimeError("sx_server closed stdout before ready") if token in line: return raise TimeoutError(f"timeout waiting for {token}") def _run(self, epoch: int, cmd: str, timeout: float): payload = f"(epoch {epoch})\n{cmd}\n".encode("utf-8") try: self.proc.stdin.write(payload) self.proc.stdin.flush() except (BrokenPipeError, OSError): raise RuntimeError("sx_server stdin closed") deadline = time.monotonic() + timeout while time.monotonic() < deadline: remaining = deadline - time.monotonic() if remaining <= 0: raise TimeoutError(f"epoch {epoch} timeout") line = self._readline(remaining) if line is None: raise RuntimeError("sx_server closed stdout mid-epoch") m = RX_OK_INLINE.match(line) if m and int(m.group(1)) == epoch: return "ok", m.group(2) m = RX_OK_LEN.match(line) if m and int(m.group(1)) == epoch: val = self._readline(deadline - time.monotonic()) or "" return "ok", val.rstrip("\n") m = RX_ERR.match(line) if m and int(m.group(1)) == epoch: return "error", m.group(2) raise TimeoutError(f"epoch {epoch} timeout") def run_lua(self, epoch: int, src: str): escaped = sx_escape_nested(src) cmd = f'(eval "(lua-eval-ast \\"{escaped}\\")")' return self._run(epoch, cmd, self.timeout) def main() -> int: ap = argparse.ArgumentParser() ap.add_argument("--per-test-timeout", type=float, default=DEFAULT_TIMEOUT) ap.add_argument("--filter", type=str, default=None, help="only run tests whose filename contains this substring") ap.add_argument("-v", "--verbose", action="store_true") ap.add_argument("--no-scoreboard", action="store_true", help="do not write scoreboard.{json,md}") args = ap.parse_args() sx_server = pick_sx_server() if not sx_server.exists(): print(f"ERROR: sx_server not found at {sx_server}", file=sys.stderr) return 1 if not TESTS_DIR.exists(): print(f"ERROR: no tests dir at {TESTS_DIR}", file=sys.stderr) return 1 tests = sorted(TESTS_DIR.glob("*.lua")) if args.filter: tests = [p for p in tests if args.filter in p.name] if not tests: print("No tests matched.", file=sys.stderr) return 1 print(f"Running {len(tests)} Lua test file(s)…", file=sys.stderr) session = Session(sx_server, args.per_test_timeout) session.start() results = [] failure_modes: Counter = Counter() try: for i, path in enumerate(tests, start=1): name = path.name skip_reason = HARDCODED_SKIP.get(name) if skip_reason: results.append({"name": name, "status": "skip", "reason": skip_reason, "ms": 0}) if args.verbose: print(f" - {name}: SKIP ({skip_reason})") continue try: src = path.read_text(encoding="utf-8") except UnicodeDecodeError: src = path.read_text(encoding="latin-1") t0 = time.monotonic() try: kind, payload = session.run_lua(100 + i, src) ms = int((time.monotonic() - t0) * 1000) if kind == "ok": results.append({"name": name, "status": "pass", "reason": "", "ms": ms}) if args.verbose: print(f" + {name}: PASS ({ms}ms)") else: reason = classify_error(payload) failure_modes[reason] += 1 results.append({"name": name, "status": "fail", "reason": reason, "ms": ms}) if args.verbose: print(f" - {name}: FAIL — {reason}") except TimeoutError: ms = int((time.monotonic() - t0) * 1000) failure_modes["timeout"] += 1 results.append({"name": name, "status": "timeout", "reason": "per-test timeout", "ms": ms}) if args.verbose: print(f" - {name}: TIMEOUT ({ms}ms)") # Restart after a timeout to shed any stuck state. session.stop() session.start() finally: session.stop() n_pass = sum(1 for r in results if r["status"] == "pass") n_fail = sum(1 for r in results if r["status"] == "fail") n_timeout = sum(1 for r in results if r["status"] == "timeout") n_skip = sum(1 for r in results if r["status"] == "skip") n_total = len(results) n_runnable = n_total - n_skip pct = (n_pass / n_runnable * 100.0) if n_runnable else 0.0 print() print(f"Lua-on-SX conformance: {n_pass}/{n_runnable} runnable pass ({pct:.1f}%) " f"fail={n_fail} timeout={n_timeout} skip={n_skip} total={n_total}") if failure_modes: print("Top failure modes:") for mode, count in failure_modes.most_common(10): print(f" {count}x {mode}") if not args.no_scoreboard: sb = { "totals": { "pass": n_pass, "fail": n_fail, "timeout": n_timeout, "skip": n_skip, "total": n_total, "runnable": n_runnable, "pass_rate": round(pct, 1), }, "top_failure_modes": failure_modes.most_common(20), "results": results, } (REPO / "lib" / "lua" / "scoreboard.json").write_text( json.dumps(sb, indent=2), encoding="utf-8" ) md = [ "# Lua-on-SX conformance scoreboard", "", f"**Pass rate:** {n_pass}/{n_runnable} runnable ({pct:.1f}%)", f"fail={n_fail} timeout={n_timeout} skip={n_skip} total={n_total}", "", "## Top failure modes", "", ] for mode, count in failure_modes.most_common(10): md.append(f"- **{count}x** {mode}") md.extend(["", "## Per-test results", "", "| Test | Status | Reason | ms |", "|---|---|---|---:|"]) for r in results: reason = r["reason"] or "-" md.append(f"| {r['name']} | {r['status']} | {reason} | {r['ms']} |") (REPO / "lib" / "lua" / "scoreboard.md").write_text( "\n".join(md) + "\n", encoding="utf-8" ) return 0 if (n_fail == 0 and n_timeout == 0) else 1 if __name__ == "__main__": sys.exit(main())