This repository has been archived on 2026-02-24. You can view files and clone it. You cannot open issues or pull requests or push a commit.
Files
events/bp/slots/services/slots.py
giles 3c0fa45f8c
Some checks failed
Build and Deploy / build-and-deploy (push) Has been cancelled
feat: initialize events app with calendars, slots, tickets, and internal API
Extract events/calendar functionality into standalone microservice:
- app.py and events_api.py from apps/events/
- Calendar blueprints (calendars, calendar, calendar_entries, calendar_entry, day, slots, slot, ticket_types, ticket_type)
- Templates for all calendar/event views including admin
- Dockerfile (APP_MODULE=app:app, IMAGE=events)
- entrypoint.sh (no Alembic - migrations managed by blog app)
- Gitea CI workflow for build and deploy

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-09 23:16:32 +00:00

65 lines
1.6 KiB
Python

from __future__ import annotations
from datetime import time
from typing import Sequence
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from models.calendars import CalendarSlot
class SlotError(ValueError):
    """Signal that the data supplied for a calendar slot failed validation."""
def _b(v):
if isinstance(v, bool):
return v
s = str(v).lower()
return s in {"1","true","t","yes","y","on"}
async def list_slots(sess: AsyncSession, calendar_id: int) -> Sequence[CalendarSlot]:
    """Return the slots of one calendar whose ``deleted_at`` is NULL.

    Rows are ordered by ``time_start`` ascending, with ``id`` ascending as
    the tie-breaker.
    """
    belongs_to_calendar = CalendarSlot.calendar_id == calendar_id
    not_deleted = CalendarSlot.deleted_at.is_(None)
    query = (
        select(CalendarSlot)
        .where(belongs_to_calendar, not_deleted)
        .order_by(CalendarSlot.time_start.asc(), CalendarSlot.id.asc())
    )
    result = await sess.execute(query)
    return result.scalars().all()
async def create_slot(
    sess: AsyncSession,
    calendar_id: int,
    *,
    name: str,
    description: str | None,
    days: dict,
    time_start: time,
    time_end: time,
    cost: float | None,
    flexible: bool = False,
):
    """Validate the inputs and add a new ``CalendarSlot`` to the session.

    Args:
        sess: Session the new slot is added to and flushed through.
        calendar_id: Identifier stored on the slot as ``calendar_id``.
        name: Slot name; must contain at least one non-whitespace character.
        description: Optional text; an empty value is stored as ``None``.
        days: Mapping with optional keys ``mon`` .. ``sun``. Each value is
            coerced with ``_b``; missing keys count as false. ``None`` is
            treated as an empty mapping.
        time_start: Start of the slot.
        time_end: End of the slot; must be strictly later than ``time_start``.
        cost: Stored on the slot unchanged.
        flexible: Stored on the slot unchanged.

    Returns:
        The newly created slot, after the session has been flushed. No
        commit is issued here.

    Raises:
        SlotError: If the name is missing or blank, or if the time range is
            missing or not strictly increasing.
    """
    # A name made only of whitespace is as unusable as an empty one.
    if not name or not str(name).strip():
        raise SlotError("name is required")
    if not time_start or not time_end or time_end <= time_start:
        raise SlotError("time range invalid")

    # Treat a missing mapping as "no weekdays selected" rather than failing
    # with AttributeError on ``.get``.
    selected_days = days or {}
    weekday_flags = {
        day: _b(selected_days.get(day))
        for day in ("mon", "tue", "wed", "thu", "fri", "sat", "sun")
    }

    slot = CalendarSlot(
        calendar_id=calendar_id,
        name=name,
        description=(description or None),
        **weekday_flags,
        time_start=time_start,
        time_end=time_end,
        cost=cost,
        flexible=flexible,
    )
    sess.add(slot)
    await sess.flush()
    return slot