#!/usr/bin/env python3 """ Execute a pre-computed plan. Takes a plan file (S-expression) and executes primitive operations, storing artifacts by their content hash. Usage: analyze.py recipe.sexp > analysis.sexp plan.py recipe.sexp --analysis analysis.sexp --sexp > plan.sexp execute.py plan.sexp --analysis analysis.sexp """ import json import shutil import subprocess import sys import tempfile import importlib.util from pathlib import Path from typing import List # Add artdag to path sys.path.insert(0, str(Path(__file__).parent.parent / "artdag")) from artdag.sexp import parse from artdag.sexp.parser import Symbol, Keyword import time import os import threading import concurrent.futures from itertools import groupby # Limit concurrent raw-video pipelines to prevent memory exhaustion. # Each pipeline holds raw frames in memory (e.g. ~6MB per 1080p frame) # and spawns 2+ ffmpeg subprocesses. When the ThreadPoolExecutor runs # many EFFECT steps in parallel the combined load can freeze the system. # Default: 1 concurrent pipeline; override with ARTDAG_VIDEO_PIPELINES. 
_MAX_VIDEO_PIPELINES = int(os.environ.get("ARTDAG_VIDEO_PIPELINES", 1))
_video_pipeline_sem = threading.Semaphore(_MAX_VIDEO_PIPELINES)


def set_max_video_pipelines(n: int):
    """Reconfigure the video-pipeline concurrency limit at runtime.

    Replaces the module-level semaphore; calls already holding the old
    semaphore are unaffected.
    """
    global _video_pipeline_sem, _MAX_VIDEO_PIPELINES
    _MAX_VIDEO_PIPELINES = n
    _video_pipeline_sem = threading.Semaphore(n)


def _video_pipeline_guard(fn):
    """Decorator: acquire the video-pipeline semaphore for the call's duration."""
    from functools import wraps

    @wraps(fn)
    def _guarded(*args, **kwargs):
        _video_pipeline_sem.acquire()
        try:
            return fn(*args, **kwargs)
        finally:
            _video_pipeline_sem.release()

    return _guarded


class ProgressBar:
    """Simple console progress bar with ETA, rendered to stderr.

    Rendering is throttled: the bar is redrawn only after the counter has
    advanced by at least `update_interval` since the last render.
    """

    def __init__(self, total: int, desc: str = "", width: int = 30, update_interval: int = 30):
        self.total = total                    # expected total units (0 = unknown)
        self.desc = desc                      # label shown before the bar
        self.width = width                    # bar width in characters
        self.current = 0                      # units completed so far
        self.start_time = time.time()         # used for elapsed/ETA
        self.update_interval = update_interval  # min delta between renders
        self._last_render = 0                 # counter value at last render

    def update(self, n: int = 1):
        """Advance the counter by n and re-render if due."""
        self.current += n
        if self.current - self._last_render >= self.update_interval:
            self._render()
            self._last_render = self.current

    def set(self, n: int):
        """Set the counter to an absolute value and re-render if due."""
        self.current = n
        if self.current - self._last_render >= self.update_interval:
            self._render()
            self._last_render = self.current

    def _render(self):
        elapsed = time.time() - self.start_time
        if self.total == 0:
            # Unknown total - just show count
            line = f"\r {self.desc} {self.current} frames ({elapsed:.1f}s)"
            print(line, end="", file=sys.stderr, flush=True)
            return
        pct = self.current / self.total
        filled = int(self.width * pct)
        bar = "█" * filled + "░" * (self.width - filled)
        if self.current > 0 and pct < 1.0:
            # Linear extrapolation from progress so far
            eta = elapsed / pct - elapsed
            eta_str = f"ETA {eta:.0f}s"
        elif pct >= 1.0:
            eta_str = f"done in {elapsed:.1f}s"
        else:
            eta_str = "..."
        line = f"\r {self.desc} |{bar}| {self.current}/{self.total} ({pct*100:.0f}%) {eta_str}"
        print(line, end="", file=sys.stderr, flush=True)

    def finish(self):
        """Render the final state and terminate the line."""
        self._render()
        print(file=sys.stderr)  # newline


def check_cache(cache_dir: Path, cache_id: str, extensions: list) -> Path:
    """Check if a cached result exists for a step using IPNS/CID lookup.

    Args:
        cache_dir: Cache directory (used for unified cache)
        cache_id: IPNS address (computation hash, known before execution)
        extensions: List of possible file extensions (for legacy compatibility;
            unused by the unified-cache lookup)

    Returns:
        Path to cached content file if found, None otherwise
    """
    import cache as unified_cache

    # Look up IPNS → CID mapping
    cached_path = unified_cache.cache_exists(cache_id)
    if cached_path:
        return cached_path
    return None


def save_to_cache(cache_dir: Path, cache_id: str, source_path: Path) -> Path:
    """Save a result to cache using IPNS/CID structure.

    Args:
        cache_dir: Cache directory (used for unified cache)
        cache_id: IPNS address (computation hash, known before execution)
        source_path: Path to the file to cache

    Returns:
        Path to the cached content file
    """
    import cache as unified_cache

    # Store content by CID, create IPNS → CID ref
    cid, cached_path = unified_cache.cache_store_file(cache_id, source_path)
    return cached_path


def extract_segment_with_loop(input_path: Path, output_path: Path, start: float, duration: float, encoding: dict) -> Path:
    """Extract a segment from a video, looping the source if needed to reach requested duration.

    Args:
        input_path: Source video file
        output_path: Output segment file
        start: Start time in seconds
        duration: Requested duration in seconds
        encoding: Encoding settings dict

    Returns:
        Path to the output segment

    Raises:
        ValueError: if ffmpeg fails even with looping, or the output is
            missing/empty.
    """
    enc = encoding
    fps = enc.get("fps", 30)

    # First attempt without looping
    cmd = ["ffmpeg", "-y", "-i", str(input_path)]
    if start:
        cmd.extend(["-ss", str(start)])
    if duration:
        cmd.extend(["-t", str(duration)])
    cmd.extend(["-r", str(fps),
                "-c:v", enc["codec"], "-preset", enc["preset"], "-crf", str(enc["crf"]),
                "-pix_fmt", "yuv420p",
                "-c:a", enc.get("audio_codec", "aac"),
                str(output_path)])
    print(f" Extracting segment: start={start}, duration={duration}", file=sys.stderr)
    result = subprocess.run(cmd, capture_output=True, text=True)

    # Check if we need to loop: probe the produced segment and compare its
    # duration to what was requested.
    needs_loop = False
    if result.returncode == 0 and duration:
        probe_cmd = ["ffprobe", "-v", "quiet", "-print_format", "json", "-show_format", str(output_path)]
        probe_result = subprocess.run(probe_cmd, capture_output=True, text=True)
        if probe_result.returncode == 0:
            probe_data = json.loads(probe_result.stdout)
            output_duration = float(probe_data.get("format", {}).get("duration", 0))
            if output_duration < duration - 1.0:  # 1 second tolerance
                needs_loop = True
                print(f" Output {output_duration:.1f}s < requested {duration:.1f}s, will loop", file=sys.stderr)

    if needs_loop or result.returncode != 0:
        # Get source duration for wrapping the start offset into range
        probe_cmd = ["ffprobe", "-v", "quiet", "-print_format", "json", "-show_format", str(input_path)]
        probe_result = subprocess.run(probe_cmd, capture_output=True, text=True)
        probe_data = json.loads(probe_result.stdout)
        src_duration = float(probe_data.get("format", {}).get("duration", 0))
        if src_duration > 0:
            wrapped_start = start % src_duration if start else 0
            print(f" Looping source ({src_duration:.1f}s) to reach {duration:.1f}s", file=sys.stderr)
            # Re-run with stream_loop (-1 = loop indefinitely; -t bounds output)
            cmd = ["ffmpeg", "-y", "-stream_loop", "-1", "-i", str(input_path)]
            cmd.extend(["-ss", str(wrapped_start)])
            if duration:
                cmd.extend(["-t", str(duration)])
            cmd.extend(["-r", str(fps),
                        "-c:v", enc["codec"], "-preset", enc["preset"], "-crf", str(enc["crf"]),
                        "-pix_fmt", "yuv420p",
                        "-c:a", enc.get("audio_codec", "aac"),
                        str(output_path)])
            result = subprocess.run(cmd, capture_output=True, text=True)

    if result.returncode != 0:
        print(f" FFmpeg loop error: {result.stderr[:200]}", file=sys.stderr)
        raise ValueError(f"FFmpeg segment extraction with loop failed")
    if not output_path.exists() or output_path.stat().st_size == 0:
        raise ValueError(f"Segment output invalid: {output_path}")
    print(f" Segment: {output_path.stat().st_size / 1024 / 1024:.1f}MB", file=sys.stderr)
    return output_path


def clean_nil_symbols(obj):
    """Recursively convert Symbol('nil') to None and filter out None values from dicts."""
    if isinstance(obj, Symbol):
        if obj.name == 'nil':
            return None
        return obj
    elif isinstance(obj, dict):
        result = {}
        for k, v in obj.items():
            cleaned = clean_nil_symbols(v)
            # Skip None values (they were nil)
            if cleaned is not None:
                result[k] = cleaned
        return result
    elif isinstance(obj, list):
        return [clean_nil_symbols(v) for v in obj]
    return obj


# NOTE(review): an earlier duplicate definition `parse_analysis_sexp(content: str)`
# that parsed a raw string was removed here — it was fully shadowed by the
# `parse_analysis_sexp(sexp)` definition below (Python binds the name at module
# execution time, so every call site already resolved to the later definition).


def sexp_to_plan(sexp) -> dict:
    """Convert a parsed S-expression plan to a dict.

    Recognized top-level keywords are remapped to the internal plan keys
    (:output → output_step_id, :id → plan_id, :source-cid → source_hash);
    nested (step ...), (analysis ...) and (effects-registry ...) forms are
    parsed by their dedicated helpers.
    """
    if not isinstance(sexp, list) or not sexp:
        raise ValueError("Invalid plan S-expression")
    # Skip 'plan' symbol and name
    plan = {
        "steps": [],
        "analysis": {},
    }
    i = 0
    if isinstance(sexp[0], Symbol) and sexp[0].name == "plan":
        i = 1
    # Parse keywords and steps
    while i < len(sexp):
        item = sexp[i]
        if isinstance(item, Keyword):
            key = item.name.replace("-", "_")
            i += 1
            if i < len(sexp):
                value = sexp[i]
                if key == "encoding" and isinstance(value, list):
                    # Parse encoding dict from sexp
                    plan["encoding"] = sexp_to_dict(value)
                elif key == "output":
                    # Map :output to output_step_id
                    plan["output_step_id"] = value
                elif key == "id":
                    # Map :id to plan_id
                    plan["plan_id"] = value
                elif key == "source_cid":
                    # Map :source-cid to source_hash
                    plan["source_hash"] = value
                else:
                    plan[key] = value
                i += 1
        elif isinstance(item, list) and item and isinstance(item[0], Symbol):
            if item[0].name == "step":
                # Parse step
                step = parse_step_sexp(item)
                plan["steps"].append(step)
            elif item[0].name == "analysis":
                # Parse analysis data
                plan["analysis"] = parse_analysis_sexp(item)
            elif item[0].name == "effects-registry":
                # Parse effects registry
                plan["effects_registry"] = parse_effects_registry_sexp(item)
            i += 1
        else:
            i += 1
    return plan


def parse_analysis_sexp(sexp) -> dict:
    """Parse analysis S-expression: (analysis (bass :times [...] :values [...]) ...)

    Handles both inline data (:times [...] :values [...]) and cache-id refs
    (:cache-id "...").
    """
    analysis = {}
    for item in sexp[1:]:  # Skip 'analysis' symbol
        if isinstance(item, list) and item and isinstance(item[0], Symbol):
            name = item[0].name
            data = {}
            j = 1
            while j < len(item):
                if isinstance(item[j], Keyword):
                    key = item[j].name
                    j += 1
                    if j < len(item):
                        data[key] = item[j]
                        j += 1
                else:
                    j += 1
            # Normalize: parser gives "cache-id", internal code expects "_cache_id"
            if "cache-id" in data:
                data["_cache_id"] = data.pop("cache-id")
            analysis[name] = data
    return analysis


def parse_effects_registry_sexp(sexp) -> dict:
    """Parse effects-registry S-expression: (effects-registry (rotate :path "...") (blur :path "..."))"""
    registry = {}
    for item in sexp[1:]:  # Skip 'effects-registry' symbol
        if isinstance(item, list) and item and isinstance(item[0], Symbol):
            name = item[0].name
            data = {}
            j = 1
            while j < len(item):
                if isinstance(item[j], Keyword):
                    key = item[j].name
                    j += 1
                    if j < len(item):
                        data[key] = item[j]
                        j += 1
                else:
                    j += 1
            registry[name] = data
    return registry


def parse_bind_sexp(sexp) -> dict:
    """Parse a bind S-expression: (bind analysis-ref :range [min max] :offset 60 :transform sqrt)

    Returns a bind dict with defaults (range 0-1, no transform, offset 0),
    or None if the form is not a valid (bind ...) expression.
    """
    if not isinstance(sexp, list) or len(sexp) < 2:
        return None
    if not isinstance(sexp[0], Symbol) or sexp[0].name != "bind":
        return None
    bind = {
        "_bind": sexp[1] if isinstance(sexp[1], str) else sexp[1].name if isinstance(sexp[1], Symbol) else str(sexp[1]),
        "range_min": 0.0,
        "range_max": 1.0,
        "transform": None,
        "offset": 0.0,
    }
    i = 2
    while i < len(sexp):
        if isinstance(sexp[i], Keyword):
            kw = sexp[i].name
            if kw == "range":
                i += 1
                if i < len(sexp) and isinstance(sexp[i], list) and len(sexp[i]) >= 2:
                    bind["range_min"] = float(sexp[i][0])
                    bind["range_max"] = float(sexp[i][1])
            elif kw == "offset":
                i += 1
                if i < len(sexp):
                    bind["offset"] = float(sexp[i])
            elif kw == "transform":
                i += 1
                if i < len(sexp):
                    t = sexp[i]
                    if isinstance(t, Symbol):
                        bind["transform"] = t.name
                    elif isinstance(t, str):
                        bind["transform"] = t
        i += 1
    return bind


def sexp_to_dict(sexp) -> dict:
    """Convert S-expression key-value pairs to dict."""
    result = {}
    i = 0
    while i < len(sexp):
        if isinstance(sexp[i], Keyword):
            key = sexp[i].name.replace("-", "_")
            i += 1
            if i < len(sexp):
                value = sexp[i]
                # Check for bind expression and convert to dict format
                if isinstance(value, list) and value and isinstance(value[0], Symbol) and value[0].name == "bind":
                    value = parse_bind_sexp(value)
                result[key] = value
                i += 1
        else:
            i += 1
    return result


def parse_step_sexp(sexp) -> dict:
    """Parse a step S-expression.

    Supports two formats:
    1. (step "id" :cache-id "..." :type "SOURCE" :path "..." :inputs [...])
    2. (step "id" :cache-id "..." :level 1 (source :path "..." :inputs [...]))
    """
    step = {
        "inputs": [],
        "config": {},
    }
    i = 1  # Skip 'step' symbol
    if i < len(sexp) and isinstance(sexp[i], str):
        step["step_id"] = sexp[i]
        i += 1
    while i < len(sexp):
        item = sexp[i]
        if isinstance(item, Keyword):
            key = item.name.replace("-", "_")
            i += 1
            if i < len(sexp):
                value = sexp[i]
                if key == "type":
                    step["node_type"] = value if isinstance(value, str) else value.name
                elif key == "inputs":
                    step["inputs"] = value if isinstance(value, list) else [value]
                elif key in ("level", "cache", "cache_id"):
                    if key == "cache":
                        key = "cache_id"
                    step[key] = value
                else:
                    # Check for bind expression
                    if isinstance(value, list) and value and isinstance(value[0], Symbol) and value[0].name == "bind":
                        value = parse_bind_sexp(value)
                    # Config value
                    step["config"][key] = value
                i += 1
        elif isinstance(item, list) and item and isinstance(item[0], Symbol):
            # Nested node expression: (source :path "..." :inputs [...])
            node_type = item[0].name.upper()
            step["node_type"] = node_type
            # Parse node config
            j = 1
            while j < len(item):
                if isinstance(item[j], Keyword):
                    key = item[j].name.replace("-", "_")
                    j += 1
                    if j < len(item):
                        value = item[j]
                        if key == "inputs":
                            step["inputs"] = value if isinstance(value, list) else [value]
                        else:
                            # Check for bind expression
                            if isinstance(value, list) and value and isinstance(value[0], Symbol) and value[0].name == "bind":
                                value = parse_bind_sexp(value)
                            step["config"][key] = value
                        j += 1
                else:
                    j += 1
            i += 1
        else:
            i += 1
    return step


def parse_plan_input(content: str) -> dict:
    """Parse plan from JSON or S-expression string."""
    content = content.strip()
    if content.startswith("{"):
        return json.loads(content)
    elif content.startswith("("):
        sexp = parse(content)
        return sexp_to_plan(sexp[0] if isinstance(sexp, list) and len(sexp) == 1 else sexp)
    else:
        raise ValueError("Plan must be JSON (starting with '{') or S-expression (starting with '(')")


# Default encoding settings
DEFAULT_ENCODING = {
    "codec": "libx264",
    "preset": "fast",
    "crf": 18,
    "audio_codec": "aac",
    "fps": 30,
}


def get_encoding(recipe_encoding: dict, step_config: dict) -> dict:
    """Merge encoding settings: defaults < recipe < step overrides."""
    encoding = {**DEFAULT_ENCODING}
    encoding.update(recipe_encoding)
    if "encoding" in step_config:
        encoding.update(step_config["encoding"])
    return encoding


class SexpEffectModule:
    """Wrapper for S-expression effects to provide process_frame interface."""

    def __init__(self, effect_path: Path, effects_registry: dict = None, recipe_dir: Path = None, minimal_primitives: bool = False):
        from sexp_effects import get_interpreter
        self.interp = get_interpreter(minimal_primitives=minimal_primitives)
        # Load only explicitly declared effects from the recipe's registry
        # No auto-loading from directory - everything must be explicit
        if effects_registry:
            base_dir = recipe_dir or effect_path.parent.parent
            # Resolve relative paths
            for effect_name, effect_info in effects_registry.items():
                effect_rel_path = effect_info.get("path")
                if effect_rel_path:
                    full_path = (base_dir / effect_rel_path).resolve()
                    if full_path.exists() and effect_name not in self.interp.effects:
                        self.interp.load_effect(str(full_path))
        # Load the specific effect if not already loaded
        self.interp.load_effect(str(effect_path))
        self.effect_name = effect_path.stem

    def process_frame(self, frame, params, state):
        """Run the wrapped S-expression effect on one frame; returns (frame, state)."""
        return self.interp.run_effect(self.effect_name, frame, params, state or {})


def load_effect(effect_path: Path, effects_registry: dict = None, recipe_dir: Path = None, minimal_primitives: bool = False):
    """Load an effect module from a local path (.py or .sexp)."""
    if effect_path.suffix == ".sexp":
        return SexpEffectModule(effect_path, effects_registry, recipe_dir, minimal_primitives)
    spec = importlib.util.spec_from_file_location("effect", effect_path)
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)
    return module


def interpolate_analysis(times: list, values: list, t: float) -> float:
    """Interpolate analysis value at time t.

    Clamps to the first/last value outside the covered time range; returns
    0.0 for empty tracks.
    """
    if not times or not values:
        return 0.0
    if t <= times[0]:
        return values[0]
    if t >= times[-1]:
        return values[-1]
    # Binary search for surrounding times
    lo, hi = 0, len(times) - 1
    while lo < hi - 1:
        mid = (lo + hi) // 2
        if times[mid] <= t:
            lo = mid
        else:
            hi = mid
    # Linear interpolation
    t0, t1 = times[lo], times[hi]
    v0, v1 = values[lo], values[hi]
    if t1 == t0:
        return v0
    alpha = (t - t0) / (t1 - t0)
    return v0 + alpha * (v1 - v0)


def apply_transform(value: float, transform: str) -> float:
    """Apply a transform function to a value (0-1 range)."""
    if transform is None:
        return value
    if transform == "sqrt":
        return value ** 0.5
    elif transform == "pow2":
        return value ** 2
    elif transform == "pow3":
        return value ** 3
    elif transform == "log":
        # Logarithmic scale: log(1 + 9*x) / log(10) maps 0-1 to 0-1 with log curve
        import math
        return math.log(1 + 9 * value) / math.log(10) if value > 0 else 0
    elif transform == "exp":
        # Exponential scale: (10^x - 1) / 9 maps 0-1 to 0-1 with exp curve
        return (10 ** value - 1) / 9
    elif transform == "inv":
        return 1 - value
    else:
        # Unknown transform names pass the value through unchanged
        return value


def eval_expr(value, frame_time: float, frame_num: int, analysis_data: dict) -> float:
    """
    Evaluate a runtime expression.

    Supports:
    - Literals (int, float)
    - Bindings: {"_binding": True, "source": ..., "feature": ...}
    - Math expressions: {"_expr": True, "op": "+", "args": [...]}
    - Time/frame: {"_expr": True, "op": "time"} or {"_expr": True, "op": "frame"}
    """
    import math

    # Literal values
    if isinstance(value, (int, float)):
        return float(value)
    if not isinstance(value, dict):
        return 0.0  # Unknown type

    # Handle bindings
    if "_bind" in value or "_binding" in value:
        if "_bind" in value:
            # New format: flat range_min/range_max keys
            ref = value["_bind"]
            range_min = value.get("range_min", 0.0)
            range_max = value.get("range_max", 1.0)
        else:
            # Old format: "range" is a [min, max] list
            ref = value.get("source", "")
            range_val = value.get("range", [0.0, 1.0])
            range_min = range_val[0] if isinstance(range_val, list) else 0.0
            range_max = range_val[1] if isinstance(range_val, list) and len(range_val) > 1 else 1.0
        transform = value.get("transform")
        bind_offset = value.get("offset", 0.0)
        track = analysis_data.get(ref, {})
        times = track.get("times", [])
        values = track.get("values", [])
        lookup_time = frame_time + bind_offset
        raw = interpolate_analysis(times, values, lookup_time)
        transformed = apply_transform(raw, transform)
        return range_min + transformed * (range_max - range_min)

    # Handle expressions
    if "_expr" in value:
        op = value.get("op")
        args = value.get("args", [])

        # Special ops without args
        if op == "time":
            return frame_time
        if op == "frame":
            return float(frame_num)

        # Lazy-evaluated ops (don't evaluate all branches)
        if op == "if":
            cond = eval_expr(args[0], frame_time, frame_num, analysis_data) if args else 0.0
            if cond:
                return eval_expr(args[1], frame_time, frame_num, analysis_data) if len(args) > 1 else 0.0
            return eval_expr(args[2], frame_time, frame_num, analysis_data) if len(args) > 2 else 0.0

        # Evaluate arguments recursively
        evaluated = [eval_expr(arg, frame_time, frame_num, analysis_data) for arg in args]

        # Comparison operations (truthiness encoded as 1.0 / 0.0)
        if op == "<" and len(evaluated) >= 2:
            return 1.0 if evaluated[0] < evaluated[1] else 0.0
        if op == ">" and len(evaluated) >= 2:
            return 1.0 if evaluated[0] > evaluated[1] else 0.0
        if op == "<=" and len(evaluated) >= 2:
            return 1.0 if evaluated[0] <= evaluated[1] else 0.0
        if op == ">=" and len(evaluated) >= 2:
            return 1.0 if evaluated[0] >= evaluated[1] else 0.0
        if op == "=" and len(evaluated) >= 2:
            return 1.0 if evaluated[0] == evaluated[1] else 0.0

        # Math operations (division/modulo by zero yields 0.0)
        if op == "+" and len(evaluated) >= 2:
            return evaluated[0] + evaluated[1]
        if op == "-" and len(evaluated) >= 2:
            return evaluated[0] - evaluated[1]
        if op == "*" and len(evaluated) >= 2:
            return evaluated[0] * evaluated[1]
        if op == "/" and len(evaluated) >= 2:
            return evaluated[0] / evaluated[1] if evaluated[1] != 0 else 0.0
        if op == "mod" and len(evaluated) >= 2:
            return evaluated[0] % evaluated[1] if evaluated[1] != 0 else 0.0
        if op == "min" and len(evaluated) >= 2:
            return min(evaluated[0], evaluated[1])
        if op == "max" and len(evaluated) >= 2:
            return max(evaluated[0], evaluated[1])
        if op == "abs" and len(evaluated) >= 1:
            return abs(evaluated[0])
        if op == "sin" and len(evaluated) >= 1:
            return math.sin(evaluated[0])
        if op == "cos" and len(evaluated) >= 1:
            return math.cos(evaluated[0])
        if op == "floor" and len(evaluated) >= 1:
            return float(math.floor(evaluated[0]))
        if op == "ceil" and len(evaluated) >= 1:
            return float(math.ceil(evaluated[0]))

    return 0.0  # Fallback


def eval_scan_expr(value, rng, variables):
    """
    Evaluate a scan expression with seeded RNG and variable bindings.

    Args:
        value: Compiled expression (literal, dict with _expr, etc.)
        rng: random.Random instance (seeded, advances state per call)
        variables: Dict of variable bindings (acc, rem, hue, etc.)

    Returns:
        Evaluated value (number or dict)
    """
    import math

    if isinstance(value, (int, float)):
        return value
    if isinstance(value, str):
        return value
    if not isinstance(value, dict) or "_expr" not in value:
        return value

    op = value.get("op")
    args = value.get("args", [])

    # Variable reference
    if op == "var":
        name = value.get("name", "")
        return variables.get(name, 0)

    # Dict constructor
    if op == "dict":
        keys = value.get("keys", [])
        vals = [eval_scan_expr(a, rng, variables) for a in args]
        return dict(zip(keys, vals))

    # Random ops (advance RNG state)
    if op == "rand":
        return rng.random()
    if op == "rand-int":
        lo = int(eval_scan_expr(args[0], rng, variables))
        hi = int(eval_scan_expr(args[1], rng, variables))
        return rng.randint(lo, hi)
    if op == "rand-range":
        lo = float(eval_scan_expr(args[0], rng, variables))
        hi = float(eval_scan_expr(args[1], rng, variables))
        return rng.uniform(lo, hi)

    # Conditional (lazy - only evaluate taken branch)
    if op == "if":
        cond = eval_scan_expr(args[0], rng, variables) if args else 0
        if cond:
            return eval_scan_expr(args[1], rng, variables) if len(args) > 1 else 0
        return eval_scan_expr(args[2], rng, variables) if len(args) > 2 else 0

    # Comparison ops
    if op in ("<", ">", "<=", ">=", "="):
        left = eval_scan_expr(args[0], rng, variables) if args else 0
        right = eval_scan_expr(args[1], rng, variables) if len(args) > 1 else 0
        if op == "<":
            return 1 if left < right else 0
        if op == ">":
            return 1 if left > right else 0
        if op == "<=":
            return 1 if left <= right else 0
        if op == ">=":
            return 1 if left >= right else 0
        if op == "=":
            return 1 if left == right else 0

    # Eagerly evaluate remaining args
    evaluated = [eval_scan_expr(a, rng, variables) for a in args]

    # Arithmetic ops
    if op == "+" and len(evaluated) >= 2:
        return evaluated[0] + evaluated[1]
    if op == "-" and len(evaluated) >= 2:
        return evaluated[0] - evaluated[1]
    if op == "-" and len(evaluated) == 1:
        # Unary negation
        return -evaluated[0]
    if op == "*" and len(evaluated) >= 2:
        return evaluated[0] * evaluated[1]
    if op == "/" and len(evaluated) >= 2:
        return evaluated[0] / evaluated[1] if evaluated[1] != 0 else 0
    if op == "mod" and len(evaluated) >= 2:
        return evaluated[0] % evaluated[1] if evaluated[1] != 0 else 0
    if op == "min" and len(evaluated) >= 2:
        return min(evaluated[0], evaluated[1])
    if op == "max" and len(evaluated) >= 2:
        return max(evaluated[0], evaluated[1])
    if op == "abs" and len(evaluated) >= 1:
        return abs(evaluated[0])
    if op == "sin" and len(evaluated) >= 1:
        return math.sin(evaluated[0])
    if op == "cos" and len(evaluated) >= 1:
        return math.cos(evaluated[0])
    if op == "floor" and len(evaluated) >= 1:
        return math.floor(evaluated[0])
    if op == "ceil" and len(evaluated) >= 1:
        return math.ceil(evaluated[0])
    if op == "nth" and len(evaluated) >= 2:
        collection = evaluated[0]
        index = int(evaluated[1])
        if isinstance(collection, (list, tuple)) and 0 <= index < len(collection):
            return collection[index]
        return 0

    return 0  # Fallback


def _is_binding(value):
    """Check if a value is a binding/expression dict that needs per-frame resolution."""
    return isinstance(value, dict) and ("_bind" in value or "_binding" in value or "_expr" in value)


def _check_has_bindings(params: dict) -> bool:
    """Check if any param value (including inside lists) contains bindings."""
    for v in params.values():
        if _is_binding(v):
            return True
        if isinstance(v, list) and any(_is_binding(item) for item in v):
            return True
    return False


def resolve_params(params: dict, frame_time: float, analysis_data: dict, frame_num: int = 0) -> dict:
    """Resolve any binding/expression params using analysis data at frame_time.

    Handles bindings at the top level and inside lists (e.g. blend_multi weights).
    """
    resolved = {}
    for key, value in params.items():
        if _is_binding(value):
            resolved[key] = eval_expr(value, frame_time, frame_num, analysis_data)
        elif isinstance(value, list):
            resolved[key] = [
                eval_expr(item, frame_time, frame_num, analysis_data) if _is_binding(item) else item
                for item in value
            ]
        else:
            resolved[key] = value
    return resolved


def resolve_scalar_binding(value, analysis_data: dict):
    """Resolve a scalar binding (like duration) from analysis data.

    For scalar features like 'duration', retrieves the value directly from
    analysis data. For time-varying features, this returns None (use
    resolve_params instead).

    Returns:
        Resolved value (float) if binding can be resolved to scalar, None
        otherwise. If value is not a binding, returns the value unchanged.
    """
    if not isinstance(value, dict) or not ("_bind" in value or "_binding" in value):
        return value
    # Get source reference and feature
    if "_bind" in value:
        ref = value["_bind"]
        feature = "values"  # old format defaults to values
    else:
        ref = value.get("source", "")
        feature = value.get("feature", "values")
    # Look up analysis track
    track = analysis_data.get(ref, {})
    # For scalar features like 'duration', get directly
    if feature == "duration":
        duration = track.get("duration")
        if duration is not None:
            return float(duration)
        return None
    # For time-varying features, can't resolve to scalar
    # Return None to indicate this needs frame-by-frame resolution
    return None
Args: time_offset: Time offset in seconds for resolving bindings (e.g., segment start time in audio) max_duration: Maximum duration in seconds to process (stops after this many seconds of frames) """ import numpy as np # Clean nil Symbols from params params = clean_nil_symbols(params) # Get video info including duration probe_cmd = [ "ffprobe", "-v", "quiet", "-print_format", "json", "-show_streams", "-show_format", str(input_path) ] probe_result = subprocess.run(probe_cmd, capture_output=True, text=True) probe_data = json.loads(probe_result.stdout) # Find video stream video_stream = None for stream in probe_data.get("streams", []): if stream.get("codec_type") == "video": video_stream = stream break if not video_stream: raise ValueError("No video stream found") in_width = int(video_stream["width"]) in_height = int(video_stream["height"]) # Get framerate fps_str = video_stream.get("r_frame_rate", "30/1") if "/" in fps_str: num, den = fps_str.split("/") fps = float(num) / float(den) else: fps = float(fps_str) # Get duration for progress bar duration = None if "format" in probe_data and "duration" in probe_data["format"]: duration = float(probe_data["format"]["duration"]) # Read frames with ffmpeg read_cmd = [ "ffmpeg", "-i", str(input_path), "-f", "rawvideo", "-pix_fmt", "rgb24", "-" ] read_proc = subprocess.Popen(read_cmd, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL) # Check if we have any bindings that need per-frame resolution has_bindings = _check_has_bindings(params) analysis_data = analysis_data or {} # Debug: print bindings and analysis info once if has_bindings: print(f" BINDINGS DEBUG: time_offset={time_offset:.2f}", file=sys.stderr) for k, v in params.items(): if isinstance(v, dict) and ("_bind" in v or "_binding" in v): ref = v.get("_bind") or v.get("source") bind_offset = float(v.get("offset", 0.0)) track = analysis_data.get(ref, {}) times = track.get("times", []) values = track.get("values", []) if times and values: # Find first non-zero value 
first_nonzero_idx = next((i for i, v in enumerate(values) if v > 0.01), -1) first_nonzero_time = times[first_nonzero_idx] if first_nonzero_idx >= 0 else -1 print(f" param {k}: ref='{ref}' bind_offset={bind_offset} time_range=[{min(times):.2f}, {max(times):.2f}]", file=sys.stderr) print(f" first_nonzero at t={first_nonzero_time:.2f} max_value={max(values):.4f}", file=sys.stderr) else: raise ValueError(f"Binding for param '{k}' references '{ref}' but no analysis data found. Available: {list(analysis_data.keys())}") # Process first frame to detect output dimensions in_frame_size = in_width * in_height * 3 frame_data = read_proc.stdout.read(in_frame_size) if len(frame_data) < in_frame_size: read_proc.stdout.close() read_proc.wait() raise ValueError("No frames in input video") frame = np.frombuffer(frame_data, dtype=np.uint8).reshape((in_height, in_width, 3)) # Resolve params for first frame if has_bindings: frame_params = resolve_params(params, time_offset, analysis_data, frame_num=0) else: frame_params = params # Apply single effect with mix bypass: mix=0 → passthrough, 0=1 → full def apply_effect(frame, frame_params, state): mix_val = float(frame_params.get('mix', 1.0)) if mix_val <= 0: return frame, state result, state = effect_module.process_frame(frame, frame_params, state) if mix_val < 1.0: result = np.clip( frame.astype(np.float32) * (1.0 - mix_val) + result.astype(np.float32) * mix_val, 0, 255 ).astype(np.uint8) return result, state state = None processed, state = apply_effect(frame, frame_params, state) # Get output dimensions from processed frame out_height, out_width = processed.shape[:2] if out_width != in_width or out_height != in_height: print(f" Effect resizes: {in_width}x{in_height} -> {out_width}x{out_height}", file=sys.stderr) # Now start write process with correct output dimensions write_cmd = [ "ffmpeg", "-y", "-f", "rawvideo", "-pix_fmt", "rgb24", "-s", f"{out_width}x{out_height}", "-r", str(encoding.get("fps", 30)), "-i", "-", "-i", 
        str(input_path),  # For audio
        "-map", "0:v", "-map", "1:a?",  # video from the pipe, optional audio from the source
        "-c:v", encoding["codec"],
        "-preset", encoding["preset"],
        "-crf", str(encoding["crf"]),
        "-pix_fmt", "yuv420p",
        "-c:a", encoding["audio_codec"],
        str(output_path)
    ]
    write_proc = subprocess.Popen(write_cmd, stdin=subprocess.PIPE, stderr=subprocess.DEVNULL)

    # Write first processed frame (it was consumed above to detect output dimensions)
    write_proc.stdin.write(processed.tobytes())
    frame_count = 1

    # Calculate max frames and total for progress bar
    max_frames = None
    total_frames = 0
    if max_duration:
        max_frames = int(max_duration * fps)
        total_frames = max_frames
    elif duration:
        total_frames = int(duration * fps)

    # Create progress bar
    effect_name = getattr(effect_module, 'effect_name', 'effect')
    pbar = ProgressBar(total_frames, desc=effect_name)
    pbar.set(1)  # First frame already processed

    # Process remaining frames: read raw RGB24 -> apply effect -> pipe to encoder
    while True:
        # Stop if we've reached the frame limit
        if max_frames and frame_count >= max_frames:
            break
        frame_data = read_proc.stdout.read(in_frame_size)
        if len(frame_data) < in_frame_size:
            # Short read => decoder reached end of stream
            break
        frame = np.frombuffer(frame_data, dtype=np.uint8).reshape((in_height, in_width, 3))
        # Resolve params for this frame (only when bindings require per-frame values)
        if has_bindings:
            frame_time = time_offset + frame_count / fps
            frame_params = resolve_params(params, frame_time, analysis_data, frame_num=frame_count)
        else:
            frame_params = params
        processed, state = apply_effect(frame, frame_params, state)
        write_proc.stdin.write(processed.tobytes())
        frame_count += 1
        pbar.set(frame_count)

    # Shut down both ffmpeg subprocesses cleanly
    read_proc.stdout.close()
    write_proc.stdin.close()
    read_proc.wait()
    write_proc.wait()
    pbar.finish()


@_video_pipeline_guard
def run_multi_effect(effect_module, input_paths: List[Path], output_path: Path,
                     params: dict, encoding: dict, analysis_data: dict = None,
                     time_offset: float = 0.0, max_duration: float = None):
    """Run a multi-input effect on multiple video files.

    Args:
        time_offset: Time offset in seconds for resolving bindings (e.g., segment start time in audio)
        max_duration: Maximum duration in seconds to process (stops after this many seconds of frames)
    """
    import numpy as np

    # Clean nil Symbols from params
    params = clean_nil_symbols(params)

    if len(input_paths) < 2:
        raise ValueError("Multi-input effect requires at least 2 inputs")

    # Get video info for each input (preserve original dimensions)
    input_infos = []
    for input_path in input_paths:
        probe_cmd = [
            "ffprobe", "-v", "quiet", "-print_format", "json",
            "-show_streams", str(input_path)
        ]
        probe_result = subprocess.run(probe_cmd, capture_output=True, text=True)
        probe_data = json.loads(probe_result.stdout)
        video_stream = None
        for stream in probe_data.get("streams", []):
            if stream.get("codec_type") == "video":
                video_stream = stream
                break
        if not video_stream:
            raise ValueError(f"No video stream found in {input_path}")
        w = int(video_stream["width"])
        h = int(video_stream["height"])
        input_infos.append({"width": w, "height": h, "path": input_path})
        print(f" Input: {input_path.name} ({w}x{h})", file=sys.stderr)

    # Get framerate and duration from first input (first input drives timing)
    probe_cmd = [
        "ffprobe", "-v", "quiet", "-print_format", "json",
        "-show_streams", "-show_format", str(input_paths[0])
    ]
    probe_result = subprocess.run(probe_cmd, capture_output=True, text=True)
    probe_data = json.loads(probe_result.stdout)
    video_stream = next(s for s in probe_data.get("streams", []) if s.get("codec_type") == "video")
    fps_str = video_stream.get("r_frame_rate", "30/1")
    if "/" in fps_str:
        # r_frame_rate is a rational like "30000/1001"
        num, den = fps_str.split("/")
        fps = float(num) / float(den)
    else:
        fps = float(fps_str)

    # Get duration for progress bar
    duration = None
    if "format" in probe_data and "duration" in probe_data["format"]:
        duration = float(probe_data["format"]["duration"])

    # Open read processes for all inputs - preserve original dimensions
    read_procs = []
    for info in input_infos:
        read_cmd = [
            "ffmpeg", "-i", str(info["path"]),
            "-f", "rawvideo", "-pix_fmt", "rgb24", "-"  # Don't scale - keep original dimensions
        ]
        proc = subprocess.Popen(read_cmd, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL)
        read_procs.append(proc)

    analysis_data = analysis_data or {}
    state = None

    # Process first frame to detect output dimensions
    frames = []
    for i, (proc, info) in enumerate(zip(read_procs, input_infos)):
        frame_size = info["width"] * info["height"] * 3
        frame_data = proc.stdout.read(frame_size)
        if len(frame_data) < frame_size:
            # Cleanup: close every reader before raising
            for p in read_procs:
                p.stdout.close()
                p.wait()
            raise ValueError(f"No frames in input {i}")
        frame = np.frombuffer(frame_data, dtype=np.uint8).reshape((info["height"], info["width"], 3))
        frames.append(frame)

    # Check if we have any bindings that need per-frame resolution
    has_bindings = _check_has_bindings(params)

    # Resolve params for first frame
    if has_bindings:
        frame_params = resolve_params(params, time_offset, analysis_data, frame_num=0)
    else:
        frame_params = params

    processed, state = effect_module.process_frame(frames, frame_params, state)
    out_height, out_width = processed.shape[:2]
    print(f" Output dimensions: {out_width}x{out_height}", file=sys.stderr)

    # Now start write process with correct output dimensions
    write_cmd = [
        "ffmpeg", "-y",
        "-f", "rawvideo",
        "-pix_fmt", "rgb24",
        "-s", f"{out_width}x{out_height}",
        "-r", str(encoding.get("fps", 30)),
        "-i", "-",
        "-i", str(input_paths[0]),  # For audio from first input
        "-map", "0:v", "-map", "1:a?",
        "-c:v", encoding["codec"],
        "-preset", encoding["preset"],
        "-crf", str(encoding["crf"]),
        "-pix_fmt", "yuv420p",
        "-c:a", encoding["audio_codec"],
        str(output_path)
    ]
    write_proc = subprocess.Popen(write_cmd, stdin=subprocess.PIPE, stderr=subprocess.DEVNULL)

    # Write first processed frame
    write_proc.stdin.write(processed.tobytes())
    frame_count = 1

    # Calculate max frames and total for progress bar
    max_frames = None
    total_frames = 0
    if max_duration:
        max_frames = int(max_duration * fps)
        total_frames = max_frames
    elif duration:
        total_frames = int(duration * fps)

    # Create progress bar
    effect_name = getattr(effect_module, 'effect_name', 'blend')
    pbar = ProgressBar(total_frames, desc=effect_name)
    pbar.set(1)  # First frame already processed

    # Process remaining frames
    while True:
        # Stop if we've reached the frame limit
        if max_frames and frame_count >= max_frames:
            break
        # Read frame from each input (each may have different dimensions)
        frames = []
        all_valid = True
        for i, (proc, info) in enumerate(zip(read_procs, input_infos)):
            frame_size = info["width"] * info["height"] * 3
            frame_data = proc.stdout.read(frame_size)
            if len(frame_data) < frame_size:
                # One input ran out of frames; stop at the shortest input
                all_valid = False
                break
            frame = np.frombuffer(frame_data, dtype=np.uint8).reshape((info["height"], info["width"], 3))
            frames.append(frame)
        if not all_valid:
            break
        # Resolve params for this frame
        if has_bindings:
            frame_time = time_offset + frame_count / fps
            frame_params = resolve_params(params, frame_time, analysis_data, frame_num=frame_count)
        else:
            frame_params = params
        # Pass list of frames to effect
        processed, state = effect_module.process_frame(frames, frame_params, state)
        write_proc.stdin.write(processed.tobytes())
        frame_count += 1
        pbar.set(frame_count)

    # Cleanup
    for proc in read_procs:
        proc.stdout.close()
        proc.wait()
    write_proc.stdin.close()
    write_proc.wait()
    pbar.finish()


@_video_pipeline_guard
def run_effect_chain(effect_modules, input_path: Path, output_path: Path,
                     params_list: list, encoding: dict, analysis_data=None,
                     time_offset: float = 0.0, max_duration: float = None):
    """Run multiple effects as a single-pass fused chain: one decode, one encode, no intermediates.
    Args:
        effect_modules: List of effect modules (each has process_frame)
        input_path: Input video file
        output_path: Output video file
        params_list: List of param dicts, one per effect
        encoding: Encoding settings
        analysis_data: Optional analysis data for binding resolution
        time_offset: Time offset for resolving bindings
        max_duration: Maximum duration in seconds to process
    """
    import numpy as np

    # Clean nil Symbols from each params dict
    params_list = [clean_nil_symbols(p) for p in params_list]

    # Probe input for dimensions/fps/duration
    probe_cmd = [
        "ffprobe", "-v", "quiet", "-print_format", "json",
        "-show_streams", "-show_format", str(input_path)
    ]
    probe_result = subprocess.run(probe_cmd, capture_output=True, text=True)
    probe_data = json.loads(probe_result.stdout)
    video_stream = None
    for stream in probe_data.get("streams", []):
        if stream.get("codec_type") == "video":
            video_stream = stream
            break
    if not video_stream:
        raise ValueError("No video stream found")
    in_width = int(video_stream["width"])
    in_height = int(video_stream["height"])
    fps_str = video_stream.get("r_frame_rate", "30/1")
    if "/" in fps_str:
        # r_frame_rate is a rational like "30000/1001"
        num, den = fps_str.split("/")
        fps = float(num) / float(den)
    else:
        fps = float(fps_str)
    duration = None
    if "format" in probe_data and "duration" in probe_data["format"]:
        duration = float(probe_data["format"]["duration"])

    # Pre-compute per-effect binding flags so the per-frame loop avoids the scan
    analysis_data = analysis_data or {}
    bindings_flags = []
    for params in params_list:
        has_b = any(isinstance(v, dict) and ("_bind" in v or "_binding" in v or "_expr" in v)
                    for v in params.values())
        bindings_flags.append(has_b)

    # Open single ffmpeg reader
    read_cmd = [
        "ffmpeg", "-i", str(input_path),
        "-f", "rawvideo", "-pix_fmt", "rgb24", "-"
    ]
    read_proc = subprocess.Popen(read_cmd, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL)

    # Read first frame
    in_frame_size = in_width * in_height * 3
    frame_data = read_proc.stdout.read(in_frame_size)
    if len(frame_data) < in_frame_size:
        read_proc.stdout.close()
        read_proc.wait()
        raise ValueError("No frames in input video")
    frame = np.frombuffer(frame_data, dtype=np.uint8).reshape((in_height, in_width, 3))

    # Apply effect chain to a frame, respecting per-effect mix bypass.
    # mix=0 → skip (zero cost), mix=1 → full effect, 0<mix<1 → crossfade.
    def apply_chain(frame, states, frame_num, frame_time):
        processed = frame
        for idx, (module, params, has_b) in enumerate(zip(effect_modules, params_list, bindings_flags)):
            if has_b:
                fp = resolve_params(params, frame_time, analysis_data, frame_num=frame_num)
            else:
                fp = params
            mix_val = float(fp.get('mix', 1.0))
            if mix_val <= 0:
                continue
            result, states[idx] = module.process_frame(processed, fp, states[idx])
            if mix_val < 1.0:
                # Blend input and effect output in float, then clamp back to uint8
                processed = np.clip(
                    processed.astype(np.float32) * (1.0 - mix_val) +
                    result.astype(np.float32) * mix_val,
                    0, 255
                ).astype(np.uint8)
            else:
                processed = result
        return processed, states

    # Push first frame through all effects to discover final output dimensions
    states = [None] * len(effect_modules)
    processed, states = apply_chain(frame, states, 0, time_offset)
    out_height, out_width = processed.shape[:2]
    if out_width != in_width or out_height != in_height:
        print(f" Chain resizes: {in_width}x{in_height} -> {out_width}x{out_height}", file=sys.stderr)

    # Open single ffmpeg writer with final output dimensions
    write_cmd = [
        "ffmpeg", "-y",
        "-f", "rawvideo",
        "-pix_fmt", "rgb24",
        "-s", f"{out_width}x{out_height}",
        "-r", str(encoding.get("fps", 30)),
        "-i", "-",
        "-i", str(input_path),  # For audio
        "-map", "0:v", "-map", "1:a?",
        "-c:v", encoding["codec"],
        "-preset", encoding["preset"],
        "-crf", str(encoding["crf"]),
        "-pix_fmt", "yuv420p",
        "-c:a", encoding["audio_codec"],
        str(output_path)
    ]
    write_proc = subprocess.Popen(write_cmd, stdin=subprocess.PIPE, stderr=subprocess.DEVNULL)

    # Write first processed frame
    write_proc.stdin.write(processed.tobytes())
    frame_count = 1

    # Calculate max frames and total for progress bar
    max_frames = None
    total_frames = 0
    if max_duration:
        max_frames = int(max_duration * fps)
        total_frames = max_frames
    elif duration:
        total_frames = int(duration * fps)

    effect_names = [getattr(m, 'effect_name', '?') for m in effect_modules]
    pbar = ProgressBar(total_frames, desc='+'.join(effect_names))
    pbar.set(1)

    # Frame loop: read -> apply chain -> write
    while True:
        if max_frames and frame_count >= max_frames:
            break
        frame_data = read_proc.stdout.read(in_frame_size)
        if len(frame_data) < in_frame_size:
            break
        frame = np.frombuffer(frame_data, dtype=np.uint8).reshape((in_height, in_width, 3))
        frame_time = time_offset + frame_count / fps
        processed, states = apply_chain(frame, states, frame_count, frame_time)
        write_proc.stdin.write(processed.tobytes())
        frame_count += 1
        pbar.set(frame_count)

    read_proc.stdout.close()
    write_proc.stdin.close()
    read_proc.wait()
    write_proc.wait()
    pbar.finish()


def get_video_dimensions(file_path: Path) -> tuple:
    """Get video dimensions using ffprobe.

    Returns (width, height), or (None, None) when no video stream is found
    or ffprobe output cannot be parsed into a video stream entry.
    """
    cmd = [
        "ffprobe", "-v", "quiet", "-print_format", "json",
        "-show_streams", str(file_path)
    ]
    result = subprocess.run(cmd, capture_output=True, text=True)
    data = json.loads(result.stdout)
    for stream in data.get("streams", []):
        if stream.get("codec_type") == "video":
            return int(stream["width"]), int(stream["height"])
    return None, None


def normalize_video(
    input_path: Path,
    output_path: Path,
    target_width: int,
    target_height: int,
    resize_mode: str,
    priority: str = None,
    pad_color: str = "black",
    crop_gravity: str = "center",
    encoding: dict = None,
) -> Path:
    """
    Normalize video to target dimensions.

    resize_mode:
    - stretch: force to exact size (distorts)
    - crop: scale to fill, crop overflow
    - fit: scale to fit, pad remainder
    - cover: scale to cover, crop minimally

    priority: width | height (which dimension to match exactly for fit/crop)
    """
    enc = encoding or {}

    src_width, src_height = get_video_dimensions(input_path)
    if src_width is None:
        # Can't determine dimensions, just copy
        shutil.copy(input_path, output_path)
        return output_path

    # Already correct size?
    if src_width == target_width and src_height == target_height:
        shutil.copy(input_path, output_path)
        return output_path

    src_aspect = src_width / src_height
    target_aspect = target_width / target_height

    if resize_mode == "stretch":
        # Force exact size
        vf = f"scale={target_width}:{target_height}"
    elif resize_mode == "fit":
        # Scale to fit within bounds, pad remainder
        if priority == "width":
            # Match width exactly, pad height
            vf = f"scale={target_width}:-1,pad={target_width}:{target_height}:(ow-iw)/2:(oh-ih)/2:{pad_color}"
        elif priority == "height":
            # Match height exactly, pad width
            vf = f"scale=-1:{target_height},pad={target_width}:{target_height}:(ow-iw)/2:(oh-ih)/2:{pad_color}"
        else:
            # Auto: fit within bounds (may pad both)
            if src_aspect > target_aspect:
                # Source is wider, fit to width
                vf = f"scale={target_width}:-1,pad={target_width}:{target_height}:(ow-iw)/2:(oh-ih)/2:{pad_color}"
            else:
                # Source is taller, fit to height
                vf = f"scale=-1:{target_height},pad={target_width}:{target_height}:(ow-iw)/2:(oh-ih)/2:{pad_color}"
    elif resize_mode == "crop":
        # Scale to fill, crop overflow
        if priority == "width":
            # Match width, crop height
            vf = f"scale={target_width}:-1,crop={target_width}:{target_height}"
        elif priority == "height":
            # Match height, crop width
            vf = f"scale=-1:{target_height},crop={target_width}:{target_height}"
        else:
            # Auto: fill bounds, crop minimally
            if src_aspect > target_aspect:
                # Source is wider, match height and crop width
                vf = f"scale=-1:{target_height},crop={target_width}:{target_height}"
            else:
                # Source is taller, match width and crop height
                vf = f"scale={target_width}:-1,crop={target_width}:{target_height}"
    elif resize_mode == "cover":
        # Scale to cover target, crop to exact size
        if src_aspect > target_aspect:
            vf = f"scale=-1:{target_height},crop={target_width}:{target_height}"
        else:
            vf = f"scale={target_width}:-1,crop={target_width}:{target_height}"
    else:
        # Unknown mode, just copy
        shutil.copy(input_path, output_path)
        return output_path

    cmd = [
        "ffmpeg", "-y",
        "-i", str(input_path),
        "-vf", vf,
        "-r", str(enc.get("fps", 30)),  # Normalize framerate for concat compatibility
        "-c:v", enc.get("codec", "libx264"),
        "-preset", enc.get("preset", "fast"),
        "-crf", str(enc.get("crf", 18)),
        "-pix_fmt", "yuv420p",  # Normalize pixel format for concat compatibility
        "-c:a", enc.get("audio_codec", "aac"),
        str(output_path)
    ]
    subprocess.run(cmd, check=True, capture_output=True)
    return output_path


def tree_concat(files: list, work_dir: Path, prefix: str = "concat") -> Path:
    """Concatenate files using a binary tree approach.

    Pairs adjacent files level by level with ffmpeg's concat demuxer
    (stream copy, no re-encode) until one file remains.
    Returns the final concatenated file path.
    """
    if len(files) == 1:
        return files[0]

    level = 0
    current_files = list(files)
    print(f" Tree concat: {len(current_files)} files", file=sys.stderr)
    for i, f in enumerate(current_files):
        print(f" [{i}] {f}", file=sys.stderr)

    while len(current_files) > 1:
        next_files = []
        pairs = (len(current_files) + 1) // 2
        print(f" Level {level}: {len(current_files)} -> {pairs} pairs", file=sys.stderr)
        for i in range(0, len(current_files), 2):
            if i + 1 < len(current_files):
                concat_file = work_dir / f"{prefix}_L{level}_{i}.txt"
                output_file = work_dir / f"{prefix}_L{level}_{i}.mp4"
                with open(concat_file, "w") as f:
                    f.write(f"file '{current_files[i]}'\n")
                    f.write(f"file '{current_files[i+1]}'\n")
                # NOTE(review): returncode is not checked here; a failed concat
                # would silently propagate an empty/missing intermediate file.
                cmd = ["ffmpeg", "-y", "-f", "concat", "-safe", "0",
                       "-i", str(concat_file), "-c", "copy", str(output_file)]
                subprocess.run(cmd, capture_output=True)
                next_files.append(output_file)
            else:
                # Odd file out is carried to the next level unchanged
                next_files.append(current_files[i])
        current_files = next_files
        level += 1

    return current_files[0]


def execute_plan(plan_path: Path = None, output_path: Path = None, recipe_dir: Path = None,
                 plan_data: dict = None, external_analysis: dict = None, cache_dir: Path = None):
    """Execute a plan file (S-expression) or plan dict.

    Args:
        cache_dir: Directory to cache intermediate results. If provided, steps
            will check for cached outputs before recomputing.
""" # Load plan from file, stdin, or dict if plan_data: plan = plan_data elif plan_path and str(plan_path) != "-": content = plan_path.read_text() plan = parse_plan_input(content) else: # Read from stdin content = sys.stdin.read() plan = parse_plan_input(content) print(f"Executing plan: {plan['plan_id'][:16]}...", file=sys.stderr) print(f"Source CID: {plan.get('source_hash', 'unknown')[:16]}...", file=sys.stderr) print(f"Steps: {len(plan['steps'])}", file=sys.stderr) recipe_encoding = plan.get("encoding", {}) # Merge plan's embedded analysis (includes synthetic tracks from composition # merging) with external analysis (fresh ANALYZE step outputs). # External analysis takes priority for tracks that exist in both. analysis_data = dict(plan.get("analysis", {})) if external_analysis: analysis_data.update(external_analysis) # Resolve cache-id refs from plan for name, data in list(analysis_data.items()): if isinstance(data, dict) and "_cache_id" in data: try: from cache import cache_get_json loaded = cache_get_json(data["_cache_id"]) if loaded: analysis_data[name] = loaded except ImportError: pass # standalone mode, no cache available if recipe_dir is None: recipe_dir = plan_path.parent if plan_path else Path(".") if analysis_data: print(f"Analysis tracks: {list(analysis_data.keys())}", file=sys.stderr) # Get effects registry for loading explicitly declared effects effects_registry = plan.get("effects_registry", {}) if effects_registry: print(f"Effects registry: {list(effects_registry.keys())}", file=sys.stderr) # Check for minimal primitives mode minimal_primitives = plan.get("minimal_primitives", False) if minimal_primitives: print(f"Minimal primitives mode: enabled", file=sys.stderr) # Execute steps results = {} # step_id -> output_path work_dir = Path(tempfile.mkdtemp(prefix="artdag_exec_")) # Sort steps by level first (respecting dependencies), then by type within each level # Type priority within same level: SOURCE/SEGMENT first, then ANALYZE, then EFFECT steps = 
plan["steps"] def step_sort_key(s): node_type = s.get("node_type") or "UNKNOWN" # Handle node_type being a Symbol if hasattr(node_type, 'name'): node_type = node_type.name level = s.get("level", 0) # Ensure level is an int (could be Symbol or None) if not isinstance(level, int): level = 0 # Type priority (tiebreaker within same level): SOURCE=0, SEGMENT=1, ANALYZE=2, others=3 if node_type == "SOURCE": type_priority = 0 elif node_type == "SEGMENT": type_priority = 1 elif node_type in ("ANALYZE", "SCAN"): type_priority = 2 else: type_priority = 3 # Sort by level FIRST, then type priority as tiebreaker return (level, type_priority) ordered_steps = sorted(steps, key=step_sort_key) try: def _run_step(step): step_id = step["step_id"] node_type = step["node_type"] config = step["config"] inputs = step.get("inputs", []) cache_id = step.get("cache_id", step_id) # IPNS address for caching print(f"\n[{step.get('level', 0)}] {node_type}: {step_id[:16]}...", file=sys.stderr) if node_type == "SOURCE": if "path" in config: src_path = (recipe_dir / config["path"]).resolve() if not src_path.exists(): raise FileNotFoundError(f"Source not found: {src_path}") results[step_id] = src_path print(f" -> {src_path}", file=sys.stderr) elif node_type == "SEGMENT": is_audio = str(results[inputs[0]]).lower().endswith( ('.mp3', '.wav', '.flac', '.aac', '.ogg', '.m4a')) input_path = results[inputs[0]] start = config.get("start", 0) duration = config.get("duration") end = config.get("end") # Resolve any bindings to scalar values start = resolve_scalar_binding(start, analysis_data) if start else 0 duration = resolve_scalar_binding(duration, analysis_data) if duration else None end = resolve_scalar_binding(end, analysis_data) if end else None # Check cache cached = check_cache(cache_dir, cache_id, ['.m4a'] if is_audio else ['.mp4']) if cached: results[step_id] = cached print(f" -> {cached} (cached)", file=sys.stderr) return print(f" Resolved: start={start}, duration={duration}", file=sys.stderr) enc 
= get_encoding(recipe_encoding, config) if is_audio: output_file = work_dir / f"segment_{step_id}.m4a" cmd = ["ffmpeg", "-y", "-i", str(input_path)] if start: cmd.extend(["-ss", str(start)]) if duration: cmd.extend(["-t", str(duration)]) cmd.extend(["-c:a", enc["audio_codec"], str(output_file)]) else: output_file = work_dir / f"segment_{step_id}.mp4" cmd = ["ffmpeg", "-y", "-i", str(input_path)] if start: cmd.extend(["-ss", str(start)]) if duration: cmd.extend(["-t", str(duration)]) elif end: cmd.extend(["-t", str(end - start)]) cmd.extend(["-r", str(enc["fps"]), # Normalize frame rate "-c:v", enc["codec"], "-preset", enc["preset"], "-crf", str(enc["crf"]), "-c:a", enc["audio_codec"], str(output_file)]) result = subprocess.run(cmd, capture_output=True, text=True) # Check if segment has video content AND correct duration, if not try with looping needs_loop = False if not is_audio and result.returncode == 0: probe_cmd = ["ffprobe", "-v", "quiet", "-print_format", "json", "-show_streams", "-show_format", str(output_file)] probe_result = subprocess.run(probe_cmd, capture_output=True, text=True) probe_data = json.loads(probe_result.stdout) has_video = any(s.get("codec_type") == "video" for s in probe_data.get("streams", [])) if not has_video: needs_loop = True # Also check if output duration matches requested duration elif duration: output_duration = float(probe_data.get("format", {}).get("duration", 0)) # If output is significantly shorter than requested, need to loop if output_duration < duration - 1.0: # 1 second tolerance needs_loop = True print(f" Output {output_duration:.1f}s < requested {duration:.1f}s, will loop", file=sys.stderr) if needs_loop or result.returncode != 0: # Get source duration and loop the input probe_cmd = ["ffprobe", "-v", "quiet", "-print_format", "json", "-show_format", str(input_path)] probe_result = subprocess.run(probe_cmd, capture_output=True, text=True) probe_data = json.loads(probe_result.stdout) src_duration = 
float(probe_data.get("format", {}).get("duration", 0)) if src_duration > 0: # Wrap start time to source duration wrapped_start = start % src_duration if start else 0 seg_duration = duration if duration else (end - start if end else None) print(f" Wrapping segment: {start:.2f}s -> {wrapped_start:.2f}s (source={src_duration:.2f}s)", file=sys.stderr) # Use stream_loop for seamless looping if segment spans wrap point if wrapped_start + (seg_duration or 0) > src_duration: # Need to loop - use concat filter cmd = ["ffmpeg", "-y", "-stream_loop", "-1", "-i", str(input_path)] cmd.extend(["-ss", str(wrapped_start)]) if seg_duration: cmd.extend(["-t", str(seg_duration)]) cmd.extend(["-r", str(enc["fps"]), "-c:v", enc["codec"], "-preset", enc["preset"], "-crf", str(enc["crf"]), "-c:a", enc["audio_codec"], str(output_file)]) else: cmd = ["ffmpeg", "-y", "-i", str(input_path)] cmd.extend(["-ss", str(wrapped_start)]) if seg_duration: cmd.extend(["-t", str(seg_duration)]) cmd.extend(["-r", str(enc["fps"]), "-c:v", enc["codec"], "-preset", enc["preset"], "-crf", str(enc["crf"]), "-c:a", enc["audio_codec"], str(output_file)]) subprocess.run(cmd, check=True, capture_output=True) else: raise ValueError(f"Cannot determine source duration for looping") results[step_id] = save_to_cache(cache_dir, cache_id, output_file) or output_file print(f" -> {output_file}", file=sys.stderr) elif node_type == "EFFECT": # Check cache cached = check_cache(cache_dir, cache_id, ['.mp4']) if cached: results[step_id] = cached print(f" -> {cached} (cached)", file=sys.stderr) return effect_name = config.get("effect", "unknown") effect_path = config.get("effect_path") is_multi_input = config.get("multi_input", False) output_file = work_dir / f"effect_{step_id}.mp4" enc = get_encoding(recipe_encoding, config) if effect_path: full_path = recipe_dir / effect_path effect_module = load_effect(full_path, effects_registry, recipe_dir, minimal_primitives) params = {k: v for k, v in config.items() if k not in 
("effect", "effect_path", "cid", "encoding", "multi_input")} print(f" Effect: {effect_name}", file=sys.stderr) # Get timing offset and duration for bindings effect_time_offset = config.get("start", config.get("segment_start", 0)) effect_duration = config.get("duration") if is_multi_input and len(inputs) > 1: # Multi-input effect (blend, layer, etc.) input_paths = [results[inp] for inp in inputs] run_multi_effect(effect_module, input_paths, output_file, params, enc, analysis_data, time_offset=effect_time_offset, max_duration=effect_duration) else: # Single-input effect input_path = results[inputs[0]] run_effect(effect_module, input_path, output_file, params, enc, analysis_data, time_offset=effect_time_offset, max_duration=effect_duration) else: input_path = results[inputs[0]] shutil.copy(input_path, output_file) results[step_id] = save_to_cache(cache_dir, cache_id, output_file) or output_file print(f" -> {output_file}", file=sys.stderr) elif node_type == "SEQUENCE": # Check cache first cached = check_cache(cache_dir, cache_id, ['.mp4']) if cached: results[step_id] = cached print(f" -> {cached} (cached)", file=sys.stderr) return if len(inputs) < 2: results[step_id] = results[inputs[0]] return input_files = [results[inp] for inp in inputs] enc = get_encoding(recipe_encoding, config) # Check for normalization config resize_mode = config.get("resize_mode") if resize_mode: # Determine target dimensions target_width = config.get("target_width") or enc.get("width") target_height = config.get("target_height") or enc.get("height") # If no explicit target, use first input's dimensions if not target_width or not target_height: first_w, first_h = get_video_dimensions(input_files[0]) target_width = target_width or first_w target_height = target_height or first_h if target_width and target_height: print(f" Normalizing {len(input_files)} inputs to {target_width}x{target_height} ({resize_mode})", file=sys.stderr) normalized_files = [] for i, inp_file in enumerate(input_files): 
norm_file = work_dir / f"norm_{step_id[:8]}_{i:04d}.mp4" normalize_video( inp_file, norm_file, target_width, target_height, resize_mode, priority=config.get("priority"), pad_color=config.get("pad_color", "black"), crop_gravity=config.get("crop_gravity", "center"), encoding=enc, ) normalized_files.append(norm_file) input_files = normalized_files # Use tree concat for efficiency output_file = tree_concat(input_files, work_dir, f"seq_{step_id[:8]}") results[step_id] = save_to_cache(cache_dir, cache_id, output_file) or output_file print(f" -> {output_file}", file=sys.stderr) elif node_type == "MUX": # Check cache cached = check_cache(cache_dir, cache_id, ['.mp4']) if cached: results[step_id] = cached print(f" -> {cached} (cached)", file=sys.stderr) return video_path = results[inputs[0]] audio_path = results[inputs[1]] enc = get_encoding(recipe_encoding, config) output_file = work_dir / f"mux_{step_id}.mp4" # Get duration for progress bar probe_cmd = [ "ffprobe", "-v", "quiet", "-print_format", "json", "-show_format", str(video_path) ] probe_result = subprocess.run(probe_cmd, capture_output=True, text=True) mux_duration = None if probe_result.returncode == 0: probe_data = json.loads(probe_result.stdout) mux_duration = float(probe_data.get("format", {}).get("duration", 0)) cmd = ["ffmpeg", "-y", "-i", str(video_path), "-i", str(audio_path), "-map", "0:v", "-map", "1:a", "-c:v", enc["codec"], "-preset", enc["preset"], "-crf", str(enc["crf"]), "-c:a", enc["audio_codec"], "-shortest", str(output_file)] import re mux_proc = subprocess.Popen(cmd, stderr=subprocess.PIPE, text=True) pbar = ProgressBar(int(mux_duration * 1000) if mux_duration else 0, desc="mux") for line in mux_proc.stderr: m = re.search(r"time=(\d+):(\d+):(\d+)\.(\d+)", line) if m: h, mi, s, cs = int(m.group(1)), int(m.group(2)), int(m.group(3)), int(m.group(4)) ms = h * 3600000 + mi * 60000 + s * 1000 + cs * 10 pbar.set(ms) pbar.finish() mux_proc.wait() if mux_proc.returncode != 0: raise RuntimeError("MUX 
ffmpeg failed") results[step_id] = save_to_cache(cache_dir, cache_id, output_file) or output_file print(f" -> {output_file}", file=sys.stderr) elif node_type == "ANALYZE": # Check cache first cached = check_cache(cache_dir, cache_id, ['.json']) if cached: with open(cached) as f: analysis_data[step_id] = json.load(f) results[step_id] = cached print(f" -> {cached} (cached)", file=sys.stderr) return output_file = work_dir / f"analysis_{step_id}.json" if "analysis_results" in config: # Analysis was done during planning with open(output_file, "w") as f: json.dump(config["analysis_results"], f) analysis_data[step_id] = config["analysis_results"] print(f" -> {output_file} (from plan)", file=sys.stderr) else: # Run analyzer now analyzer_path = config.get("analyzer_path") if analyzer_path: analyzer_path = (recipe_dir / analyzer_path).resolve() input_path = results[inputs[0]] # Load and run analyzer import importlib.util spec = importlib.util.spec_from_file_location("analyzer", analyzer_path) analyzer_module = importlib.util.module_from_spec(spec) spec.loader.exec_module(analyzer_module) # Run analysis analyzer_params = {k: v for k, v in config.items() if k not in ("analyzer", "analyzer_path", "cid")} analysis_result = analyzer_module.analyze(input_path, analyzer_params) # Save and store results with open(output_file, "w") as f: json.dump(analysis_result, f) analysis_data[step_id] = analysis_result print(f" -> {output_file} (ran analyzer: {len(analysis_result.get('times', []))} pts)", file=sys.stderr) else: print(f" -> no analyzer path!", file=sys.stderr) results[step_id] = save_to_cache(cache_dir, cache_id, output_file) or output_file elif node_type == "SCAN": # Check cache first cached = check_cache(cache_dir, cache_id, ['.json']) if cached: with open(cached) as f: scan_result = json.load(f) analysis_data[step_id] = scan_result results[step_id] = cached print(f" -> {cached} (cached)", file=sys.stderr) return import random # Load source analysis data source_id = inputs[0] 
                # --- SCAN branch (tail): fold an event stream into a value series ---
                # Pull the upstream analysis events for this scan's source step.
                # Duration falls back to the last event time when not given.
                source_data = analysis_data.get(source_id, {})
                event_times = source_data.get("times", [])
                duration = source_data.get("duration", event_times[-1] if event_times else 0)

                # Scan configuration: deterministic RNG seed plus the three
                # S-expression programs (init / step / emit) evaluated per event.
                seed = config.get("seed", 0)
                init_expr = config.get("init", 0)
                step_expr = config.get("step_expr")
                emit_expr = config.get("emit_expr")

                # Initialize RNG and accumulator
                rng = random.Random(seed)
                acc = eval_scan_expr(init_expr, rng, {})

                # Process each event
                event_values = []  # (time, emitted_value) pairs
                for t in event_times:
                    # Build variable bindings from accumulator.
                    # A dict accumulator exposes its keys as variables AND
                    # itself under "acc"; a scalar is only visible as "acc".
                    if isinstance(acc, dict):
                        variables = dict(acc)
                        variables["acc"] = acc
                    else:
                        variables = {"acc": acc}

                    # Step: update accumulator
                    acc = eval_scan_expr(step_expr, rng, variables)

                    # Rebind after step so emit sees the post-step state.
                    if isinstance(acc, dict):
                        variables = dict(acc)
                        variables["acc"] = acc
                    else:
                        variables = {"acc": acc}

                    # Emit: produce output value (non-numeric emits become 0.0)
                    emit_val = eval_scan_expr(emit_expr, rng, variables)
                    if isinstance(emit_val, (int, float)):
                        event_values.append((t, float(emit_val)))
                    else:
                        event_values.append((t, 0.0))

                # Generate high-resolution time-series with step-held
                # (zero-order hold) interpolation: the value at time t is the
                # emit of the latest event at or before t.
                resolution = 100  # points per second
                hi_res_times = []
                hi_res_values = []
                current_val = 0.0
                event_idx = 0
                num_points = int(duration * resolution) + 1
                for i in range(num_points):
                    t = i / resolution
                    # Advance to the latest event at or before time t
                    while event_idx < len(event_values) and event_values[event_idx][0] <= t:
                        current_val = event_values[event_idx][1]
                        event_idx += 1
                    hi_res_times.append(round(t, 4))
                    hi_res_values.append(current_val)

                scan_result = {
                    "times": hi_res_times,
                    "values": hi_res_values,
                    "duration": duration,
                }
                # Make the scan result visible to downstream steps in-process.
                analysis_data[step_id] = scan_result

                # Save to cache (save_to_cache may return the cached path;
                # fall back to the local work file when it returns falsy).
                output_file = work_dir / f"scan_{step_id}.json"
                with open(output_file, "w") as f:
                    json.dump(scan_result, f)
                results[step_id] = save_to_cache(cache_dir, cache_id, output_file) or output_file
                print(f" SCAN: {len(event_times)} events -> {len(hi_res_times)} points ({duration:.1f}s)", file=sys.stderr)
                print(f" -> {output_file}", file=sys.stderr)

            elif node_type == "COMPOUND":
                # --- COMPOUND branch: a collapsed chain of SEGMENT/EFFECT
                # filters applied to a single input video, preferably compiled
                # to one FFmpeg invocation, else run as a fused Python chain.

                # Check cache first
                cached = check_cache(cache_dir, cache_id, ['.mp4'])
                if cached:
                    results[step_id] = cached
                    print(f" -> {cached} (cached)", file=sys.stderr)
                    return

                # Collapsed effect chains - compile to single FFmpeg command with sendcmd
                filter_chain_raw = config.get("filter_chain", [])
                if not filter_chain_raw:
                    raise ValueError("COMPOUND step has empty filter_chain")

                # Get effects registry for this compound step (use different name
                # to avoid shadowing the outer effects_registry in nested function)
                step_effects_registry = config.get("effects_registry", {})

                # Convert filter_chain items from S-expression lists to dicts
                # and clean nil Symbols from configs. Items that are neither
                # dicts nor non-empty lists become UNKNOWN placeholders.
                filter_chain = []
                for item in filter_chain_raw:
                    if isinstance(item, dict):
                        # Clean nil Symbols from the config
                        cleaned_item = clean_nil_symbols(item)
                        filter_chain.append(cleaned_item)
                    elif isinstance(item, list) and item:
                        item_dict = sexp_to_dict(item)
                        ftype = item_dict.get("type", "UNKNOWN")
                        if isinstance(ftype, Symbol):
                            ftype = ftype.name
                        fconfig_raw = item_dict.get("config", {})
                        if isinstance(fconfig_raw, list):
                            fconfig = sexp_to_dict(fconfig_raw)
                        elif isinstance(fconfig_raw, dict):
                            fconfig = fconfig_raw
                        else:
                            fconfig = {}
                        # Clean nil Symbols from config
                        fconfig = clean_nil_symbols(fconfig)
                        filter_chain.append({"type": ftype, "config": fconfig})
                    else:
                        filter_chain.append({"type": "UNKNOWN", "config": {}})

                # COMPOUND takes exactly one upstream input.
                input_path = results[inputs[0]]

                # Debug: verify input exists and has content
                if not input_path.exists():
                    raise ValueError(f"COMPOUND input does not exist: {input_path}")
                if input_path.stat().st_size == 0:
                    raise ValueError(f"COMPOUND input is empty: {input_path}")
                print(f" COMPOUND input: {input_path} ({input_path.stat().st_size} bytes)", file=sys.stderr)

                enc = get_encoding(recipe_encoding, config)
                output_file = work_dir / f"compound_{step_id}.mp4"

                # Extract segment timing and effects from the chain.
                # SEGMENT supplies start/duration (or end - start); EFFECT
                # configs are collected in order for compilation below.
                segment_start = 0
                segment_duration = None
                effects = []
                for filter_item in filter_chain:
                    filter_type = filter_item.get("type", "")
                    filter_config = filter_item.get("config", {})
                    if filter_type == "SEGMENT":
                        segment_start = filter_config.get("start", 0)
                        segment_duration = filter_config.get("duration")
                        if not segment_duration and filter_config.get("end"):
                            segment_duration = filter_config["end"] - segment_start
                    elif filter_type == "EFFECT":
                        effects.append(filter_config)

                # Try to compile effects to FFmpeg filters
                from artdag.sexp.ffmpeg_compiler import FFmpegCompiler, generate_sendcmd_filter

                compiler = FFmpegCompiler()

                # Check if any effect has bindings - these need Python path for per-frame resolution
                any_has_bindings = any(_check_has_bindings(e) for e in effects)

                # Check if all effects have FFmpeg mappings
                all_have_mappings = all(
                    compiler.get_mapping(e.get("effect", "")) is not None
                    for e in effects
                )

                # Use FFmpeg only for static effects (no bindings)
                # Effects with bindings use Python path for proper per-frame binding resolution
                if all_have_mappings and effects and not any_has_bindings:
                    # Compile to FFmpeg with sendcmd for dynamic params
                    ffmpeg_filters, sendcmd_path = generate_sendcmd_filter(
                        effects, analysis_data, segment_start, segment_duration or 1.0,
                    )

                    # First extract segment with looping if needed
                    ffmpeg_input = input_path
                    if segment_start or segment_duration:
                        seg_temp = work_dir / f"compound_{step_id}_seg_temp.mp4"
                        extract_segment_with_loop(input_path, seg_temp, segment_start or 0, segment_duration, enc)
                        ffmpeg_input = seg_temp

                    # Build FFmpeg command (segment already extracted, just apply filters)
                    cmd = ["ffmpeg", "-y", "-i", str(ffmpeg_input)]
                    if ffmpeg_filters:
                        cmd.extend(["-vf", ffmpeg_filters])
                    cmd.extend(["-r", str(enc.get("fps", 30)),
                                "-c:v", enc["codec"], "-preset", enc["preset"],
                                "-crf", str(enc["crf"]), "-pix_fmt", "yuv420p",
                                "-c:a", enc["audio_codec"],
                                str(output_file)])

                    effect_names = [e.get("effect", "?") for e in effects]
                    print(f" COMPOUND (FFmpeg): {', '.join(effect_names)}", file=sys.stderr)
                    print(f" filters: {ffmpeg_filters[:80]}{'...' if len(ffmpeg_filters) > 80 else ''}", file=sys.stderr)

                    result = subprocess.run(cmd, capture_output=True, text=True)
                    if result.returncode != 0:
                        print(f" FFmpeg error: {result.stderr[:200]}", file=sys.stderr)
                        raise RuntimeError(f"FFmpeg failed: {result.stderr}")

                    # Clean up sendcmd file
                    if sendcmd_path and sendcmd_path.exists():
                        sendcmd_path.unlink()
                else:
                    # Fall back to sequential processing for effects without FFmpeg mappings
                    current_input = input_path

                    # First handle segment (with looping if source is shorter than requested)
                    for filter_item in filter_chain:
                        if filter_item.get("type") == "SEGMENT":
                            filter_config = filter_item.get("config", {})
                            start = filter_config.get("start", 0) or 0
                            duration = filter_config.get("duration")
                            if start or duration:
                                seg_output = work_dir / f"compound_{step_id}_seg.mp4"
                                extract_segment_with_loop(current_input, seg_output, start, duration, enc)
                                current_input = seg_output
                            # NOTE(review): break placement reconstructed from
                            # collapsed source — appears to stop after the first
                            # SEGMENT item regardless of start/duration; confirm.
                            break

                    # Load all effect modules and params for fused single-pass execution
                    effect_modules = []
                    chain_params_list = []
                    for effect_config in effects:
                        effect_name = effect_config.get("effect", "unknown")
                        effect_path = effect_config.get("effect_path")
                        if not effect_path:
                            # No explicit path: probe the conventional effect
                            # directories for a .py or .sexp implementation.
                            for effects_dir in ["effects", "sexp_effects/effects"]:
                                for ext in [".py", ".sexp"]:
                                    candidate = recipe_dir / effects_dir / f"{effect_name}{ext}"
                                    if candidate.exists():
                                        effect_path = str(candidate.relative_to(recipe_dir))
                                        break
                                if effect_path:
                                    break
                        if not effect_path:
                            raise ValueError(f"COMPOUND EFFECT '{effect_name}' has no effect_path or FFmpeg mapping")
                        full_path = recipe_dir / effect_path
                        effect_modules.append(load_effect(full_path, step_effects_registry or effects_registry, recipe_dir, minimal_primitives))
                        # Everything except the bookkeeping keys is treated as
                        # an effect parameter.
                        chain_params_list.append({k: v for k, v in effect_config.items()
                                                  if k not in ("effect", "effect_path", "cid", "encoding", "type")})

                    effect_names = [e.get("effect", "?") for e in effects]
                    print(f" COMPOUND (fused): {', '.join(effect_names)}", file=sys.stderr)
                    # Run all effects in one pass over the frames, offset to the
                    # segment's timeline and capped at the segment duration.
                    run_effect_chain(effect_modules, current_input, output_file,
                                     chain_params_list, enc, analysis_data,
                                     time_offset=segment_start, max_duration=segment_duration)

                # Cache the result; fall back to the local file when
                # save_to_cache returns falsy.
                results[step_id] = save_to_cache(cache_dir, cache_id, output_file) or output_file
                print(f" -> {output_file}", file=sys.stderr)

            else:
                raise ValueError(f"Unknown node type: {node_type}")

        # Group steps by level for parallel execution.
        # Default to 4 workers to avoid overwhelming the system with
        # CPU-intensive effects (ascii_art, ripple, etc.) running in parallel.
        max_workers = int(os.environ.get("ARTDAG_WORKERS", 4))
        level_groups = []
        # NOTE(review): itertools.groupby only groups ADJACENT items — this
        # assumes ordered_steps is already sorted by "level"; confirm upstream.
        for k, g in groupby(ordered_steps, key=lambda s: s.get("level", 0)):
            level_groups.append((k, list(g)))

        for level_num, level_steps in level_groups:
            if len(level_steps) == 1:
                # Single step at this level: run inline, no thread overhead.
                _run_step(level_steps[0])
            else:
                # Summarize node types for the progress banner, normalizing
                # Symbol values to their .name.
                types = [s.get("node_type", "?") for s in level_steps]
                types = [t.name if hasattr(t, 'name') else str(t) for t in types]
                type_counts = {}
                for t in types:
                    type_counts[t] = type_counts.get(t, 0) + 1
                type_summary = ", ".join(f"{v}x {k}" for k, v in type_counts.items())
                print(f"\n >> Level {level_num}: {len(level_steps)} steps in parallel ({type_summary})", file=sys.stderr)
                with concurrent.futures.ThreadPoolExecutor(max_workers=min(len(level_steps), max_workers)) as pool:
                    futures = [pool.submit(_run_step, s) for s in level_steps]
                    for f in concurrent.futures.as_completed(futures):
                        f.result()  # re-raises exceptions from threads

        # Get final output
        final_output = results[plan["output_step_id"]]

        print(f"\n--- Output ---", file=sys.stderr)
        print(f"Final: {final_output}", file=sys.stderr)

        if output_path:
            # Handle stdout specially - remux to streamable format
            if str(output_path) in ("/dev/stdout", "-"):
                # MP4 isn't streamable, use matroska which is
                cmd = [
                    "ffmpeg", "-y", "-i", str(final_output),
                    "-c", "copy", "-f", "matroska", "pipe:1"
                ]
                subprocess.run(cmd, stdout=sys.stdout.buffer, stderr=subprocess.DEVNULL)
                return output_path
            else:
                shutil.copy(final_output, output_path)
                print(f"Copied to: {output_path}", file=sys.stderr)
                # Print path to stdout for piping
                print(output_path)
                return output_path
        else:
            # Use truncated source CID for output filename
            source_cid = plan.get('source_hash', 'output')[:16]
            out = recipe_dir / f"{source_cid}-output.mp4"
            shutil.copy(final_output, out)
            print(f"Copied to: {out}", file=sys.stderr)
            # Print path to stdout for piping
            print(out)
            return out
    finally:
        # Closes the try opened earlier in execute_plan (above this view).
        # Temp files are intentionally left on disk for debugging.
        print(f"Debug: temp files in {work_dir}", file=sys.stderr)


if __name__ == "__main__":
    import argparse

    # CLI entry point: parse plan/analysis paths and delegate to execute_plan.
    parser = argparse.ArgumentParser(description="Execute a plan")
    parser.add_argument("plan", nargs="?", default="-", help="Plan file (- for stdin)")
    parser.add_argument("-o", "--output", type=Path, help="Output file")
    parser.add_argument("-d", "--dir", type=Path, default=Path("."),
                        help="Recipe directory for resolving paths")
    parser.add_argument("-a", "--analysis", type=Path, help="Analysis file (.sexp)")
    args = parser.parse_args()

    # "-" means read the plan from stdin (plan_path stays None).
    plan_path = None if args.plan == "-" else Path(args.plan)
    if plan_path and not plan_path.exists():
        # NOTE(review): error message goes to stdout, but stdout also carries
        # the output path for piping — stderr would be safer; confirm intent.
        print(f"Plan not found: {plan_path}")
        sys.exit(1)

    # Load external analysis if provided
    external_analysis = None
    if args.analysis:
        if not args.analysis.exists():
            print(f"Analysis file not found: {args.analysis}")
            sys.exit(1)
        external_analysis = parse_analysis_sexp(args.analysis.read_text())

    execute_plan(plan_path, args.output, args.dir, external_analysis=external_analysis)