409 lines
13 KiB
Python
409 lines
13 KiB
Python
"""
|
|
Streaming video rendering task.
|
|
|
|
Executes S-expression recipes for frame-by-frame video processing.
|
|
Supports CID and friendly name references for assets.
|
|
"""
|
|
|
|
import hashlib
|
|
import logging
|
|
import os
|
|
import sys
|
|
import tempfile
|
|
from pathlib import Path
|
|
from typing import Dict, Optional
|
|
|
|
from celery import current_task
|
|
|
|
# Add parent directory to path for imports
|
|
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
|
|
|
from celery_app import app
|
|
from cache_manager import get_cache_manager
|
|
|
|
logger = logging.getLogger(__name__)


# State kept between resolve_asset() calls.
# _resolve_loop: asyncio event loop used for friendly-name database lookups;
#   created on demand and replaced when found closed.
# _db_initialized: True once database.init_db() has been run on the current
#   _resolve_loop; reset whenever a new loop is created.
_resolve_loop = None
_db_initialized = False
|
|
|
|
|
|
def resolve_asset(ref: str, actor_id: Optional[str] = None) -> Optional[Path]:
    """
    Map an asset reference onto a file held by the cache manager.

    The reference is first looked up directly as a CID. If that misses and an
    actor_id was given, the reference is treated as a friendly name: it is
    translated to a CID through the database and that CID is looked up.

    Args:
        ref: CID or friendly name (e.g., "my-video" or "QmXyz...")
        actor_id: User ID for friendly name resolution; when falsy only the
            direct CID lookup is attempted

    Returns:
        Path to the asset file, or None if not found (this includes the case
        where the friendly-name lookup raised; the error is logged).
    """
    global _resolve_loop, _db_initialized

    cache = get_cache_manager()

    # Step 1: the reference may already be a CID known to the cache.
    direct_hit = cache.get_by_cid(ref)
    if direct_hit and direct_hit.exists():
        logger.info(f"Resolved {ref[:16]}... as CID to {direct_hit}")
        return direct_hit

    # Step 2: friendly names are looked up per actor, so an actor is required.
    if actor_id:
        import asyncio
        import database
        from database import resolve_friendly_name

        try:
            # One loop is kept at module level; build a new one only when
            # there is none yet or the previous one has been closed.
            if _resolve_loop is None or _resolve_loop.is_closed():
                _resolve_loop = asyncio.new_event_loop()
                asyncio.set_event_loop(_resolve_loop)
                _db_initialized = False

            # init_db() runs once per loop instance.
            if not _db_initialized:
                _resolve_loop.run_until_complete(database.init_db())
                _db_initialized = True

            lookup = resolve_friendly_name(actor_id, ref)
            named_cid = _resolve_loop.run_until_complete(lookup)

            named_hit = cache.get_by_cid(named_cid) if named_cid else None
            if named_hit and named_hit.exists():
                logger.info(f"Resolved '{ref}' via friendly name to {named_hit}")
                return named_hit
        except Exception as e:
            # Best effort: a failed lookup is reported as "not found".
            logger.warning(f"Failed to resolve friendly name '{ref}': {e}")

    logger.warning(f"Could not resolve asset reference: {ref}")
    return None
|
|
|
|
|
|
class CIDVideoSource:
    """
    Video source that resolves CIDs to file paths.

    Wraps the streaming VideoSource to work with cached assets. The reference
    is resolved with resolve_asset() and the file opened only on first use;
    every read and property then delegates to the wrapped VideoSource.
    """

    def __init__(self, cid: str, fps: float = 30, actor_id: Optional[str] = None):
        """
        Args:
            cid: CID or friendly name of the video asset
            fps: Frame rate handed to the wrapped VideoSource
            actor_id: User ID for friendly name resolution
        """
        self.cid = cid
        self.fps = fps
        self.actor_id = actor_id
        # Wrapped VideoSource; stays None until _ensure_source() opens it.
        self._source = None

    def _ensure_source(self):
        """
        Resolve the reference and open the wrapped VideoSource if not yet done.

        Raises:
            ValueError: if the reference cannot be resolved to a file
        """
        if self._source is not None:
            return

        logger.info(f"CIDVideoSource._ensure_source: resolving cid={self.cid} with actor_id={self.actor_id}")
        path = resolve_asset(self.cid, self.actor_id)
        if not path:
            raise ValueError(f"Could not resolve video source '{self.cid}' for actor_id={self.actor_id}")

        logger.info(f"CIDVideoSource._ensure_source: resolved to path={path}")
        # Import from primitives where VideoSource is defined. An earlier,
        # immediately shadowed import of the same name from
        # streaming.stream_sexp_generic was removed: it had no effect on
        # which class is used but could fail on its own.
        from sexp_effects.primitive_libs.streaming import VideoSource
        self._source = VideoSource(str(path), self.fps)

    def read_at(self, t: float):
        """Return whatever the wrapped source's read_at(t) returns."""
        self._ensure_source()
        return self._source.read_at(t)

    def read(self):
        """Return whatever the wrapped source's read() returns."""
        self._ensure_source()
        return self._source.read()

    @property
    def size(self):
        """The wrapped source's ``size`` attribute."""
        self._ensure_source()
        return self._source.size

    @property
    def duration(self):
        """The wrapped source's ``_duration`` attribute."""
        self._ensure_source()
        # NOTE(review): this reads a private attribute of VideoSource, whereas
        # CIDAudioAnalyzer uses a public ``duration`` - confirm that
        # VideoSource exposes no public equivalent.
        return self._source._duration

    def close(self):
        """Close the wrapped source if it was ever opened."""
        if self._source:
            self._source.close()
|
|
|
|
|
|
class CIDAudioAnalyzer:
    """
    Audio analyzer addressed by CID or friendly name.

    The reference is resolved with resolve_asset() on first use; all queries
    are then forwarded to the streaming AudioAnalyzer opened on that file.
    """

    def __init__(self, cid: str, actor_id: Optional[str] = None):
        self._analyzer = None  # created lazily by _ensure_analyzer()
        self.actor_id = actor_id
        self.cid = cid

    def _ensure_analyzer(self):
        """Open the wrapped AudioAnalyzer once; raise ValueError if unresolvable."""
        if self._analyzer is not None:
            return

        resolved = resolve_asset(self.cid, self.actor_id)
        if not resolved:
            raise ValueError(f"Could not resolve audio source: {self.cid}")

        from sexp_effects.primitive_libs.streaming import AudioAnalyzer
        self._analyzer = AudioAnalyzer(str(resolved))

    def _loaded(self):
        """Return the wrapped analyzer, opening it first when necessary."""
        self._ensure_analyzer()
        return self._analyzer

    def get_energy(self, t: float) -> float:
        """Forward to the wrapped analyzer's get_energy(t)."""
        return self._loaded().get_energy(t)

    def get_beat(self, t: float) -> bool:
        """Forward to the wrapped analyzer's get_beat(t)."""
        return self._loaded().get_beat(t)

    def get_beat_count(self, t: float) -> int:
        """Forward to the wrapped analyzer's get_beat_count(t)."""
        return self._loaded().get_beat_count(t)

    @property
    def duration(self):
        """The wrapped analyzer's ``duration`` attribute."""
        return self._loaded().duration
|
|
|
|
|
|
def create_cid_primitives(actor_id: Optional[str] = None):
    """
    Build the CID-aware replacements for the streaming source primitives.

    Each returned callable logs its arguments and hands back a lazy wrapper
    (CIDVideoSource / CIDAudioAnalyzer) bound to the given actor_id.

    Returns dict mapping primitive names to those callables.
    """
    def prim_make_video_source_cid(cid: str, fps: float = 30):
        logger.info(f"CID-aware make-video-source called: cid={cid}, fps={fps}, actor_id={actor_id}")
        video = CIDVideoSource(cid, fps, actor_id)
        return video

    def prim_make_audio_analyzer_cid(cid: str):
        logger.info(f"CID-aware make-audio-analyzer called: cid={cid}, actor_id={actor_id}")
        audio = CIDAudioAnalyzer(cid, actor_id)
        return audio

    primitives = {}
    primitives['streaming:make-video-source'] = prim_make_video_source_cid
    primitives['streaming:make-audio-analyzer'] = prim_make_audio_analyzer_cid
    return primitives
|
|
|
|
|
|
def _run_db_job(job, failure_prefix: str) -> None:
    """Run the coroutine returned by ``job()`` to completion on a fresh event loop.

    Best effort: any exception raised while running it is logged as a warning
    (prefixed with ``failure_prefix``) and swallowed. The loop is always closed.
    """
    import asyncio

    # NOTE(review): a brand-new loop is used per call, while resolve_asset()
    # keeps a long-lived one. If database.pool is bound to the loop that
    # created it, it may not be usable from this loop - confirm against the
    # database module.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        loop.run_until_complete(job())
    except Exception as db_err:
        logger.warning(f"{failure_prefix}: {db_err}")
    finally:
        loop.close()


def _mark_run_failed(run_id: str, error: str) -> None:
    """Set the pending run's status to "failed" with the given error text."""
    import database

    async def _update():
        # Initialize database pool if needed
        if database.pool is None:
            await database.init_db()
        await database.update_pending_run_status(
            run_id=run_id,
            status="failed",
            error=error,
        )

    _run_db_job(_update, "Failed to update run status")


def _record_completed_run(run_id: str, output_cid, ipfs_cid, actor_id: Optional[str]) -> None:
    """Save the finished run to run_cache and mark the pending run completed."""
    import database

    async def _save():
        # Initialize database pool if needed
        if database.pool is None:
            await database.init_db()

        # Get recipe CID from pending_run; "streaming" is the placeholder
        # used when there is no pending row or it carries no recipe.
        pending = await database.get_pending_run(run_id)
        recipe_cid = pending.get("recipe", "streaming") if pending else "streaming"

        logger.info(f"Saving run {run_id} to run_cache with actor_id={actor_id}")
        await database.save_run_cache(
            run_id=run_id,
            output_cid=output_cid,
            recipe=recipe_cid,
            inputs=[],
            ipfs_cid=ipfs_cid,
            actor_id=actor_id,
        )
        await database.update_pending_run_status(
            run_id=run_id,
            status="completed",
        )
        logger.info(f"Saved run {run_id} to database with actor_id={actor_id}")

    _run_db_job(_save, "Failed to save run to database")


@app.task(bind=True, name='tasks.run_stream')
def run_stream(
    self,
    run_id: str,
    recipe_sexp: str,
    output_name: str = "output.mp4",
    duration: Optional[float] = None,
    fps: Optional[float] = None,
    actor_id: Optional[str] = None,
    sources_sexp: Optional[str] = None,
    audio_sexp: Optional[str] = None,
) -> dict:
    """
    Execute a streaming S-expression recipe.

    The recipe (and optional config files) are written to a temporary work
    directory, rendered with StreamInterpreter using CID-aware source
    primitives, and the output file is stored in the cache and recorded in
    the database. The work directory is always removed afterwards.

    Args:
        run_id: The run ID for database tracking
        recipe_sexp: The recipe S-expression content
        output_name: Name for the output file; only its final path component
            is used, so the file always lands inside the work directory
        duration: Optional duration override (seconds)
        fps: Optional FPS override
        actor_id: User ID for friendly name resolution
        sources_sexp: Optional sources config S-expression
        audio_sexp: Optional audio config S-expression

    Returns:
        On success: dict with status="completed", run_id, task_id,
        output_cid, ipfs_cid and output_path.
        On failure: dict with status="failed", run_id, task_id and error.
        Failures inside the rendering/caching section are reported through
        this dict rather than raised.
    """
    import shutil

    task_id = self.request.id
    logger.info(f"Starting stream task {task_id} for run {run_id}")

    self.update_state(state='INITIALIZING', meta={'progress': 0})

    # Get the app directory for primitive/effect paths
    app_dir = Path(__file__).parent.parent  # celery/
    sexp_effects_dir = app_dir / "sexp_effects"
    effects_dir = app_dir / "effects"
    templates_dir = app_dir / "templates"

    # Keep only the final component of output_name: an absolute path or one
    # containing "../" (or "effects/...", which is a symlink into the app
    # tree) would otherwise place the output outside the work directory.
    safe_output_name = Path(output_name).name
    if safe_output_name in ("", ".", ".."):
        safe_output_name = "output.mp4"

    # Create temp directory for work
    work_dir = Path(tempfile.mkdtemp(prefix="stream_"))
    recipe_path = work_dir / "recipe.sexp"
    output_path = work_dir / safe_output_name

    try:
        # Create symlinks to effect directories so relative paths work.
        # Done inside the try so a failure here still cleans up work_dir and
        # marks the run as failed.
        (work_dir / "sexp_effects").symlink_to(sexp_effects_dir)
        (work_dir / "effects").symlink_to(effects_dir)
        (work_dir / "templates").symlink_to(templates_dir)

        # Write recipe to temp file
        recipe_path.write_text(recipe_sexp)

        # Write optional config files
        sources_path = None
        if sources_sexp:
            sources_path = work_dir / "sources.sexp"
            sources_path.write_text(sources_sexp)

        audio_path = None
        if audio_sexp:
            audio_path = work_dir / "audio.sexp"
            audio_path.write_text(audio_sexp)

        self.update_state(state='RENDERING', meta={'progress': 5})

        # Import the streaming interpreter
        from streaming.stream_sexp_generic import StreamInterpreter

        # Create interpreter (pass actor_id for friendly name resolution)
        interp = StreamInterpreter(str(recipe_path), actor_id=actor_id)

        # Set primitive library directory explicitly
        interp.primitive_lib_dir = sexp_effects_dir / "primitive_libs"

        if fps:
            interp.config['fps'] = fps
        if sources_path:
            interp.sources_config = sources_path
        if audio_path:
            interp.audio_config = audio_path

        # Override primitives with CID-aware versions
        interp.primitives.update(create_cid_primitives(actor_id))

        # Run rendering to file
        logger.info(f"Rendering to {output_path}")
        interp.run(duration=duration, output=str(output_path))

        # Check for interpreter errors
        if interp.errors:
            raise RuntimeError(
                f"Rendering failed with {len(interp.errors)} errors: {interp.errors[0]}"
            )

        self.update_state(state='CACHING', meta={'progress': 90})

        if not output_path.exists():
            error_msg = "Output file not created"
            _mark_run_failed(run_id, error_msg)
            return {
                "status": "failed",
                "run_id": run_id,
                "task_id": task_id,
                "error": error_msg,
            }

        # Validate output file (must be > 1KB to have actual frames)
        output_size = output_path.stat().st_size
        if output_size < 1024:
            raise RuntimeError(f"Output file is too small ({output_size} bytes) - rendering likely failed")

        # Store output in cache
        cache_mgr = get_cache_manager()
        cached_file, ipfs_cid = cache_mgr.put(
            source_path=output_path,
            node_type="STREAM_OUTPUT",
            node_id=f"stream_{task_id}",
        )
        logger.info(f"Stream output cached: CID={cached_file.cid}, IPFS={ipfs_cid}")

        # Save to database (best effort: a DB failure is logged, and the
        # task still reports the cached output as completed).
        _record_completed_run(run_id, cached_file.cid, ipfs_cid, actor_id)

        return {
            "status": "completed",
            "run_id": run_id,
            "task_id": task_id,
            "output_cid": cached_file.cid,
            "ipfs_cid": ipfs_cid,
            "output_path": str(cached_file.path),
        }

    except Exception as e:
        # logger.exception records the traceback through logging instead of
        # printing it straight to stderr.
        logger.exception(f"Stream task {task_id} failed: {e}")

        # Update pending run status to failed
        _mark_run_failed(run_id, str(e))

        return {
            "status": "failed",
            "run_id": run_id,
            "task_id": task_id,
            "error": str(e),
        }

    finally:
        # Cleanup temp directory
        shutil.rmtree(work_dir, ignore_errors=True)
|