Squashed 'l1/' content from commit 670aa58

git-subtree-dir: l1
git-subtree-split: 670aa582df99e87fca7c247b949baf452e8c234f
This commit is contained in:
giles
2026-02-24 23:07:19 +00:00
commit 80c94ebea7
225 changed files with 57298 additions and 0 deletions

13
tasks/__init__.py Normal file
View File

@@ -0,0 +1,13 @@
# art-celery/tasks - Celery tasks for streaming video rendering
#
# Tasks:
# 1. run_stream - Execute a streaming S-expression recipe
# 2. upload_to_ipfs - Background IPFS upload for media files
from .streaming import run_stream
from .ipfs_upload import upload_to_ipfs
__all__ = [
"run_stream",
"upload_to_ipfs",
]

93
tasks/ipfs_upload.py Normal file
View File

@@ -0,0 +1,93 @@
"""
Background IPFS upload task.
Uploads files to IPFS in the background after initial local storage.
This allows fast uploads while still getting IPFS CIDs eventually.
"""
import logging
import os
import sys
from pathlib import Path
from typing import Optional
# Add parent directory to path for imports
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from celery_app import app
import ipfs_client
logger = logging.getLogger(__name__)
@app.task(bind=True, max_retries=3, default_retry_delay=60)
def upload_to_ipfs(self, local_cid: str, actor_id: str) -> Optional[str]:
    """
    Upload a locally cached file to IPFS in the background.

    Args:
        local_cid: The local content hash of the file
        actor_id: The user who uploaded the file

    Returns:
        IPFS CID if successful, None if the file is missing from local cache.

    Raises:
        celery.exceptions.Retry: when the upload fails and a retry is scheduled
            (up to max_retries, 60s apart).
    """
    from cache_manager import get_cache_manager
    from celery.exceptions import Retry
    import asyncio
    import database

    logger.info(f"Background IPFS upload starting for {local_cid[:16]}...")
    try:
        cache_mgr = get_cache_manager()
        # Get the file path from local cache
        file_path = cache_mgr.get_by_cid(local_cid)
        if not file_path or not file_path.exists():
            logger.error(f"File not found for local CID {local_cid[:16]}...")
            return None

        # Upload to IPFS
        logger.info(f"Uploading {file_path} to IPFS...")
        ipfs_cid = ipfs_client.add_file(file_path)
        if not ipfs_cid:
            logger.error(f"IPFS upload failed for {local_cid[:16]}...")
            raise self.retry(exc=Exception("IPFS upload failed"))
        logger.info(f"IPFS upload successful: {local_cid[:16]}... -> {ipfs_cid[:16]}...")

        # Update database with IPFS CID. Celery tasks run synchronously, so we
        # drive the async database API with a short-lived private event loop.
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        try:
            # Initialize database pool if needed
            loop.run_until_complete(database.init_pool())
            # Update cache_items table
            loop.run_until_complete(
                database.update_cache_item_ipfs_cid(local_cid, ipfs_cid)
            )
            # Update friendly_names table to use IPFS CID instead of local hash.
            # This ensures assets can be fetched by remote workers via IPFS.
            try:
                loop.run_until_complete(
                    database.update_friendly_name_cid(actor_id, local_cid, ipfs_cid)
                )
                logger.info(f"Friendly name updated: {local_cid[:16]}... -> {ipfs_cid[:16]}...")
            except Exception as e:
                # Best-effort: a missing friendly name must not fail the upload.
                logger.warning(f"Failed to update friendly name CID: {e}")
            # Create index from IPFS CID to local cache
            cache_mgr._set_content_index(ipfs_cid, local_cid)
            logger.info(f"Database updated with IPFS CID for {local_cid[:16]}...")
        finally:
            loop.close()
        return ipfs_cid
    except Retry:
        # self.retry() raises Retry (a subclass of Exception). Without this
        # clause the broad handler below would catch it, log it as an error,
        # and wrap it in a second retry call. Propagate it untouched.
        raise
    except Exception as e:
        logger.error(f"Background IPFS upload error: {e}")
        raise self.retry(exc=e)

724
tasks/streaming.py Normal file
View File

@@ -0,0 +1,724 @@
"""
Streaming video rendering task.
Executes S-expression recipes for frame-by-frame video processing.
Supports CID and friendly name references for assets.
Supports pause/resume/restart for long renders.
"""
import hashlib
import logging
import os
import signal
import sys
import tempfile
from pathlib import Path
from typing import Dict, Optional
from celery import current_task
# Add parent directory to path for imports
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from celery_app import app
from cache_manager import get_cache_manager
logger = logging.getLogger(__name__)
class PauseRequested(Exception):
    """Signals a user-initiated pause, delivered to the render via SIGTERM."""
# Log module load at debug level (was a raw stderr print left over from
# debugging worker import problems; logger keeps it out of production noise).
logger.debug("tasks/streaming.py loaded from %s", __file__)

# Module-level event loop reused for synchronous database operations, so a
# single worker process does not create/destroy a loop (and DB pool) per call.
_resolve_loop = None
# Tracks whether database.init_db() has run on the current _resolve_loop.
_db_initialized = False
def resolve_asset(ref: str, actor_id: Optional[str] = None) -> Optional[Path]:
    """
    Resolve an asset reference (CID or friendly name) to a local file path.

    Resolution order:
      1. Treat ``ref`` as a direct CID in the local cache.
      2. With ``actor_id``, resolve ``ref`` as a friendly name to an internal
         CID and look that up locally.
      3. Fall back to fetching the content from IPFS and caching it locally.

    Args:
        ref: CID or friendly name (e.g., "my-video" or "QmXyz...")
        actor_id: User ID for friendly name resolution

    Returns:
        Path to the asset file, or None if not found
    """
    logger.debug("resolve_asset: ref=%s, actor_id=%s", ref, actor_id)
    cache_mgr = get_cache_manager()

    # 1. Try as direct CID first
    path = cache_mgr.get_by_cid(ref)
    logger.debug("resolve_asset: get_by_cid(%s) = %s", ref, path)
    if path and path.exists():
        logger.info(f"Resolved {ref[:16]}... as CID to {path}")
        return path

    # 2. Try as friendly name if actor_id provided
    if actor_id:
        from database import resolve_friendly_name_sync, get_ipfs_cid_sync
        try:
            # Use synchronous database functions to avoid event loop issues
            cid = resolve_friendly_name_sync(actor_id, ref)
            logger.debug("resolve_asset: friendly name %r -> cid %s", ref, cid)
            if cid:
                path = cache_mgr.get_by_cid(cid)
                if path and path.exists():
                    logger.info(f"Resolved '{ref}' via friendly name to {path}")
                    return path
                # 3. File not in local cache - look up IPFS CID and fetch.
                # The cid from friendly_names is internal; cache_items may
                # record a separate ipfs_cid for it.
                ipfs_cid = get_ipfs_cid_sync(cid)
                if not ipfs_cid or ipfs_cid == cid:
                    # No separate IPFS CID; the internal cid might itself be one
                    ipfs_cid = cid
                logger.debug("resolve_asset: not local, fetching %s from IPFS", ipfs_cid)
                import ipfs_client
                content = ipfs_client.get_bytes(ipfs_cid, use_gateway_fallback=True)
                if content:
                    # Spool to a temp file, then hand it to the cache manager.
                    # skip_ipfs: content just came from IPFS, no need to re-add.
                    with tempfile.NamedTemporaryFile(delete=False, suffix='.sexp') as tmp:
                        tmp.write(content)
                        tmp_path = Path(tmp.name)
                    cached_file, _ = cache_mgr.put(tmp_path, node_type="effect", skip_ipfs=True)
                    # Index the internal cid so future lookups hit local cache
                    cache_mgr._set_content_index(cid, cached_file.cid)
                    logger.info(f"Fetched '{ref}' from IPFS and cached at {cached_file.path}")
                    return cached_file.path
                logger.warning(f"IPFS fetch failed for {cid}")
        except Exception as e:
            logger.warning(f"Failed to resolve friendly name '{ref}': {e}")
    logger.warning(f"Could not resolve asset reference: {ref}")
    return None
class CIDVideoSource:
    """
    Video source that resolves CIDs to file paths.

    Wraps the streaming VideoSource (GPU-accelerated when available) so
    recipes can reference cached assets by CID or friendly name. Resolution
    is lazy: nothing is opened until the first read/property access.
    """

    def __init__(self, cid: str, fps: float = 30, actor_id: Optional[str] = None):
        # cid may be a raw CID or a friendly name (resolved via actor_id)
        self.cid = cid
        self.fps = fps
        self.actor_id = actor_id
        self._source = None  # underlying video source, created on first use

    def _ensure_source(self):
        """Lazily resolve the reference and open the underlying video source."""
        if self._source is None:
            logger.info(f"CIDVideoSource._ensure_source: resolving cid={self.cid} with actor_id={self.actor_id}")
            path = resolve_asset(self.cid, self.actor_id)
            if not path:
                raise ValueError(f"Could not resolve video source '{self.cid}' for actor_id={self.actor_id}")
            logger.info(f"CIDVideoSource._ensure_source: resolved to path={path}")
            # Prefer the GPU-accelerated source; fall back to CPU on ANY
            # failure (import error, GPU unavailable, construction error).
            try:
                from sexp_effects.primitive_libs.streaming_gpu import GPUVideoSource, GPU_AVAILABLE
                if GPU_AVAILABLE:
                    logger.info(f"CIDVideoSource: using GPUVideoSource for {path}")
                    self._source = GPUVideoSource(str(path), self.fps, prefer_gpu=True)
                else:
                    raise ImportError("GPU not available")
            except Exception as e:
                # Was `except (ImportError, Exception)`: Exception already
                # subsumes ImportError, so the tuple was redundant/misleading.
                logger.info(f"CIDVideoSource: falling back to CPU VideoSource ({e})")
                from sexp_effects.primitive_libs.streaming import VideoSource
                self._source = VideoSource(str(path), self.fps)

    def read_at(self, t: float):
        self._ensure_source()
        return self._source.read_at(t)

    def read(self):
        self._ensure_source()
        return self._source.read()

    @property
    def size(self):
        self._ensure_source()
        return self._source.size

    @property
    def duration(self):
        # NOTE(review): reaches into the wrapped source's private _duration —
        # presumably VideoSource exposes no public duration; confirm.
        self._ensure_source()
        return self._source._duration

    @property
    def path(self):
        self._ensure_source()
        return self._source.path

    @property
    def _stream_time(self):
        self._ensure_source()
        return self._source._stream_time

    def skip(self):
        self._ensure_source()
        return self._source.skip()

    def close(self):
        # Safe to call even if the source was never opened.
        if self._source:
            self._source.close()
class CIDAudioAnalyzer:
    """Audio analyzer that lazily resolves a CID/friendly name to a file path."""

    def __init__(self, cid: str, actor_id: Optional[str] = None):
        self.cid = cid
        self.actor_id = actor_id
        self._analyzer = None  # built on first use

    def _ensure_analyzer(self):
        """Resolve the reference and construct the underlying analyzer once."""
        if self._analyzer is not None:
            return
        resolved = resolve_asset(self.cid, self.actor_id)
        if not resolved:
            raise ValueError(f"Could not resolve audio source: {self.cid}")
        from sexp_effects.primitive_libs.streaming import AudioAnalyzer
        self._analyzer = AudioAnalyzer(str(resolved))

    def get_energy(self, t: float) -> float:
        """Audio energy at time t."""
        self._ensure_analyzer()
        return self._analyzer.get_energy(t)

    def get_beat(self, t: float) -> bool:
        """Whether a beat occurs at time t."""
        self._ensure_analyzer()
        return self._analyzer.get_beat(t)

    def get_beat_count(self, t: float) -> int:
        """Number of beats up to time t."""
        self._ensure_analyzer()
        return self._analyzer.get_beat_count(t)

    @property
    def duration(self):
        self._ensure_analyzer()
        return self._analyzer.duration
def create_cid_primitives(actor_id: Optional[str] = None):
    """
    Create CID-aware primitive functions.

    Args:
        actor_id: User ID used for friendly-name resolution inside the
            returned primitives (captured by closure).

    Returns:
        Dict of primitive names -> callables that resolve CIDs before
        creating media sources.
    """
    from celery.utils.log import get_task_logger
    cid_logger = get_task_logger(__name__)

    def prim_make_video_source_cid(cid: str, fps: float = 30):
        # Debug level: per-source diagnostic, not a warning condition
        # (was logged at WARNING with a DEBUG prefix during bring-up).
        cid_logger.debug(f"CID-aware make-video-source: cid={cid}, actor_id={actor_id}")
        return CIDVideoSource(cid, fps, actor_id)

    def prim_make_audio_analyzer_cid(cid: str):
        cid_logger.debug(f"CID-aware make-audio-analyzer: cid={cid}, actor_id={actor_id}")
        return CIDAudioAnalyzer(cid, actor_id)

    return {
        'streaming:make-video-source': prim_make_video_source_cid,
        'streaming:make-audio-analyzer': prim_make_audio_analyzer_cid,
    }
@app.task(bind=True, name='tasks.run_stream')
def run_stream(
    self,
    run_id: str,
    recipe_sexp: str,
    output_name: str = "output.mp4",
    duration: Optional[float] = None,
    fps: Optional[float] = None,
    actor_id: Optional[str] = None,
    sources_sexp: Optional[str] = None,
    audio_sexp: Optional[str] = None,
    resume: bool = False,
) -> dict:
    """
    Execute a streaming S-expression recipe.

    Args:
        run_id: The run ID for database tracking
        recipe_sexp: The recipe S-expression content
        output_name: Name for the output file
        duration: Optional duration override (seconds)
        fps: Optional FPS override
        actor_id: User ID for friendly name resolution
        sources_sexp: Optional sources config S-expression
        audio_sexp: Optional audio config S-expression
        resume: If True, load checkpoint and resume from where we left off

    Returns:
        Dict with output_cid, output_path, and status
    """
    global _resolve_loop, _db_initialized
    task_id = self.request.id
    logger.info(f"Starting stream task {task_id} for run {run_id} (resume={resume})")

    # Handle graceful pause (SIGTERM from Celery revoke). The flag is only
    # acted on at checkpoint boundaries, so we always stop at a resumable point.
    pause_requested = False
    original_sigterm = signal.getsignal(signal.SIGTERM)

    def handle_sigterm(signum, frame):
        nonlocal pause_requested
        pause_requested = True
        logger.info(f"Pause requested for run {run_id} (SIGTERM received)")

    signal.signal(signal.SIGTERM, handle_sigterm)
    self.update_state(state='INITIALIZING', meta={'progress': 0})

    # Get the app directory for primitive/effect paths
    app_dir = Path(__file__).parent.parent  # celery/
    sexp_effects_dir = app_dir / "sexp_effects"
    effects_dir = app_dir / "effects"
    templates_dir = app_dir / "templates"

    # Create temp directory for work
    work_dir = Path(tempfile.mkdtemp(prefix="stream_"))
    recipe_path = work_dir / "recipe.sexp"

    # Write output to shared cache for live streaming access
    cache_dir = Path(os.environ.get("CACHE_DIR", "/data/cache"))
    stream_dir = cache_dir / "streaming" / run_id
    stream_dir.mkdir(parents=True, exist_ok=True)
    # Use IPFS HLS output for distributed streaming - segments uploaded to IPFS
    output_path = str(stream_dir) + "/ipfs-hls"  # /ipfs-hls suffix triggers IPFS HLS mode

    # Create symlinks to effect directories so relative paths work
    (work_dir / "sexp_effects").symlink_to(sexp_effects_dir)
    (work_dir / "effects").symlink_to(effects_dir)
    (work_dir / "templates").symlink_to(templates_dir)

    try:
        # Write recipe to temp file
        recipe_path.write_text(recipe_sexp)
        # Write optional config files
        sources_path = None
        if sources_sexp:
            sources_path = work_dir / "sources.sexp"
            sources_path.write_text(sources_sexp)
        audio_path = None
        if audio_sexp:
            audio_path = work_dir / "audio.sexp"
            audio_path.write_text(audio_sexp)
        self.update_state(state='RENDERING', meta={'progress': 5})

        # Import the streaming interpreter
        from streaming.stream_sexp_generic import StreamInterpreter

        # Load checkpoint if resuming
        checkpoint = None
        if resume:
            import asyncio
            import database
            try:
                if _resolve_loop is None or _resolve_loop.is_closed():
                    _resolve_loop = asyncio.new_event_loop()
                    asyncio.set_event_loop(_resolve_loop)
                    # New loop means the old pool is unusable; re-init below
                    _db_initialized = False
                if not _db_initialized:
                    _resolve_loop.run_until_complete(database.init_db())
                    _db_initialized = True
                checkpoint = _resolve_loop.run_until_complete(database.get_run_checkpoint(run_id))
                if checkpoint:
                    logger.info(f"Loaded checkpoint for run {run_id}: frame {checkpoint.get('frame_num')}")
                else:
                    logger.warning(f"No checkpoint found for run {run_id}, starting from beginning")
            except Exception as e:
                logger.error(f"Failed to load checkpoint: {e}")
                checkpoint = None

        # Create interpreter (pass actor_id for friendly name resolution)
        interp = StreamInterpreter(str(recipe_path), actor_id=actor_id)
        # Set primitive library directory explicitly
        interp.primitive_lib_dir = sexp_effects_dir / "primitive_libs"
        if fps:
            interp.config['fps'] = fps
        if sources_path:
            interp.sources_config = sources_path
        if audio_path:
            interp.audio_config = audio_path

        # Override primitives with CID-aware versions
        cid_prims = create_cid_primitives(actor_id)
        from celery.utils.log import get_task_logger
        task_logger = get_task_logger(__name__)
        # Debug-level diagnostics (were WARNING-level bring-up scaffolding)
        task_logger.debug(f"Overriding primitives: {list(cid_prims.keys())}")
        task_logger.debug(f"Primitives before: {list(interp.primitives.keys())[:10]}...")
        interp.primitives.update(cid_prims)
        task_logger.debug(f"streaming:make-video-source is now: {type(interp.primitives.get('streaming:make-video-source'))}")

        # Set up callback to update database when IPFS playlist is created (for live HLS redirect)
        def on_playlist_update(playlist_cid, quality_playlists=None):
            """Update database with playlist CID and quality info.

            Args:
                playlist_cid: Master playlist CID
                quality_playlists: Dict of quality name -> {cid, width, height, bitrate}
            """
            global _resolve_loop, _db_initialized
            import asyncio
            import database
            try:
                if _resolve_loop is None or _resolve_loop.is_closed():
                    _resolve_loop = asyncio.new_event_loop()
                    asyncio.set_event_loop(_resolve_loop)
                    _db_initialized = False
                if not _db_initialized:
                    _resolve_loop.run_until_complete(database.init_db())
                    _db_initialized = True
                _resolve_loop.run_until_complete(database.update_pending_run_playlist(run_id, playlist_cid, quality_playlists))
                logger.info(f"Updated pending run {run_id} with IPFS playlist: {playlist_cid}, qualities: {list(quality_playlists.keys()) if quality_playlists else []}")
            except Exception as e:
                logger.error(f"Failed to update playlist CID in database: {e}")

        interp.on_playlist_update = on_playlist_update

        # Set up progress callback to update Celery task state
        def on_progress(pct, frame_num, total_frames):
            nonlocal pause_requested
            # Scale progress: 5% (start) to 85% (before caching)
            scaled_progress = 5 + (pct * 0.8)  # 5% to 85%
            self.update_state(state='RENDERING', meta={
                'progress': scaled_progress,
                'frame': frame_num,
                'total_frames': total_frames,
                'percent': pct,
            })

        interp.on_progress = on_progress

        # Set up checkpoint callback to save state at segment boundaries
        def on_checkpoint(ckpt):
            """Save checkpoint state to database; honor pending pause requests."""
            nonlocal pause_requested
            global _resolve_loop, _db_initialized
            import asyncio
            import database
            try:
                if _resolve_loop is None or _resolve_loop.is_closed():
                    _resolve_loop = asyncio.new_event_loop()
                    asyncio.set_event_loop(_resolve_loop)
                    _db_initialized = False
                if not _db_initialized:
                    _resolve_loop.run_until_complete(database.init_db())
                    _db_initialized = True
                # Get total frames from interpreter config
                total_frames = None
                if hasattr(interp, 'output') and hasattr(interp.output, '_frame_count'):
                    # Estimate total frames based on duration of the first
                    # global that exposes one (typically the main source)
                    fps_val = interp.config.get('fps', 30)
                    for name, val in interp.globals.items():
                        if hasattr(val, 'duration'):
                            total_frames = int(val.duration * fps_val)
                            break
                _resolve_loop.run_until_complete(database.update_pending_run_checkpoint(
                    run_id=run_id,
                    checkpoint_frame=ckpt['frame_num'],
                    checkpoint_t=ckpt['t'],
                    checkpoint_scans=ckpt.get('scans'),
                    total_frames=total_frames,
                ))
                logger.info(f"Saved checkpoint for run {run_id}: frame {ckpt['frame_num']}")
                # Check if pause was requested after checkpoint
                if pause_requested:
                    logger.info(f"Pause requested after checkpoint, raising PauseRequested")
                    raise PauseRequested("Render paused by user")
            except PauseRequested:
                raise  # Re-raise to stop the render
            except Exception as e:
                # Checkpoint failures are non-fatal: the render continues and
                # the next segment boundary retries the save.
                logger.error(f"Failed to save checkpoint: {e}")

        interp.on_checkpoint = on_checkpoint

        # Build resume state for the interpreter (includes segment CIDs for output)
        resume_from = None
        if checkpoint:
            resume_from = {
                'frame_num': checkpoint.get('frame_num'),
                't': checkpoint.get('t'),
                'scans': checkpoint.get('scans', {}),
            }
            # Add segment CIDs if available (from quality_playlists in checkpoint)
            # Note: We need to extract segment_cids from the output's state, which isn't
            # directly stored. For now, the output will re-check existing segments on disk.

        # Run rendering to file
        logger.info(f"Rendering to {output_path}" + (f" (resuming from frame {resume_from['frame_num']})" if resume_from else ""))
        render_paused = False
        try:
            interp.run(duration=duration, output=str(output_path), resume_from=resume_from)
        except PauseRequested:
            # Graceful pause - checkpoint already saved
            render_paused = True
            logger.info(f"Render paused for run {run_id}")

        # Restore original signal handler
        signal.signal(signal.SIGTERM, original_sigterm)

        if render_paused:
            import asyncio
            import database
            try:
                if _resolve_loop is None or _resolve_loop.is_closed():
                    _resolve_loop = asyncio.new_event_loop()
                    asyncio.set_event_loop(_resolve_loop)
                    _db_initialized = False
                if not _db_initialized:
                    _resolve_loop.run_until_complete(database.init_db())
                    _db_initialized = True
                _resolve_loop.run_until_complete(database.update_pending_run_status(run_id, 'paused'))
            except Exception as e:
                logger.error(f"Failed to update status to paused: {e}")
            return {"status": "paused", "run_id": run_id, "task_id": task_id}

        # Check for interpreter errors
        if interp.errors:
            error_msg = f"Rendering failed with {len(interp.errors)} errors: {interp.errors[0]}"
            raise RuntimeError(error_msg)

        self.update_state(state='CACHING', meta={'progress': 90})

        # Get IPFS playlist CID if available (from IPFSHLSOutput)
        ipfs_playlist_cid = None
        ipfs_playlist_url = None
        segment_cids = {}
        if hasattr(interp, 'output') and hasattr(interp.output, 'playlist_cid'):
            ipfs_playlist_cid = interp.output.playlist_cid
            ipfs_playlist_url = interp.output.playlist_url
            segment_cids = getattr(interp.output, 'segment_cids', {})
            logger.info(f"IPFS HLS: playlist={ipfs_playlist_cid}, segments={len(segment_cids)}")

        # Update pending run with playlist CID for live HLS redirect
        if ipfs_playlist_cid:
            import asyncio
            import database
            try:
                if _resolve_loop is None or _resolve_loop.is_closed():
                    _resolve_loop = asyncio.new_event_loop()
                    asyncio.set_event_loop(_resolve_loop)
                    # BUGFIX: this site previously failed to reset the flag,
                    # unlike every other loop-recreation site. Without it a
                    # pool bound to the closed loop would be reused here.
                    _db_initialized = False
                if not _db_initialized:
                    _resolve_loop.run_until_complete(database.init_db())
                    _db_initialized = True
                _resolve_loop.run_until_complete(database.update_pending_run_playlist(run_id, ipfs_playlist_cid))
                logger.info(f"Updated pending run {run_id} with IPFS playlist CID: {ipfs_playlist_cid}")
            except Exception as e:
                logger.error(f"Failed to update pending run with playlist CID: {e}")
                raise  # Fail fast - database errors should not be silently ignored

        # HLS output creates playlist and segments
        # - Single-res: stream_dir/stream.m3u8 and stream_dir/segment_*.ts
        # - Multi-res: stream_dir/original/playlist.m3u8 and stream_dir/original/segment_*.ts
        hls_playlist = stream_dir / "stream.m3u8"
        if not hls_playlist.exists():
            # Try multi-res output path
            hls_playlist = stream_dir / "original" / "playlist.m3u8"
        # Validate HLS output (must have playlist and at least one segment)
        if not hls_playlist.exists():
            raise RuntimeError("HLS playlist not created - rendering likely failed")
        segments = list(stream_dir.glob("segment_*.ts"))
        if not segments:
            # Try multi-res output path
            segments = list(stream_dir.glob("original/segment_*.ts"))
        if not segments:
            raise RuntimeError("No HLS segments created - rendering likely failed")
        logger.info(f"HLS rendering complete: {len(segments)} segments created, IPFS playlist: {ipfs_playlist_cid}")

        # Mux HLS segments into a single MP4 for persistent cache storage
        final_mp4 = stream_dir / "output.mp4"
        import subprocess
        mux_cmd = [
            "ffmpeg", "-y",
            "-i", str(hls_playlist),
            "-c", "copy",  # Just copy streams, no re-encoding
            "-movflags", "+faststart",  # Move moov atom to start for web playback
            "-fflags", "+genpts",  # Generate proper timestamps
            str(final_mp4)
        ]
        logger.info(f"Muxing HLS to MP4: {' '.join(mux_cmd)}")
        result = subprocess.run(mux_cmd, capture_output=True, text=True)
        if result.returncode != 0:
            logger.warning(f"HLS mux failed: {result.stderr}")
            # Fall back to using the first segment for caching
            final_mp4 = segments[0]

        # Store output in cache
        if final_mp4.exists():
            cache_mgr = get_cache_manager()
            cached_file, ipfs_cid = cache_mgr.put(
                source_path=final_mp4,
                node_type="STREAM_OUTPUT",
                node_id=f"stream_{task_id}",
            )
            logger.info(f"Stream output cached: CID={cached_file.cid}, IPFS={ipfs_cid}")
            # Save to database - reuse the module-level loop to avoid pool conflicts
            import asyncio
            import database
            try:
                # Reuse or create event loop
                if _resolve_loop is None or _resolve_loop.is_closed():
                    _resolve_loop = asyncio.new_event_loop()
                    asyncio.set_event_loop(_resolve_loop)
                    _db_initialized = False
                # Initialize database pool if needed
                if not _db_initialized:
                    _resolve_loop.run_until_complete(database.init_db())
                    _db_initialized = True
                # Get recipe CID from pending_run
                pending = _resolve_loop.run_until_complete(database.get_pending_run(run_id))
                recipe_cid = pending.get("recipe", "streaming") if pending else "streaming"
                # Save to run_cache for completed runs
                logger.info(f"Saving run {run_id} to run_cache with actor_id={actor_id}")
                _resolve_loop.run_until_complete(database.save_run_cache(
                    run_id=run_id,
                    output_cid=cached_file.cid,
                    recipe=recipe_cid,
                    inputs=[],
                    ipfs_cid=ipfs_cid,
                    actor_id=actor_id,
                ))
                # Register output as video type so frontend displays it correctly
                _resolve_loop.run_until_complete(database.add_item_type(
                    cid=cached_file.cid,
                    actor_id=actor_id,
                    item_type="video",
                    path=str(cached_file.path),
                    description=f"Stream output from run {run_id}",
                ))
                logger.info(f"Registered output {cached_file.cid} as video type")
                # Update pending run status
                _resolve_loop.run_until_complete(database.update_pending_run_status(
                    run_id=run_id,
                    status="completed",
                ))
                logger.info(f"Saved run {run_id} to database with actor_id={actor_id}")
            except Exception as db_err:
                logger.error(f"Failed to save run to database: {db_err}")
                raise RuntimeError(f"Database error saving run {run_id}: {db_err}") from db_err
            return {
                "status": "completed",
                "run_id": run_id,
                "task_id": task_id,
                "output_cid": cached_file.cid,
                "ipfs_cid": ipfs_cid,
                "output_path": str(cached_file.path),
                # IPFS HLS streaming info
                "ipfs_playlist_cid": ipfs_playlist_cid,
                "ipfs_playlist_url": ipfs_playlist_url,
                "ipfs_segment_count": len(segment_cids),
            }
        else:
            # Update pending run status to failed - reuse module loop
            import asyncio
            import database
            try:
                if _resolve_loop is None or _resolve_loop.is_closed():
                    _resolve_loop = asyncio.new_event_loop()
                    asyncio.set_event_loop(_resolve_loop)
                    _db_initialized = False
                if not _db_initialized:
                    _resolve_loop.run_until_complete(database.init_db())
                    _db_initialized = True
                _resolve_loop.run_until_complete(database.update_pending_run_status(
                    run_id=run_id,
                    status="failed",
                    error="Output file not created",
                ))
            except Exception as db_err:
                logger.warning(f"Failed to update run status: {db_err}")
            return {
                "status": "failed",
                "run_id": run_id,
                "task_id": task_id,
                "error": "Output file not created",
            }
    except Exception as e:
        logger.error(f"Stream task {task_id} failed: {e}")
        import traceback
        traceback.print_exc()
        # Update pending run status to failed - reuse module loop
        import asyncio
        import database
        try:
            if _resolve_loop is None or _resolve_loop.is_closed():
                _resolve_loop = asyncio.new_event_loop()
                asyncio.set_event_loop(_resolve_loop)
                _db_initialized = False
            if not _db_initialized:
                _resolve_loop.run_until_complete(database.init_db())
                _db_initialized = True
            _resolve_loop.run_until_complete(database.update_pending_run_status(
                run_id=run_id,
                status="failed",
                error=str(e),
            ))
        except Exception as db_err:
            logger.warning(f"Failed to update run status: {db_err}")
        return {
            "status": "failed",
            "run_id": run_id,
            "task_id": task_id,
            "error": str(e),
        }
    finally:
        # Cleanup temp directory only - NOT the streaming directory!
        # The streaming directory contains HLS segments that may still be uploading
        # to IPFS. Deleting it prematurely causes upload failures and missing segments.
        #
        # stream_dir cleanup should happen via:
        # 1. A separate cleanup task that runs after confirming IPFS uploads succeeded
        # 2. Or a periodic cleanup job that removes old streaming dirs
        import shutil
        if work_dir.exists():
            shutil.rmtree(work_dir, ignore_errors=True)
        # NOTE: stream_dir is intentionally NOT deleted here to allow IPFS uploads to complete
        # TODO: Implement a deferred cleanup mechanism for stream_dir after IPFS confirmation