This repository has been archived on 2026-02-24. You can view files and clone it. You cannot open issues or pull requests or push a commit.
Files
blog/bp/blog/ghost/ghost_sync.py
giles 49e7739853
All checks were successful
Build and Deploy / build-and-deploy (push) Successful in 1m6s
Bold post titles in federated AP content
2026-02-23 21:40:04 +00:00

1241 lines
44 KiB
Python

from __future__ import annotations
import os
import re
import asyncio
from datetime import datetime
from html import escape as html_escape
from typing import Dict, Any, Optional
import httpx
from sqlalchemy import select, delete, or_, and_
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm.attributes import flag_modified # for non-Mutable JSON columns
# Content models
from models.ghost_content import (
Post, Author, Tag, PostAuthor, PostTag
)
from shared.models.page_config import PageConfig
# User-centric membership models
from shared.models import User
from shared.models.ghost_membership_entities import (
GhostLabel, UserLabel,
GhostNewsletter, UserNewsletter,
GhostTier, GhostSubscription,
)
from .ghost_admin_token import make_ghost_admin_jwt
from urllib.parse import quote
GHOST_ADMIN_API_URL = os.environ["GHOST_ADMIN_API_URL"]
from shared.browser.app.utils import (
utcnow
)
def _auth_header() -> dict[str, str]:
    """Build the Authorization header for Ghost Admin API requests."""
    token = make_ghost_admin_jwt()
    return {"Authorization": "Ghost " + token}
def _iso(val: str | None) -> datetime | None:
if not val:
return None
return datetime.fromisoformat(val.replace("Z", "+00:00"))
def _to_str_or_none(v) -> Optional[str]:
"""Return a trimmed string if v is safely stringifiable; else None."""
if v is None:
return None
# Disallow complex types that would stringify to JSON-like noise
if isinstance(v, (dict, list, set, tuple, bytes, bytearray)):
return None
s = str(v).strip()
return s or None
def _sanitize_member_payload(payload: dict) -> dict:
"""Coerce types Ghost expects and drop empties to avoid 422/500 quirks."""
out: dict = {}
# email -> lowercase string
email = _to_str_or_none(payload.get("email"))
if email:
out["email"] = email.lower()
# name / note must be strings if present
name = _to_str_or_none(payload.get("name"))
if name is not None:
out["name"] = name
note = _to_str_or_none(payload.get("note"))
if note is not None:
out["note"] = note
# subscribed -> bool
if "subscribed" in payload:
out["subscribed"] = bool(payload.get("subscribed"))
# labels: keep only rows that have a non-empty id OR name
labels = []
for item in payload.get("labels") or []:
gid = _to_str_or_none(item.get("id"))
gname = _to_str_or_none(item.get("name"))
if gid:
labels.append({"id": gid})
elif gname: # only include if non-empty
labels.append({"name": gname})
if labels:
out["labels"] = labels
# newsletters: keep only rows with id OR name; coerce subscribed -> bool
newsletters = []
for item in payload.get("newsletters") or []:
gid = _to_str_or_none(item.get("id"))
gname = _to_str_or_none(item.get("name"))
row = {"subscribed": bool(item.get("subscribed", True))}
if gid:
row["id"] = gid
newsletters.append(row)
elif gname:
row["name"] = gname
newsletters.append(row)
if newsletters:
out["newsletters"] = newsletters
# id (if we carry a known ghost_id)
gid = _to_str_or_none(payload.get("id"))
if gid:
out["id"] = gid
return out
# =====================
# CONTENT UPSERT HELPERS
# =====================
async def _upsert_author(sess: AsyncSession, ga: Dict[str, Any]) -> Author:
    """Insert or update the local Author row matching a Ghost author payload.

    Looks the row up by ``ghost_id``; creates it if absent, then overwrites
    fields from the payload.  Flushes so the caller can read ``obj.id``.
    """
    res = await sess.execute(select(Author).where(Author.ghost_id == ga["id"]))
    obj = res.scalar_one_or_none()
    if obj is None:
        obj = Author(ghost_id=ga["id"])
        sess.add(obj)
    # revive if soft-deleted
    obj.deleted_at = None
    # `or obj.<field>` keeps the existing local value when Ghost sends an empty one
    obj.slug = ga.get("slug") or obj.slug
    obj.name = ga.get("name") or obj.name
    obj.email = ga.get("email") or obj.email
    # nullable profile fields mirror Ghost verbatim (absent values clear local copies)
    obj.profile_image = ga.get("profile_image")
    obj.cover_image = ga.get("cover_image")
    obj.bio = ga.get("bio")
    obj.website = ga.get("website")
    obj.location = ga.get("location")
    obj.facebook = ga.get("facebook")
    obj.twitter = ga.get("twitter")
    # prefer Ghost timestamps; fall back to existing values, then "now"
    obj.created_at = _iso(ga.get("created_at")) or obj.created_at or utcnow()
    obj.updated_at = _iso(ga.get("updated_at")) or utcnow()
    await sess.flush()
    return obj
async def _upsert_tag(sess: AsyncSession, gt: Dict[str, Any]) -> Tag:
    """Insert or update the local Tag row matching a Ghost tag payload.

    Same contract as ``_upsert_author``: lookup by ``ghost_id``, create if
    missing, overwrite fields, flush so ``obj.id`` is usable by the caller.
    """
    res = await sess.execute(select(Tag).where(Tag.ghost_id == gt["id"]))
    obj = res.scalar_one_or_none()
    if obj is None:
        obj = Tag(ghost_id=gt["id"])
        sess.add(obj)
    obj.deleted_at = None  # revive if soft-deleted
    # keep existing slug/name/visibility when Ghost sends an empty value
    obj.slug = gt.get("slug") or obj.slug
    obj.name = gt.get("name") or obj.name
    obj.description = gt.get("description")
    obj.visibility = gt.get("visibility") or obj.visibility
    obj.feature_image = gt.get("feature_image")
    obj.meta_title = gt.get("meta_title")
    obj.meta_description = gt.get("meta_description")
    # prefer Ghost timestamps; fall back to existing values, then "now"
    obj.created_at = _iso(gt.get("created_at")) or obj.created_at or utcnow()
    obj.updated_at = _iso(gt.get("updated_at")) or utcnow()
    await sess.flush()
    return obj
def _apply_ghost_fields(obj: Post, gp: Dict[str, Any], author_map: Dict[str, Author], tag_map: Dict[str, Tag]) -> None:
    """Apply Ghost API fields to a Post ORM object.

    Mutates ``obj`` in place; does not flush.  ``author_map``/``tag_map`` map
    raw Ghost ids to already-upserted ORM rows (see the callers, which key
    them with ``a["id"]`` / ``t["id"]`` unmodified).

    Fix: the primary-author lookup previously did
    ``author_map[pa["id"].strip()]`` with no membership guard (KeyError when
    the primary author was missing from the map), and the primary-tag line
    checked membership with the raw id but indexed with the stripped id.
    Both lookups now use the raw id and are guarded, matching how the maps
    are keyed everywhere else.
    """
    obj.deleted_at = None  # revive if soft-deleted
    # identity-ish fields: keep the existing value when Ghost sends an empty one
    obj.uuid = gp.get("uuid") or obj.uuid
    obj.slug = gp.get("slug") or obj.slug
    obj.title = gp.get("title") or obj.title
    # content formats mirror Ghost verbatim (absent formats clear local copies)
    obj.html = gp.get("html")
    obj.plaintext = gp.get("plaintext")
    obj.mobiledoc = gp.get("mobiledoc")
    obj.lexical = gp.get("lexical")
    obj.feature_image = gp.get("feature_image")
    obj.feature_image_alt = gp.get("feature_image_alt")
    obj.feature_image_caption = gp.get("feature_image_caption")
    obj.excerpt = gp.get("excerpt")
    obj.custom_excerpt = gp.get("custom_excerpt")
    obj.visibility = gp.get("visibility") or obj.visibility
    obj.status = gp.get("status") or obj.status
    obj.featured = bool(gp.get("featured") or False)
    obj.is_page = bool(gp.get("page") or False)
    obj.email_only = bool(gp.get("email_only") or False)
    obj.canonical_url = gp.get("canonical_url")
    obj.meta_title = gp.get("meta_title")
    obj.meta_description = gp.get("meta_description")
    obj.og_image = gp.get("og_image")
    obj.og_title = gp.get("og_title")
    obj.og_description = gp.get("og_description")
    obj.twitter_image = gp.get("twitter_image")
    obj.twitter_title = gp.get("twitter_title")
    obj.twitter_description = gp.get("twitter_description")
    obj.custom_template = gp.get("custom_template")
    obj.reading_time = gp.get("reading_time")
    obj.comment_id = gp.get("comment_id")
    obj.published_at = _iso(gp.get("published_at"))
    obj.updated_at = _iso(gp.get("updated_at")) or obj.updated_at or utcnow()
    obj.created_at = _iso(gp.get("created_at")) or obj.created_at or utcnow()
    # primary author/tag: raw-id lookup, guarded against absence from the maps
    pa = gp.get("primary_author")
    obj.primary_author_id = author_map[pa["id"]].id if (pa and pa["id"] in author_map) else None  # type: ignore[index]
    pt = gp.get("primary_tag")
    obj.primary_tag_id = tag_map[pt["id"]].id if (pt and pt["id"] in tag_map) else None  # type: ignore[index]
async def _upsert_post(sess: AsyncSession, gp: Dict[str, Any], author_map: Dict[str, Author], tag_map: Dict[str, Tag]) -> tuple[Post, str | None]:
    """Upsert a post. Returns (post, old_status) where old_status is None for new rows.

    old_status is captured before the update so callers can detect
    publish/unpublish transitions (see sync_single_post).
    """
    from sqlalchemy.exc import IntegrityError
    res = await sess.execute(select(Post).where(Post.ghost_id == gp["id"]))
    obj = res.scalar_one_or_none()
    old_status = obj.status if obj is not None else None
    if obj is not None:
        # Row exists — just update
        _apply_ghost_fields(obj, gp, author_map, tag_map)
        await sess.flush()
    else:
        # Row doesn't exist — try to insert within a savepoint
        obj = Post(ghost_id=gp["id"])  # type: ignore[call-arg]
        try:
            async with sess.begin_nested():
                sess.add(obj)
                _apply_ghost_fields(obj, gp, author_map, tag_map)
                await sess.flush()
        except IntegrityError:
            # Race condition: another request inserted this ghost_id.
            # Savepoint rolled back; re-select and update.
            res = await sess.execute(select(Post).where(Post.ghost_id == gp["id"]))
            obj = res.scalar_one()
            _apply_ghost_fields(obj, gp, author_map, tag_map)
            await sess.flush()
    # Backfill user_id from primary author email if not already set
    if obj.user_id is None and obj.primary_author_id is not None:
        pa_obj = author_map.get(gp.get("primary_author", {}).get("id", ""))
        if pa_obj and pa_obj.email:
            # case-insensitive email match against local users
            user_res = await sess.execute(
                select(User).where(User.email.ilike(pa_obj.email))
            )
            matched_user = user_res.scalar_one_or_none()
            if matched_user:
                obj.user_id = matched_user.id
                await sess.flush()
    # rebuild post_authors (delete-then-insert keeps sort_order authoritative)
    await sess.execute(delete(PostAuthor).where(PostAuthor.post_id == obj.id))
    for idx, a in enumerate(gp.get("authors") or []):
        aa = author_map[a["id"]]
        sess.add(PostAuthor(post_id=obj.id, author_id=aa.id, sort_order=idx))
    # rebuild post_tags
    await sess.execute(delete(PostTag).where(PostTag.post_id == obj.id))
    for idx, t in enumerate(gp.get("tags") or []):
        tt = tag_map[t["id"]]
        sess.add(PostTag(post_id=obj.id, tag_id=tt.id, sort_order=idx))
    # Auto-create PageConfig for pages
    if obj.is_page:
        existing_pc = (await sess.execute(
            select(PageConfig).where(PageConfig.container_type == "page", PageConfig.container_id == obj.id)
        )).scalar_one_or_none()
        if existing_pc is None:
            sess.add(PageConfig(container_type="page", container_id=obj.id, features={}))
            await sess.flush()
    return obj, old_status
async def _ghost_find_member_by_email(email: str) -> Optional[dict]:
    """Return first Ghost member with this email, or None.

    NOTE(review): this helper is redefined twice further down in this module;
    only the last definition is in effect at import time.  The three copies
    should be consolidated.
    """
    if not email:
        return None
    async with httpx.AsyncClient(timeout=30) as client:
        resp = await client.get(
            f"{GHOST_ADMIN_API_URL}/members/?filter=email:{quote(email)}&limit=1",
            headers=_auth_header(),
        )
        resp.raise_for_status()
        members = resp.json().get("members") or []
        return members[0] if members else None
# --- add this helper next to fetch_all_posts_from_ghost() ---
async def _fetch_all_from_ghost(endpoint: str) -> list[dict[str, Any]]:
    """Fetch every item from a Ghost Admin content endpoint ("posts" or "pages").

    Requests all content formats plus authors/tags includes in a single
    unpaginated call (limit=all).
    """
    async with httpx.AsyncClient(timeout=30) as client:
        resp = await client.get(
            f"{GHOST_ADMIN_API_URL}/{endpoint}/?include=authors,tags&limit=all&formats=html,plaintext,mobiledoc,lexical",
            headers=_auth_header(),
        )
        resp.raise_for_status()
        # admin posts endpoint returns {"posts": [...]}, pages returns {"pages": [...]}
        key = "posts" if endpoint == "posts" else "pages"
        return resp.json().get(key, [])
async def fetch_all_posts_and_pages_from_ghost() -> list[dict[str, Any]]:
    """Fetch posts and pages concurrently and merge them into a single list."""
    results = await asyncio.gather(
        _fetch_all_from_ghost("posts"),
        _fetch_all_from_ghost("pages"),
    )
    posts, pages = results
    # Be explicit: ensure the page flag exists on pages (Ghost typically
    # includes "page": true) so downstream upserts can rely on it.
    for page in pages:
        page["page"] = True
    return posts + pages
async def sync_all_content_from_ghost(sess: AsyncSession) -> None:
    """Full Ghost -> DB content sync: upsert everything seen, soft-delete the rest."""
    #data = await fetch_all_posts_from_ghost()
    data = await fetch_all_posts_and_pages_from_ghost()
    # Use a transaction so all upserts/soft-deletes commit together
    # buckets of authors/tags we saw in Ghost (keyed by raw ghost id)
    author_bucket: Dict[str, dict[str, Any]] = {}
    tag_bucket: Dict[str, dict[str, Any]] = {}
    for p in data:
        for a in p.get("authors") or []:
            author_bucket[a["id"]] = a
        if p.get("primary_author"):
            author_bucket[p["primary_author"]["id"]] = p["primary_author"]
        for t in p.get("tags") or []:
            tag_bucket[t["id"]] = t
        if p.get("primary_tag"):
            tag_bucket[p["primary_tag"]["id"]] = p["primary_tag"]
    # sets of ghost_ids we've seen in Ghost RIGHT NOW
    seen_post_ids = {p["id"] for p in data}
    seen_author_ids = set(author_bucket.keys())
    seen_tag_ids = set(tag_bucket.keys())
    # upsert authors first so posts can resolve primary_author_id
    author_map: Dict[str, Author] = {}
    for ga in author_bucket.values():
        a = await _upsert_author(sess, ga)
        author_map[ga["id"]] = a
    # upsert tags
    tag_map: Dict[str, Tag] = {}
    for gt in tag_bucket.values():
        t = await _upsert_tag(sess, gt)
        tag_map[gt["id"]] = t
    # upsert posts (including M2M)
    for gp in data:
        await _upsert_post(sess, gp, author_map, tag_map)
    # soft-delete anything that no longer exists in Ghost
    now = utcnow()
    # Authors not seen -> mark deleted_at if not already
    db_authors = await sess.execute(select(Author))
    for local_author in db_authors.scalars():
        if local_author.ghost_id not in seen_author_ids:
            if local_author.deleted_at is None:
                local_author.deleted_at = now
    # Tags not seen -> mark deleted_at
    db_tags = await sess.execute(select(Tag))
    for local_tag in db_tags.scalars():
        if local_tag.ghost_id not in seen_tag_ids:
            if local_tag.deleted_at is None:
                local_tag.deleted_at = now
    # Posts not seen -> mark deleted_at
    db_posts = await sess.execute(select(Post))
    for local_post in db_posts.scalars():
        if local_post.ghost_id not in seen_post_ids:
            if local_post.deleted_at is None:
                local_post.deleted_at = now
    # transaction auto-commits here
#=====================================================
# MEMBERSHIP SYNC (USER-CENTRIC) Ghost -> DB
#=====================================================
def _member_email(m: dict[str, Any]) -> Optional[str]:
email = (m.get("email") or "").strip().lower() or None
return email
# ---- small upsert helpers for related entities ----
async def _upsert_label(sess: AsyncSession, data: dict) -> GhostLabel:
    """Insert or update a GhostLabel catalog row by ghost_id; flush for .id."""
    res = await sess.execute(select(GhostLabel).where(GhostLabel.ghost_id == data["id"]))
    obj = res.scalar_one_or_none()
    if not obj:
        obj = GhostLabel(ghost_id=data["id"])
        sess.add(obj)
    # keep existing values when the payload field is empty
    obj.name = data.get("name") or obj.name
    obj.slug = data.get("slug") or obj.slug
    await sess.flush()
    return obj
async def _upsert_newsletter(sess: AsyncSession, data: dict) -> GhostNewsletter:
    """Insert or update a GhostNewsletter catalog row by ghost_id; flush for .id."""
    res = await sess.execute(select(GhostNewsletter).where(GhostNewsletter.ghost_id == data["id"]))
    obj = res.scalar_one_or_none()
    if not obj:
        obj = GhostNewsletter(ghost_id=data["id"])
        sess.add(obj)
    # keep existing values when the payload field is empty
    obj.name = data.get("name") or obj.name
    obj.slug = data.get("slug") or obj.slug
    obj.description = data.get("description") or obj.description
    await sess.flush()
    return obj
async def _upsert_tier(sess: AsyncSession, data: dict) -> GhostTier:
    """Insert or update a GhostTier catalog row by ghost_id; flush for .id."""
    res = await sess.execute(select(GhostTier).where(GhostTier.ghost_id == data["id"]))
    obj = res.scalar_one_or_none()
    if not obj:
        obj = GhostTier(ghost_id=data["id"])
        sess.add(obj)
    # keep existing values when the payload field is empty
    obj.name = data.get("name") or obj.name
    obj.slug = data.get("slug") or obj.slug
    obj.type = data.get("type") or obj.type
    obj.visibility = data.get("visibility") or obj.visibility
    await sess.flush()
    return obj
def _price_cents(sd: dict) -> Optional[int]:
try:
return int((sd.get("price") or {}).get("amount"))
except Exception:
return None
# ---- application of member payload onto User + related tables ----
async def _find_or_create_user_by_ghost_or_email(sess: AsyncSession, data: dict) -> User:
    """Resolve a Ghost member payload to a local User, creating one if needed.

    Lookup order: ghost_id first, then case-insensitive email (backfilling
    ghost_id on the matched row).  Falls back to creating a new user with a
    placeholder email when the payload carries none.
    """
    ghost_id = data.get("id")
    email = _member_email(data)
    if ghost_id:
        res = await sess.execute(select(User).where(User.ghost_id == ghost_id))
        u = res.scalar_one_or_none()
        if u:
            return u
    if email:
        res = await sess.execute(select(User).where(User.email.ilike(email)))
        u = res.scalar_one_or_none()
        if u:
            if ghost_id and not u.ghost_id:
                u.ghost_id = ghost_id
            return u
    # create a new user (Ghost is source of truth for member list)
    u = User(email=email or f"_ghost_{ghost_id}@invalid.local")
    if ghost_id:
        u.ghost_id = ghost_id
    sess.add(u)
    await sess.flush()
    return u
async def _apply_user_membership(sess: AsyncSession, user: User, m: dict) -> User:
    """Apply Ghost member payload to local User WITHOUT touching relationship collections directly.
    We mutate join tables explicitly to avoid lazy-loads (which cause MissingGreenlet in async).
    """
    sess.add(user)
    # scalar fields: empty payload values keep the existing local values
    user.name = m.get("name") or user.name
    user.ghost_status = m.get("status") or user.ghost_status
    user.ghost_subscribed = bool(m.get("subscribed", True))
    user.ghost_note = m.get("note") or user.ghost_note
    user.avatar_image = m.get("avatar_image") or user.avatar_image
    # the stripe customer id arrives under several shapes depending on payload source
    user.stripe_customer_id = (
        (m.get("stripe") or {}).get("customer_id")
        or (m.get("customer") or {}).get("id")
        or m.get("stripe_customer_id")
        or user.stripe_customer_id
    )
    # keep the raw payload; flag_modified forces the non-Mutable JSON column to persist
    user.ghost_raw = dict(m)
    flag_modified(user, "ghost_raw")
    await sess.flush()  # ensure user.id exists
    # Labels join: upsert catalog rows, then rebuild the join table wholesale
    label_ids: list[int] = []
    for ld in m.get("labels") or []:
        lbl = await _upsert_label(sess, ld)
        label_ids.append(lbl.id)
    await sess.execute(delete(UserLabel).where(UserLabel.user_id == user.id))
    for lid in label_ids:
        sess.add(UserLabel(user_id=user.id, label_id=lid))
    await sess.flush()
    # Newsletters join with subscribed flag (same delete-then-insert pattern)
    nl_rows: list[tuple[int, bool]] = []
    for nd in m.get("newsletters") or []:
        nl = await _upsert_newsletter(sess, nd)
        nl_rows.append((nl.id, bool(nd.get("subscribed", True))))
    await sess.execute(delete(UserNewsletter).where(UserNewsletter.user_id == user.id))
    for nl_id, subbed in nl_rows:
        sess.add(UserNewsletter(user_id=user.id, newsletter_id=nl_id, subscribed=subbed))
    await sess.flush()
    # Subscriptions: upsert by ghost subscription id (rows are never deleted here)
    for sd in m.get("subscriptions") or []:
        sid = sd.get("id")
        if not sid:
            continue
        tier_id: Optional[int] = None
        if sd.get("tier"):
            tier = await _upsert_tier(sess, sd["tier"])
            await sess.flush()
            tier_id = tier.id
        res = await sess.execute(select(GhostSubscription).where(GhostSubscription.ghost_id == sid))
        sub = res.scalar_one_or_none()
        if not sub:
            sub = GhostSubscription(ghost_id=sid, user_id=user.id)
            sess.add(sub)
        sub.user_id = user.id
        sub.status = sd.get("status") or sub.status
        sub.cadence = (sd.get("plan") or {}).get("interval") or sd.get("cadence") or sub.cadence
        sub.price_amount = _price_cents(sd)
        sub.price_currency = (sd.get("price") or {}).get("currency") or sub.price_currency
        # stripe ids: tolerate both nesting shapes Ghost may use
        sub.stripe_customer_id = (
            (sd.get("customer") or {}).get("id")
            or (sd.get("stripe") or {}).get("customer_id")
            or sub.stripe_customer_id
        )
        sub.stripe_subscription_id = (
            sd.get("stripe_subscription_id")
            or (sd.get("stripe") or {}).get("subscription_id")
            or sub.stripe_subscription_id
        )
        if tier_id is not None:
            sub.tier_id = tier_id
        sub.raw = dict(sd)
        flag_modified(sub, "raw")
    await sess.flush()
    return user
# =====================================================
# PUSH MEMBERS FROM LOCAL DB -> GHOST (DB -> Ghost)
# =====================================================
def _ghost_member_payload_base(u: User) -> dict:
    """Compose writable Ghost member fields from local User, validating types."""
    payload: dict = {}
    email = _to_str_or_none(getattr(u, "email", None))
    if email:
        payload["email"] = email.lower()
    # name/note: include only when they trim to a non-empty string
    for attr_name, key in (("name", "name"), ("ghost_note", "note")):
        text = _to_str_or_none(getattr(u, attr_name, None))
        if text:
            payload[key] = text
    # Ghost expects a boolean; a missing attribute defaults to True,
    # while an explicit None coerces to False via bool()
    payload["subscribed"] = bool(getattr(u, "ghost_subscribed", True))
    return payload
async def _newsletters_for_user(sess: AsyncSession, user_id: int) -> list[dict]:
    """Return list of {'id': ghost_id, 'subscribed': bool} rows for Ghost API, excluding blanks.

    Falls back to {'name': ...} when a newsletter has no ghost_id; rows with
    neither id nor name are dropped.  De-duplicates by id, or by
    case-insensitive name.
    """
    q = await sess.execute(
        select(GhostNewsletter.ghost_id, UserNewsletter.subscribed, GhostNewsletter.name)
        .join(UserNewsletter, UserNewsletter.newsletter_id == GhostNewsletter.id)
        .where(UserNewsletter.user_id == user_id)
    )
    seen = set()
    out: list[dict] = []
    for gid, subscribed, name in q.all():
        # normalise blanks to None so empty strings don't produce bogus rows
        gid = (gid or "").strip() or None
        name = (name or "").strip() or None
        row: dict = {"subscribed": bool(subscribed)}
        if gid:
            key = ("id", gid)
            if key in seen:
                continue
            row["id"] = gid
            seen.add(key)
            out.append(row)
        elif name:
            key = ("name", name.lower())
            if key in seen:
                continue
            row["name"] = name
            seen.add(key)
            out.append(row)
        # else: skip
    return out
async def _labels_for_user(sess: AsyncSession, user_id: int) -> list[dict]:
    """Return list of {'id': ghost_id} or {'name': name} for Ghost API, excluding blanks.

    Same contract as _newsletters_for_user: id wins over name, blanks are
    dropped, and duplicates (by id, or case-insensitive name) are skipped.
    """
    q = await sess.execute(
        select(GhostLabel.ghost_id, GhostLabel.name)
        .join(UserLabel, UserLabel.label_id == GhostLabel.id)
        .where(UserLabel.user_id == user_id)
    )
    seen = set()
    out: list[dict] = []
    for gid, name in q.all():
        # normalise blanks to None so empty strings don't produce bogus rows
        gid = (gid or "").strip() or None
        name = (name or "").strip() or None
        if gid:
            key = ("id", gid)
            if key not in seen:
                out.append({"id": gid})
                seen.add(key)
        elif name:
            key = ("name", name.lower())
            if key not in seen:
                out.append({"name": name})
                seen.add(key)
        # else: skip empty label row
    return out
async def _ghost_find_member_by_email(email: str) -> dict | None:
    """Query Ghost for a member by email to resolve conflicts / missing IDs.

    NOTE(review): second of three definitions of this helper in the module;
    it is shadowed by the one declared just below.  Unlike the others, this
    version passes the filter via ``params`` (httpx URL-encodes it) and uses
    a 20s timeout.
    """
    if not email:
        return None
    async with httpx.AsyncClient(timeout=20) as client:
        resp = await client.get(
            f"{GHOST_ADMIN_API_URL}/members/",
            headers=_auth_header(),
            params={"filter": f"email:{email}", "limit": 1},
        )
        resp.raise_for_status()
        members = (resp.json() or {}).get("members") or []
        return members[0] if members else None
from urllib.parse import quote # make sure this import exists at top
async def _ghost_find_member_by_email(email: str) -> Optional[dict]:
    """Return the first Ghost member matching this email, or None.

    NOTE(review): third definition of this helper in the module — this is the
    one in effect at import time; the earlier two are dead code and should be
    removed in a cleanup pass.
    """
    if not email:
        return None
    async with httpx.AsyncClient(timeout=30) as client:
        resp = await client.get(
            # quote() the email so filter characters survive URL encoding
            f"{GHOST_ADMIN_API_URL}/members/?filter=email:{quote(email)}&limit=1",
            headers=_auth_header(),
        )
        resp.raise_for_status()
        members = resp.json().get("members") or []
        return members[0] if members else None
async def _ghost_upsert_member(payload: dict, ghost_id: str | None = None) -> dict:
    """Create/update a member, with sanitization + 5xx retry/backoff.
    - Prefer PUT if ghost_id given.
    - On 422: retry without name/note; if 'already exists', find-by-email then PUT.
    - On 404: find-by-email and PUT; if still missing, POST create.
    - On 5xx: small exponential backoff retry.

    Returns the member dict from Ghost's response, or {} when the response
    carried no member.  Raises httpx.HTTPStatusError on unrecoverable errors.
    """
    # only forward fields Ghost's members API accepts for writes
    safe_keys = ("email", "name", "note", "subscribed", "labels", "newsletters", "id")
    pl_raw = {k: v for k, v in payload.items() if k in safe_keys}
    pl = _sanitize_member_payload(pl_raw)
    async def _request_with_retry(client: httpx.AsyncClient, method: str, url: str, json: dict) -> httpx.Response:
        # up to 3 attempts; only 5xx responses trigger the backoff-and-retry
        delay = 0.5
        for attempt in range(3):
            r = await client.request(method, url, headers=_auth_header(), json=json)
            if r.status_code >= 500:
                if attempt < 2:
                    await asyncio.sleep(delay)
                    delay *= 2
                    continue
            return r
        return r  # last response
    async with httpx.AsyncClient(timeout=30) as client:
        async def _put(mid: str, p: dict) -> dict:
            # update an existing member by id, with 404/422 fallbacks
            r = await _request_with_retry(
                client, "PUT",
                f"{GHOST_ADMIN_API_URL}/members/{mid}/",
                {"members": [p]},
            )
            if r.status_code == 404:
                # Stale id: try by email, then create if absent
                existing = await _ghost_find_member_by_email(p.get("email", ""))
                if existing and existing.get("id"):
                    r2 = await _request_with_retry(
                        client, "PUT",
                        f"{GHOST_ADMIN_API_URL}/members/{existing['id']}/",
                        {"members": [p]},
                    )
                    r2.raise_for_status()
                    return (r2.json().get("members") or [None])[0] or {}
                r3 = await _request_with_retry(
                    client, "POST",
                    f"{GHOST_ADMIN_API_URL}/members/",
                    {"members": [p]},
                )
                r3.raise_for_status()
                return (r3.json().get("members") or [None])[0] or {}
            if r.status_code == 422:
                # validation error: drop whichever field the error body names
                body = (r.text or "").lower()
                retry = dict(p)
                dropped = False
                if '"note"' in body or "for note" in body:
                    retry.pop("note", None); dropped = True
                if '"name"' in body or "for name" in body:
                    retry.pop("name", None); dropped = True
                if "labels.name" in body:
                    retry.pop("labels", None); dropped = True
                if dropped:
                    r2 = await _request_with_retry(
                        client, "PUT",
                        f"{GHOST_ADMIN_API_URL}/members/{mid}/",
                        {"members": [retry]},
                    )
                    if r2.status_code == 404:
                        # id went stale mid-flight: resolve by email or create
                        existing = await _ghost_find_member_by_email(retry.get("email", ""))
                        if existing and existing.get("id"):
                            r3 = await _request_with_retry(
                                client, "PUT",
                                f"{GHOST_ADMIN_API_URL}/members/{existing['id']}/",
                                {"members": [retry]},
                            )
                            r3.raise_for_status()
                            return (r3.json().get("members") or [None])[0] or {}
                        r3 = await _request_with_retry(
                            client, "POST",
                            f"{GHOST_ADMIN_API_URL}/members/",
                            {"members": [retry]},
                        )
                        r3.raise_for_status()
                        return (r3.json().get("members") or [None])[0] or {}
                    r2.raise_for_status()
                    return (r2.json().get("members") or [None])[0] or {}
            r.raise_for_status()
            return (r.json().get("members") or [None])[0] or {}
        async def _post_upsert(p: dict) -> dict:
            # create-or-update via Ghost's upsert flag (no known ghost_id)
            r = await _request_with_retry(
                client, "POST",
                f"{GHOST_ADMIN_API_URL}/members/?upsert=true",
                {"members": [p]},
            )
            if r.status_code == 422:
                lower = (r.text or "").lower()
                # sanitize further name/note/labels on schema complaints
                retry = dict(p)
                changed = False
                if '"note"' in lower or "for note" in lower:
                    retry.pop("note", None); changed = True
                if '"name"' in lower or "for name" in lower:
                    retry.pop("name", None); changed = True
                if "labels.name" in lower:
                    retry.pop("labels", None); changed = True
                if changed:
                    r2 = await _request_with_retry(
                        client, "POST",
                        f"{GHOST_ADMIN_API_URL}/members/?upsert=true",
                        {"members": [retry]},
                    )
                    if r2.status_code != 422:
                        r2.raise_for_status()
                        return (r2.json().get("members") or [None])[0] or {}
                    # still 422: inspect the second error body instead
                    lower = (r2.text or "").lower()
                # existing email => find-by-email then PUT
                if "already exists" in lower and "email address" in lower:
                    existing = await _ghost_find_member_by_email(p.get("email", ""))
                    if existing and existing.get("id"):
                        return await _put(existing["id"], p)
                # unrecoverable
                raise httpx.HTTPStatusError(
                    "Validation error, cannot edit member.",
                    request=r.request,
                    response=r,
                )
            r.raise_for_status()
            return (r.json().get("members") or [None])[0] or {}
        if ghost_id:
            return await _put(ghost_id, pl)
        return await _post_upsert(pl)
async def sync_member_to_ghost(sess: AsyncSession, user_id: int) -> Optional[str]:
    """Push one local user to Ghost; returns the member's ghost_id (or None).

    Stores the ghost_id and raw Ghost payload back onto the User on success.
    """
    res = await sess.execute(select(User).where(User.id == user_id))
    user = res.scalar_one_or_none()
    if not user:
        return None
    payload = _ghost_member_payload_base(user)
    labels = await _labels_for_user(sess, user.id)
    if labels:
        payload["labels"] = labels  # Ghost accepts label ids on upsert
    ghost_member = await _ghost_upsert_member(payload, ghost_id=user.ghost_id)
    if ghost_member:
        gm_id = ghost_member.get("id")
        # adopt the id Ghost assigned/resolved if it differs from ours
        if gm_id and user.ghost_id != gm_id:
            user.ghost_id = gm_id
        user.ghost_raw = dict(ghost_member)
        flag_modified(user, "ghost_raw")
        await sess.flush()
        return user.ghost_id or gm_id
    return user.ghost_id
async def sync_members_to_ghost(
    sess: AsyncSession,
    changed_since: Optional[datetime] = None,
    limit: Optional[int] = None,
) -> int:
    """Upsert a batch of users to Ghost. Returns count processed.

    When ``changed_since`` is given, only users created since that cutoff or
    who have logged in since it are pushed.  Failures are logged and skipped
    so a bad record cannot abort the batch.

    Fix: replaced ``User.last_login_at != None`` with the explicit
    ``.is_not(None)`` operator — identical ``IS NOT NULL`` SQL, but idiomatic
    SQLAlchemy and free of the E711 lint trap.
    """
    stmt = select(User.id)
    if changed_since:
        stmt = stmt.where(
            or_(
                User.created_at >= changed_since,
                and_(User.last_login_at.is_not(None), User.last_login_at >= changed_since),
            )
        )
    if limit:
        stmt = stmt.limit(limit)
    ids = [row[0] for row in (await sess.execute(stmt)).all()]
    processed = 0
    for uid in ids:
        try:
            await sync_member_to_ghost(sess, uid)
            processed += 1
        except httpx.HTTPStatusError as e:
            # Log and continue; don't kill startup
            print(f"[ghost sync] failed upsert for user {uid}: {e.response.status_code} {e.response.text}")
        except Exception as e:
            print(f"[ghost sync] failed upsert for user {uid}: {e}")
    return processed
# =====================================================
# Membership fetch/sync (Ghost -> DB) bulk + single
# =====================================================
async def fetch_all_members_from_ghost() -> list[dict[str, Any]]:
    """Fetch every Ghost member with labels/subscriptions/tiers/newsletters included.

    Single unpaginated call (limit=all); longer timeout since member lists can be large.
    """
    async with httpx.AsyncClient(timeout=60) as client:
        resp = await client.get(
            f"{GHOST_ADMIN_API_URL}/members/?include=labels,subscriptions,tiers,newsletters&limit=all",
            headers=_auth_header(),
        )
        resp.raise_for_status()
        return resp.json().get("members", [])
async def sync_all_membership_from_ghost(sess: AsyncSession) -> None:
    """Full Ghost -> DB membership sync: catalogs first, then per-member state."""
    members = await fetch_all_members_from_ghost()
    # collect related lookups and ensure catalogs exist first (avoid FK races)
    labels_by_id: Dict[str, dict[str, Any]] = {}
    tiers_by_id: Dict[str, dict[str, Any]] = {}
    newsletters_by_id: Dict[str, dict[str, Any]] = {}
    for member in members:
        for label in member.get("labels") or []:
            labels_by_id[label["id"]] = label
        for newsletter in member.get("newsletters") or []:
            newsletters_by_id[newsletter["id"]] = newsletter
        for subscription in member.get("subscriptions") or []:
            tier = subscription.get("tier")
            if isinstance(tier, dict) and tier.get("id"):
                tiers_by_id[tier["id"]] = tier
    for label_data in labels_by_id.values():
        await _upsert_label(sess, label_data)
    for tier_data in tiers_by_id.values():
        await _upsert_tier(sess, tier_data)
    for newsletter_data in newsletters_by_id.values():
        await _upsert_newsletter(sess, newsletter_data)
    # Users
    for member in members:
        user = await _find_or_create_user_by_ghost_or_email(sess, member)
        await _apply_user_membership(sess, user, member)
    # transaction auto-commits here
async def fetch_single_member_from_ghost(ghost_id: str) -> Optional[dict[str, Any]]:
    """Fetch one member (with all includes) from Ghost; None on 404."""
    async with httpx.AsyncClient(timeout=30) as client:
        resp = await client.get(
            f"{GHOST_ADMIN_API_URL}/members/{ghost_id}/?include=labels,newsletters,subscriptions,tiers",
            headers=_auth_header(),
        )
        if resp.status_code == 404:
            return None
        resp.raise_for_status()
        data = resp.json()
        # tolerate both {"members": [...]} and a bare {"member": {...}} shape
        items = data.get("members") or data.get("member") or []
        if isinstance(items, dict):
            return items
        return (items[0] if items else None)
async def sync_single_member(sess: AsyncSession, ghost_id: str) -> None:
    """Sync one Ghost member into the local DB (e.g. from a webhook)."""
    m = await fetch_single_member_from_ghost(ghost_id)
    if m is None:
        # If member deleted in Ghost, we won't delete local user here.
        return
    # ensure catalogs for this payload (labels/newsletters/tiers) before the join rows
    for l in m.get("labels") or []:
        await _upsert_label(sess, l)
    for n in m.get("newsletters") or []:
        await _upsert_newsletter(sess, n)
    for s in m.get("subscriptions") or []:
        if isinstance(s.get("tier"), dict):
            await _upsert_tier(sess, s["tier"])
    user = await _find_or_create_user_by_ghost_or_email(sess, m)
    await _apply_user_membership(sess, user, m)
    # transaction auto-commits here
# =====================================================
# Single-item content helpers (posts/authors/tags)
# =====================================================
async def fetch_single_post_from_ghost(ghost_id: str) -> Optional[dict[str, Any]]:
    """Fetch one post (all formats, with authors/tags) from Ghost; None on 404."""
    url = (
        f"{GHOST_ADMIN_API_URL}/posts/{ghost_id}/"
        "?include=authors,tags&formats=html,plaintext,mobiledoc,lexical"
    )
    async with httpx.AsyncClient(timeout=30) as client:
        resp = await client.get(url, headers=_auth_header())
        if resp.status_code == 404:
            return None
        resp.raise_for_status()
        data = resp.json()
        posts = data.get("posts") or []
        return posts[0] if posts else None
async def fetch_single_page_from_ghost(ghost_id: str) -> Optional[dict[str, Any]]:
    """Fetch one page (all formats, with authors/tags) from Ghost; None on 404."""
    url = (
        f"{GHOST_ADMIN_API_URL}/pages/{ghost_id}/"
        "?include=authors,tags&formats=html,plaintext,mobiledoc,lexical"
    )
    async with httpx.AsyncClient(timeout=30) as client:
        resp = await client.get(url, headers=_auth_header())
        if resp.status_code == 404:
            return None
        resp.raise_for_status()
        data = resp.json()
        pages = data.get("pages") or []
        return pages[0] if pages else None
async def fetch_single_author_from_ghost(ghost_id: str) -> Optional[dict[str, Any]]:
    """Fetch one author from Ghost's /users/ endpoint; None on 404."""
    url = f"{GHOST_ADMIN_API_URL}/users/{ghost_id}/"
    async with httpx.AsyncClient(timeout=30) as client:
        resp = await client.get(url, headers=_auth_header())
        if resp.status_code == 404:
            return None
        resp.raise_for_status()
        data = resp.json()
        users = data.get("users") or []
        return users[0] if users else None
async def fetch_single_tag_from_ghost(ghost_id: str) -> Optional[dict[str, Any]]:
    """Fetch one tag from Ghost; None on 404."""
    url = f"{GHOST_ADMIN_API_URL}/tags/{ghost_id}/"
    async with httpx.AsyncClient(timeout=30) as client:
        resp = await client.get(url, headers=_auth_header())
        if resp.status_code == 404:
            return None
        resp.raise_for_status()
        data = resp.json()
        tags = data.get("tags") or []
        return tags[0] if tags else None
def _build_ap_post_data(post, post_url: str, tag_objs: list) -> dict:
"""Build rich AP object_data for a blog post/page.
Produces a Note with HTML content (excerpt), feature image + inline
images as attachments, and tags as AP Hashtag objects.
"""
# Content HTML: title + excerpt + "Read more" link
parts: list[str] = []
if post.title:
parts.append(f"<p><strong>{html_escape(post.title)}</strong></p>")
body = post.plaintext or post.custom_excerpt or post.excerpt or ""
if body:
for para in body.split("\n\n"):
para = para.strip()
if para:
parts.append(f"<p>{html_escape(para)}</p>")
parts.append(f'<p><a href="{html_escape(post_url)}">Read more \u2192</a></p>')
# Hashtag links in content (Mastodon expects them inline too)
if tag_objs:
ht_links = []
for t in tag_objs:
clean = t.slug.replace("-", "")
ht_links.append(
f'<a href="{html_escape(post_url)}tag/{t.slug}/" rel="tag">#{clean}</a>'
)
parts.append(f'<p>{" ".join(ht_links)}</p>')
obj: dict = {
"name": post.title or "",
"content": "\n".join(parts),
"url": post_url,
}
# Attachments: feature image + inline images (max 4)
attachments: list[dict] = []
seen: set[str] = set()
if post.feature_image:
att: dict = {"type": "Image", "url": post.feature_image}
if post.feature_image_alt:
att["name"] = post.feature_image_alt
attachments.append(att)
seen.add(post.feature_image)
if post.html:
for src in re.findall(r'<img[^>]+src="([^"]+)"', post.html):
if src not in seen and len(attachments) < 4:
attachments.append({"type": "Image", "url": src})
seen.add(src)
if attachments:
obj["attachment"] = attachments
# AP Hashtag objects
if tag_objs:
obj["tag"] = [
{
"type": "Hashtag",
"href": f"{post_url}tag/{t.slug}/",
"name": f"#{t.slug.replace('-', '')}",
}
for t in tag_objs
]
return obj
async def sync_single_post(sess: AsyncSession, ghost_id: str) -> None:
    """Mirror one Ghost post into the local DB and federate the change.

    Fetches the post by its Ghost id. If Ghost no longer has it, the
    local row is soft-deleted. Otherwise authors and tags are upserted
    first (so the post upsert can link to them), then the result is
    announced over ActivityPub: Create on first publish, Update on an
    edit to a published post, Delete (Tombstone) on unpublish. Pages are
    excluded here; see ``sync_single_page``.
    """
    gp = await fetch_single_post_from_ghost(ghost_id)
    if gp is None:
        # Gone upstream: soft-delete our copy, but only once.
        res = await sess.execute(select(Post).where(Post.ghost_id == ghost_id))
        obj = res.scalar_one_or_none()
        if obj is not None and obj.deleted_at is None:
            obj.deleted_at = utcnow()
        return
    # Upsert related entities, keyed by their Ghost id for linkage below.
    author_map: Dict[str, Author] = {}
    tag_map: Dict[str, Tag] = {}
    for a in gp.get("authors") or []:
        author_obj = await _upsert_author(sess, a)
        author_map[a["id"]] = author_obj
    if gp.get("primary_author"):
        # primary_author is upserted separately in case it is absent from
        # the "authors" list in the payload.
        pa = gp["primary_author"]
        author_obj = await _upsert_author(sess, pa)
        author_map[pa["id"]] = author_obj
    for t in gp.get("tags") or []:
        tag_obj = await _upsert_tag(sess, t)
        tag_map[t["id"]] = tag_obj
    if gp.get("primary_tag"):
        # Same treatment as primary_author.
        pt = gp["primary_tag"]
        tag_obj = await _upsert_tag(sess, pt)
        tag_map[pt["id"]] = tag_obj
    post, old_status = await _upsert_post(sess, gp, author_map, tag_map)
    # Publish to federation inline (posts, not pages)
    if not post.is_page and post.user_id:
        # Imported here rather than at module top — NOTE(review): reason
        # not visible from this file; likely an import-cycle guard. Confirm.
        from shared.services.federation_publish import try_publish
        from shared.infrastructure.urls import app_url
        post_url = app_url("blog", f"/{post.slug}/")
        post_tags = [tag_map[t["id"]] for t in (gp.get("tags") or []) if t["id"] in tag_map]
        if post.status == "published":
            # First transition into "published" federates a Create; any
            # later sync of an already-published post federates an Update.
            activity_type = "Create" if old_status != "published" else "Update"
            await try_publish(
                sess,
                user_id=post.user_id,
                activity_type=activity_type,
                object_type="Note",
                object_data=_build_ap_post_data(post, post_url, post_tags),
                source_type="Post",
                source_id=post.id,
            )
        elif old_status == "published" and post.status != "published":
            # Unpublished: retract the federated Note with a Tombstone.
            await try_publish(
                sess,
                user_id=post.user_id,
                activity_type="Delete",
                object_type="Tombstone",
                object_data={
                    "id": post_url,
                    "formerType": "Note",
                },
                source_type="Post",
                source_id=post.id,
            )
async def sync_single_page(sess: AsyncSession, ghost_id: str) -> None:
    """Mirror one Ghost *page* into the local DB and federate the change.

    Same flow as ``sync_single_post`` but fetched from Ghost's /pages/
    endpoint: soft-delete locally when Ghost has dropped the page,
    otherwise upsert authors/tags then the page row, and publish a
    Create/Update/Delete over ActivityPub on status transitions.
    """
    gp = await fetch_single_page_from_ghost(ghost_id)
    if gp is not None:
        gp["page"] = True  # Ghost /pages/ endpoint may omit this flag
    if gp is None:
        # Gone upstream: soft-delete our copy, but only once.
        res = await sess.execute(select(Post).where(Post.ghost_id == ghost_id))
        obj = res.scalar_one_or_none()
        if obj is not None and obj.deleted_at is None:
            obj.deleted_at = utcnow()
        return
    # Upsert related entities, keyed by their Ghost id for linkage below.
    author_map: Dict[str, Author] = {}
    tag_map: Dict[str, Tag] = {}
    for a in gp.get("authors") or []:
        author_obj = await _upsert_author(sess, a)
        author_map[a["id"]] = author_obj
    if gp.get("primary_author"):
        # primary_author is upserted separately in case it is absent from
        # the "authors" list in the payload.
        pa = gp["primary_author"]
        author_obj = await _upsert_author(sess, pa)
        author_map[pa["id"]] = author_obj
    for t in gp.get("tags") or []:
        tag_obj = await _upsert_tag(sess, t)
        tag_map[t["id"]] = tag_obj
    if gp.get("primary_tag"):
        # Same treatment as primary_author.
        pt = gp["primary_tag"]
        tag_obj = await _upsert_tag(sess, pt)
        tag_map[pt["id"]] = tag_obj
    post, old_status = await _upsert_post(sess, gp, author_map, tag_map)
    # Publish to federation inline (pages)
    if post.user_id:
        # Imported here rather than at module top — NOTE(review): reason
        # not visible from this file; likely an import-cycle guard. Confirm.
        from shared.services.federation_publish import try_publish
        from shared.infrastructure.urls import app_url
        post_url = app_url("blog", f"/{post.slug}/")
        post_tags = [tag_map[t["id"]] for t in (gp.get("tags") or []) if t["id"] in tag_map]
        if post.status == "published":
            # First transition into "published" federates a Create; any
            # later sync of an already-published page federates an Update.
            activity_type = "Create" if old_status != "published" else "Update"
            await try_publish(
                sess,
                user_id=post.user_id,
                activity_type=activity_type,
                object_type="Note",
                object_data=_build_ap_post_data(post, post_url, post_tags),
                source_type="Post",
                source_id=post.id,
            )
        elif old_status == "published" and post.status != "published":
            # Unpublished: retract the federated Note with a Tombstone.
            await try_publish(
                sess,
                user_id=post.user_id,
                activity_type="Delete",
                object_type="Tombstone",
                object_data={
                    "id": post_url,
                    "formerType": "Note",
                },
                source_type="Post",
                source_id=post.id,
            )
async def sync_single_author(sess: AsyncSession, ghost_id: str) -> None:
    """Refresh one author from Ghost; soft-delete the local row if gone."""
    remote = await fetch_single_author_from_ghost(ghost_id)
    if remote is not None:
        await _upsert_author(sess, remote)
        return
    # Author vanished upstream: mark our row deleted (idempotently).
    row = (
        await sess.execute(select(Author).where(Author.ghost_id == ghost_id))
    ).scalar_one_or_none()
    if row and row.deleted_at is None:
        row.deleted_at = utcnow()
async def sync_single_tag(sess: AsyncSession, ghost_id: str) -> None:
    """Refresh one tag from Ghost; soft-delete the local row if gone."""
    remote = await fetch_single_tag_from_ghost(ghost_id)
    if remote is not None:
        await _upsert_tag(sess, remote)
        return
    # Tag vanished upstream: mark our row deleted (idempotently).
    row = (
        await sess.execute(select(Tag).where(Tag.ghost_id == ghost_id))
    ).scalar_one_or_none()
    if row and row.deleted_at is None:
        row.deleted_at = utcnow()
# ---- explicit public exports (back-compat) ----
# NOTE: page counterparts (fetch_single_page_from_ghost, sync_single_page)
# were previously missing here even though they are defined/used in this
# module; added so star-imports and the declared API stay complete.
__all__ = [
    # bulk content
    "sync_all_content_from_ghost",
    # bulk membership (user-centric)
    "sync_all_membership_from_ghost",
    # DB -> Ghost
    "sync_member_to_ghost",
    "sync_members_to_ghost",
    # single fetch
    "fetch_single_post_from_ghost",
    "fetch_single_page_from_ghost",
    "fetch_single_author_from_ghost",
    "fetch_single_tag_from_ghost",
    "fetch_single_member_from_ghost",
    # single sync
    "sync_single_post",
    "sync_single_page",
    "sync_single_author",
    "sync_single_tag",
    "sync_single_member",
]