From d5f30035dac9b079e9e2170067c0654bced7e03b Mon Sep 17 00:00:00 2001 From: giles Date: Wed, 4 Feb 2026 11:52:50 +0000 Subject: [PATCH] Fix async event loop conflict in resolve_asset When running with --pool=solo, there may already be a running event loop. Use thread pool to run async coroutines when a loop is already running. Co-Authored-By: Claude Opus 4.5 --- tasks/streaming.py | 33 +++++++++++++++++++++++---------- 1 file changed, 23 insertions(+), 10 deletions(-) diff --git a/tasks/streaming.py b/tasks/streaming.py index 3d061cb..6590603 100644 --- a/tasks/streaming.py +++ b/tasks/streaming.py @@ -61,19 +61,32 @@ def resolve_asset(ref: str, actor_id: Optional[str] = None) -> Optional[Path]: import database from database import resolve_friendly_name - try: - # Reuse event loop for database operations - if _resolve_loop is None or _resolve_loop.is_closed(): - _resolve_loop = asyncio.new_event_loop() - asyncio.set_event_loop(_resolve_loop) - _db_initialized = False + def _run_async(coro): + """Run async coroutine, handling both running and non-running event loops.""" + global _resolve_loop, _db_initialized + try: + # Check if there's already a running loop + loop = asyncio.get_running_loop() + # Loop is running - use nest_asyncio or thread + import concurrent.futures + with concurrent.futures.ThreadPoolExecutor() as pool: + future = pool.submit(asyncio.run, coro) + return future.result(timeout=30) + except RuntimeError: + # No running loop - create one + if _resolve_loop is None or _resolve_loop.is_closed(): + _resolve_loop = asyncio.new_event_loop() + asyncio.set_event_loop(_resolve_loop) + _db_initialized = False + return _resolve_loop.run_until_complete(coro) - # Initialize database pool once per loop + try: + # Initialize database if needed if not _db_initialized: - _resolve_loop.run_until_complete(database.init_db()) + _run_async(database.init_db()) _db_initialized = True - cid = _resolve_loop.run_until_complete(resolve_friendly_name(actor_id, ref)) + cid = _run_async(resolve_friendly_name(actor_id, ref)) print(f"RESOLVE_ASSET: resolve_friendly_name({actor_id}, {ref}) = {cid}", file=sys.stderr) if cid: @@ -86,7 +99,7 @@ def resolve_asset(ref: str, actor_id: Optional[str] = None) -> Optional[Path]: # File not in local cache - look up IPFS CID and fetch # The cid from friendly_names is internal, need to get ipfs_cid from cache_items - ipfs_cid = _resolve_loop.run_until_complete(database.get_ipfs_cid(cid)) + ipfs_cid = _run_async(database.get_ipfs_cid(cid)) if not ipfs_cid or ipfs_cid == cid: # No separate IPFS CID, try using the cid directly (might be IPFS CID) ipfs_cid = cid