From 9a8a701492a3f82823db7bf5eb5b2c4820c49de3 Mon Sep 17 00:00:00 2001 From: giles Date: Wed, 4 Feb 2026 16:33:12 +0000 Subject: [PATCH] Fix GPU encoding black frames and improve debug logging - Add CUDA sync before encoding to ensure RGB->NV12 kernel completes - Add debug logging for frame data validation (sum check) - Handle GPUFrame objects in GPUHLSOutput.write() - Fix cv2.resize for CuPy arrays (use cupyx.scipy.ndimage.zoom) - Fix fused pipeline parameter ordering (geometric first, color second) - Add raindrop-style ripple with random position/freq/decay/amp - Generate final VOD playlist with #EXT-X-ENDLIST Co-Authored-By: Claude Opus 4.5 --- app/routers/runs.py | 46 ++++ app/templates/runs/detail.html | 27 ++- recipes/woods-lowres.sexp | 223 +++++++++++++++++++ sexp_effects/primitive_libs/streaming_gpu.py | 53 ++++- streaming/gpu_output.py | 61 ++++- streaming/sexp_to_cuda.py | 67 ++++-- streaming/stream_sexp.py | 19 +- streaming/stream_sexp_generic.py | 12 +- 8 files changed, 471 insertions(+), 37 deletions(-) create mode 100644 recipes/woods-lowres.sexp diff --git a/app/routers/runs.py b/app/routers/runs.py index 53c4540..3ff6cbf 100644 --- a/app/routers/runs.py +++ b/app/routers/runs.py @@ -1137,6 +1137,52 @@ async def serve_hls_content( raise HTTPException(404, f"File not found: {filename}") +@router.get("/{run_id}/playlist.m3u8") +async def get_playlist(run_id: str, request: Request): + """Get live HLS playlist for a streaming run. + + Returns the latest playlist content directly, allowing HLS players + to poll this URL for updates without dealing with changing IPFS CIDs. + """ + import database + import os + import httpx + from fastapi.responses import Response + + await database.init_db() + + pending = await database.get_pending_run(run_id) + if not pending: + raise HTTPException(404, "Run not found") + + ipfs_playlist_cid = pending.get("ipfs_playlist_cid") + if not ipfs_playlist_cid: + raise HTTPException(404, "Playlist not yet available") + + # Fetch playlist from local IPFS node + ipfs_api = os.environ.get("IPFS_API_URL", "http://celery_ipfs:5001") + + try: + async with httpx.AsyncClient(timeout=10.0) as client: + resp = await client.post(f"{ipfs_api}/api/v0/cat?arg={ipfs_playlist_cid}") + if resp.status_code != 200: + raise HTTPException(502, "Failed to fetch playlist from IPFS") + playlist_content = resp.text + except httpx.RequestError as e: + raise HTTPException(502, f"IPFS error: {e}") + + return Response( + content=playlist_content, + media_type="application/vnd.apple.mpegurl", + headers={ + "Cache-Control": "no-cache, no-store, must-revalidate", + "Pragma": "no-cache", + "Expires": "0", + "Access-Control-Allow-Origin": "*", + } + ) + + @router.get("/{run_id}/ipfs-stream") async def get_ipfs_stream_info(run_id: str, request: Request): """Get IPFS streaming info for a run. diff --git a/app/templates/runs/detail.html b/app/templates/runs/detail.html index 339c437..22b9bd8 100644 --- a/app/templates/runs/detail.html +++ b/app/templates/runs/detail.html @@ -103,15 +103,34 @@ const video = document.getElementById('live-video'); const statusEl = document.getElementById('stream-status'); const loadingEl = document.getElementById('stream-loading'); - const hlsUrl = '/runs/{{ run.run_id }}/hls/stream.m3u8'; + // Use dynamic playlist endpoint with cache busting + const baseUrl = '/runs/{{ run.run_id }}/playlist.m3u8'; + function getHlsUrl() { + return baseUrl + '?_t=' + Date.now(); + } let hls = null; let retryCount = 0; const maxRetries = 120; // Try for up to 4 minutes let segmentsLoaded = 0; + // Custom playlist loader that adds cache-busting to every request + class CacheBustingPlaylistLoader extends Hls.DefaultConfig.loader { + load(context, config, callbacks) { + if (context.type === 'manifest' || context.type === 'level') { + const url = new URL(context.url, window.location.origin); + url.searchParams.set('_t', Date.now()); + context.url = url.toString(); + } + super.load(context, config, callbacks); + } + } + function initHls() { if (Hls.isSupported()) { hls = new Hls({ + // Custom loader to bust cache on playlist requests + pLoader: CacheBustingPlaylistLoader, + // Stay far behind live edge - rendering is slow (~0.1x speed) // 10 segments = 40s of buffer before catching up liveSyncDurationCount: 10, // Stay 10 segments behind live edge @@ -177,7 +196,7 @@ // Exponential backoff with jitter const delay = Math.min(1000 * Math.pow(1.5, Math.min(retryCount, 6)), 10000); setTimeout(() => { - hls.loadSource(hlsUrl); + hls.loadSource(getHlsUrl()); }, delay + Math.random() * 1000); } else { statusEl.textContent = 'Stream unavailable'; @@ -246,11 +265,11 @@ } }, 1000); - hls.loadSource(hlsUrl); + hls.loadSource(getHlsUrl()); hls.attachMedia(video); } else if (video.canPlayType('application/vnd.apple.mpegurl')) { // Native HLS support (Safari) - video.src = hlsUrl; + video.src = getHlsUrl(); video.addEventListener('loadedmetadata', function() { loadingEl.classList.add('hidden'); statusEl.textContent = 'Playing'; diff --git a/recipes/woods-lowres.sexp b/recipes/woods-lowres.sexp new file mode 100644 index 0000000..55a1a6a --- /dev/null +++ b/recipes/woods-lowres.sexp @@ -0,0 +1,223 @@ +;; Woods Recipe - OPTIMIZED VERSION +;; +;; Uses fused-pipeline for GPU acceleration when available, +;; falls back to individual primitives on CPU. +;; +;; Key optimizations: +;; 1. Uses streaming_gpu primitives with fast CUDA kernels +;; 2. Uses fused-pipeline to batch effects into single kernel passes +;; 3. GPU persistence - frames stay on GPU throughout pipeline + +(stream "woods-lowres" + :fps 30 + :width 640 + :height 360 + :seed 42 + + ;; Load standard primitives (includes proper asset resolution) + ;; Auto-selects GPU versions when available, falls back to CPU + (include :name "tpl-standard-primitives") + + ;; === SOURCES (using streaming: which has proper asset resolution) === + (def sources [ + (streaming:make-video-source "woods-1" 30) + (streaming:make-video-source "woods-2" 30) + (streaming:make-video-source "woods-3" 30) + (streaming:make-video-source "woods-4" 30) + (streaming:make-video-source "woods-5" 30) + (streaming:make-video-source "woods-6" 30) + (streaming:make-video-source "woods-7" 30) + (streaming:make-video-source "woods-8" 30) + ]) + + ;; Per-pair config + (def pair-configs [ + {:dir -1 :rot-a 45 :rot-b -45 :zoom-a 1.5 :zoom-b 0.5} + {:dir 1 :rot-a 45 :rot-b -45 :zoom-a 1.5 :zoom-b 0.5} + {:dir 1 :rot-a 45 :rot-b -45 :zoom-a 1.5 :zoom-b 0.5} + {:dir -1 :rot-a -45 :rot-b 45 :zoom-a 0.5 :zoom-b 1.5} + {:dir -1 :rot-a 45 :rot-b -45 :zoom-a 1.5 :zoom-b 0.5} + {:dir 1 :rot-a 30 :rot-b -30 :zoom-a 1.3 :zoom-b 0.7} + {:dir -1 :rot-a -45 :rot-b 45 :zoom-a 0.5 :zoom-b 1.5} + {:dir 1 :rot-a 45 :rot-b -45 :zoom-a 1.5 :zoom-b 0.5} + ]) + + ;; Audio + (def music (streaming:make-audio-analyzer "woods-audio")) + (audio-playback "woods-audio") + + ;; === SCANS === + + ;; Cycle state + (scan cycle (streaming:audio-beat music t) + :init {:active 0 :beat 0 :clen 16} + :step (if (< (+ beat 1) clen) + (dict :active active :beat (+ beat 1) :clen clen) + (dict :active (mod (+ active 1) (len sources)) :beat 0 + :clen (+ 8 (mod (* (streaming:audio-beat-count music t) 7) 17))))) + + ;; Spin scan + (scan spin (streaming:audio-beat music t) + :init {:angle 0 :dir 1 :speed 2} + :step (let [new-dir (if (< (core:rand) 0.05) (* dir -1) dir) + new-speed (if (< (core:rand) 0.1) (+ 1 (core:rand-int 1 4)) speed)] + (dict :angle (+ angle (* new-dir new-speed)) + :dir new-dir + :speed new-speed))) + + ;; Ripple scan - raindrop style, all params randomized + ;; Higher freq = bigger gaps between waves (formula is dist/freq) + (scan ripple-state (streaming:audio-beat music t) + :init {:gate 0 :cx 320 :cy 180 :freq 20 :decay 6 :amp-mult 1.0} + :step (let [new-gate (if (< (core:rand) 0.2) (+ 2 (core:rand-int 0 4)) (core:max 0 (- gate 1))) + triggered (> new-gate gate) + new-cx (if triggered (core:rand-int 50 590) cx) + new-cy (if triggered (core:rand-int 50 310) cy) + new-freq (if triggered (+ 15 (core:rand-int 0 20)) freq) + new-decay (if triggered (+ 5 (core:rand-int 0 4)) decay) + new-amp-mult (if triggered (+ 0.8 (* (core:rand) 1.2)) amp-mult)] + (dict :gate new-gate :cx new-cx :cy new-cy :freq new-freq :decay new-decay :amp-mult new-amp-mult))) + + ;; Pair states + (scan pairs (streaming:audio-beat music t) + :init {:states (map (core:range (len sources)) (lambda (_) + {:inv-a 0 :inv-b 0 :hue-a 0 :hue-b 0 :hue-a-val 0 :hue-b-val 0 :mix 0.5 :mix-rem 5 :angle 0 :rot-beat 0 :rot-clen 25}))} + :step (dict :states (map states (lambda (p) + (let [new-inv-a (if (< (core:rand) 0.1) (+ 1 (core:rand-int 1 4)) (core:max 0 (- (get p :inv-a) 1))) + new-inv-b (if (< (core:rand) 0.1) (+ 1 (core:rand-int 1 4)) (core:max 0 (- (get p :inv-b) 1))) + old-hue-a (get p :hue-a) + old-hue-b (get p :hue-b) + new-hue-a (if (< (core:rand) 0.1) (+ 1 (core:rand-int 1 4)) (core:max 0 (- old-hue-a 1))) + new-hue-b (if (< (core:rand) 0.1) (+ 1 (core:rand-int 1 4)) (core:max 0 (- old-hue-b 1))) + new-hue-a-val (if (> new-hue-a old-hue-a) (+ 30 (* (core:rand) 300)) (get p :hue-a-val)) + new-hue-b-val (if (> new-hue-b old-hue-b) (+ 30 (* (core:rand) 300)) (get p :hue-b-val)) + mix-rem (get p :mix-rem) + old-mix (get p :mix) + new-mix-rem (if (> mix-rem 0) (- mix-rem 1) (+ 1 (core:rand-int 1 10))) + new-mix (if (> mix-rem 0) old-mix (* (core:rand-int 0 2) 0.5)) + rot-beat (get p :rot-beat) + rot-clen (get p :rot-clen) + old-angle (get p :angle) + new-rot-beat (if (< (+ rot-beat 1) rot-clen) (+ rot-beat 1) 0) + new-rot-clen (if (< (+ rot-beat 1) rot-clen) rot-clen (+ 20 (core:rand-int 0 10))) + new-angle (+ old-angle (/ 360 rot-clen))] + (dict :inv-a new-inv-a :inv-b new-inv-b + :hue-a new-hue-a :hue-b new-hue-b + :hue-a-val new-hue-a-val :hue-b-val new-hue-b-val + :mix new-mix :mix-rem new-mix-rem + :angle new-angle :rot-beat new-rot-beat :rot-clen new-rot-clen)))))) + + ;; === OPTIMIZED PROCESS-PAIR MACRO === + ;; Uses fused-pipeline to batch rotate+hue+invert into single kernel + (defmacro process-pair-fast (idx) + (let [;; Get sources for this pair (with safe modulo indexing) + num-sources (len sources) + src-a (nth sources (mod (* idx 2) num-sources)) + src-b (nth sources (mod (+ (* idx 2) 1) num-sources)) + cfg (nth pair-configs idx) + pstate (nth (bind pairs :states) idx) + + ;; Read frames (GPU decode, stays on GPU) + frame-a (streaming:source-read src-a t) + frame-b (streaming:source-read src-b t) + + ;; Get state values + dir (get cfg :dir) + rot-max-a (get cfg :rot-a) + rot-max-b (get cfg :rot-b) + zoom-max-a (get cfg :zoom-a) + zoom-max-b (get cfg :zoom-b) + pair-angle (get pstate :angle) + inv-a-on (> (get pstate :inv-a) 0) + inv-b-on (> (get pstate :inv-b) 0) + hue-a-on (> (get pstate :hue-a) 0) + hue-b-on (> (get pstate :hue-b) 0) + hue-a-val (get pstate :hue-a-val) + hue-b-val (get pstate :hue-b-val) + mix-ratio (get pstate :mix) + + ;; Calculate rotation angles + angle-a (* dir pair-angle rot-max-a 0.01) + angle-b (* dir pair-angle rot-max-b 0.01) + + ;; Energy-driven zoom (maps audio energy 0-1 to 1-max) + zoom-a (core:map-range e 0 1 1 zoom-max-a) + zoom-b (core:map-range e 0 1 1 zoom-max-b) + + ;; Define effect pipelines for each source + ;; These get compiled to single CUDA kernels! + ;; First resize to target resolution, then apply effects + effects-a [{:op "resize" :width 640 :height 360} + {:op "zoom" :amount zoom-a} + {:op "rotate" :angle angle-a} + {:op "hue_shift" :degrees (if hue-a-on hue-a-val 0)} + {:op "invert" :amount (if inv-a-on 1 0)}] + effects-b [{:op "resize" :width 640 :height 360} + {:op "zoom" :amount zoom-b} + {:op "rotate" :angle angle-b} + {:op "hue_shift" :degrees (if hue-b-on hue-b-val 0)} + {:op "invert" :amount (if inv-b-on 1 0)}] + + ;; Apply fused pipelines (single kernel per source!) + processed-a (streaming:fused-pipeline frame-a effects-a) + processed-b (streaming:fused-pipeline frame-b effects-b)] + + ;; Blend the two processed frames + (blending:blend-images processed-a processed-b mix-ratio))) + + ;; === FRAME PIPELINE === + (frame + (let [now t + e (streaming:audio-energy music now) + + ;; Get cycle state + active (bind cycle :active) + beat-pos (bind cycle :beat) + clen (bind cycle :clen) + + ;; Transition logic + phase3 (* beat-pos 3) + fading (and (>= phase3 (* clen 2)) (< phase3 (* clen 3))) + fade-amt (if fading (/ (- phase3 (* clen 2)) clen) 0) + next-idx (mod (+ active 1) (len sources)) + + ;; Process active pair with fused pipeline + active-frame (process-pair-fast active) + + ;; Crossfade with zoom during transition + ;; Old pair: zooms out (1.0 -> 2.0) and fades out + ;; New pair: starts small (0.1), zooms in (-> 1.0) and fades in + result (if fading + (let [next-frame (process-pair-fast next-idx) + ;; Active zooms out as it fades + active-zoom (+ 1.0 fade-amt) + active-zoomed (streaming:fused-pipeline active-frame + [{:op "zoom" :amount active-zoom}]) + ;; Next starts small and zooms in + next-zoom (+ 0.1 (* fade-amt 0.9)) + next-zoomed (streaming:fused-pipeline next-frame + [{:op "zoom" :amount next-zoom}])] + (blending:blend-images active-zoomed next-zoomed fade-amt)) + active-frame) + + ;; Final effects pipeline (fused!) + spin-angle (bind spin :angle) + ;; Ripple params - all randomized per ripple trigger + rip-gate (bind ripple-state :gate) + rip-amp-mult (bind ripple-state :amp-mult) + rip-amp (* rip-gate rip-amp-mult (core:map-range e 0 1 50 200)) + rip-cx (bind ripple-state :cx) + rip-cy (bind ripple-state :cy) + rip-freq (bind ripple-state :freq) + rip-decay (bind ripple-state :decay) + + ;; Fused final effects + final-effects [{:op "rotate" :angle spin-angle} + {:op "ripple" :amplitude rip-amp :frequency rip-freq :decay rip-decay + :phase (* now 5) :center_x rip-cx :center_y rip-cy}]] + + ;; Apply final fused pipeline + (streaming:fused-pipeline result final-effects + :rotate_angle spin-angle + :ripple_phase (* now 5) + :ripple_amplitude rip-amp)))) diff --git a/sexp_effects/primitive_libs/streaming_gpu.py b/sexp_effects/primitive_libs/streaming_gpu.py index 3e7b247..f2aa7ea 100644 --- a/sexp_effects/primitive_libs/streaming_gpu.py +++ b/sexp_effects/primitive_libs/streaming_gpu.py @@ -894,7 +894,7 @@ def prim_fused_pipeline(img, effects_list, **dynamic_params): Returns: Processed image as GPU array - Supported ops: rotate, zoom, ripple, invert, hue_shift, brightness + Supported ops: rotate, zoom, ripple, invert, hue_shift, brightness, resize """ global _FUSED_CALL_COUNT _FUSED_CALL_COUNT += 1 @@ -904,8 +904,34 @@ def prim_fused_pipeline(img, effects_list, **dynamic_params): # Normalize effects list - convert Keyword keys to strings effects_list = [_normalize_effect_dict(e) for e in effects_list] + # Handle resize separately - it changes dimensions so must happen before fused kernel + resize_ops = [e for e in effects_list if e.get('op') == 'resize'] + other_effects = [e for e in effects_list if e.get('op') != 'resize'] + + # Apply resize first if needed + if resize_ops: + for resize_op in resize_ops: + target_w = int(resize_op.get('width', 640)) + target_h = int(resize_op.get('height', 360)) + # Wrap in GPUFrame if needed + if isinstance(img, GPUFrame): + img = gpu_resize(img, (target_w, target_h)) + img = img.gpu if img.is_on_gpu else img.cpu + else: + frame = GPUFrame(img, on_gpu=hasattr(img, '__cuda_array_interface__')) + img = gpu_resize(frame, (target_w, target_h)) + img = img.gpu if img.is_on_gpu else img.cpu + + # If no other effects, just return the resized image + if not other_effects: + return img + + # Update effects list to exclude resize ops + effects_list = other_effects + if not _FUSED_KERNELS_AVAILABLE: # Fallback: apply effects one by one + print(f"[FUSED FALLBACK] Using fallback path for {len(effects_list)} effects", file=sys.stderr) # Wrap in GPUFrame if needed (GPU functions expect GPUFrame objects) if isinstance(img, GPUFrame): result = img @@ -922,20 +948,27 @@ def prim_fused_pipeline(img, effects_list, **dynamic_params): result = gpu_zoom(result, amount) elif op == 'hue_shift': degrees = effect.get('degrees', 0) - result = gpu_hue_shift(result, degrees) + if abs(degrees) > 0.1: # Only apply if significant shift + result = gpu_hue_shift(result, degrees) elif op == 'ripple': - result = gpu_ripple(result, - amplitude=dynamic_params.get('ripple_amplitude', effect.get('amplitude', 10)), - frequency=effect.get('frequency', 8), - decay=effect.get('decay', 2), - phase=dynamic_params.get('ripple_phase', effect.get('phase', 0)), - cx=effect.get('center_x'), - cy=effect.get('center_y')) + amplitude = dynamic_params.get('ripple_amplitude', effect.get('amplitude', 10)) + if amplitude > 0.1: # Only apply if amplitude is significant + result = gpu_ripple(result, + amplitude=amplitude, + frequency=effect.get('frequency', 8), + decay=effect.get('decay', 2), + phase=dynamic_params.get('ripple_phase', effect.get('phase', 0)), + cx=effect.get('center_x'), + cy=effect.get('center_y')) elif op == 'brightness': factor = effect.get('factor', 1.0) result = gpu_contrast(result, factor, 0) elif op == 'invert': - result = gpu_invert(result) + amount = effect.get('amount', 0) + if amount > 0.5: # Only invert if amount > 0.5 + result = gpu_invert(result) + else: + raise ValueError(f"Unsupported fused pipeline operation: '{op}'. Supported ops: rotate, zoom, hue_shift, ripple, brightness, invert, resize") # Return raw array, not GPUFrame (downstream expects arrays with .flags attribute) if isinstance(result, GPUFrame): return result.gpu if result.is_on_gpu else result.cpu diff --git a/streaming/gpu_output.py b/streaming/gpu_output.py index 4c1e1b7..e32859e 100644 --- a/streaming/gpu_output.py +++ b/streaming/gpu_output.py @@ -99,10 +99,13 @@ class GPUEncoder: self._init_frame_buffer() # Create encoder with low-latency settings (no B-frames for immediate output) + # Use H264 codec explicitly, with SPS/PPS headers for browser compatibility self.encoder = nvc.CreateEncoder( width, height, "NV12", usecpuinputbuffer=False, + codec="h264", # Explicit H.264 (not HEVC) bf=0, # No B-frames - immediate output - lowLatency=1, # Low latency mode + repeatSPSPPS=1, # Include SPS/PPS with each IDR frame + idrPeriod=30, # IDR frame every 30 frames (1 sec at 30fps) ) # CUDA kernel grid/block config @@ -189,10 +192,25 @@ class GPUEncoder: if not frame_gpu.flags['C_CONTIGUOUS']: frame_gpu = cp.ascontiguousarray(frame_gpu) + # Debug: check input frame has actual data (first few frames only) + if self._frame_count < 3: + frame_sum = float(cp.sum(frame_gpu)) + print(f"[GPUEncoder] Frame {self._frame_count}: shape={frame_gpu.shape}, dtype={frame_gpu.dtype}, sum={frame_sum:.0f}", file=sys.stderr) + if frame_sum < 1000: + print(f"[GPUEncoder] WARNING: Frame appears to be mostly black!", file=sys.stderr) + # Convert RGB to NV12 on GPU kernel = _get_rgb_to_nv12_kernel() kernel(self._grid, self._block, (frame_gpu, self._y_plane, self._uv_plane, self.width, self.height)) + # CRITICAL: Synchronize CUDA to ensure kernel completes before encoding + cp.cuda.Stream.null.synchronize() + + # Debug: check Y plane has data after conversion (first few frames only) + if self._frame_count < 3: + y_sum = float(cp.sum(self._y_plane)) + print(f"[GPUEncoder] Frame {self._frame_count}: Y plane sum={y_sum:.0f}", file=sys.stderr) + # Encode (GPU to GPU) result = self.encoder.Encode(self._template_frame) self._frame_count += 1 @@ -312,6 +330,11 @@ class GPUHLSOutput: if not self._is_open: return + # Handle GPUFrame objects (from streaming_gpu primitives) + if hasattr(frame, 'gpu') and hasattr(frame, 'is_on_gpu'): + # It's a GPUFrame - extract the underlying array + frame = frame.gpu if frame.is_on_gpu else frame.cpu + # GPU encode encoded = self._gpu_encoder.encode_frame(frame) @@ -439,8 +462,44 @@ class GPUHLSOutput: self._upload_queue.put(None) # Signal shutdown self._upload_thread.join(timeout=30) + # Generate final playlist with #EXT-X-ENDLIST for VOD playback + self._generate_final_playlist() + self._gpu_encoder.close() + def _generate_final_playlist(self): + """Generate final IPFS playlist with #EXT-X-ENDLIST for completed streams.""" + with self._upload_lock: + if not self.segment_cids: + return + + lines = [ + "#EXTM3U", + "#EXT-X-VERSION:3", + f"#EXT-X-TARGETDURATION:{int(self.segment_duration) + 1}", + "#EXT-X-MEDIA-SEQUENCE:0", + "#EXT-X-PLAYLIST-TYPE:VOD", # Mark as VOD for completed streams + ] + + for seg_num in sorted(self.segment_cids.keys()): + cid = self.segment_cids[seg_num] + lines.append(f"#EXTINF:{self.segment_duration:.3f},") + # Use /ipfs-ts/ path for segments to get correct MIME type (video/mp2t) + segment_gateway = self.ipfs_gateway.replace("/ipfs", "/ipfs-ts") + lines.append(f"{segment_gateway}/{cid}") + + # Mark stream as complete - critical for VOD playback + lines.append("#EXT-X-ENDLIST") + + playlist_content = "\n".join(lines) + "\n" + + # Upload final playlist + self._playlist_cid = self._ipfs_add_bytes(playlist_content.encode(), pin=True) + if self._playlist_cid: + print(f"[GPUHLSOutput] Final VOD playlist: {self._playlist_cid} ({len(self.segment_cids)} segments)", file=sys.stderr) + if self._on_playlist_update: + self._on_playlist_update(self._playlist_cid) + @property def is_open(self) -> bool: return self._is_open diff --git a/streaming/sexp_to_cuda.py b/streaming/sexp_to_cuda.py index fc3afde..e4051bd 100644 --- a/streaming/sexp_to_cuda.py +++ b/streaming/sexp_to_cuda.py @@ -11,6 +11,9 @@ import numpy as np from typing import Dict, List, Any, Optional, Tuple import hashlib import sys +import logging + +logger = logging.getLogger(__name__) # Kernel cache _COMPILED_KERNELS: Dict[str, Any] = {} @@ -72,6 +75,13 @@ def compile_frame_pipeline(effects: List[dict], width: int, height: int) -> call def _generate_fused_kernel(effects: List[dict], width: int, height: int) -> str: """Generate CUDA kernel code for fused effects pipeline.""" + # Validate all ops are supported + SUPPORTED_OPS = {'rotate', 'zoom', 'ripple', 'invert', 'hue_shift', 'brightness'} + for effect in effects: + op = effect.get('op') + if op not in SUPPORTED_OPS: + raise ValueError(f"Unsupported CUDA kernel operation: '{op}'. Supported ops: {', '.join(sorted(SUPPORTED_OPS))}. Note: 'resize' must be handled separately before the fused kernel.") + # Build the kernel code = r''' extern "C" __global__ @@ -129,7 +139,7 @@ void fused_pipeline( ''' elif op == 'ripple': code += f''' - // Ripple {i} + // Ripple {i} - matching original formula: sin(dist/freq - phase) * exp(-dist*decay/maxdim) {{ float amplitude = params[param_idx++]; float frequency = params[param_idx++]; @@ -141,9 +151,11 @@ void fused_pipeline( float rdx = src_x - rcx; float rdy = src_y - rcy; float dist = sqrtf(rdx * rdx + rdy * rdy); + float max_dim = (float)(width > height ? width : height); - float wave = sinf(dist * frequency * 0.1f + phase); - float amp = amplitude * expf(-dist * decay * 0.01f); + // Original formula: sin(dist / frequency - phase) * exp(-dist * decay / max_dim) + float wave = sinf(dist / frequency - phase); + float amp = amplitude * expf(-dist * decay / max_dim); if (dist > 0.001f) {{ ripple_dx += rdx / dist * wave * amp; @@ -288,10 +300,25 @@ void fused_pipeline( return code +_BUILD_PARAMS_COUNT = 0 + def _build_params(effects: List[dict], dynamic_params: dict) -> cp.ndarray: - """Build parameter array for kernel.""" + """Build parameter array for kernel. + + IMPORTANT: Parameters must be built in the same order the kernel consumes them: + 1. First all geometric transforms (rotate, zoom, ripple) in list order + 2. Then all color transforms (invert, hue_shift, brightness) in list order + """ + global _BUILD_PARAMS_COUNT + _BUILD_PARAMS_COUNT += 1 + + # ALWAYS log first few calls - use WARNING to ensure visibility in Celery logs + if _BUILD_PARAMS_COUNT <= 3: + logger.warning(f"[BUILD_PARAMS #{_BUILD_PARAMS_COUNT}] effects={[e['op'] for e in effects]}") + params = [] + # First pass: geometric transforms (matches kernel's first loop) for effect in effects: op = effect['op'] @@ -300,16 +327,30 @@ def _build_params(effects: List[dict], dynamic_params: dict) -> cp.ndarray: elif op == 'zoom': params.append(float(dynamic_params.get('zoom_amount', effect.get('amount', 1.0)))) elif op == 'ripple': - params.append(float(dynamic_params.get('ripple_amplitude', effect.get('amplitude', 10)))) - params.append(float(effect.get('frequency', 8))) - params.append(float(effect.get('decay', 2))) - params.append(float(dynamic_params.get('ripple_phase', effect.get('phase', 0)))) - params.append(float(effect.get('center_x', 960))) - params.append(float(effect.get('center_y', 540))) - elif op == 'invert': - params.append(float(effect.get('amount', 0))) + amp = float(dynamic_params.get('ripple_amplitude', effect.get('amplitude', 10))) + freq = float(effect.get('frequency', 8)) + decay = float(effect.get('decay', 2)) + phase = float(dynamic_params.get('ripple_phase', effect.get('phase', 0))) + cx = float(effect.get('center_x', 960)) + cy = float(effect.get('center_y', 540)) + params.extend([amp, freq, decay, phase, cx, cy]) + if _BUILD_PARAMS_COUNT <= 10 or _BUILD_PARAMS_COUNT % 500 == 0: + logger.warning(f"[BUILD_PARAMS #{_BUILD_PARAMS_COUNT}] ripple amp={amp} freq={freq} decay={decay} phase={phase:.2f} cx={cx} cy={cy}") + + # Second pass: color transforms (matches kernel's second loop) + for effect in effects: + op = effect['op'] + + if op == 'invert': + amt = float(effect.get('amount', 0)) + params.append(amt) + if _BUILD_PARAMS_COUNT <= 10 or _BUILD_PARAMS_COUNT % 500 == 0: + logger.warning(f"[BUILD_PARAMS #{_BUILD_PARAMS_COUNT}] invert amount={amt}") elif op == 'hue_shift': - params.append(float(effect.get('degrees', 0))) + deg = float(effect.get('degrees', 0)) + params.append(deg) + if _BUILD_PARAMS_COUNT <= 10 or _BUILD_PARAMS_COUNT % 500 == 0: + logger.warning(f"[BUILD_PARAMS #{_BUILD_PARAMS_COUNT}] hue_shift degrees={deg}") elif op == 'brightness': params.append(float(effect.get('factor', 1.0))) diff --git a/streaming/stream_sexp.py b/streaming/stream_sexp.py index b36dabf..07acb2a 100644 --- a/streaming/stream_sexp.py +++ b/streaming/stream_sexp.py @@ -1028,7 +1028,24 @@ class StreamInterpreter: if result is not None: import cv2 if result.shape[:2] != (h, w): - result = cv2.resize(result, (w, h)) + # Handle CuPy arrays - cv2 can't resize them directly + if hasattr(result, '__cuda_array_interface__'): + # Use GPU resize via cupyx.scipy + try: + import cupy as cp + from cupyx.scipy import ndimage as cpndimage + curr_h, curr_w = result.shape[:2] + zoom_y = h / curr_h + zoom_x = w / curr_w + if result.ndim == 3: + result = cpndimage.zoom(result, (zoom_y, zoom_x, 1), order=1) + else: + result = cpndimage.zoom(result, (zoom_y, zoom_x), order=1) + except ImportError: + # Fallback to CPU resize + result = cv2.resize(cp.asnumpy(result), (w, h)) + else: + result = cv2.resize(result, (w, h)) out.write(result, self.ctx.t) # Progress diff --git a/streaming/stream_sexp_generic.py b/streaming/stream_sexp_generic.py index 6f11ef1..9ccb20f 100644 --- a/streaming/stream_sexp_generic.py +++ b/streaming/stream_sexp_generic.py @@ -144,8 +144,7 @@ class StreamInterpreter: """Load a config file and process its definitions.""" config_path = Path(config_path) # Accept str or Path if not config_path.exists(): - print(f"Warning: config file not found: {config_path}", file=sys.stderr) - return + raise FileNotFoundError(f"Config file not found: {config_path}") text = config_path.read_text() ast = parse_all(text) @@ -221,8 +220,7 @@ class StreamInterpreter: break if not lib_path: - print(f"Warning: primitive library '{lib_name}' not found", file=sys.stderr) - return + raise FileNotFoundError(f"Primitive library '{lib_name}' not found. Searched paths: {lib_paths}") spec = importlib.util.spec_from_file_location(actual_lib_name, lib_path) module = importlib.util.module_from_spec(spec) @@ -262,8 +260,7 @@ class StreamInterpreter: def _load_effect(self, effect_path: Path): """Load and register an effect from a .sexp file.""" if not effect_path.exists(): - print(f"Warning: effect file not found: {effect_path}", file=sys.stderr) - return + raise FileNotFoundError(f"Effect/include file not found: {effect_path}") text = effect_path.read_text() ast = parse_all(text) @@ -938,8 +935,7 @@ class StreamInterpreter: audio = str(resolved) print(f"Lazy resolved audio: {audio}", file=sys.stderr) else: - print(f"WARNING: Audio file not found: {audio}", file=sys.stderr) - audio = None + raise FileNotFoundError(f"Audio file not found: {audio}") if output == "pipe": out = PipeOutput(size=(w, h), fps=fps, audio_source=audio) elif output == "preview":