diff --git a/app/routers/recipes.py b/app/routers/recipes.py index 46e3ca8..f5f7f85 100644 --- a/app/routers/recipes.py +++ b/app/routers/recipes.py @@ -24,7 +24,7 @@ logger = logging.getLogger(__name__) class RecipeUploadRequest(BaseModel): - yaml_content: str + sexp_content: str name: Optional[str] = None description: Optional[str] = None @@ -44,47 +44,50 @@ async def upload_recipe( ctx: UserContext = Depends(require_auth), recipe_service: RecipeService = Depends(get_recipe_service), ): - """Upload a new recipe from YAML file.""" - import yaml + """Upload a new recipe from S-expression file.""" + from artdag.sexp import compile_string, ParseError, CompileError - # Read the YAML content from the uploaded file - yaml_content = (await file.read()).decode("utf-8") + # Read the S-expression content from the uploaded file + sexp_content = (await file.read()).decode("utf-8") - # Parse YAML to extract recipe info for response + # Parse and compile S-expression to extract recipe info try: - recipe_data = yaml.safe_load(yaml_content) - except yaml.YAMLError as e: - raise HTTPException(400, f"Invalid YAML: {e}") + compiled = compile_string(sexp_content) + except ParseError as e: + raise HTTPException(400, f"Parse error: {e}") + except CompileError as e: + raise HTTPException(400, f"Compile error: {e}") - # Use filename (without extension) as recipe name if not in YAML - recipe_name = recipe_data.get("name") + # Use filename (without extension) as recipe name if not specified + recipe_name = compiled.name if not recipe_name and file.filename: recipe_name = file.filename.rsplit(".", 1)[0] recipe_id, error = await recipe_service.upload_recipe( - yaml_content=yaml_content, + sexp_content=sexp_content, uploader=ctx.actor_id, name=recipe_name, - description=recipe_data.get("description"), + description=compiled.description, ) if error: raise HTTPException(400, error) - # Extract input info for response - inputs = recipe_data.get("inputs", {}) + # Extract input info from compiled nodes variable_inputs = [] fixed_inputs = [] - for input_name, input_def in inputs.items(): - if isinstance(input_def, dict) and input_def.get("fixed"): - fixed_inputs.append(input_name) - else: - variable_inputs.append(input_name) + for node in compiled.nodes: + if node.get("type") == "SOURCE": + config = node.get("config", {}) + if config.get("input"): + variable_inputs.append(config.get("name", node.get("id"))) + elif config.get("asset"): + fixed_inputs.append(config.get("asset")) return { "recipe_id": recipe_id, "name": recipe_name or "unnamed", - "version": recipe_data.get("version", "1.0"), + "version": compiled.version, "variable_inputs": variable_inputs, "fixed_inputs": fixed_inputs, "message": "Recipe uploaded successfully", @@ -235,9 +238,9 @@ async def get_recipe( # Add steps to recipe for template recipe["steps"] = steps - # Add YAML source - import yaml - recipe["yaml"] = yaml.dump(recipe, default_flow_style=False) + # Use S-expression source if available + if "sexp" not in recipe: + recipe["sexp"] = "; No S-expression source available" templates = get_templates(request) return render(templates, "recipes/detail.html", request, diff --git a/app/routers/runs.py b/app/routers/runs.py index 9c61ea7..33c1b6a 100644 --- a/app/routers/runs.py +++ b/app/routers/runs.py @@ -102,6 +102,14 @@ async def get_run( # Only render HTML if browser explicitly requests it if wants_html(request): + # Extract username from actor_id (format: @user@server) + actor_id = run.get("actor_id", "") + if actor_id and actor_id.startswith("@"): + parts = actor_id[1:].split("@") + run["username"] = parts[0] if parts else "Unknown" + else: + run["username"] = actor_id or "Unknown" + # Try to load the recipe to show the plan plan = None recipe_id = run.get("recipe") @@ -111,11 +119,9 @@ async def get_run( recipe_service = RecipeService(get_redis_client(), get_cache_manager()) recipe = await recipe_service.get_recipe(recipe_id) if recipe: - # Build plan from recipe nodes - nodes = recipe.get("nodes", []) - if not nodes: - dag = recipe.get("dag", {}) - nodes = dag.get("nodes", []) + # Use the new build_dag method if available + dag = recipe.get("dag", {}) + nodes = dag.get("nodes", []) steps = [] if isinstance(nodes, list): @@ -137,6 +143,12 @@ async def get_run( if steps: plan = {"steps": steps} + run["total_steps"] = len(steps) + run["executed"] = len(steps) if run.get("status") == "completed" else 0 + + # Use recipe name instead of hash for display + if recipe.get("name"): + run["recipe_name"] = recipe["name"] except Exception as e: logger.warning(f"Failed to load recipe for plan: {e}") diff --git a/app/services/recipe_service.py b/app/services/recipe_service.py index 760283e..bc3010a 100644 --- a/app/services/recipe_service.py +++ b/app/services/recipe_service.py @@ -1,15 +1,15 @@ """ Recipe Service - business logic for recipe management. -Recipes are content-addressed YAML files stored in the cache (and IPFS). -The recipe ID is the content hash of the YAML file. +Recipes are content-addressed S-expression files stored in the cache (and IPFS). +The recipe ID is the content hash of the S-expression file. """ import tempfile from pathlib import Path from typing import Optional, List, Dict, Any, Tuple -import yaml +from artdag.sexp import compile_string, parse, serialize, CompileError, ParseError class RecipeService: @@ -32,32 +32,28 @@ class RecipeService: return None with open(path) as f: - recipe_data = yaml.safe_load(f) + sexp_content = f.read() + + # Compile S-expression recipe + try: + compiled = compile_string(sexp_content) + recipe_data = compiled.to_dict() + except (ParseError, CompileError) as e: + # Return basic error info + return {"error": str(e), "recipe_id": recipe_id} # Add the recipe_id to the data for convenience - if isinstance(recipe_data, dict): - recipe_data["recipe_id"] = recipe_id - # Get IPFS CID if available - ipfs_cid = self.cache.get_ipfs_cid(recipe_id) - if ipfs_cid: - recipe_data["ipfs_cid"] = ipfs_cid + recipe_data["recipe_id"] = recipe_id + recipe_data["sexp"] = sexp_content # Keep original S-expression - # Compute step_count from nodes - nodes = recipe_data.get("nodes", []) - if not nodes: - dag = recipe_data.get("dag", {}) - nodes = dag.get("nodes", []) if isinstance(dag, dict) else [] - if not nodes: - nodes = recipe_data.get("pipeline", []) - if not nodes: - nodes = recipe_data.get("steps", []) + # Get IPFS CID if available + ipfs_cid = self.cache.get_ipfs_cid(recipe_id) + if ipfs_cid: + recipe_data["ipfs_cid"] = ipfs_cid - if isinstance(nodes, list): - recipe_data["step_count"] = len(nodes) - elif isinstance(nodes, dict): - recipe_data["step_count"] = len(nodes) - else: - recipe_data["step_count"] = 0 + # Compute step_count from nodes + nodes = recipe_data.get("dag", {}).get("nodes", []) + recipe_data["step_count"] = len(nodes) if isinstance(nodes, list) else 0 return recipe_data @@ -97,40 +93,33 @@ class RecipeService: async def upload_recipe( self, - yaml_content: str, + sexp_content: str, uploader: str, name: str = None, description: str = None, ) -> Tuple[Optional[str], Optional[str]]: """ - Upload a recipe from YAML content. + Upload a recipe from S-expression content. The recipe is stored in the cache and optionally pinned to IPFS. Returns (recipe_id, error_message). """ - # Validate YAML + # Validate and compile S-expression try: - recipe_data = yaml.safe_load(yaml_content) - except yaml.YAMLError as e: - return None, f"Invalid YAML: {e}" + compiled = compile_string(sexp_content) + except ParseError as e: + return None, f"Parse error: {e}" + except CompileError as e: + return None, f"Compile error: {e}" - if not isinstance(recipe_data, dict): - return None, "Recipe must be a YAML dictionary" - - # Add uploader info to the YAML before storing - recipe_data["uploader"] = uploader - if name: - recipe_data["name"] = name - if description: - recipe_data["description"] = description - - # Serialize back to YAML (with added metadata) - final_yaml = yaml.dump(recipe_data, default_flow_style=False) + # For now, store the original S-expression content + # The uploader info is not embedded in the S-expression (kept in metadata) + # In a full implementation, we might add a :uploader keyword # Write to temp file for caching try: - with tempfile.NamedTemporaryFile(delete=False, suffix=".yaml", mode="w") as tmp: - tmp.write(final_yaml) + with tempfile.NamedTemporaryFile(delete=False, suffix=".sexp", mode="w") as tmp: + tmp.write(sexp_content) tmp_path = Path(tmp.name) # Store in cache (content-addressed, auto-pins to IPFS) @@ -176,9 +165,10 @@ class RecipeService: except Exception as e: return False, f"Failed to delete: {e}" - def parse_yaml(self, yaml_content: str) -> Dict[str, Any]: - """Parse recipe YAML content.""" - return yaml.safe_load(yaml_content) + def parse_recipe(self, sexp_content: str) -> Dict[str, Any]: + """Parse and compile recipe S-expression content.""" + compiled = compile_string(sexp_content) + return compiled.to_dict() def build_dag(self, recipe: Dict[str, Any]) -> Dict[str, Any]: """ @@ -186,37 +176,69 @@ class RecipeService: Returns nodes and edges for Cytoscape.js. """ - nodes = [] + vis_nodes = [] edges = [] dag = recipe.get("dag", {}) - dag_nodes = dag.get("nodes", {}) + dag_nodes = dag.get("nodes", []) output_node = dag.get("output") - for node_id, node_def in dag_nodes.items(): - node_type = node_def.get("type", "EFFECT") - nodes.append({ - "data": { - "id": node_id, - "label": node_id, - "nodeType": node_type, - "isOutput": node_id == output_node, - } - }) + # Handle list format from compiled S-expression recipes + if isinstance(dag_nodes, list): + for node_def in dag_nodes: + node_id = node_def.get("id") + node_type = node_def.get("type", "EFFECT") - # Build edges from inputs - for input_ref in node_def.get("inputs", []): - if isinstance(input_ref, dict): - source = input_ref.get("node") or input_ref.get("input") - else: - source = input_ref + vis_nodes.append({ + "data": { + "id": node_id, + "label": node_id, + "nodeType": node_type, + "isOutput": node_id == output_node, + } + }) - if source: - edges.append({ - "data": { - "source": source, - "target": node_id, - } - }) + # Build edges from inputs + for input_ref in node_def.get("inputs", []): + if isinstance(input_ref, dict): + source = input_ref.get("node") or input_ref.get("input") + else: + source = input_ref - return {"nodes": nodes, "edges": edges} + if source: + edges.append({ + "data": { + "source": source, + "target": node_id, + } + }) + + # Handle dict format (legacy) + elif isinstance(dag_nodes, dict): + for node_id, node_def in dag_nodes.items(): + node_type = node_def.get("type", "EFFECT") + + vis_nodes.append({ + "data": { + "id": node_id, + "label": node_id, + "nodeType": node_type, + "isOutput": node_id == output_node, + } + }) + + for input_ref in node_def.get("inputs", []): + if isinstance(input_ref, dict): + source = input_ref.get("node") or input_ref.get("input") + else: + source = input_ref + + if source: + edges.append({ + "data": { + "source": source, + "target": node_id, + } + }) + + return {"nodes": vis_nodes, "edges": edges}