feat: add user registration and JWT authentication

- POST /auth/register - create account
- POST /auth/login - get JWT token
- GET /auth/me - get current user
- POST /auth/verify - verify token (for L1)
- Password hashing with bcrypt
- 30-day JWT tokens

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
gilesb
2026-01-07 14:43:14 +00:00
parent dec5266554
commit a2190801e8
3 changed files with 243 additions and 1 deletions

161
auth.py Normal file
View File

@@ -0,0 +1,161 @@
"""
Authentication for Art DAG L2 Server.
User registration, login, and JWT tokens.
"""
import json
import os
import secrets
from datetime import datetime, timezone, timedelta
from pathlib import Path
from typing import Optional
from passlib.context import CryptContext
from jose import JWTError, jwt
from pydantic import BaseModel
# Password hashing
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# JWT settings
SECRET_KEY = os.environ.get("JWT_SECRET", secrets.token_hex(32))
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_DAYS = 30
class User(BaseModel):
"""A registered user."""
username: str
password_hash: str
created_at: str
email: Optional[str] = None
class UserCreate(BaseModel):
"""Request to register a user."""
username: str
password: str
email: Optional[str] = None
class UserLogin(BaseModel):
"""Request to login."""
username: str
password: str
class Token(BaseModel):
"""JWT token response."""
access_token: str
token_type: str = "bearer"
username: str
expires_at: str
def get_users_path(data_dir: Path) -> Path:
"""Get users file path."""
return data_dir / "users.json"
def load_users(data_dir: Path) -> dict[str, dict]:
"""Load users from disk."""
path = get_users_path(data_dir)
if path.exists():
with open(path) as f:
return json.load(f)
return {}
def save_users(data_dir: Path, users: dict[str, dict]):
"""Save users to disk."""
path = get_users_path(data_dir)
with open(path, "w") as f:
json.dump(users, f, indent=2)
def hash_password(password: str) -> str:
"""Hash a password."""
return pwd_context.hash(password)
def verify_password(plain_password: str, hashed_password: str) -> bool:
"""Verify a password against its hash."""
return pwd_context.verify(plain_password, hashed_password)
def create_user(data_dir: Path, username: str, password: str, email: Optional[str] = None) -> User:
"""Create a new user."""
users = load_users(data_dir)
if username in users:
raise ValueError(f"Username already exists: {username}")
user = User(
username=username,
password_hash=hash_password(password),
created_at=datetime.now(timezone.utc).isoformat(),
email=email
)
users[username] = user.model_dump()
save_users(data_dir, users)
return user
def authenticate_user(data_dir: Path, username: str, password: str) -> Optional[User]:
"""Authenticate a user by username and password."""
users = load_users(data_dir)
if username not in users:
return None
user_data = users[username]
if not verify_password(password, user_data["password_hash"]):
return None
return User(**user_data)
def create_access_token(username: str) -> Token:
"""Create a JWT access token."""
expires = datetime.now(timezone.utc) + timedelta(days=ACCESS_TOKEN_EXPIRE_DAYS)
payload = {
"sub": username,
"exp": expires,
"iat": datetime.now(timezone.utc)
}
token = jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM)
return Token(
access_token=token,
username=username,
expires_at=expires.isoformat()
)
def verify_token(token: str) -> Optional[str]:
"""Verify a JWT token, return username if valid."""
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username = payload.get("sub")
return username
except JWTError:
return None
def get_current_user(data_dir: Path, token: str) -> Optional[User]:
"""Get current user from token."""
username = verify_token(token)
if not username:
return None
users = load_users(data_dir)
if username not in users:
return None
return User(**users[username])

View File

@@ -2,3 +2,5 @@ fastapi>=0.109.0
uvicorn>=0.27.0
requests>=2.31.0
cryptography>=42.0.0
passlib[bcrypt]>=1.7.4
python-jose[cryptography]>=3.3.0

View File

@@ -18,11 +18,18 @@ from pathlib import Path
from typing import Optional
from urllib.parse import urlparse
from fastapi import FastAPI, HTTPException, Request, Response
from fastapi import FastAPI, HTTPException, Request, Response, Depends
from fastapi.responses import JSONResponse
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from pydantic import BaseModel
import requests
from auth import (
UserCreate, UserLogin, Token, User,
create_user, authenticate_user, create_access_token,
verify_token, get_current_user, load_users
)
# Configuration
DOMAIN = os.environ.get("ARTDAG_DOMAIN", "artdag.rose-ash.com")
USERNAME = os.environ.get("ARTDAG_USER", "giles")
@@ -184,6 +191,7 @@ async def root():
"""Server info."""
registry = load_registry()
activities = load_activities()
users = load_users(DATA_DIR)
return {
"name": "Art DAG L2 Server",
"version": "0.1.0",
@@ -191,10 +199,81 @@ async def root():
"user": USERNAME,
"assets_count": len(registry.get("assets", {})),
"activities_count": len(activities),
"users_count": len(users),
"l1_server": L1_SERVER
}
# ============ Auth Endpoints ============
security = HTTPBearer(auto_error=False)
async def get_optional_user(
credentials: HTTPAuthorizationCredentials = Depends(security)
) -> Optional[User]:
"""Get current user if authenticated, None otherwise."""
if not credentials:
return None
return get_current_user(DATA_DIR, credentials.credentials)
async def get_required_user(
credentials: HTTPAuthorizationCredentials = Depends(security)
) -> User:
"""Get current user, raise 401 if not authenticated."""
if not credentials:
raise HTTPException(401, "Not authenticated")
user = get_current_user(DATA_DIR, credentials.credentials)
if not user:
raise HTTPException(401, "Invalid token")
return user
@app.post("/auth/register", response_model=Token)
async def register(req: UserCreate):
"""Register a new user."""
try:
user = create_user(DATA_DIR, req.username, req.password, req.email)
except ValueError as e:
raise HTTPException(400, str(e))
return create_access_token(user.username)
@app.post("/auth/login", response_model=Token)
async def login(req: UserLogin):
"""Login and get access token."""
user = authenticate_user(DATA_DIR, req.username, req.password)
if not user:
raise HTTPException(401, "Invalid username or password")
return create_access_token(user.username)
@app.get("/auth/me")
async def get_me(user: User = Depends(get_required_user)):
"""Get current user info."""
return {
"username": user.username,
"email": user.email,
"created_at": user.created_at
}
@app.post("/auth/verify")
async def verify_auth(credentials: HTTPAuthorizationCredentials = Depends(security)):
"""Verify a token and return username. Used by L1 server."""
if not credentials:
raise HTTPException(401, "No token provided")
username = verify_token(credentials.credentials)
if not username:
raise HTTPException(401, "Invalid token")
return {"username": username, "valid": True}
@app.get("/.well-known/webfinger")
async def webfinger(resource: str):
"""WebFinger endpoint for actor discovery."""