Add user-attachable storage system

Phase 1 of distributed storage implementation:

Database:
- user_storage table for storage providers (Pinata, web3.storage, local)
- storage_pins table to track what's stored where
- source_url/source_type columns on assets for reconstruction

Storage Providers:
- Abstract StorageProvider base class
- PinataProvider for Pinata IPFS pinning
- Web3StorageProvider for web3.storage
- LocalStorageProvider for filesystem storage
- Factory function create_provider()

API Endpoints:
- GET/POST /storage - list/add storage providers
- GET/PATCH/DELETE /storage/{id} - manage individual providers
- POST /storage/{id}/test - test connectivity

UI:
- /storage page with provider cards
- Add provider form (Pinata, web3.storage, local)
- Test/remove buttons per provider
- Usage stats (capacity, donated, used, pins)

50% donation model: half of user capacity is available for
system use to store shared content across the network.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
gilesb
2026-01-09 23:19:10 +00:00
parent 64749af3fc
commit 1e3d1bb65e
3 changed files with 1113 additions and 0 deletions

214
db.py
View File

@@ -117,6 +117,32 @@ CREATE TABLE IF NOT EXISTS revoked_tokens (
expires_at TIMESTAMPTZ NOT NULL
);
-- User storage providers (IPFS pinning services, local storage, etc.)
CREATE TABLE IF NOT EXISTS user_storage (
id SERIAL PRIMARY KEY,
username VARCHAR(255) NOT NULL REFERENCES users(username),
provider_type VARCHAR(50) NOT NULL, -- 'pinata', 'web3storage', 'filebase', 'local'
provider_name VARCHAR(255), -- User-friendly name
config JSONB NOT NULL DEFAULT '{}', -- API keys, endpoints, paths
capacity_gb INTEGER NOT NULL, -- Total capacity user is contributing
is_active BOOLEAN DEFAULT true,
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW(),
UNIQUE(username, provider_type, provider_name)
);
-- Track what's stored where
CREATE TABLE IF NOT EXISTS storage_pins (
id SERIAL PRIMARY KEY,
content_hash VARCHAR(64) NOT NULL,
storage_id INTEGER NOT NULL REFERENCES user_storage(id) ON DELETE CASCADE,
ipfs_cid VARCHAR(128),
pin_type VARCHAR(20) NOT NULL, -- 'user_content', 'donated', 'system'
size_bytes BIGINT,
pinned_at TIMESTAMPTZ DEFAULT NOW(),
UNIQUE(content_hash, storage_id)
);
-- Indexes
CREATE INDEX IF NOT EXISTS idx_users_created_at ON users(created_at);
CREATE INDEX IF NOT EXISTS idx_assets_content_hash ON assets(content_hash);
@@ -129,6 +155,20 @@ CREATE INDEX IF NOT EXISTS idx_activities_anchor ON activities(anchor_root);
CREATE INDEX IF NOT EXISTS idx_anchors_created ON anchors(created_at DESC);
CREATE INDEX IF NOT EXISTS idx_followers_username ON followers(username);
CREATE INDEX IF NOT EXISTS idx_revoked_tokens_expires ON revoked_tokens(expires_at);
CREATE INDEX IF NOT EXISTS idx_user_storage_username ON user_storage(username);
CREATE INDEX IF NOT EXISTS idx_storage_pins_hash ON storage_pins(content_hash);
CREATE INDEX IF NOT EXISTS idx_storage_pins_storage ON storage_pins(storage_id);
-- Add source URL columns to assets if they don't exist
DO $$ BEGIN
ALTER TABLE assets ADD COLUMN source_url TEXT;
EXCEPTION WHEN duplicate_column THEN NULL;
END $$;
DO $$ BEGIN
ALTER TABLE assets ADD COLUMN source_type VARCHAR(50);
EXCEPTION WHEN duplicate_column THEN NULL;
END $$;
"""
@@ -772,6 +812,180 @@ async def detach_renderer(username: str, l1_url: str) -> bool:
return "DELETE 1" in result
# ============ User Storage ============
async def get_user_storage(username: str) -> list[dict]:
"""Get all storage providers for a user."""
async with get_connection() as conn:
rows = await conn.fetch(
"""SELECT id, username, provider_type, provider_name, config,
capacity_gb, is_active, created_at, updated_at
FROM user_storage WHERE username = $1
ORDER BY created_at""",
username
)
return [dict(row) for row in rows]
async def get_storage_by_id(storage_id: int) -> Optional[dict]:
"""Get a storage provider by ID."""
async with get_connection() as conn:
row = await conn.fetchrow(
"""SELECT id, username, provider_type, provider_name, config,
capacity_gb, is_active, created_at, updated_at
FROM user_storage WHERE id = $1""",
storage_id
)
return dict(row) if row else None
async def add_user_storage(
username: str,
provider_type: str,
provider_name: str,
config: dict,
capacity_gb: int
) -> Optional[int]:
"""Add a storage provider for a user. Returns storage ID."""
async with get_connection() as conn:
try:
row = await conn.fetchrow(
"""INSERT INTO user_storage (username, provider_type, provider_name, config, capacity_gb)
VALUES ($1, $2, $3, $4, $5)
RETURNING id""",
username, provider_type, provider_name, json.dumps(config), capacity_gb
)
return row["id"] if row else None
except Exception:
return None
async def update_user_storage(
storage_id: int,
config: Optional[dict] = None,
capacity_gb: Optional[int] = None,
is_active: Optional[bool] = None
) -> bool:
"""Update a storage provider."""
updates = []
params = []
param_num = 1
if config is not None:
updates.append(f"config = ${param_num}")
params.append(json.dumps(config))
param_num += 1
if capacity_gb is not None:
updates.append(f"capacity_gb = ${param_num}")
params.append(capacity_gb)
param_num += 1
if is_active is not None:
updates.append(f"is_active = ${param_num}")
params.append(is_active)
param_num += 1
if not updates:
return False
updates.append("updated_at = NOW()")
params.append(storage_id)
async with get_connection() as conn:
result = await conn.execute(
f"UPDATE user_storage SET {', '.join(updates)} WHERE id = ${param_num}",
*params
)
return "UPDATE 1" in result
async def remove_user_storage(storage_id: int) -> bool:
"""Remove a storage provider. Cascades to storage_pins."""
async with get_connection() as conn:
result = await conn.execute(
"DELETE FROM user_storage WHERE id = $1",
storage_id
)
return "DELETE 1" in result
async def get_storage_usage(storage_id: int) -> dict:
"""Get storage usage stats for a provider."""
async with get_connection() as conn:
row = await conn.fetchrow(
"""SELECT
COUNT(*) as pin_count,
COALESCE(SUM(size_bytes), 0) as used_bytes
FROM storage_pins WHERE storage_id = $1""",
storage_id
)
return {"pin_count": row["pin_count"], "used_bytes": row["used_bytes"]}
async def add_storage_pin(
content_hash: str,
storage_id: int,
ipfs_cid: Optional[str],
pin_type: str,
size_bytes: int
) -> Optional[int]:
"""Add a pin record. Returns pin ID."""
async with get_connection() as conn:
try:
row = await conn.fetchrow(
"""INSERT INTO storage_pins (content_hash, storage_id, ipfs_cid, pin_type, size_bytes)
VALUES ($1, $2, $3, $4, $5)
ON CONFLICT (content_hash, storage_id) DO UPDATE SET
ipfs_cid = EXCLUDED.ipfs_cid,
pin_type = EXCLUDED.pin_type,
size_bytes = EXCLUDED.size_bytes,
pinned_at = NOW()
RETURNING id""",
content_hash, storage_id, ipfs_cid, pin_type, size_bytes
)
return row["id"] if row else None
except Exception:
return None
async def remove_storage_pin(content_hash: str, storage_id: int) -> bool:
"""Remove a pin record."""
async with get_connection() as conn:
result = await conn.execute(
"DELETE FROM storage_pins WHERE content_hash = $1 AND storage_id = $2",
content_hash, storage_id
)
return "DELETE 1" in result
async def get_pins_for_content(content_hash: str) -> list[dict]:
"""Get all storage locations where content is pinned."""
async with get_connection() as conn:
rows = await conn.fetch(
"""SELECT sp.*, us.provider_type, us.provider_name, us.username
FROM storage_pins sp
JOIN user_storage us ON sp.storage_id = us.id
WHERE sp.content_hash = $1""",
content_hash
)
return [dict(row) for row in rows]
async def get_all_active_storage() -> list[dict]:
"""Get all active storage providers (for distributed pinning)."""
async with get_connection() as conn:
rows = await conn.fetch(
"""SELECT us.*,
COALESCE(SUM(sp.size_bytes), 0) as used_bytes,
COUNT(sp.id) as pin_count
FROM user_storage us
LEFT JOIN storage_pins sp ON us.id = sp.storage_id
WHERE us.is_active = true
GROUP BY us.id
ORDER BY us.created_at"""
)
return [dict(row) for row in rows]
# ============ Token Revocation ============
async def revoke_token(token_hash: str, username: str, expires_at) -> bool:

388
server.py
View File

@@ -195,6 +195,27 @@ class UpdateAssetRequest(BaseModel):
ipfs_cid: Optional[str] = None # IPFS content identifier
class AddStorageRequest(BaseModel):
"""Request to add a storage provider."""
provider_type: str # 'pinata', 'web3storage', 'local'
provider_name: Optional[str] = None # User-friendly name
config: dict # Provider-specific config (api_key, path, etc.)
capacity_gb: int # Storage capacity in GB
class UpdateStorageRequest(BaseModel):
"""Request to update a storage provider."""
config: Optional[dict] = None
capacity_gb: Optional[int] = None
is_active: Optional[bool] = None
class SetAssetSourceRequest(BaseModel):
"""Request to set source URL for an asset."""
source_url: str
source_type: str # 'youtube', 'local', 'url'
# ============ Storage (Database) ============
async def load_registry() -> dict:
@@ -324,6 +345,7 @@ def base_html(title: str, content: str, username: str = None) -> str:
<a href="/users" class="text-gray-400 hover:text-white transition-colors">Users</a>
<a href="/anchors/ui" class="text-gray-400 hover:text-white transition-colors">Anchors</a>
<a href="/renderers" class="text-gray-400 hover:text-white transition-colors">Renderers</a>
<a href="/storage" class="text-gray-400 hover:text-white transition-colors">Storage</a>
<a href="/download/client" class="text-gray-400 hover:text-white transition-colors ml-auto" title="Download CLI client">Download Client</a>
</nav>
@@ -3017,6 +3039,372 @@ async def detach_renderer(request: Request):
''')
# ============ User Storage ============
import storage_providers
@app.get("/storage")
async def list_storage(request: Request, user: User = Depends(get_optional_user)):
"""List user's storage providers. HTML for browsers, JSON for API."""
if not user:
if wants_html(request):
return RedirectResponse(url="/login", status_code=302)
raise HTTPException(401, "Authentication required")
storages = await db.get_user_storage(user.username)
# Add usage stats to each storage
for storage in storages:
usage = await db.get_storage_usage(storage["id"])
storage["used_bytes"] = usage["used_bytes"]
storage["pin_count"] = usage["pin_count"]
storage["donated_gb"] = storage["capacity_gb"] // 2
# Mask sensitive config keys for display
if storage.get("config"):
config = storage["config"] if isinstance(storage["config"], dict) else json.loads(storage["config"])
masked = {}
for k, v in config.items():
if "key" in k.lower() or "token" in k.lower() or "secret" in k.lower():
masked[k] = v[:4] + "..." + v[-4:] if len(str(v)) > 8 else "****"
else:
masked[k] = v
storage["config_display"] = masked
if wants_html(request):
return await ui_storage_page(user.username, storages, request)
return {"storages": storages}
@app.post("/storage")
async def add_storage(req: AddStorageRequest, user: User = Depends(get_required_user)):
"""Add a storage provider."""
# Validate provider type
if req.provider_type not in ["pinata", "web3storage", "local"]:
raise HTTPException(400, f"Invalid provider type: {req.provider_type}")
# Test the provider connection before saving
provider = storage_providers.create_provider(req.provider_type, {
**req.config,
"capacity_gb": req.capacity_gb
})
if not provider:
raise HTTPException(400, "Failed to create provider with given config")
success, message = await provider.test_connection()
if not success:
raise HTTPException(400, f"Provider connection failed: {message}")
# Save to database
provider_name = req.provider_name or f"{req.provider_type}-{user.username}"
storage_id = await db.add_user_storage(
username=user.username,
provider_type=req.provider_type,
provider_name=provider_name,
config=req.config,
capacity_gb=req.capacity_gb
)
if not storage_id:
raise HTTPException(500, "Failed to save storage provider")
return {"id": storage_id, "message": f"Storage provider added: {provider_name}"}
@app.get("/storage/{storage_id}")
async def get_storage(storage_id: int, user: User = Depends(get_required_user)):
"""Get a specific storage provider."""
storage = await db.get_storage_by_id(storage_id)
if not storage:
raise HTTPException(404, "Storage provider not found")
if storage["username"] != user.username:
raise HTTPException(403, "Not authorized")
usage = await db.get_storage_usage(storage_id)
storage["used_bytes"] = usage["used_bytes"]
storage["pin_count"] = usage["pin_count"]
storage["donated_gb"] = storage["capacity_gb"] // 2
return storage
@app.patch("/storage/{storage_id}")
async def update_storage(storage_id: int, req: UpdateStorageRequest, user: User = Depends(get_required_user)):
"""Update a storage provider."""
storage = await db.get_storage_by_id(storage_id)
if not storage:
raise HTTPException(404, "Storage provider not found")
if storage["username"] != user.username:
raise HTTPException(403, "Not authorized")
# If updating config, test the new connection
if req.config:
existing_config = storage["config"] if isinstance(storage["config"], dict) else json.loads(storage["config"])
new_config = {**existing_config, **req.config}
provider = storage_providers.create_provider(storage["provider_type"], {
**new_config,
"capacity_gb": req.capacity_gb or storage["capacity_gb"]
})
if provider:
success, message = await provider.test_connection()
if not success:
raise HTTPException(400, f"Provider connection failed: {message}")
success = await db.update_user_storage(
storage_id,
config=req.config,
capacity_gb=req.capacity_gb,
is_active=req.is_active
)
if not success:
raise HTTPException(500, "Failed to update storage provider")
return {"message": "Storage provider updated"}
@app.delete("/storage/{storage_id}")
async def remove_storage(storage_id: int, user: User = Depends(get_required_user)):
"""Remove a storage provider."""
storage = await db.get_storage_by_id(storage_id)
if not storage:
raise HTTPException(404, "Storage provider not found")
if storage["username"] != user.username:
raise HTTPException(403, "Not authorized")
success = await db.remove_user_storage(storage_id)
if not success:
raise HTTPException(500, "Failed to remove storage provider")
return {"message": "Storage provider removed"}
@app.post("/storage/{storage_id}/test")
async def test_storage(storage_id: int, request: Request, user: User = Depends(get_required_user)):
"""Test storage provider connectivity."""
storage = await db.get_storage_by_id(storage_id)
if not storage:
raise HTTPException(404, "Storage provider not found")
if storage["username"] != user.username:
raise HTTPException(403, "Not authorized")
config = storage["config"] if isinstance(storage["config"], dict) else json.loads(storage["config"])
provider = storage_providers.create_provider(storage["provider_type"], {
**config,
"capacity_gb": storage["capacity_gb"]
})
if not provider:
if wants_html(request):
return HTMLResponse('<span class="text-red-400">Failed to create provider</span>')
raise HTTPException(500, "Failed to create provider")
success, message = await provider.test_connection()
if wants_html(request):
if success:
return HTMLResponse(f'<span class="text-green-400">{message}</span>')
return HTMLResponse(f'<span class="text-red-400">{message}</span>')
return {"success": success, "message": message}
async def ui_storage_page(username: str, storages: list, request: Request) -> HTMLResponse:
"""Render storage settings page."""
def format_bytes(b):
if b > 1024**3:
return f"{b / 1024**3:.1f} GB"
if b > 1024**2:
return f"{b / 1024**2:.1f} MB"
if b > 1024:
return f"{b / 1024:.1f} KB"
return f"{b} bytes"
storage_rows = ""
for s in storages:
status_class = "bg-green-600" if s["is_active"] else "bg-gray-600"
status_text = "Active" if s["is_active"] else "Inactive"
config_display = s.get("config_display", {})
config_html = ", ".join(f"{k}: {v}" for k, v in config_display.items() if k != "path")
storage_rows += f'''
<div class="p-4 bg-dark-600 rounded-lg mb-4">
<div class="flex justify-between items-start mb-3">
<div>
<h3 class="font-semibold text-white">{s["provider_name"] or s["provider_type"]}</h3>
<span class="text-sm text-gray-400">{s["provider_type"]}</span>
</div>
<div class="flex items-center gap-2">
<span class="px-2 py-1 {status_class} text-white text-xs rounded-full">{status_text}</span>
<button hx-post="/storage/{s["id"]}/test" hx-target="#test-result-{s["id"]}" hx-swap="innerHTML"
class="px-2 py-1 bg-blue-600 hover:bg-blue-700 text-white text-xs rounded">Test</button>
<button hx-delete="/storage/{s["id"]}" hx-confirm="Remove this storage provider?"
hx-target="closest div.p-4" hx-swap="outerHTML"
class="px-2 py-1 bg-red-600 hover:bg-red-700 text-white text-xs rounded">Remove</button>
</div>
</div>
<div class="grid grid-cols-2 md:grid-cols-4 gap-4 text-sm mb-3">
<div>
<div class="text-gray-400">Capacity</div>
<div class="text-white">{s["capacity_gb"]} GB</div>
</div>
<div>
<div class="text-gray-400">Donated</div>
<div class="text-white">{s["donated_gb"]} GB</div>
</div>
<div>
<div class="text-gray-400">Used</div>
<div class="text-white">{format_bytes(s["used_bytes"])}</div>
</div>
<div>
<div class="text-gray-400">Pins</div>
<div class="text-white">{s["pin_count"]}</div>
</div>
</div>
<div class="text-xs text-gray-500">{config_html}</div>
<div id="test-result-{s["id"]}" class="mt-2 text-sm"></div>
</div>
'''
if not storages:
storage_rows = '<p class="text-gray-400 text-center py-8">No storage providers configured.</p>'
content = f'''
<div class="max-w-4xl mx-auto">
<div class="flex justify-between items-center mb-6">
<h1 class="text-2xl font-bold text-white">Storage Providers</h1>
</div>
<div class="bg-dark-700 rounded-lg p-6 mb-6">
<p class="text-gray-400 mb-4">
Attach your own storage to help power the network. 50% of your capacity is donated to store
shared content, making popular assets more resilient.
</p>
<h2 class="text-lg font-semibold text-white mb-4">Add Storage Provider</h2>
<div class="grid grid-cols-1 md:grid-cols-3 gap-4 mb-4">
<button onclick="showAddForm('pinata')"
class="p-4 bg-dark-600 hover:bg-dark-500 rounded-lg text-left transition-colors">
<div class="font-semibold text-white">Pinata</div>
<div class="text-sm text-gray-400">IPFS pinning service</div>
</button>
<button onclick="showAddForm('web3storage')"
class="p-4 bg-dark-600 hover:bg-dark-500 rounded-lg text-left transition-colors">
<div class="font-semibold text-white">web3.storage</div>
<div class="text-sm text-gray-400">IPFS + Filecoin</div>
</button>
<button onclick="showAddForm('local')"
class="p-4 bg-dark-600 hover:bg-dark-500 rounded-lg text-left transition-colors">
<div class="font-semibold text-white">Local Storage</div>
<div class="text-sm text-gray-400">Your own disk</div>
</button>
</div>
<div id="add-form" class="hidden bg-dark-600 rounded-lg p-4">
<form id="storage-form" hx-post="/storage" hx-target="#add-result" hx-swap="innerHTML">
<input type="hidden" name="provider_type" id="provider_type">
<div id="pinata-fields" class="hidden space-y-4">
<div>
<label class="block text-sm text-gray-400 mb-1">API Key</label>
<input type="text" name="api_key" class="w-full px-3 py-2 bg-dark-700 border border-dark-500 rounded text-white">
</div>
<div>
<label class="block text-sm text-gray-400 mb-1">Secret Key</label>
<input type="password" name="secret_key" class="w-full px-3 py-2 bg-dark-700 border border-dark-500 rounded text-white">
</div>
</div>
<div id="web3storage-fields" class="hidden space-y-4">
<div>
<label class="block text-sm text-gray-400 mb-1">API Token</label>
<input type="password" name="api_token" class="w-full px-3 py-2 bg-dark-700 border border-dark-500 rounded text-white">
</div>
</div>
<div id="local-fields" class="hidden space-y-4">
<div>
<label class="block text-sm text-gray-400 mb-1">Storage Path</label>
<input type="text" name="path" placeholder="/path/to/storage" class="w-full px-3 py-2 bg-dark-700 border border-dark-500 rounded text-white">
</div>
</div>
<div class="space-y-4 mt-4">
<div>
<label class="block text-sm text-gray-400 mb-1">Provider Name (optional)</label>
<input type="text" name="provider_name" placeholder="My Storage" class="w-full px-3 py-2 bg-dark-700 border border-dark-500 rounded text-white">
</div>
<div>
<label class="block text-sm text-gray-400 mb-1">Capacity (GB)</label>
<input type="number" name="capacity_gb" value="5" min="1" class="w-full px-3 py-2 bg-dark-700 border border-dark-500 rounded text-white">
</div>
</div>
<div class="flex justify-end gap-3 mt-4">
<button type="button" onclick="hideAddForm()" class="px-4 py-2 bg-gray-600 hover:bg-gray-700 text-white rounded">Cancel</button>
<button type="submit" class="px-4 py-2 bg-green-600 hover:bg-green-700 text-white rounded">Add Storage</button>
</div>
</form>
<div id="add-result" class="mt-4"></div>
</div>
</div>
<h2 class="text-lg font-semibold text-white mb-4">Your Storage Providers</h2>
<div id="storage-list">
{storage_rows}
</div>
</div>
<script>
function showAddForm(type) {{
document.getElementById('add-form').classList.remove('hidden');
document.getElementById('provider_type').value = type;
// Hide all field groups
document.getElementById('pinata-fields').classList.add('hidden');
document.getElementById('web3storage-fields').classList.add('hidden');
document.getElementById('local-fields').classList.add('hidden');
// Show the relevant fields
document.getElementById(type + '-fields').classList.remove('hidden');
}}
function hideAddForm() {{
document.getElementById('add-form').classList.add('hidden');
}}
// Handle form submission to build proper JSON
document.getElementById('storage-form').addEventListener('htmx:configRequest', function(evt) {{
const formData = new FormData(evt.detail.elt);
const providerType = formData.get('provider_type');
const config = {{}};
if (providerType === 'pinata') {{
config.api_key = formData.get('api_key');
config.secret_key = formData.get('secret_key');
}} else if (providerType === 'web3storage') {{
config.api_token = formData.get('api_token');
}} else if (providerType === 'local') {{
config.path = formData.get('path');
}}
evt.detail.headers['Content-Type'] = 'application/json';
evt.detail.parameters = JSON.stringify({{
provider_type: providerType,
provider_name: formData.get('provider_name') || null,
config: config,
capacity_gb: parseInt(formData.get('capacity_gb'))
}});
}});
</script>
'''
return HTMLResponse(base_html("Storage", content, username))
# ============ Client Download ============
CLIENT_TARBALL = Path(__file__).parent / "artdag-client.tar.gz"

511
storage_providers.py Normal file
View File

@@ -0,0 +1,511 @@
"""
Storage provider abstraction for user-attachable storage.
Supports:
- Pinata (IPFS pinning service)
- web3.storage (IPFS pinning service)
- Local filesystem storage
"""
import hashlib
import json
import logging
import os
import shutil
from abc import ABC, abstractmethod
from pathlib import Path
from typing import Optional
import requests
logger = logging.getLogger(__name__)
class StorageProvider(ABC):
"""Abstract base class for storage backends."""
provider_type: str = "unknown"
@abstractmethod
async def pin(self, content_hash: str, data: bytes, filename: Optional[str] = None) -> Optional[str]:
"""
Pin content to storage.
Args:
content_hash: SHA3-256 hash of the content
data: Raw bytes to store
filename: Optional filename hint
Returns:
IPFS CID or provider-specific ID, or None on failure
"""
pass
@abstractmethod
async def unpin(self, content_hash: str) -> bool:
"""
Unpin content from storage.
Args:
content_hash: SHA3-256 hash of the content
Returns:
True if unpinned successfully
"""
pass
@abstractmethod
async def get(self, content_hash: str) -> Optional[bytes]:
"""
Retrieve content from storage.
Args:
content_hash: SHA3-256 hash of the content
Returns:
Raw bytes or None if not found
"""
pass
@abstractmethod
async def is_pinned(self, content_hash: str) -> bool:
"""Check if content is pinned in this storage."""
pass
@abstractmethod
async def test_connection(self) -> tuple[bool, str]:
"""
Test connectivity to the storage provider.
Returns:
(success, message) tuple
"""
pass
@abstractmethod
def get_usage(self) -> dict:
"""
Get storage usage statistics.
Returns:
{used_bytes, capacity_bytes, pin_count}
"""
pass
class PinataProvider(StorageProvider):
"""Pinata IPFS pinning service provider."""
provider_type = "pinata"
def __init__(self, api_key: str, secret_key: str, capacity_gb: int = 1):
self.api_key = api_key
self.secret_key = secret_key
self.capacity_bytes = capacity_gb * 1024**3
self.base_url = "https://api.pinata.cloud"
self._usage_cache = None
def _headers(self) -> dict:
return {
"pinata_api_key": self.api_key,
"pinata_secret_api_key": self.secret_key,
}
async def pin(self, content_hash: str, data: bytes, filename: Optional[str] = None) -> Optional[str]:
"""Pin content to Pinata."""
try:
import asyncio
def do_pin():
files = {"file": (filename or f"{content_hash[:16]}.bin", data)}
metadata = {
"name": filename or content_hash[:16],
"keyvalues": {"content_hash": content_hash}
}
response = requests.post(
f"{self.base_url}/pinning/pinFileToIPFS",
files=files,
data={"pinataMetadata": json.dumps(metadata)},
headers=self._headers(),
timeout=120
)
response.raise_for_status()
return response.json().get("IpfsHash")
cid = await asyncio.to_thread(do_pin)
logger.info(f"Pinata: Pinned {content_hash[:16]}... as {cid}")
return cid
except Exception as e:
logger.error(f"Pinata pin failed: {e}")
return None
async def unpin(self, content_hash: str) -> bool:
"""Unpin content from Pinata by finding its CID first."""
try:
import asyncio
def do_unpin():
# First find the pin by content_hash metadata
response = requests.get(
f"{self.base_url}/data/pinList",
params={"metadata[keyvalues][content_hash]": content_hash, "status": "pinned"},
headers=self._headers(),
timeout=30
)
response.raise_for_status()
pins = response.json().get("rows", [])
if not pins:
return False
# Unpin each matching CID
for pin in pins:
cid = pin.get("ipfs_pin_hash")
if cid:
resp = requests.delete(
f"{self.base_url}/pinning/unpin/{cid}",
headers=self._headers(),
timeout=30
)
resp.raise_for_status()
return True
result = await asyncio.to_thread(do_unpin)
logger.info(f"Pinata: Unpinned {content_hash[:16]}...")
return result
except Exception as e:
logger.error(f"Pinata unpin failed: {e}")
return False
async def get(self, content_hash: str) -> Optional[bytes]:
"""Get content from Pinata via IPFS gateway."""
try:
import asyncio
def do_get():
# First find the CID
response = requests.get(
f"{self.base_url}/data/pinList",
params={"metadata[keyvalues][content_hash]": content_hash, "status": "pinned"},
headers=self._headers(),
timeout=30
)
response.raise_for_status()
pins = response.json().get("rows", [])
if not pins:
return None
cid = pins[0].get("ipfs_pin_hash")
if not cid:
return None
# Fetch from gateway
gateway_response = requests.get(
f"https://gateway.pinata.cloud/ipfs/{cid}",
timeout=120
)
gateway_response.raise_for_status()
return gateway_response.content
return await asyncio.to_thread(do_get)
except Exception as e:
logger.error(f"Pinata get failed: {e}")
return None
async def is_pinned(self, content_hash: str) -> bool:
"""Check if content is pinned on Pinata."""
try:
import asyncio
def do_check():
response = requests.get(
f"{self.base_url}/data/pinList",
params={"metadata[keyvalues][content_hash]": content_hash, "status": "pinned"},
headers=self._headers(),
timeout=30
)
response.raise_for_status()
return len(response.json().get("rows", [])) > 0
return await asyncio.to_thread(do_check)
except Exception:
return False
async def test_connection(self) -> tuple[bool, str]:
"""Test Pinata API connectivity."""
try:
import asyncio
def do_test():
response = requests.get(
f"{self.base_url}/data/testAuthentication",
headers=self._headers(),
timeout=10
)
response.raise_for_status()
return True, "Connected to Pinata successfully"
return await asyncio.to_thread(do_test)
except requests.exceptions.HTTPError as e:
if e.response.status_code == 401:
return False, "Invalid API credentials"
return False, f"HTTP error: {e}"
except Exception as e:
return False, f"Connection failed: {e}"
def get_usage(self) -> dict:
"""Get Pinata usage stats."""
try:
response = requests.get(
f"{self.base_url}/data/userPinnedDataTotal",
headers=self._headers(),
timeout=10
)
response.raise_for_status()
data = response.json()
return {
"used_bytes": data.get("pin_size_total", 0),
"capacity_bytes": self.capacity_bytes,
"pin_count": data.get("pin_count", 0)
}
except Exception:
return {"used_bytes": 0, "capacity_bytes": self.capacity_bytes, "pin_count": 0}
class Web3StorageProvider(StorageProvider):
"""web3.storage pinning service provider."""
provider_type = "web3storage"
def __init__(self, api_token: str, capacity_gb: int = 1):
self.api_token = api_token
self.capacity_bytes = capacity_gb * 1024**3
self.base_url = "https://api.web3.storage"
def _headers(self) -> dict:
return {"Authorization": f"Bearer {self.api_token}"}
async def pin(self, content_hash: str, data: bytes, filename: Optional[str] = None) -> Optional[str]:
"""Pin content to web3.storage."""
try:
import asyncio
def do_pin():
response = requests.post(
f"{self.base_url}/upload",
data=data,
headers={
**self._headers(),
"X-Name": filename or content_hash[:16]
},
timeout=120
)
response.raise_for_status()
return response.json().get("cid")
cid = await asyncio.to_thread(do_pin)
logger.info(f"web3.storage: Pinned {content_hash[:16]}... as {cid}")
return cid
except Exception as e:
logger.error(f"web3.storage pin failed: {e}")
return None
async def unpin(self, content_hash: str) -> bool:
"""web3.storage doesn't support unpinning - data is stored permanently."""
logger.warning("web3.storage: Unpinning not supported (permanent storage)")
return False
async def get(self, content_hash: str) -> Optional[bytes]:
"""Get content from web3.storage - would need CID mapping."""
# web3.storage requires knowing the CID to fetch
# For now, return None - we'd need to maintain a mapping
return None
async def is_pinned(self, content_hash: str) -> bool:
"""Check if content is pinned - would need CID mapping."""
return False
async def test_connection(self) -> tuple[bool, str]:
"""Test web3.storage API connectivity."""
try:
import asyncio
def do_test():
response = requests.get(
f"{self.base_url}/user/uploads",
headers=self._headers(),
params={"size": 1},
timeout=10
)
response.raise_for_status()
return True, "Connected to web3.storage successfully"
return await asyncio.to_thread(do_test)
except requests.exceptions.HTTPError as e:
if e.response.status_code == 401:
return False, "Invalid API token"
return False, f"HTTP error: {e}"
except Exception as e:
return False, f"Connection failed: {e}"
def get_usage(self) -> dict:
"""Get web3.storage usage stats."""
try:
response = requests.get(
f"{self.base_url}/user/uploads",
headers=self._headers(),
params={"size": 1000},
timeout=30
)
response.raise_for_status()
uploads = response.json()
total_size = sum(u.get("dagSize", 0) for u in uploads)
return {
"used_bytes": total_size,
"capacity_bytes": self.capacity_bytes,
"pin_count": len(uploads)
}
except Exception:
return {"used_bytes": 0, "capacity_bytes": self.capacity_bytes, "pin_count": 0}
class LocalStorageProvider(StorageProvider):
"""Local filesystem storage provider."""
provider_type = "local"
def __init__(self, base_path: str, capacity_gb: int = 10):
self.base_path = Path(base_path)
self.capacity_bytes = capacity_gb * 1024**3
# Create directory if it doesn't exist
self.base_path.mkdir(parents=True, exist_ok=True)
def _get_file_path(self, content_hash: str) -> Path:
"""Get file path for a content hash (using subdirectories)."""
# Use first 2 chars as subdirectory for better filesystem performance
subdir = content_hash[:2]
return self.base_path / subdir / content_hash
async def pin(self, content_hash: str, data: bytes, filename: Optional[str] = None) -> Optional[str]:
"""Store content locally."""
try:
import asyncio
def do_store():
file_path = self._get_file_path(content_hash)
file_path.parent.mkdir(parents=True, exist_ok=True)
file_path.write_bytes(data)
return content_hash # Use content_hash as ID for local storage
result = await asyncio.to_thread(do_store)
logger.info(f"Local: Stored {content_hash[:16]}...")
return result
except Exception as e:
logger.error(f"Local storage failed: {e}")
return None
async def unpin(self, content_hash: str) -> bool:
"""Remove content from local storage."""
try:
import asyncio
def do_remove():
file_path = self._get_file_path(content_hash)
if file_path.exists():
file_path.unlink()
return True
return False
return await asyncio.to_thread(do_remove)
except Exception as e:
logger.error(f"Local unpin failed: {e}")
return False
async def get(self, content_hash: str) -> Optional[bytes]:
"""Get content from local storage."""
try:
import asyncio
def do_get():
file_path = self._get_file_path(content_hash)
if file_path.exists():
return file_path.read_bytes()
return None
return await asyncio.to_thread(do_get)
except Exception as e:
logger.error(f"Local get failed: {e}")
return None
async def is_pinned(self, content_hash: str) -> bool:
"""Check if content exists in local storage."""
return self._get_file_path(content_hash).exists()
async def test_connection(self) -> tuple[bool, str]:
"""Test local storage is writable."""
try:
test_file = self.base_path / ".write_test"
test_file.write_text("test")
test_file.unlink()
return True, f"Local storage ready at {self.base_path}"
except Exception as e:
return False, f"Cannot write to {self.base_path}: {e}"
def get_usage(self) -> dict:
"""Get local storage usage stats."""
try:
total_size = 0
file_count = 0
for subdir in self.base_path.iterdir():
if subdir.is_dir() and len(subdir.name) == 2:
for f in subdir.iterdir():
if f.is_file():
total_size += f.stat().st_size
file_count += 1
return {
"used_bytes": total_size,
"capacity_bytes": self.capacity_bytes,
"pin_count": file_count
}
except Exception:
return {"used_bytes": 0, "capacity_bytes": self.capacity_bytes, "pin_count": 0}
def create_provider(provider_type: str, config: dict) -> Optional[StorageProvider]:
"""
Factory function to create a storage provider from config.
Args:
provider_type: 'pinata', 'web3storage', or 'local'
config: Provider-specific configuration dict
Returns:
StorageProvider instance or None if invalid
"""
try:
if provider_type == "pinata":
return PinataProvider(
api_key=config["api_key"],
secret_key=config["secret_key"],
capacity_gb=config.get("capacity_gb", 1)
)
elif provider_type == "web3storage":
return Web3StorageProvider(
api_token=config["api_token"],
capacity_gb=config.get("capacity_gb", 1)
)
elif provider_type == "local":
return LocalStorageProvider(
base_path=config["path"],
capacity_gb=config.get("capacity_gb", 10)
)
else:
logger.error(f"Unknown provider type: {provider_type}")
return None
except KeyError as e:
logger.error(f"Missing config key for {provider_type}: {e}")
return None
except Exception as e:
logger.error(f"Failed to create provider {provider_type}: {e}")
return None