Grant-based session revocation, remove iframe logout
All checks were successful
Build and Deploy / build-and-deploy (push) Successful in 43s
All checks were successful
Build and Deploy / build-and-deploy (push) Successful in 43s
Account creates OAuthGrant per authorization, revokes on logout. Client apps verify grants via /auth/internal/verify-grant endpoint. Removes iframe-based logout page. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -1,7 +1,7 @@
|
||||
"""Authentication routes for the account app.
|
||||
|
||||
Account is the OAuth authorization server. Owns magic link login/logout,
|
||||
OAuth2 authorize endpoint, and SSO logout.
|
||||
OAuth2 authorize endpoint, grant verification, and SSO logout.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
@@ -17,13 +17,15 @@ from quart import (
|
||||
session as qsession,
|
||||
g,
|
||||
current_app,
|
||||
jsonify,
|
||||
)
|
||||
from sqlalchemy import select
|
||||
from sqlalchemy import select, update
|
||||
from sqlalchemy.exc import SQLAlchemyError
|
||||
|
||||
from shared.db.session import get_session
|
||||
from shared.models import User
|
||||
from shared.models.oauth_code import OAuthCode
|
||||
from shared.models.oauth_grant import OAuthGrant
|
||||
from shared.infrastructure.urls import account_url, app_url
|
||||
from shared.infrastructure.cart_identity import current_cart_identity
|
||||
from shared.events import emit_activity
|
||||
@@ -39,6 +41,7 @@ from .services import (
|
||||
)
|
||||
|
||||
SESSION_USER_KEY = "uid"
|
||||
ACCOUNT_SESSION_KEY = "account_sid"
|
||||
|
||||
ALLOWED_CLIENTS = {"blog", "market", "cart", "events", "federation"}
|
||||
|
||||
@@ -64,30 +67,61 @@ def register(url_prefix="/auth"):
|
||||
|
||||
# Not logged in — bounce to magic link login, then back here
|
||||
if not g.get("user"):
|
||||
# Preserve the full authorize URL so we return here after login
|
||||
authorize_path = request.full_path # includes query string
|
||||
authorize_path = request.full_path
|
||||
store_login_redirect_target()
|
||||
return redirect(url_for("auth.login_form", next=authorize_path))
|
||||
|
||||
# Logged in — issue authorization code
|
||||
# Logged in — create grant + authorization code
|
||||
account_sid = qsession.get(ACCOUNT_SESSION_KEY)
|
||||
if not account_sid:
|
||||
account_sid = secrets.token_urlsafe(32)
|
||||
qsession[ACCOUNT_SESSION_KEY] = account_sid
|
||||
|
||||
grant_token = secrets.token_urlsafe(48)
|
||||
code = secrets.token_urlsafe(48)
|
||||
now = datetime.now(timezone.utc)
|
||||
expires = now + timedelta(minutes=5)
|
||||
|
||||
async with get_session() as s:
|
||||
async with s.begin():
|
||||
grant = OAuthGrant(
|
||||
token=grant_token,
|
||||
user_id=g.user.id,
|
||||
client_id=client_id,
|
||||
issuer_session=account_sid,
|
||||
)
|
||||
s.add(grant)
|
||||
|
||||
oauth_code = OAuthCode(
|
||||
code=code,
|
||||
user_id=g.user.id,
|
||||
client_id=client_id,
|
||||
redirect_uri=redirect_uri,
|
||||
expires_at=expires,
|
||||
grant_token=grant_token,
|
||||
)
|
||||
s.add(oauth_code)
|
||||
|
||||
sep = "&" if "?" in redirect_uri else "?"
|
||||
return redirect(f"{redirect_uri}{sep}code={code}&state={state}")
|
||||
|
||||
# --- Grant verification (internal endpoint) ------------------------------
|
||||
|
||||
@auth_bp.get("/internal/verify-grant")
|
||||
async def verify_grant():
|
||||
"""Called by client apps to check if a grant is still valid."""
|
||||
token = request.args.get("token", "")
|
||||
if not token:
|
||||
return jsonify({"valid": False}), 200
|
||||
|
||||
async with get_session() as s:
|
||||
grant = await s.scalar(
|
||||
select(OAuthGrant).where(OAuthGrant.token == token)
|
||||
)
|
||||
if not grant or grant.revoked_at is not None:
|
||||
return jsonify({"valid": False}), 200
|
||||
return jsonify({"valid": True}), 200
|
||||
|
||||
# --- Magic link login flow -----------------------------------------------
|
||||
|
||||
@auth_bp.get("/login/")
|
||||
@@ -97,7 +131,6 @@ def register(url_prefix="/auth"):
|
||||
if cross_cart_sid:
|
||||
qsession["cart_sid"] = cross_cart_sid
|
||||
if g.get("user"):
|
||||
# If there's a pending redirect (e.g. OAuth authorize), follow it
|
||||
redirect_url = pop_login_redirect_target()
|
||||
return redirect(redirect_url)
|
||||
return await render_template("auth/login.html")
|
||||
@@ -194,14 +227,55 @@ def register(url_prefix="/auth"):
|
||||
)
|
||||
|
||||
qsession[SESSION_USER_KEY] = user_id
|
||||
# Fresh account session ID for grant tracking
|
||||
qsession[ACCOUNT_SESSION_KEY] = secrets.token_urlsafe(32)
|
||||
|
||||
redirect_url = pop_login_redirect_target()
|
||||
return redirect(redirect_url, 303)
|
||||
|
||||
@auth_bp.post("/logout/")
|
||||
async def logout():
|
||||
# Revoke all grants issued by this account session
|
||||
account_sid = qsession.get(ACCOUNT_SESSION_KEY)
|
||||
if account_sid:
|
||||
try:
|
||||
async with get_session() as s:
|
||||
async with s.begin():
|
||||
await s.execute(
|
||||
update(OAuthGrant)
|
||||
.where(OAuthGrant.issuer_session == account_sid)
|
||||
.where(OAuthGrant.revoked_at.is_(None))
|
||||
.values(revoked_at=datetime.now(timezone.utc))
|
||||
)
|
||||
except SQLAlchemyError:
|
||||
current_app.logger.exception("[auth] failed to revoke grants")
|
||||
|
||||
qsession.pop(SESSION_USER_KEY, None)
|
||||
return redirect(url_for("auth.sso_logout"))
|
||||
qsession.pop(ACCOUNT_SESSION_KEY, None)
|
||||
from shared.infrastructure.urls import blog_url
|
||||
return redirect(blog_url("/"))
|
||||
|
||||
@auth_bp.get("/sso-logout/")
|
||||
async def sso_logout():
|
||||
"""SSO logout called by client apps: revoke grants, clear session."""
|
||||
account_sid = qsession.get(ACCOUNT_SESSION_KEY)
|
||||
if account_sid:
|
||||
try:
|
||||
async with get_session() as s:
|
||||
async with s.begin():
|
||||
await s.execute(
|
||||
update(OAuthGrant)
|
||||
.where(OAuthGrant.issuer_session == account_sid)
|
||||
.where(OAuthGrant.revoked_at.is_(None))
|
||||
.values(revoked_at=datetime.now(timezone.utc))
|
||||
)
|
||||
except SQLAlchemyError:
|
||||
current_app.logger.exception("[auth] failed to revoke grants")
|
||||
|
||||
qsession.pop(SESSION_USER_KEY, None)
|
||||
qsession.pop(ACCOUNT_SESSION_KEY, None)
|
||||
from shared.infrastructure.urls import blog_url
|
||||
return redirect(blog_url("/"))
|
||||
|
||||
@auth_bp.get("/clear/")
|
||||
async def clear():
|
||||
@@ -211,27 +285,4 @@ def register(url_prefix="/auth"):
|
||||
resp.delete_cookie("blog_session", domain=".rose-ash.com", path="/")
|
||||
return resp
|
||||
|
||||
@auth_bp.get("/sso-logout/")
|
||||
async def sso_logout():
|
||||
"""SSO logout: clear account session, then render a page with hidden
|
||||
iframes that clear each client app's first-party session cookie.
|
||||
Tolerates dead apps — iframes that fail are silently ignored."""
|
||||
qsession.pop(SESSION_USER_KEY, None)
|
||||
|
||||
from shared.infrastructure.urls import blog_url, market_url, cart_url, events_url, federation_url
|
||||
|
||||
clear_urls = [
|
||||
blog_url("/auth/sso-clear"),
|
||||
market_url("/auth/sso-clear"),
|
||||
cart_url("/auth/sso-clear"),
|
||||
events_url("/auth/sso-clear"),
|
||||
federation_url("/auth/sso-clear"),
|
||||
]
|
||||
|
||||
return await render_template(
|
||||
"auth/signing_out.html",
|
||||
clear_urls=clear_urls,
|
||||
final_url=blog_url("/"),
|
||||
)
|
||||
|
||||
return auth_bp
|
||||
|
||||
Reference in New Issue
Block a user