""" Output targets for streaming compositor. Supports: - Display window (preview) - File output (recording) - Stream output (RTMP, etc.) - future """ import numpy as np import subprocess from abc import ABC, abstractmethod from typing import Tuple, Optional from pathlib import Path class Output(ABC): """Abstract base class for output targets.""" @abstractmethod def write(self, frame: np.ndarray, t: float): """Write a frame to the output.""" pass @abstractmethod def close(self): """Close the output and clean up resources.""" pass @property @abstractmethod def is_open(self) -> bool: """Check if output is still open/valid.""" pass class DisplayOutput(Output): """ Display frames using mpv (handles Wayland properly). Useful for live preview. Press 'q' to quit. """ def __init__(self, title: str = "Streaming Preview", size: Tuple[int, int] = None, audio_source: str = None, fps: float = 30): self.title = title self.size = size self.audio_source = audio_source self.fps = fps self._is_open = True self._process = None self._audio_process = None def _start_mpv(self, frame_size: Tuple[int, int]): """Start mpv process for display.""" import sys w, h = frame_size cmd = [ "mpv", "--no-cache", "--demuxer=rawvideo", f"--demuxer-rawvideo-w={w}", f"--demuxer-rawvideo-h={h}", "--demuxer-rawvideo-mp-format=rgb24", f"--demuxer-rawvideo-fps={self.fps}", f"--title={self.title}", "-", ] print(f"Starting mpv: {' '.join(cmd)}", file=sys.stderr) self._process = subprocess.Popen( cmd, stdin=subprocess.PIPE, stderr=subprocess.PIPE, ) # Start audio playback if we have an audio source if self.audio_source: audio_cmd = [ "ffplay", "-nodisp", "-autoexit", "-loglevel", "quiet", str(self.audio_source) ] print(f"Starting audio: {self.audio_source}", file=sys.stderr) self._audio_process = subprocess.Popen( audio_cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, ) def write(self, frame: np.ndarray, t: float): """Display frame.""" if not self._is_open: return # Ensure frame is correct format if frame.dtype != np.uint8: frame = np.clip(frame, 0, 255).astype(np.uint8) if not frame.flags['C_CONTIGUOUS']: frame = np.ascontiguousarray(frame) # Start mpv on first frame if self._process is None: self._start_mpv((frame.shape[1], frame.shape[0])) # Check if mpv is still running if self._process.poll() is not None: self._is_open = False return try: self._process.stdin.write(frame.tobytes()) self._process.stdin.flush() # Prevent buffering except BrokenPipeError: self._is_open = False def close(self): """Close the display and audio.""" if self._process: try: self._process.stdin.close() except: pass self._process.terminate() self._process.wait() if self._audio_process: self._audio_process.terminate() self._audio_process.wait() self._is_open = False @property def is_open(self) -> bool: if self._process and self._process.poll() is not None: self._is_open = False return self._is_open class FileOutput(Output): """ Write frames to a video file using ffmpeg. """ def __init__( self, path: str, size: Tuple[int, int], fps: float = 30, codec: str = "libx264", crf: int = 18, preset: str = "fast", audio_source: str = None, ): self.path = Path(path) self.size = size self.fps = fps self._is_open = True # Build ffmpeg command cmd = [ "ffmpeg", "-y", "-f", "rawvideo", "-vcodec", "rawvideo", "-pix_fmt", "rgb24", "-s", f"{size[0]}x{size[1]}", "-r", str(fps), "-i", "-", ] # Add audio input if provided if audio_source: cmd.extend(["-i", str(audio_source)]) # Explicitly map: video from input 0 (rawvideo), audio from input 1 cmd.extend(["-map", "0:v", "-map", "1:a"]) cmd.extend([ "-c:v", codec, "-preset", preset, "-crf", str(crf), "-pix_fmt", "yuv420p", ]) # Add audio codec if we have audio if audio_source: cmd.extend(["-c:a", "aac", "-b:a", "192k", "-shortest"]) # Use fragmented mp4 for streamable output while writing if str(self.path).endswith('.mp4'): cmd.extend(["-movflags", "frag_keyframe+empty_moov+default_base_moof"]) cmd.append(str(self.path)) import sys print(f"FileOutput cmd: {' '.join(cmd)}", file=sys.stderr) self._process = subprocess.Popen( cmd, stdin=subprocess.PIPE, stderr=None, # Show errors for debugging ) def write(self, frame: np.ndarray, t: float): """Write frame to video file.""" if not self._is_open or self._process.poll() is not None: self._is_open = False return # Resize if needed if frame.shape[1] != self.size[0] or frame.shape[0] != self.size[1]: import cv2 frame = cv2.resize(frame, self.size) try: self._process.stdin.write(frame.tobytes()) except BrokenPipeError: self._is_open = False def close(self): """Close the video file.""" if self._process: self._process.stdin.close() self._process.wait() self._is_open = False @property def is_open(self) -> bool: return self._is_open and self._process.poll() is None class MultiOutput(Output): """ Write to multiple outputs simultaneously. Useful for recording while showing preview. """ def __init__(self, outputs: list): self.outputs = outputs def write(self, frame: np.ndarray, t: float): for output in self.outputs: if output.is_open: output.write(frame, t) def close(self): for output in self.outputs: output.close() @property def is_open(self) -> bool: return any(o.is_open for o in self.outputs) class NullOutput(Output): """ Discard frames (for benchmarking). """ def __init__(self): self._is_open = True self.frame_count = 0 def write(self, frame: np.ndarray, t: float): self.frame_count += 1 def close(self): self._is_open = False @property def is_open(self) -> bool: return self._is_open class PipeOutput(Output): """ Pipe frames directly to mpv. Launches mpv with rawvideo demuxer and writes frames to stdin. """ def __init__(self, size: Tuple[int, int], fps: float = 30, audio_source: str = None): self.size = size self.fps = fps self.audio_source = audio_source self._is_open = True self._process = None self._audio_process = None self._started = False def _start(self): """Start mpv and audio on first frame.""" if self._started: return self._started = True import sys w, h = self.size # Start mpv cmd = [ "mpv", "--no-cache", "--demuxer=rawvideo", f"--demuxer-rawvideo-w={w}", f"--demuxer-rawvideo-h={h}", "--demuxer-rawvideo-mp-format=rgb24", f"--demuxer-rawvideo-fps={self.fps}", "--title=Streaming", "-" ] print(f"Starting mpv: {w}x{h} @ {self.fps}fps", file=sys.stderr) self._process = subprocess.Popen( cmd, stdin=subprocess.PIPE, stderr=subprocess.DEVNULL, ) # Start audio if self.audio_source: audio_cmd = [ "ffplay", "-nodisp", "-autoexit", "-loglevel", "quiet", str(self.audio_source) ] print(f"Starting audio: {self.audio_source}", file=sys.stderr) self._audio_process = subprocess.Popen( audio_cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, ) def write(self, frame: np.ndarray, t: float): """Write frame to mpv.""" if not self._is_open: return self._start() # Check mpv still running if self._process.poll() is not None: self._is_open = False return # Resize if needed if frame.shape[1] != self.size[0] or frame.shape[0] != self.size[1]: import cv2 frame = cv2.resize(frame, self.size) # Ensure correct format if frame.dtype != np.uint8: frame = np.clip(frame, 0, 255).astype(np.uint8) if not frame.flags['C_CONTIGUOUS']: frame = np.ascontiguousarray(frame) try: self._process.stdin.write(frame.tobytes()) self._process.stdin.flush() except BrokenPipeError: self._is_open = False def close(self): """Close mpv and audio.""" if self._process: try: self._process.stdin.close() except: pass self._process.terminate() self._process.wait() if self._audio_process: self._audio_process.terminate() self._audio_process.wait() self._is_open = False @property def is_open(self) -> bool: if self._process and self._process.poll() is not None: self._is_open = False return self._is_open