Add device authorization flow (RFC 8628) for CLI login

Implements the device code grant flow so artdag CLI can authenticate
via browser approval. Includes device/authorize, device/token endpoints,
user code verification page, and approval confirmation template.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
giles
2026-02-25 19:41:09 +00:00
parent b9fe884ab9
commit c3ba28ea03
3 changed files with 281 additions and 1 deletions

View File

@@ -5,6 +5,7 @@ OAuth2 authorize endpoint, grant verification, and SSO logout.
"""
from __future__ import annotations
import json
import secrets
from datetime import datetime, timezone, timedelta
@@ -202,7 +203,13 @@ def register(url_prefix="/auth"):
)
if not grant or grant.revoked_at is not None:
return jsonify({"valid": False}), 200
return jsonify({"valid": True}), 200
user = await s.get(User, grant.user_id)
return jsonify({
"valid": True,
"user_id": grant.user_id,
"username": user.email if user else "",
"display_name": user.name if user else "",
}), 200
@auth_bp.get("/internal/check-device")
async def check_device():
@@ -480,4 +487,227 @@ def register(url_prefix="/auth"):
resp.delete_cookie("blog_session", domain=".rose-ash.com", path="/")
return resp
# --- Device Authorization Flow (RFC 8628) ---------------------------------
_DEVICE_ALPHABET = "ABCDEFGHJKMNPQRSTVWXYZ"
_DEVICE_CODE_TTL = 900 # 15 minutes
_DEVICE_POLL_INTERVAL = 5
def _generate_user_code() -> str:
"""Generate an unambiguous 8-char user code like KBMN-TWRP."""
chars = [secrets.choice(_DEVICE_ALPHABET) for _ in range(8)]
return "".join(chars[:4]) + "-" + "".join(chars[4:])
async def _approve_device(device_code: str, user) -> bool:
"""Approve a pending device flow and create an OAuthGrant."""
from shared.infrastructure.auth_redis import get_auth_redis
r = await get_auth_redis()
raw = await r.get(f"devflow:{device_code}")
if not raw:
return False
blob = json.loads(raw)
if blob.get("status") != "pending":
return False
account_sid = qsession.get(ACCOUNT_SESSION_KEY)
if not account_sid:
account_sid = secrets.token_urlsafe(32)
qsession[ACCOUNT_SESSION_KEY] = account_sid
grant_token = secrets.token_urlsafe(48)
async with get_session() as s:
async with s.begin():
grant = OAuthGrant(
token=grant_token,
user_id=user.id,
client_id=blob["client_id"],
issuer_session=account_sid,
)
s.add(grant)
# Update Redis blob
blob["status"] = "approved"
blob["user_id"] = user.id
blob["grant_token"] = grant_token
user_code = blob["user_code"]
ttl = await r.ttl(f"devflow:{device_code}")
if ttl and ttl > 0:
await r.set(f"devflow:{device_code}", json.dumps(blob).encode(), ex=ttl)
else:
await r.set(f"devflow:{device_code}", json.dumps(blob).encode(), ex=_DEVICE_CODE_TTL)
# Remove reverse lookup (code already used)
normalized_uc = user_code.replace("-", "").upper()
await r.delete(f"devflow_uc:{normalized_uc}")
return True
@csrf_exempt
@auth_bp.post("/device/authorize")
@auth_bp.post("/device/authorize/")
async def device_authorize():
"""RFC 8628 — CLI requests a device code."""
data = await request.get_json(silent=True) or {}
client_id = data.get("client_id", "")
if client_id not in ALLOWED_CLIENTS:
return jsonify({"error": "invalid_client"}), 400
device_code = secrets.token_urlsafe(32)
user_code = _generate_user_code()
from shared.infrastructure.auth_redis import get_auth_redis
r = await get_auth_redis()
blob = json.dumps({
"client_id": client_id,
"user_code": user_code,
"status": "pending",
"user_id": None,
"grant_token": None,
}).encode()
normalized_uc = user_code.replace("-", "").upper()
pipe = r.pipeline()
pipe.set(f"devflow:{device_code}", blob, ex=_DEVICE_CODE_TTL)
pipe.set(f"devflow_uc:{normalized_uc}", device_code.encode(), ex=_DEVICE_CODE_TTL)
await pipe.execute()
verification_uri = account_url("/auth/device")
return jsonify({
"device_code": device_code,
"user_code": user_code,
"verification_uri": verification_uri,
"expires_in": _DEVICE_CODE_TTL,
"interval": _DEVICE_POLL_INTERVAL,
})
@csrf_exempt
@auth_bp.post("/device/token")
@auth_bp.post("/device/token/")
async def device_token():
"""RFC 8628 — CLI polls for the grant token."""
data = await request.get_json(silent=True) or {}
device_code = data.get("device_code", "")
client_id = data.get("client_id", "")
if not device_code or client_id not in ALLOWED_CLIENTS:
return jsonify({"error": "invalid_request"}), 400
from shared.infrastructure.auth_redis import get_auth_redis
r = await get_auth_redis()
raw = await r.get(f"devflow:{device_code}")
if not raw:
return jsonify({"error": "expired_token"}), 400
blob = json.loads(raw)
if blob.get("client_id") != client_id:
return jsonify({"error": "invalid_request"}), 400
if blob["status"] == "pending":
return jsonify({"error": "authorization_pending"}), 428
if blob["status"] == "denied":
return jsonify({"error": "access_denied"}), 400
if blob["status"] == "approved":
async with get_session() as s:
user = await s.get(User, blob["user_id"])
if not user:
return jsonify({"error": "access_denied"}), 400
# Clean up Redis
await r.delete(f"devflow:{device_code}")
return jsonify({
"access_token": blob["grant_token"],
"token_type": "bearer",
"user_id": blob["user_id"],
"username": user.email or "",
"display_name": user.name or "",
})
return jsonify({"error": "invalid_request"}), 400
@auth_bp.get("/device")
@auth_bp.get("/device/")
async def device_form():
"""Browser form where user enters the code displayed in terminal."""
code = request.args.get("code", "")
return await render_template("auth/device.html", code=code)
@auth_bp.post("/device")
@auth_bp.post("/device/")
async def device_submit():
"""Browser submit — validates code, approves if logged in."""
form = await request.form
user_code = (form.get("code") or "").strip().replace("-", "").upper()
if not user_code or len(user_code) != 8:
return await render_template(
"auth/device.html",
error="Please enter a valid 8-character code.",
code=form.get("code", ""),
), 400
from shared.infrastructure.auth_redis import get_auth_redis
r = await get_auth_redis()
device_code = await r.get(f"devflow_uc:{user_code}")
if not device_code:
return await render_template(
"auth/device.html",
error="Code not found or expired. Please try again.",
code=form.get("code", ""),
), 400
if isinstance(device_code, bytes):
device_code = device_code.decode()
# Not logged in — redirect to login, then come back to complete
if not g.get("user"):
complete_url = url_for("auth.device_complete", code=device_code)
store_login_redirect_target()
return redirect(url_for("auth.login_form", next=complete_url))
# Logged in — approve immediately
ok = await _approve_device(device_code, g.user)
if not ok:
return await render_template(
"auth/device.html",
error="Code expired or already used.",
), 400
return await render_template("auth/device_approved.html")
@auth_bp.get("/device/complete")
@auth_bp.get("/device/complete/")
async def device_complete():
"""Post-login redirect — completes approval after magic link auth."""
device_code = request.args.get("code", "")
if not device_code:
return redirect(url_for("auth.device_form"))
if not g.get("user"):
store_login_redirect_target()
return redirect(url_for("auth.login_form"))
ok = await _approve_device(device_code, g.user)
if not ok:
return await render_template(
"auth/device.html",
error="Code expired or already used. Please start the login process again in your terminal.",
), 400
return await render_template("auth/device_approved.html")
return auth_bp

View File

@@ -0,0 +1,41 @@
{% extends "_types/root/_index.html" %}
{% block meta %}{% endblock %}
{% block title %}Authorize Device — Rose Ash{% endblock %}
{% block content %}
<div class="py-8 max-w-md mx-auto">
<h1 class="text-2xl font-bold mb-6">Authorize device</h1>
<p class="text-stone-600 mb-4">Enter the code shown in your terminal to sign in.</p>
{% if error %}
<div class="bg-red-50 border border-red-200 text-red-700 p-3 rounded mb-4">
{{ error }}
</div>
{% endif %}
<form method="post" action="{{ url_for('auth.device_submit') }}" class="space-y-4">
<input type="hidden" name="csrf_token" value="{{ csrf_token() }}">
<div>
<label for="code" class="block text-sm font-medium mb-1">Device code</label>
<input
type="text"
name="code"
id="code"
value="{{ code | default('') }}"
placeholder="XXXX-XXXX"
required
autofocus
maxlength="9"
autocomplete="off"
spellcheck="false"
class="w-full border border-stone-300 rounded px-3 py-3 text-center text-2xl tracking-widest font-mono uppercase focus:outline-none focus:ring-2 focus:ring-stone-500"
>
</div>
<button
type="submit"
class="w-full bg-stone-800 text-white py-2 px-4 rounded hover:bg-stone-700 transition"
>
Authorize
</button>
</form>
</div>
{% endblock %}

View File

@@ -0,0 +1,9 @@
{% extends "_types/root/_index.html" %}
{% block meta %}{% endblock %}
{% block title %}Device Authorized — Rose Ash{% endblock %}
{% block content %}
<div class="py-8 max-w-md mx-auto text-center">
<h1 class="text-2xl font-bold mb-4">Device authorized</h1>
<p class="text-stone-600">You can close this window and return to your terminal.</p>
</div>
{% endblock %}