feat: distributed rendering with Celery

- celery_app.py: Celery configuration with Redis broker
- tasks.py: render_effect task with full provenance tracking
- render.py: CLI for submitting render jobs
- Successfully renders cat → dog with provenance chain

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
gilesb
2026-01-07 02:04:07 +00:00
commit 500e57b9a4
7 changed files with 342 additions and 0 deletions

6
.gitignore vendored Normal file
View File

@@ -0,0 +1,6 @@
__pycache__/
*.py[cod]
.pytest_cache/
*.egg-info/
.venv/
venv/

72
README.md Normal file
View File

@@ -0,0 +1,72 @@
# Art Celery
Distributed rendering for the Art DAG system using Celery.
## Dependencies
- **artdag** (GitHub): Core DAG execution engine
- **artdag-effects** (rose-ash): Effect implementations
- **Redis**: Message broker and result backend
## Setup
```bash
# Install Redis
sudo apt install redis-server
# Install Python dependencies
pip install -r requirements.txt
# Start a worker
celery -A celery_app worker --loglevel=info
```
## Usage
```bash
# Render cat through dog effect
python render.py dog cat --sync
# Render cat through identity effect
python render.py identity cat --sync
# Submit async (don't wait)
python render.py dog cat
```
## Architecture
```
render.py (CLI)
celery_app.py (Celery broker)
tasks.py (render_effect task)
├── artdag (GitHub) - DAG execution
└── artdag-effects (rose-ash) - Effect implementations
Provenance JSON + Output file
```
## Provenance
Every render produces a provenance record:
```json
{
"task_id": "celery-task-uuid",
"rendered_at": "2026-01-07T...",
"rendered_by": "@giles@artdag.rose-ash.com",
"output": {"name": "...", "content_hash": "..."},
"inputs": [...],
"effects": [...],
"infrastructure": {
"software": {"name": "infra:artdag", "content_hash": "..."},
"hardware": {"name": "infra:giles-hp", "content_hash": "..."}
}
}
```

31
celery_app.py Normal file
View File

@@ -0,0 +1,31 @@
"""
Art DAG Celery Application
Distributed rendering for the Art DAG system.
Uses the foundational artdag language from GitHub.
"""
from celery import Celery
app = Celery(
'art_celery',
broker='redis://localhost:6379/5',
backend='redis://localhost:6379/5',
include=['tasks']
)
app.conf.update(
result_expires=3600,
task_serializer='json',
accept_content=['json', 'pickle'], # pickle needed for internal Celery messages
result_serializer='json',
event_serializer='json',
timezone='UTC',
enable_utc=True,
task_track_started=True,
task_acks_late=True,
worker_prefetch_multiplier=1,
)
if __name__ == '__main__':
app.start()

12
check_redis.py Normal file
View File

@@ -0,0 +1,12 @@
#!/usr/bin/env python3
"""Check Redis connectivity."""
import redis
try:
r = redis.Redis(host='localhost', port=6379, db=0)
r.ping()
print("Redis: OK")
except redis.ConnectionError:
print("Redis: Not running")
print("Start with: sudo systemctl start redis-server")

65
render.py Executable file
View File

@@ -0,0 +1,65 @@
#!/usr/bin/env python3
"""
CLI to submit render tasks to Art DAG Celery.
Usage:
python render.py dog cat # Render cat through dog effect
python render.py identity cat # Render cat through identity effect
python render.py <effect> <input> # General form
"""
import argparse
import json
import sys
from tasks import render_effect
# Known asset hashes
ASSETS = {
"cat": "33268b6e167deaf018cc538de12dbe562612b33e89a749391cef855b320a269b",
}
def main():
parser = argparse.ArgumentParser(description="Submit render task to Art DAG Celery")
parser.add_argument("effect", help="Effect to apply (e.g., dog, identity)")
parser.add_argument("input", help="Input asset name or hash")
parser.add_argument("--output", "-o", help="Output name (default: <effect>-from-<input>)")
parser.add_argument("--sync", "-s", action="store_true", help="Wait for result")
args = parser.parse_args()
# Resolve input to hash
input_hash = ASSETS.get(args.input, args.input)
if len(input_hash) != 64:
print(f"Error: Unknown asset '{args.input}' and not a valid hash")
sys.exit(1)
# Generate output name
output_name = args.output or f"{args.effect}-from-{args.input}-celery"
print(f"Submitting render task:")
print(f" Effect: {args.effect}")
print(f" Input: {args.input} ({input_hash[:16]}...)")
print(f" Output: {output_name}")
# Submit task
task = render_effect.delay(input_hash, args.effect, output_name)
print(f" Task ID: {task.id}")
if args.sync:
print("\nWaiting for result...")
try:
result = task.get(timeout=300)
print("\nRender complete!")
print(json.dumps(result, indent=2))
except Exception as e:
print(f"\nRender failed: {e}")
sys.exit(1)
else:
print("\nTask submitted. Check status with:")
print(f" celery -A celery_app inspect query_task {task.id}")
if __name__ == "__main__":
main()

5
requirements.txt Normal file
View File

@@ -0,0 +1,5 @@
celery[redis]>=5.3.0
redis>=5.0.0
requests>=2.31.0
# Core artdag from GitHub
git+https://github.com/gilesbradshaw/art-dag.git

151
tasks.py Normal file
View File

@@ -0,0 +1,151 @@
"""
Art DAG Celery Tasks
Distributed rendering tasks for the Art DAG system.
"""
import hashlib
import json
import sys
from datetime import datetime, timezone
from pathlib import Path
from celery import Task
from celery_app import app
# Add effects to path
EFFECTS_PATH = Path.home() / "artdag-effects"
sys.path.insert(0, str(EFFECTS_PATH / "dog"))
def file_hash(path: Path) -> str:
"""Compute SHA3-256 hash of a file."""
hasher = hashlib.sha3_256()
actual_path = path.resolve() if path.is_symlink() else path
with open(actual_path, "rb") as f:
for chunk in iter(lambda: f.read(65536), b""):
hasher.update(chunk)
return hasher.hexdigest()
class RenderTask(Task):
"""Base task with provenance tracking."""
def on_success(self, retval, task_id, args, kwargs):
"""Record successful render."""
print(f"Task {task_id} completed: {retval}")
def on_failure(self, exc, task_id, args, kwargs, einfo):
"""Record failed render."""
print(f"Task {task_id} failed: {exc}")
@app.task(base=RenderTask, bind=True)
def render_effect(self, input_hash: str, effect_name: str, output_name: str) -> dict:
"""
Render an effect on an input asset.
Args:
input_hash: SHA3-256 hash of input asset
effect_name: Name of effect (e.g., "dog", "identity")
output_name: Name for output asset
Returns:
Provenance record with output hash
"""
# Registry hashes
REGISTRY = {
"cat": {
"hash": "33268b6e167deaf018cc538de12dbe562612b33e89a749391cef855b320a269b",
"path": Path.home() / "artdag-art" / "cat.jpg"
},
"effect:dog": {
"hash": "d048fe313433eb4e38f0e24194ffae91b896ca3e6eed3e50b2cc37b7be495555"
},
"effect:identity": {
"hash": "640ea11ee881ebf4101af0a955439105ab11e763682b209e88ea08fc66e1cc03"
},
"infra:artdag": {
"hash": "96a5972de216aee12ec794dcad5f9360da2e676171eabf24a46dfe1ee5fee4b0"
},
"infra:giles-hp": {
"hash": "964bf6e69dc4e2493f42375013caffe26404ec3cf8eb5d9bc170cd42a361523b"
}
}
# Find input by hash
input_asset = None
input_name = None
for name, data in REGISTRY.items():
if data.get("hash") == input_hash and "path" in data:
input_asset = data
input_name = name
break
if not input_asset:
raise ValueError(f"Unknown input hash: {input_hash}")
input_path = input_asset["path"]
output_dir = Path.home() / "artdag-art"
# Verify input
actual_hash = file_hash(input_path)
if actual_hash != input_hash:
raise ValueError(f"Input hash mismatch: expected {input_hash}, got {actual_hash}")
self.update_state(state='RENDERING', meta={'effect': effect_name, 'input': input_name})
# Load and apply effect
if effect_name == "dog":
from effect import effect_dog, DOG_HASH
output_path = output_dir / f"{output_name}.mkv"
result = effect_dog(input_path, output_path, {})
expected_hash = DOG_HASH
elif effect_name == "identity":
from artdag.nodes.effect import effect_identity
output_path = output_dir / f"{output_name}{input_path.suffix}"
result = effect_identity(input_path, output_path, {})
expected_hash = input_hash
else:
raise ValueError(f"Unknown effect: {effect_name}")
# Verify output
output_hash = file_hash(result)
if output_hash != expected_hash:
raise ValueError(f"Output hash mismatch: expected {expected_hash}, got {output_hash}")
# Build provenance
provenance = {
"task_id": self.request.id,
"rendered_at": datetime.now(timezone.utc).isoformat(),
"rendered_by": "@giles@artdag.rose-ash.com",
"output": {
"name": output_name,
"content_hash": output_hash,
"local_path": str(result)
},
"inputs": [
{"name": input_name, "content_hash": input_hash}
],
"effects": [
{"name": f"effect:{effect_name}", "content_hash": REGISTRY[f"effect:{effect_name}"]["hash"]}
],
"infrastructure": {
"software": {"name": "infra:artdag", "content_hash": REGISTRY["infra:artdag"]["hash"]},
"hardware": {"name": "infra:giles-hp", "content_hash": REGISTRY["infra:giles-hp"]["hash"]}
}
}
# Save provenance
provenance_path = result.with_suffix(".provenance.json")
with open(provenance_path, "w") as f:
json.dump(provenance, f, indent=2)
return provenance
@app.task
def render_dog_from_cat() -> dict:
"""Convenience task: render cat through dog effect."""
CAT_HASH = "33268b6e167deaf018cc538de12dbe562612b33e89a749391cef855b320a269b"
return render_effect.delay(CAT_HASH, "dog", "dog-from-cat-celery").get()