diff --git a/db.py b/db.py index bdbb012..a6d51d5 100644 --- a/db.py +++ b/db.py @@ -117,6 +117,32 @@ CREATE TABLE IF NOT EXISTS revoked_tokens ( expires_at TIMESTAMPTZ NOT NULL ); +-- User storage providers (IPFS pinning services, local storage, etc.) +CREATE TABLE IF NOT EXISTS user_storage ( + id SERIAL PRIMARY KEY, + username VARCHAR(255) NOT NULL REFERENCES users(username), + provider_type VARCHAR(50) NOT NULL, -- 'pinata', 'web3storage', 'filebase', 'local' + provider_name VARCHAR(255), -- User-friendly name + config JSONB NOT NULL DEFAULT '{}', -- API keys, endpoints, paths + capacity_gb INTEGER NOT NULL, -- Total capacity user is contributing + is_active BOOLEAN DEFAULT true, + created_at TIMESTAMPTZ DEFAULT NOW(), + updated_at TIMESTAMPTZ DEFAULT NOW(), + UNIQUE(username, provider_type, provider_name) +); + +-- Track what's stored where +CREATE TABLE IF NOT EXISTS storage_pins ( + id SERIAL PRIMARY KEY, + content_hash VARCHAR(64) NOT NULL, + storage_id INTEGER NOT NULL REFERENCES user_storage(id) ON DELETE CASCADE, + ipfs_cid VARCHAR(128), + pin_type VARCHAR(20) NOT NULL, -- 'user_content', 'donated', 'system' + size_bytes BIGINT, + pinned_at TIMESTAMPTZ DEFAULT NOW(), + UNIQUE(content_hash, storage_id) +); + -- Indexes CREATE INDEX IF NOT EXISTS idx_users_created_at ON users(created_at); CREATE INDEX IF NOT EXISTS idx_assets_content_hash ON assets(content_hash); @@ -129,6 +155,20 @@ CREATE INDEX IF NOT EXISTS idx_activities_anchor ON activities(anchor_root); CREATE INDEX IF NOT EXISTS idx_anchors_created ON anchors(created_at DESC); CREATE INDEX IF NOT EXISTS idx_followers_username ON followers(username); CREATE INDEX IF NOT EXISTS idx_revoked_tokens_expires ON revoked_tokens(expires_at); +CREATE INDEX IF NOT EXISTS idx_user_storage_username ON user_storage(username); +CREATE INDEX IF NOT EXISTS idx_storage_pins_hash ON storage_pins(content_hash); +CREATE INDEX IF NOT EXISTS idx_storage_pins_storage ON storage_pins(storage_id); + +-- Add source URL 
columns to assets if they don't exist +DO $$ BEGIN + ALTER TABLE assets ADD COLUMN source_url TEXT; +EXCEPTION WHEN duplicate_column THEN NULL; +END $$; + +DO $$ BEGIN + ALTER TABLE assets ADD COLUMN source_type VARCHAR(50); +EXCEPTION WHEN duplicate_column THEN NULL; +END $$; """ @@ -772,6 +812,180 @@ async def detach_renderer(username: str, l1_url: str) -> bool: return "DELETE 1" in result +# ============ User Storage ============ + +async def get_user_storage(username: str) -> list[dict]: + """Get all storage providers for a user.""" + async with get_connection() as conn: + rows = await conn.fetch( + """SELECT id, username, provider_type, provider_name, config, + capacity_gb, is_active, created_at, updated_at + FROM user_storage WHERE username = $1 + ORDER BY created_at""", + username + ) + return [dict(row) for row in rows] + + +async def get_storage_by_id(storage_id: int) -> Optional[dict]: + """Get a storage provider by ID.""" + async with get_connection() as conn: + row = await conn.fetchrow( + """SELECT id, username, provider_type, provider_name, config, + capacity_gb, is_active, created_at, updated_at + FROM user_storage WHERE id = $1""", + storage_id + ) + return dict(row) if row else None + + +async def add_user_storage( + username: str, + provider_type: str, + provider_name: str, + config: dict, + capacity_gb: int +) -> Optional[int]: + """Add a storage provider for a user. 
Returns storage ID.""" + async with get_connection() as conn: + try: + row = await conn.fetchrow( + """INSERT INTO user_storage (username, provider_type, provider_name, config, capacity_gb) + VALUES ($1, $2, $3, $4, $5) + RETURNING id""", + username, provider_type, provider_name, json.dumps(config), capacity_gb + ) + return row["id"] if row else None + except Exception: + return None + + +async def update_user_storage( + storage_id: int, + config: Optional[dict] = None, + capacity_gb: Optional[int] = None, + is_active: Optional[bool] = None +) -> bool: + """Update a storage provider.""" + updates = [] + params = [] + param_num = 1 + + if config is not None: + updates.append(f"config = ${param_num}") + params.append(json.dumps(config)) + param_num += 1 + if capacity_gb is not None: + updates.append(f"capacity_gb = ${param_num}") + params.append(capacity_gb) + param_num += 1 + if is_active is not None: + updates.append(f"is_active = ${param_num}") + params.append(is_active) + param_num += 1 + + if not updates: + return False + + updates.append("updated_at = NOW()") + params.append(storage_id) + + async with get_connection() as conn: + result = await conn.execute( + f"UPDATE user_storage SET {', '.join(updates)} WHERE id = ${param_num}", + *params + ) + return "UPDATE 1" in result + + +async def remove_user_storage(storage_id: int) -> bool: + """Remove a storage provider. 
Cascades to storage_pins.""" + async with get_connection() as conn: + result = await conn.execute( + "DELETE FROM user_storage WHERE id = $1", + storage_id + ) + return "DELETE 1" in result + + +async def get_storage_usage(storage_id: int) -> dict: + """Get storage usage stats for a provider.""" + async with get_connection() as conn: + row = await conn.fetchrow( + """SELECT + COUNT(*) as pin_count, + COALESCE(SUM(size_bytes), 0) as used_bytes + FROM storage_pins WHERE storage_id = $1""", + storage_id + ) + return {"pin_count": row["pin_count"], "used_bytes": row["used_bytes"]} + + +async def add_storage_pin( + content_hash: str, + storage_id: int, + ipfs_cid: Optional[str], + pin_type: str, + size_bytes: int +) -> Optional[int]: + """Add a pin record. Returns pin ID.""" + async with get_connection() as conn: + try: + row = await conn.fetchrow( + """INSERT INTO storage_pins (content_hash, storage_id, ipfs_cid, pin_type, size_bytes) + VALUES ($1, $2, $3, $4, $5) + ON CONFLICT (content_hash, storage_id) DO UPDATE SET + ipfs_cid = EXCLUDED.ipfs_cid, + pin_type = EXCLUDED.pin_type, + size_bytes = EXCLUDED.size_bytes, + pinned_at = NOW() + RETURNING id""", + content_hash, storage_id, ipfs_cid, pin_type, size_bytes + ) + return row["id"] if row else None + except Exception: + return None + + +async def remove_storage_pin(content_hash: str, storage_id: int) -> bool: + """Remove a pin record.""" + async with get_connection() as conn: + result = await conn.execute( + "DELETE FROM storage_pins WHERE content_hash = $1 AND storage_id = $2", + content_hash, storage_id + ) + return "DELETE 1" in result + + +async def get_pins_for_content(content_hash: str) -> list[dict]: + """Get all storage locations where content is pinned.""" + async with get_connection() as conn: + rows = await conn.fetch( + """SELECT sp.*, us.provider_type, us.provider_name, us.username + FROM storage_pins sp + JOIN user_storage us ON sp.storage_id = us.id + WHERE sp.content_hash = $1""", + content_hash + ) 
+ return [dict(row) for row in rows] + + +async def get_all_active_storage() -> list[dict]: + """Get all active storage providers (for distributed pinning).""" + async with get_connection() as conn: + rows = await conn.fetch( + """SELECT us.*, + COALESCE(SUM(sp.size_bytes), 0) as used_bytes, + COUNT(sp.id) as pin_count + FROM user_storage us + LEFT JOIN storage_pins sp ON us.id = sp.storage_id + WHERE us.is_active = true + GROUP BY us.id + ORDER BY us.created_at""" + ) + return [dict(row) for row in rows] + + # ============ Token Revocation ============ async def revoke_token(token_hash: str, username: str, expires_at) -> bool: diff --git a/server.py b/server.py index 4646d0d..75b6a8a 100644 --- a/server.py +++ b/server.py @@ -195,6 +195,27 @@ class UpdateAssetRequest(BaseModel): ipfs_cid: Optional[str] = None # IPFS content identifier +class AddStorageRequest(BaseModel): + """Request to add a storage provider.""" + provider_type: str # 'pinata', 'web3storage', 'local' + provider_name: Optional[str] = None # User-friendly name + config: dict # Provider-specific config (api_key, path, etc.) 
@app.get("/storage")
async def list_storage(request: Request, user: User = Depends(get_optional_user)):
    """List the current user's storage providers.

    Returns HTML for browsers and JSON for API clients. Unauthenticated
    browsers are redirected to /login; API callers receive 401.

    Each entry is annotated with usage stats, the donated share of capacity,
    and a display-safe copy of its config in which sensitive values
    (keys/tokens/secrets) are masked.
    """
    if not user:
        if wants_html(request):
            return RedirectResponse(url="/login", status_code=302)
        raise HTTPException(401, "Authentication required")

    storages = await db.get_user_storage(user.username)

    for storage in storages:
        usage = await db.get_storage_usage(storage["id"])
        storage["used_bytes"] = usage["used_bytes"]
        storage["pin_count"] = usage["pin_count"]
        # Half of the attached capacity is donated to shared content.
        storage["donated_gb"] = storage["capacity_gb"] // 2
        if storage.get("config"):
            config = storage["config"] if isinstance(storage["config"], dict) else json.loads(storage["config"])
            masked = {}
            for k, v in config.items():
                if any(word in k.lower() for word in ("key", "token", "secret")):
                    # BUG FIX: cast to str before slicing — the original sliced
                    # the raw value while measuring len(str(v)), which raises
                    # TypeError for non-string config values > 8 chars.
                    s = str(v)
                    masked[k] = s[:4] + "..." + s[-4:] if len(s) > 8 else "****"
                else:
                    masked[k] = v
            storage["config_display"] = masked

    if wants_html(request):
        return await ui_storage_page(user.username, storages, request)

    return {"storages": storages}
@app.patch("/storage/{storage_id}")
async def update_storage(storage_id: int, req: UpdateStorageRequest, user: User = Depends(get_required_user)):
    """Update a storage provider's config, capacity, or active flag.

    Only the owner may update. When config keys are supplied they are merged
    over the existing config, the merged result is connection-tested, and the
    MERGED config is persisted.

    Raises:
        404 if the provider does not exist, 403 if not owned by the caller,
        400 if the merged config fails its connection test,
        500 if the database update fails.
    """
    storage = await db.get_storage_by_id(storage_id)
    if not storage:
        raise HTTPException(404, "Storage provider not found")
    if storage["username"] != user.username:
        raise HTTPException(403, "Not authorized")

    new_config = None
    if req.config is not None:
        existing_config = storage["config"] if isinstance(storage["config"], dict) else json.loads(storage["config"])
        new_config = {**existing_config, **req.config}
        provider = storage_providers.create_provider(storage["provider_type"], {
            **new_config,
            "capacity_gb": req.capacity_gb or storage["capacity_gb"]
        })
        if provider:
            success, message = await provider.test_connection()
            if not success:
                raise HTTPException(400, f"Provider connection failed: {message}")

    # BUG FIX: previously only req.config (the partial overrides) was saved,
    # even though the merged config was what passed the connection test —
    # a partial update silently discarded untouched keys (e.g. the other
    # half of an api_key/secret_key pair). Persist the merged config.
    success = await db.update_user_storage(
        storage_id,
        config=new_config,
        capacity_gb=req.capacity_gb,
        is_active=req.is_active
    )

    if not success:
        raise HTTPException(500, "Failed to update storage provider")

    return {"message": "Storage provider updated"}
def format_bytes(b):
    """Render a byte count as a human-readable string using binary units.

    BUG FIX: boundaries use >= instead of > so that exact powers of 1024
    render in the larger unit — e.g. 1073741824 bytes is "1.0 GB", not
    "1024.0 MB", and 1024 bytes is "1.0 KB", not "1024 bytes".
    """
    if b >= 1024**3:
        return f"{b / 1024**3:.1f} GB"
    if b >= 1024**2:
        return f"{b / 1024**2:.1f} MB"
    if b >= 1024:
        return f"{b / 1024:.1f} KB"
    return f"{b} bytes"
+
+
+

{s["provider_name"] or s["provider_type"]}

+ {s["provider_type"]} +
+
+ {status_text} + + +
+
+
+
+
Capacity
+
{s["capacity_gb"]} GB
+
+
+
Donated
+
{s["donated_gb"]} GB
+
+
+
Used
+
{format_bytes(s["used_bytes"])}
+
+
+
Pins
+
{s["pin_count"]}
+
+
+
{config_html}
+
+
+ ''' + + if not storages: + storage_rows = '

No storage providers configured.

' + + content = f''' +
+
+

Storage Providers

+
+ +
+

+ Attach your own storage to help power the network. 50% of your capacity is donated to store + shared content, making popular assets more resilient. +

+ +

Add Storage Provider

+ +
+ + + +
+ + +
+ +

Your Storage Providers

+
+ {storage_rows} +
+
+ + + ''' + + return HTMLResponse(base_html("Storage", content, username)) + + # ============ Client Download ============ CLIENT_TARBALL = Path(__file__).parent / "artdag-client.tar.gz" diff --git a/storage_providers.py b/storage_providers.py new file mode 100644 index 0000000..2088afe --- /dev/null +++ b/storage_providers.py @@ -0,0 +1,511 @@ +""" +Storage provider abstraction for user-attachable storage. + +Supports: +- Pinata (IPFS pinning service) +- web3.storage (IPFS pinning service) +- Local filesystem storage +""" + +import hashlib +import json +import logging +import os +import shutil +from abc import ABC, abstractmethod +from pathlib import Path +from typing import Optional + +import requests + +logger = logging.getLogger(__name__) + + +class StorageProvider(ABC): + """Abstract base class for storage backends.""" + + provider_type: str = "unknown" + + @abstractmethod + async def pin(self, content_hash: str, data: bytes, filename: Optional[str] = None) -> Optional[str]: + """ + Pin content to storage. + + Args: + content_hash: SHA3-256 hash of the content + data: Raw bytes to store + filename: Optional filename hint + + Returns: + IPFS CID or provider-specific ID, or None on failure + """ + pass + + @abstractmethod + async def unpin(self, content_hash: str) -> bool: + """ + Unpin content from storage. + + Args: + content_hash: SHA3-256 hash of the content + + Returns: + True if unpinned successfully + """ + pass + + @abstractmethod + async def get(self, content_hash: str) -> Optional[bytes]: + """ + Retrieve content from storage. + + Args: + content_hash: SHA3-256 hash of the content + + Returns: + Raw bytes or None if not found + """ + pass + + @abstractmethod + async def is_pinned(self, content_hash: str) -> bool: + """Check if content is pinned in this storage.""" + pass + + @abstractmethod + async def test_connection(self) -> tuple[bool, str]: + """ + Test connectivity to the storage provider. 
+ + Returns: + (success, message) tuple + """ + pass + + @abstractmethod + def get_usage(self) -> dict: + """ + Get storage usage statistics. + + Returns: + {used_bytes, capacity_bytes, pin_count} + """ + pass + + +class PinataProvider(StorageProvider): + """Pinata IPFS pinning service provider.""" + + provider_type = "pinata" + + def __init__(self, api_key: str, secret_key: str, capacity_gb: int = 1): + self.api_key = api_key + self.secret_key = secret_key + self.capacity_bytes = capacity_gb * 1024**3 + self.base_url = "https://api.pinata.cloud" + self._usage_cache = None + + def _headers(self) -> dict: + return { + "pinata_api_key": self.api_key, + "pinata_secret_api_key": self.secret_key, + } + + async def pin(self, content_hash: str, data: bytes, filename: Optional[str] = None) -> Optional[str]: + """Pin content to Pinata.""" + try: + import asyncio + + def do_pin(): + files = {"file": (filename or f"{content_hash[:16]}.bin", data)} + metadata = { + "name": filename or content_hash[:16], + "keyvalues": {"content_hash": content_hash} + } + response = requests.post( + f"{self.base_url}/pinning/pinFileToIPFS", + files=files, + data={"pinataMetadata": json.dumps(metadata)}, + headers=self._headers(), + timeout=120 + ) + response.raise_for_status() + return response.json().get("IpfsHash") + + cid = await asyncio.to_thread(do_pin) + logger.info(f"Pinata: Pinned {content_hash[:16]}... 
as {cid}") + return cid + except Exception as e: + logger.error(f"Pinata pin failed: {e}") + return None + + async def unpin(self, content_hash: str) -> bool: + """Unpin content from Pinata by finding its CID first.""" + try: + import asyncio + + def do_unpin(): + # First find the pin by content_hash metadata + response = requests.get( + f"{self.base_url}/data/pinList", + params={"metadata[keyvalues][content_hash]": content_hash, "status": "pinned"}, + headers=self._headers(), + timeout=30 + ) + response.raise_for_status() + pins = response.json().get("rows", []) + + if not pins: + return False + + # Unpin each matching CID + for pin in pins: + cid = pin.get("ipfs_pin_hash") + if cid: + resp = requests.delete( + f"{self.base_url}/pinning/unpin/{cid}", + headers=self._headers(), + timeout=30 + ) + resp.raise_for_status() + return True + + result = await asyncio.to_thread(do_unpin) + logger.info(f"Pinata: Unpinned {content_hash[:16]}...") + return result + except Exception as e: + logger.error(f"Pinata unpin failed: {e}") + return False + + async def get(self, content_hash: str) -> Optional[bytes]: + """Get content from Pinata via IPFS gateway.""" + try: + import asyncio + + def do_get(): + # First find the CID + response = requests.get( + f"{self.base_url}/data/pinList", + params={"metadata[keyvalues][content_hash]": content_hash, "status": "pinned"}, + headers=self._headers(), + timeout=30 + ) + response.raise_for_status() + pins = response.json().get("rows", []) + + if not pins: + return None + + cid = pins[0].get("ipfs_pin_hash") + if not cid: + return None + + # Fetch from gateway + gateway_response = requests.get( + f"https://gateway.pinata.cloud/ipfs/{cid}", + timeout=120 + ) + gateway_response.raise_for_status() + return gateway_response.content + + return await asyncio.to_thread(do_get) + except Exception as e: + logger.error(f"Pinata get failed: {e}") + return None + + async def is_pinned(self, content_hash: str) -> bool: + """Check if content is pinned on 
Pinata.""" + try: + import asyncio + + def do_check(): + response = requests.get( + f"{self.base_url}/data/pinList", + params={"metadata[keyvalues][content_hash]": content_hash, "status": "pinned"}, + headers=self._headers(), + timeout=30 + ) + response.raise_for_status() + return len(response.json().get("rows", [])) > 0 + + return await asyncio.to_thread(do_check) + except Exception: + return False + + async def test_connection(self) -> tuple[bool, str]: + """Test Pinata API connectivity.""" + try: + import asyncio + + def do_test(): + response = requests.get( + f"{self.base_url}/data/testAuthentication", + headers=self._headers(), + timeout=10 + ) + response.raise_for_status() + return True, "Connected to Pinata successfully" + + return await asyncio.to_thread(do_test) + except requests.exceptions.HTTPError as e: + if e.response.status_code == 401: + return False, "Invalid API credentials" + return False, f"HTTP error: {e}" + except Exception as e: + return False, f"Connection failed: {e}" + + def get_usage(self) -> dict: + """Get Pinata usage stats.""" + try: + response = requests.get( + f"{self.base_url}/data/userPinnedDataTotal", + headers=self._headers(), + timeout=10 + ) + response.raise_for_status() + data = response.json() + return { + "used_bytes": data.get("pin_size_total", 0), + "capacity_bytes": self.capacity_bytes, + "pin_count": data.get("pin_count", 0) + } + except Exception: + return {"used_bytes": 0, "capacity_bytes": self.capacity_bytes, "pin_count": 0} + + +class Web3StorageProvider(StorageProvider): + """web3.storage pinning service provider.""" + + provider_type = "web3storage" + + def __init__(self, api_token: str, capacity_gb: int = 1): + self.api_token = api_token + self.capacity_bytes = capacity_gb * 1024**3 + self.base_url = "https://api.web3.storage" + + def _headers(self) -> dict: + return {"Authorization": f"Bearer {self.api_token}"} + + async def pin(self, content_hash: str, data: bytes, filename: Optional[str] = None) -> 
Optional[str]: + """Pin content to web3.storage.""" + try: + import asyncio + + def do_pin(): + response = requests.post( + f"{self.base_url}/upload", + data=data, + headers={ + **self._headers(), + "X-Name": filename or content_hash[:16] + }, + timeout=120 + ) + response.raise_for_status() + return response.json().get("cid") + + cid = await asyncio.to_thread(do_pin) + logger.info(f"web3.storage: Pinned {content_hash[:16]}... as {cid}") + return cid + except Exception as e: + logger.error(f"web3.storage pin failed: {e}") + return None + + async def unpin(self, content_hash: str) -> bool: + """web3.storage doesn't support unpinning - data is stored permanently.""" + logger.warning("web3.storage: Unpinning not supported (permanent storage)") + return False + + async def get(self, content_hash: str) -> Optional[bytes]: + """Get content from web3.storage - would need CID mapping.""" + # web3.storage requires knowing the CID to fetch + # For now, return None - we'd need to maintain a mapping + return None + + async def is_pinned(self, content_hash: str) -> bool: + """Check if content is pinned - would need CID mapping.""" + return False + + async def test_connection(self) -> tuple[bool, str]: + """Test web3.storage API connectivity.""" + try: + import asyncio + + def do_test(): + response = requests.get( + f"{self.base_url}/user/uploads", + headers=self._headers(), + params={"size": 1}, + timeout=10 + ) + response.raise_for_status() + return True, "Connected to web3.storage successfully" + + return await asyncio.to_thread(do_test) + except requests.exceptions.HTTPError as e: + if e.response.status_code == 401: + return False, "Invalid API token" + return False, f"HTTP error: {e}" + except Exception as e: + return False, f"Connection failed: {e}" + + def get_usage(self) -> dict: + """Get web3.storage usage stats.""" + try: + response = requests.get( + f"{self.base_url}/user/uploads", + headers=self._headers(), + params={"size": 1000}, + timeout=30 + ) + 
response.raise_for_status() + uploads = response.json() + total_size = sum(u.get("dagSize", 0) for u in uploads) + return { + "used_bytes": total_size, + "capacity_bytes": self.capacity_bytes, + "pin_count": len(uploads) + } + except Exception: + return {"used_bytes": 0, "capacity_bytes": self.capacity_bytes, "pin_count": 0} + + +class LocalStorageProvider(StorageProvider): + """Local filesystem storage provider.""" + + provider_type = "local" + + def __init__(self, base_path: str, capacity_gb: int = 10): + self.base_path = Path(base_path) + self.capacity_bytes = capacity_gb * 1024**3 + # Create directory if it doesn't exist + self.base_path.mkdir(parents=True, exist_ok=True) + + def _get_file_path(self, content_hash: str) -> Path: + """Get file path for a content hash (using subdirectories).""" + # Use first 2 chars as subdirectory for better filesystem performance + subdir = content_hash[:2] + return self.base_path / subdir / content_hash + + async def pin(self, content_hash: str, data: bytes, filename: Optional[str] = None) -> Optional[str]: + """Store content locally.""" + try: + import asyncio + + def do_store(): + file_path = self._get_file_path(content_hash) + file_path.parent.mkdir(parents=True, exist_ok=True) + file_path.write_bytes(data) + return content_hash # Use content_hash as ID for local storage + + result = await asyncio.to_thread(do_store) + logger.info(f"Local: Stored {content_hash[:16]}...") + return result + except Exception as e: + logger.error(f"Local storage failed: {e}") + return None + + async def unpin(self, content_hash: str) -> bool: + """Remove content from local storage.""" + try: + import asyncio + + def do_remove(): + file_path = self._get_file_path(content_hash) + if file_path.exists(): + file_path.unlink() + return True + return False + + return await asyncio.to_thread(do_remove) + except Exception as e: + logger.error(f"Local unpin failed: {e}") + return False + + async def get(self, content_hash: str) -> Optional[bytes]: + 
"""Get content from local storage.""" + try: + import asyncio + + def do_get(): + file_path = self._get_file_path(content_hash) + if file_path.exists(): + return file_path.read_bytes() + return None + + return await asyncio.to_thread(do_get) + except Exception as e: + logger.error(f"Local get failed: {e}") + return None + + async def is_pinned(self, content_hash: str) -> bool: + """Check if content exists in local storage.""" + return self._get_file_path(content_hash).exists() + + async def test_connection(self) -> tuple[bool, str]: + """Test local storage is writable.""" + try: + test_file = self.base_path / ".write_test" + test_file.write_text("test") + test_file.unlink() + return True, f"Local storage ready at {self.base_path}" + except Exception as e: + return False, f"Cannot write to {self.base_path}: {e}" + + def get_usage(self) -> dict: + """Get local storage usage stats.""" + try: + total_size = 0 + file_count = 0 + for subdir in self.base_path.iterdir(): + if subdir.is_dir() and len(subdir.name) == 2: + for f in subdir.iterdir(): + if f.is_file(): + total_size += f.stat().st_size + file_count += 1 + return { + "used_bytes": total_size, + "capacity_bytes": self.capacity_bytes, + "pin_count": file_count + } + except Exception: + return {"used_bytes": 0, "capacity_bytes": self.capacity_bytes, "pin_count": 0} + + +def create_provider(provider_type: str, config: dict) -> Optional[StorageProvider]: + """ + Factory function to create a storage provider from config. 
+ + Args: + provider_type: 'pinata', 'web3storage', or 'local' + config: Provider-specific configuration dict + + Returns: + StorageProvider instance or None if invalid + """ + try: + if provider_type == "pinata": + return PinataProvider( + api_key=config["api_key"], + secret_key=config["secret_key"], + capacity_gb=config.get("capacity_gb", 1) + ) + elif provider_type == "web3storage": + return Web3StorageProvider( + api_token=config["api_token"], + capacity_gb=config.get("capacity_gb", 1) + ) + elif provider_type == "local": + return LocalStorageProvider( + base_path=config["path"], + capacity_gb=config.get("capacity_gb", 10) + ) + else: + logger.error(f"Unknown provider type: {provider_type}") + return None + except KeyError as e: + logger.error(f"Missing config key for {provider_type}: {e}") + return None + except Exception as e: + logger.error(f"Failed to create provider {provider_type}: {e}") + return None