def set_max_video_pipelines(n: int):
    """Reconfigure the video-pipeline concurrency limit at runtime.

    A fresh semaphore with ``n`` permits replaces the module-level one.
    Guarded calls that are already running keep (and later release) the
    semaphore they acquired, so the new limit governs calls that start
    after this function returns.

    Args:
        n: Maximum number of guarded pipelines allowed to run concurrently.

    Raises:
        ValueError: If ``n`` is less than 1. A zero-permit semaphore would
            block every guarded call forever, because nothing ever
            releases a semaphore that was never acquired.
    """
    global _video_pipeline_sem, _MAX_VIDEO_PIPELINES
    if n < 1:
        raise ValueError(f"video pipeline limit must be >= 1, got {n}")
    _MAX_VIDEO_PIPELINES = n
    _video_pipeline_sem = threading.Semaphore(n)


def _video_pipeline_guard(fn):
    """Decorator: hold one video-pipeline semaphore permit for the call's duration.

    Args:
        fn: Callable to wrap.

    Returns:
        A wrapper with the same signature as ``fn`` that blocks until a
        permit is available, runs ``fn``, and always returns the permit.
    """
    from functools import wraps

    @wraps(fn)
    def _guarded(*args, **kwargs):
        # Bind the semaphore once. set_max_video_pipelines() may rebind the
        # module global while fn is running; the permit must be returned to
        # the same semaphore it was taken from, otherwise the old semaphore
        # leaks a permit and the new one gains an extra one.
        sem = _video_pipeline_sem
        with sem:
            return fn(*args, **kwargs)
    return _guarded
def check_cache(cache_dir: Path, cache_id: str, extensions: list) -> Path:
    """Look up a step's cached output through the unified IPNS/CID cache.

    Args:
        cache_dir: Cache directory. Accepted for interface compatibility;
            this function does not read it.
        cache_id: IPNS address (computation hash, known before execution)
            used as the lookup key.
        extensions: Possible file extensions (legacy compatibility). This
            function does not read it.

    Returns:
        The value reported by the unified cache for ``cache_id`` when it is
        truthy (presumably the path of the cached content file), otherwise
        None.
    """
    import cache as unified_cache

    # IPNS -> CID lookup; any falsy answer is normalised to None.
    hit = unified_cache.cache_exists(cache_id)
    return hit if hit else None
def extract_segment_with_loop(input_path: Path, output_path: Path, start: float, duration: float, encoding: dict) -> Path:
    """Extract a segment from a video, looping the source if needed to reach the requested duration.

    A plain extraction is attempted first. If it fails, or its output is
    more than one second shorter than ``duration``, the extraction is
    re-run with ``-stream_loop -1`` and the start time wrapped into the
    source's length.

    Args:
        input_path: Source video file.
        output_path: Output segment file (overwritten if present).
        start: Start time in seconds; falsy means "from the beginning".
        duration: Requested duration in seconds; falsy means "to the end".
        encoding: Encoding settings. Requires "codec", "preset" and "crf";
            "fps" defaults to 30 and "audio_codec" to "aac".

    Returns:
        Path to the output segment.

    Raises:
        ValueError: If the first extraction fails and no duration was
            requested (an unbounded loop would never terminate), if the
            looped extraction fails, or if the output file is missing or
            empty.
    """
    enc = encoding
    fps = enc.get("fps", 30)

    def build_cmd(seek, loop):
        """Assemble the ffmpeg command; attempts differ only in looping and seek."""
        cmd = ["ffmpeg", "-y"]
        if loop:
            cmd.extend(["-stream_loop", "-1"])
        cmd.extend(["-i", str(input_path)])
        if seek is not None:
            cmd.extend(["-ss", str(seek)])
        if duration:
            cmd.extend(["-t", str(duration)])
        cmd.extend(["-r", str(fps),
                    "-c:v", enc["codec"], "-preset", enc["preset"],
                    "-crf", str(enc["crf"]), "-pix_fmt", "yuv420p",
                    "-c:a", enc.get("audio_codec", "aac"),
                    str(output_path)])
        return cmd

    def probe_duration(path):
        """Return the container duration in seconds, or None if ffprobe gives no usable answer."""
        probe = subprocess.run(
            ["ffprobe", "-v", "quiet", "-print_format", "json",
             "-show_format", str(path)],
            capture_output=True, text=True)
        if probe.returncode != 0:
            return None
        try:
            return float(json.loads(probe.stdout).get("format", {}).get("duration", 0))
        except (ValueError, TypeError, AttributeError):
            # Empty/garbled ffprobe output or a non-numeric duration field.
            return None

    # First attempt without looping
    print(f" Extracting segment: start={start}, duration={duration}", file=sys.stderr)
    result = subprocess.run(build_cmd(start if start else None, loop=False),
                            capture_output=True, text=True)
    first_failed = result.returncode != 0

    # Check if we need to loop: the output came out noticeably short.
    needs_loop = False
    if not first_failed and duration:
        output_duration = probe_duration(output_path)
        if output_duration is not None and output_duration < duration - 1.0:  # 1 second tolerance
            needs_loop = True
            print(f" Output {output_duration:.1f}s < requested {duration:.1f}s, will loop", file=sys.stderr)

    if needs_loop or first_failed:
        if not duration:
            # Only reachable when the first run failed. Without -t an
            # infinitely looped input would never finish encoding.
            print(f" FFmpeg error: {result.stderr[:200]}", file=sys.stderr)
            raise ValueError("FFmpeg segment extraction failed")

        # Get source duration for wrapping
        src_duration = probe_duration(input_path) or 0.0

        if src_duration > 0:
            wrapped_start = start % src_duration if start else 0
            print(f" Looping source ({src_duration:.1f}s) to reach {duration:.1f}s", file=sys.stderr)

            # Re-run with stream_loop
            result = subprocess.run(build_cmd(wrapped_start, loop=True),
                                    capture_output=True, text=True)
            if result.returncode != 0:
                print(f" FFmpeg loop error: {result.stderr[:200]}", file=sys.stderr)
                raise ValueError("FFmpeg segment extraction with loop failed")
        elif first_failed:
            # Source length unknown, so looping is impossible; surface the
            # original failure and let the output validation below decide.
            print(f" FFmpeg error: {result.stderr[:200]}", file=sys.stderr)

    if not output_path.exists() or output_path.stat().st_size == 0:
        raise ValueError(f"Segment output invalid: {output_path}")

    print(f" Segment: {output_path.stat().st_size / 1024 / 1024:.1f}MB", file=sys.stderr)
    return output_path
def eval_expr(value, frame_time: float, frame_num: int, analysis_data: dict) -> float:
    """Evaluate a runtime expression to a float for one frame.

    Supports:
    - Literals (int, float)
    - Bindings: {"_bind": ref, "range_min": ..., "range_max": ...} or
      {"_binding": True, "source": ..., "range": [min, max]}, both with
      optional "transform" and "offset"
    - Math expressions: {"_expr": True, "op": "+", "args": [...]}
    - Time/frame: {"_expr": True, "op": "time"} or {"_expr": True, "op": "frame"}

    Binary operators use the first two arguments; unary operators use the
    first. "-" with a single argument is negation. Division and modulo by
    zero yield 0.0. Comparisons yield 1.0 or 0.0.

    Args:
        value: Literal, binding dict or expression dict.
        frame_time: Time in seconds of the frame being rendered.
        frame_num: Index of the frame being rendered.
        analysis_data: Mapping of track reference -> dict with "times" and
            "values" lists, used to resolve bindings.

    Returns:
        The evaluated value. Unknown value types, unknown operators and
        operators given too few arguments evaluate to 0.0.
    """
    import math

    # Literal values
    if isinstance(value, (int, float)):
        return float(value)

    if not isinstance(value, dict):
        return 0.0  # Unknown type

    # Handle bindings
    if "_bind" in value or "_binding" in value:
        if "_bind" in value:
            ref = value["_bind"]
            range_min = value.get("range_min", 0.0)
            range_max = value.get("range_max", 1.0)
        else:
            ref = value.get("source", "")
            range_val = value.get("range", [0.0, 1.0])
            is_list = isinstance(range_val, list)
            # An empty or one-element list falls back to the default bound
            # instead of raising IndexError.
            range_min = range_val[0] if is_list and range_val else 0.0
            range_max = range_val[1] if is_list and len(range_val) > 1 else 1.0

        transform = value.get("transform")
        bind_offset = value.get("offset", 0.0)

        track = analysis_data.get(ref, {})
        times = track.get("times", [])
        values = track.get("values", [])

        # Sample the track at the (offset) frame time, shape it, then map
        # the result onto the requested output range.
        lookup_time = frame_time + bind_offset
        raw = interpolate_analysis(times, values, lookup_time)
        transformed = apply_transform(raw, transform)

        return range_min + transformed * (range_max - range_min)

    # Handle expressions
    if "_expr" in value:
        op = value.get("op")
        args = value.get("args", [])

        # Special ops without args
        if op == "time":
            return frame_time
        if op == "frame":
            return float(frame_num)

        # Lazy-evaluated ops (don't evaluate all branches)
        if op == "if":
            cond = eval_expr(args[0], frame_time, frame_num, analysis_data) if args else 0.0
            if cond:
                return eval_expr(args[1], frame_time, frame_num, analysis_data) if len(args) > 1 else 0.0
            return eval_expr(args[2], frame_time, frame_num, analysis_data) if len(args) > 2 else 0.0

        # Evaluate arguments recursively
        evaluated = [eval_expr(arg, frame_time, frame_num, analysis_data) for arg in args]

        if len(evaluated) >= 2:
            a, b = evaluated[0], evaluated[1]

            # Comparison operations
            if op == "<":
                return 1.0 if a < b else 0.0
            if op == ">":
                return 1.0 if a > b else 0.0
            if op == "<=":
                return 1.0 if a <= b else 0.0
            if op == ">=":
                return 1.0 if a >= b else 0.0
            if op == "=":
                return 1.0 if a == b else 0.0

            # Binary math operations
            if op == "+":
                return a + b
            if op == "-":
                return a - b
            if op == "*":
                return a * b
            if op == "/":
                return a / b if b != 0 else 0.0
            if op == "mod":
                return a % b if b != 0 else 0.0
            if op == "min":
                return min(a, b)
            if op == "max":
                return max(a, b)

        if evaluated:
            x = evaluated[0]

            # Unary operations. Binary "-" has already returned above, so
            # reaching this branch with "-" means exactly one argument.
            if op == "-":
                return -x
            if op == "abs":
                return abs(x)
            if op == "sin":
                return math.sin(x)
            if op == "cos":
                return math.cos(x)
            if op == "floor":
                return float(math.floor(x))
            if op == "ceil":
                return float(math.ceil(x))

    return 0.0  # Fallback
+ + Returns: + Evaluated value (number or dict) + """ + import math + + if isinstance(value, (int, float)): + return value + + if isinstance(value, str): + return value + + if not isinstance(value, dict) or "_expr" not in value: + return value + + op = value.get("op") + args = value.get("args", []) + + # Variable reference + if op == "var": + name = value.get("name", "") + return variables.get(name, 0) + + # Dict constructor + if op == "dict": + keys = value.get("keys", []) + vals = [eval_scan_expr(a, rng, variables) for a in args] + return dict(zip(keys, vals)) + + # Random ops (advance RNG state) + if op == "rand": + return rng.random() + if op == "rand-int": + lo = int(eval_scan_expr(args[0], rng, variables)) + hi = int(eval_scan_expr(args[1], rng, variables)) + return rng.randint(lo, hi) + if op == "rand-range": + lo = float(eval_scan_expr(args[0], rng, variables)) + hi = float(eval_scan_expr(args[1], rng, variables)) + return rng.uniform(lo, hi) + + # Conditional (lazy - only evaluate taken branch) + if op == "if": + cond = eval_scan_expr(args[0], rng, variables) if args else 0 + if cond: + return eval_scan_expr(args[1], rng, variables) if len(args) > 1 else 0 + return eval_scan_expr(args[2], rng, variables) if len(args) > 2 else 0 + + # Comparison ops + if op in ("<", ">", "<=", ">=", "="): + left = eval_scan_expr(args[0], rng, variables) if args else 0 + right = eval_scan_expr(args[1], rng, variables) if len(args) > 1 else 0 + if op == "<": + return 1 if left < right else 0 + if op == ">": + return 1 if left > right else 0 + if op == "<=": + return 1 if left <= right else 0 + if op == ">=": + return 1 if left >= right else 0 + if op == "=": + return 1 if left == right else 0 + + # Eagerly evaluate remaining args + evaluated = [eval_scan_expr(a, rng, variables) for a in args] + + # Arithmetic ops + if op == "+" and len(evaluated) >= 2: + return evaluated[0] + evaluated[1] + if op == "-" and len(evaluated) >= 2: + return evaluated[0] - evaluated[1] + if op == 
def _is_binding(value):
    """Tell whether ``value`` is a binding/expression dict needing per-frame resolution.

    A dict qualifies when it carries at least one of the marker keys
    "_bind", "_binding" or "_expr"; anything that is not a dict does not.
    """
    if not isinstance(value, dict):
        return False
    return any(marker in value for marker in ("_bind", "_binding", "_expr"))


def _check_has_bindings(params: dict) -> bool:
    """Report whether any param value is a binding, directly or one level deep in a list.

    Only ``list`` values are searched, and only their immediate items.
    """
    return any(
        _is_binding(entry)
        or (isinstance(entry, list) and any(_is_binding(member) for member in entry))
        for entry in params.values()
    )
def resolve_scalar_binding(value, analysis_data: dict):
    """Collapse a binding to a single scalar when the bound feature is scalar.

    Only the 'duration' feature is treated as scalar: it is read straight
    from the referenced analysis track. Bindings to any other feature vary
    over time and must be resolved frame by frame (see resolve_params).

    Returns:
        - ``value`` unchanged when it is not a "_bind"/"_binding" dict.
        - The track's duration as a float for a 'duration' binding.
        - None when the binding cannot be reduced to a scalar (time-varying
          feature, or the track has no duration).
    """
    if not isinstance(value, dict):
        return value
    if "_bind" not in value and "_binding" not in value:
        return value

    # Old-format bindings carry only a reference and always target "values".
    if "_bind" in value:
        ref, feature = value["_bind"], "values"
    else:
        ref, feature = value.get("source", ""), value.get("feature", "values")

    track = analysis_data.get(ref, {})

    if feature != "duration":
        # Time-varying feature: the caller has to resolve it per frame.
        return None

    track_duration = track.get("duration")
    return None if track_duration is None else float(track_duration)
@@ -485,10 +940,10 @@ def run_effect(effect_module, input_path: Path, output_path: Path, params: dict, # Clean nil Symbols from params params = clean_nil_symbols(params) - # Get video info + # Get video info including duration probe_cmd = [ "ffprobe", "-v", "quiet", "-print_format", "json", - "-show_streams", str(input_path) + "-show_streams", "-show_format", str(input_path) ] probe_result = subprocess.run(probe_cmd, capture_output=True, text=True) probe_data = json.loads(probe_result.stdout) @@ -514,6 +969,11 @@ def run_effect(effect_module, input_path: Path, output_path: Path, params: dict, else: fps = float(fps_str) + # Get duration for progress bar + duration = None + if "format" in probe_data and "duration" in probe_data["format"]: + duration = float(probe_data["format"]["duration"]) + # Read frames with ffmpeg read_cmd = [ "ffmpeg", "-i", str(input_path), @@ -522,7 +982,7 @@ def run_effect(effect_module, input_path: Path, output_path: Path, params: dict, read_proc = subprocess.Popen(read_cmd, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL) # Check if we have any bindings that need per-frame resolution - has_bindings = any(isinstance(v, dict) and ("_bind" in v or "_binding" in v) for v in params.values()) + has_bindings = _check_has_bindings(params) analysis_data = analysis_data or {} # Debug: print bindings and analysis info once @@ -556,12 +1016,26 @@ def run_effect(effect_module, input_path: Path, output_path: Path, params: dict, # Resolve params for first frame if has_bindings: - frame_params = resolve_params(params, time_offset, analysis_data) + frame_params = resolve_params(params, time_offset, analysis_data, frame_num=0) else: frame_params = params + # Apply single effect with mix bypass: mix=0 → passthrough, 0=1 → full + def apply_effect(frame, frame_params, state): + mix_val = float(frame_params.get('mix', 1.0)) + if mix_val <= 0: + return frame, state + result, state = effect_module.process_frame(frame, frame_params, state) + if mix_val < 1.0: + 
result = np.clip( + frame.astype(np.float32) * (1.0 - mix_val) + + result.astype(np.float32) * mix_val, + 0, 255 + ).astype(np.uint8) + return result, state + state = None - processed, state = effect_module.process_frame(frame, frame_params, state) + processed, state = apply_effect(frame, frame_params, state) # Get output dimensions from processed frame out_height, out_width = processed.shape[:2] @@ -587,10 +1061,19 @@ def run_effect(effect_module, input_path: Path, output_path: Path, params: dict, write_proc.stdin.write(processed.tobytes()) frame_count = 1 - # Calculate max frames if duration limit specified + # Calculate max frames and total for progress bar max_frames = None + total_frames = 0 if max_duration: max_frames = int(max_duration * fps) + total_frames = max_frames + elif duration: + total_frames = int(duration * fps) + + # Create progress bar + effect_name = getattr(effect_module, 'effect_name', 'effect') + pbar = ProgressBar(total_frames, desc=effect_name) + pbar.set(1) # First frame already processed # Process remaining frames while True: @@ -607,25 +1090,24 @@ def run_effect(effect_module, input_path: Path, output_path: Path, params: dict, # Resolve params for this frame if has_bindings: frame_time = time_offset + frame_count / fps - frame_params = resolve_params(params, frame_time, analysis_data) + frame_params = resolve_params(params, frame_time, analysis_data, frame_num=frame_count) else: frame_params = params - processed, state = effect_module.process_frame(frame, frame_params, state) + processed, state = apply_effect(frame, frame_params, state) write_proc.stdin.write(processed.tobytes()) frame_count += 1 - - if frame_count % 30 == 0: - print(f" Processed {frame_count} frames...", end="\r", file=sys.stderr) + pbar.set(frame_count) read_proc.stdout.close() write_proc.stdin.close() read_proc.wait() write_proc.wait() - print(f" Processed {frame_count} frames total", file=sys.stderr) + pbar.finish() +@_video_pipeline_guard def 
run_multi_effect(effect_module, input_paths: List[Path], output_path: Path, params: dict, encoding: dict, analysis_data: dict = None, time_offset: float = 0.0, max_duration: float = None): """Run a multi-input effect on multiple video files. @@ -665,10 +1147,10 @@ def run_multi_effect(effect_module, input_paths: List[Path], output_path: Path, input_infos.append({"width": w, "height": h, "path": input_path}) print(f" Input: {input_path.name} ({w}x{h})", file=sys.stderr) - # Get framerate from first input + # Get framerate and duration from first input probe_cmd = [ "ffprobe", "-v", "quiet", "-print_format", "json", - "-show_streams", str(input_paths[0]) + "-show_streams", "-show_format", str(input_paths[0]) ] probe_result = subprocess.run(probe_cmd, capture_output=True, text=True) probe_data = json.loads(probe_result.stdout) @@ -680,6 +1162,11 @@ def run_multi_effect(effect_module, input_paths: List[Path], output_path: Path, else: fps = float(fps_str) + # Get duration for progress bar + duration = None + if "format" in probe_data and "duration" in probe_data["format"]: + duration = float(probe_data["format"]["duration"]) + # Open read processes for all inputs - preserve original dimensions read_procs = [] for info in input_infos: @@ -709,11 +1196,11 @@ def run_multi_effect(effect_module, input_paths: List[Path], output_path: Path, frames.append(frame) # Check if we have any bindings that need per-frame resolution - has_bindings = any(isinstance(v, dict) and ("_bind" in v or "_binding" in v) for v in params.values()) + has_bindings = _check_has_bindings(params) # Resolve params for first frame if has_bindings: - frame_params = resolve_params(params, time_offset, analysis_data) + frame_params = resolve_params(params, time_offset, analysis_data, frame_num=0) else: frame_params = params @@ -740,10 +1227,19 @@ def run_multi_effect(effect_module, input_paths: List[Path], output_path: Path, write_proc.stdin.write(processed.tobytes()) frame_count = 1 - # Calculate max 
@_video_pipeline_guard
def run_effect_chain(effect_modules, input_path: Path, output_path: Path,
                     params_list: list, encoding: dict,
                     analysis_data=None, time_offset: float = 0.0,
                     max_duration: float = None):
    """Run multiple effects as a single-pass fused chain: one decode, one encode, no intermediates.

    Every decoded frame is pushed through all effects in order before it is
    written. An effect whose resolved 'mix' param is <= 0 is skipped for
    that frame; a 'mix' between 0 and 1 blends the effect's result with its
    input.

    Args:
        effect_modules: List of effect modules (each has process_frame)
        input_path: Input video file
        output_path: Output video file
        params_list: List of param dicts, one per effect
        encoding: Encoding settings; "codec", "preset", "crf" and
            "audio_codec" are required, "fps" defaults to 30
        analysis_data: Optional analysis data for binding resolution
        time_offset: Time offset for resolving bindings
        max_duration: Maximum duration in seconds to process

    Raises:
        ValueError: If the input has no video stream or yields no frames.
    """
    import numpy as np

    # Clean nil Symbols from each params dict
    params_list = [clean_nil_symbols(p) for p in params_list]

    # Probe input for dimensions/fps/duration
    probe_cmd = [
        "ffprobe", "-v", "quiet", "-print_format", "json",
        "-show_streams", "-show_format", str(input_path)
    ]
    probe_result = subprocess.run(probe_cmd, capture_output=True, text=True)
    probe_data = json.loads(probe_result.stdout)

    video_stream = None
    for stream in probe_data.get("streams", []):
        if stream.get("codec_type") == "video":
            video_stream = stream
            break
    if not video_stream:
        raise ValueError("No video stream found")

    in_width = int(video_stream["width"])
    in_height = int(video_stream["height"])

    fps_str = video_stream.get("r_frame_rate", "30/1")
    if "/" in fps_str:
        num, den = fps_str.split("/")
        fps = float(num) / float(den)
    else:
        fps = float(fps_str)

    duration = None
    if "format" in probe_data and "duration" in probe_data["format"]:
        duration = float(probe_data["format"]["duration"])

    # Pre-compute per-effect binding flags. Use the shared helper so that
    # bindings nested inside list params are detected here exactly as in
    # run_effect / run_multi_effect.
    analysis_data = analysis_data or {}
    bindings_flags = [_check_has_bindings(params) for params in params_list]

    # Apply effect chain to a frame, respecting per-effect mix bypass.
    # mix=0 → skip (zero cost), mix=1 → full effect.
    def apply_chain(frame, states, frame_num, frame_time):
        processed = frame
        for idx, (module, params, has_b) in enumerate(zip(effect_modules, params_list, bindings_flags)):
            if has_b:
                fp = resolve_params(params, frame_time, analysis_data, frame_num=frame_num)
            else:
                fp = params
            mix_val = float(fp.get('mix', 1.0))
            if mix_val <= 0:
                continue
            result, states[idx] = module.process_frame(processed, fp, states[idx])
            if mix_val < 1.0:
                processed = np.clip(
                    processed.astype(np.float32) * (1.0 - mix_val) +
                    result.astype(np.float32) * mix_val,
                    0, 255
                ).astype(np.uint8)
            else:
                processed = result
        return processed, states

    in_frame_size = in_width * in_height * 3

    # Open single ffmpeg reader
    read_cmd = [
        "ffmpeg", "-i", str(input_path),
        "-f", "rawvideo", "-pix_fmt", "rgb24", "-"
    ]
    read_proc = subprocess.Popen(read_cmd, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL)
    write_proc = None

    # Everything after the reader is spawned runs under try/finally so the
    # ffmpeg subprocesses are closed and reaped even when an effect raises.
    try:
        # Read first frame
        frame_data = read_proc.stdout.read(in_frame_size)
        if len(frame_data) < in_frame_size:
            raise ValueError("No frames in input video")

        frame = np.frombuffer(frame_data, dtype=np.uint8).reshape((in_height, in_width, 3))

        # Push first frame through all effects to discover final output dimensions
        states = [None] * len(effect_modules)
        processed, states = apply_chain(frame, states, 0, time_offset)

        out_height, out_width = processed.shape[:2]
        if out_width != in_width or out_height != in_height:
            print(f" Chain resizes: {in_width}x{in_height} -> {out_width}x{out_height}", file=sys.stderr)

        # Open single ffmpeg writer with final output dimensions
        write_cmd = [
            "ffmpeg", "-y",
            "-f", "rawvideo", "-pix_fmt", "rgb24",
            "-s", f"{out_width}x{out_height}", "-r", str(encoding.get("fps", 30)),
            "-i", "-",
            "-i", str(input_path),  # For audio
            "-map", "0:v", "-map", "1:a?",
            "-c:v", encoding["codec"], "-preset", encoding["preset"], "-crf", str(encoding["crf"]),
            "-pix_fmt", "yuv420p",
            "-c:a", encoding["audio_codec"],
            str(output_path)
        ]
        write_proc = subprocess.Popen(write_cmd, stdin=subprocess.PIPE, stderr=subprocess.DEVNULL)

        # Write first processed frame
        write_proc.stdin.write(processed.tobytes())
        frame_count = 1

        # Calculate max frames and total for progress bar
        max_frames = None
        total_frames = 0
        if max_duration:
            max_frames = int(max_duration * fps)
            total_frames = max_frames
        elif duration:
            total_frames = int(duration * fps)

        effect_names = [getattr(m, 'effect_name', '?') for m in effect_modules]
        pbar = ProgressBar(total_frames, desc='+'.join(effect_names))
        pbar.set(1)

        # Frame loop: read -> apply chain -> write
        while True:
            if max_frames and frame_count >= max_frames:
                break

            frame_data = read_proc.stdout.read(in_frame_size)
            if len(frame_data) < in_frame_size:
                break

            frame = np.frombuffer(frame_data, dtype=np.uint8).reshape((in_height, in_width, 3))

            frame_time = time_offset + frame_count / fps
            processed, states = apply_chain(frame, states, frame_count, frame_time)

            write_proc.stdin.write(processed.tobytes())
            frame_count += 1
            pbar.set(frame_count)
    finally:
        read_proc.stdout.close()
        try:
            if write_proc is not None:
                write_proc.stdin.close()
        finally:
            # Reap both children even if closing the writer's stdin fails.
            read_proc.wait()
            if write_proc is not None:
                write_proc.wait()

    pbar.finish()
+ """ # Load plan from file, stdin, or dict if plan_data: @@ -963,13 +1628,28 @@ def execute_plan(plan_path: Path = None, output_path: Path = None, recipe_dir: P plan = parse_plan_input(content) print(f"Executing plan: {plan['plan_id'][:16]}...", file=sys.stderr) - print(f"Recipe: {plan['recipe_id']}", file=sys.stderr) + print(f"Source CID: {plan.get('source_hash', 'unknown')[:16]}...", file=sys.stderr) print(f"Steps: {len(plan['steps'])}", file=sys.stderr) recipe_encoding = plan.get("encoding", {}) - # Use external analysis if provided, otherwise fall back to plan's embedded analysis - analysis_data = external_analysis or plan.get("analysis", {}) + # Merge plan's embedded analysis (includes synthetic tracks from composition + # merging) with external analysis (fresh ANALYZE step outputs). + # External analysis takes priority for tracks that exist in both. + analysis_data = dict(plan.get("analysis", {})) + if external_analysis: + analysis_data.update(external_analysis) + + # Resolve cache-id refs from plan + for name, data in list(analysis_data.items()): + if isinstance(data, dict) and "_cache_id" in data: + try: + from cache import cache_get_json + loaded = cache_get_json(data["_cache_id"]) + if loaded: + analysis_data[name] = loaded + except ImportError: + pass # standalone mode, no cache available if recipe_dir is None: recipe_dir = plan_path.parent if plan_path else Path(".") @@ -990,8 +1670,8 @@ def execute_plan(plan_path: Path = None, output_path: Path = None, recipe_dir: P results = {} # step_id -> output_path work_dir = Path(tempfile.mkdtemp(prefix="artdag_exec_")) - # Sort steps: SOURCE first, then by level, but ANALYZE before COMPOUND/EFFECT at any level - # This ensures analysis data is available for binding resolution + # Sort steps by level first (respecting dependencies), then by type within each level + # Type priority within same level: SOURCE/SEGMENT first, then ANALYZE, then EFFECT steps = plan["steps"] def step_sort_key(s): node_type = 
s.get("node_type") or "UNKNOWN" @@ -1002,24 +1682,26 @@ def execute_plan(plan_path: Path = None, output_path: Path = None, recipe_dir: P # Ensure level is an int (could be Symbol or None) if not isinstance(level, int): level = 0 - # Priority: SOURCE=0, SEGMENT=1, ANALYZE=2, others=3 + # Type priority (tiebreaker within same level): SOURCE=0, SEGMENT=1, ANALYZE=2, others=3 if node_type == "SOURCE": type_priority = 0 elif node_type == "SEGMENT": type_priority = 1 - elif node_type == "ANALYZE": + elif node_type in ("ANALYZE", "SCAN"): type_priority = 2 else: type_priority = 3 - return (type_priority, level) + # Sort by level FIRST, then type priority as tiebreaker + return (level, type_priority) ordered_steps = sorted(steps, key=step_sort_key) try: - for step in ordered_steps: + def _run_step(step): step_id = step["step_id"] node_type = step["node_type"] config = step["config"] inputs = step.get("inputs", []) + cache_id = step.get("cache_id", step_id) # IPNS address for caching print(f"\n[{step.get('level', 0)}] {node_type}: {step_id[:16]}...", file=sys.stderr) @@ -1032,13 +1714,28 @@ def execute_plan(plan_path: Path = None, output_path: Path = None, recipe_dir: P print(f" -> {src_path}", file=sys.stderr) elif node_type == "SEGMENT": + is_audio = str(results[inputs[0]]).lower().endswith( + ('.mp3', '.wav', '.flac', '.aac', '.ogg', '.m4a')) + input_path = results[inputs[0]] start = config.get("start", 0) duration = config.get("duration") end = config.get("end") - is_audio = str(input_path).lower().endswith( - ('.mp3', '.wav', '.flac', '.aac', '.ogg', '.m4a')) + # Resolve any bindings to scalar values + start = resolve_scalar_binding(start, analysis_data) if start else 0 + duration = resolve_scalar_binding(duration, analysis_data) if duration else None + end = resolve_scalar_binding(end, analysis_data) if end else None + + # Check cache + cached = check_cache(cache_dir, cache_id, ['.m4a'] if is_audio else ['.mp4']) + if cached: + results[step_id] = cached + print(f" -> 
{cached} (cached)", file=sys.stderr) + return + + print(f" Resolved: start={start}, duration={duration}", file=sys.stderr) + enc = get_encoding(recipe_encoding, config) if is_audio: @@ -1065,16 +1762,23 @@ def execute_plan(plan_path: Path = None, output_path: Path = None, recipe_dir: P result = subprocess.run(cmd, capture_output=True, text=True) - # Check if segment has video content, if not try with looping + # Check if segment has video content AND correct duration, if not try with looping needs_loop = False if not is_audio and result.returncode == 0: probe_cmd = ["ffprobe", "-v", "quiet", "-print_format", "json", - "-show_streams", str(output_file)] + "-show_streams", "-show_format", str(output_file)] probe_result = subprocess.run(probe_cmd, capture_output=True, text=True) probe_data = json.loads(probe_result.stdout) has_video = any(s.get("codec_type") == "video" for s in probe_data.get("streams", [])) if not has_video: needs_loop = True + # Also check if output duration matches requested duration + elif duration: + output_duration = float(probe_data.get("format", {}).get("duration", 0)) + # If output is significantly shorter than requested, need to loop + if output_duration < duration - 1.0: # 1 second tolerance + needs_loop = True + print(f" Output {output_duration:.1f}s < requested {duration:.1f}s, will loop", file=sys.stderr) if needs_loop or result.returncode != 0: # Get source duration and loop the input @@ -1116,10 +1820,17 @@ def execute_plan(plan_path: Path = None, output_path: Path = None, recipe_dir: P else: raise ValueError(f"Cannot determine source duration for looping") - results[step_id] = output_file + results[step_id] = save_to_cache(cache_dir, cache_id, output_file) or output_file print(f" -> {output_file}", file=sys.stderr) elif node_type == "EFFECT": + # Check cache + cached = check_cache(cache_dir, cache_id, ['.mp4']) + if cached: + results[step_id] = cached + print(f" -> {cached} (cached)", file=sys.stderr) + return + effect_name = 
config.get("effect", "unknown") effect_path = config.get("effect_path") is_multi_input = config.get("multi_input", False) @@ -1150,13 +1861,20 @@ def execute_plan(plan_path: Path = None, output_path: Path = None, recipe_dir: P input_path = results[inputs[0]] shutil.copy(input_path, output_file) - results[step_id] = output_file + results[step_id] = save_to_cache(cache_dir, cache_id, output_file) or output_file print(f" -> {output_file}", file=sys.stderr) elif node_type == "SEQUENCE": + # Check cache first + cached = check_cache(cache_dir, cache_id, ['.mp4']) + if cached: + results[step_id] = cached + print(f" -> {cached} (cached)", file=sys.stderr) + return + if len(inputs) < 2: results[step_id] = results[inputs[0]] - continue + return input_files = [results[inp] for inp in inputs] enc = get_encoding(recipe_encoding, config) @@ -1193,15 +1911,34 @@ def execute_plan(plan_path: Path = None, output_path: Path = None, recipe_dir: P # Use tree concat for efficiency output_file = tree_concat(input_files, work_dir, f"seq_{step_id[:8]}") - results[step_id] = output_file + results[step_id] = save_to_cache(cache_dir, cache_id, output_file) or output_file print(f" -> {output_file}", file=sys.stderr) elif node_type == "MUX": + # Check cache + cached = check_cache(cache_dir, cache_id, ['.mp4']) + if cached: + results[step_id] = cached + print(f" -> {cached} (cached)", file=sys.stderr) + return + video_path = results[inputs[0]] audio_path = results[inputs[1]] enc = get_encoding(recipe_encoding, config) output_file = work_dir / f"mux_{step_id}.mp4" + + # Get duration for progress bar + probe_cmd = [ + "ffprobe", "-v", "quiet", "-print_format", "json", + "-show_format", str(video_path) + ] + probe_result = subprocess.run(probe_cmd, capture_output=True, text=True) + mux_duration = None + if probe_result.returncode == 0: + probe_data = json.loads(probe_result.stdout) + mux_duration = float(probe_data.get("format", {}).get("duration", 0)) + cmd = ["ffmpeg", "-y", "-i", 
str(video_path), "-i", str(audio_path), "-map", "0:v", "-map", "1:a", @@ -1209,11 +1946,32 @@ def execute_plan(plan_path: Path = None, output_path: Path = None, recipe_dir: P "-crf", str(enc["crf"]), "-c:a", enc["audio_codec"], "-shortest", str(output_file)] - subprocess.run(cmd, check=True, capture_output=True) - results[step_id] = output_file + import re + mux_proc = subprocess.Popen(cmd, stderr=subprocess.PIPE, text=True) + pbar = ProgressBar(int(mux_duration * 1000) if mux_duration else 0, desc="mux") + for line in mux_proc.stderr: + m = re.search(r"time=(\d+):(\d+):(\d+)\.(\d+)", line) + if m: + h, mi, s, cs = int(m.group(1)), int(m.group(2)), int(m.group(3)), int(m.group(4)) + ms = h * 3600000 + mi * 60000 + s * 1000 + cs * 10 + pbar.set(ms) + pbar.finish() + mux_proc.wait() + if mux_proc.returncode != 0: + raise RuntimeError("MUX ffmpeg failed") + results[step_id] = save_to_cache(cache_dir, cache_id, output_file) or output_file print(f" -> {output_file}", file=sys.stderr) elif node_type == "ANALYZE": + # Check cache first + cached = check_cache(cache_dir, cache_id, ['.json']) + if cached: + with open(cached) as f: + analysis_data[step_id] = json.load(f) + results[step_id] = cached + print(f" -> {cached} (cached)", file=sys.stderr) + return + output_file = work_dir / f"analysis_{step_id}.json" if "analysis_results" in config: @@ -1248,16 +2006,117 @@ def execute_plan(plan_path: Path = None, output_path: Path = None, recipe_dir: P else: print(f" -> no analyzer path!", file=sys.stderr) - results[step_id] = output_file + results[step_id] = save_to_cache(cache_dir, cache_id, output_file) or output_file + + elif node_type == "SCAN": + # Check cache first + cached = check_cache(cache_dir, cache_id, ['.json']) + if cached: + with open(cached) as f: + scan_result = json.load(f) + analysis_data[step_id] = scan_result + results[step_id] = cached + print(f" -> {cached} (cached)", file=sys.stderr) + return + + import random + + # Load source analysis data + source_id = 
inputs[0] + source_data = analysis_data.get(source_id, {}) + event_times = source_data.get("times", []) + duration = source_data.get("duration", event_times[-1] if event_times else 0) + + seed = config.get("seed", 0) + init_expr = config.get("init", 0) + step_expr = config.get("step_expr") + emit_expr = config.get("emit_expr") + + # Initialize RNG and accumulator + rng = random.Random(seed) + acc = eval_scan_expr(init_expr, rng, {}) + + # Process each event + event_values = [] # (time, emitted_value) pairs + + for t in event_times: + # Build variable bindings from accumulator + if isinstance(acc, dict): + variables = dict(acc) + variables["acc"] = acc + else: + variables = {"acc": acc} + + # Step: update accumulator + acc = eval_scan_expr(step_expr, rng, variables) + + # Rebind after step + if isinstance(acc, dict): + variables = dict(acc) + variables["acc"] = acc + else: + variables = {"acc": acc} + + # Emit: produce output value + emit_val = eval_scan_expr(emit_expr, rng, variables) + if isinstance(emit_val, (int, float)): + event_values.append((t, float(emit_val))) + else: + event_values.append((t, 0.0)) + + # Generate high-resolution time-series with step-held interpolation + resolution = 100 # points per second + hi_res_times = [] + hi_res_values = [] + + current_val = 0.0 + event_idx = 0 + num_points = int(duration * resolution) + 1 + + for i in range(num_points): + t = i / resolution + + # Advance to the latest event at or before time t + while event_idx < len(event_values) and event_values[event_idx][0] <= t: + current_val = event_values[event_idx][1] + event_idx += 1 + + hi_res_times.append(round(t, 4)) + hi_res_values.append(current_val) + + scan_result = { + "times": hi_res_times, + "values": hi_res_values, + "duration": duration, + } + + analysis_data[step_id] = scan_result + + # Save to cache + output_file = work_dir / f"scan_{step_id}.json" + with open(output_file, "w") as f: + json.dump(scan_result, f) + results[step_id] = save_to_cache(cache_dir, 
cache_id, output_file) or output_file + + print(f" SCAN: {len(event_times)} events -> {len(hi_res_times)} points ({duration:.1f}s)", file=sys.stderr) + print(f" -> {output_file}", file=sys.stderr) elif node_type == "COMPOUND": + # Check cache first + cached = check_cache(cache_dir, cache_id, ['.mp4']) + if cached: + results[step_id] = cached + print(f" -> {cached} (cached)", file=sys.stderr) + return + # Collapsed effect chains - compile to single FFmpeg command with sendcmd filter_chain_raw = config.get("filter_chain", []) if not filter_chain_raw: raise ValueError("COMPOUND step has empty filter_chain") - # Get effects registry for loading explicitly declared effects - effects_registry = config.get("effects_registry", {}) + # Get effects registry for this compound step (use different name + # to avoid shadowing the outer effects_registry in nested function) + step_effects_registry = config.get("effects_registry", {}) # Convert filter_chain items from S-expression lists to dicts # and clean nil Symbols from configs @@ -1317,13 +2176,7 @@ def execute_plan(plan_path: Path = None, output_path: Path = None, recipe_dir: P compiler = FFmpegCompiler() # Check if any effect has bindings - these need Python path for per-frame resolution - def has_bindings(effect_config): - for k, v in effect_config.items(): - if isinstance(v, dict) and ("_bind" in v or "_binding" in v): - return True - return False - - any_has_bindings = any(has_bindings(e) for e in effects) + any_has_bindings = any(_check_has_bindings(e) for e in effects) # Check if all effects have FFmpeg mappings all_have_mappings = all( @@ -1342,12 +2195,15 @@ def execute_plan(plan_path: Path = None, output_path: Path = None, recipe_dir: P segment_duration or 1.0, ) - # Build FFmpeg command - cmd = ["ffmpeg", "-y", "-i", str(input_path)] - if segment_start: - cmd.extend(["-ss", str(segment_start)]) - if segment_duration: - cmd.extend(["-t", str(segment_duration)]) + # First extract segment with looping if needed + 
ffmpeg_input = input_path + if segment_start or segment_duration: + seg_temp = work_dir / f"compound_{step_id}_seg_temp.mp4" + extract_segment_with_loop(input_path, seg_temp, segment_start or 0, segment_duration, enc) + ffmpeg_input = seg_temp + + # Build FFmpeg command (segment already extracted, just apply filters) + cmd = ["ffmpeg", "-y", "-i", str(ffmpeg_input)] if ffmpeg_filters: cmd.extend(["-vf", ffmpeg_filters]) @@ -1374,42 +2230,26 @@ def execute_plan(plan_path: Path = None, output_path: Path = None, recipe_dir: P # Fall back to sequential processing for effects without FFmpeg mappings current_input = input_path - # First handle segment + # First handle segment (with looping if source is shorter than requested) for filter_item in filter_chain: if filter_item.get("type") == "SEGMENT": filter_config = filter_item.get("config", {}) - start = filter_config.get("start", 0) + start = filter_config.get("start", 0) or 0 duration = filter_config.get("duration") if start or duration: seg_output = work_dir / f"compound_{step_id}_seg.mp4" - cmd = ["ffmpeg", "-y", "-i", str(current_input)] - if start: - cmd.extend(["-ss", str(start)]) - if duration: - cmd.extend(["-t", str(duration)]) - cmd.extend(["-r", str(enc.get("fps", 30)), - "-c:v", enc["codec"], "-preset", enc["preset"], - "-crf", str(enc["crf"]), "-pix_fmt", "yuv420p", - "-c:a", enc["audio_codec"], - str(seg_output)]) - print(f" Extracting segment: start={start}, duration={duration}", file=sys.stderr) - result = subprocess.run(cmd, capture_output=True, text=True) - if result.returncode != 0: - print(f" FFmpeg segment error: {result.stderr}", file=sys.stderr) - raise ValueError(f"FFmpeg segment extraction failed: {result.stderr}") - if not seg_output.exists() or seg_output.stat().st_size == 0: - raise ValueError(f"Segment output invalid: {seg_output}") - print(f" Segment output: {seg_output.stat().st_size} bytes", file=sys.stderr) + extract_segment_with_loop(current_input, seg_output, start, duration, enc) 
current_input = seg_output break - # Then handle effects sequentially - for i, effect_config in enumerate(effects): + # Load all effect modules and params for fused single-pass execution + effect_modules = [] + chain_params_list = [] + for effect_config in effects: effect_name = effect_config.get("effect", "unknown") effect_path = effect_config.get("effect_path") - # Try to resolve effect path if not effect_path: for effects_dir in ["effects", "sexp_effects/effects"]: for ext in [".py", ".sexp"]: @@ -1420,34 +2260,52 @@ def execute_plan(plan_path: Path = None, output_path: Path = None, recipe_dir: P if effect_path: break - is_last = (i == len(effects) - 1) - effect_output = output_file if is_last else work_dir / f"compound_{step_id}_fx_{i:02d}.mp4" - - if effect_path: - full_path = recipe_dir / effect_path - effect_module = load_effect(full_path, effects_registry, recipe_dir, minimal_primitives) - params = {k: v for k, v in effect_config.items() - if k not in ("effect", "effect_path", "cid", "encoding", "type")} - print(f" COMPOUND [{i+1}/{len(effects)}]: {effect_name} (Python)", file=sys.stderr) - # Debug: check input file - if not current_input.exists(): - raise ValueError(f"Input file does not exist: {current_input}") - input_size = current_input.stat().st_size - print(f" Input: {current_input.name} ({input_size} bytes)", file=sys.stderr) - if input_size == 0: - raise ValueError(f"Input file is empty: {current_input}") - run_effect(effect_module, current_input, effect_output, params, enc, analysis_data, time_offset=segment_start, max_duration=segment_duration) - else: + if not effect_path: raise ValueError(f"COMPOUND EFFECT '{effect_name}' has no effect_path or FFmpeg mapping") - current_input = effect_output + full_path = recipe_dir / effect_path + effect_modules.append(load_effect(full_path, step_effects_registry or effects_registry, recipe_dir, minimal_primitives)) + chain_params_list.append({k: v for k, v in effect_config.items() + if k not in ("effect", 
"effect_path", "cid", "encoding", "type")}) - results[step_id] = output_file + effect_names = [e.get("effect", "?") for e in effects] + print(f" COMPOUND (fused): {', '.join(effect_names)}", file=sys.stderr) + + run_effect_chain(effect_modules, current_input, output_file, + chain_params_list, enc, analysis_data, + time_offset=segment_start, + max_duration=segment_duration) + + results[step_id] = save_to_cache(cache_dir, cache_id, output_file) or output_file print(f" -> {output_file}", file=sys.stderr) else: raise ValueError(f"Unknown node type: {node_type}") + # Group steps by level for parallel execution. + # Default to 4 workers to avoid overwhelming the system with + # CPU-intensive effects (ascii_art, ripple, etc.) running in parallel. + max_workers = int(os.environ.get("ARTDAG_WORKERS", 4)) + level_groups = [] + for k, g in groupby(ordered_steps, key=lambda s: s.get("level", 0)): + level_groups.append((k, list(g))) + + for level_num, level_steps in level_groups: + if len(level_steps) == 1: + _run_step(level_steps[0]) + else: + types = [s.get("node_type", "?") for s in level_steps] + types = [t.name if hasattr(t, 'name') else str(t) for t in types] + type_counts = {} + for t in types: + type_counts[t] = type_counts.get(t, 0) + 1 + type_summary = ", ".join(f"{v}x {k}" for k, v in type_counts.items()) + print(f"\n >> Level {level_num}: {len(level_steps)} steps in parallel ({type_summary})", file=sys.stderr) + with concurrent.futures.ThreadPoolExecutor(max_workers=min(len(level_steps), max_workers)) as pool: + futures = [pool.submit(_run_step, s) for s in level_steps] + for f in concurrent.futures.as_completed(futures): + f.result() # re-raises exceptions from threads + # Get final output final_output = results[plan["output_step_id"]] print(f"\n--- Output ---", file=sys.stderr) @@ -1470,7 +2328,9 @@ def execute_plan(plan_path: Path = None, output_path: Path = None, recipe_dir: P print(output_path) return output_path else: - out = recipe_dir / 
f"{plan['recipe_id']}-output.mp4" + # Use truncated source CID for output filename + source_cid = plan.get('source_hash', 'output')[:16] + out = recipe_dir / f"{source_cid}-output.mp4" shutil.copy(final_output, out) print(f"Copied to: {out}", file=sys.stderr) # Print path to stdout for piping diff --git a/run_staged.py b/run_staged.py index 4c7e9d9..597aacb 100644 --- a/run_staged.py +++ b/run_staged.py @@ -18,6 +18,7 @@ The script: 3. Produce final output """ +import os import sys import json import tempfile @@ -30,9 +31,37 @@ from typing import Dict, List, Optional, Any sys.path.insert(0, str(Path(__file__).parent.parent / "artdag")) from artdag.sexp import compile_string, parse -from artdag.sexp.parser import Symbol, Keyword +from artdag.sexp.parser import Symbol, Keyword, serialize from artdag.sexp.planner import create_plan +# Import unified cache +import cache as unified_cache + +import hashlib + + +def _cache_analysis_tracks(plan): + """Cache each analysis track individually, replace data with cache-id refs.""" + import json as _json + for name, data in plan.analysis.items(): + json_str = _json.dumps(data, sort_keys=True) + content_cid = hashlib.sha256(json_str.encode()).hexdigest() + unified_cache.cache_store_json(content_cid, data) + plan.analysis[name] = {"_cache_id": content_cid} + + +def _resolve_analysis_refs(analysis_dict): + """Resolve cache-id refs back to full analysis data.""" + resolved = {} + for name, data in analysis_dict.items(): + if isinstance(data, dict) and "_cache_id" in data: + loaded = unified_cache.cache_get_json(data["_cache_id"]) + if loaded: + resolved[name] = loaded + else: + resolved[name] = data + return resolved + def run_staged_recipe( recipe_path: Path, @@ -40,6 +69,7 @@ def run_staged_recipe( cache_dir: Optional[Path] = None, params: Optional[Dict[str, Any]] = None, verbose: bool = True, + force_replan: bool = False, ) -> Path: """ Run a staged recipe with stage-level caching. 
@@ -57,21 +87,56 @@ def run_staged_recipe( recipe_text = recipe_path.read_text() recipe_dir = recipe_path.parent - # Set up cache directory - if cache_dir is None: - cache_dir = recipe_dir / ".stage_cache" - cache_dir.mkdir(parents=True, exist_ok=True) + # Use unified cache + content_cache_dir = unified_cache.get_content_dir() def log(msg: str): if verbose: print(msg, file=sys.stderr) + # Store recipe source by CID + recipe_cid, _ = unified_cache.content_store_string(recipe_text) + log(f"Recipe CID: {recipe_cid[:16]}...") + # Compile recipe log(f"Compiling: {recipe_path}") - compiled = compile_string(recipe_text, params) + compiled = compile_string(recipe_text, params, recipe_dir=recipe_dir) log(f"Recipe: {compiled.name} v{compiled.version}") log(f"Nodes: {len(compiled.nodes)}") + # Store effects by CID + for effect_name, effect_info in compiled.registry.get("effects", {}).items(): + effect_path = effect_info.get("path") + effect_cid = effect_info.get("cid") + if effect_path and effect_cid: + effect_file = Path(effect_path) + if effect_file.exists(): + stored_cid, _ = unified_cache.content_store_file(effect_file) + if stored_cid == effect_cid: + log(f"Effect '{effect_name}' CID: {effect_cid[:16]}...") + else: + log(f"Warning: Effect '{effect_name}' CID mismatch") + + # Store analyzers by CID + for analyzer_name, analyzer_info in compiled.registry.get("analyzers", {}).items(): + analyzer_path = analyzer_info.get("path") + analyzer_cid = analyzer_info.get("cid") + if analyzer_path: + analyzer_file = Path(analyzer_path) if Path(analyzer_path).is_absolute() else recipe_dir / analyzer_path + if analyzer_file.exists(): + stored_cid, _ = unified_cache.content_store_file(analyzer_file) + log(f"Analyzer '{analyzer_name}' CID: {stored_cid[:16]}...") + + # Store included files by CID + for include_path, include_cid in compiled.registry.get("includes", {}).items(): + include_file = Path(include_path) + if include_file.exists(): + stored_cid, _ = 
unified_cache.content_store_file(include_file) + if stored_cid == include_cid: + log(f"Include '{include_file.name}' CID: {include_cid[:16]}...") + else: + log(f"Warning: Include '{include_file.name}' CID mismatch") + # Check for stages if not compiled.stages: log("No stages found - running as regular recipe") @@ -96,6 +161,53 @@ def run_staged_recipe( times = results.get("times", []) log(f" Analysis complete: {node_id[:16]}... ({len(times)} times)") + # Check for cached plan using unified cache + plan_cid = unified_cache.plan_exists(recipe_cid, params) + + if plan_cid and not force_replan: + plan_cache_path = unified_cache.plan_get_path(recipe_cid, params) + log(f"\nFound cached plan: {plan_cid[:16]}...") + plan_sexp_str = unified_cache.plan_load(recipe_cid, params) + + # Parse the cached plan + from execute import parse_plan_input + plan_dict = parse_plan_input(plan_sexp_str) + + # Resolve cache-id refs in plan's embedded analysis + if "analysis" in plan_dict: + plan_dict["analysis"] = _resolve_analysis_refs(plan_dict["analysis"]) + + # Load analysis data from unified cache + analysis_data = {} + for step in plan_dict.get("steps", []): + if step.get("node_type") == "ANALYZE": + step_id = step.get("step_id") + cached_analysis = unified_cache.cache_get_json(step_id) + if cached_analysis: + analysis_data[step_id] = cached_analysis + log(f" Loaded analysis: {step_id[:16]}...") + + log(f"Plan ID: {plan_dict.get('plan_id', 'unknown')[:16]}...") + log(f"Steps: {len(plan_dict.get('steps', []))}") + log(f"Analysis tracks: {list(analysis_data.keys())}") + + # Execute directly from cached plan + log("\n--- Execution (from cached plan) ---") + from execute import execute_plan + + result_path = execute_plan( + plan_path=plan_cache_path, + output_path=output_path, + recipe_dir=recipe_dir, + external_analysis=analysis_data, + cache_dir=content_cache_dir, + ) + + log(f"\n--- Complete ---") + log(f"Output: {result_path}") + return result_path + + # No cached plan - create new one 
plan = create_plan( compiled, inputs={}, @@ -105,18 +217,29 @@ def run_staged_recipe( log(f"\nPlan ID: {plan.plan_id[:16]}...") log(f"Steps: {len(plan.steps)}") + log(f"Analysis tracks: {list(analysis_data.keys())}") + + # Cache analysis tracks individually and replace with cache-id refs + _cache_analysis_tracks(plan) + + # Save plan to unified cache + plan_sexp_str = plan.to_string(pretty=True) + plan_cache_id, plan_cid, plan_cache_path = unified_cache.plan_store(recipe_cid, params, plan_sexp_str) + log(f"Saved plan: {plan_cache_id[:16]}... → {plan_cid[:16]}...") # Execute the plan using execute.py logic log("\n--- Execution ---") from execute import execute_plan + # Resolve cache-id refs back to full data for execution + resolved_analysis = _resolve_analysis_refs(plan.analysis) + plan_dict = { "plan_id": plan.plan_id, - "recipe_id": compiled.name, - "recipe_hash": plan.recipe_hash, + "source_hash": plan.source_hash, "encoding": compiled.encoding, "output_step_id": plan.output_step_id, - "analysis": analysis_data, + "analysis": {**resolved_analysis, **analysis_data}, "effects_registry": plan.effects_registry, "minimal_primitives": plan.minimal_primitives, "steps": [], @@ -134,16 +257,16 @@ def run_staged_recipe( # Tag with stage info if present if step.stage: step_dict["stage"] = step.stage - step_dict["stage_cache_id"] = step.stage_cache_id plan_dict["steps"].append(step_dict) - # Execute + # Execute using unified cache result_path = execute_plan( plan_path=None, output_path=output_path, recipe_dir=recipe_dir, plan_data=plan_dict, external_analysis=analysis_data, + cache_dir=content_cache_dir, ) log(f"\n--- Complete ---") @@ -162,6 +285,11 @@ def _run_non_staged(compiled, recipe_dir: Path, output_path: Optional[Path], ver raise NotImplementedError("Non-staged recipes should use plan.py | execute.py") +def list_cache(verbose: bool = False): + """List all cached items using the unified cache.""" + unified_cache.print_cache_listing(verbose) + + def 
list_params(recipe_path: Path): """List available parameters for a recipe and its effects.""" from artdag.sexp import parse @@ -283,16 +411,41 @@ Examples: python3 run_staged.py recipe.sexp -p color_mode=lime -p char_jitter=5 """ ) - parser.add_argument("recipe", type=Path, help="Recipe file (.sexp)") + parser.add_argument("recipe", type=Path, nargs="?", help="Recipe file (.sexp)") parser.add_argument("-o", "--output", type=Path, help="Output file path") - parser.add_argument("-c", "--cache", type=Path, help="Stage cache directory") parser.add_argument("-p", "--param", action="append", dest="params", metavar="KEY=VALUE", help="Set recipe parameter") parser.add_argument("-q", "--quiet", action="store_true", help="Suppress progress output") parser.add_argument("--list-params", action="store_true", help="List available parameters and exit") + parser.add_argument("--list-cache", action="store_true", help="List cached items and exit") + parser.add_argument("--no-cache", action="store_true", help="Ignore cached plan, force re-planning") + parser.add_argument("--show-plan", action="store_true", help="Show the plan S-expression and exit (don't execute)") + parser.add_argument("-v", "--verbose", action="store_true", help="Verbose output") + parser.add_argument("-j", "--jobs", type=int, default=None, + help="Max parallel workers (default: 4, or ARTDAG_WORKERS env)") + parser.add_argument("--pipelines", type=int, default=None, + help="Max concurrent video pipelines (default: 1, or ARTDAG_VIDEO_PIPELINES env)") args = parser.parse_args() + # Apply concurrency limits before any execution + if args.jobs is not None: + os.environ["ARTDAG_WORKERS"] = str(args.jobs) + if args.pipelines is not None: + os.environ["ARTDAG_VIDEO_PIPELINES"] = str(args.pipelines) + from execute import set_max_video_pipelines + set_max_video_pipelines(args.pipelines) + + # List cache mode - doesn't require recipe + if args.list_cache: + list_cache(verbose=args.verbose) + sys.exit(0) + + # All other modes 
require a recipe + if not args.recipe: + print("Error: recipe file required", file=sys.stderr) + sys.exit(1) + if not args.recipe.exists(): print(f"Recipe not found: {args.recipe}", file=sys.stderr) sys.exit(1) @@ -320,12 +473,51 @@ Examples: pass # Keep as string params[key] = value + # Show plan mode - generate plan and display without executing + if args.show_plan: + recipe_text = args.recipe.read_text() + recipe_dir = args.recipe.parent + + # Compute recipe CID (content hash) + recipe_cid, _ = unified_cache.content_store_string(recipe_text) + + compiled = compile_string(recipe_text, params if params else None, recipe_dir=recipe_dir) + + # Check for cached plan using unified cache (keyed by source CID + params) + plan_cid = unified_cache.plan_exists(recipe_cid, params if params else None) + + if plan_cid and not args.no_cache: + print(f";; Cached plan CID: {plan_cid}", file=sys.stderr) + plan_sexp_str = unified_cache.plan_load(recipe_cid, params if params else None) + print(plan_sexp_str) + else: + print(f";; Generating new plan...", file=sys.stderr) + analysis_data = {} + def on_analysis(node_id: str, results: dict): + analysis_data[node_id] = results + + plan = create_plan( + compiled, + inputs={}, + recipe_dir=recipe_dir, + on_analysis=on_analysis, + ) + # Cache analysis tracks individually before serialization + _cache_analysis_tracks(plan) + plan_sexp_str = plan.to_string(pretty=True) + + # Save to unified cache + cache_id, plan_cid, plan_path = unified_cache.plan_store(recipe_cid, params if params else None, plan_sexp_str) + print(f";; Saved: {cache_id[:16]}... → {plan_cid}", file=sys.stderr) + print(plan_sexp_str) + sys.exit(0) + result = run_staged_recipe( recipe_path=args.recipe, output_path=args.output, - cache_dir=args.cache, params=params if params else None, verbose=not args.quiet, + force_replan=args.no_cache, ) # Print final output path