- Add PostgreSQL database for cache metadata storage with schema for cache_items, item_types, pin_reasons, and l2_shares tables - Add IPFS integration as durable backing store (local cache as hot storage) - Add postgres and ipfs services to docker-compose.yml - Update cache_manager to upload to IPFS and track CIDs - Rename all config references to recipe throughout server.py - Update API endpoints: /configs/* -> /recipes/* - Update models: ConfigStatus -> RecipeStatus, ConfigRunRequest -> RecipeRunRequest - Update UI tabs and pages to show Recipes instead of Configs Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
431 lines
15 KiB
Python
431 lines
15 KiB
Python
# art-celery/database.py
|
|
"""
|
|
PostgreSQL database module for Art DAG L1 server.
|
|
|
|
Provides connection pooling and CRUD operations for cache metadata.
|
|
"""
|
|
|
|
import os
|
|
from datetime import datetime, timezone
|
|
from typing import List, Optional
|
|
|
|
import asyncpg
|
|
|
|
DATABASE_URL = os.getenv("DATABASE_URL", "postgresql://artdag:artdag@localhost:5432/artdag")
|
|
|
|
pool: Optional[asyncpg.Pool] = None
|
|
|
|
SCHEMA_SQL = """
|
|
-- Core cache: just content hash and IPFS CID
|
|
CREATE TABLE IF NOT EXISTS cache_items (
|
|
content_hash VARCHAR(64) PRIMARY KEY,
|
|
ipfs_cid VARCHAR(128),
|
|
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
|
|
);
|
|
|
|
-- Item types: metadata lives here (same item can be recipe AND media)
|
|
CREATE TABLE IF NOT EXISTS item_types (
|
|
id SERIAL PRIMARY KEY,
|
|
content_hash VARCHAR(64) REFERENCES cache_items(content_hash) ON DELETE CASCADE,
|
|
type VARCHAR(50) NOT NULL,
|
|
path VARCHAR(255),
|
|
description TEXT,
|
|
source_type VARCHAR(20),
|
|
source_url TEXT,
|
|
source_note TEXT,
|
|
pinned BOOLEAN DEFAULT FALSE,
|
|
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
|
|
UNIQUE(content_hash, type, path)
|
|
);
|
|
|
|
-- Pin reasons: one-to-many from item_types
|
|
CREATE TABLE IF NOT EXISTS pin_reasons (
|
|
id SERIAL PRIMARY KEY,
|
|
item_type_id INTEGER REFERENCES item_types(id) ON DELETE CASCADE,
|
|
reason VARCHAR(100) NOT NULL,
|
|
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
|
|
);
|
|
|
|
-- L2 shares: includes content_type for role when shared
|
|
CREATE TABLE IF NOT EXISTS l2_shares (
|
|
id SERIAL PRIMARY KEY,
|
|
content_hash VARCHAR(64) REFERENCES cache_items(content_hash) ON DELETE CASCADE,
|
|
l2_server VARCHAR(255) NOT NULL,
|
|
asset_name VARCHAR(255) NOT NULL,
|
|
content_type VARCHAR(50) NOT NULL,
|
|
published_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
|
|
last_synced_at TIMESTAMP WITH TIME ZONE,
|
|
UNIQUE(content_hash, l2_server, content_type)
|
|
);
|
|
|
|
-- Indexes
|
|
CREATE INDEX IF NOT EXISTS idx_item_types_content_hash ON item_types(content_hash);
|
|
CREATE INDEX IF NOT EXISTS idx_item_types_type ON item_types(type);
|
|
CREATE INDEX IF NOT EXISTS idx_item_types_path ON item_types(path);
|
|
CREATE INDEX IF NOT EXISTS idx_pin_reasons_item_type ON pin_reasons(item_type_id);
|
|
CREATE INDEX IF NOT EXISTS idx_l2_shares_content_hash ON l2_shares(content_hash);
|
|
"""
|
|
|
|
|
|
async def init_db():
|
|
"""Initialize database connection pool and create schema."""
|
|
global pool
|
|
pool = await asyncpg.create_pool(DATABASE_URL)
|
|
async with pool.acquire() as conn:
|
|
await conn.execute(SCHEMA_SQL)
|
|
|
|
|
|
async def close_db():
|
|
"""Close database connection pool."""
|
|
global pool
|
|
if pool:
|
|
await pool.close()
|
|
pool = None
|
|
|
|
|
|
# ============ Cache Items ============
|
|
|
|
async def create_cache_item(content_hash: str, ipfs_cid: Optional[str] = None) -> dict:
|
|
"""Create a cache item. Returns the created item."""
|
|
async with pool.acquire() as conn:
|
|
row = await conn.fetchrow(
|
|
"""
|
|
INSERT INTO cache_items (content_hash, ipfs_cid)
|
|
VALUES ($1, $2)
|
|
ON CONFLICT (content_hash) DO UPDATE SET ipfs_cid = COALESCE($2, cache_items.ipfs_cid)
|
|
RETURNING content_hash, ipfs_cid, created_at
|
|
""",
|
|
content_hash, ipfs_cid
|
|
)
|
|
return dict(row)
|
|
|
|
|
|
async def get_cache_item(content_hash: str) -> Optional[dict]:
|
|
"""Get a cache item by content hash."""
|
|
async with pool.acquire() as conn:
|
|
row = await conn.fetchrow(
|
|
"SELECT content_hash, ipfs_cid, created_at FROM cache_items WHERE content_hash = $1",
|
|
content_hash
|
|
)
|
|
return dict(row) if row else None
|
|
|
|
|
|
async def update_cache_item_ipfs_cid(content_hash: str, ipfs_cid: str) -> bool:
|
|
"""Update the IPFS CID for a cache item."""
|
|
async with pool.acquire() as conn:
|
|
result = await conn.execute(
|
|
"UPDATE cache_items SET ipfs_cid = $2 WHERE content_hash = $1",
|
|
content_hash, ipfs_cid
|
|
)
|
|
return result == "UPDATE 1"
|
|
|
|
|
|
async def delete_cache_item(content_hash: str) -> bool:
|
|
"""Delete a cache item and all associated data (cascades)."""
|
|
async with pool.acquire() as conn:
|
|
result = await conn.execute(
|
|
"DELETE FROM cache_items WHERE content_hash = $1",
|
|
content_hash
|
|
)
|
|
return result == "DELETE 1"
|
|
|
|
|
|
async def list_cache_items(limit: int = 100, offset: int = 0) -> List[dict]:
|
|
"""List cache items with pagination."""
|
|
async with pool.acquire() as conn:
|
|
rows = await conn.fetch(
|
|
"""
|
|
SELECT content_hash, ipfs_cid, created_at
|
|
FROM cache_items
|
|
ORDER BY created_at DESC
|
|
LIMIT $1 OFFSET $2
|
|
""",
|
|
limit, offset
|
|
)
|
|
return [dict(row) for row in rows]
|
|
|
|
|
|
# ============ Item Types ============
|
|
|
|
async def add_item_type(
|
|
content_hash: str,
|
|
item_type: str,
|
|
path: Optional[str] = None,
|
|
description: Optional[str] = None,
|
|
source_type: Optional[str] = None,
|
|
source_url: Optional[str] = None,
|
|
source_note: Optional[str] = None,
|
|
) -> dict:
|
|
"""Add a type to a cache item. Creates cache_item if needed."""
|
|
async with pool.acquire() as conn:
|
|
# Ensure cache_item exists
|
|
await conn.execute(
|
|
"INSERT INTO cache_items (content_hash) VALUES ($1) ON CONFLICT DO NOTHING",
|
|
content_hash
|
|
)
|
|
# Insert or update item_type
|
|
row = await conn.fetchrow(
|
|
"""
|
|
INSERT INTO item_types (content_hash, type, path, description, source_type, source_url, source_note)
|
|
VALUES ($1, $2, $3, $4, $5, $6, $7)
|
|
ON CONFLICT (content_hash, type, path) DO UPDATE SET
|
|
description = COALESCE($4, item_types.description),
|
|
source_type = COALESCE($5, item_types.source_type),
|
|
source_url = COALESCE($6, item_types.source_url),
|
|
source_note = COALESCE($7, item_types.source_note)
|
|
RETURNING id, content_hash, type, path, description, source_type, source_url, source_note, pinned, created_at
|
|
""",
|
|
content_hash, item_type, path, description, source_type, source_url, source_note
|
|
)
|
|
return dict(row)
|
|
|
|
|
|
async def get_item_types(content_hash: str) -> List[dict]:
|
|
"""Get all types for a cache item."""
|
|
async with pool.acquire() as conn:
|
|
rows = await conn.fetch(
|
|
"""
|
|
SELECT id, content_hash, type, path, description, source_type, source_url, source_note, pinned, created_at
|
|
FROM item_types
|
|
WHERE content_hash = $1
|
|
ORDER BY created_at
|
|
""",
|
|
content_hash
|
|
)
|
|
return [dict(row) for row in rows]
|
|
|
|
|
|
async def get_item_type(content_hash: str, item_type: str, path: Optional[str] = None) -> Optional[dict]:
|
|
"""Get a specific type for a cache item."""
|
|
async with pool.acquire() as conn:
|
|
if path is None:
|
|
row = await conn.fetchrow(
|
|
"""
|
|
SELECT id, content_hash, type, path, description, source_type, source_url, source_note, pinned, created_at
|
|
FROM item_types
|
|
WHERE content_hash = $1 AND type = $2 AND path IS NULL
|
|
""",
|
|
content_hash, item_type
|
|
)
|
|
else:
|
|
row = await conn.fetchrow(
|
|
"""
|
|
SELECT id, content_hash, type, path, description, source_type, source_url, source_note, pinned, created_at
|
|
FROM item_types
|
|
WHERE content_hash = $1 AND type = $2 AND path = $3
|
|
""",
|
|
content_hash, item_type, path
|
|
)
|
|
return dict(row) if row else None
|
|
|
|
|
|
async def update_item_type(
|
|
item_type_id: int,
|
|
description: Optional[str] = None,
|
|
source_type: Optional[str] = None,
|
|
source_url: Optional[str] = None,
|
|
source_note: Optional[str] = None,
|
|
) -> bool:
|
|
"""Update an item type's metadata."""
|
|
async with pool.acquire() as conn:
|
|
result = await conn.execute(
|
|
"""
|
|
UPDATE item_types SET
|
|
description = COALESCE($2, description),
|
|
source_type = COALESCE($3, source_type),
|
|
source_url = COALESCE($4, source_url),
|
|
source_note = COALESCE($5, source_note)
|
|
WHERE id = $1
|
|
""",
|
|
item_type_id, description, source_type, source_url, source_note
|
|
)
|
|
return result == "UPDATE 1"
|
|
|
|
|
|
async def delete_item_type(content_hash: str, item_type: str, path: Optional[str] = None) -> bool:
|
|
"""Delete a specific type from a cache item."""
|
|
async with pool.acquire() as conn:
|
|
if path is None:
|
|
result = await conn.execute(
|
|
"DELETE FROM item_types WHERE content_hash = $1 AND type = $2 AND path IS NULL",
|
|
content_hash, item_type
|
|
)
|
|
else:
|
|
result = await conn.execute(
|
|
"DELETE FROM item_types WHERE content_hash = $1 AND type = $2 AND path = $3",
|
|
content_hash, item_type, path
|
|
)
|
|
return result == "DELETE 1"
|
|
|
|
|
|
async def list_items_by_type(item_type: str, limit: int = 100, offset: int = 0) -> List[dict]:
|
|
"""List all items of a specific type."""
|
|
async with pool.acquire() as conn:
|
|
rows = await conn.fetch(
|
|
"""
|
|
SELECT it.id, it.content_hash, it.type, it.path, it.description,
|
|
it.source_type, it.source_url, it.source_note, it.pinned, it.created_at,
|
|
ci.ipfs_cid
|
|
FROM item_types it
|
|
JOIN cache_items ci ON it.content_hash = ci.content_hash
|
|
WHERE it.type = $1
|
|
ORDER BY it.created_at DESC
|
|
LIMIT $2 OFFSET $3
|
|
""",
|
|
item_type, limit, offset
|
|
)
|
|
return [dict(row) for row in rows]
|
|
|
|
|
|
async def get_item_by_path(item_type: str, path: str) -> Optional[dict]:
|
|
"""Get an item by its type and path (e.g., recipe:/effects/dog)."""
|
|
async with pool.acquire() as conn:
|
|
row = await conn.fetchrow(
|
|
"""
|
|
SELECT it.id, it.content_hash, it.type, it.path, it.description,
|
|
it.source_type, it.source_url, it.source_note, it.pinned, it.created_at,
|
|
ci.ipfs_cid
|
|
FROM item_types it
|
|
JOIN cache_items ci ON it.content_hash = ci.content_hash
|
|
WHERE it.type = $1 AND it.path = $2
|
|
""",
|
|
item_type, path
|
|
)
|
|
return dict(row) if row else None
|
|
|
|
|
|
# ============ Pinning ============
|
|
|
|
async def pin_item_type(item_type_id: int, reason: str) -> bool:
|
|
"""Pin an item type with a reason."""
|
|
async with pool.acquire() as conn:
|
|
async with conn.transaction():
|
|
# Set pinned flag
|
|
await conn.execute(
|
|
"UPDATE item_types SET pinned = TRUE WHERE id = $1",
|
|
item_type_id
|
|
)
|
|
# Add pin reason
|
|
await conn.execute(
|
|
"INSERT INTO pin_reasons (item_type_id, reason) VALUES ($1, $2)",
|
|
item_type_id, reason
|
|
)
|
|
return True
|
|
|
|
|
|
async def unpin_item_type(item_type_id: int, reason: Optional[str] = None) -> bool:
|
|
"""Remove a pin reason from an item type. If no reasons left, unpins the item."""
|
|
async with pool.acquire() as conn:
|
|
async with conn.transaction():
|
|
if reason:
|
|
# Remove specific reason
|
|
await conn.execute(
|
|
"DELETE FROM pin_reasons WHERE item_type_id = $1 AND reason = $2",
|
|
item_type_id, reason
|
|
)
|
|
else:
|
|
# Remove all reasons
|
|
await conn.execute(
|
|
"DELETE FROM pin_reasons WHERE item_type_id = $1",
|
|
item_type_id
|
|
)
|
|
|
|
# Check if any reasons remain
|
|
count = await conn.fetchval(
|
|
"SELECT COUNT(*) FROM pin_reasons WHERE item_type_id = $1",
|
|
item_type_id
|
|
)
|
|
|
|
if count == 0:
|
|
await conn.execute(
|
|
"UPDATE item_types SET pinned = FALSE WHERE id = $1",
|
|
item_type_id
|
|
)
|
|
return True
|
|
|
|
|
|
async def get_pin_reasons(item_type_id: int) -> List[dict]:
|
|
"""Get all pin reasons for an item type."""
|
|
async with pool.acquire() as conn:
|
|
rows = await conn.fetch(
|
|
"SELECT id, reason, created_at FROM pin_reasons WHERE item_type_id = $1 ORDER BY created_at",
|
|
item_type_id
|
|
)
|
|
return [dict(row) for row in rows]
|
|
|
|
|
|
async def is_item_pinned(content_hash: str, item_type: Optional[str] = None) -> tuple[bool, List[str]]:
|
|
"""Check if any type of a cache item is pinned. Returns (is_pinned, reasons)."""
|
|
async with pool.acquire() as conn:
|
|
if item_type:
|
|
rows = await conn.fetch(
|
|
"""
|
|
SELECT pr.reason
|
|
FROM pin_reasons pr
|
|
JOIN item_types it ON pr.item_type_id = it.id
|
|
WHERE it.content_hash = $1 AND it.type = $2 AND it.pinned = TRUE
|
|
""",
|
|
content_hash, item_type
|
|
)
|
|
else:
|
|
rows = await conn.fetch(
|
|
"""
|
|
SELECT pr.reason
|
|
FROM pin_reasons pr
|
|
JOIN item_types it ON pr.item_type_id = it.id
|
|
WHERE it.content_hash = $1 AND it.pinned = TRUE
|
|
""",
|
|
content_hash
|
|
)
|
|
reasons = [row["reason"] for row in rows]
|
|
return len(reasons) > 0, reasons
|
|
|
|
|
|
# ============ L2 Shares ============
|
|
|
|
async def add_l2_share(
|
|
content_hash: str,
|
|
l2_server: str,
|
|
asset_name: str,
|
|
content_type: str,
|
|
) -> dict:
|
|
"""Add or update an L2 share."""
|
|
async with pool.acquire() as conn:
|
|
row = await conn.fetchrow(
|
|
"""
|
|
INSERT INTO l2_shares (content_hash, l2_server, asset_name, content_type, last_synced_at)
|
|
VALUES ($1, $2, $3, $4, NOW())
|
|
ON CONFLICT (content_hash, l2_server, content_type) DO UPDATE SET
|
|
asset_name = $3,
|
|
last_synced_at = NOW()
|
|
RETURNING id, content_hash, l2_server, asset_name, content_type, published_at, last_synced_at
|
|
""",
|
|
content_hash, l2_server, asset_name, content_type
|
|
)
|
|
return dict(row)
|
|
|
|
|
|
async def get_l2_shares(content_hash: str) -> List[dict]:
|
|
"""Get all L2 shares for a cache item."""
|
|
async with pool.acquire() as conn:
|
|
rows = await conn.fetch(
|
|
"""
|
|
SELECT id, content_hash, l2_server, asset_name, content_type, published_at, last_synced_at
|
|
FROM l2_shares
|
|
WHERE content_hash = $1
|
|
ORDER BY published_at
|
|
""",
|
|
content_hash
|
|
)
|
|
return [dict(row) for row in rows]
|
|
|
|
|
|
async def delete_l2_share(content_hash: str, l2_server: str, content_type: str) -> bool:
|
|
"""Delete an L2 share."""
|
|
async with pool.acquire() as conn:
|
|
result = await conn.execute(
|
|
"DELETE FROM l2_shares WHERE content_hash = $1 AND l2_server = $2 AND content_type = $3",
|
|
content_hash, l2_server, content_type
|
|
)
|
|
return result == "DELETE 1"
|