Fix async event loop conflict in resolve_asset
Some checks are pending
GPU Worker CI/CD / test (push) Waiting to run
GPU Worker CI/CD / deploy (push) Blocked by required conditions

When running with --pool=solo, there may already be a running event loop.
Use thread pool to run async coroutines when a loop is already running.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
giles
2026-02-04 11:52:50 +00:00
parent 4b0f1b0bcd
commit d5f30035da

View File

@@ -61,19 +61,32 @@ def resolve_asset(ref: str, actor_id: Optional[str] = None) -> Optional[Path]:
import database import database
from database import resolve_friendly_name from database import resolve_friendly_name
try: def _run_async(coro):
# Reuse event loop for database operations """Run async coroutine, handling both running and non-running event loops."""
if _resolve_loop is None or _resolve_loop.is_closed(): global _resolve_loop, _db_initialized
_resolve_loop = asyncio.new_event_loop() try:
asyncio.set_event_loop(_resolve_loop) # Check if there's already a running loop
_db_initialized = False loop = asyncio.get_running_loop()
# Loop is running - use nest_asyncio or thread
import concurrent.futures
with concurrent.futures.ThreadPoolExecutor() as pool:
future = pool.submit(asyncio.run, coro)
return future.result(timeout=30)
except RuntimeError:
# No running loop - create one
if _resolve_loop is None or _resolve_loop.is_closed():
_resolve_loop = asyncio.new_event_loop()
asyncio.set_event_loop(_resolve_loop)
_db_initialized = False
return _resolve_loop.run_until_complete(coro)
# Initialize database pool once per loop try:
# Initialize database if needed
if not _db_initialized: if not _db_initialized:
_resolve_loop.run_until_complete(database.init_db()) _run_async(database.init_db())
_db_initialized = True _db_initialized = True
cid = _resolve_loop.run_until_complete(resolve_friendly_name(actor_id, ref)) cid = _run_async(resolve_friendly_name(actor_id, ref))
print(f"RESOLVE_ASSET: resolve_friendly_name({actor_id}, {ref}) = {cid}", file=sys.stderr) print(f"RESOLVE_ASSET: resolve_friendly_name({actor_id}, {ref}) = {cid}", file=sys.stderr)
if cid: if cid:
@@ -86,7 +99,7 @@ def resolve_asset(ref: str, actor_id: Optional[str] = None) -> Optional[Path]:
# File not in local cache - look up IPFS CID and fetch # File not in local cache - look up IPFS CID and fetch
# The cid from friendly_names is internal, need to get ipfs_cid from cache_items # The cid from friendly_names is internal, need to get ipfs_cid from cache_items
ipfs_cid = _resolve_loop.run_until_complete(database.get_ipfs_cid(cid)) ipfs_cid = _run_async(database.get_ipfs_cid(cid))
if not ipfs_cid or ipfs_cid == cid: if not ipfs_cid or ipfs_cid == cid:
# No separate IPFS CID, try using the cid directly (might be IPFS CID) # No separate IPFS CID, try using the cid directly (might be IPFS CID)
ipfs_cid = cid ipfs_cid = cid