"""Authentication routes for the account app. Account is the OAuth authorization server. Owns magic link login/logout, OAuth2 authorize endpoint, and SSO logout. """ from __future__ import annotations import secrets from datetime import datetime, timezone, timedelta from quart import ( Blueprint, request, render_template, redirect, url_for, session as qsession, g, current_app, ) from sqlalchemy import select from sqlalchemy.exc import SQLAlchemyError from shared.db.session import get_session from shared.models import User from shared.models.oauth_code import OAuthCode from shared.infrastructure.urls import account_url, app_url from shared.infrastructure.cart_identity import current_cart_identity from shared.events import emit_activity from .services import ( pop_login_redirect_target, store_login_redirect_target, send_magic_email, find_or_create_user, create_magic_link, validate_magic_link, validate_email, ) SESSION_USER_KEY = "uid" ALLOWED_CLIENTS = {"blog", "market", "cart", "events", "federation"} def register(url_prefix="/auth"): auth_bp = Blueprint("auth", __name__, url_prefix=url_prefix) # --- OAuth2 authorize endpoint ------------------------------------------- @auth_bp.get("/oauth/authorize") @auth_bp.get("/oauth/authorize/") async def oauth_authorize(): client_id = request.args.get("client_id", "") redirect_uri = request.args.get("redirect_uri", "") state = request.args.get("state", "") if client_id not in ALLOWED_CLIENTS: return "Invalid client_id", 400 expected_redirect = app_url(client_id, "/auth/callback") if redirect_uri != expected_redirect: return "Invalid redirect_uri", 400 # Not logged in — bounce to magic link login, then back here if not g.get("user"): # Preserve the full authorize URL so we return here after login authorize_path = request.full_path # includes query string store_login_redirect_target() return redirect(url_for("auth.login_form", next=authorize_path)) # Logged in — issue authorization code code = secrets.token_urlsafe(48) now = datetime.now(timezone.utc) expires = now + timedelta(minutes=5) async with get_session() as s: async with s.begin(): oauth_code = OAuthCode( code=code, user_id=g.user.id, client_id=client_id, redirect_uri=redirect_uri, expires_at=expires, ) s.add(oauth_code) sep = "&" if "?" in redirect_uri else "?" return redirect(f"{redirect_uri}{sep}code={code}&state={state}") # --- Magic link login flow ----------------------------------------------- @auth_bp.get("/login/") async def login_form(): store_login_redirect_target() cross_cart_sid = request.args.get("cart_sid") if cross_cart_sid: qsession["cart_sid"] = cross_cart_sid if g.get("user"): # If there's a pending redirect (e.g. OAuth authorize), follow it redirect_url = pop_login_redirect_target() return redirect(redirect_url) return await render_template("auth/login.html") @auth_bp.post("/start/") async def start_login(): form = await request.form email_input = form.get("email") or "" is_valid, email = validate_email(email_input) if not is_valid: return ( await render_template( "auth/login.html", error="Please enter a valid email address.", email=email_input, ), 400, ) user = await find_or_create_user(g.s, email) token, expires = await create_magic_link(g.s, user.id) from shared.utils import host_url magic_url = host_url(url_for("auth.magic", token=token)) email_error = None try: await send_magic_email(email, magic_url) except Exception as e: current_app.logger.error("EMAIL SEND FAILED: %r", e) email_error = ( "We couldn't send the email automatically. " "Please try again in a moment." ) return await render_template( "auth/check_email.html", email=email, email_error=email_error, ) @auth_bp.get("/magic//") async def magic(token: str): now = datetime.now(timezone.utc) user_id: int | None = None try: async with get_session() as s: async with s.begin(): user, error = await validate_magic_link(s, token) if error: return ( await render_template("auth/login.html", error=error), 400, ) user_id = user.id except Exception: return ( await render_template( "auth/login.html", error="Could not sign you in right now. Please try again.", ), 502, ) assert user_id is not None ident = current_cart_identity() anon_session_id = ident.get("session_id") try: async with get_session() as s: async with s.begin(): u2 = await s.get(User, user_id) if u2: u2.last_login_at = now if anon_session_id: await emit_activity( s, activity_type="rose:Login", actor_uri="internal:system", object_type="Person", object_data={ "user_id": user_id, "session_id": anon_session_id, }, ) except SQLAlchemyError: current_app.logger.exception( "[auth] non-fatal DB update for user_id=%s", user_id ) qsession[SESSION_USER_KEY] = user_id redirect_url = pop_login_redirect_target() return redirect(redirect_url, 303) @auth_bp.post("/logout/") async def logout(): qsession.pop(SESSION_USER_KEY, None) return redirect(url_for("auth.sso_logout")) @auth_bp.get("/clear/") async def clear(): """One-time migration helper: clear all session cookies.""" qsession.clear() resp = redirect(account_url("/")) resp.delete_cookie("blog_session", domain=".rose-ash.com", path="/") return resp @auth_bp.get("/sso-logout/") async def sso_logout(): """SSO logout: clear account session, then render a page with hidden iframes that clear each client app's first-party session cookie. Tolerates dead apps — iframes that fail are silently ignored.""" qsession.pop(SESSION_USER_KEY, None) from shared.infrastructure.urls import blog_url, market_url, cart_url, events_url, federation_url clear_urls = [ blog_url("/auth/sso-clear"), market_url("/auth/sso-clear"), cart_url("/auth/sso-clear"), events_url("/auth/sso-clear"), federation_url("/auth/sso-clear"), ] return await render_template( "auth/signing_out.html", clear_urls=clear_urls, final_url=blog_url("/"), ) return auth_bp