""" Storage provider abstraction for user-attachable storage. Supports: - Pinata (IPFS pinning service) - web3.storage (IPFS pinning service) - Local filesystem storage """ import hashlib import json import logging import os import shutil from abc import ABC, abstractmethod from pathlib import Path from typing import Optional import requests logger = logging.getLogger(__name__) class StorageProvider(ABC): """Abstract base class for storage backends.""" provider_type: str = "unknown" @abstractmethod async def pin(self, content_hash: str, data: bytes, filename: Optional[str] = None) -> Optional[str]: """ Pin content to storage. Args: content_hash: SHA3-256 hash of the content data: Raw bytes to store filename: Optional filename hint Returns: IPFS CID or provider-specific ID, or None on failure """ pass @abstractmethod async def unpin(self, content_hash: str) -> bool: """ Unpin content from storage. Args: content_hash: SHA3-256 hash of the content Returns: True if unpinned successfully """ pass @abstractmethod async def get(self, content_hash: str) -> Optional[bytes]: """ Retrieve content from storage. Args: content_hash: SHA3-256 hash of the content Returns: Raw bytes or None if not found """ pass @abstractmethod async def is_pinned(self, content_hash: str) -> bool: """Check if content is pinned in this storage.""" pass @abstractmethod async def test_connection(self) -> tuple[bool, str]: """ Test connectivity to the storage provider. Returns: (success, message) tuple """ pass @abstractmethod def get_usage(self) -> dict: """ Get storage usage statistics. Returns: {used_bytes, capacity_bytes, pin_count} """ pass class PinataProvider(StorageProvider): """Pinata IPFS pinning service provider.""" provider_type = "pinata" def __init__(self, api_key: str, secret_key: str, capacity_gb: int = 1): self.api_key = api_key self.secret_key = secret_key self.capacity_bytes = capacity_gb * 1024**3 self.base_url = "https://api.pinata.cloud" self._usage_cache = None def _headers(self) -> dict: return { "pinata_api_key": self.api_key, "pinata_secret_api_key": self.secret_key, } async def pin(self, content_hash: str, data: bytes, filename: Optional[str] = None) -> Optional[str]: """Pin content to Pinata.""" try: import asyncio def do_pin(): files = {"file": (filename or f"{content_hash[:16]}.bin", data)} metadata = { "name": filename or content_hash[:16], "keyvalues": {"content_hash": content_hash} } response = requests.post( f"{self.base_url}/pinning/pinFileToIPFS", files=files, data={"pinataMetadata": json.dumps(metadata)}, headers=self._headers(), timeout=120 ) response.raise_for_status() return response.json().get("IpfsHash") cid = await asyncio.to_thread(do_pin) logger.info(f"Pinata: Pinned {content_hash[:16]}... as {cid}") return cid except Exception as e: logger.error(f"Pinata pin failed: {e}") return None async def unpin(self, content_hash: str) -> bool: """Unpin content from Pinata by finding its CID first.""" try: import asyncio def do_unpin(): # First find the pin by content_hash metadata response = requests.get( f"{self.base_url}/data/pinList", params={"metadata[keyvalues][content_hash]": content_hash, "status": "pinned"}, headers=self._headers(), timeout=30 ) response.raise_for_status() pins = response.json().get("rows", []) if not pins: return False # Unpin each matching CID for pin in pins: cid = pin.get("ipfs_pin_hash") if cid: resp = requests.delete( f"{self.base_url}/pinning/unpin/{cid}", headers=self._headers(), timeout=30 ) resp.raise_for_status() return True result = await asyncio.to_thread(do_unpin) logger.info(f"Pinata: Unpinned {content_hash[:16]}...") return result except Exception as e: logger.error(f"Pinata unpin failed: {e}") return False async def get(self, content_hash: str) -> Optional[bytes]: """Get content from Pinata via IPFS gateway.""" try: import asyncio def do_get(): # First find the CID response = requests.get( f"{self.base_url}/data/pinList", params={"metadata[keyvalues][content_hash]": content_hash, "status": "pinned"}, headers=self._headers(), timeout=30 ) response.raise_for_status() pins = response.json().get("rows", []) if not pins: return None cid = pins[0].get("ipfs_pin_hash") if not cid: return None # Fetch from gateway gateway_response = requests.get( f"https://gateway.pinata.cloud/ipfs/{cid}", timeout=120 ) gateway_response.raise_for_status() return gateway_response.content return await asyncio.to_thread(do_get) except Exception as e: logger.error(f"Pinata get failed: {e}") return None async def is_pinned(self, content_hash: str) -> bool: """Check if content is pinned on Pinata.""" try: import asyncio def do_check(): response = requests.get( f"{self.base_url}/data/pinList", params={"metadata[keyvalues][content_hash]": content_hash, "status": "pinned"}, headers=self._headers(), timeout=30 ) response.raise_for_status() return len(response.json().get("rows", [])) > 0 return await asyncio.to_thread(do_check) except Exception: return False async def test_connection(self) -> tuple[bool, str]: """Test Pinata API connectivity.""" try: import asyncio def do_test(): response = requests.get( f"{self.base_url}/data/testAuthentication", headers=self._headers(), timeout=10 ) response.raise_for_status() return True, "Connected to Pinata successfully" return await asyncio.to_thread(do_test) except requests.exceptions.HTTPError as e: if e.response.status_code == 401: return False, "Invalid API credentials" return False, f"HTTP error: {e}" except Exception as e: return False, f"Connection failed: {e}" def get_usage(self) -> dict: """Get Pinata usage stats.""" try: response = requests.get( f"{self.base_url}/data/userPinnedDataTotal", headers=self._headers(), timeout=10 ) response.raise_for_status() data = response.json() return { "used_bytes": data.get("pin_size_total", 0), "capacity_bytes": self.capacity_bytes, "pin_count": data.get("pin_count", 0) } except Exception: return {"used_bytes": 0, "capacity_bytes": self.capacity_bytes, "pin_count": 0} class Web3StorageProvider(StorageProvider): """web3.storage pinning service provider.""" provider_type = "web3storage" def __init__(self, api_token: str, capacity_gb: int = 1): self.api_token = api_token self.capacity_bytes = capacity_gb * 1024**3 self.base_url = "https://api.web3.storage" def _headers(self) -> dict: return {"Authorization": f"Bearer {self.api_token}"} async def pin(self, content_hash: str, data: bytes, filename: Optional[str] = None) -> Optional[str]: """Pin content to web3.storage.""" try: import asyncio def do_pin(): response = requests.post( f"{self.base_url}/upload", data=data, headers={ **self._headers(), "X-Name": filename or content_hash[:16] }, timeout=120 ) response.raise_for_status() return response.json().get("cid") cid = await asyncio.to_thread(do_pin) logger.info(f"web3.storage: Pinned {content_hash[:16]}... as {cid}") return cid except Exception as e: logger.error(f"web3.storage pin failed: {e}") return None async def unpin(self, content_hash: str) -> bool: """web3.storage doesn't support unpinning - data is stored permanently.""" logger.warning("web3.storage: Unpinning not supported (permanent storage)") return False async def get(self, content_hash: str) -> Optional[bytes]: """Get content from web3.storage - would need CID mapping.""" # web3.storage requires knowing the CID to fetch # For now, return None - we'd need to maintain a mapping return None async def is_pinned(self, content_hash: str) -> bool: """Check if content is pinned - would need CID mapping.""" return False async def test_connection(self) -> tuple[bool, str]: """Test web3.storage API connectivity.""" try: import asyncio def do_test(): response = requests.get( f"{self.base_url}/user/uploads", headers=self._headers(), params={"size": 1}, timeout=10 ) response.raise_for_status() return True, "Connected to web3.storage successfully" return await asyncio.to_thread(do_test) except requests.exceptions.HTTPError as e: if e.response.status_code == 401: return False, "Invalid API token" return False, f"HTTP error: {e}" except Exception as e: return False, f"Connection failed: {e}" def get_usage(self) -> dict: """Get web3.storage usage stats.""" try: response = requests.get( f"{self.base_url}/user/uploads", headers=self._headers(), params={"size": 1000}, timeout=30 ) response.raise_for_status() uploads = response.json() total_size = sum(u.get("dagSize", 0) for u in uploads) return { "used_bytes": total_size, "capacity_bytes": self.capacity_bytes, "pin_count": len(uploads) } except Exception: return {"used_bytes": 0, "capacity_bytes": self.capacity_bytes, "pin_count": 0} class LocalStorageProvider(StorageProvider): """Local filesystem storage provider.""" provider_type = "local" def __init__(self, base_path: str, capacity_gb: int = 10): self.base_path = Path(base_path) self.capacity_bytes = capacity_gb * 1024**3 # Create directory if it doesn't exist self.base_path.mkdir(parents=True, exist_ok=True) def _get_file_path(self, content_hash: str) -> Path: """Get file path for a content hash (using subdirectories).""" # Use first 2 chars as subdirectory for better filesystem performance subdir = content_hash[:2] return self.base_path / subdir / content_hash async def pin(self, content_hash: str, data: bytes, filename: Optional[str] = None) -> Optional[str]: """Store content locally.""" try: import asyncio def do_store(): file_path = self._get_file_path(content_hash) file_path.parent.mkdir(parents=True, exist_ok=True) file_path.write_bytes(data) return content_hash # Use content_hash as ID for local storage result = await asyncio.to_thread(do_store) logger.info(f"Local: Stored {content_hash[:16]}...") return result except Exception as e: logger.error(f"Local storage failed: {e}") return None async def unpin(self, content_hash: str) -> bool: """Remove content from local storage.""" try: import asyncio def do_remove(): file_path = self._get_file_path(content_hash) if file_path.exists(): file_path.unlink() return True return False return await asyncio.to_thread(do_remove) except Exception as e: logger.error(f"Local unpin failed: {e}") return False async def get(self, content_hash: str) -> Optional[bytes]: """Get content from local storage.""" try: import asyncio def do_get(): file_path = self._get_file_path(content_hash) if file_path.exists(): return file_path.read_bytes() return None return await asyncio.to_thread(do_get) except Exception as e: logger.error(f"Local get failed: {e}") return None async def is_pinned(self, content_hash: str) -> bool: """Check if content exists in local storage.""" return self._get_file_path(content_hash).exists() async def test_connection(self) -> tuple[bool, str]: """Test local storage is writable.""" try: test_file = self.base_path / ".write_test" test_file.write_text("test") test_file.unlink() return True, f"Local storage ready at {self.base_path}" except Exception as e: return False, f"Cannot write to {self.base_path}: {e}" def get_usage(self) -> dict: """Get local storage usage stats.""" try: total_size = 0 file_count = 0 for subdir in self.base_path.iterdir(): if subdir.is_dir() and len(subdir.name) == 2: for f in subdir.iterdir(): if f.is_file(): total_size += f.stat().st_size file_count += 1 return { "used_bytes": total_size, "capacity_bytes": self.capacity_bytes, "pin_count": file_count } except Exception: return {"used_bytes": 0, "capacity_bytes": self.capacity_bytes, "pin_count": 0} def create_provider(provider_type: str, config: dict) -> Optional[StorageProvider]: """ Factory function to create a storage provider from config. Args: provider_type: 'pinata', 'web3storage', or 'local' config: Provider-specific configuration dict Returns: StorageProvider instance or None if invalid """ try: if provider_type == "pinata": return PinataProvider( api_key=config["api_key"], secret_key=config["secret_key"], capacity_gb=config.get("capacity_gb", 1) ) elif provider_type == "web3storage": return Web3StorageProvider( api_token=config["api_token"], capacity_gb=config.get("capacity_gb", 1) ) elif provider_type == "local": return LocalStorageProvider( base_path=config["path"], capacity_gb=config.get("capacity_gb", 10) ) else: logger.error(f"Unknown provider type: {provider_type}") return None except KeyError as e: logger.error(f"Missing config key for {provider_type}: {e}") return None except Exception as e: logger.error(f"Failed to create provider {provider_type}: {e}") return None