Domain isolation: replace cross-domain imports with service calls
All checks were successful
Build and Deploy / build-and-deploy (push) Successful in 55s

Replace direct Post, MarketPlace, Calendar model queries and HTTP API
calls with typed service calls. Events registers all 4 services via
domain_services_fn with has() guards.

Key changes:
- app.py: use domain_services_fn, Post/Calendar/MarketPlace queries
  → services.blog/calendar/market, HTTP cart API → services.cart
- calendars/markets services: Post → services.blog
- post_associations: Post → services.blog, direct queries → services
- markets routes: remove unused MarketPlace import
- glue imports → shared imports throughout

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
giles
2026-02-19 04:30:20 +00:00
parent 7b15f37686
commit f1b5aeac53
7 changed files with 82 additions and 89 deletions

View File

@@ -5,7 +5,7 @@ from sqlalchemy import select
from sqlalchemy.sql import func
from models.calendars import CalendarEntry, CalendarEntryPost
from shared.models.ghost_content import Post
from shared.services.registry import services
async def add_post_to_entry(
@@ -28,9 +28,7 @@ async def add_post_to_entry(
return False, "Calendar entry not found"
# Check if post exists
post = await session.scalar(
select(Post).where(Post.id == post_id)
)
post = await services.blog.get_post_by_id(session, post_id)
if not post:
return False, "Post not found"
@@ -91,20 +89,22 @@ async def remove_post_from_entry(
async def get_entry_posts(
session: AsyncSession,
entry_id: int
) -> list[Post]:
) -> list:
"""
Get all posts associated with a calendar entry.
Get all posts (as PostDTOs) associated with a calendar entry.
"""
result = await session.execute(
select(Post)
.join(CalendarEntryPost, (CalendarEntryPost.content_id == Post.id) & (CalendarEntryPost.content_type == "post"))
.where(
select(CalendarEntryPost.content_id).where(
CalendarEntryPost.entry_id == entry_id,
CalendarEntryPost.deleted_at.is_(None)
CalendarEntryPost.content_type == "post",
CalendarEntryPost.deleted_at.is_(None),
)
.order_by(Post.title)
)
return list(result.scalars().all())
post_ids = list(result.scalars().all())
if not post_ids:
return []
posts = await services.blog.get_posts_by_ids(session, post_ids)
return sorted(posts, key=lambda p: (p.title or ""))
async def search_posts(
@@ -112,29 +112,10 @@ async def search_posts(
query: str,
page: int = 1,
per_page: int = 10
) -> tuple[list[Post], int]:
) -> tuple[list, int]:
"""
Search for posts by title with pagination.
If query is empty, returns all posts in published order.
Returns (posts, total_count).
Returns (post_dtos, total_count).
"""
# Build base query
if query:
# Search by title
count_stmt = select(func.count(Post.id)).where(Post.title.ilike(f"%{query}%"))
posts_stmt = select(Post).where(Post.title.ilike(f"%{query}%")).order_by(Post.title)
else:
# All posts in published order (newest first)
count_stmt = select(func.count(Post.id))
posts_stmt = select(Post).order_by(Post.published_at.desc().nullslast())
# Count total
count_result = await session.execute(count_stmt)
total = count_result.scalar() or 0
# Get paginated results
offset = (page - 1) * per_page
result = await session.execute(
posts_stmt.limit(per_page).offset(offset)
)
return list(result.scalars().all()), total
return await services.blog.search_posts(session, query, page, per_page)