""" Run management routes for L1 server. Handles run creation, status, listing, and detail views. """ import asyncio import json import logging from datetime import datetime, timezone from typing import List, Optional, Dict, Any from fastapi import APIRouter, Request, Depends, HTTPException from fastapi.responses import HTMLResponse from pydantic import BaseModel from artdag_common import render from artdag_common.middleware import wants_html, wants_json from ..dependencies import ( require_auth, get_templates, get_current_user, get_redis_client, get_cache_manager ) from ..services.auth_service import UserContext from ..services.run_service import RunService router = APIRouter() logger = logging.getLogger(__name__) RUNS_KEY_PREFIX = "artdag:run:" class RunRequest(BaseModel): recipe: str inputs: List[str] output_name: Optional[str] = None use_dag: bool = True dag_json: Optional[str] = None class RunStatus(BaseModel): run_id: str status: str recipe: str inputs: List[str] output_name: Optional[str] = None created_at: Optional[str] = None completed_at: Optional[str] = None output_hash: Optional[str] = None username: Optional[str] = None provenance_cid: Optional[str] = None celery_task_id: Optional[str] = None error: Optional[str] = None plan_id: Optional[str] = None plan_name: Optional[str] = None step_results: Optional[Dict[str, Any]] = None all_outputs: Optional[List[str]] = None effects_commit: Optional[str] = None effect_url: Optional[str] = None infrastructure: Optional[Dict[str, Any]] = None def get_run_service(): """Get run service instance.""" import database return RunService(database, get_redis_client(), get_cache_manager()) @router.post("", response_model=RunStatus) async def create_run( request: RunRequest, ctx: UserContext = Depends(require_auth), run_service: RunService = Depends(get_run_service), ): """Start a new rendering run. Checks cache before executing.""" run, error = await run_service.create_run( recipe=request.recipe, inputs=request.inputs, output_name=request.output_name, use_dag=request.use_dag, dag_json=request.dag_json, actor_id=ctx.actor_id, l2_server=ctx.l2_server, ) if error: raise HTTPException(400, error) return run @router.get("/{run_id}", response_model=RunStatus) async def get_run( run_id: str, run_service: RunService = Depends(get_run_service), ): """Get status of a run.""" run = await run_service.get_run(run_id) if not run: raise HTTPException(404, f"Run {run_id} not found") return run @router.delete("/{run_id}") async def discard_run( run_id: str, ctx: UserContext = Depends(require_auth), run_service: RunService = Depends(get_run_service), ): """Discard (delete) a run and its outputs.""" success, error = await run_service.discard_run(run_id, ctx.actor_id, ctx.username) if error: raise HTTPException(400 if "Cannot" in error else 404, error) return {"discarded": True, "run_id": run_id} @router.get("") async def list_runs( request: Request, offset: int = 0, limit: int = 20, run_service: RunService = Depends(get_run_service), ): """List all runs for the current user.""" from ..services.auth_service import AuthService auth_service = AuthService(get_redis_client()) ctx = auth_service.get_user_from_cookie(request) if not ctx: if wants_json(request): raise HTTPException(401, "Authentication required") from fastapi.responses import RedirectResponse return RedirectResponse(url="/auth", status_code=302) runs = await run_service.list_runs(ctx.actor_id, offset=offset, limit=limit) has_more = len(runs) >= limit if wants_json(request): return {"runs": runs, "offset": offset, "limit": limit, "has_more": has_more} templates = get_templates(request) return render(templates, "runs/list.html", request, runs=runs, user=ctx, offset=offset, limit=limit, has_more=has_more, active_tab="runs", ) @router.get("/{run_id}/detail") async def run_detail( run_id: str, request: Request, run_service: RunService = Depends(get_run_service), ): """Run detail page with tabs for plan/analysis/artifacts.""" from ..services.auth_service import AuthService auth_service = AuthService(get_redis_client()) ctx = auth_service.get_user_from_cookie(request) if not ctx: if wants_json(request): raise HTTPException(401, "Authentication required") from fastapi.responses import RedirectResponse return RedirectResponse(url="/auth", status_code=302) run = await run_service.get_run(run_id) if not run: raise HTTPException(404, f"Run {run_id} not found") # Get plan, artifacts, and analysis plan = await run_service.get_run_plan(run_id) artifacts = await run_service.get_run_artifacts(run_id) analysis = await run_service.get_run_analysis(run_id) # Build DAG elements for visualization dag_elements = [] if plan and plan.get("steps"): node_colors = { "input": "#3b82f6", "effect": "#8b5cf6", "analyze": "#ec4899", "transform": "#10b981", "output": "#f59e0b", } for i, step in enumerate(plan["steps"]): dag_elements.append({ "data": { "id": step.get("id", f"step-{i}"), "label": step.get("name", f"Step {i+1}"), "color": node_colors.get(step.get("type", "effect"), "#6b7280"), } }) # Add edges from inputs for inp in step.get("inputs", []): dag_elements.append({ "data": { "source": inp, "target": step.get("id", f"step-{i}"), } }) if wants_json(request): return { "run": run, "plan": plan, "artifacts": artifacts, "analysis": analysis, } templates = get_templates(request) return render(templates, "runs/detail.html", request, run=run, plan=plan, artifacts=artifacts, analysis=analysis, dag_elements=dag_elements, user=ctx, active_tab="runs", ) @router.get("/{run_id}/plan") async def run_plan( run_id: str, request: Request, run_service: RunService = Depends(get_run_service), ): """Plan visualization as interactive DAG.""" from ..services.auth_service import AuthService auth_service = AuthService(get_redis_client()) ctx = auth_service.get_user_from_cookie(request) if not ctx: raise HTTPException(401, "Authentication required") plan = await run_service.get_run_plan(run_id) if not plan: raise HTTPException(404, "Plan not found for this run") if wants_json(request): return plan # Build DAG elements dag_elements = [] node_colors = { "input": "#3b82f6", "effect": "#8b5cf6", "analyze": "#ec4899", "transform": "#10b981", "output": "#f59e0b", } for i, step in enumerate(plan.get("steps", [])): step_id = step.get("id", f"step-{i}") dag_elements.append({ "data": { "id": step_id, "label": step.get("name", f"Step {i+1}"), "color": node_colors.get(step.get("type", "effect"), "#6b7280"), } }) for inp in step.get("inputs", []): dag_elements.append({ "data": {"source": inp, "target": step_id} }) templates = get_templates(request) return render(templates, "runs/plan.html", request, run_id=run_id, plan=plan, dag_elements=dag_elements, user=ctx, active_tab="runs", ) @router.get("/{run_id}/artifacts") async def run_artifacts( run_id: str, request: Request, run_service: RunService = Depends(get_run_service), ): """Get artifacts list for a run.""" from ..services.auth_service import AuthService auth_service = AuthService(get_redis_client()) ctx = auth_service.get_user_from_cookie(request) if not ctx: raise HTTPException(401, "Authentication required") artifacts = await run_service.get_run_artifacts(run_id) if wants_json(request): return {"artifacts": artifacts} templates = get_templates(request) return render(templates, "runs/artifacts.html", request, run_id=run_id, artifacts=artifacts, user=ctx, active_tab="runs", ) @router.delete("/{run_id}/ui", response_class=HTMLResponse) async def ui_discard_run( run_id: str, request: Request, run_service: RunService = Depends(get_run_service), ): """HTMX handler: discard a run.""" from ..services.auth_service import AuthService auth_service = AuthService(get_redis_client()) ctx = auth_service.get_user_from_cookie(request) if not ctx: return HTMLResponse( '