All checks were successful
Build and Deploy / build-and-deploy (push) Successful in 49s
Extract inline email body into separate Jinja2 templates (_email/magic_link.html and .txt) with a styled sign-in button and fallback plain-text link. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
237 lines
6.3 KiB
Python
237 lines
6.3 KiB
Python
from __future__ import annotations
|
|
|
|
import os
|
|
import secrets
|
|
from datetime import datetime, timedelta, timezone
|
|
from typing import Optional, Tuple
|
|
|
|
from quart import current_app, render_template, request, g
|
|
from sqlalchemy import select
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
from sqlalchemy.orm import selectinload
|
|
|
|
from shared.models import User, MagicLink, UserNewsletter
|
|
from shared.config import config
|
|
|
|
|
|
def get_app_host() -> str:
    """Return the application's base URL with any trailing slashes removed.

    Lookup order: the ``host`` entry of the shared config, then the
    ``APP_HOST`` environment variable, then ``http://localhost:8000``.
    A missing or empty value at one step falls through to the next.
    """
    base_url = config().get("host")
    if not base_url:
        base_url = os.getenv("APP_HOST")
    if not base_url:
        base_url = "http://localhost:8000"
    return base_url.rstrip("/")
|
|
|
|
|
|
def get_app_root() -> str:
    """Return ``g.root`` with any trailing slashes removed.

    NOTE(review): ``g.root`` is not assigned in this module; presumably it is
    populated per request elsewhere -- confirm before calling this outside a
    request context.
    """
    return g.root.rstrip("/")
|
|
|
|
|
|
async def send_magic_email(to_email: str, link_url: str) -> None:
    """
    Email a sign-in link, or log it when SMTP is not configured.

    SMTP settings come from the environment: ``SMTP_HOST``, ``SMTP_PORT``
    (defaults to 587), ``SMTP_USER``, ``SMTP_PASS`` and ``MAIL_FROM``
    (defaults to ``no-reply@example.com``). If the host, user, or password
    is missing, no mail is sent; the link is written to the app logger as a
    warning and printed to stdout instead.

    Args:
        to_email: Recipient email address
        link_url: Magic link URL to include in email

    Raises:
        Exception: Nothing is caught here, so template-rendering and SMTP
            errors propagate to the caller.
    """
    smtp_host = os.getenv("SMTP_HOST")
    smtp_port = int(os.getenv("SMTP_PORT") or "587")
    smtp_user = os.getenv("SMTP_USER")
    smtp_pass = os.getenv("SMTP_PASS")
    sender = os.getenv("MAIL_FROM") or "no-reply@example.com"

    site_name = config().get("title", "Rose Ash")

    # Both bodies are rendered before the SMTP check, so a broken template
    # fails in the console-fallback path as well.
    text_body = await render_template(
        "_email/magic_link.txt", site_name=site_name, link_url=link_url
    )
    html_body = await render_template(
        "_email/magic_link.html", site_name=site_name, link_url=link_url
    )

    if not (smtp_host and smtp_user and smtp_pass):
        # Fallback: no usable SMTP credentials, surface the link locally.
        current_app.logger.warning(
            "SMTP not configured. Printing magic link to console for %s: %s",
            to_email,
            link_url,
        )
        print(f"[DEV] Magic link for {to_email}: {link_url}")
        return

    # Imported lazily so the dependency is only needed when mail is sent.
    import aiosmtplib
    from email.message import EmailMessage

    message = EmailMessage()
    message["From"] = sender
    message["To"] = to_email
    message["Subject"] = f"Your sign-in link — {site_name}"
    message.set_content(text_body)
    message.add_alternative(html_body, subtype="html")

    # Port 465 means implicit TLS from the first byte; any other port
    # connects in plain text and then upgrades with STARTTLS.
    if smtp_port == 465:
        tls_mode = {"use_tls": True}
    else:
        tls_mode = {"start_tls": True}

    client = aiosmtplib.SMTP(
        hostname=smtp_host,
        port=smtp_port,
        username=smtp_user,
        password=smtp_pass,
        **tls_mode,
    )

    async with client:
        await client.send_message(message)
|
|
|
|
|
|
async def load_user_by_id(session: AsyncSession, user_id: int) -> Optional[User]:
    """
    Fetch one user by primary key together with its related collections.

    The query eagerly loads ``User.labels`` and ``User.user_newsletters``
    (plus each entry's ``newsletter``) using ``selectinload``.

    Args:
        session: Database session
        user_id: User ID to load

    Returns:
        The matching User, or None when no row has that id
    """
    eager_loads = (
        selectinload(User.labels),
        selectinload(User.user_newsletters).selectinload(UserNewsletter.newsletter),
    )
    query = select(User).where(User.id == user_id).options(*eager_loads)
    rows = await session.execute(query)
    return rows.scalar_one_or_none()
|
|
|
|
|
|
async def find_or_create_user(session: AsyncSession, email: str) -> User:
    """
    Return the user with this email, creating the row if none exists.

    The email is compared exactly as given; no normalization happens here.

    Args:
        session: Database session
        email: User email address (should be lowercase and trimmed)

    Returns:
        The existing User, or a newly added one that has been flushed (not
        committed) so its ``id`` is populated
    """
    lookup = await session.execute(select(User).where(User.email == email))
    existing = lookup.scalar_one_or_none()
    if existing is not None:
        return existing

    created = User(email=email)
    session.add(created)
    # Flush so the database assigns created.id before the caller uses it.
    await session.flush()
    return created
|
|
|
|
|
|
async def create_magic_link(
    session: AsyncSession,
    user_id: int,
    purpose: str = "signin",
    expires_minutes: int = 15,
) -> Tuple[str, datetime]:
    """
    Generate a magic link token and stage its record in the session.

    The MagicLink row is added to ``session`` but is neither flushed nor
    committed here. The current request's client address and user agent are
    stored alongside the token.

    Args:
        session: Database session
        user_id: User ID to create link for
        purpose: Purpose of the link (default: "signin")
        expires_minutes: Minutes until expiration (default: 15)

    Returns:
        Tuple of (token, expires_at), where expires_at is timezone-aware UTC
    """
    token = secrets.token_urlsafe(32)
    expires_at = datetime.now(timezone.utc) + timedelta(minutes=expires_minutes)

    headers = request.headers
    # NOTE(review): the raw X-Forwarded-For value is stored as-is; behind
    # several proxies it may be a comma-separated list -- confirm that the
    # ``ip`` column can hold that.
    client_ip = headers.get("x-forwarded-for", request.remote_addr)
    agent = headers.get("user-agent")

    link = MagicLink(
        token=token,
        user_id=user_id,
        purpose=purpose,
        expires_at=expires_at,
        ip=client_ip,
        user_agent=agent,
    )
    session.add(link)

    return token, expires_at
|
|
|
|
|
|
async def validate_magic_link(
    session: AsyncSession,
    token: str,
) -> Tuple[Optional[User], Optional[str]]:
    """
    Check a magic link token and mark it as used when it is valid.

    The MagicLink row is selected ``FOR UPDATE``. On success ``used_at`` is
    set on the row; committing is left to the caller.

    Args:
        session: Database session (should be in a transaction)
        token: Magic link token to validate

    Returns:
        Tuple of (user, error_message)
        - (None, message) when the token is unknown, is not a "signin"
          link, was already used, has expired, or its user no longer exists
        - (user, None) when the link was valid and has been consumed
    """
    checked_at = datetime.now(timezone.utc)

    lookup = select(MagicLink).where(MagicLink.token == token).with_for_update()
    link = await session.scalar(lookup)

    # Unknown token and wrong purpose share one message.
    if not link:
        return None, "Invalid or expired link."
    if link.purpose != "signin":
        return None, "Invalid or expired link."

    # Already-consumed and past-expiry links share one message.
    if link.used_at:
        return None, "This link has expired. Please request a new one."
    if link.expires_at < checked_at:
        return None, "This link has expired. Please request a new one."

    owner = await session.get(User, link.user_id)
    if not owner:
        return None, "User not found."

    # Consume the link so it cannot be redeemed again.
    link.used_at = checked_at

    return owner, None
|
|
|
|
|
|
def validate_email(email: str) -> Tuple[bool, str]:
    """
    Normalize an email address and run a basic structural check.

    The address is trimmed and lower-cased, then accepted only if it has a
    non-empty part on each side of its last "@" and contains no whitespace.
    This is a sanity check, not full RFC validation.

    Args:
        email: Email address to validate; None is treated as empty

    Returns:
        Tuple of (is_valid, normalized_email). The normalized value is
        returned even when the address is rejected.
    """
    normalized = (email or "").strip().lower()

    # Split on the last "@" so both sides can be checked for content;
    # a bare "@", "user@" or "@host" is rejected here.
    local_part, at_sign, domain = normalized.rpartition("@")
    if not (at_sign and local_part and domain):
        return False, normalized

    # strip() only removes outer whitespace; spaces, tabs or newlines inside
    # the address are rejected.
    if any(ch.isspace() for ch in normalized):
        return False, normalized

    return True, normalized
|