"""
IPFS-primary orchestration.

Everything on IPFS:
- Inputs (media files)
- Analysis results (JSON)
- Execution plans (JSON)
- Step outputs (media files)

The entire pipeline just passes CIDs around.

Uses HybridStateManager for:
- Fast local Redis operations
- Background IPNS sync with other L1 nodes
"""

import hashlib
import json
import logging
import os
import sys
from pathlib import Path
from typing import Dict, List, Optional

from celery import group

# Make the parent package importable when run as a worker module.
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from celery_app import app
import ipfs_client
from hybrid_state import get_state_manager

# Import artdag modules. If the package is unavailable the names are set to
# None and each task that needs them raises ImportError at call time instead
# of breaking worker startup.
try:
    from artdag.analysis import Analyzer, AnalysisResult
    from artdag.planning import RecipePlanner, ExecutionPlan, Recipe
    from artdag import nodes  # Register executors
except ImportError:
    Analyzer = None
    AnalysisResult = None
    RecipePlanner = None
    ExecutionPlan = None
    Recipe = None

from .analyze_cid import analyze_input_cid
from .execute_cid import execute_step_cid

logger = logging.getLogger(__name__)

# Chunk size for incremental hashing of (potentially large) media files.
_HASH_CHUNK_SIZE = 1 << 20  # 1 MiB


def compute_run_id(recipe_cid: str, input_cids: Dict[str, str]) -> str:
    """Compute deterministic run ID from recipe and inputs.

    Args:
        recipe_cid: IPFS CID of the recipe.
        input_cids: Mapping from input name to IPFS CID.

    Returns:
        Hex digest uniquely identifying this (recipe, inputs) combination.
    """
    # Sort inputs for determinism. The repr of the sorted tuple list is
    # stable for given contents, and existing cached run IDs depend on this
    # exact serialization — do not change it.
    sorted_inputs = sorted(input_cids.items())
    data = f"{recipe_cid}:{sorted_inputs}"
    return hashlib.sha3_256(data.encode()).hexdigest()


@app.task(bind=True, name='tasks.register_input_cid')
def register_input_cid(
    self,
    input_path: str,
) -> dict:
    """
    Register an input file on IPFS.

    Args:
        input_path: Local path to the input file

    Returns:
        Dict with 'cid' and 'content_hash'
    """
    path = Path(input_path)
    if not path.exists():
        return {"status": "failed", "error": f"File not found: {input_path}"}

    # Compute content hash incrementally so large media files are not read
    # into memory all at once; the digest is identical to hashing the whole
    # file in one call.
    hasher = hashlib.sha3_256()
    with open(path, "rb") as f:
        while chunk := f.read(_HASH_CHUNK_SIZE):
            hasher.update(chunk)
    content_hash = hasher.hexdigest()

    # Add to IPFS
    cid = ipfs_client.add_file(path)
    if not cid:
        return {"status": "failed", "error": "Failed to add to IPFS"}

    logger.info(f"[CID] Registered input: {path.name} → {cid}")

    return {
        "status": "completed",
        "cid": cid,
        "content_hash": content_hash,
        "path": str(path),
    }


@app.task(bind=True, name='tasks.register_recipe_cid')
def register_recipe_cid(
    self,
    recipe_path: str,
) -> dict:
    """
    Register a recipe on IPFS.

    Args:
        recipe_path: Local path to the recipe YAML file

    Returns:
        Dict with 'cid' and 'recipe' (parsed)
    """
    if Recipe is None:
        raise ImportError("artdag.planning not available")

    path = Path(recipe_path)
    if not path.exists():
        return {"status": "failed", "error": f"Recipe not found: {recipe_path}"}

    # Parse recipe (validates it before we bother storing it).
    recipe = Recipe.from_file(path)

    # Store the raw recipe YAML on IPFS so the CID matches the file bytes.
    recipe_bytes = path.read_bytes()
    cid = ipfs_client.add_bytes(recipe_bytes)
    if not cid:
        return {"status": "failed", "error": "Failed to add recipe to IPFS"}

    logger.info(f"[CID] Registered recipe: {recipe.name} → {cid}")

    return {
        "status": "completed",
        "cid": cid,
        "name": recipe.name,
        "version": recipe.version,
    }


@app.task(bind=True, name='tasks.generate_plan_cid')
def generate_plan_cid(
    self,
    recipe_cid: str,
    input_cids: Dict[str, str],
    input_hashes: Dict[str, str],
    analysis_cids: Optional[Dict[str, str]] = None,
) -> dict:
    """
    Generate execution plan and store on IPFS.

    Args:
        recipe_cid: IPFS CID of the recipe
        input_cids: Mapping from input name to IPFS CID
        input_hashes: Mapping from input name to content hash
        analysis_cids: Optional mapping from input_hash to analysis CID

    Returns:
        Dict with 'plan_cid' and 'plan_id'
    """
    if RecipePlanner is None:
        raise ImportError("artdag.planning not available")

    # Fetch recipe from IPFS
    recipe_bytes = ipfs_client.get_bytes(recipe_cid)
    if not recipe_bytes:
        return {"status": "failed", "error": f"Failed to fetch recipe: {recipe_cid}"}

    # Parse recipe from YAML (local import: yaml is only needed here).
    import yaml
    recipe_dict = yaml.safe_load(recipe_bytes.decode('utf-8'))
    recipe = Recipe.from_dict(recipe_dict)

    # Fetch analysis results if provided; missing/unfetchable ones are
    # silently skipped (planning can proceed without analysis).
    analysis = {}
    if analysis_cids:
        for input_hash, analysis_cid in analysis_cids.items():
            analysis_bytes = ipfs_client.get_bytes(analysis_cid)
            if analysis_bytes:
                analysis_dict = json.loads(analysis_bytes.decode('utf-8'))
                analysis[input_hash] = AnalysisResult.from_dict(analysis_dict)

    # Generate plan
    planner = RecipePlanner(use_tree_reduction=True)
    plan = planner.plan(
        recipe=recipe,
        input_hashes=input_hashes,
        analysis=analysis if analysis else None,
    )

    # Store plan on IPFS (round-trip through json so it is stored as an
    # IPFS JSON object, not an opaque string).
    plan_cid = ipfs_client.add_json(json.loads(plan.to_json()))
    if not plan_cid:
        return {"status": "failed", "error": "Failed to store plan on IPFS"}

    # Cache plan_id → plan_cid mapping
    get_state_manager().set_plan_cid(plan.plan_id, plan_cid)

    logger.info(f"[CID] Generated plan: {plan.plan_id[:16]}... → {plan_cid}")

    return {
        "status": "completed",
        "plan_cid": plan_cid,
        "plan_id": plan.plan_id,
        "steps": len(plan.steps),
    }


@app.task(bind=True, name='tasks.execute_plan_from_cid')
def execute_plan_from_cid(
    self,
    plan_cid: str,
    input_cids: Dict[str, str],
) -> dict:
    """
    Execute a plan fetched from IPFS.

    Args:
        plan_cid: IPFS CID of the execution plan
        input_cids: Mapping from input name/step_id to IPFS CID

    Returns:
        Dict with 'output_cid' and step results
    """
    if ExecutionPlan is None:
        raise ImportError("artdag.planning not available")

    # Fetch plan from IPFS
    plan_bytes = ipfs_client.get_bytes(plan_cid)
    if not plan_bytes:
        return {"status": "failed", "error": f"Failed to fetch plan: {plan_cid}"}

    plan_json = plan_bytes.decode('utf-8')
    plan = ExecutionPlan.from_json(plan_json)

    logger.info(f"[CID] Executing plan: {plan.plan_id[:16]}... ({len(plan.steps)} steps)")

    # CID results accumulate as steps complete
    cid_results = dict(input_cids)
    step_cids = {}

    steps_by_level = plan.get_steps_by_level()

    for level in sorted(steps_by_level.keys()):
        steps = steps_by_level[level]
        logger.info(f"[CID] Level {level}: {len(steps)} steps")

        # Build input CIDs for this level: original inputs plus every
        # step output produced so far.
        level_input_cids = dict(cid_results)
        level_input_cids.update(step_cids)

        # Dispatch steps in parallel.
        # NOTE(review): blocking .get() inside a running task is a known
        # Celery deadlock hazard if the worker pool is saturated — confirm
        # pool sizing / consider a chord-based design.
        tasks = [
            execute_step_cid.s(step.to_json(), level_input_cids)
            for step in steps
        ]

        if len(tasks) == 1:
            results = [tasks[0].apply_async().get(timeout=3600)]
        else:
            job = group(tasks)
            results = job.apply_async().get(timeout=3600)

        # Collect output CIDs; abort on the first failed step.
        for step, result in zip(steps, results):
            if result.get("status") in ("completed", "cached", "completed_by_other"):
                step_cids[step.step_id] = result["cid"]
            else:
                return {
                    "status": "failed",
                    "failed_step": step.step_id,
                    "error": result.get("error", "Unknown error"),
                }

    # Get final output CID (explicit output step, else the last step).
    output_step_id = plan.output_step or plan.steps[-1].step_id
    output_cid = step_cids.get(output_step_id)

    logger.info(f"[CID] Plan complete: {output_cid}")

    return {
        "status": "completed",
        "plan_id": plan.plan_id,
        "plan_cid": plan_cid,
        "output_cid": output_cid,
        "step_cids": step_cids,
    }


@app.task(bind=True, name='tasks.run_recipe_cid')
def run_recipe_cid(
    self,
    recipe_cid: str,
    input_cids: Dict[str, str],
    input_hashes: Dict[str, str],
    features: Optional[List[str]] = None,
) -> dict:
    """
    Run complete pipeline: analyze → plan → execute.

    Everything on IPFS. Returns output CID.

    Args:
        recipe_cid: IPFS CID of the recipe
        input_cids: Mapping from input name to IPFS CID
        input_hashes: Mapping from input name to content hash
        features: Optional list of features to analyze

    Returns:
        Dict with output_cid and all intermediate CIDs
    """
    if features is None:
        features = ["beats", "energy"]

    logger.info(f"[CID] Running recipe {recipe_cid} with {len(input_cids)} inputs")

    # Compute run ID for caching
    run_id = compute_run_id(recipe_cid, input_cids)

    # Check if run is already cached
    cached_output = get_state_manager().get_run_cid(run_id)
    if cached_output:
        logger.info(f"[CID] Run cache hit: {run_id[:16]}... → {cached_output}")
        return {
            "status": "cached",
            "run_id": run_id,
            "output_cid": cached_output,
        }

    # Phase 1: Analyze inputs. Best-effort: a failed analysis only logs a
    # warning, and planning proceeds without it.
    # NOTE(review): synchronous .get() inside a task — see deadlock note in
    # execute_plan_from_cid.
    logger.info("[CID] Phase 1: Analysis")
    analysis_cids = {}

    for input_name, input_cid in input_cids.items():
        input_hash = input_hashes.get(input_name)
        if not input_hash:
            continue

        result = analyze_input_cid.apply_async(
            args=[input_cid, input_hash, features]
        ).get(timeout=600)

        if result.get("status") in ("completed", "cached"):
            analysis_cids[input_hash] = result["analysis_cid"]
        else:
            logger.warning(f"[CID] Analysis failed for {input_name}: {result.get('error')}")

    # Phase 2: Generate plan
    logger.info("[CID] Phase 2: Planning")
    plan_result = generate_plan_cid.apply_async(
        args=[recipe_cid, input_cids, input_hashes, analysis_cids]
    ).get(timeout=120)

    if plan_result.get("status") != "completed":
        return {
            "status": "failed",
            "phase": "planning",
            "error": plan_result.get("error", "Unknown error"),
        }

    plan_cid = plan_result["plan_cid"]

    # Phase 3: Execute
    logger.info("[CID] Phase 3: Execution")
    exec_result = execute_plan_from_cid.apply_async(
        args=[plan_cid, input_cids]
    ).get(timeout=7200)  # 2 hour timeout for execution

    if exec_result.get("status") != "completed":
        return {
            "status": "failed",
            "phase": "execution",
            "error": exec_result.get("error", "Unknown error"),
        }

    output_cid = exec_result["output_cid"]

    # Cache the run
    get_state_manager().set_run_cid(run_id, output_cid)

    logger.info(f"[CID] Run complete: {run_id[:16]}... → {output_cid}")

    return {
        "status": "completed",
        "run_id": run_id,
        "recipe_cid": recipe_cid,
        "analysis_cids": analysis_cids,
        "plan_cid": plan_cid,
        "output_cid": output_cid,
        "step_cids": exec_result.get("step_cids", {}),
    }


@app.task(bind=True, name='tasks.run_from_local')
def run_from_local(
    self,
    recipe_path: str,
    input_paths: Dict[str, str],
    features: Optional[List[str]] = None,
) -> dict:
    """
    Convenience task: register local files and run.

    For bootstrapping - registers recipe and inputs on IPFS, then runs.

    Args:
        recipe_path: Local path to recipe YAML
        input_paths: Mapping from input name to local file path
        features: Optional list of features to analyze

    Returns:
        Dict with all CIDs including output
    """
    # Register recipe
    recipe_result = register_recipe_cid.apply_async(args=[recipe_path]).get(timeout=60)
    if recipe_result.get("status") != "completed":
        return {"status": "failed", "phase": "register_recipe", "error": recipe_result.get("error")}

    recipe_cid = recipe_result["cid"]

    # Register inputs
    input_cids = {}
    input_hashes = {}

    for name, path in input_paths.items():
        result = register_input_cid.apply_async(args=[path]).get(timeout=300)
        if result.get("status") != "completed":
            return {"status": "failed", "phase": "register_input", "input": name, "error": result.get("error")}

        input_cids[name] = result["cid"]
        input_hashes[name] = result["content_hash"]

    # Run the pipeline
    return run_recipe_cid.apply_async(
        args=[recipe_cid, input_cids, input_hashes, features]
    ).get(timeout=7200)