Add comprehensive logging to publishing endpoints

Added logging throughout record_run, publish_cache, and _register_asset_impl:
- Log start of each operation with key parameters
- Log L1 fetch operations and their results
- Log IPFS pin operations
- Log DB transaction start and completion
- Log errors with context
- Log successful completion

This makes it easier to diagnose timeouts and other issues.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
gilesb
2026-01-09 01:54:17 +00:00
parent 9ef45e295b
commit 449ed0c100

View File

@@ -11,6 +11,7 @@ Manages ownership registry, activities, and federation.
import hashlib
import json
import logging
import os
import uuid
from contextlib import asynccontextmanager
@@ -19,6 +20,13 @@ from pathlib import Path
from typing import Optional
from urllib.parse import urlparse
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s %(levelname)s %(name)s: %(message)s'
)
logger = logging.getLogger(__name__)
from fastapi import FastAPI, HTTPException, Request, Response, Depends, Cookie
from fastapi.responses import JSONResponse, HTMLResponse, RedirectResponse
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
@@ -1668,6 +1676,8 @@ async def _register_asset_impl(req: RegisterRequest, owner: str):
import ipfs_client
from ipfs_client import IPFSError
logger.info(f"register_asset: Starting for {req.name} (hash={req.content_hash[:16]}...)")
# ===== PHASE 1: VALIDATION =====
# IPFS CID is required
if not req.ipfs_cid:
@@ -1679,9 +1689,12 @@ async def _register_asset_impl(req: RegisterRequest, owner: str):
# ===== PHASE 2: IPFS OPERATIONS (non-blocking) =====
import asyncio
logger.info(f"register_asset: Pinning CID {req.ipfs_cid[:16]}... on IPFS")
try:
await asyncio.to_thread(ipfs_client.pin_or_raise, req.ipfs_cid)
logger.info("register_asset: CID pinned successfully")
except IPFSError as e:
logger.error(f"register_asset: IPFS pin failed: {e}")
raise HTTPException(500, f"IPFS operation failed: {e}")
# ===== PHASE 3: DB TRANSACTION =====
@@ -1739,9 +1752,10 @@ async def _register_asset_impl(req: RegisterRequest, owner: str):
except HTTPException:
raise
except Exception as e:
logger.error(f"Database transaction failed: {e}")
logger.error(f"register_asset: Database transaction failed: {e}")
raise HTTPException(500, f"Failed to register asset: {e}")
logger.info(f"register_asset: Successfully registered {req.name}")
return {"asset": created_asset, "activity": created_activity}
@@ -1769,13 +1783,17 @@ async def record_run(req: RecordRunRequest, user: User = Depends(get_required_us
import asyncio
l1_url = req.l1_server.rstrip('/')
logger.info(f"record_run: Starting for run_id={req.run_id} from {l1_url}")
# Helper to fetch from L1 without blocking event loop
def fetch_l1_run():
logger.info(f"record_run: Fetching run from L1: {l1_url}/runs/{req.run_id}")
resp = requests.get(f"{l1_url}/runs/{req.run_id}", timeout=30)
resp.raise_for_status()
return resp.json()
def fetch_l1_cache(content_hash):
logger.debug(f"record_run: Fetching cache {content_hash[:16]}... from L1")
resp = requests.get(
f"{l1_url}/cache/{content_hash}",
headers={"Accept": "application/json"},
@@ -1787,7 +1805,9 @@ async def record_run(req: RecordRunRequest, user: User = Depends(get_required_us
# Fetch run from L1
try:
run = await asyncio.to_thread(fetch_l1_run)
logger.info(f"record_run: Fetched run, status={run.get('status')}, inputs={len(run.get('inputs', []))}")
except Exception as e:
logger.error(f"record_run: Failed to fetch run from L1: {e}")
raise HTTPException(400, f"Failed to fetch run from L1 ({l1_url}): {e}")
if run.get("status") != "completed":
@@ -1798,24 +1818,30 @@ async def record_run(req: RecordRunRequest, user: User = Depends(get_required_us
raise HTTPException(400, "Run has no output hash")
# Fetch output cache info from L1 (must exist - it's new)
logger.info(f"record_run: Fetching output cache {output_hash[:16]}... from L1")
try:
cache_info = await asyncio.to_thread(fetch_l1_cache, output_hash)
output_media_type = cache_info.get("media_type", "image")
output_ipfs_cid = cache_info.get("ipfs_cid")
logger.info(f"record_run: Output has IPFS CID: {output_ipfs_cid[:16] if output_ipfs_cid else 'None'}...")
except Exception as e:
logger.error(f"record_run: Failed to fetch output cache info: {e}")
raise HTTPException(400, f"Failed to fetch output cache info: {e}")
if not output_ipfs_cid:
logger.error("record_run: Output has no IPFS CID")
raise HTTPException(400, "Output has no IPFS CID - cannot publish")
# Gather input info: check L2 first, then fall back to L1
input_hashes = run.get("inputs", [])
input_infos = [] # List of {content_hash, ipfs_cid, media_type, existing_asset}
logger.info(f"record_run: Gathering info for {len(input_hashes)} inputs")
for input_hash in input_hashes:
# Check if already on L2
existing = await db.get_asset_by_hash(input_hash)
if existing and existing.get("ipfs_cid"):
logger.info(f"record_run: Input {input_hash[:16]}... found on L2")
input_infos.append({
"content_hash": input_hash,
"ipfs_cid": existing["ipfs_cid"],
@@ -1824,10 +1850,12 @@ async def record_run(req: RecordRunRequest, user: User = Depends(get_required_us
})
else:
# Not on L2, try L1
logger.info(f"record_run: Input {input_hash[:16]}... not on L2, fetching from L1")
try:
inp_info = await asyncio.to_thread(fetch_l1_cache, input_hash)
ipfs_cid = inp_info.get("ipfs_cid")
if not ipfs_cid:
logger.error(f"record_run: Input {input_hash[:16]}... has no IPFS CID")
raise HTTPException(400, f"Input {input_hash[:16]}... has no IPFS CID (not on L2 or L1)")
input_infos.append({
"content_hash": input_hash,
@@ -1838,6 +1866,7 @@ async def record_run(req: RecordRunRequest, user: User = Depends(get_required_us
except HTTPException:
raise
except Exception as e:
logger.error(f"record_run: Failed to fetch input {input_hash[:16]}... from L1: {e}")
raise HTTPException(400, f"Input {input_hash[:16]}... not on L2 and failed to fetch from L1: {e}")
# Prepare recipe data
@@ -1856,23 +1885,29 @@ async def record_run(req: RecordRunRequest, user: User = Depends(get_required_us
# Collect all CIDs to pin (inputs + output)
cids_to_pin = [inp["ipfs_cid"] for inp in input_infos] + [output_ipfs_cid]
logger.info(f"record_run: Pinning {len(cids_to_pin)} CIDs on IPFS")
# Pin all in parallel
with ThreadPoolExecutor(max_workers=5) as executor:
futures = {executor.submit(ipfs_client.pin_or_raise, cid): cid for cid in cids_to_pin}
for future in as_completed(futures):
future.result() # Raises IPFSError if failed
logger.info("record_run: All CIDs pinned successfully")
# Store recipe on IPFS and return CID
logger.info("record_run: Storing recipe on IPFS")
return ipfs_client.add_json(recipe_data)
try:
import asyncio
recipe_cid = await asyncio.to_thread(do_ipfs_operations)
logger.info(f"record_run: Recipe stored on IPFS: {recipe_cid[:16]}...")
except IPFSError as e:
logger.error(f"record_run: IPFS operation failed: {e}")
raise HTTPException(500, f"IPFS operation failed: {e}")
# ===== PHASE 3: DB TRANSACTION (all-or-nothing) =====
logger.info("record_run: Starting DB transaction")
now = datetime.now(timezone.utc).isoformat()
try:
@@ -1964,9 +1999,10 @@ async def record_run(req: RecordRunRequest, user: User = Depends(get_required_us
except HTTPException:
raise
except Exception as e:
logger.error(f"Database transaction failed: {e}")
logger.error(f"record_run: Database transaction failed: {e}")
raise HTTPException(500, f"Failed to record run: {e}")
logger.info(f"record_run: Successfully published {req.output_name} with {len(registered_inputs)} inputs")
return {"asset": created_asset, "activity": created_activity}
@@ -1982,6 +2018,8 @@ async def publish_cache(req: PublishCacheRequest, user: User = Depends(get_requi
import ipfs_client
from ipfs_client import IPFSError
logger.info(f"publish_cache: Starting for {req.asset_name} (hash={req.content_hash[:16]}...)")
# ===== PHASE 1: VALIDATION =====
# Validate origin
if not req.origin or "type" not in req.origin:
@@ -2004,12 +2042,16 @@ async def publish_cache(req: PublishCacheRequest, user: User = Depends(get_requi
# ===== PHASE 2: IPFS OPERATIONS (non-blocking) =====
import asyncio
logger.info(f"publish_cache: Pinning CID {req.ipfs_cid[:16]}... on IPFS")
try:
await asyncio.to_thread(ipfs_client.pin_or_raise, req.ipfs_cid)
logger.info("publish_cache: CID pinned successfully")
except IPFSError as e:
logger.error(f"publish_cache: IPFS pin failed: {e}")
raise HTTPException(500, f"IPFS operation failed: {e}")
# ===== PHASE 3: DB TRANSACTION =====
logger.info("publish_cache: Starting DB transaction")
now = datetime.now(timezone.utc).isoformat()
try:
@@ -2078,9 +2120,10 @@ async def publish_cache(req: PublishCacheRequest, user: User = Depends(get_requi
except HTTPException:
raise
except Exception as e:
logger.error(f"Database transaction failed: {e}")
logger.error(f"publish_cache: Database transaction failed: {e}")
raise HTTPException(500, f"Failed to publish cache item: {e}")
logger.info(f"publish_cache: Successfully published {req.asset_name}")
return {"asset": created_asset, "activity": created_activity}