diff --git a/server.py b/server.py index 5b7d626..3bf59d7 100644 --- a/server.py +++ b/server.py @@ -2593,7 +2593,7 @@ async def recipe_dag_visualization(recipe_id: str, request: Request): @app.post("/ui/recipes/{recipe_id}/run", response_class=HTMLResponse) async def ui_run_recipe(recipe_id: str, request: Request): - """HTMX handler: run a recipe with form inputs.""" + """HTMX handler: run a recipe with form inputs using 3-phase orchestration.""" ctx = await get_user_context_from_cookie(request) if not ctx: return '
Login required
' @@ -2604,13 +2604,13 @@ async def ui_run_recipe(recipe_id: str, request: Request): # Parse form data form_data = await request.form() - inputs = {} + input_hashes = {} for var_input in recipe.variable_inputs: value = form_data.get(var_input.node_id, "").strip() if var_input.required and not value: return f'
Missing required input: {var_input.name}
' if value: - inputs[var_input.node_id] = value + input_hashes[var_input.node_id] = value # Load recipe YAML recipe_path = cache_manager.get_by_content_hash(recipe_id) @@ -2618,22 +2618,33 @@ async def ui_run_recipe(recipe_id: str, request: Request): return '
Recipe YAML not found in cache
' try: - with open(recipe_path) as f: - yaml_config = yaml.safe_load(f) + recipe_yaml = recipe_path.read_text() - # Build DAG from recipe - dag = build_dag_from_recipe(yaml_config, inputs, recipe) + # Compute deterministic run_id + run_id = compute_run_id( + list(input_hashes.values()), + recipe.name, + recipe_id # recipe_id is already the content hash + ) - # Create run - run_id = str(uuid.uuid4()) - actor_id = ctx.actor_id + # Check if already completed + cached = await database.get_run_cache(run_id) + if cached: + output_hash = cached.get("output_hash") + if cache_manager.has_content(output_hash): + return f''' +
+ Already completed! View run +
+ ''' - # Collect all input hashes - all_inputs = list(inputs.values()) + # Collect all input hashes for RunStatus + all_inputs = list(input_hashes.values()) for fixed in recipe.fixed_inputs: if fixed.content_hash: all_inputs.append(fixed.content_hash) + # Create run status run = RunStatus( run_id=run_id, status="pending", @@ -2641,12 +2652,17 @@ async def ui_run_recipe(recipe_id: str, request: Request): inputs=all_inputs, output_name=f"{recipe.name}-{run_id[:8]}", created_at=datetime.now(timezone.utc).isoformat(), - username=actor_id + username=ctx.actor_id ) - # Submit to Celery - dag_json = dag.to_json() - task = execute_dag.delay(dag_json, run.run_id) + # Submit to orchestrated run_recipe task (3-phase: analyze, plan, execute) + from tasks.orchestrate import run_recipe + task = run_recipe.delay( + recipe_yaml=recipe_yaml, + input_hashes=input_hashes, + features=["beats", "energy"], + run_id=run_id, + ) run.celery_task_id = task.id run.status = "running" @@ -2658,6 +2674,7 @@ async def ui_run_recipe(recipe_id: str, request: Request): ''' except Exception as e: + logger.error(f"Recipe run failed: {e}") return f'
Error: {str(e)}
' @@ -6012,7 +6029,7 @@ class ExecutePlanRequest(BaseModel): plan_json: str # JSON-serialized ExecutionPlan -@app.post("/api/v2/plan") +@app.post("/api/plan") async def generate_plan_endpoint( request: PlanRequest, ctx: UserContext = Depends(get_required_user_context) @@ -6051,7 +6068,7 @@ async def generate_plan_endpoint( raise HTTPException(status_code=500, detail=str(e)) -@app.post("/api/v2/execute") +@app.post("/api/execute") async def execute_plan_endpoint( request: ExecutePlanRequest, ctx: UserContext = Depends(get_required_user_context) @@ -6084,7 +6101,7 @@ async def execute_plan_endpoint( raise HTTPException(status_code=500, detail=str(e)) -@app.post("/api/v2/run-recipe") +@app.post("/api/run-recipe") async def run_recipe_endpoint( request: RecipeRunRequest, ctx: UserContext = Depends(get_required_user_context) @@ -6096,7 +6113,7 @@ async def run_recipe_endpoint( 2. Plan: Generate execution plan with cache IDs 3. Execute: Run steps with parallel execution - Returns immediately with run_id. Poll /api/v2/run/{run_id} for status. + Returns immediately with run_id. Poll /api/run/{run_id} for status. """ from tasks.orchestrate import run_recipe @@ -6162,10 +6179,10 @@ async def run_recipe_endpoint( raise HTTPException(status_code=500, detail=str(e)) -@app.get("/api/v2/run/{run_id}") -async def get_run_v2(run_id: str, ctx: UserContext = Depends(get_required_user_context)): +@app.get("/api/run/{run_id}") +async def get_run_api(run_id: str, ctx: UserContext = Depends(get_required_user_context)): """ - Get status of a 3-phase execution run. + Get status of a recipe execution run. """ # Check Redis for run status run_data = redis_client.get(f"{RUNS_KEY_PREFIX}{run_id}")