# Refactor to use IPFS CID as the primary content identifier:
# - Update database schema: content_hash -> cid, output_hash -> output_cid
# - Update all services, routers, and tasks to use cid terminology
# - Update HTML templates to display CID instead of hash
# - Update cache_manager parameter names
# - Update README documentation
# This completes the transition to CID-only content addressing.
"""
|
|
Storage provider abstraction for user-attachable storage.
|
|
|
|
Supports:
|
|
- Pinata (IPFS pinning service)
|
|
- web3.storage (IPFS pinning service)
|
|
- Local filesystem storage
|
|
"""
|
|
|
|
import hashlib
|
|
import json
|
|
import logging
|
|
import os
|
|
import shutil
|
|
from abc import ABC, abstractmethod
|
|
from pathlib import Path
|
|
from typing import Optional
|
|
|
|
import requests
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class StorageProvider(ABC):
    """Abstract base class for storage backends.

    Concrete providers (Pinata, web3.storage, local filesystem, ...) implement
    pin/unpin/get against their backing store, keyed by a content identifier
    (``cid``).
    """

    # Short machine-readable backend name; overridden by each subclass.
    provider_type: str = "unknown"

    @abstractmethod
    async def pin(self, cid: str, data: bytes, filename: Optional[str] = None) -> Optional[str]:
        """
        Pin content to storage.

        Args:
            cid: Content identifier for the data
            data: Raw bytes to store
            filename: Optional filename hint

        Returns:
            IPFS CID or provider-specific ID, or None on failure
        """
        pass

    @abstractmethod
    async def unpin(self, cid: str) -> bool:
        """
        Unpin content from storage.

        Args:
            cid: Content identifier for the data

        Returns:
            True if unpinned successfully
        """
        pass

    @abstractmethod
    async def get(self, cid: str) -> Optional[bytes]:
        """
        Retrieve content from storage.

        Args:
            cid: Content identifier for the data

        Returns:
            Raw bytes or None if not found
        """
        pass

    @abstractmethod
    async def is_pinned(self, cid: str) -> bool:
        """Check if content is pinned in this storage."""
        pass

    @abstractmethod
    async def test_connection(self) -> tuple[bool, str]:
        """
        Test connectivity to the storage provider.

        Returns:
            (success, message) tuple
        """
        pass

    @abstractmethod
    def get_usage(self) -> dict:
        """
        Get storage usage statistics.

        Returns:
            {used_bytes, capacity_bytes, pin_count}
        """
        pass
|
|
|
|
|
|
class PinataProvider(StorageProvider):
    """Pinata IPFS pinning service provider.

    Uploads are tagged with a ``cid`` metadata keyvalue so later operations
    can locate the Pinata-assigned IPFS hash via the pinList endpoint.
    """

    provider_type = "pinata"

    def __init__(self, api_key: str, secret_key: str, capacity_gb: int = 1):
        self.api_key = api_key
        self.secret_key = secret_key
        self.capacity_bytes = capacity_gb * 1024**3
        self.base_url = "https://api.pinata.cloud"
        self._usage_cache = None  # reserved for caching get_usage() results

    def _headers(self) -> dict:
        """Auth headers for Pinata's legacy key/secret scheme."""
        return {
            "pinata_api_key": self.api_key,
            "pinata_secret_api_key": self.secret_key,
        }

    async def pin(self, cid: str, data: bytes, filename: Optional[str] = None) -> Optional[str]:
        """
        Pin content to Pinata.

        Args:
            cid: Content identifier used to tag the upload
            data: Raw bytes to store
            filename: Optional filename hint

        Returns:
            The Pinata-assigned IPFS hash, or None on failure.
        """
        try:
            import asyncio

            def do_pin():
                files = {"file": (filename or f"{cid[:16]}.bin", data)}
                metadata = {
                    "name": filename or cid[:16],
                    "keyvalues": {"cid": cid}
                }
                response = requests.post(
                    f"{self.base_url}/pinning/pinFileToIPFS",
                    files=files,
                    data={"pinataMetadata": json.dumps(metadata)},
                    headers=self._headers(),
                    timeout=120
                )
                response.raise_for_status()
                return response.json().get("IpfsHash")

            # BUGFIX: the result used to be assigned to `cid`, shadowing the
            # parameter so the log printed the same value twice, and a missing
            # IpfsHash would crash on `cid[:16]` (None is not subscriptable).
            ipfs_hash = await asyncio.to_thread(do_pin)
            if not ipfs_hash:
                logger.error("Pinata: upload response contained no IpfsHash")
                return None
            logger.info(f"Pinata: Pinned {cid[:16]}... as {ipfs_hash}")
            return ipfs_hash
        except Exception as e:
            logger.error(f"Pinata pin failed: {e}")
            return None

    async def unpin(self, cid: str) -> bool:
        """
        Unpin content from Pinata by finding its IPFS hash first.

        Returns:
            True if at least one matching pin was removed.
        """
        try:
            import asyncio

            def do_unpin():
                # First find the pin by cid metadata.
                # BUGFIX: the loop below used to assign to `cid`, which made
                # `cid` local to this closure and raised UnboundLocalError on
                # the params line above it - so unpin always failed.
                response = requests.get(
                    f"{self.base_url}/data/pinList",
                    params={"metadata[keyvalues][cid]": cid, "status": "pinned"},
                    headers=self._headers(),
                    timeout=30
                )
                response.raise_for_status()
                pins = response.json().get("rows", [])

                if not pins:
                    return False

                # Unpin each matching IPFS hash
                for pin_row in pins:
                    ipfs_hash = pin_row.get("ipfs_pin_hash")
                    if ipfs_hash:
                        resp = requests.delete(
                            f"{self.base_url}/pinning/unpin/{ipfs_hash}",
                            headers=self._headers(),
                            timeout=30
                        )
                        resp.raise_for_status()
                return True

            result = await asyncio.to_thread(do_unpin)
            logger.info(f"Pinata: Unpinned {cid[:16]}...")
            return result
        except Exception as e:
            logger.error(f"Pinata unpin failed: {e}")
            return False

    async def get(self, cid: str) -> Optional[bytes]:
        """
        Retrieve content from Pinata via its public IPFS gateway.

        Returns:
            Raw bytes, or None if the cid is unknown or the fetch fails.
        """
        try:
            import asyncio

            def do_get():
                # First find the IPFS hash for this cid.
                # BUGFIX: assigning the lookup result to `cid` used to shadow
                # the closure variable and raise UnboundLocalError above.
                response = requests.get(
                    f"{self.base_url}/data/pinList",
                    params={"metadata[keyvalues][cid]": cid, "status": "pinned"},
                    headers=self._headers(),
                    timeout=30
                )
                response.raise_for_status()
                pins = response.json().get("rows", [])

                if not pins:
                    return None

                ipfs_hash = pins[0].get("ipfs_pin_hash")
                if not ipfs_hash:
                    return None

                # Fetch from gateway
                gateway_response = requests.get(
                    f"https://gateway.pinata.cloud/ipfs/{ipfs_hash}",
                    timeout=120
                )
                gateway_response.raise_for_status()
                return gateway_response.content

            return await asyncio.to_thread(do_get)
        except Exception as e:
            logger.error(f"Pinata get failed: {e}")
            return None

    async def is_pinned(self, cid: str) -> bool:
        """Check whether any pinned row matches this cid's metadata tag."""
        try:
            import asyncio

            def do_check():
                response = requests.get(
                    f"{self.base_url}/data/pinList",
                    params={"metadata[keyvalues][cid]": cid, "status": "pinned"},
                    headers=self._headers(),
                    timeout=30
                )
                response.raise_for_status()
                return len(response.json().get("rows", [])) > 0

            return await asyncio.to_thread(do_check)
        except Exception:
            return False

    async def test_connection(self) -> tuple[bool, str]:
        """Test Pinata API connectivity via the testAuthentication endpoint."""
        try:
            import asyncio

            def do_test():
                response = requests.get(
                    f"{self.base_url}/data/testAuthentication",
                    headers=self._headers(),
                    timeout=10
                )
                response.raise_for_status()
                return True, "Connected to Pinata successfully"

            return await asyncio.to_thread(do_test)
        except requests.exceptions.HTTPError as e:
            if e.response.status_code == 401:
                return False, "Invalid API credentials"
            return False, f"HTTP error: {e}"
        except Exception as e:
            return False, f"Connection failed: {e}"

    def get_usage(self) -> dict:
        """
        Get Pinata usage stats.

        Returns:
            {used_bytes, capacity_bytes, pin_count}; zeros on API failure.
        """
        try:
            response = requests.get(
                f"{self.base_url}/data/userPinnedDataTotal",
                headers=self._headers(),
                timeout=10
            )
            response.raise_for_status()
            data = response.json()
            return {
                "used_bytes": data.get("pin_size_total", 0),
                "capacity_bytes": self.capacity_bytes,
                "pin_count": data.get("pin_count", 0)
            }
        except Exception:
            return {"used_bytes": 0, "capacity_bytes": self.capacity_bytes, "pin_count": 0}
|
|
|
|
|
|
class Web3StorageProvider(StorageProvider):
    """web3.storage pinning service provider.

    Uploads go through the /upload endpoint. The service is append-only, so
    unpinning is unsupported, and retrieval by our cid would require a
    cid -> CID mapping that is not maintained here.
    """

    provider_type = "web3storage"

    def __init__(self, api_token: str, capacity_gb: int = 1):
        self.api_token = api_token
        self.capacity_bytes = capacity_gb * 1024**3
        self.base_url = "https://api.web3.storage"

    def _headers(self) -> dict:
        """Bearer-token auth header."""
        return {"Authorization": f"Bearer {self.api_token}"}

    async def pin(self, cid: str, data: bytes, filename: Optional[str] = None) -> Optional[str]:
        """
        Pin content to web3.storage.

        Returns:
            The service-assigned CID, or None on failure.
        """
        try:
            import asyncio

            def do_pin():
                response = requests.post(
                    f"{self.base_url}/upload",
                    data=data,
                    headers={
                        **self._headers(),
                        "X-Name": filename or cid[:16]
                    },
                    timeout=120
                )
                response.raise_for_status()
                return response.json().get("cid")

            # BUGFIX: keep the returned CID separate from the `cid` parameter
            # so the log shows both, and guard against a missing field
            # (previously a None result crashed on `cid[:16]`).
            remote_cid = await asyncio.to_thread(do_pin)
            if not remote_cid:
                logger.error("web3.storage: upload response contained no cid")
                return None
            logger.info(f"web3.storage: Pinned {cid[:16]}... as {remote_cid}")
            return remote_cid
        except Exception as e:
            logger.error(f"web3.storage pin failed: {e}")
            return None

    async def unpin(self, cid: str) -> bool:
        """web3.storage doesn't support unpinning - data is stored permanently."""
        logger.warning("web3.storage: Unpinning not supported (permanent storage)")
        return False

    async def get(self, cid: str) -> Optional[bytes]:
        """Get content from web3.storage - would need CID mapping."""
        # web3.storage requires knowing the CID to fetch
        # For now, return None - we'd need to maintain a mapping
        return None

    async def is_pinned(self, cid: str) -> bool:
        """Check if content is pinned - would need CID mapping."""
        return False

    async def test_connection(self) -> tuple[bool, str]:
        """Test web3.storage API connectivity by listing one upload."""
        try:
            import asyncio

            def do_test():
                response = requests.get(
                    f"{self.base_url}/user/uploads",
                    headers=self._headers(),
                    params={"size": 1},
                    timeout=10
                )
                response.raise_for_status()
                return True, "Connected to web3.storage successfully"

            return await asyncio.to_thread(do_test)
        except requests.exceptions.HTTPError as e:
            if e.response.status_code == 401:
                return False, "Invalid API token"
            return False, f"HTTP error: {e}"
        except Exception as e:
            return False, f"Connection failed: {e}"

    def get_usage(self) -> dict:
        """
        Get web3.storage usage stats from the first 1000 uploads.

        Returns:
            {used_bytes, capacity_bytes, pin_count}; zeros on API failure.
        """
        try:
            response = requests.get(
                f"{self.base_url}/user/uploads",
                headers=self._headers(),
                params={"size": 1000},
                timeout=30
            )
            response.raise_for_status()
            uploads = response.json()
            total_size = sum(u.get("dagSize", 0) for u in uploads)
            return {
                "used_bytes": total_size,
                "capacity_bytes": self.capacity_bytes,
                "pin_count": len(uploads)
            }
        except Exception:
            return {"used_bytes": 0, "capacity_bytes": self.capacity_bytes, "pin_count": 0}
|
|
|
|
|
|
class NFTStorageProvider(StorageProvider):
    """NFT.Storage pinning service provider (free for NFT data).

    Append-only like web3.storage: uploads cannot be unpinned, and retrieval
    by our cid would require a cid -> CID mapping not maintained here.
    """

    provider_type = "nftstorage"

    def __init__(self, api_token: str, capacity_gb: int = 5):
        self.api_token = api_token
        self.capacity_bytes = capacity_gb * 1024**3
        self.base_url = "https://api.nft.storage"

    def _headers(self) -> dict:
        """Bearer-token auth header."""
        return {"Authorization": f"Bearer {self.api_token}"}

    async def pin(self, cid: str, data: bytes, filename: Optional[str] = None) -> Optional[str]:
        """
        Pin content to NFT.Storage.

        Returns:
            The service-assigned CID, or None on failure.
        """
        try:
            import asyncio

            def do_pin():
                response = requests.post(
                    f"{self.base_url}/upload",
                    data=data,
                    headers={**self._headers(), "Content-Type": "application/octet-stream"},
                    timeout=120
                )
                response.raise_for_status()
                return response.json().get("value", {}).get("cid")

            # BUGFIX: keep the returned CID separate from the `cid` parameter
            # so the log shows both, and guard against a missing field
            # (previously a None result crashed on `cid[:16]`).
            remote_cid = await asyncio.to_thread(do_pin)
            if not remote_cid:
                logger.error("NFT.Storage: upload response contained no cid")
                return None
            logger.info(f"NFT.Storage: Pinned {cid[:16]}... as {remote_cid}")
            return remote_cid
        except Exception as e:
            logger.error(f"NFT.Storage pin failed: {e}")
            return None

    async def unpin(self, cid: str) -> bool:
        """NFT.Storage doesn't support unpinning - data is stored permanently."""
        logger.warning("NFT.Storage: Unpinning not supported (permanent storage)")
        return False

    async def get(self, cid: str) -> Optional[bytes]:
        """Get content from NFT.Storage - would need CID mapping."""
        return None

    async def is_pinned(self, cid: str) -> bool:
        """Check if content is pinned - would need CID mapping."""
        return False

    async def test_connection(self) -> tuple[bool, str]:
        """Test NFT.Storage API connectivity against the API root."""
        try:
            import asyncio

            def do_test():
                response = requests.get(
                    f"{self.base_url}/",
                    headers=self._headers(),
                    timeout=10
                )
                response.raise_for_status()
                return True, "Connected to NFT.Storage successfully"

            return await asyncio.to_thread(do_test)
        except requests.exceptions.HTTPError as e:
            if e.response.status_code == 401:
                return False, "Invalid API token"
            return False, f"HTTP error: {e}"
        except Exception as e:
            return False, f"Connection failed: {e}"

    def get_usage(self) -> dict:
        """NFT.Storage exposes no usage API here; report zero usage."""
        return {"used_bytes": 0, "capacity_bytes": self.capacity_bytes, "pin_count": 0}
|
|
|
|
|
|
class InfuraIPFSProvider(StorageProvider):
    """Infura IPFS pinning service provider.

    Talks to the IPFS HTTP API (add / pin/rm / cat / pin/ls) using
    project-id / project-secret basic auth.
    """

    provider_type = "infura"

    def __init__(self, project_id: str, project_secret: str, capacity_gb: int = 5):
        self.project_id = project_id
        self.project_secret = project_secret
        self.capacity_bytes = capacity_gb * 1024**3
        self.base_url = "https://ipfs.infura.io:5001/api/v0"

    def _auth(self) -> tuple:
        """Basic-auth pair for the Infura IPFS API."""
        return (self.project_id, self.project_secret)

    async def pin(self, cid: str, data: bytes, filename: Optional[str] = None) -> Optional[str]:
        """
        Pin content to Infura IPFS.

        Returns:
            The IPFS hash reported by the add endpoint, or None on failure.
        """
        try:
            import asyncio

            def do_pin():
                files = {"file": (filename or f"{cid[:16]}.bin", data)}
                response = requests.post(
                    f"{self.base_url}/add",
                    files=files,
                    auth=self._auth(),
                    timeout=120
                )
                response.raise_for_status()
                return response.json().get("Hash")

            # BUGFIX: keep the returned hash separate from the `cid` parameter
            # so the log shows both, and guard against a missing field
            # (previously a None result crashed on `cid[:16]`).
            ipfs_hash = await asyncio.to_thread(do_pin)
            if not ipfs_hash:
                logger.error("Infura IPFS: add response contained no Hash")
                return None
            logger.info(f"Infura IPFS: Pinned {cid[:16]}... as {ipfs_hash}")
            return ipfs_hash
        except Exception as e:
            logger.error(f"Infura IPFS pin failed: {e}")
            return None

    async def unpin(self, cid: str) -> bool:
        """Unpin content from Infura IPFS via pin/rm."""
        try:
            import asyncio

            def do_unpin():
                response = requests.post(
                    f"{self.base_url}/pin/rm",
                    params={"arg": cid},
                    auth=self._auth(),
                    timeout=30
                )
                response.raise_for_status()
                return True

            return await asyncio.to_thread(do_unpin)
        except Exception as e:
            logger.error(f"Infura IPFS unpin failed: {e}")
            return False

    async def get(self, cid: str) -> Optional[bytes]:
        """Fetch content bytes from Infura IPFS via cat."""
        try:
            import asyncio

            def do_get():
                response = requests.post(
                    f"{self.base_url}/cat",
                    params={"arg": cid},
                    auth=self._auth(),
                    timeout=120
                )
                response.raise_for_status()
                return response.content

            return await asyncio.to_thread(do_get)
        except Exception as e:
            logger.error(f"Infura IPFS get failed: {e}")
            return None

    async def is_pinned(self, cid: str) -> bool:
        """Check pin status via pin/ls (200 means the pin exists)."""
        try:
            import asyncio

            def do_check():
                response = requests.post(
                    f"{self.base_url}/pin/ls",
                    params={"arg": cid},
                    auth=self._auth(),
                    timeout=30
                )
                return response.status_code == 200

            return await asyncio.to_thread(do_check)
        except Exception:
            return False

    async def test_connection(self) -> tuple[bool, str]:
        """Test Infura IPFS API connectivity via the id endpoint."""
        try:
            import asyncio

            def do_test():
                response = requests.post(
                    f"{self.base_url}/id",
                    auth=self._auth(),
                    timeout=10
                )
                response.raise_for_status()
                return True, "Connected to Infura IPFS successfully"

            return await asyncio.to_thread(do_test)
        except requests.exceptions.HTTPError as e:
            if e.response.status_code == 401:
                return False, "Invalid project credentials"
            return False, f"HTTP error: {e}"
        except Exception as e:
            return False, f"Connection failed: {e}"

    def get_usage(self) -> dict:
        """Infura exposes no usage API here; report zero usage."""
        return {"used_bytes": 0, "capacity_bytes": self.capacity_bytes, "pin_count": 0}
|
|
|
|
|
|
class FilebaseProvider(StorageProvider):
    """Filebase S3-compatible IPFS pinning service.

    Objects are stored under their ``cid`` as the S3 key so that every
    method (pin/unpin/get/is_pinned) addresses them consistently.
    """

    provider_type = "filebase"

    def __init__(self, access_key: str, secret_key: str, bucket: str, capacity_gb: int = 5):
        self.access_key = access_key
        self.secret_key = secret_key
        self.bucket = bucket
        self.capacity_bytes = capacity_gb * 1024**3
        self.endpoint = "https://s3.filebase.com"

    def _s3(self):
        """Build an S3 client pointed at the Filebase endpoint."""
        import boto3
        from botocore.config import Config
        return boto3.client(
            's3',
            endpoint_url=self.endpoint,
            aws_access_key_id=self.access_key,
            aws_secret_access_key=self.secret_key,
            config=Config(signature_version='s3v4')
        )

    async def pin(self, cid: str, data: bytes, filename: Optional[str] = None) -> Optional[str]:
        """
        Pin content to Filebase.

        Returns:
            The IPFS CID reported in the object's metadata (falls back to
            the given cid), or None on failure.
        """
        try:
            import asyncio

            def do_pin():
                s3 = self._s3()
                # BUGFIX: previously stored under a filename-derived key while
                # unpin/get/is_pinned all use Key=cid, so stored content could
                # never be retrieved or deleted. Key by cid consistently.
                s3.put_object(Bucket=self.bucket, Key=cid, Body=data)
                # Filebase reports the IPFS CID via the object's metadata
                head = s3.head_object(Bucket=self.bucket, Key=cid)
                return head.get('Metadata', {}).get('cid', cid)

            ipfs_cid = await asyncio.to_thread(do_pin)
            logger.info(f"Filebase: Pinned {cid[:16]}... as {ipfs_cid}")
            return ipfs_cid
        except Exception as e:
            logger.error(f"Filebase pin failed: {e}")
            return None

    async def unpin(self, cid: str) -> bool:
        """Remove content from Filebase."""
        try:
            import asyncio

            def do_unpin():
                self._s3().delete_object(Bucket=self.bucket, Key=cid)
                return True

            return await asyncio.to_thread(do_unpin)
        except Exception as e:
            logger.error(f"Filebase unpin failed: {e}")
            return False

    async def get(self, cid: str) -> Optional[bytes]:
        """Get content bytes from Filebase, or None on any failure."""
        try:
            import asyncio

            def do_get():
                response = self._s3().get_object(Bucket=self.bucket, Key=cid)
                return response['Body'].read()

            return await asyncio.to_thread(do_get)
        except Exception as e:
            logger.error(f"Filebase get failed: {e}")
            return None

    async def is_pinned(self, cid: str) -> bool:
        """Check if the object exists in Filebase (HEAD by cid)."""
        try:
            import asyncio

            def do_check():
                self._s3().head_object(Bucket=self.bucket, Key=cid)
                return True

            return await asyncio.to_thread(do_check)
        except Exception:
            return False

    async def test_connection(self) -> tuple[bool, str]:
        """Test Filebase connectivity by HEAD-ing the configured bucket."""
        try:
            import asyncio

            def do_test():
                self._s3().head_bucket(Bucket=self.bucket)
                return True, f"Connected to Filebase bucket '{self.bucket}'"

            return await asyncio.to_thread(do_test)
        except Exception as e:
            if "404" in str(e):
                return False, f"Bucket '{self.bucket}' not found"
            if "403" in str(e):
                return False, "Invalid credentials or no access to bucket"
            return False, f"Connection failed: {e}"

    def get_usage(self) -> dict:
        """Filebase usage is not queried here; report zero usage."""
        return {"used_bytes": 0, "capacity_bytes": self.capacity_bytes, "pin_count": 0}
|
|
|
|
|
|
class StorjProvider(StorageProvider):
    """Storj decentralized cloud storage (S3-compatible).

    Objects are stored under their ``cid`` as the S3 key so that every
    method (pin/unpin/get/is_pinned) addresses them consistently.
    """

    provider_type = "storj"

    def __init__(self, access_key: str, secret_key: str, bucket: str, capacity_gb: int = 25):
        self.access_key = access_key
        self.secret_key = secret_key
        self.bucket = bucket
        self.capacity_bytes = capacity_gb * 1024**3
        self.endpoint = "https://gateway.storjshare.io"

    def _s3(self):
        """Build an S3 client pointed at the Storj gateway."""
        import boto3
        from botocore.config import Config
        return boto3.client(
            's3',
            endpoint_url=self.endpoint,
            aws_access_key_id=self.access_key,
            aws_secret_access_key=self.secret_key,
            config=Config(signature_version='s3v4')
        )

    async def pin(self, cid: str, data: bytes, filename: Optional[str] = None) -> Optional[str]:
        """
        Store content on Storj.

        Returns:
            The cid on success, None on failure.
        """
        try:
            import asyncio

            def do_pin():
                # BUGFIX: key by cid unconditionally; keying by filename left
                # the object unreachable for unpin/get/is_pinned (Key=cid).
                self._s3().put_object(Bucket=self.bucket, Key=cid, Body=data)
                return cid

            result = await asyncio.to_thread(do_pin)
            logger.info(f"Storj: Stored {cid[:16]}...")
            return result
        except Exception as e:
            logger.error(f"Storj pin failed: {e}")
            return None

    async def unpin(self, cid: str) -> bool:
        """Remove content from Storj."""
        try:
            import asyncio

            def do_unpin():
                self._s3().delete_object(Bucket=self.bucket, Key=cid)
                return True

            return await asyncio.to_thread(do_unpin)
        except Exception as e:
            logger.error(f"Storj unpin failed: {e}")
            return False

    async def get(self, cid: str) -> Optional[bytes]:
        """Get content bytes from Storj, or None on any failure."""
        try:
            import asyncio

            def do_get():
                response = self._s3().get_object(Bucket=self.bucket, Key=cid)
                return response['Body'].read()

            return await asyncio.to_thread(do_get)
        except Exception as e:
            logger.error(f"Storj get failed: {e}")
            return None

    async def is_pinned(self, cid: str) -> bool:
        """Check if the object exists on Storj (HEAD by cid)."""
        try:
            import asyncio

            def do_check():
                self._s3().head_object(Bucket=self.bucket, Key=cid)
                return True

            return await asyncio.to_thread(do_check)
        except Exception:
            return False

    async def test_connection(self) -> tuple[bool, str]:
        """Test Storj connectivity by HEAD-ing the configured bucket."""
        try:
            import asyncio

            def do_test():
                self._s3().head_bucket(Bucket=self.bucket)
                return True, f"Connected to Storj bucket '{self.bucket}'"

            return await asyncio.to_thread(do_test)
        except Exception as e:
            if "404" in str(e):
                return False, f"Bucket '{self.bucket}' not found"
            if "403" in str(e):
                return False, "Invalid credentials or no access to bucket"
            return False, f"Connection failed: {e}"

    def get_usage(self) -> dict:
        """Storj usage is not queried here; report zero usage."""
        return {"used_bytes": 0, "capacity_bytes": self.capacity_bytes, "pin_count": 0}
|
|
|
|
|
|
class LocalStorageProvider(StorageProvider):
    """Local filesystem storage provider.

    Content is laid out as ``<base_path>/<cid[:2]>/<cid>``, sharding files
    across two-character subdirectories to keep directory listings small.
    """

    provider_type = "local"

    def __init__(self, base_path: str, capacity_gb: int = 10):
        self.base_path = Path(base_path)
        self.capacity_bytes = capacity_gb * 1024**3
        # Make sure the storage root exists up front.
        self.base_path.mkdir(parents=True, exist_ok=True)

    def _get_file_path(self, cid: str) -> Path:
        """Return the sharded on-disk path for *cid*."""
        # First two characters select the shard subdirectory.
        return self.base_path / cid[:2] / cid

    async def pin(self, cid: str, data: bytes, filename: Optional[str] = None) -> Optional[str]:
        """Store content locally; returns the cid on success, None on error."""
        try:
            import asyncio

            def write_out():
                target = self._get_file_path(cid)
                target.parent.mkdir(parents=True, exist_ok=True)
                target.write_bytes(data)
                return cid  # the cid itself serves as the local pin ID

            stored = await asyncio.to_thread(write_out)
            logger.info(f"Local: Stored {cid[:16]}...")
            return stored
        except Exception as e:
            logger.error(f"Local storage failed: {e}")
            return None

    async def unpin(self, cid: str) -> bool:
        """Remove content from local storage; True only if a file was deleted."""
        try:
            import asyncio

            def remove():
                target = self._get_file_path(cid)
                if not target.exists():
                    return False
                target.unlink()
                return True

            return await asyncio.to_thread(remove)
        except Exception as e:
            logger.error(f"Local unpin failed: {e}")
            return False

    async def get(self, cid: str) -> Optional[bytes]:
        """Read content back from local storage, or None if absent."""
        try:
            import asyncio

            def read_back():
                target = self._get_file_path(cid)
                return target.read_bytes() if target.exists() else None

            return await asyncio.to_thread(read_back)
        except Exception as e:
            logger.error(f"Local get failed: {e}")
            return None

    async def is_pinned(self, cid: str) -> bool:
        """True if the cid's file exists locally."""
        return self._get_file_path(cid).exists()

    async def test_connection(self) -> tuple[bool, str]:
        """Verify the storage root is writable by round-tripping a probe file."""
        try:
            probe = self.base_path / ".write_test"
            probe.write_text("test")
            probe.unlink()
            return True, f"Local storage ready at {self.base_path}"
        except Exception as e:
            return False, f"Cannot write to {self.base_path}: {e}"

    def get_usage(self) -> dict:
        """Walk the shard directories, tallying bytes and file count."""
        try:
            used, count = 0, 0
            for shard in self.base_path.iterdir():
                # Only two-character shard dirs belong to this provider.
                if not (shard.is_dir() and len(shard.name) == 2):
                    continue
                for entry in shard.iterdir():
                    if entry.is_file():
                        used += entry.stat().st_size
                        count += 1
            return {
                "used_bytes": used,
                "capacity_bytes": self.capacity_bytes,
                "pin_count": count
            }
        except Exception:
            return {"used_bytes": 0, "capacity_bytes": self.capacity_bytes, "pin_count": 0}
|
|
|
|
|
|
def create_provider(provider_type: str, config: dict) -> Optional[StorageProvider]:
    """
    Factory function to create a storage provider from config.

    Args:
        provider_type: One of 'pinata', 'web3storage', 'nftstorage', 'infura', 'filebase', 'storj', 'local'
        config: Provider-specific configuration dict

    Returns:
        StorageProvider instance or None if invalid
    """
    # Each builder pulls its required keys from the config dict; a missing
    # required key raises KeyError, which is reported below.
    builders = {
        "pinata": lambda c: PinataProvider(
            api_key=c["api_key"],
            secret_key=c["secret_key"],
            capacity_gb=c.get("capacity_gb", 1),
        ),
        "web3storage": lambda c: Web3StorageProvider(
            api_token=c["api_token"],
            capacity_gb=c.get("capacity_gb", 5),
        ),
        "nftstorage": lambda c: NFTStorageProvider(
            api_token=c["api_token"],
            capacity_gb=c.get("capacity_gb", 5),
        ),
        "infura": lambda c: InfuraIPFSProvider(
            project_id=c["project_id"],
            project_secret=c["project_secret"],
            capacity_gb=c.get("capacity_gb", 5),
        ),
        "filebase": lambda c: FilebaseProvider(
            access_key=c["access_key"],
            secret_key=c["secret_key"],
            bucket=c["bucket"],
            capacity_gb=c.get("capacity_gb", 5),
        ),
        "storj": lambda c: StorjProvider(
            access_key=c["access_key"],
            secret_key=c["secret_key"],
            bucket=c["bucket"],
            capacity_gb=c.get("capacity_gb", 25),
        ),
        "local": lambda c: LocalStorageProvider(
            base_path=c["path"],
            capacity_gb=c.get("capacity_gb", 10),
        ),
    }

    builder = builders.get(provider_type)
    if builder is None:
        logger.error(f"Unknown provider type: {provider_type}")
        return None

    try:
        return builder(config)
    except KeyError as e:
        logger.error(f"Missing config key for {provider_type}: {e}")
        return None
    except Exception as e:
        logger.error(f"Failed to create provider {provider_type}: {e}")
        return None
|