"""
GPU-Accelerated Streaming Primitives

Provides GPU-native video source and frame processing.
Frames stay on GPU memory throughout the pipeline for maximum performance.

Architecture:
- GPUFrame: Wrapper that tracks whether data is on CPU or GPU
- GPUVideoSource: Hardware-accelerated decode to GPU memory
- GPU primitives operate directly on GPU frames using fast CUDA kernels
- Transfer to CPU only at final output

Requirements:
- CuPy for CUDA support
- FFmpeg with NVDEC support (for hardware decode)
- NVIDIA GPU with CUDA capability
"""

import os
import sys
import json
import hashlib
import math
import random
import re
import subprocess
import numpy as np
from pathlib import Path
from typing import Optional, Tuple, Union

# Try to import CuPy
try:
    import cupy as cp
    GPU_AVAILABLE = True
except ImportError:
    cp = None
    GPU_AVAILABLE = False

# Try to import fast CUDA kernels from JIT compiler
_FAST_KERNELS_AVAILABLE = False
try:
    if GPU_AVAILABLE:
        from streaming.jit_compiler import (
            fast_rotate, fast_zoom, fast_blend, fast_hue_shift,
            fast_invert, fast_ripple, get_fast_ops
        )
        _FAST_KERNELS_AVAILABLE = True
        print("[streaming_gpu] Fast CUDA kernels loaded", file=sys.stderr)
except ImportError as e:
    print(f"[streaming_gpu] Fast kernels not available: {e}", file=sys.stderr)

# Check for hardware decode support (results are memoized in these globals)
_HWDEC_AVAILABLE: Optional[bool] = None
_DECORD_GPU_AVAILABLE: Optional[bool] = None


def check_hwdec_available() -> bool:
    """Check if NVIDIA hardware decode is available.

    Runs ``nvidia-smi`` and ``ffmpeg -hwaccels`` once and caches the answer
    in ``_HWDEC_AVAILABLE``; any failure while probing counts as "not
    available".
    """
    global _HWDEC_AVAILABLE
    if _HWDEC_AVAILABLE is not None:
        return _HWDEC_AVAILABLE

    try:
        # Check for nvidia-smi (GPU present)
        result = subprocess.run(["nvidia-smi"], capture_output=True, timeout=2)
        if result.returncode != 0:
            _HWDEC_AVAILABLE = False
            return False

        # Check for nvdec in ffmpeg
        result = subprocess.run(
            ["ffmpeg", "-hwaccels"],
            capture_output=True,
            text=True,
            timeout=5
        )
        _HWDEC_AVAILABLE = "cuda" in result.stdout
    except Exception:
        # Best-effort probe: missing binaries or timeouts mean "no hwdec".
        _HWDEC_AVAILABLE = False

    return _HWDEC_AVAILABLE


def check_decord_gpu_available() -> bool:
    """Check if decord with CUDA GPU decode is available.

    The result is cached in ``_DECORD_GPU_AVAILABLE`` after the first call.
    """
    global _DECORD_GPU_AVAILABLE
    if _DECORD_GPU_AVAILABLE is not None:
        return _DECORD_GPU_AVAILABLE

    try:
        from decord import gpu
        # Try to create a GPU context to verify CUDA support
        gpu(0)
        _DECORD_GPU_AVAILABLE = True
        print("[streaming_gpu] decord GPU (CUDA) decode available", file=sys.stderr)
    except Exception as e:
        _DECORD_GPU_AVAILABLE = False
        print(f"[streaming_gpu] decord GPU not available: {e}", file=sys.stderr)

    return _DECORD_GPU_AVAILABLE


class GPUFrame:
    """
    Frame container that tracks data location (CPU/GPU).

    Enables zero-copy operations when data is already on the right device.
    Lazy transfer - only moves data when actually needed.
    """

    def __init__(self, data: Union[np.ndarray, 'cp.ndarray'],
                 on_gpu: Optional[bool] = None):
        """Wrap ``data``.

        Args:
            data: A numpy array, a CuPy array, or anything array-like.
            on_gpu: ``None`` keeps the data where it already is, ``True``
                uploads it to the GPU (when CuPy is available), ``False``
                forces a CPU copy.
        """
        self._cpu_data: Optional[np.ndarray] = None
        self._gpu_data = None  # Optional[cp.ndarray]

        if on_gpu is None:
            # Auto-detect based on type
            if GPU_AVAILABLE and isinstance(data, cp.ndarray):
                self._gpu_data = data
            else:
                self._cpu_data = np.asarray(data)
        elif on_gpu and GPU_AVAILABLE:
            self._gpu_data = data if isinstance(data, cp.ndarray) else cp.asarray(data)
        elif GPU_AVAILABLE and not isinstance(data, np.ndarray):
            # May be a device array: let CuPy bring it back to the host.
            self._cpu_data = cp.asnumpy(data)
        else:
            # No CuPy (cp is None) or already a numpy array: never touch cp.
            self._cpu_data = np.asarray(data)

    @property
    def cpu(self) -> np.ndarray:
        """Get frame as numpy array (transfers from GPU if needed)."""
        if self._cpu_data is None:
            if self._gpu_data is not None and GPU_AVAILABLE:
                self._cpu_data = cp.asnumpy(self._gpu_data)
            else:
                raise ValueError("No frame data available")
        return self._cpu_data

    @property
    def gpu(self):
        """Get frame as CuPy array (transfers to GPU if needed)."""
        if not GPU_AVAILABLE:
            raise RuntimeError("GPU not available")
        if self._gpu_data is None:
            if self._cpu_data is not None:
                self._gpu_data = cp.asarray(self._cpu_data)
            else:
                raise ValueError("No frame data available")
        return self._gpu_data

    @property
    def is_on_gpu(self) -> bool:
        """Check if data is currently on GPU."""
        return self._gpu_data is not None

    @property
    def shape(self) -> Tuple[int, ...]:
        """Get frame shape."""
        if self._gpu_data is not None:
            return self._gpu_data.shape
        return self._cpu_data.shape

    @property
    def dtype(self):
        """Get frame dtype."""
        if self._gpu_data is not None:
            return self._gpu_data.dtype
        return self._cpu_data.dtype

    def numpy(self) -> np.ndarray:
        """Alias for cpu property."""
        return self.cpu

    def cupy(self):
        """Alias for gpu property."""
        return self.gpu

    def free_cpu(self):
        """Free CPU memory (keep GPU only)."""
        if self._gpu_data is not None:
            self._cpu_data = None

    def free_gpu(self):
        """Free GPU memory (keep CPU only)."""
        if self._cpu_data is not None:
            self._gpu_data = None


class GPUVideoSource:
    """
    GPU-accelerated video source using hardware decode.

    Uses decord with CUDA GPU context for true NVDEC decode - frames
    decode directly to GPU memory via CUDA.

    Falls back to FFmpeg pipe if decord GPU unavailable (slower due to CPU copy).
    """

    def __init__(self, path: str, fps: float = 30, prefer_gpu: bool = True):
        """Open ``path`` and gather its metadata.

        Args:
            path: Video file to read.
            fps: Output frame rate requested from the FFmpeg fallback.
            prefer_gpu: Keep frames on the GPU when CuPy is available.
        """
        self.path = Path(path)
        self.fps = fps
        self.prefer_gpu = prefer_gpu and GPU_AVAILABLE
        self._use_decord_gpu = self.prefer_gpu and check_decord_gpu_available()

        self._frame_size: Optional[Tuple[int, int]] = None
        self._duration: Optional[float] = None
        self._video_fps: float = 30.0
        self._total_frames: int = 0
        self._frame_time = 1.0 / fps
        self._last_read_time = -1
        self._cached_frame: Optional[GPUFrame] = None

        # Decord VideoReader with GPU context
        self._vr = None
        self._decord_ctx = None

        # FFmpeg fallback state
        self._proc = None
        self._stream_time = 0.0

        # Initialize video source
        self._init_video()

        mode = "decord-GPU" if self._use_decord_gpu else (
            "ffmpeg-hwaccel" if check_hwdec_available() else "ffmpeg-CPU")
        print(f"[GPUVideoSource] {self.path.name}: {self._frame_size}, "
              f"duration={self._duration:.1f}s, mode={mode}", file=sys.stderr)

    def _init_video(self):
        """Initialize video reader (decord GPU or probe for ffmpeg)."""
        if self._use_decord_gpu:
            try:
                from decord import VideoReader, gpu

                # Use GPU context for NVDEC hardware decode
                self._decord_ctx = gpu(0)
                self._vr = VideoReader(str(self.path), ctx=self._decord_ctx, num_threads=1)

                self._total_frames = len(self._vr)
                self._video_fps = self._vr.get_avg_fps()
                self._duration = self._total_frames / self._video_fps

                # Get frame size from first frame
                first_frame = self._vr[0]
                self._frame_size = (first_frame.shape[1], first_frame.shape[0])

                print(f"[GPUVideoSource] decord GPU initialized: {self._frame_size}, "
                      f"{self._total_frames} frames @ {self._video_fps:.1f}fps", file=sys.stderr)
                return
            except Exception as e:
                print(f"[GPUVideoSource] decord GPU init failed, falling back to ffmpeg: {e}",
                      file=sys.stderr)
                self._use_decord_gpu = False
                self._vr = None
                self._decord_ctx = None

        # FFmpeg fallback - probe video for metadata
        self._probe_video()

    def _probe_video(self):
        """Probe video file for metadata (FFmpeg fallback).

        Missing values fall back to a 720x720 frame, 60 s duration and the
        default 30 fps set in ``__init__``.
        """
        cmd = ["ffprobe", "-v", "quiet", "-print_format", "json",
               "-show_streams", "-show_format", str(self.path)]
        result = subprocess.run(cmd, capture_output=True, text=True)
        info = json.loads(result.stdout)

        for stream in info.get("streams", []):
            if stream.get("codec_type") == "video":
                self._frame_size = (stream.get("width", 720), stream.get("height", 720))
                if "duration" in stream:
                    self._duration = float(stream["duration"])
                elif "tags" in stream and "DURATION" in stream["tags"]:
                    dur_str = stream["tags"]["DURATION"]
                    parts = dur_str.split(":")
                    if len(parts) == 3:
                        h, m, s = parts
                        self._duration = int(h) * 3600 + int(m) * 60 + float(s)
                # Get fps
                if "r_frame_rate" in stream:
                    fps_str = stream["r_frame_rate"]
                    if "/" in fps_str:
                        num, den = fps_str.split("/")
                        # A "0/0" rate would divide by zero; keep the
                        # default fps in that case.
                        if float(den) != 0:
                            self._video_fps = float(num) / float(den)
                break

        if self._duration is None and "format" in info:
            if "duration" in info["format"]:
                self._duration = float(info["format"]["duration"])

        if not self._frame_size:
            self._frame_size = (720, 720)
        if not self._duration:
            self._duration = 60.0

        self._total_frames = int(self._duration * self._video_fps)

    def _stop_stream(self):
        """Kill the ffmpeg process (if any), reap it and close its pipe."""
        proc = self._proc
        if not proc:
            return
        self._proc = None
        proc.kill()
        try:
            # Reap the child so it does not linger as a zombie.
            proc.wait(timeout=1)
        except subprocess.TimeoutExpired:
            pass
        if proc.stdout is not None:
            proc.stdout.close()

    def _start_stream(self, seek_time: float = 0):
        """Start ffmpeg decode process (fallback mode).

        Raises:
            FileNotFoundError: If the video file does not exist.
        """
        self._stop_stream()

        if not self.path.exists():
            raise FileNotFoundError(f"Video file not found: {self.path}")

        w, h = self._frame_size

        # Build ffmpeg command
        cmd = ["ffmpeg", "-v", "error"]

        # Hardware decode if available
        if check_hwdec_available():
            cmd.extend(["-hwaccel", "cuda"])

        cmd.extend([
            "-ss", f"{seek_time:.3f}",
            "-i", str(self.path),
            "-f", "rawvideo",
            "-pix_fmt", "rgb24",
            "-s", f"{w}x{h}",
            "-r", str(self.fps),
            "-"
        ])

        # stderr is never read, so it must not be a pipe: a full pipe buffer
        # would block ffmpeg and stall the stdout stream.
        self._proc = subprocess.Popen(cmd, stdout=subprocess.PIPE,
                                      stderr=subprocess.DEVNULL)
        self._stream_time = seek_time

    def _read_frame_raw(self) -> Optional[np.ndarray]:
        """Read one frame from ffmpeg pipe (fallback mode).

        Returns ``None`` when the process is gone or the stream is exhausted.
        """
        w, h = self._frame_size
        frame_size = w * h * 3

        if not self._proc or self._proc.poll() is not None:
            return None

        data = self._proc.stdout.read(frame_size)
        if len(data) < frame_size:
            return None

        return np.frombuffer(data, dtype=np.uint8).reshape((h, w, 3)).copy()

    def _read_frame_decord_gpu(self, frame_idx: int) -> Optional[GPUFrame]:
        """Read frame using decord with GPU context (NVDEC, zero-copy to CuPy)."""
        if self._vr is None:
            return None

        try:
            # Handle looping
            frame_idx = frame_idx % max(1, self._total_frames)

            # Decode frame - with GPU context, this uses NVDEC
            frame_tensor = self._vr[frame_idx]

            if not GPU_AVAILABLE:
                return GPUFrame(frame_tensor.asnumpy(), on_gpu=False)

            # Convert to CuPy via DLPack (zero-copy GPU transfer).
            # decord tensors have .to_dlpack() which returns a PyCapsule
            # that CuPy can consume for zero-copy GPU transfer
            try:
                dlpack_capsule = frame_tensor.to_dlpack()
                gpu_frame = cp.from_dlpack(dlpack_capsule)
                # Log success once per source
                if not getattr(self, '_dlpack_logged', False):
                    print(f"[GPUVideoSource] DLPack zero-copy SUCCESS - frames stay on GPU",
                          file=sys.stderr)
                    self._dlpack_logged = True
                return GPUFrame(gpu_frame, on_gpu=True)
            except Exception as dlpack_err:
                # Fallback: convert via numpy (involves CPU copy)
                if not getattr(self, '_dlpack_fail_logged', False):
                    print(f"[GPUVideoSource] DLPack FAILED ({dlpack_err}), using CPU copy fallback",
                          file=sys.stderr)
                    self._dlpack_fail_logged = True
                frame_np = frame_tensor.asnumpy()
                return GPUFrame(frame_np, on_gpu=True)

        except Exception as e:
            print(f"[GPUVideoSource] decord GPU read error at frame {frame_idx}: {e}",
                  file=sys.stderr)
            return None

    def read_at(self, t: float) -> Optional[GPUFrame]:
        """
        Read frame at specific time.

        Returns GPUFrame with data on GPU if GPU mode enabled. Times past the
        end of the video wrap around (looping).
        """
        # Cache check
        if t == self._last_read_time and self._cached_frame is not None:
            return self._cached_frame

        # Loop time for shorter videos
        seek_time = t
        if self._duration and self._duration > 0:
            seek_time = t % self._duration
            if seek_time > self._duration - 0.1:
                seek_time = 0.0

        self._last_read_time = t

        # Use decord GPU if available (NVDEC decode, zero-copy via DLPack)
        if self._use_decord_gpu:
            frame_idx = int(seek_time * self._video_fps)
            self._cached_frame = self._read_frame_decord_gpu(frame_idx)
            if self._cached_frame is not None:
                # Free CPU copy if on GPU (saves memory)
                if self.prefer_gpu and self._cached_frame.is_on_gpu:
                    self._cached_frame.free_cpu()
                return self._cached_frame

        # FFmpeg fallback
        need_seek = (
            self._proc is None or
            self._proc.poll() is not None or
            seek_time < self._stream_time - self._frame_time or
            seek_time > self._stream_time + 2.0
        )

        if need_seek:
            self._start_stream(seek_time)

        # Skip frames to reach target
        while self._stream_time + self._frame_time <= seek_time:
            frame = self._read_frame_raw()
            if frame is None:
                self._start_stream(seek_time)
                break
            self._stream_time += self._frame_time

        # Read target frame
        frame_np = self._read_frame_raw()
        if frame_np is None:
            return self._cached_frame

        self._stream_time += self._frame_time

        # Create GPUFrame - transfer to GPU if in GPU mode
        self._cached_frame = GPUFrame(frame_np, on_gpu=self.prefer_gpu)

        # Free CPU copy if on GPU (saves memory)
        if self.prefer_gpu and self._cached_frame.is_on_gpu:
            self._cached_frame.free_cpu()

        return self._cached_frame

    def read(self) -> Optional[GPUFrame]:
        """Read current frame."""
        if self._cached_frame is not None:
            return self._cached_frame
        return self.read_at(0)

    @property
    def size(self) -> Tuple[int, int]:
        """Frame size as ``(width, height)``."""
        return self._frame_size

    @property
    def duration(self) -> float:
        """Video duration in seconds."""
        return self._duration

    def close(self):
        """Close the video source."""
        self._stop_stream()
        # Release decord resources
        self._vr = None
        self._decord_ctx = None


# GPU-aware primitive functions

def gpu_blend(frame_a: GPUFrame, frame_b: GPUFrame, alpha: float = 0.5) -> GPUFrame:
    """
    Blend two frames on GPU using fast CUDA kernel.

    Both frames stay on GPU throughout - no CPU transfer.
    The result is ``a * alpha + b * (1 - alpha)``.
    """
    if not GPU_AVAILABLE:
        a = frame_a.cpu.astype(np.float32)
        b = frame_b.cpu.astype(np.float32)
        result = (a * alpha + b * (1 - alpha)).astype(np.uint8)
        return GPUFrame(result, on_gpu=False)

    # Use fast CUDA kernel
    if _FAST_KERNELS_AVAILABLE:
        a_gpu = frame_a.gpu
        b_gpu = frame_b.gpu
        if a_gpu.dtype != cp.uint8:
            a_gpu = cp.clip(a_gpu, 0, 255).astype(cp.uint8)
        if b_gpu.dtype != cp.uint8:
            b_gpu = cp.clip(b_gpu, 0, 255).astype(cp.uint8)
        result = fast_blend(a_gpu, b_gpu, alpha)
        return GPUFrame(result, on_gpu=True)

    # Fallback
    a = frame_a.gpu.astype(cp.float32)
    b = frame_b.gpu.astype(cp.float32)
    result = (a * alpha + b * (1 - alpha)).astype(cp.uint8)
    return GPUFrame(result, on_gpu=True)


def gpu_resize(frame: GPUFrame, size: Tuple[int, int]) -> GPUFrame:
    """Resize frame to ``size`` = ``(width, height)``.

    CPU frames go through OpenCV; GPU frames use ``cupyx.scipy.ndimage.zoom``
    with linear interpolation.
    """
    if not GPU_AVAILABLE or not frame.is_on_gpu:
        # Imported here so the GPU path does not depend on OpenCV.
        import cv2
        resized = cv2.resize(frame.cpu, size)
        return GPUFrame(resized, on_gpu=False)

    gpu_data = frame.gpu
    h, w = gpu_data.shape[:2]
    target_w, target_h = size

    # NOTE: the fast zoom kernel is not used here - a full resize changes the
    # output dimensions and needs a different approach.
    # CuPy doesn't have built-in resize, use scipy zoom
    from cupyx.scipy import ndimage as cpndimage

    zoom_y = target_h / h
    zoom_x = target_w / w

    if gpu_data.ndim == 3:
        resized = cpndimage.zoom(gpu_data, (zoom_y, zoom_x, 1), order=1)
    else:
        resized = cpndimage.zoom(gpu_data, (zoom_y, zoom_x), order=1)

    return GPUFrame(resized, on_gpu=True)


def gpu_zoom(frame: GPUFrame, factor: float, cx: float = None, cy: float = None) -> GPUFrame:
    """Zoom frame on GPU using fast CUDA kernel.

    ``cx``/``cy`` give the zoom centre; the CPU path defaults them to the
    frame centre. Without fast kernels a GPU frame is returned unchanged.
    """
    if not GPU_AVAILABLE or not frame.is_on_gpu:
        import cv2
        h, w = frame.cpu.shape[:2]
        if cx is None:
            cx = w / 2
        if cy is None:
            cy = h / 2
        M = cv2.getRotationMatrix2D((cx, cy), 0, factor)
        zoomed = cv2.warpAffine(frame.cpu, M, (w, h))
        return GPUFrame(zoomed, on_gpu=False)

    if _FAST_KERNELS_AVAILABLE:
        zoomed = fast_zoom(frame.gpu, factor, cx=cx, cy=cy)
        return GPUFrame(zoomed, on_gpu=True)

    # Fallback - basic zoom via slice and resize
    return frame


def gpu_hue_shift(frame: GPUFrame, degrees: float) -> GPUFrame:
    """Shift hue on GPU using fast CUDA kernel.

    Without fast kernels a GPU frame is returned unchanged.
    """
    if not GPU_AVAILABLE or not frame.is_on_gpu:
        import cv2
        hsv = cv2.cvtColor(frame.cpu, cv2.COLOR_RGB2HSV)
        # The hue channel is stored as degrees / 2, wrapping at 180.
        hsv[:, :, 0] = (hsv[:, :, 0].astype(np.float32) + degrees / 2) % 180
        result = cv2.cvtColor(hsv, cv2.COLOR_HSV2RGB)
        return GPUFrame(result, on_gpu=False)

    if _FAST_KERNELS_AVAILABLE:
        gpu_data = frame.gpu
        if gpu_data.dtype != cp.uint8:
            gpu_data = cp.clip(gpu_data, 0, 255).astype(cp.uint8)
        shifted = fast_hue_shift(gpu_data, degrees)
        return GPUFrame(shifted, on_gpu=True)

    # Fallback - no GPU hue shift without fast kernels
    return frame


def gpu_invert(frame: GPUFrame) -> GPUFrame:
    """Invert colors on GPU using fast CUDA kernel."""
    if not GPU_AVAILABLE or not frame.is_on_gpu:
        result = 255 - frame.cpu
        return GPUFrame(result, on_gpu=False)

    if _FAST_KERNELS_AVAILABLE:
        gpu_data = frame.gpu
        if gpu_data.dtype != cp.uint8:
            gpu_data = cp.clip(gpu_data, 0, 255).astype(cp.uint8)
        inverted = fast_invert(gpu_data)
        return GPUFrame(inverted, on_gpu=True)

    # Fallback - basic CuPy invert
    result = 255 - frame.gpu
    return GPUFrame(result, on_gpu=True)


def gpu_ripple(frame: GPUFrame, amplitude: float, frequency: float = 8,
               decay: float = 2, phase: float = 0,
               cx: float = None, cy: float = None) -> GPUFrame:
    """Apply ripple effect on GPU using fast CUDA kernel.

    ``cx``/``cy`` default to the frame centre when ``None``. CPU frames, and
    GPU frames without fast kernels, are returned unchanged.
    """
    if not GPU_AVAILABLE or not frame.is_on_gpu:
        return frame  # No CPU fallback for ripple

    if _FAST_KERNELS_AVAILABLE:
        gpu_data = frame.gpu
        if gpu_data.dtype != cp.uint8:
            gpu_data = cp.clip(gpu_data, 0, 255).astype(cp.uint8)
        h, w = gpu_data.shape[:2]
        rippled = fast_ripple(
            gpu_data, amplitude,
            # Compare against None so an explicit centre of 0 is honoured.
            center_x=cx if cx is not None else w / 2,
            center_y=cy if cy is not None else h / 2,
            frequency=frequency,
            decay=decay,
            speed=1.0,
            t=phase
        )
        return GPUFrame(rippled, on_gpu=True)

    return frame


def gpu_contrast(frame: GPUFrame, factor: float) -> GPUFrame:
    """Adjust contrast on GPU using fast CUDA kernel.

    Pixel values are scaled around the mid-grey value 128.
    """
    if not GPU_AVAILABLE or not frame.is_on_gpu:
        result = np.clip((frame.cpu.astype(np.float32) - 128) * factor + 128,
                         0, 255).astype(np.uint8)
        return GPUFrame(result, on_gpu=False)

    if _FAST_KERNELS_AVAILABLE:
        gpu_data = frame.gpu
        if gpu_data.dtype != cp.uint8:
            gpu_data = cp.clip(gpu_data, 0, 255).astype(cp.uint8)
        h, w = gpu_data.shape[:2]
        ops = get_fast_ops(w, h)
        ops.set_input(gpu_data)
        ops.contrast(factor)
        # Copy the output so the returned frame does not alias the ops object.
        return GPUFrame(ops.get_output().copy(), on_gpu=True)

    # Fallback
    result = cp.clip((frame.gpu.astype(cp.float32) - 128) * factor + 128,
                     0, 255).astype(cp.uint8)
    return GPUFrame(result, on_gpu=True)


def gpu_rotate(frame: GPUFrame, angle: float) -> GPUFrame:
    """Rotate frame on GPU using fast CUDA kernel."""
    if not GPU_AVAILABLE or not frame.is_on_gpu:
        import cv2
        h, w = frame.cpu.shape[:2]
        center = (w // 2, h // 2)
        M = cv2.getRotationMatrix2D(center, angle, 1.0)
        rotated = cv2.warpAffine(frame.cpu, M, (w, h))
        return GPUFrame(rotated, on_gpu=False)

    # Use fast CUDA kernel (< 1ms vs 20ms for scipy)
    if _FAST_KERNELS_AVAILABLE:
        rotated = fast_rotate(frame.gpu, angle)
        return GPUFrame(rotated, on_gpu=True)

    # Fallback to scipy (slow)
    from cupyx.scipy import ndimage as cpndimage
    rotated = cpndimage.rotate(frame.gpu, angle, reshape=False, order=1)
    return GPUFrame(rotated, on_gpu=True)


def gpu_brightness(frame: GPUFrame, factor: float) -> GPUFrame:
    """Adjust brightness on GPU using fast CUDA kernel.

    Pixel values are multiplied by ``factor`` and clipped to 0..255.
    """
    if not GPU_AVAILABLE or not frame.is_on_gpu:
        result = np.clip(frame.cpu.astype(np.float32) * factor, 0, 255).astype(np.uint8)
        return GPUFrame(result, on_gpu=False)

    # Use fast CUDA kernel
    if _FAST_KERNELS_AVAILABLE:
        gpu_data = frame.gpu
        if gpu_data.dtype != cp.uint8:
            gpu_data = cp.clip(gpu_data, 0, 255).astype(cp.uint8)
        h, w = gpu_data.shape[:2]
        ops = get_fast_ops(w, h)
        ops.set_input(gpu_data)
        ops.brightness(factor)
        return GPUFrame(ops.get_output().copy(), on_gpu=True)

    # Fallback
    result = cp.clip(frame.gpu.astype(cp.float32) * factor, 0, 255).astype(cp.uint8)
    return GPUFrame(result, on_gpu=True)


def gpu_composite(frames: list, weights: list = None) -> GPUFrame:
    """
    Composite multiple frames with weights.

    All frames processed on GPU for efficiency. Weights default to an equal
    split and are normalized when their sum is positive. Frames whose shape
    differs from the first frame are resized to match it.

    Raises:
        ValueError: If ``frames`` is empty.
    """
    if not frames:
        raise ValueError("No frames to composite")

    if len(frames) == 1:
        return frames[0]

    if weights is None:
        weights = [1.0 / len(frames)] * len(frames)

    # Normalize weights
    total = sum(weights)
    if total > 0:
        weights = [w / total for w in weights]

    use_gpu = GPU_AVAILABLE and any(f.is_on_gpu for f in frames)

    if use_gpu:
        # All on GPU
        target_shape = frames[0].gpu.shape
        result = cp.zeros(target_shape, dtype=cp.float32)

        for frame, weight in zip(frames, weights):
            gpu_data = frame.gpu.astype(cp.float32)
            if gpu_data.shape != target_shape:
                # Resize to match
                from cupyx.scipy import ndimage as cpndimage
                h, w = target_shape[:2]
                fh, fw = gpu_data.shape[:2]
                zoom_factors = (h/fh, w/fw, 1) if gpu_data.ndim == 3 else (h/fh, w/fw)
                gpu_data = cpndimage.zoom(gpu_data, zoom_factors, order=1)
            result += gpu_data * weight

        return GPUFrame(cp.clip(result, 0, 255).astype(cp.uint8), on_gpu=True)
    else:
        # All on CPU
        import cv2
        target_shape = frames[0].cpu.shape
        result = np.zeros(target_shape, dtype=np.float32)

        for frame, weight in zip(frames, weights):
            cpu_data = frame.cpu.astype(np.float32)
            if cpu_data.shape != target_shape:
                cpu_data = cv2.resize(cpu_data, (target_shape[1], target_shape[0]))
            result += cpu_data * weight

        return GPUFrame(np.clip(result, 0, 255).astype(np.uint8), on_gpu=False)


# Primitive registration for streaming interpreter

def _to_gpu_frame(img):
    """Convert any image type to GPUFrame, keeping data on GPU if possible."""
    if isinstance(img, GPUFrame):
        return img
    # CuPy arrays are wrapped directly (they stay on GPU); numpy or other
    # inputs are uploaded to the GPU when one is available.
    return GPUFrame(img, on_gpu=True)


def _frame_array(frame: GPUFrame):
    """Return the raw array held by ``frame`` from wherever it lives."""
    return frame.gpu if frame.is_on_gpu else frame.cpu


def get_primitives():
    """
    Get GPU-aware primitives for registration with interpreter.

    These wrap the GPU functions to work with the sexp interpreter.
    All use fast CUDA kernels when available for maximum performance.

    Primitives detect CuPy arrays and keep them on GPU (no CPU round-trips).
    """
    def prim_make_video_source_gpu(path: str, fps: float = 30):
        """Create GPU-accelerated video source."""
        return GPUVideoSource(path, fps, prefer_gpu=True)

    def prim_gpu_blend(a, b, alpha=0.5):
        """Blend two frames using fast CUDA kernel."""
        return _frame_array(gpu_blend(_to_gpu_frame(a), _to_gpu_frame(b), alpha))

    def prim_gpu_rotate(img, angle):
        """Rotate image using fast CUDA kernel (< 1ms)."""
        return _frame_array(gpu_rotate(_to_gpu_frame(img), angle))

    def prim_gpu_brightness(img, factor):
        """Adjust brightness using fast CUDA kernel."""
        return _frame_array(gpu_brightness(_to_gpu_frame(img), factor))

    def prim_gpu_contrast(img, factor):
        """Adjust contrast using fast CUDA kernel."""
        return _frame_array(gpu_contrast(_to_gpu_frame(img), factor))

    def prim_gpu_zoom(img, factor, cx=None, cy=None):
        """Zoom image using fast CUDA kernel."""
        return _frame_array(gpu_zoom(_to_gpu_frame(img), factor, cx, cy))

    def prim_gpu_hue_shift(img, degrees):
        """Shift hue using fast CUDA kernel."""
        return _frame_array(gpu_hue_shift(_to_gpu_frame(img), degrees))

    def prim_gpu_invert(img):
        """Invert colors using fast CUDA kernel."""
        return _frame_array(gpu_invert(_to_gpu_frame(img)))

    def prim_gpu_ripple(img, amplitude, frequency=8, decay=2, phase=0, cx=None, cy=None):
        """Apply ripple effect using fast CUDA kernel."""
        return _frame_array(
            gpu_ripple(_to_gpu_frame(img), amplitude, frequency, decay, phase, cx, cy))

    return {
        'streaming-gpu:make-video-source': prim_make_video_source_gpu,
        'gpu:blend': prim_gpu_blend,
        'gpu:rotate': prim_gpu_rotate,
        'gpu:brightness': prim_gpu_brightness,
        'gpu:contrast': prim_gpu_contrast,
        'gpu:zoom': prim_gpu_zoom,
        'gpu:hue-shift': prim_gpu_hue_shift,
        'gpu:invert': prim_gpu_invert,
        'gpu:ripple': prim_gpu_ripple,
    }


# Export
__all__ = [
    'GPU_AVAILABLE',
    'GPUFrame',
    'GPUVideoSource',
    'gpu_blend',
    'gpu_resize',
    'gpu_rotate',
    'gpu_brightness',
    'gpu_contrast',
    'gpu_zoom',
    'gpu_hue_shift',
    'gpu_invert',
    'gpu_ripple',
    'gpu_composite',
    'get_primitives',
    'check_hwdec_available',
    'PRIMITIVES',
]


# Import CPU primitives from streaming.py and include them in PRIMITIVES
# This ensures audio analysis primitives are available when streaming_gpu is loaded
def _get_cpu_primitives():
    from sexp_effects.primitive_libs import streaming
    return streaming.PRIMITIVES


PRIMITIVES = _get_cpu_primitives().copy()

# Try to import fused kernel compiler
_FUSED_KERNELS_AVAILABLE = False
_compile_frame_pipeline = None
_compile_autonomous_pipeline = None
try:
    if GPU_AVAILABLE:
        from streaming.sexp_to_cuda import compile_frame_pipeline as _compile_frame_pipeline
        from streaming.sexp_to_cuda import compile_autonomous_pipeline as _compile_autonomous_pipeline
        _FUSED_KERNELS_AVAILABLE = True
        print("[streaming_gpu] Fused CUDA kernel compiler loaded", file=sys.stderr)
except ImportError as e:
    print(f"[streaming_gpu] Fused kernels not available: {e}", file=sys.stderr)


# Fused pipeline cache
_FUSED_PIPELINE_CACHE = {}


def _normalize_effect_dict(effect):
    """Convert effect dict with Keyword keys to string keys."""
    # Keyword objects from the sexp parser expose their text as ``.name``.
    return {(k.name if hasattr(k, 'name') else str(k)): v for k, v in effect.items()}


_FUSED_CALL_COUNT = 0


def prim_fused_pipeline(img, effects_list, **dynamic_params):
    """
    Apply a fused CUDA kernel pipeline to an image.

    This compiles multiple effects into a single CUDA kernel that processes
    the entire pipeline in one GPU pass, eliminating Python interpreter overhead.

    Args:
        img: Input image (GPU array or numpy array)
        effects_list: List of effect dicts like:
            [{'op': 'rotate', 'angle': 45.0},
             {'op': 'hue_shift', 'degrees': 90.0},
             {'op': 'ripple', 'amplitude': 10, ...}]
        **dynamic_params: Parameters that change per-frame like:
            rotate_angle=45, ripple_phase=0.5

    Returns:
        Processed image as GPU array

    Raises:
        ValueError: If the per-effect fallback meets an unsupported op, or
            the image has no ``shape`` attribute.

    Supported ops: rotate, zoom, ripple, invert, hue_shift, brightness, resize
    """
    global _FUSED_CALL_COUNT
    _FUSED_CALL_COUNT += 1
    if _FUSED_CALL_COUNT <= 5 or _FUSED_CALL_COUNT % 100 == 0:
        print(f"[FUSED] call #{_FUSED_CALL_COUNT}, effects={len(effects_list)}, params={list(dynamic_params.keys())}", file=sys.stderr)

    # Normalize effects list - convert Keyword keys to strings
    effects_list = [_normalize_effect_dict(e) for e in effects_list]

    # Handle resize separately - it changes dimensions so must happen before fused kernel
    resize_ops = [e for e in effects_list if e.get('op') == 'resize']
    other_effects = [e for e in effects_list if e.get('op') != 'resize']

    # Apply resize first if needed
    if resize_ops:
        for resize_op in resize_ops:
            target_w = int(resize_op.get('width', 640))
            target_h = int(resize_op.get('height', 360))
            # Wrap in GPUFrame if needed
            if isinstance(img, GPUFrame):
                frame = img
            else:
                frame = GPUFrame(img, on_gpu=hasattr(img, '__cuda_array_interface__'))
            img = _frame_array(gpu_resize(frame, (target_w, target_h)))

        # If no other effects, just return the resized image
        if not other_effects:
            return img

        # Update effects list to exclude resize ops
        effects_list = other_effects

    if not _FUSED_KERNELS_AVAILABLE:
        # Fallback: apply effects one by one
        print(f"[FUSED FALLBACK] Using fallback path for {len(effects_list)} effects", file=sys.stderr)
        # Wrap in GPUFrame if needed (GPU functions expect GPUFrame objects)
        if isinstance(img, GPUFrame):
            result = img
        else:
            on_gpu = hasattr(img, '__cuda_array_interface__')
            result = GPUFrame(img, on_gpu=on_gpu)

        for effect in effects_list:
            op = effect['op']
            if op == 'rotate':
                angle = dynamic_params.get('rotate_angle', effect.get('angle', 0))
                result = gpu_rotate(result, angle)
            elif op == 'zoom':
                amount = dynamic_params.get('zoom_amount', effect.get('amount', 1.0))
                result = gpu_zoom(result, amount)
            elif op == 'hue_shift':
                degrees = effect.get('degrees', 0)
                if abs(degrees) > 0.1:  # Only apply if significant shift
                    result = gpu_hue_shift(result, degrees)
            elif op == 'ripple':
                amplitude = dynamic_params.get('ripple_amplitude', effect.get('amplitude', 10))
                if amplitude > 0.1:  # Only apply if amplitude is significant
                    result = gpu_ripple(result,
                                        amplitude=amplitude,
                                        frequency=effect.get('frequency', 8),
                                        decay=effect.get('decay', 2),
                                        phase=dynamic_params.get('ripple_phase', effect.get('phase', 0)),
                                        cx=effect.get('center_x'),
                                        cy=effect.get('center_y'))
            elif op == 'brightness':
                factor = effect.get('factor', 1.0)
                # Previously called gpu_contrast with a stray third argument,
                # which raised TypeError for every brightness effect.
                result = gpu_brightness(result, factor)
            elif op == 'invert':
                amount = effect.get('amount', 0)
                if amount > 0.5:  # Only invert if amount > 0.5
                    result = gpu_invert(result)
            else:
                raise ValueError(
                    f"Unsupported fused pipeline operation: '{op}'. "
                    "Supported ops: rotate, zoom, hue_shift, ripple, brightness, invert, resize")

        # Return raw array, not GPUFrame (downstream expects arrays with .flags attribute)
        if isinstance(result, GPUFrame):
            return _frame_array(result)
        return result

    # The compiled pipeline works on raw arrays, so unwrap a GPUFrame first.
    if isinstance(img, GPUFrame):
        img = img.gpu

    # Get image dimensions
    if hasattr(img, 'shape'):
        h, w = img.shape[:2]
    else:
        raise ValueError("Image must have shape attribute")

    # Create cache key from effects
    ops_key = str([(e['op'], {k: v for k, v in e.items() if k != 'src2'}) for e in effects_list])
    cache_key = f"{w}x{h}_{hashlib.md5(ops_key.encode()).hexdigest()}"

    # Compile or get cached pipeline
    if cache_key not in _FUSED_PIPELINE_CACHE:
        _FUSED_PIPELINE_CACHE[cache_key] = _compile_frame_pipeline(effects_list, w, h)

    pipeline = _FUSED_PIPELINE_CACHE[cache_key]

    # Ensure image is on GPU and uint8
    if hasattr(img, '__cuda_array_interface__'):
        gpu_img = img
    elif GPU_AVAILABLE:
        gpu_img = cp.asarray(img)
    else:
        gpu_img = img

    # Run the fused pipeline
    # Debug: log dynamic params occasionally
    if random.random() < 0.01:  # 1% of frames
        print(f"[fused] dynamic_params: {dynamic_params}", file=sys.stderr)
        print(f"[fused] effects: {[(e['op'], e.get('amount'), e.get('amplitude')) for e in effects_list]}", file=sys.stderr)
    return pipeline(gpu_img, **dynamic_params)


# Autonomous pipeline cache (separate from fused)
_AUTONOMOUS_PIPELINE_CACHE = {}

# Matches a C float literal with an ``f`` suffix (``0.8f``, ``2.f``, ``1e-3f``).
_CUDA_FLOAT_LITERAL_RE = re.compile(
    r'(?<![\w.])((?:\d+\.?\d*|\.\d+)(?:[eE][+-]?\d+)?)[fF]\b')
# Matches a bare function name that is about to be called (``sinf(``).
_CUDA_CALL_RE = re.compile(r'(?<![\w.])([A-Za-z_]\w*)(?=\s*\()')
# Expressions already reported as failing, so the hot path logs each only once.
_AUTONOMOUS_EXPR_WARNED = set()


def _cuda_expr_to_python(expr):
    """Translate a CUDA-style float expression into Python source.

    Strips the ``f`` suffix from numeric literals and maps called functions
    onto the ``math`` module (``sinf`` -> ``math.sin``). Names that ``math``
    does not provide are left untouched.
    """
    def _map_call(match):
        name = match.group(1)
        if hasattr(math, name):
            return f"math.{name}"
        if name.endswith('f') and hasattr(math, name[:-1]):
            return f"math.{name[:-1]}"
        return name

    without_suffixes = _CUDA_FLOAT_LITERAL_RE.sub(r'\1', str(expr))
    return _CUDA_CALL_RE.sub(_map_call, without_suffixes)


def prim_autonomous_pipeline(img, effects_list, dynamic_expressions, frame_num, fps=30.0):
    """
    Apply a fully autonomous CUDA kernel pipeline.

    This computes ALL parameters on GPU - including time-based expressions
    like sin(t), t*30, etc. Zero Python in the hot path!

    Args:
        img: Input image (GPU array or numpy array)
        effects_list: List of effect dicts
        dynamic_expressions: Dict mapping param names to CUDA expressions:
            {'rotate_angle': 't * 30.0f',
             'ripple_phase': 't * 2.0f',
             'brightness_factor': '0.8f + 0.4f * sinf(t * 2.0f)'}
        frame_num: Current frame number
        fps: Frames per second (default 30)

    Returns:
        Processed image as GPU array

    Raises:
        ValueError: If the image has no ``shape`` attribute.

    Note: Expressions use CUDA syntax - use sinf() not sin(), etc.
    """
    # Normalize effects and expressions
    effects_list = [_normalize_effect_dict(e) for e in effects_list]
    dynamic_expressions = {
        (k.name if hasattr(k, 'name') else str(k)): v
        for k, v in dynamic_expressions.items()
    }

    if not _FUSED_KERNELS_AVAILABLE or _compile_autonomous_pipeline is None:
        # Fallback to regular fused pipeline with Python-computed params
        t = float(frame_num) / float(fps)
        # Evaluate expressions in Python as fallback
        dynamic_params = {}
        for key, expr in dynamic_expressions.items():
            try:
                # NOTE(review): eval() executes arbitrary code; this is only
                # acceptable while expressions come from trusted pipeline
                # definitions - confirm before exposing to external input.
                dynamic_params[key] = eval(
                    _cuda_expr_to_python(expr),
                    {'t': t, 'math': math, 'frame_num': frame_num})
            except Exception as err:
                # Best effort: an expression that cannot be evaluated in
                # Python contributes 0, reported once per expression.
                if (key, str(expr)) not in _AUTONOMOUS_EXPR_WARNED:
                    _AUTONOMOUS_EXPR_WARNED.add((key, str(expr)))
                    print(f"[autonomous] could not evaluate {key}={expr!r}: {err}",
                          file=sys.stderr)
                dynamic_params[key] = 0
        return prim_fused_pipeline(img, effects_list, **dynamic_params)

    # The compiled pipeline works on raw arrays, so unwrap a GPUFrame first.
    if isinstance(img, GPUFrame):
        img = img.gpu

    # Get image dimensions
    if hasattr(img, 'shape'):
        h, w = img.shape[:2]
    else:
        raise ValueError("Image must have shape attribute")

    # Create cache key
    ops_key = str([(e['op'], {k: v for k, v in e.items() if k != 'src2'}) for e in effects_list])
    expr_key = str(sorted(dynamic_expressions.items()))
    cache_key = f"auto_{w}x{h}_{hashlib.md5((ops_key + expr_key).encode()).hexdigest()}"

    # Compile or get cached pipeline
    if cache_key not in _AUTONOMOUS_PIPELINE_CACHE:
        _AUTONOMOUS_PIPELINE_CACHE[cache_key] = _compile_autonomous_pipeline(
            effects_list, w, h, dynamic_expressions)

    pipeline = _AUTONOMOUS_PIPELINE_CACHE[cache_key]

    # Ensure image is on GPU
    if hasattr(img, '__cuda_array_interface__'):
        gpu_img = img
    elif GPU_AVAILABLE:
        gpu_img = cp.asarray(img)
    else:
        gpu_img = img

    # Run - just pass frame_num and fps, kernel does the rest!
    return pipeline(gpu_img, int(frame_num), float(fps))


# ============================================================
# GPU Image Primitives (keep images on GPU)
# ============================================================

def gpu_make_image(w, h, color=None):
    """Create a new image on GPU filled with color (default black).

    Unlike image:make-image, this keeps the image on GPU memory,
    avoiding CPU<->GPU transfers in the pipeline. Without a GPU a numpy
    image is returned instead.
    """
    if color is None:
        color = [0, 0, 0]

    if not GPU_AVAILABLE:
        # Fallback to CPU
        img = np.zeros((int(h), int(w), 3), dtype=np.uint8)
        img[:] = color
        return img

    w, h = int(w), int(h)

    # Create on GPU directly; channels missing from ``color`` stay at 0
    img = cp.zeros((h, w, 3), dtype=cp.uint8)
    img[:, :, 0] = int(color[0]) if len(color) > 0 else 0
    img[:, :, 1] = int(color[1]) if len(color) > 1 else 0
    img[:, :, 2] = int(color[2]) if len(color) > 2 else 0

    return img


def gpu_gradient_image(w, h, color1=None, color2=None, direction='horizontal'):
    """Create a gradient image on GPU.

    Args:
        w, h: Dimensions
        color1, color2: Start and end colors [r, g, b]
            (default black to white)
        direction: 'horizontal', 'vertical', 'diagonal'

    Without a GPU this returns a solid ``color1`` image; an unrecognized
    direction leaves the image black.
    """
    if not GPU_AVAILABLE:
        return gpu_make_image(w, h, color1)

    w, h = int(w), int(h)
    if color1 is None:
        color1 = [0, 0, 0]
    if color2 is None:
        color2 = [255, 255, 255]

    img = cp.zeros((h, w, 3), dtype=cp.uint8)

    if direction == 'horizontal':
        for c in range(3):
            grad = cp.linspace(color1[c], color2[c], w, dtype=cp.float32)
            img[:, :, c] = grad[cp.newaxis, :].astype(cp.uint8)
    elif direction == 'vertical':
        for c in range(3):
            grad = cp.linspace(color1[c], color2[c], h, dtype=cp.float32)
            img[:, :, c] = grad[:, cp.newaxis].astype(cp.uint8)
    elif direction == 'diagonal':
        # The blend ramp does not depend on the channel, so build it once.
        x_grad = cp.linspace(0, 1, w, dtype=cp.float32)[cp.newaxis, :]
        y_grad = cp.linspace(0, 1, h, dtype=cp.float32)[:, cp.newaxis]
        combined = (x_grad + y_grad) / 2
        for c in range(3):
            img[:, :, c] = (color1[c] + (color2[c] - color1[c]) * combined).astype(cp.uint8)

    return img


# Add GPU-specific primitives
PRIMITIVES['fused-pipeline'] = prim_fused_pipeline
PRIMITIVES['autonomous-pipeline'] = prim_autonomous_pipeline
PRIMITIVES['gpu-make-image'] = gpu_make_image
PRIMITIVES['gpu-gradient'] = gpu_gradient_image

# (The GPU video source will be added by create_cid_primitives in the task)