"""Account app data endpoints. Exposes read-only JSON queries at ``/internal/data/`` for cross-app callers via the internal data client. """ from __future__ import annotations from quart import Blueprint, g, jsonify, request from shared.infrastructure.data_client import DATA_HEADER from sqlalchemy import select from shared.models import User def register() -> Blueprint: bp = Blueprint("data", __name__, url_prefix="/internal/data") @bp.before_request async def _require_data_header(): if not request.headers.get(DATA_HEADER): return jsonify({"error": "forbidden"}), 403 from shared.infrastructure.internal_auth import validate_internal_request if not validate_internal_request(): return jsonify({"error": "forbidden"}), 403 _handlers: dict[str, object] = {} @bp.get("/") async def handle_query(query_name: str): handler = _handlers.get(query_name) if handler is None: return jsonify({"error": "unknown query"}), 404 result = await handler() return jsonify(result) # --- user-by-email --- async def _user_by_email(): """Return user_id for a given email address.""" email = request.args.get("email", "").strip().lower() if not email: return None result = await g.s.execute( select(User.id).where(User.email.ilike(email)) ) row = result.first() if not row: return None return {"user_id": row[0]} _handlers["user-by-email"] = _user_by_email # --- newsletters --- async def _newsletters(): """Return all Ghost newsletters (for blog post editor).""" from shared.models.ghost_membership_entities import GhostNewsletter result = await g.s.execute( select(GhostNewsletter.id, GhostNewsletter.ghost_id, GhostNewsletter.name, GhostNewsletter.slug) .order_by(GhostNewsletter.name) ) return [ {"id": row[0], "ghost_id": row[1], "name": row[2], "slug": row[3]} for row in result.all() ] _handlers["newsletters"] = _newsletters return bp