Add actor_id (user) to item_types and l2_shares tables

Multi-user support for cache items:
- item_types now includes actor_id (@user@server format)
- l2_shares now includes actor_id
- Same cache item can be owned by multiple users with different types
- Deleting severs user's connection, not the actual file
- Cache files only removed when no users reference them
- Added has_remaining_references() and cleanup_orphaned_cache_item()
- Updated all CRUD functions to include actor_id parameter

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
gilesb
2026-01-08 15:26:09 +00:00
parent c0a98dd0ff
commit bc07a101d1

View File

@@ -17,16 +17,19 @@ pool: Optional[asyncpg.Pool] = None
SCHEMA_SQL = """
-- Core cache: just content hash and IPFS CID
-- Physical file storage - shared by all users
CREATE TABLE IF NOT EXISTS cache_items (
content_hash VARCHAR(64) PRIMARY KEY,
ipfs_cid VARCHAR(128),
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);
-- Item types: metadata lives here (same item can be recipe AND media)
-- Item types: per-user metadata (same item can be recipe AND media, per user)
-- actor_id format: @username@server (ActivityPub style)
CREATE TABLE IF NOT EXISTS item_types (
id SERIAL PRIMARY KEY,
content_hash VARCHAR(64) REFERENCES cache_items(content_hash) ON DELETE CASCADE,
actor_id VARCHAR(255) NOT NULL,
type VARCHAR(50) NOT NULL,
path VARCHAR(255),
description TEXT,
@@ -35,7 +38,7 @@ CREATE TABLE IF NOT EXISTS item_types (
source_note TEXT,
pinned BOOLEAN DEFAULT FALSE,
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
UNIQUE(content_hash, type, path)
UNIQUE(content_hash, actor_id, type, path)
);
-- Pin reasons: one-to-many from item_types
@@ -46,24 +49,27 @@ CREATE TABLE IF NOT EXISTS pin_reasons (
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);
-- L2 shares: includes content_type for role when shared
-- L2 shares: per-user shares (includes content_type for role when shared)
CREATE TABLE IF NOT EXISTS l2_shares (
id SERIAL PRIMARY KEY,
content_hash VARCHAR(64) REFERENCES cache_items(content_hash) ON DELETE CASCADE,
actor_id VARCHAR(255) NOT NULL,
l2_server VARCHAR(255) NOT NULL,
asset_name VARCHAR(255) NOT NULL,
content_type VARCHAR(50) NOT NULL,
published_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
last_synced_at TIMESTAMP WITH TIME ZONE,
UNIQUE(content_hash, l2_server, content_type)
UNIQUE(content_hash, actor_id, l2_server, content_type)
);
-- Indexes
CREATE INDEX IF NOT EXISTS idx_item_types_content_hash ON item_types(content_hash);
CREATE INDEX IF NOT EXISTS idx_item_types_actor_id ON item_types(actor_id);
CREATE INDEX IF NOT EXISTS idx_item_types_type ON item_types(type);
CREATE INDEX IF NOT EXISTS idx_item_types_path ON item_types(path);
CREATE INDEX IF NOT EXISTS idx_pin_reasons_item_type ON pin_reasons(item_type_id);
CREATE INDEX IF NOT EXISTS idx_l2_shares_content_hash ON l2_shares(content_hash);
CREATE INDEX IF NOT EXISTS idx_l2_shares_actor_id ON l2_shares(actor_id);
"""
@@ -149,6 +155,7 @@ async def list_cache_items(limit: int = 100, offset: int = 0) -> List[dict]:
async def add_item_type(
content_hash: str,
actor_id: str,
item_type: str,
path: Optional[str] = None,
description: Optional[str] = None,
@@ -156,7 +163,7 @@ async def add_item_type(
source_url: Optional[str] = None,
source_note: Optional[str] = None,
) -> dict:
"""Add a type to a cache item. Creates cache_item if needed."""
"""Add a type to a cache item for a user. Creates cache_item if needed."""
async with pool.acquire() as conn:
# Ensure cache_item exists
await conn.execute(
@@ -166,55 +173,66 @@ async def add_item_type(
# Insert or update item_type
row = await conn.fetchrow(
"""
INSERT INTO item_types (content_hash, type, path, description, source_type, source_url, source_note)
VALUES ($1, $2, $3, $4, $5, $6, $7)
ON CONFLICT (content_hash, type, path) DO UPDATE SET
description = COALESCE($4, item_types.description),
source_type = COALESCE($5, item_types.source_type),
source_url = COALESCE($6, item_types.source_url),
source_note = COALESCE($7, item_types.source_note)
RETURNING id, content_hash, type, path, description, source_type, source_url, source_note, pinned, created_at
INSERT INTO item_types (content_hash, actor_id, type, path, description, source_type, source_url, source_note)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
ON CONFLICT (content_hash, actor_id, type, path) DO UPDATE SET
description = COALESCE($5, item_types.description),
source_type = COALESCE($6, item_types.source_type),
source_url = COALESCE($7, item_types.source_url),
source_note = COALESCE($8, item_types.source_note)
RETURNING id, content_hash, actor_id, type, path, description, source_type, source_url, source_note, pinned, created_at
""",
content_hash, item_type, path, description, source_type, source_url, source_note
content_hash, actor_id, item_type, path, description, source_type, source_url, source_note
)
return dict(row)
async def get_item_types(content_hash: str) -> List[dict]:
"""Get all types for a cache item."""
async def get_item_types(content_hash: str, actor_id: Optional[str] = None) -> List[dict]:
"""Get types for a cache item, optionally filtered by user."""
async with pool.acquire() as conn:
rows = await conn.fetch(
"""
SELECT id, content_hash, type, path, description, source_type, source_url, source_note, pinned, created_at
FROM item_types
WHERE content_hash = $1
ORDER BY created_at
""",
content_hash
)
if actor_id:
rows = await conn.fetch(
"""
SELECT id, content_hash, actor_id, type, path, description, source_type, source_url, source_note, pinned, created_at
FROM item_types
WHERE content_hash = $1 AND actor_id = $2
ORDER BY created_at
""",
content_hash, actor_id
)
else:
rows = await conn.fetch(
"""
SELECT id, content_hash, actor_id, type, path, description, source_type, source_url, source_note, pinned, created_at
FROM item_types
WHERE content_hash = $1
ORDER BY created_at
""",
content_hash
)
return [dict(row) for row in rows]
async def get_item_type(content_hash: str, item_type: str, path: Optional[str] = None) -> Optional[dict]:
"""Get a specific type for a cache item."""
async def get_item_type(content_hash: str, actor_id: str, item_type: str, path: Optional[str] = None) -> Optional[dict]:
"""Get a specific type for a cache item and user."""
async with pool.acquire() as conn:
if path is None:
row = await conn.fetchrow(
"""
SELECT id, content_hash, type, path, description, source_type, source_url, source_note, pinned, created_at
SELECT id, content_hash, actor_id, type, path, description, source_type, source_url, source_note, pinned, created_at
FROM item_types
WHERE content_hash = $1 AND type = $2 AND path IS NULL
WHERE content_hash = $1 AND actor_id = $2 AND type = $3 AND path IS NULL
""",
content_hash, item_type
content_hash, actor_id, item_type
)
else:
row = await conn.fetchrow(
"""
SELECT id, content_hash, type, path, description, source_type, source_url, source_note, pinned, created_at
SELECT id, content_hash, actor_id, type, path, description, source_type, source_url, source_note, pinned, created_at
FROM item_types
WHERE content_hash = $1 AND type = $2 AND path = $3
WHERE content_hash = $1 AND actor_id = $2 AND type = $3 AND path = $4
""",
content_hash, item_type, path
content_hash, actor_id, item_type, path
)
return dict(row) if row else None
@@ -242,55 +260,83 @@ async def update_item_type(
return result == "UPDATE 1"
async def delete_item_type(content_hash: str, item_type: str, path: Optional[str] = None) -> bool:
"""Delete a specific type from a cache item."""
async def delete_item_type(content_hash: str, actor_id: str, item_type: str, path: Optional[str] = None) -> bool:
"""Delete a specific type from a cache item for a user."""
async with pool.acquire() as conn:
if path is None:
result = await conn.execute(
"DELETE FROM item_types WHERE content_hash = $1 AND type = $2 AND path IS NULL",
content_hash, item_type
"DELETE FROM item_types WHERE content_hash = $1 AND actor_id = $2 AND type = $3 AND path IS NULL",
content_hash, actor_id, item_type
)
else:
result = await conn.execute(
"DELETE FROM item_types WHERE content_hash = $1 AND type = $2 AND path = $3",
content_hash, item_type, path
"DELETE FROM item_types WHERE content_hash = $1 AND actor_id = $2 AND type = $3 AND path = $4",
content_hash, actor_id, item_type, path
)
return result == "DELETE 1"
async def list_items_by_type(item_type: str, limit: int = 100, offset: int = 0) -> List[dict]:
"""List all items of a specific type."""
async def list_items_by_type(item_type: str, actor_id: Optional[str] = None, limit: int = 100, offset: int = 0) -> List[dict]:
"""List items of a specific type, optionally filtered by user."""
async with pool.acquire() as conn:
rows = await conn.fetch(
"""
SELECT it.id, it.content_hash, it.type, it.path, it.description,
it.source_type, it.source_url, it.source_note, it.pinned, it.created_at,
ci.ipfs_cid
FROM item_types it
JOIN cache_items ci ON it.content_hash = ci.content_hash
WHERE it.type = $1
ORDER BY it.created_at DESC
LIMIT $2 OFFSET $3
""",
item_type, limit, offset
)
if actor_id:
rows = await conn.fetch(
"""
SELECT it.id, it.content_hash, it.actor_id, it.type, it.path, it.description,
it.source_type, it.source_url, it.source_note, it.pinned, it.created_at,
ci.ipfs_cid
FROM item_types it
JOIN cache_items ci ON it.content_hash = ci.content_hash
WHERE it.type = $1 AND it.actor_id = $2
ORDER BY it.created_at DESC
LIMIT $3 OFFSET $4
""",
item_type, actor_id, limit, offset
)
else:
rows = await conn.fetch(
"""
SELECT it.id, it.content_hash, it.actor_id, it.type, it.path, it.description,
it.source_type, it.source_url, it.source_note, it.pinned, it.created_at,
ci.ipfs_cid
FROM item_types it
JOIN cache_items ci ON it.content_hash = ci.content_hash
WHERE it.type = $1
ORDER BY it.created_at DESC
LIMIT $2 OFFSET $3
""",
item_type, limit, offset
)
return [dict(row) for row in rows]
async def get_item_by_path(item_type: str, path: str) -> Optional[dict]:
"""Get an item by its type and path (e.g., recipe:/effects/dog)."""
async def get_item_by_path(item_type: str, path: str, actor_id: Optional[str] = None) -> Optional[dict]:
"""Get an item by its type and path (e.g., recipe:/effects/dog), optionally for a specific user."""
async with pool.acquire() as conn:
row = await conn.fetchrow(
"""
SELECT it.id, it.content_hash, it.type, it.path, it.description,
it.source_type, it.source_url, it.source_note, it.pinned, it.created_at,
ci.ipfs_cid
FROM item_types it
JOIN cache_items ci ON it.content_hash = ci.content_hash
WHERE it.type = $1 AND it.path = $2
""",
item_type, path
)
if actor_id:
row = await conn.fetchrow(
"""
SELECT it.id, it.content_hash, it.actor_id, it.type, it.path, it.description,
it.source_type, it.source_url, it.source_note, it.pinned, it.created_at,
ci.ipfs_cid
FROM item_types it
JOIN cache_items ci ON it.content_hash = ci.content_hash
WHERE it.type = $1 AND it.path = $2 AND it.actor_id = $3
""",
item_type, path, actor_id
)
else:
row = await conn.fetchrow(
"""
SELECT it.id, it.content_hash, it.actor_id, it.type, it.path, it.description,
it.source_type, it.source_url, it.source_note, it.pinned, it.created_at,
ci.ipfs_cid
FROM item_types it
JOIN cache_items ci ON it.content_hash = ci.content_hash
WHERE it.type = $1 AND it.path = $2
""",
item_type, path
)
return dict(row) if row else None
@@ -385,46 +431,93 @@ async def is_item_pinned(content_hash: str, item_type: Optional[str] = None) ->
async def add_l2_share(
content_hash: str,
actor_id: str,
l2_server: str,
asset_name: str,
content_type: str,
) -> dict:
"""Add or update an L2 share."""
"""Add or update an L2 share for a user."""
async with pool.acquire() as conn:
row = await conn.fetchrow(
"""
INSERT INTO l2_shares (content_hash, l2_server, asset_name, content_type, last_synced_at)
VALUES ($1, $2, $3, $4, NOW())
ON CONFLICT (content_hash, l2_server, content_type) DO UPDATE SET
asset_name = $3,
INSERT INTO l2_shares (content_hash, actor_id, l2_server, asset_name, content_type, last_synced_at)
VALUES ($1, $2, $3, $4, $5, NOW())
ON CONFLICT (content_hash, actor_id, l2_server, content_type) DO UPDATE SET
asset_name = $4,
last_synced_at = NOW()
RETURNING id, content_hash, l2_server, asset_name, content_type, published_at, last_synced_at
RETURNING id, content_hash, actor_id, l2_server, asset_name, content_type, published_at, last_synced_at
""",
content_hash, l2_server, asset_name, content_type
content_hash, actor_id, l2_server, asset_name, content_type
)
return dict(row)
async def get_l2_shares(content_hash: str) -> List[dict]:
"""Get all L2 shares for a cache item."""
async def get_l2_shares(content_hash: str, actor_id: Optional[str] = None) -> List[dict]:
"""Get L2 shares for a cache item, optionally filtered by user."""
async with pool.acquire() as conn:
rows = await conn.fetch(
"""
SELECT id, content_hash, l2_server, asset_name, content_type, published_at, last_synced_at
FROM l2_shares
WHERE content_hash = $1
ORDER BY published_at
""",
content_hash
)
if actor_id:
rows = await conn.fetch(
"""
SELECT id, content_hash, actor_id, l2_server, asset_name, content_type, published_at, last_synced_at
FROM l2_shares
WHERE content_hash = $1 AND actor_id = $2
ORDER BY published_at
""",
content_hash, actor_id
)
else:
rows = await conn.fetch(
"""
SELECT id, content_hash, actor_id, l2_server, asset_name, content_type, published_at, last_synced_at
FROM l2_shares
WHERE content_hash = $1
ORDER BY published_at
""",
content_hash
)
return [dict(row) for row in rows]
async def delete_l2_share(content_hash: str, l2_server: str, content_type: str) -> bool:
"""Delete an L2 share."""
async def delete_l2_share(content_hash: str, actor_id: str, l2_server: str, content_type: str) -> bool:
"""Delete an L2 share for a user."""
async with pool.acquire() as conn:
result = await conn.execute(
"DELETE FROM l2_shares WHERE content_hash = $1 AND l2_server = $2 AND content_type = $3",
content_hash, l2_server, content_type
"DELETE FROM l2_shares WHERE content_hash = $1 AND actor_id = $2 AND l2_server = $3 AND content_type = $4",
content_hash, actor_id, l2_server, content_type
)
return result == "DELETE 1"
# ============ Cache Item Cleanup ============
async def has_remaining_references(content_hash: str) -> bool:
"""Check if a cache item has any remaining item_types or l2_shares."""
async with pool.acquire() as conn:
item_types_count = await conn.fetchval(
"SELECT COUNT(*) FROM item_types WHERE content_hash = $1",
content_hash
)
if item_types_count > 0:
return True
l2_shares_count = await conn.fetchval(
"SELECT COUNT(*) FROM l2_shares WHERE content_hash = $1",
content_hash
)
return l2_shares_count > 0
async def cleanup_orphaned_cache_item(content_hash: str) -> bool:
"""Delete a cache item if it has no remaining references. Returns True if deleted."""
async with pool.acquire() as conn:
# Only delete if no item_types or l2_shares reference it
result = await conn.execute(
"""
DELETE FROM cache_items
WHERE content_hash = $1
AND NOT EXISTS (SELECT 1 FROM item_types WHERE content_hash = $1)
AND NOT EXISTS (SELECT 1 FROM l2_shares WHERE content_hash = $1)
""",
content_hash
)
return result == "DELETE 1"