diff --git a/server.py b/server.py index 84697be..b802483 100644 --- a/server.py +++ b/server.py @@ -1891,6 +1891,23 @@ async def record_run(req: RecordRunRequest, user: User = Depends(get_required_us "effects_commit": run.get("effects_commit"), } + # Build registered_inputs list (deterministic - can compute before DB transaction) + registered_inputs = [] + for inp in input_infos: + if inp["existing_asset"]: + registered_inputs.append({ + "content_hash": inp["content_hash"], + "name": inp["existing_asset"]["name"], + "ipfs_cid": inp["ipfs_cid"] + }) + else: + # New input - name is deterministic + registered_inputs.append({ + "content_hash": inp["content_hash"], + "name": f"input-{inp['content_hash'][:16]}", + "ipfs_cid": inp["ipfs_cid"] + }) + # ===== PHASE 2: IPFS OPERATIONS (non-blocking for event loop) ===== def do_ipfs_operations(): """Run IPFS operations in thread pool to not block event loop.""" @@ -1907,14 +1924,35 @@ async def record_run(req: RecordRunRequest, user: User = Depends(get_required_us future.result() # Raises IPFSError if failed logger.info("record_run: All CIDs pinned successfully") - # Store recipe on IPFS and return CID + # Store recipe on IPFS logger.info("record_run: Storing recipe on IPFS") - return ipfs_client.add_json(recipe_data) + recipe_cid = ipfs_client.add_json(recipe_data) + + # Build and store full provenance on IPFS + provenance = { + "inputs": registered_inputs, + "output": { + "content_hash": output_hash, + "ipfs_cid": output_ipfs_cid + }, + "recipe": recipe_data, + "recipe_cid": recipe_cid, + "effect_url": run.get("effect_url"), + "effects_commit": run.get("effects_commit"), + "l1_server": l1_url, + "l1_run_id": req.run_id, + "rendered_at": run.get("completed_at"), + "infrastructure": run.get("infrastructure") + } + logger.info("record_run: Storing provenance on IPFS") + provenance_cid = ipfs_client.add_json(provenance) + + return recipe_cid, provenance_cid, provenance try: import asyncio - recipe_cid = await asyncio.to_thread(do_ipfs_operations) - logger.info(f"record_run: Recipe stored on IPFS: {recipe_cid[:16]}...") + recipe_cid, provenance_cid, provenance = await asyncio.to_thread(do_ipfs_operations) + logger.info(f"record_run: Recipe CID: {recipe_cid[:16]}..., Provenance CID: {provenance_cid[:16]}...") except IPFSError as e: logger.error(f"record_run: IPFS operation failed: {e}") raise HTTPException(500, f"IPFS operation failed: {e}") @@ -1923,18 +1961,14 @@ async def record_run(req: RecordRunRequest, user: User = Depends(get_required_us logger.info("record_run: Starting DB transaction") now = datetime.now(timezone.utc).isoformat() + # Add provenance_cid to provenance for storage in DB + provenance["provenance_cid"] = provenance_cid + try: async with db.transaction() as conn: # Register input assets (if not already on L2) - registered_inputs = [] for inp in input_infos: - if inp["existing_asset"]: - # Already on L2 - registered_inputs.append({ - "content_hash": inp["content_hash"], - "name": inp["existing_asset"]["name"] - }) - else: + if not inp["existing_asset"]: # Create new input asset input_name = f"input-{inp['content_hash'][:16]}" input_asset = { @@ -1948,29 +1982,12 @@ async def record_run(req: RecordRunRequest, user: User = Depends(get_required_us "created_at": now } await db.create_asset_tx(conn, input_asset) - registered_inputs.append({ - "content_hash": inp["content_hash"], - "name": input_name - }) # Check output name doesn't exist if await db.asset_exists_by_name_tx(conn, req.output_name): raise HTTPException(400, f"Asset already exists: {req.output_name}") - # Build provenance with recipe CID - provenance = { - "inputs": registered_inputs, - "recipe": recipe_data, - "recipe_cid": recipe_cid, - "effect_url": run.get("effect_url"), - "effects_commit": run.get("effects_commit"), - "l1_server": l1_url, - "l1_run_id": req.run_id, - "rendered_at": run.get("completed_at"), - "infrastructure": run.get("infrastructure") - } - - # Create output asset + # Create output asset with provenance (includes provenance_cid) output_asset = { "name": req.output_name, "content_hash": output_hash,