Migrate server.py from JSON metadata to PostgreSQL database

- Replace all load_cache_meta/save_cache_meta calls with database functions
- Update get_user_cache_hashes to be async, querying both the DB and legacy JSON
- Replace get_user_from_cookie with get_user_context_from_cookie throughout
- Update all endpoints to use UserContext (ctx) instead of plain username
- Update render_page calls to use ctx.actor_id
- Add high-level database helper functions:
  - save_item_metadata, load_item_metadata, update_item_metadata
  - save_l2_share, get_user_items, count_user_items
- Keep legacy JSON functions for backwards compatibility during migration

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
gilesb
2026-01-08 16:45:22 +00:00
parent 66beda70c4
commit c917a0539d
2 changed files with 835 additions and 235 deletions

View File

@@ -37,10 +37,19 @@ CREATE TABLE IF NOT EXISTS item_types (
source_url TEXT,
source_note TEXT,
pinned BOOLEAN DEFAULT FALSE,
filename VARCHAR(255),
metadata JSONB DEFAULT '{}',
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
UNIQUE(content_hash, actor_id, type, path)
);
-- Add columns if they don't exist (for existing databases)
-- ADD COLUMN IF NOT EXISTS (PostgreSQL 9.6+) is already idempotent; the
-- exception handler additionally swallows any other failure (e.g. missing
-- privileges) so that startup migration never aborts the schema script.
DO $$ BEGIN
    ALTER TABLE item_types ADD COLUMN IF NOT EXISTS filename VARCHAR(255);
    ALTER TABLE item_types ADD COLUMN IF NOT EXISTS metadata JSONB DEFAULT '{}';
EXCEPTION WHEN others THEN NULL;
END $$;
-- Pin reasons: one-to-many from item_types
CREATE TABLE IF NOT EXISTS pin_reasons (
id SERIAL PRIMARY KEY,
@@ -521,3 +530,453 @@ async def cleanup_orphaned_cache_item(content_hash: str) -> bool:
content_hash
)
return result == "DELETE 1"
# ============ High-Level Metadata Functions ============
# These provide a compatible interface to the old JSON-based save_cache_meta/load_cache_meta
import json as _json
async def save_item_metadata(
    content_hash: str,
    actor_id: str,
    item_type: str = "media",
    filename: Optional[str] = None,
    description: Optional[str] = None,
    source_type: Optional[str] = None,
    source_url: Optional[str] = None,
    source_note: Optional[str] = None,
    pinned: bool = False,
    pin_reason: Optional[str] = None,
    tags: Optional[List[str]] = None,
    folder: Optional[str] = None,
    collections: Optional[List[str]] = None,
    **extra_metadata
) -> dict:
    """
    Save or update item metadata in the database.

    Ensures the parent ``cache_items`` row exists, upserts the
    ``item_types`` row for (content_hash, actor_id, item_type), and — when
    both ``pinned`` and ``pin_reason`` are given — records the pin reason.

    ``tags``, ``folder``, ``collections`` and any ``extra_metadata`` kwargs
    are packed into the JSONB ``metadata`` column; on conflict the stored
    JSONB is merged with the new one via ``||`` (new keys win).

    Returns a dict with the item metadata (compatible with old JSON format).
    """
    # Build metadata JSONB for extra fields
    metadata = {}
    if tags:
        metadata["tags"] = tags
    if folder:
        metadata["folder"] = folder
    if collections:
        metadata["collections"] = collections
    metadata.update(extra_metadata)
    async with pool.acquire() as conn:
        # Ensure cache_item exists
        await conn.execute(
            "INSERT INTO cache_items (content_hash) VALUES ($1) ON CONFLICT DO NOTHING",
            content_hash
        )
        # Upsert item_type. COALESCE keeps the existing descriptive fields
        # when the caller passes None; ``pinned`` is always overwritten.
        # NOTE(review): ``path`` is not supplied, so it is inserted as NULL.
        # With a plain UNIQUE(content_hash, actor_id, type, path) constraint
        # PostgreSQL treats NULLs as distinct, so ON CONFLICT may never fire
        # for NULL-path rows and repeated saves could insert duplicates —
        # confirm the unique index uses NULLS NOT DISTINCT (PG15+) or a
        # COALESCE expression index.
        row = await conn.fetchrow(
            """
            INSERT INTO item_types (content_hash, actor_id, type, description, source_type, source_url, source_note, pinned, filename, metadata)
            VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
            ON CONFLICT (content_hash, actor_id, type, path) DO UPDATE SET
                description = COALESCE(EXCLUDED.description, item_types.description),
                source_type = COALESCE(EXCLUDED.source_type, item_types.source_type),
                source_url = COALESCE(EXCLUDED.source_url, item_types.source_url),
                source_note = COALESCE(EXCLUDED.source_note, item_types.source_note),
                pinned = EXCLUDED.pinned,
                filename = COALESCE(EXCLUDED.filename, item_types.filename),
                metadata = item_types.metadata || EXCLUDED.metadata
            RETURNING id, content_hash, actor_id, type, path, description, source_type, source_url, source_note, pinned, filename, metadata, created_at
            """,
            content_hash, actor_id, item_type, description, source_type, source_url, source_note, pinned, filename, _json.dumps(metadata)
        )
        item_type_id = row["id"]
        # Handle pinning
        if pinned and pin_reason:
            # Add pin reason if not exists
            await conn.execute(
                """
                INSERT INTO pin_reasons (item_type_id, reason)
                VALUES ($1, $2)
                ON CONFLICT DO NOTHING
                """,
                item_type_id, pin_reason
            )
        # Build response dict (compatible with old format)
        result = {
            "uploader": actor_id,
            "uploaded_at": row["created_at"].isoformat() if row["created_at"] else None,
            "filename": row["filename"],
            "type": row["type"],
            "description": row["description"],
            "pinned": row["pinned"],
        }
        # Add origin if present
        if row["source_type"] or row["source_url"] or row["source_note"]:
            result["origin"] = {
                "type": row["source_type"],
                "url": row["source_url"],
                "note": row["source_note"]
            }
        # Add metadata fields.
        # asyncpg can hand back JSONB as a str (default) or a dict (if a
        # codec is registered) — handle both; presumably the pool config
        # varies between deployments. TODO confirm.
        if row["metadata"]:
            meta = row["metadata"] if isinstance(row["metadata"], dict) else _json.loads(row["metadata"])
            if meta.get("tags"):
                result["tags"] = meta["tags"]
            if meta.get("folder"):
                result["folder"] = meta["folder"]
            if meta.get("collections"):
                result["collections"] = meta["collections"]
        # Get pin reasons: only the first stored reason is surfaced, matching
        # the old single-value JSON format.
        if row["pinned"]:
            reasons = await conn.fetch(
                "SELECT reason FROM pin_reasons WHERE item_type_id = $1",
                item_type_id
            )
            if reasons:
                result["pin_reason"] = reasons[0]["reason"]
        return result
async def load_item_metadata(content_hash: str, actor_id: Optional[str] = None) -> dict:
    """
    Load item metadata from the database.

    If actor_id is provided, returns metadata for that user's view of the item.
    Otherwise, returns combined metadata from all users (for backwards compat).
    Returns a dict compatible with old JSON format.

    Result shape:
      - ``{}`` when the content hash is unknown;
      - only ``uploaded_at`` when the cache item has no item_types rows;
      - otherwise uploader/filename/type/description come from the OLDEST
        item_types row (ORDER BY created_at), ``pinned`` is True if ANY row
        is pinned, and ``l2_shares``/``published`` reflect l2_shares rows.
    """
    async with pool.acquire() as conn:
        # Get cache item
        cache_item = await conn.fetchrow(
            "SELECT content_hash, ipfs_cid, created_at FROM cache_items WHERE content_hash = $1",
            content_hash
        )
        if not cache_item:
            return {}
        # Get item types (scoped to one actor when given)
        if actor_id:
            item_types = await conn.fetch(
                """
                SELECT id, actor_id, type, path, description, source_type, source_url, source_note, pinned, filename, metadata, created_at
                FROM item_types WHERE content_hash = $1 AND actor_id = $2
                ORDER BY created_at
                """,
                content_hash, actor_id
            )
        else:
            item_types = await conn.fetch(
                """
                SELECT id, actor_id, type, path, description, source_type, source_url, source_note, pinned, filename, metadata, created_at
                FROM item_types WHERE content_hash = $1
                ORDER BY created_at
                """,
                content_hash
            )
        if not item_types:
            return {"uploaded_at": cache_item["created_at"].isoformat() if cache_item["created_at"] else None}
        # Use first item type as primary (for backwards compat)
        primary = item_types[0]
        result = {
            "uploader": primary["actor_id"],
            "uploaded_at": primary["created_at"].isoformat() if primary["created_at"] else None,
            "filename": primary["filename"],
            "type": primary["type"],
            "description": primary["description"],
            "pinned": any(it["pinned"] for it in item_types),
        }
        # Add origin if present
        if primary["source_type"] or primary["source_url"] or primary["source_note"]:
            result["origin"] = {
                "type": primary["source_type"],
                "url": primary["source_url"],
                "note": primary["source_note"]
            }
        # Add metadata fields — only from the primary row; other actors'
        # tags/folders are intentionally not merged.
        if primary["metadata"]:
            # JSONB may arrive as str or dict depending on codec registration.
            meta = primary["metadata"] if isinstance(primary["metadata"], dict) else _json.loads(primary["metadata"])
            if meta.get("tags"):
                result["tags"] = meta["tags"]
            if meta.get("folder"):
                result["folder"] = meta["folder"]
            if meta.get("collections"):
                result["collections"] = meta["collections"]
        # Get pin reasons for pinned items; stop at the first pinned row
        # that actually has a stored reason (single value, old format).
        for it in item_types:
            if it["pinned"]:
                reasons = await conn.fetch(
                    "SELECT reason FROM pin_reasons WHERE item_type_id = $1",
                    it["id"]
                )
                if reasons:
                    result["pin_reason"] = reasons[0]["reason"]
                    break
        # Get L2 shares
        if actor_id:
            shares = await conn.fetch(
                """
                SELECT l2_server, asset_name, content_type, published_at, last_synced_at
                FROM l2_shares WHERE content_hash = $1 AND actor_id = $2
                """,
                content_hash, actor_id
            )
        else:
            shares = await conn.fetch(
                """
                SELECT l2_server, asset_name, content_type, published_at, last_synced_at
                FROM l2_shares WHERE content_hash = $1
                """,
                content_hash
            )
        if shares:
            result["l2_shares"] = [
                {
                    "l2_server": s["l2_server"],
                    "asset_name": s["asset_name"],
                    "content_type": s["content_type"],
                    "published_at": s["published_at"].isoformat() if s["published_at"] else None,
                    "last_synced_at": s["last_synced_at"].isoformat() if s["last_synced_at"] else None,
                }
                for s in shares
            ]
            # For backwards compat, also set "published" if shared.
            # Only the first share is surfaced here; callers needing all
            # shares should read "l2_shares".
            result["published"] = {
                "to_l2": True,
                "asset_name": shares[0]["asset_name"],
                "l2_server": shares[0]["l2_server"],
            }
        return result
async def update_item_metadata(
    content_hash: str,
    actor_id: str,
    item_type: str = "media",
    **updates
) -> dict:
    """
    Update specific fields of item metadata.

    Known keys (description, origin/source_*, pinned, pin_reason, filename,
    tags, folder, collections) are popped out of ``updates`` and mapped to
    columns; whatever remains in ``updates`` is merged into the JSONB
    ``metadata`` column. If no matching item_types row exists yet, falls
    back to save_item_metadata to create one.

    Returns updated metadata dict (via load_item_metadata).
    """
    # Extract known fields from updates
    description = updates.pop("description", None)
    source_type = updates.pop("source_type", None)
    source_url = updates.pop("source_url", None)
    source_note = updates.pop("source_note", None)
    # Handle origin dict format — an "origin" dict overrides the flat
    # source_* keys when both are present.
    origin = updates.pop("origin", None)
    if origin:
        source_type = origin.get("type", source_type)
        source_url = origin.get("url", source_url)
        source_note = origin.get("note", source_note)
    pinned = updates.pop("pinned", None)
    pin_reason = updates.pop("pin_reason", None)
    filename = updates.pop("filename", None)
    tags = updates.pop("tags", None)
    folder = updates.pop("folder", None)
    collections = updates.pop("collections", None)
    async with pool.acquire() as conn:
        # Get existing item_type (only the NULL-path row is updatable here)
        existing = await conn.fetchrow(
            """
            SELECT id, metadata FROM item_types
            WHERE content_hash = $1 AND actor_id = $2 AND type = $3 AND path IS NULL
            """,
            content_hash, actor_id, item_type
        )
        if not existing:
            # Create new entry
            return await save_item_metadata(
                content_hash, actor_id, item_type,
                filename=filename, description=description,
                source_type=source_type, source_url=source_url, source_note=source_note,
                pinned=pinned or False, pin_reason=pin_reason,
                tags=tags, folder=folder, collections=collections,
                **updates
            )
        # Build update query dynamically: only the fields the caller
        # actually passed become SET clauses. $1–$3 are the WHERE params.
        set_parts = []
        params = [content_hash, actor_id, item_type]
        param_idx = 4
        if description is not None:
            set_parts.append(f"description = ${param_idx}")
            params.append(description)
            param_idx += 1
        if source_type is not None:
            set_parts.append(f"source_type = ${param_idx}")
            params.append(source_type)
            param_idx += 1
        if source_url is not None:
            set_parts.append(f"source_url = ${param_idx}")
            params.append(source_url)
            param_idx += 1
        if source_note is not None:
            set_parts.append(f"source_note = ${param_idx}")
            params.append(source_note)
            param_idx += 1
        if pinned is not None:
            set_parts.append(f"pinned = ${param_idx}")
            params.append(pinned)
            param_idx += 1
        if filename is not None:
            set_parts.append(f"filename = ${param_idx}")
            params.append(filename)
            param_idx += 1
        # Handle metadata updates: merge in Python, then write the whole
        # JSONB back (unlike save_item_metadata, which merges in SQL).
        current_metadata = existing["metadata"] if isinstance(existing["metadata"], dict) else (_json.loads(existing["metadata"]) if existing["metadata"] else {})
        if tags is not None:
            current_metadata["tags"] = tags
        if folder is not None:
            current_metadata["folder"] = folder
        if collections is not None:
            current_metadata["collections"] = collections
        current_metadata.update(updates)
        if current_metadata:
            set_parts.append(f"metadata = ${param_idx}")
            params.append(_json.dumps(current_metadata))
            param_idx += 1
        if set_parts:
            query = f"""
                UPDATE item_types SET {', '.join(set_parts)}
                WHERE content_hash = $1 AND actor_id = $2 AND type = $3 AND path IS NULL
            """
            await conn.execute(query, *params)
        # Handle pin reason.
        # NOTE(review): setting pinned=False does not delete existing
        # pin_reasons rows — presumably intentional (reasons survive an
        # unpin), but confirm against the product requirements.
        if pinned and pin_reason:
            await conn.execute(
                """
                INSERT INTO pin_reasons (item_type_id, reason)
                VALUES ($1, $2)
                ON CONFLICT DO NOTHING
                """,
                existing["id"], pin_reason
            )
    return await load_item_metadata(content_hash, actor_id)
async def save_l2_share(
    content_hash: str,
    actor_id: str,
    l2_server: str,
    asset_name: str,
    content_type: str = "media"
) -> dict:
    """Record (or refresh) an L2 share and return its details.

    Upserts into ``l2_shares``; an existing share for the same
    (content_hash, actor_id, l2_server, content_type) gets its asset name
    replaced and its last_synced_at bumped to NOW().
    """
    upsert_sql = """
        INSERT INTO l2_shares (content_hash, actor_id, l2_server, asset_name, content_type, last_synced_at)
        VALUES ($1, $2, $3, $4, $5, NOW())
        ON CONFLICT (content_hash, actor_id, l2_server, content_type) DO UPDATE SET
            asset_name = EXCLUDED.asset_name,
            last_synced_at = NOW()
        RETURNING l2_server, asset_name, content_type, published_at, last_synced_at
        """

    def _iso(ts):
        # Timestamps are returned as ISO-8601 strings (None when unset).
        return ts.isoformat() if ts else None

    async with pool.acquire() as conn:
        share = await conn.fetchrow(
            upsert_sql,
            content_hash, actor_id, l2_server, asset_name, content_type
        )
        return {
            "l2_server": share["l2_server"],
            "asset_name": share["asset_name"],
            "content_type": share["content_type"],
            "published_at": _iso(share["published_at"]),
            "last_synced_at": _iso(share["last_synced_at"]),
        }
async def get_user_items(actor_id: str, item_type: Optional[str] = None, limit: int = 100, offset: int = 0) -> List[dict]:
    """Get all items for a user, optionally filtered by type.

    Args:
        actor_id: Owner whose items to list.
        item_type: If truthy, restrict results to this item type.
        limit: Maximum number of rows to return (newest first).
        offset: Pagination offset.

    Returns:
        A list of plain dicts (content_hash, type, description, filename,
        pinned, created_at as ISO-8601 string or None, ipfs_cid).
    """
    # Build the query once instead of maintaining two near-identical SQL
    # strings that differed only in one predicate and parameter numbering.
    params: list = [actor_id]
    type_filter = ""
    if item_type:
        params.append(item_type)
        type_filter = " AND it.type = $2"
    params.extend([limit, offset])
    # LIMIT/OFFSET are always the last two positional parameters.
    query = f"""
        SELECT it.content_hash, it.type, it.description, it.filename, it.pinned, it.created_at,
               ci.ipfs_cid
        FROM item_types it
        JOIN cache_items ci ON it.content_hash = ci.content_hash
        WHERE it.actor_id = $1{type_filter}
        ORDER BY it.created_at DESC
        LIMIT ${len(params) - 1} OFFSET ${len(params)}
    """
    async with pool.acquire() as conn:
        rows = await conn.fetch(query, *params)
    return [
        {
            "content_hash": r["content_hash"],
            "type": r["type"],
            "description": r["description"],
            "filename": r["filename"],
            "pinned": r["pinned"],
            "created_at": r["created_at"].isoformat() if r["created_at"] else None,
            "ipfs_cid": r["ipfs_cid"],
        }
        for r in rows
    ]
async def count_user_items(actor_id: str, item_type: Optional[str] = None) -> int:
    """Return how many item_types rows belong to *actor_id*.

    When *item_type* is truthy, only rows of that type are counted.
    """
    async with pool.acquire() as conn:
        # Guard clause: no type filter requested (None or empty string).
        if not item_type:
            return await conn.fetchval(
                "SELECT COUNT(*) FROM item_types WHERE actor_id = $1",
                actor_id
            )
        return await conn.fetchval(
            "SELECT COUNT(*) FROM item_types WHERE actor_id = $1 AND type = $2",
            actor_id, item_type
        )