Remove blog auth blueprint — login/account now in federation
All checks were successful
Build and Deploy / build-and-deploy (push) Successful in 56s

All auth routes (login, magic link, account, newsletters,
widget pages, logout) are handled by the federation app.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
giles
2026-02-23 09:09:21 +00:00
parent 85fd9d9f60
commit fb1cef6cb5
6 changed files with 0 additions and 660 deletions

3
app.py
View File

@@ -11,7 +11,6 @@ from shared.config import config
from shared.models import KV
from bp import (
register_auth_bp,
register_blog_bp,
register_admin,
register_menu_items,
@@ -63,8 +62,6 @@ def create_app() -> "Quart":
])
# --- blueprints ---
app.register_blueprint(register_auth_bp())
app.register_blueprint(
register_blog_bp(
url_prefix=config()["blog_root"],

View File

@@ -1,4 +1,3 @@
from .auth.routes import register as register_auth_bp
from .blog.routes import register as register_blog_bp
from .admin.routes import register as register_admin
from .menu_items.routes import register as register_menu_items

View File

@@ -1,351 +0,0 @@
from __future__ import annotations
import os
import secrets
from datetime import datetime, timedelta, timezone
from quart import (
Blueprint,
request,
render_template,
make_response,
redirect,
url_for,
session as qsession,
g,
current_app,
)
from sqlalchemy import select
from sqlalchemy.exc import SQLAlchemyError
from ..blog.ghost.ghost_sync import (
sync_member_to_ghost,
)
from shared.db.session import get_session
from shared.models import User, MagicLink, UserNewsletter
from shared.models.ghost_membership_entities import GhostNewsletter
from shared.config import config
from shared.utils import host_url
from shared.infrastructure.urls import blog_url
from shared.services.widget_registry import widgets
from sqlalchemy.orm import selectinload
from shared.browser.app.redis_cacher import clear_cache
from shared.infrastructure.cart_identity import current_cart_identity
from shared.events import emit_activity
from .services import pop_login_redirect_target, store_login_redirect_target
from .services.auth_operations import (
get_app_host,
get_app_root,
send_magic_email,
load_user_by_id,
find_or_create_user,
create_magic_link,
validate_magic_link,
validate_email,
)
oob = {
"oob_extends": "oob_elements.html",
"extends": "_types/root/_index.html",
"parent_id": "root-header-child",
"child_id": "auth-header-child",
"header": "_types/auth/header/_header.html",
"parent_header": "_types/root/header/_header.html",
"nav": "_types/auth/_nav.html",
"main": "_types/auth/_main_panel.html"
}
def register(url_prefix="/auth"):
auth_bp = Blueprint("auth", __name__, url_prefix=url_prefix)
@auth_bp.before_request
def route():
pass
SESSION_USER_KEY = "uid"
@auth_bp.context_processor
def context():
return {
"oob": oob,
"account_nav_links": widgets.account_nav,
}
# NOTE: load_current_user moved to shared/user_loader.py
# and registered in shared/factory.py as an app-level before_request
@auth_bp.get("/login/")
async def login_form():
store_login_redirect_target()
# Preserve anonymous cart session from cross-app redirect
cross_cart_sid = request.args.get("cart_sid")
if cross_cart_sid:
qsession["cart_sid"] = cross_cart_sid
if g.get("user"):
return redirect(blog_url("/"))
return await render_template("_types/auth/login.html")
@auth_bp.get("/account/")
async def account():
from shared.browser.app.utils.htmx import is_htmx_request
if not g.get("user"):
return redirect(host_url(url_for("auth.login_form")))
# Full template for both HTMX and normal requests
# Determine which template to use based on request type
if not is_htmx_request():
# Normal browser request: full page with layout
html = await render_template("_types/auth/index.html")
else:
# HTMX request: main panel + OOB elements
html = await render_template(
"_types/auth/_oob_elements.html",
)
return await make_response(html)
@auth_bp.get("/newsletters/")
async def newsletters():
from shared.browser.app.utils.htmx import is_htmx_request
if not g.get("user"):
return redirect(host_url(url_for("auth.login_form")))
# Fetch all newsletters, sorted alphabetically
result = await g.s.execute(
select(GhostNewsletter).order_by(GhostNewsletter.name)
)
all_newsletters = result.scalars().all()
# Fetch user's subscription states
sub_result = await g.s.execute(
select(UserNewsletter).where(
UserNewsletter.user_id == g.user.id,
)
)
user_subs = {un.newsletter_id: un for un in sub_result.scalars().all()}
# Build list with subscription state for template
newsletter_list = []
for nl in all_newsletters:
un = user_subs.get(nl.id)
newsletter_list.append({
"newsletter": nl,
"un": un,
"subscribed": un.subscribed if un else False,
})
nl_oob = {**oob, "main": "_types/auth/_newsletters_panel.html"}
if not is_htmx_request():
html = await render_template(
"_types/auth/index.html",
oob=nl_oob,
newsletter_list=newsletter_list,
)
else:
html = await render_template(
"_types/auth/_oob_elements.html",
oob=nl_oob,
newsletter_list=newsletter_list,
)
return await make_response(html)
@auth_bp.get("/<slug>/")
async def widget_page(slug):
from shared.browser.app.utils.htmx import is_htmx_request
widget = widgets.account_page_by_slug(slug)
if not widget:
from quart import abort
abort(404)
if not g.get("user"):
return redirect(host_url(url_for("auth.login_form")))
ctx = await widget.context_fn(g.s, user_id=g.user.id)
w_oob = {**oob, "main": widget.template}
if not is_htmx_request():
html = await render_template(
"_types/auth/index.html",
oob=w_oob,
**ctx,
)
else:
html = await render_template(
"_types/auth/_oob_elements.html",
oob=w_oob,
**ctx,
)
return await make_response(html)
@auth_bp.post("/start/")
@clear_cache(tag_scope="user", clear_user=True)
async def start_login():
# 1. Get and validate email
form = await request.form
email_input = form.get("email") or ""
is_valid, email = validate_email(email_input)
if not is_valid:
return (
await render_template(
"_types/auth/login.html",
error="Please enter a valid email address.",
email=email_input,
),
400,
)
# 2. Create/find user and issue magic link token
user = await find_or_create_user(g.s, email)
token, expires = await create_magic_link(g.s, user.id)
g.s.commit()
# 3. Build the magic link URL
magic_url = host_url(url_for("auth.magic", token=token))
# 4. Try sending the email
email_error = None
try:
await send_magic_email(email, magic_url)
except Exception as e:
print("EMAIL SEND FAILED:", repr(e))
email_error = (
"We couldn't send the email automatically. "
"Please try again in a moment."
)
# 5. Render "check your email" page
return await render_template(
"_types/auth/check_email.html",
email=email,
email_error=email_error,
)
@auth_bp.get("/magic/<token>/")
async def magic(token: str):
now = datetime.now(timezone.utc)
user_id: int | None = None
# ---- Step 1: Validate & consume magic link ----
try:
async with get_session() as s:
async with s.begin():
user, error = await validate_magic_link(s, token)
if error:
return (
await render_template(
"_types/auth/login.html",
error=error,
),
400,
)
user_id = user.id
# Try to ensure Ghost membership inside this txn
try:
if not user.ghost_id:
await sync_member_to_ghost(s, user.id)
except Exception:
current_app.logger.exception(
"[auth] Ghost upsert failed for user_id=%s", user.id
)
raise
except Exception:
# Any DB/Ghost error → generic failure
return (
await render_template(
"_types/auth/login.html",
error="Could not sign you in right now. Please try again.",
),
502,
)
# At this point:
# - magic link is consumed
# - user_id is valid
# - Ghost membership is ensured or we already returned 502
assert user_id is not None # for type checkers / sanity
# Figure out any anonymous session we want to adopt
ident = current_cart_identity()
anon_session_id = ident.get("session_id")
# ---- Step 3: best-effort local update (non-fatal) ----
try:
async with get_session() as s:
async with s.begin():
u2 = await s.get(User, user_id)
if u2:
u2.last_login_at = now
# Emit adoption event inside this transaction
if anon_session_id:
await emit_activity(
s,
activity_type="rose:Login",
actor_uri="internal:system",
object_type="Person",
object_data={
"user_id": user_id,
"session_id": anon_session_id,
},
)
except SQLAlchemyError:
current_app.logger.exception(
"[auth] non-fatal DB update after Ghost upsert for user_id=%s", user_id
)
# ---- Finalize login ----
qsession[SESSION_USER_KEY] = user_id
# Redirect back to where they came from, if we stored it.
redirect_url = pop_login_redirect_target()
return redirect(redirect_url, 303)
@auth_bp.post("/newsletter/<int:newsletter_id>/toggle/")
async def toggle_newsletter(newsletter_id: int):
if not g.get("user"):
return "", 401
result = await g.s.execute(
select(UserNewsletter).where(
UserNewsletter.user_id == g.user.id,
UserNewsletter.newsletter_id == newsletter_id,
)
)
un = result.scalar_one_or_none()
if un:
un.subscribed = not un.subscribed
else:
un = UserNewsletter(
user_id=g.user.id,
newsletter_id=newsletter_id,
subscribed=True,
)
g.s.add(un)
await g.s.flush()
return await render_template(
"_types/auth/_newsletter_toggle.html",
un=un,
)
@auth_bp.post("/logout/")
async def logout():
qsession.pop(SESSION_USER_KEY, None)
return redirect(blog_url("/"))
return auth_bp

View File

@@ -1,24 +0,0 @@
from .login_redirect import pop_login_redirect_target, store_login_redirect_target
from .auth_operations import (
get_app_host,
get_app_root,
send_magic_email,
load_user_by_id,
find_or_create_user,
create_magic_link,
validate_magic_link,
validate_email,
)
__all__ = [
"pop_login_redirect_target",
"store_login_redirect_target",
"get_app_host",
"get_app_root",
"send_magic_email",
"load_user_by_id",
"find_or_create_user",
"create_magic_link",
"validate_magic_link",
"validate_email",
]

View File

@@ -1,236 +0,0 @@
from __future__ import annotations
import os
import secrets
from datetime import datetime, timedelta, timezone
from typing import Optional, Tuple
from quart import current_app, render_template, request, g
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from shared.models import User, MagicLink, UserNewsletter
from shared.config import config
def get_app_host() -> str:
"""Get the application host URL from config or environment."""
host = (
config().get("host") or os.getenv("APP_HOST") or "http://localhost:8000"
).rstrip("/")
return host
def get_app_root() -> str:
"""Get the application root path from request context."""
root = (g.root).rstrip("/")
return root
async def send_magic_email(to_email: str, link_url: str) -> None:
"""
Send magic link email via SMTP if configured, otherwise log to console.
Args:
to_email: Recipient email address
link_url: Magic link URL to include in email
Raises:
Exception: If SMTP sending fails
"""
host = os.getenv("SMTP_HOST")
port = int(os.getenv("SMTP_PORT") or "587")
username = os.getenv("SMTP_USER")
password = os.getenv("SMTP_PASS")
mail_from = os.getenv("MAIL_FROM") or "no-reply@example.com"
site_name = config().get("title", "Rose Ash")
subject = f"Your sign-in link — {site_name}"
tpl_vars = dict(site_name=site_name, link_url=link_url)
text_body = await render_template("_email/magic_link.txt", **tpl_vars)
html_body = await render_template("_email/magic_link.html", **tpl_vars)
if not host or not username or not password:
# Fallback: log to console
current_app.logger.warning(
"SMTP not configured. Printing magic link to console for %s: %s",
to_email,
link_url,
)
print(f"[DEV] Magic link for {to_email}: {link_url}")
return
# Lazy import to avoid dependency unless used
import aiosmtplib
from email.message import EmailMessage
msg = EmailMessage()
msg["From"] = mail_from
msg["To"] = to_email
msg["Subject"] = subject
msg.set_content(text_body)
msg.add_alternative(html_body, subtype="html")
is_secure = port == 465 # implicit TLS if true
if is_secure:
# implicit TLS (like nodemailer secure: true)
smtp = aiosmtplib.SMTP(
hostname=host,
port=port,
use_tls=True,
username=username,
password=password,
)
else:
# plain connect then STARTTLS (like secure: false but with TLS upgrade)
smtp = aiosmtplib.SMTP(
hostname=host,
port=port,
start_tls=True,
username=username,
password=password,
)
async with smtp:
await smtp.send_message(msg)
async def load_user_by_id(session: AsyncSession, user_id: int) -> Optional[User]:
"""
Load a user by ID with labels and newsletters eagerly loaded.
Args:
session: Database session
user_id: User ID to load
Returns:
User object or None if not found
"""
stmt = (
select(User)
.options(
selectinload(User.labels),
selectinload(User.user_newsletters).selectinload(
UserNewsletter.newsletter
),
)
.where(User.id == user_id)
)
result = await session.execute(stmt)
return result.scalar_one_or_none()
async def find_or_create_user(session: AsyncSession, email: str) -> User:
"""
Find existing user by email or create a new one.
Args:
session: Database session
email: User email address (should be lowercase and trimmed)
Returns:
User object (either existing or newly created)
"""
result = await session.execute(select(User).where(User.email == email))
user = result.scalar_one_or_none()
if user is None:
user = User(email=email)
session.add(user)
await session.flush() # Ensure user.id exists
return user
async def create_magic_link(
session: AsyncSession,
user_id: int,
purpose: str = "signin",
expires_minutes: int = 15,
) -> Tuple[str, datetime]:
"""
Create a new magic link token for authentication.
Args:
session: Database session
user_id: User ID to create link for
purpose: Purpose of the link (default: "signin")
expires_minutes: Minutes until expiration (default: 15)
Returns:
Tuple of (token, expires_at)
"""
token = secrets.token_urlsafe(32)
expires = datetime.now(timezone.utc) + timedelta(minutes=expires_minutes)
ml = MagicLink(
token=token,
user_id=user_id,
purpose=purpose,
expires_at=expires,
ip=request.headers.get("x-forwarded-for", request.remote_addr),
user_agent=request.headers.get("user-agent"),
)
session.add(ml)
return token, expires
async def validate_magic_link(
session: AsyncSession,
token: str,
) -> Tuple[Optional[User], Optional[str]]:
"""
Validate and consume a magic link token.
Args:
session: Database session (should be in a transaction)
token: Magic link token to validate
Returns:
Tuple of (user, error_message)
- If user is None, error_message contains the reason
- If user is returned, the link was valid and has been consumed
"""
now = datetime.now(timezone.utc)
ml = await session.scalar(
select(MagicLink)
.where(MagicLink.token == token)
.with_for_update()
)
if not ml or ml.purpose != "signin":
return None, "Invalid or expired link."
if ml.used_at or ml.expires_at < now:
return None, "This link has expired. Please request a new one."
user = await session.get(User, ml.user_id)
if not user:
return None, "User not found."
# Mark link as used
ml.used_at = now
return user, None
def validate_email(email: str) -> Tuple[bool, str]:
"""
Validate email address format.
Args:
email: Email address to validate
Returns:
Tuple of (is_valid, normalized_email)
"""
email = email.strip().lower()
if not email or "@" not in email:
return False, email
return True, email

View File

@@ -1,45 +0,0 @@
from urllib.parse import urlparse
from quart import session
from shared.infrastructure.urls import blog_url
LOGIN_REDIRECT_SESSION_KEY = "login_redirect_to"
def store_login_redirect_target() -> None:
from quart import request
target = request.args.get("next")
if not target:
ref = request.referrer or ""
try:
parsed = urlparse(ref)
target = parsed.path or ""
except Exception:
target = ""
if not target:
return
# Accept both relative paths and absolute URLs (cross-app redirects)
if target.startswith("http://") or target.startswith("https://"):
session[LOGIN_REDIRECT_SESSION_KEY] = target
elif target.startswith("/") and not target.startswith("//"):
session[LOGIN_REDIRECT_SESSION_KEY] = target
def pop_login_redirect_target() -> str:
path = session.pop(LOGIN_REDIRECT_SESSION_KEY, None)
if not path or not isinstance(path, str):
return blog_url("/auth/")
# Absolute URL: return as-is (cross-app redirect)
if path.startswith("http://") or path.startswith("https://"):
return path
# Relative path: must start with / and not //
if path.startswith("/") and not path.startswith("//"):
return blog_url(path)
return blog_url("/auth/")