Add ipfs_playlist_cid to pending_runs and fail-fast on DB errors
- Add ipfs_playlist_cid column to pending_runs schema with migration
- Add pool guards to critical database functions (RuntimeError if not initialized)
- Add update_pending_run_playlist() function for streaming
- Update streaming task to save playlist CID to DB for HLS redirect
- Change database error handling from warning to raising exception

Errors should fail fast and explicitly, not be silently swallowed.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
57
database.py
57
database.py
@@ -94,10 +94,20 @@ CREATE TABLE IF NOT EXISTS pending_runs (
|
|||||||
output_name VARCHAR(255),
|
output_name VARCHAR(255),
|
||||||
actor_id VARCHAR(255),
|
actor_id VARCHAR(255),
|
||||||
error TEXT,
|
error TEXT,
|
||||||
|
ipfs_playlist_cid VARCHAR(128), -- For streaming: IPFS CID of HLS playlist
|
||||||
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
|
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
|
||||||
updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
|
updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
|
||||||
);
|
);
|
||||||
|
|
||||||
|
-- Add ipfs_playlist_cid if table exists but column doesn't (migration)
|
||||||
|
DO $$
|
||||||
|
BEGIN
|
||||||
|
IF NOT EXISTS (SELECT 1 FROM information_schema.columns
|
||||||
|
WHERE table_name = 'pending_runs' AND column_name = 'ipfs_playlist_cid') THEN
|
||||||
|
ALTER TABLE pending_runs ADD COLUMN ipfs_playlist_cid VARCHAR(128);
|
||||||
|
END IF;
|
||||||
|
END $$;
|
||||||
|
|
||||||
CREATE INDEX IF NOT EXISTS idx_pending_runs_status ON pending_runs(status);
|
CREATE INDEX IF NOT EXISTS idx_pending_runs_status ON pending_runs(status);
|
||||||
CREATE INDEX IF NOT EXISTS idx_pending_runs_actor ON pending_runs(actor_id);
|
CREATE INDEX IF NOT EXISTS idx_pending_runs_actor ON pending_runs(actor_id);
|
||||||
|
|
||||||
@@ -166,13 +176,27 @@ CREATE INDEX IF NOT EXISTS idx_friendly_names_latest ON friendly_names(actor_id,
|
|||||||
|
|
||||||
|
|
||||||
async def init_db():
    """Initialize the database connection pool and create the schema.

    Does nothing if the module-level ``pool`` is already set. The new pool is
    only published to the module-level ``pool`` after ``SCHEMA_SQL`` has been
    executed successfully, so other code never observes a pool whose schema
    setup failed.

    Raises:
        RuntimeError: If the pool cannot be created or the schema cannot be
            applied. The underlying exception (e.g. ``asyncpg.PostgresError``)
            is chained as ``__cause__``.
    """
    global pool
    if pool is not None:
        return  # Already initialized

    new_pool = None
    try:
        new_pool = await asyncpg.create_pool(DATABASE_URL, min_size=2, max_size=10)
        if new_pool is None:
            # DATABASE_URL is deliberately left out of the message: connection
            # URLs commonly embed credentials and exception text ends up in logs.
            raise RuntimeError("Failed to create database pool")
        async with new_pool.acquire() as conn:
            await conn.execute(SCHEMA_SQL)
    except Exception as e:
        # Do not leak the connections of a pool that was created but whose
        # schema setup failed. terminate() is synchronous and does not wait.
        if new_pool is not None:
            new_pool.terminate()
        stage = "connection" if isinstance(e, asyncpg.PostgresError) else "initialization"
        raise RuntimeError(f"Database {stage} failed: {e}") from e

    pool = new_pool
|
||||||
|
|
||||||
|
|
||||||
async def close_db():
|
async def close_db():
|
||||||
@@ -1088,6 +1112,8 @@ async def count_user_items(actor_id: str, item_type: Optional[str] = None) -> in
|
|||||||
|
|
||||||
async def get_run_cache(run_id: str) -> Optional[dict]:
|
async def get_run_cache(run_id: str) -> Optional[dict]:
|
||||||
"""Get cached run result by content-addressable run_id."""
|
"""Get cached run result by content-addressable run_id."""
|
||||||
|
if pool is None:
|
||||||
|
raise RuntimeError("Database pool not initialized - call init_db() first")
|
||||||
async with pool.acquire() as conn:
|
async with pool.acquire() as conn:
|
||||||
row = await conn.fetchrow(
|
row = await conn.fetchrow(
|
||||||
"""
|
"""
|
||||||
@@ -1454,6 +1480,8 @@ async def create_pending_run(
|
|||||||
output_name: Optional[str] = None,
|
output_name: Optional[str] = None,
|
||||||
) -> dict:
|
) -> dict:
|
||||||
"""Create a pending run record for durability."""
|
"""Create a pending run record for durability."""
|
||||||
|
if pool is None:
|
||||||
|
raise RuntimeError("Database pool not initialized - call init_db() first")
|
||||||
async with pool.acquire() as conn:
|
async with pool.acquire() as conn:
|
||||||
row = await conn.fetchrow(
|
row = await conn.fetchrow(
|
||||||
"""
|
"""
|
||||||
@@ -1483,10 +1511,12 @@ async def create_pending_run(
|
|||||||
|
|
||||||
async def get_pending_run(run_id: str) -> Optional[dict]:
|
async def get_pending_run(run_id: str) -> Optional[dict]:
|
||||||
"""Get a pending run by ID."""
|
"""Get a pending run by ID."""
|
||||||
|
if pool is None:
|
||||||
|
raise RuntimeError("Database pool not initialized - call init_db() first")
|
||||||
async with pool.acquire() as conn:
|
async with pool.acquire() as conn:
|
||||||
row = await conn.fetchrow(
|
row = await conn.fetchrow(
|
||||||
"""
|
"""
|
||||||
SELECT run_id, celery_task_id, status, recipe, inputs, dag_json, plan_cid, output_name, actor_id, error, created_at, updated_at
|
SELECT run_id, celery_task_id, status, recipe, inputs, dag_json, plan_cid, output_name, actor_id, error, ipfs_playlist_cid, created_at, updated_at
|
||||||
FROM pending_runs WHERE run_id = $1
|
FROM pending_runs WHERE run_id = $1
|
||||||
""",
|
""",
|
||||||
run_id
|
run_id
|
||||||
@@ -1507,6 +1537,7 @@ async def get_pending_run(run_id: str) -> Optional[dict]:
|
|||||||
"output_name": row["output_name"],
|
"output_name": row["output_name"],
|
||||||
"actor_id": row["actor_id"],
|
"actor_id": row["actor_id"],
|
||||||
"error": row["error"],
|
"error": row["error"],
|
||||||
|
"ipfs_playlist_cid": row["ipfs_playlist_cid"],
|
||||||
"created_at": row["created_at"].isoformat() if row["created_at"] else None,
|
"created_at": row["created_at"].isoformat() if row["created_at"] else None,
|
||||||
"updated_at": row["updated_at"].isoformat() if row["updated_at"] else None,
|
"updated_at": row["updated_at"].isoformat() if row["updated_at"] else None,
|
||||||
}
|
}
|
||||||
@@ -1564,6 +1595,8 @@ async def list_pending_runs(actor_id: Optional[str] = None, status: Optional[str
|
|||||||
|
|
||||||
async def update_pending_run_status(run_id: str, status: str, error: Optional[str] = None) -> bool:
|
async def update_pending_run_status(run_id: str, status: str, error: Optional[str] = None) -> bool:
|
||||||
"""Update the status of a pending run."""
|
"""Update the status of a pending run."""
|
||||||
|
if pool is None:
|
||||||
|
raise RuntimeError("Database pool not initialized - call init_db() first")
|
||||||
async with pool.acquire() as conn:
|
async with pool.acquire() as conn:
|
||||||
if error:
|
if error:
|
||||||
result = await conn.execute(
|
result = await conn.execute(
|
||||||
@@ -1580,6 +1613,8 @@ async def update_pending_run_status(run_id: str, status: str, error: Optional[st
|
|||||||
|
|
||||||
async def update_pending_run_plan(run_id: str, plan_cid: str) -> bool:
|
async def update_pending_run_plan(run_id: str, plan_cid: str) -> bool:
|
||||||
"""Update the plan_cid of a pending run (called when plan is generated)."""
|
"""Update the plan_cid of a pending run (called when plan is generated)."""
|
||||||
|
if pool is None:
|
||||||
|
raise RuntimeError("Database pool not initialized - call init_db() first")
|
||||||
async with pool.acquire() as conn:
|
async with pool.acquire() as conn:
|
||||||
result = await conn.execute(
|
result = await conn.execute(
|
||||||
"UPDATE pending_runs SET plan_cid = $2, updated_at = NOW() WHERE run_id = $1",
|
"UPDATE pending_runs SET plan_cid = $2, updated_at = NOW() WHERE run_id = $1",
|
||||||
@@ -1588,6 +1623,18 @@ async def update_pending_run_plan(run_id: str, plan_cid: str) -> bool:
|
|||||||
return "UPDATE 1" in result
|
return "UPDATE 1" in result
|
||||||
|
|
||||||
|
|
||||||
|
async def update_pending_run_playlist(run_id: str, ipfs_playlist_cid: str) -> bool:
    """Store the IPFS CID of a streaming run's playlist on its pending_runs row.

    The same statement also sets ``updated_at`` to ``NOW()``.

    Returns:
        True if the status string returned by ``execute`` contains
        ``"UPDATE 1"``, otherwise False.

    Raises:
        RuntimeError: If the module-level ``pool`` is still ``None``.
    """
    if pool is None:
        raise RuntimeError("Database pool not initialized - call init_db() first")

    query = "UPDATE pending_runs SET ipfs_playlist_cid = $2, updated_at = NOW() WHERE run_id = $1"
    async with pool.acquire() as conn:
        status_tag = await conn.execute(query, run_id, ipfs_playlist_cid)
    return "UPDATE 1" in status_tag
|
||||||
|
|
||||||
|
|
||||||
async def complete_pending_run(run_id: str) -> bool:
|
async def complete_pending_run(run_id: str) -> bool:
|
||||||
"""Remove a pending run after it completes (moves to run_cache)."""
|
"""Remove a pending run after it completes (moves to run_cache)."""
|
||||||
async with pool.acquire() as conn:
|
async with pool.acquire() as conn:
|
||||||
|
|||||||
@@ -353,6 +353,24 @@ def run_stream(
|
|||||||
segment_cids = getattr(interp.output, 'segment_cids', {})
|
segment_cids = getattr(interp.output, 'segment_cids', {})
|
||||||
logger.info(f"IPFS HLS: playlist={ipfs_playlist_cid}, segments={len(segment_cids)}")
|
logger.info(f"IPFS HLS: playlist={ipfs_playlist_cid}, segments={len(segment_cids)}")
|
||||||
|
|
||||||
|
# Update pending run with playlist CID for live HLS redirect
|
||||||
|
if ipfs_playlist_cid:
|
||||||
|
global _resolve_loop, _db_initialized
|
||||||
|
import asyncio
|
||||||
|
import database
|
||||||
|
try:
|
||||||
|
if _resolve_loop is None or _resolve_loop.is_closed():
|
||||||
|
_resolve_loop = asyncio.new_event_loop()
|
||||||
|
asyncio.set_event_loop(_resolve_loop)
|
||||||
|
if not _db_initialized:
|
||||||
|
_resolve_loop.run_until_complete(database.init_db())
|
||||||
|
_db_initialized = True
|
||||||
|
_resolve_loop.run_until_complete(database.update_pending_run_playlist(run_id, ipfs_playlist_cid))
|
||||||
|
logger.info(f"Updated pending run {run_id} with IPFS playlist CID: {ipfs_playlist_cid}")
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Failed to update pending run with playlist CID: {e}")
|
||||||
|
raise # Fail fast - database errors should not be silently ignored
|
||||||
|
|
||||||
# HLS output creates stream.m3u8 and segment_*.ts files in stream_dir
|
# HLS output creates stream.m3u8 and segment_*.ts files in stream_dir
|
||||||
hls_playlist = stream_dir / "stream.m3u8"
|
hls_playlist = stream_dir / "stream.m3u8"
|
||||||
|
|
||||||
@@ -439,7 +457,8 @@ def run_stream(
|
|||||||
))
|
))
|
||||||
logger.info(f"Saved run {run_id} to database with actor_id={actor_id}")
|
logger.info(f"Saved run {run_id} to database with actor_id={actor_id}")
|
||||||
except Exception as db_err:
|
except Exception as db_err:
|
||||||
logger.warning(f"Failed to save run to database: {db_err}")
|
logger.error(f"Failed to save run to database: {db_err}")
|
||||||
|
raise RuntimeError(f"Database error saving run {run_id}: {db_err}") from db_err
|
||||||
|
|
||||||
return {
|
return {
|
||||||
"status": "completed",
|
"status": "completed",
|
||||||
|
|||||||
Reference in New Issue
Block a user