Files
test/streaming/stream_sexp.py
gilesb 95fcc67dcc Add generic streaming interpreter with configurable sources/audio
- Add stream_sexp_generic.py: fully generic sexp interpreter
- Add streaming primitives for video sources and audio analysis
- Add config system for external sources and audio files
- Add templates for reusable scans and macros
- Fix video/audio stream mapping in file output
- Add dynamic source cycling based on sources array length
- Remove old Python effect files (migrated to sexp)
- Update sexp effects to use namespaced primitives

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-02 17:48:04 +00:00

1082 lines
40 KiB
Python

"""
Generic Streaming S-expression Interpreter.
Executes streaming sexp recipes frame-by-frame.
The sexp defines the pipeline logic - interpreter just provides primitives.
Primitives:
(read source-name) - read frame from source
(rotate frame :angle N) - rotate frame
(zoom frame :amount N) - zoom frame
(invert frame :amount N) - invert colors
(hue-shift frame :degrees N) - shift hue
(blend frame1 frame2 :opacity N) - blend two frames
(blend-weighted [frames...] [weights...]) - weighted blend
(ripple frame :amplitude N :cx N :cy N ...) - ripple effect
(bind scan-name :field) - get scan state field
(map value [lo hi]) - map 0-1 value to range
energy - current energy (0-1)
beat - 1 if beat, 0 otherwise
t - current time
beat-count - total beats so far
Example sexp:
(stream "test"
:fps 30
(source vid "video.mp4")
(audio aud "music.mp3")
(scan spin beat
:init {:angle 0 :dir 1}
:step (dict :angle (+ angle (* dir 10)) :dir dir))
(frame
(-> (read vid)
(rotate :angle (bind spin :angle))
(zoom :amount (map energy [1 1.5])))))
"""
import sys
import time
import json
import hashlib
import numpy as np
import subprocess
from pathlib import Path
from dataclasses import dataclass, field
from typing import Dict, List, Any, Optional, Tuple, Union
sys.path.insert(0, str(Path(__file__).parent.parent.parent / "artdag"))
from artdag.sexp.parser import parse, parse_all, Symbol, Keyword
@dataclass
class StreamContext:
"""Runtime context for streaming."""
t: float = 0.0
frame_num: int = 0
fps: float = 30.0
energy: float = 0.0
is_beat: bool = False
beat_count: int = 0
output_size: Tuple[int, int] = (720, 720)
class StreamCache:
"""Cache for streaming data."""
def __init__(self, cache_dir: Path, recipe_hash: str):
self.cache_dir = cache_dir / recipe_hash
self.cache_dir.mkdir(parents=True, exist_ok=True)
self.analysis_buffer: Dict[str, List] = {}
self.scan_states: Dict[str, List] = {}
self.keyframe_interval = 5.0
def record_analysis(self, name: str, t: float, value: float):
if name not in self.analysis_buffer:
self.analysis_buffer[name] = []
t = float(t) if hasattr(t, 'item') else t
value = float(value) if hasattr(value, 'item') else value
self.analysis_buffer[name].append((t, value))
def record_scan_state(self, name: str, t: float, state: dict):
if name not in self.scan_states:
self.scan_states[name] = []
states = self.scan_states[name]
if not states or t - states[-1][0] >= self.keyframe_interval:
t = float(t) if hasattr(t, 'item') else t
clean = {k: (float(v) if hasattr(v, 'item') else v) for k, v in state.items()}
self.scan_states[name].append((t, clean))
def flush(self):
for name, data in self.analysis_buffer.items():
path = self.cache_dir / f"analysis_{name}.json"
existing = json.loads(path.read_text()) if path.exists() else []
existing.extend(data)
path.write_text(json.dumps(existing))
self.analysis_buffer.clear()
for name, states in self.scan_states.items():
path = self.cache_dir / f"scan_{name}.json"
existing = json.loads(path.read_text()) if path.exists() else []
existing.extend(states)
path.write_text(json.dumps(existing))
self.scan_states.clear()
class VideoSource:
"""Video source - reads frames sequentially."""
def __init__(self, path: str, fps: float = 30):
self.path = Path(path)
if not self.path.exists():
raise FileNotFoundError(f"Video not found: {path}")
# Get info
cmd = ["ffprobe", "-v", "quiet", "-print_format", "json",
"-show_streams", "-show_format", str(self.path)]
info = json.loads(subprocess.run(cmd, capture_output=True, text=True).stdout)
for s in info.get("streams", []):
if s.get("codec_type") == "video":
self.width = s.get("width", 720)
self.height = s.get("height", 720)
break
else:
self.width, self.height = 720, 720
self.duration = float(info.get("format", {}).get("duration", 60))
self.size = (self.width, self.height)
# Start decoder
cmd = ["ffmpeg", "-v", "quiet", "-i", str(self.path),
"-f", "rawvideo", "-pix_fmt", "rgb24", "-r", str(fps), "-"]
self._proc = subprocess.Popen(cmd, stdout=subprocess.PIPE)
self._frame_size = self.width * self.height * 3
self._current_frame = None
def read(self) -> Optional[np.ndarray]:
"""Read next frame."""
data = self._proc.stdout.read(self._frame_size)
if len(data) < self._frame_size:
return self._current_frame # Return last frame if stream ends
self._current_frame = np.frombuffer(data, dtype=np.uint8).reshape(
self.height, self.width, 3).copy()
return self._current_frame
def skip(self):
"""Read and discard frame (keep pipe in sync)."""
self._proc.stdout.read(self._frame_size)
def close(self):
if self._proc:
self._proc.terminate()
self._proc.wait()
class AudioAnalyzer:
"""Real-time audio analysis."""
def __init__(self, path: str, sample_rate: int = 22050):
self.path = Path(path)
# Load audio
cmd = ["ffmpeg", "-v", "quiet", "-i", str(self.path),
"-f", "f32le", "-ac", "1", "-ar", str(sample_rate), "-"]
self._audio = np.frombuffer(
subprocess.run(cmd, capture_output=True).stdout, dtype=np.float32)
self.sample_rate = sample_rate
# Get duration
cmd = ["ffprobe", "-v", "quiet", "-print_format", "json",
"-show_format", str(self.path)]
info = json.loads(subprocess.run(cmd, capture_output=True, text=True).stdout)
self.duration = float(info.get("format", {}).get("duration", 60))
self._flux_history = []
self._last_beat_time = -1
def get_energy(self, t: float) -> float:
idx = int(t * self.sample_rate)
start = max(0, idx - 512)
end = min(len(self._audio), idx + 512)
if start >= end:
return 0.0
return min(1.0, np.sqrt(np.mean(self._audio[start:end] ** 2)) * 3.0)
def get_beat(self, t: float) -> bool:
idx = int(t * self.sample_rate)
size = 2048
start, end = max(0, idx - size//2), min(len(self._audio), idx + size//2)
if end - start < size//2:
return False
curr = self._audio[start:end]
pstart, pend = max(0, start - 512), max(0, end - 512)
if pend <= pstart:
return False
prev = self._audio[pstart:pend]
curr_spec = np.abs(np.fft.rfft(curr * np.hanning(len(curr))))
prev_spec = np.abs(np.fft.rfft(prev * np.hanning(len(prev))))
n = min(len(curr_spec), len(prev_spec))
flux = np.sum(np.maximum(0, curr_spec[:n] - prev_spec[:n])) / (n + 1)
self._flux_history.append((t, flux))
while self._flux_history and self._flux_history[0][0] < t - 1.5:
self._flux_history.pop(0)
if len(self._flux_history) < 3:
return False
vals = [f for _, f in self._flux_history]
threshold = np.mean(vals) + np.std(vals) * 0.3 + 0.001
is_beat = flux > threshold and t - self._last_beat_time > 0.1
if is_beat:
self._last_beat_time = t
return is_beat
class StreamInterpreter:
"""
Generic streaming sexp interpreter.
Evaluates the frame pipeline expression each frame.
"""
def __init__(self, sexp_path: str, cache_dir: str = None):
self.sexp_path = Path(sexp_path)
self.sexp_dir = self.sexp_path.parent
text = self.sexp_path.read_text()
self.ast = parse(text)
self.config = self._parse_config()
recipe_hash = hashlib.sha256(text.encode()).hexdigest()[:16]
cache_path = Path(cache_dir) if cache_dir else self.sexp_dir / ".stream_cache"
self.cache = StreamCache(cache_path, recipe_hash)
self.ctx = StreamContext(fps=self.config.get('fps', 30))
self.sources: Dict[str, VideoSource] = {}
self.frames: Dict[str, np.ndarray] = {} # Current frame per source
self._sources_read: set = set() # Track which sources read this frame
self.audios: Dict[str, AudioAnalyzer] = {} # Multiple named audio sources
self.audio_paths: Dict[str, str] = {}
self.audio_state: Dict[str, dict] = {} # Per-audio: {energy, is_beat, beat_count, last_beat}
self.scans: Dict[str, dict] = {}
# Registries for external definitions
self.primitives: Dict[str, Any] = {} # name -> Python function
self.effects: Dict[str, dict] = {} # name -> {params, body}
self.macros: Dict[str, dict] = {} # name -> {params, body}
self.primitive_lib_dir = self.sexp_dir.parent / "sexp_effects" / "primitive_libs"
self.frame_pipeline = None # The (frame ...) expression
import random
self.rng = random.Random(self.config.get('seed', 42))
def _parse_config(self) -> dict:
"""Parse config from (stream name :key val ...)."""
config = {'fps': 30, 'seed': 42}
if not self.ast or not isinstance(self.ast[0], Symbol):
return config
if self.ast[0].name != 'stream':
return config
i = 2
while i < len(self.ast):
if isinstance(self.ast[i], Keyword):
config[self.ast[i].name] = self.ast[i + 1] if i + 1 < len(self.ast) else None
i += 2
elif isinstance(self.ast[i], list):
break
else:
i += 1
return config
def _load_primitives(self, lib_name: str):
"""Load primitives from a Python library file."""
import importlib.util
# Try multiple paths
lib_paths = [
self.primitive_lib_dir / f"{lib_name}.py",
self.sexp_dir / "primitive_libs" / f"{lib_name}.py",
self.sexp_dir.parent / "sexp_effects" / "primitive_libs" / f"{lib_name}.py",
]
lib_path = None
for p in lib_paths:
if p.exists():
lib_path = p
break
if not lib_path:
print(f"Warning: primitive library '{lib_name}' not found", file=sys.stderr)
return
spec = importlib.util.spec_from_file_location(lib_name, lib_path)
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
# Extract all prim_* functions
count = 0
for name in dir(module):
if name.startswith('prim_'):
func = getattr(module, name)
prim_name = name[5:] # Remove 'prim_' prefix
self.primitives[prim_name] = func
# Also register with dashes instead of underscores
dash_name = prim_name.replace('_', '-')
self.primitives[dash_name] = func
# Also register with -img suffix (sexp convention)
self.primitives[dash_name + '-img'] = func
count += 1
# Also check for PRIMITIVES dict (some modules use this for additional exports)
if hasattr(module, 'PRIMITIVES'):
prims = getattr(module, 'PRIMITIVES')
if isinstance(prims, dict):
for name, func in prims.items():
self.primitives[name] = func
# Also register underscore version
underscore_name = name.replace('-', '_')
self.primitives[underscore_name] = func
count += 1
print(f"Loaded primitives: {lib_name} ({count} functions)", file=sys.stderr)
def _load_effect(self, effect_path: Path):
"""Load and register an effect from a .sexp file."""
if not effect_path.exists():
print(f"Warning: effect file not found: {effect_path}", file=sys.stderr)
return
text = effect_path.read_text()
ast = parse_all(text)
for form in ast:
if not isinstance(form, list) or not form:
continue
if not isinstance(form[0], Symbol):
continue
cmd = form[0].name
if cmd == 'require-primitives':
lib_name = form[1] if isinstance(form[1], str) else str(form[1]).strip('"')
self._load_primitives(lib_name)
elif cmd == 'define-effect':
name = form[1].name if isinstance(form[1], Symbol) else str(form[1])
params = {}
body = None
i = 2
while i < len(form):
if isinstance(form[i], Keyword):
if form[i].name == 'params' and i + 1 < len(form):
# Parse params list
params_list = form[i + 1]
for p in params_list:
if isinstance(p, list) and p:
pname = p[0].name if isinstance(p[0], Symbol) else str(p[0])
pdef = {'default': 0}
j = 1
while j < len(p):
if isinstance(p[j], Keyword):
pdef[p[j].name] = p[j + 1] if j + 1 < len(p) else None
j += 2
else:
j += 1
params[pname] = pdef
i += 2
else:
i += 2
else:
# Body expression
body = form[i]
i += 1
self.effects[name] = {'params': params, 'body': body, 'path': str(effect_path)}
print(f"Effect: {name}", file=sys.stderr)
elif cmd == 'defmacro':
name = form[1].name if isinstance(form[1], Symbol) else str(form[1])
params = []
body = None
if len(form) > 2 and isinstance(form[2], list):
params = [p.name if isinstance(p, Symbol) else str(p) for p in form[2]]
if len(form) > 3:
body = form[3]
self.macros[name] = {'params': params, 'body': body}
print(f"Macro: {name}", file=sys.stderr)
def _init(self):
"""Initialize sources, scans, and pipeline from sexp."""
for form in self.ast:
if not isinstance(form, list) or not form:
continue
if not isinstance(form[0], Symbol):
continue
cmd = form[0].name
# === External loading ===
if cmd == 'require-primitives':
lib_name = form[1] if isinstance(form[1], str) else str(form[1]).strip('"')
self._load_primitives(lib_name)
elif cmd == 'effect':
# (effect name :path "...")
name = form[1].name if isinstance(form[1], Symbol) else str(form[1])
i = 2
while i < len(form):
if isinstance(form[i], Keyword) and form[i].name == 'path':
path = str(form[i + 1]).strip('"')
full = (self.sexp_dir / path).resolve()
self._load_effect(full)
i += 2
else:
i += 1
elif cmd == 'include':
# (include :path "...")
i = 1
while i < len(form):
if isinstance(form[i], Keyword) and form[i].name == 'path':
path = str(form[i + 1]).strip('"')
full = (self.sexp_dir / path).resolve()
self._load_effect(full) # Reuse effect loader for includes
i += 2
else:
i += 1
# === Sources ===
elif cmd == 'source':
name = form[1].name if isinstance(form[1], Symbol) else str(form[1])
path = str(form[2]).strip('"')
full = (self.sexp_dir / path).resolve()
if full.exists():
self.sources[name] = VideoSource(str(full), self.ctx.fps)
print(f"Source: {name} -> {full}", file=sys.stderr)
else:
print(f"Warning: {full} not found", file=sys.stderr)
elif cmd == 'audio':
name = form[1].name if isinstance(form[1], Symbol) else str(form[1])
path = str(form[2]).strip('"')
full = (self.sexp_dir / path).resolve()
if full.exists():
self.audios[name] = AudioAnalyzer(str(full))
self.audio_paths[name] = str(full)
self.audio_state[name] = {'energy': 0.0, 'is_beat': False, 'beat_count': 0, 'last_beat': False}
print(f"Audio: {name} -> {full}", file=sys.stderr)
elif cmd == 'scan':
name = form[1].name if isinstance(form[1], Symbol) else str(form[1])
# Trigger can be:
# (beat audio-name) - trigger on beat from specific audio
# beat - legacy: trigger on beat from first audio
trigger_expr = form[2]
if isinstance(trigger_expr, list) and len(trigger_expr) >= 2:
# (beat audio-name)
trigger_type = trigger_expr[0].name if isinstance(trigger_expr[0], Symbol) else str(trigger_expr[0])
trigger_audio = trigger_expr[1].name if isinstance(trigger_expr[1], Symbol) else str(trigger_expr[1])
trigger = (trigger_type, trigger_audio)
else:
# Legacy bare symbol
trigger = trigger_expr.name if isinstance(trigger_expr, Symbol) else str(trigger_expr)
init_val, step_expr = {}, None
i = 3
while i < len(form):
if isinstance(form[i], Keyword):
if form[i].name == 'init' and i + 1 < len(form):
init_val = self._eval(form[i + 1], {})
elif form[i].name == 'step' and i + 1 < len(form):
step_expr = form[i + 1]
i += 2
else:
i += 1
self.scans[name] = {
'state': dict(init_val) if isinstance(init_val, dict) else {'acc': init_val},
'init': init_val,
'step': step_expr,
'trigger': trigger,
}
trigger_str = f"{trigger[0]} {trigger[1]}" if isinstance(trigger, tuple) else trigger
print(f"Scan: {name} (on {trigger_str})", file=sys.stderr)
elif cmd == 'frame':
# (frame expr) - the pipeline to evaluate each frame
self.frame_pipeline = form[1] if len(form) > 1 else None
# Set output size from first source
if self.sources:
first = next(iter(self.sources.values()))
self.ctx.output_size = first.size
def _eval(self, expr, env: dict) -> Any:
"""Evaluate an expression."""
import cv2
# Primitives
if isinstance(expr, (int, float)):
return expr
if isinstance(expr, str):
return expr
if isinstance(expr, Symbol):
name = expr.name
# Built-in values
if name == 't' or name == '_time':
return self.ctx.t
if name == 'pi':
import math
return math.pi
if name == 'true':
return True
if name == 'false':
return False
if name == 'nil':
return None
# Environment lookup
if name in env:
return env[name]
# Scan state lookup
if name in self.scans:
return self.scans[name]['state']
return 0
if isinstance(expr, Keyword):
return expr.name
if not isinstance(expr, list) or not expr:
return expr
# Dict literal {:key val ...}
if isinstance(expr[0], Keyword):
result = {}
i = 0
while i < len(expr):
if isinstance(expr[i], Keyword):
result[expr[i].name] = self._eval(expr[i + 1], env) if i + 1 < len(expr) else None
i += 2
else:
i += 1
return result
head = expr[0]
if not isinstance(head, Symbol):
return [self._eval(e, env) for e in expr]
op = head.name
args = expr[1:]
# Check if op is a closure in environment
if op in env:
val = env[op]
if isinstance(val, dict) and val.get('_type') == 'closure':
# Invoke closure
closure = val
closure_env = dict(closure['env'])
for i, pname in enumerate(closure['params']):
closure_env[pname] = self._eval(args[i], env) if i < len(args) else None
return self._eval(closure['body'], closure_env)
# Threading macro
if op == '->':
result = self._eval(args[0], env)
for form in args[1:]:
if isinstance(form, list) and form:
# Insert result as first arg
new_form = [form[0], result] + form[1:]
result = self._eval(new_form, env)
else:
result = self._eval([form, result], env)
return result
# === Audio analysis (explicit) ===
if op == 'energy':
# (energy audio-name) - get current energy from named audio
audio_name = args[0].name if isinstance(args[0], Symbol) else str(args[0])
if audio_name in self.audio_state:
return self.audio_state[audio_name]['energy']
return 0.0
if op == 'beat':
# (beat audio-name) - 1 if beat this frame, 0 otherwise
audio_name = args[0].name if isinstance(args[0], Symbol) else str(args[0])
if audio_name in self.audio_state:
return 1.0 if self.audio_state[audio_name]['is_beat'] else 0.0
return 0.0
if op == 'beat-count':
# (beat-count audio-name) - total beats from named audio
audio_name = args[0].name if isinstance(args[0], Symbol) else str(args[0])
if audio_name in self.audio_state:
return self.audio_state[audio_name]['beat_count']
return 0
# === Frame operations ===
if op == 'read':
# (read source-name) - get current frame from source (lazy read)
name = args[0].name if isinstance(args[0], Symbol) else str(args[0])
if name not in self.frames:
if name in self.sources:
self.frames[name] = self.sources[name].read()
self._sources_read.add(name)
return self.frames.get(name)
# === Binding and mapping ===
if op == 'bind':
# (bind scan-name :field) or (bind scan-name)
scan_name = args[0].name if isinstance(args[0], Symbol) else str(args[0])
field = None
if len(args) > 1 and isinstance(args[1], Keyword):
field = args[1].name
if scan_name in self.scans:
state = self.scans[scan_name]['state']
if field:
return state.get(field, 0)
return state
return 0
if op == 'map':
# (map value [lo hi])
val = self._eval(args[0], env)
range_list = self._eval(args[1], env) if len(args) > 1 else [0, 1]
if isinstance(range_list, list) and len(range_list) >= 2:
lo, hi = range_list[0], range_list[1]
return lo + val * (hi - lo)
return val
# === Arithmetic ===
if op == '+':
return sum(self._eval(a, env) for a in args)
if op == '-':
vals = [self._eval(a, env) for a in args]
return vals[0] - sum(vals[1:]) if len(vals) > 1 else -vals[0]
if op == '*':
result = 1
for a in args:
result *= self._eval(a, env)
return result
if op == '/':
vals = [self._eval(a, env) for a in args]
return vals[0] / vals[1] if len(vals) > 1 and vals[1] != 0 else 0
if op == 'mod':
vals = [self._eval(a, env) for a in args]
return vals[0] % vals[1] if len(vals) > 1 and vals[1] != 0 else 0
if op == 'map-range':
# (map-range val from-lo from-hi to-lo to-hi)
val = self._eval(args[0], env)
from_lo = self._eval(args[1], env)
from_hi = self._eval(args[2], env)
to_lo = self._eval(args[3], env)
to_hi = self._eval(args[4], env)
# Normalize val to 0-1 in source range, then scale to target range
if from_hi == from_lo:
return to_lo
t = (val - from_lo) / (from_hi - from_lo)
return to_lo + t * (to_hi - to_lo)
# === Comparison ===
if op == '<':
return self._eval(args[0], env) < self._eval(args[1], env)
if op == '>':
return self._eval(args[0], env) > self._eval(args[1], env)
if op == '=':
return self._eval(args[0], env) == self._eval(args[1], env)
if op == '<=':
return self._eval(args[0], env) <= self._eval(args[1], env)
if op == '>=':
return self._eval(args[0], env) >= self._eval(args[1], env)
if op == 'and':
for arg in args:
if not self._eval(arg, env):
return False
return True
if op == 'or':
# Lisp-style or: returns first truthy value, or last value if none truthy
result = False
for arg in args:
result = self._eval(arg, env)
if result:
return result
return result
if op == 'not':
return not self._eval(args[0], env)
# === Logic ===
if op == 'if':
cond = self._eval(args[0], env)
if cond:
return self._eval(args[1], env)
return self._eval(args[2], env) if len(args) > 2 else None
if op == 'cond':
# (cond pred1 expr1 pred2 expr2 ... true else-expr)
i = 0
while i < len(args) - 1:
pred = self._eval(args[i], env)
if pred:
return self._eval(args[i + 1], env)
i += 2
return None
if op == 'lambda':
# (lambda (params...) body) - create a closure
params = args[0]
body = args[1]
param_names = [p.name if isinstance(p, Symbol) else str(p) for p in params]
# Return a closure dict that captures the current env
return {'_type': 'closure', 'params': param_names, 'body': body, 'env': dict(env)}
if op == 'let' or op == 'let*':
# Support both formats:
# (let [name val name val ...] body) - flat vector
# (let ((name val) (name val) ...) body) - nested list
# Note: our let already evaluates sequentially like let*
bindings = args[0]
body = args[1]
new_env = dict(env)
if bindings and isinstance(bindings[0], list):
# Nested format: ((name val) (name val) ...)
for binding in bindings:
if isinstance(binding, list) and len(binding) >= 2:
name = binding[0].name if isinstance(binding[0], Symbol) else str(binding[0])
val = self._eval(binding[1], new_env)
new_env[name] = val
else:
# Flat format: [name val name val ...]
i = 0
while i < len(bindings):
name = bindings[i].name if isinstance(bindings[i], Symbol) else str(bindings[i])
val = self._eval(bindings[i + 1], new_env)
new_env[name] = val
i += 2
return self._eval(body, new_env)
# === Random ===
if op == 'rand':
return self.rng.random()
if op == 'rand-int':
lo = int(self._eval(args[0], env))
hi = int(self._eval(args[1], env))
return self.rng.randint(lo, hi)
if op == 'rand-range':
lo = self._eval(args[0], env)
hi = self._eval(args[1], env)
return lo + self.rng.random() * (hi - lo)
# === Dict ===
if op == 'dict':
result = {}
i = 0
while i < len(args):
if isinstance(args[i], Keyword):
result[args[i].name] = self._eval(args[i + 1], env) if i + 1 < len(args) else None
i += 2
else:
i += 1
return result
if op == 'get':
d = self._eval(args[0], env)
key = args[1].name if isinstance(args[1], Keyword) else self._eval(args[1], env)
if isinstance(d, dict):
return d.get(key, 0)
return 0
# === List ===
if op == 'list':
return [self._eval(a, env) for a in args]
if op == 'nth':
lst = self._eval(args[0], env)
idx = int(self._eval(args[1], env))
if isinstance(lst, list) and 0 <= idx < len(lst):
return lst[idx]
return None
if op == 'len':
lst = self._eval(args[0], env)
return len(lst) if isinstance(lst, (list, dict, str)) else 0
# === External effects ===
if op in self.effects:
effect = self.effects[op]
effect_env = dict(env)
effect_env['t'] = self.ctx.t
# Set defaults for all params
param_names = list(effect['params'].keys())
for pname, pdef in effect['params'].items():
effect_env[pname] = pdef.get('default', 0)
# Parse args: first is frame, then positional params, then kwargs
positional_idx = 0
i = 0
while i < len(args):
if isinstance(args[i], Keyword):
# Keyword arg
pname = args[i].name
if pname in effect['params'] and i + 1 < len(args):
effect_env[pname] = self._eval(args[i + 1], env)
i += 2
else:
# Positional arg
val = self._eval(args[i], env)
if positional_idx == 0:
effect_env['frame'] = val
elif positional_idx - 1 < len(param_names):
effect_env[param_names[positional_idx - 1]] = val
positional_idx += 1
i += 1
return self._eval(effect['body'], effect_env)
# === External primitives ===
if op in self.primitives:
prim_func = self.primitives[op]
# Evaluate all args
evaluated_args = []
kwargs = {}
i = 0
while i < len(args):
if isinstance(args[i], Keyword):
k = args[i].name
v = self._eval(args[i + 1], env) if i + 1 < len(args) else None
kwargs[k] = v
i += 2
else:
evaluated_args.append(self._eval(args[i], env))
i += 1
# Call primitive
try:
if kwargs:
return prim_func(*evaluated_args, **kwargs)
return prim_func(*evaluated_args)
except Exception as e:
print(f"Primitive {op} error: {e}", file=sys.stderr)
return None
# === Macros ===
if op in self.macros:
macro = self.macros[op]
# Bind macro params to args (unevaluated)
macro_env = dict(env)
for i, pname in enumerate(macro['params']):
macro_env[pname] = args[i] if i < len(args) else None
# Expand and evaluate
return self._eval(macro['body'], macro_env)
# === Primitive-style call (name-with-dashes -> prim_name_with_underscores) ===
prim_name = op.replace('-', '_')
if prim_name in self.primitives:
prim_func = self.primitives[prim_name]
evaluated_args = []
kwargs = {}
i = 0
while i < len(args):
if isinstance(args[i], Keyword):
k = args[i].name.replace('-', '_')
v = self._eval(args[i + 1], env) if i + 1 < len(args) else None
kwargs[k] = v
i += 2
else:
evaluated_args.append(self._eval(args[i], env))
i += 1
try:
if kwargs:
return prim_func(*evaluated_args, **kwargs)
return prim_func(*evaluated_args)
except Exception as e:
print(f"Primitive {op} error: {e}", file=sys.stderr)
return None
# Unknown - return as-is
return expr
def _step_scans(self):
"""Step scans on beat from specific audio."""
for name, scan in self.scans.items():
trigger = scan['trigger']
# Check if this scan should step
should_step = False
audio_name = None
if isinstance(trigger, tuple) and trigger[0] == 'beat':
# Explicit: (beat audio-name)
audio_name = trigger[1]
if audio_name in self.audio_state:
should_step = self.audio_state[audio_name]['is_beat']
elif trigger == 'beat':
# Legacy: use first audio
if self.audio_state:
audio_name = next(iter(self.audio_state))
should_step = self.audio_state[audio_name]['is_beat']
if should_step and audio_name:
state = self.audio_state[audio_name]
env = dict(scan['state'])
env['beat_count'] = state['beat_count']
env['t'] = self.ctx.t
env['energy'] = state['energy']
if scan['step']:
new_state = self._eval(scan['step'], env)
if isinstance(new_state, dict):
scan['state'] = new_state
elif new_state is not None:
scan['state'] = {'acc': new_state}
self.cache.record_scan_state(name, self.ctx.t, scan['state'])
def run(self, duration: float = None, output: str = "pipe"):
"""Run the streaming pipeline."""
from .output import PipeOutput, DisplayOutput, FileOutput
self._init()
if not self.sources:
print("Error: no sources", file=sys.stderr)
return
if not self.frame_pipeline:
print("Error: no (frame ...) pipeline defined", file=sys.stderr)
return
w, h = self.ctx.output_size
# Duration from first audio or default
if duration is None:
if self.audios:
first_audio = next(iter(self.audios.values()))
duration = first_audio.duration
else:
duration = 60.0
n_frames = int(duration * self.ctx.fps)
frame_time = 1.0 / self.ctx.fps
print(f"Streaming {n_frames} frames @ {self.ctx.fps}fps", file=sys.stderr)
# Use first audio for playback sync
first_audio_path = next(iter(self.audio_paths.values())) if self.audio_paths else None
# Output
if output == "pipe":
out = PipeOutput(size=(w, h), fps=self.ctx.fps,
audio_source=first_audio_path)
elif output == "preview":
out = DisplayOutput(size=(w, h), fps=self.ctx.fps,
audio_source=first_audio_path)
else:
out = FileOutput(output, size=(w, h), fps=self.ctx.fps,
audio_source=first_audio_path)
try:
for frame_num in range(n_frames):
if not out.is_open:
print(f"\nOutput closed at {frame_num}", file=sys.stderr)
break
self.ctx.t = frame_num * frame_time
self.ctx.frame_num = frame_num
# Update all audio states
for audio_name, analyzer in self.audios.items():
state = self.audio_state[audio_name]
energy = analyzer.get_energy(self.ctx.t)
is_beat_raw = analyzer.get_beat(self.ctx.t)
is_beat = is_beat_raw and not state['last_beat']
state['last_beat'] = is_beat_raw
state['energy'] = energy
state['is_beat'] = is_beat
if is_beat:
state['beat_count'] += 1
self.cache.record_analysis(f'{audio_name}_energy', self.ctx.t, energy)
self.cache.record_analysis(f'{audio_name}_beat', self.ctx.t, 1.0 if is_beat else 0.0)
# Step scans
self._step_scans()
# Clear frames - will be read lazily
self.frames.clear()
self._sources_read = set()
# Evaluate pipeline (reads happen on-demand)
result = self._eval(self.frame_pipeline, {})
# Skip unread sources to keep pipes in sync
for name, src in self.sources.items():
if name not in self._sources_read:
src.skip()
# Ensure output size
if result is not None:
import cv2
if result.shape[:2] != (h, w):
result = cv2.resize(result, (w, h))
out.write(result, self.ctx.t)
# Progress
if frame_num % 30 == 0:
pct = 100 * frame_num / n_frames
# Show beats from first audio
total_beats = 0
if self.audio_state:
first_state = next(iter(self.audio_state.values()))
total_beats = first_state['beat_count']
print(f"\r{pct:5.1f}% | beats:{total_beats}",
end="", file=sys.stderr)
sys.stderr.flush()
if frame_num % 300 == 0:
self.cache.flush()
except KeyboardInterrupt:
print("\nInterrupted", file=sys.stderr)
except Exception as e:
print(f"\nError: {e}", file=sys.stderr)
import traceback
traceback.print_exc()
finally:
out.close()
for src in self.sources.values():
src.close()
self.cache.flush()
print("\nDone", file=sys.stderr)
def run_stream(sexp_path: str, duration: float = None, output: str = "pipe", fps: float = None):
"""Run a streaming sexp."""
interp = StreamInterpreter(sexp_path)
if fps:
interp.ctx.fps = fps
interp.run(duration=duration, output=output)
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser(description="Run streaming sexp")
parser.add_argument("sexp", help="Path to .sexp file")
parser.add_argument("-d", "--duration", type=float, default=None)
parser.add_argument("-o", "--output", default="pipe")
parser.add_argument("--fps", type=float, default=None, help="Override fps (default: from sexp)")
args = parser.parse_args()
run_stream(args.sexp, duration=args.duration, output=args.output, fps=args.fps)