diff --git a/legacy_tasks.py b/legacy_tasks.py index 02b6e90..fa0ac36 100644 --- a/legacy_tasks.py +++ b/legacy_tasks.py @@ -354,15 +354,20 @@ def execute_dag(self, dag_json: str, run_id: str = None) -> dict: cache_node_type = "dag_intermediate" # Store in cache_manager (indexes by cid, uploads to IPFS) - cached, ipfs_cid = cache_manager.put( + # put() returns (CachedFile, cid) where cid is IPFS CID if available, else local hash + cached, content_cid = cache_manager.put( Path(node_path), node_type=cache_node_type, node_id=node_id, ) - node_hashes[node_id] = ipfs_cid or cached.cid # Prefer IPFS CID - if ipfs_cid: - node_ipfs_cids[node_id] = ipfs_cid - logger.info(f"Cached node {node_id}: {cached.cid[:16]}... -> {ipfs_cid or 'no IPFS'}") + # content_cid is the primary identifier (IPFS CID or local hash) + node_hashes[node_id] = content_cid + # Track IPFS CIDs separately (they start with Qm or bafy) + if content_cid and (content_cid.startswith("Qm") or content_cid.startswith("bafy")): + node_ipfs_cids[node_id] = content_cid + logger.info(f"Cached node {node_id}: IPFS CID {content_cid}") + else: + logger.info(f"Cached node {node_id}: local hash {content_cid[:16] if content_cid else 'none'}...") # Get output hash from the output node # Use the same identifier that's in the cache index (IPFS CID if available)