Phase 0+1: native post writes, Ghost no longer write-primary
All checks were successful
Build and Deploy / build-and-deploy (push) Successful in 1m50s

- Final sync script with HTML verification + author→user migration
- Make ghost_id nullable on posts/authors/tags, add UUID/timestamp defaults
- Add user profile fields (bio, slug, profile_image, etc.) to User model
- New PostUser M2M table (replaces post_authors for new posts)
- PostWriter service: direct DB CRUD with Lexical rendering, optimistic
  locking, AP federation, tag upsert
- Rewrite create/edit/settings routes to use PostWriter (no Ghost API calls)
- Neuter Ghost webhooks (post/page/author/tag → 204 no-op)
- Disable Ghost startup sync

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-03-01 12:33:37 +00:00
parent e8bc228c7f
commit 0f9af31ffe
10 changed files with 1203 additions and 211 deletions

View File

@@ -15,6 +15,43 @@ from shared.browser.app.utils.htmx import is_htmx_request
from shared.sx.helpers import sx_response
from shared.utils import host_url
def _post_to_edit_dict(post) -> dict:
"""Convert an ORM Post to a dict matching the shape templates expect.
The templates were written for Ghost Admin API responses, so we mimic
that structure (dot-access on dicts via Jinja) from ORM columns.
"""
d: dict = {}
for col in (
"id", "slug", "title", "html", "plaintext", "lexical", "mobiledoc",
"feature_image", "feature_image_alt", "feature_image_caption",
"excerpt", "custom_excerpt", "visibility", "status", "featured",
"is_page", "email_only", "canonical_url",
"meta_title", "meta_description",
"og_image", "og_title", "og_description",
"twitter_image", "twitter_title", "twitter_description",
"custom_template", "reading_time", "comment_id",
):
d[col] = getattr(post, col, None)
# Timestamps as ISO strings (templates do [:16] slicing)
for ts in ("published_at", "updated_at", "created_at"):
val = getattr(post, ts, None)
d[ts] = val.isoformat() if val else ""
# Tags as list of dicts with .name (for Jinja map(attribute='name'))
if hasattr(post, "tags") and post.tags:
d["tags"] = [{"name": t.name, "slug": t.slug, "id": t.id} for t in post.tags]
else:
d["tags"] = []
# email/newsletter — not available without Ghost, set safe defaults
d["email"] = None
d["newsletter"] = None
return d
def register():
bp = Blueprint("admin", __name__, url_prefix='/admin')
@@ -341,11 +378,18 @@ def register():
@bp.get("/settings/")
@require_post_author
async def settings(slug: str):
from ...blog.ghost.ghost_posts import get_post_for_edit
from models.ghost_content import Post
from sqlalchemy import select as sa_select
from sqlalchemy.orm import selectinload
ghost_id = g.post_data["post"]["ghost_id"]
is_page = bool(g.post_data["post"].get("is_page"))
ghost_post = await get_post_for_edit(ghost_id, is_page=is_page)
post_id = g.post_data["post"]["id"]
post = (await g.s.execute(
sa_select(Post)
.where(Post.id == post_id)
.options(selectinload(Post.tags))
)).scalar_one_or_none()
ghost_post = _post_to_edit_dict(post) if post else {}
save_success = request.args.get("saved") == "1"
from shared.sx.page import get_template_context
@@ -368,12 +412,10 @@ def register():
@bp.post("/settings/")
@require_post_author
async def settings_save(slug: str):
from ...blog.ghost.ghost_posts import update_post_settings
from ...blog.ghost.ghost_sync import sync_single_post, sync_single_page
from services.post_writer import update_post_settings, OptimisticLockError
from shared.browser.app.redis_cacher import invalidate_tag_cache
ghost_id = g.post_data["post"]["ghost_id"]
is_page = bool(g.post_data["post"].get("is_page"))
post_id = g.post_data["post"]["id"]
form = await request.form
updated_at = form.get("updated_at", "")
@@ -406,49 +448,55 @@ def register():
kwargs["featured"] = form.get("featured") == "on"
kwargs["email_only"] = form.get("email_only") == "on"
# Tags — comma-separated string → list of {"name": "..."} dicts
# Tags — comma-separated string → list of names
tags_str = form.get("tags", "").strip()
if tags_str:
kwargs["tags"] = [{"name": t.strip()} for t in tags_str.split(",") if t.strip()]
else:
kwargs["tags"] = []
tag_names = [t.strip() for t in tags_str.split(",") if t.strip()] if tags_str else []
# Update in Ghost
await update_post_settings(
ghost_id=ghost_id,
updated_at=updated_at,
is_page=is_page,
**kwargs,
)
try:
post = await update_post_settings(
g.s,
post_id=post_id,
expected_updated_at=updated_at,
tag_names=tag_names,
**kwargs,
)
except OptimisticLockError:
from urllib.parse import quote
return redirect(
host_url(url_for("blog.post.admin.settings", slug=slug))
+ "?error=" + quote("Someone else edited this post. Please reload and try again.")
)
# Sync to local DB
if is_page:
await sync_single_page(g.s, ghost_id)
else:
await sync_single_post(g.s, ghost_id)
await g.s.flush()
# Clear caches
await invalidate_tag_cache("blog")
await invalidate_tag_cache("post.post_detail")
return redirect(host_url(url_for("blog.post.admin.settings", slug=slug)) + "?saved=1")
# Redirect using the (possibly new) slug
return redirect(host_url(url_for("blog.post.admin.settings", slug=post.slug)) + "?saved=1")
@bp.get("/edit/")
@require_post_author
async def edit(slug: str):
from ...blog.ghost.ghost_posts import get_post_for_edit
from models.ghost_content import Post
from sqlalchemy import select as sa_select
from sqlalchemy.orm import selectinload
from shared.infrastructure.data_client import fetch_data
ghost_id = g.post_data["post"]["ghost_id"]
is_page = bool(g.post_data["post"].get("is_page"))
ghost_post = await get_post_for_edit(ghost_id, is_page=is_page)
post_id = g.post_data["post"]["id"]
post = (await g.s.execute(
sa_select(Post)
.where(Post.id == post_id)
.options(selectinload(Post.tags))
)).scalar_one_or_none()
ghost_post = _post_to_edit_dict(post) if post else {}
save_success = request.args.get("saved") == "1"
save_error = request.args.get("error", "")
# Newsletters live in db_account — fetch via HTTP
raw_newsletters = await fetch_data("account", "newsletters", required=False) or []
# Convert dicts to objects with .name/.ghost_id attributes for template compat
from types import SimpleNamespace
newsletters = [SimpleNamespace(**nl) for nl in raw_newsletters]
@@ -475,20 +523,16 @@ def register():
@require_post_author
async def edit_save(slug: str):
import json
from ...blog.ghost.ghost_posts import update_post
from ...blog.ghost.lexical_validator import validate_lexical
from ...blog.ghost.ghost_sync import sync_single_post, sync_single_page
from services.post_writer import update_post as writer_update, OptimisticLockError
from shared.browser.app.redis_cacher import invalidate_tag_cache
ghost_id = g.post_data["post"]["ghost_id"]
is_page = bool(g.post_data["post"].get("is_page"))
post_id = g.post_data["post"]["id"]
form = await request.form
title = form.get("title", "").strip()
lexical_raw = form.get("lexical", "")
updated_at = form.get("updated_at", "")
status = form.get("status", "draft")
publish_mode = form.get("publish_mode", "web")
newsletter_slug = form.get("newsletter_slug", "").strip() or None
feature_image = form.get("feature_image", "").strip()
custom_excerpt = form.get("custom_excerpt", "").strip()
feature_image_caption = form.get("feature_image_caption", "").strip()
@@ -504,76 +548,51 @@ def register():
if not ok:
return redirect(host_url(url_for("blog.post.admin.edit", slug=slug)) + "?error=" + quote(reason))
# Update in Ghost (content save — no status change yet)
ghost_post = await update_post(
ghost_id=ghost_id,
lexical_json=lexical_raw,
title=title or None,
updated_at=updated_at,
feature_image=feature_image,
custom_excerpt=custom_excerpt,
feature_image_caption=feature_image_caption,
is_page=is_page,
)
# Publish workflow
is_admin = bool((g.get("rights") or {}).get("admin"))
publish_requested_msg = None
# Guard: if already emailed, force publish_mode to "web" to prevent re-send
already_emailed = bool(ghost_post.get("email") and ghost_post["email"].get("status"))
if already_emailed and publish_mode in ("email", "both"):
publish_mode = "web"
# Determine effective status
effective_status: str | None = None
current_status = g.post_data["post"].get("status", "draft")
if status == "published" and ghost_post.get("status") != "published" and not is_admin:
# Non-admin requesting publish: don't send status to Ghost, set local flag
if status == "published" and current_status != "published" and not is_admin:
# Non-admin requesting publish: keep as draft, set local flag
publish_requested_msg = "Publish requested — an admin will review."
elif status and status != ghost_post.get("status"):
# Status is changing — determine email params based on publish_mode
email_kwargs: dict = {}
if status == "published" and publish_mode in ("email", "both") and newsletter_slug:
email_kwargs["newsletter_slug"] = newsletter_slug
email_kwargs["email_segment"] = "all"
if publish_mode == "email":
email_kwargs["email_only"] = True
elif status and status != current_status:
effective_status = status
from ...blog.ghost.ghost_posts import update_post as _up
ghost_post = await _up(
ghost_id=ghost_id,
try:
post = await writer_update(
g.s,
post_id=post_id,
lexical_json=lexical_raw,
title=None,
updated_at=ghost_post["updated_at"],
status=status,
is_page=is_page,
**email_kwargs,
title=title or None,
expected_updated_at=updated_at,
feature_image=feature_image or None,
custom_excerpt=custom_excerpt or None,
feature_image_caption=feature_image_caption or None,
status=effective_status,
)
except OptimisticLockError:
return redirect(
host_url(url_for("blog.post.admin.edit", slug=slug))
+ "?error=" + quote("Someone else edited this post. Please reload and try again.")
)
# Sync to local DB
if is_page:
await sync_single_page(g.s, ghost_id)
else:
await sync_single_post(g.s, ghost_id)
# Handle publish_requested flag
if publish_requested_msg:
post.publish_requested = True
elif status == "published" and is_admin:
post.publish_requested = False
await g.s.flush()
# Handle publish_requested flag on the local post
from models.ghost_content import Post
from sqlalchemy import select as sa_select
local_post = (await g.s.execute(
sa_select(Post).where(Post.ghost_id == ghost_id)
)).scalar_one_or_none()
if local_post:
if publish_requested_msg:
local_post.publish_requested = True
elif status == "published" and is_admin:
local_post.publish_requested = False
await g.s.flush()
# Clear caches
await invalidate_tag_cache("blog")
await invalidate_tag_cache("post.post_detail")
# Redirect to GET to avoid resubmit warning on refresh (PRG pattern)
redirect_url = host_url(url_for("blog.post.admin.edit", slug=slug)) + "?saved=1"
# Redirect to GET (PRG pattern) — use post.slug in case it changed
redirect_url = host_url(url_for("blog.post.admin.edit", slug=post.slug)) + "?saved=1"
if publish_requested_msg:
redirect_url += "&publish_requested=1"
return redirect(redirect_url)