diff --git a/app/routers/api.py b/app/routers/api.py index 10421e5..fa2b5b6 100644 --- a/app/routers/api.py +++ b/app/routers/api.py @@ -11,7 +11,6 @@ import uuid from datetime import datetime, timezone from typing import Dict, List, Optional -import yaml from fastapi import APIRouter, Depends, HTTPException from pydantic import BaseModel @@ -26,9 +25,8 @@ RUNS_KEY_PREFIX = "artdag:run:" class PlanRequest(BaseModel): - recipe_yaml: str + recipe_sexp: str input_hashes: Dict[str, str] - features: List[str] = ["beats", "energy"] class ExecutePlanRequest(BaseModel): @@ -37,9 +35,8 @@ class ExecutePlanRequest(BaseModel): class RecipeRunRequest(BaseModel): - recipe_yaml: str + recipe_sexp: str input_hashes: Dict[str, str] - features: List[str] = ["beats", "energy"] def compute_run_id(input_hashes: List[str], recipe: str, recipe_hash: str = None) -> str: @@ -68,9 +65,8 @@ async def generate_plan_endpoint( try: task = generate_plan.delay( - recipe_yaml=request.recipe_yaml, + recipe_sexp=request.recipe_sexp, input_hashes=request.input_hashes, - features=request.features, ) # Wait for result (plan generation is usually fast) @@ -136,15 +132,16 @@ async def run_recipe_endpoint( Returns immediately with run_id. Poll /api/run/{run_id} for status. """ from tasks.orchestrate import run_recipe + from artdag.sexp import compile_string import database redis = get_redis_client() cache = get_cache_manager() - # Parse recipe name + # Parse recipe name from S-expression try: - recipe_data = yaml.safe_load(request.recipe_yaml) - recipe_name = recipe_data.get("name", "unknown") + compiled = compile_string(request.recipe_sexp) + recipe_name = compiled.name or "unknown" except Exception: recipe_name = "unknown" @@ -152,7 +149,7 @@ async def run_recipe_endpoint( run_id = compute_run_id( list(request.input_hashes.values()), recipe_name, - hashlib.sha3_256(request.recipe_yaml.encode()).hexdigest() + hashlib.sha3_256(request.recipe_sexp.encode()).hexdigest() ) # Check if already completed @@ -171,9 +168,8 @@ async def run_recipe_endpoint( # Submit to Celery try: task = run_recipe.delay( - recipe_yaml=request.recipe_yaml, + recipe_sexp=request.recipe_sexp, input_hashes=request.input_hashes, - features=request.features, run_id=run_id, ) diff --git a/tasks/execute.py b/tasks/execute.py index 98e4752..75727de 100644 --- a/tasks/execute.py +++ b/tasks/execute.py @@ -178,6 +178,109 @@ def execute_step( "item_paths": item_paths, } + # Handle COMPOUND nodes (collapsed effect chains) + if step.node_type == "COMPOUND": + filter_chain = step.config.get("filter_chain", []) + if not filter_chain: + raise ValueError("COMPOUND step has empty filter_chain") + + # Resolve input paths + input_paths = [] + for input_step_id in step.input_steps: + input_cache_id = input_cache_ids.get(input_step_id) + if not input_cache_id: + raise ValueError(f"No cache_id for input step: {input_step_id}") + path = cache_mgr.get_by_content_hash(input_cache_id) + if not path: + raise ValueError(f"Input not in cache: {input_cache_id[:16]}...") + input_paths.append(Path(path)) + + if not input_paths: + raise ValueError("COMPOUND step has no inputs") + + # Build FFmpeg filter graph from chain + import subprocess + import tempfile + + filters = [] + for filter_item in filter_chain: + filter_type = filter_item.get("type", "") + filter_config = filter_item.get("config", {}) + + if filter_type == "TRANSFORM": + effects = filter_config.get("effects", {}) + for eff_name, eff_value in effects.items(): + if eff_name == "saturation": + filters.append(f"eq=saturation={eff_value}") + elif eff_name == "brightness": + filters.append(f"eq=brightness={eff_value}") + elif eff_name == "contrast": + filters.append(f"eq=contrast={eff_value}") + elif eff_name == "hue": + filters.append(f"hue=h={eff_value}") + + elif filter_type == "RESIZE": + width = filter_config.get("width", -1) + height = filter_config.get("height", -1) + mode = filter_config.get("mode", "fit") + if mode == "fit": + filters.append(f"scale={width}:{height}:force_original_aspect_ratio=decrease") + elif mode == "fill": + filters.append(f"scale={width}:{height}:force_original_aspect_ratio=increase,crop={width}:{height}") + else: + filters.append(f"scale={width}:{height}") + + output_dir = Path(tempfile.mkdtemp()) + output_path = output_dir / f"compound_{step.cache_id[:16]}.mp4" + + cmd = ["ffmpeg", "-y", "-i", str(input_paths[0])] + + # Handle segment timing + for filter_item in filter_chain: + if filter_item.get("type") == "SEGMENT": + seg_config = filter_item.get("config", {}) + if "start" in seg_config: + cmd.extend(["-ss", str(seg_config["start"])]) + if "end" in seg_config: + duration = seg_config["end"] - seg_config.get("start", 0) + cmd.extend(["-t", str(duration)]) + elif "duration" in seg_config: + cmd.extend(["-t", str(seg_config["duration"])]) + + if filters: + cmd.extend(["-vf", ",".join(filters)]) + + cmd.extend(["-c:v", "libx264", "-c:a", "aac", str(output_path)]) + + logger.info(f"Running COMPOUND FFmpeg: {' '.join(cmd)}") + result = subprocess.run(cmd, capture_output=True, text=True) + + if result.returncode != 0: + raise RuntimeError(f"FFmpeg failed: {result.stderr}") + + cached_file, ipfs_cid = cache_mgr.put( + source_path=output_path, + node_type="COMPOUND", + node_id=step.cache_id, + ) + + logger.info(f"COMPOUND step {step.step_id} completed with {len(filter_chain)} filters") + complete_task(step.cache_id, worker_id, str(cached_file.path)) + + import shutil + if output_dir.exists(): + shutil.rmtree(output_dir, ignore_errors=True) + + return { + "status": "completed", + "step_id": step.step_id, + "cache_id": step.cache_id, + "output_path": str(cached_file.path), + "content_hash": cached_file.content_hash, + "ipfs_cid": ipfs_cid, + "filter_count": len(filter_chain), + } + # Get executor for this node type try: node_type = NodeType[step.node_type] diff --git a/tasks/execute_sexp.py b/tasks/execute_sexp.py index f6e493d..79160ea 100644 --- a/tasks/execute_sexp.py +++ b/tasks/execute_sexp.py @@ -249,6 +249,127 @@ def execute_step_sexp( } raise ValueError(f"No executor for EFFECT and no inputs") + # Handle COMPOUND nodes (collapsed effect chains) + if node_type == "COMPOUND": + filter_chain = config.get("filter_chain", []) + if not filter_chain: + raise ValueError("COMPOUND step has empty filter_chain") + + # Get input paths + inputs = config.get("inputs", []) + input_paths = [] + for inp in inputs: + inp_cache_id = input_cache_ids.get(inp, inp) + path = cache_mgr.get_by_content_hash(inp_cache_id) + if not path: + raise ValueError(f"Input not found: {inp_cache_id[:16]}...") + input_paths.append(Path(path)) + + if not input_paths: + raise ValueError("COMPOUND step has no inputs") + + # Build FFmpeg filter graph from chain + filters = [] + for i, filter_item in enumerate(filter_chain): + filter_type = filter_item.get("type", "") + filter_config = filter_item.get("config", {}) + + if filter_type == "EFFECT": + # Effect - for now identity-like, can be extended + effect_hash = filter_config.get("hash") or filter_config.get("effect") + # TODO: resolve effect to actual FFmpeg filter + # For now, skip identity-like effects + pass + + elif filter_type == "TRANSFORM": + # Transform effects map to FFmpeg filters + effects = filter_config.get("effects", {}) + for eff_name, eff_value in effects.items(): + if eff_name == "saturation": + filters.append(f"eq=saturation={eff_value}") + elif eff_name == "brightness": + filters.append(f"eq=brightness={eff_value}") + elif eff_name == "contrast": + filters.append(f"eq=contrast={eff_value}") + elif eff_name == "hue": + filters.append(f"hue=h={eff_value}") + + elif filter_type == "RESIZE": + width = filter_config.get("width", -1) + height = filter_config.get("height", -1) + mode = filter_config.get("mode", "fit") + if mode == "fit": + filters.append(f"scale={width}:{height}:force_original_aspect_ratio=decrease") + elif mode == "fill": + filters.append(f"scale={width}:{height}:force_original_aspect_ratio=increase,crop={width}:{height}") + else: + filters.append(f"scale={width}:{height}") + + elif filter_type == "SEGMENT": + # Segment handled via -ss and -t, not filter + pass + + # Create temp output + import tempfile + import subprocess + + output_dir = Path(tempfile.mkdtemp()) + output_path = output_dir / f"compound_{cache_id[:16]}.mp4" + + # Build FFmpeg command + input_path = input_paths[0] + cmd = ["ffmpeg", "-y", "-i", str(input_path)] + + # Handle segment timing if present + for filter_item in filter_chain: + if filter_item.get("type") == "SEGMENT": + seg_config = filter_item.get("config", {}) + if "start" in seg_config: + cmd.extend(["-ss", str(seg_config["start"])]) + if "end" in seg_config: + duration = seg_config["end"] - seg_config.get("start", 0) + cmd.extend(["-t", str(duration)]) + elif "duration" in seg_config: + cmd.extend(["-t", str(seg_config["duration"])]) + + # Add filter graph if any + if filters: + cmd.extend(["-vf", ",".join(filters)]) + + # Output options + cmd.extend(["-c:v", "libx264", "-c:a", "aac", str(output_path)]) + + logger.info(f"Running COMPOUND FFmpeg: {' '.join(cmd)}") + result = subprocess.run(cmd, capture_output=True, text=True) + + if result.returncode != 0: + raise RuntimeError(f"FFmpeg failed: {result.stderr}") + + # Store in cache + cached_file, ipfs_cid = cache_mgr.put( + source_path=output_path, + node_type="COMPOUND", + node_id=cache_id, + ) + + logger.info(f"COMPOUND step {step_id} completed with {len(filter_chain)} filters, IPFS CID: {ipfs_cid}") + complete_task(cache_id, worker_id, str(cached_file.path)) + + # Cleanup temp + if output_dir.exists(): + import shutil + shutil.rmtree(output_dir, ignore_errors=True) + + return { + "status": "completed", + "step_id": step_id, + "cache_id": cache_id, + "output_path": str(cached_file.path), + "content_hash": cached_file.content_hash, + "ipfs_cid": ipfs_cid, + "filter_count": len(filter_chain), + } + # Get executor for other node types try: node_type_enum = NodeType[node_type] diff --git a/tasks/orchestrate.py b/tasks/orchestrate.py index 347c02e..81b4ecc 100644 --- a/tasks/orchestrate.py +++ b/tasks/orchestrate.py @@ -222,90 +222,159 @@ def run_plan( } +def _extract_analysis_from_recipe(compiled_recipe) -> List[Dict]: + """ + Extract analysis nodes from a compiled recipe. + + Finds all (analyze ...) nodes and returns their configurations. + Analysis nodes are identified by type "ANALYZE" or by having + an "analyze" config key. + """ + analysis_nodes = [] + nodes = compiled_recipe.nodes + + if isinstance(nodes, dict): + nodes = list(nodes.values()) + + for node in nodes: + node_type = node.get("type", "").upper() + config = node.get("config", {}) + + # Check if this is an analysis node + if node_type == "ANALYZE" or config.get("analyze"): + analysis_nodes.append({ + "node_id": node.get("id"), + "input_ref": config.get("input") or config.get("source"), + "feature": config.get("feature") or config.get("analyze"), + "config": config, + }) + + return analysis_nodes + + @app.task(bind=True, name='tasks.run_recipe') def run_recipe( self, - recipe_yaml: str, + recipe_sexp: str, input_hashes: Dict[str, str], - features: List[str] = None, run_id: Optional[str] = None, ) -> dict: """ - Run a complete recipe through all 3 phases. + Run a complete recipe through all phases. - 1. Analyze: Extract features from inputs - 2. Plan: Generate execution plan - 3. Execute: Run the plan + The recipe S-expression declares what analysis is needed. + Analysis nodes in the recipe are executed first, then their + outputs are used to generate the execution plan. + + 1. Parse: Compile recipe S-expression + 2. Analyze: Run analysis nodes from recipe + 3. Plan: Generate execution plan using analysis results + 4. Execute: Run the plan Args: - recipe_yaml: Recipe YAML content + recipe_sexp: Recipe S-expression content input_hashes: Mapping from input name to content hash - features: Features to extract (default: ["beats", "energy"]) run_id: Optional run ID for tracking Returns: Dict with final results """ - if RecipePlanner is None or Analyzer is None: - raise ImportError("artdag modules not available") + # Import S-expression compiler + try: + from artdag.sexp import compile_string + except ImportError: + raise ImportError("artdag.sexp not available") - if features is None: - features = ["beats", "energy"] + if Analyzer is None: + raise ImportError("artdag.analysis not available") cache_mgr = get_cache_manager() logger.info(f"Running recipe with {len(input_hashes)} inputs") - # Phase 1: Analyze - logger.info("Phase 1: Analyzing inputs...") + # Phase 1: Parse recipe + logger.info("Phase 1: Parsing recipe S-expression...") + + try: + compiled = compile_string(recipe_sexp) + except Exception as e: + return {"status": "failed", "error": f"Recipe parse error: {e}"} + + logger.info(f"Parsed recipe: {compiled.name}") + + # Phase 2: Run analysis nodes from recipe + logger.info("Phase 2: Running analysis from recipe...") + + analysis_nodes = _extract_analysis_from_recipe(compiled) + logger.info(f"Found {len(analysis_nodes)} analysis nodes in recipe") ANALYSIS_CACHE_DIR.mkdir(parents=True, exist_ok=True) analyzer = Analyzer(cache_dir=ANALYSIS_CACHE_DIR) analysis_results = {} - for name, content_hash in input_hashes.items(): - # Get path from cache + for analysis_node in analysis_nodes: + input_ref = analysis_node["input_ref"] + feature = analysis_node["feature"] + node_id = analysis_node["node_id"] + + # Resolve input reference to content hash + content_hash = input_hashes.get(input_ref) + if not content_hash: + logger.warning(f"Analysis node {node_id}: input '{input_ref}' not in input_hashes") + continue + path = cache_mgr.get_by_content_hash(content_hash) - if path: - try: - result = analyzer.analyze( - input_hash=content_hash, - features=features, - input_path=Path(path), - ) - analysis_results[content_hash] = result - logger.info(f"Analyzed {name}: tempo={result.tempo}, beats={len(result.beat_times or [])}") - except Exception as e: - logger.warning(f"Analysis failed for {name}: {e}") - else: - logger.warning(f"Input {name} ({content_hash[:16]}...) not in cache") + if not path: + logger.warning(f"Analysis node {node_id}: content {content_hash[:16]}... not in cache") + continue - logger.info(f"Analyzed {len(analysis_results)} inputs") + try: + # Run analysis for the specific feature + features = [feature] if feature else ["beats", "energy"] + result = analyzer.analyze( + input_hash=content_hash, + features=features, + input_path=Path(path), + ) + # Store result keyed by node_id so plan can reference it + analysis_results[node_id] = result + # Also store by content_hash for compatibility + analysis_results[content_hash] = result + logger.info(f"Analysis {node_id}: feature={feature}, tempo={result.tempo}") + except Exception as e: + logger.warning(f"Analysis failed for {node_id}: {e}") - # Phase 2: Plan - logger.info("Phase 2: Generating execution plan...") + logger.info(f"Completed {len(analysis_results)} analysis results") - recipe = Recipe.from_yaml(recipe_yaml) - planner = RecipePlanner(use_tree_reduction=True) + # Phase 3: Generate plan + logger.info("Phase 3: Generating execution plan...") - plan = planner.plan( - recipe=recipe, - input_hashes=input_hashes, - analysis=analysis_results, - ) + # Use the S-expression planner if available + try: + from artdag.sexp.planner import create_plan + plan = create_plan(compiled, inputs=input_hashes) + except ImportError: + # Fall back to legacy planner + if RecipePlanner is None: + raise ImportError("No planner available") + recipe = Recipe.from_dict(compiled.to_dict()) + planner = RecipePlanner(use_tree_reduction=True) + plan = planner.plan( + recipe=recipe, + input_hashes=input_hashes, + analysis=analysis_results, + ) logger.info(f"Generated plan with {len(plan.steps)} steps") # Save plan as S-expression through cache manager (goes to IPFS) import tempfile - from cache_manager import get_cache_manager - cache_mgr = get_cache_manager() - plan_sexp = plan.to_sexp_string() if hasattr(plan, 'to_sexp_string') else plan.to_json() + plan_content = plan.to_sexp_string() if hasattr(plan, 'to_sexp_string') else plan.to_json() plan_suffix = ".sexp" if hasattr(plan, 'to_sexp_string') else ".json" with tempfile.NamedTemporaryFile(delete=False, suffix=plan_suffix, mode="w") as tmp: - tmp.write(plan_sexp) + tmp.write(plan_content) tmp_path = Path(tmp.name) # Store in cache (content-addressed, auto-pins to IPFS) @@ -313,15 +382,15 @@ def run_recipe( cached, plan_ipfs_cid = cache_mgr.put(tmp_path, node_type="plan", move=True) logger.info(f"Plan cached: hash={cached.content_hash}, ipfs={plan_ipfs_cid}") - # Phase 3: Execute - logger.info("Phase 3: Executing plan...") + # Phase 4: Execute + logger.info("Phase 4: Executing plan...") result = run_plan(plan.to_json(), run_id=run_id) return { "status": result.get("status"), "run_id": run_id, - "recipe": recipe.name, + "recipe": compiled.name, "plan_id": plan.plan_id, "plan_cache_id": cached.content_hash, "plan_ipfs_cid": plan_ipfs_cid, @@ -339,9 +408,8 @@ def run_recipe( @app.task(bind=True, name='tasks.generate_plan') def generate_plan( self, - recipe_yaml: str, + recipe_sexp: str, input_hashes: Dict[str, str], - features: List[str] = None, ) -> dict: """ Generate an execution plan without executing it. @@ -352,48 +420,72 @@ def generate_plan( - Debugging recipe issues Args: - recipe_yaml: Recipe YAML content + recipe_sexp: Recipe S-expression content input_hashes: Mapping from input name to content hash - features: Features to extract for analysis Returns: Dict with plan details """ - if RecipePlanner is None or Analyzer is None: - raise ImportError("artdag modules not available") + try: + from artdag.sexp import compile_string + except ImportError: + raise ImportError("artdag.sexp not available") - if features is None: - features = ["beats", "energy"] + if Analyzer is None: + raise ImportError("artdag.analysis not available") cache_mgr = get_cache_manager() - # Analyze inputs + # Parse recipe + try: + compiled = compile_string(recipe_sexp) + except Exception as e: + return {"status": "failed", "error": f"Recipe parse error: {e}"} + + # Extract and run analysis nodes from recipe + analysis_nodes = _extract_analysis_from_recipe(compiled) + ANALYSIS_CACHE_DIR.mkdir(parents=True, exist_ok=True) analyzer = Analyzer(cache_dir=ANALYSIS_CACHE_DIR) analysis_results = {} - for name, content_hash in input_hashes.items(): + for analysis_node in analysis_nodes: + input_ref = analysis_node["input_ref"] + feature = analysis_node["feature"] + node_id = analysis_node["node_id"] + + content_hash = input_hashes.get(input_ref) + if not content_hash: + continue + path = cache_mgr.get_by_content_hash(content_hash) if path: try: + features = [feature] if feature else ["beats", "energy"] result = analyzer.analyze( input_hash=content_hash, features=features, input_path=Path(path), ) + analysis_results[node_id] = result analysis_results[content_hash] = result except Exception as e: - logger.warning(f"Analysis failed for {name}: {e}") + logger.warning(f"Analysis failed for {node_id}: {e}") # Generate plan - recipe = Recipe.from_yaml(recipe_yaml) - planner = RecipePlanner(use_tree_reduction=True) - - plan = planner.plan( - recipe=recipe, - input_hashes=input_hashes, - analysis=analysis_results, - ) + try: + from artdag.sexp.planner import create_plan + plan = create_plan(compiled, inputs=input_hashes) + except ImportError: + if RecipePlanner is None: + raise ImportError("No planner available") + recipe = Recipe.from_dict(compiled.to_dict()) + planner = RecipePlanner(use_tree_reduction=True) + plan = planner.plan( + recipe=recipe, + input_hashes=input_hashes, + analysis=analysis_results, + ) # Check cache status for each step steps_status = [] @@ -411,7 +503,7 @@ def generate_plan( return { "status": "planned", - "recipe": recipe.name, + "recipe": compiled.name, "plan_id": plan.plan_id, "total_steps": len(plan.steps), "cached_steps": cached_count,