Fix database event loop conflicts in streaming task

This commit is contained in:
giles
2026-02-03 00:14:42 +00:00
parent 3ec045c533
commit 6b2991bf24

View File

@@ -329,24 +329,30 @@ def run_stream(
logger.info(f"Stream output cached: CID={cached_file.cid}, IPFS={ipfs_cid}") logger.info(f"Stream output cached: CID={cached_file.cid}, IPFS={ipfs_cid}")
# Save to database # Save to database - reuse the module-level loop to avoid pool conflicts
global _resolve_loop, _db_initialized
import asyncio import asyncio
import database import database
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try: try:
# Reuse or create event loop
if _resolve_loop is None or _resolve_loop.is_closed():
_resolve_loop = asyncio.new_event_loop()
asyncio.set_event_loop(_resolve_loop)
_db_initialized = False
# Initialize database pool if needed # Initialize database pool if needed
if database.pool is None: if not _db_initialized:
loop.run_until_complete(database.init_db()) _resolve_loop.run_until_complete(database.init_db())
_db_initialized = True
# Get recipe CID from pending_run # Get recipe CID from pending_run
pending = loop.run_until_complete(database.get_pending_run(run_id)) pending = _resolve_loop.run_until_complete(database.get_pending_run(run_id))
recipe_cid = pending.get("recipe", "streaming") if pending else "streaming" recipe_cid = pending.get("recipe", "streaming") if pending else "streaming"
# Save to run_cache for completed runs # Save to run_cache for completed runs
logger.info(f"Saving run {run_id} to run_cache with actor_id={actor_id}") logger.info(f"Saving run {run_id} to run_cache with actor_id={actor_id}")
loop.run_until_complete(database.save_run_cache( _resolve_loop.run_until_complete(database.save_run_cache(
run_id=run_id, run_id=run_id,
output_cid=cached_file.cid, output_cid=cached_file.cid,
recipe=recipe_cid, recipe=recipe_cid,
@@ -355,15 +361,13 @@ def run_stream(
actor_id=actor_id, actor_id=actor_id,
)) ))
# Update pending run status # Update pending run status
loop.run_until_complete(database.update_pending_run_status( _resolve_loop.run_until_complete(database.update_pending_run_status(
run_id=run_id, run_id=run_id,
status="completed", status="completed",
)) ))
logger.info(f"Saved run {run_id} to database with actor_id={actor_id}") logger.info(f"Saved run {run_id} to database with actor_id={actor_id}")
except Exception as db_err: except Exception as db_err:
logger.warning(f"Failed to save run to database: {db_err}") logger.warning(f"Failed to save run to database: {db_err}")
finally:
loop.close()
return { return {
"status": "completed", "status": "completed",
@@ -374,24 +378,26 @@ def run_stream(
"output_path": str(cached_file.path), "output_path": str(cached_file.path),
} }
else: else:
# Update pending run status to failed # Update pending run status to failed - reuse module loop
global _resolve_loop, _db_initialized
import asyncio import asyncio
import database import database
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try: try:
if database.pool is None: if _resolve_loop is None or _resolve_loop.is_closed():
loop.run_until_complete(database.init_db()) _resolve_loop = asyncio.new_event_loop()
loop.run_until_complete(database.update_pending_run_status( asyncio.set_event_loop(_resolve_loop)
_db_initialized = False
if not _db_initialized:
_resolve_loop.run_until_complete(database.init_db())
_db_initialized = True
_resolve_loop.run_until_complete(database.update_pending_run_status(
run_id=run_id, run_id=run_id,
status="failed", status="failed",
error="Output file not created", error="Output file not created",
)) ))
except Exception as db_err: except Exception as db_err:
logger.warning(f"Failed to update run status: {db_err}") logger.warning(f"Failed to update run status: {db_err}")
finally:
loop.close()
return { return {
"status": "failed", "status": "failed",
@@ -405,24 +411,26 @@ def run_stream(
import traceback import traceback
traceback.print_exc() traceback.print_exc()
# Update pending run status to failed # Update pending run status to failed - reuse module loop
global _resolve_loop, _db_initialized
import asyncio import asyncio
import database import database
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try: try:
if database.pool is None: if _resolve_loop is None or _resolve_loop.is_closed():
loop.run_until_complete(database.init_db()) _resolve_loop = asyncio.new_event_loop()
loop.run_until_complete(database.update_pending_run_status( asyncio.set_event_loop(_resolve_loop)
_db_initialized = False
if not _db_initialized:
_resolve_loop.run_until_complete(database.init_db())
_db_initialized = True
_resolve_loop.run_until_complete(database.update_pending_run_status(
run_id=run_id, run_id=run_id,
status="failed", status="failed",
error=str(e), error=str(e),
)) ))
except Exception as db_err: except Exception as db_err:
logger.warning(f"Failed to update run status: {db_err}") logger.warning(f"Failed to update run status: {db_err}")
finally:
loop.close()
return { return {
"status": "failed", "status": "failed",