Phase 2: Multiple storage configs per type with new UI structure
- Database: Add description field, remove unique constraint to allow
multiple configs of same provider type
- UI: Main page shows provider types as cards with counts
- UI: Per-type page (/storage/type/{type}) for managing configs
- API: Add get_user_storage_by_type() for filtered queries
- Form: Add description field for distinguishing configs
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
219
database.py
219
database.py
@@ -88,14 +88,29 @@ CREATE TABLE IF NOT EXISTS run_cache (
|
||||
CREATE TABLE IF NOT EXISTS storage_backends (
|
||||
id SERIAL PRIMARY KEY,
|
||||
actor_id VARCHAR(255) NOT NULL,
|
||||
provider_type VARCHAR(50) NOT NULL,
|
||||
provider_type VARCHAR(50) NOT NULL, -- 'pinata', 'web3storage', 'nftstorage', 'infura', 'filebase', 'storj', 'local'
|
||||
provider_name VARCHAR(255),
|
||||
description TEXT,
|
||||
config JSONB NOT NULL DEFAULT '{}',
|
||||
capacity_gb INTEGER NOT NULL,
|
||||
used_bytes BIGINT DEFAULT 0,
|
||||
is_active BOOLEAN DEFAULT true,
|
||||
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
|
||||
synced_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
|
||||
);
|
||||
|
||||
-- Storage pins tracking (what's pinned where)
|
||||
CREATE TABLE IF NOT EXISTS storage_pins (
|
||||
id SERIAL PRIMARY KEY,
|
||||
content_hash VARCHAR(64) NOT NULL,
|
||||
storage_id INTEGER NOT NULL REFERENCES storage_backends(id) ON DELETE CASCADE,
|
||||
ipfs_cid VARCHAR(128),
|
||||
pin_type VARCHAR(20) NOT NULL, -- 'user_content', 'donated', 'system'
|
||||
size_bytes BIGINT,
|
||||
pinned_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
|
||||
UNIQUE(content_hash, storage_id)
|
||||
);
|
||||
|
||||
-- Indexes
|
||||
CREATE INDEX IF NOT EXISTS idx_item_types_content_hash ON item_types(content_hash);
|
||||
CREATE INDEX IF NOT EXISTS idx_item_types_actor_id ON item_types(actor_id);
|
||||
@@ -106,6 +121,9 @@ CREATE INDEX IF NOT EXISTS idx_l2_shares_content_hash ON l2_shares(content_hash)
|
||||
CREATE INDEX IF NOT EXISTS idx_l2_shares_actor_id ON l2_shares(actor_id);
|
||||
CREATE INDEX IF NOT EXISTS idx_run_cache_output ON run_cache(output_hash);
|
||||
CREATE INDEX IF NOT EXISTS idx_storage_backends_actor ON storage_backends(actor_id);
|
||||
CREATE INDEX IF NOT EXISTS idx_storage_backends_type ON storage_backends(provider_type);
|
||||
CREATE INDEX IF NOT EXISTS idx_storage_pins_hash ON storage_pins(content_hash);
|
||||
CREATE INDEX IF NOT EXISTS idx_storage_pins_storage ON storage_pins(storage_id);
|
||||
"""
|
||||
|
||||
|
||||
@@ -1100,3 +1118,202 @@ async def get_run_by_output(output_hash: str) -> Optional[dict]:
|
||||
"created_at": row["created_at"].isoformat() if row["created_at"] else None,
|
||||
}
|
||||
return None
|
||||
|
||||
|
||||
# ============ Storage Backends ============
|
||||
|
||||
async def get_user_storage(actor_id: str) -> List[dict]:
|
||||
"""Get all storage backends for a user."""
|
||||
async with pool.acquire() as conn:
|
||||
rows = await conn.fetch(
|
||||
"""SELECT id, actor_id, provider_type, provider_name, description, config,
|
||||
capacity_gb, used_bytes, is_active, created_at, synced_at
|
||||
FROM storage_backends WHERE actor_id = $1
|
||||
ORDER BY provider_type, created_at""",
|
||||
actor_id
|
||||
)
|
||||
return [dict(row) for row in rows]
|
||||
|
||||
|
||||
async def get_user_storage_by_type(actor_id: str, provider_type: str) -> List[dict]:
|
||||
"""Get storage backends of a specific type for a user."""
|
||||
async with pool.acquire() as conn:
|
||||
rows = await conn.fetch(
|
||||
"""SELECT id, actor_id, provider_type, provider_name, description, config,
|
||||
capacity_gb, used_bytes, is_active, created_at, synced_at
|
||||
FROM storage_backends WHERE actor_id = $1 AND provider_type = $2
|
||||
ORDER BY created_at""",
|
||||
actor_id, provider_type
|
||||
)
|
||||
return [dict(row) for row in rows]
|
||||
|
||||
|
||||
async def get_storage_by_id(storage_id: int) -> Optional[dict]:
|
||||
"""Get a storage backend by ID."""
|
||||
async with pool.acquire() as conn:
|
||||
row = await conn.fetchrow(
|
||||
"""SELECT id, actor_id, provider_type, provider_name, description, config,
|
||||
capacity_gb, used_bytes, is_active, created_at, synced_at
|
||||
FROM storage_backends WHERE id = $1""",
|
||||
storage_id
|
||||
)
|
||||
return dict(row) if row else None
|
||||
|
||||
|
||||
async def add_user_storage(
|
||||
actor_id: str,
|
||||
provider_type: str,
|
||||
provider_name: str,
|
||||
config: dict,
|
||||
capacity_gb: int,
|
||||
description: Optional[str] = None
|
||||
) -> Optional[int]:
|
||||
"""Add a storage backend for a user. Returns storage ID."""
|
||||
async with pool.acquire() as conn:
|
||||
try:
|
||||
row = await conn.fetchrow(
|
||||
"""INSERT INTO storage_backends (actor_id, provider_type, provider_name, description, config, capacity_gb)
|
||||
VALUES ($1, $2, $3, $4, $5, $6)
|
||||
RETURNING id""",
|
||||
actor_id, provider_type, provider_name, description, _json.dumps(config), capacity_gb
|
||||
)
|
||||
return row["id"] if row else None
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
|
||||
async def update_user_storage(
|
||||
storage_id: int,
|
||||
provider_name: Optional[str] = None,
|
||||
description: Optional[str] = None,
|
||||
config: Optional[dict] = None,
|
||||
capacity_gb: Optional[int] = None,
|
||||
is_active: Optional[bool] = None
|
||||
) -> bool:
|
||||
"""Update a storage backend."""
|
||||
updates = []
|
||||
params = []
|
||||
param_num = 1
|
||||
|
||||
if provider_name is not None:
|
||||
updates.append(f"provider_name = ${param_num}")
|
||||
params.append(provider_name)
|
||||
param_num += 1
|
||||
if description is not None:
|
||||
updates.append(f"description = ${param_num}")
|
||||
params.append(description)
|
||||
param_num += 1
|
||||
if config is not None:
|
||||
updates.append(f"config = ${param_num}")
|
||||
params.append(_json.dumps(config))
|
||||
param_num += 1
|
||||
if capacity_gb is not None:
|
||||
updates.append(f"capacity_gb = ${param_num}")
|
||||
params.append(capacity_gb)
|
||||
param_num += 1
|
||||
if is_active is not None:
|
||||
updates.append(f"is_active = ${param_num}")
|
||||
params.append(is_active)
|
||||
param_num += 1
|
||||
|
||||
if not updates:
|
||||
return False
|
||||
|
||||
updates.append("synced_at = NOW()")
|
||||
params.append(storage_id)
|
||||
|
||||
async with pool.acquire() as conn:
|
||||
result = await conn.execute(
|
||||
f"UPDATE storage_backends SET {', '.join(updates)} WHERE id = ${param_num}",
|
||||
*params
|
||||
)
|
||||
return "UPDATE 1" in result
|
||||
|
||||
|
||||
async def remove_user_storage(storage_id: int) -> bool:
|
||||
"""Remove a storage backend. Cascades to storage_pins."""
|
||||
async with pool.acquire() as conn:
|
||||
result = await conn.execute(
|
||||
"DELETE FROM storage_backends WHERE id = $1",
|
||||
storage_id
|
||||
)
|
||||
return "DELETE 1" in result
|
||||
|
||||
|
||||
async def get_storage_usage(storage_id: int) -> dict:
|
||||
"""Get storage usage stats."""
|
||||
async with pool.acquire() as conn:
|
||||
row = await conn.fetchrow(
|
||||
"""SELECT
|
||||
COUNT(*) as pin_count,
|
||||
COALESCE(SUM(size_bytes), 0) as used_bytes
|
||||
FROM storage_pins WHERE storage_id = $1""",
|
||||
storage_id
|
||||
)
|
||||
return {"pin_count": row["pin_count"], "used_bytes": row["used_bytes"]}
|
||||
|
||||
|
||||
async def get_all_active_storage() -> List[dict]:
|
||||
"""Get all active storage backends (for distributed pinning)."""
|
||||
async with pool.acquire() as conn:
|
||||
rows = await conn.fetch(
|
||||
"""SELECT sb.id, sb.actor_id, sb.provider_type, sb.provider_name, sb.description,
|
||||
sb.config, sb.capacity_gb, sb.is_active, sb.created_at, sb.synced_at,
|
||||
COALESCE(SUM(sp.size_bytes), 0) as used_bytes,
|
||||
COUNT(sp.id) as pin_count
|
||||
FROM storage_backends sb
|
||||
LEFT JOIN storage_pins sp ON sb.id = sp.storage_id
|
||||
WHERE sb.is_active = true
|
||||
GROUP BY sb.id
|
||||
ORDER BY sb.provider_type, sb.created_at"""
|
||||
)
|
||||
return [dict(row) for row in rows]
|
||||
|
||||
|
||||
async def add_storage_pin(
|
||||
content_hash: str,
|
||||
storage_id: int,
|
||||
ipfs_cid: Optional[str],
|
||||
pin_type: str,
|
||||
size_bytes: int
|
||||
) -> Optional[int]:
|
||||
"""Add a pin record. Returns pin ID."""
|
||||
async with pool.acquire() as conn:
|
||||
try:
|
||||
row = await conn.fetchrow(
|
||||
"""INSERT INTO storage_pins (content_hash, storage_id, ipfs_cid, pin_type, size_bytes)
|
||||
VALUES ($1, $2, $3, $4, $5)
|
||||
ON CONFLICT (content_hash, storage_id) DO UPDATE SET
|
||||
ipfs_cid = EXCLUDED.ipfs_cid,
|
||||
pin_type = EXCLUDED.pin_type,
|
||||
size_bytes = EXCLUDED.size_bytes,
|
||||
pinned_at = NOW()
|
||||
RETURNING id""",
|
||||
content_hash, storage_id, ipfs_cid, pin_type, size_bytes
|
||||
)
|
||||
return row["id"] if row else None
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
|
||||
async def remove_storage_pin(content_hash: str, storage_id: int) -> bool:
|
||||
"""Remove a pin record."""
|
||||
async with pool.acquire() as conn:
|
||||
result = await conn.execute(
|
||||
"DELETE FROM storage_pins WHERE content_hash = $1 AND storage_id = $2",
|
||||
content_hash, storage_id
|
||||
)
|
||||
return "DELETE 1" in result
|
||||
|
||||
|
||||
async def get_pins_for_content(content_hash: str) -> List[dict]:
|
||||
"""Get all storage locations where content is pinned."""
|
||||
async with pool.acquire() as conn:
|
||||
rows = await conn.fetch(
|
||||
"""SELECT sp.*, sb.provider_type, sb.provider_name, sb.actor_id
|
||||
FROM storage_pins sp
|
||||
JOIN storage_backends sb ON sp.storage_id = sb.id
|
||||
WHERE sp.content_hash = $1""",
|
||||
content_hash
|
||||
)
|
||||
return [dict(row) for row in rows]
|
||||
|
||||
Reference in New Issue
Block a user