From 4f3eccd4d34882bb39271487b487da889f7d8deb Mon Sep 17 00:00:00 2001 From: gilesb Date: Tue, 13 Jan 2026 04:31:04 +0000 Subject: [PATCH] Fix async database calls from sync context - Use dedicated thread with new event loop for database operations - Create new database connection per operation to avoid pool conflicts - Handles both async and sync calling contexts correctly Co-Authored-By: Claude Opus 4.5 --- cache_manager.py | 101 ++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 82 insertions(+), 19 deletions(-) diff --git a/cache_manager.py b/cache_manager.py index 882d3af..1beb437 100644 --- a/cache_manager.py +++ b/cache_manager.py @@ -177,27 +177,68 @@ class L1CacheManager: def _run_async(self, coro): """Run async coroutine from sync context.""" import asyncio + try: - loop = asyncio.get_event_loop() - if loop.is_running(): - # Create new loop in thread - import concurrent.futures - with concurrent.futures.ThreadPoolExecutor() as executor: - future = executor.submit(asyncio.run, coro) - return future.result(timeout=30) - else: - return loop.run_until_complete(coro) + loop = asyncio.get_running_loop() + # Already in async context - schedule on the running loop + future = asyncio.ensure_future(coro, loop=loop) + # Can't block here, so we need a different approach + # Use a new thread with its own loop + import threading + result = [None] + error = [None] + + def run_in_thread(): + try: + new_loop = asyncio.new_event_loop() + asyncio.set_event_loop(new_loop) + try: + result[0] = new_loop.run_until_complete(coro) + finally: + new_loop.close() + except Exception as e: + error[0] = e + + thread = threading.Thread(target=run_in_thread) + thread.start() + thread.join(timeout=30) + if error[0]: + raise error[0] + return result[0] except RuntimeError: - return asyncio.run(coro) + # No running loop - safe to use run_until_complete + try: + loop = asyncio.get_event_loop() + except RuntimeError: + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + return loop.run_until_complete(coro) def _set_content_index(self, cache_id: str, ipfs_cid: str): """Set content index entry in database (cache_id -> ipfs_cid).""" import database async def save_to_db(): - if database.pool is None: - await database.init_db() - await database.create_cache_item(cache_id, ipfs_cid) + # Create new connection for this thread + import asyncpg + conn = await asyncpg.connect( + host=database.DB_HOST, + port=database.DB_PORT, + user=database.DB_USER, + password=database.DB_PASSWORD, + database=database.DB_NAME, + ) + try: + await conn.execute( + """ + INSERT INTO cache_items (cid, ipfs_cid) + VALUES ($1, $2) + ON CONFLICT (cid) DO UPDATE SET ipfs_cid = $2 + """, + cache_id, ipfs_cid + ) + finally: + await conn.close() self._run_async(save_to_db()) logger.info(f"Indexed in database: {cache_id[:16]}... -> {ipfs_cid}") @@ -207,9 +248,22 @@ class L1CacheManager: import database async def get_from_db(): - if database.pool is None: - await database.init_db() - return await database.get_cache_item(cache_id) + import asyncpg + conn = await asyncpg.connect( + host=database.DB_HOST, + port=database.DB_PORT, + user=database.DB_USER, + password=database.DB_PASSWORD, + database=database.DB_NAME, + ) + try: + row = await conn.fetchrow( + "SELECT ipfs_cid FROM cache_items WHERE cid = $1", + cache_id + ) + return {"ipfs_cid": row["ipfs_cid"]} if row else None + finally: + await conn.close() result = self._run_async(get_from_db()) if result and result.get("ipfs_cid"): @@ -221,9 +275,18 @@ class L1CacheManager: import database async def delete_from_db(): - if database.pool is None: - await database.init_db() - await database.delete_cache_item(cache_id) + import asyncpg + conn = await asyncpg.connect( + host=database.DB_HOST, + port=database.DB_PORT, + user=database.DB_USER, + password=database.DB_PASSWORD, + database=database.DB_NAME, + ) + try: + await conn.execute("DELETE FROM cache_items WHERE cid = $1", cache_id) + finally: + await conn.close() self._run_async(delete_from_db())