Combines shared, blog, market, cart, events, federation, and account into a single repository. Eliminates submodule sync, sibling model copying at build time, and per-app CI orchestration. Changes: - Remove per-app .git, .gitmodules, .gitea, submodule shared/ dirs - Remove stale sibling model copies from each app - Update all 6 Dockerfiles for monorepo build context (root = .) - Add build directives to docker-compose.yml - Add single .gitea/workflows/ci.yml with change detection - Add .dockerignore for monorepo build context - Create __init__.py for federation and account (cross-app imports)
66 lines
2.5 KiB
Python
66 lines
2.5 KiB
Python
"""SQL-backed BlogService implementation.
|
|
|
|
Queries ``shared.models.ghost_content.Post`` — only this module may read
|
|
blog-domain tables on behalf of other domains.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
from sqlalchemy import select, func
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from shared.models.ghost_content import Post
|
|
from shared.contracts.dtos import PostDTO
|
|
|
|
|
|
def _post_to_dto(post: Post) -> PostDTO:
    """Build a ``PostDTO`` from a ``Post`` row.

    Every attribute listed below is read from *post* and handed to
    ``PostDTO`` under the same keyword name; nothing is transformed.
    """
    copied_fields = (
        "id",
        "slug",
        "title",
        "status",
        "visibility",
        "is_page",
        "feature_image",
        "html",
        "excerpt",
        "custom_excerpt",
        "published_at",
    )
    values = {name: getattr(post, name) for name in copied_fields}
    return PostDTO(**values)
|
|
|
|
|
|
class SqlBlogService:
    """SQL-backed blog lookups that return ``PostDTO`` objects.

    The class keeps no state of its own: every method receives the
    ``AsyncSession`` to run its query on and converts the resulting
    ``Post`` rows with ``_post_to_dto``.
    """

    async def get_post_by_slug(self, session: AsyncSession, slug: str) -> PostDTO | None:
        """Return the post whose slug equals *slug*, or ``None`` if no row matches."""
        post = (
            await session.execute(select(Post).where(Post.slug == slug))
        ).scalar_one_or_none()
        return _post_to_dto(post) if post else None

    async def get_post_by_id(self, session: AsyncSession, id: int) -> PostDTO | None:
        """Return the post whose primary id equals *id*, or ``None`` if no row matches."""
        post = (
            await session.execute(select(Post).where(Post.id == id))
        ).scalar_one_or_none()
        return _post_to_dto(post) if post else None

    async def get_posts_by_ids(self, session: AsyncSession, ids: list[int]) -> list[PostDTO]:
        """Return the posts whose id is in *ids*.

        An empty *ids* short-circuits to ``[]`` without querying. Ids that
        match no row are simply absent from the result. The query has no
        ORDER BY, so the result order is not tied to the order of *ids*.
        """
        if not ids:
            return []
        result = await session.execute(select(Post).where(Post.id.in_(ids)))
        return [_post_to_dto(p) for p in result.scalars().all()]

    async def search_posts(
        self, session: AsyncSession, query: str, page: int = 1, per_page: int = 10,
    ) -> tuple[list[PostDTO], int]:
        """Search posts by title with pagination. Not part of the Protocol
        (admin-only use in events), but provided for convenience.

        Args:
            session: Session the count and page queries are executed on.
            query: Case-insensitive substring to look for in the title.
                It is embedded in a LIKE pattern unescaped, so ``%`` and
                ``_`` inside it act as wildcards. A falsy value lists all
                posts instead, newest ``published_at`` first with NULLs last.
            page: 1-based page number; values below 1 are treated as 1.
            per_page: Page size; negative values are treated as 0.

        Returns:
            ``(posts, total)`` where *posts* is the requested page and
            *total* is the number of rows matching across all pages.
        """
        # Out-of-range paging input would otherwise produce a negative
        # OFFSET/LIMIT, which some databases (e.g. PostgreSQL) reject.
        page = max(page, 1)
        per_page = max(per_page, 0)

        count_stmt = select(func.count(Post.id))
        posts_stmt = select(Post)
        if query:
            # Build the filter once so the count and the page always agree.
            title_match = Post.title.ilike(f"%{query}%")
            count_stmt = count_stmt.where(title_match)
            posts_stmt = posts_stmt.where(title_match).order_by(Post.title, Post.id)
        else:
            posts_stmt = posts_stmt.order_by(
                Post.published_at.desc().nullslast(), Post.id,
            )
        # Post.id is a tiebreaker: without it, rows sharing a title or
        # published_at may be repeated or skipped between pages.

        total = (await session.execute(count_stmt)).scalar() or 0
        offset = (page - 1) * per_page
        result = await session.execute(posts_stmt.limit(per_page).offset(offset))
        return [_post_to_dto(p) for p in result.scalars().all()], total
|