This repository has been archived on 2026-02-24. You can view files and clone it. You cannot open issues or pull requests or push a commit.
Files
blog/bp/menu_items/services/menu_items.py
giles 4155df7e7c
All checks were successful
Build and Deploy / build-and-deploy (push) Successful in 50s
Domain isolation: replace cross-domain imports with service calls
Replace direct Calendar, MarketPlace, and Post model queries with typed
service calls (services.blog, services.calendar, services.market,
services.cart). Blog registers all 4 services via domain_services_fn
with has() guards for composable deployment.

Key changes:
- app.py: use domain_services_fn instead of inline service registration
- admin routes: MarketPlace queries → services.market.marketplaces_for_container()
- entry_associations: CalendarEntryPost → services.calendar.entry_ids_for_content()
- markets service: Post query → services.blog.get_post_by_id/slug()
- posts_data, post routes: use calendar/market/cart services
- menu_items: glue imports → shared imports

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-19 04:30:14 +00:00

210 lines
6.0 KiB
Python

from __future__ import annotations
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, func
from shared.models.menu_node import MenuNode
from models.ghost_content import Post
from shared.services.relationships import attach_child, detach_child
class MenuItemError(ValueError):
"""Base error for menu item service operations."""
async def get_all_menu_items(session: AsyncSession) -> list[MenuNode]:
"""
Get all menu nodes (excluding deleted), ordered by sort_order.
"""
result = await session.execute(
select(MenuNode)
.where(MenuNode.deleted_at.is_(None), MenuNode.depth == 0)
.order_by(MenuNode.sort_order.asc(), MenuNode.id.asc())
)
return list(result.scalars().all())
async def get_menu_item_by_id(session: AsyncSession, item_id: int) -> MenuNode | None:
"""Get a menu node by ID (excluding deleted)."""
result = await session.execute(
select(MenuNode)
.where(MenuNode.id == item_id, MenuNode.deleted_at.is_(None))
)
return result.scalar_one_or_none()
async def create_menu_item(
session: AsyncSession,
post_id: int,
sort_order: int | None = None
) -> MenuNode:
"""
Create a MenuNode + ContainerRelation for a page.
If sort_order is not provided, adds to end of list.
"""
# Verify post exists and is a page
post = await session.scalar(
select(Post).where(Post.id == post_id)
)
if not post:
raise MenuItemError(f"Post {post_id} does not exist.")
if not post.is_page:
raise MenuItemError("Only pages can be added as menu items, not posts.")
# If no sort_order provided, add to end
if sort_order is None:
max_order = await session.scalar(
select(func.max(MenuNode.sort_order))
.where(MenuNode.deleted_at.is_(None), MenuNode.depth == 0)
)
sort_order = (max_order or 0) + 1
# Check for duplicate (same page, not deleted)
existing = await session.scalar(
select(MenuNode).where(
MenuNode.container_type == "page",
MenuNode.container_id == post_id,
MenuNode.deleted_at.is_(None),
)
)
if existing:
raise MenuItemError("Menu item for this page already exists.")
menu_node = MenuNode(
container_type="page",
container_id=post_id,
label=post.title,
slug=post.slug,
feature_image=post.feature_image,
sort_order=sort_order,
)
session.add(menu_node)
await session.flush()
await attach_child(session, "page", post_id, "menu_node", menu_node.id)
return menu_node
async def update_menu_item(
session: AsyncSession,
item_id: int,
post_id: int | None = None,
sort_order: int | None = None
) -> MenuNode:
"""Update an existing menu node."""
menu_node = await get_menu_item_by_id(session, item_id)
if not menu_node:
raise MenuItemError(f"Menu item {item_id} not found.")
if post_id is not None:
# Verify post exists and is a page
post = await session.scalar(
select(Post).where(Post.id == post_id)
)
if not post:
raise MenuItemError(f"Post {post_id} does not exist.")
if not post.is_page:
raise MenuItemError("Only pages can be added as menu items, not posts.")
# Check for duplicate (same page, different menu node)
existing = await session.scalar(
select(MenuNode).where(
MenuNode.container_type == "page",
MenuNode.container_id == post_id,
MenuNode.id != item_id,
MenuNode.deleted_at.is_(None),
)
)
if existing:
raise MenuItemError("Menu item for this page already exists.")
old_post_id = menu_node.container_id
menu_node.container_id = post_id
menu_node.label = post.title
menu_node.slug = post.slug
menu_node.feature_image = post.feature_image
if sort_order is not None:
menu_node.sort_order = sort_order
await session.flush()
if post_id is not None and post_id != old_post_id:
await detach_child(session, "page", old_post_id, "menu_node", menu_node.id)
await attach_child(session, "page", post_id, "menu_node", menu_node.id)
return menu_node
async def delete_menu_item(session: AsyncSession, item_id: int) -> bool:
"""Soft delete a menu node."""
menu_node = await get_menu_item_by_id(session, item_id)
if not menu_node:
return False
menu_node.deleted_at = func.now()
await session.flush()
await detach_child(session, "page", menu_node.container_id, "menu_node", menu_node.id)
return True
async def reorder_menu_items(
session: AsyncSession,
item_ids: list[int]
) -> list[MenuNode]:
"""
Reorder menu nodes by providing a list of IDs in desired order.
Updates sort_order for each node.
"""
items = []
for index, item_id in enumerate(item_ids):
menu_node = await get_menu_item_by_id(session, item_id)
if menu_node:
menu_node.sort_order = index
items.append(menu_node)
await session.flush()
return items
async def search_pages(
session: AsyncSession,
query: str,
page: int = 1,
per_page: int = 10
) -> tuple[list[Post], int]:
"""
Search for pages (not posts) by title.
Returns (pages, total_count).
"""
filters = [
Post.is_page == True, # noqa: E712
Post.status == "published",
Post.deleted_at.is_(None)
]
if query:
filters.append(Post.title.ilike(f"%{query}%"))
# Get total count
count_result = await session.execute(
select(func.count(Post.id)).where(*filters)
)
total = count_result.scalar() or 0
# Get paginated results
offset = (page - 1) * per_page
result = await session.execute(
select(Post)
.where(*filters)
.order_by(Post.title.asc())
.limit(per_page)
.offset(offset)
)
pages = list(result.scalars().all())
return pages, total