"""
S-Expression Effect Interpreter

Interprets effect definitions written in S-expressions.
Only allows safe primitives - no arbitrary code execution.
"""

import inspect
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional

import numpy as np

from .parser import Symbol, Keyword, parse, parse_file
from .primitives import PRIMITIVES, reset_rng


def _is_symbol(x) -> bool:
    """Check if x is a Symbol (duck typing to support multiple Symbol classes)."""
    return hasattr(x, 'name') and type(x).__name__ == 'Symbol'


def _is_keyword(x) -> bool:
    """Check if x is a Keyword (duck typing to support multiple Keyword classes)."""
    return hasattr(x, 'name') and type(x).__name__ == 'Keyword'


def _symbol_name(x) -> str:
    """Get the name from a Symbol, falling back to ``str(x)`` for other values."""
    return x.name if hasattr(x, 'name') else str(x)


class Environment:
    """Lexical environment for variable bindings.

    Lookups walk the ``parent`` chain; writes via ``set`` always go to this
    environment's own ``bindings``.
    """

    def __init__(self, parent: 'Environment' = None):
        self.bindings: Dict[str, Any] = {}
        self.parent = parent

    def get(self, name: str) -> Any:
        """Return the value bound to ``name``; raise NameError if unbound."""
        if name in self.bindings:
            return self.bindings[name]
        if self.parent is not None:
            return self.parent.get(name)
        raise NameError(f"Undefined variable: {name}")

    def set(self, name: str, value: Any):
        """Bind ``name`` to ``value`` in this (innermost) scope."""
        self.bindings[name] = value

    def has(self, name: str) -> bool:
        """Return True if ``name`` is bound here or in any ancestor scope."""
        if name in self.bindings:
            return True
        if self.parent is not None:
            return self.parent.has(name)
        return False


class Lambda:
    """A user-defined function (lambda)."""

    def __init__(self, params: List[str], body: Any, env: Environment):
        self.params = params
        self.body = body
        self.env = env  # Closure environment

    def __repr__(self):
        # Shown in interpreter error messages such as "Cannot call: ...".
        return f"<Lambda ({' '.join(map(str, self.params))})>"


class EffectDefinition:
    """A parsed effect definition."""

    def __init__(self, name: str, params: Dict[str, Any], body: Any):
        self.name = name
        self.params = params  # {name: default} as built by _parse_params_block
        self.body = body

    def __repr__(self):
        return f"<Effect {self.name} params={list(self.params)}>"


class Interpreter:
    """
    S-Expression interpreter for effects.

    Provides a safe execution environment where only
    whitelisted primitives can be called.

    Args:
        minimal_primitives: If True, only load core primitives (arithmetic,
            comparison, basic data access). Additional primitives must be
            loaded with (require-primitives) or (with-primitives).
            If False (default), load all legacy primitives for backward
            compatibility.
    """

    def __init__(self, minimal_primitives: bool = False):
        # Base environment with primitives
        self.global_env = Environment()
        self.minimal_primitives = minimal_primitives

        if minimal_primitives:
            # Load only core primitives
            from .primitive_libs.core import PRIMITIVES as CORE_PRIMITIVES
            for name, fn in CORE_PRIMITIVES.items():
                self.global_env.set(name, fn)
        else:
            # Load all legacy primitives for backward compatibility
            for name, fn in PRIMITIVES.items():
                self.global_env.set(name, fn)

        # Special values
        self.global_env.set('true', True)
        self.global_env.set('false', False)
        self.global_env.set('nil', None)

        # Loaded effect definitions
        self.effects: Dict[str, EffectDefinition] = {}

    def eval(self, expr: Any, env: Environment = None) -> Any:
        """Evaluate an S-expression.

        Args:
            expr: Parsed expression (atom, Symbol, Keyword, ndarray or list).
            env: Environment to evaluate in; defaults to the global one.

        Returns:
            The value of the expression.

        Raises:
            NameError: For unbound symbols or ``set!`` on an undefined name.
            SyntaxError: For a malformed ``define`` / ``define-effect``.
            TypeError: If ``expr`` is of a kind that cannot be evaluated, or
                the head of a call is not callable.
        """
        if env is None:
            env = self.global_env

        # Atoms
        if isinstance(expr, (int, float, str, bool)):
            return expr

        if expr is None:
            return None

        # Handle Symbol (duck typing to support both sexp_effects.parser.Symbol
        # and artdag.sexp.parser.Symbol)
        if _is_symbol(expr):
            return env.get(expr.name)

        # Handle Keyword (duck typing)
        if _is_keyword(expr):
            return expr  # Keywords evaluate to themselves

        if isinstance(expr, np.ndarray):
            return expr  # Images pass through

        # Lists (function calls / special forms)
        if isinstance(expr, list):
            if not expr:
                return []

            head = expr[0]

            # Special forms
            if _is_symbol(head):
                form = head.name

                # Quote
                if form == 'quote':
                    return expr[1]

                # Define (always binds in the global environment)
                if form == 'define':
                    name = expr[1]
                    if _is_symbol(name):
                        # Simple define: (define name value)
                        value = self.eval(expr[2], env)
                        self.global_env.set(name.name, value)
                        return value
                    elif isinstance(name, list) and len(name) >= 1 and _is_symbol(name[0]):
                        # Function define: (define (fn-name args...) body)
                        # Desugars to: (define fn-name (lambda (args...) body))
                        fn_name = name[0].name
                        params = [p.name if _is_symbol(p) else p for p in name[1:]]
                        body = expr[2]
                        fn = Lambda(params, body, env)
                        self.global_env.set(fn_name, fn)
                        return fn
                    else:
                        raise SyntaxError(f"define requires symbol or (name args...), got {name}")

                # Define-effect
                if form == 'define-effect':
                    return self._define_effect(expr, env)

                # Lambda
                if form == 'lambda' or form == 'λ':
                    params = [p.name if _is_symbol(p) else p for p in expr[1]]
                    body = expr[2]
                    return Lambda(params, body, env)

                # Let
                if form == 'let':
                    return self._eval_let(expr, env)

                # Let*
                if form == 'let*':
                    return self._eval_let_star(expr, env)

                # If
                if form == 'if':
                    cond = self.eval(expr[1], env)
                    if cond:
                        return self.eval(expr[2], env)
                    elif len(expr) > 3:
                        return self.eval(expr[3], env)
                    return None

                # Cond
                if form == 'cond':
                    return self._eval_cond(expr, env)

                # And
                if form == 'and':
                    result = True
                    for e in expr[1:]:
                        result = self.eval(e, env)
                        if not result:
                            return False
                    return result

                # Or
                if form == 'or':
                    for e in expr[1:]:
                        result = self.eval(e, env)
                        if result:
                            return result
                    return False

                # Not
                if form == 'not':
                    return not self.eval(expr[1], env)

                # Begin (sequence)
                if form == 'begin':
                    result = None
                    for e in expr[1:]:
                        result = self.eval(e, env)
                    return result

                # Thread-first macro: (-> x (f a) (g b)) => (g (f x a) b)
                if form == '->':
                    result = self.eval(expr[1], env)
                    for form_expr in expr[2:]:
                        # The intermediate result is already a value. Wrap it
                        # in (quote ...) so splicing it into the next form does
                        # not evaluate it a second time (a list value would be
                        # treated as a call, a Symbol value looked up again,
                        # and dicts/Lambdas rejected as "Cannot evaluate").
                        quoted = [Symbol('quote'), result]
                        if isinstance(form_expr, list):
                            # Insert result as first arg: (f a b) => (f result a b)
                            result = self.eval([form_expr[0], quoted] + form_expr[1:], env)
                        else:
                            # Just a symbol: f => (f result)
                            result = self.eval([form_expr, quoted], env)
                    return result

                # Set! (mutation)
                if form == 'set!':
                    name = expr[1].name if _is_symbol(expr[1]) else expr[1]
                    value = self.eval(expr[2], env)
                    # Find and update in the nearest scope that binds the name
                    scope = env
                    while scope:
                        if name in scope.bindings:
                            scope.bindings[name] = value
                            return value
                        scope = scope.parent
                    raise NameError(f"Cannot set undefined variable: {name}")

                # State-get / state-set (for effect state)
                if form == 'state-get':
                    state = env.get('__state__')
                    key = self.eval(expr[1], env)
                    if _is_symbol(key):
                        key = key.name
                    default = self.eval(expr[2], env) if len(expr) > 2 else None
                    return state.get(key, default)

                if form == 'state-set':
                    state = env.get('__state__')
                    key = self.eval(expr[1], env)
                    if _is_symbol(key):
                        key = key.name
                    value = self.eval(expr[2], env)
                    state[key] = value
                    return value

                # ascii-fx-zone special form - delays evaluation of expression parameters
                if form == 'ascii-fx-zone':
                    return self._eval_ascii_fx_zone(expr, env)

                # with-primitives - load primitive library and scope to body
                if form == 'with-primitives':
                    return self._eval_with_primitives(expr, env)

                # require-primitives - load primitive library into current scope
                if form == 'require-primitives':
                    return self._eval_require_primitives(expr, env)

                # require - load .sexp file into current scope
                if form == 'require':
                    return self._eval_require(expr, env)

            # Function call
            fn = self.eval(head, env)
            args = [self.eval(arg, env) for arg in expr[1:]]

            # Handle keyword arguments: a Keyword consumes the following value
            pos_args = []
            kw_args = {}
            i = 0
            while i < len(args):
                if _is_keyword(args[i]):
                    kw_args[args[i].name] = args[i + 1] if i + 1 < len(args) else None
                    i += 2
                else:
                    pos_args.append(args[i])
                    i += 1

            return self._apply(fn, pos_args, kw_args, env)

        raise TypeError(f"Cannot evaluate: {expr}")

    def _call_lambda(self, lam: 'Lambda', args) -> Any:
        """Bind ``args`` positionally to ``lam``'s params and evaluate its body.

        Missing arguments are bound to None; surplus arguments are ignored.
        """
        new_env = Environment(lam.env)
        for i, param in enumerate(lam.params):
            new_env.set(param, args[i] if i < len(args) else None)
        return self.eval(lam.body, new_env)

    def _wrap_lambda(self, lam: 'Lambda') -> Callable:
        """Wrap a Lambda in a Python callable for use by primitives."""
        def wrapper(*args):
            return self._call_lambda(lam, args)
        return wrapper

    def _apply(self, fn: Any, args: List[Any], kwargs: Dict[str, Any], env: Environment) -> Any:
        """Apply a function to arguments.

        Args:
            fn: A ``Lambda`` or any Python callable (primitive).
            args: Already-evaluated positional arguments.
            kwargs: Already-evaluated keyword arguments. Ignored for Lambdas;
                may be extended in place with ``_interp`` / ``_env``.
            env: Calling environment, offered to primitives as ``_env``.

        Raises:
            TypeError: If ``fn`` is neither a Lambda nor callable.
        """
        if isinstance(fn, Lambda):
            # User-defined function
            return self._call_lambda(fn, args)

        elif callable(fn):
            # Wrap any Lambda arguments so primitives can call them
            wrapped_args = [
                self._wrap_lambda(arg) if isinstance(arg, Lambda) else arg
                for arg in args
            ]

            # Inject _interp and _env for primitives that declare them
            try:
                sig = inspect.signature(fn)
                params = sig.parameters
                if '_interp' in params and '_interp' not in kwargs:
                    kwargs['_interp'] = self
                if '_env' in params and '_env' not in kwargs:
                    kwargs['_env'] = env
            except (ValueError, TypeError):
                # Some built-in functions don't have inspectable signatures
                pass

            # Primitive function
            if kwargs:
                return fn(*wrapped_args, **kwargs)
            return fn(*wrapped_args)

        else:
            raise TypeError(f"Cannot call: {fn}")

    def _parse_bindings(self, bindings: list) -> list:
        """Parse bindings in either Scheme or Clojure style.

        Scheme: ((x 1) (y 2)) -> [(x, 1), (y, 2)]
        Clojure: [x 1 y 2] -> [(x, 1), (y, 2)]

        Returns a list of ``(name, value_expr)`` pairs; value expressions are
        not evaluated here.
        """
        if not bindings:
            return []

        # Check if Clojure style (flat list with symbols and values alternating)
        if _is_symbol(bindings[0]):
            # Clojure style: [x 1 y 2]; a trailing name without a value is dropped
            pairs = []
            i = 0
            while i < len(bindings) - 1:
                name = bindings[i].name if _is_symbol(bindings[i]) else bindings[i]
                value = bindings[i + 1]
                pairs.append((name, value))
                i += 2
            return pairs
        else:
            # Scheme style: ((x 1) (y 2))
            pairs = []
            for binding in bindings:
                name = binding[0].name if _is_symbol(binding[0]) else binding[0]
                value = binding[1]
                pairs.append((name, value))
            return pairs

    def _eval_let(self, expr: Any, env: Environment) -> Any:
        """Evaluate let expression: (let ((x 1) (y 2)) body) or (let [x 1 y 2] body)

        Note: Uses sequential binding (like Clojure let / Scheme let*)
        so each binding can reference previous bindings.
        Only ``expr[2]`` is evaluated as the body.
        """
        bindings = expr[1]
        body = expr[2]

        new_env = Environment(env)
        for name, value_expr in self._parse_bindings(bindings):
            value = self.eval(value_expr, new_env)  # Sequential: can see previous bindings
            new_env.set(name, value)

        return self.eval(body, new_env)

    def _eval_let_star(self, expr: Any, env: Environment) -> Any:
        """Evaluate let* expression: sequential bindings.

        ``let`` in this interpreter is already sequential, so both forms
        share one implementation.
        """
        return self._eval_let(expr, env)

    def _eval_cond(self, expr: Any, env: Environment) -> Any:
        """Evaluate cond expression.

        Returns the value of the first clause whose test is truthy (or is the
        symbol ``else``), or None when no clause matches.
        """
        for clause in expr[1:]:
            test = clause[0]
            if _is_symbol(test) and test.name == 'else':
                return self.eval(clause[1], env)
            if self.eval(test, env):
                return self.eval(clause[1], env)
        return None

    def _eval_with_primitives(self, expr: Any, env: Environment) -> Any:
        """
        Evaluate with-primitives: scoped primitive library loading.

        Syntax:
            (with-primitives "math"
              (sin (* x pi)))

            (with-primitives "math" :path "custom/math.py"
              body)

        The primitives from the library are only available within the body.
        Returns the value of the last body expression (None if there is none).
        """
        # Parse library name and optional path
        lib_name = expr[1]
        if _is_symbol(lib_name):
            lib_name = lib_name.name

        path = None
        body_start = 2

        # Check for :path keyword
        if len(expr) > 2 and _is_keyword(expr[2]) and expr[2].name == 'path':
            path = expr[3]
            body_start = 4

        # Load the primitive library
        primitives = self.load_primitive_library(lib_name, path)

        # Create new environment with primitives
        new_env = Environment(env)
        for name, fn in primitives.items():
            new_env.set(name, fn)

        # Evaluate body in new environment
        result = None
        for e in expr[body_start:]:
            result = self.eval(e, new_env)
        return result

    def _eval_require_primitives(self, expr: Any, env: Environment) -> Any:
        """
        Evaluate require-primitives: load primitives into current scope.

        Syntax:
            (require-primitives "math" "color" "filters")

        Unlike with-primitives, this loads into the current environment
        (typically used at top-level to set up an effect's dependencies).
        """
        for lib_expr in expr[1:]:
            if _is_symbol(lib_expr):
                lib_name = lib_expr.name
            else:
                lib_name = lib_expr

            primitives = self.load_primitive_library(lib_name)
            for name, fn in primitives.items():
                env.set(name, fn)

        return None

    def load_primitive_library(self, name: str, path: str = None) -> dict:
        """
        Load a primitive library by name or path.

        Returns dict of {name: function}.
        """
        from .primitive_libs import load_primitive_library
        return load_primitive_library(name, path)

    def _eval_program(self, program: Any, env: Environment = None) -> None:
        """Evaluate parser output that is either one expression or a list of them.

        A non-empty list whose first element is itself a list is treated as a
        sequence of top-level expressions; anything else is evaluated as a
        single expression.
        """
        if isinstance(program, list) and program and isinstance(program[0], list):
            for e in program:
                self.eval(e, env)
        else:
            self.eval(program, env)

    def _eval_require(self, expr: Any, env: Environment) -> Any:
        """
        Evaluate require: load a .sexp file and evaluate its definitions.

        Syntax:
            (require "derived")            ; loads derived.sexp from sexp_effects/
            (require "path/to/file.sexp")  ; loads from explicit path

        The file's expressions are evaluated in the current environment.

        Raises:
            ValueError: If the file cannot be located.
        """
        for lib_expr in expr[1:]:
            if _is_symbol(lib_expr):
                lib_name = lib_expr.name
            else:
                lib_name = lib_expr

            # Find the .sexp file
            sexp_path = self._find_sexp_file(lib_name)
            if sexp_path is None:
                raise ValueError(f"Cannot find sexp file: {lib_name}")

            # Parse and evaluate all top-level expressions of the file
            content = parse_file(sexp_path)
            self._eval_program(content, env)

        return None

    def _find_sexp_file(self, name: str) -> Optional[str]:
        """Find a .sexp file by name; return its path as a string, or None."""
        # Try various locations, in priority order
        candidates = [
            # Explicit path
            name,
            name + '.sexp',
            # In sexp_effects directory
            Path(__file__).parent / f'{name}.sexp',
            Path(__file__).parent / name,
            # In effects directory
            Path(__file__).parent / 'effects' / f'{name}.sexp',
            Path(__file__).parent / 'effects' / name,
        ]
        for path in candidates:
            p = Path(path) if not isinstance(path, Path) else path
            if p.exists() and p.is_file():
                return str(p)
        return None

    def _eval_ascii_fx_zone(self, expr: Any, env: Environment) -> Any:
        """
        Evaluate ascii-fx-zone special form.

        Syntax:
            (ascii-fx-zone frame
              :cols 80
              :alphabet "standard"
              :color_mode "color"
              :background "black"
              :contrast 1.5
              :char_hue <expr>         ;; NOT evaluated - passed to primitive
              :char_saturation <expr>
              :char_brightness <expr>
              :char_scale <expr>
              :char_rotation <expr>
              :char_jitter <expr>)

        The expression parameters (:char_hue, etc.) are NOT pre-evaluated.
        They are passed as raw S-expressions to the primitive which
        evaluates them per-zone with zone context variables injected.

        Requires: (require-primitives "ascii")

        Raises:
            NameError: If the ``ascii-fx-zone`` primitive is not bound.
        """
        # Look up ascii-fx-zone primitive from environment
        # It must be loaded via (require-primitives "ascii")
        try:
            prim_ascii_fx_zone = env.get('ascii-fx-zone')
        except NameError:
            raise NameError(
                "ascii-fx-zone primitive not found. "
                "Add (require-primitives \"ascii\") to your effect file."
            )

        # Expression parameter names that should NOT be evaluated
        expr_params = {'char_hue', 'char_saturation', 'char_brightness',
                       'char_scale', 'char_rotation', 'char_jitter', 'cell_effect'}

        # Parse arguments
        frame = self.eval(expr[1], env)  # First arg is always the frame

        # Defaults
        cols = 80
        char_size = None  # If set, overrides cols
        alphabet = "standard"
        color_mode = "color"
        background = "black"
        contrast = 1.5
        char_hue = None
        char_saturation = None
        char_brightness = None
        char_scale = None
        char_rotation = None
        char_jitter = None
        cell_effect = None  # Lambda for arbitrary per-cell effects
        # Convenience params for staged recipes
        energy = None
        rotation_scale = 0
        # Extra params to pass to zone dict for lambdas
        extra_params = {}

        # Parse keyword arguments
        i = 2
        while i < len(expr):
            item = expr[i]
            if _is_keyword(item):
                if i + 1 >= len(expr):
                    break
                value_expr = expr[i + 1]
                kw_name = item.name

                if kw_name in expr_params:
                    # Resolve symbol references but don't evaluate expressions
                    # This handles the case where effect definition passes a
                    # param like :char_hue char_hue
                    resolved = value_expr
                    if _is_symbol(value_expr):
                        try:
                            resolved = env.get(value_expr.name)
                        except NameError:
                            resolved = value_expr  # Keep as symbol if not found

                    if kw_name == 'char_hue':
                        char_hue = resolved
                    elif kw_name == 'char_saturation':
                        char_saturation = resolved
                    elif kw_name == 'char_brightness':
                        char_brightness = resolved
                    elif kw_name == 'char_scale':
                        char_scale = resolved
                    elif kw_name == 'char_rotation':
                        char_rotation = resolved
                    elif kw_name == 'char_jitter':
                        char_jitter = resolved
                    elif kw_name == 'cell_effect':
                        cell_effect = resolved
                else:
                    # Evaluate normally
                    value = self.eval(value_expr, env)
                    if kw_name == 'cols':
                        cols = int(value)
                    elif kw_name == 'char_size':
                        # Handle nil/None values
                        if value is None or (_is_symbol(value) and value.name == 'nil'):
                            char_size = None
                        else:
                            char_size = int(value)
                    elif kw_name == 'alphabet':
                        alphabet = str(value)
                    elif kw_name == 'color_mode':
                        color_mode = str(value)
                    elif kw_name == 'background':
                        background = str(value)
                    elif kw_name == 'contrast':
                        contrast = float(value)
                    elif kw_name == 'energy':
                        if value is None or (_is_symbol(value) and value.name == 'nil'):
                            energy = None
                        else:
                            energy = float(value)
                    elif kw_name == 'rotation_scale':
                        rotation_scale = float(value)
                    else:
                        # Store any other params for lambdas to access
                        extra_params[kw_name] = value
                i += 2
            else:
                i += 1

        # If energy and rotation_scale provided, build rotation expression
        # rotation = energy * rotation_scale * position_factor
        # position_factor: bottom-left=0, top-right=3
        # Formula: 1.5 * (zone-col-norm + (1 - zone-row-norm))
        if energy is not None and rotation_scale > 0:
            # Build expression as S-expression list that will be evaluated per-zone
            # (* (* energy rotation_scale) (* 1.5 (+ zone-col-norm (- 1 zone-row-norm))))
            energy_times_scale = energy * rotation_scale
            # The position part uses zone variables, so we build it as an expression
            char_rotation = [
                Symbol('*'),
                energy_times_scale,
                [Symbol('*'), 1.5,
                 [Symbol('+'), Symbol('zone-col-norm'),
                  [Symbol('-'), 1, Symbol('zone-row-norm')]]]
            ]

        # Pull any extra params from environment that aren't standard params
        # These are typically passed from recipes for use in cell_effect lambdas
        standard_params = {
            'cols', 'char_size', 'alphabet', 'color_mode', 'background', 'contrast',
            'char_hue', 'char_saturation', 'char_brightness', 'char_scale',
            'char_rotation', 'char_jitter', 'cell_effect', 'energy', 'rotation_scale',
            'frame', 't', '_time', '__state__', '__interp__', 'true', 'false', 'nil'
        }
        # Check environment chain for extra bindings (inner scopes win because
        # names already present in extra_params are not overwritten)
        current_env = env
        while current_env is not None:
            for k, v in current_env.bindings.items():
                if k not in standard_params and k not in extra_params and not callable(v):
                    # Add non-standard, non-callable scalar bindings to extra_params
                    if isinstance(v, (int, float, str, bool)) or v is None:
                        extra_params[k] = v
            current_env = current_env.parent

        # Call the primitive with interpreter and env for expression evaluation
        return prim_ascii_fx_zone(
            frame,
            cols=cols,
            char_size=char_size,
            alphabet=alphabet,
            color_mode=color_mode,
            background=background,
            contrast=contrast,
            char_hue=char_hue,
            char_saturation=char_saturation,
            char_brightness=char_brightness,
            char_scale=char_scale,
            char_rotation=char_rotation,
            char_jitter=char_jitter,
            cell_effect=cell_effect,
            energy=energy,
            rotation_scale=rotation_scale,
            _interp=self,
            _env=env,
            **extra_params
        )

    def _define_effect(self, expr: Any, env: Environment) -> EffectDefinition:
        """
        Parse effect definition and register it in ``self.effects``.

        Required syntax:
            (define-effect name
              :params (
                (param1 :type int :default 8 :desc "description")
              )
              body)

        Effects MUST use :params syntax. Legacy ((param default) ...) is not supported.

        Raises:
            SyntaxError: If :params is missing or malformed, the legacy
                parameter syntax is used, or no body is found.
        """
        name = expr[1].name if _is_symbol(expr[1]) else expr[1]

        params = {}
        body = None
        found_params = False

        # Parse :params and body
        i = 2
        while i < len(expr):
            item = expr[i]
            if _is_keyword(item) and item.name == "params":
                # :params syntax
                if i + 1 >= len(expr):
                    raise SyntaxError(f"Effect '{name}': Missing params list after :params keyword")
                params_list = expr[i + 1]
                params = self._parse_params_block(params_list)
                found_params = True
                i += 2
            elif _is_keyword(item):
                # Skip other keywords (like :desc) together with their value
                i += 2
            elif body is None:
                # First non-keyword item is the body
                if isinstance(item, list) and item:
                    first_elem = item[0]
                    # Check for legacy syntax and reject it
                    if isinstance(first_elem, list) and len(first_elem) >= 2:
                        raise SyntaxError(
                            f"Effect '{name}': Legacy parameter syntax ((name default) ...) is not supported. "
                            f"Use :params block instead."
                        )
                body = item
                i += 1
            else:
                i += 1

        if body is None:
            raise SyntaxError(f"Effect '{name}': No body found")

        if not found_params:
            raise SyntaxError(
                f"Effect '{name}': Missing :params block. "
                f"For effects with no parameters, use empty :params ()"
            )

        effect = EffectDefinition(name, params, body)
        # Assigning to an existing dict key keeps its original position, so a
        # re-defined effect would not be the "last" entry that load_effect()
        # returns. Remove any old entry first so the newest is always last.
        self.effects.pop(name, None)
        self.effects[name] = effect
        return effect

    def _parse_params_block(self, params_list: list) -> Dict[str, Any]:
        """
        Parse :params block syntax:
            (
              (param_name :type int :default 8 :range [4 32] :desc "description")
            )

        Returns ``{param_name: default}`` where ``default`` is the raw
        (unevaluated) value following ``:default``, or None if absent.
        Entries that are not lists, or whose first element is neither a
        Symbol nor a string, are skipped.
        """
        params = {}
        for param_def in params_list:
            if not isinstance(param_def, list) or len(param_def) < 1:
                continue

            # First element is the parameter name
            first = param_def[0]
            if _is_symbol(first):
                param_name = first.name
            elif isinstance(first, str):
                param_name = first
            else:
                continue

            # Parse keyword arguments; only :default is retained
            default = None
            i = 1
            while i < len(param_def):
                item = param_def[i]
                if _is_keyword(item):
                    if i + 1 >= len(param_def):
                        break
                    kw_value = param_def[i + 1]

                    if item.name == "default":
                        default = kw_value
                    i += 2
                else:
                    i += 1

            params[param_name] = default

        return params

    def load_effect(self, path: str) -> Optional[EffectDefinition]:
        """Load an effect definition from a .sexp file.

        Returns the most recently defined effect known to this interpreter,
        or None if no effect has been defined at all.
        """
        expr = parse_file(path)

        # Handle multiple top-level expressions
        self._eval_program(expr)

        # Return the last defined effect
        if self.effects:
            return list(self.effects.values())[-1]
        return None

    def load_effect_from_string(self, sexp_content: str,
                                effect_name: str = None) -> Optional[EffectDefinition]:
        """Load an effect definition from an S-expression string.

        Args:
            sexp_content: The S-expression content as a string
            effect_name: Optional name hint; if an effect of this name exists
                after evaluation it is returned in preference to the most
                recently defined one

        Returns:
            The loaded EffectDefinition, or None if no effect is defined
        """
        expr = parse(sexp_content)

        # Handle multiple top-level expressions
        self._eval_program(expr)

        # Return the effect if we can find it by name
        if effect_name and effect_name in self.effects:
            return self.effects[effect_name]

        # Return the most recently loaded effect
        if self.effects:
            return list(self.effects.values())[-1]

        return None

    def run_effect(self, name: str, frame, params: Dict[str, Any],
                   state: Dict[str, Any]) -> tuple:
        """
        Run an effect on frame(s).

        Args:
            name: Effect name
            frame: Input frame (H, W, 3) RGB uint8, or list of frames for multi-input
            params: Effect parameters (overrides defaults)
            state: Persistent state dict (a new dict is created if None)

        Returns:
            (output_frame, new_state)

        Raises:
            ValueError: If no effect called ``name`` has been loaded.
        """
        if name not in self.effects:
            raise ValueError(f"Unknown effect: {name}")

        effect = self.effects[name]

        # Create environment for this run
        env = Environment(self.global_env)

        # Bind frame(s) - support both single frame and list of frames
        if isinstance(frame, list):
            # Multi-input effect
            frames = frame
            env.set('frame', frames[0] if frames else None)  # Backwards compat
            env.set('inputs', frames)
            # Named frame bindings
            for i, f in enumerate(frames):
                env.set(f'frame-{chr(ord("a") + i)}', f)  # frame-a, frame-b, etc.
        else:
            # Single-input effect
            env.set('frame', frame)

        # Bind state
        if state is None:
            state = {}
        env.set('__state__', state)

        # Validate that all provided params are known (except internal params)
        # Extra params are allowed and will be passed through to cell_effect lambdas
        known_params = set(effect.params.keys())
        internal_params = {'_time', 'seed', '_binding', 'effect', 'cid', 'hash', 'effect_path'}
        extra_effect_params = {}  # Unknown params passed through for cell_effect lambdas
        for k in params.keys():
            if k not in known_params and k not in internal_params:
                # Allow unknown params - they'll be passed to cell_effect lambdas via zone dict
                extra_effect_params[k] = params[k]

        # Bind parameters (defaults + overrides)
        for pname, pdefault in effect.params.items():
            value = params.get(pname)
            if value is None:
                # Evaluate default if it's an expression (list) or a symbol (like 'nil')
                if isinstance(pdefault, list) or _is_symbol(pdefault):
                    value = self.eval(pdefault, env)
                else:
                    value = pdefault
            env.set(pname, value)

        # Bind extra params (unknown params passed through for cell_effect lambdas)
        for k, v in extra_effect_params.items():
            env.set(k, v)

        # Reset RNG with seed if provided
        seed = params.get('seed', 42)
        reset_rng(int(seed))

        # Bind time if provided
        time_val = params.get('_time', 0)
        env.set('t', time_val)
        env.set('_time', time_val)

        # Evaluate body
        result = self.eval(effect.body, env)

        # Ensure result is an image; otherwise fall back to the input
        if not isinstance(result, np.ndarray):
            result = frame

        return result, state

    def eval_with_zone(self, expr, env: Environment, zone) -> Any:
        """
        Evaluate expression with zone-* variables injected.

        Args:
            expr: Expression to evaluate (S-expression)
            env: Parent environment with bound values
            zone: ZoneContext object with cell data

        Zone variables injected:
            zone-row, zone-col: Grid position (integers)
            zone-row-norm, zone-col-norm: Normalized position (0-1)
            zone-lum: Cell luminance (0-1)
            zone-sat: Cell saturation (0-1)
            zone-hue: Cell hue (0-360)
            zone-r, zone-g, zone-b: RGB components (0-1)

        Returns:
            Evaluated result (typically a number)
        """
        # Create child environment with zone variables
        zone_env = Environment(env)
        zone_env.set('zone-row', zone.row)
        zone_env.set('zone-col', zone.col)
        zone_env.set('zone-row-norm', zone.row_norm)
        zone_env.set('zone-col-norm', zone.col_norm)
        zone_env.set('zone-lum', zone.luminance)
        zone_env.set('zone-sat', zone.saturation)
        zone_env.set('zone-hue', zone.hue)
        zone_env.set('zone-r', zone.r)
        zone_env.set('zone-g', zone.g)
        zone_env.set('zone-b', zone.b)

        return self.eval(expr, zone_env)


# =============================================================================
# Convenience Functions
# =============================================================================

_interpreter = None
_interpreter_minimal = None


def get_interpreter(minimal_primitives: bool = False) -> Interpreter:
    """Get or create the global interpreter.

    Args:
        minimal_primitives: If True, return interpreter with only core primitives.
            Additional primitives must be loaded with require-primitives or with-primitives.
    """
    global _interpreter, _interpreter_minimal

    if minimal_primitives:
        if _interpreter_minimal is None:
            _interpreter_minimal = Interpreter(minimal_primitives=True)
        return _interpreter_minimal
    else:
        if _interpreter is None:
            _interpreter = Interpreter(minimal_primitives=False)
        return _interpreter


def load_effect(path: str) -> EffectDefinition:
    """Load an effect from a .sexp file."""
    return get_interpreter().load_effect(path)


def load_effects_dir(directory: str):
    """Load all .sexp effects from a directory.

    Best effort: a file that fails to load is reported with a printed warning
    and does not stop the remaining files from loading.
    """
    interp = get_interpreter()
    dir_path = Path(directory)
    for path in dir_path.glob('*.sexp'):
        try:
            interp.load_effect(str(path))
        except Exception as e:
            print(f"Warning: Failed to load {path}: {e}")


def run_effect(name: str, frame: np.ndarray, params: Dict[str, Any],
               state: Dict[str, Any] = None) -> tuple:
    """Run an effect on the global interpreter.

    ``state`` is passed through unchanged so that a caller-supplied dict
    (even an empty one) is the dict the effect mutates; ``None`` is replaced
    by a fresh dict inside ``Interpreter.run_effect``.
    """
    return get_interpreter().run_effect(name, frame, params, state)


def list_effects() -> List[str]:
    """List loaded effect names."""
    return list(get_interpreter().effects.keys())


# =============================================================================
# Adapter for existing effect system
# =============================================================================

def make_process_frame(effect_path: str) -> Callable:
    """
    Create a process_frame function from a .sexp effect.

    This allows S-expression effects to be used with the existing
    effect system.

    The effect is looked up by the file's stem. If no effect of that name is
    registered but loading the file defined a new effect under another name,
    that effect is used instead.
    """
    interp = get_interpreter()
    previously_loaded = dict(interp.effects)
    loaded = interp.load_effect(effect_path)

    effect_name = Path(effect_path).stem
    if (
        effect_name not in interp.effects
        and loaded is not None
        # Only trust the returned definition if this load actually created it.
        and previously_loaded.get(loaded.name) is not loaded
    ):
        effect_name = loaded.name

    def process_frame(frame: np.ndarray, params: dict, state: dict) -> tuple:
        return interp.run_effect(effect_name, frame, params, state)

    return process_frame