Files
celery/streaming/gpu_output.py
giles ed617fcdd6
Some checks are pending
GPU Worker CI/CD / test (push) Waiting to run
GPU Worker CI/CD / deploy (push) Blocked by required conditions
Fix lazy audio path resolution for GPU streaming
Audio playback path was being resolved during parsing when database
may not be ready, causing fallback to non-existent path. Now resolves
lazily when stream starts, matching how audio analyzer works.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-04 11:32:04 +00:00

456 lines
14 KiB
Python

"""
Zero-copy GPU video encoding output.
Uses PyNvVideoCodec for direct GPU-to-GPU encoding without CPU transfers.
Frames stay on GPU throughout: CuPy → NV12 conversion → NVENC encoding.
"""
import numpy as np
import subprocess
import sys
import threading
import queue
from pathlib import Path
from typing import Tuple, Optional, Union
import time
# Optional GPU dependencies: the module must remain importable without them,
# so failures are recorded in availability flags instead of propagating.
try:
    import cupy as cp
except ImportError:
    cp = None
CUPY_AVAILABLE = cp is not None

try:
    import PyNvVideoCodec as nvc
except ImportError:
    nvc = None
PYNVCODEC_AVAILABLE = nvc is not None


def check_gpu_encode_available() -> bool:
    """Return True when both CuPy and PyNvVideoCodec imported successfully."""
    return all((CUPY_AVAILABLE, PYNVCODEC_AVAILABLE))
# RGB to NV12 CUDA kernel, compiled lazily and cached for the whole process.
_RGB_TO_NV12_KERNEL = None
def _get_rgb_to_nv12_kernel():
    """Get or create the RGB to NV12 conversion kernel.

    Returns the cached ``cp.RawKernel`` (compiled on first call), or ``None``
    when CuPy is unavailable. NV12 output layout: a full-resolution Y plane
    plus a half-height plane of interleaved U/V samples (2x2 chroma
    subsampling). The kernel launches one thread per pixel; only threads at
    even (x, y) write the shared chroma sample for their 2x2 block.
    """
    global _RGB_TO_NV12_KERNEL
    if _RGB_TO_NV12_KERNEL is None and CUPY_AVAILABLE:
        # Integer BT.601 limited-range coefficients (scaled by 256, +128 for
        # rounding); matches the comment inside the kernel source.
        _RGB_TO_NV12_KERNEL = cp.RawKernel(r'''
        extern "C" __global__
        void rgb_to_nv12(
            const unsigned char* rgb,
            unsigned char* y_plane,
            unsigned char* uv_plane,
            int width, int height
        ) {
            int x = blockIdx.x * blockDim.x + threadIdx.x;
            int y = blockIdx.y * blockDim.y + threadIdx.y;
            if (x >= width || y >= height) return;
            int rgb_idx = (y * width + x) * 3;
            unsigned char r = rgb[rgb_idx];
            unsigned char g = rgb[rgb_idx + 1];
            unsigned char b = rgb[rgb_idx + 2];
            // RGB to Y (BT.601)
            int y_val = ((66 * r + 129 * g + 25 * b + 128) >> 8) + 16;
            y_plane[y * width + x] = (unsigned char)(y_val > 255 ? 255 : (y_val < 0 ? 0 : y_val));
            // UV (subsample 2x2) - only process even pixels
            if ((x & 1) == 0 && (y & 1) == 0) {
                int u_val = ((-38 * r - 74 * g + 112 * b + 128) >> 8) + 128;
                int v_val = ((112 * r - 94 * g - 18 * b + 128) >> 8) + 128;
                int uv_idx = (y / 2) * width + x;
                uv_plane[uv_idx] = (unsigned char)(u_val > 255 ? 255 : (u_val < 0 ? 0 : u_val));
                uv_plane[uv_idx + 1] = (unsigned char)(v_val > 255 ? 255 : (v_val < 0 ? 0 : v_val));
            }
        }
        ''', 'rgb_to_nv12')
    return _RGB_TO_NV12_KERNEL
class GPUEncoder:
    """
    Zero-copy GPU video encoder using PyNvVideoCodec.
    Frames are converted from RGB to NV12 on GPU and encoded directly
    without any CPU memory transfers.
    """

    def __init__(self, width: int, height: int, fps: float = 30, crf: int = 23):
        """Create an NVENC encoder for ``width`` x ``height`` RGB frames.

        Args:
            width: Frame width in pixels.
            height: Frame height in pixels.
            fps: Nominal frame rate (stored for reference).
            crf: Quality target. NOTE(review): stored but not currently passed
                to ``CreateEncoder`` — confirm whether PyNvVideoCodec exposes a
                matching rate-control knob.

        Raises:
            RuntimeError: If CuPy or PyNvVideoCodec is unavailable.
        """
        if not check_gpu_encode_available():
            raise RuntimeError("GPU encoding not available (need CuPy and PyNvVideoCodec)")
        self.width = width
        self.height = height
        self.fps = fps
        self.crf = crf
        # Decode a dummy clip once to obtain a reusable NV12 device surface.
        self._init_frame_buffer()
        # Create encoder with low-latency settings: bf=0 disables B-frames so
        # each Encode() call can emit a packet immediately instead of
        # buffering frames for reordering.
        self.encoder = nvc.CreateEncoder(
            width, height, "NV12", usecpuinputbuffer=False,
            bf=0,          # No B-frames - immediate output
            lowLatency=1,  # Low latency mode
        )
        # CUDA kernel launch configuration: 16x16 threads per block, one
        # thread per pixel, grid rounded up to cover the full frame.
        self._block = (16, 16)
        self._grid = ((width + 15) // 16, (height + 15) // 16)
        self._frame_count = 0
        self._encoded_data = []
        print(f"[GPUEncoder] Initialized {width}x{height} @ {fps}fps, zero-copy GPU encoding", file=sys.stderr)

    def _init_frame_buffer(self):
        """Initialize the template NV12 frame buffer from a dummy decode.

        PyNvVideoCodec exposes no public device-surface allocator, so we
        decode one black frame and keep its GPU surface as a template;
        ``encode_frame`` overwrites its Y/UV planes in place for every frame.

        Raises:
            RuntimeError: If no frame could be decoded from the dummy clip.
        """
        import tempfile  # local import: only needed during one-time setup

        # Use a unique temporary path rather than a fixed /tmp name so
        # concurrent encoder instances (or stale leftover files) cannot
        # collide or clobber each other.
        with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as tmp:
            dummy_path = Path(tmp.name)
        try:
            # Render a 0.1s black clip matching the target resolution.
            subprocess.run([
                "ffmpeg", "-y", "-f", "lavfi",
                "-i", f"color=black:size={self.width}x{self.height}:duration=0.1:rate=30",
                "-c:v", "h264", "-pix_fmt", "yuv420p",
                str(dummy_path)
            ], capture_output=True)
            # Decode to get frame buffer (bounded scan: a 0.1s clip yields a
            # frame well within 30 packets).
            demuxer = nvc.CreateDemuxer(str(dummy_path))
            decoder = nvc.CreateDecoder(gpuid=0, usedevicememory=True)
            self._template_frame = None
            for _ in range(30):
                packet = demuxer.Demux()
                if not packet:
                    break
                frames = decoder.Decode(packet)
                if frames:
                    self._template_frame = frames[0]
                    break
            if self._template_frame is None:
                raise RuntimeError("Failed to initialize GPU frame buffer")
            # Wrap frame planes with CuPy for zero-copy access.
            y_ptr = self._template_frame.GetPtrToPlane(0)
            uv_ptr = self._template_frame.GetPtrToPlane(1)
            y_mem = cp.cuda.UnownedMemory(y_ptr, self.height * self.width, None)
            self._y_plane = cp.ndarray(
                (self.height, self.width), dtype=cp.uint8,
                memptr=cp.cuda.MemoryPointer(y_mem, 0)
            )
            # NV12 chroma plane: half height, full width (interleaved U/V).
            uv_mem = cp.cuda.UnownedMemory(uv_ptr, (self.height // 2) * self.width, None)
            self._uv_plane = cp.ndarray(
                (self.height // 2, self.width), dtype=cp.uint8,
                memptr=cp.cuda.MemoryPointer(uv_mem, 0)
            )
            # Keep decoder/demuxer alive: the template frame's device memory
            # is owned by the decoder, so GC'ing them would invalidate views.
            self._decoder = decoder
            self._demuxer = demuxer
        finally:
            # Always remove the dummy clip, even when setup fails above
            # (the previous version leaked the file on error).
            dummy_path.unlink(missing_ok=True)

    def encode_frame(self, frame: Union[np.ndarray, 'cp.ndarray']) -> bytes:
        """
        Encode a frame (RGB format) to H.264.
        Args:
            frame: RGB frame as numpy or CuPy array, shape (H, W, 3)
        Returns:
            Encoded bytes (may be empty if frame is buffered)
        """
        # Ensure frame is on GPU (host arrays incur exactly one upload here).
        if isinstance(frame, np.ndarray):
            frame_gpu = cp.asarray(frame)
        else:
            frame_gpu = frame
        # Ensure uint8 — the kernel reads raw bytes.
        if frame_gpu.dtype != cp.uint8:
            frame_gpu = cp.clip(frame_gpu, 0, 255).astype(cp.uint8)
        # Ensure contiguous — the kernel indexes with flat row-major strides.
        if not frame_gpu.flags['C_CONTIGUOUS']:
            frame_gpu = cp.ascontiguousarray(frame_gpu)
        # Convert RGB to NV12 on GPU, writing into the template surface planes.
        kernel = _get_rgb_to_nv12_kernel()
        kernel(self._grid, self._block, (frame_gpu, self._y_plane, self._uv_plane, self.width, self.height))
        # Encode (GPU to GPU) from the in-place-updated template frame.
        result = self.encoder.Encode(self._template_frame)
        self._frame_count += 1
        return result if result else b''

    def flush(self) -> bytes:
        """Flush encoder and return remaining buffered data."""
        return self.encoder.EndEncode()

    def close(self):
        """Close encoder and cleanup.

        Currently a no-op; kept for interface symmetry with outputs that need
        explicit teardown. NOTE(review): presumably PyNvVideoCodec releases
        NVENC resources on garbage collection — confirm against its docs.
        """
        pass
class GPUHLSOutput:
    """
    GPU-accelerated HLS output with IPFS upload.
    Uses zero-copy GPU encoding and writes HLS segments.
    Uploads happen asynchronously in a background thread to avoid stuttering.
    """

    def __init__(
        self,
        output_dir: str,
        size: Tuple[int, int],
        fps: float = 30,
        segment_duration: float = 4.0,
        crf: int = 23,
        audio_source: str = None,
        ipfs_gateway: str = "https://ipfs.io/ipfs",
        on_playlist_update: callable = None,
    ):
        """Create the HLS output pipeline.

        Args:
            output_dir: Directory for the local playlist and .ts segments.
            size: (width, height) of incoming frames.
            fps: Frame rate used to size segments.
            segment_duration: Target HLS segment length in seconds.
            crf: Quality target forwarded to the GPU encoder.
            audio_source: Optional path/URL muxed in as the audio track.
            ipfs_gateway: Gateway base URL used in the published playlist.
            on_playlist_update: Optional callback invoked with each new
                playlist CID after an upload.
        """
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(parents=True, exist_ok=True)
        self.size = size
        self.fps = fps
        self.segment_duration = segment_duration
        self.ipfs_gateway = ipfs_gateway.rstrip("/")
        self._on_playlist_update = on_playlist_update
        self._is_open = True
        self.audio_source = audio_source
        # GPU encoder (raises RuntimeError if CuPy/PyNvVideoCodec missing).
        self._gpu_encoder = GPUEncoder(size[0], size[1], fps, crf)
        # Segment management
        self._current_segment = 0
        self._frames_in_segment = 0
        self._frames_per_segment = int(fps * segment_duration)
        self._segment_data = []
        # Track segment CIDs for IPFS; guarded by _upload_lock since both the
        # writer thread and the upload worker touch it.
        self.segment_cids = {}
        self._playlist_cid = None
        self._upload_lock = threading.Lock()
        # Import IPFS client (project-local module; imported lazily here).
        from ipfs_client import add_file, add_bytes
        self._ipfs_add_file = add_file
        self._ipfs_add_bytes = add_bytes
        # Background upload thread
        self._upload_queue = queue.Queue()
        self._upload_thread = threading.Thread(target=self._upload_worker, daemon=True)
        self._upload_thread.start()
        # Setup ffmpeg for muxing (takes raw H.264, outputs .ts segments)
        self._setup_muxer()
        print(f"[GPUHLSOutput] Initialized {size[0]}x{size[1]} @ {fps}fps, GPU encoding", file=sys.stderr)

    def _setup_muxer(self):
        """Setup ffmpeg for muxing H.264 to MPEG-TS segments with optional audio."""
        self.local_playlist_path = self.output_dir / "stream.m3u8"
        cmd = [
            "ffmpeg", "-y",
            "-f", "h264",  # Input is raw H.264
            "-i", "-",
        ]
        # Add audio input if provided
        if self.audio_source:
            cmd.extend(["-i", str(self.audio_source)])
            cmd.extend(["-map", "0:v", "-map", "1:a"])
        cmd.extend([
            "-c:v", "copy",  # Just copy video, no re-encoding
        ])
        # Add audio codec if we have audio
        if self.audio_source:
            cmd.extend(["-c:a", "aac", "-b:a", "128k", "-shortest"])
        cmd.extend([
            "-f", "hls",
            "-hls_time", str(self.segment_duration),
            "-hls_list_size", "0",
            "-hls_flags", "independent_segments+append_list+split_by_time",
            "-hls_segment_type", "mpegts",
            "-hls_segment_filename", str(self.output_dir / "segment_%05d.ts"),
            str(self.local_playlist_path),
        ])
        print(f"[GPUHLSOutput] FFmpeg cmd: {' '.join(cmd)}", file=sys.stderr)
        self._muxer = subprocess.Popen(
            cmd,
            stdin=subprocess.PIPE,
            stderr=subprocess.PIPE,  # Capture stderr for debugging
        )
        # Drain ffmpeg's stderr in the background: with stderr=PIPE and no
        # reader, ffmpeg blocks as soon as the OS pipe buffer fills, which
        # would stall the whole stream.
        self._stderr_thread = threading.Thread(
            target=self._drain_muxer_stderr, daemon=True
        )
        self._stderr_thread.start()

    def _drain_muxer_stderr(self):
        """Continuously read and discard ffmpeg stderr so the pipe never fills."""
        try:
            for _ in iter(self._muxer.stderr.readline, b""):
                pass
        except Exception:
            pass  # pipe closed during shutdown — nothing to do

    def write(self, frame: Union[np.ndarray, 'cp.ndarray'], t: float = 0):
        """Write a frame using GPU encoding.

        Args:
            frame: RGB frame, shape (H, W, 3), numpy or CuPy.
            t: Timestamp (accepted for interface compatibility; unused here).
        """
        if not self._is_open:
            return
        # GPU encode
        encoded = self._gpu_encoder.encode_frame(frame)
        # Send to muxer
        if encoded:
            try:
                self._muxer.stdin.write(encoded)
            except BrokenPipeError:
                # ffmpeg died; mark closed so callers stop feeding frames.
                self._is_open = False
                return
        self._frames_in_segment += 1
        # Once a segment's worth of frames has been written, look for newly
        # completed .ts files to queue for upload.
        if self._frames_in_segment >= self._frames_per_segment:
            self._frames_in_segment = 0
            self._check_upload_segments()

    def _upload_worker(self):
        """Background worker thread for async IPFS uploads."""
        while True:
            try:
                item = self._upload_queue.get(timeout=1.0)
                if item is None:  # Shutdown signal
                    break
                seg_path, seg_num = item
                self._do_upload(seg_path, seg_num)
            except queue.Empty:
                continue
            except Exception as e:
                print(f"Upload worker error: {e}", file=sys.stderr)

    def _do_upload(self, seg_path: Path, seg_num: int):
        """Actually perform the upload (runs in background thread)."""
        try:
            cid = self._ipfs_add_file(seg_path, pin=True)
            if cid:
                with self._upload_lock:
                    self.segment_cids[seg_num] = cid
                print(f"Added to IPFS: {seg_path.name} -> {cid}", file=sys.stderr)
                # Must be called after releasing the (non-reentrant) lock:
                # _update_playlist acquires it again.
                self._update_playlist()
        except Exception as e:
            # Best-effort: a failed segment stays out of segment_cids and is
            # re-queued on the next _check_upload_segments pass.
            print(f"Failed to add to IPFS: {e}", file=sys.stderr)

    def _check_upload_segments(self):
        """Check for and queue new segments for async IPFS upload.

        NOTE(review): a segment whose upload is still in flight can be queued
        a second time here (it is only excluded once its CID lands in
        segment_cids); the duplicate upload is harmless but wasteful.
        """
        segments = sorted(self.output_dir.glob("segment_*.ts"))
        for seg_path in segments:
            seg_num = int(seg_path.stem.split("_")[1])
            with self._upload_lock:
                if seg_num in self.segment_cids:
                    continue
            # Heuristic completeness check: a segment still being written by
            # ffmpeg changes size between two closely spaced stats.
            try:
                size1 = seg_path.stat().st_size
                if size1 == 0:
                    continue
                # Quick non-blocking check
                time.sleep(0.01)
                size2 = seg_path.stat().st_size
                if size1 != size2:
                    continue
            except FileNotFoundError:
                continue
            # Queue for async upload (non-blocking!)
            self._upload_queue.put((seg_path, seg_num))

    def _update_playlist(self):
        """Generate and upload IPFS-aware playlist."""
        # Snapshot the CID map under the lock, then build and upload outside
        # it: holding the lock across a network upload would block the writer
        # thread (via _check_upload_segments) and cause stuttering.
        with self._upload_lock:
            if not self.segment_cids:
                return
            cids = dict(self.segment_cids)
        lines = [
            "#EXTM3U",
            "#EXT-X-VERSION:3",
            f"#EXT-X-TARGETDURATION:{int(self.segment_duration) + 1}",
            "#EXT-X-MEDIA-SEQUENCE:0",
        ]
        for seg_num in sorted(cids.keys()):
            cid = cids[seg_num]
            lines.append(f"#EXTINF:{self.segment_duration:.3f},")
            lines.append(f"{self.ipfs_gateway}/{cid}")
        playlist_content = "\n".join(lines) + "\n"
        # Upload playlist
        self._playlist_cid = self._ipfs_add_bytes(playlist_content.encode(), pin=True)
        if self._playlist_cid and self._on_playlist_update:
            self._on_playlist_update(self._playlist_cid)

    def close(self):
        """Close output, flush remaining data, and wait for pending uploads."""
        if not self._is_open:
            return
        self._is_open = False
        # Flush GPU encoder
        final_data = self._gpu_encoder.flush()
        if final_data:
            try:
                self._muxer.stdin.write(final_data)
            except Exception:
                pass  # muxer may already be gone; flush is best-effort
        # Close muxer; narrow from bare `except:` so Ctrl-C still propagates.
        try:
            self._muxer.stdin.close()
            self._muxer.wait(timeout=10)
        except Exception:
            self._muxer.kill()
        # Final segment upload
        self._check_upload_segments()
        # Wait for pending uploads to complete
        self._upload_queue.put(None)  # Signal shutdown
        self._upload_thread.join(timeout=30)
        self._gpu_encoder.close()

    @property
    def is_open(self) -> bool:
        # True until close() is called or the muxer pipe breaks.
        return self._is_open

    @property
    def playlist_cid(self) -> Optional[str]:
        # CID of the most recently uploaded playlist, if any.
        return self._playlist_cid

    @property
    def playlist_url(self) -> Optional[str]:
        """Get the full IPFS URL for the playlist."""
        if self._playlist_cid:
            return f"{self.ipfs_gateway}/{self._playlist_cid}"
        return None