- Add sexp_to_jax.py: JAX compiler for S-expression effects - Use jax.random.fold_in for deterministic but varying random per frame - Pass seed from recipe config through to JAX effects - Fix NVENC detection to do actual encode test - Add set_random_seed for deterministic Python random The fold_in approach allows frame_num to be traced (not static) while still producing different random patterns per frame, fixing the interference pattern issue. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
963 lines
30 KiB
Python
963 lines
30 KiB
Python
"""
|
|
Output targets for streaming compositor.
|
|
|
|
Supports:
|
|
- Display window (preview)
|
|
- File output (recording)
|
|
- Stream output (RTMP, etc.) - future
|
|
- NVENC hardware encoding (auto-detected)
|
|
- CuPy GPU arrays (auto-converted to numpy for output)
|
|
"""
|
|
|
|
import numpy as np
|
|
import subprocess
|
|
import threading
|
|
import queue
|
|
from abc import ABC, abstractmethod
|
|
from typing import Tuple, Optional, List, Union
|
|
from pathlib import Path
|
|
|
|
# Try to import CuPy for GPU array support
|
|
try:
|
|
import cupy as cp
|
|
CUPY_AVAILABLE = True
|
|
except ImportError:
|
|
cp = None
|
|
CUPY_AVAILABLE = False
|
|
|
|
|
|
def ensure_numpy(frame: Union[np.ndarray, 'cp.ndarray']) -> np.ndarray:
    """Return *frame* as a host-side array.

    When CuPy was importable and *frame* is a ``cp.ndarray`` it is copied
    back with ``cp.asnumpy``; any other object is handed back unchanged.
    """
    # Without CuPy there is nothing to convert (and ``cp`` is None).
    if not CUPY_AVAILABLE:
        return frame
    return cp.asnumpy(frame) if isinstance(frame, cp.ndarray) else frame
|
|
|
|
# Memoized outcome of the NVENC probe; None means "not probed yet".
_nvenc_available: Optional[bool] = None


def _probe_nvenc() -> bool:
    """Run the two-step ffmpeg probe and report whether h264_nvenc works."""
    # Step 1: is the encoder compiled into this ffmpeg build at all?
    listing = subprocess.run(
        ["ffmpeg", "-encoders"],
        capture_output=True,
        text=True,
        timeout=5
    )
    if "h264_nvenc" not in listing.stdout:
        return False

    # Step 2: encode a tiny synthetic clip to the null muxer to prove the
    # encoder can actually be opened.
    trial = subprocess.run(
        ["ffmpeg", "-y", "-f", "lavfi", "-i", "testsrc=duration=0.1:size=64x64:rate=1",
         "-c:v", "h264_nvenc", "-f", "null", "-"],
        capture_output=True,
        text=True,
        timeout=10
    )
    if trial.returncode == 0:
        return True

    import sys
    print("NVENC listed but not working, falling back to libx264", file=sys.stderr)
    return False


def check_nvenc_available() -> bool:
    """Check if NVENC hardware encoding is available and working.

    Does a real encode test to catch cases where nvenc is listed
    but CUDA libraries aren't loaded.

    The answer is computed once and cached in the module-level
    ``_nvenc_available``; any exception raised while probing (ffmpeg
    missing, timeout, ...) is treated as "not available".
    """
    global _nvenc_available
    if _nvenc_available is None:
        try:
            _nvenc_available = _probe_nvenc()
        except Exception:
            # Best-effort probe: any failure means we fall back to CPU encoding.
            _nvenc_available = False
    return _nvenc_available
|
|
|
|
|
|
# libx264 preset name -> closest NVENC preset (p1 fastest ... p7 slowest/best).
_NVENC_PRESET_MAP = {
    "ultrafast": "p1",
    "superfast": "p2",
    "veryfast": "p3",
    "faster": "p3",
    "fast": "p4",
    "medium": "p5",
    "slow": "p6",
    "slower": "p6",
    "veryslow": "p7",
}

# Presets that are already NVENC-native and must be passed through untouched.
_NVENC_NATIVE_PRESETS = frozenset(f"p{level}" for level in range(1, 8))


def get_encoder_params(codec: str, preset: str, crf: int) -> List[str]:
    """
    Get encoder-specific FFmpeg parameters.

    For NVENC (h264_nvenc, hevc_nvenc):
    - Uses -cq for constant quality (similar to CRF)
    - Presets: p1 (fastest) to p7 (slowest/best quality)
    - libx264 preset names are translated (fast->p4, medium->p5, slow->p6, ...)
    - Native NVENC presets (p1..p7) are accepted as-is
    - Any other preset name falls back to p4

    For every other codec (e.g. libx264):
    - Uses -crf for constant rate factor
    - The preset is passed through unchanged
      (ultrafast, superfast, veryfast, faster, fast, medium, slow, slower, veryslow)

    Args:
        codec: FFmpeg video encoder name.
        preset: Speed/quality preset, in libx264 or NVENC vocabulary.
        crf: Quality target; emitted as -cq for NVENC, -crf otherwise.

    Returns:
        A list of FFmpeg command-line arguments selecting and configuring
        the video encoder.
    """
    if codec in ("h264_nvenc", "hevc_nvenc"):
        if preset in _NVENC_NATIVE_PRESETS:
            # Caller already speaks NVENC; do not clobber their choice.
            nvenc_preset = preset
        else:
            nvenc_preset = _NVENC_PRESET_MAP.get(preset, "p4")

        # NVENC quality: 0 (best) to 51 (worst), similar to CRF
        # CRF 18 = high quality, CRF 23 = good quality
        return [
            "-c:v", codec,
            "-preset", nvenc_preset,
            "-cq", str(crf),  # Constant quality mode
            "-rc", "vbr",  # Variable bitrate with quality target
        ]

    # Standard libx264-style params
    return [
        "-c:v", codec,
        "-preset", preset,
        "-crf", str(crf),
    ]
|
|
|
|
|
|
class Output(ABC):
    """Interface every frame sink must implement.

    Concrete subclasses provide ``write``, ``close`` and the ``is_open``
    property; the class cannot be instantiated until all three exist.
    """

    @abstractmethod
    def write(self, frame: np.ndarray, t: float):
        """Send one frame to the sink; ``t`` is passed alongside the frame."""

    @abstractmethod
    def close(self):
        """Shut the sink down and release whatever it holds."""

    @property
    @abstractmethod
    def is_open(self) -> bool:
        """Report whether the sink is still open/valid."""
|
|
|
|
|
|
class DisplayOutput(Output):
    """
    Display frames using mpv (handles Wayland properly).

    Useful for live preview. Press 'q' to quit.

    mpv is launched lazily on the first write(), configured with that
    frame's width and height. When ``audio_source`` is set, ffplay is
    started at the same moment to play it.
    """

    def __init__(self, title: str = "Streaming Preview", size: Optional[Tuple[int, int]] = None,
                 audio_source: Optional[str] = None, fps: float = 30):
        """
        Args:
            title: Window title passed to mpv.
            size: Stored on the instance; write() does not resize to it.
            audio_source: Optional path/URL handed to ffplay for playback.
            fps: Frame rate announced to mpv's raw-video demuxer.
        """
        self.title = title
        self.size = size
        self.audio_source = audio_source
        self.fps = fps
        self._is_open = True
        self._process = None
        self._audio_process = None

    def _start_mpv(self, frame_size: Tuple[int, int]):
        """Start mpv process for display (and ffplay for audio, if any)."""
        import sys

        w, h = frame_size
        cmd = [
            "mpv",
            "--no-cache",
            "--demuxer=rawvideo",
            f"--demuxer-rawvideo-w={w}",
            f"--demuxer-rawvideo-h={h}",
            "--demuxer-rawvideo-mp-format=rgb24",
            f"--demuxer-rawvideo-fps={self.fps}",
            f"--title={self.title}",
            "-",
        ]
        print(f"Starting mpv: {' '.join(cmd)}", file=sys.stderr)
        self._process = subprocess.Popen(
            cmd,
            stdin=subprocess.PIPE,
            # Nothing ever reads mpv's stderr; a PIPE here would eventually
            # fill up and stall mpv, so discard the output instead.
            stderr=subprocess.DEVNULL,
        )

        # Start audio playback if we have an audio source
        if self.audio_source:
            audio_cmd = [
                "ffplay", "-nodisp", "-autoexit", "-loglevel", "quiet",
                str(self.audio_source)
            ]
            print(f"Starting audio: {self.audio_source}", file=sys.stderr)
            self._audio_process = subprocess.Popen(
                audio_cmd,
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
            )

    def write(self, frame: np.ndarray, t: float):
        """Display frame.

        The frame is converted to a C-contiguous uint8 numpy array and
        written to mpv's stdin. If mpv has exited or the pipe is broken the
        output marks itself closed and the frame is dropped.
        """
        if not self._is_open:
            return

        # Convert GPU array to numpy if needed
        frame = ensure_numpy(frame)

        # Ensure frame is correct format
        if frame.dtype != np.uint8:
            frame = np.clip(frame, 0, 255).astype(np.uint8)
        if not frame.flags['C_CONTIGUOUS']:
            frame = np.ascontiguousarray(frame)

        # Start mpv on first frame (shape is rows x cols, mpv wants w x h)
        if self._process is None:
            self._start_mpv((frame.shape[1], frame.shape[0]))

        # Check if mpv is still running
        if self._process.poll() is not None:
            self._is_open = False
            return

        try:
            self._process.stdin.write(frame.tobytes())
            self._process.stdin.flush()  # Prevent buffering
        except BrokenPipeError:
            self._is_open = False

    def close(self):
        """Close the display and audio."""
        if self._process:
            try:
                self._process.stdin.close()
            except OSError:
                # mpv may already be gone; closing a broken pipe is harmless.
                pass
            self._process.terminate()
            self._process.wait()
        if self._audio_process:
            self._audio_process.terminate()
            self._audio_process.wait()
        self._is_open = False

    @property
    def is_open(self) -> bool:
        """True until close() is called or the mpv process has exited."""
        if self._process and self._process.poll() is not None:
            self._is_open = False
        return self._is_open
|
|
|
|
|
|
class FileOutput(Output):
    """
    Write frames to a video file using ffmpeg.

    Automatically uses NVENC hardware encoding when available,
    falling back to libx264 CPU encoding otherwise.

    The ffmpeg process is spawned in the constructor and fed raw rgb24
    frames on stdin.
    """

    def __init__(
        self,
        path: str,
        size: Tuple[int, int],
        fps: float = 30,
        codec: str = "auto",  # "auto", "h264_nvenc", "libx264"
        crf: int = 18,
        preset: str = "fast",
        audio_source: Optional[str] = None,
    ):
        """
        Args:
            path: Destination file.
            size: (width, height) of the encoded video; frames of another
                size are resized in write().
            fps: Input frame rate passed to ffmpeg.
            codec: Encoder name, or "auto" to probe for NVENC.
            crf: Quality target forwarded to get_encoder_params().
            preset: Speed preset forwarded to get_encoder_params().
            audio_source: Optional second ffmpeg input muxed as AAC audio.
        """
        self.path = Path(path)
        self.size = size
        self.fps = fps
        self._is_open = True

        # Auto-detect NVENC
        if codec == "auto":
            codec = "h264_nvenc" if check_nvenc_available() else "libx264"
        self.codec = codec

        # Build ffmpeg command
        cmd = [
            "ffmpeg", "-y",
            "-f", "rawvideo",
            "-vcodec", "rawvideo",
            "-pix_fmt", "rgb24",
            "-s", f"{size[0]}x{size[1]}",
            "-r", str(fps),
            "-i", "-",
        ]

        # Add audio input if provided
        if audio_source:
            cmd.extend(["-i", str(audio_source)])
            # Explicitly map: video from input 0 (rawvideo), audio from input 1
            cmd.extend(["-map", "0:v", "-map", "1:a"])

        # Get encoder-specific params
        cmd.extend(get_encoder_params(codec, preset, crf))
        cmd.extend(["-pix_fmt", "yuv420p"])

        # Add audio codec if we have audio
        if audio_source:
            cmd.extend(["-c:a", "aac", "-b:a", "192k", "-shortest"])

        # Use fragmented mp4 for streamable output while writing
        if str(self.path).endswith('.mp4'):
            cmd.extend(["-movflags", "frag_keyframe+empty_moov+default_base_moof"])

        cmd.append(str(self.path))

        import sys
        print(f"FileOutput cmd: {' '.join(cmd)}", file=sys.stderr)
        self._process = subprocess.Popen(
            cmd,
            stdin=subprocess.PIPE,
            stderr=None,  # Show errors for debugging
        )

    def write(self, frame: np.ndarray, t: float):
        """Write frame to video file.

        Frames are dropped once the output is closed or ffmpeg has exited.
        """
        if not self._is_open or self._process.poll() is not None:
            self._is_open = False
            return

        # Convert GPU array to numpy if needed
        frame = ensure_numpy(frame)

        # Resize if needed (frame.shape is rows x cols, self.size is w x h)
        if frame.shape[1] != self.size[0] or frame.shape[0] != self.size[1]:
            import cv2
            frame = cv2.resize(frame, self.size)

        # Ensure correct format
        if frame.dtype != np.uint8:
            frame = np.clip(frame, 0, 255).astype(np.uint8)
        if not frame.flags['C_CONTIGUOUS']:
            frame = np.ascontiguousarray(frame)

        try:
            self._process.stdin.write(frame.tobytes())
        except BrokenPipeError:
            self._is_open = False

    def close(self):
        """Close the video file and wait for ffmpeg to finish."""
        if self._process:
            try:
                self._process.stdin.close()
            except OSError:
                # ffmpeg already exited; flushing the dead pipe fails, but we
                # still need to reap the process and mark ourselves closed.
                pass
            self._process.wait()
        self._is_open = False

    @property
    def is_open(self) -> bool:
        """True while not closed and the ffmpeg process is still running."""
        return self._is_open and self._process.poll() is None
|
|
|
|
|
|
class MultiOutput(Output):
    """
    Write to multiple outputs simultaneously.

    Useful for recording while showing preview.
    """

    def __init__(self, outputs: list):
        """
        Args:
            outputs: The Output instances to fan frames out to.
        """
        self.outputs = outputs

    def write(self, frame: np.ndarray, t: float):
        """Forward the frame to every child output that is still open."""
        for output in self.outputs:
            if output.is_open:
                output.write(frame, t)

    def close(self):
        """Close every child output.

        All children are closed even if one of them fails; the first
        exception encountered is re-raised afterwards so the failure is not
        hidden.
        """
        first_error = None
        for output in self.outputs:
            try:
                output.close()
            except Exception as exc:
                # Keep going so the remaining outputs are not leaked.
                if first_error is None:
                    first_error = exc
        if first_error is not None:
            raise first_error

    @property
    def is_open(self) -> bool:
        """True while at least one child output is open."""
        return any(o.is_open for o in self.outputs)
|
|
|
|
|
|
class NullOutput(Output):
    """
    Sink that throws every frame away (for benchmarking).

    Only a running count of received frames is kept in ``frame_count``.
    """

    def __init__(self):
        self.frame_count = 0
        self._is_open = True

    def write(self, frame: np.ndarray, t: float):
        """Count the frame and discard it."""
        self.frame_count = self.frame_count + 1

    def close(self):
        """Mark the sink as closed."""
        self._is_open = False

    @property
    def is_open(self) -> bool:
        """True until close() has been called."""
        return self._is_open
|
|
|
|
|
|
class PipeOutput(Output):
    """
    Pipe frames directly to mpv.

    Launches mpv with rawvideo demuxer and writes frames to stdin.
    mpv (and ffplay, when ``audio_source`` is set) are started on the
    first write().
    """

    def __init__(self, size: Tuple[int, int], fps: float = 30, audio_source: Optional[str] = None):
        """
        Args:
            size: (width, height) announced to mpv; other-sized frames are
                resized in write().
            fps: Frame rate announced to mpv's raw-video demuxer.
            audio_source: Optional path/URL handed to ffplay for playback.
        """
        self.size = size
        self.fps = fps
        self.audio_source = audio_source
        self._is_open = True
        self._process = None
        self._audio_process = None
        self._started = False

    def _start(self):
        """Start mpv and audio on first frame."""
        if self._started:
            return

        import sys
        w, h = self.size

        # Start mpv
        cmd = [
            "mpv", "--no-cache",
            "--demuxer=rawvideo",
            f"--demuxer-rawvideo-w={w}",
            f"--demuxer-rawvideo-h={h}",
            "--demuxer-rawvideo-mp-format=rgb24",
            f"--demuxer-rawvideo-fps={self.fps}",
            "--title=Streaming",
            "-"
        ]
        print(f"Starting mpv: {w}x{h} @ {self.fps}fps", file=sys.stderr)
        self._process = subprocess.Popen(
            cmd,
            stdin=subprocess.PIPE,
            stderr=subprocess.DEVNULL,
        )
        # Flag is set only after mpv launched: if Popen raises, _process stays
        # None and the next write() retries instead of dereferencing None.
        self._started = True

        # Start audio
        if self.audio_source:
            audio_cmd = [
                "ffplay", "-nodisp", "-autoexit", "-loglevel", "quiet",
                str(self.audio_source)
            ]
            print(f"Starting audio: {self.audio_source}", file=sys.stderr)
            self._audio_process = subprocess.Popen(
                audio_cmd,
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
            )

    def write(self, frame: np.ndarray, t: float):
        """Write frame to mpv.

        Frames are dropped once the output is closed, mpv has exited, or
        the pipe breaks.
        """
        if not self._is_open:
            return

        self._start()

        # Check mpv still running
        if self._process.poll() is not None:
            self._is_open = False
            return

        # Convert GPU array to numpy if needed
        frame = ensure_numpy(frame)

        # Resize if needed (frame.shape is rows x cols, self.size is w x h)
        if frame.shape[1] != self.size[0] or frame.shape[0] != self.size[1]:
            import cv2
            frame = cv2.resize(frame, self.size)

        # Ensure correct format
        if frame.dtype != np.uint8:
            frame = np.clip(frame, 0, 255).astype(np.uint8)
        if not frame.flags['C_CONTIGUOUS']:
            frame = np.ascontiguousarray(frame)

        try:
            self._process.stdin.write(frame.tobytes())
            self._process.stdin.flush()
        except BrokenPipeError:
            self._is_open = False

    def close(self):
        """Close mpv and audio."""
        if self._process:
            try:
                self._process.stdin.close()
            except OSError:
                # mpv may already be gone; closing a broken pipe is harmless.
                pass
            self._process.terminate()
            self._process.wait()
        if self._audio_process:
            self._audio_process.terminate()
            self._audio_process.wait()
        self._is_open = False

    @property
    def is_open(self) -> bool:
        """True until close() is called or the mpv process has exited."""
        if self._process and self._process.poll() is not None:
            self._is_open = False
        return self._is_open
|
|
|
|
|
|
class HLSOutput(Output):
    """
    Write frames as HLS stream (m3u8 playlist + .ts segments).

    This enables true live streaming where the browser can poll
    for new segments as they become available.

    Automatically uses NVENC hardware encoding when available.

    Attributes:
        playlist_path: ``stream.m3u8`` inside ``output_dir``.
        segments_written: Number of ``segment_*.ts`` files seen at the last
            check (refreshed roughly once per second of stream time).
    """

    def __init__(
        self,
        output_dir: str,
        size: Tuple[int, int],
        fps: float = 30,
        segment_duration: float = 4.0,  # 4s segments for stability
        codec: str = "auto",  # "auto", "h264_nvenc", "libx264"
        crf: int = 23,
        preset: str = "fast",  # Better quality than ultrafast
        audio_source: Optional[str] = None,
    ):
        """
        Args:
            output_dir: Directory for playlist and segments (created if missing).
            size: (width, height) of the encoded video.
            fps: Input frame rate passed to ffmpeg.
            segment_duration: Target segment length in seconds.
            codec: Encoder name, or "auto" to probe for NVENC.
            crf: Quality target forwarded to get_encoder_params().
            preset: Speed preset forwarded to get_encoder_params().
            audio_source: Optional second ffmpeg input muxed as AAC audio.
        """
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(parents=True, exist_ok=True)
        self.size = size
        self.fps = fps
        self.segment_duration = segment_duration
        self._is_open = True

        # Auto-detect NVENC
        if codec == "auto":
            codec = "h264_nvenc" if check_nvenc_available() else "libx264"
        self.codec = codec

        # HLS playlist path
        self.playlist_path = self.output_dir / "stream.m3u8"

        # Build ffmpeg command for HLS output
        cmd = [
            "ffmpeg", "-y",
            "-f", "rawvideo",
            "-vcodec", "rawvideo",
            "-pix_fmt", "rgb24",
            "-s", f"{size[0]}x{size[1]}",
            "-r", str(fps),
            "-i", "-",
        ]

        # Add audio input if provided
        if audio_source:
            cmd.extend(["-i", str(audio_source)])
            cmd.extend(["-map", "0:v", "-map", "1:a"])

        # Keyframe interval - must be exactly segment_duration for clean cuts
        gop_size = int(fps * segment_duration)

        # Get encoder-specific params
        cmd.extend(get_encoder_params(codec, preset, crf))
        cmd.extend([
            "-pix_fmt", "yuv420p",
            # Force keyframes at exact intervals for clean segment boundaries
            "-g", str(gop_size),
            "-keyint_min", str(gop_size),
            "-sc_threshold", "0",  # Disable scene change detection
            "-force_key_frames", f"expr:gte(t,n_forced*{segment_duration})",
            # Reduce buffering for faster segment availability
            "-flush_packets", "1",
        ])

        # Add audio codec if we have audio
        if audio_source:
            cmd.extend(["-c:a", "aac", "-b:a", "128k", "-shortest"])

        # HLS specific options for smooth live streaming
        cmd.extend([
            "-f", "hls",
            "-hls_time", str(segment_duration),
            "-hls_list_size", "0",  # Keep all segments in playlist
            "-hls_flags", "independent_segments+append_list+split_by_time",
            "-hls_segment_type", "mpegts",
            "-hls_segment_filename", str(self.output_dir / "segment_%05d.ts"),
            str(self.playlist_path),
        ])

        import sys
        print(f"HLSOutput cmd: {' '.join(cmd)}", file=sys.stderr)
        self._process = subprocess.Popen(
            cmd,
            stdin=subprocess.PIPE,
            stderr=None,  # Show errors for debugging
        )

        # Track segments for status reporting
        self.segments_written = 0
        self._last_segment_check = 0

    def _count_segments(self) -> int:
        """Count the segment_*.ts files currently present in output_dir."""
        return sum(1 for _ in self.output_dir.glob("segment_*.ts"))

    def write(self, frame: np.ndarray, t: float):
        """Write frame to HLS stream.

        Frames are dropped once the output is closed or ffmpeg has exited.
        """
        if not self._is_open or self._process.poll() is not None:
            self._is_open = False
            return

        # Convert GPU array to numpy if needed
        frame = ensure_numpy(frame)

        # Resize if needed (frame.shape is rows x cols, self.size is w x h)
        if frame.shape[1] != self.size[0] or frame.shape[0] != self.size[1]:
            import cv2
            frame = cv2.resize(frame, self.size)

        # Ensure correct format
        if frame.dtype != np.uint8:
            frame = np.clip(frame, 0, 255).astype(np.uint8)
        if not frame.flags['C_CONTIGUOUS']:
            frame = np.ascontiguousarray(frame)

        try:
            self._process.stdin.write(frame.tobytes())
        except BrokenPipeError:
            self._is_open = False

        # Periodically count segments (at most once per second of stream time)
        if t - self._last_segment_check > 1.0:
            self._last_segment_check = t
            self.segments_written = self._count_segments()

    def close(self):
        """Close the HLS stream and make sure the playlist is terminated."""
        if self._process:
            try:
                self._process.stdin.close()
            except OSError:
                # ffmpeg already exited; still reap it and finish cleanup.
                pass
            self._process.wait()
        self._is_open = False

        # Final segment count
        self.segments_written = self._count_segments()

        # Mark playlist as ended (VOD mode). ffmpeg's HLS muxer normally
        # writes the end tag itself on a clean shutdown, so only add it when
        # it is missing (e.g. ffmpeg was killed) to avoid a duplicate tag.
        if self.playlist_path.exists():
            content = self.playlist_path.read_text()
            if "#EXT-X-ENDLIST" not in content:
                with open(self.playlist_path, "a") as f:
                    if content and not content.endswith("\n"):
                        f.write("\n")
                    f.write("#EXT-X-ENDLIST\n")

    @property
    def is_open(self) -> bool:
        """True while not closed and the ffmpeg process is still running."""
        return self._is_open and self._process.poll() is None
|
|
|
|
|
|
class IPFSHLSOutput(Output):
    """
    Write frames as HLS stream with segments uploaded to IPFS.

    Each segment is uploaded to IPFS as it's created, enabling distributed
    streaming where clients can fetch segments from any IPFS gateway.

    The m3u8 playlist is continuously updated with IPFS URLs and can be
    fetched via get_playlist() or the playlist_cid property.

    Threading: write()/close() run on the caller's thread; uploads run on a
    single background worker thread. ``segment_cids`` and the set of queued
    segment numbers are shared between the two and guarded by
    ``_upload_lock``. Network calls and the user callback are made outside
    the lock so that write() is not held up by an upload in progress.
    """

    def __init__(
        self,
        output_dir: str,
        size: Tuple[int, int],
        fps: float = 30,
        segment_duration: float = 4.0,
        codec: str = "auto",
        crf: int = 23,
        preset: str = "fast",
        audio_source: str = None,
        ipfs_gateway: str = "https://ipfs.io/ipfs",
        on_playlist_update: callable = None,
    ):
        """
        Args:
            output_dir: Local directory for playlist and segments (created
                if missing).
            size: (width, height) of the encoded video.
            fps: Input frame rate passed to ffmpeg.
            segment_duration: Target segment length in seconds.
            codec: Encoder name, or "auto" to probe for NVENC.
            crf: Quality target forwarded to get_encoder_params().
            preset: Speed preset forwarded to get_encoder_params().
            audio_source: Optional second ffmpeg input muxed as AAC audio.
            ipfs_gateway: Base URL prepended to each CID in the playlist.
            on_playlist_update: Optional callable invoked with the new
                playlist CID after each live playlist upload.
        """
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(parents=True, exist_ok=True)
        self.size = size
        self.fps = fps
        self.segment_duration = segment_duration
        self.ipfs_gateway = ipfs_gateway.rstrip("/")
        self._is_open = True
        self._on_playlist_update = on_playlist_update  # Callback when playlist CID changes

        # Auto-detect NVENC
        if codec == "auto":
            codec = "h264_nvenc" if check_nvenc_available() else "libx264"
        self.codec = codec

        # Track segment CIDs
        self.segment_cids: dict = {}  # segment_number -> cid
        self._queued_segments: set = set()  # segment numbers waiting for / in upload
        self._last_segment_checked = -1
        self._playlist_cid: Optional[str] = None
        self._upload_lock = threading.Lock()

        # Import IPFS client
        from ipfs_client import add_file, add_bytes
        self._ipfs_add_file = add_file
        self._ipfs_add_bytes = add_bytes

        # Background upload thread for async IPFS uploads (started below,
        # once ffmpeg is running, so a failed launch leaves no stray thread)
        self._upload_queue = queue.Queue()
        self._upload_thread = threading.Thread(target=self._upload_worker, daemon=True)

        # Local HLS paths
        self.local_playlist_path = self.output_dir / "stream.m3u8"

        # Build ffmpeg command for HLS output
        cmd = [
            "ffmpeg", "-y",
            "-f", "rawvideo",
            "-vcodec", "rawvideo",
            "-pix_fmt", "rgb24",
            "-s", f"{size[0]}x{size[1]}",
            "-r", str(fps),
            "-i", "-",
        ]

        # Add audio input if provided
        if audio_source:
            cmd.extend(["-i", str(audio_source)])
            cmd.extend(["-map", "0:v", "-map", "1:a"])

        # Keyframe interval
        gop_size = int(fps * segment_duration)

        # Get encoder-specific params
        cmd.extend(get_encoder_params(codec, preset, crf))
        cmd.extend([
            "-pix_fmt", "yuv420p",
            "-g", str(gop_size),
            "-keyint_min", str(gop_size),
            "-sc_threshold", "0",
            "-force_key_frames", f"expr:gte(t,n_forced*{segment_duration})",
            "-flush_packets", "1",
        ])

        # Add audio codec if we have audio
        if audio_source:
            cmd.extend(["-c:a", "aac", "-b:a", "128k", "-shortest"])

        # HLS options
        cmd.extend([
            "-f", "hls",
            "-hls_time", str(segment_duration),
            "-hls_list_size", "0",
            "-hls_flags", "independent_segments+append_list+split_by_time",
            "-hls_segment_type", "mpegts",
            "-hls_segment_filename", str(self.output_dir / "segment_%05d.ts"),
            str(self.local_playlist_path),
        ])

        import sys
        print("IPFSHLSOutput: starting ffmpeg", file=sys.stderr)
        self._process = subprocess.Popen(
            cmd,
            stdin=subprocess.PIPE,
            stderr=None,
        )

        self._upload_thread.start()

    # ------------------------------------------------------------------
    # Playlist helpers
    # ------------------------------------------------------------------

    def _snapshot_cids(self) -> dict:
        """Return a copy of segment_cids taken under the lock."""
        with self._upload_lock:
            return dict(self.segment_cids)

    def _render_playlist(self, cids: dict, ended: bool, vod: bool = False) -> str:
        """Render an m3u8 playlist whose entries are gateway URLs.

        Args:
            cids: segment_number -> cid mapping to list, in numeric order.
            ended: Append #EXT-X-ENDLIST when true.
            vod: Add the #EXT-X-PLAYLIST-TYPE:VOD header when true.
        """
        lines = [
            "#EXTM3U",
            "#EXT-X-VERSION:3",
            f"#EXT-X-TARGETDURATION:{int(self.segment_duration) + 1}",
            "#EXT-X-MEDIA-SEQUENCE:0",
        ]
        if vod:
            lines.append("#EXT-X-PLAYLIST-TYPE:VOD")

        # Add segments in order
        for seg_num in sorted(cids):
            lines.append(f"#EXTINF:{self.segment_duration:.3f},")
            lines.append(f"{self.ipfs_gateway}/{cids[seg_num]}")

        if ended:
            lines.append("#EXT-X-ENDLIST")
        return "\n".join(lines) + "\n"

    # ------------------------------------------------------------------
    # Background upload machinery
    # ------------------------------------------------------------------

    def _upload_worker(self):
        """Background worker thread for async IPFS uploads."""
        import sys
        while True:
            try:
                item = self._upload_queue.get(timeout=1.0)
                if item is None:  # Shutdown signal
                    break
                seg_path, seg_num = item
                self._do_upload(seg_path, seg_num)
            except queue.Empty:
                continue
            except Exception as e:
                print(f"Upload worker error: {e}", file=sys.stderr)

    def _do_upload(self, seg_path: Path, seg_num: int):
        """Actually perform the upload (runs in background thread)."""
        import sys
        try:
            cid = self._ipfs_add_file(seg_path, pin=True)
            if cid:
                with self._upload_lock:
                    self.segment_cids[seg_num] = cid
                print(f"IPFS: segment_{seg_num:05d}.ts -> {cid}", file=sys.stderr)
                self._update_ipfs_playlist()
        except Exception as e:
            print(f"Failed to upload segment {seg_num}: {e}", file=sys.stderr)
        finally:
            # No longer in flight. On success the segment is now listed in
            # segment_cids; on failure it becomes eligible for a retry at
            # the next scan.
            with self._upload_lock:
                self._queued_segments.discard(seg_num)

    @staticmethod
    def _segment_looks_complete(seg_path: Path) -> bool:
        """Heuristic: non-empty and size unchanged across a 10 ms pause."""
        import time
        try:
            size_before = seg_path.stat().st_size
            if size_before == 0:
                return False  # Empty file, still being created
            time.sleep(0.01)  # Very short check
            return seg_path.stat().st_size == size_before
        except FileNotFoundError:
            return False

    def _upload_new_segments(self):
        """Check for new segments and queue them for async IPFS upload."""
        for seg_path in sorted(self.output_dir.glob("segment_*.ts")):
            # Extract segment number from filename (stem is e.g. segment_00000)
            try:
                seg_num = int(seg_path.stem.split("_")[1])
            except ValueError:
                continue  # Not one of our numbered segments

            # Skip if already uploaded or queued
            with self._upload_lock:
                if seg_num in self.segment_cids or seg_num in self._queued_segments:
                    continue

            # Skip if segment is still being written (quick check)
            if not self._segment_looks_complete(seg_path):
                continue

            # Queue for async upload; remember it so the next scan does not
            # enqueue the same segment again while the upload is pending.
            with self._upload_lock:
                self._queued_segments.add(seg_num)
            self._upload_queue.put((seg_path, seg_num))

    def _update_ipfs_playlist(self):
        """Generate and upload IPFS-aware m3u8 playlist."""
        import sys

        cids = self._snapshot_cids()
        if not cids:
            return

        playlist_content = self._render_playlist(cids, ended=False)

        # Upload playlist to IPFS (outside the lock: this is network I/O)
        cid = self._ipfs_add_bytes(playlist_content.encode("utf-8"), pin=True)
        if cid:
            self._playlist_cid = cid
            print(f"IPFS: playlist updated -> {cid} ({len(cids)} segments)", file=sys.stderr)
            # Notify callback (e.g., to update database for live HLS redirect)
            if self._on_playlist_update:
                try:
                    self._on_playlist_update(cid)
                except Exception as e:
                    print(f"IPFS: playlist callback error: {e}", file=sys.stderr)

    # ------------------------------------------------------------------
    # Output interface
    # ------------------------------------------------------------------

    def write(self, frame: np.ndarray, t: float):
        """Write frame to HLS stream and upload segments to IPFS.

        Frames are dropped once the output is closed or ffmpeg has exited.
        """
        if not self._is_open or self._process.poll() is not None:
            self._is_open = False
            return

        # Convert GPU array to numpy if needed
        frame = ensure_numpy(frame)

        # Resize if needed (frame.shape is rows x cols, self.size is w x h)
        if frame.shape[1] != self.size[0] or frame.shape[0] != self.size[1]:
            import cv2
            frame = cv2.resize(frame, self.size)

        # Ensure correct format
        if frame.dtype != np.uint8:
            frame = np.clip(frame, 0, 255).astype(np.uint8)
        if not frame.flags['C_CONTIGUOUS']:
            frame = np.ascontiguousarray(frame)

        try:
            self._process.stdin.write(frame.tobytes())
        except BrokenPipeError:
            self._is_open = False
            return

        # Scan for new segments once per segment_duration of stream time
        current_segment = int(t / self.segment_duration)
        if current_segment > self._last_segment_checked:
            self._last_segment_checked = current_segment
            self._upload_new_segments()

    def close(self):
        """Close the HLS stream and finalize IPFS uploads."""
        import sys

        if self._process:
            try:
                self._process.stdin.close()
            except OSError:
                # ffmpeg already exited; still reap it and finish cleanup.
                pass
            self._process.wait()
        self._is_open = False

        # Queue any remaining segments
        self._upload_new_segments()

        # Wait for pending uploads to complete
        self._upload_queue.put(None)  # Signal shutdown
        self._upload_thread.join(timeout=30)

        # Generate final playlist with #EXT-X-ENDLIST. A snapshot is used
        # because the worker may still be running if the join timed out.
        cids = self._snapshot_cids()
        if cids:
            playlist_content = self._render_playlist(cids, ended=True, vod=True)
            cid = self._ipfs_add_bytes(playlist_content.encode("utf-8"), pin=True)
            if cid:
                self._playlist_cid = cid
                print(f"IPFS: final playlist -> {cid} ({len(cids)} segments)", file=sys.stderr)

    @property
    def playlist_cid(self) -> Optional[str]:
        """Get the current playlist CID."""
        return self._playlist_cid

    @property
    def playlist_url(self) -> Optional[str]:
        """Get the full IPFS URL for the playlist."""
        if self._playlist_cid:
            return f"{self.ipfs_gateway}/{self._playlist_cid}"
        return None

    def get_playlist(self) -> str:
        """Get the current m3u8 playlist content with IPFS URLs.

        Returns a bare ``#EXTM3U`` header while no segment has been
        uploaded; the end-of-list tag is included once the output is closed.
        """
        cids = self._snapshot_cids()
        if not cids:
            return "#EXTM3U\n"
        return self._render_playlist(cids, ended=not self._is_open)

    @property
    def is_open(self) -> bool:
        """True while not closed and the ffmpeg process is still running."""
        return self._is_open and self._process.poll() is None