- New streaming/ module for real-time video processing: - compositor.py: Main streaming compositor with cycle-crossfade - sexp_executor.py: Executes compiled sexp recipes in real-time - sexp_interp.py: Full S-expression interpreter for SLICE_ON Lambda - recipe_adapter.py: Bridges recipes to streaming compositor - sources.py: Video source with ffmpeg streaming - audio.py: Real-time audio analysis (energy, beats) - output.py: Preview (mpv) and file output with audio muxing - New templates/: - cycle-crossfade.sexp: Smooth zoom-based video cycling - process-pair.sexp: Dual-clip processing with effects - Key features: - Videos cycle in input-videos order (not definition order) - Cumulative whole-spin rotation - Zero-weight sources skip processing - Live audio-reactive effects - New effects: blend_multi for weighted layer compositing - Updated primitives and interpreter for streaming compatibility Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
679 lines
24 KiB
Python
679 lines
24 KiB
Python
"""
|
|
Streaming S-expression executor.
|
|
|
|
Executes compiled sexp recipes in real-time by:
|
|
- Evaluating scan expressions on each beat
|
|
- Resolving bindings to get effect parameter values
|
|
- Applying effects frame-by-frame
|
|
- Evaluating SLICE_ON Lambda for cycle crossfade
|
|
"""
|
|
|
|
import random
|
|
import numpy as np
|
|
from typing import Dict, List, Any, Optional
|
|
from dataclasses import dataclass, field
|
|
|
|
from .sexp_interp import SexpInterpreter, eval_slice_on_lambda
|
|
|
|
|
|
@dataclass
|
|
class ScanState:
|
|
"""Runtime state for a scan."""
|
|
node_id: str
|
|
name: Optional[str]
|
|
value: Any
|
|
rng: random.Random
|
|
init_expr: dict
|
|
step_expr: dict
|
|
emit_expr: dict
|
|
|
|
|
|
class ExprEvaluator:
|
|
"""
|
|
Evaluates compiled expression ASTs.
|
|
|
|
Expressions are dicts with:
|
|
- _expr: True (marks as expression)
|
|
- op: operation name
|
|
- args: list of arguments
|
|
- name: for 'var' ops
|
|
- keys: for 'dict' ops
|
|
"""
|
|
|
|
def __init__(self, rng: random.Random = None):
|
|
self.rng = rng or random.Random()
|
|
|
|
def eval(self, expr: Any, env: Dict[str, Any]) -> Any:
|
|
"""Evaluate an expression in the given environment."""
|
|
# Literal values
|
|
if not isinstance(expr, dict):
|
|
return expr
|
|
|
|
# Check if it's an expression
|
|
if not expr.get('_expr'):
|
|
# It's a plain dict - return as-is
|
|
return expr
|
|
|
|
op = expr.get('op')
|
|
args = expr.get('args', [])
|
|
|
|
# Evaluate based on operation
|
|
if op == 'var':
|
|
name = expr.get('name')
|
|
if name in env:
|
|
return env[name]
|
|
raise KeyError(f"Unknown variable: {name}")
|
|
|
|
elif op == 'dict':
|
|
keys = expr.get('keys', [])
|
|
values = [self.eval(a, env) for a in args]
|
|
return dict(zip(keys, values))
|
|
|
|
elif op == 'get':
|
|
obj = self.eval(args[0], env)
|
|
key = args[1]
|
|
return obj.get(key) if isinstance(obj, dict) else obj[key]
|
|
|
|
elif op == 'if':
|
|
cond = self.eval(args[0], env)
|
|
if cond:
|
|
return self.eval(args[1], env)
|
|
elif len(args) > 2:
|
|
return self.eval(args[2], env)
|
|
return None
|
|
|
|
# Comparison ops
|
|
elif op == '<':
|
|
return self.eval(args[0], env) < self.eval(args[1], env)
|
|
elif op == '>':
|
|
return self.eval(args[0], env) > self.eval(args[1], env)
|
|
elif op == '<=':
|
|
return self.eval(args[0], env) <= self.eval(args[1], env)
|
|
elif op == '>=':
|
|
return self.eval(args[0], env) >= self.eval(args[1], env)
|
|
elif op == '=':
|
|
return self.eval(args[0], env) == self.eval(args[1], env)
|
|
elif op == '!=':
|
|
return self.eval(args[0], env) != self.eval(args[1], env)
|
|
|
|
# Arithmetic ops
|
|
elif op == '+':
|
|
return self.eval(args[0], env) + self.eval(args[1], env)
|
|
elif op == '-':
|
|
return self.eval(args[0], env) - self.eval(args[1], env)
|
|
elif op == '*':
|
|
return self.eval(args[0], env) * self.eval(args[1], env)
|
|
elif op == '/':
|
|
return self.eval(args[0], env) / self.eval(args[1], env)
|
|
elif op == 'mod':
|
|
return self.eval(args[0], env) % self.eval(args[1], env)
|
|
|
|
# Random ops
|
|
elif op == 'rand':
|
|
return self.rng.random()
|
|
elif op == 'rand-int':
|
|
lo = self.eval(args[0], env)
|
|
hi = self.eval(args[1], env)
|
|
return self.rng.randint(lo, hi)
|
|
elif op == 'rand-range':
|
|
lo = self.eval(args[0], env)
|
|
hi = self.eval(args[1], env)
|
|
return self.rng.uniform(lo, hi)
|
|
|
|
# Logic ops
|
|
elif op == 'and':
|
|
return all(self.eval(a, env) for a in args)
|
|
elif op == 'or':
|
|
return any(self.eval(a, env) for a in args)
|
|
elif op == 'not':
|
|
return not self.eval(args[0], env)
|
|
|
|
else:
|
|
raise ValueError(f"Unknown operation: {op}")
|
|
|
|
|
|
class SexpStreamingExecutor:
|
|
"""
|
|
Executes a compiled sexp recipe in streaming mode.
|
|
|
|
Reads scan definitions, effect chains, and bindings from the
|
|
compiled recipe and executes them frame-by-frame.
|
|
"""
|
|
|
|
def __init__(self, compiled_recipe, seed: int = 42):
|
|
self.recipe = compiled_recipe
|
|
self.master_seed = seed
|
|
|
|
# Build node lookup
|
|
self.nodes = {n['id']: n for n in compiled_recipe.nodes}
|
|
|
|
# State (must be initialized before _init_scans)
|
|
self.beat_count = 0
|
|
self.current_time = 0.0
|
|
self.last_beat_time = 0.0
|
|
self.last_beat_detected = False
|
|
self.energy = 0.0
|
|
|
|
# Initialize scans
|
|
self.scans: Dict[str, ScanState] = {}
|
|
self.scan_outputs: Dict[str, Any] = {} # Current emit values by node_id
|
|
self._init_scans()
|
|
|
|
# Initialize SLICE_ON interpreter
|
|
self.sexp_interp = SexpInterpreter(random.Random(seed))
|
|
self._slice_on_lambda = None
|
|
self._slice_on_acc = None
|
|
self._slice_on_result = None # Last evaluation result {layers, compose, acc}
|
|
self._init_slice_on()
|
|
|
|
def _init_slice_on(self):
|
|
"""Initialize SLICE_ON Lambda for cycle crossfade."""
|
|
for node in self.recipe.nodes:
|
|
if node.get('type') == 'SLICE_ON':
|
|
config = node.get('config', {})
|
|
self._slice_on_lambda = config.get('fn')
|
|
init = config.get('init', {})
|
|
self._slice_on_acc = {
|
|
'cycle': init.get('cycle', 0),
|
|
'beat': init.get('beat', 0),
|
|
'clen': init.get('clen', 60),
|
|
}
|
|
# Evaluate initial state
|
|
self._eval_slice_on()
|
|
break
|
|
|
|
def _eval_slice_on(self):
|
|
"""Evaluate the SLICE_ON Lambda with current state."""
|
|
if not self._slice_on_lambda:
|
|
return
|
|
|
|
n = len(self._get_video_sources())
|
|
videos = list(range(n)) # Placeholder video indices
|
|
|
|
try:
|
|
result = eval_slice_on_lambda(
|
|
self._slice_on_lambda,
|
|
self._slice_on_acc,
|
|
self.beat_count,
|
|
0.0, # start time (not used for weights)
|
|
1.0, # end time (not used for weights)
|
|
videos,
|
|
self.sexp_interp,
|
|
)
|
|
self._slice_on_result = result
|
|
# Update accumulator for next beat
|
|
if 'acc' in result:
|
|
self._slice_on_acc = result['acc']
|
|
except Exception as e:
|
|
import sys
|
|
print(f"SLICE_ON eval error: {e}", file=sys.stderr)
|
|
|
|
def _init_scans(self):
|
|
"""Initialize all scan nodes from the recipe."""
|
|
seed_offset = 0
|
|
for node in self.recipe.nodes:
|
|
if node.get('type') == 'SCAN':
|
|
node_id = node['id']
|
|
config = node.get('config', {})
|
|
|
|
# Create RNG with unique seed
|
|
scan_seed = config.get('seed', self.master_seed + seed_offset)
|
|
rng = random.Random(scan_seed)
|
|
seed_offset += 1
|
|
|
|
# Evaluate initial value
|
|
init_expr = config.get('init', 0)
|
|
evaluator = ExprEvaluator(rng)
|
|
init_value = evaluator.eval(init_expr, {})
|
|
|
|
self.scans[node_id] = ScanState(
|
|
node_id=node_id,
|
|
name=node.get('name'),
|
|
value=init_value,
|
|
rng=rng,
|
|
init_expr=init_expr,
|
|
step_expr=config.get('step_expr', {}),
|
|
emit_expr=config.get('emit_expr', {}),
|
|
)
|
|
|
|
# Compute initial emit
|
|
self._update_emit(node_id)
|
|
|
|
def _update_emit(self, node_id: str):
|
|
"""Update the emit value for a scan."""
|
|
scan = self.scans[node_id]
|
|
evaluator = ExprEvaluator(scan.rng)
|
|
|
|
# Build environment from current state
|
|
env = self._build_scan_env(scan)
|
|
|
|
# Evaluate emit expression
|
|
emit_value = evaluator.eval(scan.emit_expr, env)
|
|
self.scan_outputs[node_id] = emit_value
|
|
|
|
def _build_scan_env(self, scan: ScanState) -> Dict[str, Any]:
|
|
"""Build environment for scan expression evaluation."""
|
|
env = {}
|
|
|
|
# Add state variables
|
|
if isinstance(scan.value, dict):
|
|
env.update(scan.value)
|
|
else:
|
|
env['acc'] = scan.value
|
|
|
|
# Add beat count
|
|
env['beat_count'] = self.beat_count
|
|
env['time'] = self.current_time
|
|
|
|
return env
|
|
|
|
def on_beat(self):
|
|
"""Update all scans on a beat."""
|
|
self.beat_count += 1
|
|
|
|
# Estimate beat interval
|
|
beat_interval = self.current_time - self.last_beat_time if self.last_beat_time > 0 else 0.5
|
|
self.last_beat_time = self.current_time
|
|
|
|
# Step each scan
|
|
for node_id, scan in self.scans.items():
|
|
evaluator = ExprEvaluator(scan.rng)
|
|
env = self._build_scan_env(scan)
|
|
|
|
# Evaluate step expression
|
|
new_value = evaluator.eval(scan.step_expr, env)
|
|
scan.value = new_value
|
|
|
|
# Update emit
|
|
self._update_emit(node_id)
|
|
|
|
# Step the cycle state
|
|
self._step_cycle()
|
|
|
|
def on_frame(self, energy: float, is_beat: bool, t: float = 0.0):
|
|
"""Called each frame with audio analysis."""
|
|
self.current_time = t
|
|
self.energy = energy
|
|
|
|
# Update scans on beat (edge detection)
|
|
if is_beat and not self.last_beat_detected:
|
|
self.on_beat()
|
|
self.last_beat_detected = is_beat
|
|
|
|
def resolve_binding(self, binding: dict) -> Any:
|
|
"""Resolve a binding to get the current value."""
|
|
if not isinstance(binding, dict) or not binding.get('_binding'):
|
|
return binding
|
|
|
|
source_id = binding.get('source')
|
|
feature = binding.get('feature', 'values')
|
|
range_map = binding.get('range')
|
|
|
|
# Get the raw value
|
|
if source_id in self.scan_outputs:
|
|
value = self.scan_outputs[source_id]
|
|
else:
|
|
# Might be an analyzer reference - use energy as fallback
|
|
value = self.energy
|
|
|
|
# Extract feature if value is a dict
|
|
if isinstance(value, dict) and feature in value:
|
|
value = value[feature]
|
|
|
|
# Apply range mapping
|
|
if range_map and isinstance(value, (int, float)):
|
|
lo, hi = range_map
|
|
value = lo + value * (hi - lo)
|
|
|
|
return value
|
|
|
|
def get_effect_params(self, effect_node: dict) -> Dict[str, Any]:
|
|
"""Get resolved parameters for an effect node."""
|
|
config = effect_node.get('config', {})
|
|
params = {}
|
|
|
|
for key, value in config.items():
|
|
# Skip internal fields
|
|
if key in ('effect', 'effect_path', 'effect_cid', 'effects_registry', 'analysis_refs'):
|
|
continue
|
|
|
|
# Resolve bindings
|
|
params[key] = self.resolve_binding(value)
|
|
|
|
return params
|
|
|
|
def get_scan_value(self, name: str) -> Any:
|
|
"""Get scan output by name."""
|
|
for node_id, scan in self.scans.items():
|
|
if scan.name == name:
|
|
return self.scan_outputs.get(node_id)
|
|
return None
|
|
|
|
def get_all_scan_values(self) -> Dict[str, Any]:
|
|
"""Get all named scan outputs."""
|
|
result = {}
|
|
for node_id, scan in self.scans.items():
|
|
if scan.name:
|
|
result[scan.name] = self.scan_outputs.get(node_id)
|
|
return result
|
|
|
|
# === Compositor interface methods ===
|
|
|
|
def _get_video_sources(self) -> List[str]:
|
|
"""Get list of video source node IDs."""
|
|
sources = []
|
|
for node in self.recipe.nodes:
|
|
if node.get('type') == 'SOURCE':
|
|
sources.append(node['id'])
|
|
# Filter to video only (exclude audio - last one is usually audio)
|
|
# Look at file extensions in the paths
|
|
return sources[:-1] if len(sources) > 1 else sources
|
|
|
|
def _trace_effect_chain(self, start_id: str, stop_at_blend: bool = True) -> List[dict]:
|
|
"""Trace effect chain from a node, returning effects in order."""
|
|
chain = []
|
|
current_id = start_id
|
|
|
|
for _ in range(20): # Max depth
|
|
# Find node that uses current as input
|
|
next_node = None
|
|
for node in self.recipe.nodes:
|
|
if current_id in node.get('inputs', []):
|
|
if node.get('type') == 'EFFECT':
|
|
effect_type = node.get('config', {}).get('effect')
|
|
chain.append(node)
|
|
if stop_at_blend and effect_type == 'blend':
|
|
return chain
|
|
next_node = node
|
|
break
|
|
elif node.get('type') == 'SEGMENT':
|
|
next_node = node
|
|
break
|
|
|
|
if next_node is None:
|
|
break
|
|
current_id = next_node['id']
|
|
|
|
return chain
|
|
|
|
def _find_clip_chains(self, source_idx: int) -> tuple:
|
|
"""Find effect chains for clip A and B from a source."""
|
|
sources = self._get_video_sources()
|
|
if source_idx >= len(sources):
|
|
return [], []
|
|
|
|
source_id = sources[source_idx]
|
|
|
|
# Find SEGMENT node
|
|
segment_id = None
|
|
for node in self.recipe.nodes:
|
|
if node.get('type') == 'SEGMENT' and source_id in node.get('inputs', []):
|
|
segment_id = node['id']
|
|
break
|
|
|
|
if not segment_id:
|
|
return [], []
|
|
|
|
# Find the two effect chains from segment (clip A and clip B)
|
|
chains = []
|
|
for node in self.recipe.nodes:
|
|
if segment_id in node.get('inputs', []) and node.get('type') == 'EFFECT':
|
|
chain = self._trace_effect_chain(segment_id)
|
|
# Get chain starting from this specific branch
|
|
branch_chain = [node]
|
|
current = node['id']
|
|
for _ in range(10):
|
|
found = False
|
|
for n in self.recipe.nodes:
|
|
if current in n.get('inputs', []) and n.get('type') == 'EFFECT':
|
|
branch_chain.append(n)
|
|
if n.get('config', {}).get('effect') == 'blend':
|
|
break
|
|
current = n['id']
|
|
found = True
|
|
break
|
|
if not found:
|
|
break
|
|
chains.append(branch_chain)
|
|
|
|
# Return first two chains as A and B
|
|
chain_a = chains[0] if len(chains) > 0 else []
|
|
chain_b = chains[1] if len(chains) > 1 else []
|
|
return chain_a, chain_b
|
|
|
|
def get_effect_params(self, source_idx: int, clip: str, energy: float) -> Dict:
|
|
"""Get effect parameters for a source clip (compositor interface)."""
|
|
# Get the correct chain for this clip
|
|
chain_a, chain_b = self._find_clip_chains(source_idx)
|
|
chain = chain_a if clip == 'a' else chain_b
|
|
|
|
# Default params
|
|
params = {
|
|
"rotate_angle": 0,
|
|
"zoom_amount": 1.0,
|
|
"invert_amount": 0,
|
|
"hue_degrees": 0,
|
|
"ascii_mix": 0,
|
|
"ascii_char_size": 8,
|
|
}
|
|
|
|
# Resolve from effects in chain
|
|
for eff in chain:
|
|
config = eff.get('config', {})
|
|
effect_type = config.get('effect')
|
|
|
|
if effect_type == 'rotate':
|
|
angle_binding = config.get('angle')
|
|
if angle_binding:
|
|
if isinstance(angle_binding, dict) and angle_binding.get('_binding'):
|
|
# Bound to analyzer - use energy with range
|
|
range_map = angle_binding.get('range')
|
|
if range_map:
|
|
lo, hi = range_map
|
|
params["rotate_angle"] = lo + energy * (hi - lo)
|
|
else:
|
|
params["rotate_angle"] = self.resolve_binding(angle_binding)
|
|
else:
|
|
params["rotate_angle"] = angle_binding if isinstance(angle_binding, (int, float)) else 0
|
|
|
|
elif effect_type == 'zoom':
|
|
amount_binding = config.get('amount')
|
|
if amount_binding:
|
|
if isinstance(amount_binding, dict) and amount_binding.get('_binding'):
|
|
range_map = amount_binding.get('range')
|
|
if range_map:
|
|
lo, hi = range_map
|
|
params["zoom_amount"] = lo + energy * (hi - lo)
|
|
else:
|
|
params["zoom_amount"] = self.resolve_binding(amount_binding)
|
|
else:
|
|
params["zoom_amount"] = amount_binding if isinstance(amount_binding, (int, float)) else 1.0
|
|
|
|
elif effect_type == 'invert':
|
|
amount_binding = config.get('amount')
|
|
if amount_binding:
|
|
val = self.resolve_binding(amount_binding)
|
|
params["invert_amount"] = val if isinstance(val, (int, float)) else 0
|
|
|
|
elif effect_type == 'hue_shift':
|
|
deg_binding = config.get('degrees')
|
|
if deg_binding:
|
|
val = self.resolve_binding(deg_binding)
|
|
params["hue_degrees"] = val if isinstance(val, (int, float)) else 0
|
|
|
|
elif effect_type == 'ascii_art':
|
|
mix_binding = config.get('mix')
|
|
if mix_binding:
|
|
val = self.resolve_binding(mix_binding)
|
|
params["ascii_mix"] = val if isinstance(val, (int, float)) else 0
|
|
size_binding = config.get('char_size')
|
|
if size_binding:
|
|
if isinstance(size_binding, dict) and size_binding.get('_binding'):
|
|
range_map = size_binding.get('range')
|
|
if range_map:
|
|
lo, hi = range_map
|
|
params["ascii_char_size"] = lo + energy * (hi - lo)
|
|
|
|
return params
|
|
|
|
def get_pair_params(self, source_idx: int) -> Dict:
|
|
"""Get blend and rotation params for a video pair (compositor interface)."""
|
|
params = {
|
|
"blend_opacity": 0.5,
|
|
"pair_rotation": 0,
|
|
}
|
|
|
|
# Find the blend node for this source
|
|
chain_a, _ = self._find_clip_chains(source_idx)
|
|
|
|
# The last effect in chain_a should be the blend
|
|
blend_node = None
|
|
for eff in reversed(chain_a):
|
|
if eff.get('config', {}).get('effect') == 'blend':
|
|
blend_node = eff
|
|
break
|
|
|
|
if blend_node:
|
|
config = blend_node.get('config', {})
|
|
opacity_binding = config.get('opacity')
|
|
if opacity_binding:
|
|
val = self.resolve_binding(opacity_binding)
|
|
if isinstance(val, (int, float)):
|
|
params["blend_opacity"] = val
|
|
|
|
# Find rotate after blend (pair rotation)
|
|
blend_id = blend_node['id']
|
|
for node in self.recipe.nodes:
|
|
if blend_id in node.get('inputs', []) and node.get('type') == 'EFFECT':
|
|
if node.get('config', {}).get('effect') == 'rotate':
|
|
angle_binding = node.get('config', {}).get('angle')
|
|
if angle_binding:
|
|
val = self.resolve_binding(angle_binding)
|
|
if isinstance(val, (int, float)):
|
|
params["pair_rotation"] = val
|
|
break
|
|
|
|
return params
|
|
|
|
def _get_cycle_state(self) -> dict:
|
|
"""Get current cycle state from SLICE_ON or internal tracking."""
|
|
if not hasattr(self, '_cycle_state'):
|
|
# Initialize from SLICE_ON node
|
|
for node in self.recipe.nodes:
|
|
if node.get('type') == 'SLICE_ON':
|
|
init = node.get('config', {}).get('init', {})
|
|
self._cycle_state = {
|
|
'cycle': init.get('cycle', 0),
|
|
'beat': init.get('beat', 0),
|
|
'clen': init.get('clen', 60),
|
|
}
|
|
break
|
|
else:
|
|
self._cycle_state = {'cycle': 0, 'beat': 0, 'clen': 60}
|
|
|
|
return self._cycle_state
|
|
|
|
def _step_cycle(self):
|
|
"""Step the cycle state forward on beat by evaluating SLICE_ON Lambda."""
|
|
# Use interpreter to evaluate the Lambda
|
|
self._eval_slice_on()
|
|
|
|
def get_cycle_weights(self) -> List[float]:
|
|
"""Get blend weights for cycle-crossfade from SLICE_ON result."""
|
|
n = len(self._get_video_sources())
|
|
if n == 0:
|
|
return [1.0]
|
|
|
|
# Get weights from interpreted result
|
|
if self._slice_on_result:
|
|
compose = self._slice_on_result.get('compose', {})
|
|
weights = compose.get('weights', [])
|
|
if weights and len(weights) == n:
|
|
# Normalize
|
|
total = sum(weights)
|
|
if total > 0:
|
|
return [w / total for w in weights]
|
|
|
|
# Fallback: equal weights
|
|
return [1.0 / n] * n
|
|
|
|
def get_cycle_zooms(self) -> List[float]:
|
|
"""Get zoom amounts for cycle-crossfade from SLICE_ON result."""
|
|
n = len(self._get_video_sources())
|
|
if n == 0:
|
|
return [1.0]
|
|
|
|
# Get zooms from interpreted result (layers -> effects -> zoom amount)
|
|
if self._slice_on_result:
|
|
layers = self._slice_on_result.get('layers', [])
|
|
if layers and len(layers) == n:
|
|
zooms = []
|
|
for layer in layers:
|
|
effects = layer.get('effects', [])
|
|
zoom_amt = 1.0
|
|
for eff in effects:
|
|
if eff.get('effect') == 'zoom' or (hasattr(eff.get('effect'), 'name') and eff.get('effect').name == 'zoom'):
|
|
zoom_amt = eff.get('amount', 1.0)
|
|
break
|
|
zooms.append(zoom_amt)
|
|
return zooms
|
|
|
|
# Fallback
|
|
return [1.0] * n
|
|
|
|
def _get_final_rotate_scan_id(self) -> str:
|
|
"""Find the scan ID that drives the final rotation (after SLICE_ON)."""
|
|
if hasattr(self, '_final_rotate_scan_id'):
|
|
return self._final_rotate_scan_id
|
|
|
|
# Find SLICE_ON node index
|
|
slice_on_idx = None
|
|
for i, node in enumerate(self.recipe.nodes):
|
|
if node.get('type') == 'SLICE_ON':
|
|
slice_on_idx = i
|
|
break
|
|
|
|
# Find rotate effect after SLICE_ON
|
|
if slice_on_idx is not None:
|
|
for node in self.recipe.nodes[slice_on_idx + 1:]:
|
|
if node.get('type') == 'EFFECT':
|
|
config = node.get('config', {})
|
|
if config.get('effect') == 'rotate':
|
|
angle_binding = config.get('angle', {})
|
|
if isinstance(angle_binding, dict) and angle_binding.get('_binding'):
|
|
self._final_rotate_scan_id = angle_binding.get('source')
|
|
return self._final_rotate_scan_id
|
|
|
|
self._final_rotate_scan_id = None
|
|
return None
|
|
|
|
def get_final_effects(self, energy: float) -> Dict:
|
|
"""Get final composition effects (compositor interface)."""
|
|
# Get named scans
|
|
scan_values = self.get_all_scan_values()
|
|
|
|
# Whole spin - get from the specific scan bound to final rotate effect
|
|
whole_spin = 0
|
|
final_rotate_scan_id = self._get_final_rotate_scan_id()
|
|
if final_rotate_scan_id and final_rotate_scan_id in self.scan_outputs:
|
|
val = self.scan_outputs[final_rotate_scan_id]
|
|
if isinstance(val, dict) and 'angle' in val:
|
|
whole_spin = val['angle']
|
|
elif isinstance(val, (int, float)):
|
|
whole_spin = val
|
|
|
|
# Ripple
|
|
ripple_gate = scan_values.get('ripple-gate', 0)
|
|
ripple_cx = scan_values.get('ripple-cx', 0.5)
|
|
ripple_cy = scan_values.get('ripple-cy', 0.5)
|
|
|
|
if isinstance(ripple_gate, dict):
|
|
ripple_gate = ripple_gate.get('gate', 0) if 'gate' in ripple_gate else 1
|
|
|
|
return {
|
|
"whole_spin_angle": whole_spin,
|
|
"ripple_amplitude": ripple_gate * (5 + energy * 45),
|
|
"ripple_cx": ripple_cx if isinstance(ripple_cx, (int, float)) else 0.5,
|
|
"ripple_cy": ripple_cy if isinstance(ripple_cy, (int, float)) else 0.5,
|
|
}
|