Fix running runs not appearing in UI list
- Normalize Celery status names (started -> running) - Store full run metadata in Redis for pending runs (recipe, inputs, actor_id) - Filter pending runs by actor_id so users only see their own - Parse both old and new Redis task data formats for compatibility Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
@@ -102,10 +102,28 @@ class RunService:
|
||||
}
|
||||
|
||||
# Check if there's a running task
|
||||
task_id = self.redis.get(f"{self.task_key_prefix}{run_id}")
|
||||
if task_id:
|
||||
if isinstance(task_id, bytes):
|
||||
task_id = task_id.decode()
|
||||
task_data = self.redis.get(f"{self.task_key_prefix}{run_id}")
|
||||
if task_data:
|
||||
if isinstance(task_data, bytes):
|
||||
task_data = task_data.decode()
|
||||
|
||||
# Parse task data (supports both old format string and new JSON format)
|
||||
try:
|
||||
parsed = json.loads(task_data)
|
||||
task_id = parsed.get("task_id")
|
||||
task_actor_id = parsed.get("actor_id")
|
||||
task_recipe = parsed.get("recipe")
|
||||
task_inputs = parsed.get("inputs")
|
||||
task_output_name = parsed.get("output_name")
|
||||
task_created_at = parsed.get("created_at")
|
||||
except json.JSONDecodeError:
|
||||
# Old format: just the task_id string
|
||||
task_id = task_data
|
||||
task_actor_id = None
|
||||
task_recipe = None
|
||||
task_inputs = None
|
||||
task_output_name = None
|
||||
task_created_at = None
|
||||
|
||||
# Get task state from Celery
|
||||
from celery.result import AsyncResult
|
||||
@@ -114,10 +132,26 @@ class RunService:
|
||||
result = AsyncResult(task_id, app=celery_app)
|
||||
status = result.status.lower()
|
||||
|
||||
# Normalize Celery status names
|
||||
status_map = {
|
||||
"pending": "pending",
|
||||
"started": "running",
|
||||
"success": "completed",
|
||||
"failure": "failed",
|
||||
"retry": "running",
|
||||
"revoked": "failed",
|
||||
}
|
||||
normalized_status = status_map.get(status, status)
|
||||
|
||||
run_data = {
|
||||
"run_id": run_id,
|
||||
"status": status if status != "pending" else "pending",
|
||||
"status": normalized_status,
|
||||
"celery_task_id": task_id,
|
||||
"actor_id": task_actor_id,
|
||||
"recipe": task_recipe,
|
||||
"inputs": task_inputs,
|
||||
"output_name": task_output_name,
|
||||
"created_at": task_created_at,
|
||||
}
|
||||
|
||||
# If task completed, get result
|
||||
@@ -151,11 +185,13 @@ class RunService:
|
||||
)
|
||||
for key in keys:
|
||||
run_id = key.decode().replace(self.task_key_prefix, "") if isinstance(key, bytes) else key.replace(self.task_key_prefix, "")
|
||||
# Check if this run belongs to the user and isn't already in results
|
||||
# Check if this run isn't already in completed results
|
||||
if not any(r.get("run_id") == run_id for r in runs):
|
||||
run = await self.get_run(run_id)
|
||||
if run and run.get("status") in ("pending", "running"):
|
||||
pending.append(run)
|
||||
# Filter by actor_id
|
||||
if run.get("actor_id") == actor_id:
|
||||
pending.append(run)
|
||||
if cursor == 0:
|
||||
break
|
||||
|
||||
@@ -276,11 +312,19 @@ class RunService:
|
||||
return None, "Legacy mode only supports single-input recipes. Use use_dag=true for multi-input."
|
||||
task = render_effect.delay(input_list[0], recipe, output_name)
|
||||
|
||||
# Store task_id mapping in Redis (ephemeral)
|
||||
# Store task mapping in Redis (ephemeral) - includes metadata for list display
|
||||
task_data = json.dumps({
|
||||
"task_id": task.id,
|
||||
"actor_id": actor_id,
|
||||
"recipe": recipe,
|
||||
"inputs": input_list,
|
||||
"output_name": output_name,
|
||||
"created_at": datetime.now(timezone.utc).isoformat(),
|
||||
})
|
||||
self.redis.setex(
|
||||
f"{self.task_key_prefix}{run_id}",
|
||||
3600 * 24, # 24 hour TTL
|
||||
task.id
|
||||
task_data
|
||||
)
|
||||
|
||||
return {
|
||||
|
||||
Reference in New Issue
Block a user