diff --git a/server.py b/server.py index 32bfd4e..7832b0b 100644 --- a/server.py +++ b/server.py @@ -1677,9 +1677,10 @@ async def _register_asset_impl(req: RegisterRequest, owner: str): if await db.asset_exists(req.name): raise HTTPException(400, f"Asset already exists: {req.name}") - # ===== PHASE 2: IPFS OPERATIONS ===== + # ===== PHASE 2: IPFS OPERATIONS (non-blocking) ===== + import asyncio try: - ipfs_client.pin_or_raise(req.ipfs_cid) + await asyncio.to_thread(ipfs_client.pin_or_raise, req.ipfs_cid) except IPFSError as e: raise HTTPException(500, f"IPFS operation failed: {e}") @@ -1764,14 +1765,28 @@ async def record_run(req: RecordRunRequest, user: User = Depends(get_required_us import ipfs_client from ipfs_client import IPFSError - # ===== PHASE 1: PREPARATION (read-only) ===== + # ===== PHASE 1: PREPARATION (read-only, non-blocking) ===== + import asyncio l1_url = req.l1_server.rstrip('/') + # Helper to fetch from L1 without blocking event loop + def fetch_l1_run(): + resp = requests.get(f"{l1_url}/runs/{req.run_id}", timeout=30) + resp.raise_for_status() + return resp.json() + + def fetch_l1_cache(content_hash): + resp = requests.get( + f"{l1_url}/cache/{content_hash}", + headers={"Accept": "application/json"}, + timeout=10 + ) + resp.raise_for_status() + return resp.json() + # Fetch run from L1 try: - resp = requests.get(f"{l1_url}/runs/{req.run_id}", timeout=30) - resp.raise_for_status() - run = resp.json() + run = await asyncio.to_thread(fetch_l1_run) except Exception as e: raise HTTPException(400, f"Failed to fetch run from L1 ({l1_url}): {e}") @@ -1784,13 +1799,7 @@ async def record_run(req: RecordRunRequest, user: User = Depends(get_required_us # Fetch output cache info from L1 (must exist - it's new) try: - cache_resp = requests.get( - f"{l1_url}/cache/{output_hash}", - headers={"Accept": "application/json"}, - timeout=10 - ) - cache_resp.raise_for_status() - cache_info = cache_resp.json() + cache_info = await asyncio.to_thread(fetch_l1_cache, output_hash) output_media_type = cache_info.get("media_type", "image") output_ipfs_cid = cache_info.get("ipfs_cid") except Exception as e: @@ -1816,13 +1825,7 @@ async def record_run(req: RecordRunRequest, user: User = Depends(get_required_us else: # Not on L2, try L1 try: - inp_resp = requests.get( - f"{l1_url}/cache/{input_hash}", - headers={"Accept": "application/json"}, - timeout=10 - ) - inp_resp.raise_for_status() - inp_info = inp_resp.json() + inp_info = await asyncio.to_thread(fetch_l1_cache, input_hash) ipfs_cid = inp_info.get("ipfs_cid") if not ipfs_cid: raise HTTPException(400, f"Input {input_hash[:16]}... has no IPFS CID (not on L2 or L1)") @@ -1846,8 +1849,9 @@ async def record_run(req: RecordRunRequest, user: User = Depends(get_required_us "effects_commit": run.get("effects_commit"), } - # ===== PHASE 2: IPFS OPERATIONS (blocking, before any DB changes) ===== - try: + # ===== PHASE 2: IPFS OPERATIONS (non-blocking for event loop) ===== + def do_ipfs_operations(): + """Run IPFS operations in thread pool to not block event loop.""" from concurrent.futures import ThreadPoolExecutor, as_completed # Collect all CIDs to pin (inputs + output) @@ -1857,12 +1861,14 @@ async def record_run(req: RecordRunRequest, user: User = Depends(get_required_us with ThreadPoolExecutor(max_workers=5) as executor: futures = {executor.submit(ipfs_client.pin_or_raise, cid): cid for cid in cids_to_pin} for future in as_completed(futures): - cid = futures[future] future.result() # Raises IPFSError if failed - # Store recipe on IPFS - recipe_cid = ipfs_client.add_json(recipe_data) + # Store recipe on IPFS and return CID + return ipfs_client.add_json(recipe_data) + try: + import asyncio + recipe_cid = await asyncio.to_thread(do_ipfs_operations) except IPFSError as e: raise HTTPException(500, f"IPFS operation failed: {e}") @@ -1996,9 +2002,10 @@ async def publish_cache(req: PublishCacheRequest, user: User = Depends(get_requi if await db.asset_exists(req.asset_name): raise HTTPException(400, f"Asset name already exists: {req.asset_name}") - # ===== PHASE 2: IPFS OPERATIONS ===== + # ===== PHASE 2: IPFS OPERATIONS (non-blocking) ===== + import asyncio try: - ipfs_client.pin_or_raise(req.ipfs_cid) + await asyncio.to_thread(ipfs_client.pin_or_raise, req.ipfs_cid) except IPFSError as e: raise HTTPException(500, f"IPFS operation failed: {e}") @@ -2305,6 +2312,9 @@ async def get_anchor_endpoint(merkle_root: str): @app.get("/anchors/{merkle_root}/tree") async def get_anchor_tree(merkle_root: str): """Get the full merkle tree from IPFS.""" + import asyncio + import ipfs_client + anchor = await db.get_anchor(merkle_root) if not anchor: raise HTTPException(404, f"Anchor not found: {merkle_root}") @@ -2313,9 +2323,8 @@ async def get_anchor_tree(merkle_root: str): if not tree_cid: raise HTTPException(404, "Anchor has no tree on IPFS") - import ipfs_client try: - tree_bytes = ipfs_client.get_bytes(tree_cid) + tree_bytes = await asyncio.to_thread(ipfs_client.get_bytes, tree_cid) if tree_bytes: return json.loads(tree_bytes) except Exception as e: @@ -2346,13 +2355,14 @@ async def verify_activity_anchor(activity_id: str): if not anchor: return {"verified": False, "reason": "Anchor record not found"} - # Get tree from IPFS + # Get tree from IPFS (non-blocking) + import asyncio tree_cid = anchor.get("tree_ipfs_cid") if not tree_cid: return {"verified": False, "reason": "Merkle tree not on IPFS"} try: - tree_bytes = ipfs_client.get_bytes(tree_cid) + tree_bytes = await asyncio.to_thread(ipfs_client.get_bytes, tree_cid) tree = json.loads(tree_bytes) if tree_bytes else None except Exception: return {"verified": False, "reason": "Failed to fetch tree from IPFS"}