diff --git a/app/services/recipe_service.py b/app/services/recipe_service.py index a61c39c..a81306b 100644 --- a/app/services/recipe_service.py +++ b/app/services/recipe_service.py @@ -46,23 +46,30 @@ class RecipeService: async def list_recipes(self, actor_id: str = None, offset: int = 0, limit: int = 20) -> list: """ - List available recipes. + List available recipes for a user. + + L1 data is isolated per-user - only shows recipes owned by actor_id. Note: This scans the cache for recipe files. For production, you might want a database index of recipes by owner. """ + import logging + logger = logging.getLogger(__name__) + # Get all cached items and filter for recipes - # This is a simplified implementation - production would use a proper index recipes = [] # Check if cache has a list method for recipes if hasattr(self.cache, 'list_by_type'): items = self.cache.list_by_type('recipe') + logger.info(f"Found {len(items)} recipe items in cache") for content_hash in items: recipe = await self.get_recipe(content_hash) if recipe: - # Filter by actor if specified - if actor_id is None or recipe.get("uploader") == actor_id: + uploader = recipe.get("uploader") + logger.info(f"Recipe {content_hash[:12]}: uploader={uploader}, actor_id={actor_id}") + # Filter by actor - L1 is per-user + if actor_id is None or uploader == actor_id: recipes.append(recipe) # Sort by name diff --git a/app/services/run_service.py b/app/services/run_service.py index 63892d5..525ff3f 100644 --- a/app/services/run_service.py +++ b/app/services/run_service.py @@ -101,7 +101,67 @@ class RunService: "completed_at": cached.get("created_at"), } - # Check if there's a running task + # Check database for pending run + pending = await self.db.get_pending_run(run_id) + if pending: + task_id = pending.get("celery_task_id") + if task_id: + # Check actual Celery task state + from celery.result import AsyncResult + from celery_app import app as celery_app + + result = AsyncResult(task_id, app=celery_app) + status 
= result.status.lower() + + # Normalize status + status_map = { + "pending": "pending", + "started": "running", + "success": "completed", + "failure": "failed", + "retry": "running", + "revoked": "failed", + } + normalized_status = status_map.get(status, status) + + run_data = { + "run_id": run_id, + "status": normalized_status, + "celery_task_id": task_id, + "actor_id": pending.get("actor_id"), + "recipe": pending.get("recipe"), + "inputs": pending.get("inputs"), + "output_name": pending.get("output_name"), + "created_at": pending.get("created_at"), + "error": pending.get("error"), + } + + # If task completed, get result + if result.ready(): + if result.successful(): + run_data["status"] = "completed" + task_result = result.result + if isinstance(task_result, dict): + run_data["output_hash"] = task_result.get("output_hash") + else: + run_data["status"] = "failed" + run_data["error"] = str(result.result) + + return run_data + + # No task_id but have pending record - return from DB + return { + "run_id": run_id, + "status": pending.get("status", "pending"), + "recipe": pending.get("recipe"), + "inputs": pending.get("inputs"), + "output_name": pending.get("output_name"), + "actor_id": pending.get("actor_id"), + "created_at": pending.get("created_at"), + "error": pending.get("error"), + } + + # Fallback: Check Redis for backwards compatibility task_data = self.redis.get(f"{self.task_key_prefix}{run_id}") if task_data: if isinstance(task_data, bytes): @@ -176,33 +236,28 @@ class RunService: return None async def list_runs(self, actor_id: str, offset: int = 0, limit: int = 20) -> list: - """List runs for a user. Returns completed runs from database.""" + """List runs for a user. 
Returns completed and pending runs from database."""
         # Get completed runs from database
-        runs = await self.db.list_runs_by_actor(actor_id, offset=offset, limit=limit)
+        completed_runs = await self.db.list_runs_by_actor(actor_id, offset=0, limit=offset + limit + 50)
 
-        # Also check for any pending tasks in Redis
+        # Get pending runs from database
+        pending_db = await self.db.list_pending_runs(actor_id=actor_id)
+
+        # Convert pending runs to run format with live status check
         pending = []
-        cursor = 0
-        while True:
-            cursor, keys = self.redis.scan(
-                cursor=cursor,
-                match=f"{self.task_key_prefix}*",
-                count=100
-            )
-            for key in keys:
-                run_id = key.decode().replace(self.task_key_prefix, "") if isinstance(key, bytes) else key.replace(self.task_key_prefix, "")
-                # Check if this run isn't already in completed results
-                if not any(r.get("run_id") == run_id for r in runs):
-                    run = await self.get_run(run_id)
-                    if run and run.get("status") in ("pending", "running"):
-                        # Filter by actor_id
-                        if run.get("actor_id") == actor_id:
-                            pending.append(run)
-            if cursor == 0:
-                break
+        for pr in pending_db:
+            run_id = pr.get("run_id")
+            # Skip if already in completed
+            if any(r.get("run_id") == run_id for r in completed_runs):
+                continue
+
+            # Get live status
+            run = await self.get_run(run_id)
+            if run and run.get("status") in ("pending", "running"):
+                pending.append(run)
 
         # Combine and sort
-        all_runs = pending + runs
+        all_runs = pending + completed_runs
         all_runs.sort(key=lambda r: r.get("created_at", ""), reverse=True)
 
         return all_runs[offset:offset + limit]
@@ -318,7 +373,18 @@ class RunService:
             return None, "Legacy mode only supports single-input recipes. Use use_dag=true for multi-input."
task = render_effect.delay(input_list[0], recipe, output_name) - # Store task mapping in Redis (ephemeral) - includes metadata for list display + # Store pending run in database for durability + await self.db.create_pending_run( + run_id=run_id, + celery_task_id=task.id, + recipe=recipe, + inputs=input_list, + actor_id=actor_id, + dag_json=dag_json, + output_name=output_name, + ) + + # Also store in Redis for backwards compatibility (shorter TTL) task_data = json.dumps({ "task_id": task.id, "actor_id": actor_id, @@ -329,7 +395,7 @@ class RunService: }) self.redis.setex( f"{self.task_key_prefix}{run_id}", - 3600 * 24, # 24 hour TTL + 3600 * 4, # 4 hour TTL (database is primary now) task_data ) @@ -459,3 +525,123 @@ class RunService: def detect_media_type(self, path: Path) -> str: """Detect media type for a file path.""" return detect_media_type(path) + + async def recover_pending_runs(self) -> Dict[str, int]: + """ + Recover pending runs after restart. + + Checks all pending runs in the database and: + - Updates status for completed tasks + - Re-queues orphaned tasks that can be retried + - Marks as failed if unrecoverable + + Returns counts of recovered, completed, failed runs. 
+ """ + from celery.result import AsyncResult + from celery_app import app as celery_app + + try: + from legacy_tasks import execute_dag + except ImportError: + return {"error": "Celery tasks not available"} + + stats = {"recovered": 0, "completed": 0, "failed": 0, "still_running": 0} + + # Get all pending/running runs from database + pending_runs = await self.db.list_pending_runs() + + for run in pending_runs: + run_id = run.get("run_id") + task_id = run.get("celery_task_id") + status = run.get("status") + + if not task_id: + # No task ID - try to re-queue if we have dag_json + dag_json = run.get("dag_json") + if dag_json: + try: + new_task = execute_dag.delay(dag_json, run_id) + await self.db.create_pending_run( + run_id=run_id, + celery_task_id=new_task.id, + recipe=run.get("recipe", "unknown"), + inputs=run.get("inputs", []), + actor_id=run.get("actor_id"), + dag_json=dag_json, + output_name=run.get("output_name"), + ) + stats["recovered"] += 1 + except Exception as e: + await self.db.update_pending_run_status( + run_id, "failed", f"Recovery failed: {e}" + ) + stats["failed"] += 1 + else: + await self.db.update_pending_run_status( + run_id, "failed", "No DAG data for recovery" + ) + stats["failed"] += 1 + continue + + # Check Celery task state + result = AsyncResult(task_id, app=celery_app) + celery_status = result.status.lower() + + if result.ready(): + if result.successful(): + # Task completed - move to run_cache + task_result = result.result + if isinstance(task_result, dict) and task_result.get("output_hash"): + await self.db.save_run_cache( + run_id=run_id, + output_hash=task_result["output_hash"], + recipe=run.get("recipe", "unknown"), + inputs=run.get("inputs", []), + ipfs_cid=task_result.get("ipfs_cid"), + provenance_cid=task_result.get("provenance_cid"), + actor_id=run.get("actor_id"), + ) + await self.db.complete_pending_run(run_id) + stats["completed"] += 1 + else: + await self.db.update_pending_run_status( + run_id, "failed", "Task completed but no 
output hash" + ) + stats["failed"] += 1 + else: + # Task failed + await self.db.update_pending_run_status( + run_id, "failed", str(result.result) + ) + stats["failed"] += 1 + elif celery_status in ("pending", "started", "retry"): + # Still running + stats["still_running"] += 1 + else: + # Unknown state - try to re-queue if we have dag_json + dag_json = run.get("dag_json") + if dag_json: + try: + new_task = execute_dag.delay(dag_json, run_id) + await self.db.create_pending_run( + run_id=run_id, + celery_task_id=new_task.id, + recipe=run.get("recipe", "unknown"), + inputs=run.get("inputs", []), + actor_id=run.get("actor_id"), + dag_json=dag_json, + output_name=run.get("output_name"), + ) + stats["recovered"] += 1 + except Exception as e: + await self.db.update_pending_run_status( + run_id, "failed", f"Recovery failed: {e}" + ) + stats["failed"] += 1 + else: + await self.db.update_pending_run_status( + run_id, "failed", f"Task in unknown state: {celery_status}" + ) + stats["failed"] += 1 + + return stats diff --git a/celery_app.py b/celery_app.py index 745866d..062859b 100644 --- a/celery_app.py +++ b/celery_app.py @@ -18,7 +18,7 @@ app = Celery( ) app.conf.update( - result_expires=3600, + result_expires=86400 * 7, # 7 days - allow time for recovery after restarts task_serializer='json', accept_content=['json', 'pickle'], # pickle needed for internal Celery messages result_serializer='json', @@ -26,8 +26,10 @@ app.conf.update( timezone='UTC', enable_utc=True, task_track_started=True, - task_acks_late=True, + task_acks_late=True, # Don't ack until task completes - survives worker restart worker_prefetch_multiplier=1, + task_reject_on_worker_lost=True, # Re-queue if worker dies + task_acks_on_failure_or_timeout=True, # Ack failed tasks so they don't retry forever ) if __name__ == '__main__': diff --git a/database.py b/database.py index 0a456d7..a0af580 100644 --- a/database.py +++ b/database.py @@ -91,6 +91,25 @@ CREATE TABLE IF NOT EXISTS run_cache ( created_at 
TIMESTAMP WITH TIME ZONE DEFAULT NOW() ); +-- Pending/running runs: tracks in-progress work for durability +-- Allows runs to survive restarts and be recovered +CREATE TABLE IF NOT EXISTS pending_runs ( + run_id VARCHAR(64) PRIMARY KEY, + celery_task_id VARCHAR(128), + status VARCHAR(20) NOT NULL DEFAULT 'pending', -- pending, running, failed + recipe VARCHAR(255) NOT NULL, + inputs JSONB NOT NULL, + dag_json TEXT, + output_name VARCHAR(255), + actor_id VARCHAR(255), + error TEXT, + created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(), + updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW() +); + +CREATE INDEX IF NOT EXISTS idx_pending_runs_status ON pending_runs(status); +CREATE INDEX IF NOT EXISTS idx_pending_runs_actor ON pending_runs(actor_id); + -- User storage backends (synced from L2 or configured locally) CREATE TABLE IF NOT EXISTS storage_backends ( id SERIAL PRIMARY KEY, @@ -1357,3 +1376,170 @@ async def get_pins_for_content(content_hash: str) -> List[dict]: content_hash ) return [dict(row) for row in rows] + + +# ============ Pending Runs ============ + +async def create_pending_run( + run_id: str, + celery_task_id: str, + recipe: str, + inputs: List[str], + actor_id: str, + dag_json: Optional[str] = None, + output_name: Optional[str] = None, +) -> dict: + """Create a pending run record for durability.""" + async with pool.acquire() as conn: + row = await conn.fetchrow( + """ + INSERT INTO pending_runs (run_id, celery_task_id, status, recipe, inputs, dag_json, output_name, actor_id) + VALUES ($1, $2, 'running', $3, $4, $5, $6, $7) + ON CONFLICT (run_id) DO UPDATE SET + celery_task_id = EXCLUDED.celery_task_id, + status = 'running', + updated_at = NOW() + RETURNING run_id, celery_task_id, status, recipe, inputs, dag_json, output_name, actor_id, created_at, updated_at + """, + run_id, celery_task_id, recipe, _json.dumps(inputs), dag_json, output_name, actor_id + ) + return { + "run_id": row["run_id"], + "celery_task_id": row["celery_task_id"], + "status": 
row["status"],
+        "recipe": row["recipe"],
+        "inputs": row["inputs"],
+        "dag_json": row["dag_json"],
+        "output_name": row["output_name"],
+        "actor_id": row["actor_id"],
+        "created_at": row["created_at"].isoformat() if row["created_at"] else None,
+        "updated_at": row["updated_at"].isoformat() if row["updated_at"] else None,
+    }
+
+
+async def get_pending_run(run_id: str) -> Optional[dict]:
+    """Get a pending run by ID."""
+    async with pool.acquire() as conn:
+        row = await conn.fetchrow(
+            """
+            SELECT run_id, celery_task_id, status, recipe, inputs, dag_json, output_name, actor_id, error, created_at, updated_at
+            FROM pending_runs WHERE run_id = $1
+            """,
+            run_id
+        )
+        if row:
+            return {
+                "run_id": row["run_id"],
+                "celery_task_id": row["celery_task_id"],
+                "status": row["status"],
+                "recipe": row["recipe"],
+                "inputs": row["inputs"],
+                "dag_json": row["dag_json"],
+                "output_name": row["output_name"],
+                "actor_id": row["actor_id"],
+                "error": row["error"],
+                "created_at": row["created_at"].isoformat() if row["created_at"] else None,
+                "updated_at": row["updated_at"].isoformat() if row["updated_at"] else None,
+            }
+        return None
+
+
+async def list_pending_runs(actor_id: Optional[str] = None, status: Optional[str] = None) -> List[dict]:
+    """List pending runs, optionally filtered by actor and/or status."""
+    async with pool.acquire() as conn:
+        conditions = []
+        params = []
+        param_idx = 1
+
+        if actor_id:
+            conditions.append(f"actor_id = ${param_idx}")
+            params.append(actor_id)
+            param_idx += 1
+
+        if status:
+            conditions.append(f"status = ${param_idx}")
+            params.append(status)
+            param_idx += 1
+
+        where_clause = " AND ".join(conditions) if conditions else "TRUE"
+
+        rows = await conn.fetch(
+            f"""
+            SELECT run_id, celery_task_id, status, recipe, inputs, dag_json, output_name, actor_id, error, created_at, updated_at
+            FROM pending_runs
+            WHERE {where_clause}
+            ORDER BY created_at DESC
+            """,
+            *params
+        )
+        return [
+            {
+                "run_id": row["run_id"], "dag_json": row["dag_json"],
+                "celery_task_id": row["celery_task_id"],
+                "status": row["status"],
+                "recipe": row["recipe"],
+                "inputs": row["inputs"],
+                "output_name": row["output_name"],
+                "actor_id": row["actor_id"],
+                "error": row["error"],
+                "created_at": row["created_at"].isoformat() if row["created_at"] else None,
+                "updated_at": row["updated_at"].isoformat() if row["updated_at"] else None,
+            }
+            for row in rows
+        ]
+
+
+async def update_pending_run_status(run_id: str, status: str, error: Optional[str] = None) -> bool:
+    """Update the status of a pending run."""
+    async with pool.acquire() as conn:
+        if error:
+            result = await conn.execute(
+                "UPDATE pending_runs SET status = $2, error = $3, updated_at = NOW() WHERE run_id = $1",
+                run_id, status, error
+            )
+        else:
+            result = await conn.execute(
+                "UPDATE pending_runs SET status = $2, updated_at = NOW() WHERE run_id = $1",
+                run_id, status
+            )
+        return "UPDATE 1" in result
+
+
+async def complete_pending_run(run_id: str) -> bool:
+    """Remove a pending run after it completes (moves to run_cache)."""
+    async with pool.acquire() as conn:
+        result = await conn.execute(
+            "DELETE FROM pending_runs WHERE run_id = $1",
+            run_id
+        )
+        return "DELETE 1" in result
+
+
+async def get_stale_pending_runs(older_than_hours: int = 24) -> List[dict]:
+    """Get pending runs that haven't been updated recently (for recovery)."""
+    async with pool.acquire() as conn:
+        rows = await conn.fetch(
+            """
+            SELECT run_id, celery_task_id, status, recipe, inputs, dag_json, output_name, actor_id, created_at, updated_at
+            FROM pending_runs
+            WHERE status IN ('pending', 'running')
+              AND updated_at < NOW() - make_interval(hours => $1)
+            ORDER BY created_at
+            """,
+            older_than_hours
+        )
+        return [
+            {
+                "run_id": row["run_id"],
+                "celery_task_id": row["celery_task_id"],
+                "status": row["status"],
+                "recipe": row["recipe"],
+                "inputs": row["inputs"],
+                "dag_json": row["dag_json"],
+                "output_name": row["output_name"],
+                "actor_id": row["actor_id"],
+                "created_at": row["created_at"].isoformat() if
row["created_at"] else None, + "updated_at": row["updated_at"].isoformat() if row["updated_at"] else None, + } + for row in rows + ]