Fix registry lookups to use cid, remove dead legacy code

- Fix all registry lookups to use "cid" instead of "hash" key
  - app/routers/recipes.py: asset and effect resolution
  - tasks/execute_sexp.py: effect config lookups
  - server_legacy.py references (now deleted)
- Prefer IPFS CID over local hash in cache operations
  - cache_service.py: import_from_ipfs, upload_content
  - orchestrate.py: plan caching
  - legacy_tasks.py: node hash tracking

Remove ~7800 lines of dead code:
- server_legacy.py: replaced by modular app/ structure
- tasks/*_cid.py: unused refactoring only imported by server_legacy

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
gilesb
2026-01-12 09:09:40 +00:00
parent 8e7228fc38
commit 60344b34f4
9 changed files with 12 additions and 7783 deletions

View File

@@ -1,192 +0,0 @@
"""
IPFS-primary analysis tasks.
Fetches inputs from IPFS, stores analysis results on IPFS.
Uses HybridStateManager for:
- Fast local Redis operations
- Background IPNS sync with other L1 nodes
"""
import json
import logging
import os
import shutil
import tempfile
from pathlib import Path
from typing import Dict, List, Optional
from celery import current_task
import sys
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from celery_app import app
import ipfs_client
from hybrid_state import get_state_manager
# Import artdag analysis module
try:
from artdag.analysis import Analyzer, AnalysisResult
except ImportError:
Analyzer = None
AnalysisResult = None
logger = logging.getLogger(__name__)
def get_cached_analysis_cid(input_hash: str, features: List[str]) -> Optional[str]:
    """Look up the analysis CID recorded for an input / feature-set pair.

    Delegates to the state manager's ``get_analysis_cid`` and returns its
    answer unchanged (presumably a falsy value on a cache miss -- confirm
    against ``hybrid_state``).
    """
    state = get_state_manager()
    return state.get_analysis_cid(input_hash, features)
def set_cached_analysis_cid(input_hash: str, features: List[str], cid: str) -> None:
    """Record ``cid`` as the analysis result for an input / feature-set pair.

    Delegates to the state manager's ``set_analysis_cid``.
    """
    state = get_state_manager()
    state.set_analysis_cid(input_hash, features, cid)
@app.task(bind=True, name='tasks.analyze_input_cid')
def analyze_input_cid(
    self,
    input_cid: str,
    input_hash: str,
    features: List[str],
) -> dict:
    """
    Analyze an input file using IPFS-primary architecture.

    A previously stored analysis is reused when the cache maps
    ``(input_hash, features)`` to a CID whose content can be fetched and
    decoded as JSON. Otherwise the input is fetched from IPFS into a
    temporary directory, analyzed, and the JSON result is stored on IPFS
    and recorded in the cache.

    Args:
        input_cid: IPFS CID of the input file
        input_hash: Content hash of the input (for cache key)
        features: List of features to extract

    Returns:
        Dict with 'status' and 'input_hash', plus 'analysis_cid' and
        'result' when status is "cached" or "completed", or 'error'
        when status is "failed".

    Raises:
        ImportError: If artdag.analysis could not be imported.
    """
    if Analyzer is None:
        raise ImportError("artdag.analysis not available")

    logger.info(f"[CID] Analyzing {input_hash[:16]}... for features: {features}")

    # Check cache first
    cached_cid = get_cached_analysis_cid(input_hash, features)
    if cached_cid:
        logger.info(f"[CID] Analysis cache hit: {cached_cid}")

        # Fetch the cached analysis from IPFS
        analysis_bytes = ipfs_client.get_bytes(cached_cid)
        if analysis_bytes:
            try:
                result_dict = json.loads(analysis_bytes.decode('utf-8'))
            except ValueError as e:
                # UnicodeDecodeError and json.JSONDecodeError both derive
                # from ValueError. A corrupt cache entry is treated as a
                # miss so the analysis is recomputed instead of the task
                # raising.
                logger.warning(
                    f"[CID] Cached analysis {cached_cid} is unreadable, recomputing: {e}"
                )
            else:
                return {
                    "status": "cached",
                    "input_hash": input_hash,
                    "analysis_cid": cached_cid,
                    "result": result_dict,
                }

    # Create temp workspace
    work_dir = Path(tempfile.mkdtemp(prefix="artdag_analysis_"))

    try:
        # Fetch input from IPFS
        input_path = work_dir / f"input_{input_cid[:16]}.mkv"
        logger.info(f"[CID] Fetching input: {input_cid}")

        if not ipfs_client.get_file(input_cid, input_path):
            raise RuntimeError(f"Failed to fetch input from IPFS: {input_cid}")

        # Run analysis (no local cache, we'll store on IPFS)
        analyzer = Analyzer(cache_dir=None)
        result = analyzer.analyze(
            input_hash=input_hash,
            features=features,
            input_path=input_path,
        )
        result_dict = result.to_dict()

        # Store analysis result on IPFS
        analysis_cid = ipfs_client.add_json(result_dict)
        if not analysis_cid:
            raise RuntimeError("Failed to store analysis on IPFS")

        logger.info(f"[CID] Analysis complete: {input_hash[:16]}... → {analysis_cid}")

        # Cache the mapping
        set_cached_analysis_cid(input_hash, features, analysis_cid)

        return {
            "status": "completed",
            "input_hash": input_hash,
            "analysis_cid": analysis_cid,
            "result": result_dict,
        }

    except Exception as e:
        # Task boundary: report the failure as a result dict rather than
        # raising, but keep the traceback in the log.
        logger.exception(f"[CID] Analysis failed for {input_hash}: {e}")
        return {
            "status": "failed",
            "input_hash": input_hash,
            "error": str(e),
        }

    finally:
        # Cleanup
        shutil.rmtree(work_dir, ignore_errors=True)
@app.task(bind=True, name='tasks.analyze_inputs_cid')
def analyze_inputs_cid(
    self,
    input_cids: Dict[str, str],
    features: List[str],
) -> dict:
    """
    Fan out ``analyze_input_cid`` over several inputs and merge the results.

    NOTE(review): this waits on subtask results with ``.get()`` from inside
    a task; confirm the Celery configuration in use permits that.

    Args:
        input_cids: Dict mapping input_hash to IPFS CID
        features: List of features to extract from all inputs

    Returns:
        Dict with 'status' ("completed" when no input failed, otherwise
        "partial"), 'analysis_cids', 'results', 'errors', 'total' and
        'successful'.
    """
    from celery import group

    logger.info(f"[CID] Analyzing {len(input_cids)} inputs for features: {features}")

    # One signature per input; the mapping is keyed by hash, the task
    # takes the CID first.
    signatures = [
        analyze_input_cid.s(cid, input_hash, features)
        for input_hash, cid in input_cids.items()
    ]

    if len(signatures) == 1:
        outcomes = [signatures[0].apply_async().get(timeout=600)]
    else:
        outcomes = group(signatures).apply_async().get(timeout=600)

    # Split outcomes into successes and failures
    ok_statuses = ("completed", "cached")
    analysis_cids = {}
    analysis_results = {}
    errors = []

    for outcome in outcomes:
        input_hash = outcome.get("input_hash")
        if outcome.get("status") not in ok_statuses:
            errors.append({
                "input_hash": input_hash,
                "error": outcome.get("error", "Unknown error"),
            })
            continue
        analysis_cids[input_hash] = outcome["analysis_cid"]
        analysis_results[input_hash] = outcome["result"]

    overall_status = "partial" if errors else "completed"
    return {
        "status": overall_status,
        "analysis_cids": analysis_cids,
        "results": analysis_results,
        "errors": errors,
        "total": len(input_cids),
        "successful": len(analysis_cids),
    }

View File

@@ -1,299 +0,0 @@
"""
Simplified step execution with IPFS-primary architecture.
Steps receive CIDs, produce CIDs. No file paths cross machine boundaries.
IPFS nodes form a distributed cache automatically.
Uses HybridStateManager for:
- Fast local Redis operations
- Background IPNS sync with other L1 nodes
"""
import logging
import os
import shutil
import socket
import tempfile
from pathlib import Path
from typing import Dict, Optional
from celery import current_task
import sys
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from celery_app import app
import ipfs_client
from hybrid_state import get_state_manager
# Import artdag
try:
from artdag import NodeType
from artdag.executor import get_executor
from artdag.planning import ExecutionStep
from artdag import nodes # Register executors
except ImportError:
NodeType = None
get_executor = None
ExecutionStep = None
logger = logging.getLogger(__name__)
def get_worker_id() -> str:
    """Return an identifier of the form ``<hostname>:<pid>`` for this process."""
    host = socket.gethostname()
    pid = os.getpid()
    return f"{host}:{pid}"
def get_cached_cid(cache_id: str) -> Optional[str]:
    """Return the CID the state manager has recorded for ``cache_id``.

    The state manager's answer is passed through unchanged (presumably a
    falsy value when nothing is recorded -- confirm against
    ``hybrid_state``).
    """
    state = get_state_manager()
    return state.get_cached_cid(cache_id)
def set_cached_cid(cache_id: str, cid: str) -> None:
    """Record the cache_id → CID mapping via the state manager."""
    state = get_state_manager()
    state.set_cached_cid(cache_id, cid)
def try_claim(cache_id: str, worker_id: str, ttl: int = 300) -> bool:
    """Ask the state manager to claim ``cache_id`` for ``worker_id``.

    ``ttl`` is forwarded to the state manager (presumably the claim
    lifetime in seconds -- confirm against ``hybrid_state``).

    Returns:
        The state manager's answer; True means this worker holds the claim.
    """
    state = get_state_manager()
    return state.try_claim(cache_id, worker_id, ttl)
def release_claim(cache_id: str) -> None:
    """Drop the execution claim on ``cache_id`` via the state manager."""
    state = get_state_manager()
    state.release_claim(cache_id)
def wait_for_cid(cache_id: str, timeout: int = 600, poll_interval: float = 0.5) -> Optional[str]:
    """Wait for another worker to produce a CID for cache_id.

    Polls ``get_cached_cid`` until it returns a truthy value or the
    timeout elapses.

    Args:
        cache_id: Cache key to poll.
        timeout: Maximum time to wait, in seconds. With a timeout of zero
            or less the cache is not consulted at all.
        poll_interval: Pause between polls, in seconds.

    Returns:
        The CID once it is available, or None on timeout.
    """
    import time

    # Use the monotonic clock so wall-clock adjustments (NTP, manual
    # changes) cannot stretch or cut short the wait.
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        cid = get_cached_cid(cache_id)
        if cid:
            return cid
        # Never sleep past the deadline.
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        time.sleep(min(poll_interval, remaining))
    return None
def fetch_from_ipfs(cid: str, dest_dir: Path) -> Path:
    """Download ``cid`` into ``dest_dir`` as ``<cid>.mkv``.

    Returns:
        Path of the downloaded file.

    Raises:
        RuntimeError: If ``ipfs_client.get_file`` reports failure.
    """
    target = dest_dir / f"{cid}.mkv"
    fetched = ipfs_client.get_file(cid, target)
    if fetched:
        return target
    raise RuntimeError(f"Failed to fetch CID from IPFS: {cid}")
@app.task(bind=True, name='tasks.execute_step_cid')
def execute_step_cid(
    self,
    step_json: str,
    input_cids: Dict[str, str],
) -> Dict:
    """
    Execute a step using IPFS-primary architecture.

    The step is skipped when its cache_id already maps to a CID. If
    another worker holds the claim for the cache_id, this task waits for
    that worker's CID instead of executing. Otherwise inputs are fetched
    from IPFS into a temporary directory, the executor runs, and the
    output is added to IPFS and recorded against the cache_id.

    Args:
        step_json: JSON-serialized ExecutionStep
        input_cids: Mapping from input step_id to their IPFS CID

    Returns:
        Dict with 'status', 'step_id' and 'cache_id', plus 'cid' (output
        CID) when status is "cached", "completed" or
        "completed_by_other", or 'error' when status is "timeout" or
        "failed".

    Raises:
        ImportError: If artdag could not be imported.
    """
    if ExecutionStep is None:
        raise ImportError("artdag not available")

    step = ExecutionStep.from_json(step_json)
    worker_id = get_worker_id()

    logger.info(f"[CID] Executing {step.step_id} ({step.node_type})")

    # 1. Check if already computed
    existing_cid = get_cached_cid(step.cache_id)
    if existing_cid:
        logger.info(f"[CID] Cache hit: {step.cache_id[:16]}... → {existing_cid}")
        return {
            "status": "cached",
            "step_id": step.step_id,
            "cache_id": step.cache_id,
            "cid": existing_cid,
        }

    # 2. Try to claim
    if not try_claim(step.cache_id, worker_id):
        logger.info("[CID] Claimed by another worker, waiting...")
        cid = wait_for_cid(step.cache_id)
        if cid:
            return {
                "status": "completed_by_other",
                "step_id": step.step_id,
                "cache_id": step.cache_id,
                "cid": cid,
            }
        return {
            "status": "timeout",
            "step_id": step.step_id,
            "cache_id": step.cache_id,
            "error": "Timeout waiting for other worker",
        }

    # 3. We have the claim - execute
    try:
        # Handle SOURCE nodes
        if step.node_type == "SOURCE":
            # SOURCE nodes should have their CID in input_cids, keyed by
            # the configured name or, failing that, the step id.
            source_name = step.config.get("name") or step.step_id
            cid = input_cids.get(source_name) or input_cids.get(step.step_id)
            if not cid:
                raise ValueError(f"SOURCE missing input CID: {source_name}")

            set_cached_cid(step.cache_id, cid)
            return {
                "status": "completed",
                "step_id": step.step_id,
                "cache_id": step.cache_id,
                "cid": cid,
            }

        # Get executor; fall back to the raw string when the node type is
        # not a NodeType member.
        try:
            node_type = NodeType[step.node_type]
        except KeyError:
            node_type = step.node_type

        executor = get_executor(node_type)
        if executor is None:
            raise ValueError(f"No executor for: {step.node_type}")

        # Create temp workspace
        work_dir = Path(tempfile.mkdtemp(prefix="artdag_"))

        try:
            # Fetch inputs from IPFS
            input_paths = []
            for i, input_step_id in enumerate(step.input_steps):
                input_cid = input_cids.get(input_step_id)
                if not input_cid:
                    raise ValueError(f"Missing input CID for: {input_step_id}")

                input_path = work_dir / f"input_{i}_{input_cid[:16]}.mkv"
                logger.info(f"[CID] Fetching input {i}: {input_cid}")

                if not ipfs_client.get_file(input_cid, input_path):
                    raise RuntimeError(f"Failed to fetch: {input_cid}")

                input_paths.append(input_path)

            # Execute
            output_path = work_dir / f"output_{step.cache_id[:16]}.mkv"
            logger.info(f"[CID] Running {step.node_type} with {len(input_paths)} inputs")

            result_path = executor.execute(step.config, input_paths, output_path)

            # Add output to IPFS
            output_cid = ipfs_client.add_file(result_path)
            if not output_cid:
                raise RuntimeError("Failed to add output to IPFS")

            logger.info(f"[CID] Completed: {step.step_id} → {output_cid}")

            # Store mapping
            set_cached_cid(step.cache_id, output_cid)

            return {
                "status": "completed",
                "step_id": step.step_id,
                "cache_id": step.cache_id,
                "cid": output_cid,
            }

        finally:
            # Cleanup temp workspace
            shutil.rmtree(work_dir, ignore_errors=True)

    except Exception as e:
        # Task boundary: report the failure as a result dict, keep the
        # traceback in the log, and release the claim on the cache_id.
        logger.exception(f"[CID] Failed: {step.step_id}: {e}")
        release_claim(step.cache_id)

        return {
            "status": "failed",
            "step_id": step.step_id,
            "cache_id": step.cache_id,
            "error": str(e),
        }
@app.task(bind=True, name='tasks.execute_plan_cid')
def execute_plan_cid(
    self,
    plan_json: str,
    input_cids: Dict[str, str],
) -> Dict:
    """
    Run every step of a serialized plan, level by level.

    Steps within a level are dispatched together; each level sees the
    caller's input CIDs plus the output CIDs of all earlier levels.
    Execution stops at the first step that does not succeed.

    NOTE(review): this waits on subtask results with ``.get()`` from inside
    a task; confirm the Celery configuration in use permits that.

    Args:
        plan_json: JSON-serialized ExecutionPlan
        input_cids: Mapping from input name to IPFS CID

    Returns:
        On success, dict with 'status' ("completed"), 'plan_id',
        'output_cid' and 'step_cids'. On failure, dict with 'status'
        ("failed"), 'failed_step' and 'error'.
    """
    from celery import group
    from artdag.planning import ExecutionPlan

    plan = ExecutionPlan.from_json(plan_json)
    logger.info(f"[CID] Executing plan: {plan.plan_id[:16]}... ({len(plan.steps)} steps)")

    # Caller-supplied CIDs; copied so the argument is never mutated
    cid_results = dict(input_cids)

    # step_id → output CID, filled in as levels finish
    step_cids = {}

    success_statuses = ("completed", "cached", "completed_by_other")
    steps_by_level = plan.get_steps_by_level()

    for level in sorted(steps_by_level.keys()):
        level_steps = steps_by_level[level]
        logger.info(f"[CID] Level {level}: {len(level_steps)} steps")

        # Step outputs take precedence over caller inputs on key clashes
        level_input_cids = {**cid_results, **step_cids}

        signatures = [
            execute_step_cid.s(step.to_json(), level_input_cids)
            for step in level_steps
        ]

        if len(signatures) == 1:
            # A lone step is dispatched on its own rather than as a group
            outcomes = [signatures[0].apply_async().get(timeout=3600)]
        else:
            outcomes = group(signatures).apply_async().get(timeout=3600)

        for step, outcome in zip(level_steps, outcomes):
            if outcome.get("status") not in success_statuses:
                return {
                    "status": "failed",
                    "failed_step": step.step_id,
                    "error": outcome.get("error", "Unknown error"),
                }
            step_cids[step.step_id] = outcome["cid"]

    # The plan's declared output step, else the last listed step
    output_step_id = plan.output_step or plan.steps[-1].step_id
    output_cid = step_cids.get(output_step_id)

    logger.info(f"[CID] Plan complete: {output_cid}")

    return {
        "status": "completed",
        "plan_id": plan.plan_id,
        "output_cid": output_cid,
        "step_cids": step_cids,
    }

View File

@@ -217,9 +217,9 @@ def execute_step_sexp(
# Handle EFFECT nodes
if node_type == "EFFECT":
effect_hash = config.get("hash")
effect_hash = config.get("cid") or config.get("hash")
if not effect_hash:
raise ValueError("EFFECT step missing :hash")
raise ValueError("EFFECT step missing :cid")
# Get input paths
inputs = config.get("inputs", [])
@@ -277,7 +277,7 @@ def execute_step_sexp(
if filter_type == "EFFECT":
# Effect - for now identity-like, can be extended
effect_hash = filter_config.get("hash") or filter_config.get("effect")
effect_hash = filter_config.get("cid") or filter_config.get("hash") or filter_config.get("effect")
# TODO: resolve effect to actual FFmpeg filter
# For now, skip identity-like effects
pass

View File

@@ -380,7 +380,8 @@ def run_recipe(
# Store in cache (content-addressed, auto-pins to IPFS)
# Plan is just another node output - no special treatment needed
cached, plan_ipfs_cid = cache_mgr.put(tmp_path, node_type="plan", move=True)
logger.info(f"Plan cached: hash={cached.cid}, ipfs={plan_ipfs_cid}")
plan_cache_id = plan_ipfs_cid or cached.cid # Prefer IPFS CID
logger.info(f"Plan cached: cid={plan_cache_id}, ipfs={plan_ipfs_cid}")
# Phase 4: Execute
logger.info("Phase 4: Executing plan...")
@@ -392,7 +393,7 @@ def run_recipe(
"run_id": run_id,
"recipe": compiled.name,
"plan_id": plan.plan_id,
"plan_cache_id": cached.cid,
"plan_cache_id": plan_cache_id,
"plan_ipfs_cid": plan_ipfs_cid,
"output_path": result.get("output_path"),
"output_cache_id": result.get("output_cache_id"),

View File

@@ -1,434 +0,0 @@
"""
IPFS-primary orchestration.
Everything on IPFS:
- Inputs (media files)
- Analysis results (JSON)
- Execution plans (JSON)
- Step outputs (media files)
The entire pipeline just passes CIDs around.
Uses HybridStateManager for:
- Fast local Redis operations
- Background IPNS sync with other L1 nodes
"""
import json
import logging
import os
from pathlib import Path
from typing import Dict, List, Optional
from celery import group
import sys
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from celery_app import app
import ipfs_client
from hybrid_state import get_state_manager
# Import artdag modules
try:
from artdag.analysis import Analyzer, AnalysisResult
from artdag.planning import RecipePlanner, ExecutionPlan, Recipe
from artdag import nodes # Register executors
except ImportError:
Analyzer = None
AnalysisResult = None
RecipePlanner = None
ExecutionPlan = None
Recipe = None
from .analyze_cid import analyze_input_cid
from .execute_cid import execute_step_cid
logger = logging.getLogger(__name__)
def compute_run_id(recipe_cid: str, input_cids: Dict[str, str]) -> str:
    """Derive a run ID from the recipe CID and the input mapping.

    The inputs are sorted by key so that dict insertion order does not
    affect the result. The ID is the SHA3-256 hex digest of
    ``"<recipe_cid>:<sorted item list>"``.
    """
    import hashlib

    ordered_pairs = sorted(input_cids.items())
    digest = hashlib.sha3_256()
    digest.update(f"{recipe_cid}:{ordered_pairs}".encode())
    return digest.hexdigest()
@app.task(bind=True, name='tasks.register_input_cid')
def register_input_cid(
    self,
    input_path: str,
) -> dict:
    """
    Register an input file on IPFS.

    Args:
        input_path: Local path to the input file

    Returns:
        On success, dict with 'status' ("completed"), 'cid' (the IPFS CID
        of the file) and 'path'. On failure, dict with 'status'
        ("failed") and 'error'.
    """
    path = Path(input_path)
    if not path.exists():
        return {"status": "failed", "error": f"File not found: {input_path}"}

    # The IPFS CID is the only identifier returned. The former local
    # SHA3 pass read the whole file into memory and its result was
    # immediately overwritten, so it has been removed.
    cid = ipfs_client.add_file(path)
    if not cid:
        return {"status": "failed", "error": "Failed to add to IPFS"}

    logger.info(f"[CID] Registered input: {path.name} → {cid}")

    return {
        "status": "completed",
        "cid": cid,
        "path": str(path),
    }
@app.task(bind=True, name='tasks.register_recipe_cid')
def register_recipe_cid(
    self,
    recipe_path: str,
) -> dict:
    """
    Register a recipe on IPFS.

    The file is parsed first, so a recipe that ``Recipe.from_file``
    rejects is never uploaded; the raw file bytes are what gets stored.

    Args:
        recipe_path: Local path to the recipe YAML file

    Returns:
        On success, dict with 'status' ("completed"), 'cid', 'name' and
        'version'. On failure, dict with 'status' ("failed") and 'error'.

    Raises:
        ImportError: If artdag.planning could not be imported.
    """
    if Recipe is None:
        raise ImportError("artdag.planning not available")

    path = Path(recipe_path)
    if not path.exists():
        return {"status": "failed", "error": f"Recipe not found: {recipe_path}"}

    # Parse recipe
    recipe = Recipe.from_file(path)

    # Store recipe YAML on IPFS
    recipe_bytes = path.read_bytes()
    cid = ipfs_client.add_bytes(recipe_bytes)
    if not cid:
        return {"status": "failed", "error": "Failed to add recipe to IPFS"}

    logger.info(f"[CID] Registered recipe: {recipe.name} → {cid}")

    return {
        "status": "completed",
        "cid": cid,
        "name": recipe.name,
        "version": recipe.version,
    }
@app.task(bind=True, name='tasks.generate_plan_cid')
def generate_plan_cid(
    self,
    recipe_cid: str,
    input_cids: Dict[str, str],
    input_hashes: Dict[str, str],
    analysis_cids: Optional[Dict[str, str]] = None,
) -> dict:
    """
    Build an execution plan from a recipe stored on IPFS and publish it.

    Args:
        recipe_cid: IPFS CID of the recipe
        input_cids: Mapping from input name to IPFS CID (accepted but not
            read by this task)
        input_hashes: Mapping from input name to content hash
        analysis_cids: Optional mapping from input_hash to analysis CID;
            entries whose content cannot be fetched are skipped

    Returns:
        On success, dict with 'status' ("completed"), 'plan_cid',
        'plan_id' and 'steps' (step count). On failure, dict with
        'status' ("failed") and 'error'.

    Raises:
        ImportError: If artdag.planning could not be imported.
    """
    if RecipePlanner is None:
        raise ImportError("artdag.planning not available")

    # Pull the recipe YAML from IPFS
    raw_recipe = ipfs_client.get_bytes(recipe_cid)
    if not raw_recipe:
        return {"status": "failed", "error": f"Failed to fetch recipe: {recipe_cid}"}

    import yaml
    recipe = Recipe.from_dict(yaml.safe_load(raw_recipe.decode('utf-8')))

    # Load whichever analysis results are retrievable
    analysis = {}
    for input_hash, analysis_cid in (analysis_cids or {}).items():
        raw_analysis = ipfs_client.get_bytes(analysis_cid)
        if not raw_analysis:
            continue
        decoded = json.loads(raw_analysis.decode('utf-8'))
        analysis[input_hash] = AnalysisResult.from_dict(decoded)

    # Plan; an empty analysis mapping is passed as None
    planner = RecipePlanner(use_tree_reduction=True)
    plan = planner.plan(
        recipe=recipe,
        input_hashes=input_hashes,
        analysis=analysis or None,
    )

    # Publish the plan as JSON
    plan_payload = json.loads(plan.to_json())
    plan_cid = ipfs_client.add_json(plan_payload)
    if not plan_cid:
        return {"status": "failed", "error": "Failed to store plan on IPFS"}

    # Remember plan_id → plan_cid
    get_state_manager().set_plan_cid(plan.plan_id, plan_cid)

    logger.info(f"[CID] Generated plan: {plan.plan_id[:16]}... → {plan_cid}")

    return {
        "status": "completed",
        "plan_cid": plan_cid,
        "plan_id": plan.plan_id,
        "steps": len(plan.steps),
    }
@app.task(bind=True, name='tasks.execute_plan_from_cid')
def execute_plan_from_cid(
    self,
    plan_cid: str,
    input_cids: Dict[str, str],
) -> dict:
    """
    Fetch a plan from IPFS and run its steps level by level.

    Steps within a level are dispatched together; each level sees the
    caller's input CIDs plus the output CIDs of all earlier levels.
    Execution stops at the first step that does not succeed.

    NOTE(review): this waits on subtask results with ``.get()`` from inside
    a task; confirm the Celery configuration in use permits that.

    Args:
        plan_cid: IPFS CID of the execution plan
        input_cids: Mapping from input name/step_id to IPFS CID

    Returns:
        On success, dict with 'status' ("completed"), 'plan_id',
        'plan_cid', 'output_cid' and 'step_cids'. On failure, dict with
        'status' ("failed") and 'error' (plus 'failed_step' when a step
        failed).

    Raises:
        ImportError: If artdag.planning could not be imported.
    """
    if ExecutionPlan is None:
        raise ImportError("artdag.planning not available")

    # Pull the plan JSON from IPFS
    raw_plan = ipfs_client.get_bytes(plan_cid)
    if not raw_plan:
        return {"status": "failed", "error": f"Failed to fetch plan: {plan_cid}"}

    plan = ExecutionPlan.from_json(raw_plan.decode('utf-8'))

    logger.info(f"[CID] Executing plan: {plan.plan_id[:16]}... ({len(plan.steps)} steps)")

    # Caller-supplied CIDs (copied) and step_id → output CID
    cid_results = dict(input_cids)
    step_cids = {}

    success_statuses = ("completed", "cached", "completed_by_other")
    steps_by_level = plan.get_steps_by_level()

    for level in sorted(steps_by_level.keys()):
        level_steps = steps_by_level[level]
        logger.info(f"[CID] Level {level}: {len(level_steps)} steps")

        # Step outputs take precedence over caller inputs on key clashes
        level_input_cids = {**cid_results, **step_cids}

        signatures = [
            execute_step_cid.s(step.to_json(), level_input_cids)
            for step in level_steps
        ]

        if len(signatures) == 1:
            outcomes = [signatures[0].apply_async().get(timeout=3600)]
        else:
            outcomes = group(signatures).apply_async().get(timeout=3600)

        for step, outcome in zip(level_steps, outcomes):
            if outcome.get("status") not in success_statuses:
                return {
                    "status": "failed",
                    "failed_step": step.step_id,
                    "error": outcome.get("error", "Unknown error"),
                }
            step_cids[step.step_id] = outcome["cid"]

    # The plan's declared output step, else the last listed step
    output_step_id = plan.output_step or plan.steps[-1].step_id
    output_cid = step_cids.get(output_step_id)

    logger.info(f"[CID] Plan complete: {output_cid}")

    return {
        "status": "completed",
        "plan_id": plan.plan_id,
        "plan_cid": plan_cid,
        "output_cid": output_cid,
        "step_cids": step_cids,
    }
@app.task(bind=True, name='tasks.run_recipe_cid')
def run_recipe_cid(
    self,
    recipe_cid: str,
    input_cids: Dict[str, str],
    input_hashes: Dict[str, str],
    features: Optional[List[str]] = None,
) -> dict:
    """
    Drive the full pipeline: analyze → plan → execute.

    A run whose ID (derived from the recipe CID and input CIDs) already
    has a recorded output is answered from the cache. Analysis failures
    are logged and skipped; planning or execution failures abort the run.

    NOTE(review): this waits on subtask results with ``.get()`` from inside
    a task; confirm the Celery configuration in use permits that.

    Args:
        recipe_cid: IPFS CID of the recipe
        input_cids: Mapping from input name to IPFS CID
        input_hashes: Mapping from input name to content hash; inputs
            without a hash are not analyzed
        features: Optional list of features to analyze; defaults to
            ["beats", "energy"]

    Returns:
        Dict with 'status' and, depending on the outcome: 'run_id' and
        'output_cid' ("cached"); 'phase' and 'error' ("failed"); or
        'run_id', 'recipe_cid', 'analysis_cids', 'plan_cid', 'output_cid'
        and 'step_cids' ("completed").
    """
    if features is None:
        features = ["beats", "energy"]

    logger.info(f"[CID] Running recipe {recipe_cid} with {len(input_cids)} inputs")

    # Short-circuit on a previously recorded run
    run_id = compute_run_id(recipe_cid, input_cids)
    state = get_state_manager()
    cached_output = state.get_run_cid(run_id)
    if cached_output:
        logger.info(f"[CID] Run cache hit: {run_id[:16]}... → {cached_output}")
        return {
            "status": "cached",
            "run_id": run_id,
            "output_cid": cached_output,
        }

    # Phase 1: Analyze inputs, one at a time
    logger.info("[CID] Phase 1: Analysis")
    analysis_cids = {}

    for input_name, input_cid in input_cids.items():
        input_hash = input_hashes.get(input_name)
        if not input_hash:
            continue

        pending = analyze_input_cid.apply_async(
            args=[input_cid, input_hash, features]
        )
        outcome = pending.get(timeout=600)

        if outcome.get("status") not in ("completed", "cached"):
            logger.warning(f"[CID] Analysis failed for {input_name}: {outcome.get('error')}")
            continue
        analysis_cids[input_hash] = outcome["analysis_cid"]

    # Phase 2: Generate plan
    logger.info("[CID] Phase 2: Planning")
    plan_result = generate_plan_cid.apply_async(
        args=[recipe_cid, input_cids, input_hashes, analysis_cids]
    ).get(timeout=120)

    if plan_result.get("status") != "completed":
        return {
            "status": "failed",
            "phase": "planning",
            "error": plan_result.get("error", "Unknown error"),
        }

    plan_cid = plan_result["plan_cid"]

    # Phase 3: Execute (two-hour ceiling)
    logger.info("[CID] Phase 3: Execution")
    exec_result = execute_plan_from_cid.apply_async(
        args=[plan_cid, input_cids]
    ).get(timeout=7200)

    if exec_result.get("status") != "completed":
        return {
            "status": "failed",
            "phase": "execution",
            "error": exec_result.get("error", "Unknown error"),
        }

    output_cid = exec_result["output_cid"]

    # Record the run so identical requests are served from cache
    get_state_manager().set_run_cid(run_id, output_cid)

    logger.info(f"[CID] Run complete: {run_id[:16]}... → {output_cid}")

    return {
        "status": "completed",
        "run_id": run_id,
        "recipe_cid": recipe_cid,
        "analysis_cids": analysis_cids,
        "plan_cid": plan_cid,
        "output_cid": output_cid,
        "step_cids": exec_result.get("step_cids", {}),
    }
@app.task(bind=True, name='tasks.run_from_local')
def run_from_local(
    self,
    recipe_path: str,
    input_paths: Dict[str, str],
    features: Optional[List[str]] = None,
) -> dict:
    """
    Convenience task: register local files and run.

    For bootstrapping - registers recipe and inputs on IPFS, then runs.
    Each input's IPFS CID is passed to the pipeline both as its CID and
    as its "hash", since registration returns only the CID.

    NOTE(review): this waits on subtask results with ``.get()`` from inside
    a task; confirm the Celery configuration in use permits that.

    Args:
        recipe_path: Local path to recipe YAML
        input_paths: Mapping from input name to local file path
        features: Optional list of features to analyze

    Returns:
        The result dict of ``run_recipe_cid`` when registration succeeds.
        If registration fails, dict with 'status' ("failed"), 'phase'
        ("register_recipe" or "register_input"), 'error', and 'input'
        for an input failure.
    """
    # Register recipe
    recipe_result = register_recipe_cid.apply_async(args=[recipe_path]).get(timeout=60)
    if recipe_result.get("status") != "completed":
        return {"status": "failed", "phase": "register_recipe", "error": recipe_result.get("error")}

    recipe_cid = recipe_result["cid"]

    # Register inputs; stop at the first one that fails
    input_cids = {}
    input_hashes = {}

    for name, path in input_paths.items():
        result = register_input_cid.apply_async(args=[path]).get(timeout=300)
        if result.get("status") != "completed":
            return {"status": "failed", "phase": "register_input", "input": name, "error": result.get("error")}

        input_cids[name] = result["cid"]
        input_hashes[name] = result["cid"]

    # Run the pipeline
    return run_recipe_cid.apply_async(
        args=[recipe_cid, input_cids, input_hashes, features]
    ).get(timeout=7200)