This repository has been archived on 2026-02-24. You can view files and clone it. You cannot open issues or pull requests or push a commit.
Files
shared/services/calendar_impl.py
giles dfc324b1be Add tickets & bookings to account page
Add TicketDTO, user_tickets/user_bookings to CalendarService protocol
and SqlCalendarService implementation, plus nav links and panel
templates for the auth account sub-pages.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-19 16:06:21 +00:00

359 lines
13 KiB
Python

"""SQL-backed CalendarService implementation.
Queries ``shared.models.calendars.*`` — only this module may write to
calendar-domain tables on behalf of other domains.
"""
from __future__ import annotations
from sqlalchemy import select, update, func
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from shared.models.calendars import Calendar, CalendarEntry, CalendarEntryPost, Ticket
from shared.contracts.dtos import CalendarDTO, CalendarEntryDTO, TicketDTO
def _cal_to_dto(cal: Calendar) -> CalendarDTO:
    """Build a ``CalendarDTO`` by copying the identically named attributes of *cal*."""
    copied = ("id", "container_type", "container_id", "name", "slug", "description")
    return CalendarDTO(**{field: getattr(cal, field) for field in copied})
def _entry_to_dto(entry: CalendarEntry) -> CalendarEntryDTO:
    """Build a ``CalendarEntryDTO`` from a ``CalendarEntry`` row.

    The ``calendar_*`` fields are taken from ``entry.calendar`` when that
    attribute exists and is truthy; otherwise all four are ``None``.
    """
    calendar = getattr(entry, "calendar", None)

    # Columns copied one-to-one from the entry itself.
    own_fields = (
        "id",
        "calendar_id",
        "name",
        "start_at",
        "state",
        "cost",
        "end_at",
        "user_id",
        "session_id",
        "order_id",
        "slot_id",
        "ticket_price",
        "ticket_count",
    )
    payload = {field: getattr(entry, field) for field in own_fields}

    if calendar:
        payload.update(
            calendar_name=calendar.name,
            calendar_slug=calendar.slug,
            calendar_container_id=calendar.container_id,
            calendar_container_type=calendar.container_type,
        )
    else:
        payload.update(
            calendar_name=None,
            calendar_slug=None,
            calendar_container_id=None,
            calendar_container_type=None,
        )
    return CalendarEntryDTO(**payload)
def _ticket_to_dto(ticket: Ticket) -> TicketDTO:
    """Build a ``TicketDTO`` from a ``Ticket`` row.

    When the ticket has no (truthy) ``entry``, the entry name is ``""``,
    the entry start falls back to ``ticket.created_at`` and the entry end
    and calendar name are ``None``.
    """
    entry = getattr(ticket, "entry", None)
    ticket_type = getattr(ticket, "ticket_type", None)

    if entry:
        calendar = getattr(entry, "calendar", None)
        entry_name = entry.name
        starts_at = entry.start_at
        ends_at = entry.end_at
    else:
        calendar = None
        entry_name = ""
        starts_at = ticket.created_at
        ends_at = None

    type_name = ticket_type.name if ticket_type else None
    calendar_name = calendar.name if calendar else None

    return TicketDTO(
        id=ticket.id,
        code=ticket.code,
        state=ticket.state,
        entry_name=entry_name,
        entry_start_at=starts_at,
        entry_end_at=ends_at,
        ticket_type_name=type_name,
        calendar_name=calendar_name,
        created_at=ticket.created_at,
        checked_in_at=ticket.checked_in_at,
    )
class SqlCalendarService:
    """SQL-backed calendar service operating on the calendar-domain models.

    Every method receives the ``AsyncSession`` to run against.  Nothing in
    this class commits: ``toggle_entry_post`` flushes, the remaining writers
    only execute statements or mutate loaded rows.
    """

    # -- internal helpers -----------------------------------------------------

    @staticmethod
    def _owner_filter(user_id: int | None, session_id: str | None) -> object | None:
        """Return the clause selecting entries owned by the given identity.

        ``user_id`` takes precedence over ``session_id``.  Returns ``None``
        when neither is supplied, so each caller decides explicitly how to
        treat an unidentified owner.
        """
        if user_id is not None:
            return CalendarEntry.user_id == user_id
        if session_id is not None:
            return CalendarEntry.session_id == session_id
        return None

    @staticmethod
    def _page_calendar_ids(page_id: int):
        """Return a scalar subquery of non-deleted calendar IDs whose container is the page."""
        return select(Calendar.id).where(
            Calendar.container_type == "page",
            Calendar.container_id == page_id,
            Calendar.deleted_at.is_(None),
        ).scalar_subquery()

    # -- reads ----------------------------------------------------------------

    async def calendars_for_container(
        self, session: AsyncSession, container_type: str, container_id: int,
    ) -> list[CalendarDTO]:
        """Return the non-deleted calendars of a container, ordered by name."""
        result = await session.execute(
            select(Calendar).where(
                Calendar.container_type == container_type,
                Calendar.container_id == container_id,
                Calendar.deleted_at.is_(None),
            ).order_by(Calendar.name.asc())
        )
        return [_cal_to_dto(c) for c in result.scalars().all()]

    async def pending_entries(
        self, session: AsyncSession, *, user_id: int | None, session_id: str | None,
    ) -> list[CalendarEntryDTO]:
        """Return the owner's non-deleted ``pending`` entries, earliest first.

        Returns an empty list when neither ``user_id`` nor ``session_id``
        is given.
        """
        owner = self._owner_filter(user_id, session_id)
        if owner is None:
            return []
        result = await session.execute(
            select(CalendarEntry)
            .where(
                CalendarEntry.deleted_at.is_(None),
                CalendarEntry.state == "pending",
                owner,
            )
            .order_by(CalendarEntry.start_at.asc())
            .options(selectinload(CalendarEntry.calendar))
        )
        return [_entry_to_dto(e) for e in result.scalars().all()]

    async def entries_for_page(
        self, session: AsyncSession, page_id: int, *,
        user_id: int | None, session_id: str | None,
    ) -> list[CalendarEntryDTO]:
        """Return the owner's non-deleted ``pending`` entries on a page's calendars.

        Returns an empty list when neither ``user_id`` nor ``session_id``
        is given.
        """
        owner = self._owner_filter(user_id, session_id)
        if owner is None:
            return []
        result = await session.execute(
            select(CalendarEntry)
            .where(
                CalendarEntry.deleted_at.is_(None),
                CalendarEntry.state == "pending",
                CalendarEntry.calendar_id.in_(self._page_calendar_ids(page_id)),
                owner,
            )
            .order_by(CalendarEntry.start_at.asc())
            .options(selectinload(CalendarEntry.calendar))
        )
        return [_entry_to_dto(e) for e in result.scalars().all()]

    async def entry_by_id(self, session: AsyncSession, entry_id: int) -> CalendarEntryDTO | None:
        """Return the non-deleted entry with ``entry_id``, or ``None`` if there is none."""
        entry = (
            await session.execute(
                select(CalendarEntry)
                .where(CalendarEntry.id == entry_id, CalendarEntry.deleted_at.is_(None))
                .options(selectinload(CalendarEntry.calendar))
            )
        ).scalar_one_or_none()
        return _entry_to_dto(entry) if entry else None

    async def entry_ids_for_content(
        self, session: AsyncSession, content_type: str, content_id: int,
    ) -> set[int]:
        """Get entry IDs associated with a content item (e.g. post)."""
        result = await session.execute(
            select(CalendarEntryPost.entry_id).where(
                CalendarEntryPost.content_type == content_type,
                CalendarEntryPost.content_id == content_id,
                CalendarEntryPost.deleted_at.is_(None),
            )
        )
        return set(result.scalars().all())

    async def associated_entries(
        self, session: AsyncSession, content_type: str, content_id: int, page: int,
    ) -> tuple[list[CalendarEntryDTO], bool]:
        """Get paginated confirmed entries associated with a content item.

        Pages are 1-based and hold 10 entries, latest ``start_at`` first.
        A ``page`` below 1 is treated as page 1.  The boolean is ``True``
        only when at least one further entry exists after this page.
        """
        per_page = 10
        entry_ids = await self.entry_ids_for_content(session, content_type, content_id)
        if not entry_ids:
            return [], False

        # Clamp so a zero/negative page never yields a negative OFFSET.
        offset = (max(page, 1) - 1) * per_page
        result = await session.execute(
            select(CalendarEntry)
            .where(
                CalendarEntry.id.in_(entry_ids),
                CalendarEntry.deleted_at.is_(None),
                CalendarEntry.state == "confirmed",
            )
            # The id tie-breaker keeps page boundaries stable when several
            # entries share the same start_at.
            .order_by(CalendarEntry.start_at.desc(), CalendarEntry.id.desc())
            # Fetch one extra row: its presence is what proves a next page.
            .limit(per_page + 1)
            .offset(offset)
            .options(selectinload(CalendarEntry.calendar))
        )
        rows = result.scalars().all()
        has_more = len(rows) > per_page
        return [_entry_to_dto(e) for e in rows[:per_page]], has_more

    async def toggle_entry_post(
        self, session: AsyncSession, entry_id: int, content_type: str, content_id: int,
    ) -> bool:
        """Toggle association; returns True if now associated, False if removed.

        An existing live association is soft-deleted (``deleted_at`` set);
        otherwise a new ``CalendarEntryPost`` row is added.  The session is
        flushed in both cases.
        """
        existing = await session.scalar(
            select(CalendarEntryPost).where(
                CalendarEntryPost.entry_id == entry_id,
                CalendarEntryPost.content_type == content_type,
                CalendarEntryPost.content_id == content_id,
                CalendarEntryPost.deleted_at.is_(None),
            )
        )
        if existing:
            existing.deleted_at = func.now()
            await session.flush()
            return False

        session.add(
            CalendarEntryPost(
                entry_id=entry_id,
                content_type=content_type,
                content_id=content_id,
            )
        )
        await session.flush()
        return True

    async def get_entries_for_order(
        self, session: AsyncSession, order_id: int,
    ) -> list[CalendarEntryDTO]:
        """Return every non-deleted entry attached to ``order_id``."""
        result = await session.execute(
            select(CalendarEntry)
            .where(
                CalendarEntry.order_id == order_id,
                CalendarEntry.deleted_at.is_(None),
            )
            .options(selectinload(CalendarEntry.calendar))
        )
        return [_entry_to_dto(e) for e in result.scalars().all()]

    async def user_tickets(
        self, session: AsyncSession, *, user_id: int,
    ) -> list[TicketDTO]:
        """Return the user's tickets whose state is not ``cancelled``, newest first."""
        result = await session.execute(
            select(Ticket)
            .where(
                Ticket.user_id == user_id,
                Ticket.state != "cancelled",
            )
            .order_by(Ticket.created_at.desc())
            .options(
                selectinload(Ticket.entry).selectinload(CalendarEntry.calendar),
                selectinload(Ticket.ticket_type),
            )
        )
        return [_ticket_to_dto(t) for t in result.scalars().all()]

    async def user_bookings(
        self, session: AsyncSession, *, user_id: int,
    ) -> list[CalendarEntryDTO]:
        """Return the user's non-deleted ordered/provisional/confirmed entries, latest first."""
        result = await session.execute(
            select(CalendarEntry)
            .where(
                CalendarEntry.user_id == user_id,
                CalendarEntry.deleted_at.is_(None),
                CalendarEntry.state.in_(("ordered", "provisional", "confirmed")),
            )
            .order_by(CalendarEntry.start_at.desc())
            .options(selectinload(CalendarEntry.calendar))
        )
        return [_entry_to_dto(e) for e in result.scalars().all()]

    # -- batch reads (not in protocol — convenience for blog service) ---------

    async def confirmed_entries_for_posts(
        self, session: AsyncSession, post_ids: list[int],
    ) -> dict[int, list[CalendarEntryDTO]]:
        """Return confirmed entries grouped by post_id for a batch of posts.

        Posts without any matching entry are absent from the result.  An
        empty ``post_ids`` returns ``{}`` without querying.
        """
        if not post_ids:
            return {}
        result = await session.execute(
            select(CalendarEntry, CalendarEntryPost.content_id)
            .join(CalendarEntryPost, CalendarEntry.id == CalendarEntryPost.entry_id)
            .options(selectinload(CalendarEntry.calendar))
            .where(
                CalendarEntryPost.content_type == "post",
                CalendarEntryPost.content_id.in_(post_ids),
                CalendarEntryPost.deleted_at.is_(None),
                CalendarEntry.deleted_at.is_(None),
                CalendarEntry.state == "confirmed",
            )
            .order_by(CalendarEntry.start_at.asc())
        )
        entries_by_post: dict[int, list[CalendarEntryDTO]] = {}
        for entry, post_id in result:
            entries_by_post.setdefault(post_id, []).append(_entry_to_dto(entry))
        return entries_by_post

    # -- writes (absorb glue lifecycle) ---------------------------------------

    async def adopt_entries_for_user(
        self, session: AsyncSession, user_id: int, session_id: str,
    ) -> None:
        """Adopt anonymous calendar entries for a logged-in user.

        First soft-deletes every live entry already owned by ``user_id``,
        then assigns ``user_id`` to every live entry carrying ``session_id``.
        """
        # NOTE(review): this soft-delete is not restricted by state, so it
        # also removes the user's ordered/provisional/confirmed entries (the
        # ones user_bookings reads), and it runs even when the session has
        # nothing to adopt.  Confirm this is intended; if only unclaimed
        # entries should be replaced, add a state == "pending" filter here.
        await session.execute(
            update(CalendarEntry)
            .where(CalendarEntry.deleted_at.is_(None), CalendarEntry.user_id == user_id)
            .values(deleted_at=func.now())
        )
        cal_result = await session.execute(
            select(CalendarEntry).where(
                CalendarEntry.deleted_at.is_(None),
                CalendarEntry.session_id == session_id,
            )
        )
        for entry in cal_result.scalars().all():
            entry.user_id = user_id

    async def claim_entries_for_order(
        self, session: AsyncSession, order_id: int, user_id: int | None,
        session_id: str | None, page_post_id: int | None,
    ) -> None:
        """Mark pending CalendarEntries as 'ordered' and set order_id.

        Only the owner's entries are claimed (``user_id`` takes precedence
        over ``session_id``); when ``page_post_id`` is given, only entries
        on that page's calendars.  Does nothing when neither ``user_id``
        nor ``session_id`` is supplied.
        """
        owner = self._owner_filter(user_id, session_id)
        if owner is None:
            # Without an owner the UPDATE below would claim every pending
            # entry of every user for this order.
            return

        filters = [
            CalendarEntry.deleted_at.is_(None),
            CalendarEntry.state == "pending",
            owner,
        ]
        if page_post_id is not None:
            filters.append(
                CalendarEntry.calendar_id.in_(self._page_calendar_ids(page_post_id))
            )
        await session.execute(
            update(CalendarEntry)
            .where(*filters)
            .values(state="ordered", order_id=order_id)
        )

    async def confirm_entries_for_order(
        self, session: AsyncSession, order_id: int, user_id: int | None,
        session_id: str | None,
    ) -> None:
        """Mark ordered CalendarEntries as 'provisional'.

        Always scoped to ``order_id``; additionally narrowed to the owner
        when ``user_id`` or ``session_id`` is supplied.
        """
        filters = [
            CalendarEntry.deleted_at.is_(None),
            CalendarEntry.state == "ordered",
            CalendarEntry.order_id == order_id,
        ]
        owner = self._owner_filter(user_id, session_id)
        if owner is not None:
            filters.append(owner)
        await session.execute(
            update(CalendarEntry)
            .where(*filters)
            .values(state="provisional")
        )