Fix live HLS streaming with dynamic quality playlist URLs
Some checks are pending
GPU Worker CI/CD / test (push) Waiting to run
GPU Worker CI/CD / deploy (push) Blocked by required conditions

The problem: HLS.js caches quality playlist URLs from the master playlist.
Even when we update the master playlist CID, HLS.js keeps polling the same
static quality CID URL, so it never sees new segments.

The fix:
- Store quality-level CIDs in database (quality_playlists JSONB column)
- Generate master playlist with dynamic URLs (/runs/{id}/quality/{name}/playlist.m3u8)
- Add quality endpoint that fetches LATEST CID from database
- HLS.js now polls our dynamic endpoints which return fresh content

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
giles
2026-02-04 21:07:29 +00:00
parent 4647dd52c8
commit 2f56ffc472
4 changed files with 150 additions and 23 deletions

View File

@@ -1230,10 +1230,10 @@ async def serve_hls_content(
@router.get("/{run_id}/playlist.m3u8") @router.get("/{run_id}/playlist.m3u8")
async def get_playlist(run_id: str, request: Request): async def get_playlist(run_id: str, request: Request):
"""Get live HLS playlist for a streaming run. """Get live HLS master playlist for a streaming run.
Returns the latest playlist content directly, allowing HLS players For multi-resolution streams: generates a master playlist with DYNAMIC quality URLs.
to poll this URL for updates without dealing with changing IPFS CIDs. For single-resolution streams: returns the playlist directly from IPFS.
""" """
import database import database
import os import os
@@ -1246,24 +1246,112 @@ async def get_playlist(run_id: str, request: Request):
if not pending: if not pending:
raise HTTPException(404, "Run not found") raise HTTPException(404, "Run not found")
ipfs_playlist_cid = pending.get("ipfs_playlist_cid") quality_playlists = pending.get("quality_playlists")
if not ipfs_playlist_cid:
raise HTTPException(404, "Playlist not yet available") # Multi-resolution stream: generate master playlist with dynamic quality URLs
if quality_playlists:
lines = ["#EXTM3U", "#EXT-X-VERSION:3"]
for name, info in quality_playlists.items():
if not info.get("cid"):
continue
lines.append(
f"#EXT-X-STREAM-INF:BANDWIDTH={info['bitrate'] * 1000},"
f"RESOLUTION={info['width']}x{info['height']},"
f"NAME=\"{name}\""
)
# Use dynamic URL that fetches latest CID from database
lines.append(f"/runs/{run_id}/quality/{name}/playlist.m3u8")
if len(lines) <= 2:
raise HTTPException(404, "No quality playlists available")
playlist_content = "\n".join(lines) + "\n"
else:
# Single-resolution stream: fetch directly from IPFS
ipfs_playlist_cid = pending.get("ipfs_playlist_cid")
if not ipfs_playlist_cid:
raise HTTPException(404, "HLS playlist not created - rendering likely failed")
ipfs_api = os.environ.get("IPFS_API_URL", "http://celery_ipfs:5001")
try:
async with httpx.AsyncClient(timeout=10.0) as client:
resp = await client.post(f"{ipfs_api}/api/v0/cat?arg={ipfs_playlist_cid}")
if resp.status_code != 200:
raise HTTPException(502, "Failed to fetch playlist from IPFS")
playlist_content = resp.text
except httpx.RequestError as e:
raise HTTPException(502, f"IPFS error: {e}")
# Rewrite IPFS URLs to use our proxy endpoint
import re
gateway = os.environ.get("IPFS_GATEWAY_URL", "https://celery-artdag.rose-ash.com/ipfs")
playlist_content = re.sub(
rf'{re.escape(gateway)}/([A-Za-z0-9]+)',
rf'/runs/{run_id}/ipfs-proxy/\1',
playlist_content
)
playlist_content = re.sub(
r'/ipfs(?:-ts)?/([A-Za-z0-9]+)',
rf'/runs/{run_id}/ipfs-proxy/\1',
playlist_content
)
return Response(
content=playlist_content,
media_type="application/vnd.apple.mpegurl",
headers={
"Cache-Control": "no-cache, no-store, must-revalidate",
"Pragma": "no-cache",
"Expires": "0",
"Access-Control-Allow-Origin": "*",
}
)
@router.get("/{run_id}/quality/{quality}/playlist.m3u8")
async def get_quality_playlist(run_id: str, quality: str, request: Request):
"""Get quality-level HLS playlist for a streaming run.
Fetches the LATEST CID for this quality from the database,
so HLS.js always gets updated content.
"""
import database
import os
import httpx
from fastapi.responses import Response
await database.init_db()
pending = await database.get_pending_run(run_id)
if not pending:
raise HTTPException(404, "Run not found")
quality_playlists = pending.get("quality_playlists")
if not quality_playlists or quality not in quality_playlists:
raise HTTPException(404, f"Quality '{quality}' not found")
quality_cid = quality_playlists[quality].get("cid")
if not quality_cid:
raise HTTPException(404, f"Quality '{quality}' playlist not ready")
# Fetch playlist from local IPFS node # Fetch playlist from local IPFS node
ipfs_api = os.environ.get("IPFS_API_URL", "http://celery_ipfs:5001") ipfs_api = os.environ.get("IPFS_API_URL", "http://celery_ipfs:5001")
try: try:
async with httpx.AsyncClient(timeout=10.0) as client: async with httpx.AsyncClient(timeout=10.0) as client:
resp = await client.post(f"{ipfs_api}/api/v0/cat?arg={ipfs_playlist_cid}") resp = await client.post(f"{ipfs_api}/api/v0/cat?arg={quality_cid}")
if resp.status_code != 200: if resp.status_code != 200:
raise HTTPException(502, "Failed to fetch playlist from IPFS") raise HTTPException(502, f"Failed to fetch quality playlist from IPFS: {quality_cid}")
playlist_content = resp.text playlist_content = resp.text
except httpx.RequestError as e: except httpx.RequestError as e:
raise HTTPException(502, f"IPFS error: {e}") raise HTTPException(502, f"IPFS error: {e}")
# Rewrite IPFS URLs to use our proxy endpoint so HLS.js polls us instead of static IPFS # Rewrite segment URLs to use our proxy (segments are still static IPFS content)
# This handles both /ipfs/{cid} and https://gateway/ipfs/{cid} patterns
import re import re
gateway = os.environ.get("IPFS_GATEWAY_URL", "https://celery-artdag.rose-ash.com/ipfs") gateway = os.environ.get("IPFS_GATEWAY_URL", "https://celery-artdag.rose-ash.com/ipfs")
@@ -1273,9 +1361,9 @@ async def get_playlist(run_id: str, request: Request):
rf'/runs/{run_id}/ipfs-proxy/\1', rf'/runs/{run_id}/ipfs-proxy/\1',
playlist_content playlist_content
) )
# Also handle /ipfs/ paths # Also handle /ipfs/ paths and /ipfs-ts/ paths
playlist_content = re.sub( playlist_content = re.sub(
r'/ipfs/([A-Za-z0-9]+)', r'/ipfs(?:-ts)?/([A-Za-z0-9]+)',
rf'/runs/{run_id}/ipfs-proxy/\1', rf'/runs/{run_id}/ipfs-proxy/\1',
playlist_content playlist_content
) )

View File

@@ -95,6 +95,7 @@ CREATE TABLE IF NOT EXISTS pending_runs (
actor_id VARCHAR(255), actor_id VARCHAR(255),
error TEXT, error TEXT,
ipfs_playlist_cid VARCHAR(128), -- For streaming: IPFS CID of HLS playlist ipfs_playlist_cid VARCHAR(128), -- For streaming: IPFS CID of HLS playlist
quality_playlists JSONB, -- For streaming: quality-level playlist CIDs {quality_name: {cid, width, height, bitrate}}
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(), created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW() updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
); );
@@ -106,6 +107,10 @@ BEGIN
WHERE table_name = 'pending_runs' AND column_name = 'ipfs_playlist_cid') THEN WHERE table_name = 'pending_runs' AND column_name = 'ipfs_playlist_cid') THEN
ALTER TABLE pending_runs ADD COLUMN ipfs_playlist_cid VARCHAR(128); ALTER TABLE pending_runs ADD COLUMN ipfs_playlist_cid VARCHAR(128);
END IF; END IF;
IF NOT EXISTS (SELECT 1 FROM information_schema.columns
WHERE table_name = 'pending_runs' AND column_name = 'quality_playlists') THEN
ALTER TABLE pending_runs ADD COLUMN quality_playlists JSONB;
END IF;
END $$; END $$;
CREATE INDEX IF NOT EXISTS idx_pending_runs_status ON pending_runs(status); CREATE INDEX IF NOT EXISTS idx_pending_runs_status ON pending_runs(status);
@@ -1525,7 +1530,7 @@ async def get_pending_run(run_id: str) -> Optional[dict]:
async with pool.acquire() as conn: async with pool.acquire() as conn:
row = await conn.fetchrow( row = await conn.fetchrow(
""" """
SELECT run_id, celery_task_id, status, recipe, inputs, dag_json, plan_cid, output_name, actor_id, error, ipfs_playlist_cid, created_at, updated_at SELECT run_id, celery_task_id, status, recipe, inputs, dag_json, plan_cid, output_name, actor_id, error, ipfs_playlist_cid, quality_playlists, created_at, updated_at
FROM pending_runs WHERE run_id = $1 FROM pending_runs WHERE run_id = $1
""", """,
run_id run_id
@@ -1535,6 +1540,10 @@ async def get_pending_run(run_id: str) -> Optional[dict]:
inputs = row["inputs"] inputs = row["inputs"]
if isinstance(inputs, str): if isinstance(inputs, str):
inputs = _json.loads(inputs) inputs = _json.loads(inputs)
# Parse quality_playlists if it's a string
quality_playlists = row.get("quality_playlists")
if isinstance(quality_playlists, str):
quality_playlists = _json.loads(quality_playlists)
return { return {
"run_id": row["run_id"], "run_id": row["run_id"],
"celery_task_id": row["celery_task_id"], "celery_task_id": row["celery_task_id"],
@@ -1547,6 +1556,7 @@ async def get_pending_run(run_id: str) -> Optional[dict]:
"actor_id": row["actor_id"], "actor_id": row["actor_id"],
"error": row["error"], "error": row["error"],
"ipfs_playlist_cid": row["ipfs_playlist_cid"], "ipfs_playlist_cid": row["ipfs_playlist_cid"],
"quality_playlists": quality_playlists,
"created_at": row["created_at"].isoformat() if row["created_at"] else None, "created_at": row["created_at"].isoformat() if row["created_at"] else None,
"updated_at": row["updated_at"].isoformat() if row["updated_at"] else None, "updated_at": row["updated_at"].isoformat() if row["updated_at"] else None,
} }
@@ -1632,15 +1642,27 @@ async def update_pending_run_plan(run_id: str, plan_cid: str) -> bool:
return "UPDATE 1" in result return "UPDATE 1" in result
async def update_pending_run_playlist(run_id: str, ipfs_playlist_cid: str) -> bool: async def update_pending_run_playlist(run_id: str, ipfs_playlist_cid: str, quality_playlists: Optional[dict] = None) -> bool:
"""Update the IPFS playlist CID of a streaming run.""" """Update the IPFS playlist CID of a streaming run.
Args:
run_id: The run ID
ipfs_playlist_cid: Master playlist CID
quality_playlists: Dict of quality name -> {cid, width, height, bitrate}
"""
if pool is None: if pool is None:
raise RuntimeError("Database pool not initialized - call init_db() first") raise RuntimeError("Database pool not initialized - call init_db() first")
async with pool.acquire() as conn: async with pool.acquire() as conn:
result = await conn.execute( if quality_playlists:
"UPDATE pending_runs SET ipfs_playlist_cid = $2, updated_at = NOW() WHERE run_id = $1", result = await conn.execute(
run_id, ipfs_playlist_cid "UPDATE pending_runs SET ipfs_playlist_cid = $2, quality_playlists = $3, updated_at = NOW() WHERE run_id = $1",
) run_id, ipfs_playlist_cid, _json.dumps(quality_playlists)
)
else:
result = await conn.execute(
"UPDATE pending_runs SET ipfs_playlist_cid = $2, updated_at = NOW() WHERE run_id = $1",
run_id, ipfs_playlist_cid
)
return "UPDATE 1" in result return "UPDATE 1" in result

View File

@@ -410,7 +410,18 @@ class MultiResolutionHLSOutput:
print(f"[MultiResHLS] Master playlist: {cid}", file=sys.stderr) print(f"[MultiResHLS] Master playlist: {cid}", file=sys.stderr)
if self._on_playlist_update: if self._on_playlist_update:
self._on_playlist_update(cid) # Pass both master CID and quality info for dynamic playlist generation
quality_info = {
name: {
"cid": q.playlist_cid,
"width": q.width,
"height": q.height,
"bitrate": q.bitrate,
}
for name, q in self.qualities.items()
if q.playlist_cid
}
self._on_playlist_update(cid, quality_info)
def close(self): def close(self):
"""Close all encoders and finalize output.""" """Close all encoders and finalize output."""

View File

@@ -334,7 +334,13 @@ def run_stream(
task_logger.warning(f"DEBUG: streaming:make-video-source is now: {type(interp.primitives.get('streaming:make-video-source'))}") task_logger.warning(f"DEBUG: streaming:make-video-source is now: {type(interp.primitives.get('streaming:make-video-source'))}")
# Set up callback to update database when IPFS playlist is created (for live HLS redirect) # Set up callback to update database when IPFS playlist is created (for live HLS redirect)
def on_playlist_update(playlist_cid): def on_playlist_update(playlist_cid, quality_playlists=None):
"""Update database with playlist CID and quality info.
Args:
playlist_cid: Master playlist CID
quality_playlists: Dict of quality name -> {cid, width, height, bitrate}
"""
global _resolve_loop, _db_initialized global _resolve_loop, _db_initialized
import asyncio import asyncio
import database import database
@@ -346,8 +352,8 @@ def run_stream(
if not _db_initialized: if not _db_initialized:
_resolve_loop.run_until_complete(database.init_db()) _resolve_loop.run_until_complete(database.init_db())
_db_initialized = True _db_initialized = True
_resolve_loop.run_until_complete(database.update_pending_run_playlist(run_id, playlist_cid)) _resolve_loop.run_until_complete(database.update_pending_run_playlist(run_id, playlist_cid, quality_playlists))
logger.info(f"Updated pending run {run_id} with IPFS playlist: {playlist_cid}") logger.info(f"Updated pending run {run_id} with IPFS playlist: {playlist_cid}, qualities: {list(quality_playlists.keys()) if quality_playlists else []}")
except Exception as e: except Exception as e:
logger.error(f"Failed to update playlist CID in database: {e}") logger.error(f"Failed to update playlist CID in database: {e}")