""" Art DAG Celery Tasks Distributed rendering tasks for the Art DAG system. Supports both single-effect runs and multi-step DAG execution. """ import hashlib import json import logging import os import subprocess import sys from datetime import datetime, timezone from pathlib import Path from typing import Dict, List, Optional from celery import Task from celery_app import app # Import artdag components from artdag import DAG, Node, NodeType from artdag.engine import Engine from artdag.executor import register_executor, Executor, get_executor # Add effects to path (use env var in Docker, fallback to home dir locally) EFFECTS_PATH = Path(os.environ.get("EFFECTS_PATH", str(Path.home() / "artdag-effects"))) ARTDAG_PATH = Path(os.environ.get("ARTDAG_PATH", str(Path.home() / "art" / "artdag"))) logger = logging.getLogger(__name__) def get_effects_commit() -> str: """Get current git commit hash of effects repo.""" try: result = subprocess.run( ["git", "rev-parse", "HEAD"], cwd=EFFECTS_PATH, capture_output=True, text=True ) if result.returncode == 0: return result.stdout.strip() except Exception: pass return "unknown" def get_artdag_commit() -> str: """Get current git commit hash of artdag repo.""" try: result = subprocess.run( ["git", "rev-parse", "HEAD"], cwd=ARTDAG_PATH, capture_output=True, text=True ) if result.returncode == 0: return result.stdout.strip() except Exception: pass return "unknown" sys.path.insert(0, str(EFFECTS_PATH / "dog")) def file_hash(path: Path) -> str: """Compute SHA3-256 hash of a file.""" hasher = hashlib.sha3_256() actual_path = path.resolve() if path.is_symlink() else path with open(actual_path, "rb") as f: for chunk in iter(lambda: f.read(65536), b""): hasher.update(chunk) return hasher.hexdigest() # Cache directory (shared between server and worker) CACHE_DIR = Path(os.environ.get("CACHE_DIR", str(Path.home() / ".artdag" / "cache"))) # ============ Executors for Effects ============ @register_executor("effect:dog") class DogExecutor(Executor): """Executor for the dog effect.""" def execute(self, config: Dict, inputs: List[Path], output_path: Path) -> Path: from effect import effect_dog if len(inputs) != 1: raise ValueError(f"Dog effect expects 1 input, got {len(inputs)}") return effect_dog(inputs[0], output_path, config) @register_executor("effect:identity") class IdentityExecutor(Executor): """Executor for the identity effect (passthrough).""" def execute(self, config: Dict, inputs: List[Path], output_path: Path) -> Path: from artdag.nodes.effect import effect_identity if len(inputs) != 1: raise ValueError(f"Identity effect expects 1 input, got {len(inputs)}") return effect_identity(inputs[0], output_path, config) @register_executor(NodeType.SOURCE) class SourceExecutor(Executor): """Executor for SOURCE nodes - loads content from cache by hash.""" def execute(self, config: Dict, inputs: List[Path], output_path: Path) -> Path: # Source nodes load from cache by content_hash content_hash = config.get("content_hash") if not content_hash: raise ValueError("SOURCE node requires content_hash in config") # Look up in cache source_path = CACHE_DIR / content_hash if not source_path.exists(): # Try nodes directory from cache_manager import get_cache_manager cache_manager = get_cache_manager() source_path = cache_manager.get_by_content_hash(content_hash) if not source_path or not source_path.exists(): raise ValueError(f"Source content not in cache: {content_hash}") # For source nodes, we just return the path (no transformation) # The engine will use this as input to subsequent nodes return source_path class RenderTask(Task): """Base task with provenance tracking.""" def on_success(self, retval, task_id, args, kwargs): """Record successful render.""" print(f"Task {task_id} completed: {retval}") def on_failure(self, exc, task_id, args, kwargs, einfo): """Record failed render.""" print(f"Task {task_id} failed: {exc}") @app.task(base=RenderTask, bind=True) def render_effect(self, input_hash: str, effect_name: str, output_name: str) -> dict: """ Render an effect on an input asset. Args: input_hash: SHA3-256 hash of input asset effect_name: Name of effect (e.g., "dog", "identity") output_name: Name for output asset Returns: Provenance record with output hash """ from cache_manager import get_cache_manager # Registry hashes (for effects/infra metadata only) REGISTRY = { "effect:dog": { "hash": "d048fe313433eb4e38f0e24194ffae91b896ca3e6eed3e50b2cc37b7be495555" }, "effect:identity": { "hash": "640ea11ee881ebf4101af0a955439105ab11e763682b209e88ea08fc66e1cc03" }, "infra:artdag": { "hash": "96a5972de216aee12ec794dcad5f9360da2e676171eabf24a46dfe1ee5fee4b0" }, "infra:giles-hp": { "hash": "964bf6e69dc4e2493f42375013caffe26404ec3cf8eb5d9bc170cd42a361523b" } } # Input comes from cache by hash (supports both legacy and new cache locations) cache_manager = get_cache_manager() input_path = cache_manager.get_by_content_hash(input_hash) if not input_path or not input_path.exists(): raise ValueError(f"Input not in cache: {input_hash}") output_dir = CACHE_DIR # Verify input actual_hash = file_hash(input_path) if actual_hash != input_hash: raise ValueError(f"Input hash mismatch: expected {input_hash}, got {actual_hash}") self.update_state(state='RENDERING', meta={'effect': effect_name, 'input': input_hash[:16]}) # Load and apply effect if effect_name == "dog": from effect import effect_dog, DOG_HASH output_path = output_dir / f"{output_name}.mkv" result = effect_dog(input_path, output_path, {}) expected_hash = DOG_HASH elif effect_name == "identity": from artdag.nodes.effect import effect_identity output_path = output_dir / f"{output_name}{input_path.suffix}" result = effect_identity(input_path, output_path, {}) expected_hash = input_hash else: raise ValueError(f"Unknown effect: {effect_name}") # Verify output output_hash = file_hash(result) if output_hash != expected_hash: raise ValueError(f"Output hash mismatch: expected {expected_hash}, got {output_hash}") # Build effect info based on source if effect_name == "identity": # Identity is from artdag package on GitHub artdag_commit = get_artdag_commit() effect_info = { "name": f"effect:{effect_name}", "content_hash": REGISTRY[f"effect:{effect_name}"]["hash"], "repo": "github", "repo_commit": artdag_commit, "repo_url": f"https://github.com/gilesbradshaw/art-dag/blob/{artdag_commit}/artdag/nodes/effect.py" } else: # Other effects from rose-ash effects repo effects_commit = get_effects_commit() effect_info = { "name": f"effect:{effect_name}", "content_hash": REGISTRY[f"effect:{effect_name}"]["hash"], "repo": "rose-ash", "repo_commit": effects_commit, "repo_url": f"https://git.rose-ash.com/art-dag/effects/src/commit/{effects_commit}/{effect_name}" } # Build provenance provenance = { "task_id": self.request.id, "rendered_at": datetime.now(timezone.utc).isoformat(), "rendered_by": "@giles@artdag.rose-ash.com", "output": { "name": output_name, "content_hash": output_hash, "local_path": str(result) }, "inputs": [ {"content_hash": input_hash} ], "effects": [effect_info], "infrastructure": { "software": {"name": "infra:artdag", "content_hash": REGISTRY["infra:artdag"]["hash"]}, "hardware": {"name": "infra:giles-hp", "content_hash": REGISTRY["infra:giles-hp"]["hash"]} } } # Save provenance provenance_path = result.with_suffix(".provenance.json") with open(provenance_path, "w") as f: json.dump(provenance, f, indent=2) return provenance @app.task def render_dog_from_cat() -> dict: """Convenience task: render cat through dog effect.""" CAT_HASH = "33268b6e167deaf018cc538de12dbe562612b33e89a749391cef855b320a269b" return render_effect.delay(CAT_HASH, "dog", "dog-from-cat-celery").get() @app.task(base=RenderTask, bind=True) def execute_dag(self, dag_json: str, run_id: str = None) -> dict: """ Execute a multi-step DAG. Args: dag_json: Serialized DAG as JSON string run_id: Optional run ID for tracking Returns: Execution result with output hash and node results """ from cache_manager import get_cache_manager # Parse DAG try: dag = DAG.from_json(dag_json) except Exception as e: raise ValueError(f"Invalid DAG JSON: {e}") # Validate DAG errors = dag.validate() if errors: raise ValueError(f"Invalid DAG: {errors}") # Create engine with cache directory engine = Engine(CACHE_DIR / "nodes") # Set up progress callback def progress_callback(progress): self.update_state( state='EXECUTING', meta={ 'node_id': progress.node_id, 'node_type': progress.node_type, 'status': progress.status, 'progress': progress.progress, 'message': progress.message, } ) logger.info(f"DAG progress: {progress.node_id} - {progress.status} - {progress.message}") engine.set_progress_callback(progress_callback) # Execute DAG self.update_state(state='EXECUTING', meta={'status': 'starting', 'nodes': len(dag.nodes)}) result = engine.execute(dag) if not result.success: raise RuntimeError(f"DAG execution failed: {result.error}") # Get output hash cache_manager = get_cache_manager() output_hash = None if result.output_path and result.output_path.exists(): output_hash = file_hash(result.output_path) # Store in cache_manager for proper tracking cached = cache_manager.put(result.output_path, node_type="dag_output") # Record activity for deletion tracking input_hashes = [] for node_id, node in dag.nodes.items(): if node.node_type == NodeType.SOURCE or str(node.node_type) == "SOURCE": content_hash = node.config.get("content_hash") if content_hash: input_hashes.append(content_hash) if input_hashes: cache_manager.record_simple_activity( input_hashes=input_hashes, output_hash=output_hash, run_id=run_id, ) # Build result return { "success": True, "run_id": run_id, "output_hash": output_hash, "output_path": str(result.output_path) if result.output_path else None, "execution_time": result.execution_time, "nodes_executed": result.nodes_executed, "nodes_cached": result.nodes_cached, "node_results": { node_id: str(path) for node_id, path in result.node_results.items() }, } def build_effect_dag(input_hashes: List[str], effect_name: str) -> DAG: """ Build a simple DAG for applying an effect to inputs. Args: input_hashes: List of input content hashes effect_name: Name of effect to apply (e.g., "dog", "identity") Returns: DAG ready for execution """ dag = DAG() # Add source nodes for each input source_ids = [] for i, content_hash in enumerate(input_hashes): source_node = Node( node_type=NodeType.SOURCE, config={"content_hash": content_hash}, name=f"source_{i}", ) dag.add_node(source_node) source_ids.append(source_node.node_id) # Add effect node effect_node = Node( node_type=f"effect:{effect_name}", config={}, inputs=source_ids, name=f"effect_{effect_name}", ) dag.add_node(effect_node) dag.set_output(effect_node.node_id) return dag