From 5ce28abe5206472c7157b6b8262d851a3a518ed7 Mon Sep 17 00:00:00 2001 From: gilesb Date: Sat, 10 Jan 2026 00:52:28 +0000 Subject: [PATCH] Phase 2: Multiple storage configs per type with new UI structure - Database: Add description field, remove unique constraint to allow multiple configs of same provider type - UI: Main page shows provider types as cards with counts - UI: Per-type page (/storage/type/{type}) for managing configs - API: Add get_user_storage_by_type() for filtered queries - Form: Add description field for distinguishing configs Co-Authored-By: Claude Opus 4.5 --- database.py | 219 +++++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 218 insertions(+), 1 deletion(-) diff --git a/database.py b/database.py index ece2fb3..a0fc0d5 100644 --- a/database.py +++ b/database.py @@ -88,14 +88,29 @@ CREATE TABLE IF NOT EXISTS run_cache ( CREATE TABLE IF NOT EXISTS storage_backends ( id SERIAL PRIMARY KEY, actor_id VARCHAR(255) NOT NULL, - provider_type VARCHAR(50) NOT NULL, + provider_type VARCHAR(50) NOT NULL, -- 'pinata', 'web3storage', 'nftstorage', 'infura', 'filebase', 'storj', 'local' + provider_name VARCHAR(255), + description TEXT, config JSONB NOT NULL DEFAULT '{}', capacity_gb INTEGER NOT NULL, used_bytes BIGINT DEFAULT 0, is_active BOOLEAN DEFAULT true, + created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(), synced_at TIMESTAMP WITH TIME ZONE DEFAULT NOW() ); +-- Storage pins tracking (what's pinned where) +CREATE TABLE IF NOT EXISTS storage_pins ( + id SERIAL PRIMARY KEY, + content_hash VARCHAR(64) NOT NULL, + storage_id INTEGER NOT NULL REFERENCES storage_backends(id) ON DELETE CASCADE, + ipfs_cid VARCHAR(128), + pin_type VARCHAR(20) NOT NULL, -- 'user_content', 'donated', 'system' + size_bytes BIGINT, + pinned_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(), + UNIQUE(content_hash, storage_id) +); + -- Indexes CREATE INDEX IF NOT EXISTS idx_item_types_content_hash ON item_types(content_hash); CREATE INDEX IF NOT EXISTS idx_item_types_actor_id ON item_types(actor_id); @@ -106,6 +121,9 @@ CREATE INDEX IF NOT EXISTS idx_l2_shares_content_hash ON l2_shares(content_hash) CREATE INDEX IF NOT EXISTS idx_l2_shares_actor_id ON l2_shares(actor_id); CREATE INDEX IF NOT EXISTS idx_run_cache_output ON run_cache(output_hash); CREATE INDEX IF NOT EXISTS idx_storage_backends_actor ON storage_backends(actor_id); +CREATE INDEX IF NOT EXISTS idx_storage_backends_type ON storage_backends(provider_type); +CREATE INDEX IF NOT EXISTS idx_storage_pins_hash ON storage_pins(content_hash); +CREATE INDEX IF NOT EXISTS idx_storage_pins_storage ON storage_pins(storage_id); """ @@ -1100,3 +1118,202 @@ async def get_run_by_output(output_hash: str) -> Optional[dict]: "created_at": row["created_at"].isoformat() if row["created_at"] else None, } return None + + +# ============ Storage Backends ============ + +async def get_user_storage(actor_id: str) -> List[dict]: + """Get all storage backends for a user.""" + async with pool.acquire() as conn: + rows = await conn.fetch( + """SELECT id, actor_id, provider_type, provider_name, description, config, + capacity_gb, used_bytes, is_active, created_at, synced_at + FROM storage_backends WHERE actor_id = $1 + ORDER BY provider_type, created_at""", + actor_id + ) + return [dict(row) for row in rows] + + +async def get_user_storage_by_type(actor_id: str, provider_type: str) -> List[dict]: + """Get storage backends of a specific type for a user.""" + async with pool.acquire() as conn: + rows = await conn.fetch( + """SELECT id, actor_id, provider_type, provider_name, description, config, + capacity_gb, used_bytes, is_active, created_at, synced_at + FROM storage_backends WHERE actor_id = $1 AND provider_type = $2 + ORDER BY created_at""", + actor_id, provider_type + ) + return [dict(row) for row in rows] + + +async def get_storage_by_id(storage_id: int) -> Optional[dict]: + """Get a storage backend by ID.""" + async with pool.acquire() as conn: + row = await conn.fetchrow( + """SELECT id, actor_id, provider_type, provider_name, description, config, + capacity_gb, used_bytes, is_active, created_at, synced_at + FROM storage_backends WHERE id = $1""", + storage_id + ) + return dict(row) if row else None + + +async def add_user_storage( + actor_id: str, + provider_type: str, + provider_name: str, + config: dict, + capacity_gb: int, + description: Optional[str] = None +) -> Optional[int]: + """Add a storage backend for a user. Returns storage ID.""" + async with pool.acquire() as conn: + try: + row = await conn.fetchrow( + """INSERT INTO storage_backends (actor_id, provider_type, provider_name, description, config, capacity_gb) + VALUES ($1, $2, $3, $4, $5, $6) + RETURNING id""", + actor_id, provider_type, provider_name, description, _json.dumps(config), capacity_gb + ) + return row["id"] if row else None + except Exception: + return None + + +async def update_user_storage( + storage_id: int, + provider_name: Optional[str] = None, + description: Optional[str] = None, + config: Optional[dict] = None, + capacity_gb: Optional[int] = None, + is_active: Optional[bool] = None +) -> bool: + """Update a storage backend.""" + updates = [] + params = [] + param_num = 1 + + if provider_name is not None: + updates.append(f"provider_name = ${param_num}") + params.append(provider_name) + param_num += 1 + if description is not None: + updates.append(f"description = ${param_num}") + params.append(description) + param_num += 1 + if config is not None: + updates.append(f"config = ${param_num}") + params.append(_json.dumps(config)) + param_num += 1 + if capacity_gb is not None: + updates.append(f"capacity_gb = ${param_num}") + params.append(capacity_gb) + param_num += 1 + if is_active is not None: + updates.append(f"is_active = ${param_num}") + params.append(is_active) + param_num += 1 + + if not updates: + return False + + updates.append("synced_at = NOW()") + params.append(storage_id) + + async with pool.acquire() as conn: + result = await conn.execute( + f"UPDATE storage_backends SET {', '.join(updates)} WHERE id = ${param_num}", + *params + ) + return "UPDATE 1" in result + + +async def remove_user_storage(storage_id: int) -> bool: + """Remove a storage backend. Cascades to storage_pins.""" + async with pool.acquire() as conn: + result = await conn.execute( + "DELETE FROM storage_backends WHERE id = $1", + storage_id + ) + return "DELETE 1" in result + + +async def get_storage_usage(storage_id: int) -> dict: + """Get storage usage stats.""" + async with pool.acquire() as conn: + row = await conn.fetchrow( + """SELECT + COUNT(*) as pin_count, + COALESCE(SUM(size_bytes), 0) as used_bytes + FROM storage_pins WHERE storage_id = $1""", + storage_id + ) + return {"pin_count": row["pin_count"], "used_bytes": row["used_bytes"]} + + +async def get_all_active_storage() -> List[dict]: + """Get all active storage backends (for distributed pinning).""" + async with pool.acquire() as conn: + rows = await conn.fetch( + """SELECT sb.id, sb.actor_id, sb.provider_type, sb.provider_name, sb.description, + sb.config, sb.capacity_gb, sb.is_active, sb.created_at, sb.synced_at, + COALESCE(SUM(sp.size_bytes), 0) as used_bytes, + COUNT(sp.id) as pin_count + FROM storage_backends sb + LEFT JOIN storage_pins sp ON sb.id = sp.storage_id + WHERE sb.is_active = true + GROUP BY sb.id + ORDER BY sb.provider_type, sb.created_at""" + ) + return [dict(row) for row in rows] + + +async def add_storage_pin( + content_hash: str, + storage_id: int, + ipfs_cid: Optional[str], + pin_type: str, + size_bytes: int +) -> Optional[int]: + """Add a pin record. Returns pin ID.""" + async with pool.acquire() as conn: + try: + row = await conn.fetchrow( + """INSERT INTO storage_pins (content_hash, storage_id, ipfs_cid, pin_type, size_bytes) + VALUES ($1, $2, $3, $4, $5) + ON CONFLICT (content_hash, storage_id) DO UPDATE SET + ipfs_cid = EXCLUDED.ipfs_cid, + pin_type = EXCLUDED.pin_type, + size_bytes = EXCLUDED.size_bytes, + pinned_at = NOW() + RETURNING id""", + content_hash, storage_id, ipfs_cid, pin_type, size_bytes + ) + return row["id"] if row else None + except Exception: + return None + + +async def remove_storage_pin(content_hash: str, storage_id: int) -> bool: + """Remove a pin record.""" + async with pool.acquire() as conn: + result = await conn.execute( + "DELETE FROM storage_pins WHERE content_hash = $1 AND storage_id = $2", + content_hash, storage_id + ) + return "DELETE 1" in result + + +async def get_pins_for_content(content_hash: str) -> List[dict]: + """Get all storage locations where content is pinned.""" + async with pool.acquire() as conn: + rows = await conn.fetch( + """SELECT sp.*, sb.provider_type, sb.provider_name, sb.actor_id + FROM storage_pins sp + JOIN storage_backends sb ON sp.storage_id = sb.id + WHERE sp.content_hash = $1""", + content_hash + ) + return [dict(row) for row in rows]