- Add stream_sexp_generic.py: fully generic sexp interpreter - Add streaming primitives for video sources and audio analysis - Add config system for external sources and audio files - Add templates for reusable scans and macros - Fix video/audio stream mapping in file output - Add dynamic source cycling based on sources array length - Remove old Python effect files (migrated to sexp) - Update sexp effects to use namespaced primitives Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
847 lines
29 KiB
Python
847 lines
29 KiB
Python
"""
Streaming pipeline executor.

Directly executes compiled sexp recipes frame-by-frame.
No adapter layer - frames and analysis flow through the DAG.
"""
|
|
|
|
import sys
|
|
import time
|
|
import numpy as np
|
|
from pathlib import Path
|
|
from typing import Dict, List, Any, Optional
|
|
from dataclasses import dataclass, field
|
|
|
|
from .sources import VideoSource
|
|
from .audio import StreamingAudioAnalyzer
|
|
from .output import DisplayOutput, FileOutput
|
|
from .sexp_interp import SexpInterpreter
|
|
|
|
|
|
@dataclass
class FrameContext:
    """Per-frame state shared by every node evaluated for one timestep."""

    # Timestamp of the frame being rendered (required, no default).
    t: float
    # Latest energy reading copied from the audio analyzer.
    energy: float = 0.0
    # Whether the analyzer reported a beat for this frame.
    is_beat: bool = False
    # Running count of beats seen so far.
    beat_count: int = 0
    # Named analysis results; a fresh dict is created per instance.
    analysis: Dict[str, Any] = field(default_factory=dict)
|
|
|
|
|
|
class StreamingPipeline:
    """
    Executes a compiled sexp recipe as a streaming pipeline.

    Frames flow through the DAG directly - no adapter needed.
    Each node is evaluated lazily when its output is requested, and the
    result is cached until the next call to ``render_frame``.

    SCAN values and SLICE_ON accumulators are stepped once per rising beat
    edge reported by the audio analyzer.
    """

    # Canvas size (w, h) used when neither an explicit size nor a source exists.
    _DEFAULT_SIZE = (720, 720)

    # File extensions that decide how a SOURCE node's path is opened.
    _VIDEO_SUFFIXES = ('.mp4', '.webm', '.mov', '.avi', '.mkv')
    _AUDIO_SUFFIXES = ('.mp3', '.wav', '.flac', '.ogg', '.m4a', '.aac')

    # Config keys that describe the effect itself rather than its parameters.
    _NON_PARAM_KEYS = ('effect', 'effect_path', 'effect_cid', 'effects_registry',
                       'analysis_refs', 'inputs', 'cid')

    def __init__(self, compiled_recipe, recipe_dir: Path = None, fps: float = 30, seed: int = 42,
                 output_size: tuple = None):
        """
        Build runtime state for a compiled recipe and open its sources.

        Args:
            compiled_recipe: Object exposing ``nodes``, an iterable of node dicts
                with at least ``id`` and ``type`` keys.
            recipe_dir: Directory that relative SOURCE paths are resolved against
                (defaults to the current directory).
            fps: Target frame rate passed to video sources and outputs.
            seed: Base seed for SCAN nodes that do not configure their own.
            output_size: Optional (w, h); defaults to the first video source's size.
        """
        self.recipe = compiled_recipe
        self.recipe_dir = recipe_dir or Path(".")
        self.fps = fps
        self.seed = seed

        # Build node lookup
        self.nodes = {n['id']: n for n in compiled_recipe.nodes}

        # Runtime state
        self.sources: Dict[str, VideoSource] = {}
        self.audio_analyzer: Optional[StreamingAudioAnalyzer] = None
        self.audio_source_path: Optional[str] = None

        # Sexp interpreter for expressions
        self.interp = SexpInterpreter()

        # Scan state (node_id -> current value)
        self.scan_state: Dict[str, Any] = {}
        self.scan_emit: Dict[str, Any] = {}

        # SLICE_ON state
        self.slice_on_acc: Dict[str, Any] = {}
        self.slice_on_result: Dict[str, Any] = {}

        # Frame cache for current timestep (cleared each frame)
        self._frame_cache: Dict[str, np.ndarray] = {}

        # Context for current frame (must exist before scans are initialized,
        # because their emit expressions read beat_count and time from it)
        self.ctx = FrameContext(t=0.0)

        # Output size (w, h) - set after sources are initialized
        self._output_size = output_size

        # Initialize
        self._init_sources()
        self._init_scans()
        self._init_slice_on()

        # Set output size from first source if not specified
        if self._output_size is None and self.sources:
            first_source = next(iter(self.sources.values()))
            self._output_size = first_source._size

    def _init_sources(self):
        """
        Initialize video and audio sources.

        SOURCE nodes are classified by file extension. Missing files produce a
        warning on stderr and are skipped. If several audio sources exist, the
        last one found replaces the earlier ones.
        """
        for node in self.recipe.nodes:
            if node.get('type') != 'SOURCE':
                continue

            path = node.get('config', {}).get('path')
            if not path:
                continue

            full_path = (self.recipe_dir / path).resolve()
            suffix = full_path.suffix.lower()

            if suffix in self._VIDEO_SUFFIXES:
                if not full_path.exists():
                    print(f"Warning: video not found: {full_path}", file=sys.stderr)
                    continue
                self.sources[node['id']] = VideoSource(
                    str(full_path),
                    target_fps=self.fps
                )
            elif suffix in self._AUDIO_SUFFIXES:
                if not full_path.exists():
                    print(f"Warning: audio not found: {full_path}", file=sys.stderr)
                    continue
                self.audio_source_path = str(full_path)
                self.audio_analyzer = StreamingAudioAnalyzer(str(full_path))

    def _init_scans(self):
        """
        Initialize scan nodes with their initial state.

        Each SCAN gets its own ``random.Random``; nodes without a configured
        seed use ``self.seed`` plus their position among the SCAN nodes.
        """
        import random
        seed_offset = 0

        for node in self.recipe.nodes:
            if node.get('type') != 'SCAN':
                continue

            config = node.get('config', {})

            # Create RNG for this scan
            scan_seed = config.get('seed', self.seed + seed_offset)
            rng = random.Random(scan_seed)
            seed_offset += 1

            # Evaluate initial value
            init_value = self.interp.eval(config.get('init', 0), {})

            self.scan_state[node['id']] = {
                'value': init_value,
                'rng': rng,
                'config': config,
            }

            # Compute initial emit
            self._update_scan_emit(node['id'])

    def _scan_env(self, state: dict) -> dict:
        """Build the expression environment shared by scan step and emit."""
        value = state['value']
        # Dict-valued state exposes its keys directly; anything else is 'acc'.
        env = dict(value) if isinstance(value, dict) else {'acc': value}
        env['beat_count'] = self.ctx.beat_count
        env['time'] = self.ctx.t
        return env

    def _update_scan_emit(self, node_id: str):
        """Update the emit value for a scan."""
        state = self.scan_state[node_id]
        config = state['config']
        emit_expr = config.get('emit_expr', config.get('emit', None))

        if emit_expr is None:
            # No emit expression - emit the value directly
            self.scan_emit[node_id] = state['value']
            return

        # The interpreter draws random numbers from whichever RNG is attached,
        # so attach this scan's own generator before evaluating.
        self.interp.rng = state['rng']
        self.scan_emit[node_id] = self.interp.eval(emit_expr, self._scan_env(state))

    def _step_scan(self, node_id: str):
        """Step a scan forward on beat."""
        state = self.scan_state[node_id]
        config = state['config']
        step_expr = config.get('step_expr', config.get('step', None))

        if step_expr is None:
            return

        # Set RNG, evaluate step, then refresh the emitted value
        self.interp.rng = state['rng']
        state['value'] = self.interp.eval(step_expr, self._scan_env(state))
        self._update_scan_emit(node_id)

    def _init_slice_on(self):
        """Initialize SLICE_ON nodes."""
        for node in self.recipe.nodes:
            if node.get('type') != 'SLICE_ON':
                continue

            init = node.get('config', {}).get('init', {})
            # Copy so later accumulator updates never touch the recipe config.
            self.slice_on_acc[node['id']] = dict(init)

            # Evaluate initial state
            self._eval_slice_on(node['id'])

    def _eval_slice_on(self, node_id: str):
        """
        Evaluate a SLICE_ON node's Lambda.

        Stores the result in ``slice_on_result`` and, when it contains an
        ``acc`` entry, carries that forward as the next accumulator. Errors are
        reported on stderr and otherwise ignored (best effort).
        """
        config = self.nodes[node_id].get('config', {})
        fn = config.get('fn')
        if not fn:
            return

        acc = self.slice_on_acc[node_id]
        video_indices = list(range(len(config.get('videos', []))))

        # Set up environment
        self.interp.globals['videos'] = list(video_indices)

        try:
            from .sexp_interp import eval_slice_on_lambda
            result = eval_slice_on_lambda(
                fn, acc, self.ctx.beat_count, 0, 1,
                video_indices, self.interp
            )
            self.slice_on_result[node_id] = result

            # Update accumulator
            if 'acc' in result:
                self.slice_on_acc[node_id] = result['acc']
        except Exception as e:
            print(f"SLICE_ON eval error: {e}", file=sys.stderr)

    def _on_beat(self):
        """Called when a beat is detected."""
        self.ctx.beat_count += 1

        # Step all scans
        for node_id in self.scan_state:
            self._step_scan(node_id)

        # Step all SLICE_ON nodes
        for node_id in self.slice_on_acc:
            self._eval_slice_on(node_id)

    def _get_frame(self, node_id: str) -> Optional[np.ndarray]:
        """
        Get the output frame for a node at current time.

        Recursively evaluates inputs as needed.
        Results are cached for the current timestep.
        Returns None for unknown node ids.
        """
        if node_id in self._frame_cache:
            return self._frame_cache[node_id]

        node = self.nodes.get(node_id)
        if not node:
            return None

        node_type = node.get('type')

        if node_type == 'SOURCE':
            frame = self._eval_source(node)
        elif node_type == 'SEGMENT':
            frame = self._eval_segment(node)
        elif node_type == 'EFFECT':
            frame = self._eval_effect(node)
        elif node_type == 'SLICE_ON':
            frame = self._eval_slice_on_frame(node)
        else:
            # Unknown node type - try to pass through input
            inputs = node.get('inputs', [])
            frame = self._get_frame(inputs[0]) if inputs else None

        self._frame_cache[node_id] = frame
        return frame

    def _eval_source(self, node: dict) -> Optional[np.ndarray]:
        """Evaluate a SOURCE node."""
        source = self.sources.get(node['id'])
        if source:
            return source.read_frame(self.ctx.t)
        return None

    def _eval_segment(self, node: dict) -> Optional[np.ndarray]:
        """
        Evaluate a SEGMENT node (time segment of source).

        With a positive numeric ``duration`` the segment loops, so the source
        is read at ``start + (t % duration)``. Without one it is read at
        ``start + t``.
        """
        inputs = node.get('inputs', [])
        if not inputs:
            return None

        config = node.get('config', {})
        start = config.get('start', 0)
        duration = config.get('duration')

        # Resolve any bindings
        if isinstance(start, dict):
            start = self._resolve_binding(start) if start.get('_binding') else 0
        if isinstance(duration, dict):
            duration = self._resolve_binding(duration) if duration.get('_binding') else None

        offset = start if isinstance(start, (int, float)) else 0

        # Wrap the elapsed time first and only then add the offset; wrapping
        # (t + start) would rewind playback to the head of the file instead of
        # staying inside [start, start + duration).
        t_local = self.ctx.t
        if isinstance(duration, (int, float)) and duration > 0:
            t_local = t_local % duration
        t_local += offset

        # Get source frame at adjusted time
        source_id = inputs[0]
        source = self.sources.get(source_id)
        if source:
            return source.read_frame(t_local)

        return self._get_frame(source_id)

    def _eval_effect(self, node: dict) -> Optional[np.ndarray]:
        """
        Evaluate an EFFECT node.

        Returns the first available input frame with the configured effect
        applied, or None when no input produced a frame. Unrecognised effect
        names pass the first input through unchanged.
        """
        import cv2

        inputs = node.get('inputs', [])
        config = node.get('config', {})
        effect_name = config.get('effect')

        # Keep one slot per input (None where a frame is missing) so that
        # per-input parameters such as blend_multi weights stay aligned.
        raw_frames = [self._get_frame(inp) for inp in inputs]
        input_frames = [f for f in raw_frames if f is not None]

        if not input_frames:
            return None

        frame = input_frames[0]

        # Resolve bindings in config
        params = self._resolve_config(config)

        # Apply effect based on name
        if effect_name == 'rotate':
            angle = params.get('angle', 0)
            if abs(angle) > 0.5:
                h, w = frame.shape[:2]
                center = (w // 2, h // 2)
                matrix = cv2.getRotationMatrix2D(center, angle, 1.0)
                frame = cv2.warpAffine(frame, matrix, (w, h))

        elif effect_name == 'zoom':
            amount = params.get('amount', 1.0)
            if abs(amount - 1.0) > 0.01:
                frame = self._apply_zoom(frame, amount)

        elif effect_name == 'invert':
            amount = params.get('amount', 0)
            if amount > 0.01:
                inverted = 255 - frame
                frame = cv2.addWeighted(frame, 1 - amount, inverted, amount, 0)

        elif effect_name == 'hue_shift':
            degrees = params.get('degrees', 0)
            if abs(degrees) > 1:
                hsv = cv2.cvtColor(frame, cv2.COLOR_RGB2HSV)
                # The hue channel is wrapped at 180 here, hence degrees / 2.
                hsv[:, :, 0] = (hsv[:, :, 0].astype(int) + int(degrees / 2)) % 180
                frame = cv2.cvtColor(hsv, cv2.COLOR_HSV2RGB)

        elif effect_name == 'blend':
            if len(input_frames) >= 2:
                opacity = params.get('opacity', 0.5)
                base, overlay = input_frames[0], input_frames[1]
                # addWeighted needs both operands at the same size.
                if overlay.shape[:2] != base.shape[:2]:
                    h, w = base.shape[:2]
                    overlay = cv2.resize(overlay, (w, h))
                frame = cv2.addWeighted(base, 1 - opacity, overlay, opacity, 0)

        elif effect_name == 'blend_multi':
            weights = params.get('weights', [])
            if len(input_frames) > 1 and weights:
                h, w = input_frames[0].shape[:2]
                result = np.zeros((h, w, 3), dtype=np.float32)
                # Pair weights with the unfiltered frame list: weight i belongs
                # to input i even when an earlier input has no frame.
                for f, wt in zip(raw_frames, weights):
                    if f is not None and wt > 0.001:
                        if f.shape[:2] != (h, w):
                            f = cv2.resize(f, (w, h))
                        result += f.astype(np.float32) * wt
                frame = np.clip(result, 0, 255).astype(np.uint8)

        elif effect_name == 'ripple':
            amp = params.get('amplitude', 0)
            if amp > 1:
                frame = self._apply_ripple(frame, amp,
                                           params.get('center_x', 0.5),
                                           params.get('center_y', 0.5),
                                           params.get('frequency', 8),
                                           params.get('decay', 2),
                                           params.get('speed', 5))

        return frame

    def _eval_slice_on_frame(self, node: dict) -> Optional[np.ndarray]:
        """
        Evaluate a SLICE_ON node - returns composited frame.

        Falls back to the first configured video when no usable result
        (layers plus compose weights) is available yet.
        """
        import cv2

        config = node.get('config', {})
        video_ids = config.get('videos', [])
        result = self.slice_on_result.get(node['id'], {})

        # Get layers and compose info
        layers = result.get('layers', []) if result else []
        weights = result.get('compose', {}).get('weights', []) if result else []

        if not layers or not weights:
            # No result yet - return first video
            if video_ids:
                return self._get_frame(video_ids[0])
            return None

        # Get frames for each layer
        frames = []
        for i, layer in enumerate(layers):
            video_idx = layer.get('video', i)
            if not video_idx < len(video_ids):
                frames.append(None)
                continue

            frame = self._get_frame(video_ids[video_idx])

            # Apply layer effects (zoom)
            for eff in layer.get('effects', []):
                eff_name = eff.get('effect')
                # The effect may be a symbol-like object carrying its name.
                if hasattr(eff_name, 'name'):
                    eff_name = eff_name.name
                if eff_name == 'zoom' and frame is not None:
                    frame = self._apply_zoom(frame, eff.get('amount', 1.0))

            frames.append(frame)

        # Composite with weights - use consistent output size
        if self._output_size:
            w, h = self._output_size
        else:
            # Fallback to first non-None frame size
            first_frame = next((f for f in frames if f is not None), None)
            if first_frame is None:
                return None
            h, w = first_frame.shape[:2]

        output = np.zeros((h, w, 3), dtype=np.float32)
        applied_weight = 0.0

        for frame, weight in zip(frames, weights):
            if frame is None or weight < 0.001:
                continue

            # Resize to output size
            if frame.shape[1] != w or frame.shape[0] != h:
                frame = cv2.resize(frame, (w, h))

            output += frame.astype(np.float32) * weight
            applied_weight += weight

        # Normalize by the weight that was actually composited, so a layer
        # without a frame does not darken the result.
        if applied_weight > 0 and abs(applied_weight - 1.0) > 0.01:
            output /= applied_weight

        return np.clip(output, 0, 255).astype(np.uint8)

    def _resolve_config(self, config: dict) -> dict:
        """Resolve bindings in effect config to actual values."""
        resolved = {}

        for key, value in config.items():
            if key in self._NON_PARAM_KEYS:
                continue

            if isinstance(value, dict) and value.get('_binding'):
                resolved[key] = self._resolve_binding(value)
            elif isinstance(value, dict) and value.get('_expr'):
                resolved[key] = self._resolve_expr(value)
            else:
                resolved[key] = value

        return resolved

    def _resolve_binding(self, binding: dict) -> Any:
        """
        Resolve a binding to its current value.

        Lookup order: scan emit values, then the per-frame analysis dict, then
        the current energy as a fallback. A ``range`` of (lo, hi) linearly maps
        numeric values as ``lo + value * (hi - lo)``.
        """
        source_id = binding.get('source')
        feature = binding.get('feature', 'values')
        range_map = binding.get('range')

        # Get raw value from scan or analysis
        if source_id in self.scan_emit:
            value = self.scan_emit[source_id]
        elif source_id in self.ctx.analysis:
            data = self.ctx.analysis[source_id]
            if isinstance(data, dict):
                series = data.get(feature, data.get('values', [0]))
                is_sequence = isinstance(series, (list, tuple)) or (
                    isinstance(series, np.ndarray) and series.ndim > 0)
                if is_sequence:
                    # An empty series carries no reading; use 0 like the default.
                    value = series[0] if len(series) else 0
                else:
                    # Scalar feature stored directly, nothing to index.
                    value = series
            else:
                value = data
        else:
            # Fallback to energy
            value = self.ctx.energy

        # Extract feature from dict
        if isinstance(value, dict) and feature in value:
            value = value[feature]

        # Apply range mapping
        if range_map and isinstance(value, (int, float)):
            lo, hi = range_map
            value = lo + value * (hi - lo)

        return value

    def _resolve_expr(self, expr: dict) -> Any:
        """Resolve a compiled expression."""
        env = {
            'energy': self.ctx.energy,
            'beat_count': self.ctx.beat_count,
            't': self.ctx.t,
        }

        # Add scan values, keyed by scan node id
        env.update(self.scan_emit)

        # Extract the actual expression from _expr wrapper
        actual_expr = expr.get('_expr', expr)
        return self.interp.eval(actual_expr, env)

    def _apply_zoom(self, frame: np.ndarray, amount: float) -> np.ndarray:
        """
        Apply zoom to frame.

        ``amount`` above 1.01 crops the centre and scales it back up; below
        0.99 shrinks the frame onto a zero-filled canvas of the original size.
        Values in between return the frame untouched.
        """
        import cv2
        h, w = frame.shape[:2]

        if amount > 1.01:
            # Zoom in: crop center
            new_w, new_h = int(w / amount), int(h / amount)
            if new_w > 0 and new_h > 0:
                x1, y1 = (w - new_w) // 2, (h - new_h) // 2
                cropped = frame[y1:y1+new_h, x1:x1+new_w]
                return cv2.resize(cropped, (w, h))
        elif amount < 0.99:
            # Zoom out: shrink and center
            scaled_w, scaled_h = int(w * amount), int(h * amount)
            if scaled_w > 0 and scaled_h > 0:
                shrunk = cv2.resize(frame, (scaled_w, scaled_h))
                # Match the input's shape and dtype instead of assuming
                # 3-channel uint8.
                canvas = np.zeros_like(frame)
                x_off, y_off = (w - scaled_w) // 2, (h - scaled_h) // 2
                canvas[y_off:y_off+scaled_h, x_off:x_off+scaled_w] = shrunk
                return canvas

        return frame

    def _apply_ripple(self, frame: np.ndarray, amplitude: float,
                      cx: float, cy: float, frequency: float,
                      decay: float, speed: float) -> np.ndarray:
        """
        Apply ripple effect.

        Pixels are displaced radially around (cx * w, cy * h) by a sine of the
        distance from that point, attenuated with distance and phase-shifted
        by the current time.
        """
        import cv2
        h, w = frame.shape[:2]

        # Create coordinate grids
        y_coords, x_coords = np.mgrid[0:h, 0:w].astype(np.float32)

        # Normalize to center
        center_x, center_y = w * cx, h * cy
        dx = x_coords - center_x
        dy = y_coords - center_y
        dist = np.sqrt(dx**2 + dy**2)

        # Ripple displacement
        phase = self.ctx.t * speed
        ripple = amplitude * np.sin(dist / frequency - phase) * np.exp(-dist * decay / max(w, h))

        # Displace coordinates
        angle = np.arctan2(dy, dx)
        map_x = (x_coords + ripple * np.cos(angle)).astype(np.float32)
        map_y = (y_coords + ripple * np.sin(angle)).astype(np.float32)

        return cv2.remap(frame, map_x, map_y, cv2.INTER_LINEAR, borderMode=cv2.BORDER_REFLECT)

    def _find_output_node(self) -> Optional[str]:
        """
        Find the final output node (MUX or last EFFECT).

        Preference order: the first MUX node, the last EFFECT listed after a
        SLICE_ON, the last SLICE_ON itself, and finally the last EFFECT
        anywhere. Returns None when none of these exist.
        """
        nodes = self.recipe.nodes

        # Look for MUX node
        for node in nodes:
            if node.get('type') == 'MUX':
                return node['id']

        last_slice_on = None
        last_effect = None
        last_effect_after_slice = None
        for node in nodes:
            node_type = node.get('type')
            if node_type == 'SLICE_ON':
                last_slice_on = node['id']
            elif node_type == 'EFFECT':
                last_effect = node['id']
                if last_slice_on is not None:
                    last_effect_after_slice = node['id']

        # Recipes without a SLICE_ON (or with nothing after it) still need an
        # output node, otherwise no frame is ever rendered.
        for candidate in (last_effect_after_slice, last_slice_on, last_effect):
            if candidate is not None:
                return candidate
        return None

    def _target_size(self) -> tuple:
        """Return the (w, h) that rendered frames are normalized to."""
        if self._output_size:
            w, h = self._output_size
            return (w, h)
        first_source = next(iter(self.sources.values()), None)
        if first_source:
            return first_source._size
        return self._DEFAULT_SIZE

    def render_frame(self, t: float) -> Optional[np.ndarray]:
        """
        Render a single frame at time t.

        Updates the audio-derived context first (firing ``_on_beat`` on a
        rising beat edge), then evaluates the output node and resizes the
        result to the output size. Returns None if there is no output node or
        it produced no frame.
        """
        # Clear frame cache
        self._frame_cache.clear()

        # Update context
        self.ctx.t = t

        # Update audio analysis
        if self.audio_analyzer:
            self.audio_analyzer.set_time(t)
            energy = self.audio_analyzer.get_energy()
            is_beat = self.audio_analyzer.get_beat()

            # Beat edge detection
            was_beat = self.ctx.is_beat
            self.ctx.energy = energy
            self.ctx.is_beat = is_beat

            if is_beat and not was_beat:
                self._on_beat()

            # Store in analysis dict
            self.ctx.analysis['live_energy'] = {'values': [energy]}
            self.ctx.analysis['live_beat'] = {'values': [1.0 if is_beat else 0.0]}

        # Find output node and render
        output_node = self._find_output_node()
        if not output_node:
            return None

        frame = self._get_frame(output_node)

        # Normalize to output size
        if frame is not None and self._output_size:
            w, h = self._output_size
            if frame.shape[1] != w or frame.shape[0] != h:
                import cv2
                frame = cv2.resize(frame, (w, h))
        return frame

    def run(self, output: str = "preview", duration: float = None):
        """
        Run the pipeline.

        Args:
            output: "preview", filename, or Output object
            duration: Duration in seconds (default: audio duration or 60s)

        The output and all video sources are closed when the run ends,
        including after Ctrl-C.
        """
        # Determine duration
        if duration is None:
            duration = self.audio_analyzer.duration if self.audio_analyzer else 60.0

        # Create output. It is sized to what render_frame delivers, so an
        # explicit output_size is honoured rather than the first source's size.
        if output == "preview":
            out = DisplayOutput(size=self._target_size(), fps=self.fps,
                                audio_source=self.audio_source_path)
        elif isinstance(output, str):
            out = FileOutput(output, size=self._target_size(), fps=self.fps,
                             audio_source=self.audio_source_path)
        else:
            out = output

        frame_time = 1.0 / self.fps
        n_frames = int(duration * self.fps)

        print(f"Streaming: {len(self.sources)} sources -> {output}", file=sys.stderr)
        print(f"Duration: {duration:.1f}s, {n_frames} frames @ {self.fps}fps", file=sys.stderr)

        start_time = time.time()
        frame_count = 0

        try:
            for frame_num in range(n_frames):
                t = frame_num * frame_time

                frame = self.render_frame(t)

                if frame is not None:
                    out.write(frame, t)
                    frame_count += 1

                # Progress
                if frame_num % 50 == 0:
                    elapsed = time.time() - start_time
                    fps = frame_count / elapsed if elapsed > 0 else 0
                    pct = 100 * frame_num / n_frames
                    print(f"\r{pct:5.1f}% | {fps:5.1f} fps | frame {frame_num}/{n_frames}",
                          end="", file=sys.stderr)

        except KeyboardInterrupt:
            print("\nInterrupted", file=sys.stderr)
        finally:
            out.close()
            for src in self.sources.values():
                src.close()

        elapsed = time.time() - start_time
        avg_fps = frame_count / elapsed if elapsed > 0 else 0
        print(f"\nCompleted: {frame_count} frames in {elapsed:.1f}s ({avg_fps:.1f} fps avg)",
              file=sys.stderr)
|
|
|
|
|
|
def run_pipeline(recipe_path: str, output: str = "preview",
                 duration: float = None, fps: float = None):
    """
    Compile the recipe at ``recipe_path`` and run it to ``output``.

    No adapter layer - the compiled recipe is handed straight to
    ``StreamingPipeline``. When ``fps`` is not given, the recipe's encoding
    ``fps`` is used, falling back to 30.
    """
    from pathlib import Path
    import sys

    # Put <three levels up>/artdag at the front of sys.path so the compiler
    # import below can resolve.
    artdag_root = Path(__file__).parent.parent.parent / "artdag"
    sys.path.insert(0, str(artdag_root))

    from artdag.sexp.compiler import compile_string

    recipe_file = Path(recipe_path)
    base_dir = recipe_file.parent
    source_text = recipe_file.read_text()
    compiled = compile_string(source_text, {}, recipe_dir=base_dir)

    effective_fps = fps or compiled.encoding.get('fps', 30)
    pipeline = StreamingPipeline(compiled, recipe_dir=base_dir, fps=effective_fps)
    pipeline.run(output=output, duration=duration)
|
|
|
|
|
|
def run_pipeline_piped(recipe_path: str, duration: float = None, fps: float = None):
    """
    Run pipeline and pipe directly to mpv with audio.

    Frames are written as raw rgb24 to an ``mpv`` child process on stdin. If
    the recipe has an audio source, it is played alongside by an ``ffplay``
    child process. Both children and all video sources are cleaned up on
    exit, including when a player fails to launch or the user hits Ctrl-C.

    Args:
        recipe_path: Path to the sexp recipe file.
        duration: Seconds to render (default: audio duration, else 60s).
        fps: Frame rate (default: the recipe's encoding fps, else 30).
    """
    import subprocess
    from pathlib import Path
    import sys

    artdag_dir = str(Path(__file__).parent.parent.parent / "artdag")
    # Avoid stacking duplicate entries when called more than once.
    if artdag_dir not in sys.path:
        sys.path.insert(0, artdag_dir)
    from artdag.sexp.compiler import compile_string

    recipe_path = Path(recipe_path)
    recipe_text = recipe_path.read_text()
    compiled = compile_string(recipe_text, {}, recipe_dir=recipe_path.parent)

    actual_fps = fps or compiled.encoding.get('fps', 30)

    pipeline = StreamingPipeline(
        compiled,
        recipe_dir=recipe_path.parent,
        fps=actual_fps,
    )

    # Get frame info
    first_source = next(iter(pipeline.sources.values()), None)
    if first_source:
        w, h = first_source._size
    else:
        w, h = 720, 720

    # Determine duration
    if duration is None:
        if pipeline.audio_analyzer:
            duration = pipeline.audio_analyzer.duration
        else:
            duration = 60.0

    n_frames = int(duration * actual_fps)
    frame_time = 1.0 / actual_fps

    print(f"Streaming {n_frames} frames @ {actual_fps}fps to mpv", file=sys.stderr)

    mpv_cmd = [
        "mpv", "--no-cache",
        "--demuxer=rawvideo",
        f"--demuxer-rawvideo-w={w}",
        f"--demuxer-rawvideo-h={h}",
        "--demuxer-rawvideo-mp-format=rgb24",
        f"--demuxer-rawvideo-fps={actual_fps}",
        "--title=Streaming Pipeline",
        "-"
    ]

    mpv = None
    audio_proc = None

    try:
        import cv2

        # Launch the players inside the try block so the finally clause still
        # releases the video sources (and a half-started player) if an
        # executable is missing.
        mpv = subprocess.Popen(mpv_cmd, stdin=subprocess.PIPE, stderr=subprocess.DEVNULL)

        # Start audio if available
        if pipeline.audio_source_path:
            audio_cmd = ["ffplay", "-nodisp", "-autoexit", "-loglevel", "quiet",
                         pipeline.audio_source_path]
            audio_proc = subprocess.Popen(audio_cmd, stdout=subprocess.DEVNULL,
                                          stderr=subprocess.DEVNULL)

        for frame_num in range(n_frames):
            if mpv.poll() is not None:
                break  # mpv closed

            t = frame_num * frame_time
            frame = pipeline.render_frame(t)
            if frame is None:
                continue

            # Ensure consistent frame size: the raw demuxer expects exactly
            # w * h * 3 bytes per frame.
            if frame.shape[1] != w or frame.shape[0] != h:
                frame = cv2.resize(frame, (w, h))
            if not frame.flags['C_CONTIGUOUS']:
                frame = np.ascontiguousarray(frame)
            try:
                mpv.stdin.write(frame.tobytes())
                mpv.stdin.flush()
            except BrokenPipeError:
                break
    except KeyboardInterrupt:
        pass
    finally:
        if mpv is not None:
            if mpv.stdin:
                try:
                    mpv.stdin.close()
                except OSError:
                    # Pipe already broken; nothing left to flush.
                    pass
            mpv.terminate()
        if audio_proc is not None:
            audio_proc.terminate()

        # Reap the children so they do not linger as zombies; escalate to
        # kill if one ignores the terminate request.
        for proc in (mpv, audio_proc):
            if proc is None:
                continue
            try:
                proc.wait(timeout=2)
            except subprocess.TimeoutExpired:
                proc.kill()
                proc.wait()

        for src in pipeline.sources.values():
            src.close()
|
|
|
|
|
|
if __name__ == "__main__":
    import argparse

    # Command-line entry point: one positional recipe plus output/duration/fps.
    cli = argparse.ArgumentParser(description="Run sexp recipe through streaming pipeline")
    cli.add_argument("recipe", help="Path to .sexp recipe file")
    cli.add_argument(
        "-o", "--output",
        default="pipe",
        help="Output: 'pipe' (mpv), 'preview', or filename (default: pipe)",
    )
    cli.add_argument(
        "-d", "--duration",
        type=float,
        default=None,
        help="Duration in seconds (default: audio duration)",
    )
    cli.add_argument(
        "--fps",
        type=float,
        default=None,
        help="Frame rate (default: from recipe)",
    )
    opts = cli.parse_args()

    # Any output other than "pipe" is passed through to run_pipeline as-is.
    if opts.output != "pipe":
        run_pipeline(opts.recipe, output=opts.output, duration=opts.duration, fps=opts.fps)
    else:
        run_pipeline_piped(opts.recipe, duration=opts.duration, fps=opts.fps)
|