# Allows L1 servers to track which L2 server the user authenticated with.
# Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
# 271 lines, 7.6 KiB, Python
"""
|
|
Authentication middleware and dependencies.
|
|
|
|
Provides common authentication patterns for L1 and L2 servers.
|
|
Each server can extend or customize these as needed.
|
|
"""
|
|
|
|
from dataclasses import dataclass
|
|
from typing import Callable, Optional, Awaitable, Any
|
|
import base64
|
|
import json
|
|
|
|
from fastapi import Request, HTTPException, Depends
|
|
from fastapi.responses import RedirectResponse
|
|
|
|
|
|
@dataclass
class UserContext:
    """Identity of the caller as recovered from a cookie or auth header.

    ``token`` and ``l2_server`` default to ``None`` and stay that way when
    the credential the context was built from did not supply them.
    """

    # Account name as found in the credential (may carry a leading "@").
    username: str
    # Full actor ID like "@user@server.com".
    actor_id: str
    # Raw token this identity was read from, when there was one.
    token: Optional[str] = None
    # L2 server URL for this user.
    l2_server: Optional[str] = None

    @property
    def display_name(self) -> str:
        """Return the username with every leading "@" character removed."""
        raw_name = self.username
        return raw_name.lstrip("@")
|
|
|
|
|
|
def get_user_from_cookie(request: Request) -> Optional[UserContext]:
    """
    Extract user context from session cookie.

    Supports two cookie formats, tried in this order:
    1. artdag_session: base64-encoded JSON {"username": "user", "actor_id": "@user@server.com"}
    2. auth_token: raw JWT token (used by L1 servers)

    Nothing is cryptographically verified here: the session cookie is plain
    base64 and the JWT claims are read without checking the signature.

    Args:
        request: FastAPI request

    Returns:
        UserContext if a usable cookie is found, None otherwise. A malformed
        cookie (bad base64, bad JSON, or JSON that is not an object) is
        ignored instead of raising, and the next format is tried.
    """
    # Try artdag_session cookie first (base64-encoded JSON)
    cookie = request.cookies.get("artdag_session")
    if cookie:
        try:
            data = json.loads(base64.b64decode(cookie))
        except ValueError:
            # binascii.Error, UnicodeDecodeError and json.JSONDecodeError
            # are all ValueError subclasses.
            data = None

        # Valid JSON is not necessarily an object; calling .get() on a list
        # or a number would raise, so anything else is treated as malformed.
        if isinstance(data, dict):
            username = data.get("username", "")
            actor_id = data.get("actor_id", "")
            # Default actor_id to @username if not provided
            if not actor_id and username:
                actor_id = f"@{username}"
            return UserContext(
                username=username,
                actor_id=actor_id,
            )

    # Try auth_token cookie (raw JWT, used by L1)
    token = request.cookies.get("auth_token")
    if token:
        claims = decode_jwt_claims(token)
        # A JWT payload can decode to a non-object JSON value; only a
        # non-empty dict of claims is usable.
        if claims and isinstance(claims, dict):
            username = claims.get("username") or claims.get("sub", "")
            actor_id = claims.get("actor_id") or claims.get("actor")
            if not actor_id and username:
                actor_id = f"@{username}"
            return UserContext(
                username=username,
                actor_id=actor_id or "",
                token=token,
            )

    return None
|
|
|
|
|
|
def get_user_from_header(request: Request) -> Optional[UserContext]:
    """
    Extract user context from Authorization header.

    Only the ``Bearer <token>`` scheme is handled, and only when the token
    is JWT-shaped: its claims are decoded WITHOUT signature verification.
    Opaque bearer tokens and any other scheme (for example ``Basic``) are
    not supported and yield None. The scheme name is matched
    case-insensitively, as HTTP authentication schemes are.

    Args:
        request: FastAPI request

    Returns:
        UserContext (with ``token`` set to the bearer token) if the header
        carries a decodable JWT with a non-empty claims object, None otherwise
    """
    auth_header = request.headers.get("Authorization", "")

    # Split "<scheme> <credentials>" once; a header without a space leaves
    # the credentials empty and is rejected below.
    scheme, _, credentials = auth_header.partition(" ")
    if scheme.lower() != "bearer":
        return None

    token = credentials.strip()
    if not token:
        return None

    # Attempt to decode JWT claims
    claims = decode_jwt_claims(token)
    # The payload may decode to a non-object JSON value; .get() below needs
    # a dict, and an empty claims object identifies nobody.
    if not claims or not isinstance(claims, dict):
        return None

    username = claims.get("username") or claims.get("sub", "")
    actor_id = claims.get("actor_id") or claims.get("actor")
    # Default actor_id to @username if not provided
    if not actor_id and username:
        actor_id = f"@{username}"

    return UserContext(
        username=username,
        actor_id=actor_id or "",
        token=token,
    )
|
|
|
|
|
|
def decode_jwt_claims(token: str) -> Optional[dict]:
    """
    Decode JWT claims without verification.

    This is useful for extracting user info from a token
    when full verification is handled elsewhere. The signature is never
    checked, so the result must not be trusted on its own.

    Args:
        token: JWT token string (three dot-separated segments)

    Returns:
        Claims dict if the token has JWT shape and its payload is a JSON
        object; None for anything else (wrong segment count, bad base64,
        bad JSON, a payload that is a JSON array/number/string, or a
        non-string token)
    """
    if not isinstance(token, str):
        return None

    parts = token.split(".")
    if len(parts) != 3:
        return None

    # Decode payload (second part). JWTs strip base64 padding, so restore
    # it: -len % 4 is the number of "=" needed to reach a multiple of 4.
    payload = parts[1]
    payload += "=" * (-len(payload) % 4)

    try:
        claims = json.loads(base64.urlsafe_b64decode(payload))
    except ValueError:
        # binascii.Error, UnicodeDecodeError and json.JSONDecodeError are
        # all ValueError subclasses.
        return None

    # Callers use claims.get(...); only a JSON object honours the
    # Optional[dict] contract.
    return claims if isinstance(claims, dict) else None
|
|
|
|
|
|
def create_auth_dependency(
    token_validator: Optional[Callable[[str], Awaitable[Optional[dict]]]] = None,
    allow_cookie: bool = True,
    allow_header: bool = True,
):
    """
    Create a customized auth dependency for a specific server.

    The returned dependency checks the Authorization header first (API
    clients) and falls back to cookies (browsers). When a token_validator
    is supplied it is applied to every context that carries a token,
    whether the token arrived in the header or in the auth_token cookie.
    Contexts without a token (the artdag_session cookie) cannot be
    validated and are accepted as-is.

    Args:
        token_validator: Optional async function to validate tokens with backend;
            a falsy result rejects the token
        allow_cookie: Whether to check cookies for auth
        allow_header: Whether to check Authorization header

    Returns:
        FastAPI dependency function resolving to a UserContext, or None
        when the request is not authenticated
    """

    async def _token_is_acceptable(ctx: UserContext) -> bool:
        # Nothing to check without a validator or without a token.
        if token_validator is None or ctx.token is None:
            return True
        return bool(await token_validator(ctx.token))

    async def get_current_user(request: Request) -> Optional[UserContext]:
        # Try header first (API clients)
        if allow_header:
            ctx = get_user_from_header(request)
            if ctx is not None and await _token_is_acceptable(ctx):
                return ctx

        # Fall back to cookie (browser). A JWT taken from the auth_token
        # cookie goes through the same validator as a header token, so the
        # cookie path cannot be used to bypass backend validation.
        if allow_cookie:
            ctx = get_user_from_cookie(request)
            if ctx is not None and await _token_is_acceptable(ctx):
                return ctx

        return None

    return get_current_user
|
|
|
|
|
|
async def require_auth(request: Request) -> UserContext:
    """
    Dependency that requires authentication.

    Looks for credentials in the Authorization header first, then in
    cookies. Tokens are only decoded here, not validated against a backend.

    When no credentials are found:
    - clients whose Accept header contains "text/html" are redirected
      (HTTPException 302 with Location: /login);
    - all other clients get HTTPException 401 with a
      ``WWW-Authenticate: Bearer`` challenge header.

    Use with Depends() in route handlers.

    Example:
        @app.get("/protected")
        async def protected_route(user: UserContext = Depends(require_auth)):
            return {"user": user.username}

    Args:
        request: FastAPI request

    Returns:
        UserContext of the authenticated caller

    Raises:
        HTTPException: 302 (browser) or 401 (API client) if not authenticated
    """
    # Try header first
    ctx = get_user_from_header(request)
    if ctx is None:
        ctx = get_user_from_cookie(request)

    if ctx is not None:
        return ctx

    # Check Accept header to determine response type
    accept = request.headers.get("accept", "")
    if "text/html" in accept:
        raise HTTPException(
            status_code=302,
            headers={"Location": "/login"},
        )

    # HTTP requires a 401 response to carry a WWW-Authenticate challenge
    # naming a scheme the client can use.
    raise HTTPException(
        status_code=401,
        detail="Authentication required",
        headers={"WWW-Authenticate": "Bearer"},
    )
|
|
|
|
|
|
def require_owner(resource_owner_field: str = "username"):
    """
    Build a dependency for routes that are meant to be owner-only.

    As implemented, the returned dependency only authenticates the caller
    through ``require_auth`` and hands back the resulting user. It has no
    access to the resource, so it performs no ownership comparison, and
    ``resource_owner_field`` is accepted but not read. The route itself
    must compare the user against the resource's owner after loading it.

    Args:
        resource_owner_field: Field name on the resource that contains owner username
            (currently unused by the dependency)

    Returns:
        Dependency function resolving to the authenticated UserContext

    Example:
        @app.delete("/items/{item_id}")
        async def delete_item(
            item: Item = Depends(get_item),
            user: UserContext = Depends(require_owner("created_by"))
        ):
            ...
    """
    authenticated_user = Depends(require_auth)

    async def check_ownership(
        request: Request,
        user: UserContext = authenticated_user,
    ) -> UserContext:
        # Ownership itself is verified by the route once it has fetched
        # the resource; here the authenticated user is simply passed on.
        return user

    return check_ownership
|
|
|
|
|
|
def set_auth_cookie(response: Any, user: UserContext, max_age: int = 86400 * 30) -> None:
    """
    Store the user's identity in the ``artdag_session`` cookie.

    The cookie value is base64-encoded JSON holding only ``username`` and
    ``actor_id``; the user's ``token`` and ``l2_server`` are not written.
    The value is encoded, not signed or encrypted.

    Args:
        response: FastAPI response object (anything exposing ``set_cookie``)
        user: User context to store
        max_age: Cookie max age in seconds (default 30 days)
    """
    session_fields = {
        "username": user.username,
        "actor_id": user.actor_id,
    }
    encoded_session = base64.b64encode(json.dumps(session_fields).encode()).decode()

    cookie_options = dict(
        key="artdag_session",
        value=encoded_session,
        max_age=max_age,
        httponly=True,
        samesite="lax",
        secure=True,  # Require HTTPS in production
    )
    response.set_cookie(**cookie_options)
|
|
|
|
|
|
def clear_auth_cookie(response: Any) -> None:
    """Ask the client to drop the ``artdag_session`` cookie.

    Args:
        response: FastAPI response object (anything exposing ``delete_cookie``)
    """
    session_cookie = "artdag_session"
    response.delete_cookie(key=session_cookie)
|