from __future__ import annotations import os import secrets from datetime import datetime, timedelta, timezone from typing import Optional, Tuple from quart import current_app, render_template, request, g from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.orm import selectinload from shared.models import User, MagicLink, UserNewsletter from shared.config import config def get_app_host() -> str: """Get the application host URL from config or environment.""" host = ( config().get("host") or os.getenv("APP_HOST") or "http://localhost:8000" ).rstrip("/") return host def get_app_root() -> str: """Get the application root path from request context.""" root = (g.root).rstrip("/") return root async def send_magic_email(to_email: str, link_url: str) -> None: """ Send magic link email via SMTP if configured, otherwise log to console. Args: to_email: Recipient email address link_url: Magic link URL to include in email Raises: Exception: If SMTP sending fails """ host = os.getenv("SMTP_HOST") port = int(os.getenv("SMTP_PORT") or "587") username = os.getenv("SMTP_USER") password = os.getenv("SMTP_PASS") mail_from = os.getenv("MAIL_FROM") or "no-reply@example.com" site_name = config().get("title", "Rose Ash") subject = f"Your sign-in link — {site_name}" tpl_vars = dict(site_name=site_name, link_url=link_url) text_body = await render_template("_email/magic_link.txt", **tpl_vars) html_body = await render_template("_email/magic_link.html", **tpl_vars) if not host or not username or not password: # Fallback: log to console current_app.logger.warning( "SMTP not configured. Printing magic link to console for %s: %s", to_email, link_url, ) print(f"[DEV] Magic link for {to_email}: {link_url}") return # Lazy import to avoid dependency unless used import aiosmtplib from email.message import EmailMessage msg = EmailMessage() msg["From"] = mail_from msg["To"] = to_email msg["Subject"] = subject msg.set_content(text_body) msg.add_alternative(html_body, subtype="html") is_secure = port == 465 # implicit TLS if true if is_secure: # implicit TLS (like nodemailer secure: true) smtp = aiosmtplib.SMTP( hostname=host, port=port, use_tls=True, username=username, password=password, ) else: # plain connect then STARTTLS (like secure: false but with TLS upgrade) smtp = aiosmtplib.SMTP( hostname=host, port=port, start_tls=True, username=username, password=password, ) async with smtp: await smtp.send_message(msg) async def load_user_by_id(session: AsyncSession, user_id: int) -> Optional[User]: """ Load a user by ID with labels and newsletters eagerly loaded. Args: session: Database session user_id: User ID to load Returns: User object or None if not found """ stmt = ( select(User) .options( selectinload(User.labels), selectinload(User.user_newsletters).selectinload( UserNewsletter.newsletter ), ) .where(User.id == user_id) ) result = await session.execute(stmt) return result.scalar_one_or_none() async def find_or_create_user(session: AsyncSession, email: str) -> User: """ Find existing user by email or create a new one. Args: session: Database session email: User email address (should be lowercase and trimmed) Returns: User object (either existing or newly created) """ result = await session.execute(select(User).where(User.email == email)) user = result.scalar_one_or_none() if user is None: user = User(email=email) session.add(user) await session.flush() # Ensure user.id exists return user async def create_magic_link( session: AsyncSession, user_id: int, purpose: str = "signin", expires_minutes: int = 15, ) -> Tuple[str, datetime]: """ Create a new magic link token for authentication. Args: session: Database session user_id: User ID to create link for purpose: Purpose of the link (default: "signin") expires_minutes: Minutes until expiration (default: 15) Returns: Tuple of (token, expires_at) """ token = secrets.token_urlsafe(32) expires = datetime.now(timezone.utc) + timedelta(minutes=expires_minutes) ml = MagicLink( token=token, user_id=user_id, purpose=purpose, expires_at=expires, ip=request.headers.get("x-forwarded-for", request.remote_addr), user_agent=request.headers.get("user-agent"), ) session.add(ml) return token, expires async def validate_magic_link( session: AsyncSession, token: str, ) -> Tuple[Optional[User], Optional[str]]: """ Validate and consume a magic link token. Args: session: Database session (should be in a transaction) token: Magic link token to validate Returns: Tuple of (user, error_message) - If user is None, error_message contains the reason - If user is returned, the link was valid and has been consumed """ now = datetime.now(timezone.utc) ml = await session.scalar( select(MagicLink) .where(MagicLink.token == token) .with_for_update() ) if not ml or ml.purpose != "signin": return None, "Invalid or expired link." if ml.used_at or ml.expires_at < now: return None, "This link has expired. Please request a new one." user = await session.get(User, ml.user_id) if not user: return None, "User not found." # Mark link as used ml.used_at = now return user, None def validate_email(email: str) -> Tuple[bool, str]: """ Validate email address format. Args: email: Email address to validate Returns: Tuple of (is_valid, normalized_email) """ email = email.strip().lower() if not email or "@" not in email: return False, email return True, email