# - Add on_playlist_update callback to IPFSHLSOutput
# - Pass callback through StreamInterpreter to output
# - Update database with playlist CID as segments are created
# - Enables live HLS redirect to IPFS before rendering completes
# Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
"""
|
|
Streaming video rendering task.
|
|
|
|
Executes S-expression recipes for frame-by-frame video processing.
|
|
Supports CID and friendly name references for assets.
|
|
"""
|
|
|
|
import hashlib
|
|
import logging
|
|
import os
|
|
import sys
|
|
import tempfile
|
|
from pathlib import Path
|
|
from typing import Dict, Optional
|
|
|
|
from celery import current_task
|
|
|
|
# Add parent directory to path for imports
|
|
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
|
|
|
from celery_app import app
|
|
from cache_manager import get_cache_manager
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Debug: verify module is being loaded
|
|
print(f"DEBUG MODULE LOAD: tasks/streaming.py loaded at {__file__}", file=sys.stderr)
|
|
|
|
# Module-level event loop for database operations
|
|
_resolve_loop = None
|
|
_db_initialized = False
|
|
|
|
|
|
def resolve_asset(ref: str, actor_id: Optional[str] = None) -> Optional[Path]:
    """
    Resolve an asset reference (CID or friendly name) to a file path.

    Resolution order:
      1. Treat ``ref`` as a direct CID and look it up in the local cache.
      2. If ``actor_id`` is given, resolve ``ref`` as a friendly name via the
         database, then look up the resulting CID in the local cache.
      3. If still not found locally, fetch the CID's content from IPFS and
         store it in the local cache.

    Args:
        ref: CID or friendly name (e.g., "my-video" or "QmXyz...")
        actor_id: User ID for friendly name resolution

    Returns:
        Path to the asset file, or None if not found
    """
    global _resolve_loop, _db_initialized
    logger.debug(f"resolve_asset: ref={ref}, actor_id={actor_id}")
    cache_mgr = get_cache_manager()

    # Try as direct CID first
    path = cache_mgr.get_by_cid(ref)
    if path and path.exists():
        logger.info(f"Resolved {ref[:16]}... as CID to {path}")
        return path

    # Try as friendly name if actor_id provided
    if actor_id:
        import asyncio
        import database
        from database import resolve_friendly_name

        try:
            # Reuse the module-level event loop for database operations;
            # recreating the loop invalidates the pool, so reset the flag.
            if _resolve_loop is None or _resolve_loop.is_closed():
                _resolve_loop = asyncio.new_event_loop()
                asyncio.set_event_loop(_resolve_loop)
                _db_initialized = False

            # Initialize database pool once per loop
            if not _db_initialized:
                _resolve_loop.run_until_complete(database.init_db())
                _db_initialized = True

            cid = _resolve_loop.run_until_complete(resolve_friendly_name(actor_id, ref))
            logger.debug(f"resolve_asset: resolve_friendly_name({actor_id}, {ref}) = {cid}")

            if cid:
                path = cache_mgr.get_by_cid(cid)
                if path and path.exists():
                    logger.info(f"Resolved '{ref}' via friendly name to {path}")
                    return path

                # File not in local cache - try fetching from IPFS.
                # The CID from friendly_names is an IPFS CID.
                logger.debug(f"resolve_asset: file not local, trying IPFS fetch for {cid}")
                import ipfs_client
                content = ipfs_client.get_bytes(cid, use_gateway_fallback=True)
                if content:
                    # Write to a temp file so cache_mgr.put can ingest it.
                    # NOTE(review): '.sexp' suffix is applied to every fetched
                    # asset — confirm this is intended for non-recipe content.
                    with tempfile.NamedTemporaryFile(delete=False, suffix='.sexp') as tmp:
                        tmp.write(content)
                        tmp_path = Path(tmp.name)
                    try:
                        # Store in cache
                        cached_file, _ = cache_mgr.put(tmp_path, node_type="effect", skip_ipfs=True)
                    finally:
                        # Fix: remove the temp file after ingestion; it was
                        # previously leaked on every IPFS fetch.
                        tmp_path.unlink(missing_ok=True)
                    # Index by IPFS CID for future lookups
                    cache_mgr._set_content_index(cid, cached_file.cid)
                    logger.info(f"Fetched '{ref}' from IPFS and cached at {cached_file.path}")
                    return cached_file.path
                else:
                    logger.debug(f"resolve_asset: IPFS fetch failed for {cid}")
        except Exception as e:
            logger.warning(f"Failed to resolve friendly name '{ref}': {e}")

    logger.warning(f"Could not resolve asset reference: {ref}")
    return None
|
|
|
|
|
|
class CIDVideoSource:
    """
    Video source that resolves CIDs to file paths.

    Wraps the streaming VideoSource to work with cached assets. The CID is
    resolved lazily: nothing is opened until the first read or property
    access, and all reads are delegated to the wrapped VideoSource.
    """

    def __init__(self, cid: str, fps: float = 30, actor_id: Optional[str] = None):
        self.cid = cid
        self.fps = fps
        self.actor_id = actor_id
        # Underlying VideoSource, built on first use by _ensure_source().
        self._source = None

    def _ensure_source(self):
        """Resolve the CID and construct the wrapped VideoSource once."""
        if self._source is not None:
            return

        logger.info(f"CIDVideoSource._ensure_source: resolving cid={self.cid} with actor_id={self.actor_id}")
        resolved_path = resolve_asset(self.cid, self.actor_id)
        if not resolved_path:
            raise ValueError(f"Could not resolve video source '{self.cid}' for actor_id={self.actor_id}")

        logger.info(f"CIDVideoSource._ensure_source: resolved to path={resolved_path}")
        # Import from primitives where VideoSource is defined
        from sexp_effects.primitive_libs.streaming import VideoSource
        self._source = VideoSource(str(resolved_path), self.fps)

    def read_at(self, t: float):
        """Read the frame at time t (seconds)."""
        self._ensure_source()
        return self._source.read_at(t)

    def read(self):
        """Read the next sequential frame."""
        self._ensure_source()
        return self._source.read()

    @property
    def size(self):
        """Frame size of the wrapped source."""
        self._ensure_source()
        return self._source.size

    @property
    def duration(self):
        # NOTE(review): reaches into the wrapped source's private _duration —
        # confirm VideoSource exposes no public accessor.
        self._ensure_source()
        return self._source._duration

    @property
    def path(self):
        """Filesystem path of the resolved source."""
        self._ensure_source()
        return self._source.path

    @property
    def _stream_time(self):
        self._ensure_source()
        return self._source._stream_time

    def skip(self):
        """Skip a frame in the wrapped source."""
        self._ensure_source()
        return self._source.skip()

    def close(self):
        """Close the wrapped source if it was ever created."""
        if self._source:
            self._source.close()
|
|
|
|
|
|
class CIDAudioAnalyzer:
    """
    Audio analyzer that resolves CIDs to file paths.

    The CID is resolved lazily on first query; every analysis call is
    forwarded to the wrapped AudioAnalyzer.
    """

    def __init__(self, cid: str, actor_id: Optional[str] = None):
        self.cid = cid
        self.actor_id = actor_id
        # Underlying AudioAnalyzer, built on first use by _ensure_analyzer().
        self._analyzer = None

    def _ensure_analyzer(self):
        """Resolve the CID and construct the wrapped AudioAnalyzer once."""
        if self._analyzer is not None:
            return

        resolved_path = resolve_asset(self.cid, self.actor_id)
        if not resolved_path:
            raise ValueError(f"Could not resolve audio source: {self.cid}")

        from sexp_effects.primitive_libs.streaming import AudioAnalyzer
        self._analyzer = AudioAnalyzer(str(resolved_path))

    def get_energy(self, t: float) -> float:
        """Return audio energy at time t (seconds)."""
        self._ensure_analyzer()
        return self._analyzer.get_energy(t)

    def get_beat(self, t: float) -> bool:
        """Return True if a beat occurs at time t."""
        self._ensure_analyzer()
        return self._analyzer.get_beat(t)

    def get_beat_count(self, t: float) -> int:
        """Return the number of beats up to time t."""
        self._ensure_analyzer()
        return self._analyzer.get_beat_count(t)

    @property
    def duration(self):
        """Duration of the analyzed audio."""
        self._ensure_analyzer()
        return self._analyzer.duration
|
|
|
|
|
|
def create_cid_primitives(actor_id: Optional[str] = None):
    """
    Create CID-aware primitive functions.

    The returned primitives close over ``actor_id`` so friendly-name
    resolution works for the requesting user when the sources are lazily
    resolved.

    Args:
        actor_id: User ID for friendly name resolution, or None.

    Returns:
        Dict of primitives that resolve CIDs before creating sources.
    """
    from celery.utils.log import get_task_logger
    cid_logger = get_task_logger(__name__)

    def prim_make_video_source_cid(cid: str, fps: float = 30):
        # Trace at debug level: per-primitive creation is routine, not a
        # warning condition (previously logged as WARNING with a DEBUG prefix).
        cid_logger.debug(f"CID-aware make-video-source: cid={cid}, actor_id={actor_id}")
        return CIDVideoSource(cid, fps, actor_id)

    def prim_make_audio_analyzer_cid(cid: str):
        cid_logger.debug(f"CID-aware make-audio-analyzer: cid={cid}, actor_id={actor_id}")
        return CIDAudioAnalyzer(cid, actor_id)

    return {
        'streaming:make-video-source': prim_make_video_source_cid,
        'streaming:make-audio-analyzer': prim_make_audio_analyzer_cid,
    }
|
|
|
|
|
|
def _ensure_db_loop():
    """
    Return the shared module-level event loop with the database pool ready.

    Reuses ``_resolve_loop`` across calls so ``database.init_db()`` runs once
    per loop. Recreating a closed loop resets ``_db_initialized`` — one of the
    previous inline copies of this logic forgot that reset, which could leave
    the pool bound to a dead loop.
    """
    global _resolve_loop, _db_initialized
    import asyncio
    import database

    if _resolve_loop is None or _resolve_loop.is_closed():
        _resolve_loop = asyncio.new_event_loop()
        asyncio.set_event_loop(_resolve_loop)
        _db_initialized = False
    if not _db_initialized:
        _resolve_loop.run_until_complete(database.init_db())
        _db_initialized = True
    return _resolve_loop


@app.task(bind=True, name='tasks.run_stream')
def run_stream(
    self,
    run_id: str,
    recipe_sexp: str,
    output_name: str = "output.mp4",
    duration: Optional[float] = None,
    fps: Optional[float] = None,
    actor_id: Optional[str] = None,
    sources_sexp: Optional[str] = None,
    audio_sexp: Optional[str] = None,
) -> dict:
    """
    Execute a streaming S-expression recipe.

    Renders via an IPFS HLS output (segments uploaded as they are produced),
    updates the pending run with the live playlist CID, muxes the HLS output
    to a single MP4, caches it, and records the completed run in the database.

    Args:
        run_id: The run ID for database tracking
        recipe_sexp: The recipe S-expression content
        output_name: Name for the output file
        duration: Optional duration override (seconds)
        fps: Optional FPS override
        actor_id: User ID for friendly name resolution
        sources_sexp: Optional sources config S-expression
        audio_sexp: Optional audio config S-expression

    Returns:
        Dict with output_cid, output_path, and status
    """
    task_id = self.request.id
    logger.info(f"Starting stream task {task_id} for run {run_id}")

    self.update_state(state='INITIALIZING', meta={'progress': 0})

    # Get the app directory for primitive/effect paths
    app_dir = Path(__file__).parent.parent  # celery/
    sexp_effects_dir = app_dir / "sexp_effects"
    effects_dir = app_dir / "effects"
    templates_dir = app_dir / "templates"

    # Create temp directory for work
    work_dir = Path(tempfile.mkdtemp(prefix="stream_"))
    recipe_path = work_dir / "recipe.sexp"

    # Write output to shared cache for live streaming access
    cache_dir = Path(os.environ.get("CACHE_DIR", "/data/cache"))
    stream_dir = cache_dir / "streaming" / run_id
    stream_dir.mkdir(parents=True, exist_ok=True)
    # Use IPFS HLS output for distributed streaming - segments uploaded to IPFS
    output_path = str(stream_dir) + "/ipfs-hls"  # /ipfs-hls suffix triggers IPFS HLS mode

    # Create symlinks to effect directories so relative paths work
    (work_dir / "sexp_effects").symlink_to(sexp_effects_dir)
    (work_dir / "effects").symlink_to(effects_dir)
    (work_dir / "templates").symlink_to(templates_dir)

    try:
        # Write recipe to temp file
        recipe_path.write_text(recipe_sexp)

        # Write optional config files
        sources_path = None
        if sources_sexp:
            sources_path = work_dir / "sources.sexp"
            sources_path.write_text(sources_sexp)

        audio_path = None
        if audio_sexp:
            audio_path = work_dir / "audio.sexp"
            audio_path.write_text(audio_sexp)

        self.update_state(state='RENDERING', meta={'progress': 5})

        # Import the streaming interpreter
        from streaming.stream_sexp_generic import StreamInterpreter

        # Create interpreter (pass actor_id for friendly name resolution)
        interp = StreamInterpreter(str(recipe_path), actor_id=actor_id)

        # Set primitive library directory explicitly
        interp.primitive_lib_dir = sexp_effects_dir / "primitive_libs"

        if fps:
            interp.config['fps'] = fps
        if sources_path:
            interp.sources_config = sources_path
        if audio_path:
            interp.audio_config = audio_path

        # Override primitives with CID-aware versions
        cid_prims = create_cid_primitives(actor_id)
        interp.primitives.update(cid_prims)

        # Set up callback to update database when IPFS playlist is created,
        # enabling live HLS redirect before rendering completes.
        def on_playlist_update(playlist_cid):
            import database
            try:
                loop = _ensure_db_loop()
                loop.run_until_complete(database.update_pending_run_playlist(run_id, playlist_cid))
                logger.info(f"Updated pending run {run_id} with IPFS playlist: {playlist_cid}")
            except Exception as e:
                # Best-effort: a failed live update must not abort rendering.
                logger.error(f"Failed to update playlist CID in database: {e}")

        interp.on_playlist_update = on_playlist_update

        # Run rendering to file
        logger.info(f"Rendering to {output_path}")
        interp.run(duration=duration, output=str(output_path))

        # Check for interpreter errors
        if interp.errors:
            error_msg = f"Rendering failed with {len(interp.errors)} errors: {interp.errors[0]}"
            raise RuntimeError(error_msg)

        self.update_state(state='CACHING', meta={'progress': 90})

        # Get IPFS playlist CID if available (from IPFSHLSOutput)
        ipfs_playlist_cid = None
        ipfs_playlist_url = None
        segment_cids = {}
        if hasattr(interp, 'output') and hasattr(interp.output, 'playlist_cid'):
            ipfs_playlist_cid = interp.output.playlist_cid
            ipfs_playlist_url = interp.output.playlist_url
            segment_cids = getattr(interp.output, 'segment_cids', {})
            logger.info(f"IPFS HLS: playlist={ipfs_playlist_cid}, segments={len(segment_cids)}")

        # Update pending run with final playlist CID for live HLS redirect
        if ipfs_playlist_cid:
            import database
            try:
                loop = _ensure_db_loop()
                loop.run_until_complete(database.update_pending_run_playlist(run_id, ipfs_playlist_cid))
                logger.info(f"Updated pending run {run_id} with IPFS playlist CID: {ipfs_playlist_cid}")
            except Exception as e:
                logger.error(f"Failed to update pending run with playlist CID: {e}")
                raise  # Fail fast - database errors should not be silently ignored

        # HLS output creates stream.m3u8 and segment_*.ts files in stream_dir
        hls_playlist = stream_dir / "stream.m3u8"

        # Validate HLS output (must have playlist and at least one segment)
        if not hls_playlist.exists():
            raise RuntimeError("HLS playlist not created - rendering likely failed")

        segments = list(stream_dir.glob("segment_*.ts"))
        if not segments:
            raise RuntimeError("No HLS segments created - rendering likely failed")

        logger.info(f"HLS rendering complete: {len(segments)} segments created, IPFS playlist: {ipfs_playlist_cid}")

        # Mux HLS segments into a single MP4 for persistent cache storage
        final_mp4 = stream_dir / "output.mp4"
        import subprocess
        mux_cmd = [
            "ffmpeg", "-y",
            "-i", str(hls_playlist),
            "-c", "copy",  # Just copy streams, no re-encoding
            str(final_mp4)
        ]
        logger.info(f"Muxing HLS to MP4: {' '.join(mux_cmd)}")
        result = subprocess.run(mux_cmd, capture_output=True, text=True)
        if result.returncode != 0:
            logger.warning(f"HLS mux failed: {result.stderr}")
            # Fall back to using the first segment for caching
            final_mp4 = segments[0]

        # Store output in cache
        if final_mp4.exists():
            cache_mgr = get_cache_manager()
            cached_file, ipfs_cid = cache_mgr.put(
                source_path=final_mp4,
                node_type="STREAM_OUTPUT",
                node_id=f"stream_{task_id}",
            )

            logger.info(f"Stream output cached: CID={cached_file.cid}, IPFS={ipfs_cid}")

            # Save to database - reuse the module-level loop to avoid pool conflicts
            import database
            try:
                loop = _ensure_db_loop()

                # Get recipe CID from pending_run
                pending = loop.run_until_complete(database.get_pending_run(run_id))
                recipe_cid = pending.get("recipe", "streaming") if pending else "streaming"

                # Save to run_cache for completed runs
                logger.info(f"Saving run {run_id} to run_cache with actor_id={actor_id}")
                loop.run_until_complete(database.save_run_cache(
                    run_id=run_id,
                    output_cid=cached_file.cid,
                    recipe=recipe_cid,
                    inputs=[],
                    ipfs_cid=ipfs_cid,
                    actor_id=actor_id,
                ))
                # Register output as video type so frontend displays it correctly
                loop.run_until_complete(database.add_item_type(
                    cid=cached_file.cid,
                    actor_id=actor_id,
                    item_type="video",
                    path=str(cached_file.path),
                    description=f"Stream output from run {run_id}",
                ))
                logger.info(f"Registered output {cached_file.cid} as video type")
                # Update pending run status
                loop.run_until_complete(database.update_pending_run_status(
                    run_id=run_id,
                    status="completed",
                ))
                logger.info(f"Saved run {run_id} to database with actor_id={actor_id}")
            except Exception as db_err:
                logger.error(f"Failed to save run to database: {db_err}")
                raise RuntimeError(f"Database error saving run {run_id}: {db_err}") from db_err

            return {
                "status": "completed",
                "run_id": run_id,
                "task_id": task_id,
                "output_cid": cached_file.cid,
                "ipfs_cid": ipfs_cid,
                "output_path": str(cached_file.path),
                # IPFS HLS streaming info
                "ipfs_playlist_cid": ipfs_playlist_cid,
                "ipfs_playlist_url": ipfs_playlist_url,
                "ipfs_segment_count": len(segment_cids),
            }
        else:
            # Update pending run status to failed
            import database
            try:
                loop = _ensure_db_loop()
                loop.run_until_complete(database.update_pending_run_status(
                    run_id=run_id,
                    status="failed",
                    error="Output file not created",
                ))
            except Exception as db_err:
                logger.warning(f"Failed to update run status: {db_err}")

            return {
                "status": "failed",
                "run_id": run_id,
                "task_id": task_id,
                "error": "Output file not created",
            }

    except Exception as e:
        logger.error(f"Stream task {task_id} failed: {e}")
        import traceback
        traceback.print_exc()

        # Update pending run status to failed
        import database
        try:
            loop = _ensure_db_loop()
            loop.run_until_complete(database.update_pending_run_status(
                run_id=run_id,
                status="failed",
                error=str(e),
            ))
        except Exception as db_err:
            logger.warning(f"Failed to update run status: {db_err}")

        return {
            "status": "failed",
            "run_id": run_id,
            "task_id": task_id,
            "error": str(e),
        }

    finally:
        # Cleanup temp directory and streaming directory
        import shutil
        if work_dir.exists():
            shutil.rmtree(work_dir, ignore_errors=True)
        if stream_dir.exists():
            shutil.rmtree(stream_dir, ignore_errors=True)
|