""" Streaming pipeline executor. Directly executes compiled sexp recipes frame-by-frame. No adapter layer - frames and analysis flow through the DAG. """ import sys import time import numpy as np from pathlib import Path from typing import Dict, List, Any, Optional from dataclasses import dataclass, field from .sources import VideoSource from .audio import StreamingAudioAnalyzer from .output import DisplayOutput, FileOutput from .sexp_interp import SexpInterpreter @dataclass class FrameContext: """Context passed through the pipeline for each frame.""" t: float # Current time energy: float = 0.0 is_beat: bool = False beat_count: int = 0 analysis: Dict[str, Any] = field(default_factory=dict) class StreamingPipeline: """ Executes a compiled sexp recipe as a streaming pipeline. Frames flow through the DAG directly - no adapter needed. Each node is evaluated lazily when its output is requested. """ def __init__(self, compiled_recipe, recipe_dir: Path = None, fps: float = 30, seed: int = 42, output_size: tuple = None): self.recipe = compiled_recipe self.recipe_dir = recipe_dir or Path(".") self.fps = fps self.seed = seed # Build node lookup self.nodes = {n['id']: n for n in compiled_recipe.nodes} # Runtime state self.sources: Dict[str, VideoSource] = {} self.audio_analyzer: Optional[StreamingAudioAnalyzer] = None self.audio_source_path: Optional[str] = None # Sexp interpreter for expressions self.interp = SexpInterpreter() # Scan state (node_id -> current value) self.scan_state: Dict[str, Any] = {} self.scan_emit: Dict[str, Any] = {} # SLICE_ON state self.slice_on_acc: Dict[str, Any] = {} self.slice_on_result: Dict[str, Any] = {} # Frame cache for current timestep (cleared each frame) self._frame_cache: Dict[str, np.ndarray] = {} # Context for current frame self.ctx = FrameContext(t=0.0) # Output size (w, h) - set after sources are initialized self._output_size = output_size # Initialize self._init_sources() self._init_scans() self._init_slice_on() # Set output size from 
first source if not specified if self._output_size is None and self.sources: first_source = next(iter(self.sources.values())) self._output_size = first_source._size def _init_sources(self): """Initialize video and audio sources.""" for node in self.recipe.nodes: if node.get('type') == 'SOURCE': config = node.get('config', {}) path = config.get('path') if path: full_path = (self.recipe_dir / path).resolve() suffix = full_path.suffix.lower() if suffix in ('.mp4', '.webm', '.mov', '.avi', '.mkv'): if not full_path.exists(): print(f"Warning: video not found: {full_path}", file=sys.stderr) continue self.sources[node['id']] = VideoSource( str(full_path), target_fps=self.fps ) elif suffix in ('.mp3', '.wav', '.flac', '.ogg', '.m4a', '.aac'): if not full_path.exists(): print(f"Warning: audio not found: {full_path}", file=sys.stderr) continue self.audio_source_path = str(full_path) self.audio_analyzer = StreamingAudioAnalyzer(str(full_path)) def _init_scans(self): """Initialize scan nodes with their initial state.""" import random seed_offset = 0 for node in self.recipe.nodes: if node.get('type') == 'SCAN': config = node.get('config', {}) # Create RNG for this scan scan_seed = config.get('seed', self.seed + seed_offset) rng = random.Random(scan_seed) seed_offset += 1 # Evaluate initial value init_expr = config.get('init', 0) init_value = self.interp.eval(init_expr, {}) self.scan_state[node['id']] = { 'value': init_value, 'rng': rng, 'config': config, } # Compute initial emit self._update_scan_emit(node['id']) def _update_scan_emit(self, node_id: str): """Update the emit value for a scan.""" state = self.scan_state[node_id] config = state['config'] emit_expr = config.get('emit_expr', config.get('emit', None)) if emit_expr is None: # No emit expression - emit the value directly self.scan_emit[node_id] = state['value'] return # Build environment from state env = {} if isinstance(state['value'], dict): env.update(state['value']) else: env['acc'] = state['value'] 
env['beat_count'] = self.ctx.beat_count env['time'] = self.ctx.t # Set RNG for interpreter self.interp.rng = state['rng'] self.scan_emit[node_id] = self.interp.eval(emit_expr, env) def _step_scan(self, node_id: str): """Step a scan forward on beat.""" state = self.scan_state[node_id] config = state['config'] step_expr = config.get('step_expr', config.get('step', None)) if step_expr is None: return # Build environment env = {} if isinstance(state['value'], dict): env.update(state['value']) else: env['acc'] = state['value'] env['beat_count'] = self.ctx.beat_count env['time'] = self.ctx.t # Set RNG self.interp.rng = state['rng'] # Evaluate step new_value = self.interp.eval(step_expr, env) state['value'] = new_value # Update emit self._update_scan_emit(node_id) def _init_slice_on(self): """Initialize SLICE_ON nodes.""" for node in self.recipe.nodes: if node.get('type') == 'SLICE_ON': config = node.get('config', {}) init = config.get('init', {}) self.slice_on_acc[node['id']] = dict(init) # Evaluate initial state self._eval_slice_on(node['id']) def _eval_slice_on(self, node_id: str): """Evaluate a SLICE_ON node's Lambda.""" node = self.nodes[node_id] config = node.get('config', {}) fn = config.get('fn') videos = config.get('videos', []) if not fn: return acc = self.slice_on_acc[node_id] n_videos = len(videos) # Set up environment self.interp.globals['videos'] = list(range(n_videos)) try: from .sexp_interp import eval_slice_on_lambda result = eval_slice_on_lambda( fn, acc, self.ctx.beat_count, 0, 1, list(range(n_videos)), self.interp ) self.slice_on_result[node_id] = result # Update accumulator if 'acc' in result: self.slice_on_acc[node_id] = result['acc'] except Exception as e: print(f"SLICE_ON eval error: {e}", file=sys.stderr) def _on_beat(self): """Called when a beat is detected.""" self.ctx.beat_count += 1 # Step all scans for node_id in self.scan_state: self._step_scan(node_id) # Step all SLICE_ON nodes for node_id in self.slice_on_acc: self._eval_slice_on(node_id) 
    def _get_frame(self, node_id: str) -> Optional[np.ndarray]:
        """
        Get the output frame for a node at current time.
        Recursively evaluates inputs as needed.
        Results are cached for the current timestep.
        """
        if node_id in self._frame_cache:
            return self._frame_cache[node_id]
        node = self.nodes.get(node_id)
        if not node:
            return None
        node_type = node.get('type')
        if node_type == 'SOURCE':
            frame = self._eval_source(node)
        elif node_type == 'SEGMENT':
            frame = self._eval_segment(node)
        elif node_type == 'EFFECT':
            frame = self._eval_effect(node)
        elif node_type == 'SLICE_ON':
            frame = self._eval_slice_on_frame(node)
        else:
            # Unknown node type - try to pass through input
            inputs = node.get('inputs', [])
            frame = self._get_frame(inputs[0]) if inputs else None
        # Cache even None so a failing node is only evaluated once per frame
        self._frame_cache[node_id] = frame
        return frame

    def _eval_source(self, node: dict) -> Optional[np.ndarray]:
        """Evaluate a SOURCE node: read its frame at the current time, or
        None if the source was never initialized (missing file / audio)."""
        source = self.sources.get(node['id'])
        if source:
            return source.read_frame(self.ctx.t)
        return None

    def _eval_segment(self, node: dict) -> Optional[np.ndarray]:
        """Evaluate a SEGMENT node (time segment of source).

        Offsets the current time by the segment's start and, when a duration
        is set, loops within it via modulo. 'start'/'duration' may be literal
        numbers or binding dicts resolved at frame time.
        """
        inputs = node.get('inputs', [])
        if not inputs:
            return None
        config = node.get('config', {})
        start = config.get('start', 0)
        duration = config.get('duration')
        # Resolve any bindings (dicts with a truthy '_binding' key)
        if isinstance(start, dict):
            start = self._resolve_binding(start) if start.get('_binding') else 0
        if isinstance(duration, dict):
            duration = self._resolve_binding(duration) if duration.get('_binding') else None
        # Adjust time for segment; non-numeric start falls back to 0 offset
        t_local = self.ctx.t + (start if isinstance(start, (int, float)) else 0)
        if duration and isinstance(duration, (int, float)):
            t_local = t_local % duration  # Loop within segment
        # Get source frame at adjusted time; fall back to generic evaluation
        # when the input is not a direct video source
        source_id = inputs[0]
        source = self.sources.get(source_id)
        if source:
            return source.read_frame(t_local)
        return self._get_frame(source_id)

    def _eval_effect(self, node: dict) -> Optional[np.ndarray]:
        """Evaluate an EFFECT node.

        Pulls all input frames (dropping Nones), resolves config bindings to
        live values, then dispatches on the effect name. Each effect guards
        against near-identity parameters so no-op frames are returned
        untouched. Unknown effect names pass the first input through.
        """
        import cv2
        inputs = node.get('inputs', [])
        config = node.get('config', {})
        effect_name = config.get('effect')
        # Get input frame(s)
        input_frames = [self._get_frame(inp) for inp in inputs]
        input_frames = [f for f in input_frames if f is not None]
        if not input_frames:
            return None
        frame = input_frames[0]
        # Resolve bindings in config
        params = self._resolve_config(config)
        # Apply effect based on name
        if effect_name == 'rotate':
            angle = params.get('angle', 0)
            if abs(angle) > 0.5:
                h, w = frame.shape[:2]
                center = (w // 2, h // 2)
                matrix = cv2.getRotationMatrix2D(center, angle, 1.0)
                frame = cv2.warpAffine(frame, matrix, (w, h))
        elif effect_name == 'zoom':
            amount = params.get('amount', 1.0)
            if abs(amount - 1.0) > 0.01:
                frame = self._apply_zoom(frame, amount)
        elif effect_name == 'invert':
            amount = params.get('amount', 0)
            if amount > 0.01:
                # Cross-fade between the frame and its negative
                inverted = 255 - frame
                frame = cv2.addWeighted(frame, 1 - amount, inverted, amount, 0)
        elif effect_name == 'hue_shift':
            degrees = params.get('degrees', 0)
            if abs(degrees) > 1:
                hsv = cv2.cvtColor(frame, cv2.COLOR_RGB2HSV)
                # OpenCV hue channel is 0-179 (degrees / 2)
                hsv[:, :, 0] = (hsv[:, :, 0].astype(int) + int(degrees / 2)) % 180
                frame = cv2.cvtColor(hsv, cv2.COLOR_HSV2RGB)
        elif effect_name == 'blend':
            if len(input_frames) >= 2:
                opacity = params.get('opacity', 0.5)
                frame = cv2.addWeighted(input_frames[0], 1 - opacity,
                                        input_frames[1], opacity, 0)
        elif effect_name == 'blend_multi':
            weights = params.get('weights', [])
            if len(input_frames) > 1 and weights:
                # Weighted sum in float32, sized to the first frame
                h, w = input_frames[0].shape[:2]
                result = np.zeros((h, w, 3), dtype=np.float32)
                for f, wt in zip(input_frames, weights):
                    if f is not None and wt > 0.001:
                        if f.shape[:2] != (h, w):
                            f = cv2.resize(f, (w, h))
                        result += f.astype(np.float32) * wt
                frame = np.clip(result, 0, 255).astype(np.uint8)
        elif effect_name == 'ripple':
            amp = params.get('amplitude', 0)
            if amp > 1:
                frame = self._apply_ripple(frame, amp,
                                           params.get('center_x', 0.5),
                                           params.get('center_y', 0.5),
                                           params.get('frequency', 8),
                                           params.get('decay', 2),
                                           params.get('speed', 5))
        return frame

    def _eval_slice_on_frame(self, node: dict) -> Optional[np.ndarray]:
        """Evaluate a SLICE_ON node - returns composited frame.

        Uses the most recent lambda result (layers + compose weights) to
        build a weighted composite of its video inputs. Falls back to the
        first video when no usable result exists yet.
        """
        import cv2
        config = node.get('config', {})
        video_ids = config.get('videos', [])
        result = self.slice_on_result.get(node['id'], {})
        if not result:
            # No result yet - return first video
            if video_ids:
                return self._get_frame(video_ids[0])
            return None
        # Get layers and compose info
        layers = result.get('layers', [])
        compose = result.get('compose', {})
        weights = compose.get('weights', [])
        if not layers or not weights:
            if video_ids:
                return self._get_frame(video_ids[0])
            return None
        # Get frames for each layer
        frames = []
        for i, layer in enumerate(layers):
            video_idx = layer.get('video', i)
            if video_idx < len(video_ids):
                frame = self._get_frame(video_ids[video_idx])
                # Apply layer effects (zoom)
                effects = layer.get('effects', [])
                for eff in effects:
                    eff_name = eff.get('effect')
                    # Effect may be an enum-like object; use its .name
                    if hasattr(eff_name, 'name'):
                        eff_name = eff_name.name
                    if eff_name == 'zoom':
                        zoom_amt = eff.get('amount', 1.0)
                        if frame is not None:
                            frame = self._apply_zoom(frame, zoom_amt)
                frames.append(frame)
            else:
                # Out-of-range layer index keeps list aligned with weights
                frames.append(None)
        # Composite with weights - use consistent output size
        if self._output_size:
            w, h = self._output_size
        else:
            # Fallback to first non-None frame size
            for f in frames:
                if f is not None:
                    h, w = f.shape[:2]
                    break
            else:
                return None
        output = np.zeros((h, w, 3), dtype=np.float32)
        for frame, weight in zip(frames, weights):
            if frame is None or weight < 0.001:
                continue
            # Resize to output size
            if frame.shape[1] != w or frame.shape[0] != h:
                frame = cv2.resize(frame, (w, h))
            output += frame.astype(np.float32) * weight
        # Normalize weights (only when they don't already sum to ~1)
        total_weight = sum(wt for wt in weights if wt > 0.001)
        if total_weight > 0 and abs(total_weight - 1.0) > 0.01:
            output /= total_weight
        return np.clip(output, 0, 255).astype(np.uint8)

    def _resolve_config(self, config: dict) -> dict:
        """Resolve bindings in effect config to actual values.

        Skips structural/meta keys, resolves '_binding' and '_expr' dicts to
        live values, and passes everything else through unchanged.
        """
        resolved = {}
        for key, value in config.items():
            if key in ('effect', 'effect_path', 'effect_cid', 'effects_registry',
                       'analysis_refs', 'inputs', 'cid'):
                continue
            if isinstance(value, dict) and value.get('_binding'):
                resolved[key] = self._resolve_binding(value)
            elif isinstance(value, dict) and value.get('_expr'):
                resolved[key] = self._resolve_expr(value)
            else:
                resolved[key] = value
        return resolved

    def _resolve_binding(self, binding: dict) -> Any:
        """Resolve a binding to its current value.

        Lookup order: scan emit values, then live analysis, then the current
        audio energy as a fallback. A numeric value with a 'range' pair is
        linearly mapped into [lo, hi] (the raw value is treated as 0..1).
        """
        source_id = binding.get('source')
        feature = binding.get('feature', 'values')
        range_map = binding.get('range')
        # Get raw value from scan or analysis
        if source_id in self.scan_emit:
            value = self.scan_emit[source_id]
        elif source_id in self.ctx.analysis:
            data = self.ctx.analysis[source_id]
            # NOTE(review): assumes the feature entry is an indexable sequence
            # (analysis dicts here store {'values': [x]}) — confirm for other
            # analysis producers.
            value = data.get(feature, data.get('values', [0]))[0] if isinstance(data, dict) else data
        else:
            # Fallback to energy
            value = self.ctx.energy
        # Extract feature from dict
        if isinstance(value, dict) and feature in value:
            value = value[feature]
        # Apply range mapping
        if range_map and isinstance(value, (int, float)):
            lo, hi = range_map
            value = lo + value * (hi - lo)
        return value

    def _resolve_expr(self, expr: dict) -> Any:
        """Resolve a compiled expression against the live frame context
        (energy, beat_count, t) plus every scan's current emit value."""
        env = {
            'energy': self.ctx.energy,
            'beat_count': self.ctx.beat_count,
            't': self.ctx.t,
        }
        # Add scan values
        for scan_id, value in self.scan_emit.items():
            # Use short form if available
            env[scan_id] = value
        # Extract the actual expression from _expr wrapper
        actual_expr = expr.get('_expr', expr)
        return self.interp.eval(actual_expr, env)

    def _apply_zoom(self, frame: np.ndarray, amount: float) -> np.ndarray:
        """Apply zoom to frame.

        amount > 1: crop the center and scale back up (zoom in).
        amount < 1: shrink the frame and letterbox it on a black canvas
        (zoom out). Values within ~1% of 1.0, or degenerate sizes, return
        the frame unchanged.
        """
        import cv2
        h, w = frame.shape[:2]
        if amount > 1.01:
            # Zoom in: crop center
            new_w, new_h = int(w / amount), int(h / amount)
            if new_w > 0 and new_h > 0:
                x1, y1 = (w - new_w) // 2, (h - new_h) // 2
                cropped = frame[y1:y1+new_h, x1:x1+new_w]
                return cv2.resize(cropped, (w, h))
        elif amount < 0.99:
            # Zoom out: shrink and center
            scaled_w, scaled_h = int(w * amount), int(h * amount)
            if scaled_w > 0 and scaled_h > 0:
                shrunk = cv2.resize(frame, (scaled_w, scaled_h))
                canvas = np.zeros((h, w, 3), dtype=np.uint8)
                x_off, y_off = (w - scaled_w) // 2, (h - scaled_h) // 2
                canvas[y_off:y_off+scaled_h, x_off:x_off+scaled_w] = shrunk
                return canvas
        return frame

    def _apply_ripple(self, frame: np.ndarray, amplitude: float,
                      cx: float, cy: float, frequency: float,
                      decay: float, speed: float) -> np.ndarray:
        """Apply ripple effect.

        Displaces every pixel radially around (cx, cy) (fractions of the
        frame size) by a sine wave of the given amplitude/frequency, damped
        exponentially with distance and animated over time via speed.
        """
        import cv2
        h, w = frame.shape[:2]
        # Create coordinate grids
        y_coords, x_coords = np.mgrid[0:h, 0:w].astype(np.float32)
        # Normalize to center
        center_x, center_y = w * cx, h * cy
        dx = x_coords - center_x
        dy = y_coords - center_y
        dist = np.sqrt(dx**2 + dy**2)
        # Ripple displacement (phase animates with current time)
        phase = self.ctx.t * speed
        ripple = amplitude * np.sin(dist / frequency - phase) * np.exp(-dist * decay / max(w, h))
        # Displace coordinates along the radial direction
        angle = np.arctan2(dy, dx)
        map_x = (x_coords + ripple * np.cos(angle)).astype(np.float32)
        map_y = (y_coords + ripple * np.sin(angle)).astype(np.float32)
        return cv2.remap(frame, map_x, map_y, cv2.INTER_LINEAR,
                         borderMode=cv2.BORDER_REFLECT)

    def _find_output_node(self) -> Optional[str]:
        """Find the final output node (MUX or last EFFECT).

        Prefers the first MUX node; otherwise returns the last EFFECT that
        appears after a SLICE_ON in recipe order, or None if neither exists.
        """
        # Look for MUX node
        for node in self.recipe.nodes:
            if node.get('type') == 'MUX':
                return node['id']
        # Otherwise find last EFFECT after SLICE_ON
        last_effect = None
        found_slice_on = False
        for node in self.recipe.nodes:
            if node.get('type') == 'SLICE_ON':
                found_slice_on = True
            elif node.get('type') == 'EFFECT' and found_slice_on:
                last_effect = node['id']
        return last_effect

    def render_frame(self, t: float) -> Optional[np.ndarray]:
        """Render a single frame at time t.

        Clears the per-frame cache, updates the audio-driven context (with
        rising-edge beat detection triggering _on_beat), then evaluates the
        output node and resizes the result to the configured output size.
        Returns None when no output node exists.
        """
        # Clear frame cache
        self._frame_cache.clear()
        # Update context
        self.ctx.t = t
        # Update audio analysis
        if self.audio_analyzer:
            self.audio_analyzer.set_time(t)
            energy = self.audio_analyzer.get_energy()
            is_beat = self.audio_analyzer.get_beat()
            # Beat edge detection: fire _on_beat only on the rising edge
            was_beat = self.ctx.is_beat
            self.ctx.energy = energy
            self.ctx.is_beat = is_beat
            if is_beat and not was_beat:
                self._on_beat()
            # Store in analysis dict for bindings to pick up
            self.ctx.analysis['live_energy'] = {'values': [energy]}
            self.ctx.analysis['live_beat'] = {'values': [1.0 if is_beat else 0.0]}
        # Find output node and render
        output_node = self._find_output_node()
        if output_node:
            frame = self._get_frame(output_node)
            # Normalize to output size
            if frame is not None and self._output_size:
                w, h = self._output_size
                if frame.shape[1] != w or frame.shape[0] != h:
                    import cv2
                    frame = cv2.resize(frame, (w, h))
            return frame
        return None

    def run(self, output: str = "preview", duration: Optional[float] = None):
        """
        Run the pipeline.

        Args:
            output: "preview", filename, or Output object
            duration: Duration in seconds (default: audio duration or 60s)
        """
        # Determine duration
        if duration is None:
            if self.audio_analyzer:
                duration = self.audio_analyzer.duration
            else:
                duration = 60.0
        # Create output
        if output == "preview":
            # Get frame size from first source (720x720 fallback)
            first_source = next(iter(self.sources.values()), None)
            if first_source:
                w, h = first_source._size
            else:
                w, h = 720, 720
            out = DisplayOutput(size=(w, h), fps=self.fps,
                                audio_source=self.audio_source_path)
        elif isinstance(output, str):
            first_source = next(iter(self.sources.values()), None)
            if first_source:
                w, h = first_source._size
            else:
                w, h = 720, 720
            out = FileOutput(output, size=(w, h), fps=self.fps,
                             audio_source=self.audio_source_path)
        else:
            # Caller supplied a ready-made Output object
            out = output
        frame_time = 1.0 / self.fps
        n_frames = int(duration * self.fps)
        print(f"Streaming: {len(self.sources)} sources -> {output}", file=sys.stderr)
        print(f"Duration: {duration:.1f}s, {n_frames} frames @ {self.fps}fps", file=sys.stderr)
        start_time = time.time()
        frame_count = 0
        try:
            for frame_num in range(n_frames):
                t = frame_num * frame_time
                frame = self.render_frame(t)
                if frame is not None:
                    out.write(frame, t)
                    frame_count += 1
                # Progress (every 50 frames)
                if frame_num % 50 == 0:
                    elapsed = time.time() - start_time
                    fps = frame_count / elapsed if elapsed > 0 else 0
                    pct = 100 * frame_num / n_frames
                    print(f"\r{pct:5.1f}% | {fps:5.1f} fps | frame {frame_num}/{n_frames}",
                          end="", file=sys.stderr)
        except KeyboardInterrupt:
            print("\nInterrupted", file=sys.stderr)
        finally:
            # Always release output and sources, even on interrupt
            out.close()
            for src in self.sources.values():
                src.close()
        elapsed = time.time() - start_time
        avg_fps = frame_count / elapsed if elapsed > 0 else 0
        print(f"\nCompleted: {frame_count} frames in {elapsed:.1f}s ({avg_fps:.1f} fps avg)",
              file=sys.stderr)


def run_pipeline(recipe_path: str, output: str = "preview",
                 duration: Optional[float] = None, fps: Optional[float] = None):
    """
    Run a recipe through the streaming pipeline.
    No adapter layer - directly executes the compiled recipe.

    Args:
        recipe_path: Path to a .sexp recipe file.
        output: Passed through to StreamingPipeline.run.
        duration: Duration in seconds (None = audio duration or 60s).
        fps: Frame rate override (None = recipe's encoding fps, default 30).
    """
    from pathlib import Path
    # Add artdag to path so the sexp compiler can be imported
    import sys
    sys.path.insert(0, str(Path(__file__).parent.parent.parent / "artdag"))
    from artdag.sexp.compiler import compile_string
    recipe_path = Path(recipe_path)
    recipe_text = recipe_path.read_text()
    compiled = compile_string(recipe_text, {}, recipe_dir=recipe_path.parent)
    pipeline = StreamingPipeline(
        compiled,
        recipe_dir=recipe_path.parent,
        fps=fps or compiled.encoding.get('fps', 30),
    )
    pipeline.run(output=output, duration=duration)


def run_pipeline_piped(recipe_path: str, duration: Optional[float] = None,
                       fps: Optional[float] = None):
    """
    Run pipeline and pipe directly to mpv with audio.

    Spawns mpv reading raw RGB frames on stdin and, when the recipe has an
    audio source, an ffplay process for sound. Both are terminated and all
    sources closed on exit, interrupt, or mpv closing its window.
    """
    import subprocess
    from pathlib import Path
    import sys
    sys.path.insert(0, str(Path(__file__).parent.parent.parent / "artdag"))
    from artdag.sexp.compiler import compile_string
    recipe_path = Path(recipe_path)
    recipe_text = recipe_path.read_text()
    compiled = compile_string(recipe_text, {}, recipe_dir=recipe_path.parent)
    pipeline = StreamingPipeline(
        compiled,
        recipe_dir=recipe_path.parent,
        fps=fps or compiled.encoding.get('fps', 30),
    )
    # Get frame info (720x720 fallback when there are no video sources)
    first_source = next(iter(pipeline.sources.values()), None)
    if first_source:
        w, h = first_source._size
    else:
        w, h = 720, 720
    # Determine duration
    if duration is None:
        if pipeline.audio_analyzer:
            duration = pipeline.audio_analyzer.duration
        else:
            duration = 60.0
    actual_fps = fps or compiled.encoding.get('fps', 30)
    n_frames = int(duration * actual_fps)
    frame_time = 1.0 / actual_fps
    print(f"Streaming {n_frames} frames @ {actual_fps}fps to mpv", file=sys.stderr)
    # Start mpv as a rawvideo sink reading from stdin ("-")
    mpv_cmd = [
        "mpv", "--no-cache", "--demuxer=rawvideo",
        f"--demuxer-rawvideo-w={w}",
        f"--demuxer-rawvideo-h={h}",
        "--demuxer-rawvideo-mp-format=rgb24",
        f"--demuxer-rawvideo-fps={actual_fps}",
        "--title=Streaming Pipeline",
        "-"
    ]
    mpv = subprocess.Popen(mpv_cmd, stdin=subprocess.PIPE, stderr=subprocess.DEVNULL)
    # Start audio if available (separate ffplay process; best-effort sync)
    audio_proc = None
    if pipeline.audio_source_path:
        audio_cmd = ["ffplay", "-nodisp", "-autoexit", "-loglevel", "quiet",
                     pipeline.audio_source_path]
        audio_proc = subprocess.Popen(audio_cmd, stdout=subprocess.DEVNULL,
                                      stderr=subprocess.DEVNULL)
    try:
        import cv2
        for frame_num in range(n_frames):
            if mpv.poll() is not None:
                break  # mpv closed
            t = frame_num * frame_time
            frame = pipeline.render_frame(t)
            if frame is not None:
                # Ensure consistent frame size
                if frame.shape[1] != w or frame.shape[0] != h:
                    frame = cv2.resize(frame, (w, h))
                # tobytes() requires a contiguous buffer
                if not frame.flags['C_CONTIGUOUS']:
                    frame = np.ascontiguousarray(frame)
                try:
                    mpv.stdin.write(frame.tobytes())
                    mpv.stdin.flush()
                except BrokenPipeError:
                    break
    except KeyboardInterrupt:
        pass
    finally:
        # Tear down child processes and sources in all exit paths
        if mpv.stdin:
            mpv.stdin.close()
        mpv.terminate()
        if audio_proc:
            audio_proc.terminate()
        for src in pipeline.sources.values():
            src.close()


if __name__ == "__main__":
    import argparse
    parser = argparse.ArgumentParser(description="Run sexp recipe through streaming pipeline")
    parser.add_argument("recipe", help="Path to .sexp recipe file")
    parser.add_argument("-o", "--output", default="pipe",
                        help="Output: 'pipe' (mpv), 'preview', or filename (default: pipe)")
    parser.add_argument("-d", "--duration", type=float, default=None,
                        help="Duration in seconds (default: audio duration)")
    parser.add_argument("--fps", type=float, default=None,
                        help="Frame rate (default: from recipe)")
    args = parser.parse_args()
    if args.output == "pipe":
        run_pipeline_piped(args.recipe, duration=args.duration, fps=args.fps)
    else:
        run_pipeline(args.recipe, output=args.output,
                     duration=args.duration, fps=args.fps)