Add scoped tokens, L2-side revocation, and security docs
Security improvements: - Tokens now include optional l1_server claim for scoping - /auth/verify checks token scope matches requesting L1 - L2 maintains revoked_tokens table - even if L1 ignores revoke, token fails - Logout revokes token in L2 db before notifying L1s - /renderers/attach creates scoped tokens (not embedded in HTML) - Add get_token_claims() to auth.py Database: - Add revoked_tokens table with token_hash, username, expires_at - Add db functions: revoke_token, is_token_revoked, cleanup_expired_revocations Documentation: - Document security features in README Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
17
auth.py
17
auth.py
@@ -135,13 +135,15 @@ async def authenticate_user(data_dir: Path, username: str, password: str) -> Opt
|
||||
)
|
||||
|
||||
|
||||
def create_access_token(username: str, l2_server: str = None) -> Token:
|
||||
def create_access_token(username: str, l2_server: str = None, l1_server: str = None) -> Token:
|
||||
"""Create a JWT access token.
|
||||
|
||||
Args:
|
||||
username: The username
|
||||
l2_server: The L2 server URL (e.g., https://artdag.rose-ash.com)
|
||||
Required for L1 to verify tokens with the correct L2.
|
||||
l1_server: Optional L1 server URL to scope the token to.
|
||||
If set, token only works for this specific L1.
|
||||
"""
|
||||
expires = datetime.now(timezone.utc) + timedelta(days=ACCESS_TOKEN_EXPIRE_DAYS)
|
||||
|
||||
@@ -156,6 +158,10 @@ def create_access_token(username: str, l2_server: str = None) -> Token:
|
||||
if l2_server:
|
||||
payload["l2_server"] = l2_server
|
||||
|
||||
# Include l1_server to scope token to specific L1
|
||||
if l1_server:
|
||||
payload["l1_server"] = l1_server
|
||||
|
||||
token = jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM)
|
||||
|
||||
return Token(
|
||||
@@ -175,6 +181,15 @@ def verify_token(token: str) -> Optional[str]:
|
||||
return None
|
||||
|
||||
|
||||
def get_token_claims(token: str) -> Optional[dict]:
|
||||
"""Decode token and return all claims. Returns None if invalid."""
|
||||
try:
|
||||
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
|
||||
return payload
|
||||
except JWTError:
|
||||
return None
|
||||
|
||||
|
||||
async def get_current_user(data_dir: Path, token: str) -> Optional[User]:
|
||||
"""Get current user from token."""
|
||||
username = verify_token(token)
|
||||
|
||||
Reference in New Issue
Block a user