From d7d7cd28c2b94cddfcf58c26c12c81ba18e1bd2a Mon Sep 17 00:00:00 2001 From: gilesb Date: Tue, 13 Jan 2026 04:20:34 +0000 Subject: [PATCH] Store cache items by IPFS CID, index by cache_id - Files in /data/cache/nodes/ are now stored by IPFS CID only - cache_id parameter creates index from cache_id -> IPFS CID - Removed deprecated node_id parameter behavior - get_by_cid(cache_id) still works via index lookup Co-Authored-By: Claude Opus 4.5 --- cache_manager.py | 33 ++++++++++++++++----------------- legacy_tasks.py | 29 ++++++++++++----------------- 2 files changed, 28 insertions(+), 34 deletions(-) diff --git a/cache_manager.py b/cache_manager.py index f825769..7f32fe5 100644 --- a/cache_manager.py +++ b/cache_manager.py @@ -267,16 +267,21 @@ class L1CacheManager: source_path: Path, node_type: str = "upload", node_id: str = None, + cache_id: str = None, execution_time: float = 0.0, move: bool = False, ) -> tuple[CachedFile, Optional[str]]: """ Store a file in the cache and upload to IPFS. + Files are ALWAYS stored by IPFS CID. The cache_id parameter creates + an index from cache_id -> IPFS CID for code-addressed lookups. + Args: source_path: Path to file to cache node_type: Type of node (e.g., "upload", "source", "effect") - node_id: Optional node_id; if not provided, uses CID + node_id: DEPRECATED - ignored, always uses IPFS CID + cache_id: Optional code-addressed cache ID to index execution_time: How long the operation took move: If True, move instead of copy @@ -288,9 +293,8 @@ class L1CacheManager: if not cid: raise RuntimeError(f"IPFS upload failed for {source_path}. IPFS is required.") - # Use CID as node_id if not provided - if node_id is None: - node_id = cid + # Always store by IPFS CID (node_id parameter is deprecated) + node_id = cid # Check if already cached (by node_id) existing = self.cache.get_entry(node_id) @@ -319,21 +323,16 @@ class L1CacheManager: verify_path = self.cache.get(node_id) logger.info(f"put: Verify cache.get(node_id={node_id[:16]}...) = {verify_path}") - # Update content index (CID -> node_id mapping) - self._set_content_index(cid, node_id) - logger.info(f"put: Set content index {cid[:16]}... -> {node_id[:16]}...") + # Index by cache_id if provided (code-addressed cache lookup) + # This allows get_by_cid(cache_id) to find files stored by IPFS CID + if cache_id and cache_id != cid: + self._set_content_index(cache_id, cid) + logger.info(f"put: Indexed cache_id {cache_id[:16]}... -> IPFS {cid}") - # Also index by node_id itself (for code-addressed cache lookups) - # This allows get_by_cid(cache_id) to work when cache_id != IPFS CID - if node_id != cid: - self._set_content_index(node_id, node_id) - logger.debug(f"Self-indexed: {node_id[:16]}... -> {node_id[:16]}...") - - # Also index by local hash if cid is an IPFS CID - # This ensures both IPFS CID and local hash can be used to find the file + # Also index by local hash for content-based lookup if local_hash and local_hash != cid: - self._set_content_index(local_hash, node_id) - logger.debug(f"Dual-indexed: {local_hash[:16]}... -> {node_id}") + self._set_content_index(local_hash, cid) + logger.debug(f"Indexed local hash {local_hash[:16]}... -> IPFS {cid}") logger.info(f"Cached: {cid[:16]}...") diff --git a/legacy_tasks.py b/legacy_tasks.py index 874f282..afbd9db 100644 --- a/legacy_tasks.py +++ b/legacy_tasks.py @@ -346,21 +346,16 @@ def execute_dag(self, dag_json: str, run_id: str = None) -> dict: else: cache_node_type = "dag_intermediate" - # Store in cache_manager (indexes by cid, uploads to IPFS) - # put() returns (CachedFile, cid) where cid is IPFS CID if available, else local hash + # Store in cache_manager (stored by IPFS CID, indexed by node_id) cached, content_cid = cache_manager.put( Path(node_path), node_type=cache_node_type, - node_id=node_id, + cache_id=node_id, ) - # content_cid is the primary identifier (IPFS CID or local hash) + # content_cid is always IPFS CID now (IPFS failures are fatal) node_hashes[node_id] = content_cid - # Track IPFS CIDs separately (they start with Qm or bafy) - if content_cid and (content_cid.startswith("Qm") or content_cid.startswith("bafy")): - node_ipfs_cids[node_id] = content_cid - logger.info(f"Cached node {node_id}: IPFS CID {content_cid}") - else: - logger.info(f"Cached node {node_id}: local hash {content_cid[:16] if content_cid else 'none'}...") + node_ipfs_cids[node_id] = content_cid + logger.info(f"Cached node {node_id}: IPFS CID {content_cid}") # Get output hash from the output node # Use the same identifier that's in the cache index (IPFS CID if available) @@ -840,11 +835,11 @@ def execute_recipe(self, recipe_sexp: str, input_hashes: Dict[str, str], run_id: import shutil shutil.copy2(current_input, final_output) - # Upload to IPFS + # Upload to IPFS (stored by IPFS CID, indexed by cache_id) cached, content_cid = cache_manager.put( final_output, node_type="COMPOUND", - node_id=step.cache_id, + cache_id=step.cache_id, ) # Cleanup temp files @@ -910,11 +905,11 @@ def execute_recipe(self, recipe_sexp: str, input_hashes: Dict[str, str], run_id: if result.returncode != 0: raise RuntimeError(f"FFmpeg concat failed: {result.stderr}") - # Upload to IPFS + # Upload to IPFS (stored by IPFS CID, indexed by cache_id) cached, content_cid = cache_manager.put( final_output, node_type="SEQUENCE", - node_id=step.cache_id, + cache_id=step.cache_id, ) # Cleanup @@ -963,7 +958,7 @@ def execute_recipe(self, recipe_sexp: str, input_hashes: Dict[str, str], run_id: cached, content_cid = cache_manager.put( result_path, node_type="EFFECT", - node_id=step.cache_id, + cache_id=step.cache_id, ) step_results[step.step_id] = { @@ -995,11 +990,11 @@ def execute_recipe(self, recipe_sexp: str, input_hashes: Dict[str, str], run_id: logger.info(f"Executing step {step.step_id} ({step.node_type}) with {len(input_paths)} inputs") result_path = executor.execute(step.config, input_paths, output_path) - # Store result in cache under code-addressed cache_id + # Store result in cache (by IPFS CID, indexed by cache_id) cached, content_cid = cache_manager.put( result_path, node_type=step.node_type, - node_id=step.cache_id, # Use cache_id as node_id + cache_id=step.cache_id, ) step_results[step.step_id] = {