Files
celery/tasks/ipfs_upload.py
giles d20eef76ad Fix completed runs not appearing in list + add purge-failed endpoint
- Update save_run_cache to also update actor_id, recipe, inputs on conflict
- Add logging for actor_id when saving runs to run_cache
- Add admin endpoint DELETE /runs/admin/purge-failed to delete all failed runs

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-02 23:24:39 +00:00

84 lines
2.4 KiB
Python

"""
Background IPFS upload task.
Uploads files to IPFS in the background after initial local storage.
This allows fast uploads while still getting IPFS CIDs eventually.
"""
import logging
import os
import sys
from pathlib import Path
from typing import Optional
# Add parent directory to path for imports: the directory one level above
# this file's own directory is placed first on sys.path before the two
# project imports below run (presumably that is where celery_app and
# ipfs_client live -- confirm against the repository layout).
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from celery_app import app
import ipfs_client
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
@app.task(bind=True, max_retries=3, default_retry_delay=60)
def upload_to_ipfs(self, local_cid: str, actor_id: str) -> Optional[str]:
    """
    Upload a locally cached file to IPFS in the background.

    The file is looked up in the local cache by ``local_cid``, added to IPFS
    via ``ipfs_client.add_file``, and the resulting IPFS CID is then written
    to the database and indexed back to the local cache entry.

    Args:
        local_cid: The local content hash of the file
        actor_id: The user who uploaded the file (accepted for the task
            signature; not used by this function's body)

    Returns:
        The IPFS CID if successful, or None if no local file exists for
        ``local_cid`` (that case is not retried).

    Raises:
        Whatever ``self.retry`` raises: any failure during the upload or the
        database update is handed to ``self.retry`` exactly once, using the
        ``max_retries`` / ``default_retry_delay`` set on the decorator.
    """
    from cache_manager import get_cache_manager
    import asyncio
    import database

    logger.info(f"Background IPFS upload starting for {local_cid[:16]}...")

    try:
        cache_mgr = get_cache_manager()

        # Get the file path from local cache
        file_path = cache_mgr.get_by_cid(local_cid)
        if not file_path or not file_path.exists():
            logger.error(f"File not found for local CID {local_cid[:16]}...")
            return None

        # Upload to IPFS
        logger.info(f"Uploading {file_path} to IPFS...")
        ipfs_cid = ipfs_client.add_file(file_path)
        if not ipfs_cid:
            logger.error(f"IPFS upload failed for {local_cid[:16]}...")
            # Raise a plain exception here rather than calling self.retry():
            # the retry signal is itself an Exception, so raising it inside
            # this try block would be caught by the handler below and
            # self.retry() would run a second time for the same failure.
            raise Exception("IPFS upload failed")

        logger.info(f"IPFS upload successful: {local_cid[:16]}... -> {ipfs_cid[:16]}...")

        # Update database with IPFS CID. The database helpers are coroutines,
        # so drive them on a private event loop from this synchronous task.
        # NOTE(review): if database.init_pool() keeps a pool bound to this
        # loop, later runs in the same worker would see a pool whose loop is
        # closed -- confirm against the database module.
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        try:
            # Initialize database pool if needed
            loop.run_until_complete(database.init_pool())

            # Update cache_items table
            loop.run_until_complete(
                database.update_cache_item_ipfs_cid(local_cid, ipfs_cid)
            )

            # Create index from IPFS CID to local cache
            cache_mgr._set_content_index(ipfs_cid, local_cid)

            logger.info(f"Database updated with IPFS CID for {local_cid[:16]}...")
        finally:
            # Do not leave a closed loop installed as this thread's current
            # event loop once we are done with it.
            asyncio.set_event_loop(None)
            loop.close()

        return ipfs_cid

    except Exception as e:
        # Single retry point for every failure above; logger.exception keeps
        # the traceback alongside the original message.
        logger.exception(f"Background IPFS upload error: {e}")
        raise self.retry(exc=e)