- Update save_run_cache to also update actor_id, recipe, inputs on conflict - Add logging for actor_id when saving runs to run_cache - Add admin endpoint DELETE /runs/admin/purge-failed to delete all failed runs Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
84 lines
2.4 KiB
Python
84 lines
2.4 KiB
Python
"""
|
|
Background IPFS upload task.
|
|
|
|
Uploads files to IPFS in the background after initial local storage.
|
|
This allows fast uploads while still getting IPFS CIDs eventually.
|
|
"""
|
|
|
|
import logging
|
|
import os
|
|
import sys
|
|
from pathlib import Path
|
|
from typing import Optional
|
|
|
|
# Add parent directory to path for imports
|
|
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
|
|
|
from celery_app import app
|
|
import ipfs_client
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
@app.task(bind=True, max_retries=3, default_retry_delay=60)
|
|
def upload_to_ipfs(self, local_cid: str, actor_id: str) -> Optional[str]:
|
|
"""
|
|
Upload a locally cached file to IPFS in the background.
|
|
|
|
Args:
|
|
local_cid: The local content hash of the file
|
|
actor_id: The user who uploaded the file
|
|
|
|
Returns:
|
|
IPFS CID if successful, None if failed
|
|
"""
|
|
from cache_manager import get_cache_manager
|
|
import asyncio
|
|
import database
|
|
|
|
logger.info(f"Background IPFS upload starting for {local_cid[:16]}...")
|
|
|
|
try:
|
|
cache_mgr = get_cache_manager()
|
|
|
|
# Get the file path from local cache
|
|
file_path = cache_mgr.get_by_cid(local_cid)
|
|
if not file_path or not file_path.exists():
|
|
logger.error(f"File not found for local CID {local_cid[:16]}...")
|
|
return None
|
|
|
|
# Upload to IPFS
|
|
logger.info(f"Uploading {file_path} to IPFS...")
|
|
ipfs_cid = ipfs_client.add_file(file_path)
|
|
|
|
if not ipfs_cid:
|
|
logger.error(f"IPFS upload failed for {local_cid[:16]}...")
|
|
raise self.retry(exc=Exception("IPFS upload failed"))
|
|
|
|
logger.info(f"IPFS upload successful: {local_cid[:16]}... -> {ipfs_cid[:16]}...")
|
|
|
|
# Update database with IPFS CID
|
|
loop = asyncio.new_event_loop()
|
|
asyncio.set_event_loop(loop)
|
|
try:
|
|
# Initialize database pool if needed
|
|
loop.run_until_complete(database.init_pool())
|
|
|
|
# Update cache_items table
|
|
loop.run_until_complete(
|
|
database.update_cache_item_ipfs_cid(local_cid, ipfs_cid)
|
|
)
|
|
|
|
# Create index from IPFS CID to local cache
|
|
cache_mgr._set_content_index(ipfs_cid, local_cid)
|
|
|
|
logger.info(f"Database updated with IPFS CID for {local_cid[:16]}...")
|
|
finally:
|
|
loop.close()
|
|
|
|
return ipfs_cid
|
|
|
|
except Exception as e:
|
|
logger.error(f"Background IPFS upload error: {e}")
|
|
raise self.retry(exc=e)
|