""" Streaming video rendering task. Executes S-expression recipes for frame-by-frame video processing. Supports CID and friendly name references for assets. Supports pause/resume/restart for long renders. """ import hashlib import logging import os import signal import sys import tempfile from pathlib import Path from typing import Dict, Optional from celery import current_task # Add parent directory to path for imports sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) from celery_app import app from cache_manager import get_cache_manager logger = logging.getLogger(__name__) class PauseRequested(Exception): """Raised when user requests pause via SIGTERM.""" pass # Debug: verify module is being loaded print(f"DEBUG MODULE LOAD: tasks/streaming.py loaded at {__file__}", file=sys.stderr) # Module-level event loop for database operations _resolve_loop = None _db_initialized = False def resolve_asset(ref: str, actor_id: Optional[str] = None) -> Optional[Path]: """ Resolve an asset reference (CID or friendly name) to a file path. Args: ref: CID or friendly name (e.g., "my-video" or "QmXyz...") actor_id: User ID for friendly name resolution Returns: Path to the asset file, or None if not found """ global _resolve_loop, _db_initialized import sys print(f"RESOLVE_ASSET: ref={ref}, actor_id={actor_id}", file=sys.stderr) cache_mgr = get_cache_manager() # Try as direct CID first path = cache_mgr.get_by_cid(ref) print(f"RESOLVE_ASSET: get_by_cid({ref}) = {path}", file=sys.stderr) if path and path.exists(): logger.info(f"Resolved {ref[:16]}... as CID to {path}") return path # Try as friendly name if actor_id provided print(f"RESOLVE_ASSET: trying friendly name lookup, actor_id={actor_id}", file=sys.stderr) if actor_id: from database import resolve_friendly_name_sync, get_ipfs_cid_sync try: # Use synchronous database functions to avoid event loop issues cid = resolve_friendly_name_sync(actor_id, ref) print(f"RESOLVE_ASSET: resolve_friendly_name_sync({actor_id}, {ref}) = {cid}", file=sys.stderr) if cid: path = cache_mgr.get_by_cid(cid) print(f"RESOLVE_ASSET: get_by_cid({cid}) = {path}", file=sys.stderr) if path and path.exists(): print(f"RESOLVE_ASSET: SUCCESS - resolved to {path}", file=sys.stderr) logger.info(f"Resolved '{ref}' via friendly name to {path}") return path # File not in local cache - look up IPFS CID and fetch # The cid from friendly_names is internal, need to get ipfs_cid from cache_items ipfs_cid = get_ipfs_cid_sync(cid) if not ipfs_cid or ipfs_cid == cid: # No separate IPFS CID, try using the cid directly (might be IPFS CID) ipfs_cid = cid print(f"RESOLVE_ASSET: file not local, trying IPFS fetch for {ipfs_cid}", file=sys.stderr) import ipfs_client content = ipfs_client.get_bytes(ipfs_cid, use_gateway_fallback=True) if content: # Save to local cache import tempfile from pathlib import Path with tempfile.NamedTemporaryFile(delete=False, suffix='.sexp') as tmp: tmp.write(content) tmp_path = Path(tmp.name) # Store in cache cached_file, _ = cache_mgr.put(tmp_path, node_type="effect", skip_ipfs=True) # Index by IPFS CID for future lookups cache_mgr._set_content_index(cid, cached_file.cid) print(f"RESOLVE_ASSET: fetched from IPFS and cached at {cached_file.path}", file=sys.stderr) logger.info(f"Fetched '{ref}' from IPFS and cached at {cached_file.path}") return cached_file.path else: print(f"RESOLVE_ASSET: IPFS fetch failed for {cid}", file=sys.stderr) except Exception as e: print(f"RESOLVE_ASSET: ERROR - {e}", file=sys.stderr) logger.warning(f"Failed to resolve friendly name '{ref}': {e}") logger.warning(f"Could not resolve asset reference: {ref}") return None class CIDVideoSource: """ Video source that resolves CIDs to file paths. Wraps the streaming VideoSource to work with cached assets. """ def __init__(self, cid: str, fps: float = 30, actor_id: Optional[str] = None): self.cid = cid self.fps = fps self.actor_id = actor_id self._source = None def _ensure_source(self): if self._source is None: logger.info(f"CIDVideoSource._ensure_source: resolving cid={self.cid} with actor_id={self.actor_id}") path = resolve_asset(self.cid, self.actor_id) if not path: raise ValueError(f"Could not resolve video source '{self.cid}' for actor_id={self.actor_id}") logger.info(f"CIDVideoSource._ensure_source: resolved to path={path}") # Use GPU-accelerated video source if available try: from sexp_effects.primitive_libs.streaming_gpu import GPUVideoSource, GPU_AVAILABLE if GPU_AVAILABLE: logger.info(f"CIDVideoSource: using GPUVideoSource for {path}") self._source = GPUVideoSource(str(path), self.fps, prefer_gpu=True) else: raise ImportError("GPU not available") except (ImportError, Exception) as e: logger.info(f"CIDVideoSource: falling back to CPU VideoSource ({e})") from sexp_effects.primitive_libs.streaming import VideoSource self._source = VideoSource(str(path), self.fps) def read_at(self, t: float): self._ensure_source() return self._source.read_at(t) def read(self): self._ensure_source() return self._source.read() @property def size(self): self._ensure_source() return self._source.size @property def duration(self): self._ensure_source() return self._source._duration @property def path(self): self._ensure_source() return self._source.path @property def _stream_time(self): self._ensure_source() return self._source._stream_time def skip(self): self._ensure_source() return self._source.skip() def close(self): if self._source: self._source.close() class CIDAudioAnalyzer: """ Audio analyzer that resolves CIDs to file paths. """ def __init__(self, cid: str, actor_id: Optional[str] = None): self.cid = cid self.actor_id = actor_id self._analyzer = None def _ensure_analyzer(self): if self._analyzer is None: path = resolve_asset(self.cid, self.actor_id) if not path: raise ValueError(f"Could not resolve audio source: {self.cid}") from sexp_effects.primitive_libs.streaming import AudioAnalyzer self._analyzer = AudioAnalyzer(str(path)) def get_energy(self, t: float) -> float: self._ensure_analyzer() return self._analyzer.get_energy(t) def get_beat(self, t: float) -> bool: self._ensure_analyzer() return self._analyzer.get_beat(t) def get_beat_count(self, t: float) -> int: self._ensure_analyzer() return self._analyzer.get_beat_count(t) @property def duration(self): self._ensure_analyzer() return self._analyzer.duration def create_cid_primitives(actor_id: Optional[str] = None): """ Create CID-aware primitive functions. Returns dict of primitives that resolve CIDs before creating sources. """ from celery.utils.log import get_task_logger cid_logger = get_task_logger(__name__) def prim_make_video_source_cid(cid: str, fps: float = 30): cid_logger.warning(f"DEBUG: CID-aware make-video-source: cid={cid}, actor_id={actor_id}") return CIDVideoSource(cid, fps, actor_id) def prim_make_audio_analyzer_cid(cid: str): cid_logger.warning(f"DEBUG: CID-aware make-audio-analyzer: cid={cid}, actor_id={actor_id}") return CIDAudioAnalyzer(cid, actor_id) return { 'streaming:make-video-source': prim_make_video_source_cid, 'streaming:make-audio-analyzer': prim_make_audio_analyzer_cid, } @app.task(bind=True, name='tasks.run_stream') def run_stream( self, run_id: str, recipe_sexp: str, output_name: str = "output.mp4", duration: Optional[float] = None, fps: Optional[float] = None, actor_id: Optional[str] = None, sources_sexp: Optional[str] = None, audio_sexp: Optional[str] = None, resume: bool = False, ) -> dict: """ Execute a streaming S-expression recipe. Args: run_id: The run ID for database tracking recipe_sexp: The recipe S-expression content output_name: Name for the output file duration: Optional duration override (seconds) fps: Optional FPS override actor_id: User ID for friendly name resolution sources_sexp: Optional sources config S-expression audio_sexp: Optional audio config S-expression resume: If True, load checkpoint and resume from where we left off Returns: Dict with output_cid, output_path, and status """ global _resolve_loop, _db_initialized task_id = self.request.id logger.info(f"Starting stream task {task_id} for run {run_id} (resume={resume})") # Handle graceful pause (SIGTERM from Celery revoke) pause_requested = False original_sigterm = signal.getsignal(signal.SIGTERM) def handle_sigterm(signum, frame): nonlocal pause_requested pause_requested = True logger.info(f"Pause requested for run {run_id} (SIGTERM received)") signal.signal(signal.SIGTERM, handle_sigterm) self.update_state(state='INITIALIZING', meta={'progress': 0}) # Get the app directory for primitive/effect paths app_dir = Path(__file__).parent.parent # celery/ sexp_effects_dir = app_dir / "sexp_effects" effects_dir = app_dir / "effects" templates_dir = app_dir / "templates" # Create temp directory for work work_dir = Path(tempfile.mkdtemp(prefix="stream_")) recipe_path = work_dir / "recipe.sexp" # Write output to shared cache for live streaming access cache_dir = Path(os.environ.get("CACHE_DIR", "/data/cache")) stream_dir = cache_dir / "streaming" / run_id stream_dir.mkdir(parents=True, exist_ok=True) # Use IPFS HLS output for distributed streaming - segments uploaded to IPFS output_path = str(stream_dir) + "/ipfs-hls" # /ipfs-hls suffix triggers IPFS HLS mode # Create symlinks to effect directories so relative paths work (work_dir / "sexp_effects").symlink_to(sexp_effects_dir) (work_dir / "effects").symlink_to(effects_dir) (work_dir / "templates").symlink_to(templates_dir) try: # Write recipe to temp file recipe_path.write_text(recipe_sexp) # Write optional config files sources_path = None if sources_sexp: sources_path = work_dir / "sources.sexp" sources_path.write_text(sources_sexp) audio_path = None if audio_sexp: audio_path = work_dir / "audio.sexp" audio_path.write_text(audio_sexp) self.update_state(state='RENDERING', meta={'progress': 5}) # Import the streaming interpreter from streaming.stream_sexp_generic import StreamInterpreter # Load checkpoint if resuming checkpoint = None if resume: import asyncio import database try: if _resolve_loop is None or _resolve_loop.is_closed(): _resolve_loop = asyncio.new_event_loop() asyncio.set_event_loop(_resolve_loop) _db_initialized = False if not _db_initialized: _resolve_loop.run_until_complete(database.init_db()) _db_initialized = True checkpoint = _resolve_loop.run_until_complete(database.get_run_checkpoint(run_id)) if checkpoint: logger.info(f"Loaded checkpoint for run {run_id}: frame {checkpoint.get('frame_num')}") else: logger.warning(f"No checkpoint found for run {run_id}, starting from beginning") except Exception as e: logger.error(f"Failed to load checkpoint: {e}") checkpoint = None # Create interpreter (pass actor_id for friendly name resolution) interp = StreamInterpreter(str(recipe_path), actor_id=actor_id) # Set primitive library directory explicitly interp.primitive_lib_dir = sexp_effects_dir / "primitive_libs" if fps: interp.config['fps'] = fps if sources_path: interp.sources_config = sources_path if audio_path: interp.audio_config = audio_path # Override primitives with CID-aware versions cid_prims = create_cid_primitives(actor_id) from celery.utils.log import get_task_logger task_logger = get_task_logger(__name__) task_logger.warning(f"DEBUG: Overriding primitives: {list(cid_prims.keys())}") task_logger.warning(f"DEBUG: Primitives before: {list(interp.primitives.keys())[:10]}...") interp.primitives.update(cid_prims) task_logger.warning(f"DEBUG: streaming:make-video-source is now: {type(interp.primitives.get('streaming:make-video-source'))}") # Set up callback to update database when IPFS playlist is created (for live HLS redirect) def on_playlist_update(playlist_cid, quality_playlists=None): """Update database with playlist CID and quality info. Args: playlist_cid: Master playlist CID quality_playlists: Dict of quality name -> {cid, width, height, bitrate} """ global _resolve_loop, _db_initialized import asyncio import database try: if _resolve_loop is None or _resolve_loop.is_closed(): _resolve_loop = asyncio.new_event_loop() asyncio.set_event_loop(_resolve_loop) _db_initialized = False if not _db_initialized: _resolve_loop.run_until_complete(database.init_db()) _db_initialized = True _resolve_loop.run_until_complete(database.update_pending_run_playlist(run_id, playlist_cid, quality_playlists)) logger.info(f"Updated pending run {run_id} with IPFS playlist: {playlist_cid}, qualities: {list(quality_playlists.keys()) if quality_playlists else []}") except Exception as e: logger.error(f"Failed to update playlist CID in database: {e}") interp.on_playlist_update = on_playlist_update # Set up progress callback to update Celery task state def on_progress(pct, frame_num, total_frames): nonlocal pause_requested # Scale progress: 5% (start) to 85% (before caching) scaled_progress = 5 + (pct * 0.8) # 5% to 85% self.update_state(state='RENDERING', meta={ 'progress': scaled_progress, 'frame': frame_num, 'total_frames': total_frames, 'percent': pct, }) interp.on_progress = on_progress # Set up checkpoint callback to save state at segment boundaries def on_checkpoint(ckpt): """Save checkpoint state to database.""" nonlocal pause_requested global _resolve_loop, _db_initialized import asyncio import database try: if _resolve_loop is None or _resolve_loop.is_closed(): _resolve_loop = asyncio.new_event_loop() asyncio.set_event_loop(_resolve_loop) _db_initialized = False if not _db_initialized: _resolve_loop.run_until_complete(database.init_db()) _db_initialized = True # Get total frames from interpreter config total_frames = None if hasattr(interp, 'output') and hasattr(interp.output, '_frame_count'): # Estimate total frames based on duration fps_val = interp.config.get('fps', 30) for name, val in interp.globals.items(): if hasattr(val, 'duration'): total_frames = int(val.duration * fps_val) break _resolve_loop.run_until_complete(database.update_pending_run_checkpoint( run_id=run_id, checkpoint_frame=ckpt['frame_num'], checkpoint_t=ckpt['t'], checkpoint_scans=ckpt.get('scans'), total_frames=total_frames, )) logger.info(f"Saved checkpoint for run {run_id}: frame {ckpt['frame_num']}") # Check if pause was requested after checkpoint if pause_requested: logger.info(f"Pause requested after checkpoint, raising PauseRequested") raise PauseRequested("Render paused by user") except PauseRequested: raise # Re-raise to stop the render except Exception as e: logger.error(f"Failed to save checkpoint: {e}") interp.on_checkpoint = on_checkpoint # Build resume state for the interpreter (includes segment CIDs for output) resume_from = None if checkpoint: resume_from = { 'frame_num': checkpoint.get('frame_num'), 't': checkpoint.get('t'), 'scans': checkpoint.get('scans', {}), } # Add segment CIDs if available (from quality_playlists in checkpoint) # Note: We need to extract segment_cids from the output's state, which isn't # directly stored. For now, the output will re-check existing segments on disk. # Run rendering to file logger.info(f"Rendering to {output_path}" + (f" (resuming from frame {resume_from['frame_num']})" if resume_from else "")) render_paused = False try: interp.run(duration=duration, output=str(output_path), resume_from=resume_from) except PauseRequested: # Graceful pause - checkpoint already saved render_paused = True logger.info(f"Render paused for run {run_id}") # Restore original signal handler signal.signal(signal.SIGTERM, original_sigterm) if render_paused: import asyncio import database try: if _resolve_loop is None or _resolve_loop.is_closed(): _resolve_loop = asyncio.new_event_loop() asyncio.set_event_loop(_resolve_loop) _db_initialized = False if not _db_initialized: _resolve_loop.run_until_complete(database.init_db()) _db_initialized = True _resolve_loop.run_until_complete(database.update_pending_run_status(run_id, 'paused')) except Exception as e: logger.error(f"Failed to update status to paused: {e}") return {"status": "paused", "run_id": run_id, "task_id": task_id} # Check for interpreter errors if interp.errors: error_msg = f"Rendering failed with {len(interp.errors)} errors: {interp.errors[0]}" raise RuntimeError(error_msg) self.update_state(state='CACHING', meta={'progress': 90}) # Get IPFS playlist CID if available (from IPFSHLSOutput) ipfs_playlist_cid = None ipfs_playlist_url = None segment_cids = {} if hasattr(interp, 'output') and hasattr(interp.output, 'playlist_cid'): ipfs_playlist_cid = interp.output.playlist_cid ipfs_playlist_url = interp.output.playlist_url segment_cids = getattr(interp.output, 'segment_cids', {}) logger.info(f"IPFS HLS: playlist={ipfs_playlist_cid}, segments={len(segment_cids)}") # Update pending run with playlist CID for live HLS redirect if ipfs_playlist_cid: import asyncio import database try: if _resolve_loop is None or _resolve_loop.is_closed(): _resolve_loop = asyncio.new_event_loop() asyncio.set_event_loop(_resolve_loop) if not _db_initialized: _resolve_loop.run_until_complete(database.init_db()) _db_initialized = True _resolve_loop.run_until_complete(database.update_pending_run_playlist(run_id, ipfs_playlist_cid)) logger.info(f"Updated pending run {run_id} with IPFS playlist CID: {ipfs_playlist_cid}") except Exception as e: logger.error(f"Failed to update pending run with playlist CID: {e}") raise # Fail fast - database errors should not be silently ignored # HLS output creates playlist and segments # - Single-res: stream_dir/stream.m3u8 and stream_dir/segment_*.ts # - Multi-res: stream_dir/original/playlist.m3u8 and stream_dir/original/segment_*.ts hls_playlist = stream_dir / "stream.m3u8" if not hls_playlist.exists(): # Try multi-res output path hls_playlist = stream_dir / "original" / "playlist.m3u8" # Validate HLS output (must have playlist and at least one segment) if not hls_playlist.exists(): raise RuntimeError("HLS playlist not created - rendering likely failed") segments = list(stream_dir.glob("segment_*.ts")) if not segments: # Try multi-res output path segments = list(stream_dir.glob("original/segment_*.ts")) if not segments: raise RuntimeError("No HLS segments created - rendering likely failed") logger.info(f"HLS rendering complete: {len(segments)} segments created, IPFS playlist: {ipfs_playlist_cid}") # Mux HLS segments into a single MP4 for persistent cache storage final_mp4 = stream_dir / "output.mp4" import subprocess mux_cmd = [ "ffmpeg", "-y", "-i", str(hls_playlist), "-c", "copy", # Just copy streams, no re-encoding "-movflags", "+faststart", # Move moov atom to start for web playback "-fflags", "+genpts", # Generate proper timestamps str(final_mp4) ] logger.info(f"Muxing HLS to MP4: {' '.join(mux_cmd)}") result = subprocess.run(mux_cmd, capture_output=True, text=True) if result.returncode != 0: logger.warning(f"HLS mux failed: {result.stderr}") # Fall back to using the first segment for caching final_mp4 = segments[0] # Store output in cache if final_mp4.exists(): cache_mgr = get_cache_manager() cached_file, ipfs_cid = cache_mgr.put( source_path=final_mp4, node_type="STREAM_OUTPUT", node_id=f"stream_{task_id}", ) logger.info(f"Stream output cached: CID={cached_file.cid}, IPFS={ipfs_cid}") # Save to database - reuse the module-level loop to avoid pool conflicts import asyncio import database try: # Reuse or create event loop if _resolve_loop is None or _resolve_loop.is_closed(): _resolve_loop = asyncio.new_event_loop() asyncio.set_event_loop(_resolve_loop) _db_initialized = False # Initialize database pool if needed if not _db_initialized: _resolve_loop.run_until_complete(database.init_db()) _db_initialized = True # Get recipe CID from pending_run pending = _resolve_loop.run_until_complete(database.get_pending_run(run_id)) recipe_cid = pending.get("recipe", "streaming") if pending else "streaming" # Save to run_cache for completed runs logger.info(f"Saving run {run_id} to run_cache with actor_id={actor_id}") _resolve_loop.run_until_complete(database.save_run_cache( run_id=run_id, output_cid=cached_file.cid, recipe=recipe_cid, inputs=[], ipfs_cid=ipfs_cid, actor_id=actor_id, )) # Register output as video type so frontend displays it correctly _resolve_loop.run_until_complete(database.add_item_type( cid=cached_file.cid, actor_id=actor_id, item_type="video", path=str(cached_file.path), description=f"Stream output from run {run_id}", )) logger.info(f"Registered output {cached_file.cid} as video type") # Update pending run status _resolve_loop.run_until_complete(database.update_pending_run_status( run_id=run_id, status="completed", )) logger.info(f"Saved run {run_id} to database with actor_id={actor_id}") except Exception as db_err: logger.error(f"Failed to save run to database: {db_err}") raise RuntimeError(f"Database error saving run {run_id}: {db_err}") from db_err return { "status": "completed", "run_id": run_id, "task_id": task_id, "output_cid": cached_file.cid, "ipfs_cid": ipfs_cid, "output_path": str(cached_file.path), # IPFS HLS streaming info "ipfs_playlist_cid": ipfs_playlist_cid, "ipfs_playlist_url": ipfs_playlist_url, "ipfs_segment_count": len(segment_cids), } else: # Update pending run status to failed - reuse module loop import asyncio import database try: if _resolve_loop is None or _resolve_loop.is_closed(): _resolve_loop = asyncio.new_event_loop() asyncio.set_event_loop(_resolve_loop) _db_initialized = False if not _db_initialized: _resolve_loop.run_until_complete(database.init_db()) _db_initialized = True _resolve_loop.run_until_complete(database.update_pending_run_status( run_id=run_id, status="failed", error="Output file not created", )) except Exception as db_err: logger.warning(f"Failed to update run status: {db_err}") return { "status": "failed", "run_id": run_id, "task_id": task_id, "error": "Output file not created", } except Exception as e: logger.error(f"Stream task {task_id} failed: {e}") import traceback traceback.print_exc() # Update pending run status to failed - reuse module loop import asyncio import database try: if _resolve_loop is None or _resolve_loop.is_closed(): _resolve_loop = asyncio.new_event_loop() asyncio.set_event_loop(_resolve_loop) _db_initialized = False if not _db_initialized: _resolve_loop.run_until_complete(database.init_db()) _db_initialized = True _resolve_loop.run_until_complete(database.update_pending_run_status( run_id=run_id, status="failed", error=str(e), )) except Exception as db_err: logger.warning(f"Failed to update run status: {db_err}") return { "status": "failed", "run_id": run_id, "task_id": task_id, "error": str(e), } finally: # Cleanup temp directory only - NOT the streaming directory! # The streaming directory contains HLS segments that may still be uploading # to IPFS. Deleting it prematurely causes upload failures and missing segments. # # stream_dir cleanup should happen via: # 1. A separate cleanup task that runs after confirming IPFS uploads succeeded # 2. Or a periodic cleanup job that removes old streaming dirs import shutil if work_dir.exists(): shutil.rmtree(work_dir, ignore_errors=True) # NOTE: stream_dir is intentionally NOT deleted here to allow IPFS uploads to complete # TODO: Implement a deferred cleanup mechanism for stream_dir after IPFS confirmation