Refactor to use IPFS CID as the primary content identifier: - Update database schema: content_hash -> cid, output_hash -> output_cid - Update all services, routers, and tasks to use cid terminology - Update HTML templates to display CID instead of hash - Update cache_manager parameter names - Update README documentation This completes the transition to CID-only content addressing. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
514 lines
16 KiB
Python
514 lines
16 KiB
Python
"""
|
|
Plan orchestration tasks.
|
|
|
|
Coordinates the full 3-phase execution:
|
|
1. Analyze inputs
|
|
2. Generate plan
|
|
3. Execute steps level by level
|
|
|
|
Uses IPFS-backed cache for durability.
|
|
"""
|
|
|
|
import json
|
|
import logging
|
|
import os
|
|
from pathlib import Path
|
|
from typing import Dict, List, Optional
|
|
|
|
from celery import current_task, group, chain
|
|
|
|
# Import from the Celery app
|
|
import sys
|
|
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
|
from celery_app import app
|
|
from claiming import get_claimer
|
|
from cache_manager import get_cache_manager
|
|
|
|
# Import artdag modules
|
|
try:
|
|
from artdag import Cache
|
|
from artdag.analysis import Analyzer, AnalysisResult
|
|
from artdag.planning import RecipePlanner, ExecutionPlan, Recipe
|
|
except ImportError:
|
|
Cache = None
|
|
Analyzer = None
|
|
AnalysisResult = None
|
|
RecipePlanner = None
|
|
ExecutionPlan = None
|
|
Recipe = None
|
|
|
|
from .execute import execute_step
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Cache directories
|
|
CACHE_DIR = Path(os.environ.get('CACHE_DIR', str(Path.home() / ".artdag" / "cache")))
|
|
ANALYSIS_CACHE_DIR = CACHE_DIR / 'analysis'
|
|
PLAN_CACHE_DIR = CACHE_DIR / 'plans'
|
|
|
|
|
|
@app.task(bind=True, name='tasks.run_plan')
def run_plan(
    self,
    plan_json: str,
    run_id: Optional[str] = None,
) -> dict:
    """
    Execute a complete execution plan.

    Runs steps level by level, with parallel execution within each level
    (one Celery group per level). Steps whose output already exists in the
    IPFS-backed cache are skipped; new results are stored there.

    Args:
        plan_json: JSON-serialized ExecutionPlan
        run_id: Optional run ID for tracking

    Returns:
        Dict with execution results. On success, status is "completed" and
        includes the final output's cache id / path / IPFS CID plus per-step
        results; on failure, status is "failed" with an "error" key and the
        level at which execution stopped.

    Raises:
        ImportError: if artdag.planning is not installed.
    """
    if ExecutionPlan is None:
        raise ImportError("artdag.planning not available")

    plan = ExecutionPlan.from_json(plan_json)
    cache_mgr = get_cache_manager()

    logger.info(f"Executing plan {plan.plan_id[:16]}... ({len(plan.steps)} steps)")

    # Build initial cache_ids mapping (step_id -> cache_id)
    cache_ids = {}
    for step in plan.steps:
        cache_ids[step.step_id] = step.cache_id

    # Also map input names to their CIDs so steps can resolve raw inputs
    for name, cid in plan.input_hashes.items():
        cache_ids[name] = cid

    # Group steps by level so dependencies finish before dependents start
    steps_by_level = plan.get_steps_by_level()
    max_level = max(steps_by_level.keys()) if steps_by_level else 0

    results_by_step = {}
    total_cached = 0
    total_executed = 0

    for level in range(max_level + 1):
        level_steps = steps_by_level.get(level, [])
        if not level_steps:
            continue

        logger.info(f"Executing level {level}: {len(level_steps)} steps")

        # Check which steps need execution (cache hit => skip)
        steps_to_run = []

        for step in level_steps:
            cached_path = cache_mgr.get_by_cid(step.cache_id)
            if cached_path:
                results_by_step[step.step_id] = {
                    "status": "cached",
                    "cache_id": step.cache_id,
                    "output_path": str(cached_path),
                }
                total_cached += 1
            else:
                steps_to_run.append(step)

        if not steps_to_run:
            logger.info(f"Level {level}: all steps cached")
            continue

        # Snapshot of cache ids visible to this level's steps
        level_cache_ids = dict(cache_ids)

        # Execute this level's pending steps in parallel
        tasks = [
            execute_step.s(step.to_json(), plan.plan_id, level_cache_ids)
            for step in steps_to_run
        ]

        job = group(tasks)
        async_results = job.apply_async()

        # Wait for completion.
        # NOTE(review): blocking on subtasks inside a task is a Celery
        # anti-pattern (can deadlock a saturated worker pool). Celery >= 4.2
        # raises RuntimeError on .get() within a task unless
        # disable_sync_subtasks=False is passed explicitly, so without it
        # this line fails before it ever waits.
        try:
            step_results = async_results.get(timeout=3600, disable_sync_subtasks=False)
        except Exception as e:
            logger.error(f"Level {level} execution failed: {e}")
            return {
                "status": "failed",
                "error": str(e),
                "level": level,
                "results": results_by_step,
                "run_id": run_id,
            }

        # Process results: record outcomes and propagate cache ids forward
        for result in step_results:
            step_id = result.get("step_id")
            cache_id = result.get("cache_id")

            results_by_step[step_id] = result
            # Later levels resolve this step's output through cache_ids
            cache_ids[step_id] = cache_id

            # "cached"/"completed_by_other" also count as executed here;
            # total_cached only counts hits found by the pre-check above.
            if result.get("status") in ("completed", "cached", "completed_by_other"):
                total_executed += 1
            elif result.get("status") == "failed":
                logger.error(f"Step {step_id} failed: {result.get('error')}")
                return {
                    "status": "failed",
                    "error": f"Step {step_id} failed: {result.get('error')}",
                    "level": level,
                    "results": results_by_step,
                    "run_id": run_id,
                }

    # Get final output (the designated output step's artifact, if present)
    output_step = plan.get_step(plan.output_step)
    output_cache_id = output_step.cache_id if output_step else None
    output_path = None
    output_ipfs_cid = None
    output_name = plan.output_name

    if output_cache_id:
        output_path = cache_mgr.get_by_cid(output_cache_id)
        output_ipfs_cid = cache_mgr.get_ipfs_cid(output_cache_id)

    # Build list of all outputs with their names and artifacts
    all_outputs = []
    for step in plan.steps:
        step_result = results_by_step.get(step.step_id, {})
        step_outputs = step_result.get("outputs", [])

        # If no outputs in result, build from step definition
        if not step_outputs and step.outputs:
            for output_def in step.outputs:
                output_cache_path = cache_mgr.get_by_cid(output_def.cache_id)
                output_ipfs = cache_mgr.get_ipfs_cid(output_def.cache_id) if output_cache_path else None
                all_outputs.append({
                    "name": output_def.name,
                    "step_id": step.step_id,
                    "step_name": step.name,
                    "cache_id": output_def.cache_id,
                    "media_type": output_def.media_type,
                    "path": str(output_cache_path) if output_cache_path else None,
                    "ipfs_cid": output_ipfs,
                    "status": "cached" if output_cache_path else "missing",
                })
        else:
            for output in step_outputs:
                all_outputs.append({
                    **output,
                    "step_id": step.step_id,
                    "step_name": step.name,
                    "status": "completed",
                })

    return {
        "status": "completed",
        "run_id": run_id,
        "plan_id": plan.plan_id,
        "plan_name": plan.name,
        "recipe_name": plan.recipe_name,
        "output_name": output_name,
        "output_cache_id": output_cache_id,
        "output_path": str(output_path) if output_path else None,
        "output_ipfs_cid": output_ipfs_cid,
        "total_steps": len(plan.steps),
        "cached": total_cached,
        "executed": total_executed,
        "results": results_by_step,
        "outputs": all_outputs,
    }
|
|
|
|
|
|
def _extract_analysis_from_recipe(compiled_recipe) -> List[Dict]:
|
|
"""
|
|
Extract analysis nodes from a compiled recipe.
|
|
|
|
Finds all (analyze ...) nodes and returns their configurations.
|
|
Analysis nodes are identified by type "ANALYZE" or by having
|
|
an "analyze" config key.
|
|
"""
|
|
analysis_nodes = []
|
|
nodes = compiled_recipe.nodes
|
|
|
|
if isinstance(nodes, dict):
|
|
nodes = list(nodes.values())
|
|
|
|
for node in nodes:
|
|
node_type = node.get("type", "").upper()
|
|
config = node.get("config", {})
|
|
|
|
# Check if this is an analysis node
|
|
if node_type == "ANALYZE" or config.get("analyze"):
|
|
analysis_nodes.append({
|
|
"node_id": node.get("id"),
|
|
"input_ref": config.get("input") or config.get("source"),
|
|
"feature": config.get("feature") or config.get("analyze"),
|
|
"config": config,
|
|
})
|
|
|
|
return analysis_nodes
|
|
|
|
|
|
@app.task(bind=True, name='tasks.run_recipe')
def run_recipe(
    self,
    recipe_sexp: str,
    input_hashes: Dict[str, str],
    run_id: Optional[str] = None,
) -> dict:
    """
    Run a complete recipe through all phases.

    The recipe S-expression declares what analysis is needed.
    Analysis nodes in the recipe are executed first, then their
    outputs are used to generate the execution plan.

    1. Parse: Compile recipe S-expression
    2. Analyze: Run analysis nodes from recipe
    3. Plan: Generate execution plan using analysis results
    4. Execute: Run the plan

    Args:
        recipe_sexp: Recipe S-expression content
        input_hashes: Mapping from input name to content hash (CID)
        run_id: Optional run ID for tracking

    Returns:
        Dict with final results (status, plan/output cache ids and IPFS
        CIDs, step counts, and any error string).

    Raises:
        ImportError: if artdag.sexp or artdag.analysis is not installed.
    """
    # Import S-expression compiler
    try:
        from artdag.sexp import compile_string
    except ImportError:
        raise ImportError("artdag.sexp not available")

    if Analyzer is None:
        raise ImportError("artdag.analysis not available")

    cache_mgr = get_cache_manager()

    logger.info(f"Running recipe with {len(input_hashes)} inputs")

    # Phase 1: Parse recipe. Parse failures are reported, not raised.
    logger.info("Phase 1: Parsing recipe S-expression...")

    try:
        compiled = compile_string(recipe_sexp)
    except Exception as e:
        return {"status": "failed", "error": f"Recipe parse error: {e}"}

    logger.info(f"Parsed recipe: {compiled.name}")

    # Phase 2: Run analysis nodes from recipe
    logger.info("Phase 2: Running analysis from recipe...")

    analysis_nodes = _extract_analysis_from_recipe(compiled)
    logger.info(f"Found {len(analysis_nodes)} analysis nodes in recipe")

    ANALYSIS_CACHE_DIR.mkdir(parents=True, exist_ok=True)
    analyzer = Analyzer(cache_dir=ANALYSIS_CACHE_DIR)

    analysis_results = {}
    for analysis_node in analysis_nodes:
        input_ref = analysis_node["input_ref"]
        feature = analysis_node["feature"]
        node_id = analysis_node["node_id"]

        # Resolve input reference to its CID; skip (with a warning) if the
        # recipe references an input the caller did not provide.
        cid = input_hashes.get(input_ref)
        if not cid:
            logger.warning(f"Analysis node {node_id}: input '{input_ref}' not in input_hashes")
            continue

        path = cache_mgr.get_by_cid(cid)
        if not path:
            logger.warning(f"Analysis node {node_id}: content {cid[:16]}... not in cache")
            continue

        try:
            # Run analysis for the specific feature (default set if none)
            features = [feature] if feature else ["beats", "energy"]
            result = analyzer.analyze(
                input_hash=cid,
                features=features,
                input_path=Path(path),
            )
            # Store result keyed by node_id so plan can reference it
            analysis_results[node_id] = result
            # Also store by cid for compatibility
            analysis_results[cid] = result
            logger.info(f"Analysis {node_id}: feature={feature}, tempo={result.tempo}")
        except Exception as e:
            # Best-effort: a failed analysis degrades the plan, not the run
            logger.warning(f"Analysis failed for {node_id}: {e}")

    logger.info(f"Completed {len(analysis_results)} analysis results")

    # Phase 3: Generate plan
    logger.info("Phase 3: Generating execution plan...")

    # Use the S-expression planner if available
    try:
        from artdag.sexp.planner import create_plan
        plan = create_plan(compiled, inputs=input_hashes)
    except ImportError:
        # Fall back to legacy planner
        if RecipePlanner is None:
            raise ImportError("No planner available")
        recipe = Recipe.from_dict(compiled.to_dict())
        planner = RecipePlanner(use_tree_reduction=True)
        plan = planner.plan(
            recipe=recipe,
            input_hashes=input_hashes,
            analysis=analysis_results,
        )

    logger.info(f"Generated plan with {len(plan.steps)} steps")

    # Save plan as S-expression through cache manager (goes to IPFS)
    import tempfile

    plan_content = plan.to_sexp_string() if hasattr(plan, 'to_sexp_string') else plan.to_json()
    plan_suffix = ".sexp" if hasattr(plan, 'to_sexp_string') else ".json"

    with tempfile.NamedTemporaryFile(delete=False, suffix=plan_suffix, mode="w") as tmp:
        tmp.write(plan_content)
        tmp_path = Path(tmp.name)

    # Store in cache (content-addressed, auto-pins to IPFS)
    # Plan is just another node output - no special treatment needed
    cached, plan_ipfs_cid = cache_mgr.put(tmp_path, node_type="plan", move=True)
    # Consistency fix: log "cid=", not "hash=", per the CID-only refactor
    logger.info(f"Plan cached: cid={cached.cid}, ipfs={plan_ipfs_cid}")

    # Phase 4: Execute
    logger.info("Phase 4: Executing plan...")

    # Direct (synchronous) invocation runs the plan inside this task.
    # NOTE(review): confirm this is intended rather than .delay()/chaining.
    result = run_plan(plan.to_json(), run_id=run_id)

    return {
        "status": result.get("status"),
        "run_id": run_id,
        "recipe": compiled.name,
        "plan_id": plan.plan_id,
        "plan_cache_id": cached.cid,
        "plan_ipfs_cid": plan_ipfs_cid,
        "output_path": result.get("output_path"),
        "output_cache_id": result.get("output_cache_id"),
        "output_ipfs_cid": result.get("output_ipfs_cid"),
        "analysis_count": len(analysis_results),
        "total_steps": len(plan.steps),
        "cached": result.get("cached", 0),
        "executed": result.get("executed", 0),
        "error": result.get("error"),
    }
|
|
|
|
|
|
@app.task(bind=True, name='tasks.generate_plan')
def generate_plan(
    self,
    recipe_sexp: str,
    input_hashes: Dict[str, str],
) -> dict:
    """
    Generate an execution plan without executing it.

    Useful for:
    - Previewing what will be executed
    - Checking cache status
    - Debugging recipe issues

    Args:
        recipe_sexp: Recipe S-expression content
        input_hashes: Mapping from input name to content hash

    Returns:
        Dict with plan details
    """
    try:
        from artdag.sexp import compile_string
    except ImportError:
        raise ImportError("artdag.sexp not available")

    if Analyzer is None:
        raise ImportError("artdag.analysis not available")

    cache_mgr = get_cache_manager()

    # Compile the recipe; malformed input is reported, not raised.
    try:
        compiled = compile_string(recipe_sexp)
    except Exception as e:
        return {"status": "failed", "error": f"Recipe parse error: {e}"}

    # Run any analysis declared by the recipe so the planner can use it.
    ANALYSIS_CACHE_DIR.mkdir(parents=True, exist_ok=True)
    analyzer = Analyzer(cache_dir=ANALYSIS_CACHE_DIR)

    analysis_results = {}
    for node in _extract_analysis_from_recipe(compiled):
        node_id = node["node_id"]
        feature = node["feature"]

        cid = input_hashes.get(node["input_ref"])
        if not cid:
            continue

        content_path = cache_mgr.get_by_cid(cid)
        if not content_path:
            continue

        wanted = [feature] if feature else ["beats", "energy"]
        try:
            outcome = analyzer.analyze(
                input_hash=cid,
                features=wanted,
                input_path=Path(content_path),
            )
        except Exception as e:
            # Best-effort: missing analysis only degrades the plan preview
            logger.warning(f"Analysis failed for {node_id}: {e}")
        else:
            # Keyed both by node id (for the plan) and by cid (compat)
            analysis_results[node_id] = outcome
            analysis_results[cid] = outcome

    # Prefer the S-expression planner; fall back to the legacy one.
    try:
        from artdag.sexp.planner import create_plan
        plan = create_plan(compiled, inputs=input_hashes)
    except ImportError:
        if RecipePlanner is None:
            raise ImportError("No planner available")
        plan = RecipePlanner(use_tree_reduction=True).plan(
            recipe=Recipe.from_dict(compiled.to_dict()),
            input_hashes=input_hashes,
            analysis=analysis_results,
        )

    # Report per-step cache status so callers can preview what would run.
    steps_status = [
        {
            "step_id": s.step_id,
            "node_type": s.node_type,
            "cache_id": s.cache_id,
            "level": s.level,
            "cached": cache_mgr.has_content(s.cache_id),
        }
        for s in plan.steps
    ]
    cached_count = sum(1 for s in steps_status if s["cached"])

    return {
        "status": "planned",
        "recipe": compiled.name,
        "plan_id": plan.plan_id,
        "total_steps": len(plan.steps),
        "cached_steps": cached_count,
        "pending_steps": len(plan.steps) - cached_count,
        "steps": steps_status,
        "plan_json": plan.to_json(),
    }
|