Public Quart microservice that runs pytest against shared/tests/ and shared/sexp/tests/, serving an HTMX-powered sexp-rendered dashboard with pass/fail/running status, auto-refresh polling, and re-run button. No database — results stored in memory. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
136 lines
3.6 KiB
Python
136 lines
3.6 KiB
Python
"""Pytest subprocess runner + in-memory result storage."""
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import json
|
|
import logging
|
|
import time
|
|
from pathlib import Path
|
|
|
|
log = logging.getLogger(__name__)

# In-memory state (no database).
# Result dict from the most recent completed run, or None before the
# first run finishes; shape is defined by run_tests().
_last_result: dict | None = None

# True while a pytest subprocess launched by run_tests() is in flight;
# used to reject overlapping runs.
_running: bool = False

# Paths to test directories (relative to /app in Docker)
_TEST_DIRS = [
    "shared/tests/",
    "shared/sexp/tests/",
]

# Where the pytest --json-report plugin writes its JSON output.
_REPORT_PATH = "/tmp/test-report.json"
|
|
|
|
|
|
async def run_tests() -> dict:
    """Run pytest in a subprocess, parse its JSON report, and cache the result.

    Invokes pytest over ``_TEST_DIRS`` with the ``--json-report`` plugin,
    parses the report written to ``_REPORT_PATH``, and stores a summary
    dict in the module-level ``_last_result``.

    Returns:
        The result dict (status, counts, per-test entries, truncated
        stdout).  If a run is already in progress, returns
        ``{"status": "already_running"}`` without starting another.
        On an unexpected internal failure, returns a result dict with
        ``status == "error"``.
    """
    global _last_result, _running

    # Single-flight guard: asyncio is single-threaded, and there is no
    # await between this check and the assignment below, so this is not
    # racy.
    if _running:
        return {"status": "already_running"}

    _running = True
    started_at = time.time()

    try:
        report_path = Path(_REPORT_PATH)
        # Remove any stale report from a previous run.  Without this, a
        # pytest invocation that fails to produce a report would silently
        # re-parse the old file and present stale results as fresh.
        report_path.unlink(missing_ok=True)

        cmd = [
            "python3", "-m", "pytest",
            *_TEST_DIRS,
            "--json-report",
            f"--json-report-file={_REPORT_PATH}",
            "-q",
            "--tb=short",
        ]

        proc = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.STDOUT,  # interleave stderr into stdout
            cwd="/app",
        )
        stdout, _ = await proc.communicate()
        finished_at = time.time()

        # Parse the JSON report; treat a missing or corrupt file as an
        # empty report rather than crashing.
        report: dict = {}
        if report_path.exists():
            try:
                report = json.loads(report_path.read_text())
            except (json.JSONDecodeError, OSError):
                report = {}

        summary = report.get("summary", {})

        # Flatten per-test entries to the fields the dashboard renders.
        tests = [
            {
                "nodeid": t.get("nodeid", ""),
                "outcome": t.get("outcome", "unknown"),
                "duration": round(t.get("duration", 0), 4),
                # "call" may be present but null, hence the `or {}`.
                "longrepr": (t.get("call", {}) or {}).get("longrepr", ""),
            }
            for t in report.get("tests", [])
        ]

        passed = summary.get("passed", 0)
        failed = summary.get("failed", 0)
        errors = summary.get("error", 0)
        skipped = summary.get("skipped", 0)
        total = summary.get("total", len(tests))

        if failed > 0 or errors > 0:
            status = "failed"
        elif not report and proc.returncode != 0:
            # pytest exited non-zero without producing a report (e.g. the
            # json-report plugin is missing or collection crashed on
            # startup).  Previously this path reported "passed" with zero
            # tests; surface it as an error instead.
            status = "error"
        else:
            status = "passed"

        _last_result = {
            "status": status,
            "started_at": started_at,
            "finished_at": finished_at,
            "duration": round(finished_at - started_at, 2),
            "passed": passed,
            "failed": failed,
            "errors": errors,
            "skipped": skipped,
            "total": total,
            "tests": tests,
            # Keep only the tail so the in-memory result stays bounded.
            "stdout": (stdout or b"").decode("utf-8", errors="replace")[-5000:],
        }

        log.info(
            "Test run complete: %s (%d passed, %d failed, %d errors, %.1fs)",
            status, passed, failed, errors, _last_result["duration"],
        )
        return _last_result

    except Exception:
        # Top-level boundary: record the failure as a result rather than
        # propagating, so the dashboard always has something to show.
        log.exception("Test run failed")
        finished_at = time.time()
        _last_result = {
            "status": "error",
            "started_at": started_at,
            "finished_at": finished_at,
            "duration": round(finished_at - started_at, 2),
            "passed": 0,
            "failed": 0,
            "errors": 1,
            "skipped": 0,
            "total": 0,
            "tests": [],
            "stdout": "",
        }
        return _last_result
    finally:
        # Always release the single-flight guard, even on error.
        _running = False
|
|
|
|
|
|
def get_results() -> dict | None:
    """Return the result dict from the most recent test run.

    Returns None when no run has completed since the process started
    (results live only in memory).
    """
    return _last_result
|
|
|
|
|
|
def is_running() -> bool:
    """Report whether a pytest subprocess is currently in flight."""
    return _running
|