Save plan_cid to database immediately after plan creation

- Add plan_cid column to pending_runs table schema
- Add update_pending_run_plan() function to save plan_cid
- Update get_pending_run() to return plan_cid
- Save plan_cid right after storing plan to IPFS (before execution)
- Plan is now available even if run fails

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
gilesb
2026-01-13 04:13:10 +00:00
parent 7813eb081a
commit 2c3f943e5a
2 changed files with 31 additions and 1 deletions

View File

@@ -90,6 +90,7 @@ CREATE TABLE IF NOT EXISTS pending_runs (
recipe VARCHAR(255) NOT NULL,
inputs JSONB NOT NULL,
dag_json TEXT,
plan_cid VARCHAR(128),
output_name VARCHAR(255),
actor_id VARCHAR(255),
error TEXT,
@@ -1480,7 +1481,7 @@ async def get_pending_run(run_id: str) -> Optional[dict]:
async with pool.acquire() as conn:
row = await conn.fetchrow(
"""
SELECT run_id, celery_task_id, status, recipe, inputs, dag_json, output_name, actor_id, error, created_at, updated_at
SELECT run_id, celery_task_id, status, recipe, inputs, dag_json, plan_cid, output_name, actor_id, error, created_at, updated_at
FROM pending_runs WHERE run_id = $1
""",
run_id
@@ -1497,6 +1498,7 @@ async def get_pending_run(run_id: str) -> Optional[dict]:
"recipe": row["recipe"],
"inputs": inputs,
"dag_json": row["dag_json"],
"plan_cid": row["plan_cid"],
"output_name": row["output_name"],
"actor_id": row["actor_id"],
"error": row["error"],
@@ -1571,6 +1573,16 @@ async def update_pending_run_status(run_id: str, status: str, error: Optional[st
return "UPDATE 1" in result
async def update_pending_run_plan(run_id: str, plan_cid: str) -> bool:
    """Record the IPFS plan CID on a pending run.

    Invoked right after the plan is stored to IPFS and before execution
    starts, so the plan stays available even when the run later fails.

    Returns True when exactly one row was updated (i.e. the run exists).
    """
    query = (
        "UPDATE pending_runs SET plan_cid = $2, updated_at = NOW() "
        "WHERE run_id = $1"
    )
    async with pool.acquire() as db:
        status = await db.execute(query, run_id, plan_cid)
        # asyncpg returns the command tag, e.g. "UPDATE 1" on success.
        return "UPDATE 1" in status
async def complete_pending_run(run_id: str) -> bool:
"""Remove a pending run after it completes (moves to run_cache)."""
async with pool.acquire() as conn: