Merges full history from art-dag/mono.git into the monorepo under the artdag/ directory. Contains: core (DAG engine), l1 (Celery rendering server), l2 (ActivityPub registry), common (shared templates/middleware), client (CLI), test (e2e). Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> git-subtree-dir: artdag git-subtree-mainline:1a179de547git-subtree-split:4c2e716558
596 lines
22 KiB
Python
596 lines
22 KiB
Python
"""
|
|
Streaming video compositor.
|
|
|
|
Main entry point for the streaming pipeline. Combines:
|
|
- Multiple video sources (with looping)
|
|
- Per-source effect chains
|
|
- Layer compositing
|
|
- Optional live audio analysis
|
|
- Output to display/file/stream
|
|
"""
|
|
|
|
import time
|
|
import sys
|
|
import numpy as np
|
|
from typing import List, Dict, Any, Optional, Union
|
|
from pathlib import Path
|
|
|
|
from .sources import Source, VideoSource
|
|
from .backends import Backend, NumpyBackend, get_backend
|
|
from .output import Output, DisplayOutput, FileOutput, MultiOutput
|
|
|
|
|
|
class StreamingCompositor:
|
|
"""
|
|
Real-time streaming video compositor.
|
|
|
|
Reads frames from multiple sources, applies effects, composites layers,
|
|
and outputs the result - all frame-by-frame without intermediate files.
|
|
|
|
Example:
|
|
compositor = StreamingCompositor(
|
|
sources=["video1.mp4", "video2.mp4"],
|
|
effects_per_source=[
|
|
[{"effect": "rotate", "angle": 45}],
|
|
[{"effect": "zoom", "amount": 1.5}],
|
|
],
|
|
compositor_config={"mode": "alpha", "weights": [0.5, 0.5]},
|
|
)
|
|
compositor.run(output="preview", duration=60)
|
|
"""
|
|
|
|
def __init__(
    self,
    sources: List[Union[str, Source]],
    effects_per_source: Optional[List[List[Dict]]] = None,
    compositor_config: Optional[Dict] = None,
    analysis_data: Optional[Dict] = None,
    backend: str = "numpy",
    recipe_dir: Optional[Path] = None,
    fps: float = 30,
    audio_source: Optional[str] = None,
):
    """
    Initialize the streaming compositor.

    Args:
        sources: List of video paths or Source objects (must be non-empty)
        effects_per_source: List of effect chains, one per source
        compositor_config: How to blend layers (mode, weights)
        analysis_data: Pre-computed analysis data for bindings
        backend: "numpy" or "glsl"
        recipe_dir: Directory for resolving relative effect paths
        fps: Output frame rate
        audio_source: Path to audio file for streaming analysis

    Raises:
        ValueError: If sources is empty, a source has an unsupported type,
            or effects_per_source length does not match sources length.
    """
    # Fail early with a clear message: an empty list would otherwise
    # surface as a ZeroDivisionError when the default blend weights
    # (1.0 / len(sources)) are computed below.
    if not sources:
        raise ValueError("At least one source is required")

    self.fps = fps
    self.recipe_dir = recipe_dir or Path(".")
    self.analysis_data = analysis_data or {}

    # Initialize streaming audio analyzer if audio source provided
    self._audio_analyzer = None
    self._audio_source = audio_source
    if audio_source:
        from .audio import StreamingAudioAnalyzer
        self._audio_analyzer = StreamingAudioAnalyzer(audio_source)
        print(f"Streaming audio: {audio_source}", file=sys.stderr)

    # Initialize sources: wrap plain paths in VideoSource, pass
    # pre-built Source objects through unchanged.
    self.sources: List[Source] = []
    for src in sources:
        if isinstance(src, Source):
            self.sources.append(src)
        elif isinstance(src, (str, Path)):
            self.sources.append(VideoSource(str(src), target_fps=fps))
        else:
            raise ValueError(f"Unknown source type: {type(src)}")

    # Effect chains (default: no effects)
    self.effects_per_source = effects_per_source or [[] for _ in self.sources]
    if len(self.effects_per_source) != len(self.sources):
        raise ValueError(
            f"effects_per_source length ({len(self.effects_per_source)}) "
            f"must match sources length ({len(self.sources)})"
        )

    # Compositor config (default: equal-weight alpha blend)
    self.compositor_config = compositor_config or {
        "mode": "alpha",
        "weights": [1.0 / len(self.sources)] * len(self.sources),
    }

    # Initialize backend
    self.backend: Backend = get_backend(
        backend,
        recipe_dir=self.recipe_dir,
    )

    # Pre-load all effect definitions referenced by the chains
    self._load_effects()
|
|
|
|
def _load_effects(self):
    """Resolve and pre-load every effect definition referenced by the chains."""
    # Flatten all per-source chains into one stream of effect configs.
    all_configs = (cfg for chain in self.effects_per_source for cfg in chain)
    for cfg in all_configs:
        rel_path = cfg.get("effect_path")
        if not rel_path:
            continue
        resolved = self.recipe_dir / rel_path
        # Missing files are silently skipped, matching lazy/optional effects.
        if resolved.exists():
            self.backend.load_effect(resolved)
|
|
|
|
def _create_output(
    self,
    output: Union[str, Output],
    size: tuple,
) -> Output:
    """Create an output target from a string, path, or Output object.

    Args:
        output: An existing Output instance (returned as-is), the literal
            "preview" (on-screen window), "null" (discard frames), or a
            filename / pathlib.Path to encode to.
        size: (width, height) of the frames that will be written.

    Returns:
        A ready-to-use Output.

    Raises:
        ValueError: If *output* is none of the supported kinds.
    """
    if isinstance(output, Output):
        return output

    if output == "preview":
        return DisplayOutput("Streaming Preview", size,
                             audio_source=self._audio_source, fps=self.fps)
    elif output == "null":
        from .output import NullOutput
        return NullOutput()
    elif isinstance(output, (str, Path)):
        # Generalized: pathlib.Path is accepted alongside plain strings.
        return FileOutput(str(output), size, fps=self.fps, audio_source=self._audio_source)
    else:
        raise ValueError(f"Unknown output type: {output}")
|
|
|
|
def run(
    self,
    output: Union[str, Output] = "preview",
    duration: Optional[float] = None,
    audio_analyzer=None,
    show_fps: bool = True,
    recipe_executor=None,
):
    """
    Run the streaming compositor.

    Synchronous frame loop: reads one frame per source, processes (via the
    recipe executor if given, otherwise the backend), and writes to the
    output until `total_frames` are emitted, the output closes, a frame
    raises, or the user hits Ctrl-C. Sources and the output are always
    closed on exit. NOTE(review): the loop contains no sleep/pacing — it
    runs as fast as processing allows; presumably the output object paces
    real-time playback — TODO confirm against DisplayOutput/FileOutput.

    Args:
        output: Output target - "preview", "null", filename, or Output object
        duration: Duration in seconds (None = min source duration, capped
            at 3600s when every source reports an infinite duration)
        audio_analyzer: Optional AudioAnalyzer for live audio reactivity;
            ignored when a file-based analyzer was configured in __init__
        show_fps: Print a progress/FPS line to stderr every 30 frames
        recipe_executor: Optional StreamingRecipeExecutor for full recipe logic
    """
    # Determine output size from first source
    output_size = self.sources[0].size

    # Create output
    out = self._create_output(output, output_size)

    # Determine duration
    if duration is None:
        # Run until stopped (or min source duration if not looping)
        duration = min(s.duration for s in self.sources)
        if duration == float('inf'):
            duration = 3600  # 1 hour max for live sources

    total_frames = int(duration * self.fps)
    frame_time = 1.0 / self.fps

    print(f"Streaming: {len(self.sources)} sources -> {output}", file=sys.stderr)
    print(f"Duration: {duration:.1f}s, {total_frames} frames @ {self.fps}fps", file=sys.stderr)
    print(f"Output size: {output_size[0]}x{output_size[1]}", file=sys.stderr)
    print(f"Press 'q' to quit (if preview)", file=sys.stderr)

    # Frame loop bookkeeping for the rolling FPS display.
    start_time = time.time()
    frame_count = 0
    fps_update_interval = 30  # Update FPS display every N frames
    last_fps_time = start_time
    last_fps_count = 0

    try:
        for frame_num in range(total_frames):
            # Output (e.g. preview window) may be closed by the user.
            if not out.is_open:
                print(f"\nOutput closed at frame {frame_num}", file=sys.stderr)
                break

            # Presentation time for this frame.
            t = frame_num * frame_time

            try:
                # Update analysis data from streaming audio (file-based)
                energy = 0.0
                is_beat = False
                # File-based analyzer (from __init__) wins over the live
                # analyzer passed to run().
                if self._audio_analyzer:
                    self._update_from_audio(self._audio_analyzer, t)
                    energy = self.analysis_data.get("live_energy", {}).get("values", [0])[0]
                    is_beat = self.analysis_data.get("live_beat", {}).get("values", [0])[0] > 0.5
                elif audio_analyzer:
                    self._update_from_audio(audio_analyzer, t)
                    energy = self.analysis_data.get("live_energy", {}).get("values", [0])[0]
                    is_beat = self.analysis_data.get("live_beat", {}).get("values", [0])[0] > 0.5

                # Read frames from all sources
                frames = [src.read_frame(t) for src in self.sources]

                # Process through recipe executor if provided
                if recipe_executor:
                    result = self._process_with_executor(
                        frames, recipe_executor, energy, is_beat, t
                    )
                else:
                    # Simple backend processing
                    result = self.backend.process_frame(
                        frames,
                        self.effects_per_source,
                        self.compositor_config,
                        t,
                        self.analysis_data,
                    )

                # Output
                out.write(result, t)
                frame_count += 1

                # FPS display (rate measured since the previous update).
                if show_fps and frame_count % fps_update_interval == 0:
                    now = time.time()
                    elapsed = now - last_fps_time
                    if elapsed > 0:
                        current_fps = (frame_count - last_fps_count) / elapsed
                        progress = frame_num / total_frames * 100
                        print(
                            f"\r {progress:5.1f}% | {current_fps:5.1f} fps | "
                            f"frame {frame_num}/{total_frames}",
                            end="", file=sys.stderr
                        )
                        last_fps_time = now
                        last_fps_count = frame_count

            except Exception as e:
                # Any per-frame failure aborts the stream with a traceback
                # rather than skipping frames.
                print(f"\nError at frame {frame_num}, t={t:.1f}s: {e}", file=sys.stderr)
                import traceback
                traceback.print_exc()
                break

    except KeyboardInterrupt:
        print("\nInterrupted", file=sys.stderr)
    finally:
        # Always release the output and any source that supports close().
        out.close()
        for src in self.sources:
            if hasattr(src, 'close'):
                src.close()

    # Final stats
    elapsed = time.time() - start_time
    avg_fps = frame_count / elapsed if elapsed > 0 else 0
    print(f"\nCompleted: {frame_count} frames in {elapsed:.1f}s ({avg_fps:.1f} fps avg)", file=sys.stderr)
|
|
|
|
def _process_with_executor(
    self,
    frames: List[np.ndarray],
    executor,
    energy: float,
    is_beat: bool,
    t: float,
) -> np.ndarray:
    """
    Process frames using the recipe executor for the full pipeline.

    Implements:
    1. process-pair: two clips per source with effects, blended
    2. cycle-crossfade: dynamic composition with zoom and weights
    3. Final effects: whole-spin, ripple

    Args:
        frames: One HxWx3 uint8 frame per source (channel order presumably
            RGB, matching _apply_clip_effects' RGB2HSV use — TODO confirm).
        executor: Recipe executor supplying per-frame parameters
            (on_frame, get_cycle_weights, get_effect_params, ...).
        energy: Current audio energy (0.0 when no analyzer is active).
        is_beat: Whether this frame coincides with a detected beat.
        t: Frame time in seconds.

    Returns:
        Composited uint8 frame sized like the first source's frame.
    """
    import cv2

    # Target size from first source
    target_h, target_w = frames[0].shape[:2]

    # Resize all frames to target size (letterbox to preserve aspect ratio)
    resized_frames = []
    for frame in frames:
        fh, fw = frame.shape[:2]
        if (fh, fw) != (target_h, target_w):
            # Scale to fit while preserving aspect ratio, centered on black.
            scale = min(target_w / fw, target_h / fh)
            new_w, new_h = int(fw * scale), int(fh * scale)
            resized = cv2.resize(frame, (new_w, new_h))
            canvas = np.zeros((target_h, target_w, 3), dtype=np.uint8)
            x_off = (target_w - new_w) // 2
            y_off = (target_h - new_h) // 2
            canvas[y_off:y_off+new_h, x_off:x_off+new_w] = resized
            resized_frames.append(canvas)
        else:
            resized_frames.append(frame)
    frames = resized_frames

    # Advance executor state for this frame (energy/beat driven).
    executor.on_frame(energy, is_beat, t)

    # Weights tell us which sources are currently active in the cycle.
    weights = executor.get_cycle_weights()

    # Process each source as a "pair" (clip A and B with different effects)
    processed_pairs = []

    for i, frame in enumerate(frames):
        # Skip sources with zero weight, but keep a placeholder so the
        # pair list stays index-aligned with weights/zooms below.
        if i < len(weights) and weights[i] < 0.001:
            processed_pairs.append(None)
            continue
        # Get effect params for clip A and B
        params_a = executor.get_effect_params(i, "a", energy)
        params_b = executor.get_effect_params(i, "b", energy)
        pair_params = executor.get_pair_params(i)

        # Process clip A and clip B from copies of the same source frame.
        clip_a = self._apply_clip_effects(frame.copy(), params_a, t)
        clip_b = self._apply_clip_effects(frame.copy(), params_b, t)

        # Blend A and B using pair_mix opacity
        opacity = pair_params["blend_opacity"]
        blended = cv2.addWeighted(
            clip_a, 1 - opacity,
            clip_b, opacity,
            0
        )

        # Apply pair rotation (skipped for near-zero angles)
        h, w = blended.shape[:2]
        center = (w // 2, h // 2)
        angle = pair_params["pair_rotation"]
        if abs(angle) > 0.5:
            matrix = cv2.getRotationMatrix2D(center, angle, 1.0)
            blended = cv2.warpAffine(blended, matrix, (w, h))

        processed_pairs.append(blended)

    # Cycle-crossfade composition. Weights are re-queried after pair
    # processing, as in the original flow — presumably idempotent after
    # on_frame; TODO confirm the executor doesn't mutate state here.
    weights = executor.get_cycle_weights()
    zooms = executor.get_cycle_zooms()

    # Debug border palette, hoisted out of the loop (loop-invariant).
    border_colors = [(255, 0, 0), (0, 255, 0), (0, 0, 255), (255, 255, 0)]

    # Apply zoom per pair and composite into a float accumulator.
    h, w = target_h, target_w
    result = np.zeros((h, w, 3), dtype=np.float32)

    for idx, (pair, weight, zoom) in enumerate(zip(processed_pairs, weights, zooms)):
        # Skip zero-weight sources
        if pair is None or weight < 0.001:
            continue

        # Apply zoom
        if zoom > 1.01:
            # Zoom in: crop center and resize up
            new_w, new_h = int(w / zoom), int(h / zoom)
            if new_w > 0 and new_h > 0:
                x1, y1 = (w - new_w) // 2, (h - new_h) // 2
                cropped = pair[y1:y1+new_h, x1:x1+new_w]
                pair = cv2.resize(cropped, (w, h))
        elif zoom < 0.99:
            # Zoom out: shrink video and center on black
            scaled_w, scaled_h = int(w * zoom), int(h * zoom)
            if scaled_w > 0 and scaled_h > 0:
                shrunk = cv2.resize(pair, (scaled_w, scaled_h))
                canvas = np.zeros((h, w, 3), dtype=np.uint8)
                x_off, y_off = (w - scaled_w) // 2, (h - scaled_h) // 2
                canvas[y_off:y_off+scaled_h, x_off:x_off+scaled_w] = shrunk
                pair = canvas  # fresh array each iteration; no copy needed

        # Draw colored border - thicker border = higher weight
        color = border_colors[idx % 4]
        thickness = max(3, int(10 * weight))
        pair = np.ascontiguousarray(pair)
        pair[:thickness, :] = color
        pair[-thickness:, :] = color
        pair[:, :thickness] = color
        pair[:, -thickness:] = color

        result += pair.astype(np.float32) * weight

    result = np.clip(result, 0, 255).astype(np.uint8)

    # Apply final effects (whole-spin, ripple)
    final_params = executor.get_final_effects(energy)

    # Whole-frame spin about the center
    spin_angle = final_params["whole_spin_angle"]
    if abs(spin_angle) > 0.5:
        center = (w // 2, h // 2)
        matrix = cv2.getRotationMatrix2D(center, spin_angle, 1.0)
        result = cv2.warpAffine(result, matrix, (w, h))

    # Ripple effect, gated on a minimum amplitude of 1 pixel
    amp = final_params["ripple_amplitude"]
    if amp > 1:
        result = self._apply_ripple(result, amp,
                                    final_params["ripple_cx"],
                                    final_params["ripple_cy"],
                                    t)

    return result
|
|
|
|
def _apply_clip_effects(self, frame: np.ndarray, params: dict, t: float) -> np.ndarray:
    """Apply the per-clip effect stack: rotate, zoom, invert, hue shift, ASCII."""
    import cv2

    height, width = frame.shape[:2]

    # Rotation about the frame center (skipped for near-zero angles).
    rot = params["rotate_angle"]
    if abs(rot) > 0.5:
        rot_mat = cv2.getRotationMatrix2D((width // 2, height // 2), rot, 1.0)
        frame = cv2.warpAffine(frame, rot_mat, (width, height))

    # Center crop-and-scale zoom (skipped when within 1% of unity).
    zoom_factor = params["zoom_amount"]
    if abs(zoom_factor - 1.0) > 0.01:
        crop_w, crop_h = int(width / zoom_factor), int(height / zoom_factor)
        if crop_w > 0 and crop_h > 0:
            left = max(0, (width - crop_w) // 2)
            top = max(0, (height - crop_h) // 2)
            right = min(width, left + crop_w)
            bottom = min(height, top + crop_h)
            if right > left and bottom > top:
                frame = cv2.resize(frame[top:bottom, left:right], (width, height))

    # Full negative when the invert knob is past halfway.
    if params["invert_amount"] > 0.5:
        frame = 255 - frame

    # Hue rotation via HSV (OpenCV hue spans 0-179, i.e. degrees / 2).
    hue = params["hue_degrees"]
    if abs(hue) > 1:
        hsv = cv2.cvtColor(frame, cv2.COLOR_RGB2HSV)
        hsv[:, :, 0] = (hsv[:, :, 0].astype(np.int32) + int(hue / 2)) % 180
        frame = cv2.cvtColor(hsv, cv2.COLOR_HSV2RGB)

    # ASCII-art pass when mixed in past halfway (cell size clamped to >= 4).
    if params["ascii_mix"] > 0.5:
        frame = self._apply_ascii(frame, max(4, int(params["ascii_char_size"])))

    return frame
|
|
|
|
def _apply_ascii(self, frame: np.ndarray, char_size: int) -> np.ndarray:
    """Render the frame as colored ASCII characters on a black background.

    Args:
        frame: HxWx3 uint8 image (channels treated as RGB by the luma
            weights below).
        char_size: Pixel size of each character cell (also the font size).

    Returns:
        A uint8 image the same size as *frame*; the frame is returned
        unchanged when it is smaller than a single character cell.
    """
    import cv2
    from PIL import Image, ImageDraw, ImageFont

    h, w = frame.shape[:2]
    chars = " .:-=+*#%@"  # dark -> bright luminance ramp

    # Monospace font; fall back to PIL's builtin font when the DejaVu
    # path is missing or unreadable. (Was a bare `except:`, which would
    # also have swallowed KeyboardInterrupt/SystemExit.)
    try:
        font = ImageFont.truetype("/usr/share/fonts/truetype/dejavu/DejaVuSansMono.ttf", char_size)
    except OSError:
        font = ImageFont.load_default()

    # Sample cells using area interpolation (fast block average)
    rows = h // char_size
    cols = w // char_size
    if rows < 1 or cols < 1:
        return frame

    # Crop to exact grid and downsample: one averaged pixel per cell.
    cropped = frame[:rows * char_size, :cols * char_size]
    cell_colors = cv2.resize(cropped, (cols, rows), interpolation=cv2.INTER_AREA)

    # Rec.601 luma per cell, normalized to [0, 1].
    luminances = (0.299 * cell_colors[:, :, 0] +
                  0.587 * cell_colors[:, :, 1] +
                  0.114 * cell_colors[:, :, 2]) / 255.0

    # Map every cell's luminance to a ramp index in one vectorized pass
    # (same truncation semantics as the per-cell int() it replaces).
    char_indices = (luminances * (len(chars) - 1)).astype(np.int32)

    # Create output image
    out_h = rows * char_size
    out_w = cols * char_size
    output = Image.new('RGB', (out_w, out_h), (0, 0, 0))
    draw = ImageDraw.Draw(output)

    # Draw characters, one per cell, tinted with the cell's average color.
    for r in range(rows):
        for c in range(cols):
            char = chars[char_indices[r, c]]
            # PIL expects plain ints for fill, not numpy scalars.
            color = tuple(int(v) for v in cell_colors[r, c])
            draw.text((c * char_size, r * char_size), char, fill=color, font=font)

    # Convert back to numpy; grid cropping may have shrunk the image,
    # so restore the original size.
    result = np.array(output)
    if result.shape[:2] != (h, w):
        result = cv2.resize(result, (w, h), interpolation=cv2.INTER_LINEAR)

    return result
|
|
|
|
def _apply_ripple(self, frame: np.ndarray, amplitude: float,
                  cx: float, cy: float, t: float = 0) -> np.ndarray:
    """Warp the frame with a sinusoidal ripple radiating from (cx, cy).

    cx/cy are normalized [0, 1] center coordinates; amplitude is the peak
    radial displacement in pixels; t animates the wave's phase.
    """
    import cv2

    rows, cols = frame.shape[:2]
    origin_x, origin_y = cx * cols, cy * rows
    longest_side = max(cols, rows)

    # Pixel grid and radial geometry relative to the ripple origin.
    grid_y, grid_x = np.mgrid[0:rows, 0:cols].astype(np.float32)
    off_x = grid_x - origin_x
    off_y = grid_y - origin_y
    radius = np.sqrt(off_x * off_x + off_y * off_y)

    # Wave constants matching the recipe: frequency=8, decay=2, speed=5.
    freq = 8
    decay = 2
    speed = 5
    phase = t * speed * 2 * np.pi

    # Radial displacement, attenuated away from the origin when decay > 0.
    wave = np.sin(2 * np.pi * freq * radius / longest_side + phase) * amplitude
    if decay > 0:
        wave = wave * np.exp(-radius * decay / longest_side)

    # Unit radial direction; forced to zero at the exact origin (0/0 guard).
    with np.errstate(divide='ignore', invalid='ignore'):
        unit_x = np.where(radius > 0, off_x / radius, 0)
        unit_y = np.where(radius > 0, off_y / radius, 0)

    map_x = (grid_x + wave * unit_x).astype(np.float32)
    map_y = (grid_y + wave * unit_y).astype(np.float32)

    return cv2.remap(frame, map_x, map_y, cv2.INTER_LINEAR,
                     borderMode=cv2.BORDER_REFLECT)
|
|
|
|
def _update_from_audio(self, analyzer, t: float):
    """Refresh the live_* analysis tracks from an audio analyzer.

    Works with both file-based streaming analyzers (which expose set_time)
    and live analyzers (which do not); a missing feature getter falls back
    to silence (zero energy, no beat).
    """
    # File-based streaming analyzers are seeked to the current frame time.
    seek = getattr(analyzer, 'set_time', None)
    if seek is not None:
        seek(t)

    # Pull current features, tolerating analyzers that lack either getter.
    energy = analyzer.get_energy() if hasattr(analyzer, 'get_energy') else 0
    beat = analyzer.get_beat() if hasattr(analyzer, 'get_beat') else False

    def single_sample_track(value):
        # One-sample track shaped like precomputed analysis data so that
        # effect bindings can reference live and offline tracks uniformly.
        return {
            "times": [t],
            "values": [value],
            "duration": float('inf'),
        }

    self.analysis_data["live_energy"] = single_sample_track(energy)
    self.analysis_data["live_beat"] = single_sample_track(1.0 if beat else 0.0)
|
|
|
|
|
|
def quick_preview(
    sources: List[str],
    effects: List[List[Dict]] = None,
    duration: float = 10,
    fps: float = 30,
):
    """
    Convenience wrapper: open a live preview window over *sources*.

    Example:
        quick_preview(["video1.mp4", "video2.mp4"], duration=30)
    """
    StreamingCompositor(
        sources=sources,
        effects_per_source=effects,
        fps=fps,
    ).run(output="preview", duration=duration)
|