Files
activity-pub/auth.py
gilesb a6e83c72bd Migrate to PostgreSQL database, consolidate routes, improve home page
- Add PostgreSQL with asyncpg for persistent storage
- Create db.py module with async database operations
- Create migrate.py script to migrate JSON data to PostgreSQL
- Update docker-compose.yml with PostgreSQL service
- Home page now shows README with styled headings
- Remove /ui prefix routes, use content negotiation on main routes
- Add /activities/{idx} as canonical route (with /activity redirect)
- Update /assets/{name} to support HTML and JSON responses
- Convert auth.py to use async database operations
- RSA keys still stored as files in $ARTDAG_DATA/keys/

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-08 00:22:21 +00:00

188 lines
4.9 KiB
Python

"""
Authentication for Art DAG L2 Server.
User registration, login, and JWT tokens.
"""
import os
import secrets
from datetime import datetime, timezone, timedelta
from pathlib import Path
from typing import Optional
import bcrypt
from jose import JWTError, jwt
from pydantic import BaseModel
import db
# JWT settings
# Signing algorithm handed to jwt.encode / jwt.decode (HMAC with SHA-256).
ALGORITHM = "HS256"
# Lifetime of an issued access token, in days (see create_access_token).
ACCESS_TOKEN_EXPIRE_DAYS = 30
def load_jwt_secret() -> str:
    """Load the JWT signing secret from Docker secret, env var, or generate.

    Sources are tried in this order:

    1. The Docker secret file ``/run/secrets/jwt_secret``; surrounding
       whitespace is stripped.
    2. The ``JWT_SECRET`` environment variable.
    3. A freshly generated random hex secret. A warning is printed because
       tokens signed with it stop validating once the process restarts.

    A secret file that exists but is empty (or whitespace only) is treated
    as not configured, so tokens are never signed with an empty key.

    Returns:
        The secret string used to sign and verify tokens.
    """
    # Try Docker secret first
    secret_path = Path("/run/secrets/jwt_secret")
    if secret_path.exists():
        file_secret = secret_path.read_text().strip()
        # An empty file must not become the signing key; fall through instead.
        if file_secret:
            return file_secret

    # Try environment variable
    env_secret = os.environ.get("JWT_SECRET")
    if env_secret:
        return env_secret

    # Generate one (tokens won't persist across restarts!)
    print("WARNING: No JWT_SECRET configured. Tokens will be invalidated on restart.")
    return secrets.token_hex(32)
# Resolved once at import time; create_access_token signs with it and
# verify_token checks signatures against it.
SECRET_KEY = load_jwt_secret()
class User(BaseModel):
    """A registered user.

    Returned by create_user, authenticate_user and get_current_user.
    """
    # Login name; create_access_token stores it in the JWT "sub" claim.
    username: str
    # Password hash that verify_password checks with bcrypt.
    password_hash: str
    # Creation time as a string; datetime values coming from the database
    # are converted with isoformat() before the model is built.
    created_at: str
    # Optional e-mail address.
    email: Optional[str] = None
class UserCreate(BaseModel):
    """Request to register a user."""
    # Desired login name.
    username: str
    # Plain-text password; presumably hashed via hash_password before
    # storage (the caller is outside this file -- confirm).
    password: str
    # Optional e-mail address.
    email: Optional[str] = None
class UserLogin(BaseModel):
    """Request to login."""
    # Login name of the account.
    username: str
    # Plain-text password to check against the stored hash.
    password: str
class Token(BaseModel):
    """JWT token response, as built by create_access_token."""
    # The encoded, signed JWT.
    access_token: str
    # Token scheme reported to the client; defaults to "bearer".
    token_type: str = "bearer"
    # User the token was issued for.
    username: str
    # Expiry time as an ISO 8601 string.
    expires_at: str
# Keep DATA_DIR for keys (RSA keys still stored as files).
# Taken from $ARTDAG_DATA when that variable is set, otherwise ~/.artdag/l2.
DATA_DIR = Path(os.environ.get("ARTDAG_DATA", str(Path.home() / ".artdag" / "l2")))
def hash_password(password: str) -> str:
    """Return a bcrypt hash of ``password`` as a text string.

    Only the first 72 bytes of the UTF-8 encoded password are hashed
    (the bcrypt limit). A new salt is generated on every call.
    """
    encoded = password.encode('utf-8')
    truncated = encoded[:72]  # bcrypt limit
    salt = bcrypt.gensalt()
    digest = bcrypt.hashpw(truncated, salt)
    return digest.decode('utf-8')
def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Verify a password against its bcrypt hash.

    The password is truncated to 72 bytes in the same way hash_password
    does, so both sides of the comparison see the same input.

    Args:
        plain_password: Password supplied by the client.
        hashed_password: Stored hash, as text.

    Returns:
        True when the password matches the hash. False when it does not,
        or when the stored hash is malformed and bcrypt cannot parse it.
    """
    pw_bytes = plain_password.encode('utf-8')[:72]
    try:
        return bcrypt.checkpw(pw_bytes, hashed_password.encode('utf-8'))
    except ValueError:
        # bcrypt raises ValueError (e.g. "Invalid salt") for a hash it cannot
        # parse; report that as a failed check instead of an unhandled error.
        return False
async def create_user(data_dir: Path, username: str, password: str, email: Optional[str] = None) -> User:
    """Register a new user and generate their ActivityPub key pair.

    Args:
        data_dir: Directory handed to ``generate_keypair`` for key storage.
        username: Login name; must not already be registered.
        password: Plain-text password; only its hash is stored.
        email: Optional e-mail address.

    Returns:
        The newly created User.

    Raises:
        ValueError: If the username is already registered.
    """
    # Imported at call time rather than at module load.
    from keys import generate_keypair

    already_taken = await db.user_exists(username)
    if already_taken:
        raise ValueError(f"Username already exists: {username}")

    hashed = hash_password(password)
    row = await db.create_user(username, hashed, email)

    # The database row is written first, then the ActivityPub keys.
    generate_keypair(data_dir, username)

    # The database may return a datetime; the model wants an ISO string.
    created = row.get("created_at")
    created_iso = created.isoformat() if hasattr(created, 'isoformat') else created

    return User(
        username=username,
        password_hash=hashed,
        created_at=created_iso,
        email=email,
    )
async def authenticate_user(data_dir: Path, username: str, password: str) -> Optional[User]:
    """Check a username/password pair against the database.

    Args:
        data_dir: Not used by this function; kept in the signature.
        username: Login name to look up.
        password: Plain-text password to verify.

    Returns:
        The matching User, or None when the user does not exist or the
        password is wrong.
    """
    record = await db.get_user(username)
    # Unknown user and bad password give the same result to the caller.
    if not record or not verify_password(password, record["password_hash"]):
        return None

    # The database may return a datetime; the model wants an ISO string.
    created = record.get("created_at")
    if hasattr(created, 'isoformat'):
        created = created.isoformat()

    fields = {
        "username": record["username"],
        "password_hash": record["password_hash"],
        "created_at": created,
        "email": record.get("email"),
    }
    return User(**fields)
def create_access_token(username: str) -> Token:
    """Create a signed JWT access token for ``username``.

    The clock is read once, so the "iat" and "exp" claims are derived from
    the same instant and differ by exactly ACCESS_TOKEN_EXPIRE_DAYS.

    Args:
        username: Stored in the token's "sub" claim.

    Returns:
        A Token holding the encoded JWT, the username and the expiry time
        as an ISO 8601 string.
    """
    issued_at = datetime.now(timezone.utc)
    expires = issued_at + timedelta(days=ACCESS_TOKEN_EXPIRE_DAYS)
    payload = {
        "sub": username,
        "exp": expires,
        "iat": issued_at,
    }
    token = jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM)
    return Token(
        access_token=token,
        username=username,
        expires_at=expires.isoformat(),
    )
def verify_token(token: str) -> Optional[str]:
    """Decode a JWT and return the username it was issued for.

    Returns:
        The value of the "sub" claim, or None when decoding raises
        JWTError or the claim is absent.
    """
    try:
        claims = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError:
        return None
    return claims.get("sub")
async def get_current_user(data_dir: Path, token: str) -> Optional[User]:
    """Resolve a JWT to the User it belongs to.

    Args:
        data_dir: Not used by this function; kept in the signature.
        token: Encoded JWT presented by the client.

    Returns:
        The User named by the token, or None when the token does not
        verify or the user is not found in the database.
    """
    subject = verify_token(token)
    if not subject:
        return None

    record = await db.get_user(subject)
    if not record:
        return None

    # The database may return a datetime; the model wants an ISO string.
    created = record.get("created_at")
    created_iso = created.isoformat() if hasattr(created, 'isoformat') else created

    return User(
        username=record["username"],
        password_hash=record["password_hash"],
        created_at=created_iso,
        email=record.get("email"),
    )