Store cache items by IPFS CID, index by cache_id

- Files in /data/cache/nodes/ are now stored by IPFS CID only
- cache_id parameter creates an index entry from cache_id -> IPFS CID
- node_id parameter is deprecated and ignored; files are always keyed by CID
- get_by_cid(cache_id) still works via index lookup

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
gilesb committed 2026-01-13 04:20:34 +00:00
parent c46fcd2308
commit d7d7cd28c2
2 changed files with 28 additions and 34 deletions
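The net scheme is a single alias table in front of CID-named files: /data/cache/nodes/ holds files named by IPFS CID, and the index maps a cache_id (or local hash) to that CID. The following is a minimal sketch of that resolution, assuming a dict-backed index purely for illustration; CacheIndexSketch and its fields are hypothetical stand-ins, not the real L1CacheManager:

from pathlib import Path

class CacheIndexSketch:
    def __init__(self, root: Path):
        self.root = root                  # e.g. /data/cache/nodes/ (files named by CID)
        self.index: dict[str, str] = {}   # cache_id or local hash -> IPFS CID

    def put(self, cid: str, cache_id: str | None = None) -> None:
        # Files are always stored by IPFS CID; cache_id is just an alias.
        if cache_id and cache_id != cid:
            self.index[cache_id] = cid

    def get_by_cid(self, key: str) -> Path | None:
        # Resolve an alias (cache_id / local hash) to the IPFS CID,
        # falling back to treating the key itself as a CID.
        cid = self.index.get(key, key)
        path = self.root / cid
        return path if path.exists() else None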


@@ -267,16 +267,21 @@ class L1CacheManager:
         source_path: Path,
         node_type: str = "upload",
         node_id: str = None,
+        cache_id: str = None,
         execution_time: float = 0.0,
         move: bool = False,
     ) -> tuple[CachedFile, Optional[str]]:
         """
         Store a file in the cache and upload to IPFS.
 
+        Files are ALWAYS stored by IPFS CID. The cache_id parameter creates
+        an index from cache_id -> IPFS CID for code-addressed lookups.
+
         Args:
             source_path: Path to file to cache
             node_type: Type of node (e.g., "upload", "source", "effect")
-            node_id: Optional node_id; if not provided, uses CID
+            node_id: DEPRECATED - ignored, always uses IPFS CID
+            cache_id: Optional code-addressed cache ID to index
             execution_time: How long the operation took
             move: If True, move instead of copy
@@ -288,9 +293,8 @@ class L1CacheManager:
         if not cid:
             raise RuntimeError(f"IPFS upload failed for {source_path}. IPFS is required.")
 
-        # Use CID as node_id if not provided
-        if node_id is None:
-            node_id = cid
+        # Always store by IPFS CID (node_id parameter is deprecated)
+        node_id = cid
 
         # Check if already cached (by node_id)
         existing = self.cache.get_entry(node_id)
@@ -319,21 +323,16 @@ class L1CacheManager:
         verify_path = self.cache.get(node_id)
         logger.info(f"put: Verify cache.get(node_id={node_id[:16]}...) = {verify_path}")
 
-        # Update content index (CID -> node_id mapping)
-        self._set_content_index(cid, node_id)
-        logger.info(f"put: Set content index {cid[:16]}... -> {node_id[:16]}...")
-
-        # Also index by node_id itself (for code-addressed cache lookups)
-        # This allows get_by_cid(cache_id) to work when cache_id != IPFS CID
-        if node_id != cid:
-            self._set_content_index(node_id, node_id)
-            logger.debug(f"Self-indexed: {node_id[:16]}... -> {node_id[:16]}...")
-
-        # Also index by local hash if cid is an IPFS CID
-        # This ensures both IPFS CID and local hash can be used to find the file
+        # Index by cache_id if provided (code-addressed cache lookup)
+        # This allows get_by_cid(cache_id) to find files stored by IPFS CID
+        if cache_id and cache_id != cid:
+            self._set_content_index(cache_id, cid)
+            logger.info(f"put: Indexed cache_id {cache_id[:16]}... -> IPFS {cid}")
 
+        # Also index by local hash for content-based lookup
         if local_hash and local_hash != cid:
-            self._set_content_index(local_hash, node_id)
-            logger.debug(f"Dual-indexed: {local_hash[:16]}... -> {node_id}")
+            self._set_content_index(local_hash, cid)
+            logger.debug(f"Indexed local hash {local_hash[:16]}... -> IPFS {cid}")
 
         logger.info(f"Cached: {cid[:16]}...")

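Assuming the put() signature shown in the hunks above, a caller sees the new behavior roughly as follows. The cache_id value and the get_by_cid() return handling here are illustrative placeholders, not taken from this diff:

from pathlib import Path

# Hypothetical caller, based on the put() signature above.
cached, cid = cache_manager.put(
    Path("/tmp/render.mp4"),
    node_type="effect",
    cache_id="step-abc123",  # code-addressed ID (placeholder), not the IPFS CID
)
# The file is stored under its IPFS CID; both keys now resolve to it:
assert cache_manager.get_by_cid(cid) is not None
assert cache_manager.get_by_cid("step-abc123") is not None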

@@ -346,21 +346,16 @@ def execute_dag(self, dag_json: str, run_id: str = None) -> dict:
             else:
                 cache_node_type = "dag_intermediate"
 
-            # Store in cache_manager (indexes by cid, uploads to IPFS)
-            # put() returns (CachedFile, cid) where cid is IPFS CID if available, else local hash
+            # Store in cache_manager (stored by IPFS CID, indexed by node_id)
             cached, content_cid = cache_manager.put(
                 Path(node_path),
                 node_type=cache_node_type,
-                node_id=node_id,
+                cache_id=node_id,
             )
 
-            # content_cid is the primary identifier (IPFS CID or local hash)
+            # content_cid is always IPFS CID now (IPFS failures are fatal)
             node_hashes[node_id] = content_cid
-
-            # Track IPFS CIDs separately (they start with Qm or bafy)
-            if content_cid and (content_cid.startswith("Qm") or content_cid.startswith("bafy")):
-                node_ipfs_cids[node_id] = content_cid
-                logger.info(f"Cached node {node_id}: IPFS CID {content_cid}")
-            else:
-                logger.info(f"Cached node {node_id}: local hash {content_cid[:16] if content_cid else 'none'}...")
+            node_ipfs_cids[node_id] = content_cid
+            logger.info(f"Cached node {node_id}: IPFS CID {content_cid}")
 
         # Get output hash from the output node
         # Use the same identifier that's in the cache index (IPFS CID if available)
@@ -840,11 +835,11 @@ def execute_recipe(self, recipe_sexp: str, input_hashes: Dict[str, str], run_id:
             import shutil
             shutil.copy2(current_input, final_output)
 
-            # Upload to IPFS
+            # Upload to IPFS (stored by IPFS CID, indexed by cache_id)
             cached, content_cid = cache_manager.put(
                 final_output,
                 node_type="COMPOUND",
-                node_id=step.cache_id,
+                cache_id=step.cache_id,
             )
 
             # Cleanup temp files
@@ -910,11 +905,11 @@ def execute_recipe(self, recipe_sexp: str, input_hashes: Dict[str, str], run_id:
             if result.returncode != 0:
                 raise RuntimeError(f"FFmpeg concat failed: {result.stderr}")
 
-            # Upload to IPFS
+            # Upload to IPFS (stored by IPFS CID, indexed by cache_id)
            cached, content_cid = cache_manager.put(
                 final_output,
                 node_type="SEQUENCE",
-                node_id=step.cache_id,
+                cache_id=step.cache_id,
             )
 
             # Cleanup
@@ -963,7 +958,7 @@ def execute_recipe(self, recipe_sexp: str, input_hashes: Dict[str, str], run_id:
             cached, content_cid = cache_manager.put(
                 result_path,
                 node_type="EFFECT",
-                node_id=step.cache_id,
+                cache_id=step.cache_id,
             )
 
             step_results[step.step_id] = {
@@ -995,11 +990,11 @@ def execute_recipe(self, recipe_sexp: str, input_hashes: Dict[str, str], run_id:
logger.info(f"Executing step {step.step_id} ({step.node_type}) with {len(input_paths)} inputs") logger.info(f"Executing step {step.step_id} ({step.node_type}) with {len(input_paths)} inputs")
result_path = executor.execute(step.config, input_paths, output_path) result_path = executor.execute(step.config, input_paths, output_path)
# Store result in cache under code-addressed cache_id # Store result in cache (by IPFS CID, indexed by cache_id)
cached, content_cid = cache_manager.put( cached, content_cid = cache_manager.put(
result_path, result_path,
node_type=step.node_type, node_type=step.node_type,
node_id=step.cache_id, # Use cache_id as node_id cache_id=step.cache_id,
) )
step_results[step.step_id] = { step_results[step.step_id] = {