""" Streaming S-expression executor. Executes compiled sexp recipes in real-time by: - Evaluating scan expressions on each beat - Resolving bindings to get effect parameter values - Applying effects frame-by-frame - Evaluating SLICE_ON Lambda for cycle crossfade """ import random import numpy as np from typing import Dict, List, Any, Optional from dataclasses import dataclass, field from .sexp_interp import SexpInterpreter, eval_slice_on_lambda @dataclass class ScanState: """Runtime state for a scan.""" node_id: str name: Optional[str] value: Any rng: random.Random init_expr: dict step_expr: dict emit_expr: dict class ExprEvaluator: """ Evaluates compiled expression ASTs. Expressions are dicts with: - _expr: True (marks as expression) - op: operation name - args: list of arguments - name: for 'var' ops - keys: for 'dict' ops """ def __init__(self, rng: random.Random = None): self.rng = rng or random.Random() def eval(self, expr: Any, env: Dict[str, Any]) -> Any: """Evaluate an expression in the given environment.""" # Literal values if not isinstance(expr, dict): return expr # Check if it's an expression if not expr.get('_expr'): # It's a plain dict - return as-is return expr op = expr.get('op') args = expr.get('args', []) # Evaluate based on operation if op == 'var': name = expr.get('name') if name in env: return env[name] raise KeyError(f"Unknown variable: {name}") elif op == 'dict': keys = expr.get('keys', []) values = [self.eval(a, env) for a in args] return dict(zip(keys, values)) elif op == 'get': obj = self.eval(args[0], env) key = args[1] return obj.get(key) if isinstance(obj, dict) else obj[key] elif op == 'if': cond = self.eval(args[0], env) if cond: return self.eval(args[1], env) elif len(args) > 2: return self.eval(args[2], env) return None # Comparison ops elif op == '<': return self.eval(args[0], env) < self.eval(args[1], env) elif op == '>': return self.eval(args[0], env) > self.eval(args[1], env) elif op == '<=': return self.eval(args[0], env) <= self.eval(args[1], env) elif op == '>=': return self.eval(args[0], env) >= self.eval(args[1], env) elif op == '=': return self.eval(args[0], env) == self.eval(args[1], env) elif op == '!=': return self.eval(args[0], env) != self.eval(args[1], env) # Arithmetic ops elif op == '+': return self.eval(args[0], env) + self.eval(args[1], env) elif op == '-': return self.eval(args[0], env) - self.eval(args[1], env) elif op == '*': return self.eval(args[0], env) * self.eval(args[1], env) elif op == '/': return self.eval(args[0], env) / self.eval(args[1], env) elif op == 'mod': return self.eval(args[0], env) % self.eval(args[1], env) # Random ops elif op == 'rand': return self.rng.random() elif op == 'rand-int': lo = self.eval(args[0], env) hi = self.eval(args[1], env) return self.rng.randint(lo, hi) elif op == 'rand-range': lo = self.eval(args[0], env) hi = self.eval(args[1], env) return self.rng.uniform(lo, hi) # Logic ops elif op == 'and': return all(self.eval(a, env) for a in args) elif op == 'or': return any(self.eval(a, env) for a in args) elif op == 'not': return not self.eval(args[0], env) else: raise ValueError(f"Unknown operation: {op}") class SexpStreamingExecutor: """ Executes a compiled sexp recipe in streaming mode. Reads scan definitions, effect chains, and bindings from the compiled recipe and executes them frame-by-frame. """ def __init__(self, compiled_recipe, seed: int = 42): self.recipe = compiled_recipe self.master_seed = seed # Build node lookup self.nodes = {n['id']: n for n in compiled_recipe.nodes} # State (must be initialized before _init_scans) self.beat_count = 0 self.current_time = 0.0 self.last_beat_time = 0.0 self.last_beat_detected = False self.energy = 0.0 # Initialize scans self.scans: Dict[str, ScanState] = {} self.scan_outputs: Dict[str, Any] = {} # Current emit values by node_id self._init_scans() # Initialize SLICE_ON interpreter self.sexp_interp = SexpInterpreter(random.Random(seed)) self._slice_on_lambda = None self._slice_on_acc = None self._slice_on_result = None # Last evaluation result {layers, compose, acc} self._init_slice_on() def _init_slice_on(self): """Initialize SLICE_ON Lambda for cycle crossfade.""" for node in self.recipe.nodes: if node.get('type') == 'SLICE_ON': config = node.get('config', {}) self._slice_on_lambda = config.get('fn') init = config.get('init', {}) self._slice_on_acc = { 'cycle': init.get('cycle', 0), 'beat': init.get('beat', 0), 'clen': init.get('clen', 60), } # Evaluate initial state self._eval_slice_on() break def _eval_slice_on(self): """Evaluate the SLICE_ON Lambda with current state.""" if not self._slice_on_lambda: return n = len(self._get_video_sources()) videos = list(range(n)) # Placeholder video indices try: result = eval_slice_on_lambda( self._slice_on_lambda, self._slice_on_acc, self.beat_count, 0.0, # start time (not used for weights) 1.0, # end time (not used for weights) videos, self.sexp_interp, ) self._slice_on_result = result # Update accumulator for next beat if 'acc' in result: self._slice_on_acc = result['acc'] except Exception as e: import sys print(f"SLICE_ON eval error: {e}", file=sys.stderr) def _init_scans(self): """Initialize all scan nodes from the recipe.""" seed_offset = 0 for node in self.recipe.nodes: if node.get('type') == 'SCAN': node_id = node['id'] config = node.get('config', {}) # Create RNG with unique seed scan_seed = config.get('seed', self.master_seed + seed_offset) rng = random.Random(scan_seed) seed_offset += 1 # Evaluate initial value init_expr = config.get('init', 0) evaluator = ExprEvaluator(rng) init_value = evaluator.eval(init_expr, {}) self.scans[node_id] = ScanState( node_id=node_id, name=node.get('name'), value=init_value, rng=rng, init_expr=init_expr, step_expr=config.get('step_expr', {}), emit_expr=config.get('emit_expr', {}), ) # Compute initial emit self._update_emit(node_id) def _update_emit(self, node_id: str): """Update the emit value for a scan.""" scan = self.scans[node_id] evaluator = ExprEvaluator(scan.rng) # Build environment from current state env = self._build_scan_env(scan) # Evaluate emit expression emit_value = evaluator.eval(scan.emit_expr, env) self.scan_outputs[node_id] = emit_value def _build_scan_env(self, scan: ScanState) -> Dict[str, Any]: """Build environment for scan expression evaluation.""" env = {} # Add state variables if isinstance(scan.value, dict): env.update(scan.value) else: env['acc'] = scan.value # Add beat count env['beat_count'] = self.beat_count env['time'] = self.current_time return env def on_beat(self): """Update all scans on a beat.""" self.beat_count += 1 # Estimate beat interval beat_interval = self.current_time - self.last_beat_time if self.last_beat_time > 0 else 0.5 self.last_beat_time = self.current_time # Step each scan for node_id, scan in self.scans.items(): evaluator = ExprEvaluator(scan.rng) env = self._build_scan_env(scan) # Evaluate step expression new_value = evaluator.eval(scan.step_expr, env) scan.value = new_value # Update emit self._update_emit(node_id) # Step the cycle state self._step_cycle() def on_frame(self, energy: float, is_beat: bool, t: float = 0.0): """Called each frame with audio analysis.""" self.current_time = t self.energy = energy # Update scans on beat (edge detection) if is_beat and not self.last_beat_detected: self.on_beat() self.last_beat_detected = is_beat def resolve_binding(self, binding: dict) -> Any: """Resolve a binding to get the current value.""" if not isinstance(binding, dict) or not binding.get('_binding'): return binding source_id = binding.get('source') feature = binding.get('feature', 'values') range_map = binding.get('range') # Get the raw value if source_id in self.scan_outputs: value = self.scan_outputs[source_id] else: # Might be an analyzer reference - use energy as fallback value = self.energy # Extract feature if value is a dict if isinstance(value, dict) and feature in value: value = value[feature] # Apply range mapping if range_map and isinstance(value, (int, float)): lo, hi = range_map value = lo + value * (hi - lo) return value def get_effect_params(self, effect_node: dict) -> Dict[str, Any]: """Get resolved parameters for an effect node.""" config = effect_node.get('config', {}) params = {} for key, value in config.items(): # Skip internal fields if key in ('effect', 'effect_path', 'effect_cid', 'effects_registry', 'analysis_refs'): continue # Resolve bindings params[key] = self.resolve_binding(value) return params def get_scan_value(self, name: str) -> Any: """Get scan output by name.""" for node_id, scan in self.scans.items(): if scan.name == name: return self.scan_outputs.get(node_id) return None def get_all_scan_values(self) -> Dict[str, Any]: """Get all named scan outputs.""" result = {} for node_id, scan in self.scans.items(): if scan.name: result[scan.name] = self.scan_outputs.get(node_id) return result # === Compositor interface methods === def _get_video_sources(self) -> List[str]: """Get list of video source node IDs.""" sources = [] for node in self.recipe.nodes: if node.get('type') == 'SOURCE': sources.append(node['id']) # Filter to video only (exclude audio - last one is usually audio) # Look at file extensions in the paths return sources[:-1] if len(sources) > 1 else sources def _trace_effect_chain(self, start_id: str, stop_at_blend: bool = True) -> List[dict]: """Trace effect chain from a node, returning effects in order.""" chain = [] current_id = start_id for _ in range(20): # Max depth # Find node that uses current as input next_node = None for node in self.recipe.nodes: if current_id in node.get('inputs', []): if node.get('type') == 'EFFECT': effect_type = node.get('config', {}).get('effect') chain.append(node) if stop_at_blend and effect_type == 'blend': return chain next_node = node break elif node.get('type') == 'SEGMENT': next_node = node break if next_node is None: break current_id = next_node['id'] return chain def _find_clip_chains(self, source_idx: int) -> tuple: """Find effect chains for clip A and B from a source.""" sources = self._get_video_sources() if source_idx >= len(sources): return [], [] source_id = sources[source_idx] # Find SEGMENT node segment_id = None for node in self.recipe.nodes: if node.get('type') == 'SEGMENT' and source_id in node.get('inputs', []): segment_id = node['id'] break if not segment_id: return [], [] # Find the two effect chains from segment (clip A and clip B) chains = [] for node in self.recipe.nodes: if segment_id in node.get('inputs', []) and node.get('type') == 'EFFECT': chain = self._trace_effect_chain(segment_id) # Get chain starting from this specific branch branch_chain = [node] current = node['id'] for _ in range(10): found = False for n in self.recipe.nodes: if current in n.get('inputs', []) and n.get('type') == 'EFFECT': branch_chain.append(n) if n.get('config', {}).get('effect') == 'blend': break current = n['id'] found = True break if not found: break chains.append(branch_chain) # Return first two chains as A and B chain_a = chains[0] if len(chains) > 0 else [] chain_b = chains[1] if len(chains) > 1 else [] return chain_a, chain_b def get_effect_params(self, source_idx: int, clip: str, energy: float) -> Dict: """Get effect parameters for a source clip (compositor interface).""" # Get the correct chain for this clip chain_a, chain_b = self._find_clip_chains(source_idx) chain = chain_a if clip == 'a' else chain_b # Default params params = { "rotate_angle": 0, "zoom_amount": 1.0, "invert_amount": 0, "hue_degrees": 0, "ascii_mix": 0, "ascii_char_size": 8, } # Resolve from effects in chain for eff in chain: config = eff.get('config', {}) effect_type = config.get('effect') if effect_type == 'rotate': angle_binding = config.get('angle') if angle_binding: if isinstance(angle_binding, dict) and angle_binding.get('_binding'): # Bound to analyzer - use energy with range range_map = angle_binding.get('range') if range_map: lo, hi = range_map params["rotate_angle"] = lo + energy * (hi - lo) else: params["rotate_angle"] = self.resolve_binding(angle_binding) else: params["rotate_angle"] = angle_binding if isinstance(angle_binding, (int, float)) else 0 elif effect_type == 'zoom': amount_binding = config.get('amount') if amount_binding: if isinstance(amount_binding, dict) and amount_binding.get('_binding'): range_map = amount_binding.get('range') if range_map: lo, hi = range_map params["zoom_amount"] = lo + energy * (hi - lo) else: params["zoom_amount"] = self.resolve_binding(amount_binding) else: params["zoom_amount"] = amount_binding if isinstance(amount_binding, (int, float)) else 1.0 elif effect_type == 'invert': amount_binding = config.get('amount') if amount_binding: val = self.resolve_binding(amount_binding) params["invert_amount"] = val if isinstance(val, (int, float)) else 0 elif effect_type == 'hue_shift': deg_binding = config.get('degrees') if deg_binding: val = self.resolve_binding(deg_binding) params["hue_degrees"] = val if isinstance(val, (int, float)) else 0 elif effect_type == 'ascii_art': mix_binding = config.get('mix') if mix_binding: val = self.resolve_binding(mix_binding) params["ascii_mix"] = val if isinstance(val, (int, float)) else 0 size_binding = config.get('char_size') if size_binding: if isinstance(size_binding, dict) and size_binding.get('_binding'): range_map = size_binding.get('range') if range_map: lo, hi = range_map params["ascii_char_size"] = lo + energy * (hi - lo) return params def get_pair_params(self, source_idx: int) -> Dict: """Get blend and rotation params for a video pair (compositor interface).""" params = { "blend_opacity": 0.5, "pair_rotation": 0, } # Find the blend node for this source chain_a, _ = self._find_clip_chains(source_idx) # The last effect in chain_a should be the blend blend_node = None for eff in reversed(chain_a): if eff.get('config', {}).get('effect') == 'blend': blend_node = eff break if blend_node: config = blend_node.get('config', {}) opacity_binding = config.get('opacity') if opacity_binding: val = self.resolve_binding(opacity_binding) if isinstance(val, (int, float)): params["blend_opacity"] = val # Find rotate after blend (pair rotation) blend_id = blend_node['id'] for node in self.recipe.nodes: if blend_id in node.get('inputs', []) and node.get('type') == 'EFFECT': if node.get('config', {}).get('effect') == 'rotate': angle_binding = node.get('config', {}).get('angle') if angle_binding: val = self.resolve_binding(angle_binding) if isinstance(val, (int, float)): params["pair_rotation"] = val break return params def _get_cycle_state(self) -> dict: """Get current cycle state from SLICE_ON or internal tracking.""" if not hasattr(self, '_cycle_state'): # Initialize from SLICE_ON node for node in self.recipe.nodes: if node.get('type') == 'SLICE_ON': init = node.get('config', {}).get('init', {}) self._cycle_state = { 'cycle': init.get('cycle', 0), 'beat': init.get('beat', 0), 'clen': init.get('clen', 60), } break else: self._cycle_state = {'cycle': 0, 'beat': 0, 'clen': 60} return self._cycle_state def _step_cycle(self): """Step the cycle state forward on beat by evaluating SLICE_ON Lambda.""" # Use interpreter to evaluate the Lambda self._eval_slice_on() def get_cycle_weights(self) -> List[float]: """Get blend weights for cycle-crossfade from SLICE_ON result.""" n = len(self._get_video_sources()) if n == 0: return [1.0] # Get weights from interpreted result if self._slice_on_result: compose = self._slice_on_result.get('compose', {}) weights = compose.get('weights', []) if weights and len(weights) == n: # Normalize total = sum(weights) if total > 0: return [w / total for w in weights] # Fallback: equal weights return [1.0 / n] * n def get_cycle_zooms(self) -> List[float]: """Get zoom amounts for cycle-crossfade from SLICE_ON result.""" n = len(self._get_video_sources()) if n == 0: return [1.0] # Get zooms from interpreted result (layers -> effects -> zoom amount) if self._slice_on_result: layers = self._slice_on_result.get('layers', []) if layers and len(layers) == n: zooms = [] for layer in layers: effects = layer.get('effects', []) zoom_amt = 1.0 for eff in effects: if eff.get('effect') == 'zoom' or (hasattr(eff.get('effect'), 'name') and eff.get('effect').name == 'zoom'): zoom_amt = eff.get('amount', 1.0) break zooms.append(zoom_amt) return zooms # Fallback return [1.0] * n def _get_final_rotate_scan_id(self) -> str: """Find the scan ID that drives the final rotation (after SLICE_ON).""" if hasattr(self, '_final_rotate_scan_id'): return self._final_rotate_scan_id # Find SLICE_ON node index slice_on_idx = None for i, node in enumerate(self.recipe.nodes): if node.get('type') == 'SLICE_ON': slice_on_idx = i break # Find rotate effect after SLICE_ON if slice_on_idx is not None: for node in self.recipe.nodes[slice_on_idx + 1:]: if node.get('type') == 'EFFECT': config = node.get('config', {}) if config.get('effect') == 'rotate': angle_binding = config.get('angle', {}) if isinstance(angle_binding, dict) and angle_binding.get('_binding'): self._final_rotate_scan_id = angle_binding.get('source') return self._final_rotate_scan_id self._final_rotate_scan_id = None return None def get_final_effects(self, energy: float) -> Dict: """Get final composition effects (compositor interface).""" # Get named scans scan_values = self.get_all_scan_values() # Whole spin - get from the specific scan bound to final rotate effect whole_spin = 0 final_rotate_scan_id = self._get_final_rotate_scan_id() if final_rotate_scan_id and final_rotate_scan_id in self.scan_outputs: val = self.scan_outputs[final_rotate_scan_id] if isinstance(val, dict) and 'angle' in val: whole_spin = val['angle'] elif isinstance(val, (int, float)): whole_spin = val # Ripple ripple_gate = scan_values.get('ripple-gate', 0) ripple_cx = scan_values.get('ripple-cx', 0.5) ripple_cy = scan_values.get('ripple-cy', 0.5) if isinstance(ripple_gate, dict): ripple_gate = ripple_gate.get('gate', 0) if 'gate' in ripple_gate else 1 return { "whole_spin_angle": whole_spin, "ripple_amplitude": ripple_gate * (5 + energy * 45), "ripple_cx": ripple_cx if isinstance(ripple_cx, (int, float)) else 0.5, "ripple_cy": ripple_cy if isinstance(ripple_cy, (int, float)) else 0.5, }