diff --git a/cache_manager.py b/cache_manager.py index 7f32fe5..2514770 100644 --- a/cache_manager.py +++ b/cache_manager.py @@ -171,32 +171,103 @@ class L1CacheManager: # Content index maps: CID (content hash or IPFS CID) -> node_id (code hash) # IPFS index maps: node_id -> IPFS CID # - # Redis is the only shared state. Filesystem is source of truth. + # Database is source of truth for cache_id -> ipfs_cid mapping. + # Redis is used as fast cache on top. - def _set_content_index(self, cid: str, node_id: str): - """Set content index entry in Redis.""" + def _set_content_index(self, cache_id: str, ipfs_cid: str): + """Set content index entry in database (cache_id -> ipfs_cid).""" + import asyncio + import database + + # Save to database (persistent) + async def save_to_db(): + if database.pool is None: + await database.init_db() + await database.create_cache_item(cache_id, ipfs_cid) + + try: + loop = asyncio.get_event_loop() + if loop.is_running(): + asyncio.ensure_future(save_to_db()) + else: + loop.run_until_complete(save_to_db()) + except RuntimeError: + asyncio.run(save_to_db()) + + # Also cache in Redis for fast lookup if self._redis: try: - self._redis.hset(self._redis_content_key, cid, node_id) + self._redis.hset(self._redis_content_key, cache_id, ipfs_cid) except Exception as e: logger.warning(f"Failed to set content index in Redis: {e}") - def _get_content_index(self, cid: str) -> Optional[str]: - """Get content index entry from Redis.""" + def _get_content_index(self, cache_id: str) -> Optional[str]: + """Get content index entry (cache_id -> ipfs_cid).""" + import asyncio + import database + + # Check Redis first (fast cache) if self._redis: try: - val = self._redis.hget(self._redis_content_key, cid) + val = self._redis.hget(self._redis_content_key, cache_id) if val: return val.decode() if isinstance(val, bytes) else val except Exception as e: logger.warning(f"Failed to get content index from Redis: {e}") + + # Fall back to database (persistent) + async def get_from_db(): + if database.pool is None: + await database.init_db() + return await database.get_cache_item(cache_id) + + try: + loop = asyncio.get_event_loop() + if loop.is_running(): + # Can't await in sync context with running loop + # Return None and let caller handle + return None + else: + result = loop.run_until_complete(get_from_db()) + except RuntimeError: + result = asyncio.run(get_from_db()) + + if result and result.get("ipfs_cid"): + ipfs_cid = result["ipfs_cid"] + # Populate Redis cache + if self._redis: + try: + self._redis.hset(self._redis_content_key, cache_id, ipfs_cid) + except: + pass + return ipfs_cid + return None - def _del_content_index(self, cid: str): - """Delete content index entry from Redis.""" + def _del_content_index(self, cache_id: str): + """Delete content index entry.""" + import asyncio + import database + + # Delete from database + async def delete_from_db(): + if database.pool is None: + await database.init_db() + await database.delete_cache_item(cache_id) + + try: + loop = asyncio.get_event_loop() + if loop.is_running(): + asyncio.ensure_future(delete_from_db()) + else: + loop.run_until_complete(delete_from_db()) + except RuntimeError: + asyncio.run(delete_from_db()) + + # Also delete from Redis if self._redis: try: - self._redis.hdel(self._redis_content_key, cid) + self._redis.hdel(self._redis_content_key, cache_id) except Exception as e: logger.warning(f"Failed to delete content index from Redis: {e}")