diff --git a/app/services/run_service.py b/app/services/run_service.py index c45d785..c3b601d 100644 --- a/app/services/run_service.py +++ b/app/services/run_service.py @@ -13,10 +13,11 @@ class RunService: Handles run lifecycle, plan loading, and result aggregation. """ - def __init__(self, redis, cache): + def __init__(self, database, redis, cache): + self.db = database self.redis = redis self.cache = cache - self.run_prefix = "run:" + self.run_prefix = "artdag:run:" async def get_run(self, run_id: str) -> Optional[Dict[str, Any]]: """Get a run by ID.""" @@ -104,10 +105,123 @@ class RunService: async def load_plan(self, run_id: str) -> Optional[Dict[str, Any]]: """Load execution plan for a run.""" - from ..config import settings + from pathlib import Path + import os - plan_path = settings.plan_cache_dir / f"{run_id}.json" + # Try plan cache directory + cache_dir = Path(os.environ.get("CACHE_DIR", "/tmp/artdag-cache")) + plan_path = cache_dir / "plans" / f"{run_id}.json" if plan_path.exists(): with open(plan_path) as f: return json.load(f) + + # Also check for plan_id in run data + run = await self.get_run(run_id) + if run and run.get("plan_id"): + plan_path = cache_dir / "plans" / f"{run['plan_id']}.json" + if plan_path.exists(): + with open(plan_path) as f: + return json.load(f) + return None + + async def get_run_plan(self, run_id: str) -> Optional[Dict[str, Any]]: + """Get execution plan with step results merged in.""" + run = await self.get_run(run_id) + if not run: + return None + + plan = await self.load_plan(run_id) + + # If no stored plan, try to reconstruct from run data + if not plan and run.get("step_results"): + plan = { + "plan_id": run.get("plan_id"), + "recipe": run.get("recipe"), + "steps": [], + } + + if plan and run.get("step_results"): + # Merge step results into plan + step_results = run.get("step_results", {}) + for step in plan.get("steps", []): + step_id = step.get("id") or step.get("name") + if step_id and step_id in step_results: + result = step_results[step_id] + step["cache_id"] = result.get("cache_id") or result.get("output_cache_id") + step["status"] = result.get("status", "completed") + step["cached"] = result.get("cached", False) + step["outputs"] = result.get("outputs", []) + + return plan + + async def get_run_artifacts(self, run_id: str) -> List[Dict[str, Any]]: + """Get all artifacts (inputs + outputs) for a run.""" + run = await self.get_run(run_id) + if not run: + return [] + + artifacts = [] + + def get_artifact_info(content_hash: str, role: str, step_name: str) -> Optional[Dict]: + """Get artifact info using cache manager.""" + if self.cache.has_content(content_hash): + path = self.cache.get_path(content_hash) + if path and path.exists(): + # Detect media type + media_type = "file" + try: + with open(path, "rb") as f: + header = f.read(12) + if header[:4] == b'\x1a\x45\xdf\xa3' or header[4:8] == b'ftyp': + media_type = "video" + elif header[:8] == b'\x89PNG\r\n\x1a\n' or header[:2] == b'\xff\xd8': + media_type = "image" + elif header[:4] == b'RIFF' and header[8:12] == b'WAVE': + media_type = "audio" + except Exception: + pass + + return { + "hash": content_hash, + "size_bytes": path.stat().st_size, + "media_type": media_type, + "role": role, + "step_name": step_name, + } + return None + + # Add inputs + inputs = run.get("inputs", []) + if isinstance(inputs, dict): + inputs = list(inputs.values()) + for i, content_hash in enumerate(inputs): + info = get_artifact_info(content_hash, "input", f"Input {i + 1}") + if info: + artifacts.append(info) + + # Add step outputs from step_results + step_results = run.get("step_results", {}) + for step_id, result in step_results.items(): + cache_id = result.get("cache_id") or result.get("output_cache_id") + if cache_id: + info = get_artifact_info(cache_id, "step_output", step_id) + if info: + artifacts.append(info) + # Also add any additional outputs + for output in result.get("outputs", []): + if output and output != cache_id: + info = get_artifact_info(output, "step_output", step_id) + if info: + artifacts.append(info) + + # Add final output + if run.get("output_hash"): + output_hash = run["output_hash"] + # Avoid duplicates + if not any(a["hash"] == output_hash for a in artifacts): + info = get_artifact_info(output_hash, "output", "Final Output") + if info: + artifacts.append(info) + + return artifacts diff --git a/app/templates/runs/detail.html b/app/templates/runs/detail.html index fb12c11..8d78917 100644 --- a/app/templates/runs/detail.html +++ b/app/templates/runs/detail.html @@ -72,25 +72,44 @@