Add timing logs to runs and cache endpoints
Log timestamps for each operation in get_run and get_cached to diagnose slowness: - load_run from Redis - Celery AsyncResult and task.ready() - cache_file operations - database queries for cache metadata Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
41
server.py
41
server.py
@@ -11,13 +11,22 @@ Manages rendering runs and provides access to the cache.
|
|||||||
import base64
|
import base64
|
||||||
import hashlib
|
import hashlib
|
||||||
import json
|
import json
|
||||||
|
import logging
|
||||||
import os
|
import os
|
||||||
|
import time
|
||||||
import uuid
|
import uuid
|
||||||
from dataclasses import dataclass
|
from dataclasses import dataclass
|
||||||
from datetime import datetime, timezone
|
from datetime import datetime, timezone
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from typing import Optional
|
from typing import Optional
|
||||||
|
|
||||||
|
# Configure logging
|
||||||
|
logging.basicConfig(
|
||||||
|
level=logging.INFO,
|
||||||
|
format='%(asctime)s %(levelname)s %(name)s: %(message)s'
|
||||||
|
)
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
from fastapi import FastAPI, HTTPException, UploadFile, File, Depends, Form, Request
|
from fastapi import FastAPI, HTTPException, UploadFile, File, Depends, Form, Request
|
||||||
from fastapi.responses import FileResponse, HTMLResponse, RedirectResponse
|
from fastapi.responses import FileResponse, HTMLResponse, RedirectResponse
|
||||||
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
|
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
|
||||||
@@ -577,15 +586,27 @@ async def create_run(request: RunRequest, ctx: UserContext = Depends(get_require
|
|||||||
@app.get("/runs/{run_id}", response_model=RunStatus)
|
@app.get("/runs/{run_id}", response_model=RunStatus)
|
||||||
async def get_run(run_id: str):
|
async def get_run(run_id: str):
|
||||||
"""Get status of a run."""
|
"""Get status of a run."""
|
||||||
|
start = time.time()
|
||||||
|
logger.info(f"get_run: Starting for {run_id}")
|
||||||
|
|
||||||
|
t0 = time.time()
|
||||||
run = load_run(run_id)
|
run = load_run(run_id)
|
||||||
|
logger.info(f"get_run: load_run took {time.time()-t0:.3f}s, status={run.status if run else 'None'}")
|
||||||
|
|
||||||
if not run:
|
if not run:
|
||||||
raise HTTPException(404, f"Run {run_id} not found")
|
raise HTTPException(404, f"Run {run_id} not found")
|
||||||
|
|
||||||
# Check Celery task status if running
|
# Check Celery task status if running
|
||||||
if run.status == "running" and run.celery_task_id:
|
if run.status == "running" and run.celery_task_id:
|
||||||
|
t0 = time.time()
|
||||||
task = celery_app.AsyncResult(run.celery_task_id)
|
task = celery_app.AsyncResult(run.celery_task_id)
|
||||||
|
logger.info(f"get_run: AsyncResult took {time.time()-t0:.3f}s")
|
||||||
|
|
||||||
if task.ready():
|
t0 = time.time()
|
||||||
|
is_ready = task.ready()
|
||||||
|
logger.info(f"get_run: task.ready() took {time.time()-t0:.3f}s, ready={is_ready}")
|
||||||
|
|
||||||
|
if is_ready:
|
||||||
if task.successful():
|
if task.successful():
|
||||||
result = task.result
|
result = task.result
|
||||||
run.status = "completed"
|
run.status = "completed"
|
||||||
@@ -612,7 +633,9 @@ async def get_run(run_id: str):
|
|||||||
|
|
||||||
# Cache the output (legacy mode - DAG already caches via cache_manager)
|
# Cache the output (legacy mode - DAG already caches via cache_manager)
|
||||||
if output_path and output_path.exists() and "output_hash" not in result:
|
if output_path and output_path.exists() and "output_hash" not in result:
|
||||||
|
t0 = time.time()
|
||||||
await cache_file(output_path, node_type="effect_output")
|
await cache_file(output_path, node_type="effect_output")
|
||||||
|
logger.info(f"get_run: cache_file took {time.time()-t0:.3f}s")
|
||||||
|
|
||||||
# Record activity for deletion tracking (legacy mode)
|
# Record activity for deletion tracking (legacy mode)
|
||||||
if run.output_hash and run.inputs:
|
if run.output_hash and run.inputs:
|
||||||
@@ -626,8 +649,11 @@ async def get_run(run_id: str):
|
|||||||
run.error = str(task.result)
|
run.error = str(task.result)
|
||||||
|
|
||||||
# Save updated status
|
# Save updated status
|
||||||
|
t0 = time.time()
|
||||||
save_run(run)
|
save_run(run)
|
||||||
|
logger.info(f"get_run: save_run took {time.time()-t0:.3f}s")
|
||||||
|
|
||||||
|
logger.info(f"get_run: Total time {time.time()-start:.3f}s")
|
||||||
return run
|
return run
|
||||||
|
|
||||||
|
|
||||||
@@ -1656,23 +1682,34 @@ async def ui_discard_recipe(recipe_id: str, request: Request):
|
|||||||
@app.get("/cache/{content_hash}")
|
@app.get("/cache/{content_hash}")
|
||||||
async def get_cached(content_hash: str, request: Request):
|
async def get_cached(content_hash: str, request: Request):
|
||||||
"""Get cached content by hash. Content negotiation: HTML for browsers, JSON for APIs, file for downloads."""
|
"""Get cached content by hash. Content negotiation: HTML for browsers, JSON for APIs, file for downloads."""
|
||||||
|
start = time.time()
|
||||||
|
accept = request.headers.get("accept", "")
|
||||||
|
logger.info(f"get_cached: {content_hash[:16]}... Accept={accept[:50]}")
|
||||||
|
|
||||||
ctx = get_user_context_from_cookie(request)
|
ctx = get_user_context_from_cookie(request)
|
||||||
cache_path = get_cache_path(content_hash)
|
cache_path = get_cache_path(content_hash)
|
||||||
|
|
||||||
if not cache_path:
|
if not cache_path:
|
||||||
|
logger.info(f"get_cached: Not found, took {time.time()-start:.3f}s")
|
||||||
if wants_html(request):
|
if wants_html(request):
|
||||||
content = f'<p class="text-red-400">Content not found: {content_hash}</p>'
|
content = f'<p class="text-red-400">Content not found: {content_hash}</p>'
|
||||||
return HTMLResponse(render_page("Not Found", content, ctx.actor_id if ctx else None, active_tab="media"), status_code=404)
|
return HTMLResponse(render_page("Not Found", content, ctx.actor_id if ctx else None, active_tab="media"), status_code=404)
|
||||||
raise HTTPException(404, f"Content {content_hash} not in cache")
|
raise HTTPException(404, f"Content {content_hash} not in cache")
|
||||||
|
|
||||||
# JSON response for API clients
|
# JSON response for API clients
|
||||||
accept = request.headers.get("accept", "")
|
|
||||||
if "application/json" in accept:
|
if "application/json" in accept:
|
||||||
|
t0 = time.time()
|
||||||
meta = await database.load_item_metadata(content_hash, ctx.actor_id if ctx else None)
|
meta = await database.load_item_metadata(content_hash, ctx.actor_id if ctx else None)
|
||||||
|
logger.debug(f"get_cached: load_item_metadata took {time.time()-t0:.3f}s")
|
||||||
|
|
||||||
|
t0 = time.time()
|
||||||
cache_item = await database.get_cache_item(content_hash)
|
cache_item = await database.get_cache_item(content_hash)
|
||||||
|
logger.debug(f"get_cached: get_cache_item took {time.time()-t0:.3f}s")
|
||||||
|
|
||||||
ipfs_cid = cache_item.get("ipfs_cid") if cache_item else None
|
ipfs_cid = cache_item.get("ipfs_cid") if cache_item else None
|
||||||
file_size = cache_path.stat().st_size
|
file_size = cache_path.stat().st_size
|
||||||
media_type = detect_media_type(cache_path)
|
media_type = detect_media_type(cache_path)
|
||||||
|
logger.info(f"get_cached: JSON response, ipfs_cid={ipfs_cid[:16] if ipfs_cid else 'None'}..., took {time.time()-start:.3f}s")
|
||||||
return {
|
return {
|
||||||
"content_hash": content_hash,
|
"content_hash": content_hash,
|
||||||
"size": file_size,
|
"size": file_size,
|
||||||
|
|||||||
Reference in New Issue
Block a user