Refactor to use IPFS CID as the primary content identifier: - Update database schema: content_hash -> cid, output_hash -> output_cid - Update all services, routers, and tasks to use cid terminology - Update HTML templates to display CID instead of hash - Update cache_manager parameter names - Update README documentation This completes the transition to CID-only content addressing. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
435 lines
12 KiB
Python
"""
|
|
IPFS-primary orchestration.
|
|
|
|
Everything on IPFS:
|
|
- Inputs (media files)
|
|
- Analysis results (JSON)
|
|
- Execution plans (JSON)
|
|
- Step outputs (media files)
|
|
|
|
The entire pipeline just passes CIDs around.
|
|
|
|
Uses HybridStateManager for:
|
|
- Fast local Redis operations
|
|
- Background IPNS sync with other L1 nodes
|
|
"""
|
|
|
|
import json
|
|
import logging
|
|
import os
|
|
from pathlib import Path
|
|
from typing import Dict, List, Optional
|
|
|
|
from celery import group
|
|
|
|
import sys
|
|
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
|
from celery_app import app
|
|
import ipfs_client
|
|
from hybrid_state import get_state_manager
|
|
|
|
# Import artdag modules
|
|
try:
|
|
from artdag.analysis import Analyzer, AnalysisResult
|
|
from artdag.planning import RecipePlanner, ExecutionPlan, Recipe
|
|
from artdag import nodes # Register executors
|
|
except ImportError:
|
|
Analyzer = None
|
|
AnalysisResult = None
|
|
RecipePlanner = None
|
|
ExecutionPlan = None
|
|
Recipe = None
|
|
|
|
from .analyze_cid import analyze_input_cid
|
|
from .execute_cid import execute_step_cid
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def compute_run_id(recipe_cid: str, input_cids: Dict[str, str]) -> str:
    """Derive a deterministic run identifier from a recipe and its inputs.

    Input (name, CID) pairs are sorted by name before hashing so the same
    recipe/input combination always yields the same run ID regardless of
    dict insertion order.
    """
    import hashlib

    canonical = "{}:{}".format(recipe_cid, sorted(input_cids.items()))
    return hashlib.sha3_256(canonical.encode()).hexdigest()
|
|
|
|
|
|
@app.task(bind=True, name='tasks.register_input_cid')
def register_input_cid(
    self,
    input_path: str,
) -> dict:
    """
    Register an input file on IPFS.

    Args:
        input_path: Local path to the input file

    Returns:
        On success: dict with 'cid' (IPFS CID), 'content_hash' (sha3-256 of
        the file bytes), and 'path'. On failure: dict with 'status' and
        'error'.
    """
    import hashlib

    path = Path(input_path)
    if not path.exists():
        return {"status": "failed", "error": f"File not found: {input_path}"}

    # Compute content hash. Keep it in its own variable: the IPFS CID and
    # the sha3-256 digest are distinct identifiers — previously both were
    # assigned to `cid`, so the hash was silently clobbered and the return
    # dict carried a duplicate "cid" key.
    with open(path, "rb") as f:
        content_hash = hashlib.sha3_256(f.read()).hexdigest()

    # Add to IPFS
    cid = ipfs_client.add_file(path)
    if not cid:
        return {"status": "failed", "error": "Failed to add to IPFS"}

    logger.info(f"[CID] Registered input: {path.name} → {cid}")

    return {
        "status": "completed",
        "cid": cid,
        "content_hash": content_hash,
        "path": str(path),
    }
|
|
|
|
|
|
@app.task(bind=True, name='tasks.register_recipe_cid')
def register_recipe_cid(
    self,
    recipe_path: str,
) -> dict:
    """
    Register a recipe on IPFS.

    Args:
        recipe_path: Local path to the recipe YAML file

    Returns:
        On success: dict with 'cid' plus the recipe's 'name' and 'version'.
        On failure: dict with 'status' and 'error'.

    Raises:
        ImportError: if artdag.planning is not installed on this worker.
    """
    if Recipe is None:
        raise ImportError("artdag.planning not available")

    recipe_file = Path(recipe_path)
    if not recipe_file.exists():
        return {"status": "failed", "error": f"Recipe not found: {recipe_path}"}

    # Parse once so name/version metadata can be reported back to the caller.
    recipe = Recipe.from_file(recipe_file)

    # The raw YAML bytes — not the parsed object — are what get pinned,
    # so the stored artifact is byte-identical to the source file.
    cid = ipfs_client.add_bytes(recipe_file.read_bytes())
    if not cid:
        return {"status": "failed", "error": "Failed to add recipe to IPFS"}

    logger.info(f"[CID] Registered recipe: {recipe.name} → {cid}")

    return {
        "status": "completed",
        "cid": cid,
        "name": recipe.name,
        "version": recipe.version,
    }
|
|
|
|
|
|
@app.task(bind=True, name='tasks.generate_plan_cid')
def generate_plan_cid(
    self,
    recipe_cid: str,
    input_cids: Dict[str, str],
    input_hashes: Dict[str, str],
    analysis_cids: Optional[Dict[str, str]] = None,
) -> dict:
    """
    Generate execution plan and store on IPFS.

    Args:
        recipe_cid: IPFS CID of the recipe
        input_cids: Mapping from input name to IPFS CID
        input_hashes: Mapping from input name to content hash
        analysis_cids: Optional mapping from input_hash to analysis CID

    Returns:
        Dict with 'plan_cid', 'plan_id' and the step count on success,
        or a 'status'/'error' pair on failure.

    Raises:
        ImportError: if artdag.planning is not installed on this worker.
    """
    if RecipePlanner is None:
        raise ImportError("artdag.planning not available")

    # Pull the recipe YAML back out of IPFS.
    raw_recipe = ipfs_client.get_bytes(recipe_cid)
    if not raw_recipe:
        return {"status": "failed", "error": f"Failed to fetch recipe: {recipe_cid}"}

    import yaml
    recipe = Recipe.from_dict(yaml.safe_load(raw_recipe.decode('utf-8')))

    # Rehydrate any analysis results that were supplied (keyed by content
    # hash). Unfetchable entries are skipped rather than failing the plan.
    analysis = {}
    for content_hash, a_cid in (analysis_cids or {}).items():
        raw = ipfs_client.get_bytes(a_cid)
        if not raw:
            continue
        analysis[content_hash] = AnalysisResult.from_dict(json.loads(raw.decode('utf-8')))

    plan = RecipePlanner(use_tree_reduction=True).plan(
        recipe=recipe,
        input_hashes=input_hashes,
        analysis=analysis or None,
    )

    # Persist the plan itself on IPFS.
    plan_cid = ipfs_client.add_json(json.loads(plan.to_json()))
    if not plan_cid:
        return {"status": "failed", "error": "Failed to store plan on IPFS"}

    # Cache plan_id → plan_cid mapping
    get_state_manager().set_plan_cid(plan.plan_id, plan_cid)

    logger.info(f"[CID] Generated plan: {plan.plan_id[:16]}... → {plan_cid}")

    return {
        "status": "completed",
        "plan_cid": plan_cid,
        "plan_id": plan.plan_id,
        "steps": len(plan.steps),
    }
|
|
|
|
|
|
@app.task(bind=True, name='tasks.execute_plan_from_cid')
def execute_plan_from_cid(
    self,
    plan_cid: str,
    input_cids: Dict[str, str],
) -> dict:
    """
    Execute a plan fetched from IPFS.

    Steps are run level by level (as grouped by plan.get_steps_by_level());
    all steps within a level are dispatched in parallel via a Celery group,
    and each level only starts once the previous one fully succeeded.

    Args:
        plan_cid: IPFS CID of the execution plan
        input_cids: Mapping from input name/step_id to IPFS CID

    Returns:
        Dict with 'output_cid' and per-step CIDs on success, or a dict
        with 'status': 'failed', 'failed_step' and 'error' on the first
        step failure.

    Raises:
        ImportError: if artdag.planning is not installed on this worker.
    """
    if ExecutionPlan is None:
        raise ImportError("artdag.planning not available")

    # Fetch plan from IPFS
    plan_bytes = ipfs_client.get_bytes(plan_cid)
    if not plan_bytes:
        return {"status": "failed", "error": f"Failed to fetch plan: {plan_cid}"}

    plan_json = plan_bytes.decode('utf-8')
    plan = ExecutionPlan.from_json(plan_json)

    logger.info(f"[CID] Executing plan: {plan.plan_id[:16]}... ({len(plan.steps)} steps)")

    # CID results accumulate as steps complete
    cid_results = dict(input_cids)
    step_cids = {}

    steps_by_level = plan.get_steps_by_level()

    for level in sorted(steps_by_level.keys()):
        steps = steps_by_level[level]
        logger.info(f"[CID] Level {level}: {len(steps)} steps")

        # Build input CIDs for this level: original inputs plus every
        # step output produced by earlier levels.
        level_input_cids = dict(cid_results)
        level_input_cids.update(step_cids)

        # Dispatch steps in parallel
        tasks = [
            execute_step_cid.s(step.to_json(), level_input_cids)
            for step in steps
        ]

        # NOTE(review): blocking .get() inside a task is a Celery
        # anti-pattern (risk of worker-pool deadlock) — confirm the pool
        # sizing accounts for this.
        if len(tasks) == 1:
            results = [tasks[0].apply_async().get(timeout=3600)]
        else:
            job = group(tasks)
            results = job.apply_async().get(timeout=3600)

        # Collect output CIDs. Group results come back in dispatch order,
        # so zip(steps, results) pairs each step with its own result.
        for step, result in zip(steps, results):
            if result.get("status") in ("completed", "cached", "completed_by_other"):
                step_cids[step.step_id] = result["cid"]
            else:
                # Fail fast on the first broken step; later levels would
                # be missing this step's output CID anyway.
                return {
                    "status": "failed",
                    "failed_step": step.step_id,
                    "error": result.get("error", "Unknown error"),
                }

    # Get final output CID: the plan's designated output step, or the
    # last step as a fallback.
    output_step_id = plan.output_step or plan.steps[-1].step_id
    output_cid = step_cids.get(output_step_id)

    logger.info(f"[CID] Plan complete: {output_cid}")

    return {
        "status": "completed",
        "plan_id": plan.plan_id,
        "plan_cid": plan_cid,
        "output_cid": output_cid,
        "step_cids": step_cids,
    }
|
|
|
|
|
|
@app.task(bind=True, name='tasks.run_recipe_cid')
def run_recipe_cid(
    self,
    recipe_cid: str,
    input_cids: Dict[str, str],
    input_hashes: Dict[str, str],
    features: Optional[List[str]] = None,
) -> dict:
    """
    Run complete pipeline: analyze → plan → execute.

    Everything on IPFS. Returns output CID. The whole run is cached by a
    deterministic run ID (recipe CID + sorted input CIDs), so repeating
    the same run returns the cached output CID without re-executing.

    Args:
        recipe_cid: IPFS CID of the recipe
        input_cids: Mapping from input name to IPFS CID
        input_hashes: Mapping from input name to content hash
        features: Optional list of features to analyze

    Returns:
        Dict with output_cid and all intermediate CIDs
    """
    if features is None:
        features = ["beats", "energy"]

    logger.info(f"[CID] Running recipe {recipe_cid} with {len(input_cids)} inputs")

    # Compute run ID for caching
    run_id = compute_run_id(recipe_cid, input_cids)

    # Check if run is already cached
    cached_output = get_state_manager().get_run_cid(run_id)
    if cached_output:
        logger.info(f"[CID] Run cache hit: {run_id[:16]}... → {cached_output}")
        return {
            "status": "cached",
            "run_id": run_id,
            "output_cid": cached_output,
        }

    # Phase 1: Analyze inputs. Inputs without a content hash are skipped,
    # and analysis failures are tolerated — planning proceeds with
    # whatever analysis results were obtained.
    logger.info("[CID] Phase 1: Analysis")
    analysis_cids = {}

    for input_name, input_cid in input_cids.items():
        input_hash = input_hashes.get(input_name)
        if not input_hash:
            continue

        # NOTE(review): blocking .get() inside a task is a Celery
        # anti-pattern (risk of worker-pool deadlock) — confirm pool sizing.
        result = analyze_input_cid.apply_async(
            args=[input_cid, input_hash, features]
        ).get(timeout=600)

        if result.get("status") in ("completed", "cached"):
            analysis_cids[input_hash] = result["analysis_cid"]
        else:
            logger.warning(f"[CID] Analysis failed for {input_name}: {result.get('error')}")

    # Phase 2: Generate plan
    logger.info("[CID] Phase 2: Planning")
    plan_result = generate_plan_cid.apply_async(
        args=[recipe_cid, input_cids, input_hashes, analysis_cids]
    ).get(timeout=120)

    if plan_result.get("status") != "completed":
        return {
            "status": "failed",
            "phase": "planning",
            "error": plan_result.get("error", "Unknown error"),
        }

    plan_cid = plan_result["plan_cid"]

    # Phase 3: Execute
    logger.info("[CID] Phase 3: Execution")
    exec_result = execute_plan_from_cid.apply_async(
        args=[plan_cid, input_cids]
    ).get(timeout=7200)  # 2 hour timeout for execution

    if exec_result.get("status") != "completed":
        return {
            "status": "failed",
            "phase": "execution",
            "error": exec_result.get("error", "Unknown error"),
        }

    output_cid = exec_result["output_cid"]

    # Cache the run (only successful runs are cached)
    get_state_manager().set_run_cid(run_id, output_cid)

    logger.info(f"[CID] Run complete: {run_id[:16]}... → {output_cid}")

    return {
        "status": "completed",
        "run_id": run_id,
        "recipe_cid": recipe_cid,
        "analysis_cids": analysis_cids,
        "plan_cid": plan_cid,
        "output_cid": output_cid,
        "step_cids": exec_result.get("step_cids", {}),
    }
|
|
|
|
|
|
@app.task(bind=True, name='tasks.run_from_local')
def run_from_local(
    self,
    recipe_path: str,
    input_paths: Dict[str, str],
    features: Optional[List[str]] = None,
) -> dict:
    """
    Convenience task: register local files and run.

    For bootstrapping - registers recipe and inputs on IPFS, then runs.

    Args:
        recipe_path: Local path to recipe YAML
        input_paths: Mapping from input name to local file path
        features: Optional list of features to analyze

    Returns:
        Dict with all CIDs including output
    """
    # Register recipe
    recipe_result = register_recipe_cid.apply_async(args=[recipe_path]).get(timeout=60)
    if recipe_result.get("status") != "completed":
        return {"status": "failed", "phase": "register_recipe", "error": recipe_result.get("error")}

    recipe_cid = recipe_result["cid"]

    # Register inputs
    input_cids = {}
    input_hashes = {}

    for name, path in input_paths.items():
        result = register_input_cid.apply_async(args=[path]).get(timeout=300)
        if result.get("status") != "completed":
            return {"status": "failed", "phase": "register_input", "input": name, "error": result.get("error")}

        input_cids[name] = result["cid"]
        # Use the true content hash when the registration task provides one;
        # fall back to the CID so older result payloads (which only carried
        # "cid") keep working.
        input_hashes[name] = result.get("content_hash", result["cid"])

    # Run the pipeline
    return run_recipe_cid.apply_async(
        args=[recipe_cid, input_cids, input_hashes, features]
    ).get(timeout=7200)
|