diff --git a/streaming/output.py b/streaming/output.py index 1fb5648..e8b0750 100644 --- a/streaming/output.py +++ b/streaming/output.py @@ -645,6 +645,7 @@ class IPFSHLSOutput(Output): preset: str = "fast", audio_source: str = None, ipfs_gateway: str = "https://ipfs.io/ipfs", + on_playlist_update: callable = None, ): self.output_dir = Path(output_dir) self.output_dir.mkdir(parents=True, exist_ok=True) @@ -653,6 +654,7 @@ class IPFSHLSOutput(Output): self.segment_duration = segment_duration self.ipfs_gateway = ipfs_gateway.rstrip("/") self._is_open = True + self._on_playlist_update = on_playlist_update # Callback when playlist CID changes # Auto-detect NVENC if codec == "auto": @@ -792,6 +794,12 @@ class IPFSHLSOutput(Output): if cid: self._playlist_cid = cid print(f"IPFS: playlist updated -> {cid} ({len(self.segment_cids)} segments)", file=sys.stderr) + # Notify callback (e.g., to update database for live HLS redirect) + if self._on_playlist_update: + try: + self._on_playlist_update(cid) + except Exception as e: + print(f"IPFS: playlist callback error: {e}", file=sys.stderr) def write(self, frame: np.ndarray, t: float): """Write frame to HLS stream and upload segments to IPFS.""" diff --git a/streaming/stream_sexp_generic.py b/streaming/stream_sexp_generic.py index 424c252..4706712 100644 --- a/streaming/stream_sexp_generic.py +++ b/streaming/stream_sexp_generic.py @@ -85,6 +85,9 @@ class StreamInterpreter: # Error tracking self.errors: List[str] = [] + # Callback for live streaming (called when IPFS playlist is updated) + self.on_playlist_update: callable = None + def _resolve_name(self, name: str) -> Optional[Path]: """Resolve a friendly name to a file path using the naming service.""" try: @@ -906,7 +909,8 @@ class StreamInterpreter: hls_dir = output[:-9] # Remove /ipfs-hls suffix import os ipfs_gateway = os.environ.get("IPFS_GATEWAY_URL", "https://ipfs.io/ipfs") - out = IPFSHLSOutput(hls_dir, size=(w, h), fps=fps, audio_source=audio, ipfs_gateway=ipfs_gateway) + out = IPFSHLSOutput(hls_dir, size=(w, h), fps=fps, audio_source=audio, ipfs_gateway=ipfs_gateway, + on_playlist_update=self.on_playlist_update) else: out = FileOutput(output, size=(w, h), fps=fps, audio_source=audio) diff --git a/tasks/streaming.py b/tasks/streaming.py index 7241b51..42021b8 100644 --- a/tasks/streaming.py +++ b/tasks/streaming.py @@ -332,6 +332,26 @@ def run_stream( interp.primitives.update(cid_prims) task_logger.warning(f"DEBUG: streaming:make-video-source is now: {type(interp.primitives.get('streaming:make-video-source'))}") + # Set up callback to update database when IPFS playlist is created (for live HLS redirect) + def on_playlist_update(playlist_cid): + global _resolve_loop, _db_initialized + import asyncio + import database + try: + if _resolve_loop is None or _resolve_loop.is_closed(): + _resolve_loop = asyncio.new_event_loop() + asyncio.set_event_loop(_resolve_loop) + _db_initialized = False + if not _db_initialized: + _resolve_loop.run_until_complete(database.init_db()) + _db_initialized = True + _resolve_loop.run_until_complete(database.update_pending_run_playlist(run_id, playlist_cid)) + logger.info(f"Updated pending run {run_id} with IPFS playlist: {playlist_cid}") + except Exception as e: + logger.error(f"Failed to update playlist CID in database: {e}") + + interp.on_playlist_update = on_playlist_update + # Run rendering to file logger.info(f"Rendering to {output_path}") interp.run(duration=duration, output=str(output_path))