#!/usr/bin/env python3 """Final Ghost → db_blog sync with HTML verification + author→user migration. Run once before cutting over to native writes (Phase 1). Usage: cd blog && python -m scripts.final_ghost_sync Requires GHOST_ADMIN_API_URL, GHOST_ADMIN_API_KEY, DATABASE_URL, and DATABASE_URL_ACCOUNT env vars. """ from __future__ import annotations import asyncio import difflib import logging import os import re import sys import httpx from sqlalchemy import select, func, delete # Ensure project root is importable sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..")) sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..")) sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", "shared")) from shared.db.base import Base # noqa: E402 from shared.db.session import get_session, get_account_session, _engine # noqa: E402 from shared.infrastructure.ghost_admin_token import make_ghost_admin_jwt # noqa: E402 from blog.models.content import Post, Author, Tag, PostUser, PostAuthor # noqa: E402 from shared.models.user import User # noqa: E402 logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s", ) log = logging.getLogger("final_ghost_sync") GHOST_ADMIN_API_URL = os.environ["GHOST_ADMIN_API_URL"] def _auth_header() -> dict[str, str]: return {"Authorization": f"Ghost {make_ghost_admin_jwt()}"} def _slugify(name: str) -> str: s = name.strip().lower() s = re.sub(r"[^\w\s-]", "", s) s = re.sub(r"[\s_]+", "-", s) return s.strip("-") # --------------------------------------------------------------------------- # Ghost API fetch # --------------------------------------------------------------------------- async def _fetch_all(endpoint: str) -> list[dict]: """Fetch all items from a Ghost Admin API endpoint.""" url = ( f"{GHOST_ADMIN_API_URL}/{endpoint}/" "?include=authors,tags&limit=all" "&formats=html,plaintext,mobiledoc,lexical" ) async with httpx.AsyncClient(timeout=60) as client: resp = await 
client.get(url, headers=_auth_header()) resp.raise_for_status() key = endpoint # "posts" or "pages" return resp.json().get(key, []) async def fetch_all_ghost_content() -> list[dict]: """Fetch all posts + pages from Ghost.""" posts, pages = await asyncio.gather( _fetch_all("posts"), _fetch_all("pages"), ) for p in pages: p["page"] = True return posts + pages # --------------------------------------------------------------------------- # Author → User migration # --------------------------------------------------------------------------- async def migrate_authors_to_users(author_bucket: dict[str, dict]) -> dict[str, int]: """Ensure every Ghost author has a corresponding User in db_account. Returns mapping of ghost_author_id → user_id. """ ghost_id_to_user_id: dict[str, int] = {} async with get_account_session() as sess: async with sess.begin(): for ghost_author_id, ga in author_bucket.items(): email = (ga.get("email") or "").strip().lower() if not email: log.warning( "Author %s (%s) has no email — skipping user creation", ghost_author_id, ga.get("name"), ) continue # Find existing user by email user = (await sess.execute( select(User).where(User.email == email) )).scalar_one_or_none() if user is None: # Auto-create user for this Ghost author user = User( email=email, name=ga.get("name"), slug=ga.get("slug") or _slugify(ga.get("name") or email.split("@")[0]), bio=ga.get("bio"), profile_image=ga.get("profile_image"), cover_image=ga.get("cover_image"), website=ga.get("website"), location=ga.get("location"), facebook=ga.get("facebook"), twitter=ga.get("twitter"), is_admin=True, # Ghost authors are admins ) sess.add(user) await sess.flush() log.info("Created user %d for author %s (%s)", user.id, ga.get("name"), email) else: # Update profile fields from Ghost author (fill in blanks) if not user.slug: user.slug = ga.get("slug") or _slugify(ga.get("name") or email.split("@")[0]) if not user.name and ga.get("name"): user.name = ga["name"] if not user.bio and ga.get("bio"): 
user.bio = ga["bio"] if not user.profile_image and ga.get("profile_image"): user.profile_image = ga["profile_image"] if not user.cover_image and ga.get("cover_image"): user.cover_image = ga["cover_image"] if not user.website and ga.get("website"): user.website = ga["website"] if not user.location and ga.get("location"): user.location = ga["location"] if not user.facebook and ga.get("facebook"): user.facebook = ga["facebook"] if not user.twitter and ga.get("twitter"): user.twitter = ga["twitter"] await sess.flush() log.info("Updated user %d profile from author %s", user.id, ga.get("name")) ghost_id_to_user_id[ghost_author_id] = user.id return ghost_id_to_user_id async def populate_post_users( data: list[dict], ghost_author_to_user: dict[str, int], ) -> int: """Populate post_users M2M and set user_id on all posts from author mapping. Returns number of post_users rows created. """ rows_created = 0 async with get_session() as sess: async with sess.begin(): for gp in data: ghost_post_id = gp["id"] post = (await sess.execute( select(Post).where(Post.ghost_id == ghost_post_id) )).scalar_one_or_none() if not post: continue # Set primary user_id from primary author pa = gp.get("primary_author") if pa and pa["id"] in ghost_author_to_user: post.user_id = ghost_author_to_user[pa["id"]] # Build post_users from all authors await sess.execute( delete(PostUser).where(PostUser.post_id == post.id) ) seen_user_ids: set[int] = set() for idx, a in enumerate(gp.get("authors") or []): uid = ghost_author_to_user.get(a["id"]) if uid and uid not in seen_user_ids: seen_user_ids.add(uid) sess.add(PostUser(post_id=post.id, user_id=uid, sort_order=idx)) rows_created += 1 await sess.flush() return rows_created # --------------------------------------------------------------------------- # Content sync (reuse ghost_sync upsert logic) # --------------------------------------------------------------------------- async def run_sync() -> dict: """Run full Ghost content sync and author→user 
migration.""" from bp.blog.ghost.ghost_sync import ( _upsert_author, _upsert_tag, _upsert_post, ) log.info("Fetching all content from Ghost...") data = await fetch_all_ghost_content() log.info("Received %d posts/pages from Ghost", len(data)) # Collect authors and tags author_bucket: dict[str, dict] = {} tag_bucket: dict[str, dict] = {} for p in data: for a in p.get("authors") or []: author_bucket[a["id"]] = a if p.get("primary_author"): author_bucket[p["primary_author"]["id"]] = p["primary_author"] for t in p.get("tags") or []: tag_bucket[t["id"]] = t if p.get("primary_tag"): tag_bucket[p["primary_tag"]["id"]] = p["primary_tag"] # Step 1: Upsert content into db_blog (existing Ghost sync logic) async with get_session() as sess: async with sess.begin(): author_map: dict[str, Author] = {} for ga in author_bucket.values(): a = await _upsert_author(sess, ga) author_map[ga["id"]] = a log.info("Upserted %d authors (legacy table)", len(author_map)) tag_map: dict[str, Tag] = {} for gt in tag_bucket.values(): t = await _upsert_tag(sess, gt) tag_map[gt["id"]] = t log.info("Upserted %d tags", len(tag_map)) for gp in data: await _upsert_post(sess, gp, author_map, tag_map) log.info("Upserted %d posts/pages", len(data)) # Step 2: Migrate authors → users in db_account log.info("") log.info("--- Migrating Ghost authors → Users ---") ghost_author_to_user = await migrate_authors_to_users(author_bucket) log.info("Mapped %d Ghost authors to User records", len(ghost_author_to_user)) # Step 3: Populate post_users M2M and set user_id log.info("") log.info("--- Populating post_users M2M ---") pu_rows = await populate_post_users(data, ghost_author_to_user) log.info("Created %d post_users rows", pu_rows) n_posts = sum(1 for p in data if not p.get("page")) n_pages = sum(1 for p in data if p.get("page")) return { "posts": n_posts, "pages": n_pages, "authors": len(author_bucket), "tags": len(tag_bucket), "users_mapped": len(ghost_author_to_user), "post_users_rows": pu_rows, } # 
# ---------------------------------------------------------------------------
# HTML rendering verification
# ---------------------------------------------------------------------------


def _normalize_html(html: str | None) -> str:
    """Collapse every whitespace run to a single space so HTML comparison
    ignores formatting-only differences; None/empty input yields ""."""
    return re.sub(r"\s+", " ", html.strip()) if html else ""


async def verify_html_rendering() -> dict:
    """Re-render all posts from lexical and compare with stored HTML.

    Every published, non-deleted post with Lexical JSON is re-rendered and
    diffed (whitespace-normalized) against the HTML stored by the Ghost
    sync; render failures count as diffs. Returns counts keyed "checked",
    "no_lexical" and "diffs".
    """
    from bp.blog.ghost.lexical_renderer import render_lexical
    import json

    checked = 0
    no_lexical = 0
    mismatches = 0

    async with get_session() as sess:
        rows = await sess.execute(
            select(Post).where(
                Post.deleted_at.is_(None),
                Post.status == "published",
            )
        )
        for post in rows.scalars().all():
            if not post.lexical:
                no_lexical += 1
                continue

            checked += 1
            try:
                rendered = render_lexical(json.loads(post.lexical))
            except Exception as exc:  # a post that cannot render is a diff
                log.error(
                    "Render failed for post %d (%s): %s",
                    post.id,
                    post.slug,
                    exc,
                )
                mismatches += 1
                continue

            expected = _normalize_html(post.html)
            actual = _normalize_html(rendered)
            if expected == actual:
                continue

            mismatches += 1
            delta = difflib.unified_diff(
                expected.splitlines(keepends=True),
                actual.splitlines(keepends=True),
                fromfile=f"ghost/{post.slug}",
                tofile=f"rendered/{post.slug}",
                n=2,
            )
            report = "".join(delta)
            # Keep log lines bounded — a big diff is truncated to 2000 chars.
            if len(report) > 2000:
                report = report[:2000] + "\n... (truncated)"
            log.warning(
                "HTML diff for post %d (%s):\n%s",
                post.id,
                post.slug,
                report,
            )

    return {
        "checked": checked,
        "no_lexical": no_lexical,
        "diffs": mismatches,
    }


# ---------------------------------------------------------------------------
# Verification queries
# ---------------------------------------------------------------------------


async def run_verification() -> dict:
    """Run sanity-check counts over db_blog posts ahead of cutover."""
    async with get_session() as sess:

        async def _count(*conditions) -> int:
            # One COUNT(*) per metric; conditions are SQLAlchemy clauses.
            return await sess.scalar(
                select(func.count(Post.id)).where(*conditions)
            )

        not_deleted = Post.deleted_at.is_(None)
        published = Post.status == "published"

        return {
            "total_posts": await _count(not_deleted),
            "published_null_html": await _count(
                not_deleted, published, Post.html.is_(None)
            ),
            "published_null_lexical": await _count(
                not_deleted, published, Post.lexical.is_(None)
            ),
            "posts_with_user": await _count(
                not_deleted, Post.user_id.isnot(None)
            ),
            "posts_without_user": await _count(
                not_deleted, Post.user_id.is_(None)
            ),
        }


# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------


async def main():
    """Run the three cutover-preparation steps and print a summary.

    Steps: full Ghost sync + author→user migration, verification count
    queries, then HTML re-render verification. Warnings are logged for any
    condition that should be reviewed before cutover.
    """
    banner = "=" * 60
    log.info(banner)
    log.info("Final Ghost Sync — Cutover Preparation")
    log.info(banner)

    # Step 1: Full sync + author→user migration
    log.info("")
    log.info("--- Step 1: Full sync from Ghost + author→user migration ---")
    stats = await run_sync()
    log.info(
        "Sync complete: %d posts, %d pages, %d authors, %d tags, %d users mapped",
        stats["posts"],
        stats["pages"],
        stats["authors"],
        stats["tags"],
        stats["users_mapped"],
    )

    # Step 2: Verification queries
    log.info("")
    log.info("--- Step 2: Verification queries ---")
    checks = await run_verification()
    log.info("Total non-deleted posts/pages: %d", checks["total_posts"])
    log.info("Published with NULL html: %d", checks["published_null_html"])
    log.info("Published with NULL lexical: %d", checks["published_null_lexical"])
    log.info("Posts with user_id: %d", checks["posts_with_user"])
    log.info("Posts WITHOUT user_id: %d", checks["posts_without_user"])
    if checks["published_null_html"] > 0:
        log.warning("WARN: Some published posts have no HTML!")
    if checks["published_null_lexical"] > 0:
        log.warning("WARN: Some published posts have no Lexical JSON!")
    if checks["posts_without_user"] > 0:
        log.warning("WARN: Some posts have no user_id — authors may lack email!")

    # Step 3: HTML rendering verification
    log.info("")
    log.info("--- Step 3: HTML rendering verification ---")
    render_stats = await verify_html_rendering()
    log.info(
        "Checked %d posts, %d with diffs, %d without lexical",
        render_stats["checked"],
        render_stats["diffs"],
        render_stats["no_lexical"],
    )
    if render_stats["diffs"] > 0:
        log.warning(
            "WARN: %d posts have HTML rendering differences — "
            "review diffs above before cutover",
            render_stats["diffs"],
        )

    # Summary
    log.info("")
    log.info(banner)
    log.info("SUMMARY")
    for message, value in (
        (" Posts synced: %d", stats["posts"]),
        (" Pages synced: %d", stats["pages"]),
        (" Authors synced: %d", stats["authors"]),
        (" Tags synced: %d", stats["tags"]),
        (" Users mapped: %d", stats["users_mapped"]),
        (" post_users rows: %d", stats["post_users_rows"]),
        (" HTML diffs: %d", render_stats["diffs"]),
        (" Published null HTML: %d", checks["published_null_html"]),
        (" Published null lex: %d", checks["published_null_lexical"]),
        (" Posts without user: %d", checks["posts_without_user"]),
    ):
        log.info(message, value)
    log.info(banner)

    # Cutover gate: null-lexical is deliberately warn-only, not blocking.
    ready = (
        render_stats["diffs"] == 0
        and checks["published_null_html"] == 0
        and checks["posts_without_user"] == 0
    )
    if ready:
        log.info("All checks passed — safe to proceed with cutover.")
    else:
        log.warning("Review warnings above before proceeding.")

    # NOTE(review): only the blog engine is disposed here; if the account
    # session uses a separate engine it is left to interpreter teardown —
    # confirm against shared.db.session.
    await _engine.dispose()


if __name__ == "__main__":
    asyncio.run(main())