# art-celery/database.py
"""
PostgreSQL database module for Art DAG L1 server.

Provides connection pooling and CRUD operations for cache metadata.
All functions assume init_db() has been awaited first so the module-level
pool exists.
"""
import os
from datetime import datetime, timezone
from typing import List, Optional

import asyncpg

DATABASE_URL = os.getenv("DATABASE_URL", "postgresql://artdag:artdag@localhost:5432/artdag")

# Module-level connection pool; created by init_db(), torn down by close_db().
pool: Optional[asyncpg.Pool] = None

SCHEMA_SQL = """
-- Core cache: just content hash and IPFS CID
-- Physical file storage - shared by all users
CREATE TABLE IF NOT EXISTS cache_items (
    content_hash VARCHAR(64) PRIMARY KEY,
    ipfs_cid VARCHAR(128),
    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);

-- Item types: per-user metadata (same item can be recipe AND media, per user)
-- actor_id format: @username@server (ActivityPub style)
CREATE TABLE IF NOT EXISTS item_types (
    id SERIAL PRIMARY KEY,
    content_hash VARCHAR(64) REFERENCES cache_items(content_hash) ON DELETE CASCADE,
    actor_id VARCHAR(255) NOT NULL,
    type VARCHAR(50) NOT NULL,
    path VARCHAR(255),
    description TEXT,
    source_type VARCHAR(20),
    source_url TEXT,
    source_note TEXT,
    pinned BOOLEAN DEFAULT FALSE,
    filename VARCHAR(255),
    metadata JSONB DEFAULT '{}',
    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    UNIQUE(content_hash, actor_id, type, path)
);

-- Add columns if they don't exist (for existing databases)
DO $$ BEGIN
    ALTER TABLE item_types ADD COLUMN IF NOT EXISTS filename VARCHAR(255);
    ALTER TABLE item_types ADD COLUMN IF NOT EXISTS metadata JSONB DEFAULT '{}';
EXCEPTION WHEN others THEN NULL;
END $$;

-- The UNIQUE constraint above treats NULL paths as distinct, so an
-- ON CONFLICT upsert targeting it never matches rows where path IS NULL.
-- This partial unique index gives NULL-path rows a real conflict target.
-- Wrapped in DO/EXCEPTION so init does not fail on legacy databases that
-- already contain duplicate NULL-path rows.
DO $$ BEGIN
    CREATE UNIQUE INDEX IF NOT EXISTS idx_item_types_null_path
        ON item_types(content_hash, actor_id, type) WHERE path IS NULL;
EXCEPTION WHEN others THEN NULL;
END $$;

-- Pin reasons: one-to-many from item_types
CREATE TABLE IF NOT EXISTS pin_reasons (
    id SERIAL PRIMARY KEY,
    item_type_id INTEGER REFERENCES item_types(id) ON DELETE CASCADE,
    reason VARCHAR(100) NOT NULL,
    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);

-- L2 shares: per-user shares (includes content_type for role when shared)
CREATE TABLE IF NOT EXISTS l2_shares (
    id SERIAL PRIMARY KEY,
    content_hash VARCHAR(64) REFERENCES cache_items(content_hash) ON DELETE CASCADE,
    actor_id VARCHAR(255) NOT NULL,
    l2_server VARCHAR(255) NOT NULL,
    asset_name VARCHAR(255) NOT NULL,
    content_type VARCHAR(50) NOT NULL,
    published_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    last_synced_at TIMESTAMP WITH TIME ZONE,
    UNIQUE(content_hash, actor_id, l2_server, content_type)
);

-- Run cache: maps content-addressable run_id to output
-- run_id is a hash of (sorted inputs + recipe), making runs deterministic
CREATE TABLE IF NOT EXISTS run_cache (
    run_id VARCHAR(64) PRIMARY KEY,
    output_hash VARCHAR(64) NOT NULL,
    ipfs_cid VARCHAR(128),
    provenance_cid VARCHAR(128),
    recipe VARCHAR(255) NOT NULL,
    inputs JSONB NOT NULL,
    actor_id VARCHAR(255),
    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);

-- Indexes
CREATE INDEX IF NOT EXISTS idx_item_types_content_hash ON item_types(content_hash);
CREATE INDEX IF NOT EXISTS idx_item_types_actor_id ON item_types(actor_id);
CREATE INDEX IF NOT EXISTS idx_item_types_type ON item_types(type);
CREATE INDEX IF NOT EXISTS idx_item_types_path ON item_types(path);
CREATE INDEX IF NOT EXISTS idx_pin_reasons_item_type ON pin_reasons(item_type_id);
CREATE INDEX IF NOT EXISTS idx_l2_shares_content_hash ON l2_shares(content_hash);
CREATE INDEX IF NOT EXISTS idx_l2_shares_actor_id ON l2_shares(actor_id);
CREATE INDEX IF NOT EXISTS idx_run_cache_output ON run_cache(output_hash);
"""


async def init_db():
    """Initialize database connection pool and create schema."""
    global pool
    pool = await asyncpg.create_pool(DATABASE_URL)
    async with pool.acquire() as conn:
        await conn.execute(SCHEMA_SQL)


async def close_db():
    """Close database connection pool."""
    global pool
    if pool:
        await pool.close()
        pool = None


# ============ Cache Items ============

async def create_cache_item(content_hash: str, ipfs_cid: Optional[str] = None) -> dict:
    """Create a cache item. Returns the created item.

    Idempotent: on conflict the existing row is kept, only filling in
    ipfs_cid when a non-NULL value is supplied.
    """
    async with pool.acquire() as conn:
        row = await conn.fetchrow(
            """
            INSERT INTO cache_items (content_hash, ipfs_cid)
            VALUES ($1, $2)
            ON CONFLICT (content_hash) DO UPDATE SET
                ipfs_cid = COALESCE($2, cache_items.ipfs_cid)
            RETURNING content_hash, ipfs_cid, created_at
            """,
            content_hash, ipfs_cid
        )
        return dict(row)


async def get_cache_item(content_hash: str) -> Optional[dict]:
    """Get a cache item by content hash."""
    async with pool.acquire() as conn:
        row = await conn.fetchrow(
            "SELECT content_hash, ipfs_cid, created_at FROM cache_items WHERE content_hash = $1",
            content_hash
        )
        return dict(row) if row else None


async def update_cache_item_ipfs_cid(content_hash: str, ipfs_cid: str) -> bool:
    """Update the IPFS CID for a cache item. Returns True if a row was updated."""
    async with pool.acquire() as conn:
        result = await conn.execute(
            "UPDATE cache_items SET ipfs_cid = $2 WHERE content_hash = $1",
            content_hash, ipfs_cid
        )
        return result == "UPDATE 1"


async def delete_cache_item(content_hash: str) -> bool:
    """Delete a cache item and all associated data (cascades)."""
    async with pool.acquire() as conn:
        result = await conn.execute(
            "DELETE FROM cache_items WHERE content_hash = $1",
            content_hash
        )
        return result == "DELETE 1"


async def list_cache_items(limit: int = 100, offset: int = 0) -> List[dict]:
    """List cache items with pagination, newest first."""
    async with pool.acquire() as conn:
        rows = await conn.fetch(
            """
            SELECT content_hash, ipfs_cid, created_at
            FROM cache_items
            ORDER BY created_at DESC
            LIMIT $1 OFFSET $2
            """,
            limit, offset
        )
        return [dict(row) for row in rows]


# ============ Item Types ============

async def add_item_type(
    content_hash: str,
    actor_id: str,
    item_type: str,
    path: Optional[str] = None,
    description: Optional[str] = None,
    source_type: Optional[str] = None,
    source_url: Optional[str] = None,
    source_note: Optional[str] = None,
) -> dict:
    """Add a type to a cache item for a user. Creates cache_item if needed.

    Upserts: NULL optional fields preserve any existing values.
    """
    async with pool.acquire() as conn:
        # Ensure cache_item exists
        await conn.execute(
            "INSERT INTO cache_items (content_hash) VALUES ($1) ON CONFLICT DO NOTHING",
            content_hash
        )
        # The four-column UNIQUE constraint treats NULLs as distinct, so
        # NULL-path rows must target the partial unique index instead --
        # otherwise the upsert never fires and duplicates accumulate.
        if path is None:
            conflict_target = "ON CONFLICT (content_hash, actor_id, type) WHERE path IS NULL"
        else:
            conflict_target = "ON CONFLICT (content_hash, actor_id, type, path)"
        row = await conn.fetchrow(
            f"""
            INSERT INTO item_types (content_hash, actor_id, type, path, description, source_type, source_url, source_note)
            VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
            {conflict_target} DO UPDATE SET
                description = COALESCE($5, item_types.description),
                source_type = COALESCE($6, item_types.source_type),
                source_url = COALESCE($7, item_types.source_url),
                source_note = COALESCE($8, item_types.source_note)
            RETURNING id, content_hash, actor_id, type, path, description,
                      source_type, source_url, source_note, pinned, created_at
            """,
            content_hash, actor_id, item_type, path, description, source_type, source_url, source_note
        )
        return dict(row)


async def get_item_types(content_hash: str, actor_id: Optional[str] = None) -> List[dict]:
    """Get types for a cache item, optionally filtered by user."""
    async with pool.acquire() as conn:
        if actor_id:
            rows = await conn.fetch(
                """
                SELECT id, content_hash, actor_id, type, path, description,
                       source_type, source_url, source_note, pinned, created_at
                FROM item_types
                WHERE content_hash = $1 AND actor_id = $2
                ORDER BY created_at
                """,
                content_hash, actor_id
            )
        else:
            rows = await conn.fetch(
                """
                SELECT id, content_hash, actor_id, type, path, description,
                       source_type, source_url, source_note, pinned, created_at
                FROM item_types
                WHERE content_hash = $1
                ORDER BY created_at
                """,
                content_hash
            )
        return [dict(row) for row in rows]


async def get_item_type(content_hash: str, actor_id: str, item_type: str,
                        path: Optional[str] = None) -> Optional[dict]:
    """Get a specific type for a cache item and user.

    path=None matches rows whose path IS NULL (not "any path").
    """
    async with pool.acquire() as conn:
        if path is None:
            row = await conn.fetchrow(
                """
                SELECT id, content_hash, actor_id, type, path, description,
                       source_type, source_url, source_note, pinned, created_at
                FROM item_types
                WHERE content_hash = $1 AND actor_id = $2 AND type = $3 AND path IS NULL
                """,
                content_hash, actor_id, item_type
            )
        else:
            row = await conn.fetchrow(
                """
                SELECT id, content_hash, actor_id, type, path, description,
                       source_type, source_url, source_note, pinned, created_at
                FROM item_types
                WHERE content_hash = $1 AND actor_id = $2 AND type = $3 AND path = $4
                """,
                content_hash, actor_id, item_type, path
            )
        return dict(row) if row else None


async def update_item_type(
    item_type_id: int,
    description: Optional[str] = None,
    source_type: Optional[str] = None,
    source_url: Optional[str] = None,
    source_note: Optional[str] = None,
) -> bool:
    """Update an item type's metadata. NULL arguments leave fields unchanged."""
    async with pool.acquire() as conn:
        result = await conn.execute(
            """
            UPDATE item_types SET
                description = COALESCE($2, description),
                source_type = COALESCE($3, source_type),
                source_url = COALESCE($4, source_url),
                source_note = COALESCE($5, source_note)
            WHERE id = $1
            """,
            item_type_id, description, source_type, source_url, source_note
        )
        return result == "UPDATE 1"


async def delete_item_type(content_hash: str, actor_id: str, item_type: str,
                           path: Optional[str] = None) -> bool:
    """Delete a specific type from a cache item for a user."""
    async with pool.acquire() as conn:
        if path is None:
            result = await conn.execute(
                "DELETE FROM item_types WHERE content_hash = $1 AND actor_id = $2 AND type = $3 AND path IS NULL",
                content_hash, actor_id, item_type
            )
        else:
            result = await conn.execute(
                "DELETE FROM item_types WHERE content_hash = $1 AND actor_id = $2 AND type = $3 AND path = $4",
                content_hash, actor_id, item_type, path
            )
        return result == "DELETE 1"


async def list_items_by_type(item_type: str, actor_id: Optional[str] = None,
                             limit: int = 100, offset: int = 0) -> List[dict]:
    """List items of a specific type, optionally filtered by user."""
    async with pool.acquire() as conn:
        if actor_id:
            rows = await conn.fetch(
                """
                SELECT it.id, it.content_hash, it.actor_id, it.type, it.path,
                       it.description, it.source_type, it.source_url, it.source_note,
                       it.pinned, it.created_at, ci.ipfs_cid
                FROM item_types it
                JOIN cache_items ci ON it.content_hash = ci.content_hash
                WHERE it.type = $1 AND it.actor_id = $2
                ORDER BY it.created_at DESC
                LIMIT $3 OFFSET $4
                """,
                item_type, actor_id, limit, offset
            )
        else:
            rows = await conn.fetch(
                """
                SELECT it.id, it.content_hash, it.actor_id, it.type, it.path,
                       it.description, it.source_type, it.source_url, it.source_note,
                       it.pinned, it.created_at, ci.ipfs_cid
                FROM item_types it
                JOIN cache_items ci ON it.content_hash = ci.content_hash
                WHERE it.type = $1
                ORDER BY it.created_at DESC
                LIMIT $2 OFFSET $3
                """,
                item_type, limit, offset
            )
        return [dict(row) for row in rows]


async def get_item_by_path(item_type: str, path: str,
                           actor_id: Optional[str] = None) -> Optional[dict]:
    """Get an item by its type and path (e.g., recipe:/effects/dog), optionally for a specific user."""
    async with pool.acquire() as conn:
        if actor_id:
            row = await conn.fetchrow(
                """
                SELECT it.id, it.content_hash, it.actor_id, it.type, it.path,
                       it.description, it.source_type, it.source_url, it.source_note,
                       it.pinned, it.created_at, ci.ipfs_cid
                FROM item_types it
                JOIN cache_items ci ON it.content_hash = ci.content_hash
                WHERE it.type = $1 AND it.path = $2 AND it.actor_id = $3
                """,
                item_type, path, actor_id
            )
        else:
            row = await conn.fetchrow(
                """
                SELECT it.id, it.content_hash, it.actor_id, it.type, it.path,
                       it.description, it.source_type, it.source_url, it.source_note,
                       it.pinned, it.created_at, ci.ipfs_cid
                FROM item_types it
                JOIN cache_items ci ON it.content_hash = ci.content_hash
                WHERE it.type = $1 AND it.path = $2
                """,
                item_type, path
            )
        return dict(row) if row else None


# ============ Pinning ============

async def pin_item_type(item_type_id: int, reason: str) -> bool:
    """Pin an item type with a reason."""
    async with pool.acquire() as conn:
        async with conn.transaction():
            # Set pinned flag
            await conn.execute(
                "UPDATE item_types SET pinned = TRUE WHERE id = $1",
                item_type_id
            )
            # Add pin reason
            await conn.execute(
                "INSERT INTO pin_reasons (item_type_id, reason) VALUES ($1, $2)",
                item_type_id, reason
            )
    return True


async def unpin_item_type(item_type_id: int, reason: Optional[str] = None) -> bool:
    """Remove a pin reason from an item type. If no reasons left, unpins the item."""
    async with pool.acquire() as conn:
        async with conn.transaction():
            if reason:
                # Remove specific reason
                await conn.execute(
                    "DELETE FROM pin_reasons WHERE item_type_id = $1 AND reason = $2",
                    item_type_id, reason
                )
            else:
                # Remove all reasons
                await conn.execute(
                    "DELETE FROM pin_reasons WHERE item_type_id = $1",
                    item_type_id
                )
            # Check if any reasons remain
            count = await conn.fetchval(
                "SELECT COUNT(*) FROM pin_reasons WHERE item_type_id = $1",
                item_type_id
            )
            if count == 0:
                await conn.execute(
                    "UPDATE item_types SET pinned = FALSE WHERE id = $1",
                    item_type_id
                )
    return True


async def get_pin_reasons(item_type_id: int) -> List[dict]:
    """Get all pin reasons for an item type."""
    async with pool.acquire() as conn:
        rows = await conn.fetch(
            "SELECT id, reason, created_at FROM pin_reasons WHERE item_type_id = $1 ORDER BY created_at",
            item_type_id
        )
        return [dict(row) for row in rows]


async def is_item_pinned(content_hash: str,
                         item_type: Optional[str] = None) -> tuple[bool, List[str]]:
    """Check if any type of a cache item is pinned. Returns (is_pinned, reasons)."""
    async with pool.acquire() as conn:
        if item_type:
            rows = await conn.fetch(
                """
                SELECT pr.reason
                FROM pin_reasons pr
                JOIN item_types it ON pr.item_type_id = it.id
                WHERE it.content_hash = $1 AND it.type = $2 AND it.pinned = TRUE
                """,
                content_hash, item_type
            )
        else:
            rows = await conn.fetch(
                """
                SELECT pr.reason
                FROM pin_reasons pr
                JOIN item_types it ON pr.item_type_id = it.id
                WHERE it.content_hash = $1 AND it.pinned = TRUE
                """,
                content_hash
            )
        reasons = [row["reason"] for row in rows]
        return len(reasons) > 0, reasons


# ============ L2 Shares ============

async def add_l2_share(
    content_hash: str,
    actor_id: str,
    l2_server: str,
    asset_name: str,
    content_type: str,
) -> dict:
    """Add or update an L2 share for a user."""
    async with pool.acquire() as conn:
        row = await conn.fetchrow(
            """
            INSERT INTO l2_shares (content_hash, actor_id, l2_server, asset_name, content_type, last_synced_at)
            VALUES ($1, $2, $3, $4, $5, NOW())
            ON CONFLICT (content_hash, actor_id, l2_server, content_type) DO UPDATE SET
                asset_name = $4,
                last_synced_at = NOW()
            RETURNING id, content_hash, actor_id, l2_server, asset_name, content_type,
                      published_at, last_synced_at
            """,
            content_hash, actor_id, l2_server, asset_name, content_type
        )
        return dict(row)


async def get_l2_shares(content_hash: str, actor_id: Optional[str] = None) -> List[dict]:
    """Get L2 shares for a cache item, optionally filtered by user."""
    async with pool.acquire() as conn:
        if actor_id:
            rows = await conn.fetch(
                """
                SELECT id, content_hash, actor_id, l2_server, asset_name, content_type,
                       published_at, last_synced_at
                FROM l2_shares
                WHERE content_hash = $1 AND actor_id = $2
                ORDER BY published_at
                """,
                content_hash, actor_id
            )
        else:
            rows = await conn.fetch(
                """
                SELECT id, content_hash, actor_id, l2_server, asset_name, content_type,
                       published_at, last_synced_at
                FROM l2_shares
                WHERE content_hash = $1
                ORDER BY published_at
                """,
                content_hash
            )
        return [dict(row) for row in rows]


async def delete_l2_share(content_hash: str, actor_id: str, l2_server: str,
                          content_type: str) -> bool:
    """Delete an L2 share for a user."""
    async with pool.acquire() as conn:
        result = await conn.execute(
            "DELETE FROM l2_shares WHERE content_hash = $1 AND actor_id = $2 AND l2_server = $3 AND content_type = $4",
            content_hash, actor_id, l2_server, content_type
        )
        return result == "DELETE 1"


# ============ Cache Item Cleanup ============

async def has_remaining_references(content_hash: str) -> bool:
    """Check if a cache item has any remaining item_types or l2_shares."""
    async with pool.acquire() as conn:
        # EXISTS short-circuits; no need to count every row.
        return await conn.fetchval(
            """
            SELECT EXISTS (SELECT 1 FROM item_types WHERE content_hash = $1)
                OR EXISTS (SELECT 1 FROM l2_shares WHERE content_hash = $1)
            """,
            content_hash
        )


async def cleanup_orphaned_cache_item(content_hash: str) -> bool:
    """Delete a cache item if it has no remaining references. Returns True if deleted."""
    async with pool.acquire() as conn:
        # Only delete if no item_types or l2_shares reference it
        result = await conn.execute(
            """
            DELETE FROM cache_items
            WHERE content_hash = $1
              AND NOT EXISTS (SELECT 1 FROM item_types WHERE content_hash = $1)
              AND NOT EXISTS (SELECT 1 FROM l2_shares WHERE content_hash = $1)
            """,
            content_hash
        )
        return result == "DELETE 1"


# ============ High-Level Metadata Functions ============
# These provide a compatible interface to the old JSON-based save_cache_meta/load_cache_meta

import json as _json


async def save_item_metadata(
    content_hash: str,
    actor_id: str,
    item_type: str = "media",
    filename: Optional[str] = None,
    description: Optional[str] = None,
    source_type: Optional[str] = None,
    source_url: Optional[str] = None,
    source_note: Optional[str] = None,
    pinned: bool = False,
    pin_reason: Optional[str] = None,
    tags: Optional[List[str]] = None,
    folder: Optional[str] = None,
    collections: Optional[List[str]] = None,
    **extra_metadata
) -> dict:
    """
    Save or update item metadata in the database.

    Returns a dict with the item metadata (compatible with old JSON format).
    """
    # Build metadata JSONB for extra fields
    metadata = {}
    if tags:
        metadata["tags"] = tags
    if folder:
        metadata["folder"] = folder
    if collections:
        metadata["collections"] = collections
    metadata.update(extra_metadata)

    async with pool.acquire() as conn:
        # Ensure cache_item exists
        await conn.execute(
            "INSERT INTO cache_items (content_hash) VALUES ($1) ON CONFLICT DO NOTHING",
            content_hash
        )
        # Upsert item_type.  path is never supplied here (always NULL), so the
        # conflict target is the partial unique index on NULL-path rows; the
        # four-column UNIQUE constraint treats NULLs as distinct and would
        # never fire, creating a duplicate row on every save.
        # NOTE: pinned is overwritten unconditionally by design (EXCLUDED.pinned).
        row = await conn.fetchrow(
            """
            INSERT INTO item_types (content_hash, actor_id, type, description, source_type,
                                    source_url, source_note, pinned, filename, metadata)
            VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
            ON CONFLICT (content_hash, actor_id, type) WHERE path IS NULL DO UPDATE SET
                description = COALESCE(EXCLUDED.description, item_types.description),
                source_type = COALESCE(EXCLUDED.source_type, item_types.source_type),
                source_url = COALESCE(EXCLUDED.source_url, item_types.source_url),
                source_note = COALESCE(EXCLUDED.source_note, item_types.source_note),
                pinned = EXCLUDED.pinned,
                filename = COALESCE(EXCLUDED.filename, item_types.filename),
                metadata = item_types.metadata || EXCLUDED.metadata
            RETURNING id, content_hash, actor_id, type, path, description, source_type,
                      source_url, source_note, pinned, filename, metadata, created_at
            """,
            content_hash, actor_id, item_type, description, source_type,
            source_url, source_note, pinned, filename, _json.dumps(metadata)
        )
        item_type_id = row["id"]

        # Handle pinning.  pin_reasons has no unique constraint, so a bare
        # "ON CONFLICT DO NOTHING" never skips anything; guard with NOT EXISTS
        # to avoid duplicating the same reason on repeated saves.
        if pinned and pin_reason:
            await conn.execute(
                """
                INSERT INTO pin_reasons (item_type_id, reason)
                SELECT $1, $2
                WHERE NOT EXISTS (
                    SELECT 1 FROM pin_reasons WHERE item_type_id = $1 AND reason = $2
                )
                """,
                item_type_id, pin_reason
            )

        # Build response dict (compatible with old format)
        result = {
            "uploader": actor_id,
            "uploaded_at": row["created_at"].isoformat() if row["created_at"] else None,
            "filename": row["filename"],
            "type": row["type"],
            "description": row["description"],
            "pinned": row["pinned"],
        }

        # Add origin if present
        if row["source_type"] or row["source_url"] or row["source_note"]:
            result["origin"] = {
                "type": row["source_type"],
                "url": row["source_url"],
                "note": row["source_note"]
            }

        # Add metadata fields (asyncpg may return JSONB as str or dict
        # depending on codec configuration)
        if row["metadata"]:
            meta = row["metadata"] if isinstance(row["metadata"], dict) else _json.loads(row["metadata"])
            if meta.get("tags"):
                result["tags"] = meta["tags"]
            if meta.get("folder"):
                result["folder"] = meta["folder"]
            if meta.get("collections"):
                result["collections"] = meta["collections"]

        # Get pin reasons
        if row["pinned"]:
            reasons = await conn.fetch(
                "SELECT reason FROM pin_reasons WHERE item_type_id = $1",
                item_type_id
            )
            if reasons:
                result["pin_reason"] = reasons[0]["reason"]

        return result


async def load_item_metadata(content_hash: str, actor_id: Optional[str] = None) -> dict:
    """
    Load item metadata from the database.

    If actor_id is provided, returns metadata for that user's view of the item.
    Otherwise, returns combined metadata from all users (for backwards compat).

    Returns a dict compatible with old JSON format.
    """
    async with pool.acquire() as conn:
        # Get cache item
        cache_item = await conn.fetchrow(
            "SELECT content_hash, ipfs_cid, created_at FROM cache_items WHERE content_hash = $1",
            content_hash
        )
        if not cache_item:
            return {}

        # Get item types
        if actor_id:
            item_types = await conn.fetch(
                """
                SELECT id, actor_id, type, path, description, source_type, source_url,
                       source_note, pinned, filename, metadata, created_at
                FROM item_types
                WHERE content_hash = $1 AND actor_id = $2
                ORDER BY created_at
                """,
                content_hash, actor_id
            )
        else:
            item_types = await conn.fetch(
                """
                SELECT id, actor_id, type, path, description, source_type, source_url,
                       source_note, pinned, filename, metadata, created_at
                FROM item_types
                WHERE content_hash = $1
                ORDER BY created_at
                """,
                content_hash
            )

        if not item_types:
            return {"uploaded_at": cache_item["created_at"].isoformat() if cache_item["created_at"] else None}

        # Use first item type as primary (for backwards compat)
        primary = item_types[0]
        result = {
            "uploader": primary["actor_id"],
            "uploaded_at": primary["created_at"].isoformat() if primary["created_at"] else None,
            "filename": primary["filename"],
            "type": primary["type"],
            "description": primary["description"],
            "pinned": any(it["pinned"] for it in item_types),
        }

        # Add origin if present
        if primary["source_type"] or primary["source_url"] or primary["source_note"]:
            result["origin"] = {
                "type": primary["source_type"],
                "url": primary["source_url"],
                "note": primary["source_note"]
            }

        # Add metadata fields
        if primary["metadata"]:
            meta = primary["metadata"] if isinstance(primary["metadata"], dict) else _json.loads(primary["metadata"])
            if meta.get("tags"):
                result["tags"] = meta["tags"]
            if meta.get("folder"):
                result["folder"] = meta["folder"]
            if meta.get("collections"):
                result["collections"] = meta["collections"]

        # Get pin reasons for pinned items (first pinned item's first reason wins)
        for it in item_types:
            if it["pinned"]:
                reasons = await conn.fetch(
                    "SELECT reason FROM pin_reasons WHERE item_type_id = $1",
                    it["id"]
                )
                if reasons:
                    result["pin_reason"] = reasons[0]["reason"]
                    break

        # Get L2 shares
        if actor_id:
            shares = await conn.fetch(
                """
                SELECT l2_server, asset_name, content_type, published_at, last_synced_at
                FROM l2_shares
                WHERE content_hash = $1 AND actor_id = $2
                """,
                content_hash, actor_id
            )
        else:
            shares = await conn.fetch(
                """
                SELECT l2_server, asset_name, content_type, published_at, last_synced_at
                FROM l2_shares
                WHERE content_hash = $1
                """,
                content_hash
            )
        if shares:
            result["l2_shares"] = [
                {
                    "l2_server": s["l2_server"],
                    "asset_name": s["asset_name"],
                    "content_type": s["content_type"],
                    "published_at": s["published_at"].isoformat() if s["published_at"] else None,
                    "last_synced_at": s["last_synced_at"].isoformat() if s["last_synced_at"] else None,
                }
                for s in shares
            ]
            # For backwards compat, also set "published" if shared
            result["published"] = {
                "to_l2": True,
                "asset_name": shares[0]["asset_name"],
                "l2_server": shares[0]["l2_server"],
            }

        return result


async def update_item_metadata(
    content_hash: str,
    actor_id: str,
    item_type: str = "media",
    **updates
) -> dict:
    """
    Update specific fields of item metadata.

    Returns updated metadata dict.
    """
    # Extract known fields from updates
    description = updates.pop("description", None)
    source_type = updates.pop("source_type", None)
    source_url = updates.pop("source_url", None)
    source_note = updates.pop("source_note", None)

    # Handle origin dict format
    origin = updates.pop("origin", None)
    if origin:
        source_type = origin.get("type", source_type)
        source_url = origin.get("url", source_url)
        source_note = origin.get("note", source_note)

    pinned = updates.pop("pinned", None)
    pin_reason = updates.pop("pin_reason", None)
    filename = updates.pop("filename", None)
    tags = updates.pop("tags", None)
    folder = updates.pop("folder", None)
    collections = updates.pop("collections", None)

    async with pool.acquire() as conn:
        # Get existing item_type
        existing = await conn.fetchrow(
            """
            SELECT id, metadata FROM item_types
            WHERE content_hash = $1 AND actor_id = $2 AND type = $3 AND path IS NULL
            """,
            content_hash, actor_id, item_type
        )

        if not existing:
            # Create new entry
            return await save_item_metadata(
                content_hash, actor_id, item_type,
                filename=filename,
                description=description,
                source_type=source_type,
                source_url=source_url,
                source_note=source_note,
                pinned=pinned or False,
                pin_reason=pin_reason,
                tags=tags,
                folder=folder,
                collections=collections,
                **updates
            )

        # Build update query dynamically; only include supplied fields
        set_parts = []
        params = [content_hash, actor_id, item_type]
        param_idx = 4

        if description is not None:
            set_parts.append(f"description = ${param_idx}")
            params.append(description)
            param_idx += 1
        if source_type is not None:
            set_parts.append(f"source_type = ${param_idx}")
            params.append(source_type)
            param_idx += 1
        if source_url is not None:
            set_parts.append(f"source_url = ${param_idx}")
            params.append(source_url)
            param_idx += 1
        if source_note is not None:
            set_parts.append(f"source_note = ${param_idx}")
            params.append(source_note)
            param_idx += 1
        if pinned is not None:
            set_parts.append(f"pinned = ${param_idx}")
            params.append(pinned)
            param_idx += 1
        if filename is not None:
            set_parts.append(f"filename = ${param_idx}")
            params.append(filename)
            param_idx += 1

        # Handle metadata updates: merge new keys into the stored JSONB
        current_metadata = existing["metadata"] if isinstance(existing["metadata"], dict) else (
            _json.loads(existing["metadata"]) if existing["metadata"] else {}
        )
        if tags is not None:
            current_metadata["tags"] = tags
        if folder is not None:
            current_metadata["folder"] = folder
        if collections is not None:
            current_metadata["collections"] = collections
        current_metadata.update(updates)

        if current_metadata:
            set_parts.append(f"metadata = ${param_idx}")
            params.append(_json.dumps(current_metadata))
            param_idx += 1

        if set_parts:
            query = f"""
                UPDATE item_types SET {', '.join(set_parts)}
                WHERE content_hash = $1 AND actor_id = $2 AND type = $3 AND path IS NULL
            """
            await conn.execute(query, *params)

        # Handle pin reason.  pin_reasons has no unique constraint, so guard
        # with NOT EXISTS instead of a no-op ON CONFLICT DO NOTHING.
        if pinned and pin_reason:
            await conn.execute(
                """
                INSERT INTO pin_reasons (item_type_id, reason)
                SELECT $1, $2
                WHERE NOT EXISTS (
                    SELECT 1 FROM pin_reasons WHERE item_type_id = $1 AND reason = $2
                )
                """,
                existing["id"], pin_reason
            )

    return await load_item_metadata(content_hash, actor_id)


async def save_l2_share(
    content_hash: str,
    actor_id: str,
    l2_server: str,
    asset_name: str,
    content_type: str = "media"
) -> dict:
    """Save an L2 share and return share info."""
    async with pool.acquire() as conn:
        row = await conn.fetchrow(
            """
            INSERT INTO l2_shares (content_hash, actor_id, l2_server, asset_name, content_type, last_synced_at)
            VALUES ($1, $2, $3, $4, $5, NOW())
            ON CONFLICT (content_hash, actor_id, l2_server, content_type) DO UPDATE SET
                asset_name = EXCLUDED.asset_name,
                last_synced_at = NOW()
            RETURNING l2_server, asset_name, content_type, published_at, last_synced_at
            """,
            content_hash, actor_id, l2_server, asset_name, content_type
        )
        return {
            "l2_server": row["l2_server"],
            "asset_name": row["asset_name"],
            "content_type": row["content_type"],
            "published_at": row["published_at"].isoformat() if row["published_at"] else None,
            "last_synced_at": row["last_synced_at"].isoformat() if row["last_synced_at"] else None,
        }


async def get_user_items(actor_id: str, item_type: Optional[str] = None,
                         limit: int = 100, offset: int = 0) -> List[dict]:
    """Get all items for a user, optionally filtered by type. Deduplicates by content_hash."""
    async with pool.acquire() as conn:
        if item_type:
            rows = await conn.fetch(
                """
                SELECT * FROM (
                    SELECT DISTINCT ON (it.content_hash)
                        it.content_hash, it.type, it.description, it.filename,
                        it.pinned, it.created_at, ci.ipfs_cid
                    FROM item_types it
                    JOIN cache_items ci ON it.content_hash = ci.content_hash
                    WHERE it.actor_id = $1 AND it.type = $2
                    ORDER BY it.content_hash, it.created_at DESC
                ) deduped
                ORDER BY created_at DESC
                LIMIT $3 OFFSET $4
                """,
                actor_id, item_type, limit, offset
            )
        else:
            rows = await conn.fetch(
                """
                SELECT * FROM (
                    SELECT DISTINCT ON (it.content_hash)
                        it.content_hash, it.type, it.description, it.filename,
                        it.pinned, it.created_at, ci.ipfs_cid
                    FROM item_types it
                    JOIN cache_items ci ON it.content_hash = ci.content_hash
                    WHERE it.actor_id = $1
                    ORDER BY it.content_hash, it.created_at DESC
                ) deduped
                ORDER BY created_at DESC
                LIMIT $2 OFFSET $3
                """,
                actor_id, limit, offset
            )
        return [
            {
                "content_hash": r["content_hash"],
                "type": r["type"],
                "description": r["description"],
                "filename": r["filename"],
                "pinned": r["pinned"],
                "created_at": r["created_at"].isoformat() if r["created_at"] else None,
                "ipfs_cid": r["ipfs_cid"],
            }
            for r in rows
        ]


async def count_user_items(actor_id: str, item_type: Optional[str] = None) -> int:
    """Count unique items (by content_hash) for a user."""
    async with pool.acquire() as conn:
        if item_type:
            return await conn.fetchval(
                "SELECT COUNT(DISTINCT content_hash) FROM item_types WHERE actor_id = $1 AND type = $2",
                actor_id, item_type
            )
        else:
            return await conn.fetchval(
                "SELECT COUNT(DISTINCT content_hash) FROM item_types WHERE actor_id = $1",
                actor_id
            )


# ============ Run Cache ============

async def get_run_cache(run_id: str) -> Optional[dict]:
    """Get cached run result by content-addressable run_id."""
    async with pool.acquire() as conn:
        row = await conn.fetchrow(
            """
            SELECT run_id, output_hash, ipfs_cid, provenance_cid, recipe, inputs,
                   actor_id, created_at
            FROM run_cache
            WHERE run_id = $1
            """,
            run_id
        )
        if row:
            return {
                "run_id": row["run_id"],
                "output_hash": row["output_hash"],
                "ipfs_cid": row["ipfs_cid"],
                "provenance_cid": row["provenance_cid"],
                "recipe": row["recipe"],
                "inputs": row["inputs"],
                "actor_id": row["actor_id"],
                "created_at": row["created_at"].isoformat() if row["created_at"] else None,
            }
        return None


async def save_run_cache(
    run_id: str,
    output_hash: str,
    recipe: str,
    inputs: List[str],
    ipfs_cid: Optional[str] = None,
    provenance_cid: Optional[str] = None,
    actor_id: Optional[str] = None,
) -> dict:
    """Save run result to cache. Updates if run_id already exists."""
    async with pool.acquire() as conn:
        row = await conn.fetchrow(
            """
            INSERT INTO run_cache (run_id, output_hash, ipfs_cid, provenance_cid, recipe, inputs, actor_id)
            VALUES ($1, $2, $3, $4, $5, $6, $7)
            ON CONFLICT (run_id) DO UPDATE SET
                output_hash = EXCLUDED.output_hash,
                ipfs_cid = COALESCE(EXCLUDED.ipfs_cid, run_cache.ipfs_cid),
                provenance_cid = COALESCE(EXCLUDED.provenance_cid, run_cache.provenance_cid)
            RETURNING run_id, output_hash, ipfs_cid, provenance_cid, recipe, inputs,
                      actor_id, created_at
            """,
            run_id, output_hash, ipfs_cid, provenance_cid, recipe, _json.dumps(inputs), actor_id
        )
        return {
            "run_id": row["run_id"],
            "output_hash": row["output_hash"],
            "ipfs_cid": row["ipfs_cid"],
            "provenance_cid": row["provenance_cid"],
            "recipe": row["recipe"],
            "inputs": row["inputs"],
            "actor_id": row["actor_id"],
            "created_at": row["created_at"].isoformat() if row["created_at"] else None,
        }


async def get_run_by_output(output_hash: str) -> Optional[dict]:
    """Get run cache entry by output hash."""
    async with pool.acquire() as conn:
        row = await conn.fetchrow(
            """
            SELECT run_id, output_hash, ipfs_cid, provenance_cid, recipe, inputs,
                   actor_id, created_at
            FROM run_cache
            WHERE output_hash = $1
            """,
            output_hash
        )
        if row:
            return {
                "run_id": row["run_id"],
                "output_hash": row["output_hash"],
                "ipfs_cid": row["ipfs_cid"],
                "provenance_cid": row["provenance_cid"],
                "recipe": row["recipe"],
                "inputs": row["inputs"],
                "actor_id": row["actor_id"],
                "created_at": row["created_at"].isoformat() if row["created_at"] else None,
            }
        return None