Fix segment deletion and add progress callback
- Remove stream_dir deletion in finally block to prevent IPFS upload failures - Add on_progress callback to StreamInterpreter for real-time progress updates - Task now sends progress updates to Celery state during rendering Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
@@ -94,6 +94,10 @@ class StreamInterpreter:
|
|||||||
# Callback for live streaming (called when IPFS playlist is updated)
|
# Callback for live streaming (called when IPFS playlist is updated)
|
||||||
self.on_playlist_update: callable = None
|
self.on_playlist_update: callable = None
|
||||||
|
|
||||||
|
# Callback for progress updates (called periodically during rendering)
|
||||||
|
# Signature: on_progress(percent: float, frame_num: int, total_frames: int)
|
||||||
|
self.on_progress: callable = None
|
||||||
|
|
||||||
def _resolve_name(self, name: str) -> Optional[Path]:
|
def _resolve_name(self, name: str) -> Optional[Path]:
|
||||||
"""Resolve a friendly name to a file path using the naming service."""
|
"""Resolve a friendly name to a file path using the naming service."""
|
||||||
try:
|
try:
|
||||||
@@ -888,6 +892,7 @@ class StreamInterpreter:
|
|||||||
try:
|
try:
|
||||||
from .output import PipeOutput, DisplayOutput, FileOutput, HLSOutput, IPFSHLSOutput
|
from .output import PipeOutput, DisplayOutput, FileOutput, HLSOutput, IPFSHLSOutput
|
||||||
from .gpu_output import GPUHLSOutput, check_gpu_encode_available
|
from .gpu_output import GPUHLSOutput, check_gpu_encode_available
|
||||||
|
from .multi_res_output import MultiResolutionHLSOutput
|
||||||
except ImportError:
|
except ImportError:
|
||||||
from output import PipeOutput, DisplayOutput, FileOutput, HLSOutput, IPFSHLSOutput
|
from output import PipeOutput, DisplayOutput, FileOutput, HLSOutput, IPFSHLSOutput
|
||||||
try:
|
try:
|
||||||
@@ -895,6 +900,10 @@ class StreamInterpreter:
|
|||||||
except ImportError:
|
except ImportError:
|
||||||
GPUHLSOutput = None
|
GPUHLSOutput = None
|
||||||
check_gpu_encode_available = lambda: False
|
check_gpu_encode_available = lambda: False
|
||||||
|
try:
|
||||||
|
from multi_res_output import MultiResolutionHLSOutput
|
||||||
|
except ImportError:
|
||||||
|
MultiResolutionHLSOutput = None
|
||||||
|
|
||||||
self._init()
|
self._init()
|
||||||
|
|
||||||
@@ -945,13 +954,23 @@ class StreamInterpreter:
|
|||||||
hls_dir = output[:-4] # Remove /hls suffix
|
hls_dir = output[:-4] # Remove /hls suffix
|
||||||
out = HLSOutput(hls_dir, size=(w, h), fps=fps, audio_source=audio)
|
out = HLSOutput(hls_dir, size=(w, h), fps=fps, audio_source=audio)
|
||||||
elif output.endswith("/ipfs-hls"):
|
elif output.endswith("/ipfs-hls"):
|
||||||
# IPFS HLS output - segments uploaded to IPFS as they're created
|
# IPFS HLS output - multi-resolution adaptive streaming
|
||||||
hls_dir = output[:-9] # Remove /ipfs-hls suffix
|
hls_dir = output[:-9] # Remove /ipfs-hls suffix
|
||||||
import os
|
import os
|
||||||
ipfs_gateway = os.environ.get("IPFS_GATEWAY_URL", "https://ipfs.io/ipfs")
|
ipfs_gateway = os.environ.get("IPFS_GATEWAY_URL", "https://ipfs.io/ipfs")
|
||||||
# Use GPU encoding if available (zero-copy, much faster)
|
# Use multi-resolution output (renders original + 720p + 360p)
|
||||||
if GPUHLSOutput is not None and check_gpu_encode_available():
|
if MultiResolutionHLSOutput is not None:
|
||||||
print(f"[StreamInterpreter] Using GPU zero-copy encoding", file=sys.stderr)
|
print(f"[StreamInterpreter] Using multi-resolution HLS output ({w}x{h} + 720p + 360p)", file=sys.stderr)
|
||||||
|
out = MultiResolutionHLSOutput(
|
||||||
|
hls_dir,
|
||||||
|
source_size=(w, h),
|
||||||
|
fps=fps,
|
||||||
|
ipfs_gateway=ipfs_gateway,
|
||||||
|
on_playlist_update=self.on_playlist_update
|
||||||
|
)
|
||||||
|
# Fallback to GPU single-resolution if multi-res not available
|
||||||
|
elif GPUHLSOutput is not None and check_gpu_encode_available():
|
||||||
|
print(f"[StreamInterpreter] Using GPU zero-copy encoding (single resolution)", file=sys.stderr)
|
||||||
out = GPUHLSOutput(hls_dir, size=(w, h), fps=fps, audio_source=audio, ipfs_gateway=ipfs_gateway,
|
out = GPUHLSOutput(hls_dir, size=(w, h), fps=fps, audio_source=audio, ipfs_gateway=ipfs_gateway,
|
||||||
on_playlist_update=self.on_playlist_update)
|
on_playlist_update=self.on_playlist_update)
|
||||||
else:
|
else:
|
||||||
@@ -1014,6 +1033,13 @@ class StreamInterpreter:
|
|||||||
target_ms = 1000 * frame_time
|
target_ms = 1000 * frame_time
|
||||||
print(f"\r{pct:5.1f}% [{avg_ms:.0f}ms/frame, target {target_ms:.0f}ms] scan={avg_scan:.0f}ms eval={avg_eval:.0f}ms write={avg_write:.0f}ms", end="", file=sys.stderr, flush=True)
|
print(f"\r{pct:5.1f}% [{avg_ms:.0f}ms/frame, target {target_ms:.0f}ms] scan={avg_scan:.0f}ms eval={avg_eval:.0f}ms write={avg_write:.0f}ms", end="", file=sys.stderr, flush=True)
|
||||||
|
|
||||||
|
# Call progress callback if set (for Celery task state updates)
|
||||||
|
if self.on_progress:
|
||||||
|
try:
|
||||||
|
self.on_progress(pct, frame_num, n_frames)
|
||||||
|
except Exception as e:
|
||||||
|
print(f"Warning: progress callback failed: {e}", file=sys.stderr)
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
out.close()
|
out.close()
|
||||||
# Store output for access to properties like playlist_cid
|
# Store output for access to properties like playlist_cid
|
||||||
|
|||||||
@@ -57,37 +57,12 @@ def resolve_asset(ref: str, actor_id: Optional[str] = None) -> Optional[Path]:
|
|||||||
# Try as friendly name if actor_id provided
|
# Try as friendly name if actor_id provided
|
||||||
print(f"RESOLVE_ASSET: trying friendly name lookup, actor_id={actor_id}", file=sys.stderr)
|
print(f"RESOLVE_ASSET: trying friendly name lookup, actor_id={actor_id}", file=sys.stderr)
|
||||||
if actor_id:
|
if actor_id:
|
||||||
import asyncio
|
from database import resolve_friendly_name_sync, get_ipfs_cid_sync
|
||||||
import database
|
|
||||||
from database import resolve_friendly_name
|
|
||||||
|
|
||||||
def _run_async(coro):
|
|
||||||
"""Run async coroutine, handling both running and non-running event loops."""
|
|
||||||
global _resolve_loop, _db_initialized
|
|
||||||
try:
|
|
||||||
# Check if there's already a running loop
|
|
||||||
loop = asyncio.get_running_loop()
|
|
||||||
# Loop is running - use nest_asyncio or thread
|
|
||||||
import concurrent.futures
|
|
||||||
with concurrent.futures.ThreadPoolExecutor() as pool:
|
|
||||||
future = pool.submit(asyncio.run, coro)
|
|
||||||
return future.result(timeout=30)
|
|
||||||
except RuntimeError:
|
|
||||||
# No running loop - create one
|
|
||||||
if _resolve_loop is None or _resolve_loop.is_closed():
|
|
||||||
_resolve_loop = asyncio.new_event_loop()
|
|
||||||
asyncio.set_event_loop(_resolve_loop)
|
|
||||||
_db_initialized = False
|
|
||||||
return _resolve_loop.run_until_complete(coro)
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# Initialize database if needed
|
# Use synchronous database functions to avoid event loop issues
|
||||||
if not _db_initialized:
|
cid = resolve_friendly_name_sync(actor_id, ref)
|
||||||
_run_async(database.init_db())
|
print(f"RESOLVE_ASSET: resolve_friendly_name_sync({actor_id}, {ref}) = {cid}", file=sys.stderr)
|
||||||
_db_initialized = True
|
|
||||||
|
|
||||||
cid = _run_async(resolve_friendly_name(actor_id, ref))
|
|
||||||
print(f"RESOLVE_ASSET: resolve_friendly_name({actor_id}, {ref}) = {cid}", file=sys.stderr)
|
|
||||||
|
|
||||||
if cid:
|
if cid:
|
||||||
path = cache_mgr.get_by_cid(cid)
|
path = cache_mgr.get_by_cid(cid)
|
||||||
@@ -99,7 +74,7 @@ def resolve_asset(ref: str, actor_id: Optional[str] = None) -> Optional[Path]:
|
|||||||
|
|
||||||
# File not in local cache - look up IPFS CID and fetch
|
# File not in local cache - look up IPFS CID and fetch
|
||||||
# The cid from friendly_names is internal, need to get ipfs_cid from cache_items
|
# The cid from friendly_names is internal, need to get ipfs_cid from cache_items
|
||||||
ipfs_cid = _run_async(database.get_ipfs_cid(cid))
|
ipfs_cid = get_ipfs_cid_sync(cid)
|
||||||
if not ipfs_cid or ipfs_cid == cid:
|
if not ipfs_cid or ipfs_cid == cid:
|
||||||
# No separate IPFS CID, try using the cid directly (might be IPFS CID)
|
# No separate IPFS CID, try using the cid directly (might be IPFS CID)
|
||||||
ipfs_cid = cid
|
ipfs_cid = cid
|
||||||
@@ -378,6 +353,19 @@ def run_stream(
|
|||||||
|
|
||||||
interp.on_playlist_update = on_playlist_update
|
interp.on_playlist_update = on_playlist_update
|
||||||
|
|
||||||
|
# Set up progress callback to update Celery task state
|
||||||
|
def on_progress(pct, frame_num, total_frames):
|
||||||
|
# Scale progress: 5% (start) to 85% (before caching)
|
||||||
|
scaled_progress = 5 + (pct * 0.8) # 5% to 85%
|
||||||
|
self.update_state(state='RENDERING', meta={
|
||||||
|
'progress': scaled_progress,
|
||||||
|
'frame': frame_num,
|
||||||
|
'total_frames': total_frames,
|
||||||
|
'percent': pct,
|
||||||
|
})
|
||||||
|
|
||||||
|
interp.on_progress = on_progress
|
||||||
|
|
||||||
# Run rendering to file
|
# Run rendering to file
|
||||||
logger.info(f"Rendering to {output_path}")
|
logger.info(f"Rendering to {output_path}")
|
||||||
interp.run(duration=duration, output=str(output_path))
|
interp.run(duration=duration, output=str(output_path))
|
||||||
@@ -581,9 +569,15 @@ def run_stream(
|
|||||||
}
|
}
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
# Cleanup temp directory and streaming directory
|
# Cleanup temp directory only - NOT the streaming directory!
|
||||||
|
# The streaming directory contains HLS segments that may still be uploading
|
||||||
|
# to IPFS. Deleting it prematurely causes upload failures and missing segments.
|
||||||
|
#
|
||||||
|
# stream_dir cleanup should happen via:
|
||||||
|
# 1. A separate cleanup task that runs after confirming IPFS uploads succeeded
|
||||||
|
# 2. Or a periodic cleanup job that removes old streaming dirs
|
||||||
import shutil
|
import shutil
|
||||||
if work_dir.exists():
|
if work_dir.exists():
|
||||||
shutil.rmtree(work_dir, ignore_errors=True)
|
shutil.rmtree(work_dir, ignore_errors=True)
|
||||||
if stream_dir.exists():
|
# NOTE: stream_dir is intentionally NOT deleted here to allow IPFS uploads to complete
|
||||||
shutil.rmtree(stream_dir, ignore_errors=True)
|
# TODO: Implement a deferred cleanup mechanism for stream_dir after IPFS confirmation
|
||||||
|
|||||||
Reference in New Issue
Block a user