""" Authentication for Art DAG L2 Server. User registration, login, and JWT tokens. """ import json import os import secrets from datetime import datetime, timezone, timedelta from pathlib import Path from typing import Optional import bcrypt from jose import JWTError, jwt from pydantic import BaseModel # JWT settings ALGORITHM = "HS256" ACCESS_TOKEN_EXPIRE_DAYS = 30 def load_jwt_secret() -> str: """Load JWT secret from Docker secret, env var, or generate.""" # Try Docker secret first secret_path = Path("/run/secrets/jwt_secret") if secret_path.exists(): return secret_path.read_text().strip() # Try environment variable if os.environ.get("JWT_SECRET"): return os.environ["JWT_SECRET"] # Generate one (tokens won't persist across restarts!) print("WARNING: No JWT_SECRET configured. Tokens will be invalidated on restart.") return secrets.token_hex(32) SECRET_KEY = load_jwt_secret() class User(BaseModel): """A registered user.""" username: str password_hash: str created_at: str email: Optional[str] = None class UserCreate(BaseModel): """Request to register a user.""" username: str password: str email: Optional[str] = None class UserLogin(BaseModel): """Request to login.""" username: str password: str class Token(BaseModel): """JWT token response.""" access_token: str token_type: str = "bearer" username: str expires_at: str def get_users_path(data_dir: Path) -> Path: """Get users file path.""" return data_dir / "users.json" def load_users(data_dir: Path) -> dict[str, dict]: """Load users from disk.""" path = get_users_path(data_dir) if path.exists(): with open(path) as f: return json.load(f) return {} def save_users(data_dir: Path, users: dict[str, dict]): """Save users to disk.""" path = get_users_path(data_dir) with open(path, "w") as f: json.dump(users, f, indent=2) def hash_password(password: str) -> str: """Hash a password (truncate to 72 bytes for bcrypt).""" # Truncate to 72 bytes (bcrypt limit) pw_bytes = password.encode('utf-8')[:72] return bcrypt.hashpw(pw_bytes, bcrypt.gensalt()).decode('utf-8') def verify_password(plain_password: str, hashed_password: str) -> bool: """Verify a password against its hash.""" pw_bytes = plain_password.encode('utf-8')[:72] return bcrypt.checkpw(pw_bytes, hashed_password.encode('utf-8')) def create_user(data_dir: Path, username: str, password: str, email: Optional[str] = None) -> User: """Create a new user.""" users = load_users(data_dir) if username in users: raise ValueError(f"Username already exists: {username}") user = User( username=username, password_hash=hash_password(password), created_at=datetime.now(timezone.utc).isoformat(), email=email ) users[username] = user.model_dump() save_users(data_dir, users) return user def authenticate_user(data_dir: Path, username: str, password: str) -> Optional[User]: """Authenticate a user by username and password.""" users = load_users(data_dir) if username not in users: return None user_data = users[username] if not verify_password(password, user_data["password_hash"]): return None return User(**user_data) def create_access_token(username: str) -> Token: """Create a JWT access token.""" expires = datetime.now(timezone.utc) + timedelta(days=ACCESS_TOKEN_EXPIRE_DAYS) payload = { "sub": username, "exp": expires, "iat": datetime.now(timezone.utc) } token = jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM) return Token( access_token=token, username=username, expires_at=expires.isoformat() ) def verify_token(token: str) -> Optional[str]: """Verify a JWT token, return username if valid.""" try: payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) username = payload.get("sub") return username except JWTError: return None def get_current_user(data_dir: Path, token: str) -> Optional[User]: """Get current user from token.""" username = verify_token(token) if not username: return None users = load_users(data_dir) if username not in users: return None return User(**users[username])