Files
celery/legacy_tasks.py
gilesb d603485d40 Refactor to S-expression based execution with code-addressed cache IDs
Major changes:
- Add execute_recipe task that uses S-expression planner
- Recipe S-expression unfolds into plan S-expression with code-addressed cache IDs
- Cache IDs computed from Merkle tree of plan structure (before execution)
- Add ipfs_client.add_string() for storing S-expression plans
- Update run_service.create_run() to use execute_recipe when recipe_sexp available
- Add _sexp_to_steps() to parse S-expression plans for UI visualization
- Plan endpoint now returns both sexp content and parsed steps

The code-addressed hashing means each plan step's cache_id is:
  sha3_256({node_type, config, sorted(input_cache_ids)})

This creates deterministic "buckets" for computation results computed
entirely from the plan structure, enabling automatic cache reuse.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-13 00:27:24 +00:00

969 lines
35 KiB
Python

"""
Art DAG Celery Tasks
Distributed rendering tasks for the Art DAG system.
Supports both single-effect runs and multi-step DAG execution.
"""
import hashlib
import json
import logging
import os
import subprocess
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List, Optional
from celery import Task
from celery_app import app
# Import artdag components
from artdag import DAG, Node, NodeType
from artdag.engine import Engine
from artdag.executor import register_executor, Executor, get_executor
from artdag.nodes.effect import register_effect
import artdag.nodes # Register all built-in executors (SOURCE, EFFECT, etc.)
# Add effects to path (use env var in Docker, fallback to home dir locally)
# EFFECTS_PATH: root of the effects checkout; taken from the EFFECTS_PATH
# environment variable when set, otherwise ~/artdag-effects.
EFFECTS_PATH = Path(os.environ.get("EFFECTS_PATH", str(Path.home() / "artdag-effects")))
# ARTDAG_PATH: root of the artdag checkout; taken from the ARTDAG_PATH
# environment variable when set, otherwise ~/art/artdag.
ARTDAG_PATH = Path(os.environ.get("ARTDAG_PATH", str(Path.home() / "art" / "artdag")))
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
def _git_head_commit(repo_path: Path) -> str:
    """Return the HEAD commit hash of the git checkout at *repo_path*.

    This is a best-effort lookup used only for provenance metadata, so it
    never raises: if ``git`` cannot be run, the directory is unusable, or
    ``git rev-parse HEAD`` exits non-zero, the literal string ``"unknown"``
    is returned instead.
    """
    try:
        completed = subprocess.run(
            ["git", "rev-parse", "HEAD"],
            cwd=repo_path,
            capture_output=True,
            text=True,
        )
    except Exception:
        # Deliberately broad: a missing git binary or checkout must not
        # abort a render that has otherwise succeeded.
        return "unknown"
    if completed.returncode != 0:
        return "unknown"
    return completed.stdout.strip()


def get_effects_commit() -> str:
    """Get current git commit hash of effects repo (``"unknown"`` on failure)."""
    return _git_head_commit(EFFECTS_PATH)


def get_artdag_commit() -> str:
    """Get current git commit hash of artdag repo (``"unknown"`` on failure)."""
    return _git_head_commit(ARTDAG_PATH)
# Put the "dog" effect directory at the front of sys.path so that the
# `effect` module imported below is looked up there first.
sys.path.insert(0, str(EFFECTS_PATH / "dog"))
# Register the dog effect with the EFFECT executor
# New format uses process() instead of effect_dog()
# NOTE(review): other code in this file imports `effect_dog` (and DOG_HASH)
# from the same `effect` module - confirm the module still exports them.
from effect import process as dog_process
@register_effect("dog")
def _dog_effect(input_path: Path, output_path: Path, config: dict) -> Path:
    """Adapter registered as the "dog" effect for DAG EFFECT nodes.

    Forwards a single-input call to ``dog_process``, which is called with a
    list of inputs, the output path, the config and a trailing ``None``.

    Args:
        input_path: The one input file for the effect.
        output_path: Where the effect should write its result.
        config: Effect configuration, passed through unchanged.

    Returns:
        Whatever ``dog_process`` returns.
    """
    # The wrapped API takes a list of inputs, so wrap the single path.
    sources = [input_path]
    return dog_process(sources, output_path, config, None)
def file_hash(path: Path) -> str:
    """Compute the SHA3-256 hash of a file.

    Args:
        path: Location of the file. A ``pathlib.Path`` is expected, but a
            plain string or other ``os.PathLike`` is accepted too, since
            callers pass through paths returned by executors.

    Returns:
        The hex digest of the file's contents.

    Raises:
        OSError: If the file cannot be opened or read.
    """
    # Normalise first so that str inputs do not fail on .is_symlink().
    target = Path(path)
    if target.is_symlink():
        target = target.resolve()
    hasher = hashlib.sha3_256()
    with open(target, "rb") as f:
        # Read in 64 KiB chunks so large media files are never fully in memory.
        for chunk in iter(lambda: f.read(65536), b""):
            hasher.update(chunk)
    return hasher.hexdigest()
# Cache directory (shared between server and worker)
# Taken from the CACHE_DIR environment variable when set, otherwise
# ~/.artdag/cache.
CACHE_DIR = Path(os.environ.get("CACHE_DIR", str(Path.home() / ".artdag" / "cache")))
# ============ Executors for Effects ============
@register_executor("effect:dog")
class DogExecutor(Executor):
    """Executor for the dog effect."""

    def execute(self, config: Dict, inputs: List[Path], output_path: Path) -> Path:
        """Run the dog effect on exactly one input.

        Args:
            config: Effect configuration, passed through to the effect.
            inputs: Input file paths; must contain exactly one entry.
            output_path: Where the effect should write its result.

        Returns:
            Whatever the effect function returns.

        Raises:
            ValueError: If ``inputs`` does not contain exactly one path.
            ImportError: If the ``effect`` module provides neither
                ``effect_dog`` nor ``process``.
        """
        # This file notes that the effect module's new format exposes
        # process() instead of effect_dog(), so prefer the legacy entry
        # point when it exists and fall back to the new one otherwise.
        try:
            from effect import effect_dog
        except ImportError:
            effect_dog = None

        if len(inputs) != 1:
            raise ValueError(f"Dog effect expects 1 input, got {len(inputs)}")

        if effect_dog is not None:
            return effect_dog(inputs[0], output_path, config)

        # Same call shape as the module-level dog effect wrapper in this file:
        # (list of inputs, output path, config, None).
        from effect import process
        return process([inputs[0]], output_path, config, None)
@register_executor("effect:identity")
class IdentityExecutor(Executor):
    """Passthrough executor registered for ``effect:identity`` nodes."""

    def execute(self, config: Dict, inputs: List[Path], output_path: Path) -> Path:
        """Apply ``effect_identity`` to the single input.

        Args:
            config: Effect configuration, forwarded unchanged.
            inputs: Input file paths; must contain exactly one entry.
            output_path: Destination handed to ``effect_identity``.

        Returns:
            The value returned by ``effect_identity``.

        Raises:
            ValueError: If ``inputs`` does not contain exactly one path.
        """
        from artdag.nodes.effect import effect_identity

        input_count = len(inputs)
        if input_count != 1:
            raise ValueError(f"Identity effect expects 1 input, got {input_count}")
        only_input = inputs[0]
        return effect_identity(only_input, output_path, config)
@register_executor(NodeType.SOURCE)
class SourceExecutor(Executor):
    """Executor for SOURCE nodes: resolves a content id to a local file."""

    def execute(self, config: Dict, inputs: List[Path], output_path: Path) -> Path:
        """Return the local path of the content named by ``config["cid"]``.

        The cache manager is consulted first. When the content is not there
        (or the cached path no longer exists) it is fetched from IPFS into
        ``CACHE_DIR / "ipfs_fetch" / cid``. ``inputs`` and ``output_path`` are
        not used: no transformation happens, the located file itself is
        returned for use as input to later nodes.

        Raises:
            ValueError: If the config has no ``cid``, or the content is in
                neither the cache nor retrievable from IPFS.
        """
        cid = config.get("cid")
        if not cid:
            raise ValueError("SOURCE node requires cid in config")

        from cache_manager import get_cache_manager

        local_path = get_cache_manager().get_by_cid(cid)
        if local_path and local_path.exists():
            # Cache hit: hand the cached file straight back.
            return local_path

        # Cache miss: fall back to IPFS.
        import logging
        log = logging.getLogger(__name__)
        log.info(f"SOURCE {cid[:16]}... not in cache, fetching from IPFS")

        import ipfs_client
        fetch_path = CACHE_DIR / "ipfs_fetch" / cid
        fetch_path.parent.mkdir(parents=True, exist_ok=True)
        if not ipfs_client.get_file(cid, str(fetch_path)):
            raise ValueError(f"Source content not in cache and IPFS fetch failed: {cid}")

        log.info(f"SOURCE {cid[:16]}... fetched from IPFS to {fetch_path}")
        return fetch_path
class RenderTask(Task):
    """Celery task base class that prints the outcome of every run."""

    def on_success(self, retval, task_id, args, kwargs):
        """Print the task id together with its return value."""
        message = f"Task {task_id} completed: {retval}"
        print(message)

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        """Print the task id together with the exception that failed it."""
        message = f"Task {task_id} failed: {exc}"
        print(message)
@app.task(base=RenderTask, bind=True)
def render_effect(self, input_hash: str, effect_name: str, output_name: str) -> dict:
    """
    Apply a single named effect to a cached input and describe the result.

    The input is located in the cache by hash and re-hashed to verify it.
    The effect is run, and the output hash is checked against the expected
    value: ``DOG_HASH`` for "dog", the input hash for "identity". A
    provenance record is then built and stored on IPFS.

    Args:
        input_hash: SHA3-256 hash of input asset
        effect_name: Name of effect ("dog" or "identity")
        output_name: Name for output asset (used as the output file stem)

    Returns:
        Provenance record with output hash; it also carries
        ``provenance_cid`` when the IPFS upload returned one.

    Raises:
        ValueError: If the input is not cached, the input or output hash
            does not match, or ``effect_name`` is not supported.
    """
    from cache_manager import get_cache_manager

    # Content hashes for the effects/infrastructure entries (metadata only).
    REGISTRY = {
        "effect:dog": {
            "hash": "d048fe313433eb4e38f0e24194ffae91b896ca3e6eed3e50b2cc37b7be495555"
        },
        "effect:identity": {
            "hash": "640ea11ee881ebf4101af0a955439105ab11e763682b209e88ea08fc66e1cc03"
        },
        "infra:artdag": {
            "hash": "96a5972de216aee12ec794dcad5f9360da2e676171eabf24a46dfe1ee5fee4b0"
        },
        "infra:giles-hp": {
            "hash": "964bf6e69dc4e2493f42375013caffe26404ec3cf8eb5d9bc170cd42a361523b"
        }
    }

    # Locate the input through the cache manager.
    source_file = get_cache_manager().get_by_cid(input_hash)
    if not (source_file and source_file.exists()):
        raise ValueError(f"Input not in cache: {input_hash}")

    # Make sure the cached bytes really are what the caller asked for.
    observed_hash = file_hash(source_file)
    if observed_hash != input_hash:
        raise ValueError(f"Input hash mismatch: expected {input_hash}, got {observed_hash}")

    self.update_state(state='RENDERING', meta={'effect': effect_name, 'input': input_hash[:16]})

    # Run the requested effect; outputs are written directly into CACHE_DIR.
    if effect_name == "dog":
        # NOTE(review): elsewhere this file says the effect module's new
        # format uses process() - confirm effect_dog/DOG_HASH still exist.
        from effect import effect_dog, DOG_HASH
        rendered = effect_dog(source_file, CACHE_DIR / f"{output_name}.mkv", {})
        expected_hash = DOG_HASH
    elif effect_name == "identity":
        from artdag.nodes.effect import effect_identity
        target = CACHE_DIR / f"{output_name}{source_file.suffix}"
        rendered = effect_identity(source_file, target, {})
        expected_hash = input_hash
    else:
        raise ValueError(f"Unknown effect: {effect_name}")

    # The output must hash to the value expected for this effect.
    output_cid = file_hash(rendered)
    if output_cid != expected_hash:
        raise ValueError(f"Output hash mismatch: expected {expected_hash}, got {output_cid}")

    # Describe where the effect's code came from.
    effect_key = f"effect:{effect_name}"
    if effect_name == "identity":
        # Identity ships with the artdag package on GitHub.
        repo = "github"
        commit = get_artdag_commit()
        repo_url = f"https://github.com/gilesbradshaw/art-dag/blob/{commit}/artdag/nodes/effect.py"
    else:
        # Every other effect comes from the rose-ash effects repo.
        repo = "rose-ash"
        commit = get_effects_commit()
        repo_url = f"https://git.rose-ash.com/art-dag/effects/src/commit/{commit}/{effect_name}"
    # Key order is kept stable because this record is serialized to IPFS.
    effect_info = {
        "name": effect_key,
        "cid": REGISTRY[effect_key]["hash"],
        "repo": repo,
        "repo_commit": commit,
        "repo_url": repo_url,
    }

    provenance = {
        "task_id": self.request.id,
        "rendered_at": datetime.now(timezone.utc).isoformat(),
        "rendered_by": "@giles@artdag.rose-ash.com",
        "output": {
            "name": output_name,
            "cid": output_cid,
        },
        "inputs": [
            {"cid": input_hash}
        ],
        "effects": [effect_info],
        "infrastructure": {
            "software": {"name": "infra:artdag", "cid": REGISTRY["infra:artdag"]["hash"]},
            "hardware": {"name": "infra:giles-hp", "cid": REGISTRY["infra:giles-hp"]["hash"]}
        }
    }

    # Upload the record; the CID is attached only after the upload so the
    # stored document does not contain its own address.
    import ipfs_client
    provenance_cid = ipfs_client.add_json(provenance)
    if not provenance_cid:
        logger.warning("Failed to store provenance on IPFS")
    else:
        provenance["provenance_cid"] = provenance_cid
        logger.info(f"Stored provenance on IPFS: {provenance_cid}")

    return provenance
@app.task
def render_dog_from_cat() -> dict:
    """Convenience task: render cat through dog effect.

    Dispatches ``render_effect`` for the hard-coded cat asset and blocks
    until it finishes.

    Returns:
        The provenance record returned by ``render_effect``.
    """
    CAT_HASH = "33268b6e167deaf018cc538de12dbe562612b33e89a749391cef855b320a269b"
    pending = render_effect.delay(CAT_HASH, "dog", "dog-from-cat-celery")
    # Celery refuses a plain .get() from inside a task (RuntimeError), so
    # the guard has to be disabled explicitly for this wrapper to work.
    # NOTE(review): waiting on a subtask can deadlock when no other worker
    # slot is free to run render_effect - keep this to manual/debug use.
    return pending.get(disable_sync_subtasks=False)
def _dag_is_source_node(node) -> bool:
    """Return True when *node* is a SOURCE node (enum or string typed)."""
    return node.node_type == NodeType.SOURCE or str(node.node_type) == "SOURCE"


def _dag_source_cids(dag) -> List[str]:
    """Return the ``cid`` of every SOURCE node in *dag* that has one, in node order."""
    return [
        node.config.get("cid")
        for node in dag.nodes.values()
        if _dag_is_source_node(node) and node.config.get("cid")
    ]


@app.task(base=RenderTask, bind=True)
def execute_dag(self, dag_json: str, run_id: str = None) -> dict:
    """
    Execute a multi-step DAG.

    Steps: parse and validate the DAG, run it through ``Engine``, push every
    non-SOURCE node output into the cache manager, persist the output/run to
    the database, record an activity, and store a provenance record on IPFS.

    Args:
        dag_json: Serialized DAG as JSON string
        run_id: Optional run ID for tracking

    Returns:
        Execution result with output hash and node results

    Raises:
        ValueError: If the JSON cannot be parsed into a DAG or the DAG does
            not validate.
        RuntimeError: If the engine reports an unsuccessful execution.

    NOTE(review): the original indentation of this function was not
    available. Database persistence is treated as part of the "output file
    exists" branch, which is the only branch where ``output_ipfs_cid`` was
    ever bound - confirm against the intended behaviour.
    """
    from cache_manager import get_cache_manager

    # Parse DAG
    try:
        dag = DAG.from_json(dag_json)
    except Exception as e:
        raise ValueError(f"Invalid DAG JSON: {e}") from e

    # Validate DAG
    errors = dag.validate()
    if errors:
        raise ValueError(f"Invalid DAG: {errors}")

    # Create engine with cache directory
    engine = Engine(CACHE_DIR / "nodes")

    def progress_callback(progress):
        """Mirror engine progress into the Celery task state and the log."""
        self.update_state(
            state='EXECUTING',
            meta={
                'node_id': progress.node_id,
                'node_type': progress.node_type,
                'status': progress.status,
                'progress': progress.progress,
                'message': progress.message,
            }
        )
        logger.info(f"DAG progress: {progress.node_id} - {progress.status} - {progress.message}")

    engine.set_progress_callback(progress_callback)

    # Execute DAG
    self.update_state(state='EXECUTING', meta={'status': 'starting', 'nodes': len(dag.nodes)})
    result = engine.execute(dag)
    if not result.success:
        raise RuntimeError(f"DAG execution failed: {result.error}")

    cache_manager = get_cache_manager()
    output_cid = None
    # Bound up front so later readers never hit an unbound name when the
    # output file is missing.
    output_ipfs_cid = None
    node_hashes = {}  # node_id -> cid mapping
    node_ipfs_cids = {}  # node_id -> ipfs_cid mapping
    # cids of all SOURCE nodes; reused for the DB row, the activity and provenance.
    source_cids = _dag_source_cids(dag)

    # Index all node outputs (intermediates + output) by cid
    for node_id, node_path in result.node_results.items():
        if not (node_path and Path(node_path).exists()):
            continue
        node = dag.nodes.get(node_id)

        # Skip SOURCE nodes - they're already in cache
        if node and _dag_is_source_node(node):
            cid = node.config.get("cid")
            if cid:
                node_hashes[node_id] = cid
            continue

        # Determine node type for cache metadata
        node_type_str = str(node.node_type) if node else "intermediate"
        if "effect" in node_type_str.lower():
            cache_node_type = "effect_output"
        else:
            cache_node_type = "dag_intermediate"

        # put() returns (CachedFile, cid) where cid is IPFS CID if available, else local hash
        cached, content_cid = cache_manager.put(
            Path(node_path),
            node_type=cache_node_type,
            node_id=node_id,
        )
        # content_cid is the primary identifier (IPFS CID or local hash)
        node_hashes[node_id] = content_cid

        # Track IPFS CIDs separately (they start with Qm or bafy)
        if content_cid and content_cid.startswith(("Qm", "bafy")):
            node_ipfs_cids[node_id] = content_cid
            logger.info(f"Cached node {node_id}: IPFS CID {content_cid}")
        else:
            logger.info(f"Cached node {node_id}: local hash {content_cid[:16] if content_cid else 'none'}...")

    if result.output_path and result.output_path.exists():
        local_hash = file_hash(result.output_path)
        output_ipfs_cid = node_ipfs_cids.get(dag.output_id)
        # Use the identifier stored by cache_manager.put() when there is one
        # so it matches the cache index; otherwise fall back to the local hash.
        output_cid = node_hashes.get(dag.output_id, local_hash)

        # Store output in database (for L2 to query IPFS CID)
        import asyncio
        import database

        # Store plan (DAG) to IPFS and local cache; failures are only logged.
        plan_cid = None
        try:
            import ipfs_client
            dag_dict = json.loads(dag_json)
            plan_cid = ipfs_client.add_json(dag_dict)
            if plan_cid:
                logger.info(f"Stored plan to IPFS: {plan_cid}")
                # Also store locally so it can be retrieved without IPFS
                # Store directly in cache_dir (get_by_cid checks cache_dir/cid)
                plan_path = CACHE_DIR / plan_cid
                CACHE_DIR.mkdir(parents=True, exist_ok=True)
                with open(plan_path, "w") as f:
                    json.dump(dag_dict, f, indent=2)
        except Exception as e:
            logger.warning(f"Failed to store plan to IPFS: {e}")

        async def save_to_db():
            """Persist the output cache item and, when run_id is set, the run."""
            if database.pool is None:
                await database.init_db()
            await database.create_cache_item(output_cid, output_ipfs_cid)
            if not run_id:
                return

            # Get actor_id and recipe from pending_runs (saved when run started)
            actor_id = None
            recipe_name = "dag"
            pending = await database.get_pending_run(run_id)
            if pending:
                actor_id = pending.get("actor_id")
                recipe_name = pending.get("recipe") or "dag"

            await database.save_run_cache(
                run_id=run_id,
                output_cid=output_cid,
                recipe=recipe_name,
                inputs=list(source_cids),
                ipfs_cid=output_ipfs_cid,
                actor_id=actor_id,
                plan_cid=plan_cid,
            )

            # Save output as media for the user
            if actor_id:
                await database.save_item_metadata(
                    cid=output_cid,
                    actor_id=actor_id,
                    item_type="media",
                    description=f"Output from recipe: {recipe_name}",
                    source_type="recipe",
                    source_note=f"run_id: {run_id}",
                )

            # Clean up pending run
            if pending:
                await database.complete_pending_run(run_id)

        # Reuse the current event loop when there is one; fall back to
        # asyncio.run() when no loop can be obtained.
        try:
            loop = asyncio.get_event_loop()
            if loop.is_running():
                asyncio.ensure_future(save_to_db())
            else:
                loop.run_until_complete(save_to_db())
        except RuntimeError:
            asyncio.run(save_to_db())

    # Record activity for deletion tracking. Requires an output cid: without
    # one there is nothing to attribute the activity to (and the fallback
    # activity id below slices output_cid).
    if source_cids and output_cid:
        from artdag.activities import Activity

        intermediate_hashes = [
            node_hashes[node_id]
            for node_id, node in dag.nodes.items()
            if not _dag_is_source_node(node)
            and node_id != dag.output_id
            and node_id in node_hashes
        ]
        # datetime/timezone come from the module-level import. Re-importing
        # them here would make them function-local names and break the
        # provenance timestamp below whenever this branch is skipped.
        activity = Activity(
            activity_id=run_id or f"dag-{output_cid[:16]}",
            input_ids=sorted(source_cids),
            output_id=output_cid,
            intermediate_ids=intermediate_hashes,
            created_at=datetime.now(timezone.utc).timestamp(),
            status="completed",
        )
        cache_manager.activity_store.add(activity)

    # Build provenance
    provenance = {
        "task_id": self.request.id,
        "run_id": run_id,
        "rendered_at": datetime.now(timezone.utc).isoformat(),
        "output": {
            "cid": output_cid,
            "ipfs_cid": node_ipfs_cids.get(dag.output_id) if dag.output_id else None,
        },
        "inputs": [{"cid": cid} for cid in source_cids],
        "dag": dag_json,  # Full DAG definition
        "nodes": {
            node_id: {
                "cid": node_hashes.get(node_id),
                "ipfs_cid": node_ipfs_cids.get(node_id),
            }
            for node_id in dag.nodes.keys()
            if node_id in node_hashes
        },
        "execution": {
            "execution_time": result.execution_time,
            "nodes_executed": result.nodes_executed,
            "nodes_cached": result.nodes_cached,
        }
    }

    # Store provenance on IPFS
    import ipfs_client
    provenance_cid = ipfs_client.add_json(provenance)
    if provenance_cid:
        provenance["provenance_cid"] = provenance_cid
        logger.info(f"Stored DAG provenance on IPFS: {provenance_cid}")
    else:
        logger.warning("Failed to store DAG provenance on IPFS")

    # Build result
    return {
        "success": True,
        "run_id": run_id,
        "output_cid": output_cid,
        "output_ipfs_cid": node_ipfs_cids.get(dag.output_id) if dag.output_id else None,
        "output_path": str(result.output_path) if result.output_path else None,
        "execution_time": result.execution_time,
        "nodes_executed": result.nodes_executed,
        "nodes_cached": result.nodes_cached,
        "node_results": {
            node_id: str(path) for node_id, path in result.node_results.items()
        },
        "node_hashes": node_hashes,  # node_id -> cid
        "node_ipfs_cids": node_ipfs_cids,  # node_id -> ipfs_cid
        "provenance_cid": provenance_cid,
    }
def _recipe_run_analysis(compiled, input_hashes: Dict[str, str], cache_manager) -> dict:
    """Run the analysis nodes of *compiled* whose input is available locally.

    A node counts as an analysis node when its type is ``ANALYZE`` or its
    config has a truthy ``analyze`` entry. Each result is stored twice in the
    returned dict: under the node id and under the input CID. Nodes whose
    input is unknown or not cached, or whose analysis raises, are logged and
    skipped.
    """
    analysis_results = {}
    for node in compiled.nodes:
        node_type = node.get("type", "").upper()
        config = node.get("config", {})
        if node_type != "ANALYZE" and not config.get("analyze"):
            continue

        node_id = node.get("id")
        input_ref = config.get("input") or config.get("source")
        feature = config.get("feature") or config.get("analyze")

        # Resolve input reference to CID
        cid = input_hashes.get(input_ref)
        if not cid:
            logger.warning(f"Analysis node {node_id}: input '{input_ref}' not in input_hashes")
            continue

        # Get input file path
        input_path = cache_manager.get_by_cid(cid)
        if not input_path:
            logger.warning(f"Analysis node {node_id}: content {cid[:16]}... not in cache")
            continue

        try:
            from artdag.analysis import Analyzer

            analysis_dir = CACHE_DIR / "analysis"
            analysis_dir.mkdir(parents=True, exist_ok=True)
            analyzer = Analyzer(cache_dir=analysis_dir)
            features = [feature] if feature else ["beats", "energy"]
            result = analyzer.analyze(
                input_hash=cid,
                features=features,
                input_path=Path(input_path),
            )
            analysis_results[node_id] = result
            analysis_results[cid] = result
            logger.info(f"Analysis {node_id}: feature={feature}")
        except Exception as e:
            # Analysis is best-effort; a failure must not abort the recipe.
            logger.warning(f"Analysis failed for {node_id}: {e}")
    return analysis_results


def _recipe_store_plan(plan_sexp: str) -> Optional[str]:
    """Store the plan S-expression on IPFS and mirror it into CACHE_DIR.

    Returns the CID reported by ``ipfs_client.add_string`` (None when the
    upload failed). Errors are logged, never raised.
    """
    import ipfs_client

    plan_cid = None
    try:
        plan_cid = ipfs_client.add_string(plan_sexp)
        if plan_cid:
            logger.info(f"Stored plan to IPFS: {plan_cid}")
            # Also store locally for fast retrieval
            CACHE_DIR.mkdir(parents=True, exist_ok=True)
            (CACHE_DIR / plan_cid).write_text(plan_sexp)
    except Exception as e:
        logger.warning(f"Failed to store plan to IPFS: {e}")
    return plan_cid


def _recipe_resolve_inputs(step, steps_by_id, cache_id_to_path, input_hashes, cache_manager) -> List[Path]:
    """Resolve the input references of *step* to local file paths.

    A reference that names another plan step is resolved through that step's
    cache_id; when no path is recorded for it yet, a ``cid`` from the input
    step's config (then from this step's config) is looked up in the cache.
    Any other reference is treated as a named recipe input or direct source.
    Unresolvable references are logged and left out of the result.
    """
    input_paths = []
    for input_ref in step.inputs:
        input_step = steps_by_id.get(input_ref)
        input_path = None
        if input_step is not None:
            input_path = cache_id_to_path.get(input_step.cache_id)
            if not input_path:
                # The referenced step may be a source whose content is
                # addressed by its own cid; fall back to this step's cid as
                # the previous implementation did.
                source_cid = input_step.config.get("cid") or step.config.get("cid")
                if source_cid:
                    input_path = cache_manager.get_by_cid(source_cid)
        else:
            # Direct CID reference (source node)
            source_cid = input_hashes.get(input_ref) or step.config.get("cid")
            if source_cid:
                input_path = cache_manager.get_by_cid(source_cid)

        if input_path:
            input_paths.append(Path(input_path))
        else:
            logger.warning(f"Step {step.step_id}: could not resolve input '{input_ref}'")
    return input_paths


@app.task(base=RenderTask, bind=True)
def execute_recipe(self, recipe_sexp: str, input_hashes: Dict[str, str], run_id: str = None) -> dict:
    """
    Execute an S-expression recipe.

    The recipe S-expression unfolds into a plan S-expression with code-addressed
    cache IDs computed before execution. Each plan node gets a deterministic hash
    "bucket" based on the computation definition (Merkle tree), not the results.

    Phases:
    1. Parse: compile_string(recipe_sexp) -> CompiledRecipe
    2. Analyze: Extract and run analysis nodes from recipe
    3. Plan: create_plan(compiled, inputs) -> ExecutionPlanSexp with cache IDs
    4. Store: plan.to_string() -> store as S-expression
    5. Execute: Run steps level-by-level, checking cache by cache_id
    6. Store results and return, including plan_sexp in the result

    Args:
        recipe_sexp: Recipe as S-expression string
        input_hashes: Mapping from input name to content hash (CID)
        run_id: Optional run ID for tracking

    Returns:
        Execution result with output CID, plan S-expression, and node results.
        When a step fails, a dict with ``"success": False`` and an ``error``
        message is returned instead of raising.

    Raises:
        ImportError: If the artdag S-expression modules cannot be imported.
        ValueError: If the recipe fails to parse or compile.
    """
    from cache_manager import get_cache_manager
    import ipfs_client

    # Try to import S-expression modules
    try:
        from artdag.sexp import compile_string, CompileError, ParseError
        from artdag.sexp.planner import create_plan, ExecutionPlanSexp, PlanStep
    except ImportError as e:
        raise ImportError(f"S-expression modules not available: {e}") from e

    cache_manager = get_cache_manager()
    logger.info(f"Executing recipe with {len(input_hashes)} inputs, run_id={run_id}")

    # ============ Phase 1: Parse ============
    self.update_state(state='PARSING', meta={'status': 'parsing recipe'})
    logger.info("Phase 1: Parsing recipe S-expression...")
    try:
        compiled = compile_string(recipe_sexp)
    except (ParseError, CompileError) as e:
        raise ValueError(f"Recipe parse error: {e}") from e
    recipe_name = compiled.name or "unnamed"
    logger.info(f"Parsed recipe: {recipe_name}")

    # ============ Phase 2: Analysis ============
    self.update_state(state='ANALYZING', meta={'status': 'running analysis'})
    logger.info("Phase 2: Running analysis nodes...")
    # NOTE(review): the results are only counted here; they are not passed
    # on to create_plan() - confirm whether the planner should receive them.
    analysis_results = _recipe_run_analysis(compiled, input_hashes, cache_manager)
    logger.info(f"Completed {len(analysis_results)} analysis results")

    # ============ Phase 3: Generate Plan ============
    self.update_state(state='PLANNING', meta={'status': 'generating plan'})
    logger.info("Phase 3: Generating execution plan with code-addressed cache IDs...")
    plan = create_plan(compiled, inputs=input_hashes)
    logger.info(f"Generated plan with {len(plan.steps)} steps, plan_id={plan.plan_id[:16]}...")

    # ============ Phase 4: Store Plan as S-expression ============
    plan_sexp = plan.to_string(pretty=True)
    plan_cid = _recipe_store_plan(plan_sexp)

    # ============ Phase 5: Execute Steps Level-by-Level ============
    self.update_state(state='EXECUTING', meta={'status': 'executing steps', 'total_steps': len(plan.steps)})
    logger.info("Phase 5: Executing plan steps...")

    # Index steps once: by level for scheduling, by id for input lookups
    # (first occurrence wins if an id were ever repeated).
    steps_by_level: Dict[int, List[PlanStep]] = {}
    steps_by_id = {}
    for step in plan.steps:
        steps_by_level.setdefault(step.level, []).append(step)
        steps_by_id.setdefault(step.step_id, step)
    max_level = max(steps_by_level.keys()) if steps_by_level else 0

    step_results = {}  # step_id -> {"status", "path", "cid", "ipfs_cid"}
    cache_id_to_path = {}  # cache_id -> output path (for resolving inputs)
    total_cached = 0
    total_executed = 0

    # Map inputs to their paths (inputs are their own cache_ids)
    for name, cid in input_hashes.items():
        cache_id_to_path[cid] = cache_manager.get_by_cid(cid)

    for level in range(max_level + 1):
        level_steps = steps_by_level.get(level, [])
        if not level_steps:
            continue
        logger.info(f"Executing level {level}: {len(level_steps)} steps")

        for step in level_steps:
            self.update_state(
                state='EXECUTING',
                meta={
                    'step_id': step.step_id,
                    'step_type': step.node_type,
                    'level': level,
                    'cache_id': step.cache_id[:16],
                }
            )

            # Check if cached using code-addressed cache_id
            cached_path = cache_manager.get_by_cid(step.cache_id)
            if cached_path and cached_path.exists():
                logger.info(f"Step {step.step_id}: cached at {step.cache_id[:16]}...")
                step_results[step.step_id] = {
                    "status": "cached",
                    "path": str(cached_path),
                    "cache_id": step.cache_id,
                }
                cache_id_to_path[step.cache_id] = cached_path
                total_cached += 1
                continue

            # Execute the step
            try:
                # SOURCE nodes with a cid are served straight from the cache.
                if step.node_type == "SOURCE":
                    source_cid = step.config.get("cid")
                    if source_cid:
                        source_path = cache_manager.get_by_cid(source_cid)
                        if not source_path:
                            raise ValueError(f"Source content not found: {source_cid}")
                        step_results[step.step_id] = {
                            "status": "source",
                            "path": str(source_path),
                            "cache_id": step.cache_id,
                            "cid": source_cid,
                        }
                        cache_id_to_path[step.cache_id] = source_path
                        total_cached += 1
                        continue

                # Resolve input paths from previous step cache_ids
                input_paths = _recipe_resolve_inputs(
                    step, steps_by_id, cache_id_to_path, input_hashes, cache_manager
                )

                # Get executor for this step type, falling back to the
                # effect-specific executor named in the step config.
                executor = get_executor(step.node_type)
                if not executor:
                    effect_name = step.config.get("effect")
                    if effect_name:
                        executor = get_executor(f"effect:{effect_name}")
                if not executor:
                    raise ValueError(f"No executor for node type: {step.node_type}")

                # Each step writes into its own directory named by cache_id
                output_dir = CACHE_DIR / "nodes" / step.cache_id
                output_dir.mkdir(parents=True, exist_ok=True)
                output_path = output_dir / "output.mkv"

                logger.info(f"Executing step {step.step_id} ({step.node_type}) with {len(input_paths)} inputs")
                result_path = executor.execute(step.config, input_paths, output_path)

                # Store result in cache under code-addressed cache_id
                cached, content_cid = cache_manager.put(
                    result_path,
                    node_type=step.node_type,
                    node_id=step.cache_id,  # Use cache_id as node_id
                )
                # put() may not yield an identifier; never let that turn a
                # successfully executed step into a reported failure.
                is_ipfs_cid = bool(content_cid) and content_cid.startswith(("Qm", "bafy"))
                step_results[step.step_id] = {
                    "status": "executed",
                    "path": str(result_path),
                    "cache_id": step.cache_id,
                    "cid": content_cid,
                    "ipfs_cid": content_cid if is_ipfs_cid else None,
                }
                cache_id_to_path[step.cache_id] = result_path
                total_executed += 1
                logger.info(f"Step {step.step_id}: executed -> {content_cid[:16] if content_cid else 'none'}...")
            except Exception as e:
                # A failing step ends the run with a failure result rather
                # than an exception.
                logger.error(f"Step {step.step_id} failed: {e}")
                return {
                    "success": False,
                    "run_id": run_id,
                    "error": f"Step {step.step_id} failed: {e}",
                    "step_results": step_results,
                    "plan_cid": plan_cid,
                    "plan_sexp": plan_sexp,
                }

    # Get output from final step
    output_step = steps_by_id.get(plan.output_step_id)
    output_cid = None
    output_ipfs_cid = None
    output_path = None
    if output_step:
        output_result = step_results.get(output_step.step_id, {})
        # Cached steps carry no content cid, so fall back to the cache_id.
        output_cid = output_result.get("cid") or output_result.get("cache_id")
        output_ipfs_cid = output_result.get("ipfs_cid")
        output_path = output_result.get("path")

    # ============ Phase 6: Store Results ============
    logger.info("Phase 6: Storing results...")

    # Store in database
    import asyncio
    import database

    async def save_to_db():
        """Persist the run, the output's media metadata and pending-run cleanup."""
        if database.pool is None:
            await database.init_db()

        # Get actor_id from pending run
        actor_id = None
        pending = await database.get_pending_run(run_id) if run_id else None
        if pending:
            actor_id = pending.get("actor_id")

        await database.save_run_cache(
            run_id=run_id,
            output_cid=output_cid,
            recipe=recipe_name,
            inputs=list(input_hashes.values()),
            ipfs_cid=output_ipfs_cid,
            actor_id=actor_id,
            plan_cid=plan_cid,
        )

        # Save output as media for user
        if actor_id and output_cid:
            await database.save_item_metadata(
                cid=output_cid,
                actor_id=actor_id,
                item_type="media",
                description=f"Output from recipe: {recipe_name}",
                source_type="recipe",
                source_note=f"run_id: {run_id}",
            )

        # Complete pending run
        if pending and run_id:
            await database.complete_pending_run(run_id)

    # Reuse the current event loop when there is one; fall back to
    # asyncio.run() when no loop can be obtained.
    try:
        loop = asyncio.get_event_loop()
        if loop.is_running():
            asyncio.ensure_future(save_to_db())
        else:
            loop.run_until_complete(save_to_db())
    except RuntimeError:
        asyncio.run(save_to_db())

    # Build and store provenance
    provenance = {
        "task_id": self.request.id,
        "run_id": run_id,
        "rendered_at": datetime.now(timezone.utc).isoformat(),
        "recipe": recipe_name,
        "recipe_sexp": recipe_sexp,
        "plan_sexp": plan_sexp,
        "plan_cid": plan_cid,
        "output": {
            "cid": output_cid,
            "ipfs_cid": output_ipfs_cid,
        },
        "inputs": input_hashes,
        "steps": {
            step_id: {
                "cache_id": result.get("cache_id"),
                "cid": result.get("cid"),
                "status": result.get("status"),
            }
            for step_id, result in step_results.items()
        },
        "execution": {
            "total_steps": len(plan.steps),
            "cached": total_cached,
            "executed": total_executed,
        }
    }
    provenance_cid = ipfs_client.add_json(provenance)
    if provenance_cid:
        logger.info(f"Stored provenance on IPFS: {provenance_cid}")
    else:
        logger.warning("Failed to store provenance on IPFS")

    logger.info(f"Recipe execution complete: output={output_cid[:16] if output_cid else 'none'}...")

    return {
        "success": True,
        "run_id": run_id,
        "recipe": recipe_name,
        "plan_cid": plan_cid,
        "plan_sexp": plan_sexp,
        "output_cid": output_cid,
        "output_ipfs_cid": output_ipfs_cid,
        "output_path": output_path,
        "total_steps": len(plan.steps),
        "cached": total_cached,
        "executed": total_executed,
        "step_results": step_results,
        "provenance_cid": provenance_cid,
    }
def build_effect_dag(input_hashes: List[str], effect_name: str) -> DAG:
    """
    Assemble a two-layer DAG: one SOURCE node per input feeding one effect node.

    Args:
        input_hashes: List of input content hashes; each becomes the ``cid``
            of a SOURCE node named ``source_<index>``.
        effect_name: Name of effect to apply (e.g., "dog", "identity"); the
            effect node's type is ``effect:<effect_name>``.

    Returns:
        DAG whose output is the effect node.
    """
    graph = DAG()

    # One SOURCE node per input, remembered in order as the effect's inputs.
    upstream_ids = []
    for index, content_hash in enumerate(input_hashes):
        source = Node(
            node_type=NodeType.SOURCE,
            config={"cid": content_hash},
            name=f"source_{index}",
        )
        graph.add_node(source)
        upstream_ids.append(source.node_id)

    # The single effect node consumes every source and is the DAG output.
    effect = Node(
        node_type=f"effect:{effect_name}",
        config={},
        inputs=upstream_ids,
        name=f"effect_{effect_name}",
    )
    graph.add_node(effect)
    graph.set_output(effect.node_id)
    return graph