diff --git a/cache_manager.py b/cache_manager.py index 2514770..882d3af 100644 --- a/cache_manager.py +++ b/cache_manager.py @@ -171,105 +171,61 @@ class L1CacheManager: # Content index maps: CID (content hash or IPFS CID) -> node_id (code hash) # IPFS index maps: node_id -> IPFS CID # - # Database is source of truth for cache_id -> ipfs_cid mapping. - # Redis is used as fast cache on top. + # Database is the ONLY source of truth for cache_id -> ipfs_cid mapping. + # No fallbacks - failures raise exceptions. + + def _run_async(self, coro): + """Run async coroutine from sync context.""" + import asyncio + try: + loop = asyncio.get_event_loop() + if loop.is_running(): + # Create new loop in thread + import concurrent.futures + with concurrent.futures.ThreadPoolExecutor() as executor: + future = executor.submit(asyncio.run, coro) + return future.result(timeout=30) + else: + return loop.run_until_complete(coro) + except RuntimeError: + return asyncio.run(coro) def _set_content_index(self, cache_id: str, ipfs_cid: str): """Set content index entry in database (cache_id -> ipfs_cid).""" - import asyncio import database - # Save to database (persistent) async def save_to_db(): if database.pool is None: await database.init_db() await database.create_cache_item(cache_id, ipfs_cid) - try: - loop = asyncio.get_event_loop() - if loop.is_running(): - asyncio.ensure_future(save_to_db()) - else: - loop.run_until_complete(save_to_db()) - except RuntimeError: - asyncio.run(save_to_db()) - - # Also cache in Redis for fast lookup - if self._redis: - try: - self._redis.hset(self._redis_content_key, cache_id, ipfs_cid) - except Exception as e: - logger.warning(f"Failed to set content index in Redis: {e}") + self._run_async(save_to_db()) + logger.info(f"Indexed in database: {cache_id[:16]}... -> {ipfs_cid}") def _get_content_index(self, cache_id: str) -> Optional[str]: - """Get content index entry (cache_id -> ipfs_cid).""" - import asyncio + """Get content index entry (cache_id -> ipfs_cid) from database.""" import database - # Check Redis first (fast cache) - if self._redis: - try: - val = self._redis.hget(self._redis_content_key, cache_id) - if val: - return val.decode() if isinstance(val, bytes) else val - except Exception as e: - logger.warning(f"Failed to get content index from Redis: {e}") - - # Fall back to database (persistent) async def get_from_db(): if database.pool is None: await database.init_db() return await database.get_cache_item(cache_id) - try: - loop = asyncio.get_event_loop() - if loop.is_running(): - # Can't await in sync context with running loop - # Return None and let caller handle - return None - else: - result = loop.run_until_complete(get_from_db()) - except RuntimeError: - result = asyncio.run(get_from_db()) - + result = self._run_async(get_from_db()) if result and result.get("ipfs_cid"): - ipfs_cid = result["ipfs_cid"] - # Populate Redis cache - if self._redis: - try: - self._redis.hset(self._redis_content_key, cache_id, ipfs_cid) - except: - pass - return ipfs_cid - + return result["ipfs_cid"] return None def _del_content_index(self, cache_id: str): - """Delete content index entry.""" - import asyncio + """Delete content index entry from database.""" import database - # Delete from database async def delete_from_db(): if database.pool is None: await database.init_db() await database.delete_cache_item(cache_id) - try: - loop = asyncio.get_event_loop() - if loop.is_running(): - asyncio.ensure_future(delete_from_db()) - else: - loop.run_until_complete(delete_from_db()) - except RuntimeError: - asyncio.run(delete_from_db()) - - # Also delete from Redis - if self._redis: - try: - self._redis.hdel(self._redis_content_key, cache_id) - except Exception as e: - logger.warning(f"Failed to delete content index from Redis: {e}") + self._run_async(delete_from_db()) def _set_ipfs_index(self, cid: str, ipfs_cid: str): """Set IPFS index entry in Redis."""