"""Authentication routes for the account app. Account is the OAuth authorization server. Owns magic link login/logout, OAuth2 authorize endpoint, grant verification, and SSO logout. """ from __future__ import annotations import secrets from datetime import datetime, timezone, timedelta from quart import ( Blueprint, request, render_template, redirect, url_for, session as qsession, g, current_app, jsonify, ) from sqlalchemy import select, update from sqlalchemy.exc import SQLAlchemyError from shared.db.session import get_session from shared.models import User from shared.models.oauth_code import OAuthCode from shared.models.oauth_grant import OAuthGrant from shared.infrastructure.urls import account_url, app_url from shared.infrastructure.cart_identity import current_cart_identity from shared.events import emit_activity from .services import ( pop_login_redirect_target, store_login_redirect_target, send_magic_email, find_or_create_user, create_magic_link, validate_magic_link, validate_email, ) SESSION_USER_KEY = "uid" ACCOUNT_SESSION_KEY = "account_sid" ALLOWED_CLIENTS = {"blog", "market", "cart", "events", "federation"} def register(url_prefix="/auth"): auth_bp = Blueprint("auth", __name__, url_prefix=url_prefix) # --- OAuth2 authorize endpoint ------------------------------------------- @auth_bp.get("/oauth/authorize") @auth_bp.get("/oauth/authorize/") async def oauth_authorize(): client_id = request.args.get("client_id", "") redirect_uri = request.args.get("redirect_uri", "") state = request.args.get("state", "") if client_id not in ALLOWED_CLIENTS: return "Invalid client_id", 400 expected_redirect = app_url(client_id, "/auth/callback") if redirect_uri != expected_redirect: return "Invalid redirect_uri", 400 # Not logged in — bounce to magic link login, then back here if not g.get("user"): authorize_path = request.full_path store_login_redirect_target() return redirect(url_for("auth.login_form", next=authorize_path)) # Logged in — create grant + authorization code account_sid = qsession.get(ACCOUNT_SESSION_KEY) if not account_sid: account_sid = secrets.token_urlsafe(32) qsession[ACCOUNT_SESSION_KEY] = account_sid grant_token = secrets.token_urlsafe(48) code = secrets.token_urlsafe(48) now = datetime.now(timezone.utc) expires = now + timedelta(minutes=5) async with get_session() as s: async with s.begin(): grant = OAuthGrant( token=grant_token, user_id=g.user.id, client_id=client_id, issuer_session=account_sid, ) s.add(grant) oauth_code = OAuthCode( code=code, user_id=g.user.id, client_id=client_id, redirect_uri=redirect_uri, expires_at=expires, grant_token=grant_token, ) s.add(oauth_code) sep = "&" if "?" in redirect_uri else "?" return redirect(f"{redirect_uri}{sep}code={code}&state={state}") # --- Grant verification (internal endpoint) ------------------------------ @auth_bp.get("/internal/verify-grant") async def verify_grant(): """Called by client apps to check if a grant is still valid.""" token = request.args.get("token", "") if not token: return jsonify({"valid": False}), 200 async with get_session() as s: grant = await s.scalar( select(OAuthGrant).where(OAuthGrant.token == token) ) if not grant or grant.revoked_at is not None: return jsonify({"valid": False}), 200 return jsonify({"valid": True}), 200 # --- Magic link login flow ----------------------------------------------- @auth_bp.get("/login/") async def login_form(): store_login_redirect_target() cross_cart_sid = request.args.get("cart_sid") if cross_cart_sid: qsession["cart_sid"] = cross_cart_sid if g.get("user"): redirect_url = pop_login_redirect_target() return redirect(redirect_url) return await render_template("auth/login.html") @auth_bp.post("/start/") async def start_login(): form = await request.form email_input = form.get("email") or "" is_valid, email = validate_email(email_input) if not is_valid: return ( await render_template( "auth/login.html", error="Please enter a valid email address.", email=email_input, ), 400, ) user = await find_or_create_user(g.s, email) token, expires = await create_magic_link(g.s, user.id) from shared.utils import host_url magic_url = host_url(url_for("auth.magic", token=token)) email_error = None try: await send_magic_email(email, magic_url) except Exception as e: current_app.logger.error("EMAIL SEND FAILED: %r", e) email_error = ( "We couldn't send the email automatically. " "Please try again in a moment." ) return await render_template( "auth/check_email.html", email=email, email_error=email_error, ) @auth_bp.get("/magic//") async def magic(token: str): now = datetime.now(timezone.utc) user_id: int | None = None try: async with get_session() as s: async with s.begin(): user, error = await validate_magic_link(s, token) if error: return ( await render_template("auth/login.html", error=error), 400, ) user_id = user.id except Exception: return ( await render_template( "auth/login.html", error="Could not sign you in right now. Please try again.", ), 502, ) assert user_id is not None ident = current_cart_identity() anon_session_id = ident.get("session_id") try: async with get_session() as s: async with s.begin(): u2 = await s.get(User, user_id) if u2: u2.last_login_at = now if anon_session_id: await emit_activity( s, activity_type="rose:Login", actor_uri="internal:system", object_type="Person", object_data={ "user_id": user_id, "session_id": anon_session_id, }, ) except SQLAlchemyError: current_app.logger.exception( "[auth] non-fatal DB update for user_id=%s", user_id ) qsession[SESSION_USER_KEY] = user_id # Fresh account session ID for grant tracking qsession[ACCOUNT_SESSION_KEY] = secrets.token_urlsafe(32) # Propagate login to all client apps via OAuth chain redirect_url = pop_login_redirect_target() qsession["sso_final"] = redirect_url qsession["sso_chain"] = list(ALLOWED_CLIENTS) return redirect(url_for("auth.propagate"), 303) @auth_bp.get("/propagate") @auth_bp.get("/propagate/") async def propagate(): """Chain through each client app's OAuth login to propagate the account session. Each app does its OAuth flow (instant since account is already logged in) then redirects back here. When the chain is empty, redirect to the original target. Dead apps are skipped via internal health check.""" import os, aiohttp from urllib.parse import quote chain = qsession.get("sso_chain", []) final = qsession.get("sso_final", account_url("/")) if not g.get("user"): qsession.pop("sso_chain", None) qsession.pop("sso_final", None) return redirect(final) comeback = account_url("/auth/propagate") while chain: next_app = chain.pop(0) qsession["sso_chain"] = chain # Health check via internal URL before redirecting internal = (os.getenv(f"INTERNAL_URL_{next_app.upper()}") or f"http://{next_app}:8000").rstrip("/") try: async with aiohttp.ClientSession() as http: async with http.head( internal, timeout=aiohttp.ClientTimeout(total=2), allow_redirects=True, ) as resp: if resp.status < 500: login = app_url(next_app, f"/auth/login?next={quote(comeback, safe='')}") return redirect(login) except Exception: current_app.logger.warning("[propagate] skipping dead app: %s", next_app) continue qsession.pop("sso_chain", None) qsession.pop("sso_final", None) return redirect(final) @auth_bp.post("/logout/") async def logout(): # Revoke all grants issued by this account session account_sid = qsession.get(ACCOUNT_SESSION_KEY) if account_sid: try: async with get_session() as s: async with s.begin(): await s.execute( update(OAuthGrant) .where(OAuthGrant.issuer_session == account_sid) .where(OAuthGrant.revoked_at.is_(None)) .values(revoked_at=datetime.now(timezone.utc)) ) except SQLAlchemyError: current_app.logger.exception("[auth] failed to revoke grants") qsession.pop(SESSION_USER_KEY, None) qsession.pop(ACCOUNT_SESSION_KEY, None) from shared.infrastructure.urls import blog_url return redirect(blog_url("/")) @auth_bp.get("/sso-logout/") async def sso_logout(): """SSO logout called by client apps: revoke grants, clear session.""" account_sid = qsession.get(ACCOUNT_SESSION_KEY) if account_sid: try: async with get_session() as s: async with s.begin(): await s.execute( update(OAuthGrant) .where(OAuthGrant.issuer_session == account_sid) .where(OAuthGrant.revoked_at.is_(None)) .values(revoked_at=datetime.now(timezone.utc)) ) except SQLAlchemyError: current_app.logger.exception("[auth] failed to revoke grants") qsession.pop(SESSION_USER_KEY, None) qsession.pop(ACCOUNT_SESSION_KEY, None) from shared.infrastructure.urls import blog_url return redirect(blog_url("/")) @auth_bp.get("/clear/") async def clear(): """One-time migration helper: clear all session cookies.""" qsession.clear() resp = redirect(account_url("/")) resp.delete_cookie("blog_session", domain=".rose-ash.com", path="/") return resp return auth_bp