Store full provenance on IPFS with provenance_cid
Provenance records are now stored on IPFS before the DB transaction.
The provenance CID is included in both:
- The provenance stored in the asset record
- The ActivityPub activity object
This enables:
- Immutable, content-addressed provenance
- Bitcoin timestamping of provenance for cross-L2 dispute resolution
- Verifiable chain of custody: inputs → recipe → output
Provenance structure on IPFS:
{
"inputs": [...],
"output": {"content_hash": "...", "ipfs_cid": "..."},
"recipe": {...},
"recipe_cid": "Qm...",
"provenance_cid": "Qm...", // self-reference for verification
...
}
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
77
server.py
77
server.py
@@ -1891,6 +1891,23 @@ async def record_run(req: RecordRunRequest, user: User = Depends(get_required_us
|
||||
"effects_commit": run.get("effects_commit"),
|
||||
}
|
||||
|
||||
# Build registered_inputs list (deterministic - can compute before DB transaction)
|
||||
registered_inputs = []
|
||||
for inp in input_infos:
|
||||
if inp["existing_asset"]:
|
||||
registered_inputs.append({
|
||||
"content_hash": inp["content_hash"],
|
||||
"name": inp["existing_asset"]["name"],
|
||||
"ipfs_cid": inp["ipfs_cid"]
|
||||
})
|
||||
else:
|
||||
# New input - name is deterministic
|
||||
registered_inputs.append({
|
||||
"content_hash": inp["content_hash"],
|
||||
"name": f"input-{inp['content_hash'][:16]}",
|
||||
"ipfs_cid": inp["ipfs_cid"]
|
||||
})
|
||||
|
||||
# ===== PHASE 2: IPFS OPERATIONS (non-blocking for event loop) =====
|
||||
def do_ipfs_operations():
|
||||
"""Run IPFS operations in thread pool to not block event loop."""
|
||||
@@ -1907,14 +1924,35 @@ async def record_run(req: RecordRunRequest, user: User = Depends(get_required_us
|
||||
future.result() # Raises IPFSError if failed
|
||||
logger.info("record_run: All CIDs pinned successfully")
|
||||
|
||||
# Store recipe on IPFS and return CID
|
||||
# Store recipe on IPFS
|
||||
logger.info("record_run: Storing recipe on IPFS")
|
||||
return ipfs_client.add_json(recipe_data)
|
||||
recipe_cid = ipfs_client.add_json(recipe_data)
|
||||
|
||||
# Build and store full provenance on IPFS
|
||||
provenance = {
|
||||
"inputs": registered_inputs,
|
||||
"output": {
|
||||
"content_hash": output_hash,
|
||||
"ipfs_cid": output_ipfs_cid
|
||||
},
|
||||
"recipe": recipe_data,
|
||||
"recipe_cid": recipe_cid,
|
||||
"effect_url": run.get("effect_url"),
|
||||
"effects_commit": run.get("effects_commit"),
|
||||
"l1_server": l1_url,
|
||||
"l1_run_id": req.run_id,
|
||||
"rendered_at": run.get("completed_at"),
|
||||
"infrastructure": run.get("infrastructure")
|
||||
}
|
||||
logger.info("record_run: Storing provenance on IPFS")
|
||||
provenance_cid = ipfs_client.add_json(provenance)
|
||||
|
||||
return recipe_cid, provenance_cid, provenance
|
||||
|
||||
try:
|
||||
import asyncio
|
||||
recipe_cid = await asyncio.to_thread(do_ipfs_operations)
|
||||
logger.info(f"record_run: Recipe stored on IPFS: {recipe_cid[:16]}...")
|
||||
recipe_cid, provenance_cid, provenance = await asyncio.to_thread(do_ipfs_operations)
|
||||
logger.info(f"record_run: Recipe CID: {recipe_cid[:16]}..., Provenance CID: {provenance_cid[:16]}...")
|
||||
except IPFSError as e:
|
||||
logger.error(f"record_run: IPFS operation failed: {e}")
|
||||
raise HTTPException(500, f"IPFS operation failed: {e}")
|
||||
@@ -1923,18 +1961,14 @@ async def record_run(req: RecordRunRequest, user: User = Depends(get_required_us
|
||||
logger.info("record_run: Starting DB transaction")
|
||||
now = datetime.now(timezone.utc).isoformat()
|
||||
|
||||
# Add provenance_cid to provenance for storage in DB
|
||||
provenance["provenance_cid"] = provenance_cid
|
||||
|
||||
try:
|
||||
async with db.transaction() as conn:
|
||||
# Register input assets (if not already on L2)
|
||||
registered_inputs = []
|
||||
for inp in input_infos:
|
||||
if inp["existing_asset"]:
|
||||
# Already on L2
|
||||
registered_inputs.append({
|
||||
"content_hash": inp["content_hash"],
|
||||
"name": inp["existing_asset"]["name"]
|
||||
})
|
||||
else:
|
||||
if not inp["existing_asset"]:
|
||||
# Create new input asset
|
||||
input_name = f"input-{inp['content_hash'][:16]}"
|
||||
input_asset = {
|
||||
@@ -1948,29 +1982,12 @@ async def record_run(req: RecordRunRequest, user: User = Depends(get_required_us
|
||||
"created_at": now
|
||||
}
|
||||
await db.create_asset_tx(conn, input_asset)
|
||||
registered_inputs.append({
|
||||
"content_hash": inp["content_hash"],
|
||||
"name": input_name
|
||||
})
|
||||
|
||||
# Check output name doesn't exist
|
||||
if await db.asset_exists_by_name_tx(conn, req.output_name):
|
||||
raise HTTPException(400, f"Asset already exists: {req.output_name}")
|
||||
|
||||
# Build provenance with recipe CID
|
||||
provenance = {
|
||||
"inputs": registered_inputs,
|
||||
"recipe": recipe_data,
|
||||
"recipe_cid": recipe_cid,
|
||||
"effect_url": run.get("effect_url"),
|
||||
"effects_commit": run.get("effects_commit"),
|
||||
"l1_server": l1_url,
|
||||
"l1_run_id": req.run_id,
|
||||
"rendered_at": run.get("completed_at"),
|
||||
"infrastructure": run.get("infrastructure")
|
||||
}
|
||||
|
||||
# Create output asset
|
||||
# Create output asset with provenance (includes provenance_cid)
|
||||
output_asset = {
|
||||
"name": req.output_name,
|
||||
"content_hash": output_hash,
|
||||
|
||||
Reference in New Issue
Block a user