From 0f9af31ffe7822791e81b2b68c9fde16540b9e25 Mon Sep 17 00:00:00 2001 From: giles Date: Sun, 1 Mar 2026 12:33:37 +0000 Subject: [PATCH] Phase 0+1: native post writes, Ghost no longer write-primary MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Final sync script with HTML verification + author→user migration - Make ghost_id nullable on posts/authors/tags, add UUID/timestamp defaults - Add user profile fields (bio, slug, profile_image, etc.) to User model - New PostUser M2M table (replaces post_authors for new posts) - PostWriter service: direct DB CRUD with Lexical rendering, optimistic locking, AP federation, tag upsert - Rewrite create/edit/settings routes to use PostWriter (no Ghost API calls) - Neuter Ghost webhooks (post/page/author/tag → 204 no-op) - Disable Ghost startup sync Co-Authored-By: Claude Opus 4.6 --- .../versions/0003_add_user_profile_fields.py | 43 ++ blog/alembic/env.py | 1 + .../0004_ghost_id_nullable_and_defaults.py | 67 +++ blog/bp/blog/routes.py | 81 +-- blog/bp/blog/web_hooks/routes.py | 58 +-- blog/bp/post/admin/routes.py | 201 ++++---- blog/scripts/final_ghost_sync.py | 469 ++++++++++++++++++ blog/services/post_writer.py | 457 +++++++++++++++++ shared/models/ghost_content.py | 26 +- shared/models/user.py | 11 + 10 files changed, 1203 insertions(+), 211 deletions(-) create mode 100644 account/alembic/versions/0003_add_user_profile_fields.py create mode 100644 blog/alembic/versions/0004_ghost_id_nullable_and_defaults.py create mode 100644 blog/scripts/final_ghost_sync.py create mode 100644 blog/services/post_writer.py diff --git a/account/alembic/versions/0003_add_user_profile_fields.py b/account/alembic/versions/0003_add_user_profile_fields.py new file mode 100644 index 0000000..36c88d8 --- /dev/null +++ b/account/alembic/versions/0003_add_user_profile_fields.py @@ -0,0 +1,43 @@ +"""Add author profile fields to users table. 
+ +Merges Ghost Author profile data into User — bio, profile_image, cover_image, +website, location, facebook, twitter, slug, is_admin. + +Revision ID: 0003 +Revises: 0002_hash_oauth_tokens +""" +from alembic import op +import sqlalchemy as sa + +revision = "0003" +down_revision = "0002_hash_oauth_tokens" +branch_labels = None +depends_on = None + + +def upgrade(): + op.add_column("users", sa.Column("slug", sa.String(191), nullable=True)) + op.add_column("users", sa.Column("bio", sa.Text(), nullable=True)) + op.add_column("users", sa.Column("profile_image", sa.Text(), nullable=True)) + op.add_column("users", sa.Column("cover_image", sa.Text(), nullable=True)) + op.add_column("users", sa.Column("website", sa.Text(), nullable=True)) + op.add_column("users", sa.Column("location", sa.Text(), nullable=True)) + op.add_column("users", sa.Column("facebook", sa.Text(), nullable=True)) + op.add_column("users", sa.Column("twitter", sa.Text(), nullable=True)) + op.add_column("users", sa.Column( + "is_admin", sa.Boolean(), nullable=False, server_default=sa.text("false"), + )) + op.create_index("ix_users_slug", "users", ["slug"], unique=True) + + +def downgrade(): + op.drop_index("ix_users_slug") + op.drop_column("users", "is_admin") + op.drop_column("users", "twitter") + op.drop_column("users", "facebook") + op.drop_column("users", "location") + op.drop_column("users", "website") + op.drop_column("users", "cover_image") + op.drop_column("users", "profile_image") + op.drop_column("users", "bio") + op.drop_column("users", "slug") diff --git a/blog/alembic/env.py b/blog/alembic/env.py index bcdac6c..1c280e3 100644 --- a/blog/alembic/env.py +++ b/blog/alembic/env.py @@ -13,6 +13,7 @@ MODELS = [ TABLES = frozenset({ "posts", "authors", "post_authors", "tags", "post_tags", + "post_users", "snippets", "tag_groups", "tag_group_tags", "menu_items", "menu_nodes", "kv", "page_configs", diff --git a/blog/alembic/versions/0004_ghost_id_nullable_and_defaults.py 
b/blog/alembic/versions/0004_ghost_id_nullable_and_defaults.py new file mode 100644 index 0000000..bc268cf --- /dev/null +++ b/blog/alembic/versions/0004_ghost_id_nullable_and_defaults.py @@ -0,0 +1,67 @@ +"""Make ghost_id nullable, add defaults, create post_users M2M table. + +Revision ID: 0004 +Revises: 0003_add_page_configs +""" +from alembic import op +import sqlalchemy as sa + +revision = "0004" +down_revision = "0003_add_page_configs" +branch_labels = None +depends_on = None + + +def upgrade(): + # Make ghost_id nullable + op.alter_column("posts", "ghost_id", existing_type=sa.String(64), nullable=True) + op.alter_column("authors", "ghost_id", existing_type=sa.String(64), nullable=True) + op.alter_column("tags", "ghost_id", existing_type=sa.String(64), nullable=True) + + # Add server defaults for Post + op.alter_column( + "posts", "uuid", + existing_type=sa.String(64), + server_default=sa.text("gen_random_uuid()"), + ) + op.alter_column( + "posts", "updated_at", + existing_type=sa.DateTime(timezone=True), + server_default=sa.text("now()"), + ) + op.alter_column( + "posts", "created_at", + existing_type=sa.DateTime(timezone=True), + server_default=sa.text("now()"), + ) + + # Create post_users M2M table (replaces post_authors for new posts) + op.create_table( + "post_users", + sa.Column("post_id", sa.Integer, sa.ForeignKey("posts.id", ondelete="CASCADE"), primary_key=True), + sa.Column("user_id", sa.Integer, primary_key=True), + sa.Column("sort_order", sa.Integer, nullable=False, server_default="0"), + ) + op.create_index("ix_post_users_user_id", "post_users", ["user_id"]) + + # Backfill post_users from post_authors for posts that already have user_id. + # This maps each post's authors to the post's user_id (primary author). + # Multi-author mapping requires the full sync script. 
+ op.execute(""" + INSERT INTO post_users (post_id, user_id, sort_order) + SELECT p.id, p.user_id, 0 + FROM posts p + WHERE p.user_id IS NOT NULL + AND p.deleted_at IS NULL + ON CONFLICT DO NOTHING + """) + + +def downgrade(): + op.drop_table("post_users") + op.alter_column("posts", "created_at", existing_type=sa.DateTime(timezone=True), server_default=None) + op.alter_column("posts", "updated_at", existing_type=sa.DateTime(timezone=True), server_default=None) + op.alter_column("posts", "uuid", existing_type=sa.String(64), server_default=None) + op.alter_column("tags", "ghost_id", existing_type=sa.String(64), nullable=False) + op.alter_column("authors", "ghost_id", existing_type=sa.String(64), nullable=False) + op.alter_column("posts", "ghost_id", existing_type=sa.String(64), nullable=False) diff --git a/blog/bp/blog/routes.py b/blog/bp/blog/routes.py index e860450..32f6424 100644 --- a/blog/bp/blog/routes.py +++ b/blog/bp/blog/routes.py @@ -14,7 +14,6 @@ from quart import ( url_for, ) from .ghost_db import DBClient # adjust import path -from shared.db.session import get_session from .filters.qs import makeqs_factory, decode from .services.posts_data import posts_data from .services.pages_data import pages_data @@ -47,33 +46,9 @@ def register(url_prefix, title): @blogs_bp.before_app_serving async def init(): - from .ghost.ghost_sync import sync_all_content_from_ghost - from sqlalchemy import text - import logging - logger = logging.getLogger(__name__) - - # Advisory lock prevents multiple Hypercorn workers from - # running the sync concurrently (which causes PK conflicts). 
- async with get_session() as s: - got_lock = await s.scalar(text("SELECT pg_try_advisory_lock(900001)")) - if not got_lock: - await s.rollback() # clean up before returning connection to pool - return - try: - await sync_all_content_from_ghost(s) - await s.commit() - except Exception: - logger.exception("Ghost sync failed — will retry on next deploy") - try: - await s.rollback() - except Exception: - pass - finally: - try: - await s.execute(text("SELECT pg_advisory_unlock(900001)")) - await s.commit() - except Exception: - pass # lock auto-releases when session closes + # Ghost startup sync disabled (Phase 1) — blog service owns content + # directly. The final_ghost_sync.py script was run before cutover. + pass @blogs_bp.before_request def route(): @@ -258,9 +233,8 @@ def register(url_prefix, title): @blogs_bp.post("/new/") @require_admin async def new_post_save(): - from .ghost.ghost_posts import create_post from .ghost.lexical_validator import validate_lexical - from .ghost.ghost_sync import sync_single_post + from services.post_writer import create_post as writer_create form = await request.form title = form.get("title", "").strip() or "Untitled" @@ -290,35 +264,24 @@ def register(url_prefix, title): html = await render_new_post_page(tctx) return await make_response(html, 400) - # Create in Ghost - ghost_post = await create_post( + # Create directly in db_blog + post = await writer_create( + g.s, title=title, lexical_json=lexical_raw, status=status, + user_id=g.user.id, feature_image=feature_image or None, custom_excerpt=custom_excerpt or None, feature_image_caption=feature_image_caption or None, ) - - # Sync to local DB - await sync_single_post(g.s, ghost_post["id"]) await g.s.flush() - # Set user_id on the newly created post - from models.ghost_content import Post - from sqlalchemy import select - local_post = (await g.s.execute( - select(Post).where(Post.ghost_id == ghost_post["id"]) - )).scalar_one_or_none() - if local_post and local_post.user_id is None: - 
local_post.user_id = g.user.id - await g.s.flush() - # Clear blog listing cache await invalidate_tag_cache("blog") - # Redirect to the edit page (post is likely a draft, so public detail would 404) - return redirect(host_url(url_for("blog.post.admin.edit", slug=ghost_post["slug"]))) + # Redirect to the edit page + return redirect(host_url(url_for("blog.post.admin.edit", slug=post.slug))) @blogs_bp.get("/new-page/") @@ -340,9 +303,8 @@ def register(url_prefix, title): @blogs_bp.post("/new-page/") @require_admin async def new_page_save(): - from .ghost.ghost_posts import create_page from .ghost.lexical_validator import validate_lexical - from .ghost.ghost_sync import sync_single_page + from services.post_writer import create_page as writer_create_page form = await request.form title = form.get("title", "").strip() or "Untitled" @@ -374,35 +336,24 @@ def register(url_prefix, title): html = await render_new_post_page(tctx) return await make_response(html, 400) - # Create in Ghost (as page) - ghost_page = await create_page( + # Create directly in db_blog + page = await writer_create_page( + g.s, title=title, lexical_json=lexical_raw, status=status, + user_id=g.user.id, feature_image=feature_image or None, custom_excerpt=custom_excerpt or None, feature_image_caption=feature_image_caption or None, ) - - # Sync to local DB (uses pages endpoint) - await sync_single_page(g.s, ghost_page["id"]) await g.s.flush() - # Set user_id on the newly created page - from models.ghost_content import Post - from sqlalchemy import select - local_post = (await g.s.execute( - select(Post).where(Post.ghost_id == ghost_page["id"]) - )).scalar_one_or_none() - if local_post and local_post.user_id is None: - local_post.user_id = g.user.id - await g.s.flush() - # Clear blog listing cache await invalidate_tag_cache("blog") # Redirect to the page admin - return redirect(host_url(url_for("blog.post.admin.edit", slug=ghost_page["slug"]))) + return redirect(host_url(url_for("blog.post.admin.edit", 
slug=page.slug))) @blogs_bp.get("/drafts/") diff --git a/blog/bp/blog/web_hooks/routes.py b/blog/bp/blog/web_hooks/routes.py index 20a680b..3e07e76 100644 --- a/blog/bp/blog/web_hooks/routes.py +++ b/blog/bp/blog/web_hooks/routes.py @@ -1,15 +1,11 @@ -# suma_browser/webhooks.py +# Ghost webhooks — neutered (Phase 1). +# +# Post/page/author/tag handlers return 204 no-op. +# Member webhook remains active (membership sync handled by account service). from __future__ import annotations import os -from quart import Blueprint, request, abort, Response, g +from quart import Blueprint, request, abort, Response -from ..ghost.ghost_sync import ( - sync_single_page, - sync_single_post, - sync_single_author, - sync_single_tag, -) -from shared.browser.app.redis_cacher import clear_cache from shared.browser.app.csrf import csrf_exempt ghost_webhooks = Blueprint("ghost_webhooks", __name__, url_prefix="/__ghost-webhook") @@ -32,6 +28,7 @@ def _extract_id(data: dict, key: str) -> str | None: @csrf_exempt @ghost_webhooks.route("/member/", methods=["POST"]) async def webhook_member() -> Response: + """Member webhook still active — delegates to account service.""" _check_secret(request) data = await request.get_json(force=True, silent=True) or {} @@ -39,7 +36,6 @@ async def webhook_member() -> Response: if not ghost_id: abort(400, "no member id") - # Delegate to account service (membership data lives in db_account) from shared.infrastructure.actions import call_action try: await call_action( @@ -52,61 +48,25 @@ async def webhook_member() -> Response: logging.getLogger(__name__).error("Member sync via account failed: %s", e) return Response(status=204) + +# --- Neutered handlers: Ghost no longer writes content --- + @csrf_exempt @ghost_webhooks.post("/post/") -@clear_cache(tag='blog') async def webhook_post() -> Response: - _check_secret(request) - - data = await request.get_json(force=True, silent=True) or {} - ghost_id = _extract_id(data, "post") - if not ghost_id: - abort(400, "no 
post id") - - await sync_single_post(g.s, ghost_id) - return Response(status=204) @csrf_exempt @ghost_webhooks.post("/page/") -@clear_cache(tag='blog') async def webhook_page() -> Response: - _check_secret(request) - - data = await request.get_json(force=True, silent=True) or {} - ghost_id = _extract_id(data, "page") - if not ghost_id: - abort(400, "no page id") - - await sync_single_page(g.s, ghost_id) - return Response(status=204) @csrf_exempt @ghost_webhooks.post("/author/") -@clear_cache(tag='blog') async def webhook_author() -> Response: - _check_secret(request) - - data = await request.get_json(force=True, silent=True) or {} - ghost_id = _extract_id(data, "user") or _extract_id(data, "author") - if not ghost_id: - abort(400, "no author id") - - await sync_single_author(g.s, ghost_id) - return Response(status=204) @csrf_exempt @ghost_webhooks.post("/tag/") -@clear_cache(tag='blog') async def webhook_tag() -> Response: - _check_secret(request) - - data = await request.get_json(force=True, silent=True) or {} - ghost_id = _extract_id(data, "tag") - if not ghost_id: - abort(400, "no tag id") - - await sync_single_tag(g.s, ghost_id) return Response(status=204) diff --git a/blog/bp/post/admin/routes.py b/blog/bp/post/admin/routes.py index 0ff50fe..c8f7d22 100644 --- a/blog/bp/post/admin/routes.py +++ b/blog/bp/post/admin/routes.py @@ -15,6 +15,43 @@ from shared.browser.app.utils.htmx import is_htmx_request from shared.sx.helpers import sx_response from shared.utils import host_url +def _post_to_edit_dict(post) -> dict: + """Convert an ORM Post to a dict matching the shape templates expect. + + The templates were written for Ghost Admin API responses, so we mimic + that structure (dot-access on dicts via Jinja) from ORM columns. 
+ """ + d: dict = {} + for col in ( + "id", "slug", "title", "html", "plaintext", "lexical", "mobiledoc", + "feature_image", "feature_image_alt", "feature_image_caption", + "excerpt", "custom_excerpt", "visibility", "status", "featured", + "is_page", "email_only", "canonical_url", + "meta_title", "meta_description", + "og_image", "og_title", "og_description", + "twitter_image", "twitter_title", "twitter_description", + "custom_template", "reading_time", "comment_id", + ): + d[col] = getattr(post, col, None) + + # Timestamps as ISO strings (templates do [:16] slicing) + for ts in ("published_at", "updated_at", "created_at"): + val = getattr(post, ts, None) + d[ts] = val.isoformat() if val else "" + + # Tags as list of dicts with .name (for Jinja map(attribute='name')) + if hasattr(post, "tags") and post.tags: + d["tags"] = [{"name": t.name, "slug": t.slug, "id": t.id} for t in post.tags] + else: + d["tags"] = [] + + # email/newsletter — not available without Ghost, set safe defaults + d["email"] = None + d["newsletter"] = None + + return d + + def register(): bp = Blueprint("admin", __name__, url_prefix='/admin') @@ -341,11 +378,18 @@ def register(): @bp.get("/settings/") @require_post_author async def settings(slug: str): - from ...blog.ghost.ghost_posts import get_post_for_edit + from models.ghost_content import Post + from sqlalchemy import select as sa_select + from sqlalchemy.orm import selectinload - ghost_id = g.post_data["post"]["ghost_id"] - is_page = bool(g.post_data["post"].get("is_page")) - ghost_post = await get_post_for_edit(ghost_id, is_page=is_page) + post_id = g.post_data["post"]["id"] + post = (await g.s.execute( + sa_select(Post) + .where(Post.id == post_id) + .options(selectinload(Post.tags)) + )).scalar_one_or_none() + + ghost_post = _post_to_edit_dict(post) if post else {} save_success = request.args.get("saved") == "1" from shared.sx.page import get_template_context @@ -368,12 +412,10 @@ def register(): @bp.post("/settings/") 
@require_post_author async def settings_save(slug: str): - from ...blog.ghost.ghost_posts import update_post_settings - from ...blog.ghost.ghost_sync import sync_single_post, sync_single_page + from services.post_writer import update_post_settings, OptimisticLockError from shared.browser.app.redis_cacher import invalidate_tag_cache - ghost_id = g.post_data["post"]["ghost_id"] - is_page = bool(g.post_data["post"].get("is_page")) + post_id = g.post_data["post"]["id"] form = await request.form updated_at = form.get("updated_at", "") @@ -406,49 +448,55 @@ def register(): kwargs["featured"] = form.get("featured") == "on" kwargs["email_only"] = form.get("email_only") == "on" - # Tags — comma-separated string → list of {"name": "..."} dicts + # Tags — comma-separated string → list of names tags_str = form.get("tags", "").strip() - if tags_str: - kwargs["tags"] = [{"name": t.strip()} for t in tags_str.split(",") if t.strip()] - else: - kwargs["tags"] = [] + tag_names = [t.strip() for t in tags_str.split(",") if t.strip()] if tags_str else [] - # Update in Ghost - await update_post_settings( - ghost_id=ghost_id, - updated_at=updated_at, - is_page=is_page, - **kwargs, - ) + try: + post = await update_post_settings( + g.s, + post_id=post_id, + expected_updated_at=updated_at, + tag_names=tag_names, + **kwargs, + ) + except OptimisticLockError: + from urllib.parse import quote + return redirect( + host_url(url_for("blog.post.admin.settings", slug=slug)) + + "?error=" + quote("Someone else edited this post. 
Please reload and try again.") + ) - # Sync to local DB - if is_page: - await sync_single_page(g.s, ghost_id) - else: - await sync_single_post(g.s, ghost_id) await g.s.flush() # Clear caches await invalidate_tag_cache("blog") await invalidate_tag_cache("post.post_detail") - return redirect(host_url(url_for("blog.post.admin.settings", slug=slug)) + "?saved=1") + # Redirect using the (possibly new) slug + return redirect(host_url(url_for("blog.post.admin.settings", slug=post.slug)) + "?saved=1") @bp.get("/edit/") @require_post_author async def edit(slug: str): - from ...blog.ghost.ghost_posts import get_post_for_edit + from models.ghost_content import Post + from sqlalchemy import select as sa_select + from sqlalchemy.orm import selectinload from shared.infrastructure.data_client import fetch_data - ghost_id = g.post_data["post"]["ghost_id"] - is_page = bool(g.post_data["post"].get("is_page")) - ghost_post = await get_post_for_edit(ghost_id, is_page=is_page) + post_id = g.post_data["post"]["id"] + post = (await g.s.execute( + sa_select(Post) + .where(Post.id == post_id) + .options(selectinload(Post.tags)) + )).scalar_one_or_none() + + ghost_post = _post_to_edit_dict(post) if post else {} save_success = request.args.get("saved") == "1" save_error = request.args.get("error", "") # Newsletters live in db_account — fetch via HTTP raw_newsletters = await fetch_data("account", "newsletters", required=False) or [] - # Convert dicts to objects with .name/.ghost_id attributes for template compat from types import SimpleNamespace newsletters = [SimpleNamespace(**nl) for nl in raw_newsletters] @@ -475,20 +523,16 @@ def register(): @require_post_author async def edit_save(slug: str): import json - from ...blog.ghost.ghost_posts import update_post from ...blog.ghost.lexical_validator import validate_lexical - from ...blog.ghost.ghost_sync import sync_single_post, sync_single_page + from services.post_writer import update_post as writer_update, OptimisticLockError from 
shared.browser.app.redis_cacher import invalidate_tag_cache - ghost_id = g.post_data["post"]["ghost_id"] - is_page = bool(g.post_data["post"].get("is_page")) + post_id = g.post_data["post"]["id"] form = await request.form title = form.get("title", "").strip() lexical_raw = form.get("lexical", "") updated_at = form.get("updated_at", "") status = form.get("status", "draft") - publish_mode = form.get("publish_mode", "web") - newsletter_slug = form.get("newsletter_slug", "").strip() or None feature_image = form.get("feature_image", "").strip() custom_excerpt = form.get("custom_excerpt", "").strip() feature_image_caption = form.get("feature_image_caption", "").strip() @@ -504,76 +548,51 @@ def register(): if not ok: return redirect(host_url(url_for("blog.post.admin.edit", slug=slug)) + "?error=" + quote(reason)) - # Update in Ghost (content save — no status change yet) - ghost_post = await update_post( - ghost_id=ghost_id, - lexical_json=lexical_raw, - title=title or None, - updated_at=updated_at, - feature_image=feature_image, - custom_excerpt=custom_excerpt, - feature_image_caption=feature_image_caption, - is_page=is_page, - ) - # Publish workflow is_admin = bool((g.get("rights") or {}).get("admin")) publish_requested_msg = None - # Guard: if already emailed, force publish_mode to "web" to prevent re-send - already_emailed = bool(ghost_post.get("email") and ghost_post["email"].get("status")) - if already_emailed and publish_mode in ("email", "both"): - publish_mode = "web" + # Determine effective status + effective_status: str | None = None + current_status = g.post_data["post"].get("status", "draft") - if status == "published" and ghost_post.get("status") != "published" and not is_admin: - # Non-admin requesting publish: don't send status to Ghost, set local flag + if status == "published" and current_status != "published" and not is_admin: + # Non-admin requesting publish: keep as draft, set local flag publish_requested_msg = "Publish requested — an admin will 
review." - elif status and status != ghost_post.get("status"): - # Status is changing — determine email params based on publish_mode - email_kwargs: dict = {} - if status == "published" and publish_mode in ("email", "both") and newsletter_slug: - email_kwargs["newsletter_slug"] = newsletter_slug - email_kwargs["email_segment"] = "all" - if publish_mode == "email": - email_kwargs["email_only"] = True + elif status and status != current_status: + effective_status = status - from ...blog.ghost.ghost_posts import update_post as _up - ghost_post = await _up( - ghost_id=ghost_id, + try: + post = await writer_update( + g.s, + post_id=post_id, lexical_json=lexical_raw, - title=None, - updated_at=ghost_post["updated_at"], - status=status, - is_page=is_page, - **email_kwargs, + title=title or None, + expected_updated_at=updated_at, + feature_image=feature_image or None, + custom_excerpt=custom_excerpt or None, + feature_image_caption=feature_image_caption or None, + status=effective_status, + ) + except OptimisticLockError: + return redirect( + host_url(url_for("blog.post.admin.edit", slug=slug)) + + "?error=" + quote("Someone else edited this post. 
Please reload and try again.") ) - # Sync to local DB - if is_page: - await sync_single_page(g.s, ghost_id) - else: - await sync_single_post(g.s, ghost_id) + # Handle publish_requested flag + if publish_requested_msg: + post.publish_requested = True + elif status == "published" and is_admin: + post.publish_requested = False await g.s.flush() - # Handle publish_requested flag on the local post - from models.ghost_content import Post - from sqlalchemy import select as sa_select - local_post = (await g.s.execute( - sa_select(Post).where(Post.ghost_id == ghost_id) - )).scalar_one_or_none() - if local_post: - if publish_requested_msg: - local_post.publish_requested = True - elif status == "published" and is_admin: - local_post.publish_requested = False - await g.s.flush() - # Clear caches await invalidate_tag_cache("blog") await invalidate_tag_cache("post.post_detail") - # Redirect to GET to avoid resubmit warning on refresh (PRG pattern) - redirect_url = host_url(url_for("blog.post.admin.edit", slug=slug)) + "?saved=1" + # Redirect to GET (PRG pattern) — use post.slug in case it changed + redirect_url = host_url(url_for("blog.post.admin.edit", slug=post.slug)) + "?saved=1" if publish_requested_msg: redirect_url += "&publish_requested=1" return redirect(redirect_url) diff --git a/blog/scripts/final_ghost_sync.py b/blog/scripts/final_ghost_sync.py new file mode 100644 index 0000000..2ec9ac5 --- /dev/null +++ b/blog/scripts/final_ghost_sync.py @@ -0,0 +1,469 @@ +#!/usr/bin/env python3 +"""Final Ghost → db_blog sync with HTML verification + author→user migration. + +Run once before cutting over to native writes (Phase 1). + +Usage: + cd blog && python -m scripts.final_ghost_sync + +Requires GHOST_ADMIN_API_URL, GHOST_ADMIN_API_KEY, DATABASE_URL, +and DATABASE_URL_ACCOUNT env vars. 
+""" +from __future__ import annotations + +import asyncio +import difflib +import logging +import os +import re +import sys + +import httpx +from sqlalchemy import select, func, delete + +# Ensure project root is importable +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..")) +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..")) +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", "shared")) + +from shared.db.base import Base # noqa: E402 +from shared.db.session import get_session, get_account_session, _engine # noqa: E402 +from shared.infrastructure.ghost_admin_token import make_ghost_admin_jwt # noqa: E402 +from shared.models.ghost_content import Post, Author, Tag, PostUser, PostAuthor # noqa: E402 +from shared.models.user import User # noqa: E402 + +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s [%(levelname)s] %(message)s", +) +log = logging.getLogger("final_ghost_sync") + +GHOST_ADMIN_API_URL = os.environ["GHOST_ADMIN_API_URL"] + + +def _auth_header() -> dict[str, str]: + return {"Authorization": f"Ghost {make_ghost_admin_jwt()}"} + + +def _slugify(name: str) -> str: + s = name.strip().lower() + s = re.sub(r"[^\w\s-]", "", s) + s = re.sub(r"[\s_]+", "-", s) + return s.strip("-") + + +# --------------------------------------------------------------------------- +# Ghost API fetch +# --------------------------------------------------------------------------- + +async def _fetch_all(endpoint: str) -> list[dict]: + """Fetch all items from a Ghost Admin API endpoint.""" + url = ( + f"{GHOST_ADMIN_API_URL}/{endpoint}/" + "?include=authors,tags&limit=all" + "&formats=html,plaintext,mobiledoc,lexical" + ) + async with httpx.AsyncClient(timeout=60) as client: + resp = await client.get(url, headers=_auth_header()) + resp.raise_for_status() + key = endpoint # "posts" or "pages" + return resp.json().get(key, []) + + +async def fetch_all_ghost_content() -> list[dict]: + """Fetch all posts + pages from 
Ghost.""" + posts, pages = await asyncio.gather( + _fetch_all("posts"), + _fetch_all("pages"), + ) + for p in pages: + p["page"] = True + return posts + pages + + +# --------------------------------------------------------------------------- +# Author → User migration +# --------------------------------------------------------------------------- + +async def migrate_authors_to_users(author_bucket: dict[str, dict]) -> dict[str, int]: + """Ensure every Ghost author has a corresponding User in db_account. + + Returns mapping of ghost_author_id → user_id. + """ + ghost_id_to_user_id: dict[str, int] = {} + + async with get_account_session() as sess: + async with sess.begin(): + for ghost_author_id, ga in author_bucket.items(): + email = (ga.get("email") or "").strip().lower() + if not email: + log.warning( + "Author %s (%s) has no email — skipping user creation", + ghost_author_id, ga.get("name"), + ) + continue + + # Find existing user by email + user = (await sess.execute( + select(User).where(User.email == email) + )).scalar_one_or_none() + + if user is None: + # Auto-create user for this Ghost author + user = User( + email=email, + name=ga.get("name"), + slug=ga.get("slug") or _slugify(ga.get("name") or email.split("@")[0]), + bio=ga.get("bio"), + profile_image=ga.get("profile_image"), + cover_image=ga.get("cover_image"), + website=ga.get("website"), + location=ga.get("location"), + facebook=ga.get("facebook"), + twitter=ga.get("twitter"), + is_admin=True, # Ghost authors are admins + ) + sess.add(user) + await sess.flush() + log.info("Created user %d for author %s (%s)", user.id, ga.get("name"), email) + else: + # Update profile fields from Ghost author (fill in blanks) + if not user.slug: + user.slug = ga.get("slug") or _slugify(ga.get("name") or email.split("@")[0]) + if not user.name and ga.get("name"): + user.name = ga["name"] + if not user.bio and ga.get("bio"): + user.bio = ga["bio"] + if not user.profile_image and ga.get("profile_image"): + 
user.profile_image = ga["profile_image"] + if not user.cover_image and ga.get("cover_image"): + user.cover_image = ga["cover_image"] + if not user.website and ga.get("website"): + user.website = ga["website"] + if not user.location and ga.get("location"): + user.location = ga["location"] + if not user.facebook and ga.get("facebook"): + user.facebook = ga["facebook"] + if not user.twitter and ga.get("twitter"): + user.twitter = ga["twitter"] + await sess.flush() + log.info("Updated user %d profile from author %s", user.id, ga.get("name")) + + ghost_id_to_user_id[ghost_author_id] = user.id + + return ghost_id_to_user_id + + +async def populate_post_users( + data: list[dict], + ghost_author_to_user: dict[str, int], +) -> int: + """Populate post_users M2M and set user_id on all posts from author mapping. + + Returns number of post_users rows created. + """ + rows_created = 0 + + async with get_session() as sess: + async with sess.begin(): + for gp in data: + ghost_post_id = gp["id"] + post = (await sess.execute( + select(Post).where(Post.ghost_id == ghost_post_id) + )).scalar_one_or_none() + if not post: + continue + + # Set primary user_id from primary author + pa = gp.get("primary_author") + if pa and pa["id"] in ghost_author_to_user: + post.user_id = ghost_author_to_user[pa["id"]] + + # Build post_users from all authors + await sess.execute( + delete(PostUser).where(PostUser.post_id == post.id) + ) + seen_user_ids: set[int] = set() + for idx, a in enumerate(gp.get("authors") or []): + uid = ghost_author_to_user.get(a["id"]) + if uid and uid not in seen_user_ids: + seen_user_ids.add(uid) + sess.add(PostUser(post_id=post.id, user_id=uid, sort_order=idx)) + rows_created += 1 + + await sess.flush() + + return rows_created + + +# --------------------------------------------------------------------------- +# Content sync (reuse ghost_sync upsert logic) +# --------------------------------------------------------------------------- + +async def run_sync() -> dict: + """Run 
full Ghost content sync and author→user migration.""" + from bp.blog.ghost.ghost_sync import ( + _upsert_author, + _upsert_tag, + _upsert_post, + ) + + log.info("Fetching all content from Ghost...") + data = await fetch_all_ghost_content() + log.info("Received %d posts/pages from Ghost", len(data)) + + # Collect authors and tags + author_bucket: dict[str, dict] = {} + tag_bucket: dict[str, dict] = {} + + for p in data: + for a in p.get("authors") or []: + author_bucket[a["id"]] = a + if p.get("primary_author"): + author_bucket[p["primary_author"]["id"]] = p["primary_author"] + for t in p.get("tags") or []: + tag_bucket[t["id"]] = t + if p.get("primary_tag"): + tag_bucket[p["primary_tag"]["id"]] = p["primary_tag"] + + # Step 1: Upsert content into db_blog (existing Ghost sync logic) + async with get_session() as sess: + async with sess.begin(): + author_map: dict[str, Author] = {} + for ga in author_bucket.values(): + a = await _upsert_author(sess, ga) + author_map[ga["id"]] = a + log.info("Upserted %d authors (legacy table)", len(author_map)) + + tag_map: dict[str, Tag] = {} + for gt in tag_bucket.values(): + t = await _upsert_tag(sess, gt) + tag_map[gt["id"]] = t + log.info("Upserted %d tags", len(tag_map)) + + for gp in data: + await _upsert_post(sess, gp, author_map, tag_map) + log.info("Upserted %d posts/pages", len(data)) + + # Step 2: Migrate authors → users in db_account + log.info("") + log.info("--- Migrating Ghost authors → Users ---") + ghost_author_to_user = await migrate_authors_to_users(author_bucket) + log.info("Mapped %d Ghost authors to User records", len(ghost_author_to_user)) + + # Step 3: Populate post_users M2M and set user_id + log.info("") + log.info("--- Populating post_users M2M ---") + pu_rows = await populate_post_users(data, ghost_author_to_user) + log.info("Created %d post_users rows", pu_rows) + + n_posts = sum(1 for p in data if not p.get("page")) + n_pages = sum(1 for p in data if p.get("page")) + + return { + "posts": n_posts, + 
"pages": n_pages, + "authors": len(author_bucket), + "tags": len(tag_bucket), + "users_mapped": len(ghost_author_to_user), + "post_users_rows": pu_rows, + } + + +# --------------------------------------------------------------------------- +# HTML rendering verification +# --------------------------------------------------------------------------- + +def _normalize_html(html: str | None) -> str: + if not html: + return "" + return re.sub(r"\s+", " ", html.strip()) + + +async def verify_html_rendering() -> dict: + """Re-render all posts from lexical and compare with stored HTML.""" + from bp.blog.ghost.lexical_renderer import render_lexical + import json + + diffs_found = 0 + posts_checked = 0 + posts_no_lexical = 0 + + async with get_session() as sess: + result = await sess.execute( + select(Post).where( + Post.deleted_at.is_(None), + Post.status == "published", + ) + ) + posts = result.scalars().all() + + for post in posts: + if not post.lexical: + posts_no_lexical += 1 + continue + + posts_checked += 1 + try: + rendered = render_lexical(json.loads(post.lexical)) + except Exception as e: + log.error( + "Render failed for post %d (%s): %s", + post.id, post.slug, e, + ) + diffs_found += 1 + continue + + ghost_html = _normalize_html(post.html) + our_html = _normalize_html(rendered) + + if ghost_html != our_html: + diffs_found += 1 + diff = difflib.unified_diff( + ghost_html.splitlines(keepends=True), + our_html.splitlines(keepends=True), + fromfile=f"ghost/{post.slug}", + tofile=f"rendered/{post.slug}", + n=2, + ) + diff_text = "".join(diff) + if len(diff_text) > 2000: + diff_text = diff_text[:2000] + "\n... 
(truncated)" + log.warning( + "HTML diff for post %d (%s):\n%s", + post.id, post.slug, diff_text, + ) + + return { + "checked": posts_checked, + "no_lexical": posts_no_lexical, + "diffs": diffs_found, + } + + +# --------------------------------------------------------------------------- +# Verification queries +# --------------------------------------------------------------------------- + +async def run_verification() -> dict: + async with get_session() as sess: + total_posts = await sess.scalar( + select(func.count(Post.id)).where(Post.deleted_at.is_(None)) + ) + null_html = await sess.scalar( + select(func.count(Post.id)).where( + Post.deleted_at.is_(None), + Post.status == "published", + Post.html.is_(None), + ) + ) + null_lexical = await sess.scalar( + select(func.count(Post.id)).where( + Post.deleted_at.is_(None), + Post.status == "published", + Post.lexical.is_(None), + ) + ) + # Posts with user_id set + has_user = await sess.scalar( + select(func.count(Post.id)).where( + Post.deleted_at.is_(None), + Post.user_id.isnot(None), + ) + ) + no_user = await sess.scalar( + select(func.count(Post.id)).where( + Post.deleted_at.is_(None), + Post.user_id.is_(None), + ) + ) + + return { + "total_posts": total_posts, + "published_null_html": null_html, + "published_null_lexical": null_lexical, + "posts_with_user": has_user, + "posts_without_user": no_user, + } + + +# --------------------------------------------------------------------------- +# Main +# --------------------------------------------------------------------------- + +async def main(): + log.info("=" * 60) + log.info("Final Ghost Sync — Cutover Preparation") + log.info("=" * 60) + + # Step 1: Full sync + author→user migration + log.info("") + log.info("--- Step 1: Full sync from Ghost + author→user migration ---") + stats = await run_sync() + log.info( + "Sync complete: %d posts, %d pages, %d authors, %d tags, %d users mapped", + stats["posts"], stats["pages"], stats["authors"], stats["tags"], + 
stats["users_mapped"], + ) + + # Step 2: Verification queries + log.info("") + log.info("--- Step 2: Verification queries ---") + vq = await run_verification() + log.info("Total non-deleted posts/pages: %d", vq["total_posts"]) + log.info("Published with NULL html: %d", vq["published_null_html"]) + log.info("Published with NULL lexical: %d", vq["published_null_lexical"]) + log.info("Posts with user_id: %d", vq["posts_with_user"]) + log.info("Posts WITHOUT user_id: %d", vq["posts_without_user"]) + + if vq["published_null_html"] > 0: + log.warning("WARN: Some published posts have no HTML!") + if vq["published_null_lexical"] > 0: + log.warning("WARN: Some published posts have no Lexical JSON!") + if vq["posts_without_user"] > 0: + log.warning("WARN: Some posts have no user_id — authors may lack email!") + + # Step 3: HTML rendering verification + log.info("") + log.info("--- Step 3: HTML rendering verification ---") + html_stats = await verify_html_rendering() + log.info( + "Checked %d posts, %d with diffs, %d without lexical", + html_stats["checked"], html_stats["diffs"], html_stats["no_lexical"], + ) + + if html_stats["diffs"] > 0: + log.warning( + "WARN: %d posts have HTML rendering differences — " + "review diffs above before cutover", + html_stats["diffs"], + ) + + # Summary + log.info("") + log.info("=" * 60) + log.info("SUMMARY") + log.info(" Posts synced: %d", stats["posts"]) + log.info(" Pages synced: %d", stats["pages"]) + log.info(" Authors synced: %d", stats["authors"]) + log.info(" Tags synced: %d", stats["tags"]) + log.info(" Users mapped: %d", stats["users_mapped"]) + log.info(" post_users rows: %d", stats["post_users_rows"]) + log.info(" HTML diffs: %d", html_stats["diffs"]) + log.info(" Published null HTML: %d", vq["published_null_html"]) + log.info(" Published null lex: %d", vq["published_null_lexical"]) + log.info(" Posts without user: %d", vq["posts_without_user"]) + log.info("=" * 60) + + if (html_stats["diffs"] == 0 + and vq["published_null_html"] 
== 0 + and vq["posts_without_user"] == 0): + log.info("All checks passed — safe to proceed with cutover.") + else: + log.warning("Review warnings above before proceeding.") + + await _engine.dispose() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/blog/services/post_writer.py b/blog/services/post_writer.py new file mode 100644 index 0000000..face44d --- /dev/null +++ b/blog/services/post_writer.py @@ -0,0 +1,457 @@ +"""Native post/page CRUD — replaces Ghost Admin API writes. + +All operations go directly to db_blog. Ghost is never called. +""" +from __future__ import annotations + +import json +import logging +import re +from datetime import datetime +from typing import Any, Optional + +import nh3 +from sqlalchemy import select, delete, func +from sqlalchemy.ext.asyncio import AsyncSession + +from models.ghost_content import Post, Tag, PostTag, PostUser +from shared.browser.app.utils import utcnow + +log = logging.getLogger(__name__) + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +def _slugify(name: str) -> str: + s = name.strip().lower() + s = re.sub(r"[^\w\s-]", "", s) + s = re.sub(r"[\s_]+", "-", s) + return s.strip("-") + + +def _reading_time(plaintext: str | None) -> int: + """Estimate reading time in minutes (word count / 265, min 1).""" + if not plaintext: + return 0 + words = len(plaintext.split()) + return max(1, round(words / 265)) + + +def _extract_plaintext(html: str) -> str: + """Strip HTML tags to get plaintext.""" + text = re.sub(r"<[^>]+>", "", html) + text = re.sub(r"\s+", " ", text).strip() + return text + + +def _sanitize_html(html: str | None) -> str | None: + if not html: + return html + return nh3.clean( + html, + tags={ + "a", "abbr", "acronym", "b", "blockquote", "br", "code", + "div", "em", "figcaption", "figure", "h1", "h2", "h3", + "h4", "h5", "h6", "hr", "i", "img", "li", "ol", "p", + "pre", 
"span", "strong", "sub", "sup", "table", "tbody", + "td", "th", "thead", "tr", "ul", "video", "source", + "picture", "iframe", "audio", + }, + attributes={ + "*": {"class", "id", "style"}, + "a": {"href", "title", "target"}, + "img": {"src", "alt", "title", "width", "height", "loading"}, + "video": {"src", "controls", "width", "height", "poster"}, + "audio": {"src", "controls"}, + "source": {"src", "type"}, + "iframe": {"src", "width", "height", "frameborder", "allowfullscreen"}, + "td": {"colspan", "rowspan"}, + "th": {"colspan", "rowspan"}, + }, + link_rel="noopener noreferrer", + url_schemes={"http", "https", "mailto"}, + ) + + +def _render_and_extract(lexical_json: str) -> tuple[str, str, int]: + """Render HTML from Lexical JSON, extract plaintext, compute reading time. + + Returns (html, plaintext, reading_time). + """ + from bp.blog.ghost.lexical_renderer import render_lexical + + doc = json.loads(lexical_json) if isinstance(lexical_json, str) else lexical_json + html = render_lexical(doc) + html = _sanitize_html(html) + plaintext = _extract_plaintext(html or "") + rt = _reading_time(plaintext) + return html, plaintext, rt + + +async def _ensure_slug_unique(sess: AsyncSession, slug: str, exclude_post_id: int | None = None) -> str: + """Append -2, -3, etc. if slug already taken.""" + base_slug = slug + counter = 1 + while True: + q = select(Post.id).where(Post.slug == slug, Post.deleted_at.is_(None)) + if exclude_post_id: + q = q.where(Post.id != exclude_post_id) + existing = await sess.scalar(q) + if existing is None: + return slug + counter += 1 + slug = f"{base_slug}-{counter}" + + +async def _upsert_tags_by_name(sess: AsyncSession, tag_names: list[str]) -> list[Tag]: + """Find or create tags by name. 
Returns Tag objects in order.""" + tags: list[Tag] = [] + for name in tag_names: + name = name.strip() + if not name: + continue + tag = (await sess.execute( + select(Tag).where(Tag.name == name, Tag.deleted_at.is_(None)) + )).scalar_one_or_none() + if tag is None: + tag = Tag( + name=name, + slug=_slugify(name), + visibility="public", + ) + sess.add(tag) + await sess.flush() + tags.append(tag) + return tags + + +async def _rebuild_post_tags(sess: AsyncSession, post_id: int, tags: list[Tag]) -> None: + """Replace all post_tags for a post.""" + await sess.execute(delete(PostTag).where(PostTag.post_id == post_id)) + seen: set[int] = set() + for idx, tag in enumerate(tags): + if tag.id not in seen: + seen.add(tag.id) + sess.add(PostTag(post_id=post_id, tag_id=tag.id, sort_order=idx)) + await sess.flush() + + +async def _rebuild_post_users(sess: AsyncSession, post_id: int, user_ids: list[int]) -> None: + """Replace all post_users for a post.""" + await sess.execute(delete(PostUser).where(PostUser.post_id == post_id)) + seen: set[int] = set() + for idx, uid in enumerate(user_ids): + if uid not in seen: + seen.add(uid) + sess.add(PostUser(post_id=post_id, user_id=uid, sort_order=idx)) + await sess.flush() + + +async def _fire_ap_publish( + sess: AsyncSession, + post: Post, + old_status: str | None, + tag_objs: list[Tag], +) -> None: + """Fire AP federation activity on status transitions.""" + if post.is_page or not post.user_id: + return + + from bp.blog.ghost.ghost_sync import _build_ap_post_data + from shared.services.federation_publish import try_publish + from shared.infrastructure.urls import app_url + + post_url = app_url("blog", f"/{post.slug}/") + + if post.status == "published": + activity_type = "Create" if old_status != "published" else "Update" + await try_publish( + sess, + user_id=post.user_id, + activity_type=activity_type, + object_type="Note", + object_data=_build_ap_post_data(post, post_url, tag_objs), + source_type="Post", + source_id=post.id, + ) + 
elif old_status == "published" and post.status != "published": + await try_publish( + sess, + user_id=post.user_id, + activity_type="Delete", + object_type="Tombstone", + object_data={ + "id": post_url, + "formerType": "Note", + }, + source_type="Post", + source_id=post.id, + ) + + +# --------------------------------------------------------------------------- +# Public API +# --------------------------------------------------------------------------- + +async def create_post( + sess: AsyncSession, + *, + title: str, + lexical_json: str, + status: str = "draft", + user_id: int, + feature_image: str | None = None, + custom_excerpt: str | None = None, + feature_image_caption: str | None = None, + tag_names: list[str] | None = None, + is_page: bool = False, +) -> Post: + """Create a new post or page directly in db_blog.""" + html, plaintext, reading_time = _render_and_extract(lexical_json) + slug = await _ensure_slug_unique(sess, _slugify(title or "untitled")) + + now = utcnow() + post = Post( + title=title or "Untitled", + slug=slug, + lexical=lexical_json if isinstance(lexical_json, str) else json.dumps(lexical_json), + html=html, + plaintext=plaintext, + reading_time=reading_time, + status=status, + is_page=is_page, + feature_image=feature_image, + feature_image_caption=_sanitize_html(feature_image_caption), + custom_excerpt=custom_excerpt, + user_id=user_id, + visibility="public", + created_at=now, + updated_at=now, + published_at=now if status == "published" else None, + ) + sess.add(post) + await sess.flush() + + # Tags + if tag_names: + tags = await _upsert_tags_by_name(sess, tag_names) + await _rebuild_post_tags(sess, post.id, tags) + if tags: + post.primary_tag_id = tags[0].id + + # Post users (author) + await _rebuild_post_users(sess, post.id, [user_id]) + + # PageConfig for pages + if is_page: + from shared.models.page_config import PageConfig + existing = (await sess.execute( + select(PageConfig).where( + PageConfig.container_type == "page", + 
PageConfig.container_id == post.id, + ) + )).scalar_one_or_none() + if existing is None: + sess.add(PageConfig( + container_type="page", + container_id=post.id, + features={}, + )) + + await sess.flush() + + # AP federation + if status == "published": + tag_objs = (await _upsert_tags_by_name(sess, tag_names)) if tag_names else [] + await _fire_ap_publish(sess, post, None, tag_objs) + + return post + + +async def create_page( + sess: AsyncSession, + *, + title: str, + lexical_json: str, + status: str = "draft", + user_id: int, + feature_image: str | None = None, + custom_excerpt: str | None = None, + feature_image_caption: str | None = None, + tag_names: list[str] | None = None, +) -> Post: + """Create a new page. Convenience wrapper around create_post.""" + return await create_post( + sess, + title=title, + lexical_json=lexical_json, + status=status, + user_id=user_id, + feature_image=feature_image, + custom_excerpt=custom_excerpt, + feature_image_caption=feature_image_caption, + tag_names=tag_names, + is_page=True, + ) + + +async def update_post( + sess: AsyncSession, + *, + post_id: int, + lexical_json: str, + title: str | None = None, + expected_updated_at: datetime | str, + feature_image: str | None = ..., # type: ignore[assignment] + custom_excerpt: str | None = ..., # type: ignore[assignment] + feature_image_caption: str | None = ..., # type: ignore[assignment] + status: str | None = None, +) -> Post: + """Update post content. Optimistic lock via expected_updated_at. + + Fields set to ... (sentinel) are left unchanged. None clears the field. + Raises ValueError on optimistic lock conflict (409). + """ + _SENTINEL = ... 
+
+    post = await sess.get(Post, post_id)
+    if post is None:
+        raise ValueError(f"Post {post_id} not found")
+
+    # Optimistic lock
+    if isinstance(expected_updated_at, str):
+        expected_updated_at = datetime.fromisoformat(
+            expected_updated_at.replace("Z", "+00:00")
+        )
+    if post.updated_at and abs((post.updated_at - expected_updated_at).total_seconds()) > 1:
+        raise OptimisticLockError(
+            f"Post was modified at {post.updated_at}, expected {expected_updated_at}"
+        )
+
+    old_status = post.status
+
+    # Render content
+    html, plaintext, reading_time = _render_and_extract(lexical_json)
+    post.lexical = lexical_json if isinstance(lexical_json, str) else json.dumps(lexical_json)
+    post.html = html
+    post.plaintext = plaintext
+    post.reading_time = reading_time
+
+    if title is not None:
+        post.title = title
+
+    if feature_image is not _SENTINEL:
+        post.feature_image = feature_image
+    if custom_excerpt is not _SENTINEL:
+        post.custom_excerpt = custom_excerpt
+    if feature_image_caption is not _SENTINEL:
+        post.feature_image_caption = _sanitize_html(feature_image_caption)
+
+    if status is not None:
+        post.status = status
+        if status == "published" and not post.published_at:
+            post.published_at = utcnow()
+
+    post.updated_at = utcnow()
+    await sess.flush()
+
+    # AP federation on status change
+    tags = list(post.tags) if hasattr(post, "tags") and post.tags else []
+    await _fire_ap_publish(sess, post, old_status, tags)
+
+    return post
+
+
+_SETTINGS_FIELDS = (
+    "slug", "published_at", "featured", "visibility", "email_only",
+    "custom_template", "meta_title", "meta_description", "canonical_url",
+    "og_image", "og_title", "og_description",
+    "twitter_image", "twitter_title", "twitter_description",
+    "feature_image_alt",
+)
+
+
+async def update_post_settings(
+    sess: AsyncSession,
+    *,
+    post_id: int,
+    expected_updated_at: datetime | str,
+    tag_names: list[str] | None = None,
+    **kwargs: Any,
+) -> Post:
+    """Update post settings (slug, tags, SEO, social, etc.).
+
+    Optimistic lock via expected_updated_at.
+    """
+    post = await sess.get(Post, post_id)
+    if post is None:
+        raise ValueError(f"Post {post_id} not found")
+
+    # Optimistic lock
+    if isinstance(expected_updated_at, str):
+        expected_updated_at = datetime.fromisoformat(
+            expected_updated_at.replace("Z", "+00:00")
+        )
+    if post.updated_at and abs((post.updated_at - expected_updated_at).total_seconds()) > 1:
+        raise OptimisticLockError(
+            f"Post was modified at {post.updated_at}, expected {expected_updated_at}"
+        )
+
+    old_status = post.status
+
+    for field in _SETTINGS_FIELDS:
+        val = kwargs.get(field)
+        if val is not None:
+            if field == "slug":
+                val = await _ensure_slug_unique(sess, val, exclude_post_id=post.id)
+            if field == "featured":
+                val = bool(val)
+            if field == "email_only":
+                val = bool(val)
+            if field == "published_at":
+                if isinstance(val, str):
+                    val = datetime.fromisoformat(val.replace("Z", "+00:00"))
+            setattr(post, field, val)
+
+    # Tags
+    if tag_names is not None:
+        tags = await _upsert_tags_by_name(sess, tag_names)
+        await _rebuild_post_tags(sess, post.id, tags)
+        post.primary_tag_id = tags[0].id if tags else None
+
+    post.updated_at = utcnow()
+    await sess.flush()
+
+    # AP federation if visibility/status changed
+    tags_obj = list(post.tags) if hasattr(post, "tags") and post.tags else []
+    await _fire_ap_publish(sess, post, old_status, tags_obj)
+
+    return post
+
+
+async def delete_post(sess: AsyncSession, post_id: int) -> None:
+    """Soft-delete a post via deleted_at."""
+    post = await sess.get(Post, post_id)
+    if post is None:
+        return
+
+    old_status = post.status
+    post.deleted_at = utcnow()
+    post.status = "deleted"
+    await sess.flush()
+
+    # Fire AP Delete if was published
+    if old_status == "published" and post.user_id:
+        tags = list(post.tags) if hasattr(post, "tags") and post.tags else []
+        await _fire_ap_publish(sess, post, old_status, tags)
+
+
+# ---------------------------------------------------------------------------
+# Exceptions
+# --------------------------------------------------------------------------- + +class OptimisticLockError(Exception): + """Raised when optimistic lock check fails (stale updated_at).""" + pass diff --git a/shared/models/ghost_content.py b/shared/models/ghost_content.py index bb2e8be..3b49f94 100644 --- a/shared/models/ghost_content.py +++ b/shared/models/ghost_content.py @@ -19,7 +19,7 @@ class Tag(Base): __tablename__ = "tags" id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True) - ghost_id: Mapped[str] = mapped_column(String(64), index=True, unique=True, nullable=False) + ghost_id: Mapped[Optional[str]] = mapped_column(String(64), index=True, unique=True, nullable=True) slug: Mapped[str] = mapped_column(String(191), index=True, nullable=False) name: Mapped[str] = mapped_column(String(255), nullable=False) @@ -50,8 +50,8 @@ class Post(Base): __tablename__ = "posts" id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True) - ghost_id: Mapped[str] = mapped_column(String(64), index=True, unique=True, nullable=False) - uuid: Mapped[str] = mapped_column(String(64), unique=True, nullable=False) + ghost_id: Mapped[Optional[str]] = mapped_column(String(64), index=True, unique=True, nullable=True) + uuid: Mapped[str] = mapped_column(String(64), unique=True, nullable=False, server_default=func.gen_random_uuid()) slug: Mapped[str] = mapped_column(String(191), index=True, nullable=False) title: Mapped[str] = mapped_column(String(500), nullable=False) @@ -89,8 +89,8 @@ class Post(Base): comment_id: Mapped[Optional[str]] = mapped_column(String(191)) published_at: Mapped[Optional[datetime]] = mapped_column(DateTime(timezone=True)) - updated_at: Mapped[Optional[datetime]] = mapped_column(DateTime(timezone=True)) - created_at: Mapped[Optional[datetime]] = mapped_column(DateTime(timezone=True)) + updated_at: Mapped[Optional[datetime]] = mapped_column(DateTime(timezone=True), server_default=func.now()) + created_at: 
Mapped[Optional[datetime]] = mapped_column(DateTime(timezone=True), server_default=func.now()) deleted_at: Mapped[Optional[datetime]] = mapped_column(DateTime(timezone=True)) user_id: Mapped[Optional[int]] = mapped_column( @@ -136,7 +136,7 @@ class Author(Base): __tablename__ = "authors" id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True) - ghost_id: Mapped[str] = mapped_column(String(64), index=True, unique=True, nullable=False) + ghost_id: Mapped[Optional[str]] = mapped_column(String(64), index=True, unique=True, nullable=True) slug: Mapped[str] = mapped_column(String(191), index=True, nullable=False) name: Mapped[str] = mapped_column(String(255), nullable=False) @@ -192,3 +192,17 @@ class PostTag(Base): sort_order: Mapped[int] = mapped_column(Integer, default=0, nullable=False) +class PostUser(Base): + """Multi-author M2M: links posts to users (cross-DB, no FK on user_id).""" + __tablename__ = "post_users" + + post_id: Mapped[int] = mapped_column( + ForeignKey("posts.id", ondelete="CASCADE"), + primary_key=True, + ) + user_id: Mapped[int] = mapped_column( + Integer, primary_key=True, index=True, + ) + sort_order: Mapped[int] = mapped_column(Integer, default=0, nullable=False) + + diff --git a/shared/models/user.py b/shared/models/user.py index baa162b..fe388a1 100644 --- a/shared/models/user.py +++ b/shared/models/user.py @@ -23,6 +23,17 @@ class User(Base): stripe_customer_id: Mapped[str | None] = mapped_column(String(255), index=True, nullable=True) ghost_raw: Mapped[dict | None] = mapped_column(JSONB, nullable=True) + # Author profile fields (merged from Ghost Author) + slug: Mapped[str | None] = mapped_column(String(191), unique=True, index=True, nullable=True) + bio: Mapped[str | None] = mapped_column(Text, nullable=True) + profile_image: Mapped[str | None] = mapped_column(Text, nullable=True) + cover_image: Mapped[str | None] = mapped_column(Text, nullable=True) + website: Mapped[str | None] = mapped_column(Text, nullable=True) + 
location: Mapped[str | None] = mapped_column(Text, nullable=True)
+    facebook: Mapped[str | None] = mapped_column(Text, nullable=True)
+    twitter: Mapped[str | None] = mapped_column(Text, nullable=True)
+    is_admin: Mapped[bool] = mapped_column(Boolean, nullable=False, default=False, server_default=func.false())
+
     # Relationships to Ghost-related entities
     user_newsletters = relationship("UserNewsletter", back_populates="user", cascade="all, delete-orphan", lazy="selectin")