Files
rose-ash/common/artdag_common/middleware/auth.py
2026-02-24 23:08:41 +00:00

277 lines
7.9 KiB
Python

"""
Authentication middleware and dependencies.
Provides common authentication patterns for L1 and L2 servers.
Each server can extend or customize these as needed.
"""
from dataclasses import dataclass
from typing import Callable, Optional, Awaitable, Any
import base64
import json
from fastapi import Request, HTTPException, Depends
from fastapi.responses import RedirectResponse
@dataclass
class UserContext:
"""User context extracted from authentication."""
username: str
actor_id: str # Full actor ID like "@user@server.com"
token: Optional[str] = None
l2_server: Optional[str] = None # L2 server URL for this user
email: Optional[str] = None # User's email address
@property
def display_name(self) -> str:
"""Get display name (username without @ prefix)."""
return self.username.lstrip("@")
def get_user_from_cookie(request: Request) -> Optional[UserContext]:
"""
Extract user context from session cookie.
Supports two cookie formats:
1. artdag_session: base64-encoded JSON {"username": "user", "actor_id": "@user@server.com"}
2. auth_token: raw JWT token (used by L1 servers)
Args:
request: FastAPI request
Returns:
UserContext if valid cookie found, None otherwise
"""
# Try artdag_session cookie first (base64-encoded JSON)
cookie = request.cookies.get("artdag_session")
if cookie:
try:
data = json.loads(base64.b64decode(cookie))
username = data.get("username", "")
actor_id = data.get("actor_id", "")
if not actor_id and username:
actor_id = f"@{username}"
return UserContext(
username=username,
actor_id=actor_id,
email=data.get("email", ""),
)
except (json.JSONDecodeError, ValueError, KeyError):
pass
# Try auth_token cookie (raw JWT, used by L1)
token = request.cookies.get("auth_token")
if token:
claims = decode_jwt_claims(token)
if claims:
username = claims.get("username") or claims.get("sub", "")
actor_id = claims.get("actor_id") or claims.get("actor")
if not actor_id and username:
actor_id = f"@{username}"
return UserContext(
username=username,
actor_id=actor_id or "",
token=token,
email=claims.get("email", ""),
)
return None
def get_user_from_header(request: Request) -> Optional[UserContext]:
"""
Extract user context from Authorization header.
Supports:
- Bearer <token> format (JWT or opaque token)
- Basic <base64(user:pass)> format
Args:
request: FastAPI request
Returns:
UserContext if valid header found, None otherwise
"""
auth_header = request.headers.get("Authorization", "")
if auth_header.startswith("Bearer "):
token = auth_header[7:]
# Attempt to decode JWT claims
claims = decode_jwt_claims(token)
if claims:
username = claims.get("username") or claims.get("sub", "")
actor_id = claims.get("actor_id") or claims.get("actor")
# Default actor_id to @username if not provided
if not actor_id and username:
actor_id = f"@{username}"
return UserContext(
username=username,
actor_id=actor_id or "",
token=token,
)
return None
def decode_jwt_claims(token: str) -> Optional[dict]:
"""
Decode JWT claims without verification.
This is useful for extracting user info from a token
when full verification is handled elsewhere.
Args:
token: JWT token string
Returns:
Claims dict if valid JWT format, None otherwise
"""
try:
parts = token.split(".")
if len(parts) != 3:
return None
# Decode payload (second part)
payload = parts[1]
# Add padding if needed
padding = 4 - len(payload) % 4
if padding != 4:
payload += "=" * padding
return json.loads(base64.urlsafe_b64decode(payload))
except (json.JSONDecodeError, ValueError):
return None
def create_auth_dependency(
token_validator: Optional[Callable[[str], Awaitable[Optional[dict]]]] = None,
allow_cookie: bool = True,
allow_header: bool = True,
):
"""
Create a customized auth dependency for a specific server.
Args:
token_validator: Optional async function to validate tokens with backend
allow_cookie: Whether to check cookies for auth
allow_header: Whether to check Authorization header
Returns:
FastAPI dependency function
"""
async def get_current_user(request: Request) -> Optional[UserContext]:
ctx = None
# Try header first (API clients)
if allow_header:
ctx = get_user_from_header(request)
if ctx and token_validator:
# Validate token with backend
validated = await token_validator(ctx.token)
if not validated:
ctx = None
# Fall back to cookie (browser)
if ctx is None and allow_cookie:
ctx = get_user_from_cookie(request)
return ctx
return get_current_user
async def require_auth(request: Request) -> UserContext:
"""
Dependency that requires authentication.
Raises HTTPException 401 if not authenticated.
Use with Depends() in route handlers.
Example:
@app.get("/protected")
async def protected_route(user: UserContext = Depends(require_auth)):
return {"user": user.username}
"""
# Try header first
ctx = get_user_from_header(request)
if ctx is None:
ctx = get_user_from_cookie(request)
if ctx is None:
# Check Accept header to determine response type
accept = request.headers.get("accept", "")
if "text/html" in accept:
raise HTTPException(
status_code=302,
headers={"Location": "/login"}
)
raise HTTPException(
status_code=401,
detail="Authentication required"
)
return ctx
def require_owner(resource_owner_field: str = "username"):
"""
Dependency factory that requires the user to own the resource.
Args:
resource_owner_field: Field name on the resource that contains owner username
Returns:
Dependency function
Example:
@app.delete("/items/{item_id}")
async def delete_item(
item: Item = Depends(get_item),
user: UserContext = Depends(require_owner("created_by"))
):
...
"""
async def check_ownership(
request: Request,
user: UserContext = Depends(require_auth),
) -> UserContext:
# The actual ownership check must be done in the route
# after fetching the resource
return user
return check_ownership
def set_auth_cookie(response: Any, user: UserContext, max_age: int = 86400 * 30) -> None:
"""
Set authentication cookie on response.
Args:
response: FastAPI response object
user: User context to store
max_age: Cookie max age in seconds (default 30 days)
"""
cookie_data = {
"username": user.username,
"actor_id": user.actor_id,
}
if user.email:
cookie_data["email"] = user.email
data = json.dumps(cookie_data)
cookie_value = base64.b64encode(data.encode()).decode()
response.set_cookie(
key="artdag_session",
value=cookie_value,
max_age=max_age,
httponly=True,
samesite="lax",
secure=True, # Require HTTPS in production
)
def clear_auth_cookie(response: Any) -> None:
"""Clear authentication cookie."""
response.delete_cookie(key="artdag_session")