From 9a1ed2adead0168225690495859bf12703cd82b3 Mon Sep 17 00:00:00 2001 From: gilesb Date: Mon, 12 Jan 2026 21:15:08 +0000 Subject: [PATCH] Fetch source content from IPFS if not in local cache When the Celery worker can't find source content in the local cache, fetch it from IPFS using the CID. This ensures workers can execute DAGs even when they don't share the same filesystem as the web server. Co-Authored-By: Claude Opus 4.5 --- legacy_tasks.py | 24 +++++++++++++++++------- 1 file changed, 17 insertions(+), 7 deletions(-) diff --git a/legacy_tasks.py b/legacy_tasks.py index 5c66766..0dbfa6c 100644 --- a/legacy_tasks.py +++ b/legacy_tasks.py @@ -126,15 +126,25 @@ class SourceExecutor(Executor): raise ValueError("SOURCE node requires cid in config") # Look up in cache - source_path = CACHE_DIR / cid - if not source_path.exists(): - # Try nodes directory - from cache_manager import get_cache_manager - cache_manager = get_cache_manager() - source_path = cache_manager.get_by_cid(cid) + from cache_manager import get_cache_manager + cache_manager = get_cache_manager() + source_path = cache_manager.get_by_cid(cid) if not source_path or not source_path.exists(): - raise ValueError(f"Source content not in cache: {cid}") + # Not in cache - fetch from IPFS + import logging + logger = logging.getLogger(__name__) + logger.info(f"SOURCE {cid[:16]}... not in cache, fetching from IPFS") + + import ipfs_client + fetch_path = CACHE_DIR / "ipfs_fetch" / cid + fetch_path.parent.mkdir(parents=True, exist_ok=True) + + if ipfs_client.get_file(cid, str(fetch_path)): + logger.info(f"SOURCE {cid[:16]}... fetched from IPFS to {fetch_path}") + source_path = fetch_path + else: + raise ValueError(f"Source content not in cache and IPFS fetch failed: {cid}") # For source nodes, we just return the path (no transformation) # The engine will use this as input to subsequent nodes