Add sexp to CUDA kernel compiler for fused pipeline
Some checks are pending
GPU Worker CI/CD / test (push) Waiting to run
GPU Worker CI/CD / deploy (push) Blocked by required conditions

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
giles
2026-02-04 09:46:40 +00:00
parent 4d95ec5a32
commit 3b964ba18d

350
streaming/sexp_to_cuda.py Normal file
View File

@@ -0,0 +1,350 @@
"""
Sexp to CUDA Kernel Compiler.
Compiles sexp frame pipelines to fused CUDA kernels for maximum performance.
Instead of interpreting sexp and launching 10+ kernels per frame,
generates a single kernel that does everything in one pass.
"""
import cupy as cp
import numpy as np
from typing import Dict, List, Any, Optional, Tuple
import hashlib
import sys
# Kernel cache
_COMPILED_KERNELS: Dict[str, Any] = {}
def compile_frame_pipeline(effects: List[dict], width: int, height: int) -> callable:
"""
Compile a list of effects to a fused CUDA kernel.
Args:
effects: List of effect dicts like:
[{'op': 'rotate', 'angle': 45.0},
{'op': 'blend', 'alpha': 0.5, 'src2': <gpu_array>},
{'op': 'hue_shift', 'degrees': 90.0},
{'op': 'ripple', 'amplitude': 10.0, 'frequency': 8.0, ...}]
width, height: Frame dimensions
Returns:
Callable that takes input frame and returns output frame
"""
# Generate cache key
ops_key = str([(e['op'], {k:v for k,v in e.items() if k != 'src2'}) for e in effects])
cache_key = f"{width}x{height}_{hashlib.md5(ops_key.encode()).hexdigest()}"
if cache_key in _COMPILED_KERNELS:
return _COMPILED_KERNELS[cache_key]
# Generate fused kernel code
kernel_code = _generate_fused_kernel(effects, width, height)
# Compile kernel
kernel = cp.RawKernel(kernel_code, 'fused_pipeline')
# Create wrapper function
def run_pipeline(frame: cp.ndarray, **dynamic_params) -> cp.ndarray:
"""Run the compiled pipeline on a frame."""
if frame.dtype != cp.uint8:
frame = cp.clip(frame, 0, 255).astype(cp.uint8)
if not frame.flags['C_CONTIGUOUS']:
frame = cp.ascontiguousarray(frame)
output = cp.zeros_like(frame)
block = (16, 16)
grid = ((width + 15) // 16, (height + 15) // 16)
# Build parameter array
params = _build_params(effects, dynamic_params)
kernel(grid, block, (frame, output, width, height, params))
return output
_COMPILED_KERNELS[cache_key] = run_pipeline
return run_pipeline
def _generate_fused_kernel(effects: List[dict], width: int, height: int) -> str:
"""Generate CUDA kernel code for fused effects pipeline."""
# Build the kernel
code = r'''
extern "C" __global__
void fused_pipeline(
const unsigned char* src,
unsigned char* dst,
int width, int height,
const float* params
) {
int x = blockIdx.x * blockDim.x + threadIdx.x;
int y = blockIdx.y * blockDim.y + threadIdx.y;
if (x >= width || y >= height) return;
// Start with source coordinates
float src_x = (float)x;
float src_y = (float)y;
float cx = width / 2.0f;
float cy = height / 2.0f;
// Track accumulated transforms
float total_cos = 1.0f, total_sin = 0.0f; // rotation
float total_zoom = 1.0f; // zoom
float ripple_dx = 0.0f, ripple_dy = 0.0f; // ripple displacement
int param_idx = 0;
'''
# Add effect-specific code
for i, effect in enumerate(effects):
op = effect['op']
if op == 'rotate':
code += f'''
// Rotate {i}
{{
float angle = params[param_idx++] * 3.14159265f / 180.0f;
float c = cosf(angle);
float s = sinf(angle);
// Compose with existing rotation
float nc = total_cos * c - total_sin * s;
float ns = total_cos * s + total_sin * c;
total_cos = nc;
total_sin = ns;
}}
'''
elif op == 'zoom':
code += f'''
// Zoom {i}
{{
float zoom = params[param_idx++];
total_zoom *= zoom;
}}
'''
elif op == 'ripple':
code += f'''
// Ripple {i}
{{
float amplitude = params[param_idx++];
float frequency = params[param_idx++];
float decay = params[param_idx++];
float phase = params[param_idx++];
float rcx = params[param_idx++];
float rcy = params[param_idx++];
float rdx = src_x - rcx;
float rdy = src_y - rcy;
float dist = sqrtf(rdx * rdx + rdy * rdy);
float wave = sinf(dist * frequency * 0.1f + phase);
float amp = amplitude * expf(-dist * decay * 0.01f);
if (dist > 0.001f) {
ripple_dx += rdx / dist * wave * amp;
ripple_dy += rdy / dist * wave * amp;
}
}}
'''
# Apply all geometric transforms at once
code += '''
// Apply accumulated geometric transforms
{
// Translate to center
float dx = src_x - cx;
float dy = src_y - cy;
// Apply rotation
float rx = total_cos * dx + total_sin * dy;
float ry = -total_sin * dx + total_cos * dy;
// Apply zoom (inverse for sampling)
rx /= total_zoom;
ry /= total_zoom;
// Translate back and apply ripple
src_x = rx + cx - ripple_dx;
src_y = ry + cy - ripple_dy;
}
// Sample source with bilinear interpolation
float r, g, b;
if (src_x < 0 || src_x >= width - 1 || src_y < 0 || src_y >= height - 1) {
r = g = b = 0;
} else {
int x0 = (int)src_x;
int y0 = (int)src_y;
float fx = src_x - x0;
float fy = src_y - y0;
int idx00 = (y0 * width + x0) * 3;
int idx10 = (y0 * width + x0 + 1) * 3;
int idx01 = ((y0 + 1) * width + x0) * 3;
int idx11 = ((y0 + 1) * width + x0 + 1) * 3;
#define BILERP(c) \\
(src[idx00 + c] * (1-fx) * (1-fy) + \\
src[idx10 + c] * fx * (1-fy) + \\
src[idx01 + c] * (1-fx) * fy + \\
src[idx11 + c] * fx * fy)
r = BILERP(0);
g = BILERP(1);
b = BILERP(2);
}
'''
# Add color transforms
for i, effect in enumerate(effects):
op = effect['op']
if op == 'invert':
code += f'''
// Invert {i}
{{
float amount = params[param_idx++];
if (amount > 0.5f) {{
r = 255.0f - r;
g = 255.0f - g;
b = 255.0f - b;
}}
}}
'''
elif op == 'hue_shift':
code += f'''
// Hue shift {i}
{{
float shift = params[param_idx++];
if (fabsf(shift) > 0.01f) {{
// RGB to HSV
float rf = r / 255.0f;
float gf = g / 255.0f;
float bf = b / 255.0f;
float max_c = fmaxf(rf, fmaxf(gf, bf));
float min_c = fminf(rf, fminf(gf, bf));
float delta = max_c - min_c;
float h = 0, s = 0, v = max_c;
if (delta > 0.00001f) {{
s = delta / max_c;
if (rf >= max_c) h = (gf - bf) / delta;
else if (gf >= max_c) h = 2.0f + (bf - rf) / delta;
else h = 4.0f + (rf - gf) / delta;
h *= 60.0f;
if (h < 0) h += 360.0f;
}}
h = fmodf(h + shift + 360.0f, 360.0f);
// HSV to RGB
float c = v * s;
float x_val = c * (1 - fabsf(fmodf(h / 60.0f, 2.0f) - 1));
float m = v - c;
float r2, g2, b2;
if (h < 60) {{ r2 = c; g2 = x_val; b2 = 0; }}
else if (h < 120) {{ r2 = x_val; g2 = c; b2 = 0; }}
else if (h < 180) {{ r2 = 0; g2 = c; b2 = x_val; }}
else if (h < 240) {{ r2 = 0; g2 = x_val; b2 = c; }}
else if (h < 300) {{ r2 = x_val; g2 = 0; b2 = c; }}
else {{ r2 = c; g2 = 0; b2 = x_val; }}
r = (r2 + m) * 255.0f;
g = (g2 + m) * 255.0f;
b = (b2 + m) * 255.0f;
}}
}}
'''
elif op == 'brightness':
code += f'''
// Brightness {i}
{{
float factor = params[param_idx++];
r *= factor;
g *= factor;
b *= factor;
}}
'''
# Write output
code += '''
// Write output
int dst_idx = (y * width + x) * 3;
dst[dst_idx] = (unsigned char)fminf(255.0f, fmaxf(0.0f, r));
dst[dst_idx + 1] = (unsigned char)fminf(255.0f, fmaxf(0.0f, g));
dst[dst_idx + 2] = (unsigned char)fminf(255.0f, fmaxf(0.0f, b));
}
'''
return code
def _build_params(effects: List[dict], dynamic_params: dict) -> cp.ndarray:
"""Build parameter array for kernel."""
params = []
for effect in effects:
op = effect['op']
if op == 'rotate':
params.append(float(dynamic_params.get('rotate_angle', effect.get('angle', 0))))
elif op == 'zoom':
params.append(float(dynamic_params.get('zoom_amount', effect.get('amount', 1.0))))
elif op == 'ripple':
params.append(float(effect.get('amplitude', 10)))
params.append(float(effect.get('frequency', 8)))
params.append(float(effect.get('decay', 2)))
params.append(float(dynamic_params.get('ripple_phase', effect.get('phase', 0))))
params.append(float(effect.get('center_x', 960)))
params.append(float(effect.get('center_y', 540)))
elif op == 'invert':
params.append(float(effect.get('amount', 0)))
elif op == 'hue_shift':
params.append(float(effect.get('degrees', 0)))
elif op == 'brightness':
params.append(float(effect.get('factor', 1.0)))
return cp.array(params, dtype=cp.float32)
# Test the compiler
if __name__ == '__main__':
print("[sexp_to_cuda] Testing fused kernel compiler...")
# Define a test pipeline
effects = [
{'op': 'rotate', 'angle': 45.0},
{'op': 'zoom', 'amount': 1.2},
{'op': 'hue_shift', 'degrees': 30.0},
{'op': 'ripple', 'amplitude': 10, 'frequency': 8, 'decay': 2, 'phase': 0, 'center_x': 960, 'center_y': 540},
]
# Compile
pipeline = compile_frame_pipeline(effects, 1920, 1080)
# Test with dummy frame
import time
frame = cp.random.randint(0, 255, (1080, 1920, 3), dtype=cp.uint8)
# Warmup
output = pipeline(frame)
cp.cuda.Stream.null.synchronize()
# Benchmark
start = time.time()
for i in range(100):
output = pipeline(frame, rotate_angle=i, ripple_phase=i*0.1)
cp.cuda.Stream.null.synchronize()
elapsed = time.time() - start
print(f"Fused kernel: {elapsed/100*1000:.2f}ms per frame")
print(f"That's {100/elapsed:.0f} fps potential!")