Fix lazy audio path resolution for GPU streaming
Some checks are pending
GPU Worker CI/CD / test (push) Waiting to run
GPU Worker CI/CD / deploy (push) Blocked by required conditions

Audio playback path was being resolved during parsing when database
may not be ready, causing fallback to non-existent path. Now resolves
lazily when stream starts, matching how audio analyzer works.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
giles
2026-02-04 11:32:04 +00:00
parent ef3638d3cf
commit ed617fcdd6
9 changed files with 159 additions and 57 deletions

View File

@@ -10,8 +10,8 @@
(require-primitives "streaming") (require-primitives "streaming")
;; Audio analyzer (provides beat detection and energy levels) ;; Audio analyzer (provides beat detection and energy levels)
;; Paths relative to working directory (project root) ;; Using friendly name for asset resolution
(def music (streaming:make-audio-analyzer "woods_half/halleluwah.webm")) (def music (streaming:make-audio-analyzer "woods-audio"))
;; Audio playback path (for sync with video output) ;; Audio playback path (for sync with video output)
(audio-playback "woods_half/halleluwah.webm") (audio-playback "woods-audio")

View File

@@ -10,16 +10,16 @@
(require-primitives "streaming") (require-primitives "streaming")
;; Video sources array ;; Video sources array
;; Paths relative to working directory (project root) ;; Using friendly names for asset resolution
(def sources [ (def sources [
(streaming:make-video-source "woods/1.webm" 10) (streaming:make-video-source "woods-1" 10)
(streaming:make-video-source "woods/2.webm" 10) (streaming:make-video-source "woods-2" 10)
(streaming:make-video-source "woods/3.webm" 10) (streaming:make-video-source "woods-3" 10)
(streaming:make-video-source "woods/4.webm" 10) (streaming:make-video-source "woods-4" 10)
(streaming:make-video-source "woods/5.webm" 10) (streaming:make-video-source "woods-5" 10)
(streaming:make-video-source "woods/6.webm" 10) (streaming:make-video-source "woods-6" 10)
(streaming:make-video-source "woods/7.webm" 10) (streaming:make-video-source "woods-7" 10)
(streaming:make-video-source "woods/8.webm" 10) (streaming:make-video-source "woods-8" 10)
]) ])
;; Per-pair effect config: rotation direction, rotation ranges, zoom ranges ;; Per-pair effect config: rotation direction, rotation ranges, zoom ranges

View File

@@ -244,6 +244,15 @@ async def update_cache_item_ipfs_cid(cid: str, ipfs_cid: str) -> bool:
return result == "UPDATE 1" return result == "UPDATE 1"
async def get_ipfs_cid(cid: str) -> Optional[str]:
"""Get the IPFS CID for a cache item by its internal CID."""
async with pool.acquire() as conn:
return await conn.fetchval(
"SELECT ipfs_cid FROM cache_items WHERE cid = $1",
cid
)
async def delete_cache_item(cid: str) -> bool: async def delete_cache_item(cid: str) -> bool:
"""Delete a cache item and all associated data (cascades).""" """Delete a cache item and all associated data (cascades)."""
async with pool.acquire() as conn: async with pool.acquire() as conn:

View File

@@ -125,6 +125,8 @@
dir (get cfg :dir) dir (get cfg :dir)
rot-max-a (get cfg :rot-a) rot-max-a (get cfg :rot-a)
rot-max-b (get cfg :rot-b) rot-max-b (get cfg :rot-b)
zoom-a (get cfg :zoom-a)
zoom-b (get cfg :zoom-b)
pair-angle (get pstate :angle) pair-angle (get pstate :angle)
inv-a-on (> (get pstate :inv-a) 0) inv-a-on (> (get pstate :inv-a) 0)
inv-b-on (> (get pstate :inv-b) 0) inv-b-on (> (get pstate :inv-b) 0)
@@ -140,10 +142,12 @@
;; Define effect pipelines for each source ;; Define effect pipelines for each source
;; These get compiled to single CUDA kernels! ;; These get compiled to single CUDA kernels!
effects-a [{:op "rotate" :angle angle-a} effects-a [{:op "zoom" :amount zoom-a}
{:op "rotate" :angle angle-a}
{:op "hue_shift" :degrees (if hue-a-on hue-a-val 0)} {:op "hue_shift" :degrees (if hue-a-on hue-a-val 0)}
{:op "invert" :amount (if inv-a-on 1 0)}] {:op "invert" :amount (if inv-a-on 1 0)}]
effects-b [{:op "rotate" :angle angle-b} effects-b [{:op "zoom" :amount zoom-b}
{:op "rotate" :angle angle-b}
{:op "hue_shift" :degrees (if hue-b-on hue-b-val 0)} {:op "hue_shift" :degrees (if hue-b-on hue-b-val 0)}
{:op "invert" :amount (if inv-b-on 1 0)}] {:op "invert" :amount (if inv-b-on 1 0)}]

View File

@@ -65,11 +65,20 @@ class VideoSource:
self._last_read_time = -1 self._last_read_time = -1
self._cached_frame = None self._cached_frame = None
# Check if file exists
if not self.path.exists():
raise FileNotFoundError(f"Video file not found: {self.path}")
# Get video info # Get video info
cmd = ["ffprobe", "-v", "quiet", "-print_format", "json", cmd = ["ffprobe", "-v", "quiet", "-print_format", "json",
"-show_streams", str(self.path)] "-show_streams", str(self.path)]
result = subprocess.run(cmd, capture_output=True, text=True) result = subprocess.run(cmd, capture_output=True, text=True)
info = json.loads(result.stdout) if result.returncode != 0:
raise RuntimeError(f"Failed to probe video '{self.path}': {result.stderr}")
try:
info = json.loads(result.stdout)
except json.JSONDecodeError:
raise RuntimeError(f"Invalid video file or ffprobe failed: {self.path}")
for stream in info.get("streams", []): for stream in info.get("streams", []):
if stream.get("codec_type") == "video": if stream.get("codec_type") == "video":
@@ -281,16 +290,27 @@ class AudioAnalyzer:
self.path = Path(path) self.path = Path(path)
self.sample_rate = sample_rate self.sample_rate = sample_rate
# Check if file exists
if not self.path.exists():
raise FileNotFoundError(f"Audio file not found: {self.path}")
# Load audio via ffmpeg # Load audio via ffmpeg
cmd = ["ffmpeg", "-v", "quiet", "-i", str(self.path), cmd = ["ffmpeg", "-v", "error", "-i", str(self.path),
"-f", "f32le", "-ac", "1", "-ar", str(sample_rate), "-"] "-f", "f32le", "-ac", "1", "-ar", str(sample_rate), "-"]
result = subprocess.run(cmd, capture_output=True) result = subprocess.run(cmd, capture_output=True)
if result.returncode != 0:
raise RuntimeError(f"Failed to load audio '{self.path}': {result.stderr.decode()}")
self._audio = np.frombuffer(result.stdout, dtype=np.float32) self._audio = np.frombuffer(result.stdout, dtype=np.float32)
if len(self._audio) == 0:
raise RuntimeError(f"Audio file is empty or invalid: {self.path}")
# Get duration # Get duration
cmd = ["ffprobe", "-v", "quiet", "-print_format", "json", cmd = ["ffprobe", "-v", "quiet", "-print_format", "json",
"-show_format", str(self.path)] "-show_format", str(self.path)]
info = json.loads(subprocess.run(cmd, capture_output=True, text=True).stdout) result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode != 0:
raise RuntimeError(f"Failed to probe audio '{self.path}': {result.stderr}")
info = json.loads(result.stdout)
self.duration = float(info.get("format", {}).get("duration", 60)) self.duration = float(info.get("format", {}).get("duration", 60))
# Beat detection state # Beat detection state

View File

@@ -235,6 +235,7 @@ class GPUHLSOutput:
self.ipfs_gateway = ipfs_gateway.rstrip("/") self.ipfs_gateway = ipfs_gateway.rstrip("/")
self._on_playlist_update = on_playlist_update self._on_playlist_update = on_playlist_update
self._is_open = True self._is_open = True
self.audio_source = audio_source
# GPU encoder # GPU encoder
self._gpu_encoder = GPUEncoder(size[0], size[1], fps, crf) self._gpu_encoder = GPUEncoder(size[0], size[1], fps, crf)
@@ -266,14 +267,29 @@ class GPUHLSOutput:
print(f"[GPUHLSOutput] Initialized {size[0]}x{size[1]} @ {fps}fps, GPU encoding", file=sys.stderr) print(f"[GPUHLSOutput] Initialized {size[0]}x{size[1]} @ {fps}fps, GPU encoding", file=sys.stderr)
def _setup_muxer(self): def _setup_muxer(self):
"""Setup ffmpeg for muxing H.264 to MPEG-TS segments.""" """Setup ffmpeg for muxing H.264 to MPEG-TS segments with optional audio."""
self.local_playlist_path = self.output_dir / "stream.m3u8" self.local_playlist_path = self.output_dir / "stream.m3u8"
cmd = [ cmd = [
"ffmpeg", "-y", "ffmpeg", "-y",
"-f", "h264", # Input is raw H.264 "-f", "h264", # Input is raw H.264
"-i", "-", "-i", "-",
"-c:v", "copy", # Just copy, no re-encoding ]
# Add audio input if provided
if self.audio_source:
cmd.extend(["-i", str(self.audio_source)])
cmd.extend(["-map", "0:v", "-map", "1:a"])
cmd.extend([
"-c:v", "copy", # Just copy video, no re-encoding
])
# Add audio codec if we have audio
if self.audio_source:
cmd.extend(["-c:a", "aac", "-b:a", "128k", "-shortest"])
cmd.extend([
"-f", "hls", "-f", "hls",
"-hls_time", str(self.segment_duration), "-hls_time", str(self.segment_duration),
"-hls_list_size", "0", "-hls_list_size", "0",
@@ -281,12 +297,14 @@ class GPUHLSOutput:
"-hls_segment_type", "mpegts", "-hls_segment_type", "mpegts",
"-hls_segment_filename", str(self.output_dir / "segment_%05d.ts"), "-hls_segment_filename", str(self.output_dir / "segment_%05d.ts"),
str(self.local_playlist_path), str(self.local_playlist_path),
] ])
print(f"[GPUHLSOutput] FFmpeg cmd: {' '.join(cmd)}", file=sys.stderr)
self._muxer = subprocess.Popen( self._muxer = subprocess.Popen(
cmd, cmd,
stdin=subprocess.PIPE, stdin=subprocess.PIPE,
stderr=subprocess.DEVNULL, stderr=subprocess.PIPE, # Capture stderr for debugging
) )
def write(self, frame: Union[np.ndarray, 'cp.ndarray'], t: float = 0): def write(self, frame: Union[np.ndarray, 'cp.ndarray'], t: float = 0):

View File

@@ -11,6 +11,8 @@ Supports:
import numpy as np import numpy as np
import subprocess import subprocess
import threading
import queue
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from typing import Tuple, Optional, List, Union from typing import Tuple, Optional, List, Union
from pathlib import Path from pathlib import Path
@@ -665,12 +667,18 @@ class IPFSHLSOutput(Output):
self.segment_cids: dict = {} # segment_number -> cid self.segment_cids: dict = {} # segment_number -> cid
self._last_segment_checked = -1 self._last_segment_checked = -1
self._playlist_cid: Optional[str] = None self._playlist_cid: Optional[str] = None
self._upload_lock = threading.Lock()
# Import IPFS client # Import IPFS client
from ipfs_client import add_file, add_bytes from ipfs_client import add_file, add_bytes
self._ipfs_add_file = add_file self._ipfs_add_file = add_file
self._ipfs_add_bytes = add_bytes self._ipfs_add_bytes = add_bytes
# Background upload thread for async IPFS uploads
self._upload_queue = queue.Queue()
self._upload_thread = threading.Thread(target=self._upload_worker, daemon=True)
self._upload_thread.start()
# Local HLS paths # Local HLS paths
self.local_playlist_path = self.output_dir / "stream.m3u8" self.local_playlist_path = self.output_dir / "stream.m3u8"
@@ -727,9 +735,38 @@ class IPFSHLSOutput(Output):
stderr=None, stderr=None,
) )
def _upload_new_segments(self): def _upload_worker(self):
"""Check for new segments and upload them to IPFS.""" """Background worker thread for async IPFS uploads."""
import sys import sys
while True:
try:
item = self._upload_queue.get(timeout=1.0)
if item is None: # Shutdown signal
break
seg_path, seg_num = item
self._do_upload(seg_path, seg_num)
except queue.Empty:
continue
except Exception as e:
print(f"Upload worker error: {e}", file=sys.stderr)
def _do_upload(self, seg_path: Path, seg_num: int):
"""Actually perform the upload (runs in background thread)."""
import sys
try:
cid = self._ipfs_add_file(seg_path, pin=True)
if cid:
with self._upload_lock:
self.segment_cids[seg_num] = cid
print(f"IPFS: segment_{seg_num:05d}.ts -> {cid}", file=sys.stderr)
self._update_ipfs_playlist()
except Exception as e:
print(f"Failed to upload segment {seg_num}: {e}", file=sys.stderr)
def _upload_new_segments(self):
"""Check for new segments and queue them for async IPFS upload."""
import sys
import time
# Find all segments # Find all segments
segments = sorted(self.output_dir.glob("segment_*.ts")) segments = sorted(self.output_dir.glob("segment_*.ts"))
@@ -739,53 +776,48 @@ class IPFSHLSOutput(Output):
seg_name = seg_path.stem # segment_00000 seg_name = seg_path.stem # segment_00000
seg_num = int(seg_name.split("_")[1]) seg_num = int(seg_name.split("_")[1])
# Skip if already uploaded # Skip if already uploaded or queued
if seg_num in self.segment_cids: with self._upload_lock:
continue if seg_num in self.segment_cids:
continue
# Skip if segment is still being written (check if file size is stable) # Skip if segment is still being written (quick non-blocking check)
try: try:
size1 = seg_path.stat().st_size size1 = seg_path.stat().st_size
if size1 == 0: if size1 == 0:
continue # Empty file, still being created continue # Empty file, still being created
import time time.sleep(0.01) # Very short check
time.sleep(0.1)
size2 = seg_path.stat().st_size size2 = seg_path.stat().st_size
if size1 != size2: if size1 != size2:
continue # File still being written continue # File still being written
except FileNotFoundError: except FileNotFoundError:
continue continue
# Upload to IPFS # Queue for async upload (non-blocking!)
cid = self._ipfs_add_file(seg_path, pin=True) self._upload_queue.put((seg_path, seg_num))
if cid:
self.segment_cids[seg_num] = cid
print(f"IPFS: segment_{seg_num:05d}.ts -> {cid}", file=sys.stderr)
# Update playlist after each segment upload
self._update_ipfs_playlist()
def _update_ipfs_playlist(self): def _update_ipfs_playlist(self):
"""Generate and upload IPFS-aware m3u8 playlist.""" """Generate and upload IPFS-aware m3u8 playlist."""
if not self.segment_cids:
return
import sys import sys
# Build m3u8 content with IPFS URLs with self._upload_lock:
lines = [ if not self.segment_cids:
"#EXTM3U", return
"#EXT-X-VERSION:3",
f"#EXT-X-TARGETDURATION:{int(self.segment_duration) + 1}",
"#EXT-X-MEDIA-SEQUENCE:0",
]
# Add segments in order # Build m3u8 content with IPFS URLs
for seg_num in sorted(self.segment_cids.keys()): lines = [
cid = self.segment_cids[seg_num] "#EXTM3U",
lines.append(f"#EXTINF:{self.segment_duration:.3f},") "#EXT-X-VERSION:3",
lines.append(f"{self.ipfs_gateway}/{cid}") f"#EXT-X-TARGETDURATION:{int(self.segment_duration) + 1}",
"#EXT-X-MEDIA-SEQUENCE:0",
]
# Add segments in order
for seg_num in sorted(self.segment_cids.keys()):
cid = self.segment_cids[seg_num]
lines.append(f"#EXTINF:{self.segment_duration:.3f},")
lines.append(f"{self.ipfs_gateway}/{cid}")
playlist_content = "\n".join(lines) + "\n" playlist_content = "\n".join(lines) + "\n"
@@ -842,9 +874,13 @@ class IPFSHLSOutput(Output):
self._process.wait() self._process.wait()
self._is_open = False self._is_open = False
# Upload any remaining segments # Queue any remaining segments
self._upload_new_segments() self._upload_new_segments()
# Wait for pending uploads to complete
self._upload_queue.put(None) # Signal shutdown
self._upload_thread.join(timeout=30)
# Generate final playlist with #EXT-X-ENDLIST # Generate final playlist with #EXT-X-ENDLIST
if self.segment_cids: if self.segment_cids:
lines = [ lines = [

View File

@@ -928,7 +928,18 @@ class StreamInterpreter:
ctx = Context(fps=fps) ctx = Context(fps=fps)
# Output (with optional audio sync) # Output (with optional audio sync)
# Resolve audio path lazily here if it wasn't resolved during parsing
audio = self.audio_playback audio = self.audio_playback
if audio and not Path(audio).exists():
# Try to resolve as friendly name (may have failed during parsing)
audio_name = Path(audio).name # Get just the name part
resolved = self._resolve_name(audio_name)
if resolved and resolved.exists():
audio = str(resolved)
print(f"Lazy resolved audio: {audio}", file=sys.stderr)
else:
print(f"WARNING: Audio file not found: {audio}", file=sys.stderr)
audio = None
if output == "pipe": if output == "pipe":
out = PipeOutput(size=(w, h), fps=fps, audio_source=audio) out = PipeOutput(size=(w, h), fps=fps, audio_source=audio)
elif output == "preview": elif output == "preview":

View File

@@ -84,11 +84,15 @@ def resolve_asset(ref: str, actor_id: Optional[str] = None) -> Optional[Path]:
logger.info(f"Resolved '{ref}' via friendly name to {path}") logger.info(f"Resolved '{ref}' via friendly name to {path}")
return path return path
# File not in local cache - try fetching from IPFS # File not in local cache - look up IPFS CID and fetch
# The CID from friendly_names is an IPFS CID # The cid from friendly_names is internal, need to get ipfs_cid from cache_items
print(f"RESOLVE_ASSET: file not local, trying IPFS fetch for {cid}", file=sys.stderr) ipfs_cid = _resolve_loop.run_until_complete(database.get_ipfs_cid(cid))
if not ipfs_cid or ipfs_cid == cid:
# No separate IPFS CID, try using the cid directly (might be IPFS CID)
ipfs_cid = cid
print(f"RESOLVE_ASSET: file not local, trying IPFS fetch for {ipfs_cid}", file=sys.stderr)
import ipfs_client import ipfs_client
content = ipfs_client.get_bytes(cid, use_gateway_fallback=True) content = ipfs_client.get_bytes(ipfs_cid, use_gateway_fallback=True)
if content: if content:
# Save to local cache # Save to local cache
import tempfile import tempfile