Add COMPOUND node execution and S-expression API

- Execute COMPOUND nodes with combined FFmpeg filter chain
- Handle TRANSFORM, RESIZE, SEGMENT filters in chain
- Migrate orchestrator to S-expression recipes (remove YAML)
- Update API endpoints to use recipe_sexp parameter
- Extract analysis nodes from recipe for dynamic analysis

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
gilesb
2026-01-12 01:26:26 +00:00
parent 3599f3779b
commit 8e0b473925
4 changed files with 391 additions and 79 deletions

View File

@@ -11,7 +11,6 @@ import uuid
from datetime import datetime, timezone from datetime import datetime, timezone
from typing import Dict, List, Optional from typing import Dict, List, Optional
import yaml
from fastapi import APIRouter, Depends, HTTPException from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel from pydantic import BaseModel
@@ -26,9 +25,8 @@ RUNS_KEY_PREFIX = "artdag:run:"
class PlanRequest(BaseModel): class PlanRequest(BaseModel):
recipe_yaml: str recipe_sexp: str
input_hashes: Dict[str, str] input_hashes: Dict[str, str]
features: List[str] = ["beats", "energy"]
class ExecutePlanRequest(BaseModel): class ExecutePlanRequest(BaseModel):
@@ -37,9 +35,8 @@ class ExecutePlanRequest(BaseModel):
class RecipeRunRequest(BaseModel): class RecipeRunRequest(BaseModel):
recipe_yaml: str recipe_sexp: str
input_hashes: Dict[str, str] input_hashes: Dict[str, str]
features: List[str] = ["beats", "energy"]
def compute_run_id(input_hashes: List[str], recipe: str, recipe_hash: str = None) -> str: def compute_run_id(input_hashes: List[str], recipe: str, recipe_hash: str = None) -> str:
@@ -68,9 +65,8 @@ async def generate_plan_endpoint(
try: try:
task = generate_plan.delay( task = generate_plan.delay(
recipe_yaml=request.recipe_yaml, recipe_sexp=request.recipe_sexp,
input_hashes=request.input_hashes, input_hashes=request.input_hashes,
features=request.features,
) )
# Wait for result (plan generation is usually fast) # Wait for result (plan generation is usually fast)
@@ -136,15 +132,16 @@ async def run_recipe_endpoint(
Returns immediately with run_id. Poll /api/run/{run_id} for status. Returns immediately with run_id. Poll /api/run/{run_id} for status.
""" """
from tasks.orchestrate import run_recipe from tasks.orchestrate import run_recipe
from artdag.sexp import compile_string
import database import database
redis = get_redis_client() redis = get_redis_client()
cache = get_cache_manager() cache = get_cache_manager()
# Parse recipe name # Parse recipe name from S-expression
try: try:
recipe_data = yaml.safe_load(request.recipe_yaml) compiled = compile_string(request.recipe_sexp)
recipe_name = recipe_data.get("name", "unknown") recipe_name = compiled.name or "unknown"
except Exception: except Exception:
recipe_name = "unknown" recipe_name = "unknown"
@@ -152,7 +149,7 @@ async def run_recipe_endpoint(
run_id = compute_run_id( run_id = compute_run_id(
list(request.input_hashes.values()), list(request.input_hashes.values()),
recipe_name, recipe_name,
hashlib.sha3_256(request.recipe_yaml.encode()).hexdigest() hashlib.sha3_256(request.recipe_sexp.encode()).hexdigest()
) )
# Check if already completed # Check if already completed
@@ -171,9 +168,8 @@ async def run_recipe_endpoint(
# Submit to Celery # Submit to Celery
try: try:
task = run_recipe.delay( task = run_recipe.delay(
recipe_yaml=request.recipe_yaml, recipe_sexp=request.recipe_sexp,
input_hashes=request.input_hashes, input_hashes=request.input_hashes,
features=request.features,
run_id=run_id, run_id=run_id,
) )

View File

@@ -178,6 +178,109 @@ def execute_step(
"item_paths": item_paths, "item_paths": item_paths,
} }
# Handle COMPOUND nodes (collapsed effect chains)
if step.node_type == "COMPOUND":
filter_chain = step.config.get("filter_chain", [])
if not filter_chain:
raise ValueError("COMPOUND step has empty filter_chain")
# Resolve input paths
input_paths = []
for input_step_id in step.input_steps:
input_cache_id = input_cache_ids.get(input_step_id)
if not input_cache_id:
raise ValueError(f"No cache_id for input step: {input_step_id}")
path = cache_mgr.get_by_content_hash(input_cache_id)
if not path:
raise ValueError(f"Input not in cache: {input_cache_id[:16]}...")
input_paths.append(Path(path))
if not input_paths:
raise ValueError("COMPOUND step has no inputs")
# Build FFmpeg filter graph from chain
import subprocess
import tempfile
filters = []
for filter_item in filter_chain:
filter_type = filter_item.get("type", "")
filter_config = filter_item.get("config", {})
if filter_type == "TRANSFORM":
effects = filter_config.get("effects", {})
for eff_name, eff_value in effects.items():
if eff_name == "saturation":
filters.append(f"eq=saturation={eff_value}")
elif eff_name == "brightness":
filters.append(f"eq=brightness={eff_value}")
elif eff_name == "contrast":
filters.append(f"eq=contrast={eff_value}")
elif eff_name == "hue":
filters.append(f"hue=h={eff_value}")
elif filter_type == "RESIZE":
width = filter_config.get("width", -1)
height = filter_config.get("height", -1)
mode = filter_config.get("mode", "fit")
if mode == "fit":
filters.append(f"scale={width}:{height}:force_original_aspect_ratio=decrease")
elif mode == "fill":
filters.append(f"scale={width}:{height}:force_original_aspect_ratio=increase,crop={width}:{height}")
else:
filters.append(f"scale={width}:{height}")
output_dir = Path(tempfile.mkdtemp())
output_path = output_dir / f"compound_{step.cache_id[:16]}.mp4"
cmd = ["ffmpeg", "-y", "-i", str(input_paths[0])]
# Handle segment timing
for filter_item in filter_chain:
if filter_item.get("type") == "SEGMENT":
seg_config = filter_item.get("config", {})
if "start" in seg_config:
cmd.extend(["-ss", str(seg_config["start"])])
if "end" in seg_config:
duration = seg_config["end"] - seg_config.get("start", 0)
cmd.extend(["-t", str(duration)])
elif "duration" in seg_config:
cmd.extend(["-t", str(seg_config["duration"])])
if filters:
cmd.extend(["-vf", ",".join(filters)])
cmd.extend(["-c:v", "libx264", "-c:a", "aac", str(output_path)])
logger.info(f"Running COMPOUND FFmpeg: {' '.join(cmd)}")
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode != 0:
raise RuntimeError(f"FFmpeg failed: {result.stderr}")
cached_file, ipfs_cid = cache_mgr.put(
source_path=output_path,
node_type="COMPOUND",
node_id=step.cache_id,
)
logger.info(f"COMPOUND step {step.step_id} completed with {len(filter_chain)} filters")
complete_task(step.cache_id, worker_id, str(cached_file.path))
import shutil
if output_dir.exists():
shutil.rmtree(output_dir, ignore_errors=True)
return {
"status": "completed",
"step_id": step.step_id,
"cache_id": step.cache_id,
"output_path": str(cached_file.path),
"content_hash": cached_file.content_hash,
"ipfs_cid": ipfs_cid,
"filter_count": len(filter_chain),
}
# Get executor for this node type # Get executor for this node type
try: try:
node_type = NodeType[step.node_type] node_type = NodeType[step.node_type]

View File

@@ -249,6 +249,127 @@ def execute_step_sexp(
} }
raise ValueError(f"No executor for EFFECT and no inputs") raise ValueError(f"No executor for EFFECT and no inputs")
# Handle COMPOUND nodes (collapsed effect chains)
if node_type == "COMPOUND":
filter_chain = config.get("filter_chain", [])
if not filter_chain:
raise ValueError("COMPOUND step has empty filter_chain")
# Get input paths
inputs = config.get("inputs", [])
input_paths = []
for inp in inputs:
inp_cache_id = input_cache_ids.get(inp, inp)
path = cache_mgr.get_by_content_hash(inp_cache_id)
if not path:
raise ValueError(f"Input not found: {inp_cache_id[:16]}...")
input_paths.append(Path(path))
if not input_paths:
raise ValueError("COMPOUND step has no inputs")
# Build FFmpeg filter graph from chain
filters = []
for i, filter_item in enumerate(filter_chain):
filter_type = filter_item.get("type", "")
filter_config = filter_item.get("config", {})
if filter_type == "EFFECT":
# Effect - for now identity-like, can be extended
effect_hash = filter_config.get("hash") or filter_config.get("effect")
# TODO: resolve effect to actual FFmpeg filter
# For now, skip identity-like effects
pass
elif filter_type == "TRANSFORM":
# Transform effects map to FFmpeg filters
effects = filter_config.get("effects", {})
for eff_name, eff_value in effects.items():
if eff_name == "saturation":
filters.append(f"eq=saturation={eff_value}")
elif eff_name == "brightness":
filters.append(f"eq=brightness={eff_value}")
elif eff_name == "contrast":
filters.append(f"eq=contrast={eff_value}")
elif eff_name == "hue":
filters.append(f"hue=h={eff_value}")
elif filter_type == "RESIZE":
width = filter_config.get("width", -1)
height = filter_config.get("height", -1)
mode = filter_config.get("mode", "fit")
if mode == "fit":
filters.append(f"scale={width}:{height}:force_original_aspect_ratio=decrease")
elif mode == "fill":
filters.append(f"scale={width}:{height}:force_original_aspect_ratio=increase,crop={width}:{height}")
else:
filters.append(f"scale={width}:{height}")
elif filter_type == "SEGMENT":
# Segment handled via -ss and -t, not filter
pass
# Create temp output
import tempfile
import subprocess
output_dir = Path(tempfile.mkdtemp())
output_path = output_dir / f"compound_{cache_id[:16]}.mp4"
# Build FFmpeg command
input_path = input_paths[0]
cmd = ["ffmpeg", "-y", "-i", str(input_path)]
# Handle segment timing if present
for filter_item in filter_chain:
if filter_item.get("type") == "SEGMENT":
seg_config = filter_item.get("config", {})
if "start" in seg_config:
cmd.extend(["-ss", str(seg_config["start"])])
if "end" in seg_config:
duration = seg_config["end"] - seg_config.get("start", 0)
cmd.extend(["-t", str(duration)])
elif "duration" in seg_config:
cmd.extend(["-t", str(seg_config["duration"])])
# Add filter graph if any
if filters:
cmd.extend(["-vf", ",".join(filters)])
# Output options
cmd.extend(["-c:v", "libx264", "-c:a", "aac", str(output_path)])
logger.info(f"Running COMPOUND FFmpeg: {' '.join(cmd)}")
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode != 0:
raise RuntimeError(f"FFmpeg failed: {result.stderr}")
# Store in cache
cached_file, ipfs_cid = cache_mgr.put(
source_path=output_path,
node_type="COMPOUND",
node_id=cache_id,
)
logger.info(f"COMPOUND step {step_id} completed with {len(filter_chain)} filters, IPFS CID: {ipfs_cid}")
complete_task(cache_id, worker_id, str(cached_file.path))
# Cleanup temp
if output_dir.exists():
import shutil
shutil.rmtree(output_dir, ignore_errors=True)
return {
"status": "completed",
"step_id": step_id,
"cache_id": cache_id,
"output_path": str(cached_file.path),
"content_hash": cached_file.content_hash,
"ipfs_cid": ipfs_cid,
"filter_count": len(filter_chain),
}
# Get executor for other node types # Get executor for other node types
try: try:
node_type_enum = NodeType[node_type] node_type_enum = NodeType[node_type]

View File

@@ -222,90 +222,159 @@ def run_plan(
} }
def _extract_analysis_from_recipe(compiled_recipe) -> List[Dict]:
"""
Extract analysis nodes from a compiled recipe.
Finds all (analyze ...) nodes and returns their configurations.
Analysis nodes are identified by type "ANALYZE" or by having
an "analyze" config key.
"""
analysis_nodes = []
nodes = compiled_recipe.nodes
if isinstance(nodes, dict):
nodes = list(nodes.values())
for node in nodes:
node_type = node.get("type", "").upper()
config = node.get("config", {})
# Check if this is an analysis node
if node_type == "ANALYZE" or config.get("analyze"):
analysis_nodes.append({
"node_id": node.get("id"),
"input_ref": config.get("input") or config.get("source"),
"feature": config.get("feature") or config.get("analyze"),
"config": config,
})
return analysis_nodes
@app.task(bind=True, name='tasks.run_recipe') @app.task(bind=True, name='tasks.run_recipe')
def run_recipe( def run_recipe(
self, self,
recipe_yaml: str, recipe_sexp: str,
input_hashes: Dict[str, str], input_hashes: Dict[str, str],
features: List[str] = None,
run_id: Optional[str] = None, run_id: Optional[str] = None,
) -> dict: ) -> dict:
""" """
Run a complete recipe through all 3 phases. Run a complete recipe through all phases.
1. Analyze: Extract features from inputs The recipe S-expression declares what analysis is needed.
2. Plan: Generate execution plan Analysis nodes in the recipe are executed first, then their
3. Execute: Run the plan outputs are used to generate the execution plan.
1. Parse: Compile recipe S-expression
2. Analyze: Run analysis nodes from recipe
3. Plan: Generate execution plan using analysis results
4. Execute: Run the plan
Args: Args:
recipe_yaml: Recipe YAML content recipe_sexp: Recipe S-expression content
input_hashes: Mapping from input name to content hash input_hashes: Mapping from input name to content hash
features: Features to extract (default: ["beats", "energy"])
run_id: Optional run ID for tracking run_id: Optional run ID for tracking
Returns: Returns:
Dict with final results Dict with final results
""" """
if RecipePlanner is None or Analyzer is None: # Import S-expression compiler
raise ImportError("artdag modules not available") try:
from artdag.sexp import compile_string
except ImportError:
raise ImportError("artdag.sexp not available")
if features is None: if Analyzer is None:
features = ["beats", "energy"] raise ImportError("artdag.analysis not available")
cache_mgr = get_cache_manager() cache_mgr = get_cache_manager()
logger.info(f"Running recipe with {len(input_hashes)} inputs") logger.info(f"Running recipe with {len(input_hashes)} inputs")
# Phase 1: Analyze # Phase 1: Parse recipe
logger.info("Phase 1: Analyzing inputs...") logger.info("Phase 1: Parsing recipe S-expression...")
try:
compiled = compile_string(recipe_sexp)
except Exception as e:
return {"status": "failed", "error": f"Recipe parse error: {e}"}
logger.info(f"Parsed recipe: {compiled.name}")
# Phase 2: Run analysis nodes from recipe
logger.info("Phase 2: Running analysis from recipe...")
analysis_nodes = _extract_analysis_from_recipe(compiled)
logger.info(f"Found {len(analysis_nodes)} analysis nodes in recipe")
ANALYSIS_CACHE_DIR.mkdir(parents=True, exist_ok=True) ANALYSIS_CACHE_DIR.mkdir(parents=True, exist_ok=True)
analyzer = Analyzer(cache_dir=ANALYSIS_CACHE_DIR) analyzer = Analyzer(cache_dir=ANALYSIS_CACHE_DIR)
analysis_results = {} analysis_results = {}
for name, content_hash in input_hashes.items(): for analysis_node in analysis_nodes:
# Get path from cache input_ref = analysis_node["input_ref"]
feature = analysis_node["feature"]
node_id = analysis_node["node_id"]
# Resolve input reference to content hash
content_hash = input_hashes.get(input_ref)
if not content_hash:
logger.warning(f"Analysis node {node_id}: input '{input_ref}' not in input_hashes")
continue
path = cache_mgr.get_by_content_hash(content_hash) path = cache_mgr.get_by_content_hash(content_hash)
if path: if not path:
try: logger.warning(f"Analysis node {node_id}: content {content_hash[:16]}... not in cache")
result = analyzer.analyze( continue
input_hash=content_hash,
features=features,
input_path=Path(path),
)
analysis_results[content_hash] = result
logger.info(f"Analyzed {name}: tempo={result.tempo}, beats={len(result.beat_times or [])}")
except Exception as e:
logger.warning(f"Analysis failed for {name}: {e}")
else:
logger.warning(f"Input {name} ({content_hash[:16]}...) not in cache")
logger.info(f"Analyzed {len(analysis_results)} inputs") try:
# Run analysis for the specific feature
features = [feature] if feature else ["beats", "energy"]
result = analyzer.analyze(
input_hash=content_hash,
features=features,
input_path=Path(path),
)
# Store result keyed by node_id so plan can reference it
analysis_results[node_id] = result
# Also store by content_hash for compatibility
analysis_results[content_hash] = result
logger.info(f"Analysis {node_id}: feature={feature}, tempo={result.tempo}")
except Exception as e:
logger.warning(f"Analysis failed for {node_id}: {e}")
# Phase 2: Plan logger.info(f"Completed {len(analysis_results)} analysis results")
logger.info("Phase 2: Generating execution plan...")
recipe = Recipe.from_yaml(recipe_yaml) # Phase 3: Generate plan
planner = RecipePlanner(use_tree_reduction=True) logger.info("Phase 3: Generating execution plan...")
plan = planner.plan( # Use the S-expression planner if available
recipe=recipe, try:
input_hashes=input_hashes, from artdag.sexp.planner import create_plan
analysis=analysis_results, plan = create_plan(compiled, inputs=input_hashes)
) except ImportError:
# Fall back to legacy planner
if RecipePlanner is None:
raise ImportError("No planner available")
recipe = Recipe.from_dict(compiled.to_dict())
planner = RecipePlanner(use_tree_reduction=True)
plan = planner.plan(
recipe=recipe,
input_hashes=input_hashes,
analysis=analysis_results,
)
logger.info(f"Generated plan with {len(plan.steps)} steps") logger.info(f"Generated plan with {len(plan.steps)} steps")
# Save plan as S-expression through cache manager (goes to IPFS) # Save plan as S-expression through cache manager (goes to IPFS)
import tempfile import tempfile
from cache_manager import get_cache_manager
cache_mgr = get_cache_manager()
plan_sexp = plan.to_sexp_string() if hasattr(plan, 'to_sexp_string') else plan.to_json() plan_content = plan.to_sexp_string() if hasattr(plan, 'to_sexp_string') else plan.to_json()
plan_suffix = ".sexp" if hasattr(plan, 'to_sexp_string') else ".json" plan_suffix = ".sexp" if hasattr(plan, 'to_sexp_string') else ".json"
with tempfile.NamedTemporaryFile(delete=False, suffix=plan_suffix, mode="w") as tmp: with tempfile.NamedTemporaryFile(delete=False, suffix=plan_suffix, mode="w") as tmp:
tmp.write(plan_sexp) tmp.write(plan_content)
tmp_path = Path(tmp.name) tmp_path = Path(tmp.name)
# Store in cache (content-addressed, auto-pins to IPFS) # Store in cache (content-addressed, auto-pins to IPFS)
@@ -313,15 +382,15 @@ def run_recipe(
cached, plan_ipfs_cid = cache_mgr.put(tmp_path, node_type="plan", move=True) cached, plan_ipfs_cid = cache_mgr.put(tmp_path, node_type="plan", move=True)
logger.info(f"Plan cached: hash={cached.content_hash}, ipfs={plan_ipfs_cid}") logger.info(f"Plan cached: hash={cached.content_hash}, ipfs={plan_ipfs_cid}")
# Phase 3: Execute # Phase 4: Execute
logger.info("Phase 3: Executing plan...") logger.info("Phase 4: Executing plan...")
result = run_plan(plan.to_json(), run_id=run_id) result = run_plan(plan.to_json(), run_id=run_id)
return { return {
"status": result.get("status"), "status": result.get("status"),
"run_id": run_id, "run_id": run_id,
"recipe": recipe.name, "recipe": compiled.name,
"plan_id": plan.plan_id, "plan_id": plan.plan_id,
"plan_cache_id": cached.content_hash, "plan_cache_id": cached.content_hash,
"plan_ipfs_cid": plan_ipfs_cid, "plan_ipfs_cid": plan_ipfs_cid,
@@ -339,9 +408,8 @@ def run_recipe(
@app.task(bind=True, name='tasks.generate_plan') @app.task(bind=True, name='tasks.generate_plan')
def generate_plan( def generate_plan(
self, self,
recipe_yaml: str, recipe_sexp: str,
input_hashes: Dict[str, str], input_hashes: Dict[str, str],
features: List[str] = None,
) -> dict: ) -> dict:
""" """
Generate an execution plan without executing it. Generate an execution plan without executing it.
@@ -352,48 +420,72 @@ def generate_plan(
- Debugging recipe issues - Debugging recipe issues
Args: Args:
recipe_yaml: Recipe YAML content recipe_sexp: Recipe S-expression content
input_hashes: Mapping from input name to content hash input_hashes: Mapping from input name to content hash
features: Features to extract for analysis
Returns: Returns:
Dict with plan details Dict with plan details
""" """
if RecipePlanner is None or Analyzer is None: try:
raise ImportError("artdag modules not available") from artdag.sexp import compile_string
except ImportError:
raise ImportError("artdag.sexp not available")
if features is None: if Analyzer is None:
features = ["beats", "energy"] raise ImportError("artdag.analysis not available")
cache_mgr = get_cache_manager() cache_mgr = get_cache_manager()
# Analyze inputs # Parse recipe
try:
compiled = compile_string(recipe_sexp)
except Exception as e:
return {"status": "failed", "error": f"Recipe parse error: {e}"}
# Extract and run analysis nodes from recipe
analysis_nodes = _extract_analysis_from_recipe(compiled)
ANALYSIS_CACHE_DIR.mkdir(parents=True, exist_ok=True) ANALYSIS_CACHE_DIR.mkdir(parents=True, exist_ok=True)
analyzer = Analyzer(cache_dir=ANALYSIS_CACHE_DIR) analyzer = Analyzer(cache_dir=ANALYSIS_CACHE_DIR)
analysis_results = {} analysis_results = {}
for name, content_hash in input_hashes.items(): for analysis_node in analysis_nodes:
input_ref = analysis_node["input_ref"]
feature = analysis_node["feature"]
node_id = analysis_node["node_id"]
content_hash = input_hashes.get(input_ref)
if not content_hash:
continue
path = cache_mgr.get_by_content_hash(content_hash) path = cache_mgr.get_by_content_hash(content_hash)
if path: if path:
try: try:
features = [feature] if feature else ["beats", "energy"]
result = analyzer.analyze( result = analyzer.analyze(
input_hash=content_hash, input_hash=content_hash,
features=features, features=features,
input_path=Path(path), input_path=Path(path),
) )
analysis_results[node_id] = result
analysis_results[content_hash] = result analysis_results[content_hash] = result
except Exception as e: except Exception as e:
logger.warning(f"Analysis failed for {name}: {e}") logger.warning(f"Analysis failed for {node_id}: {e}")
# Generate plan # Generate plan
recipe = Recipe.from_yaml(recipe_yaml) try:
planner = RecipePlanner(use_tree_reduction=True) from artdag.sexp.planner import create_plan
plan = create_plan(compiled, inputs=input_hashes)
plan = planner.plan( except ImportError:
recipe=recipe, if RecipePlanner is None:
input_hashes=input_hashes, raise ImportError("No planner available")
analysis=analysis_results, recipe = Recipe.from_dict(compiled.to_dict())
) planner = RecipePlanner(use_tree_reduction=True)
plan = planner.plan(
recipe=recipe,
input_hashes=input_hashes,
analysis=analysis_results,
)
# Check cache status for each step # Check cache status for each step
steps_status = [] steps_status = []
@@ -411,7 +503,7 @@ def generate_plan(
return { return {
"status": "planned", "status": "planned",
"recipe": recipe.name, "recipe": compiled.name,
"plan_id": plan.plan_id, "plan_id": plan.plan_id,
"total_steps": len(plan.steps), "total_steps": len(plan.steps),
"cached_steps": cached_count, "cached_steps": cached_count,