diff --git a/database.py b/database.py index 8967a7d..a2cc80c 100644 --- a/database.py +++ b/database.py @@ -90,6 +90,7 @@ CREATE TABLE IF NOT EXISTS pending_runs ( recipe VARCHAR(255) NOT NULL, inputs JSONB NOT NULL, dag_json TEXT, + plan_cid VARCHAR(128), output_name VARCHAR(255), actor_id VARCHAR(255), error TEXT, @@ -1480,7 +1481,7 @@ async def get_pending_run(run_id: str) -> Optional[dict]: async with pool.acquire() as conn: row = await conn.fetchrow( """ - SELECT run_id, celery_task_id, status, recipe, inputs, dag_json, output_name, actor_id, error, created_at, updated_at + SELECT run_id, celery_task_id, status, recipe, inputs, dag_json, plan_cid, output_name, actor_id, error, created_at, updated_at FROM pending_runs WHERE run_id = $1 """, run_id @@ -1497,6 +1498,7 @@ async def get_pending_run(run_id: str) -> Optional[dict]: "recipe": row["recipe"], "inputs": inputs, "dag_json": row["dag_json"], + "plan_cid": row["plan_cid"], "output_name": row["output_name"], "actor_id": row["actor_id"], "error": row["error"], @@ -1571,6 +1573,16 @@ async def update_pending_run_status(run_id: str, status: str, error: Optional[st return "UPDATE 1" in result +async def update_pending_run_plan(run_id: str, plan_cid: str) -> bool: + """Update the plan_cid of a pending run (called when plan is generated).""" + async with pool.acquire() as conn: + result = await conn.execute( + "UPDATE pending_runs SET plan_cid = $2, updated_at = NOW() WHERE run_id = $1", + run_id, plan_cid + ) + return "UPDATE 1" in result + + async def complete_pending_run(run_id: str) -> bool: """Remove a pending run after it completes (moves to run_cache).""" async with pool.acquire() as conn: diff --git a/legacy_tasks.py b/legacy_tasks.py index 5b39415..874f282 100644 --- a/legacy_tasks.py +++ b/legacy_tasks.py @@ -649,6 +649,24 @@ def execute_recipe(self, recipe_sexp: str, input_hashes: Dict[str, str], run_id: plan_path = CACHE_DIR / plan_cid CACHE_DIR.mkdir(parents=True, exist_ok=True) plan_path.write_text(plan_sexp) + + # Save plan_cid to database immediately so it's available even if run fails + if run_id: + import asyncio + import database + async def save_plan_cid(): + if database.pool is None: + await database.init_db() + await database.update_pending_run_plan(run_id, plan_cid) + try: + loop = asyncio.get_event_loop() + if loop.is_running(): + asyncio.ensure_future(save_plan_cid()) + else: + loop.run_until_complete(save_plan_cid()) + except RuntimeError: + asyncio.run(save_plan_cid()) + logger.info(f"Saved plan_cid to pending run: {run_id}") except Exception as e: logger.warning(f"Failed to store plan to IPFS: {e}")