Files
activity-pub/db.py
giles f8f44945ab Fix db function calls and add missing functions
- Fix get_activities to use get_activities_paginated
- Add get_user_assets, delete_asset, count_users, count_user_activities
- Add get_user_activities, get_renderer, update_anchor, delete_anchor
- Add record_run and get_run functions
- Fix create_asset calls to use dict parameter
- Fix update_asset call signature

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-11 13:13:40 +00:00

1217 lines
43 KiB
Python

"""
Database module for Art DAG L2 Server.
Uses asyncpg for async PostgreSQL access with connection pooling.
"""
import json
import os
from datetime import datetime, timezone
from typing import Optional
from contextlib import asynccontextmanager
import asyncpg
# Connection pool (initialized on startup)
def _parse_timestamp(ts) -> datetime:
"""Parse a timestamp string or datetime to datetime object."""
if ts is None:
return datetime.now(timezone.utc)
if isinstance(ts, datetime):
return ts
# Parse ISO format string
if isinstance(ts, str):
if ts.endswith('Z'):
ts = ts[:-1] + '+00:00'
return datetime.fromisoformat(ts)
return datetime.now(timezone.utc)
# Module-level connection pool; set by init_pool() on startup and cleared by close_pool().
_pool: Optional[asyncpg.Pool] = None
# Configuration from environment
# Falls back to a local dev DSN when DATABASE_URL is unset.
DATABASE_URL = os.environ.get(
    "DATABASE_URL",
    "postgresql://artdag:artdag@localhost:5432/artdag"
)
# Schema for database initialization
# Executed verbatim by init_pool(); every statement is idempotent
# (CREATE ... IF NOT EXISTS, or DO-blocks that swallow duplicate_column),
# so it is safe to run on every startup as a poor-man's migration.
SCHEMA = """
-- Users table
CREATE TABLE IF NOT EXISTS users (
username VARCHAR(255) PRIMARY KEY,
password_hash VARCHAR(255) NOT NULL,
email VARCHAR(255),
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
-- Assets table
CREATE TABLE IF NOT EXISTS assets (
name VARCHAR(255) PRIMARY KEY,
content_hash VARCHAR(128) NOT NULL,
ipfs_cid VARCHAR(128),
asset_type VARCHAR(50) NOT NULL,
tags JSONB DEFAULT '[]'::jsonb,
metadata JSONB DEFAULT '{}'::jsonb,
url TEXT,
provenance JSONB,
description TEXT,
origin JSONB,
owner VARCHAR(255) NOT NULL REFERENCES users(username),
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ
);
-- Activities table (activity_id is content-addressable run_id hash)
CREATE TABLE IF NOT EXISTS activities (
activity_id VARCHAR(64) PRIMARY KEY,
activity_type VARCHAR(50) NOT NULL,
actor_id TEXT NOT NULL,
object_data JSONB NOT NULL,
published TIMESTAMPTZ NOT NULL,
signature JSONB,
anchor_root VARCHAR(64) -- Merkle root this activity is anchored to
);
-- Anchors table (Bitcoin timestamps via OpenTimestamps)
CREATE TABLE IF NOT EXISTS anchors (
id SERIAL PRIMARY KEY,
merkle_root VARCHAR(64) NOT NULL UNIQUE,
tree_ipfs_cid VARCHAR(128),
ots_proof_cid VARCHAR(128),
activity_count INTEGER NOT NULL,
first_activity_id VARCHAR(64),
last_activity_id VARCHAR(64),
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
confirmed_at TIMESTAMPTZ,
bitcoin_txid VARCHAR(64)
);
-- Followers table
CREATE TABLE IF NOT EXISTS followers (
id SERIAL PRIMARY KEY,
username VARCHAR(255) NOT NULL REFERENCES users(username),
acct VARCHAR(255) NOT NULL,
url TEXT NOT NULL,
public_key TEXT,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
UNIQUE(username, acct)
);
-- User's attached L1 renderers
CREATE TABLE IF NOT EXISTS user_renderers (
id SERIAL PRIMARY KEY,
username VARCHAR(255) NOT NULL REFERENCES users(username),
l1_url TEXT NOT NULL,
attached_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
UNIQUE(username, l1_url)
);
-- Revoked tokens (for federated logout)
CREATE TABLE IF NOT EXISTS revoked_tokens (
token_hash VARCHAR(64) PRIMARY KEY,
username VARCHAR(255) NOT NULL,
revoked_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
expires_at TIMESTAMPTZ NOT NULL
);
-- User storage providers (IPFS pinning services, local storage, etc.)
-- Users can have multiple configs of the same provider type
CREATE TABLE IF NOT EXISTS user_storage (
id SERIAL PRIMARY KEY,
username VARCHAR(255) NOT NULL REFERENCES users(username),
provider_type VARCHAR(50) NOT NULL, -- 'pinata', 'web3storage', 'nftstorage', 'infura', 'filebase', 'storj', 'local'
provider_name VARCHAR(255), -- User-friendly name
description TEXT, -- User description to distinguish configs
config JSONB NOT NULL DEFAULT '{}', -- API keys, endpoints, paths
capacity_gb INTEGER NOT NULL, -- Total capacity user is contributing
is_active BOOLEAN DEFAULT true,
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW()
);
-- Track what's stored where
CREATE TABLE IF NOT EXISTS storage_pins (
id SERIAL PRIMARY KEY,
content_hash VARCHAR(64) NOT NULL,
storage_id INTEGER NOT NULL REFERENCES user_storage(id) ON DELETE CASCADE,
ipfs_cid VARCHAR(128),
pin_type VARCHAR(20) NOT NULL, -- 'user_content', 'donated', 'system'
size_bytes BIGINT,
pinned_at TIMESTAMPTZ DEFAULT NOW(),
UNIQUE(content_hash, storage_id)
);
-- Indexes
CREATE INDEX IF NOT EXISTS idx_users_created_at ON users(created_at);
CREATE INDEX IF NOT EXISTS idx_assets_content_hash ON assets(content_hash);
CREATE INDEX IF NOT EXISTS idx_assets_owner ON assets(owner);
CREATE INDEX IF NOT EXISTS idx_assets_created_at ON assets(created_at DESC);
CREATE INDEX IF NOT EXISTS idx_assets_tags ON assets USING GIN(tags);
CREATE INDEX IF NOT EXISTS idx_activities_actor_id ON activities(actor_id);
CREATE INDEX IF NOT EXISTS idx_activities_published ON activities(published DESC);
CREATE INDEX IF NOT EXISTS idx_activities_anchor ON activities(anchor_root);
CREATE INDEX IF NOT EXISTS idx_anchors_created ON anchors(created_at DESC);
CREATE INDEX IF NOT EXISTS idx_followers_username ON followers(username);
CREATE INDEX IF NOT EXISTS idx_revoked_tokens_expires ON revoked_tokens(expires_at);
CREATE INDEX IF NOT EXISTS idx_user_storage_username ON user_storage(username);
CREATE INDEX IF NOT EXISTS idx_storage_pins_hash ON storage_pins(content_hash);
CREATE INDEX IF NOT EXISTS idx_storage_pins_storage ON storage_pins(storage_id);
-- Add source URL columns to assets if they don't exist
DO $$ BEGIN
ALTER TABLE assets ADD COLUMN source_url TEXT;
EXCEPTION WHEN duplicate_column THEN NULL;
END $$;
DO $$ BEGIN
ALTER TABLE assets ADD COLUMN source_type VARCHAR(50);
EXCEPTION WHEN duplicate_column THEN NULL;
END $$;
-- Add description column to user_storage if it doesn't exist
DO $$ BEGIN
ALTER TABLE user_storage ADD COLUMN description TEXT;
EXCEPTION WHEN duplicate_column THEN NULL;
END $$;
"""
async def init_pool():
    """Initialize the connection pool and create tables. Call on app startup.

    Must run before any other function in this module; everything else goes
    through get_pool()/get_connection(), which raise until this has completed.
    """
    global _pool
    # Pool sizing and the 60s per-command timeout are fixed here;
    # the DSN comes from DATABASE_URL (environment, see top of file).
    _pool = await asyncpg.create_pool(
        DATABASE_URL,
        min_size=2,
        max_size=10,
        command_timeout=60
    )
    # Create tables if they don't exist (SCHEMA statements are idempotent).
    async with _pool.acquire() as conn:
        await conn.execute(SCHEMA)
async def close_pool():
    """Close the connection pool. Call on app shutdown.

    Safe to call when the pool was never initialized (no-op).
    """
    global _pool
    if _pool is None:
        return
    await _pool.close()
    _pool = None
def get_pool() -> asyncpg.Pool:
    """Return the live connection pool.

    Raises:
        RuntimeError: if init_pool() has not been called yet.
    """
    pool = _pool
    if pool is None:
        raise RuntimeError("Database pool not initialized")
    return pool
@asynccontextmanager
async def get_connection():
    """Get a connection from the pool.

    Async context manager; the connection is returned to the pool on exit.
    Raises RuntimeError (via get_pool) if the pool is not initialized.
    """
    async with get_pool().acquire() as conn:
        yield conn
@asynccontextmanager
async def transaction():
    """
    Get a connection with an active transaction.

    Commits when the block exits normally; rolls back on exception.

    Usage:
        async with db.transaction() as conn:
            await create_asset_tx(conn, asset1)
            await create_asset_tx(conn, asset2)
            await create_activity_tx(conn, activity)
        # Commits on exit, rolls back on exception
    """
    async with get_pool().acquire() as conn:
        async with conn.transaction():
            yield conn
# ============ Users ============
async def get_user(username: str) -> Optional[dict]:
    """Look up a single user record by username; None when absent."""
    async with get_connection() as conn:
        record = await conn.fetchrow(
            "SELECT username, password_hash, email, created_at FROM users WHERE username = $1",
            username
        )
    return dict(record) if record else None
async def get_all_users() -> dict[str, dict]:
    """Fetch every user, keyed by username (rows ordered by username)."""
    async with get_connection() as conn:
        records = await conn.fetch(
            "SELECT username, password_hash, email, created_at FROM users ORDER BY username"
        )
    return {record["username"]: dict(record) for record in records}
async def create_user(username: str, password_hash: str, email: Optional[str] = None) -> dict:
    """Insert a new user row and return it as a dict.

    Raises asyncpg.UniqueViolationError if the username is already taken.
    """
    query = """INSERT INTO users (username, password_hash, email)
    VALUES ($1, $2, $3)
    RETURNING username, password_hash, email, created_at"""
    async with get_connection() as conn:
        record = await conn.fetchrow(query, username, password_hash, email)
    return dict(record)
async def user_exists(username: str) -> bool:
    """True when a user row with this username is present."""
    async with get_connection() as conn:
        found = await conn.fetchval(
            "SELECT EXISTS(SELECT 1 FROM users WHERE username = $1)",
            username
        )
    return found
# ============ Assets ============
async def get_asset(name: str) -> Optional[dict]:
    """Fetch one asset by its primary-key name; None when missing."""
    async with get_connection() as conn:
        record = await conn.fetchrow(
            """SELECT name, content_hash, ipfs_cid, asset_type, tags, metadata, url,
            provenance, description, origin, owner, created_at, updated_at
            FROM assets WHERE name = $1""",
            name
        )
    return _parse_asset_row(record) if record else None
async def get_asset_by_hash(content_hash: str) -> Optional[dict]:
    """Fetch one asset by its content hash; None when missing."""
    async with get_connection() as conn:
        record = await conn.fetchrow(
            """SELECT name, content_hash, ipfs_cid, asset_type, tags, metadata, url,
            provenance, description, origin, owner, created_at, updated_at
            FROM assets WHERE content_hash = $1""",
            content_hash
        )
    return _parse_asset_row(record) if record else None
async def get_asset_by_run_id(run_id: str) -> Optional[dict]:
    """Fetch one asset whose provenance JSONB carries this run_id; None when missing."""
    async with get_connection() as conn:
        record = await conn.fetchrow(
            """SELECT name, content_hash, ipfs_cid, asset_type, tags, metadata, url,
            provenance, description, origin, owner, created_at, updated_at
            FROM assets WHERE provenance->>'run_id' = $1""",
            run_id
        )
    return _parse_asset_row(record) if record else None
async def get_all_assets() -> dict[str, dict]:
    """Fetch every asset (newest first), keyed by name."""
    async with get_connection() as conn:
        records = await conn.fetch(
            """SELECT name, content_hash, ipfs_cid, asset_type, tags, metadata, url,
            provenance, description, origin, owner, created_at, updated_at
            FROM assets ORDER BY created_at DESC"""
        )
    return {record["name"]: _parse_asset_row(record) for record in records}
async def get_assets_paginated(limit: int = 100, offset: int = 0) -> tuple[list[tuple[str, dict]], int]:
    """Fetch one page of assets, newest first.

    Returns:
        ([(name, asset), ...] for the requested page, total asset count).
    """
    async with get_connection() as conn:
        total = await conn.fetchval("SELECT COUNT(*) FROM assets")
        records = await conn.fetch(
            """SELECT name, content_hash, ipfs_cid, asset_type, tags, metadata, url,
            provenance, description, origin, owner, created_at, updated_at
            FROM assets ORDER BY created_at DESC LIMIT $1 OFFSET $2""",
            limit, offset
        )
    page = [(record["name"], _parse_asset_row(record)) for record in records]
    return page, total
async def get_assets_by_owner(owner: str) -> dict[str, dict]:
    """Fetch every asset owned by *owner* (newest first), keyed by name."""
    async with get_connection() as conn:
        records = await conn.fetch(
            """SELECT name, content_hash, ipfs_cid, asset_type, tags, metadata, url,
            provenance, description, origin, owner, created_at, updated_at
            FROM assets WHERE owner = $1 ORDER BY created_at DESC""",
            owner
        )
    return {record["name"]: _parse_asset_row(record) for record in records}
async def create_asset(asset: dict) -> dict:
    """Insert a new asset row and return the parsed result.

    Thin pooled-connection wrapper around create_asset_tx, which performs
    the identical INSERT with identical parameter encoding.
    """
    async with get_connection() as conn:
        return await create_asset_tx(conn, asset)
# Columns of the assets table that callers may update. Column names are
# interpolated into the SQL text below (values are parameterized), so they
# must be validated against this whitelist to prevent SQL injection.
_UPDATABLE_ASSET_COLUMNS = frozenset({
    "content_hash", "ipfs_cid", "asset_type", "tags", "metadata", "url",
    "provenance", "description", "origin", "owner", "source_url", "source_type",
})

async def update_asset(name: str, updates: dict) -> Optional[dict]:
    """Update an existing asset.

    Args:
        name: Primary-key name of the asset to update.
        updates: Mapping of column name -> new value. JSONB columns
            (tags/metadata/provenance/origin) are JSON-encoded automatically.

    Returns:
        The updated asset dict, or None if no asset with that name exists.

    Raises:
        ValueError: if *updates* contains a key that is not a known,
            updatable asset column (keys are interpolated into the SQL).
    """
    # Build dynamic UPDATE query; only values travel as bind parameters,
    # so every key must come from the whitelist above.
    set_clauses = []
    values = []
    idx = 1
    for key, value in updates.items():
        if key not in _UPDATABLE_ASSET_COLUMNS:
            raise ValueError(f"Unknown asset column: {key}")
        set_clauses.append(f"{key} = ${idx}")
        if key in ("tags", "metadata", "provenance", "origin"):
            # JSONB columns are stored as JSON text; None stays NULL.
            values.append(json.dumps(value) if value is not None else None)
        else:
            values.append(value)
        idx += 1
    set_clauses.append(f"updated_at = ${idx}")
    values.append(datetime.now(timezone.utc))
    idx += 1
    values.append(name)  # WHERE clause
    async with get_connection() as conn:
        row = await conn.fetchrow(
            f"""UPDATE assets SET {', '.join(set_clauses)}
            WHERE name = ${idx} RETURNING *""",
            *values
        )
    if row:
        return _parse_asset_row(row)
    return None
async def asset_exists(name: str) -> bool:
    """True when an asset with this name exists."""
    async with get_connection() as conn:
        found = await conn.fetchval(
            "SELECT EXISTS(SELECT 1 FROM assets WHERE name = $1)",
            name
        )
    return found
def _parse_asset_row(row) -> dict:
"""Parse a database row into an asset dict, handling JSONB fields."""
asset = dict(row)
# Convert datetime to ISO string
if asset.get("created_at"):
asset["created_at"] = asset["created_at"].isoformat()
if asset.get("updated_at"):
asset["updated_at"] = asset["updated_at"].isoformat()
# Ensure JSONB fields are dicts (handle string case)
for field in ("tags", "metadata", "provenance", "origin"):
if isinstance(asset.get(field), str):
try:
asset[field] = json.loads(asset[field])
except (json.JSONDecodeError, TypeError):
pass
return asset
# ============ Assets (Transaction variants) ============
async def get_asset_by_hash_tx(conn, content_hash: str) -> Optional[dict]:
    """Transaction-scoped lookup of an asset by content hash; None when missing."""
    record = await conn.fetchrow(
        """SELECT name, content_hash, ipfs_cid, asset_type, tags, metadata, url,
        provenance, description, origin, owner, created_at, updated_at
        FROM assets WHERE content_hash = $1""",
        content_hash
    )
    return _parse_asset_row(record) if record else None
async def asset_exists_by_name_tx(conn, name: str) -> bool:
    """Transaction-scoped existence check for an asset name."""
    found = await conn.fetchval(
        "SELECT EXISTS(SELECT 1 FROM assets WHERE name = $1)",
        name
    )
    return found
async def get_asset_by_name_tx(conn, name: str) -> Optional[dict]:
    """Transaction-scoped lookup of an asset by name; None when missing."""
    record = await conn.fetchrow(
        """SELECT name, content_hash, ipfs_cid, asset_type, tags, metadata, url,
        provenance, description, origin, owner, created_at, updated_at
        FROM assets WHERE name = $1""",
        name
    )
    return _parse_asset_row(record) if record else None
async def create_asset_tx(conn, asset: dict) -> dict:
    """Insert an asset row on an existing (transactional) connection.

    JSONB columns are JSON-encoded; created_at is normalized via
    _parse_timestamp. Returns the inserted row as a parsed asset dict.
    """
    params = (
        asset["name"],
        asset["content_hash"],
        asset.get("ipfs_cid"),
        asset["asset_type"],
        json.dumps(asset.get("tags", [])),
        json.dumps(asset.get("metadata", {})),
        asset.get("url"),
        json.dumps(asset.get("provenance")) if asset.get("provenance") else None,
        asset.get("description"),
        json.dumps(asset.get("origin")) if asset.get("origin") else None,
        asset["owner"],
        _parse_timestamp(asset.get("created_at")),
    )
    record = await conn.fetchrow(
        """INSERT INTO assets (name, content_hash, ipfs_cid, asset_type, tags, metadata,
        url, provenance, description, origin, owner, created_at)
        VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)
        RETURNING *""",
        *params
    )
    return _parse_asset_row(record)
# ============ Activities ============
async def get_activity(activity_id: str) -> Optional[dict]:
    """Fetch one activity by ID; None when missing."""
    async with get_connection() as conn:
        record = await conn.fetchrow(
            """SELECT activity_id, activity_type, actor_id, object_data, published, signature
            FROM activities WHERE activity_id = $1""",
            activity_id
        )
    return _parse_activity_row(record) if record else None
async def get_activity_by_index(index: int) -> Optional[dict]:
    """Fetch the *index*-th activity in publication order (oldest = 0).

    Kept for backward compatibility with the old index-based URL scheme.
    """
    async with get_connection() as conn:
        record = await conn.fetchrow(
            """SELECT activity_id, activity_type, actor_id, object_data, published, signature
            FROM activities ORDER BY published ASC LIMIT 1 OFFSET $1""",
            index
        )
    return _parse_activity_row(record) if record else None
async def get_all_activities() -> list[dict]:
    """Fetch every activity, oldest first (matches the index-based URL scheme)."""
    async with get_connection() as conn:
        records = await conn.fetch(
            """SELECT activity_id, activity_type, actor_id, object_data, published, signature
            FROM activities ORDER BY published ASC"""
        )
    return [_parse_activity_row(record) for record in records]
async def get_activities_paginated(limit: int = 100, offset: int = 0) -> tuple[list[dict], int]:
    """Fetch one page of activities, newest first.

    Returns:
        (activities for the requested page, total activity count).
    """
    async with get_connection() as conn:
        total = await conn.fetchval("SELECT COUNT(*) FROM activities")
        records = await conn.fetch(
            """SELECT activity_id, activity_type, actor_id, object_data, published, signature
            FROM activities ORDER BY published DESC LIMIT $1 OFFSET $2""",
            limit, offset
        )
    return [_parse_activity_row(record) for record in records], total
async def get_activities_by_actor(actor_id: str) -> list[dict]:
    """Fetch every activity by one actor, newest first."""
    async with get_connection() as conn:
        records = await conn.fetch(
            """SELECT activity_id, activity_type, actor_id, object_data, published, signature
            FROM activities WHERE actor_id = $1 ORDER BY published DESC""",
            actor_id
        )
    return [_parse_activity_row(record) for record in records]
async def create_activity(activity: dict) -> dict:
    """Insert a new activity row and return the parsed result.

    Thin pooled-connection wrapper around create_activity_tx, which performs
    the identical INSERT with identical parameter encoding.
    """
    async with get_connection() as conn:
        return await create_activity_tx(conn, activity)
async def count_activities() -> int:
    """Total number of rows in the activities table."""
    async with get_connection() as conn:
        total = await conn.fetchval("SELECT COUNT(*) FROM activities")
    return total
def _parse_activity_row(row) -> dict:
"""Parse a database row into an activity dict, handling JSONB fields."""
activity = dict(row)
# Convert datetime to ISO string
if activity.get("published"):
activity["published"] = activity["published"].isoformat()
# Ensure JSONB fields are dicts (handle string case)
for field in ("object_data", "signature"):
if isinstance(activity.get(field), str):
try:
activity[field] = json.loads(activity[field])
except (json.JSONDecodeError, TypeError):
pass
return activity
# ============ Activities (Transaction variants) ============
async def create_activity_tx(conn, activity: dict) -> dict:
    """Insert an activity row on an existing (transactional) connection.

    object_data/signature are JSON-encoded; published is normalized via
    _parse_timestamp. Returns the inserted row as a parsed activity dict.
    """
    signature = activity.get("signature")
    record = await conn.fetchrow(
        """INSERT INTO activities (activity_id, activity_type, actor_id, object_data, published, signature)
        VALUES ($1, $2, $3, $4, $5, $6)
        RETURNING *""",
        activity["activity_id"],
        activity["activity_type"],
        activity["actor_id"],
        json.dumps(activity["object_data"]),
        _parse_timestamp(activity["published"]),
        json.dumps(signature) if signature else None
    )
    return _parse_activity_row(record)
# ============ Followers ============
async def get_followers(username: str) -> list[dict]:
    """Fetch all follower rows for one user."""
    async with get_connection() as conn:
        records = await conn.fetch(
            """SELECT id, username, acct, url, public_key, created_at
            FROM followers WHERE username = $1""",
            username
        )
    return [dict(record) for record in records]
async def get_all_followers() -> list:
    """Distinct follower URLs across all users (legacy global-list shape)."""
    async with get_connection() as conn:
        records = await conn.fetch(
            """SELECT DISTINCT url FROM followers"""
        )
    return [record["url"] for record in records]
async def add_follower(username: str, acct: str, url: str, public_key: Optional[str] = None) -> dict:
    """Upsert a follower (keyed on username+acct), refreshing url/public_key."""
    async with get_connection() as conn:
        record = await conn.fetchrow(
            """INSERT INTO followers (username, acct, url, public_key)
            VALUES ($1, $2, $3, $4)
            ON CONFLICT (username, acct) DO UPDATE SET url = $3, public_key = $4
            RETURNING *""",
            username, acct, url, public_key
        )
    return dict(record)
async def remove_follower(username: str, acct: str) -> bool:
    """Delete a follower row; True when exactly one row was removed."""
    async with get_connection() as conn:
        status = await conn.execute(
            "DELETE FROM followers WHERE username = $1 AND acct = $2",
            username, acct
        )
    return status == "DELETE 1"
# ============ Stats ============
async def get_stats() -> dict:
    """Dashboard counts: total assets, activities, and users."""
    async with get_connection() as conn:
        counts = {
            "assets": await conn.fetchval("SELECT COUNT(*) FROM assets"),
            "activities": await conn.fetchval("SELECT COUNT(*) FROM activities"),
            "users": await conn.fetchval("SELECT COUNT(*) FROM users"),
        }
    return counts
# ============ Anchors (Bitcoin timestamps) ============
async def get_unanchored_activities() -> list[dict]:
    """Fetch activities with no anchor_root yet (oldest first)."""
    async with get_connection() as conn:
        records = await conn.fetch(
            """SELECT activity_id, activity_type, actor_id, object_data, published, signature
            FROM activities WHERE anchor_root IS NULL ORDER BY published ASC"""
        )
    return [_parse_activity_row(record) for record in records]
async def create_anchor(anchor: dict) -> dict:
    """Insert an anchor record and return the raw inserted row as a dict."""
    params = (
        anchor["merkle_root"],
        anchor.get("tree_ipfs_cid"),
        anchor.get("ots_proof_cid"),
        anchor["activity_count"],
        anchor.get("first_activity_id"),
        anchor.get("last_activity_id"),
    )
    async with get_connection() as conn:
        record = await conn.fetchrow(
            """INSERT INTO anchors (merkle_root, tree_ipfs_cid, ots_proof_cid,
            activity_count, first_activity_id, last_activity_id)
            VALUES ($1, $2, $3, $4, $5, $6)
            RETURNING *""",
            *params
        )
    return dict(record)
async def mark_activities_anchored(activity_ids: list[str], merkle_root: str) -> int:
    """Stamp the given activities with a merkle root; returns rows updated."""
    async with get_connection() as conn:
        status = await conn.execute(
            """UPDATE activities SET anchor_root = $1
            WHERE activity_id = ANY($2::text[])""",
            merkle_root,
            activity_ids
        )
    # asyncpg status tag looks like "UPDATE <n>"
    return int(status.split()[1]) if status else 0
async def get_anchor(merkle_root: str) -> Optional[dict]:
    """Fetch one anchor by merkle root; None when missing.

    Activity IDs are coerced to str and timestamps to ISO-8601 strings.
    """
    async with get_connection() as conn:
        record = await conn.fetchrow(
            "SELECT * FROM anchors WHERE merkle_root = $1",
            merkle_root
        )
    if record is None:
        return None
    anchor = dict(record)
    for id_field in ("first_activity_id", "last_activity_id"):
        if anchor.get(id_field):
            anchor[id_field] = str(anchor[id_field])
    for ts_field in ("created_at", "confirmed_at"):
        if anchor.get(ts_field):
            anchor[ts_field] = anchor[ts_field].isoformat()
    return anchor
async def get_all_anchors() -> list[dict]:
    """Fetch every anchor, newest first.

    Activity IDs are coerced to str and timestamps to ISO-8601 strings.
    """
    def normalize(record) -> dict:
        anchor = dict(record)
        for id_field in ("first_activity_id", "last_activity_id"):
            if anchor.get(id_field):
                anchor[id_field] = str(anchor[id_field])
        for ts_field in ("created_at", "confirmed_at"):
            if anchor.get(ts_field):
                anchor[ts_field] = anchor[ts_field].isoformat()
        return anchor

    async with get_connection() as conn:
        records = await conn.fetch(
            "SELECT * FROM anchors ORDER BY created_at DESC"
        )
    return [normalize(record) for record in records]
async def get_anchors_paginated(offset: int = 0, limit: int = 20) -> list[dict]:
    """Fetch one page of anchors, newest first.

    Activity IDs are coerced to str and timestamps to ISO-8601 strings.
    """
    def normalize(record) -> dict:
        anchor = dict(record)
        for id_field in ("first_activity_id", "last_activity_id"):
            if anchor.get(id_field):
                anchor[id_field] = str(anchor[id_field])
        for ts_field in ("created_at", "confirmed_at"):
            if anchor.get(ts_field):
                anchor[ts_field] = anchor[ts_field].isoformat()
        return anchor

    async with get_connection() as conn:
        records = await conn.fetch(
            "SELECT * FROM anchors ORDER BY created_at DESC LIMIT $1 OFFSET $2",
            limit, offset
        )
    return [normalize(record) for record in records]
async def update_anchor_confirmed(merkle_root: str, bitcoin_txid: str) -> bool:
    """Record Bitcoin confirmation for an anchor; True when one row changed."""
    async with get_connection() as conn:
        status = await conn.execute(
            """UPDATE anchors SET confirmed_at = NOW(), bitcoin_txid = $1
            WHERE merkle_root = $2""",
            bitcoin_txid, merkle_root
        )
    return status == "UPDATE 1"
async def get_anchor_stats() -> dict:
    """Summary counts for the anchoring subsystem (anchors and activities)."""
    queries = {
        "total_anchors": "SELECT COUNT(*) FROM anchors",
        "confirmed_anchors": "SELECT COUNT(*) FROM anchors WHERE confirmed_at IS NOT NULL",
        "pending_anchors": "SELECT COUNT(*) FROM anchors WHERE confirmed_at IS NULL",
        "anchored_activities": "SELECT COUNT(*) FROM activities WHERE anchor_root IS NOT NULL",
        "unanchored_activities": "SELECT COUNT(*) FROM activities WHERE anchor_root IS NULL",
    }
    stats = {}
    async with get_connection() as conn:
        for key, query in queries.items():
            stats[key] = await conn.fetchval(query)
    return stats
# ============ User Renderers (L1 attachments) ============
async def get_user_renderers(username: str) -> list[str]:
    """L1 renderer URLs a user has attached, in attachment order."""
    async with get_connection() as conn:
        records = await conn.fetch(
            "SELECT l1_url FROM user_renderers WHERE username = $1 ORDER BY attached_at",
            username
        )
    return [record["l1_url"] for record in records]
async def attach_renderer(username: str, l1_url: str) -> bool:
    """Attach a user to an L1 renderer. Returns True if newly attached.

    The INSERT uses ON CONFLICT DO NOTHING, so re-attaching an existing
    (username, l1_url) pair is a silent no-op; asyncpg reports that as
    status tag "INSERT 0 0". The original implementation returned True
    unconditionally, contradicting its own docstring — we now inspect the
    status tag ("INSERT <oid> <rowcount>") to report the truth.
    Returns False on any database error (e.g. unknown username).
    """
    async with get_connection() as conn:
        try:
            result = await conn.execute(
                """INSERT INTO user_renderers (username, l1_url)
                VALUES ($1, $2)
                ON CONFLICT (username, l1_url) DO NOTHING""",
                username, l1_url
            )
            # Rowcount 1 => a new row was inserted; 0 => conflict, already attached.
            return result.endswith(" 1")
        except Exception:
            return False
async def detach_renderer(username: str, l1_url: str) -> bool:
    """Detach a user from an L1 renderer; True when a row was deleted."""
    async with get_connection() as conn:
        status = await conn.execute(
            "DELETE FROM user_renderers WHERE username = $1 AND l1_url = $2",
            username, l1_url
        )
    return "DELETE 1" in status
# ============ User Storage ============
async def get_user_storage(username: str) -> list[dict]:
    """All storage provider configs registered by one user."""
    async with get_connection() as conn:
        records = await conn.fetch(
            """SELECT id, username, provider_type, provider_name, description, config,
            capacity_gb, is_active, created_at, updated_at
            FROM user_storage WHERE username = $1
            ORDER BY provider_type, created_at""",
            username
        )
    return [dict(record) for record in records]
async def get_user_storage_by_type(username: str, provider_type: str) -> list[dict]:
    """A user's storage provider configs of one provider type."""
    async with get_connection() as conn:
        records = await conn.fetch(
            """SELECT id, username, provider_type, provider_name, description, config,
            capacity_gb, is_active, created_at, updated_at
            FROM user_storage WHERE username = $1 AND provider_type = $2
            ORDER BY created_at""",
            username, provider_type
        )
    return [dict(record) for record in records]
async def get_storage_by_id(storage_id: int) -> Optional[dict]:
    """Fetch one storage provider config by row id; None when missing."""
    async with get_connection() as conn:
        record = await conn.fetchrow(
            """SELECT id, username, provider_type, provider_name, description, config,
            capacity_gb, is_active, created_at, updated_at
            FROM user_storage WHERE id = $1""",
            storage_id
        )
    return dict(record) if record else None
async def add_user_storage(
    username: str,
    provider_type: str,
    provider_name: str,
    config: dict,
    capacity_gb: int,
    description: Optional[str] = None
) -> Optional[int]:
    """Register a storage provider for a user.

    Returns the new row's id, or None when the insert fails (best-effort
    API: any DB error, e.g. an unknown username, maps to None).
    """
    async with get_connection() as conn:
        try:
            record = await conn.fetchrow(
                """INSERT INTO user_storage (username, provider_type, provider_name, description, config, capacity_gb)
                VALUES ($1, $2, $3, $4, $5, $6)
                RETURNING id""",
                username, provider_type, provider_name, description, json.dumps(config), capacity_gb
            )
        except Exception:
            return None
    return record["id"] if record else None
async def update_user_storage(
    storage_id: int,
    provider_name: Optional[str] = None,
    description: Optional[str] = None,
    config: Optional[dict] = None,
    capacity_gb: Optional[int] = None,
    is_active: Optional[bool] = None
) -> bool:
    """Partially update a storage provider config.

    Only keyword arguments that are not None are written; config is
    JSON-encoded. Returns False immediately when no field was supplied,
    otherwise True when exactly one row was updated.
    """
    # (column, value) pairs in the same order as the original explicit chain.
    candidates = [
        ("provider_name", provider_name),
        ("description", description),
        ("config", json.dumps(config) if config is not None else None),
        ("capacity_gb", capacity_gb),
        ("is_active", is_active),
    ]
    set_clauses = []
    params = []
    for column, value in candidates:
        if value is not None:
            params.append(value)
            set_clauses.append(f"{column} = ${len(params)}")
    if not set_clauses:
        return False
    set_clauses.append("updated_at = NOW()")
    params.append(storage_id)
    async with get_connection() as conn:
        status = await conn.execute(
            f"UPDATE user_storage SET {', '.join(set_clauses)} WHERE id = ${len(params)}",
            *params
        )
    return "UPDATE 1" in status
async def remove_user_storage(storage_id: int) -> bool:
    """Delete a storage provider (cascades to storage_pins); True when removed."""
    async with get_connection() as conn:
        status = await conn.execute(
            "DELETE FROM user_storage WHERE id = $1",
            storage_id
        )
    return "DELETE 1" in status
async def get_storage_usage(storage_id: int) -> dict:
    """Pin count and total bytes pinned on one storage provider."""
    async with get_connection() as conn:
        record = await conn.fetchrow(
            """SELECT
            COUNT(*) as pin_count,
            COALESCE(SUM(size_bytes), 0) as used_bytes
            FROM storage_pins WHERE storage_id = $1""",
            storage_id
        )
    return {"pin_count": record["pin_count"], "used_bytes": record["used_bytes"]}
async def add_storage_pin(
    content_hash: str,
    storage_id: int,
    ipfs_cid: Optional[str],
    pin_type: str,
    size_bytes: int
) -> Optional[int]:
    """Upsert a pin record (keyed on content_hash+storage_id).

    On conflict the cid/type/size are refreshed and pinned_at reset.
    Returns the pin row id, or None when the write fails (best-effort API).
    """
    async with get_connection() as conn:
        try:
            record = await conn.fetchrow(
                """INSERT INTO storage_pins (content_hash, storage_id, ipfs_cid, pin_type, size_bytes)
                VALUES ($1, $2, $3, $4, $5)
                ON CONFLICT (content_hash, storage_id) DO UPDATE SET
                ipfs_cid = EXCLUDED.ipfs_cid,
                pin_type = EXCLUDED.pin_type,
                size_bytes = EXCLUDED.size_bytes,
                pinned_at = NOW()
                RETURNING id""",
                content_hash, storage_id, ipfs_cid, pin_type, size_bytes
            )
        except Exception:
            return None
    return record["id"] if record else None
async def remove_storage_pin(content_hash: str, storage_id: int) -> bool:
    """Delete a pin record; True when exactly one row was removed."""
    async with get_connection() as conn:
        status = await conn.execute(
            "DELETE FROM storage_pins WHERE content_hash = $1 AND storage_id = $2",
            content_hash, storage_id
        )
    return "DELETE 1" in status
async def get_pins_for_content(content_hash: str) -> list[dict]:
    """Every storage location holding this content, joined with provider info."""
    async with get_connection() as conn:
        records = await conn.fetch(
            """SELECT sp.*, us.provider_type, us.provider_name, us.username
            FROM storage_pins sp
            JOIN user_storage us ON sp.storage_id = us.id
            WHERE sp.content_hash = $1""",
            content_hash
        )
    return [dict(record) for record in records]
async def get_all_active_storage() -> list[dict]:
    """Active storage providers with aggregate usage (for distributed pinning)."""
    async with get_connection() as conn:
        records = await conn.fetch(
            """SELECT us.id, us.username, us.provider_type, us.provider_name, us.description,
            us.config, us.capacity_gb, us.is_active, us.created_at, us.updated_at,
            COALESCE(SUM(sp.size_bytes), 0) as used_bytes,
            COUNT(sp.id) as pin_count
            FROM user_storage us
            LEFT JOIN storage_pins sp ON us.id = sp.storage_id
            WHERE us.is_active = true
            GROUP BY us.id
            ORDER BY us.provider_type, us.created_at"""
        )
    return [dict(record) for record in records]
# ============ Token Revocation ============
async def revoke_token(token_hash: str, username: str, expires_at) -> bool:
    """Revoke a token.

    Args:
        token_hash: Hash of the token being revoked.
        username: Owner of the token.
        expires_at: When the revocation entry itself expires.

    Returns:
        True if the token was newly revoked; False if it was already in the
        revocation list or the insert failed.
    """
    async with get_connection() as conn:
        try:
            result = await conn.execute(
                """INSERT INTO revoked_tokens (token_hash, username, expires_at)
                VALUES ($1, $2, $3)
                ON CONFLICT (token_hash) DO NOTHING""",
                token_hash, username, expires_at
            )
            # asyncpg's INSERT status is "INSERT <oid> <rows>". With
            # ON CONFLICT DO NOTHING an already-revoked token inserts 0 rows,
            # so only "INSERT 0 1" means "newly revoked" — returning True
            # unconditionally here would violate the documented contract.
            return result == "INSERT 0 1"
        except Exception:
            return False
async def is_token_revoked(token_hash: str) -> bool:
    """Return True when the token hash is in the (not-yet-expired) revocation list."""
    async with get_connection() as conn:
        hit = await conn.fetchrow(
            "SELECT 1 FROM revoked_tokens WHERE token_hash = $1 AND expires_at > NOW()",
            token_hash
        )
    return hit is not None
async def cleanup_expired_revocations() -> int:
    """Delete revocation entries whose expiry has passed.

    Returns:
        The number of rows removed (0 if the status string is unparseable).
    """
    async with get_connection() as conn:
        status = await conn.execute(
            "DELETE FROM revoked_tokens WHERE expires_at < NOW()"
        )
    # asyncpg reports a status string shaped like "DELETE <count>".
    try:
        removed = int(status.split()[-1])
    except (ValueError, IndexError):
        removed = 0
    return removed
# ============ Additional helper functions ============
async def get_user_assets(username: str, offset: int = 0, limit: int = 20,
                          asset_type: Optional[str] = None) -> list[dict]:
    """Get assets owned by a user, newest first, with pagination.

    Args:
        username: Owner whose assets to list.
        offset: Number of rows to skip.
        limit: Maximum number of rows to return.
        asset_type: Optional filter on the asset_type column; falsy values
            (None or "") disable the filter.

    Returns:
        A (possibly empty) list of asset rows as dicts.
    """
    async with get_connection() as conn:
        # Truthiness check kept deliberately: an empty-string filter means
        # "no filter", matching the original behavior.
        if asset_type:
            rows = await conn.fetch(
                """SELECT * FROM assets WHERE owner = $1 AND asset_type = $2
                ORDER BY created_at DESC LIMIT $3 OFFSET $4""",
                username, asset_type, limit, offset
            )
        else:
            rows = await conn.fetch(
                """SELECT * FROM assets WHERE owner = $1
                ORDER BY created_at DESC LIMIT $2 OFFSET $3""",
                username, limit, offset
            )
        return [dict(row) for row in rows]
async def delete_asset(asset_id: str) -> bool:
    """Delete an asset by its name (the assets primary key).

    Args:
        asset_id: The asset's name.

    Returns:
        True if at least one row was deleted.
    """
    async with get_connection() as conn:
        result = await conn.execute("DELETE FROM assets WHERE name = $1", asset_id)
        # Parse the "DELETE <count>" status rather than substring matching:
        # '"DELETE 1" in result' would also accept counts like 10 or 11.
        try:
            return int(result.split()[-1]) > 0
        except (ValueError, IndexError):
            return False
async def count_users() -> int:
    """Return the total number of registered users."""
    async with get_connection() as conn:
        total = await conn.fetchval("SELECT COUNT(*) FROM users")
    return total
async def count_user_activities(username: str) -> int:
    """Count activities whose actor_id contains the username.

    NOTE(review): matching is a substring test — presumably the actor URL
    embeds the username. A username that is a substring of another user's
    name can over-count; confirm against the actor_id format.

    Args:
        username: The username to match inside actor_id.

    Returns:
        The number of matching activity rows.
    """
    # Escape LIKE wildcards so a username containing %, _ or \ cannot
    # broaden (or break) the match.
    escaped = (username.replace("\\", "\\\\")
                       .replace("%", "\\%")
                       .replace("_", "\\_"))
    async with get_connection() as conn:
        return await conn.fetchval(
            "SELECT COUNT(*) FROM activities WHERE actor_id LIKE $1",
            f"%{escaped}%"
        )
async def get_user_activities(username: str, limit: int = 20, offset: int = 0) -> list[dict]:
    """Get activities whose actor_id contains the username, newest first.

    NOTE(review): matching is a substring test — presumably the actor URL
    embeds the username. A username that is a substring of another user's
    name can over-match; confirm against the actor_id format.

    Args:
        username: The username to match inside actor_id.
        limit: Maximum number of rows to return.
        offset: Number of rows to skip.

    Returns:
        A list of parsed activity dicts.
    """
    # Escape LIKE wildcards so a username containing %, _ or \ cannot
    # broaden (or break) the match.
    escaped = (username.replace("\\", "\\\\")
                       .replace("%", "\\%")
                       .replace("_", "\\_"))
    async with get_connection() as conn:
        rows = await conn.fetch(
            """SELECT activity_id, activity_type, actor_id, object_data, published, signature
            FROM activities WHERE actor_id LIKE $1
            ORDER BY published DESC LIMIT $2 OFFSET $3""",
            f"%{escaped}%", limit, offset
        )
        return [_parse_activity_row(row) for row in rows]
async def get_renderer(renderer_id: str) -> Optional[dict]:
    """Look up a renderer by its L1 URL; return the row as a dict, or None."""
    async with get_connection() as conn:
        record = await conn.fetchrow(
            "SELECT * FROM user_renderers WHERE l1_url = $1",
            renderer_id
        )
    if record is None:
        return None
    return dict(record)
async def update_anchor(anchor_id: str, **updates) -> bool:
    """Update an anchor identified by its merkle root.

    Only the "bitcoin_txid" field is supported; setting it also stamps
    confirmed_at. Any other update set is a no-op that returns False.
    """
    async with get_connection() as conn:
        # Guard clause: nothing to do unless a txid was supplied.
        if "bitcoin_txid" not in updates:
            return False
        status = await conn.execute(
            """UPDATE anchors SET bitcoin_txid = $1, confirmed_at = NOW()
            WHERE merkle_root = $2""",
            updates["bitcoin_txid"], anchor_id
        )
        return "UPDATE 1" in status
async def delete_anchor(anchor_id: str) -> bool:
    """Delete an anchor by its merkle root.

    Args:
        anchor_id: The anchor's merkle_root value.

    Returns:
        True if at least one row was deleted.
    """
    async with get_connection() as conn:
        result = await conn.execute(
            "DELETE FROM anchors WHERE merkle_root = $1", anchor_id
        )
        # Parse the "DELETE <count>" status rather than substring matching:
        # '"DELETE 1" in result' would also accept counts like 10 or 11.
        try:
            return int(result.split()[-1]) > 0
        except (ValueError, IndexError):
            return False
async def record_run(run_id: str, username: str, recipe: str, inputs: list,
                     output_hash: str, ipfs_cid: str = None, asset_id: str = None) -> dict:
    """Record a completed run, upserting on run_id.

    The inputs list is stored JSON-encoded. Returns the stored row as a
    dict; if the insert fails (e.g. the runs table does not exist yet) a
    minimal summary dict is returned instead of raising.
    """
    async with get_connection() as conn:
        try:
            stored = await conn.fetchrow(
                """INSERT INTO runs (run_id, username, recipe, inputs, output_hash, ipfs_cid, asset_id, created_at)
                VALUES ($1, $2, $3, $4, $5, $6, $7, NOW())
                ON CONFLICT (run_id) DO UPDATE SET
                    output_hash = EXCLUDED.output_hash,
                    ipfs_cid = EXCLUDED.ipfs_cid,
                    asset_id = EXCLUDED.asset_id
                RETURNING *""",
                run_id, username, recipe, json.dumps(inputs), output_hash, ipfs_cid, asset_id
            )
        except Exception:
            # Best-effort: the runs table may not exist in this deployment.
            return {"run_id": run_id, "username": username, "recipe": recipe}
        if stored is None:
            return None
        return dict(stored)
async def get_run(run_id: str) -> Optional[dict]:
    """Fetch a run by ID, decoding the JSON-encoded inputs column.

    Returns None when the run does not exist or the query/decoding fails
    (e.g. the runs table has not been created).
    """
    async with get_connection() as conn:
        try:
            record = await conn.fetchrow("SELECT * FROM runs WHERE run_id = $1", run_id)
            if record is None:
                return None
            run = dict(record)
            raw_inputs = run.get("inputs")
            if raw_inputs and isinstance(raw_inputs, str):
                run["inputs"] = json.loads(raw_inputs)
            return run
        except Exception:
            # Best-effort: table may be missing or the row malformed.
            pass
    return None