Add decord for GPU-native video decode
Some checks are pending
GPU Worker CI/CD / test (push) Waiting to run
GPU Worker CI/CD / deploy (push) Blocked by required conditions

- Install decord in GPU Dockerfile for hardware video decode
- Update GPUVideoSource to use decord with GPU context
- Decord decodes on GPU via NVDEC, avoiding CPU memory copies
- Falls back to FFmpeg pipe if decord unavailable
- Enable STREAMING_GPU_PERSIST=1 for full GPU pipeline

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
giles
2026-02-04 01:17:22 +00:00
parent ef4bc24eda
commit 771fb8cebc
2 changed files with 123 additions and 18 deletions

View File

@@ -26,6 +26,10 @@ RUN pip install --no-cache-dir -r requirements.txt
# Install GPU-specific dependencies (CuPy for CUDA 12.x)
RUN pip install --no-cache-dir cupy-cuda12x
# Install decord for GPU-accelerated video decoding (keeps frames on GPU)
# This avoids CPU<->GPU memory transfers during video decode
RUN pip install --no-cache-dir decord
# Copy application
COPY . .
@@ -39,8 +43,8 @@ ENV PYTHONUNBUFFERED=1
ENV PYTHONDONTWRITEBYTECODE=1
ENV EFFECTS_PATH=/app/artdag-effects
ENV PYTHONPATH=/app
# GPU persistence disabled until all primitives support GPU frames
ENV STREAMING_GPU_PERSIST=0
# GPU persistence enabled - frames stay on GPU throughout pipeline
ENV STREAMING_GPU_PERSIST=1
# Use cluster's public IPFS gateway for HLS segment URLs
ENV IPFS_GATEWAY_URL=https://celery-artdag.rose-ash.com/ipfs

View File

@@ -34,6 +34,7 @@ except ImportError:
# Check for hardware decode support
_HWDEC_AVAILABLE: Optional[bool] = None
_DECORD_AVAILABLE: Optional[bool] = None
def check_hwdec_available() -> bool:
@@ -63,6 +64,25 @@ def check_hwdec_available() -> bool:
return _HWDEC_AVAILABLE
def check_decord_available() -> bool:
"""Check if decord GPU decode is available."""
global _DECORD_AVAILABLE
if _DECORD_AVAILABLE is not None:
return _DECORD_AVAILABLE
try:
import decord
from decord import gpu
# Try to create a GPU context - this will fail if CUDA isn't properly set up
_DECORD_AVAILABLE = True
print("[streaming_gpu] decord GPU decode available", file=sys.stderr)
except Exception as e:
_DECORD_AVAILABLE = False
print(f"[streaming_gpu] decord not available: {e}", file=sys.stderr)
return _DECORD_AVAILABLE
class GPUFrame:
"""
Frame container that tracks data location (CPU/GPU).
@@ -150,33 +170,65 @@ class GPUVideoSource:
"""
GPU-accelerated video source using hardware decode.
Uses NVDEC for hardware video decoding when available,
keeping decoded frames in GPU memory for zero-copy processing.
Uses decord with GPU context for true zero-copy NVDEC decode,
keeping decoded frames in GPU memory throughout.
Falls back to CPU decode if hardware decode unavailable.
Falls back to FFmpeg pipe if decord unavailable (slower due to CPU copy).
"""
def __init__(self, path: str, fps: float = 30, prefer_gpu: bool = True):
self.path = Path(path)
self.fps = fps
self.prefer_gpu = prefer_gpu and GPU_AVAILABLE and check_hwdec_available()
self.prefer_gpu = prefer_gpu and GPU_AVAILABLE
self._use_decord = self.prefer_gpu and check_decord_available()
self._frame_size: Optional[Tuple[int, int]] = None
self._duration: Optional[float] = None
self._proc = None
self._stream_time = 0.0
self._video_fps: float = 30.0
self._total_frames: int = 0
self._frame_time = 1.0 / fps
self._last_read_time = -1
self._cached_frame: Optional[GPUFrame] = None
# Get video info
# Decord VideoReader (GPU context)
self._vr = None
# FFmpeg fallback state
self._proc = None
self._stream_time = 0.0
# Initialize video source
self._init_video()
mode = "decord-GPU" if self._use_decord else ("ffmpeg-hwaccel" if check_hwdec_available() else "ffmpeg-CPU")
print(f"[GPUVideoSource] {self.path.name}: {self._frame_size}, "
f"duration={self._duration:.1f}s, mode={mode}", file=sys.stderr)
def _init_video(self):
"""Initialize video reader (decord or probe for ffmpeg)."""
if self._use_decord:
try:
from decord import VideoReader, gpu, cpu
# Use GPU context for hardware decode
ctx = gpu(0) if self.prefer_gpu else cpu(0)
self._vr = VideoReader(str(self.path), ctx=ctx, num_threads=1)
self._total_frames = len(self._vr)
self._video_fps = self._vr.get_avg_fps()
self._duration = self._total_frames / self._video_fps
# Get frame size from first frame shape
first_frame = self._vr[0].asnumpy()
self._frame_size = (first_frame.shape[1], first_frame.shape[0])
return
except Exception as e:
print(f"[GPUVideoSource] decord init failed, falling back to ffmpeg: {e}", file=sys.stderr)
self._use_decord = False
self._vr = None
# FFmpeg fallback - probe video
self._probe_video()
print(f"[GPUVideoSource] {self.path.name}: {self._frame_size}, "
f"duration={self._duration:.1f}s, gpu={self.prefer_gpu}", file=sys.stderr)
def _probe_video(self):
"""Probe video file for metadata."""
"""Probe video file for metadata (FFmpeg fallback)."""
cmd = ["ffprobe", "-v", "quiet", "-print_format", "json",
"-show_streams", "-show_format", str(self.path)]
result = subprocess.run(cmd, capture_output=True, text=True)
@@ -193,6 +245,12 @@ class GPUVideoSource:
if len(parts) == 3:
h, m, s = parts
self._duration = int(h) * 3600 + int(m) * 60 + float(s)
# Get fps
if "r_frame_rate" in stream:
fps_str = stream["r_frame_rate"]
if "/" in fps_str:
num, den = fps_str.split("/")
self._video_fps = float(num) / float(den)
break
if self._duration is None and "format" in info:
@@ -204,8 +262,10 @@ class GPUVideoSource:
if not self._duration:
self._duration = 60.0
self._total_frames = int(self._duration * self._video_fps)
def _start_stream(self, seek_time: float = 0):
"""Start ffmpeg decode process."""
"""Start ffmpeg decode process (fallback mode)."""
if self._proc:
self._proc.kill()
self._proc = None
@@ -219,7 +279,7 @@ class GPUVideoSource:
cmd = ["ffmpeg", "-v", "error"]
# Hardware decode if available
if self.prefer_gpu:
if check_hwdec_available():
cmd.extend(["-hwaccel", "cuda"])
cmd.extend([
@@ -236,7 +296,7 @@ class GPUVideoSource:
self._stream_time = seek_time
def _read_frame_raw(self) -> Optional[np.ndarray]:
"""Read one frame from ffmpeg pipe."""
"""Read one frame from ffmpeg pipe (fallback mode)."""
w, h = self._frame_size
frame_size = w * h * 3
@@ -249,6 +309,34 @@ class GPUVideoSource:
return np.frombuffer(data, dtype=np.uint8).reshape((h, w, 3)).copy()
def _read_frame_decord(self, frame_idx: int) -> Optional[GPUFrame]:
"""Read frame using decord (GPU-native)."""
if self._vr is None:
return None
try:
# Handle looping
frame_idx = frame_idx % self._total_frames
# Decord returns a tensor - asnumpy() gives numpy array
# With GPU context, decode happens on GPU, but asnumpy() copies to CPU
# For true zero-copy, we need to use decord's GPU tensor directly
frame_data = self._vr[frame_idx]
# If using GPU context, try to get data directly on GPU
if self.prefer_gpu and GPU_AVAILABLE:
# frame_data is a decord NDArray - convert to numpy then to CuPy
# This still involves a copy, but decode was on GPU (faster)
frame_np = frame_data.asnumpy()
# Create GPUFrame and transfer to GPU
return GPUFrame(frame_np, on_gpu=True)
else:
return GPUFrame(frame_data.asnumpy(), on_gpu=False)
except Exception as e:
print(f"[GPUVideoSource] decord read error at frame {frame_idx}: {e}", file=sys.stderr)
return None
def read_at(self, t: float) -> Optional[GPUFrame]:
"""
Read frame at specific time.
@@ -266,7 +354,19 @@ class GPUVideoSource:
if seek_time > self._duration - 0.1:
seek_time = 0.0
# Determine if we need to seek
self._last_read_time = t
# Use decord if available (GPU-native decode)
if self._use_decord:
frame_idx = int(seek_time * self._video_fps)
self._cached_frame = self._read_frame_decord(frame_idx)
if self._cached_frame is not None:
# Free CPU copy if on GPU (saves memory)
if self.prefer_gpu and self._cached_frame.is_on_gpu:
self._cached_frame.free_cpu()
return self._cached_frame
# FFmpeg fallback
need_seek = (
self._proc is None or
self._proc.poll() is not None or
@@ -291,7 +391,6 @@ class GPUVideoSource:
return self._cached_frame
self._stream_time += self._frame_time
self._last_read_time = t
# Create GPUFrame - transfer to GPU if in GPU mode
self._cached_frame = GPUFrame(frame_np, on_gpu=self.prefer_gpu)
@@ -321,6 +420,8 @@ class GPUVideoSource:
if self._proc:
self._proc.kill()
self._proc = None
if self._vr is not None:
self._vr = None # Release decord VideoReader
# GPU-aware primitive functions