Add durable pending runs and recipe list debugging

- Store pending runs in PostgreSQL for durability across restarts
- Add recovery method for orphaned runs
- Increase Celery result_expires to 7 days
- Add task_reject_on_worker_lost for automatic re-queuing
- Add logging to recipe list to debug filter issues

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
gilesb
2026-01-11 20:35:00 +00:00
parent a6dd470623
commit 8ab0f05a7d
4 changed files with 412 additions and 31 deletions

View File

@@ -46,23 +46,30 @@ class RecipeService:
async def list_recipes(self, actor_id: str = None, offset: int = 0, limit: int = 20) -> list:
"""
List available recipes.
List available recipes for a user.
L1 data is isolated per-user - only shows recipes owned by actor_id.
Note: This scans the cache for recipe files. For production,
you might want a database index of recipes by owner.
"""
import logging
logger = logging.getLogger(__name__)
# Get all cached items and filter for recipes
# This is a simplified implementation - production would use a proper index
recipes = []
# Check if cache has a list method for recipes
if hasattr(self.cache, 'list_by_type'):
items = self.cache.list_by_type('recipe')
logger.info(f"Found {len(items)} recipe items in cache")
for content_hash in items:
recipe = await self.get_recipe(content_hash)
if recipe:
# Filter by actor if specified
if actor_id is None or recipe.get("uploader") == actor_id:
uploader = recipe.get("uploader")
logger.info(f"Recipe {content_hash[:12]}: uploader={uploader}, actor_id={actor_id}")
# Filter by actor - L1 is per-user
if actor_id is None or uploader == actor_id:
recipes.append(recipe)
# Sort by name

View File

@@ -101,7 +101,67 @@ class RunService:
"completed_at": cached.get("created_at"),
}
# Check if there's a running task
# Check database for pending run
pending = await self.db.get_pending_run(run_id)
if pending:
task_id = pending.get("celery_task_id")
if task_id:
# Check actual Celery task state
from celery.result import AsyncResult
from celery_app import app as celery_app
result = AsyncResult(task_id, app=celery_app)
status = result.status.lower()
# Normalize status
status_map = {
"pending": "pending",
"started": "running",
"success": "completed",
"failure": "failed",
"retry": "running",
"revoked": "failed",
}
normalized_status = status_map.get(status, status)
run_data = {
"run_id": run_id,
"status": normalized_status,
"celery_task_id": task_id,
"actor_id": pending.get("actor_id"),
"recipe": pending.get("recipe"),
"inputs": pending.get("inputs"),
"output_name": pending.get("output_name"),
"created_at": pending.get("created_at"),
"error": pending.get("error"),
}
# If task completed, get result
if result.ready():
if result.successful():
run_data["status"] = "completed"
task_result = result.result
if isinstance(task_result, dict):
run_data["output_hash"] = task_result.get("output_hash")
else:
run_data["status"] = "failed"
run_data["error"] = str(result.result)
return run_data
# No task_id but have pending record - return from DB
return {
"run_id": run_id,
"status": pending.get("status", "pending"),
"recipe": pending.get("recipe"),
"inputs": pending.get("inputs"),
"output_name": pending.get("output_name"),
"actor_id": pending.get("actor_id"),
"created_at": pending.get("created_at"),
"error": pending.get("error"),
}
# Fallback: Check Redis for backwards compatibility
task_data = self.redis.get(f"{self.task_key_prefix}{run_id}")
if task_data:
if isinstance(task_data, bytes):
@@ -176,33 +236,28 @@ class RunService:
return None
async def list_runs(self, actor_id: str, offset: int = 0, limit: int = 20) -> list:
"""List runs for a user. Returns completed runs from database."""
"""List runs for a user. Returns completed and pending runs from database."""
# Get completed runs from database
runs = await self.db.list_runs_by_actor(actor_id, offset=offset, limit=limit)
completed_runs = await self.db.list_runs_by_actor(actor_id, offset=0, limit=limit + 50)
# Also check for any pending tasks in Redis
# Get pending runs from database
pending_db = await self.db.list_pending_runs(actor_id=actor_id)
# Convert pending runs to run format with live status check
pending = []
cursor = 0
while True:
cursor, keys = self.redis.scan(
cursor=cursor,
match=f"{self.task_key_prefix}*",
count=100
)
for key in keys:
run_id = key.decode().replace(self.task_key_prefix, "") if isinstance(key, bytes) else key.replace(self.task_key_prefix, "")
# Check if this run isn't already in completed results
if not any(r.get("run_id") == run_id for r in runs):
run = await self.get_run(run_id)
if run and run.get("status") in ("pending", "running"):
# Filter by actor_id
if run.get("actor_id") == actor_id:
pending.append(run)
if cursor == 0:
break
for pr in pending_db:
run_id = pr.get("run_id")
# Skip if already in completed
if any(r.get("run_id") == run_id for r in completed_runs):
continue
# Get live status
run = await self.get_run(run_id)
if run and run.get("status") in ("pending", "running"):
pending.append(run)
# Combine and sort
all_runs = pending + runs
all_runs = pending + completed_runs
all_runs.sort(key=lambda r: r.get("created_at", ""), reverse=True)
return all_runs[offset:offset + limit]
@@ -318,7 +373,18 @@ class RunService:
return None, "Legacy mode only supports single-input recipes. Use use_dag=true for multi-input."
task = render_effect.delay(input_list[0], recipe, output_name)
# Store task mapping in Redis (ephemeral) - includes metadata for list display
# Store pending run in database for durability
await self.db.create_pending_run(
run_id=run_id,
celery_task_id=task.id,
recipe=recipe,
inputs=input_list,
actor_id=actor_id,
dag_json=dag_json,
output_name=output_name,
)
# Also store in Redis for backwards compatibility (shorter TTL)
task_data = json.dumps({
"task_id": task.id,
"actor_id": actor_id,
@@ -329,7 +395,7 @@ class RunService:
})
self.redis.setex(
f"{self.task_key_prefix}{run_id}",
3600 * 24, # 24 hour TTL
3600 * 4, # 4 hour TTL (database is primary now)
task_data
)
@@ -459,3 +525,123 @@ class RunService:
def detect_media_type(self, path: Path) -> str:
"""Detect media type for a file path."""
return detect_media_type(path)
async def recover_pending_runs(self) -> Dict[str, int]:
    """
    Recover pending runs after a restart.

    Walks every pending/running run recorded in the database and:
    - moves completed Celery tasks into the run cache,
    - re-queues orphaned tasks whose DAG JSON was persisted,
    - marks runs as failed when they cannot be recovered.

    Returns:
        Counts of recovered, completed, failed and still_running runs,
        or {"error": ...} when the Celery task module is unavailable.
    """
    from celery.result import AsyncResult
    from celery_app import app as celery_app
    try:
        from legacy_tasks import execute_dag
    except ImportError:
        return {"error": "Celery tasks not available"}

    stats = {"recovered": 0, "completed": 0, "failed": 0, "still_running": 0}

    async def _requeue(run: dict, run_id: str, missing_dag_error: str) -> None:
        """Re-dispatch the run's DAG (upserting the new task id) or mark it failed."""
        dag_json = run.get("dag_json")
        if not dag_json:
            await self.db.update_pending_run_status(run_id, "failed", missing_dag_error)
            stats["failed"] += 1
            return
        try:
            new_task = execute_dag.delay(dag_json, run_id)
            # create_pending_run upserts on run_id, refreshing celery_task_id.
            await self.db.create_pending_run(
                run_id=run_id,
                celery_task_id=new_task.id,
                recipe=run.get("recipe", "unknown"),
                inputs=run.get("inputs", []),
                actor_id=run.get("actor_id"),
                dag_json=dag_json,
                output_name=run.get("output_name"),
            )
            stats["recovered"] += 1
        except Exception as e:
            await self.db.update_pending_run_status(
                run_id, "failed", f"Recovery failed: {e}"
            )
            stats["failed"] += 1

    # Get all pending/running runs from the database.
    for run in await self.db.list_pending_runs():
        run_id = run.get("run_id")
        task_id = run.get("celery_task_id")

        if not task_id:
            # Dispatch never recorded a task id - re-queue if we have the DAG.
            await _requeue(run, run_id, "No DAG data for recovery")
            continue

        # Check the live Celery task state.
        result = AsyncResult(task_id, app=celery_app)
        celery_status = result.status.lower()

        if result.ready():
            if result.successful():
                # Task finished while we were down - persist to run_cache.
                task_result = result.result
                if isinstance(task_result, dict) and task_result.get("output_hash"):
                    await self.db.save_run_cache(
                        run_id=run_id,
                        output_hash=task_result["output_hash"],
                        recipe=run.get("recipe", "unknown"),
                        inputs=run.get("inputs", []),
                        ipfs_cid=task_result.get("ipfs_cid"),
                        provenance_cid=task_result.get("provenance_cid"),
                        actor_id=run.get("actor_id"),
                    )
                    await self.db.complete_pending_run(run_id)
                    stats["completed"] += 1
                else:
                    await self.db.update_pending_run_status(
                        run_id, "failed", "Task completed but no output hash"
                    )
                    stats["failed"] += 1
            else:
                # Task failed outright - record the exception text.
                await self.db.update_pending_run_status(
                    run_id, "failed", str(result.result)
                )
                stats["failed"] += 1
        elif celery_status in ("pending", "started", "retry"):
            # A worker is (or will be) processing it - leave it alone.
            stats["still_running"] += 1
        else:
            # Unknown/revoked state - attempt a re-queue from the stored DAG.
            await _requeue(run, run_id, f"Task in unknown state: {celery_status}")

    return stats

View File

@@ -18,7 +18,7 @@ app = Celery(
)
app.conf.update(
result_expires=3600,
result_expires=86400 * 7, # 7 days - allow time for recovery after restarts
task_serializer='json',
accept_content=['json', 'pickle'], # pickle needed for internal Celery messages
result_serializer='json',
@@ -26,8 +26,10 @@ app.conf.update(
timezone='UTC',
enable_utc=True,
task_track_started=True,
task_acks_late=True,
task_acks_late=True, # Don't ack until task completes - survives worker restart
worker_prefetch_multiplier=1,
task_reject_on_worker_lost=True, # Re-queue if worker dies
task_acks_on_failure_or_timeout=True, # Ack failed tasks so they don't retry forever
)
if __name__ == '__main__':

View File

@@ -91,6 +91,25 @@ CREATE TABLE IF NOT EXISTS run_cache (
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);
-- Pending/running runs: tracks in-progress work for durability.
-- Allows runs to survive restarts and be recovered (see RunService.recover_pending_runs).
-- Completed runs are deleted from here after being moved to run_cache.
CREATE TABLE IF NOT EXISTS pending_runs (
    run_id VARCHAR(64) PRIMARY KEY,
    celery_task_id VARCHAR(128),           -- may be NULL if dispatch was never recorded
    status VARCHAR(20) NOT NULL DEFAULT 'pending', -- pending, running, failed
    recipe VARCHAR(255) NOT NULL,
    inputs JSONB NOT NULL,                 -- list of input content hashes (JSON array)
    dag_json TEXT,                         -- serialized DAG, used to re-queue orphaned runs
    output_name VARCHAR(255),
    actor_id VARCHAR(255),                 -- owner; used for per-user listing
    error TEXT,                            -- failure detail when status = 'failed'
    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);
-- Support recovery scans (filter by status) and per-user run listings.
CREATE INDEX IF NOT EXISTS idx_pending_runs_status ON pending_runs(status);
CREATE INDEX IF NOT EXISTS idx_pending_runs_actor ON pending_runs(actor_id);
-- User storage backends (synced from L2 or configured locally)
CREATE TABLE IF NOT EXISTS storage_backends (
id SERIAL PRIMARY KEY,
@@ -1357,3 +1376,170 @@ async def get_pins_for_content(content_hash: str) -> List[dict]:
content_hash
)
return [dict(row) for row in rows]
# ============ Pending Runs ============
async def create_pending_run(
    run_id: str,
    celery_task_id: str,
    recipe: str,
    inputs: List[str],
    actor_id: str,
    dag_json: Optional[str] = None,
    output_name: Optional[str] = None,
) -> dict:
    """Insert (or upsert) a pending-run record so in-flight work survives restarts."""
    async with pool.acquire() as conn:
        record = await conn.fetchrow(
            """
            INSERT INTO pending_runs (run_id, celery_task_id, status, recipe, inputs, dag_json, output_name, actor_id)
            VALUES ($1, $2, 'running', $3, $4, $5, $6, $7)
            ON CONFLICT (run_id) DO UPDATE SET
                celery_task_id = EXCLUDED.celery_task_id,
                status = 'running',
                updated_at = NOW()
            RETURNING run_id, celery_task_id, status, recipe, inputs, dag_json, output_name, actor_id, created_at, updated_at
            """,
            run_id, celery_task_id, recipe, _json.dumps(inputs), dag_json, output_name, actor_id
        )
    # Plain columns copy straight through; timestamps become ISO strings.
    pending = {column: record[column] for column in (
        "run_id", "celery_task_id", "status", "recipe",
        "inputs", "dag_json", "output_name", "actor_id",
    )}
    for stamp_column in ("created_at", "updated_at"):
        stamp = record[stamp_column]
        pending[stamp_column] = stamp.isoformat() if stamp else None
    return pending
async def get_pending_run(run_id: str) -> Optional[dict]:
    """Look up a single pending run by id; returns None when no such row exists."""
    async with pool.acquire() as conn:
        record = await conn.fetchrow(
            """
            SELECT run_id, celery_task_id, status, recipe, inputs, dag_json, output_name, actor_id, error, created_at, updated_at
            FROM pending_runs WHERE run_id = $1
            """,
            run_id
        )
    if record is None:
        return None
    # Non-timestamp columns pass through unchanged; timestamps become ISO strings.
    pending = {column: record[column] for column in (
        "run_id", "celery_task_id", "status", "recipe", "inputs",
        "dag_json", "output_name", "actor_id", "error",
    )}
    for stamp_column in ("created_at", "updated_at"):
        stamp = record[stamp_column]
        pending[stamp_column] = stamp.isoformat() if stamp else None
    return pending
async def list_pending_runs(actor_id: Optional[str] = None, status: Optional[str] = None) -> List[dict]:
    """List pending runs, newest first, optionally filtered by actor and/or status."""
    filters = []
    args = []
    # Build "$n"-numbered predicates only for the filters actually supplied.
    for column, value in (("actor_id", actor_id), ("status", status)):
        if value:
            args.append(value)
            filters.append(f"{column} = ${len(args)}")
    where_clause = " AND ".join(filters) if filters else "TRUE"
    async with pool.acquire() as conn:
        rows = await conn.fetch(
            f"""
            SELECT run_id, celery_task_id, status, recipe, inputs, output_name, actor_id, error, created_at, updated_at
            FROM pending_runs
            WHERE {where_clause}
            ORDER BY created_at DESC
            """,
            *args
        )

    def _as_dict(row) -> dict:
        """Map a DB row to the API dict shape (timestamps as ISO strings)."""
        entry = {column: row[column] for column in (
            "run_id", "celery_task_id", "status", "recipe", "inputs",
            "output_name", "actor_id", "error",
        )}
        entry["created_at"] = row["created_at"].isoformat() if row["created_at"] else None
        entry["updated_at"] = row["updated_at"].isoformat() if row["updated_at"] else None
        return entry

    return [_as_dict(row) for row in rows]
async def update_pending_run_status(run_id: str, status: str, error: Optional[str] = None) -> bool:
    """Update the status (and optionally the error message) of a pending run.

    Args:
        run_id: Primary key of the pending run.
        status: New status value (e.g. 'pending', 'running', 'failed').
        error: Optional error detail; stored whenever provided, including
            an explicit empty string.

    Returns:
        True when exactly one row was updated.
    """
    async with pool.acquire() as conn:
        # `is not None` (not truthiness) so an empty-string error is still persisted.
        if error is not None:
            result = await conn.execute(
                "UPDATE pending_runs SET status = $2, error = $3, updated_at = NOW() WHERE run_id = $1",
                run_id, status, error
            )
        else:
            result = await conn.execute(
                "UPDATE pending_runs SET status = $2, updated_at = NOW() WHERE run_id = $1",
                run_id, status
            )
        # Exact comparison: the substring check `"UPDATE 1" in result` would
        # also accept "UPDATE 10", "UPDATE 12", etc.
        return result == "UPDATE 1"
async def complete_pending_run(run_id: str) -> bool:
    """Remove a pending run after it completes (its result lives in run_cache).

    Returns:
        True when exactly one row was deleted.
    """
    async with pool.acquire() as conn:
        result = await conn.execute(
            "DELETE FROM pending_runs WHERE run_id = $1",
            run_id
        )
        # Exact comparison: `"DELETE 1" in result` would also match "DELETE 10".
        return result == "DELETE 1"
async def get_stale_pending_runs(older_than_hours: int = 24) -> List[dict]:
    """Get pending runs that haven't been updated recently (for recovery).

    Args:
        older_than_hours: Only return runs whose updated_at is older than
            this many hours.

    Returns:
        Pending/running runs ordered oldest-first by created_at, with
        timestamps rendered as ISO strings.
    """
    async with pool.acquire() as conn:
        # Bug fix: asyncpg uses $n placeholders, not %s. The previous
        # "INTERVAL '%s hours'" left %s as a literal inside the SQL string
        # while still passing older_than_hours as a query argument, so the
        # call failed with an argument-count mismatch. Multiply a unit
        # interval by the parameter instead.
        rows = await conn.fetch(
            """
            SELECT run_id, celery_task_id, status, recipe, inputs, dag_json, output_name, actor_id, created_at, updated_at
            FROM pending_runs
            WHERE status IN ('pending', 'running')
              AND updated_at < NOW() - ($1 * INTERVAL '1 hour')
            ORDER BY created_at
            """,
            older_than_hours
        )
    return [
        {
            "run_id": row["run_id"],
            "celery_task_id": row["celery_task_id"],
            "status": row["status"],
            "recipe": row["recipe"],
            "inputs": row["inputs"],
            "dag_json": row["dag_json"],
            "output_name": row["output_name"],
            "actor_id": row["actor_id"],
            "created_at": row["created_at"].isoformat() if row["created_at"] else None,
            "updated_at": row["updated_at"].isoformat() if row["updated_at"] else None,
        }
        for row in rows
    ]