- Add primitive_libs/ with modular primitive loading (core, math, image, color, color_ops, filters, geometry, drawing, blending, arrays, ascii) - Effects now explicitly declare dependencies via (require-primitives "...") - Convert ascii-fx-zone from hardcoded special form to loadable primitive - Add _is_symbol/_is_keyword helpers for duck typing to support both sexp_effects.parser.Symbol and artdag.sexp.parser.Symbol classes - Auto-inject _interp and _env for primitives that need them - Remove silent error swallowing in cell_effect evaluation Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
988 lines
34 KiB
Python
988 lines
34 KiB
Python
"""
|
|
S-Expression Effect Interpreter
|
|
|
|
Interprets effect definitions written in S-expressions.
|
|
Only allows safe primitives - no arbitrary code execution.
|
|
"""
|
|
|
|
import numpy as np
|
|
from typing import Any, Dict, List, Optional, Callable
|
|
from pathlib import Path
|
|
|
|
from .parser import Symbol, Keyword, parse, parse_file
|
|
from .primitives import PRIMITIVES, reset_rng
|
|
|
|
|
|
def _is_symbol(x) -> bool:
|
|
"""Check if x is a Symbol (duck typing to support multiple Symbol classes)."""
|
|
return hasattr(x, 'name') and type(x).__name__ == 'Symbol'
|
|
|
|
|
|
def _is_keyword(x) -> bool:
|
|
"""Check if x is a Keyword (duck typing to support multiple Keyword classes)."""
|
|
return hasattr(x, 'name') and type(x).__name__ == 'Keyword'
|
|
|
|
|
|
def _symbol_name(x) -> str:
|
|
"""Get the name from a Symbol."""
|
|
return x.name if hasattr(x, 'name') else str(x)
|
|
|
|
|
|
class Environment:
|
|
"""Lexical environment for variable bindings."""
|
|
|
|
def __init__(self, parent: 'Environment' = None):
|
|
self.bindings: Dict[str, Any] = {}
|
|
self.parent = parent
|
|
|
|
def get(self, name: str) -> Any:
|
|
if name in self.bindings:
|
|
return self.bindings[name]
|
|
if self.parent:
|
|
return self.parent.get(name)
|
|
raise NameError(f"Undefined variable: {name}")
|
|
|
|
def set(self, name: str, value: Any):
|
|
self.bindings[name] = value
|
|
|
|
def has(self, name: str) -> bool:
|
|
if name in self.bindings:
|
|
return True
|
|
if self.parent:
|
|
return self.parent.has(name)
|
|
return False
|
|
|
|
|
|
class Lambda:
|
|
"""A user-defined function (lambda)."""
|
|
|
|
def __init__(self, params: List[str], body: Any, env: Environment):
|
|
self.params = params
|
|
self.body = body
|
|
self.env = env # Closure environment
|
|
|
|
def __repr__(self):
|
|
return f"<lambda ({' '.join(self.params)})>"
|
|
|
|
|
|
class EffectDefinition:
|
|
"""A parsed effect definition."""
|
|
|
|
def __init__(self, name: str, params: Dict[str, Any], body: Any):
|
|
self.name = name
|
|
self.params = params # {name: (type, default)}
|
|
self.body = body
|
|
|
|
def __repr__(self):
|
|
return f"<effect {self.name}>"
|
|
|
|
|
|
class Interpreter:
|
|
"""
|
|
S-Expression interpreter for effects.
|
|
|
|
Provides a safe execution environment where only
|
|
whitelisted primitives can be called.
|
|
|
|
Args:
|
|
minimal_primitives: If True, only load core primitives (arithmetic, comparison,
|
|
basic data access). Additional primitives must be loaded with
|
|
(require-primitives) or (with-primitives).
|
|
If False (default), load all legacy primitives for backward compatibility.
|
|
"""
|
|
|
|
def __init__(self, minimal_primitives: bool = False):
|
|
# Base environment with primitives
|
|
self.global_env = Environment()
|
|
self.minimal_primitives = minimal_primitives
|
|
|
|
if minimal_primitives:
|
|
# Load only core primitives
|
|
from .primitive_libs.core import PRIMITIVES as CORE_PRIMITIVES
|
|
for name, fn in CORE_PRIMITIVES.items():
|
|
self.global_env.set(name, fn)
|
|
else:
|
|
# Load all legacy primitives for backward compatibility
|
|
for name, fn in PRIMITIVES.items():
|
|
self.global_env.set(name, fn)
|
|
|
|
# Special values
|
|
self.global_env.set('true', True)
|
|
self.global_env.set('false', False)
|
|
self.global_env.set('nil', None)
|
|
|
|
# Loaded effect definitions
|
|
self.effects: Dict[str, EffectDefinition] = {}
|
|
|
|
def eval(self, expr: Any, env: Environment = None) -> Any:
|
|
"""Evaluate an S-expression."""
|
|
if env is None:
|
|
env = self.global_env
|
|
|
|
# Atoms
|
|
if isinstance(expr, (int, float, str, bool)):
|
|
return expr
|
|
|
|
if expr is None:
|
|
return None
|
|
|
|
# Handle Symbol (duck typing to support both sexp_effects.parser.Symbol and artdag.sexp.parser.Symbol)
|
|
if _is_symbol(expr):
|
|
return env.get(expr.name)
|
|
|
|
# Handle Keyword (duck typing)
|
|
if _is_keyword(expr):
|
|
return expr # Keywords evaluate to themselves
|
|
|
|
if isinstance(expr, np.ndarray):
|
|
return expr # Images pass through
|
|
|
|
# Lists (function calls / special forms)
|
|
if isinstance(expr, list):
|
|
if not expr:
|
|
return []
|
|
|
|
head = expr[0]
|
|
|
|
# Special forms
|
|
if _is_symbol(head):
|
|
form = head.name
|
|
|
|
# Quote
|
|
if form == 'quote':
|
|
return expr[1]
|
|
|
|
# Define
|
|
if form == 'define':
|
|
name = expr[1]
|
|
if _is_symbol(name):
|
|
value = self.eval(expr[2], env)
|
|
self.global_env.set(name.name, value)
|
|
return value
|
|
else:
|
|
raise SyntaxError(f"define requires symbol, got {name}")
|
|
|
|
# Define-effect
|
|
if form == 'define-effect':
|
|
return self._define_effect(expr, env)
|
|
|
|
# Lambda
|
|
if form == 'lambda' or form == 'λ':
|
|
params = [p.name if _is_symbol(p) else p for p in expr[1]]
|
|
body = expr[2]
|
|
return Lambda(params, body, env)
|
|
|
|
# Let
|
|
if form == 'let':
|
|
return self._eval_let(expr, env)
|
|
|
|
# Let*
|
|
if form == 'let*':
|
|
return self._eval_let_star(expr, env)
|
|
|
|
# If
|
|
if form == 'if':
|
|
cond = self.eval(expr[1], env)
|
|
if cond:
|
|
return self.eval(expr[2], env)
|
|
elif len(expr) > 3:
|
|
return self.eval(expr[3], env)
|
|
return None
|
|
|
|
# Cond
|
|
if form == 'cond':
|
|
return self._eval_cond(expr, env)
|
|
|
|
# And
|
|
if form == 'and':
|
|
result = True
|
|
for e in expr[1:]:
|
|
result = self.eval(e, env)
|
|
if not result:
|
|
return False
|
|
return result
|
|
|
|
# Or
|
|
if form == 'or':
|
|
for e in expr[1:]:
|
|
result = self.eval(e, env)
|
|
if result:
|
|
return result
|
|
return False
|
|
|
|
# Not
|
|
if form == 'not':
|
|
return not self.eval(expr[1], env)
|
|
|
|
# Begin (sequence)
|
|
if form == 'begin':
|
|
result = None
|
|
for e in expr[1:]:
|
|
result = self.eval(e, env)
|
|
return result
|
|
|
|
# Thread-first macro: (-> x (f a) (g b)) => (g (f x a) b)
|
|
if form == '->':
|
|
result = self.eval(expr[1], env)
|
|
for form_expr in expr[2:]:
|
|
if isinstance(form_expr, list):
|
|
# Insert result as first arg: (f a b) => (f result a b)
|
|
result = self.eval([form_expr[0], result] + form_expr[1:], env)
|
|
else:
|
|
# Just a symbol: f => (f result)
|
|
result = self.eval([form_expr, result], env)
|
|
return result
|
|
|
|
# Set! (mutation)
|
|
if form == 'set!':
|
|
name = expr[1].name if _is_symbol(expr[1]) else expr[1]
|
|
value = self.eval(expr[2], env)
|
|
# Find and update in appropriate scope
|
|
scope = env
|
|
while scope:
|
|
if name in scope.bindings:
|
|
scope.bindings[name] = value
|
|
return value
|
|
scope = scope.parent
|
|
raise NameError(f"Cannot set undefined variable: {name}")
|
|
|
|
# State-get / state-set (for effect state)
|
|
if form == 'state-get':
|
|
state = env.get('__state__')
|
|
key = self.eval(expr[1], env)
|
|
if _is_symbol(key):
|
|
key = key.name
|
|
default = self.eval(expr[2], env) if len(expr) > 2 else None
|
|
return state.get(key, default)
|
|
|
|
if form == 'state-set':
|
|
state = env.get('__state__')
|
|
key = self.eval(expr[1], env)
|
|
if _is_symbol(key):
|
|
key = key.name
|
|
value = self.eval(expr[2], env)
|
|
state[key] = value
|
|
return value
|
|
|
|
# ascii-fx-zone special form - delays evaluation of expression parameters
|
|
if form == 'ascii-fx-zone':
|
|
return self._eval_ascii_fx_zone(expr, env)
|
|
|
|
# with-primitives - load primitive library and scope to body
|
|
if form == 'with-primitives':
|
|
return self._eval_with_primitives(expr, env)
|
|
|
|
# require-primitives - load primitive library into current scope
|
|
if form == 'require-primitives':
|
|
return self._eval_require_primitives(expr, env)
|
|
|
|
# Function call
|
|
fn = self.eval(head, env)
|
|
args = [self.eval(arg, env) for arg in expr[1:]]
|
|
|
|
# Handle keyword arguments
|
|
pos_args = []
|
|
kw_args = {}
|
|
i = 0
|
|
while i < len(args):
|
|
if _is_keyword(args[i]):
|
|
kw_args[args[i].name] = args[i + 1] if i + 1 < len(args) else None
|
|
i += 2
|
|
else:
|
|
pos_args.append(args[i])
|
|
i += 1
|
|
|
|
return self._apply(fn, pos_args, kw_args, env)
|
|
|
|
raise TypeError(f"Cannot evaluate: {expr}")
|
|
|
|
def _wrap_lambda(self, lam: 'Lambda') -> Callable:
|
|
"""Wrap a Lambda in a Python callable for use by primitives."""
|
|
def wrapper(*args):
|
|
new_env = Environment(lam.env)
|
|
for i, param in enumerate(lam.params):
|
|
if i < len(args):
|
|
new_env.set(param, args[i])
|
|
else:
|
|
new_env.set(param, None)
|
|
return self.eval(lam.body, new_env)
|
|
return wrapper
|
|
|
|
def _apply(self, fn: Any, args: List[Any], kwargs: Dict[str, Any], env: Environment) -> Any:
|
|
"""Apply a function to arguments."""
|
|
if isinstance(fn, Lambda):
|
|
# User-defined function
|
|
new_env = Environment(fn.env)
|
|
for i, param in enumerate(fn.params):
|
|
if i < len(args):
|
|
new_env.set(param, args[i])
|
|
else:
|
|
new_env.set(param, None)
|
|
return self.eval(fn.body, new_env)
|
|
|
|
elif callable(fn):
|
|
# Wrap any Lambda arguments so primitives can call them
|
|
wrapped_args = []
|
|
for arg in args:
|
|
if isinstance(arg, Lambda):
|
|
wrapped_args.append(self._wrap_lambda(arg))
|
|
else:
|
|
wrapped_args.append(arg)
|
|
|
|
# Inject _interp and _env for primitives that need them
|
|
import inspect
|
|
try:
|
|
sig = inspect.signature(fn)
|
|
params = sig.parameters
|
|
if '_interp' in params and '_interp' not in kwargs:
|
|
kwargs['_interp'] = self
|
|
if '_env' in params and '_env' not in kwargs:
|
|
kwargs['_env'] = env
|
|
except (ValueError, TypeError):
|
|
# Some built-in functions don't have inspectable signatures
|
|
pass
|
|
|
|
# Primitive function
|
|
if kwargs:
|
|
return fn(*wrapped_args, **kwargs)
|
|
return fn(*wrapped_args)
|
|
|
|
else:
|
|
raise TypeError(f"Cannot call: {fn}")
|
|
|
|
def _parse_bindings(self, bindings: list) -> list:
|
|
"""Parse bindings in either Scheme or Clojure style.
|
|
|
|
Scheme: ((x 1) (y 2)) -> [(x, 1), (y, 2)]
|
|
Clojure: [x 1 y 2] -> [(x, 1), (y, 2)]
|
|
"""
|
|
if not bindings:
|
|
return []
|
|
|
|
# Check if Clojure style (flat list with symbols and values alternating)
|
|
if _is_symbol(bindings[0]):
|
|
# Clojure style: [x 1 y 2]
|
|
pairs = []
|
|
i = 0
|
|
while i < len(bindings) - 1:
|
|
name = bindings[i].name if _is_symbol(bindings[i]) else bindings[i]
|
|
value = bindings[i + 1]
|
|
pairs.append((name, value))
|
|
i += 2
|
|
return pairs
|
|
else:
|
|
# Scheme style: ((x 1) (y 2))
|
|
pairs = []
|
|
for binding in bindings:
|
|
name = binding[0].name if _is_symbol(binding[0]) else binding[0]
|
|
value = binding[1]
|
|
pairs.append((name, value))
|
|
return pairs
|
|
|
|
def _eval_let(self, expr: Any, env: Environment) -> Any:
|
|
"""Evaluate let expression: (let ((x 1) (y 2)) body) or (let [x 1 y 2] body)
|
|
|
|
Note: Uses sequential binding (like Clojure let / Scheme let*) so each
|
|
binding can reference previous bindings.
|
|
"""
|
|
bindings = expr[1]
|
|
body = expr[2]
|
|
|
|
new_env = Environment(env)
|
|
for name, value_expr in self._parse_bindings(bindings):
|
|
value = self.eval(value_expr, new_env) # Sequential: can see previous bindings
|
|
new_env.set(name, value)
|
|
|
|
return self.eval(body, new_env)
|
|
|
|
def _eval_let_star(self, expr: Any, env: Environment) -> Any:
|
|
"""Evaluate let* expression: sequential bindings."""
|
|
bindings = expr[1]
|
|
body = expr[2]
|
|
|
|
new_env = Environment(env)
|
|
for name, value_expr in self._parse_bindings(bindings):
|
|
value = self.eval(value_expr, new_env) # Evaluate in current env
|
|
new_env.set(name, value)
|
|
|
|
return self.eval(body, new_env)
|
|
|
|
def _eval_cond(self, expr: Any, env: Environment) -> Any:
|
|
"""Evaluate cond expression."""
|
|
for clause in expr[1:]:
|
|
test = clause[0]
|
|
if _is_symbol(test) and test.name == 'else':
|
|
return self.eval(clause[1], env)
|
|
if self.eval(test, env):
|
|
return self.eval(clause[1], env)
|
|
return None
|
|
|
|
def _eval_with_primitives(self, expr: Any, env: Environment) -> Any:
|
|
"""
|
|
Evaluate with-primitives: scoped primitive library loading.
|
|
|
|
Syntax:
|
|
(with-primitives "math"
|
|
(sin (* x pi)))
|
|
|
|
(with-primitives "math" :path "custom/math.py"
|
|
body)
|
|
|
|
The primitives from the library are only available within the body.
|
|
"""
|
|
# Parse library name and optional path
|
|
lib_name = expr[1]
|
|
if _is_symbol(lib_name):
|
|
lib_name = lib_name.name
|
|
|
|
path = None
|
|
body_start = 2
|
|
|
|
# Check for :path keyword
|
|
if len(expr) > 2 and _is_keyword(expr[2]) and expr[2].name == 'path':
|
|
path = expr[3]
|
|
body_start = 4
|
|
|
|
# Load the primitive library
|
|
primitives = self.load_primitive_library(lib_name, path)
|
|
|
|
# Create new environment with primitives
|
|
new_env = Environment(env)
|
|
for name, fn in primitives.items():
|
|
new_env.set(name, fn)
|
|
|
|
# Evaluate body in new environment
|
|
result = None
|
|
for e in expr[body_start:]:
|
|
result = self.eval(e, new_env)
|
|
return result
|
|
|
|
def _eval_require_primitives(self, expr: Any, env: Environment) -> Any:
|
|
"""
|
|
Evaluate require-primitives: load primitives into current scope.
|
|
|
|
Syntax:
|
|
(require-primitives "math" "color" "filters")
|
|
|
|
Unlike with-primitives, this loads into the current environment
|
|
(typically used at top-level to set up an effect's dependencies).
|
|
"""
|
|
for lib_expr in expr[1:]:
|
|
if _is_symbol(lib_expr):
|
|
lib_name = lib_expr.name
|
|
else:
|
|
lib_name = lib_expr
|
|
|
|
primitives = self.load_primitive_library(lib_name)
|
|
for name, fn in primitives.items():
|
|
env.set(name, fn)
|
|
|
|
return None
|
|
|
|
def load_primitive_library(self, name: str, path: str = None) -> dict:
|
|
"""
|
|
Load a primitive library by name or path.
|
|
|
|
Returns dict of {name: function}.
|
|
"""
|
|
from .primitive_libs import load_primitive_library
|
|
return load_primitive_library(name, path)
|
|
|
|
def _eval_ascii_fx_zone(self, expr: Any, env: Environment) -> Any:
|
|
"""
|
|
Evaluate ascii-fx-zone special form.
|
|
|
|
Syntax:
|
|
(ascii-fx-zone frame
|
|
:cols 80
|
|
:alphabet "standard"
|
|
:color_mode "color"
|
|
:background "black"
|
|
:contrast 1.5
|
|
:char_hue <expr> ;; NOT evaluated - passed to primitive
|
|
:char_saturation <expr>
|
|
:char_brightness <expr>
|
|
:char_scale <expr>
|
|
:char_rotation <expr>
|
|
:char_jitter <expr>)
|
|
|
|
The expression parameters (:char_hue, etc.) are NOT pre-evaluated.
|
|
They are passed as raw S-expressions to the primitive which
|
|
evaluates them per-zone with zone context variables injected.
|
|
|
|
Requires: (require-primitives "ascii")
|
|
"""
|
|
# Look up ascii-fx-zone primitive from environment
|
|
# It must be loaded via (require-primitives "ascii")
|
|
try:
|
|
prim_ascii_fx_zone = env.get('ascii-fx-zone')
|
|
except NameError:
|
|
raise NameError(
|
|
"ascii-fx-zone primitive not found. "
|
|
"Add (require-primitives \"ascii\") to your effect file."
|
|
)
|
|
|
|
# Expression parameter names that should NOT be evaluated
|
|
expr_params = {'char_hue', 'char_saturation', 'char_brightness',
|
|
'char_scale', 'char_rotation', 'char_jitter', 'cell_effect'}
|
|
|
|
# Parse arguments
|
|
frame = self.eval(expr[1], env) # First arg is always the frame
|
|
|
|
# Defaults
|
|
cols = 80
|
|
char_size = None # If set, overrides cols
|
|
alphabet = "standard"
|
|
color_mode = "color"
|
|
background = "black"
|
|
contrast = 1.5
|
|
char_hue = None
|
|
char_saturation = None
|
|
char_brightness = None
|
|
char_scale = None
|
|
char_rotation = None
|
|
char_jitter = None
|
|
cell_effect = None # Lambda for arbitrary per-cell effects
|
|
# Convenience params for staged recipes
|
|
energy = None
|
|
rotation_scale = 0
|
|
# Extra params to pass to zone dict for lambdas
|
|
extra_params = {}
|
|
|
|
# Parse keyword arguments
|
|
i = 2
|
|
while i < len(expr):
|
|
item = expr[i]
|
|
if _is_keyword(item):
|
|
if i + 1 >= len(expr):
|
|
break
|
|
value_expr = expr[i + 1]
|
|
kw_name = item.name
|
|
|
|
if kw_name in expr_params:
|
|
# Resolve symbol references but don't evaluate expressions
|
|
# This handles the case where effect definition passes a param like :char_hue char_hue
|
|
resolved = value_expr
|
|
if _is_symbol(value_expr):
|
|
try:
|
|
resolved = env.get(value_expr.name)
|
|
except NameError:
|
|
resolved = value_expr # Keep as symbol if not found
|
|
|
|
if kw_name == 'char_hue':
|
|
char_hue = resolved
|
|
elif kw_name == 'char_saturation':
|
|
char_saturation = resolved
|
|
elif kw_name == 'char_brightness':
|
|
char_brightness = resolved
|
|
elif kw_name == 'char_scale':
|
|
char_scale = resolved
|
|
elif kw_name == 'char_rotation':
|
|
char_rotation = resolved
|
|
elif kw_name == 'char_jitter':
|
|
char_jitter = resolved
|
|
elif kw_name == 'cell_effect':
|
|
cell_effect = resolved
|
|
else:
|
|
# Evaluate normally
|
|
value = self.eval(value_expr, env)
|
|
if kw_name == 'cols':
|
|
cols = int(value)
|
|
elif kw_name == 'char_size':
|
|
# Handle nil/None values
|
|
if value is None or (_is_symbol(value) and value.name == 'nil'):
|
|
char_size = None
|
|
else:
|
|
char_size = int(value)
|
|
elif kw_name == 'alphabet':
|
|
alphabet = str(value)
|
|
elif kw_name == 'color_mode':
|
|
color_mode = str(value)
|
|
elif kw_name == 'background':
|
|
background = str(value)
|
|
elif kw_name == 'contrast':
|
|
contrast = float(value)
|
|
elif kw_name == 'energy':
|
|
if value is None or (_is_symbol(value) and value.name == 'nil'):
|
|
energy = None
|
|
else:
|
|
energy = float(value)
|
|
elif kw_name == 'rotation_scale':
|
|
rotation_scale = float(value)
|
|
else:
|
|
# Store any other params for lambdas to access
|
|
extra_params[kw_name] = value
|
|
i += 2
|
|
else:
|
|
i += 1
|
|
|
|
# If energy and rotation_scale provided, build rotation expression
|
|
# rotation = energy * rotation_scale * position_factor
|
|
# position_factor: bottom-left=0, top-right=3
|
|
# Formula: 1.5 * (zone-col-norm + (1 - zone-row-norm))
|
|
if energy is not None and rotation_scale > 0:
|
|
# Build expression as S-expression list that will be evaluated per-zone
|
|
# (* (* energy rotation_scale) (* 1.5 (+ zone-col-norm (- 1 zone-row-norm))))
|
|
energy_times_scale = energy * rotation_scale
|
|
# The position part uses zone variables, so we build it as an expression
|
|
char_rotation = [
|
|
Symbol('*'),
|
|
energy_times_scale,
|
|
[Symbol('*'), 1.5,
|
|
[Symbol('+'), Symbol('zone-col-norm'),
|
|
[Symbol('-'), 1, Symbol('zone-row-norm')]]]
|
|
]
|
|
|
|
# Pull any extra params from environment that aren't standard params
|
|
# These are typically passed from recipes for use in cell_effect lambdas
|
|
standard_params = {
|
|
'cols', 'char_size', 'alphabet', 'color_mode', 'background', 'contrast',
|
|
'char_hue', 'char_saturation', 'char_brightness', 'char_scale',
|
|
'char_rotation', 'char_jitter', 'cell_effect', 'energy', 'rotation_scale',
|
|
'frame', 't', '_time', '__state__', '__interp__', 'true', 'false', 'nil'
|
|
}
|
|
# Check environment for extra bindings
|
|
current_env = env
|
|
while current_env is not None:
|
|
for k, v in current_env.bindings.items():
|
|
if k not in standard_params and k not in extra_params and not callable(v):
|
|
# Add non-standard, non-callable bindings to extra_params
|
|
if isinstance(v, (int, float, str, bool)) or v is None:
|
|
extra_params[k] = v
|
|
current_env = current_env.parent
|
|
|
|
# Call the primitive with interpreter and env for expression evaluation
|
|
return prim_ascii_fx_zone(
|
|
frame,
|
|
cols=cols,
|
|
char_size=char_size,
|
|
alphabet=alphabet,
|
|
color_mode=color_mode,
|
|
background=background,
|
|
contrast=contrast,
|
|
char_hue=char_hue,
|
|
char_saturation=char_saturation,
|
|
char_brightness=char_brightness,
|
|
char_scale=char_scale,
|
|
char_rotation=char_rotation,
|
|
char_jitter=char_jitter,
|
|
cell_effect=cell_effect,
|
|
energy=energy,
|
|
rotation_scale=rotation_scale,
|
|
_interp=self,
|
|
_env=env,
|
|
**extra_params
|
|
)
|
|
|
|
def _define_effect(self, expr: Any, env: Environment) -> EffectDefinition:
|
|
"""
|
|
Parse effect definition.
|
|
|
|
Required syntax:
|
|
(define-effect name
|
|
:params (
|
|
(param1 :type int :default 8 :desc "description")
|
|
)
|
|
body)
|
|
|
|
Effects MUST use :params syntax. Legacy ((param default) ...) is not supported.
|
|
"""
|
|
name = expr[1].name if _is_symbol(expr[1]) else expr[1]
|
|
|
|
params = {}
|
|
body = None
|
|
found_params = False
|
|
|
|
# Parse :params and body
|
|
i = 2
|
|
while i < len(expr):
|
|
item = expr[i]
|
|
if _is_keyword(item) and item.name == "params":
|
|
# :params syntax
|
|
if i + 1 >= len(expr):
|
|
raise SyntaxError(f"Effect '{name}': Missing params list after :params keyword")
|
|
params_list = expr[i + 1]
|
|
params = self._parse_params_block(params_list)
|
|
found_params = True
|
|
i += 2
|
|
elif _is_keyword(item):
|
|
# Skip other keywords (like :desc)
|
|
i += 2
|
|
elif body is None:
|
|
# First non-keyword item is the body
|
|
if isinstance(item, list) and item:
|
|
first_elem = item[0]
|
|
# Check for legacy syntax and reject it
|
|
if isinstance(first_elem, list) and len(first_elem) >= 2:
|
|
raise SyntaxError(
|
|
f"Effect '{name}': Legacy parameter syntax ((name default) ...) is not supported. "
|
|
f"Use :params block instead."
|
|
)
|
|
body = item
|
|
i += 1
|
|
else:
|
|
i += 1
|
|
|
|
if body is None:
|
|
raise SyntaxError(f"Effect '{name}': No body found")
|
|
|
|
if not found_params:
|
|
raise SyntaxError(
|
|
f"Effect '{name}': Missing :params block. "
|
|
f"For effects with no parameters, use empty :params ()"
|
|
)
|
|
|
|
effect = EffectDefinition(name, params, body)
|
|
self.effects[name] = effect
|
|
return effect
|
|
|
|
def _parse_params_block(self, params_list: list) -> Dict[str, Any]:
|
|
"""
|
|
Parse :params block syntax:
|
|
(
|
|
(param_name :type int :default 8 :range [4 32] :desc "description")
|
|
)
|
|
"""
|
|
params = {}
|
|
for param_def in params_list:
|
|
if not isinstance(param_def, list) or len(param_def) < 1:
|
|
continue
|
|
|
|
# First element is the parameter name
|
|
first = param_def[0]
|
|
if _is_symbol(first):
|
|
param_name = first.name
|
|
elif isinstance(first, str):
|
|
param_name = first
|
|
else:
|
|
continue
|
|
|
|
# Parse keyword arguments
|
|
default = None
|
|
i = 1
|
|
while i < len(param_def):
|
|
item = param_def[i]
|
|
if _is_keyword(item):
|
|
if i + 1 >= len(param_def):
|
|
break
|
|
kw_value = param_def[i + 1]
|
|
|
|
if item.name == "default":
|
|
default = kw_value
|
|
i += 2
|
|
else:
|
|
i += 1
|
|
|
|
params[param_name] = default
|
|
|
|
return params
|
|
|
|
def load_effect(self, path: str) -> EffectDefinition:
|
|
"""Load an effect definition from a .sexp file."""
|
|
expr = parse_file(path)
|
|
|
|
# Handle multiple top-level expressions
|
|
if isinstance(expr, list) and expr and isinstance(expr[0], list):
|
|
for e in expr:
|
|
self.eval(e)
|
|
else:
|
|
self.eval(expr)
|
|
|
|
# Return the last defined effect
|
|
if self.effects:
|
|
return list(self.effects.values())[-1]
|
|
return None
|
|
|
|
def run_effect(self, name: str, frame, params: Dict[str, Any],
|
|
state: Dict[str, Any]) -> tuple:
|
|
"""
|
|
Run an effect on frame(s).
|
|
|
|
Args:
|
|
name: Effect name
|
|
frame: Input frame (H, W, 3) RGB uint8, or list of frames for multi-input
|
|
params: Effect parameters (overrides defaults)
|
|
state: Persistent state dict
|
|
|
|
Returns:
|
|
(output_frame, new_state)
|
|
"""
|
|
if name not in self.effects:
|
|
raise ValueError(f"Unknown effect: {name}")
|
|
|
|
effect = self.effects[name]
|
|
|
|
# Create environment for this run
|
|
env = Environment(self.global_env)
|
|
|
|
# Bind frame(s) - support both single frame and list of frames
|
|
if isinstance(frame, list):
|
|
# Multi-input effect
|
|
frames = frame
|
|
env.set('frame', frames[0] if frames else None) # Backwards compat
|
|
env.set('inputs', frames)
|
|
# Named frame bindings
|
|
for i, f in enumerate(frames):
|
|
env.set(f'frame-{chr(ord("a") + i)}', f) # frame-a, frame-b, etc.
|
|
else:
|
|
# Single-input effect
|
|
env.set('frame', frame)
|
|
|
|
# Bind state
|
|
if state is None:
|
|
state = {}
|
|
env.set('__state__', state)
|
|
|
|
# Validate that all provided params are known (except internal params)
|
|
# Extra params are allowed and will be passed through to cell_effect lambdas
|
|
known_params = set(effect.params.keys())
|
|
internal_params = {'_time', 'seed', '_binding', 'effect', 'cid', 'hash', 'effect_path'}
|
|
extra_effect_params = {} # Unknown params passed through for cell_effect lambdas
|
|
for k in params.keys():
|
|
if k not in known_params and k not in internal_params:
|
|
# Allow unknown params - they'll be passed to cell_effect lambdas via zone dict
|
|
extra_effect_params[k] = params[k]
|
|
|
|
# Bind parameters (defaults + overrides)
|
|
for pname, pdefault in effect.params.items():
|
|
value = params.get(pname)
|
|
if value is None:
|
|
# Evaluate default if it's an expression (list)
|
|
if isinstance(pdefault, list):
|
|
value = self.eval(pdefault, env)
|
|
else:
|
|
value = pdefault
|
|
env.set(pname, value)
|
|
|
|
# Bind extra params (unknown params passed through for cell_effect lambdas)
|
|
for k, v in extra_effect_params.items():
|
|
env.set(k, v)
|
|
|
|
# Reset RNG with seed if provided
|
|
seed = params.get('seed', 42)
|
|
reset_rng(int(seed))
|
|
|
|
# Bind time if provided
|
|
time_val = params.get('_time', 0)
|
|
env.set('t', time_val)
|
|
env.set('_time', time_val)
|
|
|
|
# Evaluate body
|
|
result = self.eval(effect.body, env)
|
|
|
|
# Ensure result is an image
|
|
if not isinstance(result, np.ndarray):
|
|
result = frame
|
|
|
|
return result, state
|
|
|
|
def eval_with_zone(self, expr, env: Environment, zone) -> Any:
|
|
"""
|
|
Evaluate expression with zone-* variables injected.
|
|
|
|
Args:
|
|
expr: Expression to evaluate (S-expression)
|
|
env: Parent environment with bound values
|
|
zone: ZoneContext object with cell data
|
|
|
|
Zone variables injected:
|
|
zone-row, zone-col: Grid position (integers)
|
|
zone-row-norm, zone-col-norm: Normalized position (0-1)
|
|
zone-lum: Cell luminance (0-1)
|
|
zone-sat: Cell saturation (0-1)
|
|
zone-hue: Cell hue (0-360)
|
|
zone-r, zone-g, zone-b: RGB components (0-1)
|
|
|
|
Returns:
|
|
Evaluated result (typically a number)
|
|
"""
|
|
# Create child environment with zone variables
|
|
zone_env = Environment(env)
|
|
zone_env.set('zone-row', zone.row)
|
|
zone_env.set('zone-col', zone.col)
|
|
zone_env.set('zone-row-norm', zone.row_norm)
|
|
zone_env.set('zone-col-norm', zone.col_norm)
|
|
zone_env.set('zone-lum', zone.luminance)
|
|
zone_env.set('zone-sat', zone.saturation)
|
|
zone_env.set('zone-hue', zone.hue)
|
|
zone_env.set('zone-r', zone.r)
|
|
zone_env.set('zone-g', zone.g)
|
|
zone_env.set('zone-b', zone.b)
|
|
|
|
return self.eval(expr, zone_env)
|
|
|
|
|
|
# =============================================================================
|
|
# Convenience Functions
|
|
# =============================================================================
|
|
|
|
_interpreter = None
|
|
_interpreter_minimal = None
|
|
|
|
|
|
def get_interpreter(minimal_primitives: bool = False) -> Interpreter:
|
|
"""Get or create the global interpreter.
|
|
|
|
Args:
|
|
minimal_primitives: If True, return interpreter with only core primitives.
|
|
Additional primitives must be loaded with require-primitives or with-primitives.
|
|
"""
|
|
global _interpreter, _interpreter_minimal
|
|
|
|
if minimal_primitives:
|
|
if _interpreter_minimal is None:
|
|
_interpreter_minimal = Interpreter(minimal_primitives=True)
|
|
return _interpreter_minimal
|
|
else:
|
|
if _interpreter is None:
|
|
_interpreter = Interpreter(minimal_primitives=False)
|
|
return _interpreter
|
|
|
|
|
|
def load_effect(path: str) -> EffectDefinition:
|
|
"""Load an effect from a .sexp file."""
|
|
return get_interpreter().load_effect(path)
|
|
|
|
|
|
def load_effects_dir(directory: str):
|
|
"""Load all .sexp effects from a directory."""
|
|
interp = get_interpreter()
|
|
dir_path = Path(directory)
|
|
for path in dir_path.glob('*.sexp'):
|
|
try:
|
|
interp.load_effect(str(path))
|
|
except Exception as e:
|
|
print(f"Warning: Failed to load {path}: {e}")
|
|
|
|
|
|
def run_effect(name: str, frame: np.ndarray, params: Dict[str, Any],
|
|
state: Dict[str, Any] = None) -> tuple:
|
|
"""Run an effect."""
|
|
return get_interpreter().run_effect(name, frame, params, state or {})
|
|
|
|
|
|
def list_effects() -> List[str]:
|
|
"""List loaded effect names."""
|
|
return list(get_interpreter().effects.keys())
|
|
|
|
|
|
# =============================================================================
|
|
# Adapter for existing effect system
|
|
# =============================================================================
|
|
|
|
def make_process_frame(effect_path: str) -> Callable:
|
|
"""
|
|
Create a process_frame function from a .sexp effect.
|
|
|
|
This allows S-expression effects to be used with the existing
|
|
effect system.
|
|
"""
|
|
interp = get_interpreter()
|
|
interp.load_effect(effect_path)
|
|
effect_name = Path(effect_path).stem
|
|
|
|
def process_frame(frame: np.ndarray, params: dict, state: dict) -> tuple:
|
|
return interp.run_effect(effect_name, frame, params, state)
|
|
|
|
return process_frame
|