""" Authentication routes — OAuth2 authorization code flow via account.rose-ash.com. GET /auth/login — redirect to account OAuth authorize GET /auth/callback — exchange code for user info, set session cookie GET /auth/logout — clear cookie, redirect through account SSO logout """ import secrets import time import httpx from fastapi import APIRouter, Request from fastapi.responses import RedirectResponse from itsdangerous import URLSafeSerializer from artdag_common.middleware.auth import UserContext, set_auth_cookie, clear_auth_cookie from ..config import settings router = APIRouter() _signer = None def _get_signer() -> URLSafeSerializer: global _signer if _signer is None: _signer = URLSafeSerializer(settings.secret_key, salt="oauth-state") return _signer @router.get("/login") async def login(request: Request): """Store state + next in signed cookie, redirect to account OAuth authorize.""" next_url = request.query_params.get("next", "/") prompt = request.query_params.get("prompt", "") state = secrets.token_urlsafe(32) signer = _get_signer() state_payload = signer.dumps({"state": state, "next": next_url, "prompt": prompt}) authorize_url = ( f"{settings.oauth_authorize_url}" f"?client_id={settings.oauth_client_id}" f"&redirect_uri={settings.oauth_redirect_uri}" f"&state={state}" ) if prompt: authorize_url += f"&prompt={prompt}" response = RedirectResponse(url=authorize_url, status_code=302) response.set_cookie( key="oauth_state", value=state_payload, max_age=600, # 10 minutes httponly=True, samesite="lax", secure=True, ) return response @router.get("/callback") async def callback(request: Request): """Validate state, exchange code via token endpoint, set session cookie.""" code = request.query_params.get("code", "") state = request.query_params.get("state", "") error = request.query_params.get("error", "") # Recover state from signed cookie state_cookie = request.cookies.get("oauth_state", "") signer = _get_signer() try: payload = signer.loads(state_cookie) if state_cookie else {} except Exception: payload = {} next_url = payload.get("next", "/") was_silent = payload.get("prompt") == "none" # Handle prompt=none rejection (user not logged in on account) if error == "login_required": response = RedirectResponse(url=next_url, status_code=302) response.delete_cookie("oauth_state") # Set cooldown cookie — don't re-check for 5 minutes response.set_cookie( key="pnone_at", value=str(time.time()), max_age=300, httponly=True, samesite="lax", secure=True, ) return response # Normal callback — validate state + code if not state_cookie or not code or not state: return RedirectResponse(url="/", status_code=302) if payload.get("state") != state: return RedirectResponse(url="/", status_code=302) # Exchange code for user info via account's token endpoint async with httpx.AsyncClient(timeout=10) as client: try: resp = await client.post( settings.oauth_token_url, json={ "code": code, "client_id": settings.oauth_client_id, "redirect_uri": settings.oauth_redirect_uri, }, ) except httpx.HTTPError: return RedirectResponse(url="/", status_code=302) if resp.status_code != 200: return RedirectResponse(url="/", status_code=302) data = resp.json() if "error" in data: return RedirectResponse(url="/", status_code=302) # Map OAuth response to artdag UserContext display_name = data.get("display_name", "") username = data.get("username", "") actor_id = f"@{display_name}" if display_name else f"@{username}" user = UserContext(username=username, actor_id=actor_id) response = RedirectResponse(url=next_url, status_code=302) set_auth_cookie(response, user) response.delete_cookie("oauth_state") response.delete_cookie("pnone_at") return response @router.get("/logout") async def logout(): """Clear session cookie, redirect through account SSO logout.""" response = RedirectResponse(url=settings.oauth_logout_url, status_code=302) clear_auth_cookie(response) response.delete_cookie("oauth_state") response.delete_cookie("pnone_at") return response