This repository has been archived on 2026-02-24. You can view files and clone it. You cannot open issues or pull requests or push a commit.
Files
events/bp/tickets/services/tickets.py
giles 154f968296 feat: decouple events from shared_lib, add app-owned models
Phase 1-3 of decoupling:
- path_setup.py adds project root to sys.path
- Events-owned models in events/models/ (calendars with all related models)
- All imports updated: shared.infrastructure, shared.db, shared.browser, etc.
- Calendar uses container_type/container_id instead of post_id FK
- CalendarEntryPost uses content_type/content_id (generic content refs)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-11 12:46:36 +00:00

240 lines
6.5 KiB
Python

"""
Ticket service layer — create, query, and manage tickets.
"""
from __future__ import annotations
import uuid
from decimal import Decimal
from typing import Optional
from sqlalchemy import select, update, func
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from models.calendars import Ticket, TicketType, CalendarEntry
async def create_ticket(
    session: AsyncSession,
    *,
    entry_id: int,
    ticket_type_id: Optional[int] = None,
    user_id: Optional[int] = None,
    session_id: Optional[str] = None,
    order_id: Optional[int] = None,
    state: str = "reserved",
) -> Ticket:
    """
    Build one Ticket, add it to the session and flush it.

    The ticket's ``code`` is a freshly generated UUID4 rendered as a hex
    string. All other columns are taken verbatim from the keyword arguments.
    The session is flushed but not committed here.
    """
    # Generate the code up front so the column values read as a single mapping.
    attributes = {
        "entry_id": entry_id,
        "ticket_type_id": ticket_type_id,
        "user_id": user_id,
        "session_id": session_id,
        "order_id": order_id,
        "code": uuid.uuid4().hex,
        "state": state,
    }
    new_ticket = Ticket(**attributes)

    session.add(new_ticket)
    await session.flush()
    return new_ticket
async def create_tickets_for_order(
    session: AsyncSession,
    order_id: int,
    user_id: Optional[int],
    session_id: Optional[str],
) -> list[Ticket]:
    """
    Create 'reserved' tickets for every priced calendar entry of an order.

    Only entries that belong to ``order_id``, are not soft-deleted and have a
    ``ticket_price`` set are considered. An entry with ticket types gets one
    ticket per non-deleted type; an entry without any ticket types gets a
    single ticket with no type.

    Called during checkout after calendar entries are transitioned to 'ordered'.
    Returns the created tickets in creation order.
    """
    priced_entries_stmt = (
        select(CalendarEntry)
        .where(
            CalendarEntry.order_id == order_id,
            CalendarEntry.deleted_at.is_(None),
            CalendarEntry.ticket_price.isnot(None),
        )
        .options(selectinload(CalendarEntry.ticket_types))
    )
    priced_entries = (await session.execute(priced_entries_stmt)).scalars().all()

    created: list[Ticket] = []
    for priced_entry in priced_entries:
        if priced_entry.ticket_types:
            # One ticket per live ticket type (quantity handling can be added later).
            # NOTE(review): if every ticket type is soft-deleted this list is
            # empty and the entry gets no ticket at all — confirm that is intended.
            type_ids = [
                ticket_type.id
                for ticket_type in priced_entry.ticket_types
                if ticket_type.deleted_at is None
            ]
        else:
            # Simple ticket: exactly one, not tied to any ticket type.
            type_ids = [None]

        for type_id in type_ids:
            created.append(
                await create_ticket(
                    session,
                    entry_id=priced_entry.id,
                    ticket_type_id=type_id,
                    user_id=user_id,
                    session_id=session_id,
                    order_id=order_id,
                    state="reserved",
                )
            )
    return created
async def confirm_tickets_for_order(
    session: AsyncSession,
    order_id: int,
) -> int:
    """
    Mark an order's reserved tickets as confirmed (used when payment succeeds).

    Issues a single bulk UPDATE that only touches tickets of ``order_id``
    whose state is currently 'reserved'.
    Returns the row count reported for that UPDATE.
    """
    confirm_stmt = (
        update(Ticket)
        .where(
            Ticket.order_id == order_id,
            Ticket.state == "reserved",
        )
        .values(state="confirmed")
    )
    outcome = await session.execute(confirm_stmt)
    return outcome.rowcount
async def get_ticket_by_code(
    session: AsyncSession,
    code: str,
) -> Optional[Ticket]:
    """
    Fetch the ticket whose ``code`` equals the given value.

    The ticket's entry (with that entry's calendar) and its ticket type are
    eagerly loaded alongside it.
    Returns None when no ticket matches.
    """
    eager_loads = (
        selectinload(Ticket.entry).selectinload(CalendarEntry.calendar),
        selectinload(Ticket.ticket_type),
    )
    lookup = select(Ticket).where(Ticket.code == code).options(*eager_loads)

    rows = await session.execute(lookup)
    return rows.scalar_one_or_none()
async def get_user_tickets(
    session: AsyncSession,
    user_id: Optional[int] = None,
    session_id: Optional[str] = None,
    state: Optional[str] = None,
) -> list[Ticket]:
    """
    List tickets owned by a user or, failing that, by an anonymous session.

    ``user_id`` takes precedence: ``session_id`` is only used when ``user_id``
    is None. With neither given, an empty list is returned without querying.
    A truthy ``state`` restricts results to that state; otherwise cancelled
    tickets are excluded. Results are ordered newest first by ``created_at``,
    with each ticket's entry, calendar and ticket type eagerly loaded.
    """
    # No owner to filter on: nothing to look up.
    if user_id is None and session_id is None:
        return []

    owner_clause = (
        Ticket.user_id == user_id
        if user_id is not None
        else Ticket.session_id == session_id
    )
    # Exclude cancelled by default when no explicit state is requested.
    state_clause = Ticket.state == state if state else Ticket.state != "cancelled"

    query = (
        select(Ticket)
        .where(owner_clause, state_clause)
        .options(
            selectinload(Ticket.entry).selectinload(CalendarEntry.calendar),
            selectinload(Ticket.ticket_type),
        )
        .order_by(Ticket.created_at.desc())
    )
    rows = await session.execute(query)
    return rows.scalars().all()
async def get_tickets_for_entry(
    session: AsyncSession,
    entry_id: int,
) -> list[Ticket]:
    """
    List the tickets of one calendar entry, skipping cancelled ones.

    Results are ordered oldest first by ``created_at`` and each ticket's
    ticket type is eagerly loaded.
    """
    not_cancelled = Ticket.state != "cancelled"
    for_entry = Ticket.entry_id == entry_id

    query = (
        select(Ticket)
        .where(for_entry, not_cancelled)
        .options(selectinload(Ticket.ticket_type))
        .order_by(Ticket.created_at.asc())
    )
    rows = await session.execute(query)
    return rows.scalars().all()
async def get_available_ticket_count(
    session: AsyncSession,
    entry_id: int,
) -> Optional[int]:
    """
    Compute how many tickets are still available for a calendar entry.

    Returns None when the entry does not exist or is soft-deleted, when it
    has no ``ticket_price``, or when its ``ticket_count`` is None (unlimited).
    Otherwise returns ``ticket_count`` minus the number of non-cancelled
    tickets for the entry, never going below zero.
    """
    entry_stmt = select(CalendarEntry).where(
        CalendarEntry.id == entry_id,
        CalendarEntry.deleted_at.is_(None),
    )
    found = await session.scalar(entry_stmt)

    if not found:
        return None
    if found.ticket_price is None:
        return None
    capacity = found.ticket_count
    if capacity is None:
        return None  # Unlimited

    # Every ticket that is not cancelled occupies a slot.
    taken_stmt = select(func.count(Ticket.id)).where(
        Ticket.entry_id == entry_id,
        Ticket.state != "cancelled",
    )
    taken = await session.scalar(taken_stmt)

    remaining = capacity - (taken or 0)
    return max(0, remaining)
async def checkin_ticket(
    session: AsyncSession,
    code: str,
) -> tuple[bool, Optional[str]]:
    """
    Mark the ticket with the given code as checked in.

    Tickets in state 'confirmed' or 'reserved' are accepted: their state
    becomes 'checked_in' and ``checked_in_at`` is set to the current UTC time.
    The change is made on the loaded object only; no flush or commit is
    issued here.

    Returns ``(True, None)`` on success, or ``(False, message)`` when the
    ticket is missing, already checked in, cancelled, or in any other state.
    """
    from datetime import datetime, timezone

    found = await get_ticket_by_code(session, code)
    if not found:
        return False, "Ticket not found"

    current_state = found.state

    # States with a dedicated rejection message.
    rejections = {
        "checked_in": "Ticket already checked in",
        "cancelled": "Ticket is cancelled",
    }
    if current_state in rejections:
        return False, rejections[current_state]

    if current_state != "confirmed" and current_state != "reserved":
        return False, f"Ticket in unexpected state: {current_state}"

    found.state = "checked_in"
    found.checked_in_at = datetime.now(timezone.utc)
    return True, None