From 4647dd52c86595fa7240b81f43d975ec0621b887 Mon Sep 17 00:00:00 2001 From: giles Date: Wed, 4 Feb 2026 21:00:07 +0000 Subject: [PATCH] Add IPFS proxy for live HLS streaming - rewrites playlist URLs --- app/routers/runs.py | 75 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 75 insertions(+) diff --git a/app/routers/runs.py b/app/routers/runs.py index 3632c64..6559fdd 100644 --- a/app/routers/runs.py +++ b/app/routers/runs.py @@ -1262,6 +1262,24 @@ async def get_playlist(run_id: str, request: Request): except httpx.RequestError as e: raise HTTPException(502, f"IPFS error: {e}") + # Rewrite IPFS URLs to use our proxy endpoint so HLS.js polls us instead of static IPFS + # This handles both /ipfs/{cid} and https://gateway/ipfs/{cid} patterns + import re + gateway = os.environ.get("IPFS_GATEWAY_URL", "https://celery-artdag.rose-ash.com/ipfs") + + # Replace absolute gateway URLs with our proxy + playlist_content = re.sub( + rf'{re.escape(gateway)}/([A-Za-z0-9]+)', + rf'/runs/{run_id}/ipfs-proxy/\1', + playlist_content + ) + # Also handle /ipfs/ paths + playlist_content = re.sub( + r'/ipfs/([A-Za-z0-9]+)', + rf'/runs/{run_id}/ipfs-proxy/\1', + playlist_content + ) + return Response( content=playlist_content, media_type="application/vnd.apple.mpegurl", @@ -1274,6 +1292,63 @@ async def get_playlist(run_id: str, request: Request): ) +@router.get("/{run_id}/ipfs-proxy/{cid}") +async def proxy_ipfs_content(run_id: str, cid: str, request: Request): + """Proxy IPFS content with no-cache headers for live streaming. + + This allows HLS.js to poll for updated playlists through us rather than + hitting static IPFS URLs directly. + """ + import os + import httpx + from fastapi.responses import Response + + ipfs_api = os.environ.get("IPFS_API_URL", "http://celery_ipfs:5001") + + try: + async with httpx.AsyncClient(timeout=10.0) as client: + resp = await client.post(f"{ipfs_api}/api/v0/cat?arg={cid}") + if resp.status_code != 200: + raise HTTPException(502, f"Failed to fetch from IPFS: {cid}") + content = resp.content + except httpx.RequestError as e: + raise HTTPException(502, f"IPFS error: {e}") + + # Determine content type + if cid.endswith('.m3u8') or b'#EXTM3U' in content[:20]: + media_type = "application/vnd.apple.mpegurl" + # Rewrite any IPFS URLs in sub-playlists too + import re + text_content = content.decode('utf-8', errors='replace') + gateway = os.environ.get("IPFS_GATEWAY_URL", "https://celery-artdag.rose-ash.com/ipfs") + text_content = re.sub( + rf'{re.escape(gateway)}/([A-Za-z0-9]+)', + rf'/runs/{run_id}/ipfs-proxy/\1', + text_content + ) + text_content = re.sub( + r'/ipfs/([A-Za-z0-9]+)', + rf'/runs/{run_id}/ipfs-proxy/\1', + text_content + ) + content = text_content.encode('utf-8') + elif b'\x47' in content[:1]: # MPEG-TS sync byte + media_type = "video/mp2t" + else: + media_type = "application/octet-stream" + + return Response( + content=content, + media_type=media_type, + headers={ + "Cache-Control": "no-cache, no-store, must-revalidate", + "Pragma": "no-cache", + "Expires": "0", + "Access-Control-Allow-Origin": "*", + } + ) + + @router.get("/{run_id}/ipfs-stream") async def get_ipfs_stream_info(run_id: str, request: Request): """Get IPFS streaming info for a run.