"""
Zero-copy GPU video encoding output.

Uses PyNvVideoCodec for direct GPU-to-GPU encoding without CPU transfers.
Frames stay on GPU throughout: CuPy → NV12 conversion → NVENC encoding.
"""

import queue
import subprocess
import sys
import tempfile
import threading
import time
from pathlib import Path
from typing import Callable, Optional, Tuple, Union

import numpy as np

# Try to import GPU libraries
try:
    import cupy as cp
    CUPY_AVAILABLE = True
except ImportError:
    cp = None
    CUPY_AVAILABLE = False

try:
    import PyNvVideoCodec as nvc
    PYNVCODEC_AVAILABLE = True
except ImportError:
    nvc = None
    PYNVCODEC_AVAILABLE = False


def check_gpu_encode_available() -> bool:
    """Return True when both CuPy and PyNvVideoCodec were importable."""
    return CUPY_AVAILABLE and PYNVCODEC_AVAILABLE


# RGB to NV12 CUDA kernel (compiled lazily, cached at module level)
_RGB_TO_NV12_KERNEL = None


def _get_rgb_to_nv12_kernel():
    """Get or create the RGB to NV12 conversion kernel.

    Returns:
        The cached ``cp.RawKernel``, or ``None`` when CuPy is not available.
    """
    global _RGB_TO_NV12_KERNEL
    if _RGB_TO_NV12_KERNEL is None and CUPY_AVAILABLE:
        _RGB_TO_NV12_KERNEL = cp.RawKernel(r'''
extern "C" __global__
void rgb_to_nv12(
    const unsigned char* rgb,
    unsigned char* y_plane,
    unsigned char* uv_plane,
    int width,
    int height
) {
    int x = blockIdx.x * blockDim.x + threadIdx.x;
    int y = blockIdx.y * blockDim.y + threadIdx.y;

    if (x >= width || y >= height) return;

    int rgb_idx = (y * width + x) * 3;
    unsigned char r = rgb[rgb_idx];
    unsigned char g = rgb[rgb_idx + 1];
    unsigned char b = rgb[rgb_idx + 2];

    // RGB to Y (BT.601)
    int y_val = ((66 * r + 129 * g + 25 * b + 128) >> 8) + 16;
    y_plane[y * width + x] = (unsigned char)(y_val > 255 ? 255 : (y_val < 0 ? 0 : y_val));

    // UV (subsample 2x2) - only process even pixels
    if ((x & 1) == 0 && (y & 1) == 0) {
        int u_val = ((-38 * r - 74 * g + 112 * b + 128) >> 8) + 128;
        int v_val = ((112 * r - 94 * g - 18 * b + 128) >> 8) + 128;

        int uv_idx = (y / 2) * width + x;
        uv_plane[uv_idx] = (unsigned char)(u_val > 255 ? 255 : (u_val < 0 ? 0 : u_val));
        uv_plane[uv_idx + 1] = (unsigned char)(v_val > 255 ? 255 : (v_val < 0 ? 0 : v_val));
    }
}
''', 'rgb_to_nv12')
    return _RGB_TO_NV12_KERNEL


class GPUEncoder:
    """
    Zero-copy GPU video encoder using PyNvVideoCodec.

    Frames are converted from RGB to NV12 on GPU and encoded directly
    without any CPU memory transfers.
    """

    def __init__(self, width: int, height: int, fps: float = 30, crf: int = 23):
        """Create the encoder and its reusable NV12 frame buffer.

        Args:
            width: Frame width in pixels; must be positive and even.
            height: Frame height in pixels; must be positive and even.
            fps: Stored on the instance and used in the log line.
            crf: Stored on the instance.

        Raises:
            RuntimeError: CuPy/PyNvVideoCodec are missing, or the frame
                buffer could not be initialised.
            ValueError: ``width`` or ``height`` is not a positive even number.
        """
        if not check_gpu_encode_available():
            raise RuntimeError("GPU encoding not available (need CuPy and PyNvVideoCodec)")

        # The kernel writes one interleaved U/V pair per 2x2 pixel block into
        # a (height // 2, width) plane; odd dimensions would make it write
        # past the end of that plane.
        if width <= 0 or height <= 0 or width % 2 or height % 2:
            raise ValueError(
                f"NV12 encoding requires positive, even dimensions, got {width}x{height}"
            )

        self.width = width
        self.height = height
        self.fps = fps
        # NOTE(review): fps and crf are stored but not forwarded to
        # nvc.CreateEncoder below — confirm whether that is intended.
        self.crf = crf

        # Create dummy video to get frame buffer template
        self._init_frame_buffer()

        # Create encoder with low-latency settings (no B-frames for immediate output)
        self.encoder = nvc.CreateEncoder(
            width, height, "NV12",
            usecpuinputbuffer=False,
            bf=0,  # No B-frames - immediate output
            lowLatency=1,  # Low latency mode
        )

        # CUDA kernel grid/block config: one thread per pixel, 16x16 tiles
        self._block = (16, 16)
        self._grid = ((width + 15) // 16, (height + 15) // 16)

        self._frame_count = 0
        self._encoded_data = []

        print(f"[GPUEncoder] Initialized {width}x{height} @ {fps}fps, zero-copy GPU encoding", file=sys.stderr)

    def _init_frame_buffer(self):
        """Initialize the reusable GPU frame buffer from a dummy decode.

        A tiny black clip is generated with ffmpeg and decoded into device
        memory; the first decoded frame is kept as the template whose planes
        are overwritten for every encoded frame.

        Raises:
            RuntimeError: ffmpeg failed or no frame could be decoded.
        """
        # A private temporary directory avoids clashes between encoders that
        # are created concurrently and guarantees cleanup on every exit path.
        with tempfile.TemporaryDirectory(prefix="gpu_encoder_") as tmp_dir:
            dummy_path = Path(tmp_dir) / "dummy.mp4"
            proc = subprocess.run([
                "ffmpeg", "-y",
                "-f", "lavfi",
                "-i", f"color=black:size={self.width}x{self.height}:duration=0.1:rate=30",
                "-c:v", "h264",
                "-pix_fmt", "yuv420p",
                str(dummy_path)
            ], capture_output=True)
            if proc.returncode != 0:
                detail = proc.stderr.decode(errors="replace").strip()
                raise RuntimeError(
                    f"Failed to initialize GPU frame buffer: ffmpeg exited with "
                    f"{proc.returncode}: {detail}"
                )

            # Decode to get frame buffer
            demuxer = nvc.CreateDemuxer(str(dummy_path))
            decoder = nvc.CreateDecoder(gpuid=0, usedevicememory=True)

            self._template_frame = None
            for _ in range(30):
                packet = demuxer.Demux()
                if not packet:
                    break
                frames = decoder.Decode(packet)
                if frames:
                    self._template_frame = frames[0]
                    break

            if self._template_frame is None:
                raise RuntimeError("Failed to initialize GPU frame buffer")

        # Wrap frame planes with CuPy for zero-copy access.
        # NOTE(review): this assumes the decoded planes are tightly packed
        # (row pitch == width) — confirm against the decoder's actual pitch.
        y_ptr = self._template_frame.GetPtrToPlane(0)
        uv_ptr = self._template_frame.GetPtrToPlane(1)

        y_mem = cp.cuda.UnownedMemory(y_ptr, self.height * self.width, None)
        self._y_plane = cp.ndarray(
            (self.height, self.width),
            dtype=cp.uint8,
            memptr=cp.cuda.MemoryPointer(y_mem, 0)
        )

        uv_mem = cp.cuda.UnownedMemory(uv_ptr, (self.height // 2) * self.width, None)
        self._uv_plane = cp.ndarray(
            (self.height // 2, self.width),
            dtype=cp.uint8,
            memptr=cp.cuda.MemoryPointer(uv_mem, 0)
        )

        # Keep references to prevent GC
        self._decoder = decoder
        self._demuxer = demuxer

    def encode_frame(self, frame: Union[np.ndarray, 'cp.ndarray']) -> bytes:
        """
        Encode a frame (RGB format) to H.264.

        Args:
            frame: RGB frame as numpy or CuPy array, shape (H, W, 3)

        Returns:
            Encoded bytes (may be empty if frame is buffered)

        Raises:
            ValueError: The frame shape is not ``(height, width, 3)``.
        """
        # Ensure frame is on GPU
        if isinstance(frame, np.ndarray):
            frame_gpu = cp.asarray(frame)
        else:
            frame_gpu = frame

        # The kernel indexes the input as a packed (H, W, 3) buffer; any other
        # shape would make it read out of bounds.
        expected_shape = (self.height, self.width, 3)
        if tuple(frame_gpu.shape) != expected_shape:
            raise ValueError(
                f"Expected frame of shape {expected_shape}, got {tuple(frame_gpu.shape)}"
            )

        # Ensure uint8
        if frame_gpu.dtype != cp.uint8:
            frame_gpu = cp.clip(frame_gpu, 0, 255).astype(cp.uint8)

        # Ensure contiguous
        if not frame_gpu.flags['C_CONTIGUOUS']:
            frame_gpu = cp.ascontiguousarray(frame_gpu)

        # Convert RGB to NV12 on GPU. The scalars are passed as int32 so they
        # match the kernel's `int` parameters.
        kernel = _get_rgb_to_nv12_kernel()
        kernel(
            self._grid,
            self._block,
            (
                frame_gpu,
                self._y_plane,
                self._uv_plane,
                np.int32(self.width),
                np.int32(self.height),
            ),
        )

        # Encode (GPU to GPU).
        # NOTE(review): the kernel is launched on CuPy's current stream; confirm
        # the encoder reads the planes on a stream ordered after it.
        result = self.encoder.Encode(self._template_frame)
        self._frame_count += 1

        return result if result else b''

    def flush(self) -> bytes:
        """Flush encoder and return remaining data (empty if there is none)."""
        remaining = self.encoder.EndEncode()
        return remaining if remaining else b''

    def close(self):
        """Close encoder and cleanup (currently a no-op)."""
        pass


class GPUHLSOutput:
    """
    GPU-accelerated HLS output with IPFS upload.

    Uses zero-copy GPU encoding and writes HLS segments.
    Uploads happen asynchronously in a background thread to avoid stuttering.
    """

    def __init__(
        self,
        output_dir: str,
        size: Tuple[int, int],
        fps: float = 30,
        segment_duration: float = 4.0,
        crf: int = 23,
        audio_source: Optional[str] = None,
        ipfs_gateway: str = "https://ipfs.io/ipfs",
        on_playlist_update: Optional[Callable[[str], None]] = None,
    ):
        """Create the output directory, GPU encoder, muxer and upload worker.

        Args:
            output_dir: Directory for the local playlist and ``.ts`` segments.
            size: ``(width, height)`` of the frames that will be written.
            fps: Frame rate, used to count frames per segment.
            segment_duration: Target HLS segment length in seconds.
            crf: Passed through to :class:`GPUEncoder`.
            audio_source: Accepted for interface compatibility; not used here.
            ipfs_gateway: Base URL prepended to CIDs in the IPFS playlist.
            on_playlist_update: Called with the new playlist CID after each
                successful playlist upload.
        """
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(parents=True, exist_ok=True)
        self.size = size
        self.fps = fps
        self.segment_duration = segment_duration
        self.ipfs_gateway = ipfs_gateway.rstrip("/")
        self._on_playlist_update = on_playlist_update
        self._is_open = True
        # Tracks whether close() already ran. Kept separate from _is_open
        # because write() clears _is_open on a broken pipe and close() must
        # still release resources afterwards.
        self._closed = False

        # GPU encoder
        self._gpu_encoder = GPUEncoder(size[0], size[1], fps, crf)

        # Segment management
        self._current_segment = 0
        self._frames_in_segment = 0
        # At least one frame per segment so the counter can never be zero.
        self._frames_per_segment = max(1, int(fps * segment_duration))
        self._segment_data = []

        # Track segment CIDs for IPFS
        self.segment_cids = {}
        # Segment numbers waiting in (or being processed from) the upload
        # queue, so that the same segment is not queued twice.
        self._queued_segments = set()
        self._playlist_cid = None
        self._upload_lock = threading.Lock()

        # Import IPFS client
        from ipfs_client import add_file, add_bytes
        self._ipfs_add_file = add_file
        self._ipfs_add_bytes = add_bytes

        # Setup ffmpeg for muxing (takes raw H.264, outputs .ts segments).
        # Done before starting the worker so a failed spawn leaves no thread.
        self._setup_muxer()

        # Background upload thread
        self._upload_queue = queue.Queue()
        self._upload_thread = threading.Thread(target=self._upload_worker, daemon=True)
        self._upload_thread.start()

        print(f"[GPUHLSOutput] Initialized {size[0]}x{size[1]} @ {fps}fps, GPU encoding", file=sys.stderr)

    def _setup_muxer(self):
        """Setup ffmpeg for muxing H.264 to MPEG-TS segments."""
        self.local_playlist_path = self.output_dir / "stream.m3u8"

        cmd = [
            "ffmpeg", "-y",
            "-f", "h264",  # Input is raw H.264
            "-i", "-",
            "-c:v", "copy",  # Just copy, no re-encoding
            "-f", "hls",
            "-hls_time", str(self.segment_duration),
            "-hls_list_size", "0",
            "-hls_flags", "independent_segments+append_list+split_by_time",
            "-hls_segment_type", "mpegts",
            "-hls_segment_filename", str(self.output_dir / "segment_%05d.ts"),
            str(self.local_playlist_path),
        ]

        self._muxer = subprocess.Popen(
            cmd,
            stdin=subprocess.PIPE,
            stderr=subprocess.DEVNULL,
        )

    def write(self, frame: Union[np.ndarray, 'cp.ndarray'], t: float = 0):
        """Write a frame using GPU encoding.

        Args:
            frame: RGB frame, shape (H, W, 3), on CPU or GPU.
            t: Accepted for interface compatibility; not used here.
        """
        if not self._is_open:
            return

        # GPU encode
        encoded = self._gpu_encoder.encode_frame(frame)

        # Send to muxer
        if encoded:
            try:
                self._muxer.stdin.write(encoded)
            except BrokenPipeError:
                # The muxer went away; stop accepting frames. close() still
                # has to be called to release the remaining resources.
                self._is_open = False
                return

        self._frames_in_segment += 1

        # Check for segment completion
        if self._frames_in_segment >= self._frames_per_segment:
            self._frames_in_segment = 0
            self._check_upload_segments()

    def _upload_worker(self):
        """Background worker thread for async IPFS uploads."""
        while True:
            try:
                item = self._upload_queue.get(timeout=1.0)
                if item is None:  # Shutdown signal
                    break
                seg_path, seg_num = item
                self._do_upload(seg_path, seg_num)
            except queue.Empty:
                continue
            except Exception as e:
                print(f"Upload worker error: {e}", file=sys.stderr)

    def _do_upload(self, seg_path: Path, seg_num: int):
        """Actually perform the upload (runs in background thread).

        On success the CID is recorded and the IPFS playlist is regenerated.
        On failure the segment becomes eligible for queueing again.
        """
        try:
            cid = self._ipfs_add_file(seg_path, pin=True)
            if cid:
                with self._upload_lock:
                    self.segment_cids[seg_num] = cid
                print(f"Added to IPFS: {seg_path.name} -> {cid}", file=sys.stderr)
                self._update_playlist()
        except Exception as e:
            print(f"Failed to add to IPFS: {e}", file=sys.stderr)
        finally:
            # Uploaded segments are now guarded by segment_cids; failed ones
            # must leave the pending set so a later check can retry them.
            with self._upload_lock:
                self._queued_segments.discard(seg_num)

    def _check_upload_segments(self):
        """Check for and queue new segments for async IPFS upload.

        While the muxer is still running, the highest-numbered segment is the
        one being written and is skipped; it is picked up by a later call or
        by the final check in ``close()`` after the muxer has exited.
        """
        numbered = []
        for seg_path in self.output_dir.glob("segment_*.ts"):
            try:
                seg_num = int(seg_path.stem.split("_")[1])
            except (IndexError, ValueError):
                continue  # Not one of our segment files
            numbered.append((seg_num, seg_path))
        # Sort numerically: lexicographic order breaks past 5 digits.
        numbered.sort(key=lambda pair: pair[0])

        if numbered and self._muxer.poll() is None:
            numbered = numbered[:-1]

        for seg_num, seg_path in numbered:
            try:
                if seg_path.stat().st_size == 0:
                    continue
            except FileNotFoundError:
                continue

            with self._upload_lock:
                if seg_num in self.segment_cids or seg_num in self._queued_segments:
                    continue
                self._queued_segments.add(seg_num)

            # Queue for async upload (non-blocking!)
            self._upload_queue.put((seg_path, seg_num))

    def _update_playlist(self):
        """Generate and upload IPFS-aware playlist."""
        # Only snapshot under the lock; the upload itself runs without it.
        with self._upload_lock:
            if not self.segment_cids:
                return
            ordered_cids = [self.segment_cids[num] for num in sorted(self.segment_cids)]

        lines = [
            "#EXTM3U",
            "#EXT-X-VERSION:3",
            f"#EXT-X-TARGETDURATION:{int(self.segment_duration) + 1}",
            "#EXT-X-MEDIA-SEQUENCE:0",
        ]
        for cid in ordered_cids:
            lines.append(f"#EXTINF:{self.segment_duration:.3f},")
            lines.append(f"{self.ipfs_gateway}/{cid}")

        playlist_content = "\n".join(lines) + "\n"

        # Upload playlist
        self._playlist_cid = self._ipfs_add_bytes(playlist_content.encode(), pin=True)
        if self._playlist_cid and self._on_playlist_update:
            self._on_playlist_update(self._playlist_cid)

    def close(self):
        """Close output and flush remaining data.

        Safe to call more than once, and also after ``write()`` has stopped
        accepting frames because the muxer pipe broke.
        """
        if self._closed:
            return
        self._closed = True
        self._is_open = False

        # Flush GPU encoder
        final_data = self._gpu_encoder.flush()
        if final_data:
            try:
                self._muxer.stdin.write(final_data)
            except (OSError, ValueError):
                # Best effort: the pipe may already be broken or closed.
                pass

        # Close muxer
        try:
            self._muxer.stdin.close()
            self._muxer.wait(timeout=10)
        except (OSError, ValueError, subprocess.TimeoutExpired):
            self._muxer.kill()
            self._muxer.wait()  # Reap the killed process

        # Final segment upload
        self._check_upload_segments()

        # Wait for pending uploads to complete
        self._upload_queue.put(None)  # Signal shutdown
        self._upload_thread.join(timeout=30)

        self._gpu_encoder.close()

    @property
    def is_open(self) -> bool:
        """Whether the output still accepts frames."""
        return self._is_open

    @property
    def playlist_cid(self) -> Optional[str]:
        """CID of the most recently uploaded playlist, if any."""
        return self._playlist_cid

    @property
    def playlist_url(self) -> Optional[str]:
        """Get the full IPFS URL for the playlist."""
        if self._playlist_cid:
            return f"{self.ipfs_gateway}/{self._playlist_cid}"
        return None