From c145d4a4272196d43f2b24e429101171ab385dd9 Mon Sep 17 00:00:00 2001 From: giles Date: Sun, 11 Jan 2026 08:32:27 +0000 Subject: [PATCH] Add IPFS_PRIMARY mode UI support MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add output_ipfs_cid field to RunStatus model - Handle output_cid from IPFS-primary task results - Add /ipfs/{cid} redirect route to IPFS gateway - Add /ipfs/{cid}/raw to fetch and serve IPFS content - Show IPFS output in run detail when output_hash unavailable - Display step CIDs on plan page for IPFS_PRIMARY runs 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- server.py | 85 ++++++++++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 81 insertions(+), 4 deletions(-) diff --git a/server.py b/server.py index d94ad45..f494bce 100644 --- a/server.py +++ b/server.py @@ -269,6 +269,7 @@ class RunStatus(BaseModel): created_at: str completed_at: Optional[str] = None output_hash: Optional[str] = None + output_ipfs_cid: Optional[str] = None # IPFS CID of output (IPFS_PRIMARY mode) error: Optional[str] = None celery_task_id: Optional[str] = None effects_commit: Optional[str] = None @@ -835,7 +836,19 @@ async def get_run(run_id: str): run.completed_at = datetime.now(timezone.utc).isoformat() # Handle both legacy (render_effect) and new (execute_dag/run_plan) result formats - if "output_hash" in result or "output_cache_id" in result: + if "output_cid" in result: + # IPFS-primary mode: everything on IPFS + run.output_ipfs_cid = result.get("output_cid") + run.plan_id = result.get("plan_id") + # Store step CIDs for UI + run.step_results = { + step_id: {"cid": cid, "status": "completed"} + for step_id, cid in result.get("step_cids", {}).items() + } + # Try to get content_hash from cache_id mapping in Redis + # (cache_id is often the same as content_hash) + output_path = None + elif "output_hash" in result or "output_cache_id" in result: # New DAG/plan result format run.output_hash = result.get("output_hash") or result.get("output_cache_id") run.provenance_cid = result.get("provenance_cid") @@ -1081,8 +1094,9 @@ async def run_detail(run_id: str, request: Request): media_html = "" available_inputs = [inp for inp in run.inputs if cache_manager.has_content(inp)] has_output = run.status == "completed" and run.output_hash and cache_manager.has_content(run.output_hash) + has_ipfs_output = run.status == "completed" and run.output_ipfs_cid and not has_output - if available_inputs or has_output: + if available_inputs or has_output or has_ipfs_output: # Flexible grid - more columns for more items num_items = len(available_inputs) + (1 if has_output else 0) grid_cols = min(num_items, 3) # Max 3 columns @@ -1123,6 +1137,18 @@ async def run_detail(run_id: str, request: Request):
{output_elem}
''' + elif has_ipfs_output: + # IPFS-only output (IPFS_PRIMARY mode) + output_cid = run.output_ipfs_cid + ipfs_gateway = IPFS_GATEWAY_URL.rstrip('/') if IPFS_GATEWAY_URL else "https://ipfs.io/ipfs" + output_elem = f'' + media_html += f''' +
+
Output (IPFS)
+ {output_cid} +
{output_elem}
+
+ ''' media_html += '' # Build inputs list with names @@ -1219,6 +1245,11 @@ async def run_detail(run_id: str, request: Request):
Output
{run.output_hash} ''' + elif run.output_ipfs_cid: + output_link = f'''
+
Output (IPFS)
+ {run.output_ipfs_cid} +
''' completed_html = "" if run.completed_at: @@ -1543,22 +1574,37 @@ async def run_plan_visualization(run_id: str, request: Request): ''' # Build steps list with cache_id links + # Check if we have step CIDs from IPFS_PRIMARY mode + step_cids = {} + if run.step_results: + for sid, res in run.step_results.items(): + if isinstance(res, dict) and res.get("cid"): + step_cids[sid] = res["cid"] + for i, step in enumerate(steps): step_id = step.get("step_id", "") step_name = step.get("name", step_id[:20]) node_type = step.get("node_type", "EFFECT") cache_id = step.get("cache_id", "") + step_cid = step_cids.get(step_id, "") # CID from IPFS_PRIMARY mode has_cached = cache_manager.has_content(cache_id) if cache_id else False color = NODE_COLORS.get(node_type, NODE_COLORS["default"]) status_badge = "" - if has_cached: + if has_cached or step_cid: status_badge = 'cached' elif run.status == "completed": status_badge = 'completed' cache_link = "" - if cache_id: + if step_cid: + # IPFS_PRIMARY mode - show CID link + cache_link = f''' +
+ Output (IPFS): + {step_cid} +
''' + elif cache_id: if has_cached: cache_link = f'''
@@ -3031,6 +3077,37 @@ async def get_cached(content_hash: str, request: Request): return HTMLResponse(render_page(f"Cache: {content_hash[:16]}...", content, ctx.actor_id, active_tab="media")) +@app.get("/ipfs/{cid}") +async def ipfs_redirect(cid: str): + """Redirect to IPFS gateway for content viewing.""" + from fastapi.responses import RedirectResponse + if IPFS_GATEWAY_URL: + gateway_url = f"{IPFS_GATEWAY_URL.rstrip('/')}/{cid}" + else: + gateway_url = f"https://ipfs.io/ipfs/{cid}" + return RedirectResponse(url=gateway_url, status_code=302) + + +@app.get("/ipfs/{cid}/raw") +async def ipfs_raw(cid: str): + """Fetch content from IPFS and serve it.""" + # Try to get from IPFS and serve + import tempfile + with tempfile.NamedTemporaryFile(delete=False) as tmp: + tmp_path = Path(tmp.name) + + if not ipfs_client.get_file(cid, tmp_path): + raise HTTPException(404, f"Could not fetch CID {cid} from IPFS") + + # Detect media type + media_type_name = detect_media_type(tmp_path) + if media_type_name == "video": + return FileResponse(tmp_path, media_type="video/mp4", filename=f"{cid[:16]}.mp4") + elif media_type_name == "image": + return FileResponse(tmp_path, media_type="image/jpeg", filename=f"{cid[:16]}.jpg") + return FileResponse(tmp_path, filename=f"{cid[:16]}.bin") + + @app.get("/cache/{content_hash}/raw") async def get_cached_raw(content_hash: str): """Get raw cached content (file download)."""