diff --git a/app/routers/cache.py b/app/routers/cache.py index 9120450..a019a0f 100644 --- a/app/routers/cache.py +++ b/app/routers/cache.py @@ -209,9 +209,9 @@ async def upload_content( ctx: UserContext = Depends(require_auth), cache_service: CacheService = Depends(get_cache_service), ): - """Upload content to cache.""" + """Upload content to cache and IPFS.""" content = await file.read() - content_hash, error = await cache_service.upload_content( + content_hash, ipfs_cid, error = await cache_service.upload_content( content=content, filename=file.filename, actor_id=ctx.actor_id, @@ -221,7 +221,8 @@ async def upload_content( raise HTTPException(400, error) return { - "content_hash": content_hash, + "cid": ipfs_cid, + "content_hash": content_hash, # Legacy, for backwards compatibility "filename": file.filename, "size": len(content), "uploaded": True, diff --git a/app/services/cache_service.py b/app/services/cache_service.py index f660512..cb7acc7 100644 --- a/app/services/cache_service.py +++ b/app/services/cache_service.py @@ -453,8 +453,8 @@ class CacheService: content: bytes, filename: str, actor_id: str, - ) -> Tuple[Optional[str], Optional[str]]: - """Upload content to cache. Returns (content_hash, error).""" + ) -> Tuple[Optional[str], Optional[str], Optional[str]]: + """Upload content to cache. Returns (content_hash, ipfs_cid, error).""" import tempfile try: @@ -466,7 +466,7 @@ class CacheService: # Detect MIME type before moving file mime_type = get_mime_type(tmp_path) - # Store in cache + # Store in cache (also stores in IPFS) cached, ipfs_cid = self.cache.put(tmp_path, node_type="upload", move=True) content_hash = cached.content_hash @@ -479,9 +479,9 @@ class CacheService: filename=filename ) - return content_hash, None + return content_hash, ipfs_cid, None except Exception as e: - return None, f"Upload failed: {e}" + return None, None, f"Upload failed: {e}" async def list_media( self, diff --git a/cache_manager.py b/cache_manager.py index 57f4f60..8720167 100644 --- a/cache_manager.py +++ b/cache_manager.py @@ -427,8 +427,17 @@ class L1CacheManager: """Get cached file path by node_id.""" return self.cache.get(node_id) + def _is_ipfs_cid(self, identifier: str) -> bool: + """Check if identifier looks like an IPFS CID.""" + # CIDv0 starts with "Qm", CIDv1 starts with "bafy" or other multibase prefixes + return identifier.startswith("Qm") or identifier.startswith("bafy") or identifier.startswith("baf") + def get_by_content_hash(self, content_hash: str) -> Optional[Path]: - """Get cached file path by content_hash. Falls back to IPFS if not in local cache.""" + """Get cached file path by content_hash or IPFS CID. Falls back to IPFS if not in local cache.""" + + # If it looks like an IPFS CID, use get_by_cid instead + if self._is_ipfs_cid(content_hash): + return self.get_by_cid(content_hash) # Check index first (Redis then local) node_id = self._get_content_index(content_hash) @@ -469,6 +478,32 @@ class L1CacheManager: return None + def get_by_cid(self, ipfs_cid: str) -> Optional[Path]: + """Get cached file path by IPFS CID. Fetches from IPFS if not in local cache.""" + + # Check if we have this CID cached locally (indexed by CID) + cached_path = self.legacy_dir / ipfs_cid + if cached_path.exists() and cached_path.is_file(): + return cached_path + + # Check cache directory structure + cid_cache_dir = self.cache_dir / ipfs_cid + if cid_cache_dir.exists() and cid_cache_dir.is_dir(): + # Look for output file + for f in cid_cache_dir.iterdir(): + if f.is_file() and not f.name.endswith('.json'): + return f + + # Fetch from IPFS + logger.info(f"Fetching from IPFS: {ipfs_cid[:16]}...") + recovery_path = self.legacy_dir / ipfs_cid + recovery_path.parent.mkdir(parents=True, exist_ok=True) + if ipfs_client.get_file(ipfs_cid, recovery_path): + logger.info(f"Fetched from IPFS: {recovery_path}") + return recovery_path + + return None + def has_content(self, content_hash: str) -> bool: """Check if content exists in cache.""" return self.get_by_content_hash(content_hash) is not None diff --git a/tasks/execute_sexp.py b/tasks/execute_sexp.py index 79160ea..902ac1a 100644 --- a/tasks/execute_sexp.py +++ b/tasks/execute_sexp.py @@ -197,13 +197,14 @@ def execute_step_sexp( try: # Handle SOURCE nodes if node_type == "SOURCE": - content_hash = config.get("hash") - if not content_hash: - raise ValueError("SOURCE step missing :hash") + # Support both :cid (new IPFS) and :hash (legacy) + content_id = config.get("cid") or config.get("hash") + if not content_id: + raise ValueError("SOURCE step missing :cid or :hash") - path = cache_mgr.get_by_content_hash(content_hash) + path = cache_mgr.get_by_content_hash(content_id) if not path: - raise ValueError(f"SOURCE input not found: {content_hash[:16]}...") + raise ValueError(f"SOURCE input not found: {content_id[:16]}...") output_path = str(path) complete_task(cache_id, worker_id, output_path)