""" Plan orchestration tasks. Coordinates the full 3-phase execution: 1. Analyze inputs 2. Generate plan 3. Execute steps level by level Uses IPFS-backed cache for durability. """ import json import logging import os from pathlib import Path from typing import Dict, List, Optional from celery import current_task, group, chain # Import from the Celery app import sys sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) from celery_app import app from claiming import get_claimer from cache_manager import get_cache_manager # Import artdag modules try: from artdag import Cache from artdag.analysis import Analyzer, AnalysisResult from artdag.planning import RecipePlanner, ExecutionPlan, Recipe except ImportError: Cache = None Analyzer = None AnalysisResult = None RecipePlanner = None ExecutionPlan = None Recipe = None from .execute import execute_step logger = logging.getLogger(__name__) # Cache directories CACHE_DIR = Path(os.environ.get('CACHE_DIR', str(Path.home() / ".artdag" / "cache"))) ANALYSIS_CACHE_DIR = CACHE_DIR / 'analysis' PLAN_CACHE_DIR = CACHE_DIR / 'plans' @app.task(bind=True, name='tasks.run_plan') def run_plan( self, plan_json: str, run_id: Optional[str] = None, ) -> dict: """ Execute a complete execution plan. Runs steps level by level, with parallel execution within each level. Results are stored in IPFS-backed cache. Args: plan_json: JSON-serialized ExecutionPlan run_id: Optional run ID for tracking Returns: Dict with execution results """ if ExecutionPlan is None: raise ImportError("artdag.planning not available") plan = ExecutionPlan.from_json(plan_json) cache_mgr = get_cache_manager() logger.info(f"Executing plan {plan.plan_id[:16]}... ({len(plan.steps)} steps)") # Build initial cache_ids mapping (step_id -> cache_id) cache_ids = {} for step in plan.steps: cache_ids[step.step_id] = step.cache_id # Also map input hashes for name, cid in plan.input_hashes.items(): cache_ids[name] = cid # Group steps by level steps_by_level = plan.get_steps_by_level() max_level = max(steps_by_level.keys()) if steps_by_level else 0 results_by_step = {} total_cached = 0 total_executed = 0 for level in range(max_level + 1): level_steps = steps_by_level.get(level, []) if not level_steps: continue logger.info(f"Executing level {level}: {len(level_steps)} steps") # Check which steps need execution steps_to_run = [] for step in level_steps: # Check if cached cached_path = cache_mgr.get_by_cid(step.cache_id) if cached_path: results_by_step[step.step_id] = { "status": "cached", "cache_id": step.cache_id, "output_path": str(cached_path), } total_cached += 1 else: steps_to_run.append(step) if not steps_to_run: logger.info(f"Level {level}: all steps cached") continue # Build input cache_ids for this level level_cache_ids = dict(cache_ids) # Execute steps in parallel tasks = [ execute_step.s(step.to_json(), plan.plan_id, level_cache_ids) for step in steps_to_run ] job = group(tasks) async_results = job.apply_async() # Wait for completion try: step_results = async_results.get(timeout=3600) except Exception as e: logger.error(f"Level {level} execution failed: {e}") return { "status": "failed", "error": str(e), "level": level, "results": results_by_step, "run_id": run_id, } # Process results for result in step_results: step_id = result.get("step_id") cache_id = result.get("cache_id") results_by_step[step_id] = result cache_ids[step_id] = cache_id if result.get("status") in ("completed", "cached", "completed_by_other"): total_executed += 1 elif result.get("status") == "failed": logger.error(f"Step {step_id} failed: {result.get('error')}") return { "status": "failed", "error": f"Step {step_id} failed: {result.get('error')}", "level": level, "results": results_by_step, "run_id": run_id, } # Get final output output_step = plan.get_step(plan.output_step) output_cache_id = output_step.cache_id if output_step else None output_path = None output_ipfs_cid = None output_name = plan.output_name if output_cache_id: output_path = cache_mgr.get_by_cid(output_cache_id) output_ipfs_cid = cache_mgr.get_ipfs_cid(output_cache_id) # Build list of all outputs with their names and artifacts all_outputs = [] for step in plan.steps: step_result = results_by_step.get(step.step_id, {}) step_outputs = step_result.get("outputs", []) # If no outputs in result, build from step definition if not step_outputs and step.outputs: for output_def in step.outputs: output_cache_path = cache_mgr.get_by_cid(output_def.cache_id) output_ipfs = cache_mgr.get_ipfs_cid(output_def.cache_id) if output_cache_path else None all_outputs.append({ "name": output_def.name, "step_id": step.step_id, "step_name": step.name, "cache_id": output_def.cache_id, "media_type": output_def.media_type, "path": str(output_cache_path) if output_cache_path else None, "ipfs_cid": output_ipfs, "status": "cached" if output_cache_path else "missing", }) else: for output in step_outputs: all_outputs.append({ **output, "step_id": step.step_id, "step_name": step.name, "status": "completed", }) return { "status": "completed", "run_id": run_id, "plan_id": plan.plan_id, "plan_name": plan.name, "recipe_name": plan.recipe_name, "output_name": output_name, "output_cache_id": output_cache_id, "output_path": str(output_path) if output_path else None, "output_ipfs_cid": output_ipfs_cid, "total_steps": len(plan.steps), "cached": total_cached, "executed": total_executed, "results": results_by_step, "outputs": all_outputs, } def _extract_analysis_from_recipe(compiled_recipe) -> List[Dict]: """ Extract analysis nodes from a compiled recipe. Finds all (analyze ...) nodes and returns their configurations. Analysis nodes are identified by type "ANALYZE" or by having an "analyze" config key. """ analysis_nodes = [] nodes = compiled_recipe.nodes if isinstance(nodes, dict): nodes = list(nodes.values()) for node in nodes: node_type = node.get("type", "").upper() config = node.get("config", {}) # Check if this is an analysis node if node_type == "ANALYZE" or config.get("analyze"): analysis_nodes.append({ "node_id": node.get("id"), "input_ref": config.get("input") or config.get("source"), "feature": config.get("feature") or config.get("analyze"), "config": config, }) return analysis_nodes @app.task(bind=True, name='tasks.run_recipe') def run_recipe( self, recipe_sexp: str, input_hashes: Dict[str, str], run_id: Optional[str] = None, ) -> dict: """ Run a complete recipe through all phases. The recipe S-expression declares what analysis is needed. Analysis nodes in the recipe are executed first, then their outputs are used to generate the execution plan. 1. Parse: Compile recipe S-expression 2. Analyze: Run analysis nodes from recipe 3. Plan: Generate execution plan using analysis results 4. Execute: Run the plan Args: recipe_sexp: Recipe S-expression content input_hashes: Mapping from input name to content hash run_id: Optional run ID for tracking Returns: Dict with final results """ # Import S-expression compiler try: from artdag.sexp import compile_string except ImportError: raise ImportError("artdag.sexp not available") if Analyzer is None: raise ImportError("artdag.analysis not available") cache_mgr = get_cache_manager() logger.info(f"Running recipe with {len(input_hashes)} inputs") # Phase 1: Parse recipe logger.info("Phase 1: Parsing recipe S-expression...") try: compiled = compile_string(recipe_sexp) except Exception as e: return {"status": "failed", "error": f"Recipe parse error: {e}"} logger.info(f"Parsed recipe: {compiled.name}") # Phase 2: Run analysis nodes from recipe logger.info("Phase 2: Running analysis from recipe...") analysis_nodes = _extract_analysis_from_recipe(compiled) logger.info(f"Found {len(analysis_nodes)} analysis nodes in recipe") ANALYSIS_CACHE_DIR.mkdir(parents=True, exist_ok=True) analyzer = Analyzer(cache_dir=ANALYSIS_CACHE_DIR) analysis_results = {} for analysis_node in analysis_nodes: input_ref = analysis_node["input_ref"] feature = analysis_node["feature"] node_id = analysis_node["node_id"] # Resolve input reference to content hash cid = input_hashes.get(input_ref) if not cid: logger.warning(f"Analysis node {node_id}: input '{input_ref}' not in input_hashes") continue path = cache_mgr.get_by_cid(cid) if not path: logger.warning(f"Analysis node {node_id}: content {cid[:16]}... not in cache") continue try: # Run analysis for the specific feature features = [feature] if feature else ["beats", "energy"] result = analyzer.analyze( input_hash=cid, features=features, input_path=Path(path), ) # Store result keyed by node_id so plan can reference it analysis_results[node_id] = result # Also store by cid for compatibility analysis_results[cid] = result logger.info(f"Analysis {node_id}: feature={feature}, tempo={result.tempo}") except Exception as e: logger.warning(f"Analysis failed for {node_id}: {e}") logger.info(f"Completed {len(analysis_results)} analysis results") # Phase 3: Generate plan logger.info("Phase 3: Generating execution plan...") # Use the S-expression planner if available try: from artdag.sexp.planner import create_plan plan = create_plan(compiled, inputs=input_hashes) except ImportError: # Fall back to legacy planner if RecipePlanner is None: raise ImportError("No planner available") recipe = Recipe.from_dict(compiled.to_dict()) planner = RecipePlanner(use_tree_reduction=True) plan = planner.plan( recipe=recipe, input_hashes=input_hashes, analysis=analysis_results, ) logger.info(f"Generated plan with {len(plan.steps)} steps") # Save plan as S-expression through cache manager (goes to IPFS) import tempfile plan_content = plan.to_sexp_string() if hasattr(plan, 'to_sexp_string') else plan.to_json() plan_suffix = ".sexp" if hasattr(plan, 'to_sexp_string') else ".json" with tempfile.NamedTemporaryFile(delete=False, suffix=plan_suffix, mode="w") as tmp: tmp.write(plan_content) tmp_path = Path(tmp.name) # Store in cache (content-addressed, auto-pins to IPFS) # Plan is just another node output - no special treatment needed cached, plan_ipfs_cid = cache_mgr.put(tmp_path, node_type="plan", move=True) plan_cache_id = plan_ipfs_cid or cached.cid # Prefer IPFS CID logger.info(f"Plan cached: cid={plan_cache_id}, ipfs={plan_ipfs_cid}") # Phase 4: Execute logger.info("Phase 4: Executing plan...") result = run_plan(plan.to_json(), run_id=run_id) return { "status": result.get("status"), "run_id": run_id, "recipe": compiled.name, "plan_id": plan.plan_id, "plan_cache_id": plan_cache_id, "plan_ipfs_cid": plan_ipfs_cid, "output_path": result.get("output_path"), "output_cache_id": result.get("output_cache_id"), "output_ipfs_cid": result.get("output_ipfs_cid"), "analysis_count": len(analysis_results), "total_steps": len(plan.steps), "cached": result.get("cached", 0), "executed": result.get("executed", 0), "error": result.get("error"), } @app.task(bind=True, name='tasks.generate_plan') def generate_plan( self, recipe_sexp: str, input_hashes: Dict[str, str], ) -> dict: """ Generate an execution plan without executing it. Useful for: - Previewing what will be executed - Checking cache status - Debugging recipe issues Args: recipe_sexp: Recipe S-expression content input_hashes: Mapping from input name to content hash Returns: Dict with plan details """ try: from artdag.sexp import compile_string except ImportError: raise ImportError("artdag.sexp not available") if Analyzer is None: raise ImportError("artdag.analysis not available") cache_mgr = get_cache_manager() # Parse recipe try: compiled = compile_string(recipe_sexp) except Exception as e: return {"status": "failed", "error": f"Recipe parse error: {e}"} # Extract and run analysis nodes from recipe analysis_nodes = _extract_analysis_from_recipe(compiled) ANALYSIS_CACHE_DIR.mkdir(parents=True, exist_ok=True) analyzer = Analyzer(cache_dir=ANALYSIS_CACHE_DIR) analysis_results = {} for analysis_node in analysis_nodes: input_ref = analysis_node["input_ref"] feature = analysis_node["feature"] node_id = analysis_node["node_id"] cid = input_hashes.get(input_ref) if not cid: continue path = cache_mgr.get_by_cid(cid) if path: try: features = [feature] if feature else ["beats", "energy"] result = analyzer.analyze( input_hash=cid, features=features, input_path=Path(path), ) analysis_results[node_id] = result analysis_results[cid] = result except Exception as e: logger.warning(f"Analysis failed for {node_id}: {e}") # Generate plan try: from artdag.sexp.planner import create_plan plan = create_plan(compiled, inputs=input_hashes) except ImportError: if RecipePlanner is None: raise ImportError("No planner available") recipe = Recipe.from_dict(compiled.to_dict()) planner = RecipePlanner(use_tree_reduction=True) plan = planner.plan( recipe=recipe, input_hashes=input_hashes, analysis=analysis_results, ) # Check cache status for each step steps_status = [] for step in plan.steps: cached = cache_mgr.has_content(step.cache_id) steps_status.append({ "step_id": step.step_id, "node_type": step.node_type, "cache_id": step.cache_id, "level": step.level, "cached": cached, }) cached_count = sum(1 for s in steps_status if s["cached"]) return { "status": "planned", "recipe": compiled.name, "plan_id": plan.plan_id, "total_steps": len(plan.steps), "cached_steps": cached_count, "pending_steps": len(plan.steps) - cached_count, "steps": steps_status, "plan_json": plan.to_json(), }