510 lines
18 KiB
Python
510 lines
18 KiB
Python
"""
Multi-Resolution HLS Output with IPFS Storage.

Renders video at multiple quality levels simultaneously:
- Original resolution (from recipe)
- 720p (streaming quality)
- 360p (mobile/low bandwidth)

All segments stored on IPFS. Master playlist enables adaptive bitrate streaming.
"""
|
|
|
|
import os
|
|
import sys
|
|
import subprocess
|
|
import threading
|
|
import queue
|
|
import time
|
|
from pathlib import Path
|
|
from typing import Dict, List, Optional, Tuple, Union
|
|
from dataclasses import dataclass, field
|
|
|
|
import numpy as np
|
|
|
|
# Try GPU imports
|
|
try:
|
|
import cupy as cp
|
|
GPU_AVAILABLE = True
|
|
except ImportError:
|
|
cp = None
|
|
GPU_AVAILABLE = False
|
|
|
|
|
|
@dataclass
class QualityLevel:
    """Settings and upload state for a single HLS rendition.

    The first four fields describe the rendition; the last two are filled
    in while streaming, as segments and playlists get their IPFS CIDs.
    """

    # Label of the rendition, e.g. 'original', '720p', '360p'.
    name: str
    # Encoded frame width in pixels.
    width: int
    # Encoded frame height in pixels.
    height: int
    # Target video bitrate in kbps.
    bitrate: int
    # Segment number -> CID of the uploaded segment (fresh dict per instance).
    segment_cids: Dict[int, str] = field(default_factory=dict)
    # CID of the most recently uploaded playlist for this rendition, if any.
    playlist_cid: Optional[str] = None
|
|
|
|
|
|
class MultiResolutionHLSOutput:
|
|
"""
|
|
GPU-accelerated multi-resolution HLS output with IPFS storage.
|
|
|
|
Encodes video at multiple quality levels simultaneously using NVENC.
|
|
Segments are uploaded to IPFS as they're created.
|
|
Generates adaptive bitrate master playlist.
|
|
"""
|
|
|
|
def __init__(
    self,
    output_dir: str,
    source_size: Tuple[int, int],
    fps: float = 30,
    segment_duration: float = 4.0,
    ipfs_gateway: str = "https://ipfs.io/ipfs",
    on_playlist_update: callable = None,
    audio_source: str = None,
    resume_from: Optional[Dict] = None,
):
    """Initialize multi-resolution HLS output.

    Creates the output directory, derives the quality ladder from the
    source size, starts one FFmpeg encoder per quality level and then the
    background IPFS upload thread.

    Args:
        output_dir: Directory for HLS output files (created if missing)
        source_size: (width, height) of source frames
        fps: Frames per second
        segment_duration: Duration of each HLS segment in seconds
        ipfs_gateway: IPFS gateway URL for playlist URLs (trailing "/" is
            stripped)
        on_playlist_update: Callback invoked as
            ``on_playlist_update(master_cid, quality_info)`` whenever a new
            master playlist has been uploaded
        audio_source: Optional audio file to mux with video
        resume_from: Optional dict to resume from checkpoint with keys:
            - segment_cids: Dict of quality -> {seg_num: cid}. Segment
              numbers may be ints or numeric strings (as produced by a
              JSON round trip); they are normalised to int.
    """
    self.output_dir = Path(output_dir)
    self.output_dir.mkdir(parents=True, exist_ok=True)
    self.source_width, self.source_height = source_size
    self.fps = fps
    self.segment_duration = segment_duration
    self.ipfs_gateway = ipfs_gateway.rstrip("/")
    self._on_playlist_update = on_playlist_update
    self.audio_source = audio_source
    self._is_open = True
    self._frame_count = 0

    # Define quality levels
    self.qualities: Dict[str, QualityLevel] = {}
    self._setup_quality_levels()

    # Restore segment CIDs if resuming (don't re-upload existing segments)
    if resume_from and resume_from.get('segment_cids'):
        for name, cids in resume_from['segment_cids'].items():
            if name not in self.qualities:
                continue
            # Segment numbers parsed from file names are ints. Keys that
            # arrive as strings would never match them and would make
            # sorted() fail on mixed key types, so coerce them here.
            restored: Dict[int, str] = {}
            for seg_num, cid in dict(cids).items():
                try:
                    restored[int(seg_num)] = cid
                except (TypeError, ValueError):
                    print(
                        f"[MultiResHLS] Ignoring checkpoint entry with non-numeric "
                        f"segment number {seg_num!r} for {name}",
                        file=sys.stderr,
                    )
            self.qualities[name].segment_cids = restored
            print(f"[MultiResHLS] Restored {len(restored)} segment CIDs for {name}", file=sys.stderr)

    # IPFS client (imported here rather than at module import time)
    from ipfs_client import add_file, add_bytes
    self._ipfs_add_file = add_file
    self._ipfs_add_bytes = add_bytes

    # Upload queue; the worker thread is started only after the encoders
    # are up, so a failing encoder launch does not leave a polling thread
    # behind.
    self._upload_queue = queue.Queue()

    # Track master playlist
    self._master_playlist_cid = None

    # Setup encoders
    self._setup_encoders()

    # Upload thread
    self._upload_thread = threading.Thread(target=self._upload_worker, daemon=True)
    self._upload_thread.start()

    print(f"[MultiResHLS] Initialized {self.source_width}x{self.source_height} @ {fps}fps", file=sys.stderr)
    print(f"[MultiResHLS] Quality levels: {list(self.qualities.keys())}", file=sys.stderr)
|
|
|
|
def _setup_quality_levels(self):
|
|
"""Configure quality levels based on source resolution."""
|
|
# Always include original resolution
|
|
self.qualities['original'] = QualityLevel(
|
|
name='original',
|
|
width=self.source_width,
|
|
height=self.source_height,
|
|
bitrate=self._estimate_bitrate(self.source_width, self.source_height),
|
|
)
|
|
|
|
# Add 720p if source is larger
|
|
if self.source_height > 720:
|
|
aspect = self.source_width / self.source_height
|
|
w720 = int(720 * aspect)
|
|
w720 = w720 - (w720 % 2) # Ensure even width
|
|
self.qualities['720p'] = QualityLevel(
|
|
name='720p',
|
|
width=w720,
|
|
height=720,
|
|
bitrate=2500,
|
|
)
|
|
|
|
# Add 360p if source is larger
|
|
if self.source_height > 360:
|
|
aspect = self.source_width / self.source_height
|
|
w360 = int(360 * aspect)
|
|
w360 = w360 - (w360 % 2) # Ensure even width
|
|
self.qualities['360p'] = QualityLevel(
|
|
name='360p',
|
|
width=w360,
|
|
height=360,
|
|
bitrate=800,
|
|
)
|
|
|
|
def _estimate_bitrate(self, width: int, height: int) -> int:
|
|
"""Estimate appropriate bitrate for resolution (in kbps)."""
|
|
pixels = width * height
|
|
if pixels >= 3840 * 2160: # 4K
|
|
return 15000
|
|
elif pixels >= 1920 * 1080: # 1080p
|
|
return 5000
|
|
elif pixels >= 1280 * 720: # 720p
|
|
return 2500
|
|
elif pixels >= 854 * 480: # 480p
|
|
return 1500
|
|
else:
|
|
return 800
|
|
|
|
def _setup_encoders(self):
    """Launch one FFmpeg process per quality level.

    For every entry in ``self.qualities`` this creates
    ``<output_dir>/<name>/``, starts FFmpeg with piped stdin/stderr and
    starts a daemon thread that reads the process's stderr. Processes are
    stored in ``self._encoders`` and the reader threads in
    ``self._encoder_threads``, both keyed by quality name.
    """
    self._encoders: Dict[str, subprocess.Popen] = {}
    self._encoder_threads: Dict[str, threading.Thread] = {}

    for label, level in self.qualities.items():
        # Each rendition writes its playlist and segments to its own folder.
        target_dir = self.output_dir / label
        target_dir.mkdir(parents=True, exist_ok=True)

        ffmpeg_cmd = self._build_encoder_cmd(level, target_dir)

        print(f"[MultiResHLS] Starting encoder for {label}: {level.width}x{level.height}", file=sys.stderr)

        encoder = subprocess.Popen(
            ffmpeg_cmd,
            stdin=subprocess.PIPE,
            stderr=subprocess.PIPE,
            bufsize=10**7,  # Large buffer to prevent blocking
        )
        self._encoders[label] = encoder

        # stderr is a pipe, so something has to keep reading it.
        reader = threading.Thread(
            target=self._drain_stderr,
            args=(label, encoder),
            daemon=True,
        )
        reader.start()
        self._encoder_threads[label] = reader
|
|
|
|
def _build_encoder_cmd(self, quality: QualityLevel, output_dir: Path) -> List[str]:
|
|
"""Build FFmpeg command for a quality level."""
|
|
playlist_path = output_dir / "playlist.m3u8"
|
|
segment_pattern = output_dir / "segment_%05d.ts"
|
|
|
|
cmd = [
|
|
"ffmpeg", "-y",
|
|
"-f", "rawvideo",
|
|
"-pixel_format", "rgb24",
|
|
"-video_size", f"{self.source_width}x{self.source_height}",
|
|
"-framerate", str(self.fps),
|
|
"-i", "-",
|
|
]
|
|
|
|
# Add audio input if provided
|
|
if self.audio_source:
|
|
cmd.extend(["-i", str(self.audio_source)])
|
|
# Map video from input 0, audio from input 1
|
|
cmd.extend(["-map", "0:v", "-map", "1:a"])
|
|
|
|
# Scale if not original resolution
|
|
if quality.width != self.source_width or quality.height != self.source_height:
|
|
cmd.extend([
|
|
"-vf", f"scale={quality.width}:{quality.height}:flags=lanczos",
|
|
])
|
|
|
|
# NVENC encoding with quality settings
|
|
cmd.extend([
|
|
"-c:v", "h264_nvenc",
|
|
"-preset", "p4", # Balanced speed/quality
|
|
"-tune", "hq",
|
|
"-b:v", f"{quality.bitrate}k",
|
|
"-maxrate", f"{int(quality.bitrate * 1.5)}k",
|
|
"-bufsize", f"{quality.bitrate * 2}k",
|
|
"-g", str(int(self.fps * self.segment_duration)), # Keyframe interval = segment duration
|
|
"-keyint_min", str(int(self.fps * self.segment_duration)),
|
|
"-sc_threshold", "0", # Disable scene change detection for consistent segments
|
|
])
|
|
|
|
# Add audio encoding if audio source provided
|
|
if self.audio_source:
|
|
cmd.extend([
|
|
"-c:a", "aac",
|
|
"-b:a", "128k",
|
|
"-shortest", # Stop when shortest stream ends
|
|
])
|
|
|
|
# HLS output
|
|
cmd.extend([
|
|
"-f", "hls",
|
|
"-hls_time", str(self.segment_duration),
|
|
"-hls_list_size", "0", # Keep all segments in playlist
|
|
"-hls_flags", "independent_segments+append_list",
|
|
"-hls_segment_type", "mpegts",
|
|
"-hls_segment_filename", str(segment_pattern),
|
|
str(playlist_path),
|
|
])
|
|
|
|
return cmd
|
|
|
|
def _drain_stderr(self, name: str, proc: subprocess.Popen):
|
|
"""Drain FFmpeg stderr to prevent blocking."""
|
|
try:
|
|
for line in proc.stderr:
|
|
line_str = line.decode('utf-8', errors='replace').strip()
|
|
if line_str and ('error' in line_str.lower() or 'warning' in line_str.lower()):
|
|
print(f"[FFmpeg/{name}] {line_str}", file=sys.stderr)
|
|
except Exception as e:
|
|
print(f"[FFmpeg/{name}] stderr drain error: {e}", file=sys.stderr)
|
|
|
|
def write(self, frame: Union[np.ndarray, 'cp.ndarray'], t: float = 0):
    """Write a frame to all quality encoders.

    The frame is brought to host memory if necessary, converted to a
    C-contiguous uint8 array and its raw bytes are written to the stdin of
    every encoder. If any encoder has exited or its pipe is broken, the
    output is marked as not open and the call returns early. Once per
    segment's worth of frames, finished segments are looked for and queued
    for upload.

    Args:
        frame: Frame to encode (NumPy array, CuPy array, or an object
            exposing ``cpu`` / ``gpu`` accessors).
        t: Accepted for interface compatibility; not used by this method.
    """
    if not self._is_open:
        return

    # Convert GPU frame to CPU if needed
    if GPU_AVAILABLE and hasattr(frame, 'get'):
        frame = frame.get()  # CuPy to NumPy
    elif hasattr(frame, 'cpu'):
        frame = frame.cpu  # GPUFrame to NumPy
    elif hasattr(frame, 'gpu') and hasattr(frame, 'is_on_gpu'):
        # NOTE(review): this branch is only reached when `frame` has no
        # `cpu` attribute, yet its else-arm reads `frame.cpu` - confirm
        # the intended order of these checks.
        frame = frame.gpu.get() if frame.is_on_gpu else frame.cpu

    # Ensure correct format
    if frame.dtype != np.uint8:
        frame = np.clip(frame, 0, 255).astype(np.uint8)

    # Ensure contiguous
    if not frame.flags['C_CONTIGUOUS']:
        frame = np.ascontiguousarray(frame)

    frame_bytes = frame.tobytes()

    # Write to all encoders
    for name, proc in self._encoders.items():
        if proc.poll() is not None:
            print(f"[MultiResHLS] Encoder {name} died with code {proc.returncode}", file=sys.stderr)
            self._is_open = False
            return

        try:
            proc.stdin.write(frame_bytes)
        except BrokenPipeError:
            print(f"[MultiResHLS] Encoder {name} pipe broken", file=sys.stderr)
            self._is_open = False
            return

    self._frame_count += 1

    # Check for new segments periodically. The interval is clamped to at
    # least one frame: int(fps * segment_duration) is 0 whenever a segment
    # holds less than one frame, and `% 0` would raise ZeroDivisionError.
    frames_per_segment = max(1, int(self.fps * self.segment_duration))
    if self._frame_count % frames_per_segment == 0:
        self._check_and_upload_segments()
|
|
|
|
def _check_and_upload_segments(self):
|
|
"""Check for new segments and queue them for upload."""
|
|
for name, quality in self.qualities.items():
|
|
quality_dir = self.output_dir / name
|
|
segments = sorted(quality_dir.glob("segment_*.ts"))
|
|
|
|
for seg_path in segments:
|
|
seg_num = int(seg_path.stem.split("_")[1])
|
|
|
|
if seg_num in quality.segment_cids:
|
|
continue # Already uploaded
|
|
|
|
# Check if segment is complete (not still being written)
|
|
try:
|
|
size1 = seg_path.stat().st_size
|
|
if size1 == 0:
|
|
continue
|
|
time.sleep(0.05)
|
|
size2 = seg_path.stat().st_size
|
|
if size1 != size2:
|
|
continue # Still being written
|
|
except FileNotFoundError:
|
|
continue
|
|
|
|
# Queue for upload
|
|
self._upload_queue.put((name, seg_path, seg_num))
|
|
|
|
def _upload_worker(self):
|
|
"""Background worker for IPFS uploads."""
|
|
while True:
|
|
try:
|
|
item = self._upload_queue.get(timeout=1.0)
|
|
if item is None: # Shutdown signal
|
|
break
|
|
|
|
quality_name, seg_path, seg_num = item
|
|
self._do_upload(quality_name, seg_path, seg_num)
|
|
|
|
except queue.Empty:
|
|
continue
|
|
except Exception as e:
|
|
print(f"[MultiResHLS] Upload worker error: {e}", file=sys.stderr)
|
|
|
|
def _do_upload(self, quality_name: str, seg_path: Path, seg_num: int):
|
|
"""Upload a segment to IPFS."""
|
|
try:
|
|
cid = self._ipfs_add_file(seg_path, pin=True)
|
|
if cid:
|
|
self.qualities[quality_name].segment_cids[seg_num] = cid
|
|
print(f"[MultiResHLS] Uploaded {quality_name}/segment_{seg_num:05d}.ts -> {cid[:16]}...", file=sys.stderr)
|
|
|
|
# Update playlists after each upload
|
|
self._update_playlists()
|
|
except Exception as e:
|
|
print(f"[MultiResHLS] Failed to upload {seg_path}: {e}", file=sys.stderr)
|
|
|
|
def _update_playlists(self):
|
|
"""Generate and upload IPFS playlists."""
|
|
# Generate quality-specific playlists
|
|
for name, quality in self.qualities.items():
|
|
if not quality.segment_cids:
|
|
continue
|
|
|
|
playlist = self._generate_quality_playlist(quality)
|
|
cid = self._ipfs_add_bytes(playlist.encode(), pin=True)
|
|
if cid:
|
|
quality.playlist_cid = cid
|
|
|
|
# Generate master playlist
|
|
self._generate_master_playlist()
|
|
|
|
def _generate_quality_playlist(self, quality: QualityLevel, finalize: bool = False) -> str:
|
|
"""Generate HLS playlist for a quality level."""
|
|
lines = [
|
|
"#EXTM3U",
|
|
"#EXT-X-VERSION:3",
|
|
f"#EXT-X-TARGETDURATION:{int(self.segment_duration) + 1}",
|
|
"#EXT-X-MEDIA-SEQUENCE:0",
|
|
]
|
|
|
|
if finalize:
|
|
lines.append("#EXT-X-PLAYLIST-TYPE:VOD")
|
|
|
|
# Use /ipfs-ts/ for correct MIME type
|
|
segment_gateway = self.ipfs_gateway.replace("/ipfs", "/ipfs-ts")
|
|
|
|
for seg_num in sorted(quality.segment_cids.keys()):
|
|
cid = quality.segment_cids[seg_num]
|
|
lines.append(f"#EXTINF:{self.segment_duration:.3f},")
|
|
lines.append(f"{segment_gateway}/{cid}")
|
|
|
|
if finalize:
|
|
lines.append("#EXT-X-ENDLIST")
|
|
|
|
return "\n".join(lines) + "\n"
|
|
|
|
def _generate_master_playlist(self, finalize: bool = False):
|
|
"""Generate and upload master playlist."""
|
|
lines = ["#EXTM3U", "#EXT-X-VERSION:3"]
|
|
|
|
for name, quality in self.qualities.items():
|
|
if not quality.playlist_cid:
|
|
continue
|
|
|
|
lines.append(
|
|
f"#EXT-X-STREAM-INF:BANDWIDTH={quality.bitrate * 1000},"
|
|
f"RESOLUTION={quality.width}x{quality.height},"
|
|
f"NAME=\"{name}\""
|
|
)
|
|
lines.append(f"{self.ipfs_gateway}/{quality.playlist_cid}")
|
|
|
|
if len(lines) <= 2:
|
|
return # No quality playlists yet
|
|
|
|
master_content = "\n".join(lines) + "\n"
|
|
cid = self._ipfs_add_bytes(master_content.encode(), pin=True)
|
|
|
|
if cid:
|
|
self._master_playlist_cid = cid
|
|
print(f"[MultiResHLS] Master playlist: {cid}", file=sys.stderr)
|
|
|
|
if self._on_playlist_update:
|
|
# Pass both master CID and quality info for dynamic playlist generation
|
|
quality_info = {
|
|
name: {
|
|
"cid": q.playlist_cid,
|
|
"width": q.width,
|
|
"height": q.height,
|
|
"bitrate": q.bitrate,
|
|
}
|
|
for name, q in self.qualities.items()
|
|
if q.playlist_cid
|
|
}
|
|
self._on_playlist_update(cid, quality_info)
|
|
|
|
def close(self):
|
|
"""Close all encoders and finalize output."""
|
|
if not self._is_open:
|
|
return
|
|
|
|
self._is_open = False
|
|
print(f"[MultiResHLS] Closing after {self._frame_count} frames", file=sys.stderr)
|
|
|
|
# Close encoder stdin pipes
|
|
for name, proc in self._encoders.items():
|
|
try:
|
|
proc.stdin.close()
|
|
except:
|
|
pass
|
|
|
|
# Wait for encoders to finish
|
|
for name, proc in self._encoders.items():
|
|
try:
|
|
proc.wait(timeout=30)
|
|
print(f"[MultiResHLS] Encoder {name} finished with code {proc.returncode}", file=sys.stderr)
|
|
except subprocess.TimeoutExpired:
|
|
proc.kill()
|
|
print(f"[MultiResHLS] Encoder {name} killed (timeout)", file=sys.stderr)
|
|
|
|
# Final segment check and upload
|
|
self._check_and_upload_segments()
|
|
|
|
# Wait for uploads to complete
|
|
self._upload_queue.put(None) # Shutdown signal
|
|
self._upload_thread.join(timeout=60)
|
|
|
|
# Generate final playlists with EXT-X-ENDLIST
|
|
for name, quality in self.qualities.items():
|
|
if quality.segment_cids:
|
|
playlist = self._generate_quality_playlist(quality, finalize=True)
|
|
cid = self._ipfs_add_bytes(playlist.encode(), pin=True)
|
|
if cid:
|
|
quality.playlist_cid = cid
|
|
print(f"[MultiResHLS] Final {name} playlist: {cid} ({len(quality.segment_cids)} segments)", file=sys.stderr)
|
|
|
|
# Final master playlist
|
|
self._generate_master_playlist(finalize=True)
|
|
|
|
print(f"[MultiResHLS] Complete. Master playlist: {self._master_playlist_cid}", file=sys.stderr)
|
|
|
|
@property
|
|
def is_open(self) -> bool:
|
|
return self._is_open
|
|
|
|
@property
|
|
def playlist_cid(self) -> Optional[str]:
|
|
return self._master_playlist_cid
|
|
|
|
@property
|
|
def playlist_url(self) -> Optional[str]:
|
|
if self._master_playlist_cid:
|
|
return f"{self.ipfs_gateway}/{self._master_playlist_cid}"
|
|
return None
|
|
|
|
@property
|
|
def segment_cids(self) -> Dict[str, Dict[int, str]]:
|
|
"""Get all segment CIDs organized by quality."""
|
|
return {name: dict(q.segment_cids) for name, q in self.qualities.items()}
|