- Rename misleading ipfs_cid variable to content_cid - Detect IPFS CIDs by prefix (Qm or bafy) instead of truthy check - Add clearer logging to show whether IPFS or local hash is used Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
558 lines
19 KiB
Python
558 lines
19 KiB
Python
"""
|
|
Art DAG Celery Tasks
|
|
|
|
Distributed rendering tasks for the Art DAG system.
|
|
Supports both single-effect runs and multi-step DAG execution.
|
|
"""
|
|
|
|
import hashlib
|
|
import json
|
|
import logging
|
|
import os
|
|
import subprocess
|
|
import sys
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
from typing import Dict, List, Optional
|
|
|
|
from celery import Task
|
|
from celery_app import app
|
|
|
|
# Import artdag components
|
|
from artdag import DAG, Node, NodeType
|
|
from artdag.engine import Engine
|
|
from artdag.executor import register_executor, Executor, get_executor
|
|
from artdag.nodes.effect import register_effect
|
|
import artdag.nodes # Register all built-in executors (SOURCE, EFFECT, etc.)
|
|
|
|
# Add effects to path (use env var in Docker, fallback to home dir locally)
|
|
EFFECTS_PATH = Path(os.environ.get("EFFECTS_PATH", str(Path.home() / "artdag-effects")))
|
|
ARTDAG_PATH = Path(os.environ.get("ARTDAG_PATH", str(Path.home() / "art" / "artdag")))
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def get_effects_commit() -> str:
|
|
"""Get current git commit hash of effects repo."""
|
|
try:
|
|
result = subprocess.run(
|
|
["git", "rev-parse", "HEAD"],
|
|
cwd=EFFECTS_PATH,
|
|
capture_output=True,
|
|
text=True
|
|
)
|
|
if result.returncode == 0:
|
|
return result.stdout.strip()
|
|
except Exception:
|
|
pass
|
|
return "unknown"
|
|
|
|
|
|
def get_artdag_commit() -> str:
|
|
"""Get current git commit hash of artdag repo."""
|
|
try:
|
|
result = subprocess.run(
|
|
["git", "rev-parse", "HEAD"],
|
|
cwd=ARTDAG_PATH,
|
|
capture_output=True,
|
|
text=True
|
|
)
|
|
if result.returncode == 0:
|
|
return result.stdout.strip()
|
|
except Exception:
|
|
pass
|
|
return "unknown"
|
|
|
|
|
|
sys.path.insert(0, str(EFFECTS_PATH / "dog"))
|
|
|
|
# Register the dog effect with the EFFECT executor
|
|
# New format uses process() instead of effect_dog()
|
|
from effect import process as dog_process
|
|
|
|
@register_effect("dog")
|
|
def _dog_effect(input_path: Path, output_path: Path, config: dict) -> Path:
|
|
"""Dog effect wrapper - registered for DAG EFFECT nodes."""
|
|
# Wrap for new whole-video API
|
|
return dog_process([input_path], output_path, config, None)
|
|
|
|
|
|
def file_hash(path: Path) -> str:
|
|
"""Compute SHA3-256 hash of a file."""
|
|
hasher = hashlib.sha3_256()
|
|
actual_path = path.resolve() if path.is_symlink() else path
|
|
with open(actual_path, "rb") as f:
|
|
for chunk in iter(lambda: f.read(65536), b""):
|
|
hasher.update(chunk)
|
|
return hasher.hexdigest()
|
|
|
|
|
|
# Cache directory (shared between server and worker)
|
|
CACHE_DIR = Path(os.environ.get("CACHE_DIR", str(Path.home() / ".artdag" / "cache")))
|
|
|
|
|
|
# ============ Executors for Effects ============
|
|
|
|
@register_executor("effect:dog")
|
|
class DogExecutor(Executor):
|
|
"""Executor for the dog effect."""
|
|
|
|
def execute(self, config: Dict, inputs: List[Path], output_path: Path) -> Path:
|
|
from effect import effect_dog
|
|
if len(inputs) != 1:
|
|
raise ValueError(f"Dog effect expects 1 input, got {len(inputs)}")
|
|
return effect_dog(inputs[0], output_path, config)
|
|
|
|
|
|
@register_executor("effect:identity")
|
|
class IdentityExecutor(Executor):
|
|
"""Executor for the identity effect (passthrough)."""
|
|
|
|
def execute(self, config: Dict, inputs: List[Path], output_path: Path) -> Path:
|
|
from artdag.nodes.effect import effect_identity
|
|
if len(inputs) != 1:
|
|
raise ValueError(f"Identity effect expects 1 input, got {len(inputs)}")
|
|
return effect_identity(inputs[0], output_path, config)
|
|
|
|
|
|
@register_executor(NodeType.SOURCE)
|
|
class SourceExecutor(Executor):
|
|
"""Executor for SOURCE nodes - loads content from cache by hash."""
|
|
|
|
def execute(self, config: Dict, inputs: List[Path], output_path: Path) -> Path:
|
|
# Source nodes load from cache by cid
|
|
cid = config.get("cid")
|
|
if not cid:
|
|
raise ValueError("SOURCE node requires cid in config")
|
|
|
|
# Look up in cache
|
|
source_path = CACHE_DIR / cid
|
|
if not source_path.exists():
|
|
# Try nodes directory
|
|
from cache_manager import get_cache_manager
|
|
cache_manager = get_cache_manager()
|
|
source_path = cache_manager.get_by_cid(cid)
|
|
|
|
if not source_path or not source_path.exists():
|
|
raise ValueError(f"Source content not in cache: {cid}")
|
|
|
|
# For source nodes, we just return the path (no transformation)
|
|
# The engine will use this as input to subsequent nodes
|
|
return source_path
|
|
|
|
|
|
class RenderTask(Task):
|
|
"""Base task with provenance tracking."""
|
|
|
|
def on_success(self, retval, task_id, args, kwargs):
|
|
"""Record successful render."""
|
|
print(f"Task {task_id} completed: {retval}")
|
|
|
|
def on_failure(self, exc, task_id, args, kwargs, einfo):
|
|
"""Record failed render."""
|
|
print(f"Task {task_id} failed: {exc}")
|
|
|
|
|
|
@app.task(base=RenderTask, bind=True)
|
|
def render_effect(self, input_hash: str, effect_name: str, output_name: str) -> dict:
|
|
"""
|
|
Render an effect on an input asset.
|
|
|
|
Args:
|
|
input_hash: SHA3-256 hash of input asset
|
|
effect_name: Name of effect (e.g., "dog", "identity")
|
|
output_name: Name for output asset
|
|
|
|
Returns:
|
|
Provenance record with output hash
|
|
"""
|
|
from cache_manager import get_cache_manager
|
|
|
|
# Registry hashes (for effects/infra metadata only)
|
|
REGISTRY = {
|
|
"effect:dog": {
|
|
"hash": "d048fe313433eb4e38f0e24194ffae91b896ca3e6eed3e50b2cc37b7be495555"
|
|
},
|
|
"effect:identity": {
|
|
"hash": "640ea11ee881ebf4101af0a955439105ab11e763682b209e88ea08fc66e1cc03"
|
|
},
|
|
"infra:artdag": {
|
|
"hash": "96a5972de216aee12ec794dcad5f9360da2e676171eabf24a46dfe1ee5fee4b0"
|
|
},
|
|
"infra:giles-hp": {
|
|
"hash": "964bf6e69dc4e2493f42375013caffe26404ec3cf8eb5d9bc170cd42a361523b"
|
|
}
|
|
}
|
|
|
|
# Input comes from cache by hash (supports both legacy and new cache locations)
|
|
cache_manager = get_cache_manager()
|
|
input_path = cache_manager.get_by_cid(input_hash)
|
|
if not input_path or not input_path.exists():
|
|
raise ValueError(f"Input not in cache: {input_hash}")
|
|
|
|
output_dir = CACHE_DIR
|
|
|
|
# Verify input
|
|
actual_hash = file_hash(input_path)
|
|
if actual_hash != input_hash:
|
|
raise ValueError(f"Input hash mismatch: expected {input_hash}, got {actual_hash}")
|
|
|
|
self.update_state(state='RENDERING', meta={'effect': effect_name, 'input': input_hash[:16]})
|
|
|
|
# Load and apply effect
|
|
if effect_name == "dog":
|
|
from effect import effect_dog, DOG_HASH
|
|
output_path = output_dir / f"{output_name}.mkv"
|
|
result = effect_dog(input_path, output_path, {})
|
|
expected_hash = DOG_HASH
|
|
elif effect_name == "identity":
|
|
from artdag.nodes.effect import effect_identity
|
|
output_path = output_dir / f"{output_name}{input_path.suffix}"
|
|
result = effect_identity(input_path, output_path, {})
|
|
expected_hash = input_hash
|
|
else:
|
|
raise ValueError(f"Unknown effect: {effect_name}")
|
|
|
|
# Verify output
|
|
output_cid = file_hash(result)
|
|
if output_cid != expected_hash:
|
|
raise ValueError(f"Output hash mismatch: expected {expected_hash}, got {output_cid}")
|
|
|
|
# Build effect info based on source
|
|
if effect_name == "identity":
|
|
# Identity is from artdag package on GitHub
|
|
artdag_commit = get_artdag_commit()
|
|
effect_info = {
|
|
"name": f"effect:{effect_name}",
|
|
"cid": REGISTRY[f"effect:{effect_name}"]["hash"],
|
|
"repo": "github",
|
|
"repo_commit": artdag_commit,
|
|
"repo_url": f"https://github.com/gilesbradshaw/art-dag/blob/{artdag_commit}/artdag/nodes/effect.py"
|
|
}
|
|
else:
|
|
# Other effects from rose-ash effects repo
|
|
effects_commit = get_effects_commit()
|
|
effect_info = {
|
|
"name": f"effect:{effect_name}",
|
|
"cid": REGISTRY[f"effect:{effect_name}"]["hash"],
|
|
"repo": "rose-ash",
|
|
"repo_commit": effects_commit,
|
|
"repo_url": f"https://git.rose-ash.com/art-dag/effects/src/commit/{effects_commit}/{effect_name}"
|
|
}
|
|
|
|
# Build provenance
|
|
provenance = {
|
|
"task_id": self.request.id,
|
|
"rendered_at": datetime.now(timezone.utc).isoformat(),
|
|
"rendered_by": "@giles@artdag.rose-ash.com",
|
|
"output": {
|
|
"name": output_name,
|
|
"cid": output_cid,
|
|
},
|
|
"inputs": [
|
|
{"cid": input_hash}
|
|
],
|
|
"effects": [effect_info],
|
|
"infrastructure": {
|
|
"software": {"name": "infra:artdag", "cid": REGISTRY["infra:artdag"]["hash"]},
|
|
"hardware": {"name": "infra:giles-hp", "cid": REGISTRY["infra:giles-hp"]["hash"]}
|
|
}
|
|
}
|
|
|
|
# Store provenance on IPFS
|
|
import ipfs_client
|
|
provenance_cid = ipfs_client.add_json(provenance)
|
|
if provenance_cid:
|
|
provenance["provenance_cid"] = provenance_cid
|
|
logger.info(f"Stored provenance on IPFS: {provenance_cid}")
|
|
else:
|
|
logger.warning("Failed to store provenance on IPFS")
|
|
|
|
return provenance
|
|
|
|
|
|
@app.task
|
|
def render_dog_from_cat() -> dict:
|
|
"""Convenience task: render cat through dog effect."""
|
|
CAT_HASH = "33268b6e167deaf018cc538de12dbe562612b33e89a749391cef855b320a269b"
|
|
return render_effect.delay(CAT_HASH, "dog", "dog-from-cat-celery").get()
|
|
|
|
|
|
@app.task(base=RenderTask, bind=True)
|
|
def execute_dag(self, dag_json: str, run_id: str = None) -> dict:
|
|
"""
|
|
Execute a multi-step DAG.
|
|
|
|
Args:
|
|
dag_json: Serialized DAG as JSON string
|
|
run_id: Optional run ID for tracking
|
|
|
|
Returns:
|
|
Execution result with output hash and node results
|
|
"""
|
|
from cache_manager import get_cache_manager
|
|
|
|
# Parse DAG
|
|
try:
|
|
dag = DAG.from_json(dag_json)
|
|
except Exception as e:
|
|
raise ValueError(f"Invalid DAG JSON: {e}")
|
|
|
|
# Validate DAG
|
|
errors = dag.validate()
|
|
if errors:
|
|
raise ValueError(f"Invalid DAG: {errors}")
|
|
|
|
# Create engine with cache directory
|
|
engine = Engine(CACHE_DIR / "nodes")
|
|
|
|
# Set up progress callback
|
|
def progress_callback(progress):
|
|
self.update_state(
|
|
state='EXECUTING',
|
|
meta={
|
|
'node_id': progress.node_id,
|
|
'node_type': progress.node_type,
|
|
'status': progress.status,
|
|
'progress': progress.progress,
|
|
'message': progress.message,
|
|
}
|
|
)
|
|
logger.info(f"DAG progress: {progress.node_id} - {progress.status} - {progress.message}")
|
|
|
|
engine.set_progress_callback(progress_callback)
|
|
|
|
# Execute DAG
|
|
self.update_state(state='EXECUTING', meta={'status': 'starting', 'nodes': len(dag.nodes)})
|
|
result = engine.execute(dag)
|
|
|
|
if not result.success:
|
|
raise RuntimeError(f"DAG execution failed: {result.error}")
|
|
|
|
# Index all node outputs by cid and upload to IPFS
|
|
cache_manager = get_cache_manager()
|
|
output_cid = None
|
|
node_hashes = {} # node_id -> cid mapping
|
|
node_ipfs_cids = {} # node_id -> ipfs_cid mapping
|
|
|
|
# Process all node results (intermediates + output)
|
|
for node_id, node_path in result.node_results.items():
|
|
if node_path and Path(node_path).exists():
|
|
node = dag.nodes.get(node_id)
|
|
# Skip SOURCE nodes - they're already in cache
|
|
if node and (node.node_type == NodeType.SOURCE or str(node.node_type) == "SOURCE"):
|
|
cid = node.config.get("cid")
|
|
if cid:
|
|
node_hashes[node_id] = cid
|
|
continue
|
|
|
|
# Determine node type for cache metadata
|
|
node_type_str = str(node.node_type) if node else "intermediate"
|
|
if "effect" in node_type_str.lower():
|
|
cache_node_type = "effect_output"
|
|
else:
|
|
cache_node_type = "dag_intermediate"
|
|
|
|
# Store in cache_manager (indexes by cid, uploads to IPFS)
|
|
# put() returns (CachedFile, cid) where cid is IPFS CID if available, else local hash
|
|
cached, content_cid = cache_manager.put(
|
|
Path(node_path),
|
|
node_type=cache_node_type,
|
|
node_id=node_id,
|
|
)
|
|
# content_cid is the primary identifier (IPFS CID or local hash)
|
|
node_hashes[node_id] = content_cid
|
|
# Track IPFS CIDs separately (they start with Qm or bafy)
|
|
if content_cid and (content_cid.startswith("Qm") or content_cid.startswith("bafy")):
|
|
node_ipfs_cids[node_id] = content_cid
|
|
logger.info(f"Cached node {node_id}: IPFS CID {content_cid}")
|
|
else:
|
|
logger.info(f"Cached node {node_id}: local hash {content_cid[:16] if content_cid else 'none'}...")
|
|
|
|
# Get output hash from the output node
|
|
# Use the same identifier that's in the cache index (IPFS CID if available)
|
|
if result.output_path and result.output_path.exists():
|
|
local_hash = file_hash(result.output_path)
|
|
output_ipfs_cid = node_ipfs_cids.get(dag.output_id)
|
|
# Use IPFS CID as primary identifier if available, otherwise local hash
|
|
# This must match what's in the content_index from cache_manager.put()
|
|
output_cid = node_hashes.get(dag.output_id, local_hash)
|
|
|
|
# Store output in database (for L2 to query IPFS CID)
|
|
import asyncio
|
|
import database
|
|
|
|
async def save_to_db():
|
|
if database.pool is None:
|
|
await database.init_db()
|
|
await database.create_cache_item(output_cid, output_ipfs_cid)
|
|
# Also save the run result
|
|
if run_id:
|
|
input_hashes_for_db = [
|
|
node.config.get("cid")
|
|
for node in dag.nodes.values()
|
|
if (node.node_type == NodeType.SOURCE or str(node.node_type) == "SOURCE")
|
|
and node.config.get("cid")
|
|
]
|
|
# Get actor_id and recipe from pending_runs (saved when run started)
|
|
actor_id = None
|
|
recipe_name = "dag"
|
|
pending = await database.get_pending_run(run_id)
|
|
if pending:
|
|
actor_id = pending.get("actor_id")
|
|
recipe_name = pending.get("recipe") or "dag"
|
|
|
|
await database.save_run_cache(
|
|
run_id=run_id,
|
|
output_cid=output_cid,
|
|
recipe=recipe_name,
|
|
inputs=input_hashes_for_db,
|
|
ipfs_cid=output_ipfs_cid,
|
|
actor_id=actor_id,
|
|
)
|
|
|
|
# Save output as media for the user
|
|
if actor_id:
|
|
await database.save_item_metadata(
|
|
cid=output_cid,
|
|
actor_id=actor_id,
|
|
item_type="media",
|
|
description=f"Output from recipe: {recipe_name}",
|
|
source_type="recipe",
|
|
source_note=f"run_id: {run_id}",
|
|
)
|
|
|
|
# Clean up pending run
|
|
if pending:
|
|
await database.complete_pending_run(run_id)
|
|
|
|
try:
|
|
loop = asyncio.get_event_loop()
|
|
if loop.is_running():
|
|
asyncio.ensure_future(save_to_db())
|
|
else:
|
|
loop.run_until_complete(save_to_db())
|
|
except RuntimeError:
|
|
asyncio.run(save_to_db())
|
|
|
|
# Record activity for deletion tracking
|
|
input_hashes = []
|
|
intermediate_hashes = []
|
|
for node_id, node in dag.nodes.items():
|
|
if node.node_type == NodeType.SOURCE or str(node.node_type) == "SOURCE":
|
|
cid = node.config.get("cid")
|
|
if cid:
|
|
input_hashes.append(cid)
|
|
elif node_id != dag.output_id and node_id in node_hashes:
|
|
intermediate_hashes.append(node_hashes[node_id])
|
|
|
|
if input_hashes:
|
|
from artdag.activities import Activity
|
|
from datetime import datetime, timezone
|
|
activity = Activity(
|
|
activity_id=run_id or f"dag-{output_cid[:16]}",
|
|
input_ids=sorted(input_hashes),
|
|
output_id=output_cid,
|
|
intermediate_ids=intermediate_hashes,
|
|
created_at=datetime.now(timezone.utc).timestamp(),
|
|
status="completed",
|
|
)
|
|
cache_manager.activity_store.add(activity)
|
|
|
|
# Build provenance
|
|
input_hashes_for_provenance = []
|
|
for node_id, node in dag.nodes.items():
|
|
if node.node_type == NodeType.SOURCE or str(node.node_type) == "SOURCE":
|
|
cid = node.config.get("cid")
|
|
if cid:
|
|
input_hashes_for_provenance.append({"cid": cid})
|
|
|
|
provenance = {
|
|
"task_id": self.request.id,
|
|
"run_id": run_id,
|
|
"rendered_at": datetime.now(timezone.utc).isoformat(),
|
|
"output": {
|
|
"cid": output_cid,
|
|
"ipfs_cid": node_ipfs_cids.get(dag.output_id) if dag.output_id else None,
|
|
},
|
|
"inputs": input_hashes_for_provenance,
|
|
"dag": dag_json, # Full DAG definition
|
|
"nodes": {
|
|
node_id: {
|
|
"cid": node_hashes.get(node_id),
|
|
"ipfs_cid": node_ipfs_cids.get(node_id),
|
|
}
|
|
for node_id in dag.nodes.keys()
|
|
if node_id in node_hashes
|
|
},
|
|
"execution": {
|
|
"execution_time": result.execution_time,
|
|
"nodes_executed": result.nodes_executed,
|
|
"nodes_cached": result.nodes_cached,
|
|
}
|
|
}
|
|
|
|
# Store provenance on IPFS
|
|
import ipfs_client
|
|
provenance_cid = ipfs_client.add_json(provenance)
|
|
if provenance_cid:
|
|
provenance["provenance_cid"] = provenance_cid
|
|
logger.info(f"Stored DAG provenance on IPFS: {provenance_cid}")
|
|
else:
|
|
logger.warning("Failed to store DAG provenance on IPFS")
|
|
|
|
# Build result
|
|
return {
|
|
"success": True,
|
|
"run_id": run_id,
|
|
"output_cid": output_cid,
|
|
"output_ipfs_cid": node_ipfs_cids.get(dag.output_id) if dag.output_id else None,
|
|
"output_path": str(result.output_path) if result.output_path else None,
|
|
"execution_time": result.execution_time,
|
|
"nodes_executed": result.nodes_executed,
|
|
"nodes_cached": result.nodes_cached,
|
|
"node_results": {
|
|
node_id: str(path) for node_id, path in result.node_results.items()
|
|
},
|
|
"node_hashes": node_hashes, # node_id -> cid
|
|
"node_ipfs_cids": node_ipfs_cids, # node_id -> ipfs_cid
|
|
"provenance_cid": provenance_cid,
|
|
}
|
|
|
|
|
|
def build_effect_dag(input_hashes: List[str], effect_name: str) -> DAG:
|
|
"""
|
|
Build a simple DAG for applying an effect to inputs.
|
|
|
|
Args:
|
|
input_hashes: List of input content hashes
|
|
effect_name: Name of effect to apply (e.g., "dog", "identity")
|
|
|
|
Returns:
|
|
DAG ready for execution
|
|
"""
|
|
dag = DAG()
|
|
|
|
# Add source nodes for each input
|
|
source_ids = []
|
|
for i, cid in enumerate(input_hashes):
|
|
source_node = Node(
|
|
node_type=NodeType.SOURCE,
|
|
config={"cid": cid},
|
|
name=f"source_{i}",
|
|
)
|
|
dag.add_node(source_node)
|
|
source_ids.append(source_node.node_id)
|
|
|
|
# Add effect node
|
|
effect_node = Node(
|
|
node_type=f"effect:{effect_name}",
|
|
config={},
|
|
inputs=source_ids,
|
|
name=f"effect_{effect_name}",
|
|
)
|
|
dag.add_node(effect_node)
|
|
dag.set_output(effect_node.node_id)
|
|
|
|
return dag
|