Use code-addressed node IDs and remove JSON index files

- Compiler now generates SHA3-256 hashes for node IDs
- Each hash includes type, config, and input hashes (Merkle tree)
- Same plan = same hashes = automatic cache reuse

Cache changes:
- Remove index.json - filesystem IS the index
- Files at {cache_dir}/{hash}/output.* are source of truth
- Per-node metadata.json for optional stats (not an index)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
gilesb
2026-01-12 22:38:50 +00:00
parent faf794ef35
commit 3e3df6ff2a
2 changed files with 17 additions and 116 deletions

View File

@@ -522,10 +522,11 @@ class RunService:
if not run:
return None
plan_cache_id = run.get("plan_cache_id")
if plan_cache_id:
# Check plan_cid (stored in database) or plan_cache_id (legacy)
plan_cid = run.get("plan_cid") or run.get("plan_cache_id")
if plan_cid:
# Get plan from cache by content hash
plan_path = self.cache.get_by_cid(plan_cache_id)
plan_path = self.cache.get_by_cid(plan_cid)
if plan_path and plan_path.exists():
with open(plan_path) as f:
content = f.read()

View File

@@ -162,79 +162,27 @@ class L1CacheManager:
is_shared_fn=self._is_shared_by_node_id,
)
# Content hash index: cid -> node_id
# Uses Redis if available, falls back to in-memory dict
self._content_index: Dict[str, str] = {}
self._load_content_index()
# IPFS CID index: cid -> ipfs_cid
self._ipfs_cids: Dict[str, str] = {}
self._load_ipfs_index()
# Legacy files directory (for files uploaded directly by cid)
self.legacy_dir = self.cache_dir / "legacy"
self.legacy_dir.mkdir(parents=True, exist_ok=True)
def _index_path(self) -> Path:
return self.cache_dir / "content_index.json"
def _load_content_index(self):
"""Load cid -> node_id index from Redis or JSON file."""
# If Redis available and has data, use it
if self._redis:
try:
redis_data = self._redis.hgetall(self._redis_content_key)
if redis_data:
self._content_index = {
k.decode() if isinstance(k, bytes) else k:
v.decode() if isinstance(v, bytes) else v
for k, v in redis_data.items()
}
logger.info(f"Loaded {len(self._content_index)} content index entries from Redis")
return
except Exception as e:
logger.warning(f"Failed to load content index from Redis: {e}")
# Fall back to JSON file
if self._index_path().exists():
try:
with open(self._index_path()) as f:
self._content_index = json.load(f)
except (json.JSONDecodeError, IOError) as e:
logger.warning(f"Failed to load content index: {e}")
self._content_index = {}
# Also index from existing cache entries
for entry in self.cache.list_entries():
if entry.cid:
self._content_index[entry.cid] = entry.node_id
# Migrate to Redis if available
if self._redis and self._content_index:
try:
self._redis.hset(self._redis_content_key, mapping=self._content_index)
logger.info(f"Migrated {len(self._content_index)} content index entries to Redis")
except Exception as e:
logger.warning(f"Failed to migrate content index to Redis: {e}")
def _save_content_index(self):
"""Save cid -> node_id index to Redis and JSON file."""
# Always save to JSON as backup
with open(self._index_path(), "w") as f:
json.dump(self._content_index, f, indent=2)
# ============ Redis Index (no JSON files) ============
#
# Content index maps: CID (content hash or IPFS CID) -> node_id (code hash)
# IPFS index maps: node_id -> IPFS CID
#
# Redis is the only shared state. Filesystem is source of truth.
def _set_content_index(self, cid: str, node_id: str):
"""Set a single content index entry (Redis + in-memory)."""
self._content_index[cid] = node_id
"""Set content index entry in Redis."""
if self._redis:
try:
self._redis.hset(self._redis_content_key, cid, node_id)
except Exception as e:
logger.warning(f"Failed to set content index in Redis: {e}")
self._save_content_index()
def _get_content_index(self, cid: str) -> Optional[str]:
"""Get a content index entry (Redis-first, then in-memory)."""
"""Get content index entry from Redis."""
if self._redis:
try:
val = self._redis.hget(self._redis_content_key, cid)
@@ -242,73 +190,26 @@ class L1CacheManager:
return val.decode() if isinstance(val, bytes) else val
except Exception as e:
logger.warning(f"Failed to get content index from Redis: {e}")
return self._content_index.get(cid)
return None
def _del_content_index(self, cid: str):
"""Delete a content index entry."""
if cid in self._content_index:
del self._content_index[cid]
"""Delete content index entry from Redis."""
if self._redis:
try:
self._redis.hdel(self._redis_content_key, cid)
except Exception as e:
logger.warning(f"Failed to delete content index from Redis: {e}")
self._save_content_index()
def _ipfs_index_path(self) -> Path:
return self.cache_dir / "ipfs_index.json"
def _load_ipfs_index(self):
"""Load cid -> ipfs_cid index from Redis or JSON file."""
# If Redis available and has data, use it
if self._redis:
try:
redis_data = self._redis.hgetall(self._redis_ipfs_key)
if redis_data:
self._ipfs_cids = {
k.decode() if isinstance(k, bytes) else k:
v.decode() if isinstance(v, bytes) else v
for k, v in redis_data.items()
}
logger.info(f"Loaded {len(self._ipfs_cids)} IPFS index entries from Redis")
return
except Exception as e:
logger.warning(f"Failed to load IPFS index from Redis: {e}")
# Fall back to JSON file
if self._ipfs_index_path().exists():
try:
with open(self._ipfs_index_path()) as f:
self._ipfs_cids = json.load(f)
except (json.JSONDecodeError, IOError) as e:
logger.warning(f"Failed to load IPFS index: {e}")
self._ipfs_cids = {}
# Migrate to Redis if available
if self._redis and self._ipfs_cids:
try:
self._redis.hset(self._redis_ipfs_key, mapping=self._ipfs_cids)
logger.info(f"Migrated {len(self._ipfs_cids)} IPFS index entries to Redis")
except Exception as e:
logger.warning(f"Failed to migrate IPFS index to Redis: {e}")
def _save_ipfs_index(self):
"""Save cid -> ipfs_cid index to JSON file (backup)."""
with open(self._ipfs_index_path(), "w") as f:
json.dump(self._ipfs_cids, f, indent=2)
def _set_ipfs_index(self, cid: str, ipfs_cid: str):
"""Set a single IPFS index entry (Redis + in-memory)."""
self._ipfs_cids[cid] = ipfs_cid
"""Set IPFS index entry in Redis."""
if self._redis:
try:
self._redis.hset(self._redis_ipfs_key, cid, ipfs_cid)
except Exception as e:
logger.warning(f"Failed to set IPFS index in Redis: {e}")
self._save_ipfs_index()
def _get_ipfs_cid_from_index(self, cid: str) -> Optional[str]:
"""Get IPFS CID from index (Redis-first, then in-memory)."""
"""Get IPFS CID from Redis."""
if self._redis:
try:
val = self._redis.hget(self._redis_ipfs_key, cid)
@@ -316,7 +217,7 @@ class L1CacheManager:
return val.decode() if isinstance(val, bytes) else val
except Exception as e:
logger.warning(f"Failed to get IPFS CID from Redis: {e}")
return self._ipfs_cids.get(cid)
return None
def get_ipfs_cid(self, cid: str) -> Optional[str]:
"""Get IPFS CID for a content hash."""
@@ -517,7 +418,6 @@ class L1CacheManager:
if ipfs_client.get_file(ipfs_cid, str(recovery_path)):
logger.info(f"get_by_cid: Fetched from IPFS: {recovery_path}")
return recovery_path
return recovery_path
return None