diff --git a/server.py b/server.py index b51c7c7..1818567 100644 --- a/server.py +++ b/server.py @@ -1534,8 +1534,6 @@ async def run_recipe(recipe_id: str, request: RecipeRunRequest, ctx: UserContext # Build DAG from recipe dag = build_dag_from_recipe(yaml_config, request.inputs, recipe) - # Create run - run_id = str(uuid.uuid4()) actor_id = ctx.actor_id # Collect all input hashes @@ -1544,12 +1542,71 @@ async def run_recipe(recipe_id: str, request: RecipeRunRequest, ctx: UserContext if fixed.content_hash: all_inputs.append(fixed.content_hash) + # Compute content-addressable run_id + run_id = compute_run_id(all_inputs, f"recipe:{recipe.name}") + output_name = f"{recipe.name}-{run_id[:8]}" + + # Check L1 cache first + cached_run = await database.get_run_cache(run_id) + if cached_run: + output_hash = cached_run["output_hash"] + if cache_manager.has_content(output_hash): + logger.info(f"run_recipe: Cache hit for run_id={run_id[:16]}...") + return RunStatus( + run_id=run_id, + status="completed", + recipe=f"recipe:{recipe.name}", + inputs=all_inputs, + output_name=output_name, + created_at=cached_run.get("created_at", datetime.now(timezone.utc).isoformat()), + completed_at=cached_run.get("created_at", datetime.now(timezone.utc).isoformat()), + output_hash=output_hash, + username=actor_id, + provenance_cid=cached_run.get("provenance_cid"), + ) + + # Check L2 if not in L1 + l2_server = ctx.l2_server + try: + l2_resp = http_requests.get(f"{l2_server}/assets/by-run-id/{run_id}", timeout=10) + if l2_resp.status_code == 200: + l2_data = l2_resp.json() + output_hash = l2_data.get("output_hash") + ipfs_cid = l2_data.get("ipfs_cid") + if output_hash and ipfs_cid: + logger.info(f"run_recipe: Found on L2, pulling from IPFS") + import ipfs_client + legacy_dir = CACHE_DIR / "legacy" + legacy_dir.mkdir(parents=True, exist_ok=True) + recovery_path = legacy_dir / output_hash + if ipfs_client.get_file(ipfs_cid, str(recovery_path)): + cache_manager._set_content_index(output_hash, output_hash) + cache_manager._set_ipfs_index(output_hash, ipfs_cid) + await database.save_run_cache( + run_id=run_id, output_hash=output_hash, + recipe=f"recipe:{recipe.name}", inputs=all_inputs, + ipfs_cid=ipfs_cid, provenance_cid=l2_data.get("provenance_cid"), + actor_id=actor_id, + ) + return RunStatus( + run_id=run_id, status="completed", + recipe=f"recipe:{recipe.name}", inputs=all_inputs, + output_name=output_name, + created_at=datetime.now(timezone.utc).isoformat(), + completed_at=datetime.now(timezone.utc).isoformat(), + output_hash=output_hash, username=actor_id, + provenance_cid=l2_data.get("provenance_cid"), + ) + except Exception as e: + logger.warning(f"run_recipe: L2 lookup failed: {e}") + + # Not cached - run Celery run = RunStatus( run_id=run_id, status="pending", recipe=f"recipe:{recipe.name}", inputs=all_inputs, - output_name=f"{recipe.name}-{run_id[:8]}", + output_name=output_name, created_at=datetime.now(timezone.utc).isoformat(), username=actor_id ) @@ -3652,6 +3709,7 @@ def render_page(title: str, content: str, actor_id: Optional[str] = None, active Runs Recipes Media + Download Client
@@ -3720,6 +3778,7 @@ def render_ui_html(actor_id: Optional[str] = None, tab: str = "runs") -> str: Runs Recipes Media + Download Client