#!/usr/bin/env python3 """MCP server for hyperscript conformance test runs. Wraps `node tests/hs-run-filtered.js` so the agent can run the suite (full or filtered), regenerate the SX test file, and kill stale background runs without going through Bash for each step. Stdio JSON-RPC transport, same protocol as tools/mcp_services.py. """ import json import os import re import signal import subprocess import sys import time PROJECT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) RUNNER_PATH = os.path.join(PROJECT_DIR, "tests/hs-run-filtered.js") GEN_PATH = os.path.join(PROJECT_DIR, "tests/playwright/generate-sx-tests.py") # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def text_result(s): return {"content": [{"type": "text", "text": s}]} def error_result(s): return {"content": [{"type": "text", "text": s}], "isError": True} def find_runner_pids(): """Return PIDs of any running `node tests/hs-run-filtered.js` processes.""" try: out = subprocess.run( ["pgrep", "-f", "node tests/hs-run-filtered.js"], capture_output=True, text=True, timeout=5, ).stdout.strip() except Exception: return [] return [int(p) for p in out.split() if p.isdigit()] # --------------------------------------------------------------------------- # Tool: hs_test_run # --------------------------------------------------------------------------- def hs_test_run(args): """Run the HS test suite (optionally filtered) with a wall-clock timeout. Args: suite: HS_SUITE filter (e.g. "hs-upstream-put"). Optional. start: HS_START — first test index. Optional. end: HS_END — exclusive end test index. Optional. step_limit: HS_STEP_LIMIT (default 200000). timeout_secs: wall-clock cap (default 300, max 1800). summary_only: if true (default), strip per-test output and keep only Results / By category / Failure types / All failures. verbose: if true, set HS_VERBOSE=1. """ if not os.path.isfile(RUNNER_PATH): return error_result(f"Runner not found at {RUNNER_PATH}") env = os.environ.copy() if args.get("suite"): env["HS_SUITE"] = str(args["suite"]) if args.get("start") is not None: env["HS_START"] = str(int(args["start"])) if args.get("end") is not None: env["HS_END"] = str(int(args["end"])) if args.get("step_limit") is not None: env["HS_STEP_LIMIT"] = str(int(args["step_limit"])) if args.get("verbose"): env["HS_VERBOSE"] = "1" timeout = max(10, min(int(args.get("timeout_secs", 300)), 1800)) summary_only = args.get("summary_only", True) t0 = time.time() proc = subprocess.Popen( ["node", RUNNER_PATH], cwd=PROJECT_DIR, env=env, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, # Detach into its own process group so we can kill the whole tree. preexec_fn=os.setsid, ) timed_out = False try: stdout, _ = proc.communicate(timeout=timeout) except subprocess.TimeoutExpired: timed_out = True # Kill the whole process group (SIGTERM, then SIGKILL after 2s). try: os.killpg(proc.pid, signal.SIGTERM) time.sleep(2) if proc.poll() is None: os.killpg(proc.pid, signal.SIGKILL) except ProcessLookupError: pass try: stdout, _ = proc.communicate(timeout=5) except subprocess.TimeoutExpired: stdout = "" elapsed = time.time() - t0 # Find the last "Tn " (verbose progress marker) or last failure line so # we can tell the agent where things were when a timeout fired. last_progress = "" if timed_out: for line in (stdout or "").splitlines()[::-1]: if line.startswith(" SLOW:") or line.startswith(" TIMEOUT:") or "/1496 " in line: last_progress = line.strip() break if summary_only: # Keep only the post-summary sections. lines = (stdout or "").splitlines() kept = [] in_summary = False for line in lines: if line.startswith("Results:"): in_summary = True if in_summary: kept.append(line) elif line.startswith(" TIMEOUT:") or line.startswith(" SLOW:"): # Useful for debugging hangs even when summary-only. kept.append(line) stdout = "\n".join(kept) or stdout header = [f"## hs_test_run ({elapsed:.1f}s)"] if timed_out: header.append(f"⚠️ TIMED OUT after {timeout}s") if last_progress: header.append(f"Last activity: {last_progress}") if env.get("HS_SUITE"): header.append(f"suite: {env['HS_SUITE']}") if env.get("HS_START") or env.get("HS_END"): header.append(f"range: {env.get('HS_START', '0')}-{env.get('HS_END', 'end')}") header.append(f"step_limit: {env.get('HS_STEP_LIMIT', '200000')}") return text_result("\n".join(header) + "\n\n" + (stdout or "(no output)")) # --------------------------------------------------------------------------- # Tool: hs_test_kill # --------------------------------------------------------------------------- def hs_test_kill(args): """Kill any running `node tests/hs-run-filtered.js` processes. Args: signal_name: "TERM" (default) or "KILL". """ sig = signal.SIGTERM if args.get("signal_name", "TERM") == "TERM" else signal.SIGKILL pids = find_runner_pids() if not pids: return text_result("No runner processes found.") killed, failed = [], [] for p in pids: try: os.kill(p, sig) killed.append(p) except ProcessLookupError: pass except Exception as e: failed.append((p, str(e))) msg = f"Sent SIG{args.get('signal_name', 'TERM')} to {len(killed)} runner(s): {killed}" if failed: msg += f"\nFailed: {failed}" return text_result(msg) # --------------------------------------------------------------------------- # Tool: hs_test_regen # --------------------------------------------------------------------------- def hs_test_regen(args): """Regenerate spec/tests/test-hyperscript-behavioral.sx from the upstream JSON.""" if not os.path.isfile(GEN_PATH): return error_result(f"Generator not found at {GEN_PATH}") try: r = subprocess.run( ["python3", GEN_PATH], cwd=PROJECT_DIR, capture_output=True, text=True, timeout=120, ) except subprocess.TimeoutExpired: return error_result("Generator timed out (>120s)") out = (r.stdout or "") + (r.stderr or "") # Trim — generator prints a long category list; keep just the tail. lines = out.splitlines() if len(lines) > 25: out = "...\n" + "\n".join(lines[-25:]) status = "ok" if r.returncode == 0 else f"exit {r.returncode}" return text_result(f"## hs_test_regen ({status})\n\n{out}") # --------------------------------------------------------------------------- # Tool: hs_test_status # --------------------------------------------------------------------------- def hs_test_status(args): """Show whether any runner is in flight, plus runner/generator paths.""" pids = find_runner_pids() info = [f"runner: {RUNNER_PATH}", f"generator: {GEN_PATH}"] if pids: info.append(f"running pids: {pids}") else: info.append("running pids: (none)") return text_result("\n".join(info)) # --------------------------------------------------------------------------- # JSON-RPC dispatch # --------------------------------------------------------------------------- def tool(name, description, properties, required): return { "name": name, "description": description, "inputSchema": {"type": "object", "properties": properties, "required": required}, } TOOLS = [ tool( "hs_test_run", "Run the hyperscript conformance suite (node tests/hs-run-filtered.js) with a " "wall-clock timeout. Optionally filter by suite or test index range. Always " "kills the child process group on timeout.", { "suite": {"type": "string", "description": "HS_SUITE filter (e.g. 'hs-upstream-put')"}, "start": {"type": "integer", "description": "First test index (HS_START)"}, "end": {"type": "integer", "description": "Exclusive end test index (HS_END)"}, "step_limit": {"type": "integer", "description": "HS_STEP_LIMIT (default 200000)"}, "timeout_secs": {"type": "integer", "description": "Wall-clock cap (default 300, max 1800)"}, "summary_only": {"type": "boolean", "description": "Trim per-test output (default true)"}, "verbose": {"type": "boolean", "description": "Set HS_VERBOSE=1"}, }, [], ), tool( "hs_test_kill", "Kill any background hs-run-filtered.js processes.", {"signal_name": {"type": "string", "description": "TERM (default) or KILL"}}, [], ), tool( "hs_test_regen", "Regenerate spec/tests/test-hyperscript-behavioral.sx from the upstream JSON.", {}, [], ), tool( "hs_test_status", "Show whether any runner is currently in flight.", {}, [], ), ] def handle_tool(name, args): match name: case "hs_test_run": return hs_test_run(args) case "hs_test_kill": return hs_test_kill(args) case "hs_test_regen": return hs_test_regen(args) case "hs_test_status": return hs_test_status(args) case _: return error_result(f"Unknown tool: {name}") def dispatch(method, params): match method: case "initialize": return { "protocolVersion": "2024-11-05", "capabilities": {"tools": {}}, "serverInfo": {"name": "hs-test", "version": "0.1.0"}, } case "notifications/initialized": return None case "tools/list": return {"tools": TOOLS} case "tools/call": name = params["name"] args = params.get("arguments", {}) or {} try: return handle_tool(name, args) except Exception as e: return error_result(f"Error: {e}") case _: return None def main(): for line in sys.stdin: line = line.strip() if not line: continue try: msg = json.loads(line) except json.JSONDecodeError: continue method = msg.get("method", "") params = msg.get("params", {}) msg_id = msg.get("id") result = dispatch(method, params) if msg_id is not None and result is not None: print(json.dumps({"jsonrpc": "2.0", "id": msg_id, "result": result}), flush=True) if __name__ == "__main__": main()