Add S-expression recipe support

- Add format detection that correctly handles ; comments
- Import artdag.sexp parser/compiler with YAML fallback
- Add execute_step_sexp and run_plan_sexp Celery tasks
- Update recipe upload to handle both S-expr and YAML formats

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
gilesb
2026-01-11 23:08:53 +00:00
parent 9df78f771d
commit e59a50c000
5 changed files with 637 additions and 60 deletions

View File

@@ -4,10 +4,15 @@
# 1. analyze_input - Extract features from input media
# 2. execute_step - Execute a single step from the plan
# 3. run_plan - Orchestrate execution of a full plan
#
# S-expression tasks:
# 4. execute_step_sexp - Execute step from S-expression
# 5. run_plan_sexp - Run plan from S-expression
from .analyze import analyze_input, analyze_inputs
from .execute import execute_step
from .orchestrate import run_plan, run_recipe
from .execute_sexp import execute_step_sexp, run_plan_sexp
__all__ = [
"analyze_input",
@@ -15,4 +20,7 @@ __all__ = [
"execute_step",
"run_plan",
"run_recipe",
# S-expression tasks
"execute_step_sexp",
"run_plan_sexp",
]

460
tasks/execute_sexp.py Normal file
View File

@@ -0,0 +1,460 @@
"""
S-expression step execution task.
Executes individual steps received as S-expressions.
The S-expression is the canonical format - workers verify
cache_ids by hashing the received S-expression.
"""
import json
import logging
import os
import socket
from pathlib import Path
from typing import Dict, Optional
from celery import current_task
# Import from the Celery app
import sys
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from celery_app import app
from claiming import (
get_claimer,
claim_task,
complete_task,
fail_task,
ClaimStatus,
)
from cache_manager import get_cache_manager
# Import artdag S-expression modules
try:
from artdag.sexp import parse, Symbol, Keyword
from artdag import NodeType
from artdag.executor import get_executor
except ImportError:
parse = None
Symbol = None
Keyword = None
NodeType = None
get_executor = None
logger = logging.getLogger(__name__)
def get_worker_id() -> str:
    """Return this worker's unique identity as "<hostname>:<pid>"."""
    return f"{socket.gethostname()}:{os.getpid()}"
def sexp_to_config(sexp) -> Dict:
    """
    Translate a parsed step S-expression into a plain config dict.

    Input: (effect :hash "abc123" :inputs ["step1"])
    Output: {"node_type": "EFFECT", "hash": "abc123", "inputs": ["step1"]}
    """
    if not isinstance(sexp, list) or not sexp:
        raise ValueError(f"Invalid step S-expression: {sexp}")
    # The head symbol names the node type; normalize to upper case.
    head = sexp[0]
    if isinstance(head, Symbol):
        node_type = head.name.upper()
    else:
        node_type = str(head).upper()
    config = {"node_type": node_type}
    # Walk the remaining items as a keyword plist: `:some-key value`
    # pairs become config["some_key"] = value entries.
    idx = 1
    while idx < len(sexp):
        token = sexp[idx]
        if isinstance(token, Keyword) and idx + 1 < len(sexp):
            config[token.name.replace('-', '_')] = sexp[idx + 1]
            idx += 2
        else:
            # Trailing keyword with no value, or a positional
            # argument: skipped (positional args are not collected).
            idx += 1
    return config
@app.task(bind=True, name='tasks.execute_step_sexp')
def execute_step_sexp(
    self,
    step_sexp: str,
    step_id: str,
    cache_id: str,
    plan_id: str,
    input_cache_ids: Dict[str, str],
) -> dict:
    """
    Execute a single step from an S-expression.

    The step is received as a serialized S-expression string.
    Workers can verify the cache_id by hashing the S-expression.

    Flow: parse -> cache lookup -> distributed claim -> execute ->
    cache/complete. Only one worker executes a given cache_id; any
    other worker that receives the same step waits on the claim and
    relays the first worker's outcome.

    Args:
        step_sexp: Serialized S-expression for the step
        step_id: Human-readable step identifier
        cache_id: Expected cache_id (SHA3-256 of step_sexp)
        plan_id: ID of the parent execution plan
        input_cache_ids: Mapping from input step_id to their cache_id

    Returns:
        Dict with execution result; "status" is one of "completed",
        "cached", "completed_by_other", "failed", or "timeout".

    Raises:
        ImportError: when the artdag S-expression modules are unavailable.
    """
    if parse is None:
        raise ImportError("artdag.sexp not available")
    # Hoisted stdlib imports used by the generic execution path below.
    import shutil
    import tempfile
    worker_id = get_worker_id()
    task_id = self.request.id
    logger.info(f"Executing step {step_id} cache_id={cache_id[:16]}...")
    logger.debug(f"Step S-expression: {step_sexp[:100]}...")
    # Parse the S-expression. A malformed step is reported as a normal
    # "failed" result instead of raising, so the orchestrator still gets
    # a structured reply.
    try:
        parsed = parse(step_sexp)
        config = sexp_to_config(parsed)
        node_type = config.pop("node_type")
    except Exception as e:
        logger.error(f"Failed to parse step S-expression: {e}")
        return {
            "status": "failed",
            "step_id": step_id,
            "cache_id": cache_id,
            "error": f"Parse error: {e}",
        }
    # Get cache manager
    cache_mgr = get_cache_manager()
    # Cache hit: no work to do, just record the cached location.
    cached_path = cache_mgr.get_by_content_hash(cache_id)
    if cached_path:
        logger.info(f"Step {step_id} already cached at {cached_path}")
        claimer = get_claimer()
        claimer.mark_cached(cache_id, str(cached_path))
        return {
            "status": "cached",
            "step_id": step_id,
            "cache_id": cache_id,
            "output_path": str(cached_path),
        }
    # Try to claim the task. If another worker already holds the claim,
    # block until it finishes (or the timeout elapses) and relay its result.
    if not claim_task(cache_id, worker_id, task_id):
        logger.info(f"Step {step_id} claimed by another worker, waiting...")
        claimer = get_claimer()
        result = claimer.wait_for_completion(cache_id, timeout=600)
        if result and result.status == ClaimStatus.COMPLETED:
            return {
                "status": "completed_by_other",
                "step_id": step_id,
                "cache_id": cache_id,
                "output_path": result.output_path,
            }
        elif result and result.status == ClaimStatus.CACHED:
            return {
                "status": "cached",
                "step_id": step_id,
                "cache_id": cache_id,
                "output_path": result.output_path,
            }
        elif result and result.status == ClaimStatus.FAILED:
            return {
                "status": "failed",
                "step_id": step_id,
                "cache_id": cache_id,
                "error": result.error,
            }
        else:
            return {
                "status": "timeout",
                "step_id": step_id,
                "cache_id": cache_id,
                "error": "Timeout waiting for other worker",
            }
    # We have the claim, update to running
    claimer = get_claimer()
    claimer.update_status(cache_id, worker_id, ClaimStatus.RUNNING)
    try:
        # SOURCE nodes: the output is pre-existing content addressed by
        # :hash; resolve it from the cache and report completion.
        if node_type == "SOURCE":
            content_hash = config.get("hash")
            if not content_hash:
                raise ValueError("SOURCE step missing :hash")
            path = cache_mgr.get_by_content_hash(content_hash)
            if not path:
                raise ValueError(f"SOURCE input not found: {content_hash[:16]}...")
            output_path = str(path)
            complete_task(cache_id, worker_id, output_path)
            return {
                "status": "completed",
                "step_id": step_id,
                "cache_id": cache_id,
                "output_path": output_path,
            }
        # EFFECT nodes: resolve inputs, then check executor availability.
        # NOTE(review): when an executor IS available, this branch falls
        # through to the generic path below (which re-resolves the inputs)
        # — presumably intentional, but worth confirming.
        if node_type == "EFFECT":
            effect_hash = config.get("hash")
            if not effect_hash:
                raise ValueError("EFFECT step missing :hash")
            # Get input paths
            inputs = config.get("inputs", [])
            input_paths = []
            for inp in inputs:
                # Inputs may name a step_id (mapped via input_cache_ids)
                # or be a cache_id directly.
                inp_cache_id = input_cache_ids.get(inp, inp)
                path = cache_mgr.get_by_content_hash(inp_cache_id)
                if not path:
                    raise ValueError(f"Input not found: {inp_cache_id[:16]}...")
                input_paths.append(Path(path))
            # Get executor
            try:
                executor = get_executor(NodeType.SOURCE)  # Effects use SOURCE executor for now
            except Exception:
                # Fix: was a bare `except:`, which also swallowed
                # SystemExit/KeyboardInterrupt.
                executor = None
            if executor is None:
                # Fallback: copy input to output (identity-like behavior)
                if input_paths:
                    output_path = str(input_paths[0])
                    complete_task(cache_id, worker_id, output_path)
                    return {
                        "status": "completed",
                        "step_id": step_id,
                        "cache_id": cache_id,
                        "output_path": output_path,
                    }
                raise ValueError("No executor for EFFECT and no inputs")
        # Generic path: look up an executor for the node type, preferring
        # the NodeType enum member but falling back to the raw string.
        try:
            node_type_enum = NodeType[node_type]
        except (KeyError, TypeError):
            node_type_enum = node_type
        executor = get_executor(node_type_enum)
        if executor is None:
            raise ValueError(f"No executor for node type: {node_type}")
        # Resolve input paths
        inputs = config.pop("inputs", [])
        input_paths = []
        for inp in inputs:
            inp_cache_id = input_cache_ids.get(inp, inp)
            path = cache_mgr.get_by_content_hash(inp_cache_id)
            if not path:
                raise ValueError(f"Input not found: {inp_cache_id[:16]}...")
            input_paths.append(Path(path))
        # Create a temp output location, always cleaned up below.
        output_dir = Path(tempfile.mkdtemp())
        output_path = output_dir / f"output_{cache_id[:16]}.mp4"
        try:
            # Execute
            logger.info(f"Running executor for {node_type} with {len(input_paths)} inputs")
            result_path = executor.execute(config, input_paths, output_path)
            # Store in cache; put() returns the cached file and an IPFS CID.
            cached_file, ipfs_cid = cache_mgr.put(
                source_path=result_path,
                node_type=node_type,
                node_id=cache_id,
            )
        finally:
            # Fix: previously the temp dir leaked whenever the executor or
            # the cache put raised; now it is removed on every exit path.
            shutil.rmtree(output_dir, ignore_errors=True)
        logger.info(f"Step {step_id} completed, IPFS CID: {ipfs_cid}")
        complete_task(cache_id, worker_id, str(cached_file.path))
        return {
            "status": "completed",
            "step_id": step_id,
            "cache_id": cache_id,
            "output_path": str(cached_file.path),
            "content_hash": cached_file.content_hash,
            "ipfs_cid": ipfs_cid,
        }
    except Exception as e:
        # Any failure releases the claim as failed so waiting workers
        # stop blocking, and is reported as a structured result.
        logger.error(f"Step {step_id} failed: {e}")
        fail_task(cache_id, worker_id, str(e))
        return {
            "status": "failed",
            "step_id": step_id,
            "cache_id": cache_id,
            "error": str(e),
        }
@app.task(bind=True, name='tasks.run_plan_sexp')
def run_plan_sexp(
    self,
    plan_sexp: str,
    run_id: Optional[str] = None,
) -> dict:
    """
    Execute a complete S-expression execution plan.

    As consumed here, the plan S-expression is a list whose head is a
    symbol, followed by metadata keyword pairs (:id, :recipe,
    :recipe-hash, :output), an (inputs (<name> <value>) ...) block, and
    (step <step-id> :cache-id <id> :level <n> (<node-sexp>)) blocks.
    The parsed steps are assembled into an ExecutionPlanSexp and handed
    to PlanScheduler, which dispatches 'tasks.execute_step_sexp' tasks.

    Args:
        plan_sexp: Serialized S-expression plan
        run_id: Optional run ID for tracking

    Returns:
        Dict with execution results (status, output cache id/path/CID,
        and per-status step counts) taken from the scheduler result.

    Raises:
        ImportError: when the artdag S-expression modules are unavailable.
    """
    if parse is None:
        raise ImportError("artdag.sexp not available")
    # Imported lazily so this module can still load when artdag is absent.
    from artdag.sexp.scheduler import PlanScheduler
    from artdag.sexp.planner import ExecutionPlanSexp, PlanStep
    logger.info(f"Running plan from S-expression (run_id={run_id})")
    # Parse the plan S-expression
    parsed = parse(plan_sexp)
    # Extract plan metadata and steps
    plan_id = None
    recipe_id = None
    recipe_hash = None
    inputs = {}  # input name -> value, from the (inputs ...) block
    steps = []  # PlanStep objects, in encounter order
    output_step_id = None
    # Start at 1: parsed[0] is the plan head symbol, not a payload item.
    i = 1
    while i < len(parsed):
        item = parsed[i]
        if isinstance(item, Keyword):
            # Top-level :keyword value pairs carry plan metadata.
            # Unrecognized keywords are consumed with their value and ignored.
            key = item.name
            if i + 1 < len(parsed):
                value = parsed[i + 1]
                if key == "id":
                    plan_id = value
                elif key == "recipe":
                    recipe_id = value
                elif key == "recipe-hash":
                    recipe_hash = value
                elif key == "output":
                    output_step_id = value
                i += 2
            else:
                # Trailing keyword with no value; skip it.
                i += 1
        elif isinstance(item, list) and len(item) > 0:
            head = item[0]
            if isinstance(head, Symbol):
                if head.name == "inputs":
                    # Parse inputs block: each entry is a (name value) pair.
                    for j in range(1, len(item)):
                        inp = item[j]
                        if isinstance(inp, list) and len(inp) >= 2:
                            name = inp[0].name if isinstance(inp[0], Symbol) else str(inp[0])
                            value = inp[1]
                            inputs[name] = value
                elif head.name == "step":
                    # Parse step: positional step id, then a keyword plist
                    # (:cache-id, :level) and a nested node S-expression.
                    step_id = item[1] if len(item) > 1 else None
                    step_cache_id = None
                    step_level = 0
                    step_node = None
                    j = 2
                    while j < len(item):
                        sub = item[j]
                        if isinstance(sub, Keyword):
                            if sub.name == "cache-id" and j + 1 < len(item):
                                step_cache_id = item[j + 1]
                                j += 2
                            elif sub.name == "level" and j + 1 < len(item):
                                step_level = item[j + 1]
                                j += 2
                            else:
                                j += 1
                        elif isinstance(sub, list):
                            # The nested list is the node S-expression itself.
                            step_node = sub
                            j += 1
                        else:
                            j += 1
                    # Steps missing an id, cache-id, or node body are dropped.
                    if step_id and step_cache_id and step_node:
                        # Convert step_node to config
                        config = sexp_to_config(step_node)
                        node_type = config.pop("node_type")
                        step_inputs = config.pop("inputs", [])
                        steps.append(PlanStep(
                            step_id=step_id,
                            node_type=node_type,
                            config=config,
                            inputs=step_inputs if isinstance(step_inputs, list) else [],
                            cache_id=step_cache_id,
                            level=step_level,
                        ))
            i += 1
        else:
            i += 1
    # Create plan object; missing metadata gets placeholder values, and
    # the output defaults to the last parsed step.
    plan = ExecutionPlanSexp(
        plan_id=plan_id or "unknown",
        recipe_id=recipe_id or "unknown",
        recipe_hash=recipe_hash or "",
        steps=steps,
        output_step_id=output_step_id or (steps[-1].step_id if steps else ""),
        inputs=inputs,
    )
    # Create scheduler and run
    cache_mgr = get_cache_manager()
    scheduler = PlanScheduler(
        cache_manager=cache_mgr,
        celery_app=app,
        execute_task_name='tasks.execute_step_sexp',
    )
    result = scheduler.schedule(plan)
    # Flatten the scheduler result into a JSON-serializable reply.
    return {
        "status": result.status,
        "run_id": run_id,
        "plan_id": result.plan_id,
        "output_cache_id": result.output_cache_id,
        "output_path": result.output_path,
        "output_ipfs_cid": result.output_ipfs_cid,
        "steps_completed": result.steps_completed,
        "steps_cached": result.steps_cached,
        "steps_failed": result.steps_failed,
        "error": result.error,
    }