- Add IPFSHLSOutput class that uploads segments to IPFS as they're created
- Update streaming task to use IPFS HLS output for distributed streaming
- Add /ipfs-stream endpoint to get IPFS playlist URL
- Update /stream endpoint to redirect to IPFS when available
- Add GPU persistence mode (STREAMING_GPU_PERSIST=1) to keep frames on GPU
- Add hardware video decoding (NVDEC) support for faster video processing
- Add GPU-accelerated primitive libraries: blending_gpu, color_ops_gpu, geometry_gpu
- Add streaming_gpu module with GPUFrame class for tracking CPU/GPU data location
- Add Dockerfile.gpu for building GPU-enabled worker image

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

573 lines · 19 KiB · Python
"""
|
|
Effect processing backends.
|
|
|
|
Provides abstraction over different rendering backends:
|
|
- numpy: CPU-based, works everywhere, ~3-5 fps
|
|
- glsl: GPU-based, requires OpenGL, 30+ fps (future)
|
|
"""
|
|
|
|
import numpy as np
|
|
from abc import ABC, abstractmethod
|
|
from typing import List, Dict, Any, Optional
|
|
from pathlib import Path
|
|
|
|
|
|
class Backend(ABC):
    """Interface that every effect-processing backend must implement."""

    @abstractmethod
    def process_frame(
        self,
        frames: List[np.ndarray],
        effects_per_frame: List[List[Dict]],
        compositor_config: Dict,
        t: float,
        analysis_data: Dict,
    ) -> np.ndarray:
        """
        Run each source frame through its effect chain, then composite.

        Args:
            frames: One input frame per source.
            effects_per_frame: One effect chain per source, parallel to ``frames``.
            compositor_config: Describes how the processed layers are blended.
            t: Current playback time in seconds.
            analysis_data: Analysis tracks used to resolve parameter bindings.

        Returns:
            The composited output frame.
        """
        ...

    @abstractmethod
    def load_effect(self, effect_path: Path) -> Any:
        """Load (and possibly compile) an effect definition."""
        ...
|
|
|
|
|
|
class NumpyBackend(Backend):
    """
    CPU-based effect processing using NumPy.

    Uses the existing sexp_effects interpreter for effect execution.
    Works on any system, but limited to ~3-5 fps for complex effects.
    """

    def __init__(self, recipe_dir: Optional[Path] = None, minimal_primitives: bool = True):
        """
        Args:
            recipe_dir: Directory effect recipes are resolved against
                (defaults to the current directory).
            minimal_primitives: Forwarded to the sexp interpreter; limits
                the primitive set for faster startup.
        """
        self.recipe_dir = recipe_dir or Path(".")
        self.minimal_primitives = minimal_primitives
        self._interpreter = None  # lazily created on first use
        self._loaded_effects: Dict[str, str] = {}  # effect path -> effect name

    def _get_interpreter(self):
        """Lazy-load the sexp interpreter (project import deferred on purpose)."""
        if self._interpreter is None:
            from sexp_effects import get_interpreter
            self._interpreter = get_interpreter(minimal_primitives=self.minimal_primitives)
        return self._interpreter

    def load_effect(self, effect_path: Path) -> Any:
        """Load an effect from a sexp file; returns the effect name (file stem)."""
        if isinstance(effect_path, str):
            effect_path = Path(effect_path)
        effect_key = str(effect_path)
        if effect_key not in self._loaded_effects:
            interp = self._get_interpreter()
            interp.load_effect(str(effect_path))
            self._loaded_effects[effect_key] = effect_path.stem
        return self._loaded_effects[effect_key]

    def _resolve_binding(self, value: Any, t: float, analysis_data: Dict) -> Any:
        """
        Resolve a parameter binding to its scalar value at time t.

        Non-dict values pass through unchanged.  A binding dict (marked by a
        "_binding" or "_bind" key) names an analysis track whose (times,
        values) samples are linearly interpolated at t, then optionally
        remapped through a [lo, hi] "range" pair (the raw track values are
        presumably normalized to 0..1 — the remap assumes so).
        """
        if not isinstance(value, dict):
            return value

        if "_binding" in value or "_bind" in value:
            import bisect

            source = value.get("source") or value.get("_bind")
            range_map = value.get("range")

            track = analysis_data.get(source, {})
            times = track.get("times", [])
            values = track.get("values", [])

            # Missing or empty track resolves to a neutral 0.0.
            if not times or not values:
                return 0.0

            # Clamp outside the sampled interval.
            if t <= times[0]:
                val = values[0]
            elif t >= times[-1]:
                val = values[-1]
            else:
                # Binary search for the bracketing segment, then lerp.
                idx = bisect.bisect_right(times, t) - 1
                idx = min(max(idx, 0), len(times) - 2)
                t0, t1 = times[idx], times[idx + 1]
                if t1 > t0:
                    alpha = (t - t0) / (t1 - t0)
                    val = values[idx] * (1 - alpha) + values[idx + 1] * alpha
                else:
                    # Duplicate timestamps: take the later sample instead of
                    # dividing by zero.
                    val = values[idx + 1]

            # Apply range mapping.
            if range_map and len(range_map) == 2:
                val = range_map[0] + val * (range_map[1] - range_map[0])

            return val

        return value

    def _apply_effect(
        self,
        frame: np.ndarray,
        effect_name: str,
        params: Dict,
        t: float,
        analysis_data: Dict,
    ) -> np.ndarray:
        """Apply a single named effect to one frame, resolving bindings first."""
        # Resolve bindings in params; these keys are metadata, not parameters.
        resolved_params = {"_time": t}
        for key, value in params.items():
            if key in ("effect", "effect_path", "cid", "analysis_refs"):
                continue
            resolved_params[key] = self._resolve_binding(value, t, analysis_data)

        # Try fast native effects first.
        result = self._apply_native_effect(frame, effect_name, resolved_params)
        if result is not None:
            return result

        # Fall back to sexp interpreter for complex effects.
        interp = self._get_interpreter()
        if effect_name in interp.effects:
            result, _ = interp.run_effect(effect_name, frame, resolved_params, {})
            return result

        # Unknown effect - pass through unchanged.
        return frame

    def _apply_native_effect(
        self,
        frame: np.ndarray,
        effect_name: str,
        params: Dict,
    ) -> Optional[np.ndarray]:
        """
        Fast native numpy/OpenCV implementations of common effects for
        real-time streaming.  Returns None when effect_name has no native
        implementation (caller then falls back to the interpreter).
        """
        import cv2

        if effect_name == "zoom":
            amount = float(params.get("amount", 1.0))
            if abs(amount - 1.0) < 0.01:
                return frame  # effectively a no-op zoom
            h, w = frame.shape[:2]
            # Crop the center region and scale it back up to full size.
            new_w, new_h = int(w / amount), int(h / amount)
            x1, y1 = (w - new_w) // 2, (h - new_h) // 2
            cropped = frame[y1:y1 + new_h, x1:x1 + new_w]
            return cv2.resize(cropped, (w, h))

        elif effect_name == "rotate":
            angle = float(params.get("angle", 0))
            if abs(angle) < 0.5:
                return frame  # sub-degree rotation is treated as a no-op
            h, w = frame.shape[:2]
            center = (w // 2, h // 2)
            matrix = cv2.getRotationMatrix2D(center, angle, 1.0)
            return cv2.warpAffine(frame, matrix, (w, h))

        elif effect_name == "brightness":
            amount = float(params.get("amount", 1.0))
            return np.clip(frame * amount, 0, 255).astype(np.uint8)

        elif effect_name == "invert":
            # Binary effect: any amount >= 0.5 inverts, otherwise pass through.
            amount = float(params.get("amount", 1.0))
            if amount < 0.5:
                return frame
            return 255 - frame

        # Not a native effect
        return None

    def process_frame(
        self,
        frames: List[np.ndarray],
        effects_per_frame: List[List[Dict]],
        compositor_config: Dict,
        t: float,
        analysis_data: Dict,
    ) -> np.ndarray:
        """
        Process frames through effects and composite.

        Empty input yields a black 720p frame so callers always get a
        valid image.
        """
        if not frames:
            return np.zeros((720, 1280, 3), dtype=np.uint8)

        processed = []

        # Apply each source's effect chain to its frame.
        for frame, effects in zip(frames, effects_per_frame):
            result = frame.copy()
            for effect_config in effects:
                effect_name = effect_config.get("effect", "")
                if effect_name:
                    result = self._apply_effect(
                        result, effect_name, effect_config, t, analysis_data
                    )
            processed.append(result)

        # Single layer needs no compositing.
        if len(processed) == 1:
            return processed[0]

        return self._composite(processed, compositor_config, t, analysis_data)

    def _composite(
        self,
        frames: List[np.ndarray],
        config: Dict,
        t: float,
        analysis_data: Dict,
    ) -> np.ndarray:
        """Composite multiple frames into one weighted blend."""
        mode = config.get("mode", "alpha")
        weights = config.get("weights", [1.0 / len(frames)] * len(frames))

        # Weights may themselves be bindings; resolve them at time t.
        resolved_weights = []
        for w in weights:
            resolved_weights.append(self._resolve_binding(w, t, analysis_data))

        # Normalize weights; fall back to uniform if they sum to <= 0.
        total = sum(resolved_weights)
        if total > 0:
            resolved_weights = [w / total for w in resolved_weights]
        else:
            resolved_weights = [1.0 / len(frames)] * len(frames)

        # Resize all frames to match the first frame's dimensions.
        target_h, target_w = frames[0].shape[:2]
        resized = []
        for frame in frames:
            if frame.shape[:2] != (target_h, target_w):
                import cv2
                frame = cv2.resize(frame, (target_w, target_h))
            resized.append(frame.astype(np.float32))

        # Weighted blend in float32 to avoid uint8 overflow.
        result = np.zeros_like(resized[0])
        for frame, weight in zip(resized, resolved_weights):
            result += frame * weight

        return np.clip(result, 0, 255).astype(np.uint8)
|
|
|
|
|
|
class WGPUBackend(Backend):
    """
    GPU-based effect processing using wgpu/WebGPU compute shaders.

    Compiles sexp effects to WGSL at load time, executes on GPU.
    Achieves 30+ fps real-time processing on supported hardware.

    Requirements:
    - wgpu-py library
    - Vulkan-capable GPU (or software renderer)

    Every GPU path degrades gracefully: if device creation, shader
    compilation, or effect lookup fails, the work is delegated to an
    internal NumpyBackend.
    """

    def __init__(self, recipe_dir: Optional[Path] = None):
        self.recipe_dir = recipe_dir or Path(".")
        # Lazily-created wgpu device; stays None when GPU init fails.
        self._device = None
        self._loaded_effects: Dict[str, Any] = {}  # path -> {'compiled', 'pipeline', 'name'}
        # CPU implementation used whenever the GPU path is unavailable.
        self._numpy_fallback = NumpyBackend(recipe_dir)
        # Buffer pool for reuse - keyed by (width, height)
        self._buffer_pool: Dict[tuple, Dict] = {}

    def _ensure_device(self) -> None:
        """Lazy-initialize the wgpu device.

        Never raises: on failure the device stays None (CPU fallback) and
        initialization is re-attempted on the next call.
        """
        if self._device is not None:
            return

        try:
            import wgpu
            adapter = wgpu.gpu.request_adapter_sync(power_preference="high-performance")
            self._device = adapter.request_device_sync()
            print(f"[WGPUBackend] Using GPU: {adapter.info.get('device', 'unknown')}")
        except Exception as e:
            # Missing wgpu, no adapter, or driver error -> CPU fallback.
            print(f"[WGPUBackend] GPU init failed: {e}, falling back to CPU")
            self._device = None

    def load_effect(self, effect_path: Path) -> Any:
        """
        Load and compile an effect from sexp file to WGSL.

        Returns:
            The compiled effect name on the GPU path, or whatever the
            numpy fallback returns when compilation / device init fails.
        """
        effect_key = str(effect_path)
        if effect_key in self._loaded_effects:
            # BUGFIX: the cache hit used to return the whole info dict while
            # the first call returned the name string; return the name in
            # both cases.
            return self._loaded_effects[effect_key]['name']

        try:
            from sexp_effects.wgsl_compiler import compile_effect_file
            compiled = compile_effect_file(str(effect_path))

            self._ensure_device()
            if self._device is None:
                # Fall back to numpy
                return self._numpy_fallback.load_effect(effect_path)

            # Create shader module
            shader_module = self._device.create_shader_module(code=compiled.wgsl_code)

            # Create compute pipeline
            pipeline = self._device.create_compute_pipeline(
                layout="auto",
                compute={"module": shader_module, "entry_point": "main"}
            )

            self._loaded_effects[effect_key] = {
                'compiled': compiled,
                'pipeline': pipeline,
                'name': compiled.name,
            }
            return compiled.name

        except Exception as e:
            print(f"[WGPUBackend] Failed to compile {effect_path}: {e}")
            # Fall back to numpy for this effect
            return self._numpy_fallback.load_effect(effect_path)

    def _resolve_binding(self, value: Any, t: float, analysis_data: Dict) -> Any:
        """Resolve a parameter binding to its value at time t."""
        # Delegate to numpy backend's implementation so both backends agree.
        return self._numpy_fallback._resolve_binding(value, t, analysis_data)

    def _get_or_create_buffers(self, w: int, h: int) -> Dict:
        """Get or create reusable GPU buffers for the given dimensions.

        One packed u32 per pixel; buffers are cached per (w, h) so frames
        of the same size reuse allocations.
        """
        import wgpu

        key = (w, h)
        if key in self._buffer_pool:
            return self._buffer_pool[key]

        size = w * h * 4  # u32 per pixel

        # Input buffer (STORAGE; filled each frame via queue.write_buffer,
        # so no separate MAP_WRITE staging buffer is needed).
        input_buffer = self._device.create_buffer(
            size=size,
            usage=wgpu.BufferUsage.STORAGE | wgpu.BufferUsage.COPY_DST,
        )

        # Output buffer (STORAGE + COPY_SRC for readback)
        output_buffer = self._device.create_buffer(
            size=size,
            usage=wgpu.BufferUsage.STORAGE | wgpu.BufferUsage.COPY_SRC,
        )

        # Params buffer (uniform, 256 bytes should be enough)
        params_buffer = self._device.create_buffer(
            size=256,
            usage=wgpu.BufferUsage.UNIFORM | wgpu.BufferUsage.COPY_DST,
        )

        self._buffer_pool[key] = {
            'input': input_buffer,
            'output': output_buffer,
            'params': params_buffer,
            'size': size,
        }
        return self._buffer_pool[key]

    def _apply_effect_gpu(
        self,
        frame: np.ndarray,
        effect_name: str,
        params: Dict,
        t: float,
    ) -> Optional[np.ndarray]:
        """Apply effect using GPU. Returns None if GPU or effect not available."""
        # Find the loaded effect by its compiled name.
        effect_info = None
        for info in self._loaded_effects.values():
            if info.get('name') == effect_name:
                effect_info = info
                break

        if effect_info is None or self._device is None:
            return None

        compiled = effect_info['compiled']
        pipeline = effect_info['pipeline']

        h, w = frame.shape[:2]

        # Get reusable buffers
        buffers = self._get_or_create_buffers(w, h)

        # Pack frame as u32 array (RGB -> packed 0x00RRGGBB u32)
        r = frame[:, :, 0].astype(np.uint32)
        g = frame[:, :, 1].astype(np.uint32)
        b = frame[:, :, 2].astype(np.uint32)
        packed = (r << 16) | (g << 8) | b
        input_data = packed.flatten().astype(np.uint32)

        # Upload input data via queue.write_buffer (more efficient than recreation)
        self._device.queue.write_buffer(buffers['input'], 0, input_data.tobytes())

        # Build params struct: width, height, time, then effect params in
        # the order the compiler declared them.
        import struct
        param_values = [w, h]  # width, height as u32
        param_format = "II"  # two u32

        # Add time as f32
        param_values.append(t)
        param_format += "f"

        # Add effect-specific params
        for param in compiled.params:
            val = params.get(param.name, param.default)
            if val is None:
                val = 0
            if param.wgsl_type == 'f32':
                param_values.append(float(val))
                param_format += "f"
            elif param.wgsl_type == 'i32':
                param_values.append(int(val))
                param_format += "i"
            elif param.wgsl_type == 'u32':
                param_values.append(int(val))
                param_format += "I"

        # Pad to 16-byte alignment
        param_bytes = struct.pack(param_format, *param_values)
        while len(param_bytes) % 16 != 0:
            param_bytes += b'\x00'

        self._device.queue.write_buffer(buffers['params'], 0, param_bytes)

        # Create bind group (unfortunately this can't be easily reused with different effects)
        bind_group = self._device.create_bind_group(
            layout=pipeline.get_bind_group_layout(0),
            entries=[
                {"binding": 0, "resource": {"buffer": buffers['input']}},
                {"binding": 1, "resource": {"buffer": buffers['output']}},
                {"binding": 2, "resource": {"buffer": buffers['params']}},
            ]
        )

        # Dispatch compute
        encoder = self._device.create_command_encoder()
        compute_pass = encoder.begin_compute_pass()
        compute_pass.set_pipeline(pipeline)
        compute_pass.set_bind_group(0, bind_group)

        # Workgroups: ceil(w/16) x ceil(h/16) — presumably matches a
        # @workgroup_size(16, 16) in the generated WGSL; confirm in the
        # compiler if changing.
        wg_x = (w + 15) // 16
        wg_y = (h + 15) // 16
        compute_pass.dispatch_workgroups(wg_x, wg_y, 1)
        compute_pass.end()

        self._device.queue.submit([encoder.finish()])

        # Read back result (synchronous)
        result_data = self._device.queue.read_buffer(buffers['output'])
        result_packed = np.frombuffer(result_data, dtype=np.uint32).reshape(h, w)

        # Unpack u32 -> RGB
        result = np.zeros((h, w, 3), dtype=np.uint8)
        result[:, :, 0] = ((result_packed >> 16) & 0xFF).astype(np.uint8)
        result[:, :, 1] = ((result_packed >> 8) & 0xFF).astype(np.uint8)
        result[:, :, 2] = (result_packed & 0xFF).astype(np.uint8)

        return result

    def _apply_effect(
        self,
        frame: np.ndarray,
        effect_name: str,
        params: Dict,
        t: float,
        analysis_data: Dict,
    ) -> np.ndarray:
        """Apply a single effect to a frame, preferring the GPU path."""
        # Resolve bindings in params; these keys are metadata, not parameters.
        resolved_params = {"_time": t}
        for key, value in params.items():
            if key in ("effect", "effect_path", "cid", "analysis_refs"):
                continue
            resolved_params[key] = self._resolve_binding(value, t, analysis_data)

        # Try GPU first
        self._ensure_device()
        if self._device is not None:
            result = self._apply_effect_gpu(frame, effect_name, resolved_params, t)
            if result is not None:
                return result

        # Fall back to numpy (it re-resolves bindings from the raw params).
        return self._numpy_fallback._apply_effect(
            frame, effect_name, params, t, analysis_data
        )

    def process_frame(
        self,
        frames: List[np.ndarray],
        effects_per_frame: List[List[Dict]],
        compositor_config: Dict,
        t: float,
        analysis_data: Dict,
    ) -> np.ndarray:
        """Process frames through effects and composite.

        Empty input yields a black 720p frame so callers always get a
        valid image.
        """
        if not frames:
            return np.zeros((720, 1280, 3), dtype=np.uint8)

        processed = []

        # Apply each source's effect chain to its frame.
        for frame, effects in zip(frames, effects_per_frame):
            result = frame.copy()
            for effect_config in effects:
                effect_name = effect_config.get("effect", "")
                if effect_name:
                    result = self._apply_effect(
                        result, effect_name, effect_config, t, analysis_data
                    )
            processed.append(result)

        # Composite layers (use numpy backend for now)
        if len(processed) == 1:
            return processed[0]

        return self._numpy_fallback._composite(
            processed, compositor_config, t, analysis_data
        )
|
|
|
|
|
|
# Backwards-compatible alias: older callers imported GLSLBackend before the
# GPU path was reimplemented on top of wgpu/WebGPU.
GLSLBackend = WGPUBackend
|
|
|
|
|
|
def get_backend(name: str = "numpy", **kwargs) -> Backend:
    """
    Construct a backend by name.

    Args:
        name: "numpy" for the CPU backend, or any of "wgpu", "glsl", "gpu"
            for the GPU backend.
        **kwargs: Forwarded to the backend constructor.

    Returns:
        Backend instance.

    Raises:
        ValueError: If name does not match a known backend.
    """
    if name in ("wgpu", "glsl", "gpu"):
        return WGPUBackend(**kwargs)
    if name == "numpy":
        return NumpyBackend(**kwargs)
    raise ValueError(f"Unknown backend: {name}")
|