Fetch source content from IPFS if not in local cache
When the Celery worker can't find source content in the local cache, fetch it from IPFS using the CID. This ensures workers can execute DAGs even when they don't share the same filesystem as the web server.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
@@ -126,15 +126,25 @@ class SourceExecutor(Executor):
|
||||
raise ValueError("SOURCE node requires cid in config")
|
||||
|
||||
# Look up in cache
|
||||
source_path = CACHE_DIR / cid
|
||||
if not source_path.exists():
|
||||
# Try nodes directory
|
||||
from cache_manager import get_cache_manager
|
||||
cache_manager = get_cache_manager()
|
||||
source_path = cache_manager.get_by_cid(cid)
|
||||
from cache_manager import get_cache_manager
|
||||
cache_manager = get_cache_manager()
|
||||
source_path = cache_manager.get_by_cid(cid)
|
||||
|
||||
if not source_path or not source_path.exists():
|
||||
raise ValueError(f"Source content not in cache: {cid}")
|
||||
# Not in cache - fetch from IPFS
|
||||
import logging
|
||||
logger = logging.getLogger(__name__)
|
||||
logger.info(f"SOURCE {cid[:16]}... not in cache, fetching from IPFS")
|
||||
|
||||
import ipfs_client
|
||||
fetch_path = CACHE_DIR / "ipfs_fetch" / cid
|
||||
fetch_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
if ipfs_client.get_file(cid, str(fetch_path)):
|
||||
logger.info(f"SOURCE {cid[:16]}... fetched from IPFS to {fetch_path}")
|
||||
source_path = fetch_path
|
||||
else:
|
||||
raise ValueError(f"Source content not in cache and IPFS fetch failed: {cid}")
|
||||
|
||||
# For source nodes, we just return the path (no transformation)
|
||||
# The engine will use this as input to subsequent nodes
|
||||
|
||||
Reference in New Issue
Block a user