Model: add sx_content column to Post. Writer: accept sx_content in create_post, create_page, update_post. Routes: read sx_content from form data in new post, new page, and edit routes. Read pipeline: ghost_db includes sx_content in public dict, detail/home views prefer sx_content over html when available, PostDTO includes sx_content. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
79 lines
2.8 KiB
Python
79 lines
2.8 KiB
Python
"""Blog app service registration."""
|
|
from __future__ import annotations
|
|
|
|
from sqlalchemy import select, func
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from shared.contracts.dtos import PostDTO
|
|
|
|
from models.content import Post
|
|
|
|
|
|
def _post_to_dto(post: Post) -> PostDTO:
|
|
return PostDTO(
|
|
id=post.id,
|
|
slug=post.slug,
|
|
title=post.title,
|
|
status=post.status,
|
|
visibility=post.visibility,
|
|
is_page=post.is_page,
|
|
feature_image=post.feature_image,
|
|
html=post.html,
|
|
sx_content=post.sx_content,
|
|
excerpt=post.excerpt,
|
|
custom_excerpt=post.custom_excerpt,
|
|
published_at=post.published_at,
|
|
)
|
|
|
|
|
|
class SqlBlogService:
|
|
async def get_post_by_slug(self, session: AsyncSession, slug: str) -> PostDTO | None:
|
|
post = (
|
|
await session.execute(select(Post).where(Post.slug == slug))
|
|
).scalar_one_or_none()
|
|
return _post_to_dto(post) if post else None
|
|
|
|
async def get_post_by_id(self, session: AsyncSession, id: int) -> PostDTO | None:
|
|
post = (
|
|
await session.execute(select(Post).where(Post.id == id))
|
|
).scalar_one_or_none()
|
|
return _post_to_dto(post) if post else None
|
|
|
|
async def get_posts_by_ids(self, session: AsyncSession, ids: list[int]) -> list[PostDTO]:
|
|
if not ids:
|
|
return []
|
|
result = await session.execute(select(Post).where(Post.id.in_(ids)))
|
|
return [_post_to_dto(p) for p in result.scalars().all()]
|
|
|
|
async def search_posts(
|
|
self, session: AsyncSession, query: str, page: int = 1, per_page: int = 10,
|
|
) -> tuple[list[PostDTO], int]:
|
|
if query:
|
|
count_stmt = select(func.count(Post.id)).where(Post.title.ilike(f"%{query}%"))
|
|
posts_stmt = select(Post).where(Post.title.ilike(f"%{query}%")).order_by(Post.title)
|
|
else:
|
|
count_stmt = select(func.count(Post.id))
|
|
posts_stmt = select(Post).order_by(Post.published_at.desc().nullslast())
|
|
|
|
total = (await session.execute(count_stmt)).scalar() or 0
|
|
offset = (page - 1) * per_page
|
|
result = await session.execute(posts_stmt.limit(per_page).offset(offset))
|
|
return [_post_to_dto(p) for p in result.scalars().all()], total
|
|
|
|
|
|
# Module-level singleton — import this in blog code.
|
|
blog_service = SqlBlogService()
|
|
|
|
|
|
def register_domain_services() -> None:
|
|
"""Register services for the blog app.
|
|
|
|
Blog owns: Post, Tag, Author, PostAuthor, PostTag.
|
|
Cross-app calls go over HTTP via call_action() / fetch_data().
|
|
"""
|
|
# Federation needed for AP shared infrastructure (activitypub blueprint)
|
|
from shared.services.registry import services
|
|
if not services.has("federation"):
|
|
from shared.services.federation_impl import SqlFederationService
|
|
services.federation = SqlFederationService()
|