"""
Run Service - business logic for run management.

Runs are content-addressed (run_id computed from inputs + recipe).
Completed runs are stored in PostgreSQL, not Redis.
In-progress runs are tracked via Celery task state.
"""

import hashlib
import json
import logging
import os
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional, List, Dict, Any, Tuple, Union

logger = logging.getLogger(__name__)


def _is_safe_name(name: Any) -> bool:
    """Return True if *name* can be used as a single path component.

    Rejects non-strings, empty strings, the special entries "." and "..",
    and anything containing a path separator or NUL byte, so that joining
    the name onto a directory can never escape that directory.
    """
    if not isinstance(name, str) or not name or name in (".", ".."):
        return False
    return not any(ch in name for ch in ("/", "\\", "\0"))


def compute_run_id(
    input_hashes: Union[List[str], Dict[str, str]],
    recipe: str,
    recipe_hash: Optional[str] = None,
) -> str:
    """
    Compute a deterministic run_id from inputs and recipe.

    The run_id is a SHA3-256 hash of:
    - Sorted input content hashes
    - Recipe identifier (recipe_hash if provided, else "effect:{recipe}")

    This makes runs content-addressable: same inputs + recipe = same run_id.

    Args:
        input_hashes: Input content hashes, either as a list or as a dict
            whose values are the hashes (the keys are ignored).
        recipe: Recipe name, used only when ``recipe_hash`` is falsy.
        recipe_hash: Optional recipe hash that replaces the name-based
            identifier.

    Returns:
        Hex-encoded SHA3-256 digest.
    """
    # Handle both list and dict inputs
    if isinstance(input_hashes, dict):
        sorted_inputs = sorted(input_hashes.values())
    else:
        sorted_inputs = sorted(input_hashes)

    data = {
        "inputs": sorted_inputs,
        "recipe": recipe_hash or f"effect:{recipe}",
        "version": "1",
    }
    # Compact separators + sorted keys give one canonical serialization.
    json_str = json.dumps(data, sort_keys=True, separators=(",", ":"))
    return hashlib.sha3_256(json_str.encode()).hexdigest()


def detect_media_type(cache_path: Path) -> str:
    """Detect if file is image, video, or audio based on magic bytes.

    Only the first 32 bytes are read.

    Args:
        cache_path: Path of the file to inspect.

    Returns:
        "video", "image", "audio", or "unknown" when the file cannot be
        read or no known signature matches.
    """
    try:
        with open(cache_path, "rb") as f:
            header = f.read(32)
    except (OSError, TypeError, ValueError):
        # Best effort: unreadable or invalid paths are simply "unknown".
        return "unknown"

    # Slices of a too-short header are shorter than the literal they are
    # compared with, so the comparisons below need no explicit length
    # checks (the previous `len(header) > 8` / `> 12` guards wrongly
    # rejected files that were exactly as long as the signature).

    # Video signatures
    if header[:4] == b"\x1a\x45\xdf\xa3":  # WebM/MKV
        return "video"
    if header[4:8] == b"ftyp":  # MP4/MOV
        return "video"

    # RIFF containers carry their payload type at bytes 8..12
    if header[:4] == b"RIFF":
        riff_kind = header[8:12]
        if riff_kind == b"AVI ":  # AVI
            return "video"
        if riff_kind == b"WEBP":  # WebP
            return "image"
        if riff_kind == b"WAVE":  # WAV
            return "audio"

    # Image signatures
    if header[:8] == b"\x89PNG\r\n\x1a\n":  # PNG
        return "image"
    if header[:2] == b"\xff\xd8":  # JPEG
        return "image"
    if header[:6] in (b"GIF87a", b"GIF89a"):  # GIF
        return "image"

    # Audio signatures
    if header[:4] == b"fLaC":  # FLAC
        return "audio"
    if header[:3] == b"ID3":  # MP3 with an ID3v2 tag
        return "audio"

    return "unknown"


def _created_at_sort_key(run: Dict[str, Any]) -> str:
    """Return a string sort key for a run's ``created_at`` value.

    Missing or None values sort as the empty string; datetimes are
    converted with ``isoformat()`` so that they can be compared with
    string timestamps without raising TypeError.
    """
    value = run.get("created_at")
    if value is None:
        return ""
    if isinstance(value, datetime):
        return value.isoformat()
    return str(value)


class RunService:
    """
    Service for managing recipe runs.

    Uses PostgreSQL for completed runs, Celery for task state.
    Redis is only used for task_id mapping (ephemeral).
    """

    def __init__(self, database, redis, cache):
        self.db = database
        self.redis = redis  # Only for task_id mapping
        self.cache = cache
        self.task_key_prefix = "artdag:task:"  # run_id -> task_id mapping only
        self.cache_dir = Path(os.environ.get("CACHE_DIR", "/tmp/artdag-cache"))

    @staticmethod
    def _input_list(inputs: Any) -> list:
        """Normalize run inputs to a list (dict -> its values, None -> [])."""
        if inputs is None:
            return []
        if isinstance(inputs, dict):
            return list(inputs.values())
        return inputs

    async def get_run(self, run_id: str) -> Optional[Dict[str, Any]]:
        """Get a run by ID. Checks database first, then Celery task state.

        Returns:
            A dict describing the run, or None when the run is neither in
            the database nor mapped to a Celery task in Redis. Runs that
            are only known through Celery carry ``run_id``, ``status``,
            ``celery_task_id`` and, once finished, ``output_hash`` or
            ``error``.
        """
        # Check database for completed run
        cached = await self.db.get_run_cache(run_id)
        if cached:
            return {
                "run_id": run_id,
                "status": "completed",
                "recipe": cached.get("recipe"),
                "inputs": cached.get("inputs", []),
                "output_hash": cached.get("output_hash"),
                "ipfs_cid": cached.get("ipfs_cid"),
                "provenance_cid": cached.get("provenance_cid"),
                "actor_id": cached.get("actor_id"),
                "created_at": cached.get("created_at"),
                # The cache row exposes a single timestamp, reused here.
                "completed_at": cached.get("created_at"),
            }

        # Check if there's a running task
        task_id = self.redis.get(f"{self.task_key_prefix}{run_id}")
        if not task_id:
            return None
        if isinstance(task_id, bytes):
            task_id = task_id.decode()

        # Get task state from Celery
        from celery.result import AsyncResult
        from celery_app import app as celery_app

        result = AsyncResult(task_id, app=celery_app)
        status = result.status.lower()
        # Celery reports an executing task as STARTED; this service (see
        # create_run / list_runs) calls that state "running".
        if status == "started":
            status = "running"

        run_data = {
            "run_id": run_id,
            "status": status,
            "celery_task_id": task_id,
        }

        # If task completed, get result
        if result.ready():
            if result.successful():
                run_data["status"] = "completed"
                task_result = result.result
                if isinstance(task_result, dict):
                    run_data["output_hash"] = task_result.get("output_hash")
            else:
                run_data["status"] = "failed"
                run_data["error"] = str(result.result)

        return run_data

    async def list_runs(self, actor_id: str, offset: int = 0, limit: int = 20) -> list:
        """List runs for a user.

        Combines completed runs from the database with pending/running
        tasks found through the Redis task mapping, sorts them newest
        first by ``created_at`` and returns the requested page.

        NOTE(review): in-progress runs returned by ``get_run`` carry no
        ``actor_id``, so pending runs cannot be filtered by owner here;
        every tracked in-progress run is included. They also have no
        ``created_at`` and therefore sort after all completed runs.
        """
        offset = max(offset, 0)
        limit = max(limit, 0)

        # Fetch everything up to the end of the requested page. Pagination
        # is applied once, after merging with pending runs; paginating in
        # the query *and* slicing again returned wrong pages for offset > 0.
        runs = await self.db.list_runs_by_actor(
            actor_id, offset=0, limit=offset + limit
        )
        known_ids = {r.get("run_id") for r in runs}

        # Also check for any pending tasks in Redis
        pending = []
        prefix = self.task_key_prefix
        cursor = 0
        while True:
            cursor, keys = self.redis.scan(
                cursor=cursor, match=f"{prefix}*", count=100
            )
            for key in keys:
                if isinstance(key, bytes):
                    key = key.decode()
                # Strip only the leading prefix, not every occurrence.
                run_id = key[len(prefix):] if key.startswith(prefix) else key
                if run_id in known_ids:
                    continue
                run = await self.get_run(run_id)
                if run and run.get("status") in ("pending", "running"):
                    pending.append(run)
                    known_ids.add(run_id)  # SCAN may return a key twice
            if cursor == 0:
                break

        # Combine and sort
        all_runs = pending + runs
        all_runs.sort(key=_created_at_sort_key, reverse=True)

        return all_runs[offset:offset + limit]

    async def _recover_from_l2(
        self,
        run_id: str,
        recipe: str,
        input_list: list,
        output_name: str,
        actor_id: Optional[str],
        l2_server: str,
    ) -> Optional[Dict[str, Any]]:
        """Try to recover a finished run from the L2 server and IPFS.

        Returns the completed-run dict on success, or None when the run is
        unknown to L2 or any step fails (failures are logged, not raised,
        so the caller can fall back to executing the run).
        """
        try:
            import httpx

            async with httpx.AsyncClient(timeout=10) as client:
                l2_resp = await client.get(f"{l2_server}/assets/by-run-id/{run_id}")
                if l2_resp.status_code != 200:
                    return None
                l2_data = l2_resp.json()
        except Exception as exc:
            # L2 lookup failed, continue to run
            logger.warning("L2 lookup failed for run %s: %s", run_id, exc)
            return None

        if not isinstance(l2_data, dict):
            return None
        output_hash = l2_data.get("output_hash")
        ipfs_cid = l2_data.get("ipfs_cid")
        if not (output_hash and ipfs_cid):
            return None
        # output_hash comes from a remote server and becomes a file name
        # below; refuse anything that could escape the cache directory.
        if not _is_safe_name(output_hash):
            logger.warning(
                "L2 returned unsafe output_hash for run %s: %r", run_id, output_hash
            )
            return None

        # Pull from IPFS to local cache
        try:
            import ipfs_client

            legacy_dir = self.cache_dir / "legacy"
            legacy_dir.mkdir(parents=True, exist_ok=True)
            recovery_path = legacy_dir / output_hash
            if not ipfs_client.get_file(ipfs_cid, str(recovery_path)):
                return None

            # Save to database cache
            await self.db.save_run_cache(
                run_id=run_id,
                output_hash=output_hash,
                recipe=recipe,
                inputs=input_list,
                ipfs_cid=ipfs_cid,
                provenance_cid=l2_data.get("provenance_cid"),
                actor_id=actor_id,
            )
        except Exception as exc:
            # IPFS recovery failed, continue to run
            logger.warning("IPFS recovery failed for run %s: %s", run_id, exc)
            return None

        now = datetime.now(timezone.utc).isoformat()
        return {
            "run_id": run_id,
            "status": "completed",
            "recipe": recipe,
            "inputs": input_list,
            "output_name": output_name,
            "output_hash": output_hash,
            "ipfs_cid": ipfs_cid,
            "provenance_cid": l2_data.get("provenance_cid"),
            "created_at": now,
            "completed_at": now,
            "actor_id": actor_id,
        }

    async def create_run(
        self,
        recipe: str,
        inputs: list,
        output_name: str = None,
        use_dag: bool = True,
        dag_json: str = None,
        actor_id: str = None,
        l2_server: str = None,
    ) -> Tuple[Optional[Dict[str, Any]], Optional[str]]:
        """
        Create a new rendering run. Checks cache before executing.

        Lookup order: local database cache (only if the output content is
        still present in the cache), then the L2 server / IPFS when
        ``l2_server`` is given, and finally submission to Celery.

        NOTE(review): the run_id is derived from inputs and ``recipe``
        only; ``dag_json`` does not take part in it, so different DAGs
        submitted with the same inputs and recipe share a run_id.

        Returns (run_dict, error_message).
        """
        try:
            from legacy_tasks import render_effect, execute_dag, build_effect_dag
        except ImportError as e:
            return None, f"Celery tasks not available: {e}"

        # Handle both list and dict inputs
        input_list = self._input_list(inputs)

        # Compute content-addressable run_id
        run_id = compute_run_id(input_list, recipe)

        # Generate output name if not provided
        if not output_name:
            output_name = f"{recipe}-{run_id[:8]}"

        # Check database cache first (completed runs)
        cached_run = await self.db.get_run_cache(run_id)
        if cached_run:
            output_hash = cached_run.get("output_hash")
            if output_hash and self.cache.has_content(output_hash):
                return {
                    "run_id": run_id,
                    "status": "completed",
                    "recipe": recipe,
                    "inputs": input_list,
                    "output_name": output_name,
                    "output_hash": output_hash,
                    "ipfs_cid": cached_run.get("ipfs_cid"),
                    "provenance_cid": cached_run.get("provenance_cid"),
                    "created_at": cached_run.get("created_at"),
                    "completed_at": cached_run.get("created_at"),
                    "actor_id": actor_id,
                }, None

        # Check L2 if not in local cache
        if l2_server:
            recovered = await self._recover_from_l2(
                run_id, recipe, input_list, output_name, actor_id, l2_server
            )
            if recovered:
                return recovered, None

        # Not cached - submit to Celery
        dag_mode = use_dag or recipe == "dag"
        if not dag_mode and len(input_list) != 1:
            return None, "Legacy mode only supports single-input recipes. Use use_dag=true for multi-input."

        try:
            if dag_mode:
                if dag_json:
                    dag_data = dag_json
                else:
                    dag = build_effect_dag(input_list, recipe)
                    dag_data = dag.to_json()
                task = execute_dag.delay(dag_data, run_id)
            else:
                task = render_effect.delay(input_list[0], recipe, output_name)

            # Store task_id mapping in Redis (ephemeral)
            self.redis.setex(
                f"{self.task_key_prefix}{run_id}",
                3600 * 24,  # 24 hour TTL
                task.id,
            )

            return {
                "run_id": run_id,
                "status": "running",
                "recipe": recipe,
                "inputs": input_list,
                "output_name": output_name,
                "celery_task_id": task.id,
                "created_at": datetime.now(timezone.utc).isoformat(),
                "actor_id": actor_id,
            }, None

        except Exception as e:
            return None, f"Failed to submit task: {e}"

    async def discard_run(
        self,
        run_id: str,
        actor_id: str,
        username: str,
    ) -> Tuple[bool, Optional[str]]:
        """
        Discard (delete) a run record.

        Only the Redis task mapping is removed; the database record of a
        completed run and the output content are left in place.

        Returns (success, error_message).
        """
        run = await self.get_run(run_id)
        if not run:
            return False, f"Run {run_id} not found"

        # Check ownership. Runs without a recorded owner (e.g. in-progress
        # runs, which get_run returns without actor_id) pass this check.
        run_owner = run.get("actor_id")
        if run_owner and run_owner not in (username, actor_id):
            return False, "Access denied"

        # Remove task_id mapping from Redis
        self.redis.delete(f"{self.task_key_prefix}{run_id}")

        # Note: We don't delete from run_cache as that's a permanent record
        # of completed work. The content itself remains in cache.
        return True, None

    async def get_run_plan(self, run_id: str) -> Optional[Dict[str, Any]]:
        """Get execution plan for a run.

        Reads ``<cache_dir>/plans/<run_id>.json``. Returns None when the
        file does not exist or ``run_id`` is not a plain file name.
        Malformed JSON still raises ``json.JSONDecodeError``.
        """
        # run_id becomes part of a path; never let it leave the plans dir.
        if not _is_safe_name(run_id):
            return None
        plan_path = self.cache_dir / "plans" / f"{run_id}.json"
        try:
            with open(plan_path, encoding="utf-8") as f:
                return json.load(f)
        except FileNotFoundError:
            return None

    def _artifact_info(self, content_hash: str, role: str, name: str) -> Optional[Dict]:
        """Describe one cached artifact, or return None if it is not cached."""
        if not self.cache.has_content(content_hash):
            return None
        path = self.cache.get_by_content_hash(content_hash)
        if not (path and path.exists()):
            return None
        return {
            "hash": content_hash,
            "size_bytes": path.stat().st_size,
            "media_type": detect_media_type(path),
            "role": role,
            "step_name": name,
        }

    async def get_run_artifacts(self, run_id: str) -> List[Dict[str, Any]]:
        """Get all artifacts (inputs + outputs) for a run.

        Artifacts whose content is no longer in the cache are skipped.
        Returns an empty list when the run is unknown.
        """
        run = await self.get_run(run_id)
        if not run:
            return []

        artifacts = []

        # Add inputs
        for i, h in enumerate(self._input_list(run.get("inputs"))):
            info = self._artifact_info(h, "input", f"Input {i + 1}")
            if info:
                artifacts.append(info)

        # Add output
        if run.get("output_hash"):
            info = self._artifact_info(run["output_hash"], "output", "Output")
            if info:
                artifacts.append(info)

        return artifacts

    async def get_run_analysis(self, run_id: str) -> List[Dict[str, Any]]:
        """Get analysis data for each input in a run.

        For every input hash, ``<cache_dir>/analysis/<hash>.json`` is
        loaded if present. Missing, unreadable or malformed files yield an
        entry with ``has_analysis`` False. Returns an empty list when the
        run is unknown.
        """
        run = await self.get_run(run_id)
        if not run:
            return []

        analysis_dir = self.cache_dir / "analysis"
        results = []

        for i, input_hash in enumerate(self._input_list(run.get("inputs"))):
            analysis_path = analysis_dir / f"{input_hash}.json"
            analysis_data = None

            try:
                with open(analysis_path, encoding="utf-8") as f:
                    analysis_data = json.load(f)
            except (OSError, ValueError):
                # Best effort: a missing or corrupt analysis file just
                # means "no analysis" for this input. (JSONDecodeError and
                # UnicodeDecodeError are ValueError subclasses.)
                analysis_data = None

            # Only dict payloads have named fields; anything else is
            # passed through untouched under "raw".
            fields = analysis_data if isinstance(analysis_data, dict) else {}
            results.append({
                "input_hash": input_hash,
                "input_name": f"Input {i + 1}",
                "has_analysis": analysis_data is not None,
                "tempo": fields.get("tempo"),
                "beat_times": fields.get("beat_times", []),
                "raw": analysis_data,
            })

        return results

    def detect_media_type(self, path: Path) -> str:
        """Detect media type for a file path."""
        return detect_media_type(path)