Add CLI feature parity with L1 web UI

New commands:
- stats: Show user counts (runs, recipes, effects, media, storage)
- effect <cid>: Show effect details with --source option
- storage list/add/test/delete: Manage IPFS storage providers
- clear-data: Clear all user L1 data (preserves storage config)

Enhanced commands:
- runs/recipes/effects/cache: Add --offset pagination
- cache: Add --type filter (all/image/video/audio)
- status: Add --plan, --artifacts, --analysis flags for detailed views

Other changes:
- All list commands now require auth and use server-side pagination
- Updated README with comprehensive CLI documentation

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
gilesb
2026-01-12 14:35:12 +00:00
parent dce277f1da
commit 84acfc45cf
2 changed files with 813 additions and 174 deletions

622
artdag.py
View File

@@ -254,6 +254,97 @@ def info():
click.echo(f"Runs: {data['runs_count']}")
@cli.command()
def stats():
"""Show user stats (runs, recipes, effects, media, storage counts)."""
token_data = load_token()
if not token_data.get("access_token"):
click.echo("Not logged in. Use 'artdag login' first.", err=True)
sys.exit(1)
try:
headers = {"Authorization": f"Bearer {token_data['access_token']}"}
resp = requests.get(f"{get_server()}/api/stats", headers=headers)
resp.raise_for_status()
stats = resp.json()
except requests.RequestException as e:
click.echo(f"Failed to get stats: {e}", err=True)
sys.exit(1)
click.echo("User Stats:")
click.echo(f" Runs: {stats.get('runs', 0)}")
click.echo(f" Recipes: {stats.get('recipes', 0)}")
click.echo(f" Effects: {stats.get('effects', 0)}")
click.echo(f" Media: {stats.get('media', 0)}")
click.echo(f" Storage: {stats.get('storage', 0)}")
@cli.command("clear-data")
@click.option("--force", "-f", is_flag=True, help="Skip confirmation")
def clear_data(force):
"""Clear all user L1 data (runs, recipes, effects, media).
Storage provider configurations are preserved.
This action cannot be undone!
"""
token_data = load_token()
if not token_data.get("access_token"):
click.echo("Not logged in. Use 'artdag login' first.", err=True)
sys.exit(1)
# Show current stats first
try:
headers = {"Authorization": f"Bearer {token_data['access_token']}"}
resp = requests.get(f"{get_server()}/api/stats", headers=headers)
resp.raise_for_status()
stats = resp.json()
except requests.RequestException as e:
click.echo(f"Failed to get stats: {e}", err=True)
sys.exit(1)
click.echo("This will delete:")
click.echo(f" Runs: {stats.get('runs', 0)}")
click.echo(f" Recipes: {stats.get('recipes', 0)}")
click.echo(f" Effects: {stats.get('effects', 0)}")
click.echo(f" Media: {stats.get('media', 0)}")
click.echo()
click.echo("Storage configurations will be preserved.")
click.echo()
if not force:
if not click.confirm("Are you sure you want to delete all this data?"):
click.echo("Cancelled.")
return
click.echo()
click.echo("Clearing data...")
try:
resp = requests.delete(f"{get_server()}/api/clear-data", headers=headers)
resp.raise_for_status()
result = resp.json()
except requests.RequestException as e:
click.echo(f"Failed to clear data: {e}", err=True)
sys.exit(1)
deleted = result.get("deleted", {})
click.echo()
click.echo("Deleted:")
click.echo(f" Runs: {deleted.get('runs', 0)}")
click.echo(f" Recipes: {deleted.get('recipes', 0)}")
click.echo(f" Effects: {deleted.get('effects', 0)}")
click.echo(f" Media: {deleted.get('media', 0)}")
errors = result.get("errors", [])
if errors:
click.echo()
click.echo("Errors encountered:")
for err in errors[:5]:
click.echo(f" - {err}")
if len(errors) > 5:
click.echo(f" ... and {len(errors) - 5} more")
@cli.command()
@click.argument("recipe")
@click.argument("input_hash")
@@ -315,39 +406,71 @@ def run(recipe, input_hash, name, wait):
@cli.command("runs")
@click.option("--limit", "-l", default=10, help="Max runs to show")
def list_runs(limit):
"""List all runs."""
runs = api_get("/runs")
@click.option("--offset", "-o", default=0, help="Offset for pagination")
def list_runs(limit, offset):
"""List all runs with pagination."""
token_data = load_token()
if not token_data.get("access_token"):
click.echo("Not logged in. Use 'artdag login' first.", err=True)
sys.exit(1)
try:
headers = {"Authorization": f"Bearer {token_data['access_token']}"}
resp = requests.get(f"{get_server()}/runs?offset={offset}&limit={limit}", headers=headers)
resp.raise_for_status()
data = resp.json()
except requests.RequestException as e:
click.echo(f"Failed to list runs: {e}", err=True)
sys.exit(1)
runs = data.get("runs", [])
has_more = data.get("has_more", False)
if not runs:
click.echo("No runs found.")
return
start = offset + 1
end = offset + len(runs)
click.echo(f"Showing {start}-{end}" + (" (more available)" if has_more else ""))
click.echo()
click.echo(f"{'ID':<36} {'Status':<10} {'Recipe':<10} {'Output Hash':<20}")
click.echo("-" * 80)
for run in runs[:limit]:
for run in runs:
output = run.get("output_cid", "")[:16] + "..." if run.get("output_cid") else "-"
click.echo(f"{run['run_id']} {run['status']:<10} {run['recipe']:<10} {output}")
@cli.command()
@click.argument("run_id")
def status(run_id):
"""Get status of a run."""
@click.option("--plan", "-p", is_flag=True, help="Show execution plan with steps")
@click.option("--artifacts", "-a", is_flag=True, help="Show output artifacts")
@click.option("--analysis", is_flag=True, help="Show audio analysis data")
def status(run_id, plan, artifacts, analysis):
"""Get status of a run with optional detailed views."""
token_data = load_token()
headers = {}
if token_data.get("access_token"):
headers["Authorization"] = f"Bearer {token_data['access_token']}"
try:
run = api_get(f"/runs/{run_id}")
except requests.HTTPError as e:
if e.response.status_code == 404:
resp = requests.get(f"{get_server()}/runs/{run_id}", headers=headers)
if resp.status_code == 404:
click.echo(f"Run not found: {run_id}")
return
raise
resp.raise_for_status()
run = resp.json()
except requests.RequestException as e:
click.echo(f"Failed to get run: {e}", err=True)
sys.exit(1)
# Basic status
click.echo(f"Run ID: {run['run_id']}")
click.echo(f"Status: {run['status']}")
click.echo(f"Recipe: {run['recipe']}")
click.echo(f"Inputs: {', '.join(run['inputs'])}")
click.echo(f"Output Name: {run['output_name']}")
click.echo(f"Inputs: {', '.join(run.get('inputs', []))}")
click.echo(f"Output Name: {run.get('output_name', 'N/A')}")
click.echo(f"Created: {run['created_at']}")
if run.get("completed_at"):
@@ -359,6 +482,98 @@ def status(run_id):
if run.get("error"):
click.echo(f"Error: {run['error']}")
# Plan view
if plan:
click.echo()
click.echo("Execution Plan:")
click.echo("-" * 60)
try:
plan_resp = requests.get(f"{get_server()}/runs/{run_id}/plan", headers=headers)
if plan_resp.status_code == 200:
plan_data = plan_resp.json()
steps = plan_data.get("steps", [])
if steps:
for i, step in enumerate(steps, 1):
status_str = step.get("status", "pending")
if status_str == "cached":
status_badge = "[cached]"
elif status_str == "completed":
status_badge = "[done]"
elif status_str == "running":
status_badge = "[running]"
else:
status_badge = "[pending]"
step_id = step.get("id", step.get("node_id", f"step_{i}"))
step_type = step.get("type", "unknown")
output_cid = step.get("output_cid", "")
output_str = f"{output_cid[:16]}..." if output_cid else ""
click.echo(f" {i}. {status_badge:<10} {step_id:<20} ({step_type}) {output_str}")
else:
click.echo(" No plan steps available.")
else:
click.echo(" Plan not available.")
except requests.RequestException:
click.echo(" Failed to fetch plan.")
# Artifacts view
if artifacts:
click.echo()
click.echo("Artifacts:")
click.echo("-" * 60)
try:
art_resp = requests.get(f"{get_server()}/runs/{run_id}/artifacts", headers=headers)
if art_resp.status_code == 200:
art_data = art_resp.json()
artifact_list = art_data.get("artifacts", [])
if artifact_list:
for art in artifact_list:
cid = art.get("cid", art.get("output_cid", "unknown"))
name = art.get("name", art.get("step_id", "output"))
media_type = art.get("media_type", art.get("content_type", ""))
size = art.get("size", "")
size_str = f" ({size})" if size else ""
type_str = f" [{media_type}]" if media_type else ""
click.echo(f" {name}: {cid[:24]}...{type_str}{size_str}")
else:
click.echo(" No artifacts available.")
else:
click.echo(" Artifacts not available.")
except requests.RequestException:
click.echo(" Failed to fetch artifacts.")
# Analysis view
if analysis:
click.echo()
click.echo("Analysis:")
click.echo("-" * 60)
try:
# Analysis is included in the detail view
detail_resp = requests.get(f"{get_server()}/runs/{run_id}/detail", headers=headers)
if detail_resp.status_code == 200:
detail_data = detail_resp.json()
analysis_data = detail_data.get("analysis", [])
if analysis_data:
for item in analysis_data:
input_name = item.get("input_name", item.get("name", "input"))
click.echo(f" {input_name}:")
if item.get("tempo"):
click.echo(f" Tempo: {item['tempo']} BPM")
if item.get("beat_count"):
click.echo(f" Beats: {item['beat_count']}")
if item.get("energy") is not None:
click.echo(f" Energy: {item['energy']}%")
if item.get("duration"):
click.echo(f" Duration: {item['duration']:.1f}s")
click.echo()
else:
click.echo(" No analysis data available.")
else:
click.echo(" Analysis not available.")
except requests.RequestException:
click.echo(" Failed to fetch analysis.")
@cli.command("delete-run")
@click.argument("run_id")
@@ -449,19 +664,92 @@ def delete_cache(cid, force):
click.echo(f"Deleted: {cid}")
MEDIA_TYPE_EXTENSIONS = {
"image": ["jpg", "jpeg", "png", "gif", "webp", "bmp", "svg"],
"video": ["mp4", "mkv", "webm", "mov", "avi", "wmv"],
"audio": ["mp3", "wav", "flac", "ogg", "m4a", "aac"],
}
def matches_media_type(item: dict, media_type: str) -> bool:
"""Check if item matches the requested media type."""
if media_type == "all":
return True
# Check content_type/media_type field
content_type = item.get("content_type", item.get("media_type", ""))
if content_type:
if media_type == "image" and content_type.startswith("image/"):
return True
if media_type == "video" and content_type.startswith("video/"):
return True
if media_type == "audio" and content_type.startswith("audio/"):
return True
# Check filename extension
filename = item.get("filename", item.get("friendly_name", ""))
if filename:
ext = filename.rsplit(".", 1)[-1].lower() if "." in filename else ""
if ext in MEDIA_TYPE_EXTENSIONS.get(media_type, []):
return True
return False
@cli.command()
@click.option("--limit", "-l", default=20, help="Max items to show")
def cache(limit):
"""List cached content."""
items = api_get("/cache")
@click.option("--offset", "-o", default=0, help="Offset for pagination")
@click.option("--type", "-t", "media_type", type=click.Choice(["all", "image", "video", "audio"]),
default="all", help="Filter by media type")
def cache(limit, offset, media_type):
"""List cached content with pagination and optional type filter."""
token_data = load_token()
if not token_data.get("access_token"):
click.echo("Not logged in. Use 'artdag login' first.", err=True)
sys.exit(1)
# Fetch more items if filtering to ensure we get enough results
fetch_limit = limit * 3 if media_type != "all" else limit
try:
headers = {"Authorization": f"Bearer {token_data['access_token']}"}
resp = requests.get(f"{get_server()}/cache?offset={offset}&limit={fetch_limit}", headers=headers)
resp.raise_for_status()
data = resp.json()
except requests.RequestException as e:
click.echo(f"Failed to list cache: {e}", err=True)
sys.exit(1)
items = data.get("items", [])
has_more = data.get("has_more", False)
# Filter by media type if requested
if media_type != "all":
items = [item for item in items if isinstance(item, dict) and matches_media_type(item, media_type)]
items = items[:limit] # Apply limit after filtering
if not items:
click.echo("Cache is empty.")
if media_type != "all":
click.echo(f"No {media_type} files found in cache.")
else:
click.echo("Cache is empty.")
return
click.echo(f"Cached content ({len(items)} items):")
for item in items[:limit]:
click.echo(f" {item}")
start = offset + 1
end = offset + len(items)
type_str = f" ({media_type})" if media_type != "all" else ""
click.echo(f"Showing {start}-{end}{type_str}" + (" (more available)" if has_more else ""))
click.echo()
for item in items:
cid = item.get("cid", item) if isinstance(item, dict) else item
name = item.get("friendly_name") or item.get("filename") if isinstance(item, dict) else None
content_type = item.get("content_type", "") if isinstance(item, dict) else ""
type_badge = f"[{content_type.split('/')[0]}]" if content_type else ""
if name:
click.echo(f" {cid[:24]}... {name} {type_badge}")
else:
click.echo(f" {cid} {type_badge}")
@cli.command()
@@ -951,6 +1239,172 @@ def collection_delete(name):
click.echo(f"Deleted collection: {name}")
# ============ Storage Commands ============
STORAGE_PROVIDER_TYPES = ["pinata", "web3storage", "nftstorage", "infura", "filebase", "storj", "local"]
STORAGE_CONFIG_FIELDS = {
"pinata": ["api_key", "secret_key"],
"web3storage": ["api_token"],
"nftstorage": ["api_token"],
"infura": ["project_id", "project_secret"],
"filebase": ["access_key", "secret_key", "bucket"],
"storj": ["access_key", "secret_key", "bucket"],
"local": ["path"],
}
@cli.group()
def storage():
"""Manage IPFS storage providers."""
pass
@storage.command("list")
def storage_list():
"""List all storage providers."""
token_data = load_token()
if not token_data.get("access_token"):
click.echo("Not logged in. Use 'artdag login' first.", err=True)
sys.exit(1)
try:
headers = {"Authorization": f"Bearer {token_data['access_token']}"}
resp = requests.get(f"{get_server()}/storage", headers=headers)
resp.raise_for_status()
data = resp.json()
except requests.RequestException as e:
click.echo(f"Failed to list storage providers: {e}", err=True)
sys.exit(1)
storages = data.get("storages", [])
if not storages:
click.echo("No storage providers configured.")
click.echo(f"\nAvailable types: {', '.join(STORAGE_PROVIDER_TYPES)}")
click.echo("Use 'artdag storage add <type>' to add one.")
return
click.echo("Storage Providers:")
click.echo()
for s in storages:
status = "Active" if s.get("is_active", True) else "Inactive"
click.echo(f" [{s['id']}] {s['provider_name'] or s['provider_type']} ({s['provider_type']})")
click.echo(f" Status: {status}")
click.echo(f" Capacity: {s.get('capacity_gb', 'N/A')} GB")
click.echo()
@storage.command("add")
@click.argument("provider_type", type=click.Choice(STORAGE_PROVIDER_TYPES))
@click.option("--name", "-n", help="Friendly name for this provider")
@click.option("--capacity", "-c", type=int, default=5, help="Capacity in GB (default: 5)")
def storage_add(provider_type, name, capacity):
"""Add a storage provider (interactive config)."""
token_data = load_token()
if not token_data.get("access_token"):
click.echo("Not logged in. Use 'artdag login' first.", err=True)
sys.exit(1)
# Get config fields for this provider type
fields = STORAGE_CONFIG_FIELDS.get(provider_type, [])
config = {}
click.echo(f"Configuring {provider_type} storage provider...")
click.echo()
for field in fields:
is_secret = "secret" in field.lower() or "key" in field.lower() or "token" in field.lower()
if is_secret:
value = click.prompt(f" {field}", hide_input=True)
else:
value = click.prompt(f" {field}")
config[field] = value
# Send to server
try:
headers = {"Authorization": f"Bearer {token_data['access_token']}"}
payload = {
"provider_type": provider_type,
"config": config,
"capacity_gb": capacity,
}
if name:
payload["provider_name"] = name
resp = requests.post(f"{get_server()}/storage", json=payload, headers=headers)
if resp.status_code == 400:
click.echo(f"Error: {resp.json().get('detail', 'Bad request')}", err=True)
sys.exit(1)
resp.raise_for_status()
result = resp.json()
except requests.RequestException as e:
click.echo(f"Failed to add storage provider: {e}", err=True)
sys.exit(1)
click.echo()
click.echo(f"Storage provider added (ID: {result.get('id')})")
@storage.command("test")
@click.argument("storage_id", type=int)
def storage_test(storage_id):
"""Test storage provider connectivity."""
token_data = load_token()
if not token_data.get("access_token"):
click.echo("Not logged in. Use 'artdag login' first.", err=True)
sys.exit(1)
try:
headers = {"Authorization": f"Bearer {token_data['access_token']}"}
resp = requests.post(f"{get_server()}/storage/{storage_id}/test", headers=headers)
if resp.status_code == 404:
click.echo(f"Storage provider not found: {storage_id}", err=True)
sys.exit(1)
resp.raise_for_status()
result = resp.json()
except requests.RequestException as e:
click.echo(f"Failed to test storage: {e}", err=True)
sys.exit(1)
if result.get("success"):
click.echo(f"Success: {result.get('message', 'Connection OK')}")
else:
click.echo(f"Failed: {result.get('message', 'Unknown error')}", err=True)
sys.exit(1)
@storage.command("delete")
@click.argument("storage_id", type=int)
@click.option("--force", "-f", is_flag=True, help="Skip confirmation")
def storage_delete(storage_id, force):
"""Delete a storage provider."""
token_data = load_token()
if not token_data.get("access_token"):
click.echo("Not logged in. Use 'artdag login' first.", err=True)
sys.exit(1)
if not force:
if not click.confirm(f"Delete storage provider {storage_id}?"):
click.echo("Cancelled.")
return
try:
headers = {"Authorization": f"Bearer {token_data['access_token']}"}
resp = requests.delete(f"{get_server()}/storage/{storage_id}", headers=headers)
if resp.status_code == 400:
click.echo(f"Error: {resp.json().get('detail', 'Bad request')}", err=True)
sys.exit(1)
if resp.status_code == 404:
click.echo(f"Storage provider not found: {storage_id}", err=True)
sys.exit(1)
resp.raise_for_status()
except requests.RequestException as e:
click.echo(f"Failed to delete storage provider: {e}", err=True)
sys.exit(1)
click.echo(f"Deleted storage provider: {storage_id}")
# ============ Recipe Commands ============
def _is_sexp_file(filepath: str, content: str) -> bool:
@@ -1073,24 +1527,32 @@ def upload_effect(filepath):
@cli.command("effects")
@click.option("--limit", "-l", default=20, help="Max effects to show")
def list_effects(limit):
"""List uploaded effects."""
try:
headers = {}
token_data = load_token()
if token_data.get("access_token"):
headers["Authorization"] = f"Bearer {token_data['access_token']}"
@click.option("--offset", "-o", default=0, help="Offset for pagination")
def list_effects(limit, offset):
"""List uploaded effects with pagination."""
token_data = load_token()
if not token_data.get("access_token"):
click.echo("Not logged in. Use 'artdag login' first.", err=True)
sys.exit(1)
resp = requests.get(f"{get_server()}/effects", headers=headers)
try:
headers = {"Authorization": f"Bearer {token_data['access_token']}"}
resp = requests.get(f"{get_server()}/effects?offset={offset}&limit={limit}", headers=headers)
resp.raise_for_status()
result = resp.json()
effects = result.get("effects", [])[:limit]
effects = result.get("effects", [])
has_more = result.get("has_more", False)
if not effects:
click.echo("No effects found")
return
click.echo(f"Effects ({len(effects)}):\n")
start = offset + 1
end = offset + len(effects)
click.echo(f"Showing {start}-{end}" + (" (more available)" if has_more else ""))
click.echo()
for effect in effects:
meta = effect.get("meta", {})
click.echo(f" {meta.get('name', 'unknown')} v{meta.get('version', '?')}")
@@ -1104,12 +1566,92 @@ def list_effects(limit):
sys.exit(1)
@cli.command("effect")
@click.argument("cid")
@click.option("--source", "-s", is_flag=True, help="Show source code")
def show_effect(cid, source):
"""Show details of an effect by CID."""
token_data = load_token()
if not token_data.get("access_token"):
click.echo("Not logged in. Use 'artdag login' first.", err=True)
sys.exit(1)
try:
headers = {"Authorization": f"Bearer {token_data['access_token']}"}
resp = requests.get(f"{get_server()}/effects/{cid}", headers=headers)
if resp.status_code == 404:
click.echo(f"Effect not found: {cid}", err=True)
sys.exit(1)
resp.raise_for_status()
effect = resp.json()
except requests.RequestException as e:
click.echo(f"Failed to get effect: {e}", err=True)
sys.exit(1)
meta = effect.get("meta", effect)
name = meta.get("name", "Unnamed")
version = meta.get("version", "1.0.0")
author = meta.get("author", "Unknown")
description = meta.get("description", "No description")
click.echo(f"Name: {name} (v{version})")
click.echo(f"Author: {author}")
click.echo(f"Description: {description}")
click.echo(f"CID: {effect.get('cid', cid)}")
if effect.get("uploaded_at"):
click.echo(f"Uploaded: {effect['uploaded_at']}")
if effect.get("uploader"):
click.echo(f"Uploader: {effect['uploader']}")
if meta.get("temporal"):
click.echo("Temporal: Yes")
# Parameters
params = meta.get("params", [])
if params:
click.echo("\nParameters:")
for p in params:
param_type = p.get("type", "any")
param_desc = p.get("description", "")
param_range = ""
if "min" in p and "max" in p:
param_range = f" [{p['min']}-{p['max']}]"
param_default = f" default: {p['default']}" if "default" in p else ""
click.echo(f" - {p['name']} ({param_type}): {param_desc}{param_range}{param_default}")
# Dependencies
deps = meta.get("dependencies", [])
if deps:
click.echo("\nDependencies:")
for dep in deps:
click.echo(f" - {dep}")
# Source code
if source:
click.echo("\nSource Code:")
click.echo("-" * 40)
try:
source_resp = requests.get(f"{get_server()}/effects/{cid}/source", headers=headers)
if source_resp.status_code == 200:
click.echo(source_resp.text)
else:
click.echo("(Source not available)")
except requests.RequestException:
click.echo("(Failed to fetch source)")
@cli.command("recipes")
@click.option("--limit", "-l", default=10, help="Max recipes to show")
def list_recipes(limit):
"""List uploaded recipes."""
@click.option("--offset", "-o", default=0, help="Offset for pagination")
def list_recipes(limit, offset):
"""List uploaded recipes for the current user with pagination."""
token_data = load_token()
if not token_data.get("access_token"):
click.echo("Not logged in. Use 'artdag login' first.", err=True)
sys.exit(1)
try:
resp = requests.get(f"{get_server()}/recipes")
headers = {"Authorization": f"Bearer {token_data['access_token']}"}
resp = requests.get(f"{get_server()}/recipes?offset={offset}&limit={limit}", headers=headers)
resp.raise_for_status()
data = resp.json()
except requests.RequestException as e:
@@ -1117,14 +1659,20 @@ def list_recipes(limit):
sys.exit(1)
recipes = data.get("recipes", [])
has_more = data.get("has_more", False)
if not recipes:
click.echo("No recipes found.")
return
start = offset + 1
end = offset + len(recipes)
click.echo(f"Showing {start}-{end}" + (" (more available)" if has_more else ""))
click.echo()
click.echo(f"{'Name':<20} {'Version':<8} {'Variables':<10} {'Recipe ID':<24}")
click.echo("-" * 70)
for recipe in recipes[:limit]:
for recipe in recipes:
recipe_id = recipe["recipe_id"][:20] + "..."
var_count = len(recipe.get("variable_inputs", []))
click.echo(f"{recipe['name']:<20} {recipe['version']:<8} {var_count:<10} {recipe_id}")
@@ -1134,8 +1682,14 @@ def list_recipes(limit):
@click.argument("recipe_id")
def show_recipe(recipe_id):
"""Show details of a recipe."""
token_data = load_token()
if not token_data.get("access_token"):
click.echo("Not logged in. Use 'artdag login' first.", err=True)
sys.exit(1)
try:
resp = requests.get(f"{get_server()}/recipes/{recipe_id}")
headers = {"Authorization": f"Bearer {token_data['access_token']}"}
resp = requests.get(f"{get_server()}/recipes/{recipe_id}", headers=headers)
if resp.status_code == 404:
click.echo(f"Recipe not found: {recipe_id}", err=True)
sys.exit(1)