Wraps `node tests/hs-run-filtered.js` so the agent can run/filter/kill test runs without per-call Bash permission prompts. Tools: - hs_test_run: run the suite (optional suite filter, start/end range, step_limit, verbose); enforces a wall-clock timeout via SIGTERM/SIGKILL on the child process group, so a hung CEK loop can't strand the agent. - hs_test_kill: SIGTERM/SIGKILL any background runner. - hs_test_regen: regenerate spec/tests/test-hyperscript-behavioral.sx. - hs_test_status: list any in-flight runners. Stdio JSON-RPC, same protocol as tools/mcp_services.py. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
327 lines
11 KiB
Python
Executable File
327 lines
11 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""MCP server for hyperscript conformance test runs.
|
|
|
|
Wraps `node tests/hs-run-filtered.js` so the agent can run the suite (full
|
|
or filtered), regenerate the SX test file, and kill stale background runs
|
|
without going through Bash for each step.
|
|
|
|
Stdio JSON-RPC transport, same protocol as tools/mcp_services.py.
|
|
"""
|
|
|
|
import json
|
|
import os
|
|
import re
|
|
import signal
|
|
import subprocess
|
|
import sys
|
|
import time
|
|
|
|
# Directory that contains this script; the project root is its parent.
_TOOLS_DIR = os.path.dirname(os.path.abspath(__file__))
PROJECT_DIR = os.path.dirname(_TOOLS_DIR)

# Scripts this server shells out to, resolved against the project root.
RUNNER_PATH = os.path.join(PROJECT_DIR, "tests/hs-run-filtered.js")
GEN_PATH = os.path.join(PROJECT_DIR, "tests/playwright/generate-sx-tests.py")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def text_result(s):
    """Wrap *s* as a tool result holding a single text content item."""
    item = {"type": "text", "text": s}
    return {"content": [item]}
|
|
|
|
|
|
def error_result(s):
    """Wrap *s* as a single-text tool result with ``isError`` set to True."""
    payload = {"content": [{"type": "text", "text": s}]}
    payload["isError"] = True
    return payload
|
|
|
|
|
|
def find_runner_pids():
    """Return the PIDs of running `node … tests/hs-run-filtered.js` processes.

    Uses `pgrep -f`, which matches an extended regular expression against
    each process's full command line.

    Returns:
        A list of integer PIDs. The list is empty when nothing matches or
        when pgrep cannot be run (missing binary, timeout, undecodable
        output) — discovery is deliberately best-effort.
    """
    # The runner may have been started with a relative path
    # (`node tests/hs-run-filtered.js`) or, as hs_test_run does, with an
    # absolute one (`node /abs/project/tests/hs-run-filtered.js`). Allow an
    # optional directory prefix so both forms are found, and escape the dot
    # so it only matches a literal ".".
    pattern = r"node +([^ ]*/)?tests/hs-run-filtered[.]js"
    try:
        completed = subprocess.run(
            ["pgrep", "-f", pattern],
            # Never let a child read from the server's JSON-RPC stdin.
            stdin=subprocess.DEVNULL,
            capture_output=True,
            text=True,
            timeout=5,
        )
    except (OSError, subprocess.SubprocessError, ValueError):
        return []
    return [int(tok) for tok in completed.stdout.split() if tok.isdigit()]
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Tool: hs_test_run
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# "<done>/<total> " progress marker; the total is not hard-coded so the
# detection keeps working when the number of tests in the suite changes.
_HS_PROGRESS_RE = re.compile(r"\d+/\d+ ")


def _hs_is_slow_or_timeout(line):
    """Return True for an indented `SLOW:` / `TIMEOUT:` diagnostic line."""
    return line[:1].isspace() and line.lstrip().startswith(("SLOW:", "TIMEOUT:"))


def _hs_kill_process_group(proc, grace_secs=2):
    """SIGTERM the child's process group; SIGKILL it if the child outlives *grace_secs*."""
    try:
        os.killpg(proc.pid, signal.SIGTERM)
        try:
            # Returns as soon as the child exits instead of always sleeping.
            proc.wait(timeout=grace_secs)
        except subprocess.TimeoutExpired:
            os.killpg(proc.pid, signal.SIGKILL)
    except ProcessLookupError:
        # The group is already gone; nothing left to kill.
        pass


def _hs_last_activity(output):
    """Return the last diagnostic or progress line of *output*, stripped, or ""."""
    for line in reversed(output.splitlines()):
        if _hs_is_slow_or_timeout(line) or _HS_PROGRESS_RE.search(line):
            return line.strip()
    return ""


def _hs_summary(output):
    """Keep everything from the first `Results:` line on, plus earlier SLOW/TIMEOUT lines."""
    kept = []
    in_summary = False
    for line in output.splitlines():
        if line.startswith("Results:"):
            in_summary = True
        if in_summary or _hs_is_slow_or_timeout(line):
            kept.append(line)
    return "\n".join(kept)


def hs_test_run(args):
    """Run the HS test suite (optionally filtered) with a wall-clock timeout.

    The runner is started as `node RUNNER_PATH` in its own session/process
    group with stdin detached, so it can never consume the server's JSON-RPC
    input and the whole tree can be signalled on timeout.

    Args:
        args: dict of tool arguments, all optional:
            suite: HS_SUITE filter (e.g. "hs-upstream-put").
            start: HS_START — first test index.
            end: HS_END — exclusive end test index.
            step_limit: HS_STEP_LIMIT (reported as 200000 when unset).
            timeout_secs: wall-clock cap, clamped to 10..1800 (default 300;
                an explicit null is treated as the default).
            summary_only: if true (default), strip per-test output and keep
                only the section starting at "Results:" plus any
                SLOW/TIMEOUT lines.
            verbose: if true, set HS_VERBOSE=1.

    Returns:
        A text result whose header reports elapsed time, timeout status,
        and the effective filters, followed by the (possibly trimmed)
        runner output. An error result if the runner script is missing.

    Raises:
        ValueError / TypeError if start, end, step_limit or timeout_secs
        cannot be converted to int; OSError if `node` cannot be started.
    """
    if not os.path.isfile(RUNNER_PATH):
        return error_result(f"Runner not found at {RUNNER_PATH}")

    env = os.environ.copy()
    if args.get("suite"):
        env["HS_SUITE"] = str(args["suite"])
    for key, var in (
        ("start", "HS_START"),
        ("end", "HS_END"),
        ("step_limit", "HS_STEP_LIMIT"),
    ):
        if args.get(key) is not None:
            env[var] = str(int(args[key]))
    if args.get("verbose"):
        env["HS_VERBOSE"] = "1"

    requested_timeout = args.get("timeout_secs")
    if requested_timeout is None:
        requested_timeout = 300
    timeout = max(10, min(int(requested_timeout), 1800))
    summary_only = args.get("summary_only", True)

    started = time.monotonic()
    proc = subprocess.Popen(
        ["node", RUNNER_PATH],
        cwd=PROJECT_DIR,
        env=env,
        # The server speaks JSON-RPC on its own stdin; keep the child off it.
        stdin=subprocess.DEVNULL,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        text=True,
        # New session => new process group (pgid == pid), so the whole tree
        # can be killed with os.killpg. Preferred over preexec_fn=os.setsid.
        start_new_session=True,
    )

    timed_out = False
    try:
        stdout, _ = proc.communicate(timeout=timeout)
    except subprocess.TimeoutExpired:
        timed_out = True
        _hs_kill_process_group(proc)
        try:
            # Collect whatever the runner printed before it was killed.
            stdout, _ = proc.communicate(timeout=5)
        except subprocess.TimeoutExpired:
            stdout = ""
    stdout = stdout or ""

    elapsed = time.monotonic() - started

    # On timeout, surface the last SLOW/TIMEOUT/progress line so the caller
    # can tell where the run was when the cap fired. Computed on the full
    # output, before any trimming.
    last_progress = _hs_last_activity(stdout) if timed_out else ""

    if summary_only:
        # Fall back to the full output when there is nothing to summarise.
        stdout = _hs_summary(stdout) or stdout

    header = [f"## hs_test_run ({elapsed:.1f}s)"]
    if timed_out:
        header.append(f"⚠️ TIMED OUT after {timeout}s")
        if last_progress:
            header.append(f"Last activity: {last_progress}")
    if env.get("HS_SUITE"):
        header.append(f"suite: {env['HS_SUITE']}")
    if env.get("HS_START") or env.get("HS_END"):
        header.append(f"range: {env.get('HS_START', '0')}-{env.get('HS_END', 'end')}")
    header.append(f"step_limit: {env.get('HS_STEP_LIMIT', '200000')}")

    return text_result("\n".join(header) + "\n\n" + (stdout or "(no output)"))
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Tool: hs_test_kill
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def hs_test_kill(args):
    """Signal every running `node tests/hs-run-filtered.js` process.

    Args:
        args: dict of tool arguments:
            signal_name: "TERM" (default) or "KILL". Matching ignores case
                and an optional "SIG" prefix; a missing, null or empty value
                means "TERM".

    Returns:
        A text result listing the PIDs that were signalled (and any that
        could not be), or an error result when signal_name is not
        recognised. Unrecognised names are rejected rather than silently
        escalated to SIGKILL.
    """
    requested = args.get("signal_name")
    sig_name = str(requested).strip().upper() if requested else "TERM"
    if sig_name.startswith("SIG"):
        sig_name = sig_name[3:]

    by_name = {"TERM": signal.SIGTERM, "KILL": signal.SIGKILL}
    if sig_name not in by_name:
        return error_result(
            f"Unknown signal_name {requested!r}; expected 'TERM' or 'KILL'"
        )
    sig = by_name[sig_name]

    pids = find_runner_pids()
    if not pids:
        return text_result("No runner processes found.")

    killed, failed = [], []
    for pid in pids:
        try:
            os.kill(pid, sig)
        except ProcessLookupError:
            # Exited between discovery and signalling; nothing to report.
            continue
        except OSError as e:
            failed.append((pid, str(e)))
        else:
            killed.append(pid)

    # Report the signal that was actually sent, not the raw argument.
    msg = f"Sent SIG{sig_name} to {len(killed)} runner(s): {killed}"
    if failed:
        msg += f"\nFailed: {failed}"
    return text_result(msg)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Tool: hs_test_regen
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def hs_test_regen(args):
    """Regenerate spec/tests/test-hyperscript-behavioral.sx from the upstream JSON.

    Runs `python3 GEN_PATH` from the project root with a 120 s limit.

    Args:
        args: unused; accepted so every tool has the same call signature.

    Returns:
        A text result with the exit status and the last 25 lines of the
        generator's combined stdout+stderr. The result is flagged as an
        error when the generator is missing, cannot be started, times out,
        or exits non-zero.
    """
    if not os.path.isfile(GEN_PATH):
        return error_result(f"Generator not found at {GEN_PATH}")
    try:
        r = subprocess.run(
            ["python3", GEN_PATH],
            cwd=PROJECT_DIR,
            # The server speaks JSON-RPC on its own stdin; keep the child off it.
            stdin=subprocess.DEVNULL,
            capture_output=True,
            text=True,
            timeout=120,
        )
    except subprocess.TimeoutExpired:
        return error_result("Generator timed out (>120s)")
    except OSError as e:
        return error_result(f"Could not start generator: {e}")

    out = (r.stdout or "") + (r.stderr or "")
    # Trim — generator prints a long category list; keep just the tail
    # (stderr is appended last, so a traceback survives the trim).
    lines = out.splitlines()
    if len(lines) > 25:
        out = "...\n" + "\n".join(lines[-25:])

    status = "ok" if r.returncode == 0 else f"exit {r.returncode}"
    # A failed generator must not look like a success to the client.
    make_result = text_result if r.returncode == 0 else error_result
    return make_result(f"## hs_test_regen ({status})\n\n{out}")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Tool: hs_test_status
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def hs_test_status(args):
    """Report the runner/generator paths and the PIDs of any in-flight runners.

    *args* is not read; it is accepted so all tools share one signature.
    """
    pids = find_runner_pids()
    running = f"running pids: {pids}" if pids else "running pids: (none)"
    report = "\n".join(
        (f"runner: {RUNNER_PATH}", f"generator: {GEN_PATH}", running)
    )
    return text_result(report)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# JSON-RPC dispatch
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def tool(name, description, properties, required):
    """Build one tool descriptor: name, description and an object input schema.

    *properties* and *required* are stored as given (not copied) under the
    schema's "properties" and "required" keys.
    """
    schema = dict(type="object", properties=properties, required=required)
    return dict(name=name, description=description, inputSchema=schema)
|
|
|
|
|
|
# Descriptors returned for "tools/list"; one entry per tool, none with
# required arguments.
TOOLS = [
    tool(
        name="hs_test_run",
        description=(
            "Run the hyperscript conformance suite (node tests/hs-run-filtered.js) with a "
            "wall-clock timeout. Optionally filter by suite or test index range. Always "
            "kills the child process group on timeout."
        ),
        properties={
            "suite": {
                "type": "string",
                "description": "HS_SUITE filter (e.g. 'hs-upstream-put')",
            },
            "start": {
                "type": "integer",
                "description": "First test index (HS_START)",
            },
            "end": {
                "type": "integer",
                "description": "Exclusive end test index (HS_END)",
            },
            "step_limit": {
                "type": "integer",
                "description": "HS_STEP_LIMIT (default 200000)",
            },
            "timeout_secs": {
                "type": "integer",
                "description": "Wall-clock cap (default 300, max 1800)",
            },
            "summary_only": {
                "type": "boolean",
                "description": "Trim per-test output (default true)",
            },
            "verbose": {
                "type": "boolean",
                "description": "Set HS_VERBOSE=1",
            },
        },
        required=[],
    ),
    tool(
        name="hs_test_kill",
        description="Kill any background hs-run-filtered.js processes.",
        properties={
            "signal_name": {
                "type": "string",
                "description": "TERM (default) or KILL",
            },
        },
        required=[],
    ),
    tool(
        name="hs_test_regen",
        description="Regenerate spec/tests/test-hyperscript-behavioral.sx from the upstream JSON.",
        properties={},
        required=[],
    ),
    tool(
        name="hs_test_status",
        description="Show whether any runner is currently in flight.",
        properties={},
        required=[],
    ),
]
|
|
|
|
|
|
def handle_tool(name, args):
    """Call the tool function named *name* with *args* and return its result.

    An unrecognised name yields an error result rather than an exception.
    """
    if name == "hs_test_run":
        return hs_test_run(args)
    if name == "hs_test_kill":
        return hs_test_kill(args)
    if name == "hs_test_regen":
        return hs_test_regen(args)
    if name == "hs_test_status":
        return hs_test_status(args)
    return error_result(f"Unknown tool: {name}")
|
|
|
|
|
|
def dispatch(method, params):
|
|
match method:
|
|
case "initialize":
|
|
return {
|
|
"protocolVersion": "2024-11-05",
|
|
"capabilities": {"tools": {}},
|
|
"serverInfo": {"name": "hs-test", "version": "0.1.0"},
|
|
}
|
|
case "notifications/initialized":
|
|
return None
|
|
case "tools/list":
|
|
return {"tools": TOOLS}
|
|
case "tools/call":
|
|
name = params["name"]
|
|
args = params.get("arguments", {}) or {}
|
|
try:
|
|
return handle_tool(name, args)
|
|
except Exception as e:
|
|
return error_result(f"Error: {e}")
|
|
case _:
|
|
return None
|
|
|
|
|
|
def main():
    """Serve newline-delimited JSON-RPC 2.0 on stdin/stdout until stdin closes.

    Each non-blank input line is parsed as one JSON message. Lines that are
    not valid JSON, or whose JSON value is not an object, are ignored.
    Messages without an "id" (notifications) are dispatched but never
    answered. Every request that carries an "id" and a "method" gets exactly
    one reply: the dispatch result, a -32601 error when the method is not
    handled, or a -32603 error when dispatch itself raises.
    """

    def send(payload):
        # One JSON document per line, flushed so the client sees it at once.
        print(json.dumps(payload), flush=True)

    for raw in sys.stdin:
        raw = raw.strip()
        if not raw:
            continue
        try:
            msg = json.loads(raw)
        except json.JSONDecodeError:
            continue
        if not isinstance(msg, dict):
            # Batches and bare scalars are not supported; skip, don't crash.
            continue

        method = msg.get("method", "")
        params = msg.get("params") or {}
        msg_id = msg.get("id")

        try:
            result = dispatch(method, params)
        except Exception as e:
            # Keep serving; tell the caller (if it expects a reply) what broke.
            if msg_id is not None:
                send({
                    "jsonrpc": "2.0",
                    "id": msg_id,
                    "error": {"code": -32603, "message": f"Internal error: {e}"},
                })
            continue

        if msg_id is None:
            continue
        if result is not None:
            send({"jsonrpc": "2.0", "id": msg_id, "result": result})
        elif "method" in msg:
            # A request must always be answered, or the client waits forever.
            send({
                "jsonrpc": "2.0",
                "id": msg_id,
                "error": {"code": -32601, "message": f"Method not found: {method}"},
            })


if __name__ == "__main__":
    main()
|