#!/usr/bin/env python3
"""Migration script: JSON files to PostgreSQL.

Usage:
    python migrate.py [--dry-run]

Migrates:
    - users.json -> users table
    - registry.json -> assets table
    - activities.json -> activities table
    - followers.json -> followers table

Does NOT migrate:
    - keys/ directory (stays as files)
"""
import asyncio
import json
import os
import sys
from pathlib import Path
from datetime import datetime, timezone
from uuid import UUID

import asyncpg

# Configuration (both overridable via environment variables)
DATA_DIR = Path(os.environ.get("ARTDAG_DATA", str(Path.home() / ".artdag" / "l2")))
DATABASE_URL = os.environ.get(
    "DATABASE_URL", "postgresql://artdag:artdag@localhost:5432/artdag"
)

SCHEMA = """
-- Drop existing tables (careful in production!)
DROP TABLE IF EXISTS followers CASCADE;
DROP TABLE IF EXISTS activities CASCADE;
DROP TABLE IF EXISTS assets CASCADE;
DROP TABLE IF EXISTS users CASCADE;

-- Users table
CREATE TABLE users (
    username VARCHAR(255) PRIMARY KEY,
    password_hash VARCHAR(255) NOT NULL,
    email VARCHAR(255),
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

-- Assets table
CREATE TABLE assets (
    name VARCHAR(255) PRIMARY KEY,
    content_hash VARCHAR(128) NOT NULL,
    asset_type VARCHAR(50) NOT NULL,
    tags JSONB DEFAULT '[]'::jsonb,
    metadata JSONB DEFAULT '{}'::jsonb,
    url TEXT,
    provenance JSONB,
    description TEXT,
    origin JSONB,
    owner VARCHAR(255) NOT NULL REFERENCES users(username),
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    updated_at TIMESTAMPTZ
);

-- Activities table
CREATE TABLE activities (
    activity_id UUID PRIMARY KEY,
    activity_type VARCHAR(50) NOT NULL,
    actor_id TEXT NOT NULL,
    object_data JSONB NOT NULL,
    published TIMESTAMPTZ NOT NULL,
    signature JSONB
);

-- Followers table
CREATE TABLE followers (
    id SERIAL PRIMARY KEY,
    username VARCHAR(255) NOT NULL REFERENCES users(username),
    acct VARCHAR(255) NOT NULL,
    url TEXT NOT NULL,
    public_key TEXT,
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    UNIQUE(username, acct)
);

-- Indexes
CREATE INDEX idx_users_created_at ON users(created_at);
CREATE INDEX idx_assets_content_hash ON assets(content_hash);
CREATE INDEX idx_assets_owner ON assets(owner);
CREATE INDEX idx_assets_created_at ON assets(created_at DESC);
CREATE INDEX idx_assets_tags ON assets USING GIN(tags);
CREATE INDEX idx_activities_actor_id ON activities(actor_id);
CREATE INDEX idx_activities_published ON activities(published DESC);
CREATE INDEX idx_followers_username ON followers(username);
"""


async def migrate(dry_run: bool = False) -> None:
    """Run the migration from JSON files in DATA_DIR into PostgreSQL.

    Loads the four JSON sources, prints a summary, and (unless *dry_run*
    is True) recreates the schema and inserts all rows. All writes run in
    a single transaction so a failure leaves the database untouched
    rather than half-migrated with its old tables already dropped.

    Args:
        dry_run: When True, only report what would be migrated.
    """
    print(f"Migrating from {DATA_DIR} to PostgreSQL")
    print(f"Database: {DATABASE_URL}")
    print(f"Dry run: {dry_run}")
    print()

    # Load JSON files (each may be absent; fall back to an empty shape)
    users = load_json(DATA_DIR / "users.json") or {}
    registry = load_json(DATA_DIR / "registry.json") or {"assets": {}}
    activities_data = load_json(DATA_DIR / "activities.json") or {"activities": []}
    followers = load_json(DATA_DIR / "followers.json") or []

    assets = registry.get("assets", {})
    activities = activities_data.get("activities", [])

    print(f"Found {len(users)} users")
    print(f"Found {len(assets)} assets")
    print(f"Found {len(activities)} activities")
    print(f"Found {len(followers)} followers")
    print()

    if dry_run:
        print("DRY RUN - no changes made")
        return

    # Connect and migrate
    conn = await asyncpg.connect(DATABASE_URL)
    try:
        # One transaction: schema + data commit atomically (PostgreSQL
        # DDL is transactional), so a mid-migration error rolls back
        # everything instead of leaving dropped/partial tables.
        async with conn.transaction():
            print("Creating schema...")
            await conn.execute(SCHEMA)

            print("Migrating users...")
            await _migrate_users(conn, users)
            print(f"  Migrated {len(users)} users")

            print("Migrating assets...")
            await _migrate_assets(conn, assets)
            print(f"  Migrated {len(assets)} assets")

            print("Migrating activities...")
            await _migrate_activities(conn, activities)
            print(f"  Migrated {len(activities)} activities")

            print("Migrating followers...")
            if followers and users:
                migrated = await _migrate_followers(conn, followers, users)
                print(f"  Migrated {migrated} followers")
            else:
                print("  No followers to migrate")

        print()
        print("Migration complete!")
    finally:
        await conn.close()


async def _migrate_users(conn, users: dict) -> None:
    """Insert every user record from the users.json mapping."""
    for username, user_data in users.items():
        await conn.execute(
            """INSERT INTO users (username, password_hash, email, created_at)
               VALUES ($1, $2, $3, $4)""",
            username,
            user_data["password_hash"],
            user_data.get("email"),
            parse_timestamp(user_data.get("created_at")),
        )


async def _migrate_assets(conn, assets: dict) -> None:
    """Insert every asset from the registry.json 'assets' mapping.

    JSON-valued columns (tags, metadata, provenance, origin) are passed as
    serialized strings; optional JSON columns stay NULL when absent.
    """
    for name, asset in assets.items():
        await conn.execute(
            """INSERT INTO assets (name, content_hash, asset_type, tags,
                   metadata, url, provenance, description, origin, owner,
                   created_at, updated_at)
               VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)""",
            name,
            asset["content_hash"],
            asset["asset_type"],
            json.dumps(asset.get("tags", [])),
            json.dumps(asset.get("metadata", {})),
            asset.get("url"),
            json.dumps(asset.get("provenance")) if asset.get("provenance") else None,
            asset.get("description"),
            json.dumps(asset.get("origin")) if asset.get("origin") else None,
            asset["owner"],
            parse_timestamp(asset.get("created_at")),
            parse_timestamp(asset.get("updated_at")),
        )


async def _migrate_activities(conn, activities: list) -> None:
    """Insert every activity from the activities.json 'activities' list."""
    for activity in activities:
        await conn.execute(
            """INSERT INTO activities (activity_id, activity_type, actor_id,
                   object_data, published, signature)
               VALUES ($1, $2, $3, $4, $5, $6)""",
            UUID(activity["activity_id"]),
            activity["activity_type"],
            activity["actor_id"],
            json.dumps(activity["object_data"]),
            parse_timestamp(activity["published"]),
            json.dumps(activity.get("signature")) if activity.get("signature") else None,
        )


async def _migrate_followers(conn, followers: list, users: dict) -> int:
    """Insert follower rows, handling both the old and new JSON formats.

    Old format entries are bare URL strings and are attributed to the
    first user (legacy single-user installs). Dict entries carry their
    own fields. Returns the number of rows migrated.
    """
    # Legacy string entries predate multi-user support; attach them to
    # the first (historically only) user.
    first_user = next(iter(users))
    migrated = 0
    for follower in followers:
        if isinstance(follower, str):
            # Old format: just URL string
            await conn.execute(
                """INSERT INTO followers (username, acct, url)
                   VALUES ($1, $2, $3) ON CONFLICT DO NOTHING""",
                first_user,
                follower,
                follower,
            )
            migrated += 1
        elif isinstance(follower, dict):
            await conn.execute(
                """INSERT INTO followers (username, acct, url, public_key)
                   VALUES ($1, $2, $3, $4) ON CONFLICT DO NOTHING""",
                follower.get("username", first_user),
                follower.get("acct", follower.get("url", "")),
                follower["url"],
                follower.get("public_key"),
            )
            migrated += 1
    return migrated


def load_json(path: Path) -> dict | list | None:
    """Load a JSON file if it exists; return None when it does not."""
    if not path.exists():
        return None
    with open(path, encoding="utf-8") as f:
        return json.load(f)


def parse_timestamp(ts: str | None) -> datetime:
    """Parse an ISO-8601 timestamp into a timezone-aware datetime.

    Best-effort by design: a missing or unparseable timestamp falls back
    to the current UTC time rather than aborting the migration. Naive
    parses are assumed UTC so every value handed to asyncpg for the
    TIMESTAMPTZ columns is aware (mixing naive and aware breaks
    comparisons downstream).
    """
    if not ts:
        return datetime.now(timezone.utc)
    try:
        # fromisoformat() on older Pythons rejects a trailing 'Z'
        # (Zulu/UTC designator); normalize it to an explicit offset.
        if ts.endswith("Z"):
            ts = ts[:-1] + "+00:00"
        parsed = datetime.fromisoformat(ts)
    except (AttributeError, TypeError, ValueError):
        # Non-string or malformed input: fall back, don't abort.
        return datetime.now(timezone.utc)
    if parsed.tzinfo is None:
        # No offset in the source string — assume UTC.
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed


if __name__ == "__main__":
    dry_run = "--dry-run" in sys.argv
    asyncio.run(migrate(dry_run))