Add IPFS-primary orchestration

Complete pipeline with everything on IPFS:
- register_input_cid / register_recipe_cid
- generate_plan_cid (stores plan on IPFS)
- execute_plan_from_cid (fetches plan from IPFS)
- run_recipe_cid (full pipeline, returns output CID)
- run_from_local (convenience: local files → IPFS → run)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
gilesb
2026-01-11 08:18:34 +00:00
parent 92d154f524
commit 25f7213741

447
tasks/orchestrate_cid.py Normal file
View File

@@ -0,0 +1,447 @@
"""
IPFS-primary orchestration.
Everything on IPFS:
- Inputs (media files)
- Analysis results (JSON)
- Execution plans (JSON)
- Step outputs (media files)
The entire pipeline just passes CIDs around.
"""
import json
import logging
import os
import shutil
import tempfile
from pathlib import Path
from typing import Dict, List, Optional
from celery import current_task, group
import sys
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from celery_app import app
import ipfs_client
# Redis for caching
import redis
REDIS_URL = os.getenv("REDIS_URL", "redis://localhost:6379/5")
_redis: Optional[redis.Redis] = None
def get_redis() -> redis.Redis:
    """Return the process-wide Redis client, creating it lazily on first use."""
    global _redis
    if _redis is not None:
        return _redis
    _redis = redis.from_url(REDIS_URL, decode_responses=True)
    return _redis
# Import artdag modules
try:
from artdag.analysis import Analyzer, AnalysisResult
from artdag.planning import RecipePlanner, ExecutionPlan, Recipe
from artdag import nodes # Register executors
except ImportError:
Analyzer = None
AnalysisResult = None
RecipePlanner = None
ExecutionPlan = None
Recipe = None
from .analyze_cid import analyze_input_cid
from .execute_cid import execute_step_cid
logger = logging.getLogger(__name__)
# Redis keys
PLAN_CACHE_KEY = "artdag:plan_cid" # hash: plan_id → plan CID
RECIPE_CACHE_KEY = "artdag:recipe_cid" # hash: recipe_hash → recipe CID
RUN_CACHE_KEY = "artdag:run_cid" # hash: run_id → output CID
def compute_run_id(recipe_cid: str, input_cids: Dict[str, str]) -> str:
    """Compute a deterministic run ID from a recipe CID and its input CIDs.

    The input mapping is canonicalised by sorting its items, so the same
    recipe/input combination yields the same ID regardless of dict order.
    """
    import hashlib

    # Sort so that insertion order of the mapping does not change the ID.
    canonical_inputs = sorted(input_cids.items())
    payload = f"{recipe_cid}:{canonical_inputs}"
    return hashlib.sha3_256(payload.encode()).hexdigest()
@app.task(bind=True, name='tasks.register_input_cid')
def register_input_cid(
    self,
    input_path: str,
) -> dict:
    """
    Register an input file on IPFS.

    Computes a SHA3-256 content hash of the file (streamed in chunks, since
    inputs are media files and may be large) and adds the file to IPFS.

    Args:
        input_path: Local path to the input file

    Returns:
        Dict with 'status', 'cid', 'content_hash' and 'path' on success,
        or {'status': 'failed', 'error': ...} when the file is missing or
        the IPFS add fails.
    """
    import hashlib

    path = Path(input_path)
    if not path.exists():
        return {"status": "failed", "error": f"File not found: {input_path}"}
    # Compute content hash incrementally instead of f.read() of the whole
    # file — media inputs can be large.
    hasher = hashlib.sha3_256()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(1024 * 1024), b""):
            hasher.update(chunk)
    content_hash = hasher.hexdigest()
    # Add to IPFS
    cid = ipfs_client.add_file(path)
    if not cid:
        return {"status": "failed", "error": "Failed to add to IPFS"}
    # Separator restored: filename and CID were previously run together.
    logger.info(f"[CID] Registered input: {path.name} → {cid}")
    return {
        "status": "completed",
        "cid": cid,
        "content_hash": content_hash,
        "path": str(path),
    }
@app.task(bind=True, name='tasks.register_recipe_cid')
def register_recipe_cid(
    self,
    recipe_path: str,
) -> dict:
    """
    Register a recipe on IPFS.

    Parses the recipe YAML (to validate it and extract name/version) and
    stores the raw YAML bytes on IPFS.

    Args:
        recipe_path: Local path to the recipe YAML file

    Returns:
        Dict with 'status', 'cid', 'name' and 'version' on success, or
        {'status': 'failed', 'error': ...} when the file is missing or the
        IPFS add fails.

    Raises:
        ImportError: If artdag.planning could not be imported.
    """
    if Recipe is None:
        raise ImportError("artdag.planning not available")
    path = Path(recipe_path)
    if not path.exists():
        return {"status": "failed", "error": f"Recipe not found: {recipe_path}"}
    # Parse recipe (validates the YAML and gives us name/version)
    recipe = Recipe.from_file(path)
    # Store the raw recipe YAML on IPFS, byte-for-byte
    recipe_bytes = path.read_bytes()
    cid = ipfs_client.add_bytes(recipe_bytes)
    if not cid:
        return {"status": "failed", "error": "Failed to add recipe to IPFS"}
    # Separator restored: recipe name and CID were previously run together.
    logger.info(f"[CID] Registered recipe: {recipe.name} → {cid}")
    return {
        "status": "completed",
        "cid": cid,
        "name": recipe.name,
        "version": recipe.version,
    }
@app.task(bind=True, name='tasks.generate_plan_cid')
def generate_plan_cid(
    self,
    recipe_cid: str,
    input_cids: Dict[str, str],
    input_hashes: Dict[str, str],
    analysis_cids: Optional[Dict[str, str]] = None,
) -> dict:
    """
    Generate an execution plan and store it on IPFS.

    Args:
        recipe_cid: IPFS CID of the recipe
        input_cids: Mapping from input name to IPFS CID
        input_hashes: Mapping from input name to content hash
        analysis_cids: Optional mapping from input_hash to analysis CID

    Returns:
        Dict with 'plan_cid' and 'plan_id'

    Raises:
        ImportError: If artdag.planning could not be imported.
    """
    if RecipePlanner is None:
        raise ImportError("artdag.planning not available")
    # Pull the recipe YAML down from IPFS and parse it.
    raw_recipe = ipfs_client.get_bytes(recipe_cid)
    if not raw_recipe:
        return {"status": "failed", "error": f"Failed to fetch recipe: {recipe_cid}"}
    import yaml
    recipe = Recipe.from_dict(yaml.safe_load(raw_recipe.decode('utf-8')))
    # Hydrate any analysis results that were handed to us as CIDs.
    analysis = {}
    for input_hash, analysis_cid in (analysis_cids or {}).items():
        raw_analysis = ipfs_client.get_bytes(analysis_cid)
        if raw_analysis:
            analysis[input_hash] = AnalysisResult.from_dict(
                json.loads(raw_analysis.decode('utf-8'))
            )
    # Produce the plan.
    plan = RecipePlanner(use_tree_reduction=True).plan(
        recipe=recipe,
        input_hashes=input_hashes,
        analysis=analysis if analysis else None,
    )
    # Persist the plan itself as JSON on IPFS.
    plan_cid = ipfs_client.add_json(json.loads(plan.to_json()))
    if not plan_cid:
        return {"status": "failed", "error": "Failed to store plan on IPFS"}
    # Remember where this plan lives so it can be found again by plan_id.
    get_redis().hset(PLAN_CACHE_KEY, plan.plan_id, plan_cid)
    logger.info(f"[CID] Generated plan: {plan.plan_id[:16]}... → {plan_cid}")
    return {
        "status": "completed",
        "plan_cid": plan_cid,
        "plan_id": plan.plan_id,
        "steps": len(plan.steps),
    }
@app.task(bind=True, name='tasks.execute_plan_from_cid')
def execute_plan_from_cid(
    self,
    plan_cid: str,
    input_cids: Dict[str, str],
) -> dict:
    """
    Execute a plan fetched from IPFS.

    Steps are grouped by dependency level; each level is dispatched as a
    parallel Celery group, and the output CIDs of completed steps become
    available as inputs to subsequent levels. Execution aborts on the
    first failed step.

    NOTE(review): this blocks on child-task results (`.get()`) from inside
    a task, which Celery by default treats as an error (synchronous
    subtask wait) — confirm the app/worker configuration permits this.

    Args:
        plan_cid: IPFS CID of the execution plan
        input_cids: Mapping from input name/step_id to IPFS CID

    Returns:
        Dict with 'output_cid' and step results, or a 'failed' dict naming
        the failed step.

    Raises:
        ImportError: If artdag.planning could not be imported.
    """
    if ExecutionPlan is None:
        raise ImportError("artdag.planning not available")
    # Fetch plan from IPFS
    plan_bytes = ipfs_client.get_bytes(plan_cid)
    if not plan_bytes:
        return {"status": "failed", "error": f"Failed to fetch plan: {plan_cid}"}
    plan_json = plan_bytes.decode('utf-8')
    plan = ExecutionPlan.from_json(plan_json)
    logger.info(f"[CID] Executing plan: {plan.plan_id[:16]}... ({len(plan.steps)} steps)")
    # CID results accumulate as steps complete
    cid_results = dict(input_cids)
    step_cids = {}
    steps_by_level = plan.get_steps_by_level()
    # Levels are executed in ascending order so each level only depends on
    # outputs that already exist.
    for level in sorted(steps_by_level.keys()):
        steps = steps_by_level[level]
        logger.info(f"[CID] Level {level}: {len(steps)} steps")
        # Build input CIDs for this level: the original inputs plus every
        # step output produced so far.
        level_input_cids = dict(cid_results)
        level_input_cids.update(step_cids)
        # Dispatch steps in parallel
        tasks = [
            execute_step_cid.s(step.to_json(), level_input_cids)
            for step in steps
        ]
        if len(tasks) == 1:
            # Single step: no group overhead needed.
            results = [tasks[0].apply_async().get(timeout=3600)]
        else:
            job = group(tasks)
            results = job.apply_async().get(timeout=3600)
        # Collect output CIDs — the zip assumes group results come back in
        # dispatch order, pairing each step with its own result.
        for step, result in zip(steps, results):
            if result.get("status") in ("completed", "cached", "completed_by_other"):
                step_cids[step.step_id] = result["cid"]
            else:
                # Fail fast: abort the whole plan on the first failed step.
                return {
                    "status": "failed",
                    "failed_step": step.step_id,
                    "error": result.get("error", "Unknown error"),
                }
    # Get final output CID; fall back to the last step when the plan does
    # not name an explicit output step.
    output_step_id = plan.output_step or plan.steps[-1].step_id
    output_cid = step_cids.get(output_step_id)
    logger.info(f"[CID] Plan complete: {output_cid}")
    return {
        "status": "completed",
        "plan_id": plan.plan_id,
        "plan_cid": plan_cid,
        "output_cid": output_cid,
        "step_cids": step_cids,
    }
@app.task(bind=True, name='tasks.run_recipe_cid')
def run_recipe_cid(
    self,
    recipe_cid: str,
    input_cids: Dict[str, str],
    input_hashes: Dict[str, str],
    features: Optional[List[str]] = None,
) -> dict:
    """
    Run complete pipeline: analyze → plan → execute.

    Everything on IPFS. Returns output CID. Whole runs are memoized in
    Redis keyed by a deterministic run ID (recipe CID + sorted input CIDs),
    so repeating an identical run returns the cached output CID without
    re-executing.

    Args:
        recipe_cid: IPFS CID of the recipe
        input_cids: Mapping from input name to IPFS CID
        input_hashes: Mapping from input name to content hash
        features: Optional list of features to analyze (defaults to
            beats + energy)

    Returns:
        Dict with output_cid and all intermediate CIDs, or a 'failed'
        dict naming the phase that broke.
    """
    if features is None:
        features = ["beats", "energy"]
    logger.info(f"[CID] Running recipe {recipe_cid} with {len(input_cids)} inputs")
    # Compute run ID for caching
    run_id = compute_run_id(recipe_cid, input_cids)
    # Check if run is already cached — if so, skip all three phases.
    cached_output = get_redis().hget(RUN_CACHE_KEY, run_id)
    if cached_output:
        logger.info(f"[CID] Run cache hit: {run_id[:16]}... → {cached_output}")
        return {
            "status": "cached",
            "run_id": run_id,
            "output_cid": cached_output,
        }
    # Phase 1: Analyze inputs (sequentially, blocking on each child task)
    logger.info("[CID] Phase 1: Analysis")
    analysis_cids = {}
    for input_name, input_cid in input_cids.items():
        input_hash = input_hashes.get(input_name)
        if not input_hash:
            # No content hash for this input — analysis can't be keyed, skip it.
            continue
        result = analyze_input_cid.apply_async(
            args=[input_cid, input_hash, features]
        ).get(timeout=600)
        if result.get("status") in ("completed", "cached"):
            analysis_cids[input_hash] = result["analysis_cid"]
        else:
            # Analysis failures are non-fatal: planning proceeds without
            # that input's analysis.
            logger.warning(f"[CID] Analysis failed for {input_name}: {result.get('error')}")
    # Phase 2: Generate plan
    logger.info("[CID] Phase 2: Planning")
    plan_result = generate_plan_cid.apply_async(
        args=[recipe_cid, input_cids, input_hashes, analysis_cids]
    ).get(timeout=120)
    if plan_result.get("status") != "completed":
        return {
            "status": "failed",
            "phase": "planning",
            "error": plan_result.get("error", "Unknown error"),
        }
    plan_cid = plan_result["plan_cid"]
    # Phase 3: Execute
    logger.info("[CID] Phase 3: Execution")
    exec_result = execute_plan_from_cid.apply_async(
        args=[plan_cid, input_cids]
    ).get(timeout=7200)  # 2 hour timeout for execution
    if exec_result.get("status") != "completed":
        return {
            "status": "failed",
            "phase": "execution",
            "error": exec_result.get("error", "Unknown error"),
        }
    output_cid = exec_result["output_cid"]
    # Cache the run — only successful runs are memoized.
    get_redis().hset(RUN_CACHE_KEY, run_id, output_cid)
    logger.info(f"[CID] Run complete: {run_id[:16]}... → {output_cid}")
    return {
        "status": "completed",
        "run_id": run_id,
        "recipe_cid": recipe_cid,
        "analysis_cids": analysis_cids,
        "plan_cid": plan_cid,
        "output_cid": output_cid,
        "step_cids": exec_result.get("step_cids", {}),
    }
@app.task(bind=True, name='tasks.run_from_local')
def run_from_local(
    self,
    recipe_path: str,
    input_paths: Dict[str, str],
    features: Optional[List[str]] = None,
) -> dict:
    """
    Convenience task: register local files and run.

    For bootstrapping - registers the recipe and all inputs on IPFS,
    then delegates to run_recipe_cid for the full pipeline.

    Args:
        recipe_path: Local path to recipe YAML
        input_paths: Mapping from input name to local file path
        features: Optional list of features to analyze

    Returns:
        Dict with all CIDs including output, or a 'failed' dict naming
        the phase (and, for inputs, the input name) that broke.
    """
    # Register recipe
    recipe_result = register_recipe_cid.apply_async(args=[recipe_path]).get(timeout=60)
    if recipe_result.get("status") != "completed":
        return {"status": "failed", "phase": "register_recipe", "error": recipe_result.get("error")}
    recipe_cid = recipe_result["cid"]
    # Register inputs; each result carries both the IPFS CID and the
    # content hash that analysis/planning are keyed on.
    input_cids = {}
    input_hashes = {}
    for name, path in input_paths.items():
        result = register_input_cid.apply_async(args=[path]).get(timeout=300)
        if result.get("status") != "completed":
            return {"status": "failed", "phase": "register_input", "input": name, "error": result.get("error")}
        input_cids[name] = result["cid"]
        input_hashes[name] = result["content_hash"]
    # Run the pipeline
    return run_recipe_cid.apply_async(
        args=[recipe_cid, input_cids, input_hashes, features]
    ).get(timeout=7200)