""" Authentication for Art DAG L2 Server. User registration, login, and JWT tokens. """ import os import secrets from datetime import datetime, timezone, timedelta from pathlib import Path from typing import Optional import bcrypt from jose import JWTError, jwt from pydantic import BaseModel import db # JWT settings ALGORITHM = "HS256" ACCESS_TOKEN_EXPIRE_DAYS = 30 def load_jwt_secret() -> str: """Load JWT secret from Docker secret, env var, or generate.""" # Try Docker secret first secret_path = Path("/run/secrets/jwt_secret") if secret_path.exists(): return secret_path.read_text().strip() # Try environment variable if os.environ.get("JWT_SECRET"): return os.environ["JWT_SECRET"] # Generate one (tokens won't persist across restarts!) print("WARNING: No JWT_SECRET configured. Tokens will be invalidated on restart.") return secrets.token_hex(32) SECRET_KEY = load_jwt_secret() class User(BaseModel): """A registered user.""" username: str password_hash: str created_at: str email: Optional[str] = None class UserCreate(BaseModel): """Request to register a user.""" username: str password: str email: Optional[str] = None class UserLogin(BaseModel): """Request to login.""" username: str password: str class Token(BaseModel): """JWT token response.""" access_token: str token_type: str = "bearer" username: str expires_at: str # Keep DATA_DIR for keys (RSA keys still stored as files) DATA_DIR = Path(os.environ.get("ARTDAG_DATA", str(Path.home() / ".artdag" / "l2"))) def hash_password(password: str) -> str: """Hash a password (truncate to 72 bytes for bcrypt).""" # Truncate to 72 bytes (bcrypt limit) pw_bytes = password.encode('utf-8')[:72] return bcrypt.hashpw(pw_bytes, bcrypt.gensalt()).decode('utf-8') def verify_password(plain_password: str, hashed_password: str) -> bool: """Verify a password against its hash.""" pw_bytes = plain_password.encode('utf-8')[:72] return bcrypt.checkpw(pw_bytes, hashed_password.encode('utf-8')) async def create_user(data_dir: Path, username: str, password: str, email: Optional[str] = None) -> User: """Create a new user with ActivityPub keys.""" from keys import generate_keypair if await db.user_exists(username): raise ValueError(f"Username already exists: {username}") password_hash = hash_password(password) user_data = await db.create_user(username, password_hash, email) # Generate ActivityPub keys for this user generate_keypair(data_dir, username) # Convert datetime to ISO string if needed created_at = user_data.get("created_at") if hasattr(created_at, 'isoformat'): created_at = created_at.isoformat() return User( username=username, password_hash=password_hash, created_at=created_at, email=email ) async def authenticate_user(data_dir: Path, username: str, password: str) -> Optional[User]: """Authenticate a user by username and password.""" user_data = await db.get_user(username) if not user_data: return None if not verify_password(password, user_data["password_hash"]): return None # Convert datetime to ISO string if needed created_at = user_data.get("created_at") if hasattr(created_at, 'isoformat'): created_at = created_at.isoformat() return User( username=user_data["username"], password_hash=user_data["password_hash"], created_at=created_at, email=user_data.get("email") ) def create_access_token(username: str, l2_server: str = None, l1_server: str = None) -> Token: """Create a JWT access token. Args: username: The username l2_server: The L2 server URL (e.g., https://artdag.rose-ash.com) Required for L1 to verify tokens with the correct L2. l1_server: Optional L1 server URL to scope the token to. If set, token only works for this specific L1. """ expires = datetime.now(timezone.utc) + timedelta(days=ACCESS_TOKEN_EXPIRE_DAYS) payload = { "sub": username, "username": username, # Also include as username for compatibility "exp": expires, "iat": datetime.now(timezone.utc) } # Include l2_server so L1 knows which L2 to verify with if l2_server: payload["l2_server"] = l2_server # Include l1_server to scope token to specific L1 if l1_server: payload["l1_server"] = l1_server token = jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM) return Token( access_token=token, username=username, expires_at=expires.isoformat() ) def verify_token(token: str) -> Optional[str]: """Verify a JWT token, return username if valid.""" try: payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) username = payload.get("sub") return username except JWTError: return None def get_token_claims(token: str) -> Optional[dict]: """Decode token and return all claims. Returns None if invalid.""" try: payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) return payload except JWTError: return None async def get_current_user(data_dir: Path, token: str) -> Optional[User]: """Get current user from token.""" username = verify_token(token) if not username: return None user_data = await db.get_user(username) if not user_data: return None # Convert datetime to ISO string if needed created_at = user_data.get("created_at") if hasattr(created_at, 'isoformat'): created_at = created_at.isoformat() return User( username=user_data["username"], password_hash=user_data["password_hash"], created_at=created_at, email=user_data.get("email") )