Remove YAML support - S-expressions only

- Recipe service now only handles S-expressions
- Removed yaml import and all YAML parsing code
- Plans are just node outputs - cached by content hash
- Run service looks up plans from cache, falls back to legacy dir

Code is data. Everything is S-expressions.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
gilesb
2026-01-12 00:33:54 +00:00
parent 3dbbb52d23
commit b686ce75f8
3 changed files with 58 additions and 137 deletions

View File

@@ -1,8 +1,7 @@
""" """
Recipe Service - business logic for recipe management. Recipe Service - business logic for recipe management.
Recipes are content-addressed files stored in the cache (and IPFS). Recipes are S-expressions stored in the content-addressed cache (and IPFS).
Supports both S-expression (.sexp) and YAML (.yaml) formats.
The recipe ID is the content hash of the file. The recipe ID is the content hash of the file.
""" """
@@ -10,43 +9,14 @@ import tempfile
from pathlib import Path from pathlib import Path
from typing import Optional, List, Dict, Any, Tuple from typing import Optional, List, Dict, Any, Tuple
# Try to import S-expression support, fall back to YAML from artdag.sexp import compile_string, parse, serialize, CompileError, ParseError
try:
from artdag.sexp import compile_string, parse, serialize, CompileError, ParseError
SEXP_AVAILABLE = True
except ImportError:
SEXP_AVAILABLE = False
compile_string = None
parse = None
serialize = None
CompileError = Exception
ParseError = Exception
import yaml
def _is_sexp_format(content: str) -> bool:
"""
Detect if content is S-expression format.
Skips leading comments (lines starting with ;) and whitespace.
Returns True if the first non-comment content starts with (.
"""
for line in content.split('\n'):
stripped = line.strip()
if not stripped:
continue # Skip empty lines
if stripped.startswith(';'):
continue # Skip comments
return stripped.startswith('(')
return False
class RecipeService: class RecipeService:
""" """
Service for managing recipes. Service for managing recipes.
Recipes are stored in the content-addressed cache, not Redis. Recipes are S-expressions stored in the content-addressed cache.
""" """
def __init__(self, redis, cache): def __init__(self, redis, cache):
@@ -64,36 +34,17 @@ class RecipeService:
with open(path) as f: with open(path) as f:
content = f.read() content = f.read()
# Try to detect format and parse # Parse S-expression
recipe_data = None try:
is_sexp = _is_sexp_format(content) compiled = compile_string(content)
recipe_data = compiled.to_dict()
if is_sexp: recipe_data["sexp"] = content
if not SEXP_AVAILABLE: except (ParseError, CompileError) as e:
return {"error": "S-expression recipes require artdag.sexp module (not installed)", "recipe_id": recipe_id} return {"error": str(e), "recipe_id": recipe_id}
# Parse as S-expression
try:
compiled = compile_string(content)
recipe_data = compiled.to_dict()
recipe_data["sexp"] = content
except (ParseError, CompileError) as e:
return {"error": str(e), "recipe_id": recipe_id}
else:
# Parse as YAML
try:
recipe_data = yaml.safe_load(content)
if not isinstance(recipe_data, dict):
return {"error": "Invalid recipe format", "recipe_id": recipe_id}
except yaml.YAMLError as e:
return {"error": str(e), "recipe_id": recipe_id}
# Add the recipe_id to the data for convenience # Add the recipe_id to the data for convenience
recipe_data["recipe_id"] = recipe_id recipe_data["recipe_id"] = recipe_id
# Normalize owner/uploader field (S-expr uses 'owner', YAML uses 'uploader')
if "owner" in recipe_data and "uploader" not in recipe_data:
recipe_data["uploader"] = recipe_data["owner"]
# Get IPFS CID if available # Get IPFS CID if available
ipfs_cid = self.cache.get_ipfs_cid(recipe_id) ipfs_cid = self.cache.get_ipfs_cid(recipe_id)
if ipfs_cid: if ipfs_cid:
@@ -101,10 +52,6 @@ class RecipeService:
# Compute step_count from nodes # Compute step_count from nodes
nodes = recipe_data.get("dag", {}).get("nodes", []) nodes = recipe_data.get("dag", {}).get("nodes", [])
if not nodes:
nodes = recipe_data.get("nodes", [])
if not nodes:
nodes = recipe_data.get("pipeline", [])
recipe_data["step_count"] = len(nodes) if isinstance(nodes, (list, dict)) else 0 recipe_data["step_count"] = len(nodes) if isinstance(nodes, (list, dict)) else 0
return recipe_data return recipe_data
@@ -114,29 +61,21 @@ class RecipeService:
List available recipes for a user. List available recipes for a user.
L1 data is isolated per-user - only shows recipes owned by actor_id. L1 data is isolated per-user - only shows recipes owned by actor_id.
Note: This scans the cache for recipe files. For production,
you might want a database index of recipes by owner.
""" """
import logging import logging
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
# Get all cached items and filter for recipes
recipes = [] recipes = []
# Check if cache has a list method for recipes
if hasattr(self.cache, 'list_by_type'): if hasattr(self.cache, 'list_by_type'):
items = self.cache.list_by_type('recipe') items = self.cache.list_by_type('recipe')
logger.info(f"Found {len(items)} recipe items in cache for actor_id={actor_id}") logger.info(f"Found {len(items)} recipes in cache")
for content_hash in items: for content_hash in items:
recipe = await self.get_recipe(content_hash) recipe = await self.get_recipe(content_hash)
if recipe: if recipe and not recipe.get("error"):
uploader = recipe.get("uploader")
owner = recipe.get("owner") owner = recipe.get("owner")
logger.info(f"Recipe {content_hash[:12]}: name={recipe.get('name')}, uploader={uploader}, owner={owner}, actor_id={actor_id}")
# Filter by actor - L1 is per-user # Filter by actor - L1 is per-user
# Check both uploader and owner fields for flexibility if actor_id is None or owner == actor_id:
if actor_id is None or uploader == actor_id or owner == actor_id:
recipes.append(recipe) recipes.append(recipe)
else: else:
logger.warning("Cache does not have list_by_type method") logger.warning("Cache does not have list_by_type method")
@@ -144,7 +83,6 @@ class RecipeService:
# Sort by name # Sort by name
recipes.sort(key=lambda r: r.get("name", "")) recipes.sort(key=lambda r: r.get("name", ""))
# Paginate
return recipes[offset:offset + limit] return recipes[offset:offset + limit]
async def upload_recipe( async def upload_recipe(
@@ -155,48 +93,22 @@ class RecipeService:
description: str = None, description: str = None,
) -> Tuple[Optional[str], Optional[str]]: ) -> Tuple[Optional[str], Optional[str]]:
""" """
Upload a recipe from S-expression or YAML content. Upload a recipe from S-expression content.
The recipe is stored in the cache and optionally pinned to IPFS. The recipe is stored in the cache and pinned to IPFS.
Returns (recipe_id, error_message). Returns (recipe_id, error_message).
""" """
# Detect format # Validate S-expression
is_sexp = _is_sexp_format(content) try:
compiled = compile_string(content)
if is_sexp: except ParseError as e:
if not SEXP_AVAILABLE: return None, f"Parse error: {e}"
return None, "S-expression recipes require artdag.sexp module (not installed on server)" except CompileError as e:
# Validate S-expression return None, f"Compile error: {e}"
try:
compiled = compile_string(content)
except ParseError as e:
return None, f"Parse error: {e}"
except CompileError as e:
return None, f"Compile error: {e}"
suffix = ".sexp"
else:
# Validate YAML
try:
recipe_data = yaml.safe_load(content)
if not isinstance(recipe_data, dict):
return None, "Recipe must be a YAML dictionary"
# Add uploader info
recipe_data["uploader"] = uploader
if name:
recipe_data["name"] = name
if description:
recipe_data["description"] = description
# Serialize back
content = yaml.dump(recipe_data, default_flow_style=False)
except yaml.YAMLError as e:
return None, f"Invalid YAML: {e}"
suffix = ".yaml"
# Write to temp file for caching # Write to temp file for caching
try: try:
with tempfile.NamedTemporaryFile(delete=False, suffix=suffix, mode="w") as tmp: with tempfile.NamedTemporaryFile(delete=False, suffix=".sexp", mode="w") as tmp:
tmp.write(content) tmp.write(content)
tmp_path = Path(tmp.name) tmp_path = Path(tmp.name)
@@ -216,14 +128,13 @@ class RecipeService:
Note: This only removes from local cache. IPFS copies persist. Note: This only removes from local cache. IPFS copies persist.
Returns (success, error_message). Returns (success, error_message).
""" """
# Get recipe to check ownership
recipe = await self.get_recipe(recipe_id) recipe = await self.get_recipe(recipe_id)
if not recipe: if not recipe:
return False, "Recipe not found" return False, "Recipe not found"
# Check ownership if actor_id provided # Check ownership if actor_id provided
if actor_id: if actor_id:
recipe_owner = recipe.get("uploader") recipe_owner = recipe.get("owner")
if recipe_owner and recipe_owner != actor_id: if recipe_owner and recipe_owner != actor_id:
return False, "Cannot delete: you don't own this recipe" return False, "Cannot delete: you don't own this recipe"
@@ -234,7 +145,6 @@ class RecipeService:
if not success: if not success:
return False, msg return False, msg
else: else:
# Fallback: get path and delete directly
path = self.cache.get_by_content_hash(recipe_id) path = self.cache.get_by_content_hash(recipe_id)
if path and path.exists(): if path and path.exists():
path.unlink() path.unlink()
@@ -244,14 +154,9 @@ class RecipeService:
return False, f"Failed to delete: {e}" return False, f"Failed to delete: {e}"
def parse_recipe(self, content: str) -> Dict[str, Any]: def parse_recipe(self, content: str) -> Dict[str, Any]:
"""Parse recipe content (S-expression or YAML).""" """Parse recipe S-expression content."""
is_sexp = _is_sexp_format(content) compiled = compile_string(content)
return compiled.to_dict()
if is_sexp and SEXP_AVAILABLE:
compiled = compile_string(content)
return compiled.to_dict()
else:
return yaml.safe_load(content)
def build_dag(self, recipe: Dict[str, Any]) -> Dict[str, Any]: def build_dag(self, recipe: Dict[str, Any]) -> Dict[str, Any]:
""" """
@@ -266,7 +171,7 @@ class RecipeService:
dag_nodes = dag.get("nodes", []) dag_nodes = dag.get("nodes", [])
output_node = dag.get("output") output_node = dag.get("output")
# Handle list format from compiled S-expression recipes # Handle list format (compiled S-expression)
if isinstance(dag_nodes, list): if isinstance(dag_nodes, list):
for node_def in dag_nodes: for node_def in dag_nodes:
node_id = node_def.get("id") node_id = node_def.get("id")
@@ -281,7 +186,6 @@ class RecipeService:
} }
}) })
# Build edges from inputs
for input_ref in node_def.get("inputs", []): for input_ref in node_def.get("inputs", []):
if isinstance(input_ref, dict): if isinstance(input_ref, dict):
source = input_ref.get("node") or input_ref.get("input") source = input_ref.get("node") or input_ref.get("input")
@@ -296,7 +200,7 @@ class RecipeService:
} }
}) })
# Handle dict format (legacy) # Handle dict format
elif isinstance(dag_nodes, dict): elif isinstance(dag_nodes, dict):
for node_id, node_def in dag_nodes.items(): for node_id, node_def in dag_nodes.items():
node_type = node_def.get("type", "EFFECT") node_type = node_def.get("type", "EFFECT")

View File

@@ -481,14 +481,36 @@ class RunService:
return True, None return True, None
async def get_run_plan(self, run_id: str) -> Optional[Dict[str, Any]]: async def get_run_plan(self, run_id: str) -> Optional[Dict[str, Any]]:
"""Get execution plan for a run.""" """Get execution plan for a run.
# Prefer S-expression plan
Plans are just node outputs - cached by content hash like everything else.
"""
# Get run to find plan_cache_id
run = await self.get_run(run_id)
if not run:
return None
plan_cache_id = run.get("plan_cache_id")
if plan_cache_id:
# Get plan from cache by content hash
plan_path = self.cache.get_by_content_hash(plan_cache_id)
if plan_path and plan_path.exists():
with open(plan_path) as f:
content = f.read()
# Detect format
if content.strip().startswith("("):
return {"sexp": content, "format": "sexp"}
else:
plan = json.loads(content)
plan["format"] = "json"
return plan
# Fall back to legacy plans directory
sexp_path = self.cache_dir / "plans" / f"{run_id}.sexp" sexp_path = self.cache_dir / "plans" / f"{run_id}.sexp"
if sexp_path.exists(): if sexp_path.exists():
with open(sexp_path) as f: with open(sexp_path) as f:
return {"sexp": f.read(), "format": "sexp"} return {"sexp": f.read(), "format": "sexp"}
# Fall back to JSON for legacy plans
json_path = self.cache_dir / "plans" / f"{run_id}.json" json_path = self.cache_dir / "plans" / f"{run_id}.json"
if json_path.exists(): if json_path.exists():
with open(json_path) as f: with open(json_path) as f:
@@ -500,10 +522,9 @@ class RunService:
async def get_run_plan_sexp(self, run_id: str) -> Optional[str]: async def get_run_plan_sexp(self, run_id: str) -> Optional[str]:
"""Get execution plan as S-expression string.""" """Get execution plan as S-expression string."""
sexp_path = self.cache_dir / "plans" / f"{run_id}.sexp" plan = await self.get_run_plan(run_id)
if sexp_path.exists(): if plan and plan.get("format") == "sexp":
with open(sexp_path) as f: return plan.get("sexp")
return f.read()
return None return None
async def get_run_artifacts(self, run_id: str) -> List[Dict[str, Any]]: async def get_run_artifacts(self, run_id: str) -> List[Dict[str, Any]]:

View File

@@ -309,14 +309,10 @@ def run_recipe(
tmp_path = Path(tmp.name) tmp_path = Path(tmp.name)
# Store in cache (content-addressed, auto-pins to IPFS) # Store in cache (content-addressed, auto-pins to IPFS)
# Plan is just another node output - no special treatment needed
cached, plan_ipfs_cid = cache_mgr.put(tmp_path, node_type="plan", move=True) cached, plan_ipfs_cid = cache_mgr.put(tmp_path, node_type="plan", move=True)
logger.info(f"Plan cached: hash={cached.content_hash}, ipfs={plan_ipfs_cid}") logger.info(f"Plan cached: hash={cached.content_hash}, ipfs={plan_ipfs_cid}")
# Also save to plans dir for legacy lookup by run_id
PLAN_CACHE_DIR.mkdir(parents=True, exist_ok=True)
run_plan_path = PLAN_CACHE_DIR / f"{run_id}.sexp"
run_plan_path.write_text(plan_sexp)
# Phase 3: Execute # Phase 3: Execute
logger.info("Phase 3: Executing plan...") logger.info("Phase 3: Executing plan...")