Files
rose-ash/federation/bp/auth/routes.py
giles 959e63d440
All checks were successful
Build and Deploy / build-and-deploy (push) Successful in 1m44s
Remove render_to_sx from public API: enforce sx_call for all service code
Replace ~250 render_to_sx calls across all services with sync sx_call,
converting many async functions to sync where no other awaits remained.
Make render_to_sx/render_to_sx_with_env private (_render_to_sx).
Add (post-header-ctx) IO primitive and shared post/post-admin defmacros.
Convert built-in post/post-admin layouts from Python to register_sx_layout
with .sx defcomps. Remove dead post_admin_mobile_nav_sx.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-04 19:30:45 +00:00

234 lines
8.1 KiB
Python

"""Authentication routes for the federation app.
Owns magic link login/logout + OAuth2 authorization server endpoint.
Account pages (newsletters, widget pages) have moved to the account app.
"""
from __future__ import annotations
import secrets
from datetime import datetime, timezone, timedelta
from quart import (
Blueprint,
request,
redirect,
url_for,
session as qsession,
g,
current_app,
)
from sqlalchemy import select
from sqlalchemy.exc import SQLAlchemyError
from shared.db.session import get_session
from shared.models import User
from shared.models.oauth_code import OAuthCode
from shared.infrastructure.urls import federation_url, app_url
from shared.infrastructure.cart_identity import current_cart_identity
from shared.events import emit_activity
from .services import (
pop_login_redirect_target,
store_login_redirect_target,
send_magic_email,
find_or_create_user,
create_magic_link,
validate_magic_link,
validate_email,
)
SESSION_USER_KEY = "uid"
ALLOWED_CLIENTS = {"blog", "market", "cart", "events", "account"}
async def _render_social_auth_page(component: str, title: str, **kwargs) -> str:
"""Render an auth page with social layout — replaces sx_components helpers."""
from shared.sx.helpers import sx_call
from shared.sx.page import get_template_context
from sxc.pages.utils import _social_page
ctx = await get_template_context()
content = sx_call(component, **{k: v for k, v in kwargs.items() if v})
return await _social_page(ctx, None, content=content, title=title)
def register(url_prefix="/auth"):
auth_bp = Blueprint("auth", __name__, url_prefix=url_prefix)
# --- OAuth2 authorize endpoint -------------------------------------------
@auth_bp.get("/oauth/authorize")
@auth_bp.get("/oauth/authorize/")
async def oauth_authorize():
client_id = request.args.get("client_id", "")
redirect_uri = request.args.get("redirect_uri", "")
state = request.args.get("state", "")
if client_id not in ALLOWED_CLIENTS:
return "Invalid client_id", 400
expected_redirect = app_url(client_id, "/auth/callback")
if redirect_uri != expected_redirect:
return "Invalid redirect_uri", 400
# Not logged in — bounce to magic link login, then back here
if not g.get("user"):
# Preserve the full authorize URL so we return here after login
authorize_path = request.full_path # includes query string
store_login_redirect_target()
return redirect(url_for("auth.login_form", next=authorize_path))
# Logged in — issue authorization code
code = secrets.token_urlsafe(48)
now = datetime.now(timezone.utc)
expires = now + timedelta(minutes=5)
async with get_session() as s:
async with s.begin():
oauth_code = OAuthCode(
code=code,
user_id=g.user.id,
client_id=client_id,
redirect_uri=redirect_uri,
expires_at=expires,
)
s.add(oauth_code)
sep = "&" if "?" in redirect_uri else "?"
return redirect(f"{redirect_uri}{sep}code={code}&state={state}")
# --- Magic link login flow -----------------------------------------------
@auth_bp.get("/login/")
async def login_form():
store_login_redirect_target()
cross_cart_sid = request.args.get("cart_sid")
if cross_cart_sid:
qsession["cart_sid"] = cross_cart_sid
if g.get("user"):
# If there's a pending redirect (e.g. OAuth authorize), follow it
redirect_url = pop_login_redirect_target()
return redirect(redirect_url)
return await _render_social_auth_page("account-login-content", "Login \u2014 Rose Ash")
@auth_bp.post("/start/")
async def start_login():
form = await request.form
email_input = form.get("email") or ""
is_valid, email = validate_email(email_input)
if not is_valid:
return await _render_social_auth_page(
"account-login-content", "Login \u2014 Rose Ash",
error="Please enter a valid email address.", email=email_input,
), 400
user = await find_or_create_user(g.s, email)
token, expires = await create_magic_link(g.s, user.id)
from shared.utils import host_url
magic_url = host_url(url_for("auth.magic", token=token))
email_error = None
try:
await send_magic_email(email, magic_url)
except Exception as e:
current_app.logger.error("EMAIL SEND FAILED: %r", e)
email_error = (
"We couldn't send the email automatically. "
"Please try again in a moment."
)
return await _render_social_auth_page(
"account-check-email-content", "Check your email \u2014 Rose Ash",
email=email, email_error=email_error,
)
@auth_bp.get("/magic/<token>/")
async def magic(token: str):
now = datetime.now(timezone.utc)
user_id: int | None = None
try:
async with get_session() as s:
async with s.begin():
user, error = await validate_magic_link(s, token)
if error:
return await _render_social_auth_page(
"account-login-content", "Login \u2014 Rose Ash",
error=error,
), 400
user_id = user.id
except Exception:
return await _render_social_auth_page(
"account-login-content", "Login \u2014 Rose Ash",
error="Could not sign you in right now. Please try again.",
), 502
assert user_id is not None
ident = current_cart_identity()
anon_session_id = ident.get("session_id")
try:
async with get_session() as s:
async with s.begin():
u2 = await s.get(User, user_id)
if u2:
u2.last_login_at = now
if anon_session_id:
await emit_activity(
s,
activity_type="rose:Login",
actor_uri="internal:system",
object_type="Person",
object_data={
"user_id": user_id,
"session_id": anon_session_id,
},
)
except SQLAlchemyError:
current_app.logger.exception(
"[auth] non-fatal DB update for user_id=%s", user_id
)
qsession[SESSION_USER_KEY] = user_id
redirect_url = pop_login_redirect_target()
resp = redirect(redirect_url, 303)
resp.set_cookie(
"sso_hint", "1",
domain=".rose-ash.com", max_age=30 * 24 * 3600,
secure=True, samesite="Lax", httponly=True,
)
return resp
@auth_bp.post("/logout/")
async def logout():
qsession.pop(SESSION_USER_KEY, None)
resp = redirect(federation_url("/"))
resp.delete_cookie("sso_hint", domain=".rose-ash.com", path="/")
return resp
@auth_bp.get("/clear/")
async def clear():
"""One-time migration helper: clear all session cookies."""
qsession.clear()
resp = redirect(federation_url("/"))
resp.delete_cookie("blog_session", domain=".rose-ash.com", path="/")
resp.delete_cookie("sso_hint", domain=".rose-ash.com", path="/")
return resp
@auth_bp.get("/sso-logout/")
async def sso_logout():
"""SSO logout: clear federation session + sso_hint, redirect to blog."""
qsession.pop(SESSION_USER_KEY, None)
from shared.infrastructure.urls import blog_url
resp = redirect(blog_url("/"))
resp.delete_cookie("sso_hint", domain=".rose-ash.com", path="/")
return resp
return auth_bp