This repository has been archived on 2026-02-24. You can view files and clone it. You cannot open issues or pull requests or push a commit.
Files
blog/bp/auth/routes.py
giles 8f7a15186c
Some checks failed
Build and Deploy / build-and-deploy (push) Has been cancelled
feat: initialize blog app with blueprints and templates
Extract blog-specific code from the coop monolith into a standalone
repository. Includes auth, blog, post, admin, menu_items, snippets
blueprints, associated templates, Dockerfile (APP_MODULE=app:app),
entrypoint, and Gitea CI workflow.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-09 23:15:56 +00:00

314 lines
9.6 KiB
Python

from __future__ import annotations
import os
import secrets
from datetime import datetime, timedelta, timezone
from quart import (
Blueprint,
request,
render_template,
make_response,
redirect,
url_for,
session as qsession,
g,
current_app,
)
from sqlalchemy import select
from sqlalchemy.exc import SQLAlchemyError
from ..blog.ghost.ghost_sync import (
sync_member_to_ghost,
)
from db.session import get_session
from models import User, MagicLink, UserNewsletter
from models.ghost_membership_entities import GhostNewsletter
from config import config
from utils import host_url
from shared.urls import coop_url
from sqlalchemy.orm import selectinload
from suma_browser.app.redis_cacher import clear_cache
from shared.cart_identity import current_cart_identity
from shared.internal_api import post as api_post
from .services import pop_login_redirect_target, store_login_redirect_target
from .services.auth_operations import (
get_app_host,
get_app_root,
send_magic_email,
load_user_by_id,
find_or_create_user,
create_magic_link,
validate_magic_link,
validate_email,
)
oob = {
"oob_extends": "oob_elements.html",
"extends": "_types/root/_index.html",
"parent_id": "root-header-child",
"child_id": "auth-header-child",
"header": "_types/auth/header/_header.html",
"parent_header": "_types/root/header/_header.html",
"nav": "_types/auth/_nav.html",
"main": "_types/auth/_main_panel.html"
}
def register(url_prefix="/auth"):
auth_bp = Blueprint("auth", __name__, url_prefix=url_prefix)
@auth_bp.before_request
def route():
pass
SESSION_USER_KEY = "uid"
@auth_bp.context_processor
def context():
return {
"oob": oob,
}
# NOTE: load_current_user moved to shared/user_loader.py
# and registered in shared/factory.py as an app-level before_request
@auth_bp.get("/login/")
async def login_form():
store_login_redirect_target()
if g.get("user"):
return redirect(coop_url("/"))
return await render_template("_types/auth/login.html")
@auth_bp.get("/account/")
async def account():
from suma_browser.app.utils.htmx import is_htmx_request
if not g.get("user"):
return redirect(host_url(url_for("auth.login_form")))
# TODO: Create _main_panel.html and _oob_elements.html for optimized HTMX
# For now, render full template for both HTMX and normal requests
# Determine which template to use based on request type
if not is_htmx_request():
# Normal browser request: full page with layout
html = await render_template("_types/auth/index.html")
else:
# HTMX request: main panel + OOB elements
html = await render_template(
"_types/auth/_oob_elements.html",
)
return await make_response(html)
@auth_bp.get("/newsletters/")
async def newsletters():
from suma_browser.app.utils.htmx import is_htmx_request
if not g.get("user"):
return redirect(host_url(url_for("auth.login_form")))
# Fetch all newsletters, sorted alphabetically
result = await g.s.execute(
select(GhostNewsletter).order_by(GhostNewsletter.name)
)
all_newsletters = result.scalars().all()
# Fetch user's subscription states
sub_result = await g.s.execute(
select(UserNewsletter).where(
UserNewsletter.user_id == g.user.id,
)
)
user_subs = {un.newsletter_id: un for un in sub_result.scalars().all()}
# Build list with subscription state for template
newsletter_list = []
for nl in all_newsletters:
un = user_subs.get(nl.id)
newsletter_list.append({
"newsletter": nl,
"un": un,
"subscribed": un.subscribed if un else False,
})
nl_oob = {**oob, "main": "_types/auth/_newsletters_panel.html"}
if not is_htmx_request():
html = await render_template(
"_types/auth/index.html",
oob=nl_oob,
newsletter_list=newsletter_list,
)
else:
html = await render_template(
"_types/auth/_oob_elements.html",
oob=nl_oob,
newsletter_list=newsletter_list,
)
return await make_response(html)
@auth_bp.post("/start/")
@clear_cache(tag_scope="user", clear_user=True)
async def start_login():
# 1. Get and validate email
form = await request.form
email_input = form.get("email") or ""
is_valid, email = validate_email(email_input)
if not is_valid:
return (
await render_template(
"_types/auth/login.html",
error="Please enter a valid email address.",
email=email_input,
),
400,
)
# 2. Create/find user and issue magic link token
user = await find_or_create_user(g.s, email)
token, expires = await create_magic_link(g.s, user.id)
g.s.commit()
# 3. Build the magic link URL
magic_url = host_url(url_for("auth.magic", token=token))
# 4. Try sending the email
email_error = None
try:
await send_magic_email(email, magic_url)
except Exception as e:
print("EMAIL SEND FAILED:", repr(e))
email_error = (
"We couldn't send the email automatically. "
"Please try again in a moment."
)
# 5. Render "check your email" page
return await render_template(
"_types/auth/check_email.html",
email=email,
email_error=email_error,
)
@auth_bp.get("/magic/<token>/")
async def magic(token: str):
now = datetime.now(timezone.utc)
user_id: int | None = None
# ---- Step 1: Validate & consume magic link ----
try:
async with get_session() as s:
async with s.begin():
user, error = await validate_magic_link(s, token)
if error:
return (
await render_template(
"_types/auth/login.html",
error=error,
),
400,
)
user_id = user.id
# Try to ensure Ghost membership inside this txn
try:
if not user.ghost_id:
await sync_member_to_ghost(s, user.id)
except Exception:
current_app.logger.exception(
"[auth] Ghost upsert failed for user_id=%s", user.id
)
raise
except Exception:
# Any DB/Ghost error → generic failure
return (
await render_template(
"_types/auth/login.html",
error="Could not sign you in right now. Please try again.",
),
502,
)
# At this point:
# - magic link is consumed
# - user_id is valid
# - Ghost membership is ensured or we already returned 502
assert user_id is not None # for type checkers / sanity
# Figure out any anonymous session we want to adopt
ident = current_cart_identity()
anon_session_id = ident.get("session_id")
# ---- Step 3: best-effort local update (non-fatal) ----
try:
async with get_session() as s:
async with s.begin():
u2 = await s.get(User, user_id)
if u2:
u2.last_login_at = now
# s.begin() will commit on successful exit
except SQLAlchemyError:
current_app.logger.exception(
"[auth] non-fatal DB update after Ghost upsert for user_id=%s", user_id
)
# Adopt cart + calendar entries via cart app internal API
if anon_session_id:
await api_post(
"cart",
"/internal/cart/adopt",
json={"user_id": user_id, "session_id": anon_session_id},
)
# ---- Finalize login ----
qsession[SESSION_USER_KEY] = user_id
# Redirect back to where they came from, if we stored it.
redirect_url = pop_login_redirect_target()
return redirect(redirect_url, 303)
@auth_bp.post("/newsletter/<int:newsletter_id>/toggle/")
async def toggle_newsletter(newsletter_id: int):
if not g.get("user"):
return "", 401
result = await g.s.execute(
select(UserNewsletter).where(
UserNewsletter.user_id == g.user.id,
UserNewsletter.newsletter_id == newsletter_id,
)
)
un = result.scalar_one_or_none()
if un:
un.subscribed = not un.subscribed
else:
un = UserNewsletter(
user_id=g.user.id,
newsletter_id=newsletter_id,
subscribed=True,
)
g.s.add(un)
await g.s.flush()
return await render_template(
"_types/auth/_newsletter_toggle.html",
un=un,
)
@auth_bp.post("/logout/")
async def logout():
qsession.pop(SESSION_USER_KEY, None)
return redirect(coop_url("/"))
return auth_bp