From 43d73c7bf7c7503e9f79bd0a05919f5ec82f4914 Mon Sep 17 00:00:00 2001 From: giles Date: Wed, 4 Feb 2026 20:13:27 +0000 Subject: [PATCH] Fix segment deletion and add progress callback - Remove stream_dir deletion in finally block to prevent IPFS upload failures - Add on_progress callback to StreamInterpreter for real-time progress updates - Task now sends progress updates to Celery state during rendering Co-Authored-By: Claude Opus 4.5 --- streaming/stream_sexp_generic.py | 34 +++++++++++++++--- tasks/streaming.py | 60 ++++++++++++++------------------ 2 files changed, 57 insertions(+), 37 deletions(-) diff --git a/streaming/stream_sexp_generic.py b/streaming/stream_sexp_generic.py index 9ccb20f..4b0fe0e 100644 --- a/streaming/stream_sexp_generic.py +++ b/streaming/stream_sexp_generic.py @@ -94,6 +94,10 @@ class StreamInterpreter: # Callback for live streaming (called when IPFS playlist is updated) self.on_playlist_update: callable = None + # Callback for progress updates (called periodically during rendering) + # Signature: on_progress(percent: float, frame_num: int, total_frames: int) + self.on_progress: callable = None + def _resolve_name(self, name: str) -> Optional[Path]: """Resolve a friendly name to a file path using the naming service.""" try: @@ -888,6 +892,7 @@ class StreamInterpreter: try: from .output import PipeOutput, DisplayOutput, FileOutput, HLSOutput, IPFSHLSOutput from .gpu_output import GPUHLSOutput, check_gpu_encode_available + from .multi_res_output import MultiResolutionHLSOutput except ImportError: from output import PipeOutput, DisplayOutput, FileOutput, HLSOutput, IPFSHLSOutput try: @@ -895,6 +900,10 @@ class StreamInterpreter: except ImportError: GPUHLSOutput = None check_gpu_encode_available = lambda: False + try: + from multi_res_output import MultiResolutionHLSOutput + except ImportError: + MultiResolutionHLSOutput = None self._init() @@ -945,13 +954,23 @@ class StreamInterpreter: hls_dir = output[:-4] # Remove /hls suffix out = HLSOutput(hls_dir, size=(w, h), fps=fps, audio_source=audio) elif output.endswith("/ipfs-hls"): - # IPFS HLS output - segments uploaded to IPFS as they're created + # IPFS HLS output - multi-resolution adaptive streaming hls_dir = output[:-9] # Remove /ipfs-hls suffix import os ipfs_gateway = os.environ.get("IPFS_GATEWAY_URL", "https://ipfs.io/ipfs") - # Use GPU encoding if available (zero-copy, much faster) - if GPUHLSOutput is not None and check_gpu_encode_available(): - print(f"[StreamInterpreter] Using GPU zero-copy encoding", file=sys.stderr) + # Use multi-resolution output (renders original + 720p + 360p) + if MultiResolutionHLSOutput is not None: + print(f"[StreamInterpreter] Using multi-resolution HLS output ({w}x{h} + 720p + 360p)", file=sys.stderr) + out = MultiResolutionHLSOutput( + hls_dir, + source_size=(w, h), + fps=fps, + ipfs_gateway=ipfs_gateway, + on_playlist_update=self.on_playlist_update + ) + # Fallback to GPU single-resolution if multi-res not available + elif GPUHLSOutput is not None and check_gpu_encode_available(): + print(f"[StreamInterpreter] Using GPU zero-copy encoding (single resolution)", file=sys.stderr) out = GPUHLSOutput(hls_dir, size=(w, h), fps=fps, audio_source=audio, ipfs_gateway=ipfs_gateway, on_playlist_update=self.on_playlist_update) else: @@ -1014,6 +1033,13 @@ class StreamInterpreter: target_ms = 1000 * frame_time print(f"\r{pct:5.1f}% [{avg_ms:.0f}ms/frame, target {target_ms:.0f}ms] scan={avg_scan:.0f}ms eval={avg_eval:.0f}ms write={avg_write:.0f}ms", end="", file=sys.stderr, flush=True) + # Call progress callback if set (for Celery task state updates) + if self.on_progress: + try: + self.on_progress(pct, frame_num, n_frames) + except Exception as e: + print(f"Warning: progress callback failed: {e}", file=sys.stderr) + finally: out.close() # Store output for access to properties like playlist_cid diff --git a/tasks/streaming.py b/tasks/streaming.py index 6590603..f925bc4 100644 --- a/tasks/streaming.py +++ b/tasks/streaming.py @@ -57,37 +57,12 @@ def resolve_asset(ref: str, actor_id: Optional[str] = None) -> Optional[Path]: # Try as friendly name if actor_id provided print(f"RESOLVE_ASSET: trying friendly name lookup, actor_id={actor_id}", file=sys.stderr) if actor_id: - import asyncio - import database - from database import resolve_friendly_name - - def _run_async(coro): - """Run async coroutine, handling both running and non-running event loops.""" - global _resolve_loop, _db_initialized - try: - # Check if there's already a running loop - loop = asyncio.get_running_loop() - # Loop is running - use nest_asyncio or thread - import concurrent.futures - with concurrent.futures.ThreadPoolExecutor() as pool: - future = pool.submit(asyncio.run, coro) - return future.result(timeout=30) - except RuntimeError: - # No running loop - create one - if _resolve_loop is None or _resolve_loop.is_closed(): - _resolve_loop = asyncio.new_event_loop() - asyncio.set_event_loop(_resolve_loop) - _db_initialized = False - return _resolve_loop.run_until_complete(coro) + from database import resolve_friendly_name_sync, get_ipfs_cid_sync try: - # Initialize database if needed - if not _db_initialized: - _run_async(database.init_db()) - _db_initialized = True - - cid = _run_async(resolve_friendly_name(actor_id, ref)) - print(f"RESOLVE_ASSET: resolve_friendly_name({actor_id}, {ref}) = {cid}", file=sys.stderr) + # Use synchronous database functions to avoid event loop issues + cid = resolve_friendly_name_sync(actor_id, ref) + print(f"RESOLVE_ASSET: resolve_friendly_name_sync({actor_id}, {ref}) = {cid}", file=sys.stderr) if cid: path = cache_mgr.get_by_cid(cid) @@ -99,7 +74,7 @@ def resolve_asset(ref: str, actor_id: Optional[str] = None) -> Optional[Path]: # File not in local cache - look up IPFS CID and fetch # The cid from friendly_names is internal, need to get ipfs_cid from cache_items - ipfs_cid = _run_async(database.get_ipfs_cid(cid)) + ipfs_cid = get_ipfs_cid_sync(cid) if not ipfs_cid or ipfs_cid == cid: # No separate IPFS CID, try using the cid directly (might be IPFS CID) ipfs_cid = cid @@ -378,6 +353,19 @@ def run_stream( interp.on_playlist_update = on_playlist_update + # Set up progress callback to update Celery task state + def on_progress(pct, frame_num, total_frames): + # Scale progress: 5% (start) to 85% (before caching) + scaled_progress = 5 + (pct * 0.8) # 5% to 85% + self.update_state(state='RENDERING', meta={ + 'progress': scaled_progress, + 'frame': frame_num, + 'total_frames': total_frames, + 'percent': pct, + }) + + interp.on_progress = on_progress + # Run rendering to file logger.info(f"Rendering to {output_path}") interp.run(duration=duration, output=str(output_path)) @@ -581,9 +569,15 @@ def run_stream( } finally: - # Cleanup temp directory and streaming directory + # Cleanup temp directory only - NOT the streaming directory! + # The streaming directory contains HLS segments that may still be uploading + # to IPFS. Deleting it prematurely causes upload failures and missing segments. + # + # stream_dir cleanup should happen via: + # 1. A separate cleanup task that runs after confirming IPFS uploads succeeded + # 2. Or a periodic cleanup job that removes old streaming dirs import shutil if work_dir.exists(): shutil.rmtree(work_dir, ignore_errors=True) - if stream_dir.exists(): - shutil.rmtree(stream_dir, ignore_errors=True) + # NOTE: stream_dir is intentionally NOT deleted here to allow IPFS uploads to complete + # TODO: Implement a deferred cleanup mechanism for stream_dir after IPFS confirmation