Files
celery/app/services/recipe_service.py
gilesb fe8e65881d Map owner to uploader for S-expression recipes
S-expression recipes use 'owner' field while YAML uses 'uploader'.
Normalize to 'uploader' so recipe listing filter works for both formats.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-11 23:40:50 +00:00

324 lines
11 KiB
Python

"""
Recipe Service - business logic for recipe management.
Recipes are content-addressed files stored in the cache (and IPFS).
Supports both S-expression (.sexp) and YAML (.yaml) formats.
The recipe ID is the content hash of the file.
"""
import tempfile
from pathlib import Path
from typing import Optional, List, Dict, Any, Tuple
# Try to import S-expression support, fall back to YAML
try:
from artdag.sexp import compile_string, parse, serialize, CompileError, ParseError
SEXP_AVAILABLE = True
except ImportError:
SEXP_AVAILABLE = False
compile_string = None
parse = None
serialize = None
CompileError = Exception
ParseError = Exception
import yaml
def _is_sexp_format(content: str) -> bool:
"""
Detect if content is S-expression format.
Skips leading comments (lines starting with ;) and whitespace.
Returns True if the first non-comment content starts with (.
"""
for line in content.split('\n'):
stripped = line.strip()
if not stripped:
continue # Skip empty lines
if stripped.startswith(';'):
continue # Skip comments
return stripped.startswith('(')
return False
class RecipeService:
    """
    Service for managing recipes.

    Recipes are stored in the content-addressed cache, not Redis.
    A recipe's ID is the content hash under which the cache stores its file.
    """

    def __init__(self, redis, cache):
        # Redis kept for compatibility but not used for recipe storage
        self.redis = redis
        self.cache = cache

    @staticmethod
    def _count_steps(recipe_data: Dict[str, Any]) -> int:
        """
        Count the steps of a parsed recipe.

        Looks for a node collection in ``dag.nodes``, then ``nodes``, then
        ``pipeline`` (first non-empty wins). Returns 0 when none is found or
        when the value found is neither a list nor a dict.
        """
        dag = recipe_data.get("dag")
        # YAML may yield `dag: null` or a non-mapping; don't call .get on it.
        nodes = dag.get("nodes") if isinstance(dag, dict) else None
        if not nodes:
            nodes = recipe_data.get("nodes")
        if not nodes:
            nodes = recipe_data.get("pipeline")
        return len(nodes) if isinstance(nodes, (list, dict)) else 0

    @staticmethod
    def _name_sort_key(recipe: Dict[str, Any]) -> str:
        """
        Sort key for recipe listings: the recipe name as a string.

        A missing or None name sorts as the empty string; non-string names
        are stringified so mixed types never raise during comparison.
        """
        name = recipe.get("name")
        return "" if name is None else str(name)

    async def get_recipe(self, recipe_id: str) -> Optional[Dict[str, Any]]:
        """
        Get a recipe by ID (content hash).

        Returns:
            None if the cache has no existing file for ``recipe_id``.
            A dict with "error" and "recipe_id" keys if the content cannot
            be parsed (or S-expression support is not installed).
            Otherwise the parsed recipe dict, augmented with "recipe_id",
            "step_count", "uploader" (copied from "owner" when only "owner"
            is present), "ipfs_cid" (when the cache reports one) and, for
            S-expression recipes, the raw source under "sexp".
        """
        # Get from cache (content-addressed storage)
        path = self.cache.get_by_content_hash(recipe_id)
        if not path or not path.exists():
            return None

        with open(path) as f:
            content = f.read()

        if _is_sexp_format(content):
            if not SEXP_AVAILABLE:
                return {"error": "S-expression recipes require artdag.sexp module (not installed)", "recipe_id": recipe_id}
            try:
                compiled = compile_string(content)
                recipe_data = compiled.to_dict()
                recipe_data["sexp"] = content
            except (ParseError, CompileError) as e:
                return {"error": str(e), "recipe_id": recipe_id}
        else:
            try:
                recipe_data = yaml.safe_load(content)
            except yaml.YAMLError as e:
                return {"error": str(e), "recipe_id": recipe_id}
            if not isinstance(recipe_data, dict):
                return {"error": "Invalid recipe format", "recipe_id": recipe_id}

        # Add the recipe_id to the data for convenience
        recipe_data["recipe_id"] = recipe_id

        # Normalize owner/uploader field (S-expr uses 'owner', YAML uses 'uploader')
        if "owner" in recipe_data and "uploader" not in recipe_data:
            recipe_data["uploader"] = recipe_data["owner"]

        # Get IPFS CID if available
        ipfs_cid = self.cache.get_ipfs_cid(recipe_id)
        if ipfs_cid:
            recipe_data["ipfs_cid"] = ipfs_cid

        recipe_data["step_count"] = self._count_steps(recipe_data)
        return recipe_data

    async def list_recipes(
        self,
        actor_id: Optional[str] = None,
        offset: int = 0,
        limit: int = 20,
    ) -> List[Dict[str, Any]]:
        """
        List available recipes for a user.

        L1 data is isolated per-user - only recipes whose "uploader" equals
        ``actor_id`` are returned. When ``actor_id`` is None no ownership
        filter is applied. Results are sorted by name and then sliced to
        ``[offset:offset + limit]``.

        Note: This scans the cache for recipe files. For production,
        you might want a database index of recipes by owner.
        If the cache has no ``list_by_type`` method the result is empty.
        """
        import logging
        logger = logging.getLogger(__name__)

        recipes = []
        # Check if cache has a list method for recipes
        if hasattr(self.cache, 'list_by_type'):
            items = self.cache.list_by_type('recipe')
            logger.info("Found %s recipe items in cache", len(items))
            for content_hash in items:
                recipe = await self.get_recipe(content_hash)
                if not recipe:
                    continue
                uploader = recipe.get("uploader")
                logger.info(
                    "Recipe %s: uploader=%s, actor_id=%s",
                    content_hash[:12], uploader, actor_id,
                )
                # Filter by actor - L1 is per-user
                if actor_id is None or uploader == actor_id:
                    recipes.append(recipe)

        # Sort by name; the key tolerates missing/None/non-string names,
        # which would otherwise raise TypeError when compared with strings.
        recipes.sort(key=self._name_sort_key)

        # Paginate
        return recipes[offset:offset + limit]

    async def upload_recipe(
        self,
        content: str,
        uploader: str,
        name: Optional[str] = None,
        description: Optional[str] = None,
    ) -> Tuple[Optional[str], Optional[str]]:
        """
        Upload a recipe from S-expression or YAML content.

        S-expression content is validated by compiling it and is stored
        unchanged. YAML content must parse to a mapping; ``uploader`` (and
        ``name`` / ``description`` when given) are written into it and the
        mapping is re-serialized before storage.

        The recipe is stored in the cache and optionally pinned to IPFS.

        Returns (recipe_id, error_message); exactly one of them is None.
        """
        if _is_sexp_format(content):
            if not SEXP_AVAILABLE:
                return None, "S-expression recipes require artdag.sexp module (not installed on server)"
            # Validate S-expression; the compiled result itself is not needed.
            try:
                compile_string(content)
            except ParseError as e:
                return None, f"Parse error: {e}"
            except CompileError as e:
                return None, f"Compile error: {e}"
            suffix = ".sexp"
        else:
            # Validate YAML
            try:
                recipe_data = yaml.safe_load(content)
                if not isinstance(recipe_data, dict):
                    return None, "Recipe must be a YAML dictionary"

                # Add uploader info
                recipe_data["uploader"] = uploader
                if name:
                    recipe_data["name"] = name
                if description:
                    recipe_data["description"] = description

                # Serialize back
                content = yaml.dump(recipe_data, default_flow_style=False)
            except yaml.YAMLError as e:
                return None, f"Invalid YAML: {e}"
            suffix = ".yaml"

        # Write to temp file for caching
        tmp_path = None
        try:
            with tempfile.NamedTemporaryFile(delete=False, suffix=suffix, mode="w") as tmp:
                tmp_path = Path(tmp.name)
                tmp.write(content)

            # Store in cache (content-addressed, auto-pins to IPFS)
            cached, _ipfs_cid = self.cache.put(tmp_path, node_type="recipe", move=True)
            return cached.content_hash, None
        except Exception as e:
            # The temp file was created with delete=False, so remove it
            # ourselves on failure. It may already be gone if the cache moved
            # it before failing, hence the tolerated OSError.
            if tmp_path is not None:
                try:
                    tmp_path.unlink()
                except OSError:
                    pass
            return None, f"Failed to cache recipe: {e}"

    async def delete_recipe(
        self,
        recipe_id: str,
        actor_id: Optional[str] = None,
    ) -> Tuple[bool, Optional[str]]:
        """
        Delete a recipe.

        Note: This only removes from local cache. IPFS copies persist.

        Ownership is enforced only when ``actor_id`` is given and the recipe
        records an uploader; a recipe with no uploader can be deleted by any
        actor.

        Returns (success, error_message).
        """
        # Get recipe to check ownership
        recipe = await self.get_recipe(recipe_id)
        if not recipe:
            return False, "Recipe not found"

        # Check ownership if actor_id provided
        if actor_id:
            recipe_owner = recipe.get("uploader")
            if recipe_owner and recipe_owner != actor_id:
                return False, "Cannot delete: you don't own this recipe"

        # Delete from cache
        try:
            if hasattr(self.cache, 'delete_by_content_hash'):
                success, msg = self.cache.delete_by_content_hash(recipe_id)
                if not success:
                    return False, msg
            else:
                # Fallback: get path and delete directly
                path = self.cache.get_by_content_hash(recipe_id)
                if path and path.exists():
                    path.unlink()
            return True, None
        except Exception as e:
            return False, f"Failed to delete: {e}"

    def parse_recipe(self, content: str) -> Dict[str, Any]:
        """
        Parse recipe content (S-expression or YAML).

        S-expression content is compiled and converted with ``to_dict()``
        when S-expression support is available; in every other case the
        content is handed to ``yaml.safe_load``. Parse/compile errors are
        not caught here and propagate to the caller.
        """
        if _is_sexp_format(content) and SEXP_AVAILABLE:
            return compile_string(content).to_dict()
        return yaml.safe_load(content)

    def build_dag(self, recipe: Dict[str, Any]) -> Dict[str, Any]:
        """
        Build DAG visualization data from recipe.

        Reads ``recipe["dag"]["nodes"]`` (a list of node dicts carrying an
        "id", as produced by compiled S-expression recipes, or the legacy
        dict mapping node id -> node dict) and ``recipe["dag"]["output"]``.

        Returns ``{"nodes": [...], "edges": [...]}`` for Cytoscape.js. Each
        node is ``{"data": {"id", "label", "nodeType", "isOutput"}}`` and
        each edge is ``{"data": {"source", "target"}}``, one per input of a
        node. A missing or malformed "dag" yields empty lists.
        """
        dag = recipe.get("dag")
        if not isinstance(dag, dict):
            dag = {}
        dag_nodes = dag.get("nodes") or []
        output_node = dag.get("output")

        # Normalize both supported layouts to (node_id, node_def) pairs so
        # a single loop can build nodes and edges.
        if isinstance(dag_nodes, dict):
            pairs = list(dag_nodes.items())
        elif isinstance(dag_nodes, list):
            pairs = [
                (node_def.get("id"), node_def)
                for node_def in dag_nodes
                if isinstance(node_def, dict)
            ]
        else:
            pairs = []

        vis_nodes = []
        edges = []
        for node_id, node_def in pairs:
            if not isinstance(node_def, dict):
                # Legacy dict entry without a definition (e.g. `n1: null`).
                node_def = {}
            vis_nodes.append({
                "data": {
                    "id": node_id,
                    "label": node_id,
                    "nodeType": node_def.get("type", "EFFECT"),
                    "isOutput": node_id == output_node,
                }
            })

            # Build edges from inputs; an input is either a bare node id or
            # a dict naming its source under "node" or "input".
            for input_ref in node_def.get("inputs") or []:
                if isinstance(input_ref, dict):
                    source = input_ref.get("node") or input_ref.get("input")
                else:
                    source = input_ref
                if source:
                    edges.append({
                        "data": {
                            "source": source,
                            "target": node_id,
                        }
                    })

        return {"nodes": vis_nodes, "edges": edges}