Store DAG plan to IPFS and track plan_cid in run_cache
- Add plan_cid column to run_cache schema
- Store DAG JSON to IPFS during execute_dag task
- Return plan_cid in run status and list APIs

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
@@ -132,6 +132,7 @@ class RunService:
             "output_cid": cached.get("output_cid"),
             "ipfs_cid": cached.get("ipfs_cid"),
             "provenance_cid": cached.get("provenance_cid"),
+            "plan_cid": cached.get("plan_cid"),
             "actor_id": cached.get("actor_id"),
             "created_at": cached.get("created_at"),
             "completed_at": cached.get("created_at"),
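For orientation, the cached status dict built in this hunk would now look roughly like the sketch below; all values are placeholders, and the "run_id" key is assumed rather than shown in the hunk.

# Rough shape of a cached run status after this change; values are placeholders,
# and "run_id" is assumed to be filled in elsewhere in the method.
example_cached_status = {
    "run_id": "run-123",
    "output_cid": "bafy...output",
    "ipfs_cid": "bafy...bundle",
    "provenance_cid": "bafy...provenance",
    "plan_cid": "bafy...plan",               # new in this commit
    "actor_id": "actor-42",
    "created_at": "2025-01-01T00:00:00Z",
    "completed_at": "2025-01-01T00:00:00Z",  # cached runs reuse created_at here
}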
database.py (20 lines changed)
@@ -74,6 +74,7 @@ CREATE TABLE IF NOT EXISTS run_cache (
     output_cid VARCHAR(64) NOT NULL,
     ipfs_cid VARCHAR(128),
     provenance_cid VARCHAR(128),
+    plan_cid VARCHAR(128),
     recipe VARCHAR(255) NOT NULL,
     inputs JSONB NOT NULL,
     actor_id VARCHAR(255),
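Note that the new column lives in a CREATE TABLE IF NOT EXISTS statement, which PostgreSQL skips entirely when run_cache already exists; a deployment that predates this commit would also need something like the following one-off migration (a sketch, assuming the same asyncpg pool that the other helpers in database.py use).

# Hypothetical one-off migration for existing databases; assumes an initialized
# asyncpg pool, as used elsewhere in database.py.
async def add_plan_cid_column() -> None:
    async with pool.acquire() as conn:
        # IF NOT EXISTS keeps the statement idempotent (PostgreSQL 9.6+).
        await conn.execute(
            "ALTER TABLE run_cache ADD COLUMN IF NOT EXISTS plan_cid VARCHAR(128)"
        )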
@@ -1083,7 +1084,7 @@ async def get_run_cache(run_id: str) -> Optional[dict]:
     async with pool.acquire() as conn:
         row = await conn.fetchrow(
             """
-            SELECT run_id, output_cid, ipfs_cid, provenance_cid, recipe, inputs, actor_id, created_at
+            SELECT run_id, output_cid, ipfs_cid, provenance_cid, plan_cid, recipe, inputs, actor_id, created_at
             FROM run_cache WHERE run_id = $1
             """,
             run_id
@@ -1094,6 +1095,7 @@ async def get_run_cache(run_id: str) -> Optional[dict]:
             "output_cid": row["output_cid"],
             "ipfs_cid": row["ipfs_cid"],
             "provenance_cid": row["provenance_cid"],
+            "plan_cid": row["plan_cid"],
             "recipe": row["recipe"],
             "inputs": row["inputs"],
             "actor_id": row["actor_id"],
@@ -1109,27 +1111,30 @@ async def save_run_cache(
     inputs: List[str],
     ipfs_cid: Optional[str] = None,
     provenance_cid: Optional[str] = None,
+    plan_cid: Optional[str] = None,
     actor_id: Optional[str] = None,
 ) -> dict:
     """Save run result to cache. Updates if run_id already exists."""
     async with pool.acquire() as conn:
         row = await conn.fetchrow(
             """
-            INSERT INTO run_cache (run_id, output_cid, ipfs_cid, provenance_cid, recipe, inputs, actor_id)
-            VALUES ($1, $2, $3, $4, $5, $6, $7)
+            INSERT INTO run_cache (run_id, output_cid, ipfs_cid, provenance_cid, plan_cid, recipe, inputs, actor_id)
+            VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
             ON CONFLICT (run_id) DO UPDATE SET
                 output_cid = EXCLUDED.output_cid,
                 ipfs_cid = COALESCE(EXCLUDED.ipfs_cid, run_cache.ipfs_cid),
-                provenance_cid = COALESCE(EXCLUDED.provenance_cid, run_cache.provenance_cid)
-            RETURNING run_id, output_cid, ipfs_cid, provenance_cid, recipe, inputs, actor_id, created_at
+                provenance_cid = COALESCE(EXCLUDED.provenance_cid, run_cache.provenance_cid),
+                plan_cid = COALESCE(EXCLUDED.plan_cid, run_cache.plan_cid)
+            RETURNING run_id, output_cid, ipfs_cid, provenance_cid, plan_cid, recipe, inputs, actor_id, created_at
             """,
-            run_id, output_cid, ipfs_cid, provenance_cid, recipe, _json.dumps(inputs), actor_id
+            run_id, output_cid, ipfs_cid, provenance_cid, plan_cid, recipe, _json.dumps(inputs), actor_id
         )
         return {
             "run_id": row["run_id"],
             "output_cid": row["output_cid"],
             "ipfs_cid": row["ipfs_cid"],
             "provenance_cid": row["provenance_cid"],
+            "plan_cid": row["plan_cid"],
             "recipe": row["recipe"],
             "inputs": row["inputs"],
             "actor_id": row["actor_id"],
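Because the upsert wraps plan_cid in COALESCE, a later save that omits the plan does not clear a previously stored CID. A minimal usage sketch, with placeholder identifiers and keyword arguments matching the signature above:

# Hypothetical round trip through the cache helpers; all identifiers are placeholders.
import asyncio
import database

async def demo() -> None:
    if database.pool is None:
        await database.init_db()
    await database.save_run_cache(
        run_id="run-123",
        output_cid="bafy...output",
        recipe="example/recipe",
        inputs=["bafy...input"],
        plan_cid="bafy...plan",
    )
    # Re-saving without plan_cid keeps the stored value, thanks to COALESCE.
    await database.save_run_cache(
        run_id="run-123",
        output_cid="bafy...output",
        recipe="example/recipe",
        inputs=["bafy...input"],
    )
    cached = await database.get_run_cache("run-123")
    print(cached["plan_cid"])  # still "bafy...plan"

asyncio.run(demo())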
@@ -1183,7 +1188,7 @@ async def list_runs_by_actor(actor_id: str, offset: int = 0, limit: int = 20) ->
     async with pool.acquire() as conn:
         rows = await conn.fetch(
             """
-            SELECT run_id, output_cid, ipfs_cid, provenance_cid, recipe, inputs, actor_id, created_at
+            SELECT run_id, output_cid, ipfs_cid, provenance_cid, plan_cid, recipe, inputs, actor_id, created_at
             FROM run_cache
             WHERE actor_id = $1
             ORDER BY created_at DESC
@@ -1197,6 +1202,7 @@ async def list_runs_by_actor(actor_id: str, offset: int = 0, limit: int = 20) ->
             "output_cid": row["output_cid"],
             "ipfs_cid": row["ipfs_cid"],
             "provenance_cid": row["provenance_cid"],
+            "plan_cid": row["plan_cid"],
             "recipe": row["recipe"],
             "inputs": _parse_inputs(row["inputs"]),
             "actor_id": row["actor_id"],
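The list path gets the same treatment as get_run_cache, so each run returned by list_runs_by_actor now carries its plan_cid; a small consumption sketch (the actor id is a placeholder):

# Hypothetical listing of an actor's runs together with their stored plan CIDs.
import asyncio
import database

async def show_plans() -> None:
    if database.pool is None:
        await database.init_db()
    runs = await database.list_runs_by_actor("actor-42", offset=0, limit=20)
    for run in runs:
        print(run["recipe"], run["plan_cid"])

asyncio.run(show_plans())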
@@ -382,6 +382,17 @@ def execute_dag(self, dag_json: str, run_id: str = None) -> dict:
     import asyncio
     import database
 
+    # Store plan (DAG) to IPFS
+    plan_cid = None
+    try:
+        import ipfs_client
+        dag_dict = json.loads(dag_json)
+        plan_cid = ipfs_client.add_json(dag_dict)
+        if plan_cid:
+            logger.info(f"Stored plan to IPFS: {plan_cid}")
+    except Exception as e:
+        logger.warning(f"Failed to store plan to IPFS: {e}")
+
     async def save_to_db():
         if database.pool is None:
             await database.init_db()
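How ipfs_client.add_json pins the serialized DAG is outside this diff; as a hedged illustration, assuming a standard IPFS HTTP gateway is reachable locally, the stored plan could be read back by its plan_cid like this:

# Hypothetical retrieval of the stored plan; assumes a local IPFS gateway on the
# default port 8080 and a plan_cid taken from the run status API.
import json
import urllib.request

def fetch_plan(plan_cid: str, gateway: str = "http://127.0.0.1:8080") -> dict:
    with urllib.request.urlopen(f"{gateway}/ipfs/{plan_cid}") as resp:
        return json.loads(resp.read().decode("utf-8"))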
@@ -409,6 +420,7 @@ def execute_dag(self, dag_json: str, run_id: str = None) -> dict:
         inputs=input_hashes_for_db,
         ipfs_cid=output_ipfs_cid,
         actor_id=actor_id,
+        plan_cid=plan_cid,
     )
 
     # Save output as media for the user