Files
rose-ash/streaming/pipeline.py
giles c590f2e039 Squashed 'test/' content from commit f2edc20
git-subtree-dir: test
git-subtree-split: f2edc20cba865a6ef67ca807c2ed6cee8e6c2836
2026-02-24 23:10:04 +00:00

847 lines
29 KiB
Python

"""
Streaming pipeline executor.
Directly executes compiled sexp recipes frame-by-frame.
No adapter layer - frames and analysis flow through the DAG.
"""
import sys
import time
import numpy as np
from pathlib import Path
from typing import Dict, List, Any, Optional
from dataclasses import dataclass, field
from .sources import VideoSource
from .audio import StreamingAudioAnalyzer
from .output import DisplayOutput, FileOutput
from .sexp_interp import SexpInterpreter
@dataclass
class FrameContext:
    """Context passed through the pipeline for each frame."""
    # Position in the output stream, in seconds.
    t: float # Current time
    # Instantaneous audio energy; stays 0.0 when no audio analyzer exists.
    energy: float = 0.0
    # True while the analyzer reports a beat at the current time.
    is_beat: bool = False
    # Count of beat onsets seen so far (incremented on each rising edge).
    beat_count: int = 0
    # Live analysis payloads keyed by id (e.g. 'live_energy', 'live_beat').
    analysis: Dict[str, Any] = field(default_factory=dict)
class StreamingPipeline:
    """
    Executes a compiled sexp recipe as a streaming pipeline.

    Frames flow through the DAG directly - no adapter needed.
    Each node is evaluated lazily when its output is requested.
    """
def __init__(self, compiled_recipe, recipe_dir: Path = None, fps: float = 30, seed: int = 42,
             output_size: tuple = None):
    """
    Build runtime state for a compiled recipe and open its media.

    Args:
        compiled_recipe: Compiled sexp recipe exposing a `.nodes` list of dicts.
        recipe_dir: Directory that media paths in the recipe are relative to.
        fps: Target frame rate for sources and output.
        seed: Base RNG seed; each SCAN node derives its own stream from it.
        output_size: (width, height) for output frames; when None, falls back
            to the first video source's size after sources are opened.
    """
    self.recipe = compiled_recipe
    self.recipe_dir = recipe_dir or Path(".")
    self.fps = fps
    self.seed = seed
    # Build node lookup
    self.nodes = {n['id']: n for n in compiled_recipe.nodes}
    # Runtime state
    self.sources: Dict[str, VideoSource] = {}
    self.audio_analyzer: Optional[StreamingAudioAnalyzer] = None
    self.audio_source_path: Optional[str] = None
    # Sexp interpreter for expressions
    self.interp = SexpInterpreter()
    # Scan state (node_id -> current value)
    self.scan_state: Dict[str, Any] = {}
    self.scan_emit: Dict[str, Any] = {}
    # SLICE_ON state
    self.slice_on_acc: Dict[str, Any] = {}
    self.slice_on_result: Dict[str, Any] = {}
    # Frame cache for current timestep (cleared each frame)
    self._frame_cache: Dict[str, np.ndarray] = {}
    # Context for current frame
    self.ctx = FrameContext(t=0.0)
    # Output size (w, h) - set after sources are initialized
    self._output_size = output_size
    # Initialize
    self._init_sources()
    self._init_scans()
    self._init_slice_on()
    # Set output size from first source if not specified
    if self._output_size is None and self.sources:
        first_source = next(iter(self.sources.values()))
        self._output_size = first_source._size
def _init_sources(self):
"""Initialize video and audio sources."""
for node in self.recipe.nodes:
if node.get('type') == 'SOURCE':
config = node.get('config', {})
path = config.get('path')
if path:
full_path = (self.recipe_dir / path).resolve()
suffix = full_path.suffix.lower()
if suffix in ('.mp4', '.webm', '.mov', '.avi', '.mkv'):
if not full_path.exists():
print(f"Warning: video not found: {full_path}", file=sys.stderr)
continue
self.sources[node['id']] = VideoSource(
str(full_path),
target_fps=self.fps
)
elif suffix in ('.mp3', '.wav', '.flac', '.ogg', '.m4a', '.aac'):
if not full_path.exists():
print(f"Warning: audio not found: {full_path}", file=sys.stderr)
continue
self.audio_source_path = str(full_path)
self.audio_analyzer = StreamingAudioAnalyzer(str(full_path))
def _init_scans(self):
"""Initialize scan nodes with their initial state."""
import random
seed_offset = 0
for node in self.recipe.nodes:
if node.get('type') == 'SCAN':
config = node.get('config', {})
# Create RNG for this scan
scan_seed = config.get('seed', self.seed + seed_offset)
rng = random.Random(scan_seed)
seed_offset += 1
# Evaluate initial value
init_expr = config.get('init', 0)
init_value = self.interp.eval(init_expr, {})
self.scan_state[node['id']] = {
'value': init_value,
'rng': rng,
'config': config,
}
# Compute initial emit
self._update_scan_emit(node['id'])
def _update_scan_emit(self, node_id: str):
"""Update the emit value for a scan."""
state = self.scan_state[node_id]
config = state['config']
emit_expr = config.get('emit_expr', config.get('emit', None))
if emit_expr is None:
# No emit expression - emit the value directly
self.scan_emit[node_id] = state['value']
return
# Build environment from state
env = {}
if isinstance(state['value'], dict):
env.update(state['value'])
else:
env['acc'] = state['value']
env['beat_count'] = self.ctx.beat_count
env['time'] = self.ctx.t
# Set RNG for interpreter
self.interp.rng = state['rng']
self.scan_emit[node_id] = self.interp.eval(emit_expr, env)
def _step_scan(self, node_id: str):
"""Step a scan forward on beat."""
state = self.scan_state[node_id]
config = state['config']
step_expr = config.get('step_expr', config.get('step', None))
if step_expr is None:
return
# Build environment
env = {}
if isinstance(state['value'], dict):
env.update(state['value'])
else:
env['acc'] = state['value']
env['beat_count'] = self.ctx.beat_count
env['time'] = self.ctx.t
# Set RNG
self.interp.rng = state['rng']
# Evaluate step
new_value = self.interp.eval(step_expr, env)
state['value'] = new_value
# Update emit
self._update_scan_emit(node_id)
def _init_slice_on(self):
"""Initialize SLICE_ON nodes."""
for node in self.recipe.nodes:
if node.get('type') == 'SLICE_ON':
config = node.get('config', {})
init = config.get('init', {})
self.slice_on_acc[node['id']] = dict(init)
# Evaluate initial state
self._eval_slice_on(node['id'])
def _eval_slice_on(self, node_id: str):
"""Evaluate a SLICE_ON node's Lambda."""
node = self.nodes[node_id]
config = node.get('config', {})
fn = config.get('fn')
videos = config.get('videos', [])
if not fn:
return
acc = self.slice_on_acc[node_id]
n_videos = len(videos)
# Set up environment
self.interp.globals['videos'] = list(range(n_videos))
try:
from .sexp_interp import eval_slice_on_lambda
result = eval_slice_on_lambda(
fn, acc, self.ctx.beat_count, 0, 1,
list(range(n_videos)), self.interp
)
self.slice_on_result[node_id] = result
# Update accumulator
if 'acc' in result:
self.slice_on_acc[node_id] = result['acc']
except Exception as e:
print(f"SLICE_ON eval error: {e}", file=sys.stderr)
def _on_beat(self):
"""Called when a beat is detected."""
self.ctx.beat_count += 1
# Step all scans
for node_id in self.scan_state:
self._step_scan(node_id)
# Step all SLICE_ON nodes
for node_id in self.slice_on_acc:
self._eval_slice_on(node_id)
def _get_frame(self, node_id: str) -> Optional[np.ndarray]:
"""
Get the output frame for a node at current time.
Recursively evaluates inputs as needed.
Results are cached for the current timestep.
"""
if node_id in self._frame_cache:
return self._frame_cache[node_id]
node = self.nodes.get(node_id)
if not node:
return None
node_type = node.get('type')
if node_type == 'SOURCE':
frame = self._eval_source(node)
elif node_type == 'SEGMENT':
frame = self._eval_segment(node)
elif node_type == 'EFFECT':
frame = self._eval_effect(node)
elif node_type == 'SLICE_ON':
frame = self._eval_slice_on_frame(node)
else:
# Unknown node type - try to pass through input
inputs = node.get('inputs', [])
frame = self._get_frame(inputs[0]) if inputs else None
self._frame_cache[node_id] = frame
return frame
def _eval_source(self, node: dict) -> Optional[np.ndarray]:
"""Evaluate a SOURCE node."""
source = self.sources.get(node['id'])
if source:
return source.read_frame(self.ctx.t)
return None
def _eval_segment(self, node: dict) -> Optional[np.ndarray]:
"""Evaluate a SEGMENT node (time segment of source)."""
inputs = node.get('inputs', [])
if not inputs:
return None
config = node.get('config', {})
start = config.get('start', 0)
duration = config.get('duration')
# Resolve any bindings
if isinstance(start, dict):
start = self._resolve_binding(start) if start.get('_binding') else 0
if isinstance(duration, dict):
duration = self._resolve_binding(duration) if duration.get('_binding') else None
# Adjust time for segment
t_local = self.ctx.t + (start if isinstance(start, (int, float)) else 0)
if duration and isinstance(duration, (int, float)):
t_local = t_local % duration # Loop within segment
# Get source frame at adjusted time
source_id = inputs[0]
source = self.sources.get(source_id)
if source:
return source.read_frame(t_local)
return self._get_frame(source_id)
def _eval_effect(self, node: dict) -> Optional[np.ndarray]:
    """
    Apply an EFFECT node's named effect to its input frame(s).

    Config bindings/expressions are resolved to live values first.
    Unknown effect names (or sub-threshold parameters) pass the first
    input frame through unchanged.

    Args:
        node: Compiled EFFECT node dict with 'inputs' and 'config'.

    Returns:
        The processed frame, or None when no input produced a frame.
    """
    import cv2
    inputs = node.get('inputs', [])
    config = node.get('config', {})
    effect_name = config.get('effect')
    # Evaluate upstream nodes; drop inputs that yielded no frame.
    input_frames = [self._get_frame(inp) for inp in inputs]
    input_frames = [f for f in input_frames if f is not None]
    if not input_frames:
        return None
    frame = input_frames[0]
    # Resolve bindings/expressions in config to concrete parameter values.
    params = self._resolve_config(config)
    if effect_name == 'rotate':
        angle = params.get('angle', 0)
        # Skip near-zero angles to avoid a pointless warp each frame.
        if abs(angle) > 0.5:
            h, w = frame.shape[:2]
            center = (w // 2, h // 2)
            matrix = cv2.getRotationMatrix2D(center, angle, 1.0)
            frame = cv2.warpAffine(frame, matrix, (w, h))
    elif effect_name == 'zoom':
        amount = params.get('amount', 1.0)
        if abs(amount - 1.0) > 0.01:
            frame = self._apply_zoom(frame, amount)
    elif effect_name == 'invert':
        amount = params.get('amount', 0)
        if amount > 0.01:
            # Partial invert: cross-fade between the frame and its negative.
            inverted = 255 - frame
            frame = cv2.addWeighted(frame, 1 - amount, inverted, amount, 0)
    elif effect_name == 'hue_shift':
        degrees = params.get('degrees', 0)
        if abs(degrees) > 1:
            hsv = cv2.cvtColor(frame, cv2.COLOR_RGB2HSV)
            # OpenCV's hue channel spans 0-179, i.e. degrees / 2.
            hsv[:, :, 0] = (hsv[:, :, 0].astype(int) + int(degrees / 2)) % 180
            frame = cv2.cvtColor(hsv, cv2.COLOR_HSV2RGB)
    elif effect_name == 'blend':
        if len(input_frames) >= 2:
            opacity = params.get('opacity', 0.5)
            base, overlay = input_frames[0], input_frames[1]
            # Fix: cv2.addWeighted requires identical geometry; resize the
            # overlay to the base frame (blend_multi already guards this way).
            if overlay.shape[:2] != base.shape[:2]:
                overlay = cv2.resize(overlay, (base.shape[1], base.shape[0]))
            frame = cv2.addWeighted(base, 1 - opacity, overlay, opacity, 0)
    elif effect_name == 'blend_multi':
        weights = params.get('weights', [])
        if len(input_frames) > 1 and weights:
            h, w = input_frames[0].shape[:2]
            # Accumulate in float32 to avoid uint8 overflow, clip at the end.
            result = np.zeros((h, w, 3), dtype=np.float32)
            for f, wt in zip(input_frames, weights):
                if f is not None and wt > 0.001:
                    if f.shape[:2] != (h, w):
                        f = cv2.resize(f, (w, h))
                    result += f.astype(np.float32) * wt
            frame = np.clip(result, 0, 255).astype(np.uint8)
    elif effect_name == 'ripple':
        amp = params.get('amplitude', 0)
        if amp > 1:
            frame = self._apply_ripple(frame, amp,
                                       params.get('center_x', 0.5),
                                       params.get('center_y', 0.5),
                                       params.get('frequency', 8),
                                       params.get('decay', 2),
                                       params.get('speed', 5))
    return frame
def _eval_slice_on_frame(self, node: dict) -> Optional[np.ndarray]:
    """
    Composite a SLICE_ON node's active layers into one output frame.

    Uses the layer/weight plan stored by _eval_slice_on; before any plan
    exists (or when it is empty) the first listed video passes through.
    Returns None when no frame can be produced at all.
    """
    import cv2
    config = node.get('config', {})
    video_ids = config.get('videos', [])
    result = self.slice_on_result.get(node['id'], {})
    if not result:
        # No result yet - return first video
        if video_ids:
            return self._get_frame(video_ids[0])
        return None
    # Get layers and compose info
    layers = result.get('layers', [])
    compose = result.get('compose', {})
    weights = compose.get('weights', [])
    if not layers or not weights:
        if video_ids:
            return self._get_frame(video_ids[0])
        return None
    # Get frames for each layer
    frames = []
    for i, layer in enumerate(layers):
        # Layer without an explicit video index defaults to its position.
        video_idx = layer.get('video', i)
        if video_idx < len(video_ids):
            frame = self._get_frame(video_ids[video_idx])
            # Apply layer effects (zoom)
            effects = layer.get('effects', [])
            for eff in effects:
                eff_name = eff.get('effect')
                # Effect name may arrive as an enum-like object; unwrap it.
                if hasattr(eff_name, 'name'):
                    eff_name = eff_name.name
                if eff_name == 'zoom':
                    zoom_amt = eff.get('amount', 1.0)
                    if frame is not None:
                        frame = self._apply_zoom(frame, zoom_amt)
            frames.append(frame)
        else:
            # Out-of-range layer contributes nothing (kept for zip alignment).
            frames.append(None)
    # Composite with weights - use consistent output size
    if self._output_size:
        w, h = self._output_size
    else:
        # Fallback to first non-None frame size
        for f in frames:
            if f is not None:
                h, w = f.shape[:2]
                break
        else:
            # for/else: every frame was None, nothing to composite.
            return None
    # Weighted sum in float32 to avoid uint8 overflow.
    output = np.zeros((h, w, 3), dtype=np.float32)
    for frame, weight in zip(frames, weights):
        if frame is None or weight < 0.001:
            continue
        # Resize to output size
        if frame.shape[1] != w or frame.shape[0] != h:
            frame = cv2.resize(frame, (w, h))
        output += frame.astype(np.float32) * weight
    # Normalize weights
    # Only renormalize when weights meaningfully deviate from summing to 1.
    total_weight = sum(wt for wt in weights if wt > 0.001)
    if total_weight > 0 and abs(total_weight - 1.0) > 0.01:
        output /= total_weight
    return np.clip(output, 0, 255).astype(np.uint8)
def _resolve_config(self, config: dict) -> dict:
"""Resolve bindings in effect config to actual values."""
resolved = {}
for key, value in config.items():
if key in ('effect', 'effect_path', 'effect_cid', 'effects_registry',
'analysis_refs', 'inputs', 'cid'):
continue
if isinstance(value, dict) and value.get('_binding'):
resolved[key] = self._resolve_binding(value)
elif isinstance(value, dict) and value.get('_expr'):
resolved[key] = self._resolve_expr(value)
else:
resolved[key] = value
return resolved
def _resolve_binding(self, binding: dict) -> Any:
"""Resolve a binding to its current value."""
source_id = binding.get('source')
feature = binding.get('feature', 'values')
range_map = binding.get('range')
# Get raw value from scan or analysis
if source_id in self.scan_emit:
value = self.scan_emit[source_id]
elif source_id in self.ctx.analysis:
data = self.ctx.analysis[source_id]
value = data.get(feature, data.get('values', [0]))[0] if isinstance(data, dict) else data
else:
# Fallback to energy
value = self.ctx.energy
# Extract feature from dict
if isinstance(value, dict) and feature in value:
value = value[feature]
# Apply range mapping
if range_map and isinstance(value, (int, float)):
lo, hi = range_map
value = lo + value * (hi - lo)
return value
def _resolve_expr(self, expr: dict) -> Any:
"""Resolve a compiled expression."""
env = {
'energy': self.ctx.energy,
'beat_count': self.ctx.beat_count,
't': self.ctx.t,
}
# Add scan values
for scan_id, value in self.scan_emit.items():
# Use short form if available
env[scan_id] = value
# Extract the actual expression from _expr wrapper
actual_expr = expr.get('_expr', expr)
return self.interp.eval(actual_expr, env)
def _apply_zoom(self, frame: np.ndarray, amount: float) -> np.ndarray:
    """Zoom in (center crop + upscale) or out (shrink onto a black canvas)."""
    import cv2
    h, w = frame.shape[:2]
    if amount > 1.01:
        # Zoom in: crop the central 1/amount region, scale it back up.
        crop_w, crop_h = int(w / amount), int(h / amount)
        if crop_w > 0 and crop_h > 0:
            x0 = (w - crop_w) // 2
            y0 = (h - crop_h) // 2
            region = frame[y0:y0 + crop_h, x0:x0 + crop_w]
            return cv2.resize(region, (w, h))
    elif amount < 0.99:
        # Zoom out: shrink the frame and letterbox it on black.
        small_w, small_h = int(w * amount), int(h * amount)
        if small_w > 0 and small_h > 0:
            small = cv2.resize(frame, (small_w, small_h))
            canvas = np.zeros((h, w, 3), dtype=np.uint8)
            x0 = (w - small_w) // 2
            y0 = (h - small_h) // 2
            canvas[y0:y0 + small_h, x0:x0 + small_w] = small
            return canvas
    # Degenerate sizes or amount ~ 1.0: pass the frame through unchanged.
    return frame
def _apply_ripple(self, frame: np.ndarray, amplitude: float,
                  cx: float, cy: float, frequency: float,
                  decay: float, speed: float) -> np.ndarray:
    """
    Warp the frame with an animated radial ripple centered at (cx, cy).

    Args:
        frame: Input image; assumed RGB (H, W, 3) per the rest of the
            pipeline - TODO confirm for grayscale inputs.
        amplitude: Maximum pixel displacement of the ripple.
        cx, cy: Ripple center as fractions of width/height (0..1).
        frequency: Spatial wavelength divisor - larger means wider rings.
        decay: Exponential falloff rate with distance (scaled by max(w, h)).
        speed: Phase advance factor; animation is driven by self.ctx.t.
    """
    import cv2
    h, w = frame.shape[:2]
    # Create coordinate grids
    y_coords, x_coords = np.mgrid[0:h, 0:w].astype(np.float32)
    # Normalize to center
    center_x, center_y = w * cx, h * cy
    dx = x_coords - center_x
    dy = y_coords - center_y
    dist = np.sqrt(dx**2 + dy**2)
    # Ripple displacement
    # Phase advances with stream time, making the rings travel outward.
    phase = self.ctx.t * speed
    ripple = amplitude * np.sin(dist / frequency - phase) * np.exp(-dist * decay / max(w, h))
    # Displace coordinates
    # Each pixel is pushed along the radial direction from the center.
    angle = np.arctan2(dy, dx)
    map_x = (x_coords + ripple * np.cos(angle)).astype(np.float32)
    map_y = (y_coords + ripple * np.sin(angle)).astype(np.float32)
    # Reflect at the borders so displaced samples never read outside the image.
    return cv2.remap(frame, map_x, map_y, cv2.INTER_LINEAR, borderMode=cv2.BORDER_REFLECT)
def _find_output_node(self) -> Optional[str]:
"""Find the final output node (MUX or last EFFECT)."""
# Look for MUX node
for node in self.recipe.nodes:
if node.get('type') == 'MUX':
return node['id']
# Otherwise find last EFFECT after SLICE_ON
last_effect = None
found_slice_on = False
for node in self.recipe.nodes:
if node.get('type') == 'SLICE_ON':
found_slice_on = True
elif node.get('type') == 'EFFECT' and found_slice_on:
last_effect = node['id']
return last_effect
def render_frame(self, t: float) -> Optional[np.ndarray]:
    """Render the pipeline's output frame for time t (seconds), or None."""
    # Memoized node outputs are only valid within a single timestep.
    self._frame_cache.clear()
    self.ctx.t = t
    if self.audio_analyzer:
        self.audio_analyzer.set_time(t)
        energy = self.audio_analyzer.get_energy()
        beat_now = self.audio_analyzer.get_beat()
        beat_before = self.ctx.is_beat
        self.ctx.energy = energy
        self.ctx.is_beat = beat_now
        # Fire beat handling only on the rising edge.
        if beat_now and not beat_before:
            self._on_beat()
        # Publish live features for bindings to pick up.
        self.ctx.analysis['live_energy'] = {'values': [energy]}
        self.ctx.analysis['live_beat'] = {'values': [1.0 if beat_now else 0.0]}
    target = self._find_output_node()
    if not target:
        return None
    frame = self._get_frame(target)
    # Normalize to the configured output size when they differ.
    if frame is not None and self._output_size:
        w, h = self._output_size
        if frame.shape[1] != w or frame.shape[0] != h:
            import cv2
            frame = cv2.resize(frame, (w, h))
    return frame
def run(self, output: str = "preview", duration: float = None):
    """
    Run the pipeline, rendering frames at self.fps until `duration` elapses.

    Args:
        output: "preview" for a live display window, a filename string for
            file output, or an already-constructed Output-like object.
        duration: Duration in seconds; defaults to the audio duration when
            an analyzer exists, otherwise 60 seconds.
    """
    # Determine duration
    if duration is None:
        if self.audio_analyzer:
            duration = self.audio_analyzer.duration
        else:
            duration = 60.0
    # Create output
    if output == "preview":
        # Get frame size from first source
        first_source = next(iter(self.sources.values()), None)
        if first_source:
            w, h = first_source._size
        else:
            w, h = 720, 720
        out = DisplayOutput(size=(w, h), fps=self.fps, audio_source=self.audio_source_path)
    elif isinstance(output, str):
        first_source = next(iter(self.sources.values()), None)
        if first_source:
            w, h = first_source._size
        else:
            w, h = 720, 720
        out = FileOutput(output, size=(w, h), fps=self.fps, audio_source=self.audio_source_path)
    else:
        # Caller supplied an Output-like object directly.
        out = output
    frame_time = 1.0 / self.fps
    n_frames = int(duration * self.fps)
    print(f"Streaming: {len(self.sources)} sources -> {output}", file=sys.stderr)
    print(f"Duration: {duration:.1f}s, {n_frames} frames @ {self.fps}fps", file=sys.stderr)
    start_time = time.time()
    frame_count = 0
    try:
        for frame_num in range(n_frames):
            t = frame_num * frame_time
            frame = self.render_frame(t)
            if frame is not None:
                out.write(frame, t)
                # NOTE(review): original indentation was lost; counting only
                # written frames here - confirm against rendering stats intent.
                frame_count += 1
            # Progress
            if frame_num % 50 == 0:
                elapsed = time.time() - start_time
                fps = frame_count / elapsed if elapsed > 0 else 0
                pct = 100 * frame_num / n_frames
                print(f"\r{pct:5.1f}% | {fps:5.1f} fps | frame {frame_num}/{n_frames}",
                      end="", file=sys.stderr)
    except KeyboardInterrupt:
        print("\nInterrupted", file=sys.stderr)
    finally:
        # Always release the sink and all video sources.
        out.close()
        for src in self.sources.values():
            src.close()
    elapsed = time.time() - start_time
    avg_fps = frame_count / elapsed if elapsed > 0 else 0
    print(f"\nCompleted: {frame_count} frames in {elapsed:.1f}s ({avg_fps:.1f} fps avg)",
          file=sys.stderr)
def run_pipeline(recipe_path: str, output: str = "preview",
                 duration: float = None, fps: float = None):
    """
    Compile a .sexp recipe and run it through the streaming pipeline.

    No adapter layer - directly executes the compiled recipe.

    Args:
        recipe_path: Path to the .sexp recipe file.
        output: "preview" or an output filename (passed to StreamingPipeline.run).
        duration: Optional duration override in seconds.
        fps: Optional frame-rate override; defaults to the recipe's encoding fps.
    """
    from pathlib import Path
    # Add artdag to path
    # The compiler lives in a sibling checkout; make it importable.
    import sys
    sys.path.insert(0, str(Path(__file__).parent.parent.parent / "artdag"))
    from artdag.sexp.compiler import compile_string
    recipe_path = Path(recipe_path)
    recipe_text = recipe_path.read_text()
    compiled = compile_string(recipe_text, {}, recipe_dir=recipe_path.parent)
    pipeline = StreamingPipeline(
        compiled,
        recipe_dir=recipe_path.parent,
        fps=fps or compiled.encoding.get('fps', 30),
    )
    pipeline.run(output=output, duration=duration)
def run_pipeline_piped(recipe_path: str, duration: float = None, fps: float = None):
    """
    Run the pipeline and pipe raw RGB frames directly to mpv, with audio
    played separately through ffplay when the recipe references an audio file.

    Args:
        recipe_path: Path to the .sexp recipe file.
        duration: Optional duration override in seconds (defaults to audio
            duration, or 60s without audio).
        fps: Optional frame-rate override; defaults to the recipe's encoding fps.
    """
    import subprocess
    from pathlib import Path
    import sys
    # The compiler lives in a sibling checkout; make it importable.
    sys.path.insert(0, str(Path(__file__).parent.parent.parent / "artdag"))
    from artdag.sexp.compiler import compile_string
    recipe_path = Path(recipe_path)
    recipe_text = recipe_path.read_text()
    compiled = compile_string(recipe_text, {}, recipe_dir=recipe_path.parent)
    pipeline = StreamingPipeline(
        compiled,
        recipe_dir=recipe_path.parent,
        fps=fps or compiled.encoding.get('fps', 30),
    )
    # Get frame info
    first_source = next(iter(pipeline.sources.values()), None)
    if first_source:
        w, h = first_source._size
    else:
        w, h = 720, 720
    # Determine duration
    if duration is None:
        if pipeline.audio_analyzer:
            duration = pipeline.audio_analyzer.duration
        else:
            duration = 60.0
    actual_fps = fps or compiled.encoding.get('fps', 30)
    n_frames = int(duration * actual_fps)
    frame_time = 1.0 / actual_fps
    print(f"Streaming {n_frames} frames @ {actual_fps}fps to mpv", file=sys.stderr)
    # Start mpv
    # mpv reads raw rgb24 frames from stdin; geometry/fps must match exactly.
    mpv_cmd = [
        "mpv", "--no-cache",
        "--demuxer=rawvideo",
        f"--demuxer-rawvideo-w={w}",
        f"--demuxer-rawvideo-h={h}",
        "--demuxer-rawvideo-mp-format=rgb24",
        f"--demuxer-rawvideo-fps={actual_fps}",
        "--title=Streaming Pipeline",
        "-"
    ]
    mpv = subprocess.Popen(mpv_cmd, stdin=subprocess.PIPE, stderr=subprocess.DEVNULL)
    # Start audio if available
    # NOTE(review): audio runs unsynchronized in a separate ffplay process.
    audio_proc = None
    if pipeline.audio_source_path:
        audio_cmd = ["ffplay", "-nodisp", "-autoexit", "-loglevel", "quiet",
                     pipeline.audio_source_path]
        audio_proc = subprocess.Popen(audio_cmd, stdout=subprocess.DEVNULL,
                                      stderr=subprocess.DEVNULL)
    try:
        import cv2
        for frame_num in range(n_frames):
            # Stop rendering once the viewer window is closed.
            if mpv.poll() is not None:
                break # mpv closed
            t = frame_num * frame_time
            frame = pipeline.render_frame(t)
            if frame is not None:
                # Ensure consistent frame size
                if frame.shape[1] != w or frame.shape[0] != h:
                    frame = cv2.resize(frame, (w, h))
                # tobytes() requires a C-contiguous buffer.
                if not frame.flags['C_CONTIGUOUS']:
                    frame = np.ascontiguousarray(frame)
                try:
                    mpv.stdin.write(frame.tobytes())
                    mpv.stdin.flush()
                except BrokenPipeError:
                    break
    except KeyboardInterrupt:
        pass
    finally:
        # Tear down both players and release all video sources.
        if mpv.stdin:
            mpv.stdin.close()
        mpv.terminate()
        if audio_proc:
            audio_proc.terminate()
        for src in pipeline.sources.values():
            src.close()
if __name__ == "__main__":
    import argparse
    # CLI entry point: render a recipe to mpv, a preview window, or a file.
    parser = argparse.ArgumentParser(description="Run sexp recipe through streaming pipeline")
    parser.add_argument("recipe", help="Path to .sexp recipe file")
    parser.add_argument("-o", "--output", default="pipe",
                        help="Output: 'pipe' (mpv), 'preview', or filename (default: pipe)")
    parser.add_argument("-d", "--duration", type=float, default=None,
                        help="Duration in seconds (default: audio duration)")
    parser.add_argument("--fps", type=float, default=None,
                        help="Frame rate (default: from recipe)")
    args = parser.parse_args()
    if args.output == "pipe":
        # Fast path: raw frames straight to mpv with side-channel audio.
        run_pipeline_piped(args.recipe, duration=args.duration, fps=args.fps)
    else:
        run_pipeline(args.recipe, output=args.output, duration=args.duration, fps=args.fps)