From 45826138cac77bf0f508466ffef5fc00a5ed21ef Mon Sep 17 00:00:00 2001 From: gilesb Date: Fri, 9 Jan 2026 02:40:38 +0000 Subject: [PATCH] Store provenance on IPFS instead of local files - Add add_json() to ipfs_client for storing JSON data - Update render_effect task to store provenance on IPFS - Update execute_dag task to store DAG provenance on IPFS - Add provenance_cid field to RunStatus model - Extract provenance_cid from task results Provenance is now immutable and content-addressed, enabling: - Cross-L2 verification - Bitcoin timestamping for dispute resolution - Complete audit trail on IPFS Co-Authored-By: Claude Opus 4.5 --- .env | 0 ipfs_client.py | 16 ++++++++++++++++ server.py | 3 +++ tasks.py | 47 ++++++++++++++++++++++++++++++++++++++++++----- 4 files changed, 61 insertions(+), 5 deletions(-) create mode 100644 .env diff --git a/.env b/.env new file mode 100644 index 0000000..e69de29 diff --git a/ipfs_client.py b/ipfs_client.py index 17b6e74..905e15e 100644 --- a/ipfs_client.py +++ b/ipfs_client.py @@ -102,6 +102,22 @@ def add_bytes(data: bytes, pin: bool = True) -> Optional[str]: return None +def add_json(data: dict, pin: bool = True) -> Optional[str]: + """ + Serialize dict to JSON and add to IPFS. + + Args: + data: Dictionary to serialize and store + pin: Whether to pin the data (default: True) + + Returns: + IPFS CID or None on failure + """ + import json + json_bytes = json.dumps(data, indent=2, sort_keys=True).encode('utf-8') + return add_bytes(json_bytes, pin=pin) + + def get_file(cid: str, dest_path: Path) -> bool: """ Retrieve a file from IPFS and save to destination. diff --git a/server.py b/server.py index 1ba64c2..a8c2264 100644 --- a/server.py +++ b/server.py @@ -151,6 +151,7 @@ class RunStatus(BaseModel): effect_url: Optional[str] = None # URL to effect source code username: Optional[str] = None # Owner of the run (ActivityPub actor ID) infrastructure: Optional[dict] = None # Hardware/software used for rendering + provenance_cid: Optional[str] = None # IPFS CID of provenance record # ============ Recipe Models ============ @@ -616,10 +617,12 @@ async def get_run(run_id: str): if "output_hash" in result: # New DAG result format run.output_hash = result.get("output_hash") + run.provenance_cid = result.get("provenance_cid") output_path = Path(result.get("output_path", "")) if result.get("output_path") else None else: # Legacy render_effect format run.output_hash = result.get("output", {}).get("content_hash") + run.provenance_cid = result.get("provenance_cid") output_path = Path(result.get("output", {}).get("local_path", "")) # Extract effects info from provenance (legacy only) diff --git a/tasks.py b/tasks.py index 4690e5c..c457db8 100644 --- a/tasks.py +++ b/tasks.py @@ -237,7 +237,6 @@ def render_effect(self, input_hash: str, effect_name: str, output_name: str) -> "output": { "name": output_name, "content_hash": output_hash, - "local_path": str(result) }, "inputs": [ {"content_hash": input_hash} @@ -249,10 +248,14 @@ def render_effect(self, input_hash: str, effect_name: str, output_name: str) -> } } - # Save provenance - provenance_path = result.with_suffix(".provenance.json") - with open(provenance_path, "w") as f: - json.dump(provenance, f, indent=2) + # Store provenance on IPFS + import ipfs_client + provenance_cid = ipfs_client.add_json(provenance) + if provenance_cid: + provenance["provenance_cid"] = provenance_cid + logger.info(f"Stored provenance on IPFS: {provenance_cid}") + else: + logger.warning("Failed to store provenance on IPFS") return provenance @@ -339,6 +342,39 @@ def execute_dag(self, dag_json: str, run_id: str = None) -> dict: run_id=run_id, ) + # Build provenance + input_hashes_for_provenance = [] + for node_id, node in dag.nodes.items(): + if node.node_type == NodeType.SOURCE or str(node.node_type) == "SOURCE": + content_hash = node.config.get("content_hash") + if content_hash: + input_hashes_for_provenance.append({"content_hash": content_hash}) + + provenance = { + "task_id": self.request.id, + "run_id": run_id, + "rendered_at": datetime.now(timezone.utc).isoformat(), + "output": { + "content_hash": output_hash, + }, + "inputs": input_hashes_for_provenance, + "dag": dag_json, # Full DAG definition + "execution": { + "execution_time": result.execution_time, + "nodes_executed": result.nodes_executed, + "nodes_cached": result.nodes_cached, + } + } + + # Store provenance on IPFS + import ipfs_client + provenance_cid = ipfs_client.add_json(provenance) + if provenance_cid: + provenance["provenance_cid"] = provenance_cid + logger.info(f"Stored DAG provenance on IPFS: {provenance_cid}") + else: + logger.warning("Failed to store DAG provenance on IPFS") + # Build result return { "success": True, @@ -351,6 +387,7 @@ def execute_dag(self, dag_json: str, run_id: str = None) -> dict: "node_results": { node_id: str(path) for node_id, path in result.node_results.items() }, + "provenance_cid": provenance_cid, }