"""OAuth2 client blueprint for non-account apps. Each client app gets /auth/login, /auth/callback, /auth/logout. Account is the OAuth authorization server. """ from __future__ import annotations import secrets from datetime import datetime, timezone from quart import ( Blueprint, redirect, request, session as qsession, g, current_app, ) from sqlalchemy import select from shared.db.session import get_session from shared.models import User from shared.models.oauth_code import OAuthCode from shared.infrastructure.urls import account_url, app_url from shared.infrastructure.cart_identity import current_cart_identity from shared.events import emit_activity SESSION_USER_KEY = "uid" def create_oauth_blueprint(app_name: str) -> Blueprint: """Return an OAuth client blueprint for *app_name*.""" bp = Blueprint("oauth_auth", __name__, url_prefix="/auth") @bp.get("/login") @bp.get("/login/") async def login(): next_url = request.args.get("next", "/") state = secrets.token_urlsafe(32) qsession["oauth_state"] = state qsession["oauth_next"] = next_url redirect_uri = app_url(app_name, "/auth/callback") authorize_url = account_url( f"/auth/oauth/authorize?client_id={app_name}" f"&redirect_uri={redirect_uri}" f"&state={state}" ) return redirect(authorize_url) @bp.get("/callback") @bp.get("/callback/") async def callback(): code = request.args.get("code") state = request.args.get("state") expected_state = qsession.pop("oauth_state", None) next_url = qsession.pop("oauth_next", "/") if not code or not state or state != expected_state: current_app.logger.warning("OAuth callback: bad state or missing code") return redirect("/") expected_redirect = app_url(app_name, "/auth/callback") now = datetime.now(timezone.utc) async with get_session() as s: async with s.begin(): result = await s.execute( select(OAuthCode) .where(OAuthCode.code == code) .with_for_update() ) oauth_code = result.scalar_one_or_none() if not oauth_code: current_app.logger.warning("OAuth callback: code not found") return redirect("/") if oauth_code.used_at is not None: current_app.logger.warning("OAuth callback: code already used") return redirect("/") if oauth_code.expires_at < now: current_app.logger.warning("OAuth callback: code expired") return redirect("/") if oauth_code.client_id != app_name: current_app.logger.warning("OAuth callback: client_id mismatch") return redirect("/") if oauth_code.redirect_uri != expected_redirect: current_app.logger.warning("OAuth callback: redirect_uri mismatch") return redirect("/") oauth_code.used_at = now user_id = oauth_code.user_id # Set local session qsession[SESSION_USER_KEY] = user_id # Emit login activity for cart adoption ident = current_cart_identity() anon_session_id = ident.get("session_id") if anon_session_id: try: async with get_session() as s: async with s.begin(): await emit_activity( s, activity_type="rose:Login", actor_uri="internal:system", object_type="Person", object_data={ "user_id": user_id, "session_id": anon_session_id, }, ) except Exception: current_app.logger.exception("OAuth: failed to emit login activity") return redirect(next_url, 303) @bp.get("/clear") @bp.get("/clear/") async def clear(): """One-time migration helper: clear all session cookies.""" qsession.clear() resp = redirect("/") resp.delete_cookie("blog_session", domain=".rose-ash.com", path="/") resp.delete_cookie("sso_hint", domain=".rose-ash.com", path="/") return resp @bp.post("/logout") @bp.post("/logout/") async def logout(): qsession.pop(SESSION_USER_KEY, None) qsession.pop("cart_sid", None) qsession.pop("sso_checked", None) # Redirect through account to clear the SSO session too return redirect(account_url("/auth/sso-logout/")) return bp