This repository has been archived on 2026-02-24. You can view files and clone it. You cannot open issues or pull requests or push a commit.
Files
blog/bp/auth/services/auth_operations.py
giles 8f7a15186c
Some checks failed
Build and Deploy / build-and-deploy (push) Has been cancelled
feat: initialize blog app with blueprints and templates
Extract blog-specific code from the coop monolith into a standalone
repository. Includes auth, blog, post, admin, menu_items, snippets
blueprints, associated templates, Dockerfile (APP_MODULE=app:app),
entrypoint, and Gitea CI workflow.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-09 23:15:56 +00:00

240 lines
6.1 KiB
Python

from __future__ import annotations
import os
import secrets
from datetime import datetime, timedelta, timezone
from typing import Optional, Tuple
from quart import current_app, request, g
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from models import User, MagicLink, UserNewsletter
from config import config
def get_app_host() -> str:
    """Return the application's base URL without a trailing slash.

    The first truthy source wins, in this order: the ``host`` entry of the
    project config, the ``APP_HOST`` environment variable, and finally the
    literal ``http://localhost:8000``.

    Returns:
        Base URL with any trailing ``/`` characters removed.
    """
    configured = config().get("host")
    if configured:
        base_url = configured
    else:
        # The environment is only consulted when the config has no host.
        base_url = os.getenv("APP_HOST") or "http://localhost:8000"
    return base_url.rstrip("/")
def get_app_root() -> str:
    """Return the application root path stored on ``g``.

    Returns:
        ``g.root`` with any trailing ``/`` characters removed.
    """
    return g.root.rstrip("/")
async def send_magic_email(to_email: str, link_url: str) -> None:
    """
    Deliver a sign-in link by email, or log it when SMTP is not set up.

    SMTP settings come from the environment: ``SMTP_HOST``, ``SMTP_PORT``
    (defaults to 587), ``SMTP_USER``, ``SMTP_PASS`` and ``MAIL_FROM``
    (defaults to ``no-reply@example.com``). When host, user or password is
    missing, the link is written to the app logger and to stdout and no
    email is sent.

    Args:
        to_email: Recipient email address
        link_url: Magic link URL to include in email

    Raises:
        ValueError: If ``SMTP_PORT`` is set but is not an integer
        Exception: If SMTP sending fails
    """
    smtp_host = os.getenv("SMTP_HOST")
    # Parsed before the configuration check, so a bad port fails fast.
    smtp_port = int(os.getenv("SMTP_PORT") or "587")
    smtp_user = os.getenv("SMTP_USER")
    smtp_pass = os.getenv("SMTP_PASS")
    sender = os.getenv("MAIL_FROM") or "no-reply@example.com"

    if not (smtp_host and smtp_user and smtp_pass):
        # No usable SMTP credentials: surface the link locally instead.
        current_app.logger.warning(
            "SMTP not configured. Printing magic link to console for %s: %s",
            to_email,
            link_url,
        )
        print(f"[DEV] Magic link for {to_email}: {link_url}")
        return

    # Imported lazily so the dependency is only needed when mail is sent.
    import aiosmtplib
    from email.message import EmailMessage

    message = EmailMessage()
    message["From"] = sender
    message["To"] = to_email
    message["Subject"] = "Your sign-in link"
    message.set_content(
        f"""Hello,
Click this link to sign in:
{link_url}
This link will expire in 15 minutes.
If you did not request this, you can ignore this email.
"""
    )

    if smtp_port == 465:
        # Port 465: TLS from the first byte (implicit TLS).
        tls_options = {"use_tls": True}
    else:
        # Any other port: plain connect, then upgrade with STARTTLS.
        tls_options = {"start_tls": True}

    client = aiosmtplib.SMTP(
        hostname=smtp_host,
        port=smtp_port,
        username=smtp_user,
        password=smtp_pass,
        **tls_options,
    )
    async with client:
        await client.send_message(message)
async def load_user_by_id(session: AsyncSession, user_id: int) -> Optional[User]:
    """
    Fetch a single user by ID, eager-loading labels and newsletters.

    ``User.labels`` and ``User.user_newsletters`` (together with each entry's
    ``newsletter``) are loaded via ``selectinload``.

    Args:
        session: Database session
        user_id: User ID to load

    Returns:
        User object or None if not found
    """
    labels_loader = selectinload(User.labels)
    newsletters_loader = selectinload(User.user_newsletters).selectinload(
        UserNewsletter.newsletter
    )
    query = (
        select(User)
        .where(User.id == user_id)
        .options(labels_loader, newsletters_loader)
    )
    rows = await session.execute(query)
    return rows.scalar_one_or_none()
async def find_or_create_user(session: AsyncSession, email: str) -> User:
    """
    Return the user with the given email, creating one if none exists.

    Args:
        session: Database session
        email: User email address (should be lowercase and trimmed)

    Returns:
        User object (either existing or newly created)
    """
    lookup = select(User).where(User.email == email)
    existing = (await session.execute(lookup)).scalar_one_or_none()
    if existing is not None:
        return existing

    new_user = User(email=email)
    session.add(new_user)
    # Flush (without committing) so new_user.id is populated for the caller.
    await session.flush()
    return new_user
async def create_magic_link(
    session: AsyncSession,
    user_id: int,
    purpose: str = "signin",
    expires_minutes: int = 15,
) -> Tuple[str, datetime]:
    """
    Create a new magic link token for authentication.

    A URL-safe random token is generated and a ``MagicLink`` row is added to
    the session together with the requesting client's IP address and user
    agent, both read from the current ``request``. The row is only added to
    the session; this function does not flush or commit.

    Args:
        session: Database session
        user_id: User ID to create link for
        purpose: Purpose of the link (default: "signin")
        expires_minutes: Minutes until expiration (default: 15)

    Returns:
        Tuple of (token, expires_at), where expires_at is timezone-aware UTC
    """
    token = secrets.token_urlsafe(32)
    expires = datetime.now(timezone.utc) + timedelta(minutes=expires_minutes)

    # X-Forwarded-For may carry a comma-separated chain
    # ("client, proxy1, proxy2"). Record only the first entry, and fall back
    # to the socket address when the header is absent or blank.
    forwarded_for = request.headers.get("x-forwarded-for") or ""
    client_ip = forwarded_for.split(",", 1)[0].strip() or request.remote_addr

    ml = MagicLink(
        token=token,
        user_id=user_id,
        purpose=purpose,
        expires_at=expires,
        ip=client_ip,
        user_agent=request.headers.get("user-agent"),
    )
    session.add(ml)
    return token, expires
async def validate_magic_link(
    session: AsyncSession,
    token: str,
) -> Tuple[Optional[User], Optional[str]]:
    """
    Check a magic link token and, if it is usable, mark it as consumed.

    Args:
        session: Database session (should be in a transaction)
        token: Magic link token to validate

    Returns:
        Tuple of (user, error_message)
        - On failure, user is None and error_message gives the reason
        - On success, error_message is None and the link's ``used_at`` has
          been set on the loaded row (not committed here)
    """
    current_time = datetime.now(timezone.utc)

    # SELECT ... FOR UPDATE: the row is locked for the surrounding
    # transaction (presumably so one token cannot be consumed twice).
    lookup = select(MagicLink).where(MagicLink.token == token).with_for_update()
    link = await session.scalar(lookup)

    if not link:
        return None, "Invalid or expired link."
    if link.purpose != "signin":
        return None, "Invalid or expired link."

    if link.used_at:
        return None, "This link has expired. Please request a new one."
    if link.expires_at < current_time:
        return None, "This link has expired. Please request a new one."

    owner = await session.get(User, link.user_id)
    if not owner:
        return None, "User not found."

    # Consume the link only after every check has passed.
    link.used_at = current_time
    return owner, None
def validate_email(email: str) -> Tuple[bool, str]:
    """
    Normalize an email address and run a basic structural check.

    The address is stripped and lower-cased. It is accepted only when it
    contains no whitespace and has a non-empty local part and a non-empty
    domain around its last ``@``. This is a sanity check, not full RFC 5322
    validation.

    Args:
        email: Email address to validate (``None`` is treated as empty)

    Returns:
        Tuple of (is_valid, normalized_email)
    """
    normalized = (email or "").strip().lower()

    # rpartition yields an empty separator when there is no "@" at all.
    local_part, at_sign, domain = normalized.rpartition("@")
    has_structure = bool(at_sign and local_part and domain)

    # Embedded spaces, tabs or newlines are never valid in an address.
    has_whitespace = any(ch.isspace() for ch in normalized)

    return has_structure and not has_whitespace, normalized