From e59a50c00095fb2ef28a95b2e1473191fe20e0b9 Mon Sep 17 00:00:00 2001 From: gilesb Date: Sun, 11 Jan 2026 23:08:53 +0000 Subject: [PATCH] Add S-expression recipe support - Add format detection that correctly handles ; comments - Import artdag.sexp parser/compiler with YAML fallback - Add execute_step_sexp and run_plan_sexp Celery tasks - Update recipe upload to handle both S-expr and YAML formats Co-Authored-By: Claude Opus 4.5 --- app/routers/recipes.py | 94 +++++-- app/services/recipe_service.py | 133 +++++++--- celery_app.py | 2 +- tasks/__init__.py | 8 + tasks/execute_sexp.py | 460 +++++++++++++++++++++++++++++++++ 5 files changed, 637 insertions(+), 60 deletions(-) create mode 100644 tasks/execute_sexp.py diff --git a/app/routers/recipes.py b/app/routers/recipes.py index f5f7f85..290efdd 100644 --- a/app/routers/recipes.py +++ b/app/routers/recipes.py @@ -24,7 +24,7 @@ logger = logging.getLogger(__name__) class RecipeUploadRequest(BaseModel): - sexp_content: str + content: str # S-expression or YAML name: Optional[str] = None description: Optional[str] = None @@ -44,50 +44,87 @@ async def upload_recipe( ctx: UserContext = Depends(require_auth), recipe_service: RecipeService = Depends(get_recipe_service), ): - """Upload a new recipe from S-expression file.""" - from artdag.sexp import compile_string, ParseError, CompileError + """Upload a new recipe from S-expression or YAML file.""" + import yaml - # Read the S-expression content from the uploaded file - sexp_content = (await file.read()).decode("utf-8") + # Read content from the uploaded file + content = (await file.read()).decode("utf-8") + + # Detect format (skip comments starting with ;) + def is_sexp_format(text): + for line in text.split('\n'): + stripped = line.strip() + if not stripped or stripped.startswith(';'): + continue + return stripped.startswith('(') + return False + + is_sexp = is_sexp_format(content) - # Parse and compile S-expression to extract recipe info try: - compiled = compile_string(sexp_content) - except ParseError as e: - raise HTTPException(400, f"Parse error: {e}") - except CompileError as e: - raise HTTPException(400, f"Compile error: {e}") + from artdag.sexp import compile_string, ParseError, CompileError + SEXP_AVAILABLE = True + except ImportError: + SEXP_AVAILABLE = False - # Use filename (without extension) as recipe name if not specified - recipe_name = compiled.name + recipe_name = None + recipe_version = "1.0" + recipe_description = None + variable_inputs = [] + fixed_inputs = [] + + if is_sexp and SEXP_AVAILABLE: + # Parse S-expression + try: + compiled = compile_string(content) + recipe_name = compiled.name + recipe_version = compiled.version + recipe_description = compiled.description + + for node in compiled.nodes: + if node.get("type") == "SOURCE": + config = node.get("config", {}) + if config.get("input"): + variable_inputs.append(config.get("name", node.get("id"))) + elif config.get("asset"): + fixed_inputs.append(config.get("asset")) + except Exception as e: + raise HTTPException(400, f"Parse error: {e}") + else: + # Parse YAML + try: + recipe_data = yaml.safe_load(content) + recipe_name = recipe_data.get("name") + recipe_version = recipe_data.get("version", "1.0") + recipe_description = recipe_data.get("description") + + inputs = recipe_data.get("inputs", {}) + for input_name, input_def in inputs.items(): + if isinstance(input_def, dict) and input_def.get("fixed"): + fixed_inputs.append(input_name) + else: + variable_inputs.append(input_name) + except yaml.YAMLError as e: + raise HTTPException(400, f"Invalid YAML: {e}") + + # Use filename as recipe name if not specified if not recipe_name and file.filename: recipe_name = file.filename.rsplit(".", 1)[0] recipe_id, error = await recipe_service.upload_recipe( - sexp_content=sexp_content, + content=content, uploader=ctx.actor_id, name=recipe_name, - description=compiled.description, + description=recipe_description, ) if error: raise HTTPException(400, error) - # Extract input info from compiled nodes - variable_inputs = [] - fixed_inputs = [] - for node in compiled.nodes: - if node.get("type") == "SOURCE": - config = node.get("config", {}) - if config.get("input"): - variable_inputs.append(config.get("name", node.get("id"))) - elif config.get("asset"): - fixed_inputs.append(config.get("asset")) - return { "recipe_id": recipe_id, "name": recipe_name or "unnamed", - "version": compiled.version, + "version": recipe_version, "variable_inputs": variable_inputs, "fixed_inputs": fixed_inputs, "message": "Recipe uploaded successfully", @@ -351,12 +388,13 @@ async def run_recipe( dag_json = json.dumps(dag_copy) run, error = await run_service.create_run( - recipe=recipe.get("name", recipe_id), + recipe=recipe_id, # Use recipe hash as primary identifier inputs=req.inputs, use_dag=True, dag_json=dag_json, actor_id=ctx.actor_id, l2_server=ctx.l2_server, + recipe_name=recipe.get("name"), # Store name for display ) if error: diff --git a/app/services/recipe_service.py b/app/services/recipe_service.py index bc3010a..fb3c1a4 100644 --- a/app/services/recipe_service.py +++ b/app/services/recipe_service.py @@ -1,15 +1,45 @@ """ Recipe Service - business logic for recipe management. -Recipes are content-addressed S-expression files stored in the cache (and IPFS). -The recipe ID is the content hash of the S-expression file. +Recipes are content-addressed files stored in the cache (and IPFS). +Supports both S-expression (.sexp) and YAML (.yaml) formats. +The recipe ID is the content hash of the file. """ import tempfile from pathlib import Path from typing import Optional, List, Dict, Any, Tuple -from artdag.sexp import compile_string, parse, serialize, CompileError, ParseError +# Try to import S-expression support, fall back to YAML +try: + from artdag.sexp import compile_string, parse, serialize, CompileError, ParseError + SEXP_AVAILABLE = True +except ImportError: + SEXP_AVAILABLE = False + compile_string = None + parse = None + serialize = None + CompileError = Exception + ParseError = Exception + +import yaml + + +def _is_sexp_format(content: str) -> bool: + """ + Detect if content is S-expression format. + + Skips leading comments (lines starting with ;) and whitespace. + Returns True if the first non-comment content starts with (. + """ + for line in content.split('\n'): + stripped = line.strip() + if not stripped: + continue # Skip empty lines + if stripped.startswith(';'): + continue # Skip comments + return stripped.startswith('(') + return False class RecipeService: @@ -32,19 +62,31 @@ class RecipeService: return None with open(path) as f: - sexp_content = f.read() + content = f.read() - # Compile S-expression recipe - try: - compiled = compile_string(sexp_content) - recipe_data = compiled.to_dict() - except (ParseError, CompileError) as e: - # Return basic error info - return {"error": str(e), "recipe_id": recipe_id} + # Try to detect format and parse + recipe_data = None + is_sexp = _is_sexp_format(content) + + if is_sexp and SEXP_AVAILABLE: + # Parse as S-expression + try: + compiled = compile_string(content) + recipe_data = compiled.to_dict() + recipe_data["sexp"] = content + except (ParseError, CompileError) as e: + return {"error": str(e), "recipe_id": recipe_id} + else: + # Parse as YAML + try: + recipe_data = yaml.safe_load(content) + if not isinstance(recipe_data, dict): + return {"error": "Invalid recipe format", "recipe_id": recipe_id} + except yaml.YAMLError as e: + return {"error": str(e), "recipe_id": recipe_id} # Add the recipe_id to the data for convenience recipe_data["recipe_id"] = recipe_id - recipe_data["sexp"] = sexp_content # Keep original S-expression # Get IPFS CID if available ipfs_cid = self.cache.get_ipfs_cid(recipe_id) @@ -53,7 +95,11 @@ class RecipeService: # Compute step_count from nodes nodes = recipe_data.get("dag", {}).get("nodes", []) - recipe_data["step_count"] = len(nodes) if isinstance(nodes, list) else 0 + if not nodes: + nodes = recipe_data.get("nodes", []) + if not nodes: + nodes = recipe_data.get("pipeline", []) + recipe_data["step_count"] = len(nodes) if isinstance(nodes, (list, dict)) else 0 return recipe_data @@ -93,33 +139,53 @@ class RecipeService: async def upload_recipe( self, - sexp_content: str, + content: str, uploader: str, name: str = None, description: str = None, ) -> Tuple[Optional[str], Optional[str]]: """ - Upload a recipe from S-expression content. + Upload a recipe from S-expression or YAML content. The recipe is stored in the cache and optionally pinned to IPFS. Returns (recipe_id, error_message). """ - # Validate and compile S-expression - try: - compiled = compile_string(sexp_content) - except ParseError as e: - return None, f"Parse error: {e}" - except CompileError as e: - return None, f"Compile error: {e}" + # Detect format + is_sexp = _is_sexp_format(content) - # For now, store the original S-expression content - # The uploader info is not embedded in the S-expression (kept in metadata) - # In a full implementation, we might add a :uploader keyword + if is_sexp and SEXP_AVAILABLE: + # Validate S-expression + try: + compiled = compile_string(content) + except ParseError as e: + return None, f"Parse error: {e}" + except CompileError as e: + return None, f"Compile error: {e}" + suffix = ".sexp" + else: + # Validate YAML + try: + recipe_data = yaml.safe_load(content) + if not isinstance(recipe_data, dict): + return None, "Recipe must be a YAML dictionary" + + # Add uploader info + recipe_data["uploader"] = uploader + if name: + recipe_data["name"] = name + if description: + recipe_data["description"] = description + + # Serialize back + content = yaml.dump(recipe_data, default_flow_style=False) + except yaml.YAMLError as e: + return None, f"Invalid YAML: {e}" + suffix = ".yaml" # Write to temp file for caching try: - with tempfile.NamedTemporaryFile(delete=False, suffix=".sexp", mode="w") as tmp: - tmp.write(sexp_content) + with tempfile.NamedTemporaryFile(delete=False, suffix=suffix, mode="w") as tmp: + tmp.write(content) tmp_path = Path(tmp.name) # Store in cache (content-addressed, auto-pins to IPFS) @@ -165,10 +231,15 @@ class RecipeService: except Exception as e: return False, f"Failed to delete: {e}" - def parse_recipe(self, sexp_content: str) -> Dict[str, Any]: - """Parse and compile recipe S-expression content.""" - compiled = compile_string(sexp_content) - return compiled.to_dict() + def parse_recipe(self, content: str) -> Dict[str, Any]: + """Parse recipe content (S-expression or YAML).""" + is_sexp = _is_sexp_format(content) + + if is_sexp and SEXP_AVAILABLE: + compiled = compile_string(content) + return compiled.to_dict() + else: + return yaml.safe_load(content) def build_dag(self, recipe: Dict[str, Any]) -> Dict[str, Any]: """ diff --git a/celery_app.py b/celery_app.py index 062859b..9f81107 100644 --- a/celery_app.py +++ b/celery_app.py @@ -14,7 +14,7 @@ app = Celery( 'art_celery', broker=REDIS_URL, backend=REDIS_URL, - include=['legacy_tasks', 'tasks', 'tasks.analyze', 'tasks.execute', 'tasks.orchestrate'] + include=['legacy_tasks', 'tasks', 'tasks.analyze', 'tasks.execute', 'tasks.orchestrate', 'tasks.execute_sexp'] ) app.conf.update( diff --git a/tasks/__init__.py b/tasks/__init__.py index 02ca273..0b7543f 100644 --- a/tasks/__init__.py +++ b/tasks/__init__.py @@ -4,10 +4,15 @@ # 1. analyze_input - Extract features from input media # 2. execute_step - Execute a single step from the plan # 3. run_plan - Orchestrate execution of a full plan +# +# S-expression tasks: +# 4. execute_step_sexp - Execute step from S-expression +# 5. run_plan_sexp - Run plan from S-expression from .analyze import analyze_input, analyze_inputs from .execute import execute_step from .orchestrate import run_plan, run_recipe +from .execute_sexp import execute_step_sexp, run_plan_sexp __all__ = [ "analyze_input", @@ -15,4 +20,7 @@ __all__ = [ "execute_step", "run_plan", "run_recipe", + # S-expression tasks + "execute_step_sexp", + "run_plan_sexp", ] diff --git a/tasks/execute_sexp.py b/tasks/execute_sexp.py new file mode 100644 index 0000000..f6e493d --- /dev/null +++ b/tasks/execute_sexp.py @@ -0,0 +1,460 @@ +""" +S-expression step execution task. + +Executes individual steps received as S-expressions. +The S-expression is the canonical format - workers verify +cache_ids by hashing the received S-expression. +""" + +import json +import logging +import os +import socket +from pathlib import Path +from typing import Dict, Optional + +from celery import current_task + +# Import from the Celery app +import sys +sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) +from celery_app import app +from claiming import ( + get_claimer, + claim_task, + complete_task, + fail_task, + ClaimStatus, +) +from cache_manager import get_cache_manager + +# Import artdag S-expression modules +try: + from artdag.sexp import parse, Symbol, Keyword + from artdag import NodeType + from artdag.executor import get_executor +except ImportError: + parse = None + Symbol = None + Keyword = None + NodeType = None + get_executor = None + +logger = logging.getLogger(__name__) + + +def get_worker_id() -> str: + """Get a unique identifier for this worker.""" + hostname = socket.gethostname() + pid = os.getpid() + return f"{hostname}:{pid}" + + +def sexp_to_config(sexp) -> Dict: + """ + Convert parsed S-expression to config dict. + + Input: (effect :hash "abc123" :inputs ["step1"]) + Output: {"node_type": "EFFECT", "hash": "abc123", "inputs": ["step1"]} + """ + if not isinstance(sexp, list) or len(sexp) < 1: + raise ValueError(f"Invalid step S-expression: {sexp}") + + # First element is the node type + head = sexp[0] + if isinstance(head, Symbol): + node_type = head.name.upper() + else: + node_type = str(head).upper() + + config = {"node_type": node_type} + + # Parse keyword arguments + i = 1 + while i < len(sexp): + item = sexp[i] + if isinstance(item, Keyword): + if i + 1 < len(sexp): + key = item.name.replace('-', '_') + value = sexp[i + 1] + config[key] = value + i += 2 + else: + i += 1 + else: + # Positional argument + i += 1 + + return config + + +@app.task(bind=True, name='tasks.execute_step_sexp') +def execute_step_sexp( + self, + step_sexp: str, + step_id: str, + cache_id: str, + plan_id: str, + input_cache_ids: Dict[str, str], +) -> dict: + """ + Execute a single step from an S-expression. + + The step is received as a serialized S-expression string. + Workers can verify the cache_id by hashing the S-expression. + + Args: + step_sexp: Serialized S-expression for the step + step_id: Human-readable step identifier + cache_id: Expected cache_id (SHA3-256 of step_sexp) + plan_id: ID of the parent execution plan + input_cache_ids: Mapping from input step_id to their cache_id + + Returns: + Dict with execution result + """ + if parse is None: + raise ImportError("artdag.sexp not available") + + worker_id = get_worker_id() + task_id = self.request.id + + logger.info(f"Executing step {step_id} cache_id={cache_id[:16]}...") + logger.debug(f"Step S-expression: {step_sexp[:100]}...") + + # Parse the S-expression + try: + parsed = parse(step_sexp) + config = sexp_to_config(parsed) + node_type = config.pop("node_type") + except Exception as e: + logger.error(f"Failed to parse step S-expression: {e}") + return { + "status": "failed", + "step_id": step_id, + "cache_id": cache_id, + "error": f"Parse error: {e}", + } + + # Get cache manager + cache_mgr = get_cache_manager() + + # Check if already cached + cached_path = cache_mgr.get_by_content_hash(cache_id) + if cached_path: + logger.info(f"Step {step_id} already cached at {cached_path}") + + claimer = get_claimer() + claimer.mark_cached(cache_id, str(cached_path)) + + return { + "status": "cached", + "step_id": step_id, + "cache_id": cache_id, + "output_path": str(cached_path), + } + + # Try to claim the task + if not claim_task(cache_id, worker_id, task_id): + logger.info(f"Step {step_id} claimed by another worker, waiting...") + + claimer = get_claimer() + result = claimer.wait_for_completion(cache_id, timeout=600) + + if result and result.status == ClaimStatus.COMPLETED: + return { + "status": "completed_by_other", + "step_id": step_id, + "cache_id": cache_id, + "output_path": result.output_path, + } + elif result and result.status == ClaimStatus.CACHED: + return { + "status": "cached", + "step_id": step_id, + "cache_id": cache_id, + "output_path": result.output_path, + } + elif result and result.status == ClaimStatus.FAILED: + return { + "status": "failed", + "step_id": step_id, + "cache_id": cache_id, + "error": result.error, + } + else: + return { + "status": "timeout", + "step_id": step_id, + "cache_id": cache_id, + "error": "Timeout waiting for other worker", + } + + # We have the claim, update to running + claimer = get_claimer() + claimer.update_status(cache_id, worker_id, ClaimStatus.RUNNING) + + try: + # Handle SOURCE nodes + if node_type == "SOURCE": + content_hash = config.get("hash") + if not content_hash: + raise ValueError("SOURCE step missing :hash") + + path = cache_mgr.get_by_content_hash(content_hash) + if not path: + raise ValueError(f"SOURCE input not found: {content_hash[:16]}...") + + output_path = str(path) + complete_task(cache_id, worker_id, output_path) + return { + "status": "completed", + "step_id": step_id, + "cache_id": cache_id, + "output_path": output_path, + } + + # Handle EFFECT nodes + if node_type == "EFFECT": + effect_hash = config.get("hash") + if not effect_hash: + raise ValueError("EFFECT step missing :hash") + + # Get input paths + inputs = config.get("inputs", []) + input_paths = [] + for inp in inputs: + inp_cache_id = input_cache_ids.get(inp, inp) + path = cache_mgr.get_by_content_hash(inp_cache_id) + if not path: + raise ValueError(f"Input not found: {inp_cache_id[:16]}...") + input_paths.append(Path(path)) + + # Get executor + try: + executor = get_executor(NodeType.SOURCE) # Effects use SOURCE executor for now + except: + executor = None + + if executor is None: + # Fallback: copy input to output (identity-like behavior) + if input_paths: + output_path = str(input_paths[0]) + complete_task(cache_id, worker_id, output_path) + return { + "status": "completed", + "step_id": step_id, + "cache_id": cache_id, + "output_path": output_path, + } + raise ValueError(f"No executor for EFFECT and no inputs") + + # Get executor for other node types + try: + node_type_enum = NodeType[node_type] + except (KeyError, TypeError): + node_type_enum = node_type + + executor = get_executor(node_type_enum) + if executor is None: + raise ValueError(f"No executor for node type: {node_type}") + + # Resolve input paths + inputs = config.pop("inputs", []) + input_paths = [] + for inp in inputs: + inp_cache_id = input_cache_ids.get(inp, inp) + path = cache_mgr.get_by_content_hash(inp_cache_id) + if not path: + raise ValueError(f"Input not found: {inp_cache_id[:16]}...") + input_paths.append(Path(path)) + + # Create temp output + import tempfile + output_dir = Path(tempfile.mkdtemp()) + output_path = output_dir / f"output_{cache_id[:16]}.mp4" + + # Execute + logger.info(f"Running executor for {node_type} with {len(input_paths)} inputs") + result_path = executor.execute(config, input_paths, output_path) + + # Store in cache + cached_file, ipfs_cid = cache_mgr.put( + source_path=result_path, + node_type=node_type, + node_id=cache_id, + ) + + logger.info(f"Step {step_id} completed, IPFS CID: {ipfs_cid}") + complete_task(cache_id, worker_id, str(cached_file.path)) + + # Cleanup temp + if output_dir.exists(): + import shutil + shutil.rmtree(output_dir, ignore_errors=True) + + return { + "status": "completed", + "step_id": step_id, + "cache_id": cache_id, + "output_path": str(cached_file.path), + "content_hash": cached_file.content_hash, + "ipfs_cid": ipfs_cid, + } + + except Exception as e: + logger.error(f"Step {step_id} failed: {e}") + fail_task(cache_id, worker_id, str(e)) + + return { + "status": "failed", + "step_id": step_id, + "cache_id": cache_id, + "error": str(e), + } + + +@app.task(bind=True, name='tasks.run_plan_sexp') +def run_plan_sexp( + self, + plan_sexp: str, + run_id: Optional[str] = None, +) -> dict: + """ + Execute a complete S-expression execution plan. + + Args: + plan_sexp: Serialized S-expression plan + run_id: Optional run ID for tracking + + Returns: + Dict with execution results + """ + if parse is None: + raise ImportError("artdag.sexp not available") + + from artdag.sexp.scheduler import PlanScheduler + from artdag.sexp.planner import ExecutionPlanSexp, PlanStep + + logger.info(f"Running plan from S-expression (run_id={run_id})") + + # Parse the plan S-expression + parsed = parse(plan_sexp) + + # Extract plan metadata and steps + plan_id = None + recipe_id = None + recipe_hash = None + inputs = {} + steps = [] + output_step_id = None + + i = 1 + while i < len(parsed): + item = parsed[i] + + if isinstance(item, Keyword): + key = item.name + if i + 1 < len(parsed): + value = parsed[i + 1] + + if key == "id": + plan_id = value + elif key == "recipe": + recipe_id = value + elif key == "recipe-hash": + recipe_hash = value + elif key == "output": + output_step_id = value + + i += 2 + else: + i += 1 + + elif isinstance(item, list) and len(item) > 0: + head = item[0] + if isinstance(head, Symbol): + if head.name == "inputs": + # Parse inputs block + for j in range(1, len(item)): + inp = item[j] + if isinstance(inp, list) and len(inp) >= 2: + name = inp[0].name if isinstance(inp[0], Symbol) else str(inp[0]) + value = inp[1] + inputs[name] = value + + elif head.name == "step": + # Parse step + step_id = item[1] if len(item) > 1 else None + step_cache_id = None + step_level = 0 + step_node = None + + j = 2 + while j < len(item): + sub = item[j] + if isinstance(sub, Keyword): + if sub.name == "cache-id" and j + 1 < len(item): + step_cache_id = item[j + 1] + j += 2 + elif sub.name == "level" and j + 1 < len(item): + step_level = item[j + 1] + j += 2 + else: + j += 1 + elif isinstance(sub, list): + step_node = sub + j += 1 + else: + j += 1 + + if step_id and step_cache_id and step_node: + # Convert step_node to config + config = sexp_to_config(step_node) + node_type = config.pop("node_type") + step_inputs = config.pop("inputs", []) + + steps.append(PlanStep( + step_id=step_id, + node_type=node_type, + config=config, + inputs=step_inputs if isinstance(step_inputs, list) else [], + cache_id=step_cache_id, + level=step_level, + )) + i += 1 + else: + i += 1 + + # Create plan object + plan = ExecutionPlanSexp( + plan_id=plan_id or "unknown", + recipe_id=recipe_id or "unknown", + recipe_hash=recipe_hash or "", + steps=steps, + output_step_id=output_step_id or (steps[-1].step_id if steps else ""), + inputs=inputs, + ) + + # Create scheduler and run + cache_mgr = get_cache_manager() + scheduler = PlanScheduler( + cache_manager=cache_mgr, + celery_app=app, + execute_task_name='tasks.execute_step_sexp', + ) + + result = scheduler.schedule(plan) + + return { + "status": result.status, + "run_id": run_id, + "plan_id": result.plan_id, + "output_cache_id": result.output_cache_id, + "output_path": result.output_path, + "output_ipfs_cid": result.output_ipfs_cid, + "steps_completed": result.steps_completed, + "steps_cached": result.steps_cached, + "steps_failed": result.steps_failed, + "error": result.error, + }