"""
IPFS-primary orchestration.

Everything on IPFS:
- Inputs (media files)
- Analysis results (JSON)
- Execution plans (JSON)
- Step outputs (media files)

The entire pipeline just passes CIDs around.
"""

import hashlib
import json
import logging
import os
import shutil
import sys
import tempfile
from pathlib import Path
from typing import Dict, List, Optional

from celery import current_task, group

sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from celery_app import app
import ipfs_client

# Redis for caching
import redis

REDIS_URL = os.getenv("REDIS_URL", "redis://localhost:6379/5")
_redis: Optional[redis.Redis] = None


def get_redis() -> redis.Redis:
    """Return a lazily-created, module-wide Redis connection."""
    global _redis
    if _redis is None:
        _redis = redis.from_url(REDIS_URL, decode_responses=True)
    return _redis


# Import artdag modules. The None sentinels let tasks raise a clear
# ImportError at call time instead of crashing the worker at boot.
try:
    from artdag.analysis import Analyzer, AnalysisResult
    from artdag.planning import RecipePlanner, ExecutionPlan, Recipe
    from artdag import nodes  # Register executors
except ImportError:
    Analyzer = None
    AnalysisResult = None
    RecipePlanner = None
    ExecutionPlan = None
    Recipe = None

from .analyze_cid import analyze_input_cid
from .execute_cid import execute_step_cid

logger = logging.getLogger(__name__)

# Redis keys
PLAN_CACHE_KEY = "artdag:plan_cid"      # hash: plan_id → plan CID
RECIPE_CACHE_KEY = "artdag:recipe_cid"  # hash: recipe_hash → recipe CID
RUN_CACHE_KEY = "artdag:run_cid"        # hash: run_id → output CID

# Stream files through the hasher in 1 MiB chunks so large media
# inputs are never loaded into memory wholesale.
_HASH_CHUNK_SIZE = 1024 * 1024


def _sha3_256_file(path: Path) -> str:
    """Return the hex SHA3-256 digest of *path*, read in chunks."""
    digest = hashlib.sha3_256()
    with open(path, "rb") as f:
        while chunk := f.read(_HASH_CHUNK_SIZE):
            digest.update(chunk)
    return digest.hexdigest()


def compute_run_id(recipe_cid: str, input_cids: Dict[str, str]) -> str:
    """Compute a deterministic run ID from recipe CID and input CIDs.

    NOTE: the hashed string embeds ``repr()`` of the sorted
    ``(name, cid)`` pairs. The format must stay byte-stable across
    releases, otherwise every run cached in Redis under RUN_CACHE_KEY
    is silently orphaned.
    """
    # Sort inputs for determinism
    sorted_inputs = sorted(input_cids.items())
    data = f"{recipe_cid}:{sorted_inputs}"
    return hashlib.sha3_256(data.encode()).hexdigest()


@app.task(bind=True, name='tasks.register_input_cid')
def register_input_cid(
    self,
    input_path: str,
) -> dict:
    """
    Register an input file on IPFS.

    Args:
        input_path: Local path to the input file

    Returns:
        On success: dict with 'status'='completed', 'cid',
        'content_hash' (SHA3-256 hex) and 'path'.
        On failure: dict with 'status'='failed' and 'error'.
    """
    path = Path(input_path)
    if not path.exists():
        return {"status": "failed", "error": f"File not found: {input_path}"}

    # Compute content hash — streamed, since media files can be huge.
    content_hash = _sha3_256_file(path)

    # Add to IPFS
    cid = ipfs_client.add_file(path)
    if not cid:
        return {"status": "failed", "error": "Failed to add to IPFS"}

    logger.info(f"[CID] Registered input: {path.name} → {cid}")

    return {
        "status": "completed",
        "cid": cid,
        "content_hash": content_hash,
        "path": str(path),
    }


@app.task(bind=True, name='tasks.register_recipe_cid')
def register_recipe_cid(
    self,
    recipe_path: str,
) -> dict:
    """
    Register a recipe on IPFS.

    Args:
        recipe_path: Local path to the recipe YAML file

    Returns:
        On success: dict with 'status'='completed', 'cid', 'name'
        and 'version' of the parsed recipe.
        On failure: dict with 'status'='failed' and 'error'.

    Raises:
        ImportError: if artdag.planning is not installed.
    """
    if Recipe is None:
        raise ImportError("artdag.planning not available")

    path = Path(recipe_path)
    if not path.exists():
        return {"status": "failed", "error": f"Recipe not found: {recipe_path}"}

    # Parse the recipe first so a malformed file fails before we
    # publish anything to IPFS.
    recipe = Recipe.from_file(path)

    # Store the raw recipe YAML on IPFS (byte-exact, so the CID is
    # reproducible from the original file).
    recipe_bytes = path.read_bytes()
    cid = ipfs_client.add_bytes(recipe_bytes)
    if not cid:
        return {"status": "failed", "error": "Failed to add recipe to IPFS"}

    logger.info(f"[CID] Registered recipe: {recipe.name} → {cid}")

    return {
        "status": "completed",
        "cid": cid,
        "name": recipe.name,
        "version": recipe.version,
    }


@app.task(bind=True, name='tasks.generate_plan_cid')
def generate_plan_cid(
    self,
    recipe_cid: str,
    input_cids: Dict[str, str],
    input_hashes: Dict[str, str],
    analysis_cids: Optional[Dict[str, str]] = None,
) -> dict:
    """
    Generate an execution plan and store it on IPFS.

    Args:
        recipe_cid: IPFS CID of the recipe
        input_cids: Mapping from input name to IPFS CID
        input_hashes: Mapping from input name to content hash
        analysis_cids: Optional mapping from input_hash to analysis CID

    Returns:
        On success: dict with 'status'='completed', 'plan_cid',
        'plan_id' and 'steps' (step count).
        On failure: dict with 'status'='failed' and 'error'.

    Raises:
        ImportError: if artdag.planning is not installed.
    """
    if RecipePlanner is None:
        raise ImportError("artdag.planning not available")

    # Fetch recipe from IPFS
    recipe_bytes = ipfs_client.get_bytes(recipe_cid)
    if not recipe_bytes:
        return {"status": "failed", "error": f"Failed to fetch recipe: {recipe_cid}"}

    # Parse recipe from YAML
    import yaml
    recipe_dict = yaml.safe_load(recipe_bytes.decode('utf-8'))
    recipe = Recipe.from_dict(recipe_dict)

    # Fetch analysis results if provided. Missing/unfetchable analyses
    # are skipped — planning degrades gracefully without them.
    analysis = {}
    if analysis_cids:
        for input_hash, analysis_cid in analysis_cids.items():
            analysis_bytes = ipfs_client.get_bytes(analysis_cid)
            if analysis_bytes:
                analysis_dict = json.loads(analysis_bytes.decode('utf-8'))
                analysis[input_hash] = AnalysisResult.from_dict(analysis_dict)

    # Generate plan
    planner = RecipePlanner(use_tree_reduction=True)
    plan = planner.plan(
        recipe=recipe,
        input_hashes=input_hashes,
        analysis=analysis if analysis else None,
    )

    # Store plan on IPFS (round-trip through json.loads so add_json
    # receives a plain dict, not a pre-serialized string).
    plan_cid = ipfs_client.add_json(json.loads(plan.to_json()))
    if not plan_cid:
        return {"status": "failed", "error": "Failed to store plan on IPFS"}

    # Cache plan_id → plan_cid mapping
    get_redis().hset(PLAN_CACHE_KEY, plan.plan_id, plan_cid)

    logger.info(f"[CID] Generated plan: {plan.plan_id[:16]}... → {plan_cid}")

    return {
        "status": "completed",
        "plan_cid": plan_cid,
        "plan_id": plan.plan_id,
        "steps": len(plan.steps),
    }


@app.task(bind=True, name='tasks.execute_plan_from_cid')
def execute_plan_from_cid(
    self,
    plan_cid: str,
    input_cids: Dict[str, str],
) -> dict:
    """
    Execute a plan fetched from IPFS, level by level.

    Steps within a level are independent and dispatched as a parallel
    Celery group; each level waits for the previous one to finish.

    NOTE(review): this blocks on ``.get()`` of subtasks from inside a
    task, which Celery documents as deadlock-prone (requires
    disable_sync_subtasks / dedicated worker pools). Consider a
    chord-based rewrite.

    Args:
        plan_cid: IPFS CID of the execution plan
        input_cids: Mapping from input name/step_id to IPFS CID

    Returns:
        On success: dict with 'status'='completed', 'plan_id',
        'plan_cid', 'output_cid' and per-step 'step_cids'.
        On failure: dict with 'status'='failed', 'failed_step', 'error'.

    Raises:
        ImportError: if artdag.planning is not installed.
    """
    if ExecutionPlan is None:
        raise ImportError("artdag.planning not available")

    # Fetch plan from IPFS
    plan_bytes = ipfs_client.get_bytes(plan_cid)
    if not plan_bytes:
        return {"status": "failed", "error": f"Failed to fetch plan: {plan_cid}"}

    plan_json = plan_bytes.decode('utf-8')
    plan = ExecutionPlan.from_json(plan_json)

    logger.info(f"[CID] Executing plan: {plan.plan_id[:16]}... ({len(plan.steps)} steps)")

    # CID results accumulate as steps complete
    cid_results = dict(input_cids)
    step_cids = {}

    steps_by_level = plan.get_steps_by_level()

    for level in sorted(steps_by_level.keys()):
        steps = steps_by_level[level]
        logger.info(f"[CID] Level {level}: {len(steps)} steps")

        # Each step sees the original inputs plus every CID produced
        # by earlier levels.
        level_input_cids = dict(cid_results)
        level_input_cids.update(step_cids)

        # Dispatch steps in parallel
        tasks = [
            execute_step_cid.s(step.to_json(), level_input_cids)
            for step in steps
        ]

        if len(tasks) == 1:
            results = [tasks[0].apply_async().get(timeout=3600)]
        else:
            job = group(tasks)
            results = job.apply_async().get(timeout=3600)

        # Collect output CIDs; any failure aborts the whole plan.
        for step, result in zip(steps, results):
            if result.get("status") in ("completed", "cached", "completed_by_other"):
                step_cids[step.step_id] = result["cid"]
            else:
                return {
                    "status": "failed",
                    "failed_step": step.step_id,
                    "error": result.get("error", "Unknown error"),
                }

    # Get final output CID (fall back to the last step if the plan
    # does not name an explicit output step).
    output_step_id = plan.output_step or plan.steps[-1].step_id
    output_cid = step_cids.get(output_step_id)

    logger.info(f"[CID] Plan complete: {output_cid}")

    return {
        "status": "completed",
        "plan_id": plan.plan_id,
        "plan_cid": plan_cid,
        "output_cid": output_cid,
        "step_cids": step_cids,
    }


@app.task(bind=True, name='tasks.run_recipe_cid')
def run_recipe_cid(
    self,
    recipe_cid: str,
    input_cids: Dict[str, str],
    input_hashes: Dict[str, str],
    features: Optional[List[str]] = None,
) -> dict:
    """
    Run the complete pipeline: analyze → plan → execute.

    Everything lives on IPFS. Results are memoized in Redis keyed by
    a deterministic run ID (recipe CID + input CIDs).

    NOTE(review): like execute_plan_from_cid, this waits synchronously
    on subtasks with ``.get()``; a Celery chain would avoid occupying
    a worker slot for the whole run.

    Args:
        recipe_cid: IPFS CID of the recipe
        input_cids: Mapping from input name to IPFS CID
        input_hashes: Mapping from input name to content hash
        features: Optional list of features to analyze
            (defaults to ["beats", "energy"])

    Returns:
        Dict with output_cid and all intermediate CIDs, or a
        'status'='failed' dict naming the failing phase.
    """
    if features is None:
        features = ["beats", "energy"]

    logger.info(f"[CID] Running recipe {recipe_cid} with {len(input_cids)} inputs")

    # Compute run ID for caching
    run_id = compute_run_id(recipe_cid, input_cids)

    # Check if run is already cached
    cached_output = get_redis().hget(RUN_CACHE_KEY, run_id)
    if cached_output:
        logger.info(f"[CID] Run cache hit: {run_id[:16]}... → {cached_output}")
        return {
            "status": "cached",
            "run_id": run_id,
            "output_cid": cached_output,
        }

    # Phase 1: Analyze inputs. Analysis failures are non-fatal — the
    # planner can work without analysis for a given input.
    logger.info("[CID] Phase 1: Analysis")
    analysis_cids = {}

    for input_name, input_cid in input_cids.items():
        input_hash = input_hashes.get(input_name)
        if not input_hash:
            continue

        result = analyze_input_cid.apply_async(
            args=[input_cid, input_hash, features]
        ).get(timeout=600)

        if result.get("status") in ("completed", "cached"):
            analysis_cids[input_hash] = result["analysis_cid"]
        else:
            logger.warning(f"[CID] Analysis failed for {input_name}: {result.get('error')}")

    # Phase 2: Generate plan
    logger.info("[CID] Phase 2: Planning")
    plan_result = generate_plan_cid.apply_async(
        args=[recipe_cid, input_cids, input_hashes, analysis_cids]
    ).get(timeout=120)

    if plan_result.get("status") != "completed":
        return {
            "status": "failed",
            "phase": "planning",
            "error": plan_result.get("error", "Unknown error"),
        }

    plan_cid = plan_result["plan_cid"]

    # Phase 3: Execute
    logger.info("[CID] Phase 3: Execution")
    exec_result = execute_plan_from_cid.apply_async(
        args=[plan_cid, input_cids]
    ).get(timeout=7200)  # 2 hour timeout for execution

    if exec_result.get("status") != "completed":
        return {
            "status": "failed",
            "phase": "execution",
            "error": exec_result.get("error", "Unknown error"),
        }

    output_cid = exec_result["output_cid"]

    # Cache the run so identical (recipe, inputs) pairs short-circuit.
    get_redis().hset(RUN_CACHE_KEY, run_id, output_cid)

    logger.info(f"[CID] Run complete: {run_id[:16]}... → {output_cid}")

    return {
        "status": "completed",
        "run_id": run_id,
        "recipe_cid": recipe_cid,
        "analysis_cids": analysis_cids,
        "plan_cid": plan_cid,
        "output_cid": output_cid,
        "step_cids": exec_result.get("step_cids", {}),
    }


@app.task(bind=True, name='tasks.run_from_local')
def run_from_local(
    self,
    recipe_path: str,
    input_paths: Dict[str, str],
    features: Optional[List[str]] = None,
) -> dict:
    """
    Convenience task: register local files on IPFS and run.

    For bootstrapping — registers the recipe and every input on IPFS,
    then delegates to run_recipe_cid.

    Args:
        recipe_path: Local path to recipe YAML
        input_paths: Mapping from input name to local file path
        features: Optional list of features to analyze

    Returns:
        The run_recipe_cid result dict (includes output_cid), or a
        'status'='failed' dict naming the failing registration phase.
    """
    # Register recipe
    recipe_result = register_recipe_cid.apply_async(args=[recipe_path]).get(timeout=60)
    if recipe_result.get("status") != "completed":
        return {"status": "failed", "phase": "register_recipe", "error": recipe_result.get("error")}

    recipe_cid = recipe_result["cid"]

    # Register inputs; abort on the first failure.
    input_cids = {}
    input_hashes = {}

    for name, path in input_paths.items():
        result = register_input_cid.apply_async(args=[path]).get(timeout=300)
        if result.get("status") != "completed":
            return {"status": "failed", "phase": "register_input", "input": name, "error": result.get("error")}

        input_cids[name] = result["cid"]
        input_hashes[name] = result["content_hash"]

    # Run the pipeline
    return run_recipe_cid.apply_async(
        args=[recipe_cid, input_cids, input_hashes, features]
    ).get(timeout=7200)