From 76e4a002a0a9fbc67b1d994f1fbc36a4a4d51bb5 Mon Sep 17 00:00:00 2001 From: giles Date: Wed, 4 Feb 2026 20:26:10 +0000 Subject: [PATCH] Add missing multi_res_output.py and update gpu_output.py - multi_res_output.py was not tracked, causing import errors - Update gpu_output.py with recent changes Co-Authored-By: Claude Opus 4.5 --- streaming/gpu_output.py | 24 +- streaming/multi_res_output.py | 461 ++++++++++++++++++++++++++++++++++ 2 files changed, 484 insertions(+), 1 deletion(-) create mode 100644 streaming/multi_res_output.py diff --git a/streaming/gpu_output.py b/streaming/gpu_output.py index e32859e..3034310 100644 --- a/streaming/gpu_output.py +++ b/streaming/gpu_output.py @@ -325,6 +325,20 @@ class GPUHLSOutput: stderr=subprocess.PIPE, # Capture stderr for debugging ) + # Start thread to drain stderr (prevents pipe buffer from filling and blocking FFmpeg) + self._stderr_thread = threading.Thread(target=self._drain_stderr, daemon=True) + self._stderr_thread.start() + + def _drain_stderr(self): + """Drain FFmpeg stderr to prevent blocking.""" + try: + for line in self._muxer.stderr: + line_str = line.decode('utf-8', errors='replace').strip() + if line_str: + print(f"[FFmpeg] {line_str}", file=sys.stderr) + except Exception as e: + print(f"[FFmpeg stderr] Error reading: {e}", file=sys.stderr) + def write(self, frame: Union[np.ndarray, 'cp.ndarray'], t: float = 0): """Write a frame using GPU encoding.""" if not self._is_open: @@ -342,7 +356,15 @@ class GPUHLSOutput: if encoded: try: self._muxer.stdin.write(encoded) - except BrokenPipeError: + except BrokenPipeError as e: + print(f"[GPUHLSOutput] FFmpeg pipe broken after {self._frames_in_segment} frames in segment, total segments: {self._current_segment}", file=sys.stderr) + # Check if muxer is still running + if self._muxer.poll() is not None: + print(f"[GPUHLSOutput] FFmpeg exited with code {self._muxer.returncode}", file=sys.stderr) + self._is_open = False + return + except Exception as e: + print(f"[GPUHLSOutput] Error writing to FFmpeg: {e}", file=sys.stderr) self._is_open = False return diff --git a/streaming/multi_res_output.py b/streaming/multi_res_output.py new file mode 100644 index 0000000..85988da --- /dev/null +++ b/streaming/multi_res_output.py @@ -0,0 +1,461 @@ +""" +Multi-Resolution HLS Output with IPFS Storage. + +Renders video at multiple quality levels simultaneously: +- Original resolution (from recipe) +- 720p (streaming quality) +- 360p (mobile/low bandwidth) + +All segments stored on IPFS. Master playlist enables adaptive bitrate streaming. +""" + +import os +import sys +import subprocess +import threading +import queue +import time +from pathlib import Path +from typing import Dict, List, Optional, Tuple, Union +from dataclasses import dataclass, field + +import numpy as np + +# Try GPU imports +try: + import cupy as cp + GPU_AVAILABLE = True +except ImportError: + cp = None + GPU_AVAILABLE = False + + +@dataclass +class QualityLevel: + """Configuration for a quality level.""" + name: str + width: int + height: int + bitrate: int # kbps + segment_cids: Dict[int, str] = field(default_factory=dict) + playlist_cid: Optional[str] = None + + +class MultiResolutionHLSOutput: + """ + GPU-accelerated multi-resolution HLS output with IPFS storage. + + Encodes video at multiple quality levels simultaneously using NVENC. + Segments are uploaded to IPFS as they're created. + Generates adaptive bitrate master playlist. + """ + + def __init__( + self, + output_dir: str, + source_size: Tuple[int, int], + fps: float = 30, + segment_duration: float = 4.0, + ipfs_gateway: str = "https://ipfs.io/ipfs", + on_playlist_update: callable = None, + ): + self.output_dir = Path(output_dir) + self.output_dir.mkdir(parents=True, exist_ok=True) + self.source_width, self.source_height = source_size + self.fps = fps + self.segment_duration = segment_duration + self.ipfs_gateway = ipfs_gateway.rstrip("/") + self._on_playlist_update = on_playlist_update + self._is_open = True + self._frame_count = 0 + + # Define quality levels + self.qualities: Dict[str, QualityLevel] = {} + self._setup_quality_levels() + + # IPFS client + from ipfs_client import add_file, add_bytes + self._ipfs_add_file = add_file + self._ipfs_add_bytes = add_bytes + + # Upload queue and thread + self._upload_queue = queue.Queue() + self._upload_thread = threading.Thread(target=self._upload_worker, daemon=True) + self._upload_thread.start() + + # Track master playlist + self._master_playlist_cid = None + + # Setup encoders + self._setup_encoders() + + print(f"[MultiResHLS] Initialized {self.source_width}x{self.source_height} @ {fps}fps", file=sys.stderr) + print(f"[MultiResHLS] Quality levels: {list(self.qualities.keys())}", file=sys.stderr) + + def _setup_quality_levels(self): + """Configure quality levels based on source resolution.""" + # Always include original resolution + self.qualities['original'] = QualityLevel( + name='original', + width=self.source_width, + height=self.source_height, + bitrate=self._estimate_bitrate(self.source_width, self.source_height), + ) + + # Add 720p if source is larger + if self.source_height > 720: + aspect = self.source_width / self.source_height + w720 = int(720 * aspect) + w720 = w720 - (w720 % 2) # Ensure even width + self.qualities['720p'] = QualityLevel( + name='720p', + width=w720, + height=720, + bitrate=2500, + ) + + # Add 360p if source is larger + if self.source_height > 360: + aspect = self.source_width / self.source_height + w360 = int(360 * aspect) + w360 = w360 - (w360 % 2) # Ensure even width + self.qualities['360p'] = QualityLevel( + name='360p', + width=w360, + height=360, + bitrate=800, + ) + + def _estimate_bitrate(self, width: int, height: int) -> int: + """Estimate appropriate bitrate for resolution (in kbps).""" + pixels = width * height + if pixels >= 3840 * 2160: # 4K + return 15000 + elif pixels >= 1920 * 1080: # 1080p + return 5000 + elif pixels >= 1280 * 720: # 720p + return 2500 + elif pixels >= 854 * 480: # 480p + return 1500 + else: + return 800 + + def _setup_encoders(self): + """Setup FFmpeg encoder processes for each quality level.""" + self._encoders: Dict[str, subprocess.Popen] = {} + self._encoder_threads: Dict[str, threading.Thread] = {} + + for name, quality in self.qualities.items(): + # Create output directory for this quality + quality_dir = self.output_dir / name + quality_dir.mkdir(parents=True, exist_ok=True) + + # Build FFmpeg command + cmd = self._build_encoder_cmd(quality, quality_dir) + + print(f"[MultiResHLS] Starting encoder for {name}: {quality.width}x{quality.height}", file=sys.stderr) + + # Start encoder process + proc = subprocess.Popen( + cmd, + stdin=subprocess.PIPE, + stderr=subprocess.PIPE, + bufsize=10**7, # Large buffer to prevent blocking + ) + self._encoders[name] = proc + + # Start stderr drain thread + stderr_thread = threading.Thread( + target=self._drain_stderr, + args=(name, proc), + daemon=True + ) + stderr_thread.start() + self._encoder_threads[name] = stderr_thread + + def _build_encoder_cmd(self, quality: QualityLevel, output_dir: Path) -> List[str]: + """Build FFmpeg command for a quality level.""" + playlist_path = output_dir / "playlist.m3u8" + segment_pattern = output_dir / "segment_%05d.ts" + + cmd = [ + "ffmpeg", "-y", + "-f", "rawvideo", + "-pixel_format", "rgb24", + "-video_size", f"{self.source_width}x{self.source_height}", + "-framerate", str(self.fps), + "-i", "-", + ] + + # Scale if not original resolution + if quality.width != self.source_width or quality.height != self.source_height: + cmd.extend([ + "-vf", f"scale={quality.width}:{quality.height}:flags=lanczos", + ]) + + # NVENC encoding with quality settings + cmd.extend([ + "-c:v", "h264_nvenc", + "-preset", "p4", # Balanced speed/quality + "-tune", "hq", + "-b:v", f"{quality.bitrate}k", + "-maxrate", f"{int(quality.bitrate * 1.5)}k", + "-bufsize", f"{quality.bitrate * 2}k", + "-g", str(int(self.fps * self.segment_duration)), # Keyframe interval = segment duration + "-keyint_min", str(int(self.fps * self.segment_duration)), + "-sc_threshold", "0", # Disable scene change detection for consistent segments + ]) + + # HLS output + cmd.extend([ + "-f", "hls", + "-hls_time", str(self.segment_duration), + "-hls_list_size", "0", # Keep all segments in playlist + "-hls_flags", "independent_segments+append_list", + "-hls_segment_type", "mpegts", + "-hls_segment_filename", str(segment_pattern), + str(playlist_path), + ]) + + return cmd + + def _drain_stderr(self, name: str, proc: subprocess.Popen): + """Drain FFmpeg stderr to prevent blocking.""" + try: + for line in proc.stderr: + line_str = line.decode('utf-8', errors='replace').strip() + if line_str and ('error' in line_str.lower() or 'warning' in line_str.lower()): + print(f"[FFmpeg/{name}] {line_str}", file=sys.stderr) + except Exception as e: + print(f"[FFmpeg/{name}] stderr drain error: {e}", file=sys.stderr) + + def write(self, frame: Union[np.ndarray, 'cp.ndarray'], t: float = 0): + """Write a frame to all quality encoders.""" + if not self._is_open: + return + + # Convert GPU frame to CPU if needed + if GPU_AVAILABLE and hasattr(frame, 'get'): + frame = frame.get() # CuPy to NumPy + elif hasattr(frame, 'cpu'): + frame = frame.cpu # GPUFrame to NumPy + elif hasattr(frame, 'gpu') and hasattr(frame, 'is_on_gpu'): + frame = frame.gpu.get() if frame.is_on_gpu else frame.cpu + + # Ensure correct format + if frame.dtype != np.uint8: + frame = np.clip(frame, 0, 255).astype(np.uint8) + + # Ensure contiguous + if not frame.flags['C_CONTIGUOUS']: + frame = np.ascontiguousarray(frame) + + frame_bytes = frame.tobytes() + + # Write to all encoders + for name, proc in self._encoders.items(): + if proc.poll() is not None: + print(f"[MultiResHLS] Encoder {name} died with code {proc.returncode}", file=sys.stderr) + self._is_open = False + return + + try: + proc.stdin.write(frame_bytes) + except BrokenPipeError: + print(f"[MultiResHLS] Encoder {name} pipe broken", file=sys.stderr) + self._is_open = False + return + + self._frame_count += 1 + + # Check for new segments periodically + if self._frame_count % int(self.fps * self.segment_duration) == 0: + self._check_and_upload_segments() + + def _check_and_upload_segments(self): + """Check for new segments and queue them for upload.""" + for name, quality in self.qualities.items(): + quality_dir = self.output_dir / name + segments = sorted(quality_dir.glob("segment_*.ts")) + + for seg_path in segments: + seg_num = int(seg_path.stem.split("_")[1]) + + if seg_num in quality.segment_cids: + continue # Already uploaded + + # Check if segment is complete (not still being written) + try: + size1 = seg_path.stat().st_size + if size1 == 0: + continue + time.sleep(0.05) + size2 = seg_path.stat().st_size + if size1 != size2: + continue # Still being written + except FileNotFoundError: + continue + + # Queue for upload + self._upload_queue.put((name, seg_path, seg_num)) + + def _upload_worker(self): + """Background worker for IPFS uploads.""" + while True: + try: + item = self._upload_queue.get(timeout=1.0) + if item is None: # Shutdown signal + break + + quality_name, seg_path, seg_num = item + self._do_upload(quality_name, seg_path, seg_num) + + except queue.Empty: + continue + except Exception as e: + print(f"[MultiResHLS] Upload worker error: {e}", file=sys.stderr) + + def _do_upload(self, quality_name: str, seg_path: Path, seg_num: int): + """Upload a segment to IPFS.""" + try: + cid = self._ipfs_add_file(seg_path, pin=True) + if cid: + self.qualities[quality_name].segment_cids[seg_num] = cid + print(f"[MultiResHLS] Uploaded {quality_name}/segment_{seg_num:05d}.ts -> {cid[:16]}...", file=sys.stderr) + + # Update playlists after each upload + self._update_playlists() + except Exception as e: + print(f"[MultiResHLS] Failed to upload {seg_path}: {e}", file=sys.stderr) + + def _update_playlists(self): + """Generate and upload IPFS playlists.""" + # Generate quality-specific playlists + for name, quality in self.qualities.items(): + if not quality.segment_cids: + continue + + playlist = self._generate_quality_playlist(quality) + cid = self._ipfs_add_bytes(playlist.encode(), pin=True) + if cid: + quality.playlist_cid = cid + + # Generate master playlist + self._generate_master_playlist() + + def _generate_quality_playlist(self, quality: QualityLevel, finalize: bool = False) -> str: + """Generate HLS playlist for a quality level.""" + lines = [ + "#EXTM3U", + "#EXT-X-VERSION:3", + f"#EXT-X-TARGETDURATION:{int(self.segment_duration) + 1}", + "#EXT-X-MEDIA-SEQUENCE:0", + ] + + if finalize: + lines.append("#EXT-X-PLAYLIST-TYPE:VOD") + + # Use /ipfs-ts/ for correct MIME type + segment_gateway = self.ipfs_gateway.replace("/ipfs", "/ipfs-ts") + + for seg_num in sorted(quality.segment_cids.keys()): + cid = quality.segment_cids[seg_num] + lines.append(f"#EXTINF:{self.segment_duration:.3f},") + lines.append(f"{segment_gateway}/{cid}") + + if finalize: + lines.append("#EXT-X-ENDLIST") + + return "\n".join(lines) + "\n" + + def _generate_master_playlist(self, finalize: bool = False): + """Generate and upload master playlist.""" + lines = ["#EXTM3U", "#EXT-X-VERSION:3"] + + for name, quality in self.qualities.items(): + if not quality.playlist_cid: + continue + + lines.append( + f"#EXT-X-STREAM-INF:BANDWIDTH={quality.bitrate * 1000}," + f"RESOLUTION={quality.width}x{quality.height}," + f"NAME=\"{name}\"" + ) + lines.append(f"{self.ipfs_gateway}/{quality.playlist_cid}") + + if len(lines) <= 2: + return # No quality playlists yet + + master_content = "\n".join(lines) + "\n" + cid = self._ipfs_add_bytes(master_content.encode(), pin=True) + + if cid: + self._master_playlist_cid = cid + print(f"[MultiResHLS] Master playlist: {cid}", file=sys.stderr) + + if self._on_playlist_update: + self._on_playlist_update(cid) + + def close(self): + """Close all encoders and finalize output.""" + if not self._is_open: + return + + self._is_open = False + print(f"[MultiResHLS] Closing after {self._frame_count} frames", file=sys.stderr) + + # Close encoder stdin pipes + for name, proc in self._encoders.items(): + try: + proc.stdin.close() + except: + pass + + # Wait for encoders to finish + for name, proc in self._encoders.items(): + try: + proc.wait(timeout=30) + print(f"[MultiResHLS] Encoder {name} finished with code {proc.returncode}", file=sys.stderr) + except subprocess.TimeoutExpired: + proc.kill() + print(f"[MultiResHLS] Encoder {name} killed (timeout)", file=sys.stderr) + + # Final segment check and upload + self._check_and_upload_segments() + + # Wait for uploads to complete + self._upload_queue.put(None) # Shutdown signal + self._upload_thread.join(timeout=60) + + # Generate final playlists with EXT-X-ENDLIST + for name, quality in self.qualities.items(): + if quality.segment_cids: + playlist = self._generate_quality_playlist(quality, finalize=True) + cid = self._ipfs_add_bytes(playlist.encode(), pin=True) + if cid: + quality.playlist_cid = cid + print(f"[MultiResHLS] Final {name} playlist: {cid} ({len(quality.segment_cids)} segments)", file=sys.stderr) + + # Final master playlist + self._generate_master_playlist(finalize=True) + + print(f"[MultiResHLS] Complete. Master playlist: {self._master_playlist_cid}", file=sys.stderr) + + @property + def is_open(self) -> bool: + return self._is_open + + @property + def playlist_cid(self) -> Optional[str]: + return self._master_playlist_cid + + @property + def playlist_url(self) -> Optional[str]: + if self._master_playlist_cid: + return f"{self.ipfs_gateway}/{self._master_playlist_cid}" + return None + + @property + def segment_cids(self) -> Dict[str, Dict[int, str]]: + """Get all segment CIDs organized by quality.""" + return {name: dict(q.segment_cids) for name, q in self.qualities.items()}