Store IPFS CID in database for DAG outputs
Properly unpack cache_manager.put() tuple to get IPFS CID and store it in PostgreSQL via database.create_cache_item(). This fixes the "Output has no IPFS CID - cannot publish" error when publishing from L1 to L2. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
22
tasks.py
22
tasks.py
@@ -333,8 +333,26 @@ def execute_dag(self, dag_json: str, run_id: str = None) -> dict:
|
||||
if result.output_path and result.output_path.exists():
|
||||
output_hash = file_hash(result.output_path)
|
||||
|
||||
# Store in cache_manager for proper tracking
|
||||
cached = cache_manager.put(result.output_path, node_type="dag_output")
|
||||
# Store in cache_manager for proper tracking (returns tuple)
|
||||
cached, ipfs_cid = cache_manager.put(result.output_path, node_type="dag_output")
|
||||
|
||||
# Store in database (for L2 to query IPFS CID)
|
||||
import asyncio
|
||||
import database
|
||||
|
||||
async def save_to_db():
|
||||
if database.pool is None:
|
||||
await database.init_db()
|
||||
await database.create_cache_item(output_hash, ipfs_cid)
|
||||
|
||||
try:
|
||||
loop = asyncio.get_event_loop()
|
||||
if loop.is_running():
|
||||
asyncio.ensure_future(save_to_db())
|
||||
else:
|
||||
loop.run_until_complete(save_to_db())
|
||||
except RuntimeError:
|
||||
asyncio.run(save_to_db())
|
||||
|
||||
# Record activity for deletion tracking
|
||||
input_hashes = []
|
||||
|
||||
Reference in New Issue
Block a user