This repository has been archived on 2026-02-24. You can view files and clone it. You cannot open issues or pull requests or push a commit.
Files
shared/services/calendar_impl.py
giles 70b1c7de10 Domain isolation: typed contracts, service registry, and composable wiring
Add typed service contracts (Protocols + frozen DTOs) in shared/contracts/
for cross-domain communication. Each domain exposes a service interface
(BlogService, CalendarService, MarketService, CartService) backed by SQL
implementations in shared/services/. A singleton registry with has() guards
enables composable startup — apps register their own domain service and
stubs for absent domains.

Absorbs glue layer: navigation, relationships, event handlers (login,
container, order) now live in shared/ with has()-guarded service calls.
Factory gains domain_services_fn parameter for per-app service registration.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-19 04:29:10 +00:00

307 lines
11 KiB
Python

"""SQL-backed CalendarService implementation.
Queries ``shared.models.calendars.*`` — only this module may write to
calendar-domain tables on behalf of other domains.
"""
from __future__ import annotations
from sqlalchemy import select, update, func
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from shared.models.calendars import Calendar, CalendarEntry, CalendarEntryPost
from shared.contracts.dtos import CalendarDTO, CalendarEntryDTO
def _cal_to_dto(cal: Calendar) -> CalendarDTO:
return CalendarDTO(
id=cal.id,
container_type=cal.container_type,
container_id=cal.container_id,
name=cal.name,
slug=cal.slug,
description=cal.description,
)
def _entry_to_dto(entry: CalendarEntry) -> CalendarEntryDTO:
cal = getattr(entry, "calendar", None)
return CalendarEntryDTO(
id=entry.id,
calendar_id=entry.calendar_id,
name=entry.name,
start_at=entry.start_at,
state=entry.state,
cost=entry.cost,
end_at=entry.end_at,
user_id=entry.user_id,
session_id=entry.session_id,
order_id=entry.order_id,
slot_id=entry.slot_id,
ticket_price=entry.ticket_price,
ticket_count=entry.ticket_count,
calendar_name=cal.name if cal else None,
calendar_slug=cal.slug if cal else None,
)
class SqlCalendarService:
# -- reads ----------------------------------------------------------------
async def calendars_for_container(
self, session: AsyncSession, container_type: str, container_id: int,
) -> list[CalendarDTO]:
result = await session.execute(
select(Calendar).where(
Calendar.container_type == container_type,
Calendar.container_id == container_id,
Calendar.deleted_at.is_(None),
).order_by(Calendar.name.asc())
)
return [_cal_to_dto(c) for c in result.scalars().all()]
async def pending_entries(
self, session: AsyncSession, *, user_id: int | None, session_id: str | None,
) -> list[CalendarEntryDTO]:
filters = [
CalendarEntry.deleted_at.is_(None),
CalendarEntry.state == "pending",
]
if user_id is not None:
filters.append(CalendarEntry.user_id == user_id)
elif session_id is not None:
filters.append(CalendarEntry.session_id == session_id)
else:
return []
result = await session.execute(
select(CalendarEntry)
.where(*filters)
.order_by(CalendarEntry.start_at.asc())
.options(selectinload(CalendarEntry.calendar))
)
return [_entry_to_dto(e) for e in result.scalars().all()]
async def entries_for_page(
self, session: AsyncSession, page_id: int, *,
user_id: int | None, session_id: str | None,
) -> list[CalendarEntryDTO]:
cal_ids = select(Calendar.id).where(
Calendar.container_type == "page",
Calendar.container_id == page_id,
Calendar.deleted_at.is_(None),
).scalar_subquery()
filters = [
CalendarEntry.deleted_at.is_(None),
CalendarEntry.state == "pending",
CalendarEntry.calendar_id.in_(cal_ids),
]
if user_id is not None:
filters.append(CalendarEntry.user_id == user_id)
elif session_id is not None:
filters.append(CalendarEntry.session_id == session_id)
else:
return []
result = await session.execute(
select(CalendarEntry)
.where(*filters)
.order_by(CalendarEntry.start_at.asc())
.options(selectinload(CalendarEntry.calendar))
)
return [_entry_to_dto(e) for e in result.scalars().all()]
async def entry_by_id(self, session: AsyncSession, entry_id: int) -> CalendarEntryDTO | None:
entry = (
await session.execute(
select(CalendarEntry)
.where(CalendarEntry.id == entry_id, CalendarEntry.deleted_at.is_(None))
.options(selectinload(CalendarEntry.calendar))
)
).scalar_one_or_none()
return _entry_to_dto(entry) if entry else None
async def entry_ids_for_content(
self, session: AsyncSession, content_type: str, content_id: int,
) -> set[int]:
"""Get entry IDs associated with a content item (e.g. post)."""
result = await session.execute(
select(CalendarEntryPost.entry_id).where(
CalendarEntryPost.content_type == content_type,
CalendarEntryPost.content_id == content_id,
CalendarEntryPost.deleted_at.is_(None),
)
)
return set(result.scalars().all())
async def associated_entries(
self, session: AsyncSession, content_type: str, content_id: int, page: int,
) -> tuple[list[CalendarEntryDTO], bool]:
"""Get paginated confirmed entries associated with a content item."""
per_page = 10
entry_ids_result = await session.execute(
select(CalendarEntryPost.entry_id).where(
CalendarEntryPost.content_type == content_type,
CalendarEntryPost.content_id == content_id,
CalendarEntryPost.deleted_at.is_(None),
)
)
entry_ids = set(entry_ids_result.scalars().all())
if not entry_ids:
return [], False
offset = (page - 1) * per_page
result = await session.execute(
select(CalendarEntry)
.where(
CalendarEntry.id.in_(entry_ids),
CalendarEntry.deleted_at.is_(None),
CalendarEntry.state == "confirmed",
)
.order_by(CalendarEntry.start_at.desc())
.limit(per_page)
.offset(offset)
.options(selectinload(CalendarEntry.calendar))
)
entries = result.scalars().all()
has_more = len(entries) == per_page
return [_entry_to_dto(e) for e in entries], has_more
async def toggle_entry_post(
self, session: AsyncSession, entry_id: int, content_type: str, content_id: int,
) -> bool:
"""Toggle association; returns True if now associated, False if removed."""
existing = await session.scalar(
select(CalendarEntryPost).where(
CalendarEntryPost.entry_id == entry_id,
CalendarEntryPost.content_type == content_type,
CalendarEntryPost.content_id == content_id,
CalendarEntryPost.deleted_at.is_(None),
)
)
if existing:
existing.deleted_at = func.now()
await session.flush()
return False
else:
assoc = CalendarEntryPost(
entry_id=entry_id,
content_type=content_type,
content_id=content_id,
)
session.add(assoc)
await session.flush()
return True
async def get_entries_for_order(
self, session: AsyncSession, order_id: int,
) -> list[CalendarEntryDTO]:
result = await session.execute(
select(CalendarEntry)
.where(
CalendarEntry.order_id == order_id,
CalendarEntry.deleted_at.is_(None),
)
.options(selectinload(CalendarEntry.calendar))
)
return [_entry_to_dto(e) for e in result.scalars().all()]
# -- batch reads (not in protocol — convenience for blog service) ---------
async def confirmed_entries_for_posts(
self, session: AsyncSession, post_ids: list[int],
) -> dict[int, list[CalendarEntryDTO]]:
"""Return confirmed entries grouped by post_id for a batch of posts."""
if not post_ids:
return {}
result = await session.execute(
select(CalendarEntry, CalendarEntryPost.content_id)
.join(CalendarEntryPost, CalendarEntry.id == CalendarEntryPost.entry_id)
.options(selectinload(CalendarEntry.calendar))
.where(
CalendarEntryPost.content_type == "post",
CalendarEntryPost.content_id.in_(post_ids),
CalendarEntryPost.deleted_at.is_(None),
CalendarEntry.deleted_at.is_(None),
CalendarEntry.state == "confirmed",
)
.order_by(CalendarEntry.start_at.asc())
)
entries_by_post: dict[int, list[CalendarEntryDTO]] = {}
for entry, post_id in result:
entries_by_post.setdefault(post_id, []).append(_entry_to_dto(entry))
return entries_by_post
# -- writes (absorb glue lifecycle) ---------------------------------------
async def adopt_entries_for_user(
self, session: AsyncSession, user_id: int, session_id: str,
) -> None:
"""Adopt anonymous calendar entries for a logged-in user."""
await session.execute(
update(CalendarEntry)
.where(CalendarEntry.deleted_at.is_(None), CalendarEntry.user_id == user_id)
.values(deleted_at=func.now())
)
cal_result = await session.execute(
select(CalendarEntry).where(
CalendarEntry.deleted_at.is_(None),
CalendarEntry.session_id == session_id,
)
)
for entry in cal_result.scalars().all():
entry.user_id = user_id
async def claim_entries_for_order(
self, session: AsyncSession, order_id: int, user_id: int | None,
session_id: str | None, page_post_id: int | None,
) -> None:
"""Mark pending CalendarEntries as 'ordered' and set order_id."""
filters = [
CalendarEntry.deleted_at.is_(None),
CalendarEntry.state == "pending",
]
if user_id is not None:
filters.append(CalendarEntry.user_id == user_id)
elif session_id is not None:
filters.append(CalendarEntry.session_id == session_id)
if page_post_id is not None:
cal_ids = select(Calendar.id).where(
Calendar.container_type == "page",
Calendar.container_id == page_post_id,
Calendar.deleted_at.is_(None),
).scalar_subquery()
filters.append(CalendarEntry.calendar_id.in_(cal_ids))
await session.execute(
update(CalendarEntry)
.where(*filters)
.values(state="ordered", order_id=order_id)
)
async def confirm_entries_for_order(
self, session: AsyncSession, order_id: int, user_id: int | None,
session_id: str | None,
) -> None:
"""Mark ordered CalendarEntries as 'provisional'."""
filters = [
CalendarEntry.deleted_at.is_(None),
CalendarEntry.state == "ordered",
CalendarEntry.order_id == order_id,
]
if user_id is not None:
filters.append(CalendarEntry.user_id == user_id)
elif session_id is not None:
filters.append(CalendarEntry.session_id == session_id)
await session.execute(
update(CalendarEntry)
.where(*filters)
.values(state="provisional")
)