Files
rose-ash/shared/infrastructure/factory.py
giles 1f36987f77
All checks were successful
Build and Deploy / build-and-deploy (push) Successful in 2m5s
Replace inter-service _handlers dicts with declarative sx defquery/defaction
The inter-service data layer (fetch_data/call_action) was the least
structured part of the codebase — Python _handlers dicts with ad-hoc
param extraction scattered across 16 route files. This replaces them
with declarative .sx query/action definitions that make the entire
inter-service protocol self-describing and greppable.

Infrastructure:
- defquery/defaction special forms in the sx evaluator
- Query/action registry with load, lookup, and schema introspection
- Query executor using async_eval with I/O primitives
- Blueprint factories (create_data_blueprint/create_action_blueprint)
  with sx-first dispatch and Python fallback
- /internal/schema endpoint on every service
- parse-datetime and split-ids primitives for type coercion

Service extractions:
- LikesService (toggle, is_liked, liked_slugs, liked_ids)
- PageConfigService (ensure, get_by_container, get_by_id, get_batch, update)
- RelationsService (wraps module-level functions)
- AccountDataService (user_by_email, newsletters)
- CartItemsService, MarketDataService (raw SQLAlchemy lookups)

50 of 54 handlers converted to sx, 4 Python fallbacks remain
(ghost-sync/push-member, clear-cart-for-order, create-order).
Net: -1,383 lines Python, +251 lines modified.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-04 08:13:50 +00:00

381 lines
14 KiB
Python

from __future__ import annotations
import asyncio
import os
import secrets
from pathlib import Path
from typing import Callable, Awaitable, Sequence
from quart import Quart, request, g, redirect, send_from_directory
from shared.config import init_config, config, pretty
from shared.models import KV # ensure shared models imported
# Register all app model classes with SQLAlchemy so cross-domain
# relationship() string references resolve correctly.
for _mod in ("blog.models", "market.models", "cart.models", "events.models", "federation.models", "account.models", "relations.models", "likes.models", "orders.models"):
try:
__import__(_mod)
except ImportError:
pass
from shared.log_config import configure_logging
from shared.events import EventProcessor
from shared.db.session import register_db
from shared.browser.app.middleware import register as register_middleware
from shared.browser.app.redis_cacher import register as register_redis
from shared.browser.app.csrf import protect
from shared.browser.app.errors import errors
from .jinja_setup import setup_jinja
from .user_loader import load_current_user
from shared.sx.jinja_bridge import setup_sx_bridge
from shared.sx.components import load_shared_components
from shared.sx.relations import load_relation_registry
# Async init of config (runs once at import)
asyncio.run(init_config())
BASE_DIR = Path(__file__).resolve().parent.parent
STATIC_DIR = str(BASE_DIR / "static")
TEMPLATE_DIR = str(BASE_DIR / "browser" / "templates")
def create_base_app(
name: str,
*,
context_fn: Callable[[], Awaitable[dict]] | None = None,
before_request_fns: Sequence[Callable[[], Awaitable[None]]] | None = None,
domain_services_fn: Callable[[], None] | None = None,
) -> Quart:
"""
Create a Quart app with shared infrastructure.
Parameters
----------
name:
Application name (also used as CACHE_APP_PREFIX).
context_fn:
Async function returning a dict for template context.
Each app provides its own — the cart app queries locally,
while blog/market apps fetch via internal API.
If not provided, a minimal default context is used.
before_request_fns:
Extra before-request hooks (e.g. cart_loader for the cart app).
domain_services_fn:
Callable that registers domain services on the shared registry.
Each app provides its own — registering real impls for owned
domains and stubs (or real impls) for others.
"""
if domain_services_fn is not None:
domain_services_fn()
app = Quart(
name,
static_folder=STATIC_DIR,
static_url_path="/static",
template_folder=TEMPLATE_DIR,
root_path=str(BASE_DIR),
)
# Disable aggressive browser caching of static files in dev
app.config["SEND_FILE_MAX_AGE_DEFAULT"] = 0
configure_logging(name)
secret_key = os.getenv("SECRET_KEY")
if not secret_key:
env = os.getenv("ENVIRONMENT", "development")
if env in ("production", "staging"):
raise RuntimeError("SECRET_KEY environment variable must be set in production")
secret_key = "dev-secret-key-change-me-777"
app.secret_key = secret_key
# Per-app first-party session cookie (no shared domain — avoids Safari ITP)
app.config["SESSION_COOKIE_NAME"] = f"{name}_session"
app.config["SESSION_COOKIE_SAMESITE"] = "Lax"
app.config["SESSION_COOKIE_SECURE"] = True
# Ghost / Redis config
app.config["GHOST_API_URL"] = os.getenv("GHOST_API_URL")
app.config["GHOST_PUBLIC_URL"] = os.getenv("GHOST_PUBLIC_URL")
app.config["GHOST_CONTENT_KEY"] = os.getenv("GHOST_CONTENT_API_KEY")
app.config["REDIS_URL"] = os.getenv("REDIS_URL")
# Cache app prefix for key namespacing
app.config["CACHE_APP_PREFIX"] = name
# Disable page caching in development
env = os.getenv("ENVIRONMENT", "development")
app.config["NO_PAGE_CACHE"] = env not in ("production", "staging")
# --- infrastructure ---
register_middleware(app)
register_db(app)
register_redis(app)
setup_jinja(app)
setup_sx_bridge(app)
load_shared_components()
load_relation_registry()
# Load defquery/defaction definitions from {service}/queries.sx and actions.sx
from shared.sx.query_registry import load_service_protocols
_app_root = Path(os.getcwd())
load_service_protocols(name, str(_app_root))
# Register /internal/schema endpoint for protocol introspection
from shared.infrastructure.schema_blueprint import create_schema_blueprint
app.register_blueprint(create_schema_blueprint(name))
# Load CSS registry (tw.css → class-to-rule lookup for on-demand CSS)
from shared.sx.css_registry import load_css_registry, registry_loaded
_styles = BASE_DIR / "static" / "styles"
_fa_css = BASE_DIR / "static" / "fontawesome" / "css"
if (_styles / "tw.css").exists() and not registry_loaded():
load_css_registry(
_styles / "tw.css",
extra_css=[
_styles / "basics.css",
_styles / "cards.css",
_styles / "blog-content.css",
_styles / "prism.css",
_fa_css / "all.min.css",
_fa_css / "v4-shims.min.css",
],
url_rewrites={"../webfonts/": "/static/fontawesome/webfonts/"},
)
# Dev-mode: auto-reload sx templates when files change on disk
if os.getenv("RELOAD") == "true":
from shared.sx.jinja_bridge import reload_if_changed
@app.before_request
async def _sx_hot_reload():
reload_if_changed()
errors(app)
# Auto-register OAuth client blueprint for non-account apps
# (account is the OAuth authorization server)
_NO_OAUTH = {"account"}
if name not in _NO_OAUTH:
from shared.infrastructure.oauth import create_oauth_blueprint
app.register_blueprint(create_oauth_blueprint(name))
# Auto-register ActivityPub blueprint for AP-enabled apps
from shared.infrastructure.activitypub import AP_APPS
if name in AP_APPS:
from shared.infrastructure.activitypub import create_activitypub_blueprint
app.register_blueprint(create_activitypub_blueprint(name))
# Auto-register per-app social blueprint (not federation — it has its own)
if name in AP_APPS and name != "federation":
from shared.infrastructure.ap_social import create_ap_social_blueprint
from shared.infrastructure.ap_social_sx import setup_social_layout
setup_social_layout()
app.register_blueprint(create_ap_social_blueprint(name))
# --- device id (all apps, including account) ---
_did_cookie = f"{name}_did"
@app.before_request
async def _init_device_id():
did = request.cookies.get(_did_cookie)
if did:
g.device_id = did
g._new_device_id = False
else:
g.device_id = secrets.token_urlsafe(32)
g._new_device_id = True
@app.after_request
async def _set_device_cookie(response):
if getattr(g, "_new_device_id", False):
response.set_cookie(
_did_cookie, g.device_id,
max_age=30 * 24 * 3600,
secure=True, samesite="Lax", httponly=True,
)
return response
# --- before-request hooks ---
@app.before_request
async def _route_log():
g.root = request.headers.get("x-forwarded-prefix", "/")
g.scheme = request.scheme
g.host = request.host
# Auth state check via grant verification + silent OAuth handshake
# MUST run before _load_user so stale sessions are cleared first
if name not in _NO_OAUTH:
@app.before_request
async def _check_auth_state():
from quart import session as qs
from urllib.parse import quote as _quote
if request.path.startswith(("/auth/", "/static/", "/.well-known/", "/users/", "/nodeinfo/", "/internal/")):
return
uid = qs.get("uid")
grant_token = qs.get("grant_token")
from shared.infrastructure.auth_redis import get_auth_redis
try:
auth_redis = await get_auth_redis()
except Exception:
auth_redis = None
# Case 1: logged in — verify grant still valid (direct DB, cached)
if uid and not grant_token:
# Legacy session without grant token — clear it
qs.pop("uid", None)
qs.pop("cart_sid", None)
g.user = None
uid = None
if uid and grant_token:
cache_key = f"grant:{grant_token}"
if auth_redis:
# Quick check: if did_auth was cleared (logout), skip cache
device_id = g.device_id
did_auth_present = await auth_redis.get(f"did_auth:{device_id}") if device_id else True
cached = await auth_redis.get(cache_key)
if cached == b"ok" and did_auth_present:
return
if cached == b"revoked":
qs.pop("uid", None)
qs.pop("grant_token", None)
qs.pop("cart_sid", None)
g.user = None
return
from sqlalchemy import select
from shared.db.session import get_account_session
from shared.models.oauth_grant import OAuthGrant, hash_token
try:
token_h = hash_token(grant_token)
async with get_account_session() as s:
grant = await s.scalar(
select(OAuthGrant).where(
(OAuthGrant.token_hash == token_h) | (OAuthGrant.token == grant_token)
)
)
valid = grant is not None and grant.revoked_at is None
except Exception:
valid = False # DB error — treat as invalid
if auth_redis:
await auth_redis.set(cache_key, b"ok" if valid else b"revoked", ex=60)
if not valid:
qs.pop("uid", None)
qs.pop("grant_token", None)
qs.pop("cart_sid", None)
g.user = None
return
# Case 2: not logged in — prompt=none OAuth (GET, non-HTMX only)
if not uid and request.method == "GET":
if request.headers.get("SX-Request") or request.headers.get("HX-Request"):
return
import time as _time
now = _time.time()
pnone_at = qs.get("_pnone_at")
device_id = g.device_id
# Check if account signalled a login after we cached "not logged in"
# (blog_did == account_did — same value set during OAuth callback)
if device_id and auth_redis and pnone_at:
auth_ts = await auth_redis.get(f"did_auth:{device_id}")
if auth_ts:
try:
if float(auth_ts) > pnone_at:
qs.pop("_pnone_at", None)
return redirect(f"/auth/login?prompt=none&next={_quote(request.url, safe='')}")
except (ValueError, TypeError):
pass
if pnone_at and (now - pnone_at) < 300:
return
if device_id and auth_redis:
cached = await auth_redis.get(f"prompt:{name}:{device_id}")
if cached == b"none":
return
return redirect(f"/auth/login?prompt=none&next={_quote(request.url, safe='')}")
@app.before_request
async def _load_user():
await load_current_user()
# Register any app-specific before-request hooks (e.g. cart loader)
if before_request_fns:
for fn in before_request_fns:
app.before_request(fn)
@app.before_request
async def _csrf_protect():
await protect()
# --- after-request hooks ---
# Clear old shared-domain session cookie (migration from .rose-ash.com)
@app.after_request
async def _clear_old_shared_cookie(response):
if request.cookies.get("blog_session"):
response.delete_cookie("blog_session", domain=".rose-ash.com", path="/")
return response
@app.after_request
async def _cors_for_subdomains(response):
origin = request.headers.get("Origin", "")
if origin.endswith(".rose-ash.com") or origin.endswith(".localhost"):
response.headers["Access-Control-Allow-Origin"] = origin
response.headers["Access-Control-Allow-Credentials"] = "true"
response.headers["Access-Control-Allow-Headers"] = (
"SX-Request, SX-Target, SX-Current-URL, SX-Components, SX-Css, "
"HX-Request, HX-Target, HX-Current-URL, HX-Trigger, "
"Content-Type, X-CSRFToken"
)
response.headers["Access-Control-Allow-Methods"] = "GET, POST, PUT, DELETE, OPTIONS"
return response
@app.after_request
async def _add_hx_preserve_search_header(response):
value = request.headers.get("X-Search")
if value is not None:
response.headers["HX-Preserve-Search"] = value
return response
# --- context processor ---
if context_fn is not None:
@app.context_processor
async def _inject_base():
return await context_fn()
else:
# Minimal fallback (no cart, no menu_items)
from .context import base_context
@app.context_processor
async def _inject_base():
return await base_context()
# --- event processor ---
_event_processor = EventProcessor(app_name=name)
# --- startup ---
@app.before_serving
async def _startup():
from shared.events.handlers import register_shared_handlers
register_shared_handlers()
await init_config()
print(pretty())
await _event_processor.start()
@app.after_serving
async def _stop_event_processor():
await _event_processor.stop()
from shared.infrastructure.auth_redis import close_auth_redis
await close_auth_redis()
# --- favicon ---
@app.get("/favicon.ico")
async def favicon():
return await send_from_directory(STATIC_DIR, "favicon.ico")
return app