This repository has been archived on 2026-02-24. You can view files and clone it. You cannot open issues or pull requests or push a commit.
Files
account/bp/auth/routes.py
giles 2178607484
All checks were successful
Build and Deploy / build-and-deploy (push) Successful in 42s
Move auth server from federation to account
Account is now the OAuth authorization server with magic link login,
OAuth2 authorize endpoint, SSO logout, and session management.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-23 12:00:20 +00:00

233 lines
7.5 KiB
Python

"""Authentication routes for the account app.
Account is the OAuth authorization server. Owns magic link login/logout,
OAuth2 authorize endpoint, and SSO logout.
"""
from __future__ import annotations
import secrets
from datetime import datetime, timezone, timedelta
from quart import (
Blueprint,
request,
render_template,
redirect,
url_for,
session as qsession,
g,
current_app,
)
from sqlalchemy import select
from sqlalchemy.exc import SQLAlchemyError
from shared.db.session import get_session
from shared.models import User
from shared.models.oauth_code import OAuthCode
from shared.infrastructure.urls import account_url, app_url
from shared.infrastructure.cart_identity import current_cart_identity
from shared.events import emit_activity
from .services import (
pop_login_redirect_target,
store_login_redirect_target,
send_magic_email,
find_or_create_user,
create_magic_link,
validate_magic_link,
validate_email,
)
# Key under which the logged-in user's id is stored in the Quart session.
SESSION_USER_KEY = "uid"

# client_id values the OAuth2 authorize endpoint will accept.
ALLOWED_CLIENTS = {
    "blog",
    "cart",
    "events",
    "federation",
    "market",
}
def register(url_prefix="/auth"):
    """Build and return the ``auth`` blueprint for the account app.

    The blueprint carries the OAuth2 authorize endpoint, the magic-link
    login flow (form, start, magic), and the logout / session-clearing
    routes.

    Args:
        url_prefix: URL prefix under which every route is mounted.

    Returns:
        The configured ``Blueprint`` instance.
    """
    auth_bp = Blueprint("auth", __name__, url_prefix=url_prefix)

    # Domain on which the ``sso_hint`` / ``blog_session`` cookies are set and
    # cleared. Kept in one place so set and delete can never disagree.
    sso_cookie_domain = ".rose-ash.com"

    def _drop_sso_hint(resp):
        """Expire the ``sso_hint`` cookie on *resp* and return *resp*."""
        resp.delete_cookie("sso_hint", domain=sso_cookie_domain, path="/")
        return resp

    # --- OAuth2 authorize endpoint -------------------------------------------

    @auth_bp.get("/oauth/authorize")
    @auth_bp.get("/oauth/authorize/")
    async def oauth_authorize():
        """Issue an authorization code to an allowed client.

        Reads ``client_id``, ``redirect_uri`` and ``state`` from the query
        string. Responds 400 when the client is not in ``ALLOWED_CLIENTS`` or
        the redirect URI is not that client's ``/auth/callback`` URL. When no
        user is logged in, redirects to the login form with this request's
        full path as ``next``. Otherwise stores a fresh ``OAuthCode`` that
        expires in five minutes and redirects to ``redirect_uri`` with
        ``code`` and ``state`` appended.
        """
        from urllib.parse import urlencode

        client_id = request.args.get("client_id", "")
        redirect_uri = request.args.get("redirect_uri", "")
        state = request.args.get("state", "")

        if client_id not in ALLOWED_CLIENTS:
            return "Invalid client_id", 400

        expected_redirect = app_url(client_id, "/auth/callback")
        if redirect_uri != expected_redirect:
            return "Invalid redirect_uri", 400

        # Not logged in — bounce to magic link login, then back here
        if not g.get("user"):
            # Preserve the full authorize URL so we return here after login
            authorize_path = request.full_path  # includes query string
            store_login_redirect_target()
            return redirect(url_for("auth.login_form", next=authorize_path))

        # Logged in — issue authorization code
        code = secrets.token_urlsafe(48)
        now = datetime.now(timezone.utc)
        expires = now + timedelta(minutes=5)

        async with get_session() as s:
            async with s.begin():
                oauth_code = OAuthCode(
                    code=code,
                    user_id=g.user.id,
                    client_id=client_id,
                    redirect_uri=redirect_uri,
                    expires_at=expires,
                )
                s.add(oauth_code)

        # ``state`` is caller-supplied and already URL-decoded by the
        # framework; re-encode it so characters such as "&", "#" or "=" cannot
        # corrupt the callback query string or inject extra parameters.
        query = urlencode({"code": code, "state": state})
        sep = "&" if "?" in redirect_uri else "?"
        return redirect(f"{redirect_uri}{sep}{query}")

    # --- Magic link login flow -----------------------------------------------

    @auth_bp.get("/login/")
    async def login_form():
        """Render the login form, or follow the pending redirect if logged in.

        A ``cart_sid`` query parameter, when present, is copied into the
        session under ``"cart_sid"``.
        """
        store_login_redirect_target()

        cross_cart_sid = request.args.get("cart_sid")
        if cross_cart_sid:
            qsession["cart_sid"] = cross_cart_sid

        if g.get("user"):
            # If there's a pending redirect (e.g. OAuth authorize), follow it
            redirect_url = pop_login_redirect_target()
            return redirect(redirect_url)

        return await render_template("auth/login.html")

    @auth_bp.post("/start/")
    async def start_login():
        """Handle the login form: create a magic link and email it.

        Responds 400 with the form re-rendered when the submitted email fails
        ``validate_email``. Otherwise finds or creates the user, creates a
        magic link, and tries to send it. A send failure is logged and shown
        on the "check your email" page rather than raised.
        """
        form = await request.form
        email_input = form.get("email") or ""

        is_valid, email = validate_email(email_input)
        if not is_valid:
            return (
                await render_template(
                    "auth/login.html",
                    error="Please enter a valid email address.",
                    email=email_input,
                ),
                400,
            )

        user = await find_or_create_user(g.s, email)
        token, expires = await create_magic_link(g.s, user.id)

        from shared.utils import host_url

        magic_url = host_url(url_for("auth.magic", token=token))

        email_error = None
        try:
            await send_magic_email(email, magic_url)
        except Exception as e:
            # Deliberate best-effort: the user still sees the check-email page.
            # ``exception`` (not ``error``) so the traceback reaches the log.
            current_app.logger.exception("EMAIL SEND FAILED: %r", e)
            email_error = (
                "We couldn't send the email automatically. "
                "Please try again in a moment."
            )

        return await render_template(
            "auth/check_email.html",
            email=email,
            email_error=email_error,
        )

    @auth_bp.get("/magic/<token>/")
    async def magic(token: str):
        """Consume a magic-link token and log the user in.

        Validation runs in its own transaction. A validation error renders the
        login page with 400; any exception during validation is logged and
        renders the login page with 502. On success a second transaction
        records ``last_login_at`` and, when the cart identity carries a
        ``session_id``, emits a ``rose:Login`` activity. A ``SQLAlchemyError``
        there is logged and does not block the login. Finally the user id is
        stored in the session, the ``sso_hint`` cookie is set for 30 days, and
        the client is redirected (303) to the pending login target.
        """
        now = datetime.now(timezone.utc)
        user_id: int | None = None

        try:
            async with get_session() as s:
                async with s.begin():
                    user, error = await validate_magic_link(s, token)
                    if error:
                        return (
                            await render_template("auth/login.html", error=error),
                            400,
                        )
                    user_id = user.id
        except Exception:
            # Previously swallowed silently; keep the same user-facing 502 but
            # leave a traceback so the failure can be diagnosed.
            current_app.logger.exception("[auth] magic link validation failed")
            return (
                await render_template(
                    "auth/login.html",
                    error="Could not sign you in right now. Please try again.",
                ),
                502,
            )

        # Every path that leaves ``user_id`` unset has already returned above.
        assert user_id is not None

        ident = current_cart_identity()
        anon_session_id = ident.get("session_id")

        try:
            async with get_session() as s:
                async with s.begin():
                    u2 = await s.get(User, user_id)
                    if u2:
                        u2.last_login_at = now
                    if anon_session_id:
                        await emit_activity(
                            s,
                            activity_type="rose:Login",
                            actor_uri="internal:system",
                            object_type="Person",
                            object_data={
                                "user_id": user_id,
                                "session_id": anon_session_id,
                            },
                        )
        except SQLAlchemyError:
            current_app.logger.exception(
                "[auth] non-fatal DB update for user_id=%s", user_id
            )

        qsession[SESSION_USER_KEY] = user_id

        redirect_url = pop_login_redirect_target()
        resp = redirect(redirect_url, 303)
        resp.set_cookie(
            "sso_hint", "1",
            domain=sso_cookie_domain, max_age=30 * 24 * 3600,
            secure=True, samesite="Lax", httponly=True,
        )
        return resp

    @auth_bp.post("/logout/")
    async def logout():
        """Drop the session user and ``sso_hint``, then go to the account root."""
        qsession.pop(SESSION_USER_KEY, None)
        return _drop_sso_hint(redirect(account_url("/")))

    @auth_bp.get("/clear/")
    async def clear():
        """One-time migration helper: clear all session cookies."""
        qsession.clear()
        resp = redirect(account_url("/"))
        resp.delete_cookie("blog_session", domain=sso_cookie_domain, path="/")
        return _drop_sso_hint(resp)

    @auth_bp.get("/sso-logout/")
    async def sso_logout():
        """SSO logout: clear account session + sso_hint, redirect to blog."""
        qsession.pop(SESSION_USER_KEY, None)

        from shared.infrastructure.urls import blog_url

        return _drop_sso_hint(redirect(blog_url("/")))

    return auth_bp