From ed5ef2bf39fd320a783591db41d33aa5fa9b9ecb Mon Sep 17 00:00:00 2001 From: giles Date: Wed, 4 Feb 2026 00:02:18 +0000 Subject: [PATCH] Add ipfs_playlist_cid to pending_runs and fail-fast on DB errors - Add ipfs_playlist_cid column to pending_runs schema with migration - Add pool guards to critical database functions (RuntimeError if not initialized) - Add update_pending_run_playlist() function for streaming - Update streaming task to save playlist CID to DB for HLS redirect - Change database error handling from warning to raising exception Errors should fail fast and explicitly, not be silently swallowed. Co-Authored-By: Claude Opus 4.5 --- database.py | 57 ++++++++++++++++++++++++++++++++++++++++++---- tasks/streaming.py | 21 ++++++++++++++++- 2 files changed, 72 insertions(+), 6 deletions(-) diff --git a/database.py b/database.py index 398c8b7..131d2c4 100644 --- a/database.py +++ b/database.py @@ -94,10 +94,20 @@ CREATE TABLE IF NOT EXISTS pending_runs ( output_name VARCHAR(255), actor_id VARCHAR(255), error TEXT, + ipfs_playlist_cid VARCHAR(128), -- For streaming: IPFS CID of HLS playlist created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(), updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW() ); +-- Add ipfs_playlist_cid if table exists but column doesn't (migration) +DO $$ +BEGIN + IF NOT EXISTS (SELECT 1 FROM information_schema.columns + WHERE table_name = 'pending_runs' AND column_name = 'ipfs_playlist_cid') THEN + ALTER TABLE pending_runs ADD COLUMN ipfs_playlist_cid VARCHAR(128); + END IF; +END $$; + CREATE INDEX IF NOT EXISTS idx_pending_runs_status ON pending_runs(status); CREATE INDEX IF NOT EXISTS idx_pending_runs_actor ON pending_runs(actor_id); @@ -166,13 +176,27 @@ CREATE INDEX IF NOT EXISTS idx_friendly_names_latest ON friendly_names(actor_id, async def init_db(): - """Initialize database connection pool and create schema.""" + """Initialize database connection pool and create schema. 
+ + Raises: + RuntimeError: If database connection fails, + or if pool creation or schema setup fails + """ global pool if pool is not None: return # Already initialized - pool = await asyncpg.create_pool(DATABASE_URL, min_size=2, max_size=10) - async with pool.acquire() as conn: - await conn.execute(SCHEMA_SQL) + try: + pool = await asyncpg.create_pool(DATABASE_URL, min_size=2, max_size=10) + if pool is None: + raise RuntimeError(f"Failed to create database pool for {DATABASE_URL}") + async with pool.acquire() as conn: + await conn.execute(SCHEMA_SQL) + except asyncpg.PostgresError as e: + pool = None + raise RuntimeError(f"Database connection failed: {e}") from e + except Exception as e: + pool = None + raise RuntimeError(f"Database initialization failed: {e}") from e async def close_db(): @@ -1088,6 +1112,8 @@ async def count_user_items(actor_id: str, item_type: Optional[str] = None) -> in async def get_run_cache(run_id: str) -> Optional[dict]: """Get cached run result by content-addressable run_id.""" + if pool is None: + raise RuntimeError("Database pool not initialized - call init_db() first") async with pool.acquire() as conn: row = await conn.fetchrow( """ @@ -1454,6 +1480,8 @@ async def create_pending_run( output_name: Optional[str] = None, ) -> dict: """Create a pending run record for durability.""" + if pool is None: + raise RuntimeError("Database pool not initialized - call init_db() first") async with pool.acquire() as conn: row = await conn.fetchrow( """ @@ -1483,10 +1511,12 @@ async def create_pending_run( async def get_pending_run(run_id: str) -> Optional[dict]: """Get a pending run by ID.""" + if pool is None: + raise RuntimeError("Database pool not initialized - call init_db() first") async with pool.acquire() as conn: row = await conn.fetchrow( """ - SELECT run_id, celery_task_id, status, recipe, inputs, dag_json, plan_cid, output_name, actor_id, error, created_at, updated_at + SELECT run_id, celery_task_id, status, recipe, inputs, dag_json, 
plan_cid, output_name, actor_id, error, ipfs_playlist_cid, created_at, updated_at FROM pending_runs WHERE run_id = $1 """, run_id @@ -1507,6 +1537,7 @@ async def get_pending_run(run_id: str) -> Optional[dict]: "output_name": row["output_name"], "actor_id": row["actor_id"], "error": row["error"], + "ipfs_playlist_cid": row["ipfs_playlist_cid"], "created_at": row["created_at"].isoformat() if row["created_at"] else None, "updated_at": row["updated_at"].isoformat() if row["updated_at"] else None, } @@ -1564,6 +1595,8 @@ async def list_pending_runs(actor_id: Optional[str] = None, status: Optional[str async def update_pending_run_status(run_id: str, status: str, error: Optional[str] = None) -> bool: """Update the status of a pending run.""" + if pool is None: + raise RuntimeError("Database pool not initialized - call init_db() first") async with pool.acquire() as conn: if error: result = await conn.execute( @@ -1580,6 +1613,8 @@ async def update_pending_run_status(run_id: str, status: str, error: Optional[st async def update_pending_run_plan(run_id: str, plan_cid: str) -> bool: """Update the plan_cid of a pending run (called when plan is generated).""" + if pool is None: + raise RuntimeError("Database pool not initialized - call init_db() first") async with pool.acquire() as conn: result = await conn.execute( "UPDATE pending_runs SET plan_cid = $2, updated_at = NOW() WHERE run_id = $1", @@ -1588,6 +1623,18 @@ async def update_pending_run_plan(run_id: str, plan_cid: str) -> bool: return "UPDATE 1" in result +async def update_pending_run_playlist(run_id: str, ipfs_playlist_cid: str) -> bool: + """Update the IPFS playlist CID of a streaming run.""" + if pool is None: + raise RuntimeError("Database pool not initialized - call init_db() first") + async with pool.acquire() as conn: + result = await conn.execute( + "UPDATE pending_runs SET ipfs_playlist_cid = $2, updated_at = NOW() WHERE run_id = $1", + run_id, ipfs_playlist_cid + ) + return "UPDATE 1" in result + + async def 
complete_pending_run(run_id: str) -> bool: """Remove a pending run after it completes (moves to run_cache).""" async with pool.acquire() as conn: diff --git a/tasks/streaming.py b/tasks/streaming.py index 9be53ee..7241b51 100644 --- a/tasks/streaming.py +++ b/tasks/streaming.py @@ -353,6 +353,24 @@ def run_stream( segment_cids = getattr(interp.output, 'segment_cids', {}) logger.info(f"IPFS HLS: playlist={ipfs_playlist_cid}, segments={len(segment_cids)}") + # Update pending run with playlist CID for live HLS redirect + if ipfs_playlist_cid: + global _resolve_loop, _db_initialized + import asyncio + import database + try: + if _resolve_loop is None or _resolve_loop.is_closed(): + _resolve_loop = asyncio.new_event_loop() + asyncio.set_event_loop(_resolve_loop) + if not _db_initialized: + _resolve_loop.run_until_complete(database.init_db()) + _db_initialized = True + _resolve_loop.run_until_complete(database.update_pending_run_playlist(run_id, ipfs_playlist_cid)) + logger.info(f"Updated pending run {run_id} with IPFS playlist CID: {ipfs_playlist_cid}") + except Exception as e: + logger.error(f"Failed to update pending run with playlist CID: {e}") + raise # Fail fast - database errors should not be silently ignored + # HLS output creates stream.m3u8 and segment_*.ts files in stream_dir hls_playlist = stream_dir / "stream.m3u8" @@ -439,7 +457,8 @@ def run_stream( )) logger.info(f"Saved run {run_id} to database with actor_id={actor_id}") except Exception as db_err: - logger.warning(f"Failed to save run to database: {db_err}") + logger.error(f"Failed to save run to database: {db_err}") + raise RuntimeError(f"Database error saving run {run_id}: {db_err}") from db_err return { "status": "completed",