From 2c27eacb12f83ec39a2c88f75e5bf74a942daada Mon Sep 17 00:00:00 2001 From: gilesb Date: Tue, 13 Jan 2026 00:00:00 +0000 Subject: [PATCH] Convert DAG nodes dict to steps list in get_run_plan() The CLI expects {"steps": [...]} but DAG format stores {"nodes": {...}}. Added _dag_to_steps() to convert between formats, including topological sorting so sources appear first. Co-Authored-By: Claude Opus 4.5 --- app/services/run_service.py | 56 ++++++++++++++++++++++++++++++++++--- 1 file changed, 52 insertions(+), 4 deletions(-) diff --git a/app/services/run_service.py b/app/services/run_service.py index e73c673..54dd168 100644 --- a/app/services/run_service.py +++ b/app/services/run_service.py @@ -512,6 +512,56 @@ class RunService: return True, None + def _dag_to_steps(self, dag: Dict[str, Any]) -> Dict[str, Any]: + """Convert DAG nodes dict format to steps list format. + + DAG format: {"nodes": {"id": {...}}, "output_id": "..."} + Steps format: {"steps": [{"id": "...", "type": "...", ...}], "output_id": "..."} + """ + if "steps" in dag: + # Already in steps format + return dag + + if "nodes" not in dag: + return dag + + nodes = dag.get("nodes", {}) + steps = [] + + # Sort by topological order (sources first, then by input dependencies) + def get_level(node_id: str, visited: set = None) -> int: + if visited is None: + visited = set() + if node_id in visited: + return 0 + visited.add(node_id) + node = nodes.get(node_id, {}) + inputs = node.get("inputs", []) + if not inputs: + return 0 + return 1 + max(get_level(inp, visited) for inp in inputs) + + sorted_ids = sorted(nodes.keys(), key=lambda nid: (get_level(nid), nid)) + + for node_id in sorted_ids: + node = nodes[node_id] + steps.append({ + "id": node_id, + "step_id": node_id, + "type": node.get("node_type", "EFFECT"), + "config": node.get("config", {}), + "inputs": node.get("inputs", []), + "name": node.get("name"), + "cache_id": node_id, # In code-addressed system, node_id IS the cache_id + }) + + return { + "steps": steps, + "output_id": dag.get("output_id"), + "metadata": dag.get("metadata", {}), + "format": "json", + } + async def get_run_plan(self, run_id: str) -> Optional[Dict[str, Any]]: """Get execution plan for a run. @@ -535,8 +585,7 @@ class RunService: return {"sexp": content, "format": "sexp"} else: plan = json.loads(content) - plan["format"] = "json" - return plan + return self._dag_to_steps(plan) # Fall back to legacy plans directory sexp_path = self.cache_dir / "plans" / f"{run_id}.sexp" @@ -548,8 +597,7 @@ class RunService: if json_path.exists(): with open(json_path) as f: plan = json.load(f) - plan["format"] = "json" - return plan + return self._dag_to_steps(plan) return None