Files
activity-pub/auth.py
gilesb 4155427f03 feat: multi-actor ActivityPub support
Each registered user now has their own ActivityPub actor:
- Generate RSA keys per user on registration
- Webfinger resolves any registered user (@user@domain)
- Actor endpoints work for any registered user
- Each user has their own outbox (filtered activities)
- Activities signed with the publishing user's keys
- Objects attributed to the asset owner

Removed:
- ARTDAG_USER config (no longer single-actor)
- L1_SERVER config (comes with each request)

Added:
- /ui/users page listing all registered users
- user_exists() helper function

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-07 19:54:11 +00:00

184 lines
4.5 KiB
Python

"""
Authentication for Art DAG L2 Server.
User registration, login, and JWT tokens.
"""
import json
import os
import secrets
from datetime import datetime, timezone, timedelta
from pathlib import Path
from typing import Optional
import bcrypt
from jose import JWTError, jwt
from pydantic import BaseModel
# JWT settings
# Signing algorithm handed to jwt.encode / jwt.decode in this module.
ALGORITHM = "HS256"
# Lifetime of an issued access token, in days (see create_access_token).
ACCESS_TOKEN_EXPIRE_DAYS = 30
def load_jwt_secret(secret_path: Optional[Path] = None) -> str:
    """Load the JWT signing secret from a Docker secret, env var, or generate one.

    Sources are tried in order and the first non-empty value wins:

    1. The secret file (``/run/secrets/jwt_secret`` by default).
    2. The ``JWT_SECRET`` environment variable.
    3. A freshly generated random secret, after printing a warning.

    Args:
        secret_path: Location of the secret file. Defaults to the Docker
            secret path ``/run/secrets/jwt_secret``.

    Returns:
        The secret string to sign and verify tokens with. Never empty.
    """
    if secret_path is None:
        secret_path = Path("/run/secrets/jwt_secret")

    # Try Docker secret first
    if secret_path.exists():
        file_secret = secret_path.read_text().strip()
        # An empty or whitespace-only file must not become the signing key:
        # an empty string is a trivially guessable secret. Fall through to
        # the next source instead.
        if file_secret:
            return file_secret

    # Try environment variable
    env_secret = os.environ.get("JWT_SECRET")
    if env_secret:
        return env_secret

    # Generate one (tokens won't persist across restarts!)
    print("WARNING: No JWT_SECRET configured. Tokens will be invalidated on restart.")
    return secrets.token_hex(32)
# Resolved once at import time; used to sign and verify every token in this module.
SECRET_KEY = load_jwt_secret()
class User(BaseModel):
    """A registered user, in the shape stored in the users file."""
    # Key under which the record is stored in the users mapping.
    username: str
    # bcrypt hash produced by hash_password; the plaintext is never kept.
    password_hash: str
    # ISO 8601 timestamp string; create_user sets it from the current UTC time.
    created_at: str
    # Optional address; None when none was supplied at registration.
    email: Optional[str] = None
class UserCreate(BaseModel):
    """Request to register a user."""
    username: str
    # Password as submitted (presumably hashed via create_user before
    # storage -- confirm at the call site).
    password: str
    # Optional; omitted or None when the user gives no address.
    email: Optional[str] = None
class UserLogin(BaseModel):
    """Request to log in with a username and password."""
    username: str
    # Password as submitted (presumably checked via authenticate_user --
    # confirm at the call site).
    password: str
class Token(BaseModel):
    """JWT token response, as built by create_access_token."""
    # The encoded JWT string.
    access_token: str
    token_type: str = "bearer"
    # Username the token was issued for (also the token's "sub" claim).
    username: str
    # Expiry instant as an ISO 8601 string.
    expires_at: str
def get_users_path(data_dir: Path) -> Path:
    """Return the location of ``users.json`` inside *data_dir*."""
    users_file = data_dir.joinpath("users.json")
    return users_file
def load_users(data_dir: Path) -> dict[str, dict]:
    """Load the user registry from disk.

    Args:
        data_dir: Directory that holds the users file.

    Returns:
        Mapping of username to that user's stored record, or an empty dict
        when the users file does not exist.

    Raises:
        json.JSONDecodeError: If the file exists but is not valid JSON.
    """
    path = get_users_path(data_dir)
    # Open directly rather than checking exists() first: the file could
    # disappear between the check and the open, and a missing file simply
    # means there are no users yet.
    try:
        f = open(path, encoding="utf-8")
    except FileNotFoundError:
        return {}
    with f:
        return json.load(f)
def save_users(data_dir: Path, users: dict[str, dict]):
    """Write the user registry to disk as indented JSON.

    Args:
        data_dir: Directory that holds the users file.
        users: Mapping of username to that user's record.

    Raises:
        TypeError: If *users* contains a value that is not JSON-serializable.
            The existing users file is left untouched in that case.
    """
    path = get_users_path(data_dir)
    # Serialize before opening: mode "w" truncates the file immediately, so
    # a serialization error raised mid-dump would otherwise wipe out the
    # existing registry.
    serialized = json.dumps(users, indent=2)
    with open(path, "w", encoding="utf-8") as f:
        f.write(serialized)
def hash_password(password: str) -> str:
    """Return a salted bcrypt hash of *password* as text.

    Only the first 72 bytes of the UTF-8 encoding are hashed, which is the
    bcrypt input limit.
    """
    encoded = password.encode('utf-8')
    # bcrypt limit: keep at most 72 bytes
    clipped = encoded[:72]
    digest = bcrypt.hashpw(clipped, bcrypt.gensalt())
    return digest.decode('utf-8')
def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Check *plain_password* against a stored bcrypt hash.

    The candidate is truncated to 72 bytes, the same way hash_password
    truncates before hashing.
    """
    candidate = plain_password.encode('utf-8')[:72]
    stored = hashed_password.encode('utf-8')
    return bcrypt.checkpw(candidate, stored)
def create_user(data_dir: Path, username: str, password: str, email: Optional[str] = None) -> User:
    """Register a new user and generate their ActivityPub keypair.

    The user record is written to the users file first, then the keypair is
    generated. If key generation fails, the record is removed again so the
    username is not left registered without keys.

    Args:
        data_dir: Directory that holds the users file (and is passed to
            ``generate_keypair``).
        username: Name to register; must not already exist.
        password: Plaintext password; only its bcrypt hash is stored.
        email: Optional address stored with the record.

    Returns:
        The newly created User.

    Raises:
        ValueError: If *username* is already registered.
    """
    # NOTE(review): username is not validated here. If generate_keypair
    # derives file names from it, confirm callers restrict its characters.
    from keys import generate_keypair

    users = load_users(data_dir)
    if username in users:
        raise ValueError(f"Username already exists: {username}")

    user = User(
        username=username,
        password_hash=hash_password(password),
        created_at=datetime.now(timezone.utc).isoformat(),
        email=email,
    )
    users[username] = user.model_dump()
    save_users(data_dir, users)

    # Generate ActivityPub keys for this user
    try:
        generate_keypair(data_dir, username)
    except Exception:
        # Roll back the registration: otherwise the account would exist
        # without keys and the "already exists" check above would block any
        # retry. Reload so records written in the meantime are not dropped.
        remaining = load_users(data_dir)
        remaining.pop(username, None)
        save_users(data_dir, remaining)
        raise

    return user
def authenticate_user(data_dir: Path, username: str, password: str) -> Optional[User]:
    """Return the User for *username* if *password* matches, else None.

    None is returned both when the username is unknown and when the
    password does not match the stored hash.
    """
    registry = load_users(data_dir)
    try:
        record = registry[username]
    except KeyError:
        return None

    if verify_password(password, record["password_hash"]):
        return User(**record)
    return None
def create_access_token(username: str) -> Token:
    """Issue a signed JWT for *username*.

    The token carries ``sub`` (the username), ``exp`` and ``iat`` claims and
    expires ACCESS_TOKEN_EXPIRE_DAYS days from now. The returned Token also
    reports the expiry as an ISO 8601 string.
    """
    lifetime = timedelta(days=ACCESS_TOKEN_EXPIRE_DAYS)
    expires = datetime.now(timezone.utc) + lifetime
    issued_at = datetime.now(timezone.utc)

    claims = {"sub": username, "exp": expires, "iat": issued_at}
    encoded = jwt.encode(claims, SECRET_KEY, algorithm=ALGORITHM)

    return Token(
        access_token=encoded,
        username=username,
        expires_at=expires.isoformat(),
    )
def verify_token(token: str) -> Optional[str]:
    """Decode *token* and return its ``sub`` claim.

    Returns None when decoding raises JWTError, or when the decoded payload
    has no ``sub`` claim.
    """
    try:
        claims = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError:
        return None
    return claims.get("sub")
def get_current_user(data_dir: Path, token: str) -> Optional[User]:
    """Resolve *token* to the registered User it was issued for.

    Returns None when the token does not verify, carries no username, or
    names a user that is not in the users file.
    """
    subject = verify_token(token)
    if not subject:
        return None

    registry = load_users(data_dir)
    return User(**registry[subject]) if subject in registry else None