Add IPFS_PRIMARY mode UI support

- Add output_ipfs_cid field to RunStatus model
- Handle output_cid from IPFS-primary task results
- Add /ipfs/{cid} redirect route to IPFS gateway
- Add /ipfs/{cid}/raw to fetch and serve IPFS content
- Show IPFS output in run detail when output_hash unavailable
- Display step CIDs on plan page for IPFS_PRIMARY runs

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
giles
2026-01-11 08:32:27 +00:00
parent 68c2e45541
commit c145d4a427

View File

@@ -269,6 +269,7 @@ class RunStatus(BaseModel):
created_at: str
completed_at: Optional[str] = None
output_hash: Optional[str] = None
output_ipfs_cid: Optional[str] = None # IPFS CID of output (IPFS_PRIMARY mode)
error: Optional[str] = None
celery_task_id: Optional[str] = None
effects_commit: Optional[str] = None
@@ -835,7 +836,19 @@ async def get_run(run_id: str):
run.completed_at = datetime.now(timezone.utc).isoformat()
# Handle both legacy (render_effect) and new (execute_dag/run_plan) result formats
if "output_hash" in result or "output_cache_id" in result:
if "output_cid" in result:
# IPFS-primary mode: everything on IPFS
run.output_ipfs_cid = result.get("output_cid")
run.plan_id = result.get("plan_id")
# Store step CIDs for UI
run.step_results = {
step_id: {"cid": cid, "status": "completed"}
for step_id, cid in result.get("step_cids", {}).items()
}
# Try to get content_hash from cache_id mapping in Redis
# (cache_id is often the same as content_hash)
output_path = None
elif "output_hash" in result or "output_cache_id" in result:
# New DAG/plan result format
run.output_hash = result.get("output_hash") or result.get("output_cache_id")
run.provenance_cid = result.get("provenance_cid")
@@ -1081,8 +1094,9 @@ async def run_detail(run_id: str, request: Request):
media_html = ""
available_inputs = [inp for inp in run.inputs if cache_manager.has_content(inp)]
has_output = run.status == "completed" and run.output_hash and cache_manager.has_content(run.output_hash)
has_ipfs_output = run.status == "completed" and run.output_ipfs_cid and not has_output
if available_inputs or has_output:
if available_inputs or has_output or has_ipfs_output:
# Flexible grid - more columns for more items
num_items = len(available_inputs) + (1 if has_output else 0)
grid_cols = min(num_items, 3) # Max 3 columns
@@ -1123,6 +1137,18 @@ async def run_detail(run_id: str, request: Request):
<div class="mt-3 flex justify-center">{output_elem}</div>
</div>
'''
elif has_ipfs_output:
# IPFS-only output (IPFS_PRIMARY mode)
output_cid = run.output_ipfs_cid
ipfs_gateway = IPFS_GATEWAY_URL.rstrip('/') if IPFS_GATEWAY_URL else "https://ipfs.io/ipfs"
output_elem = f'<video src="{ipfs_gateway}/{output_cid}" controls autoplay muted loop playsinline class="max-w-full max-h-64 rounded-lg"></video>'
media_html += f'''
<div class="bg-dark-600 rounded-lg p-4">
<div class="text-sm text-gray-400 mb-2">Output (IPFS)</div>
<a href="/ipfs/{output_cid}" class="text-blue-400 hover:text-blue-300 font-mono text-xs">{output_cid}</a>
<div class="mt-3 flex justify-center">{output_elem}</div>
</div>
'''
media_html += '</div>'
# Build inputs list with names
@@ -1219,6 +1245,11 @@ async def run_detail(run_id: str, request: Request):
<div class="text-sm text-gray-400 mb-1">Output</div>
<a href="/cache/{run.output_hash}" class="text-blue-400 hover:text-blue-300 font-mono text-xs">{run.output_hash}</a>
</div>'''
elif run.output_ipfs_cid:
output_link = f'''<div class="bg-dark-600 rounded-lg p-4">
<div class="text-sm text-gray-400 mb-1">Output (IPFS)</div>
<a href="/ipfs/{run.output_ipfs_cid}" class="text-blue-400 hover:text-blue-300 font-mono text-xs">{run.output_ipfs_cid}</a>
</div>'''
completed_html = ""
if run.completed_at:
@@ -1543,22 +1574,37 @@ async def run_plan_visualization(run_id: str, request: Request):
'''
# Build steps list with cache_id links
# Check if we have step CIDs from IPFS_PRIMARY mode
step_cids = {}
if run.step_results:
for sid, res in run.step_results.items():
if isinstance(res, dict) and res.get("cid"):
step_cids[sid] = res["cid"]
for i, step in enumerate(steps):
step_id = step.get("step_id", "")
step_name = step.get("name", step_id[:20])
node_type = step.get("node_type", "EFFECT")
cache_id = step.get("cache_id", "")
step_cid = step_cids.get(step_id, "") # CID from IPFS_PRIMARY mode
has_cached = cache_manager.has_content(cache_id) if cache_id else False
color = NODE_COLORS.get(node_type, NODE_COLORS["default"])
status_badge = ""
if has_cached:
if has_cached or step_cid:
status_badge = '<span class="text-green-400 text-xs">cached</span>'
elif run.status == "completed":
status_badge = '<span class="text-green-400 text-xs">completed</span>'
cache_link = ""
if cache_id:
if step_cid:
# IPFS_PRIMARY mode - show CID link
cache_link = f'''
<div class="mt-1 ml-8 flex items-center gap-2">
<span class="text-gray-500 text-xs">Output (IPFS):</span>
<a href="/ipfs/{step_cid}" class="font-mono text-xs text-blue-400 hover:text-blue-300">{step_cid}</a>
</div>'''
elif cache_id:
if has_cached:
cache_link = f'''
<div class="mt-1 ml-8 flex items-center gap-2">
@@ -3031,6 +3077,37 @@ async def get_cached(content_hash: str, request: Request):
return HTMLResponse(render_page(f"Cache: {content_hash[:16]}...", content, ctx.actor_id, active_tab="media"))
@app.get("/ipfs/{cid}")
async def ipfs_redirect(cid: str):
"""Redirect to IPFS gateway for content viewing."""
from fastapi.responses import RedirectResponse
if IPFS_GATEWAY_URL:
gateway_url = f"{IPFS_GATEWAY_URL.rstrip('/')}/{cid}"
else:
gateway_url = f"https://ipfs.io/ipfs/{cid}"
return RedirectResponse(url=gateway_url, status_code=302)
@app.get("/ipfs/{cid}/raw")
async def ipfs_raw(cid: str):
"""Fetch content from IPFS and serve it."""
# Try to get from IPFS and serve
import tempfile
with tempfile.NamedTemporaryFile(delete=False) as tmp:
tmp_path = Path(tmp.name)
if not ipfs_client.get_file(cid, tmp_path):
raise HTTPException(404, f"Could not fetch CID {cid} from IPFS")
# Detect media type
media_type_name = detect_media_type(tmp_path)
if media_type_name == "video":
return FileResponse(tmp_path, media_type="video/mp4", filename=f"{cid[:16]}.mp4")
elif media_type_name == "image":
return FileResponse(tmp_path, media_type="image/jpeg", filename=f"{cid[:16]}.jpg")
return FileResponse(tmp_path, filename=f"{cid[:16]}.bin")
@app.get("/cache/{content_hash}/raw")
async def get_cached_raw(content_hash: str):
"""Get raw cached content (file download)."""