Use orchestrated run_recipe for all recipe runs, remove v2 prefix

- UI recipe run now uses tasks.orchestrate.run_recipe (3-phase)
- Deterministic run_id via compute_run_id for cache deduplication
- Check for already-completed runs before starting
- Rename /api/v2/* endpoints to /api/* (plan, execute, run-recipe, run)
- All recipe runs now go through: analyze → plan → execute

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
giles
2026-01-11 01:07:14 +00:00
parent 41ceae1e6c
commit b9f8b5538b

View File

@@ -2593,7 +2593,7 @@ async def recipe_dag_visualization(recipe_id: str, request: Request):
@app.post("/ui/recipes/{recipe_id}/run", response_class=HTMLResponse)
async def ui_run_recipe(recipe_id: str, request: Request):
"""HTMX handler: run a recipe with form inputs."""
"""HTMX handler: run a recipe with form inputs using 3-phase orchestration."""
ctx = await get_user_context_from_cookie(request)
if not ctx:
return '<div class="bg-red-900/50 border border-red-700 text-red-300 px-4 py-3 rounded-lg mb-4">Login required</div>'
@@ -2604,13 +2604,13 @@ async def ui_run_recipe(recipe_id: str, request: Request):
# Parse form data
form_data = await request.form()
inputs = {}
input_hashes = {}
for var_input in recipe.variable_inputs:
value = form_data.get(var_input.node_id, "").strip()
if var_input.required and not value:
return f'<div class="bg-red-900/50 border border-red-700 text-red-300 px-4 py-3 rounded-lg mb-4">Missing required input: {var_input.name}</div>'
if value:
inputs[var_input.node_id] = value
input_hashes[var_input.node_id] = value
# Load recipe YAML
recipe_path = cache_manager.get_by_content_hash(recipe_id)
@@ -2618,22 +2618,33 @@ async def ui_run_recipe(recipe_id: str, request: Request):
return '<div class="bg-red-900/50 border border-red-700 text-red-300 px-4 py-3 rounded-lg mb-4">Recipe YAML not found in cache</div>'
try:
with open(recipe_path) as f:
yaml_config = yaml.safe_load(f)
recipe_yaml = recipe_path.read_text()
# Build DAG from recipe
dag = build_dag_from_recipe(yaml_config, inputs, recipe)
# Compute deterministic run_id
run_id = compute_run_id(
list(input_hashes.values()),
recipe.name,
recipe_id # recipe_id is already the content hash
)
# Create run
run_id = str(uuid.uuid4())
actor_id = ctx.actor_id
# Check if already completed
cached = await database.get_run_cache(run_id)
if cached:
output_hash = cached.get("output_hash")
if cache_manager.has_content(output_hash):
return f'''
<div class="bg-blue-900/50 border border-blue-700 text-blue-300 px-4 py-3 rounded-lg mb-4">
Already completed! <a href="/run/{run_id}" class="underline">View run</a>
</div>
'''
# Collect all input hashes
all_inputs = list(inputs.values())
# Collect all input hashes for RunStatus
all_inputs = list(input_hashes.values())
for fixed in recipe.fixed_inputs:
if fixed.content_hash:
all_inputs.append(fixed.content_hash)
# Create run status
run = RunStatus(
run_id=run_id,
status="pending",
@@ -2641,12 +2652,17 @@ async def ui_run_recipe(recipe_id: str, request: Request):
inputs=all_inputs,
output_name=f"{recipe.name}-{run_id[:8]}",
created_at=datetime.now(timezone.utc).isoformat(),
username=actor_id
username=ctx.actor_id
)
# Submit to Celery
dag_json = dag.to_json()
task = execute_dag.delay(dag_json, run.run_id)
# Submit to orchestrated run_recipe task (3-phase: analyze, plan, execute)
from tasks.orchestrate import run_recipe
task = run_recipe.delay(
recipe_yaml=recipe_yaml,
input_hashes=input_hashes,
features=["beats", "energy"],
run_id=run_id,
)
run.celery_task_id = task.id
run.status = "running"
@@ -2658,6 +2674,7 @@ async def ui_run_recipe(recipe_id: str, request: Request):
</div>
'''
except Exception as e:
logger.error(f"Recipe run failed: {e}")
return f'<div class="bg-red-900/50 border border-red-700 text-red-300 px-4 py-3 rounded-lg mb-4">Error: {str(e)}</div>'
@@ -6012,7 +6029,7 @@ class ExecutePlanRequest(BaseModel):
plan_json: str # JSON-serialized ExecutionPlan
@app.post("/api/v2/plan")
@app.post("/api/plan")
async def generate_plan_endpoint(
request: PlanRequest,
ctx: UserContext = Depends(get_required_user_context)
@@ -6051,7 +6068,7 @@ async def generate_plan_endpoint(
raise HTTPException(status_code=500, detail=str(e))
@app.post("/api/v2/execute")
@app.post("/api/execute")
async def execute_plan_endpoint(
request: ExecutePlanRequest,
ctx: UserContext = Depends(get_required_user_context)
@@ -6084,7 +6101,7 @@ async def execute_plan_endpoint(
raise HTTPException(status_code=500, detail=str(e))
@app.post("/api/v2/run-recipe")
@app.post("/api/run-recipe")
async def run_recipe_endpoint(
request: RecipeRunRequest,
ctx: UserContext = Depends(get_required_user_context)
@@ -6096,7 +6113,7 @@ async def run_recipe_endpoint(
2. Plan: Generate execution plan with cache IDs
3. Execute: Run steps with parallel execution
Returns immediately with run_id. Poll /api/v2/run/{run_id} for status.
Returns immediately with run_id. Poll /api/run/{run_id} for status.
"""
from tasks.orchestrate import run_recipe
@@ -6162,10 +6179,10 @@ async def run_recipe_endpoint(
raise HTTPException(status_code=500, detail=str(e))
@app.get("/api/v2/run/{run_id}")
async def get_run_v2(run_id: str, ctx: UserContext = Depends(get_required_user_context)):
@app.get("/api/run/{run_id}")
async def get_run_api(run_id: str, ctx: UserContext = Depends(get_required_user_context)):
"""
Get status of a 3-phase execution run.
Get status of a recipe execution run.
"""
# Check Redis for run status
run_data = redis_client.get(f"{RUNS_KEY_PREFIX}{run_id}")