diff --git a/legacy_tasks.py b/legacy_tasks.py index 5c66766..0dbfa6c 100644 --- a/legacy_tasks.py +++ b/legacy_tasks.py @@ -126,15 +126,25 @@ class SourceExecutor(Executor): raise ValueError("SOURCE node requires cid in config") # Look up in cache - source_path = CACHE_DIR / cid - if not source_path.exists(): - # Try nodes directory - from cache_manager import get_cache_manager - cache_manager = get_cache_manager() - source_path = cache_manager.get_by_cid(cid) + from cache_manager import get_cache_manager + cache_manager = get_cache_manager() + source_path = cache_manager.get_by_cid(cid) if not source_path or not source_path.exists(): - raise ValueError(f"Source content not in cache: {cid}") + # Not in cache - fetch from IPFS + import logging + logger = logging.getLogger(__name__) + logger.info(f"SOURCE {cid[:16]}... not in cache, fetching from IPFS") + + import ipfs_client + fetch_path = CACHE_DIR / "ipfs_fetch" / cid + fetch_path.parent.mkdir(parents=True, exist_ok=True) + + if ipfs_client.get_file(cid, str(fetch_path)): + logger.info(f"SOURCE {cid[:16]}... fetched from IPFS to {fetch_path}") + source_path = fetch_path + else: + raise ValueError(f"Source content not in cache and IPFS fetch failed: {cid}") # For source nodes, we just return the path (no transformation) # The engine will use this as input to subsequent nodes