Add v2 API commands for 3-phase execution

- plan: Generate and preview execution plan
- execute-plan: Execute pre-generated plan
- run-v2: Full 3-phase pipeline (Analyze → Plan → Execute)
- run-status: Check v2 run status

Uses new /api/v2/* endpoints for the 3-phase execution model.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
gilesb
2026-01-10 11:51:43 +00:00
parent f4da1fffb2
commit 2b1e699ab0

299
artdag.py
View File

@@ -1090,5 +1090,304 @@ def delete_recipe(recipe_id, force):
click.echo(f"Deleted recipe: {recipe_id[:16]}...")
# ============ v2 API Commands (3-Phase Execution) ============
@cli.command("plan")
@click.argument("recipe_file", type=click.Path(exists=True))
@click.option("--input", "-i", "inputs", multiple=True, help="Input as name:content_hash")
@click.option("--features", "-f", multiple=True, help="Features to extract (default: beats, energy)")
@click.option("--output", "-o", type=click.Path(), help="Save plan JSON to file")
def generate_plan(recipe_file, inputs, features, output):
"""Generate an execution plan from a recipe YAML. Requires login.
Preview what will be executed without actually running it.
RECIPE_FILE: Path to recipe YAML file
Example: artdag plan recipe.yaml -i source_video:abc123
"""
token_data = load_token()
if not token_data.get("access_token"):
click.echo("Not logged in. Please run: artdag login <username>", err=True)
sys.exit(1)
# Read recipe YAML
with open(recipe_file) as f:
recipe_yaml = f.read()
# Parse inputs
input_hashes = {}
for inp in inputs:
if ":" not in inp:
click.echo(f"Invalid input format: {inp} (expected name:content_hash)", err=True)
sys.exit(1)
name, content_hash = inp.split(":", 1)
input_hashes[name] = content_hash
# Build request
request_data = {
"recipe_yaml": recipe_yaml,
"input_hashes": input_hashes,
}
if features:
request_data["features"] = list(features)
# Submit to API
try:
headers = {"Authorization": f"Bearer {token_data['access_token']}"}
resp = requests.post(
f"{get_server()}/api/v2/plan",
json=request_data,
headers=headers
)
if resp.status_code == 401:
click.echo("Authentication failed. Please login again.", err=True)
sys.exit(1)
if resp.status_code == 400:
click.echo(f"Error: {resp.json().get('detail', 'Bad request')}", err=True)
sys.exit(1)
resp.raise_for_status()
result = resp.json()
except requests.RequestException as e:
click.echo(f"Plan generation failed: {e}", err=True)
sys.exit(1)
# Display results
click.echo(f"Recipe: {result['recipe']}")
click.echo(f"Plan ID: {result['plan_id'][:16]}...")
click.echo(f"Total steps: {result['total_steps']}")
click.echo(f"Cached: {result['cached_steps']}")
click.echo(f"Pending: {result['pending_steps']}")
if result.get("steps"):
click.echo("\nSteps:")
for step in result["steps"]:
status = "✓ cached" if step["cached"] else "○ pending"
click.echo(f" L{step['level']} {step['step_id']:<20} {step['node_type']:<10} {status}")
# Save plan JSON if requested
if output:
with open(output, "w") as f:
f.write(result["plan_json"])
click.echo(f"\nPlan saved to: {output}")
elif result.get("plan_json"):
click.echo(f"\nUse --output to save the plan JSON for later execution.")
@cli.command("execute-plan")
@click.argument("plan_file", type=click.Path(exists=True))
@click.option("--wait", "-w", is_flag=True, help="Wait for completion")
def execute_plan(plan_file, wait):
"""Execute a pre-generated execution plan. Requires login.
PLAN_FILE: Path to plan JSON file (from 'artdag plan --output')
Example: artdag execute-plan plan.json --wait
"""
token_data = load_token()
if not token_data.get("access_token"):
click.echo("Not logged in. Please run: artdag login <username>", err=True)
sys.exit(1)
# Read plan JSON
with open(plan_file) as f:
plan_json = f.read()
# Submit to API
try:
headers = {"Authorization": f"Bearer {token_data['access_token']}"}
resp = requests.post(
f"{get_server()}/api/v2/execute",
json={"plan_json": plan_json},
headers=headers
)
if resp.status_code == 401:
click.echo("Authentication failed. Please login again.", err=True)
sys.exit(1)
if resp.status_code == 400:
click.echo(f"Error: {resp.json().get('detail', 'Bad request')}", err=True)
sys.exit(1)
resp.raise_for_status()
result = resp.json()
except requests.RequestException as e:
click.echo(f"Execution failed: {e}", err=True)
sys.exit(1)
run_id = result["run_id"]
click.echo(f"Run started: {run_id}")
click.echo(f"Status: {result['status']}")
if wait:
_wait_for_v2_run(token_data, run_id)
@cli.command("run-v2")
@click.argument("recipe_file", type=click.Path(exists=True))
@click.option("--input", "-i", "inputs", multiple=True, help="Input as name:content_hash")
@click.option("--features", "-f", multiple=True, help="Features to extract (default: beats, energy)")
@click.option("--wait", "-w", is_flag=True, help="Wait for completion")
def run_recipe_v2(recipe_file, inputs, features, wait):
"""Run a recipe through 3-phase execution. Requires login.
Runs the full pipeline: Analyze → Plan → Execute
RECIPE_FILE: Path to recipe YAML file
Example: artdag run-v2 recipe.yaml -i source_video:abc123 --wait
"""
token_data = load_token()
if not token_data.get("access_token"):
click.echo("Not logged in. Please run: artdag login <username>", err=True)
sys.exit(1)
# Read recipe YAML
with open(recipe_file) as f:
recipe_yaml = f.read()
# Parse recipe name for display
try:
recipe_data = yaml.safe_load(recipe_yaml)
recipe_name = recipe_data.get("name", "unknown")
except Exception:
recipe_name = "unknown"
# Parse inputs
input_hashes = {}
for inp in inputs:
if ":" not in inp:
click.echo(f"Invalid input format: {inp} (expected name:content_hash)", err=True)
sys.exit(1)
name, content_hash = inp.split(":", 1)
input_hashes[name] = content_hash
# Build request
request_data = {
"recipe_yaml": recipe_yaml,
"input_hashes": input_hashes,
}
if features:
request_data["features"] = list(features)
# Submit to API
click.echo(f"Running recipe: {recipe_name}")
click.echo(f"Inputs: {len(input_hashes)}")
try:
headers = {"Authorization": f"Bearer {token_data['access_token']}"}
resp = requests.post(
f"{get_server()}/api/v2/run-recipe",
json=request_data,
headers=headers
)
if resp.status_code == 401:
click.echo("Authentication failed. Please login again.", err=True)
sys.exit(1)
if resp.status_code == 400:
click.echo(f"Error: {resp.json().get('detail', 'Bad request')}", err=True)
sys.exit(1)
resp.raise_for_status()
result = resp.json()
except requests.RequestException as e:
click.echo(f"Run failed: {e}", err=True)
sys.exit(1)
run_id = result["run_id"]
click.echo(f"Run ID: {run_id}")
click.echo(f"Status: {result['status']}")
if result.get("output_hash"):
click.echo(f"Output: {result['output_hash']}")
if result.get("output_ipfs_cid"):
click.echo(f"IPFS CID: {result['output_ipfs_cid']}")
return
if wait:
_wait_for_v2_run(token_data, run_id)
def _wait_for_v2_run(token_data: dict, run_id: str):
"""Poll v2 run status until completion."""
click.echo("Waiting for completion...")
headers = {"Authorization": f"Bearer {token_data['access_token']}"}
while True:
time.sleep(2)
try:
resp = requests.get(
f"{get_server()}/api/v2/run/{run_id}",
headers=headers
)
resp.raise_for_status()
run = resp.json()
except requests.RequestException as e:
click.echo(f".", nl=False)
continue
status = run.get("status", "unknown")
if status == "completed":
click.echo(f"\nCompleted!")
if run.get("output_hash"):
click.echo(f"Output: {run['output_hash']}")
if run.get("output_ipfs_cid"):
click.echo(f"IPFS CID: {run['output_ipfs_cid']}")
if run.get("cached"):
click.echo(f"Steps cached: {run['cached']}")
if run.get("executed"):
click.echo(f"Steps executed: {run['executed']}")
break
elif status == "failed":
click.echo(f"\nFailed: {run.get('error', 'Unknown error')}", err=True)
sys.exit(1)
else:
click.echo(".", nl=False)
@cli.command("run-status")
@click.argument("run_id")
def run_status_v2(run_id):
"""Get status of a v2 run. Requires login.
RUN_ID: The run ID from run-v2 or execute-plan
"""
token_data = load_token()
if not token_data.get("access_token"):
click.echo("Not logged in. Please run: artdag login <username>", err=True)
sys.exit(1)
try:
headers = {"Authorization": f"Bearer {token_data['access_token']}"}
resp = requests.get(
f"{get_server()}/api/v2/run/{run_id}",
headers=headers
)
if resp.status_code == 404:
click.echo(f"Run not found: {run_id}", err=True)
sys.exit(1)
resp.raise_for_status()
run = resp.json()
except requests.RequestException as e:
click.echo(f"Failed to get status: {e}", err=True)
sys.exit(1)
click.echo(f"Run ID: {run_id}")
click.echo(f"Status: {run['status']}")
if run.get("recipe"):
click.echo(f"Recipe: {run['recipe']}")
if run.get("plan_id"):
click.echo(f"Plan ID: {run['plan_id'][:16]}...")
if run.get("output_hash"):
click.echo(f"Output: {run['output_hash']}")
if run.get("output_ipfs_cid"):
click.echo(f"IPFS CID: {run['output_ipfs_cid']}")
if run.get("cached") is not None:
click.echo(f"Cached: {run['cached']}")
if run.get("executed") is not None:
click.echo(f"Executed: {run['executed']}")
if run.get("error"):
click.echo(f"Error: {run['error']}")
if __name__ == "__main__":
cli()