Files
rose-ash/test/runner.py
giles e8bc228c7f
All checks were successful
Build and Deploy / build-and-deploy (push) Successful in 11m37s
Rebrand sexp → sx across web platform (173 files)
Rename all sexp directories, files, identifiers, and references to sx.
artdag/ excluded (separate media processing DSL).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-01 11:06:57 +00:00

223 lines
7.0 KiB
Python

"""Pytest subprocess runner + in-memory result storage."""
from __future__ import annotations
import asyncio
import json
import logging
import os
import time
from collections import OrderedDict
from pathlib import Path
log = logging.getLogger(__name__)

# In-memory state: _last_result caches the dict produced by the most recent
# run_tests() call; _running is the guard that keeps runs from overlapping.
_last_result: dict | None = None
_running: bool = False

# Each service group runs in its own pytest subprocess with its own PYTHONPATH
# ("dirs" entries are relative to /app; pythonpath=None leaves the inherited
# environment untouched for that group).
_SERVICE_GROUPS: list[dict] = [
    {"name": "shared", "dirs": ["shared/tests/", "shared/sx/tests/"],
     "pythonpath": None},
    {"name": "blog", "dirs": ["blog/tests/"], "pythonpath": "/app/blog"},
    {"name": "market", "dirs": ["market/tests/"], "pythonpath": "/app/market"},
    {"name": "cart", "dirs": ["cart/tests/"], "pythonpath": "/app/cart"},
    {"name": "events", "dirs": ["events/tests/"], "pythonpath": "/app/events"},
    {"name": "account", "dirs": ["account/tests/"], "pythonpath": "/app/account"},
    {"name": "orders", "dirs": ["orders/tests/"], "pythonpath": "/app/orders"},
    {"name": "federation", "dirs": ["federation/tests/"],
     "pythonpath": "/app/federation"},
    {"name": "relations", "dirs": ["relations/tests/"],
     "pythonpath": "/app/relations"},
    {"name": "likes", "dirs": ["likes/tests/"], "pythonpath": "/app/likes"},
]

# Service ordering used when grouping results, derived from the list above.
_SERVICE_ORDER = [g["name"] for g in _SERVICE_GROUPS]

# Per-group pytest-json-report output path; "{}" is filled with the group name.
_REPORT_PATH = "/tmp/test-report-{}.json"
def _parse_report(path: str) -> tuple[list[dict], dict]:
    """Parse a pytest-json-report file.

    Args:
        path: Filesystem path of the JSON report written by the
            ``pytest-json-report`` plugin.

    Returns:
        ``(tests, summary)``: a list of per-test dicts with
        ``nodeid``/``outcome``/``duration``/``longrepr`` keys, and the
        report's ``summary`` mapping.  Both are empty when the report is
        missing or unreadable — a broken report must not crash the runner.
    """
    rp = Path(path)
    if not rp.exists():
        return [], {}
    try:
        report = json.loads(rp.read_text())
    except (json.JSONDecodeError, OSError):
        return [], {}
    summary = report.get("summary", {})
    tests = []
    for t in report.get("tests", []):
        # Tests that error during setup (or fail in teardown) carry their
        # traceback under the "setup"/"teardown" phase, not "call" -- check
        # all phases so "error" outcomes still surface a longrepr.
        longrepr = ""
        for phase in ("call", "setup", "teardown"):
            longrepr = (t.get(phase) or {}).get("longrepr", "")
            if longrepr:
                break
        tests.append({
            "nodeid": t.get("nodeid", ""),
            "outcome": t.get("outcome", "unknown"),
            "duration": round(t.get("duration", 0), 4),
            "longrepr": longrepr,
        })
    return tests, summary
async def _run_group(group: dict) -> tuple[list[dict], dict, str]:
    """Run pytest for a single service group in its own subprocess.

    Args:
        group: One ``_SERVICE_GROUPS`` entry (``name``/``dirs``/``pythonpath``).

    Returns:
        ``(tests, summary, stdout)`` -- parsed report rows, the pytest
        summary dict, and the subprocess's combined stdout+stderr text.
        All empty when none of the group's test dirs exist on disk.
    """
    # Skip test dirs that are not present in this deployment.
    existing = [d for d in group["dirs"] if Path(f"/app/{d}").is_dir()]
    if not existing:
        return [], {}, ""
    report_file = _REPORT_PATH.format(group["name"])
    cmd = [
        "python3", "-m", "pytest",
        *existing,
        "--json-report",
        f"--json-report-file={report_file}",
        "-q",
        "--tb=short",
    ]
    env = {**os.environ}
    if group["pythonpath"]:
        # Prepend the service's package root.  Only append the inherited
        # PYTHONPATH when it is non-empty: "x:" ends in an empty entry, which
        # Python interprets as the current directory -- we do not want the
        # cwd (/app) silently injected onto sys.path.
        inherited = env.get("PYTHONPATH", "")
        env["PYTHONPATH"] = (
            f"{group['pythonpath']}:{inherited}" if inherited
            else group["pythonpath"]
        )
    proc = await asyncio.create_subprocess_exec(
        *cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.STDOUT,  # interleave stderr into stdout
        cwd="/app",
        env=env,
    )
    stdout, _ = await proc.communicate()
    stdout_str = (stdout or b"").decode("utf-8", errors="replace")
    tests, summary = _parse_report(report_file)
    return tests, summary, stdout_str
async def run_tests() -> dict:
    """Run every service group's pytest subprocess and store the results.

    At most one run is in flight per process: a concurrent call returns
    ``{"status": "already_running"}`` immediately.  The aggregated result
    dict is stored in ``_last_result`` and also returned; an unexpected
    exception yields a synthetic ``"error"`` result instead of raising.
    """
    global _last_result, _running
    if _running:
        return {"status": "already_running"}
    _running = True
    t0 = time.time()
    try:
        # All groups run concurrently; exceptions are captured per group.
        outcomes = await asyncio.gather(
            *(_run_group(g) for g in _SERVICE_GROUPS), return_exceptions=True
        )
        collected: list[dict] = []
        chunks: list[str] = []
        counts = {"passed": 0, "failed": 0, "error": 0, "skipped": 0, "total": 0}
        for group, outcome in zip(_SERVICE_GROUPS, outcomes):
            if isinstance(outcome, Exception):
                # One broken group must not sink the whole run.
                log.error("Group %s failed: %s", group["name"], outcome)
                continue
            tests, summary, text = outcome
            collected.extend(tests)
            for key in ("passed", "failed", "error", "skipped"):
                counts[key] += summary.get(key, 0)
            counts["total"] += summary.get("total", len(tests))
            if text.strip():
                chunks.append(text)
        t1 = time.time()
        status = (
            "failed" if counts["failed"] > 0 or counts["error"] > 0 else "passed"
        )
        _last_result = {
            "status": status,
            "started_at": t0,
            "finished_at": t1,
            "duration": round(t1 - t0, 2),
            "passed": counts["passed"],
            "failed": counts["failed"],
            "errors": counts["error"],
            "skipped": counts["skipped"],
            "total": counts["total"],
            "tests": collected,
            "stdout": "\n".join(chunks)[-5000:],  # keep only the tail
        }
        log.info(
            "Test run complete: %s (%d passed, %d failed, %d errors, %.1fs)",
            status, counts["passed"], counts["failed"], counts["error"],
            _last_result["duration"],
        )
        return _last_result
    except Exception:
        log.exception("Test run failed")
        t1 = time.time()
        _last_result = {
            "status": "error",
            "started_at": t0,
            "finished_at": t1,
            "duration": round(t1 - t0, 2),
            "passed": 0,
            "failed": 0,
            "errors": 1,
            "skipped": 0,
            "total": 0,
            "tests": [],
            "stdout": "",
        }
        return _last_result
    finally:
        _running = False
def get_results() -> dict | None:
    """Return last run results.

    None until run_tests() has completed at least once in this process.
    """
    return _last_result
def get_test(nodeid: str) -> dict | None:
    """Look up a single test by nodeid.

    Returns None when no run has completed yet or the nodeid is unknown.
    """
    if not _last_result:
        return None
    matches = (
        entry for entry in _last_result["tests"] if entry["nodeid"] == nodeid
    )
    return next(matches, None)
def is_running() -> bool:
    """Check if tests are currently running.

    True only while a run_tests() call is in flight in this process.
    """
    return _running
def _service_from_nodeid(nodeid: str) -> str:
    """Extract the service name (first path segment) from a test nodeid.

    Nodeids with no "/" separator fall into the catch-all "other" bucket.
    """
    head, slash, _rest = nodeid.partition("/")
    return head if slash else "other"
def group_tests_by_service(tests: list[dict]) -> list[dict]:
    """Group tests into ordered sections by service.

    Known services appear first in ``_SERVICE_ORDER`` order; services seen
    only in nodeids are appended after them in first-seen order.  Services
    with no tests are omitted from the output.
    """
    # Seed buckets so known services keep their canonical ordering.
    buckets: dict[str, list[dict]] = {svc: [] for svc in _SERVICE_ORDER}
    for entry in tests:
        buckets.setdefault(_service_from_nodeid(entry["nodeid"]), []).append(entry)
    sections: list[dict] = []
    for svc, members in buckets.items():
        if not members:
            continue
        # Single pass over the bucket instead of one scan per outcome.
        tally = {"passed": 0, "failed": 0, "error": 0, "skipped": 0}
        for entry in members:
            outcome = entry["outcome"]
            if outcome in tally:
                tally[outcome] += 1
        sections.append({
            "service": svc,
            "tests": members,
            "total": len(members),
            "passed": tally["passed"],
            "failed": tally["failed"],
            "errors": tally["error"],
            "skipped": tally["skipped"],
        })
    return sections