Client apps now do a silent OAuth round-trip (prompt=none) to account on first visit. If user is logged in on account, they get silently logged in. If not, the result is cached (5 min) to avoid repeated handshakes. Grant verification now uses direct DB query instead of aiohttp HTTP calls. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
198 lines
6.8 KiB
Python
198 lines
6.8 KiB
Python
"""OAuth2 client blueprint for non-account apps.
|
|
|
|
Each client app gets /auth/login, /auth/callback, /auth/logout.
|
|
Account is the OAuth authorization server.
|
|
|
|
Device cookie ({app}_did) ties the browser to its auth state so
|
|
client apps can detect login/logout without cross-domain cookies.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import os
|
|
import secrets
|
|
from datetime import datetime, timezone
|
|
|
|
from quart import (
|
|
Blueprint,
|
|
redirect,
|
|
request,
|
|
session as qsession,
|
|
g,
|
|
current_app,
|
|
make_response,
|
|
)
|
|
from sqlalchemy import select
|
|
|
|
from shared.db.session import get_session
|
|
from shared.models import User
|
|
from shared.models.oauth_code import OAuthCode
|
|
from shared.infrastructure.urls import account_url, app_url
|
|
from shared.infrastructure.cart_identity import current_cart_identity
|
|
from shared.events import emit_activity
|
|
|
|
SESSION_USER_KEY = "uid"
|
|
GRANT_TOKEN_KEY = "grant_token"
|
|
|
|
|
|
def _device_cookie_name(app_name: str) -> str:
|
|
return f"{app_name}_did"
|
|
|
|
|
|
def _internal_account_url() -> str:
|
|
return (os.getenv("INTERNAL_URL_ACCOUNT") or "http://account:8000").rstrip("/")
|
|
|
|
|
|
def create_oauth_blueprint(app_name: str) -> Blueprint:
|
|
"""Return an OAuth client blueprint for *app_name*."""
|
|
bp = Blueprint("oauth_auth", __name__, url_prefix="/auth")
|
|
cookie_name = _device_cookie_name(app_name)
|
|
|
|
# Ensure device cookie exists on every response
|
|
@bp.after_app_request
|
|
async def _ensure_device_cookie(response):
|
|
if not request.cookies.get(cookie_name):
|
|
did = secrets.token_urlsafe(32)
|
|
response.set_cookie(
|
|
cookie_name, did,
|
|
max_age=30 * 24 * 3600,
|
|
secure=True, samesite="Lax", httponly=True,
|
|
)
|
|
return response
|
|
|
|
@bp.get("/login")
|
|
@bp.get("/login/")
|
|
async def login():
|
|
next_url = request.args.get("next", "/")
|
|
prompt = request.args.get("prompt", "")
|
|
state = secrets.token_urlsafe(32)
|
|
qsession["oauth_state"] = state
|
|
qsession["oauth_next"] = next_url
|
|
|
|
device_id = request.cookies.get(cookie_name, "")
|
|
redirect_uri = app_url(app_name, "/auth/callback")
|
|
params = (
|
|
f"?client_id={app_name}"
|
|
f"&redirect_uri={redirect_uri}"
|
|
f"&device_id={device_id}"
|
|
f"&state={state}"
|
|
)
|
|
if prompt:
|
|
params += f"&prompt={prompt}"
|
|
authorize_url = account_url(f"/auth/oauth/authorize{params}")
|
|
return redirect(authorize_url)
|
|
|
|
@bp.get("/callback")
|
|
@bp.get("/callback/")
|
|
async def callback():
|
|
# Handle prompt=none error (user not logged in on account)
|
|
error = request.args.get("error")
|
|
if error == "login_required":
|
|
next_url = qsession.pop("oauth_next", "/")
|
|
qsession.pop("oauth_state", None)
|
|
import time as _time
|
|
qsession["_pnone_at"] = _time.time()
|
|
device_id = request.cookies.get(cookie_name, "")
|
|
if device_id:
|
|
from shared.browser.app.redis_cacher import get_redis
|
|
_redis = get_redis()
|
|
if _redis:
|
|
await _redis.set(
|
|
f"prompt:{app_name}:{device_id}", b"none", ex=300
|
|
)
|
|
return redirect(next_url)
|
|
|
|
code = request.args.get("code")
|
|
state = request.args.get("state")
|
|
expected_state = qsession.pop("oauth_state", None)
|
|
next_url = qsession.pop("oauth_next", "/")
|
|
|
|
if not code or not state or state != expected_state:
|
|
current_app.logger.warning("OAuth callback: bad state or missing code")
|
|
return redirect("/")
|
|
|
|
expected_redirect = app_url(app_name, "/auth/callback")
|
|
now = datetime.now(timezone.utc)
|
|
|
|
async with get_session() as s:
|
|
async with s.begin():
|
|
result = await s.execute(
|
|
select(OAuthCode)
|
|
.where(OAuthCode.code == code)
|
|
.with_for_update()
|
|
)
|
|
oauth_code = result.scalar_one_or_none()
|
|
|
|
if not oauth_code:
|
|
current_app.logger.warning("OAuth callback: code not found")
|
|
return redirect("/")
|
|
|
|
if oauth_code.used_at is not None:
|
|
current_app.logger.warning("OAuth callback: code already used")
|
|
return redirect("/")
|
|
|
|
if oauth_code.expires_at < now:
|
|
current_app.logger.warning("OAuth callback: code expired")
|
|
return redirect("/")
|
|
|
|
if oauth_code.client_id != app_name:
|
|
current_app.logger.warning("OAuth callback: client_id mismatch")
|
|
return redirect("/")
|
|
|
|
if oauth_code.redirect_uri != expected_redirect:
|
|
current_app.logger.warning("OAuth callback: redirect_uri mismatch")
|
|
return redirect("/")
|
|
|
|
oauth_code.used_at = now
|
|
user_id = oauth_code.user_id
|
|
grant_token = oauth_code.grant_token
|
|
|
|
# Set local session with grant token for revocation checking
|
|
qsession[SESSION_USER_KEY] = user_id
|
|
if grant_token:
|
|
qsession[GRANT_TOKEN_KEY] = grant_token
|
|
qsession.pop("_pnone_at", None)
|
|
|
|
# Emit login activity for cart adoption
|
|
ident = current_cart_identity()
|
|
anon_session_id = ident.get("session_id")
|
|
if anon_session_id:
|
|
try:
|
|
async with get_session() as s:
|
|
async with s.begin():
|
|
await emit_activity(
|
|
s,
|
|
activity_type="rose:Login",
|
|
actor_uri="internal:system",
|
|
object_type="Person",
|
|
object_data={
|
|
"user_id": user_id,
|
|
"session_id": anon_session_id,
|
|
},
|
|
)
|
|
except Exception:
|
|
current_app.logger.exception("OAuth: failed to emit login activity")
|
|
|
|
return redirect(next_url, 303)
|
|
|
|
@bp.get("/clear")
|
|
@bp.get("/clear/")
|
|
async def clear():
|
|
"""One-time migration helper: clear all session cookies."""
|
|
qsession.clear()
|
|
resp = await make_response(redirect("/"))
|
|
resp.delete_cookie("blog_session", domain=".rose-ash.com", path="/")
|
|
resp.delete_cookie(cookie_name, path="/")
|
|
return resp
|
|
|
|
@bp.post("/logout")
|
|
@bp.post("/logout/")
|
|
async def logout():
|
|
qsession.pop(SESSION_USER_KEY, None)
|
|
qsession.pop(GRANT_TOKEN_KEY, None)
|
|
qsession.pop("cart_sid", None)
|
|
qsession.pop("_pnone_at", None)
|
|
# Redirect through account to revoke grants + clear account session
|
|
return redirect(account_url("/auth/sso-logout/"))
|
|
|
|
return bp
|