""" Art DAG Celery Tasks Distributed rendering tasks for the Art DAG system. """ import hashlib import json import os import subprocess import sys from datetime import datetime, timezone from pathlib import Path from celery import Task from celery_app import app # Add effects to path (use env var in Docker, fallback to home dir locally) EFFECTS_PATH = Path(os.environ.get("EFFECTS_PATH", str(Path.home() / "artdag-effects"))) ARTDAG_PATH = Path(os.environ.get("ARTDAG_PATH", str(Path.home() / "art" / "artdag"))) def get_effects_commit() -> str: """Get current git commit hash of effects repo.""" try: result = subprocess.run( ["git", "rev-parse", "HEAD"], cwd=EFFECTS_PATH, capture_output=True, text=True ) if result.returncode == 0: return result.stdout.strip() except Exception: pass return "unknown" def get_artdag_commit() -> str: """Get current git commit hash of artdag repo.""" try: result = subprocess.run( ["git", "rev-parse", "HEAD"], cwd=ARTDAG_PATH, capture_output=True, text=True ) if result.returncode == 0: return result.stdout.strip() except Exception: pass return "unknown" sys.path.insert(0, str(EFFECTS_PATH / "dog")) def file_hash(path: Path) -> str: """Compute SHA3-256 hash of a file.""" hasher = hashlib.sha3_256() actual_path = path.resolve() if path.is_symlink() else path with open(actual_path, "rb") as f: for chunk in iter(lambda: f.read(65536), b""): hasher.update(chunk) return hasher.hexdigest() class RenderTask(Task): """Base task with provenance tracking.""" def on_success(self, retval, task_id, args, kwargs): """Record successful render.""" print(f"Task {task_id} completed: {retval}") def on_failure(self, exc, task_id, args, kwargs, einfo): """Record failed render.""" print(f"Task {task_id} failed: {exc}") @app.task(base=RenderTask, bind=True) def render_effect(self, input_hash: str, effect_name: str, output_name: str) -> dict: """ Render an effect on an input asset. Args: input_hash: SHA3-256 hash of input asset effect_name: Name of effect (e.g., "dog", "identity") output_name: Name for output asset Returns: Provenance record with output hash """ # Cache directory (shared between server and worker) CACHE_DIR = Path(os.environ.get("CACHE_DIR", str(Path.home() / ".artdag" / "cache"))) # Registry hashes (for effects/infra metadata only) REGISTRY = { "effect:dog": { "hash": "d048fe313433eb4e38f0e24194ffae91b896ca3e6eed3e50b2cc37b7be495555" }, "effect:identity": { "hash": "640ea11ee881ebf4101af0a955439105ab11e763682b209e88ea08fc66e1cc03" }, "infra:artdag": { "hash": "96a5972de216aee12ec794dcad5f9360da2e676171eabf24a46dfe1ee5fee4b0" }, "infra:giles-hp": { "hash": "964bf6e69dc4e2493f42375013caffe26404ec3cf8eb5d9bc170cd42a361523b" } } # Input comes from cache by hash input_path = CACHE_DIR / input_hash if not input_path.exists(): raise ValueError(f"Input not in cache: {input_hash}") output_dir = CACHE_DIR # Verify input actual_hash = file_hash(input_path) if actual_hash != input_hash: raise ValueError(f"Input hash mismatch: expected {input_hash}, got {actual_hash}") self.update_state(state='RENDERING', meta={'effect': effect_name, 'input': input_hash[:16]}) # Load and apply effect if effect_name == "dog": from effect import effect_dog, DOG_HASH output_path = output_dir / f"{output_name}.mkv" result = effect_dog(input_path, output_path, {}) expected_hash = DOG_HASH elif effect_name == "identity": from artdag.nodes.effect import effect_identity output_path = output_dir / f"{output_name}{input_path.suffix}" result = effect_identity(input_path, output_path, {}) expected_hash = input_hash else: raise ValueError(f"Unknown effect: {effect_name}") # Verify output output_hash = file_hash(result) if output_hash != expected_hash: raise ValueError(f"Output hash mismatch: expected {expected_hash}, got {output_hash}") # Build effect info based on source if effect_name == "identity": # Identity is from artdag package on GitHub artdag_commit = get_artdag_commit() effect_info = { "name": f"effect:{effect_name}", "content_hash": REGISTRY[f"effect:{effect_name}"]["hash"], "repo": "github", "repo_commit": artdag_commit, "repo_url": f"https://github.com/gilesbradshaw/art-dag/blob/{artdag_commit}/artdag/nodes/effect.py" } else: # Other effects from rose-ash effects repo effects_commit = get_effects_commit() effect_info = { "name": f"effect:{effect_name}", "content_hash": REGISTRY[f"effect:{effect_name}"]["hash"], "repo": "rose-ash", "repo_commit": effects_commit, "repo_url": f"https://git.rose-ash.com/art-dag/effects/src/commit/{effects_commit}/{effect_name}" } # Build provenance provenance = { "task_id": self.request.id, "rendered_at": datetime.now(timezone.utc).isoformat(), "rendered_by": "@giles@artdag.rose-ash.com", "output": { "name": output_name, "content_hash": output_hash, "local_path": str(result) }, "inputs": [ {"content_hash": input_hash} ], "effects": [effect_info], "infrastructure": { "software": {"name": "infra:artdag", "content_hash": REGISTRY["infra:artdag"]["hash"]}, "hardware": {"name": "infra:giles-hp", "content_hash": REGISTRY["infra:giles-hp"]["hash"]} } } # Save provenance provenance_path = result.with_suffix(".provenance.json") with open(provenance_path, "w") as f: json.dump(provenance, f, indent=2) return provenance @app.task def render_dog_from_cat() -> dict: """Convenience task: render cat through dog effect.""" CAT_HASH = "33268b6e167deaf018cc538de12dbe562612b33e89a749391cef855b320a269b" return render_effect.delay(CAT_HASH, "dog", "dog-from-cat-celery").get()