Files
celery/tasks/orchestrate_cid.py
gilesb ca8bfd8705 Add hybrid state manager for distributed L1 coordination
Implements HybridStateManager providing fast local Redis operations
with background IPNS sync for eventual consistency across L1 nodes.

- hybrid_state.py: Centralized state management (cache, claims, analysis, plans, runs)
- Updated execute_cid.py, analyze_cid.py, orchestrate_cid.py to use state manager
- Background IPNS sync (configurable interval, disabled by default)
- Atomic claiming with Redis SETNX for preventing duplicate work

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-11 09:36:14 +00:00

435 lines
12 KiB
Python

"""
IPFS-primary orchestration.
Everything on IPFS:
- Inputs (media files)
- Analysis results (JSON)
- Execution plans (JSON)
- Step outputs (media files)
The entire pipeline just passes CIDs around.
Uses HybridStateManager for:
- Fast local Redis operations
- Background IPNS sync with other L1 nodes
"""
import json
import logging
import os
from pathlib import Path
from typing import Dict, List, Optional
from celery import group
import sys
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from celery_app import app
import ipfs_client
from hybrid_state import get_state_manager
# Import artdag modules
try:
from artdag.analysis import Analyzer, AnalysisResult
from artdag.planning import RecipePlanner, ExecutionPlan, Recipe
from artdag import nodes # Register executors
except ImportError:
Analyzer = None
AnalysisResult = None
RecipePlanner = None
ExecutionPlan = None
Recipe = None
from .analyze_cid import analyze_input_cid
from .execute_cid import execute_step_cid
logger = logging.getLogger(__name__)
def compute_run_id(recipe_cid: str, input_cids: Dict[str, str]) -> str:
    """Derive a deterministic run identifier for a (recipe, inputs) pair.

    Inputs are ordered by name before hashing so that the same recipe and
    input set always yields the same ID regardless of dict iteration order.
    """
    import hashlib
    ordered = sorted(input_cids.items())
    payload = f"{recipe_cid}:{ordered}".encode()
    return hashlib.sha3_256(payload).hexdigest()
@app.task(bind=True, name='tasks.register_input_cid')
def register_input_cid(
    self,
    input_path: str,
) -> dict:
    """
    Register an input file on IPFS.

    Computes a SHA3-256 content hash of the file and adds it to IPFS.
    The hash is streamed in chunks so large media inputs are not read
    into memory all at once.

    Args:
        input_path: Local path to the input file

    Returns:
        On success: dict with 'status', 'cid', 'content_hash', 'path'.
        On failure: {"status": "failed", "error": ...}
    """
    import hashlib
    path = Path(input_path)
    if not path.exists():
        return {"status": "failed", "error": f"File not found: {input_path}"}
    # Compute content hash incrementally (1 MiB chunks) instead of
    # f.read()-ing the whole file -- inputs are media files and may be large.
    hasher = hashlib.sha3_256()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(1 << 20), b""):
            hasher.update(chunk)
    content_hash = hasher.hexdigest()
    # Add to IPFS
    cid = ipfs_client.add_file(path)
    if not cid:
        return {"status": "failed", "error": "Failed to add to IPFS"}
    # Fixed: the original message ran the filename and CID together;
    # every other log line in this module separates them with " → ".
    logger.info(f"[CID] Registered input: {path.name} → {cid}")
    return {
        "status": "completed",
        "cid": cid,
        "content_hash": content_hash,
        "path": str(path),
    }
@app.task(bind=True, name='tasks.register_recipe_cid')
def register_recipe_cid(
    self,
    recipe_path: str,
) -> dict:
    """
    Register a recipe on IPFS.

    Parses the recipe (for validation and name/version metadata), then
    stores the raw YAML bytes on IPFS so identical files produce
    identical CIDs.

    Args:
        recipe_path: Local path to the recipe YAML file

    Returns:
        On success: dict with 'status', 'cid', 'name', 'version'.
        On failure: {"status": "failed", "error": ...}

    Raises:
        ImportError: if the artdag.planning module is unavailable.
    """
    if Recipe is None:
        raise ImportError("artdag.planning not available")
    path = Path(recipe_path)
    if not path.exists():
        return {"status": "failed", "error": f"Recipe not found: {recipe_path}"}
    # Parse recipe (validates it and yields name/version for the result)
    recipe = Recipe.from_file(path)
    # Store the raw recipe YAML on IPFS
    recipe_bytes = path.read_bytes()
    cid = ipfs_client.add_bytes(recipe_bytes)
    if not cid:
        return {"status": "failed", "error": "Failed to add recipe to IPFS"}
    # Fixed: the original message ran the recipe name and CID together;
    # every other log line in this module separates them with " → ".
    logger.info(f"[CID] Registered recipe: {recipe.name} → {cid}")
    return {
        "status": "completed",
        "cid": cid,
        "name": recipe.name,
        "version": recipe.version,
    }
@app.task(bind=True, name='tasks.generate_plan_cid')
def generate_plan_cid(
    self,
    recipe_cid: str,
    input_cids: Dict[str, str],
    input_hashes: Dict[str, str],
    analysis_cids: Optional[Dict[str, str]] = None,
) -> dict:
    """
    Generate execution plan and store on IPFS.

    Fetches the recipe YAML from IPFS, optionally hydrates analysis
    results (also fetched from IPFS), runs the planner, stores the plan
    JSON back on IPFS, and caches the plan_id → plan_cid mapping in the
    hybrid state manager.

    Args:
        recipe_cid: IPFS CID of the recipe
        input_cids: Mapping from input name to IPFS CID
        input_hashes: Mapping from input name to content hash
        analysis_cids: Optional mapping from input_hash to analysis CID

    Returns:
        Dict with 'plan_cid', 'plan_id' and 'steps' on success, or a
        {"status": "failed", "error": ...} dict on fetch/store failure.

    Raises:
        ImportError: if the artdag.planning module is unavailable.
    """
    if RecipePlanner is None:
        raise ImportError("artdag.planning not available")
    # Fetch recipe from IPFS
    recipe_bytes = ipfs_client.get_bytes(recipe_cid)
    if not recipe_bytes:
        return {"status": "failed", "error": f"Failed to fetch recipe: {recipe_cid}"}
    # Parse recipe from YAML
    import yaml
    recipe_dict = yaml.safe_load(recipe_bytes.decode('utf-8'))
    recipe = Recipe.from_dict(recipe_dict)
    # Fetch analysis results if provided. Entries whose bytes cannot be
    # fetched are skipped silently -- the planner is called with whatever
    # analysis succeeded (or None if nothing did).
    analysis = {}
    if analysis_cids:
        for input_hash, analysis_cid in analysis_cids.items():
            analysis_bytes = ipfs_client.get_bytes(analysis_cid)
            if analysis_bytes:
                analysis_dict = json.loads(analysis_bytes.decode('utf-8'))
                analysis[input_hash] = AnalysisResult.from_dict(analysis_dict)
    # Generate plan
    planner = RecipePlanner(use_tree_reduction=True)
    plan = planner.plan(
        recipe=recipe,
        input_hashes=input_hashes,
        analysis=analysis if analysis else None,
    )
    # Store plan on IPFS. The to_json()/json.loads round-trip turns the
    # plan into a plain dict for add_json.
    plan_cid = ipfs_client.add_json(json.loads(plan.to_json()))
    if not plan_cid:
        return {"status": "failed", "error": "Failed to store plan on IPFS"}
    # Cache plan_id → plan_cid mapping
    get_state_manager().set_plan_cid(plan.plan_id, plan_cid)
    logger.info(f"[CID] Generated plan: {plan.plan_id[:16]}... → {plan_cid}")
    return {
        "status": "completed",
        "plan_cid": plan_cid,
        "plan_id": plan.plan_id,
        "steps": len(plan.steps),
    }
@app.task(bind=True, name='tasks.execute_plan_from_cid')
def execute_plan_from_cid(
    self,
    plan_cid: str,
    input_cids: Dict[str, str],
) -> dict:
    """
    Execute a plan fetched from IPFS.

    Steps are executed level by level: all steps within a level are
    dispatched in parallel (as a Celery group), and each level waits for
    the previous one so that step outputs are available as inputs.

    NOTE(review): this blocks on subtask results via .get() from inside
    a task, which Celery documents as deadlock-prone under default
    settings -- presumably the deployment sizes worker pools to tolerate
    it; verify.

    Args:
        plan_cid: IPFS CID of the execution plan
        input_cids: Mapping from input name/step_id to IPFS CID

    Returns:
        Dict with 'output_cid' and per-step CIDs on success; on the
        first failed step, {"status": "failed", "failed_step", "error"}.

    Raises:
        ImportError: if the artdag.planning module is unavailable.
    """
    if ExecutionPlan is None:
        raise ImportError("artdag.planning not available")
    # Fetch plan from IPFS
    plan_bytes = ipfs_client.get_bytes(plan_cid)
    if not plan_bytes:
        return {"status": "failed", "error": f"Failed to fetch plan: {plan_cid}"}
    plan_json = plan_bytes.decode('utf-8')
    plan = ExecutionPlan.from_json(plan_json)
    logger.info(f"[CID] Executing plan: {plan.plan_id[:16]}... ({len(plan.steps)} steps)")
    # CID results accumulate as steps complete:
    # cid_results holds the original inputs, step_cids the step outputs.
    cid_results = dict(input_cids)
    step_cids = {}
    steps_by_level = plan.get_steps_by_level()
    for level in sorted(steps_by_level.keys()):
        steps = steps_by_level[level]
        logger.info(f"[CID] Level {level}: {len(steps)} steps")
        # Build input CIDs for this level: original inputs plus every
        # output produced by earlier levels.
        level_input_cids = dict(cid_results)
        level_input_cids.update(step_cids)
        # Dispatch steps in parallel
        tasks = [
            execute_step_cid.s(step.to_json(), level_input_cids)
            for step in steps
        ]
        # Single step: dispatch directly; multiple: use a Celery group.
        if len(tasks) == 1:
            results = [tasks[0].apply_async().get(timeout=3600)]
        else:
            job = group(tasks)
            results = job.apply_async().get(timeout=3600)
        # Collect output CIDs; any non-success status aborts the run.
        # "completed_by_other" means another L1 node claimed and
        # finished the step (see atomic claiming in hybrid_state).
        for step, result in zip(steps, results):
            if result.get("status") in ("completed", "cached", "completed_by_other"):
                step_cids[step.step_id] = result["cid"]
            else:
                return {
                    "status": "failed",
                    "failed_step": step.step_id,
                    "error": result.get("error", "Unknown error"),
                }
    # Get final output CID; falls back to the last step when the plan
    # does not name an explicit output step.
    output_step_id = plan.output_step or plan.steps[-1].step_id
    output_cid = step_cids.get(output_step_id)
    logger.info(f"[CID] Plan complete: {output_cid}")
    return {
        "status": "completed",
        "plan_id": plan.plan_id,
        "plan_cid": plan_cid,
        "output_cid": output_cid,
        "step_cids": step_cids,
    }
@app.task(bind=True, name='tasks.run_recipe_cid')
def run_recipe_cid(
    self,
    recipe_cid: str,
    input_cids: Dict[str, str],
    input_hashes: Dict[str, str],
    features: Optional[List[str]] = None,
) -> dict:
    """
    Run complete pipeline: analyze → plan → execute.

    Everything on IPFS. Returns output CID. Completed runs are cached by
    deterministic run ID in the hybrid state manager, so re-running the
    same recipe on the same inputs short-circuits to the cached output.

    NOTE(review): each phase blocks on subtask results via .get() from
    inside this task -- deadlock-prone under Celery defaults; verify the
    worker configuration tolerates it.

    Args:
        recipe_cid: IPFS CID of the recipe
        input_cids: Mapping from input name to IPFS CID
        input_hashes: Mapping from input name to content hash
        features: Optional list of features to analyze
            (defaults to ["beats", "energy"])

    Returns:
        Dict with output_cid and all intermediate CIDs; "cached" status
        with the prior output on a run-cache hit; "failed" status with
        the failing phase on error.
    """
    if features is None:
        features = ["beats", "energy"]
    logger.info(f"[CID] Running recipe {recipe_cid} with {len(input_cids)} inputs")
    # Compute run ID for caching (deterministic over recipe + inputs)
    run_id = compute_run_id(recipe_cid, input_cids)
    # Check if run is already cached
    cached_output = get_state_manager().get_run_cid(run_id)
    if cached_output:
        logger.info(f"[CID] Run cache hit: {run_id[:16]}... → {cached_output}")
        return {
            "status": "cached",
            "run_id": run_id,
            "output_cid": cached_output,
        }
    # Phase 1: Analyze inputs. Analysis failures are logged but not
    # fatal -- planning proceeds with whatever analysis succeeded.
    logger.info("[CID] Phase 1: Analysis")
    analysis_cids = {}
    for input_name, input_cid in input_cids.items():
        input_hash = input_hashes.get(input_name)
        if not input_hash:
            # No content hash for this input: skip analysis for it.
            continue
        result = analyze_input_cid.apply_async(
            args=[input_cid, input_hash, features]
        ).get(timeout=600)
        if result.get("status") in ("completed", "cached"):
            analysis_cids[input_hash] = result["analysis_cid"]
        else:
            logger.warning(f"[CID] Analysis failed for {input_name}: {result.get('error')}")
    # Phase 2: Generate plan (fatal on failure)
    logger.info("[CID] Phase 2: Planning")
    plan_result = generate_plan_cid.apply_async(
        args=[recipe_cid, input_cids, input_hashes, analysis_cids]
    ).get(timeout=120)
    if plan_result.get("status") != "completed":
        return {
            "status": "failed",
            "phase": "planning",
            "error": plan_result.get("error", "Unknown error"),
        }
    plan_cid = plan_result["plan_cid"]
    # Phase 3: Execute (fatal on failure)
    logger.info("[CID] Phase 3: Execution")
    exec_result = execute_plan_from_cid.apply_async(
        args=[plan_cid, input_cids]
    ).get(timeout=7200)  # 2 hour timeout for execution
    if exec_result.get("status") != "completed":
        return {
            "status": "failed",
            "phase": "execution",
            "error": exec_result.get("error", "Unknown error"),
        }
    output_cid = exec_result["output_cid"]
    # Cache the run so identical future runs return immediately
    get_state_manager().set_run_cid(run_id, output_cid)
    logger.info(f"[CID] Run complete: {run_id[:16]}... → {output_cid}")
    return {
        "status": "completed",
        "run_id": run_id,
        "recipe_cid": recipe_cid,
        "analysis_cids": analysis_cids,
        "plan_cid": plan_cid,
        "output_cid": output_cid,
        "step_cids": exec_result.get("step_cids", {}),
    }
@app.task(bind=True, name='tasks.run_from_local')
def run_from_local(
    self,
    recipe_path: str,
    input_paths: Dict[str, str],
    features: Optional[List[str]] = None,
) -> dict:
    """
    Convenience task: register local files and run.

    For bootstrapping - registers recipe and inputs on IPFS, then runs
    the full analyze → plan → execute pipeline via run_recipe_cid.

    Args:
        recipe_path: Local path to recipe YAML
        input_paths: Mapping from input name to local file path
        features: Optional list of features to analyze

    Returns:
        Dict with all CIDs including output, or a "failed" dict naming
        the phase ('register_recipe' / 'register_input') that failed.
    """
    # (Removed unused `import hashlib` -- hashing happens inside the
    # register_* tasks, not here.)
    # Register recipe
    recipe_result = register_recipe_cid.apply_async(args=[recipe_path]).get(timeout=60)
    if recipe_result.get("status") != "completed":
        return {"status": "failed", "phase": "register_recipe", "error": recipe_result.get("error")}
    recipe_cid = recipe_result["cid"]
    # Register inputs; each task returns the IPFS CID plus the content
    # hash needed for planning and run caching.
    input_cids = {}
    input_hashes = {}
    for name, path in input_paths.items():
        result = register_input_cid.apply_async(args=[path]).get(timeout=300)
        if result.get("status") != "completed":
            return {"status": "failed", "phase": "register_input", "input": name, "error": result.get("error")}
        input_cids[name] = result["cid"]
        input_hashes[name] = result["content_hash"]
    # Run the pipeline
    return run_recipe_cid.apply_async(
        args=[recipe_cid, input_cids, input_hashes, features]
    ).get(timeout=7200)