All checks were successful
Build and Deploy / build-and-deploy (push) Successful in 48s
Add OOB account page system with newsletters, widget pages, and toggle routes. Update shared submodule for federation_url. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
297 lines
8.9 KiB
Python
297 lines
8.9 KiB
Python
"""Authentication routes for the federation app.
|
|
|
|
Ported from blog/bp/auth/routes.py — owns magic link login/logout
|
|
plus the OOB account page system (newsletters, widget pages).
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
from datetime import datetime, timezone
|
|
|
|
from quart import (
|
|
Blueprint,
|
|
request,
|
|
render_template,
|
|
make_response,
|
|
redirect,
|
|
url_for,
|
|
session as qsession,
|
|
g,
|
|
current_app,
|
|
)
|
|
from sqlalchemy import select
|
|
from sqlalchemy.exc import SQLAlchemyError
|
|
|
|
from shared.db.session import get_session
|
|
from shared.models import User, UserNewsletter
|
|
from shared.models.ghost_membership_entities import GhostNewsletter
|
|
from shared.services.widget_registry import widgets
|
|
from shared.config import config
|
|
from shared.utils import host_url
|
|
from shared.infrastructure.urls import federation_url
|
|
from shared.infrastructure.cart_identity import current_cart_identity
|
|
from shared.events import emit_activity
|
|
from shared.services.registry import services
|
|
|
|
from .services import (
|
|
pop_login_redirect_target,
|
|
store_login_redirect_target,
|
|
send_magic_email,
|
|
find_or_create_user,
|
|
create_magic_link,
|
|
validate_magic_link,
|
|
validate_email,
|
|
)
|
|
|
|
SESSION_USER_KEY = "uid"
|
|
|
|
oob = {
|
|
"oob_extends": "oob_elements.html",
|
|
"extends": "_types/root/_index.html",
|
|
"parent_id": "root-header-child",
|
|
"child_id": "auth-header-child",
|
|
"header": "_types/auth/header/_header.html",
|
|
"parent_header": "_types/root/header/_header.html",
|
|
"nav": "_types/auth/_nav.html",
|
|
"main": "_types/auth/_main_panel.html",
|
|
}
|
|
|
|
|
|
def register(url_prefix="/auth"):
|
|
auth_bp = Blueprint("auth", __name__, url_prefix=url_prefix)
|
|
|
|
@auth_bp.context_processor
|
|
def context():
|
|
return {"oob": oob, "account_nav_links": widgets.account_nav}
|
|
|
|
@auth_bp.get("/login/")
|
|
async def login_form():
|
|
store_login_redirect_target()
|
|
cross_cart_sid = request.args.get("cart_sid")
|
|
if cross_cart_sid:
|
|
qsession["cart_sid"] = cross_cart_sid
|
|
if g.get("user"):
|
|
return redirect(federation_url("/"))
|
|
return await render_template("auth/login.html")
|
|
|
|
@auth_bp.get("/account/")
|
|
async def account():
|
|
from shared.browser.app.utils.htmx import is_htmx_request
|
|
|
|
if not g.get("user"):
|
|
return redirect(host_url(url_for("auth.login_form")))
|
|
|
|
if not is_htmx_request():
|
|
html = await render_template("_types/auth/index.html")
|
|
else:
|
|
html = await render_template("_types/auth/_oob_elements.html")
|
|
|
|
return await make_response(html)
|
|
|
|
@auth_bp.get("/newsletters/")
|
|
async def newsletters():
|
|
from shared.browser.app.utils.htmx import is_htmx_request
|
|
|
|
if not g.get("user"):
|
|
return redirect(host_url(url_for("auth.login_form")))
|
|
|
|
result = await g.s.execute(
|
|
select(GhostNewsletter).order_by(GhostNewsletter.name)
|
|
)
|
|
all_newsletters = result.scalars().all()
|
|
|
|
sub_result = await g.s.execute(
|
|
select(UserNewsletter).where(
|
|
UserNewsletter.user_id == g.user.id,
|
|
)
|
|
)
|
|
user_subs = {un.newsletter_id: un for un in sub_result.scalars().all()}
|
|
|
|
newsletter_list = []
|
|
for nl in all_newsletters:
|
|
un = user_subs.get(nl.id)
|
|
newsletter_list.append({
|
|
"newsletter": nl,
|
|
"un": un,
|
|
"subscribed": un.subscribed if un else False,
|
|
})
|
|
|
|
nl_oob = {**oob, "main": "_types/auth/_newsletters_panel.html"}
|
|
|
|
if not is_htmx_request():
|
|
html = await render_template(
|
|
"_types/auth/index.html",
|
|
oob=nl_oob,
|
|
newsletter_list=newsletter_list,
|
|
)
|
|
else:
|
|
html = await render_template(
|
|
"_types/auth/_oob_elements.html",
|
|
oob=nl_oob,
|
|
newsletter_list=newsletter_list,
|
|
)
|
|
|
|
return await make_response(html)
|
|
|
|
@auth_bp.post("/newsletter/<int:newsletter_id>/toggle/")
|
|
async def toggle_newsletter(newsletter_id: int):
|
|
if not g.get("user"):
|
|
return "", 401
|
|
|
|
result = await g.s.execute(
|
|
select(UserNewsletter).where(
|
|
UserNewsletter.user_id == g.user.id,
|
|
UserNewsletter.newsletter_id == newsletter_id,
|
|
)
|
|
)
|
|
un = result.scalar_one_or_none()
|
|
|
|
if un:
|
|
un.subscribed = not un.subscribed
|
|
else:
|
|
un = UserNewsletter(
|
|
user_id=g.user.id,
|
|
newsletter_id=newsletter_id,
|
|
subscribed=True,
|
|
)
|
|
g.s.add(un)
|
|
|
|
await g.s.flush()
|
|
|
|
return await render_template(
|
|
"_types/auth/_newsletter_toggle.html",
|
|
un=un,
|
|
)
|
|
|
|
@auth_bp.post("/start/")
|
|
async def start_login():
|
|
form = await request.form
|
|
email_input = form.get("email") or ""
|
|
|
|
is_valid, email = validate_email(email_input)
|
|
if not is_valid:
|
|
return (
|
|
await render_template(
|
|
"auth/login.html",
|
|
error="Please enter a valid email address.",
|
|
email=email_input,
|
|
),
|
|
400,
|
|
)
|
|
|
|
user = await find_or_create_user(g.s, email)
|
|
token, expires = await create_magic_link(g.s, user.id)
|
|
|
|
magic_url = host_url(url_for("auth.magic", token=token))
|
|
|
|
email_error = None
|
|
try:
|
|
await send_magic_email(email, magic_url)
|
|
except Exception as e:
|
|
current_app.logger.error("EMAIL SEND FAILED: %r", e)
|
|
email_error = (
|
|
"We couldn't send the email automatically. "
|
|
"Please try again in a moment."
|
|
)
|
|
|
|
return await render_template(
|
|
"auth/check_email.html",
|
|
email=email,
|
|
email_error=email_error,
|
|
)
|
|
|
|
@auth_bp.get("/magic/<token>/")
|
|
async def magic(token: str):
|
|
now = datetime.now(timezone.utc)
|
|
user_id: int | None = None
|
|
|
|
try:
|
|
async with get_session() as s:
|
|
async with s.begin():
|
|
user, error = await validate_magic_link(s, token)
|
|
|
|
if error:
|
|
return (
|
|
await render_template("auth/login.html", error=error),
|
|
400,
|
|
)
|
|
user_id = user.id
|
|
|
|
except Exception:
|
|
return (
|
|
await render_template(
|
|
"auth/login.html",
|
|
error="Could not sign you in right now. Please try again.",
|
|
),
|
|
502,
|
|
)
|
|
|
|
assert user_id is not None
|
|
|
|
ident = current_cart_identity()
|
|
anon_session_id = ident.get("session_id")
|
|
|
|
try:
|
|
async with get_session() as s:
|
|
async with s.begin():
|
|
u2 = await s.get(User, user_id)
|
|
if u2:
|
|
u2.last_login_at = now
|
|
if anon_session_id:
|
|
await emit_activity(
|
|
s,
|
|
activity_type="rose:Login",
|
|
actor_uri="internal:system",
|
|
object_type="Person",
|
|
object_data={
|
|
"user_id": user_id,
|
|
"session_id": anon_session_id,
|
|
},
|
|
)
|
|
except SQLAlchemyError:
|
|
current_app.logger.exception(
|
|
"[auth] non-fatal DB update for user_id=%s", user_id
|
|
)
|
|
|
|
qsession[SESSION_USER_KEY] = user_id
|
|
|
|
redirect_url = pop_login_redirect_target()
|
|
return redirect(redirect_url, 303)
|
|
|
|
@auth_bp.post("/logout/")
|
|
async def logout():
|
|
qsession.pop(SESSION_USER_KEY, None)
|
|
return redirect(federation_url("/"))
|
|
|
|
# Catch-all for widget pages — must be last to avoid shadowing specific routes
|
|
@auth_bp.get("/<slug>/")
|
|
async def widget_page(slug):
|
|
from shared.browser.app.utils.htmx import is_htmx_request
|
|
from quart import abort
|
|
|
|
widget = widgets.account_page_by_slug(slug)
|
|
if not widget:
|
|
abort(404)
|
|
|
|
if not g.get("user"):
|
|
return redirect(host_url(url_for("auth.login_form")))
|
|
|
|
ctx = await widget.context_fn(g.s, user_id=g.user.id)
|
|
w_oob = {**oob, "main": widget.template}
|
|
|
|
if not is_htmx_request():
|
|
html = await render_template(
|
|
"_types/auth/index.html",
|
|
oob=w_oob,
|
|
**ctx,
|
|
)
|
|
else:
|
|
html = await render_template(
|
|
"_types/auth/_oob_elements.html",
|
|
oob=w_oob,
|
|
**ctx,
|
|
)
|
|
|
|
return await make_response(html)
|
|
|
|
return auth_bp
|