Files
celery/sexp_effects/primitive_libs/streaming_gpu.py
giles e4349ba501
Some checks are pending
GPU Worker CI/CD / test (push) Waiting to run
GPU Worker CI/CD / deploy (push) Blocked by required conditions
Add autonomous-pipeline primitive for zero-Python hot path
2026-02-04 10:02:40 +00:00

1042 lines
36 KiB
Python

"""
GPU-Accelerated Streaming Primitives
Provides GPU-native video source and frame processing.
Frames stay on GPU memory throughout the pipeline for maximum performance.
Architecture:
- GPUFrame: Wrapper that tracks whether data is on CPU or GPU
- GPUVideoSource: Hardware-accelerated decode to GPU memory
- GPU primitives operate directly on GPU frames using fast CUDA kernels
- Transfer to CPU only at final output
Requirements:
- CuPy for CUDA support
- FFmpeg with NVDEC support (for hardware decode)
- NVIDIA GPU with CUDA capability
"""
import os
import sys
import json
import subprocess
import numpy as np
from pathlib import Path
from typing import Optional, Tuple, Union
# Try to import CuPy
try:
import cupy as cp
GPU_AVAILABLE = True
except ImportError:
cp = None
GPU_AVAILABLE = False
# Try to import fast CUDA kernels from JIT compiler
_FAST_KERNELS_AVAILABLE = False
try:
if GPU_AVAILABLE:
from streaming.jit_compiler import (
fast_rotate, fast_zoom, fast_blend, fast_hue_shift,
fast_invert, fast_ripple, get_fast_ops
)
_FAST_KERNELS_AVAILABLE = True
print("[streaming_gpu] Fast CUDA kernels loaded", file=sys.stderr)
except ImportError as e:
print(f"[streaming_gpu] Fast kernels not available: {e}", file=sys.stderr)
# Check for hardware decode support
_HWDEC_AVAILABLE: Optional[bool] = None
_DECORD_GPU_AVAILABLE: Optional[bool] = None
def check_hwdec_available() -> bool:
    """Return True when NVIDIA hardware decode (CUDA hwaccel) is usable.

    The probe result is memoized in the module-level _HWDEC_AVAILABLE flag,
    so the subprocess checks run at most once per process.
    """
    global _HWDEC_AVAILABLE
    # Serve the cached answer when a previous probe already ran.
    if _HWDEC_AVAILABLE is not None:
        return _HWDEC_AVAILABLE
    try:
        # A working nvidia-smi indicates an NVIDIA GPU + driver is present.
        smi = subprocess.run(["nvidia-smi"], capture_output=True, timeout=2)
        if smi.returncode == 0:
            # Ask ffmpeg which hwaccel backends it was built with.
            probe = subprocess.run(
                ["ffmpeg", "-hwaccels"],
                capture_output=True,
                text=True,
                timeout=5
            )
            _HWDEC_AVAILABLE = "cuda" in probe.stdout
        else:
            _HWDEC_AVAILABLE = False
    except Exception:
        # Missing binaries or timeouts all mean "no hardware decode".
        _HWDEC_AVAILABLE = False
    return _HWDEC_AVAILABLE
def check_decord_gpu_available() -> bool:
    """Return True when decord can decode with a CUDA GPU context.

    The probe (importing decord and constructing gpu(0)) runs only once per
    process; the outcome is memoized in _DECORD_GPU_AVAILABLE.
    """
    global _DECORD_GPU_AVAILABLE
    if _DECORD_GPU_AVAILABLE is None:
        try:
            import decord  # noqa: F401 - import validates the package exists
            from decord import gpu
            # Constructing a GPU context verifies CUDA support end-to-end.
            gpu(0)
            _DECORD_GPU_AVAILABLE = True
            print("[streaming_gpu] decord GPU (CUDA) decode available", file=sys.stderr)
        except Exception as e:
            _DECORD_GPU_AVAILABLE = False
            print(f"[streaming_gpu] decord GPU not available: {e}", file=sys.stderr)
    return _DECORD_GPU_AVAILABLE
class GPUFrame:
    """
    Frame container that tracks data location (CPU/GPU).
    Enables zero-copy operations when data is already on the right device.
    Lazy transfer - only moves data when actually needed.
    """
    def __init__(self, data: Union[np.ndarray, 'cp.ndarray'], on_gpu: bool = None):
        """Wrap `data`, placing it on CPU or GPU.

        Args:
            data: numpy array, CuPy array, or anything np.asarray accepts.
            on_gpu: None = auto-detect from type; True = force GPU (no-op
                when GPU is unavailable); False = force CPU.
        """
        self._cpu_data: Optional[np.ndarray] = None
        self._gpu_data = None  # Optional[cp.ndarray]
        if on_gpu is None:
            # Auto-detect based on type
            if GPU_AVAILABLE and isinstance(data, cp.ndarray):
                self._gpu_data = data
            else:
                self._cpu_data = np.asarray(data)
        elif on_gpu and GPU_AVAILABLE:
            self._gpu_data = cp.asarray(data) if not isinstance(data, cp.ndarray) else data
        else:
            # CPU placement. Only route through cp.asnumpy for actual CuPy
            # arrays - the old code called cp.asnumpy on ANY non-numpy input,
            # which crashed on plain lists/tuples and whenever cp is None.
            if GPU_AVAILABLE and isinstance(data, cp.ndarray):
                self._cpu_data = cp.asnumpy(data)
            else:
                self._cpu_data = np.asarray(data)
    @property
    def cpu(self) -> np.ndarray:
        """Get frame as numpy array (transfers from GPU if needed)."""
        if self._cpu_data is None:
            if self._gpu_data is not None and GPU_AVAILABLE:
                self._cpu_data = cp.asnumpy(self._gpu_data)
            else:
                raise ValueError("No frame data available")
        return self._cpu_data
    @property
    def gpu(self):
        """Get frame as CuPy array (transfers to GPU if needed)."""
        if not GPU_AVAILABLE:
            raise RuntimeError("GPU not available")
        if self._gpu_data is None:
            if self._cpu_data is not None:
                self._gpu_data = cp.asarray(self._cpu_data)
            else:
                raise ValueError("No frame data available")
        return self._gpu_data
    @property
    def is_on_gpu(self) -> bool:
        """Check if data is currently on GPU."""
        return self._gpu_data is not None
    @property
    def shape(self) -> Tuple[int, ...]:
        """Get frame shape (from whichever copy is resident)."""
        if self._gpu_data is not None:
            return self._gpu_data.shape
        return self._cpu_data.shape
    @property
    def dtype(self):
        """Get frame dtype (from whichever copy is resident)."""
        if self._gpu_data is not None:
            return self._gpu_data.dtype
        return self._cpu_data.dtype
    def numpy(self) -> np.ndarray:
        """Alias for cpu property."""
        return self.cpu
    def cupy(self):
        """Alias for gpu property."""
        return self.gpu
    def free_cpu(self):
        """Free CPU memory (keep GPU only). No-op if GPU copy is absent."""
        if self._gpu_data is not None:
            self._cpu_data = None
    def free_gpu(self):
        """Free GPU memory (keep CPU only). No-op if CPU copy is absent."""
        if self._cpu_data is not None:
            self._gpu_data = None
class GPUVideoSource:
    """
    GPU-accelerated video source using hardware decode.
    Uses decord with CUDA GPU context for true NVDEC decode - frames
    decode directly to GPU memory via CUDA.
    Falls back to FFmpeg pipe if decord GPU unavailable (slower due to CPU copy).
    """
    def __init__(self, path: str, fps: float = 30, prefer_gpu: bool = True):
        # `fps` is the OUTPUT cadence and may differ from the file's native rate.
        self.path = Path(path)
        self.fps = fps
        self.prefer_gpu = prefer_gpu and GPU_AVAILABLE
        self._use_decord_gpu = self.prefer_gpu and check_decord_gpu_available()
        self._frame_size: Optional[Tuple[int, int]] = None  # (width, height)
        self._duration: Optional[float] = None  # seconds
        self._video_fps: float = 30.0  # native fps of the file (default until probed)
        self._total_frames: int = 0
        self._frame_time = 1.0 / fps  # seconds per OUTPUT frame
        self._last_read_time = -1  # last `t` passed to read_at(), for caching
        self._cached_frame: Optional[GPUFrame] = None
        # Decord VideoReader with GPU context
        self._vr = None
        self._decord_ctx = None
        # FFmpeg fallback state
        self._proc = None  # ffmpeg subprocess piping raw RGB frames
        self._stream_time = 0.0  # current position of the ffmpeg stream, seconds
        # Initialize video source
        self._init_video()
        mode = "decord-GPU" if self._use_decord_gpu else ("ffmpeg-hwaccel" if check_hwdec_available() else "ffmpeg-CPU")
        print(f"[GPUVideoSource] {self.path.name}: {self._frame_size}, "
              f"duration={self._duration:.1f}s, mode={mode}", file=sys.stderr)
    def _init_video(self):
        """Initialize video reader (decord GPU or probe for ffmpeg)."""
        if self._use_decord_gpu:
            try:
                from decord import VideoReader, gpu
                # Use GPU context for NVDEC hardware decode
                self._decord_ctx = gpu(0)
                self._vr = VideoReader(str(self.path), ctx=self._decord_ctx, num_threads=1)
                self._total_frames = len(self._vr)
                self._video_fps = self._vr.get_avg_fps()
                self._duration = self._total_frames / self._video_fps
                # Get frame size from first frame
                # decord frames are (height, width, channels); stored as (w, h).
                first_frame = self._vr[0]
                self._frame_size = (first_frame.shape[1], first_frame.shape[0])
                print(f"[GPUVideoSource] decord GPU initialized: {self._frame_size}, "
                      f"{self._total_frames} frames @ {self._video_fps:.1f}fps", file=sys.stderr)
                return
            except Exception as e:
                # Any decord failure demotes this source to the ffmpeg path.
                print(f"[GPUVideoSource] decord GPU init failed, falling back to ffmpeg: {e}", file=sys.stderr)
                self._use_decord_gpu = False
                self._vr = None
                self._decord_ctx = None
        # FFmpeg fallback - probe video for metadata
        self._probe_video()
    def _probe_video(self):
        """Probe video file for metadata (FFmpeg fallback)."""
        cmd = ["ffprobe", "-v", "quiet", "-print_format", "json",
               "-show_streams", "-show_format", str(self.path)]
        result = subprocess.run(cmd, capture_output=True, text=True)
        info = json.loads(result.stdout)
        for stream in info.get("streams", []):
            if stream.get("codec_type") == "video":
                self._frame_size = (stream.get("width", 720), stream.get("height", 720))
                if "duration" in stream:
                    self._duration = float(stream["duration"])
                elif "tags" in stream and "DURATION" in stream["tags"]:
                    # Matroska-style duration tag, "HH:MM:SS.sss".
                    dur_str = stream["tags"]["DURATION"]
                    parts = dur_str.split(":")
                    if len(parts) == 3:
                        h, m, s = parts
                        self._duration = int(h) * 3600 + int(m) * 60 + float(s)
                # Get fps
                if "r_frame_rate" in stream:
                    fps_str = stream["r_frame_rate"]
                    if "/" in fps_str:
                        num, den = fps_str.split("/")
                        self._video_fps = float(num) / float(den)
                break
        # Container-level duration as a secondary source.
        if self._duration is None and "format" in info:
            if "duration" in info["format"]:
                self._duration = float(info["format"]["duration"])
        # Last-resort defaults keep the source usable even on a failed probe.
        if not self._frame_size:
            self._frame_size = (720, 720)
        if not self._duration:
            self._duration = 60.0
        self._total_frames = int(self._duration * self._video_fps)
    def _start_stream(self, seek_time: float = 0):
        """Start ffmpeg decode process (fallback mode)."""
        # Kill any previous pipe before re-seeking.
        if self._proc:
            self._proc.kill()
            self._proc = None
        if not self.path.exists():
            raise FileNotFoundError(f"Video file not found: {self.path}")
        w, h = self._frame_size
        # Build ffmpeg command
        cmd = ["ffmpeg", "-v", "error"]
        # Hardware decode if available
        if check_hwdec_available():
            cmd.extend(["-hwaccel", "cuda"])
        # Output raw RGB24 at the requested size/fps to stdout ("-").
        cmd.extend([
            "-ss", f"{seek_time:.3f}",
            "-i", str(self.path),
            "-f", "rawvideo",
            "-pix_fmt", "rgb24",
            "-s", f"{w}x{h}",
            "-r", str(self.fps),
            "-"
        ])
        self._proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        self._stream_time = seek_time
    def _read_frame_raw(self) -> Optional[np.ndarray]:
        """Read one frame from ffmpeg pipe (fallback mode).

        Returns None when the pipe is closed/dead or delivers a short read.
        """
        w, h = self._frame_size
        frame_size = w * h * 3  # RGB24: 3 bytes per pixel
        if not self._proc or self._proc.poll() is not None:
            return None
        data = self._proc.stdout.read(frame_size)
        if len(data) < frame_size:
            return None
        # .copy() detaches the array from the pipe's read buffer.
        return np.frombuffer(data, dtype=np.uint8).reshape((h, w, 3)).copy()
    def _read_frame_decord_gpu(self, frame_idx: int) -> Optional[GPUFrame]:
        """Read frame using decord with GPU context (NVDEC, zero-copy to CuPy)."""
        if self._vr is None:
            return None
        try:
            # Handle looping
            frame_idx = frame_idx % max(1, self._total_frames)
            # Decode frame - with GPU context, this uses NVDEC
            frame_tensor = self._vr[frame_idx]
            # Convert to CuPy via DLPack (zero-copy GPU transfer)
            if GPU_AVAILABLE:
                # decord tensors have .to_dlpack() which returns a PyCapsule
                # that CuPy can consume for zero-copy GPU transfer
                try:
                    dlpack_capsule = frame_tensor.to_dlpack()
                    gpu_frame = cp.from_dlpack(dlpack_capsule)
                    # Log success once per source
                    if not getattr(self, '_dlpack_logged', False):
                        print(f"[GPUVideoSource] DLPack zero-copy SUCCESS - frames stay on GPU", file=sys.stderr)
                        self._dlpack_logged = True
                    return GPUFrame(gpu_frame, on_gpu=True)
                except Exception as dlpack_err:
                    # Fallback: convert via numpy (involves CPU copy)
                    if not getattr(self, '_dlpack_fail_logged', False):
                        print(f"[GPUVideoSource] DLPack FAILED ({dlpack_err}), using CPU copy fallback", file=sys.stderr)
                        self._dlpack_fail_logged = True
                    frame_np = frame_tensor.asnumpy()
                    return GPUFrame(frame_np, on_gpu=True)
            else:
                return GPUFrame(frame_tensor.asnumpy(), on_gpu=False)
        except Exception as e:
            print(f"[GPUVideoSource] decord GPU read error at frame {frame_idx}: {e}", file=sys.stderr)
            return None
    def read_at(self, t: float) -> Optional[GPUFrame]:
        """
        Read frame at specific time.
        Returns GPUFrame with data on GPU if GPU mode enabled.

        `t` is wall-clock time in seconds; it is wrapped modulo the video
        duration so shorter videos loop.
        """
        # Cache check
        if t == self._last_read_time and self._cached_frame is not None:
            return self._cached_frame
        # Loop time for shorter videos
        seek_time = t
        if self._duration and self._duration > 0:
            seek_time = t % self._duration
            # Avoid seeking into the final fraction of a second (EOF zone).
            if seek_time > self._duration - 0.1:
                seek_time = 0.0
        self._last_read_time = t
        # Use decord GPU if available (NVDEC decode, zero-copy via DLPack)
        if self._use_decord_gpu:
            frame_idx = int(seek_time * self._video_fps)
            self._cached_frame = self._read_frame_decord_gpu(frame_idx)
            if self._cached_frame is not None:
                # Free CPU copy if on GPU (saves memory)
                if self.prefer_gpu and self._cached_frame.is_on_gpu:
                    self._cached_frame.free_cpu()
            return self._cached_frame
        # FFmpeg fallback
        # Restart the pipe on: no pipe, dead pipe, backward seek, or a
        # forward jump of more than 2s (cheaper to re-seek than to skip).
        need_seek = (
            self._proc is None or
            self._proc.poll() is not None or
            seek_time < self._stream_time - self._frame_time or
            seek_time > self._stream_time + 2.0
        )
        if need_seek:
            self._start_stream(seek_time)
        # Skip frames to reach target
        while self._stream_time + self._frame_time <= seek_time:
            frame = self._read_frame_raw()
            if frame is None:
                self._start_stream(seek_time)
                break
            self._stream_time += self._frame_time
        # Read target frame
        frame_np = self._read_frame_raw()
        if frame_np is None:
            # Pipe starved - return the last good frame rather than None.
            return self._cached_frame
        self._stream_time += self._frame_time
        # Create GPUFrame - transfer to GPU if in GPU mode
        self._cached_frame = GPUFrame(frame_np, on_gpu=self.prefer_gpu)
        # Free CPU copy if on GPU (saves memory)
        if self.prefer_gpu and self._cached_frame.is_on_gpu:
            self._cached_frame.free_cpu()
        return self._cached_frame
    def read(self) -> Optional[GPUFrame]:
        """Read current frame (cached frame, or the frame at t=0)."""
        if self._cached_frame is not None:
            return self._cached_frame
        return self.read_at(0)
    @property
    def size(self) -> Tuple[int, int]:
        # (width, height) of decoded frames.
        return self._frame_size
    @property
    def duration(self) -> float:
        # Video duration in seconds.
        return self._duration
    def close(self):
        """Close the video source."""
        if self._proc:
            self._proc.kill()
            self._proc = None
        # Release decord resources
        self._vr = None
        self._decord_ctx = None
# GPU-aware primitive functions
def gpu_blend(frame_a: GPUFrame, frame_b: GPUFrame, alpha: float = 0.5) -> GPUFrame:
    """
    Alpha-blend two frames: result = a * alpha + b * (1 - alpha).
    Runs entirely on GPU when available (no CPU transfer); otherwise numpy.
    """
    if not GPU_AVAILABLE:
        # CPU path: blend in float32 to avoid uint8 wraparound.
        blended = (frame_a.cpu.astype(np.float32) * alpha
                   + frame_b.cpu.astype(np.float32) * (1 - alpha))
        return GPUFrame(blended.astype(np.uint8), on_gpu=False)
    if _FAST_KERNELS_AVAILABLE:
        # Fast kernel requires uint8 inputs; coerce when necessary.
        a_gpu, b_gpu = frame_a.gpu, frame_b.gpu
        if a_gpu.dtype != cp.uint8:
            a_gpu = cp.clip(a_gpu, 0, 255).astype(cp.uint8)
        if b_gpu.dtype != cp.uint8:
            b_gpu = cp.clip(b_gpu, 0, 255).astype(cp.uint8)
        return GPUFrame(fast_blend(a_gpu, b_gpu, alpha), on_gpu=True)
    # Generic CuPy fallback when fast kernels are unavailable.
    blended = (frame_a.gpu.astype(cp.float32) * alpha
               + frame_b.gpu.astype(cp.float32) * (1 - alpha))
    return GPUFrame(blended.astype(cp.uint8), on_gpu=True)
def gpu_resize(frame: GPUFrame, size: Tuple[int, int]) -> GPUFrame:
    """
    Resize a frame to (width, height).

    CPU frames are resized with OpenCV; GPU frames use cupyx bilinear zoom
    so the data never leaves the GPU.

    Args:
        frame: Source frame.
        size: Target (width, height).

    Returns:
        Resized GPUFrame on the same device as the input.
    """
    if not GPU_AVAILABLE or not frame.is_on_gpu:
        # Import cv2 lazily (CPU path only) so GPU-only deployments don't
        # need OpenCV installed - the old eager import forced it everywhere.
        import cv2
        resized = cv2.resize(frame.cpu, size)
        return GPUFrame(resized, on_gpu=False)
    gpu_data = frame.gpu
    h, w = gpu_data.shape[:2]
    target_w, target_h = size
    # CuPy has no built-in resize; use scipy-compatible bilinear zoom (order=1).
    # (A previous dead `if ...: pass` branch for the fast zoom kernel was removed.)
    from cupyx.scipy import ndimage as cpndimage
    zoom_y = target_h / h
    zoom_x = target_w / w
    if gpu_data.ndim == 3:
        resized = cpndimage.zoom(gpu_data, (zoom_y, zoom_x, 1), order=1)
    else:
        resized = cpndimage.zoom(gpu_data, (zoom_y, zoom_x), order=1)
    return GPUFrame(resized, on_gpu=True)
def gpu_zoom(frame: GPUFrame, factor: float, cx: float = None, cy: float = None) -> GPUFrame:
    """
    Zoom a frame about an optional center point (defaults to frame center).
    GPU frames use the fast CUDA kernel; CPU frames fall back to an OpenCV
    affine warp with zero rotation and scale = factor.
    """
    if GPU_AVAILABLE and frame.is_on_gpu:
        if _FAST_KERNELS_AVAILABLE:
            return GPUFrame(fast_zoom(frame.gpu, factor, cx=cx, cy=cy), on_gpu=True)
        # No fast kernels: return unchanged rather than transfer to CPU.
        return frame
    # CPU fallback via OpenCV.
    import cv2
    src = frame.cpu
    h, w = src.shape[:2]
    center = (w / 2 if cx is None else cx, h / 2 if cy is None else cy)
    matrix = cv2.getRotationMatrix2D(center, 0, factor)
    return GPUFrame(cv2.warpAffine(src, matrix, (w, h)), on_gpu=False)
def gpu_hue_shift(frame: GPUFrame, degrees: float) -> GPUFrame:
    """
    Rotate hue by `degrees`.
    GPU path uses the fast CUDA kernel; CPU path round-trips through OpenCV
    HSV (OpenCV hue spans 0-179, hence the degrees / 2 scaling).
    """
    if GPU_AVAILABLE and frame.is_on_gpu:
        if _FAST_KERNELS_AVAILABLE:
            gpu_data = frame.gpu
            if gpu_data.dtype != cp.uint8:
                gpu_data = cp.clip(gpu_data, 0, 255).astype(cp.uint8)
            return GPUFrame(fast_hue_shift(gpu_data, degrees), on_gpu=True)
        # Without fast kernels there is no GPU hue shift - return unchanged.
        return frame
    import cv2
    hsv = cv2.cvtColor(frame.cpu, cv2.COLOR_RGB2HSV)
    hsv[:, :, 0] = (hsv[:, :, 0].astype(np.float32) + degrees / 2) % 180
    return GPUFrame(cv2.cvtColor(hsv, cv2.COLOR_HSV2RGB), on_gpu=False)
def gpu_invert(frame: GPUFrame) -> GPUFrame:
    """Invert colors (255 - pixel), on GPU when possible, otherwise on CPU."""
    if not GPU_AVAILABLE or not frame.is_on_gpu:
        return GPUFrame(255 - frame.cpu, on_gpu=False)
    if _FAST_KERNELS_AVAILABLE:
        gpu_data = frame.gpu
        if gpu_data.dtype != cp.uint8:
            gpu_data = cp.clip(gpu_data, 0, 255).astype(cp.uint8)
        return GPUFrame(fast_invert(gpu_data), on_gpu=True)
    # Plain CuPy fallback when fast kernels are unavailable.
    return GPUFrame(255 - frame.gpu, on_gpu=True)
def gpu_ripple(frame: GPUFrame, amplitude: float, frequency: float = 8,
               decay: float = 2, phase: float = 0,
               cx: float = None, cy: float = None) -> GPUFrame:
    """
    Apply a ripple distortion on GPU using the fast CUDA kernel.

    Args:
        frame: Source frame (must be on GPU; there is no CPU implementation).
        amplitude: Ripple displacement strength.
        frequency: Ripple ring frequency.
        decay: Radial falloff of the ripple.
        phase: Animation phase, passed to the kernel as time `t`.
        cx, cy: Ripple center; defaults to the frame center.

    Returns:
        Rippled GPUFrame, or the input unchanged when GPU / fast kernels
        are unavailable.
    """
    if not GPU_AVAILABLE or not frame.is_on_gpu:
        return frame  # No CPU fallback for ripple
    if _FAST_KERNELS_AVAILABLE:
        gpu_data = frame.gpu
        if gpu_data.dtype != cp.uint8:
            gpu_data = cp.clip(gpu_data, 0, 255).astype(cp.uint8)
        h, w = gpu_data.shape[:2]
        # Use `is None` so an explicit center of 0 is respected - the old
        # truthiness test (`cx if cx else w/2`) silently replaced cx=0 / cy=0.
        rippled = fast_ripple(
            gpu_data, amplitude,
            center_x=cx if cx is not None else w / 2,
            center_y=cy if cy is not None else h / 2,
            frequency=frequency,
            decay=decay,
            speed=1.0,
            t=phase
        )
        return GPUFrame(rippled, on_gpu=True)
    return frame
def gpu_contrast(frame: GPUFrame, factor: float) -> GPUFrame:
    """
    Scale contrast about the midpoint: out = (in - 128) * factor + 128.
    Uses the fused fast-ops kernel on GPU when available.
    """
    if GPU_AVAILABLE and frame.is_on_gpu:
        if _FAST_KERNELS_AVAILABLE:
            gpu_data = frame.gpu
            if gpu_data.dtype != cp.uint8:
                gpu_data = cp.clip(gpu_data, 0, 255).astype(cp.uint8)
            h, w = gpu_data.shape[:2]
            ops = get_fast_ops(w, h)
            ops.set_input(gpu_data)
            ops.contrast(factor)
            # Copy so the shared fast-ops buffer can be reused safely.
            return GPUFrame(ops.get_output().copy(), on_gpu=True)
        adjusted = (frame.gpu.astype(cp.float32) - 128) * factor + 128
        return GPUFrame(cp.clip(adjusted, 0, 255).astype(cp.uint8), on_gpu=True)
    adjusted = (frame.cpu.astype(np.float32) - 128) * factor + 128
    return GPUFrame(np.clip(adjusted, 0, 255).astype(np.uint8), on_gpu=False)
def gpu_rotate(frame: GPUFrame, angle: float) -> GPUFrame:
    """
    Rotate a frame about its center by `angle` degrees (output keeps shape).
    Prefers the fast CUDA kernel, then cupyx.scipy, then OpenCV on CPU.
    """
    if GPU_AVAILABLE and frame.is_on_gpu:
        if _FAST_KERNELS_AVAILABLE:
            # Fast kernel: < 1ms vs ~20ms for the scipy fallback.
            return GPUFrame(fast_rotate(frame.gpu, angle), on_gpu=True)
        # Slow cupyx fallback (bilinear, no reshape).
        from cupyx.scipy import ndimage as cpndimage
        rotated = cpndimage.rotate(frame.gpu, angle, reshape=False, order=1)
        return GPUFrame(rotated, on_gpu=True)
    # CPU fallback via OpenCV affine warp.
    import cv2
    src = frame.cpu
    h, w = src.shape[:2]
    matrix = cv2.getRotationMatrix2D((w // 2, h // 2), angle, 1.0)
    return GPUFrame(cv2.warpAffine(src, matrix, (w, h)), on_gpu=False)
def gpu_brightness(frame: GPUFrame, factor: float) -> GPUFrame:
    """
    Multiply pixel values by `factor`, clipping to [0, 255].
    Uses the fused fast-ops kernel on GPU when available.
    """
    if GPU_AVAILABLE and frame.is_on_gpu:
        if _FAST_KERNELS_AVAILABLE:
            gpu_data = frame.gpu
            if gpu_data.dtype != cp.uint8:
                gpu_data = cp.clip(gpu_data, 0, 255).astype(cp.uint8)
            h, w = gpu_data.shape[:2]
            ops = get_fast_ops(w, h)
            ops.set_input(gpu_data)
            ops.brightness(factor)
            # Copy so the shared fast-ops buffer can be reused safely.
            return GPUFrame(ops.get_output().copy(), on_gpu=True)
        scaled = cp.clip(frame.gpu.astype(cp.float32) * factor, 0, 255)
        return GPUFrame(scaled.astype(cp.uint8), on_gpu=True)
    scaled = np.clip(frame.cpu.astype(np.float32) * factor, 0, 255)
    return GPUFrame(scaled.astype(np.uint8), on_gpu=False)
def gpu_composite(frames: list, weights: list = None) -> GPUFrame:
    """
    Weighted sum of several frames into one.

    Weights default to uniform and are normalized to sum to 1. Frames whose
    shape differs from the first frame are resized to match it. Accumulation
    runs on GPU whenever any input frame is already resident there.
    """
    if not frames:
        raise ValueError("No frames to composite")
    if len(frames) == 1:
        return frames[0]
    if weights is None:
        weights = [1.0 / len(frames)] * len(frames)
    # Normalize weights to sum to 1 (skip if degenerate).
    total = sum(weights)
    if total > 0:
        weights = [w / total for w in weights]
    if GPU_AVAILABLE and any(f.is_on_gpu for f in frames):
        # GPU accumulation in float32; first frame fixes the target shape.
        target_shape = frames[0].gpu.shape
        acc = cp.zeros(target_shape, dtype=cp.float32)
        for frm, wgt in zip(frames, weights):
            data = frm.gpu.astype(cp.float32)
            if data.shape != target_shape:
                # Bilinear zoom to the target shape.
                from cupyx.scipy import ndimage as cpndimage
                h, w = target_shape[:2]
                fh, fw = data.shape[:2]
                factors = (h / fh, w / fw, 1) if data.ndim == 3 else (h / fh, w / fw)
                data = cpndimage.zoom(data, factors, order=1)
            acc += data * wgt
        return GPUFrame(cp.clip(acc, 0, 255).astype(cp.uint8), on_gpu=True)
    # CPU accumulation in float32.
    import cv2
    target_shape = frames[0].cpu.shape
    acc = np.zeros(target_shape, dtype=np.float32)
    for frm, wgt in zip(frames, weights):
        data = frm.cpu.astype(np.float32)
        if data.shape != target_shape:
            data = cv2.resize(data, (target_shape[1], target_shape[0]))
        acc += data * wgt
    return GPUFrame(np.clip(acc, 0, 255).astype(np.uint8), on_gpu=False)
# Primitive registration for streaming interpreter
def _to_gpu_frame(img):
    """Wrap an arbitrary image (GPUFrame / CuPy / numpy) as a GPUFrame.

    CuPy arrays stay resident on GPU; other arrays are uploaded when GPU is
    available (GPUFrame itself falls back to CPU storage otherwise).
    """
    if isinstance(img, GPUFrame):
        return img
    # GPUFrame(on_gpu=True) already wraps CuPy arrays without copying and
    # uploads anything else, so no separate __cuda_array_interface__ branch
    # is needed here.
    return GPUFrame(img, on_gpu=True)
def get_primitives():
    """
    Build the GPU-aware primitive table for the sexp interpreter.

    Each primitive wraps one of the gpu_* functions: inputs are normalized to
    GPUFrame, and outputs are unwrapped back to raw arrays (CuPy when resident
    on GPU, numpy otherwise) so frames avoid CPU round-trips. Fast CUDA
    kernels are used automatically when available.
    """
    def _unwrap(result):
        # Hand the interpreter a raw array, preferring the GPU-resident copy.
        return result.gpu if result.is_on_gpu else result.cpu
    def prim_make_video_source_gpu(path: str, fps: float = 30):
        """Create GPU-accelerated video source."""
        return GPUVideoSource(path, fps, prefer_gpu=True)
    def prim_gpu_blend(a, b, alpha=0.5):
        """Blend two frames using fast CUDA kernel."""
        return _unwrap(gpu_blend(_to_gpu_frame(a), _to_gpu_frame(b), alpha))
    def prim_gpu_rotate(img, angle):
        """Rotate image using fast CUDA kernel (< 1ms)."""
        return _unwrap(gpu_rotate(_to_gpu_frame(img), angle))
    def prim_gpu_brightness(img, factor):
        """Adjust brightness using fast CUDA kernel."""
        return _unwrap(gpu_brightness(_to_gpu_frame(img), factor))
    def prim_gpu_contrast(img, factor):
        """Adjust contrast using fast CUDA kernel."""
        return _unwrap(gpu_contrast(_to_gpu_frame(img), factor))
    def prim_gpu_zoom(img, factor, cx=None, cy=None):
        """Zoom image using fast CUDA kernel."""
        return _unwrap(gpu_zoom(_to_gpu_frame(img), factor, cx, cy))
    def prim_gpu_hue_shift(img, degrees):
        """Shift hue using fast CUDA kernel."""
        return _unwrap(gpu_hue_shift(_to_gpu_frame(img), degrees))
    def prim_gpu_invert(img):
        """Invert colors using fast CUDA kernel."""
        return _unwrap(gpu_invert(_to_gpu_frame(img)))
    def prim_gpu_ripple(img, amplitude, frequency=8, decay=2, phase=0, cx=None, cy=None):
        """Apply ripple effect using fast CUDA kernel."""
        return _unwrap(gpu_ripple(_to_gpu_frame(img), amplitude, frequency, decay, phase, cx, cy))
    return {
        'streaming-gpu:make-video-source': prim_make_video_source_gpu,
        'gpu:blend': prim_gpu_blend,
        'gpu:rotate': prim_gpu_rotate,
        'gpu:brightness': prim_gpu_brightness,
        'gpu:contrast': prim_gpu_contrast,
        'gpu:zoom': prim_gpu_zoom,
        'gpu:hue-shift': prim_gpu_hue_shift,
        'gpu:invert': prim_gpu_invert,
        'gpu:ripple': prim_gpu_ripple,
    }
# Export
# Public API of this module. Note: PRIMITIVES is defined further below
# (after the CPU primitive table is imported), but listed here up front.
__all__ = [
    'GPU_AVAILABLE',
    'GPUFrame',
    'GPUVideoSource',
    'gpu_blend',
    'gpu_resize',
    'gpu_rotate',
    'gpu_brightness',
    'gpu_contrast',
    'gpu_zoom',
    'gpu_hue_shift',
    'gpu_invert',
    'gpu_ripple',
    'gpu_composite',
    'get_primitives',
    'check_hwdec_available',
    'PRIMITIVES',
]
# Import CPU primitives from streaming.py and include them in PRIMITIVES
# This ensures audio analysis primitives are available when streaming_gpu is loaded
def _get_cpu_primitives():
    """Return the CPU primitive table from the sibling streaming module."""
    from sexp_effects.primitive_libs import streaming
    return streaming.PRIMITIVES
# Start from a copy so the GPU-only additions below don't mutate the CPU table.
PRIMITIVES = _get_cpu_primitives().copy()
# Try to import fused kernel compiler
# These compile multiple effects into a single CUDA kernel (see
# prim_fused_pipeline / prim_autonomous_pipeline below).
_FUSED_KERNELS_AVAILABLE = False
_compile_frame_pipeline = None
_compile_autonomous_pipeline = None
try:
    if GPU_AVAILABLE:
        from streaming.sexp_to_cuda import compile_frame_pipeline as _compile_frame_pipeline
        from streaming.sexp_to_cuda import compile_autonomous_pipeline as _compile_autonomous_pipeline
        _FUSED_KERNELS_AVAILABLE = True
        print("[streaming_gpu] Fused CUDA kernel compiler loaded", file=sys.stderr)
except ImportError as e:
    print(f"[streaming_gpu] Fused kernels not available: {e}", file=sys.stderr)
# Fused pipeline cache
# Maps "{w}x{h}_{md5(ops)}" -> compiled pipeline callable.
_FUSED_PIPELINE_CACHE = {}
def _normalize_effect_dict(effect):
"""Convert effect dict with Keyword keys to string keys."""
result = {}
for k, v in effect.items():
# Handle Keyword objects from sexp parser
if hasattr(k, 'name'): # Keyword object
key = k.name
else:
key = str(k)
result[key] = v
return result
def prim_fused_pipeline(img, effects_list, **dynamic_params):
    """
    Apply a fused CUDA kernel pipeline to an image.
    This compiles multiple effects into a single CUDA kernel that processes
    the entire pipeline in one GPU pass, eliminating Python interpreter overhead.
    Args:
        img: Input image (GPU array or numpy array)
        effects_list: List of effect dicts like:
            [{'op': 'rotate', 'angle': 45.0},
             {'op': 'hue_shift', 'degrees': 90.0},
             {'op': 'ripple', 'amplitude': 10, ...}]
        **dynamic_params: Parameters that change per-frame like:
            rotate_angle=45, ripple_phase=0.5
    Returns:
        Processed image as GPU array
    Supported ops: rotate, zoom, ripple, invert, hue_shift, brightness
    """
    # Normalize effects list - convert Keyword keys to strings
    effects_list = [_normalize_effect_dict(e) for e in effects_list]
    if not _FUSED_KERNELS_AVAILABLE:
        # Fallback: apply effects one at a time via the gpu_* helpers.
        # Those helpers take and return GPUFrame, so wrap the raw image first
        # (the previous code passed the raw array straight through, which
        # crashed on the first `.is_on_gpu` access).
        result = _to_gpu_frame(img)
        for effect in effects_list:
            op = effect['op']
            if op == 'rotate':
                angle = dynamic_params.get('rotate_angle', effect.get('angle', 0))
                result = gpu_rotate(result, angle)
            elif op == 'zoom':
                amount = dynamic_params.get('zoom_amount', effect.get('amount', 1.0))
                result = gpu_zoom(result, amount)
            elif op == 'hue_shift':
                degrees = effect.get('degrees', 0)
                result = gpu_hue_shift(result, degrees)
            elif op == 'ripple':
                # gpu_ripple's center parameters are named cx/cy - the old
                # center_x=/center_y= keywords raised TypeError.
                result = gpu_ripple(result,
                                    amplitude=effect.get('amplitude', 10),
                                    frequency=effect.get('frequency', 8),
                                    decay=effect.get('decay', 2),
                                    phase=dynamic_params.get('ripple_phase', effect.get('phase', 0)),
                                    cx=effect.get('center_x'),
                                    cy=effect.get('center_y'))
            elif op == 'brightness':
                # Fixed: was gpu_contrast(result, factor, 0) - wrong function
                # and wrong arity (gpu_contrast takes two arguments).
                factor = effect.get('factor', 1.0)
                result = gpu_brightness(result, factor)
            elif op == 'invert':
                result = gpu_invert(result)
        # Unwrap to a raw array, matching what the fused path returns.
        return result.gpu if result.is_on_gpu else result.cpu
    # Get image dimensions
    if hasattr(img, 'shape'):
        h, w = img.shape[:2]
    else:
        raise ValueError("Image must have shape attribute")
    # Create cache key from effects (per-frame params like src2 excluded)
    import hashlib
    ops_key = str([(e['op'], {k: v for k, v in e.items() if k != 'src2'}) for e in effects_list])
    cache_key = f"{w}x{h}_{hashlib.md5(ops_key.encode()).hexdigest()}"
    # Compile or get cached pipeline
    if cache_key not in _FUSED_PIPELINE_CACHE:
        _FUSED_PIPELINE_CACHE[cache_key] = _compile_frame_pipeline(effects_list, w, h)
    pipeline = _FUSED_PIPELINE_CACHE[cache_key]
    # Ensure image is on GPU and uint8
    if hasattr(img, '__cuda_array_interface__'):
        gpu_img = img
    elif GPU_AVAILABLE:
        gpu_img = cp.asarray(img)
    else:
        gpu_img = img
    # Run the fused pipeline
    return pipeline(gpu_img, **dynamic_params)
# Autonomous pipeline cache (separate from fused)
# Maps "auto_{w}x{h}_{md5(ops + expressions)}" -> compiled autonomous pipeline.
_AUTONOMOUS_PIPELINE_CACHE = {}
def prim_autonomous_pipeline(img, effects_list, dynamic_expressions, frame_num, fps=30.0):
    """
    Apply a fully autonomous CUDA kernel pipeline.
    This computes ALL parameters on GPU - including time-based expressions
    like sin(t), t*30, etc. Zero Python in the hot path!
    Args:
        img: Input image (GPU array or numpy array)
        effects_list: List of effect dicts
        dynamic_expressions: Dict mapping param names to CUDA expressions:
            {'rotate_angle': 't * 30.0f',
             'ripple_phase': 't * 2.0f',
             'brightness_factor': '0.8f + 0.4f * sinf(t * 2.0f)'}
        frame_num: Current frame number
        fps: Frames per second (default 30)
    Returns:
        Processed image as GPU array
    Note: Expressions use CUDA syntax - use sinf() not sin(), etc.
    """
    # Normalize effects and expressions
    effects_list = [_normalize_effect_dict(e) for e in effects_list]
    dynamic_expressions = {
        (k.name if hasattr(k, 'name') else str(k)): v
        for k, v in dynamic_expressions.items()
    }
    if not _FUSED_KERNELS_AVAILABLE or _compile_autonomous_pipeline is None:
        # Fallback: evaluate the CUDA expressions in Python and run the
        # (non-autonomous) fused pipeline with the computed params.
        import math
        import re
        t = float(frame_num) / float(fps)
        dynamic_params = {}
        for key, expr in dynamic_expressions.items():
            # Translate CUDA syntax to Python with targeted rewrites:
            # sinf/cosf (or bare sin/cos) -> math.sin/math.cos, and strip
            # C float-literal suffixes (2.0f -> 2.0). The old blanket
            # expr.replace('f', '') corrupted identifiers like frame_num.
            py_expr = re.sub(r'\b(sin|cos)f?\b', r'math.\1', expr)
            py_expr = re.sub(r'(\d+(?:\.\d+)?)f\b', r'\1', py_expr)
            try:
                # NOTE(review): eval of pipeline-supplied expressions -
                # these come from the sexp program, which is assumed trusted;
                # do not feed end-user strings through this path.
                dynamic_params[key] = eval(
                    py_expr, {'t': t, 'math': math, 'frame_num': frame_num})
            except Exception:
                # Unparseable expression: degrade to 0 rather than crash.
                dynamic_params[key] = 0
        return prim_fused_pipeline(img, effects_list, **dynamic_params)
    # Get image dimensions
    if hasattr(img, 'shape'):
        h, w = img.shape[:2]
    else:
        raise ValueError("Image must have shape attribute")
    # Create cache key from effects + expressions
    import hashlib
    ops_key = str([(e['op'], {k: v for k, v in e.items() if k != 'src2'}) for e in effects_list])
    expr_key = str(sorted(dynamic_expressions.items()))
    cache_key = f"auto_{w}x{h}_{hashlib.md5((ops_key + expr_key).encode()).hexdigest()}"
    # Compile or get cached pipeline
    if cache_key not in _AUTONOMOUS_PIPELINE_CACHE:
        _AUTONOMOUS_PIPELINE_CACHE[cache_key] = _compile_autonomous_pipeline(
            effects_list, w, h, dynamic_expressions)
    pipeline = _AUTONOMOUS_PIPELINE_CACHE[cache_key]
    # Ensure image is on GPU
    if hasattr(img, '__cuda_array_interface__'):
        gpu_img = img
    elif GPU_AVAILABLE:
        gpu_img = cp.asarray(img)
    else:
        gpu_img = img
    # Run - just pass frame_num and fps, kernel does the rest!
    return pipeline(gpu_img, int(frame_num), float(fps))
# Add GPU-specific primitives
# Registered on top of the CPU table copied from streaming.PRIMITIVES above.
PRIMITIVES['fused-pipeline'] = prim_fused_pipeline
PRIMITIVES['autonomous-pipeline'] = prim_autonomous_pipeline
# (The GPU video source will be added by create_cid_primitives in the task)