Add IPFS proxy for live HLS streaming - rewrites playlist URLs
This commit is contained in:
@@ -1262,6 +1262,24 @@ async def get_playlist(run_id: str, request: Request):
|
|||||||
except httpx.RequestError as e:
|
except httpx.RequestError as e:
|
||||||
raise HTTPException(502, f"IPFS error: {e}")
|
raise HTTPException(502, f"IPFS error: {e}")
|
||||||
|
|
||||||
|
# Rewrite IPFS URLs to use our proxy endpoint so HLS.js polls us instead of static IPFS
|
||||||
|
# This handles both /ipfs/{cid} and https://gateway/ipfs/{cid} patterns
|
||||||
|
import re
|
||||||
|
gateway = os.environ.get("IPFS_GATEWAY_URL", "https://celery-artdag.rose-ash.com/ipfs")
|
||||||
|
|
||||||
|
# Replace absolute gateway URLs with our proxy
|
||||||
|
playlist_content = re.sub(
|
||||||
|
rf'{re.escape(gateway)}/([A-Za-z0-9]+)',
|
||||||
|
rf'/runs/{run_id}/ipfs-proxy/\1',
|
||||||
|
playlist_content
|
||||||
|
)
|
||||||
|
# Also handle /ipfs/ paths
|
||||||
|
playlist_content = re.sub(
|
||||||
|
r'/ipfs/([A-Za-z0-9]+)',
|
||||||
|
rf'/runs/{run_id}/ipfs-proxy/\1',
|
||||||
|
playlist_content
|
||||||
|
)
|
||||||
|
|
||||||
return Response(
|
return Response(
|
||||||
content=playlist_content,
|
content=playlist_content,
|
||||||
media_type="application/vnd.apple.mpegurl",
|
media_type="application/vnd.apple.mpegurl",
|
||||||
@@ -1274,6 +1292,63 @@ async def get_playlist(run_id: str, request: Request):
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@router.get("/{run_id}/ipfs-proxy/{cid}")
|
||||||
|
async def proxy_ipfs_content(run_id: str, cid: str, request: Request):
|
||||||
|
"""Proxy IPFS content with no-cache headers for live streaming.
|
||||||
|
|
||||||
|
This allows HLS.js to poll for updated playlists through us rather than
|
||||||
|
hitting static IPFS URLs directly.
|
||||||
|
"""
|
||||||
|
import os
|
||||||
|
import httpx
|
||||||
|
from fastapi.responses import Response
|
||||||
|
|
||||||
|
ipfs_api = os.environ.get("IPFS_API_URL", "http://celery_ipfs:5001")
|
||||||
|
|
||||||
|
try:
|
||||||
|
async with httpx.AsyncClient(timeout=10.0) as client:
|
||||||
|
resp = await client.post(f"{ipfs_api}/api/v0/cat?arg={cid}")
|
||||||
|
if resp.status_code != 200:
|
||||||
|
raise HTTPException(502, f"Failed to fetch from IPFS: {cid}")
|
||||||
|
content = resp.content
|
||||||
|
except httpx.RequestError as e:
|
||||||
|
raise HTTPException(502, f"IPFS error: {e}")
|
||||||
|
|
||||||
|
# Determine content type
|
||||||
|
if cid.endswith('.m3u8') or b'#EXTM3U' in content[:20]:
|
||||||
|
media_type = "application/vnd.apple.mpegurl"
|
||||||
|
# Rewrite any IPFS URLs in sub-playlists too
|
||||||
|
import re
|
||||||
|
text_content = content.decode('utf-8', errors='replace')
|
||||||
|
gateway = os.environ.get("IPFS_GATEWAY_URL", "https://celery-artdag.rose-ash.com/ipfs")
|
||||||
|
text_content = re.sub(
|
||||||
|
rf'{re.escape(gateway)}/([A-Za-z0-9]+)',
|
||||||
|
rf'/runs/{run_id}/ipfs-proxy/\1',
|
||||||
|
text_content
|
||||||
|
)
|
||||||
|
text_content = re.sub(
|
||||||
|
r'/ipfs/([A-Za-z0-9]+)',
|
||||||
|
rf'/runs/{run_id}/ipfs-proxy/\1',
|
||||||
|
text_content
|
||||||
|
)
|
||||||
|
content = text_content.encode('utf-8')
|
||||||
|
elif b'\x47' in content[:1]: # MPEG-TS sync byte
|
||||||
|
media_type = "video/mp2t"
|
||||||
|
else:
|
||||||
|
media_type = "application/octet-stream"
|
||||||
|
|
||||||
|
return Response(
|
||||||
|
content=content,
|
||||||
|
media_type=media_type,
|
||||||
|
headers={
|
||||||
|
"Cache-Control": "no-cache, no-store, must-revalidate",
|
||||||
|
"Pragma": "no-cache",
|
||||||
|
"Expires": "0",
|
||||||
|
"Access-Control-Allow-Origin": "*",
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
@router.get("/{run_id}/ipfs-stream")
|
@router.get("/{run_id}/ipfs-stream")
|
||||||
async def get_ipfs_stream_info(run_id: str, request: Request):
|
async def get_ipfs_stream_info(run_id: str, request: Request):
|
||||||
"""Get IPFS streaming info for a run.
|
"""Get IPFS streaming info for a run.
|
||||||
|
|||||||
Reference in New Issue
Block a user