"""
Effect processing backends.

Provides abstraction over different rendering backends:
- numpy: CPU-based, works everywhere, ~3-5 fps
- wgpu: GPU-based (wgpu/WebGPU compute shaders), 30+ fps on supported
  hardware. "glsl" and "gpu" are accepted as aliases for this backend.
"""

import bisect
import math
import struct
from abc import ABC, abstractmethod
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional

import numpy as np

# Shape of the black frame returned by process_frame() when there is no input.
_EMPTY_FRAME_SHAPE = (720, 1280, 3)

# Keys of an effect config that identify the effect rather than parameterize it.
_NON_PARAM_KEYS = frozenset(("effect", "effect_path", "cid", "analysis_refs"))

# Size in bytes of the uniform buffer that carries per-effect parameters.
_PARAMS_BUFFER_SIZE = 256


def _resolve_params(
    resolve_binding: Callable[[Any, float, Dict], Any],
    params: Dict,
    t: float,
    analysis_data: Dict,
) -> Dict[str, Any]:
    """Build the parameter dict handed to an effect.

    Starts from ``{"_time": t}`` and adds every entry of *params* except the
    bookkeeping keys in ``_NON_PARAM_KEYS``, each passed through
    *resolve_binding*.
    """
    resolved: Dict[str, Any] = {"_time": t}
    resolved.update(
        (key, resolve_binding(value, t, analysis_data))
        for key, value in params.items()
        if key not in _NON_PARAM_KEYS
    )
    return resolved


def _run_effect_chains(
    apply_effect: Callable[[np.ndarray, str, Dict, float, Dict], np.ndarray],
    frames: List[np.ndarray],
    effects_per_frame: List[List[Dict]],
    t: float,
    analysis_data: Dict,
) -> List[np.ndarray]:
    """Run each frame through its effect chain and return the results.

    Every input frame yields exactly one output frame. A frame with no
    matching chain (``effects_per_frame`` shorter than ``frames``) is passed
    through as an unmodified copy instead of being dropped. Effect configs
    without an ``"effect"`` name are skipped.
    """
    chains = effects_per_frame or []
    processed = []
    for index, frame in enumerate(frames):
        chain = chains[index] if index < len(chains) else ()
        result = frame.copy()  # never mutate the caller's frame
        for effect_config in chain or ():
            effect_name = effect_config.get("effect", "")
            if effect_name:
                result = apply_effect(
                    result, effect_name, effect_config, t, analysis_data
                )
        processed.append(result)
    return processed


class Backend(ABC):
    """Abstract base class for effect processing backends."""

    @abstractmethod
    def process_frame(
        self,
        frames: List[np.ndarray],
        effects_per_frame: List[List[Dict]],
        compositor_config: Dict,
        t: float,
        analysis_data: Dict,
    ) -> np.ndarray:
        """
        Process multiple input frames through effects and composite.

        Args:
            frames: List of input frames (one per source)
            effects_per_frame: List of effect chains (one per source)
            compositor_config: How to blend the layers
            t: Current time in seconds
            analysis_data: Analysis data for binding resolution

        Returns:
            Composited output frame
        """

    @abstractmethod
    def load_effect(self, effect_path: Path) -> Any:
        """Load an effect definition."""


class NumpyBackend(Backend):
    """
    CPU-based effect processing using NumPy.

    Uses existing sexp_effects interpreter for effect execution.
    Works on any system, but limited to ~3-5 fps for complex effects.
    """

    def __init__(
        self,
        recipe_dir: Optional[Path] = None,
        minimal_primitives: bool = True,
    ):
        self.recipe_dir = recipe_dir or Path(".")
        self.minimal_primitives = minimal_primitives
        self._interpreter = None  # created on first use
        self._loaded_effects = {}  # str(effect_path) -> effect name (file stem)

    def _get_interpreter(self):
        """Lazy-load the sexp interpreter."""
        if self._interpreter is None:
            from sexp_effects import get_interpreter

            self._interpreter = get_interpreter(
                minimal_primitives=self.minimal_primitives
            )
        return self._interpreter

    def load_effect(self, effect_path: Path) -> Any:
        """Load an effect from sexp file.

        Each path is handed to the interpreter only once; later calls return
        the cached name.

        Args:
            effect_path: Path (or string path) of the effect file

        Returns:
            The effect name, taken from the file stem
        """
        effect_path = Path(effect_path)
        effect_key = str(effect_path)
        if effect_key not in self._loaded_effects:
            self._get_interpreter().load_effect(effect_key)
            self._loaded_effects[effect_key] = effect_path.stem
        return self._loaded_effects[effect_key]

    def _resolve_binding(self, value: Any, t: float, analysis_data: Dict) -> Any:
        """Resolve a parameter binding to its value at time t.

        A binding is a dict containing ``"_binding"`` or ``"_bind"``. Its
        track is ``analysis_data[source]`` where source is ``value["source"]``
        or, failing that, ``value["_bind"]``. The track's ``"values"`` are
        linearly interpolated over its ``"times"`` and clamped to the first
        and last sample outside that range. An optional two-element
        ``"range"`` maps the result as ``lo + val * (hi - lo)``.

        Args:
            value: Literal parameter value or binding dict
            t: Current time in seconds
            analysis_data: Mapping of source name to track

        Returns:
            *value* unchanged when it is not a binding, 0.0 when the track
            has no samples, otherwise the interpolated (and mapped) value
        """
        if not isinstance(value, dict):
            return value
        if "_binding" not in value and "_bind" not in value:
            return value

        source = value.get("source") or value.get("_bind")
        range_map = value.get("range")

        track = (analysis_data or {}).get(source, {})
        times = track.get("times")
        values = track.get("values")
        # len() rather than truthiness so array-valued tracks do not raise.
        if times is None or values is None or len(times) == 0 or len(values) == 0:
            return 0.0

        # NOTE(review): assumes times is ascending and as long as values.
        if t <= times[0]:
            val = values[0]
        elif t >= times[-1]:
            val = values[-1]
        else:
            # Binary search for the first sample at or after t.
            hi = min(max(bisect.bisect_left(times, t), 1), len(times) - 1)
            lo = hi - 1
            span = times[hi] - times[lo]
            if span > 0 and times[lo] <= t <= times[hi]:
                alpha = (t - times[lo]) / span
                val = values[lo] * (1 - alpha) + values[hi] * alpha
            else:
                # No usable bracket (unsorted times or NaN t): use last value.
                val = values[-1]

        if range_map is not None and len(range_map) == 2:
            val = range_map[0] + val * (range_map[1] - range_map[0])
        return val

    def _apply_effect(
        self,
        frame: np.ndarray,
        effect_name: str,
        params: Dict,
        t: float,
        analysis_data: Dict,
    ) -> np.ndarray:
        """Apply a single effect to a frame.

        Tries the native NumPy/OpenCV implementation first, then the sexp
        interpreter. An effect known to neither leaves the frame unchanged.
        """
        resolved_params = _resolve_params(
            self._resolve_binding, params, t, analysis_data
        )

        # Try fast native effects first
        result = self._apply_native_effect(frame, effect_name, resolved_params)
        if result is not None:
            return result

        # Fall back to sexp interpreter for complex effects
        interp = self._get_interpreter()
        if effect_name in interp.effects:
            result, _ = interp.run_effect(effect_name, frame, resolved_params, {})
            return result

        # Unknown effect - pass through
        return frame

    def _apply_native_effect(
        self,
        frame: np.ndarray,
        effect_name: str,
        params: Dict,
    ) -> Optional[np.ndarray]:
        """Fast native numpy effects for real-time streaming.

        OpenCV is imported only by the branches that use it (zoom, rotate),
        so brightness and invert work without it.

        Args:
            frame: Input frame
            effect_name: One of "zoom", "rotate", "brightness", "invert"
            params: Already-resolved parameters ("amount" or "angle")

        Returns:
            The processed frame, the input frame itself when the effect is
            a no-op for these parameters, or None when *effect_name* has no
            native implementation
        """
        if effect_name == "zoom":
            amount = float(params.get("amount", 1.0))
            # A non-positive or non-finite factor has no meaning; keep frame.
            if not math.isfinite(amount) or amount <= 0:
                return frame
            if abs(amount - 1.0) < 0.01:
                return frame
            import cv2

            h, w = frame.shape[:2]
            if amount < 1.0:
                # Zoom out: a centre crop would need pixels outside the
                # frame, so scale about the centre and leave the border to
                # warpAffine's default fill.
                matrix = cv2.getRotationMatrix2D((w / 2, h / 2), 0, amount)
                return cv2.warpAffine(frame, matrix, (w, h))
            # Zoom in: crop the centre and resize back to full size.
            new_w = max(1, int(w / amount))
            new_h = max(1, int(h / amount))
            x1, y1 = (w - new_w) // 2, (h - new_h) // 2
            cropped = frame[y1:y1 + new_h, x1:x1 + new_w]
            return cv2.resize(cropped, (w, h))

        if effect_name == "rotate":
            angle = float(params.get("angle", 0))
            if abs(angle) < 0.5:
                return frame
            import cv2

            h, w = frame.shape[:2]
            matrix = cv2.getRotationMatrix2D((w // 2, h // 2), angle, 1.0)
            return cv2.warpAffine(frame, matrix, (w, h))

        if effect_name == "brightness":
            amount = float(params.get("amount", 1.0))
            return np.clip(frame * amount, 0, 255).astype(np.uint8)

        if effect_name == "invert":
            # "amount" acts as an on/off switch with a 0.5 threshold.
            amount = float(params.get("amount", 1.0))
            if amount < 0.5:
                return frame
            return 255 - frame

        # Not a native effect
        return None

    def process_frame(
        self,
        frames: List[np.ndarray],
        effects_per_frame: List[List[Dict]],
        compositor_config: Dict,
        t: float,
        analysis_data: Dict,
    ) -> np.ndarray:
        """
        Process frames through effects and composite.

        Args:
            frames: List of input frames (one per source)
            effects_per_frame: List of effect chains (one per source); a
                frame without a chain is passed through unchanged
            compositor_config: How to blend the layers
            t: Current time in seconds
            analysis_data: Analysis data for binding resolution

        Returns:
            A black 720x1280 frame when *frames* is empty, the single
            processed frame when there is one source, otherwise the
            composited frame
        """
        if not frames:
            return np.zeros(_EMPTY_FRAME_SHAPE, dtype=np.uint8)

        processed = _run_effect_chains(
            self._apply_effect, frames, effects_per_frame, t, analysis_data
        )

        if len(processed) == 1:
            return processed[0]
        return self._composite(processed, compositor_config, t, analysis_data)

    def _composite(
        self,
        frames: List[np.ndarray],
        config: Dict,
        t: float,
        analysis_data: Dict,
    ) -> np.ndarray:
        """Composite multiple frames into one.

        Only a normalized weighted blend is implemented; any ``"mode"`` in
        *config* is ignored. Weights come from ``config["weights"]`` (each
        may be a binding) and default to equal weights. A non-positive
        weight sum also falls back to equal weights. Frames are resized to
        the first frame's size before blending.

        NOTE(review): frames and weights are paired with zip(), so frames
        beyond the number of supplied weights do not contribute.
        """
        config = config or {}
        count = len(frames)
        equal_weights = [1.0 / count] * count
        weights = config.get("weights") or equal_weights

        resolved_weights = [
            self._resolve_binding(weight, t, analysis_data) for weight in weights
        ]

        total = sum(resolved_weights)
        if total > 0:
            resolved_weights = [weight / total for weight in resolved_weights]
        else:
            resolved_weights = equal_weights

        # Resize frames to match first frame
        target_h, target_w = frames[0].shape[:2]
        resized = []
        for frame in frames:
            if frame.shape[:2] != (target_h, target_w):
                import cv2

                frame = cv2.resize(frame, (target_w, target_h))
            # Blend in float32 so the accumulation cannot wrap around.
            resized.append(frame.astype(np.float32))

        result = np.zeros_like(resized[0])
        for frame, weight in zip(resized, resolved_weights):
            result += frame * weight

        return np.clip(result, 0, 255).astype(np.uint8)


class WGPUBackend(Backend):
    """
    GPU-based effect processing using wgpu/WebGPU compute shaders.

    Compiles sexp effects to WGSL at load time, executes on GPU.
    Achieves 30+ fps real-time processing on supported hardware.

    Anything the GPU path cannot handle (no device, effect not compiled,
    unsupported frame layout) is delegated to an internal NumpyBackend.

    Requirements:
    - wgpu-py library
    - Vulkan-capable GPU (or software renderer)
    """

    def __init__(self, recipe_dir: Optional[Path] = None):
        self.recipe_dir = recipe_dir or Path(".")
        self._device = None
        # Set once GPU init has failed so it is not retried on every frame.
        self._device_init_failed = False
        self._loaded_effects: Dict[str, Any] = {}  # path -> compiled shader info
        self._numpy_fallback = NumpyBackend(recipe_dir)
        # Buffer pool for reuse - keyed by (width, height)
        self._buffer_pool: Dict[tuple, Dict] = {}

    def _ensure_device(self):
        """Lazy-initialize wgpu device.

        On failure a message is printed once, ``_device`` stays None and
        later calls return immediately.
        """
        if self._device is not None or self._device_init_failed:
            return

        try:
            import wgpu

            adapter = wgpu.gpu.request_adapter_sync(
                power_preference="high-performance"
            )
            self._device = adapter.request_device_sync()
            print(f"[WGPUBackend] Using GPU: {adapter.info.get('device', 'unknown')}")
        except Exception as e:
            # Best-effort: any init problem means "use the CPU backend".
            print(f"[WGPUBackend] GPU init failed: {e}, falling back to CPU")
            self._device = None
            self._device_init_failed = True

    def load_effect(self, effect_path: Path) -> Any:
        """Load and compile an effect from sexp file to WGSL.

        When compilation fails or no GPU device is available, the effect is
        loaded into the NumPy fallback backend instead.

        Args:
            effect_path: Path (or string path) of the effect file

        Returns:
            The effect name, on first load and on every cached load alike
        """
        effect_key = str(effect_path)
        cached = self._loaded_effects.get(effect_key)
        if cached is not None:
            return cached['name']

        try:
            from sexp_effects.wgsl_compiler import compile_effect_file

            compiled = compile_effect_file(effect_key)

            self._ensure_device()
            if self._device is not None:
                shader_module = self._device.create_shader_module(
                    code=compiled.wgsl_code
                )
                pipeline = self._device.create_compute_pipeline(
                    layout="auto",
                    compute={"module": shader_module, "entry_point": "main"},
                )
                self._loaded_effects[effect_key] = {
                    'compiled': compiled,
                    'pipeline': pipeline,
                    'name': compiled.name,
                }
                return compiled.name
        except Exception as e:
            print(f"[WGPUBackend] Failed to compile {effect_path}: {e}")

        # Outside the try so a failing fallback is not misreported as a
        # compile failure and then run a second time.
        return self._numpy_fallback.load_effect(effect_path)

    def _resolve_binding(self, value: Any, t: float, analysis_data: Dict) -> Any:
        """Resolve a parameter binding to its value at time t."""
        # Delegate to numpy backend's implementation
        return self._numpy_fallback._resolve_binding(value, t, analysis_data)

    def _get_or_create_buffers(self, w: int, h: int):
        """Get or create reusable buffers for given dimensions.

        Returns:
            Dict with the 'staging', 'input', 'output' and 'params' buffers
            plus the pixel-buffer 'size' in bytes, cached per (w, h)
        """
        import wgpu

        key = (w, h)
        if key in self._buffer_pool:
            return self._buffer_pool[key]

        size = w * h * 4  # u32 per pixel
        device = self._device

        # Create staging buffer for uploads (MAP_WRITE)
        staging_buffer = device.create_buffer(
            size=size,
            usage=wgpu.BufferUsage.MAP_WRITE | wgpu.BufferUsage.COPY_SRC,
            mapped_at_creation=False,
        )
        # Create input buffer (STORAGE, receives uploaded pixels)
        input_buffer = device.create_buffer(
            size=size,
            usage=wgpu.BufferUsage.STORAGE | wgpu.BufferUsage.COPY_DST,
        )
        # Create output buffer (STORAGE + COPY_SRC for readback)
        output_buffer = device.create_buffer(
            size=size,
            usage=wgpu.BufferUsage.STORAGE | wgpu.BufferUsage.COPY_SRC,
        )
        # Params buffer (uniform)
        params_buffer = device.create_buffer(
            size=_PARAMS_BUFFER_SIZE,
            usage=wgpu.BufferUsage.UNIFORM | wgpu.BufferUsage.COPY_DST,
        )

        self._buffer_pool[key] = {
            'staging': staging_buffer,
            'input': input_buffer,
            'output': output_buffer,
            'params': params_buffer,
            'size': size,
        }
        return self._buffer_pool[key]

    def _apply_effect_gpu(
        self,
        frame: np.ndarray,
        effect_name: str,
        params: Dict,
        t: float,
    ) -> Optional[np.ndarray]:
        """Apply effect using GPU.

        Args:
            frame: Input frame; channels 0, 1, 2 are packed into one u32 per
                pixel as ``(c0 << 16) | (c1 << 8) | c2``
            effect_name: Name of an effect compiled by load_effect()
            params: Already-resolved effect parameters
            t: Current time in seconds

        Returns:
            The processed (h, w, 3) uint8 frame, or None when the GPU path
            cannot be used: no device, effect not compiled, frame without
            three channels, or parameters too large for the uniform buffer
        """
        if self._device is None:
            return None

        effect_info = next(
            (
                info
                for info in self._loaded_effects.values()
                if info.get('name') == effect_name
            ),
            None,
        )
        if effect_info is None:
            return None
        if frame.ndim != 3 or frame.shape[2] < 3:
            return None  # packing below needs three channels

        compiled = effect_info['compiled']
        pipeline = effect_info['pipeline']
        h, w = frame.shape[:2]

        # Build params struct: width and height as u32, time as f32, then
        # the effect's own parameters in declaration order.
        param_values = [w, h, t]
        param_format = "IIf"
        format_by_type = {'f32': ("f", float), 'i32': ("i", int), 'u32': ("I", int)}
        for param in compiled.params:
            val = params.get(param.name, param.default)
            if val is None:
                val = 0
            packing = format_by_type.get(param.wgsl_type)
            if packing is None:
                continue  # other WGSL types are not packed
            code, convert = packing
            param_values.append(convert(val))
            param_format += code

        param_bytes = struct.pack(param_format, *param_values)
        # Pad to 16-byte alignment
        param_bytes += b'\x00' * (-len(param_bytes) % 16)
        if len(param_bytes) > _PARAMS_BUFFER_SIZE:
            return None  # would overflow the uniform buffer

        # Get reusable buffers
        buffers = self._get_or_create_buffers(w, h)
        queue = self._device.queue

        # Pack the frame as one u32 per pixel and upload it.
        c0 = frame[:, :, 0].astype(np.uint32)
        c1 = frame[:, :, 1].astype(np.uint32)
        c2 = frame[:, :, 2].astype(np.uint32)
        packed = (c0 << 16) | (c1 << 8) | c2
        queue.write_buffer(buffers['input'], 0, packed.tobytes())
        queue.write_buffer(buffers['params'], 0, param_bytes)

        # The bind group depends on the pipeline layout, so it is rebuilt
        # for every call.
        bind_group = self._device.create_bind_group(
            layout=pipeline.get_bind_group_layout(0),
            entries=[
                {"binding": 0, "resource": {"buffer": buffers['input']}},
                {"binding": 1, "resource": {"buffer": buffers['output']}},
                {"binding": 2, "resource": {"buffer": buffers['params']}},
            ],
        )

        # Dispatch compute
        encoder = self._device.create_command_encoder()
        compute_pass = encoder.begin_compute_pass()
        compute_pass.set_pipeline(pipeline)
        compute_pass.set_bind_group(0, bind_group)
        # Workgroups: ceil(w/16) x ceil(h/16)
        # NOTE(review): presumably the compiled WGSL uses a 16x16 workgroup
        # size - confirm against sexp_effects.wgsl_compiler.
        compute_pass.dispatch_workgroups((w + 15) // 16, (h + 15) // 16, 1)
        compute_pass.end()
        queue.submit([encoder.finish()])

        # Read back result and unpack u32 -> three uint8 channels
        result_data = queue.read_buffer(buffers['output'])
        result_packed = np.frombuffer(result_data, dtype=np.uint32).reshape(h, w)
        result = np.zeros((h, w, 3), dtype=np.uint8)
        result[:, :, 0] = ((result_packed >> 16) & 0xFF).astype(np.uint8)
        result[:, :, 1] = ((result_packed >> 8) & 0xFF).astype(np.uint8)
        result[:, :, 2] = (result_packed & 0xFF).astype(np.uint8)
        return result

    def _apply_effect(
        self,
        frame: np.ndarray,
        effect_name: str,
        params: Dict,
        t: float,
        analysis_data: Dict,
    ) -> np.ndarray:
        """Apply a single effect to a frame.

        Uses the GPU when a device and a compiled pipeline are available;
        otherwise the NumPy backend handles the effect.
        """
        # Try GPU first
        self._ensure_device()
        if self._device is not None:
            resolved_params = _resolve_params(
                self._resolve_binding, params, t, analysis_data
            )
            result = self._apply_effect_gpu(frame, effect_name, resolved_params, t)
            if result is not None:
                return result

        # The fallback gets the raw params because it resolves bindings itself.
        return self._numpy_fallback._apply_effect(
            frame, effect_name, params, t, analysis_data
        )

    def process_frame(
        self,
        frames: List[np.ndarray],
        effects_per_frame: List[List[Dict]],
        compositor_config: Dict,
        t: float,
        analysis_data: Dict,
    ) -> np.ndarray:
        """Process frames through effects and composite.

        Args:
            frames: List of input frames (one per source)
            effects_per_frame: List of effect chains (one per source); a
                frame without a chain is passed through unchanged
            compositor_config: How to blend the layers
            t: Current time in seconds
            analysis_data: Analysis data for binding resolution

        Returns:
            A black 720x1280 frame when *frames* is empty, the single
            processed frame when there is one source, otherwise the
            composited frame
        """
        if not frames:
            return np.zeros(_EMPTY_FRAME_SHAPE, dtype=np.uint8)

        processed = _run_effect_chains(
            self._apply_effect, frames, effects_per_frame, t, analysis_data
        )

        # Composite layers (use numpy backend for now)
        if len(processed) == 1:
            return processed[0]
        return self._numpy_fallback._composite(
            processed, compositor_config, t, analysis_data
        )


# Keep GLSLBackend as alias for backwards compatibility
GLSLBackend = WGPUBackend


def get_backend(name: str = "numpy", **kwargs) -> Backend:
    """
    Get a backend by name.

    Args:
        name: "numpy", or "wgpu" (also accepted as "glsl" or "gpu")
        **kwargs: Backend-specific options passed to the constructor

    Returns:
        Backend instance

    Raises:
        ValueError: If *name* is not a known backend
    """
    if name == "numpy":
        return NumpyBackend(**kwargs)
    if name in ("wgpu", "glsl", "gpu"):
        return WGPUBackend(**kwargs)
    raise ValueError(f"Unknown backend: {name}")