All checks were successful
Build and Deploy / build-and-deploy (push) Successful in 2m33s
Merges full history from art-dag/mono.git into the monorepo under the artdag/ directory. Contains: core (DAG engine), l1 (Celery rendering server), l2 (ActivityPub registry), common (shared templates/middleware), client (CLI), test (e2e). Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> git-subtree-dir: artdag git-subtree-mainline: 1a179de547 git-subtree-split: 4c2e716558
309 lines
9.5 KiB
Python
309 lines
9.5 KiB
Python
"""
|
|
Effect processing backends.
|
|
|
|
Provides abstraction over different rendering backends:
|
|
- numpy: CPU-based, works everywhere, ~3-5 fps
|
|
- glsl: GPU-based, requires OpenGL, 30+ fps (future)
|
|
"""
|
|
|
|
import numpy as np
|
|
from abc import ABC, abstractmethod
|
|
from typing import List, Dict, Any, Optional
|
|
from pathlib import Path
|
|
|
|
|
|
class Backend(ABC):
    """Interface that every effect-processing backend must implement."""

    @abstractmethod
    def process_frame(
        self,
        frames: List[np.ndarray],
        effects_per_frame: List[List[Dict]],
        compositor_config: Dict,
        t: float,
        analysis_data: Dict,
    ) -> np.ndarray:
        """
        Run each source frame through its effect chain and blend the results.

        Args:
            frames: Input frames, one per source.
            effects_per_frame: Effect chains, one per source (parallel to
                ``frames``).
            compositor_config: Describes how the processed layers are blended.
            t: Current time in seconds.
            analysis_data: Analysis data used to resolve parameter bindings.

        Returns:
            The single composited output frame.
        """

    @abstractmethod
    def load_effect(self, effect_path: Path) -> Any:
        """Load the effect definition found at ``effect_path``."""
|
|
|
|
|
|
class NumpyBackend(Backend):
    """
    CPU-based effect processing using NumPy.

    A few simple effects (zoom, rotate, brightness, invert) are implemented
    natively; every other effect is delegated to the existing sexp_effects
    interpreter, which is imported lazily on first use.
    Works on any system, but limited to ~3-5 fps for complex effects.
    """

    def __init__(self, recipe_dir: Optional[Path] = None, minimal_primitives: bool = True):
        """
        Create a backend.

        Args:
            recipe_dir: Directory stored on the instance; defaults to the
                current directory when omitted.
            minimal_primitives: Forwarded to ``sexp_effects.get_interpreter``
                when the interpreter is first needed.
        """
        self.recipe_dir = recipe_dir or Path(".")
        self.minimal_primitives = minimal_primitives
        self._interpreter = None  # created on demand by _get_interpreter()
        self._loaded_effects = {}  # str(effect_path) -> effect name (file stem)

    def _get_interpreter(self):
        """Return the sexp interpreter, creating it on first use."""
        if self._interpreter is None:
            # Local import: sexp_effects is only required once an effect
            # actually needs the interpreter.
            from sexp_effects import get_interpreter

            self._interpreter = get_interpreter(minimal_primitives=self.minimal_primitives)
        return self._interpreter

    def load_effect(self, effect_path: Path) -> Any:
        """
        Load an effect from a sexp file and return its name.

        Each distinct path is handed to the interpreter once; later calls
        with the same path return the cached name (the file's stem).
        """
        effect_key = str(effect_path)
        if effect_key not in self._loaded_effects:
            interp = self._get_interpreter()
            interp.load_effect(effect_key)
            self._loaded_effects[effect_key] = effect_path.stem
        return self._loaded_effects[effect_key]

    def _resolve_binding(self, value: Any, t: float, analysis_data: Dict) -> Any:
        """
        Resolve a parameter binding to its value at time ``t``.

        Anything that is not a dict carrying a ``"_binding"`` or ``"_bind"``
        key is returned unchanged. For a binding, the track named by
        ``"source"`` (or, failing that, by the ``"_bind"`` entry) is looked up
        in ``analysis_data`` and its ``"values"`` are linearly interpolated
        over its ``"times"``; ``t`` outside the sampled span clamps to the
        first/last value. An optional two-element ``"range"`` then maps the
        result as ``lo + val * (hi - lo)``.

        Returns 0.0 when the track is missing or has no samples.

        NOTE(review): the binding's ``"feature"`` entry is not consulted;
        only the track's ``"values"`` series is ever read. ``"times"`` is
        presumably sorted ascending -- confirm against the analysis producer.
        """
        if not isinstance(value, dict):
            return value
        if "_binding" not in value and "_bind" not in value:
            return value

        from bisect import bisect_right

        source = value.get("source") or value.get("_bind")
        range_map = value.get("range")

        track = analysis_data.get(source, {})
        times = track.get("times", [])
        values = track.get("values", [])
        if not times or not values:
            return 0.0

        if t <= times[0]:
            val = values[0]
        elif t >= times[-1]:
            val = values[-1]
        else:
            # bisect_right leaves times[idx - 1] <= t < times[idx], so the
            # bracket is never zero-width and alpha stays in [0, 1).
            idx = bisect_right(times, t)
            if 0 < idx < len(times) and idx < len(values):
                t0, t1 = times[idx - 1], times[idx]
                alpha = (t - t0) / (t1 - t0)
                val = values[idx - 1] * (1 - alpha) + values[idx] * alpha
            else:
                # No usable bracket (e.g. fewer values than timestamps):
                # fall back to the last value instead of raising IndexError.
                val = values[-1]

        # Apply range mapping
        if range_map and len(range_map) == 2:
            val = range_map[0] + val * (range_map[1] - range_map[0])
        return val

    def _apply_effect(
        self,
        frame: np.ndarray,
        effect_name: str,
        params: Dict,
        t: float,
        analysis_data: Dict,
    ) -> np.ndarray:
        """
        Apply a single effect to a frame.

        Parameter bindings are resolved at time ``t`` first (bookkeeping keys
        of the effect config are skipped and ``"_time"`` is added). The native
        fast path is tried, then the sexp interpreter; an effect known to
        neither leaves the frame unchanged.
        """
        bookkeeping = ("effect", "effect_path", "cid", "analysis_refs")
        resolved_params = {"_time": t}
        for key, value in params.items():
            if key in bookkeeping:
                continue
            resolved_params[key] = self._resolve_binding(value, t, analysis_data)

        # Try fast native effects first
        result = self._apply_native_effect(frame, effect_name, resolved_params)
        if result is not None:
            return result

        # Fall back to sexp interpreter for complex effects
        interp = self._get_interpreter()
        if effect_name in interp.effects:
            result, _ = interp.run_effect(effect_name, frame, resolved_params, {})
            return result

        # Unknown effect - pass through
        return frame

    def _apply_native_effect(
        self,
        frame: np.ndarray,
        effect_name: str,
        params: Dict,
    ) -> Optional[np.ndarray]:
        """
        Fast native numpy effects for real-time streaming.

        Handles ``zoom``, ``rotate``, ``brightness`` and ``invert``.
        OpenCV is imported only by the branches that need it (zoom, rotate),
        so brightness and invert work without it.

        Returns:
            The processed frame, or None when ``effect_name`` is not one of
            the native effects.
        """
        if effect_name == "zoom":
            amount = float(params.get("amount", 1.0))
            h, w = frame.shape[:2]
            # Pass through when the factor is meaningless (<= 0 or NaN),
            # within 1% of identity, or the frame has no pixels.
            if not (amount > 0) or abs(amount - 1.0) < 0.01 or h == 0 or w == 0:
                return frame

            import cv2

            if amount > 1.0:
                # Zoom in: crop the centre and scale it back up. The crop is
                # clamped to one pixel so extreme factors cannot produce an
                # empty array for cv2.resize.
                crop_w = max(1, int(w / amount))
                crop_h = max(1, int(h / amount))
                x1, y1 = (w - crop_w) // 2, (h - crop_h) // 2
                cropped = frame[y1:y1 + crop_h, x1:x1 + crop_w]
                return cv2.resize(cropped, (w, h))

            # Zoom out: a centre "crop" larger than the frame would need
            # negative slice offsets (which NumPy interprets as counting from
            # the end), so shrink the frame and centre it on black instead.
            small_w = max(1, int(w * amount))
            small_h = max(1, int(h * amount))
            shrunk = cv2.resize(frame, (small_w, small_h))
            canvas = np.zeros_like(frame)
            x1, y1 = (w - small_w) // 2, (h - small_h) // 2
            target = canvas[y1:y1 + small_h, x1:x1 + small_w]
            # reshape restores a trailing channel axis cv2.resize may drop.
            target[...] = shrunk.reshape(target.shape)
            return canvas

        if effect_name == "rotate":
            angle = float(params.get("angle", 0))
            if abs(angle) < 0.5:
                # Below half a degree: treated as no rotation.
                return frame

            import cv2

            h, w = frame.shape[:2]
            center = (w // 2, h // 2)
            matrix = cv2.getRotationMatrix2D(center, angle, 1.0)
            return cv2.warpAffine(frame, matrix, (w, h))

        if effect_name == "brightness":
            amount = float(params.get("amount", 1.0))
            return np.clip(frame * amount, 0, 255).astype(np.uint8)

        if effect_name == "invert":
            amount = float(params.get("amount", 1.0))
            if amount < 0.5:
                # Acts as an on/off switch rather than a blend amount.
                return frame
            # NOTE(review): assumes an 8-bit frame (0..255) -- confirm.
            return 255 - frame

        # Not a native effect
        return None

    def process_frame(
        self,
        frames: List[np.ndarray],
        effects_per_frame: List[List[Dict]],
        compositor_config: Dict,
        t: float,
        analysis_data: Dict,
    ) -> np.ndarray:
        """
        Process frames through effects and composite.

        Args:
            frames: Input frames, one per source.
            effects_per_frame: Effect chains, one per source. A source with
                no corresponding chain is rendered without effects rather
                than being dropped.
            compositor_config: Passed to the compositor when there is more
                than one source.
            t: Current time in seconds.
            analysis_data: Analysis data for binding resolution.

        Returns:
            The composited frame; the single processed frame when there is
            exactly one source; a black 1280x720 frame when ``frames`` is
            empty.
        """
        if not frames:
            return np.zeros((720, 1280, 3), dtype=np.uint8)

        chains = effects_per_frame or []
        processed = []

        # Apply effects to each input frame
        for index, frame in enumerate(frames):
            effects = chains[index] if index < len(chains) else []
            result = frame.copy()  # never mutate the caller's frame
            for effect_config in effects or []:
                effect_name = effect_config.get("effect", "")
                if effect_name:
                    result = self._apply_effect(
                        result, effect_name, effect_config, t, analysis_data
                    )
            processed.append(result)

        # Composite layers
        if len(processed) == 1:
            return processed[0]
        return self._composite(processed, compositor_config, t, analysis_data)

    def _composite(
        self,
        frames: List[np.ndarray],
        config: Dict,
        t: float,
        analysis_data: Dict,
    ) -> np.ndarray:
        """
        Composite multiple frames into one by weighted blending.

        ``config["weights"]`` (entries may be bindings) supplies one weight
        per frame; equal weights are used when it is absent. Weights are
        fitted to the number of frames *before* normalisation: surplus
        weights are ignored and missing ones count as 0. If the weights do
        not sum to a positive number, equal weights are used. All frames are
        resized to the first frame's size before blending.

        NOTE(review): ``config["mode"]`` is not consulted; weighted blending
        is the only compositing implemented here.
        """
        count = len(frames)
        weights = config.get("weights")
        if weights is None:
            weights = [1.0 / count] * count

        # Resolve weight bindings
        resolved_weights = [self._resolve_binding(w, t, analysis_data) for w in weights]

        # Match weights to frames so normalisation only sees weights in use.
        resolved_weights = resolved_weights[:count]
        resolved_weights += [0.0] * (count - len(resolved_weights))

        # Normalize weights
        total = sum(resolved_weights)
        if total > 0:
            resolved_weights = [w / total for w in resolved_weights]
        else:
            resolved_weights = [1.0 / count] * count

        # Resize frames to match first frame
        target_h, target_w = frames[0].shape[:2]
        resized = []
        for frame in frames:
            if frame.shape[:2] != (target_h, target_w):
                import cv2

                frame = cv2.resize(frame, (target_w, target_h))
            # Blend in float32 so partial sums are not truncated to uint8.
            resized.append(frame.astype(np.float32))

        # Weighted blend
        result = np.zeros_like(resized[0])
        for frame, weight in zip(resized, resolved_weights):
            result += frame * weight

        return np.clip(result, 0, 255).astype(np.uint8)
|
|
|
|
|
|
class GLSLBackend(Backend):
    """
    GPU-based effect processing using OpenGL/GLSL.

    Requires GPU with OpenGL 3.3+ support (or Mesa software renderer).
    Intended to achieve 30+ fps real-time processing.

    TODO: Implement when ready for GPU acceleration. Until then every entry
    point raises NotImplementedError.
    """

    _NOT_IMPLEMENTED = "GLSL backend not yet implemented. Use NumpyBackend for now."

    def __init__(self, **kwargs):
        """
        Always raises, because the backend is a placeholder.

        Keyword options are accepted (and ignored) so that a caller passing
        backend options gets the NotImplementedError below rather than a
        TypeError about an unexpected keyword argument.

        Raises:
            NotImplementedError: always.
        """
        raise NotImplementedError(self._NOT_IMPLEMENTED)

    def load_effect(self, effect_path: Path) -> Any:
        """Not implemented; raises NotImplementedError instead of returning None."""
        raise NotImplementedError(self._NOT_IMPLEMENTED)

    def process_frame(
        self,
        frames: List[np.ndarray],
        effects_per_frame: List[List[Dict]],
        compositor_config: Dict,
        t: float,
        analysis_data: Dict,
    ) -> np.ndarray:
        """Not implemented; raises NotImplementedError instead of returning None."""
        raise NotImplementedError(self._NOT_IMPLEMENTED)
|
|
|
|
|
|
def get_backend(name: str = "numpy", **kwargs) -> Backend:
    """
    Construct the backend registered under ``name``.

    Args:
        name: Either "numpy" or "glsl".
        **kwargs: Options passed straight to the chosen backend's constructor.

    Returns:
        A new Backend instance.

    Raises:
        ValueError: If ``name`` is not a known backend.
    """
    if name == "numpy":
        return NumpyBackend(**kwargs)
    if name == "glsl":
        return GLSLBackend(**kwargs)
    raise ValueError(f"Unknown backend: {name}")
|