#!/usr/bin/env python3
"""
Art DAG Client

CLI for interacting with the Art DAG L1 server.
"""

import json
import os
import sys
import time
from pathlib import Path
from typing import Optional

import click
import requests

# Base URL of the L1 server. Seeded from the environment at import time and
# overwritten by the ``--server`` option when the ``cli`` group callback runs.
DEFAULT_SERVER = os.environ.get("ARTDAG_SERVER", "http://localhost:8100")


def get_server():
    """Return the L1 server base URL currently in effect."""
    return DEFAULT_SERVER


def api_get(path: str):
    """Send a GET request to the server and return the decoded JSON body.

    Args:
        path: URL path appended verbatim to the server base URL.

    Raises:
        requests.HTTPError: If the server answers with an error status.
    """
    resp = requests.get(f"{get_server()}{path}")
    resp.raise_for_status()
    return resp.json()


def api_post(path: str, data: Optional[dict] = None, params: Optional[dict] = None):
    """Send a POST request to the server and return the decoded JSON body.

    Args:
        path: URL path appended verbatim to the server base URL.
        data: Optional JSON body.
        params: Optional query-string parameters.

    Raises:
        requests.HTTPError: If the server answers with an error status.
    """
    resp = requests.post(f"{get_server()}{path}", json=data, params=params)
    resp.raise_for_status()
    return resp.json()


# NOTE: click renders the docstrings of the group and its commands as
# ``--help`` text, so they are kept short and user-facing; maintainer notes
# live in ``#`` comments instead.
@click.group()
@click.option("--server", "-s", envvar="ARTDAG_SERVER", default=DEFAULT_SERVER,
              help="L1 server URL")
@click.pass_context
def cli(ctx, server):
    """Art DAG Client - interact with L1 rendering server."""
    global DEFAULT_SERVER
    ctx.ensure_object(dict)
    ctx.obj["server"] = server
    # The helpers read the module-level value through get_server(), so the
    # chosen server has to be published there as well as on the context.
    DEFAULT_SERVER = server


@cli.command()
def info():
    """Show server info."""
    data = api_get("/")
    click.echo(f"Server: {get_server()}")
    click.echo(f"Name: {data['name']}")
    click.echo(f"Version: {data['version']}")
    click.echo(f"Cache: {data['cache_dir']}")
    click.echo(f"Runs: {data['runs_count']}")


@cli.command()
@click.argument("recipe")
@click.argument("input_hash")
@click.option("--name", "-n", help="Output name")
@click.option("--wait", "-w", is_flag=True, help="Wait for completion")
def run(recipe, input_hash, name, wait):
    """Start a rendering run.

    RECIPE: Effect/recipe to apply (e.g., dog, identity)
    INPUT_HASH: Content hash of input asset
    """
    # Resolve named assets: INPUT_HASH may be an asset name known to the
    # server, in which case it is replaced by the hash it maps to.
    known_assets = api_get("/assets")
    if input_hash in known_assets:
        input_hash = known_assets[input_hash]
        click.echo(f"Resolved input to: {input_hash[:16]}...")

    payload = {
        "recipe": recipe,
        "inputs": [input_hash],
    }
    if name:
        payload["output_name"] = name

    result = api_post("/runs", payload)
    run_id = result["run_id"]

    click.echo(f"Run started: {run_id}")
    click.echo(f"Status: {result['status']}")

    if not wait:
        return

    # Poll once per second until the run reaches a terminal state.
    click.echo("Waiting for completion...")
    while True:
        run_state = api_get(f"/runs/{run_id}")
        if run_state["status"] in ("completed", "failed"):
            break
        time.sleep(1)
        click.echo(".", nl=False)
    click.echo()

    if run_state["status"] == "completed":
        click.echo("Completed!")
        click.echo(f"Output: {run_state['output_hash']}")
    else:
        click.echo(f"Failed: {run_state.get('error', 'Unknown error')}")


@cli.command("runs")
@click.option("--limit", "-l", default=10, help="Max runs to show")
def list_runs(limit):
    """List all runs."""
    runs = api_get("/runs")

    if not runs:
        click.echo("No runs found.")
        return

    click.echo(f"{'ID':<36} {'Status':<10} {'Recipe':<10} {'Output Hash':<20}")
    click.echo("-" * 80)

    for entry in runs[:limit]:
        # Runs without an output yet (missing or empty hash) show a dash.
        output_hash = entry.get("output_hash")
        output = f"{output_hash[:16]}..." if output_hash else "-"
        click.echo(
            f"{entry['run_id']} {entry['status']:<10} {entry['recipe']:<10} {output}"
        )


@cli.command()
@click.argument("run_id")
def status(run_id):
    """Get status of a run."""
    try:
        details = api_get(f"/runs/{run_id}")
    except requests.HTTPError as e:
        if e.response.status_code == 404:
            click.echo(f"Run not found: {run_id}")
            return
        raise

    click.echo(f"Run ID: {details['run_id']}")
    click.echo(f"Status: {details['status']}")
    click.echo(f"Recipe: {details['recipe']}")
    click.echo(f"Inputs: {', '.join(details['inputs'])}")
    click.echo(f"Output Name: {details['output_name']}")
    click.echo(f"Created: {details['created_at']}")

    # These fields are only printed when present and non-empty.
    if details.get("completed_at"):
        click.echo(f"Completed: {details['completed_at']}")

    if details.get("output_hash"):
        click.echo(f"Output Hash: {details['output_hash']}")

    if details.get("error"):
        click.echo(f"Error: {details['error']}")


@cli.command()
@click.option("--limit", "-l", default=20, help="Max items to show")
def cache(limit):
    """List cached content."""
    items = api_get("/cache")

    if not items:
        click.echo("Cache is empty.")
        return

    click.echo(f"Cached content ({len(items)} items):")
    for item in items[:limit]:
        click.echo(f"  {item}")


@cli.command()
@click.argument("content_hash")
@click.option("--output", "-o", type=click.Path(), help="Save to file (use - for stdout)")
def view(content_hash, output):
    """View or download cached content.

    Use -o - to pipe to stdout, e.g.:
    artdag view CONTENT_HASH -o - | mpv -
    """
    url = f"{get_server()}/cache/{content_hash}"

    try:
        # One streamed request serves all three modes. The context manager
        # releases the connection on every exit path, including errors and
        # the info-only mode where the body is never read.
        with requests.get(url, stream=True) as resp:
            resp.raise_for_status()

            if output == "-":
                # Stream raw bytes to stdout for piping.
                for chunk in resp.iter_content(chunk_size=8192):
                    sys.stdout.buffer.write(chunk)
            elif output:
                # Download to file.
                with open(output, "wb") as f:
                    for chunk in resp.iter_content(chunk_size=8192):
                        f.write(chunk)
                # Status goes to stderr, not stdout.
                click.echo(f"Saved to: {output}", err=True)
            else:
                # Info only: report the response headers without downloading
                # the body.
                size = resp.headers.get("content-length", "unknown")
                content_type = resp.headers.get("content-type", "unknown")
                click.echo(f"Hash: {content_hash}")
                click.echo(f"Size: {size} bytes")
                click.echo(f"Type: {content_type}")
                click.echo(f"URL: {url}")
    except requests.HTTPError as e:
        if e.response.status_code == 404:
            click.echo(f"Not found: {content_hash}", err=True)
        else:
            raise


@cli.command("import")
@click.argument("filepath", type=click.Path(exists=True))
def import_file(filepath):
    """Import a local file to cache (local server only)."""
    # The server receives the resolved absolute path as a query parameter.
    path = str(Path(filepath).resolve())
    result = api_post("/cache/import", params={"path": path})
    click.echo(f"Imported: {result['content_hash']}")


@cli.command()
@click.argument("filepath", type=click.Path(exists=True))
def upload(filepath):
    """Upload a file to cache (works with remote servers)."""
    # The file must stay open while requests reads it for the multipart body.
    with open(filepath, "rb") as f:
        files = {"file": (Path(filepath).name, f)}
        resp = requests.post(f"{get_server()}/cache/upload", files=files)
        resp.raise_for_status()
        result = resp.json()

    click.echo(f"Uploaded: {result['content_hash']}")
    click.echo(f"Size: {result['size']} bytes")


@cli.command()
def assets():
    """List known assets."""
    data = api_get("/assets")
    click.echo("Known assets:")
    for name, content_hash in data.items():
        click.echo(f"  {name}: {content_hash[:16]}...")


@cli.command()
@click.argument("run_id")
@click.argument("output_name")
@click.option("--l2", envvar="ARTDAG_L2", default="http://localhost:8200",
              help="L2 server URL")
def publish(run_id, output_name, l2):
    """Publish an L1 run to L2 (register ownership).

    RUN_ID: The L1 run ID to publish
    OUTPUT_NAME: Name for the registered asset
    """
    # Post to L2 server. This goes to the --l2 URL rather than the L1 server,
    # so the api_post helper (which targets get_server()) is not used.
    resp = requests.post(
        f"{l2}/registry/record-run",
        json={"run_id": run_id, "output_name": output_name},
    )
    resp.raise_for_status()
    result = resp.json()

    click.echo("Published to L2!")
    click.echo(f"Asset: {result['asset']['name']}")
    click.echo(f"Hash: {result['asset']['content_hash']}")
    click.echo(f"Activity: {result['activity']['activity_id']}")


if __name__ == "__main__":
    cli()