diff --git a/Dockerfile.gpu b/Dockerfile.gpu
index f57e850..e927738 100644
--- a/Dockerfile.gpu
+++ b/Dockerfile.gpu
@@ -26,9 +26,9 @@ RUN pip install --no-cache-dir -r requirements.txt
 # Install GPU-specific dependencies (CuPy for CUDA 12.x)
 RUN pip install --no-cache-dir cupy-cuda12x
 
-# Install decord for GPU-accelerated video decoding (keeps frames on GPU)
-# This avoids CPU<->GPU memory transfers during video decode
-RUN pip install --no-cache-dir decord
+# Install PyNvCodec for true GPU-native video decoding (NVDEC)
+# Frames decode directly to GPU memory - zero CPU transfer
+RUN pip install --no-cache-dir PyNvCodec
 
 # Copy application
 COPY . .
diff --git a/sexp_effects/primitive_libs/streaming_gpu.py b/sexp_effects/primitive_libs/streaming_gpu.py
index e4a0051..71b5065 100644
--- a/sexp_effects/primitive_libs/streaming_gpu.py
+++ b/sexp_effects/primitive_libs/streaming_gpu.py
@@ -34,7 +34,7 @@ except ImportError:
 
 # Check for hardware decode support
 _HWDEC_AVAILABLE: Optional[bool] = None
-_DECORD_AVAILABLE: Optional[bool] = None
+_PYNVCODEC_AVAILABLE: Optional[bool] = None
 
 
 def check_hwdec_available() -> bool:
@@ -64,23 +64,21 @@ def check_hwdec_available() -> bool:
     return _HWDEC_AVAILABLE
 
 
-def check_decord_available() -> bool:
-    """Check if decord GPU decode is available."""
-    global _DECORD_AVAILABLE
-    if _DECORD_AVAILABLE is not None:
-        return _DECORD_AVAILABLE
+def check_pynvcodec_available() -> bool:
+    """Check if PyNvCodec GPU decode is available."""
+    global _PYNVCODEC_AVAILABLE
+    if _PYNVCODEC_AVAILABLE is not None:
+        return _PYNVCODEC_AVAILABLE
 
     try:
-        import decord
-        from decord import gpu
-        # Try to create a GPU context - this will fail if CUDA isn't properly set up
-        _DECORD_AVAILABLE = True
-        print("[streaming_gpu] decord GPU decode available", file=sys.stderr)
+        import PyNvCodec as nvc
+        _PYNVCODEC_AVAILABLE = True
+        print("[streaming_gpu] PyNvCodec GPU decode available", file=sys.stderr)
     except Exception as e:
-        _DECORD_AVAILABLE = False
-        print(f"[streaming_gpu] decord not available: {e}", file=sys.stderr)
+        _PYNVCODEC_AVAILABLE = False
+        print(f"[streaming_gpu] PyNvCodec not available: {e}", file=sys.stderr)
 
-    return _DECORD_AVAILABLE
+    return _PYNVCODEC_AVAILABLE
 
 
 class GPUFrame:
@@ -170,17 +168,17 @@ class GPUVideoSource:
     """
     GPU-accelerated video source using hardware decode.
 
-    Uses decord with GPU context for true zero-copy NVDEC decode,
-    keeping decoded frames in GPU memory throughout.
+    Uses PyNvCodec for true zero-copy NVDEC decode - frames go directly
+    to GPU memory without any CPU transfer.
 
-    Falls back to FFmpeg pipe if decord unavailable (slower due to CPU copy).
+    Falls back to FFmpeg pipe if PyNvCodec unavailable (slower due to CPU copy).
     """
 
     def __init__(self, path: str, fps: float = 30, prefer_gpu: bool = True):
         self.path = Path(path)
         self.fps = fps
         self.prefer_gpu = prefer_gpu and GPU_AVAILABLE
-        self._use_decord = self.prefer_gpu and check_decord_available()
+        self._use_pynvcodec = self.prefer_gpu and check_pynvcodec_available()
 
         self._frame_size: Optional[Tuple[int, int]] = None
         self._duration: Optional[float] = None
@@ -190,8 +188,11 @@ class GPUVideoSource:
         self._last_read_time = -1
         self._cached_frame: Optional[GPUFrame] = None
 
-        # Decord VideoReader (GPU context)
-        self._vr = None
+        # PyNvCodec decoder components
+        self._nvdec = None
+        self._nv_cvt = None  # NV12 to RGB converter
+        self._nv_dwn = None  # GPU to CPU downloader (for fallback)
+        self._gpu_id = 0
 
         # FFmpeg fallback state
         self._proc = None
@@ -200,32 +201,44 @@ class GPUVideoSource:
         # Initialize video source
         self._init_video()
 
-        mode = "decord-GPU" if self._use_decord else ("ffmpeg-hwaccel" if check_hwdec_available() else "ffmpeg-CPU")
+        mode = "PyNvCodec-GPU" if self._use_pynvcodec else ("ffmpeg-hwaccel" if check_hwdec_available() else "ffmpeg-CPU")
         print(f"[GPUVideoSource] {self.path.name}: {self._frame_size}, "
               f"duration={self._duration:.1f}s, mode={mode}", file=sys.stderr)
 
     def _init_video(self):
-        """Initialize video reader (decord or probe for ffmpeg)."""
-        if self._use_decord:
+        """Initialize video reader (PyNvCodec or probe for ffmpeg)."""
+        # Probe first: the metadata is used by both the PyNvCodec and ffmpeg paths
+        self._probe_video()
+
+        if self._use_pynvcodec:
             try:
-                from decord import VideoReader, gpu, cpu
-                # Use GPU context for hardware decode
-                ctx = gpu(0) if self.prefer_gpu else cpu(0)
-                self._vr = VideoReader(str(self.path), ctx=ctx, num_threads=1)
-                self._total_frames = len(self._vr)
-                self._video_fps = self._vr.get_avg_fps()
-                self._duration = self._total_frames / self._video_fps
-                # Get frame size from first frame shape
-                first_frame = self._vr[0].asnumpy()
-                self._frame_size = (first_frame.shape[1], first_frame.shape[0])
+                import PyNvCodec as nvc
+
+                # Create NVDEC decoder - decodes directly to GPU
+                self._nvdec = nvc.PyNvDecoder(
+                    str(self.path),
+                    self._gpu_id
+                )
+
+                # Decoder-reported dimensions override the probed frame size
+                self._frame_size = (self._nvdec.Width(), self._nvdec.Height())
+
+                # Create color converter: NV12 (decoder output) -> RGB
+                self._nv_cvt = nvc.PySurfaceConverter(
+                    self._nvdec.Width(),
+                    self._nvdec.Height(),
+                    nvc.PixelFormat.NV12,
+                    nvc.PixelFormat.RGB,
+                    self._gpu_id
+                )
+
+                print(f"[GPUVideoSource] PyNvCodec initialized: {self._frame_size}", file=sys.stderr)
                 return
             except Exception as e:
-                print(f"[GPUVideoSource] decord init failed, falling back to ffmpeg: {e}", file=sys.stderr)
-                self._use_decord = False
-                self._vr = None
-
-        # FFmpeg fallback - probe video
-        self._probe_video()
+                print(f"[GPUVideoSource] PyNvCodec init failed, falling back to ffmpeg: {e}", file=sys.stderr)
+                self._use_pynvcodec = False
+                self._nvdec = None
+                self._nv_cvt = None
 
     def _probe_video(self):
         """Probe video file for metadata (FFmpeg fallback)."""
@@ -309,32 +322,72 @@ class GPUVideoSource:
 
         return np.frombuffer(data, dtype=np.uint8).reshape((h, w, 3)).copy()
 
-    def _read_frame_decord(self, frame_idx: int) -> Optional[GPUFrame]:
-        """Read frame using decord (GPU-native)."""
-        if self._vr is None:
+    def _read_frame_pynvcodec(self, target_time: float) -> Optional[GPUFrame]:
+        """Read frame using PyNvCodec (true GPU-native, zero CPU copy)."""
+        if self._nvdec is None:
             return None
 
         try:
-            # Handle looping
-            frame_idx = frame_idx % self._total_frames
+            import PyNvCodec as nvc
 
-            # Decord returns a tensor - asnumpy() gives numpy array
-            # With GPU context, decode happens on GPU, but asnumpy() copies to CPU
-            # For true zero-copy, we need to use decord's GPU tensor directly
-            frame_data = self._vr[frame_idx]
+            # Frame index used only if a seek is needed below (wraps for looping)
+            target_frame = int(target_time * self._video_fps)
+            target_frame = target_frame % max(1, self._total_frames)  # Loop
 
-            # If using GPU context, try to get data directly on GPU
-            if self.prefer_gpu and GPU_AVAILABLE:
-                # frame_data is a decord NDArray - convert to numpy then to CuPy
-                # This still involves a copy, but decode was on GPU (faster)
-                frame_np = frame_data.asnumpy()
-                # Create GPUFrame and transfer to GPU
-                return GPUFrame(frame_np, on_gpu=True)
+            # Decode the next frame in sequence - returns a surface in GPU memory
+            raw_surface = self._nvdec.DecodeSingleSurface()
+            if raw_surface.Empty():
+                # Nothing decoded (e.g. end of stream): seek to the target frame.
+                # Use the surface returned by the seeking call; do not decode past it.
+                seek_ctx = nvc.SeekContext(target_frame)
+                raw_surface = self._nvdec.DecodeSingleSurface(seek_ctx)
+                if raw_surface.Empty():
+                    return None
+
+            # Convert NV12 -> RGB on GPU
+            rgb_surface = self._nv_cvt.Execute(raw_surface)
+            if rgb_surface.Empty():
+                return None
+
+            # Get as CuPy array - stays on GPU!
+            if GPU_AVAILABLE:
+                # Wrap the surface's CUDA device pointer in a CuPy array.
+                # No host round-trip: the only copy made below is device-to-device.
+                # pitch is the row stride in bytes as reported by the surface.
+                frame_ptr = rgb_surface.PlanePtr()
+                pitch = rgb_surface.Pitch()
+                height = rgb_surface.Height()
+                width = rgb_surface.Width()
+
+                # Rows are pitched: each row occupies `pitch` bytes, which may exceed
+                # width * 3, so the view needs explicit strides (buffer = pitch * height).
+                mem = cp.cuda.UnownedMemory(frame_ptr.GpuMem(), pitch * height, None)
+                memptr = cp.cuda.MemoryPointer(mem, 0)
+                gpu_frame = cp.ndarray((height, width, 3), dtype=cp.uint8, memptr=memptr, strides=(pitch, 3, 1))
+
+                # Copy into owned, contiguous memory (the decoder may reuse the surface)
+                gpu_frame = gpu_frame.copy()
+
+                return GPUFrame(gpu_frame, on_gpu=True)
             else:
-                return GPUFrame(frame_data.asnumpy(), on_gpu=False)
+                # Fallback to CPU
+                frame_np = np.ndarray(
+                    shape=(rgb_surface.Height(), rgb_surface.Width(), 3),
+                    dtype=np.uint8
+                )
+                # Download to CPU (not ideal but works); the downloader is created lazily
+                if self._nv_dwn is None:
+                    self._nv_dwn = nvc.PySurfaceDownloader(
+                        rgb_surface.Width(),
+                        rgb_surface.Height(),
+                        nvc.PixelFormat.RGB,
+                        self._gpu_id
+                    )
+                self._nv_dwn.DownloadSingleSurface(rgb_surface, frame_np)
+                return GPUFrame(frame_np, on_gpu=False)
 
         except Exception as e:
-            print(f"[GPUVideoSource] decord read error at frame {frame_idx}: {e}", file=sys.stderr)
+            print(f"[GPUVideoSource] PyNvCodec read error at t={target_time:.2f}: {e}", file=sys.stderr)
             return None
 
     def read_at(self, t: float) -> Optional[GPUFrame]:
@@ -356,10 +409,9 @@ class GPUVideoSource:
 
         self._last_read_time = t
 
-        # Use decord if available (GPU-native decode)
-        if self._use_decord:
-            frame_idx = int(seek_time * self._video_fps)
-            self._cached_frame = self._read_frame_decord(frame_idx)
+        # Use PyNvCodec if available (true GPU-native decode, zero CPU copy)
+        if self._use_pynvcodec:
+            self._cached_frame = self._read_frame_pynvcodec(seek_time)
             if self._cached_frame is not None:
                 # Free CPU copy if on GPU (saves memory)
                 if self.prefer_gpu and self._cached_frame.is_on_gpu:
@@ -420,8 +472,10 @@ class GPUVideoSource:
         if self._proc:
             self._proc.kill()
             self._proc = None
-        if self._vr is not None:
-            self._vr = None  # Release decord VideoReader
+        # Release PyNvCodec resources
+        self._nvdec = None
+        self._nv_cvt = None
+        self._nv_dwn = None
 
 
 # GPU-aware primitive functions