Files
rose-ash/blog/bp/blog/ghost/ghost_sync.py
giles e65bd41ebe
All checks were successful
Build and Deploy / build-and-deploy (push) Successful in 1m19s
Decouple per-service Alembic migrations and fix cross-DB queries
Each service (blog, market, cart, events, federation, account) now owns
its own database schema with independent Alembic migrations. Removes the
monolithic shared/alembic/ that ran all migrations against a single DB.

- Add per-service alembic.ini, env.py, and 0001_initial.py migrations
- Add shared/db/alembic_env.py helper with table-name filtering
- Fix cross-DB FK in blog/models/snippet.py (users lives in db_account)
- Fix cart_impl.py cross-DB queries: fetch products and market_places
  via internal data endpoints instead of direct SQL joins
- Fix blog ghost_sync to fetch page_configs from cart via data endpoint
- Add products-by-ids and page-config-ensure data endpoints
- Update all entrypoint.sh to create own DB and run own migrations
- Cart now uses db_cart instead of db_market
- Add docker-compose.dev.yml, dev.sh for local development
- CI deploys both rose-ash swarm stack and rose-ash-dev compose stack
- Fix Quart namespace package crash (root_path in factory.py)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-26 12:07:24 +00:00

597 lines
21 KiB
Python

"""Ghost content sync — blog-owned.
Handles Ghost ↔ blog DB sync for content data only:
posts, pages, authors, tags. All models live in db_blog.
Membership sync (users, labels, newsletters, tiers, subscriptions) is
handled by the account service — see account/services/ghost_membership.py.
"""
from __future__ import annotations
import os
import re
import asyncio
from datetime import datetime
from html import escape as html_escape
from typing import Dict, Any, Optional
import httpx
from sqlalchemy import select, delete
from sqlalchemy.ext.asyncio import AsyncSession
from models.ghost_content import (
Post, Author, Tag, PostAuthor, PostTag
)
from shared.infrastructure.data_client import fetch_data
from shared.infrastructure.ghost_admin_token import make_ghost_admin_jwt
from shared.browser.app.utils import utcnow

# Ghost Admin API base URL (e.g. "https://blog.example.com/ghost/api/admin").
# Read via os.environ[...] rather than .get() so a missing variable fails
# fast at import time instead of at the first request.
GHOST_ADMIN_API_URL = os.environ["GHOST_ADMIN_API_URL"]
def _auth_header() -> dict[str, str]:
    """Build the Authorization header for a Ghost Admin API request."""
    token = make_ghost_admin_jwt()
    return {"Authorization": "Ghost " + token}
def _iso(val: str | None) -> datetime | None:
if not val:
return None
return datetime.fromisoformat(val.replace("Z", "+00:00"))
# =====================
# CONTENT UPSERT HELPERS
# =====================
async def _upsert_author(sess: AsyncSession, ga: Dict[str, Any]) -> Author:
    """Create or refresh the local Author row mirroring Ghost author *ga*.

    Flushes (but does not commit) so the row's primary key is populated.
    """
    existing = await sess.execute(select(Author).where(Author.ghost_id == ga["id"]))
    obj = existing.scalar_one_or_none()
    if obj is None:
        obj = Author(ghost_id=ga["id"])
        sess.add(obj)
    # Re-syncing an author always revives a soft-deleted row.
    obj.deleted_at = None
    # Identity fields keep their previous value when Ghost sends nothing.
    for field in ("slug", "name", "email"):
        setattr(obj, field, ga.get(field) or getattr(obj, field))
    # Profile fields mirror Ghost exactly, including clears to None.
    for field in ("profile_image", "cover_image", "bio", "website",
                  "location", "facebook", "twitter"):
        setattr(obj, field, ga.get(field))
    obj.created_at = _iso(ga.get("created_at")) or obj.created_at or utcnow()
    obj.updated_at = _iso(ga.get("updated_at")) or utcnow()
    await sess.flush()
    return obj
async def _upsert_tag(sess: AsyncSession, gt: Dict[str, Any]) -> Tag:
    """Create or refresh the local Tag row mirroring Ghost tag *gt*.

    Flushes (but does not commit) so the row's primary key is populated.
    """
    existing = await sess.execute(select(Tag).where(Tag.ghost_id == gt["id"]))
    obj = existing.scalar_one_or_none()
    if obj is None:
        obj = Tag(ghost_id=gt["id"])
        sess.add(obj)
    # Re-syncing a tag always revives a soft-deleted row.
    obj.deleted_at = None
    # These keep their previous value when Ghost sends an empty one.
    for field in ("slug", "name", "visibility"):
        setattr(obj, field, gt.get(field) or getattr(obj, field))
    # These mirror Ghost exactly, including clears to None.
    for field in ("description", "feature_image", "meta_title", "meta_description"):
        setattr(obj, field, gt.get(field))
    obj.created_at = _iso(gt.get("created_at")) or obj.created_at or utcnow()
    obj.updated_at = _iso(gt.get("updated_at")) or utcnow()
    await sess.flush()
    return obj
def _apply_ghost_fields(obj: Post, gp: Dict[str, Any], author_map: Dict[str, Author], tag_map: Dict[str, Tag]) -> None:
    """Apply Ghost API fields to a Post ORM object.

    Mutates *obj* in place; the caller is responsible for flushing.
    Fields guarded with ``or obj.x`` retain their previous value when
    Ghost sends an empty one; plain assignments mirror Ghost exactly,
    including clearing a field back to None.

    Fix: primary_author/primary_tag lookups are now symmetric — both
    check membership before indexing (previously a missing primary
    author raised KeyError, and the primary_tag lookup used a
    ``.strip()``-ed key that could never match the raw-keyed maps).
    """
    obj.deleted_at = None  # re-syncing always revives a soft-deleted row
    obj.uuid = gp.get("uuid") or obj.uuid
    obj.slug = gp.get("slug") or obj.slug
    obj.title = gp.get("title") or obj.title
    obj.html = gp.get("html")
    obj.plaintext = gp.get("plaintext")
    obj.mobiledoc = gp.get("mobiledoc")
    obj.lexical = gp.get("lexical")
    obj.feature_image = gp.get("feature_image")
    obj.feature_image_alt = gp.get("feature_image_alt")
    obj.feature_image_caption = gp.get("feature_image_caption")
    obj.excerpt = gp.get("excerpt")
    obj.custom_excerpt = gp.get("custom_excerpt")
    obj.visibility = gp.get("visibility") or obj.visibility
    obj.status = gp.get("status") or obj.status
    obj.featured = bool(gp.get("featured") or False)
    obj.is_page = bool(gp.get("page") or False)
    obj.email_only = bool(gp.get("email_only") or False)
    obj.canonical_url = gp.get("canonical_url")
    obj.meta_title = gp.get("meta_title")
    obj.meta_description = gp.get("meta_description")
    obj.og_image = gp.get("og_image")
    obj.og_title = gp.get("og_title")
    obj.og_description = gp.get("og_description")
    obj.twitter_image = gp.get("twitter_image")
    obj.twitter_title = gp.get("twitter_title")
    obj.twitter_description = gp.get("twitter_description")
    obj.custom_template = gp.get("custom_template")
    obj.reading_time = gp.get("reading_time")
    obj.comment_id = gp.get("comment_id")
    obj.published_at = _iso(gp.get("published_at"))
    obj.updated_at = _iso(gp.get("updated_at")) or obj.updated_at or utcnow()
    obj.created_at = _iso(gp.get("created_at")) or obj.created_at or utcnow()
    # Resolve primary author/tag through the upserted maps; a missing
    # entry yields None instead of raising, for both fields alike.
    pa = gp.get("primary_author")
    obj.primary_author_id = author_map[pa["id"]].id if (pa and pa["id"] in author_map) else None
    pt = gp.get("primary_tag")
    obj.primary_tag_id = tag_map[pt["id"]].id if (pt and pt["id"] in tag_map) else None
async def _resolve_user_id_by_email(email: str) -> Optional[int]:
    """Look up user_id from account service via HTTP (cross-domain safe).

    Returns None when the account service knows no user for *email* or the
    endpoint is unavailable (``required=False`` makes a miss non-fatal).

    Note: the redundant function-local re-import of ``fetch_data`` was
    removed — the module already imports it at the top of the file.
    """
    result = await fetch_data(
        "account", "user-by-email",
        params={"email": email},
        required=False,
    )
    if result and isinstance(result, dict):
        return result.get("user_id")
    return None
async def _upsert_post(sess: AsyncSession, gp: Dict[str, Any], author_map: Dict[str, Author], tag_map: Dict[str, Tag]) -> tuple[Post, str | None]:
    """Upsert a post. Returns (post, old_status) where old_status is None for new rows.

    Args:
        sess: Open async session; rows are flushed but never committed here.
        gp: Raw Ghost post/page payload.
        author_map / tag_map: Ghost-id -> ORM maps that must already contain
            every author/tag referenced by ``gp`` (KeyError otherwise).
    """
    from sqlalchemy.exc import IntegrityError
    res = await sess.execute(select(Post).where(Post.ghost_id == gp["id"]))
    obj = res.scalar_one_or_none()
    # Capture the pre-sync status so callers can detect publish/unpublish
    # transitions and emit the right federation activity.
    old_status = obj.status if obj is not None else None
    if obj is not None:
        _apply_ghost_fields(obj, gp, author_map, tag_map)
        await sess.flush()
    else:
        obj = Post(ghost_id=gp["id"])
        try:
            # Savepoint isolates the insert so a unique-constraint violation
            # does not poison the whole session.
            async with sess.begin_nested():
                sess.add(obj)
                _apply_ghost_fields(obj, gp, author_map, tag_map)
                await sess.flush()
        except IntegrityError:
            # Lost a concurrent-insert race on ghost_id: load the winner's
            # row and update it in place instead.
            res = await sess.execute(select(Post).where(Post.ghost_id == gp["id"]))
            obj = res.scalar_one()
            _apply_ghost_fields(obj, gp, author_map, tag_map)
            await sess.flush()
    # Backfill user_id from primary author email via account service
    if obj.user_id is None and obj.primary_author_id is not None:
        pa_obj = author_map.get(gp.get("primary_author", {}).get("id", ""))
        if pa_obj and pa_obj.email:
            user_id = await _resolve_user_id_by_email(pa_obj.email)
            if user_id:
                obj.user_id = user_id
                await sess.flush()
    # Rebuild post_authors + post_tags with synchronize_session to keep
    # identity map consistent and prevent autoflush IntegrityError.
    old_autoflush = sess.autoflush
    sess.autoflush = False
    try:
        # Delete + re-add post_authors (dedup for Ghost duplicate authors)
        await sess.execute(
            delete(PostAuthor).where(PostAuthor.post_id == obj.id),
            execution_options={"synchronize_session": "fetch"},
        )
        seen_authors: set[int] = set()
        for idx, a in enumerate(gp.get("authors") or []):
            aa = author_map[a["id"]]
            if aa.id not in seen_authors:
                seen_authors.add(aa.id)
                sess.add(PostAuthor(post_id=obj.id, author_id=aa.id, sort_order=idx))
        # Delete + re-add post_tags (dedup similarly)
        await sess.execute(
            delete(PostTag).where(PostTag.post_id == obj.id),
            execution_options={"synchronize_session": "fetch"},
        )
        seen_tags: set[int] = set()
        for idx, t in enumerate(gp.get("tags") or []):
            tt = tag_map[t["id"]]
            if tt.id not in seen_tags:
                seen_tags.add(tt.id)
                sess.add(PostTag(post_id=obj.id, tag_id=tt.id, sort_order=idx))
        await sess.flush()
    finally:
        # Restore whatever autoflush mode the caller had.
        sess.autoflush = old_autoflush
    # Auto-create PageConfig for pages (lives in db_cart, accessed via internal API)
    if obj.is_page:
        await fetch_data(
            "cart", "page-config-ensure",
            params={"container_type": "page", "container_id": obj.id},
            required=False,
        )
    return obj, old_status
def _build_ap_post_data(post, post_url: str, tag_objs: list) -> dict:
"""Build rich AP object_data for a blog post/page."""
parts: list[str] = []
if post.title:
parts.append(f"<p><strong>{html_escape(post.title)}</strong></p>")
body = post.plaintext or post.custom_excerpt or post.excerpt or ""
if body:
for para in body.split("\n\n"):
para = para.strip()
if para:
parts.append(f"<p>{html_escape(para)}</p>")
parts.append(f'<p><a href="{html_escape(post_url)}">Read more \u2192</a></p>')
if tag_objs:
ht_links = []
for t in tag_objs:
clean = t.slug.replace("-", "")
ht_links.append(
f'<a href="{html_escape(post_url)}tag/{t.slug}/" rel="tag">#{clean}</a>'
)
parts.append(f'<p>{" ".join(ht_links)}</p>')
obj: dict = {
"name": post.title or "",
"content": "\n".join(parts),
"url": post_url,
}
attachments: list[dict] = []
seen: set[str] = set()
if post.feature_image:
att: dict = {"type": "Image", "url": post.feature_image}
if post.feature_image_alt:
att["name"] = post.feature_image_alt
attachments.append(att)
seen.add(post.feature_image)
if post.html:
for src in re.findall(r'<img[^>]+src="([^"]+)"', post.html):
if src not in seen and len(attachments) < 4:
attachments.append({"type": "Image", "url": src})
seen.add(src)
if attachments:
obj["attachment"] = attachments
if tag_objs:
obj["tag"] = [
{
"type": "Hashtag",
"href": f"{post_url}tag/{t.slug}/",
"name": f"#{t.slug.replace('-', '')}",
}
for t in tag_objs
]
return obj
# =====================================================
# Ghost API fetch helpers
# =====================================================
async def _fetch_all_from_ghost(endpoint: str) -> list[dict[str, Any]]:
    """Fetch every item of a Ghost Admin collection ("posts" or "pages")."""
    url = (
        f"{GHOST_ADMIN_API_URL}/{endpoint}/"
        "?include=authors,tags&limit=all&formats=html,plaintext,mobiledoc,lexical"
    )
    async with httpx.AsyncClient(timeout=30) as client:
        resp = await client.get(url, headers=_auth_header())
        resp.raise_for_status()
        payload = resp.json()
    # Ghost wraps the list under the collection name; anything that is
    # not "posts" is treated as the "pages" collection.
    return payload.get("posts" if endpoint == "posts" else "pages", [])
async def fetch_all_posts_and_pages_from_ghost() -> list[dict[str, Any]]:
    """Fetch all posts and pages concurrently; pages get a "page": True marker."""
    posts, pages = await asyncio.gather(
        _fetch_all_from_ghost("posts"),
        _fetch_all_from_ghost("pages"),
    )
    for page in pages:
        page["page"] = True
    return [*posts, *pages]
async def sync_all_content_from_ghost(sess: AsyncSession) -> None:
    """Bulk sync all Ghost content (posts + pages) into db_blog.

    Upserts authors, then tags, then posts, and finally soft-deletes any
    local row whose Ghost counterpart no longer exists.
    """
    data = await fetch_all_posts_and_pages_from_ghost()
    # Collect every referenced author/tag payload once, keyed by Ghost id.
    author_bucket: Dict[str, dict[str, Any]] = {}
    tag_bucket: Dict[str, dict[str, Any]] = {}
    for gp in data:
        for ga in gp.get("authors") or []:
            author_bucket[ga["id"]] = ga
        primary_author = gp.get("primary_author")
        if primary_author:
            author_bucket[primary_author["id"]] = primary_author
        for gt in gp.get("tags") or []:
            tag_bucket[gt["id"]] = gt
        primary_tag = gp.get("primary_tag")
        if primary_tag:
            tag_bucket[primary_tag["id"]] = primary_tag
    # Authors and tags must exist (and be flushed) before posts reference them.
    author_map: Dict[str, Author] = {}
    for ghost_id, ga in author_bucket.items():
        author_map[ghost_id] = await _upsert_author(sess, ga)
    tag_map: Dict[str, Tag] = {}
    for ghost_id, gt in tag_bucket.items():
        tag_map[ghost_id] = await _upsert_tag(sess, gt)
    for gp in data:
        await _upsert_post(sess, gp, author_map, tag_map)
    # Soft-delete local rows whose Ghost counterpart has vanished.
    now = utcnow()
    survivors = (
        (Author, set(author_bucket)),
        (Tag, set(tag_bucket)),
        (Post, {gp["id"] for gp in data}),
    )
    for model, live_ids in survivors:
        rows = await sess.execute(select(model))
        for row in rows.scalars():
            if row.ghost_id not in live_ids and row.deleted_at is None:
                row.deleted_at = now
# =====================================================
# Single-item content helpers (posts/authors/tags)
# =====================================================
async def _fetch_single_from_ghost(path: str, key: str) -> Optional[dict[str, Any]]:
    """GET a single resource from the Ghost Admin API.

    *path* is the resource path relative to the API root; *key* is the
    collection key Ghost wraps the result in. Returns the first item,
    or None when Ghost answers 404 or with an empty list.

    (Deduplicates four previously copy-pasted fetchers.)
    """
    async with httpx.AsyncClient(timeout=30) as client:
        resp = await client.get(
            f"{GHOST_ADMIN_API_URL}/{path}", headers=_auth_header()
        )
        if resp.status_code == 404:
            return None
        resp.raise_for_status()
        items = resp.json().get(key) or []
        return items[0] if items else None


# Query string shared by single post/page fetches.
_SINGLE_FORMATS = "?include=authors,tags&formats=html,plaintext,mobiledoc,lexical"


async def fetch_single_post_from_ghost(ghost_id: str) -> Optional[dict[str, Any]]:
    """Fetch one post by Ghost id; None if it no longer exists."""
    return await _fetch_single_from_ghost(f"posts/{ghost_id}/{_SINGLE_FORMATS}", "posts")


async def fetch_single_page_from_ghost(ghost_id: str) -> Optional[dict[str, Any]]:
    """Fetch one page by Ghost id; None if it no longer exists."""
    return await _fetch_single_from_ghost(f"pages/{ghost_id}/{_SINGLE_FORMATS}", "pages")


async def fetch_single_author_from_ghost(ghost_id: str) -> Optional[dict[str, Any]]:
    """Fetch one author (Ghost "user") by Ghost id; None if gone."""
    return await _fetch_single_from_ghost(f"users/{ghost_id}/", "users")


async def fetch_single_tag_from_ghost(ghost_id: str) -> Optional[dict[str, Any]]:
    """Fetch one tag by Ghost id; None if gone."""
    return await _fetch_single_from_ghost(f"tags/{ghost_id}/", "tags")
async def _soft_delete_missing_post(sess: AsyncSession, ghost_id: str) -> None:
    """Soft-delete the local Post row for *ghost_id*, if present and live."""
    res = await sess.execute(select(Post).where(Post.ghost_id == ghost_id))
    obj = res.scalar_one_or_none()
    if obj is not None and obj.deleted_at is None:
        obj.deleted_at = utcnow()


async def _upsert_related_maps(
    sess: AsyncSession, gp: Dict[str, Any]
) -> tuple[Dict[str, Author], Dict[str, Tag]]:
    """Upsert every author/tag referenced by *gp*; return (author_map, tag_map)."""
    author_map: Dict[str, Author] = {}
    tag_map: Dict[str, Tag] = {}
    for a in gp.get("authors") or []:
        author_map[a["id"]] = await _upsert_author(sess, a)
    if gp.get("primary_author"):
        pa = gp["primary_author"]
        author_map[pa["id"]] = await _upsert_author(sess, pa)
    for t in gp.get("tags") or []:
        tag_map[t["id"]] = await _upsert_tag(sess, t)
    if gp.get("primary_tag"):
        pt = gp["primary_tag"]
        tag_map[pt["id"]] = await _upsert_tag(sess, pt)
    return author_map, tag_map


async def _publish_post_activity(
    sess: AsyncSession,
    post: Post,
    old_status: str | None,
    gp: Dict[str, Any],
    tag_map: Dict[str, Tag],
) -> None:
    """Emit the federation activity for a post's status transition.

    published        -> Create (newly published) or Update (re-published edit)
    published -> not -> Delete with a Tombstone object
    Caller guarantees post.user_id is set.
    """
    # Local imports match the original code's deferred-import style.
    from shared.services.federation_publish import try_publish
    from shared.infrastructure.urls import app_url
    post_url = app_url("blog", f"/{post.slug}/")
    post_tags = [tag_map[t["id"]] for t in (gp.get("tags") or []) if t["id"] in tag_map]
    if post.status == "published":
        activity_type = "Create" if old_status != "published" else "Update"
        await try_publish(
            sess,
            user_id=post.user_id,
            activity_type=activity_type,
            object_type="Note",
            object_data=_build_ap_post_data(post, post_url, post_tags),
            source_type="Post",
            source_id=post.id,
        )
    elif old_status == "published" and post.status != "published":
        await try_publish(
            sess,
            user_id=post.user_id,
            activity_type="Delete",
            object_type="Tombstone",
            object_data={
                "id": post_url,
                "formerType": "Note",
            },
            source_type="Post",
            source_id=post.id,
        )


async def sync_single_post(sess: AsyncSession, ghost_id: str) -> None:
    """Sync one Ghost post into db_blog and publish to federation.

    A 404 from Ghost soft-deletes the local row. Pages are never
    federated here (see sync_single_page).
    """
    gp = await fetch_single_post_from_ghost(ghost_id)
    if gp is None:
        await _soft_delete_missing_post(sess, ghost_id)
        return
    author_map, tag_map = await _upsert_related_maps(sess, gp)
    post, old_status = await _upsert_post(sess, gp, author_map, tag_map)
    if not post.is_page and post.user_id:
        await _publish_post_activity(sess, post, old_status, gp, tag_map)


async def sync_single_page(sess: AsyncSession, ghost_id: str) -> None:
    """Sync one Ghost page into db_blog and publish to federation.

    A 404 from Ghost soft-deletes the local row. The payload is marked
    with "page": True so _upsert_post stores it as a page.
    """
    gp = await fetch_single_page_from_ghost(ghost_id)
    if gp is None:
        await _soft_delete_missing_post(sess, ghost_id)
        return
    gp["page"] = True
    author_map, tag_map = await _upsert_related_maps(sess, gp)
    post, old_status = await _upsert_post(sess, gp, author_map, tag_map)
    if post.user_id:
        await _publish_post_activity(sess, post, old_status, gp, tag_map)
async def sync_single_author(sess: AsyncSession, ghost_id: str) -> None:
    """Sync one author from Ghost; soft-delete locally if gone upstream."""
    ga = await fetch_single_author_from_ghost(ghost_id)
    if ga is not None:
        await _upsert_author(sess, ga)
        return
    # Ghost no longer knows this author: mark the local row deleted.
    res = await sess.execute(select(Author).where(Author.ghost_id == ghost_id))
    local = res.scalar_one_or_none()
    if local and local.deleted_at is None:
        local.deleted_at = utcnow()
async def sync_single_tag(sess: AsyncSession, ghost_id: str) -> None:
    """Sync one tag from Ghost; soft-delete locally if gone upstream."""
    gt = await fetch_single_tag_from_ghost(ghost_id)
    if gt is not None:
        await _upsert_tag(sess, gt)
        return
    # Ghost no longer knows this tag: mark the local row deleted.
    res = await sess.execute(select(Tag).where(Tag.ghost_id == ghost_id))
    local = res.scalar_one_or_none()
    if local and local.deleted_at is None:
        local.deleted_at = utcnow()
# Public API. Fix: sync_single_page, fetch_single_page_from_ghost and
# fetch_all_posts_and_pages_from_ghost are public entry points defined in
# this module but were missing from __all__ while their siblings were listed.
__all__ = [
    # bulk content
    "sync_all_content_from_ghost",
    "fetch_all_posts_and_pages_from_ghost",
    # single fetch
    "fetch_single_post_from_ghost",
    "fetch_single_page_from_ghost",
    "fetch_single_author_from_ghost",
    "fetch_single_tag_from_ghost",
    # single sync
    "sync_single_post",
    "sync_single_page",
    "sync_single_author",
    "sync_single_tag",
]