#!/usr/bin/env python3 """ CLI to submit render tasks to Art DAG Celery. Usage: python render.py dog cat # Render cat through dog effect python render.py identity cat # Render cat through identity effect python render.py # General form """ import argparse import json import sys from tasks import render_effect # Known asset hashes ASSETS = { "cat": "33268b6e167deaf018cc538de12dbe562612b33e89a749391cef855b320a269b", } def main(): parser = argparse.ArgumentParser(description="Submit render task to Art DAG Celery") parser.add_argument("effect", help="Effect to apply (e.g., dog, identity)") parser.add_argument("input", help="Input asset name or hash") parser.add_argument("--output", "-o", help="Output name (default: -from-)") parser.add_argument("--sync", "-s", action="store_true", help="Wait for result") args = parser.parse_args() # Resolve input to hash input_hash = ASSETS.get(args.input, args.input) if len(input_hash) != 64: print(f"Error: Unknown asset '{args.input}' and not a valid hash") sys.exit(1) # Generate output name output_name = args.output or f"{args.effect}-from-{args.input}-celery" print(f"Submitting render task:") print(f" Effect: {args.effect}") print(f" Input: {args.input} ({input_hash[:16]}...)") print(f" Output: {output_name}") # Submit task task = render_effect.delay(input_hash, args.effect, output_name) print(f" Task ID: {task.id}") if args.sync: print("\nWaiting for result...") try: result = task.get(timeout=300) print("\nRender complete!") print(json.dumps(result, indent=2)) except Exception as e: print(f"\nRender failed: {e}") sys.exit(1) else: print("\nTask submitted. Check status with:") print(f" celery -A celery_app inspect query_task {task.id}") if __name__ == "__main__": main()