From 2d20a6f45291cb236635f84d33c052fb0e68aea5 Mon Sep 17 00:00:00 2001
From: giles
Date: Wed, 4 Feb 2026 09:51:56 +0000
Subject: [PATCH] Add fused-pipeline primitive and test for compiled CUDA kernels

---

 Reviewer notes (this section sits after the three-dash separator, so it
 is neither commit message nor hunk content):

   sexp_effects/primitive_libs/streaming_gpu.py
     * Adds prim_fused_pipeline and registers it as
       PRIMITIVES['fused-pipeline'].
     * compile_frame_pipeline (streaming.sexp_to_cuda) is imported only
       when GPU_AVAILABLE is true. An ImportError is reported on stderr
       and leaves _FUSED_KERNELS_AVAILABLE False.
     * Without the compiler, effects are applied one at a time through
       gpu_rotate, gpu_zoom, gpu_hue_shift, gpu_ripple, gpu_contrast
       (used for 'brightness') and gpu_invert. An 'op' outside that set
       is skipped without an error. On this path only rotate_angle,
       zoom_amount and ripple_phase are read from the keyword arguments;
       hue_shift and brightness use the values in the effect dict.
     * With the compiler, compiled pipelines are stored in
       _FUSED_PIPELINE_CACHE under "<w>x<h>_<md5 of the op list>". The
       'src2' key of each effect is left out of the hash. Nothing in this
       patch removes entries from the cache.
     * NOTE(review): the comment "Ensure image is on GPU and uint8" is
       followed by cp.asarray only; no dtype conversion is performed.
       Confirm whether the compiled pipeline requires uint8 input.

   test_fused_direct.py
     * Stand-alone benchmark script. Exits with status 1 when cupy cannot
       be imported, and inserts '/app' at the front of sys.path.
     * Times 100 frames at 1920x1080 through one compiled pipeline
       (rotate, hue_shift, ripple) and through fast_rotate,
       fast_hue_shift, fast_ripple from streaming.jit_compiler.
     * The final "Vs Python sexp interpreter" figure divides a hard-coded
       150 ms baseline by the measured fused time.

   test_fused_pipeline.sexp
     * 30 fps, 1920x1080, seed 42 stream that calls
       streaming_gpu:fused-pipeline with a five-op list (rotate, zoom,
       hue_shift, ripple, brightness) and :rotate_angle, :zoom_amount,
       :ripple_phase. Presumably these keywords arrive as
       **dynamic_params; verify against the interpreter.

   NOTE(review): line breaks and indentation in this copy were re-derived
   from syntax. Confirm against the original commit in particular that
   the two cp.cuda.Stream.null.synchronize() calls in the timed sections
   of test_fused_direct.py sit after the loops, not inside them, and that
   no padding spaces were lost inside the printed strings.

 sexp_effects/primitive_libs/streaming_gpu.py |  96 +++++++++++++++++
 test_fused_direct.py                         | 102 +++++++++++++++++++
 test_fused_pipeline.sexp                     |  43 ++++++++
 3 files changed, 241 insertions(+)
 create mode 100644 test_fused_direct.py
 create mode 100644 test_fused_pipeline.sexp

diff --git a/sexp_effects/primitive_libs/streaming_gpu.py b/sexp_effects/primitive_libs/streaming_gpu.py
index a0391bf..e7ac693 100644
--- a/sexp_effects/primitive_libs/streaming_gpu.py
+++ b/sexp_effects/primitive_libs/streaming_gpu.py
@@ -842,5 +842,101 @@ def _get_cpu_primitives():
 
 PRIMITIVES = _get_cpu_primitives().copy()
 
+# Try to import fused kernel compiler
+_FUSED_KERNELS_AVAILABLE = False
+_compile_frame_pipeline = None
+try:
+    if GPU_AVAILABLE:
+        from streaming.sexp_to_cuda import compile_frame_pipeline as _compile_frame_pipeline
+        _FUSED_KERNELS_AVAILABLE = True
+        print("[streaming_gpu] Fused CUDA kernel compiler loaded", file=sys.stderr)
+except ImportError as e:
+    print(f"[streaming_gpu] Fused kernels not available: {e}", file=sys.stderr)
+
+
+# Fused pipeline cache
+_FUSED_PIPELINE_CACHE = {}
+
+
+def prim_fused_pipeline(img, effects_list, **dynamic_params):
+    """
+    Apply a fused CUDA kernel pipeline to an image.
+
+    This compiles multiple effects into a single CUDA kernel that processes
+    the entire pipeline in one GPU pass, eliminating Python interpreter overhead.
+
+    Args:
+        img: Input image (GPU array or numpy array)
+        effects_list: List of effect dicts like:
+            [{'op': 'rotate', 'angle': 45.0},
+             {'op': 'hue_shift', 'degrees': 90.0},
+             {'op': 'ripple', 'amplitude': 10, ...}]
+        **dynamic_params: Parameters that change per-frame like:
+            rotate_angle=45, ripple_phase=0.5
+
+    Returns:
+        Processed image as GPU array
+
+    Supported ops: rotate, zoom, ripple, invert, hue_shift, brightness
+    """
+    if not _FUSED_KERNELS_AVAILABLE:
+        # Fallback: apply effects one by one
+        result = img
+        for effect in effects_list:
+            op = effect['op']
+            if op == 'rotate':
+                angle = dynamic_params.get('rotate_angle', effect.get('angle', 0))
+                result = gpu_rotate(result, angle)
+            elif op == 'zoom':
+                amount = dynamic_params.get('zoom_amount', effect.get('amount', 1.0))
+                result = gpu_zoom(result, amount)
+            elif op == 'hue_shift':
+                degrees = effect.get('degrees', 0)
+                result = gpu_hue_shift(result, degrees)
+            elif op == 'ripple':
+                result = gpu_ripple(result,
+                                    amplitude=effect.get('amplitude', 10),
+                                    frequency=effect.get('frequency', 8),
+                                    decay=effect.get('decay', 2),
+                                    phase=dynamic_params.get('ripple_phase', effect.get('phase', 0)),
+                                    center_x=effect.get('center_x'),
+                                    center_y=effect.get('center_y'))
+            elif op == 'brightness':
+                factor = effect.get('factor', 1.0)
+                result = gpu_contrast(result, factor, 0)
+            elif op == 'invert':
+                result = gpu_invert(result)
+        return result
+
+    # Get image dimensions
+    if hasattr(img, 'shape'):
+        h, w = img.shape[:2]
+    else:
+        raise ValueError("Image must have shape attribute")
+
+    # Create cache key from effects
+    import hashlib
+    ops_key = str([(e['op'], {k:v for k,v in e.items() if k != 'src2'}) for e in effects_list])
+    cache_key = f"{w}x{h}_{hashlib.md5(ops_key.encode()).hexdigest()}"
+
+    # Compile or get cached pipeline
+    if cache_key not in _FUSED_PIPELINE_CACHE:
+        _FUSED_PIPELINE_CACHE[cache_key] = _compile_frame_pipeline(effects_list, w, h)
+
+    pipeline = _FUSED_PIPELINE_CACHE[cache_key]
+
+    # Ensure image is on GPU and uint8
+    if hasattr(img, '__cuda_array_interface__'):
+        gpu_img = img
+    elif GPU_AVAILABLE:
+        gpu_img = cp.asarray(img)
+    else:
+        gpu_img = img
+
+    # Run the fused pipeline
+    return pipeline(gpu_img, **dynamic_params)
+
+
 # Add GPU-specific primitives
+PRIMITIVES['fused-pipeline'] = prim_fused_pipeline
 # (The GPU video source will be added by create_cid_primitives in the task)
diff --git a/test_fused_direct.py b/test_fused_direct.py
new file mode 100644
index 0000000..638eff1
--- /dev/null
+++ b/test_fused_direct.py
@@ -0,0 +1,102 @@
+#!/usr/bin/env python3
+"""
+Direct test of fused pipeline primitive.
+
+Compares performance of:
+1. Fused kernel (single CUDA kernel for all effects)
+2. Separate kernels (one CUDA kernel per effect)
+"""
+
+import time
+import sys
+
+# Check for CuPy
+try:
+    import cupy as cp
+    print("[test] CuPy available")
+except ImportError:
+    print("[test] CuPy not available - can't run test")
+    sys.exit(1)
+
+# Add path for imports
+sys.path.insert(0, '/app')
+
+from streaming.sexp_to_cuda import compile_frame_pipeline
+from streaming.jit_compiler import fast_rotate, fast_hue_shift, fast_ripple
+
+def test_fused_vs_separate():
+    """Compare fused vs separate kernel performance."""
+
+    width, height = 1920, 1080
+    n_frames = 100
+
+    # Create test frame
+    frame = cp.random.randint(0, 255, (height, width, 3), dtype=cp.uint8)
+
+    # Define effects pipeline
+    effects = [
+        {'op': 'rotate', 'angle': 45.0},
+        {'op': 'hue_shift', 'degrees': 30.0},
+        {'op': 'ripple', 'amplitude': 15, 'frequency': 10, 'decay': 2, 'phase': 0, 'center_x': 960, 'center_y': 540},
+    ]
+
+    print(f"\n[test] Testing {n_frames} frames at {width}x{height}")
+    print(f"[test] Effects: rotate, hue_shift, ripple\n")
+
+    # ========== Test fused kernel ==========
+    print("[test] Compiling fused kernel...")
+    pipeline = compile_frame_pipeline(effects, width, height)
+
+    # Warmup
+    output = pipeline(frame, rotate_angle=45, ripple_phase=0)
+    cp.cuda.Stream.null.synchronize()
+
+    print("[test] Running fused kernel benchmark...")
+    start = time.time()
+    for i in range(n_frames):
+        output = pipeline(frame, rotate_angle=i * 3.6, ripple_phase=i * 0.1)
+    cp.cuda.Stream.null.synchronize()
+    fused_time = time.time() - start
+
+    fused_ms = fused_time / n_frames * 1000
+    fused_fps = n_frames / fused_time
+    print(f"[test] Fused kernel: {fused_ms:.2f}ms/frame ({fused_fps:.0f} fps)")
+
+    # ========== Test separate kernels ==========
+    print("\n[test] Running separate kernels benchmark...")
+
+    # Warmup
+    temp = fast_rotate(frame, 45.0)
+    temp = fast_hue_shift(temp, 30.0)
+    temp = fast_ripple(temp, 15, 10, 2, 0, 960, 540)
+    cp.cuda.Stream.null.synchronize()
+
+    start = time.time()
+    for i in range(n_frames):
+        temp = fast_rotate(frame, i * 3.6)
+        temp = fast_hue_shift(temp, 30.0)
+        temp = fast_ripple(temp, 15, 10, 2, i * 0.1, 960, 540)
+    cp.cuda.Stream.null.synchronize()
+    separate_time = time.time() - start
+
+    separate_ms = separate_time / n_frames * 1000
+    separate_fps = n_frames / separate_time
+    print(f"[test] Separate kernels: {separate_ms:.2f}ms/frame ({separate_fps:.0f} fps)")
+
+    # ========== Summary ==========
+    speedup = separate_time / fused_time
+    print(f"\n{'='*50}")
+    print(f"SPEEDUP: {speedup:.1f}x faster with fused kernel")
+    print(f"")
+    print(f"Fused: {fused_ms:.2f}ms ({fused_fps:.0f} fps)")
+    print(f"Separate: {separate_ms:.2f}ms ({separate_fps:.0f} fps)")
+    print(f"{'='*50}")
+
+    # Compare with original Python sexp interpreter baseline (126-205ms)
+    python_baseline_ms = 150  # Approximate from profiling
+    vs_python = python_baseline_ms / fused_ms
+    print(f"\nVs Python sexp interpreter (~{python_baseline_ms}ms): {vs_python:.0f}x faster!")
+
+
+if __name__ == '__main__':
+    test_fused_vs_separate()
diff --git a/test_fused_pipeline.sexp b/test_fused_pipeline.sexp
new file mode 100644
index 0000000..2e682bb
--- /dev/null
+++ b/test_fused_pipeline.sexp
@@ -0,0 +1,43 @@
+;; Test Fused Pipeline - Should be much faster than interpreted
+;;
+;; This uses the fused-pipeline primitive which compiles all effects
+;; into a single CUDA kernel instead of interpreting them one by one.
+
+(stream "fused_pipeline_test"
+  :fps 30
+  :width 1920
+  :height 1080
+  :seed 42
+
+  ;; Load primitives
+  (require-primitives "streaming_gpu")
+  (require-primitives "image")
+  (require-primitives "math")
+
+  ;; Define the effects pipeline (compiled to single CUDA kernel)
+  (def effects-pipeline
+    [{"op" "rotate" "angle" 0}
+     {"op" "zoom" "amount" 1.0}
+     {"op" "hue_shift" "degrees" 30}
+     {"op" "ripple" "amplitude" 15 "frequency" 10 "decay" 2 "phase" 0 "center_x" 960 "center_y" 540}
+     {"op" "brightness" "factor" 1.0}])
+
+  ;; Frame pipeline
+  (frame
+    (let [;; Create a gradient image
+          r (+ 0.5 (* 0.5 (math:sin (* t 1))))
+          g (+ 0.5 (* 0.5 (math:sin (* t 1.3))))
+          b (+ 0.5 (* 0.5 (math:sin (* t 1.7))))
+          color [(* r 255) (* g 255) (* b 255)]
+          base (image:make-image 1920 1080 color)
+
+          ;; Dynamic parameters (change per frame)
+          angle (* t 30)
+          zoom (+ 1.0 (* 0.2 (math:sin (* t 0.5))))
+          phase (* t 2)]
+
+      ;; Apply fused pipeline - all effects in ONE CUDA kernel!
+      (streaming_gpu:fused-pipeline base effects-pipeline
+        :rotate_angle angle
+        :zoom_amount zoom
+        :ripple_phase phase))))