Fix DAG visualization and step link handling

- Handle dict inputs ({"node": "id"}) when building DAG edges
- Add normalize_inputs() to convert dict inputs to node IDs for steps
- Fix _parse_inputs to use _json.loads (correct import alias)
- Add SOURCE/EFFECT/SEQUENCE colors to node color maps

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
gilesb
2026-01-12 00:05:50 +00:00
parent 7a4cd3d413
commit 5c3558e1ba
2 changed files with 58 additions and 21 deletions

View File

@@ -110,6 +110,19 @@ async def get_run(
else: else:
run["username"] = actor_id or "Unknown" run["username"] = actor_id or "Unknown"
# Helper to normalize input refs to just node IDs
def normalize_inputs(inputs):
"""Convert input refs (may be dicts or strings) to list of node IDs."""
result = []
for inp in inputs:
if isinstance(inp, dict):
node_id = inp.get("node") or inp.get("input") or inp.get("id")
else:
node_id = inp
if node_id:
result.append(node_id)
return result
# Try to load the recipe to show the plan # Try to load the recipe to show the plan
plan = None plan = None
recipe_id = run.get("recipe") recipe_id = run.get("recipe")
@@ -132,7 +145,7 @@ async def get_run(
"name": node_id, "name": node_id,
"type": node.get("type", "EFFECT"), "type": node.get("type", "EFFECT"),
"status": "completed", # Run completed "status": "completed", # Run completed
"inputs": node.get("inputs", []), "inputs": normalize_inputs(node.get("inputs", [])),
"config": node.get("config", {}), "config": node.get("config", {}),
}) })
elif isinstance(nodes, dict): elif isinstance(nodes, dict):
@@ -142,7 +155,7 @@ async def get_run(
"name": node_id, "name": node_id,
"type": node.get("type", "EFFECT"), "type": node.get("type", "EFFECT"),
"status": "completed", "status": "completed",
"inputs": node.get("inputs", []), "inputs": normalize_inputs(node.get("inputs", [])),
"config": node.get("config", {}), "config": node.get("config", {}),
}) })
@@ -230,12 +243,18 @@ async def get_run(
} }
}) })
for inp in step.get("inputs", []): for inp in step.get("inputs", []):
dag_elements.append({ # Handle both string and dict inputs
"data": { if isinstance(inp, dict):
"source": inp, source = inp.get("node") or inp.get("input") or inp.get("id")
"target": step_id, else:
} source = inp
}) if source:
dag_elements.append({
"data": {
"source": source,
"target": step_id,
}
})
templates = get_templates(request) templates = get_templates(request)
return render(templates, "runs/detail.html", request, return render(templates, "runs/detail.html", request,
@@ -336,23 +355,32 @@ async def run_detail(
"analyze": "#ec4899", "analyze": "#ec4899",
"transform": "#10b981", "transform": "#10b981",
"output": "#f59e0b", "output": "#f59e0b",
"SOURCE": "#3b82f6",
"EFFECT": "#8b5cf6",
"SEQUENCE": "#ec4899",
} }
for i, step in enumerate(plan["steps"]): for i, step in enumerate(plan["steps"]):
step_id = step.get("id", f"step-{i}")
dag_elements.append({ dag_elements.append({
"data": { "data": {
"id": step.get("id", f"step-{i}"), "id": step_id,
"label": step.get("name", f"Step {i+1}"), "label": step.get("name", f"Step {i+1}"),
"color": node_colors.get(step.get("type", "effect"), "#6b7280"), "color": node_colors.get(step.get("type", "effect"), "#6b7280"),
} }
}) })
# Add edges from inputs # Add edges from inputs (handle both string and dict formats)
for inp in step.get("inputs", []): for inp in step.get("inputs", []):
dag_elements.append({ if isinstance(inp, dict):
"data": { source = inp.get("node") or inp.get("input") or inp.get("id")
"source": inp, else:
"target": step.get("id", f"step-{i}"), source = inp
} if source:
}) dag_elements.append({
"data": {
"source": source,
"target": step_id,
}
})
if wants_json(request): if wants_json(request):
return { return {
@@ -404,6 +432,9 @@ async def run_plan(
"analyze": "#ec4899", "analyze": "#ec4899",
"transform": "#10b981", "transform": "#10b981",
"output": "#f59e0b", "output": "#f59e0b",
"SOURCE": "#3b82f6",
"EFFECT": "#8b5cf6",
"SEQUENCE": "#ec4899",
} }
for i, step in enumerate(plan.get("steps", [])): for i, step in enumerate(plan.get("steps", [])):
@@ -416,9 +447,15 @@ async def run_plan(
} }
}) })
for inp in step.get("inputs", []): for inp in step.get("inputs", []):
dag_elements.append({ # Handle both string and dict formats
"data": {"source": inp, "target": step_id} if isinstance(inp, dict):
}) source = inp.get("node") or inp.get("input") or inp.get("id")
else:
source = inp
if source:
dag_elements.append({
"data": {"source": source, "target": step_id}
})
templates = get_templates(request) templates = get_templates(request)
return render(templates, "runs/plan.html", request, return render(templates, "runs/plan.html", request,

View File

@@ -1159,11 +1159,11 @@ def _parse_inputs(inputs_value):
return inputs_value return inputs_value
if isinstance(inputs_value, str): if isinstance(inputs_value, str):
try: try:
parsed = json.loads(inputs_value) parsed = _json.loads(inputs_value)
if isinstance(parsed, list): if isinstance(parsed, list):
return parsed return parsed
return [] return []
except (json.JSONDecodeError, TypeError): except (_json.JSONDecodeError, TypeError):
return [] return []
return [] return []