Plan-based caching with artifact playback in UI

RunStatus now stores:
- plan_id, plan_name for linking to execution plan
- step_results for per-step execution status
- all_outputs for all artifacts from all steps

Plan visualization:
- Shows human-readable step names from recipe structure
- Video/audio artifact preview on node click
- Outputs list with links to cached artifacts
- Stats reflect actual execution status (completed/cached/pending)

Execution:
- Step results include outputs list with cache_ids
- run_plan returns all outputs from all steps
- Support for completed_by_other status

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
gilesb
2026-01-11 00:20:19 +00:00
parent 36bf0cd3f7
commit 3db606bf15
3 changed files with 278 additions and 32 deletions

View File

@@ -222,6 +222,32 @@ def execute_step(
# Mark completed
complete_task(step.cache_id, worker_id, str(cached_file.path))
# Build outputs list (for multi-output support)
outputs = []
if step.outputs:
# Use pre-defined outputs from step
for output_def in step.outputs:
outputs.append({
"name": output_def.name,
"cache_id": output_def.cache_id,
"media_type": output_def.media_type,
"index": output_def.index,
"path": str(cached_file.path),
"content_hash": cached_file.content_hash,
"ipfs_cid": ipfs_cid,
})
else:
# Single output (backwards compat)
outputs.append({
"name": step.name or step.step_id,
"cache_id": step.cache_id,
"media_type": "video/mp4",
"index": 0,
"path": str(cached_file.path),
"content_hash": cached_file.content_hash,
"ipfs_cid": ipfs_cid,
})
# Cleanup temp
if output_dir.exists():
import shutil
@@ -230,10 +256,12 @@ def execute_step(
return {
"status": "completed",
"step_id": step.step_id,
"name": step.name,
"cache_id": step.cache_id,
"output_path": str(cached_file.path),
"content_hash": cached_file.content_hash,
"ipfs_cid": ipfs_cid,
"outputs": outputs,
}
except Exception as e:

View File

@@ -168,15 +168,49 @@ def run_plan(
output_cache_id = output_step.cache_id if output_step else None
output_path = None
output_ipfs_cid = None
output_name = plan.output_name
if output_cache_id:
output_path = cache_mgr.get_by_content_hash(output_cache_id)
output_ipfs_cid = cache_mgr.get_ipfs_cid(output_cache_id)
# Build list of all outputs with their names and artifacts
all_outputs = []
for step in plan.steps:
step_result = results_by_step.get(step.step_id, {})
step_outputs = step_result.get("outputs", [])
# If no outputs in result, build from step definition
if not step_outputs and step.outputs:
for output_def in step.outputs:
output_cache_path = cache_mgr.get_by_content_hash(output_def.cache_id)
output_ipfs = cache_mgr.get_ipfs_cid(output_def.cache_id) if output_cache_path else None
all_outputs.append({
"name": output_def.name,
"step_id": step.step_id,
"step_name": step.name,
"cache_id": output_def.cache_id,
"media_type": output_def.media_type,
"path": str(output_cache_path) if output_cache_path else None,
"ipfs_cid": output_ipfs,
"status": "cached" if output_cache_path else "missing",
})
else:
for output in step_outputs:
all_outputs.append({
**output,
"step_id": step.step_id,
"step_name": step.name,
"status": "completed",
})
return {
"status": "completed",
"run_id": run_id,
"plan_id": plan.plan_id,
"plan_name": plan.name,
"recipe_name": plan.recipe_name,
"output_name": output_name,
"output_cache_id": output_cache_id,
"output_path": str(output_path) if output_path else None,
"output_ipfs_cid": output_ipfs_cid,
@@ -184,6 +218,7 @@ def run_plan(
"cached": total_cached,
"executed": total_executed,
"results": results_by_step,
"outputs": all_outputs,
}