When the Celery worker can't find source content in the local cache, fetch it from IPFS using the CID. This ensures workers can execute DAGs even when they don't share the same filesystem as the web server. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
580 lines
20 KiB
Python
580 lines
20 KiB
Python
"""
|
|
Art DAG Celery Tasks
|
|
|
|
Distributed rendering tasks for the Art DAG system.
|
|
Supports both single-effect runs and multi-step DAG execution.
|
|
"""
|
|
|
|
import hashlib
|
|
import json
|
|
import logging
|
|
import os
|
|
import subprocess
|
|
import sys
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
from typing import Dict, List, Optional
|
|
|
|
from celery import Task
|
|
from celery_app import app
|
|
|
|
# Import artdag components
|
|
from artdag import DAG, Node, NodeType
|
|
from artdag.engine import Engine
|
|
from artdag.executor import register_executor, Executor, get_executor
|
|
from artdag.nodes.effect import register_effect
|
|
import artdag.nodes # Register all built-in executors (SOURCE, EFFECT, etc.)
|
|
|
|
# Where the effects checkout and the artdag source tree live. In Docker these
# come from the EFFECTS_PATH / ARTDAG_PATH environment variables; local runs
# fall back to directories under the user's home.
EFFECTS_PATH = Path(os.getenv("EFFECTS_PATH", str(Path.home() / "artdag-effects")))
ARTDAG_PATH = Path(os.getenv("ARTDAG_PATH", str(Path.home() / "art" / "artdag")))

logger = logging.getLogger(__name__)
|
|
|
|
|
|
def get_effects_commit() -> str:
|
|
"""Get current git commit hash of effects repo."""
|
|
try:
|
|
result = subprocess.run(
|
|
["git", "rev-parse", "HEAD"],
|
|
cwd=EFFECTS_PATH,
|
|
capture_output=True,
|
|
text=True
|
|
)
|
|
if result.returncode == 0:
|
|
return result.stdout.strip()
|
|
except Exception:
|
|
pass
|
|
return "unknown"
|
|
|
|
|
|
def get_artdag_commit() -> str:
|
|
"""Get current git commit hash of artdag repo."""
|
|
try:
|
|
result = subprocess.run(
|
|
["git", "rev-parse", "HEAD"],
|
|
cwd=ARTDAG_PATH,
|
|
capture_output=True,
|
|
text=True
|
|
)
|
|
if result.returncode == 0:
|
|
return result.stdout.strip()
|
|
except Exception:
|
|
pass
|
|
return "unknown"
|
|
|
|
|
|
# Put the dog effect's directory at the front of the import path so that its
# module (named ``effect``) can be imported, then pull in ``process()`` — the
# whole-video entry point used by the EFFECT registration below.
sys.path.insert(0, os.fspath(EFFECTS_PATH / "dog"))

from effect import process as dog_process
|
|
|
|
@register_effect("dog")
def _dog_effect(input_path: Path, output_path: Path, config: dict) -> Path:
    """Adapt the dog effect's ``process()`` to the single-input EFFECT-node signature.

    ``process`` is called with a list of input paths (whole-video API), so the
    one input is wrapped in a list. The trailing ``None`` is forwarded as in the
    original call; its meaning isn't visible here — confirm against
    ``effect.process``.
    """
    sources = [input_path]
    return dog_process(sources, output_path, config, None)
|
|
|
|
|
|
def file_hash(path: Path) -> str:
    """Return the hex SHA3-256 digest of the file at *path*.

    A symlink is resolved first so its target's bytes are hashed; the file is
    streamed in 64 KiB chunks so large media never has to fit in memory.
    """
    target = path.resolve() if path.is_symlink() else path
    digest = hashlib.sha3_256()
    with target.open("rb") as stream:
        while True:
            chunk = stream.read(65536)
            if not chunk:
                break
            digest.update(chunk)
    return digest.hexdigest()
|
|
|
|
|
|
# Local content cache (override with the CACHE_DIR environment variable).
# It may be shared with the web server, but that isn't required: SOURCE content
# missing from here is fetched from IPFS instead.
CACHE_DIR = Path(os.getenv("CACHE_DIR", str(Path.home() / ".artdag" / "cache")))
|
|
|
|
|
|
# ============ Executors for Effects ============
|
|
|
|
@register_executor("effect:dog")
class DogExecutor(Executor):
    """Run the dog effect (via ``effect.effect_dog``) on exactly one input file.

    NOTE(review): ``_dog_effect`` in this module calls ``effect.process`` (the
    whole-video API) rather than ``effect_dog``; confirm ``effect.effect_dog``
    is still exported, or route this executor through ``process`` as well.
    """

    def execute(self, config: Dict, inputs: List[Path], output_path: Path) -> Path:
        """Apply the dog effect to the single path in *inputs*, writing *output_path*.

        Raises:
            ValueError: if *inputs* does not hold exactly one path.
        """
        from effect import effect_dog

        count = len(inputs)
        if count == 1:
            return effect_dog(inputs[0], output_path, config)
        raise ValueError(f"Dog effect expects 1 input, got {count}")
|
|
|
|
|
|
@register_executor("effect:identity")
class IdentityExecutor(Executor):
    """Pass-through executor: delegates a single input to artdag's ``effect_identity``."""

    def execute(self, config: Dict, inputs: List[Path], output_path: Path) -> Path:
        """Run the identity effect on the one path in *inputs*, producing *output_path*.

        Raises:
            ValueError: if *inputs* does not hold exactly one path.
        """
        from artdag.nodes.effect import effect_identity

        if len(inputs) == 1:
            (only_input,) = inputs
            return effect_identity(only_input, output_path, config)
        raise ValueError(f"Identity effect expects 1 input, got {len(inputs)}")
|
|
|
|
|
|
@register_executor(NodeType.SOURCE)
class SourceExecutor(Executor):
    """Executor for SOURCE nodes: resolve ``config["cid"]`` to a local file.

    The local cache is consulted first. A worker that doesn't share a
    filesystem with the web server may not have the content, so on a miss it
    is fetched from IPFS into ``CACHE_DIR / "ipfs_fetch" / <cid>``.
    """

    def execute(self, config: Dict, inputs: List[Path], output_path: Path) -> Path:
        """Return the path of the source content; nothing is transformed.

        ``inputs`` and ``output_path`` are unused — the returned path is what
        the engine uses as input to subsequent nodes.

        Raises:
            ValueError: if ``cid`` is missing, is not usable as a single file
                name, or the content is neither cached nor fetchable from IPFS.
        """
        cid = config.get("cid")
        if not cid:
            raise ValueError("SOURCE node requires cid in config")

        from cache_manager import get_cache_manager

        source_path = get_cache_manager().get_by_cid(cid)
        if source_path and source_path.exists():
            return source_path

        # Not in cache - fetch from IPFS.
        # The cid comes from the submitted DAG and is used as a file name under
        # CACHE_DIR below; refuse anything that isn't a single path component
        # (e.g. "../x" or "a/b") so a crafted DAG can't write outside the cache.
        cid_name = str(cid)
        if cid_name in (".", "..") or Path(cid_name).name != cid_name:
            raise ValueError(f"SOURCE cid is not a valid content identifier: {cid!r}")

        logger.info(f"SOURCE {cid[:16]}... not in cache, fetching from IPFS")

        import ipfs_client

        fetch_path = CACHE_DIR / "ipfs_fetch" / cid
        fetch_path.parent.mkdir(parents=True, exist_ok=True)

        if not ipfs_client.get_file(cid, str(fetch_path)):
            raise ValueError(f"Source content not in cache and IPFS fetch failed: {cid}")

        logger.info(f"SOURCE {cid[:16]}... fetched from IPFS to {fetch_path}")
        return fetch_path
|
|
|
|
|
|
class RenderTask(Task):
    """Celery base task for render tasks: logs each task's outcome.

    Uses the module logger rather than ``print`` so results land at the right
    level (Celery captures stray stdout as WARNING by default).
    """

    def on_success(self, retval, task_id, args, kwargs):
        """Record successful render."""
        logger.info("Task %s completed: %s", task_id, retval)

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        """Record failed render."""
        logger.error("Task %s failed: %s", task_id, exc)
|
|
|
|
|
|
# Registry hashes recorded in render provenance (effects/infra metadata only;
# nothing is looked up or verified against them).
_RENDER_REGISTRY = {
    "effect:dog": {
        "hash": "d048fe313433eb4e38f0e24194ffae91b896ca3e6eed3e50b2cc37b7be495555"
    },
    "effect:identity": {
        "hash": "640ea11ee881ebf4101af0a955439105ab11e763682b209e88ea08fc66e1cc03"
    },
    "infra:artdag": {
        "hash": "96a5972de216aee12ec794dcad5f9360da2e676171eabf24a46dfe1ee5fee4b0"
    },
    "infra:giles-hp": {
        "hash": "964bf6e69dc4e2493f42375013caffe26404ec3cf8eb5d9bc170cd42a361523b"
    },
}

# Effects render_effect knows how to apply.
_RENDER_EFFECTS = ("dog", "identity")


def _render_effect_info(effect_name: str) -> dict:
    """Build the provenance entry for *effect_name*, pinned to the current repo commit.

    ``identity`` ships with the artdag package (GitHub); other effects live in
    the rose-ash effects repo.
    """
    cid = _RENDER_REGISTRY[f"effect:{effect_name}"]["hash"]
    if effect_name == "identity":
        artdag_commit = get_artdag_commit()
        return {
            "name": f"effect:{effect_name}",
            "cid": cid,
            "repo": "github",
            "repo_commit": artdag_commit,
            "repo_url": f"https://github.com/gilesbradshaw/art-dag/blob/{artdag_commit}/artdag/nodes/effect.py",
        }
    effects_commit = get_effects_commit()
    return {
        "name": f"effect:{effect_name}",
        "cid": cid,
        "repo": "rose-ash",
        "repo_commit": effects_commit,
        "repo_url": f"https://git.rose-ash.com/art-dag/effects/src/commit/{effects_commit}/{effect_name}",
    }


@app.task(base=RenderTask, bind=True)
def render_effect(self, input_hash: str, effect_name: str, output_name: str) -> dict:
    """
    Render an effect on an input asset.

    Args:
        input_hash: SHA3-256 hash of input asset
        effect_name: Name of effect ("dog" or "identity")
        output_name: Name for output asset. The file is written directly into
            CACHE_DIR, so this must be a plain name without path separators.

    Returns:
        Provenance record with output hash (plus ``provenance_cid`` when the
        record was stored on IPFS)

    Raises:
        ValueError: unknown effect, unsafe output name, input missing from the
            cache, or an input/output hash mismatch.
    """
    from cache_manager import get_cache_manager

    # Validate arguments before any I/O: verifying the input hashes the whole file.
    if effect_name not in _RENDER_EFFECTS:
        raise ValueError(f"Unknown effect: {effect_name}")
    if "/" in output_name or "\\" in output_name:
        # output_name is joined onto CACHE_DIR; a separator could place the
        # output outside the cache directory (e.g. "../x").
        raise ValueError(f"Invalid output name (must not contain path separators): {output_name!r}")

    # Input comes from the cache by hash.
    cache_manager = get_cache_manager()
    input_path = cache_manager.get_by_cid(input_hash)
    if not input_path or not input_path.exists():
        raise ValueError(f"Input not in cache: {input_hash}")

    # Verify input
    actual_hash = file_hash(input_path)
    if actual_hash != input_hash:
        raise ValueError(f"Input hash mismatch: expected {input_hash}, got {actual_hash}")

    self.update_state(state='RENDERING', meta={'effect': effect_name, 'input': input_hash[:16]})

    # Load and apply effect
    if effect_name == "dog":
        # NOTE(review): _dog_effect in this module calls effect.process(); confirm
        # effect_dog / DOG_HASH are still exported by the effect module.
        from effect import effect_dog, DOG_HASH
        output_path = CACHE_DIR / f"{output_name}.mkv"
        result = effect_dog(input_path, output_path, {})
        # NOTE(review): DOG_HASH is a fixed value, so this check presumably
        # assumes the dog output doesn't depend on the input — confirm.
        expected_hash = DOG_HASH
    else:  # "identity", the only other entry in _RENDER_EFFECTS
        from artdag.nodes.effect import effect_identity
        output_path = CACHE_DIR / f"{output_name}{input_path.suffix}"
        result = effect_identity(input_path, output_path, {})
        # Identity must reproduce the input byte-for-byte.
        expected_hash = input_hash

    # Verify output
    output_cid = file_hash(result)
    if output_cid != expected_hash:
        raise ValueError(f"Output hash mismatch: expected {expected_hash}, got {output_cid}")

    provenance = {
        "task_id": self.request.id,
        "rendered_at": datetime.now(timezone.utc).isoformat(),
        "rendered_by": "@giles@artdag.rose-ash.com",
        "output": {
            "name": output_name,
            "cid": output_cid,
        },
        "inputs": [
            {"cid": input_hash}
        ],
        "effects": [_render_effect_info(effect_name)],
        "infrastructure": {
            "software": {"name": "infra:artdag", "cid": _RENDER_REGISTRY["infra:artdag"]["hash"]},
            "hardware": {"name": "infra:giles-hp", "cid": _RENDER_REGISTRY["infra:giles-hp"]["hash"]},
        },
    }

    # Store provenance on IPFS (best-effort: the render already succeeded).
    import ipfs_client
    provenance_cid = ipfs_client.add_json(provenance)
    if provenance_cid:
        provenance["provenance_cid"] = provenance_cid
        logger.info(f"Stored provenance on IPFS: {provenance_cid}")
    else:
        logger.warning("Failed to store provenance on IPFS")

    return provenance
|
|
|
|
|
|
@app.task
def render_dog_from_cat() -> dict:
    """Convenience task: render the reference cat asset through the dog effect.

    Blocks until the ``render_effect`` subtask finishes and returns its
    provenance dict.
    """
    CAT_HASH = "33268b6e167deaf018cc538de12dbe562612b33e89a749391cef855b320a269b"
    # Inside a worker, Celery refuses AsyncResult.get() by default
    # ("Never call result.get() within a task!") because waiting on a subtask
    # can deadlock the pool. Opt in explicitly; this can still hang if no other
    # worker slot is free to run render_effect.
    return render_effect.delay(CAT_HASH, "dog", "dog-from-cat-celery").get(
        disable_sync_subtasks=False
    )
|
|
|
|
|
|
def _is_source_node(node) -> bool:
    """True for SOURCE nodes, whether node_type is the enum member or its string form."""
    return node.node_type == NodeType.SOURCE or str(node.node_type) == "SOURCE"


def _source_cids(dag: DAG) -> List[str]:
    """CIDs of every SOURCE node that has one, in ``dag.nodes`` iteration order."""
    return [
        node.config.get("cid")
        for node in dag.nodes.values()
        if _is_source_node(node) and node.config.get("cid")
    ]


def _index_node_outputs(dag: DAG, result, cache_manager):
    """Register each produced node output with the cache manager.

    Returns ``(node_hashes, node_ipfs_cids)``: node_id -> primary content id
    (IPFS CID if ``put()`` returned one, otherwise a local hash), and
    node_id -> IPFS CID for the subset identified by an IPFS CID. SOURCE nodes
    are not re-cached; their configured cid is recorded as-is.
    """
    node_hashes = {}
    node_ipfs_cids = {}

    for node_id, node_path in result.node_results.items():
        if not node_path or not Path(node_path).exists():
            continue
        node = dag.nodes.get(node_id)

        # Skip SOURCE nodes - they're already in cache
        if node and _is_source_node(node):
            cid = node.config.get("cid")
            if cid:
                node_hashes[node_id] = cid
            continue

        node_type_str = str(node.node_type) if node else "intermediate"
        cache_node_type = "effect_output" if "effect" in node_type_str.lower() else "dag_intermediate"

        # put() returns (CachedFile, cid) where cid is the IPFS CID if available, else local hash
        _cached, content_cid = cache_manager.put(
            Path(node_path),
            node_type=cache_node_type,
            node_id=node_id,
        )
        node_hashes[node_id] = content_cid
        # IPFS CIDs are recognised by their Qm / bafy prefixes.
        if content_cid and content_cid.startswith(("Qm", "bafy")):
            node_ipfs_cids[node_id] = content_cid
            logger.info(f"Cached node {node_id}: IPFS CID {content_cid}")
        else:
            logger.info(f"Cached node {node_id}: local hash {content_cid[:16] if content_cid else 'none'}...")

    return node_hashes, node_ipfs_cids


def _store_plan_on_ipfs(dag_json: str) -> Optional[str]:
    """Best-effort: store the DAG definition on IPFS and return its CID (None on failure)."""
    try:
        import ipfs_client

        plan_cid = ipfs_client.add_json(json.loads(dag_json))
    except Exception as e:  # deliberate: a missing plan record must not fail the run
        logger.warning(f"Failed to store plan to IPFS: {e}")
        return None
    if plan_cid:
        logger.info(f"Stored plan to IPFS: {plan_cid}")
    return plan_cid


def _run_sync(make_coro) -> None:
    """Run the coroutine built by *make_coro* from synchronous task code.

    Reuses the thread's current event loop when possible so state created on
    an earlier run keeps the same loop (``database.pool`` is only initialised
    once); falls back to ``asyncio.run`` when there is no usable loop. Only the
    loop lookup is guarded — an error raised by the coroutine itself
    propagates instead of triggering a second execution.
    """
    import asyncio

    try:
        loop = asyncio.get_event_loop()
    except RuntimeError:
        loop = None

    if loop is None or loop.is_closed():
        asyncio.run(make_coro())
    elif loop.is_running():
        # Can't block inside a running loop: schedule it. NOTE: fire-and-forget —
        # failures only surface through asyncio's unretrieved-exception logging.
        asyncio.ensure_future(make_coro())
    else:
        loop.run_until_complete(make_coro())


@app.task(base=RenderTask, bind=True)
def execute_dag(self, dag_json: str, run_id: str = None) -> dict:
    """
    Execute a multi-step DAG.

    Args:
        dag_json: Serialized DAG as JSON string
        run_id: Optional run ID for tracking

    Returns:
        Execution result with output hash and node results

    Raises:
        ValueError: if dag_json can't be parsed or the DAG fails validation.
        RuntimeError: if the engine reports a failed execution.
    """
    from cache_manager import get_cache_manager

    # Parse and validate DAG
    try:
        dag = DAG.from_json(dag_json)
    except Exception as e:
        raise ValueError(f"Invalid DAG JSON: {e}") from e

    errors = dag.validate()
    if errors:
        raise ValueError(f"Invalid DAG: {errors}")

    engine = Engine(CACHE_DIR / "nodes")

    def progress_callback(progress):
        # Mirror engine progress into Celery task state for pollers.
        self.update_state(
            state='EXECUTING',
            meta={
                'node_id': progress.node_id,
                'node_type': progress.node_type,
                'status': progress.status,
                'progress': progress.progress,
                'message': progress.message,
            },
        )
        logger.info(f"DAG progress: {progress.node_id} - {progress.status} - {progress.message}")

    engine.set_progress_callback(progress_callback)

    self.update_state(state='EXECUTING', meta={'status': 'starting', 'nodes': len(dag.nodes)})
    result = engine.execute(dag)
    if not result.success:
        raise RuntimeError(f"DAG execution failed: {result.error}")

    # Index all node outputs by cid and upload to IPFS
    cache_manager = get_cache_manager()
    node_hashes, node_ipfs_cids = _index_node_outputs(dag, result, cache_manager)
    source_cids = _source_cids(dag)

    output_cid = None
    if result.output_path and result.output_path.exists():
        output_ipfs_cid = node_ipfs_cids.get(dag.output_id)
        # Prefer the id cache_manager.put() indexed the output under, so lookups
        # by output_cid hit the cache index. Only hash the file when the output
        # wasn't indexed: hashing reads the whole (possibly large) render.
        output_cid = node_hashes.get(dag.output_id)
        if output_cid is None:
            output_cid = file_hash(result.output_path)

        # Store output in database (for L2 to query IPFS CID)
        import database

        plan_cid = _store_plan_on_ipfs(dag_json)

        async def save_to_db():
            if database.pool is None:
                await database.init_db()
            await database.create_cache_item(output_cid, output_ipfs_cid)
            if not run_id:
                return

            # Get actor_id and recipe from pending_runs (saved when run started)
            actor_id = None
            recipe_name = "dag"
            pending = await database.get_pending_run(run_id)
            if pending:
                actor_id = pending.get("actor_id")
                recipe_name = pending.get("recipe") or "dag"

            await database.save_run_cache(
                run_id=run_id,
                output_cid=output_cid,
                recipe=recipe_name,
                inputs=list(source_cids),
                ipfs_cid=output_ipfs_cid,
                actor_id=actor_id,
                plan_cid=plan_cid,
            )

            # Save output as media for the user
            if actor_id:
                await database.save_item_metadata(
                    cid=output_cid,
                    actor_id=actor_id,
                    item_type="media",
                    description=f"Output from recipe: {recipe_name}",
                    source_type="recipe",
                    source_note=f"run_id: {run_id}",
                )

            # Clean up pending run
            if pending:
                await database.complete_pending_run(run_id)

        _run_sync(save_to_db)

    # Record activity for deletion tracking. Without an output there is nothing
    # to track (and no id to derive a fallback activity_id from).
    if source_cids and output_cid:
        from artdag.activities import Activity

        intermediate_hashes = [
            node_hashes[node_id]
            for node_id, node in dag.nodes.items()
            if not _is_source_node(node) and node_id != dag.output_id and node_id in node_hashes
        ]
        activity = Activity(
            activity_id=run_id or f"dag-{output_cid[:16]}",
            input_ids=sorted(source_cids),
            output_id=output_cid,
            intermediate_ids=intermediate_hashes,
            created_at=datetime.now(timezone.utc).timestamp(),
            status="completed",
        )
        cache_manager.activity_store.add(activity)

    output_ipfs_for_result = node_ipfs_cids.get(dag.output_id) if dag.output_id else None

    # Build provenance
    provenance = {
        "task_id": self.request.id,
        "run_id": run_id,
        "rendered_at": datetime.now(timezone.utc).isoformat(),
        "output": {
            "cid": output_cid,
            "ipfs_cid": output_ipfs_for_result,
        },
        "inputs": [{"cid": cid} for cid in source_cids],
        "dag": dag_json,  # Full DAG definition
        "nodes": {
            node_id: {
                "cid": node_hashes[node_id],
                "ipfs_cid": node_ipfs_cids.get(node_id),
            }
            for node_id in dag.nodes
            if node_id in node_hashes
        },
        "execution": {
            "execution_time": result.execution_time,
            "nodes_executed": result.nodes_executed,
            "nodes_cached": result.nodes_cached,
        },
    }

    # Store provenance on IPFS
    import ipfs_client
    provenance_cid = ipfs_client.add_json(provenance)
    if provenance_cid:
        provenance["provenance_cid"] = provenance_cid
        logger.info(f"Stored DAG provenance on IPFS: {provenance_cid}")
    else:
        logger.warning("Failed to store DAG provenance on IPFS")

    return {
        "success": True,
        "run_id": run_id,
        "output_cid": output_cid,
        "output_ipfs_cid": output_ipfs_for_result,
        "output_path": str(result.output_path) if result.output_path else None,
        "execution_time": result.execution_time,
        "nodes_executed": result.nodes_executed,
        "nodes_cached": result.nodes_cached,
        "node_results": {
            node_id: str(path) for node_id, path in result.node_results.items()
        },
        "node_hashes": node_hashes,  # node_id -> cid
        "node_ipfs_cids": node_ipfs_cids,  # node_id -> ipfs_cid
        "provenance_cid": provenance_cid,
    }
|
|
|
|
|
|
def build_effect_dag(input_hashes: List[str], effect_name: str) -> DAG:
    """
    Assemble a two-level DAG: one SOURCE node per input feeding a single effect node.

    Args:
        input_hashes: Content hashes of the inputs, in order; node ``source_<i>``
            is created for the i-th hash
        effect_name: Effect to apply (e.g. "dog", "identity"); the effect node's
            type is ``effect:<effect_name>``

    Returns:
        DAG whose output is the effect node, ready for execution
    """
    graph = DAG()

    def _add(node: Node):
        # Register the node, then report its id (read after add_node, as before).
        graph.add_node(node)
        return node.node_id

    upstream = [
        _add(Node(node_type=NodeType.SOURCE, config={"cid": cid}, name=f"source_{idx}"))
        for idx, cid in enumerate(input_hashes)
    ]

    effect_id = _add(
        Node(
            node_type=f"effect:{effect_name}",
            config={},
            inputs=upstream,
            name=f"effect_{effect_name}",
        )
    )
    graph.set_output(effect_id)
    return graph
|