Make all IPFS and L1 operations non-blocking

Wrap synchronous requests.get() and ipfs_client calls in
asyncio.to_thread() to prevent blocking the FastAPI event loop.
This fixes web UI slowness during publishing operations.

- record_run: L1 fetches and IPFS operations now async
- _register_asset_impl: IPFS pin now async
- publish_cache: IPFS pin now async
- get_anchor_tree: IPFS get_bytes now async
- verify_activity_anchor: IPFS get_bytes now async

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Commit: 9ef45e295b
Parent: 7ead2026ef
Author: gilesb
Date: 2026-01-09 01:42:15 +00:00

Diff of the affected server file:

@@ -1677,9 +1677,10 @@ async def _register_asset_impl(req: RegisterRequest, owner: str):
if await db.asset_exists(req.name):
raise HTTPException(400, f"Asset already exists: {req.name}")
# ===== PHASE 2: IPFS OPERATIONS =====
# ===== PHASE 2: IPFS OPERATIONS (non-blocking) =====
import asyncio
try:
ipfs_client.pin_or_raise(req.ipfs_cid)
await asyncio.to_thread(ipfs_client.pin_or_raise, req.ipfs_cid)
except IPFSError as e:
raise HTTPException(500, f"IPFS operation failed: {e}")
@@ -1764,14 +1765,28 @@ async def record_run(req: RecordRunRequest, user: User = Depends(get_required_us
import ipfs_client
from ipfs_client import IPFSError
# ===== PHASE 1: PREPARATION (read-only) =====
# ===== PHASE 1: PREPARATION (read-only, non-blocking) =====
import asyncio
l1_url = req.l1_server.rstrip('/')
# Helper to fetch from L1 without blocking event loop
def fetch_l1_run():
resp = requests.get(f"{l1_url}/runs/{req.run_id}", timeout=30)
resp.raise_for_status()
return resp.json()
def fetch_l1_cache(content_hash):
resp = requests.get(
f"{l1_url}/cache/{content_hash}",
headers={"Accept": "application/json"},
timeout=10
)
resp.raise_for_status()
return resp.json()
# Fetch run from L1
try:
resp = requests.get(f"{l1_url}/runs/{req.run_id}", timeout=30)
resp.raise_for_status()
run = resp.json()
run = await asyncio.to_thread(fetch_l1_run)
except Exception as e:
raise HTTPException(400, f"Failed to fetch run from L1 ({l1_url}): {e}")
@@ -1784,13 +1799,7 @@ async def record_run(req: RecordRunRequest, user: User = Depends(get_required_us
# Fetch output cache info from L1 (must exist - it's new)
try:
cache_resp = requests.get(
f"{l1_url}/cache/{output_hash}",
headers={"Accept": "application/json"},
timeout=10
)
cache_resp.raise_for_status()
cache_info = cache_resp.json()
cache_info = await asyncio.to_thread(fetch_l1_cache, output_hash)
output_media_type = cache_info.get("media_type", "image")
output_ipfs_cid = cache_info.get("ipfs_cid")
except Exception as e:
@@ -1816,13 +1825,7 @@ async def record_run(req: RecordRunRequest, user: User = Depends(get_required_us
else:
# Not on L2, try L1
try:
inp_resp = requests.get(
f"{l1_url}/cache/{input_hash}",
headers={"Accept": "application/json"},
timeout=10
)
inp_resp.raise_for_status()
inp_info = inp_resp.json()
inp_info = await asyncio.to_thread(fetch_l1_cache, input_hash)
ipfs_cid = inp_info.get("ipfs_cid")
if not ipfs_cid:
raise HTTPException(400, f"Input {input_hash[:16]}... has no IPFS CID (not on L2 or L1)")
@@ -1846,8 +1849,9 @@ async def record_run(req: RecordRunRequest, user: User = Depends(get_required_us
"effects_commit": run.get("effects_commit"),
}
# ===== PHASE 2: IPFS OPERATIONS (blocking, before any DB changes) =====
try:
# ===== PHASE 2: IPFS OPERATIONS (non-blocking for event loop) =====
def do_ipfs_operations():
"""Run IPFS operations in thread pool to not block event loop."""
from concurrent.futures import ThreadPoolExecutor, as_completed
# Collect all CIDs to pin (inputs + output)
@@ -1857,12 +1861,14 @@ async def record_run(req: RecordRunRequest, user: User = Depends(get_required_us
with ThreadPoolExecutor(max_workers=5) as executor:
futures = {executor.submit(ipfs_client.pin_or_raise, cid): cid for cid in cids_to_pin}
for future in as_completed(futures):
cid = futures[future]
future.result() # Raises IPFSError if failed
# Store recipe on IPFS
recipe_cid = ipfs_client.add_json(recipe_data)
# Store recipe on IPFS and return CID
return ipfs_client.add_json(recipe_data)
try:
import asyncio
recipe_cid = await asyncio.to_thread(do_ipfs_operations)
except IPFSError as e:
raise HTTPException(500, f"IPFS operation failed: {e}")
@@ -1996,9 +2002,10 @@ async def publish_cache(req: PublishCacheRequest, user: User = Depends(get_requi
if await db.asset_exists(req.asset_name):
raise HTTPException(400, f"Asset name already exists: {req.asset_name}")
# ===== PHASE 2: IPFS OPERATIONS =====
# ===== PHASE 2: IPFS OPERATIONS (non-blocking) =====
import asyncio
try:
ipfs_client.pin_or_raise(req.ipfs_cid)
await asyncio.to_thread(ipfs_client.pin_or_raise, req.ipfs_cid)
except IPFSError as e:
raise HTTPException(500, f"IPFS operation failed: {e}")
@@ -2305,6 +2312,9 @@ async def get_anchor_endpoint(merkle_root: str):
@app.get("/anchors/{merkle_root}/tree")
async def get_anchor_tree(merkle_root: str):
"""Get the full merkle tree from IPFS."""
import asyncio
import ipfs_client
anchor = await db.get_anchor(merkle_root)
if not anchor:
raise HTTPException(404, f"Anchor not found: {merkle_root}")
@@ -2313,9 +2323,8 @@ async def get_anchor_tree(merkle_root: str):
if not tree_cid:
raise HTTPException(404, "Anchor has no tree on IPFS")
import ipfs_client
try:
tree_bytes = ipfs_client.get_bytes(tree_cid)
tree_bytes = await asyncio.to_thread(ipfs_client.get_bytes, tree_cid)
if tree_bytes:
return json.loads(tree_bytes)
except Exception as e:
@@ -2346,13 +2355,14 @@ async def verify_activity_anchor(activity_id: str):
if not anchor:
return {"verified": False, "reason": "Anchor record not found"}
# Get tree from IPFS
# Get tree from IPFS (non-blocking)
import asyncio
tree_cid = anchor.get("tree_ipfs_cid")
if not tree_cid:
return {"verified": False, "reason": "Merkle tree not on IPFS"}
try:
tree_bytes = ipfs_client.get_bytes(tree_cid)
tree_bytes = await asyncio.to_thread(ipfs_client.get_bytes, tree_cid)
tree = json.loads(tree_bytes) if tree_bytes else None
except Exception:
return {"verified": False, "reason": "Failed to fetch tree from IPFS"}