This repository has been archived on 2026-02-24. You can view files and clone it. You cannot open issues or pull requests or push a commit.
Files
shared/services/blog_impl.py
giles 70b1c7de10 Domain isolation: typed contracts, service registry, and composable wiring
Add typed service contracts (Protocols + frozen DTOs) in shared/contracts/
for cross-domain communication. Each domain exposes a service interface
(BlogService, CalendarService, MarketService, CartService) backed by SQL
implementations in shared/services/. A singleton registry with has() guards
enables composable startup — apps register their own domain service and
stubs for absent domains.

Absorbs glue layer: navigation, relationships, event handlers (login,
container, order) now live in shared/ with has()-guarded service calls.
Factory gains domain_services_fn parameter for per-app service registration.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-19 04:29:10 +00:00

66 lines
2.5 KiB
Python

"""SQL-backed BlogService implementation.
Queries ``shared.models.ghost_content.Post`` — only this module may read
blog-domain tables on behalf of other domains.
"""
from __future__ import annotations
from sqlalchemy import select, func
from sqlalchemy.ext.asyncio import AsyncSession
from shared.models.ghost_content import Post
from shared.contracts.dtos import PostDTO
def _post_to_dto(post: Post) -> PostDTO:
    """Copy the DTO-visible attributes of *post* into a new ``PostDTO``."""
    # Each attribute has the same name on the ORM row and on the DTO, so the
    # names double as the keyword arguments passed to ``PostDTO``.
    copied_fields = (
        "id",
        "slug",
        "title",
        "status",
        "visibility",
        "is_page",
        "feature_image",
        "html",
        "excerpt",
        "custom_excerpt",
        "published_at",
    )
    return PostDTO(**{name: getattr(post, name) for name in copied_fields})
class SqlBlogService:
    """SQL-backed blog service that returns posts as ``PostDTO`` objects.

    The class keeps no state of its own; every method receives the
    ``AsyncSession`` it should query with.
    """

    async def get_post_by_slug(self, session: AsyncSession, slug: str) -> PostDTO | None:
        """Return the post whose slug equals *slug*, or ``None`` if none matches."""
        post = (
            await session.execute(select(Post).where(Post.slug == slug))
        ).scalar_one_or_none()
        if post is None:
            return None
        return _post_to_dto(post)

    async def get_post_by_id(self, session: AsyncSession, id: int) -> PostDTO | None:
        """Return the post whose primary key equals *id*, or ``None`` if none matches."""
        post = (
            await session.execute(select(Post).where(Post.id == id))
        ).scalar_one_or_none()
        if post is None:
            return None
        return _post_to_dto(post)

    async def get_posts_by_ids(self, session: AsyncSession, ids: list[int]) -> list[PostDTO]:
        """Return DTOs for every post whose id is in *ids*.

        An empty *ids* short-circuits to ``[]`` without touching the session.
        No ``ORDER BY`` is applied, so the result order is whatever the
        database returns and need not follow the order of *ids*.
        """
        if not ids:
            return []
        result = await session.execute(select(Post).where(Post.id.in_(ids)))
        return [_post_to_dto(p) for p in result.scalars().all()]

    async def search_posts(
        self, session: AsyncSession, query: str, page: int = 1, per_page: int = 10,
    ) -> tuple[list[PostDTO], int]:
        """Search posts by title with pagination.

        Not part of the Protocol (admin-only use in events), but provided
        for convenience.

        Args:
            session: Async session used to run the count and page queries.
            query: Case-insensitive substring to look for in post titles.
                A falsy query lists all posts, most recently published
                first (unpublished posts last).
            page: 1-based page number; values below 1 are treated as 1.
            per_page: Maximum number of posts on the page; negative values
                are treated as 0, which yields an empty page.

        Returns:
            ``(posts, total)`` where *total* is the number of matching posts
            across all pages, not just the returned one.
        """
        count_stmt = select(func.count(Post.id))
        if query:
            # NOTE(review): ``%`` and ``_`` inside *query* are not escaped and
            # therefore still act as LIKE wildcards.
            title_filter = Post.title.ilike(f"%{query}%")
            count_stmt = count_stmt.where(title_filter)
            posts_stmt = select(Post).where(title_filter).order_by(Post.title, Post.id)
        else:
            posts_stmt = select(Post).order_by(
                Post.published_at.desc().nullslast(), Post.id,
            )
        # ``Post.id`` is a tie-breaker in both orderings: without a unique
        # final sort key, rows with equal titles/timestamps may be repeated
        # or skipped when moving from one page to the next.

        total = (await session.execute(count_stmt)).scalar() or 0

        # Clamp so a bad page/per_page can never produce a negative
        # OFFSET or LIMIT, which the database would reject.
        page = max(page, 1)
        per_page = max(per_page, 0)
        offset = (page - 1) * per_page

        result = await session.execute(posts_stmt.limit(per_page).offset(offset))
        return [_post_to_dto(p) for p in result.scalars().all()], total