- Remove execute_level() from tasks/execute.py (defined but never called) - Remove render_dog_from_cat() from legacy_tasks.py (test convenience, never used) - Remove duplicate file_hash() from legacy_tasks.py, import from cache_manager - Remove unused hashlib import Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
952 lines
35 KiB
Python
952 lines
35 KiB
Python
"""
|
|
Art DAG Celery Tasks
|
|
|
|
Distributed rendering tasks for the Art DAG system.
|
|
Supports both single-effect runs and multi-step DAG execution.
|
|
"""
|
|
|
|
import json
|
|
import logging
|
|
import os
|
|
import subprocess
|
|
import sys
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
from typing import Dict, List, Optional
|
|
|
|
from celery import Task
|
|
from celery_app import app
|
|
from cache_manager import file_hash
|
|
|
|
# Import artdag components
|
|
from artdag import DAG, Node, NodeType
|
|
from artdag.engine import Engine
|
|
from artdag.executor import register_executor, Executor, get_executor
|
|
from artdag.nodes.effect import register_effect
|
|
import artdag.nodes # Register all built-in executors (SOURCE, EFFECT, etc.)
|
|
|
|
# Add effects to path (use env var in Docker, fallback to home dir locally)
|
|
EFFECTS_PATH = Path(os.environ.get("EFFECTS_PATH", str(Path.home() / "artdag-effects")))
|
|
ARTDAG_PATH = Path(os.environ.get("ARTDAG_PATH", str(Path.home() / "art" / "artdag")))
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def get_effects_commit() -> str:
|
|
"""Get current git commit hash of effects repo."""
|
|
try:
|
|
result = subprocess.run(
|
|
["git", "rev-parse", "HEAD"],
|
|
cwd=EFFECTS_PATH,
|
|
capture_output=True,
|
|
text=True
|
|
)
|
|
if result.returncode == 0:
|
|
return result.stdout.strip()
|
|
except Exception:
|
|
pass
|
|
return "unknown"
|
|
|
|
|
|
def get_artdag_commit() -> str:
|
|
"""Get current git commit hash of artdag repo."""
|
|
try:
|
|
result = subprocess.run(
|
|
["git", "rev-parse", "HEAD"],
|
|
cwd=ARTDAG_PATH,
|
|
capture_output=True,
|
|
text=True
|
|
)
|
|
if result.returncode == 0:
|
|
return result.stdout.strip()
|
|
except Exception:
|
|
pass
|
|
return "unknown"
|
|
|
|
|
|
# Make the dog effect package importable (effects live outside this package).
sys.path.insert(0, str(EFFECTS_PATH / "dog"))

# Register the dog effect with the EFFECT executor.
# New format uses process() instead of effect_dog().
from effect import process as dog_process


@register_effect("dog")
def _dog_effect(input_path: Path, output_path: Path, config: dict) -> Path:
    """Dog effect wrapper - registered for DAG EFFECT nodes.

    Adapts the single-input DAG effect signature to the whole-video
    process() API, which takes a list of inputs and a progress callback
    (None here: no progress reporting at this layer).
    """
    return dog_process([input_path], output_path, config, None)


# Cache directory (shared between server and worker)
CACHE_DIR = Path(os.environ.get("CACHE_DIR", str(Path.home() / ".artdag" / "cache")))
|
|
|
|
|
|
# ============ Executors for Effects ============
|
|
|
|
@register_executor("effect:dog")
|
|
class DogExecutor(Executor):
|
|
"""Executor for the dog effect."""
|
|
|
|
def execute(self, config: Dict, inputs: List[Path], output_path: Path) -> Path:
|
|
from effect import effect_dog
|
|
if len(inputs) != 1:
|
|
raise ValueError(f"Dog effect expects 1 input, got {len(inputs)}")
|
|
return effect_dog(inputs[0], output_path, config)
|
|
|
|
|
|
@register_executor("effect:identity")
|
|
class IdentityExecutor(Executor):
|
|
"""Executor for the identity effect (passthrough)."""
|
|
|
|
def execute(self, config: Dict, inputs: List[Path], output_path: Path) -> Path:
|
|
from artdag.nodes.effect import effect_identity
|
|
if len(inputs) != 1:
|
|
raise ValueError(f"Identity effect expects 1 input, got {len(inputs)}")
|
|
return effect_identity(inputs[0], output_path, config)
|
|
|
|
|
|
@register_executor(NodeType.SOURCE)
|
|
class SourceExecutor(Executor):
|
|
"""Executor for SOURCE nodes - loads content from cache by hash."""
|
|
|
|
def execute(self, config: Dict, inputs: List[Path], output_path: Path) -> Path:
|
|
# Source nodes load from cache by cid
|
|
cid = config.get("cid")
|
|
if not cid:
|
|
raise ValueError("SOURCE node requires cid in config")
|
|
|
|
# Look up in cache
|
|
from cache_manager import get_cache_manager
|
|
cache_manager = get_cache_manager()
|
|
source_path = cache_manager.get_by_cid(cid)
|
|
|
|
if not source_path or not source_path.exists():
|
|
# Not in cache - fetch from IPFS
|
|
import logging
|
|
logger = logging.getLogger(__name__)
|
|
logger.info(f"SOURCE {cid[:16]}... not in cache, fetching from IPFS")
|
|
|
|
import ipfs_client
|
|
fetch_path = CACHE_DIR / "ipfs_fetch" / cid
|
|
fetch_path.parent.mkdir(parents=True, exist_ok=True)
|
|
|
|
if ipfs_client.get_file(cid, str(fetch_path)):
|
|
logger.info(f"SOURCE {cid[:16]}... fetched from IPFS to {fetch_path}")
|
|
source_path = fetch_path
|
|
else:
|
|
raise ValueError(f"Source content not in cache and IPFS fetch failed: {cid}")
|
|
|
|
# For source nodes, we just return the path (no transformation)
|
|
# The engine will use this as input to subsequent nodes
|
|
return source_path
|
|
|
|
|
|
class RenderTask(Task):
    """Base Celery task with provenance-tracking completion hooks.

    Uses the module logger (instead of bare print) so task outcomes land
    in the same log stream as the rest of this module.
    """

    def on_success(self, retval, task_id, args, kwargs):
        """Record successful render."""
        logger.info(f"Task {task_id} completed: {retval}")

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        """Record failed render."""
        logger.error(f"Task {task_id} failed: {exc}")
|
|
|
|
|
|
@app.task(base=RenderTask, bind=True)
def render_effect(self, input_hash: str, effect_name: str, output_name: str) -> dict:
    """
    Render an effect on an input asset.

    Args:
        input_hash: SHA3-256 hash of input asset
        effect_name: Name of effect (e.g., "dog", "identity")
        output_name: Name for output asset

    Returns:
        Provenance record with output hash
    """
    from cache_manager import get_cache_manager

    # Registry hashes (for effects/infra metadata only)
    REGISTRY = {
        "effect:dog": {
            "hash": "d048fe313433eb4e38f0e24194ffae91b896ca3e6eed3e50b2cc37b7be495555"
        },
        "effect:identity": {
            "hash": "640ea11ee881ebf4101af0a955439105ab11e763682b209e88ea08fc66e1cc03"
        },
        "infra:artdag": {
            "hash": "96a5972de216aee12ec794dcad5f9360da2e676171eabf24a46dfe1ee5fee4b0"
        },
        "infra:giles-hp": {
            "hash": "964bf6e69dc4e2493f42375013caffe26404ec3cf8eb5d9bc170cd42a361523b"
        }
    }

    # Locate the input in the cache (supports legacy and new cache locations).
    input_path = get_cache_manager().get_by_cid(input_hash)
    if input_path is None or not input_path.exists():
        raise ValueError(f"Input not in cache: {input_hash}")

    output_dir = CACHE_DIR

    # Integrity check: cached bytes must hash back to the requested cid.
    actual_hash = file_hash(input_path)
    if actual_hash != input_hash:
        raise ValueError(f"Input hash mismatch: expected {input_hash}, got {actual_hash}")

    self.update_state(state='RENDERING', meta={'effect': effect_name, 'input': input_hash[:16]})

    # Dispatch to the requested effect implementation.
    if effect_name == "dog":
        from effect import effect_dog, DOG_HASH
        output_path = output_dir / f"{output_name}.mkv"
        result = effect_dog(input_path, output_path, {})
        expected_hash = DOG_HASH
    elif effect_name == "identity":
        from artdag.nodes.effect import effect_identity
        output_path = output_dir / f"{output_name}{input_path.suffix}"
        result = effect_identity(input_path, output_path, {})
        # Identity is a passthrough, so the output must hash like the input.
        expected_hash = input_hash
    else:
        raise ValueError(f"Unknown effect: {effect_name}")

    # Verify the rendered output matches the hash we expected to produce.
    output_cid = file_hash(result)
    if output_cid != expected_hash:
        raise ValueError(f"Output hash mismatch: expected {expected_hash}, got {output_cid}")

    # Describe where the effect's source code lives, for provenance.
    if effect_name == "identity":
        # Identity is from artdag package on GitHub
        repo_commit = get_artdag_commit()
        repo = "github"
        repo_url = f"https://github.com/gilesbradshaw/art-dag/blob/{repo_commit}/artdag/nodes/effect.py"
    else:
        # Other effects from rose-ash effects repo
        repo_commit = get_effects_commit()
        repo = "rose-ash"
        repo_url = f"https://git.rose-ash.com/art-dag/effects/src/commit/{repo_commit}/{effect_name}"

    effect_info = {
        "name": f"effect:{effect_name}",
        "cid": REGISTRY[f"effect:{effect_name}"]["hash"],
        "repo": repo,
        "repo_commit": repo_commit,
        "repo_url": repo_url,
    }

    # Assemble the provenance record for this render.
    provenance = {
        "task_id": self.request.id,
        "rendered_at": datetime.now(timezone.utc).isoformat(),
        "rendered_by": "@giles@artdag.rose-ash.com",
        "output": {
            "name": output_name,
            "cid": output_cid,
        },
        "inputs": [
            {"cid": input_hash}
        ],
        "effects": [effect_info],
        "infrastructure": {
            "software": {"name": "infra:artdag", "cid": REGISTRY["infra:artdag"]["hash"]},
            "hardware": {"name": "infra:giles-hp", "cid": REGISTRY["infra:giles-hp"]["hash"]}
        }
    }

    # Persist the provenance record on IPFS (best effort).
    import ipfs_client
    provenance_cid = ipfs_client.add_json(provenance)
    if provenance_cid:
        provenance["provenance_cid"] = provenance_cid
        logger.info(f"Stored provenance on IPFS: {provenance_cid}")
    else:
        logger.warning("Failed to store provenance on IPFS")

    return provenance
|
|
|
|
|
|
@app.task(base=RenderTask, bind=True)
def execute_dag(self, dag_json: str, run_id: Optional[str] = None) -> dict:
    """
    Execute a multi-step DAG.

    Side effects (all visible in this body): runs the artdag Engine over the
    parsed DAG, indexes every non-SOURCE node output via cache_manager.put(),
    stores the plan and a provenance record on IPFS, records the run in the
    database, and logs an Activity for deletion tracking.

    Args:
        dag_json: Serialized DAG as JSON string
        run_id: Optional run ID for tracking

    Returns:
        Execution result with output hash and node results

    Raises:
        ValueError: if the DAG JSON cannot be parsed or fails dag.validate()
        RuntimeError: if engine execution reports failure
    """
    from cache_manager import get_cache_manager

    # Parse DAG
    try:
        dag = DAG.from_json(dag_json)
    except Exception as e:
        raise ValueError(f"Invalid DAG JSON: {e}")

    # Validate DAG
    errors = dag.validate()
    if errors:
        raise ValueError(f"Invalid DAG: {errors}")

    # Create engine with cache directory
    engine = Engine(CACHE_DIR / "nodes")

    # Set up progress callback: forwards per-node progress to Celery state
    # so clients polling the task see node-level status.
    def progress_callback(progress):
        self.update_state(
            state='EXECUTING',
            meta={
                'node_id': progress.node_id,
                'node_type': progress.node_type,
                'status': progress.status,
                'progress': progress.progress,
                'message': progress.message,
            }
        )
        logger.info(f"DAG progress: {progress.node_id} - {progress.status} - {progress.message}")

    engine.set_progress_callback(progress_callback)

    # Execute DAG
    self.update_state(state='EXECUTING', meta={'status': 'starting', 'nodes': len(dag.nodes)})
    result = engine.execute(dag)

    if not result.success:
        raise RuntimeError(f"DAG execution failed: {result.error}")

    # Index all node outputs by cid and upload to IPFS
    cache_manager = get_cache_manager()
    output_cid = None
    node_hashes = {}  # node_id -> cid mapping
    node_ipfs_cids = {}  # node_id -> ipfs_cid mapping

    # Process all node results (intermediates + output)
    for node_id, node_path in result.node_results.items():
        if node_path and Path(node_path).exists():
            node = dag.nodes.get(node_id)
            # Skip SOURCE nodes - they're already in cache
            # (node_type may be an enum or a plain string, hence both checks)
            if node and (node.node_type == NodeType.SOURCE or str(node.node_type) == "SOURCE"):
                cid = node.config.get("cid")
                if cid:
                    node_hashes[node_id] = cid
                continue

            # Determine node type for cache metadata
            node_type_str = str(node.node_type) if node else "intermediate"
            if "effect" in node_type_str.lower():
                cache_node_type = "effect_output"
            else:
                cache_node_type = "dag_intermediate"

            # Store in cache_manager (indexes by cid, uploads to IPFS)
            # put() returns (CachedFile, cid) where cid is IPFS CID if available, else local hash
            cached, content_cid = cache_manager.put(
                Path(node_path),
                node_type=cache_node_type,
                node_id=node_id,
            )
            # content_cid is the primary identifier (IPFS CID or local hash)
            node_hashes[node_id] = content_cid
            # Track IPFS CIDs separately (they start with Qm or bafy)
            if content_cid and (content_cid.startswith("Qm") or content_cid.startswith("bafy")):
                node_ipfs_cids[node_id] = content_cid
                logger.info(f"Cached node {node_id}: IPFS CID {content_cid}")
            else:
                logger.info(f"Cached node {node_id}: local hash {content_cid[:16] if content_cid else 'none'}...")

    # Get output hash from the output node
    # Use the same identifier that's in the cache index (IPFS CID if available)
    if result.output_path and result.output_path.exists():
        local_hash = file_hash(result.output_path)
        output_ipfs_cid = node_ipfs_cids.get(dag.output_id)
        # Use IPFS CID as primary identifier if available, otherwise local hash
        # This must match what's in the content_index from cache_manager.put()
        output_cid = node_hashes.get(dag.output_id, local_hash)

        # Store output in database (for L2 to query IPFS CID)
        import asyncio
        import database

        # Store plan (DAG) to IPFS and local cache
        plan_cid = None
        try:
            import ipfs_client
            dag_dict = json.loads(dag_json)
            plan_cid = ipfs_client.add_json(dag_dict)
            if plan_cid:
                logger.info(f"Stored plan to IPFS: {plan_cid}")
                # Also store locally so it can be retrieved without IPFS
                # Store directly in cache_dir (get_by_cid checks cache_dir/cid)
                plan_path = CACHE_DIR / plan_cid
                CACHE_DIR.mkdir(parents=True, exist_ok=True)
                with open(plan_path, "w") as f:
                    json.dump(dag_dict, f, indent=2)
        except Exception as e:
            logger.warning(f"Failed to store plan to IPFS: {e}")

        async def save_to_db():
            # Lazy DB pool init: workers may not have connected yet.
            if database.pool is None:
                await database.init_db()
            await database.create_cache_item(output_cid, output_ipfs_cid)
            # Also save the run result
            if run_id:
                input_hashes_for_db = [
                    node.config.get("cid")
                    for node in dag.nodes.values()
                    if (node.node_type == NodeType.SOURCE or str(node.node_type) == "SOURCE")
                    and node.config.get("cid")
                ]
                # Get actor_id and recipe from pending_runs (saved when run started)
                actor_id = None
                recipe_name = "dag"
                pending = await database.get_pending_run(run_id)
                if pending:
                    actor_id = pending.get("actor_id")
                    recipe_name = pending.get("recipe") or "dag"

                await database.save_run_cache(
                    run_id=run_id,
                    output_cid=output_cid,
                    recipe=recipe_name,
                    inputs=input_hashes_for_db,
                    ipfs_cid=output_ipfs_cid,
                    actor_id=actor_id,
                    plan_cid=plan_cid,
                )

                # Save output as media for the user
                if actor_id:
                    await database.save_item_metadata(
                        cid=output_cid,
                        actor_id=actor_id,
                        item_type="media",
                        description=f"Output from recipe: {recipe_name}",
                        source_type="recipe",
                        source_note=f"run_id: {run_id}",
                    )

                # Clean up pending run
                if pending:
                    await database.complete_pending_run(run_id)

        # Run the async save regardless of whether an event loop is already
        # running in this worker (schedule it) or not (drive it to completion).
        try:
            loop = asyncio.get_event_loop()
            if loop.is_running():
                asyncio.ensure_future(save_to_db())
            else:
                loop.run_until_complete(save_to_db())
        except RuntimeError:
            asyncio.run(save_to_db())

    # Record activity for deletion tracking
    input_hashes = []
    intermediate_hashes = []
    for node_id, node in dag.nodes.items():
        if node.node_type == NodeType.SOURCE or str(node.node_type) == "SOURCE":
            cid = node.config.get("cid")
            if cid:
                input_hashes.append(cid)
        elif node_id != dag.output_id and node_id in node_hashes:
            intermediate_hashes.append(node_hashes[node_id])

    if input_hashes:
        from artdag.activities import Activity
        from datetime import datetime, timezone
        activity = Activity(
            activity_id=run_id or f"dag-{output_cid[:16]}",
            input_ids=sorted(input_hashes),
            output_id=output_cid,
            intermediate_ids=intermediate_hashes,
            created_at=datetime.now(timezone.utc).timestamp(),
            status="completed",
        )
        cache_manager.activity_store.add(activity)

    # Build provenance
    input_hashes_for_provenance = []
    for node_id, node in dag.nodes.items():
        if node.node_type == NodeType.SOURCE or str(node.node_type) == "SOURCE":
            cid = node.config.get("cid")
            if cid:
                input_hashes_for_provenance.append({"cid": cid})

    provenance = {
        "task_id": self.request.id,
        "run_id": run_id,
        "rendered_at": datetime.now(timezone.utc).isoformat(),
        "output": {
            "cid": output_cid,
            "ipfs_cid": node_ipfs_cids.get(dag.output_id) if dag.output_id else None,
        },
        "inputs": input_hashes_for_provenance,
        "dag": dag_json,  # Full DAG definition
        "nodes": {
            node_id: {
                "cid": node_hashes.get(node_id),
                "ipfs_cid": node_ipfs_cids.get(node_id),
            }
            for node_id in dag.nodes.keys()
            if node_id in node_hashes
        },
        "execution": {
            "execution_time": result.execution_time,
            "nodes_executed": result.nodes_executed,
            "nodes_cached": result.nodes_cached,
        }
    }

    # Store provenance on IPFS
    import ipfs_client
    provenance_cid = ipfs_client.add_json(provenance)
    if provenance_cid:
        provenance["provenance_cid"] = provenance_cid
        logger.info(f"Stored DAG provenance on IPFS: {provenance_cid}")
    else:
        logger.warning("Failed to store DAG provenance on IPFS")

    # Build result
    return {
        "success": True,
        "run_id": run_id,
        "output_cid": output_cid,
        "output_ipfs_cid": node_ipfs_cids.get(dag.output_id) if dag.output_id else None,
        "output_path": str(result.output_path) if result.output_path else None,
        "execution_time": result.execution_time,
        "nodes_executed": result.nodes_executed,
        "nodes_cached": result.nodes_cached,
        "node_results": {
            node_id: str(path) for node_id, path in result.node_results.items()
        },
        "node_hashes": node_hashes,  # node_id -> cid
        "node_ipfs_cids": node_ipfs_cids,  # node_id -> ipfs_cid
        "provenance_cid": provenance_cid,
    }
|
|
|
|
|
|
@app.task(base=RenderTask, bind=True)
def execute_recipe(self, recipe_sexp: str, input_hashes: Dict[str, str], run_id: Optional[str] = None) -> dict:
    """
    Execute an S-expression recipe.

    The recipe S-expression unfolds into a plan S-expression with code-addressed
    cache IDs computed before execution. Each plan node gets a deterministic hash
    "bucket" based on the computation definition (Merkle tree), not the results.

    Phases:
        1. Parse: compile_string(recipe_sexp) -> CompiledRecipe
        2. Analyze: Extract and run analysis nodes from recipe
        3. Plan: create_plan(compiled, inputs) -> ExecutionPlanSexp with cache IDs
        4. Store: plan.to_string() -> store as S-expression
        5. Execute: Run steps level-by-level, checking cache by cache_id
        6. Return: Include plan_sexp in result

    Args:
        recipe_sexp: Recipe as S-expression string
        input_hashes: Mapping from input name to content hash (CID)
        run_id: Optional run ID for tracking

    Returns:
        Execution result with output CID, plan S-expression, and node results

    Raises:
        ImportError: if the artdag.sexp modules are unavailable
        ValueError: if the recipe fails to parse/compile
    """
    from cache_manager import get_cache_manager
    import ipfs_client

    # Try to import S-expression modules
    try:
        from artdag.sexp import compile_string, CompileError, ParseError
        from artdag.sexp.planner import create_plan, ExecutionPlanSexp, PlanStep
    except ImportError as e:
        raise ImportError(f"S-expression modules not available: {e}")

    cache_manager = get_cache_manager()

    logger.info(f"Executing recipe with {len(input_hashes)} inputs, run_id={run_id}")

    # ============ Phase 1: Parse ============
    self.update_state(state='PARSING', meta={'status': 'parsing recipe'})
    logger.info("Phase 1: Parsing recipe S-expression...")

    try:
        compiled = compile_string(recipe_sexp)
    except (ParseError, CompileError) as e:
        raise ValueError(f"Recipe parse error: {e}")

    recipe_name = compiled.name or "unnamed"
    logger.info(f"Parsed recipe: {recipe_name}")

    # ============ Phase 2: Analysis ============
    self.update_state(state='ANALYZING', meta={'status': 'running analysis'})
    logger.info("Phase 2: Running analysis nodes...")

    analysis_results = {}
    # Extract analysis nodes from compiled recipe
    for node in compiled.nodes:
        node_type = node.get("type", "").upper()
        config = node.get("config", {})

        if node_type == "ANALYZE" or config.get("analyze"):
            node_id = node.get("id")
            input_ref = config.get("input") or config.get("source")
            feature = config.get("feature") or config.get("analyze")

            # Resolve input reference to CID
            cid = input_hashes.get(input_ref)
            if not cid:
                logger.warning(f"Analysis node {node_id}: input '{input_ref}' not in input_hashes")
                continue

            # Get input file path
            input_path = cache_manager.get_by_cid(cid)
            if not input_path:
                logger.warning(f"Analysis node {node_id}: content {cid[:16]}... not in cache")
                continue

            # Run analysis (best effort: failures are logged and skipped,
            # they do not abort the run)
            try:
                from artdag.analysis import Analyzer
                analysis_dir = CACHE_DIR / "analysis"
                analysis_dir.mkdir(parents=True, exist_ok=True)
                analyzer = Analyzer(cache_dir=analysis_dir)

                features = [feature] if feature else ["beats", "energy"]
                result = analyzer.analyze(
                    input_hash=cid,
                    features=features,
                    input_path=Path(input_path),
                )
                # Indexed under both the node id and the content cid so
                # later lookups can use either key.
                analysis_results[node_id] = result
                analysis_results[cid] = result
                logger.info(f"Analysis {node_id}: feature={feature}")
            except Exception as e:
                logger.warning(f"Analysis failed for {node_id}: {e}")

    logger.info(f"Completed {len(analysis_results)} analysis results")

    # ============ Phase 3: Generate Plan ============
    self.update_state(state='PLANNING', meta={'status': 'generating plan'})
    logger.info("Phase 3: Generating execution plan with code-addressed cache IDs...")

    plan = create_plan(compiled, inputs=input_hashes)
    logger.info(f"Generated plan with {len(plan.steps)} steps, plan_id={plan.plan_id[:16]}...")

    # ============ Phase 4: Store Plan as S-expression ============
    plan_sexp = plan.to_string(pretty=True)
    plan_cid = None

    try:
        plan_cid = ipfs_client.add_string(plan_sexp)
        if plan_cid:
            logger.info(f"Stored plan to IPFS: {plan_cid}")
            # Also store locally for fast retrieval
            plan_path = CACHE_DIR / plan_cid
            CACHE_DIR.mkdir(parents=True, exist_ok=True)
            plan_path.write_text(plan_sexp)
    except Exception as e:
        logger.warning(f"Failed to store plan to IPFS: {e}")

    # ============ Phase 5: Execute Steps Level-by-Level ============
    self.update_state(state='EXECUTING', meta={'status': 'executing steps', 'total_steps': len(plan.steps)})
    logger.info("Phase 4: Executing plan steps...")

    # Group steps by level
    steps_by_level: Dict[int, List[PlanStep]] = {}
    for step in plan.steps:
        level = step.level
        steps_by_level.setdefault(level, []).append(step)

    max_level = max(steps_by_level.keys()) if steps_by_level else 0

    step_results = {}  # step_id -> {"status", "path", "cid", "ipfs_cid"}
    cache_id_to_path = {}  # cache_id -> output path (for resolving inputs)
    total_cached = 0
    total_executed = 0

    # Map input names to their cache_ids (inputs are their own cache_ids)
    for name, cid in input_hashes.items():
        cache_id_to_path[cid] = cache_manager.get_by_cid(cid)

    for level in range(max_level + 1):
        level_steps = steps_by_level.get(level, [])
        if not level_steps:
            continue

        logger.info(f"Executing level {level}: {len(level_steps)} steps")

        for step in level_steps:
            self.update_state(
                state='EXECUTING',
                meta={
                    'step_id': step.step_id,
                    'step_type': step.node_type,
                    'level': level,
                    'cache_id': step.cache_id[:16],
                }
            )

            # Check if cached using code-addressed cache_id
            cached_path = cache_manager.get_by_cid(step.cache_id)
            if cached_path and cached_path.exists():
                logger.info(f"Step {step.step_id}: cached at {step.cache_id[:16]}...")
                step_results[step.step_id] = {
                    "status": "cached",
                    "path": str(cached_path),
                    "cache_id": step.cache_id,
                }
                cache_id_to_path[step.cache_id] = cached_path
                total_cached += 1
                continue

            # Execute the step
            try:
                # Resolve input paths from previous step cache_ids
                input_paths = []
                for input_ref in step.inputs:
                    # input_ref is a step_id - find its cache_id and path
                    input_step = next((s for s in plan.steps if s.step_id == input_ref), None)
                    if input_step:
                        input_cache_id = input_step.cache_id
                        input_path = cache_id_to_path.get(input_cache_id)
                        if input_path:
                            input_paths.append(Path(input_path))
                        else:
                            # Check if it's a source input
                            source_cid = step.config.get("cid")
                            if source_cid:
                                input_path = cache_manager.get_by_cid(source_cid)
                                if input_path:
                                    input_paths.append(Path(input_path))
                    else:
                        # Direct CID reference (source node)
                        source_cid = input_hashes.get(input_ref) or step.config.get("cid")
                        if source_cid:
                            input_path = cache_manager.get_by_cid(source_cid)
                            if input_path:
                                input_paths.append(Path(input_path))

                # Handle SOURCE nodes
                if step.node_type == "SOURCE":
                    source_cid = step.config.get("cid")
                    if source_cid:
                        source_path = cache_manager.get_by_cid(source_cid)
                        if source_path:
                            step_results[step.step_id] = {
                                "status": "source",
                                "path": str(source_path),
                                "cache_id": step.cache_id,
                                "cid": source_cid,
                            }
                            cache_id_to_path[step.cache_id] = source_path
                            total_cached += 1
                            continue
                        else:
                            raise ValueError(f"Source content not found: {source_cid}")

                # Get executor for this step type
                executor = get_executor(step.node_type)
                if not executor:
                    # Try effect executor
                    effect_name = step.config.get("effect")
                    if effect_name:
                        executor = get_executor(f"effect:{effect_name}")

                if not executor:
                    raise ValueError(f"No executor for node type: {step.node_type}")

                # Determine output path
                output_dir = CACHE_DIR / "nodes" / step.cache_id
                output_dir.mkdir(parents=True, exist_ok=True)
                output_path = output_dir / "output.mkv"

                # Execute
                logger.info(f"Executing step {step.step_id} ({step.node_type}) with {len(input_paths)} inputs")
                result_path = executor.execute(step.config, input_paths, output_path)

                # Store result in cache under code-addressed cache_id
                cached, content_cid = cache_manager.put(
                    result_path,
                    node_type=step.node_type,
                    node_id=step.cache_id,  # Use cache_id as node_id
                )

                step_results[step.step_id] = {
                    "status": "executed",
                    "path": str(result_path),
                    "cache_id": step.cache_id,
                    "cid": content_cid,
                    "ipfs_cid": content_cid if content_cid.startswith("Qm") or content_cid.startswith("bafy") else None,
                }
                cache_id_to_path[step.cache_id] = result_path
                total_executed += 1

                logger.info(f"Step {step.step_id}: executed -> {content_cid[:16]}...")

            except Exception as e:
                # A failed step aborts the run but returns partial results
                # (everything executed so far, plus the stored plan).
                logger.error(f"Step {step.step_id} failed: {e}")
                return {
                    "success": False,
                    "run_id": run_id,
                    "error": f"Step {step.step_id} failed: {e}",
                    "step_results": step_results,
                    "plan_cid": plan_cid,
                    "plan_sexp": plan_sexp,
                }

    # Get output from final step
    output_step = next((s for s in plan.steps if s.step_id == plan.output_step_id), None)
    output_cid = None
    output_ipfs_cid = None
    output_path = None

    if output_step:
        output_result = step_results.get(output_step.step_id, {})
        output_cid = output_result.get("cid") or output_result.get("cache_id")
        output_ipfs_cid = output_result.get("ipfs_cid")
        output_path = output_result.get("path")

    # ============ Phase 6: Store Results ============
    logger.info("Phase 5: Storing results...")

    # Store in database
    import asyncio
    import database

    async def save_to_db():
        # Lazy DB pool init: workers may not have connected yet.
        if database.pool is None:
            await database.init_db()

        # Get actor_id from pending run
        actor_id = None
        pending = await database.get_pending_run(run_id) if run_id else None
        if pending:
            actor_id = pending.get("actor_id")

        await database.save_run_cache(
            run_id=run_id,
            output_cid=output_cid,
            recipe=recipe_name,
            inputs=list(input_hashes.values()),
            ipfs_cid=output_ipfs_cid,
            actor_id=actor_id,
            plan_cid=plan_cid,
        )

        # Save output as media for user
        if actor_id and output_cid:
            await database.save_item_metadata(
                cid=output_cid,
                actor_id=actor_id,
                item_type="media",
                description=f"Output from recipe: {recipe_name}",
                source_type="recipe",
                source_note=f"run_id: {run_id}",
            )

        # Complete pending run
        if pending and run_id:
            await database.complete_pending_run(run_id)

    # Run the async save whether or not an event loop is already running.
    try:
        loop = asyncio.get_event_loop()
        if loop.is_running():
            asyncio.ensure_future(save_to_db())
        else:
            loop.run_until_complete(save_to_db())
    except RuntimeError:
        asyncio.run(save_to_db())

    # Build and store provenance
    provenance = {
        "task_id": self.request.id,
        "run_id": run_id,
        "rendered_at": datetime.now(timezone.utc).isoformat(),
        "recipe": recipe_name,
        "recipe_sexp": recipe_sexp,
        "plan_sexp": plan_sexp,
        "plan_cid": plan_cid,
        "output": {
            "cid": output_cid,
            "ipfs_cid": output_ipfs_cid,
        },
        "inputs": input_hashes,
        "steps": {
            step_id: {
                "cache_id": result.get("cache_id"),
                "cid": result.get("cid"),
                "status": result.get("status"),
            }
            for step_id, result in step_results.items()
        },
        "execution": {
            "total_steps": len(plan.steps),
            "cached": total_cached,
            "executed": total_executed,
        }
    }

    provenance_cid = ipfs_client.add_json(provenance)
    if provenance_cid:
        logger.info(f"Stored provenance on IPFS: {provenance_cid}")

    logger.info(f"Recipe execution complete: output={output_cid[:16] if output_cid else 'none'}...")

    return {
        "success": True,
        "run_id": run_id,
        "recipe": recipe_name,
        "plan_cid": plan_cid,
        "plan_sexp": plan_sexp,
        "output_cid": output_cid,
        "output_ipfs_cid": output_ipfs_cid,
        "output_path": output_path,
        "total_steps": len(plan.steps),
        "cached": total_cached,
        "executed": total_executed,
        "step_results": step_results,
        "provenance_cid": provenance_cid,
    }
|
|
|
|
|
|
def build_effect_dag(input_hashes: List[str], effect_name: str) -> DAG:
    """
    Build a simple DAG for applying an effect to inputs.

    Args:
        input_hashes: List of input content hashes
        effect_name: Name of effect to apply (e.g., "dog", "identity")

    Returns:
        DAG ready for execution
    """
    dag = DAG()

    # One SOURCE node per input hash.
    source_ids = []
    for index, content_hash in enumerate(input_hashes):
        src = Node(
            node_type=NodeType.SOURCE,
            config={"cid": content_hash},
            name=f"source_{index}",
        )
        dag.add_node(src)
        source_ids.append(src.node_id)

    # A single effect node consumes every source and becomes the DAG output.
    effect = Node(
        node_type=f"effect:{effect_name}",
        config={},
        inputs=source_ids,
        name=f"effect_{effect_name}",
    )
    dag.add_node(effect)
    dag.set_output(effect.node_id)

    return dag
|