Fix run detail: add username, total_steps, recipe_name
- Extract username from actor_id format (@user@server) - Set total_steps and executed from recipe nodes - Use recipe name for display instead of hash Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
@@ -1,15 +1,15 @@
|
||||
"""
|
||||
Recipe Service - business logic for recipe management.
|
||||
|
||||
Recipes are content-addressed YAML files stored in the cache (and IPFS).
|
||||
The recipe ID is the content hash of the YAML file.
|
||||
Recipes are content-addressed S-expression files stored in the cache (and IPFS).
|
||||
The recipe ID is the content hash of the S-expression file.
|
||||
"""
|
||||
|
||||
import tempfile
|
||||
from pathlib import Path
|
||||
from typing import Optional, List, Dict, Any, Tuple
|
||||
|
||||
import yaml
|
||||
from artdag.sexp import compile_string, parse, serialize, CompileError, ParseError
|
||||
|
||||
|
||||
class RecipeService:
|
||||
@@ -32,32 +32,28 @@ class RecipeService:
|
||||
return None
|
||||
|
||||
with open(path) as f:
|
||||
recipe_data = yaml.safe_load(f)
|
||||
sexp_content = f.read()
|
||||
|
||||
# Compile S-expression recipe
|
||||
try:
|
||||
compiled = compile_string(sexp_content)
|
||||
recipe_data = compiled.to_dict()
|
||||
except (ParseError, CompileError) as e:
|
||||
# Return basic error info
|
||||
return {"error": str(e), "recipe_id": recipe_id}
|
||||
|
||||
# Add the recipe_id to the data for convenience
|
||||
if isinstance(recipe_data, dict):
|
||||
recipe_data["recipe_id"] = recipe_id
|
||||
# Get IPFS CID if available
|
||||
ipfs_cid = self.cache.get_ipfs_cid(recipe_id)
|
||||
if ipfs_cid:
|
||||
recipe_data["ipfs_cid"] = ipfs_cid
|
||||
recipe_data["recipe_id"] = recipe_id
|
||||
recipe_data["sexp"] = sexp_content # Keep original S-expression
|
||||
|
||||
# Compute step_count from nodes
|
||||
nodes = recipe_data.get("nodes", [])
|
||||
if not nodes:
|
||||
dag = recipe_data.get("dag", {})
|
||||
nodes = dag.get("nodes", []) if isinstance(dag, dict) else []
|
||||
if not nodes:
|
||||
nodes = recipe_data.get("pipeline", [])
|
||||
if not nodes:
|
||||
nodes = recipe_data.get("steps", [])
|
||||
# Get IPFS CID if available
|
||||
ipfs_cid = self.cache.get_ipfs_cid(recipe_id)
|
||||
if ipfs_cid:
|
||||
recipe_data["ipfs_cid"] = ipfs_cid
|
||||
|
||||
if isinstance(nodes, list):
|
||||
recipe_data["step_count"] = len(nodes)
|
||||
elif isinstance(nodes, dict):
|
||||
recipe_data["step_count"] = len(nodes)
|
||||
else:
|
||||
recipe_data["step_count"] = 0
|
||||
# Compute step_count from nodes
|
||||
nodes = recipe_data.get("dag", {}).get("nodes", [])
|
||||
recipe_data["step_count"] = len(nodes) if isinstance(nodes, list) else 0
|
||||
|
||||
return recipe_data
|
||||
|
||||
@@ -97,40 +93,33 @@ class RecipeService:
|
||||
|
||||
async def upload_recipe(
|
||||
self,
|
||||
yaml_content: str,
|
||||
sexp_content: str,
|
||||
uploader: str,
|
||||
name: str = None,
|
||||
description: str = None,
|
||||
) -> Tuple[Optional[str], Optional[str]]:
|
||||
"""
|
||||
Upload a recipe from YAML content.
|
||||
Upload a recipe from S-expression content.
|
||||
|
||||
The recipe is stored in the cache and optionally pinned to IPFS.
|
||||
Returns (recipe_id, error_message).
|
||||
"""
|
||||
# Validate YAML
|
||||
# Validate and compile S-expression
|
||||
try:
|
||||
recipe_data = yaml.safe_load(yaml_content)
|
||||
except yaml.YAMLError as e:
|
||||
return None, f"Invalid YAML: {e}"
|
||||
compiled = compile_string(sexp_content)
|
||||
except ParseError as e:
|
||||
return None, f"Parse error: {e}"
|
||||
except CompileError as e:
|
||||
return None, f"Compile error: {e}"
|
||||
|
||||
if not isinstance(recipe_data, dict):
|
||||
return None, "Recipe must be a YAML dictionary"
|
||||
|
||||
# Add uploader info to the YAML before storing
|
||||
recipe_data["uploader"] = uploader
|
||||
if name:
|
||||
recipe_data["name"] = name
|
||||
if description:
|
||||
recipe_data["description"] = description
|
||||
|
||||
# Serialize back to YAML (with added metadata)
|
||||
final_yaml = yaml.dump(recipe_data, default_flow_style=False)
|
||||
# For now, store the original S-expression content
|
||||
# The uploader info is not embedded in the S-expression (kept in metadata)
|
||||
# In a full implementation, we might add a :uploader keyword
|
||||
|
||||
# Write to temp file for caching
|
||||
try:
|
||||
with tempfile.NamedTemporaryFile(delete=False, suffix=".yaml", mode="w") as tmp:
|
||||
tmp.write(final_yaml)
|
||||
with tempfile.NamedTemporaryFile(delete=False, suffix=".sexp", mode="w") as tmp:
|
||||
tmp.write(sexp_content)
|
||||
tmp_path = Path(tmp.name)
|
||||
|
||||
# Store in cache (content-addressed, auto-pins to IPFS)
|
||||
@@ -176,9 +165,10 @@ class RecipeService:
|
||||
except Exception as e:
|
||||
return False, f"Failed to delete: {e}"
|
||||
|
||||
def parse_yaml(self, yaml_content: str) -> Dict[str, Any]:
|
||||
"""Parse recipe YAML content."""
|
||||
return yaml.safe_load(yaml_content)
|
||||
def parse_recipe(self, sexp_content: str) -> Dict[str, Any]:
|
||||
"""Parse and compile recipe S-expression content."""
|
||||
compiled = compile_string(sexp_content)
|
||||
return compiled.to_dict()
|
||||
|
||||
def build_dag(self, recipe: Dict[str, Any]) -> Dict[str, Any]:
|
||||
"""
|
||||
@@ -186,37 +176,69 @@ class RecipeService:
|
||||
|
||||
Returns nodes and edges for Cytoscape.js.
|
||||
"""
|
||||
nodes = []
|
||||
vis_nodes = []
|
||||
edges = []
|
||||
|
||||
dag = recipe.get("dag", {})
|
||||
dag_nodes = dag.get("nodes", {})
|
||||
dag_nodes = dag.get("nodes", [])
|
||||
output_node = dag.get("output")
|
||||
|
||||
for node_id, node_def in dag_nodes.items():
|
||||
node_type = node_def.get("type", "EFFECT")
|
||||
nodes.append({
|
||||
"data": {
|
||||
"id": node_id,
|
||||
"label": node_id,
|
||||
"nodeType": node_type,
|
||||
"isOutput": node_id == output_node,
|
||||
}
|
||||
})
|
||||
# Handle list format from compiled S-expression recipes
|
||||
if isinstance(dag_nodes, list):
|
||||
for node_def in dag_nodes:
|
||||
node_id = node_def.get("id")
|
||||
node_type = node_def.get("type", "EFFECT")
|
||||
|
||||
# Build edges from inputs
|
||||
for input_ref in node_def.get("inputs", []):
|
||||
if isinstance(input_ref, dict):
|
||||
source = input_ref.get("node") or input_ref.get("input")
|
||||
else:
|
||||
source = input_ref
|
||||
vis_nodes.append({
|
||||
"data": {
|
||||
"id": node_id,
|
||||
"label": node_id,
|
||||
"nodeType": node_type,
|
||||
"isOutput": node_id == output_node,
|
||||
}
|
||||
})
|
||||
|
||||
if source:
|
||||
edges.append({
|
||||
"data": {
|
||||
"source": source,
|
||||
"target": node_id,
|
||||
}
|
||||
})
|
||||
# Build edges from inputs
|
||||
for input_ref in node_def.get("inputs", []):
|
||||
if isinstance(input_ref, dict):
|
||||
source = input_ref.get("node") or input_ref.get("input")
|
||||
else:
|
||||
source = input_ref
|
||||
|
||||
return {"nodes": nodes, "edges": edges}
|
||||
if source:
|
||||
edges.append({
|
||||
"data": {
|
||||
"source": source,
|
||||
"target": node_id,
|
||||
}
|
||||
})
|
||||
|
||||
# Handle dict format (legacy)
|
||||
elif isinstance(dag_nodes, dict):
|
||||
for node_id, node_def in dag_nodes.items():
|
||||
node_type = node_def.get("type", "EFFECT")
|
||||
|
||||
vis_nodes.append({
|
||||
"data": {
|
||||
"id": node_id,
|
||||
"label": node_id,
|
||||
"nodeType": node_type,
|
||||
"isOutput": node_id == output_node,
|
||||
}
|
||||
})
|
||||
|
||||
for input_ref in node_def.get("inputs", []):
|
||||
if isinstance(input_ref, dict):
|
||||
source = input_ref.get("node") or input_ref.get("input")
|
||||
else:
|
||||
source = input_ref
|
||||
|
||||
if source:
|
||||
edges.append({
|
||||
"data": {
|
||||
"source": source,
|
||||
"target": node_id,
|
||||
}
|
||||
})
|
||||
|
||||
return {"nodes": vis_nodes, "edges": edges}
|
||||
|
||||
Reference in New Issue
Block a user