1059 lines
37 KiB
Python
1059 lines
37 KiB
Python
"""
|
|
Database module for Art DAG L2 Server.
|
|
|
|
Uses asyncpg for async PostgreSQL access with connection pooling.
|
|
"""
|
|
|
|
import json
|
|
import os
|
|
from datetime import datetime, timezone
|
|
from typing import Optional
|
|
from contextlib import asynccontextmanager
|
|
|
|
import asyncpg
|
|
|
|
# Connection pool (initialized on startup)
|
|
|
|
|
|
def _parse_timestamp(ts) -> datetime:
|
|
"""Parse a timestamp string or datetime to datetime object."""
|
|
if ts is None:
|
|
return datetime.now(timezone.utc)
|
|
if isinstance(ts, datetime):
|
|
return ts
|
|
# Parse ISO format string
|
|
if isinstance(ts, str):
|
|
if ts.endswith('Z'):
|
|
ts = ts[:-1] + '+00:00'
|
|
return datetime.fromisoformat(ts)
|
|
return datetime.now(timezone.utc)
|
|
|
|
|
|
_pool: Optional[asyncpg.Pool] = None
|
|
|
|
# Configuration from environment
|
|
DATABASE_URL = os.environ.get(
|
|
"DATABASE_URL",
|
|
"postgresql://artdag:artdag@localhost:5432/artdag"
|
|
)
|
|
|
|
# Schema for database initialization
|
|
SCHEMA = """
|
|
-- Users table
|
|
CREATE TABLE IF NOT EXISTS users (
|
|
username VARCHAR(255) PRIMARY KEY,
|
|
password_hash VARCHAR(255) NOT NULL,
|
|
email VARCHAR(255),
|
|
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
|
|
);
|
|
|
|
-- Assets table
|
|
CREATE TABLE IF NOT EXISTS assets (
|
|
name VARCHAR(255) PRIMARY KEY,
|
|
content_hash VARCHAR(128) NOT NULL,
|
|
ipfs_cid VARCHAR(128),
|
|
asset_type VARCHAR(50) NOT NULL,
|
|
tags JSONB DEFAULT '[]'::jsonb,
|
|
metadata JSONB DEFAULT '{}'::jsonb,
|
|
url TEXT,
|
|
provenance JSONB,
|
|
description TEXT,
|
|
origin JSONB,
|
|
owner VARCHAR(255) NOT NULL REFERENCES users(username),
|
|
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
|
|
updated_at TIMESTAMPTZ
|
|
);
|
|
|
|
-- Activities table (activity_id is content-addressable run_id hash)
|
|
CREATE TABLE IF NOT EXISTS activities (
|
|
activity_id VARCHAR(64) PRIMARY KEY,
|
|
activity_type VARCHAR(50) NOT NULL,
|
|
actor_id TEXT NOT NULL,
|
|
object_data JSONB NOT NULL,
|
|
published TIMESTAMPTZ NOT NULL,
|
|
signature JSONB,
|
|
anchor_root VARCHAR(64) -- Merkle root this activity is anchored to
|
|
);
|
|
|
|
-- Anchors table (Bitcoin timestamps via OpenTimestamps)
|
|
CREATE TABLE IF NOT EXISTS anchors (
|
|
id SERIAL PRIMARY KEY,
|
|
merkle_root VARCHAR(64) NOT NULL UNIQUE,
|
|
tree_ipfs_cid VARCHAR(128),
|
|
ots_proof_cid VARCHAR(128),
|
|
activity_count INTEGER NOT NULL,
|
|
first_activity_id VARCHAR(64),
|
|
last_activity_id VARCHAR(64),
|
|
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
|
|
confirmed_at TIMESTAMPTZ,
|
|
bitcoin_txid VARCHAR(64)
|
|
);
|
|
|
|
-- Followers table
|
|
CREATE TABLE IF NOT EXISTS followers (
|
|
id SERIAL PRIMARY KEY,
|
|
username VARCHAR(255) NOT NULL REFERENCES users(username),
|
|
acct VARCHAR(255) NOT NULL,
|
|
url TEXT NOT NULL,
|
|
public_key TEXT,
|
|
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
|
|
UNIQUE(username, acct)
|
|
);
|
|
|
|
-- User's attached L1 renderers
|
|
CREATE TABLE IF NOT EXISTS user_renderers (
|
|
id SERIAL PRIMARY KEY,
|
|
username VARCHAR(255) NOT NULL REFERENCES users(username),
|
|
l1_url TEXT NOT NULL,
|
|
attached_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
|
|
UNIQUE(username, l1_url)
|
|
);
|
|
|
|
-- Revoked tokens (for federated logout)
|
|
CREATE TABLE IF NOT EXISTS revoked_tokens (
|
|
token_hash VARCHAR(64) PRIMARY KEY,
|
|
username VARCHAR(255) NOT NULL,
|
|
revoked_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
|
|
expires_at TIMESTAMPTZ NOT NULL
|
|
);
|
|
|
|
-- User storage providers (IPFS pinning services, local storage, etc.)
|
|
-- Users can have multiple configs of the same provider type
|
|
CREATE TABLE IF NOT EXISTS user_storage (
|
|
id SERIAL PRIMARY KEY,
|
|
username VARCHAR(255) NOT NULL REFERENCES users(username),
|
|
provider_type VARCHAR(50) NOT NULL, -- 'pinata', 'web3storage', 'nftstorage', 'infura', 'filebase', 'storj', 'local'
|
|
provider_name VARCHAR(255), -- User-friendly name
|
|
description TEXT, -- User description to distinguish configs
|
|
config JSONB NOT NULL DEFAULT '{}', -- API keys, endpoints, paths
|
|
capacity_gb INTEGER NOT NULL, -- Total capacity user is contributing
|
|
is_active BOOLEAN DEFAULT true,
|
|
created_at TIMESTAMPTZ DEFAULT NOW(),
|
|
updated_at TIMESTAMPTZ DEFAULT NOW()
|
|
);
|
|
|
|
-- Track what's stored where
|
|
CREATE TABLE IF NOT EXISTS storage_pins (
|
|
id SERIAL PRIMARY KEY,
|
|
content_hash VARCHAR(64) NOT NULL,
|
|
storage_id INTEGER NOT NULL REFERENCES user_storage(id) ON DELETE CASCADE,
|
|
ipfs_cid VARCHAR(128),
|
|
pin_type VARCHAR(20) NOT NULL, -- 'user_content', 'donated', 'system'
|
|
size_bytes BIGINT,
|
|
pinned_at TIMESTAMPTZ DEFAULT NOW(),
|
|
UNIQUE(content_hash, storage_id)
|
|
);
|
|
|
|
-- Indexes
|
|
CREATE INDEX IF NOT EXISTS idx_users_created_at ON users(created_at);
|
|
CREATE INDEX IF NOT EXISTS idx_assets_content_hash ON assets(content_hash);
|
|
CREATE INDEX IF NOT EXISTS idx_assets_owner ON assets(owner);
|
|
CREATE INDEX IF NOT EXISTS idx_assets_created_at ON assets(created_at DESC);
|
|
CREATE INDEX IF NOT EXISTS idx_assets_tags ON assets USING GIN(tags);
|
|
CREATE INDEX IF NOT EXISTS idx_activities_actor_id ON activities(actor_id);
|
|
CREATE INDEX IF NOT EXISTS idx_activities_published ON activities(published DESC);
|
|
CREATE INDEX IF NOT EXISTS idx_activities_anchor ON activities(anchor_root);
|
|
CREATE INDEX IF NOT EXISTS idx_anchors_created ON anchors(created_at DESC);
|
|
CREATE INDEX IF NOT EXISTS idx_followers_username ON followers(username);
|
|
CREATE INDEX IF NOT EXISTS idx_revoked_tokens_expires ON revoked_tokens(expires_at);
|
|
CREATE INDEX IF NOT EXISTS idx_user_storage_username ON user_storage(username);
|
|
CREATE INDEX IF NOT EXISTS idx_storage_pins_hash ON storage_pins(content_hash);
|
|
CREATE INDEX IF NOT EXISTS idx_storage_pins_storage ON storage_pins(storage_id);
|
|
|
|
-- Add source URL columns to assets if they don't exist
|
|
DO $$ BEGIN
|
|
ALTER TABLE assets ADD COLUMN source_url TEXT;
|
|
EXCEPTION WHEN duplicate_column THEN NULL;
|
|
END $$;
|
|
|
|
DO $$ BEGIN
|
|
ALTER TABLE assets ADD COLUMN source_type VARCHAR(50);
|
|
EXCEPTION WHEN duplicate_column THEN NULL;
|
|
END $$;
|
|
|
|
-- Add description column to user_storage if it doesn't exist
|
|
DO $$ BEGIN
|
|
ALTER TABLE user_storage ADD COLUMN description TEXT;
|
|
EXCEPTION WHEN duplicate_column THEN NULL;
|
|
END $$;
|
|
"""
|
|
|
|
|
|
async def init_pool():
|
|
"""Initialize the connection pool and create tables. Call on app startup."""
|
|
global _pool
|
|
_pool = await asyncpg.create_pool(
|
|
DATABASE_URL,
|
|
min_size=2,
|
|
max_size=10,
|
|
command_timeout=60
|
|
)
|
|
# Create tables if they don't exist
|
|
async with _pool.acquire() as conn:
|
|
await conn.execute(SCHEMA)
|
|
|
|
|
|
async def close_pool():
|
|
"""Close the connection pool. Call on app shutdown."""
|
|
global _pool
|
|
if _pool:
|
|
await _pool.close()
|
|
_pool = None
|
|
|
|
|
|
def get_pool() -> asyncpg.Pool:
|
|
"""Get the connection pool."""
|
|
if _pool is None:
|
|
raise RuntimeError("Database pool not initialized")
|
|
return _pool
|
|
|
|
|
|
@asynccontextmanager
|
|
async def get_connection():
|
|
"""Get a connection from the pool."""
|
|
async with get_pool().acquire() as conn:
|
|
yield conn
|
|
|
|
|
|
@asynccontextmanager
|
|
async def transaction():
|
|
"""
|
|
Get a connection with an active transaction.
|
|
|
|
Usage:
|
|
async with db.transaction() as conn:
|
|
await create_asset_tx(conn, asset1)
|
|
await create_asset_tx(conn, asset2)
|
|
await create_activity_tx(conn, activity)
|
|
# Commits on exit, rolls back on exception
|
|
"""
|
|
async with get_pool().acquire() as conn:
|
|
async with conn.transaction():
|
|
yield conn
|
|
|
|
|
|
# ============ Users ============
|
|
|
|
async def get_user(username: str) -> Optional[dict]:
|
|
"""Get user by username."""
|
|
async with get_connection() as conn:
|
|
row = await conn.fetchrow(
|
|
"SELECT username, password_hash, email, created_at FROM users WHERE username = $1",
|
|
username
|
|
)
|
|
if row:
|
|
return dict(row)
|
|
return None
|
|
|
|
|
|
async def get_all_users() -> dict[str, dict]:
|
|
"""Get all users as a dict indexed by username."""
|
|
async with get_connection() as conn:
|
|
rows = await conn.fetch(
|
|
"SELECT username, password_hash, email, created_at FROM users ORDER BY username"
|
|
)
|
|
return {row["username"]: dict(row) for row in rows}
|
|
|
|
|
|
async def create_user(username: str, password_hash: str, email: Optional[str] = None) -> dict:
|
|
"""Create a new user."""
|
|
async with get_connection() as conn:
|
|
row = await conn.fetchrow(
|
|
"""INSERT INTO users (username, password_hash, email)
|
|
VALUES ($1, $2, $3)
|
|
RETURNING username, password_hash, email, created_at""",
|
|
username, password_hash, email
|
|
)
|
|
return dict(row)
|
|
|
|
|
|
async def user_exists(username: str) -> bool:
|
|
"""Check if user exists."""
|
|
async with get_connection() as conn:
|
|
result = await conn.fetchval(
|
|
"SELECT EXISTS(SELECT 1 FROM users WHERE username = $1)",
|
|
username
|
|
)
|
|
return result
|
|
|
|
|
|
# ============ Assets ============
|
|
|
|
async def get_asset(name: str) -> Optional[dict]:
|
|
"""Get asset by name."""
|
|
async with get_connection() as conn:
|
|
row = await conn.fetchrow(
|
|
"""SELECT name, content_hash, ipfs_cid, asset_type, tags, metadata, url,
|
|
provenance, description, origin, owner, created_at, updated_at
|
|
FROM assets WHERE name = $1""",
|
|
name
|
|
)
|
|
if row:
|
|
return _parse_asset_row(row)
|
|
return None
|
|
|
|
|
|
async def get_asset_by_hash(content_hash: str) -> Optional[dict]:
|
|
"""Get asset by content hash."""
|
|
async with get_connection() as conn:
|
|
row = await conn.fetchrow(
|
|
"""SELECT name, content_hash, ipfs_cid, asset_type, tags, metadata, url,
|
|
provenance, description, origin, owner, created_at, updated_at
|
|
FROM assets WHERE content_hash = $1""",
|
|
content_hash
|
|
)
|
|
if row:
|
|
return _parse_asset_row(row)
|
|
return None
|
|
|
|
|
|
async def get_asset_by_run_id(run_id: str) -> Optional[dict]:
|
|
"""Get asset by run_id stored in provenance."""
|
|
async with get_connection() as conn:
|
|
row = await conn.fetchrow(
|
|
"""SELECT name, content_hash, ipfs_cid, asset_type, tags, metadata, url,
|
|
provenance, description, origin, owner, created_at, updated_at
|
|
FROM assets WHERE provenance->>'run_id' = $1""",
|
|
run_id
|
|
)
|
|
if row:
|
|
return _parse_asset_row(row)
|
|
return None
|
|
|
|
|
|
async def get_all_assets() -> dict[str, dict]:
|
|
"""Get all assets as a dict indexed by name."""
|
|
async with get_connection() as conn:
|
|
rows = await conn.fetch(
|
|
"""SELECT name, content_hash, ipfs_cid, asset_type, tags, metadata, url,
|
|
provenance, description, origin, owner, created_at, updated_at
|
|
FROM assets ORDER BY created_at DESC"""
|
|
)
|
|
return {row["name"]: _parse_asset_row(row) for row in rows}
|
|
|
|
|
|
async def get_assets_paginated(limit: int = 100, offset: int = 0) -> tuple[list[tuple[str, dict]], int]:
|
|
"""Get paginated assets, returns (list of (name, asset) tuples, total_count)."""
|
|
async with get_connection() as conn:
|
|
total = await conn.fetchval("SELECT COUNT(*) FROM assets")
|
|
rows = await conn.fetch(
|
|
"""SELECT name, content_hash, ipfs_cid, asset_type, tags, metadata, url,
|
|
provenance, description, origin, owner, created_at, updated_at
|
|
FROM assets ORDER BY created_at DESC LIMIT $1 OFFSET $2""",
|
|
limit, offset
|
|
)
|
|
return [(row["name"], _parse_asset_row(row)) for row in rows], total
|
|
|
|
|
|
async def get_assets_by_owner(owner: str) -> dict[str, dict]:
|
|
"""Get all assets owned by a user."""
|
|
async with get_connection() as conn:
|
|
rows = await conn.fetch(
|
|
"""SELECT name, content_hash, ipfs_cid, asset_type, tags, metadata, url,
|
|
provenance, description, origin, owner, created_at, updated_at
|
|
FROM assets WHERE owner = $1 ORDER BY created_at DESC""",
|
|
owner
|
|
)
|
|
return {row["name"]: _parse_asset_row(row) for row in rows}
|
|
|
|
|
|
async def create_asset(asset: dict) -> dict:
|
|
"""Create a new asset."""
|
|
async with get_connection() as conn:
|
|
row = await conn.fetchrow(
|
|
"""INSERT INTO assets (name, content_hash, ipfs_cid, asset_type, tags, metadata,
|
|
url, provenance, description, origin, owner, created_at)
|
|
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)
|
|
RETURNING *""",
|
|
asset["name"],
|
|
asset["content_hash"],
|
|
asset.get("ipfs_cid"),
|
|
asset["asset_type"],
|
|
json.dumps(asset.get("tags", [])),
|
|
json.dumps(asset.get("metadata", {})),
|
|
asset.get("url"),
|
|
json.dumps(asset.get("provenance")) if asset.get("provenance") else None,
|
|
asset.get("description"),
|
|
json.dumps(asset.get("origin")) if asset.get("origin") else None,
|
|
asset["owner"],
|
|
_parse_timestamp(asset.get("created_at"))
|
|
)
|
|
return _parse_asset_row(row)
|
|
|
|
|
|
async def update_asset(name: str, updates: dict) -> Optional[dict]:
|
|
"""Update an existing asset."""
|
|
# Build dynamic UPDATE query
|
|
set_clauses = []
|
|
values = []
|
|
idx = 1
|
|
|
|
for key, value in updates.items():
|
|
if key in ("tags", "metadata", "provenance", "origin"):
|
|
set_clauses.append(f"{key} = ${idx}")
|
|
values.append(json.dumps(value) if value is not None else None)
|
|
else:
|
|
set_clauses.append(f"{key} = ${idx}")
|
|
values.append(value)
|
|
idx += 1
|
|
|
|
set_clauses.append(f"updated_at = ${idx}")
|
|
values.append(datetime.now(timezone.utc))
|
|
idx += 1
|
|
|
|
values.append(name) # WHERE clause
|
|
|
|
async with get_connection() as conn:
|
|
row = await conn.fetchrow(
|
|
f"""UPDATE assets SET {', '.join(set_clauses)}
|
|
WHERE name = ${idx} RETURNING *""",
|
|
*values
|
|
)
|
|
if row:
|
|
return _parse_asset_row(row)
|
|
return None
|
|
|
|
|
|
async def asset_exists(name: str) -> bool:
|
|
"""Check if asset exists."""
|
|
async with get_connection() as conn:
|
|
return await conn.fetchval(
|
|
"SELECT EXISTS(SELECT 1 FROM assets WHERE name = $1)",
|
|
name
|
|
)
|
|
|
|
|
|
def _parse_asset_row(row) -> dict:
|
|
"""Parse a database row into an asset dict, handling JSONB fields."""
|
|
asset = dict(row)
|
|
# Convert datetime to ISO string
|
|
if asset.get("created_at"):
|
|
asset["created_at"] = asset["created_at"].isoformat()
|
|
if asset.get("updated_at"):
|
|
asset["updated_at"] = asset["updated_at"].isoformat()
|
|
# Ensure JSONB fields are dicts (handle string case)
|
|
for field in ("tags", "metadata", "provenance", "origin"):
|
|
if isinstance(asset.get(field), str):
|
|
try:
|
|
asset[field] = json.loads(asset[field])
|
|
except (json.JSONDecodeError, TypeError):
|
|
pass
|
|
return asset
|
|
|
|
|
|
# ============ Assets (Transaction variants) ============
|
|
|
|
async def get_asset_by_hash_tx(conn, content_hash: str) -> Optional[dict]:
|
|
"""Get asset by content hash within a transaction."""
|
|
row = await conn.fetchrow(
|
|
"""SELECT name, content_hash, ipfs_cid, asset_type, tags, metadata, url,
|
|
provenance, description, origin, owner, created_at, updated_at
|
|
FROM assets WHERE content_hash = $1""",
|
|
content_hash
|
|
)
|
|
if row:
|
|
return _parse_asset_row(row)
|
|
return None
|
|
|
|
|
|
async def asset_exists_by_name_tx(conn, name: str) -> bool:
|
|
"""Check if asset name exists within a transaction."""
|
|
return await conn.fetchval(
|
|
"SELECT EXISTS(SELECT 1 FROM assets WHERE name = $1)",
|
|
name
|
|
)
|
|
|
|
|
|
async def create_asset_tx(conn, asset: dict) -> dict:
|
|
"""Create a new asset within a transaction."""
|
|
row = await conn.fetchrow(
|
|
"""INSERT INTO assets (name, content_hash, ipfs_cid, asset_type, tags, metadata,
|
|
url, provenance, description, origin, owner, created_at)
|
|
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)
|
|
RETURNING *""",
|
|
asset["name"],
|
|
asset["content_hash"],
|
|
asset.get("ipfs_cid"),
|
|
asset["asset_type"],
|
|
json.dumps(asset.get("tags", [])),
|
|
json.dumps(asset.get("metadata", {})),
|
|
asset.get("url"),
|
|
json.dumps(asset.get("provenance")) if asset.get("provenance") else None,
|
|
asset.get("description"),
|
|
json.dumps(asset.get("origin")) if asset.get("origin") else None,
|
|
asset["owner"],
|
|
_parse_timestamp(asset.get("created_at"))
|
|
)
|
|
return _parse_asset_row(row)
|
|
|
|
|
|
# ============ Activities ============
|
|
|
|
async def get_activity(activity_id: str) -> Optional[dict]:
|
|
"""Get activity by ID."""
|
|
async with get_connection() as conn:
|
|
row = await conn.fetchrow(
|
|
"""SELECT activity_id, activity_type, actor_id, object_data, published, signature
|
|
FROM activities WHERE activity_id = $1""",
|
|
activity_id
|
|
)
|
|
if row:
|
|
return _parse_activity_row(row)
|
|
return None
|
|
|
|
|
|
async def get_activity_by_index(index: int) -> Optional[dict]:
|
|
"""Get activity by index (for backward compatibility with URL scheme)."""
|
|
async with get_connection() as conn:
|
|
row = await conn.fetchrow(
|
|
"""SELECT activity_id, activity_type, actor_id, object_data, published, signature
|
|
FROM activities ORDER BY published ASC LIMIT 1 OFFSET $1""",
|
|
index
|
|
)
|
|
if row:
|
|
return _parse_activity_row(row)
|
|
return None
|
|
|
|
|
|
async def get_all_activities() -> list[dict]:
|
|
"""Get all activities ordered by published date (oldest first for index compatibility)."""
|
|
async with get_connection() as conn:
|
|
rows = await conn.fetch(
|
|
"""SELECT activity_id, activity_type, actor_id, object_data, published, signature
|
|
FROM activities ORDER BY published ASC"""
|
|
)
|
|
return [_parse_activity_row(row) for row in rows]
|
|
|
|
|
|
async def get_activities_paginated(limit: int = 100, offset: int = 0) -> tuple[list[dict], int]:
|
|
"""Get paginated activities (newest first), returns (activities, total_count)."""
|
|
async with get_connection() as conn:
|
|
total = await conn.fetchval("SELECT COUNT(*) FROM activities")
|
|
rows = await conn.fetch(
|
|
"""SELECT activity_id, activity_type, actor_id, object_data, published, signature
|
|
FROM activities ORDER BY published DESC LIMIT $1 OFFSET $2""",
|
|
limit, offset
|
|
)
|
|
return [_parse_activity_row(row) for row in rows], total
|
|
|
|
|
|
async def get_activities_by_actor(actor_id: str) -> list[dict]:
|
|
"""Get all activities by an actor."""
|
|
async with get_connection() as conn:
|
|
rows = await conn.fetch(
|
|
"""SELECT activity_id, activity_type, actor_id, object_data, published, signature
|
|
FROM activities WHERE actor_id = $1 ORDER BY published DESC""",
|
|
actor_id
|
|
)
|
|
return [_parse_activity_row(row) for row in rows]
|
|
|
|
|
|
async def create_activity(activity: dict) -> dict:
|
|
"""Create a new activity."""
|
|
async with get_connection() as conn:
|
|
row = await conn.fetchrow(
|
|
"""INSERT INTO activities (activity_id, activity_type, actor_id, object_data, published, signature)
|
|
VALUES ($1, $2, $3, $4, $5, $6)
|
|
RETURNING *""",
|
|
activity["activity_id"],
|
|
activity["activity_type"],
|
|
activity["actor_id"],
|
|
json.dumps(activity["object_data"]),
|
|
_parse_timestamp(activity["published"]),
|
|
json.dumps(activity.get("signature")) if activity.get("signature") else None
|
|
)
|
|
return _parse_activity_row(row)
|
|
|
|
|
|
async def count_activities() -> int:
|
|
"""Get total activity count."""
|
|
async with get_connection() as conn:
|
|
return await conn.fetchval("SELECT COUNT(*) FROM activities")
|
|
|
|
|
|
def _parse_activity_row(row) -> dict:
|
|
"""Parse a database row into an activity dict, handling JSONB fields."""
|
|
activity = dict(row)
|
|
# Convert datetime to ISO string
|
|
if activity.get("published"):
|
|
activity["published"] = activity["published"].isoformat()
|
|
# Ensure JSONB fields are dicts (handle string case)
|
|
for field in ("object_data", "signature"):
|
|
if isinstance(activity.get(field), str):
|
|
try:
|
|
activity[field] = json.loads(activity[field])
|
|
except (json.JSONDecodeError, TypeError):
|
|
pass
|
|
return activity
|
|
|
|
|
|
# ============ Activities (Transaction variants) ============
|
|
|
|
async def create_activity_tx(conn, activity: dict) -> dict:
|
|
"""Create a new activity within a transaction."""
|
|
row = await conn.fetchrow(
|
|
"""INSERT INTO activities (activity_id, activity_type, actor_id, object_data, published, signature)
|
|
VALUES ($1, $2, $3, $4, $5, $6)
|
|
RETURNING *""",
|
|
activity["activity_id"],
|
|
activity["activity_type"],
|
|
activity["actor_id"],
|
|
json.dumps(activity["object_data"]),
|
|
_parse_timestamp(activity["published"]),
|
|
json.dumps(activity.get("signature")) if activity.get("signature") else None
|
|
)
|
|
return _parse_activity_row(row)
|
|
|
|
|
|
# ============ Followers ============
|
|
|
|
async def get_followers(username: str) -> list[dict]:
|
|
"""Get followers for a user."""
|
|
async with get_connection() as conn:
|
|
rows = await conn.fetch(
|
|
"""SELECT id, username, acct, url, public_key, created_at
|
|
FROM followers WHERE username = $1""",
|
|
username
|
|
)
|
|
return [dict(row) for row in rows]
|
|
|
|
|
|
async def get_all_followers() -> list:
|
|
"""Get all followers (for backward compatibility with old global list)."""
|
|
async with get_connection() as conn:
|
|
rows = await conn.fetch(
|
|
"""SELECT DISTINCT url FROM followers"""
|
|
)
|
|
return [row["url"] for row in rows]
|
|
|
|
|
|
async def add_follower(username: str, acct: str, url: str, public_key: Optional[str] = None) -> dict:
|
|
"""Add a follower."""
|
|
async with get_connection() as conn:
|
|
row = await conn.fetchrow(
|
|
"""INSERT INTO followers (username, acct, url, public_key)
|
|
VALUES ($1, $2, $3, $4)
|
|
ON CONFLICT (username, acct) DO UPDATE SET url = $3, public_key = $4
|
|
RETURNING *""",
|
|
username, acct, url, public_key
|
|
)
|
|
return dict(row)
|
|
|
|
|
|
async def remove_follower(username: str, acct: str) -> bool:
|
|
"""Remove a follower."""
|
|
async with get_connection() as conn:
|
|
result = await conn.execute(
|
|
"DELETE FROM followers WHERE username = $1 AND acct = $2",
|
|
username, acct
|
|
)
|
|
return result == "DELETE 1"
|
|
|
|
|
|
# ============ Stats ============
|
|
|
|
async def get_stats() -> dict:
|
|
"""Get counts for dashboard."""
|
|
async with get_connection() as conn:
|
|
assets = await conn.fetchval("SELECT COUNT(*) FROM assets")
|
|
activities = await conn.fetchval("SELECT COUNT(*) FROM activities")
|
|
users = await conn.fetchval("SELECT COUNT(*) FROM users")
|
|
return {"assets": assets, "activities": activities, "users": users}
|
|
|
|
|
|
# ============ Anchors (Bitcoin timestamps) ============
|
|
|
|
async def get_unanchored_activities() -> list[dict]:
|
|
"""Get all activities not yet anchored to Bitcoin."""
|
|
async with get_connection() as conn:
|
|
rows = await conn.fetch(
|
|
"""SELECT activity_id, activity_type, actor_id, object_data, published, signature
|
|
FROM activities WHERE anchor_root IS NULL ORDER BY published ASC"""
|
|
)
|
|
return [_parse_activity_row(row) for row in rows]
|
|
|
|
|
|
async def create_anchor(anchor: dict) -> dict:
|
|
"""Create an anchor record."""
|
|
async with get_connection() as conn:
|
|
row = await conn.fetchrow(
|
|
"""INSERT INTO anchors (merkle_root, tree_ipfs_cid, ots_proof_cid,
|
|
activity_count, first_activity_id, last_activity_id)
|
|
VALUES ($1, $2, $3, $4, $5, $6)
|
|
RETURNING *""",
|
|
anchor["merkle_root"],
|
|
anchor.get("tree_ipfs_cid"),
|
|
anchor.get("ots_proof_cid"),
|
|
anchor["activity_count"],
|
|
anchor.get("first_activity_id"),
|
|
anchor.get("last_activity_id")
|
|
)
|
|
return dict(row)
|
|
|
|
|
|
async def mark_activities_anchored(activity_ids: list[str], merkle_root: str) -> int:
|
|
"""Mark activities as anchored with the given merkle root."""
|
|
async with get_connection() as conn:
|
|
result = await conn.execute(
|
|
"""UPDATE activities SET anchor_root = $1
|
|
WHERE activity_id = ANY($2::text[])""",
|
|
merkle_root,
|
|
activity_ids
|
|
)
|
|
# Returns "UPDATE N"
|
|
return int(result.split()[1]) if result else 0
|
|
|
|
|
|
async def get_anchor(merkle_root: str) -> Optional[dict]:
|
|
"""Get anchor by merkle root."""
|
|
async with get_connection() as conn:
|
|
row = await conn.fetchrow(
|
|
"SELECT * FROM anchors WHERE merkle_root = $1",
|
|
merkle_root
|
|
)
|
|
if row:
|
|
result = dict(row)
|
|
if result.get("first_activity_id"):
|
|
result["first_activity_id"] = str(result["first_activity_id"])
|
|
if result.get("last_activity_id"):
|
|
result["last_activity_id"] = str(result["last_activity_id"])
|
|
if result.get("created_at"):
|
|
result["created_at"] = result["created_at"].isoformat()
|
|
if result.get("confirmed_at"):
|
|
result["confirmed_at"] = result["confirmed_at"].isoformat()
|
|
return result
|
|
return None
|
|
|
|
|
|
async def get_all_anchors() -> list[dict]:
|
|
"""Get all anchors, newest first."""
|
|
async with get_connection() as conn:
|
|
rows = await conn.fetch(
|
|
"SELECT * FROM anchors ORDER BY created_at DESC"
|
|
)
|
|
results = []
|
|
for row in rows:
|
|
result = dict(row)
|
|
if result.get("first_activity_id"):
|
|
result["first_activity_id"] = str(result["first_activity_id"])
|
|
if result.get("last_activity_id"):
|
|
result["last_activity_id"] = str(result["last_activity_id"])
|
|
if result.get("created_at"):
|
|
result["created_at"] = result["created_at"].isoformat()
|
|
if result.get("confirmed_at"):
|
|
result["confirmed_at"] = result["confirmed_at"].isoformat()
|
|
results.append(result)
|
|
return results
|
|
|
|
|
|
async def update_anchor_confirmed(merkle_root: str, bitcoin_txid: str) -> bool:
|
|
"""Mark anchor as confirmed with Bitcoin txid."""
|
|
async with get_connection() as conn:
|
|
result = await conn.execute(
|
|
"""UPDATE anchors SET confirmed_at = NOW(), bitcoin_txid = $1
|
|
WHERE merkle_root = $2""",
|
|
bitcoin_txid, merkle_root
|
|
)
|
|
return result == "UPDATE 1"
|
|
|
|
|
|
async def get_anchor_stats() -> dict:
|
|
"""Get anchoring statistics."""
|
|
async with get_connection() as conn:
|
|
total_anchors = await conn.fetchval("SELECT COUNT(*) FROM anchors")
|
|
confirmed_anchors = await conn.fetchval(
|
|
"SELECT COUNT(*) FROM anchors WHERE confirmed_at IS NOT NULL"
|
|
)
|
|
pending_anchors = await conn.fetchval(
|
|
"SELECT COUNT(*) FROM anchors WHERE confirmed_at IS NULL"
|
|
)
|
|
anchored_activities = await conn.fetchval(
|
|
"SELECT COUNT(*) FROM activities WHERE anchor_root IS NOT NULL"
|
|
)
|
|
unanchored_activities = await conn.fetchval(
|
|
"SELECT COUNT(*) FROM activities WHERE anchor_root IS NULL"
|
|
)
|
|
return {
|
|
"total_anchors": total_anchors,
|
|
"confirmed_anchors": confirmed_anchors,
|
|
"pending_anchors": pending_anchors,
|
|
"anchored_activities": anchored_activities,
|
|
"unanchored_activities": unanchored_activities
|
|
}
|
|
|
|
|
|
# ============ User Renderers (L1 attachments) ============
|
|
|
|
async def get_user_renderers(username: str) -> list[str]:
|
|
"""Get L1 renderer URLs attached by a user."""
|
|
async with get_connection() as conn:
|
|
rows = await conn.fetch(
|
|
"SELECT l1_url FROM user_renderers WHERE username = $1 ORDER BY attached_at",
|
|
username
|
|
)
|
|
return [row["l1_url"] for row in rows]
|
|
|
|
|
|
async def attach_renderer(username: str, l1_url: str) -> bool:
|
|
"""Attach a user to an L1 renderer. Returns True if newly attached."""
|
|
async with get_connection() as conn:
|
|
try:
|
|
await conn.execute(
|
|
"""INSERT INTO user_renderers (username, l1_url)
|
|
VALUES ($1, $2)
|
|
ON CONFLICT (username, l1_url) DO NOTHING""",
|
|
username, l1_url
|
|
)
|
|
return True
|
|
except Exception:
|
|
return False
|
|
|
|
|
|
async def detach_renderer(username: str, l1_url: str) -> bool:
|
|
"""Detach a user from an L1 renderer. Returns True if was attached."""
|
|
async with get_connection() as conn:
|
|
result = await conn.execute(
|
|
"DELETE FROM user_renderers WHERE username = $1 AND l1_url = $2",
|
|
username, l1_url
|
|
)
|
|
return "DELETE 1" in result
|
|
|
|
|
|
# ============ User Storage ============
|
|
|
|
async def get_user_storage(username: str) -> list[dict]:
|
|
"""Get all storage providers for a user."""
|
|
async with get_connection() as conn:
|
|
rows = await conn.fetch(
|
|
"""SELECT id, username, provider_type, provider_name, description, config,
|
|
capacity_gb, is_active, created_at, updated_at
|
|
FROM user_storage WHERE username = $1
|
|
ORDER BY provider_type, created_at""",
|
|
username
|
|
)
|
|
return [dict(row) for row in rows]
|
|
|
|
|
|
async def get_user_storage_by_type(username: str, provider_type: str) -> list[dict]:
|
|
"""Get storage providers of a specific type for a user."""
|
|
async with get_connection() as conn:
|
|
rows = await conn.fetch(
|
|
"""SELECT id, username, provider_type, provider_name, description, config,
|
|
capacity_gb, is_active, created_at, updated_at
|
|
FROM user_storage WHERE username = $1 AND provider_type = $2
|
|
ORDER BY created_at""",
|
|
username, provider_type
|
|
)
|
|
return [dict(row) for row in rows]
|
|
|
|
|
|
async def get_storage_by_id(storage_id: int) -> Optional[dict]:
|
|
"""Get a storage provider by ID."""
|
|
async with get_connection() as conn:
|
|
row = await conn.fetchrow(
|
|
"""SELECT id, username, provider_type, provider_name, description, config,
|
|
capacity_gb, is_active, created_at, updated_at
|
|
FROM user_storage WHERE id = $1""",
|
|
storage_id
|
|
)
|
|
return dict(row) if row else None
|
|
|
|
|
|
async def add_user_storage(
|
|
username: str,
|
|
provider_type: str,
|
|
provider_name: str,
|
|
config: dict,
|
|
capacity_gb: int,
|
|
description: Optional[str] = None
|
|
) -> Optional[int]:
|
|
"""Add a storage provider for a user. Returns storage ID."""
|
|
async with get_connection() as conn:
|
|
try:
|
|
row = await conn.fetchrow(
|
|
"""INSERT INTO user_storage (username, provider_type, provider_name, description, config, capacity_gb)
|
|
VALUES ($1, $2, $3, $4, $5, $6)
|
|
RETURNING id""",
|
|
username, provider_type, provider_name, description, json.dumps(config), capacity_gb
|
|
)
|
|
return row["id"] if row else None
|
|
except Exception:
|
|
return None
|
|
|
|
|
|
async def update_user_storage(
|
|
storage_id: int,
|
|
provider_name: Optional[str] = None,
|
|
description: Optional[str] = None,
|
|
config: Optional[dict] = None,
|
|
capacity_gb: Optional[int] = None,
|
|
is_active: Optional[bool] = None
|
|
) -> bool:
|
|
"""Update a storage provider."""
|
|
updates = []
|
|
params = []
|
|
param_num = 1
|
|
|
|
if provider_name is not None:
|
|
updates.append(f"provider_name = ${param_num}")
|
|
params.append(provider_name)
|
|
param_num += 1
|
|
if description is not None:
|
|
updates.append(f"description = ${param_num}")
|
|
params.append(description)
|
|
param_num += 1
|
|
if config is not None:
|
|
updates.append(f"config = ${param_num}")
|
|
params.append(json.dumps(config))
|
|
param_num += 1
|
|
if capacity_gb is not None:
|
|
updates.append(f"capacity_gb = ${param_num}")
|
|
params.append(capacity_gb)
|
|
param_num += 1
|
|
if is_active is not None:
|
|
updates.append(f"is_active = ${param_num}")
|
|
params.append(is_active)
|
|
param_num += 1
|
|
|
|
if not updates:
|
|
return False
|
|
|
|
updates.append("updated_at = NOW()")
|
|
params.append(storage_id)
|
|
|
|
async with get_connection() as conn:
|
|
result = await conn.execute(
|
|
f"UPDATE user_storage SET {', '.join(updates)} WHERE id = ${param_num}",
|
|
*params
|
|
)
|
|
return "UPDATE 1" in result
|
|
|
|
|
|
async def remove_user_storage(storage_id: int) -> bool:
|
|
"""Remove a storage provider. Cascades to storage_pins."""
|
|
async with get_connection() as conn:
|
|
result = await conn.execute(
|
|
"DELETE FROM user_storage WHERE id = $1",
|
|
storage_id
|
|
)
|
|
return "DELETE 1" in result
|
|
|
|
|
|
async def get_storage_usage(storage_id: int) -> dict:
|
|
"""Get storage usage stats for a provider."""
|
|
async with get_connection() as conn:
|
|
row = await conn.fetchrow(
|
|
"""SELECT
|
|
COUNT(*) as pin_count,
|
|
COALESCE(SUM(size_bytes), 0) as used_bytes
|
|
FROM storage_pins WHERE storage_id = $1""",
|
|
storage_id
|
|
)
|
|
return {"pin_count": row["pin_count"], "used_bytes": row["used_bytes"]}
|
|
|
|
|
|
async def add_storage_pin(
|
|
content_hash: str,
|
|
storage_id: int,
|
|
ipfs_cid: Optional[str],
|
|
pin_type: str,
|
|
size_bytes: int
|
|
) -> Optional[int]:
|
|
"""Add a pin record. Returns pin ID."""
|
|
async with get_connection() as conn:
|
|
try:
|
|
row = await conn.fetchrow(
|
|
"""INSERT INTO storage_pins (content_hash, storage_id, ipfs_cid, pin_type, size_bytes)
|
|
VALUES ($1, $2, $3, $4, $5)
|
|
ON CONFLICT (content_hash, storage_id) DO UPDATE SET
|
|
ipfs_cid = EXCLUDED.ipfs_cid,
|
|
pin_type = EXCLUDED.pin_type,
|
|
size_bytes = EXCLUDED.size_bytes,
|
|
pinned_at = NOW()
|
|
RETURNING id""",
|
|
content_hash, storage_id, ipfs_cid, pin_type, size_bytes
|
|
)
|
|
return row["id"] if row else None
|
|
except Exception:
|
|
return None
|
|
|
|
|
|
async def remove_storage_pin(content_hash: str, storage_id: int) -> bool:
|
|
"""Remove a pin record."""
|
|
async with get_connection() as conn:
|
|
result = await conn.execute(
|
|
"DELETE FROM storage_pins WHERE content_hash = $1 AND storage_id = $2",
|
|
content_hash, storage_id
|
|
)
|
|
return "DELETE 1" in result
|
|
|
|
|
|
async def get_pins_for_content(content_hash: str) -> list[dict]:
|
|
"""Get all storage locations where content is pinned."""
|
|
async with get_connection() as conn:
|
|
rows = await conn.fetch(
|
|
"""SELECT sp.*, us.provider_type, us.provider_name, us.username
|
|
FROM storage_pins sp
|
|
JOIN user_storage us ON sp.storage_id = us.id
|
|
WHERE sp.content_hash = $1""",
|
|
content_hash
|
|
)
|
|
return [dict(row) for row in rows]
|
|
|
|
|
|
async def get_all_active_storage() -> list[dict]:
|
|
"""Get all active storage providers (for distributed pinning)."""
|
|
async with get_connection() as conn:
|
|
rows = await conn.fetch(
|
|
"""SELECT us.id, us.username, us.provider_type, us.provider_name, us.description,
|
|
us.config, us.capacity_gb, us.is_active, us.created_at, us.updated_at,
|
|
COALESCE(SUM(sp.size_bytes), 0) as used_bytes,
|
|
COUNT(sp.id) as pin_count
|
|
FROM user_storage us
|
|
LEFT JOIN storage_pins sp ON us.id = sp.storage_id
|
|
WHERE us.is_active = true
|
|
GROUP BY us.id
|
|
ORDER BY us.provider_type, us.created_at"""
|
|
)
|
|
return [dict(row) for row in rows]
|
|
|
|
|
|
# ============ Token Revocation ============
|
|
|
|
async def revoke_token(token_hash: str, username: str, expires_at) -> bool:
|
|
"""Revoke a token. Returns True if newly revoked."""
|
|
async with get_connection() as conn:
|
|
try:
|
|
await conn.execute(
|
|
"""INSERT INTO revoked_tokens (token_hash, username, expires_at)
|
|
VALUES ($1, $2, $3)
|
|
ON CONFLICT (token_hash) DO NOTHING""",
|
|
token_hash, username, expires_at
|
|
)
|
|
return True
|
|
except Exception:
|
|
return False
|
|
|
|
|
|
async def is_token_revoked(token_hash: str) -> bool:
|
|
"""Check if a token has been revoked."""
|
|
async with get_connection() as conn:
|
|
row = await conn.fetchrow(
|
|
"SELECT 1 FROM revoked_tokens WHERE token_hash = $1 AND expires_at > NOW()",
|
|
token_hash
|
|
)
|
|
return row is not None
|
|
|
|
|
|
async def cleanup_expired_revocations() -> int:
|
|
"""Remove expired revocation entries. Returns count removed."""
|
|
async with get_connection() as conn:
|
|
result = await conn.execute(
|
|
"DELETE FROM revoked_tokens WHERE expires_at < NOW()"
|
|
)
|
|
# Extract count from "DELETE N"
|
|
try:
|
|
return int(result.split()[-1])
|
|
except (ValueError, IndexError):
|
|
return 0
|