From ed617fcdd6b47edf297d504700b05329c7132942 Mon Sep 17 00:00:00 2001 From: giles Date: Wed, 4 Feb 2026 11:32:04 +0000 Subject: [PATCH] Fix lazy audio path resolution for GPU streaming Audio playback path was being resolved during parsing when database may not be ready, causing fallback to non-existent path. Now resolves lazily when stream starts, matching how audio analyzer works. Co-Authored-By: Claude Opus 4.5 --- configs/audio-halleluwah.sexp | 6 +- configs/sources-woods.sexp | 18 ++-- database.py | 9 ++ recipes/woods-recipe-optimized.sexp | 8 +- sexp_effects/primitive_libs/streaming.py | 26 +++++- streaming/gpu_output.py | 26 +++++- streaming/output.py | 100 +++++++++++++++-------- streaming/stream_sexp_generic.py | 11 +++ tasks/streaming.py | 12 ++- 9 files changed, 159 insertions(+), 57 deletions(-) diff --git a/configs/audio-halleluwah.sexp b/configs/audio-halleluwah.sexp index 5e4b812..7d7bfae 100644 --- a/configs/audio-halleluwah.sexp +++ b/configs/audio-halleluwah.sexp @@ -10,8 +10,8 @@ (require-primitives "streaming") ;; Audio analyzer (provides beat detection and energy levels) -;; Paths relative to working directory (project root) -(def music (streaming:make-audio-analyzer "woods_half/halleluwah.webm")) +;; Using friendly name for asset resolution +(def music (streaming:make-audio-analyzer "woods-audio")) ;; Audio playback path (for sync with video output) -(audio-playback "woods_half/halleluwah.webm") +(audio-playback "woods-audio") diff --git a/configs/sources-woods.sexp b/configs/sources-woods.sexp index 717bfd9..ab8dff4 100644 --- a/configs/sources-woods.sexp +++ b/configs/sources-woods.sexp @@ -10,16 +10,16 @@ (require-primitives "streaming") ;; Video sources array -;; Paths relative to working directory (project root) +;; Using friendly names for asset resolution (def sources [ - (streaming:make-video-source "woods/1.webm" 10) - (streaming:make-video-source "woods/2.webm" 10) - (streaming:make-video-source "woods/3.webm" 10) - (streaming:make-video-source "woods/4.webm" 10) - (streaming:make-video-source "woods/5.webm" 10) - (streaming:make-video-source "woods/6.webm" 10) - (streaming:make-video-source "woods/7.webm" 10) - (streaming:make-video-source "woods/8.webm" 10) + (streaming:make-video-source "woods-1" 10) + (streaming:make-video-source "woods-2" 10) + (streaming:make-video-source "woods-3" 10) + (streaming:make-video-source "woods-4" 10) + (streaming:make-video-source "woods-5" 10) + (streaming:make-video-source "woods-6" 10) + (streaming:make-video-source "woods-7" 10) + (streaming:make-video-source "woods-8" 10) ]) ;; Per-pair effect config: rotation direction, rotation ranges, zoom ranges diff --git a/database.py b/database.py index 131d2c4..c224d2c 100644 --- a/database.py +++ b/database.py @@ -244,6 +244,15 @@ async def update_cache_item_ipfs_cid(cid: str, ipfs_cid: str) -> bool: return result == "UPDATE 1" +async def get_ipfs_cid(cid: str) -> Optional[str]: + """Get the IPFS CID for a cache item by its internal CID.""" + async with pool.acquire() as conn: + return await conn.fetchval( + "SELECT ipfs_cid FROM cache_items WHERE cid = $1", + cid + ) + + async def delete_cache_item(cid: str) -> bool: """Delete a cache item and all associated data (cascades).""" async with pool.acquire() as conn: diff --git a/recipes/woods-recipe-optimized.sexp b/recipes/woods-recipe-optimized.sexp index ac581ef..fd853e6 100644 --- a/recipes/woods-recipe-optimized.sexp +++ b/recipes/woods-recipe-optimized.sexp @@ -125,6 +125,8 @@ dir (get cfg :dir) rot-max-a (get cfg :rot-a) rot-max-b (get cfg :rot-b) + zoom-a (get cfg :zoom-a) + zoom-b (get cfg :zoom-b) pair-angle (get pstate :angle) inv-a-on (> (get pstate :inv-a) 0) inv-b-on (> (get pstate :inv-b) 0) @@ -140,10 +142,12 @@ ;; Define effect pipelines for each source ;; These get compiled to single CUDA kernels! - effects-a [{:op "rotate" :angle angle-a} + effects-a [{:op "zoom" :amount zoom-a} + {:op "rotate" :angle angle-a} {:op "hue_shift" :degrees (if hue-a-on hue-a-val 0)} {:op "invert" :amount (if inv-a-on 1 0)}] - effects-b [{:op "rotate" :angle angle-b} + effects-b [{:op "zoom" :amount zoom-b} + {:op "rotate" :angle angle-b} {:op "hue_shift" :degrees (if hue-b-on hue-b-val 0)} {:op "invert" :amount (if inv-b-on 1 0)}] diff --git a/sexp_effects/primitive_libs/streaming.py b/sexp_effects/primitive_libs/streaming.py index 647fa3b..9092087 100644 --- a/sexp_effects/primitive_libs/streaming.py +++ b/sexp_effects/primitive_libs/streaming.py @@ -65,11 +65,20 @@ class VideoSource: self._last_read_time = -1 self._cached_frame = None + # Check if file exists + if not self.path.exists(): + raise FileNotFoundError(f"Video file not found: {self.path}") + # Get video info cmd = ["ffprobe", "-v", "quiet", "-print_format", "json", "-show_streams", str(self.path)] result = subprocess.run(cmd, capture_output=True, text=True) - info = json.loads(result.stdout) + if result.returncode != 0: + raise RuntimeError(f"Failed to probe video '{self.path}': {result.stderr}") + try: + info = json.loads(result.stdout) + except json.JSONDecodeError: + raise RuntimeError(f"Invalid video file or ffprobe failed: {self.path}") for stream in info.get("streams", []): if stream.get("codec_type") == "video": @@ -281,16 +290,27 @@ class AudioAnalyzer: self.path = Path(path) self.sample_rate = sample_rate + # Check if file exists + if not self.path.exists(): + raise FileNotFoundError(f"Audio file not found: {self.path}") + # Load audio via ffmpeg - cmd = ["ffmpeg", "-v", "quiet", "-i", str(self.path), + cmd = ["ffmpeg", "-v", "error", "-i", str(self.path), "-f", "f32le", "-ac", "1", "-ar", str(sample_rate), "-"] result = subprocess.run(cmd, capture_output=True) + if result.returncode != 0: + raise RuntimeError(f"Failed to load audio '{self.path}': {result.stderr.decode()}") self._audio = np.frombuffer(result.stdout, dtype=np.float32) + if len(self._audio) == 0: + raise RuntimeError(f"Audio file is empty or invalid: {self.path}") # Get duration cmd = ["ffprobe", "-v", "quiet", "-print_format", "json", "-show_format", str(self.path)] - info = json.loads(subprocess.run(cmd, capture_output=True, text=True).stdout) + result = subprocess.run(cmd, capture_output=True, text=True) + if result.returncode != 0: + raise RuntimeError(f"Failed to probe audio '{self.path}': {result.stderr}") + info = json.loads(result.stdout) self.duration = float(info.get("format", {}).get("duration", 60)) # Beat detection state diff --git a/streaming/gpu_output.py b/streaming/gpu_output.py index 1aa498b..9f3f290 100644 --- a/streaming/gpu_output.py +++ b/streaming/gpu_output.py @@ -235,6 +235,7 @@ class GPUHLSOutput: self.ipfs_gateway = ipfs_gateway.rstrip("/") self._on_playlist_update = on_playlist_update self._is_open = True + self.audio_source = audio_source # GPU encoder self._gpu_encoder = GPUEncoder(size[0], size[1], fps, crf) @@ -266,14 +267,29 @@ class GPUHLSOutput: print(f"[GPUHLSOutput] Initialized {size[0]}x{size[1]} @ {fps}fps, GPU encoding", file=sys.stderr) def _setup_muxer(self): - """Setup ffmpeg for muxing H.264 to MPEG-TS segments.""" + """Setup ffmpeg for muxing H.264 to MPEG-TS segments with optional audio.""" self.local_playlist_path = self.output_dir / "stream.m3u8" cmd = [ "ffmpeg", "-y", "-f", "h264", # Input is raw H.264 "-i", "-", - "-c:v", "copy", # Just copy, no re-encoding + ] + + # Add audio input if provided + if self.audio_source: + cmd.extend(["-i", str(self.audio_source)]) + cmd.extend(["-map", "0:v", "-map", "1:a"]) + + cmd.extend([ + "-c:v", "copy", # Just copy video, no re-encoding + ]) + + # Add audio codec if we have audio + if self.audio_source: + cmd.extend(["-c:a", "aac", "-b:a", "128k", "-shortest"]) + + cmd.extend([ "-f", "hls", "-hls_time", str(self.segment_duration), "-hls_list_size", "0", @@ -281,12 +297,14 @@ class GPUHLSOutput: "-hls_segment_type", "mpegts", "-hls_segment_filename", str(self.output_dir / "segment_%05d.ts"), str(self.local_playlist_path), - ] + ]) + + print(f"[GPUHLSOutput] FFmpeg cmd: {' '.join(cmd)}", file=sys.stderr) self._muxer = subprocess.Popen( cmd, stdin=subprocess.PIPE, - stderr=subprocess.DEVNULL, + stderr=subprocess.PIPE, # Capture stderr for debugging ) def write(self, frame: Union[np.ndarray, 'cp.ndarray'], t: float = 0): diff --git a/streaming/output.py b/streaming/output.py index e8b0750..86439da 100644 --- a/streaming/output.py +++ b/streaming/output.py @@ -11,6 +11,8 @@ Supports: import numpy as np import subprocess +import threading +import queue from abc import ABC, abstractmethod from typing import Tuple, Optional, List, Union from pathlib import Path @@ -665,12 +667,18 @@ class IPFSHLSOutput(Output): self.segment_cids: dict = {} # segment_number -> cid self._last_segment_checked = -1 self._playlist_cid: Optional[str] = None + self._upload_lock = threading.Lock() # Import IPFS client from ipfs_client import add_file, add_bytes self._ipfs_add_file = add_file self._ipfs_add_bytes = add_bytes + # Background upload thread for async IPFS uploads + self._upload_queue = queue.Queue() + self._upload_thread = threading.Thread(target=self._upload_worker, daemon=True) + self._upload_thread.start() + # Local HLS paths self.local_playlist_path = self.output_dir / "stream.m3u8" @@ -727,9 +735,38 @@ class IPFSHLSOutput(Output): stderr=None, ) - def _upload_new_segments(self): - """Check for new segments and upload them to IPFS.""" + def _upload_worker(self): + """Background worker thread for async IPFS uploads.""" import sys + while True: + try: + item = self._upload_queue.get(timeout=1.0) + if item is None: # Shutdown signal + break + seg_path, seg_num = item + self._do_upload(seg_path, seg_num) + except queue.Empty: + continue + except Exception as e: + print(f"Upload worker error: {e}", file=sys.stderr) + + def _do_upload(self, seg_path: Path, seg_num: int): + """Actually perform the upload (runs in background thread).""" + import sys + try: + cid = self._ipfs_add_file(seg_path, pin=True) + if cid: + with self._upload_lock: + self.segment_cids[seg_num] = cid + print(f"IPFS: segment_{seg_num:05d}.ts -> {cid}", file=sys.stderr) + self._update_ipfs_playlist() + except Exception as e: + print(f"Failed to upload segment {seg_num}: {e}", file=sys.stderr) + + def _upload_new_segments(self): + """Check for new segments and queue them for async IPFS upload.""" + import sys + import time # Find all segments segments = sorted(self.output_dir.glob("segment_*.ts")) @@ -739,53 +776,48 @@ class IPFSHLSOutput(Output): seg_name = seg_path.stem # segment_00000 seg_num = int(seg_name.split("_")[1]) - # Skip if already uploaded - if seg_num in self.segment_cids: - continue + # Skip if already uploaded or queued + with self._upload_lock: + if seg_num in self.segment_cids: + continue - # Skip if segment is still being written (check if file size is stable) + # Skip if segment is still being written (quick non-blocking check) try: size1 = seg_path.stat().st_size if size1 == 0: continue # Empty file, still being created - import time - time.sleep(0.1) + time.sleep(0.01) # Very short check size2 = seg_path.stat().st_size if size1 != size2: continue # File still being written except FileNotFoundError: continue - # Upload to IPFS - cid = self._ipfs_add_file(seg_path, pin=True) - if cid: - self.segment_cids[seg_num] = cid - print(f"IPFS: segment_{seg_num:05d}.ts -> {cid}", file=sys.stderr) - - # Update playlist after each segment upload - self._update_ipfs_playlist() + # Queue for async upload (non-blocking!) + self._upload_queue.put((seg_path, seg_num)) def _update_ipfs_playlist(self): """Generate and upload IPFS-aware m3u8 playlist.""" - if not self.segment_cids: - return - import sys - # Build m3u8 content with IPFS URLs - lines = [ - "#EXTM3U", - "#EXT-X-VERSION:3", - f"#EXT-X-TARGETDURATION:{int(self.segment_duration) + 1}", - "#EXT-X-MEDIA-SEQUENCE:0", - ] + with self._upload_lock: + if not self.segment_cids: + return - # Add segments in order - for seg_num in sorted(self.segment_cids.keys()): - cid = self.segment_cids[seg_num] - lines.append(f"#EXTINF:{self.segment_duration:.3f},") - lines.append(f"{self.ipfs_gateway}/{cid}") + # Build m3u8 content with IPFS URLs + lines = [ + "#EXTM3U", + "#EXT-X-VERSION:3", + f"#EXT-X-TARGETDURATION:{int(self.segment_duration) + 1}", + "#EXT-X-MEDIA-SEQUENCE:0", + ] + + # Add segments in order + for seg_num in sorted(self.segment_cids.keys()): + cid = self.segment_cids[seg_num] + lines.append(f"#EXTINF:{self.segment_duration:.3f},") + lines.append(f"{self.ipfs_gateway}/{cid}") playlist_content = "\n".join(lines) + "\n" @@ -842,9 +874,13 @@ class IPFSHLSOutput(Output): self._process.wait() self._is_open = False - # Upload any remaining segments + # Queue any remaining segments self._upload_new_segments() + # Wait for pending uploads to complete + self._upload_queue.put(None) # Signal shutdown + self._upload_thread.join(timeout=30) + # Generate final playlist with #EXT-X-ENDLIST if self.segment_cids: lines = [ diff --git a/streaming/stream_sexp_generic.py b/streaming/stream_sexp_generic.py index b5d3f82..6f11ef1 100644 --- a/streaming/stream_sexp_generic.py +++ b/streaming/stream_sexp_generic.py @@ -928,7 +928,18 @@ class StreamInterpreter: ctx = Context(fps=fps) # Output (with optional audio sync) + # Resolve audio path lazily here if it wasn't resolved during parsing audio = self.audio_playback + if audio and not Path(audio).exists(): + # Try to resolve as friendly name (may have failed during parsing) + audio_name = Path(audio).name # Get just the name part + resolved = self._resolve_name(audio_name) + if resolved and resolved.exists(): + audio = str(resolved) + print(f"Lazy resolved audio: {audio}", file=sys.stderr) + else: + print(f"WARNING: Audio file not found: {audio}", file=sys.stderr) + audio = None if output == "pipe": out = PipeOutput(size=(w, h), fps=fps, audio_source=audio) elif output == "preview": diff --git a/tasks/streaming.py b/tasks/streaming.py index 6ba0bef..3d061cb 100644 --- a/tasks/streaming.py +++ b/tasks/streaming.py @@ -84,11 +84,15 @@ def resolve_asset(ref: str, actor_id: Optional[str] = None) -> Optional[Path]: logger.info(f"Resolved '{ref}' via friendly name to {path}") return path - # File not in local cache - try fetching from IPFS - # The CID from friendly_names is an IPFS CID - print(f"RESOLVE_ASSET: file not local, trying IPFS fetch for {cid}", file=sys.stderr) + # File not in local cache - look up IPFS CID and fetch + # The cid from friendly_names is internal, need to get ipfs_cid from cache_items + ipfs_cid = _resolve_loop.run_until_complete(database.get_ipfs_cid(cid)) + if not ipfs_cid or ipfs_cid == cid: + # No separate IPFS CID, try using the cid directly (might be IPFS CID) + ipfs_cid = cid + print(f"RESOLVE_ASSET: file not local, trying IPFS fetch for {ipfs_cid}", file=sys.stderr) import ipfs_client - content = ipfs_client.get_bytes(cid, use_gateway_fallback=True) + content = ipfs_client.get_bytes(ipfs_cid, use_gateway_fallback=True) if content: # Save to local cache import tempfile