"""
Authentication routes for L2 server.

Handles login, registration, logout, and token verification.
"""

import asyncio
import hashlib
import html
import logging
from datetime import datetime, timezone
from typing import Optional

from fastapi import APIRouter, Request, Form, HTTPException, Depends
from fastapi.concurrency import run_in_threadpool
from fastapi.responses import HTMLResponse, RedirectResponse
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials

from artdag_common import render
from artdag_common.middleware import wants_html

from ..config import settings
from ..dependencies import get_templates, get_user_from_cookie

logger = logging.getLogger(__name__)

router = APIRouter()
security = HTTPBearer(auto_error=False)

# Lifetime of the auth cookie in seconds (30 days).
_AUTH_COOKIE_MAX_AGE = 60 * 60 * 24 * 30

# NOTE(review): the HTML fragments returned by the form handlers below contain
# only text in the reviewed source; confirm no markup was lost from them.


def _l2_server_url() -> str:
    """Return the public https URL of this server, built from settings.domain."""
    return f"https://{settings.domain}"


def _set_auth_cookie(response, access_token: str) -> None:
    """Attach the auth cookie carrying *access_token* to *response*."""
    response.set_cookie(
        key="auth_token",
        value=access_token,
        httponly=True,
        max_age=_AUTH_COOKIE_MAX_AGE,
        samesite="lax",
        secure=True,
    )


def _render_already_logged_in(request: Request, username: str):
    """Render the "already logged in" page for *username*."""
    templates = get_templates(request)
    return render(
        templates,
        "auth/already_logged_in.html",
        request,
        user={"username": username},
    )


def _revoke_on_renderer(l1_url: str, username: str) -> None:
    """Ask one attached L1 server to revoke *username* (best effort).

    Synchronous on purpose: it is run in a worker thread so the blocking
    HTTP call never stalls the event loop. Failures are logged, not raised,
    so that logout always completes.
    """
    import requests

    try:
        reply = requests.post(
            f"{l1_url}/auth/revoke-user",
            json={"username": username, "l2_server": _l2_server_url()},
            timeout=5,
        )
        if not reply.ok:
            logger.warning(
                "L1 server %s answered %s when revoking user %s",
                l1_url, reply.status_code, username,
            )
    except Exception:
        # Deliberate best-effort: an unreachable L1 must not break logout.
        logger.warning(
            "Could not revoke user %s on L1 server %s",
            username, l1_url, exc_info=True,
        )


@router.get("/login", response_class=HTMLResponse)
async def login_page(request: Request, return_to: Optional[str] = None):
    """Login page.

    Renders the "already logged in" page when the cookie identifies a user,
    otherwise the login form (passing *return_to* through to the template).
    """
    username = get_user_from_cookie(request)
    if username:
        return _render_already_logged_in(request, username)

    templates = get_templates(request)
    return render(
        templates,
        "auth/login.html",
        request,
        return_to=return_to,
    )


@router.post("/login", response_class=HTMLResponse)
async def login_submit(
    request: Request,
    username: str = Form(...),
    password: str = Form(...),
    return_to: Optional[str] = Form(None),
):
    """Handle login form submission.

    Returns an HTML fragment describing the outcome. On success an access
    token is created and stored in the auth cookie.
    """
    from auth import authenticate_user, create_access_token

    # Strip first so a whitespace-only username is rejected as empty.
    username = username.strip()
    if not username or not password:
        return HTMLResponse("\nUsername and password are required\n")

    user = await authenticate_user(settings.data_dir, username, password)
    if not user:
        return HTMLResponse("\nInvalid username or password\n")

    token = create_access_token(user.username, l2_server=_l2_server_url())

    # Handle return_to redirect
    # NOTE(security): return_to comes from the client and the access token is
    # appended to it. Without an allow-list of trusted L1 servers this can
    # hand the token to an arbitrary site — review before relying on it.
    if return_to and return_to.startswith(("http://", "https://")):
        separator = "&" if "?" in return_to else "?"
        redirect_url = f"{return_to}{separator}auth_token={token.access_token}"
        # NOTE(review): redirect_url is built but not interpolated into the
        # response body visible here — confirm the redirect markup exists.
        response = HTMLResponse(f"\nLogin successful! Redirecting...\n")
    else:
        response = HTMLResponse("\nLogin successful! Redirecting...\n")

    _set_auth_cookie(response, token.access_token)
    return response


@router.get("/register", response_class=HTMLResponse)
async def register_page(request: Request):
    """Registration page.

    Renders the "already logged in" page when the cookie identifies a user,
    otherwise the registration form.
    """
    username = get_user_from_cookie(request)
    if username:
        return _render_already_logged_in(request, username)

    templates = get_templates(request)
    return render(templates, "auth/register.html", request)


@router.post("/register", response_class=HTMLResponse)
async def register_submit(
    request: Request,
    username: str = Form(...),
    password: str = Form(...),
    password2: str = Form(...),
    email: Optional[str] = Form(None),
):
    """Handle registration form submission.

    Validates the form, creates the user and, on success, stores a fresh
    access token in the auth cookie. Validation problems and ValueError
    raised by create_user are reported as HTML fragments.
    """
    from auth import create_user, create_access_token

    # Strip first so a whitespace-only username is rejected as empty.
    username = username.strip()
    if not username or not password:
        return HTMLResponse("\nUsername and password are required\n")

    if password != password2:
        return HTMLResponse("\nPasswords do not match\n")

    if len(password) < 6:
        return HTMLResponse("\nPassword must be at least 6 characters\n")

    try:
        user = await create_user(settings.data_dir, username, password, email)
    except ValueError as e:
        # The message may echo user-supplied input; escape it before it is
        # embedded in an HTML response.
        return HTMLResponse(f"\n{html.escape(str(e))}\n")

    token = create_access_token(user.username, l2_server=_l2_server_url())

    response = HTMLResponse("\nRegistration successful! Redirecting...\n")
    _set_auth_cookie(response, token.access_token)
    return response


@router.get("/logout")
async def logout(request: Request):
    """Handle logout.

    Revokes the cookie's token in the database, asks every attached L1
    server to revoke the user (best effort), then clears the cookie and
    redirects to "/".
    """
    import db
    from auth import get_token_claims

    token = request.cookies.get("auth_token")
    claims = get_token_claims(token) if token else None
    username = claims.get("sub") if claims else None

    # A truthy username implies both the token and its claims are present.
    if username:
        # Revoke token in database
        token_hash = hashlib.sha256(token.encode()).hexdigest()
        expires_at = datetime.fromtimestamp(claims.get("exp", 0), tz=timezone.utc)
        await db.revoke_token(token_hash, username, expires_at)

        # Revoke on attached L1 servers. The blocking HTTP calls run in
        # worker threads, concurrently, so they do not stall the event loop.
        attached = await db.get_user_renderers(username)
        await asyncio.gather(
            *(
                run_in_threadpool(_revoke_on_renderer, l1_url, username)
                for l1_url in attached or []
            )
        )

    response = RedirectResponse(url="/", status_code=302)
    response.delete_cookie("auth_token")
    return response


@router.get("/verify")
async def verify_token(
    request: Request,
    credentials: HTTPAuthorizationCredentials = Depends(security),
):
    """
    Verify a token is valid.

    Called by L1 servers to verify tokens during auth callback.
    Returns user info if valid, 401 if not.

    Raises:
        HTTPException: 401 when no token is supplied, when the token fails
            verification, or when it has been revoked.
    """
    import db
    from auth import verify_token as verify_jwt, get_token_claims

    # Get token from the Authorization header
    token = None
    if credentials:
        token = credentials.credentials
    else:
        # Fall back to reading the Authorization header manually
        auth_header = request.headers.get("Authorization", "")
        if auth_header.startswith("Bearer "):
            token = auth_header[7:]

    if not token:
        raise HTTPException(401, "No token provided")

    # Verify JWT signature and expiry
    username = verify_jwt(token)
    if not username:
        raise HTTPException(401, "Invalid or expired token")

    # Always check revocation: the lookup only needs the token hash, so it
    # must not be skipped when the claims cannot be decoded.
    token_hash = hashlib.sha256(token.encode()).hexdigest()
    if await db.is_token_revoked(token_hash):
        raise HTTPException(401, "Token has been revoked")

    claims = get_token_claims(token)

    return {
        "valid": True,
        "username": username,
        "claims": claims,
    }