diff --git a/server.py b/server.py index b8b91b9..eea98db 100644 --- a/server.py +++ b/server.py @@ -272,6 +272,11 @@ class RunStatus(BaseModel): username: Optional[str] = None # Owner of the run (ActivityPub actor ID) infrastructure: Optional[dict] = None # Hardware/software used for rendering provenance_cid: Optional[str] = None # IPFS CID of provenance record + # Plan execution tracking + plan_id: Optional[str] = None # ID of the execution plan + plan_name: Optional[str] = None # Human-readable plan name + step_results: Optional[dict] = None # step_id -> result dict (status, cache_id, outputs) + all_outputs: Optional[list] = None # All outputs from all steps # ============ Recipe Models ============ @@ -825,13 +830,18 @@ async def get_run(run_id: str): run.status = "completed" run.completed_at = datetime.now(timezone.utc).isoformat() - # Handle both legacy (render_effect) and new (execute_dag) result formats - if "output_hash" in result: - # New DAG result format - run.output_hash = result.get("output_hash") + # Handle both legacy (render_effect) and new (execute_dag/run_plan) result formats + if "output_hash" in result or "output_cache_id" in result: + # New DAG/plan result format + run.output_hash = result.get("output_hash") or result.get("output_cache_id") run.provenance_cid = result.get("provenance_cid") output_path = Path(result.get("output_path", "")) if result.get("output_path") else None - else: + # Store plan execution data + run.plan_id = result.get("plan_id") + run.plan_name = result.get("plan_name") + run.step_results = result.get("results") # step_id -> result dict + run.all_outputs = result.get("outputs") # All outputs from all steps + elif "output" in result: # Legacy render_effect format run.output_hash = result.get("output", {}).get("content_hash") run.provenance_cid = result.get("provenance_cid") @@ -846,8 +856,9 @@ async def get_run(run_id: str): # Extract infrastructure info (legacy only) run.infrastructure = result.get("infrastructure") - # Cache the output (legacy mode - DAG already caches via cache_manager) - if output_path and output_path.exists() and "output_hash" not in result: + # Cache the output (legacy mode - DAG/plan already caches via cache_manager) + is_plan_result = "output_hash" in result or "output_cache_id" in result + if output_path and output_path.exists() and not is_plan_result: t0 = time.time() await cache_file(output_path, node_type="effect_output") logger.info(f"get_run: cache_file took {time.time()-t0:.3f}s") @@ -1383,27 +1394,70 @@ async def run_plan_visualization(run_id: str, request: Request): nodes = [] edges = [] steps = plan_data.get("steps", []) + all_outputs = plan_data.get("outputs", []) # Pre-built outputs if available + + # Use run's execution results if available (from completed runs) + step_results = run.step_results or {} + if run.all_outputs: + all_outputs = run.all_outputs + + # Build output lookup by step_id + outputs_by_step = {} + for output in all_outputs: + step_id = output.get("step_id") + if step_id: + if step_id not in outputs_by_step: + outputs_by_step[step_id] = [] + outputs_by_step[step_id].append(output) for step in steps: node_type = step.get("node_type", "EFFECT") color = NODE_COLORS.get(node_type, NODE_COLORS["default"]) - cached = step.get("cached", False) - status = "cached" if cached else "pending" - - # Shorter label for display step_id = step.get("step_id", "") - label = step_id[:12] + "..." if len(step_id) > 12 else step_id + + # Get execution result for this step if available + step_result = step_results.get(step_id, {}) + result_status = step_result.get("status") + + # Determine status from result or plan + if result_status in ("completed", "cached", "completed_by_other"): + status = result_status + elif step.get("cached", False): + status = "cached" + elif run.status == "running": + status = "pending" + else: + status = "pending" + + # Use human-readable name if available, otherwise short step_id + step_name = step.get("name", "") + if step_name: + # Use last part of dotted name for label + label_parts = step_name.split(".") + label = label_parts[-1] if label_parts else step_name + else: + label = step_id[:12] + "..." if len(step_id) > 12 else step_id + + # Get outputs for this step - prefer result outputs, then plan outputs + step_outputs = step_result.get("outputs", []) or step.get("outputs", []) or outputs_by_step.get(step_id, []) + output_cache_ids = [o.get("cache_id") for o in step_outputs] + + # Get cache_id from result if available + cache_id = step_result.get("cache_id") or step.get("cache_id", "") nodes.append({ "data": { "id": step_id, "label": label, + "name": step_name, "nodeType": node_type, "level": step.get("level", 0), - "cacheId": step.get("cache_id", ""), + "cacheId": cache_id, "status": status, "color": color, - "config": step.get("config") + "config": step.get("config"), + "outputs": step_outputs, + "outputCacheIds": output_cache_ids, } }) @@ -1422,16 +1476,31 @@ async def run_plan_visualization(run_id: str, request: Request): }) except json.JSONDecodeError: pass + else: + # Build edges directly from steps + for step in steps: + step_id = step.get("step_id", "") + for input_step in step.get("input_steps", []): + edges.append({ + "data": { + "source": input_step, + "target": step_id + } + }) nodes_json = json.dumps(nodes) edges_json = json.dumps(edges) dag_html = render_dag_cytoscape(nodes_json, edges_json) - # Stats summary - total = plan_data.get("total_steps", len(steps)) - cached = plan_data.get("cached_steps", sum(1 for s in steps if s.get("cached"))) - pending = plan_data.get("pending_steps", total - cached) + # Stats summary - count from built nodes to reflect actual execution status + total = len(nodes) + completed_count = sum(1 for n in nodes if n["data"]["status"] in ("completed", "completed_by_other")) + cached_count = sum(1 for n in nodes if n["data"]["status"] == "cached") + pending_count = total - completed_count - cached_count + + # Plan name for display + plan_name = run.plan_name or plan_data.get("recipe", run.recipe) content = f''' @@ -1444,20 +1513,24 @@ async def run_plan_visualization(run_id: str, request: Request): {tabs_html}
-

Execution Plan

+

Execution Plan: {plan_name}

-
+
{total}
Total Steps
-
{cached}
+
{completed_count}
+
Completed
+
+
+
{cached_count}
Cached
-
{pending}
-
Executed
+
{pending_count}
+
Pending
@@ -4489,8 +4562,27 @@ def render_dag_cytoscape(nodes_json: str, edges_json: str, container_id: str = " return f'''