Files
rose-ash/events/bp/calendar_entry/services/post_associations.py
giles f42042ccb7
All checks were successful
Build and Deploy / build-and-deploy (push) Successful in 1m5s
Monorepo: consolidate 7 repos into one
Combines shared, blog, market, cart, events, federation, and account
into a single repository. Eliminates submodule sync, sibling model
copying at build time, and per-app CI orchestration.

Changes:
- Remove per-app .git, .gitmodules, .gitea, submodule shared/ dirs
- Remove stale sibling model copies from each app
- Update all 6 Dockerfiles for monorepo build context (root = .)
- Add build directives to docker-compose.yml
- Add single .gitea/workflows/ci.yml with change detection
- Add .dockerignore for monorepo build context
- Create __init__.py for federation and account (cross-app imports)
2026-02-24 19:44:17 +00:00

122 lines
3.3 KiB
Python

from __future__ import annotations
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from sqlalchemy.sql import func
from models.calendars import CalendarEntry, CalendarEntryPost
from shared.services.registry import services
async def add_post_to_entry(
session: AsyncSession,
entry_id: int,
post_id: int
) -> tuple[bool, str | None]:
"""
Associate a post with a calendar entry.
Returns (success, error_message).
"""
# Check if entry exists
entry = await session.scalar(
select(CalendarEntry).where(
CalendarEntry.id == entry_id,
CalendarEntry.deleted_at.is_(None)
)
)
if not entry:
return False, "Calendar entry not found"
# Check if post exists
post = await services.blog.get_post_by_id(session, post_id)
if not post:
return False, "Post not found"
# Check if association already exists
existing = await session.scalar(
select(CalendarEntryPost).where(
CalendarEntryPost.entry_id == entry_id,
CalendarEntryPost.content_type == "post",
CalendarEntryPost.content_id == post_id,
CalendarEntryPost.deleted_at.is_(None)
)
)
if existing:
return False, "Post is already associated with this entry"
# Create association
association = CalendarEntryPost(
entry_id=entry_id,
content_type="post",
content_id=post_id
)
session.add(association)
await session.flush()
return True, None
async def remove_post_from_entry(
session: AsyncSession,
entry_id: int,
post_id: int
) -> tuple[bool, str | None]:
"""
Remove a post association from a calendar entry (soft delete).
Returns (success, error_message).
"""
# Find the association
association = await session.scalar(
select(CalendarEntryPost).where(
CalendarEntryPost.entry_id == entry_id,
CalendarEntryPost.content_type == "post",
CalendarEntryPost.content_id == post_id,
CalendarEntryPost.deleted_at.is_(None)
)
)
if not association:
return False, "Association not found"
# Soft delete
association.deleted_at = func.now()
await session.flush()
return True, None
async def get_entry_posts(
session: AsyncSession,
entry_id: int
) -> list:
"""
Get all posts (as PostDTOs) associated with a calendar entry.
"""
result = await session.execute(
select(CalendarEntryPost.content_id).where(
CalendarEntryPost.entry_id == entry_id,
CalendarEntryPost.content_type == "post",
CalendarEntryPost.deleted_at.is_(None),
)
)
post_ids = list(result.scalars().all())
if not post_ids:
return []
posts = await services.blog.get_posts_by_ids(session, post_ids)
return sorted(posts, key=lambda p: (p.title or ""))
async def search_posts(
session: AsyncSession,
query: str,
page: int = 1,
per_page: int = 10
) -> tuple[list, int]:
"""
Search for posts by title with pagination.
If query is empty, returns all posts in published order.
Returns (post_dtos, total_count).
"""
return await services.blog.search_posts(session, query, page, per_page)