From a6e83c72bde03e733bbdff1cacc624f5b1f075d3 Mon Sep 17 00:00:00 2001 From: gilesb Date: Thu, 8 Jan 2026 00:22:21 +0000 Subject: [PATCH] Migrate to PostgreSQL database, consolidate routes, improve home page - Add PostgreSQL with asyncpg for persistent storage - Create db.py module with async database operations - Create migrate.py script to migrate JSON data to PostgreSQL - Update docker-compose.yml with PostgreSQL service - Home page now shows README with styled headings - Remove /ui prefix routes, use content negotiation on main routes - Add /activities/{idx} as canonical route (with /activity redirect) - Update /assets/{name} to support HTML and JSON responses - Convert auth.py to use async database operations - RSA keys still stored as files in $ARTDAG_DATA/keys/ Co-Authored-By: Claude Opus 4.5 --- auth.py | 90 ++++---- db.py | 456 +++++++++++++++++++++++++++++++++++++++++ docker-compose.yml | 25 ++- migrate.py | 246 ++++++++++++++++++++++ requirements.txt | 1 + server.py | 499 +++++++++++++-------------------------------- 6 files changed, 912 insertions(+), 405 deletions(-) create mode 100644 db.py create mode 100755 migrate.py diff --git a/auth.py b/auth.py index fcbbeed..112fd9c 100644 --- a/auth.py +++ b/auth.py @@ -4,7 +4,6 @@ Authentication for Art DAG L2 Server. User registration, login, and JWT tokens. """ -import json import os import secrets from datetime import datetime, timezone, timedelta @@ -15,6 +14,8 @@ import bcrypt from jose import JWTError, jwt from pydantic import BaseModel +import db + # JWT settings ALGORITHM = "HS256" ACCESS_TOKEN_EXPIRE_DAYS = 30 @@ -68,25 +69,8 @@ class Token(BaseModel): expires_at: str -def get_users_path(data_dir: Path) -> Path: - """Get users file path.""" - return data_dir / "users.json" - - -def load_users(data_dir: Path) -> dict[str, dict]: - """Load users from disk.""" - path = get_users_path(data_dir) - if path.exists(): - with open(path) as f: - return json.load(f) - return {} - - -def save_users(data_dir: Path, users: dict[str, dict]): - """Save users to disk.""" - path = get_users_path(data_dir) - with open(path, "w") as f: - json.dump(users, f, indent=2) +# Keep DATA_DIR for keys (RSA keys still stored as files) +DATA_DIR = Path(os.environ.get("ARTDAG_DATA", str(Path.home() / ".artdag" / "l2"))) def hash_password(password: str) -> str: @@ -102,43 +86,53 @@ def verify_password(plain_password: str, hashed_password: str) -> bool: return bcrypt.checkpw(pw_bytes, hashed_password.encode('utf-8')) -def create_user(data_dir: Path, username: str, password: str, email: Optional[str] = None) -> User: +async def create_user(data_dir: Path, username: str, password: str, email: Optional[str] = None) -> User: """Create a new user with ActivityPub keys.""" from keys import generate_keypair - users = load_users(data_dir) - - if username in users: + if await db.user_exists(username): raise ValueError(f"Username already exists: {username}") - user = User( - username=username, - password_hash=hash_password(password), - created_at=datetime.now(timezone.utc).isoformat(), - email=email - ) - - users[username] = user.model_dump() - save_users(data_dir, users) + password_hash = hash_password(password) + user_data = await db.create_user(username, password_hash, email) # Generate ActivityPub keys for this user generate_keypair(data_dir, username) - return user + # Convert datetime to ISO string if needed + created_at = user_data.get("created_at") + if hasattr(created_at, 'isoformat'): + created_at = created_at.isoformat() + + return User( + username=username, + password_hash=password_hash, + created_at=created_at, + email=email + ) -def authenticate_user(data_dir: Path, username: str, password: str) -> Optional[User]: +async def authenticate_user(data_dir: Path, username: str, password: str) -> Optional[User]: """Authenticate a user by username and password.""" - users = load_users(data_dir) + user_data = await db.get_user(username) - if username not in users: + if not user_data: return None - user_data = users[username] if not verify_password(password, user_data["password_hash"]): return None - return User(**user_data) + # Convert datetime to ISO string if needed + created_at = user_data.get("created_at") + if hasattr(created_at, 'isoformat'): + created_at = created_at.isoformat() + + return User( + username=user_data["username"], + password_hash=user_data["password_hash"], + created_at=created_at, + email=user_data.get("email") + ) def create_access_token(username: str) -> Token: @@ -170,14 +164,24 @@ def verify_token(token: str) -> Optional[str]: return None -def get_current_user(data_dir: Path, token: str) -> Optional[User]: +async def get_current_user(data_dir: Path, token: str) -> Optional[User]: """Get current user from token.""" username = verify_token(token) if not username: return None - users = load_users(data_dir) - if username not in users: + user_data = await db.get_user(username) + if not user_data: return None - return User(**users[username]) + # Convert datetime to ISO string if needed + created_at = user_data.get("created_at") + if hasattr(created_at, 'isoformat'): + created_at = created_at.isoformat() + + return User( + username=user_data["username"], + password_hash=user_data["password_hash"], + created_at=created_at, + email=user_data.get("email") + ) diff --git a/db.py b/db.py new file mode 100644 index 0000000..6e997aa --- /dev/null +++ b/db.py @@ -0,0 +1,456 @@ +""" +Database module for Art DAG L2 Server. + +Uses asyncpg for async PostgreSQL access with connection pooling. +""" + +import json +import os +from datetime import datetime, timezone +from typing import Optional +from contextlib import asynccontextmanager +from uuid import UUID + +import asyncpg + +# Connection pool (initialized on startup) +_pool: Optional[asyncpg.Pool] = None + +# Configuration from environment +DATABASE_URL = os.environ.get( + "DATABASE_URL", + "postgresql://artdag:artdag@localhost:5432/artdag" +) + +# Schema for database initialization +SCHEMA = """ +-- Users table +CREATE TABLE IF NOT EXISTS users ( + username VARCHAR(255) PRIMARY KEY, + password_hash VARCHAR(255) NOT NULL, + email VARCHAR(255), + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); + +-- Assets table +CREATE TABLE IF NOT EXISTS assets ( + name VARCHAR(255) PRIMARY KEY, + content_hash VARCHAR(128) NOT NULL, + asset_type VARCHAR(50) NOT NULL, + tags JSONB DEFAULT '[]'::jsonb, + metadata JSONB DEFAULT '{}'::jsonb, + url TEXT, + provenance JSONB, + description TEXT, + origin JSONB, + owner VARCHAR(255) NOT NULL REFERENCES users(username), + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + updated_at TIMESTAMPTZ +); + +-- Activities table +CREATE TABLE IF NOT EXISTS activities ( + activity_id UUID PRIMARY KEY, + activity_type VARCHAR(50) NOT NULL, + actor_id TEXT NOT NULL, + object_data JSONB NOT NULL, + published TIMESTAMPTZ NOT NULL, + signature JSONB +); + +-- Followers table +CREATE TABLE IF NOT EXISTS followers ( + id SERIAL PRIMARY KEY, + username VARCHAR(255) NOT NULL REFERENCES users(username), + acct VARCHAR(255) NOT NULL, + url TEXT NOT NULL, + public_key TEXT, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + UNIQUE(username, acct) +); + +-- Indexes +CREATE INDEX IF NOT EXISTS idx_users_created_at ON users(created_at); +CREATE INDEX IF NOT EXISTS idx_assets_content_hash ON assets(content_hash); +CREATE INDEX IF NOT EXISTS idx_assets_owner ON assets(owner); +CREATE INDEX IF NOT EXISTS idx_assets_created_at ON assets(created_at DESC); +CREATE INDEX IF NOT EXISTS idx_assets_tags ON assets USING GIN(tags); +CREATE INDEX IF NOT EXISTS idx_activities_actor_id ON activities(actor_id); +CREATE INDEX IF NOT EXISTS idx_activities_published ON activities(published DESC); +CREATE INDEX IF NOT EXISTS idx_followers_username ON followers(username); +""" + + +async def init_pool(): + """Initialize the connection pool and create tables. Call on app startup.""" + global _pool + _pool = await asyncpg.create_pool( + DATABASE_URL, + min_size=2, + max_size=10, + command_timeout=60 + ) + # Create tables if they don't exist + async with _pool.acquire() as conn: + await conn.execute(SCHEMA) + + +async def close_pool(): + """Close the connection pool. Call on app shutdown.""" + global _pool + if _pool: + await _pool.close() + _pool = None + + +def get_pool() -> asyncpg.Pool: + """Get the connection pool.""" + if _pool is None: + raise RuntimeError("Database pool not initialized") + return _pool + + +@asynccontextmanager +async def get_connection(): + """Get a connection from the pool.""" + async with get_pool().acquire() as conn: + yield conn + + +# ============ Users ============ + +async def get_user(username: str) -> Optional[dict]: + """Get user by username.""" + async with get_connection() as conn: + row = await conn.fetchrow( + "SELECT username, password_hash, email, created_at FROM users WHERE username = $1", + username + ) + if row: + return dict(row) + return None + + +async def get_all_users() -> dict[str, dict]: + """Get all users as a dict indexed by username.""" + async with get_connection() as conn: + rows = await conn.fetch( + "SELECT username, password_hash, email, created_at FROM users ORDER BY username" + ) + return {row["username"]: dict(row) for row in rows} + + +async def create_user(username: str, password_hash: str, email: Optional[str] = None) -> dict: + """Create a new user.""" + async with get_connection() as conn: + row = await conn.fetchrow( + """INSERT INTO users (username, password_hash, email) + VALUES ($1, $2, $3) + RETURNING username, password_hash, email, created_at""", + username, password_hash, email + ) + return dict(row) + + +async def user_exists(username: str) -> bool: + """Check if user exists.""" + async with get_connection() as conn: + result = await conn.fetchval( + "SELECT EXISTS(SELECT 1 FROM users WHERE username = $1)", + username + ) + return result + + +# ============ Assets ============ + +async def get_asset(name: str) -> Optional[dict]: + """Get asset by name.""" + async with get_connection() as conn: + row = await conn.fetchrow( + """SELECT name, content_hash, asset_type, tags, metadata, url, + provenance, description, origin, owner, created_at, updated_at + FROM assets WHERE name = $1""", + name + ) + if row: + return _parse_asset_row(row) + return None + + +async def get_asset_by_hash(content_hash: str) -> Optional[dict]: + """Get asset by content hash.""" + async with get_connection() as conn: + row = await conn.fetchrow( + """SELECT name, content_hash, asset_type, tags, metadata, url, + provenance, description, origin, owner, created_at, updated_at + FROM assets WHERE content_hash = $1""", + content_hash + ) + if row: + return _parse_asset_row(row) + return None + + +async def get_all_assets() -> dict[str, dict]: + """Get all assets as a dict indexed by name.""" + async with get_connection() as conn: + rows = await conn.fetch( + """SELECT name, content_hash, asset_type, tags, metadata, url, + provenance, description, origin, owner, created_at, updated_at + FROM assets ORDER BY created_at DESC""" + ) + return {row["name"]: _parse_asset_row(row) for row in rows} + + +async def get_assets_paginated(limit: int = 100, offset: int = 0) -> tuple[list[tuple[str, dict]], int]: + """Get paginated assets, returns (list of (name, asset) tuples, total_count).""" + async with get_connection() as conn: + total = await conn.fetchval("SELECT COUNT(*) FROM assets") + rows = await conn.fetch( + """SELECT name, content_hash, asset_type, tags, metadata, url, + provenance, description, origin, owner, created_at, updated_at + FROM assets ORDER BY created_at DESC LIMIT $1 OFFSET $2""", + limit, offset + ) + return [(row["name"], _parse_asset_row(row)) for row in rows], total + + +async def get_assets_by_owner(owner: str) -> dict[str, dict]: + """Get all assets owned by a user.""" + async with get_connection() as conn: + rows = await conn.fetch( + """SELECT name, content_hash, asset_type, tags, metadata, url, + provenance, description, origin, owner, created_at, updated_at + FROM assets WHERE owner = $1 ORDER BY created_at DESC""", + owner + ) + return {row["name"]: _parse_asset_row(row) for row in rows} + + +async def create_asset(asset: dict) -> dict: + """Create a new asset.""" + async with get_connection() as conn: + row = await conn.fetchrow( + """INSERT INTO assets (name, content_hash, asset_type, tags, metadata, + url, provenance, description, origin, owner, created_at) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) + RETURNING *""", + asset["name"], + asset["content_hash"], + asset["asset_type"], + json.dumps(asset.get("tags", [])), + json.dumps(asset.get("metadata", {})), + asset.get("url"), + json.dumps(asset.get("provenance")) if asset.get("provenance") else None, + asset.get("description"), + json.dumps(asset.get("origin")) if asset.get("origin") else None, + asset["owner"], + asset.get("created_at") or datetime.now(timezone.utc).isoformat() + ) + return _parse_asset_row(row) + + +async def update_asset(name: str, updates: dict) -> Optional[dict]: + """Update an existing asset.""" + # Build dynamic UPDATE query + set_clauses = [] + values = [] + idx = 1 + + for key, value in updates.items(): + if key in ("tags", "metadata", "provenance", "origin"): + set_clauses.append(f"{key} = ${idx}") + values.append(json.dumps(value) if value is not None else None) + else: + set_clauses.append(f"{key} = ${idx}") + values.append(value) + idx += 1 + + set_clauses.append(f"updated_at = ${idx}") + values.append(datetime.now(timezone.utc)) + idx += 1 + + values.append(name) # WHERE clause + + async with get_connection() as conn: + row = await conn.fetchrow( + f"""UPDATE assets SET {', '.join(set_clauses)} + WHERE name = ${idx} RETURNING *""", + *values + ) + if row: + return _parse_asset_row(row) + return None + + +async def asset_exists(name: str) -> bool: + """Check if asset exists.""" + async with get_connection() as conn: + return await conn.fetchval( + "SELECT EXISTS(SELECT 1 FROM assets WHERE name = $1)", + name + ) + + +def _parse_asset_row(row) -> dict: + """Parse a database row into an asset dict, handling JSONB fields.""" + asset = dict(row) + # Convert datetime to ISO string + if asset.get("created_at"): + asset["created_at"] = asset["created_at"].isoformat() + if asset.get("updated_at"): + asset["updated_at"] = asset["updated_at"].isoformat() + return asset + + +# ============ Activities ============ + +async def get_activity(activity_id: str) -> Optional[dict]: + """Get activity by ID.""" + async with get_connection() as conn: + row = await conn.fetchrow( + """SELECT activity_id, activity_type, actor_id, object_data, published, signature + FROM activities WHERE activity_id = $1""", + UUID(activity_id) + ) + if row: + return _parse_activity_row(row) + return None + + +async def get_activity_by_index(index: int) -> Optional[dict]: + """Get activity by index (for backward compatibility with URL scheme).""" + async with get_connection() as conn: + row = await conn.fetchrow( + """SELECT activity_id, activity_type, actor_id, object_data, published, signature + FROM activities ORDER BY published ASC LIMIT 1 OFFSET $1""", + index + ) + if row: + return _parse_activity_row(row) + return None + + +async def get_all_activities() -> list[dict]: + """Get all activities ordered by published date (oldest first for index compatibility).""" + async with get_connection() as conn: + rows = await conn.fetch( + """SELECT activity_id, activity_type, actor_id, object_data, published, signature + FROM activities ORDER BY published ASC""" + ) + return [_parse_activity_row(row) for row in rows] + + +async def get_activities_paginated(limit: int = 100, offset: int = 0) -> tuple[list[dict], int]: + """Get paginated activities (newest first), returns (activities, total_count).""" + async with get_connection() as conn: + total = await conn.fetchval("SELECT COUNT(*) FROM activities") + rows = await conn.fetch( + """SELECT activity_id, activity_type, actor_id, object_data, published, signature + FROM activities ORDER BY published DESC LIMIT $1 OFFSET $2""", + limit, offset + ) + return [_parse_activity_row(row) for row in rows], total + + +async def get_activities_by_actor(actor_id: str) -> list[dict]: + """Get all activities by an actor.""" + async with get_connection() as conn: + rows = await conn.fetch( + """SELECT activity_id, activity_type, actor_id, object_data, published, signature + FROM activities WHERE actor_id = $1 ORDER BY published DESC""", + actor_id + ) + return [_parse_activity_row(row) for row in rows] + + +async def create_activity(activity: dict) -> dict: + """Create a new activity.""" + async with get_connection() as conn: + row = await conn.fetchrow( + """INSERT INTO activities (activity_id, activity_type, actor_id, object_data, published, signature) + VALUES ($1, $2, $3, $4, $5, $6) + RETURNING *""", + UUID(activity["activity_id"]), + activity["activity_type"], + activity["actor_id"], + json.dumps(activity["object_data"]), + activity["published"], + json.dumps(activity.get("signature")) if activity.get("signature") else None + ) + return _parse_activity_row(row) + + +async def count_activities() -> int: + """Get total activity count.""" + async with get_connection() as conn: + return await conn.fetchval("SELECT COUNT(*) FROM activities") + + +def _parse_activity_row(row) -> dict: + """Parse a database row into an activity dict, handling JSONB fields.""" + activity = dict(row) + # Convert UUID to string + if activity.get("activity_id"): + activity["activity_id"] = str(activity["activity_id"]) + # Convert datetime to ISO string + if activity.get("published"): + activity["published"] = activity["published"].isoformat() + return activity + + +# ============ Followers ============ + +async def get_followers(username: str) -> list[dict]: + """Get followers for a user.""" + async with get_connection() as conn: + rows = await conn.fetch( + """SELECT id, username, acct, url, public_key, created_at + FROM followers WHERE username = $1""", + username + ) + return [dict(row) for row in rows] + + +async def get_all_followers() -> list: + """Get all followers (for backward compatibility with old global list).""" + async with get_connection() as conn: + rows = await conn.fetch( + """SELECT DISTINCT url FROM followers""" + ) + return [row["url"] for row in rows] + + +async def add_follower(username: str, acct: str, url: str, public_key: Optional[str] = None) -> dict: + """Add a follower.""" + async with get_connection() as conn: + row = await conn.fetchrow( + """INSERT INTO followers (username, acct, url, public_key) + VALUES ($1, $2, $3, $4) + ON CONFLICT (username, acct) DO UPDATE SET url = $3, public_key = $4 + RETURNING *""", + username, acct, url, public_key + ) + return dict(row) + + +async def remove_follower(username: str, acct: str) -> bool: + """Remove a follower.""" + async with get_connection() as conn: + result = await conn.execute( + "DELETE FROM followers WHERE username = $1 AND acct = $2", + username, acct + ) + return result == "DELETE 1" + + +# ============ Stats ============ + +async def get_stats() -> dict: + """Get counts for dashboard.""" + async with get_connection() as conn: + assets = await conn.fetchval("SELECT COUNT(*) FROM assets") + activities = await conn.fetchval("SELECT COUNT(*) FROM activities") + users = await conn.fetchval("SELECT COUNT(*) FROM users") + return {"assets": assets, "activities": activities, "users": users} diff --git a/docker-compose.yml b/docker-compose.yml index c895e0d..0c56941 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,17 +1,38 @@ version: "3.8" services: + postgres: + image: postgres:16-alpine + environment: + POSTGRES_USER: artdag + POSTGRES_PASSWORD: ${POSTGRES_PASSWORD:-artdag} + POSTGRES_DB: artdag + volumes: + - postgres_data:/var/lib/postgresql/data + networks: + - internal + healthcheck: + test: ["CMD-SHELL", "pg_isready -U artdag"] + interval: 5s + timeout: 5s + retries: 5 + l2-server: image: git.rose-ash.com/art-dag/l2-server:latest env_file: - .env environment: - ARTDAG_DATA=/data/l2 + - DATABASE_URL=postgresql://artdag:${POSTGRES_PASSWORD:-artdag}@postgres:5432/artdag # ARTDAG_DOMAIN, ARTDAG_USER, ARTDAG_L1, JWT_SECRET from .env file volumes: - - l2_data:/data/l2 + - l2_data:/data/l2 # Still needed for RSA keys networks: + - internal - externalnet + depends_on: + postgres: + condition: service_healthy deploy: replicas: 1 restart_policy: @@ -19,7 +40,9 @@ services: volumes: l2_data: + postgres_data: networks: + internal: externalnet: external: true diff --git a/migrate.py b/migrate.py new file mode 100755 index 0000000..6dbfb1f --- /dev/null +++ b/migrate.py @@ -0,0 +1,246 @@ +#!/usr/bin/env python3 +""" +Migration script: JSON files to PostgreSQL. + +Usage: + python migrate.py [--dry-run] + +Migrates: +- users.json -> users table +- registry.json -> assets table +- activities.json -> activities table +- followers.json -> followers table + +Does NOT migrate: +- keys/ directory (stays as files) +""" + +import asyncio +import json +import os +import sys +from pathlib import Path +from datetime import datetime, timezone +from uuid import UUID + +import asyncpg + +# Configuration +DATA_DIR = Path(os.environ.get("ARTDAG_DATA", str(Path.home() / ".artdag" / "l2"))) +DATABASE_URL = os.environ.get( + "DATABASE_URL", + "postgresql://artdag:artdag@localhost:5432/artdag" +) + +SCHEMA = """ +-- Drop existing tables (careful in production!) +DROP TABLE IF EXISTS followers CASCADE; +DROP TABLE IF EXISTS activities CASCADE; +DROP TABLE IF EXISTS assets CASCADE; +DROP TABLE IF EXISTS users CASCADE; + +-- Users table +CREATE TABLE users ( + username VARCHAR(255) PRIMARY KEY, + password_hash VARCHAR(255) NOT NULL, + email VARCHAR(255), + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); + +-- Assets table +CREATE TABLE assets ( + name VARCHAR(255) PRIMARY KEY, + content_hash VARCHAR(128) NOT NULL, + asset_type VARCHAR(50) NOT NULL, + tags JSONB DEFAULT '[]'::jsonb, + metadata JSONB DEFAULT '{}'::jsonb, + url TEXT, + provenance JSONB, + description TEXT, + origin JSONB, + owner VARCHAR(255) NOT NULL REFERENCES users(username), + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + updated_at TIMESTAMPTZ +); + +-- Activities table +CREATE TABLE activities ( + activity_id UUID PRIMARY KEY, + activity_type VARCHAR(50) NOT NULL, + actor_id TEXT NOT NULL, + object_data JSONB NOT NULL, + published TIMESTAMPTZ NOT NULL, + signature JSONB +); + +-- Followers table +CREATE TABLE followers ( + id SERIAL PRIMARY KEY, + username VARCHAR(255) NOT NULL REFERENCES users(username), + acct VARCHAR(255) NOT NULL, + url TEXT NOT NULL, + public_key TEXT, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + UNIQUE(username, acct) +); + +-- Indexes +CREATE INDEX idx_users_created_at ON users(created_at); +CREATE INDEX idx_assets_content_hash ON assets(content_hash); +CREATE INDEX idx_assets_owner ON assets(owner); +CREATE INDEX idx_assets_created_at ON assets(created_at DESC); +CREATE INDEX idx_assets_tags ON assets USING GIN(tags); +CREATE INDEX idx_activities_actor_id ON activities(actor_id); +CREATE INDEX idx_activities_published ON activities(published DESC); +CREATE INDEX idx_followers_username ON followers(username); +""" + + +async def migrate(dry_run: bool = False): + """Run the migration.""" + print(f"Migrating from {DATA_DIR} to PostgreSQL") + print(f"Database: {DATABASE_URL}") + print(f"Dry run: {dry_run}") + print() + + # Load JSON files + users = load_json(DATA_DIR / "users.json") or {} + registry = load_json(DATA_DIR / "registry.json") or {"assets": {}} + activities_data = load_json(DATA_DIR / "activities.json") or {"activities": []} + followers = load_json(DATA_DIR / "followers.json") or [] + + assets = registry.get("assets", {}) + activities = activities_data.get("activities", []) + + print(f"Found {len(users)} users") + print(f"Found {len(assets)} assets") + print(f"Found {len(activities)} activities") + print(f"Found {len(followers)} followers") + print() + + if dry_run: + print("DRY RUN - no changes made") + return + + # Connect and migrate + conn = await asyncpg.connect(DATABASE_URL) + try: + # Create schema + print("Creating schema...") + await conn.execute(SCHEMA) + + # Migrate users + print("Migrating users...") + for username, user_data in users.items(): + await conn.execute( + """INSERT INTO users (username, password_hash, email, created_at) + VALUES ($1, $2, $3, $4)""", + username, + user_data["password_hash"], + user_data.get("email"), + parse_timestamp(user_data.get("created_at")) + ) + print(f" Migrated {len(users)} users") + + # Migrate assets + print("Migrating assets...") + for name, asset in assets.items(): + await conn.execute( + """INSERT INTO assets (name, content_hash, asset_type, tags, metadata, + url, provenance, description, origin, owner, + created_at, updated_at) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)""", + name, + asset["content_hash"], + asset["asset_type"], + json.dumps(asset.get("tags", [])), + json.dumps(asset.get("metadata", {})), + asset.get("url"), + json.dumps(asset.get("provenance")) if asset.get("provenance") else None, + asset.get("description"), + json.dumps(asset.get("origin")) if asset.get("origin") else None, + asset["owner"], + parse_timestamp(asset.get("created_at")), + parse_timestamp(asset.get("updated_at")) + ) + print(f" Migrated {len(assets)} assets") + + # Migrate activities + print("Migrating activities...") + for activity in activities: + await conn.execute( + """INSERT INTO activities (activity_id, activity_type, actor_id, + object_data, published, signature) + VALUES ($1, $2, $3, $4, $5, $6)""", + UUID(activity["activity_id"]), + activity["activity_type"], + activity["actor_id"], + json.dumps(activity["object_data"]), + parse_timestamp(activity["published"]), + json.dumps(activity.get("signature")) if activity.get("signature") else None + ) + print(f" Migrated {len(activities)} activities") + + # Migrate followers + print("Migrating followers...") + if followers and users: + first_user = list(users.keys())[0] + migrated = 0 + for follower in followers: + if isinstance(follower, str): + # Old format: just URL string + await conn.execute( + """INSERT INTO followers (username, acct, url) + VALUES ($1, $2, $3) + ON CONFLICT DO NOTHING""", + first_user, + follower, + follower + ) + migrated += 1 + elif isinstance(follower, dict): + await conn.execute( + """INSERT INTO followers (username, acct, url, public_key) + VALUES ($1, $2, $3, $4) + ON CONFLICT DO NOTHING""", + follower.get("username", first_user), + follower.get("acct", follower.get("url", "")), + follower["url"], + follower.get("public_key") + ) + migrated += 1 + print(f" Migrated {migrated} followers") + else: + print(" No followers to migrate") + + print() + print("Migration complete!") + + finally: + await conn.close() + + +def load_json(path: Path) -> dict | list | None: + """Load JSON file if it exists.""" + if path.exists(): + with open(path) as f: + return json.load(f) + return None + + +def parse_timestamp(ts: str | None) -> datetime | None: + """Parse ISO timestamp string to datetime.""" + if not ts: + return datetime.now(timezone.utc) + try: + # Handle various ISO formats + if ts.endswith('Z'): + ts = ts[:-1] + '+00:00' + return datetime.fromisoformat(ts) + except Exception: + return datetime.now(timezone.utc) + + +if __name__ == "__main__": + dry_run = "--dry-run" in sys.argv + asyncio.run(migrate(dry_run)) diff --git a/requirements.txt b/requirements.txt index 4cfd4d9..faa8463 100644 --- a/requirements.txt +++ b/requirements.txt @@ -6,3 +6,4 @@ bcrypt>=4.0.0 python-jose[cryptography]>=3.3.0 markdown>=3.5.0 python-multipart>=0.0.6 +asyncpg>=0.29.0 diff --git a/server.py b/server.py index f436752..4ef0a61 100644 --- a/server.py +++ b/server.py @@ -13,6 +13,7 @@ import hashlib import json import os import uuid +from contextlib import asynccontextmanager from datetime import datetime, timezone from pathlib import Path from typing import Optional @@ -25,10 +26,11 @@ from pydantic import BaseModel import requests import markdown +import db from auth import ( UserCreate, UserLogin, Token, User, create_user, authenticate_user, create_access_token, - verify_token, get_current_user, load_users + verify_token, get_current_user ) # Configuration @@ -47,10 +49,20 @@ README_CONTENT = "" if README_PATH.exists(): README_CONTENT = README_PATH.read_text() + +@asynccontextmanager +async def lifespan(app: FastAPI): + """Manage database connection pool lifecycle.""" + await db.init_pool() + yield + await db.close_pool() + + app = FastAPI( title="Art DAG L2 Server", description="ActivityPub server for Art DAG ownership and federation", - version="0.1.0" + version="0.1.0", + lifespan=lifespan ) @@ -115,39 +127,17 @@ class UpdateAssetRequest(BaseModel): origin: Optional[dict] = None -# ============ Storage ============ +# ============ Storage (Database) ============ -def load_registry() -> dict: - """Load registry from disk.""" - path = DATA_DIR / "registry.json" - if path.exists(): - with open(path) as f: - return json.load(f) - return {"version": "1.0", "assets": {}} +async def load_registry() -> dict: + """Load registry from database.""" + assets = await db.get_all_assets() + return {"version": "1.0", "assets": assets} -def save_registry(registry: dict): - """Save registry to disk.""" - path = DATA_DIR / "registry.json" - with open(path, "w") as f: - json.dump(registry, f, indent=2) - - -def load_activities() -> list: - """Load activities from disk.""" - path = DATA_DIR / "activities.json" - if path.exists(): - with open(path) as f: - data = json.load(f) - return data.get("activities", []) - return [] - - -def save_activities(activities: list): - """Save activities to disk.""" - path = DATA_DIR / "activities.json" - with open(path, "w") as f: - json.dump({"version": "1.0", "activities": activities}, f, indent=2) +async def load_activities() -> list: + """Load activities from database.""" + return await db.get_all_activities() def load_actor(username: str) -> dict: @@ -175,26 +165,14 @@ def load_actor(username: str) -> dict: return actor -def user_exists(username: str) -> bool: +async def user_exists(username: str) -> bool: """Check if a user exists.""" - users = load_users(DATA_DIR) - return username in users + return await db.user_exists(username) -def load_followers() -> list: - """Load followers list.""" - path = DATA_DIR / "followers.json" - if path.exists(): - with open(path) as f: - return json.load(f) - return [] - - -def save_followers(followers: list): - """Save followers list.""" - path = DATA_DIR / "followers.json" - with open(path, "w") as f: - json.dump(followers, f, indent=2) +async def load_followers() -> list: + """Load followers list from database.""" + return await db.get_all_followers() # ============ Signing ============ @@ -300,48 +278,7 @@ def wants_html(request: Request) -> bool: return "text/html" in accept and "application/json" not in accept and "application/activity+json" not in accept -# ============ UI Endpoints ============ - -@app.get("/ui", response_class=HTMLResponse) -async def ui_home(request: Request): - """Home page with README and stats.""" - username = get_user_from_cookie(request) - registry = load_registry() - activities = load_activities() - users = load_users(DATA_DIR) - - readme_html = markdown.markdown(README_CONTENT, extensions=['tables', 'fenced_code']) - - content = f''' -
-
-
{len(registry.get("assets", {}))}
-
Assets
-
-
-
{len(activities)}
-
Activities
-
-
-
{len(users)}
-
Users
-
-
-
- {readme_html} -
- ''' - return HTMLResponse(base_html("Home", content, username)) - +# ============ Auth UI Endpoints ============ @app.get("/login", response_class=HTMLResponse) async def ui_login_page(request: Request): @@ -388,7 +325,7 @@ async def ui_login_submit(request: Request): if not username or not password: return HTMLResponse('
Username and password are required
') - user = authenticate_user(DATA_DIR, username, password) + user = await authenticate_user(DATA_DIR, username, password) if not user: return HTMLResponse('
Invalid username or password
') @@ -472,7 +409,7 @@ async def ui_register_submit(request: Request): return HTMLResponse('
Password must be at least 6 characters
') try: - user = create_user(DATA_DIR, username, password, email) + user = await create_user(DATA_DIR, username, password, email) except ValueError as e: return HTMLResponse(f'
{str(e)}
') @@ -500,121 +437,12 @@ async def logout(): return response -@app.get("/ui/assets", response_class=HTMLResponse) -async def ui_registry_page(request: Request): - """Registry page showing all assets.""" - username = get_user_from_cookie(request) - registry = load_registry() - assets = registry.get("assets", {}) +# ============ HTML Rendering Helpers ============ - if not assets: - content = ''' -

Registry

-

No assets registered yet.

- ''' - else: - rows = "" - for name, asset in sorted(assets.items(), key=lambda x: x[1].get("created_at", ""), reverse=True): - hash_short = asset.get("content_hash", "")[:16] + "..." - owner = asset.get("owner", "unknown") - asset_type = asset.get("asset_type", "") - type_color = "bg-blue-600" if asset_type == "image" else "bg-purple-600" if asset_type == "video" else "bg-gray-600" - rows += f''' - - - {name} - - {asset_type} - - {owner} - - {hash_short} - {", ".join(asset.get("tags", []))} - - ''' - content = f''' -

Registry ({len(assets)} assets)

-
- - - - - - - - - - - - {rows} - -
NameTypeOwnerContent HashTags
-
- ''' - return HTMLResponse(base_html("Registry", content, username)) - - -@app.get("/ui/activities", response_class=HTMLResponse) -async def ui_activities_page(request: Request): - """Activities page showing all signed activities.""" - username = get_user_from_cookie(request) - activities = load_activities() - - if not activities: - content = ''' -

Activities

-

No activities yet.

- ''' - else: - rows = "" - for i, activity in enumerate(reversed(activities)): - # Index from end since we reversed - activity_index = len(activities) - 1 - i - obj = activity.get("object_data", {}) - activity_type = activity.get("activity_type", "") - type_color = "bg-green-600" if activity_type == "Create" else "bg-yellow-600" if activity_type == "Update" else "bg-gray-600" - actor_id = activity.get("actor_id", "") - actor_name = actor_id.split("/")[-1] if actor_id else "unknown" - rows += f''' - - {activity_type} - {obj.get("name", "Untitled")} - - {actor_name} - - {activity.get("published", "")[:10]} - - View - - - ''' - content = f''' -

Activities ({len(activities)} total)

-
- - - - - - - - - - - - {rows} - -
TypeObjectActorPublished
-
- ''' - return HTMLResponse(base_html("Activities", content, username)) - - -@app.get("/ui/activity/{activity_index}", response_class=HTMLResponse) async def ui_activity_detail(activity_index: int, request: Request): - """Activity detail page with full content display.""" + """Activity detail page with full content display. Helper function for HTML rendering.""" username = get_user_from_cookie(request) - activities = load_activities() + activities = await load_activities() if activity_index < 0 or activity_index >= len(activities): content = ''' @@ -646,7 +474,7 @@ async def ui_activity_detail(activity_index: int, request: Request): # Fallback: if activity doesn't have provenance, look up the asset from registry if not provenance or not origin: - registry = load_registry() + registry = await load_registry() assets = registry.get("assets", {}) # Find asset by content_hash or name for asset_name, asset_data in assets.items(): @@ -869,54 +697,10 @@ async def ui_activity_detail(activity_index: int, request: Request): return HTMLResponse(base_html(f"Activity: {obj_name}", content, username)) -@app.get("/ui/users", response_class=HTMLResponse) -async def ui_users_page(request: Request): - """Users page showing all registered users.""" - current_user = get_user_from_cookie(request) - users = load_users(DATA_DIR) - - if not users: - content = ''' -

Users

-

No users registered yet.

- ''' - else: - rows = "" - for uname, user_data in sorted(users.items()): - webfinger = f"@{uname}@{DOMAIN}" - rows += f''' - - {uname} - {webfinger} - {user_data.get("created_at", "")[:10]} - - ''' - content = f''' -

Users ({len(users)} registered)

-

Each user has their own ActivityPub actor that can be followed from Mastodon and other federated platforms.

-
- - - - - - - - - - {rows} - -
UsernameActivityPub HandleRegistered
-
- ''' - return HTMLResponse(base_html("Users", content, current_user)) - - -@app.get("/ui/asset/{name}", response_class=HTMLResponse) async def ui_asset_detail(name: str, request: Request): - """Asset detail page with content preview and provenance.""" + """Asset detail page with content preview and provenance. Helper function for HTML rendering.""" username = get_user_from_cookie(request) - registry = load_registry() + registry = await load_registry() assets = registry.get("assets", {}) if name not in assets: @@ -1159,12 +943,11 @@ async def ui_asset_detail(name: str, request: Request): return HTMLResponse(base_html(f"Asset: {name}", content, username)) -@app.get("/ui/user/{username}", response_class=HTMLResponse) async def ui_user_detail(username: str, request: Request): - """User detail page showing their published assets.""" + """User detail page showing their published assets. Helper function for HTML rendering.""" current_user = get_user_from_cookie(request) - if not user_exists(username): + if not await user_exists(username): content = f'''

User Not Found

No user named "{username}" exists.

@@ -1173,12 +956,12 @@ async def ui_user_detail(username: str, request: Request): return HTMLResponse(base_html("User Not Found", content, current_user)) # Get user's assets - registry = load_registry() + registry = await load_registry() all_assets = registry.get("assets", {}) user_assets = {name: asset for name, asset in all_assets.items() if asset.get("owner") == username} # Get user's activities - all_activities = load_activities() + all_activities = await load_activities() actor_id = f"https://{DOMAIN}/users/{username}" user_activities = [a for a in all_activities if a.get("actor_id") == actor_id] @@ -1194,7 +977,7 @@ async def ui_user_detail(username: str, request: Request): rows += f''' - {name} + {name} {asset_type} {hash_short} @@ -1261,9 +1044,9 @@ async def ui_user_detail(username: str, request: Request): @app.get("/") async def root(request: Request): """Server info. HTML shows home page with counts, JSON returns stats.""" - registry = load_registry() - activities = load_activities() - users = load_users(DATA_DIR) + registry = await load_registry() + activities = await load_activities() + users = await db.get_all_users() assets_count = len(registry.get("assets", {})) activities_count = len(activities) @@ -1271,21 +1054,34 @@ async def root(request: Request): if wants_html(request): username = get_user_from_cookie(request) + readme_html = markdown.markdown(README_CONTENT, extensions=['tables', 'fenced_code']) content = f''' -
- -
{assets_count}
-
Assets
+
+
+ {readme_html} +
''' return HTMLResponse(base_html("Home", content, username)) @@ -1310,7 +1106,7 @@ async def get_optional_user( """Get current user if authenticated, None otherwise.""" if not credentials: return None - return get_current_user(DATA_DIR, credentials.credentials) + return await get_current_user(DATA_DIR, credentials.credentials) async def get_required_user( @@ -1319,7 +1115,7 @@ async def get_required_user( """Get current user, raise 401 if not authenticated.""" if not credentials: raise HTTPException(401, "Not authenticated") - user = get_current_user(DATA_DIR, credentials.credentials) + user = await get_current_user(DATA_DIR, credentials.credentials) if not user: raise HTTPException(401, "Invalid token") return user @@ -1329,7 +1125,7 @@ async def get_required_user( async def register(req: UserCreate): """Register a new user.""" try: - user = create_user(DATA_DIR, req.username, req.password, req.email) + user = await create_user(DATA_DIR, req.username, req.password, req.email) except ValueError as e: raise HTTPException(400, str(e)) @@ -1339,7 +1135,7 @@ async def register(req: UserCreate): @app.post("/auth/login", response_model=Token) async def login(req: UserLogin): """Login and get access token.""" - user = authenticate_user(DATA_DIR, req.username, req.password) + user = await authenticate_user(DATA_DIR, req.username, req.password) if not user: raise HTTPException(401, "Invalid username or password") @@ -1385,7 +1181,7 @@ async def webfinger(resource: str): if domain != DOMAIN: raise HTTPException(404, f"Unknown domain: {domain}") - if not user_exists(username): + if not await user_exists(username): raise HTTPException(404, f"Unknown user: {username}") return JSONResponse( @@ -1406,7 +1202,7 @@ async def webfinger(resource: str): @app.get("/users") async def get_users_list(request: Request, page: int = 1, limit: int = 20): """Get all users. HTML for browsers (with infinite scroll), JSON for APIs (with pagination).""" - all_users = list(load_users(DATA_DIR).items()) + all_users = list((await db.get_all_users()).items()) total = len(all_users) # Sort by username @@ -1498,7 +1294,7 @@ async def get_users_list(request: Request, page: int = 1, limit: int = 20): @app.get("/users/{username}") async def get_actor(username: str, request: Request): """Get actor profile for any registered user. Content negotiation: HTML for browsers, JSON for APIs.""" - if not user_exists(username): + if not await user_exists(username): if wants_html(request): content = f'''

User Not Found

@@ -1529,11 +1325,11 @@ async def get_actor(username: str, request: Request): @app.get("/users/{username}/outbox") async def get_outbox(username: str, page: bool = False): """Get actor's outbox (activities they created).""" - if not user_exists(username): + if not await user_exists(username): raise HTTPException(404, f"Unknown user: {username}") # Filter activities by this user's actor_id - all_activities = load_activities() + all_activities = await load_activities() actor_id = f"https://{DOMAIN}/users/{username}" user_activities = [a for a in all_activities if a.get("actor_id") == actor_id] @@ -1565,7 +1361,7 @@ async def get_outbox(username: str, page: bool = False): @app.post("/users/{username}/inbox") async def post_inbox(username: str, request: Request): """Receive activities from other servers.""" - if not user_exists(username): + if not await user_exists(username): raise HTTPException(404, f"Unknown user: {username}") body = await request.json() @@ -1573,12 +1369,9 @@ async def post_inbox(username: str, request: Request): # Handle Follow requests if activity_type == "Follow": - follower = body.get("actor") - # TODO: Per-user followers - for now use global followers - followers = load_followers() - if follower not in followers: - followers.append(follower) - save_followers(followers) + follower_url = body.get("actor") + # Add follower to database + await db.add_follower(username, follower_url, follower_url) # Send Accept (in production, do this async) # For now just acknowledge @@ -1591,11 +1384,11 @@ async def post_inbox(username: str, request: Request): @app.get("/users/{username}/followers") async def get_followers(username: str): """Get actor's followers.""" - if not user_exists(username): + if not await user_exists(username): raise HTTPException(404, f"Unknown user: {username}") # TODO: Per-user followers - for now use global followers - followers = load_followers() + followers = await load_followers() return JSONResponse( content={ @@ -1614,7 +1407,7 @@ async def get_followers(username: str): @app.get("/assets") async def get_registry(request: Request, page: int = 1, limit: int = 20): """Get registry. HTML for browsers (with infinite scroll), JSON for APIs (with pagination).""" - registry = load_registry() + registry = await load_registry() all_assets = list(registry.get("assets", {}).items()) total = len(all_assets) @@ -1654,7 +1447,7 @@ async def get_registry(request: Request, page: int = 1, limit: int = 20): {content_hash} - View + View ''' @@ -1714,9 +1507,15 @@ async def get_registry(request: Request, page: int = 1, limit: int = 20): @app.get("/asset/{name}") -async def get_asset_by_name(name: str, request: Request): +async def get_asset_by_name_legacy(name: str): + """Legacy route - redirect to /assets/{name}.""" + return RedirectResponse(url=f"/assets/{name}", status_code=301) + + +@app.get("/assets/{name}") +async def get_asset(name: str, request: Request): """Get asset by name. HTML for browsers, JSON for APIs.""" - registry = load_registry() + registry = await load_registry() if name not in registry.get("assets", {}): if wants_html(request): content = f''' @@ -1733,43 +1532,30 @@ async def get_asset_by_name(name: str, request: Request): return registry["assets"][name] -@app.get("/assets/{name}") -async def get_asset(name: str): - """Get a specific asset (API only, use /asset/{name} for content negotiation).""" - registry = load_registry() - if name not in registry.get("assets", {}): - raise HTTPException(404, f"Asset not found: {name}") - return registry["assets"][name] - - @app.patch("/assets/{name}") async def update_asset(name: str, req: UpdateAssetRequest, user: User = Depends(get_required_user)): """Update an existing asset's metadata. Creates an Update activity.""" - registry = load_registry() - if name not in registry.get("assets", {}): + asset = await db.get_asset(name) + if not asset: raise HTTPException(404, f"Asset not found: {name}") - asset = registry["assets"][name] - # Check ownership if asset.get("owner") != user.username: raise HTTPException(403, f"Not authorized to update asset owned by {asset.get('owner')}") - # Update fields that were provided + # Build updates dict + updates = {} if req.description is not None: - asset["description"] = req.description + updates["description"] = req.description if req.tags is not None: - asset["tags"] = req.tags + updates["tags"] = req.tags if req.metadata is not None: - asset["metadata"] = {**asset.get("metadata", {}), **req.metadata} + updates["metadata"] = {**asset.get("metadata", {}), **req.metadata} if req.origin is not None: - asset["origin"] = req.origin + updates["origin"] = req.origin - asset["updated_at"] = datetime.now(timezone.utc).isoformat() - - # Save registry - registry["assets"][name] = asset - save_registry(registry) + # Update asset in database + updated_asset = await db.update_asset(name, updates) # Create Update activity activity = { @@ -1777,37 +1563,33 @@ async def update_asset(name: str, req: UpdateAssetRequest, user: User = Depends( "activity_type": "Update", "actor_id": f"https://{DOMAIN}/users/{user.username}", "object_data": { - "type": asset.get("asset_type", "Object").capitalize(), + "type": updated_asset.get("asset_type", "Object").capitalize(), "name": name, - "id": f"https://{DOMAIN}/objects/{asset['content_hash']}", + "id": f"https://{DOMAIN}/objects/{updated_asset['content_hash']}", "contentHash": { "algorithm": "sha3-256", - "value": asset["content_hash"] + "value": updated_asset["content_hash"] }, "attributedTo": f"https://{DOMAIN}/users/{user.username}", "summary": req.description, - "tag": req.tags or asset.get("tags", []) + "tag": req.tags or updated_asset.get("tags", []) }, - "published": asset["updated_at"] + "published": updated_asset.get("updated_at", datetime.now(timezone.utc).isoformat()) } # Sign activity with the user's keys activity = sign_activity(activity, user.username) - # Save activity - activities = load_activities() - activities.append(activity) - save_activities(activities) + # Save activity to database + await db.create_activity(activity) - return {"asset": asset, "activity": activity} + return {"asset": updated_asset, "activity": activity} -def _register_asset_impl(req: RegisterRequest, owner: str): +async def _register_asset_impl(req: RegisterRequest, owner: str): """Internal implementation for registering an asset.""" - registry = load_registry() - # Check if name exists - if req.name in registry.get("assets", {}): + if await db.asset_exists(req.name): raise HTTPException(400, f"Asset already exists: {req.name}") # Create asset @@ -1824,11 +1606,8 @@ def _register_asset_impl(req: RegisterRequest, owner: str): "created_at": now } - # Add to registry - if "assets" not in registry: - registry["assets"] = {} - registry["assets"][req.name] = asset - save_registry(registry) + # Save asset to database + created_asset = await db.create_asset(asset) # Create ownership activity object_data = { @@ -1857,18 +1636,16 @@ def _register_asset_impl(req: RegisterRequest, owner: str): # Sign activity with the owner's keys activity = sign_activity(activity, owner) - # Save activity - activities = load_activities() - activities.append(activity) - save_activities(activities) + # Save activity to database + await db.create_activity(activity) - return {"asset": asset, "activity": activity} + return {"asset": created_asset, "activity": activity} @app.post("/assets") async def register_asset(req: RegisterRequest, user: User = Depends(get_required_user)): """Register a new asset and create ownership activity. Requires authentication.""" - return _register_asset_impl(req, user.username) + return await _register_asset_impl(req, user.username) @app.post("/assets/record-run") @@ -1903,7 +1680,7 @@ async def record_run(req: RecordRunRequest, user: User = Depends(get_required_us } # Register the output under the authenticated user - return _register_asset_impl(RegisterRequest( + return await _register_asset_impl(RegisterRequest( name=req.output_name, content_hash=output_hash, asset_type="video", # Could be smarter about this @@ -1933,8 +1710,7 @@ async def publish_cache(req: PublishCacheRequest, user: User = Depends(get_requi raise HTTPException(400, "External origin requires a URL") # Check if asset name already exists - registry = load_registry() - if req.asset_name in registry.get("assets", {}): + if await db.asset_exists(req.asset_name): raise HTTPException(400, f"Asset name already exists: {req.asset_name}") # Create asset @@ -1951,11 +1727,8 @@ async def publish_cache(req: PublishCacheRequest, user: User = Depends(get_requi "created_at": now } - # Add to registry - if "assets" not in registry: - registry["assets"] = {} - registry["assets"][req.asset_name] = asset - save_registry(registry) + # Save asset to database + created_asset = await db.create_asset(asset) # Create ownership activity with origin info object_data = { @@ -1998,12 +1771,10 @@ async def publish_cache(req: PublishCacheRequest, user: User = Depends(get_requi # Sign activity with the user's keys activity = sign_activity(activity, user.username) - # Save activity - activities = load_activities() - activities.append(activity) - save_activities(activities) + # Save activity to database + await db.create_activity(activity) - return {"asset": asset, "activity": activity} + return {"asset": created_asset, "activity": activity} # ============ Activities Endpoints ============ @@ -2011,7 +1782,7 @@ async def publish_cache(req: PublishCacheRequest, user: User = Depends(get_requi @app.get("/activities") async def get_activities(request: Request, page: int = 1, limit: int = 20): """Get activities. HTML for browsers (with infinite scroll), JSON for APIs (with pagination).""" - all_activities = load_activities() + all_activities = await load_activities() total = len(all_activities) # Reverse for newest first @@ -2052,7 +1823,7 @@ async def get_activities(request: Request, page: int = 1, limit: int = 20): {activity.get("published", "")[:10]} - View + View ''' @@ -2111,10 +1882,10 @@ async def get_activities(request: Request, page: int = 1, limit: int = 20): } -@app.get("/activity/{activity_index}") -async def get_activity(activity_index: int, request: Request): +@app.get("/activities/{activity_index}") +async def get_activity_detail(activity_index: int, request: Request): """Get single activity. HTML for browsers, JSON for APIs.""" - activities = load_activities() + activities = await load_activities() if activity_index < 0 or activity_index >= len(activities): if wants_html(request): @@ -2135,10 +1906,16 @@ async def get_activity(activity_index: int, request: Request): return activity +@app.get("/activity/{activity_index}") +async def get_activity_legacy(activity_index: int): + """Legacy route - redirect to /activities/{activity_index}.""" + return RedirectResponse(url=f"/activities/{activity_index}", status_code=301) + + @app.get("/objects/{content_hash}") async def get_object(content_hash: str, request: Request): """Get object by content hash. Content negotiation: HTML for browsers, JSON for APIs.""" - registry = load_registry() + registry = await load_registry() # Find asset by hash for name, asset in registry.get("assets", {}).items(): @@ -2149,7 +1926,7 @@ async def get_object(content_hash: str, request: Request): if wants_html: # Redirect to detail page for browsers - return RedirectResponse(url=f"/asset/{name}", status_code=303) + return RedirectResponse(url=f"/assets/{name}", status_code=303) owner = asset.get("owner", "unknown") return JSONResponse(