Store provenance on IPFS instead of local files
- Add add_json() to ipfs_client for storing JSON data
- Update render_effect task to store provenance on IPFS
- Update execute_dag task to store DAG provenance on IPFS
- Add provenance_cid field to RunStatus model
- Extract provenance_cid from task results

Provenance is now immutable and content-addressed, enabling:
- Cross-L2 verification
- Bitcoin timestamping for dispute resolution
- Complete audit trail on IPFS

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
47
tasks.py
47
tasks.py
@@ -237,7 +237,6 @@ def render_effect(self, input_hash: str, effect_name: str, output_name: str) ->
|
||||
"output": {
|
||||
"name": output_name,
|
||||
"content_hash": output_hash,
|
||||
"local_path": str(result)
|
||||
},
|
||||
"inputs": [
|
||||
{"content_hash": input_hash}
|
||||
@@ -249,10 +248,14 @@ def render_effect(self, input_hash: str, effect_name: str, output_name: str) ->
|
||||
}
|
||||
}
|
||||
|
||||
# Save provenance
|
||||
provenance_path = result.with_suffix(".provenance.json")
|
||||
with open(provenance_path, "w") as f:
|
||||
json.dump(provenance, f, indent=2)
|
||||
# Store provenance on IPFS
|
||||
import ipfs_client
|
||||
provenance_cid = ipfs_client.add_json(provenance)
|
||||
if provenance_cid:
|
||||
provenance["provenance_cid"] = provenance_cid
|
||||
logger.info(f"Stored provenance on IPFS: {provenance_cid}")
|
||||
else:
|
||||
logger.warning("Failed to store provenance on IPFS")
|
||||
|
||||
return provenance
|
||||
|
||||
@@ -339,6 +342,39 @@ def execute_dag(self, dag_json: str, run_id: str = None) -> dict:
|
||||
run_id=run_id,
|
||||
)
|
||||
|
||||
# Build provenance
|
||||
input_hashes_for_provenance = []
|
||||
for node_id, node in dag.nodes.items():
|
||||
if node.node_type == NodeType.SOURCE or str(node.node_type) == "SOURCE":
|
||||
content_hash = node.config.get("content_hash")
|
||||
if content_hash:
|
||||
input_hashes_for_provenance.append({"content_hash": content_hash})
|
||||
|
||||
provenance = {
|
||||
"task_id": self.request.id,
|
||||
"run_id": run_id,
|
||||
"rendered_at": datetime.now(timezone.utc).isoformat(),
|
||||
"output": {
|
||||
"content_hash": output_hash,
|
||||
},
|
||||
"inputs": input_hashes_for_provenance,
|
||||
"dag": dag_json, # Full DAG definition
|
||||
"execution": {
|
||||
"execution_time": result.execution_time,
|
||||
"nodes_executed": result.nodes_executed,
|
||||
"nodes_cached": result.nodes_cached,
|
||||
}
|
||||
}
|
||||
|
||||
# Store provenance on IPFS
|
||||
import ipfs_client
|
||||
provenance_cid = ipfs_client.add_json(provenance)
|
||||
if provenance_cid:
|
||||
provenance["provenance_cid"] = provenance_cid
|
||||
logger.info(f"Stored DAG provenance on IPFS: {provenance_cid}")
|
||||
else:
|
||||
logger.warning("Failed to store DAG provenance on IPFS")
|
||||
|
||||
# Build result
|
||||
return {
|
||||
"success": True,
|
||||
@@ -351,6 +387,7 @@ def execute_dag(self, dag_json: str, run_id: str = None) -> dict:
|
||||
"node_results": {
|
||||
node_id: str(path) for node_id, path in result.node_results.items()
|
||||
},
|
||||
"provenance_cid": provenance_cid,
|
||||
}
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user