From 449ed0c100894b3379b6e8b6d10eaed3ea2f63f1 Mon Sep 17 00:00:00 2001 From: gilesb Date: Fri, 9 Jan 2026 01:54:17 +0000 Subject: [PATCH] Add comprehensive logging to publishing endpoints Added logging throughout record_run, publish_cache, and _register_asset_impl: - Log start of each operation with key parameters - Log L1 fetch operations and their results - Log IPFS pin operations - Log DB transaction start and completion - Log errors with context - Log successful completion This makes it easier to diagnose timeout and other issues. Co-Authored-By: Claude Opus 4.5 --- server.py | 49 ++++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 46 insertions(+), 3 deletions(-) diff --git a/server.py b/server.py index 7832b0b..67c1642 100644 --- a/server.py +++ b/server.py @@ -11,6 +11,7 @@ Manages ownership registry, activities, and federation. import hashlib import json +import logging import os import uuid from contextlib import asynccontextmanager @@ -19,6 +20,13 @@ from pathlib import Path from typing import Optional from urllib.parse import urlparse +# Configure logging +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s %(levelname)s %(name)s: %(message)s' +) +logger = logging.getLogger(__name__) + from fastapi import FastAPI, HTTPException, Request, Response, Depends, Cookie from fastapi.responses import JSONResponse, HTMLResponse, RedirectResponse from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials @@ -1668,6 +1676,8 @@ async def _register_asset_impl(req: RegisterRequest, owner: str): import ipfs_client from ipfs_client import IPFSError + logger.info(f"register_asset: Starting for {req.name} (hash={req.content_hash[:16]}...)") + # ===== PHASE 1: VALIDATION ===== # IPFS CID is required if not req.ipfs_cid: @@ -1679,9 +1689,12 @@ async def _register_asset_impl(req: RegisterRequest, owner: str): # ===== PHASE 2: IPFS OPERATIONS (non-blocking) ===== import asyncio + logger.info(f"register_asset: Pinning CID {req.ipfs_cid[:16]}... on IPFS") try: await asyncio.to_thread(ipfs_client.pin_or_raise, req.ipfs_cid) + logger.info("register_asset: CID pinned successfully") except IPFSError as e: + logger.error(f"register_asset: IPFS pin failed: {e}") raise HTTPException(500, f"IPFS operation failed: {e}") # ===== PHASE 3: DB TRANSACTION ===== @@ -1739,9 +1752,10 @@ async def _register_asset_impl(req: RegisterRequest, owner: str): except HTTPException: raise except Exception as e: - logger.error(f"Database transaction failed: {e}") + logger.error(f"register_asset: Database transaction failed: {e}") raise HTTPException(500, f"Failed to register asset: {e}") + logger.info(f"register_asset: Successfully registered {req.name}") return {"asset": created_asset, "activity": created_activity} @@ -1769,13 +1783,17 @@ async def record_run(req: RecordRunRequest, user: User = Depends(get_required_us import asyncio l1_url = req.l1_server.rstrip('/') + logger.info(f"record_run: Starting for run_id={req.run_id} from {l1_url}") + # Helper to fetch from L1 without blocking event loop def fetch_l1_run(): + logger.info(f"record_run: Fetching run from L1: {l1_url}/runs/{req.run_id}") resp = requests.get(f"{l1_url}/runs/{req.run_id}", timeout=30) resp.raise_for_status() return resp.json() def fetch_l1_cache(content_hash): + logger.debug(f"record_run: Fetching cache {content_hash[:16]}... from L1") resp = requests.get( f"{l1_url}/cache/{content_hash}", headers={"Accept": "application/json"}, @@ -1787,7 +1805,9 @@ async def record_run(req: RecordRunRequest, user: User = Depends(get_required_us # Fetch run from L1 try: run = await asyncio.to_thread(fetch_l1_run) + logger.info(f"record_run: Fetched run, status={run.get('status')}, inputs={len(run.get('inputs', []))}") except Exception as e: + logger.error(f"record_run: Failed to fetch run from L1: {e}") raise HTTPException(400, f"Failed to fetch run from L1 ({l1_url}): {e}") if run.get("status") != "completed": @@ -1798,24 +1818,30 @@ async def record_run(req: RecordRunRequest, user: User = Depends(get_required_us raise HTTPException(400, "Run has no output hash") # Fetch output cache info from L1 (must exist - it's new) + logger.info(f"record_run: Fetching output cache {output_hash[:16]}... from L1") try: cache_info = await asyncio.to_thread(fetch_l1_cache, output_hash) output_media_type = cache_info.get("media_type", "image") output_ipfs_cid = cache_info.get("ipfs_cid") + logger.info(f"record_run: Output has IPFS CID: {output_ipfs_cid[:16] if output_ipfs_cid else 'None'}...") except Exception as e: + logger.error(f"record_run: Failed to fetch output cache info: {e}") raise HTTPException(400, f"Failed to fetch output cache info: {e}") if not output_ipfs_cid: + logger.error("record_run: Output has no IPFS CID") raise HTTPException(400, "Output has no IPFS CID - cannot publish") # Gather input info: check L2 first, then fall back to L1 input_hashes = run.get("inputs", []) input_infos = [] # List of {content_hash, ipfs_cid, media_type, existing_asset} + logger.info(f"record_run: Gathering info for {len(input_hashes)} inputs") for input_hash in input_hashes: # Check if already on L2 existing = await db.get_asset_by_hash(input_hash) if existing and existing.get("ipfs_cid"): + logger.info(f"record_run: Input {input_hash[:16]}... found on L2") input_infos.append({ "content_hash": input_hash, "ipfs_cid": existing["ipfs_cid"], @@ -1824,10 +1850,12 @@ async def record_run(req: RecordRunRequest, user: User = Depends(get_required_us }) else: # Not on L2, try L1 + logger.info(f"record_run: Input {input_hash[:16]}... not on L2, fetching from L1") try: inp_info = await asyncio.to_thread(fetch_l1_cache, input_hash) ipfs_cid = inp_info.get("ipfs_cid") if not ipfs_cid: + logger.error(f"record_run: Input {input_hash[:16]}... has no IPFS CID") raise HTTPException(400, f"Input {input_hash[:16]}... has no IPFS CID (not on L2 or L1)") input_infos.append({ "content_hash": input_hash, @@ -1838,6 +1866,7 @@ async def record_run(req: RecordRunRequest, user: User = Depends(get_required_us except HTTPException: raise except Exception as e: + logger.error(f"record_run: Failed to fetch input {input_hash[:16]}... from L1: {e}") raise HTTPException(400, f"Input {input_hash[:16]}... not on L2 and failed to fetch from L1: {e}") # Prepare recipe data @@ -1856,23 +1885,29 @@ async def record_run(req: RecordRunRequest, user: User = Depends(get_required_us # Collect all CIDs to pin (inputs + output) cids_to_pin = [inp["ipfs_cid"] for inp in input_infos] + [output_ipfs_cid] + logger.info(f"record_run: Pinning {len(cids_to_pin)} CIDs on IPFS") # Pin all in parallel with ThreadPoolExecutor(max_workers=5) as executor: futures = {executor.submit(ipfs_client.pin_or_raise, cid): cid for cid in cids_to_pin} for future in as_completed(futures): future.result() # Raises IPFSError if failed + logger.info("record_run: All CIDs pinned successfully") # Store recipe on IPFS and return CID + logger.info("record_run: Storing recipe on IPFS") return ipfs_client.add_json(recipe_data) try: import asyncio recipe_cid = await asyncio.to_thread(do_ipfs_operations) + logger.info(f"record_run: Recipe stored on IPFS: {recipe_cid[:16]}...") except IPFSError as e: + logger.error(f"record_run: IPFS operation failed: {e}") raise HTTPException(500, f"IPFS operation failed: {e}") # ===== PHASE 3: DB TRANSACTION (all-or-nothing) ===== + logger.info("record_run: Starting DB transaction") now = datetime.now(timezone.utc).isoformat() try: @@ -1964,9 +1999,10 @@ async def record_run(req: RecordRunRequest, user: User = Depends(get_required_us except HTTPException: raise except Exception as e: - logger.error(f"Database transaction failed: {e}") + logger.error(f"record_run: Database transaction failed: {e}") raise HTTPException(500, f"Failed to record run: {e}") + logger.info(f"record_run: Successfully published {req.output_name} with {len(registered_inputs)} inputs") return {"asset": created_asset, "activity": created_activity} @@ -1982,6 +2018,8 @@ async def publish_cache(req: PublishCacheRequest, user: User = Depends(get_requi import ipfs_client from ipfs_client import IPFSError + logger.info(f"publish_cache: Starting for {req.asset_name} (hash={req.content_hash[:16]}...)") + # ===== PHASE 1: VALIDATION ===== # Validate origin if not req.origin or "type" not in req.origin: @@ -2004,12 +2042,16 @@ async def publish_cache(req: PublishCacheRequest, user: User = Depends(get_requi # ===== PHASE 2: IPFS OPERATIONS (non-blocking) ===== import asyncio + logger.info(f"publish_cache: Pinning CID {req.ipfs_cid[:16]}... on IPFS") try: await asyncio.to_thread(ipfs_client.pin_or_raise, req.ipfs_cid) + logger.info("publish_cache: CID pinned successfully") except IPFSError as e: + logger.error(f"publish_cache: IPFS pin failed: {e}") raise HTTPException(500, f"IPFS operation failed: {e}") # ===== PHASE 3: DB TRANSACTION ===== + logger.info("publish_cache: Starting DB transaction") now = datetime.now(timezone.utc).isoformat() try: @@ -2078,9 +2120,10 @@ async def publish_cache(req: PublishCacheRequest, user: User = Depends(get_requi except HTTPException: raise except Exception as e: - logger.error(f"Database transaction failed: {e}") + logger.error(f"publish_cache: Database transaction failed: {e}") raise HTTPException(500, f"Failed to publish cache item: {e}") + logger.info(f"publish_cache: Successfully published {req.asset_name}") return {"asset": created_asset, "activity": created_activity}