diff --git a/database.py b/database.py
index 5829d1e..b0b21e8 100644
--- a/database.py
+++ b/database.py
@@ -17,16 +17,19 @@ pool: Optional[asyncpg.Pool] = None
 
 SCHEMA_SQL = """
 -- Core cache: just content hash and IPFS CID
+-- Physical file storage - shared by all users
 CREATE TABLE IF NOT EXISTS cache_items (
     content_hash VARCHAR(64) PRIMARY KEY,
     ipfs_cid VARCHAR(128),
     created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
 );
 
--- Item types: metadata lives here (same item can be recipe AND media)
+-- Item types: per-user metadata (same item can be recipe AND media, per user)
+-- actor_id format: @username@server (ActivityPub style)
 CREATE TABLE IF NOT EXISTS item_types (
     id SERIAL PRIMARY KEY,
     content_hash VARCHAR(64) REFERENCES cache_items(content_hash) ON DELETE CASCADE,
+    actor_id VARCHAR(255) NOT NULL,
     type VARCHAR(50) NOT NULL,
     path VARCHAR(255),
     description TEXT,
@@ -35,7 +38,7 @@ CREATE TABLE IF NOT EXISTS item_types (
     source_note TEXT,
     pinned BOOLEAN DEFAULT FALSE,
     created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
-    UNIQUE(content_hash, type, path)
+    UNIQUE(content_hash, actor_id, type, path)
 );
 
 -- Pin reasons: one-to-many from item_types
@@ -46,24 +49,27 @@ CREATE TABLE IF NOT EXISTS pin_reasons (
     created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
 );
 
--- L2 shares: includes content_type for role when shared
+-- L2 shares: per-user shares (includes content_type for role when shared)
 CREATE TABLE IF NOT EXISTS l2_shares (
     id SERIAL PRIMARY KEY,
     content_hash VARCHAR(64) REFERENCES cache_items(content_hash) ON DELETE CASCADE,
+    actor_id VARCHAR(255) NOT NULL,
     l2_server VARCHAR(255) NOT NULL,
     asset_name VARCHAR(255) NOT NULL,
     content_type VARCHAR(50) NOT NULL,
     published_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
     last_synced_at TIMESTAMP WITH TIME ZONE,
-    UNIQUE(content_hash, l2_server, content_type)
+    UNIQUE(content_hash, actor_id, l2_server, content_type)
 );
 
 -- Indexes
 CREATE INDEX IF NOT EXISTS idx_item_types_content_hash ON item_types(content_hash);
+CREATE INDEX IF NOT EXISTS idx_item_types_actor_id ON item_types(actor_id);
 CREATE INDEX IF NOT EXISTS idx_item_types_type ON item_types(type);
 CREATE INDEX IF NOT EXISTS idx_item_types_path ON item_types(path);
 CREATE INDEX IF NOT EXISTS idx_pin_reasons_item_type ON pin_reasons(item_type_id);
 CREATE INDEX IF NOT EXISTS idx_l2_shares_content_hash ON l2_shares(content_hash);
+CREATE INDEX IF NOT EXISTS idx_l2_shares_actor_id ON l2_shares(actor_id);
 """
 
 
@@ -149,6 +155,7 @@ async def list_cache_items(limit: int = 100, offset: int = 0) -> List[dict]:
 
 async def add_item_type(
     content_hash: str,
+    actor_id: str,
     item_type: str,
     path: Optional[str] = None,
     description: Optional[str] = None,
@@ -156,7 +163,7 @@ async def add_item_type(
     source_url: Optional[str] = None,
     source_note: Optional[str] = None,
 ) -> dict:
-    """Add a type to a cache item. Creates cache_item if needed."""
+    """Add a type to a cache item for a user. Creates cache_item if needed."""
     async with pool.acquire() as conn:
         # Ensure cache_item exists
         await conn.execute(
@@ -166,55 +173,66 @@ async def add_item_type(
         # Insert or update item_type
         row = await conn.fetchrow(
             """
-            INSERT INTO item_types (content_hash, type, path, description, source_type, source_url, source_note)
-            VALUES ($1, $2, $3, $4, $5, $6, $7)
-            ON CONFLICT (content_hash, type, path) DO UPDATE SET
-                description = COALESCE($4, item_types.description),
-                source_type = COALESCE($5, item_types.source_type),
-                source_url = COALESCE($6, item_types.source_url),
-                source_note = COALESCE($7, item_types.source_note)
-            RETURNING id, content_hash, type, path, description, source_type, source_url, source_note, pinned, created_at
+            INSERT INTO item_types (content_hash, actor_id, type, path, description, source_type, source_url, source_note)
+            VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
+            ON CONFLICT (content_hash, actor_id, type, path) DO UPDATE SET
+                description = COALESCE($5, item_types.description),
+                source_type = COALESCE($6, item_types.source_type),
+                source_url = COALESCE($7, item_types.source_url),
+                source_note = COALESCE($8, item_types.source_note)
+            RETURNING id, content_hash, actor_id, type, path, description, source_type, source_url, source_note, pinned, created_at
             """,
-            content_hash, item_type, path, description, source_type, source_url, source_note
+            content_hash, actor_id, item_type, path, description, source_type, source_url, source_note
         )
         return dict(row)
 
 
-async def get_item_types(content_hash: str) -> List[dict]:
-    """Get all types for a cache item."""
+async def get_item_types(content_hash: str, actor_id: Optional[str] = None) -> List[dict]:
+    """Get types for a cache item, optionally filtered by user (None = all users)."""
     async with pool.acquire() as conn:
-        rows = await conn.fetch(
-            """
-            SELECT id, content_hash, type, path, description, source_type, source_url, source_note, pinned, created_at
-            FROM item_types
-            WHERE content_hash = $1
-            ORDER BY created_at
-            """,
-            content_hash
-        )
+        if actor_id is not None:  # "" must not fall through to the all-users query
+            rows = await conn.fetch(
+                """
+                SELECT id, content_hash, actor_id, type, path, description, source_type, source_url, source_note, pinned, created_at
+                FROM item_types
+                WHERE content_hash = $1 AND actor_id = $2
+                ORDER BY created_at
+                """,
+                content_hash, actor_id
+            )
+        else:
+            rows = await conn.fetch(
+                """
+                SELECT id, content_hash, actor_id, type, path, description, source_type, source_url, source_note, pinned, created_at
+                FROM item_types
+                WHERE content_hash = $1
+                ORDER BY created_at
+                """,
+                content_hash
+            )
         return [dict(row) for row in rows]
 
 
-async def get_item_type(content_hash: str, item_type: str, path: Optional[str] = None) -> Optional[dict]:
-    """Get a specific type for a cache item."""
+async def get_item_type(content_hash: str, actor_id: str, item_type: str, path: Optional[str] = None) -> Optional[dict]:
+    """Get a specific type for a cache item and user."""
     async with pool.acquire() as conn:
         if path is None:
             row = await conn.fetchrow(
                 """
-                SELECT id, content_hash, type, path, description, source_type, source_url, source_note, pinned, created_at
+                SELECT id, content_hash, actor_id, type, path, description, source_type, source_url, source_note, pinned, created_at
                 FROM item_types
-                WHERE content_hash = $1 AND type = $2 AND path IS NULL
+                WHERE content_hash = $1 AND actor_id = $2 AND type = $3 AND path IS NULL
                 """,
-                content_hash, item_type
+                content_hash, actor_id, item_type
             )
         else:
             row = await conn.fetchrow(
                 """
-                SELECT id, content_hash, type, path, description, source_type, source_url, source_note, pinned, created_at
+                SELECT id, content_hash, actor_id, type, path, description, source_type, source_url, source_note, pinned, created_at
                 FROM item_types
-                WHERE content_hash = $1 AND type = $2 AND path = $3
+                WHERE content_hash = $1 AND actor_id = $2 AND type = $3 AND path = $4
                 """,
-                content_hash, item_type, path
+                content_hash, actor_id, item_type, path
             )
         return dict(row) if row else None
 
@@ -242,55 +260,83 @@ async def update_item_type(
         return result == "UPDATE 1"
 
 
-async def delete_item_type(content_hash: str, item_type: str, path: Optional[str] = None) -> bool:
-    """Delete a specific type from a cache item."""
+async def delete_item_type(content_hash: str, actor_id: str, item_type: str, path: Optional[str] = None) -> bool:
+    """Delete a specific type from a cache item for a user."""
     async with pool.acquire() as conn:
         if path is None:
             result = await conn.execute(
-                "DELETE FROM item_types WHERE content_hash = $1 AND type = $2 AND path IS NULL",
-                content_hash, item_type
+                "DELETE FROM item_types WHERE content_hash = $1 AND actor_id = $2 AND type = $3 AND path IS NULL",
+                content_hash, actor_id, item_type
             )
         else:
             result = await conn.execute(
-                "DELETE FROM item_types WHERE content_hash = $1 AND type = $2 AND path = $3",
-                content_hash, item_type, path
+                "DELETE FROM item_types WHERE content_hash = $1 AND actor_id = $2 AND type = $3 AND path = $4",
+                content_hash, actor_id, item_type, path
             )
         return result == "DELETE 1"
 
 
-async def list_items_by_type(item_type: str, limit: int = 100, offset: int = 0) -> List[dict]:
-    """List all items of a specific type."""
+async def list_items_by_type(item_type: str, actor_id: Optional[str] = None, limit: int = 100, offset: int = 0) -> List[dict]:
+    """List items of a specific type, optionally filtered by user (None = all users)."""
     async with pool.acquire() as conn:
-        rows = await conn.fetch(
-            """
-            SELECT it.id, it.content_hash, it.type, it.path, it.description,
-                   it.source_type, it.source_url, it.source_note, it.pinned, it.created_at,
-                   ci.ipfs_cid
-            FROM item_types it
-            JOIN cache_items ci ON it.content_hash = ci.content_hash
-            WHERE it.type = $1
-            ORDER BY it.created_at DESC
-            LIMIT $2 OFFSET $3
-            """,
-            item_type, limit, offset
-        )
+        if actor_id is not None:  # "" must not fall through to the all-users query
+            rows = await conn.fetch(
+                """
+                SELECT it.id, it.content_hash, it.actor_id, it.type, it.path, it.description,
+                       it.source_type, it.source_url, it.source_note, it.pinned, it.created_at,
+                       ci.ipfs_cid
+                FROM item_types it
+                JOIN cache_items ci ON it.content_hash = ci.content_hash
+                WHERE it.type = $1 AND it.actor_id = $2
+                ORDER BY it.created_at DESC
+                LIMIT $3 OFFSET $4
+                """,
+                item_type, actor_id, limit, offset
+            )
+        else:
+            rows = await conn.fetch(
+                """
+                SELECT it.id, it.content_hash, it.actor_id, it.type, it.path, it.description,
+                       it.source_type, it.source_url, it.source_note, it.pinned, it.created_at,
+                       ci.ipfs_cid
+                FROM item_types it
+                JOIN cache_items ci ON it.content_hash = ci.content_hash
+                WHERE it.type = $1
+                ORDER BY it.created_at DESC
+                LIMIT $2 OFFSET $3
+                """,
+                item_type, limit, offset
+            )
         return [dict(row) for row in rows]
 
 
-async def get_item_by_path(item_type: str, path: str) -> Optional[dict]:
-    """Get an item by its type and path (e.g., recipe:/effects/dog)."""
+async def get_item_by_path(item_type: str, path: str, actor_id: Optional[str] = None) -> Optional[dict]:
+    """Get an item by its type and path (e.g., recipe:/effects/dog), optionally for a specific user."""
     async with pool.acquire() as conn:
-        row = await conn.fetchrow(
-            """
-            SELECT it.id, it.content_hash, it.type, it.path, it.description,
-                   it.source_type, it.source_url, it.source_note, it.pinned, it.created_at,
-                   ci.ipfs_cid
-            FROM item_types it
-            JOIN cache_items ci ON it.content_hash = ci.content_hash
-            WHERE it.type = $1 AND it.path = $2
-            """,
-            item_type, path
-        )
+        if actor_id is not None:  # "" must not fall through to the all-users query
+            row = await conn.fetchrow(
+                """
+                SELECT it.id, it.content_hash, it.actor_id, it.type, it.path, it.description,
+                       it.source_type, it.source_url, it.source_note, it.pinned, it.created_at,
+                       ci.ipfs_cid
+                FROM item_types it
+                JOIN cache_items ci ON it.content_hash = ci.content_hash
+                WHERE it.type = $1 AND it.path = $2 AND it.actor_id = $3
+                """,
+                item_type, path, actor_id
+            )
+        else:
+            row = await conn.fetchrow(
+                """
+                SELECT it.id, it.content_hash, it.actor_id, it.type, it.path, it.description,
+                       it.source_type, it.source_url, it.source_note, it.pinned, it.created_at,
+                       ci.ipfs_cid
+                FROM item_types it
+                JOIN cache_items ci ON it.content_hash = ci.content_hash
+                WHERE it.type = $1 AND it.path = $2
+                """,
+                item_type, path
+            )
         return dict(row) if row else None
 
 
@@ -385,46 +431,93 @@ async def is_item_pinned(content_hash: str, item_type: Optional[str] = None) ->
 
 async def add_l2_share(
     content_hash: str,
+    actor_id: str,
     l2_server: str,
     asset_name: str,
     content_type: str,
 ) -> dict:
-    """Add or update an L2 share."""
+    """Add or update an L2 share for a user."""
     async with pool.acquire() as conn:
         row = await conn.fetchrow(
             """
-            INSERT INTO l2_shares (content_hash, l2_server, asset_name, content_type, last_synced_at)
-            VALUES ($1, $2, $3, $4, NOW())
-            ON CONFLICT (content_hash, l2_server, content_type) DO UPDATE SET
-                asset_name = $3,
+            INSERT INTO l2_shares (content_hash, actor_id, l2_server, asset_name, content_type, last_synced_at)
+            VALUES ($1, $2, $3, $4, $5, NOW())
+            ON CONFLICT (content_hash, actor_id, l2_server, content_type) DO UPDATE SET
+                asset_name = $4,
                 last_synced_at = NOW()
-            RETURNING id, content_hash, l2_server, asset_name, content_type, published_at, last_synced_at
+            RETURNING id, content_hash, actor_id, l2_server, asset_name, content_type, published_at, last_synced_at
             """,
-            content_hash, l2_server, asset_name, content_type
+            content_hash, actor_id, l2_server, asset_name, content_type
         )
         return dict(row)
 
 
-async def get_l2_shares(content_hash: str) -> List[dict]:
-    """Get all L2 shares for a cache item."""
+async def get_l2_shares(content_hash: str, actor_id: Optional[str] = None) -> List[dict]:
+    """Get L2 shares for a cache item, optionally filtered by user (None = all users)."""
     async with pool.acquire() as conn:
-        rows = await conn.fetch(
-            """
-            SELECT id, content_hash, l2_server, asset_name, content_type, published_at, last_synced_at
-            FROM l2_shares
-            WHERE content_hash = $1
-            ORDER BY published_at
-            """,
-            content_hash
-        )
+        if actor_id is not None:  # "" must not fall through to the all-users query
+            rows = await conn.fetch(
+                """
+                SELECT id, content_hash, actor_id, l2_server, asset_name, content_type, published_at, last_synced_at
+                FROM l2_shares
+                WHERE content_hash = $1 AND actor_id = $2
+                ORDER BY published_at
+                """,
+                content_hash, actor_id
+            )
+        else:
+            rows = await conn.fetch(
+                """
+                SELECT id, content_hash, actor_id, l2_server, asset_name, content_type, published_at, last_synced_at
+                FROM l2_shares
+                WHERE content_hash = $1
+                ORDER BY published_at
+                """,
+                content_hash
+            )
         return [dict(row) for row in rows]
 
 
-async def delete_l2_share(content_hash: str, l2_server: str, content_type: str) -> bool:
-    """Delete an L2 share."""
+async def delete_l2_share(content_hash: str, actor_id: str, l2_server: str, content_type: str) -> bool:
+    """Delete an L2 share for a user."""
     async with pool.acquire() as conn:
         result = await conn.execute(
-            "DELETE FROM l2_shares WHERE content_hash = $1 AND l2_server = $2 AND content_type = $3",
-            content_hash, l2_server, content_type
+            "DELETE FROM l2_shares WHERE content_hash = $1 AND actor_id = $2 AND l2_server = $3 AND content_type = $4",
+            content_hash, actor_id, l2_server, content_type
+        )
+        return result == "DELETE 1"
+
+
+# ============ Cache Item Cleanup ============
+
+async def has_remaining_references(content_hash: str) -> bool:
+    """Check if a cache item has any remaining item_types or l2_shares."""
+    async with pool.acquire() as conn:
+        has_item_types = await conn.fetchval(
+            "SELECT EXISTS (SELECT 1 FROM item_types WHERE content_hash = $1)",
+            content_hash
+        )
+        if has_item_types:
+            return True
+
+        has_l2_shares = await conn.fetchval(
+            "SELECT EXISTS (SELECT 1 FROM l2_shares WHERE content_hash = $1)",
+            content_hash
+        )
+        return has_l2_shares
+
+
+async def cleanup_orphaned_cache_item(content_hash: str) -> bool:
+    """Delete a cache item if it has no remaining references. Returns True if deleted."""
+    async with pool.acquire() as conn:
+        # Only delete if no item_types or l2_shares reference it
+        result = await conn.execute(
+            """
+            DELETE FROM cache_items
+            WHERE content_hash = $1
+              AND NOT EXISTS (SELECT 1 FROM item_types WHERE content_hash = $1)
+              AND NOT EXISTS (SELECT 1 FROM l2_shares WHERE content_hash = $1)
+            """,
+            content_hash
         )
         return result == "DELETE 1"