Files
celery/app/routers/recipes.py
gilesb 0ddeb5ba94 Resolve registry asset/effect references when running recipes
SOURCE nodes with config.asset now get content_hash from registry.
EFFECT nodes with config.effect now get effect_hash from registry.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-11 18:22:36 +00:00

455 lines
15 KiB
Python

"""
Recipe management routes for L1 server.
Handles recipe upload, listing, viewing, and execution.
"""
import logging
from typing import List, Optional
from fastapi import APIRouter, Request, Depends, HTTPException, UploadFile, File
from fastapi.responses import HTMLResponse
from pydantic import BaseModel
from artdag_common import render
from artdag_common.middleware import wants_html, wants_json
from artdag_common.middleware.auth import UserContext
from ..dependencies import require_auth, get_templates, get_redis_client, get_cache_manager
from ..services.auth_service import AuthService
from ..services.recipe_service import RecipeService
router = APIRouter()
logger = logging.getLogger(__name__)
class RecipeUploadRequest(BaseModel):
yaml_content: str
name: Optional[str] = None
description: Optional[str] = None
class RecipeRunRequest(BaseModel):
inputs: dict = {}
def get_recipe_service():
"""Get recipe service instance."""
return RecipeService(get_redis_client(), get_cache_manager())
@router.post("/upload")
async def upload_recipe(
file: UploadFile = File(...),
ctx: UserContext = Depends(require_auth),
recipe_service: RecipeService = Depends(get_recipe_service),
):
"""Upload a new recipe from YAML file."""
import yaml
# Read the YAML content from the uploaded file
yaml_content = (await file.read()).decode("utf-8")
# Parse YAML to extract recipe info for response
try:
recipe_data = yaml.safe_load(yaml_content)
except yaml.YAMLError as e:
raise HTTPException(400, f"Invalid YAML: {e}")
# Use filename (without extension) as recipe name if not in YAML
recipe_name = recipe_data.get("name")
if not recipe_name and file.filename:
recipe_name = file.filename.rsplit(".", 1)[0]
recipe_id, error = await recipe_service.upload_recipe(
yaml_content=yaml_content,
uploader=ctx.actor_id,
name=recipe_name,
description=recipe_data.get("description"),
)
if error:
raise HTTPException(400, error)
# Extract input info for response
inputs = recipe_data.get("inputs", {})
variable_inputs = []
fixed_inputs = []
for input_name, input_def in inputs.items():
if isinstance(input_def, dict) and input_def.get("fixed"):
fixed_inputs.append(input_name)
else:
variable_inputs.append(input_name)
return {
"recipe_id": recipe_id,
"name": recipe_name or "unnamed",
"version": recipe_data.get("version", "1.0"),
"variable_inputs": variable_inputs,
"fixed_inputs": fixed_inputs,
"message": "Recipe uploaded successfully",
}
@router.get("")
async def list_recipes(
request: Request,
offset: int = 0,
limit: int = 20,
recipe_service: RecipeService = Depends(get_recipe_service),
):
"""List available recipes."""
auth_service = AuthService(get_redis_client())
ctx = auth_service.get_user_from_cookie(request)
if not ctx:
if wants_json(request):
raise HTTPException(401, "Authentication required")
from fastapi.responses import RedirectResponse
return RedirectResponse(url="/auth", status_code=302)
recipes = await recipe_service.list_recipes(ctx.actor_id, offset=offset, limit=limit)
if wants_json(request):
return {"recipes": recipes, "offset": offset, "limit": limit}
templates = get_templates(request)
return render(templates, "recipes/list.html", request,
recipes=recipes,
user=ctx,
active_tab="recipes",
)
@router.get("/{recipe_id}")
async def get_recipe(
recipe_id: str,
request: Request,
recipe_service: RecipeService = Depends(get_recipe_service),
):
"""Get recipe details."""
auth_service = AuthService(get_redis_client())
ctx = auth_service.get_user_from_cookie(request)
if not ctx:
if wants_json(request):
raise HTTPException(401, "Authentication required")
from fastapi.responses import RedirectResponse
return RedirectResponse(url="/auth", status_code=302)
recipe = await recipe_service.get_recipe(recipe_id)
if not recipe:
raise HTTPException(404, "Recipe not found")
if wants_json(request):
return recipe
# Build DAG elements for visualization and convert nodes to steps format
dag_elements = []
steps = []
node_colors = {
"SOURCE": "#3b82f6",
"EFFECT": "#8b5cf6",
"SEQUENCE": "#ec4899",
"transform": "#10b981",
"output": "#f59e0b",
}
# Get nodes from dag - can be list or dict
dag = recipe.get("dag", {})
nodes = dag.get("nodes", [])
# Convert list of nodes to steps format
if isinstance(nodes, list):
for node in nodes:
node_id = node.get("id", "")
node_type = node.get("type", "EFFECT")
inputs = node.get("inputs", [])
config = node.get("config", {})
steps.append({
"id": node_id,
"name": node_id,
"type": node_type,
"inputs": inputs,
"params": config,
})
dag_elements.append({
"data": {
"id": node_id,
"label": node_id,
"color": node_colors.get(node_type, "#6b7280"),
}
})
for inp in inputs:
if isinstance(inp, str):
dag_elements.append({
"data": {"source": inp, "target": node_id}
})
elif isinstance(nodes, dict):
for node_id, node in nodes.items():
node_type = node.get("type", "EFFECT")
inputs = node.get("inputs", [])
config = node.get("config", {})
steps.append({
"id": node_id,
"name": node_id,
"type": node_type,
"inputs": inputs,
"params": config,
})
dag_elements.append({
"data": {
"id": node_id,
"label": node_id,
"color": node_colors.get(node_type, "#6b7280"),
}
})
for inp in inputs:
if isinstance(inp, str):
dag_elements.append({
"data": {"source": inp, "target": node_id}
})
# Add steps to recipe for template
recipe["steps"] = steps
# Add YAML source
import yaml
recipe["yaml"] = yaml.dump(recipe, default_flow_style=False)
templates = get_templates(request)
return render(templates, "recipes/detail.html", request,
recipe=recipe,
dag_elements=dag_elements,
user=ctx,
active_tab="recipes",
)
@router.delete("/{recipe_id}")
async def delete_recipe(
recipe_id: str,
ctx: UserContext = Depends(require_auth),
recipe_service: RecipeService = Depends(get_recipe_service),
):
"""Delete a recipe."""
success, error = await recipe_service.delete_recipe(recipe_id, ctx.actor_id)
if error:
raise HTTPException(400 if "Cannot" in error else 404, error)
return {"deleted": True, "recipe_id": recipe_id}
@router.post("/{recipe_id}/run")
async def run_recipe(
recipe_id: str,
req: RecipeRunRequest,
ctx: UserContext = Depends(require_auth),
recipe_service: RecipeService = Depends(get_recipe_service),
):
"""Run a recipe with given inputs."""
from ..services.run_service import RunService
from ..dependencies import get_cache_manager
import database
recipe = await recipe_service.get_recipe(recipe_id)
if not recipe:
raise HTTPException(404, "Recipe not found")
try:
import json
# Create run using run service
run_service = RunService(database, get_redis_client(), get_cache_manager())
# If recipe has a DAG definition, bind inputs and convert to JSON
recipe_dag = recipe.get("dag")
dag_json = None
if recipe_dag and isinstance(recipe_dag, dict):
# Bind inputs to the DAG's source nodes
dag_copy = json.loads(json.dumps(recipe_dag)) # Deep copy
nodes = dag_copy.get("nodes", {})
# Get registry for resolving asset/effect references
registry = recipe.get("registry", {})
assets = registry.get("assets", {})
effects = registry.get("effects", {})
# Convert nodes from list to dict if needed, and transform to artdag format
if isinstance(nodes, list):
nodes_dict = {}
for node in nodes:
node_id = node.get("id")
if node_id:
config = node.get("config", {})
# Resolve asset references for SOURCE nodes
if node.get("type") == "SOURCE" and "asset" in config:
asset_name = config["asset"]
if asset_name in assets:
config["content_hash"] = assets[asset_name].get("hash")
# Resolve effect references for EFFECT nodes
if node.get("type") == "EFFECT" and "effect" in config:
effect_name = config["effect"]
if effect_name in effects:
config["effect_hash"] = effects[effect_name].get("hash")
# Transform to artdag format: type->node_type, id->node_id
transformed = {
"node_id": node_id,
"node_type": node.get("type", "EFFECT"),
"config": config,
"inputs": node.get("inputs", []),
"name": node.get("name"),
}
nodes_dict[node_id] = transformed
nodes = nodes_dict
dag_copy["nodes"] = nodes
# Map user-provided input names to content hashes (for variable inputs)
for input_name, content_hash in req.inputs.items():
if input_name in nodes:
node = nodes[input_name]
if node.get("node_type") == "SOURCE":
if "config" not in node:
node["config"] = {}
node["config"]["content_hash"] = content_hash
# Transform output to output_id
if "output" in dag_copy:
dag_copy["output_id"] = dag_copy.pop("output")
# Add metadata if not present
if "metadata" not in dag_copy:
dag_copy["metadata"] = {}
dag_json = json.dumps(dag_copy)
run, error = await run_service.create_run(
recipe=recipe.get("name", recipe_id),
inputs=req.inputs,
use_dag=True,
dag_json=dag_json,
actor_id=ctx.actor_id,
l2_server=ctx.l2_server,
)
if error:
raise HTTPException(400, error)
if not run:
raise HTTPException(500, "Run creation returned no result")
return {
"run_id": run["run_id"] if isinstance(run, dict) else run.run_id,
"status": run.get("status", "pending") if isinstance(run, dict) else run.status,
"message": "Recipe execution started",
}
except HTTPException:
raise
except Exception as e:
logger.exception(f"Error running recipe {recipe_id}")
raise HTTPException(500, f"Run failed: {e}")
@router.get("/{recipe_id}/dag")
async def recipe_dag(
recipe_id: str,
request: Request,
recipe_service: RecipeService = Depends(get_recipe_service),
):
"""Get recipe DAG visualization data."""
recipe = await recipe_service.get_recipe(recipe_id)
if not recipe:
raise HTTPException(404, "Recipe not found")
dag_elements = []
node_colors = {
"input": "#3b82f6",
"effect": "#8b5cf6",
"analyze": "#ec4899",
"transform": "#10b981",
"output": "#f59e0b",
}
for i, step in enumerate(recipe.get("steps", [])):
step_id = step.get("id", f"step-{i}")
dag_elements.append({
"data": {
"id": step_id,
"label": step.get("name", f"Step {i+1}"),
"color": node_colors.get(step.get("type", "effect"), "#6b7280"),
}
})
for inp in step.get("inputs", []):
dag_elements.append({
"data": {"source": inp, "target": step_id}
})
return {"elements": dag_elements}
@router.delete("/{recipe_id}/ui", response_class=HTMLResponse)
async def ui_discard_recipe(
recipe_id: str,
request: Request,
recipe_service: RecipeService = Depends(get_recipe_service),
):
"""HTMX handler: discard a recipe."""
auth_service = AuthService(get_redis_client())
ctx = auth_service.get_user_from_cookie(request)
if not ctx:
return HTMLResponse('<div class="text-red-400">Login required</div>', status_code=401)
success, error = await recipe_service.delete_recipe(recipe_id, ctx.actor_id)
if error:
return HTMLResponse(f'<div class="text-red-400">{error}</div>')
return HTMLResponse(
'<div class="text-green-400">Recipe deleted</div>'
'<script>setTimeout(() => window.location.href = "/recipes", 1500);</script>'
)
@router.post("/{recipe_id}/publish")
async def publish_recipe(
recipe_id: str,
request: Request,
ctx: UserContext = Depends(require_auth),
recipe_service: RecipeService = Depends(get_recipe_service),
):
"""Publish recipe to L2 and IPFS."""
from ..services.cache_service import CacheService
from ..dependencies import get_cache_manager
import database
# Verify recipe exists
recipe = await recipe_service.get_recipe(recipe_id)
if not recipe:
raise HTTPException(404, "Recipe not found")
# Use cache service to publish (recipes are stored in cache)
cache_service = CacheService(database, get_cache_manager())
ipfs_cid, error = await cache_service.publish_to_l2(
content_hash=recipe_id,
actor_id=ctx.actor_id,
l2_server=ctx.l2_server,
auth_token=request.cookies.get("auth_token"),
)
if error:
if wants_html(request):
return HTMLResponse(f'<span class="text-red-400">{error}</span>')
raise HTTPException(400, error)
if wants_html(request):
return HTMLResponse(f'<span class="text-green-400">Shared: {ipfs_cid[:16]}...</span>')
return {"ipfs_cid": ipfs_cid, "published": True}