"""
Sexp to CUDA Kernel Compiler.

Compiles sexp frame pipelines to fused CUDA kernels for maximum performance.
Instead of interpreting sexp and launching 10+ kernels per frame,
generates a single kernel that does everything in one pass.
"""

import hashlib
import logging
import math
import re
import sys
from typing import Any, Callable, Dict, List, Optional, Tuple

import cupy as cp
import numpy as np

logger = logging.getLogger(__name__)

# Kernel cache: cache key -> compiled pipeline wrapper (never evicted).
_COMPILED_KERNELS: Dict[str, Any] = {}

# Ops the fused (parameter-array) kernel generator can emit.
_SUPPORTED_OPS = frozenset(
    {'rotate', 'zoom', 'ripple', 'invert', 'hue_shift', 'brightness'}
)

# Thread-block shape used for every launch; the grid is derived from it.
_BLOCK_DIM = (16, 16)

# Function names rewritten to their single-precision CUDA equivalents when
# they appear as a call inside a dynamic expression.
_CUDA_FLOAT_FUNCS = {
    'sin': 'sinf', 'cos': 'cosf', 'tan': 'tanf',
    'asin': 'asinf', 'acos': 'acosf', 'atan': 'atanf', 'atan2': 'atan2f',
    'abs': 'fabsf', 'fabs': 'fabsf',
    'sqrt': 'sqrtf', 'exp': 'expf', 'log': 'logf', 'pow': 'powf',
    'floor': 'floorf', 'ceil': 'ceilf', 'fmod': 'fmodf',
}

# Matches one of the names above only when it is a whole identifier followed
# by "(", so "sinf(" and "fabsf(" are left untouched.
_FUNC_CALL_RE = re.compile(
    r'(?<![A-Za-z0-9_])('
    + '|'.join(re.escape(name) for name in _CUDA_FLOAT_FUNCS)
    + r')(?=\s*\()'
)


# ---------------------------------------------------------------------------
# CUDA snippets shared by both kernel generators
# ---------------------------------------------------------------------------

_FUSED_HEADER = r'''
extern "C" __global__
void fused_pipeline(
    const unsigned char* src,
    unsigned char* dst,
    int width, int height,
    const float* params
) {
    int x = blockIdx.x * blockDim.x + threadIdx.x;
    int y = blockIdx.y * blockDim.y + threadIdx.y;
    if (x >= width || y >= height) return;
'''

_AUTONOMOUS_HEADER = r'''
extern "C" __global__
void autonomous_pipeline(
    const unsigned char* src,
    unsigned char* dst,
    int width, int height,
    float t, int frame_num
) {
    int x = blockIdx.x * blockDim.x + threadIdx.x;
    int y = blockIdx.y * blockDim.y + threadIdx.y;
    if (x >= width || y >= height) return;

    // Compute dynamic parameters from time (ALL ON GPU!)
'''

_CUDA_TRANSFORM_STATE = r'''
    // Start with source coordinates
    float src_x = (float)x;
    float src_y = (float)y;
    float cx = width / 2.0f;
    float cy = height / 2.0f;

    // Track accumulated transforms
    float total_cos = 1.0f, total_sin = 0.0f;  // rotation
    float total_zoom = 1.0f;                   // zoom
    float ripple_dx = 0.0f, ripple_dy = 0.0f;  // ripple displacement
'''

# Expects `angle` (radians) to be declared in the enclosing scope.
_CUDA_ROTATE_BODY = r'''
        float c = cosf(angle);
        float s = sinf(angle);
        // Compose with existing rotation
        float nc = total_cos * c - total_sin * s;
        float ns = total_cos * s + total_sin * c;
        total_cos = nc;
        total_sin = ns;
'''

# Expects `amplitude`, `frequency`, `decay`, `phase`, `rcx`, `rcy` to be
# declared in the enclosing scope.
_CUDA_RIPPLE_BODY = r'''
        float rdx = src_x - rcx;
        float rdy = src_y - rcy;
        float dist = sqrtf(rdx * rdx + rdy * rdy);
        float max_dim = (float)(width > height ? width : height);
        // Original formula: sin(dist / frequency - phase) * exp(-dist * decay / max_dim)
        float wave = sinf(dist / frequency - phase);
        float amp = amplitude * expf(-dist * decay / max_dim);
        if (dist > 0.001f) {
            ripple_dx += rdx / dist * wave * amp;
            ripple_dy += rdy / dist * wave * amp;
        }
'''

_CUDA_TRANSFORM_AND_SAMPLE = r'''
    // Apply accumulated geometric transforms
    {
        // Translate to center
        float dx = src_x - cx;
        float dy = src_y - cy;

        // Apply rotation
        float rx = total_cos * dx + total_sin * dy;
        float ry = -total_sin * dx + total_cos * dy;

        // Apply zoom (inverse for sampling)
        rx /= total_zoom;
        ry /= total_zoom;

        // Translate back and apply ripple
        src_x = rx + cx - ripple_dx;
        src_y = ry + cy - ripple_dy;
    }

    // Sample source with bilinear interpolation
    float r, g, b;
    // Written as a negated in-bounds test so that NaN coordinates (for
    // example from a zero ripple frequency) also take the black branch.
    if (!(src_x >= 0 && src_x < width - 1 && src_y >= 0 && src_y < height - 1)) {
        r = g = b = 0;
    } else {
        int x0 = (int)src_x;
        int y0 = (int)src_y;
        float fx = src_x - x0;
        float fy = src_y - y0;

        int idx00 = (y0 * width + x0) * 3;
        int idx10 = (y0 * width + x0 + 1) * 3;
        int idx01 = ((y0 + 1) * width + x0) * 3;
        int idx11 = ((y0 + 1) * width + x0 + 1) * 3;

        #define BILERP(c) \
            (src[idx00 + c] * (1-fx) * (1-fy) + \
             src[idx10 + c] * fx * (1-fy) + \
             src[idx01 + c] * (1-fx) * fy + \
             src[idx11 + c] * fx * fy)

        r = BILERP(0);
        g = BILERP(1);
        b = BILERP(2);
    }
'''

# Expects `amount` to be declared in the enclosing scope.
_CUDA_INVERT_BODY = r'''
        if (amount > 0.5f) {
            r = 255.0f - r;
            g = 255.0f - g;
            b = 255.0f - b;
        }
'''

# Expects `shift` (degrees) to be declared in the enclosing scope.
_CUDA_HUE_SHIFT_BODY = r'''
            // RGB to HSV
            float rf = r / 255.0f;
            float gf = g / 255.0f;
            float bf = b / 255.0f;

            float max_c = fmaxf(rf, fmaxf(gf, bf));
            float min_c = fminf(rf, fminf(gf, bf));
            float delta = max_c - min_c;

            float h = 0, s = 0, v = max_c;

            if (delta > 0.00001f) {
                s = delta / max_c;
                if (rf >= max_c) h = (gf - bf) / delta;
                else if (gf >= max_c) h = 2.0f + (bf - rf) / delta;
                else h = 4.0f + (rf - gf) / delta;
                h *= 60.0f;
                if (h < 0) h += 360.0f;
            }

            // Wrap into [0, 360]; fmodf keeps the sign of its first operand,
            // so negative results are shifted up explicitly.
            h = fmodf(h + shift, 360.0f);
            if (h < 0.0f) h += 360.0f;

            // HSV to RGB
            float c = v * s;
            float x_val = c * (1 - fabsf(fmodf(h / 60.0f, 2.0f) - 1));
            float m = v - c;

            float r2, g2, b2;
            if (h < 60) { r2 = c; g2 = x_val; b2 = 0; }
            else if (h < 120) { r2 = x_val; g2 = c; b2 = 0; }
            else if (h < 180) { r2 = 0; g2 = c; b2 = x_val; }
            else if (h < 240) { r2 = 0; g2 = x_val; b2 = c; }
            else if (h < 300) { r2 = x_val; g2 = 0; b2 = c; }
            else { r2 = c; g2 = 0; b2 = x_val; }

            r = (r2 + m) * 255.0f;
            g = (g2 + m) * 255.0f;
            b = (b2 + m) * 255.0f;
'''

_CUDA_WRITE_OUTPUT = r'''
    // Write output
    int dst_idx = (y * width + x) * 3;
    dst[dst_idx] = (unsigned char)fminf(255.0f, fmaxf(0.0f, r));
    dst[dst_idx + 1] = (unsigned char)fminf(255.0f, fmaxf(0.0f, g));
    dst[dst_idx + 2] = (unsigned char)fminf(255.0f, fmaxf(0.0f, b));
}
'''


# ---------------------------------------------------------------------------
# Small private helpers
# ---------------------------------------------------------------------------

def _cuda_scope(title: str, *chunks: str) -> str:
    """Wrap CUDA statements in a `{ ... }` scope preceded by a `// title` comment."""
    return f"\n    // {title}\n    {{\n{''.join(chunks)}    }}\n"


def _cuda_float(value: Any) -> str:
    """Render a number as a CUDA float literal without losing precision.

    Raises:
        ValueError: if the value is NaN or infinite (no literal form exists).
    """
    number = float(value)
    if not math.isfinite(number):
        raise ValueError(f"Cannot embed non-finite value {number!r} in CUDA source")
    # repr() gives the shortest round-tripping form, e.g. '10.0', '0.25', '1e-05'.
    return f"{number!r}f"


def _expr_to_cuda(expr: Any) -> str:
    """Convert a simple expression to CUDA by mapping math calls to float variants.

    Plain numbers are rendered as float literals. Strings are otherwise passed
    through unchanged.
    """
    if isinstance(expr, (int, float)) and not isinstance(expr, bool):
        return _cuda_float(expr)
    return _FUNC_CALL_RE.sub(lambda m: _CUDA_FLOAT_FUNCS[m.group(1)], expr)


def _ops_cache_key(effects: List[dict]) -> str:
    """Stringify the effect list for cache keys, ignoring the 'src2' entry."""
    return str([(e['op'], {k: v for k, v in e.items() if k != 'src2'}) for e in effects])


def _launch_grid(width: int, height: int) -> Tuple[int, int]:
    """Return the grid size that covers width x height with `_BLOCK_DIM` blocks."""
    return ((width + 15) // 16, (height + 15) // 16)


def _prepare_frame(frame: "cp.ndarray", width: int, height: int) -> "cp.ndarray":
    """Coerce a frame to contiguous uint8 and check it is large enough.

    Raises:
        ValueError: if the frame holds fewer than width * height * 3 elements.
            The kernels index pixels as (y * width + x) * 3, so a smaller
            buffer would be read and written out of bounds.
    """
    if frame.dtype != cp.uint8:
        frame = cp.clip(frame, 0, 255).astype(cp.uint8)
    if not frame.flags['C_CONTIGUOUS']:
        frame = cp.ascontiguousarray(frame)
    required = width * height * 3
    if frame.size < required:
        raise ValueError(
            f"Frame has {frame.size} elements but a {width}x{height} pipeline "
            f"needs at least {required}"
        )
    return frame


# ---------------------------------------------------------------------------
# Fused pipeline (parameters supplied from Python on every call)
# ---------------------------------------------------------------------------

def compile_frame_pipeline(effects: List[dict], width: int, height: int) -> Callable[..., Any]:
    """
    Compile a list of effects to a fused CUDA kernel.

    Results are cached in `_COMPILED_KERNELS`, keyed by the frame size and
    every effect entry except 'src2'.

    Args:
        effects: List of effect dicts like:
            [{'op': 'rotate', 'angle': 45.0},
             {'op': 'zoom', 'amount': 1.2},
             {'op': 'hue_shift', 'degrees': 90.0},
             {'op': 'ripple', 'amplitude': 10.0, 'frequency': 8.0, ...}]
        width, height: Frame dimensions

    Returns:
        Callable that takes input frame (plus optional keyword overrides such
        as rotate_angle, zoom_amount, ripple_amplitude, ripple_phase,
        brightness_factor) and returns output frame

    Raises:
        ValueError: if an effect uses an op the fused kernel does not support.
    """
    # Generate cache key
    digest = hashlib.md5(_ops_cache_key(effects).encode()).hexdigest()
    cache_key = f"{width}x{height}_{digest}"

    cached = _COMPILED_KERNELS.get(cache_key)
    if cached is not None:
        return cached

    # Generate fused kernel code and compile it
    kernel = cp.RawKernel(_generate_fused_kernel(effects, width, height), 'fused_pipeline')
    grid = _launch_grid(width, height)

    def run_pipeline(frame: cp.ndarray, **dynamic_params) -> cp.ndarray:
        """Run the compiled pipeline on a frame."""
        frame = _prepare_frame(frame, width, height)
        output = cp.zeros_like(frame)

        # Build parameter array
        params = _build_params(effects, dynamic_params, width, height)

        # The kernel declares `int width, int height`; CuPy passes a bare
        # Python int as a 64-bit value, so convert explicitly.
        kernel(grid, _BLOCK_DIM, (frame, output, np.int32(width), np.int32(height), params))
        return output

    _COMPILED_KERNELS[cache_key] = run_pipeline
    return run_pipeline


def _generate_fused_kernel(effects: List[dict], width: int, height: int) -> str:
    """Generate CUDA kernel code for fused effects pipeline.

    Geometric ops (rotate, zoom, ripple) are emitted first in list order,
    followed by one bilinear sample and then the colour ops (invert,
    hue_shift, brightness) in list order. Each op reads its values from
    `params` in that same order; `_build_params` must stay in step.

    `width` and `height` are not used here: the kernel receives them as
    launch arguments.

    Raises:
        ValueError: if any effect's 'op' is not in `_SUPPORTED_OPS`.
    """
    # Validate all ops are supported
    for effect in effects:
        op = effect.get('op')
        if op not in _SUPPORTED_OPS:
            raise ValueError(f"Unsupported CUDA kernel operation: '{op}'. Supported ops: {', '.join(sorted(_SUPPORTED_OPS))}. Note: 'resize' must be handled separately before the fused kernel.")

    next_param = "params[param_idx++]"
    parts = [_FUSED_HEADER, _CUDA_TRANSFORM_STATE, "\n    int param_idx = 0;\n"]

    # Add effect-specific geometric code
    for i, effect in enumerate(effects):
        op = effect['op']
        if op == 'rotate':
            parts.append(_cuda_scope(
                f"Rotate {i}",
                f"        float angle = {next_param} * 3.14159265f / 180.0f;\n",
                _CUDA_ROTATE_BODY,
            ))
        elif op == 'zoom':
            parts.append(_cuda_scope(
                f"Zoom {i}",
                f"        float zoom = {next_param};\n",
                "        total_zoom *= zoom;\n",
            ))
        elif op == 'ripple':
            declarations = ''.join(
                f"        float {name} = {next_param};\n"
                for name in ('amplitude', 'frequency', 'decay', 'phase', 'rcx', 'rcy')
            )
            parts.append(_cuda_scope(
                f"Ripple {i} - matching original formula: "
                "sin(dist/freq - phase) * exp(-dist*decay/maxdim)",
                declarations,
                _CUDA_RIPPLE_BODY,
            ))

    # Apply all geometric transforms at once, then sample
    parts.append(_CUDA_TRANSFORM_AND_SAMPLE)

    # Add color transforms
    for i, effect in enumerate(effects):
        op = effect['op']
        if op == 'invert':
            parts.append(_cuda_scope(
                f"Invert {i}",
                f"        float amount = {next_param};\n",
                _CUDA_INVERT_BODY,
            ))
        elif op == 'hue_shift':
            parts.append(_cuda_scope(
                f"Hue shift {i}",
                f"        float shift = {next_param};\n",
                "        if (fabsf(shift) > 0.01f) {\n",
                _CUDA_HUE_SHIFT_BODY,
                "        }\n",
            ))
        elif op == 'brightness':
            parts.append(_cuda_scope(
                f"Brightness {i}",
                f"        float factor = {next_param};\n",
                "        r *= factor;\n"
                "        g *= factor;\n"
                "        b *= factor;\n",
            ))

    # Write output
    parts.append(_CUDA_WRITE_OUTPUT)
    return ''.join(parts)


_BUILD_PARAMS_COUNT = 0


def _build_params(effects: List[dict], dynamic_params: dict,
                  width: Optional[int] = None,
                  height: Optional[int] = None) -> cp.ndarray:
    """Build parameter array for kernel.

    IMPORTANT: Parameters must be built in the same order the kernel consumes them:
    1. First all geometric transforms (rotate, zoom, ripple) in list order
    2. Then all color transforms (invert, hue_shift, brightness) in list order

    Args:
        effects: Effect dicts supplying the static/default values.
        dynamic_params: Per-call overrides. Recognised keys: rotate_angle,
            zoom_amount, ripple_amplitude, ripple_phase, brightness_factor.
            With several effects of the same op, the override applies to all.
        width, height: Optional frame size. When given, a ripple without
            center_x / center_y defaults to the frame centre; otherwise the
            historical 960 / 540 defaults are used.

    Returns:
        float32 CuPy array of the flattened parameters.
    """
    global _BUILD_PARAMS_COUNT
    _BUILD_PARAMS_COUNT += 1
    call_no = _BUILD_PARAMS_COUNT

    # ALWAYS log first few calls - use WARNING to ensure visibility in Celery logs
    if call_no <= 3:
        logger.warning("[BUILD_PARAMS #%d] effects=%s", call_no, [e['op'] for e in effects])
    verbose = call_no <= 10 or call_no % 500 == 0

    default_cx = 960 if width is None else width / 2
    default_cy = 540 if height is None else height / 2

    params: List[float] = []

    # First pass: geometric transforms (matches kernel's first loop)
    for effect in effects:
        op = effect['op']
        if op == 'rotate':
            params.append(float(dynamic_params.get('rotate_angle', effect.get('angle', 0))))
        elif op == 'zoom':
            params.append(float(dynamic_params.get('zoom_amount', effect.get('amount', 1.0))))
        elif op == 'ripple':
            amp = float(dynamic_params.get('ripple_amplitude', effect.get('amplitude', 10)))
            freq = float(effect.get('frequency', 8))
            decay = float(effect.get('decay', 2))
            phase = float(dynamic_params.get('ripple_phase', effect.get('phase', 0)))
            cx = float(effect.get('center_x', default_cx))
            cy = float(effect.get('center_y', default_cy))
            params.extend([amp, freq, decay, phase, cx, cy])
            if verbose:
                logger.warning(
                    "[BUILD_PARAMS #%d] ripple amp=%s freq=%s decay=%s phase=%.2f cx=%s cy=%s",
                    call_no, amp, freq, decay, phase, cx, cy,
                )

    # Second pass: color transforms (matches kernel's second loop)
    for effect in effects:
        op = effect['op']
        if op == 'invert':
            amt = float(effect.get('amount', 0))
            params.append(amt)
            if verbose:
                logger.warning("[BUILD_PARAMS #%d] invert amount=%s", call_no, amt)
        elif op == 'hue_shift':
            deg = float(effect.get('degrees', 0))
            params.append(deg)
            if verbose:
                logger.warning("[BUILD_PARAMS #%d] hue_shift degrees=%s", call_no, deg)
        elif op == 'brightness':
            # Same override name the autonomous pipeline uses.
            params.append(float(dynamic_params.get('brightness_factor', effect.get('factor', 1.0))))

    return cp.array(params, dtype=cp.float32)


# ---------------------------------------------------------------------------
# Autonomous pipeline (parameters computed on the GPU from time)
# ---------------------------------------------------------------------------

def compile_autonomous_pipeline(effects: List[dict], width: int, height: int,
                                dynamic_expressions: Optional[dict] = None) -> Callable[..., Any]:
    """
    Compile a fully autonomous pipeline that computes ALL parameters on GPU.

    This eliminates Python from the hot path - the kernel computes time-based
    parameters (sin, cos, etc.) directly on GPU.

    Results are cached in `_COMPILED_KERNELS`, keyed by frame size, the
    effects (minus 'src2') and the sorted expressions.

    Args:
        effects: List of effect dicts
        width, height: Frame dimensions
        dynamic_expressions: Dict mapping param names to expressions, e.g.:
            {'rotate_angle': 't * 30',
             'ripple_phase': 't * 2',
             'brightness_factor': '0.8 + 0.4 * sin(t * 2)'}
            Expressions are pasted into the generated CUDA source, so they
            must come from a trusted source.

    Returns:
        Callable that takes (frame, frame_num, fps) and returns output frame
    """
    if dynamic_expressions is None:
        dynamic_expressions = {}

    # Generate cache key
    key_material = _ops_cache_key(effects) + str(sorted(dynamic_expressions.items()))
    cache_key = f"auto_{width}x{height}_{hashlib.md5(key_material.encode()).hexdigest()}"

    cached = _COMPILED_KERNELS.get(cache_key)
    if cached is not None:
        return cached

    # Generate autonomous kernel code and compile it
    kernel_code = _generate_autonomous_kernel(effects, width, height, dynamic_expressions)
    kernel = cp.RawKernel(kernel_code, 'autonomous_pipeline')
    grid = _launch_grid(width, height)

    def run_autonomous(frame: cp.ndarray, frame_num: int, fps: float = 30.0) -> cp.ndarray:
        """Run the autonomous pipeline - no Python in the hot path!"""
        frame = _prepare_frame(frame, width, height)
        output = cp.zeros_like(frame)

        # Only pass frame_num and fps - kernel computes everything else!
        t = float(frame_num) / float(fps)

        kernel(grid, _BLOCK_DIM, (frame, output, np.int32(width), np.int32(height),
                                  np.float32(t), np.int32(frame_num)))
        return output

    _COMPILED_KERNELS[cache_key] = run_autonomous
    return run_autonomous


def _generate_autonomous_kernel(effects: List[dict], width: int, height: int,
                                dynamic_expressions: dict) -> str:
    """Generate CUDA kernel that computes everything autonomously.

    Dynamic values (rotate_angle, ripple_phase, brightness_factor,
    zoom_amount) come from `dynamic_expressions`, evaluated per thread with
    `t` and `frame_num` in scope. Static effect values are baked into the
    source as float literals. Ops this generator cannot emit are skipped with
    a logged warning.

    NOTE: expressions are inserted into the source verbatim apart from the
    math-function renaming, so they must never come from untrusted input.

    Raises:
        ValueError: if a static effect value is NaN or infinite.
    """
    rotate_expr = _expr_to_cuda(dynamic_expressions.get('rotate_angle', '0.0f'))
    ripple_phase_expr = _expr_to_cuda(dynamic_expressions.get('ripple_phase', '0.0f'))
    brightness_expr = _expr_to_cuda(dynamic_expressions.get('brightness_factor', '1.0f'))
    zoom_expr = _expr_to_cuda(dynamic_expressions.get('zoom_amount', '1.0f'))

    parts = [
        _AUTONOMOUS_HEADER,
        f"    float rotate_angle = {rotate_expr};\n"
        f"    float ripple_phase = {ripple_phase_expr};\n"
        f"    float brightness_factor = {brightness_expr};\n"
        f"    float zoom_amount = {zoom_expr};\n",
        _CUDA_TRANSFORM_STATE,
    ]

    # Add effect-specific geometric code
    for i, effect in enumerate(effects):
        op = effect['op']
        if op == 'rotate':
            parts.append(_cuda_scope(
                f"Rotate {i}",
                "        float angle = rotate_angle * 3.14159265f / 180.0f;\n",
                _CUDA_ROTATE_BODY,
            ))
        elif op == 'zoom':
            parts.append(_cuda_scope(f"Zoom {i}", "        total_zoom *= zoom_amount;\n"))
        elif op == 'ripple':
            amplitude = _cuda_float(effect.get('amplitude', 10))
            frequency = _cuda_float(effect.get('frequency', 8))
            decay = _cuda_float(effect.get('decay', 2))
            rcx = _cuda_float(effect.get('center_x', width / 2))
            rcy = _cuda_float(effect.get('center_y', height / 2))
            # Uses the same displacement formula as the fused kernel so both
            # pipelines render the same ripple for the same effect dict.
            parts.append(_cuda_scope(
                f"Ripple {i}",
                f"        float amplitude = {amplitude};\n"
                f"        float frequency = {frequency};\n"
                f"        float decay = {decay};\n"
                "        float phase = ripple_phase;\n"
                f"        float rcx = {rcx};\n"
                f"        float rcy = {rcy};\n",
                _CUDA_RIPPLE_BODY,
            ))
        elif op not in _SUPPORTED_OPS:
            logger.warning("autonomous kernel ignores unsupported op %r", op)

    # Apply geometric transforms, then sample
    parts.append(_CUDA_TRANSFORM_AND_SAMPLE)

    # Add color transforms
    for i, effect in enumerate(effects):
        op = effect['op']
        if op == 'invert':
            parts.append(_cuda_scope(
                f"Invert {i}",
                f"        float amount = {_cuda_float(effect.get('amount', 0))};\n",
                _CUDA_INVERT_BODY,
            ))
        elif op == 'hue_shift':
            parts.append(_cuda_scope(
                f"Hue shift {i}",
                f"        float shift = {_cuda_float(effect.get('degrees', 0))};\n",
                _CUDA_HUE_SHIFT_BODY,
            ))
        elif op == 'brightness':
            parts.append(_cuda_scope(
                "Brightness",
                "        r *= brightness_factor;\n"
                "        g *= brightness_factor;\n"
                "        b *= brightness_factor;\n",
            ))

    # Write output
    parts.append(_CUDA_WRITE_OUTPUT)
    return ''.join(parts)


def _run_benchmark() -> None:
    """Compile both pipeline flavours and print per-frame timings (needs a GPU)."""
    import time

    frame_count = 100

    print("[sexp_to_cuda] Testing fused kernel compiler...")
    print("=" * 60)

    # Define a test pipeline
    effects = [
        {'op': 'rotate', 'angle': 45.0},
        {'op': 'hue_shift', 'degrees': 30.0},
        {'op': 'ripple', 'amplitude': 15, 'frequency': 10, 'decay': 2, 'phase': 0, 'center_x': 960, 'center_y': 540},
        {'op': 'brightness', 'factor': 1.0},
    ]

    frame = cp.random.randint(0, 255, (1080, 1920, 3), dtype=cp.uint8)

    # ===== Test 1: Standard fused kernel (params passed from Python) =====
    print("\n[Test 1] Standard fused kernel (Python computes params)")
    pipeline = compile_frame_pipeline(effects, 1920, 1080)

    # Warmup
    pipeline(frame)
    cp.cuda.Stream.null.synchronize()

    # Benchmark with Python param computation
    start = time.perf_counter()
    for i in range(frame_count):
        # Simulate Python computing params (like sexp interpreter does)
        t = i / 30.0
        pipeline(
            frame,
            rotate_angle=t * 30,
            ripple_phase=t * 2,
            brightness_factor=0.8 + 0.4 * math.sin(t * 2),
        )
    cp.cuda.Stream.null.synchronize()
    elapsed = time.perf_counter() - start

    print(f"  Time: {elapsed/100*1000:.2f}ms per frame")
    print(f"  FPS: {100/elapsed:.0f}")

    # ===== Test 2: Autonomous kernel (GPU computes everything) =====
    print("\n[Test 2] Autonomous kernel (GPU computes ALL params)")

    dynamic_expressions = {
        'rotate_angle': 't * 30.0f',
        'ripple_phase': 't * 2.0f',
        'brightness_factor': '0.8f + 0.4f * sinf(t * 2.0f)',
    }

    auto_pipeline = compile_autonomous_pipeline(effects, 1920, 1080, dynamic_expressions)

    # Warmup
    auto_pipeline(frame, 0, 30.0)
    cp.cuda.Stream.null.synchronize()

    # Benchmark - NO Python computation in loop!
    start = time.perf_counter()
    for i in range(frame_count):
        auto_pipeline(frame, i, 30.0)  # Just pass frame_num!
    cp.cuda.Stream.null.synchronize()
    elapsed = time.perf_counter() - start

    print(f"  Time: {elapsed/100*1000:.2f}ms per frame")
    print(f"  FPS: {100/elapsed:.0f}")

    print("\n" + "=" * 60)
    print("Autonomous kernel eliminates Python from hot path!")


# Test the compiler
if __name__ == '__main__':
    _run_benchmark()