diff --git a/auth.py b/auth.py index fcbbeed..112fd9c 100644 --- a/auth.py +++ b/auth.py @@ -4,7 +4,6 @@ Authentication for Art DAG L2 Server. User registration, login, and JWT tokens. """ -import json import os import secrets from datetime import datetime, timezone, timedelta @@ -15,6 +14,8 @@ import bcrypt from jose import JWTError, jwt from pydantic import BaseModel +import db + # JWT settings ALGORITHM = "HS256" ACCESS_TOKEN_EXPIRE_DAYS = 30 @@ -68,25 +69,8 @@ class Token(BaseModel): expires_at: str -def get_users_path(data_dir: Path) -> Path: - """Get users file path.""" - return data_dir / "users.json" - - -def load_users(data_dir: Path) -> dict[str, dict]: - """Load users from disk.""" - path = get_users_path(data_dir) - if path.exists(): - with open(path) as f: - return json.load(f) - return {} - - -def save_users(data_dir: Path, users: dict[str, dict]): - """Save users to disk.""" - path = get_users_path(data_dir) - with open(path, "w") as f: - json.dump(users, f, indent=2) +# Keep DATA_DIR for keys (RSA keys still stored as files) +DATA_DIR = Path(os.environ.get("ARTDAG_DATA", str(Path.home() / ".artdag" / "l2"))) def hash_password(password: str) -> str: @@ -102,43 +86,53 @@ def verify_password(plain_password: str, hashed_password: str) -> bool: return bcrypt.checkpw(pw_bytes, hashed_password.encode('utf-8')) -def create_user(data_dir: Path, username: str, password: str, email: Optional[str] = None) -> User: +async def create_user(data_dir: Path, username: str, password: str, email: Optional[str] = None) -> User: """Create a new user with ActivityPub keys.""" from keys import generate_keypair - users = load_users(data_dir) - - if username in users: + if await db.user_exists(username): raise ValueError(f"Username already exists: {username}") - user = User( - username=username, - password_hash=hash_password(password), - created_at=datetime.now(timezone.utc).isoformat(), - email=email - ) - - users[username] = user.model_dump() - save_users(data_dir, users) + password_hash = hash_password(password) + user_data = await db.create_user(username, password_hash, email) # Generate ActivityPub keys for this user generate_keypair(data_dir, username) - return user + # Convert datetime to ISO string if needed + created_at = user_data.get("created_at") + if hasattr(created_at, 'isoformat'): + created_at = created_at.isoformat() + + return User( + username=username, + password_hash=password_hash, + created_at=created_at, + email=email + ) -def authenticate_user(data_dir: Path, username: str, password: str) -> Optional[User]: +async def authenticate_user(data_dir: Path, username: str, password: str) -> Optional[User]: """Authenticate a user by username and password.""" - users = load_users(data_dir) + user_data = await db.get_user(username) - if username not in users: + if not user_data: return None - user_data = users[username] if not verify_password(password, user_data["password_hash"]): return None - return User(**user_data) + # Convert datetime to ISO string if needed + created_at = user_data.get("created_at") + if hasattr(created_at, 'isoformat'): + created_at = created_at.isoformat() + + return User( + username=user_data["username"], + password_hash=user_data["password_hash"], + created_at=created_at, + email=user_data.get("email") + ) def create_access_token(username: str) -> Token: @@ -170,14 +164,24 @@ def verify_token(token: str) -> Optional[str]: return None -def get_current_user(data_dir: Path, token: str) -> Optional[User]: +async def get_current_user(data_dir: Path, token: str) -> Optional[User]: """Get current user from token.""" username = verify_token(token) if not username: return None - users = load_users(data_dir) - if username not in users: + user_data = await db.get_user(username) + if not user_data: return None - return User(**users[username]) + # Convert datetime to ISO string if needed + created_at = user_data.get("created_at") + if hasattr(created_at, 'isoformat'): + created_at = created_at.isoformat() + + return User( + username=user_data["username"], + password_hash=user_data["password_hash"], + created_at=created_at, + email=user_data.get("email") + ) diff --git a/db.py b/db.py new file mode 100644 index 0000000..6e997aa --- /dev/null +++ b/db.py @@ -0,0 +1,456 @@ +""" +Database module for Art DAG L2 Server. + +Uses asyncpg for async PostgreSQL access with connection pooling. +""" + +import json +import os +from datetime import datetime, timezone +from typing import Optional +from contextlib import asynccontextmanager +from uuid import UUID + +import asyncpg + +# Connection pool (initialized on startup) +_pool: Optional[asyncpg.Pool] = None + +# Configuration from environment +DATABASE_URL = os.environ.get( + "DATABASE_URL", + "postgresql://artdag:artdag@localhost:5432/artdag" +) + +# Schema for database initialization +SCHEMA = """ +-- Users table +CREATE TABLE IF NOT EXISTS users ( + username VARCHAR(255) PRIMARY KEY, + password_hash VARCHAR(255) NOT NULL, + email VARCHAR(255), + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); + +-- Assets table +CREATE TABLE IF NOT EXISTS assets ( + name VARCHAR(255) PRIMARY KEY, + content_hash VARCHAR(128) NOT NULL, + asset_type VARCHAR(50) NOT NULL, + tags JSONB DEFAULT '[]'::jsonb, + metadata JSONB DEFAULT '{}'::jsonb, + url TEXT, + provenance JSONB, + description TEXT, + origin JSONB, + owner VARCHAR(255) NOT NULL REFERENCES users(username), + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + updated_at TIMESTAMPTZ +); + +-- Activities table +CREATE TABLE IF NOT EXISTS activities ( + activity_id UUID PRIMARY KEY, + activity_type VARCHAR(50) NOT NULL, + actor_id TEXT NOT NULL, + object_data JSONB NOT NULL, + published TIMESTAMPTZ NOT NULL, + signature JSONB +); + +-- Followers table +CREATE TABLE IF NOT EXISTS followers ( + id SERIAL PRIMARY KEY, + username VARCHAR(255) NOT NULL REFERENCES users(username), + acct VARCHAR(255) NOT NULL, + url TEXT NOT NULL, + public_key TEXT, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + UNIQUE(username, acct) +); + +-- Indexes +CREATE INDEX IF NOT EXISTS idx_users_created_at ON users(created_at); +CREATE INDEX IF NOT EXISTS idx_assets_content_hash ON assets(content_hash); +CREATE INDEX IF NOT EXISTS idx_assets_owner ON assets(owner); +CREATE INDEX IF NOT EXISTS idx_assets_created_at ON assets(created_at DESC); +CREATE INDEX IF NOT EXISTS idx_assets_tags ON assets USING GIN(tags); +CREATE INDEX IF NOT EXISTS idx_activities_actor_id ON activities(actor_id); +CREATE INDEX IF NOT EXISTS idx_activities_published ON activities(published DESC); +CREATE INDEX IF NOT EXISTS idx_followers_username ON followers(username); +""" + + +async def init_pool(): + """Initialize the connection pool and create tables. Call on app startup.""" + global _pool + _pool = await asyncpg.create_pool( + DATABASE_URL, + min_size=2, + max_size=10, + command_timeout=60 + ) + # Create tables if they don't exist + async with _pool.acquire() as conn: + await conn.execute(SCHEMA) + + +async def close_pool(): + """Close the connection pool. Call on app shutdown.""" + global _pool + if _pool: + await _pool.close() + _pool = None + + +def get_pool() -> asyncpg.Pool: + """Get the connection pool.""" + if _pool is None: + raise RuntimeError("Database pool not initialized") + return _pool + + +@asynccontextmanager +async def get_connection(): + """Get a connection from the pool.""" + async with get_pool().acquire() as conn: + yield conn + + +# ============ Users ============ + +async def get_user(username: str) -> Optional[dict]: + """Get user by username.""" + async with get_connection() as conn: + row = await conn.fetchrow( + "SELECT username, password_hash, email, created_at FROM users WHERE username = $1", + username + ) + if row: + return dict(row) + return None + + +async def get_all_users() -> dict[str, dict]: + """Get all users as a dict indexed by username.""" + async with get_connection() as conn: + rows = await conn.fetch( + "SELECT username, password_hash, email, created_at FROM users ORDER BY username" + ) + return {row["username"]: dict(row) for row in rows} + + +async def create_user(username: str, password_hash: str, email: Optional[str] = None) -> dict: + """Create a new user.""" + async with get_connection() as conn: + row = await conn.fetchrow( + """INSERT INTO users (username, password_hash, email) + VALUES ($1, $2, $3) + RETURNING username, password_hash, email, created_at""", + username, password_hash, email + ) + return dict(row) + + +async def user_exists(username: str) -> bool: + """Check if user exists.""" + async with get_connection() as conn: + result = await conn.fetchval( + "SELECT EXISTS(SELECT 1 FROM users WHERE username = $1)", + username + ) + return result + + +# ============ Assets ============ + +async def get_asset(name: str) -> Optional[dict]: + """Get asset by name.""" + async with get_connection() as conn: + row = await conn.fetchrow( + """SELECT name, content_hash, asset_type, tags, metadata, url, + provenance, description, origin, owner, created_at, updated_at + FROM assets WHERE name = $1""", + name + ) + if row: + return _parse_asset_row(row) + return None + + +async def get_asset_by_hash(content_hash: str) -> Optional[dict]: + """Get asset by content hash.""" + async with get_connection() as conn: + row = await conn.fetchrow( + """SELECT name, content_hash, asset_type, tags, metadata, url, + provenance, description, origin, owner, created_at, updated_at + FROM assets WHERE content_hash = $1""", + content_hash + ) + if row: + return _parse_asset_row(row) + return None + + +async def get_all_assets() -> dict[str, dict]: + """Get all assets as a dict indexed by name.""" + async with get_connection() as conn: + rows = await conn.fetch( + """SELECT name, content_hash, asset_type, tags, metadata, url, + provenance, description, origin, owner, created_at, updated_at + FROM assets ORDER BY created_at DESC""" + ) + return {row["name"]: _parse_asset_row(row) for row in rows} + + +async def get_assets_paginated(limit: int = 100, offset: int = 0) -> tuple[list[tuple[str, dict]], int]: + """Get paginated assets, returns (list of (name, asset) tuples, total_count).""" + async with get_connection() as conn: + total = await conn.fetchval("SELECT COUNT(*) FROM assets") + rows = await conn.fetch( + """SELECT name, content_hash, asset_type, tags, metadata, url, + provenance, description, origin, owner, created_at, updated_at + FROM assets ORDER BY created_at DESC LIMIT $1 OFFSET $2""", + limit, offset + ) + return [(row["name"], _parse_asset_row(row)) for row in rows], total + + +async def get_assets_by_owner(owner: str) -> dict[str, dict]: + """Get all assets owned by a user.""" + async with get_connection() as conn: + rows = await conn.fetch( + """SELECT name, content_hash, asset_type, tags, metadata, url, + provenance, description, origin, owner, created_at, updated_at + FROM assets WHERE owner = $1 ORDER BY created_at DESC""", + owner + ) + return {row["name"]: _parse_asset_row(row) for row in rows} + + +async def create_asset(asset: dict) -> dict: + """Create a new asset.""" + async with get_connection() as conn: + row = await conn.fetchrow( + """INSERT INTO assets (name, content_hash, asset_type, tags, metadata, + url, provenance, description, origin, owner, created_at) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) + RETURNING *""", + asset["name"], + asset["content_hash"], + asset["asset_type"], + json.dumps(asset.get("tags", [])), + json.dumps(asset.get("metadata", {})), + asset.get("url"), + json.dumps(asset.get("provenance")) if asset.get("provenance") else None, + asset.get("description"), + json.dumps(asset.get("origin")) if asset.get("origin") else None, + asset["owner"], + asset.get("created_at") or datetime.now(timezone.utc).isoformat() + ) + return _parse_asset_row(row) + + +async def update_asset(name: str, updates: dict) -> Optional[dict]: + """Update an existing asset.""" + # Build dynamic UPDATE query + set_clauses = [] + values = [] + idx = 1 + + for key, value in updates.items(): + if key in ("tags", "metadata", "provenance", "origin"): + set_clauses.append(f"{key} = ${idx}") + values.append(json.dumps(value) if value is not None else None) + else: + set_clauses.append(f"{key} = ${idx}") + values.append(value) + idx += 1 + + set_clauses.append(f"updated_at = ${idx}") + values.append(datetime.now(timezone.utc)) + idx += 1 + + values.append(name) # WHERE clause + + async with get_connection() as conn: + row = await conn.fetchrow( + f"""UPDATE assets SET {', '.join(set_clauses)} + WHERE name = ${idx} RETURNING *""", + *values + ) + if row: + return _parse_asset_row(row) + return None + + +async def asset_exists(name: str) -> bool: + """Check if asset exists.""" + async with get_connection() as conn: + return await conn.fetchval( + "SELECT EXISTS(SELECT 1 FROM assets WHERE name = $1)", + name + ) + + +def _parse_asset_row(row) -> dict: + """Parse a database row into an asset dict, handling JSONB fields.""" + asset = dict(row) + # Convert datetime to ISO string + if asset.get("created_at"): + asset["created_at"] = asset["created_at"].isoformat() + if asset.get("updated_at"): + asset["updated_at"] = asset["updated_at"].isoformat() + return asset + + +# ============ Activities ============ + +async def get_activity(activity_id: str) -> Optional[dict]: + """Get activity by ID.""" + async with get_connection() as conn: + row = await conn.fetchrow( + """SELECT activity_id, activity_type, actor_id, object_data, published, signature + FROM activities WHERE activity_id = $1""", + UUID(activity_id) + ) + if row: + return _parse_activity_row(row) + return None + + +async def get_activity_by_index(index: int) -> Optional[dict]: + """Get activity by index (for backward compatibility with URL scheme).""" + async with get_connection() as conn: + row = await conn.fetchrow( + """SELECT activity_id, activity_type, actor_id, object_data, published, signature + FROM activities ORDER BY published ASC LIMIT 1 OFFSET $1""", + index + ) + if row: + return _parse_activity_row(row) + return None + + +async def get_all_activities() -> list[dict]: + """Get all activities ordered by published date (oldest first for index compatibility).""" + async with get_connection() as conn: + rows = await conn.fetch( + """SELECT activity_id, activity_type, actor_id, object_data, published, signature + FROM activities ORDER BY published ASC""" + ) + return [_parse_activity_row(row) for row in rows] + + +async def get_activities_paginated(limit: int = 100, offset: int = 0) -> tuple[list[dict], int]: + """Get paginated activities (newest first), returns (activities, total_count).""" + async with get_connection() as conn: + total = await conn.fetchval("SELECT COUNT(*) FROM activities") + rows = await conn.fetch( + """SELECT activity_id, activity_type, actor_id, object_data, published, signature + FROM activities ORDER BY published DESC LIMIT $1 OFFSET $2""", + limit, offset + ) + return [_parse_activity_row(row) for row in rows], total + + +async def get_activities_by_actor(actor_id: str) -> list[dict]: + """Get all activities by an actor.""" + async with get_connection() as conn: + rows = await conn.fetch( + """SELECT activity_id, activity_type, actor_id, object_data, published, signature + FROM activities WHERE actor_id = $1 ORDER BY published DESC""", + actor_id + ) + return [_parse_activity_row(row) for row in rows] + + +async def create_activity(activity: dict) -> dict: + """Create a new activity.""" + async with get_connection() as conn: + row = await conn.fetchrow( + """INSERT INTO activities (activity_id, activity_type, actor_id, object_data, published, signature) + VALUES ($1, $2, $3, $4, $5, $6) + RETURNING *""", + UUID(activity["activity_id"]), + activity["activity_type"], + activity["actor_id"], + json.dumps(activity["object_data"]), + activity["published"], + json.dumps(activity.get("signature")) if activity.get("signature") else None + ) + return _parse_activity_row(row) + + +async def count_activities() -> int: + """Get total activity count.""" + async with get_connection() as conn: + return await conn.fetchval("SELECT COUNT(*) FROM activities") + + +def _parse_activity_row(row) -> dict: + """Parse a database row into an activity dict, handling JSONB fields.""" + activity = dict(row) + # Convert UUID to string + if activity.get("activity_id"): + activity["activity_id"] = str(activity["activity_id"]) + # Convert datetime to ISO string + if activity.get("published"): + activity["published"] = activity["published"].isoformat() + return activity + + +# ============ Followers ============ + +async def get_followers(username: str) -> list[dict]: + """Get followers for a user.""" + async with get_connection() as conn: + rows = await conn.fetch( + """SELECT id, username, acct, url, public_key, created_at + FROM followers WHERE username = $1""", + username + ) + return [dict(row) for row in rows] + + +async def get_all_followers() -> list: + """Get all followers (for backward compatibility with old global list).""" + async with get_connection() as conn: + rows = await conn.fetch( + """SELECT DISTINCT url FROM followers""" + ) + return [row["url"] for row in rows] + + +async def add_follower(username: str, acct: str, url: str, public_key: Optional[str] = None) -> dict: + """Add a follower.""" + async with get_connection() as conn: + row = await conn.fetchrow( + """INSERT INTO followers (username, acct, url, public_key) + VALUES ($1, $2, $3, $4) + ON CONFLICT (username, acct) DO UPDATE SET url = $3, public_key = $4 + RETURNING *""", + username, acct, url, public_key + ) + return dict(row) + + +async def remove_follower(username: str, acct: str) -> bool: + """Remove a follower.""" + async with get_connection() as conn: + result = await conn.execute( + "DELETE FROM followers WHERE username = $1 AND acct = $2", + username, acct + ) + return result == "DELETE 1" + + +# ============ Stats ============ + +async def get_stats() -> dict: + """Get counts for dashboard.""" + async with get_connection() as conn: + assets = await conn.fetchval("SELECT COUNT(*) FROM assets") + activities = await conn.fetchval("SELECT COUNT(*) FROM activities") + users = await conn.fetchval("SELECT COUNT(*) FROM users") + return {"assets": assets, "activities": activities, "users": users} diff --git a/docker-compose.yml b/docker-compose.yml index c895e0d..0c56941 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,17 +1,38 @@ version: "3.8" services: + postgres: + image: postgres:16-alpine + environment: + POSTGRES_USER: artdag + POSTGRES_PASSWORD: ${POSTGRES_PASSWORD:-artdag} + POSTGRES_DB: artdag + volumes: + - postgres_data:/var/lib/postgresql/data + networks: + - internal + healthcheck: + test: ["CMD-SHELL", "pg_isready -U artdag"] + interval: 5s + timeout: 5s + retries: 5 + l2-server: image: git.rose-ash.com/art-dag/l2-server:latest env_file: - .env environment: - ARTDAG_DATA=/data/l2 + - DATABASE_URL=postgresql://artdag:${POSTGRES_PASSWORD:-artdag}@postgres:5432/artdag # ARTDAG_DOMAIN, ARTDAG_USER, ARTDAG_L1, JWT_SECRET from .env file volumes: - - l2_data:/data/l2 + - l2_data:/data/l2 # Still needed for RSA keys networks: + - internal - externalnet + depends_on: + postgres: + condition: service_healthy deploy: replicas: 1 restart_policy: @@ -19,7 +40,9 @@ services: volumes: l2_data: + postgres_data: networks: + internal: externalnet: external: true diff --git a/migrate.py b/migrate.py new file mode 100755 index 0000000..6dbfb1f --- /dev/null +++ b/migrate.py @@ -0,0 +1,246 @@ +#!/usr/bin/env python3 +""" +Migration script: JSON files to PostgreSQL. + +Usage: + python migrate.py [--dry-run] + +Migrates: +- users.json -> users table +- registry.json -> assets table +- activities.json -> activities table +- followers.json -> followers table + +Does NOT migrate: +- keys/ directory (stays as files) +""" + +import asyncio +import json +import os +import sys +from pathlib import Path +from datetime import datetime, timezone +from uuid import UUID + +import asyncpg + +# Configuration +DATA_DIR = Path(os.environ.get("ARTDAG_DATA", str(Path.home() / ".artdag" / "l2"))) +DATABASE_URL = os.environ.get( + "DATABASE_URL", + "postgresql://artdag:artdag@localhost:5432/artdag" +) + +SCHEMA = """ +-- Drop existing tables (careful in production!) +DROP TABLE IF EXISTS followers CASCADE; +DROP TABLE IF EXISTS activities CASCADE; +DROP TABLE IF EXISTS assets CASCADE; +DROP TABLE IF EXISTS users CASCADE; + +-- Users table +CREATE TABLE users ( + username VARCHAR(255) PRIMARY KEY, + password_hash VARCHAR(255) NOT NULL, + email VARCHAR(255), + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); + +-- Assets table +CREATE TABLE assets ( + name VARCHAR(255) PRIMARY KEY, + content_hash VARCHAR(128) NOT NULL, + asset_type VARCHAR(50) NOT NULL, + tags JSONB DEFAULT '[]'::jsonb, + metadata JSONB DEFAULT '{}'::jsonb, + url TEXT, + provenance JSONB, + description TEXT, + origin JSONB, + owner VARCHAR(255) NOT NULL REFERENCES users(username), + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + updated_at TIMESTAMPTZ +); + +-- Activities table +CREATE TABLE activities ( + activity_id UUID PRIMARY KEY, + activity_type VARCHAR(50) NOT NULL, + actor_id TEXT NOT NULL, + object_data JSONB NOT NULL, + published TIMESTAMPTZ NOT NULL, + signature JSONB +); + +-- Followers table +CREATE TABLE followers ( + id SERIAL PRIMARY KEY, + username VARCHAR(255) NOT NULL REFERENCES users(username), + acct VARCHAR(255) NOT NULL, + url TEXT NOT NULL, + public_key TEXT, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + UNIQUE(username, acct) +); + +-- Indexes +CREATE INDEX idx_users_created_at ON users(created_at); +CREATE INDEX idx_assets_content_hash ON assets(content_hash); +CREATE INDEX idx_assets_owner ON assets(owner); +CREATE INDEX idx_assets_created_at ON assets(created_at DESC); +CREATE INDEX idx_assets_tags ON assets USING GIN(tags); +CREATE INDEX idx_activities_actor_id ON activities(actor_id); +CREATE INDEX idx_activities_published ON activities(published DESC); +CREATE INDEX idx_followers_username ON followers(username); +""" + + +async def migrate(dry_run: bool = False): + """Run the migration.""" + print(f"Migrating from {DATA_DIR} to PostgreSQL") + print(f"Database: {DATABASE_URL}") + print(f"Dry run: {dry_run}") + print() + + # Load JSON files + users = load_json(DATA_DIR / "users.json") or {} + registry = load_json(DATA_DIR / "registry.json") or {"assets": {}} + activities_data = load_json(DATA_DIR / "activities.json") or {"activities": []} + followers = load_json(DATA_DIR / "followers.json") or [] + + assets = registry.get("assets", {}) + activities = activities_data.get("activities", []) + + print(f"Found {len(users)} users") + print(f"Found {len(assets)} assets") + print(f"Found {len(activities)} activities") + print(f"Found {len(followers)} followers") + print() + + if dry_run: + print("DRY RUN - no changes made") + return + + # Connect and migrate + conn = await asyncpg.connect(DATABASE_URL) + try: + # Create schema + print("Creating schema...") + await conn.execute(SCHEMA) + + # Migrate users + print("Migrating users...") + for username, user_data in users.items(): + await conn.execute( + """INSERT INTO users (username, password_hash, email, created_at) + VALUES ($1, $2, $3, $4)""", + username, + user_data["password_hash"], + user_data.get("email"), + parse_timestamp(user_data.get("created_at")) + ) + print(f" Migrated {len(users)} users") + + # Migrate assets + print("Migrating assets...") + for name, asset in assets.items(): + await conn.execute( + """INSERT INTO assets (name, content_hash, asset_type, tags, metadata, + url, provenance, description, origin, owner, + created_at, updated_at) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)""", + name, + asset["content_hash"], + asset["asset_type"], + json.dumps(asset.get("tags", [])), + json.dumps(asset.get("metadata", {})), + asset.get("url"), + json.dumps(asset.get("provenance")) if asset.get("provenance") else None, + asset.get("description"), + json.dumps(asset.get("origin")) if asset.get("origin") else None, + asset["owner"], + parse_timestamp(asset.get("created_at")), + parse_timestamp(asset.get("updated_at")) + ) + print(f" Migrated {len(assets)} assets") + + # Migrate activities + print("Migrating activities...") + for activity in activities: + await conn.execute( + """INSERT INTO activities (activity_id, activity_type, actor_id, + object_data, published, signature) + VALUES ($1, $2, $3, $4, $5, $6)""", + UUID(activity["activity_id"]), + activity["activity_type"], + activity["actor_id"], + json.dumps(activity["object_data"]), + parse_timestamp(activity["published"]), + json.dumps(activity.get("signature")) if activity.get("signature") else None + ) + print(f" Migrated {len(activities)} activities") + + # Migrate followers + print("Migrating followers...") + if followers and users: + first_user = list(users.keys())[0] + migrated = 0 + for follower in followers: + if isinstance(follower, str): + # Old format: just URL string + await conn.execute( + """INSERT INTO followers (username, acct, url) + VALUES ($1, $2, $3) + ON CONFLICT DO NOTHING""", + first_user, + follower, + follower + ) + migrated += 1 + elif isinstance(follower, dict): + await conn.execute( + """INSERT INTO followers (username, acct, url, public_key) + VALUES ($1, $2, $3, $4) + ON CONFLICT DO NOTHING""", + follower.get("username", first_user), + follower.get("acct", follower.get("url", "")), + follower["url"], + follower.get("public_key") + ) + migrated += 1 + print(f" Migrated {migrated} followers") + else: + print(" No followers to migrate") + + print() + print("Migration complete!") + + finally: + await conn.close() + + +def load_json(path: Path) -> dict | list | None: + """Load JSON file if it exists.""" + if path.exists(): + with open(path) as f: + return json.load(f) + return None + + +def parse_timestamp(ts: str | None) -> datetime | None: + """Parse ISO timestamp string to datetime.""" + if not ts: + return datetime.now(timezone.utc) + try: + # Handle various ISO formats + if ts.endswith('Z'): + ts = ts[:-1] + '+00:00' + return datetime.fromisoformat(ts) + except Exception: + return datetime.now(timezone.utc) + + +if __name__ == "__main__": + dry_run = "--dry-run" in sys.argv + asyncio.run(migrate(dry_run)) diff --git a/requirements.txt b/requirements.txt index 4cfd4d9..faa8463 100644 --- a/requirements.txt +++ b/requirements.txt @@ -6,3 +6,4 @@ bcrypt>=4.0.0 python-jose[cryptography]>=3.3.0 markdown>=3.5.0 python-multipart>=0.0.6 +asyncpg>=0.29.0 diff --git a/server.py b/server.py index f436752..4ef0a61 100644 --- a/server.py +++ b/server.py @@ -13,6 +13,7 @@ import hashlib import json import os import uuid +from contextlib import asynccontextmanager from datetime import datetime, timezone from pathlib import Path from typing import Optional @@ -25,10 +26,11 @@ from pydantic import BaseModel import requests import markdown +import db from auth import ( UserCreate, UserLogin, Token, User, create_user, authenticate_user, create_access_token, - verify_token, get_current_user, load_users + verify_token, get_current_user ) # Configuration @@ -47,10 +49,20 @@ README_CONTENT = "" if README_PATH.exists(): README_CONTENT = README_PATH.read_text() + +@asynccontextmanager +async def lifespan(app: FastAPI): + """Manage database connection pool lifecycle.""" + await db.init_pool() + yield + await db.close_pool() + + app = FastAPI( title="Art DAG L2 Server", description="ActivityPub server for Art DAG ownership and federation", - version="0.1.0" + version="0.1.0", + lifespan=lifespan ) @@ -115,39 +127,17 @@ class UpdateAssetRequest(BaseModel): origin: Optional[dict] = None -# ============ Storage ============ +# ============ Storage (Database) ============ -def load_registry() -> dict: - """Load registry from disk.""" - path = DATA_DIR / "registry.json" - if path.exists(): - with open(path) as f: - return json.load(f) - return {"version": "1.0", "assets": {}} +async def load_registry() -> dict: + """Load registry from database.""" + assets = await db.get_all_assets() + return {"version": "1.0", "assets": assets} -def save_registry(registry: dict): - """Save registry to disk.""" - path = DATA_DIR / "registry.json" - with open(path, "w") as f: - json.dump(registry, f, indent=2) - - -def load_activities() -> list: - """Load activities from disk.""" - path = DATA_DIR / "activities.json" - if path.exists(): - with open(path) as f: - data = json.load(f) - return data.get("activities", []) - return [] - - -def save_activities(activities: list): - """Save activities to disk.""" - path = DATA_DIR / "activities.json" - with open(path, "w") as f: - json.dump({"version": "1.0", "activities": activities}, f, indent=2) +async def load_activities() -> list: + """Load activities from database.""" + return await db.get_all_activities() def load_actor(username: str) -> dict: @@ -175,26 +165,14 @@ def load_actor(username: str) -> dict: return actor -def user_exists(username: str) -> bool: +async def user_exists(username: str) -> bool: """Check if a user exists.""" - users = load_users(DATA_DIR) - return username in users + return await db.user_exists(username) -def load_followers() -> list: - """Load followers list.""" - path = DATA_DIR / "followers.json" - if path.exists(): - with open(path) as f: - return json.load(f) - return [] - - -def save_followers(followers: list): - """Save followers list.""" - path = DATA_DIR / "followers.json" - with open(path, "w") as f: - json.dump(followers, f, indent=2) +async def load_followers() -> list: + """Load followers list from database.""" + return await db.get_all_followers() # ============ Signing ============ @@ -300,48 +278,7 @@ def wants_html(request: Request) -> bool: return "text/html" in accept and "application/json" not in accept and "application/activity+json" not in accept -# ============ UI Endpoints ============ - -@app.get("/ui", response_class=HTMLResponse) -async def ui_home(request: Request): - """Home page with README and stats.""" - username = get_user_from_cookie(request) - registry = load_registry() - activities = load_activities() - users = load_users(DATA_DIR) - - readme_html = markdown.markdown(README_CONTENT, extensions=['tables', 'fenced_code']) - - content = f''' -
No assets registered yet.
- ''' - else: - rows = "" - for name, asset in sorted(assets.items(), key=lambda x: x[1].get("created_at", ""), reverse=True): - hash_short = asset.get("content_hash", "")[:16] + "..." - owner = asset.get("owner", "unknown") - asset_type = asset.get("asset_type", "") - type_color = "bg-blue-600" if asset_type == "image" else "bg-purple-600" if asset_type == "video" else "bg-gray-600" - rows += f''' -{hash_short}| Name | -Type | -Owner | -Content Hash | -Tags | -
|---|
No activities yet.
- ''' - else: - rows = "" - for i, activity in enumerate(reversed(activities)): - # Index from end since we reversed - activity_index = len(activities) - 1 - i - obj = activity.get("object_data", {}) - activity_type = activity.get("activity_type", "") - type_color = "bg-green-600" if activity_type == "Create" else "bg-yellow-600" if activity_type == "Update" else "bg-gray-600" - actor_id = activity.get("actor_id", "") - actor_name = actor_id.split("/")[-1] if actor_id else "unknown" - rows += f''' -| Type | -Object | -Actor | -Published | -- |
|---|
No users registered yet.
- ''' - else: - rows = "" - for uname, user_data in sorted(users.items()): - webfinger = f"@{uname}@{DOMAIN}" - rows += f''' -{webfinger}Each user has their own ActivityPub actor that can be followed from Mastodon and other federated platforms.
-| Username | -ActivityPub Handle | -Registered | -
|---|
No user named "{username}" exists.
@@ -1173,12 +956,12 @@ async def ui_user_detail(username: str, request: Request): return HTMLResponse(base_html("User Not Found", content, current_user)) # Get user's assets - registry = load_registry() + registry = await load_registry() all_assets = registry.get("assets", {}) user_assets = {name: asset for name, asset in all_assets.items() if asset.get("owner") == username} # Get user's activities - all_activities = load_activities() + all_activities = await load_activities() actor_id = f"https://{DOMAIN}/users/{username}" user_activities = [a for a in all_activities if a.get("actor_id") == actor_id] @@ -1194,7 +977,7 @@ async def ui_user_detail(username: str, request: Request): rows += f'''{hash_short}{content_hash}