This repository has been archived on 2026-02-24. You can view files and clone it. You cannot open issues or pull requests or push a commit.
Files
shared/services/calendar_impl.py
giles e83df2f742 Decoupling audit cleanup: fix protocol gaps, remove dead APIs
- Add search_posts, entry_ids_for_content, visible_entries_for_period
  to protocols and stubs
- Delete internal_api.py and factory cleanup hook (zero callers)
- Convert utils.py to utils/ package with calendar_helpers module
- Remove deleted_at check from calendar_view template (service filters)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-20 10:58:07 +00:00

566 lines
21 KiB
Python

"""SQL-backed CalendarService implementation.
Queries ``shared.models.calendars.*`` — only this module may write to
calendar-domain tables on behalf of other domains.
"""
from __future__ import annotations
from datetime import datetime
from decimal import Decimal
from sqlalchemy import select, update, func
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from shared.models.calendars import Calendar, CalendarEntry, CalendarEntryPost, Ticket
from shared.contracts.dtos import CalendarDTO, CalendarEntryDTO, TicketDTO
def _cal_to_dto(cal: Calendar) -> CalendarDTO:
    """Build a ``CalendarDTO`` from a ``Calendar`` ORM row.

    Every field is copied one-to-one from the attribute of the same name;
    nothing is computed or defaulted.
    """
    copied_fields = (
        "id",
        "container_type",
        "container_id",
        "name",
        "slug",
        "description",
    )
    return CalendarDTO(**{field: getattr(cal, field) for field in copied_fields})
def _entry_to_dto(entry: CalendarEntry) -> CalendarEntryDTO:
    """Build a ``CalendarEntryDTO`` from a ``CalendarEntry`` ORM row.

    The four ``calendar_*`` fields are read from ``entry.calendar`` when that
    attribute exists and is truthy; otherwise all four are ``None``.
    """
    parent = getattr(entry, "calendar", None)
    if parent:
        calendar_fields = {
            "calendar_name": parent.name,
            "calendar_slug": parent.slug,
            "calendar_container_id": parent.container_id,
            "calendar_container_type": parent.container_type,
        }
    else:
        calendar_fields = dict.fromkeys(
            (
                "calendar_name",
                "calendar_slug",
                "calendar_container_id",
                "calendar_container_type",
            ),
            None,
        )

    return CalendarEntryDTO(
        id=entry.id,
        calendar_id=entry.calendar_id,
        name=entry.name,
        start_at=entry.start_at,
        state=entry.state,
        cost=entry.cost,
        end_at=entry.end_at,
        user_id=entry.user_id,
        session_id=entry.session_id,
        order_id=entry.order_id,
        slot_id=entry.slot_id,
        ticket_price=entry.ticket_price,
        ticket_count=entry.ticket_count,
        **calendar_fields,
    )
def _ticket_to_dto(ticket: Ticket) -> TicketDTO:
    """Build a ``TicketDTO`` from a ``Ticket`` ORM row.

    Related objects (``ticket.entry``, ``ticket.ticket_type`` and the entry's
    ``calendar``) are optional; when one is missing or falsy the fields
    derived from it fall back to ``None``, except ``entry_name`` (empty
    string) and ``entry_start_at`` (the ticket's ``created_at``).
    """
    booked_entry = getattr(ticket, "entry", None)
    kind = getattr(ticket, "ticket_type", None)
    parent_cal = getattr(booked_entry, "calendar", None) if booked_entry else None

    # Price precedence: the ticket type's cost wins; the entry's
    # ticket_price is the fallback; otherwise there is no price.
    if kind and kind.cost is not None:
        price = kind.cost
    elif booked_entry and booked_entry.ticket_price is not None:
        price = booked_entry.ticket_price
    else:
        price = None

    if booked_entry:
        entry_name = booked_entry.name
        entry_start_at = booked_entry.start_at
        entry_end_at = booked_entry.end_at
        entry_id = booked_entry.id
    else:
        entry_name = ""
        entry_start_at = ticket.created_at
        entry_end_at = None
        entry_id = None

    if parent_cal:
        calendar_name = parent_cal.name
        calendar_container_id = parent_cal.container_id
    else:
        calendar_name = None
        calendar_container_id = None

    return TicketDTO(
        id=ticket.id,
        code=ticket.code,
        state=ticket.state,
        entry_name=entry_name,
        entry_start_at=entry_start_at,
        entry_end_at=entry_end_at,
        ticket_type_name=kind.name if kind else None,
        calendar_name=calendar_name,
        created_at=ticket.created_at,
        checked_in_at=ticket.checked_in_at,
        entry_id=entry_id,
        price=price,
        order_id=ticket.order_id,
        calendar_container_id=calendar_container_id,
    )
class SqlCalendarService:
# -- reads ----------------------------------------------------------------
async def calendars_for_container(
    self, session: AsyncSession, container_type: str, container_id: int,
) -> list[CalendarDTO]:
    """Return the non-deleted calendars of one container, ordered by name."""
    stmt = (
        select(Calendar)
        .where(
            Calendar.container_type == container_type,
            Calendar.container_id == container_id,
            Calendar.deleted_at.is_(None),
        )
        .order_by(Calendar.name.asc())
    )
    rows = (await session.execute(stmt)).scalars().all()
    return list(map(_cal_to_dto, rows))
async def pending_entries(
    self, session: AsyncSession, *, user_id: int | None, session_id: str | None,
) -> list[CalendarEntryDTO]:
    """Return the non-deleted ``pending`` entries owned by one identity.

    ``user_id`` takes precedence over ``session_id``; when both are ``None``
    the result is an empty list and no query is run. Results are ordered by
    ``start_at`` ascending, with each entry's calendar eagerly loaded.
    """
    if user_id is not None:
        owner_clause = CalendarEntry.user_id == user_id
    elif session_id is not None:
        owner_clause = CalendarEntry.session_id == session_id
    else:
        return []

    stmt = (
        select(CalendarEntry)
        .where(
            CalendarEntry.deleted_at.is_(None),
            CalendarEntry.state == "pending",
            owner_clause,
        )
        .order_by(CalendarEntry.start_at.asc())
        .options(selectinload(CalendarEntry.calendar))
    )
    rows = (await session.execute(stmt)).scalars().all()
    return [_entry_to_dto(row) for row in rows]
async def entries_for_page(
    self, session: AsyncSession, page_id: int, *,
    user_id: int | None, session_id: str | None,
) -> list[CalendarEntryDTO]:
    """Return one identity's ``pending`` entries on a page's calendars.

    Only entries whose calendar is a non-deleted calendar with
    ``container_type == "page"`` and ``container_id == page_id`` are
    considered. ``user_id`` takes precedence over ``session_id``; when both
    are ``None`` the result is an empty list and no query is run.
    """
    if user_id is not None:
        owner_clause = CalendarEntry.user_id == user_id
    elif session_id is not None:
        owner_clause = CalendarEntry.session_id == session_id
    else:
        return []

    page_calendar_ids = (
        select(Calendar.id)
        .where(
            Calendar.container_type == "page",
            Calendar.container_id == page_id,
            Calendar.deleted_at.is_(None),
        )
        .scalar_subquery()
    )
    stmt = (
        select(CalendarEntry)
        .where(
            CalendarEntry.deleted_at.is_(None),
            CalendarEntry.state == "pending",
            CalendarEntry.calendar_id.in_(page_calendar_ids),
            owner_clause,
        )
        .order_by(CalendarEntry.start_at.asc())
        .options(selectinload(CalendarEntry.calendar))
    )
    rows = (await session.execute(stmt)).scalars().all()
    return [_entry_to_dto(row) for row in rows]
async def entry_by_id(self, session: AsyncSession, entry_id: int) -> CalendarEntryDTO | None:
    """Return the non-deleted entry with the given id, or ``None`` if absent."""
    stmt = (
        select(CalendarEntry)
        .where(CalendarEntry.id == entry_id, CalendarEntry.deleted_at.is_(None))
        .options(selectinload(CalendarEntry.calendar))
    )
    found = (await session.execute(stmt)).scalar_one_or_none()
    if not found:
        return None
    return _entry_to_dto(found)
async def entry_ids_for_content(
    self, session: AsyncSession, content_type: str, content_id: int,
) -> set[int]:
    """Return the entry ids linked to a content item (e.g. a post).

    Only association rows that are not soft-deleted are considered.
    """
    stmt = select(CalendarEntryPost.entry_id).where(
        CalendarEntryPost.content_type == content_type,
        CalendarEntryPost.content_id == content_id,
        CalendarEntryPost.deleted_at.is_(None),
    )
    linked_ids = (await session.execute(stmt)).scalars().all()
    return set(linked_ids)
async def visible_entries_for_period(
    self, session: AsyncSession, calendar_id: int,
    period_start: datetime, period_end: datetime,
    *, user_id: int | None, is_admin: bool, session_id: str | None,
) -> list[CalendarEntryDTO]:
    """Return the entries of one calendar that the caller may see.

    The period is half-open: ``period_start <= start_at < period_end``.
    Soft-deleted entries are never returned.

    Visibility rules:
    - Everyone sees confirmed entries.
    - The current user (or, failing that, the current session) sees their
      own entries in any state.
    - Admins additionally see ordered and provisional entries of all users.

    The result is de-duplicated by entry id and sorted by ``start_at``.
    """

    async def _load(*extra_clauses) -> list[CalendarEntry]:
        # Every lookup shares the calendar / not-deleted / period filters.
        stmt = (
            select(CalendarEntry)
            .where(
                CalendarEntry.calendar_id == calendar_id,
                CalendarEntry.deleted_at.is_(None),
                CalendarEntry.start_at >= period_start,
                CalendarEntry.start_at < period_end,
                *extra_clauses,
            )
            .options(selectinload(CalendarEntry.calendar))
        )
        return list((await session.execute(stmt)).scalars().all())

    # The caller's own entries, in any state. A truthy user_id wins over
    # session_id.
    if user_id:
        own_rows = await _load(CalendarEntry.user_id == user_id)
    elif session_id:
        own_rows = await _load(CalendarEntry.session_id == session_id)
    else:
        own_rows = []

    # Confirmed entries are public.
    public_rows = await _load(CalendarEntry.state == "confirmed")

    # Admins also see what is in flight for everybody.
    admin_rows = (
        await _load(CalendarEntry.state.in_(("ordered", "provisional")))
        if is_admin
        else []
    )

    # Merge in the order public -> admin -> own so a later batch replaces an
    # earlier duplicate, then sort chronologically.
    unique_rows: dict[int, CalendarEntry] = {}
    for batch in (public_rows, admin_rows, own_rows):
        unique_rows.update((row.id, row) for row in batch)
    ordered_rows = sorted(
        unique_rows.values(), key=lambda row: row.start_at or period_start,
    )
    return list(map(_entry_to_dto, ordered_rows))
async def associated_entries(
    self, session: AsyncSession, content_type: str, content_id: int, page: int,
) -> tuple[list[CalendarEntryDTO], bool]:
    """Get one page of confirmed entries associated with a content item.

    Args:
        session: Async session used for both queries.
        content_type: Content type stored on the association row.
        content_id: Id of the content item.
        page: 1-based page number; values below 1 are treated as page 1.

    Returns:
        ``(entries, has_more)`` where ``entries`` holds at most 10 DTOs
        ordered by ``start_at`` descending (newest first) and ``has_more``
        is ``True`` only when at least one further entry exists after this
        page.
    """
    per_page = 10
    # Clamp so a page of 0 or less cannot produce a negative OFFSET.
    offset = (max(page, 1) - 1) * per_page

    entry_ids_result = await session.execute(
        select(CalendarEntryPost.entry_id).where(
            CalendarEntryPost.content_type == content_type,
            CalendarEntryPost.content_id == content_id,
            CalendarEntryPost.deleted_at.is_(None),
        )
    )
    entry_ids = set(entry_ids_result.scalars().all())
    if not entry_ids:
        return [], False

    result = await session.execute(
        select(CalendarEntry)
        .where(
            CalendarEntry.id.in_(entry_ids),
            CalendarEntry.deleted_at.is_(None),
            CalendarEntry.state == "confirmed",
        )
        # The id tie-breaker keeps page boundaries stable when several
        # entries share the same start_at.
        .order_by(CalendarEntry.start_at.desc(), CalendarEntry.id.desc())
        # Fetch one row beyond the page: its presence is what proves that
        # another page exists. Comparing the row count with per_page would
        # wrongly report more results when the last page is exactly full.
        .limit(per_page + 1)
        .offset(offset)
        .options(selectinload(CalendarEntry.calendar))
    )
    rows = list(result.scalars().all())
    has_more = len(rows) > per_page
    return [_entry_to_dto(e) for e in rows[:per_page]], has_more
async def toggle_entry_post(
    self, session: AsyncSession, entry_id: int, content_type: str, content_id: int,
) -> bool:
    """Flip the association between an entry and a content item.

    A live association is soft-deleted (``deleted_at`` set to the database's
    current time); otherwise a new association row is added. The session is
    flushed in both cases.

    Returns:
        ``True`` if the entry is now associated, ``False`` if the
        association was removed.
    """
    live_link = await session.scalar(
        select(CalendarEntryPost).where(
            CalendarEntryPost.entry_id == entry_id,
            CalendarEntryPost.content_type == content_type,
            CalendarEntryPost.content_id == content_id,
            CalendarEntryPost.deleted_at.is_(None),
        )
    )

    if not live_link:
        session.add(
            CalendarEntryPost(
                entry_id=entry_id,
                content_type=content_type,
                content_id=content_id,
            )
        )
        await session.flush()
        return True

    live_link.deleted_at = func.now()
    await session.flush()
    return False
async def get_entries_for_order(
    self, session: AsyncSession, order_id: int,
) -> list[CalendarEntryDTO]:
    """Return the non-deleted entries attached to an order (any state)."""
    stmt = (
        select(CalendarEntry)
        .where(
            CalendarEntry.order_id == order_id,
            CalendarEntry.deleted_at.is_(None),
        )
        .options(selectinload(CalendarEntry.calendar))
    )
    rows = (await session.execute(stmt)).scalars().all()
    return list(map(_entry_to_dto, rows))
async def user_tickets(
    self, session: AsyncSession, *, user_id: int,
) -> list[TicketDTO]:
    """Return a user's tickets whose state is not ``cancelled``, newest first.

    Each ticket's entry (with that entry's calendar) and ticket type are
    eagerly loaded so the DTO conversion needs no further queries.
    """
    eager_loads = (
        selectinload(Ticket.entry).selectinload(CalendarEntry.calendar),
        selectinload(Ticket.ticket_type),
    )
    stmt = (
        select(Ticket)
        .where(
            Ticket.user_id == user_id,
            Ticket.state != "cancelled",
        )
        .order_by(Ticket.created_at.desc())
        .options(*eager_loads)
    )
    rows = (await session.execute(stmt)).scalars().all()
    return list(map(_ticket_to_dto, rows))
async def user_bookings(
    self, session: AsyncSession, *, user_id: int,
) -> list[CalendarEntryDTO]:
    """Return a user's non-deleted bookings, latest ``start_at`` first.

    An entry counts as a booking when its state is ``ordered``,
    ``provisional`` or ``confirmed``.
    """
    booking_states = ("ordered", "provisional", "confirmed")
    stmt = (
        select(CalendarEntry)
        .where(
            CalendarEntry.user_id == user_id,
            CalendarEntry.deleted_at.is_(None),
            CalendarEntry.state.in_(booking_states),
        )
        .order_by(CalendarEntry.start_at.desc())
        .options(selectinload(CalendarEntry.calendar))
    )
    rows = (await session.execute(stmt)).scalars().all()
    return list(map(_entry_to_dto, rows))
# -- batch reads (not in protocol — convenience for blog service) ---------
async def confirmed_entries_for_posts(
    self, session: AsyncSession, post_ids: list[int],
) -> dict[int, list[CalendarEntryDTO]]:
    """Return confirmed entries grouped by post id for a batch of posts.

    Posts without any matching entry do not appear in the result. Within
    each post the entries are ordered by ``start_at`` ascending. An empty
    ``post_ids`` yields an empty dict without running a query.
    """
    if not post_ids:
        return {}

    stmt = (
        select(CalendarEntry, CalendarEntryPost.content_id)
        .join(CalendarEntryPost, CalendarEntry.id == CalendarEntryPost.entry_id)
        .where(
            CalendarEntryPost.content_type == "post",
            CalendarEntryPost.content_id.in_(post_ids),
            CalendarEntryPost.deleted_at.is_(None),
            CalendarEntry.deleted_at.is_(None),
            CalendarEntry.state == "confirmed",
        )
        .order_by(CalendarEntry.start_at.asc())
        .options(selectinload(CalendarEntry.calendar))
    )

    grouped: dict[int, list[CalendarEntryDTO]] = {}
    for row_entry, owning_post_id in await session.execute(stmt):
        bucket = grouped.get(owning_post_id)
        if bucket is None:
            bucket = grouped[owning_post_id] = []
        bucket.append(_entry_to_dto(row_entry))
    return grouped
# -- writes (absorb glue lifecycle) ---------------------------------------
async def adopt_entries_for_user(
    self, session: AsyncSession, user_id: int, session_id: str,
) -> None:
    """Adopt anonymous calendar entries for a logged-in user.

    Two steps, in this order:

    1. Soft-delete the user's existing *pending* entries, so the adopted
       ones replace them.
    2. Assign ``user_id`` to every non-deleted entry that carries
       ``session_id``.

    Args:
        session: Async session; changes are left for the caller to commit.
        user_id: Id of the user who just logged in.
        session_id: Anonymous session whose entries are adopted.
    """
    await session.execute(
        update(CalendarEntry)
        .where(
            CalendarEntry.deleted_at.is_(None),
            CalendarEntry.user_id == user_id,
            # Only unclaimed entries may be replaced. Without this filter
            # the statement also soft-deletes the user's ordered,
            # provisional and confirmed bookings.
            CalendarEntry.state == "pending",
        )
        .values(deleted_at=func.now())
    )
    cal_result = await session.execute(
        select(CalendarEntry).where(
            CalendarEntry.deleted_at.is_(None),
            CalendarEntry.session_id == session_id,
        )
    )
    for entry in cal_result.scalars().all():
        entry.user_id = user_id
async def claim_entries_for_order(
    self, session: AsyncSession, order_id: int, user_id: int | None,
    session_id: str | None, page_post_id: int | None,
) -> None:
    """Mark pending CalendarEntries as 'ordered' and set order_id.

    Args:
        session: Async session; changes are left for the caller to commit.
        order_id: Order the entries are attached to.
        user_id: Owner of the entries; takes precedence over ``session_id``.
        session_id: Anonymous owner, used only when ``user_id`` is ``None``.
        page_post_id: When given, only entries on non-deleted calendars of
            that page (``container_type == "page"``) are claimed.

    When both ``user_id`` and ``session_id`` are ``None`` nothing is updated.
    """
    filters = [
        CalendarEntry.deleted_at.is_(None),
        CalendarEntry.state == "pending",
    ]
    if user_id is not None:
        filters.append(CalendarEntry.user_id == user_id)
    elif session_id is not None:
        filters.append(CalendarEntry.session_id == session_id)
    else:
        # No identity: an unscoped UPDATE would claim every pending entry,
        # whoever owns it, for this order. Mirror the read methods and do
        # nothing instead.
        return
    if page_post_id is not None:
        cal_ids = select(Calendar.id).where(
            Calendar.container_type == "page",
            Calendar.container_id == page_post_id,
            Calendar.deleted_at.is_(None),
        ).scalar_subquery()
        filters.append(CalendarEntry.calendar_id.in_(cal_ids))
    await session.execute(
        update(CalendarEntry)
        .where(*filters)
        .values(state="ordered", order_id=order_id)
    )
async def confirm_entries_for_order(
    self, session: AsyncSession, order_id: int, user_id: int | None,
    session_id: str | None,
) -> None:
    """Move an order's non-deleted ``ordered`` entries to ``provisional``.

    The update is always scoped by ``order_id``. It is narrowed further to
    ``user_id`` when given, otherwise to ``session_id`` when given.
    """
    if user_id is not None:
        owner_clauses = [CalendarEntry.user_id == user_id]
    elif session_id is not None:
        owner_clauses = [CalendarEntry.session_id == session_id]
    else:
        owner_clauses = []

    stmt = (
        update(CalendarEntry)
        .where(
            CalendarEntry.deleted_at.is_(None),
            CalendarEntry.state == "ordered",
            CalendarEntry.order_id == order_id,
            *owner_clauses,
        )
        .values(state="provisional")
    )
    await session.execute(stmt)
# -- ticket methods -------------------------------------------------------
def _ticket_query_options(self):
    """Return the loader options shared by the ticket queries.

    They eagerly load each ticket's entry (and that entry's calendar) and
    its ticket type.
    """
    entry_with_calendar = selectinload(Ticket.entry).selectinload(CalendarEntry.calendar)
    ticket_type = selectinload(Ticket.ticket_type)
    return [entry_with_calendar, ticket_type]
async def pending_tickets(
    self, session: AsyncSession, *, user_id: int | None, session_id: str | None,
) -> list[TicketDTO]:
    """Return the ``reserved`` tickets of one identity (cart line items).

    ``user_id`` takes precedence over ``session_id``; when both are ``None``
    the result is an empty list and no query is run. Tickets are ordered by
    ``created_at`` ascending.
    """
    if user_id is not None:
        owner_clause = Ticket.user_id == user_id
    elif session_id is not None:
        owner_clause = Ticket.session_id == session_id
    else:
        return []

    stmt = (
        select(Ticket)
        .where(Ticket.state == "reserved", owner_clause)
        .order_by(Ticket.created_at.asc())
        .options(*self._ticket_query_options())
    )
    rows = (await session.execute(stmt)).scalars().all()
    return list(map(_ticket_to_dto, rows))
async def tickets_for_page(
    self, session: AsyncSession, page_id: int, *,
    user_id: int | None, session_id: str | None,
) -> list[TicketDTO]:
    """Return one identity's ``reserved`` tickets scoped to a page.

    A ticket belongs to the page when its entry is a non-deleted entry on a
    non-deleted calendar with ``container_type == "page"`` and
    ``container_id == page_id``. ``user_id`` takes precedence over
    ``session_id``; when both are ``None`` the result is an empty list and
    no query is run.
    """
    if user_id is not None:
        owner_clause = Ticket.user_id == user_id
    elif session_id is not None:
        owner_clause = Ticket.session_id == session_id
    else:
        return []

    page_calendar_ids = (
        select(Calendar.id)
        .where(
            Calendar.container_type == "page",
            Calendar.container_id == page_id,
            Calendar.deleted_at.is_(None),
        )
        .scalar_subquery()
    )
    page_entry_ids = (
        select(CalendarEntry.id)
        .where(
            CalendarEntry.calendar_id.in_(page_calendar_ids),
            CalendarEntry.deleted_at.is_(None),
        )
        .scalar_subquery()
    )
    stmt = (
        select(Ticket)
        .where(
            Ticket.state == "reserved",
            Ticket.entry_id.in_(page_entry_ids),
            owner_clause,
        )
        .order_by(Ticket.created_at.asc())
        .options(*self._ticket_query_options())
    )
    rows = (await session.execute(stmt)).scalars().all()
    return list(map(_ticket_to_dto, rows))
async def claim_tickets_for_order(
    self, session: AsyncSession, order_id: int, user_id: int | None,
    session_id: str | None, page_post_id: int | None,
) -> None:
    """Set order_id on reserved tickets at checkout.

    Args:
        session: Async session; changes are left for the caller to commit.
        order_id: Order the tickets are attached to.
        user_id: Owner of the tickets; takes precedence over ``session_id``.
        session_id: Anonymous owner, used only when ``user_id`` is ``None``.
        page_post_id: When given, only tickets whose entry sits on a
            non-deleted calendar of that page are claimed.

    When both ``user_id`` and ``session_id`` are ``None`` nothing is updated.
    """
    filters = [Ticket.state == "reserved"]
    if user_id is not None:
        filters.append(Ticket.user_id == user_id)
    elif session_id is not None:
        filters.append(Ticket.session_id == session_id)
    else:
        # No identity: an unscoped UPDATE would attach every reserved
        # ticket, whoever holds it, to this order. Mirror the read methods
        # and do nothing instead.
        return
    if page_post_id is not None:
        cal_ids = select(Calendar.id).where(
            Calendar.container_type == "page",
            Calendar.container_id == page_post_id,
            Calendar.deleted_at.is_(None),
        ).scalar_subquery()
        entry_ids = select(CalendarEntry.id).where(
            CalendarEntry.calendar_id.in_(cal_ids),
            CalendarEntry.deleted_at.is_(None),
        ).scalar_subquery()
        filters.append(Ticket.entry_id.in_(entry_ids))
    await session.execute(
        update(Ticket).where(*filters).values(order_id=order_id)
    )
async def confirm_tickets_for_order(
    self, session: AsyncSession, order_id: int,
) -> None:
    """Move an order's ``reserved`` tickets to ``confirmed`` (on payment)."""
    stmt = (
        update(Ticket)
        .where(Ticket.order_id == order_id, Ticket.state == "reserved")
        .values(state="confirmed")
    )
    await session.execute(stmt)
async def get_tickets_for_order(
    self, session: AsyncSession, order_id: int,
) -> list[TicketDTO]:
    """Return all tickets of an order, oldest first (checkout return display).

    Tickets are returned regardless of their state.
    """
    stmt = (
        select(Ticket)
        .where(Ticket.order_id == order_id)
        .order_by(Ticket.created_at.asc())
        .options(*self._ticket_query_options())
    )
    rows = (await session.execute(stmt)).scalars().all()
    return list(map(_ticket_to_dto, rows))
async def adopt_tickets_for_user(
    self, session: AsyncSession, user_id: int, session_id: str,
) -> None:
    """Hand an anonymous session's ``reserved`` tickets to a user on login.

    Each matching ticket gets ``user_id`` assigned on the loaded ORM object;
    nothing is flushed or committed here.
    """
    stmt = select(Ticket).where(
        Ticket.session_id == session_id,
        Ticket.state == "reserved",
    )
    anonymous_tickets = (await session.execute(stmt)).scalars().all()
    for anonymous_ticket in anonymous_tickets:
        anonymous_ticket.user_id = user_id