Files
celery/tasks.py

144 lines
4.7 KiB
Python

"""
Art DAG Celery Tasks
Distributed rendering tasks for the Art DAG system.
"""
import hashlib
import json
import os
import sys
from datetime import datetime, timezone
from pathlib import Path
from celery import Task
from celery_app import app
# Location of the effects checkout: taken from the EFFECTS_PATH environment
# variable when set (e.g. in Docker), otherwise ~/artdag-effects.
EFFECTS_PATH = Path(os.getenv("EFFECTS_PATH", str(Path.home() / "artdag-effects")))
# Put the "dog" effect directory first on sys.path so its modules can be imported.
sys.path.insert(0, str(EFFECTS_PATH.joinpath("dog")))
def file_hash(path: Path) -> str:
    """Return the hexadecimal SHA3-256 digest of the file at ``path``.

    A symlink is resolved to its target before the file is opened. The
    content is read in 64 KiB pieces, so the whole file is never held in
    memory at once.
    """
    target = path
    if path.is_symlink():
        target = path.resolve()

    digest = hashlib.sha3_256()
    with open(target, "rb") as stream:
        while True:
            piece = stream.read(65536)
            if not piece:
                break
            digest.update(piece)
    return digest.hexdigest()
class RenderTask(Task):
    """Base task for renders that reports each task's outcome on stdout."""

    def on_success(self, retval, task_id, args, kwargs):
        """Print the task id together with the value the task returned."""
        message = f"Task {task_id} completed: {retval}"
        print(message)

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        """Print the task id together with the exception that made it fail."""
        message = f"Task {task_id} failed: {exc}"
        print(message)
@app.task(base=RenderTask, bind=True)
def render_effect(self, input_hash: str, effect_name: str, output_name: str) -> dict:
    """
    Apply a named effect to a cached asset and record how the result was made.

    The input is looked up in the cache directory under its hash and re-hashed
    to confirm its content. The selected effect is then run, its result is
    hashed and compared with the hash expected for that effect, and a
    provenance record is written next to the result (the result's last suffix
    replaced by ".provenance.json") and returned.

    Args:
        input_hash: SHA3-256 hash of input asset; also its file name in the cache
        effect_name: Name of effect ("dog" or "identity")
        output_name: Name for output asset; used as the output file stem in the cache

    Returns:
        Provenance record with output hash

    Raises:
        ValueError: the input is not in the cache, the input or output hash
            does not match, or the effect name is unknown.
    """
    # Cache directory (shared between server and worker); outputs are written here too.
    cache_root = Path(os.environ.get("CACHE_DIR", str(Path.home() / ".artdag" / "cache")))

    # Hashes quoted in the provenance record for the effects and infrastructure
    # (metadata only; nothing in this function is verified against them).
    registry = {
        "effect:dog": {
            "hash": "d048fe313433eb4e38f0e24194ffae91b896ca3e6eed3e50b2cc37b7be495555"
        },
        "effect:identity": {
            "hash": "640ea11ee881ebf4101af0a955439105ab11e763682b209e88ea08fc66e1cc03"
        },
        "infra:artdag": {
            "hash": "96a5972de216aee12ec794dcad5f9360da2e676171eabf24a46dfe1ee5fee4b0"
        },
        "infra:giles-hp": {
            "hash": "964bf6e69dc4e2493f42375013caffe26404ec3cf8eb5d9bc170cd42a361523b"
        },
    }

    # The cache stores each input under its own hash.
    source = cache_root / input_hash
    if not source.exists():
        raise ValueError(f"Input not in cache: {input_hash}")

    # Re-hash the cached file so an entry whose content does not match its name is rejected.
    measured = file_hash(source)
    if measured != input_hash:
        raise ValueError(f"Input hash mismatch: expected {input_hash}, got {measured}")

    progress = {'effect': effect_name, 'input': input_hash[:16]}
    self.update_state(state='RENDERING', meta=progress)

    if effect_name not in ("dog", "identity"):
        raise ValueError(f"Unknown effect: {effect_name}")

    # Effect modules are imported lazily so only the selected one is loaded.
    if effect_name == "dog":
        from effect import effect_dog, DOG_HASH
        destination = cache_root / f"{output_name}.mkv"
        rendered = effect_dog(source, destination, {})
        wanted_hash = DOG_HASH
    else:
        from artdag.nodes.effect import effect_identity
        destination = cache_root / f"{output_name}{source.suffix}"
        rendered = effect_identity(source, destination, {})
        # Identity must reproduce the input, so its output hash equals the input hash.
        wanted_hash = input_hash

    # Verify output
    output_hash = file_hash(rendered)
    if output_hash != wanted_hash:
        raise ValueError(f"Output hash mismatch: expected {wanted_hash}, got {output_hash}")

    # Build provenance
    effect_key = f"effect:{effect_name}"
    output_entry = {
        "name": output_name,
        "content_hash": output_hash,
        "local_path": str(rendered),
    }
    infrastructure = {
        "software": {"name": "infra:artdag", "content_hash": registry["infra:artdag"]["hash"]},
        "hardware": {"name": "infra:giles-hp", "content_hash": registry["infra:giles-hp"]["hash"]},
    }
    record = {
        "task_id": self.request.id,
        "rendered_at": datetime.now(timezone.utc).isoformat(),
        "rendered_by": "@giles@artdag.rose-ash.com",
        "output": output_entry,
        "inputs": [{"content_hash": input_hash}],
        "effects": [{"name": effect_key, "content_hash": registry[effect_key]["hash"]}],
        "infrastructure": infrastructure,
    }

    # Save provenance beside the rendered file.
    sidecar = rendered.with_suffix(".provenance.json")
    with open(sidecar, "w") as handle:
        json.dump(record, handle, indent=2)
    return record
@app.task
def render_dog_from_cat() -> dict:
    """Convenience task: render cat through dog effect.

    The cat asset is identified by its hard-coded content hash and the output
    is named "dog-from-cat-celery".

    Returns:
        The provenance record returned by ``render_effect``.

    Raises:
        Whatever ``render_effect`` raises (e.g. ValueError when the cat asset
        is missing from the cache); the exception is re-raised by ``get()``.
    """
    CAT_HASH = "33268b6e167deaf018cc538de12dbe562612b33e89a749391cef855b320a269b"
    # Run render_effect inline in this process instead of ``.delay(...).get()``.
    # Blocking on a queued subtask from inside a task holds this worker slot
    # while waiting for another one, which can deadlock the pool; current
    # Celery versions raise RuntimeError from get() in that situation.
    outcome = render_effect.apply(args=(CAT_HASH, "dog", "dog-from-cat-celery"))
    return outcome.get()