Files
mono/blog/services/__init__.py
giles 382d1b7c7a Decouple blog models and BlogService from shared layer
Move Post/Author/Tag/PostAuthor/PostTag/PostUser models from
shared/models/ghost_content.py to blog/models/content.py so blog-domain
models no longer live in the shared layer. Replace the shared
SqlBlogService + BlogService protocol with a blog-local singleton
(blog_service), and switch entry_associations.py from direct DB access
to HTTP fetch_data("blog", "post-by-id") to respect the inter-service
boundary.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-01 13:28:11 +00:00

78 lines
2.8 KiB
Python

"""Blog app service registration."""
from __future__ import annotations
from sqlalchemy import select, func
from sqlalchemy.ext.asyncio import AsyncSession
from shared.contracts.dtos import PostDTO
from models.content import Post
def _post_to_dto(post: Post) -> PostDTO:
    """Build a ``PostDTO`` carrying the DTO-visible columns of *post*."""
    # Each name is read from the ORM row and passed to PostDTO under the
    # same keyword, in this order.
    copied_fields = (
        "id",
        "slug",
        "title",
        "status",
        "visibility",
        "is_page",
        "feature_image",
        "html",
        "excerpt",
        "custom_excerpt",
        "published_at",
    )
    values = {name: getattr(post, name) for name in copied_fields}
    return PostDTO(**values)
class SqlBlogService:
    """SQLAlchemy-backed read access to blog posts, returned as ``PostDTO``s.

    Every method receives the ``AsyncSession`` to query with; the service
    itself holds no per-instance state.
    """

    # Escape character used in LIKE patterns built from caller-supplied text.
    _LIKE_ESCAPE = "/"

    @staticmethod
    def _escape_like(term: str) -> str:
        """Return *term* with LIKE wildcards escaped so they match literally."""
        esc = SqlBlogService._LIKE_ESCAPE
        # Escape the escape character first so the markers added for ``%``
        # and ``_`` are not themselves doubled.
        return (
            term.replace(esc, esc + esc)
            .replace("%", esc + "%")
            .replace("_", esc + "_")
        )

    async def get_post_by_slug(self, session: AsyncSession, slug: str) -> PostDTO | None:
        """Return the post whose slug equals *slug*, or ``None`` if none matches."""
        post = (
            await session.execute(select(Post).where(Post.slug == slug))
        ).scalar_one_or_none()
        return _post_to_dto(post) if post else None

    async def get_post_by_id(self, session: AsyncSession, id: int) -> PostDTO | None:
        """Return the post whose primary key equals *id*, or ``None`` if none matches."""
        post = (
            await session.execute(select(Post).where(Post.id == id))
        ).scalar_one_or_none()
        return _post_to_dto(post) if post else None

    async def get_posts_by_ids(self, session: AsyncSession, ids: list[int]) -> list[PostDTO]:
        """Return the posts whose ids appear in *ids*.

        An empty *ids* returns ``[]`` without querying. No ordering is applied,
        so results are not guaranteed to follow the order of *ids*; ids with no
        matching row are simply absent.
        """
        if not ids:
            return []
        result = await session.execute(select(Post).where(Post.id.in_(ids)))
        return [_post_to_dto(p) for p in result.scalars().all()]

    async def search_posts(
        self, session: AsyncSession, query: str, page: int = 1, per_page: int = 10,
    ) -> tuple[list[PostDTO], int]:
        """Return one page of posts plus the total number of matches.

        With a non-empty *query*, posts whose title contains it
        (case-insensitive, wildcards in *query* matched literally) are returned
        ordered by title. With an empty *query*, all posts are returned, newest
        ``published_at`` first and unpublished (NULL) last. ``Post.id`` breaks
        ties so consecutive pages neither repeat nor skip rows.

        Args:
            session: Session used to run the count and page queries.
            query: Substring to look for in post titles; falsy means no filter.
            page: 1-based page number; values below 1 are treated as 1.
            per_page: Page size; negative values are treated as 0.

        Returns:
            ``(posts, total)`` where *total* counts all matches, not just the
            returned page.
        """
        # Clamp so the statement never carries a negative OFFSET or LIMIT.
        page = max(page, 1)
        per_page = max(per_page, 0)

        count_stmt = select(func.count(Post.id))
        if query:
            pattern = f"%{self._escape_like(query)}%"
            title_matches = Post.title.ilike(pattern, escape=self._LIKE_ESCAPE)
            count_stmt = count_stmt.where(title_matches)
            posts_stmt = select(Post).where(title_matches).order_by(Post.title, Post.id)
        else:
            posts_stmt = select(Post).order_by(
                Post.published_at.desc().nullslast(), Post.id.desc()
            )

        total = (await session.execute(count_stmt)).scalar() or 0
        offset = (page - 1) * per_page
        result = await session.execute(posts_stmt.limit(per_page).offset(offset))
        return [_post_to_dto(p) for p in result.scalars().all()], total
# Module-level singleton: blog code should import `blog_service` from this
# module rather than constructing its own SqlBlogService.
blog_service = SqlBlogService()
def register_domain_services() -> None:
    """Register the services the blog app needs in the shared registry.

    Blog owns: Post, Tag, Author, PostAuthor, PostTag.
    Calls into other apps go over HTTP via call_action() / fetch_data().
    """
    from shared.services.registry import services

    # The shared ActivityPub infrastructure (activitypub blueprint) needs a
    # federation service; leave any existing registration untouched.
    if services.has("federation"):
        return

    from shared.services.federation_impl import SqlFederationService

    services.federation = SqlFederationService()