From 95bd32bd71f98ece7a76de03383a2b39e5c3074d Mon Sep 17 00:00:00 2001 From: giles Date: Wed, 25 Feb 2026 11:32:14 +0000 Subject: [PATCH] Decouple cross-domain DB queries for per-app database split Move Ghost membership sync from blog to account service so blog no longer queries account tables (users, ghost_labels, etc.). Account runs membership sync at startup and exposes HTTP action/data endpoints for webhook-triggered syncs and user lookups. Key changes: - account/services/ghost_membership.py: all membership sync functions - account/bp/actions + data: ghost-sync-member, user-by-email, newsletters - blog ghost_sync.py: stripped to content-only (posts, authors, tags) - blog webhook member: delegates to account via call_action() - try_publish: opens federation session when DBs differ - oauth.py callback: uses get_account_session() for OAuthCode - page_configs moved from db_events to db_blog in split script Co-Authored-By: Claude Opus 4.6 --- _config/move-page-configs.sql | 17 + _config/split-databases.sh | 2 +- account/app.py | 19 + account/bp/actions/__init__.py | 0 account/bp/actions/routes.py | 64 ++ account/bp/data/__init__.py | 0 account/bp/data/routes.py | 64 ++ account/services/ghost_membership.py | 621 +++++++++++++++ blog/bp/blog/ghost/ghost_sync.py | 865 +++------------------ blog/bp/blog/routes.py | 8 +- blog/bp/blog/web_hooks/routes.py | 36 +- blog/bp/post/admin/routes.py | 11 +- shared/infrastructure/ghost_admin_token.py | 46 ++ shared/infrastructure/oauth.py | 5 +- shared/services/federation_publish.py | 57 +- 15 files changed, 1007 insertions(+), 808 deletions(-) create mode 100644 _config/move-page-configs.sql create mode 100644 account/bp/actions/__init__.py create mode 100644 account/bp/actions/routes.py create mode 100644 account/bp/data/__init__.py create mode 100644 account/bp/data/routes.py create mode 100644 account/services/ghost_membership.py create mode 100644 shared/infrastructure/ghost_admin_token.py diff --git a/_config/move-page-configs.sql b/_config/move-page-configs.sql new file mode 100644 index 0000000..99e418b --- /dev/null +++ b/_config/move-page-configs.sql @@ -0,0 +1,17 @@ +-- Move page_configs data from db_events to db_blog. +-- Run after split-databases.sh if page_configs data ended up in db_events. +-- +-- Usage: +-- PGHOST=db PGUSER=postgres PGPASSWORD=change-me psql -f move-page-configs.sql +-- + +-- Step 1: Dump page_configs from db_events into db_blog +\c db_events +COPY page_configs TO '/tmp/page_configs.csv' WITH CSV HEADER; + +\c db_blog +TRUNCATE page_configs; +COPY page_configs FROM '/tmp/page_configs.csv' WITH CSV HEADER; + +-- Step 2: Verify +SELECT count(*) AS blog_page_configs FROM page_configs; diff --git a/_config/split-databases.sh b/_config/split-databases.sh index f4c6588..0d059a4 100755 --- a/_config/split-databases.sh +++ b/_config/split-databases.sh @@ -42,6 +42,7 @@ DB_TABLES[db_blog]=" menu_items menu_nodes container_relations + page_configs " DB_TABLES[db_market]=" @@ -78,7 +79,6 @@ DB_TABLES[db_events]=" calendar_entry_posts ticket_types tickets - page_configs " DB_TABLES[db_federation]=" diff --git a/account/app.py b/account/app.py index c7b0783..5ac1fe2 100644 --- a/account/app.py +++ b/account/app.py @@ -77,6 +77,25 @@ def create_app() -> "Quart": app.register_blueprint(register_account_bp()) app.register_blueprint(register_fragments()) + from bp.actions.routes import register as register_actions + app.register_blueprint(register_actions()) + + from bp.data.routes import register as register_data + app.register_blueprint(register_data()) + + # --- Ghost membership sync at startup --- + @app.before_serving + async def _sync_ghost_membership(): + from services.ghost_membership import sync_all_membership_from_ghost + from shared.db.session import get_session + try: + async with get_session() as s: + await sync_all_membership_from_ghost(s) + await s.commit() + print("[account] Ghost membership sync complete") + except Exception as e: + print(f"[account] Ghost membership sync failed (non-fatal): {e}") + return app diff --git a/account/bp/actions/__init__.py b/account/bp/actions/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/account/bp/actions/routes.py b/account/bp/actions/routes.py new file mode 100644 index 0000000..e941123 --- /dev/null +++ b/account/bp/actions/routes.py @@ -0,0 +1,64 @@ +"""Account app action endpoints. + +Exposes write operations at ``/internal/actions/`` for +cross-app callers (blog webhooks) via the internal action client. +""" +from __future__ import annotations + +from quart import Blueprint, g, jsonify, request + +from shared.infrastructure.actions import ACTION_HEADER + + +def register() -> Blueprint: + bp = Blueprint("actions", __name__, url_prefix="/internal/actions") + + @bp.before_request + async def _require_action_header(): + if not request.headers.get(ACTION_HEADER): + return jsonify({"error": "forbidden"}), 403 + + _handlers: dict[str, object] = {} + + @bp.post("/") + async def handle_action(action_name: str): + handler = _handlers.get(action_name) + if handler is None: + return jsonify({"error": "unknown action"}), 404 + try: + result = await handler() + return jsonify(result) + except Exception as exc: + import logging + logging.getLogger(__name__).exception("Action %s failed", action_name) + return jsonify({"error": str(exc)}), 500 + + # --- ghost-sync-member --- + async def _ghost_sync_member(): + """Sync a single Ghost member into db_account.""" + data = await request.get_json() + ghost_id = data.get("ghost_id") + if not ghost_id: + return {"error": "ghost_id required"}, 400 + + from services.ghost_membership import sync_single_member + await sync_single_member(g.s, ghost_id) + return {"ok": True} + + _handlers["ghost-sync-member"] = _ghost_sync_member + + # --- ghost-push-member --- + async def _ghost_push_member(): + """Push a local user's membership data to Ghost.""" + data = await request.get_json() + user_id = data.get("user_id") + if not user_id: + return {"error": "user_id required"}, 400 + + from services.ghost_membership import sync_member_to_ghost + result_id = await sync_member_to_ghost(g.s, int(user_id)) + return {"ok": True, "ghost_id": result_id} + + _handlers["ghost-push-member"] = _ghost_push_member + + return bp diff --git a/account/bp/data/__init__.py b/account/bp/data/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/account/bp/data/routes.py b/account/bp/data/routes.py new file mode 100644 index 0000000..d465081 --- /dev/null +++ b/account/bp/data/routes.py @@ -0,0 +1,64 @@ +"""Account app data endpoints. + +Exposes read-only JSON queries at ``/internal/data/`` for +cross-app callers via the internal data client. +""" +from __future__ import annotations + +from quart import Blueprint, g, jsonify, request + +from shared.infrastructure.data_client import DATA_HEADER +from sqlalchemy import select +from shared.models import User + + +def register() -> Blueprint: + bp = Blueprint("data", __name__, url_prefix="/internal/data") + + @bp.before_request + async def _require_data_header(): + if not request.headers.get(DATA_HEADER): + return jsonify({"error": "forbidden"}), 403 + + _handlers: dict[str, object] = {} + + @bp.get("/") + async def handle_query(query_name: str): + handler = _handlers.get(query_name) + if handler is None: + return jsonify({"error": "unknown query"}), 404 + result = await handler() + return jsonify(result) + + # --- user-by-email --- + async def _user_by_email(): + """Return user_id for a given email address.""" + email = request.args.get("email", "").strip().lower() + if not email: + return None + result = await g.s.execute( + select(User.id).where(User.email.ilike(email)) + ) + row = result.first() + if not row: + return None + return {"user_id": row[0]} + + _handlers["user-by-email"] = _user_by_email + + # --- newsletters --- + async def _newsletters(): + """Return all Ghost newsletters (for blog post editor).""" + from shared.models.ghost_membership_entities import GhostNewsletter + result = await g.s.execute( + select(GhostNewsletter.id, GhostNewsletter.ghost_id, GhostNewsletter.name, GhostNewsletter.slug) + .order_by(GhostNewsletter.name) + ) + return [ + {"id": row[0], "ghost_id": row[1], "name": row[2], "slug": row[3]} + for row in result.all() + ] + + _handlers["newsletters"] = _newsletters + + return bp diff --git a/account/services/ghost_membership.py b/account/services/ghost_membership.py new file mode 100644 index 0000000..58c912b --- /dev/null +++ b/account/services/ghost_membership.py @@ -0,0 +1,621 @@ +"""Ghost membership sync — account-owned. + +Handles Ghost ↔ DB sync for user/membership data: +- Ghost → DB: fetch members from Ghost API, upsert into account tables +- DB → Ghost: push local user changes back to Ghost API + +All tables involved (users, ghost_labels, user_labels, ghost_newsletters, +user_newsletters, ghost_tiers, ghost_subscriptions) live in db_account. +""" +from __future__ import annotations + +import os +import re +import asyncio +from datetime import datetime +from typing import Dict, Any, Optional + +import httpx +from sqlalchemy import select, delete, or_, and_ +from sqlalchemy.ext.asyncio import AsyncSession +from sqlalchemy.orm.attributes import flag_modified + +from shared.models import User +from shared.models.ghost_membership_entities import ( + GhostLabel, UserLabel, + GhostNewsletter, UserNewsletter, + GhostTier, GhostSubscription, +) + +from shared.infrastructure.ghost_admin_token import make_ghost_admin_jwt +from urllib.parse import quote + +GHOST_ADMIN_API_URL = os.environ.get("GHOST_ADMIN_API_URL", "") + + +def _auth_header() -> dict[str, str]: + return {"Authorization": f"Ghost {make_ghost_admin_jwt()}"} + + +def _iso(val: str | None) -> datetime | None: + if not val: + return None + return datetime.fromisoformat(val.replace("Z", "+00:00")) + + +def _to_str_or_none(v) -> Optional[str]: + if v is None: + return None + if isinstance(v, (dict, list, set, tuple, bytes, bytearray)): + return None + s = str(v).strip() + return s or None + + +def _sanitize_member_payload(payload: dict) -> dict: + """Coerce types Ghost expects and drop empties to avoid 422/500 quirks.""" + out: dict = {} + + email = _to_str_or_none(payload.get("email")) + if email: + out["email"] = email.lower() + + name = _to_str_or_none(payload.get("name")) + if name is not None: + out["name"] = name + + note = _to_str_or_none(payload.get("note")) + if note is not None: + out["note"] = note + + if "subscribed" in payload: + out["subscribed"] = bool(payload.get("subscribed")) + + labels = [] + for item in payload.get("labels") or []: + gid = _to_str_or_none(item.get("id")) + gname = _to_str_or_none(item.get("name")) + if gid: + labels.append({"id": gid}) + elif gname: + labels.append({"name": gname}) + if labels: + out["labels"] = labels + + newsletters = [] + for item in payload.get("newsletters") or []: + gid = _to_str_or_none(item.get("id")) + gname = _to_str_or_none(item.get("name")) + row = {"subscribed": bool(item.get("subscribed", True))} + if gid: + row["id"] = gid + newsletters.append(row) + elif gname: + row["name"] = gname + newsletters.append(row) + if newsletters: + out["newsletters"] = newsletters + + gid = _to_str_or_none(payload.get("id")) + if gid: + out["id"] = gid + + return out + + +def _member_email(m: dict[str, Any]) -> Optional[str]: + email = (m.get("email") or "").strip().lower() or None + return email + + +# ---- upsert helpers for related entities ---- + +async def _upsert_label(sess: AsyncSession, data: dict) -> GhostLabel: + res = await sess.execute(select(GhostLabel).where(GhostLabel.ghost_id == data["id"])) + obj = res.scalar_one_or_none() + if not obj: + obj = GhostLabel(ghost_id=data["id"]) + sess.add(obj) + obj.name = data.get("name") or obj.name + obj.slug = data.get("slug") or obj.slug + await sess.flush() + return obj + + +async def _upsert_newsletter(sess: AsyncSession, data: dict) -> GhostNewsletter: + res = await sess.execute(select(GhostNewsletter).where(GhostNewsletter.ghost_id == data["id"])) + obj = res.scalar_one_or_none() + if not obj: + obj = GhostNewsletter(ghost_id=data["id"]) + sess.add(obj) + obj.name = data.get("name") or obj.name + obj.slug = data.get("slug") or obj.slug + obj.description = data.get("description") or obj.description + await sess.flush() + return obj + + +async def _upsert_tier(sess: AsyncSession, data: dict) -> GhostTier: + res = await sess.execute(select(GhostTier).where(GhostTier.ghost_id == data["id"])) + obj = res.scalar_one_or_none() + if not obj: + obj = GhostTier(ghost_id=data["id"]) + sess.add(obj) + obj.name = data.get("name") or obj.name + obj.slug = data.get("slug") or obj.slug + obj.type = data.get("type") or obj.type + obj.visibility = data.get("visibility") or obj.visibility + await sess.flush() + return obj + + +def _price_cents(sd: dict) -> Optional[int]: + try: + return int((sd.get("price") or {}).get("amount")) + except Exception: + return None + + +# ---- find/create user by ghost_id or email ---- + +async def _find_or_create_user_by_ghost_or_email(sess: AsyncSession, data: dict) -> User: + ghost_id = data.get("id") + email = _member_email(data) + + if ghost_id: + res = await sess.execute(select(User).where(User.ghost_id == ghost_id)) + u = res.scalar_one_or_none() + if u: + return u + + if email: + res = await sess.execute(select(User).where(User.email.ilike(email))) + u = res.scalar_one_or_none() + if u: + if ghost_id and not u.ghost_id: + u.ghost_id = ghost_id + return u + + u = User(email=email or f"_ghost_{ghost_id}@invalid.local") + if ghost_id: + u.ghost_id = ghost_id + sess.add(u) + await sess.flush() + return u + + +# ---- apply membership data to user ---- + +async def _apply_user_membership(sess: AsyncSession, user: User, m: dict) -> User: + """Apply Ghost member payload to local User.""" + sess.add(user) + + user.name = m.get("name") or user.name + user.ghost_status = m.get("status") or user.ghost_status + user.ghost_subscribed = bool(m.get("subscribed", True)) + user.ghost_note = m.get("note") or user.ghost_note + user.avatar_image = m.get("avatar_image") or user.avatar_image + user.stripe_customer_id = ( + (m.get("stripe") or {}).get("customer_id") + or (m.get("customer") or {}).get("id") + or m.get("stripe_customer_id") + or user.stripe_customer_id + ) + user.ghost_raw = dict(m) + flag_modified(user, "ghost_raw") + + await sess.flush() + + # Labels join + label_ids: list[int] = [] + for ld in m.get("labels") or []: + lbl = await _upsert_label(sess, ld) + label_ids.append(lbl.id) + await sess.execute(delete(UserLabel).where(UserLabel.user_id == user.id)) + for lid in label_ids: + sess.add(UserLabel(user_id=user.id, label_id=lid)) + await sess.flush() + + # Newsletters join with subscribed flag + nl_rows: list[tuple[int, bool]] = [] + for nd in m.get("newsletters") or []: + nl = await _upsert_newsletter(sess, nd) + nl_rows.append((nl.id, bool(nd.get("subscribed", True)))) + await sess.execute(delete(UserNewsletter).where(UserNewsletter.user_id == user.id)) + for nl_id, subbed in nl_rows: + sess.add(UserNewsletter(user_id=user.id, newsletter_id=nl_id, subscribed=subbed)) + await sess.flush() + + # Subscriptions + for sd in m.get("subscriptions") or []: + sid = sd.get("id") + if not sid: + continue + + tier_id: Optional[int] = None + if sd.get("tier"): + tier = await _upsert_tier(sess, sd["tier"]) + await sess.flush() + tier_id = tier.id + + res = await sess.execute(select(GhostSubscription).where(GhostSubscription.ghost_id == sid)) + sub = res.scalar_one_or_none() + if not sub: + sub = GhostSubscription(ghost_id=sid, user_id=user.id) + sess.add(sub) + + sub.user_id = user.id + sub.status = sd.get("status") or sub.status + sub.cadence = (sd.get("plan") or {}).get("interval") or sd.get("cadence") or sub.cadence + sub.price_amount = _price_cents(sd) + sub.price_currency = (sd.get("price") or {}).get("currency") or sub.price_currency + sub.stripe_customer_id = ( + (sd.get("customer") or {}).get("id") + or (sd.get("stripe") or {}).get("customer_id") + or sub.stripe_customer_id + ) + sub.stripe_subscription_id = ( + sd.get("stripe_subscription_id") + or (sd.get("stripe") or {}).get("subscription_id") + or sub.stripe_subscription_id + ) + if tier_id is not None: + sub.tier_id = tier_id + sub.raw = dict(sd) + flag_modified(sub, "raw") + + await sess.flush() + return user + + +# ===================================================== +# PUSH MEMBERS FROM LOCAL DB -> GHOST (DB -> Ghost) +# ===================================================== + +def _ghost_member_payload_base(u: User) -> dict: + email = _to_str_or_none(getattr(u, "email", None)) + payload: dict = {} + if email: + payload["email"] = email.lower() + + name = _to_str_or_none(getattr(u, "name", None)) + if name: + payload["name"] = name + + note = _to_str_or_none(getattr(u, "ghost_note", None)) + if note: + payload["note"] = note + + subscribed = getattr(u, "ghost_subscribed", True) + payload["subscribed"] = bool(subscribed) + + return payload + + +async def _newsletters_for_user(sess: AsyncSession, user_id: int) -> list[dict]: + q = await sess.execute( + select(GhostNewsletter.ghost_id, UserNewsletter.subscribed, GhostNewsletter.name) + .join(UserNewsletter, UserNewsletter.newsletter_id == GhostNewsletter.id) + .where(UserNewsletter.user_id == user_id) + ) + seen = set() + out: list[dict] = [] + for gid, subscribed, name in q.all(): + gid = (gid or "").strip() or None + name = (name or "").strip() or None + row: dict = {"subscribed": bool(subscribed)} + if gid: + key = ("id", gid) + if key in seen: + continue + row["id"] = gid + seen.add(key) + out.append(row) + elif name: + key = ("name", name.lower()) + if key in seen: + continue + row["name"] = name + seen.add(key) + out.append(row) + return out + + +async def _labels_for_user(sess: AsyncSession, user_id: int) -> list[dict]: + q = await sess.execute( + select(GhostLabel.ghost_id, GhostLabel.name) + .join(UserLabel, UserLabel.label_id == GhostLabel.id) + .where(UserLabel.user_id == user_id) + ) + seen = set() + out: list[dict] = [] + for gid, name in q.all(): + gid = (gid or "").strip() or None + name = (name or "").strip() or None + if gid: + key = ("id", gid) + if key not in seen: + out.append({"id": gid}) + seen.add(key) + elif name: + key = ("name", name.lower()) + if key not in seen: + out.append({"name": name}) + seen.add(key) + return out + + +async def _ghost_find_member_by_email(email: str) -> Optional[dict]: + if not email: + return None + async with httpx.AsyncClient(timeout=30) as client: + resp = await client.get( + f"{GHOST_ADMIN_API_URL}/members/?filter=email:{quote(email)}&limit=1", + headers=_auth_header(), + ) + resp.raise_for_status() + members = resp.json().get("members") or [] + return members[0] if members else None + + +async def _ghost_upsert_member(payload: dict, ghost_id: str | None = None) -> dict: + """Create/update a member, with sanitization + 5xx retry/backoff.""" + safe_keys = ("email", "name", "note", "subscribed", "labels", "newsletters", "id") + pl_raw = {k: v for k, v in payload.items() if k in safe_keys} + pl = _sanitize_member_payload(pl_raw) + + async def _request_with_retry(client: httpx.AsyncClient, method: str, url: str, json: dict) -> httpx.Response: + delay = 0.5 + for attempt in range(3): + r = await client.request(method, url, headers=_auth_header(), json=json) + if r.status_code >= 500: + if attempt < 2: + await asyncio.sleep(delay) + delay *= 2 + continue + return r + return r + + async with httpx.AsyncClient(timeout=30) as client: + + async def _put(mid: str, p: dict) -> dict: + r = await _request_with_retry( + client, "PUT", + f"{GHOST_ADMIN_API_URL}/members/{mid}/", + {"members": [p]}, + ) + if r.status_code == 404: + existing = await _ghost_find_member_by_email(p.get("email", "")) + if existing and existing.get("id"): + r2 = await _request_with_retry( + client, "PUT", + f"{GHOST_ADMIN_API_URL}/members/{existing['id']}/", + {"members": [p]}, + ) + r2.raise_for_status() + return (r2.json().get("members") or [None])[0] or {} + r3 = await _request_with_retry( + client, "POST", + f"{GHOST_ADMIN_API_URL}/members/", + {"members": [p]}, + ) + r3.raise_for_status() + return (r3.json().get("members") or [None])[0] or {} + + if r.status_code == 422: + body = (r.text or "").lower() + retry = dict(p) + dropped = False + if '"note"' in body or "for note" in body: + retry.pop("note", None); dropped = True + if '"name"' in body or "for name" in body: + retry.pop("name", None); dropped = True + if "labels.name" in body: + retry.pop("labels", None); dropped = True + if dropped: + r2 = await _request_with_retry( + client, "PUT", + f"{GHOST_ADMIN_API_URL}/members/{mid}/", + {"members": [retry]}, + ) + if r2.status_code == 404: + existing = await _ghost_find_member_by_email(retry.get("email", "")) + if existing and existing.get("id"): + r3 = await _request_with_retry( + client, "PUT", + f"{GHOST_ADMIN_API_URL}/members/{existing['id']}/", + {"members": [retry]}, + ) + r3.raise_for_status() + return (r3.json().get("members") or [None])[0] or {} + r3 = await _request_with_retry( + client, "POST", + f"{GHOST_ADMIN_API_URL}/members/", + {"members": [retry]}, + ) + r3.raise_for_status() + return (r3.json().get("members") or [None])[0] or {} + r2.raise_for_status() + return (r2.json().get("members") or [None])[0] or {} + r.raise_for_status() + return (r.json().get("members") or [None])[0] or {} + + async def _post_upsert(p: dict) -> dict: + r = await _request_with_retry( + client, "POST", + f"{GHOST_ADMIN_API_URL}/members/?upsert=true", + {"members": [p]}, + ) + if r.status_code == 422: + lower = (r.text or "").lower() + + retry = dict(p) + changed = False + if '"note"' in lower or "for note" in lower: + retry.pop("note", None); changed = True + if '"name"' in lower or "for name" in lower: + retry.pop("name", None); changed = True + if "labels.name" in lower: + retry.pop("labels", None); changed = True + + if changed: + r2 = await _request_with_retry( + client, "POST", + f"{GHOST_ADMIN_API_URL}/members/?upsert=true", + {"members": [retry]}, + ) + if r2.status_code != 422: + r2.raise_for_status() + return (r2.json().get("members") or [None])[0] or {} + lower = (r2.text or "").lower() + + if "already exists" in lower and "email address" in lower: + existing = await _ghost_find_member_by_email(p.get("email", "")) + if existing and existing.get("id"): + return await _put(existing["id"], p) + + raise httpx.HTTPStatusError( + "Validation error, cannot edit member.", + request=r.request, + response=r, + ) + r.raise_for_status() + return (r.json().get("members") or [None])[0] or {} + + if ghost_id: + return await _put(ghost_id, pl) + return await _post_upsert(pl) + + +async def sync_member_to_ghost(sess: AsyncSession, user_id: int) -> Optional[str]: + """Push a single user's membership data to Ghost.""" + res = await sess.execute(select(User).where(User.id == user_id)) + user = res.scalar_one_or_none() + if not user: + return None + + payload = _ghost_member_payload_base(user) + + labels = await _labels_for_user(sess, user.id) + if labels: + payload["labels"] = labels + + ghost_member = await _ghost_upsert_member(payload, ghost_id=user.ghost_id) + + if ghost_member: + gm_id = ghost_member.get("id") + if gm_id and user.ghost_id != gm_id: + user.ghost_id = gm_id + user.ghost_raw = dict(ghost_member) + flag_modified(user, "ghost_raw") + await sess.flush() + return user.ghost_id or gm_id + return user.ghost_id + + +async def sync_members_to_ghost( + sess: AsyncSession, + changed_since: Optional[datetime] = None, + limit: Optional[int] = None, +) -> int: + """Upsert a batch of users to Ghost. Returns count processed.""" + stmt = select(User.id) + if changed_since: + stmt = stmt.where( + or_( + User.created_at >= changed_since, + and_(User.last_login_at != None, User.last_login_at >= changed_since), + ) + ) + if limit: + stmt = stmt.limit(limit) + + ids = [row[0] for row in (await sess.execute(stmt)).all()] + processed = 0 + for uid in ids: + try: + await sync_member_to_ghost(sess, uid) + processed += 1 + except httpx.HTTPStatusError as e: + print(f"[ghost sync] failed upsert for user {uid}: {e.response.status_code} {e.response.text}") + except Exception as e: + print(f"[ghost sync] failed upsert for user {uid}: {e}") + return processed + + +# ===================================================== +# Membership fetch/sync (Ghost -> DB) bulk + single +# ===================================================== + +async def fetch_all_members_from_ghost() -> list[dict[str, Any]]: + async with httpx.AsyncClient(timeout=60) as client: + resp = await client.get( + f"{GHOST_ADMIN_API_URL}/members/?include=labels,subscriptions,tiers,newsletters&limit=all", + headers=_auth_header(), + ) + resp.raise_for_status() + return resp.json().get("members", []) + + +async def sync_all_membership_from_ghost(sess: AsyncSession) -> None: + """Bulk sync: fetch all members from Ghost, upsert into DB.""" + members = await fetch_all_members_from_ghost() + + label_bucket: Dict[str, dict[str, Any]] = {} + tier_bucket: Dict[str, dict[str, Any]] = {} + newsletter_bucket: Dict[str, dict[str, Any]] = {} + + for m in members: + for l in m.get("labels") or []: + label_bucket[l["id"]] = l + for n in m.get("newsletters") or []: + newsletter_bucket[n["id"]] = n + for s in m.get("subscriptions") or []: + t = s.get("tier") + if isinstance(t, dict) and t.get("id"): + tier_bucket[t["id"]] = t + + for L in label_bucket.values(): + await _upsert_label(sess, L) + for T in tier_bucket.values(): + await _upsert_tier(sess, T) + for N in newsletter_bucket.values(): + await _upsert_newsletter(sess, N) + + for gm in members: + user = await _find_or_create_user_by_ghost_or_email(sess, gm) + await _apply_user_membership(sess, user, gm) + + +async def fetch_single_member_from_ghost(ghost_id: str) -> Optional[dict[str, Any]]: + async with httpx.AsyncClient(timeout=30) as client: + resp = await client.get( + f"{GHOST_ADMIN_API_URL}/members/{ghost_id}/?include=labels,newsletters,subscriptions,tiers", + headers=_auth_header(), + ) + if resp.status_code == 404: + return None + resp.raise_for_status() + data = resp.json() + items = data.get("members") or data.get("member") or [] + if isinstance(items, dict): + return items + return (items[0] if items else None) + + +async def sync_single_member(sess: AsyncSession, ghost_id: str) -> None: + """Sync a single member from Ghost into DB.""" + m = await fetch_single_member_from_ghost(ghost_id) + if m is None: + return + + for l in m.get("labels") or []: + await _upsert_label(sess, l) + for n in m.get("newsletters") or []: + await _upsert_newsletter(sess, n) + for s in m.get("subscriptions") or []: + if isinstance(s.get("tier"), dict): + await _upsert_tier(sess, s["tier"]) + + user = await _find_or_create_user_by_ghost_or_email(sess, m) + await _apply_user_membership(sess, user, m) diff --git a/blog/bp/blog/ghost/ghost_sync.py b/blog/bp/blog/ghost/ghost_sync.py index c3d92ee..f1bd6fd 100644 --- a/blog/bp/blog/ghost/ghost_sync.py +++ b/blog/bp/blog/ghost/ghost_sync.py @@ -1,3 +1,11 @@ +"""Ghost content sync — blog-owned. + +Handles Ghost ↔ blog DB sync for content data only: +posts, pages, authors, tags. All models live in db_blog. + +Membership sync (users, labels, newsletters, tiers, subscriptions) is +handled by the account service — see account/services/ghost_membership.py. +""" from __future__ import annotations import os import re @@ -7,34 +15,19 @@ from html import escape as html_escape from typing import Dict, Any, Optional import httpx -from sqlalchemy import select, delete, or_, and_ +from sqlalchemy import select, delete from sqlalchemy.ext.asyncio import AsyncSession -from sqlalchemy.orm.attributes import flag_modified # for non-Mutable JSON columns -# Content models from models.ghost_content import ( Post, Author, Tag, PostAuthor, PostTag ) from shared.models.page_config import PageConfig -# User-centric membership models -from shared.models import User -from shared.models.ghost_membership_entities import ( - GhostLabel, UserLabel, - GhostNewsletter, UserNewsletter, - GhostTier, GhostSubscription, -) - -from .ghost_admin_token import make_ghost_admin_jwt - -from urllib.parse import quote +from shared.infrastructure.ghost_admin_token import make_ghost_admin_jwt GHOST_ADMIN_API_URL = os.environ["GHOST_ADMIN_API_URL"] -from shared.browser.app.utils import ( - utcnow -) - +from shared.browser.app.utils import utcnow def _auth_header() -> dict[str, str]: @@ -46,72 +39,7 @@ def _iso(val: str | None) -> datetime | None: return None return datetime.fromisoformat(val.replace("Z", "+00:00")) -def _to_str_or_none(v) -> Optional[str]: - """Return a trimmed string if v is safely stringifiable; else None.""" - if v is None: - return None - # Disallow complex types that would stringify to JSON-like noise - if isinstance(v, (dict, list, set, tuple, bytes, bytearray)): - return None - s = str(v).strip() - return s or None - -def _sanitize_member_payload(payload: dict) -> dict: - """Coerce types Ghost expects and drop empties to avoid 422/500 quirks.""" - out: dict = {} - - # email -> lowercase string - email = _to_str_or_none(payload.get("email")) - if email: - out["email"] = email.lower() - - # name / note must be strings if present - name = _to_str_or_none(payload.get("name")) - if name is not None: - out["name"] = name - - note = _to_str_or_none(payload.get("note")) - if note is not None: - out["note"] = note - - # subscribed -> bool - if "subscribed" in payload: - out["subscribed"] = bool(payload.get("subscribed")) - - # labels: keep only rows that have a non-empty id OR name - labels = [] - for item in payload.get("labels") or []: - gid = _to_str_or_none(item.get("id")) - gname = _to_str_or_none(item.get("name")) - if gid: - labels.append({"id": gid}) - elif gname: # only include if non-empty - labels.append({"name": gname}) - if labels: - out["labels"] = labels - - # newsletters: keep only rows with id OR name; coerce subscribed -> bool - newsletters = [] - for item in payload.get("newsletters") or []: - gid = _to_str_or_none(item.get("id")) - gname = _to_str_or_none(item.get("name")) - row = {"subscribed": bool(item.get("subscribed", True))} - if gid: - row["id"] = gid - newsletters.append(row) - elif gname: - row["name"] = gname - newsletters.append(row) - if newsletters: - out["newsletters"] = newsletters - - # id (if we carry a known ghost_id) - gid = _to_str_or_none(payload.get("id")) - if gid: - out["id"] = gid - - return out # ===================== # CONTENT UPSERT HELPERS # ===================== @@ -123,7 +51,6 @@ async def _upsert_author(sess: AsyncSession, ga: Dict[str, Any]) -> Author: obj = Author(ghost_id=ga["id"]) sess.add(obj) - # revive if soft-deleted obj.deleted_at = None obj.slug = ga.get("slug") or obj.slug @@ -150,7 +77,7 @@ async def _upsert_tag(sess: AsyncSession, gt: Dict[str, Any]) -> Tag: obj = Tag(ghost_id=gt["id"]) sess.add(obj) - obj.deleted_at = None # revive if soft-deleted + obj.deleted_at = None obj.slug = gt.get("slug") or obj.slug obj.name = gt.get("name") or obj.name @@ -168,7 +95,7 @@ async def _upsert_tag(sess: AsyncSession, gt: Dict[str, Any]) -> Tag: def _apply_ghost_fields(obj: Post, gp: Dict[str, Any], author_map: Dict[str, Author], tag_map: Dict[str, Tag]) -> None: """Apply Ghost API fields to a Post ORM object.""" - obj.deleted_at = None # revive if soft-deleted + obj.deleted_at = None obj.uuid = gp.get("uuid") or obj.uuid obj.slug = gp.get("slug") or obj.slug @@ -205,10 +132,23 @@ def _apply_ghost_fields(obj: Post, gp: Dict[str, Any], author_map: Dict[str, Aut obj.created_at = _iso(gp.get("created_at")) or obj.created_at or utcnow() pa = gp.get("primary_author") - obj.primary_author_id = author_map[pa["id"].strip()].id if pa else None # type: ignore[index] + obj.primary_author_id = author_map[pa["id"].strip()].id if pa else None pt = gp.get("primary_tag") - obj.primary_tag_id = tag_map[pt["id"].strip()].id if (pt and pt["id"] in tag_map) else None # type: ignore[index] + obj.primary_tag_id = tag_map[pt["id"].strip()].id if (pt and pt["id"] in tag_map) else None + + +async def _resolve_user_id_by_email(email: str) -> Optional[int]: + """Look up user_id from account service via HTTP (cross-domain safe).""" + from shared.infrastructure.data_client import fetch_data + result = await fetch_data( + "account", "user-by-email", + params={"email": email}, + required=False, + ) + if result and isinstance(result, dict): + return result.get("user_id") + return None async def _upsert_post(sess: AsyncSession, gp: Dict[str, Any], author_map: Dict[str, Author], tag_map: Dict[str, Tag]) -> tuple[Post, str | None]: @@ -221,35 +161,28 @@ async def _upsert_post(sess: AsyncSession, gp: Dict[str, Any], author_map: Dict[ old_status = obj.status if obj is not None else None if obj is not None: - # Row exists — just update _apply_ghost_fields(obj, gp, author_map, tag_map) await sess.flush() else: - # Row doesn't exist — try to insert within a savepoint - obj = Post(ghost_id=gp["id"]) # type: ignore[call-arg] + obj = Post(ghost_id=gp["id"]) try: async with sess.begin_nested(): sess.add(obj) _apply_ghost_fields(obj, gp, author_map, tag_map) await sess.flush() except IntegrityError: - # Race condition: another request inserted this ghost_id. - # Savepoint rolled back; re-select and update. res = await sess.execute(select(Post).where(Post.ghost_id == gp["id"])) obj = res.scalar_one() _apply_ghost_fields(obj, gp, author_map, tag_map) await sess.flush() - # Backfill user_id from primary author email if not already set + # Backfill user_id from primary author email via account service if obj.user_id is None and obj.primary_author_id is not None: pa_obj = author_map.get(gp.get("primary_author", {}).get("id", "")) if pa_obj and pa_obj.email: - user_res = await sess.execute( - select(User).where(User.email.ilike(pa_obj.email)) - ) - matched_user = user_res.scalar_one_or_none() - if matched_user: - obj.user_id = matched_user.id + user_id = await _resolve_user_id_by_email(pa_obj.email) + if user_id: + obj.user_id = user_id await sess.flush() # rebuild post_authors @@ -275,21 +208,73 @@ async def _upsert_post(sess: AsyncSession, gp: Dict[str, Any], author_map: Dict[ return obj, old_status -async def _ghost_find_member_by_email(email: str) -> Optional[dict]: - """Return first Ghost member with this email, or None.""" - if not email: - return None - async with httpx.AsyncClient(timeout=30) as client: - resp = await client.get( - f"{GHOST_ADMIN_API_URL}/members/?filter=email:{quote(email)}&limit=1", - headers=_auth_header(), - ) - resp.raise_for_status() - members = resp.json().get("members") or [] - return members[0] if members else None + +def _build_ap_post_data(post, post_url: str, tag_objs: list) -> dict: + """Build rich AP object_data for a blog post/page.""" + parts: list[str] = [] + if post.title: + parts.append(f"

{html_escape(post.title)}

") + + body = post.plaintext or post.custom_excerpt or post.excerpt or "" + + if body: + for para in body.split("\n\n"): + para = para.strip() + if para: + parts.append(f"

{html_escape(para)}

") + + parts.append(f'

Read more \u2192

') + + if tag_objs: + ht_links = [] + for t in tag_objs: + clean = t.slug.replace("-", "") + ht_links.append( + f'' + ) + parts.append(f'

{" ".join(ht_links)}

') + + obj: dict = { + "name": post.title or "", + "content": "\n".join(parts), + "url": post_url, + } + + attachments: list[dict] = [] + seen: set[str] = set() + + if post.feature_image: + att: dict = {"type": "Image", "url": post.feature_image} + if post.feature_image_alt: + att["name"] = post.feature_image_alt + attachments.append(att) + seen.add(post.feature_image) + + if post.html: + for src in re.findall(r']+src="([^"]+)"', post.html): + if src not in seen and len(attachments) < 4: + attachments.append({"type": "Image", "url": src}) + seen.add(src) + + if attachments: + obj["attachment"] = attachments + + if tag_objs: + obj["tag"] = [ + { + "type": "Hashtag", + "href": f"{post_url}tag/{t.slug}/", + "name": f"#{t.slug.replace('-', '')}", + } + for t in tag_objs + ] + + return obj -# --- add this helper next to fetch_all_posts_from_ghost() --- +# ===================================================== +# Ghost API fetch helpers +# ===================================================== async def _fetch_all_from_ghost(endpoint: str) -> list[dict[str, Any]]: async with httpx.AsyncClient(timeout=30) as client: @@ -298,26 +283,23 @@ async def _fetch_all_from_ghost(endpoint: str) -> list[dict[str, Any]]: headers=_auth_header(), ) resp.raise_for_status() - # admin posts endpoint returns {"posts": [...]}, pages returns {"pages": [...]} key = "posts" if endpoint == "posts" else "pages" return resp.json().get(key, []) + async def fetch_all_posts_and_pages_from_ghost() -> list[dict[str, Any]]: posts, pages = await asyncio.gather( _fetch_all_from_ghost("posts"), _fetch_all_from_ghost("pages"), ) - # Be explicit: ensure page flag exists for pages (Ghost typically includes "page": true) for p in pages: p["page"] = True return posts + pages async def sync_all_content_from_ghost(sess: AsyncSession) -> None: - #data = await fetch_all_posts_from_ghost() + """Bulk sync all Ghost content (posts + pages) into db_blog.""" data = await fetch_all_posts_and_pages_from_ghost() - # Use a transaction so all upserts/soft-deletes commit together - # buckets of authors/tags we saw in Ghost author_bucket: Dict[str, dict[str, Any]] = {} tag_bucket: Dict[str, dict[str, Any]] = {} @@ -332,610 +314,44 @@ async def sync_all_content_from_ghost(sess: AsyncSession) -> None: if p.get("primary_tag"): tag_bucket[p["primary_tag"]["id"]] = p["primary_tag"] - # sets of ghost_ids we've seen in Ghost RIGHT NOW seen_post_ids = {p["id"] for p in data} seen_author_ids = set(author_bucket.keys()) seen_tag_ids = set(tag_bucket.keys()) - # upsert authors author_map: Dict[str, Author] = {} for ga in author_bucket.values(): a = await _upsert_author(sess, ga) author_map[ga["id"]] = a - # upsert tags tag_map: Dict[str, Tag] = {} for gt in tag_bucket.values(): t = await _upsert_tag(sess, gt) tag_map[gt["id"]] = t - # upsert posts (including M2M) for gp in data: await _upsert_post(sess, gp, author_map, tag_map) # soft-delete anything that no longer exists in Ghost now = utcnow() - # Authors not seen -> mark deleted_at if not already db_authors = await sess.execute(select(Author)) for local_author in db_authors.scalars(): if local_author.ghost_id not in seen_author_ids: if local_author.deleted_at is None: local_author.deleted_at = now - # Tags not seen -> mark deleted_at db_tags = await sess.execute(select(Tag)) for local_tag in db_tags.scalars(): if local_tag.ghost_id not in seen_tag_ids: if local_tag.deleted_at is None: local_tag.deleted_at = now - # Posts not seen -> mark deleted_at db_posts = await sess.execute(select(Post)) for local_post in db_posts.scalars(): if local_post.ghost_id not in seen_post_ids: if local_post.deleted_at is None: local_post.deleted_at = now - # transaction auto-commits here - - -#===================================================== -# MEMBERSHIP SYNC (USER-CENTRIC) Ghost -> DB -#===================================================== - -def _member_email(m: dict[str, Any]) -> Optional[str]: - email = (m.get("email") or "").strip().lower() or None - return email - - -# ---- small upsert helpers for related entities ---- - -async def _upsert_label(sess: AsyncSession, data: dict) -> GhostLabel: - res = await sess.execute(select(GhostLabel).where(GhostLabel.ghost_id == data["id"])) - obj = res.scalar_one_or_none() - if not obj: - obj = GhostLabel(ghost_id=data["id"]) - sess.add(obj) - obj.name = data.get("name") or obj.name - obj.slug = data.get("slug") or obj.slug - await sess.flush() - return obj - - -async def _upsert_newsletter(sess: AsyncSession, data: dict) -> GhostNewsletter: - res = await sess.execute(select(GhostNewsletter).where(GhostNewsletter.ghost_id == data["id"])) - obj = res.scalar_one_or_none() - if not obj: - obj = GhostNewsletter(ghost_id=data["id"]) - sess.add(obj) - obj.name = data.get("name") or obj.name - obj.slug = data.get("slug") or obj.slug - obj.description = data.get("description") or obj.description - await sess.flush() - return obj - - -async def _upsert_tier(sess: AsyncSession, data: dict) -> GhostTier: - res = await sess.execute(select(GhostTier).where(GhostTier.ghost_id == data["id"])) - obj = res.scalar_one_or_none() - if not obj: - obj = GhostTier(ghost_id=data["id"]) - sess.add(obj) - obj.name = data.get("name") or obj.name - obj.slug = data.get("slug") or obj.slug - obj.type = data.get("type") or obj.type - obj.visibility = data.get("visibility") or obj.visibility - await sess.flush() - return obj - - -def _price_cents(sd: dict) -> Optional[int]: - try: - return int((sd.get("price") or {}).get("amount")) - except Exception: - return None - - -# ---- application of member payload onto User + related tables ---- - -async def _find_or_create_user_by_ghost_or_email(sess: AsyncSession, data: dict) -> User: - ghost_id = data.get("id") - email = _member_email(data) - - if ghost_id: - res = await sess.execute(select(User).where(User.ghost_id == ghost_id)) - u = res.scalar_one_or_none() - if u: - return u - - if email: - res = await sess.execute(select(User).where(User.email.ilike(email))) - u = res.scalar_one_or_none() - if u: - if ghost_id and not u.ghost_id: - u.ghost_id = ghost_id - return u - - # create a new user (Ghost is source of truth for member list) - u = User(email=email or f"_ghost_{ghost_id}@invalid.local") - if ghost_id: - u.ghost_id = ghost_id - sess.add(u) - await sess.flush() - return u - - -async def _apply_user_membership(sess: AsyncSession, user: User, m: dict) -> User: - """Apply Ghost member payload to local User WITHOUT touching relationship collections directly. - We mutate join tables explicitly to avoid lazy-loads (which cause MissingGreenlet in async). - """ - sess.add(user) - - # scalar fields - user.name = m.get("name") or user.name - user.ghost_status = m.get("status") or user.ghost_status - user.ghost_subscribed = bool(m.get("subscribed", True)) - user.ghost_note = m.get("note") or user.ghost_note - user.avatar_image = m.get("avatar_image") or user.avatar_image - user.stripe_customer_id = ( - (m.get("stripe") or {}).get("customer_id") - or (m.get("customer") or {}).get("id") - or m.get("stripe_customer_id") - or user.stripe_customer_id - ) - user.ghost_raw = dict(m) - flag_modified(user, "ghost_raw") - - await sess.flush() # ensure user.id exists - - # Labels join - label_ids: list[int] = [] - for ld in m.get("labels") or []: - lbl = await _upsert_label(sess, ld) - label_ids.append(lbl.id) - await sess.execute(delete(UserLabel).where(UserLabel.user_id == user.id)) - for lid in label_ids: - sess.add(UserLabel(user_id=user.id, label_id=lid)) - await sess.flush() - - # Newsletters join with subscribed flag - nl_rows: list[tuple[int, bool]] = [] - for nd in m.get("newsletters") or []: - nl = await _upsert_newsletter(sess, nd) - nl_rows.append((nl.id, bool(nd.get("subscribed", True)))) - await sess.execute(delete(UserNewsletter).where(UserNewsletter.user_id == user.id)) - for nl_id, subbed in nl_rows: - sess.add(UserNewsletter(user_id=user.id, newsletter_id=nl_id, subscribed=subbed)) - await sess.flush() - - # Subscriptions - for sd in m.get("subscriptions") or []: - sid = sd.get("id") - if not sid: - continue - - tier_id: Optional[int] = None - if sd.get("tier"): - tier = await _upsert_tier(sess, sd["tier"]) - await sess.flush() - tier_id = tier.id - - res = await sess.execute(select(GhostSubscription).where(GhostSubscription.ghost_id == sid)) - sub = res.scalar_one_or_none() - if not sub: - sub = GhostSubscription(ghost_id=sid, user_id=user.id) - sess.add(sub) - - sub.user_id = user.id - sub.status = sd.get("status") or sub.status - sub.cadence = (sd.get("plan") or {}).get("interval") or sd.get("cadence") or sub.cadence - sub.price_amount = _price_cents(sd) - sub.price_currency = (sd.get("price") or {}).get("currency") or sub.price_currency - sub.stripe_customer_id = ( - (sd.get("customer") or {}).get("id") - or (sd.get("stripe") or {}).get("customer_id") - or sub.stripe_customer_id - ) - sub.stripe_subscription_id = ( - sd.get("stripe_subscription_id") - or (sd.get("stripe") or {}).get("subscription_id") - or sub.stripe_subscription_id - ) - if tier_id is not None: - sub.tier_id = tier_id - sub.raw = dict(sd) - flag_modified(sub, "raw") - - await sess.flush() - return user - - -# ===================================================== -# PUSH MEMBERS FROM LOCAL DB -> GHOST (DB -> Ghost) -# ===================================================== - -def _ghost_member_payload_base(u: User) -> dict: - """Compose writable Ghost member fields from local User, validating types.""" - email = _to_str_or_none(getattr(u, "email", None)) - payload: dict = {} - if email: - payload["email"] = email.lower() - - name = _to_str_or_none(getattr(u, "name", None)) - if name: - payload["name"] = name - - note = _to_str_or_none(getattr(u, "ghost_note", None)) - if note: - payload["note"] = note - - # If ghost_subscribed is None, default True (Ghost expects boolean) - subscribed = getattr(u, "ghost_subscribed", True) - payload["subscribed"] = bool(subscribed) - - return payload - -async def _newsletters_for_user(sess: AsyncSession, user_id: int) -> list[dict]: - """Return list of {'id': ghost_id, 'subscribed': bool} rows for Ghost API, excluding blanks.""" - q = await sess.execute( - select(GhostNewsletter.ghost_id, UserNewsletter.subscribed, GhostNewsletter.name) - .join(UserNewsletter, UserNewsletter.newsletter_id == GhostNewsletter.id) - .where(UserNewsletter.user_id == user_id) - ) - seen = set() - out: list[dict] = [] - for gid, subscribed, name in q.all(): - gid = (gid or "").strip() or None - name = (name or "").strip() or None - row: dict = {"subscribed": bool(subscribed)} - if gid: - key = ("id", gid) - if key in seen: - continue - row["id"] = gid - seen.add(key) - out.append(row) - elif name: - key = ("name", name.lower()) - if key in seen: - continue - row["name"] = name - seen.add(key) - out.append(row) - # else: skip - return out - -async def _labels_for_user(sess: AsyncSession, user_id: int) -> list[dict]: - """Return list of {'id': ghost_id} or {'name': name} for Ghost API, excluding blanks.""" - q = await sess.execute( - select(GhostLabel.ghost_id, GhostLabel.name) - .join(UserLabel, UserLabel.label_id == GhostLabel.id) - .where(UserLabel.user_id == user_id) - ) - seen = set() - out: list[dict] = [] - for gid, name in q.all(): - gid = (gid or "").strip() or None - name = (name or "").strip() or None - if gid: - key = ("id", gid) - if key not in seen: - out.append({"id": gid}) - seen.add(key) - elif name: - key = ("name", name.lower()) - if key not in seen: - out.append({"name": name}) - seen.add(key) - # else: skip empty label row - return out - - -async def _ghost_find_member_by_email(email: str) -> dict | None: - """Query Ghost for a member by email to resolve conflicts / missing IDs.""" - if not email: - return None - async with httpx.AsyncClient(timeout=20) as client: - resp = await client.get( - f"{GHOST_ADMIN_API_URL}/members/", - headers=_auth_header(), - params={"filter": f"email:{email}", "limit": 1}, - ) - resp.raise_for_status() - members = (resp.json() or {}).get("members") or [] - return members[0] if members else None - - -from urllib.parse import quote # make sure this import exists at top - -async def _ghost_find_member_by_email(email: str) -> Optional[dict]: - if not email: - return None - async with httpx.AsyncClient(timeout=30) as client: - resp = await client.get( - f"{GHOST_ADMIN_API_URL}/members/?filter=email:{quote(email)}&limit=1", - headers=_auth_header(), - ) - resp.raise_for_status() - members = resp.json().get("members") or [] - return members[0] if members else None - -async def _ghost_upsert_member(payload: dict, ghost_id: str | None = None) -> dict: - """Create/update a member, with sanitization + 5xx retry/backoff. - - Prefer PUT if ghost_id given. - - On 422: retry without name/note; if 'already exists', find-by-email then PUT. - - On 404: find-by-email and PUT; if still missing, POST create. - - On 5xx: small exponential backoff retry. - """ - safe_keys = ("email", "name", "note", "subscribed", "labels", "newsletters", "id") - pl_raw = {k: v for k, v in payload.items() if k in safe_keys} - pl = _sanitize_member_payload(pl_raw) - - async def _request_with_retry(client: httpx.AsyncClient, method: str, url: str, json: dict) -> httpx.Response: - delay = 0.5 - for attempt in range(3): - r = await client.request(method, url, headers=_auth_header(), json=json) - if r.status_code >= 500: - if attempt < 2: - await asyncio.sleep(delay) - delay *= 2 - continue - return r - return r # last response - - async with httpx.AsyncClient(timeout=30) as client: - - async def _put(mid: str, p: dict) -> dict: - r = await _request_with_retry( - client, "PUT", - f"{GHOST_ADMIN_API_URL}/members/{mid}/", - {"members": [p]}, - ) - if r.status_code == 404: - # Stale id: try by email, then create if absent - existing = await _ghost_find_member_by_email(p.get("email", "")) - if existing and existing.get("id"): - r2 = await _request_with_retry( - client, "PUT", - f"{GHOST_ADMIN_API_URL}/members/{existing['id']}/", - {"members": [p]}, - ) - r2.raise_for_status() - return (r2.json().get("members") or [None])[0] or {} - r3 = await _request_with_retry( - client, "POST", - f"{GHOST_ADMIN_API_URL}/members/", - {"members": [p]}, - ) - r3.raise_for_status() - return (r3.json().get("members") or [None])[0] or {} - - if r.status_code == 422: - body = (r.text or "").lower() - retry = dict(p) - dropped = False - if '"note"' in body or "for note" in body: - retry.pop("note", None); dropped = True - if '"name"' in body or "for name" in body: - retry.pop("name", None); dropped = True - if "labels.name" in body: - retry.pop("labels", None); dropped = True - if dropped: - r2 = await _request_with_retry( - client, "PUT", - f"{GHOST_ADMIN_API_URL}/members/{mid}/", - {"members": [retry]}, - ) - if r2.status_code == 404: - existing = await _ghost_find_member_by_email(retry.get("email", "")) - if existing and existing.get("id"): - r3 = await _request_with_retry( - client, "PUT", - f"{GHOST_ADMIN_API_URL}/members/{existing['id']}/", - {"members": [retry]}, - ) - r3.raise_for_status() - return (r3.json().get("members") or [None])[0] or {} - r3 = await _request_with_retry( - client, "POST", - f"{GHOST_ADMIN_API_URL}/members/", - {"members": [retry]}, - ) - r3.raise_for_status() - return (r3.json().get("members") or [None])[0] or {} - r2.raise_for_status() - return (r2.json().get("members") or [None])[0] or {} - r.raise_for_status() - return (r.json().get("members") or [None])[0] or {} - - async def _post_upsert(p: dict) -> dict: - r = await _request_with_retry( - client, "POST", - f"{GHOST_ADMIN_API_URL}/members/?upsert=true", - {"members": [p]}, - ) - if r.status_code == 422: - lower = (r.text or "").lower() - - # sanitize further name/note/labels on schema complaints - retry = dict(p) - changed = False - if '"note"' in lower or "for note" in lower: - retry.pop("note", None); changed = True - if '"name"' in lower or "for name" in lower: - retry.pop("name", None); changed = True - if "labels.name" in lower: - retry.pop("labels", None); changed = True - - if changed: - r2 = await _request_with_retry( - client, "POST", - f"{GHOST_ADMIN_API_URL}/members/?upsert=true", - {"members": [retry]}, - ) - if r2.status_code != 422: - r2.raise_for_status() - return (r2.json().get("members") or [None])[0] or {} - lower = (r2.text or "").lower() - - # existing email => find-by-email then PUT - if "already exists" in lower and "email address" in lower: - existing = await _ghost_find_member_by_email(p.get("email", "")) - if existing and existing.get("id"): - return await _put(existing["id"], p) - - # unrecoverable - raise httpx.HTTPStatusError( - "Validation error, cannot edit member.", - request=r.request, - response=r, - ) - r.raise_for_status() - return (r.json().get("members") or [None])[0] or {} - - if ghost_id: - return await _put(ghost_id, pl) - return await _post_upsert(pl) - -async def sync_member_to_ghost(sess: AsyncSession, user_id: int) -> Optional[str]: - res = await sess.execute(select(User).where(User.id == user_id)) - user = res.scalar_one_or_none() - if not user: - return None - - payload = _ghost_member_payload_base(user) - - labels = await _labels_for_user(sess, user.id) - if labels: - payload["labels"] = labels # Ghost accepts label ids on upsert - - ghost_member = await _ghost_upsert_member(payload, ghost_id=user.ghost_id) - - if ghost_member: - gm_id = ghost_member.get("id") - if gm_id and user.ghost_id != gm_id: - user.ghost_id = gm_id - user.ghost_raw = dict(ghost_member) - flag_modified(user, "ghost_raw") - await sess.flush() - return user.ghost_id or gm_id - return user.ghost_id - - -async def sync_members_to_ghost( - sess: AsyncSession, - changed_since: Optional[datetime] = None, - limit: Optional[int] = None, -) -> int: - """Upsert a batch of users to Ghost. Returns count processed.""" - stmt = select(User.id) - if changed_since: - stmt = stmt.where( - or_( - User.created_at >= changed_since, - and_(User.last_login_at != None, User.last_login_at >= changed_since), - ) - ) - if limit: - stmt = stmt.limit(limit) - - ids = [row[0] for row in (await sess.execute(stmt)).all()] - processed = 0 - for uid in ids: - try: - await sync_member_to_ghost(sess, uid) - processed += 1 - except httpx.HTTPStatusError as e: - # Log and continue; don't kill startup - print(f"[ghost sync] failed upsert for user {uid}: {e.response.status_code} {e.response.text}") - except Exception as e: - print(f"[ghost sync] failed upsert for user {uid}: {e}") - return processed - - -# ===================================================== -# Membership fetch/sync (Ghost -> DB) bulk + single -# ===================================================== - -async def fetch_all_members_from_ghost() -> list[dict[str, Any]]: - async with httpx.AsyncClient(timeout=60) as client: - resp = await client.get( - f"{GHOST_ADMIN_API_URL}/members/?include=labels,subscriptions,tiers,newsletters&limit=all", - headers=_auth_header(), - ) - resp.raise_for_status() - return resp.json().get("members", []) - - -async def sync_all_membership_from_ghost(sess: AsyncSession) -> None: - members = await fetch_all_members_from_ghost() - - # collect related lookups and ensure catalogs exist first (avoid FK races) - label_bucket: Dict[str, dict[str, Any]] = {} - tier_bucket: Dict[str, dict[str, Any]] = {} - newsletter_bucket: Dict[str, dict[str, Any]] = {} - - for m in members: - for l in m.get("labels") or []: - label_bucket[l["id"]] = l - for n in m.get("newsletters") or []: - newsletter_bucket[n["id"]] = n - for s in m.get("subscriptions") or []: - t = s.get("tier") - if isinstance(t, dict) and t.get("id"): - tier_bucket[t["id"]] = t - - for L in label_bucket.values(): - await _upsert_label(sess, L) - for T in tier_bucket.values(): - await _upsert_tier(sess, T) - for N in newsletter_bucket.values(): - await _upsert_newsletter(sess, N) - - # Users - for gm in members: - user = await _find_or_create_user_by_ghost_or_email(sess, gm) - await _apply_user_membership(sess, user, gm) - - # transaction auto-commits here - - -async def fetch_single_member_from_ghost(ghost_id: str) -> Optional[dict[str, Any]]: - async with httpx.AsyncClient(timeout=30) as client: - resp = await client.get( - f"{GHOST_ADMIN_API_URL}/members/{ghost_id}/?include=labels,newsletters,subscriptions,tiers", - headers=_auth_header(), - ) - if resp.status_code == 404: - return None - resp.raise_for_status() - data = resp.json() - items = data.get("members") or data.get("member") or [] - if isinstance(items, dict): - return items - return (items[0] if items else None) - - -async def sync_single_member(sess: AsyncSession, ghost_id: str) -> None: - m = await fetch_single_member_from_ghost(ghost_id) - if m is None: - # If member deleted in Ghost, we won't delete local user here. - return - - # ensure catalogs for this payload - for l in m.get("labels") or []: - await _upsert_label(sess, l) - for n in m.get("newsletters") or []: - await _upsert_newsletter(sess, n) - for s in m.get("subscriptions") or []: - if isinstance(s.get("tier"), dict): - await _upsert_tier(sess, s["tier"]) - - user = await _find_or_create_user_by_ghost_or_email(sess, m) - await _apply_user_membership(sess, user, m) - # transaction auto-commits here - # ===================================================== # Single-item content helpers (posts/authors/tags) @@ -995,77 +411,6 @@ async def fetch_single_tag_from_ghost(ghost_id: str) -> Optional[dict[str, Any]] return tags[0] if tags else None -def _build_ap_post_data(post, post_url: str, tag_objs: list) -> dict: - """Build rich AP object_data for a blog post/page. - - Produces a Note with HTML content (excerpt), feature image + inline - images as attachments, and tags as AP Hashtag objects. - """ - # Content HTML: title + excerpt + "Read more" link - parts: list[str] = [] - if post.title: - parts.append(f"

{html_escape(post.title)}

") - - body = post.plaintext or post.custom_excerpt or post.excerpt or "" - - if body: - for para in body.split("\n\n"): - para = para.strip() - if para: - parts.append(f"

{html_escape(para)}

") - - parts.append(f'

Read more \u2192

') - - # Hashtag links in content (Mastodon expects them inline too) - if tag_objs: - ht_links = [] - for t in tag_objs: - clean = t.slug.replace("-", "") - ht_links.append( - f'' - ) - parts.append(f'

{" ".join(ht_links)}

') - - obj: dict = { - "name": post.title or "", - "content": "\n".join(parts), - "url": post_url, - } - - # Attachments: feature image + inline images (max 4) - attachments: list[dict] = [] - seen: set[str] = set() - - if post.feature_image: - att: dict = {"type": "Image", "url": post.feature_image} - if post.feature_image_alt: - att["name"] = post.feature_image_alt - attachments.append(att) - seen.add(post.feature_image) - - if post.html: - for src in re.findall(r']+src="([^"]+)"', post.html): - if src not in seen and len(attachments) < 4: - attachments.append({"type": "Image", "url": src}) - seen.add(src) - - if attachments: - obj["attachment"] = attachments - - # AP Hashtag objects - if tag_objs: - obj["tag"] = [ - { - "type": "Hashtag", - "href": f"{post_url}tag/{t.slug}/", - "name": f"#{t.slug.replace('-', '')}", - } - for t in tag_objs - ] - - return obj - - async def sync_single_post(sess: AsyncSession, ghost_id: str) -> None: gp = await fetch_single_post_from_ghost(ghost_id) if gp is None: @@ -1132,7 +477,7 @@ async def sync_single_post(sess: AsyncSession, ghost_id: str) -> None: async def sync_single_page(sess: AsyncSession, ghost_id: str) -> None: gp = await fetch_single_page_from_ghost(ghost_id) if gp is not None: - gp["page"] = True # Ghost /pages/ endpoint may omit this flag + gp["page"] = True if gp is None: res = await sess.execute(select(Post).where(Post.ghost_id == ghost_id)) obj = res.scalar_one_or_none() @@ -1218,23 +563,15 @@ async def sync_single_tag(sess: AsyncSession, ghost_id: str) -> None: await _upsert_tag(sess, gt) -# ---- explicit public exports (back-compat) ---- __all__ = [ # bulk content "sync_all_content_from_ghost", - # bulk membership (user-centric) - "sync_all_membership_from_ghost", - # DB -> Ghost - "sync_member_to_ghost", - "sync_members_to_ghost", # single fetch "fetch_single_post_from_ghost", "fetch_single_author_from_ghost", "fetch_single_tag_from_ghost", - "fetch_single_member_from_ghost", # single sync "sync_single_post", "sync_single_author", "sync_single_tag", - "sync_single_member", ] diff --git a/blog/bp/blog/routes.py b/blog/bp/blog/routes.py index 7d823b5..2b788d9 100644 --- a/blog/bp/blog/routes.py +++ b/blog/bp/blog/routes.py @@ -47,14 +47,10 @@ def register(url_prefix, title): @blogs_bp.before_app_serving async def init(): - from .ghost.ghost_sync import ( - sync_all_content_from_ghost, - sync_all_membership_from_ghost, - ) - + from .ghost.ghost_sync import sync_all_content_from_ghost + async with get_session() as s: await sync_all_content_from_ghost(s) - await sync_all_membership_from_ghost(s) await s.commit() @blogs_bp.before_request diff --git a/blog/bp/blog/web_hooks/routes.py b/blog/bp/blog/web_hooks/routes.py index b02138b..20a680b 100644 --- a/blog/bp/blog/web_hooks/routes.py +++ b/blog/bp/blog/web_hooks/routes.py @@ -4,7 +4,6 @@ import os from quart import Blueprint, request, abort, Response, g from ..ghost.ghost_sync import ( - sync_single_member, sync_single_page, sync_single_post, sync_single_author, @@ -18,18 +17,12 @@ ghost_webhooks = Blueprint("ghost_webhooks", __name__, url_prefix="/__ghost-webh def _check_secret(req) -> None: expected = os.getenv("GHOST_WEBHOOK_SECRET") if not expected: - # if you don't set a secret, we allow anything (dev mode) return got = req.args.get("secret") or req.headers.get("X-Webhook-Secret") if got != expected: abort(401) def _extract_id(data: dict, key: str) -> str | None: - """ - key is "post", "tag", or "user"/"author". - Ghost usually sends { key: { current: { id: ... }, previous: { id: ... } } } - We'll try current first, then previous. - """ block = data.get(key) or {} cur = block.get("current") or {} prev = block.get("previous") or {} @@ -38,7 +31,6 @@ def _extract_id(data: dict, key: str) -> str | None: @csrf_exempt @ghost_webhooks.route("/member/", methods=["POST"]) -#@ghost_webhooks.post("/member/") async def webhook_member() -> Response: _check_secret(request) @@ -47,9 +39,17 @@ async def webhook_member() -> Response: if not ghost_id: abort(400, "no member id") - # sync one post - #async_session_factory = request.app.config["ASYNC_SESSION_FACTORY"] # we'll set this in app.py - await sync_single_member(g.s, ghost_id) + # Delegate to account service (membership data lives in db_account) + from shared.infrastructure.actions import call_action + try: + await call_action( + "account", "ghost-sync-member", + payload={"ghost_id": ghost_id}, + timeout=30.0, + ) + except Exception as e: + import logging + logging.getLogger(__name__).error("Member sync via account failed: %s", e) return Response(status=204) @csrf_exempt @@ -63,10 +63,8 @@ async def webhook_post() -> Response: if not ghost_id: abort(400, "no post id") - # sync one post - #async_session_factory = request.app.config["ASYNC_SESSION_FACTORY"] # we'll set this in app.py await sync_single_post(g.s, ghost_id) - + return Response(status=204) @csrf_exempt @@ -80,10 +78,8 @@ async def webhook_page() -> Response: if not ghost_id: abort(400, "no page id") - # sync one post - #async_session_factory = request.app.config["ASYNC_SESSION_FACTORY"] # we'll set this in app.py await sync_single_page(g.s, ghost_id) - + return Response(status=204) @csrf_exempt @@ -93,15 +89,12 @@ async def webhook_author() -> Response: _check_secret(request) data = await request.get_json(force=True, silent=True) or {} - # Ghost calls them "user" in webhook payload in many versions, - # and you want authors in your mirror. We'll try both keys. ghost_id = _extract_id(data, "user") or _extract_id(data, "author") if not ghost_id: abort(400, "no author id") - #async_session_factory = request.app.config["ASYNC_SESSION_FACTORY"] await sync_single_author(g.s, ghost_id) - + return Response(status=204) @csrf_exempt @@ -115,6 +108,5 @@ async def webhook_tag() -> Response: if not ghost_id: abort(400, "no tag id") - #async_session_factory = request.app.config["ASYNC_SESSION_FACTORY"] await sync_single_tag(g.s, ghost_id) return Response(status=204) diff --git a/blog/bp/post/admin/routes.py b/blog/bp/post/admin/routes.py index e04d479..603e927 100644 --- a/blog/bp/post/admin/routes.py +++ b/blog/bp/post/admin/routes.py @@ -465,17 +465,18 @@ def register(): @require_post_author async def edit(slug: str): from ...blog.ghost.ghost_posts import get_post_for_edit - from shared.models.ghost_membership_entities import GhostNewsletter - from sqlalchemy import select as sa_select + from shared.infrastructure.data_client import fetch_data ghost_id = g.post_data["post"]["ghost_id"] is_page = bool(g.post_data["post"].get("is_page")) ghost_post = await get_post_for_edit(ghost_id, is_page=is_page) save_success = request.args.get("saved") == "1" - newsletters = (await g.s.execute( - sa_select(GhostNewsletter).order_by(GhostNewsletter.name) - )).scalars().all() + # Newsletters live in db_account — fetch via HTTP + raw_newsletters = await fetch_data("account", "newsletters", required=False) or [] + # Convert dicts to objects with .name/.ghost_id attributes for template compat + from types import SimpleNamespace + newsletters = [SimpleNamespace(**nl) for nl in raw_newsletters] if not is_htmx_request(): html = await render_template( diff --git a/shared/infrastructure/ghost_admin_token.py b/shared/infrastructure/ghost_admin_token.py new file mode 100644 index 0000000..1974075 --- /dev/null +++ b/shared/infrastructure/ghost_admin_token.py @@ -0,0 +1,46 @@ +import os +import time +import jwt # PyJWT +from typing import Tuple + + +def _split_key(raw_key: str) -> Tuple[str, bytes]: + """ + raw_key is the 'id:secret' from Ghost. + Returns (id, secret_bytes) + """ + key_id, key_secret_hex = raw_key.split(':', 1) + secret_bytes = bytes.fromhex(key_secret_hex) + return key_id, secret_bytes + + +def make_ghost_admin_jwt() -> str: + """ + Generate a short-lived JWT suitable for Authorization: Ghost + """ + raw_key = os.environ["GHOST_ADMIN_API_KEY"] + key_id, secret_bytes = _split_key(raw_key) + + now = int(time.time()) + + payload = { + "iat": now, + "exp": now + 5 * 60, # now + 5 minutes + "aud": "/admin/", + } + + headers = { + "alg": "HS256", + "kid": key_id, + "typ": "JWT", + } + + token = jwt.encode( + payload, + secret_bytes, + algorithm="HS256", + headers=headers, + ) + + # PyJWT returns str in recent versions; Ghost expects bare token string + return token diff --git a/shared/infrastructure/oauth.py b/shared/infrastructure/oauth.py index 46ce7b7..2121d31 100644 --- a/shared/infrastructure/oauth.py +++ b/shared/infrastructure/oauth.py @@ -22,7 +22,7 @@ from quart import ( ) from sqlalchemy import select -from shared.db.session import get_session +from shared.db.session import get_session, get_account_session from shared.models.oauth_code import OAuthCode from shared.infrastructure.urls import account_url, app_url from shared.infrastructure.cart_identity import current_cart_identity @@ -100,7 +100,8 @@ def create_oauth_blueprint(app_name: str) -> Blueprint: expected_redirect = app_url(app_name, "/auth/callback") now = datetime.now(timezone.utc) - async with get_session() as s: + # OAuthCode lives in db_account — use account session + async with get_account_session() as s: async with s.begin(): result = await s.execute( select(OAuthCode) diff --git a/shared/services/federation_publish.py b/shared/services/federation_publish.py index fb26ea0..7936645 100644 --- a/shared/services/federation_publish.py +++ b/shared/services/federation_publish.py @@ -4,6 +4,9 @@ The originating service calls try_publish() directly, which creates the APActivity (with process_state='pending') in the same DB transaction. The EventProcessor picks it up and the delivery wildcard handler POSTs to follower inboxes. + +When the federation database is separate from the caller's database, +this module opens its own federation session for all AP reads/writes. """ from __future__ import annotations @@ -17,6 +20,12 @@ from shared.services.registry import services log = logging.getLogger(__name__) +def _needs_federation_session() -> bool: + """True when the federation DB differs from the app's default DB.""" + from shared.db.session import DATABASE_URL, DATABASE_URL_FEDERATION + return DATABASE_URL_FEDERATION != DATABASE_URL + + async def try_publish( session: AsyncSession, *, @@ -38,6 +47,42 @@ async def try_publish( if not user_id: return + if _needs_federation_session(): + from shared.db.session import get_federation_session + async with get_federation_session() as fed_s: + async with fed_s.begin(): + await _publish_inner( + fed_s, + user_id=user_id, + activity_type=activity_type, + object_type=object_type, + object_data=object_data, + source_type=source_type, + source_id=source_id, + ) + else: + await _publish_inner( + session, + user_id=user_id, + activity_type=activity_type, + object_type=object_type, + object_data=object_data, + source_type=source_type, + source_id=source_id, + ) + + +async def _publish_inner( + session: AsyncSession, + *, + user_id: int, + activity_type: str, + object_type: str, + object_data: dict, + source_type: str, + source_id: int, +) -> None: + """Core publish logic using a session bound to the federation DB.""" actor = await services.federation.get_actor_by_user_id(session, user_id) if not actor: return @@ -48,28 +93,24 @@ async def try_publish( ) if existing: if activity_type == "Create" and existing.activity_type != "Delete": - return # already published (allow re-Create after Delete/unpublish) + return if activity_type == "Delete" and existing.activity_type == "Delete": - return # already deleted + return elif activity_type in ("Delete", "Update"): - return # never published, nothing to delete/update + return - # Stable object ID within a publish cycle. After Delete + re-Create - # we append a version suffix so remote servers (Mastodon) treat it as - # a brand-new post rather than ignoring the tombstoned ID. + # Stable object ID within a publish cycle domain = os.getenv("AP_DOMAIN", "federation.rose-ash.com") base_object_id = ( f"https://{domain}/users/{actor.preferred_username}" f"/objects/{source_type.lower()}/{source_id}" ) if activity_type == "Create" and existing and existing.activity_type == "Delete": - # Count prior Creates to derive a version number create_count = await services.federation.count_activities_for_source( session, source_type, source_id, activity_type="Create", ) object_data["id"] = f"{base_object_id}/v{create_count + 1}" elif activity_type in ("Update", "Delete") and existing and existing.object_data: - # Use the same object ID as the most recent activity object_data["id"] = existing.object_data.get("id", base_object_id) else: object_data["id"] = base_object_id