"""SQL-backed CalendarService implementation. Queries ``shared.models.calendars.*`` — only this module may write to calendar-domain tables on behalf of other domains. """ from __future__ import annotations from datetime import datetime from decimal import Decimal from sqlalchemy import select, update, func from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.orm import selectinload from shared.models.calendars import Calendar, CalendarEntry, CalendarEntryPost, Ticket from shared.contracts.dtos import CalendarDTO, CalendarEntryDTO, TicketDTO def _cal_to_dto(cal: Calendar) -> CalendarDTO: return CalendarDTO( id=cal.id, container_type=cal.container_type, container_id=cal.container_id, name=cal.name, slug=cal.slug, description=cal.description, ) def _entry_to_dto(entry: CalendarEntry) -> CalendarEntryDTO: cal = getattr(entry, "calendar", None) return CalendarEntryDTO( id=entry.id, calendar_id=entry.calendar_id, name=entry.name, start_at=entry.start_at, state=entry.state, cost=entry.cost, end_at=entry.end_at, user_id=entry.user_id, session_id=entry.session_id, order_id=entry.order_id, slot_id=entry.slot_id, ticket_price=entry.ticket_price, ticket_count=entry.ticket_count, calendar_name=cal.name if cal else None, calendar_slug=cal.slug if cal else None, calendar_container_id=cal.container_id if cal else None, calendar_container_type=cal.container_type if cal else None, ) def _ticket_to_dto(ticket: Ticket) -> TicketDTO: entry = getattr(ticket, "entry", None) tt = getattr(ticket, "ticket_type", None) cal = getattr(entry, "calendar", None) if entry else None # Price: ticket type cost if available, else entry ticket_price price = None if tt and tt.cost is not None: price = tt.cost elif entry and entry.ticket_price is not None: price = entry.ticket_price return TicketDTO( id=ticket.id, code=ticket.code, state=ticket.state, entry_name=entry.name if entry else "", entry_start_at=entry.start_at if entry else ticket.created_at, entry_end_at=entry.end_at if entry else None, ticket_type_name=tt.name if tt else None, calendar_name=cal.name if cal else None, created_at=ticket.created_at, checked_in_at=ticket.checked_in_at, entry_id=entry.id if entry else None, ticket_type_id=ticket.ticket_type_id, price=price, order_id=ticket.order_id, calendar_container_id=cal.container_id if cal else None, ) class SqlCalendarService: # -- reads ---------------------------------------------------------------- async def calendars_for_container( self, session: AsyncSession, container_type: str, container_id: int, ) -> list[CalendarDTO]: result = await session.execute( select(Calendar).where( Calendar.container_type == container_type, Calendar.container_id == container_id, Calendar.deleted_at.is_(None), ).order_by(Calendar.name.asc()) ) return [_cal_to_dto(c) for c in result.scalars().all()] async def pending_entries( self, session: AsyncSession, *, user_id: int | None, session_id: str | None, ) -> list[CalendarEntryDTO]: filters = [ CalendarEntry.deleted_at.is_(None), CalendarEntry.state == "pending", ] if user_id is not None: filters.append(CalendarEntry.user_id == user_id) elif session_id is not None: filters.append(CalendarEntry.session_id == session_id) else: return [] result = await session.execute( select(CalendarEntry) .where(*filters) .order_by(CalendarEntry.start_at.asc()) .options(selectinload(CalendarEntry.calendar)) ) return [_entry_to_dto(e) for e in result.scalars().all()] async def entries_for_page( self, session: AsyncSession, page_id: int, *, user_id: int | None, session_id: str | None, ) -> list[CalendarEntryDTO]: cal_ids = select(Calendar.id).where( Calendar.container_type == "page", Calendar.container_id == page_id, Calendar.deleted_at.is_(None), ).scalar_subquery() filters = [ CalendarEntry.deleted_at.is_(None), CalendarEntry.state == "pending", CalendarEntry.calendar_id.in_(cal_ids), ] if user_id is not None: filters.append(CalendarEntry.user_id == user_id) elif session_id is not None: filters.append(CalendarEntry.session_id == session_id) else: return [] result = await session.execute( select(CalendarEntry) .where(*filters) .order_by(CalendarEntry.start_at.asc()) .options(selectinload(CalendarEntry.calendar)) ) return [_entry_to_dto(e) for e in result.scalars().all()] async def entry_by_id(self, session: AsyncSession, entry_id: int) -> CalendarEntryDTO | None: entry = ( await session.execute( select(CalendarEntry) .where(CalendarEntry.id == entry_id, CalendarEntry.deleted_at.is_(None)) .options(selectinload(CalendarEntry.calendar)) ) ).scalar_one_or_none() return _entry_to_dto(entry) if entry else None async def entry_ids_for_content( self, session: AsyncSession, content_type: str, content_id: int, ) -> set[int]: """Get entry IDs associated with a content item (e.g. post).""" result = await session.execute( select(CalendarEntryPost.entry_id).where( CalendarEntryPost.content_type == content_type, CalendarEntryPost.content_id == content_id, CalendarEntryPost.deleted_at.is_(None), ) ) return set(result.scalars().all()) async def visible_entries_for_period( self, session: AsyncSession, calendar_id: int, period_start: datetime, period_end: datetime, *, user_id: int | None, is_admin: bool, session_id: str | None, ) -> list[CalendarEntryDTO]: """Return visible entries for a calendar in a date range. Visibility rules: - Everyone sees confirmed entries. - Current user/session sees their own entries (any state). - Admins also see ordered + provisional entries for all users. """ # User/session entries (any state) user_entries: list[CalendarEntry] = [] if user_id or session_id: conditions = [ CalendarEntry.calendar_id == calendar_id, CalendarEntry.deleted_at.is_(None), CalendarEntry.start_at >= period_start, CalendarEntry.start_at < period_end, ] if user_id: conditions.append(CalendarEntry.user_id == user_id) elif session_id: conditions.append(CalendarEntry.session_id == session_id) result = await session.execute( select(CalendarEntry).where(*conditions) .options(selectinload(CalendarEntry.calendar)) ) user_entries = list(result.scalars().all()) # Confirmed entries for everyone result = await session.execute( select(CalendarEntry).where( CalendarEntry.calendar_id == calendar_id, CalendarEntry.state == "confirmed", CalendarEntry.deleted_at.is_(None), CalendarEntry.start_at >= period_start, CalendarEntry.start_at < period_end, ).options(selectinload(CalendarEntry.calendar)) ) confirmed_entries = list(result.scalars().all()) # Admin: ordered + provisional for everyone admin_entries: list[CalendarEntry] = [] if is_admin: result = await session.execute( select(CalendarEntry).where( CalendarEntry.calendar_id == calendar_id, CalendarEntry.state.in_(("ordered", "provisional")), CalendarEntry.deleted_at.is_(None), CalendarEntry.start_at >= period_start, CalendarEntry.start_at < period_end, ).options(selectinload(CalendarEntry.calendar)) ) admin_entries = list(result.scalars().all()) # Merge, deduplicate, sort entries_by_id: dict[int, CalendarEntry] = {} for e in confirmed_entries: entries_by_id[e.id] = e for e in admin_entries: entries_by_id[e.id] = e for e in user_entries: entries_by_id[e.id] = e merged = sorted(entries_by_id.values(), key=lambda e: e.start_at or period_start) return [_entry_to_dto(e) for e in merged] async def associated_entries( self, session: AsyncSession, content_type: str, content_id: int, page: int, ) -> tuple[list[CalendarEntryDTO], bool]: """Get paginated confirmed entries associated with a content item.""" per_page = 10 entry_ids_result = await session.execute( select(CalendarEntryPost.entry_id).where( CalendarEntryPost.content_type == content_type, CalendarEntryPost.content_id == content_id, CalendarEntryPost.deleted_at.is_(None), ) ) entry_ids = set(entry_ids_result.scalars().all()) if not entry_ids: return [], False offset = (page - 1) * per_page result = await session.execute( select(CalendarEntry) .where( CalendarEntry.id.in_(entry_ids), CalendarEntry.deleted_at.is_(None), CalendarEntry.state == "confirmed", ) .order_by(CalendarEntry.start_at.desc()) .limit(per_page) .offset(offset) .options(selectinload(CalendarEntry.calendar)) ) entries = result.scalars().all() has_more = len(entries) == per_page return [_entry_to_dto(e) for e in entries], has_more async def toggle_entry_post( self, session: AsyncSession, entry_id: int, content_type: str, content_id: int, ) -> bool: """Toggle association; returns True if now associated, False if removed.""" existing = await session.scalar( select(CalendarEntryPost).where( CalendarEntryPost.entry_id == entry_id, CalendarEntryPost.content_type == content_type, CalendarEntryPost.content_id == content_id, CalendarEntryPost.deleted_at.is_(None), ) ) if existing: existing.deleted_at = func.now() await session.flush() return False else: assoc = CalendarEntryPost( entry_id=entry_id, content_type=content_type, content_id=content_id, ) session.add(assoc) await session.flush() return True async def get_entries_for_order( self, session: AsyncSession, order_id: int, ) -> list[CalendarEntryDTO]: result = await session.execute( select(CalendarEntry) .where( CalendarEntry.order_id == order_id, CalendarEntry.deleted_at.is_(None), ) .options(selectinload(CalendarEntry.calendar)) ) return [_entry_to_dto(e) for e in result.scalars().all()] async def user_tickets( self, session: AsyncSession, *, user_id: int, ) -> list[TicketDTO]: result = await session.execute( select(Ticket) .where( Ticket.user_id == user_id, Ticket.state != "cancelled", ) .order_by(Ticket.created_at.desc()) .options( selectinload(Ticket.entry).selectinload(CalendarEntry.calendar), selectinload(Ticket.ticket_type), ) ) return [_ticket_to_dto(t) for t in result.scalars().all()] async def user_bookings( self, session: AsyncSession, *, user_id: int, ) -> list[CalendarEntryDTO]: result = await session.execute( select(CalendarEntry) .where( CalendarEntry.user_id == user_id, CalendarEntry.deleted_at.is_(None), CalendarEntry.state.in_(("ordered", "provisional", "confirmed")), ) .order_by(CalendarEntry.start_at.desc()) .options(selectinload(CalendarEntry.calendar)) ) return [_entry_to_dto(e) for e in result.scalars().all()] # -- batch reads (not in protocol — convenience for blog service) --------- async def confirmed_entries_for_posts( self, session: AsyncSession, post_ids: list[int], ) -> dict[int, list[CalendarEntryDTO]]: """Return confirmed entries grouped by post_id for a batch of posts.""" if not post_ids: return {} result = await session.execute( select(CalendarEntry, CalendarEntryPost.content_id) .join(CalendarEntryPost, CalendarEntry.id == CalendarEntryPost.entry_id) .options(selectinload(CalendarEntry.calendar)) .where( CalendarEntryPost.content_type == "post", CalendarEntryPost.content_id.in_(post_ids), CalendarEntryPost.deleted_at.is_(None), CalendarEntry.deleted_at.is_(None), CalendarEntry.state == "confirmed", ) .order_by(CalendarEntry.start_at.asc()) ) entries_by_post: dict[int, list[CalendarEntryDTO]] = {} for entry, post_id in result: entries_by_post.setdefault(post_id, []).append(_entry_to_dto(entry)) return entries_by_post # -- writes --------------------------------------------------------------- async def adopt_entries_for_user( self, session: AsyncSession, user_id: int, session_id: str, ) -> None: """Adopt anonymous calendar entries for a logged-in user. Only deletes stale *pending* entries for the user — confirmed/ordered entries must be preserved. """ await session.execute( update(CalendarEntry) .where( CalendarEntry.deleted_at.is_(None), CalendarEntry.user_id == user_id, CalendarEntry.state == "pending", ) .values(deleted_at=func.now()) ) cal_result = await session.execute( select(CalendarEntry).where( CalendarEntry.deleted_at.is_(None), CalendarEntry.session_id == session_id, ) ) for entry in cal_result.scalars().all(): entry.user_id = user_id async def claim_entries_for_order( self, session: AsyncSession, order_id: int, user_id: int | None, session_id: str | None, page_post_id: int | None, ) -> None: """Mark pending CalendarEntries as 'ordered' and set order_id.""" filters = [ CalendarEntry.deleted_at.is_(None), CalendarEntry.state == "pending", ] if user_id is not None: filters.append(CalendarEntry.user_id == user_id) elif session_id is not None: filters.append(CalendarEntry.session_id == session_id) if page_post_id is not None: cal_ids = select(Calendar.id).where( Calendar.container_type == "page", Calendar.container_id == page_post_id, Calendar.deleted_at.is_(None), ).scalar_subquery() filters.append(CalendarEntry.calendar_id.in_(cal_ids)) await session.execute( update(CalendarEntry) .where(*filters) .values(state="ordered", order_id=order_id) ) async def confirm_entries_for_order( self, session: AsyncSession, order_id: int, user_id: int | None, session_id: str | None, ) -> None: """Mark ordered CalendarEntries as 'provisional'.""" filters = [ CalendarEntry.deleted_at.is_(None), CalendarEntry.state == "ordered", CalendarEntry.order_id == order_id, ] if user_id is not None: filters.append(CalendarEntry.user_id == user_id) elif session_id is not None: filters.append(CalendarEntry.session_id == session_id) await session.execute( update(CalendarEntry) .where(*filters) .values(state="provisional") ) # -- ticket methods ------------------------------------------------------- def _ticket_query_options(self): return [ selectinload(Ticket.entry).selectinload(CalendarEntry.calendar), selectinload(Ticket.ticket_type), ] async def pending_tickets( self, session: AsyncSession, *, user_id: int | None, session_id: str | None, ) -> list[TicketDTO]: """Reserved tickets for the given identity (cart line items).""" filters = [Ticket.state == "reserved"] if user_id is not None: filters.append(Ticket.user_id == user_id) elif session_id is not None: filters.append(Ticket.session_id == session_id) else: return [] result = await session.execute( select(Ticket) .where(*filters) .order_by(Ticket.created_at.asc()) .options(*self._ticket_query_options()) ) return [_ticket_to_dto(t) for t in result.scalars().all()] async def tickets_for_page( self, session: AsyncSession, page_id: int, *, user_id: int | None, session_id: str | None, ) -> list[TicketDTO]: """Reserved tickets scoped to a page (via entry → calendar → container_id).""" cal_ids = select(Calendar.id).where( Calendar.container_type == "page", Calendar.container_id == page_id, Calendar.deleted_at.is_(None), ).scalar_subquery() entry_ids = select(CalendarEntry.id).where( CalendarEntry.calendar_id.in_(cal_ids), CalendarEntry.deleted_at.is_(None), ).scalar_subquery() filters = [ Ticket.state == "reserved", Ticket.entry_id.in_(entry_ids), ] if user_id is not None: filters.append(Ticket.user_id == user_id) elif session_id is not None: filters.append(Ticket.session_id == session_id) else: return [] result = await session.execute( select(Ticket) .where(*filters) .order_by(Ticket.created_at.asc()) .options(*self._ticket_query_options()) ) return [_ticket_to_dto(t) for t in result.scalars().all()] async def claim_tickets_for_order( self, session: AsyncSession, order_id: int, user_id: int | None, session_id: str | None, page_post_id: int | None, ) -> None: """Set order_id on reserved tickets at checkout.""" filters = [Ticket.state == "reserved"] if user_id is not None: filters.append(Ticket.user_id == user_id) elif session_id is not None: filters.append(Ticket.session_id == session_id) if page_post_id is not None: cal_ids = select(Calendar.id).where( Calendar.container_type == "page", Calendar.container_id == page_post_id, Calendar.deleted_at.is_(None), ).scalar_subquery() entry_ids = select(CalendarEntry.id).where( CalendarEntry.calendar_id.in_(cal_ids), CalendarEntry.deleted_at.is_(None), ).scalar_subquery() filters.append(Ticket.entry_id.in_(entry_ids)) await session.execute( update(Ticket).where(*filters).values(order_id=order_id) ) async def confirm_tickets_for_order( self, session: AsyncSession, order_id: int, ) -> None: """Reserved → confirmed on payment.""" await session.execute( update(Ticket) .where(Ticket.order_id == order_id, Ticket.state == "reserved") .values(state="confirmed") ) async def get_tickets_for_order( self, session: AsyncSession, order_id: int, ) -> list[TicketDTO]: """Tickets for a given order (checkout return display).""" result = await session.execute( select(Ticket) .where(Ticket.order_id == order_id) .order_by(Ticket.created_at.asc()) .options(*self._ticket_query_options()) ) return [_ticket_to_dto(t) for t in result.scalars().all()] async def adopt_tickets_for_user( self, session: AsyncSession, user_id: int, session_id: str, ) -> None: """Migrate anonymous reserved tickets to user on login.""" result = await session.execute( select(Ticket).where( Ticket.session_id == session_id, Ticket.state == "reserved", ) ) for ticket in result.scalars().all(): ticket.user_id = user_id async def adjust_ticket_quantity( self, session: AsyncSession, entry_id: int, count: int, *, user_id: int | None, session_id: str | None, ticket_type_id: int | None = None, ) -> int: """Adjust reserved ticket count to target. Returns new count.""" import uuid count = max(count, 0) # Current reserved count filters = [ Ticket.entry_id == entry_id, Ticket.state == "reserved", ] if user_id is not None: filters.append(Ticket.user_id == user_id) elif session_id is not None: filters.append(Ticket.session_id == session_id) else: return 0 if ticket_type_id is not None: filters.append(Ticket.ticket_type_id == ticket_type_id) current = await session.scalar( select(func.count(Ticket.id)).where(*filters) ) or 0 if count > current: # Create tickets for _ in range(count - current): ticket = Ticket( entry_id=entry_id, ticket_type_id=ticket_type_id, user_id=user_id, session_id=session_id, code=uuid.uuid4().hex, state="reserved", ) session.add(ticket) await session.flush() elif count < current: # Cancel newest tickets to_cancel = current - count result = await session.execute( select(Ticket) .where(*filters) .order_by(Ticket.created_at.desc()) .limit(to_cancel) ) for ticket in result.scalars().all(): ticket.state = "cancelled" await session.flush() return count