- config.py: OAuth settings replace l2_server/l2_domain - auth.py: full rewrite — login/callback/logout with itsdangerous signed state cookies and httpx token exchange - dependencies.py: remove l2_server assignment, fix redirect path - home.py: simplify /login to redirect to /auth/login - base.html: cross-app nav (Blog, Market, Account) + Rose Ash branding - requirements.txt: add itsdangerous Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
124 lines
3.8 KiB
Python
124 lines
3.8 KiB
Python
"""
|
|
Authentication routes — OAuth2 authorization code flow via account.rose-ash.com.
|
|
|
|
GET /auth/login — redirect to account OAuth authorize
|
|
GET /auth/callback — exchange code for user info, set session cookie
|
|
GET /auth/logout — clear cookie, redirect through account SSO logout
|
|
"""
|
|
|
|
import secrets
|
|
|
|
import httpx
|
|
from fastapi import APIRouter, Request
|
|
from fastapi.responses import RedirectResponse
|
|
from itsdangerous import URLSafeSerializer
|
|
|
|
from artdag_common.middleware.auth import UserContext, set_auth_cookie, clear_auth_cookie
|
|
|
|
from ..config import settings
|
|
|
|
# Router collecting the authentication endpoints defined in this module
# (/login, /callback, /logout).
router = APIRouter()
|
|
|
|
# Cached serializer for the signed OAuth state cookie; stays None until
# _get_signer() builds it on first use.
_signer = None
|
|
|
|
|
|
def _get_signer() -> URLSafeSerializer:
    """Return the shared serializer used to sign the OAuth state cookie.

    The serializer is created on the first call from ``settings.secret_key``
    with the ``"oauth-state"`` salt and cached in the module-level
    ``_signer`` for every later call.
    """
    global _signer

    # Fast path: already constructed by an earlier call.
    if _signer is not None:
        return _signer

    _signer = URLSafeSerializer(settings.secret_key, salt="oauth-state")
    return _signer
|
|
|
|
|
|
@router.get("/login")
async def login(request: Request):
    """Start the OAuth2 authorization-code flow.

    Generates a random ``state`` token, stores it together with the
    caller-supplied ``next`` target in a signed ``oauth_state`` cookie, and
    redirects the browser to the configured authorize endpoint.

    Args:
        request: Incoming request. The optional ``next`` query parameter is
            the location to return to after a successful login (defaults
            to ``"/"``).

    Returns:
        A 302 redirect to the authorize URL that also sets the short-lived
        ``oauth_state`` cookie.
    """
    # Function-scope import so this block does not depend on the file's
    # top-level import list.
    from urllib.parse import urlencode

    next_url = request.query_params.get("next", "/")
    state = secrets.token_urlsafe(32)

    state_payload = _get_signer().dumps({"state": state, "next": next_url})

    # Percent-encode every parameter: redirect_uri is itself a URL, so
    # embedding it raw would put unescaped ':', '/', '?' or '&' characters
    # into the query string and could corrupt or truncate the parameters.
    query = urlencode(
        {
            "client_id": settings.oauth_client_id,
            "redirect_uri": settings.oauth_redirect_uri,
            "state": state,
        }
    )
    authorize_url = f"{settings.oauth_authorize_url}?{query}"

    response = RedirectResponse(url=authorize_url, status_code=302)
    response.set_cookie(
        key="oauth_state",
        value=state_payload,
        max_age=600,  # 10 minutes
        httponly=True,
        samesite="lax",
        secure=True,
    )
    return response
|
|
|
|
|
|
def _is_safe_next_path(target) -> bool:
    """Return True when *target* is a same-site path that is safe to redirect to."""
    if not isinstance(target, str) or not target.startswith("/"):
        return False
    # Browsers treat "//host" and "/\host" as scheme-relative URLs, which
    # would send the user to another origin.
    if target.startswith(("//", "/\\")):
        return False
    # Control characters (tab, CR, LF, ...) are stripped by browsers and can
    # turn an innocent-looking path into "//host"; reject them outright.
    return not any(ord(ch) < 0x20 for ch in target)


def _callback_failed() -> RedirectResponse:
    """Build the response used for every failed callback: a 302 to the home page."""
    return RedirectResponse(url="/", status_code=302)


@router.get("/callback")
async def callback(request: Request):
    """Finish the OAuth2 authorization-code flow.

    Validates the ``state`` query parameter against the signed
    ``oauth_state`` cookie, exchanges ``code`` at the configured token
    endpoint, and on success sets the auth cookie and redirects to the
    ``next`` target stored in the state cookie.

    Args:
        request: Incoming request carrying the ``code`` and ``state`` query
            parameters and the ``oauth_state`` cookie.

    Returns:
        A 302 redirect. Any validation or exchange failure redirects to
        ``"/"`` without setting the auth cookie.
    """
    # Function-scope import so this block does not depend on the file's
    # top-level import list.
    from itsdangerous import BadData

    code = request.query_params.get("code", "")
    state = request.query_params.get("state", "")

    # Recover and validate state from the signed cookie.
    state_cookie = request.cookies.get("oauth_state", "")
    if not state_cookie or not code or not state:
        return _callback_failed()

    try:
        payload = _get_signer().loads(state_cookie)
    except BadData:
        # Tampered, truncated or otherwise undecodable cookie.
        return _callback_failed()

    expected_state = payload.get("state") if isinstance(payload, dict) else None
    # Constant-time comparison; bytes are used because compare_digest
    # rejects non-ASCII str arguments.
    if not isinstance(expected_state, str) or not secrets.compare_digest(
        expected_state.encode(), state.encode()
    ):
        return _callback_failed()

    # "next" originates from a user-controlled query parameter, so only
    # same-site paths are honoured; anything else falls back to the home
    # page to avoid an open redirect.
    next_url = payload.get("next", "/")
    if not _is_safe_next_path(next_url):
        next_url = "/"

    # Exchange code for user info via the token endpoint.
    async with httpx.AsyncClient(timeout=10) as client:
        try:
            resp = await client.post(
                settings.oauth_token_url,
                json={
                    "code": code,
                    "client_id": settings.oauth_client_id,
                    "redirect_uri": settings.oauth_redirect_uri,
                },
            )
        except httpx.HTTPError:
            return _callback_failed()

    if resp.status_code != 200:
        return _callback_failed()

    try:
        data = resp.json()
    except ValueError:
        # A 200 response whose body is not valid JSON.
        return _callback_failed()

    if not isinstance(data, dict) or "error" in data:
        return _callback_failed()

    # Map OAuth response to artdag UserContext.
    display_name = data.get("display_name", "")
    username = data.get("username", "")
    if not username and not display_name:
        # Nothing to identify the user by; do not create an anonymous session.
        return _callback_failed()
    actor_id = f"@{display_name}" if display_name else f"@{username}"

    user = UserContext(username=username, actor_id=actor_id)

    response = RedirectResponse(url=next_url, status_code=302)
    set_auth_cookie(response, user)
    # Clear the temporary state cookie.
    response.delete_cookie("oauth_state")
    return response
|
|
|
|
|
|
@router.get("/logout")
async def logout():
    """Log the user out.

    Builds a 302 redirect to ``settings.oauth_logout_url``, clears the auth
    cookie on it, and also removes any leftover ``oauth_state`` cookie.
    """
    redirect = RedirectResponse(
        status_code=302,
        url=settings.oauth_logout_url,
    )

    # Auth cookie first, then the temporary OAuth state cookie.
    clear_auth_cookie(redirect)
    redirect.delete_cookie("oauth_state")

    return redirect
|