""" S-Expression Effect Interpreter Interprets effect definitions written in S-expressions. Only allows safe primitives - no arbitrary code execution. """ import numpy as np from typing import Any, Dict, List, Optional, Callable from pathlib import Path from .parser import Symbol, Keyword, parse, parse_file from .primitives import PRIMITIVES, reset_rng class Environment: """Lexical environment for variable bindings.""" def __init__(self, parent: 'Environment' = None): self.bindings: Dict[str, Any] = {} self.parent = parent def get(self, name: str) -> Any: if name in self.bindings: return self.bindings[name] if self.parent: return self.parent.get(name) raise NameError(f"Undefined variable: {name}") def set(self, name: str, value: Any): self.bindings[name] = value def has(self, name: str) -> bool: if name in self.bindings: return True if self.parent: return self.parent.has(name) return False class Lambda: """A user-defined function (lambda).""" def __init__(self, params: List[str], body: Any, env: Environment): self.params = params self.body = body self.env = env # Closure environment def __repr__(self): return f"" class EffectDefinition: """A parsed effect definition.""" def __init__(self, name: str, params: Dict[str, Any], body: Any): self.name = name self.params = params # {name: (type, default)} self.body = body def __repr__(self): return f"" class Interpreter: """ S-Expression interpreter for effects. Provides a safe execution environment where only whitelisted primitives can be called. 
""" def __init__(self): # Base environment with primitives self.global_env = Environment() # Load primitives for name, fn in PRIMITIVES.items(): self.global_env.set(name, fn) # Special values self.global_env.set('true', True) self.global_env.set('false', False) self.global_env.set('nil', None) # Loaded effect definitions self.effects: Dict[str, EffectDefinition] = {} def eval(self, expr: Any, env: Environment = None) -> Any: """Evaluate an S-expression.""" if env is None: env = self.global_env # Atoms if isinstance(expr, (int, float, str, bool)): return expr if expr is None: return None if isinstance(expr, Symbol): return env.get(expr.name) if isinstance(expr, Keyword): return expr # Keywords evaluate to themselves if isinstance(expr, np.ndarray): return expr # Images pass through # Lists (function calls / special forms) if isinstance(expr, list): if not expr: return [] head = expr[0] # Special forms if isinstance(head, Symbol): form = head.name # Quote if form == 'quote': return expr[1] # Define if form == 'define': name = expr[1] if isinstance(name, Symbol): value = self.eval(expr[2], env) self.global_env.set(name.name, value) return value else: raise SyntaxError(f"define requires symbol, got {name}") # Define-effect if form == 'define-effect': return self._define_effect(expr, env) # Lambda if form == 'lambda' or form == 'λ': params = [p.name if isinstance(p, Symbol) else p for p in expr[1]] body = expr[2] return Lambda(params, body, env) # Let if form == 'let': return self._eval_let(expr, env) # Let* if form == 'let*': return self._eval_let_star(expr, env) # If if form == 'if': cond = self.eval(expr[1], env) if cond: return self.eval(expr[2], env) elif len(expr) > 3: return self.eval(expr[3], env) return None # Cond if form == 'cond': return self._eval_cond(expr, env) # And if form == 'and': result = True for e in expr[1:]: result = self.eval(e, env) if not result: return False return result # Or if form == 'or': for e in expr[1:]: result = self.eval(e, env) if 
result: return result return False # Not if form == 'not': return not self.eval(expr[1], env) # Begin (sequence) if form == 'begin': result = None for e in expr[1:]: result = self.eval(e, env) return result # Thread-first macro: (-> x (f a) (g b)) => (g (f x a) b) if form == '->': result = self.eval(expr[1], env) for form_expr in expr[2:]: if isinstance(form_expr, list): # Insert result as first arg: (f a b) => (f result a b) result = self.eval([form_expr[0], result] + form_expr[1:], env) else: # Just a symbol: f => (f result) result = self.eval([form_expr, result], env) return result # Set! (mutation) if form == 'set!': name = expr[1].name if isinstance(expr[1], Symbol) else expr[1] value = self.eval(expr[2], env) # Find and update in appropriate scope scope = env while scope: if name in scope.bindings: scope.bindings[name] = value return value scope = scope.parent raise NameError(f"Cannot set undefined variable: {name}") # State-get / state-set (for effect state) if form == 'state-get': state = env.get('__state__') key = self.eval(expr[1], env) if isinstance(key, Symbol): key = key.name default = self.eval(expr[2], env) if len(expr) > 2 else None return state.get(key, default) if form == 'state-set': state = env.get('__state__') key = self.eval(expr[1], env) if isinstance(key, Symbol): key = key.name value = self.eval(expr[2], env) state[key] = value return value # ascii-fx-zone special form - delays evaluation of expression parameters if form == 'ascii-fx-zone': return self._eval_ascii_fx_zone(expr, env) # Function call fn = self.eval(head, env) args = [self.eval(arg, env) for arg in expr[1:]] # Handle keyword arguments pos_args = [] kw_args = {} i = 0 while i < len(args): if isinstance(args[i], Keyword): kw_args[args[i].name] = args[i + 1] if i + 1 < len(args) else None i += 2 else: pos_args.append(args[i]) i += 1 return self._apply(fn, pos_args, kw_args, env) raise TypeError(f"Cannot evaluate: {expr}") def _wrap_lambda(self, lam: 'Lambda') -> Callable: """Wrap 
a Lambda in a Python callable for use by primitives.""" def wrapper(*args): new_env = Environment(lam.env) for i, param in enumerate(lam.params): if i < len(args): new_env.set(param, args[i]) else: new_env.set(param, None) return self.eval(lam.body, new_env) return wrapper def _apply(self, fn: Any, args: List[Any], kwargs: Dict[str, Any], env: Environment) -> Any: """Apply a function to arguments.""" if isinstance(fn, Lambda): # User-defined function new_env = Environment(fn.env) for i, param in enumerate(fn.params): if i < len(args): new_env.set(param, args[i]) else: new_env.set(param, None) return self.eval(fn.body, new_env) elif callable(fn): # Wrap any Lambda arguments so primitives can call them wrapped_args = [] for arg in args: if isinstance(arg, Lambda): wrapped_args.append(self._wrap_lambda(arg)) else: wrapped_args.append(arg) # Primitive function if kwargs: return fn(*wrapped_args, **kwargs) return fn(*wrapped_args) else: raise TypeError(f"Cannot call: {fn}") def _parse_bindings(self, bindings: list) -> list: """Parse bindings in either Scheme or Clojure style. Scheme: ((x 1) (y 2)) -> [(x, 1), (y, 2)] Clojure: [x 1 y 2] -> [(x, 1), (y, 2)] """ if not bindings: return [] # Check if Clojure style (flat list with symbols and values alternating) if isinstance(bindings[0], Symbol): # Clojure style: [x 1 y 2] pairs = [] i = 0 while i < len(bindings) - 1: name = bindings[i].name if isinstance(bindings[i], Symbol) else bindings[i] value = bindings[i + 1] pairs.append((name, value)) i += 2 return pairs else: # Scheme style: ((x 1) (y 2)) pairs = [] for binding in bindings: name = binding[0].name if isinstance(binding[0], Symbol) else binding[0] value = binding[1] pairs.append((name, value)) return pairs def _eval_let(self, expr: Any, env: Environment) -> Any: """Evaluate let expression: (let ((x 1) (y 2)) body) or (let [x 1 y 2] body) Note: Uses sequential binding (like Clojure let / Scheme let*) so each binding can reference previous bindings. 
""" bindings = expr[1] body = expr[2] new_env = Environment(env) for name, value_expr in self._parse_bindings(bindings): value = self.eval(value_expr, new_env) # Sequential: can see previous bindings new_env.set(name, value) return self.eval(body, new_env) def _eval_let_star(self, expr: Any, env: Environment) -> Any: """Evaluate let* expression: sequential bindings.""" bindings = expr[1] body = expr[2] new_env = Environment(env) for name, value_expr in self._parse_bindings(bindings): value = self.eval(value_expr, new_env) # Evaluate in current env new_env.set(name, value) return self.eval(body, new_env) def _eval_cond(self, expr: Any, env: Environment) -> Any: """Evaluate cond expression.""" for clause in expr[1:]: test = clause[0] if isinstance(test, Symbol) and test.name == 'else': return self.eval(clause[1], env) if self.eval(test, env): return self.eval(clause[1], env) return None def _eval_ascii_fx_zone(self, expr: Any, env: Environment) -> Any: """ Evaluate ascii-fx-zone special form. Syntax: (ascii-fx-zone frame :cols 80 :alphabet "standard" :color_mode "color" :background "black" :contrast 1.5 :char_hue ;; NOT evaluated - passed to primitive :char_saturation :char_brightness :char_scale :char_rotation :char_jitter ) The expression parameters (:char_hue, etc.) are NOT pre-evaluated. They are passed as raw S-expressions to the primitive which evaluates them per-zone with zone context variables injected. 
""" from .primitives import prim_ascii_fx_zone # Expression parameter names that should NOT be evaluated expr_params = {'char_hue', 'char_saturation', 'char_brightness', 'char_scale', 'char_rotation', 'char_jitter', 'cell_effect'} # Parse arguments frame = self.eval(expr[1], env) # First arg is always the frame # Defaults cols = 80 char_size = None # If set, overrides cols alphabet = "standard" color_mode = "color" background = "black" contrast = 1.5 char_hue = None char_saturation = None char_brightness = None char_scale = None char_rotation = None char_jitter = None cell_effect = None # Lambda for arbitrary per-cell effects # Convenience params for staged recipes energy = None rotation_scale = 0 # Extra params to pass to zone dict for lambdas extra_params = {} # Parse keyword arguments i = 2 while i < len(expr): item = expr[i] if isinstance(item, Keyword): if i + 1 >= len(expr): break value_expr = expr[i + 1] kw_name = item.name if kw_name in expr_params: # Resolve symbol references but don't evaluate expressions # This handles the case where effect definition passes a param like :char_hue char_hue resolved = value_expr if isinstance(value_expr, Symbol): try: resolved = env.get(value_expr.name) except NameError: resolved = value_expr # Keep as symbol if not found if kw_name == 'char_hue': char_hue = resolved elif kw_name == 'char_saturation': char_saturation = resolved elif kw_name == 'char_brightness': char_brightness = resolved elif kw_name == 'char_scale': char_scale = resolved elif kw_name == 'char_rotation': char_rotation = resolved elif kw_name == 'char_jitter': char_jitter = resolved elif kw_name == 'cell_effect': cell_effect = resolved else: # Evaluate normally value = self.eval(value_expr, env) if kw_name == 'cols': cols = int(value) elif kw_name == 'char_size': # Handle nil/None values if value is None or (isinstance(value, Symbol) and value.name == 'nil'): char_size = None else: char_size = int(value) elif kw_name == 'alphabet': alphabet = str(value) 
elif kw_name == 'color_mode': color_mode = str(value) elif kw_name == 'background': background = str(value) elif kw_name == 'contrast': contrast = float(value) elif kw_name == 'energy': if value is None or (isinstance(value, Symbol) and value.name == 'nil'): energy = None else: energy = float(value) extra_params['energy'] = energy elif kw_name == 'rotation_scale': rotation_scale = float(value) extra_params['rotation_scale'] = rotation_scale else: # Store any other params for lambdas to access extra_params[kw_name] = value i += 2 else: i += 1 # If energy and rotation_scale provided, build rotation expression # rotation = energy * rotation_scale * position_factor # position_factor: bottom-left=0, top-right=3 # Formula: 1.5 * (zone-col-norm + (1 - zone-row-norm)) if energy is not None and rotation_scale > 0: # Build expression as S-expression list that will be evaluated per-zone # (* (* energy rotation_scale) (* 1.5 (+ zone-col-norm (- 1 zone-row-norm)))) energy_times_scale = energy * rotation_scale # The position part uses zone variables, so we build it as an expression char_rotation = [ Symbol('*'), energy_times_scale, [Symbol('*'), 1.5, [Symbol('+'), Symbol('zone-col-norm'), [Symbol('-'), 1, Symbol('zone-row-norm')]]] ] # Pull any extra params from environment that aren't standard params # These are typically passed from recipes for use in cell_effect lambdas standard_params = { 'cols', 'char_size', 'alphabet', 'color_mode', 'background', 'contrast', 'char_hue', 'char_saturation', 'char_brightness', 'char_scale', 'char_rotation', 'char_jitter', 'cell_effect', 'energy', 'rotation_scale', 'frame', 't', '_time', '__state__', '__interp__', 'true', 'false', 'nil' } # Check environment for extra bindings current_env = env while current_env is not None: for k, v in current_env.bindings.items(): if k not in standard_params and k not in extra_params and not callable(v): # Add non-standard, non-callable bindings to extra_params if isinstance(v, (int, float, str, bool)) or v 
is None: extra_params[k] = v current_env = current_env.parent # Call the primitive with interpreter and env for expression evaluation return prim_ascii_fx_zone( frame, cols, char_size, alphabet, color_mode, background, contrast, char_hue, char_saturation, char_brightness, char_scale, char_rotation, char_jitter, self, env, extra_params, cell_effect ) def _define_effect(self, expr: Any, env: Environment) -> EffectDefinition: """ Parse effect definition. Required syntax: (define-effect name :params ( (param1 :type int :default 8 :desc "description") ) body) Effects MUST use :params syntax. Legacy ((param default) ...) is not supported. """ name = expr[1].name if isinstance(expr[1], Symbol) else expr[1] params = {} body = None found_params = False # Parse :params and body i = 2 while i < len(expr): item = expr[i] if isinstance(item, Keyword) and item.name == "params": # :params syntax if i + 1 >= len(expr): raise SyntaxError(f"Effect '{name}': Missing params list after :params keyword") params_list = expr[i + 1] params = self._parse_params_block(params_list) found_params = True i += 2 elif isinstance(item, Keyword): # Skip other keywords (like :desc) i += 2 elif body is None: # First non-keyword item is the body if isinstance(item, list) and item: first_elem = item[0] # Check for legacy syntax and reject it if isinstance(first_elem, list) and len(first_elem) >= 2: raise SyntaxError( f"Effect '{name}': Legacy parameter syntax ((name default) ...) is not supported. " f"Use :params block instead." ) body = item i += 1 else: i += 1 if body is None: raise SyntaxError(f"Effect '{name}': No body found") if not found_params: raise SyntaxError( f"Effect '{name}': Missing :params block. 
" f"For effects with no parameters, use empty :params ()" ) effect = EffectDefinition(name, params, body) self.effects[name] = effect return effect def _parse_params_block(self, params_list: list) -> Dict[str, Any]: """ Parse :params block syntax: ( (param_name :type int :default 8 :range [4 32] :desc "description") ) """ params = {} for param_def in params_list: if not isinstance(param_def, list) or len(param_def) < 1: continue # First element is the parameter name first = param_def[0] if isinstance(first, Symbol): param_name = first.name elif isinstance(first, str): param_name = first else: continue # Parse keyword arguments default = None i = 1 while i < len(param_def): item = param_def[i] if isinstance(item, Keyword): if i + 1 >= len(param_def): break kw_value = param_def[i + 1] if item.name == "default": default = kw_value i += 2 else: i += 1 params[param_name] = default return params def load_effect(self, path: str) -> EffectDefinition: """Load an effect definition from a .sexp file.""" expr = parse_file(path) # Handle multiple top-level expressions if isinstance(expr, list) and expr and isinstance(expr[0], list): for e in expr: self.eval(e) else: self.eval(expr) # Return the last defined effect if self.effects: return list(self.effects.values())[-1] return None def run_effect(self, name: str, frame, params: Dict[str, Any], state: Dict[str, Any]) -> tuple: """ Run an effect on frame(s). 
Args: name: Effect name frame: Input frame (H, W, 3) RGB uint8, or list of frames for multi-input params: Effect parameters (overrides defaults) state: Persistent state dict Returns: (output_frame, new_state) """ if name not in self.effects: raise ValueError(f"Unknown effect: {name}") effect = self.effects[name] # Create environment for this run env = Environment(self.global_env) # Bind frame(s) - support both single frame and list of frames if isinstance(frame, list): # Multi-input effect frames = frame env.set('frame', frames[0] if frames else None) # Backwards compat env.set('inputs', frames) # Named frame bindings for i, f in enumerate(frames): env.set(f'frame-{chr(ord("a") + i)}', f) # frame-a, frame-b, etc. else: # Single-input effect env.set('frame', frame) # Bind state if state is None: state = {} env.set('__state__', state) # Validate that all provided params are known (except internal params) # Extra params are allowed and will be passed through to cell_effect lambdas known_params = set(effect.params.keys()) internal_params = {'_time', 'seed', '_binding', 'effect', 'cid', 'hash', 'effect_path'} extra_effect_params = {} # Unknown params passed through for cell_effect lambdas for k in params.keys(): if k not in known_params and k not in internal_params: # Allow unknown params - they'll be passed to cell_effect lambdas via zone dict extra_effect_params[k] = params[k] # Bind parameters (defaults + overrides) for pname, pdefault in effect.params.items(): value = params.get(pname) if value is None: # Evaluate default if it's an expression (list) if isinstance(pdefault, list): value = self.eval(pdefault, env) else: value = pdefault env.set(pname, value) # Bind extra params (unknown params passed through for cell_effect lambdas) for k, v in extra_effect_params.items(): env.set(k, v) # Reset RNG with seed if provided seed = params.get('seed', 42) reset_rng(int(seed)) # Bind time if provided time_val = params.get('_time', 0) env.set('t', time_val) env.set('_time', 
time_val) # Evaluate body result = self.eval(effect.body, env) # Ensure result is an image if not isinstance(result, np.ndarray): result = frame return result, state def eval_with_zone(self, expr, env: Environment, zone) -> Any: """ Evaluate expression with zone-* variables injected. Args: expr: Expression to evaluate (S-expression) env: Parent environment with bound values zone: ZoneContext object with cell data Zone variables injected: zone-row, zone-col: Grid position (integers) zone-row-norm, zone-col-norm: Normalized position (0-1) zone-lum: Cell luminance (0-1) zone-sat: Cell saturation (0-1) zone-hue: Cell hue (0-360) zone-r, zone-g, zone-b: RGB components (0-1) Returns: Evaluated result (typically a number) """ # Create child environment with zone variables zone_env = Environment(env) zone_env.set('zone-row', zone.row) zone_env.set('zone-col', zone.col) zone_env.set('zone-row-norm', zone.row_norm) zone_env.set('zone-col-norm', zone.col_norm) zone_env.set('zone-lum', zone.luminance) zone_env.set('zone-sat', zone.saturation) zone_env.set('zone-hue', zone.hue) zone_env.set('zone-r', zone.r) zone_env.set('zone-g', zone.g) zone_env.set('zone-b', zone.b) return self.eval(expr, zone_env) # ============================================================================= # Convenience Functions # ============================================================================= _interpreter = None def get_interpreter() -> Interpreter: """Get or create the global interpreter.""" global _interpreter if _interpreter is None: _interpreter = Interpreter() return _interpreter def load_effect(path: str) -> EffectDefinition: """Load an effect from a .sexp file.""" return get_interpreter().load_effect(path) def load_effects_dir(directory: str): """Load all .sexp effects from a directory.""" interp = get_interpreter() dir_path = Path(directory) for path in dir_path.glob('*.sexp'): try: interp.load_effect(str(path)) except Exception as e: print(f"Warning: Failed to load {path}: {e}") 
def run_effect(name: str, frame: np.ndarray, params: Dict[str, Any],
               state: Dict[str, Any] = None) -> tuple:
    """Run a loaded effect through the global interpreter.

    Args:
        name: Name of a previously loaded effect.
        frame: Input frame (or list of frames) handed to the effect.
        params: Effect parameters overriding the effect's defaults.
        state: Persistent state dict. It is passed through unchanged, so an
            empty dict supplied by the caller is the very object the effect
            writes to; None lets the interpreter create a fresh dict.

    Returns:
        The ``(output_frame, state)`` tuple from ``Interpreter.run_effect``.
    """
    # Do not use ``state or {}`` here: it would replace a caller-supplied
    # empty dict with a new one and break in-place state persistence.
    return get_interpreter().run_effect(name, frame, params, state)


def list_effects() -> List[str]:
    """List loaded effect names."""
    return list(get_interpreter().effects)


# =============================================================================
# Adapter for existing effect system
# =============================================================================

def make_process_frame(effect_path: str) -> Callable:
    """
    Create a process_frame function from a .sexp effect.

    This allows S-expression effects to be used with the existing
    effect system. The effect is looked up by the file's stem, so the
    effect defined in the file must carry the same name as the file.
    """
    interp = get_interpreter()
    interp.load_effect(effect_path)
    effect_name = Path(effect_path).stem

    def process_frame(frame: np.ndarray, params: dict, state: dict) -> tuple:
        return interp.run_effect(effect_name, frame, params, state)

    return process_frame