From 6b2991bf243bcce1de46a5f8adcd6d355ffeecdb Mon Sep 17 00:00:00 2001 From: giles Date: Tue, 3 Feb 2026 00:14:42 +0000 Subject: [PATCH] Fix database event loop conflicts in streaming task --- tasks/streaming.py | 60 ++++++++++++++++++++++++++-------------------- 1 file changed, 34 insertions(+), 26 deletions(-) diff --git a/tasks/streaming.py b/tasks/streaming.py index 702405f..fa0adbc 100644 --- a/tasks/streaming.py +++ b/tasks/streaming.py @@ -329,24 +329,30 @@ def run_stream( logger.info(f"Stream output cached: CID={cached_file.cid}, IPFS={ipfs_cid}") - # Save to database + # Save to database - reuse the module-level loop to avoid pool conflicts + global _resolve_loop, _db_initialized import asyncio import database - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) try: + # Reuse or create event loop + if _resolve_loop is None or _resolve_loop.is_closed(): + _resolve_loop = asyncio.new_event_loop() + asyncio.set_event_loop(_resolve_loop) + _db_initialized = False + # Initialize database pool if needed - if database.pool is None: - loop.run_until_complete(database.init_db()) + if not _db_initialized: + _resolve_loop.run_until_complete(database.init_db()) + _db_initialized = True # Get recipe CID from pending_run - pending = loop.run_until_complete(database.get_pending_run(run_id)) + pending = _resolve_loop.run_until_complete(database.get_pending_run(run_id)) recipe_cid = pending.get("recipe", "streaming") if pending else "streaming" # Save to run_cache for completed runs logger.info(f"Saving run {run_id} to run_cache with actor_id={actor_id}") - loop.run_until_complete(database.save_run_cache( + _resolve_loop.run_until_complete(database.save_run_cache( run_id=run_id, output_cid=cached_file.cid, recipe=recipe_cid, @@ -355,15 +361,13 @@ def run_stream( actor_id=actor_id, )) # Update pending run status - loop.run_until_complete(database.update_pending_run_status( + _resolve_loop.run_until_complete(database.update_pending_run_status( run_id=run_id, status="completed", )) logger.info(f"Saved run {run_id} to database with actor_id={actor_id}") except Exception as db_err: logger.warning(f"Failed to save run to database: {db_err}") - finally: - loop.close() return { "status": "completed", @@ -374,24 +378,26 @@ def run_stream( "output_path": str(cached_file.path), } else: - # Update pending run status to failed + # Update pending run status to failed - reuse module loop + global _resolve_loop, _db_initialized import asyncio import database - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) try: - if database.pool is None: - loop.run_until_complete(database.init_db()) - loop.run_until_complete(database.update_pending_run_status( + if _resolve_loop is None or _resolve_loop.is_closed(): + _resolve_loop = asyncio.new_event_loop() + asyncio.set_event_loop(_resolve_loop) + _db_initialized = False + if not _db_initialized: + _resolve_loop.run_until_complete(database.init_db()) + _db_initialized = True + _resolve_loop.run_until_complete(database.update_pending_run_status( run_id=run_id, status="failed", error="Output file not created", )) except Exception as db_err: logger.warning(f"Failed to update run status: {db_err}") - finally: - loop.close() return { "status": "failed", @@ -405,24 +411,26 @@ def run_stream( import traceback traceback.print_exc() - # Update pending run status to failed + # Update pending run status to failed - reuse module loop + global _resolve_loop, _db_initialized import asyncio import database - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) try: - if database.pool is None: - loop.run_until_complete(database.init_db()) - loop.run_until_complete(database.update_pending_run_status( + if _resolve_loop is None or _resolve_loop.is_closed(): + _resolve_loop = asyncio.new_event_loop() + asyncio.set_event_loop(_resolve_loop) + _db_initialized = False + if not _db_initialized: + _resolve_loop.run_until_complete(database.init_db()) + _db_initialized = True + _resolve_loop.run_until_complete(database.update_pending_run_status( run_id=run_id, status="failed", error=str(e), )) except Exception as db_err: logger.warning(f"Failed to update run status: {db_err}") - finally: - loop.close() return { "status": "failed",