- Identity effect is part of artdag package on GitHub - Other effects (dog) link to rose-ash effects repo with commit hash - Added effect_url field to RunStatus for proper URL storage - Extract repo_url from provenance instead of building it 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
181 lines
5.8 KiB
Python
181 lines
5.8 KiB
Python
"""
|
|
Art DAG Celery Tasks
|
|
|
|
Distributed rendering tasks for the Art DAG system.
|
|
"""
|
|
|
|
import hashlib
|
|
import json
|
|
import os
|
|
import subprocess
|
|
import sys
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
|
|
from celery import Task
|
|
from celery_app import app
|
|
|
|
# Directory holding the effects checkout: taken from the EFFECTS_PATH
# environment variable (used in Docker), falling back to ~/artdag-effects
# when running locally.
EFFECTS_PATH = Path(os.environ.get("EFFECTS_PATH", str(Path.home() / "artdag-effects")))
|
|
|
|
|
|
def get_effects_commit() -> str:
    """Return the current git commit hash of the effects repository.

    Runs ``git rev-parse HEAD`` with ``EFFECTS_PATH`` as the working
    directory. The lookup is best-effort: an unavailable repository or a
    missing ``git`` binary yields a placeholder instead of an exception.

    Returns:
        The stripped stdout of the git command when it exits with status 0,
        otherwise the literal string ``"unknown"``.
    """
    try:
        result = subprocess.run(
            ["git", "rev-parse", "HEAD"],
            cwd=EFFECTS_PATH,
            capture_output=True,
            text=True,
        )
    except (OSError, subprocess.SubprocessError, UnicodeDecodeError):
        # Only the failures a git subprocess can actually produce are treated
        # as "no commit available": git not installed, EFFECTS_PATH missing or
        # unreadable, or undecodable output. Anything else is a programming
        # error and is allowed to surface.
        return "unknown"

    if result.returncode != 0:
        # Not a git repository (or no commits yet).
        return "unknown"
    return result.stdout.strip()
|
|
|
|
|
|
# Put the "dog" effect directory first on the import path. render_effect later
# does `from effect import effect_dog, DOG_HASH`, which is presumably resolved
# from this directory -- TODO confirm against the effects repo layout.
sys.path.insert(0, str(EFFECTS_PATH / "dog"))
|
|
|
|
|
|
def file_hash(path: Path) -> str:
    """Return the hex-encoded SHA3-256 digest of the file at *path*.

    A symlink is resolved to its target before the file is opened. The
    content is read in 64 KiB pieces, so the whole file is never held in
    memory at once.
    """
    if path.is_symlink():
        target = path.resolve()
    else:
        target = path

    digest = hashlib.sha3_256()
    with open(target, "rb") as stream:
        while True:
            piece = stream.read(65536)
            if not piece:
                break
            digest.update(piece)
    return digest.hexdigest()
|
|
|
|
|
|
class RenderTask(Task):
    """Celery task base class that reports the outcome of each render."""

    def on_success(self, retval, task_id, args, kwargs):
        """Print the task id and its return value after a successful run."""
        message = f"Task {task_id} completed: {retval}"
        print(message)

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        """Print the task id and the raised exception after a failed run."""
        message = f"Task {task_id} failed: {exc}"
        print(message)
|
|
|
|
|
|
@app.task(base=RenderTask, bind=True)
def render_effect(self, input_hash: str, effect_name: str, output_name: str) -> dict:
    """
    Apply a named effect to a cached input asset and record its provenance.

    Args:
        input_hash: SHA3-256 hash of the input asset; also its file name
            inside the cache directory
        effect_name: Effect to run, either "dog" or "identity"
        output_name: Base name (without extension) for the output asset

    Returns:
        The provenance record, which is also written as JSON next to the
        rendered output

    Raises:
        ValueError: if the input is not in the cache, the input or output
            hash does not match what is expected, or the effect is unknown
    """
    # Cache shared between server and worker; outputs are written here too.
    cache_dir = Path(os.environ.get("CACHE_DIR", str(Path.home() / ".artdag" / "cache")))

    # Content hashes of the known effects and infrastructure (metadata only).
    registry = {
        "effect:dog": {
            "hash": "d048fe313433eb4e38f0e24194ffae91b896ca3e6eed3e50b2cc37b7be495555",
        },
        "effect:identity": {
            "hash": "640ea11ee881ebf4101af0a955439105ab11e763682b209e88ea08fc66e1cc03",
        },
        "infra:artdag": {
            "hash": "96a5972de216aee12ec794dcad5f9360da2e676171eabf24a46dfe1ee5fee4b0",
        },
        "infra:giles-hp": {
            "hash": "964bf6e69dc4e2493f42375013caffe26404ec3cf8eb5d9bc170cd42a361523b",
        },
    }

    # The input is looked up in the cache by its hash.
    source_path = cache_dir / input_hash
    if not source_path.exists():
        raise ValueError(f"Input not in cache: {input_hash}")

    # Confirm the cached bytes really are the asset that was requested.
    source_digest = file_hash(source_path)
    if source_digest != input_hash:
        raise ValueError(f"Input hash mismatch: expected {input_hash}, got {source_digest}")

    self.update_state(state="RENDERING", meta={"effect": effect_name, "input": input_hash[:16]})

    # Effect modules are imported lazily, only for the effect actually used.
    if effect_name == "dog":
        from effect import effect_dog, DOG_HASH

        destination = cache_dir / f"{output_name}.mkv"
        result = effect_dog(source_path, destination, {})
        expected_hash = DOG_HASH
    elif effect_name == "identity":
        from artdag.nodes.effect import effect_identity

        destination = cache_dir / f"{output_name}{source_path.suffix}"
        result = effect_identity(source_path, destination, {})
        # Identity must reproduce the input byte-for-byte.
        expected_hash = input_hash
    else:
        raise ValueError(f"Unknown effect: {effect_name}")

    # Verify the rendered output against the hash the effect is expected to give.
    output_hash = file_hash(result)
    if output_hash != expected_hash:
        raise ValueError(f"Output hash mismatch: expected {expected_hash}, got {output_hash}")

    # Describe where the effect came from. Keys are added in a fixed order
    # because the record is serialised to JSON as-is.
    effect_key = f"effect:{effect_name}"
    effect_info = {
        "name": effect_key,
        "content_hash": registry[effect_key]["hash"],
    }
    if effect_name == "identity":
        # Identity ships with the artdag package on GitHub.
        effect_info["repo"] = "github"
        effect_info["repo_url"] = "https://github.com/gilesbradshaw/art-dag/blob/main/artdag/nodes/effect.py"
    else:
        # Every other effect lives in the rose-ash effects repo; pin the commit.
        commit = get_effects_commit()
        effect_info["repo"] = "rose-ash"
        effect_info["repo_commit"] = commit
        effect_info["repo_url"] = f"https://git.rose-ash.com/art-dag/effects/src/commit/{commit}/{effect_name}"

    output_record = {
        "name": output_name,
        "content_hash": output_hash,
        "local_path": str(result),
    }
    infrastructure = {
        "software": {"name": "infra:artdag", "content_hash": registry["infra:artdag"]["hash"]},
        "hardware": {"name": "infra:giles-hp", "content_hash": registry["infra:giles-hp"]["hash"]},
    }
    provenance = {
        "task_id": self.request.id,
        "rendered_at": datetime.now(timezone.utc).isoformat(),
        "rendered_by": "@giles@artdag.rose-ash.com",
        "output": output_record,
        "inputs": [{"content_hash": input_hash}],
        "effects": [effect_info],
        "infrastructure": infrastructure,
    }

    # Persist the record alongside the output file.
    sidecar_path = result.with_suffix(".provenance.json")
    with open(sidecar_path, "w") as handle:
        handle.write(json.dumps(provenance, indent=2))

    return provenance
|
|
|
|
|
|
@app.task
def render_dog_from_cat() -> dict:
    """Convenience task: render the cat asset through the dog effect.

    The render is executed in-process with ``render_effect.apply`` rather
    than queued with ``delay(...).get()``. Waiting on a queued subtask from
    inside a task ties up a worker slot while waiting for another one, which
    can deadlock the pool; recent Celery versions refuse it by default with a
    RuntimeError.

    Returns:
        The provenance record produced by ``render_effect``.

    Raises:
        Any exception raised by ``render_effect`` is re-raised here.
    """
    CAT_HASH = "33268b6e167deaf018cc538de12dbe562612b33e89a749391cef855b320a269b"
    outcome = render_effect.apply(args=(CAT_HASH, "dog", "dog-from-cat-celery"))
    # The eager result is already complete, so get() cannot block; the flag
    # only switches off the "never call get() within a task" guard.
    return outcome.get(disable_sync_subtasks=False)
|