Model: add sx_content column to Post. Writer: accept sx_content in create_post, create_page, update_post. Routes: read sx_content from form data in new post, new page, and edit routes. Read pipeline: ghost_db includes sx_content in public dict, detail/home views prefer sx_content over html when available, PostDTO includes sx_content. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
466 lines
15 KiB
Python
466 lines
15 KiB
Python
"""Native post/page CRUD — replaces Ghost Admin API writes.
|
|
|
|
All operations go directly to db_blog. Ghost is never called.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import logging
|
|
import re
|
|
from datetime import datetime
|
|
from typing import Any, Optional
|
|
|
|
import nh3
|
|
from sqlalchemy import select, delete, func
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from models.ghost_content import Post, Tag, PostTag, PostUser
|
|
from shared.browser.app.utils import utcnow
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _slugify(name: str) -> str:
|
|
s = name.strip().lower()
|
|
s = re.sub(r"[^\w\s-]", "", s)
|
|
s = re.sub(r"[\s_]+", "-", s)
|
|
return s.strip("-")
|
|
|
|
|
|
def _reading_time(plaintext: str | None) -> int:
|
|
"""Estimate reading time in minutes (word count / 265, min 1)."""
|
|
if not plaintext:
|
|
return 0
|
|
words = len(plaintext.split())
|
|
return max(1, round(words / 265))
|
|
|
|
|
|
def _extract_plaintext(html: str) -> str:
|
|
"""Strip HTML tags to get plaintext."""
|
|
text = re.sub(r"<[^>]+>", "", html)
|
|
text = re.sub(r"\s+", " ", text).strip()
|
|
return text
|
|
|
|
|
|
def _sanitize_html(html: str | None) -> str | None:
    """Run *html* through nh3 with a fixed allow-list; pass falsy input through unchanged."""
    if not html:
        return html

    allowed_tags = {
        "a", "abbr", "acronym", "b", "blockquote", "br", "code",
        "div", "em", "figcaption", "figure", "h1", "h2", "h3",
        "h4", "h5", "h6", "hr", "i", "img", "li", "ol", "p",
        "pre", "span", "strong", "sub", "sup", "table", "tbody",
        "td", "th", "thead", "tr", "ul", "video", "source",
        "picture", "iframe", "audio",
    }
    allowed_attributes = {
        "*": {"class", "id", "style"},
        "a": {"href", "title", "target"},
        "img": {"src", "alt", "title", "width", "height", "loading"},
        "video": {"src", "controls", "width", "height", "poster"},
        "audio": {"src", "controls"},
        "source": {"src", "type"},
        "iframe": {"src", "width", "height", "frameborder", "allowfullscreen"},
        "td": {"colspan", "rowspan"},
        "th": {"colspan", "rowspan"},
    }
    return nh3.clean(
        html,
        tags=allowed_tags,
        attributes=allowed_attributes,
        link_rel="noopener noreferrer",
        url_schemes={"http", "https", "mailto"},
    )
|
|
|
|
|
|
def _render_and_extract(lexical_json: str) -> tuple[str, str, int]:
    """Render Lexical JSON to sanitized HTML and derive plaintext + reading time.

    Accepts either a JSON string or an already-parsed document.
    Returns (html, plaintext, reading_time).
    """
    from bp.blog.ghost.lexical_renderer import render_lexical

    # Parse only when given a raw JSON string; dict-like docs pass straight through.
    doc = json.loads(lexical_json) if isinstance(lexical_json, str) else lexical_json
    rendered = _sanitize_html(render_lexical(doc))
    plain = _extract_plaintext(rendered or "")
    return rendered, plain, _reading_time(plain)
|
|
|
|
|
|
async def _ensure_slug_unique(sess: AsyncSession, slug: str, exclude_post_id: int | None = None) -> str:
    """Return *slug*, or *slug*-2, *slug*-3, … — the first variant no live post owns.

    Soft-deleted posts (deleted_at set) do not block a slug; exclude_post_id
    lets a post keep its own slug during updates.
    """
    base_slug = slug
    suffix = 1
    while True:
        query = select(Post.id).where(Post.slug == slug, Post.deleted_at.is_(None))
        if exclude_post_id:
            query = query.where(Post.id != exclude_post_id)
        if await sess.scalar(query) is None:
            return slug
        suffix += 1
        slug = f"{base_slug}-{suffix}"
|
|
|
|
|
|
async def _upsert_tags_by_name(sess: AsyncSession, tag_names: list[str]) -> list[Tag]:
    """Look up each name among live tags, creating any that are missing.

    Blank/whitespace-only names are skipped. Returns Tag objects in input order.
    """
    result: list[Tag] = []
    for raw_name in tag_names:
        cleaned = raw_name.strip()
        if not cleaned:
            continue
        row = await sess.execute(
            select(Tag).where(Tag.name == cleaned, Tag.deleted_at.is_(None))
        )
        tag = row.scalar_one_or_none()
        if tag is None:
            tag = Tag(name=cleaned, slug=_slugify(cleaned), visibility="public")
            sess.add(tag)
            await sess.flush()  # flush so tag.id is available immediately
        result.append(tag)
    return result
|
|
|
|
|
|
async def _rebuild_post_tags(sess: AsyncSession, post_id: int, tags: list[Tag]) -> None:
    """Delete and recreate the post_tags rows for *post_id*, deduplicated by tag id."""
    await sess.execute(delete(PostTag).where(PostTag.post_id == post_id))
    linked: set[int] = set()
    for position, tag in enumerate(tags):
        if tag.id in linked:
            continue  # skip duplicate tags; sort_order keeps the original index
        linked.add(tag.id)
        sess.add(PostTag(post_id=post_id, tag_id=tag.id, sort_order=position))
    await sess.flush()
|
|
|
|
|
|
async def _rebuild_post_users(sess: AsyncSession, post_id: int, user_ids: list[int]) -> None:
    """Delete and recreate the post_users rows for *post_id*, deduplicated by user id."""
    await sess.execute(delete(PostUser).where(PostUser.post_id == post_id))
    linked: set[int] = set()
    for position, user_id in enumerate(user_ids):
        if user_id in linked:
            continue  # skip duplicate authors; sort_order keeps the original index
        linked.add(user_id)
        sess.add(PostUser(post_id=post_id, user_id=user_id, sort_order=position))
    await sess.flush()
|
|
|
|
|
|
async def _fire_ap_publish(
    sess: AsyncSession,
    post: Post,
    old_status: str | None,
    tag_objs: list[Tag],
) -> None:
    """Publish ActivityPub activities when a post crosses a publish boundary.

    published -> Create (first publish) or Update (already live);
    published -> anything else -> Delete with a Tombstone object.
    """
    # Pages and ownerless posts never federate.
    if post.is_page or not post.user_id:
        return

    from bp.blog.ghost.ghost_sync import _build_ap_post_data
    from shared.services.federation_publish import try_publish
    from shared.infrastructure.urls import app_url

    post_url = app_url("blog", f"/{post.slug}/")

    if post.status == "published":
        # First publish => Create; republish of an already-live post => Update.
        await try_publish(
            sess,
            user_id=post.user_id,
            activity_type="Update" if old_status == "published" else "Create",
            object_type="Note",
            object_data=_build_ap_post_data(post, post_url, tag_objs),
            source_type="Post",
            source_id=post.id,
        )
    elif old_status == "published" and post.status != "published":
        # Unpublished/deleted: federate a tombstone Delete.
        await try_publish(
            sess,
            user_id=post.user_id,
            activity_type="Delete",
            object_type="Tombstone",
            object_data={
                "id": post_url,
                "formerType": "Note",
            },
            source_type="Post",
            source_id=post.id,
        )
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Public API
|
|
# ---------------------------------------------------------------------------
|
|
|
|
async def create_post(
    sess: AsyncSession,
    *,
    title: str,
    lexical_json: str,
    status: str = "draft",
    user_id: int,
    feature_image: str | None = None,
    custom_excerpt: str | None = None,
    feature_image_caption: str | None = None,
    tag_names: list[str] | None = None,
    is_page: bool = False,
    sx_content: str | None = None,
) -> Post:
    """Create a new post or page directly in db_blog.

    Renders the Lexical document to sanitized HTML, derives plaintext and
    reading time, allocates a unique slug, links tags and the author, and —
    for pages — ensures a PageConfig row exists. Fires AP federation when
    the post is created already published.

    Returns the persisted Post (flushed, id assigned).
    """
    html, plaintext, reading_time = _render_and_extract(lexical_json)
    slug = await _ensure_slug_unique(sess, _slugify(title or "untitled"))

    now = utcnow()
    post = Post(
        title=title or "Untitled",
        slug=slug,
        lexical=lexical_json if isinstance(lexical_json, str) else json.dumps(lexical_json),
        sx_content=sx_content,
        html=html,
        plaintext=plaintext,
        reading_time=reading_time,
        status=status,
        is_page=is_page,
        feature_image=feature_image,
        feature_image_caption=_sanitize_html(feature_image_caption),
        custom_excerpt=custom_excerpt,
        user_id=user_id,
        visibility="public",
        created_at=now,
        updated_at=now,
        published_at=now if status == "published" else None,
    )
    sess.add(post)
    await sess.flush()

    # Tags — upserted once and reused below for federation (the original
    # called _upsert_tags_by_name a second time, re-issuing the SELECTs).
    tags: list[Tag] = []
    if tag_names:
        tags = await _upsert_tags_by_name(sess, tag_names)
        await _rebuild_post_tags(sess, post.id, tags)
        if tags:
            post.primary_tag_id = tags[0].id

    # Post users (author)
    await _rebuild_post_users(sess, post.id, [user_id])

    # PageConfig for pages
    if is_page:
        from shared.models.page_config import PageConfig
        existing = (await sess.execute(
            select(PageConfig).where(
                PageConfig.container_type == "page",
                PageConfig.container_id == post.id,
            )
        )).scalar_one_or_none()
        if existing is None:
            sess.add(PageConfig(
                container_type="page",
                container_id=post.id,
                features={},
            ))

    await sess.flush()

    # AP federation (old_status=None => first publish => Create activity)
    if status == "published":
        await _fire_ap_publish(sess, post, None, tags)

    return post
|
|
|
|
|
|
async def create_page(
    sess: AsyncSession,
    *,
    title: str,
    lexical_json: str,
    status: str = "draft",
    user_id: int,
    feature_image: str | None = None,
    custom_excerpt: str | None = None,
    feature_image_caption: str | None = None,
    tag_names: list[str] | None = None,
    sx_content: str | None = None,
) -> Post:
    """Create a new page. Thin wrapper that delegates to create_post with is_page=True."""
    page_kwargs = dict(
        title=title,
        lexical_json=lexical_json,
        status=status,
        user_id=user_id,
        feature_image=feature_image,
        custom_excerpt=custom_excerpt,
        feature_image_caption=feature_image_caption,
        tag_names=tag_names,
        sx_content=sx_content,
    )
    return await create_post(sess, is_page=True, **page_kwargs)
|
|
|
|
|
|
async def update_post(
    sess: AsyncSession,
    *,
    post_id: int,
    lexical_json: str,
    title: str | None = None,
    expected_updated_at: datetime | str,
    feature_image: str | None = ...,  # type: ignore[assignment]
    custom_excerpt: str | None = ...,  # type: ignore[assignment]
    feature_image_caption: str | None = ...,  # type: ignore[assignment]
    status: str | None = None,
    sx_content: str | None = ...,  # type: ignore[assignment]
) -> Post:
    """Update post content. Optimistic lock via expected_updated_at.

    Fields left at the ``...`` sentinel are unchanged; passing None clears
    the field. Raises ValueError if the post does not exist and
    OptimisticLockError when the row was modified since expected_updated_at
    (callers map that to HTTP 409). The original docstring incorrectly
    named ValueError for the lock conflict.
    """
    _SENTINEL = ...

    post = await sess.get(Post, post_id)
    if post is None:
        raise ValueError(f"Post {post_id} not found")

    # Optimistic lock: 1-second tolerance absorbs timestamp round-tripping.
    # NOTE(review): assumes post.updated_at and the parsed client timestamp
    # share tz-awareness — a naive/aware mix would raise TypeError; confirm.
    if isinstance(expected_updated_at, str):
        expected_updated_at = datetime.fromisoformat(
            expected_updated_at.replace("Z", "+00:00")
        )
    if post.updated_at and abs((post.updated_at - expected_updated_at).total_seconds()) > 1:
        raise OptimisticLockError(
            f"Post was modified at {post.updated_at}, expected {expected_updated_at}"
        )

    old_status = post.status

    # Render content (always replaced — lexical_json is required)
    html, plaintext, reading_time = _render_and_extract(lexical_json)
    post.lexical = lexical_json if isinstance(lexical_json, str) else json.dumps(lexical_json)
    post.html = html
    post.plaintext = plaintext
    post.reading_time = reading_time

    if title is not None:
        post.title = title

    if sx_content is not _SENTINEL:
        post.sx_content = sx_content

    if feature_image is not _SENTINEL:
        post.feature_image = feature_image
    if custom_excerpt is not _SENTINEL:
        post.custom_excerpt = custom_excerpt
    if feature_image_caption is not _SENTINEL:
        post.feature_image_caption = _sanitize_html(feature_image_caption)

    if status is not None:
        post.status = status
        # First transition to published stamps published_at; never overwritten.
        if status == "published" and not post.published_at:
            post.published_at = utcnow()

    post.updated_at = utcnow()
    await sess.flush()

    # AP federation on status change
    tags = list(post.tags) if hasattr(post, "tags") and post.tags else []
    await _fire_ap_publish(sess, post, old_status, tags)

    return post
|
|
|
|
|
|
_SETTINGS_FIELDS = (
|
|
"slug", "published_at", "featured", "visibility", "email_only",
|
|
"custom_template", "meta_title", "meta_description", "canonical_url",
|
|
"og_image", "og_title", "og_description",
|
|
"twitter_image", "twitter_title", "twitter_description",
|
|
"feature_image_alt",
|
|
)
|
|
|
|
|
|
async def update_post_settings(
    sess: AsyncSession,
    *,
    post_id: int,
    expected_updated_at: datetime | str,
    tag_names: list[str] | None = None,
    **kwargs: Any,
) -> Post:
    """Update post settings (slug, tags, SEO, social, etc.).

    Only fields listed in _SETTINGS_FIELDS and passed with non-None values
    are applied. Optimistic lock via expected_updated_at.
    """
    post = await sess.get(Post, post_id)
    if post is None:
        raise ValueError(f"Post {post_id} not found")

    # Optimistic lock (1-second tolerance for timestamp round-tripping).
    if isinstance(expected_updated_at, str):
        expected_updated_at = datetime.fromisoformat(
            expected_updated_at.replace("Z", "+00:00")
        )
    if post.updated_at and abs((post.updated_at - expected_updated_at).total_seconds()) > 1:
        raise OptimisticLockError(
            f"Post was modified at {post.updated_at}, expected {expected_updated_at}"
        )

    old_status = post.status

    for field in _SETTINGS_FIELDS:
        value = kwargs.get(field)
        if value is None:
            continue  # absent or explicit None: leave the column untouched
        if field == "slug":
            value = await _ensure_slug_unique(sess, value, exclude_post_id=post.id)
        elif field in ("featured", "email_only"):
            value = bool(value)
        elif field == "published_at" and isinstance(value, str):
            value = datetime.fromisoformat(value.replace("Z", "+00:00"))
        setattr(post, field, value)

    # Tags: None means "leave alone"; a list (even empty) replaces the set.
    if tag_names is not None:
        tags = await _upsert_tags_by_name(sess, tag_names)
        await _rebuild_post_tags(sess, post.id, tags)
        post.primary_tag_id = tags[0].id if tags else None

    post.updated_at = utcnow()
    await sess.flush()

    # AP federation if visibility/status changed
    tag_objs = list(post.tags) if hasattr(post, "tags") and post.tags else []
    await _fire_ap_publish(sess, post, old_status, tag_objs)

    return post
|
|
|
|
|
|
async def delete_post(sess: AsyncSession, post_id: int) -> None:
    """Soft-delete a post: stamp deleted_at, set status 'deleted', federate a Delete.

    Missing posts are ignored (idempotent).
    """
    post = await sess.get(Post, post_id)
    if post is None:
        return

    previous_status = post.status
    post.deleted_at = utcnow()
    post.status = "deleted"
    await sess.flush()

    # Only a previously-published, owned post needs an AP Delete activity.
    if previous_status == "published" and post.user_id:
        tag_objs = list(post.tags) if hasattr(post, "tags") and post.tags else []
        await _fire_ap_publish(sess, post, previous_status, tag_objs)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Exceptions
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class OptimisticLockError(Exception):
    """Raised when optimistic lock check fails (stale updated_at).

    Callers typically translate this into an HTTP 409 Conflict.
    """