From 3e3df6ff2ac9afa3a1c3e158077d9848efb2d344 Mon Sep 17 00:00:00 2001 From: gilesb Date: Mon, 12 Jan 2026 22:38:50 +0000 Subject: [PATCH] Code-addressed node IDs and remove JSON index files - Compiler now generates SHA3-256 hashes for node IDs - Each hash includes type, config, and input hashes (Merkle tree) - Same plan = same hashes = automatic cache reuse Cache changes: - Remove index.json - filesystem IS the index - Files at {cache_dir}/{hash}/output.* are source of truth - Per-node metadata.json for optional stats (not an index) Co-Authored-By: Claude Opus 4.5 --- app/services/run_service.py | 7 +- cache_manager.py | 126 ++++-------------------------------- 2 files changed, 17 insertions(+), 116 deletions(-) diff --git a/app/services/run_service.py b/app/services/run_service.py index 9193d4f..e73c673 100644 --- a/app/services/run_service.py +++ b/app/services/run_service.py @@ -522,10 +522,11 @@ class RunService: if not run: return None - plan_cache_id = run.get("plan_cache_id") - if plan_cache_id: + # Check plan_cid (stored in database) or plan_cache_id (legacy) + plan_cid = run.get("plan_cid") or run.get("plan_cache_id") + if plan_cid: # Get plan from cache by content hash - plan_path = self.cache.get_by_cid(plan_cache_id) + plan_path = self.cache.get_by_cid(plan_cid) if plan_path and plan_path.exists(): with open(plan_path) as f: content = f.read() diff --git a/cache_manager.py b/cache_manager.py index 5b82c0a..e62180b 100644 --- a/cache_manager.py +++ b/cache_manager.py @@ -162,79 +162,27 @@ class L1CacheManager: is_shared_fn=self._is_shared_by_node_id, ) - # Content hash index: cid -> node_id - # Uses Redis if available, falls back to in-memory dict - self._content_index: Dict[str, str] = {} - self._load_content_index() - - # IPFS CID index: cid -> ipfs_cid - self._ipfs_cids: Dict[str, str] = {} - self._load_ipfs_index() - # Legacy files directory (for files uploaded directly by cid) self.legacy_dir = self.cache_dir / "legacy" self.legacy_dir.mkdir(parents=True, exist_ok=True) - def _index_path(self) -> Path: - return self.cache_dir / "content_index.json" - - def _load_content_index(self): - """Load cid -> node_id index from Redis or JSON file.""" - # If Redis available and has data, use it - if self._redis: - try: - redis_data = self._redis.hgetall(self._redis_content_key) - if redis_data: - self._content_index = { - k.decode() if isinstance(k, bytes) else k: - v.decode() if isinstance(v, bytes) else v - for k, v in redis_data.items() - } - logger.info(f"Loaded {len(self._content_index)} content index entries from Redis") - return - except Exception as e: - logger.warning(f"Failed to load content index from Redis: {e}") - - # Fall back to JSON file - if self._index_path().exists(): - try: - with open(self._index_path()) as f: - self._content_index = json.load(f) - except (json.JSONDecodeError, IOError) as e: - logger.warning(f"Failed to load content index: {e}") - self._content_index = {} - - # Also index from existing cache entries - for entry in self.cache.list_entries(): - if entry.cid: - self._content_index[entry.cid] = entry.node_id - - # Migrate to Redis if available - if self._redis and self._content_index: - try: - self._redis.hset(self._redis_content_key, mapping=self._content_index) - logger.info(f"Migrated {len(self._content_index)} content index entries to Redis") - except Exception as e: - logger.warning(f"Failed to migrate content index to Redis: {e}") - - def _save_content_index(self): - """Save cid -> node_id index to Redis and JSON file.""" - # Always save to JSON as backup - with open(self._index_path(), "w") as f: - json.dump(self._content_index, f, indent=2) + # ============ Redis Index (no JSON files) ============ + # + # Content index maps: CID (content hash or IPFS CID) -> node_id (code hash) + # IPFS index maps: node_id -> IPFS CID + # + # Redis is the only shared state. Filesystem is source of truth. def _set_content_index(self, cid: str, node_id: str): - """Set a single content index entry (Redis + in-memory).""" - self._content_index[cid] = node_id + """Set content index entry in Redis.""" if self._redis: try: self._redis.hset(self._redis_content_key, cid, node_id) except Exception as e: logger.warning(f"Failed to set content index in Redis: {e}") - self._save_content_index() def _get_content_index(self, cid: str) -> Optional[str]: - """Get a content index entry (Redis-first, then in-memory).""" + """Get content index entry from Redis.""" if self._redis: try: val = self._redis.hget(self._redis_content_key, cid) @@ -242,73 +190,26 @@ class L1CacheManager: return val.decode() if isinstance(val, bytes) else val except Exception as e: logger.warning(f"Failed to get content index from Redis: {e}") - return self._content_index.get(cid) + return None def _del_content_index(self, cid: str): - """Delete a content index entry.""" - if cid in self._content_index: - del self._content_index[cid] + """Delete content index entry from Redis.""" if self._redis: try: self._redis.hdel(self._redis_content_key, cid) except Exception as e: logger.warning(f"Failed to delete content index from Redis: {e}") - self._save_content_index() - - def _ipfs_index_path(self) -> Path: - return self.cache_dir / "ipfs_index.json" - - def _load_ipfs_index(self): - """Load cid -> ipfs_cid index from Redis or JSON file.""" - # If Redis available and has data, use it - if self._redis: - try: - redis_data = self._redis.hgetall(self._redis_ipfs_key) - if redis_data: - self._ipfs_cids = { - k.decode() if isinstance(k, bytes) else k: - v.decode() if isinstance(v, bytes) else v - for k, v in redis_data.items() - } - logger.info(f"Loaded {len(self._ipfs_cids)} IPFS index entries from Redis") - return - except Exception as e: - logger.warning(f"Failed to load IPFS index from Redis: {e}") - - # Fall back to JSON file - if self._ipfs_index_path().exists(): - try: - with open(self._ipfs_index_path()) as f: - self._ipfs_cids = json.load(f) - except (json.JSONDecodeError, IOError) as e: - logger.warning(f"Failed to load IPFS index: {e}") - self._ipfs_cids = {} - - # Migrate to Redis if available - if self._redis and self._ipfs_cids: - try: - self._redis.hset(self._redis_ipfs_key, mapping=self._ipfs_cids) - logger.info(f"Migrated {len(self._ipfs_cids)} IPFS index entries to Redis") - except Exception as e: - logger.warning(f"Failed to migrate IPFS index to Redis: {e}") - - def _save_ipfs_index(self): - """Save cid -> ipfs_cid index to JSON file (backup).""" - with open(self._ipfs_index_path(), "w") as f: - json.dump(self._ipfs_cids, f, indent=2) def _set_ipfs_index(self, cid: str, ipfs_cid: str): - """Set a single IPFS index entry (Redis + in-memory).""" - self._ipfs_cids[cid] = ipfs_cid + """Set IPFS index entry in Redis.""" if self._redis: try: self._redis.hset(self._redis_ipfs_key, cid, ipfs_cid) except Exception as e: logger.warning(f"Failed to set IPFS index in Redis: {e}") - self._save_ipfs_index() def _get_ipfs_cid_from_index(self, cid: str) -> Optional[str]: - """Get IPFS CID from index (Redis-first, then in-memory).""" + """Get IPFS CID from Redis.""" if self._redis: try: val = self._redis.hget(self._redis_ipfs_key, cid) @@ -316,7 +217,7 @@ class L1CacheManager: return val.decode() if isinstance(val, bytes) else val except Exception as e: logger.warning(f"Failed to get IPFS CID from Redis: {e}") - return self._ipfs_cids.get(cid) + return None def get_ipfs_cid(self, cid: str) -> Optional[str]: """Get IPFS CID for a content hash.""" @@ -517,7 +418,6 @@ class L1CacheManager: if ipfs_client.get_file(ipfs_cid, str(recovery_path)): logger.info(f"get_by_cid: Fetched from IPFS: {recovery_path}") return recovery_path - return recovery_path return None