This repository has been archived on 2026-02-24. You can view files and clone it. You cannot open issues or pull requests or push a commit.
Files
shared/infrastructure/oauth.py
giles cad528d732 Device-id SSO: account sets did, signals login via Redis
- Factory: set {name}_did cookie for all apps (including account)
  via before_request/after_request hooks (g.device_id always available)
- Factory: _check_auth_state checks did_auth:{account_did} in Redis
  to override stale "not logged in" cache when account login detected
- OAuth: removed _ensure_device_cookie (moved to factory), callback
  stores account_did from authorize redirect in session
- OAuth: login uses g.device_id, logout clears _account_did

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-23 14:57:40 +00:00

181 lines
6.3 KiB
Python

"""OAuth2 client blueprint for non-account apps.
Each client app gets /auth/login, /auth/callback, /auth/logout.
Account is the OAuth authorization server.
Device cookie ({app}_did) ties the browser to its auth state so
client apps can detect login/logout without cross-domain cookies.
"""
from __future__ import annotations
import secrets
from datetime import datetime, timezone
from quart import (
Blueprint,
redirect,
request,
session as qsession,
g,
current_app,
make_response,
)
from sqlalchemy import select
from shared.db.session import get_session
from shared.models.oauth_code import OAuthCode
from shared.infrastructure.urls import account_url, app_url
from shared.infrastructure.cart_identity import current_cart_identity
from shared.events import emit_activity
SESSION_USER_KEY = "uid"
GRANT_TOKEN_KEY = "grant_token"
def create_oauth_blueprint(app_name: str) -> Blueprint:
"""Return an OAuth client blueprint for *app_name*."""
bp = Blueprint("oauth_auth", __name__, url_prefix="/auth")
@bp.get("/login")
@bp.get("/login/")
async def login():
next_url = request.args.get("next", "/")
prompt = request.args.get("prompt", "")
state = secrets.token_urlsafe(32)
qsession["oauth_state"] = state
qsession["oauth_next"] = next_url
device_id = g.device_id
redirect_uri = app_url(app_name, "/auth/callback")
params = (
f"?client_id={app_name}"
f"&redirect_uri={redirect_uri}"
f"&device_id={device_id}"
f"&state={state}"
)
if prompt:
params += f"&prompt={prompt}"
authorize_url = account_url(f"/auth/oauth/authorize{params}")
return redirect(authorize_url)
@bp.get("/callback")
@bp.get("/callback/")
async def callback():
# Always store account_did when account passes it back
account_did = request.args.get("account_did", "")
if account_did:
qsession["_account_did"] = account_did
# Handle prompt=none error (user not logged in on account)
error = request.args.get("error")
if error == "login_required":
next_url = qsession.pop("oauth_next", "/")
qsession.pop("oauth_state", None)
import time as _time
qsession["_pnone_at"] = _time.time()
device_id = g.device_id
if device_id:
from shared.browser.app.redis_cacher import get_redis
_redis = get_redis()
if _redis:
await _redis.set(
f"prompt:{app_name}:{device_id}", b"none", ex=300
)
return redirect(next_url)
code = request.args.get("code")
state = request.args.get("state")
expected_state = qsession.pop("oauth_state", None)
next_url = qsession.pop("oauth_next", "/")
if not code or not state or state != expected_state:
current_app.logger.warning("OAuth callback: bad state or missing code")
return redirect("/")
expected_redirect = app_url(app_name, "/auth/callback")
now = datetime.now(timezone.utc)
async with get_session() as s:
async with s.begin():
result = await s.execute(
select(OAuthCode)
.where(OAuthCode.code == code)
.with_for_update()
)
oauth_code = result.scalar_one_or_none()
if not oauth_code:
current_app.logger.warning("OAuth callback: code not found")
return redirect("/")
if oauth_code.used_at is not None:
current_app.logger.warning("OAuth callback: code already used")
return redirect("/")
if oauth_code.expires_at < now:
current_app.logger.warning("OAuth callback: code expired")
return redirect("/")
if oauth_code.client_id != app_name:
current_app.logger.warning("OAuth callback: client_id mismatch")
return redirect("/")
if oauth_code.redirect_uri != expected_redirect:
current_app.logger.warning("OAuth callback: redirect_uri mismatch")
return redirect("/")
oauth_code.used_at = now
user_id = oauth_code.user_id
grant_token = oauth_code.grant_token
# Set local session with grant token for revocation checking
qsession[SESSION_USER_KEY] = user_id
if grant_token:
qsession[GRANT_TOKEN_KEY] = grant_token
qsession.pop("_pnone_at", None)
# Emit login activity for cart adoption
ident = current_cart_identity()
anon_session_id = ident.get("session_id")
if anon_session_id:
try:
async with get_session() as s:
async with s.begin():
await emit_activity(
s,
activity_type="rose:Login",
actor_uri="internal:system",
object_type="Person",
object_data={
"user_id": user_id,
"session_id": anon_session_id,
},
)
except Exception:
current_app.logger.exception("OAuth: failed to emit login activity")
return redirect(next_url, 303)
@bp.get("/clear")
@bp.get("/clear/")
async def clear():
"""One-time migration helper: clear all session cookies."""
qsession.clear()
resp = await make_response(redirect("/"))
resp.delete_cookie("blog_session", domain=".rose-ash.com", path="/")
resp.delete_cookie(f"{app_name}_did", path="/")
return resp
@bp.post("/logout")
@bp.post("/logout/")
async def logout():
qsession.pop(SESSION_USER_KEY, None)
qsession.pop(GRANT_TOKEN_KEY, None)
qsession.pop("cart_sid", None)
qsession.pop("_pnone_at", None)
qsession.pop("_account_did", None)
# Redirect through account to revoke grants + clear account session
return redirect(account_url("/auth/sso-logout/"))
return bp