From b686ce75f865cd077e140ad9f60ece1a93a85c43 Mon Sep 17 00:00:00 2001 From: gilesb Date: Mon, 12 Jan 2026 00:33:54 +0000 Subject: [PATCH] Remove YAML support - S-expressions only - Recipe service now only handles S-expressions - Removed yaml import and all YAML parsing code - Plans are just node outputs - cached by content hash - Run service looks up plans from cache, falls back to legacy dir Code is data. Everything is S-expressions. Co-Authored-By: Claude Opus 4.5 --- app/services/recipe_service.py | 154 +++++++-------------------------- app/services/run_service.py | 35 ++++++-- tasks/orchestrate.py | 6 +- 3 files changed, 58 insertions(+), 137 deletions(-) diff --git a/app/services/recipe_service.py b/app/services/recipe_service.py index 96d37d3..e67a2ae 100644 --- a/app/services/recipe_service.py +++ b/app/services/recipe_service.py @@ -1,8 +1,7 @@ """ Recipe Service - business logic for recipe management. -Recipes are content-addressed files stored in the cache (and IPFS). -Supports both S-expression (.sexp) and YAML (.yaml) formats. +Recipes are S-expressions stored in the content-addressed cache (and IPFS). The recipe ID is the content hash of the file. """ @@ -10,43 +9,14 @@ import tempfile from pathlib import Path from typing import Optional, List, Dict, Any, Tuple -# Try to import S-expression support, fall back to YAML -try: - from artdag.sexp import compile_string, parse, serialize, CompileError, ParseError - SEXP_AVAILABLE = True -except ImportError: - SEXP_AVAILABLE = False - compile_string = None - parse = None - serialize = None - CompileError = Exception - ParseError = Exception - -import yaml - - -def _is_sexp_format(content: str) -> bool: - """ - Detect if content is S-expression format. - - Skips leading comments (lines starting with ;) and whitespace. - Returns True if the first non-comment content starts with (. - """ - for line in content.split('\n'): - stripped = line.strip() - if not stripped: - continue # Skip empty lines - if stripped.startswith(';'): - continue # Skip comments - return stripped.startswith('(') - return False +from artdag.sexp import compile_string, parse, serialize, CompileError, ParseError class RecipeService: """ Service for managing recipes. - Recipes are stored in the content-addressed cache, not Redis. + Recipes are S-expressions stored in the content-addressed cache. """ def __init__(self, redis, cache): @@ -64,36 +34,17 @@ class RecipeService: with open(path) as f: content = f.read() - # Try to detect format and parse - recipe_data = None - is_sexp = _is_sexp_format(content) - - if is_sexp: - if not SEXP_AVAILABLE: - return {"error": "S-expression recipes require artdag.sexp module (not installed)", "recipe_id": recipe_id} - # Parse as S-expression - try: - compiled = compile_string(content) - recipe_data = compiled.to_dict() - recipe_data["sexp"] = content - except (ParseError, CompileError) as e: - return {"error": str(e), "recipe_id": recipe_id} - else: - # Parse as YAML - try: - recipe_data = yaml.safe_load(content) - if not isinstance(recipe_data, dict): - return {"error": "Invalid recipe format", "recipe_id": recipe_id} - except yaml.YAMLError as e: - return {"error": str(e), "recipe_id": recipe_id} + # Parse S-expression + try: + compiled = compile_string(content) + recipe_data = compiled.to_dict() + recipe_data["sexp"] = content + except (ParseError, CompileError) as e: + return {"error": str(e), "recipe_id": recipe_id} # Add the recipe_id to the data for convenience recipe_data["recipe_id"] = recipe_id - # Normalize owner/uploader field (S-expr uses 'owner', YAML uses 'uploader') - if "owner" in recipe_data and "uploader" not in recipe_data: - recipe_data["uploader"] = recipe_data["owner"] - # Get IPFS CID if available ipfs_cid = self.cache.get_ipfs_cid(recipe_id) if ipfs_cid: @@ -101,10 +52,6 @@ class RecipeService: # Compute step_count from nodes nodes = recipe_data.get("dag", {}).get("nodes", []) - if not nodes: - nodes = recipe_data.get("nodes", []) - if not nodes: - nodes = recipe_data.get("pipeline", []) recipe_data["step_count"] = len(nodes) if isinstance(nodes, (list, dict)) else 0 return recipe_data @@ -114,29 +61,21 @@ class RecipeService: List available recipes for a user. L1 data is isolated per-user - only shows recipes owned by actor_id. - - Note: This scans the cache for recipe files. For production, - you might want a database index of recipes by owner. """ import logging logger = logging.getLogger(__name__) - # Get all cached items and filter for recipes recipes = [] - # Check if cache has a list method for recipes if hasattr(self.cache, 'list_by_type'): items = self.cache.list_by_type('recipe') - logger.info(f"Found {len(items)} recipe items in cache for actor_id={actor_id}") + logger.info(f"Found {len(items)} recipes in cache") for content_hash in items: recipe = await self.get_recipe(content_hash) - if recipe: - uploader = recipe.get("uploader") + if recipe and not recipe.get("error"): owner = recipe.get("owner") - logger.info(f"Recipe {content_hash[:12]}: name={recipe.get('name')}, uploader={uploader}, owner={owner}, actor_id={actor_id}") # Filter by actor - L1 is per-user - # Check both uploader and owner fields for flexibility - if actor_id is None or uploader == actor_id or owner == actor_id: + if actor_id is None or owner == actor_id: recipes.append(recipe) else: logger.warning("Cache does not have list_by_type method") @@ -144,7 +83,6 @@ class RecipeService: # Sort by name recipes.sort(key=lambda r: r.get("name", "")) - # Paginate return recipes[offset:offset + limit] async def upload_recipe( @@ -155,48 +93,22 @@ class RecipeService: description: str = None, ) -> Tuple[Optional[str], Optional[str]]: """ - Upload a recipe from S-expression or YAML content. + Upload a recipe from S-expression content. - The recipe is stored in the cache and optionally pinned to IPFS. + The recipe is stored in the cache and pinned to IPFS. Returns (recipe_id, error_message). """ - # Detect format - is_sexp = _is_sexp_format(content) - - if is_sexp: - if not SEXP_AVAILABLE: - return None, "S-expression recipes require artdag.sexp module (not installed on server)" - # Validate S-expression - try: - compiled = compile_string(content) - except ParseError as e: - return None, f"Parse error: {e}" - except CompileError as e: - return None, f"Compile error: {e}" - suffix = ".sexp" - else: - # Validate YAML - try: - recipe_data = yaml.safe_load(content) - if not isinstance(recipe_data, dict): - return None, "Recipe must be a YAML dictionary" - - # Add uploader info - recipe_data["uploader"] = uploader - if name: - recipe_data["name"] = name - if description: - recipe_data["description"] = description - - # Serialize back - content = yaml.dump(recipe_data, default_flow_style=False) - except yaml.YAMLError as e: - return None, f"Invalid YAML: {e}" - suffix = ".yaml" + # Validate S-expression + try: + compiled = compile_string(content) + except ParseError as e: + return None, f"Parse error: {e}" + except CompileError as e: + return None, f"Compile error: {e}" # Write to temp file for caching try: - with tempfile.NamedTemporaryFile(delete=False, suffix=suffix, mode="w") as tmp: + with tempfile.NamedTemporaryFile(delete=False, suffix=".sexp", mode="w") as tmp: tmp.write(content) tmp_path = Path(tmp.name) @@ -216,14 +128,13 @@ class RecipeService: Note: This only removes from local cache. IPFS copies persist. Returns (success, error_message). """ - # Get recipe to check ownership recipe = await self.get_recipe(recipe_id) if not recipe: return False, "Recipe not found" # Check ownership if actor_id provided if actor_id: - recipe_owner = recipe.get("uploader") + recipe_owner = recipe.get("owner") if recipe_owner and recipe_owner != actor_id: return False, "Cannot delete: you don't own this recipe" @@ -234,7 +145,6 @@ class RecipeService: if not success: return False, msg else: - # Fallback: get path and delete directly path = self.cache.get_by_content_hash(recipe_id) if path and path.exists(): path.unlink() @@ -244,14 +154,9 @@ class RecipeService: return False, f"Failed to delete: {e}" def parse_recipe(self, content: str) -> Dict[str, Any]: - """Parse recipe content (S-expression or YAML).""" - is_sexp = _is_sexp_format(content) - - if is_sexp and SEXP_AVAILABLE: - compiled = compile_string(content) - return compiled.to_dict() - else: - return yaml.safe_load(content) + """Parse recipe S-expression content.""" + compiled = compile_string(content) + return compiled.to_dict() def build_dag(self, recipe: Dict[str, Any]) -> Dict[str, Any]: """ @@ -266,7 +171,7 @@ class RecipeService: dag_nodes = dag.get("nodes", []) output_node = dag.get("output") - # Handle list format from compiled S-expression recipes + # Handle list format (compiled S-expression) if isinstance(dag_nodes, list): for node_def in dag_nodes: node_id = node_def.get("id") @@ -281,7 +186,6 @@ class RecipeService: } }) - # Build edges from inputs for input_ref in node_def.get("inputs", []): if isinstance(input_ref, dict): source = input_ref.get("node") or input_ref.get("input") @@ -296,7 +200,7 @@ class RecipeService: } }) - # Handle dict format (legacy) + # Handle dict format elif isinstance(dag_nodes, dict): for node_id, node_def in dag_nodes.items(): node_type = node_def.get("type", "EFFECT") diff --git a/app/services/run_service.py b/app/services/run_service.py index 4eb84b7..bd1638c 100644 --- a/app/services/run_service.py +++ b/app/services/run_service.py @@ -481,14 +481,36 @@ class RunService: return True, None async def get_run_plan(self, run_id: str) -> Optional[Dict[str, Any]]: - """Get execution plan for a run.""" - # Prefer S-expression plan + """Get execution plan for a run. + + Plans are just node outputs - cached by content hash like everything else. + """ + # Get run to find plan_cache_id + run = await self.get_run(run_id) + if not run: + return None + + plan_cache_id = run.get("plan_cache_id") + if plan_cache_id: + # Get plan from cache by content hash + plan_path = self.cache.get_by_content_hash(plan_cache_id) + if plan_path and plan_path.exists(): + with open(plan_path) as f: + content = f.read() + # Detect format + if content.strip().startswith("("): + return {"sexp": content, "format": "sexp"} + else: + plan = json.loads(content) + plan["format"] = "json" + return plan + + # Fall back to legacy plans directory sexp_path = self.cache_dir / "plans" / f"{run_id}.sexp" if sexp_path.exists(): with open(sexp_path) as f: return {"sexp": f.read(), "format": "sexp"} - # Fall back to JSON for legacy plans json_path = self.cache_dir / "plans" / f"{run_id}.json" if json_path.exists(): with open(json_path) as f: @@ -500,10 +522,9 @@ class RunService: async def get_run_plan_sexp(self, run_id: str) -> Optional[str]: """Get execution plan as S-expression string.""" - sexp_path = self.cache_dir / "plans" / f"{run_id}.sexp" - if sexp_path.exists(): - with open(sexp_path) as f: - return f.read() + plan = await self.get_run_plan(run_id) + if plan and plan.get("format") == "sexp": + return plan.get("sexp") return None async def get_run_artifacts(self, run_id: str) -> List[Dict[str, Any]]: diff --git a/tasks/orchestrate.py b/tasks/orchestrate.py index 562a9ab..347c02e 100644 --- a/tasks/orchestrate.py +++ b/tasks/orchestrate.py @@ -309,14 +309,10 @@ def run_recipe( tmp_path = Path(tmp.name) # Store in cache (content-addressed, auto-pins to IPFS) + # Plan is just another node output - no special treatment needed cached, plan_ipfs_cid = cache_mgr.put(tmp_path, node_type="plan", move=True) logger.info(f"Plan cached: hash={cached.content_hash}, ipfs={plan_ipfs_cid}") - # Also save to plans dir for legacy lookup by run_id - PLAN_CACHE_DIR.mkdir(parents=True, exist_ok=True) - run_plan_path = PLAN_CACHE_DIR / f"{run_id}.sexp" - run_plan_path.write_text(plan_sexp) - # Phase 3: Execute logger.info("Phase 3: Executing plan...")