diff --git a/tasks.py b/tasks.py index 382b4a1..a7877cc 100644 --- a/tasks.py +++ b/tasks.py @@ -333,8 +333,26 @@ def execute_dag(self, dag_json: str, run_id: str = None) -> dict: if result.output_path and result.output_path.exists(): output_hash = file_hash(result.output_path) - # Store in cache_manager for proper tracking - cached = cache_manager.put(result.output_path, node_type="dag_output") + # Store in cache_manager for proper tracking (returns tuple) + cached, ipfs_cid = cache_manager.put(result.output_path, node_type="dag_output") + + # Store in database (for L2 to query IPFS CID) + import asyncio + import database + + async def save_to_db(): + if database.pool is None: + await database.init_db() + await database.create_cache_item(output_hash, ipfs_cid) + + try: + loop = asyncio.get_event_loop() + if loop.is_running(): + asyncio.ensure_future(save_to_db()) + else: + loop.run_until_complete(save_to_db()) + except RuntimeError: + asyncio.run(save_to_db()) # Record activity for deletion tracking input_hashes = []