Files
mono/l1/streaming/multi_res_output.py
giles b788f1f778
All checks were successful
Build and Deploy / build-and-deploy (push) Successful in 13m49s
Fix CPU HLS streaming (yuv420p) and opt-in middleware for fragments
- Add -pix_fmt yuv420p to multi_res_output.py libx264 path so browsers
  can decode CPU-encoded segments (was producing yuv444p / High 4:4:4).
- Switch silent auth check and coop fragment middlewares from opt-out
  blocklists to opt-in: only run for GET requests with Accept: text/html.
  Prevents unnecessary nav-tree/auth-menu HTTP calls on every HLS segment,
  IPFS proxy, and API request.
- Add opaque grant token verification to L1/L2 dependencies.
- Migrate client CLI to device authorization flow.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-25 18:33:53 +00:00

525 lines
19 KiB
Python

"""
Multi-Resolution HLS Output with IPFS Storage.
Renders video at multiple quality levels simultaneously:
- Original resolution (from recipe)
- 720p (streaming quality)
- 360p (mobile/low bandwidth)
All segments stored on IPFS. Master playlist enables adaptive bitrate streaming.
"""
import os
import sys
import subprocess
import threading
import queue
import time
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Union
from dataclasses import dataclass, field
import numpy as np
from streaming.output import check_nvenc_available
# Try GPU imports
try:
import cupy as cp
GPU_AVAILABLE = True
except ImportError:
cp = None
GPU_AVAILABLE = False
@dataclass
class QualityLevel:
"""Configuration for a quality level."""
name: str
width: int
height: int
bitrate: int # kbps
segment_cids: Dict[int, str] = field(default_factory=dict)
playlist_cid: Optional[str] = None
class MultiResolutionHLSOutput:
"""
GPU-accelerated multi-resolution HLS output with IPFS storage.
Encodes video at multiple quality levels simultaneously using NVENC.
Segments are uploaded to IPFS as they're created.
Generates adaptive bitrate master playlist.
"""
def __init__(
self,
output_dir: str,
source_size: Tuple[int, int],
fps: float = 30,
segment_duration: float = 4.0,
ipfs_gateway: str = "https://ipfs.io/ipfs",
on_playlist_update: callable = None,
audio_source: str = None,
resume_from: Optional[Dict] = None,
):
"""Initialize multi-resolution HLS output.
Args:
output_dir: Directory for HLS output files
source_size: (width, height) of source frames
fps: Frames per second
segment_duration: Duration of each HLS segment in seconds
ipfs_gateway: IPFS gateway URL for playlist URLs
on_playlist_update: Callback when playlists are updated
audio_source: Optional audio file to mux with video
resume_from: Optional dict to resume from checkpoint with keys:
- segment_cids: Dict of quality -> {seg_num: cid}
"""
self.output_dir = Path(output_dir)
self.output_dir.mkdir(parents=True, exist_ok=True)
self.source_width, self.source_height = source_size
self.fps = fps
self.segment_duration = segment_duration
self.ipfs_gateway = ipfs_gateway.rstrip("/")
self._on_playlist_update = on_playlist_update
self.audio_source = audio_source
self._is_open = True
self._frame_count = 0
# Define quality levels
self.qualities: Dict[str, QualityLevel] = {}
self._setup_quality_levels()
# Restore segment CIDs if resuming (don't re-upload existing segments)
if resume_from and resume_from.get('segment_cids'):
for name, cids in resume_from['segment_cids'].items():
if name in self.qualities:
self.qualities[name].segment_cids = dict(cids)
print(f"[MultiResHLS] Restored {len(cids)} segment CIDs for {name}", file=sys.stderr)
# IPFS client
from ipfs_client import add_file, add_bytes
self._ipfs_add_file = add_file
self._ipfs_add_bytes = add_bytes
# Upload queue and thread
self._upload_queue = queue.Queue()
self._upload_thread = threading.Thread(target=self._upload_worker, daemon=True)
self._upload_thread.start()
# Track master playlist
self._master_playlist_cid = None
# Setup encoders
self._setup_encoders()
print(f"[MultiResHLS] Initialized {self.source_width}x{self.source_height} @ {fps}fps", file=sys.stderr)
print(f"[MultiResHLS] Quality levels: {list(self.qualities.keys())}", file=sys.stderr)
def _setup_quality_levels(self):
"""Configure quality levels based on source resolution."""
# Always include original resolution
self.qualities['original'] = QualityLevel(
name='original',
width=self.source_width,
height=self.source_height,
bitrate=self._estimate_bitrate(self.source_width, self.source_height),
)
# Add 720p if source is larger
if self.source_height > 720:
aspect = self.source_width / self.source_height
w720 = int(720 * aspect)
w720 = w720 - (w720 % 2) # Ensure even width
self.qualities['720p'] = QualityLevel(
name='720p',
width=w720,
height=720,
bitrate=2500,
)
# Add 360p if source is larger
if self.source_height > 360:
aspect = self.source_width / self.source_height
w360 = int(360 * aspect)
w360 = w360 - (w360 % 2) # Ensure even width
self.qualities['360p'] = QualityLevel(
name='360p',
width=w360,
height=360,
bitrate=800,
)
def _estimate_bitrate(self, width: int, height: int) -> int:
"""Estimate appropriate bitrate for resolution (in kbps)."""
pixels = width * height
if pixels >= 3840 * 2160: # 4K
return 15000
elif pixels >= 1920 * 1080: # 1080p
return 5000
elif pixels >= 1280 * 720: # 720p
return 2500
elif pixels >= 854 * 480: # 480p
return 1500
else:
return 800
def _setup_encoders(self):
"""Setup FFmpeg encoder processes for each quality level."""
self._encoders: Dict[str, subprocess.Popen] = {}
self._encoder_threads: Dict[str, threading.Thread] = {}
for name, quality in self.qualities.items():
# Create output directory for this quality
quality_dir = self.output_dir / name
quality_dir.mkdir(parents=True, exist_ok=True)
# Build FFmpeg command
cmd = self._build_encoder_cmd(quality, quality_dir)
print(f"[MultiResHLS] Starting encoder for {name}: {quality.width}x{quality.height}", file=sys.stderr)
# Start encoder process
proc = subprocess.Popen(
cmd,
stdin=subprocess.PIPE,
stderr=subprocess.PIPE,
bufsize=10**7, # Large buffer to prevent blocking
)
self._encoders[name] = proc
# Start stderr drain thread
stderr_thread = threading.Thread(
target=self._drain_stderr,
args=(name, proc),
daemon=True
)
stderr_thread.start()
self._encoder_threads[name] = stderr_thread
def _build_encoder_cmd(self, quality: QualityLevel, output_dir: Path) -> List[str]:
"""Build FFmpeg command for a quality level."""
playlist_path = output_dir / "playlist.m3u8"
segment_pattern = output_dir / "segment_%05d.ts"
cmd = [
"ffmpeg", "-y",
"-f", "rawvideo",
"-pixel_format", "rgb24",
"-video_size", f"{self.source_width}x{self.source_height}",
"-framerate", str(self.fps),
"-i", "-",
]
# Add audio input if provided
if self.audio_source:
cmd.extend(["-i", str(self.audio_source)])
# Map video from input 0, audio from input 1
cmd.extend(["-map", "0:v", "-map", "1:a"])
# Scale if not original resolution
if quality.width != self.source_width or quality.height != self.source_height:
cmd.extend([
"-vf", f"scale={quality.width}:{quality.height}:flags=lanczos",
])
# Encoding settings - use NVENC if available, fall back to libx264
use_nvenc = check_nvenc_available()
if use_nvenc:
cmd.extend([
"-c:v", "h264_nvenc",
"-preset", "p4", # Balanced speed/quality
"-tune", "hq",
"-b:v", f"{quality.bitrate}k",
"-maxrate", f"{int(quality.bitrate * 1.5)}k",
"-bufsize", f"{quality.bitrate * 2}k",
])
else:
cmd.extend([
"-c:v", "libx264",
"-preset", "fast",
"-b:v", f"{quality.bitrate}k",
"-maxrate", f"{int(quality.bitrate * 1.5)}k",
"-bufsize", f"{quality.bitrate * 2}k",
])
cmd.extend([
"-pix_fmt", "yuv420p", # Required for browser MSE compatibility
"-g", str(int(self.fps * self.segment_duration)), # Keyframe interval = segment duration
"-keyint_min", str(int(self.fps * self.segment_duration)),
"-sc_threshold", "0", # Disable scene change detection for consistent segments
])
# Add audio encoding if audio source provided
if self.audio_source:
cmd.extend([
"-c:a", "aac",
"-b:a", "128k",
"-shortest", # Stop when shortest stream ends
])
# HLS output
cmd.extend([
"-f", "hls",
"-hls_time", str(self.segment_duration),
"-hls_list_size", "0", # Keep all segments in playlist
"-hls_flags", "independent_segments+append_list",
"-hls_segment_type", "mpegts",
"-hls_segment_filename", str(segment_pattern),
str(playlist_path),
])
return cmd
def _drain_stderr(self, name: str, proc: subprocess.Popen):
"""Drain FFmpeg stderr to prevent blocking."""
try:
for line in proc.stderr:
line_str = line.decode('utf-8', errors='replace').strip()
if line_str and ('error' in line_str.lower() or 'warning' in line_str.lower()):
print(f"[FFmpeg/{name}] {line_str}", file=sys.stderr)
except Exception as e:
print(f"[FFmpeg/{name}] stderr drain error: {e}", file=sys.stderr)
def write(self, frame: Union[np.ndarray, 'cp.ndarray'], t: float = 0):
"""Write a frame to all quality encoders."""
if not self._is_open:
return
# Convert GPU frame to CPU if needed
if GPU_AVAILABLE and hasattr(frame, 'get'):
frame = frame.get() # CuPy to NumPy
elif hasattr(frame, 'cpu'):
frame = frame.cpu # GPUFrame to NumPy
elif hasattr(frame, 'gpu') and hasattr(frame, 'is_on_gpu'):
frame = frame.gpu.get() if frame.is_on_gpu else frame.cpu
# Ensure correct format
if frame.dtype != np.uint8:
frame = np.clip(frame, 0, 255).astype(np.uint8)
# Ensure contiguous
if not frame.flags['C_CONTIGUOUS']:
frame = np.ascontiguousarray(frame)
frame_bytes = frame.tobytes()
# Write to all encoders
for name, proc in self._encoders.items():
if proc.poll() is not None:
print(f"[MultiResHLS] Encoder {name} died with code {proc.returncode}", file=sys.stderr)
self._is_open = False
return
try:
proc.stdin.write(frame_bytes)
except BrokenPipeError:
print(f"[MultiResHLS] Encoder {name} pipe broken", file=sys.stderr)
self._is_open = False
return
self._frame_count += 1
# Check for new segments periodically
if self._frame_count % int(self.fps * self.segment_duration) == 0:
self._check_and_upload_segments()
def _check_and_upload_segments(self):
"""Check for new segments and queue them for upload."""
for name, quality in self.qualities.items():
quality_dir = self.output_dir / name
segments = sorted(quality_dir.glob("segment_*.ts"))
for seg_path in segments:
seg_num = int(seg_path.stem.split("_")[1])
if seg_num in quality.segment_cids:
continue # Already uploaded
# Check if segment is complete (not still being written)
try:
size1 = seg_path.stat().st_size
if size1 == 0:
continue
time.sleep(0.05)
size2 = seg_path.stat().st_size
if size1 != size2:
continue # Still being written
except FileNotFoundError:
continue
# Queue for upload
self._upload_queue.put((name, seg_path, seg_num))
def _upload_worker(self):
"""Background worker for IPFS uploads."""
while True:
try:
item = self._upload_queue.get(timeout=1.0)
if item is None: # Shutdown signal
break
quality_name, seg_path, seg_num = item
self._do_upload(quality_name, seg_path, seg_num)
except queue.Empty:
continue
except Exception as e:
print(f"[MultiResHLS] Upload worker error: {e}", file=sys.stderr)
def _do_upload(self, quality_name: str, seg_path: Path, seg_num: int):
"""Upload a segment to IPFS."""
try:
cid = self._ipfs_add_file(seg_path, pin=True)
if cid:
self.qualities[quality_name].segment_cids[seg_num] = cid
print(f"[MultiResHLS] Uploaded {quality_name}/segment_{seg_num:05d}.ts -> {cid[:16]}...", file=sys.stderr)
# Update playlists after each upload
self._update_playlists()
except Exception as e:
print(f"[MultiResHLS] Failed to upload {seg_path}: {e}", file=sys.stderr)
def _update_playlists(self):
"""Generate and upload IPFS playlists."""
# Generate quality-specific playlists
for name, quality in self.qualities.items():
if not quality.segment_cids:
continue
playlist = self._generate_quality_playlist(quality)
cid = self._ipfs_add_bytes(playlist.encode(), pin=True)
if cid:
quality.playlist_cid = cid
# Generate master playlist
self._generate_master_playlist()
def _generate_quality_playlist(self, quality: QualityLevel, finalize: bool = False) -> str:
"""Generate HLS playlist for a quality level."""
lines = [
"#EXTM3U",
"#EXT-X-VERSION:3",
f"#EXT-X-TARGETDURATION:{int(self.segment_duration) + 1}",
"#EXT-X-MEDIA-SEQUENCE:0",
]
if finalize:
lines.append("#EXT-X-PLAYLIST-TYPE:VOD")
# Use /ipfs-ts/ for correct MIME type
segment_gateway = self.ipfs_gateway.replace("/ipfs", "/ipfs-ts")
for seg_num in sorted(quality.segment_cids.keys()):
cid = quality.segment_cids[seg_num]
lines.append(f"#EXTINF:{self.segment_duration:.3f},")
lines.append(f"{segment_gateway}/{cid}")
if finalize:
lines.append("#EXT-X-ENDLIST")
return "\n".join(lines) + "\n"
def _generate_master_playlist(self, finalize: bool = False):
"""Generate and upload master playlist."""
lines = ["#EXTM3U", "#EXT-X-VERSION:3"]
for name, quality in self.qualities.items():
if not quality.playlist_cid:
continue
lines.append(
f"#EXT-X-STREAM-INF:BANDWIDTH={quality.bitrate * 1000},"
f"RESOLUTION={quality.width}x{quality.height},"
f"NAME=\"{name}\""
)
lines.append(f"{self.ipfs_gateway}/{quality.playlist_cid}")
if len(lines) <= 2:
return # No quality playlists yet
master_content = "\n".join(lines) + "\n"
cid = self._ipfs_add_bytes(master_content.encode(), pin=True)
if cid:
self._master_playlist_cid = cid
print(f"[MultiResHLS] Master playlist: {cid}", file=sys.stderr)
if self._on_playlist_update:
# Pass both master CID and quality info for dynamic playlist generation
quality_info = {
name: {
"cid": q.playlist_cid,
"width": q.width,
"height": q.height,
"bitrate": q.bitrate,
}
for name, q in self.qualities.items()
if q.playlist_cid
}
self._on_playlist_update(cid, quality_info)
def close(self):
"""Close all encoders and finalize output."""
if not self._is_open:
return
self._is_open = False
print(f"[MultiResHLS] Closing after {self._frame_count} frames", file=sys.stderr)
# Close encoder stdin pipes
for name, proc in self._encoders.items():
try:
proc.stdin.close()
except:
pass
# Wait for encoders to finish
for name, proc in self._encoders.items():
try:
proc.wait(timeout=30)
print(f"[MultiResHLS] Encoder {name} finished with code {proc.returncode}", file=sys.stderr)
except subprocess.TimeoutExpired:
proc.kill()
print(f"[MultiResHLS] Encoder {name} killed (timeout)", file=sys.stderr)
# Final segment check and upload
self._check_and_upload_segments()
# Wait for uploads to complete
self._upload_queue.put(None) # Shutdown signal
self._upload_thread.join(timeout=60)
# Generate final playlists with EXT-X-ENDLIST
for name, quality in self.qualities.items():
if quality.segment_cids:
playlist = self._generate_quality_playlist(quality, finalize=True)
cid = self._ipfs_add_bytes(playlist.encode(), pin=True)
if cid:
quality.playlist_cid = cid
print(f"[MultiResHLS] Final {name} playlist: {cid} ({len(quality.segment_cids)} segments)", file=sys.stderr)
# Final master playlist
self._generate_master_playlist(finalize=True)
print(f"[MultiResHLS] Complete. Master playlist: {self._master_playlist_cid}", file=sys.stderr)
@property
def is_open(self) -> bool:
return self._is_open
@property
def playlist_cid(self) -> Optional[str]:
return self._master_playlist_cid
@property
def playlist_url(self) -> Optional[str]:
if self._master_playlist_cid:
return f"{self.ipfs_gateway}/{self._master_playlist_cid}"
return None
@property
def segment_cids(self) -> Dict[str, Dict[int, str]]:
"""Get all segment CIDs organized by quality."""
return {name: dict(q.segment_cids) for name, q in self.qualities.items()}