"""Auth operations for the account app. Owns magic-link login. Shared models, shared config. """ from __future__ import annotations import os import secrets from datetime import datetime, timedelta, timezone from typing import Optional, Tuple from quart import current_app, render_template, request, g from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.orm import selectinload from shared.models import User, MagicLink from shared.config import config def get_app_host() -> str: host = ( config().get("host") or os.getenv("APP_HOST") or "http://localhost:8000" ).rstrip("/") return host def get_app_root() -> str: root = (g.root).rstrip("/") return root async def send_magic_email(to_email: str, link_url: str) -> None: host = os.getenv("SMTP_HOST") port = int(os.getenv("SMTP_PORT") or "587") username = os.getenv("SMTP_USER") password = os.getenv("SMTP_PASS") mail_from = os.getenv("MAIL_FROM") or "no-reply@example.com" site_name = config().get("title", "Rose Ash") subject = f"Your sign-in link \u2014 {site_name}" tpl_vars = dict(site_name=site_name, link_url=link_url) text_body = await render_template("_email/magic_link.txt", **tpl_vars) html_body = await render_template("_email/magic_link.html", **tpl_vars) if not host or not username or not password: current_app.logger.warning( "SMTP not configured. Printing magic link to console for %s: %s", to_email, link_url, ) print(f"[DEV] Magic link for {to_email}: {link_url}") return import aiosmtplib from email.message import EmailMessage msg = EmailMessage() msg["From"] = mail_from msg["To"] = to_email msg["Subject"] = subject msg.set_content(text_body) msg.add_alternative(html_body, subtype="html") is_secure = port == 465 if is_secure: smtp = aiosmtplib.SMTP( hostname=host, port=port, use_tls=True, username=username, password=password, ) else: smtp = aiosmtplib.SMTP( hostname=host, port=port, start_tls=True, username=username, password=password, ) async with smtp: await smtp.send_message(msg) async def load_user_by_id(session: AsyncSession, user_id: int) -> Optional[User]: stmt = ( select(User) .options(selectinload(User.labels)) .where(User.id == user_id) ) result = await session.execute(stmt) return result.scalar_one_or_none() async def find_or_create_user(session: AsyncSession, email: str) -> User: result = await session.execute(select(User).where(User.email == email)) user = result.scalar_one_or_none() if user is None: user = User(email=email) session.add(user) await session.flush() return user async def create_magic_link( session: AsyncSession, user_id: int, purpose: str = "signin", expires_minutes: int = 15, ) -> Tuple[str, datetime]: token = secrets.token_urlsafe(32) expires = datetime.now(timezone.utc) + timedelta(minutes=expires_minutes) ml = MagicLink( token=token, user_id=user_id, purpose=purpose, expires_at=expires, ip=request.headers.get("x-forwarded-for", request.remote_addr), user_agent=request.headers.get("user-agent"), ) session.add(ml) return token, expires async def validate_magic_link( session: AsyncSession, token: str, ) -> Tuple[Optional[User], Optional[str]]: now = datetime.now(timezone.utc) ml = await session.scalar( select(MagicLink) .where(MagicLink.token == token) .with_for_update() ) if not ml or ml.purpose != "signin": return None, "Invalid or expired link." if ml.used_at or ml.expires_at < now: return None, "This link has expired. Please request a new one." user = await session.get(User, ml.user_id) if not user: return None, "User not found." ml.used_at = now return user, None def validate_email(email: str) -> Tuple[bool, str]: email = email.strip().lower() if not email or "@" not in email: return False, email return True, email