Files
test/effects/echo.py
gilesb 406cc7c0c7 Initial commit: video effects processing system
Add S-expression based video effects pipeline with modular effect
definitions, constructs, and recipe files.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-19 12:34:45 +00:00

140 lines
3.8 KiB
Python

# /// script
# requires-python = ">=3.10"
# dependencies = ["numpy"]
# ///
"""
@effect echo
@version 1.0.0
@author artdag
@description
Motion trail / echo effect. Blends current frame with previous frames
to create ghosting/trailing effects. Great for fast movement scenes.
Uses a frame buffer in state to store recent frames for blending.
@param num_echoes int
@range 1 20
@default 4
Number of trailing frames to blend.
@param decay float
@range 0 1
@default 0.5
Opacity ratio between successive echoes. 0.5 = each echo half as bright.
@param blend_mode string
@enum blend add screen maximum
@default blend
How to combine echoes:
- blend: weighted average
- add: sum (can overexpose)
- screen: like add but resists overexposure
- maximum: brightest pixel wins
@state frame_buffer list
Circular buffer of recent frames.
@example
(effect echo :num_echoes 6 :decay 0.6)
@example
;; More echoes on energy
(effect echo :num_echoes (bind energy :range [2 10]))
"""
import numpy as np
def process_frame(frame: np.ndarray, params: dict, state: dict) -> tuple:
"""
Apply echo/motion trail effect to a video frame.
Args:
frame: Input frame as numpy array (H, W, 3) RGB uint8
params: Effect parameters
- num_echoes: number of trailing frames (default 4)
- decay: opacity decay ratio (default 0.5)
- blend_mode: blend/add/screen/maximum (default blend)
state: Persistent state dict
- frame_buffer: list of recent frames
Returns:
Tuple of (processed_frame, new_state)
"""
num_echoes = max(1, min(int(params.get("num_echoes", 4)), 20))
decay = max(0, min(params.get("decay", 0.5), 1))
blend_mode = params.get("blend_mode", "blend")
if state is None:
state = {}
# Initialize frame buffer
if "frame_buffer" not in state:
state["frame_buffer"] = []
buffer = state["frame_buffer"]
# Add current frame to buffer
buffer.append(frame.copy())
# Limit buffer size
max_buffer = num_echoes + 5
while len(buffer) > max_buffer:
buffer.pop(0)
# Collect frames and intensities for blending
frames = []
intensities = []
intensity = 1.0
# Current frame first, then older frames
for i in range(min(num_echoes + 1, len(buffer))):
idx = len(buffer) - 1 - i
if idx >= 0:
frames.append(buffer[idx].astype(np.float32))
intensities.append(intensity)
intensity *= decay
if not frames:
return frame, state
# Blend frames according to mode
result = _blend_frames(frames, intensities, blend_mode)
return np.clip(result, 0, 255).astype(np.uint8), state
def _blend_frames(frames, intensities, blend_mode):
"""Blend multiple frames according to blend mode."""
if not frames:
return frames[0]
if blend_mode == "add":
result = np.zeros_like(frames[0])
for frame, intensity in zip(frames, intensities):
result += frame * intensity
return result
elif blend_mode == "screen":
result = np.zeros_like(frames[0])
for frame, intensity in zip(frames, intensities):
weighted = (frame / 255.0) * intensity
result = 255 * (1 - (1 - result / 255.0) * (1 - weighted))
return result
elif blend_mode == "maximum":
result = frames[0] * intensities[0]
for frame, intensity in zip(frames[1:], intensities[1:]):
result = np.maximum(result, frame * intensity)
return result
else: # blend - weighted average
total = sum(intensities)
if total == 0:
return frames[0]
result = np.zeros_like(frames[0])
for frame, intensity in zip(frames, intensities):
result += frame * (intensity / total)
return result