"""Authentication routes for the federation app.

Ported from blog/bp/auth/routes.py — owns magic link login/logout plus the
OOB account page system (newsletters, widget pages).
"""
from __future__ import annotations

from datetime import datetime, timezone

from quart import (
    Blueprint,
    request,
    render_template,
    make_response,
    redirect,
    url_for,
    session as qsession,
    g,
    current_app,
)
from sqlalchemy import select
from sqlalchemy.exc import SQLAlchemyError

from shared.db.session import get_session
from shared.models import User, UserNewsletter
from shared.models.ghost_membership_entities import GhostNewsletter
from shared.services.widget_registry import widgets
from shared.config import config
from shared.utils import host_url
from shared.infrastructure.urls import federation_url
from shared.infrastructure.cart_identity import current_cart_identity
from shared.events import emit_activity
from shared.services.registry import services
from .services import (
    pop_login_redirect_target,
    store_login_redirect_target,
    send_magic_email,
    find_or_create_user,
    create_magic_link,
    validate_magic_link,
    validate_email,
)

# Session key under which the logged-in user's id is stored.
SESSION_USER_KEY = "uid"

# Default template wiring for the account pages; individual routes override
# "main" to swap the panel that is rendered.
oob = {
    "oob_extends": "oob_elements.html",
    "extends": "_types/root/_index.html",
    "parent_id": "root-header-child",
    "child_id": "auth-header-child",
    "header": "_types/auth/header/_header.html",
    "parent_header": "_types/root/header/_header.html",
    "nav": "_types/auth/_nav.html",
    "main": "_types/auth/_main_panel.html",
}


async def _render_account_page(**context):
    """Render an account page as a full page or as htmx OOB elements.

    Picks ``_types/auth/_oob_elements.html`` for htmx requests and
    ``_types/auth/index.html`` otherwise, passes ``context`` through to the
    template unchanged, and wraps the result in a response object.
    """
    # Imported at call time, as the route handlers originally did.
    from shared.browser.app.utils.htmx import is_htmx_request

    template = (
        "_types/auth/_oob_elements.html"
        if is_htmx_request()
        else "_types/auth/index.html"
    )
    html = await render_template(template, **context)
    return await make_response(html)


def register(url_prefix: str = "/auth") -> Blueprint:
    """Build the ``auth`` blueprint and register all of its routes.

    Args:
        url_prefix: URL prefix the blueprint is mounted under.

    Returns:
        The configured ``Blueprint`` (login form, magic-link flow, logout,
        account, newsletters and widget pages).
    """
    auth_bp = Blueprint("auth", __name__, url_prefix=url_prefix)

    @auth_bp.context_processor
    def context():
        """Expose the default ``oob`` wiring and account nav links to templates."""
        return {"oob": oob, "account_nav_links": widgets.account_nav}

    @auth_bp.get("/login/")
    async def login_form():
        """Show the login form, or redirect home if already signed in."""
        store_login_redirect_target()

        # A cart session id handed over via the query string is kept in the
        # session under "cart_sid".
        cross_cart_sid = request.args.get("cart_sid")
        if cross_cart_sid:
            qsession["cart_sid"] = cross_cart_sid

        if g.get("user"):
            return redirect(federation_url("/"))
        return await render_template("auth/login.html")

    @auth_bp.get("/account/")
    async def account():
        """Render the account landing page (login required)."""
        if not g.get("user"):
            return redirect(host_url(url_for("auth.login_form")))
        return await _render_account_page()

    @auth_bp.get("/newsletters/")
    async def newsletters():
        """List every newsletter with the current user's subscription state."""
        if not g.get("user"):
            return redirect(host_url(url_for("auth.login_form")))

        result = await g.s.execute(
            select(GhostNewsletter).order_by(GhostNewsletter.name)
        )
        all_newsletters = result.scalars().all()

        sub_result = await g.s.execute(
            select(UserNewsletter).where(
                UserNewsletter.user_id == g.user.id,
            )
        )
        # Index the user's rows by newsletter id for O(1) lookup below.
        user_subs = {un.newsletter_id: un for un in sub_result.scalars().all()}

        newsletter_list = []
        for nl in all_newsletters:
            un = user_subs.get(nl.id)
            newsletter_list.append({
                "newsletter": nl,
                "un": un,
                # No row for this newsletter means "not subscribed".
                "subscribed": un.subscribed if un else False,
            })

        nl_oob = {**oob, "main": "_types/auth/_newsletters_panel.html"}
        return await _render_account_page(
            oob=nl_oob,
            newsletter_list=newsletter_list,
        )

    # The <int:newsletter_id> segment supplies the handler's argument.
    @auth_bp.post("/newsletter/<int:newsletter_id>/toggle/")
    async def toggle_newsletter(newsletter_id: int):
        """Flip the current user's subscription to one newsletter.

        Returns an empty 401 when nobody is signed in; otherwise the
        re-rendered toggle fragment. A missing ``UserNewsletter`` row is
        created in the subscribed state.
        """
        if not g.get("user"):
            return "", 401

        result = await g.s.execute(
            select(UserNewsletter).where(
                UserNewsletter.user_id == g.user.id,
                UserNewsletter.newsletter_id == newsletter_id,
            )
        )
        un = result.scalar_one_or_none()

        if un:
            un.subscribed = not un.subscribed
        else:
            un = UserNewsletter(
                user_id=g.user.id,
                newsletter_id=newsletter_id,
                subscribed=True,
            )
            g.s.add(un)

        await g.s.flush()

        return await render_template(
            "_types/auth/_newsletter_toggle.html",
            un=un,
        )

    @auth_bp.post("/start/")
    async def start_login():
        """Validate the submitted email and send a magic sign-in link.

        Responds 400 with the login form on an invalid address. A failed
        email send is logged and reported on the "check your email" page
        rather than raised.
        """
        form = await request.form
        email_input = form.get("email") or ""

        is_valid, email = validate_email(email_input)
        if not is_valid:
            return (
                await render_template(
                    "auth/login.html",
                    error="Please enter a valid email address.",
                    email=email_input,
                ),
                400,
            )

        user = await find_or_create_user(g.s, email)
        # The second element of the returned pair is not needed here.
        token, _expires = await create_magic_link(g.s, user.id)

        magic_url = host_url(url_for("auth.magic", token=token))

        email_error = None
        try:
            await send_magic_email(email, magic_url)
        except Exception as e:
            # Best effort: the page is still rendered, with a notice.
            current_app.logger.error("EMAIL SEND FAILED: %r", e)
            email_error = (
                "We couldn't send the email automatically. "
                "Please try again in a moment."
            )

        return await render_template(
            "auth/check_email.html",
            email=email,
            email_error=email_error,
        )

    # The <token> segment is what url_for("auth.magic", token=...) fills in.
    @auth_bp.get("/magic/<token>/")
    async def magic(token: str):
        """Consume a magic-link token and sign the user in.

        Responds 400 with the login form if ``validate_magic_link`` reports
        an error, 502 if validation raises, and otherwise stores the user id
        in the session and redirects (303) to the saved login target.
        """
        now = datetime.now(timezone.utc)
        user_id: int | None = None

        try:
            async with get_session() as s:
                async with s.begin():
                    user, error = await validate_magic_link(s, token)
                    if error:
                        return (
                            await render_template("auth/login.html", error=error),
                            400,
                        )
                    user_id = user.id
        except Exception:
            # Log the cause; the user only sees a generic message.
            current_app.logger.exception("[auth] magic link validation failed")
            return (
                await render_template(
                    "auth/login.html",
                    error="Could not sign you in right now. Please try again.",
                ),
                502,
            )

        # Every path that reaches this point has assigned user_id above.
        assert user_id is not None

        ident = current_cart_identity()
        anon_session_id = ident.get("session_id")

        # Bookkeeping only: a database failure here must not block the login.
        try:
            async with get_session() as s:
                async with s.begin():
                    u2 = await s.get(User, user_id)
                    if u2:
                        u2.last_login_at = now
                    if anon_session_id:
                        await emit_activity(
                            s,
                            activity_type="rose:Login",
                            actor_uri="internal:system",
                            object_type="Person",
                            object_data={
                                "user_id": user_id,
                                "session_id": anon_session_id,
                            },
                        )
        except SQLAlchemyError:
            current_app.logger.exception(
                "[auth] non-fatal DB update for user_id=%s", user_id
            )

        qsession[SESSION_USER_KEY] = user_id

        redirect_url = pop_login_redirect_target()
        return redirect(redirect_url, 303)

    @auth_bp.post("/logout/")
    async def logout():
        """Drop the user id from the session and redirect to the home page."""
        qsession.pop(SESSION_USER_KEY, None)
        return redirect(federation_url("/"))

    # Catch-all for widget pages — must be last to avoid shadowing specific routes
    @auth_bp.get("/<slug>/")
    async def widget_page(slug):
        """Render the account widget page registered under ``slug``.

        Aborts with 404 for an unknown slug and redirects to the login form
        when nobody is signed in.
        """
        from quart import abort

        widget = widgets.account_page_by_slug(slug)
        if not widget:
            abort(404)

        if not g.get("user"):
            return redirect(host_url(url_for("auth.login_form")))

        ctx = await widget.context_fn(g.s, user_id=g.user.id)
        w_oob = {**oob, "main": widget.template}
        return await _render_account_page(oob=w_oob, **ctx)

    return auth_bp