Files
celery/sexp_effects/primitive_libs/streaming_gpu.py
giles 9bdad268a5
Some checks are pending
GPU Worker CI/CD / test (push) Waiting to run
GPU Worker CI/CD / deploy (push) Blocked by required conditions
Fix DLPack: use frame.to_dlpack() for decord→CuPy zero-copy
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-04 02:10:18 +00:00

638 lines
21 KiB
Python

"""
GPU-Accelerated Streaming Primitives
Provides GPU-native video source and frame processing.
Frames stay on GPU memory throughout the pipeline for maximum performance.
Architecture:
- GPUFrame: Wrapper that tracks whether data is on CPU or GPU
- GPUVideoSource: Hardware-accelerated decode to GPU memory
- GPU primitives operate directly on GPU frames
- Transfer to CPU only at final output
Requirements:
- CuPy for CUDA support
- FFmpeg with NVDEC support (for hardware decode)
- NVIDIA GPU with CUDA capability
"""
import os
import sys
import json
import subprocess
import numpy as np
from pathlib import Path
from typing import Optional, Tuple, Union
# Optional CuPy dependency: GPU code paths are enabled only when it imports.
try:
    import cupy as cp
except ImportError:
    cp = None

# True when CuPy (and therefore CUDA array support) is importable.
GPU_AVAILABLE = cp is not None

# Check for hardware decode support.
# Lazily-populated capability caches: None means "not probed yet".
_HWDEC_AVAILABLE: Optional[bool] = None
_DECORD_GPU_AVAILABLE: Optional[bool] = None
def check_hwdec_available() -> bool:
    """Return True if NVIDIA hardware decode is usable from ffmpeg.

    Probes once per process (result is cached in ``_HWDEC_AVAILABLE``):
    first ``nvidia-smi`` must succeed (GPU + driver present), then ffmpeg
    must list "cuda" among its hwaccel backends.
    """
    global _HWDEC_AVAILABLE
    if _HWDEC_AVAILABLE is not None:
        return _HWDEC_AVAILABLE
    try:
        # No working nvidia-smi means no usable NVIDIA GPU.
        gpu_probe = subprocess.run(["nvidia-smi"], capture_output=True, timeout=2)
        if gpu_probe.returncode != 0:
            _HWDEC_AVAILABLE = False
            return False
        # The ffmpeg build must also expose the CUDA hwaccel backend.
        hwaccel_probe = subprocess.run(
            ["ffmpeg", "-hwaccels"],
            capture_output=True,
            text=True,
            timeout=5,
        )
        _HWDEC_AVAILABLE = "cuda" in hwaccel_probe.stdout
    except Exception:
        # Missing binaries or timeouts all mean "no hardware decode".
        _HWDEC_AVAILABLE = False
    return _HWDEC_AVAILABLE
def check_decord_gpu_available() -> bool:
    """Return True if decord can decode on the GPU via a CUDA context.

    The probe instantiates ``gpu(0)`` once — the only reliable way to
    confirm decord was built with CUDA support — and caches the outcome
    in ``_DECORD_GPU_AVAILABLE`` for the life of the process.
    """
    global _DECORD_GPU_AVAILABLE
    if _DECORD_GPU_AVAILABLE is None:
        try:
            import decord
            from decord import gpu
            # Creating a GPU context raises if CUDA support is missing.
            ctx = gpu(0)
            _DECORD_GPU_AVAILABLE = True
            print("[streaming_gpu] decord GPU (CUDA) decode available", file=sys.stderr)
        except Exception as e:
            _DECORD_GPU_AVAILABLE = False
            print(f"[streaming_gpu] decord GPU not available: {e}", file=sys.stderr)
    return _DECORD_GPU_AVAILABLE
class GPUFrame:
    """
    Frame container that tracks data location (CPU/GPU).

    Enables zero-copy operations when data is already on the right device.
    Lazy transfer - only moves data when actually needed: at least one of
    the two internal buffers is populated at construction, and the other
    is filled on demand by the ``cpu`` / ``gpu`` accessors.
    """

    def __init__(self, data: Union[np.ndarray, 'cp.ndarray'], on_gpu: Optional[bool] = None):
        """Wrap *data* as a frame.

        data: numpy/CuPy array (or anything ``np.asarray`` accepts for CPU).
        on_gpu: None = auto-detect from the array type; True = keep on GPU
            (when CuPy is available); False = keep on CPU.
        """
        self._cpu_data: Optional[np.ndarray] = None
        self._gpu_data = None  # Optional[cp.ndarray]
        if on_gpu is None:
            # Auto-detect based on type.
            if GPU_AVAILABLE and isinstance(data, cp.ndarray):
                self._gpu_data = data
            else:
                self._cpu_data = np.asarray(data)
        elif on_gpu and GPU_AVAILABLE:
            # GPU placement requested: upload unless already a CuPy array.
            self._gpu_data = data if isinstance(data, cp.ndarray) else cp.asarray(data)
        else:
            # CPU placement: download genuine CuPy arrays, coerce everything
            # else with numpy. (The previous code called cp.asnumpy() on any
            # non-np.ndarray input, which crashed on plain lists/tuples
            # whenever CuPy was not installed, since cp is None then.)
            if GPU_AVAILABLE and isinstance(data, cp.ndarray):
                self._cpu_data = cp.asnumpy(data)
            else:
                self._cpu_data = np.asarray(data)

    @property
    def cpu(self) -> np.ndarray:
        """Get frame as numpy array (transfers from GPU if needed)."""
        if self._cpu_data is None:
            if self._gpu_data is not None and GPU_AVAILABLE:
                # Device-to-host copy; cached so it happens at most once.
                self._cpu_data = cp.asnumpy(self._gpu_data)
            else:
                raise ValueError("No frame data available")
        return self._cpu_data

    @property
    def gpu(self):
        """Get frame as CuPy array (transfers to GPU if needed)."""
        if not GPU_AVAILABLE:
            raise RuntimeError("GPU not available")
        if self._gpu_data is None:
            if self._cpu_data is not None:
                # Host-to-device copy; cached so it happens at most once.
                self._gpu_data = cp.asarray(self._cpu_data)
            else:
                raise ValueError("No frame data available")
        return self._gpu_data

    @property
    def is_on_gpu(self) -> bool:
        """Check if data is currently on GPU."""
        return self._gpu_data is not None

    @property
    def shape(self) -> Tuple[int, ...]:
        """Get frame shape (whichever copy is resident)."""
        if self._gpu_data is not None:
            return self._gpu_data.shape
        return self._cpu_data.shape

    @property
    def dtype(self):
        """Get frame dtype (whichever copy is resident)."""
        if self._gpu_data is not None:
            return self._gpu_data.dtype
        return self._cpu_data.dtype

    def numpy(self) -> np.ndarray:
        """Alias for cpu property."""
        return self.cpu

    def cupy(self):
        """Alias for gpu property."""
        return self.gpu

    def free_cpu(self):
        """Free CPU memory (keep GPU only). No-op unless a GPU copy exists."""
        if self._gpu_data is not None:
            self._cpu_data = None

    def free_gpu(self):
        """Free GPU memory (keep CPU only). No-op unless a CPU copy exists."""
        if self._cpu_data is not None:
            self._gpu_data = None
class GPUVideoSource:
    """
    GPU-accelerated video source using hardware decode.

    Uses decord with CUDA GPU context for true NVDEC decode - frames
    decode directly to GPU memory via CUDA.
    Falls back to FFmpeg pipe if decord GPU unavailable (slower due to CPU copy).

    Reads past the end of the clip wrap around (the source loops), and the
    most recently decoded frame is cached so repeated reads at the same
    timestamp are free.
    """

    def __init__(self, path: str, fps: float = 30, prefer_gpu: bool = True):
        """Open *path* for decoding at *fps* output frames per second.

        prefer_gpu: keep decoded frames resident in GPU memory when CuPy
        is available.
        """
        self.path = Path(path)
        self.fps = fps
        # GPU preference is meaningless without CuPy installed.
        self.prefer_gpu = prefer_gpu and GPU_AVAILABLE
        self._use_decord_gpu = self.prefer_gpu and check_decord_gpu_available()
        self._frame_size: Optional[Tuple[int, int]] = None  # (width, height)
        self._duration: Optional[float] = None  # clip duration, seconds
        self._video_fps: float = 30.0  # source file's native fps (probed later)
        self._total_frames: int = 0
        self._frame_time = 1.0 / fps  # output frame period, seconds
        self._last_read_time = -1  # timestamp of the cached frame
        self._cached_frame: Optional[GPUFrame] = None
        # Decord VideoReader with GPU context
        self._vr = None
        self._decord_ctx = None
        # FFmpeg fallback state
        self._proc = None  # ffmpeg subprocess piping raw rgb24 frames
        self._stream_time = 0.0  # current decode position of the ffmpeg pipe
        # Initialize video source
        self._init_video()
        mode = "decord-GPU" if self._use_decord_gpu else ("ffmpeg-hwaccel" if check_hwdec_available() else "ffmpeg-CPU")
        print(f"[GPUVideoSource] {self.path.name}: {self._frame_size}, "
              f"duration={self._duration:.1f}s, mode={mode}", file=sys.stderr)

    def _init_video(self):
        """Initialize video reader (decord GPU or probe for ffmpeg)."""
        if self._use_decord_gpu:
            try:
                from decord import VideoReader, gpu
                # Use GPU context for NVDEC hardware decode
                self._decord_ctx = gpu(0)
                self._vr = VideoReader(str(self.path), ctx=self._decord_ctx, num_threads=1)
                self._total_frames = len(self._vr)
                self._video_fps = self._vr.get_avg_fps()
                self._duration = self._total_frames / self._video_fps
                # Get frame size from first frame
                first_frame = self._vr[0]
                self._frame_size = (first_frame.shape[1], first_frame.shape[0])
                print(f"[GPUVideoSource] decord GPU initialized: {self._frame_size}, "
                      f"{self._total_frames} frames @ {self._video_fps:.1f}fps", file=sys.stderr)
                return
            except Exception as e:
                # Any decord failure permanently demotes this source to ffmpeg.
                print(f"[GPUVideoSource] decord GPU init failed, falling back to ffmpeg: {e}", file=sys.stderr)
                self._use_decord_gpu = False
                self._vr = None
                self._decord_ctx = None
        # FFmpeg fallback - probe video for metadata
        self._probe_video()

    def _probe_video(self):
        """Probe video file for metadata (FFmpeg fallback).

        Populates _frame_size, _duration, _video_fps and _total_frames,
        falling back to safe defaults when ffprobe reports nothing usable.
        """
        cmd = ["ffprobe", "-v", "quiet", "-print_format", "json",
               "-show_streams", "-show_format", str(self.path)]
        result = subprocess.run(cmd, capture_output=True, text=True)
        info = json.loads(result.stdout)
        for stream in info.get("streams", []):
            if stream.get("codec_type") == "video":
                self._frame_size = (stream.get("width", 720), stream.get("height", 720))
                if "duration" in stream:
                    self._duration = float(stream["duration"])
                elif "tags" in stream and "DURATION" in stream["tags"]:
                    # Matroska-style duration tag formatted as HH:MM:SS.ssssss.
                    dur_str = stream["tags"]["DURATION"]
                    parts = dur_str.split(":")
                    if len(parts) == 3:
                        h, m, s = parts
                        self._duration = int(h) * 3600 + int(m) * 60 + float(s)
                # Get fps
                if "r_frame_rate" in stream:
                    fps_str = stream["r_frame_rate"]
                    if "/" in fps_str:
                        # Rational fps, e.g. "30000/1001".
                        num, den = fps_str.split("/")
                        self._video_fps = float(num) / float(den)
                break
        # Container-level duration as a fallback for missing stream data.
        if self._duration is None and "format" in info:
            if "duration" in info["format"]:
                self._duration = float(info["format"]["duration"])
        # Last-resort defaults so the source is always usable.
        if not self._frame_size:
            self._frame_size = (720, 720)
        if not self._duration:
            self._duration = 60.0
        self._total_frames = int(self._duration * self._video_fps)

    def _start_stream(self, seek_time: float = 0):
        """Start ffmpeg decode process (fallback mode).

        Kills any existing pipe and starts a new one positioned at
        *seek_time* seconds, emitting raw rgb24 frames at the output fps.
        """
        if self._proc:
            self._proc.kill()
            self._proc = None
        if not self.path.exists():
            raise FileNotFoundError(f"Video file not found: {self.path}")
        w, h = self._frame_size
        # Build ffmpeg command
        cmd = ["ffmpeg", "-v", "error"]
        # Hardware decode if available
        if check_hwdec_available():
            cmd.extend(["-hwaccel", "cuda"])
        cmd.extend([
            "-ss", f"{seek_time:.3f}",
            "-i", str(self.path),
            "-f", "rawvideo",
            "-pix_fmt", "rgb24",
            "-s", f"{w}x{h}",
            "-r", str(self.fps),
            "-"
        ])
        self._proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        self._stream_time = seek_time

    def _read_frame_raw(self) -> Optional[np.ndarray]:
        """Read one frame from ffmpeg pipe (fallback mode).

        Returns None when no process is running, the process has exited,
        or the pipe yields a short (end-of-stream) read.
        """
        w, h = self._frame_size
        frame_size = w * h * 3  # rgb24: 3 bytes per pixel
        if not self._proc or self._proc.poll() is not None:
            return None
        data = self._proc.stdout.read(frame_size)
        if len(data) < frame_size:
            return None
        # .copy() detaches the array from the transient pipe buffer.
        return np.frombuffer(data, dtype=np.uint8).reshape((h, w, 3)).copy()

    def _read_frame_decord_gpu(self, frame_idx: int) -> Optional[GPUFrame]:
        """Read frame using decord with GPU context (NVDEC, zero-copy to CuPy).

        Returns None on any decode error; DLPack hand-off failures fall
        back to a CPU round-trip (logged once each way).
        """
        if self._vr is None:
            return None
        try:
            # Handle looping
            frame_idx = frame_idx % max(1, self._total_frames)
            # Decode frame - with GPU context, this uses NVDEC
            frame_tensor = self._vr[frame_idx]
            # Convert to CuPy via DLPack (zero-copy GPU transfer)
            if GPU_AVAILABLE:
                # decord tensors have .to_dlpack() which returns a PyCapsule
                # that CuPy can consume for zero-copy GPU transfer
                try:
                    dlpack_capsule = frame_tensor.to_dlpack()
                    gpu_frame = cp.from_dlpack(dlpack_capsule)
                    # Log success once per source
                    if not getattr(self, '_dlpack_logged', False):
                        print(f"[GPUVideoSource] DLPack zero-copy SUCCESS - frames stay on GPU", file=sys.stderr)
                        self._dlpack_logged = True
                    return GPUFrame(gpu_frame, on_gpu=True)
                except Exception as dlpack_err:
                    # Fallback: convert via numpy (involves CPU copy)
                    if not getattr(self, '_dlpack_fail_logged', False):
                        print(f"[GPUVideoSource] DLPack FAILED ({dlpack_err}), using CPU copy fallback", file=sys.stderr)
                        self._dlpack_fail_logged = True
                    frame_np = frame_tensor.asnumpy()
                    return GPUFrame(frame_np, on_gpu=True)
            else:
                return GPUFrame(frame_tensor.asnumpy(), on_gpu=False)
        except Exception as e:
            print(f"[GPUVideoSource] decord GPU read error at frame {frame_idx}: {e}", file=sys.stderr)
            return None

    def read_at(self, t: float) -> Optional[GPUFrame]:
        """
        Read frame at specific time.
        Returns GPUFrame with data on GPU if GPU mode enabled.

        t is wrapped modulo the clip duration so the video loops; the
        result is cached, so repeat calls with the same t are free.
        """
        # Cache check
        if t == self._last_read_time and self._cached_frame is not None:
            return self._cached_frame
        # Loop time for shorter videos
        seek_time = t
        if self._duration and self._duration > 0:
            seek_time = t % self._duration
            # Snap near-end timestamps to the start to avoid EOF stalls.
            if seek_time > self._duration - 0.1:
                seek_time = 0.0
        self._last_read_time = t
        # Use decord GPU if available (NVDEC decode, zero-copy via DLPack)
        if self._use_decord_gpu:
            frame_idx = int(seek_time * self._video_fps)
            self._cached_frame = self._read_frame_decord_gpu(frame_idx)
            if self._cached_frame is not None:
                # Free CPU copy if on GPU (saves memory)
                if self.prefer_gpu and self._cached_frame.is_on_gpu:
                    self._cached_frame.free_cpu()
            return self._cached_frame
        # FFmpeg fallback. Restart the pipe only when a seek is required:
        # dead/missing process, target behind the stream, or too far ahead
        # (>2s) to be worth skip-reading.
        need_seek = (
            self._proc is None or
            self._proc.poll() is not None or
            seek_time < self._stream_time - self._frame_time or
            seek_time > self._stream_time + 2.0
        )
        if need_seek:
            self._start_stream(seek_time)
        # Skip frames to reach target
        while self._stream_time + self._frame_time <= seek_time:
            frame = self._read_frame_raw()
            if frame is None:
                # Stream ended mid-skip: reseek and stop skipping.
                self._start_stream(seek_time)
                break
            self._stream_time += self._frame_time
        # Read target frame
        frame_np = self._read_frame_raw()
        if frame_np is None:
            # Best-effort: serve the previous frame rather than fail.
            return self._cached_frame
        self._stream_time += self._frame_time
        # Create GPUFrame - transfer to GPU if in GPU mode
        self._cached_frame = GPUFrame(frame_np, on_gpu=self.prefer_gpu)
        # Free CPU copy if on GPU (saves memory)
        if self.prefer_gpu and self._cached_frame.is_on_gpu:
            self._cached_frame.free_cpu()
        return self._cached_frame

    def read(self) -> Optional[GPUFrame]:
        """Read current frame (last cached frame, or frame at t=0)."""
        if self._cached_frame is not None:
            return self._cached_frame
        return self.read_at(0)

    @property
    def size(self) -> Tuple[int, int]:
        # (width, height) of decoded frames.
        return self._frame_size

    @property
    def duration(self) -> float:
        # Clip duration in seconds (probed, or a default).
        return self._duration

    def close(self):
        """Close the video source."""
        if self._proc:
            self._proc.kill()
            self._proc = None
        # Release decord resources
        self._vr = None
        self._decord_ctx = None
# GPU-aware primitive functions
def gpu_blend(frame_a: GPUFrame, frame_b: GPUFrame, alpha: float = 0.5) -> GPUFrame:
    """Alpha-blend two frames: ``a * alpha + b * (1 - alpha)``.

    When CuPy is available the blend runs entirely on the GPU (frames are
    uploaded if needed and never transferred back); otherwise a NumPy
    fallback is used.
    """
    if GPU_AVAILABLE:
        a = frame_a.gpu.astype(cp.float32)
        b = frame_b.gpu.astype(cp.float32)
        blended = (a * alpha + b * (1 - alpha)).astype(cp.uint8)
        return GPUFrame(blended, on_gpu=True)
    a = frame_a.cpu.astype(np.float32)
    b = frame_b.cpu.astype(np.float32)
    blended = (a * alpha + b * (1 - alpha)).astype(np.uint8)
    return GPUFrame(blended, on_gpu=False)
def gpu_resize(frame: GPUFrame, size: Tuple[int, int]) -> GPUFrame:
    """Resize a frame to ``size`` = (width, height).

    GPU-resident frames are resized with ``cupyx.scipy.ndimage.zoom``
    (bilinear, order=1, channel axis unscaled); CPU frames fall back to
    ``cv2.resize``. cv2 is now imported lazily inside the CPU branch only,
    so the GPU path no longer requires OpenCV to be installed — consistent
    with gpu_rotate and gpu_composite, which already do this.
    """
    if not GPU_AVAILABLE or not frame.is_on_gpu:
        import cv2  # lazy: only the CPU fallback needs OpenCV
        resized = cv2.resize(frame.cpu, size)
        return GPUFrame(resized, on_gpu=False)
    # CuPy doesn't have built-in resize, use scipy zoom
    from cupyx.scipy import ndimage as cpndimage
    gpu_data = frame.gpu
    h, w = gpu_data.shape[:2]
    target_w, target_h = size
    zoom_y = target_h / h
    zoom_x = target_w / w
    if gpu_data.ndim == 3:
        # Keep the channel axis unscaled for HxWxC frames.
        resized = cpndimage.zoom(gpu_data, (zoom_y, zoom_x, 1), order=1)
    else:
        resized = cpndimage.zoom(gpu_data, (zoom_y, zoom_x), order=1)
    return GPUFrame(resized, on_gpu=True)
def gpu_rotate(frame: GPUFrame, angle: float) -> GPUFrame:
    """Rotate a frame by ``angle`` degrees about its center.

    GPU frames use cupyx.scipy.ndimage.rotate (bilinear, canvas size
    preserved); CPU frames fall back to OpenCV warpAffine.
    """
    if GPU_AVAILABLE and frame.is_on_gpu:
        from cupyx.scipy import ndimage as cpndimage
        rotated = cpndimage.rotate(frame.gpu, angle, reshape=False, order=1)
        return GPUFrame(rotated, on_gpu=True)
    import cv2
    src = frame.cpu
    h, w = src.shape[:2]
    matrix = cv2.getRotationMatrix2D((w // 2, h // 2), angle, 1.0)
    return GPUFrame(cv2.warpAffine(src, matrix, (w, h)), on_gpu=False)
def gpu_brightness(frame: GPUFrame, factor: float) -> GPUFrame:
    """Scale pixel intensities by ``factor``, clamping the result to [0, 255]."""
    if GPU_AVAILABLE and frame.is_on_gpu:
        scaled = cp.clip(frame.gpu.astype(cp.float32) * factor, 0, 255)
        return GPUFrame(scaled.astype(cp.uint8), on_gpu=True)
    scaled = np.clip(frame.cpu.astype(np.float32) * factor, 0, 255)
    return GPUFrame(scaled.astype(np.uint8), on_gpu=False)
def gpu_composite(frames: list, weights: list = None) -> GPUFrame:
    """
    Composite multiple frames as a weighted sum.

    Weights default to uniform and are normalized when their sum is
    positive. If any input already lives on the GPU (and CuPy is
    available), the whole composite runs on the GPU; frames whose shape
    differs from the first frame's are resized to match it.
    """
    if not frames:
        raise ValueError("No frames to composite")
    if len(frames) == 1:
        return frames[0]
    count = len(frames)
    if weights is None:
        weights = [1.0 / count] * count
    # Normalize weights so they sum to 1 (skip for degenerate totals).
    total = sum(weights)
    if total > 0:
        weights = [w / total for w in weights]
    on_gpu = GPU_AVAILABLE and any(f.is_on_gpu for f in frames)
    if on_gpu:
        # Accumulate in float32 on the GPU, quantize once at the end.
        target_shape = frames[0].gpu.shape
        acc = cp.zeros(target_shape, dtype=cp.float32)
        for frame, weight in zip(frames, weights):
            data = frame.gpu.astype(cp.float32)
            if data.shape != target_shape:
                # Resize mismatched frames to the reference shape.
                from cupyx.scipy import ndimage as cpndimage
                h, w = target_shape[:2]
                fh, fw = data.shape[:2]
                factors = (h / fh, w / fw, 1) if data.ndim == 3 else (h / fh, w / fw)
                data = cpndimage.zoom(data, factors, order=1)
            acc += data * weight
        return GPUFrame(cp.clip(acc, 0, 255).astype(cp.uint8), on_gpu=True)
    # CPU path: same accumulation with numpy + OpenCV resize.
    import cv2
    target_shape = frames[0].cpu.shape
    acc = np.zeros(target_shape, dtype=np.float32)
    for frame, weight in zip(frames, weights):
        data = frame.cpu.astype(np.float32)
        if data.shape != target_shape:
            data = cv2.resize(data, (target_shape[1], target_shape[0]))
        acc += data * weight
    return GPUFrame(np.clip(acc, 0, 255).astype(np.uint8), on_gpu=False)
# Primitive registration for streaming interpreter
def get_primitives():
    """
    Get GPU-aware primitives for registration with interpreter.
    These wrap the GPU functions to work with the sexp interpreter;
    image-producing wrappers return numpy arrays for compatibility with
    CPU-side consumers.
    """
    def _as_frame(obj):
        # Accept raw arrays or pre-wrapped GPUFrame instances alike.
        return obj if isinstance(obj, GPUFrame) else GPUFrame(obj)

    def prim_make_video_source_gpu(path: str, fps: float = 30):
        """Create GPU-accelerated video source."""
        return GPUVideoSource(path, fps, prefer_gpu=True)

    def prim_gpu_blend(a, b, alpha=0.5):
        """Blend two frames; returns numpy for compatibility."""
        return gpu_blend(_as_frame(a), _as_frame(b), alpha).cpu

    def prim_gpu_rotate(img, angle):
        """Rotate image; returns numpy for compatibility."""
        return gpu_rotate(_as_frame(img), angle).cpu

    def prim_gpu_brightness(img, factor):
        """Adjust brightness; returns numpy for compatibility."""
        return gpu_brightness(_as_frame(img), factor).cpu

    return {
        'streaming-gpu:make-video-source': prim_make_video_source_gpu,
        'gpu:blend': prim_gpu_blend,
        'gpu:rotate': prim_gpu_rotate,
        'gpu:brightness': prim_gpu_brightness,
    }
# Export
__all__ = [
'GPU_AVAILABLE',
'GPUFrame',
'GPUVideoSource',
'gpu_blend',
'gpu_resize',
'gpu_rotate',
'gpu_brightness',
'gpu_composite',
'get_primitives',
'check_hwdec_available',
'PRIMITIVES',
]
# Import CPU primitives from streaming.py and include them in PRIMITIVES
# This ensures audio analysis primitives are available when streaming_gpu is loaded
def _get_cpu_primitives():
    """Return the CPU streaming primitive table (imported lazily)."""
    from sexp_effects.primitive_libs import streaming
    return streaming.PRIMITIVES

# Start from a copy so GPU-side additions never mutate the CPU table.
PRIMITIVES = dict(_get_cpu_primitives())
# Add GPU-specific primitives
# (The GPU video source will be added by create_cid_primitives in the task)