From eb1de433b1229b8d20ac8500fd8dc267dd97e171 Mon Sep 17 00:00:00 2001 From: gilesb Date: Sun, 11 Jan 2026 21:43:44 +0000 Subject: [PATCH] Get actor_id from pending_runs when saving completed run When a DAG task completes, look up actor_id from pending_runs (where it was saved when the run started) and include it in run_cache. Also clean up pending_runs entry after completion. Co-Authored-By: Claude Opus 4.5 --- legacy_tasks.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/legacy_tasks.py b/legacy_tasks.py index 72405f0..fe26f42 100644 --- a/legacy_tasks.py +++ b/legacy_tasks.py @@ -383,13 +383,23 @@ def execute_dag(self, dag_json: str, run_id: str = None) -> dict: if (node.node_type == NodeType.SOURCE or str(node.node_type) == "SOURCE") and node.config.get("content_hash") ] + # Get actor_id from pending_runs (saved when run started) + actor_id = None + pending = await database.get_pending_run(run_id) + if pending: + actor_id = pending.get("actor_id") + await database.save_run_cache( run_id=run_id, output_hash=output_hash, recipe="dag", inputs=input_hashes_for_db, ipfs_cid=output_ipfs_cid, + actor_id=actor_id, ) + # Clean up pending run + if pending: + await database.complete_pending_run(run_id) try: loop = asyncio.get_event_loop()