All checks were successful
Build and Deploy / build-and-deploy (push) Successful in 1m11s
- Run tests for all 10 services via per-service pytest subprocesses - Group results by service with section headers - Clickable summary cards filter by outcome (passed/failed/errors/skipped) - Service filter nav using ~nav-link buttons in menu bar - Full menu integration: ~header-row + ~header-child + ~menu-row - Show logo image via cart-mini rendering - Mount full service directories in docker-compose for test access - Add 24 unit test files across 9 services Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
223 lines
7.0 KiB
Python
223 lines
7.0 KiB
Python
"""Pytest subprocess runner + in-memory result storage."""
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import json
|
|
import logging
|
|
import os
|
|
import time
|
|
from collections import OrderedDict
|
|
from pathlib import Path
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
# In-memory state
|
|
_last_result: dict | None = None
|
|
_running: bool = False
|
|
|
|
# Each service group runs in its own pytest subprocess with its own PYTHONPATH
|
|
_SERVICE_GROUPS: list[dict] = [
|
|
{"name": "shared", "dirs": ["shared/tests/", "shared/sexp/tests/"],
|
|
"pythonpath": None},
|
|
{"name": "blog", "dirs": ["blog/tests/"], "pythonpath": "/app/blog"},
|
|
{"name": "market", "dirs": ["market/tests/"], "pythonpath": "/app/market"},
|
|
{"name": "cart", "dirs": ["cart/tests/"], "pythonpath": "/app/cart"},
|
|
{"name": "events", "dirs": ["events/tests/"], "pythonpath": "/app/events"},
|
|
{"name": "account", "dirs": ["account/tests/"], "pythonpath": "/app/account"},
|
|
{"name": "orders", "dirs": ["orders/tests/"], "pythonpath": "/app/orders"},
|
|
{"name": "federation", "dirs": ["federation/tests/"],
|
|
"pythonpath": "/app/federation"},
|
|
{"name": "relations", "dirs": ["relations/tests/"],
|
|
"pythonpath": "/app/relations"},
|
|
{"name": "likes", "dirs": ["likes/tests/"], "pythonpath": "/app/likes"},
|
|
]
|
|
|
|
_SERVICE_ORDER = [g["name"] for g in _SERVICE_GROUPS]
|
|
_REPORT_PATH = "/tmp/test-report-{}.json"
|
|
|
|
|
|
def _parse_report(path: str) -> tuple[list[dict], dict]:
|
|
"""Parse a pytest-json-report file."""
|
|
rp = Path(path)
|
|
if not rp.exists():
|
|
return [], {}
|
|
try:
|
|
report = json.loads(rp.read_text())
|
|
except (json.JSONDecodeError, OSError):
|
|
return [], {}
|
|
|
|
summary = report.get("summary", {})
|
|
tests_raw = report.get("tests", [])
|
|
|
|
tests = []
|
|
for t in tests_raw:
|
|
tests.append({
|
|
"nodeid": t.get("nodeid", ""),
|
|
"outcome": t.get("outcome", "unknown"),
|
|
"duration": round(t.get("duration", 0), 4),
|
|
"longrepr": (t.get("call", {}) or {}).get("longrepr", ""),
|
|
})
|
|
return tests, summary
|
|
|
|
|
|
async def _run_group(group: dict) -> tuple[list[dict], dict, str]:
|
|
"""Run pytest for a single service group."""
|
|
existing = [d for d in group["dirs"] if Path(f"/app/{d}").is_dir()]
|
|
if not existing:
|
|
return [], {}, ""
|
|
|
|
report_file = _REPORT_PATH.format(group["name"])
|
|
cmd = [
|
|
"python3", "-m", "pytest",
|
|
*existing,
|
|
"--json-report",
|
|
f"--json-report-file={report_file}",
|
|
"-q",
|
|
"--tb=short",
|
|
]
|
|
env = {**os.environ}
|
|
if group["pythonpath"]:
|
|
env["PYTHONPATH"] = group["pythonpath"] + ":" + env.get("PYTHONPATH", "")
|
|
|
|
proc = await asyncio.create_subprocess_exec(
|
|
*cmd,
|
|
stdout=asyncio.subprocess.PIPE,
|
|
stderr=asyncio.subprocess.STDOUT,
|
|
cwd="/app",
|
|
env=env,
|
|
)
|
|
stdout, _ = await proc.communicate()
|
|
stdout_str = (stdout or b"").decode("utf-8", errors="replace")
|
|
tests, summary = _parse_report(report_file)
|
|
return tests, summary, stdout_str
|
|
|
|
|
|
async def run_tests() -> dict:
|
|
"""Run pytest in subprocess, parse JSON report, store results."""
|
|
global _last_result, _running
|
|
|
|
if _running:
|
|
return {"status": "already_running"}
|
|
|
|
_running = True
|
|
started_at = time.time()
|
|
|
|
try:
|
|
tasks = [_run_group(g) for g in _SERVICE_GROUPS]
|
|
results = await asyncio.gather(*tasks, return_exceptions=True)
|
|
|
|
all_tests: list[dict] = []
|
|
total_passed = total_failed = total_errors = total_skipped = total_count = 0
|
|
all_stdout: list[str] = []
|
|
|
|
for i, res in enumerate(results):
|
|
if isinstance(res, Exception):
|
|
log.error("Group %s failed: %s", _SERVICE_GROUPS[i]["name"], res)
|
|
continue
|
|
tests, summary, stdout_str = res
|
|
all_tests.extend(tests)
|
|
total_passed += summary.get("passed", 0)
|
|
total_failed += summary.get("failed", 0)
|
|
total_errors += summary.get("error", 0)
|
|
total_skipped += summary.get("skipped", 0)
|
|
total_count += summary.get("total", len(tests))
|
|
if stdout_str.strip():
|
|
all_stdout.append(stdout_str)
|
|
|
|
finished_at = time.time()
|
|
status = "failed" if total_failed > 0 or total_errors > 0 else "passed"
|
|
|
|
_last_result = {
|
|
"status": status,
|
|
"started_at": started_at,
|
|
"finished_at": finished_at,
|
|
"duration": round(finished_at - started_at, 2),
|
|
"passed": total_passed,
|
|
"failed": total_failed,
|
|
"errors": total_errors,
|
|
"skipped": total_skipped,
|
|
"total": total_count,
|
|
"tests": all_tests,
|
|
"stdout": "\n".join(all_stdout)[-5000:],
|
|
}
|
|
|
|
log.info(
|
|
"Test run complete: %s (%d passed, %d failed, %d errors, %.1fs)",
|
|
status, total_passed, total_failed, total_errors,
|
|
_last_result["duration"],
|
|
)
|
|
return _last_result
|
|
|
|
except Exception:
|
|
log.exception("Test run failed")
|
|
finished_at = time.time()
|
|
_last_result = {
|
|
"status": "error",
|
|
"started_at": started_at,
|
|
"finished_at": finished_at,
|
|
"duration": round(finished_at - started_at, 2),
|
|
"passed": 0,
|
|
"failed": 0,
|
|
"errors": 1,
|
|
"skipped": 0,
|
|
"total": 0,
|
|
"tests": [],
|
|
"stdout": "",
|
|
}
|
|
return _last_result
|
|
finally:
|
|
_running = False
|
|
|
|
|
|
def get_results() -> dict | None:
|
|
"""Return last run results."""
|
|
return _last_result
|
|
|
|
|
|
def get_test(nodeid: str) -> dict | None:
|
|
"""Look up a single test by nodeid."""
|
|
if not _last_result:
|
|
return None
|
|
for t in _last_result["tests"]:
|
|
if t["nodeid"] == nodeid:
|
|
return t
|
|
return None
|
|
|
|
|
|
def is_running() -> bool:
|
|
"""Check if tests are currently running."""
|
|
return _running
|
|
|
|
|
|
def _service_from_nodeid(nodeid: str) -> str:
|
|
"""Extract service name from a test nodeid."""
|
|
parts = nodeid.split("/")
|
|
return parts[0] if len(parts) >= 2 else "other"
|
|
|
|
|
|
def group_tests_by_service(tests: list[dict]) -> list[dict]:
|
|
"""Group tests into ordered sections by service."""
|
|
buckets: dict[str, list[dict]] = OrderedDict()
|
|
for svc in _SERVICE_ORDER:
|
|
buckets[svc] = []
|
|
for t in tests:
|
|
svc = _service_from_nodeid(t["nodeid"])
|
|
if svc not in buckets:
|
|
buckets[svc] = []
|
|
buckets[svc].append(t)
|
|
|
|
sections = []
|
|
for svc, svc_tests in buckets.items():
|
|
if not svc_tests:
|
|
continue
|
|
sections.append({
|
|
"service": svc,
|
|
"tests": svc_tests,
|
|
"total": len(svc_tests),
|
|
"passed": sum(1 for t in svc_tests if t["outcome"] == "passed"),
|
|
"failed": sum(1 for t in svc_tests if t["outcome"] == "failed"),
|
|
"errors": sum(1 for t in svc_tests if t["outcome"] == "error"),
|
|
"skipped": sum(1 for t in svc_tests if t["outcome"] == "skipped"),
|
|
})
|
|
return sections
|