"""
S-expression step execution task.

Executes individual steps received as S-expressions. The S-expression is the
canonical format: a step's cache_id is derived from its S-expression, so a
worker can verify it by hashing the received text. (This module does not
currently perform that re-hash itself.)
"""

import json
import logging
import os
import shutil
import socket
import subprocess
import sys
import tempfile
from pathlib import Path
from typing import Dict, List, Optional

from celery import current_task

# Import from the Celery app. The parent of this file's directory is put on
# sys.path so the sibling modules below resolve.
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from celery_app import app
from claiming import (
    get_claimer,
    claim_task,
    complete_task,
    fail_task,
    ClaimStatus,
)
from cache_manager import get_cache_manager

# Import artdag S-expression modules. They are optional at import time; the
# tasks below raise ImportError when invoked without them.
try:
    from artdag.sexp import parse, Symbol, Keyword
    from artdag import NodeType
    from artdag.executor import get_executor
except ImportError:
    parse = None
    Symbol = None
    Keyword = None
    NodeType = None
    get_executor = None

logger = logging.getLogger(__name__)

# Seconds to wait for another worker that already holds the claim on a step.
_CLAIM_WAIT_TIMEOUT = 600

# TRANSFORM effect name -> FFmpeg filter template (the effect value is
# substituted for "{}"). Unknown effect names are ignored.
_TRANSFORM_FILTER_TEMPLATES = {
    "saturation": "eq=saturation={}",
    "brightness": "eq=brightness={}",
    "contrast": "eq=contrast={}",
    "hue": "hue=h={}",
}


def get_worker_id() -> str:
    """Get a unique identifier for this worker ("<hostname>:<pid>")."""
    hostname = socket.gethostname()
    pid = os.getpid()
    return f"{hostname}:{pid}"


def sexp_to_config(sexp) -> Dict:
    """
    Convert parsed S-expression to config dict.

    Input:  (effect :hash "abc123" :inputs ["step1"])
    Output: {"node_type": "EFFECT", "hash": "abc123", "inputs": ["step1"]}

    The head (Symbol or other value) is upper-cased into "node_type". Keyword
    names have "-" replaced by "_". Positional arguments after the head, and a
    trailing keyword with no value, are ignored.

    Raises:
        ValueError: if ``sexp`` is not a non-empty list.
    """
    if not isinstance(sexp, list) or len(sexp) < 1:
        raise ValueError(f"Invalid step S-expression: {sexp}")

    head = sexp[0]
    head_name = head.name if isinstance(head, Symbol) else str(head)
    config = {"node_type": head_name.upper()}

    i = 1
    while i < len(sexp):
        item = sexp[i]
        if isinstance(item, Keyword) and i + 1 < len(sexp):
            config[item.name.replace('-', '_')] = sexp[i + 1]
            i += 2
        else:
            # Positional argument, or a keyword with no following value.
            i += 1
    return config


def _step_result(status: str, step_id: str, cache_id: str, **extra) -> dict:
    """Build the result dict returned by execute_step_sexp."""
    return {"status": status, "step_id": step_id, "cache_id": cache_id, **extra}


def _result_from_other_worker(result, step_id: str, cache_id: str) -> dict:
    """Translate another worker's claim outcome into this task's result dict."""
    if result and result.status == ClaimStatus.COMPLETED:
        return _step_result(
            "completed_by_other", step_id, cache_id, output_path=result.output_path
        )
    if result and result.status == ClaimStatus.CACHED:
        return _step_result("cached", step_id, cache_id, output_path=result.output_path)
    if result and result.status == ClaimStatus.FAILED:
        return _step_result("failed", step_id, cache_id, error=result.error)
    # No result (or any other status) is reported as a timeout.
    return _step_result(
        "timeout", step_id, cache_id, error="Timeout waiting for other worker"
    )


def _resolve_input_paths(inputs, input_cache_ids: Dict[str, str], cache_mgr) -> List[Path]:
    """
    Map step inputs to cached file paths.

    Each input is looked up in ``input_cache_ids`` (falling back to the input
    itself as a cache id) and resolved through ``cache_mgr.get_by_cid``.

    Raises:
        ValueError: on the first input that is not in the cache.
    """
    paths = []
    for inp in inputs:
        inp_cache_id = input_cache_ids.get(inp, inp)
        path = cache_mgr.get_by_cid(inp_cache_id)
        if not path:
            raise ValueError(f"Input not found: {inp_cache_id[:16]}...")
        paths.append(Path(path))
    return paths


def _build_compound_filters(filter_chain: List[Dict]) -> List[str]:
    """Translate a COMPOUND filter chain into FFmpeg ``-vf`` filter strings."""
    filters = []
    for filter_item in filter_chain:
        filter_type = filter_item.get("type", "")
        filter_config = filter_item.get("config", {})

        if filter_type == "TRANSFORM":
            # Transform effects map to FFmpeg filters.
            for eff_name, eff_value in filter_config.get("effects", {}).items():
                template = _TRANSFORM_FILTER_TEMPLATES.get(eff_name)
                if template is not None:
                    filters.append(template.format(eff_value))
        elif filter_type == "RESIZE":
            width = filter_config.get("width", -1)
            height = filter_config.get("height", -1)
            mode = filter_config.get("mode", "fit")
            if mode == "fit":
                filters.append(
                    f"scale={width}:{height}:force_original_aspect_ratio=decrease"
                )
            elif mode == "fill":
                filters.append(
                    f"scale={width}:{height}:force_original_aspect_ratio=increase,"
                    f"crop={width}:{height}"
                )
            else:
                filters.append(f"scale={width}:{height}")
        # EFFECT: TODO resolve the effect to an actual FFmpeg filter; for now
        # it contributes nothing (identity-like).
        # SEGMENT: handled via -ss / -t in _segment_args, not as a filter.
    return filters


def _segment_args(filter_chain: List[Dict]) -> List[str]:
    """Build FFmpeg ``-ss`` / ``-t`` arguments from SEGMENT items in the chain."""
    args = []
    for filter_item in filter_chain:
        if filter_item.get("type") != "SEGMENT":
            continue
        seg_config = filter_item.get("config", {})
        if "start" in seg_config:
            args.extend(["-ss", str(seg_config["start"])])
        if "end" in seg_config:
            duration = seg_config["end"] - seg_config.get("start", 0)
            args.extend(["-t", str(duration)])
        elif "duration" in seg_config:
            args.extend(["-t", str(seg_config["duration"])])
    return args


def _execute_compound(
    config: Dict,
    step_id: str,
    cache_id: str,
    input_cache_ids: Dict[str, str],
    cache_mgr,
    worker_id: str,
) -> dict:
    """
    Run a COMPOUND step (collapsed effect chain) as a single FFmpeg invocation.

    Only the first input is fed to FFmpeg. The output is stored in the cache
    and the claim is completed. The temporary output directory is always
    removed, including when FFmpeg or the cache store fails.

    Raises:
        ValueError: empty filter_chain, no inputs, or a missing input.
        RuntimeError: FFmpeg exited with a non-zero status.
    """
    filter_chain = config.get("filter_chain", [])
    if not filter_chain:
        raise ValueError("COMPOUND step has empty filter_chain")

    input_paths = _resolve_input_paths(config.get("inputs", []), input_cache_ids, cache_mgr)
    if not input_paths:
        raise ValueError("COMPOUND step has no inputs")

    filters = _build_compound_filters(filter_chain)

    output_dir = Path(tempfile.mkdtemp())
    try:
        output_path = output_dir / f"compound_{cache_id[:16]}.mp4"

        cmd = ["ffmpeg", "-y", "-i", str(input_paths[0])]
        cmd.extend(_segment_args(filter_chain))
        if filters:
            cmd.extend(["-vf", ",".join(filters)])
        cmd.extend(["-c:v", "libx264", "-c:a", "aac", str(output_path)])

        logger.info(f"Running COMPOUND FFmpeg: {' '.join(cmd)}")
        # List argv, no shell: config values cannot inject shell syntax.
        proc = subprocess.run(cmd, capture_output=True, text=True)
        if proc.returncode != 0:
            raise RuntimeError(f"FFmpeg failed: {proc.stderr}")

        cached_file, ipfs_cid = cache_mgr.put(
            source_path=output_path,
            node_type="COMPOUND",
            node_id=cache_id,
        )
        logger.info(
            f"COMPOUND step {step_id} completed with {len(filter_chain)} filters, "
            f"IPFS CID: {ipfs_cid}"
        )
        complete_task(cache_id, worker_id, str(cached_file.path))
    finally:
        shutil.rmtree(output_dir, ignore_errors=True)

    return _step_result(
        "completed",
        step_id,
        cache_id,
        output_path=str(cached_file.path),
        cid=cached_file.cid,
        ipfs_cid=ipfs_cid,
        filter_count=len(filter_chain),
    )


def _execute_with_executor(
    node_type: str,
    config: Dict,
    step_id: str,
    cache_id: str,
    input_cache_ids: Dict[str, str],
    cache_mgr,
    worker_id: str,
) -> dict:
    """
    Run a step through the artdag executor registered for ``node_type``.

    "inputs" is popped from ``config`` before the remaining config is passed
    to ``executor.execute``. The result is stored in the cache and the claim
    is completed. The temporary output directory is always removed.

    Raises:
        ValueError: no executor for the node type, or a missing input.
    """
    try:
        node_type_enum = NodeType[node_type]
    except (KeyError, TypeError):
        node_type_enum = node_type

    executor = get_executor(node_type_enum)
    if executor is None:
        raise ValueError(f"No executor for node type: {node_type}")

    input_paths = _resolve_input_paths(config.pop("inputs", []), input_cache_ids, cache_mgr)

    output_dir = Path(tempfile.mkdtemp())
    try:
        output_path = output_dir / f"output_{cache_id[:16]}.mp4"

        logger.info(f"Running executor for {node_type} with {len(input_paths)} inputs")
        result_path = executor.execute(config, input_paths, output_path)

        cached_file, ipfs_cid = cache_mgr.put(
            source_path=result_path,
            node_type=node_type,
            node_id=cache_id,
        )
        logger.info(f"Step {step_id} completed, IPFS CID: {ipfs_cid}")
        complete_task(cache_id, worker_id, str(cached_file.path))
    finally:
        shutil.rmtree(output_dir, ignore_errors=True)

    return _step_result(
        "completed",
        step_id,
        cache_id,
        output_path=str(cached_file.path),
        cid=cached_file.cid,
        ipfs_cid=ipfs_cid,
    )


@app.task(bind=True, name='tasks.execute_step_sexp')
def execute_step_sexp(
    self,
    step_sexp: str,
    step_id: str,
    cache_id: str,
    plan_id: str,
    input_cache_ids: Dict[str, str],
) -> dict:
    """
    Execute a single step from an S-expression.

    The step is received as a serialized S-expression string. Workers can
    verify the cache_id by hashing the S-expression (not done here).

    Args:
        step_sexp: Serialized S-expression for the step
        step_id: Human-readable step identifier
        cache_id: Expected cache_id (SHA3-256 of step_sexp)
        plan_id: ID of the parent execution plan (currently unused here)
        input_cache_ids: Mapping from input step_id to their cache_id

    Returns:
        Dict with "status", "step_id" and "cache_id". On success it also has
        "output_path" (plus "cid"/"ipfs_cid" for newly produced outputs); on
        failure or timeout it has "error". status is one of "completed",
        "cached", "completed_by_other", "failed" or "timeout".

    Raises:
        ImportError: if artdag.sexp is not available.
    """
    if parse is None:
        raise ImportError("artdag.sexp not available")

    worker_id = get_worker_id()
    task_id = self.request.id

    logger.info(f"Executing step {step_id} cache_id={cache_id[:16]}...")
    logger.debug(f"Step S-expression: {step_sexp[:100]}...")

    # Parse the S-expression
    try:
        parsed = parse(step_sexp)
        config = sexp_to_config(parsed)
        node_type = config.pop("node_type")
    except Exception as e:
        logger.error(f"Failed to parse step S-expression: {e}")
        return _step_result("failed", step_id, cache_id, error=f"Parse error: {e}")

    cache_mgr = get_cache_manager()

    # Already produced (by anyone): record it and return without claiming.
    cached_path = cache_mgr.get_by_cid(cache_id)
    if cached_path:
        logger.info(f"Step {step_id} already cached at {cached_path}")
        get_claimer().mark_cached(cache_id, str(cached_path))
        return _step_result("cached", step_id, cache_id, output_path=str(cached_path))

    # Another worker holds the claim: wait for its outcome instead of redoing it.
    if not claim_task(cache_id, worker_id, task_id):
        logger.info(f"Step {step_id} claimed by another worker, waiting...")
        result = get_claimer().wait_for_completion(cache_id, timeout=_CLAIM_WAIT_TIMEOUT)
        return _result_from_other_worker(result, step_id, cache_id)

    claimer = get_claimer()
    try:
        # Inside the try so that a failure here is still reported via
        # fail_task rather than leaving the claim in its claimed state.
        claimer.update_status(cache_id, worker_id, ClaimStatus.RUNNING)

        if node_type == "SOURCE":
            # Support both :cid (new IPFS) and :hash (legacy)
            content_id = config.get("cid") or config.get("hash")
            if not content_id:
                raise ValueError("SOURCE step missing :cid or :hash")
            path = cache_mgr.get_by_cid(content_id)
            if not path:
                raise ValueError(f"SOURCE input not found: {content_id[:16]}...")
            output_path = str(path)
            complete_task(cache_id, worker_id, output_path)
            return _step_result("completed", step_id, cache_id, output_path=output_path)

        if node_type == "EFFECT":
            effect_hash = config.get("cid") or config.get("hash")
            if not effect_hash:
                raise ValueError("EFFECT step missing :cid")

            input_paths = _resolve_input_paths(
                config.get("inputs", []), input_cache_ids, cache_mgr
            )

            try:
                # Effects use SOURCE executor for now
                executor = get_executor(NodeType.SOURCE)
            except Exception as e:
                logger.warning(f"No SOURCE executor for EFFECT step {step_id}: {e}")
                executor = None

            if executor is None:
                # Fallback: pass the first input through (identity-like behavior)
                if not input_paths:
                    raise ValueError("No executor for EFFECT and no inputs")
                output_path = str(input_paths[0])
                complete_task(cache_id, worker_id, output_path)
                return _step_result(
                    "completed", step_id, cache_id, output_path=output_path
                )
            # NOTE(review): when a SOURCE executor exists it is only used as an
            # availability check; execution falls through to the generic path
            # below, which looks up the executor for NodeType.EFFECT.

        if node_type == "COMPOUND":
            return _execute_compound(
                config, step_id, cache_id, input_cache_ids, cache_mgr, worker_id
            )

        return _execute_with_executor(
            node_type, config, step_id, cache_id, input_cache_ids, cache_mgr, worker_id
        )

    except Exception as e:
        logger.exception(f"Step {step_id} failed: {e}")
        fail_task(cache_id, worker_id, str(e))
        return _step_result("failed", step_id, cache_id, error=str(e))


def _parse_plan_inputs(block) -> Dict:
    """Parse an ``(inputs (name value) ...)`` block into a name -> value dict."""
    inputs = {}
    for entry in block[1:]:
        if isinstance(entry, list) and len(entry) >= 2:
            name = entry[0].name if isinstance(entry[0], Symbol) else str(entry[0])
            inputs[name] = entry[1]
    return inputs


def _parse_plan_step(block) -> Optional[Dict]:
    """
    Parse a ``(step ID :cache-id X :level N (node ...))`` block.

    Returns:
        Keyword arguments for PlanStep, or None when the step id, cache-id
        or node S-expression is missing.
    """
    step_id = block[1] if len(block) > 1 else None
    step_cache_id = None
    step_level = 0
    step_node = None

    j = 2
    while j < len(block):
        sub = block[j]
        if isinstance(sub, Keyword):
            if sub.name == "cache-id" and j + 1 < len(block):
                step_cache_id = block[j + 1]
                j += 2
            elif sub.name == "level" and j + 1 < len(block):
                step_level = block[j + 1]
                j += 2
            else:
                j += 1
        elif isinstance(sub, list):
            # The (last) nested list is the step's node S-expression.
            step_node = sub
            j += 1
        else:
            j += 1

    if not (step_id and step_cache_id and step_node):
        return None

    config = sexp_to_config(step_node)
    node_type = config.pop("node_type")
    step_inputs = config.pop("inputs", [])
    return {
        "step_id": step_id,
        "node_type": node_type,
        "config": config,
        "inputs": step_inputs if isinstance(step_inputs, list) else [],
        "cache_id": step_cache_id,
        "level": step_level,
    }


@app.task(bind=True, name='tasks.run_plan_sexp')
def run_plan_sexp(
    self,
    plan_sexp: str,
    run_id: Optional[str] = None,
) -> dict:
    """
    Execute a complete S-expression execution plan.

    The plan's top-level keywords (:id, :recipe, :recipe-hash, :output) and
    its (inputs ...) / (step ...) blocks are parsed into an ExecutionPlanSexp,
    which is handed to a PlanScheduler that dispatches each step to
    'tasks.execute_step_sexp'.

    Args:
        plan_sexp: Serialized S-expression plan
        run_id: Optional run ID for tracking

    Returns:
        Dict with execution results

    Raises:
        ImportError: if artdag.sexp is not available.
    """
    if parse is None:
        raise ImportError("artdag.sexp not available")

    from artdag.sexp.scheduler import PlanScheduler
    from artdag.sexp.planner import ExecutionPlanSexp, PlanStep

    logger.info(f"Running plan from S-expression (run_id={run_id})")

    parsed = parse(plan_sexp)

    metadata = {}
    inputs = {}
    steps = []

    # Element 0 is the plan head; walk the rest.
    i = 1
    while i < len(parsed):
        item = parsed[i]
        if isinstance(item, Keyword):
            if i + 1 < len(parsed):
                metadata[item.name] = parsed[i + 1]
                i += 2
            else:
                i += 1
            continue

        if isinstance(item, list) and len(item) > 0 and isinstance(item[0], Symbol):
            if item[0].name == "inputs":
                inputs.update(_parse_plan_inputs(item))
            elif item[0].name == "step":
                step_kwargs = _parse_plan_step(item)
                if step_kwargs is None:
                    logger.warning(
                        "Skipping incomplete plan step (needs id, :cache-id and a node): %s",
                        str(item)[:200],
                    )
                else:
                    steps.append(PlanStep(**step_kwargs))
        i += 1

    output_step_id = metadata.get("output")
    plan = ExecutionPlanSexp(
        plan_id=metadata.get("id") or "unknown",
        recipe_id=metadata.get("recipe") or "unknown",
        recipe_hash=metadata.get("recipe-hash") or "",
        steps=steps,
        output_step_id=output_step_id or (steps[-1].step_id if steps else ""),
        inputs=inputs,
    )

    cache_mgr = get_cache_manager()
    scheduler = PlanScheduler(
        cache_manager=cache_mgr,
        celery_app=app,
        execute_task_name='tasks.execute_step_sexp',
    )

    result = scheduler.schedule(plan)

    return {
        "status": result.status,
        "run_id": run_id,
        "plan_id": result.plan_id,
        "output_cache_id": result.output_cache_id,
        "output_path": result.output_path,
        "output_ipfs_cid": result.output_ipfs_cid,
        "steps_completed": result.steps_completed,
        "steps_cached": result.steps_cached,
        "steps_failed": result.steps_failed,
        "error": result.error,
    }