Remove Redis fallbacks - database only, no silent failures
- Database is the ONLY source of truth for the cache_id -> ipfs_cid mapping. - Removed the Redis caching layer entirely. - Failures now raise exceptions instead of logging a warning and continuing. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
@@ -171,105 +171,61 @@ class L1CacheManager:
|
|||||||
# Content index maps: CID (content hash or IPFS CID) -> node_id (code hash)
|
# Content index maps: CID (content hash or IPFS CID) -> node_id (code hash)
|
||||||
# IPFS index maps: node_id -> IPFS CID
|
# IPFS index maps: node_id -> IPFS CID
|
||||||
#
|
#
|
||||||
# Database is source of truth for cache_id -> ipfs_cid mapping.
|
# Database is the ONLY source of truth for cache_id -> ipfs_cid mapping.
|
||||||
# Redis is used as fast cache on top.
|
# No fallbacks - failures raise exceptions.
|
||||||
|
|
||||||
|
def _run_async(self, coro):
|
||||||
|
"""Run async coroutine from sync context."""
|
||||||
|
import asyncio
|
||||||
|
try:
|
||||||
|
loop = asyncio.get_event_loop()
|
||||||
|
if loop.is_running():
|
||||||
|
# Create new loop in thread
|
||||||
|
import concurrent.futures
|
||||||
|
with concurrent.futures.ThreadPoolExecutor() as executor:
|
||||||
|
future = executor.submit(asyncio.run, coro)
|
||||||
|
return future.result(timeout=30)
|
||||||
|
else:
|
||||||
|
return loop.run_until_complete(coro)
|
||||||
|
except RuntimeError:
|
||||||
|
return asyncio.run(coro)
|
||||||
|
|
||||||
def _set_content_index(self, cache_id: str, ipfs_cid: str):
    """Set a content index entry in the database (cache_id -> ipfs_cid).

    The database is the ONLY source of truth for this mapping; any
    failure propagates as an exception — there is no fallback store.

    Args:
        cache_id: Content hash identifying the cache entry.
        ipfs_cid: IPFS CID the entry maps to.
    """
    # Local import — presumably avoids a circular import at module
    # load time; TODO confirm against the rest of the module.
    import database

    async def save_to_db():
        # Lazily initialize the connection pool on first use.
        if database.pool is None:
            await database.init_db()
        await database.create_cache_item(cache_id, ipfs_cid)

    self._run_async(save_to_db())
    # Lazy %-args so the message is only formatted when INFO is enabled.
    logger.info("Indexed in database: %s... -> %s", cache_id[:16], ipfs_cid)
||||||
def _get_content_index(self, cache_id: str) -> Optional[str]:
    """Get a content index entry (cache_id -> ipfs_cid) from the database.

    The database is the ONLY source of truth; any database failure
    propagates as an exception rather than being swallowed.

    Args:
        cache_id: Content hash identifying the cache entry.

    Returns:
        The IPFS CID for ``cache_id``, or ``None`` when no row exists
        or the row carries no ``ipfs_cid`` value.
    """
    # Local import — presumably avoids a circular import at module
    # load time; TODO confirm against the rest of the module.
    import database

    async def get_from_db():
        # Lazily initialize the connection pool on first use.
        if database.pool is None:
            await database.init_db()
        return await database.get_cache_item(cache_id)

    result = self._run_async(get_from_db())

    # get_cache_item appears to return a mapping with an "ipfs_cid"
    # key (or a falsy value when absent) — TODO confirm its contract.
    if result and result.get("ipfs_cid"):
        return result["ipfs_cid"]
    return None
||||||
def _del_content_index(self, cache_id: str):
    """Delete a content index entry from the database.

    The database is the ONLY store for the cache_id -> ipfs_cid
    mapping; failures propagate as exceptions.

    Args:
        cache_id: Content hash identifying the cache entry to remove.
    """
    # Local import — presumably avoids a circular import at module
    # load time; TODO confirm against the rest of the module.
    import database

    async def delete_from_db():
        # Lazily initialize the connection pool on first use.
        if database.pool is None:
            await database.init_db()
        await database.delete_cache_item(cache_id)

    self._run_async(delete_from_db())
||||||
def _set_ipfs_index(self, cid: str, ipfs_cid: str):
|
def _set_ipfs_index(self, cid: str, ipfs_cid: str):
|
||||||
"""Set IPFS index entry in Redis."""
|
"""Set IPFS index entry in Redis."""
|
||||||
|
|||||||
Reference in New Issue
Block a user