diff --git a/server.py b/server.py
index 86d81bc..8fe6bbd 100644
--- a/server.py
+++ b/server.py
@@ -43,6 +43,7 @@ from tasks import render_effect, execute_dag, build_effect_dag
from contextlib import asynccontextmanager
from cache_manager import L1CacheManager, get_cache_manager
import database
+import storage_providers
# L1 public URL for redirects
L1_PUBLIC_URL = os.environ.get("L1_PUBLIC_URL", "http://localhost:8100")
@@ -3177,6 +3178,21 @@ class PublishRequest(BaseModel):
asset_type: str = "image" # image, video, etc.
class AddStorageRequest(BaseModel):
    """Request body for POST /storage: register a new storage provider.

    The config dict is provider-specific (credentials, bucket, path, ...)
    and is validated by a live connection test before the record is saved.
    """
    provider_type: str  # 'pinata', 'web3storage', 'local', etc.
    provider_name: Optional[str] = None  # user-friendly name; defaults to "<type>-<username>"
    config: dict  # provider-specific config (api_key, path, etc.)
    capacity_gb: int  # declared storage capacity in GB
+
+
class UpdateStorageRequest(BaseModel):
    """Request body for PATCH /storage/{id}: partial update of a provider.

    All fields are optional; only supplied fields are changed. A new
    config is merged over the stored one, not replaced wholesale.
    """
    config: Optional[dict] = None  # merged over the existing config when present
    capacity_gb: Optional[int] = None  # new declared capacity in GB
    is_active: Optional[bool] = None  # enable/disable without deleting
+
+
@app.get("/cache/{content_hash}/meta")
async def get_cache_meta(content_hash: str, ctx: UserContext = Depends(get_required_user_context)):
"""Get metadata for a cached file."""
@@ -4403,6 +4419,530 @@ async def ui_run_partial(run_id: str, request: Request):
return html
+# ============ User Storage Configuration ============
+
# Catalog of attachable storage provider types, keyed by the provider_type
# string stored in the database. 'name' and 'desc' are shown on the storage
# UI cards; 'color' selects the card accent color in the rendered HTML.
# These keys mirror the valid_types lists used by the storage endpoints.
STORAGE_PROVIDERS_INFO = {
    "pinata": {"name": "Pinata", "desc": "1GB free, IPFS pinning", "color": "blue"},
    "web3storage": {"name": "web3.storage", "desc": "IPFS + Filecoin", "color": "green"},
    "nftstorage": {"name": "NFT.Storage", "desc": "Free for NFTs", "color": "pink"},
    "infura": {"name": "Infura IPFS", "desc": "5GB free", "color": "orange"},
    "filebase": {"name": "Filebase", "desc": "5GB free, S3+IPFS", "color": "cyan"},
    "storj": {"name": "Storj", "desc": "25GB free", "color": "indigo"},
    "local": {"name": "Local Storage", "desc": "Your own disk", "color": "purple"},
}
+
+
+@app.get("/storage")
+async def list_storage(request: Request):
+ """List user's storage providers. HTML for browsers, JSON for API."""
+ accept = request.headers.get("accept", "")
+ wants_json = "application/json" in accept and "text/html" not in accept
+
+ ctx = await get_user_context_from_cookie(request)
+ if not ctx:
+ if wants_json:
+ raise HTTPException(401, "Authentication required")
+ return RedirectResponse(url="/auth", status_code=302)
+
+ storages = await database.get_user_storage(ctx.actor_id)
+
+ # Add usage stats to each storage
+ for storage in storages:
+ usage = await database.get_storage_usage(storage["id"])
+ storage["used_bytes"] = usage["used_bytes"]
+ storage["pin_count"] = usage["pin_count"]
+ storage["donated_gb"] = storage["capacity_gb"] // 2
+ # Mask sensitive config keys for display
+ if storage.get("config"):
+ config = storage["config"] if isinstance(storage["config"], dict) else json.loads(storage["config"])
+ masked = {}
+ for k, v in config.items():
+ if "key" in k.lower() or "token" in k.lower() or "secret" in k.lower():
+ masked[k] = v[:4] + "..." + v[-4:] if len(str(v)) > 8 else "****"
+ else:
+ masked[k] = v
+ storage["config_display"] = masked
+
+ if wants_json:
+ return {"storages": storages}
+
+ return await ui_storage_page(ctx.username, storages, request)
+
+
+@app.post("/storage")
+async def add_storage(req: AddStorageRequest, ctx: UserContext = Depends(get_required_user_context)):
+ """Add a storage provider."""
+ valid_types = ["pinata", "web3storage", "nftstorage", "infura", "filebase", "storj", "local"]
+ if req.provider_type not in valid_types:
+ raise HTTPException(400, f"Invalid provider type: {req.provider_type}")
+
+ # Test the provider connection before saving
+ provider = storage_providers.create_provider(req.provider_type, {
+ **req.config,
+ "capacity_gb": req.capacity_gb
+ })
+ if not provider:
+ raise HTTPException(400, "Failed to create provider with given config")
+
+ success, message = await provider.test_connection()
+ if not success:
+ raise HTTPException(400, f"Provider connection failed: {message}")
+
+ # Save to database
+ provider_name = req.provider_name or f"{req.provider_type}-{ctx.username}"
+ storage_id = await database.add_user_storage(
+ actor_id=ctx.actor_id,
+ provider_type=req.provider_type,
+ provider_name=provider_name,
+ config=req.config,
+ capacity_gb=req.capacity_gb
+ )
+
+ if not storage_id:
+ raise HTTPException(500, "Failed to save storage provider")
+
+ return {"id": storage_id, "message": f"Storage provider added: {provider_name}"}
+
+
+@app.post("/storage/add")
+async def add_storage_form(
+ request: Request,
+ provider_type: str = Form(...),
+ provider_name: Optional[str] = Form(None),
+ description: Optional[str] = Form(None),
+ capacity_gb: int = Form(5),
+ api_key: Optional[str] = Form(None),
+ secret_key: Optional[str] = Form(None),
+ api_token: Optional[str] = Form(None),
+ project_id: Optional[str] = Form(None),
+ project_secret: Optional[str] = Form(None),
+ access_key: Optional[str] = Form(None),
+ bucket: Optional[str] = Form(None),
+ path: Optional[str] = Form(None),
+):
+ """Add a storage provider via HTML form (cookie auth)."""
+ ctx = await get_user_context_from_cookie(request)
+ if not ctx:
+ return HTMLResponse('
Not authenticated
', status_code=401)
+
+ valid_types = ["pinata", "web3storage", "nftstorage", "infura", "filebase", "storj", "local"]
+ if provider_type not in valid_types:
+ return HTMLResponse(f'Invalid provider type: {provider_type}
')
+
+ # Build config based on provider type
+ config = {}
+ if provider_type == "pinata":
+ if not api_key or not secret_key:
+ return HTMLResponse('Pinata requires API Key and Secret Key
')
+ config = {"api_key": api_key, "secret_key": secret_key}
+ elif provider_type == "web3storage":
+ if not api_token:
+ return HTMLResponse('web3.storage requires API Token
')
+ config = {"api_token": api_token}
+ elif provider_type == "nftstorage":
+ if not api_token:
+ return HTMLResponse('NFT.Storage requires API Token
')
+ config = {"api_token": api_token}
+ elif provider_type == "infura":
+ if not project_id or not project_secret:
+ return HTMLResponse('Infura requires Project ID and Project Secret
')
+ config = {"project_id": project_id, "project_secret": project_secret}
+ elif provider_type == "filebase":
+ if not access_key or not secret_key or not bucket:
+ return HTMLResponse('Filebase requires Access Key, Secret Key, and Bucket
')
+ config = {"access_key": access_key, "secret_key": secret_key, "bucket": bucket}
+ elif provider_type == "storj":
+ if not access_key or not secret_key or not bucket:
+ return HTMLResponse('Storj requires Access Key, Secret Key, and Bucket
')
+ config = {"access_key": access_key, "secret_key": secret_key, "bucket": bucket}
+ elif provider_type == "local":
+ if not path:
+ return HTMLResponse('Local storage requires a path
')
+ config = {"path": path}
+
+ # Test the provider connection before saving
+ provider = storage_providers.create_provider(provider_type, {
+ **config,
+ "capacity_gb": capacity_gb
+ })
+ if not provider:
+ return HTMLResponse('Failed to create provider with given config
')
+
+ success, message = await provider.test_connection()
+ if not success:
+ return HTMLResponse(f'Provider connection failed: {message}
')
+
+ # Save to database
+ name = provider_name or f"{provider_type}-{ctx.username}-{len(await database.get_user_storage_by_type(ctx.actor_id, provider_type)) + 1}"
+ storage_id = await database.add_user_storage(
+ actor_id=ctx.actor_id,
+ provider_type=provider_type,
+ provider_name=name,
+ config=config,
+ capacity_gb=capacity_gb,
+ description=description
+ )
+
+ if not storage_id:
+ return HTMLResponse('Failed to save storage provider
')
+
+ return HTMLResponse(f'''
+ Storage provider "{name}" added successfully!
+
+ ''')
+
+
+@app.get("/storage/{storage_id}")
+async def get_storage(storage_id: int, ctx: UserContext = Depends(get_required_user_context)):
+ """Get a specific storage provider."""
+ storage = await database.get_storage_by_id(storage_id)
+ if not storage:
+ raise HTTPException(404, "Storage provider not found")
+ if storage["actor_id"] != ctx.actor_id:
+ raise HTTPException(403, "Not authorized")
+
+ usage = await database.get_storage_usage(storage_id)
+ storage["used_bytes"] = usage["used_bytes"]
+ storage["pin_count"] = usage["pin_count"]
+ storage["donated_gb"] = storage["capacity_gb"] // 2
+
+ return storage
+
+
+@app.patch("/storage/{storage_id}")
+async def update_storage(storage_id: int, req: UpdateStorageRequest, ctx: UserContext = Depends(get_required_user_context)):
+ """Update a storage provider."""
+ storage = await database.get_storage_by_id(storage_id)
+ if not storage:
+ raise HTTPException(404, "Storage provider not found")
+ if storage["actor_id"] != ctx.actor_id:
+ raise HTTPException(403, "Not authorized")
+
+ # If updating config, test the new connection
+ if req.config:
+ existing_config = storage["config"] if isinstance(storage["config"], dict) else json.loads(storage["config"])
+ new_config = {**existing_config, **req.config}
+ provider = storage_providers.create_provider(storage["provider_type"], {
+ **new_config,
+ "capacity_gb": req.capacity_gb or storage["capacity_gb"]
+ })
+ if provider:
+ success, message = await provider.test_connection()
+ if not success:
+ raise HTTPException(400, f"Provider connection failed: {message}")
+
+ success = await database.update_user_storage(
+ storage_id,
+ config=req.config,
+ capacity_gb=req.capacity_gb,
+ is_active=req.is_active
+ )
+
+ if not success:
+ raise HTTPException(500, "Failed to update storage provider")
+
+ return {"message": "Storage provider updated"}
+
+
+@app.delete("/storage/{storage_id}")
+async def remove_storage(storage_id: int, request: Request):
+ """Remove a storage provider."""
+ ctx = await get_user_context_from_cookie(request)
+ if not ctx:
+ raise HTTPException(401, "Not authenticated")
+
+ storage = await database.get_storage_by_id(storage_id)
+ if not storage:
+ raise HTTPException(404, "Storage provider not found")
+ if storage["actor_id"] != ctx.actor_id:
+ raise HTTPException(403, "Not authorized")
+
+ success = await database.remove_user_storage(storage_id)
+ if not success:
+ raise HTTPException(500, "Failed to remove storage provider")
+
+ if wants_html(request):
+ return HTMLResponse("")
+
+ return {"message": "Storage provider removed"}
+
+
+@app.post("/storage/{storage_id}/test")
+async def test_storage(storage_id: int, request: Request):
+ """Test storage provider connectivity."""
+ ctx = await get_user_context_from_cookie(request)
+ if not ctx:
+ if wants_html(request):
+ return HTMLResponse('Not authenticated ', status_code=401)
+ raise HTTPException(401, "Not authenticated")
+
+ storage = await database.get_storage_by_id(storage_id)
+ if not storage:
+ if wants_html(request):
+ return HTMLResponse('Storage not found ', status_code=404)
+ raise HTTPException(404, "Storage provider not found")
+ if storage["actor_id"] != ctx.actor_id:
+ if wants_html(request):
+ return HTMLResponse('Not authorized ', status_code=403)
+ raise HTTPException(403, "Not authorized")
+
+ config = storage["config"] if isinstance(storage["config"], dict) else json.loads(storage["config"])
+ provider = storage_providers.create_provider(storage["provider_type"], {
+ **config,
+ "capacity_gb": storage["capacity_gb"]
+ })
+
+ if not provider:
+ if wants_html(request):
+ return HTMLResponse('Failed to create provider ')
+ raise HTTPException(500, "Failed to create provider")
+
+ success, message = await provider.test_connection()
+
+ if wants_html(request):
+ if success:
+ return HTMLResponse(f'{message} ')
+ return HTMLResponse(f'{message} ')
+
+ return {"success": success, "message": message}
+
+
+@app.get("/storage/type/{provider_type}")
+async def storage_type_page(provider_type: str, request: Request):
+ """Page for managing storage configs of a specific type."""
+ ctx = await get_user_context_from_cookie(request)
+ if not ctx:
+ return RedirectResponse(url="/auth", status_code=302)
+
+ if provider_type not in STORAGE_PROVIDERS_INFO:
+ raise HTTPException(404, "Invalid provider type")
+
+ storages = await database.get_user_storage_by_type(ctx.actor_id, provider_type)
+
+ # Add usage stats and mask config
+ for storage in storages:
+ usage = await database.get_storage_usage(storage["id"])
+ storage["used_bytes"] = usage["used_bytes"]
+ storage["pin_count"] = usage["pin_count"]
+ # Mask sensitive config keys
+ if storage.get("config"):
+ config = storage["config"] if isinstance(storage["config"], dict) else json.loads(storage["config"])
+ masked = {}
+ for k, v in config.items():
+ if "key" in k.lower() or "token" in k.lower() or "secret" in k.lower():
+ masked[k] = v[:4] + "..." + v[-4:] if len(str(v)) > 8 else "****"
+ else:
+ masked[k] = v
+ storage["config_display"] = masked
+
+ info = STORAGE_PROVIDERS_INFO[provider_type]
+ return await ui_storage_type_page(ctx.username, provider_type, info, storages, request)
+
+
async def ui_storage_page(username: str, storages: list, request: Request) -> HTMLResponse:
    """Render the main storage management page.

    Shows one card per supported provider type (with a count badge when
    the user has configs of that type) plus aggregate capacity, usage and
    pin totals across all of the user's providers.

    NOTE(review): the HTML template literals below have had their markup
    stripped by diff extraction; the real tag structure must be restored
    from version control before this function is edited.
    """
    # Count the user's configs by provider type for the card badges.
    type_counts = {}
    for s in storages:
        ptype = s["provider_type"]
        type_counts[ptype] = type_counts.get(ptype, 0) + 1

    # Build one card per known provider type.
    cards = ""
    for ptype, info in STORAGE_PROVIDERS_INFO.items():
        count = type_counts.get(ptype, 0)
        count_badge = f'{count} ' if count > 0 else ""
        cards += f'''




{info["name"]}

{info["desc"]}

        {count_badge}


        '''

    # Aggregate totals across all of the user's providers.
    total_capacity = sum(s["capacity_gb"] for s in storages)
    total_used = sum(s["used_bytes"] for s in storages)
    total_pins = sum(s["pin_count"] for s in storages)

    html = f'''



    Storage - Art DAG L1









    Storage Providers




Total Storage

{len(storages)} providers configured



{total_used / (1024**3):.1f} / {total_capacity} GB

{total_pins} items pinned





    {cards}




    '''
    return HTMLResponse(html)
+
+
async def ui_storage_type_page(username: str, provider_type: str, info: dict, storages: list, request: Request) -> HTMLResponse:
    """Render the storage management page for one provider type.

    Lists the user's existing configs of this type (usage, status,
    test/remove controls) followed by an add-new form whose credential
    fields depend on the provider type.

    NOTE(review): the HTML markup in the template literals below appears
    to have been stripped by diff extraction; restore the exact strings
    from version control before editing them.
    """
    # Build one row per existing config of this type.
    storage_rows = ""
    for s in storages:
        used_gb = s["used_bytes"] / (1024**3)
        status_class = "bg-green-600" if s.get("is_active", True) else "bg-gray-600"
        status_text = "Active" if s.get("is_active", True) else "Inactive"

        # Flatten the pre-masked config for display.
        config_display = ""
        if s.get("config_display"):
            for k, v in s["config_display"].items():
                config_display += f'{k}: {v} '

        storage_rows += f'''


{s.get("provider_name", "Unnamed")}

        {status_text}
        Test
        Remove



        {used_gb:.2f} / {s["capacity_gb"]} GB used ({s["pin_count"]} items)


{config_display}


        '''

    if not storages:
        storage_rows = f'No {info["name"]} configs yet. Add one below.
'

    # Build the add-new form fields; each provider type needs different
    # credential inputs (mirrors the branches in add_storage_form).
    form_fields = ""
    if provider_type == "pinata":
        form_fields = '''


        '''
    elif provider_type in ["web3storage", "nftstorage"]:
        form_fields = '''

        '''
    elif provider_type == "infura":
        form_fields = '''


        '''
    elif provider_type in ["filebase", "storj"]:
        form_fields = '''



        '''
    elif provider_type == "local":
        form_fields = '''

        '''

    html = f'''


    {info["name"]} Storage - Art DAG L1










    {storage_rows}



Add New {info["name"]} Config





    '''
    return HTMLResponse(html)
+
+
# ============ Client Download ============
CLIENT_TARBALL = Path(__file__).parent / "artdag-client.tar.gz"
diff --git a/storage_providers.py b/storage_providers.py
new file mode 100644
index 0000000..46dee08
--- /dev/null
+++ b/storage_providers.py
@@ -0,0 +1,1009 @@
+"""
+Storage provider abstraction for user-attachable storage.
+
+Supports:
+- Pinata (IPFS pinning service)
+- web3.storage (IPFS pinning service)
+- Local filesystem storage
+"""
+
+import hashlib
+import json
+import logging
+import os
+import shutil
+from abc import ABC, abstractmethod
+from pathlib import Path
+from typing import Optional
+
+import requests
+
+logger = logging.getLogger(__name__)
+
+
class StorageProvider(ABC):
    """Interface every storage backend must implement.

    Content is addressed by its SHA3-256 hash. pin() may additionally
    return a provider-specific identifier such as an IPFS CID.
    """

    # Overridden by each concrete backend (e.g. "pinata", "local").
    provider_type: str = "unknown"

    @abstractmethod
    async def pin(self, content_hash: str, data: bytes, filename: Optional[str] = None) -> Optional[str]:
        """Store *data* under *content_hash*.

        Args:
            content_hash: SHA3-256 hash of the content.
            data: Raw bytes to store.
            filename: Optional filename hint.

        Returns:
            IPFS CID or provider-specific ID, or None on failure.
        """

    @abstractmethod
    async def unpin(self, content_hash: str) -> bool:
        """Remove the content pinned under *content_hash*.

        Returns:
            True if unpinned successfully.
        """

    @abstractmethod
    async def get(self, content_hash: str) -> Optional[bytes]:
        """Fetch the stored bytes for *content_hash*.

        Returns:
            Raw bytes, or None if not found.
        """

    @abstractmethod
    async def is_pinned(self, content_hash: str) -> bool:
        """Report whether *content_hash* is currently pinned here."""

    @abstractmethod
    async def test_connection(self) -> tuple[bool, str]:
        """Probe the backend.

        Returns:
            (success, human-readable message) tuple.
        """

    @abstractmethod
    def get_usage(self) -> dict:
        """Return storage usage statistics.

        Returns:
            {used_bytes, capacity_bytes, pin_count}
        """
+
+
class PinataProvider(StorageProvider):
    """Pinata IPFS pinning service backend.

    Pins are tagged with a ``content_hash`` metadata key so they can be
    found again by SHA3-256 hash; all hash-to-CID lookups go through the
    shared ``_find_cids`` helper (previously this query was duplicated
    in unpin/get/is_pinned). Blocking ``requests`` calls run on a worker
    thread via ``asyncio.to_thread`` so the event loop is never blocked.
    """

    provider_type = "pinata"

    def __init__(self, api_key: str, secret_key: str, capacity_gb: int = 1):
        self.api_key = api_key
        self.secret_key = secret_key
        self.capacity_bytes = capacity_gb * 1024**3
        self.base_url = "https://api.pinata.cloud"

    def _headers(self) -> dict:
        """Pinata v1 key/secret auth headers."""
        return {
            "pinata_api_key": self.api_key,
            "pinata_secret_api_key": self.secret_key,
        }

    def _find_cids(self, content_hash: str) -> list:
        """Return CIDs of pinned files tagged with *content_hash*.

        Synchronous (blocking); callers run it via asyncio.to_thread.
        Raises requests exceptions on HTTP errors.
        """
        response = requests.get(
            f"{self.base_url}/data/pinList",
            params={"metadata[keyvalues][content_hash]": content_hash, "status": "pinned"},
            headers=self._headers(),
            timeout=30
        )
        response.raise_for_status()
        rows = response.json().get("rows", [])
        return [row["ipfs_pin_hash"] for row in rows if row.get("ipfs_pin_hash")]

    async def pin(self, content_hash: str, data: bytes, filename: Optional[str] = None) -> Optional[str]:
        """Upload *data* to Pinata, tagging it with *content_hash*.

        Returns:
            The resulting IPFS CID, or None on failure.
        """
        try:
            import asyncio

            def do_pin():
                files = {"file": (filename or f"{content_hash[:16]}.bin", data)}
                metadata = {
                    "name": filename or content_hash[:16],
                    "keyvalues": {"content_hash": content_hash}
                }
                response = requests.post(
                    f"{self.base_url}/pinning/pinFileToIPFS",
                    files=files,
                    data={"pinataMetadata": json.dumps(metadata)},
                    headers=self._headers(),
                    timeout=120
                )
                response.raise_for_status()
                return response.json().get("IpfsHash")

            cid = await asyncio.to_thread(do_pin)
            logger.info(f"Pinata: Pinned {content_hash[:16]}... as {cid}")
            return cid
        except Exception as e:
            logger.error(f"Pinata pin failed: {e}")
            return None

    async def unpin(self, content_hash: str) -> bool:
        """Unpin every CID tagged with *content_hash*.

        Returns:
            True if at least one pin was removed; False when no matching
            pin exists or any request fails.
        """
        try:
            import asyncio

            def do_unpin():
                cids = self._find_cids(content_hash)
                if not cids:
                    return False
                for cid in cids:
                    resp = requests.delete(
                        f"{self.base_url}/pinning/unpin/{cid}",
                        headers=self._headers(),
                        timeout=30
                    )
                    resp.raise_for_status()
                return True

            result = await asyncio.to_thread(do_unpin)
            logger.info(f"Pinata: Unpinned {content_hash[:16]}...")
            return result
        except Exception as e:
            logger.error(f"Pinata unpin failed: {e}")
            return False

    async def get(self, content_hash: str) -> Optional[bytes]:
        """Fetch content by hash via Pinata's public IPFS gateway.

        Returns:
            Raw bytes, or None when no pin matches or a request fails.
        """
        try:
            import asyncio

            def do_get():
                cids = self._find_cids(content_hash)
                if not cids:
                    return None
                gateway_response = requests.get(
                    f"https://gateway.pinata.cloud/ipfs/{cids[0]}",
                    timeout=120
                )
                gateway_response.raise_for_status()
                return gateway_response.content

            return await asyncio.to_thread(do_get)
        except Exception as e:
            logger.error(f"Pinata get failed: {e}")
            return None

    async def is_pinned(self, content_hash: str) -> bool:
        """True when at least one pinned file is tagged with *content_hash*."""
        try:
            import asyncio
            return bool(await asyncio.to_thread(self._find_cids, content_hash))
        except Exception:
            return False

    async def test_connection(self) -> tuple[bool, str]:
        """Verify the key pair against Pinata's auth-test endpoint."""
        try:
            import asyncio

            def do_test():
                response = requests.get(
                    f"{self.base_url}/data/testAuthentication",
                    headers=self._headers(),
                    timeout=10
                )
                response.raise_for_status()
                return True, "Connected to Pinata successfully"

            return await asyncio.to_thread(do_test)
        except requests.exceptions.HTTPError as e:
            if e.response.status_code == 401:
                return False, "Invalid API credentials"
            return False, f"HTTP error: {e}"
        except Exception as e:
            return False, f"Connection failed: {e}"

    def get_usage(self) -> dict:
        """Account-wide pin totals from Pinata; zeros if the call fails.

        Synchronous (blocking) — callers on the event loop should wrap
        it in asyncio.to_thread.
        """
        try:
            response = requests.get(
                f"{self.base_url}/data/userPinnedDataTotal",
                headers=self._headers(),
                timeout=10
            )
            response.raise_for_status()
            data = response.json()
            return {
                "used_bytes": data.get("pin_size_total", 0),
                "capacity_bytes": self.capacity_bytes,
                "pin_count": data.get("pin_count", 0)
            }
        except Exception:
            return {"used_bytes": 0, "capacity_bytes": self.capacity_bytes, "pin_count": 0}
+
+
class Web3StorageProvider(StorageProvider):
    """Backend for the web3.storage upload API.

    Uploads to web3.storage are permanent, so unpin() always refuses,
    and get()/is_pinned() are stubs: retrieval would need a hash-to-CID
    mapping that this provider does not keep.
    """

    provider_type = "web3storage"

    def __init__(self, api_token: str, capacity_gb: int = 1):
        self.api_token = api_token
        self.capacity_bytes = capacity_gb * 1024**3
        self.base_url = "https://api.web3.storage"

    def _headers(self) -> dict:
        """Bearer-token auth header."""
        return {"Authorization": f"Bearer {self.api_token}"}

    async def pin(self, content_hash: str, data: bytes, filename: Optional[str] = None) -> Optional[str]:
        """Upload *data* and return the CID reported by web3.storage, or None."""
        try:
            import asyncio

            def upload():
                headers = dict(self._headers())
                headers["X-Name"] = filename or content_hash[:16]
                reply = requests.post(
                    f"{self.base_url}/upload",
                    data=data,
                    headers=headers,
                    timeout=120
                )
                reply.raise_for_status()
                return reply.json().get("cid")

            cid = await asyncio.to_thread(upload)
            logger.info(f"web3.storage: Pinned {content_hash[:16]}... as {cid}")
            return cid
        except Exception as e:
            logger.error(f"web3.storage pin failed: {e}")
            return None

    async def unpin(self, content_hash: str) -> bool:
        """Always False: web3.storage keeps uploads permanently."""
        logger.warning("web3.storage: Unpinning not supported (permanent storage)")
        return False

    async def get(self, content_hash: str) -> Optional[bytes]:
        """Always None: fetching requires the CID, which is not tracked here."""
        return None

    async def is_pinned(self, content_hash: str) -> bool:
        """Always False: pin status cannot be checked without a CID mapping."""
        return False

    async def test_connection(self) -> tuple[bool, str]:
        """Hit the uploads listing endpoint to validate the token."""
        try:
            import asyncio

            def probe():
                reply = requests.get(
                    f"{self.base_url}/user/uploads",
                    headers=self._headers(),
                    params={"size": 1},
                    timeout=10
                )
                reply.raise_for_status()
                return True, "Connected to web3.storage successfully"

            return await asyncio.to_thread(probe)
        except requests.exceptions.HTTPError as e:
            if e.response.status_code == 401:
                return False, "Invalid API token"
            return False, f"HTTP error: {e}"
        except Exception as e:
            return False, f"Connection failed: {e}"

    def get_usage(self) -> dict:
        """Sum dagSize over up to 1000 recent uploads; zeros on any error."""
        try:
            reply = requests.get(
                f"{self.base_url}/user/uploads",
                headers=self._headers(),
                params={"size": 1000},
                timeout=30
            )
            reply.raise_for_status()
            uploads = reply.json()
            return {
                "used_bytes": sum(item.get("dagSize", 0) for item in uploads),
                "capacity_bytes": self.capacity_bytes,
                "pin_count": len(uploads)
            }
        except Exception:
            return {"used_bytes": 0, "capacity_bytes": self.capacity_bytes, "pin_count": 0}
+
+
class NFTStorageProvider(StorageProvider):
    """Backend for the NFT.Storage upload API (free for NFT data).

    Like web3.storage, uploads are permanent: unpin() always refuses,
    and get()/is_pinned() are stubs pending a hash-to-CID mapping.
    """

    provider_type = "nftstorage"

    def __init__(self, api_token: str, capacity_gb: int = 5):
        self.api_token = api_token
        self.capacity_bytes = capacity_gb * 1024**3
        self.base_url = "https://api.nft.storage"

    def _headers(self) -> dict:
        """Bearer-token auth header."""
        return {"Authorization": f"Bearer {self.api_token}"}

    async def pin(self, content_hash: str, data: bytes, filename: Optional[str] = None) -> Optional[str]:
        """Upload raw bytes; returns the CID from the response envelope, or None."""
        try:
            import asyncio

            def upload():
                reply = requests.post(
                    f"{self.base_url}/upload",
                    data=data,
                    headers={**self._headers(), "Content-Type": "application/octet-stream"},
                    timeout=120
                )
                reply.raise_for_status()
                return reply.json().get("value", {}).get("cid")

            cid = await asyncio.to_thread(upload)
            logger.info(f"NFT.Storage: Pinned {content_hash[:16]}... as {cid}")
            return cid
        except Exception as e:
            logger.error(f"NFT.Storage pin failed: {e}")
            return None

    async def unpin(self, content_hash: str) -> bool:
        """Always False: NFT.Storage keeps uploads permanently."""
        logger.warning("NFT.Storage: Unpinning not supported (permanent storage)")
        return False

    async def get(self, content_hash: str) -> Optional[bytes]:
        """Always None: retrieval needs a CID mapping this class lacks."""
        return None

    async def is_pinned(self, content_hash: str) -> bool:
        """Always False: no CID mapping to check against."""
        return False

    async def test_connection(self) -> tuple[bool, str]:
        """GET the API root with the token to validate credentials."""
        try:
            import asyncio

            def probe():
                reply = requests.get(
                    f"{self.base_url}/",
                    headers=self._headers(),
                    timeout=10
                )
                reply.raise_for_status()
                return True, "Connected to NFT.Storage successfully"

            return await asyncio.to_thread(probe)
        except requests.exceptions.HTTPError as e:
            if e.response.status_code == 401:
                return False, "Invalid API token"
            return False, f"HTTP error: {e}"
        except Exception as e:
            return False, f"Connection failed: {e}"

    def get_usage(self) -> dict:
        """No usage endpoint is queried; reports zero usage against capacity."""
        return {"used_bytes": 0, "capacity_bytes": self.capacity_bytes, "pin_count": 0}
+
+
class InfuraIPFSProvider(StorageProvider):
    """Backend for Infura's hosted IPFS RPC API (basic-auth project creds).

    NOTE(review): unpin/get/is_pinned pass *content_hash* straight to the
    IPFS API as its CID argument, while pin() returns the CID that Infura
    assigns — these only line up if callers store that CID as the hash.
    Confirm against the callers before relying on round-trips.
    """

    provider_type = "infura"

    def __init__(self, project_id: str, project_secret: str, capacity_gb: int = 5):
        self.project_id = project_id
        self.project_secret = project_secret
        self.capacity_bytes = capacity_gb * 1024**3
        self.base_url = "https://ipfs.infura.io:5001/api/v0"

    def _auth(self) -> tuple:
        """(project_id, project_secret) pair for HTTP basic auth."""
        return (self.project_id, self.project_secret)

    def _api_post(self, path: str, **kwargs):
        """Synchronous POST to the Infura IPFS RPC API with project auth."""
        return requests.post(f"{self.base_url}{path}", auth=self._auth(), **kwargs)

    async def pin(self, content_hash: str, data: bytes, filename: Optional[str] = None) -> Optional[str]:
        """Add *data* via /add; returns the CID Infura reports, or None."""
        try:
            import asyncio

            def add_file():
                payload = {"file": (filename or f"{content_hash[:16]}.bin", data)}
                reply = self._api_post("/add", files=payload, timeout=120)
                reply.raise_for_status()
                return reply.json().get("Hash")

            cid = await asyncio.to_thread(add_file)
            logger.info(f"Infura IPFS: Pinned {content_hash[:16]}... as {cid}")
            return cid
        except Exception as e:
            logger.error(f"Infura IPFS pin failed: {e}")
            return None

    async def unpin(self, content_hash: str) -> bool:
        """Remove the pin via /pin/rm; False on any error."""
        try:
            import asyncio

            def remove_pin():
                reply = self._api_post("/pin/rm", params={"arg": content_hash}, timeout=30)
                reply.raise_for_status()
                return True

            return await asyncio.to_thread(remove_pin)
        except Exception as e:
            logger.error(f"Infura IPFS unpin failed: {e}")
            return False

    async def get(self, content_hash: str) -> Optional[bytes]:
        """Read content back through /cat; None on any error."""
        try:
            import asyncio

            def cat():
                reply = self._api_post("/cat", params={"arg": content_hash}, timeout=120)
                reply.raise_for_status()
                return reply.content

            return await asyncio.to_thread(cat)
        except Exception as e:
            logger.error(f"Infura IPFS get failed: {e}")
            return None

    async def is_pinned(self, content_hash: str) -> bool:
        """True when /pin/ls answers HTTP 200 for the given argument."""
        try:
            import asyncio

            def check():
                reply = self._api_post("/pin/ls", params={"arg": content_hash}, timeout=30)
                return reply.status_code == 200

            return await asyncio.to_thread(check)
        except Exception:
            return False

    async def test_connection(self) -> tuple[bool, str]:
        """Call /id to verify the project credentials."""
        try:
            import asyncio

            def probe():
                reply = self._api_post("/id", timeout=10)
                reply.raise_for_status()
                return True, "Connected to Infura IPFS successfully"

            return await asyncio.to_thread(probe)
        except requests.exceptions.HTTPError as e:
            if e.response.status_code == 401:
                return False, "Invalid project credentials"
            return False, f"HTTP error: {e}"
        except Exception as e:
            return False, f"Connection failed: {e}"

    def get_usage(self) -> dict:
        """No usage endpoint is queried here; reports zero usage."""
        return {"used_bytes": 0, "capacity_bytes": self.capacity_bytes, "pin_count": 0}
+
+
class FilebaseProvider(StorageProvider):
    """Filebase S3-compatible IPFS pinning service.

    Objects are keyed by their content hash so that pin(), get(), unpin()
    and is_pinned() all agree on the lookup key. Filebase exposes the IPFS
    CID of an uploaded object via the `cid` entry of its user metadata.
    """

    provider_type = "filebase"

    def __init__(self, access_key: str, secret_key: str, bucket: str, capacity_gb: int = 5):
        """
        Args:
            access_key: Filebase S3 access key.
            secret_key: Filebase S3 secret key.
            bucket: Name of the (IPFS-backed) bucket to store objects in.
            capacity_gb: Nominal capacity, only used for usage reporting.
        """
        self.access_key = access_key
        self.secret_key = secret_key
        self.bucket = bucket
        self.capacity_bytes = capacity_gb * 1024**3
        self.endpoint = "https://s3.filebase.com"

    def _client(self):
        """Build a boto3 S3 client aimed at the Filebase endpoint.

        Created per call: clients are cheap to construct and this avoids
        sharing one across the worker threads used by asyncio.to_thread.
        """
        import boto3
        from botocore.config import Config
        return boto3.client(
            's3',
            endpoint_url=self.endpoint,
            aws_access_key_id=self.access_key,
            aws_secret_access_key=self.secret_key,
            config=Config(signature_version='s3v4')
        )

    async def pin(self, content_hash: str, data: bytes, filename: Optional[str] = None) -> Optional[str]:
        """Pin content to Filebase.

        Returns:
            The IPFS CID reported by Filebase (falling back to
            content_hash), or None on failure.
        """
        try:
            import asyncio

            def do_pin():
                s3 = self._client()
                # BUG FIX: the object key must be content_hash — unpin(),
                # get() and is_pinned() only receive the hash, so content
                # stored under a filename-derived key (the old
                # `filename or f"{hash[:16]}.bin"`) could never be found
                # again. The filename is preserved as user metadata instead.
                extra = {"Metadata": {"filename": filename}} if filename else {}
                s3.put_object(Bucket=self.bucket, Key=content_hash, Body=data, **extra)
                # Filebase reports the object's IPFS CID in its metadata.
                head = s3.head_object(Bucket=self.bucket, Key=content_hash)
                return head.get('Metadata', {}).get('cid', content_hash)

            cid = await asyncio.to_thread(do_pin)
            logger.info(f"Filebase: Pinned {content_hash[:16]}... as {cid}")
            return cid
        except Exception as e:
            logger.error(f"Filebase pin failed: {e}")
            return None

    async def unpin(self, content_hash: str) -> bool:
        """Remove content from Filebase. Returns True on success."""
        try:
            import asyncio

            def do_unpin():
                self._client().delete_object(Bucket=self.bucket, Key=content_hash)
                return True

            return await asyncio.to_thread(do_unpin)
        except Exception as e:
            logger.error(f"Filebase unpin failed: {e}")
            return False

    async def get(self, content_hash: str) -> Optional[bytes]:
        """Fetch content bytes from Filebase, or None if unavailable."""
        try:
            import asyncio

            def do_get():
                response = self._client().get_object(Bucket=self.bucket, Key=content_hash)
                return response['Body'].read()

            return await asyncio.to_thread(do_get)
        except Exception as e:
            logger.error(f"Filebase get failed: {e}")
            return None

    async def is_pinned(self, content_hash: str) -> bool:
        """Check if content exists in Filebase (HEAD on the object key)."""
        try:
            import asyncio

            def do_check():
                self._client().head_object(Bucket=self.bucket, Key=content_hash)
                return True

            return await asyncio.to_thread(do_check)
        except Exception:
            # A missing object and a transport error both mean "not pinned".
            return False

    async def test_connection(self) -> tuple[bool, str]:
        """Test Filebase connectivity via HEAD on the configured bucket."""
        try:
            import asyncio

            def do_test():
                self._client().head_bucket(Bucket=self.bucket)
                return True, f"Connected to Filebase bucket '{self.bucket}'"

            return await asyncio.to_thread(do_test)
        except Exception as e:
            # botocore surfaces the HTTP status code in the exception text.
            if "404" in str(e):
                return False, f"Bucket '{self.bucket}' not found"
            if "403" in str(e):
                return False, "Invalid credentials or no access to bucket"
            return False, f"Connection failed: {e}"

    def get_usage(self) -> dict:
        """Get Filebase usage stats.

        NOTE(review): used_bytes/pin_count are not tracked yet; only the
        configured capacity is reported.
        """
        return {"used_bytes": 0, "capacity_bytes": self.capacity_bytes, "pin_count": 0}
+
+
class StorjProvider(StorageProvider):
    """Storj decentralized cloud storage (S3-compatible gateway).

    Objects are keyed by their content hash so that pin(), get(), unpin()
    and is_pinned() all agree on the lookup key.
    """

    provider_type = "storj"

    def __init__(self, access_key: str, secret_key: str, bucket: str, capacity_gb: int = 25):
        """
        Args:
            access_key: Storj S3 gateway access key.
            secret_key: Storj S3 gateway secret key.
            bucket: Bucket to store objects in.
            capacity_gb: Nominal capacity, only used for usage reporting.
        """
        self.access_key = access_key
        self.secret_key = secret_key
        self.bucket = bucket
        self.capacity_bytes = capacity_gb * 1024**3
        self.endpoint = "https://gateway.storjshare.io"

    def _client(self):
        """Build a boto3 S3 client aimed at the Storj gateway.

        Created per call: clients are cheap to construct and this avoids
        sharing one across the worker threads used by asyncio.to_thread.
        """
        import boto3
        from botocore.config import Config
        return boto3.client(
            's3',
            endpoint_url=self.endpoint,
            aws_access_key_id=self.access_key,
            aws_secret_access_key=self.secret_key,
            config=Config(signature_version='s3v4')
        )

    async def pin(self, content_hash: str, data: bytes, filename: Optional[str] = None) -> Optional[str]:
        """Store content on Storj.

        Returns:
            content_hash on success, None on failure.
        """
        try:
            import asyncio

            def do_pin():
                # BUG FIX: always key by content_hash. The old code used
                # `filename or content_hash`, so content pinned with a
                # filename could never be found by get()/unpin()/is_pinned(),
                # which only receive the hash. The filename is preserved as
                # object metadata instead.
                extra = {"Metadata": {"filename": filename}} if filename else {}
                self._client().put_object(Bucket=self.bucket, Key=content_hash, Body=data, **extra)
                return content_hash

            result = await asyncio.to_thread(do_pin)
            logger.info(f"Storj: Stored {content_hash[:16]}...")
            return result
        except Exception as e:
            logger.error(f"Storj pin failed: {e}")
            return None

    async def unpin(self, content_hash: str) -> bool:
        """Remove content from Storj. Returns True on success."""
        try:
            import asyncio

            def do_unpin():
                self._client().delete_object(Bucket=self.bucket, Key=content_hash)
                return True

            return await asyncio.to_thread(do_unpin)
        except Exception as e:
            logger.error(f"Storj unpin failed: {e}")
            return False

    async def get(self, content_hash: str) -> Optional[bytes]:
        """Fetch content bytes from Storj, or None if unavailable."""
        try:
            import asyncio

            def do_get():
                response = self._client().get_object(Bucket=self.bucket, Key=content_hash)
                return response['Body'].read()

            return await asyncio.to_thread(do_get)
        except Exception as e:
            logger.error(f"Storj get failed: {e}")
            return None

    async def is_pinned(self, content_hash: str) -> bool:
        """Check if content exists on Storj (HEAD on the object key)."""
        try:
            import asyncio

            def do_check():
                self._client().head_object(Bucket=self.bucket, Key=content_hash)
                return True

            return await asyncio.to_thread(do_check)
        except Exception:
            # A missing object and a transport error both mean "not pinned".
            return False

    async def test_connection(self) -> tuple[bool, str]:
        """Test Storj connectivity via HEAD on the configured bucket."""
        try:
            import asyncio

            def do_test():
                self._client().head_bucket(Bucket=self.bucket)
                return True, f"Connected to Storj bucket '{self.bucket}'"

            return await asyncio.to_thread(do_test)
        except Exception as e:
            # botocore surfaces the HTTP status code in the exception text.
            if "404" in str(e):
                return False, f"Bucket '{self.bucket}' not found"
            if "403" in str(e):
                return False, "Invalid credentials or no access to bucket"
            return False, f"Connection failed: {e}"

    def get_usage(self) -> dict:
        """Get Storj usage stats.

        NOTE(review): used_bytes/pin_count are not tracked yet; only the
        configured capacity is reported.
        """
        return {"used_bytes": 0, "capacity_bytes": self.capacity_bytes, "pin_count": 0}
+
+
class LocalStorageProvider(StorageProvider):
    """Local filesystem storage provider.

    Content lives under ``base_path``, fanned out into two-character
    subdirectories derived from the content hash.
    """

    provider_type = "local"

    def __init__(self, base_path: str, capacity_gb: int = 10):
        """Create the provider, ensuring the root directory exists."""
        self.base_path = Path(base_path)
        self.capacity_bytes = capacity_gb * 1024**3
        # Eagerly create the root so later operations only create subdirs.
        self.base_path.mkdir(parents=True, exist_ok=True)

    def _get_file_path(self, content_hash: str) -> Path:
        """Map a content hash to its on-disk location.

        The first two hash characters form a fan-out subdirectory, keeping
        any single directory from growing too large.
        """
        return self.base_path / content_hash[:2] / content_hash

    async def pin(self, content_hash: str, data: bytes, filename: Optional[str] = None) -> Optional[str]:
        """Store content locally; returns content_hash on success, None on failure.

        The filename argument is accepted for interface compatibility but
        unused — files are named by hash.
        """
        import asyncio

        def write_out() -> str:
            target = self._get_file_path(content_hash)
            target.parent.mkdir(parents=True, exist_ok=True)
            target.write_bytes(data)
            return content_hash

        try:
            stored = await asyncio.to_thread(write_out)
        except Exception as e:
            logger.error(f"Local storage failed: {e}")
            return None
        logger.info(f"Local: Stored {content_hash[:16]}...")
        return stored

    async def unpin(self, content_hash: str) -> bool:
        """Delete the stored file; True only if it existed and was removed."""
        import asyncio

        def remove() -> bool:
            target = self._get_file_path(content_hash)
            if not target.exists():
                return False
            target.unlink()
            return True

        try:
            return await asyncio.to_thread(remove)
        except Exception as e:
            logger.error(f"Local unpin failed: {e}")
            return False

    async def get(self, content_hash: str) -> Optional[bytes]:
        """Read the stored bytes back, or None if missing or unreadable."""
        import asyncio

        def read_back():
            target = self._get_file_path(content_hash)
            return target.read_bytes() if target.exists() else None

        try:
            return await asyncio.to_thread(read_back)
        except Exception as e:
            logger.error(f"Local get failed: {e}")
            return None

    async def is_pinned(self, content_hash: str) -> bool:
        """Check whether the content file exists on disk."""
        target = self._get_file_path(content_hash)
        return target.exists()

    async def test_connection(self) -> tuple[bool, str]:
        """Verify the storage root is writable via a throwaway probe file."""
        probe = self.base_path / ".write_test"
        try:
            probe.write_text("test")
            probe.unlink()
        except Exception as e:
            return False, f"Cannot write to {self.base_path}: {e}"
        return True, f"Local storage ready at {self.base_path}"

    def get_usage(self) -> dict:
        """Tally bytes and file count across the fan-out subdirectories."""
        used = 0
        count = 0
        try:
            fanout_dirs = (
                entry for entry in self.base_path.iterdir()
                if entry.is_dir() and len(entry.name) == 2
            )
            for subdir in fanout_dirs:
                for item in subdir.iterdir():
                    if item.is_file():
                        used += item.stat().st_size
                        count += 1
        except Exception:
            return {"used_bytes": 0, "capacity_bytes": self.capacity_bytes, "pin_count": 0}
        return {
            "used_bytes": used,
            "capacity_bytes": self.capacity_bytes,
            "pin_count": count,
        }
+
+
def create_provider(provider_type: str, config: dict) -> Optional[StorageProvider]:
    """
    Factory function to create a storage provider from config.

    Args:
        provider_type: One of 'pinata', 'web3storage', 'nftstorage', 'infura', 'filebase', 'storj', 'local'
        config: Provider-specific configuration dict

    Returns:
        StorageProvider instance or None if invalid
    """
    # Dispatch table mapping a provider type to a builder that pulls the
    # required keys from the config dict. A missing required key raises
    # KeyError, which is reported below.
    builders = {
        "pinata": lambda c: PinataProvider(
            api_key=c["api_key"],
            secret_key=c["secret_key"],
            capacity_gb=c.get("capacity_gb", 1),
        ),
        "web3storage": lambda c: Web3StorageProvider(
            api_token=c["api_token"],
            capacity_gb=c.get("capacity_gb", 5),
        ),
        "nftstorage": lambda c: NFTStorageProvider(
            api_token=c["api_token"],
            capacity_gb=c.get("capacity_gb", 5),
        ),
        "infura": lambda c: InfuraIPFSProvider(
            project_id=c["project_id"],
            project_secret=c["project_secret"],
            capacity_gb=c.get("capacity_gb", 5),
        ),
        "filebase": lambda c: FilebaseProvider(
            access_key=c["access_key"],
            secret_key=c["secret_key"],
            bucket=c["bucket"],
            capacity_gb=c.get("capacity_gb", 5),
        ),
        "storj": lambda c: StorjProvider(
            access_key=c["access_key"],
            secret_key=c["secret_key"],
            bucket=c["bucket"],
            capacity_gb=c.get("capacity_gb", 25),
        ),
        "local": lambda c: LocalStorageProvider(
            base_path=c["path"],
            capacity_gb=c.get("capacity_gb", 10),
        ),
    }
    try:
        builder = builders.get(provider_type)
        if builder is None:
            logger.error(f"Unknown provider type: {provider_type}")
            return None
        return builder(config)
    except KeyError as e:
        logger.error(f"Missing config key for {provider_type}: {e}")
        return None
    except Exception as e:
        logger.error(f"Failed to create provider {provider_type}: {e}")
        return None