1169 lines
40 KiB
Python
1169 lines
40 KiB
Python
"""
Run management routes for L1 server.

Handles run creation, status, listing, and detail views.
"""
|
|
|
|
import asyncio
|
|
import json
|
|
import logging
|
|
from datetime import datetime, timezone
|
|
from typing import List, Optional, Dict, Any
|
|
|
|
from fastapi import APIRouter, Request, Depends, HTTPException
|
|
from fastapi.responses import HTMLResponse
|
|
from pydantic import BaseModel
|
|
|
|
from artdag_common import render
|
|
from artdag_common.middleware import wants_html, wants_json
|
|
from artdag_common.middleware.auth import UserContext
|
|
|
|
from ..dependencies import (
|
|
require_auth, get_templates, get_current_user,
|
|
get_redis_client, get_cache_manager
|
|
)
|
|
from ..services.run_service import RunService
|
|
|
|
# Router on which every run-management endpoint in this module is registered.
router = APIRouter()
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def plan_to_sexp(plan: dict, recipe_name: Optional[str] = None) -> str:
    """Convert a plan to S-expression format for display.

    Args:
        plan: Mapping with a "steps" list. Each step is a dict that may carry
            "id", "type", "inputs" and "config" keys. Input refs may be plain
            node IDs or dicts keyed by "node", "input" or "id".
        recipe_name: Name written into the ``(plan "...")`` header; "unknown"
            is used when it is missing or empty.

    Returns:
        The multi-line S-expression text, or the placeholder comment
        ";; No plan available" when there is no plan or it has no steps.
    """
    if not plan or not plan.get("steps"):
        return ";; No plan available"

    def input_ids(raw_inputs) -> List[str]:
        # Accept the same ref shapes the DAG builders in this module accept,
        # so that dict-style refs do not break the string join below.
        ids = []
        for ref in raw_inputs or []:
            if isinstance(ref, dict):
                ref = ref.get("node") or ref.get("input") or ref.get("id")
            if ref:
                ids.append(str(ref))
        return ids

    lines = [f'(plan "{recipe_name or "unknown"}"']

    for step in plan["steps"]:
        step_id = step.get("id", "?")
        # A missing or null type/config falls back to the defaults.
        step_type = step.get("type") or "EFFECT"
        config = step.get("config") or {}
        joined = " ".join(input_ids(step.get("inputs")))

        if step_type == "SOURCE":
            if config.get("input"):
                # Variable input
                input_name = config.get("name", config.get("input", "input"))
                lines.append(f' (source :input "{input_name}")')
            elif config.get("asset"):
                # Fixed asset
                lines.append(f' (source {config.get("asset", step_id)})')
            else:
                lines.append(f' (source {step_id})')
        elif step_type == "EFFECT":
            effect_name = config.get("effect", step_id)
            if joined:
                lines.append(f' (-> {joined} (effect {effect_name}))')
            else:
                lines.append(f' (effect {effect_name})')
        elif step_type == "SEQUENCE":
            lines.append(f' (sequence {joined})' if joined else ' (sequence)')
        else:
            # Generic node: show its inputs, or its own id when it has none.
            lines.append(f' ({str(step_type).lower()} {joined or step_id})')

    lines.append(')')
    return "\n".join(lines)
|
|
|
|
# Key prefix for run entries. Not referenced in the visible part of this module;
# presumably a Redis key namespace -- TODO confirm against RunService.
RUNS_KEY_PREFIX = "artdag:run:"
|
|
|
|
|
|
class RunRequest(BaseModel):
    """Request body for creating a run.

    The fields are forwarded as-is to ``RunService.create_run`` by the
    run-creation endpoint.
    """

    # Required fields
    recipe: str
    inputs: List[str]

    # Optional fields
    output_name: Optional[str] = None
    use_dag: bool = True
    dag_json: Optional[str] = None
|
|
|
|
|
|
class RunStatus(BaseModel):
    """Status record of a run, used as the response model of the create endpoints.

    Only ``run_id`` and ``status`` are required; every other field defaults
    to ``None``.
    """

    # Identity and state
    run_id: str
    status: str

    # What was requested
    recipe: Optional[str] = None
    inputs: Optional[List[str]] = None
    output_name: Optional[str] = None

    # Timestamps (stored as strings)
    created_at: Optional[str] = None
    completed_at: Optional[str] = None

    # Result and ownership
    output_cid: Optional[str] = None
    username: Optional[str] = None
    provenance_cid: Optional[str] = None
    celery_task_id: Optional[str] = None
    error: Optional[str] = None

    # Plan and execution details
    plan_id: Optional[str] = None
    plan_name: Optional[str] = None
    step_results: Optional[Dict[str, Any]] = None
    all_outputs: Optional[List[str]] = None
    effects_commit: Optional[str] = None
    effect_url: Optional[str] = None
    infrastructure: Optional[Dict[str, Any]] = None
|
|
|
|
|
|
class StreamRequest(BaseModel):
    """Request to run a streaming recipe."""

    # The recipe S-expression content
    recipe_sexp: str
    output_name: str = "output.mp4"
    # Duration in seconds
    duration: Optional[float] = None
    # FPS override
    fps: Optional[float] = None
    # Sources config S-expression
    sources_sexp: Optional[str] = None
    # Audio config S-expression
    audio_sexp: Optional[str] = None
|
|
|
|
|
|
def get_run_service():
    """Build a RunService from the database module, Redis client and cache manager."""
    import database  # imported at call time, as elsewhere in this module

    redis_client = get_redis_client()
    cache_manager = get_cache_manager()
    return RunService(database, redis_client, cache_manager)
|
|
|
|
|
|
@router.post("", response_model=RunStatus)
async def create_run(
    request: RunRequest,
    ctx: UserContext = Depends(require_auth),
    run_service: RunService = Depends(get_run_service),
):
    """Start a new rendering run.

    Delegates to ``RunService.create_run`` with the request fields plus the
    caller's actor id and L2 server.

    Raises:
        HTTPException: 400 carrying the service's error when it reports one.
    """
    creation_args = dict(
        recipe=request.recipe,
        inputs=request.inputs,
        output_name=request.output_name,
        use_dag=request.use_dag,
        dag_json=request.dag_json,
        actor_id=ctx.actor_id,
        l2_server=ctx.l2_server,
    )
    created, failure = await run_service.create_run(**creation_args)

    if not failure:
        return created
    raise HTTPException(400, failure)
|
|
|
|
|
|
@router.post("/stream", response_model=RunStatus)
async def create_stream_run(
    request: StreamRequest,
    req: Request,
    ctx: UserContext = Depends(get_current_user),
):
    """Start a streaming video render.

    The recipe_sexp should be a complete streaming recipe with
    (stream ...) form defining the pipeline.

    Assets can be referenced by CID or friendly name in the recipe.
    Requires authentication OR admin token in X-Admin-Token header.

    Returns:
        A ``RunStatus`` with status "pending" and the Celery task id.

    Raises:
        HTTPException: 401 when there is neither a user context nor a
            matching admin token.
    """
    import hmac
    import os
    import re
    import tempfile
    import uuid
    from pathlib import Path

    import database
    from tasks.streaming import run_stream

    # Check for admin token if no user auth
    admin_token = os.environ.get("ADMIN_TOKEN")
    request_token = req.headers.get("X-Admin-Token")
    admin_actor_id = req.headers.get("X-Actor-Id", "admin@local")

    # Constant-time comparison so the token cannot be probed via timing.
    # Compared as bytes because compare_digest rejects non-ASCII str values.
    token_ok = bool(
        admin_token
        and request_token is not None
        and hmac.compare_digest(
            request_token.encode("utf-8"), admin_token.encode("utf-8")
        )
    )
    if not ctx and not token_ok:
        raise HTTPException(401, "Authentication required")

    # Use context actor_id or admin actor_id
    actor_id = ctx.actor_id if ctx else admin_actor_id

    run_id = str(uuid.uuid4())

    # Store recipe in cache so it appears on /recipes page
    recipe_id = None
    tmp_path = None
    handed_over = False  # True once cache_manager.put() has accepted the file
    try:
        cache_manager = get_cache_manager()
        with tempfile.NamedTemporaryFile(
            delete=False, suffix=".sexp", mode="w", encoding="utf-8"
        ) as tmp:
            tmp_path = Path(tmp.name)
            tmp.write(request.recipe_sexp)

        cached, _ipfs_cid = cache_manager.put(tmp_path, node_type="recipe", move=True)
        handed_over = True
        recipe_id = cached.cid

        # Extract recipe name from S-expression (look for (stream "name" ...) pattern)
        name_match = re.search(r'\(stream\s+"([^"]+)"', request.recipe_sexp)
        recipe_name = name_match.group(1) if name_match else f"stream-{run_id[:8]}"

        # Track ownership in item_types
        await database.save_item_metadata(
            cid=recipe_id,
            actor_id=actor_id,
            item_type="recipe",
            description=f"Streaming recipe: {recipe_name}",
            filename=f"{recipe_name}.sexp",
        )

        # Assign friendly name
        from ..services.naming_service import get_naming_service
        naming = get_naming_service()
        await naming.assign_name(
            cid=recipe_id,
            actor_id=actor_id,
            item_type="recipe",
            display_name=recipe_name,
        )

        logger.info(f"Stored streaming recipe {recipe_id[:16]}... as '{recipe_name}'")
    except Exception as e:
        # Continue anyway - run will still work, just won't appear in /recipes
        logger.warning(f"Failed to store recipe in cache: {e}")
    finally:
        # If the cache never took the file, remove the temp file rather than
        # leaking it (delete=False disables automatic cleanup).
        if tmp_path is not None and not handed_over:
            try:
                tmp_path.unlink()
            except OSError:
                pass

    # Submit Celery task to GPU queue for hardware-accelerated rendering
    # NOTE(review): the task is queued before the pending-run row exists; if
    # create_pending_run fails the task has no database record.
    task = run_stream.apply_async(
        kwargs=dict(
            run_id=run_id,
            recipe_sexp=request.recipe_sexp,
            output_name=request.output_name,
            duration=request.duration,
            fps=request.fps,
            actor_id=actor_id,
            sources_sexp=request.sources_sexp,
            audio_sexp=request.audio_sexp,
        ),
        queue='gpu',
    )

    # Store in database for durability
    pending = await database.create_pending_run(
        run_id=run_id,
        celery_task_id=task.id,
        recipe=recipe_id or "streaming",  # Use recipe CID if available
        inputs=[],  # Streaming recipes don't have traditional inputs
        actor_id=actor_id,
        dag_json=request.recipe_sexp,  # Store recipe content for viewing
        output_name=request.output_name,
    )

    logger.info(f"Started stream run {run_id} with task {task.id}")

    return RunStatus(
        run_id=run_id,
        status="pending",
        recipe=recipe_id or "streaming",
        # Guard against a missing row so a queued run is still reported.
        created_at=pending.get("created_at") if pending else None,
        celery_task_id=task.id,
    )
|
|
|
|
|
|
@router.get("/{run_id}")
async def get_run(
    request: Request,
    run_id: str,
    run_service: RunService = Depends(get_run_service),
):
    """Get status of a run.

    Returns the run record itself (JSON) unless the client explicitly asks
    for HTML, in which case ``runs/detail.html`` is rendered with the plan,
    artifacts, inputs and DAG elements derived from the run and its recipe.

    Raises:
        HTTPException: 404 when the run service has no run with this id.
    """
    run = await run_service.get_run(run_id)
    if not run:
        raise HTTPException(404, f"Run {run_id} not found")

    # Default to JSON for API clients; only render HTML on explicit request.
    if not wants_html(request):
        return run

    # Extract username from actor_id (format: @user@server)
    actor_id = run.get("actor_id", "")
    if actor_id and actor_id.startswith("@"):
        # An empty user part (e.g. "@") falls back to "Unknown" as well.
        run["username"] = actor_id[1:].split("@")[0] or "Unknown"
    else:
        run["username"] = actor_id or "Unknown"

    def normalize_inputs(inputs):
        """Convert input refs (may be dicts or strings) to list of node IDs."""
        result = []
        for inp in inputs or []:
            if isinstance(inp, dict):
                node_id = inp.get("node") or inp.get("input") or inp.get("id")
            else:
                node_id = inp
            if node_id:
                result.append(node_id)
        return result

    def type_to_mime(simple_type: Optional[str]) -> Optional[str]:
        """Map a simple media type to a MIME type for the template, else None."""
        if simple_type == "video":
            return "video/mp4"
        if simple_type == "image":
            return "image/jpeg"
        if simple_type == "audio":
            return "audio/mpeg"
        return None

    def cached_mime(cid, cache_manager=None) -> Optional[str]:
        """Best-effort MIME type of a locally cached CID; None when unknown."""
        try:
            from ..services.run_service import detect_media_type
            manager = cache_manager or get_cache_manager()
            cache_path = manager.get_by_cid(cid)
            if cache_path and cache_path.exists():
                return type_to_mime(detect_media_type(cache_path))
        except Exception as exc:
            # Previews are optional: record the failure, never fail the page.
            logger.debug("Media type detection failed for %s: %s", cid, exc)
        return None

    run_completed = run.get("status") == "completed"

    # Try to load the recipe to show the plan
    plan = None
    plan_sexp = None  # Native S-expression if available
    recipe_ipfs_cid = None
    recipe_id = run.get("recipe")
    # Check for valid recipe ID (64-char hash, IPFS CIDv0 "Qm...", or CIDv1 "bafy...")
    is_valid_recipe_id = isinstance(recipe_id, str) and (
        len(recipe_id) == 64 or recipe_id.startswith(("Qm", "bafy"))
    )
    if is_valid_recipe_id:
        try:
            from ..services.recipe_service import RecipeService
            recipe_service = RecipeService(get_redis_client(), get_cache_manager())
            recipe = await recipe_service.get_recipe(recipe_id)
            if recipe:
                # Use native S-expression if available (code is data!)
                if recipe.get("sexp"):
                    plan_sexp = recipe["sexp"]
                recipe_ipfs_cid = recipe.get("ipfs_cid")

                # Build steps for DAG visualization. Nodes may be a list of
                # dicts carrying their own "id", or a dict keyed by node id.
                nodes = (recipe.get("dag") or {}).get("nodes", [])
                if isinstance(nodes, list):
                    id_node_pairs = [(node.get("id", ""), node) for node in nodes]
                elif isinstance(nodes, dict):
                    id_node_pairs = list(nodes.items())
                else:
                    id_node_pairs = []

                steps = [
                    {
                        "id": node_id,
                        "name": node_id,
                        "type": node.get("type", "EFFECT"),
                        # Mirror the run's state, in line with "executed"
                        # below and with the streaming branch.
                        "status": "completed" if run_completed else "pending",
                        "inputs": normalize_inputs(node.get("inputs", [])),
                        "config": node.get("config", {}),
                    }
                    for node_id, node in id_node_pairs
                ]

                if steps:
                    plan = {"steps": steps}
                    run["total_steps"] = len(steps)
                    run["executed"] = len(steps) if run_completed else 0

                # Use recipe name instead of hash for display (if not already set)
                if recipe.get("name") and not run.get("recipe_name"):
                    run["recipe_name"] = recipe["name"]
        except Exception as e:
            logger.warning(f"Failed to load recipe for plan: {e}")

    # Handle streaming runs - detect by recipe_sexp content or legacy "streaming" marker
    recipe_sexp_content = run.get("recipe_sexp")
    is_streaming = run.get("recipe") == "streaming"  # Legacy marker
    if not is_streaming and recipe_sexp_content:
        # Check if content starts with (stream after skipping comments
        for line in recipe_sexp_content.split('\n'):
            stripped = line.strip()
            if not stripped or stripped.startswith(';'):
                continue
            is_streaming = stripped.startswith('(stream')
            break
    if is_streaming and recipe_sexp_content and not plan:
        plan_sexp = recipe_sexp_content
        plan = {
            "steps": [{
                "id": "stream",
                "type": "STREAM",
                "name": "Streaming Recipe",
                "inputs": [],
                "config": {},
                "status": "completed" if run_completed else "pending",
            }]
        }
        run["total_steps"] = 1
        run["executed"] = 1 if run_completed else 0

    # Build artifacts list from output and inputs
    artifacts = []
    output_media_type = None
    if run.get("output_cid"):
        # Detect media type using magic bytes, fall back to database item_type
        output_cid = run["output_cid"]
        media_type = cached_mime(output_cid)
        if not media_type:
            try:
                import database
                item_types = await database.get_item_types(output_cid, run.get("actor_id"))
                if item_types:
                    media_type = type_to_mime(item_types[0].get("type"))
            except Exception as exc:
                logger.debug("Item type lookup failed for %s: %s", output_cid, exc)
        output_media_type = media_type
        artifacts.append({
            "cid": output_cid,
            "step_name": "Output",
            "media_type": media_type or "application/octet-stream",
        })

    # Build inputs list with media types
    run_inputs = []
    if run.get("inputs"):
        cache_manager = get_cache_manager()
        for i, input_hash in enumerate(run["inputs"]):
            run_inputs.append({
                "cid": input_hash,
                "name": f"Input {i + 1}",
                "media_type": cached_mime(input_hash, cache_manager),
            })

    # Build DAG elements for visualization
    dag_elements = []
    if plan and plan.get("steps"):
        node_colors = {
            "input": "#3b82f6",
            "effect": "#8b5cf6",
            "analyze": "#ec4899",
            "transform": "#10b981",
            "output": "#f59e0b",
            "SOURCE": "#3b82f6",
            "EFFECT": "#8b5cf6",
            "SEQUENCE": "#ec4899",
        }
        for i, step in enumerate(plan["steps"]):
            step_id = step.get("id", f"step-{i}")
            dag_elements.append({
                "data": {
                    "id": step_id,
                    "label": step.get("name", f"Step {i+1}"),
                    "color": node_colors.get(step.get("type", "effect"), "#6b7280"),
                }
            })
            # One edge per input; handles both string and dict inputs.
            for source in normalize_inputs(step.get("inputs", [])):
                dag_elements.append({
                    "data": {
                        "source": source,
                        "target": step_id,
                    }
                })

    # Use native S-expression if available, otherwise generate from plan
    if not plan_sexp and plan:
        plan_sexp = plan_to_sexp(plan, run.get("recipe_name"))

    from ..dependencies import get_nav_counts
    user = await get_current_user(request)
    nav_counts = await get_nav_counts(user.actor_id if user else None)

    templates = get_templates(request)
    return render(templates, "runs/detail.html", request,
        run=run,
        plan=plan,
        artifacts=artifacts,
        run_inputs=run_inputs,
        dag_elements=dag_elements,
        output_media_type=output_media_type,
        plan_sexp=plan_sexp,
        recipe_ipfs_cid=recipe_ipfs_cid,
        nav_counts=nav_counts,
        active_tab="runs",
    )
|
|
|
|
|
|
@router.delete("/{run_id}")
async def discard_run(
    run_id: str,
    ctx: UserContext = Depends(require_auth),
    run_service: RunService = Depends(get_run_service),
):
    """Discard (delete) a run and its outputs.

    Raises:
        HTTPException: 400 when the service's error text contains "Cannot",
            404 for any other reported error.
    """
    _success, failure = await run_service.discard_run(
        run_id, ctx.actor_id, ctx.username
    )
    if not failure:
        return {"discarded": True, "run_id": run_id}

    status_code = 400 if "Cannot" in failure else 404
    raise HTTPException(status_code, failure)
|
|
|
|
|
|
@router.get("")
async def list_runs(
    request: Request,
    offset: int = 0,
    limit: int = 20,
    run_service: RunService = Depends(get_run_service),
    ctx: UserContext = Depends(get_current_user),
):
    """List all runs for the current user.

    Accepts either a user context or a matching X-Admin-Token header; with
    the admin token the actor is taken from the X-Actor-Id header. Returns
    JSON when the client wants JSON, otherwise renders ``runs/list.html``
    with media-type hints for inline previews.

    Raises:
        HTTPException: 401 without valid authentication; 400 when the admin
            token is used without an X-Actor-Id header.
    """
    import hmac
    import os

    # Check for admin token if no user auth
    admin_token = os.environ.get("ADMIN_TOKEN")
    request_token = request.headers.get("X-Admin-Token")
    admin_actor_id = request.headers.get("X-Actor-Id")

    # Constant-time comparison so the token cannot be probed via timing.
    # Compared as bytes because compare_digest rejects non-ASCII str values.
    token_ok = bool(
        admin_token
        and request_token is not None
        and hmac.compare_digest(
            request_token.encode("utf-8"), admin_token.encode("utf-8")
        )
    )
    if not ctx and not token_ok:
        raise HTTPException(401, "Authentication required")

    # Use context actor_id or admin actor_id
    actor_id = ctx.actor_id if ctx else admin_actor_id
    if not actor_id:
        raise HTTPException(400, "X-Actor-Id header required with admin token")

    runs = await run_service.list_runs(actor_id, offset=offset, limit=limit)
    # A full page is taken as a sign that more runs may follow.
    has_more = len(runs) >= limit

    if wants_json(request):
        return {"runs": runs, "offset": offset, "limit": limit, "has_more": has_more}

    # Add media info for inline previews (only for HTML)
    cache_manager = get_cache_manager()
    from ..services.run_service import detect_media_type

    def type_to_mime(simple_type: Optional[str]) -> Optional[str]:
        """Map a simple media type to a MIME type for the template, else None."""
        if simple_type == "video":
            return "video/mp4"
        if simple_type == "image":
            return "image/jpeg"
        if simple_type == "audio":
            return "audio/mpeg"
        return None

    for run in runs:
        # Add output media info
        if run.get("output_cid"):
            try:
                cache_path = cache_manager.get_by_cid(run["output_cid"])
                if cache_path and cache_path.exists():
                    simple_type = detect_media_type(cache_path)
                    run["output_media_type"] = type_to_mime(simple_type)
            except Exception as exc:
                # Previews are optional: record the failure, keep listing.
                logger.debug(
                    "Media type detection failed for %s: %s", run["output_cid"], exc
                )

        # Add input media info (first 3 inputs)
        input_previews = []
        inputs = run.get("inputs", [])
        if isinstance(inputs, list):
            for input_hash in inputs[:3]:
                preview = {"cid": input_hash, "media_type": None}
                try:
                    cache_path = cache_manager.get_by_cid(input_hash)
                    if cache_path and cache_path.exists():
                        simple_type = detect_media_type(cache_path)
                        preview["media_type"] = type_to_mime(simple_type)
                except Exception as exc:
                    logger.debug(
                        "Media type detection failed for %s: %s", input_hash, exc
                    )
                input_previews.append(preview)
        run["input_previews"] = input_previews

    from ..dependencies import get_nav_counts
    nav_counts = await get_nav_counts(actor_id)

    templates = get_templates(request)
    return render(templates, "runs/list.html", request,
        runs=runs,
        user=ctx or {"actor_id": actor_id},
        nav_counts=nav_counts,
        offset=offset,
        limit=limit,
        has_more=has_more,
        active_tab="runs",
    )
|
|
|
|
|
|
@router.get("/{run_id}/detail")
async def run_detail(
    run_id: str,
    request: Request,
    run_service: RunService = Depends(get_run_service),
    ctx: UserContext = Depends(require_auth),
):
    """Run detail page with tabs for plan/analysis/artifacts.

    Returns run, plan, artifacts and analysis as JSON when the client wants
    JSON; otherwise renders ``runs/detail.html`` with DAG elements added.

    Raises:
        HTTPException: 404 when the run service has no run with this id.
    """
    run = await run_service.get_run(run_id)
    if not run:
        raise HTTPException(404, f"Run {run_id} not found")

    # Get plan, artifacts, and analysis
    plan = await run_service.get_run_plan(run_id)
    artifacts = await run_service.get_run_artifacts(run_id)
    analysis = await run_service.get_run_analysis(run_id)

    # The JSON payload does not include DAG elements, so answer before
    # building them.
    if wants_json(request):
        return {
            "run": run,
            "plan": plan,
            "artifacts": artifacts,
            "analysis": analysis,
        }

    # Build DAG elements for visualization
    dag_elements = []
    if plan and plan.get("steps"):
        node_colors = {
            "input": "#3b82f6",
            "effect": "#8b5cf6",
            "analyze": "#ec4899",
            "transform": "#10b981",
            "output": "#f59e0b",
            "SOURCE": "#3b82f6",
            "EFFECT": "#8b5cf6",
            "SEQUENCE": "#ec4899",
        }
        for i, step in enumerate(plan["steps"]):
            step_id = step.get("id", f"step-{i}")
            dag_elements.append({
                "data": {
                    "id": step_id,
                    "label": step.get("name", f"Step {i+1}"),
                    "color": node_colors.get(step.get("type", "effect"), "#6b7280"),
                }
            })
            # Add edges from inputs (handle both string and dict formats)
            for inp in step.get("inputs", []):
                if isinstance(inp, dict):
                    source = inp.get("node") or inp.get("input") or inp.get("id")
                else:
                    source = inp
                if source:
                    dag_elements.append({
                        "data": {
                            "source": source,
                            "target": step_id,
                        }
                    })

    # Extract plan_sexp for streaming runs
    plan_sexp = plan.get("sexp") if plan else None

    templates = get_templates(request)
    return render(templates, "runs/detail.html", request,
        run=run,
        plan=plan,
        plan_sexp=plan_sexp,
        artifacts=artifacts,
        analysis=analysis,
        dag_elements=dag_elements,
        user=ctx,
        active_tab="runs",
    )
|
|
|
|
|
|
@router.get("/{run_id}/plan")
async def run_plan(
    run_id: str,
    request: Request,
    run_service: RunService = Depends(get_run_service),
    ctx: UserContext = Depends(require_auth),
):
    """Plan visualization as interactive DAG.

    Returns the raw plan when the client wants JSON; otherwise renders
    ``runs/plan.html`` with one node element per step and one edge element
    per step input.

    Raises:
        HTTPException: 404 when the run has no plan.
    """
    plan = await run_service.get_run_plan(run_id)
    if not plan:
        raise HTTPException(404, "Plan not found for this run")
    if wants_json(request):
        return plan

    palette = {
        "input": "#3b82f6",
        "effect": "#8b5cf6",
        "analyze": "#ec4899",
        "transform": "#10b981",
        "output": "#f59e0b",
        "SOURCE": "#3b82f6",
        "EFFECT": "#8b5cf6",
        "SEQUENCE": "#ec4899",
    }
    fallback_color = "#6b7280"

    def iter_elements():
        # Each step yields its node first, then one edge per usable input.
        for index, step in enumerate(plan.get("steps", [])):
            target = step.get("id", f"step-{index}")
            yield {
                "data": {
                    "id": target,
                    "label": step.get("name", f"Step {index+1}"),
                    "color": palette.get(step.get("type", "effect"), fallback_color),
                }
            }
            for ref in step.get("inputs", []):
                # Input refs come either as plain ids or as dicts.
                if isinstance(ref, dict):
                    origin = ref.get("node") or ref.get("input") or ref.get("id")
                else:
                    origin = ref
                if origin:
                    yield {"data": {"source": origin, "target": target}}

    dag_elements = list(iter_elements())

    templates = get_templates(request)
    return render(
        templates,
        "runs/plan.html",
        request,
        run_id=run_id,
        plan=plan,
        dag_elements=dag_elements,
        user=ctx,
        active_tab="runs",
    )
|
|
|
|
|
|
@router.get("/{run_id}/artifacts")
async def run_artifacts(
    run_id: str,
    request: Request,
    run_service: RunService = Depends(get_run_service),
    ctx: UserContext = Depends(require_auth),
):
    """Get artifacts list for a run.

    Responds with ``{"artifacts": [...]}`` for JSON clients and renders
    ``runs/artifacts.html`` otherwise.
    """
    artifacts = await run_service.get_run_artifacts(run_id)

    if not wants_json(request):
        templates = get_templates(request)
        return render(
            templates,
            "runs/artifacts.html",
            request,
            run_id=run_id,
            artifacts=artifacts,
            user=ctx,
            active_tab="runs",
        )

    return {"artifacts": artifacts}
|
|
|
|
|
|
@router.get("/{run_id}/plan/node/{cache_id}", response_class=HTMLResponse)
async def plan_node_detail(
    run_id: str,
    cache_id: str,
    request: Request,
    run_service: RunService = Depends(get_run_service),
):
    """HTMX partial: Get plan node detail by cache_id.

    Always answers with an HTML fragment: a short message when the user is
    not logged in (401), the run is missing (404), or the plan/step cannot
    be found; otherwise the rendered ``runs/plan_node.html`` fragment.
    """
    from artdag_common import render_fragment

    ctx = await get_current_user(request)
    if not ctx:
        return HTMLResponse('<p class="text-red-400">Login required</p>', status_code=401)

    run = await run_service.get_run(run_id)
    if not run:
        return HTMLResponse('<p class="text-red-400">Run not found</p>', status_code=404)

    def run_field(name):
        # Every other handler in this module treats the run as a dict, so
        # read it that way; attribute access is kept for object-style runs.
        if isinstance(run, dict):
            return run.get(name)
        return getattr(run, name, None)

    plan = await run_service.get_run_plan(run_id)
    if not plan:
        return HTMLResponse('<p class="text-gray-400">Plan not found</p>')

    # Build lookups
    steps_by_cache_id = {}
    steps_by_step_id = {}
    for s in plan.get("steps", []):
        if s.get("cache_id"):
            steps_by_cache_id[s["cache_id"]] = s
        if s.get("step_id"):
            steps_by_step_id[s["step_id"]] = s

    step = steps_by_cache_id.get(cache_id)
    if not step:
        return HTMLResponse('<p class="text-gray-400">Step not found</p>')

    cache_manager = get_cache_manager()

    # Node colors
    node_colors = {
        "SOURCE": "#3b82f6", "EFFECT": "#22c55e", "OUTPUT": "#a855f7",
        "ANALYSIS": "#f59e0b", "_LIST": "#6366f1", "default": "#6b7280"
    }
    node_color = node_colors.get(step.get("node_type", "EFFECT"), node_colors["default"])

    # Check cache status
    has_cached = cache_manager.has_content(cache_id) if cache_id else False

    # Determine output media type
    output_media_type = None
    output_preview = False
    if has_cached:
        cache_path = cache_manager.get_content_path(cache_id)
        if cache_path:
            output_media_type = run_service.detect_media_type(cache_path)
            output_preview = output_media_type in ('video', 'image', 'audio')

    # Check for IPFS CID
    ipfs_cid = None
    step_results = run_field("step_results")
    if step_results:
        res = step_results.get(step.get("step_id"))
        if isinstance(res, dict) and res.get("cid"):
            ipfs_cid = res["cid"]

    # Build input previews
    inputs = []
    for inp_step_id in step.get("input_steps", []):
        inp_step = steps_by_step_id.get(inp_step_id)
        if inp_step:
            inp_cache_id = inp_step.get("cache_id", "")
            inp_has_cached = cache_manager.has_content(inp_cache_id) if inp_cache_id else False
            inp_media_type = None
            if inp_has_cached:
                inp_path = cache_manager.get_content_path(inp_cache_id)
                if inp_path:
                    inp_media_type = run_service.detect_media_type(inp_path)

            inputs.append({
                "name": inp_step.get("name", inp_step_id[:12]),
                "cache_id": inp_cache_id,
                "media_type": inp_media_type,
                "has_cached": inp_has_cached,
            })

    # "cached" wins when content exists locally or on IPFS; otherwise the
    # node mirrors the run's own state.
    if has_cached or ipfs_cid:
        status = "cached"
    elif run_field("status") == "completed":
        status = "completed"
    else:
        status = "pending"

    templates = get_templates(request)
    return HTMLResponse(render_fragment(templates, "runs/plan_node.html",
        step=step,
        cache_id=cache_id,
        node_color=node_color,
        status=status,
        has_cached=has_cached,
        output_preview=output_preview,
        output_media_type=output_media_type,
        ipfs_cid=ipfs_cid,
        ipfs_gateway="https://ipfs.io/ipfs",
        inputs=inputs,
        config=step.get("config", {}),
    ))
|
|
|
|
|
|
@router.delete("/{run_id}/ui", response_class=HTMLResponse)
async def ui_discard_run(
    run_id: str,
    request: Request,
    run_service: RunService = Depends(get_run_service),
):
    """HTMX handler: discard a run.

    Always answers with an HTML fragment: a login prompt (401) when there is
    no user, the service's error message when discarding fails, or a
    confirmation that redirects to /runs after 1.5 seconds.
    """
    from html import escape

    ctx = await get_current_user(request)
    if not ctx:
        return HTMLResponse(
            '<div class="text-red-400">Login required</div>',
            status_code=401
        )

    success, error = await run_service.discard_run(run_id, ctx.actor_id, ctx.username)

    if error:
        # The message comes from the service and may echo request data such
        # as the run id, so escape it before embedding it in markup.
        return HTMLResponse(f'<div class="text-red-400">{escape(str(error))}</div>')

    return HTMLResponse(
        '<div class="text-green-400">Run discarded</div>'
        '<script>setTimeout(() => window.location.href = "/runs", 1500);</script>'
    )
|
|
|
|
|
|
@router.post("/{run_id}/publish")
async def publish_run(
    run_id: str,
    request: Request,
    ctx: UserContext = Depends(require_auth),
    run_service: RunService = Depends(get_run_service),
):
    """Publish run output to L2 and IPFS.

    HTML clients get a small ``<span>`` fragment describing the outcome;
    other clients get JSON on success.

    Raises:
        HTTPException: 404 when the run does not exist; 400 (non-HTML
            clients only) when the run has no output or publishing fails.
    """
    from html import escape

    from ..services.cache_service import CacheService
    import database

    run = await run_service.get_run(run_id)
    if not run:
        raise HTTPException(404, "Run not found")

    def fail(message):
        # HTML clients get an inline fragment; API clients get a 400.
        # Escaped because service messages may echo request-derived values.
        if wants_html(request):
            return HTMLResponse(
                f'<span class="text-red-400">{escape(str(message))}</span>'
            )
        raise HTTPException(400, message)

    # Check if run has output
    output_cid = run.get("output_cid")
    if not output_cid:
        return fail("Run has no output to publish")

    # Use cache service to publish the output
    cache_service = CacheService(database, get_cache_manager())
    ipfs_cid, error = await cache_service.publish_to_l2(
        cid=output_cid,
        actor_id=ctx.actor_id,
        l2_server=ctx.l2_server,
        auth_token=request.cookies.get("auth_token"),
    )

    if error:
        return fail(error)

    if wants_html(request):
        return HTMLResponse(
            f'<span class="text-green-400">Shared: {escape(ipfs_cid[:16])}...</span>'
        )

    return {"ipfs_cid": ipfs_cid, "output_cid": output_cid, "published": True}
|
|
|
|
|
|
@router.delete("/admin/purge-failed")
async def purge_failed_runs(
    request: Request,
    ctx: UserContext = Depends(get_current_user),
):
    """Delete all failed runs from pending_runs table.

    Requires authentication OR admin token in X-Admin-Token header.

    Returns:
        ``{"purged": <count>, "run_ids": [...]}`` for the runs that were
        deleted; runs whose deletion raised are logged and left out.

    Raises:
        HTTPException: 401 when there is neither a user context nor a
            matching admin token.
    """
    import hmac
    import os

    import database

    # Check for admin token
    admin_token = os.environ.get("ADMIN_TOKEN")
    request_token = request.headers.get("X-Admin-Token")

    # Constant-time comparison so the token cannot be probed via timing.
    # Compared as bytes because compare_digest rejects non-ASCII str values.
    token_ok = bool(
        admin_token
        and request_token is not None
        and hmac.compare_digest(
            request_token.encode("utf-8"), admin_token.encode("utf-8")
        )
    )

    # Require either valid auth or admin token
    # NOTE(review): any authenticated user passes this check and the purge is
    # not filtered by actor -- confirm that this is intended.
    if not ctx and not token_ok:
        raise HTTPException(401, "Authentication required")

    # Get all failed runs
    failed_runs = await database.list_pending_runs(status="failed")

    deleted = []
    for run in failed_runs:
        run_id = run.get("run_id")
        try:
            await database.delete_pending_run(run_id)
            deleted.append(run_id)
        except Exception as e:
            # Best effort: one failing delete must not stop the purge.
            logger.warning(f"Failed to delete run {run_id}: {e}")

    logger.info(f"Purged {len(deleted)} failed runs")
    return {"purged": len(deleted), "run_ids": deleted}
|
|
|
|
|
|
@router.get("/{run_id}/stream")
async def stream_run_output(
    run_id: str,
    request: Request,
):
    """Stream the video output of a running render.

    For IPFS HLS streams, redirects to the IPFS gateway playlist.
    For local HLS streams, redirects to the m3u8 playlist.
    For legacy MP4 streams, returns the file directly.

    Raises:
        HTTPException: 400 when run_id is not a plain path component;
            404 when no stream output exists yet or the MP4 is still empty.
    """
    from fastapi.responses import FileResponse, RedirectResponse
    from pathlib import Path
    import os
    import database
    from celery_app import app as celery_app

    # run_id becomes a directory name below; reject anything that is not a
    # single plain component so it cannot point outside the streaming dir.
    if run_id in (".", "..") or Path(run_id).name != run_id:
        raise HTTPException(400, "Invalid run ID")

    await database.init_db()

    # Check for IPFS HLS streaming first (distributed P2P streaming)
    pending = await database.get_pending_run(run_id)
    if pending and pending.get("celery_task_id"):
        task_id = pending["celery_task_id"]
        # NOTE(review): AsyncResult.ready() is a synchronous call made from
        # an async handler -- confirm this is acceptable for the backend.
        result = celery_app.AsyncResult(task_id)
        if result.ready() and isinstance(result.result, dict):
            ipfs_playlist_url = result.result.get("ipfs_playlist_url")
            if ipfs_playlist_url:
                logger.info(f"Redirecting to IPFS stream: {ipfs_playlist_url}")
                return RedirectResponse(url=ipfs_playlist_url, status_code=302)

    cache_dir = os.environ.get("CACHE_DIR", "/data/cache")
    stream_dir = Path(cache_dir) / "streaming" / run_id

    # Check for local HLS output
    hls_playlist = stream_dir / "stream.m3u8"
    if hls_playlist.exists():
        # Redirect to the HLS playlist endpoint
        return RedirectResponse(
            url=f"/runs/{run_id}/hls/stream.m3u8",
            status_code=302
        )

    # Fall back to legacy MP4 streaming
    stream_path = stream_dir / "output.mp4"
    if not stream_path.exists():
        raise HTTPException(404, "Stream not available yet")

    file_size = stream_path.stat().st_size
    if file_size == 0:
        raise HTTPException(404, "Stream not ready")

    return FileResponse(
        path=str(stream_path),
        media_type="video/mp4",
        headers={
            "Accept-Ranges": "bytes",
            "Cache-Control": "no-cache, no-store, must-revalidate",
            "X-Content-Size": str(file_size),
        }
    )
|
|
|
|
|
|
@router.get("/{run_id}/hls/{filename:path}")
async def serve_hls_content(
    run_id: str,
    filename: str,
    request: Request,
):
    """Serve HLS playlist and segments for live streaming.

    Serves stream.m3u8 (playlist) and segment_*.ts files.
    The playlist updates as new segments are rendered.

    If files aren't found locally, proxies to the GPU worker (if configured
    through the ``GPU_WORKER_STREAM_URL`` environment variable).

    Args:
        run_id: Run identifier; used as the directory name under
            ``$CACHE_DIR/streaming``.
        filename: Path of the requested file relative to the run's stream
            directory. Must end in ``.m3u8`` or ``.ts``.
        request: Incoming request (not used by this handler).

    Returns:
        A ``FileResponse`` for a file found locally, otherwise a
        ``StreamingResponse`` carrying the body fetched from the GPU worker.

    Raises:
        HTTPException: 403 if ``run_id`` or ``filename`` would resolve
            outside the run's stream directory, 400 for an unsupported file
            extension, 404 if the file is available neither locally nor
            from the GPU worker.
    """
    from fastapi.responses import FileResponse, StreamingResponse
    from pathlib import Path
    import os
    import httpx

    # run_id becomes a directory name; refuse values that would point the
    # stream directory somewhere other than a child of ".../streaming".
    if run_id in ("", ".", "..") or "/" in run_id or "\\" in run_id:
        raise HTTPException(403, "Invalid path")

    cache_dir = os.environ.get("CACHE_DIR", "/data/cache")
    stream_dir = Path(cache_dir) / "streaming" / run_id
    file_path = stream_dir / filename

    # Security: ensure we're only serving files within stream_dir.
    # The check runs whether or not stream_dir exists locally, because the
    # same name is forwarded to the GPU worker below. relative_to() compares
    # path components, so "run1" is not treated as a prefix of "run10", and
    # an absolute `filename` (which replaces stream_dir when joined) is
    # rejected as well.
    try:
        stream_dir_resolved = stream_dir.resolve()
        relative_name = file_path.resolve().relative_to(stream_dir_resolved)
    except (ValueError, OSError, RuntimeError):
        # ValueError: outside stream_dir (or embedded NUL byte);
        # OSError/RuntimeError: the path could not be resolved (e.g. symlink loop).
        # Raised outside any broad handler so the 403 actually reaches the client.
        raise HTTPException(403, "Invalid path") from None

    # Determine content type
    if filename.endswith(".m3u8"):
        media_type = "application/vnd.apple.mpegurl"
        headers = {
            # The playlist is rewritten as segments are rendered: never cache it
            "Cache-Control": "no-cache, no-store, must-revalidate",
            "Access-Control-Allow-Origin": "*",
        }
    elif filename.endswith(".ts"):
        media_type = "video/mp2t"
        headers = {
            "Cache-Control": "public, max-age=3600",
            "Access-Control-Allow-Origin": "*",
        }
    else:
        raise HTTPException(400, "Invalid file type")

    # Try local file first (is_file() so a directory named "*.ts" is not
    # handed to FileResponse)
    if file_path.is_file():
        return FileResponse(
            path=str(file_path),
            media_type=media_type,
            headers=headers,
        )

    # Fallback: proxy to GPU worker if configured
    gpu_worker_url = os.environ.get("GPU_WORKER_STREAM_URL")
    if gpu_worker_url:
        # Forward the normalised relative name so no ".." segments reach the worker
        proxy_url = f"{gpu_worker_url}/{run_id}/{relative_name.as_posix()}"
        try:
            async with httpx.AsyncClient(timeout=30.0) as client:
                resp = await client.get(proxy_url)
                if resp.status_code == 200:
                    return StreamingResponse(
                        content=iter([resp.content]),
                        media_type=media_type,
                        headers=headers,
                    )
        except Exception as e:
            # Best effort: a failing worker degrades to the 404 below
            logger.warning(f"GPU worker proxy failed: {e}")

    raise HTTPException(404, f"File not found: {filename}")
|
|
|
|
|
|
@router.get("/{run_id}/ipfs-stream")
async def get_ipfs_stream_info(run_id: str, request: Request):
    """Report where a run's output can be streamed from IPFS.

    Looks the run up among pending runs first. For a pending run the Celery
    task result is inspected for an IPFS playlist; if none is available yet,
    a status payload saying so is returned. When the run is not pending, the
    run cache is consulted for a stored ``ipfs_cid``.

    Args:
        run_id: Identifier of the run to look up.
        request: Incoming request (not used by this handler).

    Returns:
        A dict that always contains ``run_id`` and ``status``, plus either
        the playlist details (``ipfs_playlist_cid``, ``ipfs_playlist_url``,
        ``segment_count``), an ``ipfs_video_url`` built from the gateway in
        ``IPFS_GATEWAY_URL``, or a ``message`` that the info is not ready.

    Raises:
        HTTPException: 404 when the run is unknown, when a pending run has
            no Celery task id, or when a cached run has no ``ipfs_cid``.
    """
    from celery_app import app as celery_app
    import database
    import os

    await database.init_db()

    pending_run = await database.get_pending_run(run_id)

    if pending_run:
        # Pending run: the Celery task result is the source of truth
        celery_task_id = pending_run.get("celery_task_id")
        if not celery_task_id:
            raise HTTPException(404, "No task ID for this run")

        async_result = celery_app.AsyncResult(celery_task_id)

        # Only read the result once the task has finished
        payload = async_result.result if async_result.ready() else None
        playlist_url = (
            payload.get("ipfs_playlist_url") if isinstance(payload, dict) else None
        )

        if playlist_url:
            return {
                "run_id": run_id,
                "status": "completed",
                "ipfs_playlist_cid": payload.get("ipfs_playlist_cid"),
                "ipfs_playlist_url": playlist_url,
                "segment_count": payload.get("ipfs_segment_count", 0),
            }

        # Task is still running, or it finished without IPFS playlist info
        return {
            "run_id": run_id,
            "status": pending_run.get("status", "pending"),
            "message": "IPFS streaming info not yet available"
        }

    # Not pending: fall back to the cache of completed runs
    cached_run = await database.get_run_cache(run_id)
    if not cached_run:
        raise HTTPException(404, "Run not found")

    stored_cid = cached_run.get("ipfs_cid")
    if not stored_cid:
        raise HTTPException(404, "No IPFS stream info available")

    gateway_base = os.environ.get("IPFS_GATEWAY_URL", "https://ipfs.io/ipfs")
    return {
        "run_id": run_id,
        "status": "completed",
        "ipfs_video_url": f"{gateway_base}/{stored_cid}",
    }
|