Files
celery/sexp_effects/primitive_libs/streaming_gpu.py
giles b7e3827fa2
Some checks are pending
GPU Worker CI/CD / test (push) Waiting to run
GPU Worker CI/CD / deploy (push) Blocked by required conditions
Use PyNvCodec for true zero-copy GPU video decode
Replace decord (CPU-only pip package) with PyNvCodec which provides
direct NVDEC access. Frames decode straight to GPU memory without
any CPU transfer, eliminating the memory bandwidth bottleneck.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-04 01:47:03 +00:00

672 lines
22 KiB
Python

"""
GPU-Accelerated Streaming Primitives
Provides GPU-native video source and frame processing.
Frames stay on GPU memory throughout the pipeline for maximum performance.
Architecture:
- GPUFrame: Wrapper that tracks whether data is on CPU or GPU
- GPUVideoSource: Hardware-accelerated decode to GPU memory
- GPU primitives operate directly on GPU frames
- Transfer to CPU only at final output
Requirements:
- CuPy for CUDA support
- FFmpeg with NVDEC support (for hardware decode)
- NVIDIA GPU with CUDA capability
"""
import os
import sys
import json
import subprocess
import numpy as np
from pathlib import Path
from typing import Optional, Tuple, Union
# Try to import CuPy
try:
import cupy as cp
GPU_AVAILABLE = True
except ImportError:
cp = None
GPU_AVAILABLE = False
# Cached results of the runtime capability probes (None = not probed yet)
_HWDEC_AVAILABLE: Optional[bool] = None
_PYNVCODEC_AVAILABLE: Optional[bool] = None


def check_hwdec_available() -> bool:
    """Check if NVIDIA hardware decode is available.

    The probe runs once and the result is cached for the process lifetime.
    """
    global _HWDEC_AVAILABLE
    if _HWDEC_AVAILABLE is not None:
        return _HWDEC_AVAILABLE
    try:
        # A working nvidia-smi implies an NVIDIA GPU + driver is present.
        smi = subprocess.run(["nvidia-smi"], capture_output=True, timeout=2)
        if smi.returncode != 0:
            _HWDEC_AVAILABLE = False
            return False
        # Then confirm ffmpeg was built with the CUDA hwaccel.
        hwaccels = subprocess.run(
            ["ffmpeg", "-hwaccels"],
            capture_output=True,
            text=True,
            timeout=5,
        )
        _HWDEC_AVAILABLE = "cuda" in hwaccels.stdout
    except Exception:
        # Missing binaries or timeouts all mean "no hardware decode".
        _HWDEC_AVAILABLE = False
    return _HWDEC_AVAILABLE
def check_pynvcodec_available() -> bool:
    """Check if PyNvCodec GPU decode is available.

    Probes by importing the package; the outcome is cached so the import
    (and its stderr log line) happens at most once per process.
    """
    global _PYNVCODEC_AVAILABLE
    if _PYNVCODEC_AVAILABLE is None:
        try:
            import PyNvCodec as nvc  # import probe only
            _PYNVCODEC_AVAILABLE = True
            print("[streaming_gpu] PyNvCodec GPU decode available", file=sys.stderr)
        except Exception as e:
            _PYNVCODEC_AVAILABLE = False
            print(f"[streaming_gpu] PyNvCodec not available: {e}", file=sys.stderr)
    return _PYNVCODEC_AVAILABLE
class GPUFrame:
    """
    Frame container that tracks data location (CPU/GPU).

    Enables zero-copy operations when data is already on the right device.
    Lazy transfer - only moves data when actually needed.  At any moment at
    least one of the two internal buffers holds the pixels.
    """

    def __init__(self, data: Union[np.ndarray, 'cp.ndarray'], on_gpu: Optional[bool] = None):
        """
        Args:
            data: Pixel data as a NumPy array, CuPy array, or array-like.
            on_gpu: Force placement (True=GPU, False=CPU). None auto-detects
                from the type of `data`.
        """
        self._cpu_data: Optional[np.ndarray] = None
        self._gpu_data = None  # Optional[cp.ndarray]
        if on_gpu is None:
            # Auto-detect based on type
            if GPU_AVAILABLE and isinstance(data, cp.ndarray):
                self._gpu_data = data
            else:
                self._cpu_data = np.asarray(data)
        elif on_gpu and GPU_AVAILABLE:
            self._gpu_data = data if isinstance(data, cp.ndarray) else cp.asarray(data)
        else:
            # CPU placement requested (or GPU requested but unavailable).
            # FIX: the old code routed every non-np.ndarray input (lists,
            # tuples, and any array-like) through cp.asnumpy(), which crashed
            # with cp=None when CuPy is not installed.
            if GPU_AVAILABLE and isinstance(data, cp.ndarray):
                self._cpu_data = cp.asnumpy(data)
            else:
                self._cpu_data = np.asarray(data)

    @property
    def cpu(self) -> np.ndarray:
        """Get frame as numpy array (transfers from GPU if needed)."""
        if self._cpu_data is None:
            if self._gpu_data is not None and GPU_AVAILABLE:
                self._cpu_data = cp.asnumpy(self._gpu_data)
            else:
                raise ValueError("No frame data available")
        return self._cpu_data

    @property
    def gpu(self):
        """Get frame as CuPy array (transfers to GPU if needed)."""
        if not GPU_AVAILABLE:
            raise RuntimeError("GPU not available")
        if self._gpu_data is None:
            if self._cpu_data is not None:
                self._gpu_data = cp.asarray(self._cpu_data)
            else:
                raise ValueError("No frame data available")
        return self._gpu_data

    @property
    def is_on_gpu(self) -> bool:
        """Check if data is currently on GPU."""
        return self._gpu_data is not None

    @property
    def shape(self) -> Tuple[int, ...]:
        """Get frame shape (GPU buffer preferred when both exist)."""
        if self._gpu_data is not None:
            return self._gpu_data.shape
        return self._cpu_data.shape

    @property
    def dtype(self):
        """Get frame dtype (GPU buffer preferred when both exist)."""
        if self._gpu_data is not None:
            return self._gpu_data.dtype
        return self._cpu_data.dtype

    def numpy(self) -> np.ndarray:
        """Alias for cpu property."""
        return self.cpu

    def cupy(self):
        """Alias for gpu property."""
        return self.gpu

    def free_cpu(self):
        """Free CPU memory (keep GPU only); no-op if GPU copy is absent."""
        if self._gpu_data is not None:
            self._cpu_data = None

    def free_gpu(self):
        """Free GPU memory (keep CPU only); no-op if CPU copy is absent."""
        if self._cpu_data is not None:
            self._gpu_data = None
class GPUVideoSource:
    """
    GPU-accelerated video source using hardware decode.

    Uses PyNvCodec for true zero-copy NVDEC decode - frames go directly
    to GPU memory without any CPU transfer.
    Falls back to FFmpeg pipe if PyNvCodec unavailable (slower due to CPU copy).
    """

    def __init__(self, path: str, fps: float = 30, prefer_gpu: bool = True):
        """
        Args:
            path: Video file to decode.
            fps: Sampling rate used by read_at() (independent of the file's
                native frame rate).
            prefer_gpu: Keep decoded frames in GPU memory when CuPy is present.
        """
        self.path = Path(path)
        self.fps = fps
        self.prefer_gpu = prefer_gpu and GPU_AVAILABLE
        self._use_pynvcodec = self.prefer_gpu and check_pynvcodec_available()
        self._frame_size: Optional[Tuple[int, int]] = None
        self._duration: Optional[float] = None
        self._video_fps: float = 30.0  # native fps; refined by _probe_video()
        self._total_frames: int = 0
        self._frame_time = 1.0 / fps  # seconds between sampled output frames
        self._last_read_time = -1  # last t passed to read_at() (cache key)
        self._cached_frame: Optional[GPUFrame] = None
        # PyNvCodec decoder components
        self._nvdec = None
        self._nv_cvt = None  # NV12 to RGB converter
        self._nv_dwn = None  # GPU to CPU downloader (for fallback)
        self._gpu_id = 0
        # FFmpeg fallback state
        self._proc = None  # ffmpeg subprocess writing rawvideo to stdout
        self._stream_time = 0.0  # timestamp of the next frame in the pipe
        # Initialize video source
        self._init_video()
        mode = "PyNvCodec-GPU" if self._use_pynvcodec else ("ffmpeg-hwaccel" if check_hwdec_available() else "ffmpeg-CPU")
        print(f"[GPUVideoSource] {self.path.name}: {self._frame_size}, "
              f"duration={self._duration:.1f}s, mode={mode}", file=sys.stderr)

    def _init_video(self):
        """Initialize video reader (PyNvCodec or probe for ffmpeg)."""
        # First probe video for metadata (needed for both paths)
        self._probe_video()
        if self._use_pynvcodec:
            try:
                import PyNvCodec as nvc
                # Create NVDEC decoder - decodes directly to GPU
                self._nvdec = nvc.PyNvDecoder(
                    str(self.path),
                    self._gpu_id
                )
                # Get actual dimensions from decoder
                self._frame_size = (self._nvdec.Width(), self._nvdec.Height())
                # Create color converter: NV12 (decoder output) -> RGB
                self._nv_cvt = nvc.PySurfaceConverter(
                    self._nvdec.Width(),
                    self._nvdec.Height(),
                    nvc.PixelFormat.NV12,
                    nvc.PixelFormat.RGB,
                    self._gpu_id
                )
                print(f"[GPUVideoSource] PyNvCodec initialized: {self._frame_size}", file=sys.stderr)
                return
            except Exception as e:
                # Any init failure demotes this source to the ffmpeg pipe path.
                print(f"[GPUVideoSource] PyNvCodec init failed, falling back to ffmpeg: {e}", file=sys.stderr)
                self._use_pynvcodec = False
                self._nvdec = None
                self._nv_cvt = None

    def _probe_video(self):
        """Probe video file for metadata (FFmpeg fallback).

        Always leaves _frame_size, _duration, _video_fps and _total_frames
        populated, falling back to defaults when probing fails.
        """
        cmd = ["ffprobe", "-v", "quiet", "-print_format", "json",
               "-show_streams", "-show_format", str(self.path)]
        # FIX: a missing ffprobe binary or empty/unparseable output used to
        # raise straight out of the constructor; degrade to defaults instead.
        try:
            result = subprocess.run(cmd, capture_output=True, text=True)
            info = json.loads(result.stdout)
        except Exception as e:
            print(f"[GPUVideoSource] ffprobe failed for {self.path}: {e}", file=sys.stderr)
            info = {}
        for stream in info.get("streams", []):
            if stream.get("codec_type") == "video":
                self._frame_size = (stream.get("width", 720), stream.get("height", 720))
                if "duration" in stream:
                    self._duration = float(stream["duration"])
                elif "tags" in stream and "DURATION" in stream["tags"]:
                    # Matroska-style "HH:MM:SS.fraction" duration tag
                    dur_str = stream["tags"]["DURATION"]
                    parts = dur_str.split(":")
                    if len(parts) == 3:
                        h, m, s = parts
                        self._duration = int(h) * 3600 + int(m) * 60 + float(s)
                # Get fps
                if "r_frame_rate" in stream:
                    fps_str = stream["r_frame_rate"]
                    if "/" in fps_str:
                        num, den = fps_str.split("/")
                        # FIX: ffprobe can report "0/0"; avoid ZeroDivisionError
                        if float(den) != 0:
                            self._video_fps = float(num) / float(den)
                break
        if self._duration is None and "format" in info:
            if "duration" in info["format"]:
                self._duration = float(info["format"]["duration"])
        # Last-resort defaults so the source is always usable
        if not self._frame_size:
            self._frame_size = (720, 720)
        if not self._duration:
            self._duration = 60.0
        self._total_frames = int(self._duration * self._video_fps)

    def _start_stream(self, seek_time: float = 0):
        """Start (or restart) the ffmpeg decode process (fallback mode).

        Raises:
            FileNotFoundError: If the video file does not exist.
        """
        if self._proc:
            # FIX: kill() alone leaves a zombie process; reap it too.
            self._proc.kill()
            self._proc.wait()
            self._proc = None
        if not self.path.exists():
            raise FileNotFoundError(f"Video file not found: {self.path}")
        w, h = self._frame_size
        # Build ffmpeg command
        cmd = ["ffmpeg", "-v", "error"]
        # Hardware decode if available
        if check_hwdec_available():
            cmd.extend(["-hwaccel", "cuda"])
        cmd.extend([
            "-ss", f"{seek_time:.3f}",  # input-side seek: fast keyframe jump
            "-i", str(self.path),
            "-f", "rawvideo",
            "-pix_fmt", "rgb24",
            "-s", f"{w}x{h}",
            "-r", str(self.fps),
            "-"
        ])
        # FIX: stderr used to be a PIPE that nobody drained, so a chatty
        # ffmpeg could fill the pipe buffer and stall the decode; discard it.
        self._proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL)
        self._stream_time = seek_time

    def _read_frame_raw(self) -> Optional[np.ndarray]:
        """Read one rgb24 frame from the ffmpeg pipe (fallback mode).

        Returns:
            (h, w, 3) uint8 array, or None at EOF / dead process.
        """
        w, h = self._frame_size
        frame_size = w * h * 3  # bytes per rgb24 frame
        if not self._proc or self._proc.poll() is not None:
            return None
        data = self._proc.stdout.read(frame_size)
        if len(data) < frame_size:
            return None
        # copy() so the array owns its memory, detached from the read buffer
        return np.frombuffer(data, dtype=np.uint8).reshape((h, w, 3)).copy()

    def _read_frame_pynvcodec(self, target_time: float) -> Optional[GPUFrame]:
        """Read frame using PyNvCodec (true GPU-native, zero CPU copy).

        Returns None on any decode failure; read_at() then serves its cache.
        """
        if self._nvdec is None:
            return None
        try:
            import PyNvCodec as nvc
            # Seek if needed (PyNvCodec uses frame numbers)
            target_frame = int(target_time * self._video_fps)
            target_frame = target_frame % max(1, self._total_frames)  # Loop
            # Decode frame - returns surface in GPU memory
            raw_surface = self._nvdec.DecodeSingleSurface()
            if raw_surface.Empty():
                # Likely EOF: seek back toward the target and decode again.
                # NOTE(review): assumes SeekContext takes an absolute frame
                # number - confirm against the installed VPF version.
                seek_ctx = nvc.SeekContext(target_frame)
                self._nvdec.DecodeSingleSurface(seek_ctx)
                raw_surface = self._nvdec.DecodeSingleSurface()
                if raw_surface.Empty():
                    return None
            # Convert NV12 -> RGB on GPU
            rgb_surface = self._nv_cvt.Execute(raw_surface)
            if rgb_surface.Empty():
                return None
            # Get as CuPy array - stays on GPU!
            if GPU_AVAILABLE:
                frame_ptr = rgb_surface.PlanePtr()
                pitch = rgb_surface.Pitch()
                height = rgb_surface.Height()
                width = rgb_surface.Width()
                # Wrap the surface's device pointer without copying.
                # FIX: the surface is pitched (row stride = pitch bytes, which
                # may exceed width*3); the old code assumed tightly packed
                # rows and sized the mapping pitch*height*3, shearing the
                # image whenever pitch != width*3. Honor the pitch via
                # explicit strides and map exactly pitch*height bytes.
                mem = cp.cuda.UnownedMemory(frame_ptr.GpuMem(), pitch * height, None)
                memptr = cp.cuda.MemoryPointer(mem, 0)
                gpu_frame = cp.ndarray(
                    (height, width, 3),
                    dtype=cp.uint8,
                    memptr=memptr,
                    strides=(pitch, 3, 1),
                )
                # Make a copy to ensure we own the memory (surface may be reused)
                gpu_frame = gpu_frame.copy()
                return GPUFrame(gpu_frame, on_gpu=True)
            else:
                # Fallback to CPU
                frame_np = np.ndarray(
                    shape=(rgb_surface.Height(), rgb_surface.Width(), 3),
                    dtype=np.uint8
                )
                # Download to CPU (not ideal but works)
                if self._nv_dwn is None:
                    self._nv_dwn = nvc.PySurfaceDownloader(
                        rgb_surface.Width(),
                        rgb_surface.Height(),
                        nvc.PixelFormat.RGB,
                        self._gpu_id
                    )
                self._nv_dwn.DownloadSingleSurface(rgb_surface, frame_np)
                return GPUFrame(frame_np, on_gpu=False)
        except Exception as e:
            print(f"[GPUVideoSource] PyNvCodec read error at t={target_time:.2f}: {e}", file=sys.stderr)
            return None

    def read_at(self, t: float) -> Optional[GPUFrame]:
        """
        Read frame at specific time.

        Returns GPUFrame with data on GPU if GPU mode enabled.
        Time wraps modulo the clip duration so short clips loop; on decode
        failure the previously cached frame (possibly None) is returned.
        """
        # Cache check
        if t == self._last_read_time and self._cached_frame is not None:
            return self._cached_frame
        # Loop time for shorter videos
        seek_time = t
        if self._duration and self._duration > 0:
            seek_time = t % self._duration
            if seek_time > self._duration - 0.1:
                # Avoid seeking into the final fraction of a second (EOF zone)
                seek_time = 0.0
        self._last_read_time = t
        # Use PyNvCodec if available (true GPU-native decode, zero CPU copy)
        if self._use_pynvcodec:
            self._cached_frame = self._read_frame_pynvcodec(seek_time)
            if self._cached_frame is not None:
                # Free CPU copy if on GPU (saves memory)
                if self.prefer_gpu and self._cached_frame.is_on_gpu:
                    self._cached_frame.free_cpu()
            return self._cached_frame
        # FFmpeg fallback: restart the pipe when seeking backwards or far ahead
        need_seek = (
            self._proc is None or
            self._proc.poll() is not None or
            seek_time < self._stream_time - self._frame_time or
            seek_time > self._stream_time + 2.0
        )
        if need_seek:
            self._start_stream(seek_time)
        # Skip frames to reach target
        while self._stream_time + self._frame_time <= seek_time:
            frame = self._read_frame_raw()
            if frame is None:
                self._start_stream(seek_time)
                break
            self._stream_time += self._frame_time
        # Read target frame
        frame_np = self._read_frame_raw()
        if frame_np is None:
            # Serve the stale cached frame rather than nothing
            return self._cached_frame
        self._stream_time += self._frame_time
        # Create GPUFrame - transfer to GPU if in GPU mode
        self._cached_frame = GPUFrame(frame_np, on_gpu=self.prefer_gpu)
        # Free CPU copy if on GPU (saves memory)
        if self.prefer_gpu and self._cached_frame.is_on_gpu:
            self._cached_frame.free_cpu()
        return self._cached_frame

    def read(self) -> Optional[GPUFrame]:
        """Read current frame (last decoded, or the frame at t=0)."""
        if self._cached_frame is not None:
            return self._cached_frame
        return self.read_at(0)

    @property
    def size(self) -> Tuple[int, int]:
        """(width, height) of decoded frames."""
        return self._frame_size

    @property
    def duration(self) -> float:
        """Clip duration in seconds (defaults to 60.0 when probing failed)."""
        return self._duration

    def close(self):
        """Close the video source and release decoder resources."""
        if self._proc:
            self._proc.kill()
            self._proc.wait()  # FIX: reap the child to avoid a zombie
            self._proc = None
        # Release PyNvCodec resources (freed by their destructors)
        self._nvdec = None
        self._nv_cvt = None
        self._nv_dwn = None
# GPU-aware primitive functions
def gpu_blend(frame_a: GPUFrame, frame_b: GPUFrame, alpha: float = 0.5) -> GPUFrame:
    """
    Blend two frames on GPU.

    Computes alpha * frame_a + (1 - alpha) * frame_b in float32 and casts
    back to uint8.  Both frames stay on GPU throughout - no CPU transfer.
    Falls back to the identical NumPy computation when CuPy is missing.
    """
    if GPU_AVAILABLE:
        first = frame_a.gpu.astype(cp.float32)
        second = frame_b.gpu.astype(cp.float32)
        mixed = (first * alpha + second * (1 - alpha)).astype(cp.uint8)
        return GPUFrame(mixed, on_gpu=True)
    first = frame_a.cpu.astype(np.float32)
    second = frame_b.cpu.astype(np.float32)
    mixed = (first * alpha + second * (1 - alpha)).astype(np.uint8)
    return GPUFrame(mixed, on_gpu=False)
def gpu_resize(frame: GPUFrame, size: Tuple[int, int]) -> GPUFrame:
    """Resize frame on GPU.

    Args:
        frame: Source frame.
        size: Target (width, height).

    Returns:
        Resized GPUFrame, on the same device the input effectively used.
    """
    if not GPU_AVAILABLE or not frame.is_on_gpu:
        # FIX: cv2 was imported unconditionally at the top of the function,
        # forcing an OpenCV dependency even on the pure-GPU path; import it
        # lazily here instead, consistent with gpu_rotate.
        import cv2
        resized = cv2.resize(frame.cpu, size)
        return GPUFrame(resized, on_gpu=False)
    # CuPy doesn't have built-in resize, use scipy zoom
    from cupyx.scipy import ndimage as cpndimage
    gpu_data = frame.gpu
    h, w = gpu_data.shape[:2]
    target_w, target_h = size
    zoom_y = target_h / h
    zoom_x = target_w / w
    if gpu_data.ndim == 3:
        # Keep the channel axis unscaled
        resized = cpndimage.zoom(gpu_data, (zoom_y, zoom_x, 1), order=1)
    else:
        resized = cpndimage.zoom(gpu_data, (zoom_y, zoom_x), order=1)
    return GPUFrame(resized, on_gpu=True)
def gpu_rotate(frame: GPUFrame, angle: float) -> GPUFrame:
    """Rotate frame by `angle` degrees about its center, preserving size."""
    if GPU_AVAILABLE and frame.is_on_gpu:
        # cupyx ndimage rotates on-device; reshape=False keeps the frame size
        from cupyx.scipy import ndimage as cpndimage
        turned = cpndimage.rotate(frame.gpu, angle, reshape=False, order=1)
        return GPUFrame(turned, on_gpu=True)
    import cv2
    image = frame.cpu
    rows, cols = image.shape[:2]
    matrix = cv2.getRotationMatrix2D((cols // 2, rows // 2), angle, 1.0)
    turned = cv2.warpAffine(image, matrix, (cols, rows))
    return GPUFrame(turned, on_gpu=False)
def gpu_brightness(frame: GPUFrame, factor: float) -> GPUFrame:
    """Scale pixel intensities by `factor`, clamped to the uint8 range."""
    if GPU_AVAILABLE and frame.is_on_gpu:
        scaled = cp.clip(frame.gpu.astype(cp.float32) * factor, 0, 255)
        return GPUFrame(scaled.astype(cp.uint8), on_gpu=True)
    scaled = np.clip(frame.cpu.astype(np.float32) * factor, 0, 255)
    return GPUFrame(scaled.astype(np.uint8), on_gpu=False)
def gpu_composite(frames: list, weights: Optional[list] = None) -> GPUFrame:
    """
    Composite multiple frames with weights.

    All frames processed on GPU for efficiency.

    Args:
        frames: Non-empty list of GPUFrame objects.
        weights: Optional per-frame weights; defaults to a uniform average.
            Normalized to sum to 1 when their total is positive.

    Returns:
        GPUFrame holding the weighted uint8 composite, shaped like frames[0].

    Raises:
        ValueError: If `frames` is empty.
    """
    if not frames:
        raise ValueError("No frames to composite")
    if len(frames) == 1:
        return frames[0]
    if weights is None:
        weights = [1.0 / len(frames)] * len(frames)
    # Normalize weights
    total = sum(weights)
    if total > 0:
        weights = [w / total for w in weights]
    # Composite on GPU when any frame already lives there
    use_gpu = GPU_AVAILABLE and any(f.is_on_gpu for f in frames)
    if use_gpu:
        # All on GPU
        target_shape = frames[0].gpu.shape
        result = cp.zeros(target_shape, dtype=cp.float32)
        for frame, weight in zip(frames, weights):
            gpu_data = frame.gpu.astype(cp.float32)
            if gpu_data.shape != target_shape:
                # Resize to match the first frame
                from cupyx.scipy import ndimage as cpndimage
                h, w = target_shape[:2]
                fh, fw = gpu_data.shape[:2]
                zoom_factors = (h/fh, w/fw, 1) if gpu_data.ndim == 3 else (h/fh, w/fw)
                gpu_data = cpndimage.zoom(gpu_data, zoom_factors, order=1)
            result += gpu_data * weight
        return GPUFrame(cp.clip(result, 0, 255).astype(cp.uint8), on_gpu=True)
    else:
        # All on CPU
        target_shape = frames[0].cpu.shape
        result = np.zeros(target_shape, dtype=np.float32)
        for frame, weight in zip(frames, weights):
            cpu_data = frame.cpu.astype(np.float32)
            if cpu_data.shape != target_shape:
                # FIX: cv2 was imported at the top of the CPU branch even
                # when every frame already matched and no resize was needed;
                # import it lazily only when a resize actually happens.
                import cv2
                cpu_data = cv2.resize(cpu_data, (target_shape[1], target_shape[0]))
            result += cpu_data * weight
        return GPUFrame(np.clip(result, 0, 255).astype(np.uint8), on_gpu=False)
# Primitive registration for streaming interpreter
def get_primitives():
    """
    Get GPU-aware primitives for registration with interpreter.

    These wrap the GPU functions to work with the sexp interpreter: inputs
    are coerced to GPUFrame, and results come back as numpy arrays.
    """
    def _as_frame(value):
        # Wrap raw arrays so the GPU helpers always receive a GPUFrame.
        return value if isinstance(value, GPUFrame) else GPUFrame(value)

    def prim_make_video_source_gpu(path: str, fps: float = 30):
        """Create GPU-accelerated video source."""
        return GPUVideoSource(path, fps, prefer_gpu=True)

    def prim_gpu_blend(a, b, alpha=0.5):
        """Blend two frames."""
        return gpu_blend(_as_frame(a), _as_frame(b), alpha).cpu  # numpy for compatibility

    def prim_gpu_rotate(img, angle):
        """Rotate image."""
        return gpu_rotate(_as_frame(img), angle).cpu

    def prim_gpu_brightness(img, factor):
        """Adjust brightness."""
        return gpu_brightness(_as_frame(img), factor).cpu

    return {
        'streaming-gpu:make-video-source': prim_make_video_source_gpu,
        'gpu:blend': prim_gpu_blend,
        'gpu:rotate': prim_gpu_rotate,
        'gpu:brightness': prim_gpu_brightness,
    }
# Export
__all__ = [
    'GPU_AVAILABLE',
    'GPUFrame',
    'GPUVideoSource',
    'gpu_blend',
    'gpu_resize',
    'gpu_rotate',
    'gpu_brightness',
    'gpu_composite',
    'get_primitives',
    'check_hwdec_available',
    # FIX: public capability probe was defined above but never exported,
    # inconsistent with check_hwdec_available
    'check_pynvcodec_available',
    'PRIMITIVES',
]
# Import CPU primitives from streaming.py and include them in PRIMITIVES
# This ensures audio analysis primitives are available when streaming_gpu is loaded
def _get_cpu_primitives():
    """Return the primitive table of the sibling CPU streaming module.

    The import is deferred to call time — presumably to avoid a circular
    import while this module is loading (TODO confirm).
    """
    from sexp_effects.primitive_libs import streaming
    return streaming.PRIMITIVES

# .copy() so GPU-specific additions below don't mutate streaming.PRIMITIVES
PRIMITIVES = _get_cpu_primitives().copy()
# Add GPU-specific primitives
# (The GPU video source will be added by create_cid_primitives in the task)