""" S-Expression Effect Interpreter Interprets effect definitions written in S-expressions. Only allows safe primitives - no arbitrary code execution. """ import numpy as np from typing import Any, Dict, List, Optional, Callable from pathlib import Path from .parser import Symbol, Keyword, parse, parse_file from .primitives import PRIMITIVES, reset_rng class Environment: """Lexical environment for variable bindings.""" def __init__(self, parent: 'Environment' = None): self.bindings: Dict[str, Any] = {} self.parent = parent def get(self, name: str) -> Any: if name in self.bindings: return self.bindings[name] if self.parent: return self.parent.get(name) raise NameError(f"Undefined variable: {name}") def set(self, name: str, value: Any): self.bindings[name] = value def has(self, name: str) -> bool: if name in self.bindings: return True if self.parent: return self.parent.has(name) return False class Lambda: """A user-defined function (lambda).""" def __init__(self, params: List[str], body: Any, env: Environment): self.params = params self.body = body self.env = env # Closure environment def __repr__(self): return f"" class EffectDefinition: """A parsed effect definition.""" def __init__(self, name: str, params: Dict[str, Any], body: Any): self.name = name self.params = params # {name: (type, default)} self.body = body def __repr__(self): return f"" class Interpreter: """ S-Expression interpreter for effects. Provides a safe execution environment where only whitelisted primitives can be called. """ def __init__(self): # Base environment with primitives self.global_env = Environment() # Load primitives for name, fn in PRIMITIVES.items(): self.global_env.set(name, fn) # Special values self.global_env.set('true', True) self.global_env.set('false', False) self.global_env.set('nil', None) # Loaded effect definitions self.effects: Dict[str, EffectDefinition] = {} def eval(self, expr: Any, env: Environment = None) -> Any: """Evaluate an S-expression.""" if env is None: env = self.global_env # Atoms if isinstance(expr, (int, float, str, bool)): return expr if expr is None: return None if isinstance(expr, Symbol): return env.get(expr.name) if isinstance(expr, Keyword): return expr # Keywords evaluate to themselves if isinstance(expr, np.ndarray): return expr # Images pass through # Lists (function calls / special forms) if isinstance(expr, list): if not expr: return [] head = expr[0] # Special forms if isinstance(head, Symbol): form = head.name # Quote if form == 'quote': return expr[1] # Define if form == 'define': name = expr[1] if isinstance(name, Symbol): value = self.eval(expr[2], env) self.global_env.set(name.name, value) return value else: raise SyntaxError(f"define requires symbol, got {name}") # Define-effect if form == 'define-effect': return self._define_effect(expr, env) # Lambda if form == 'lambda' or form == 'λ': params = [p.name if isinstance(p, Symbol) else p for p in expr[1]] body = expr[2] return Lambda(params, body, env) # Let if form == 'let': return self._eval_let(expr, env) # Let* if form == 'let*': return self._eval_let_star(expr, env) # If if form == 'if': cond = self.eval(expr[1], env) if cond: return self.eval(expr[2], env) elif len(expr) > 3: return self.eval(expr[3], env) return None # Cond if form == 'cond': return self._eval_cond(expr, env) # And if form == 'and': result = True for e in expr[1:]: result = self.eval(e, env) if not result: return False return result # Or if form == 'or': for e in expr[1:]: result = self.eval(e, env) if result: return result return False # Not if form == 'not': return not self.eval(expr[1], env) # Begin (sequence) if form == 'begin': result = None for e in expr[1:]: result = self.eval(e, env) return result # Thread-first macro: (-> x (f a) (g b)) => (g (f x a) b) if form == '->': result = self.eval(expr[1], env) for form_expr in expr[2:]: if isinstance(form_expr, list): # Insert result as first arg: (f a b) => (f result a b) result = self.eval([form_expr[0], result] + form_expr[1:], env) else: # Just a symbol: f => (f result) result = self.eval([form_expr, result], env) return result # Set! (mutation) if form == 'set!': name = expr[1].name if isinstance(expr[1], Symbol) else expr[1] value = self.eval(expr[2], env) # Find and update in appropriate scope scope = env while scope: if name in scope.bindings: scope.bindings[name] = value return value scope = scope.parent raise NameError(f"Cannot set undefined variable: {name}") # State-get / state-set (for effect state) if form == 'state-get': state = env.get('__state__') key = self.eval(expr[1], env) if isinstance(key, Symbol): key = key.name default = self.eval(expr[2], env) if len(expr) > 2 else None return state.get(key, default) if form == 'state-set': state = env.get('__state__') key = self.eval(expr[1], env) if isinstance(key, Symbol): key = key.name value = self.eval(expr[2], env) state[key] = value return value # Function call fn = self.eval(head, env) args = [self.eval(arg, env) for arg in expr[1:]] # Handle keyword arguments pos_args = [] kw_args = {} i = 0 while i < len(args): if isinstance(args[i], Keyword): kw_args[args[i].name] = args[i + 1] if i + 1 < len(args) else None i += 2 else: pos_args.append(args[i]) i += 1 return self._apply(fn, pos_args, kw_args, env) raise TypeError(f"Cannot evaluate: {expr}") def _wrap_lambda(self, lam: 'Lambda') -> Callable: """Wrap a Lambda in a Python callable for use by primitives.""" def wrapper(*args): new_env = Environment(lam.env) for i, param in enumerate(lam.params): if i < len(args): new_env.set(param, args[i]) else: new_env.set(param, None) return self.eval(lam.body, new_env) return wrapper def _apply(self, fn: Any, args: List[Any], kwargs: Dict[str, Any], env: Environment) -> Any: """Apply a function to arguments.""" if isinstance(fn, Lambda): # User-defined function new_env = Environment(fn.env) for i, param in enumerate(fn.params): if i < len(args): new_env.set(param, args[i]) else: new_env.set(param, None) return self.eval(fn.body, new_env) elif callable(fn): # Wrap any Lambda arguments so primitives can call them wrapped_args = [] for arg in args: if isinstance(arg, Lambda): wrapped_args.append(self._wrap_lambda(arg)) else: wrapped_args.append(arg) # Primitive function if kwargs: return fn(*wrapped_args, **kwargs) return fn(*wrapped_args) else: raise TypeError(f"Cannot call: {fn}") def _parse_bindings(self, bindings: list) -> list: """Parse bindings in either Scheme or Clojure style. Scheme: ((x 1) (y 2)) -> [(x, 1), (y, 2)] Clojure: [x 1 y 2] -> [(x, 1), (y, 2)] """ if not bindings: return [] # Check if Clojure style (flat list with symbols and values alternating) if isinstance(bindings[0], Symbol): # Clojure style: [x 1 y 2] pairs = [] i = 0 while i < len(bindings) - 1: name = bindings[i].name if isinstance(bindings[i], Symbol) else bindings[i] value = bindings[i + 1] pairs.append((name, value)) i += 2 return pairs else: # Scheme style: ((x 1) (y 2)) pairs = [] for binding in bindings: name = binding[0].name if isinstance(binding[0], Symbol) else binding[0] value = binding[1] pairs.append((name, value)) return pairs def _eval_let(self, expr: Any, env: Environment) -> Any: """Evaluate let expression: (let ((x 1) (y 2)) body) or (let [x 1 y 2] body) Note: Uses sequential binding (like Clojure let / Scheme let*) so each binding can reference previous bindings. """ bindings = expr[1] body = expr[2] new_env = Environment(env) for name, value_expr in self._parse_bindings(bindings): value = self.eval(value_expr, new_env) # Sequential: can see previous bindings new_env.set(name, value) return self.eval(body, new_env) def _eval_let_star(self, expr: Any, env: Environment) -> Any: """Evaluate let* expression: sequential bindings.""" bindings = expr[1] body = expr[2] new_env = Environment(env) for name, value_expr in self._parse_bindings(bindings): value = self.eval(value_expr, new_env) # Evaluate in current env new_env.set(name, value) return self.eval(body, new_env) def _eval_cond(self, expr: Any, env: Environment) -> Any: """Evaluate cond expression.""" for clause in expr[1:]: test = clause[0] if isinstance(test, Symbol) and test.name == 'else': return self.eval(clause[1], env) if self.eval(test, env): return self.eval(clause[1], env) return None def _define_effect(self, expr: Any, env: Environment) -> EffectDefinition: """ Parse effect definition: (define-effect name ((param1 default1) (param2 default2) ...) body) """ name = expr[1].name if isinstance(expr[1], Symbol) else expr[1] params_list = expr[2] if len(expr) > 2 else [] body = expr[3] if len(expr) > 3 else expr[2] # Parse parameters params = {} if isinstance(params_list, list): for p in params_list: if isinstance(p, list) and len(p) >= 2: pname = p[0].name if isinstance(p[0], Symbol) else p[0] pdefault = p[1] params[pname] = pdefault elif isinstance(p, Symbol): params[p.name] = None effect = EffectDefinition(name, params, body) self.effects[name] = effect return effect def load_effect(self, path: str) -> EffectDefinition: """Load an effect definition from a .sexp file.""" expr = parse_file(path) # Handle multiple top-level expressions if isinstance(expr, list) and expr and isinstance(expr[0], list): for e in expr: self.eval(e) else: self.eval(expr) # Return the last defined effect if self.effects: return list(self.effects.values())[-1] return None def run_effect(self, name: str, frame, params: Dict[str, Any], state: Dict[str, Any]) -> tuple: """ Run an effect on frame(s). Args: name: Effect name frame: Input frame (H, W, 3) RGB uint8, or list of frames for multi-input params: Effect parameters (overrides defaults) state: Persistent state dict Returns: (output_frame, new_state) """ if name not in self.effects: raise ValueError(f"Unknown effect: {name}") effect = self.effects[name] # Create environment for this run env = Environment(self.global_env) # Bind frame(s) - support both single frame and list of frames if isinstance(frame, list): # Multi-input effect frames = frame env.set('frame', frames[0] if frames else None) # Backwards compat env.set('inputs', frames) # Named frame bindings for i, f in enumerate(frames): env.set(f'frame-{chr(ord("a") + i)}', f) # frame-a, frame-b, etc. else: # Single-input effect env.set('frame', frame) # Bind state if state is None: state = {} env.set('__state__', state) # Bind parameters (defaults + overrides) for pname, pdefault in effect.params.items(): value = params.get(pname) if value is None: # Evaluate default if it's an expression (list) if isinstance(pdefault, list): value = self.eval(pdefault, env) else: value = pdefault env.set(pname, value) # Reset RNG with seed if provided seed = params.get('seed', 42) reset_rng(int(seed)) # Bind time if provided time_val = params.get('_time', 0) env.set('t', time_val) env.set('_time', time_val) # Evaluate body result = self.eval(effect.body, env) # Ensure result is an image if not isinstance(result, np.ndarray): result = frame return result, state # ============================================================================= # Convenience Functions # ============================================================================= _interpreter = None def get_interpreter() -> Interpreter: """Get or create the global interpreter.""" global _interpreter if _interpreter is None: _interpreter = Interpreter() return _interpreter def load_effect(path: str) -> EffectDefinition: """Load an effect from a .sexp file.""" return get_interpreter().load_effect(path) def load_effects_dir(directory: str): """Load all .sexp effects from a directory.""" interp = get_interpreter() dir_path = Path(directory) for path in dir_path.glob('*.sexp'): try: interp.load_effect(str(path)) except Exception as e: print(f"Warning: Failed to load {path}: {e}") def run_effect(name: str, frame: np.ndarray, params: Dict[str, Any], state: Dict[str, Any] = None) -> tuple: """Run an effect.""" return get_interpreter().run_effect(name, frame, params, state or {}) def list_effects() -> List[str]: """List loaded effect names.""" return list(get_interpreter().effects.keys()) # ============================================================================= # Adapter for existing effect system # ============================================================================= def make_process_frame(effect_path: str) -> Callable: """ Create a process_frame function from a .sexp effect. This allows S-expression effects to be used with the existing effect system. """ interp = get_interpreter() interp.load_effect(effect_path) effect_name = Path(effect_path).stem def process_frame(frame: np.ndarray, params: dict, state: dict) -> tuple: return interp.run_effect(effect_name, frame, params, state) return process_frame