Files
client/artdag.py
gilesb efca939c7d Add --raw flag to view command and improve metadata display
- Add --raw/-r flag to fetch from /cache/{cid}/raw endpoint
- Automatically use /raw when outputting to file or stdout
- Show friendly_name, title, filename in metadata view
- Use JSON API for metadata instead of streaming response

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-12 17:00:53 +00:00

2137 lines
75 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Art DAG Client
CLI for interacting with the Art DAG L1 server.
"""
import json
import os
import sys
import time
from pathlib import Path
import click
import requests
import yaml
DEFAULT_SERVER = os.environ.get("ARTDAG_SERVER", "http://localhost:8100")
DEFAULT_L2_SERVER = os.environ.get("ARTDAG_L2", "http://localhost:8200")
CONFIG_DIR = Path.home() / ".artdag"
TOKEN_FILE = CONFIG_DIR / "token.json"
def get_server():
"""Get server URL from env or default."""
return DEFAULT_SERVER
def get_l2_server():
"""Get L2 server URL."""
return DEFAULT_L2_SERVER
def load_token() -> dict:
"""Load saved token from config."""
if TOKEN_FILE.exists():
with open(TOKEN_FILE) as f:
return json.load(f)
return {}
def save_token(token_data: dict):
"""Save token to config."""
CONFIG_DIR.mkdir(parents=True, exist_ok=True)
with open(TOKEN_FILE, "w") as f:
json.dump(token_data, f, indent=2)
TOKEN_FILE.chmod(0o600)
def clear_token():
"""Clear saved token."""
if TOKEN_FILE.exists():
TOKEN_FILE.unlink()
def get_auth_header(require_token: bool = False) -> dict:
"""Get headers for API requests. Always includes Accept: application/json."""
headers = {"Accept": "application/json"}
token_data = load_token()
token = token_data.get("access_token")
if token:
headers["Authorization"] = f"Bearer {token}"
elif require_token:
click.echo("Not logged in. Use 'artdag login' first.", err=True)
sys.exit(1)
return headers
def api_get(path: str, auth: bool = False):
"""GET request to server."""
headers = get_auth_header(require_token=auth)
resp = requests.get(f"{get_server()}{path}", headers=headers)
resp.raise_for_status()
return resp.json()
def api_post(path: str, data: dict = None, params: dict = None, auth: bool = False):
"""POST request to server."""
headers = get_auth_header(require_token=auth)
resp = requests.post(f"{get_server()}{path}", json=data, params=params, headers=headers)
resp.raise_for_status()
return resp.json()
@click.group()
@click.option("--server", "-s", envvar="ARTDAG_SERVER", default=DEFAULT_SERVER,
help="L1 server URL")
@click.option("--l2", envvar="ARTDAG_L2", default=DEFAULT_L2_SERVER,
help="L2 server URL")
@click.pass_context
def cli(ctx, server, l2):
"""Art DAG Client - interact with L1 rendering server."""
ctx.ensure_object(dict)
ctx.obj["server"] = server
ctx.obj["l2"] = l2
global DEFAULT_SERVER, DEFAULT_L2_SERVER
DEFAULT_SERVER = server
DEFAULT_L2_SERVER = l2
# ============ Auth Commands ============
@cli.command()
@click.argument("username")
@click.option("--password", "-p", prompt=True, hide_input=True)
def login(username, password):
"""Login to get access token."""
try:
# Server expects form data, not JSON
resp = requests.post(
f"{get_l2_server()}/auth/login",
data={"username": username, "password": password}
)
if resp.status_code == 200:
# Check if we got a token back in a cookie
if "auth_token" in resp.cookies:
token = resp.cookies["auth_token"]
# Decode token to get username and expiry
import base64
try:
# JWT format: header.payload.signature
payload = token.split(".")[1]
# Add padding if needed
payload += "=" * (4 - len(payload) % 4)
decoded = json.loads(base64.urlsafe_b64decode(payload))
token_data = {
"access_token": token,
"username": decoded.get("username", username),
"expires_at": decoded.get("exp", "")
}
save_token(token_data)
click.echo(f"Logged in as {token_data['username']}")
if token_data.get("expires_at"):
click.echo(f"Token expires: {token_data['expires_at']}")
except Exception:
# If we can't decode, just save the token
save_token({"access_token": token, "username": username})
click.echo(f"Logged in as {username}")
else:
# HTML response - check for success/error
if "successful" in resp.text.lower():
click.echo(f"Login successful but no token received. Try logging in via web browser.")
elif "invalid" in resp.text.lower():
click.echo(f"Login failed: Invalid username or password", err=True)
sys.exit(1)
else:
click.echo(f"Login failed: {resp.text}", err=True)
sys.exit(1)
else:
click.echo(f"Login failed: {resp.text}", err=True)
sys.exit(1)
except requests.RequestException as e:
click.echo(f"Login failed: {e}", err=True)
sys.exit(1)
@cli.command()
@click.argument("username")
@click.option("--password", "-p", prompt=True, hide_input=True, confirmation_prompt=True)
@click.option("--email", "-e", default=None, help="Email (optional)")
def register(username, password, email):
"""Register a new account."""
try:
# Server expects form data, not JSON
form_data = {
"username": username,
"password": password,
"password2": password,
}
if email:
form_data["email"] = email
resp = requests.post(
f"{get_l2_server()}/auth/register",
data=form_data
)
if resp.status_code == 200:
# Check if we got a token back in a cookie
if "auth_token" in resp.cookies:
token = resp.cookies["auth_token"]
# Decode token to get username and expiry
import base64
try:
# JWT format: header.payload.signature
payload = token.split(".")[1]
# Add padding if needed
payload += "=" * (4 - len(payload) % 4)
decoded = json.loads(base64.urlsafe_b64decode(payload))
token_data = {
"access_token": token,
"username": decoded.get("username", username),
"expires_at": decoded.get("exp", "")
}
save_token(token_data)
click.echo(f"Registered and logged in as {token_data['username']}")
except Exception:
# If we can't decode, just save the token
save_token({"access_token": token, "username": username})
click.echo(f"Registered and logged in as {username}")
else:
# HTML response - registration successful
if "successful" in resp.text.lower():
click.echo(f"Registered as {username}. Please login to get a token.")
else:
click.echo(f"Registration failed: {resp.text}", err=True)
sys.exit(1)
else:
click.echo(f"Registration failed: {resp.text}", err=True)
sys.exit(1)
except requests.RequestException as e:
click.echo(f"Registration failed: {e}", err=True)
sys.exit(1)
@cli.command()
def logout():
"""Logout (clear saved token)."""
clear_token()
click.echo("Logged out")
@cli.command()
def whoami():
"""Show current logged-in user."""
token_data = load_token()
if not token_data.get("access_token"):
click.echo("Not logged in")
return
try:
resp = requests.get(
f"{get_l2_server()}/auth/me",
headers={"Authorization": f"Bearer {token_data['access_token']}"}
)
if resp.status_code == 200:
user = resp.json()
click.echo(f"Username: {user['username']}")
click.echo(f"Created: {user['created_at']}")
if user.get('email'):
click.echo(f"Email: {user['email']}")
else:
click.echo("Token invalid or expired. Please login again.", err=True)
clear_token()
except requests.RequestException as e:
click.echo(f"Error: {e}", err=True)
# ============ Server Commands ============
@cli.command()
def info():
"""Show server info."""
data = api_get("/")
click.echo(f"Server: {get_server()}")
click.echo(f"Name: {data['name']}")
click.echo(f"Version: {data['version']}")
click.echo(f"Cache: {data['cache_dir']}")
click.echo(f"Runs: {data['runs_count']}")
@cli.command()
def stats():
"""Show user stats (runs, recipes, effects, media, storage counts)."""
token_data = load_token()
if not token_data.get("access_token"):
click.echo("Not logged in. Use 'artdag login' first.", err=True)
sys.exit(1)
try:
headers = get_auth_header(require_token=True)
resp = requests.get(f"{get_server()}/api/stats", headers=headers)
resp.raise_for_status()
stats = resp.json()
except requests.RequestException as e:
click.echo(f"Failed to get stats: {e}", err=True)
sys.exit(1)
click.echo("User Stats:")
click.echo(f" Runs: {stats.get('runs', 0)}")
click.echo(f" Recipes: {stats.get('recipes', 0)}")
click.echo(f" Effects: {stats.get('effects', 0)}")
click.echo(f" Media: {stats.get('media', 0)}")
click.echo(f" Storage: {stats.get('storage', 0)}")
@cli.command("clear-data")
@click.option("--force", "-f", is_flag=True, help="Skip confirmation")
def clear_data(force):
"""Clear all user L1 data (runs, recipes, effects, media).
Storage provider configurations are preserved.
This action cannot be undone!
"""
token_data = load_token()
if not token_data.get("access_token"):
click.echo("Not logged in. Use 'artdag login' first.", err=True)
sys.exit(1)
# Show current stats first
try:
headers = get_auth_header(require_token=True)
resp = requests.get(f"{get_server()}/api/stats", headers=headers)
resp.raise_for_status()
stats = resp.json()
except requests.RequestException as e:
click.echo(f"Failed to get stats: {e}", err=True)
sys.exit(1)
click.echo("This will delete:")
click.echo(f" Runs: {stats.get('runs', 0)}")
click.echo(f" Recipes: {stats.get('recipes', 0)}")
click.echo(f" Effects: {stats.get('effects', 0)}")
click.echo(f" Media: {stats.get('media', 0)}")
click.echo()
click.echo("Storage configurations will be preserved.")
click.echo()
if not force:
if not click.confirm("Are you sure you want to delete all this data?"):
click.echo("Cancelled.")
return
click.echo()
click.echo("Clearing data...")
try:
resp = requests.delete(f"{get_server()}/api/clear-data", headers=headers)
resp.raise_for_status()
result = resp.json()
except requests.RequestException as e:
click.echo(f"Failed to clear data: {e}", err=True)
sys.exit(1)
deleted = result.get("deleted", {})
click.echo()
click.echo("Deleted:")
click.echo(f" Runs: {deleted.get('runs', 0)}")
click.echo(f" Recipes: {deleted.get('recipes', 0)}")
click.echo(f" Effects: {deleted.get('effects', 0)}")
click.echo(f" Media: {deleted.get('media', 0)}")
errors = result.get("errors", [])
if errors:
click.echo()
click.echo("Errors encountered:")
for err in errors[:5]:
click.echo(f" - {err}")
if len(errors) > 5:
click.echo(f" ... and {len(errors) - 5} more")
@cli.command()
@click.argument("recipe")
@click.argument("input_hash")
@click.option("--name", "-n", help="Output name")
@click.option("--wait", "-w", is_flag=True, help="Wait for completion")
def run(recipe, input_hash, name, wait):
"""Start a rendering run. Requires login.
RECIPE: Effect/recipe to apply (e.g., dog, identity)
INPUT_HASH: Content hash of input asset
"""
# Check auth
token_data = load_token()
if not token_data.get("access_token"):
click.echo("Not logged in. Please run: artdag login <username>", err=True)
sys.exit(1)
# Resolve named assets
assets = api_get("/assets")
if input_hash in assets:
input_hash = assets[input_hash]
click.echo(f"Resolved input to: {input_hash[:16]}...")
data = {
"recipe": recipe,
"inputs": [input_hash],
}
if name:
data["output_name"] = name
try:
result = api_post("/runs", data, auth=True)
except requests.HTTPError as e:
if e.response.status_code == 401:
click.echo("Authentication failed. Please login again.", err=True)
sys.exit(1)
raise
run_id = result["run_id"]
click.echo(f"Run started: {run_id}")
click.echo(f"Status: {result['status']}")
if wait:
click.echo("Waiting for completion...")
while True:
status = api_get(f"/runs/{run_id}")
if status["status"] in ("completed", "failed"):
break
time.sleep(1)
click.echo(".", nl=False)
click.echo()
if status["status"] == "completed":
click.echo(f"Completed!")
click.echo(f"Output: {status['output_cid']}")
else:
click.echo(f"Failed: {status.get('error', 'Unknown error')}")
@cli.command("runs")
@click.option("--limit", "-l", default=10, help="Max runs to show")
@click.option("--offset", "-o", default=0, help="Offset for pagination")
def list_runs(limit, offset):
"""List all runs with pagination."""
headers = get_auth_header(require_token=True)
try:
resp = requests.get(f"{get_server()}/runs?offset={offset}&limit={limit}", headers=headers)
resp.raise_for_status()
data = resp.json()
except requests.RequestException as e:
click.echo(f"Failed to list runs: {e}", err=True)
sys.exit(1)
runs = data.get("runs", [])
has_more = data.get("has_more", False)
if not runs:
click.echo("No runs found.")
return
start = offset + 1
end = offset + len(runs)
click.echo(f"Showing {start}-{end}" + (" (more available)" if has_more else ""))
click.echo()
click.echo(f"{'ID':<36} {'Status':<10} {'Recipe':<10} {'Output Hash':<20}")
click.echo("-" * 80)
for run in runs:
output = run.get("output_cid", "")[:16] + "..." if run.get("output_cid") else "-"
click.echo(f"{run['run_id']} {run['status']:<10} {run['recipe']:<10} {output}")
@cli.command()
@click.argument("run_id")
@click.option("--plan", "-p", is_flag=True, help="Show execution plan with steps")
@click.option("--artifacts", "-a", is_flag=True, help="Show output artifacts")
@click.option("--analysis", is_flag=True, help="Show audio analysis data")
def status(run_id, plan, artifacts, analysis):
"""Get status of a run with optional detailed views."""
headers = get_auth_header() # Optional auth, always has Accept header
try:
resp = requests.get(f"{get_server()}/runs/{run_id}", headers=headers)
if resp.status_code == 404:
click.echo(f"Run not found: {run_id}")
return
resp.raise_for_status()
run = resp.json()
except requests.RequestException as e:
click.echo(f"Failed to get run: {e}", err=True)
sys.exit(1)
# Basic status
click.echo(f"Run ID: {run['run_id']}")
click.echo(f"Status: {run['status']}")
click.echo(f"Recipe: {run['recipe']}")
click.echo(f"Inputs: {', '.join(run.get('inputs', []))}")
click.echo(f"Output Name: {run.get('output_name', 'N/A')}")
click.echo(f"Created: {run['created_at']}")
if run.get("completed_at"):
click.echo(f"Completed: {run['completed_at']}")
if run.get("output_cid"):
click.echo(f"Output Hash: {run['output_cid']}")
if run.get("error"):
click.echo(f"Error: {run['error']}")
# Plan view
if plan:
click.echo()
click.echo("Execution Plan:")
click.echo("-" * 60)
try:
plan_resp = requests.get(f"{get_server()}/runs/{run_id}/plan", headers=headers)
if plan_resp.status_code == 200:
plan_data = plan_resp.json()
steps = plan_data.get("steps", [])
if steps:
for i, step in enumerate(steps, 1):
status_str = step.get("status", "pending")
if status_str == "cached":
status_badge = "[cached]"
elif status_str == "completed":
status_badge = "[done]"
elif status_str == "running":
status_badge = "[running]"
else:
status_badge = "[pending]"
step_id = step.get("id", step.get("node_id", f"step_{i}"))
step_type = step.get("type", "unknown")
output_cid = step.get("output_cid", "")
output_str = f"{output_cid[:16]}..." if output_cid else ""
click.echo(f" {i}. {status_badge:<10} {step_id:<20} ({step_type}) {output_str}")
else:
click.echo(" No plan steps available.")
else:
click.echo(" Plan not available.")
except requests.RequestException:
click.echo(" Failed to fetch plan.")
# Artifacts view
if artifacts:
click.echo()
click.echo("Artifacts:")
click.echo("-" * 60)
try:
art_resp = requests.get(f"{get_server()}/runs/{run_id}/artifacts", headers=headers)
if art_resp.status_code == 200:
art_data = art_resp.json()
artifact_list = art_data.get("artifacts", [])
if artifact_list:
for art in artifact_list:
cid = art.get("cid", art.get("output_cid", "unknown"))
name = art.get("name", art.get("step_id", "output"))
media_type = art.get("media_type", art.get("content_type", ""))
size = art.get("size", "")
size_str = f" ({size})" if size else ""
type_str = f" [{media_type}]" if media_type else ""
click.echo(f" {name}: {cid[:24]}...{type_str}{size_str}")
else:
click.echo(" No artifacts available.")
else:
click.echo(" Artifacts not available.")
except requests.RequestException:
click.echo(" Failed to fetch artifacts.")
# Analysis view
if analysis:
click.echo()
click.echo("Analysis:")
click.echo("-" * 60)
try:
# Analysis is included in the detail view
detail_resp = requests.get(f"{get_server()}/runs/{run_id}/detail", headers=headers)
if detail_resp.status_code == 200:
detail_data = detail_resp.json()
analysis_data = detail_data.get("analysis", [])
if analysis_data:
for item in analysis_data:
input_name = item.get("input_name", item.get("name", "input"))
click.echo(f" {input_name}:")
if item.get("tempo"):
click.echo(f" Tempo: {item['tempo']} BPM")
if item.get("beat_count"):
click.echo(f" Beats: {item['beat_count']}")
if item.get("energy") is not None:
click.echo(f" Energy: {item['energy']}%")
if item.get("duration"):
click.echo(f" Duration: {item['duration']:.1f}s")
click.echo()
else:
click.echo(" No analysis data available.")
else:
click.echo(" Analysis not available.")
except requests.RequestException:
click.echo(" Failed to fetch analysis.")
@cli.command("delete-run")
@click.argument("run_id")
@click.option("--force", "-f", is_flag=True, help="Skip confirmation")
def delete_run(run_id, force):
"""Delete a run. Requires login.
RUN_ID: The run ID to delete
"""
token_data = load_token()
if not token_data.get("access_token"):
click.echo("Not logged in. Please run: artdag login <username>", err=True)
sys.exit(1)
# Get run info first
try:
run = api_get(f"/runs/{run_id}")
except requests.HTTPError as e:
if e.response.status_code == 404:
click.echo(f"Run not found: {run_id}", err=True)
sys.exit(1)
raise
if not force:
click.echo(f"Run: {run_id}")
click.echo(f"Status: {run['status']}")
click.echo(f"Recipe: {run['recipe']}")
if not click.confirm("Delete this run?"):
click.echo("Cancelled.")
return
try:
headers = get_auth_header(require_token=True)
resp = requests.delete(f"{get_server()}/runs/{run_id}", headers=headers)
if resp.status_code == 400:
click.echo(f"Cannot delete: {resp.json().get('detail', 'Unknown error')}", err=True)
sys.exit(1)
if resp.status_code == 403:
click.echo("Access denied", err=True)
sys.exit(1)
if resp.status_code == 404:
click.echo(f"Run not found: {run_id}", err=True)
sys.exit(1)
resp.raise_for_status()
except requests.RequestException as e:
click.echo(f"Failed to delete run: {e}", err=True)
sys.exit(1)
click.echo(f"Deleted run: {run_id}")
@cli.command("delete-cache")
@click.argument("cid")
@click.option("--force", "-f", is_flag=True, help="Skip confirmation")
def delete_cache(cid, force):
"""Delete a cached item. Requires login.
CID: The content identifier (IPFS CID) to delete
"""
token_data = load_token()
if not token_data.get("access_token"):
click.echo("Not logged in. Please run: artdag login <username>", err=True)
sys.exit(1)
if not force:
click.echo(f"CID: {cid}")
if not click.confirm("Delete this cached item?"):
click.echo("Cancelled.")
return
try:
headers = get_auth_header(require_token=True)
resp = requests.delete(f"{get_server()}/cache/{cid}", headers=headers)
if resp.status_code == 400:
click.echo(f"Cannot delete: {resp.json().get('detail', 'Unknown error')}", err=True)
sys.exit(1)
if resp.status_code == 403:
click.echo("Access denied", err=True)
sys.exit(1)
if resp.status_code == 404:
click.echo(f"Content not found: {cid}", err=True)
sys.exit(1)
resp.raise_for_status()
except requests.RequestException as e:
click.echo(f"Failed to delete cache item: {e}", err=True)
sys.exit(1)
click.echo(f"Deleted: {cid}")
MEDIA_TYPE_EXTENSIONS = {
"image": ["jpg", "jpeg", "png", "gif", "webp", "bmp", "svg"],
"video": ["mp4", "mkv", "webm", "mov", "avi", "wmv"],
"audio": ["mp3", "wav", "flac", "ogg", "m4a", "aac"],
}
def matches_media_type(item: dict, media_type: str) -> bool:
"""Check if item matches the requested media type."""
if media_type == "all":
return True
# Check content_type/media_type field
content_type = item.get("content_type", item.get("media_type", ""))
if content_type:
if media_type == "image" and content_type.startswith("image/"):
return True
if media_type == "video" and content_type.startswith("video/"):
return True
if media_type == "audio" and content_type.startswith("audio/"):
return True
# Check filename extension
filename = item.get("filename", item.get("friendly_name", ""))
if filename:
ext = filename.rsplit(".", 1)[-1].lower() if "." in filename else ""
if ext in MEDIA_TYPE_EXTENSIONS.get(media_type, []):
return True
return False
@cli.command()
@click.option("--limit", "-l", default=20, help="Max items to show")
@click.option("--offset", "-o", default=0, help="Offset for pagination")
@click.option("--type", "-t", "media_type", type=click.Choice(["all", "image", "video", "audio"]),
default="all", help="Filter by media type")
def cache(limit, offset, media_type):
"""List cached content with pagination and optional type filter."""
headers = get_auth_header(require_token=True)
# Fetch more items if filtering to ensure we get enough results
fetch_limit = limit * 3 if media_type != "all" else limit
try:
resp = requests.get(f"{get_server()}/cache?offset={offset}&limit={fetch_limit}", headers=headers)
resp.raise_for_status()
data = resp.json()
except requests.RequestException as e:
click.echo(f"Failed to list cache: {e}", err=True)
sys.exit(1)
items = data.get("items", [])
has_more = data.get("has_more", False)
# Filter by media type if requested
if media_type != "all":
items = [item for item in items if isinstance(item, dict) and matches_media_type(item, media_type)]
items = items[:limit] # Apply limit after filtering
if not items:
if media_type != "all":
click.echo(f"No {media_type} files found in cache.")
else:
click.echo("Cache is empty.")
return
start = offset + 1
end = offset + len(items)
type_str = f" ({media_type})" if media_type != "all" else ""
click.echo(f"Showing {start}-{end}{type_str}" + (" (more available)" if has_more else ""))
click.echo()
for item in items:
cid = item.get("cid", item) if isinstance(item, dict) else item
name = item.get("friendly_name") or item.get("filename") if isinstance(item, dict) else None
content_type = item.get("content_type", "") if isinstance(item, dict) else ""
type_badge = f"[{content_type.split('/')[0]}]" if content_type else ""
if name:
click.echo(f" {cid[:24]}... {name} {type_badge}")
else:
click.echo(f" {cid} {type_badge}")
@cli.command()
@click.argument("cid")
@click.option("--output", "-o", type=click.Path(), help="Save to file (use - for stdout)")
@click.option("--raw", "-r", is_flag=True, help="Get raw content (use with -o to download)")
def view(cid, output, raw):
"""View or download cached content.
Use -o - to pipe to stdout, e.g.: artdag view <cid> -o - | mpv -
Use --raw to get the raw file content instead of metadata.
"""
# Use /raw endpoint if --raw flag or if outputting to file/stdout
if raw or output:
url = f"{get_server()}/cache/{cid}/raw"
else:
url = f"{get_server()}/cache/{cid}"
try:
if output == "-":
# Stream to stdout for piping
resp = requests.get(url, stream=True)
resp.raise_for_status()
for chunk in resp.iter_content(chunk_size=8192):
sys.stdout.buffer.write(chunk)
elif output:
# Download to file
resp = requests.get(url, stream=True)
resp.raise_for_status()
with open(output, "wb") as f:
for chunk in resp.iter_content(chunk_size=8192):
f.write(chunk)
click.echo(f"Saved to: {output}", err=True)
else:
# Get info - use JSON endpoint for metadata
headers = {"Accept": "application/json"}
resp = requests.get(f"{get_server()}/cache/{cid}", headers=headers)
resp.raise_for_status()
info = resp.json()
click.echo(f"CID: {cid}")
click.echo(f"Size: {info.get('size', 'unknown')} bytes")
click.echo(f"Type: {info.get('mime_type') or info.get('media_type', 'unknown')}")
if info.get('friendly_name'):
click.echo(f"Friendly Name: {info['friendly_name']}")
if info.get('title'):
click.echo(f"Title: {info['title']}")
if info.get('filename'):
click.echo(f"Filename: {info['filename']}")
click.echo(f"Raw URL: {get_server()}/cache/{cid}/raw")
except requests.HTTPError as e:
if e.response.status_code == 404:
click.echo(f"Not found: {cid}", err=True)
else:
raise
@cli.command("import")
@click.argument("filepath", type=click.Path(exists=True))
def import_file(filepath):
"""Import a local file to cache (local server only)."""
path = str(Path(filepath).resolve())
result = api_post("/cache/import", params={"path": path})
click.echo(f"Imported: {result['cid']}")
@cli.command()
@click.argument("filepath", type=click.Path(exists=True))
def upload(filepath):
"""Upload a file to cache and IPFS. Requires login."""
# Check auth
token_data = load_token()
if not token_data.get("access_token"):
click.echo("Not logged in. Please run: artdag login <username>", err=True)
sys.exit(1)
try:
with open(filepath, "rb") as f:
files = {"file": (Path(filepath).name, f)}
headers = get_auth_header(require_token=True)
resp = requests.post(f"{get_server()}/cache/upload", files=files, headers=headers)
if resp.status_code == 401:
click.echo("Authentication failed. Please login again.", err=True)
sys.exit(1)
if resp.status_code >= 400:
try:
detail = resp.json().get("detail", resp.text)
except:
detail = resp.text
click.echo(f"Upload failed: {resp.status_code} - {detail}", err=True)
sys.exit(1)
result = resp.json()
click.echo(f"CID: {result['cid']}")
click.echo(f"Size: {result['size']} bytes")
click.echo()
click.echo("Use in recipes:")
click.echo(f' (asset my-asset :cid "{result["cid"]}")')
except requests.RequestException as e:
click.echo(f"Upload failed: {e}", err=True)
sys.exit(1)
@cli.command()
def assets():
"""List known assets."""
data = api_get("/assets")
click.echo("Known assets:")
for name, hash in data.items():
click.echo(f" {name}: {hash[:16]}...")
@cli.command()
@click.argument("run_id")
@click.argument("output_name")
def publish(run_id, output_name):
"""Publish an L1 run to L2 (register ownership). Requires login.
RUN_ID: The L1 run ID to publish
OUTPUT_NAME: Name for the registered asset
"""
# Check auth
token_data = load_token()
if not token_data.get("access_token"):
click.echo("Not logged in. Please run: artdag login <username>", err=True)
sys.exit(1)
# Post to L2 server with auth, including which L1 server has the run
try:
resp = requests.post(
f"{get_l2_server()}/registry/record-run",
json={"run_id": run_id, "output_name": output_name, "l1_server": get_server()},
headers={"Authorization": f"Bearer {token_data['access_token']}"}
)
if resp.status_code == 401:
click.echo("Authentication failed. Please login again.", err=True)
sys.exit(1)
resp.raise_for_status()
except requests.RequestException as e:
click.echo(f"Failed to publish: {e}", err=True)
sys.exit(1)
result = resp.json()
click.echo(f"Published to L2!")
click.echo(f"Asset: {result['asset']['name']}")
click.echo(f"CID: {result['asset']['cid']}")
click.echo(f"Activity: {result['activity']['activity_id']}")
# ============ Metadata Commands ============
@cli.command()
@click.argument("cid")
@click.option("--origin", type=click.Choice(["self", "external"]), help="Set origin type")
@click.option("--origin-url", help="Set external origin URL")
@click.option("--origin-note", help="Note about the origin")
@click.option("--description", "-d", help="Set description")
@click.option("--tags", "-t", help="Set tags (comma-separated)")
@click.option("--folder", "-f", help="Set folder path")
@click.option("--add-collection", help="Add to collection")
@click.option("--remove-collection", help="Remove from collection")
@click.option("--publish", "publish_name", help="Publish to L2 with given asset name")
@click.option("--publish-type", default="image", help="Asset type for publishing (image, video)")
@click.option("--republish", is_flag=True, help="Re-sync with L2 after metadata changes")
def meta(cid, origin, origin_url, origin_note, description, tags, folder, add_collection, remove_collection, publish_name, publish_type, republish):
"""View or update metadata for a cached item.
With no options, displays current metadata.
With options, updates the specified fields.
Use --publish <name> to publish to L2 (requires origin to be set).
Use --republish to sync metadata changes to L2.
"""
token_data = load_token()
if not token_data.get("access_token"):
click.echo("Not logged in. Please run: artdag login <username>", err=True)
sys.exit(1)
headers = get_auth_header(require_token=True)
# Handle publish action
if publish_name:
try:
resp = requests.post(
f"{get_server()}/cache/{cid}/publish",
json={"asset_name": publish_name, "asset_type": publish_type},
headers=headers
)
if resp.status_code == 400:
click.echo(f"Error: {resp.json().get('detail', 'Bad request')}", err=True)
sys.exit(1)
if resp.status_code == 404:
click.echo(f"Content not found: {cid}", err=True)
sys.exit(1)
resp.raise_for_status()
result = resp.json()
click.echo(f"Published to L2!")
click.echo(f"Asset name: {result['asset_name']}")
click.echo(f"Activity: {result['l2_result']['activity']['activity_id']}")
except requests.RequestException as e:
click.echo(f"Failed to publish: {e}", err=True)
sys.exit(1)
return
# Handle republish action
if republish:
try:
resp = requests.patch(
f"{get_server()}/cache/{cid}/republish",
headers=headers
)
if resp.status_code == 400:
click.echo(f"Error: {resp.json().get('detail', 'Bad request')}", err=True)
sys.exit(1)
if resp.status_code == 404:
click.echo(f"Content not found: {cid}", err=True)
sys.exit(1)
resp.raise_for_status()
result = resp.json()
click.echo(f"Re-synced with L2!")
click.echo(f"Asset name: {result['asset_name']}")
except requests.RequestException as e:
click.echo(f"Failed to republish: {e}", err=True)
sys.exit(1)
return
# If no update options, just display current metadata
has_updates = any([origin, origin_url, origin_note, description, tags, folder, add_collection, remove_collection])
if not has_updates:
# GET metadata
try:
resp = requests.get(f"{get_server()}/cache/{cid}/meta", headers=headers)
if resp.status_code == 404:
click.echo(f"Content not found: {cid}", err=True)
sys.exit(1)
if resp.status_code == 403:
click.echo("Access denied", err=True)
sys.exit(1)
resp.raise_for_status()
meta = resp.json()
except requests.RequestException as e:
click.echo(f"Failed to get metadata: {e}", err=True)
sys.exit(1)
click.echo(f"Content Hash: {cid}")
click.echo(f"Uploader: {meta.get('uploader', 'unknown')}")
click.echo(f"Uploaded: {meta.get('uploaded_at', 'unknown')}")
if meta.get("origin"):
origin_info = meta["origin"]
click.echo(f"Origin: {origin_info.get('type', 'unknown')}")
if origin_info.get("url"):
click.echo(f" URL: {origin_info['url']}")
if origin_info.get("note"):
click.echo(f" Note: {origin_info['note']}")
else:
click.echo("Origin: not set")
click.echo(f"Description: {meta.get('description', 'none')}")
click.echo(f"Tags: {', '.join(meta.get('tags', [])) or 'none'}")
click.echo(f"Folder: {meta.get('folder', '/')}")
click.echo(f"Collections: {', '.join(meta.get('collections', [])) or 'none'}")
if meta.get("published"):
pub = meta["published"]
click.echo(f"Published: {pub.get('asset_name')} ({pub.get('published_at')})")
return
# Build update payload
update = {}
if origin or origin_url or origin_note:
# Get current origin first
try:
resp = requests.get(f"{get_server()}/cache/{cid}/meta", headers=headers)
resp.raise_for_status()
current = resp.json()
current_origin = current.get("origin", {})
except:
current_origin = {}
update["origin"] = {
"type": origin or current_origin.get("type", "self"),
"url": origin_url if origin_url is not None else current_origin.get("url"),
"note": origin_note if origin_note is not None else current_origin.get("note")
}
if description is not None:
update["description"] = description
if tags is not None:
update["tags"] = [t.strip() for t in tags.split(",") if t.strip()]
if folder is not None:
update["folder"] = folder
if add_collection or remove_collection:
# Get current collections
try:
resp = requests.get(f"{get_server()}/cache/{cid}/meta", headers=headers)
resp.raise_for_status()
current = resp.json()
collections = set(current.get("collections", []))
except:
collections = set()
if add_collection:
collections.add(add_collection)
if remove_collection and remove_collection in collections:
collections.remove(remove_collection)
update["collections"] = list(collections)
# PATCH metadata
try:
resp = requests.patch(
f"{get_server()}/cache/{cid}/meta",
json=update,
headers=headers
)
if resp.status_code == 404:
click.echo(f"Content not found: {cid}", err=True)
sys.exit(1)
if resp.status_code == 400:
click.echo(f"Error: {resp.json().get('detail', 'Bad request')}", err=True)
sys.exit(1)
resp.raise_for_status()
except requests.RequestException as e:
click.echo(f"Failed to update metadata: {e}", err=True)
sys.exit(1)
click.echo("Metadata updated.")
# ============ Folder Commands ============
@cli.group()
def folder():
"""Manage folders for organizing cached items."""
pass
@folder.command("list")
def folder_list():
"""List all folders."""
token_data = load_token()
if not token_data.get("access_token"):
click.echo("Not logged in. Please run: artdag login <username>", err=True)
sys.exit(1)
try:
resp = requests.get(
f"{get_server()}/user/folders",
headers={"Authorization": f"Bearer {token_data['access_token']}"}
)
resp.raise_for_status()
folders = resp.json()["folders"]
except requests.RequestException as e:
click.echo(f"Failed to list folders: {e}", err=True)
sys.exit(1)
click.echo("Folders:")
for f in folders:
click.echo(f" {f}")
@folder.command("create")
@click.argument("path")
def folder_create(path):
"""Create a new folder."""
token_data = load_token()
if not token_data.get("access_token"):
click.echo("Not logged in. Please run: artdag login <username>", err=True)
sys.exit(1)
try:
resp = requests.post(
f"{get_server()}/user/folders",
params={"folder_path": path},
headers={"Authorization": f"Bearer {token_data['access_token']}"}
)
if resp.status_code == 400:
click.echo(f"Error: {resp.json().get('detail', 'Bad request')}", err=True)
sys.exit(1)
resp.raise_for_status()
except requests.RequestException as e:
click.echo(f"Failed to create folder: {e}", err=True)
sys.exit(1)
click.echo(f"Created folder: {path}")
@folder.command("delete")
@click.argument("path")
def folder_delete(path):
"""Delete a folder (must be empty)."""
token_data = load_token()
if not token_data.get("access_token"):
click.echo("Not logged in. Please run: artdag login <username>", err=True)
sys.exit(1)
try:
resp = requests.delete(
f"{get_server()}/user/folders",
params={"folder_path": path},
headers={"Authorization": f"Bearer {token_data['access_token']}"}
)
if resp.status_code == 400:
click.echo(f"Error: {resp.json().get('detail', 'Bad request')}", err=True)
sys.exit(1)
if resp.status_code == 404:
click.echo(f"Folder not found: {path}", err=True)
sys.exit(1)
resp.raise_for_status()
except requests.RequestException as e:
click.echo(f"Failed to delete folder: {e}", err=True)
sys.exit(1)
click.echo(f"Deleted folder: {path}")
# ============ Collection Commands ============
@cli.group()
def collection():
"""Manage collections for organizing cached items."""
pass
@collection.command("list")
def collection_list():
"""List all collections."""
token_data = load_token()
if not token_data.get("access_token"):
click.echo("Not logged in. Please run: artdag login <username>", err=True)
sys.exit(1)
try:
resp = requests.get(
f"{get_server()}/user/collections",
headers={"Authorization": f"Bearer {token_data['access_token']}"}
)
resp.raise_for_status()
collections = resp.json()["collections"]
except requests.RequestException as e:
click.echo(f"Failed to list collections: {e}", err=True)
sys.exit(1)
click.echo("Collections:")
for c in collections:
click.echo(f" {c['name']} (created: {c['created_at'][:10]})")
@collection.command("create")
@click.argument("name")
def collection_create(name):
"""Create a new collection."""
token_data = load_token()
if not token_data.get("access_token"):
click.echo("Not logged in. Please run: artdag login <username>", err=True)
sys.exit(1)
try:
resp = requests.post(
f"{get_server()}/user/collections",
params={"name": name},
headers={"Authorization": f"Bearer {token_data['access_token']}"}
)
if resp.status_code == 400:
click.echo(f"Error: {resp.json().get('detail', 'Bad request')}", err=True)
sys.exit(1)
resp.raise_for_status()
except requests.RequestException as e:
click.echo(f"Failed to create collection: {e}", err=True)
sys.exit(1)
click.echo(f"Created collection: {name}")
@collection.command("delete")
@click.argument("name")
def collection_delete(name):
"""Delete a collection."""
token_data = load_token()
if not token_data.get("access_token"):
click.echo("Not logged in. Please run: artdag login <username>", err=True)
sys.exit(1)
try:
resp = requests.delete(
f"{get_server()}/user/collections",
params={"name": name},
headers={"Authorization": f"Bearer {token_data['access_token']}"}
)
if resp.status_code == 404:
click.echo(f"Collection not found: {name}", err=True)
sys.exit(1)
resp.raise_for_status()
except requests.RequestException as e:
click.echo(f"Failed to delete collection: {e}", err=True)
sys.exit(1)
click.echo(f"Deleted collection: {name}")
# ============ Storage Commands ============
STORAGE_PROVIDER_TYPES = ["pinata", "web3storage", "nftstorage", "infura", "filebase", "storj", "local"]
STORAGE_CONFIG_FIELDS = {
"pinata": ["api_key", "secret_key"],
"web3storage": ["api_token"],
"nftstorage": ["api_token"],
"infura": ["project_id", "project_secret"],
"filebase": ["access_key", "secret_key", "bucket"],
"storj": ["access_key", "secret_key", "bucket"],
"local": ["path"],
}
@cli.group()
def storage():
"""Manage IPFS storage providers."""
pass
@storage.command("list")
def storage_list():
"""List all storage providers."""
token_data = load_token()
if not token_data.get("access_token"):
click.echo("Not logged in. Use 'artdag login' first.", err=True)
sys.exit(1)
try:
headers = get_auth_header(require_token=True)
resp = requests.get(f"{get_server()}/storage", headers=headers)
resp.raise_for_status()
data = resp.json()
except requests.RequestException as e:
click.echo(f"Failed to list storage providers: {e}", err=True)
sys.exit(1)
storages = data.get("storages", [])
if not storages:
click.echo("No storage providers configured.")
click.echo(f"\nAvailable types: {', '.join(STORAGE_PROVIDER_TYPES)}")
click.echo("Use 'artdag storage add <type>' to add one.")
return
click.echo("Storage Providers:")
click.echo()
for s in storages:
status = "Active" if s.get("is_active", True) else "Inactive"
click.echo(f" [{s['id']}] {s['provider_name'] or s['provider_type']} ({s['provider_type']})")
click.echo(f" Status: {status}")
click.echo(f" Capacity: {s.get('capacity_gb', 'N/A')} GB")
click.echo()
@storage.command("add")
@click.argument("provider_type", type=click.Choice(STORAGE_PROVIDER_TYPES))
@click.option("--name", "-n", help="Friendly name for this provider")
@click.option("--capacity", "-c", type=int, default=5, help="Capacity in GB (default: 5)")
def storage_add(provider_type, name, capacity):
"""Add a storage provider (interactive config)."""
token_data = load_token()
if not token_data.get("access_token"):
click.echo("Not logged in. Use 'artdag login' first.", err=True)
sys.exit(1)
# Get config fields for this provider type
fields = STORAGE_CONFIG_FIELDS.get(provider_type, [])
config = {}
click.echo(f"Configuring {provider_type} storage provider...")
click.echo()
for field in fields:
is_secret = "secret" in field.lower() or "key" in field.lower() or "token" in field.lower()
if is_secret:
value = click.prompt(f" {field}", hide_input=True)
else:
value = click.prompt(f" {field}")
config[field] = value
# Send to server
try:
headers = get_auth_header(require_token=True)
payload = {
"provider_type": provider_type,
"config": config,
"capacity_gb": capacity,
}
if name:
payload["provider_name"] = name
resp = requests.post(f"{get_server()}/storage", json=payload, headers=headers)
if resp.status_code == 400:
click.echo(f"Error: {resp.json().get('detail', 'Bad request')}", err=True)
sys.exit(1)
resp.raise_for_status()
result = resp.json()
except requests.RequestException as e:
click.echo(f"Failed to add storage provider: {e}", err=True)
sys.exit(1)
click.echo()
click.echo(f"Storage provider added (ID: {result.get('id')})")
@storage.command("test")
@click.argument("storage_id", type=int)
def storage_test(storage_id):
"""Test storage provider connectivity."""
token_data = load_token()
if not token_data.get("access_token"):
click.echo("Not logged in. Use 'artdag login' first.", err=True)
sys.exit(1)
try:
headers = get_auth_header(require_token=True)
resp = requests.post(f"{get_server()}/storage/{storage_id}/test", headers=headers)
if resp.status_code == 404:
click.echo(f"Storage provider not found: {storage_id}", err=True)
sys.exit(1)
resp.raise_for_status()
result = resp.json()
except requests.RequestException as e:
click.echo(f"Failed to test storage: {e}", err=True)
sys.exit(1)
if result.get("success"):
click.echo(f"Success: {result.get('message', 'Connection OK')}")
else:
click.echo(f"Failed: {result.get('message', 'Unknown error')}", err=True)
sys.exit(1)
@storage.command("delete")
@click.argument("storage_id", type=int)
@click.option("--force", "-f", is_flag=True, help="Skip confirmation")
def storage_delete(storage_id, force):
"""Delete a storage provider."""
token_data = load_token()
if not token_data.get("access_token"):
click.echo("Not logged in. Use 'artdag login' first.", err=True)
sys.exit(1)
if not force:
if not click.confirm(f"Delete storage provider {storage_id}?"):
click.echo("Cancelled.")
return
try:
headers = get_auth_header(require_token=True)
resp = requests.delete(f"{get_server()}/storage/{storage_id}", headers=headers)
if resp.status_code == 400:
click.echo(f"Error: {resp.json().get('detail', 'Bad request')}", err=True)
sys.exit(1)
if resp.status_code == 404:
click.echo(f"Storage provider not found: {storage_id}", err=True)
sys.exit(1)
resp.raise_for_status()
except requests.RequestException as e:
click.echo(f"Failed to delete storage provider: {e}", err=True)
sys.exit(1)
click.echo(f"Deleted storage provider: {storage_id}")
# ============ Recipe Commands ============
def _is_sexp_file(filepath: str, content: str) -> bool:
"""Detect if file is S-expression format."""
# Check extension first
if filepath.endswith('.sexp'):
return True
# Check content - skip comments and whitespace
for line in content.split('\n'):
stripped = line.strip()
if not stripped or stripped.startswith(';'):
continue
return stripped.startswith('(')
return False
@cli.command("upload-recipe")
@click.argument("filepath", type=click.Path(exists=True))
def upload_recipe(filepath):
"""Upload a recipe file (YAML or S-expression). Requires login."""
token_data = load_token()
if not token_data.get("access_token"):
click.echo("Not logged in. Please run: artdag login <username>", err=True)
sys.exit(1)
# Read content
with open(filepath) as f:
content = f.read()
# Detect format and validate
is_sexp = _is_sexp_file(filepath, content)
if is_sexp:
# S-expression - basic syntax check (starts with paren after comments)
# Full validation happens on server
click.echo("Detected S-expression format")
else:
# Validate YAML locally
try:
recipe = yaml.safe_load(content)
except yaml.YAMLError as e:
click.echo(f"Invalid YAML: {e}", err=True)
sys.exit(1)
# Check required fields for YAML
if not recipe.get("name"):
click.echo("Recipe must have a 'name' field", err=True)
sys.exit(1)
# Upload
try:
with open(filepath, "rb") as f:
files = {"file": (Path(filepath).name, f)}
headers = get_auth_header(require_token=True)
resp = requests.post(f"{get_server()}/recipes/upload", files=files, headers=headers)
if resp.status_code == 401:
click.echo("Authentication failed. Please login again.", err=True)
sys.exit(1)
if resp.status_code >= 400:
click.echo(f"Error response: {resp.text}", err=True)
resp.raise_for_status()
result = resp.json()
click.echo(f"Uploaded recipe: {result['name']} v{result.get('version', '1.0')}")
click.echo(f"Recipe ID: {result['recipe_id']}")
click.echo(f"Variable inputs: {result['variable_inputs']}")
click.echo(f"Fixed inputs: {result['fixed_inputs']}")
except requests.RequestException as e:
click.echo(f"Upload failed: {e}", err=True)
sys.exit(1)
@cli.command("upload-effect")
@click.argument("filepath", type=click.Path(exists=True))
def upload_effect(filepath):
"""Upload an effect file to IPFS. Requires login.
Effects are Python files with PEP 723 dependencies and @-tag metadata.
Returns the IPFS CID for use in recipes.
"""
token_data = load_token()
if not token_data.get("access_token"):
click.echo("Not logged in. Please run: artdag login <username>", err=True)
sys.exit(1)
# Check it's a Python file
if not filepath.endswith(".py"):
click.echo("Effect must be a Python file (.py)", err=True)
sys.exit(1)
# Upload
try:
with open(filepath, "rb") as f:
files = {"file": (Path(filepath).name, f)}
headers = get_auth_header(require_token=True)
resp = requests.post(f"{get_server()}/effects/upload", files=files, headers=headers)
if resp.status_code == 401:
click.echo("Authentication failed. Please login again.", err=True)
sys.exit(1)
if resp.status_code >= 400:
click.echo(f"Error response: {resp.text}", err=True)
sys.exit(1)
resp.raise_for_status()
result = resp.json()
click.echo(f"Uploaded effect: {result['name']} v{result.get('version', '1.0.0')}")
click.echo(f"CID: {result['cid']}")
click.echo(f"Temporal: {result.get('temporal', False)}")
if result.get('params'):
click.echo(f"Parameters: {', '.join(p['name'] for p in result['params'])}")
if result.get('dependencies'):
click.echo(f"Dependencies: {', '.join(result['dependencies'])}")
click.echo()
click.echo("Use in recipes:")
click.echo(f' (effect {result["name"]} :cid "{result["cid"]}")')
except requests.RequestException as e:
click.echo(f"Upload failed: {e}", err=True)
sys.exit(1)
@cli.command("effects")
@click.option("--limit", "-l", default=20, help="Max effects to show")
@click.option("--offset", "-o", default=0, help="Offset for pagination")
def list_effects(limit, offset):
"""List uploaded effects with pagination."""
headers = get_auth_header(require_token=True)
try:
resp = requests.get(f"{get_server()}/effects?offset={offset}&limit={limit}", headers=headers)
resp.raise_for_status()
result = resp.json()
effects = result.get("effects", [])
has_more = result.get("has_more", False)
if not effects:
click.echo("No effects found")
return
start = offset + 1
end = offset + len(effects)
click.echo(f"Showing {start}-{end}" + (" (more available)" if has_more else ""))
click.echo()
for effect in effects:
meta = effect.get("meta", {})
click.echo(f" {meta.get('name', 'unknown')} v{meta.get('version', '?')}")
click.echo(f" Hash: {effect['cid'][:32]}...")
click.echo(f" Temporal: {meta.get('temporal', False)}")
if meta.get('params'):
click.echo(f" Params: {', '.join(p['name'] for p in meta['params'])}")
click.echo()
except requests.RequestException as e:
click.echo(f"Failed to list effects: {e}", err=True)
sys.exit(1)
@cli.command("effect")
@click.argument("cid")
@click.option("--source", "-s", is_flag=True, help="Show source code")
def show_effect(cid, source):
"""Show details of an effect by CID."""
token_data = load_token()
if not token_data.get("access_token"):
click.echo("Not logged in. Use 'artdag login' first.", err=True)
sys.exit(1)
try:
headers = get_auth_header(require_token=True)
resp = requests.get(f"{get_server()}/effects/{cid}", headers=headers)
if resp.status_code == 404:
click.echo(f"Effect not found: {cid}", err=True)
sys.exit(1)
resp.raise_for_status()
effect = resp.json()
except requests.RequestException as e:
click.echo(f"Failed to get effect: {e}", err=True)
sys.exit(1)
meta = effect.get("meta", effect)
name = meta.get("name", "Unnamed")
version = meta.get("version", "1.0.0")
author = meta.get("author", "Unknown")
description = meta.get("description", "No description")
click.echo(f"Name: {name} (v{version})")
click.echo(f"Author: {author}")
click.echo(f"Description: {description}")
click.echo(f"CID: {effect.get('cid', cid)}")
if effect.get("uploaded_at"):
click.echo(f"Uploaded: {effect['uploaded_at']}")
if effect.get("uploader"):
click.echo(f"Uploader: {effect['uploader']}")
if meta.get("temporal"):
click.echo("Temporal: Yes")
# Parameters
params = meta.get("params", [])
if params:
click.echo("\nParameters:")
for p in params:
param_type = p.get("type", "any")
param_desc = p.get("description", "")
param_range = ""
if "min" in p and "max" in p:
param_range = f" [{p['min']}-{p['max']}]"
param_default = f" default: {p['default']}" if "default" in p else ""
click.echo(f" - {p['name']} ({param_type}): {param_desc}{param_range}{param_default}")
# Dependencies
deps = meta.get("dependencies", [])
if deps:
click.echo("\nDependencies:")
for dep in deps:
click.echo(f" - {dep}")
# Source code
if source:
click.echo("\nSource Code:")
click.echo("-" * 40)
try:
source_resp = requests.get(f"{get_server()}/effects/{cid}/source", headers=headers)
if source_resp.status_code == 200:
click.echo(source_resp.text)
else:
click.echo("(Source not available)")
except requests.RequestException:
click.echo("(Failed to fetch source)")
@cli.command("recipes")
@click.option("--limit", "-l", default=10, help="Max recipes to show")
@click.option("--offset", "-o", default=0, help="Offset for pagination")
def list_recipes(limit, offset):
"""List uploaded recipes for the current user with pagination."""
headers = get_auth_header(require_token=True)
try:
resp = requests.get(f"{get_server()}/recipes?offset={offset}&limit={limit}", headers=headers)
resp.raise_for_status()
data = resp.json()
except requests.RequestException as e:
click.echo(f"Failed to list recipes: {e}", err=True)
sys.exit(1)
recipes = data.get("recipes", [])
has_more = data.get("has_more", False)
if not recipes:
click.echo("No recipes found.")
return
start = offset + 1
end = offset + len(recipes)
click.echo(f"Showing {start}-{end}" + (" (more available)" if has_more else ""))
click.echo()
for recipe in recipes:
recipe_id = recipe["recipe_id"]
var_count = len(recipe.get("variable_inputs", []))
friendly_name = recipe.get("friendly_name", "")
click.echo(f"Name: {recipe['name']}")
click.echo(f" Version: {recipe.get('version', 'N/A')}")
if friendly_name:
click.echo(f" Friendly Name: {friendly_name}")
click.echo(f" Variables: {var_count}")
click.echo(f" Recipe ID: {recipe_id}")
click.echo()
@cli.command("recipe")
@click.argument("recipe_id")
def show_recipe(recipe_id):
"""Show details of a recipe."""
token_data = load_token()
if not token_data.get("access_token"):
click.echo("Not logged in. Use 'artdag login' first.", err=True)
sys.exit(1)
try:
headers = get_auth_header(require_token=True)
resp = requests.get(f"{get_server()}/recipes/{recipe_id}", headers=headers)
if resp.status_code == 404:
click.echo(f"Recipe not found: {recipe_id}", err=True)
sys.exit(1)
resp.raise_for_status()
recipe = resp.json()
except requests.RequestException as e:
click.echo(f"Failed to get recipe: {e}", err=True)
sys.exit(1)
click.echo(f"Name: {recipe.get('name', 'Unnamed')}")
click.echo(f"Version: {recipe.get('version', 'N/A')}")
if recipe.get("friendly_name"):
click.echo(f"Friendly Name: {recipe['friendly_name']}")
click.echo(f"Description: {recipe.get('description', 'N/A')}")
click.echo(f"Recipe ID: {recipe['recipe_id']}")
click.echo(f"Owner: {recipe.get('owner', 'N/A')}")
if recipe.get("uploaded_at"):
click.echo(f"Uploaded: {recipe['uploaded_at']}")
if recipe.get("variable_inputs"):
click.echo("\nVariable Inputs:")
for inp in recipe["variable_inputs"]:
req = "*" if inp.get("required", True) else ""
click.echo(f" - {inp['name']}{req}: {inp.get('description', 'No description')}")
if recipe.get("fixed_inputs"):
click.echo("\nFixed Inputs:")
for inp in recipe["fixed_inputs"]:
click.echo(f" - {inp['asset']}: {inp['cid'][:16]}...")
@cli.command("run-recipe")
@click.argument("recipe_id")
@click.option("--input", "-i", "inputs", multiple=True, help="Input as node_id:cid")
@click.option("--wait", "-w", is_flag=True, help="Wait for completion")
def run_recipe(recipe_id, inputs, wait):
"""Run a recipe with variable inputs. Requires login.
RECIPE_ID: The recipe ID (content hash)
Example: artdag run-recipe abc123 -i source_image:def456
"""
token_data = load_token()
if not token_data.get("access_token"):
click.echo("Not logged in. Please run: artdag login <username>", err=True)
sys.exit(1)
# Parse inputs
input_dict = {}
for inp in inputs:
if ":" not in inp:
click.echo(f"Invalid input format: {inp} (expected node_id:cid)", err=True)
sys.exit(1)
node_id, cid = inp.split(":", 1)
input_dict[node_id] = cid
# Run
try:
headers = get_auth_header(require_token=True)
resp = requests.post(
f"{get_server()}/recipes/{recipe_id}/run",
json={"inputs": input_dict},
headers=headers
)
if resp.status_code == 401:
click.echo("Authentication failed. Please login again.", err=True)
sys.exit(1)
if resp.status_code == 400:
error = resp.json().get("detail", "Bad request")
click.echo(f"Error: {error}", err=True)
sys.exit(1)
if resp.status_code == 404:
click.echo(f"Recipe not found: {recipe_id}", err=True)
sys.exit(1)
resp.raise_for_status()
result = resp.json()
except requests.RequestException as e:
click.echo(f"Run failed: {e}", err=True)
sys.exit(1)
click.echo(f"Run started: {result['run_id']}")
if result.get('recipe'):
click.echo(f"Recipe: {result['recipe']}")
click.echo(f"Status: {result.get('status', 'pending')}")
if wait:
click.echo("Waiting for completion...")
run_id = result["run_id"]
while True:
time.sleep(2)
try:
resp = requests.get(f"{get_server()}/runs/{run_id}")
resp.raise_for_status()
run = resp.json()
except requests.RequestException:
continue
if run["status"] == "completed":
click.echo(f"Completed! Output: {run.get('output_cid', 'N/A')}")
break
elif run["status"] == "failed":
click.echo(f"Failed: {run.get('error', 'Unknown error')}", err=True)
sys.exit(1)
@cli.command("delete-recipe")
@click.argument("recipe_id")
@click.option("--force", "-f", is_flag=True, help="Skip confirmation")
def delete_recipe(recipe_id, force):
"""Delete a recipe. Requires login."""
token_data = load_token()
if not token_data.get("access_token"):
click.echo("Not logged in. Please run: artdag login <username>", err=True)
sys.exit(1)
if not force:
if not click.confirm(f"Delete recipe {recipe_id[:16]}...?"):
click.echo("Cancelled.")
return
try:
headers = get_auth_header(require_token=True)
resp = requests.delete(f"{get_server()}/recipes/{recipe_id}", headers=headers)
if resp.status_code == 401:
click.echo("Authentication failed. Please login again.", err=True)
sys.exit(1)
if resp.status_code == 400:
error = resp.json().get("detail", "Cannot delete")
click.echo(f"Error: {error}", err=True)
sys.exit(1)
if resp.status_code == 404:
click.echo(f"Recipe not found: {recipe_id}", err=True)
sys.exit(1)
resp.raise_for_status()
except requests.RequestException as e:
click.echo(f"Delete failed: {e}", err=True)
sys.exit(1)
click.echo(f"Deleted recipe: {recipe_id[:16]}...")
# ============ v2 API Commands (3-Phase Execution) ============
@cli.command("plan")
@click.argument("recipe_file", type=click.Path(exists=True))
@click.option("--input", "-i", "inputs", multiple=True, help="Input as name:cid")
@click.option("--features", "-f", multiple=True, help="Features to extract (default: beats, energy)")
@click.option("--output", "-o", type=click.Path(), help="Save plan JSON to file")
def generate_plan(recipe_file, inputs, features, output):
"""Generate an execution plan from a recipe YAML. Requires login.
Preview what will be executed without actually running it.
RECIPE_FILE: Path to recipe YAML file
Example: artdag plan recipe.yaml -i source_video:abc123
"""
token_data = load_token()
if not token_data.get("access_token"):
click.echo("Not logged in. Please run: artdag login <username>", err=True)
sys.exit(1)
# Read recipe YAML
with open(recipe_file) as f:
recipe_yaml = f.read()
# Parse inputs
input_hashes = {}
for inp in inputs:
if ":" not in inp:
click.echo(f"Invalid input format: {inp} (expected name:cid)", err=True)
sys.exit(1)
name, cid = inp.split(":", 1)
input_hashes[name] = cid
# Build request
request_data = {
"recipe_yaml": recipe_yaml,
"input_hashes": input_hashes,
}
if features:
request_data["features"] = list(features)
# Submit to API
try:
headers = get_auth_header(require_token=True)
resp = requests.post(
f"{get_server()}/api/v2/plan",
json=request_data,
headers=headers
)
if resp.status_code == 401:
click.echo("Authentication failed. Please login again.", err=True)
sys.exit(1)
if resp.status_code == 400:
click.echo(f"Error: {resp.json().get('detail', 'Bad request')}", err=True)
sys.exit(1)
resp.raise_for_status()
result = resp.json()
except requests.RequestException as e:
click.echo(f"Plan generation failed: {e}", err=True)
sys.exit(1)
# Display results
click.echo(f"Recipe: {result['recipe']}")
click.echo(f"Plan ID: {result['plan_id'][:16]}...")
click.echo(f"Total steps: {result['total_steps']}")
click.echo(f"Cached: {result['cached_steps']}")
click.echo(f"Pending: {result['pending_steps']}")
if result.get("steps"):
click.echo("\nSteps:")
for step in result["steps"]:
status = "✓ cached" if step["cached"] else "○ pending"
click.echo(f" L{step['level']} {step['step_id']:<20} {step['node_type']:<10} {status}")
# Save plan JSON if requested
if output:
with open(output, "w") as f:
f.write(result["plan_json"])
click.echo(f"\nPlan saved to: {output}")
elif result.get("plan_json"):
click.echo(f"\nUse --output to save the plan JSON for later execution.")
@cli.command("execute-plan")
@click.argument("plan_file", type=click.Path(exists=True))
@click.option("--wait", "-w", is_flag=True, help="Wait for completion")
def execute_plan(plan_file, wait):
"""Execute a pre-generated execution plan. Requires login.
PLAN_FILE: Path to plan JSON file (from 'artdag plan --output')
Example: artdag execute-plan plan.json --wait
"""
token_data = load_token()
if not token_data.get("access_token"):
click.echo("Not logged in. Please run: artdag login <username>", err=True)
sys.exit(1)
# Read plan JSON
with open(plan_file) as f:
plan_json = f.read()
# Submit to API
try:
headers = get_auth_header(require_token=True)
resp = requests.post(
f"{get_server()}/api/v2/execute",
json={"plan_json": plan_json},
headers=headers
)
if resp.status_code == 401:
click.echo("Authentication failed. Please login again.", err=True)
sys.exit(1)
if resp.status_code == 400:
click.echo(f"Error: {resp.json().get('detail', 'Bad request')}", err=True)
sys.exit(1)
resp.raise_for_status()
result = resp.json()
except requests.RequestException as e:
click.echo(f"Execution failed: {e}", err=True)
sys.exit(1)
run_id = result["run_id"]
click.echo(f"Run started: {run_id}")
click.echo(f"Status: {result['status']}")
if wait:
_wait_for_v2_run(token_data, run_id)
@cli.command("run-v2")
@click.argument("recipe_file", type=click.Path(exists=True))
@click.option("--input", "-i", "inputs", multiple=True, help="Input as name:cid")
@click.option("--features", "-f", multiple=True, help="Features to extract (default: beats, energy)")
@click.option("--wait", "-w", is_flag=True, help="Wait for completion")
def run_recipe_v2(recipe_file, inputs, features, wait):
"""Run a recipe through 3-phase execution. Requires login.
Runs the full pipeline: Analyze → Plan → Execute
RECIPE_FILE: Path to recipe YAML file
Example: artdag run-v2 recipe.yaml -i source_video:abc123 --wait
"""
token_data = load_token()
if not token_data.get("access_token"):
click.echo("Not logged in. Please run: artdag login <username>", err=True)
sys.exit(1)
# Read recipe YAML
with open(recipe_file) as f:
recipe_yaml = f.read()
# Parse recipe name for display
try:
recipe_data = yaml.safe_load(recipe_yaml)
recipe_name = recipe_data.get("name", "unknown")
except Exception:
recipe_name = "unknown"
# Parse inputs
input_hashes = {}
for inp in inputs:
if ":" not in inp:
click.echo(f"Invalid input format: {inp} (expected name:cid)", err=True)
sys.exit(1)
name, cid = inp.split(":", 1)
input_hashes[name] = cid
# Build request
request_data = {
"recipe_yaml": recipe_yaml,
"input_hashes": input_hashes,
}
if features:
request_data["features"] = list(features)
# Submit to API
click.echo(f"Running recipe: {recipe_name}")
click.echo(f"Inputs: {len(input_hashes)}")
try:
headers = get_auth_header(require_token=True)
resp = requests.post(
f"{get_server()}/api/v2/run-recipe",
json=request_data,
headers=headers
)
if resp.status_code == 401:
click.echo("Authentication failed. Please login again.", err=True)
sys.exit(1)
if resp.status_code == 400:
click.echo(f"Error: {resp.json().get('detail', 'Bad request')}", err=True)
sys.exit(1)
resp.raise_for_status()
result = resp.json()
except requests.RequestException as e:
click.echo(f"Run failed: {e}", err=True)
sys.exit(1)
run_id = result["run_id"]
click.echo(f"Run ID: {run_id}")
click.echo(f"Status: {result['status']}")
if result.get("output_cid"):
click.echo(f"Output: {result['output_cid']}")
if result.get("output_ipfs_cid"):
click.echo(f"IPFS CID: {result['output_ipfs_cid']}")
return
if wait:
_wait_for_v2_run(token_data, run_id)
def _wait_for_v2_run(token_data: dict, run_id: str):
"""Poll v2 run status until completion."""
click.echo("Waiting for completion...")
headers = get_auth_header(require_token=True)
while True:
time.sleep(2)
try:
resp = requests.get(
f"{get_server()}/api/v2/run/{run_id}",
headers=headers
)
resp.raise_for_status()
run = resp.json()
except requests.RequestException as e:
click.echo(f".", nl=False)
continue
status = run.get("status", "unknown")
if status == "completed":
click.echo(f"\nCompleted!")
if run.get("output_cid"):
click.echo(f"Output: {run['output_cid']}")
if run.get("output_ipfs_cid"):
click.echo(f"IPFS CID: {run['output_ipfs_cid']}")
if run.get("cached"):
click.echo(f"Steps cached: {run['cached']}")
if run.get("executed"):
click.echo(f"Steps executed: {run['executed']}")
break
elif status == "failed":
click.echo(f"\nFailed: {run.get('error', 'Unknown error')}", err=True)
sys.exit(1)
else:
click.echo(".", nl=False)
@cli.command("run-status")
@click.argument("run_id")
def run_status_v2(run_id):
"""Get status of a v2 run. Requires login.
RUN_ID: The run ID from run-v2 or execute-plan
"""
token_data = load_token()
if not token_data.get("access_token"):
click.echo("Not logged in. Please run: artdag login <username>", err=True)
sys.exit(1)
try:
headers = get_auth_header(require_token=True)
resp = requests.get(
f"{get_server()}/api/v2/run/{run_id}",
headers=headers
)
if resp.status_code == 404:
click.echo(f"Run not found: {run_id}", err=True)
sys.exit(1)
resp.raise_for_status()
run = resp.json()
except requests.RequestException as e:
click.echo(f"Failed to get status: {e}", err=True)
sys.exit(1)
click.echo(f"Run ID: {run_id}")
click.echo(f"Status: {run['status']}")
if run.get("recipe"):
click.echo(f"Recipe: {run['recipe']}")
if run.get("plan_id"):
click.echo(f"Plan ID: {run['plan_id'][:16]}...")
if run.get("output_cid"):
click.echo(f"Output: {run['output_cid']}")
if run.get("output_ipfs_cid"):
click.echo(f"IPFS CID: {run['output_ipfs_cid']}")
if run.get("cached") is not None:
click.echo(f"Cached: {run['cached']}")
if run.get("executed") is not None:
click.echo(f"Executed: {run['executed']}")
if run.get("error"):
click.echo(f"Error: {run['error']}")
if __name__ == "__main__":
cli()