Use database for cache_id -> ipfs_cid mapping

- Database (cache_items table) is now source of truth
- Redis used as fast cache on top
- Mapping persists across restarts

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
gilesb
2026-01-13 04:22:28 +00:00
parent d7d7cd28c2
commit 529c173722

View File

@@ -171,32 +171,103 @@ class L1CacheManager:
# Content index maps: CID (content hash or IPFS CID) -> node_id (code hash)
# IPFS index maps: node_id -> IPFS CID
#
# Redis is the only shared state. Filesystem is source of truth.
# Database is source of truth for cache_id -> ipfs_cid mapping.
# Redis is used as fast cache on top.
def _set_content_index(self, cid: str, node_id: str):
"""Set content index entry in Redis."""
def _set_content_index(self, cache_id: str, ipfs_cid: str):
"""Set content index entry in database (cache_id -> ipfs_cid)."""
import asyncio
import database
# Save to database (persistent)
async def save_to_db():
if database.pool is None:
await database.init_db()
await database.create_cache_item(cache_id, ipfs_cid)
try:
loop = asyncio.get_event_loop()
if loop.is_running():
asyncio.ensure_future(save_to_db())
else:
loop.run_until_complete(save_to_db())
except RuntimeError:
asyncio.run(save_to_db())
# Also cache in Redis for fast lookup
if self._redis:
try:
self._redis.hset(self._redis_content_key, cid, node_id)
self._redis.hset(self._redis_content_key, cache_id, ipfs_cid)
except Exception as e:
logger.warning(f"Failed to set content index in Redis: {e}")
def _get_content_index(self, cid: str) -> Optional[str]:
"""Get content index entry from Redis."""
def _get_content_index(self, cache_id: str) -> Optional[str]:
"""Get content index entry (cache_id -> ipfs_cid)."""
import asyncio
import database
# Check Redis first (fast cache)
if self._redis:
try:
val = self._redis.hget(self._redis_content_key, cid)
val = self._redis.hget(self._redis_content_key, cache_id)
if val:
return val.decode() if isinstance(val, bytes) else val
except Exception as e:
logger.warning(f"Failed to get content index from Redis: {e}")
# Fall back to database (persistent)
async def get_from_db():
if database.pool is None:
await database.init_db()
return await database.get_cache_item(cache_id)
try:
loop = asyncio.get_event_loop()
if loop.is_running():
# Can't await in sync context with running loop
# Return None and let caller handle
return None
else:
result = loop.run_until_complete(get_from_db())
except RuntimeError:
result = asyncio.run(get_from_db())
if result and result.get("ipfs_cid"):
ipfs_cid = result["ipfs_cid"]
# Populate Redis cache
if self._redis:
try:
self._redis.hset(self._redis_content_key, cache_id, ipfs_cid)
except:
pass
return ipfs_cid
return None
def _del_content_index(self, cid: str):
"""Delete content index entry from Redis."""
def _del_content_index(self, cache_id: str):
"""Delete content index entry."""
import asyncio
import database
# Delete from database
async def delete_from_db():
if database.pool is None:
await database.init_db()
await database.delete_cache_item(cache_id)
try:
loop = asyncio.get_event_loop()
if loop.is_running():
asyncio.ensure_future(delete_from_db())
else:
loop.run_until_complete(delete_from_db())
except RuntimeError:
asyncio.run(delete_from_db())
# Also delete from Redis
if self._redis:
try:
self._redis.hdel(self._redis_content_key, cid)
self._redis.hdel(self._redis_content_key, cache_id)
except Exception as e:
logger.warning(f"Failed to delete content index from Redis: {e}")