"""Pytest subprocess runner + in-memory result storage.""" from __future__ import annotations import asyncio import json import logging import os import time from collections import OrderedDict from pathlib import Path log = logging.getLogger(__name__) # In-memory state _last_result: dict | None = None _running: bool = False # Each service group runs in its own pytest subprocess with its own PYTHONPATH _SERVICE_GROUPS: list[dict] = [ {"name": "shared", "dirs": ["shared/tests/", "shared/sexp/tests/"], "pythonpath": None}, {"name": "blog", "dirs": ["blog/tests/"], "pythonpath": "/app/blog"}, {"name": "market", "dirs": ["market/tests/"], "pythonpath": "/app/market"}, {"name": "cart", "dirs": ["cart/tests/"], "pythonpath": "/app/cart"}, {"name": "events", "dirs": ["events/tests/"], "pythonpath": "/app/events"}, {"name": "account", "dirs": ["account/tests/"], "pythonpath": "/app/account"}, {"name": "orders", "dirs": ["orders/tests/"], "pythonpath": "/app/orders"}, {"name": "federation", "dirs": ["federation/tests/"], "pythonpath": "/app/federation"}, {"name": "relations", "dirs": ["relations/tests/"], "pythonpath": "/app/relations"}, {"name": "likes", "dirs": ["likes/tests/"], "pythonpath": "/app/likes"}, ] _SERVICE_ORDER = [g["name"] for g in _SERVICE_GROUPS] _REPORT_PATH = "/tmp/test-report-{}.json" def _parse_report(path: str) -> tuple[list[dict], dict]: """Parse a pytest-json-report file.""" rp = Path(path) if not rp.exists(): return [], {} try: report = json.loads(rp.read_text()) except (json.JSONDecodeError, OSError): return [], {} summary = report.get("summary", {}) tests_raw = report.get("tests", []) tests = [] for t in tests_raw: tests.append({ "nodeid": t.get("nodeid", ""), "outcome": t.get("outcome", "unknown"), "duration": round(t.get("duration", 0), 4), "longrepr": (t.get("call", {}) or {}).get("longrepr", ""), }) return tests, summary async def _run_group(group: dict) -> tuple[list[dict], dict, str]: """Run pytest for a single service group.""" existing = [d for d in group["dirs"] if Path(f"/app/{d}").is_dir()] if not existing: return [], {}, "" report_file = _REPORT_PATH.format(group["name"]) cmd = [ "python3", "-m", "pytest", *existing, "--json-report", f"--json-report-file={report_file}", "-q", "--tb=short", ] env = {**os.environ} if group["pythonpath"]: env["PYTHONPATH"] = group["pythonpath"] + ":" + env.get("PYTHONPATH", "") proc = await asyncio.create_subprocess_exec( *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.STDOUT, cwd="/app", env=env, ) stdout, _ = await proc.communicate() stdout_str = (stdout or b"").decode("utf-8", errors="replace") tests, summary = _parse_report(report_file) return tests, summary, stdout_str async def run_tests() -> dict: """Run pytest in subprocess, parse JSON report, store results.""" global _last_result, _running if _running: return {"status": "already_running"} _running = True started_at = time.time() try: tasks = [_run_group(g) for g in _SERVICE_GROUPS] results = await asyncio.gather(*tasks, return_exceptions=True) all_tests: list[dict] = [] total_passed = total_failed = total_errors = total_skipped = total_count = 0 all_stdout: list[str] = [] for i, res in enumerate(results): if isinstance(res, Exception): log.error("Group %s failed: %s", _SERVICE_GROUPS[i]["name"], res) continue tests, summary, stdout_str = res all_tests.extend(tests) total_passed += summary.get("passed", 0) total_failed += summary.get("failed", 0) total_errors += summary.get("error", 0) total_skipped += summary.get("skipped", 0) total_count += summary.get("total", len(tests)) if stdout_str.strip(): all_stdout.append(stdout_str) finished_at = time.time() status = "failed" if total_failed > 0 or total_errors > 0 else "passed" _last_result = { "status": status, "started_at": started_at, "finished_at": finished_at, "duration": round(finished_at - started_at, 2), "passed": total_passed, "failed": total_failed, "errors": total_errors, "skipped": total_skipped, "total": total_count, "tests": all_tests, "stdout": "\n".join(all_stdout)[-5000:], } log.info( "Test run complete: %s (%d passed, %d failed, %d errors, %.1fs)", status, total_passed, total_failed, total_errors, _last_result["duration"], ) return _last_result except Exception: log.exception("Test run failed") finished_at = time.time() _last_result = { "status": "error", "started_at": started_at, "finished_at": finished_at, "duration": round(finished_at - started_at, 2), "passed": 0, "failed": 0, "errors": 1, "skipped": 0, "total": 0, "tests": [], "stdout": "", } return _last_result finally: _running = False def get_results() -> dict | None: """Return last run results.""" return _last_result def get_test(nodeid: str) -> dict | None: """Look up a single test by nodeid.""" if not _last_result: return None for t in _last_result["tests"]: if t["nodeid"] == nodeid: return t return None def is_running() -> bool: """Check if tests are currently running.""" return _running def _service_from_nodeid(nodeid: str) -> str: """Extract service name from a test nodeid.""" parts = nodeid.split("/") return parts[0] if len(parts) >= 2 else "other" def group_tests_by_service(tests: list[dict]) -> list[dict]: """Group tests into ordered sections by service.""" buckets: dict[str, list[dict]] = OrderedDict() for svc in _SERVICE_ORDER: buckets[svc] = [] for t in tests: svc = _service_from_nodeid(t["nodeid"]) if svc not in buckets: buckets[svc] = [] buckets[svc].append(t) sections = [] for svc, svc_tests in buckets.items(): if not svc_tests: continue sections.append({ "service": svc, "tests": svc_tests, "total": len(svc_tests), "passed": sum(1 for t in svc_tests if t["outcome"] == "passed"), "failed": sum(1 for t in svc_tests if t["outcome"] == "failed"), "errors": sum(1 for t in svc_tests if t["outcome"] == "error"), "skipped": sum(1 for t in svc_tests if t["outcome"] == "skipped"), }) return sections