commit 500e57b9a4f510da0271d7795cd77ce02d1d7985 Author: gilesb Date: Wed Jan 7 02:04:07 2026 +0000 feat: distributed rendering with Celery - celery_app.py: Celery configuration with Redis broker - tasks.py: render_effect task with full provenance tracking - render.py: CLI for submitting render jobs - Successfully renders cat → dog with provenance chain 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..8a4864c --- /dev/null +++ b/.gitignore @@ -0,0 +1,6 @@ +__pycache__/ +*.py[cod] +.pytest_cache/ +*.egg-info/ +.venv/ +venv/ diff --git a/README.md b/README.md new file mode 100644 index 0000000..28bea9d --- /dev/null +++ b/README.md @@ -0,0 +1,72 @@ +# Art Celery + +Distributed rendering for the Art DAG system using Celery. + +## Dependencies + +- **artdag** (GitHub): Core DAG execution engine +- **artdag-effects** (rose-ash): Effect implementations +- **Redis**: Message broker and result backend + +## Setup + +```bash +# Install Redis +sudo apt install redis-server + +# Install Python dependencies +pip install -r requirements.txt + +# Start a worker +celery -A celery_app worker --loglevel=info +``` + +## Usage + +```bash +# Render cat through dog effect +python render.py dog cat --sync + +# Render cat through identity effect +python render.py identity cat --sync + +# Submit async (don't wait) +python render.py dog cat +``` + +## Architecture + +``` +render.py (CLI) + │ + ▼ +celery_app.py (Celery broker) + │ + ▼ +tasks.py (render_effect task) + │ + ├── artdag (GitHub) - DAG execution + └── artdag-effects (rose-ash) - Effect implementations + │ + ▼ + Provenance JSON + Output file +``` + +## Provenance + +Every render produces a provenance record: + +```json +{ + "task_id": "celery-task-uuid", + "rendered_at": "2026-01-07T...", + "rendered_by": "@giles@artdag.rose-ash.com", + "output": {"name": "...", "content_hash": "..."}, + "inputs": [...], + "effects": [...], + "infrastructure": { + "software": {"name": "infra:artdag", "content_hash": "..."}, + "hardware": {"name": "infra:giles-hp", "content_hash": "..."} + } +} +``` diff --git a/celery_app.py b/celery_app.py new file mode 100644 index 0000000..4f7aaf6 --- /dev/null +++ b/celery_app.py @@ -0,0 +1,31 @@ +""" +Art DAG Celery Application + +Distributed rendering for the Art DAG system. +Uses the foundational artdag language from GitHub. +""" + +from celery import Celery + +app = Celery( + 'art_celery', + broker='redis://localhost:6379/5', + backend='redis://localhost:6379/5', + include=['tasks'] +) + +app.conf.update( + result_expires=3600, + task_serializer='json', + accept_content=['json', 'pickle'], # pickle needed for internal Celery messages + result_serializer='json', + event_serializer='json', + timezone='UTC', + enable_utc=True, + task_track_started=True, + task_acks_late=True, + worker_prefetch_multiplier=1, +) + +if __name__ == '__main__': + app.start() diff --git a/check_redis.py b/check_redis.py new file mode 100644 index 0000000..44f70aa --- /dev/null +++ b/check_redis.py @@ -0,0 +1,12 @@ +#!/usr/bin/env python3 +"""Check Redis connectivity.""" + +import redis + +try: + r = redis.Redis(host='localhost', port=6379, db=0) + r.ping() + print("Redis: OK") +except redis.ConnectionError: + print("Redis: Not running") + print("Start with: sudo systemctl start redis-server") diff --git a/render.py b/render.py new file mode 100755 index 0000000..ad24406 --- /dev/null +++ b/render.py @@ -0,0 +1,65 @@ +#!/usr/bin/env python3 +""" +CLI to submit render tasks to Art DAG Celery. + +Usage: + python render.py dog cat # Render cat through dog effect + python render.py identity cat # Render cat through identity effect + python render.py # General form +""" + +import argparse +import json +import sys + +from tasks import render_effect + +# Known asset hashes +ASSETS = { + "cat": "33268b6e167deaf018cc538de12dbe562612b33e89a749391cef855b320a269b", +} + + +def main(): + parser = argparse.ArgumentParser(description="Submit render task to Art DAG Celery") + parser.add_argument("effect", help="Effect to apply (e.g., dog, identity)") + parser.add_argument("input", help="Input asset name or hash") + parser.add_argument("--output", "-o", help="Output name (default: -from-)") + parser.add_argument("--sync", "-s", action="store_true", help="Wait for result") + + args = parser.parse_args() + + # Resolve input to hash + input_hash = ASSETS.get(args.input, args.input) + if len(input_hash) != 64: + print(f"Error: Unknown asset '{args.input}' and not a valid hash") + sys.exit(1) + + # Generate output name + output_name = args.output or f"{args.effect}-from-{args.input}-celery" + + print(f"Submitting render task:") + print(f" Effect: {args.effect}") + print(f" Input: {args.input} ({input_hash[:16]}...)") + print(f" Output: {output_name}") + + # Submit task + task = render_effect.delay(input_hash, args.effect, output_name) + print(f" Task ID: {task.id}") + + if args.sync: + print("\nWaiting for result...") + try: + result = task.get(timeout=300) + print("\nRender complete!") + print(json.dumps(result, indent=2)) + except Exception as e: + print(f"\nRender failed: {e}") + sys.exit(1) + else: + print("\nTask submitted. Check status with:") + print(f" celery -A celery_app inspect query_task {task.id}") + + +if __name__ == "__main__": + main() diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..b5836ba --- /dev/null +++ b/requirements.txt @@ -0,0 +1,5 @@ +celery[redis]>=5.3.0 +redis>=5.0.0 +requests>=2.31.0 +# Core artdag from GitHub +git+https://github.com/gilesbradshaw/art-dag.git diff --git a/tasks.py b/tasks.py new file mode 100644 index 0000000..178be84 --- /dev/null +++ b/tasks.py @@ -0,0 +1,151 @@ +""" +Art DAG Celery Tasks + +Distributed rendering tasks for the Art DAG system. +""" + +import hashlib +import json +import sys +from datetime import datetime, timezone +from pathlib import Path + +from celery import Task +from celery_app import app + +# Add effects to path +EFFECTS_PATH = Path.home() / "artdag-effects" +sys.path.insert(0, str(EFFECTS_PATH / "dog")) + + +def file_hash(path: Path) -> str: + """Compute SHA3-256 hash of a file.""" + hasher = hashlib.sha3_256() + actual_path = path.resolve() if path.is_symlink() else path + with open(actual_path, "rb") as f: + for chunk in iter(lambda: f.read(65536), b""): + hasher.update(chunk) + return hasher.hexdigest() + + +class RenderTask(Task): + """Base task with provenance tracking.""" + + def on_success(self, retval, task_id, args, kwargs): + """Record successful render.""" + print(f"Task {task_id} completed: {retval}") + + def on_failure(self, exc, task_id, args, kwargs, einfo): + """Record failed render.""" + print(f"Task {task_id} failed: {exc}") + + +@app.task(base=RenderTask, bind=True) +def render_effect(self, input_hash: str, effect_name: str, output_name: str) -> dict: + """ + Render an effect on an input asset. + + Args: + input_hash: SHA3-256 hash of input asset + effect_name: Name of effect (e.g., "dog", "identity") + output_name: Name for output asset + + Returns: + Provenance record with output hash + """ + # Registry hashes + REGISTRY = { + "cat": { + "hash": "33268b6e167deaf018cc538de12dbe562612b33e89a749391cef855b320a269b", + "path": Path.home() / "artdag-art" / "cat.jpg" + }, + "effect:dog": { + "hash": "d048fe313433eb4e38f0e24194ffae91b896ca3e6eed3e50b2cc37b7be495555" + }, + "effect:identity": { + "hash": "640ea11ee881ebf4101af0a955439105ab11e763682b209e88ea08fc66e1cc03" + }, + "infra:artdag": { + "hash": "96a5972de216aee12ec794dcad5f9360da2e676171eabf24a46dfe1ee5fee4b0" + }, + "infra:giles-hp": { + "hash": "964bf6e69dc4e2493f42375013caffe26404ec3cf8eb5d9bc170cd42a361523b" + } + } + + # Find input by hash + input_asset = None + input_name = None + for name, data in REGISTRY.items(): + if data.get("hash") == input_hash and "path" in data: + input_asset = data + input_name = name + break + + if not input_asset: + raise ValueError(f"Unknown input hash: {input_hash}") + + input_path = input_asset["path"] + output_dir = Path.home() / "artdag-art" + + # Verify input + actual_hash = file_hash(input_path) + if actual_hash != input_hash: + raise ValueError(f"Input hash mismatch: expected {input_hash}, got {actual_hash}") + + self.update_state(state='RENDERING', meta={'effect': effect_name, 'input': input_name}) + + # Load and apply effect + if effect_name == "dog": + from effect import effect_dog, DOG_HASH + output_path = output_dir / f"{output_name}.mkv" + result = effect_dog(input_path, output_path, {}) + expected_hash = DOG_HASH + elif effect_name == "identity": + from artdag.nodes.effect import effect_identity + output_path = output_dir / f"{output_name}{input_path.suffix}" + result = effect_identity(input_path, output_path, {}) + expected_hash = input_hash + else: + raise ValueError(f"Unknown effect: {effect_name}") + + # Verify output + output_hash = file_hash(result) + if output_hash != expected_hash: + raise ValueError(f"Output hash mismatch: expected {expected_hash}, got {output_hash}") + + # Build provenance + provenance = { + "task_id": self.request.id, + "rendered_at": datetime.now(timezone.utc).isoformat(), + "rendered_by": "@giles@artdag.rose-ash.com", + "output": { + "name": output_name, + "content_hash": output_hash, + "local_path": str(result) + }, + "inputs": [ + {"name": input_name, "content_hash": input_hash} + ], + "effects": [ + {"name": f"effect:{effect_name}", "content_hash": REGISTRY[f"effect:{effect_name}"]["hash"]} + ], + "infrastructure": { + "software": {"name": "infra:artdag", "content_hash": REGISTRY["infra:artdag"]["hash"]}, + "hardware": {"name": "infra:giles-hp", "content_hash": REGISTRY["infra:giles-hp"]["hash"]} + } + } + + # Save provenance + provenance_path = result.with_suffix(".provenance.json") + with open(provenance_path, "w") as f: + json.dump(provenance, f, indent=2) + + return provenance + + +@app.task +def render_dog_from_cat() -> dict: + """Convenience task: render cat through dog effect.""" + CAT_HASH = "33268b6e167deaf018cc538de12dbe562612b33e89a749391cef855b320a269b" + return render_effect.delay(CAT_HASH, "dog", "dog-from-cat-celery").get()