This repository has been archived on 2026-02-24. You can view files and clone it. You cannot open issues or pull requests or push a commit.
Files
account/bp/auth/routes.py
giles 3e0669a335
All checks were successful
Build and Deploy / build-and-deploy (push) Successful in 48s
Exempt oauth/token from CSRF — server-to-server API endpoint
External clients like artdag POST to this endpoint from their
backend, so there's no browser session with a CSRF token.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-24 01:06:19 +00:00

487 lines
17 KiB
Python

"""Authentication routes for the account app.
Account is the OAuth authorization server. Owns magic link login/logout,
OAuth2 authorize endpoint, grant verification, and SSO logout.
"""
from __future__ import annotations
import secrets
from datetime import datetime, timezone, timedelta
from quart import (
Blueprint,
request,
render_template,
redirect,
url_for,
session as qsession,
g,
current_app,
jsonify,
)
from sqlalchemy import select, update
from sqlalchemy.exc import SQLAlchemyError
from shared.db.session import get_session
from shared.models import User
from shared.models.oauth_code import OAuthCode
from shared.models.oauth_grant import OAuthGrant
from shared.infrastructure.urls import account_url, app_url
from shared.infrastructure.cart_identity import current_cart_identity
from shared.events import emit_activity
from .services import (
pop_login_redirect_target,
store_login_redirect_target,
send_magic_email,
find_or_create_user,
create_magic_link,
validate_magic_link,
validate_email,
)
SESSION_USER_KEY = "uid"
ACCOUNT_SESSION_KEY = "account_sid"
ALLOWED_CLIENTS = {"blog", "market", "cart", "events", "federation", "artdag"}
def register(url_prefix="/auth"):
auth_bp = Blueprint("auth", __name__, url_prefix=url_prefix)
# --- OAuth2 authorize endpoint -------------------------------------------
@auth_bp.get("/oauth/authorize")
@auth_bp.get("/oauth/authorize/")
async def oauth_authorize():
client_id = request.args.get("client_id", "")
redirect_uri = request.args.get("redirect_uri", "")
state = request.args.get("state", "")
device_id = request.args.get("device_id", "")
prompt = request.args.get("prompt", "")
if client_id not in ALLOWED_CLIENTS:
return "Invalid client_id", 400
expected_redirect = app_url(client_id, "/auth/callback")
if redirect_uri != expected_redirect:
return "Invalid redirect_uri", 400
# Account's own device id — always available via factory hook
account_did = g.device_id
# Not logged in
if not g.get("user"):
if prompt == "none":
# Silent check — pass account_did so client can watch for future logins
sep = "&" if "?" in redirect_uri else "?"
return redirect(
f"{redirect_uri}{sep}error=login_required"
f"&state={state}&account_did={account_did}"
)
authorize_path = request.full_path
store_login_redirect_target()
return redirect(url_for("auth.login_form", next=authorize_path))
# Logged in — create grant + authorization code
account_sid = qsession.get(ACCOUNT_SESSION_KEY)
if not account_sid:
account_sid = secrets.token_urlsafe(32)
qsession[ACCOUNT_SESSION_KEY] = account_sid
grant_token = secrets.token_urlsafe(48)
code = secrets.token_urlsafe(48)
now = datetime.now(timezone.utc)
expires = now + timedelta(minutes=5)
async with get_session() as s:
async with s.begin():
grant = OAuthGrant(
token=grant_token,
user_id=g.user.id,
client_id=client_id,
issuer_session=account_sid,
device_id=device_id or None,
)
s.add(grant)
oauth_code = OAuthCode(
code=code,
user_id=g.user.id,
client_id=client_id,
redirect_uri=redirect_uri,
expires_at=expires,
grant_token=grant_token,
)
s.add(oauth_code)
sep = "&" if "?" in redirect_uri else "?"
return redirect(
f"{redirect_uri}{sep}code={code}&state={state}"
f"&account_did={account_did}"
)
# --- OAuth2 token exchange (for external clients like artdag) -------------
from shared.browser.app.csrf import csrf_exempt
@csrf_exempt
@auth_bp.post("/oauth/token")
@auth_bp.post("/oauth/token/")
async def oauth_token():
"""Exchange an authorization code for user info + grant token.
Used by clients that don't share the coop database (e.g. artdag).
Accepts JSON: {code, client_id, redirect_uri}
Returns JSON: {user_id, username, display_name, grant_token}
"""
data = await request.get_json()
if not data:
return jsonify({"error": "invalid_request"}), 400
code = data.get("code", "")
client_id = data.get("client_id", "")
redirect_uri = data.get("redirect_uri", "")
if client_id not in ALLOWED_CLIENTS:
return jsonify({"error": "invalid_client"}), 400
now = datetime.now(timezone.utc)
async with get_session() as s:
async with s.begin():
result = await s.execute(
select(OAuthCode)
.where(OAuthCode.code == code)
.with_for_update()
)
oauth_code = result.scalar_one_or_none()
if not oauth_code:
return jsonify({"error": "invalid_grant"}), 400
if oauth_code.used_at is not None:
return jsonify({"error": "invalid_grant"}), 400
if oauth_code.expires_at < now:
return jsonify({"error": "invalid_grant"}), 400
if oauth_code.client_id != client_id:
return jsonify({"error": "invalid_grant"}), 400
if oauth_code.redirect_uri != redirect_uri:
return jsonify({"error": "invalid_grant"}), 400
oauth_code.used_at = now
user_id = oauth_code.user_id
grant_token = oauth_code.grant_token
user = await s.get(User, user_id)
if not user:
return jsonify({"error": "invalid_grant"}), 400
return jsonify({
"user_id": user_id,
"username": user.email or "",
"display_name": user.name or "",
"grant_token": grant_token,
})
# --- Grant verification (internal endpoint) ------------------------------
@auth_bp.get("/internal/verify-grant")
async def verify_grant():
"""Called by client apps to check if a grant is still valid."""
token = request.args.get("token", "")
if not token:
return jsonify({"valid": False}), 200
async with get_session() as s:
grant = await s.scalar(
select(OAuthGrant).where(OAuthGrant.token == token)
)
if not grant or grant.revoked_at is not None:
return jsonify({"valid": False}), 200
return jsonify({"valid": True}), 200
@auth_bp.get("/internal/check-device")
async def check_device():
"""Called by client apps to check if a device has an active auth.
Looks up the most recent grant for (device_id, client_id).
If the grant is active → {active: true}.
If revoked but user has logged in since → {active: true} (re-auth needed).
Otherwise → {active: false}.
"""
device_id = request.args.get("device_id", "")
app_name = request.args.get("app", "")
if not device_id or not app_name:
return jsonify({"active": False}), 200
async with get_session() as s:
# Find the most recent grant for this device + app
result = await s.execute(
select(OAuthGrant)
.where(OAuthGrant.device_id == device_id)
.where(OAuthGrant.client_id == app_name)
.order_by(OAuthGrant.created_at.desc())
.limit(1)
)
grant = result.scalar_one_or_none()
if not grant:
return jsonify({"active": False}), 200
# Grant still active
if grant.revoked_at is None:
return jsonify({"active": True}), 200
# Grant revoked — check if user logged in since
user = await s.get(User, grant.user_id)
if user and user.last_login_at and user.last_login_at > grant.revoked_at:
return jsonify({"active": True}), 200
return jsonify({"active": False}), 200
# --- Magic link login flow -----------------------------------------------
@auth_bp.get("/login/")
async def login_form():
store_login_redirect_target()
cross_cart_sid = request.args.get("cart_sid")
if cross_cart_sid:
qsession["cart_sid"] = cross_cart_sid
if g.get("user"):
redirect_url = pop_login_redirect_target()
return redirect(redirect_url)
return await render_template("auth/login.html")
@auth_bp.post("/start/")
async def start_login():
form = await request.form
email_input = form.get("email") or ""
is_valid, email = validate_email(email_input)
if not is_valid:
return (
await render_template(
"auth/login.html",
error="Please enter a valid email address.",
email=email_input,
),
400,
)
user = await find_or_create_user(g.s, email)
token, expires = await create_magic_link(g.s, user.id)
from shared.utils import host_url
magic_url = host_url(url_for("auth.magic", token=token))
email_error = None
try:
await send_magic_email(email, magic_url)
except Exception as e:
current_app.logger.error("EMAIL SEND FAILED: %r", e)
email_error = (
"We couldn't send the email automatically. "
"Please try again in a moment."
)
return await render_template(
"auth/check_email.html",
email=email,
email_error=email_error,
)
@auth_bp.get("/magic/<token>/")
async def magic(token: str):
now = datetime.now(timezone.utc)
user_id: int | None = None
try:
async with get_session() as s:
async with s.begin():
user, error = await validate_magic_link(s, token)
if error:
return (
await render_template("auth/login.html", error=error),
400,
)
user_id = user.id
except Exception:
return (
await render_template(
"auth/login.html",
error="Could not sign you in right now. Please try again.",
),
502,
)
assert user_id is not None
ident = current_cart_identity()
anon_session_id = ident.get("session_id")
try:
async with get_session() as s:
async with s.begin():
u2 = await s.get(User, user_id)
if u2:
u2.last_login_at = now
if anon_session_id:
await emit_activity(
s,
activity_type="rose:Login",
actor_uri="internal:system",
object_type="Person",
object_data={
"user_id": user_id,
"session_id": anon_session_id,
},
)
# Notify external services of device login
await emit_activity(
s,
activity_type="rose:DeviceAuth",
actor_uri="internal:system",
object_type="Device",
object_data={
"device_id": g.device_id,
"action": "login",
},
)
except SQLAlchemyError:
current_app.logger.exception(
"[auth] non-fatal DB update for user_id=%s", user_id
)
qsession[SESSION_USER_KEY] = user_id
# Fresh account session ID for grant tracking
qsession[ACCOUNT_SESSION_KEY] = secrets.token_urlsafe(32)
# Signal login for this device so client apps can detect it
try:
from shared.browser.app.redis_cacher import get_redis
import time as _time
_redis = get_redis()
if _redis:
await _redis.set(
f"did_auth:{g.device_id}",
str(_time.time()).encode(),
ex=30 * 24 * 3600,
)
except Exception:
current_app.logger.exception("[auth] failed to set did_auth in Redis")
redirect_url = pop_login_redirect_target()
return redirect(redirect_url, 303)
@auth_bp.post("/logout/")
async def logout():
# Revoke all grants issued by this account session
account_sid = qsession.get(ACCOUNT_SESSION_KEY)
if account_sid:
try:
async with get_session() as s:
async with s.begin():
await s.execute(
update(OAuthGrant)
.where(OAuthGrant.issuer_session == account_sid)
.where(OAuthGrant.revoked_at.is_(None))
.values(revoked_at=datetime.now(timezone.utc))
)
except SQLAlchemyError:
current_app.logger.exception("[auth] failed to revoke grants")
# Clear login signal for this device
try:
from shared.browser.app.redis_cacher import get_redis
_redis = get_redis()
if _redis:
await _redis.delete(f"did_auth:{g.device_id}")
except Exception:
pass
# Notify external services of device logout
try:
async with get_session() as s:
async with s.begin():
await emit_activity(
s,
activity_type="rose:DeviceAuth",
actor_uri="internal:system",
object_type="Device",
object_data={
"device_id": g.device_id,
"action": "logout",
},
)
except Exception:
current_app.logger.exception("[auth] failed to emit DeviceAuth logout")
qsession.pop(SESSION_USER_KEY, None)
qsession.pop(ACCOUNT_SESSION_KEY, None)
from shared.infrastructure.urls import blog_url
return redirect(blog_url("/"))
@auth_bp.get("/sso-logout/")
async def sso_logout():
"""SSO logout called by client apps: revoke grants, clear session."""
account_sid = qsession.get(ACCOUNT_SESSION_KEY)
if account_sid:
try:
async with get_session() as s:
async with s.begin():
await s.execute(
update(OAuthGrant)
.where(OAuthGrant.issuer_session == account_sid)
.where(OAuthGrant.revoked_at.is_(None))
.values(revoked_at=datetime.now(timezone.utc))
)
except SQLAlchemyError:
current_app.logger.exception("[auth] failed to revoke grants")
# Clear login signal for this device
try:
from shared.browser.app.redis_cacher import get_redis
_redis = get_redis()
if _redis:
await _redis.delete(f"did_auth:{g.device_id}")
except Exception:
pass
# Notify external services of device logout
try:
async with get_session() as s:
async with s.begin():
await emit_activity(
s,
activity_type="rose:DeviceAuth",
actor_uri="internal:system",
object_type="Device",
object_data={
"device_id": g.device_id,
"action": "logout",
},
)
except Exception:
current_app.logger.exception("[auth] failed to emit DeviceAuth logout")
qsession.pop(SESSION_USER_KEY, None)
qsession.pop(ACCOUNT_SESSION_KEY, None)
from shared.infrastructure.urls import blog_url
return redirect(blog_url("/"))
@auth_bp.get("/clear/")
async def clear():
"""One-time migration helper: clear all session cookies."""
qsession.clear()
resp = redirect(account_url("/"))
resp.delete_cookie("blog_session", domain=".rose-ash.com", path="/")
return resp
return auth_bp