Files
rose-ash/tools/mcp_hs_test.py
giles f8d30f50fb mcp: add hs-test server for hyperscript conformance runs
Wraps `node tests/hs-run-filtered.js` so the agent can run/filter/kill
test runs without per-call Bash permission prompts. Tools:

- hs_test_run: run the suite (optional suite filter, start/end range,
  step_limit, verbose); enforces a wall-clock timeout via SIGTERM/SIGKILL
  on the child process group, so a hung CEK loop can't strand the agent.
- hs_test_kill: SIGTERM/SIGKILL any background runner.
- hs_test_regen: regenerate spec/tests/test-hyperscript-behavioral.sx.
- hs_test_status: list any in-flight runners.

Stdio JSON-RPC, same protocol as tools/mcp_services.py.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-23 10:56:50 +00:00

327 lines
11 KiB
Python
Executable File

#!/usr/bin/env python3
"""MCP server for hyperscript conformance test runs.
Wraps `node tests/hs-run-filtered.js` so the agent can run the suite (full
or filtered), regenerate the SX test file, and kill stale background runs
without going through Bash for each step.
Stdio JSON-RPC transport, same protocol as tools/mcp_services.py.
"""
import json
import os
import re
import signal
import subprocess
import sys
import time
PROJECT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
RUNNER_PATH = os.path.join(PROJECT_DIR, "tests/hs-run-filtered.js")
GEN_PATH = os.path.join(PROJECT_DIR, "tests/playwright/generate-sx-tests.py")
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def text_result(s):
return {"content": [{"type": "text", "text": s}]}
def error_result(s):
return {"content": [{"type": "text", "text": s}], "isError": True}
def find_runner_pids():
"""Return PIDs of any running `node tests/hs-run-filtered.js` processes."""
try:
out = subprocess.run(
["pgrep", "-f", "node tests/hs-run-filtered.js"],
capture_output=True, text=True, timeout=5,
).stdout.strip()
except Exception:
return []
return [int(p) for p in out.split() if p.isdigit()]
# ---------------------------------------------------------------------------
# Tool: hs_test_run
# ---------------------------------------------------------------------------
def hs_test_run(args):
"""Run the HS test suite (optionally filtered) with a wall-clock timeout.
Args:
suite: HS_SUITE filter (e.g. "hs-upstream-put"). Optional.
start: HS_START — first test index. Optional.
end: HS_END — exclusive end test index. Optional.
step_limit: HS_STEP_LIMIT (default 200000).
timeout_secs: wall-clock cap (default 300, max 1800).
summary_only: if true (default), strip per-test output and keep
only Results / By category / Failure types / All failures.
verbose: if true, set HS_VERBOSE=1.
"""
if not os.path.isfile(RUNNER_PATH):
return error_result(f"Runner not found at {RUNNER_PATH}")
env = os.environ.copy()
if args.get("suite"):
env["HS_SUITE"] = str(args["suite"])
if args.get("start") is not None:
env["HS_START"] = str(int(args["start"]))
if args.get("end") is not None:
env["HS_END"] = str(int(args["end"]))
if args.get("step_limit") is not None:
env["HS_STEP_LIMIT"] = str(int(args["step_limit"]))
if args.get("verbose"):
env["HS_VERBOSE"] = "1"
timeout = max(10, min(int(args.get("timeout_secs", 300)), 1800))
summary_only = args.get("summary_only", True)
t0 = time.time()
proc = subprocess.Popen(
["node", RUNNER_PATH],
cwd=PROJECT_DIR,
env=env,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True,
# Detach into its own process group so we can kill the whole tree.
preexec_fn=os.setsid,
)
timed_out = False
try:
stdout, _ = proc.communicate(timeout=timeout)
except subprocess.TimeoutExpired:
timed_out = True
# Kill the whole process group (SIGTERM, then SIGKILL after 2s).
try:
os.killpg(proc.pid, signal.SIGTERM)
time.sleep(2)
if proc.poll() is None:
os.killpg(proc.pid, signal.SIGKILL)
except ProcessLookupError:
pass
try:
stdout, _ = proc.communicate(timeout=5)
except subprocess.TimeoutExpired:
stdout = ""
elapsed = time.time() - t0
# Find the last "Tn " (verbose progress marker) or last failure line so
# we can tell the agent where things were when a timeout fired.
last_progress = ""
if timed_out:
for line in (stdout or "").splitlines()[::-1]:
if line.startswith(" SLOW:") or line.startswith(" TIMEOUT:") or "/1496 " in line:
last_progress = line.strip()
break
if summary_only:
# Keep only the post-summary sections.
lines = (stdout or "").splitlines()
kept = []
in_summary = False
for line in lines:
if line.startswith("Results:"):
in_summary = True
if in_summary:
kept.append(line)
elif line.startswith(" TIMEOUT:") or line.startswith(" SLOW:"):
# Useful for debugging hangs even when summary-only.
kept.append(line)
stdout = "\n".join(kept) or stdout
header = [f"## hs_test_run ({elapsed:.1f}s)"]
if timed_out:
header.append(f"⚠️ TIMED OUT after {timeout}s")
if last_progress:
header.append(f"Last activity: {last_progress}")
if env.get("HS_SUITE"):
header.append(f"suite: {env['HS_SUITE']}")
if env.get("HS_START") or env.get("HS_END"):
header.append(f"range: {env.get('HS_START', '0')}-{env.get('HS_END', 'end')}")
header.append(f"step_limit: {env.get('HS_STEP_LIMIT', '200000')}")
return text_result("\n".join(header) + "\n\n" + (stdout or "(no output)"))
# ---------------------------------------------------------------------------
# Tool: hs_test_kill
# ---------------------------------------------------------------------------
def hs_test_kill(args):
"""Kill any running `node tests/hs-run-filtered.js` processes.
Args:
signal_name: "TERM" (default) or "KILL".
"""
sig = signal.SIGTERM if args.get("signal_name", "TERM") == "TERM" else signal.SIGKILL
pids = find_runner_pids()
if not pids:
return text_result("No runner processes found.")
killed, failed = [], []
for p in pids:
try:
os.kill(p, sig)
killed.append(p)
except ProcessLookupError:
pass
except Exception as e:
failed.append((p, str(e)))
msg = f"Sent SIG{args.get('signal_name', 'TERM')} to {len(killed)} runner(s): {killed}"
if failed:
msg += f"\nFailed: {failed}"
return text_result(msg)
# ---------------------------------------------------------------------------
# Tool: hs_test_regen
# ---------------------------------------------------------------------------
def hs_test_regen(args):
"""Regenerate spec/tests/test-hyperscript-behavioral.sx from the upstream JSON."""
if not os.path.isfile(GEN_PATH):
return error_result(f"Generator not found at {GEN_PATH}")
try:
r = subprocess.run(
["python3", GEN_PATH],
cwd=PROJECT_DIR, capture_output=True, text=True, timeout=120,
)
except subprocess.TimeoutExpired:
return error_result("Generator timed out (>120s)")
out = (r.stdout or "") + (r.stderr or "")
# Trim — generator prints a long category list; keep just the tail.
lines = out.splitlines()
if len(lines) > 25:
out = "...\n" + "\n".join(lines[-25:])
status = "ok" if r.returncode == 0 else f"exit {r.returncode}"
return text_result(f"## hs_test_regen ({status})\n\n{out}")
# ---------------------------------------------------------------------------
# Tool: hs_test_status
# ---------------------------------------------------------------------------
def hs_test_status(args):
"""Show whether any runner is in flight, plus runner/generator paths."""
pids = find_runner_pids()
info = [f"runner: {RUNNER_PATH}", f"generator: {GEN_PATH}"]
if pids:
info.append(f"running pids: {pids}")
else:
info.append("running pids: (none)")
return text_result("\n".join(info))
# ---------------------------------------------------------------------------
# JSON-RPC dispatch
# ---------------------------------------------------------------------------
def tool(name, description, properties, required):
return {
"name": name,
"description": description,
"inputSchema": {"type": "object", "properties": properties, "required": required},
}
TOOLS = [
tool(
"hs_test_run",
"Run the hyperscript conformance suite (node tests/hs-run-filtered.js) with a "
"wall-clock timeout. Optionally filter by suite or test index range. Always "
"kills the child process group on timeout.",
{
"suite": {"type": "string", "description": "HS_SUITE filter (e.g. 'hs-upstream-put')"},
"start": {"type": "integer", "description": "First test index (HS_START)"},
"end": {"type": "integer", "description": "Exclusive end test index (HS_END)"},
"step_limit": {"type": "integer", "description": "HS_STEP_LIMIT (default 200000)"},
"timeout_secs": {"type": "integer", "description": "Wall-clock cap (default 300, max 1800)"},
"summary_only": {"type": "boolean", "description": "Trim per-test output (default true)"},
"verbose": {"type": "boolean", "description": "Set HS_VERBOSE=1"},
},
[],
),
tool(
"hs_test_kill",
"Kill any background hs-run-filtered.js processes.",
{"signal_name": {"type": "string", "description": "TERM (default) or KILL"}},
[],
),
tool(
"hs_test_regen",
"Regenerate spec/tests/test-hyperscript-behavioral.sx from the upstream JSON.",
{},
[],
),
tool(
"hs_test_status",
"Show whether any runner is currently in flight.",
{},
[],
),
]
def handle_tool(name, args):
match name:
case "hs_test_run":
return hs_test_run(args)
case "hs_test_kill":
return hs_test_kill(args)
case "hs_test_regen":
return hs_test_regen(args)
case "hs_test_status":
return hs_test_status(args)
case _:
return error_result(f"Unknown tool: {name}")
def dispatch(method, params):
match method:
case "initialize":
return {
"protocolVersion": "2024-11-05",
"capabilities": {"tools": {}},
"serverInfo": {"name": "hs-test", "version": "0.1.0"},
}
case "notifications/initialized":
return None
case "tools/list":
return {"tools": TOOLS}
case "tools/call":
name = params["name"]
args = params.get("arguments", {}) or {}
try:
return handle_tool(name, args)
except Exception as e:
return error_result(f"Error: {e}")
case _:
return None
def main():
for line in sys.stdin:
line = line.strip()
if not line:
continue
try:
msg = json.loads(line)
except json.JSONDecodeError:
continue
method = msg.get("method", "")
params = msg.get("params", {})
msg_id = msg.get("id")
result = dispatch(method, params)
if msg_id is not None and result is not None:
print(json.dumps({"jsonrpc": "2.0", "id": msg_id, "result": result}), flush=True)
if __name__ == "__main__":
main()