Some checks failed
Build and Deploy / build-and-deploy (push) Has been cancelled
Extract events/calendar functionality into standalone microservice: - app.py and events_api.py from apps/events/ - Calendar blueprints (calendars, calendar, calendar_entries, calendar_entry, day, slots, slot, ticket_types, ticket_type) - Templates for all calendar/event views including admin - Dockerfile (APP_MODULE=app:app, IMAGE=events) - entrypoint.sh (no Alembic - migrations managed by blog app) - Gitea CI workflow for build and deploy Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
65 lines
1.6 KiB
Python
65 lines
1.6 KiB
Python
|
|
from __future__ import annotations
|
|
from datetime import time
|
|
from typing import Sequence
|
|
|
|
from sqlalchemy import select
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from models.calendars import CalendarSlot
|
|
|
|
class SlotError(ValueError):
    """Raised when the data supplied for a calendar slot fails validation."""
|
|
|
|
def _b(v):
|
|
if isinstance(v, bool):
|
|
return v
|
|
s = str(v).lower()
|
|
return s in {"1","true","t","yes","y","on"}
|
|
|
|
async def list_slots(sess: AsyncSession, calendar_id: int) -> Sequence[CalendarSlot]:
    """Return the slots of one calendar that are not soft-deleted.

    Only rows whose ``calendar_id`` matches and whose ``deleted_at`` is NULL
    are selected. Results are ordered by ``time_start`` ascending, with
    ``id`` ascending as the tie-breaker.
    """
    stmt = (
        select(CalendarSlot)
        .where(
            CalendarSlot.calendar_id == calendar_id,
            CalendarSlot.deleted_at.is_(None),
        )
        .order_by(CalendarSlot.time_start.asc(), CalendarSlot.id.asc())
    )
    result = await sess.execute(stmt)
    return result.scalars().all()
|
|
|
|
async def create_slot(
    sess: AsyncSession,
    calendar_id: int,
    *,
    name: str,
    description: str | None,
    days: dict,
    time_start: time,
    time_end: time,
    cost: float | None,
    flexible: bool = False,
) -> CalendarSlot:
    """Validate the input, then add a new ``CalendarSlot`` to the session.

    Args:
        sess: Async session the new slot is added to and flushed on.
        calendar_id: Id stored on the slot as its ``calendar_id``.
        name: Slot name; must contain at least one non-whitespace character.
        description: Optional description; an empty value is stored as None.
        days: Mapping looked up with the keys ``"mon"`` .. ``"sun"``. Each
            value is coerced with ``_b``; a missing key counts as False.
        time_start: Start of the slot.
        time_end: End of the slot; must be strictly later than ``time_start``.
        cost: Cost stored on the slot unchanged (may be None).
        flexible: Stored on the slot unchanged.

    Returns:
        The newly created slot, after the session has been flushed.

    Raises:
        SlotError: If ``name`` is empty or blank, or if the time range is
            missing or not strictly increasing.
    """
    # A whitespace-only name is as unusable as an empty one, so reject both.
    if not name or not name.strip():
        raise SlotError("name is required")

    # datetime.time is always truthy on Python 3.5+, so midnight is accepted
    # here; the truthiness test only rejects missing (None/empty) values.
    if not time_start or not time_end or time_end <= time_start:
        raise SlotError("time range invalid")

    weekday_flags = {
        day: _b(days.get(day))
        for day in ("mon", "tue", "wed", "thu", "fri", "sat", "sun")
    }

    slot = CalendarSlot(
        calendar_id=calendar_id,
        name=name,
        description=(description or None),
        **weekday_flags,
        time_start=time_start,
        time_end=time_end,
        cost=cost,
        flexible=flexible,
    )
    sess.add(slot)
    # Flush without committing; presumably so database-generated values
    # (e.g. the id) are populated before returning - confirm against callers.
    await sess.flush()
    return slot
|