Convert DAG nodes dict to steps list in get_run_plan()
The CLI expects {"steps": [...]} but DAG format stores {"nodes": {...}}.
Added _dag_to_steps() to convert between formats, including topological
sorting so sources appear first.
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
@@ -512,6 +512,56 @@ class RunService:
|
|||||||
|
|
||||||
return True, None
|
return True, None
|
||||||
|
|
||||||
|
def _dag_to_steps(self, dag: Dict[str, Any]) -> Dict[str, Any]:
|
||||||
|
"""Convert DAG nodes dict format to steps list format.
|
||||||
|
|
||||||
|
DAG format: {"nodes": {"id": {...}}, "output_id": "..."}
|
||||||
|
Steps format: {"steps": [{"id": "...", "type": "...", ...}], "output_id": "..."}
|
||||||
|
"""
|
||||||
|
if "steps" in dag:
|
||||||
|
# Already in steps format
|
||||||
|
return dag
|
||||||
|
|
||||||
|
if "nodes" not in dag:
|
||||||
|
return dag
|
||||||
|
|
||||||
|
nodes = dag.get("nodes", {})
|
||||||
|
steps = []
|
||||||
|
|
||||||
|
# Sort by topological order (sources first, then by input dependencies)
|
||||||
|
def get_level(node_id: str, visited: set = None) -> int:
|
||||||
|
if visited is None:
|
||||||
|
visited = set()
|
||||||
|
if node_id in visited:
|
||||||
|
return 0
|
||||||
|
visited.add(node_id)
|
||||||
|
node = nodes.get(node_id, {})
|
||||||
|
inputs = node.get("inputs", [])
|
||||||
|
if not inputs:
|
||||||
|
return 0
|
||||||
|
return 1 + max(get_level(inp, visited) for inp in inputs)
|
||||||
|
|
||||||
|
sorted_ids = sorted(nodes.keys(), key=lambda nid: (get_level(nid), nid))
|
||||||
|
|
||||||
|
for node_id in sorted_ids:
|
||||||
|
node = nodes[node_id]
|
||||||
|
steps.append({
|
||||||
|
"id": node_id,
|
||||||
|
"step_id": node_id,
|
||||||
|
"type": node.get("node_type", "EFFECT"),
|
||||||
|
"config": node.get("config", {}),
|
||||||
|
"inputs": node.get("inputs", []),
|
||||||
|
"name": node.get("name"),
|
||||||
|
"cache_id": node_id, # In code-addressed system, node_id IS the cache_id
|
||||||
|
})
|
||||||
|
|
||||||
|
return {
|
||||||
|
"steps": steps,
|
||||||
|
"output_id": dag.get("output_id"),
|
||||||
|
"metadata": dag.get("metadata", {}),
|
||||||
|
"format": "json",
|
||||||
|
}
|
||||||
|
|
||||||
async def get_run_plan(self, run_id: str) -> Optional[Dict[str, Any]]:
|
async def get_run_plan(self, run_id: str) -> Optional[Dict[str, Any]]:
|
||||||
"""Get execution plan for a run.
|
"""Get execution plan for a run.
|
||||||
|
|
||||||
@@ -535,8 +585,7 @@ class RunService:
|
|||||||
return {"sexp": content, "format": "sexp"}
|
return {"sexp": content, "format": "sexp"}
|
||||||
else:
|
else:
|
||||||
plan = json.loads(content)
|
plan = json.loads(content)
|
||||||
plan["format"] = "json"
|
return self._dag_to_steps(plan)
|
||||||
return plan
|
|
||||||
|
|
||||||
# Fall back to legacy plans directory
|
# Fall back to legacy plans directory
|
||||||
sexp_path = self.cache_dir / "plans" / f"{run_id}.sexp"
|
sexp_path = self.cache_dir / "plans" / f"{run_id}.sexp"
|
||||||
@@ -548,8 +597,7 @@ class RunService:
|
|||||||
if json_path.exists():
|
if json_path.exists():
|
||||||
with open(json_path) as f:
|
with open(json_path) as f:
|
||||||
plan = json.load(f)
|
plan = json.load(f)
|
||||||
plan["format"] = "json"
|
return self._dag_to_steps(plan)
|
||||||
return plan
|
|
||||||
|
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user