""" Run management routes for L1 server. Handles run creation, status, listing, and detail views. """ import asyncio import json import logging from datetime import datetime, timezone from typing import List, Optional, Dict, Any from fastapi import APIRouter, Request, Depends, HTTPException from fastapi.responses import HTMLResponse from pydantic import BaseModel from artdag_common import render from artdag_common.middleware import wants_html, wants_json from artdag_common.middleware.auth import UserContext from ..dependencies import ( require_auth, get_templates, get_current_user, get_redis_client, get_cache_manager ) from ..services.run_service import RunService router = APIRouter() logger = logging.getLogger(__name__) def plan_to_sexp(plan: dict, recipe_name: str = None) -> str: """Convert a plan to S-expression format for display.""" if not plan or not plan.get("steps"): return ";; No plan available" lines = [] lines.append(f'(plan "{recipe_name or "unknown"}"') # Group nodes by type for cleaner output steps = plan.get("steps", []) for step in steps: step_id = step.get("id", "?") step_type = step.get("type", "EFFECT") inputs = step.get("inputs", []) config = step.get("config", {}) # Build the step S-expression if step_type == "SOURCE": if config.get("input"): # Variable input input_name = config.get("name", config.get("input", "input")) lines.append(f' (source :input "{input_name}")') elif config.get("asset"): # Fixed asset lines.append(f' (source {config.get("asset", step_id)})') else: lines.append(f' (source {step_id})') elif step_type == "EFFECT": effect_name = config.get("effect", step_id) if inputs: inp_str = " ".join(inputs) lines.append(f' (-> {inp_str} (effect {effect_name}))') else: lines.append(f' (effect {effect_name})') elif step_type == "SEQUENCE": if inputs: inp_str = " ".join(inputs) lines.append(f' (sequence {inp_str})') else: lines.append(f' (sequence)') else: # Generic node if inputs: inp_str = " ".join(inputs) lines.append(f' ({step_type.lower()} {inp_str})') else: lines.append(f' ({step_type.lower()} {step_id})') lines.append(')') return "\n".join(lines) RUNS_KEY_PREFIX = "artdag:run:" class RunRequest(BaseModel): recipe: str inputs: List[str] output_name: Optional[str] = None use_dag: bool = True dag_json: Optional[str] = None class RunStatus(BaseModel): run_id: str status: str recipe: Optional[str] = None inputs: Optional[List[str]] = None output_name: Optional[str] = None created_at: Optional[str] = None completed_at: Optional[str] = None output_cid: Optional[str] = None username: Optional[str] = None provenance_cid: Optional[str] = None celery_task_id: Optional[str] = None error: Optional[str] = None plan_id: Optional[str] = None plan_name: Optional[str] = None step_results: Optional[Dict[str, Any]] = None all_outputs: Optional[List[str]] = None effects_commit: Optional[str] = None effect_url: Optional[str] = None infrastructure: Optional[Dict[str, Any]] = None def get_run_service(): """Get run service instance.""" import database return RunService(database, get_redis_client(), get_cache_manager()) @router.post("", response_model=RunStatus) async def create_run( request: RunRequest, ctx: UserContext = Depends(require_auth), run_service: RunService = Depends(get_run_service), ): """Start a new rendering run. Checks cache before executing.""" run, error = await run_service.create_run( recipe=request.recipe, inputs=request.inputs, output_name=request.output_name, use_dag=request.use_dag, dag_json=request.dag_json, actor_id=ctx.actor_id, l2_server=ctx.l2_server, ) if error: raise HTTPException(400, error) return run @router.get("/{run_id}") async def get_run( request: Request, run_id: str, run_service: RunService = Depends(get_run_service), ): """Get status of a run.""" run = await run_service.get_run(run_id) if not run: raise HTTPException(404, f"Run {run_id} not found") # Only render HTML if browser explicitly requests it if wants_html(request): # Extract username from actor_id (format: @user@server) actor_id = run.get("actor_id", "") if actor_id and actor_id.startswith("@"): parts = actor_id[1:].split("@") run["username"] = parts[0] if parts else "Unknown" else: run["username"] = actor_id or "Unknown" # Helper to normalize input refs to just node IDs def normalize_inputs(inputs): """Convert input refs (may be dicts or strings) to list of node IDs.""" result = [] for inp in inputs: if isinstance(inp, dict): node_id = inp.get("node") or inp.get("input") or inp.get("id") else: node_id = inp if node_id: result.append(node_id) return result # Try to load the recipe to show the plan plan = None plan_sexp = None # Native S-expression if available recipe_ipfs_cid = None recipe_id = run.get("recipe") # Check for valid recipe ID (64-char hash, IPFS CIDv0 "Qm...", or CIDv1 "bafy...") is_valid_recipe_id = recipe_id and ( len(recipe_id) == 64 or recipe_id.startswith("Qm") or recipe_id.startswith("bafy") ) if is_valid_recipe_id: try: from ..services.recipe_service import RecipeService recipe_service = RecipeService(get_redis_client(), get_cache_manager()) recipe = await recipe_service.get_recipe(recipe_id) if recipe: # Use native S-expression if available (code is data!) if recipe.get("sexp"): plan_sexp = recipe["sexp"] # Get IPFS CID for the recipe recipe_ipfs_cid = recipe.get("ipfs_cid") # Build steps for DAG visualization dag = recipe.get("dag", {}) nodes = dag.get("nodes", []) steps = [] if isinstance(nodes, list): for node in nodes: node_id = node.get("id", "") steps.append({ "id": node_id, "name": node_id, "type": node.get("type", "EFFECT"), "status": "completed", # Run completed "inputs": normalize_inputs(node.get("inputs", [])), "config": node.get("config", {}), }) elif isinstance(nodes, dict): for node_id, node in nodes.items(): steps.append({ "id": node_id, "name": node_id, "type": node.get("type", "EFFECT"), "status": "completed", "inputs": normalize_inputs(node.get("inputs", [])), "config": node.get("config", {}), }) if steps: plan = {"steps": steps} run["total_steps"] = len(steps) run["executed"] = len(steps) if run.get("status") == "completed" else 0 # Use recipe name instead of hash for display (if not already set) if recipe.get("name") and not run.get("recipe_name"): run["recipe_name"] = recipe["name"] except Exception as e: logger.warning(f"Failed to load recipe for plan: {e}") # Helper to convert simple type to MIME type prefix for template def type_to_mime(simple_type: str) -> str: if simple_type == "video": return "video/mp4" elif simple_type == "image": return "image/jpeg" elif simple_type == "audio": return "audio/mpeg" return None # Build artifacts list from output and inputs artifacts = [] output_media_type = None if run.get("output_cid"): # Detect media type using magic bytes output_cid = run["output_cid"] media_type = None try: from ..services.run_service import detect_media_type cache_path = get_cache_manager().get_by_cid(output_cid) if cache_path and cache_path.exists(): simple_type = detect_media_type(cache_path) media_type = type_to_mime(simple_type) output_media_type = media_type except Exception: pass artifacts.append({ "cid": output_cid, "step_name": "Output", "media_type": media_type or "application/octet-stream", }) # Build inputs list with media types run_inputs = [] if run.get("inputs"): from ..services.run_service import detect_media_type cache_manager = get_cache_manager() for i, input_hash in enumerate(run["inputs"]): media_type = None try: cache_path = cache_manager.get_by_cid(input_hash) if cache_path and cache_path.exists(): simple_type = detect_media_type(cache_path) media_type = type_to_mime(simple_type) except Exception: pass run_inputs.append({ "cid": input_hash, "name": f"Input {i + 1}", "media_type": media_type, }) # Build DAG elements for visualization dag_elements = [] if plan and plan.get("steps"): node_colors = { "input": "#3b82f6", "effect": "#8b5cf6", "analyze": "#ec4899", "transform": "#10b981", "output": "#f59e0b", "SOURCE": "#3b82f6", "EFFECT": "#8b5cf6", "SEQUENCE": "#ec4899", } for i, step in enumerate(plan["steps"]): step_id = step.get("id", f"step-{i}") dag_elements.append({ "data": { "id": step_id, "label": step.get("name", f"Step {i+1}"), "color": node_colors.get(step.get("type", "effect"), "#6b7280"), } }) for inp in step.get("inputs", []): # Handle both string and dict inputs if isinstance(inp, dict): source = inp.get("node") or inp.get("input") or inp.get("id") else: source = inp if source: dag_elements.append({ "data": { "source": source, "target": step_id, } }) # Use native S-expression if available, otherwise generate from plan if not plan_sexp and plan: plan_sexp = plan_to_sexp(plan, run.get("recipe_name")) from ..dependencies import get_nav_counts user = await get_current_user(request) nav_counts = await get_nav_counts(user.actor_id if user else None) templates = get_templates(request) return render(templates, "runs/detail.html", request, run=run, plan=plan, artifacts=artifacts, run_inputs=run_inputs, dag_elements=dag_elements, output_media_type=output_media_type, plan_sexp=plan_sexp, recipe_ipfs_cid=recipe_ipfs_cid, nav_counts=nav_counts, active_tab="runs", ) # Default to JSON for API clients return run @router.delete("/{run_id}") async def discard_run( run_id: str, ctx: UserContext = Depends(require_auth), run_service: RunService = Depends(get_run_service), ): """Discard (delete) a run and its outputs.""" success, error = await run_service.discard_run(run_id, ctx.actor_id, ctx.username) if error: raise HTTPException(400 if "Cannot" in error else 404, error) return {"discarded": True, "run_id": run_id} @router.get("") async def list_runs( request: Request, offset: int = 0, limit: int = 20, run_service: RunService = Depends(get_run_service), ctx: UserContext = Depends(require_auth), ): """List all runs for the current user.""" runs = await run_service.list_runs(ctx.actor_id, offset=offset, limit=limit) has_more = len(runs) >= limit if wants_json(request): return {"runs": runs, "offset": offset, "limit": limit, "has_more": has_more} # Add media info for inline previews (only for HTML) cache_manager = get_cache_manager() from ..services.run_service import detect_media_type def type_to_mime(simple_type: str) -> str: if simple_type == "video": return "video/mp4" elif simple_type == "image": return "image/jpeg" elif simple_type == "audio": return "audio/mpeg" return None for run in runs: # Add output media info if run.get("output_cid"): try: cache_path = cache_manager.get_by_cid(run["output_cid"]) if cache_path and cache_path.exists(): simple_type = detect_media_type(cache_path) run["output_media_type"] = type_to_mime(simple_type) except Exception: pass # Add input media info (first 3 inputs) input_previews = [] inputs = run.get("inputs", []) if isinstance(inputs, list): for input_hash in inputs[:3]: preview = {"cid": input_hash, "media_type": None} try: cache_path = cache_manager.get_by_cid(input_hash) if cache_path and cache_path.exists(): simple_type = detect_media_type(cache_path) preview["media_type"] = type_to_mime(simple_type) except Exception: pass input_previews.append(preview) run["input_previews"] = input_previews from ..dependencies import get_nav_counts nav_counts = await get_nav_counts(ctx.actor_id) templates = get_templates(request) return render(templates, "runs/list.html", request, runs=runs, user=ctx, nav_counts=nav_counts, offset=offset, limit=limit, has_more=has_more, active_tab="runs", ) @router.get("/{run_id}/detail") async def run_detail( run_id: str, request: Request, run_service: RunService = Depends(get_run_service), ctx: UserContext = Depends(require_auth), ): """Run detail page with tabs for plan/analysis/artifacts.""" run = await run_service.get_run(run_id) if not run: raise HTTPException(404, f"Run {run_id} not found") # Get plan, artifacts, and analysis plan = await run_service.get_run_plan(run_id) artifacts = await run_service.get_run_artifacts(run_id) analysis = await run_service.get_run_analysis(run_id) # Build DAG elements for visualization dag_elements = [] if plan and plan.get("steps"): node_colors = { "input": "#3b82f6", "effect": "#8b5cf6", "analyze": "#ec4899", "transform": "#10b981", "output": "#f59e0b", "SOURCE": "#3b82f6", "EFFECT": "#8b5cf6", "SEQUENCE": "#ec4899", } for i, step in enumerate(plan["steps"]): step_id = step.get("id", f"step-{i}") dag_elements.append({ "data": { "id": step_id, "label": step.get("name", f"Step {i+1}"), "color": node_colors.get(step.get("type", "effect"), "#6b7280"), } }) # Add edges from inputs (handle both string and dict formats) for inp in step.get("inputs", []): if isinstance(inp, dict): source = inp.get("node") or inp.get("input") or inp.get("id") else: source = inp if source: dag_elements.append({ "data": { "source": source, "target": step_id, } }) if wants_json(request): return { "run": run, "plan": plan, "artifacts": artifacts, "analysis": analysis, } templates = get_templates(request) return render(templates, "runs/detail.html", request, run=run, plan=plan, artifacts=artifacts, analysis=analysis, dag_elements=dag_elements, user=ctx, active_tab="runs", ) @router.get("/{run_id}/plan") async def run_plan( run_id: str, request: Request, run_service: RunService = Depends(get_run_service), ctx: UserContext = Depends(require_auth), ): """Plan visualization as interactive DAG.""" plan = await run_service.get_run_plan(run_id) if not plan: raise HTTPException(404, "Plan not found for this run") if wants_json(request): return plan # Build DAG elements dag_elements = [] node_colors = { "input": "#3b82f6", "effect": "#8b5cf6", "analyze": "#ec4899", "transform": "#10b981", "output": "#f59e0b", "SOURCE": "#3b82f6", "EFFECT": "#8b5cf6", "SEQUENCE": "#ec4899", } for i, step in enumerate(plan.get("steps", [])): step_id = step.get("id", f"step-{i}") dag_elements.append({ "data": { "id": step_id, "label": step.get("name", f"Step {i+1}"), "color": node_colors.get(step.get("type", "effect"), "#6b7280"), } }) for inp in step.get("inputs", []): # Handle both string and dict formats if isinstance(inp, dict): source = inp.get("node") or inp.get("input") or inp.get("id") else: source = inp if source: dag_elements.append({ "data": {"source": source, "target": step_id} }) templates = get_templates(request) return render(templates, "runs/plan.html", request, run_id=run_id, plan=plan, dag_elements=dag_elements, user=ctx, active_tab="runs", ) @router.get("/{run_id}/artifacts") async def run_artifacts( run_id: str, request: Request, run_service: RunService = Depends(get_run_service), ctx: UserContext = Depends(require_auth), ): """Get artifacts list for a run.""" artifacts = await run_service.get_run_artifacts(run_id) if wants_json(request): return {"artifacts": artifacts} templates = get_templates(request) return render(templates, "runs/artifacts.html", request, run_id=run_id, artifacts=artifacts, user=ctx, active_tab="runs", ) @router.get("/{run_id}/plan/node/{cache_id}", response_class=HTMLResponse) async def plan_node_detail( run_id: str, cache_id: str, request: Request, run_service: RunService = Depends(get_run_service), ): """HTMX partial: Get plan node detail by cache_id.""" from artdag_common import render_fragment ctx = await get_current_user(request) if not ctx: return HTMLResponse('

Login required

', status_code=401) run = await run_service.get_run(run_id) if not run: return HTMLResponse('

Run not found

', status_code=404) plan = await run_service.get_run_plan(run_id) if not plan: return HTMLResponse('

Plan not found

') # Build lookups steps_by_cache_id = {} steps_by_step_id = {} for s in plan.get("steps", []): if s.get("cache_id"): steps_by_cache_id[s["cache_id"]] = s if s.get("step_id"): steps_by_step_id[s["step_id"]] = s step = steps_by_cache_id.get(cache_id) if not step: return HTMLResponse(f'

Step not found

') cache_manager = get_cache_manager() # Node colors node_colors = { "SOURCE": "#3b82f6", "EFFECT": "#22c55e", "OUTPUT": "#a855f7", "ANALYSIS": "#f59e0b", "_LIST": "#6366f1", "default": "#6b7280" } node_color = node_colors.get(step.get("node_type", "EFFECT"), node_colors["default"]) # Check cache status has_cached = cache_manager.has_content(cache_id) if cache_id else False # Determine output media type output_media_type = None output_preview = False if has_cached: cache_path = cache_manager.get_content_path(cache_id) if cache_path: output_media_type = run_service.detect_media_type(cache_path) output_preview = output_media_type in ('video', 'image', 'audio') # Check for IPFS CID ipfs_cid = None if run.step_results: res = run.step_results.get(step.get("step_id")) if isinstance(res, dict) and res.get("cid"): ipfs_cid = res["cid"] # Build input previews inputs = [] for inp_step_id in step.get("input_steps", []): inp_step = steps_by_step_id.get(inp_step_id) if inp_step: inp_cache_id = inp_step.get("cache_id", "") inp_has_cached = cache_manager.has_content(inp_cache_id) if inp_cache_id else False inp_media_type = None if inp_has_cached: inp_path = cache_manager.get_content_path(inp_cache_id) if inp_path: inp_media_type = run_service.detect_media_type(inp_path) inputs.append({ "name": inp_step.get("name", inp_step_id[:12]), "cache_id": inp_cache_id, "media_type": inp_media_type, "has_cached": inp_has_cached, }) status = "cached" if (has_cached or ipfs_cid) else ("completed" if run.status == "completed" else "pending") templates = get_templates(request) return HTMLResponse(render_fragment(templates, "runs/plan_node.html", step=step, cache_id=cache_id, node_color=node_color, status=status, has_cached=has_cached, output_preview=output_preview, output_media_type=output_media_type, ipfs_cid=ipfs_cid, ipfs_gateway="https://ipfs.io/ipfs", inputs=inputs, config=step.get("config", {}), )) @router.delete("/{run_id}/ui", response_class=HTMLResponse) async def ui_discard_run( run_id: str, request: Request, run_service: RunService = Depends(get_run_service), ): """HTMX handler: discard a run.""" ctx = await get_current_user(request) if not ctx: return HTMLResponse( '
Login required
', status_code=401 ) success, error = await run_service.discard_run(run_id, ctx.actor_id, ctx.username) if error: return HTMLResponse(f'
{error}
') return HTMLResponse( '
Run discarded
' '' ) @router.post("/{run_id}/publish") async def publish_run( run_id: str, request: Request, ctx: UserContext = Depends(require_auth), run_service: RunService = Depends(get_run_service), ): """Publish run output to L2 and IPFS.""" from ..services.cache_service import CacheService from ..dependencies import get_cache_manager import database run = await run_service.get_run(run_id) if not run: raise HTTPException(404, "Run not found") # Check if run has output output_cid = run.get("output_cid") if not output_cid: error = "Run has no output to publish" if wants_html(request): return HTMLResponse(f'{error}') raise HTTPException(400, error) # Use cache service to publish the output cache_service = CacheService(database, get_cache_manager()) ipfs_cid, error = await cache_service.publish_to_l2( cid=output_cid, actor_id=ctx.actor_id, l2_server=ctx.l2_server, auth_token=request.cookies.get("auth_token"), ) if error: if wants_html(request): return HTMLResponse(f'{error}') raise HTTPException(400, error) if wants_html(request): return HTMLResponse(f'Shared: {ipfs_cid[:16]}...') return {"ipfs_cid": ipfs_cid, "output_cid": output_cid, "published": True}