Files
rose-ash/shared/infrastructure/factory.py
giles 404449fcab
All checks were successful
Build and Deploy / build-and-deploy (push) Successful in 3m20s
Fix auth ordering: validate grant before loading user
_load_user ran before _check_auth_state, so g.user was set to the wrong
user before the grant check could clear the stale session. Now grant
verification runs first, ensuring stale sessions are cleared before
the user is loaded.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-26 12:22:35 +00:00

308 lines
11 KiB
Python

from __future__ import annotations
import asyncio
import os
import secrets
from pathlib import Path
from typing import Callable, Awaitable, Sequence
from quart import Quart, request, g, redirect, send_from_directory
from shared.config import init_config, config, pretty
from shared.models import KV # ensure shared models imported
# Register all app model classes with SQLAlchemy so cross-domain
# relationship() string references resolve correctly.
for _mod in ("blog.models", "market.models", "cart.models", "events.models", "federation.models", "account.models"):
try:
__import__(_mod)
except ImportError:
pass
from shared.log_config import configure_logging
from shared.events import EventProcessor
from shared.db.session import register_db
from shared.browser.app.middleware import register as register_middleware
from shared.browser.app.redis_cacher import register as register_redis
from shared.browser.app.csrf import protect
from shared.browser.app.errors import errors
from .jinja_setup import setup_jinja
from .user_loader import load_current_user
# Async init of config (runs once at import)
asyncio.run(init_config())
BASE_DIR = Path(__file__).resolve().parent.parent
STATIC_DIR = str(BASE_DIR / "static")
TEMPLATE_DIR = str(BASE_DIR / "browser" / "templates")
def create_base_app(
name: str,
*,
context_fn: Callable[[], Awaitable[dict]] | None = None,
before_request_fns: Sequence[Callable[[], Awaitable[None]]] | None = None,
domain_services_fn: Callable[[], None] | None = None,
) -> Quart:
"""
Create a Quart app with shared infrastructure.
Parameters
----------
name:
Application name (also used as CACHE_APP_PREFIX).
context_fn:
Async function returning a dict for template context.
Each app provides its own — the cart app queries locally,
while blog/market apps fetch via internal API.
If not provided, a minimal default context is used.
before_request_fns:
Extra before-request hooks (e.g. cart_loader for the cart app).
domain_services_fn:
Callable that registers domain services on the shared registry.
Each app provides its own — registering real impls for owned
domains and stubs (or real impls) for others.
"""
if domain_services_fn is not None:
domain_services_fn()
app = Quart(
name,
static_folder=STATIC_DIR,
static_url_path="/static",
template_folder=TEMPLATE_DIR,
root_path=str(BASE_DIR),
)
configure_logging(name)
app.secret_key = os.getenv("SECRET_KEY", "dev-secret-key-change-me-777")
# Per-app first-party session cookie (no shared domain — avoids Safari ITP)
app.config["SESSION_COOKIE_NAME"] = f"{name}_session"
app.config["SESSION_COOKIE_SAMESITE"] = "Lax"
app.config["SESSION_COOKIE_SECURE"] = True
# Ghost / Redis config
app.config["GHOST_API_URL"] = os.getenv("GHOST_API_URL")
app.config["GHOST_PUBLIC_URL"] = os.getenv("GHOST_PUBLIC_URL")
app.config["GHOST_CONTENT_KEY"] = os.getenv("GHOST_CONTENT_API_KEY")
app.config["REDIS_URL"] = os.getenv("REDIS_URL")
# Cache app prefix for key namespacing
app.config["CACHE_APP_PREFIX"] = name
# --- infrastructure ---
register_middleware(app)
register_db(app)
register_redis(app)
setup_jinja(app)
errors(app)
# Auto-register OAuth client blueprint for non-account apps
# (account is the OAuth authorization server)
if name != "account":
from shared.infrastructure.oauth import create_oauth_blueprint
app.register_blueprint(create_oauth_blueprint(name))
# Auto-register ActivityPub blueprint for AP-enabled apps
from shared.infrastructure.activitypub import AP_APPS
if name in AP_APPS:
from shared.infrastructure.activitypub import create_activitypub_blueprint
app.register_blueprint(create_activitypub_blueprint(name))
# Auto-register per-app social blueprint (not federation — it has its own)
if name in AP_APPS and name != "federation":
from shared.infrastructure.ap_social import create_ap_social_blueprint
app.register_blueprint(create_ap_social_blueprint(name))
# --- device id (all apps, including account) ---
_did_cookie = f"{name}_did"
@app.before_request
async def _init_device_id():
did = request.cookies.get(_did_cookie)
if did:
g.device_id = did
g._new_device_id = False
else:
g.device_id = secrets.token_urlsafe(32)
g._new_device_id = True
@app.after_request
async def _set_device_cookie(response):
if getattr(g, "_new_device_id", False):
response.set_cookie(
_did_cookie, g.device_id,
max_age=30 * 24 * 3600,
secure=True, samesite="Lax", httponly=True,
)
return response
# --- before-request hooks ---
@app.before_request
async def _route_log():
g.root = request.headers.get("x-forwarded-prefix", "/")
g.scheme = request.scheme
g.host = request.host
# Auth state check via grant verification + silent OAuth handshake
# MUST run before _load_user so stale sessions are cleared first
if name != "account":
@app.before_request
async def _check_auth_state():
from quart import session as qs
from urllib.parse import quote as _quote
if request.path.startswith(("/auth/", "/static/", "/.well-known/", "/users/", "/nodeinfo/", "/internal/")):
return
uid = qs.get("uid")
grant_token = qs.get("grant_token")
from shared.infrastructure.auth_redis import get_auth_redis
try:
auth_redis = await get_auth_redis()
except Exception:
auth_redis = None
# Case 1: logged in — verify grant still valid (direct DB, cached)
if uid and not grant_token:
# Legacy session without grant token — clear it
qs.pop("uid", None)
qs.pop("cart_sid", None)
g.user = None
uid = None
if uid and grant_token:
cache_key = f"grant:{grant_token}"
if auth_redis:
# Quick check: if did_auth was cleared (logout), skip cache
device_id = g.device_id
did_auth_present = await auth_redis.get(f"did_auth:{device_id}") if device_id else True
cached = await auth_redis.get(cache_key)
if cached == b"ok" and did_auth_present:
return
if cached == b"revoked":
qs.pop("uid", None)
qs.pop("grant_token", None)
qs.pop("cart_sid", None)
g.user = None
return
from sqlalchemy import select
from shared.db.session import get_account_session
from shared.models.oauth_grant import OAuthGrant
try:
async with get_account_session() as s:
grant = await s.scalar(
select(OAuthGrant).where(OAuthGrant.token == grant_token)
)
valid = grant is not None and grant.revoked_at is None
except Exception:
valid = False # DB error — treat as invalid
if auth_redis:
await auth_redis.set(cache_key, b"ok" if valid else b"revoked", ex=60)
if not valid:
qs.pop("uid", None)
qs.pop("grant_token", None)
qs.pop("cart_sid", None)
g.user = None
return
# Case 2: not logged in — prompt=none OAuth (GET, non-HTMX only)
if not uid and request.method == "GET":
if request.headers.get("HX-Request"):
return
import time as _time
now = _time.time()
pnone_at = qs.get("_pnone_at")
device_id = g.device_id
# Check if account signalled a login after we cached "not logged in"
# (blog_did == account_did — same value set during OAuth callback)
if device_id and auth_redis and pnone_at:
auth_ts = await auth_redis.get(f"did_auth:{device_id}")
if auth_ts:
try:
if float(auth_ts) > pnone_at:
qs.pop("_pnone_at", None)
return redirect(f"/auth/login?prompt=none&next={_quote(request.url, safe='')}")
except (ValueError, TypeError):
pass
if pnone_at and (now - pnone_at) < 300:
return
if device_id and auth_redis:
cached = await auth_redis.get(f"prompt:{name}:{device_id}")
if cached == b"none":
return
return redirect(f"/auth/login?prompt=none&next={_quote(request.url, safe='')}")
@app.before_request
async def _load_user():
await load_current_user()
# Register any app-specific before-request hooks (e.g. cart loader)
if before_request_fns:
for fn in before_request_fns:
app.before_request(fn)
@app.before_request
async def _csrf_protect():
await protect()
# --- after-request hooks ---
# Clear old shared-domain session cookie (migration from .rose-ash.com)
@app.after_request
async def _clear_old_shared_cookie(response):
if request.cookies.get("blog_session"):
response.delete_cookie("blog_session", domain=".rose-ash.com", path="/")
return response
@app.after_request
async def _add_hx_preserve_search_header(response):
value = request.headers.get("X-Search")
if value is not None:
response.headers["HX-Preserve-Search"] = value
return response
# --- context processor ---
if context_fn is not None:
@app.context_processor
async def _inject_base():
return await context_fn()
else:
# Minimal fallback (no cart, no menu_items)
from .context import base_context
@app.context_processor
async def _inject_base():
return await base_context()
# --- event processor ---
_event_processor = EventProcessor(app_name=name)
# --- startup ---
@app.before_serving
async def _startup():
from shared.events.handlers import register_shared_handlers
register_shared_handlers()
await init_config()
print(pretty())
await _event_processor.start()
@app.after_serving
async def _stop_event_processor():
await _event_processor.stop()
from shared.infrastructure.auth_redis import close_auth_redis
await close_auth_redis()
# --- favicon ---
@app.get("/favicon.ico")
async def favicon():
return await send_from_directory(STATIC_DIR, "favicon.ico")
return app