Fix GPU encoding black frames and improve debug logging
Some checks are pending
GPU Worker CI/CD / test (push) Waiting to run
GPU Worker CI/CD / deploy (push) Blocked by required conditions

- Add CUDA sync before encoding to ensure RGB->NV12 kernel completes
- Add debug logging for frame data validation (sum check)
- Handle GPUFrame objects in GPUHLSOutput.write()
- Fix cv2.resize for CuPy arrays (use cupyx.scipy.ndimage.zoom)
- Fix fused pipeline parameter ordering (geometric first, color second)
- Add raindrop-style ripple with random position/freq/decay/amp
- Generate final VOD playlist with #EXT-X-ENDLIST

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
giles
2026-02-04 16:33:12 +00:00
parent b15e381f81
commit 9a8a701492
8 changed files with 471 additions and 37 deletions

View File

@@ -99,10 +99,13 @@ class GPUEncoder:
self._init_frame_buffer()
# Create encoder with low-latency settings (no B-frames for immediate output)
# Use H264 codec explicitly, with SPS/PPS headers for browser compatibility
self.encoder = nvc.CreateEncoder(
width, height, "NV12", usecpuinputbuffer=False,
codec="h264", # Explicit H.264 (not HEVC)
bf=0, # No B-frames - immediate output
lowLatency=1, # Low latency mode
repeatSPSPPS=1, # Include SPS/PPS with each IDR frame
idrPeriod=30, # IDR frame every 30 frames (1 sec at 30fps)
)
# CUDA kernel grid/block config
@@ -189,10 +192,25 @@ class GPUEncoder:
if not frame_gpu.flags['C_CONTIGUOUS']:
frame_gpu = cp.ascontiguousarray(frame_gpu)
# Debug: check input frame has actual data (first few frames only)
if self._frame_count < 3:
frame_sum = float(cp.sum(frame_gpu))
print(f"[GPUEncoder] Frame {self._frame_count}: shape={frame_gpu.shape}, dtype={frame_gpu.dtype}, sum={frame_sum:.0f}", file=sys.stderr)
if frame_sum < 1000:
print(f"[GPUEncoder] WARNING: Frame appears to be mostly black!", file=sys.stderr)
# Convert RGB to NV12 on GPU
kernel = _get_rgb_to_nv12_kernel()
kernel(self._grid, self._block, (frame_gpu, self._y_plane, self._uv_plane, self.width, self.height))
# CRITICAL: Synchronize CUDA to ensure kernel completes before encoding
cp.cuda.Stream.null.synchronize()
# Debug: check Y plane has data after conversion (first few frames only)
if self._frame_count < 3:
y_sum = float(cp.sum(self._y_plane))
print(f"[GPUEncoder] Frame {self._frame_count}: Y plane sum={y_sum:.0f}", file=sys.stderr)
# Encode (GPU to GPU)
result = self.encoder.Encode(self._template_frame)
self._frame_count += 1
@@ -312,6 +330,11 @@ class GPUHLSOutput:
if not self._is_open:
return
# Handle GPUFrame objects (from streaming_gpu primitives)
if hasattr(frame, 'gpu') and hasattr(frame, 'is_on_gpu'):
# It's a GPUFrame - extract the underlying array
frame = frame.gpu if frame.is_on_gpu else frame.cpu
# GPU encode
encoded = self._gpu_encoder.encode_frame(frame)
@@ -439,8 +462,44 @@ class GPUHLSOutput:
self._upload_queue.put(None) # Signal shutdown
self._upload_thread.join(timeout=30)
# Generate final playlist with #EXT-X-ENDLIST for VOD playback
self._generate_final_playlist()
self._gpu_encoder.close()
def _generate_final_playlist(self):
"""Generate final IPFS playlist with #EXT-X-ENDLIST for completed streams."""
with self._upload_lock:
if not self.segment_cids:
return
lines = [
"#EXTM3U",
"#EXT-X-VERSION:3",
f"#EXT-X-TARGETDURATION:{int(self.segment_duration) + 1}",
"#EXT-X-MEDIA-SEQUENCE:0",
"#EXT-X-PLAYLIST-TYPE:VOD", # Mark as VOD for completed streams
]
for seg_num in sorted(self.segment_cids.keys()):
cid = self.segment_cids[seg_num]
lines.append(f"#EXTINF:{self.segment_duration:.3f},")
# Use /ipfs-ts/ path for segments to get correct MIME type (video/mp2t)
segment_gateway = self.ipfs_gateway.replace("/ipfs", "/ipfs-ts")
lines.append(f"{segment_gateway}/{cid}")
# Mark stream as complete - critical for VOD playback
lines.append("#EXT-X-ENDLIST")
playlist_content = "\n".join(lines) + "\n"
# Upload final playlist
self._playlist_cid = self._ipfs_add_bytes(playlist_content.encode(), pin=True)
if self._playlist_cid:
print(f"[GPUHLSOutput] Final VOD playlist: {self._playlist_cid} ({len(self.segment_cids)} segments)", file=sys.stderr)
if self._on_playlist_update:
self._on_playlist_update(self._playlist_cid)
@property
def is_open(self) -> bool:
return self._is_open

View File

@@ -11,6 +11,9 @@ import numpy as np
from typing import Dict, List, Any, Optional, Tuple
import hashlib
import sys
import logging
logger = logging.getLogger(__name__)
# Kernel cache
_COMPILED_KERNELS: Dict[str, Any] = {}
@@ -72,6 +75,13 @@ def compile_frame_pipeline(effects: List[dict], width: int, height: int) -> call
def _generate_fused_kernel(effects: List[dict], width: int, height: int) -> str:
"""Generate CUDA kernel code for fused effects pipeline."""
# Validate all ops are supported
SUPPORTED_OPS = {'rotate', 'zoom', 'ripple', 'invert', 'hue_shift', 'brightness'}
for effect in effects:
op = effect.get('op')
if op not in SUPPORTED_OPS:
raise ValueError(f"Unsupported CUDA kernel operation: '{op}'. Supported ops: {', '.join(sorted(SUPPORTED_OPS))}. Note: 'resize' must be handled separately before the fused kernel.")
# Build the kernel
code = r'''
extern "C" __global__
@@ -129,7 +139,7 @@ void fused_pipeline(
'''
elif op == 'ripple':
code += f'''
// Ripple {i}
// Ripple {i} - matching original formula: sin(dist/freq - phase) * exp(-dist*decay/maxdim)
{{
float amplitude = params[param_idx++];
float frequency = params[param_idx++];
@@ -141,9 +151,11 @@ void fused_pipeline(
float rdx = src_x - rcx;
float rdy = src_y - rcy;
float dist = sqrtf(rdx * rdx + rdy * rdy);
float max_dim = (float)(width > height ? width : height);
float wave = sinf(dist * frequency * 0.1f + phase);
float amp = amplitude * expf(-dist * decay * 0.01f);
// Original formula: sin(dist / frequency - phase) * exp(-dist * decay / max_dim)
float wave = sinf(dist / frequency - phase);
float amp = amplitude * expf(-dist * decay / max_dim);
if (dist > 0.001f) {{
ripple_dx += rdx / dist * wave * amp;
@@ -288,10 +300,25 @@ void fused_pipeline(
return code
_BUILD_PARAMS_COUNT = 0
def _build_params(effects: List[dict], dynamic_params: dict) -> cp.ndarray:
"""Build parameter array for kernel."""
"""Build parameter array for kernel.
IMPORTANT: Parameters must be built in the same order the kernel consumes them:
1. First all geometric transforms (rotate, zoom, ripple) in list order
2. Then all color transforms (invert, hue_shift, brightness) in list order
"""
global _BUILD_PARAMS_COUNT
_BUILD_PARAMS_COUNT += 1
# ALWAYS log first few calls - use WARNING to ensure visibility in Celery logs
if _BUILD_PARAMS_COUNT <= 3:
logger.warning(f"[BUILD_PARAMS #{_BUILD_PARAMS_COUNT}] effects={[e['op'] for e in effects]}")
params = []
# First pass: geometric transforms (matches kernel's first loop)
for effect in effects:
op = effect['op']
@@ -300,16 +327,30 @@ def _build_params(effects: List[dict], dynamic_params: dict) -> cp.ndarray:
elif op == 'zoom':
params.append(float(dynamic_params.get('zoom_amount', effect.get('amount', 1.0))))
elif op == 'ripple':
params.append(float(dynamic_params.get('ripple_amplitude', effect.get('amplitude', 10))))
params.append(float(effect.get('frequency', 8)))
params.append(float(effect.get('decay', 2)))
params.append(float(dynamic_params.get('ripple_phase', effect.get('phase', 0))))
params.append(float(effect.get('center_x', 960)))
params.append(float(effect.get('center_y', 540)))
elif op == 'invert':
params.append(float(effect.get('amount', 0)))
amp = float(dynamic_params.get('ripple_amplitude', effect.get('amplitude', 10)))
freq = float(effect.get('frequency', 8))
decay = float(effect.get('decay', 2))
phase = float(dynamic_params.get('ripple_phase', effect.get('phase', 0)))
cx = float(effect.get('center_x', 960))
cy = float(effect.get('center_y', 540))
params.extend([amp, freq, decay, phase, cx, cy])
if _BUILD_PARAMS_COUNT <= 10 or _BUILD_PARAMS_COUNT % 500 == 0:
logger.warning(f"[BUILD_PARAMS #{_BUILD_PARAMS_COUNT}] ripple amp={amp} freq={freq} decay={decay} phase={phase:.2f} cx={cx} cy={cy}")
# Second pass: color transforms (matches kernel's second loop)
for effect in effects:
op = effect['op']
if op == 'invert':
amt = float(effect.get('amount', 0))
params.append(amt)
if _BUILD_PARAMS_COUNT <= 10 or _BUILD_PARAMS_COUNT % 500 == 0:
logger.warning(f"[BUILD_PARAMS #{_BUILD_PARAMS_COUNT}] invert amount={amt}")
elif op == 'hue_shift':
params.append(float(effect.get('degrees', 0)))
deg = float(effect.get('degrees', 0))
params.append(deg)
if _BUILD_PARAMS_COUNT <= 10 or _BUILD_PARAMS_COUNT % 500 == 0:
logger.warning(f"[BUILD_PARAMS #{_BUILD_PARAMS_COUNT}] hue_shift degrees={deg}")
elif op == 'brightness':
params.append(float(effect.get('factor', 1.0)))

View File

@@ -1028,7 +1028,24 @@ class StreamInterpreter:
if result is not None:
import cv2
if result.shape[:2] != (h, w):
result = cv2.resize(result, (w, h))
# Handle CuPy arrays - cv2 can't resize them directly
if hasattr(result, '__cuda_array_interface__'):
# Use GPU resize via cupyx.scipy
try:
import cupy as cp
from cupyx.scipy import ndimage as cpndimage
curr_h, curr_w = result.shape[:2]
zoom_y = h / curr_h
zoom_x = w / curr_w
if result.ndim == 3:
result = cpndimage.zoom(result, (zoom_y, zoom_x, 1), order=1)
else:
result = cpndimage.zoom(result, (zoom_y, zoom_x), order=1)
except ImportError:
# Fallback to CPU resize
result = cv2.resize(cp.asnumpy(result), (w, h))
else:
result = cv2.resize(result, (w, h))
out.write(result, self.ctx.t)
# Progress

View File

@@ -144,8 +144,7 @@ class StreamInterpreter:
"""Load a config file and process its definitions."""
config_path = Path(config_path) # Accept str or Path
if not config_path.exists():
print(f"Warning: config file not found: {config_path}", file=sys.stderr)
return
raise FileNotFoundError(f"Config file not found: {config_path}")
text = config_path.read_text()
ast = parse_all(text)
@@ -221,8 +220,7 @@ class StreamInterpreter:
break
if not lib_path:
print(f"Warning: primitive library '{lib_name}' not found", file=sys.stderr)
return
raise FileNotFoundError(f"Primitive library '{lib_name}' not found. Searched paths: {lib_paths}")
spec = importlib.util.spec_from_file_location(actual_lib_name, lib_path)
module = importlib.util.module_from_spec(spec)
@@ -262,8 +260,7 @@ class StreamInterpreter:
def _load_effect(self, effect_path: Path):
"""Load and register an effect from a .sexp file."""
if not effect_path.exists():
print(f"Warning: effect file not found: {effect_path}", file=sys.stderr)
return
raise FileNotFoundError(f"Effect/include file not found: {effect_path}")
text = effect_path.read_text()
ast = parse_all(text)
@@ -938,8 +935,7 @@ class StreamInterpreter:
audio = str(resolved)
print(f"Lazy resolved audio: {audio}", file=sys.stderr)
else:
print(f"WARNING: Audio file not found: {audio}", file=sys.stderr)
audio = None
raise FileNotFoundError(f"Audio file not found: {audio}")
if output == "pipe":
out = PipeOutput(size=(w, h), fps=fps, audio_source=audio)
elif output == "preview":