"""Ghost content sync — blog-owned. Handles Ghost ↔ blog DB sync for content data only: posts, pages, authors, tags. All models live in db_blog. Membership sync (users, labels, newsletters, tiers, subscriptions) is handled by the account service — see account/services/ghost_membership.py. """ from __future__ import annotations import os import re import asyncio from datetime import datetime from html import escape as html_escape from typing import Dict, Any, Optional import httpx import nh3 from sqlalchemy import select, delete from sqlalchemy.ext.asyncio import AsyncSession from models.ghost_content import ( Post, Author, Tag, PostAuthor, PostTag ) from shared.infrastructure.data_client import fetch_data from shared.infrastructure.ghost_admin_token import make_ghost_admin_jwt GHOST_ADMIN_API_URL = os.environ["GHOST_ADMIN_API_URL"] from shared.browser.app.utils import utcnow def _sanitize_html(html: str | None) -> str | None: """Sanitize HTML content using nh3, allowing safe formatting tags.""" if not html: return html return nh3.clean( html, tags={ "a", "abbr", "acronym", "b", "blockquote", "br", "code", "div", "em", "figcaption", "figure", "h1", "h2", "h3", "h4", "h5", "h6", "hr", "i", "img", "li", "ol", "p", "pre", "span", "strong", "sub", "sup", "table", "tbody", "td", "th", "thead", "tr", "ul", "video", "source", "picture", "iframe", "audio", }, attributes={ "*": {"class", "id", "style"}, "a": {"href", "title", "target"}, "img": {"src", "alt", "title", "width", "height", "loading"}, "video": {"src", "controls", "width", "height", "poster"}, "audio": {"src", "controls"}, "source": {"src", "type"}, "iframe": {"src", "width", "height", "frameborder", "allowfullscreen"}, "td": {"colspan", "rowspan"}, "th": {"colspan", "rowspan"}, }, link_rel="noopener noreferrer", url_schemes={"http", "https", "mailto"}, ) def _auth_header() -> dict[str, str]: return {"Authorization": f"Ghost {make_ghost_admin_jwt()}"} def _iso(val: str | None) -> datetime | None: if not val: return None return datetime.fromisoformat(val.replace("Z", "+00:00")) # ===================== # CONTENT UPSERT HELPERS # ===================== async def _upsert_author(sess: AsyncSession, ga: Dict[str, Any]) -> Author: res = await sess.execute(select(Author).where(Author.ghost_id == ga["id"])) obj = res.scalar_one_or_none() if obj is None: obj = Author(ghost_id=ga["id"]) sess.add(obj) obj.deleted_at = None obj.slug = ga.get("slug") or obj.slug obj.name = ga.get("name") or obj.name obj.email = ga.get("email") or obj.email obj.profile_image = ga.get("profile_image") obj.cover_image = ga.get("cover_image") obj.bio = ga.get("bio") obj.website = ga.get("website") obj.location = ga.get("location") obj.facebook = ga.get("facebook") obj.twitter = ga.get("twitter") obj.created_at = _iso(ga.get("created_at")) or obj.created_at or utcnow() obj.updated_at = _iso(ga.get("updated_at")) or utcnow() await sess.flush() return obj async def _upsert_tag(sess: AsyncSession, gt: Dict[str, Any]) -> Tag: res = await sess.execute(select(Tag).where(Tag.ghost_id == gt["id"])) obj = res.scalar_one_or_none() if obj is None: obj = Tag(ghost_id=gt["id"]) sess.add(obj) obj.deleted_at = None obj.slug = gt.get("slug") or obj.slug obj.name = gt.get("name") or obj.name obj.description = gt.get("description") obj.visibility = gt.get("visibility") or obj.visibility obj.feature_image = gt.get("feature_image") obj.meta_title = gt.get("meta_title") obj.meta_description = gt.get("meta_description") obj.created_at = _iso(gt.get("created_at")) or obj.created_at or utcnow() obj.updated_at = _iso(gt.get("updated_at")) or utcnow() await sess.flush() return obj def _apply_ghost_fields(obj: Post, gp: Dict[str, Any], author_map: Dict[str, Author], tag_map: Dict[str, Tag]) -> None: """Apply Ghost API fields to a Post ORM object.""" obj.deleted_at = None obj.uuid = gp.get("uuid") or obj.uuid obj.slug = gp.get("slug") or obj.slug obj.title = gp.get("title") or obj.title obj.html = _sanitize_html(gp.get("html")) obj.plaintext = gp.get("plaintext") obj.mobiledoc = gp.get("mobiledoc") obj.lexical = gp.get("lexical") obj.feature_image = gp.get("feature_image") obj.feature_image_alt = gp.get("feature_image_alt") obj.feature_image_caption = _sanitize_html(gp.get("feature_image_caption")) obj.excerpt = gp.get("excerpt") obj.custom_excerpt = gp.get("custom_excerpt") obj.visibility = gp.get("visibility") or obj.visibility obj.status = gp.get("status") or obj.status obj.featured = bool(gp.get("featured") or False) obj.is_page = bool(gp.get("page") or False) obj.email_only = bool(gp.get("email_only") or False) obj.canonical_url = gp.get("canonical_url") obj.meta_title = gp.get("meta_title") obj.meta_description = gp.get("meta_description") obj.og_image = gp.get("og_image") obj.og_title = gp.get("og_title") obj.og_description = gp.get("og_description") obj.twitter_image = gp.get("twitter_image") obj.twitter_title = gp.get("twitter_title") obj.twitter_description = gp.get("twitter_description") obj.custom_template = gp.get("custom_template") obj.reading_time = gp.get("reading_time") obj.comment_id = gp.get("comment_id") obj.published_at = _iso(gp.get("published_at")) obj.updated_at = _iso(gp.get("updated_at")) or obj.updated_at or utcnow() obj.created_at = _iso(gp.get("created_at")) or obj.created_at or utcnow() pa = gp.get("primary_author") obj.primary_author_id = author_map[pa["id"].strip()].id if pa else None pt = gp.get("primary_tag") obj.primary_tag_id = tag_map[pt["id"].strip()].id if (pt and pt["id"] in tag_map) else None async def _resolve_user_id_by_email(email: str) -> Optional[int]: """Look up user_id from account service via HTTP (cross-domain safe).""" from shared.infrastructure.data_client import fetch_data result = await fetch_data( "account", "user-by-email", params={"email": email}, required=False, ) if result and isinstance(result, dict): return result.get("user_id") return None async def _upsert_post(sess: AsyncSession, gp: Dict[str, Any], author_map: Dict[str, Author], tag_map: Dict[str, Tag]) -> tuple[Post, str | None]: """Upsert a post. Returns (post, old_status) where old_status is None for new rows.""" from sqlalchemy.exc import IntegrityError res = await sess.execute(select(Post).where(Post.ghost_id == gp["id"])) obj = res.scalar_one_or_none() old_status = obj.status if obj is not None else None if obj is not None: _apply_ghost_fields(obj, gp, author_map, tag_map) await sess.flush() else: obj = Post(ghost_id=gp["id"]) try: async with sess.begin_nested(): sess.add(obj) _apply_ghost_fields(obj, gp, author_map, tag_map) await sess.flush() except IntegrityError: res = await sess.execute(select(Post).where(Post.ghost_id == gp["id"])) obj = res.scalar_one() _apply_ghost_fields(obj, gp, author_map, tag_map) await sess.flush() # Backfill user_id from primary author email via account service if obj.user_id is None and obj.primary_author_id is not None: pa_obj = author_map.get(gp.get("primary_author", {}).get("id", "")) if pa_obj and pa_obj.email: user_id = await _resolve_user_id_by_email(pa_obj.email) if user_id: obj.user_id = user_id await sess.flush() # Rebuild post_authors + post_tags with synchronize_session to keep # identity map consistent and prevent autoflush IntegrityError. old_autoflush = sess.autoflush sess.autoflush = False try: # Delete + re-add post_authors (dedup for Ghost duplicate authors) await sess.execute( delete(PostAuthor).where(PostAuthor.post_id == obj.id), execution_options={"synchronize_session": "fetch"}, ) seen_authors: set[int] = set() for idx, a in enumerate(gp.get("authors") or []): aa = author_map[a["id"]] if aa.id not in seen_authors: seen_authors.add(aa.id) sess.add(PostAuthor(post_id=obj.id, author_id=aa.id, sort_order=idx)) # Delete + re-add post_tags (dedup similarly) await sess.execute( delete(PostTag).where(PostTag.post_id == obj.id), execution_options={"synchronize_session": "fetch"}, ) seen_tags: set[int] = set() for idx, t in enumerate(gp.get("tags") or []): tt = tag_map[t["id"]] if tt.id not in seen_tags: seen_tags.add(tt.id) sess.add(PostTag(post_id=obj.id, tag_id=tt.id, sort_order=idx)) await sess.flush() finally: sess.autoflush = old_autoflush # Auto-create PageConfig for pages (blog owns this table — direct DB, # not via HTTP, since this may run during startup before the server is ready) if obj.is_page: from shared.models.page_config import PageConfig existing = (await sess.execute( select(PageConfig).where( PageConfig.container_type == "page", PageConfig.container_id == obj.id, ) )).scalar_one_or_none() if existing is None: sess.add(PageConfig( container_type="page", container_id=obj.id, features={}, )) await sess.flush() return obj, old_status def _build_ap_post_data(post, post_url: str, tag_objs: list) -> dict: """Build rich AP object_data for a blog post/page.""" parts: list[str] = [] if post.title: parts.append(f"

{html_escape(post.title)}

") body = post.plaintext or post.custom_excerpt or post.excerpt or "" if body: for para in body.split("\n\n"): para = para.strip() if para: parts.append(f"

{html_escape(para)}

") parts.append(f'

Read more \u2192

') if tag_objs: ht_links = [] for t in tag_objs: clean = t.slug.replace("-", "") ht_links.append( f'' ) parts.append(f'

{" ".join(ht_links)}

') obj: dict = { "name": post.title or "", "content": "\n".join(parts), "url": post_url, } attachments: list[dict] = [] seen: set[str] = set() if post.feature_image: att: dict = {"type": "Image", "url": post.feature_image} if post.feature_image_alt: att["name"] = post.feature_image_alt attachments.append(att) seen.add(post.feature_image) if post.html: for src in re.findall(r']+src="([^"]+)"', post.html): if src not in seen and len(attachments) < 4: attachments.append({"type": "Image", "url": src}) seen.add(src) if attachments: obj["attachment"] = attachments if tag_objs: obj["tag"] = [ { "type": "Hashtag", "href": f"{post_url}tag/{t.slug}/", "name": f"#{t.slug.replace('-', '')}", } for t in tag_objs ] return obj # ===================================================== # Ghost API fetch helpers # ===================================================== async def _fetch_all_from_ghost(endpoint: str) -> list[dict[str, Any]]: async with httpx.AsyncClient(timeout=30) as client: resp = await client.get( f"{GHOST_ADMIN_API_URL}/{endpoint}/?include=authors,tags&limit=all&formats=html,plaintext,mobiledoc,lexical", headers=_auth_header(), ) resp.raise_for_status() key = "posts" if endpoint == "posts" else "pages" return resp.json().get(key, []) async def fetch_all_posts_and_pages_from_ghost() -> list[dict[str, Any]]: posts, pages = await asyncio.gather( _fetch_all_from_ghost("posts"), _fetch_all_from_ghost("pages"), ) for p in pages: p["page"] = True return posts + pages async def sync_all_content_from_ghost(sess: AsyncSession) -> None: """Bulk sync all Ghost content (posts + pages) into db_blog.""" data = await fetch_all_posts_and_pages_from_ghost() author_bucket: Dict[str, dict[str, Any]] = {} tag_bucket: Dict[str, dict[str, Any]] = {} for p in data: for a in p.get("authors") or []: author_bucket[a["id"]] = a if p.get("primary_author"): author_bucket[p["primary_author"]["id"]] = p["primary_author"] for t in p.get("tags") or []: tag_bucket[t["id"]] = t if p.get("primary_tag"): tag_bucket[p["primary_tag"]["id"]] = p["primary_tag"] seen_post_ids = {p["id"] for p in data} seen_author_ids = set(author_bucket.keys()) seen_tag_ids = set(tag_bucket.keys()) author_map: Dict[str, Author] = {} for ga in author_bucket.values(): a = await _upsert_author(sess, ga) author_map[ga["id"]] = a tag_map: Dict[str, Tag] = {} for gt in tag_bucket.values(): t = await _upsert_tag(sess, gt) tag_map[gt["id"]] = t for gp in data: await _upsert_post(sess, gp, author_map, tag_map) # soft-delete anything that no longer exists in Ghost now = utcnow() db_authors = await sess.execute(select(Author)) for local_author in db_authors.scalars(): if local_author.ghost_id not in seen_author_ids: if local_author.deleted_at is None: local_author.deleted_at = now db_tags = await sess.execute(select(Tag)) for local_tag in db_tags.scalars(): if local_tag.ghost_id not in seen_tag_ids: if local_tag.deleted_at is None: local_tag.deleted_at = now db_posts = await sess.execute(select(Post)) for local_post in db_posts.scalars(): if local_post.ghost_id not in seen_post_ids: if local_post.deleted_at is None: local_post.deleted_at = now # ===================================================== # Single-item content helpers (posts/authors/tags) # ===================================================== async def fetch_single_post_from_ghost(ghost_id: str) -> Optional[dict[str, Any]]: url = ( f"{GHOST_ADMIN_API_URL}/posts/{ghost_id}/" "?include=authors,tags&formats=html,plaintext,mobiledoc,lexical" ) async with httpx.AsyncClient(timeout=30) as client: resp = await client.get(url, headers=_auth_header()) if resp.status_code == 404: return None resp.raise_for_status() data = resp.json() posts = data.get("posts") or [] return posts[0] if posts else None async def fetch_single_page_from_ghost(ghost_id: str) -> Optional[dict[str, Any]]: url = ( f"{GHOST_ADMIN_API_URL}/pages/{ghost_id}/" "?include=authors,tags&formats=html,plaintext,mobiledoc,lexical" ) async with httpx.AsyncClient(timeout=30) as client: resp = await client.get(url, headers=_auth_header()) if resp.status_code == 404: return None resp.raise_for_status() data = resp.json() pages = data.get("pages") or [] return pages[0] if pages else None async def fetch_single_author_from_ghost(ghost_id: str) -> Optional[dict[str, Any]]: url = f"{GHOST_ADMIN_API_URL}/users/{ghost_id}/" async with httpx.AsyncClient(timeout=30) as client: resp = await client.get(url, headers=_auth_header()) if resp.status_code == 404: return None resp.raise_for_status() data = resp.json() users = data.get("users") or [] return users[0] if users else None async def fetch_single_tag_from_ghost(ghost_id: str) -> Optional[dict[str, Any]]: url = f"{GHOST_ADMIN_API_URL}/tags/{ghost_id}/" async with httpx.AsyncClient(timeout=30) as client: resp = await client.get(url, headers=_auth_header()) if resp.status_code == 404: return None resp.raise_for_status() data = resp.json() tags = data.get("tags") or [] return tags[0] if tags else None async def sync_single_post(sess: AsyncSession, ghost_id: str) -> None: gp = await fetch_single_post_from_ghost(ghost_id) if gp is None: res = await sess.execute(select(Post).where(Post.ghost_id == ghost_id)) obj = res.scalar_one_or_none() if obj is not None and obj.deleted_at is None: obj.deleted_at = utcnow() return author_map: Dict[str, Author] = {} tag_map: Dict[str, Tag] = {} for a in gp.get("authors") or []: author_obj = await _upsert_author(sess, a) author_map[a["id"]] = author_obj if gp.get("primary_author"): pa = gp["primary_author"] author_obj = await _upsert_author(sess, pa) author_map[pa["id"]] = author_obj for t in gp.get("tags") or []: tag_obj = await _upsert_tag(sess, t) tag_map[t["id"]] = tag_obj if gp.get("primary_tag"): pt = gp["primary_tag"] tag_obj = await _upsert_tag(sess, pt) tag_map[pt["id"]] = tag_obj post, old_status = await _upsert_post(sess, gp, author_map, tag_map) # Publish to federation inline (posts, not pages) if not post.is_page and post.user_id: from shared.services.federation_publish import try_publish from shared.infrastructure.urls import app_url post_url = app_url("blog", f"/{post.slug}/") post_tags = [tag_map[t["id"]] for t in (gp.get("tags") or []) if t["id"] in tag_map] if post.status == "published": activity_type = "Create" if old_status != "published" else "Update" await try_publish( sess, user_id=post.user_id, activity_type=activity_type, object_type="Note", object_data=_build_ap_post_data(post, post_url, post_tags), source_type="Post", source_id=post.id, ) elif old_status == "published" and post.status != "published": await try_publish( sess, user_id=post.user_id, activity_type="Delete", object_type="Tombstone", object_data={ "id": post_url, "formerType": "Note", }, source_type="Post", source_id=post.id, ) async def sync_single_page(sess: AsyncSession, ghost_id: str) -> None: gp = await fetch_single_page_from_ghost(ghost_id) if gp is not None: gp["page"] = True if gp is None: res = await sess.execute(select(Post).where(Post.ghost_id == ghost_id)) obj = res.scalar_one_or_none() if obj is not None and obj.deleted_at is None: obj.deleted_at = utcnow() return author_map: Dict[str, Author] = {} tag_map: Dict[str, Tag] = {} for a in gp.get("authors") or []: author_obj = await _upsert_author(sess, a) author_map[a["id"]] = author_obj if gp.get("primary_author"): pa = gp["primary_author"] author_obj = await _upsert_author(sess, pa) author_map[pa["id"]] = author_obj for t in gp.get("tags") or []: tag_obj = await _upsert_tag(sess, t) tag_map[t["id"]] = tag_obj if gp.get("primary_tag"): pt = gp["primary_tag"] tag_obj = await _upsert_tag(sess, pt) tag_map[pt["id"]] = tag_obj post, old_status = await _upsert_post(sess, gp, author_map, tag_map) # Publish to federation inline (pages) if post.user_id: from shared.services.federation_publish import try_publish from shared.infrastructure.urls import app_url post_url = app_url("blog", f"/{post.slug}/") post_tags = [tag_map[t["id"]] for t in (gp.get("tags") or []) if t["id"] in tag_map] if post.status == "published": activity_type = "Create" if old_status != "published" else "Update" await try_publish( sess, user_id=post.user_id, activity_type=activity_type, object_type="Note", object_data=_build_ap_post_data(post, post_url, post_tags), source_type="Post", source_id=post.id, ) elif old_status == "published" and post.status != "published": await try_publish( sess, user_id=post.user_id, activity_type="Delete", object_type="Tombstone", object_data={ "id": post_url, "formerType": "Note", }, source_type="Post", source_id=post.id, ) async def sync_single_author(sess: AsyncSession, ghost_id: str) -> None: ga = await fetch_single_author_from_ghost(ghost_id) if ga is None: result = await sess.execute(select(Author).where(Author.ghost_id == ghost_id)) author_obj = result.scalar_one_or_none() if author_obj and author_obj.deleted_at is None: author_obj.deleted_at = utcnow() return await _upsert_author(sess, ga) async def sync_single_tag(sess: AsyncSession, ghost_id: str) -> None: gt = await fetch_single_tag_from_ghost(ghost_id) if gt is None: result = await sess.execute(select(Tag).where(Tag.ghost_id == ghost_id)) tag_obj = result.scalar_one_or_none() if tag_obj and tag_obj.deleted_at is None: tag_obj.deleted_at = utcnow() return await _upsert_tag(sess, gt) __all__ = [ # bulk content "sync_all_content_from_ghost", # single fetch "fetch_single_post_from_ghost", "fetch_single_author_from_ghost", "fetch_single_tag_from_ghost", # single sync "sync_single_post", "sync_single_author", "sync_single_tag", ]