"""OAuth2 client blueprint for non-account apps. Each client app gets /auth/login, /auth/callback, /auth/logout. Account is the OAuth authorization server. Device cookie ({app}_did) ties the browser to its auth state so client apps can detect login/logout without cross-domain cookies. """ from __future__ import annotations import secrets from datetime import datetime, timezone from quart import ( Blueprint, redirect, request, session as qsession, g, current_app, make_response, ) from sqlalchemy import select from shared.db.session import get_session from shared.models.oauth_code import OAuthCode from shared.infrastructure.urls import account_url, app_url from shared.infrastructure.cart_identity import current_cart_identity from shared.events import emit_activity SESSION_USER_KEY = "uid" GRANT_TOKEN_KEY = "grant_token" def create_oauth_blueprint(app_name: str) -> Blueprint: """Return an OAuth client blueprint for *app_name*.""" bp = Blueprint("oauth_auth", __name__, url_prefix="/auth") @bp.get("/login") @bp.get("/login/") async def login(): next_url = request.args.get("next", "/") prompt = request.args.get("prompt", "") state = secrets.token_urlsafe(32) qsession["oauth_state"] = state qsession["oauth_next"] = next_url device_id = g.device_id redirect_uri = app_url(app_name, "/auth/callback") params = ( f"?client_id={app_name}" f"&redirect_uri={redirect_uri}" f"&device_id={device_id}" f"&state={state}" ) if prompt: params += f"&prompt={prompt}" authorize_url = account_url(f"/auth/oauth/authorize{params}") return redirect(authorize_url) @bp.get("/callback") @bp.get("/callback/") async def callback(): # Adopt account's device id as our own — one identity across all apps account_did = request.args.get("account_did", "") if account_did: qsession["_account_did"] = account_did # Overwrite this app's device cookie with account's device id g.device_id = account_did g._new_device_id = True # factory after_request will set the cookie # Handle prompt=none error (user not logged in on account) error = request.args.get("error") if error == "login_required": next_url = qsession.pop("oauth_next", "/") qsession.pop("oauth_state", None) import time as _time qsession["_pnone_at"] = _time.time() device_id = g.device_id if device_id: from shared.browser.app.redis_cacher import get_redis _redis = get_redis() if _redis: await _redis.set( f"prompt:{app_name}:{device_id}", b"none", ex=300 ) return redirect(next_url) code = request.args.get("code") state = request.args.get("state") expected_state = qsession.pop("oauth_state", None) next_url = qsession.pop("oauth_next", "/") if not code or not state or state != expected_state: current_app.logger.warning("OAuth callback: bad state or missing code") return redirect("/") expected_redirect = app_url(app_name, "/auth/callback") now = datetime.now(timezone.utc) async with get_session() as s: async with s.begin(): result = await s.execute( select(OAuthCode) .where(OAuthCode.code == code) .with_for_update() ) oauth_code = result.scalar_one_or_none() if not oauth_code: current_app.logger.warning("OAuth callback: code not found") return redirect("/") if oauth_code.used_at is not None: current_app.logger.warning("OAuth callback: code already used") return redirect("/") if oauth_code.expires_at < now: current_app.logger.warning("OAuth callback: code expired") return redirect("/") if oauth_code.client_id != app_name: current_app.logger.warning("OAuth callback: client_id mismatch") return redirect("/") if oauth_code.redirect_uri != expected_redirect: current_app.logger.warning("OAuth callback: redirect_uri mismatch") return redirect("/") oauth_code.used_at = now user_id = oauth_code.user_id grant_token = oauth_code.grant_token # Set local session with grant token for revocation checking qsession[SESSION_USER_KEY] = user_id if grant_token: qsession[GRANT_TOKEN_KEY] = grant_token qsession.pop("_pnone_at", None) # Emit login activity for cart adoption ident = current_cart_identity() anon_session_id = ident.get("session_id") if anon_session_id: try: async with get_session() as s: async with s.begin(): await emit_activity( s, activity_type="rose:Login", actor_uri="internal:system", object_type="Person", object_data={ "user_id": user_id, "session_id": anon_session_id, }, ) except Exception: current_app.logger.exception("OAuth: failed to emit login activity") return redirect(next_url, 303) @bp.get("/clear") @bp.get("/clear/") async def clear(): """One-time migration helper: clear all session cookies.""" qsession.clear() resp = await make_response(redirect("/")) resp.delete_cookie("blog_session", domain=".rose-ash.com", path="/") resp.delete_cookie(f"{app_name}_did", path="/") return resp @bp.post("/logout") @bp.post("/logout/") async def logout(): qsession.pop(SESSION_USER_KEY, None) qsession.pop(GRANT_TOKEN_KEY, None) qsession.pop("cart_sid", None) qsession.pop("_pnone_at", None) qsession.pop("_account_did", None) # Redirect through account to revoke grants + clear account session return redirect(account_url("/auth/sso-logout/")) return bp