"""Authentication routes for the account app. Account is the OAuth authorization server. Owns magic link login/logout, OAuth2 authorize endpoint, grant verification, and SSO logout. """ from __future__ import annotations import json import secrets from datetime import datetime, timezone, timedelta from quart import ( Blueprint, request, redirect, url_for, session as qsession, g, current_app, jsonify, ) from sqlalchemy import select, update from sqlalchemy.exc import SQLAlchemyError from shared.db.session import get_session from shared.models import User from shared.models.oauth_code import OAuthCode from shared.models.oauth_grant import OAuthGrant, hash_token from shared.infrastructure.urls import account_url, app_url from shared.infrastructure.cart_identity import current_cart_identity from shared.infrastructure.rate_limit import rate_limit, check_poll_backoff from shared.events import emit_activity from .services import ( pop_login_redirect_target, store_login_redirect_target, send_magic_email, find_or_create_user, create_magic_link, validate_magic_link, validate_email, ) SESSION_USER_KEY = "uid" ACCOUNT_SESSION_KEY = "account_sid" ALLOWED_CLIENTS = {"blog", "market", "cart", "events", "federation", "orders", "test", "artdag", "artdag_l2"} def register(url_prefix="/auth"): auth_bp = Blueprint("auth", __name__, url_prefix=url_prefix) # --- OAuth2 authorize endpoint ------------------------------------------- @auth_bp.get("/oauth/authorize") @auth_bp.get("/oauth/authorize/") async def oauth_authorize(): client_id = request.args.get("client_id", "") redirect_uri = request.args.get("redirect_uri", "") state = request.args.get("state", "") device_id = request.args.get("device_id", "") prompt = request.args.get("prompt", "") if client_id not in ALLOWED_CLIENTS: return "Invalid client_id", 400 expected_redirect = app_url(client_id, "/auth/callback") if redirect_uri != expected_redirect: return "Invalid redirect_uri", 400 # Account's own device id — always available via factory hook account_did = g.device_id # Not logged in if not g.get("user"): if prompt == "none": # Silent check — pass account_did so client can watch for future logins sep = "&" if "?" in redirect_uri else "?" return redirect( f"{redirect_uri}{sep}error=login_required" f"&state={state}&account_did={account_did}" ) authorize_path = request.full_path store_login_redirect_target() return redirect(url_for("auth.login_form", next=authorize_path)) # Logged in — create grant + authorization code account_sid = qsession.get(ACCOUNT_SESSION_KEY) if not account_sid: account_sid = secrets.token_urlsafe(32) qsession[ACCOUNT_SESSION_KEY] = account_sid grant_token = secrets.token_urlsafe(48) code = secrets.token_urlsafe(48) now = datetime.now(timezone.utc) expires = now + timedelta(minutes=5) async with get_session() as s: async with s.begin(): grant = OAuthGrant( token=None, token_hash=hash_token(grant_token), user_id=g.user.id, client_id=client_id, issuer_session=account_sid, device_id=device_id or None, ) s.add(grant) oauth_code = OAuthCode( code=None, code_hash=hash_token(code), user_id=g.user.id, client_id=client_id, redirect_uri=redirect_uri, expires_at=expires, grant_token=None, grant_token_hash=hash_token(grant_token), ) s.add(oauth_code) sep = "&" if "?" in redirect_uri else "?" return redirect( f"{redirect_uri}{sep}code={code}&state={state}" f"&account_did={account_did}&grant_token={grant_token}" ) # --- OAuth2 token exchange (for external clients like artdag) ------------- from shared.browser.app.csrf import csrf_exempt @csrf_exempt @auth_bp.post("/oauth/token") @auth_bp.post("/oauth/token/") async def oauth_token(): """Exchange an authorization code for user info + grant token. Used by clients that don't share the coop database (e.g. artdag). Accepts JSON: {code, client_id, redirect_uri} Returns JSON: {user_id, username, display_name, grant_token} """ data = await request.get_json() if not data: return jsonify({"error": "invalid_request"}), 400 code = data.get("code", "") client_id = data.get("client_id", "") redirect_uri = data.get("redirect_uri", "") if client_id not in ALLOWED_CLIENTS: return jsonify({"error": "invalid_client"}), 400 now = datetime.now(timezone.utc) code_h = hash_token(code) async with get_session() as s: async with s.begin(): # Look up by hash first (new grants), fall back to plaintext (migration) result = await s.execute( select(OAuthCode) .where( (OAuthCode.code_hash == code_h) | (OAuthCode.code == code) ) .with_for_update() ) oauth_code = result.scalar_one_or_none() if not oauth_code: return jsonify({"error": "invalid_grant"}), 400 if oauth_code.used_at is not None: return jsonify({"error": "invalid_grant"}), 400 if oauth_code.expires_at < now: return jsonify({"error": "invalid_grant"}), 400 if oauth_code.client_id != client_id: return jsonify({"error": "invalid_grant"}), 400 if oauth_code.redirect_uri != redirect_uri: return jsonify({"error": "invalid_grant"}), 400 oauth_code.used_at = now user_id = oauth_code.user_id grant_token = oauth_code.grant_token user = await s.get(User, user_id) if not user: return jsonify({"error": "invalid_grant"}), 400 return jsonify({ "user_id": user_id, "username": user.email or "", "display_name": user.name or "", "grant_token": grant_token, }) # --- Grant verification (internal endpoint) ------------------------------ @auth_bp.get("/internal/verify-grant") async def verify_grant(): """Called by client apps to check if a grant is still valid.""" token = request.args.get("token", "") if not token: return jsonify({"valid": False}), 200 token_h = hash_token(token) async with get_session() as s: grant = await s.scalar( select(OAuthGrant).where( (OAuthGrant.token_hash == token_h) | (OAuthGrant.token == token) ) ) if not grant or grant.revoked_at is not None: return jsonify({"valid": False}), 200 user = await s.get(User, grant.user_id) return jsonify({ "valid": True, "user_id": grant.user_id, "username": user.email if user else "", "display_name": user.name if user else "", }), 200 @auth_bp.get("/internal/check-device") async def check_device(): """Called by client apps to check if a device has an active auth. Looks up the most recent grant for (device_id, client_id). If the grant is active → {active: true}. If revoked but user has logged in since → {active: true} (re-auth needed). Otherwise → {active: false}. """ device_id = request.args.get("device_id", "") app_name = request.args.get("app", "") if not device_id or not app_name: return jsonify({"active": False}), 200 async with get_session() as s: # Find the most recent grant for this device + app result = await s.execute( select(OAuthGrant) .where(OAuthGrant.device_id == device_id) .where(OAuthGrant.client_id == app_name) .order_by(OAuthGrant.created_at.desc()) .limit(1) ) grant = result.scalar_one_or_none() if not grant: return jsonify({"active": False}), 200 # Grant still active if grant.revoked_at is None: return jsonify({"active": True}), 200 # Grant revoked — check if user logged in since user = await s.get(User, grant.user_id) if user and user.last_login_at and user.last_login_at > grant.revoked_at: return jsonify({"active": True}), 200 return jsonify({"active": False}), 200 # --- Magic link login flow ----------------------------------------------- @auth_bp.get("/login/") async def login_form(): store_login_redirect_target() cross_cart_sid = request.args.get("cart_sid") if cross_cart_sid: import re # Validate cart_sid is a hex token (32 chars from token_hex(16)) if re.fullmatch(r"[0-9a-f]{32}", cross_cart_sid): qsession["cart_sid"] = cross_cart_sid if g.get("user"): redirect_url = pop_login_redirect_target() return redirect(redirect_url) from shared.sx.page import get_template_context from sx.sx_components import render_login_page ctx = await get_template_context() return await render_login_page(ctx) @rate_limit( key_func=lambda: request.headers.get("X-Forwarded-For", request.remote_addr), max_requests=10, window_seconds=900, scope="magic_ip", ) @auth_bp.post("/start/") async def start_login(): form = await request.form email_input = form.get("email") or "" is_valid, email = validate_email(email_input) if not is_valid: from shared.sx.page import get_template_context from sx.sx_components import render_login_page ctx = await get_template_context(error="Please enter a valid email address.", email=email_input) return await render_login_page(ctx), 400 # Per-email rate limit: 5 magic links per 15 minutes from shared.infrastructure.rate_limit import _check_rate_limit try: allowed, _ = await _check_rate_limit(f"magic_email:{email}", 5, 900) if not allowed: from shared.sx.page import get_template_context from sx.sx_components import render_check_email_page ctx = await get_template_context(email=email, email_error=None) return await render_check_email_page(ctx), 200 except Exception: pass # Redis down — allow the request user = await find_or_create_user(g.s, email) token, expires = await create_magic_link(g.s, user.id) from shared.utils import host_url magic_url = host_url(url_for("auth.magic", token=token)) email_error = None try: await send_magic_email(email, magic_url) except Exception as e: current_app.logger.error("EMAIL SEND FAILED: %r", e) email_error = ( "We couldn't send the email automatically. " "Please try again in a moment." ) from shared.sx.page import get_template_context from sx.sx_components import render_check_email_page ctx = await get_template_context(email=email, email_error=email_error) return await render_check_email_page(ctx) @auth_bp.get("/magic//") async def magic(token: str): now = datetime.now(timezone.utc) user_id: int | None = None try: async with get_session() as s: async with s.begin(): user, error = await validate_magic_link(s, token) if error: from shared.sx.page import get_template_context from sx.sx_components import render_login_page ctx = await get_template_context(error=error) return await render_login_page(ctx), 400 user_id = user.id except Exception: from shared.sx.page import get_template_context from sx.sx_components import render_login_page ctx = await get_template_context(error="Could not sign you in right now. Please try again.") return await render_login_page(ctx), 502 assert user_id is not None ident = current_cart_identity() anon_session_id = ident.get("session_id") try: async with get_session() as s: async with s.begin(): u2 = await s.get(User, user_id) if u2: u2.last_login_at = now if anon_session_id: await emit_activity( s, activity_type="rose:Login", actor_uri="internal:system", object_type="Person", object_data={ "user_id": user_id, "session_id": anon_session_id, }, ) # Notify external services of device login await emit_activity( s, activity_type="rose:DeviceAuth", actor_uri="internal:system", object_type="Device", object_data={ "device_id": g.device_id, "action": "login", }, ) except SQLAlchemyError: current_app.logger.exception( "[auth] non-fatal DB update for user_id=%s", user_id ) qsession[SESSION_USER_KEY] = user_id # Fresh account session ID for grant tracking qsession[ACCOUNT_SESSION_KEY] = secrets.token_urlsafe(32) # Signal login for this device so client apps can detect it try: from shared.infrastructure.auth_redis import get_auth_redis import time as _time _auth_r = await get_auth_redis() await _auth_r.set( f"did_auth:{g.device_id}", str(_time.time()).encode(), ex=30 * 24 * 3600, ) except Exception: current_app.logger.exception("[auth] failed to set did_auth in Redis") redirect_url = pop_login_redirect_target() return redirect(redirect_url, 303) @auth_bp.post("/logout/") async def logout(): # Revoke all grants issued by this account session account_sid = qsession.get(ACCOUNT_SESSION_KEY) if account_sid: try: async with get_session() as s: async with s.begin(): await s.execute( update(OAuthGrant) .where(OAuthGrant.issuer_session == account_sid) .where(OAuthGrant.revoked_at.is_(None)) .values(revoked_at=datetime.now(timezone.utc)) ) except SQLAlchemyError: current_app.logger.exception("[auth] failed to revoke grants") # Clear login signal for this device try: from shared.infrastructure.auth_redis import get_auth_redis _auth_r = await get_auth_redis() await _auth_r.delete(f"did_auth:{g.device_id}") except Exception: pass # Notify external services of device logout try: async with get_session() as s: async with s.begin(): await emit_activity( s, activity_type="rose:DeviceAuth", actor_uri="internal:system", object_type="Device", object_data={ "device_id": g.device_id, "action": "logout", }, ) except Exception: current_app.logger.exception("[auth] failed to emit DeviceAuth logout") qsession.pop(SESSION_USER_KEY, None) qsession.pop(ACCOUNT_SESSION_KEY, None) from shared.infrastructure.urls import blog_url return redirect(blog_url("/")) @auth_bp.get("/sso-logout/") async def sso_logout(): """SSO logout called by client apps: revoke grants, clear session.""" account_sid = qsession.get(ACCOUNT_SESSION_KEY) if account_sid: try: async with get_session() as s: async with s.begin(): await s.execute( update(OAuthGrant) .where(OAuthGrant.issuer_session == account_sid) .where(OAuthGrant.revoked_at.is_(None)) .values(revoked_at=datetime.now(timezone.utc)) ) except SQLAlchemyError: current_app.logger.exception("[auth] failed to revoke grants") # Clear login signal for this device try: from shared.infrastructure.auth_redis import get_auth_redis _auth_r = await get_auth_redis() await _auth_r.delete(f"did_auth:{g.device_id}") except Exception: pass # Notify external services of device logout try: async with get_session() as s: async with s.begin(): await emit_activity( s, activity_type="rose:DeviceAuth", actor_uri="internal:system", object_type="Device", object_data={ "device_id": g.device_id, "action": "logout", }, ) except Exception: current_app.logger.exception("[auth] failed to emit DeviceAuth logout") qsession.pop(SESSION_USER_KEY, None) qsession.pop(ACCOUNT_SESSION_KEY, None) from shared.infrastructure.urls import blog_url return redirect(blog_url("/")) @auth_bp.get("/clear/") async def clear(): """One-time migration helper: clear all session cookies.""" qsession.clear() resp = redirect(account_url("/")) resp.delete_cookie("blog_session", domain=".rose-ash.com", path="/") return resp # --- Device Authorization Flow (RFC 8628) --------------------------------- _DEVICE_ALPHABET = "ABCDEFGHJKMNPQRSTVWXYZ" _DEVICE_CODE_TTL = 900 # 15 minutes _DEVICE_POLL_INTERVAL = 5 def _generate_user_code() -> str: """Generate an unambiguous 8-char user code like KBMN-TWRP.""" chars = [secrets.choice(_DEVICE_ALPHABET) for _ in range(8)] return "".join(chars[:4]) + "-" + "".join(chars[4:]) async def _approve_device(device_code: str, user) -> bool: """Approve a pending device flow and create an OAuthGrant.""" from shared.infrastructure.auth_redis import get_auth_redis r = await get_auth_redis() raw = await r.get(f"devflow:{device_code}") if not raw: return False blob = json.loads(raw) if blob.get("status") != "pending": return False account_sid = qsession.get(ACCOUNT_SESSION_KEY) if not account_sid: account_sid = secrets.token_urlsafe(32) qsession[ACCOUNT_SESSION_KEY] = account_sid grant_token = secrets.token_urlsafe(48) async with get_session() as s: async with s.begin(): grant = OAuthGrant( token=None, token_hash=hash_token(grant_token), user_id=user.id, client_id=blob["client_id"], issuer_session=account_sid, ) s.add(grant) # Update Redis blob blob["status"] = "approved" blob["user_id"] = user.id blob["grant_token"] = grant_token user_code = blob["user_code"] ttl = await r.ttl(f"devflow:{device_code}") if ttl and ttl > 0: await r.set(f"devflow:{device_code}", json.dumps(blob).encode(), ex=ttl) else: await r.set(f"devflow:{device_code}", json.dumps(blob).encode(), ex=_DEVICE_CODE_TTL) # Remove reverse lookup (code already used) normalized_uc = user_code.replace("-", "").upper() await r.delete(f"devflow_uc:{normalized_uc}") return True @rate_limit( key_func=lambda: request.headers.get("X-Forwarded-For", request.remote_addr), max_requests=10, window_seconds=3600, scope="dev_auth", ) @csrf_exempt @auth_bp.post("/device/authorize") @auth_bp.post("/device/authorize/") async def device_authorize(): """RFC 8628 — CLI requests a device code.""" data = await request.get_json(silent=True) or {} client_id = data.get("client_id", "") if client_id not in ALLOWED_CLIENTS: return jsonify({"error": "invalid_client"}), 400 device_code = secrets.token_urlsafe(32) user_code = _generate_user_code() from shared.infrastructure.auth_redis import get_auth_redis r = await get_auth_redis() blob = json.dumps({ "client_id": client_id, "user_code": user_code, "status": "pending", "user_id": None, "grant_token": None, }).encode() normalized_uc = user_code.replace("-", "").upper() pipe = r.pipeline() pipe.set(f"devflow:{device_code}", blob, ex=_DEVICE_CODE_TTL) pipe.set(f"devflow_uc:{normalized_uc}", device_code.encode(), ex=_DEVICE_CODE_TTL) await pipe.execute() verification_uri = account_url("/auth/device") return jsonify({ "device_code": device_code, "user_code": user_code, "verification_uri": verification_uri, "expires_in": _DEVICE_CODE_TTL, "interval": _DEVICE_POLL_INTERVAL, }) @csrf_exempt @auth_bp.post("/device/token") @auth_bp.post("/device/token/") async def device_token(): """RFC 8628 — CLI polls for the grant token.""" data = await request.get_json(silent=True) or {} device_code = data.get("device_code", "") client_id = data.get("client_id", "") if not device_code or client_id not in ALLOWED_CLIENTS: return jsonify({"error": "invalid_request"}), 400 # Enforce polling backoff per RFC 8628 try: poll_ok, interval = await check_poll_backoff(device_code) if not poll_ok: return jsonify({"error": "slow_down", "interval": interval}), 400 except Exception: pass # Redis down — allow the request from shared.infrastructure.auth_redis import get_auth_redis r = await get_auth_redis() raw = await r.get(f"devflow:{device_code}") if not raw: return jsonify({"error": "expired_token"}), 400 blob = json.loads(raw) if blob.get("client_id") != client_id: return jsonify({"error": "invalid_request"}), 400 if blob["status"] == "pending": return jsonify({"error": "authorization_pending"}), 428 if blob["status"] == "denied": return jsonify({"error": "access_denied"}), 400 if blob["status"] == "approved": async with get_session() as s: user = await s.get(User, blob["user_id"]) if not user: return jsonify({"error": "access_denied"}), 400 # Clean up Redis await r.delete(f"devflow:{device_code}") return jsonify({ "access_token": blob["grant_token"], "token_type": "bearer", "user_id": blob["user_id"], "username": user.email or "", "display_name": user.name or "", }) return jsonify({"error": "invalid_request"}), 400 @auth_bp.get("/device") @auth_bp.get("/device/") async def device_form(): """Browser form where user enters the code displayed in terminal.""" from shared.sx.page import get_template_context from sx.sx_components import render_device_page code = request.args.get("code", "") ctx = await get_template_context(code=code) return await render_device_page(ctx) @auth_bp.post("/device") @auth_bp.post("/device/") async def device_submit(): """Browser submit — validates code, approves if logged in.""" form = await request.form user_code = (form.get("code") or "").strip().replace("-", "").upper() if not user_code or len(user_code) != 8: from shared.sx.page import get_template_context from sx.sx_components import render_device_page ctx = await get_template_context(error="Please enter a valid 8-character code.", code=form.get("code", "")) return await render_device_page(ctx), 400 from shared.infrastructure.auth_redis import get_auth_redis r = await get_auth_redis() device_code = await r.get(f"devflow_uc:{user_code}") if not device_code: from shared.sx.page import get_template_context from sx.sx_components import render_device_page ctx = await get_template_context(error="Code not found or expired. Please try again.", code=form.get("code", "")) return await render_device_page(ctx), 400 if isinstance(device_code, bytes): device_code = device_code.decode() # Not logged in — redirect to login, then come back to complete if not g.get("user"): complete_url = url_for("auth.device_complete", code=device_code) store_login_redirect_target() return redirect(url_for("auth.login_form", next=complete_url)) # Logged in — approve immediately ok = await _approve_device(device_code, g.user) if not ok: from shared.sx.page import get_template_context from sx.sx_components import render_device_page ctx = await get_template_context(error="Code expired or already used.") return await render_device_page(ctx), 400 from shared.sx.page import get_template_context from sx.sx_components import render_device_approved_page ctx = await get_template_context() return await render_device_approved_page(ctx) @auth_bp.get("/device/complete") @auth_bp.get("/device/complete/") async def device_complete(): """Post-login redirect — completes approval after magic link auth.""" from shared.sx.page import get_template_context from sx.sx_components import render_device_page, render_device_approved_page device_code = request.args.get("code", "") if not device_code: return redirect(url_for("auth.device_form")) if not g.get("user"): store_login_redirect_target() return redirect(url_for("auth.login_form")) ok = await _approve_device(device_code, g.user) if not ok: ctx = await get_template_context( error="Code expired or already used. Please start the login process again in your terminal.", ) return await render_device_page(ctx), 400 ctx = await get_template_context() return await render_device_approved_page(ctx) return auth_bp