diff --git a/app/services/run_service.py b/app/services/run_service.py index 7a34635..9b514e7 100644 --- a/app/services/run_service.py +++ b/app/services/run_service.py @@ -102,10 +102,28 @@ class RunService: } # Check if there's a running task - task_id = self.redis.get(f"{self.task_key_prefix}{run_id}") - if task_id: - if isinstance(task_id, bytes): - task_id = task_id.decode() + task_data = self.redis.get(f"{self.task_key_prefix}{run_id}") + if task_data: + if isinstance(task_data, bytes): + task_data = task_data.decode() + + # Parse task data (supports both old format string and new JSON format) + try: + parsed = json.loads(task_data) + task_id = parsed.get("task_id") + task_actor_id = parsed.get("actor_id") + task_recipe = parsed.get("recipe") + task_inputs = parsed.get("inputs") + task_output_name = parsed.get("output_name") + task_created_at = parsed.get("created_at") + except json.JSONDecodeError: + # Old format: just the task_id string + task_id = task_data + task_actor_id = None + task_recipe = None + task_inputs = None + task_output_name = None + task_created_at = None # Get task state from Celery from celery.result import AsyncResult @@ -114,10 +132,26 @@ class RunService: result = AsyncResult(task_id, app=celery_app) status = result.status.lower() + # Normalize Celery status names + status_map = { + "pending": "pending", + "started": "running", + "success": "completed", + "failure": "failed", + "retry": "running", + "revoked": "failed", + } + normalized_status = status_map.get(status, status) + run_data = { "run_id": run_id, - "status": status if status != "pending" else "pending", + "status": normalized_status, "celery_task_id": task_id, + "actor_id": task_actor_id, + "recipe": task_recipe, + "inputs": task_inputs, + "output_name": task_output_name, + "created_at": task_created_at, } # If task completed, get result @@ -151,11 +185,13 @@ class RunService: ) for key in keys: run_id = key.decode().replace(self.task_key_prefix, "") if isinstance(key, bytes) else key.replace(self.task_key_prefix, "") - # Check if this run belongs to the user and isn't already in results + # Check if this run isn't already in completed results if not any(r.get("run_id") == run_id for r in runs): run = await self.get_run(run_id) if run and run.get("status") in ("pending", "running"): - pending.append(run) + # Filter by actor_id + if run.get("actor_id") == actor_id: + pending.append(run) if cursor == 0: break @@ -276,11 +312,19 @@ class RunService: return None, "Legacy mode only supports single-input recipes. Use use_dag=true for multi-input." task = render_effect.delay(input_list[0], recipe, output_name) - # Store task_id mapping in Redis (ephemeral) + # Store task mapping in Redis (ephemeral) - includes metadata for list display + task_data = json.dumps({ + "task_id": task.id, + "actor_id": actor_id, + "recipe": recipe, + "inputs": input_list, + "output_name": output_name, + "created_at": datetime.now(timezone.utc).isoformat(), + }) self.redis.setex( f"{self.task_key_prefix}{run_id}", 3600 * 24, # 24 hour TTL - task.id + task_data ) return {