diff --git a/server.py b/server.py
index d94ad45..f494bce 100644
--- a/server.py
+++ b/server.py
@@ -269,6 +269,7 @@ class RunStatus(BaseModel):
created_at: str
completed_at: Optional[str] = None
output_hash: Optional[str] = None
+ output_ipfs_cid: Optional[str] = None # IPFS CID of output (IPFS_PRIMARY mode)
error: Optional[str] = None
celery_task_id: Optional[str] = None
effects_commit: Optional[str] = None
@@ -835,7 +836,19 @@ async def get_run(run_id: str):
run.completed_at = datetime.now(timezone.utc).isoformat()
# Handle both legacy (render_effect) and new (execute_dag/run_plan) result formats
- if "output_hash" in result or "output_cache_id" in result:
+ if "output_cid" in result:
+ # IPFS-primary mode: everything on IPFS
+ run.output_ipfs_cid = result.get("output_cid")
+ run.plan_id = result.get("plan_id")
+ # Store step CIDs for UI
+ run.step_results = {
+ step_id: {"cid": cid, "status": "completed"}
+ for step_id, cid in result.get("step_cids", {}).items()
+ }
+ # Try to get content_hash from cache_id mapping in Redis
+ # (cache_id is often the same as content_hash)
+ output_path = None
+ elif "output_hash" in result or "output_cache_id" in result:
# New DAG/plan result format
run.output_hash = result.get("output_hash") or result.get("output_cache_id")
run.provenance_cid = result.get("provenance_cid")
@@ -1081,8 +1094,9 @@ async def run_detail(run_id: str, request: Request):
media_html = ""
available_inputs = [inp for inp in run.inputs if cache_manager.has_content(inp)]
has_output = run.status == "completed" and run.output_hash and cache_manager.has_content(run.output_hash)
+ has_ipfs_output = run.status == "completed" and run.output_ipfs_cid and not has_output
- if available_inputs or has_output:
+ if available_inputs or has_output or has_ipfs_output:
# Flexible grid - more columns for more items
num_items = len(available_inputs) + (1 if has_output else 0)
grid_cols = min(num_items, 3) # Max 3 columns
@@ -1123,6 +1137,18 @@ async def run_detail(run_id: str, request: Request):
{output_elem}
'''
+ elif has_ipfs_output:
+ # IPFS-only output (IPFS_PRIMARY mode)
+ output_cid = run.output_ipfs_cid
+ ipfs_gateway = IPFS_GATEWAY_URL.rstrip('/') if IPFS_GATEWAY_URL else "https://ipfs.io/ipfs"
+ output_elem = f''
+ media_html += f'''
+
+ '''
media_html += ''
# Build inputs list with names
@@ -1219,6 +1245,11 @@ async def run_detail(run_id: str, request: Request):
Output
{run.output_hash}
'''
+ elif run.output_ipfs_cid:
+ output_link = f''''''
completed_html = ""
if run.completed_at:
@@ -1543,22 +1574,37 @@ async def run_plan_visualization(run_id: str, request: Request):
'''
# Build steps list with cache_id links
+ # Check if we have step CIDs from IPFS_PRIMARY mode
+ step_cids = {}
+ if run.step_results:
+ for sid, res in run.step_results.items():
+ if isinstance(res, dict) and res.get("cid"):
+ step_cids[sid] = res["cid"]
+
for i, step in enumerate(steps):
step_id = step.get("step_id", "")
step_name = step.get("name", step_id[:20])
node_type = step.get("node_type", "EFFECT")
cache_id = step.get("cache_id", "")
+ step_cid = step_cids.get(step_id, "") # CID from IPFS_PRIMARY mode
has_cached = cache_manager.has_content(cache_id) if cache_id else False
color = NODE_COLORS.get(node_type, NODE_COLORS["default"])
status_badge = ""
- if has_cached:
+ if has_cached or step_cid:
status_badge = 'cached'
elif run.status == "completed":
status_badge = 'completed'
cache_link = ""
- if cache_id:
+ if step_cid:
+ # IPFS_PRIMARY mode - show CID link
+ cache_link = f'''
+ '''
+ elif cache_id:
if has_cached:
cache_link = f'''
@@ -3031,6 +3077,37 @@ async def get_cached(content_hash: str, request: Request):
return HTMLResponse(render_page(f"Cache: {content_hash[:16]}...", content, ctx.actor_id, active_tab="media"))
+@app.get("/ipfs/{cid}")
+async def ipfs_redirect(cid: str):
+ """Redirect to IPFS gateway for content viewing."""
+ from fastapi.responses import RedirectResponse
+ if IPFS_GATEWAY_URL:
+ gateway_url = f"{IPFS_GATEWAY_URL.rstrip('/')}/{cid}"
+ else:
+ gateway_url = f"https://ipfs.io/ipfs/{cid}"
+ return RedirectResponse(url=gateway_url, status_code=302)
+
+
+@app.get("/ipfs/{cid}/raw")
+async def ipfs_raw(cid: str):
+ """Fetch content from IPFS and serve it."""
+ # Try to get from IPFS and serve
+ import tempfile
+ with tempfile.NamedTemporaryFile(delete=False) as tmp:
+ tmp_path = Path(tmp.name)
+
+ if not ipfs_client.get_file(cid, tmp_path):
+ raise HTTPException(404, f"Could not fetch CID {cid} from IPFS")
+
+ # Detect media type
+ media_type_name = detect_media_type(tmp_path)
+ if media_type_name == "video":
+ return FileResponse(tmp_path, media_type="video/mp4", filename=f"{cid[:16]}.mp4")
+ elif media_type_name == "image":
+ return FileResponse(tmp_path, media_type="image/jpeg", filename=f"{cid[:16]}.jpg")
+ return FileResponse(tmp_path, filename=f"{cid[:16]}.bin")
+
+
@app.get("/cache/{content_hash}/raw")
async def get_cached_raw(content_hash: str):
"""Get raw cached content (file download)."""