Files
celery/streaming/gpu_output.py
giles b15e381f81
Some checks are pending
GPU Worker CI/CD / test (push) Waiting to run
GPU Worker CI/CD / deploy (push) Blocked by required conditions
Use /ipfs-ts/ path for HLS segments to get correct MIME type
2026-02-04 12:46:54 +00:00

458 lines
14 KiB
Python

"""
Zero-copy GPU video encoding output.
Uses PyNvVideoCodec for direct GPU-to-GPU encoding without CPU transfers.
Frames stay on GPU throughout: CuPy → NV12 conversion → NVENC encoding.
"""
import numpy as np
import subprocess
import sys
import threading
import queue
from pathlib import Path
from typing import Tuple, Optional, Union
import time
# Try to import GPU libraries
try:
import cupy as cp
CUPY_AVAILABLE = True
except ImportError:
cp = None
CUPY_AVAILABLE = False
try:
import PyNvVideoCodec as nvc
PYNVCODEC_AVAILABLE = True
except ImportError:
nvc = None
PYNVCODEC_AVAILABLE = False
def check_gpu_encode_available() -> bool:
    """Report whether both optional GPU libraries were imported successfully.

    Returns:
        True only when the module-level import attempts for CuPy and
        PyNvVideoCodec both succeeded; False otherwise.
    """
    if not CUPY_AVAILABLE:
        return False
    return PYNVCODEC_AVAILABLE
# Lazily-built RGB -> NV12 CUDA kernel; stays None until first requested.
_RGB_TO_NV12_KERNEL = None


def _get_rgb_to_nv12_kernel():
    """Return the cached RGB -> NV12 kernel, building it on first use.

    The kernel object is created once and stored in the module-level
    ``_RGB_TO_NV12_KERNEL``. One thread handles one pixel: it always writes
    the luma sample, and threads at even (x, y) coordinates additionally
    write the interleaved U/V pair for their 2x2 block.

    Returns:
        The ``cp.RawKernel`` instance, or ``None`` when CuPy is not available.
    """
    global _RGB_TO_NV12_KERNEL
    # Nothing to do when already built, or when CuPy is missing.
    if _RGB_TO_NV12_KERNEL is not None or not CUPY_AVAILABLE:
        return _RGB_TO_NV12_KERNEL
    kernel_source = r'''
extern "C" __global__
void rgb_to_nv12(
const unsigned char* rgb,
unsigned char* y_plane,
unsigned char* uv_plane,
int width, int height
) {
int x = blockIdx.x * blockDim.x + threadIdx.x;
int y = blockIdx.y * blockDim.y + threadIdx.y;
if (x >= width || y >= height) return;
int rgb_idx = (y * width + x) * 3;
unsigned char r = rgb[rgb_idx];
unsigned char g = rgb[rgb_idx + 1];
unsigned char b = rgb[rgb_idx + 2];
// RGB to Y (BT.601)
int y_val = ((66 * r + 129 * g + 25 * b + 128) >> 8) + 16;
y_plane[y * width + x] = (unsigned char)(y_val > 255 ? 255 : (y_val < 0 ? 0 : y_val));
// UV (subsample 2x2) - only process even pixels
if ((x & 1) == 0 && (y & 1) == 0) {
int u_val = ((-38 * r - 74 * g + 112 * b + 128) >> 8) + 128;
int v_val = ((112 * r - 94 * g - 18 * b + 128) >> 8) + 128;
int uv_idx = (y / 2) * width + x;
uv_plane[uv_idx] = (unsigned char)(u_val > 255 ? 255 : (u_val < 0 ? 0 : u_val));
uv_plane[uv_idx + 1] = (unsigned char)(v_val > 255 ? 255 : (v_val < 0 ? 0 : v_val));
}
}
'''
    _RGB_TO_NV12_KERNEL = cp.RawKernel(kernel_source, 'rgb_to_nv12')
    return _RGB_TO_NV12_KERNEL
class GPUEncoder:
    """
    GPU video encoder using PyNvVideoCodec.

    Frames are converted from RGB to NV12 by a CUDA kernel that writes
    directly into a device-resident template frame, which is then handed to
    the hardware encoder. CuPy input never leaves the GPU; NumPy input is
    uploaded once with ``cp.asarray``.
    """

    def __init__(self, width: int, height: int, fps: float = 30, crf: int = 23):
        """Create the encoder and its reusable device frame buffer.

        Args:
            width: Frame width in pixels.
            height: Frame height in pixels.
            fps: Nominal frame rate (stored and logged only).
            crf: Requested quality value (stored only, see note below).

        Raises:
            RuntimeError: If CuPy/PyNvVideoCodec are unavailable or the
                device frame buffer cannot be initialised.
        """
        if not check_gpu_encode_available():
            raise RuntimeError("GPU encoding not available (need CuPy and PyNvVideoCodec)")
        self.width = width
        self.height = height
        self.fps = fps
        # NOTE(review): crf is stored but never forwarded to CreateEncoder,
        # so it currently has no effect on output quality - confirm intent.
        self.crf = crf
        # Decode a dummy clip to obtain a device frame usable as encoder input
        self._init_frame_buffer()
        # Create encoder with low-latency settings (no B-frames for immediate output)
        self.encoder = nvc.CreateEncoder(
            width, height, "NV12", usecpuinputbuffer=False,
            bf=0,  # No B-frames - immediate output
            lowLatency=1,  # Low latency mode
        )
        # CUDA launch config: 16x16 threads per block, enough blocks to cover the frame
        self._block = (16, 16)
        self._grid = ((width + 15) // 16, (height + 15) // 16)
        self._frame_count = 0
        self._encoded_data = []
        print(f"[GPUEncoder] Initialized {width}x{height} @ {fps}fps, zero-copy GPU encoding", file=sys.stderr)

    def _init_frame_buffer(self):
        """Decode a tiny dummy clip to obtain a reusable device frame.

        The decoded frame's two planes are wrapped as CuPy arrays
        (``_y_plane`` and ``_uv_plane``) so the conversion kernel can write
        into them in place.

        Raises:
            RuntimeError: If ffmpeg fails to produce the dummy clip or no
                frame can be decoded from it.
        """
        import tempfile

        # A private temp directory per instance: a fixed /tmp path would be
        # shared (and raced on) by every worker initialising concurrently.
        with tempfile.TemporaryDirectory(prefix="gpu_encoder_") as tmp_dir:
            dummy_path = Path(tmp_dir) / "dummy.mp4"
            proc = subprocess.run([
                "ffmpeg", "-y", "-f", "lavfi",
                "-i", f"color=black:size={self.width}x{self.height}:duration=0.1:rate=30",
                "-c:v", "h264", "-pix_fmt", "yuv420p",
                str(dummy_path)
            ], capture_output=True)
            if proc.returncode != 0 or not dummy_path.exists():
                detail = proc.stderr.decode("utf-8", errors="replace")[-500:]
                raise RuntimeError(
                    f"Failed to initialize GPU frame buffer: ffmpeg could not create dummy video: {detail}"
                )
            # Decode until the first frame comes out; that frame is the template
            demuxer = nvc.CreateDemuxer(str(dummy_path))
            decoder = nvc.CreateDecoder(gpuid=0, usedevicememory=True)
            self._template_frame = None
            for _ in range(30):
                packet = demuxer.Demux()
                if not packet:
                    break
                frames = decoder.Decode(packet)
                if frames:
                    self._template_frame = frames[0]
                    break
        if self._template_frame is None:
            raise RuntimeError("Failed to initialize GPU frame buffer")
        # Wrap frame planes with CuPy for zero-copy access
        # NOTE(review): this assumes each plane row is exactly `width` bytes
        # (no pitch padding) - confirm against the decoder's surface layout.
        y_ptr = self._template_frame.GetPtrToPlane(0)
        uv_ptr = self._template_frame.GetPtrToPlane(1)
        y_mem = cp.cuda.UnownedMemory(y_ptr, self.height * self.width, None)
        self._y_plane = cp.ndarray(
            (self.height, self.width), dtype=cp.uint8,
            memptr=cp.cuda.MemoryPointer(y_mem, 0)
        )
        # Interleaved U/V plane: half the rows, same row width as luma
        uv_mem = cp.cuda.UnownedMemory(uv_ptr, (self.height // 2) * self.width, None)
        self._uv_plane = cp.ndarray(
            (self.height // 2, self.width), dtype=cp.uint8,
            memptr=cp.cuda.MemoryPointer(uv_mem, 0)
        )
        # Keep references to prevent GC of the objects backing the template frame
        self._decoder = decoder
        self._demuxer = demuxer

    def encode_frame(self, frame: Union[np.ndarray, 'cp.ndarray']) -> bytes:
        """
        Encode a frame (RGB format) to H.264.

        Args:
            frame: RGB frame as numpy or CuPy array, shape (H, W, 3)

        Returns:
            Encoded bytes (may be empty if frame is buffered)

        Raises:
            ValueError: If the frame shape is not (height, width, 3). The
                kernel indexes the input assuming exactly that layout, so a
                mismatched frame would be read out of bounds.
        """
        expected_shape = (self.height, self.width, 3)
        actual_shape = tuple(getattr(frame, "shape", ()))
        if actual_shape != expected_shape:
            raise ValueError(
                f"Expected RGB frame of shape {expected_shape}, got {actual_shape}"
            )
        # Ensure frame is on GPU
        if isinstance(frame, np.ndarray):
            frame_gpu = cp.asarray(frame)
        else:
            frame_gpu = frame
        # Ensure uint8
        if frame_gpu.dtype != cp.uint8:
            frame_gpu = cp.clip(frame_gpu, 0, 255).astype(cp.uint8)
        # Ensure contiguous
        if not frame_gpu.flags['C_CONTIGUOUS']:
            frame_gpu = cp.ascontiguousarray(frame_gpu)
        # Convert RGB to NV12 on GPU. The kernel declares `int` scalars, so
        # pass 32-bit values explicitly; plain Python ints are handed over
        # as 64-bit and would not match the declared parameter type.
        kernel = _get_rgb_to_nv12_kernel()
        kernel(
            self._grid,
            self._block,
            (frame_gpu, self._y_plane, self._uv_plane,
             np.int32(self.width), np.int32(self.height)),
        )
        # NOTE(review): the kernel launch is asynchronous; presumably the
        # encoder consumes the frame on the same stream - confirm.
        # Encode (GPU to GPU)
        result = self.encoder.Encode(self._template_frame)
        self._frame_count += 1
        # Normalise to real bytes so callers can write the result directly
        return bytes(result) if result else b''

    def flush(self) -> bytes:
        """Flush encoder and return remaining data (empty bytes if none)."""
        remaining = self.encoder.EndEncode()
        return bytes(remaining) if remaining else b''

    def close(self):
        """Close encoder and cleanup (currently a no-op)."""
        pass
class GPUHLSOutput:
    """
    GPU-accelerated HLS output with IPFS upload.

    Frames are encoded by a GPUEncoder, piped as raw H.264 into an ffmpeg
    process that writes MPEG-TS HLS segments, and finished segments are
    uploaded to IPFS from a background thread so that frame writing is not
    stalled by network I/O.
    """

    # Number of trailing bytes of ffmpeg stderr retained for diagnostics
    _STDERR_TAIL_BYTES = 16384

    def __init__(
        self,
        output_dir: str,
        size: Tuple[int, int],
        fps: float = 30,
        segment_duration: float = 4.0,
        crf: int = 23,
        audio_source: Optional[str] = None,
        ipfs_gateway: str = "https://ipfs.io/ipfs",
        on_playlist_update: callable = None,
    ):
        """Set up the encoder, the upload worker and the ffmpeg muxer.

        Args:
            output_dir: Directory for local segments and playlist (created if missing).
            size: Frame size as (width, height).
            fps: Frame rate, used to count frames per segment.
            segment_duration: Target HLS segment length in seconds.
            crf: Quality value forwarded to GPUEncoder.
            audio_source: Optional audio input passed to ffmpeg with ``-i``.
            ipfs_gateway: Gateway base URL used when building playlist URLs.
            on_playlist_update: Optional callback invoked with the new
                playlist CID after each playlist upload.
        """
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(parents=True, exist_ok=True)
        self.size = size
        self.fps = fps
        self.segment_duration = segment_duration
        self.ipfs_gateway = ipfs_gateway.rstrip("/")
        self._on_playlist_update = on_playlist_update
        self._is_open = True
        # Separate from _is_open: a broken pipe clears _is_open, but close()
        # must still run once to release the muxer and the upload thread.
        self._closed = False
        self.audio_source = audio_source
        # GPU encoder
        self._gpu_encoder = GPUEncoder(size[0], size[1], fps, crf)
        # Segment management
        self._current_segment = 0
        self._frames_in_segment = 0
        self._frames_per_segment = int(fps * segment_duration)
        self._segment_data = []
        # Track segment CIDs for IPFS
        self.segment_cids = {}
        # Segment numbers queued or uploading, so none is queued twice
        self._queued_segments = set()
        self._playlist_cid = None
        self._upload_lock = threading.Lock()
        # Import IPFS client
        from ipfs_client import add_file, add_bytes
        self._ipfs_add_file = add_file
        self._ipfs_add_bytes = add_bytes
        # Background upload thread
        self._upload_queue = queue.Queue()
        self._upload_thread = threading.Thread(target=self._upload_worker, daemon=True)
        self._upload_thread.start()
        # Setup ffmpeg for muxing (takes raw H.264, outputs .ts segments)
        self._setup_muxer()
        print(f"[GPUHLSOutput] Initialized {size[0]}x{size[1]} @ {fps}fps, GPU encoding", file=sys.stderr)

    def _setup_muxer(self):
        """Setup ffmpeg for muxing H.264 to MPEG-TS segments with optional audio."""
        self.local_playlist_path = self.output_dir / "stream.m3u8"
        cmd = [
            "ffmpeg", "-y",
            "-f", "h264",  # Input is raw H.264
            "-i", "-",
        ]
        # Add audio input if provided
        if self.audio_source:
            cmd.extend(["-i", str(self.audio_source)])
            cmd.extend(["-map", "0:v", "-map", "1:a"])
        cmd.extend([
            "-c:v", "copy",  # Just copy video, no re-encoding
        ])
        # Add audio codec if we have audio
        if self.audio_source:
            cmd.extend(["-c:a", "aac", "-b:a", "128k", "-shortest"])
        cmd.extend([
            "-f", "hls",
            "-hls_time", str(self.segment_duration),
            "-hls_list_size", "0",
            "-hls_flags", "independent_segments+append_list+split_by_time",
            "-hls_segment_type", "mpegts",
            "-hls_segment_filename", str(self.output_dir / "segment_%05d.ts"),
            str(self.local_playlist_path),
        ])
        print(f"[GPUHLSOutput] FFmpeg cmd: {' '.join(cmd)}", file=sys.stderr)
        self._muxer = subprocess.Popen(
            cmd,
            stdin=subprocess.PIPE,
            stderr=subprocess.PIPE,  # Capture stderr for debugging
        )
        # A piped stderr must be read continuously: once the OS pipe buffer
        # fills, ffmpeg blocks on its own logging and stops consuming video.
        self._muxer_stderr_tail = b""
        self._stderr_thread = threading.Thread(target=self._drain_muxer_stderr, daemon=True)
        self._stderr_thread.start()

    def _drain_muxer_stderr(self):
        """Continuously read ffmpeg stderr, keeping only the most recent bytes."""
        stream = self._muxer.stderr
        tail = b""
        try:
            while True:
                chunk = stream.read1(65536)
                if not chunk:  # EOF: ffmpeg closed its stderr
                    break
                tail = (tail + chunk)[-self._STDERR_TAIL_BYTES:]
                self._muxer_stderr_tail = tail
        except (OSError, ValueError):
            # Pipe closed underneath us during shutdown; nothing left to read
            pass

    def write(self, frame: Union[np.ndarray, 'cp.ndarray'], t: float = 0):
        """Write a frame using GPU encoding.

        Args:
            frame: RGB frame as numpy or CuPy array.
            t: Timestamp; accepted for interface compatibility and unused here.
        """
        if not self._is_open:
            return
        # GPU encode
        encoded = self._gpu_encoder.encode_frame(frame)
        # Send to muxer
        if encoded:
            try:
                self._muxer.stdin.write(encoded)
            except BrokenPipeError:
                # ffmpeg went away; stop accepting frames (close() still cleans up)
                self._is_open = False
                return
        self._frames_in_segment += 1
        # Check for segment completion
        if self._frames_in_segment >= self._frames_per_segment:
            self._frames_in_segment = 0
            self._check_upload_segments()

    def _upload_worker(self):
        """Background worker thread for async IPFS uploads."""
        while True:
            try:
                item = self._upload_queue.get(timeout=1.0)
                if item is None:  # Shutdown signal
                    break
                seg_path, seg_num = item
                self._do_upload(seg_path, seg_num)
            except queue.Empty:
                continue
            except Exception as e:
                print(f"Upload worker error: {e}", file=sys.stderr)

    def _do_upload(self, seg_path: Path, seg_num: int):
        """Actually perform the upload (runs in background thread)."""
        try:
            cid = self._ipfs_add_file(seg_path, pin=True)
            if cid:
                with self._upload_lock:
                    self.segment_cids[seg_num] = cid
                print(f"Added to IPFS: {seg_path.name} -> {cid}", file=sys.stderr)
                self._update_playlist()
        except Exception as e:
            print(f"Failed to add to IPFS: {e}", file=sys.stderr)
        finally:
            # No longer in flight. If no CID was recorded, the next scan
            # will queue this segment again (retry on failure).
            with self._upload_lock:
                self._queued_segments.discard(seg_num)

    def _check_upload_segments(self):
        """Check for and queue new segments for async IPFS upload.

        While ffmpeg is running, the highest-numbered segment is assumed to
        be still in progress and is held back until a newer segment appears
        or ffmpeg exits. Each segment is queued at most once at a time.
        """
        numbered = []
        for seg_path in self.output_dir.glob("segment_*.ts"):
            try:
                numbered.append((int(seg_path.stem.split("_")[1]), seg_path))
            except (IndexError, ValueError):
                continue  # Not one of our segment files
        if not numbered:
            return
        # Numeric order, so ordering stays right past the zero-padded width
        numbered.sort(key=lambda pair: pair[0])
        if self._muxer.poll() is None:
            # ffmpeg still running: the newest file may be incomplete
            numbered = numbered[:-1]
        for seg_num, seg_path in numbered:
            try:
                if seg_path.stat().st_size == 0:
                    continue
            except FileNotFoundError:
                continue
            with self._upload_lock:
                if seg_num in self.segment_cids or seg_num in self._queued_segments:
                    continue
                self._queued_segments.add(seg_num)
            # Queue for async upload (non-blocking!)
            self._upload_queue.put((seg_path, seg_num))

    def _update_playlist(self):
        """Generate and upload IPFS-aware playlist."""
        # Use /ipfs-ts/ path for segments to get correct MIME type (video/mp2t).
        # Only the trailing path component is swapped: a blanket
        # str.replace would also rewrite a host name such as "ipfs.io".
        if self.ipfs_gateway.endswith("/ipfs"):
            segment_gateway = self.ipfs_gateway[:-len("/ipfs")] + "/ipfs-ts"
        else:
            segment_gateway = self.ipfs_gateway
        # Snapshot the CID table under the lock; upload outside it so the
        # frame-writing thread is never blocked on network I/O.
        with self._upload_lock:
            if not self.segment_cids:
                return
            lines = [
                "#EXTM3U",
                "#EXT-X-VERSION:3",
                f"#EXT-X-TARGETDURATION:{int(self.segment_duration) + 1}",
                "#EXT-X-MEDIA-SEQUENCE:0",
            ]
            for seg_num in sorted(self.segment_cids):
                cid = self.segment_cids[seg_num]
                lines.append(f"#EXTINF:{self.segment_duration:.3f},")
                lines.append(f"{segment_gateway}/{cid}")
        playlist_content = "\n".join(lines) + "\n"
        # Upload playlist
        self._playlist_cid = self._ipfs_add_bytes(playlist_content.encode(), pin=True)
        if self._playlist_cid and self._on_playlist_update:
            self._on_playlist_update(self._playlist_cid)

    def close(self):
        """Close output and flush remaining data.

        Safe to call more than once. Also runs the full shutdown when a
        broken pipe has already marked the output as not open.
        """
        if self._closed:
            return
        self._closed = True
        self._is_open = False
        # Flush GPU encoder
        final_data = self._gpu_encoder.flush()
        if final_data:
            try:
                self._muxer.stdin.write(final_data)
            except (OSError, ValueError) as e:
                # Best effort: ffmpeg may already have exited or the pipe be closed
                print(f"[GPUHLSOutput] Could not write final data to muxer: {e}", file=sys.stderr)
        # Close muxer
        try:
            self._muxer.stdin.close()
            self._muxer.wait(timeout=10)
        except (OSError, ValueError, subprocess.TimeoutExpired):
            self._muxer.kill()
            self._muxer.wait()  # Reap the killed process
        self._stderr_thread.join(timeout=1)
        if self._muxer.returncode not in (0, None) and self._muxer_stderr_tail:
            tail_text = self._muxer_stderr_tail.decode("utf-8", errors="replace")
            print(f"[GPUHLSOutput] FFmpeg exited with {self._muxer.returncode}: {tail_text}", file=sys.stderr)
        # Final segment upload (ffmpeg has exited, so every segment is final)
        self._check_upload_segments()
        # Wait for pending uploads to complete
        self._upload_queue.put(None)  # Signal shutdown
        self._upload_thread.join(timeout=30)
        self._gpu_encoder.close()

    @property
    def is_open(self) -> bool:
        """Whether the output still accepts frames."""
        return self._is_open

    @property
    def playlist_cid(self) -> Optional[str]:
        """CID of the most recently uploaded playlist, if any."""
        return self._playlist_cid

    @property
    def playlist_url(self) -> Optional[str]:
        """Get the full IPFS URL for the playlist."""
        if self._playlist_cid:
            return f"{self.ipfs_gateway}/{self._playlist_cid}"
        return None