Files
celery/app/routers/runs.py
gilesb 977d9a9258 Fix artifact dict key mismatch (hash -> cid)
Template runs/detail.html expects artifact.cid but code provided
artifact.hash, causing UndefinedError when viewing run details.

- Change run_service.get_run_artifacts to return 'cid' key
- Change runs.py router inline artifact creation to use 'cid' key
- Add regression tests for artifact data structure

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-12 12:05:13 +00:00

784 lines
26 KiB
Python

"""
Run management routes for L1 server.
Handles run creation, status, listing, and detail views.
"""
import asyncio
import json
import logging
from datetime import datetime, timezone
from typing import List, Optional, Dict, Any
from fastapi import APIRouter, Request, Depends, HTTPException
from fastapi.responses import HTMLResponse
from pydantic import BaseModel
from artdag_common import render
from artdag_common.middleware import wants_html, wants_json
from artdag_common.middleware.auth import UserContext
from ..dependencies import (
require_auth, get_templates, get_current_user,
get_redis_client, get_cache_manager
)
from ..services.run_service import RunService
router = APIRouter()
logger = logging.getLogger(__name__)
def plan_to_sexp(plan: dict, recipe_name: str = None) -> str:
"""Convert a plan to S-expression format for display."""
if not plan or not plan.get("steps"):
return ";; No plan available"
lines = []
lines.append(f'(plan "{recipe_name or "unknown"}"')
# Group nodes by type for cleaner output
steps = plan.get("steps", [])
for step in steps:
step_id = step.get("id", "?")
step_type = step.get("type", "EFFECT")
inputs = step.get("inputs", [])
config = step.get("config", {})
# Build the step S-expression
if step_type == "SOURCE":
if config.get("input"):
# Variable input
input_name = config.get("name", config.get("input", "input"))
lines.append(f' (source :input "{input_name}")')
elif config.get("asset"):
# Fixed asset
lines.append(f' (source {config.get("asset", step_id)})')
else:
lines.append(f' (source {step_id})')
elif step_type == "EFFECT":
effect_name = config.get("effect", step_id)
if inputs:
inp_str = " ".join(inputs)
lines.append(f' (-> {inp_str} (effect {effect_name}))')
else:
lines.append(f' (effect {effect_name})')
elif step_type == "SEQUENCE":
if inputs:
inp_str = " ".join(inputs)
lines.append(f' (sequence {inp_str})')
else:
lines.append(f' (sequence)')
else:
# Generic node
if inputs:
inp_str = " ".join(inputs)
lines.append(f' ({step_type.lower()} {inp_str})')
else:
lines.append(f' ({step_type.lower()} {step_id})')
lines.append(')')
return "\n".join(lines)
RUNS_KEY_PREFIX = "artdag:run:"
class RunRequest(BaseModel):
recipe: str
inputs: List[str]
output_name: Optional[str] = None
use_dag: bool = True
dag_json: Optional[str] = None
class RunStatus(BaseModel):
run_id: str
status: str
recipe: Optional[str] = None
inputs: Optional[List[str]] = None
output_name: Optional[str] = None
created_at: Optional[str] = None
completed_at: Optional[str] = None
output_cid: Optional[str] = None
username: Optional[str] = None
provenance_cid: Optional[str] = None
celery_task_id: Optional[str] = None
error: Optional[str] = None
plan_id: Optional[str] = None
plan_name: Optional[str] = None
step_results: Optional[Dict[str, Any]] = None
all_outputs: Optional[List[str]] = None
effects_commit: Optional[str] = None
effect_url: Optional[str] = None
infrastructure: Optional[Dict[str, Any]] = None
def get_run_service():
"""Get run service instance."""
import database
return RunService(database, get_redis_client(), get_cache_manager())
@router.post("", response_model=RunStatus)
async def create_run(
request: RunRequest,
ctx: UserContext = Depends(require_auth),
run_service: RunService = Depends(get_run_service),
):
"""Start a new rendering run. Checks cache before executing."""
run, error = await run_service.create_run(
recipe=request.recipe,
inputs=request.inputs,
output_name=request.output_name,
use_dag=request.use_dag,
dag_json=request.dag_json,
actor_id=ctx.actor_id,
l2_server=ctx.l2_server,
)
if error:
raise HTTPException(400, error)
return run
@router.get("/{run_id}")
async def get_run(
request: Request,
run_id: str,
run_service: RunService = Depends(get_run_service),
):
"""Get status of a run."""
run = await run_service.get_run(run_id)
if not run:
raise HTTPException(404, f"Run {run_id} not found")
# Only render HTML if browser explicitly requests it
if wants_html(request):
# Extract username from actor_id (format: @user@server)
actor_id = run.get("actor_id", "")
if actor_id and actor_id.startswith("@"):
parts = actor_id[1:].split("@")
run["username"] = parts[0] if parts else "Unknown"
else:
run["username"] = actor_id or "Unknown"
# Helper to normalize input refs to just node IDs
def normalize_inputs(inputs):
"""Convert input refs (may be dicts or strings) to list of node IDs."""
result = []
for inp in inputs:
if isinstance(inp, dict):
node_id = inp.get("node") or inp.get("input") or inp.get("id")
else:
node_id = inp
if node_id:
result.append(node_id)
return result
# Try to load the recipe to show the plan
plan = None
plan_sexp = None # Native S-expression if available
recipe_ipfs_cid = None
recipe_id = run.get("recipe")
if recipe_id and len(recipe_id) == 64: # Looks like a hash
try:
from ..services.recipe_service import RecipeService
recipe_service = RecipeService(get_redis_client(), get_cache_manager())
recipe = await recipe_service.get_recipe(recipe_id)
if recipe:
# Use native S-expression if available (code is data!)
if recipe.get("sexp"):
plan_sexp = recipe["sexp"]
# Get IPFS CID for the recipe
recipe_ipfs_cid = recipe.get("ipfs_cid")
# Build steps for DAG visualization
dag = recipe.get("dag", {})
nodes = dag.get("nodes", [])
steps = []
if isinstance(nodes, list):
for node in nodes:
node_id = node.get("id", "")
steps.append({
"id": node_id,
"name": node_id,
"type": node.get("type", "EFFECT"),
"status": "completed", # Run completed
"inputs": normalize_inputs(node.get("inputs", [])),
"config": node.get("config", {}),
})
elif isinstance(nodes, dict):
for node_id, node in nodes.items():
steps.append({
"id": node_id,
"name": node_id,
"type": node.get("type", "EFFECT"),
"status": "completed",
"inputs": normalize_inputs(node.get("inputs", [])),
"config": node.get("config", {}),
})
if steps:
plan = {"steps": steps}
run["total_steps"] = len(steps)
run["executed"] = len(steps) if run.get("status") == "completed" else 0
# Use recipe name instead of hash for display (if not already set)
if recipe.get("name") and not run.get("recipe_name"):
run["recipe_name"] = recipe["name"]
except Exception as e:
logger.warning(f"Failed to load recipe for plan: {e}")
# Helper to convert simple type to MIME type prefix for template
def type_to_mime(simple_type: str) -> str:
if simple_type == "video":
return "video/mp4"
elif simple_type == "image":
return "image/jpeg"
elif simple_type == "audio":
return "audio/mpeg"
return None
# Build artifacts list from output and inputs
artifacts = []
output_media_type = None
if run.get("output_cid"):
# Detect media type using magic bytes
output_cid = run["output_cid"]
media_type = None
try:
from ..services.run_service import detect_media_type
cache_path = get_cache_manager().get_by_cid(output_cid)
if cache_path and cache_path.exists():
simple_type = detect_media_type(cache_path)
media_type = type_to_mime(simple_type)
output_media_type = media_type
except Exception:
pass
artifacts.append({
"cid": output_cid,
"step_name": "Output",
"media_type": media_type or "application/octet-stream",
})
# Build inputs list with media types
run_inputs = []
if run.get("inputs"):
from ..services.run_service import detect_media_type
cache_manager = get_cache_manager()
for i, input_hash in enumerate(run["inputs"]):
media_type = None
try:
cache_path = cache_manager.get_by_cid(input_hash)
if cache_path and cache_path.exists():
simple_type = detect_media_type(cache_path)
media_type = type_to_mime(simple_type)
except Exception:
pass
run_inputs.append({
"hash": input_hash,
"name": f"Input {i + 1}",
"media_type": media_type,
})
# Build DAG elements for visualization
dag_elements = []
if plan and plan.get("steps"):
node_colors = {
"input": "#3b82f6",
"effect": "#8b5cf6",
"analyze": "#ec4899",
"transform": "#10b981",
"output": "#f59e0b",
"SOURCE": "#3b82f6",
"EFFECT": "#8b5cf6",
"SEQUENCE": "#ec4899",
}
for i, step in enumerate(plan["steps"]):
step_id = step.get("id", f"step-{i}")
dag_elements.append({
"data": {
"id": step_id,
"label": step.get("name", f"Step {i+1}"),
"color": node_colors.get(step.get("type", "effect"), "#6b7280"),
}
})
for inp in step.get("inputs", []):
# Handle both string and dict inputs
if isinstance(inp, dict):
source = inp.get("node") or inp.get("input") or inp.get("id")
else:
source = inp
if source:
dag_elements.append({
"data": {
"source": source,
"target": step_id,
}
})
# Use native S-expression if available, otherwise generate from plan
if not plan_sexp and plan:
plan_sexp = plan_to_sexp(plan, run.get("recipe_name"))
templates = get_templates(request)
return render(templates, "runs/detail.html", request,
run=run,
plan=plan,
artifacts=artifacts,
run_inputs=run_inputs,
dag_elements=dag_elements,
output_media_type=output_media_type,
plan_sexp=plan_sexp,
recipe_ipfs_cid=recipe_ipfs_cid,
active_tab="runs",
)
# Default to JSON for API clients
return run
@router.delete("/{run_id}")
async def discard_run(
run_id: str,
ctx: UserContext = Depends(require_auth),
run_service: RunService = Depends(get_run_service),
):
"""Discard (delete) a run and its outputs."""
success, error = await run_service.discard_run(run_id, ctx.actor_id, ctx.username)
if error:
raise HTTPException(400 if "Cannot" in error else 404, error)
return {"discarded": True, "run_id": run_id}
@router.get("")
async def list_runs(
request: Request,
offset: int = 0,
limit: int = 20,
run_service: RunService = Depends(get_run_service),
):
"""List all runs for the current user."""
from ..services.auth_service import AuthService
auth_service = AuthService(get_redis_client())
ctx = auth_service.get_user_from_cookie(request)
if not ctx:
if wants_json(request):
raise HTTPException(401, "Authentication required")
from fastapi.responses import RedirectResponse
return RedirectResponse(url="/auth", status_code=302)
runs = await run_service.list_runs(ctx.actor_id, offset=offset, limit=limit)
has_more = len(runs) >= limit
if wants_json(request):
return {"runs": runs, "offset": offset, "limit": limit, "has_more": has_more}
# Add media info for inline previews (only for HTML)
cache_manager = get_cache_manager()
from ..services.run_service import detect_media_type
def type_to_mime(simple_type: str) -> str:
if simple_type == "video":
return "video/mp4"
elif simple_type == "image":
return "image/jpeg"
elif simple_type == "audio":
return "audio/mpeg"
return None
for run in runs:
# Add output media info
if run.get("output_cid"):
try:
cache_path = cache_manager.get_by_cid(run["output_cid"])
if cache_path and cache_path.exists():
simple_type = detect_media_type(cache_path)
run["output_media_type"] = type_to_mime(simple_type)
except Exception:
pass
# Add input media info (first 3 inputs)
input_previews = []
inputs = run.get("inputs", [])
if isinstance(inputs, list):
for input_hash in inputs[:3]:
preview = {"hash": input_hash, "media_type": None}
try:
cache_path = cache_manager.get_by_cid(input_hash)
if cache_path and cache_path.exists():
simple_type = detect_media_type(cache_path)
preview["media_type"] = type_to_mime(simple_type)
except Exception:
pass
input_previews.append(preview)
run["input_previews"] = input_previews
templates = get_templates(request)
return render(templates, "runs/list.html", request,
runs=runs,
user=ctx,
offset=offset,
limit=limit,
has_more=has_more,
active_tab="runs",
)
@router.get("/{run_id}/detail")
async def run_detail(
run_id: str,
request: Request,
run_service: RunService = Depends(get_run_service),
):
"""Run detail page with tabs for plan/analysis/artifacts."""
from ..services.auth_service import AuthService
auth_service = AuthService(get_redis_client())
ctx = auth_service.get_user_from_cookie(request)
if not ctx:
if wants_json(request):
raise HTTPException(401, "Authentication required")
from fastapi.responses import RedirectResponse
return RedirectResponse(url="/auth", status_code=302)
run = await run_service.get_run(run_id)
if not run:
raise HTTPException(404, f"Run {run_id} not found")
# Get plan, artifacts, and analysis
plan = await run_service.get_run_plan(run_id)
artifacts = await run_service.get_run_artifacts(run_id)
analysis = await run_service.get_run_analysis(run_id)
# Build DAG elements for visualization
dag_elements = []
if plan and plan.get("steps"):
node_colors = {
"input": "#3b82f6",
"effect": "#8b5cf6",
"analyze": "#ec4899",
"transform": "#10b981",
"output": "#f59e0b",
"SOURCE": "#3b82f6",
"EFFECT": "#8b5cf6",
"SEQUENCE": "#ec4899",
}
for i, step in enumerate(plan["steps"]):
step_id = step.get("id", f"step-{i}")
dag_elements.append({
"data": {
"id": step_id,
"label": step.get("name", f"Step {i+1}"),
"color": node_colors.get(step.get("type", "effect"), "#6b7280"),
}
})
# Add edges from inputs (handle both string and dict formats)
for inp in step.get("inputs", []):
if isinstance(inp, dict):
source = inp.get("node") or inp.get("input") or inp.get("id")
else:
source = inp
if source:
dag_elements.append({
"data": {
"source": source,
"target": step_id,
}
})
if wants_json(request):
return {
"run": run,
"plan": plan,
"artifacts": artifacts,
"analysis": analysis,
}
templates = get_templates(request)
return render(templates, "runs/detail.html", request,
run=run,
plan=plan,
artifacts=artifacts,
analysis=analysis,
dag_elements=dag_elements,
user=ctx,
active_tab="runs",
)
@router.get("/{run_id}/plan")
async def run_plan(
run_id: str,
request: Request,
run_service: RunService = Depends(get_run_service),
):
"""Plan visualization as interactive DAG."""
from ..services.auth_service import AuthService
auth_service = AuthService(get_redis_client())
ctx = auth_service.get_user_from_cookie(request)
if not ctx:
raise HTTPException(401, "Authentication required")
plan = await run_service.get_run_plan(run_id)
if not plan:
raise HTTPException(404, "Plan not found for this run")
if wants_json(request):
return plan
# Build DAG elements
dag_elements = []
node_colors = {
"input": "#3b82f6",
"effect": "#8b5cf6",
"analyze": "#ec4899",
"transform": "#10b981",
"output": "#f59e0b",
"SOURCE": "#3b82f6",
"EFFECT": "#8b5cf6",
"SEQUENCE": "#ec4899",
}
for i, step in enumerate(plan.get("steps", [])):
step_id = step.get("id", f"step-{i}")
dag_elements.append({
"data": {
"id": step_id,
"label": step.get("name", f"Step {i+1}"),
"color": node_colors.get(step.get("type", "effect"), "#6b7280"),
}
})
for inp in step.get("inputs", []):
# Handle both string and dict formats
if isinstance(inp, dict):
source = inp.get("node") or inp.get("input") or inp.get("id")
else:
source = inp
if source:
dag_elements.append({
"data": {"source": source, "target": step_id}
})
templates = get_templates(request)
return render(templates, "runs/plan.html", request,
run_id=run_id,
plan=plan,
dag_elements=dag_elements,
user=ctx,
active_tab="runs",
)
@router.get("/{run_id}/artifacts")
async def run_artifacts(
run_id: str,
request: Request,
run_service: RunService = Depends(get_run_service),
):
"""Get artifacts list for a run."""
from ..services.auth_service import AuthService
auth_service = AuthService(get_redis_client())
ctx = auth_service.get_user_from_cookie(request)
if not ctx:
raise HTTPException(401, "Authentication required")
artifacts = await run_service.get_run_artifacts(run_id)
if wants_json(request):
return {"artifacts": artifacts}
templates = get_templates(request)
return render(templates, "runs/artifacts.html", request,
run_id=run_id,
artifacts=artifacts,
user=ctx,
active_tab="runs",
)
@router.get("/{run_id}/plan/node/{cache_id}", response_class=HTMLResponse)
async def plan_node_detail(
run_id: str,
cache_id: str,
request: Request,
run_service: RunService = Depends(get_run_service),
):
"""HTMX partial: Get plan node detail by cache_id."""
from ..services.auth_service import AuthService
from artdag_common import render_fragment
auth_service = AuthService(get_redis_client())
ctx = auth_service.get_user_from_cookie(request)
if not ctx:
return HTMLResponse('<p class="text-red-400">Login required</p>', status_code=401)
run = await run_service.get_run(run_id)
if not run:
return HTMLResponse('<p class="text-red-400">Run not found</p>', status_code=404)
plan = await run_service.get_run_plan(run_id)
if not plan:
return HTMLResponse('<p class="text-gray-400">Plan not found</p>')
# Build lookups
steps_by_cache_id = {}
steps_by_step_id = {}
for s in plan.get("steps", []):
if s.get("cache_id"):
steps_by_cache_id[s["cache_id"]] = s
if s.get("step_id"):
steps_by_step_id[s["step_id"]] = s
step = steps_by_cache_id.get(cache_id)
if not step:
return HTMLResponse(f'<p class="text-gray-400">Step not found</p>')
cache_manager = get_cache_manager()
# Node colors
node_colors = {
"SOURCE": "#3b82f6", "EFFECT": "#22c55e", "OUTPUT": "#a855f7",
"ANALYSIS": "#f59e0b", "_LIST": "#6366f1", "default": "#6b7280"
}
node_color = node_colors.get(step.get("node_type", "EFFECT"), node_colors["default"])
# Check cache status
has_cached = cache_manager.has_content(cache_id) if cache_id else False
# Determine output media type
output_media_type = None
output_preview = False
if has_cached:
cache_path = cache_manager.get_content_path(cache_id)
if cache_path:
output_media_type = run_service.detect_media_type(cache_path)
output_preview = output_media_type in ('video', 'image', 'audio')
# Check for IPFS CID
ipfs_cid = None
if run.step_results:
res = run.step_results.get(step.get("step_id"))
if isinstance(res, dict) and res.get("cid"):
ipfs_cid = res["cid"]
# Build input previews
inputs = []
for inp_step_id in step.get("input_steps", []):
inp_step = steps_by_step_id.get(inp_step_id)
if inp_step:
inp_cache_id = inp_step.get("cache_id", "")
inp_has_cached = cache_manager.has_content(inp_cache_id) if inp_cache_id else False
inp_media_type = None
if inp_has_cached:
inp_path = cache_manager.get_content_path(inp_cache_id)
if inp_path:
inp_media_type = run_service.detect_media_type(inp_path)
inputs.append({
"name": inp_step.get("name", inp_step_id[:12]),
"cache_id": inp_cache_id,
"media_type": inp_media_type,
"has_cached": inp_has_cached,
})
status = "cached" if (has_cached or ipfs_cid) else ("completed" if run.status == "completed" else "pending")
templates = get_templates(request)
return HTMLResponse(render_fragment(templates, "runs/plan_node.html",
step=step,
cache_id=cache_id,
node_color=node_color,
status=status,
has_cached=has_cached,
output_preview=output_preview,
output_media_type=output_media_type,
ipfs_cid=ipfs_cid,
ipfs_gateway="https://ipfs.io/ipfs",
inputs=inputs,
config=step.get("config", {}),
))
@router.delete("/{run_id}/ui", response_class=HTMLResponse)
async def ui_discard_run(
run_id: str,
request: Request,
run_service: RunService = Depends(get_run_service),
):
"""HTMX handler: discard a run."""
from ..services.auth_service import AuthService
auth_service = AuthService(get_redis_client())
ctx = auth_service.get_user_from_cookie(request)
if not ctx:
return HTMLResponse(
'<div class="text-red-400">Login required</div>',
status_code=401
)
success, error = await run_service.discard_run(run_id, ctx.actor_id, ctx.username)
if error:
return HTMLResponse(f'<div class="text-red-400">{error}</div>')
return HTMLResponse(
'<div class="text-green-400">Run discarded</div>'
'<script>setTimeout(() => window.location.href = "/runs", 1500);</script>'
)
@router.post("/{run_id}/publish")
async def publish_run(
run_id: str,
request: Request,
ctx: UserContext = Depends(require_auth),
run_service: RunService = Depends(get_run_service),
):
"""Publish run output to L2 and IPFS."""
from ..services.cache_service import CacheService
from ..dependencies import get_cache_manager
import database
run = await run_service.get_run(run_id)
if not run:
raise HTTPException(404, "Run not found")
# Check if run has output
output_cid = run.get("output_cid")
if not output_cid:
error = "Run has no output to publish"
if wants_html(request):
return HTMLResponse(f'<span class="text-red-400">{error}</span>')
raise HTTPException(400, error)
# Use cache service to publish the output
cache_service = CacheService(database, get_cache_manager())
ipfs_cid, error = await cache_service.publish_to_l2(
cid=output_cid,
actor_id=ctx.actor_id,
l2_server=ctx.l2_server,
auth_token=request.cookies.get("auth_token"),
)
if error:
if wants_html(request):
return HTMLResponse(f'<span class="text-red-400">{error}</span>')
raise HTTPException(400, error)
if wants_html(request):
return HTMLResponse(f'<span class="text-green-400">Shared: {ipfs_cid[:16]}...</span>')
return {"ipfs_cid": ipfs_cid, "output_cid": output_cid, "published": True}