""" Video and image sources with looping support. """ import numpy as np import subprocess import json from pathlib import Path from typing import Optional, Tuple from abc import ABC, abstractmethod class Source(ABC): """Abstract base class for frame sources.""" @abstractmethod def read_frame(self, t: float) -> np.ndarray: """Read frame at time t (with looping if needed).""" pass @property @abstractmethod def duration(self) -> float: """Source duration in seconds.""" pass @property @abstractmethod def size(self) -> Tuple[int, int]: """Frame size as (width, height).""" pass @property @abstractmethod def fps(self) -> float: """Frames per second.""" pass class VideoSource(Source): """ Video file source with automatic looping. Reads frames on-demand, seeking as needed. When time exceeds duration, wraps around (loops). """ def __init__(self, path: str, target_fps: float = 30): self.path = Path(path) self.target_fps = target_fps # Initialize decode state first (before _probe which could fail) self._process: Optional[subprocess.Popen] = None self._current_start: Optional[float] = None self._frame_buffer: Optional[np.ndarray] = None self._buffer_time: Optional[float] = None self._duration = None self._size = None self._fps = None if not self.path.exists(): raise FileNotFoundError(f"Video not found: {path}") self._probe() def _probe(self): """Get video metadata.""" cmd = [ "ffprobe", "-v", "quiet", "-print_format", "json", "-show_format", "-show_streams", str(self.path) ] result = subprocess.run(cmd, capture_output=True, text=True) data = json.loads(result.stdout) # Get duration self._duration = float(data["format"]["duration"]) # Get video stream info for stream in data["streams"]: if stream["codec_type"] == "video": self._size = (int(stream["width"]), int(stream["height"])) # Parse fps from r_frame_rate (e.g., "30/1" or "30000/1001") fps_parts = stream.get("r_frame_rate", "30/1").split("/") self._fps = float(fps_parts[0]) / float(fps_parts[1]) break @property def duration(self) -> float: return self._duration @property def size(self) -> Tuple[int, int]: return self._size @property def fps(self) -> float: return self._fps def _start_decode(self, start_time: float): """Start ffmpeg decode process from given time.""" if self._process: try: self._process.stdout.close() except: pass self._process.terminate() try: self._process.wait(timeout=1) except: self._process.kill() self._process.wait() w, h = self._size cmd = [ "ffmpeg", "-v", "quiet", "-ss", str(start_time), "-i", str(self.path), "-f", "rawvideo", "-pix_fmt", "rgb24", "-r", str(self.target_fps), "-" ] self._process = subprocess.Popen( cmd, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, bufsize=w * h * 3 * 4, # Buffer a few frames ) self._current_start = start_time self._buffer_time = start_time def read_frame(self, t: float) -> np.ndarray: """ Read frame at time t. If t exceeds duration, wraps around (loops). Seeks if needed, otherwise reads sequentially. """ # Wrap time for looping t_wrapped = t % self._duration # Check if we need to seek (loop point or large time jump) need_seek = ( self._process is None or self._buffer_time is None or abs(t_wrapped - self._buffer_time) > 1.0 / self.target_fps * 2 ) if need_seek: self._start_decode(t_wrapped) # Read frame w, h = self._size frame_size = w * h * 3 # Try to read with retries for seek settling for attempt in range(3): raw = self._process.stdout.read(frame_size) if len(raw) == frame_size: break # End of stream or seek not ready - restart from beginning self._start_decode(0) if len(raw) < frame_size: # Still no data - return last frame or black if self._frame_buffer is not None: return self._frame_buffer.copy() return np.zeros((h, w, 3), dtype=np.uint8) frame = np.frombuffer(raw, dtype=np.uint8).reshape((h, w, 3)) self._frame_buffer = frame # Cache for fallback self._buffer_time = t_wrapped + 1.0 / self.target_fps return frame def close(self): """Clean up resources.""" if self._process: self._process.terminate() self._process.wait() self._process = None def __del__(self): self.close() def __repr__(self): return f"VideoSource({self.path.name}, {self._size[0]}x{self._size[1]}, {self._duration:.1f}s)" class ImageSource(Source): """ Static image source (returns same frame for any time). Useful for backgrounds, overlays, etc. """ def __init__(self, path: str): self.path = Path(path) if not self.path.exists(): raise FileNotFoundError(f"Image not found: {path}") # Load image import cv2 self._frame = cv2.imread(str(self.path)) self._frame = cv2.cvtColor(self._frame, cv2.COLOR_BGR2RGB) self._size = (self._frame.shape[1], self._frame.shape[0]) @property def duration(self) -> float: return float('inf') # Images last forever @property def size(self) -> Tuple[int, int]: return self._size @property def fps(self) -> float: return 30.0 # Arbitrary def read_frame(self, t: float) -> np.ndarray: return self._frame.copy() def __repr__(self): return f"ImageSource({self.path.name}, {self._size[0]}x{self._size[1]})" class LiveSource(Source): """ Live video capture source (webcam, capture card, etc.). Time parameter is ignored - always returns latest frame. """ def __init__(self, device: int = 0, size: Tuple[int, int] = (1280, 720), fps: float = 30): import cv2 self._cap = cv2.VideoCapture(device) self._cap.set(cv2.CAP_PROP_FRAME_WIDTH, size[0]) self._cap.set(cv2.CAP_PROP_FRAME_HEIGHT, size[1]) self._cap.set(cv2.CAP_PROP_FPS, fps) # Get actual settings self._size = ( int(self._cap.get(cv2.CAP_PROP_FRAME_WIDTH)), int(self._cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) ) self._fps = self._cap.get(cv2.CAP_PROP_FPS) if not self._cap.isOpened(): raise RuntimeError(f"Could not open video device {device}") @property def duration(self) -> float: return float('inf') # Live - no duration @property def size(self) -> Tuple[int, int]: return self._size @property def fps(self) -> float: return self._fps def read_frame(self, t: float) -> np.ndarray: """Read latest frame (t is ignored for live sources).""" import cv2 ret, frame = self._cap.read() if not ret: return np.zeros((self._size[1], self._size[0], 3), dtype=np.uint8) return cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) def close(self): self._cap.release() def __del__(self): self.close() def __repr__(self): return f"LiveSource({self._size[0]}x{self._size[1]}, {self._fps}fps)"