Files
client/artdag.py
gilesb d185683e93 Add config CLI commands
- Add PyYAML dependency
- Add upload-config: upload config YAML files
- Add configs: list uploaded configs
- Add config: show config details
- Add run-config: run config with variable inputs
- Add delete-config: delete configs (with pinning check)

Example workflow:
  artdag upload-config recipe.yaml
  artdag configs
  artdag run-config <config_id> -i node_id:content_hash --wait

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-08 03:18:17 +00:00

1095 lines
36 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Art DAG Client
CLI for interacting with the Art DAG L1 server.
"""
import json
import os
import sys
import time
from pathlib import Path
import click
import requests
import yaml
DEFAULT_SERVER = os.environ.get("ARTDAG_SERVER", "http://localhost:8100")
DEFAULT_L2_SERVER = os.environ.get("ARTDAG_L2", "http://localhost:8200")
CONFIG_DIR = Path.home() / ".artdag"
TOKEN_FILE = CONFIG_DIR / "token.json"
def get_server():
"""Get server URL from env or default."""
return DEFAULT_SERVER
def get_l2_server():
"""Get L2 server URL."""
return DEFAULT_L2_SERVER
def load_token() -> dict:
"""Load saved token from config."""
if TOKEN_FILE.exists():
with open(TOKEN_FILE) as f:
return json.load(f)
return {}
def save_token(token_data: dict):
"""Save token to config."""
CONFIG_DIR.mkdir(parents=True, exist_ok=True)
with open(TOKEN_FILE, "w") as f:
json.dump(token_data, f, indent=2)
TOKEN_FILE.chmod(0o600)
def clear_token():
"""Clear saved token."""
if TOKEN_FILE.exists():
TOKEN_FILE.unlink()
def get_auth_header() -> dict:
"""Get Authorization header if token exists."""
token_data = load_token()
token = token_data.get("access_token")
if token:
return {"Authorization": f"Bearer {token}"}
return {}
def api_get(path: str, auth: bool = False):
"""GET request to server."""
headers = get_auth_header() if auth else {}
resp = requests.get(f"{get_server()}{path}", headers=headers)
resp.raise_for_status()
return resp.json()
def api_post(path: str, data: dict = None, params: dict = None, auth: bool = False):
"""POST request to server."""
headers = get_auth_header() if auth else {}
resp = requests.post(f"{get_server()}{path}", json=data, params=params, headers=headers)
resp.raise_for_status()
return resp.json()
@click.group()
@click.option("--server", "-s", envvar="ARTDAG_SERVER", default=DEFAULT_SERVER,
help="L1 server URL")
@click.option("--l2", envvar="ARTDAG_L2", default=DEFAULT_L2_SERVER,
help="L2 server URL")
@click.pass_context
def cli(ctx, server, l2):
"""Art DAG Client - interact with L1 rendering server."""
ctx.ensure_object(dict)
ctx.obj["server"] = server
ctx.obj["l2"] = l2
global DEFAULT_SERVER, DEFAULT_L2_SERVER
DEFAULT_SERVER = server
DEFAULT_L2_SERVER = l2
# ============ Auth Commands ============
@cli.command()
@click.argument("username")
@click.option("--password", "-p", prompt=True, hide_input=True)
def login(username, password):
"""Login to get access token."""
try:
resp = requests.post(
f"{get_l2_server()}/auth/login",
json={"username": username, "password": password}
)
if resp.status_code == 200:
token_data = resp.json()
save_token(token_data)
click.echo(f"Logged in as {token_data['username']}")
click.echo(f"Token expires: {token_data['expires_at']}")
else:
click.echo(f"Login failed: {resp.json().get('detail', 'Unknown error')}", err=True)
sys.exit(1)
except requests.RequestException as e:
click.echo(f"Login failed: {e}", err=True)
sys.exit(1)
@cli.command()
@click.argument("username")
@click.option("--password", "-p", prompt=True, hide_input=True, confirmation_prompt=True)
@click.option("--email", "-e", default=None, help="Email (optional)")
def register(username, password, email):
"""Register a new account."""
try:
resp = requests.post(
f"{get_l2_server()}/auth/register",
json={"username": username, "password": password, "email": email}
)
if resp.status_code == 200:
token_data = resp.json()
save_token(token_data)
click.echo(f"Registered and logged in as {token_data['username']}")
else:
click.echo(f"Registration failed: {resp.json().get('detail', 'Unknown error')}", err=True)
sys.exit(1)
except requests.RequestException as e:
click.echo(f"Registration failed: {e}", err=True)
sys.exit(1)
@cli.command()
def logout():
"""Logout (clear saved token)."""
clear_token()
click.echo("Logged out")
@cli.command()
def whoami():
"""Show current logged-in user."""
token_data = load_token()
if not token_data.get("access_token"):
click.echo("Not logged in")
return
try:
resp = requests.get(
f"{get_l2_server()}/auth/me",
headers={"Authorization": f"Bearer {token_data['access_token']}"}
)
if resp.status_code == 200:
user = resp.json()
click.echo(f"Username: {user['username']}")
click.echo(f"Created: {user['created_at']}")
if user.get('email'):
click.echo(f"Email: {user['email']}")
else:
click.echo("Token invalid or expired. Please login again.", err=True)
clear_token()
except requests.RequestException as e:
click.echo(f"Error: {e}", err=True)
# ============ Server Commands ============
@cli.command()
def info():
"""Show server info."""
data = api_get("/")
click.echo(f"Server: {get_server()}")
click.echo(f"Name: {data['name']}")
click.echo(f"Version: {data['version']}")
click.echo(f"Cache: {data['cache_dir']}")
click.echo(f"Runs: {data['runs_count']}")
@cli.command()
@click.argument("recipe")
@click.argument("input_hash")
@click.option("--name", "-n", help="Output name")
@click.option("--wait", "-w", is_flag=True, help="Wait for completion")
def run(recipe, input_hash, name, wait):
"""Start a rendering run. Requires login.
RECIPE: Effect/recipe to apply (e.g., dog, identity)
INPUT_HASH: Content hash of input asset
"""
# Check auth
token_data = load_token()
if not token_data.get("access_token"):
click.echo("Not logged in. Please run: artdag login <username>", err=True)
sys.exit(1)
# Resolve named assets
assets = api_get("/assets")
if input_hash in assets:
input_hash = assets[input_hash]
click.echo(f"Resolved input to: {input_hash[:16]}...")
data = {
"recipe": recipe,
"inputs": [input_hash],
}
if name:
data["output_name"] = name
try:
result = api_post("/runs", data, auth=True)
except requests.HTTPError as e:
if e.response.status_code == 401:
click.echo("Authentication failed. Please login again.", err=True)
sys.exit(1)
raise
run_id = result["run_id"]
click.echo(f"Run started: {run_id}")
click.echo(f"Status: {result['status']}")
if wait:
click.echo("Waiting for completion...")
while True:
status = api_get(f"/runs/{run_id}")
if status["status"] in ("completed", "failed"):
break
time.sleep(1)
click.echo(".", nl=False)
click.echo()
if status["status"] == "completed":
click.echo(f"Completed!")
click.echo(f"Output: {status['output_hash']}")
else:
click.echo(f"Failed: {status.get('error', 'Unknown error')}")
@cli.command("runs")
@click.option("--limit", "-l", default=10, help="Max runs to show")
def list_runs(limit):
"""List all runs."""
runs = api_get("/runs")
if not runs:
click.echo("No runs found.")
return
click.echo(f"{'ID':<36} {'Status':<10} {'Recipe':<10} {'Output Hash':<20}")
click.echo("-" * 80)
for run in runs[:limit]:
output = run.get("output_hash", "")[:16] + "..." if run.get("output_hash") else "-"
click.echo(f"{run['run_id']} {run['status']:<10} {run['recipe']:<10} {output}")
@cli.command()
@click.argument("run_id")
def status(run_id):
"""Get status of a run."""
try:
run = api_get(f"/runs/{run_id}")
except requests.HTTPError as e:
if e.response.status_code == 404:
click.echo(f"Run not found: {run_id}")
return
raise
click.echo(f"Run ID: {run['run_id']}")
click.echo(f"Status: {run['status']}")
click.echo(f"Recipe: {run['recipe']}")
click.echo(f"Inputs: {', '.join(run['inputs'])}")
click.echo(f"Output Name: {run['output_name']}")
click.echo(f"Created: {run['created_at']}")
if run.get("completed_at"):
click.echo(f"Completed: {run['completed_at']}")
if run.get("output_hash"):
click.echo(f"Output Hash: {run['output_hash']}")
if run.get("error"):
click.echo(f"Error: {run['error']}")
@cli.command("delete-run")
@click.argument("run_id")
@click.option("--force", "-f", is_flag=True, help="Skip confirmation")
def delete_run(run_id, force):
"""Delete a run. Requires login.
RUN_ID: The run ID to delete
"""
token_data = load_token()
if not token_data.get("access_token"):
click.echo("Not logged in. Please run: artdag login <username>", err=True)
sys.exit(1)
# Get run info first
try:
run = api_get(f"/runs/{run_id}")
except requests.HTTPError as e:
if e.response.status_code == 404:
click.echo(f"Run not found: {run_id}", err=True)
sys.exit(1)
raise
if not force:
click.echo(f"Run: {run_id}")
click.echo(f"Status: {run['status']}")
click.echo(f"Recipe: {run['recipe']}")
if not click.confirm("Delete this run?"):
click.echo("Cancelled.")
return
try:
headers = {"Authorization": f"Bearer {token_data['access_token']}"}
resp = requests.delete(f"{get_server()}/runs/{run_id}", headers=headers)
if resp.status_code == 400:
click.echo(f"Cannot delete: {resp.json().get('detail', 'Unknown error')}", err=True)
sys.exit(1)
if resp.status_code == 403:
click.echo("Access denied", err=True)
sys.exit(1)
if resp.status_code == 404:
click.echo(f"Run not found: {run_id}", err=True)
sys.exit(1)
resp.raise_for_status()
except requests.RequestException as e:
click.echo(f"Failed to delete run: {e}", err=True)
sys.exit(1)
click.echo(f"Deleted run: {run_id}")
@cli.command("delete-cache")
@click.argument("content_hash")
@click.option("--force", "-f", is_flag=True, help="Skip confirmation")
def delete_cache(content_hash, force):
"""Delete a cached item. Requires login.
CONTENT_HASH: The content hash to delete
"""
token_data = load_token()
if not token_data.get("access_token"):
click.echo("Not logged in. Please run: artdag login <username>", err=True)
sys.exit(1)
if not force:
click.echo(f"Content hash: {content_hash}")
if not click.confirm("Delete this cached item?"):
click.echo("Cancelled.")
return
try:
headers = {"Authorization": f"Bearer {token_data['access_token']}"}
resp = requests.delete(f"{get_server()}/cache/{content_hash}", headers=headers)
if resp.status_code == 400:
click.echo(f"Cannot delete: {resp.json().get('detail', 'Unknown error')}", err=True)
sys.exit(1)
if resp.status_code == 403:
click.echo("Access denied", err=True)
sys.exit(1)
if resp.status_code == 404:
click.echo(f"Content not found: {content_hash}", err=True)
sys.exit(1)
resp.raise_for_status()
except requests.RequestException as e:
click.echo(f"Failed to delete cache item: {e}", err=True)
sys.exit(1)
click.echo(f"Deleted: {content_hash}")
@cli.command()
@click.option("--limit", "-l", default=20, help="Max items to show")
def cache(limit):
"""List cached content."""
items = api_get("/cache")
if not items:
click.echo("Cache is empty.")
return
click.echo(f"Cached content ({len(items)} items):")
for item in items[:limit]:
click.echo(f" {item}")
@cli.command()
@click.argument("content_hash")
@click.option("--output", "-o", type=click.Path(), help="Save to file (use - for stdout)")
def view(content_hash, output):
"""View or download cached content.
Use -o - to pipe to stdout, e.g.: artdag view <hash> -o - | mpv -
"""
url = f"{get_server()}/cache/{content_hash}"
try:
if output == "-":
# Stream to stdout for piping
resp = requests.get(url, stream=True)
resp.raise_for_status()
for chunk in resp.iter_content(chunk_size=8192):
sys.stdout.buffer.write(chunk)
elif output:
# Download to file
resp = requests.get(url, stream=True)
resp.raise_for_status()
with open(output, "wb") as f:
for chunk in resp.iter_content(chunk_size=8192):
f.write(chunk)
click.echo(f"Saved to: {output}", err=True)
else:
# Get info via GET with stream to check size
resp = requests.get(url, stream=True)
resp.raise_for_status()
size = resp.headers.get("content-length", "unknown")
content_type = resp.headers.get("content-type", "unknown")
click.echo(f"Hash: {content_hash}")
click.echo(f"Size: {size} bytes")
click.echo(f"Type: {content_type}")
click.echo(f"URL: {url}")
resp.close()
except requests.HTTPError as e:
if e.response.status_code == 404:
click.echo(f"Not found: {content_hash}", err=True)
else:
raise
@cli.command("import")
@click.argument("filepath", type=click.Path(exists=True))
def import_file(filepath):
"""Import a local file to cache (local server only)."""
path = str(Path(filepath).resolve())
result = api_post("/cache/import", params={"path": path})
click.echo(f"Imported: {result['content_hash']}")
@cli.command()
@click.argument("filepath", type=click.Path(exists=True))
def upload(filepath):
"""Upload a file to cache (works with remote servers). Requires login."""
# Check auth
token_data = load_token()
if not token_data.get("access_token"):
click.echo("Not logged in. Please run: artdag login <username>", err=True)
sys.exit(1)
try:
with open(filepath, "rb") as f:
files = {"file": (Path(filepath).name, f)}
headers = {"Authorization": f"Bearer {token_data['access_token']}"}
resp = requests.post(f"{get_server()}/cache/upload", files=files, headers=headers)
if resp.status_code == 401:
click.echo("Authentication failed. Please login again.", err=True)
sys.exit(1)
resp.raise_for_status()
result = resp.json()
click.echo(f"Uploaded: {result['content_hash']}")
click.echo(f"Size: {result['size']} bytes")
except requests.RequestException as e:
click.echo(f"Upload failed: {e}", err=True)
sys.exit(1)
@cli.command()
def assets():
"""List known assets."""
data = api_get("/assets")
click.echo("Known assets:")
for name, hash in data.items():
click.echo(f" {name}: {hash[:16]}...")
@cli.command()
@click.argument("run_id")
@click.argument("output_name")
def publish(run_id, output_name):
"""Publish an L1 run to L2 (register ownership). Requires login.
RUN_ID: The L1 run ID to publish
OUTPUT_NAME: Name for the registered asset
"""
# Check auth
token_data = load_token()
if not token_data.get("access_token"):
click.echo("Not logged in. Please run: artdag login <username>", err=True)
sys.exit(1)
# Post to L2 server with auth, including which L1 server has the run
try:
resp = requests.post(
f"{get_l2_server()}/registry/record-run",
json={"run_id": run_id, "output_name": output_name, "l1_server": get_server()},
headers={"Authorization": f"Bearer {token_data['access_token']}"}
)
if resp.status_code == 401:
click.echo("Authentication failed. Please login again.", err=True)
sys.exit(1)
resp.raise_for_status()
except requests.RequestException as e:
click.echo(f"Failed to publish: {e}", err=True)
sys.exit(1)
result = resp.json()
click.echo(f"Published to L2!")
click.echo(f"Asset: {result['asset']['name']}")
click.echo(f"Hash: {result['asset']['content_hash']}")
click.echo(f"Activity: {result['activity']['activity_id']}")
# ============ Metadata Commands ============
@cli.command()
@click.argument("content_hash")
@click.option("--origin", type=click.Choice(["self", "external"]), help="Set origin type")
@click.option("--origin-url", help="Set external origin URL")
@click.option("--origin-note", help="Note about the origin")
@click.option("--description", "-d", help="Set description")
@click.option("--tags", "-t", help="Set tags (comma-separated)")
@click.option("--folder", "-f", help="Set folder path")
@click.option("--add-collection", help="Add to collection")
@click.option("--remove-collection", help="Remove from collection")
@click.option("--publish", "publish_name", help="Publish to L2 with given asset name")
@click.option("--publish-type", default="image", help="Asset type for publishing (image, video)")
@click.option("--republish", is_flag=True, help="Re-sync with L2 after metadata changes")
def meta(content_hash, origin, origin_url, origin_note, description, tags, folder, add_collection, remove_collection, publish_name, publish_type, republish):
"""View or update metadata for a cached item.
With no options, displays current metadata.
With options, updates the specified fields.
Use --publish <name> to publish to L2 (requires origin to be set).
Use --republish to sync metadata changes to L2.
"""
token_data = load_token()
if not token_data.get("access_token"):
click.echo("Not logged in. Please run: artdag login <username>", err=True)
sys.exit(1)
headers = {"Authorization": f"Bearer {token_data['access_token']}"}
# Handle publish action
if publish_name:
try:
resp = requests.post(
f"{get_server()}/cache/{content_hash}/publish",
json={"asset_name": publish_name, "asset_type": publish_type},
headers=headers
)
if resp.status_code == 400:
click.echo(f"Error: {resp.json().get('detail', 'Bad request')}", err=True)
sys.exit(1)
if resp.status_code == 404:
click.echo(f"Content not found: {content_hash}", err=True)
sys.exit(1)
resp.raise_for_status()
result = resp.json()
click.echo(f"Published to L2!")
click.echo(f"Asset name: {result['asset_name']}")
click.echo(f"Activity: {result['l2_result']['activity']['activity_id']}")
except requests.RequestException as e:
click.echo(f"Failed to publish: {e}", err=True)
sys.exit(1)
return
# Handle republish action
if republish:
try:
resp = requests.patch(
f"{get_server()}/cache/{content_hash}/republish",
headers=headers
)
if resp.status_code == 400:
click.echo(f"Error: {resp.json().get('detail', 'Bad request')}", err=True)
sys.exit(1)
if resp.status_code == 404:
click.echo(f"Content not found: {content_hash}", err=True)
sys.exit(1)
resp.raise_for_status()
result = resp.json()
click.echo(f"Re-synced with L2!")
click.echo(f"Asset name: {result['asset_name']}")
except requests.RequestException as e:
click.echo(f"Failed to republish: {e}", err=True)
sys.exit(1)
return
# If no update options, just display current metadata
has_updates = any([origin, origin_url, origin_note, description, tags, folder, add_collection, remove_collection])
if not has_updates:
# GET metadata
try:
resp = requests.get(f"{get_server()}/cache/{content_hash}/meta", headers=headers)
if resp.status_code == 404:
click.echo(f"Content not found: {content_hash}", err=True)
sys.exit(1)
if resp.status_code == 403:
click.echo("Access denied", err=True)
sys.exit(1)
resp.raise_for_status()
meta = resp.json()
except requests.RequestException as e:
click.echo(f"Failed to get metadata: {e}", err=True)
sys.exit(1)
click.echo(f"Content Hash: {content_hash}")
click.echo(f"Uploader: {meta.get('uploader', 'unknown')}")
click.echo(f"Uploaded: {meta.get('uploaded_at', 'unknown')}")
if meta.get("origin"):
origin_info = meta["origin"]
click.echo(f"Origin: {origin_info.get('type', 'unknown')}")
if origin_info.get("url"):
click.echo(f" URL: {origin_info['url']}")
if origin_info.get("note"):
click.echo(f" Note: {origin_info['note']}")
else:
click.echo("Origin: not set")
click.echo(f"Description: {meta.get('description', 'none')}")
click.echo(f"Tags: {', '.join(meta.get('tags', [])) or 'none'}")
click.echo(f"Folder: {meta.get('folder', '/')}")
click.echo(f"Collections: {', '.join(meta.get('collections', [])) or 'none'}")
if meta.get("published"):
pub = meta["published"]
click.echo(f"Published: {pub.get('asset_name')} ({pub.get('published_at')})")
return
# Build update payload
update = {}
if origin or origin_url or origin_note:
# Get current origin first
try:
resp = requests.get(f"{get_server()}/cache/{content_hash}/meta", headers=headers)
resp.raise_for_status()
current = resp.json()
current_origin = current.get("origin", {})
except:
current_origin = {}
update["origin"] = {
"type": origin or current_origin.get("type", "self"),
"url": origin_url if origin_url is not None else current_origin.get("url"),
"note": origin_note if origin_note is not None else current_origin.get("note")
}
if description is not None:
update["description"] = description
if tags is not None:
update["tags"] = [t.strip() for t in tags.split(",") if t.strip()]
if folder is not None:
update["folder"] = folder
if add_collection or remove_collection:
# Get current collections
try:
resp = requests.get(f"{get_server()}/cache/{content_hash}/meta", headers=headers)
resp.raise_for_status()
current = resp.json()
collections = set(current.get("collections", []))
except:
collections = set()
if add_collection:
collections.add(add_collection)
if remove_collection and remove_collection in collections:
collections.remove(remove_collection)
update["collections"] = list(collections)
# PATCH metadata
try:
resp = requests.patch(
f"{get_server()}/cache/{content_hash}/meta",
json=update,
headers=headers
)
if resp.status_code == 404:
click.echo(f"Content not found: {content_hash}", err=True)
sys.exit(1)
if resp.status_code == 400:
click.echo(f"Error: {resp.json().get('detail', 'Bad request')}", err=True)
sys.exit(1)
resp.raise_for_status()
except requests.RequestException as e:
click.echo(f"Failed to update metadata: {e}", err=True)
sys.exit(1)
click.echo("Metadata updated.")
# ============ Folder Commands ============
@cli.group()
def folder():
"""Manage folders for organizing cached items."""
pass
@folder.command("list")
def folder_list():
"""List all folders."""
token_data = load_token()
if not token_data.get("access_token"):
click.echo("Not logged in. Please run: artdag login <username>", err=True)
sys.exit(1)
try:
resp = requests.get(
f"{get_server()}/user/folders",
headers={"Authorization": f"Bearer {token_data['access_token']}"}
)
resp.raise_for_status()
folders = resp.json()["folders"]
except requests.RequestException as e:
click.echo(f"Failed to list folders: {e}", err=True)
sys.exit(1)
click.echo("Folders:")
for f in folders:
click.echo(f" {f}")
@folder.command("create")
@click.argument("path")
def folder_create(path):
"""Create a new folder."""
token_data = load_token()
if not token_data.get("access_token"):
click.echo("Not logged in. Please run: artdag login <username>", err=True)
sys.exit(1)
try:
resp = requests.post(
f"{get_server()}/user/folders",
params={"folder_path": path},
headers={"Authorization": f"Bearer {token_data['access_token']}"}
)
if resp.status_code == 400:
click.echo(f"Error: {resp.json().get('detail', 'Bad request')}", err=True)
sys.exit(1)
resp.raise_for_status()
except requests.RequestException as e:
click.echo(f"Failed to create folder: {e}", err=True)
sys.exit(1)
click.echo(f"Created folder: {path}")
@folder.command("delete")
@click.argument("path")
def folder_delete(path):
"""Delete a folder (must be empty)."""
token_data = load_token()
if not token_data.get("access_token"):
click.echo("Not logged in. Please run: artdag login <username>", err=True)
sys.exit(1)
try:
resp = requests.delete(
f"{get_server()}/user/folders",
params={"folder_path": path},
headers={"Authorization": f"Bearer {token_data['access_token']}"}
)
if resp.status_code == 400:
click.echo(f"Error: {resp.json().get('detail', 'Bad request')}", err=True)
sys.exit(1)
if resp.status_code == 404:
click.echo(f"Folder not found: {path}", err=True)
sys.exit(1)
resp.raise_for_status()
except requests.RequestException as e:
click.echo(f"Failed to delete folder: {e}", err=True)
sys.exit(1)
click.echo(f"Deleted folder: {path}")
# ============ Collection Commands ============
@cli.group()
def collection():
"""Manage collections for organizing cached items."""
pass
@collection.command("list")
def collection_list():
"""List all collections."""
token_data = load_token()
if not token_data.get("access_token"):
click.echo("Not logged in. Please run: artdag login <username>", err=True)
sys.exit(1)
try:
resp = requests.get(
f"{get_server()}/user/collections",
headers={"Authorization": f"Bearer {token_data['access_token']}"}
)
resp.raise_for_status()
collections = resp.json()["collections"]
except requests.RequestException as e:
click.echo(f"Failed to list collections: {e}", err=True)
sys.exit(1)
click.echo("Collections:")
for c in collections:
click.echo(f" {c['name']} (created: {c['created_at'][:10]})")
@collection.command("create")
@click.argument("name")
def collection_create(name):
"""Create a new collection."""
token_data = load_token()
if not token_data.get("access_token"):
click.echo("Not logged in. Please run: artdag login <username>", err=True)
sys.exit(1)
try:
resp = requests.post(
f"{get_server()}/user/collections",
params={"name": name},
headers={"Authorization": f"Bearer {token_data['access_token']}"}
)
if resp.status_code == 400:
click.echo(f"Error: {resp.json().get('detail', 'Bad request')}", err=True)
sys.exit(1)
resp.raise_for_status()
except requests.RequestException as e:
click.echo(f"Failed to create collection: {e}", err=True)
sys.exit(1)
click.echo(f"Created collection: {name}")
@collection.command("delete")
@click.argument("name")
def collection_delete(name):
"""Delete a collection."""
token_data = load_token()
if not token_data.get("access_token"):
click.echo("Not logged in. Please run: artdag login <username>", err=True)
sys.exit(1)
try:
resp = requests.delete(
f"{get_server()}/user/collections",
params={"name": name},
headers={"Authorization": f"Bearer {token_data['access_token']}"}
)
if resp.status_code == 404:
click.echo(f"Collection not found: {name}", err=True)
sys.exit(1)
resp.raise_for_status()
except requests.RequestException as e:
click.echo(f"Failed to delete collection: {e}", err=True)
sys.exit(1)
click.echo(f"Deleted collection: {name}")
# ============ Config Commands ============
@cli.command("upload-config")
@click.argument("filepath", type=click.Path(exists=True))
def upload_config(filepath):
"""Upload a config YAML file. Requires login."""
token_data = load_token()
if not token_data.get("access_token"):
click.echo("Not logged in. Please run: artdag login <username>", err=True)
sys.exit(1)
# Validate YAML locally first
with open(filepath) as f:
try:
config = yaml.safe_load(f)
except yaml.YAMLError as e:
click.echo(f"Invalid YAML: {e}", err=True)
sys.exit(1)
# Check required fields
if not config.get("name"):
click.echo("Config must have a 'name' field", err=True)
sys.exit(1)
# Upload
try:
with open(filepath, "rb") as f:
files = {"file": (Path(filepath).name, f)}
headers = {"Authorization": f"Bearer {token_data['access_token']}"}
resp = requests.post(f"{get_server()}/configs/upload", files=files, headers=headers)
if resp.status_code == 401:
click.echo("Authentication failed. Please login again.", err=True)
sys.exit(1)
resp.raise_for_status()
result = resp.json()
click.echo(f"Uploaded config: {result['name']} v{result.get('version', '1.0')}")
click.echo(f"Config ID: {result['config_id']}")
click.echo(f"Variable inputs: {result['variable_inputs']}")
click.echo(f"Fixed inputs: {result['fixed_inputs']}")
except requests.RequestException as e:
click.echo(f"Upload failed: {e}", err=True)
sys.exit(1)
@cli.command("configs")
@click.option("--limit", "-l", default=10, help="Max configs to show")
def list_configs(limit):
"""List uploaded configs."""
try:
resp = requests.get(f"{get_server()}/configs")
resp.raise_for_status()
data = resp.json()
except requests.RequestException as e:
click.echo(f"Failed to list configs: {e}", err=True)
sys.exit(1)
configs = data.get("configs", [])
if not configs:
click.echo("No configs found.")
return
click.echo(f"{'Name':<20} {'Version':<8} {'Variables':<10} {'Config ID':<24}")
click.echo("-" * 70)
for config in configs[:limit]:
config_id = config["config_id"][:20] + "..."
var_count = len(config.get("variable_inputs", []))
click.echo(f"{config['name']:<20} {config['version']:<8} {var_count:<10} {config_id}")
@cli.command("config")
@click.argument("config_id")
def show_config(config_id):
"""Show details of a config."""
try:
resp = requests.get(f"{get_server()}/configs/{config_id}")
if resp.status_code == 404:
click.echo(f"Config not found: {config_id}", err=True)
sys.exit(1)
resp.raise_for_status()
config = resp.json()
except requests.RequestException as e:
click.echo(f"Failed to get config: {e}", err=True)
sys.exit(1)
click.echo(f"Name: {config['name']}")
click.echo(f"Version: {config['version']}")
click.echo(f"Description: {config.get('description', 'N/A')}")
click.echo(f"Config ID: {config['config_id']}")
click.echo(f"Owner: {config.get('owner', 'N/A')}")
click.echo(f"Uploaded: {config['uploaded_at']}")
if config.get("variable_inputs"):
click.echo("\nVariable Inputs:")
for inp in config["variable_inputs"]:
req = "*" if inp.get("required", True) else ""
click.echo(f" - {inp['name']}{req}: {inp.get('description', 'No description')}")
if config.get("fixed_inputs"):
click.echo("\nFixed Inputs:")
for inp in config["fixed_inputs"]:
click.echo(f" - {inp['asset']}: {inp['content_hash'][:16]}...")
@cli.command("run-config")
@click.argument("config_id")
@click.option("--input", "-i", "inputs", multiple=True, help="Input as node_id:content_hash")
@click.option("--wait", "-w", is_flag=True, help="Wait for completion")
def run_config(config_id, inputs, wait):
"""Run a config with variable inputs. Requires login.
CONFIG_ID: The config ID (content hash)
Example: artdag run-config abc123 -i source_image:def456
"""
token_data = load_token()
if not token_data.get("access_token"):
click.echo("Not logged in. Please run: artdag login <username>", err=True)
sys.exit(1)
# Parse inputs
input_dict = {}
for inp in inputs:
if ":" not in inp:
click.echo(f"Invalid input format: {inp} (expected node_id:content_hash)", err=True)
sys.exit(1)
node_id, content_hash = inp.split(":", 1)
input_dict[node_id] = content_hash
# Run
try:
headers = {"Authorization": f"Bearer {token_data['access_token']}"}
resp = requests.post(
f"{get_server()}/configs/{config_id}/run",
json={"inputs": input_dict},
headers=headers
)
if resp.status_code == 401:
click.echo("Authentication failed. Please login again.", err=True)
sys.exit(1)
if resp.status_code == 400:
error = resp.json().get("detail", "Bad request")
click.echo(f"Error: {error}", err=True)
sys.exit(1)
if resp.status_code == 404:
click.echo(f"Config not found: {config_id}", err=True)
sys.exit(1)
resp.raise_for_status()
result = resp.json()
except requests.RequestException as e:
click.echo(f"Run failed: {e}", err=True)
sys.exit(1)
click.echo(f"Run started: {result['run_id']}")
click.echo(f"Recipe: {result['recipe']}")
click.echo(f"Status: {result['status']}")
if wait:
click.echo("Waiting for completion...")
run_id = result["run_id"]
while True:
time.sleep(2)
try:
resp = requests.get(f"{get_server()}/runs/{run_id}")
resp.raise_for_status()
run = resp.json()
except requests.RequestException:
continue
if run["status"] == "completed":
click.echo(f"Completed! Output: {run.get('output_hash', 'N/A')}")
break
elif run["status"] == "failed":
click.echo(f"Failed: {run.get('error', 'Unknown error')}", err=True)
sys.exit(1)
@cli.command("delete-config")
@click.argument("config_id")
@click.option("--force", "-f", is_flag=True, help="Skip confirmation")
def delete_config(config_id, force):
"""Delete a config. Requires login."""
token_data = load_token()
if not token_data.get("access_token"):
click.echo("Not logged in. Please run: artdag login <username>", err=True)
sys.exit(1)
if not force:
if not click.confirm(f"Delete config {config_id[:16]}...?"):
click.echo("Cancelled.")
return
try:
headers = {"Authorization": f"Bearer {token_data['access_token']}"}
resp = requests.delete(f"{get_server()}/configs/{config_id}", headers=headers)
if resp.status_code == 401:
click.echo("Authentication failed. Please login again.", err=True)
sys.exit(1)
if resp.status_code == 400:
error = resp.json().get("detail", "Cannot delete")
click.echo(f"Error: {error}", err=True)
sys.exit(1)
if resp.status_code == 404:
click.echo(f"Config not found: {config_id}", err=True)
sys.exit(1)
resp.raise_for_status()
except requests.RequestException as e:
click.echo(f"Delete failed: {e}", err=True)
sys.exit(1)
click.echo(f"Deleted config: {config_id[:16]}...")
if __name__ == "__main__":
cli()