Files
mono/account/services/ghost_membership.py
giles 95bd32bd71 Decouple cross-domain DB queries for per-app database split
Move Ghost membership sync from blog to account service so blog no
longer queries account tables (users, ghost_labels, etc.). Account
runs membership sync at startup and exposes HTTP action/data endpoints
for webhook-triggered syncs and user lookups.

Key changes:
- account/services/ghost_membership.py: all membership sync functions
- account/bp/actions + data: ghost-sync-member, user-by-email, newsletters
- blog ghost_sync.py: stripped to content-only (posts, authors, tags)
- blog webhook member: delegates to account via call_action()
- try_publish: opens federation session when DBs differ
- oauth.py callback: uses get_account_session() for OAuthCode
- page_configs moved from db_events to db_blog in split script

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-25 11:32:14 +00:00

622 lines
21 KiB
Python

"""Ghost membership sync — account-owned.
Handles Ghost ↔ DB sync for user/membership data:
- Ghost → DB: fetch members from Ghost API, upsert into account tables
- DB → Ghost: push local user changes back to Ghost API
All tables involved (users, ghost_labels, user_labels, ghost_newsletters,
user_newsletters, ghost_tiers, ghost_subscriptions) live in db_account.
"""
from __future__ import annotations
import os
import re
import asyncio
from datetime import datetime
from typing import Dict, Any, Optional
import httpx
from sqlalchemy import select, delete, or_, and_
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm.attributes import flag_modified
from shared.models import User
from shared.models.ghost_membership_entities import (
GhostLabel, UserLabel,
GhostNewsletter, UserNewsletter,
GhostTier, GhostSubscription,
)
from shared.infrastructure.ghost_admin_token import make_ghost_admin_jwt
from urllib.parse import quote
# Base URL of the Ghost Admin API, read once at import time from the
# GHOST_ADMIN_API_URL environment variable; empty string when it is unset.
GHOST_ADMIN_API_URL = os.environ.get("GHOST_ADMIN_API_URL", "")
def _auth_header() -> dict[str, str]:
    """Return the ``Authorization`` header sent with Ghost Admin API requests.

    A token is obtained from ``make_ghost_admin_jwt()`` on every call.
    """
    token = make_ghost_admin_jwt()
    return {"Authorization": f"Ghost {token}"}
def _iso(val: str | None) -> datetime | None:
    """Parse an ISO-8601 timestamp string into a ``datetime``.

    Any ``Z`` in the input is rewritten to ``+00:00`` before parsing.
    ``None`` and the empty string yield ``None``.
    """
    if val:
        return datetime.fromisoformat(val.replace("Z", "+00:00"))
    return None
def _to_str_or_none(v) -> Optional[str]:
    """Coerce a scalar to a stripped string, or ``None`` when nothing usable remains.

    ``None``, containers (dict/list/set/tuple) and byte strings are rejected
    outright; any other value is passed through ``str()`` and stripped, and a
    blank result is reported as ``None``.
    """
    if v is None or isinstance(v, (dict, list, set, tuple, bytes, bytearray)):
        return None
    text = str(v).strip()
    return text if text else None
def _sanitize_member_payload(payload: dict) -> dict:
    """Build a cleaned copy of a member payload for the Ghost API.

    Only these keys are carried over, in this order, and only when they hold
    something usable after coercion via ``_to_str_or_none``:

    * ``email`` (lower-cased), ``name``, ``note``
    * ``subscribed`` (coerced to ``bool``; kept whenever the key is present)
    * ``labels`` as ``{"id": ...}`` or, failing that, ``{"name": ...}`` entries
    * ``newsletters`` likewise, each with a ``subscribed`` flag defaulting to
      ``True``
    * ``id``

    Label/newsletter entries with neither a usable id nor name are dropped.
    """
    cleaned: dict = {}

    address = _to_str_or_none(payload.get("email"))
    if address:
        cleaned["email"] = address.lower()

    for field in ("name", "note"):
        text = _to_str_or_none(payload.get(field))
        if text is not None:
            cleaned[field] = text

    if "subscribed" in payload:
        cleaned["subscribed"] = bool(payload.get("subscribed"))

    label_refs = []
    for entry in payload.get("labels") or []:
        ref_id = _to_str_or_none(entry.get("id"))
        ref_name = _to_str_or_none(entry.get("name"))
        # Prefer the id; fall back to the name only when no id is usable.
        if ref_id:
            label_refs.append({"id": ref_id})
        elif ref_name:
            label_refs.append({"name": ref_name})
    if label_refs:
        cleaned["labels"] = label_refs

    newsletter_refs = []
    for entry in payload.get("newsletters") or []:
        ref_id = _to_str_or_none(entry.get("id"))
        ref_name = _to_str_or_none(entry.get("name"))
        subscribed = bool(entry.get("subscribed", True))
        if ref_id:
            newsletter_refs.append({"subscribed": subscribed, "id": ref_id})
        elif ref_name:
            newsletter_refs.append({"subscribed": subscribed, "name": ref_name})
    if newsletter_refs:
        cleaned["newsletters"] = newsletter_refs

    member_id = _to_str_or_none(payload.get("id"))
    if member_id:
        cleaned["id"] = member_id

    return cleaned
def _member_email(m: dict[str, Any]) -> Optional[str]:
    """Return the member's ``email`` stripped and lower-cased, or ``None`` if blank/missing."""
    raw = m.get("email") or ""
    normalized = raw.strip().lower()
    return normalized or None
# ---- upsert helpers for related entities ----
async def _upsert_label(sess: AsyncSession, data: dict) -> GhostLabel:
    """Insert or update the ``GhostLabel`` row whose ``ghost_id`` is ``data["id"]``.

    ``name`` and ``slug`` are replaced only when ``data`` carries a truthy
    value for them; otherwise the value already on the row is kept. The
    session is flushed before the row is returned.
    """
    ghost_id = data["id"]
    lookup = select(GhostLabel).where(GhostLabel.ghost_id == ghost_id)
    label = (await sess.execute(lookup)).scalar_one_or_none()
    if not label:
        label = GhostLabel(ghost_id=ghost_id)
        sess.add(label)

    for field in ("name", "slug"):
        setattr(label, field, data.get(field) or getattr(label, field))

    await sess.flush()
    return label
async def _upsert_newsletter(sess: AsyncSession, data: dict) -> GhostNewsletter:
    """Insert or update the ``GhostNewsletter`` row whose ``ghost_id`` is ``data["id"]``.

    ``name``, ``slug`` and ``description`` are replaced only when ``data``
    carries a truthy value for them; otherwise the value already on the row
    is kept. The session is flushed before the row is returned.
    """
    ghost_id = data["id"]
    lookup = select(GhostNewsletter).where(GhostNewsletter.ghost_id == ghost_id)
    newsletter = (await sess.execute(lookup)).scalar_one_or_none()
    if not newsletter:
        newsletter = GhostNewsletter(ghost_id=ghost_id)
        sess.add(newsletter)

    for field in ("name", "slug", "description"):
        setattr(newsletter, field, data.get(field) or getattr(newsletter, field))

    await sess.flush()
    return newsletter
async def _upsert_tier(sess: AsyncSession, data: dict) -> GhostTier:
    """Insert or update the ``GhostTier`` row whose ``ghost_id`` is ``data["id"]``.

    ``name``, ``slug``, ``type`` and ``visibility`` are replaced only when
    ``data`` carries a truthy value for them; otherwise the value already on
    the row is kept. The session is flushed before the row is returned.
    """
    ghost_id = data["id"]
    lookup = select(GhostTier).where(GhostTier.ghost_id == ghost_id)
    tier = (await sess.execute(lookup)).scalar_one_or_none()
    if not tier:
        tier = GhostTier(ghost_id=ghost_id)
        sess.add(tier)

    for field in ("name", "slug", "type", "visibility"):
        setattr(tier, field, data.get(field) or getattr(tier, field))

    await sess.flush()
    return tier
def _price_cents(sd: dict) -> Optional[int]:
    """Return ``sd["price"]["amount"]`` as an ``int``.

    Any failure along the way (missing keys, non-dict input, a value that
    ``int()`` rejects) results in ``None`` rather than an exception.
    """
    try:
        price = sd.get("price") or {}
        amount = price.get("amount")
        return int(amount)
    except Exception:
        return None
# ---- find/create user by ghost_id or email ----
async def _find_or_create_user_by_ghost_or_email(sess: AsyncSession, data: dict) -> User:
    """Return the local ``User`` for a Ghost member payload, creating one if needed.

    Lookup order:

    1. A user whose ``ghost_id`` equals the payload's ``id``.
    2. A user whose email matches the payload's normalised email,
       case-insensitively. If that user has no ``ghost_id`` yet it is linked
       to the payload's ``id``.
    3. Otherwise a new ``User`` is added and flushed. When the payload has no
       email a ``_ghost_<id>@invalid.local`` placeholder address is used.
    """
    ghost_id = data.get("id")
    email = _member_email(data)

    if ghost_id:
        res = await sess.execute(select(User).where(User.ghost_id == ghost_id))
        u = res.scalar_one_or_none()
        if u:
            return u

    if email:
        # ILIKE treats "_" and "%" as wildcards, and "_" is common in email
        # addresses. Escape them (and the escape character itself) so the
        # comparison is a case-insensitive *exact* match; otherwise
        # "john_doe@x.com" would also match "johnXdoe@x.com", returning the
        # wrong user or raising on multiple rows.
        pattern = (
            email.replace("/", "//").replace("%", "/%").replace("_", "/_")
        )
        res = await sess.execute(
            select(User).where(User.email.ilike(pattern, escape="/"))
        )
        u = res.scalar_one_or_none()
        if u:
            if ghost_id and not u.ghost_id:
                u.ghost_id = ghost_id
            return u

    u = User(email=email or f"_ghost_{ghost_id}@invalid.local")
    if ghost_id:
        u.ghost_id = ghost_id
    sess.add(u)
    await sess.flush()
    return u
# ---- apply membership data to user ----
async def _apply_user_membership(sess: AsyncSession, user: User, m: dict) -> User:
    """Copy a Ghost member payload ``m`` onto ``user`` and its related rows.

    Performed in order, each stage ending with a flush:

    1. Profile fields on ``user``. Most keep their current value when the
       payload has nothing truthy for them; ``ghost_subscribed`` is always
       set (default ``True``) and ``ghost_raw`` always becomes a copy of ``m``.
    2. Labels: every payload label is upserted, then the user's ``UserLabel``
       rows are deleted and re-created from the payload.
    3. Newsletters: same replace-all approach, carrying each newsletter's
       ``subscribed`` flag (default ``True``).
    4. Subscriptions: every entry that has an ``id`` is upserted by Ghost id,
       together with its tier when one is present.

    Returns the same ``user`` object.
    """
    sess.add(user)

    # --- profile fields ---
    stripe_info = m.get("stripe") or {}
    customer_info = m.get("customer") or {}
    user.name = m.get("name") or user.name
    user.ghost_status = m.get("status") or user.ghost_status
    user.ghost_subscribed = bool(m.get("subscribed", True))
    user.ghost_note = m.get("note") or user.ghost_note
    user.avatar_image = m.get("avatar_image") or user.avatar_image
    user.stripe_customer_id = (
        stripe_info.get("customer_id")
        or customer_info.get("id")
        or m.get("stripe_customer_id")
        or user.stripe_customer_id
    )
    user.ghost_raw = dict(m)
    flag_modified(user, "ghost_raw")
    await sess.flush()

    # --- labels: upsert first, then replace the join rows wholesale ---
    label_ids: list[int] = [
        (await _upsert_label(sess, label_data)).id
        for label_data in m.get("labels") or []
    ]
    await sess.execute(delete(UserLabel).where(UserLabel.user_id == user.id))
    for label_id in label_ids:
        sess.add(UserLabel(user_id=user.id, label_id=label_id))
    await sess.flush()

    # --- newsletters: same pattern, plus the per-newsletter subscribed flag ---
    newsletter_links: list[tuple[int, bool]] = []
    for newsletter_data in m.get("newsletters") or []:
        newsletter = await _upsert_newsletter(sess, newsletter_data)
        is_subscribed = bool(newsletter_data.get("subscribed", True))
        newsletter_links.append((newsletter.id, is_subscribed))
    await sess.execute(delete(UserNewsletter).where(UserNewsletter.user_id == user.id))
    for newsletter_id, is_subscribed in newsletter_links:
        sess.add(
            UserNewsletter(
                user_id=user.id,
                newsletter_id=newsletter_id,
                subscribed=is_subscribed,
            )
        )
    await sess.flush()

    # --- subscriptions ---
    for entry in m.get("subscriptions") or []:
        sub_ghost_id = entry.get("id")
        if not sub_ghost_id:
            continue

        tier_id: Optional[int] = None
        if entry.get("tier"):
            tier = await _upsert_tier(sess, entry["tier"])
            await sess.flush()
            tier_id = tier.id

        found = await sess.execute(
            select(GhostSubscription).where(GhostSubscription.ghost_id == sub_ghost_id)
        )
        subscription = found.scalar_one_or_none()
        if not subscription:
            subscription = GhostSubscription(ghost_id=sub_ghost_id, user_id=user.id)
            sess.add(subscription)

        plan_info = entry.get("plan") or {}
        price_info = entry.get("price") or {}
        sub_customer = entry.get("customer") or {}
        sub_stripe = entry.get("stripe") or {}

        subscription.user_id = user.id
        subscription.status = entry.get("status") or subscription.status
        subscription.cadence = (
            plan_info.get("interval") or entry.get("cadence") or subscription.cadence
        )
        # Unlike the other fields, the amount is overwritten even when the
        # payload yields None.
        subscription.price_amount = _price_cents(entry)
        subscription.price_currency = (
            price_info.get("currency") or subscription.price_currency
        )
        subscription.stripe_customer_id = (
            sub_customer.get("id")
            or sub_stripe.get("customer_id")
            or subscription.stripe_customer_id
        )
        subscription.stripe_subscription_id = (
            entry.get("stripe_subscription_id")
            or sub_stripe.get("subscription_id")
            or subscription.stripe_subscription_id
        )
        if tier_id is not None:
            subscription.tier_id = tier_id
        subscription.raw = dict(entry)
        flag_modified(subscription, "raw")

    await sess.flush()
    return user
# =====================================================
# PUSH MEMBERS FROM LOCAL DB -> GHOST (DB -> Ghost)
# =====================================================
def _ghost_member_payload_base(u: User) -> dict:
    """Build the scalar part of a Ghost member payload from a local user.

    ``email`` (lower-cased), ``name`` and ``note`` (from ``ghost_note``) are
    included only when non-blank; ``subscribed`` is always present and
    defaults to ``True`` when the user has no ``ghost_subscribed`` attribute.
    """
    body: dict = {}

    address = _to_str_or_none(getattr(u, "email", None))
    if address:
        body["email"] = address.lower()

    for attr, key in (("name", "name"), ("ghost_note", "note")):
        text = _to_str_or_none(getattr(u, attr, None))
        if text:
            body[key] = text

    body["subscribed"] = bool(getattr(u, "ghost_subscribed", True))
    return body
async def _newsletters_for_user(sess: AsyncSession, user_id: int) -> list[dict]:
    """List the user's newsletters as payload entries.

    Each entry is ``{"subscribed": bool, "id": ...}`` when the newsletter has
    a Ghost id, otherwise ``{"subscribed": bool, "name": ...}``. Rows with
    neither are skipped, and duplicates (same id, or same name ignoring case)
    are emitted once.
    """
    result = await sess.execute(
        select(GhostNewsletter.ghost_id, UserNewsletter.subscribed, GhostNewsletter.name)
        .join(UserNewsletter, UserNewsletter.newsletter_id == GhostNewsletter.id)
        .where(UserNewsletter.user_id == user_id)
    )

    emitted = set()
    entries: list[dict] = []
    for ghost_id, subscribed, title in result.all():
        ghost_id = (ghost_id or "").strip()
        title = (title or "").strip()

        if ghost_id:
            dedupe_key = ("id", ghost_id)
            field, value = "id", ghost_id
        elif title:
            dedupe_key = ("name", title.lower())
            field, value = "name", title
        else:
            continue

        if dedupe_key in emitted:
            continue
        emitted.add(dedupe_key)
        entries.append({"subscribed": bool(subscribed), field: value})

    return entries
async def _labels_for_user(sess: AsyncSession, user_id: int) -> list[dict]:
    """List the user's labels as payload entries.

    Each entry is ``{"id": ...}`` when the label has a Ghost id, otherwise
    ``{"name": ...}``. Rows with neither are skipped, and duplicates (same id,
    or same name ignoring case) are emitted once.
    """
    result = await sess.execute(
        select(GhostLabel.ghost_id, GhostLabel.name)
        .join(UserLabel, UserLabel.label_id == GhostLabel.id)
        .where(UserLabel.user_id == user_id)
    )

    emitted = set()
    entries: list[dict] = []
    for ghost_id, title in result.all():
        ghost_id = (ghost_id or "").strip()
        title = (title or "").strip()

        if ghost_id:
            dedupe_key = ("id", ghost_id)
            entry = {"id": ghost_id}
        elif title:
            dedupe_key = ("name", title.lower())
            entry = {"name": title}
        else:
            continue

        if dedupe_key in emitted:
            continue
        emitted.add(dedupe_key)
        entries.append(entry)

    return entries
async def _ghost_find_member_by_email(email: str) -> Optional[dict]:
    """Look up a Ghost member by email via the Admin API.

    Returns the first member in the response, or ``None`` when ``email`` is
    empty or no member is returned. Raises ``httpx.HTTPStatusError`` on a
    non-success response.
    """
    if not email:
        return None

    # Send the address as a single-quoted NQL string rather than a bare
    # literal: characters such as "+" (plus-addressing) are NQL operators and
    # would otherwise break or change the filter. Backslashes and single
    # quotes inside the string are backslash-escaped.
    literal = email.replace("\\", "\\\\").replace("'", "\\'")
    nql = f"email:'{literal}'"

    async with httpx.AsyncClient(timeout=30) as client:
        resp = await client.get(
            f"{GHOST_ADMIN_API_URL}/members/?filter={quote(nql, safe='')}&limit=1",
            headers=_auth_header(),
        )
        resp.raise_for_status()
        members = resp.json().get("members") or []
    return members[0] if members else None
async def _ghost_upsert_member(payload: dict, ghost_id: str | None = None) -> dict:
    """Create or update a Ghost member and return the member dict from the response.

    The payload is reduced to the supported keys and sanitised first. With a
    ``ghost_id`` the member is updated via PUT; without one it is sent to the
    upsert POST endpoint. Every request is attempted up to three times while
    the response is 5xx, sleeping 0.5s and then 1s between attempts.

    Recovery paths:

    * PUT answering 404: look the member up by email and PUT to that id, or
      POST a new member when none is found.
    * 422 whose body mentions ``note``, ``name`` or ``labels.name``: retry
      once with those fields removed.
    * Upsert POST still 422 with an "already exists ... email address"
      message: look the member up by email and fall back to PUT.

    Returns ``{}`` when the response carries no member. Raises
    ``httpx.HTTPStatusError`` for unrecovered error responses.
    """
    allowed_keys = ("email", "name", "note", "subscribed", "labels", "newsletters", "id")
    member = _sanitize_member_payload(
        {key: value for key, value in payload.items() if key in allowed_keys}
    )

    def _first_member(resp: httpx.Response) -> dict:
        # First entry of the "members" array, or {} when absent/empty.
        return (resp.json().get("members") or [None])[0] or {}

    def _drop_rejected_fields(error_body: str, body: dict) -> tuple[dict, bool]:
        # Copy of `body` minus the fields the 422 message complains about,
        # plus a flag telling whether anything was removed.
        text = (error_body or "").lower()
        trimmed = dict(body)
        removed = False
        if '"note"' in text or "for note" in text:
            trimmed.pop("note", None)
            removed = True
        if '"name"' in text or "for name" in text:
            trimmed.pop("name", None)
            removed = True
        if "labels.name" in text:
            trimmed.pop("labels", None)
            removed = True
        return trimmed, removed

    async def _send(client: httpx.AsyncClient, method: str, url: str, json: dict) -> httpx.Response:
        # Up to three attempts; only 5xx responses trigger another try.
        backoff = 0.5
        attempts = 0
        while True:
            resp = await client.request(method, url, headers=_auth_header(), json=json)
            attempts += 1
            if resp.status_code < 500 or attempts == 3:
                return resp
            await asyncio.sleep(backoff)
            backoff *= 2

    async with httpx.AsyncClient(timeout=30) as client:

        async def _update_by_email_or_create(body: dict) -> dict:
            # Used when the targeted id is unknown to Ghost (404).
            existing = await _ghost_find_member_by_email(body.get("email", ""))
            if existing and existing.get("id"):
                resp = await _send(
                    client, "PUT",
                    f"{GHOST_ADMIN_API_URL}/members/{existing['id']}/",
                    {"members": [body]},
                )
            else:
                resp = await _send(
                    client, "POST",
                    f"{GHOST_ADMIN_API_URL}/members/",
                    {"members": [body]},
                )
            resp.raise_for_status()
            return _first_member(resp)

        async def _update(member_id: str, body: dict) -> dict:
            first = await _send(
                client, "PUT",
                f"{GHOST_ADMIN_API_URL}/members/{member_id}/",
                {"members": [body]},
            )
            if first.status_code == 404:
                return await _update_by_email_or_create(body)
            if first.status_code == 422:
                trimmed, removed = _drop_rejected_fields(first.text, body)
                if removed:
                    second = await _send(
                        client, "PUT",
                        f"{GHOST_ADMIN_API_URL}/members/{member_id}/",
                        {"members": [trimmed]},
                    )
                    if second.status_code == 404:
                        return await _update_by_email_or_create(trimmed)
                    second.raise_for_status()
                    return _first_member(second)
            first.raise_for_status()
            return _first_member(first)

        async def _create_or_update(body: dict) -> dict:
            first = await _send(
                client, "POST",
                f"{GHOST_ADMIN_API_URL}/members/?upsert=true",
                {"members": [body]},
            )
            if first.status_code != 422:
                first.raise_for_status()
                return _first_member(first)

            error_text = (first.text or "").lower()
            trimmed, removed = _drop_rejected_fields(first.text, body)
            if removed:
                second = await _send(
                    client, "POST",
                    f"{GHOST_ADMIN_API_URL}/members/?upsert=true",
                    {"members": [trimmed]},
                )
                if second.status_code != 422:
                    second.raise_for_status()
                    return _first_member(second)
                error_text = (second.text or "").lower()

            if "already exists" in error_text and "email address" in error_text:
                existing = await _ghost_find_member_by_email(body.get("email", ""))
                if existing and existing.get("id"):
                    # The PUT fallback is given the full body, not the trimmed one.
                    return await _update(existing["id"], body)

            # The error is reported against the first response.
            raise httpx.HTTPStatusError(
                "Validation error, cannot edit member.",
                request=first.request,
                response=first,
            )

        if ghost_id:
            return await _update(ghost_id, member)
        return await _create_or_update(member)
async def sync_member_to_ghost(sess: AsyncSession, user_id: int) -> Optional[str]:
    """Push one local user to Ghost and record the resulting Ghost id.

    The payload consists of the user's base fields plus their labels (when
    any). When Ghost returns a member, ``user.ghost_id`` is updated if it
    differs, ``user.ghost_raw`` is replaced with the returned member, and the
    session is flushed.

    Returns the user's Ghost id, or ``None`` when no user has ``user_id``.
    """
    lookup = await sess.execute(select(User).where(User.id == user_id))
    user = lookup.scalar_one_or_none()
    if not user:
        return None

    body = _ghost_member_payload_base(user)
    user_labels = await _labels_for_user(sess, user.id)
    if user_labels:
        body["labels"] = user_labels

    remote = await _ghost_upsert_member(body, ghost_id=user.ghost_id)
    if not remote:
        return user.ghost_id

    remote_id = remote.get("id")
    if remote_id and user.ghost_id != remote_id:
        user.ghost_id = remote_id
    user.ghost_raw = dict(remote)
    flag_modified(user, "ghost_raw")
    await sess.flush()
    return user.ghost_id or remote_id
async def sync_members_to_ghost(
    sess: AsyncSession,
    changed_since: Optional[datetime] = None,
    limit: Optional[int] = None,
) -> int:
    """Push a batch of local users to Ghost.

    With ``changed_since``, only users created or last logged in at or after
    that time are selected; a truthy ``limit`` caps the number of users.
    Failures for individual users are printed and skipped.

    Returns the number of users pushed without error.
    """
    query = select(User.id)
    if changed_since:
        created_recently = User.created_at >= changed_since
        logged_in_recently = and_(
            User.last_login_at.is_not(None),
            User.last_login_at >= changed_since,
        )
        query = query.where(or_(created_recently, logged_in_recently))
    if limit:
        query = query.limit(limit)

    result = await sess.execute(query)
    user_ids = [row[0] for row in result.all()]

    succeeded = 0
    for uid in user_ids:
        try:
            await sync_member_to_ghost(sess, uid)
        except httpx.HTTPStatusError as e:
            print(f"[ghost sync] failed upsert for user {uid}: {e.response.status_code} {e.response.text}")
        except Exception as e:
            print(f"[ghost sync] failed upsert for user {uid}: {e}")
        else:
            succeeded += 1
    return succeeded
# =====================================================
# Membership fetch/sync (Ghost -> DB) bulk + single
# =====================================================
async def fetch_all_members_from_ghost() -> list[dict[str, Any]]:
    """Fetch every member from the Ghost Admin API, following pagination.

    Members are requested with labels, subscriptions, tiers and newsletters
    included, 100 per page, and pages are followed via
    ``meta.pagination.next`` until there is no next page. Explicit paging is
    used instead of ``limit=all`` because Ghost 6 reportedly no longer
    honours ``limit=all`` and caps the page size at 100 (confirm against the
    deployed Ghost version), which would silently truncate a bulk sync.

    Raises ``httpx.HTTPStatusError`` on a non-success response.
    """
    members: list[dict[str, Any]] = []
    page = 1
    async with httpx.AsyncClient(timeout=60) as client:
        while True:
            resp = await client.get(
                f"{GHOST_ADMIN_API_URL}/members/"
                f"?include=labels,subscriptions,tiers,newsletters&limit=100&page={page}",
                # Header is rebuilt per request, so each page gets its own token.
                headers=_auth_header(),
            )
            resp.raise_for_status()
            data = resp.json()

            batch = data.get("members") or []
            members.extend(batch)

            pagination = (data.get("meta") or {}).get("pagination") or {}
            next_page = pagination.get("next")
            # Stop on the last page; also stop defensively on an empty page or
            # a "next" that does not advance, so a misbehaving server cannot
            # cause an endless loop.
            if not batch or not isinstance(next_page, int) or next_page <= page:
                break
            page = next_page
    return members
async def sync_all_membership_from_ghost(sess: AsyncSession) -> None:
    """Bulk sync: pull every Ghost member and upsert it into the local DB.

    Labels, tiers and newsletters referenced by any member are first
    collected by Ghost id (later occurrences replace earlier ones) and
    upserted once each, in that order. Each member is then matched to or
    created as a local user and has its membership data applied.
    """
    members = await fetch_all_members_from_ghost()

    labels_by_id: Dict[str, dict[str, Any]] = {}
    tiers_by_id: Dict[str, dict[str, Any]] = {}
    newsletters_by_id: Dict[str, dict[str, Any]] = {}

    for member in members:
        for label in member.get("labels") or []:
            labels_by_id[label["id"]] = label
        for newsletter in member.get("newsletters") or []:
            newsletters_by_id[newsletter["id"]] = newsletter
        for subscription in member.get("subscriptions") or []:
            tier = subscription.get("tier")
            # Only tiers that are dicts with an id are collected.
            if isinstance(tier, dict) and tier.get("id"):
                tiers_by_id[tier["id"]] = tier

    upsert_plan = (
        (labels_by_id, _upsert_label),
        (tiers_by_id, _upsert_tier),
        (newsletters_by_id, _upsert_newsletter),
    )
    for records, upsert in upsert_plan:
        for record in records.values():
            await upsert(sess, record)

    for member in members:
        local_user = await _find_or_create_user_by_ghost_or_email(sess, member)
        await _apply_user_membership(sess, local_user, member)
async def fetch_single_member_from_ghost(ghost_id: str) -> Optional[dict[str, Any]]:
    """Fetch one member (with labels, newsletters, subscriptions, tiers) from Ghost.

    Returns ``None`` on a 404 or when the response holds no member. The
    member is read from the ``members`` key, falling back to ``member``, and
    may arrive either as a dict or as a list whose first element is used.
    Raises ``httpx.HTTPStatusError`` for other error responses.
    """
    url = f"{GHOST_ADMIN_API_URL}/members/{ghost_id}/?include=labels,newsletters,subscriptions,tiers"
    async with httpx.AsyncClient(timeout=30) as client:
        resp = await client.get(url, headers=_auth_header())
        if resp.status_code == 404:
            return None
        resp.raise_for_status()
        body = resp.json()

    found = body.get("members") or body.get("member") or []
    if isinstance(found, dict):
        return found
    if found:
        return found[0]
    return None
async def sync_single_member(sess: AsyncSession, ghost_id: str) -> None:
    """Sync one Ghost member into the local DB; does nothing if Ghost has no such member.

    The member's labels, newsletters and (dict-valued) subscription tiers are
    upserted first, then the member is matched to or created as a local user
    and its membership data applied.
    """
    member = await fetch_single_member_from_ghost(ghost_id)
    if member is None:
        return

    for label in member.get("labels") or []:
        await _upsert_label(sess, label)

    for newsletter in member.get("newsletters") or []:
        await _upsert_newsletter(sess, newsletter)

    for subscription in member.get("subscriptions") or []:
        tier = subscription.get("tier")
        if isinstance(tier, dict):
            await _upsert_tier(sess, tier)

    local_user = await _find_or_create_user_by_ghost_or_email(sess, member)
    await _apply_user_membership(sess, local_user, member)