Files
celery/app/services/recipe_service.py
gilesb 6adef63fad Fix get_recipe to handle both YAML and S-expression formats
The upload endpoint accepts both YAML and S-expression recipes, but
get_recipe only tried to parse S-expression. Now it detects the format
and parses accordingly.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-12 15:31:25 +00:00

299 lines
11 KiB
Python

"""
Recipe Service - business logic for recipe management.
Recipes are S-expressions stored in the content-addressed cache (and IPFS).
The recipe ID is the content hash of the file.
"""
import tempfile
from pathlib import Path
from typing import Optional, List, Dict, Any, Tuple, TYPE_CHECKING
from artdag.sexp import compile_string, parse, serialize, CompileError, ParseError
if TYPE_CHECKING:
import redis
from cache_manager import L1CacheManager
from ..types import Recipe, CompiledDAG, VisualizationDAG, VisNode, VisEdge
class RecipeService:
    """
    Service for managing recipes.
    Recipes are S-expressions stored in the content-addressed cache.
    """

    def __init__(self, redis: "redis.Redis", cache: "L1CacheManager") -> None:
        """Hold references to the cache backend and the (legacy) Redis client.

        Args:
            redis: Retained for backward compatibility only; recipe storage
                lives entirely in the content-addressed cache.
            cache: Content-addressed L1 cache used for all recipe I/O.
        """
        self.cache = cache
        self.redis = redis
async def get_recipe(self, recipe_id: str) -> Optional[Recipe]:
"""Get a recipe by ID (content hash)."""
import yaml
# Get from cache (content-addressed storage)
path = self.cache.get_by_cid(recipe_id)
if not path or not path.exists():
return None
with open(path) as f:
content = f.read()
# Detect format - check if it starts with ( after skipping comments
def is_sexp_format(text):
for line in text.split('\n'):
stripped = line.strip()
if not stripped or stripped.startswith(';'):
continue
return stripped.startswith('(')
return False
if is_sexp_format(content):
# Parse S-expression
try:
compiled = compile_string(content)
recipe_data = compiled.to_dict()
recipe_data["sexp"] = content
recipe_data["format"] = "sexp"
except (ParseError, CompileError) as e:
return {"error": str(e), "recipe_id": recipe_id}
else:
# Parse YAML
try:
recipe_data = yaml.safe_load(content)
if not isinstance(recipe_data, dict):
return {"error": "Invalid YAML: expected dictionary", "recipe_id": recipe_id}
recipe_data["yaml"] = content
recipe_data["format"] = "yaml"
except yaml.YAMLError as e:
return {"error": f"YAML parse error: {e}", "recipe_id": recipe_id}
# Add the recipe_id to the data for convenience
recipe_data["recipe_id"] = recipe_id
# Get IPFS CID if available
ipfs_cid = self.cache.get_ipfs_cid(recipe_id)
if ipfs_cid:
recipe_data["ipfs_cid"] = ipfs_cid
# Compute step_count from nodes (handle both formats)
if recipe_data.get("format") == "sexp":
nodes = recipe_data.get("dag", {}).get("nodes", [])
else:
# YAML format: nodes might be at top level or under dag
nodes = recipe_data.get("nodes", recipe_data.get("dag", {}).get("nodes", []))
recipe_data["step_count"] = len(nodes) if isinstance(nodes, (list, dict)) else 0
return recipe_data
async def list_recipes(self, actor_id: Optional[str] = None, offset: int = 0, limit: int = 20) -> List[Recipe]:
"""
List available recipes for a user.
L1 data is isolated per-user - only shows recipes owned by actor_id.
"""
import logging
logger = logging.getLogger(__name__)
recipes = []
if hasattr(self.cache, 'list_by_type'):
items = self.cache.list_by_type('recipe')
logger.info(f"Found {len(items)} recipe CIDs in cache: {items[:5]}...")
for cid in items:
logger.debug(f"Attempting to get recipe {cid[:16]}...")
recipe = await self.get_recipe(cid)
if recipe and not recipe.get("error"):
owner = recipe.get("owner")
# Filter by actor - L1 is per-user
# Note: S-expression recipes don't have owner field, so owner=None
# means the recipe is shared/public and visible to all users
if actor_id is None or owner is None or owner == actor_id:
recipes.append(recipe)
logger.info(f"Recipe {cid[:16]}... included (owner={owner}, actor={actor_id})")
else:
logger.info(f"Recipe {cid[:16]}... filtered out (owner={owner}, actor={actor_id})")
elif recipe and recipe.get("error"):
logger.warning(f"Recipe {cid[:16]}... has error: {recipe.get('error')}")
else:
logger.warning(f"Recipe {cid[:16]}... returned None")
else:
logger.warning("Cache does not have list_by_type method")
# Add friendly names
if actor_id:
from .naming_service import get_naming_service
naming = get_naming_service()
for recipe in recipes:
recipe_id = recipe.get("recipe_id")
if recipe_id:
friendly = await naming.get_by_cid(actor_id, recipe_id)
if friendly:
recipe["friendly_name"] = friendly["friendly_name"]
recipe["base_name"] = friendly["base_name"]
# Sort by name
recipes.sort(key=lambda r: r.get("name", ""))
return recipes[offset:offset + limit]
async def upload_recipe(
self,
content: str,
uploader: str,
name: str = None,
description: str = None,
) -> Tuple[Optional[str], Optional[str]]:
"""
Upload a recipe from S-expression content.
The recipe is stored in the cache and pinned to IPFS.
Returns (recipe_id, error_message).
"""
# Validate S-expression
try:
compiled = compile_string(content)
except ParseError as e:
return None, f"Parse error: {e}"
except CompileError as e:
return None, f"Compile error: {e}"
# Write to temp file for caching
try:
with tempfile.NamedTemporaryFile(delete=False, suffix=".sexp", mode="w") as tmp:
tmp.write(content)
tmp_path = Path(tmp.name)
# Store in cache (content-addressed, auto-pins to IPFS)
cached, ipfs_cid = self.cache.put(tmp_path, node_type="recipe", move=True)
recipe_id = ipfs_cid or cached.cid # Prefer IPFS CID
# Assign friendly name
if uploader:
from .naming_service import get_naming_service
naming = get_naming_service()
display_name = name or compiled.name or "unnamed-recipe"
await naming.assign_name(
cid=recipe_id,
actor_id=uploader,
item_type="recipe",
display_name=display_name,
)
return recipe_id, None
except Exception as e:
return None, f"Failed to cache recipe: {e}"
async def delete_recipe(self, recipe_id: str, actor_id: str = None) -> Tuple[bool, Optional[str]]:
"""
Delete a recipe.
Note: This only removes from local cache. IPFS copies persist.
Returns (success, error_message).
"""
recipe = await self.get_recipe(recipe_id)
if not recipe:
return False, "Recipe not found"
# Check ownership if actor_id provided
if actor_id:
recipe_owner = recipe.get("owner")
if recipe_owner and recipe_owner != actor_id:
return False, "Cannot delete: you don't own this recipe"
# Delete from cache
try:
if hasattr(self.cache, 'delete_by_cid'):
success, msg = self.cache.delete_by_cid(recipe_id)
if not success:
return False, msg
else:
path = self.cache.get_by_cid(recipe_id)
if path and path.exists():
path.unlink()
return True, None
except Exception as e:
return False, f"Failed to delete: {e}"
def parse_recipe(self, content: str) -> CompiledDAG:
"""Parse recipe S-expression content."""
compiled = compile_string(content)
return compiled.to_dict()
def build_dag(self, recipe: Recipe) -> VisualizationDAG:
"""
Build DAG visualization data from recipe.
Returns nodes and edges for Cytoscape.js.
"""
vis_nodes: List[VisNode] = []
edges: List[VisEdge] = []
dag = recipe.get("dag", {})
dag_nodes = dag.get("nodes", [])
output_node = dag.get("output")
# Handle list format (compiled S-expression)
if isinstance(dag_nodes, list):
for node_def in dag_nodes:
node_id = node_def.get("id")
node_type = node_def.get("type", "EFFECT")
vis_nodes.append({
"data": {
"id": node_id,
"label": node_id,
"nodeType": node_type,
"isOutput": node_id == output_node,
}
})
for input_ref in node_def.get("inputs", []):
if isinstance(input_ref, dict):
source = input_ref.get("node") or input_ref.get("input")
else:
source = input_ref
if source:
edges.append({
"data": {
"source": source,
"target": node_id,
}
})
# Handle dict format
elif isinstance(dag_nodes, dict):
for node_id, node_def in dag_nodes.items():
node_type = node_def.get("type", "EFFECT")
vis_nodes.append({
"data": {
"id": node_id,
"label": node_id,
"nodeType": node_type,
"isOutput": node_id == output_node,
}
})
for input_ref in node_def.get("inputs", []):
if isinstance(input_ref, dict):
source = input_ref.get("node") or input_ref.get("input")
else:
source = input_ref
if source:
edges.append({
"data": {
"source": source,
"target": node_id,
}
})
return {"nodes": vis_nodes, "edges": edges}