diff --git a/artdag.py b/artdag.py index 41a76b2..5e43a2e 100755 --- a/artdag.py +++ b/artdag.py @@ -1090,5 +1090,304 @@ def delete_recipe(recipe_id, force): click.echo(f"Deleted recipe: {recipe_id[:16]}...") +# ============ v2 API Commands (3-Phase Execution) ============ + +@cli.command("plan") +@click.argument("recipe_file", type=click.Path(exists=True)) +@click.option("--input", "-i", "inputs", multiple=True, help="Input as name:content_hash") +@click.option("--features", "-f", multiple=True, help="Features to extract (default: beats, energy)") +@click.option("--output", "-o", type=click.Path(), help="Save plan JSON to file") +def generate_plan(recipe_file, inputs, features, output): + """Generate an execution plan from a recipe YAML. Requires login. + + Preview what will be executed without actually running it. + + RECIPE_FILE: Path to recipe YAML file + + Example: artdag plan recipe.yaml -i source_video:abc123 + """ + token_data = load_token() + if not token_data.get("access_token"): + click.echo("Not logged in. Please run: artdag login ", err=True) + sys.exit(1) + + # Read recipe YAML + with open(recipe_file) as f: + recipe_yaml = f.read() + + # Parse inputs + input_hashes = {} + for inp in inputs: + if ":" not in inp: + click.echo(f"Invalid input format: {inp} (expected name:content_hash)", err=True) + sys.exit(1) + name, content_hash = inp.split(":", 1) + input_hashes[name] = content_hash + + # Build request + request_data = { + "recipe_yaml": recipe_yaml, + "input_hashes": input_hashes, + } + if features: + request_data["features"] = list(features) + + # Submit to API + try: + headers = {"Authorization": f"Bearer {token_data['access_token']}"} + resp = requests.post( + f"{get_server()}/api/v2/plan", + json=request_data, + headers=headers + ) + if resp.status_code == 401: + click.echo("Authentication failed. Please login again.", err=True) + sys.exit(1) + if resp.status_code == 400: + click.echo(f"Error: {resp.json().get('detail', 'Bad request')}", err=True) + sys.exit(1) + resp.raise_for_status() + result = resp.json() + except requests.RequestException as e: + click.echo(f"Plan generation failed: {e}", err=True) + sys.exit(1) + + # Display results + click.echo(f"Recipe: {result['recipe']}") + click.echo(f"Plan ID: {result['plan_id'][:16]}...") + click.echo(f"Total steps: {result['total_steps']}") + click.echo(f"Cached: {result['cached_steps']}") + click.echo(f"Pending: {result['pending_steps']}") + + if result.get("steps"): + click.echo("\nSteps:") + for step in result["steps"]: + status = "✓ cached" if step["cached"] else "○ pending" + click.echo(f" L{step['level']} {step['step_id']:<20} {step['node_type']:<10} {status}") + + # Save plan JSON if requested + if output: + with open(output, "w") as f: + f.write(result["plan_json"]) + click.echo(f"\nPlan saved to: {output}") + elif result.get("plan_json"): + click.echo(f"\nUse --output to save the plan JSON for later execution.") + + +@cli.command("execute-plan") +@click.argument("plan_file", type=click.Path(exists=True)) +@click.option("--wait", "-w", is_flag=True, help="Wait for completion") +def execute_plan(plan_file, wait): + """Execute a pre-generated execution plan. Requires login. + + PLAN_FILE: Path to plan JSON file (from 'artdag plan --output') + + Example: artdag execute-plan plan.json --wait + """ + token_data = load_token() + if not token_data.get("access_token"): + click.echo("Not logged in. Please run: artdag login ", err=True) + sys.exit(1) + + # Read plan JSON + with open(plan_file) as f: + plan_json = f.read() + + # Submit to API + try: + headers = {"Authorization": f"Bearer {token_data['access_token']}"} + resp = requests.post( + f"{get_server()}/api/v2/execute", + json={"plan_json": plan_json}, + headers=headers + ) + if resp.status_code == 401: + click.echo("Authentication failed. Please login again.", err=True) + sys.exit(1) + if resp.status_code == 400: + click.echo(f"Error: {resp.json().get('detail', 'Bad request')}", err=True) + sys.exit(1) + resp.raise_for_status() + result = resp.json() + except requests.RequestException as e: + click.echo(f"Execution failed: {e}", err=True) + sys.exit(1) + + run_id = result["run_id"] + click.echo(f"Run started: {run_id}") + click.echo(f"Status: {result['status']}") + + if wait: + _wait_for_v2_run(token_data, run_id) + + +@cli.command("run-v2") +@click.argument("recipe_file", type=click.Path(exists=True)) +@click.option("--input", "-i", "inputs", multiple=True, help="Input as name:content_hash") +@click.option("--features", "-f", multiple=True, help="Features to extract (default: beats, energy)") +@click.option("--wait", "-w", is_flag=True, help="Wait for completion") +def run_recipe_v2(recipe_file, inputs, features, wait): + """Run a recipe through 3-phase execution. Requires login. + + Runs the full pipeline: Analyze → Plan → Execute + + RECIPE_FILE: Path to recipe YAML file + + Example: artdag run-v2 recipe.yaml -i source_video:abc123 --wait + """ + token_data = load_token() + if not token_data.get("access_token"): + click.echo("Not logged in. Please run: artdag login ", err=True) + sys.exit(1) + + # Read recipe YAML + with open(recipe_file) as f: + recipe_yaml = f.read() + + # Parse recipe name for display + try: + recipe_data = yaml.safe_load(recipe_yaml) + recipe_name = recipe_data.get("name", "unknown") + except Exception: + recipe_name = "unknown" + + # Parse inputs + input_hashes = {} + for inp in inputs: + if ":" not in inp: + click.echo(f"Invalid input format: {inp} (expected name:content_hash)", err=True) + sys.exit(1) + name, content_hash = inp.split(":", 1) + input_hashes[name] = content_hash + + # Build request + request_data = { + "recipe_yaml": recipe_yaml, + "input_hashes": input_hashes, + } + if features: + request_data["features"] = list(features) + + # Submit to API + click.echo(f"Running recipe: {recipe_name}") + click.echo(f"Inputs: {len(input_hashes)}") + + try: + headers = {"Authorization": f"Bearer {token_data['access_token']}"} + resp = requests.post( + f"{get_server()}/api/v2/run-recipe", + json=request_data, + headers=headers + ) + if resp.status_code == 401: + click.echo("Authentication failed. Please login again.", err=True) + sys.exit(1) + if resp.status_code == 400: + click.echo(f"Error: {resp.json().get('detail', 'Bad request')}", err=True) + sys.exit(1) + resp.raise_for_status() + result = resp.json() + except requests.RequestException as e: + click.echo(f"Run failed: {e}", err=True) + sys.exit(1) + + run_id = result["run_id"] + click.echo(f"Run ID: {run_id}") + click.echo(f"Status: {result['status']}") + + if result.get("output_hash"): + click.echo(f"Output: {result['output_hash']}") + if result.get("output_ipfs_cid"): + click.echo(f"IPFS CID: {result['output_ipfs_cid']}") + return + + if wait: + _wait_for_v2_run(token_data, run_id) + + +def _wait_for_v2_run(token_data: dict, run_id: str): + """Poll v2 run status until completion.""" + click.echo("Waiting for completion...") + headers = {"Authorization": f"Bearer {token_data['access_token']}"} + + while True: + time.sleep(2) + try: + resp = requests.get( + f"{get_server()}/api/v2/run/{run_id}", + headers=headers + ) + resp.raise_for_status() + run = resp.json() + except requests.RequestException as e: + click.echo(f".", nl=False) + continue + + status = run.get("status", "unknown") + + if status == "completed": + click.echo(f"\nCompleted!") + if run.get("output_hash"): + click.echo(f"Output: {run['output_hash']}") + if run.get("output_ipfs_cid"): + click.echo(f"IPFS CID: {run['output_ipfs_cid']}") + if run.get("cached"): + click.echo(f"Steps cached: {run['cached']}") + if run.get("executed"): + click.echo(f"Steps executed: {run['executed']}") + break + elif status == "failed": + click.echo(f"\nFailed: {run.get('error', 'Unknown error')}", err=True) + sys.exit(1) + else: + click.echo(".", nl=False) + + +@cli.command("run-status") +@click.argument("run_id") +def run_status_v2(run_id): + """Get status of a v2 run. Requires login. + + RUN_ID: The run ID from run-v2 or execute-plan + """ + token_data = load_token() + if not token_data.get("access_token"): + click.echo("Not logged in. Please run: artdag login ", err=True) + sys.exit(1) + + try: + headers = {"Authorization": f"Bearer {token_data['access_token']}"} + resp = requests.get( + f"{get_server()}/api/v2/run/{run_id}", + headers=headers + ) + if resp.status_code == 404: + click.echo(f"Run not found: {run_id}", err=True) + sys.exit(1) + resp.raise_for_status() + run = resp.json() + except requests.RequestException as e: + click.echo(f"Failed to get status: {e}", err=True) + sys.exit(1) + + click.echo(f"Run ID: {run_id}") + click.echo(f"Status: {run['status']}") + + if run.get("recipe"): + click.echo(f"Recipe: {run['recipe']}") + if run.get("plan_id"): + click.echo(f"Plan ID: {run['plan_id'][:16]}...") + if run.get("output_hash"): + click.echo(f"Output: {run['output_hash']}") + if run.get("output_ipfs_cid"): + click.echo(f"IPFS CID: {run['output_ipfs_cid']}") + if run.get("cached") is not None: + click.echo(f"Cached: {run['cached']}") + if run.get("executed") is not None: + click.echo(f"Executed: {run['executed']}") + if run.get("error"): + click.echo(f"Error: {run['error']}") + + if __name__ == "__main__": cli()