""" Run Service - business logic for run management. Runs are content-addressed (run_id computed from inputs + recipe). Completed runs are stored in PostgreSQL, not Redis. In-progress runs are tracked via Celery task state. """ import hashlib import json import os from datetime import datetime, timezone from pathlib import Path from typing import Optional, List, Dict, Any, Tuple, Union, TYPE_CHECKING if TYPE_CHECKING: import redis from cache_manager import L1CacheManager from database import Database from ..types import RunResult def compute_run_id(input_hashes: Union[List[str], Dict[str, str]], recipe: str, recipe_hash: Optional[str] = None) -> str: """ Compute a deterministic run_id from inputs and recipe. The run_id is a SHA3-256 hash of: - Sorted input content hashes - Recipe identifier (recipe_hash if provided, else "effect:{recipe}") This makes runs content-addressable: same inputs + recipe = same run_id. """ # Handle both list and dict inputs if isinstance(input_hashes, dict): sorted_inputs = sorted(input_hashes.values()) else: sorted_inputs = sorted(input_hashes) data = { "inputs": sorted_inputs, "recipe": recipe_hash or f"effect:{recipe}", "version": "1", } json_str = json.dumps(data, sort_keys=True, separators=(",", ":")) return hashlib.sha3_256(json_str.encode()).hexdigest() def detect_media_type(cache_path: Path) -> str: """Detect if file is image, video, or audio based on magic bytes.""" try: with open(cache_path, "rb") as f: header = f.read(32) except Exception: return "unknown" # Video signatures if header[:4] == b'\x1a\x45\xdf\xa3': # WebM/MKV return "video" if len(header) > 8 and header[4:8] == b'ftyp': # MP4/MOV/M4A # Check for audio-only M4A if len(header) > 11 and header[8:12] in (b'M4A ', b'm4a '): return "audio" return "video" if header[:4] == b'RIFF' and len(header) > 12 and header[8:12] == b'AVI ': # AVI return "video" # Image signatures if header[:8] == b'\x89PNG\r\n\x1a\n': # PNG return "image" if header[:2] == b'\xff\xd8': # JPEG return "image" if header[:6] in (b'GIF87a', b'GIF89a'): # GIF return "image" if header[:4] == b'RIFF' and len(header) > 12 and header[8:12] == b'WEBP': # WebP return "image" # Audio signatures if header[:3] == b'ID3' or header[:2] == b'\xff\xfb': # MP3 return "audio" if header[:4] == b'fLaC': # FLAC return "audio" if header[:4] == b'OggS': # Ogg (could be audio or video, assume audio) return "audio" if header[:4] == b'RIFF' and len(header) > 12 and header[8:12] == b'WAVE': # WAV return "audio" return "unknown" class RunService: """ Service for managing recipe runs. Uses PostgreSQL for completed runs, Celery for task state. Redis is only used for task_id mapping (ephemeral). """ def __init__(self, database: "Database", redis: "redis.Redis[bytes]", cache: "L1CacheManager") -> None: self.db = database self.redis = redis # Only for task_id mapping self.cache = cache self.task_key_prefix = "artdag:task:" # run_id -> task_id mapping only self.cache_dir = Path(os.environ.get("CACHE_DIR", "/tmp/artdag-cache")) def _ensure_inputs_list(self, inputs: Any) -> List[str]: """Ensure inputs is a list, parsing JSON string if needed.""" if inputs is None: return [] if isinstance(inputs, list): return inputs if isinstance(inputs, str): try: parsed = json.loads(inputs) if isinstance(parsed, list): return parsed except json.JSONDecodeError: pass return [] return [] async def get_run(self, run_id: str) -> Optional[RunResult]: """Get a run by ID. Checks database first, then Celery task state.""" # Check database for completed run cached = await self.db.get_run_cache(run_id) if cached: output_cid = cached.get("output_cid") # Only return as completed if we have an output # (runs with no output should be re-executed) if output_cid: # Also fetch recipe content from pending_runs for streaming runs recipe_sexp = None recipe_name = None pending = await self.db.get_pending_run(run_id) if pending: recipe_sexp = pending.get("dag_json") # Extract recipe name from streaming recipe content if recipe_sexp: import re name_match = re.search(r'\(stream\s+"([^"]+)"', recipe_sexp) if name_match: recipe_name = name_match.group(1) return { "run_id": run_id, "status": "completed", "recipe": cached.get("recipe"), "recipe_name": recipe_name, "inputs": self._ensure_inputs_list(cached.get("inputs")), "output_cid": output_cid, "ipfs_cid": cached.get("ipfs_cid"), "provenance_cid": cached.get("provenance_cid"), "plan_cid": cached.get("plan_cid"), "actor_id": cached.get("actor_id"), "created_at": cached.get("created_at"), "completed_at": cached.get("created_at"), "recipe_sexp": recipe_sexp, } # Check database for pending run pending = await self.db.get_pending_run(run_id) if pending: task_id = pending.get("celery_task_id") if task_id: # Check actual Celery task state from celery.result import AsyncResult from celery_app import app as celery_app result = AsyncResult(task_id, app=celery_app) status = result.status.lower() # Normalize status status_map = { "pending": "pending", "started": "running", "success": "completed", "failure": "failed", "retry": "running", "revoked": "failed", } normalized_status = status_map.get(status, status) run_data = { "run_id": run_id, "status": normalized_status, "celery_task_id": task_id, "actor_id": pending.get("actor_id"), "recipe": pending.get("recipe"), "inputs": self._ensure_inputs_list(pending.get("inputs")), "output_name": pending.get("output_name"), "created_at": pending.get("created_at"), "error": pending.get("error"), "recipe_sexp": pending.get("dag_json"), # Recipe content for streaming runs } # If task completed, get result if result.ready(): if result.successful(): task_result = result.result if isinstance(task_result, dict): # Check task's own success flag and output_cid task_success = task_result.get("success", True) output_cid = task_result.get("output_cid") if task_success and output_cid: run_data["status"] = "completed" run_data["output_cid"] = output_cid else: run_data["status"] = "failed" run_data["error"] = task_result.get("error", "No output produced") else: run_data["status"] = "completed" else: run_data["status"] = "failed" run_data["error"] = str(result.result) return run_data # No task_id but have pending record - return from DB return { "run_id": run_id, "status": pending.get("status", "pending"), "recipe": pending.get("recipe"), "inputs": self._ensure_inputs_list(pending.get("inputs")), "output_name": pending.get("output_name"), "actor_id": pending.get("actor_id"), "created_at": pending.get("created_at"), "error": pending.get("error"), "recipe_sexp": pending.get("dag_json"), # Recipe content for streaming runs } # Fallback: Check Redis for backwards compatibility task_data = self.redis.get(f"{self.task_key_prefix}{run_id}") if task_data: if isinstance(task_data, bytes): task_data = task_data.decode() # Parse task data (supports both old format string and new JSON format) try: parsed = json.loads(task_data) task_id = parsed.get("task_id") task_actor_id = parsed.get("actor_id") task_recipe = parsed.get("recipe") task_recipe_name = parsed.get("recipe_name") task_inputs = parsed.get("inputs") # Ensure inputs is a list (might be JSON string) if isinstance(task_inputs, str): try: task_inputs = json.loads(task_inputs) except json.JSONDecodeError: task_inputs = None task_output_name = parsed.get("output_name") task_created_at = parsed.get("created_at") except json.JSONDecodeError: # Old format: just the task_id string task_id = task_data task_actor_id = None task_recipe = None task_recipe_name = None task_inputs = None task_output_name = None task_created_at = None # Get task state from Celery from celery.result import AsyncResult from celery_app import app as celery_app result = AsyncResult(task_id, app=celery_app) status = result.status.lower() # Normalize Celery status names status_map = { "pending": "pending", "started": "running", "success": "completed", "failure": "failed", "retry": "running", "revoked": "failed", } normalized_status = status_map.get(status, status) run_data = { "run_id": run_id, "status": normalized_status, "celery_task_id": task_id, "actor_id": task_actor_id, "recipe": task_recipe, "recipe_name": task_recipe_name, "inputs": self._ensure_inputs_list(task_inputs), "output_name": task_output_name, "created_at": task_created_at, } # If task completed, get result if result.ready(): if result.successful(): task_result = result.result if isinstance(task_result, dict): # Check task's own success flag and output_cid task_success = task_result.get("success", True) output_cid = task_result.get("output_cid") if task_success and output_cid: run_data["status"] = "completed" run_data["output_cid"] = output_cid else: run_data["status"] = "failed" run_data["error"] = task_result.get("error", "No output produced") else: run_data["status"] = "completed" else: run_data["status"] = "failed" run_data["error"] = str(result.result) return run_data return None async def list_runs(self, actor_id: str, offset: int = 0, limit: int = 20) -> List[RunResult]: """List runs for a user. Returns completed and pending runs from database.""" # Get completed runs from database completed_runs = await self.db.list_runs_by_actor(actor_id, offset=0, limit=limit + 50) # Get pending runs from database pending_db = await self.db.list_pending_runs(actor_id=actor_id) # Convert pending runs to run format with live status check pending = [] for pr in pending_db: run_id = pr.get("run_id") # Skip if already in completed if any(r.get("run_id") == run_id for r in completed_runs): continue # Get live status - include pending, running, rendering, and failed runs run = await self.get_run(run_id) if run and run.get("status") in ("pending", "running", "rendering", "failed"): pending.append(run) # Combine and sort all_runs = pending + completed_runs all_runs.sort(key=lambda r: r.get("created_at", ""), reverse=True) return all_runs[offset:offset + limit] async def create_run( self, recipe: str, inputs: Union[List[str], Dict[str, str]], output_name: Optional[str] = None, use_dag: bool = True, dag_json: Optional[str] = None, actor_id: Optional[str] = None, l2_server: Optional[str] = None, recipe_name: Optional[str] = None, recipe_sexp: Optional[str] = None, ) -> Tuple[Optional[RunResult], Optional[str]]: """ Create a new rendering run. Checks cache before executing. If recipe_sexp is provided, uses the new S-expression execution path which generates code-addressed cache IDs before execution. Returns (run_dict, error_message). """ import httpx try: from legacy_tasks import render_effect, execute_dag, build_effect_dag, execute_recipe except ImportError as e: return None, f"Celery tasks not available: {e}" # Handle both list and dict inputs if isinstance(inputs, dict): input_list = list(inputs.values()) else: input_list = inputs # Compute content-addressable run_id run_id = compute_run_id(input_list, recipe) # Generate output name if not provided if not output_name: output_name = f"{recipe}-{run_id[:8]}" # Check database cache first (completed runs) cached_run = await self.db.get_run_cache(run_id) if cached_run: output_cid = cached_run.get("output_cid") if output_cid and self.cache.has_content(output_cid): return { "run_id": run_id, "status": "completed", "recipe": recipe, "inputs": input_list, "output_name": output_name, "output_cid": output_cid, "ipfs_cid": cached_run.get("ipfs_cid"), "provenance_cid": cached_run.get("provenance_cid"), "created_at": cached_run.get("created_at"), "completed_at": cached_run.get("created_at"), "actor_id": actor_id, }, None # Check L2 if not in local cache if l2_server: try: async with httpx.AsyncClient(timeout=10) as client: l2_resp = await client.get(f"{l2_server}/assets/by-run-id/{run_id}") if l2_resp.status_code == 200: l2_data = l2_resp.json() output_cid = l2_data.get("output_cid") ipfs_cid = l2_data.get("ipfs_cid") if output_cid and ipfs_cid: # Pull from IPFS to local cache try: import ipfs_client legacy_dir = self.cache_dir / "legacy" legacy_dir.mkdir(parents=True, exist_ok=True) recovery_path = legacy_dir / output_cid if ipfs_client.get_file(ipfs_cid, str(recovery_path)): # Save to database cache await self.db.save_run_cache( run_id=run_id, output_cid=output_cid, recipe=recipe, inputs=input_list, ipfs_cid=ipfs_cid, provenance_cid=l2_data.get("provenance_cid"), actor_id=actor_id, ) return { "run_id": run_id, "status": "completed", "recipe": recipe, "inputs": input_list, "output_cid": output_cid, "ipfs_cid": ipfs_cid, "provenance_cid": l2_data.get("provenance_cid"), "created_at": datetime.now(timezone.utc).isoformat(), "actor_id": actor_id, }, None except Exception: pass # IPFS recovery failed, continue to run except Exception: pass # L2 lookup failed, continue to run # Not cached - submit to Celery try: # Prefer S-expression execution path (code-addressed cache IDs) if recipe_sexp: # Convert inputs to dict if needed if isinstance(inputs, dict): input_hashes = inputs else: # Legacy list format - use positional names input_hashes = {f"input_{i}": cid for i, cid in enumerate(input_list)} task = execute_recipe.delay(recipe_sexp, input_hashes, run_id) elif use_dag or recipe == "dag": if dag_json: dag_data = dag_json else: dag = build_effect_dag(input_list, recipe) dag_data = dag.to_json() task = execute_dag.delay(dag_data, run_id) else: if len(input_list) != 1: return None, "Legacy mode only supports single-input recipes. Use use_dag=true for multi-input." task = render_effect.delay(input_list[0], recipe, output_name) # Store pending run in database for durability try: await self.db.create_pending_run( run_id=run_id, celery_task_id=task.id, recipe=recipe, inputs=input_list, actor_id=actor_id, dag_json=dag_json, output_name=output_name, ) except Exception as e: import logging logging.getLogger(__name__).error(f"Failed to save pending run: {e}") # Also store in Redis for backwards compatibility (shorter TTL) task_data = json.dumps({ "task_id": task.id, "actor_id": actor_id, "recipe": recipe, "recipe_name": recipe_name, "inputs": input_list, "output_name": output_name, "created_at": datetime.now(timezone.utc).isoformat(), }) self.redis.setex( f"{self.task_key_prefix}{run_id}", 3600 * 4, # 4 hour TTL (database is primary now) task_data ) return { "run_id": run_id, "status": "running", "recipe": recipe, "recipe_name": recipe_name, "inputs": input_list, "output_name": output_name, "celery_task_id": task.id, "created_at": datetime.now(timezone.utc).isoformat(), "actor_id": actor_id, }, None except Exception as e: return None, f"Failed to submit task: {e}" async def discard_run( self, run_id: str, actor_id: str, username: str, ) -> Tuple[bool, Optional[str]]: """ Discard (delete) a run record and clean up outputs/intermediates. Outputs and intermediates are only deleted if not used by other runs. """ import logging logger = logging.getLogger(__name__) run = await self.get_run(run_id) if not run: return False, f"Run {run_id} not found" # Check ownership run_owner = run.get("actor_id") if run_owner and run_owner not in (username, actor_id): return False, "Access denied" # Clean up activity outputs/intermediates (only if orphaned) # The activity_id is the same as run_id try: success, msg = self.cache.discard_activity_outputs_only(run_id) if success: logger.info(f"Cleaned up run {run_id}: {msg}") else: # Activity might not exist (old runs), that's OK logger.debug(f"No activity cleanup for {run_id}: {msg}") except Exception as e: logger.warning(f"Failed to cleanup activity for {run_id}: {e}") # Remove task_id mapping from Redis self.redis.delete(f"{self.task_key_prefix}{run_id}") # Remove from run_cache database table try: await self.db.delete_run_cache(run_id) except Exception as e: logger.warning(f"Failed to delete run_cache for {run_id}: {e}") # Remove pending run if exists try: await self.db.delete_pending_run(run_id) except Exception: pass return True, None def _dag_to_steps(self, dag: Dict[str, Any]) -> Dict[str, Any]: """Convert DAG nodes dict format to steps list format. DAG format: {"nodes": {"id": {...}}, "output_id": "..."} Steps format: {"steps": [{"id": "...", "type": "...", ...}], "output_id": "..."} """ if "steps" in dag: # Already in steps format return dag if "nodes" not in dag: return dag nodes = dag.get("nodes", {}) steps = [] # Sort by topological order (sources first, then by input dependencies) def get_level(node_id: str, visited: set = None) -> int: if visited is None: visited = set() if node_id in visited: return 0 visited.add(node_id) node = nodes.get(node_id, {}) inputs = node.get("inputs", []) if not inputs: return 0 return 1 + max(get_level(inp, visited) for inp in inputs) sorted_ids = sorted(nodes.keys(), key=lambda nid: (get_level(nid), nid)) for node_id in sorted_ids: node = nodes[node_id] steps.append({ "id": node_id, "step_id": node_id, "type": node.get("node_type", "EFFECT"), "config": node.get("config", {}), "inputs": node.get("inputs", []), "name": node.get("name"), "cache_id": node_id, # In code-addressed system, node_id IS the cache_id }) return { "steps": steps, "output_id": dag.get("output_id"), "metadata": dag.get("metadata", {}), "format": "json", } def _sexp_to_steps(self, sexp_content: str) -> Dict[str, Any]: """Convert S-expression plan to steps list format for UI. Parses the S-expression plan format: (plan :id :recipe :recipe-hash (inputs (input_name hash) ...) (step step_id :cache-id :level (node-type :key val ...)) ... :output ) Returns steps list compatible with UI visualization. """ try: from artdag.sexp import parse, Symbol, Keyword except ImportError: return {"sexp": sexp_content, "steps": [], "format": "sexp"} try: parsed = parse(sexp_content) except Exception: return {"sexp": sexp_content, "steps": [], "format": "sexp"} if not isinstance(parsed, list) or not parsed: return {"sexp": sexp_content, "steps": [], "format": "sexp"} steps = [] output_step_id = None plan_id = None recipe_name = None # Parse plan structure i = 0 while i < len(parsed): item = parsed[i] if isinstance(item, Keyword): key = item.name if i + 1 < len(parsed): value = parsed[i + 1] if key == "id": plan_id = value elif key == "recipe": recipe_name = value elif key == "output": output_step_id = value i += 2 continue if isinstance(item, list) and item: first = item[0] if isinstance(first, Symbol) and first.name == "step": # Parse step: (step step_id :cache-id :level (node-expr)) step_id = item[1] if len(item) > 1 else None cache_id = None level = 0 node_type = "EFFECT" config = {} inputs = [] j = 2 while j < len(item): part = item[j] if isinstance(part, Keyword): key = part.name if j + 1 < len(item): val = item[j + 1] if key == "cache-id": cache_id = val elif key == "level": level = val j += 2 continue elif isinstance(part, list) and part: # Node expression: (node-type :key val ...) if isinstance(part[0], Symbol): node_type = part[0].name.upper() k = 1 while k < len(part): if isinstance(part[k], Keyword): kname = part[k].name if k + 1 < len(part): kval = part[k + 1] if kname == "inputs": inputs = kval if isinstance(kval, list) else [kval] else: config[kname] = kval k += 2 continue k += 1 j += 1 steps.append({ "id": step_id, "step_id": step_id, "type": node_type, "config": config, "inputs": inputs, "cache_id": cache_id or step_id, "level": level, }) i += 1 return { "sexp": sexp_content, "steps": steps, "output_id": output_step_id, "plan_id": plan_id, "recipe": recipe_name, "format": "sexp", } async def get_run_plan(self, run_id: str) -> Optional[Dict[str, Any]]: """Get execution plan for a run. Plans are just node outputs - cached by content hash like everything else. For streaming runs, returns the recipe content as the plan. """ # Get run to find plan_cache_id run = await self.get_run(run_id) if not run: return None # For streaming runs, return the recipe as the plan if run.get("recipe") == "streaming" and run.get("recipe_sexp"): return { "steps": [{"id": "stream", "type": "STREAM", "name": "Streaming Recipe"}], "sexp": run.get("recipe_sexp"), "format": "sexp", } # Check plan_cid (stored in database) or plan_cache_id (legacy) plan_cid = run.get("plan_cid") or run.get("plan_cache_id") if plan_cid: # Get plan from cache by content hash plan_path = self.cache.get_by_cid(plan_cid) if plan_path and plan_path.exists(): with open(plan_path) as f: content = f.read() # Detect format if content.strip().startswith("("): # S-expression format - parse for UI return self._sexp_to_steps(content) else: plan = json.loads(content) return self._dag_to_steps(plan) # Fall back to legacy plans directory sexp_path = self.cache_dir / "plans" / f"{run_id}.sexp" if sexp_path.exists(): with open(sexp_path) as f: return self._sexp_to_steps(f.read()) json_path = self.cache_dir / "plans" / f"{run_id}.json" if json_path.exists(): with open(json_path) as f: plan = json.load(f) return self._dag_to_steps(plan) return None async def get_run_plan_sexp(self, run_id: str) -> Optional[str]: """Get execution plan as S-expression string.""" plan = await self.get_run_plan(run_id) if plan and plan.get("format") == "sexp": return plan.get("sexp") return None async def get_run_artifacts(self, run_id: str) -> List[Dict[str, Any]]: """Get all artifacts (inputs + outputs) for a run.""" run = await self.get_run(run_id) if not run: return [] artifacts = [] def get_artifact_info(cid: str, role: str, name: str) -> Optional[Dict]: if self.cache.has_content(cid): path = self.cache.get_by_cid(cid) if path and path.exists(): return { "cid": cid, "size_bytes": path.stat().st_size, "media_type": detect_media_type(path), "role": role, "step_name": name, } return None # Add inputs inputs = run.get("inputs", []) if isinstance(inputs, dict): inputs = list(inputs.values()) for i, h in enumerate(inputs): info = get_artifact_info(h, "input", f"Input {i + 1}") if info: artifacts.append(info) # Add output if run.get("output_cid"): info = get_artifact_info(run["output_cid"], "output", "Output") if info: artifacts.append(info) return artifacts async def get_run_analysis(self, run_id: str) -> List[Dict[str, Any]]: """Get analysis data for each input in a run.""" run = await self.get_run(run_id) if not run: return [] analysis_dir = self.cache_dir / "analysis" results = [] inputs = run.get("inputs", []) if isinstance(inputs, dict): inputs = list(inputs.values()) for i, input_hash in enumerate(inputs): analysis_path = analysis_dir / f"{input_hash}.json" analysis_data = None if analysis_path.exists(): try: with open(analysis_path) as f: analysis_data = json.load(f) except (json.JSONDecodeError, IOError): pass results.append({ "input_hash": input_hash, "input_name": f"Input {i + 1}", "has_analysis": analysis_data is not None, "tempo": analysis_data.get("tempo") if analysis_data else None, "beat_times": analysis_data.get("beat_times", []) if analysis_data else [], "raw": analysis_data, }) return results def detect_media_type(self, path: Path) -> str: """Detect media type for a file path.""" return detect_media_type(path) async def recover_pending_runs(self) -> Dict[str, Union[int, str]]: """ Recover pending runs after restart. Checks all pending runs in the database and: - Updates status for completed tasks - Re-queues orphaned tasks that can be retried - Marks as failed if unrecoverable Returns counts of recovered, completed, failed runs. """ from celery.result import AsyncResult from celery_app import app as celery_app try: from legacy_tasks import execute_dag except ImportError: return {"error": "Celery tasks not available"} stats = {"recovered": 0, "completed": 0, "failed": 0, "still_running": 0} # Get all pending/running runs from database pending_runs = await self.db.list_pending_runs() for run in pending_runs: run_id = run.get("run_id") task_id = run.get("celery_task_id") status = run.get("status") if not task_id: # No task ID - try to re-queue if we have dag_json dag_json = run.get("dag_json") if dag_json: try: new_task = execute_dag.delay(dag_json, run_id) await self.db.create_pending_run( run_id=run_id, celery_task_id=new_task.id, recipe=run.get("recipe", "unknown"), inputs=run.get("inputs", []), actor_id=run.get("actor_id"), dag_json=dag_json, output_name=run.get("output_name"), ) stats["recovered"] += 1 except Exception as e: await self.db.update_pending_run_status( run_id, "failed", f"Recovery failed: {e}" ) stats["failed"] += 1 else: await self.db.update_pending_run_status( run_id, "failed", "No DAG data for recovery" ) stats["failed"] += 1 continue # Check Celery task state result = AsyncResult(task_id, app=celery_app) celery_status = result.status.lower() if result.ready(): if result.successful(): # Task completed - move to run_cache task_result = result.result if isinstance(task_result, dict) and task_result.get("output_cid"): await self.db.save_run_cache( run_id=run_id, output_cid=task_result["output_cid"], recipe=run.get("recipe", "unknown"), inputs=run.get("inputs", []), ipfs_cid=task_result.get("ipfs_cid"), provenance_cid=task_result.get("provenance_cid"), actor_id=run.get("actor_id"), ) await self.db.complete_pending_run(run_id) stats["completed"] += 1 else: await self.db.update_pending_run_status( run_id, "failed", "Task completed but no output hash" ) stats["failed"] += 1 else: # Task failed await self.db.update_pending_run_status( run_id, "failed", str(result.result) ) stats["failed"] += 1 elif celery_status in ("pending", "started", "retry"): # Still running stats["still_running"] += 1 else: # Unknown state - try to re-queue if we have dag_json dag_json = run.get("dag_json") if dag_json: try: new_task = execute_dag.delay(dag_json, run_id) await self.db.create_pending_run( run_id=run_id, celery_task_id=new_task.id, recipe=run.get("recipe", "unknown"), inputs=run.get("inputs", []), actor_id=run.get("actor_id"), dag_json=dag_json, output_name=run.get("output_name"), ) stats["recovered"] += 1 except Exception as e: await self.db.update_pending_run_status( run_id, "failed", f"Recovery failed: {e}" ) stats["failed"] += 1 else: await self.db.update_pending_run_status( run_id, "failed", f"Task in unknown state: {celery_status}" ) stats["failed"] += 1 return stats