From 58b324043061c3913d66fbc8658778109fcf866e Mon Sep 17 00:00:00 2001 From: gilesb Date: Fri, 9 Jan 2026 23:35:50 +0000 Subject: [PATCH] Store IPFS CID in database for DAG outputs Properly unpack cache_manager.put() tuple to get IPFS CID and store it in PostgreSQL via database.create_cache_item(). This fixes the "Output has no IPFS CID - cannot publish" error when publishing from L1 to L2. Co-Authored-By: Claude Opus 4.5 --- tasks.py | 22 ++++++++++++++++++++-- 1 file changed, 20 insertions(+), 2 deletions(-) diff --git a/tasks.py b/tasks.py index 382b4a1..a7877cc 100644 --- a/tasks.py +++ b/tasks.py @@ -333,8 +333,26 @@ def execute_dag(self, dag_json: str, run_id: str = None) -> dict: if result.output_path and result.output_path.exists(): output_hash = file_hash(result.output_path) - # Store in cache_manager for proper tracking - cached = cache_manager.put(result.output_path, node_type="dag_output") + # Store in cache_manager for proper tracking (returns tuple) + cached, ipfs_cid = cache_manager.put(result.output_path, node_type="dag_output") + + # Store in database (for L2 to query IPFS CID) + import asyncio + import database + + async def save_to_db(): + if database.pool is None: + await database.init_db() + await database.create_cache_item(output_hash, ipfs_cid) + + try: + loop = asyncio.get_event_loop() + if loop.is_running(): + asyncio.ensure_future(save_to_db()) + else: + loop.run_until_complete(save_to_db()) + except RuntimeError: + asyncio.run(save_to_db()) # Record activity for deletion tracking input_hashes = []