From 2e3d3a5c6db9edcf1eb82b29eba9bcc4bb5b1913 Mon Sep 17 00:00:00 2001
From: gilesb
Date: Mon, 12 Jan 2026 18:43:48 +0000
Subject: [PATCH] Store DAG plan to IPFS and track plan_cid in run_cache

- Add plan_cid column to run_cache schema
- Store DAG JSON to IPFS during execute_dag task
- Return plan_cid in run status and list APIs

Co-Authored-By: Claude Opus 4.5
---
 app/services/run_service.py |  1 +
 database.py                 | 20 +++++++++++++-------
 legacy_tasks.py             | 12 ++++++++++++
 3 files changed, 26 insertions(+), 7 deletions(-)

diff --git a/app/services/run_service.py b/app/services/run_service.py
index 20f9246..af8cd03 100644
--- a/app/services/run_service.py
+++ b/app/services/run_service.py
@@ -132,6 +132,7 @@ class RunService:
                 "output_cid": cached.get("output_cid"),
                 "ipfs_cid": cached.get("ipfs_cid"),
                 "provenance_cid": cached.get("provenance_cid"),
+                "plan_cid": cached.get("plan_cid"),
                 "actor_id": cached.get("actor_id"),
                 "created_at": cached.get("created_at"),
                 "completed_at": cached.get("created_at"),
diff --git a/database.py b/database.py
index a36dcf7..93ef91d 100644
--- a/database.py
+++ b/database.py
@@ -74,6 +74,7 @@ CREATE TABLE IF NOT EXISTS run_cache (
     output_cid VARCHAR(64) NOT NULL,
     ipfs_cid VARCHAR(128),
     provenance_cid VARCHAR(128),
+    plan_cid VARCHAR(128),
     recipe VARCHAR(255) NOT NULL,
     inputs JSONB NOT NULL,
     actor_id VARCHAR(255),
@@ -1083,7 +1084,7 @@ async def get_run_cache(run_id: str) -> Optional[dict]:
     async with pool.acquire() as conn:
         row = await conn.fetchrow(
             """
-            SELECT run_id, output_cid, ipfs_cid, provenance_cid, recipe, inputs, actor_id, created_at
+            SELECT run_id, output_cid, ipfs_cid, provenance_cid, plan_cid, recipe, inputs, actor_id, created_at
             FROM run_cache WHERE run_id = $1
             """,
             run_id
@@ -1094,6 +1095,7 @@
         "output_cid": row["output_cid"],
         "ipfs_cid": row["ipfs_cid"],
         "provenance_cid": row["provenance_cid"],
+        "plan_cid": row["plan_cid"],
         "recipe": row["recipe"],
         "inputs": row["inputs"],
         "actor_id": row["actor_id"],
@@ -1109,27 +1111,30 @@ async def save_run_cache(
     inputs: List[str],
     ipfs_cid: Optional[str] = None,
     provenance_cid: Optional[str] = None,
+    plan_cid: Optional[str] = None,
     actor_id: Optional[str] = None,
 ) -> dict:
     """Save run result to cache. Updates if run_id already exists."""
     async with pool.acquire() as conn:
         row = await conn.fetchrow(
             """
-            INSERT INTO run_cache (run_id, output_cid, ipfs_cid, provenance_cid, recipe, inputs, actor_id)
-            VALUES ($1, $2, $3, $4, $5, $6, $7)
+            INSERT INTO run_cache (run_id, output_cid, ipfs_cid, provenance_cid, plan_cid, recipe, inputs, actor_id)
+            VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
             ON CONFLICT (run_id) DO UPDATE SET
                 output_cid = EXCLUDED.output_cid,
                 ipfs_cid = COALESCE(EXCLUDED.ipfs_cid, run_cache.ipfs_cid),
-                provenance_cid = COALESCE(EXCLUDED.provenance_cid, run_cache.provenance_cid)
-            RETURNING run_id, output_cid, ipfs_cid, provenance_cid, recipe, inputs, actor_id, created_at
+                provenance_cid = COALESCE(EXCLUDED.provenance_cid, run_cache.provenance_cid),
+                plan_cid = COALESCE(EXCLUDED.plan_cid, run_cache.plan_cid)
+            RETURNING run_id, output_cid, ipfs_cid, provenance_cid, plan_cid, recipe, inputs, actor_id, created_at
             """,
-            run_id, output_cid, ipfs_cid, provenance_cid, recipe, _json.dumps(inputs), actor_id
+            run_id, output_cid, ipfs_cid, provenance_cid, plan_cid, recipe, _json.dumps(inputs), actor_id
         )
         return {
             "run_id": row["run_id"],
             "output_cid": row["output_cid"],
             "ipfs_cid": row["ipfs_cid"],
             "provenance_cid": row["provenance_cid"],
+            "plan_cid": row["plan_cid"],
             "recipe": row["recipe"],
             "inputs": row["inputs"],
             "actor_id": row["actor_id"],
@@ -1183,7 +1188,7 @@ async def list_runs_by_actor(actor_id: str, offset: int = 0, limit: int = 20) ->
     async with pool.acquire() as conn:
         rows = await conn.fetch(
             """
-            SELECT run_id, output_cid, ipfs_cid, provenance_cid, recipe, inputs, actor_id, created_at
+            SELECT run_id, output_cid, ipfs_cid, provenance_cid, plan_cid, recipe, inputs, actor_id, created_at
             FROM run_cache
             WHERE actor_id = $1
             ORDER BY created_at DESC
@@ -1197,6 +1202,7 @@
             "output_cid": row["output_cid"],
             "ipfs_cid": row["ipfs_cid"],
             "provenance_cid": row["provenance_cid"],
+            "plan_cid": row["plan_cid"],
             "recipe": row["recipe"],
             "inputs": _parse_inputs(row["inputs"]),
             "actor_id": row["actor_id"],
diff --git a/legacy_tasks.py b/legacy_tasks.py
index fa0ac36..5c66766 100644
--- a/legacy_tasks.py
+++ b/legacy_tasks.py
@@ -382,6 +382,17 @@ def execute_dag(self, dag_json: str, run_id: str = None) -> dict:
     import asyncio
     import database
 
+    # Store plan (DAG) to IPFS
+    plan_cid = None
+    try:
+        import ipfs_client
+        dag_dict = json.loads(dag_json)
+        plan_cid = ipfs_client.add_json(dag_dict)
+        if plan_cid:
+            logger.info(f"Stored plan to IPFS: {plan_cid}")
+    except Exception as e:
+        logger.warning(f"Failed to store plan to IPFS: {e}")
+
     async def save_to_db():
         if database.pool is None:
             await database.init_db()
@@ -409,6 +420,7 @@ def execute_dag(self, dag_json: str, run_id: str = None) -> dict:
             inputs=input_hashes_for_db,
             ipfs_cid=output_ipfs_cid,
             actor_id=actor_id,
+            plan_cid=plan_cid,
         )
 
         # Save output as media for the user
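
The schema change above only touches the CREATE TABLE IF NOT EXISTS statement, which has no effect on a table that already exists, so a database created before this patch would still lack the plan_cid column. Below is a minimal backfill sketch, assuming the database.init_db()/database.pool conventions visible in the diff and no separate migration tooling; the function name add_plan_cid_column is illustrative and not part of the patch.

    import asyncio
    import database

    async def add_plan_cid_column() -> None:
        # Illustrative one-off migration: CREATE TABLE IF NOT EXISTS does not
        # alter existing tables, so add the column explicitly. PostgreSQL
        # supports ADD COLUMN IF NOT EXISTS, which makes this safe to re-run.
        if database.pool is None:
            await database.init_db()
        async with database.pool.acquire() as conn:
            await conn.execute(
                "ALTER TABLE run_cache ADD COLUMN IF NOT EXISTS plan_cid VARCHAR(128)"
            )

    if __name__ == "__main__":
        asyncio.run(add_plan_cid_column())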
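
Once a run's plan_cid is returned by the status or list endpoints, the plan can be dereferenced like any other CID. The sketch below reads it back through the standard IPFS gateway path /ipfs/<cid>, assuming ipfs_client.add_json pins the serialized JSON bytes verbatim; the gateway host and the requests dependency are assumptions, not part of this patch.

    import json
    import requests

    def fetch_plan(plan_cid: str, gateway: str = "https://ipfs.io") -> dict:
        # Any IPFS gateway (or a local node's gateway port) serves raw content
        # at /ipfs/<cid>; parse the bytes back into the DAG plan dict.
        resp = requests.get(f"{gateway}/ipfs/{plan_cid}", timeout=30)
        resp.raise_for_status()
        return json.loads(resp.text)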