diff --git a/app/routers/runs.py b/app/routers/runs.py index 6559fdd..36beb23 100644 --- a/app/routers/runs.py +++ b/app/routers/runs.py @@ -1230,10 +1230,10 @@ async def serve_hls_content( @router.get("/{run_id}/playlist.m3u8") async def get_playlist(run_id: str, request: Request): - """Get live HLS playlist for a streaming run. + """Get live HLS master playlist for a streaming run. - Returns the latest playlist content directly, allowing HLS players - to poll this URL for updates without dealing with changing IPFS CIDs. + For multi-resolution streams: generates a master playlist with DYNAMIC quality URLs. + For single-resolution streams: returns the playlist directly from IPFS. """ import database import os @@ -1246,24 +1246,112 @@ async def get_playlist(run_id: str, request: Request): if not pending: raise HTTPException(404, "Run not found") - ipfs_playlist_cid = pending.get("ipfs_playlist_cid") - if not ipfs_playlist_cid: - raise HTTPException(404, "Playlist not yet available") + quality_playlists = pending.get("quality_playlists") + + # Multi-resolution stream: generate master playlist with dynamic quality URLs + if quality_playlists: + lines = ["#EXTM3U", "#EXT-X-VERSION:3"] + + for name, info in quality_playlists.items(): + if not info.get("cid"): + continue + + lines.append( + f"#EXT-X-STREAM-INF:BANDWIDTH={info['bitrate'] * 1000}," + f"RESOLUTION={info['width']}x{info['height']}," + f"NAME=\"{name}\"" + ) + # Use dynamic URL that fetches latest CID from database + lines.append(f"/runs/{run_id}/quality/{name}/playlist.m3u8") + + if len(lines) <= 2: + raise HTTPException(404, "No quality playlists available") + + playlist_content = "\n".join(lines) + "\n" + + else: + # Single-resolution stream: fetch directly from IPFS + ipfs_playlist_cid = pending.get("ipfs_playlist_cid") + if not ipfs_playlist_cid: + raise HTTPException(404, "HLS playlist not created - rendering likely failed") + + ipfs_api = os.environ.get("IPFS_API_URL", "http://celery_ipfs:5001") + + try: + async with httpx.AsyncClient(timeout=10.0) as client: + resp = await client.post(f"{ipfs_api}/api/v0/cat?arg={ipfs_playlist_cid}") + if resp.status_code != 200: + raise HTTPException(502, "Failed to fetch playlist from IPFS") + playlist_content = resp.text + except httpx.RequestError as e: + raise HTTPException(502, f"IPFS error: {e}") + + # Rewrite IPFS URLs to use our proxy endpoint + import re + gateway = os.environ.get("IPFS_GATEWAY_URL", "https://celery-artdag.rose-ash.com/ipfs") + + playlist_content = re.sub( + rf'{re.escape(gateway)}/([A-Za-z0-9]+)', + rf'/runs/{run_id}/ipfs-proxy/\1', + playlist_content + ) + playlist_content = re.sub( + r'/ipfs(?:-ts)?/([A-Za-z0-9]+)', + rf'/runs/{run_id}/ipfs-proxy/\1', + playlist_content + ) + + return Response( + content=playlist_content, + media_type="application/vnd.apple.mpegurl", + headers={ + "Cache-Control": "no-cache, no-store, must-revalidate", + "Pragma": "no-cache", + "Expires": "0", + "Access-Control-Allow-Origin": "*", + } + ) + + +@router.get("/{run_id}/quality/{quality}/playlist.m3u8") +async def get_quality_playlist(run_id: str, quality: str, request: Request): + """Get quality-level HLS playlist for a streaming run. + + Fetches the LATEST CID for this quality from the database, + so HLS.js always gets updated content. + """ + import database + import os + import httpx + from fastapi.responses import Response + + await database.init_db() + + pending = await database.get_pending_run(run_id) + if not pending: + raise HTTPException(404, "Run not found") + + quality_playlists = pending.get("quality_playlists") + if not quality_playlists or quality not in quality_playlists: + raise HTTPException(404, f"Quality '{quality}' not found") + + quality_cid = quality_playlists[quality].get("cid") + if not quality_cid: + raise HTTPException(404, f"Quality '{quality}' playlist not ready") # Fetch playlist from local IPFS node ipfs_api = os.environ.get("IPFS_API_URL", "http://celery_ipfs:5001") try: async with httpx.AsyncClient(timeout=10.0) as client: - resp = await client.post(f"{ipfs_api}/api/v0/cat?arg={ipfs_playlist_cid}") + resp = await client.post(f"{ipfs_api}/api/v0/cat?arg={quality_cid}") if resp.status_code != 200: - raise HTTPException(502, "Failed to fetch playlist from IPFS") + raise HTTPException(502, f"Failed to fetch quality playlist from IPFS: {quality_cid}") playlist_content = resp.text except httpx.RequestError as e: raise HTTPException(502, f"IPFS error: {e}") - # Rewrite IPFS URLs to use our proxy endpoint so HLS.js polls us instead of static IPFS - # This handles both /ipfs/{cid} and https://gateway/ipfs/{cid} patterns + # Rewrite segment URLs to use our proxy (segments are still static IPFS content) import re gateway = os.environ.get("IPFS_GATEWAY_URL", "https://celery-artdag.rose-ash.com/ipfs") @@ -1273,9 +1361,9 @@ async def get_playlist(run_id: str, request: Request): rf'/runs/{run_id}/ipfs-proxy/\1', playlist_content ) - # Also handle /ipfs/ paths + # Also handle /ipfs/ paths and /ipfs-ts/ paths playlist_content = re.sub( - r'/ipfs/([A-Za-z0-9]+)', + r'/ipfs(?:-ts)?/([A-Za-z0-9]+)', rf'/runs/{run_id}/ipfs-proxy/\1', playlist_content ) diff --git a/database.py b/database.py index e18e1c6..3da697d 100644 --- a/database.py +++ b/database.py @@ -95,6 +95,7 @@ CREATE TABLE IF NOT EXISTS pending_runs ( actor_id VARCHAR(255), error TEXT, ipfs_playlist_cid VARCHAR(128), -- For streaming: IPFS CID of HLS playlist + quality_playlists JSONB, -- For streaming: quality-level playlist CIDs {quality_name: {cid, width, height, bitrate}} created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(), updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW() ); @@ -106,6 +107,10 @@ BEGIN WHERE table_name = 'pending_runs' AND column_name = 'ipfs_playlist_cid') THEN ALTER TABLE pending_runs ADD COLUMN ipfs_playlist_cid VARCHAR(128); END IF; + IF NOT EXISTS (SELECT 1 FROM information_schema.columns + WHERE table_name = 'pending_runs' AND column_name = 'quality_playlists') THEN + ALTER TABLE pending_runs ADD COLUMN quality_playlists JSONB; + END IF; END $$; CREATE INDEX IF NOT EXISTS idx_pending_runs_status ON pending_runs(status); @@ -1525,7 +1530,7 @@ async def get_pending_run(run_id: str) -> Optional[dict]: async with pool.acquire() as conn: row = await conn.fetchrow( """ - SELECT run_id, celery_task_id, status, recipe, inputs, dag_json, plan_cid, output_name, actor_id, error, ipfs_playlist_cid, created_at, updated_at + SELECT run_id, celery_task_id, status, recipe, inputs, dag_json, plan_cid, output_name, actor_id, error, ipfs_playlist_cid, quality_playlists, created_at, updated_at FROM pending_runs WHERE run_id = $1 """, run_id @@ -1535,6 +1540,10 @@ async def get_pending_run(run_id: str) -> Optional[dict]: inputs = row["inputs"] if isinstance(inputs, str): inputs = _json.loads(inputs) + # Parse quality_playlists if it's a string + quality_playlists = row.get("quality_playlists") + if isinstance(quality_playlists, str): + quality_playlists = _json.loads(quality_playlists) return { "run_id": row["run_id"], "celery_task_id": row["celery_task_id"], @@ -1547,6 +1556,7 @@ async def get_pending_run(run_id: str) -> Optional[dict]: "actor_id": row["actor_id"], "error": row["error"], "ipfs_playlist_cid": row["ipfs_playlist_cid"], + "quality_playlists": quality_playlists, "created_at": row["created_at"].isoformat() if row["created_at"] else None, "updated_at": row["updated_at"].isoformat() if row["updated_at"] else None, } @@ -1632,15 +1642,27 @@ async def update_pending_run_plan(run_id: str, plan_cid: str) -> bool: return "UPDATE 1" in result -async def update_pending_run_playlist(run_id: str, ipfs_playlist_cid: str) -> bool: - """Update the IPFS playlist CID of a streaming run.""" +async def update_pending_run_playlist(run_id: str, ipfs_playlist_cid: str, quality_playlists: Optional[dict] = None) -> bool: + """Update the IPFS playlist CID of a streaming run. + + Args: + run_id: The run ID + ipfs_playlist_cid: Master playlist CID + quality_playlists: Dict of quality name -> {cid, width, height, bitrate} + """ if pool is None: raise RuntimeError("Database pool not initialized - call init_db() first") async with pool.acquire() as conn: - result = await conn.execute( - "UPDATE pending_runs SET ipfs_playlist_cid = $2, updated_at = NOW() WHERE run_id = $1", - run_id, ipfs_playlist_cid - ) + if quality_playlists: + result = await conn.execute( + "UPDATE pending_runs SET ipfs_playlist_cid = $2, quality_playlists = $3, updated_at = NOW() WHERE run_id = $1", + run_id, ipfs_playlist_cid, _json.dumps(quality_playlists) + ) + else: + result = await conn.execute( + "UPDATE pending_runs SET ipfs_playlist_cid = $2, updated_at = NOW() WHERE run_id = $1", + run_id, ipfs_playlist_cid + ) return "UPDATE 1" in result diff --git a/streaming/multi_res_output.py b/streaming/multi_res_output.py index f5d3111..722b677 100644 --- a/streaming/multi_res_output.py +++ b/streaming/multi_res_output.py @@ -410,7 +410,18 @@ class MultiResolutionHLSOutput: print(f"[MultiResHLS] Master playlist: {cid}", file=sys.stderr) if self._on_playlist_update: - self._on_playlist_update(cid) + # Pass both master CID and quality info for dynamic playlist generation + quality_info = { + name: { + "cid": q.playlist_cid, + "width": q.width, + "height": q.height, + "bitrate": q.bitrate, + } + for name, q in self.qualities.items() + if q.playlist_cid + } + self._on_playlist_update(cid, quality_info) def close(self): """Close all encoders and finalize output.""" diff --git a/tasks/streaming.py b/tasks/streaming.py index f925bc4..78aeabc 100644 --- a/tasks/streaming.py +++ b/tasks/streaming.py @@ -334,7 +334,13 @@ def run_stream( task_logger.warning(f"DEBUG: streaming:make-video-source is now: {type(interp.primitives.get('streaming:make-video-source'))}") # Set up callback to update database when IPFS playlist is created (for live HLS redirect) - def on_playlist_update(playlist_cid): + def on_playlist_update(playlist_cid, quality_playlists=None): + """Update database with playlist CID and quality info. + + Args: + playlist_cid: Master playlist CID + quality_playlists: Dict of quality name -> {cid, width, height, bitrate} + """ global _resolve_loop, _db_initialized import asyncio import database @@ -346,8 +352,8 @@ def run_stream( if not _db_initialized: _resolve_loop.run_until_complete(database.init_db()) _db_initialized = True - _resolve_loop.run_until_complete(database.update_pending_run_playlist(run_id, playlist_cid)) - logger.info(f"Updated pending run {run_id} with IPFS playlist: {playlist_cid}") + _resolve_loop.run_until_complete(database.update_pending_run_playlist(run_id, playlist_cid, quality_playlists)) + logger.info(f"Updated pending run {run_id} with IPFS playlist: {playlist_cid}, qualities: {list(quality_playlists.keys()) if quality_playlists else []}") except Exception as e: logger.error(f"Failed to update playlist CID in database: {e}")