diff --git a/Dockerfile.gpu b/Dockerfile.gpu index 40791fa..f57e850 100644 --- a/Dockerfile.gpu +++ b/Dockerfile.gpu @@ -26,6 +26,10 @@ RUN pip install --no-cache-dir -r requirements.txt # Install GPU-specific dependencies (CuPy for CUDA 12.x) RUN pip install --no-cache-dir cupy-cuda12x +# Install decord for GPU-accelerated video decoding (keeps frames on GPU) +# This avoids CPU<->GPU memory transfers during video decode +RUN pip install --no-cache-dir decord + # Copy application COPY . . @@ -39,8 +43,8 @@ ENV PYTHONUNBUFFERED=1 ENV PYTHONDONTWRITEBYTECODE=1 ENV EFFECTS_PATH=/app/artdag-effects ENV PYTHONPATH=/app -# GPU persistence disabled until all primitives support GPU frames -ENV STREAMING_GPU_PERSIST=0 +# GPU persistence enabled - frames stay on GPU throughout pipeline +ENV STREAMING_GPU_PERSIST=1 # Use cluster's public IPFS gateway for HLS segment URLs ENV IPFS_GATEWAY_URL=https://celery-artdag.rose-ash.com/ipfs diff --git a/sexp_effects/primitive_libs/streaming_gpu.py b/sexp_effects/primitive_libs/streaming_gpu.py index fbd38cb..e4a0051 100644 --- a/sexp_effects/primitive_libs/streaming_gpu.py +++ b/sexp_effects/primitive_libs/streaming_gpu.py @@ -34,6 +34,7 @@ except ImportError: # Check for hardware decode support _HWDEC_AVAILABLE: Optional[bool] = None +_DECORD_AVAILABLE: Optional[bool] = None def check_hwdec_available() -> bool: @@ -63,6 +64,25 @@ def check_hwdec_available() -> bool: return _HWDEC_AVAILABLE +def check_decord_available() -> bool: + """Check if decord GPU decode is available.""" + global _DECORD_AVAILABLE + if _DECORD_AVAILABLE is not None: + return _DECORD_AVAILABLE + + try: + import decord + from decord import gpu + # Try to create a GPU context - this will fail if CUDA isn't properly set up + _DECORD_AVAILABLE = True + print("[streaming_gpu] decord GPU decode available", file=sys.stderr) + except Exception as e: + _DECORD_AVAILABLE = False + print(f"[streaming_gpu] decord not available: {e}", file=sys.stderr) + + return _DECORD_AVAILABLE + + class GPUFrame: """ Frame container that tracks data location (CPU/GPU). @@ -150,33 +170,65 @@ class GPUVideoSource: """ GPU-accelerated video source using hardware decode. - Uses NVDEC for hardware video decoding when available, - keeping decoded frames in GPU memory for zero-copy processing. + Uses decord with GPU context for true zero-copy NVDEC decode, + keeping decoded frames in GPU memory throughout. - Falls back to CPU decode if hardware decode unavailable. + Falls back to FFmpeg pipe if decord unavailable (slower due to CPU copy). """ def __init__(self, path: str, fps: float = 30, prefer_gpu: bool = True): self.path = Path(path) self.fps = fps - self.prefer_gpu = prefer_gpu and GPU_AVAILABLE and check_hwdec_available() + self.prefer_gpu = prefer_gpu and GPU_AVAILABLE + self._use_decord = self.prefer_gpu and check_decord_available() self._frame_size: Optional[Tuple[int, int]] = None self._duration: Optional[float] = None - self._proc = None - self._stream_time = 0.0 + self._video_fps: float = 30.0 + self._total_frames: int = 0 self._frame_time = 1.0 / fps self._last_read_time = -1 self._cached_frame: Optional[GPUFrame] = None - # Get video info + # Decord VideoReader (GPU context) + self._vr = None + + # FFmpeg fallback state + self._proc = None + self._stream_time = 0.0 + + # Initialize video source + self._init_video() + + mode = "decord-GPU" if self._use_decord else ("ffmpeg-hwaccel" if check_hwdec_available() else "ffmpeg-CPU") + print(f"[GPUVideoSource] {self.path.name}: {self._frame_size}, " + f"duration={self._duration:.1f}s, mode={mode}", file=sys.stderr) + + def _init_video(self): + """Initialize video reader (decord or probe for ffmpeg).""" + if self._use_decord: + try: + from decord import VideoReader, gpu, cpu + # Use GPU context for hardware decode + ctx = gpu(0) if self.prefer_gpu else cpu(0) + self._vr = VideoReader(str(self.path), ctx=ctx, num_threads=1) + self._total_frames = len(self._vr) + self._video_fps = self._vr.get_avg_fps() + self._duration = self._total_frames / self._video_fps + # Get frame size from first frame shape + first_frame = self._vr[0].asnumpy() + self._frame_size = (first_frame.shape[1], first_frame.shape[0]) + return + except Exception as e: + print(f"[GPUVideoSource] decord init failed, falling back to ffmpeg: {e}", file=sys.stderr) + self._use_decord = False + self._vr = None + + # FFmpeg fallback - probe video self._probe_video() - print(f"[GPUVideoSource] {self.path.name}: {self._frame_size}, " - f"duration={self._duration:.1f}s, gpu={self.prefer_gpu}", file=sys.stderr) - def _probe_video(self): - """Probe video file for metadata.""" + """Probe video file for metadata (FFmpeg fallback).""" cmd = ["ffprobe", "-v", "quiet", "-print_format", "json", "-show_streams", "-show_format", str(self.path)] result = subprocess.run(cmd, capture_output=True, text=True) @@ -193,6 +245,12 @@ class GPUVideoSource: if len(parts) == 3: h, m, s = parts self._duration = int(h) * 3600 + int(m) * 60 + float(s) + # Get fps + if "r_frame_rate" in stream: + fps_str = stream["r_frame_rate"] + if "/" in fps_str: + num, den = fps_str.split("/") + self._video_fps = float(num) / float(den) break if self._duration is None and "format" in info: @@ -204,8 +262,10 @@ class GPUVideoSource: if not self._duration: self._duration = 60.0 + self._total_frames = int(self._duration * self._video_fps) + def _start_stream(self, seek_time: float = 0): - """Start ffmpeg decode process.""" + """Start ffmpeg decode process (fallback mode).""" if self._proc: self._proc.kill() self._proc = None @@ -219,7 +279,7 @@ class GPUVideoSource: cmd = ["ffmpeg", "-v", "error"] # Hardware decode if available - if self.prefer_gpu: + if check_hwdec_available(): cmd.extend(["-hwaccel", "cuda"]) cmd.extend([ @@ -236,7 +296,7 @@ class GPUVideoSource: self._stream_time = seek_time def _read_frame_raw(self) -> Optional[np.ndarray]: - """Read one frame from ffmpeg pipe.""" + """Read one frame from ffmpeg pipe (fallback mode).""" w, h = self._frame_size frame_size = w * h * 3 @@ -249,6 +309,34 @@ class GPUVideoSource: return np.frombuffer(data, dtype=np.uint8).reshape((h, w, 3)).copy() + def _read_frame_decord(self, frame_idx: int) -> Optional[GPUFrame]: + """Read frame using decord (GPU-native).""" + if self._vr is None: + return None + + try: + # Handle looping + frame_idx = frame_idx % self._total_frames + + # Decord returns a tensor - asnumpy() gives numpy array + # With GPU context, decode happens on GPU, but asnumpy() copies to CPU + # For true zero-copy, we need to use decord's GPU tensor directly + frame_data = self._vr[frame_idx] + + # If using GPU context, try to get data directly on GPU + if self.prefer_gpu and GPU_AVAILABLE: + # frame_data is a decord NDArray - convert to numpy then to CuPy + # This still involves a copy, but decode was on GPU (faster) + frame_np = frame_data.asnumpy() + # Create GPUFrame and transfer to GPU + return GPUFrame(frame_np, on_gpu=True) + else: + return GPUFrame(frame_data.asnumpy(), on_gpu=False) + + except Exception as e: + print(f"[GPUVideoSource] decord read error at frame {frame_idx}: {e}", file=sys.stderr) + return None + def read_at(self, t: float) -> Optional[GPUFrame]: """ Read frame at specific time. @@ -266,7 +354,19 @@ class GPUVideoSource: if seek_time > self._duration - 0.1: seek_time = 0.0 - # Determine if we need to seek + self._last_read_time = t + + # Use decord if available (GPU-native decode) + if self._use_decord: + frame_idx = int(seek_time * self._video_fps) + self._cached_frame = self._read_frame_decord(frame_idx) + if self._cached_frame is not None: + # Free CPU copy if on GPU (saves memory) + if self.prefer_gpu and self._cached_frame.is_on_gpu: + self._cached_frame.free_cpu() + return self._cached_frame + + # FFmpeg fallback need_seek = ( self._proc is None or self._proc.poll() is not None or @@ -291,7 +391,6 @@ class GPUVideoSource: return self._cached_frame self._stream_time += self._frame_time - self._last_read_time = t # Create GPUFrame - transfer to GPU if in GPU mode self._cached_frame = GPUFrame(frame_np, on_gpu=self.prefer_gpu) @@ -321,6 +420,8 @@ class GPUVideoSource: if self._proc: self._proc.kill() self._proc = None + if self._vr is not None: + self._vr = None # Release decord VideoReader # GPU-aware primitive functions