Add support for more decentralized storage providers

Added 4 new storage providers:
- NFT.Storage (free for NFT data)
- Infura IPFS (5GB free)
- Filebase (5GB free, S3-compatible IPFS)
- Storj (25GB free, decentralized cloud)

Updated UI with 7 total storage options in a 4-column grid,
each with distinct colored borders for visibility.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
gilesb
2026-01-10 00:25:09 +00:00
parent 8bef9afb1f
commit 770c36479f
2 changed files with 604 additions and 17 deletions

View File

@@ -370,6 +370,479 @@ class Web3StorageProvider(StorageProvider):
return {"used_bytes": 0, "capacity_bytes": self.capacity_bytes, "pin_count": 0}
class NFTStorageProvider(StorageProvider):
"""NFT.Storage pinning service provider (free for NFT data)."""
provider_type = "nftstorage"
def __init__(self, api_token: str, capacity_gb: int = 5):
self.api_token = api_token
self.capacity_bytes = capacity_gb * 1024**3
self.base_url = "https://api.nft.storage"
def _headers(self) -> dict:
return {"Authorization": f"Bearer {self.api_token}"}
async def pin(self, content_hash: str, data: bytes, filename: Optional[str] = None) -> Optional[str]:
"""Pin content to NFT.Storage."""
try:
import asyncio
def do_pin():
response = requests.post(
f"{self.base_url}/upload",
data=data,
headers={**self._headers(), "Content-Type": "application/octet-stream"},
timeout=120
)
response.raise_for_status()
return response.json().get("value", {}).get("cid")
cid = await asyncio.to_thread(do_pin)
logger.info(f"NFT.Storage: Pinned {content_hash[:16]}... as {cid}")
return cid
except Exception as e:
logger.error(f"NFT.Storage pin failed: {e}")
return None
async def unpin(self, content_hash: str) -> bool:
"""NFT.Storage doesn't support unpinning - data is stored permanently."""
logger.warning("NFT.Storage: Unpinning not supported (permanent storage)")
return False
async def get(self, content_hash: str) -> Optional[bytes]:
"""Get content from NFT.Storage - would need CID mapping."""
return None
async def is_pinned(self, content_hash: str) -> bool:
"""Check if content is pinned - would need CID mapping."""
return False
async def test_connection(self) -> tuple[bool, str]:
"""Test NFT.Storage API connectivity."""
try:
import asyncio
def do_test():
response = requests.get(
f"{self.base_url}/",
headers=self._headers(),
timeout=10
)
response.raise_for_status()
return True, "Connected to NFT.Storage successfully"
return await asyncio.to_thread(do_test)
except requests.exceptions.HTTPError as e:
if e.response.status_code == 401:
return False, "Invalid API token"
return False, f"HTTP error: {e}"
except Exception as e:
return False, f"Connection failed: {e}"
def get_usage(self) -> dict:
"""Get NFT.Storage usage stats."""
return {"used_bytes": 0, "capacity_bytes": self.capacity_bytes, "pin_count": 0}
class InfuraIPFSProvider(StorageProvider):
"""Infura IPFS pinning service provider."""
provider_type = "infura"
def __init__(self, project_id: str, project_secret: str, capacity_gb: int = 5):
self.project_id = project_id
self.project_secret = project_secret
self.capacity_bytes = capacity_gb * 1024**3
self.base_url = "https://ipfs.infura.io:5001/api/v0"
def _auth(self) -> tuple:
return (self.project_id, self.project_secret)
async def pin(self, content_hash: str, data: bytes, filename: Optional[str] = None) -> Optional[str]:
"""Pin content to Infura IPFS."""
try:
import asyncio
def do_pin():
files = {"file": (filename or f"{content_hash[:16]}.bin", data)}
response = requests.post(
f"{self.base_url}/add",
files=files,
auth=self._auth(),
timeout=120
)
response.raise_for_status()
return response.json().get("Hash")
cid = await asyncio.to_thread(do_pin)
logger.info(f"Infura IPFS: Pinned {content_hash[:16]}... as {cid}")
return cid
except Exception as e:
logger.error(f"Infura IPFS pin failed: {e}")
return None
async def unpin(self, content_hash: str) -> bool:
"""Unpin content from Infura IPFS."""
try:
import asyncio
def do_unpin():
response = requests.post(
f"{self.base_url}/pin/rm",
params={"arg": content_hash},
auth=self._auth(),
timeout=30
)
response.raise_for_status()
return True
return await asyncio.to_thread(do_unpin)
except Exception as e:
logger.error(f"Infura IPFS unpin failed: {e}")
return False
async def get(self, content_hash: str) -> Optional[bytes]:
"""Get content from Infura IPFS gateway."""
try:
import asyncio
def do_get():
response = requests.post(
f"{self.base_url}/cat",
params={"arg": content_hash},
auth=self._auth(),
timeout=120
)
response.raise_for_status()
return response.content
return await asyncio.to_thread(do_get)
except Exception as e:
logger.error(f"Infura IPFS get failed: {e}")
return None
async def is_pinned(self, content_hash: str) -> bool:
"""Check if content is pinned on Infura IPFS."""
try:
import asyncio
def do_check():
response = requests.post(
f"{self.base_url}/pin/ls",
params={"arg": content_hash},
auth=self._auth(),
timeout=30
)
return response.status_code == 200
return await asyncio.to_thread(do_check)
except Exception:
return False
async def test_connection(self) -> tuple[bool, str]:
"""Test Infura IPFS API connectivity."""
try:
import asyncio
def do_test():
response = requests.post(
f"{self.base_url}/id",
auth=self._auth(),
timeout=10
)
response.raise_for_status()
return True, "Connected to Infura IPFS successfully"
return await asyncio.to_thread(do_test)
except requests.exceptions.HTTPError as e:
if e.response.status_code == 401:
return False, "Invalid project credentials"
return False, f"HTTP error: {e}"
except Exception as e:
return False, f"Connection failed: {e}"
def get_usage(self) -> dict:
"""Get Infura usage stats."""
return {"used_bytes": 0, "capacity_bytes": self.capacity_bytes, "pin_count": 0}
class FilebaseProvider(StorageProvider):
"""Filebase S3-compatible IPFS pinning service."""
provider_type = "filebase"
def __init__(self, access_key: str, secret_key: str, bucket: str, capacity_gb: int = 5):
self.access_key = access_key
self.secret_key = secret_key
self.bucket = bucket
self.capacity_bytes = capacity_gb * 1024**3
self.endpoint = "https://s3.filebase.com"
async def pin(self, content_hash: str, data: bytes, filename: Optional[str] = None) -> Optional[str]:
"""Pin content to Filebase."""
try:
import asyncio
import boto3
from botocore.config import Config
def do_pin():
s3 = boto3.client(
's3',
endpoint_url=self.endpoint,
aws_access_key_id=self.access_key,
aws_secret_access_key=self.secret_key,
config=Config(signature_version='s3v4')
)
key = filename or f"{content_hash[:16]}.bin"
s3.put_object(Bucket=self.bucket, Key=key, Body=data)
# Get CID from response headers
head = s3.head_object(Bucket=self.bucket, Key=key)
return head.get('Metadata', {}).get('cid', content_hash)
cid = await asyncio.to_thread(do_pin)
logger.info(f"Filebase: Pinned {content_hash[:16]}... as {cid}")
return cid
except Exception as e:
logger.error(f"Filebase pin failed: {e}")
return None
async def unpin(self, content_hash: str) -> bool:
"""Remove content from Filebase."""
try:
import asyncio
import boto3
from botocore.config import Config
def do_unpin():
s3 = boto3.client(
's3',
endpoint_url=self.endpoint,
aws_access_key_id=self.access_key,
aws_secret_access_key=self.secret_key,
config=Config(signature_version='s3v4')
)
s3.delete_object(Bucket=self.bucket, Key=content_hash)
return True
return await asyncio.to_thread(do_unpin)
except Exception as e:
logger.error(f"Filebase unpin failed: {e}")
return False
async def get(self, content_hash: str) -> Optional[bytes]:
"""Get content from Filebase."""
try:
import asyncio
import boto3
from botocore.config import Config
def do_get():
s3 = boto3.client(
's3',
endpoint_url=self.endpoint,
aws_access_key_id=self.access_key,
aws_secret_access_key=self.secret_key,
config=Config(signature_version='s3v4')
)
response = s3.get_object(Bucket=self.bucket, Key=content_hash)
return response['Body'].read()
return await asyncio.to_thread(do_get)
except Exception as e:
logger.error(f"Filebase get failed: {e}")
return None
async def is_pinned(self, content_hash: str) -> bool:
"""Check if content exists in Filebase."""
try:
import asyncio
import boto3
from botocore.config import Config
def do_check():
s3 = boto3.client(
's3',
endpoint_url=self.endpoint,
aws_access_key_id=self.access_key,
aws_secret_access_key=self.secret_key,
config=Config(signature_version='s3v4')
)
s3.head_object(Bucket=self.bucket, Key=content_hash)
return True
return await asyncio.to_thread(do_check)
except Exception:
return False
async def test_connection(self) -> tuple[bool, str]:
"""Test Filebase connectivity."""
try:
import asyncio
import boto3
from botocore.config import Config
def do_test():
s3 = boto3.client(
's3',
endpoint_url=self.endpoint,
aws_access_key_id=self.access_key,
aws_secret_access_key=self.secret_key,
config=Config(signature_version='s3v4')
)
s3.head_bucket(Bucket=self.bucket)
return True, f"Connected to Filebase bucket '{self.bucket}'"
return await asyncio.to_thread(do_test)
except Exception as e:
if "404" in str(e):
return False, f"Bucket '{self.bucket}' not found"
if "403" in str(e):
return False, "Invalid credentials or no access to bucket"
return False, f"Connection failed: {e}"
def get_usage(self) -> dict:
"""Get Filebase usage stats."""
return {"used_bytes": 0, "capacity_bytes": self.capacity_bytes, "pin_count": 0}
class StorjProvider(StorageProvider):
"""Storj decentralized cloud storage (S3-compatible)."""
provider_type = "storj"
def __init__(self, access_key: str, secret_key: str, bucket: str, capacity_gb: int = 25):
self.access_key = access_key
self.secret_key = secret_key
self.bucket = bucket
self.capacity_bytes = capacity_gb * 1024**3
self.endpoint = "https://gateway.storjshare.io"
async def pin(self, content_hash: str, data: bytes, filename: Optional[str] = None) -> Optional[str]:
"""Store content on Storj."""
try:
import asyncio
import boto3
from botocore.config import Config
def do_pin():
s3 = boto3.client(
's3',
endpoint_url=self.endpoint,
aws_access_key_id=self.access_key,
aws_secret_access_key=self.secret_key,
config=Config(signature_version='s3v4')
)
key = filename or content_hash
s3.put_object(Bucket=self.bucket, Key=key, Body=data)
return content_hash
result = await asyncio.to_thread(do_pin)
logger.info(f"Storj: Stored {content_hash[:16]}...")
return result
except Exception as e:
logger.error(f"Storj pin failed: {e}")
return None
async def unpin(self, content_hash: str) -> bool:
"""Remove content from Storj."""
try:
import asyncio
import boto3
from botocore.config import Config
def do_unpin():
s3 = boto3.client(
's3',
endpoint_url=self.endpoint,
aws_access_key_id=self.access_key,
aws_secret_access_key=self.secret_key,
config=Config(signature_version='s3v4')
)
s3.delete_object(Bucket=self.bucket, Key=content_hash)
return True
return await asyncio.to_thread(do_unpin)
except Exception as e:
logger.error(f"Storj unpin failed: {e}")
return False
async def get(self, content_hash: str) -> Optional[bytes]:
"""Get content from Storj."""
try:
import asyncio
import boto3
from botocore.config import Config
def do_get():
s3 = boto3.client(
's3',
endpoint_url=self.endpoint,
aws_access_key_id=self.access_key,
aws_secret_access_key=self.secret_key,
config=Config(signature_version='s3v4')
)
response = s3.get_object(Bucket=self.bucket, Key=content_hash)
return response['Body'].read()
return await asyncio.to_thread(do_get)
except Exception as e:
logger.error(f"Storj get failed: {e}")
return None
async def is_pinned(self, content_hash: str) -> bool:
"""Check if content exists on Storj."""
try:
import asyncio
import boto3
from botocore.config import Config
def do_check():
s3 = boto3.client(
's3',
endpoint_url=self.endpoint,
aws_access_key_id=self.access_key,
aws_secret_access_key=self.secret_key,
config=Config(signature_version='s3v4')
)
s3.head_object(Bucket=self.bucket, Key=content_hash)
return True
return await asyncio.to_thread(do_check)
except Exception:
return False
async def test_connection(self) -> tuple[bool, str]:
"""Test Storj connectivity."""
try:
import asyncio
import boto3
from botocore.config import Config
def do_test():
s3 = boto3.client(
's3',
endpoint_url=self.endpoint,
aws_access_key_id=self.access_key,
aws_secret_access_key=self.secret_key,
config=Config(signature_version='s3v4')
)
s3.head_bucket(Bucket=self.bucket)
return True, f"Connected to Storj bucket '{self.bucket}'"
return await asyncio.to_thread(do_test)
except Exception as e:
if "404" in str(e):
return False, f"Bucket '{self.bucket}' not found"
if "403" in str(e):
return False, "Invalid credentials or no access to bucket"
return False, f"Connection failed: {e}"
def get_usage(self) -> dict:
"""Get Storj usage stats."""
return {"used_bytes": 0, "capacity_bytes": self.capacity_bytes, "pin_count": 0}
class LocalStorageProvider(StorageProvider):
"""Local filesystem storage provider."""
@@ -477,7 +950,7 @@ def create_provider(provider_type: str, config: dict) -> Optional[StorageProvide
Factory function to create a storage provider from config.
Args:
provider_type: 'pinata', 'web3storage', or 'local'
provider_type: One of 'pinata', 'web3storage', 'nftstorage', 'infura', 'filebase', 'storj', 'local'
config: Provider-specific configuration dict
Returns:
@@ -493,7 +966,32 @@ def create_provider(provider_type: str, config: dict) -> Optional[StorageProvide
elif provider_type == "web3storage":
return Web3StorageProvider(
api_token=config["api_token"],
capacity_gb=config.get("capacity_gb", 1)
capacity_gb=config.get("capacity_gb", 5)
)
elif provider_type == "nftstorage":
return NFTStorageProvider(
api_token=config["api_token"],
capacity_gb=config.get("capacity_gb", 5)
)
elif provider_type == "infura":
return InfuraIPFSProvider(
project_id=config["project_id"],
project_secret=config["project_secret"],
capacity_gb=config.get("capacity_gb", 5)
)
elif provider_type == "filebase":
return FilebaseProvider(
access_key=config["access_key"],
secret_key=config["secret_key"],
bucket=config["bucket"],
capacity_gb=config.get("capacity_gb", 5)
)
elif provider_type == "storj":
return StorjProvider(
access_key=config["access_key"],
secret_key=config["secret_key"],
bucket=config["bucket"],
capacity_gb=config.get("capacity_gb", 25)
)
elif provider_type == "local":
return LocalStorageProvider(